BVB Source Codes

uWebSockets Show Socket.h Source code

Return Download uWebSockets: download Socket.h Source code - Download uWebSockets Source code - Type:.h
  1. #ifndef SOCKET_UWS_H
  2. #define SOCKET_UWS_H
  3.  
  4. #include "Networking.h"
  5.  
  6. namespace uS {
  7.  
  8. struct TransferData {
  9.     // Connection state
  10.     uv_os_sock_t fd;
  11.     SSL *ssl;
  12.  
  13.     // Poll state
  14.     void (*pollCb)(Poll *, int, int);
  15.     int pollEvents;
  16.  
  17.     // User state
  18.     void *userData;
  19.  
  20.     // Destination
  21.     NodeData *destination;
  22.     void (*transferCb)(Poll *);
  23. };
  24.  
  25. // perfectly 64 bytes (4 + 60)
  26. struct WIN32_EXPORT Socket : Poll {
  27. protected:
  28.     struct {
  29.         int poll : 4;
  30.         int shuttingDown : 4;
  31.     } state = {0, false};
  32.  
  33.     SSL *ssl;
  34.     void *user = nullptr;
  35.     NodeData *nodeData;
  36.  
  37.     // this is not needed by HttpSocket!
  38.     struct Queue {
  39.         struct Message {
  40.             const char *data;
  41.             size_t length;
  42.             Message *nextMessage = nullptr;
  43.             void (*callback)(void *socket, void *data, bool cancelled, void *reserved) = nullptr;
  44.             void *callbackData = nullptr, *reserved = nullptr;
  45.         };
  46.  
  47.         Message *head = nullptr, *tail = nullptr;
  48.         void pop()
  49.         {
  50.             Message *nextMessage;
  51.             if ((nextMessage = head->nextMessage)) {
  52.                 delete [] (char *) head;
  53.                 head = nextMessage;
  54.             } else {
  55.                 delete [] (char *) head;
  56.                 head = tail = nullptr;
  57.             }
  58.         }
  59.  
  60.         bool empty() {return head == nullptr;}
  61.         Message *front() {return head;}
  62.  
  63.         void push(Message *message)
  64.         {
  65.             message->nextMessage = nullptr;
  66.             if (tail) {
  67.                 tail->nextMessage = message;
  68.                 tail = message;
  69.             } else {
  70.                 head = message;
  71.                 tail = message;
  72.             }
  73.         }
  74.     } messageQueue;
  75.  
  76.     int getPoll() {
  77.         return state.poll;
  78.     }
  79.  
  80.     int setPoll(int poll) {
  81.         state.poll = poll;
  82.         return poll;
  83.     }
  84.  
  85.     void setShuttingDown(bool shuttingDown) {
  86.         state.shuttingDown = shuttingDown;
  87.     }
  88.  
  89.     void transfer(NodeData *nodeData, void (*cb)(Poll *)) {
  90.         // userData is invalid from now on till onTransfer
  91.         setUserData(new TransferData({getFd(), ssl, getCb(), getPoll(), getUserData(), nodeData, cb}));
  92.         stop(this->nodeData->loop);
  93.         close(this->nodeData->loop, [](Poll *p) {
  94.             Socket *s = (Socket *) p;
  95.             TransferData *transferData = (TransferData *) s->getUserData();
  96.  
  97.             transferData->destination->asyncMutex->lock();
  98.             bool wasEmpty = transferData->destination->transferQueue.empty();
  99.             transferData->destination->transferQueue.push_back(s);
  100.             transferData->destination->asyncMutex->unlock();
  101.  
  102.             if (wasEmpty) {
  103.                 transferData->destination->async->send();
  104.             }
  105.         });
  106.     }
  107.  
  108.     void changePoll(Socket *socket) {
  109.         if (!threadSafeChange(nodeData->loop, this, socket->getPoll())) {
  110.             if (socket->nodeData->tid != pthread_self()) {
  111.                 socket->nodeData->asyncMutex->lock();
  112.                 socket->nodeData->changePollQueue.push_back(socket);
  113.                 socket->nodeData->asyncMutex->unlock();
  114.                 socket->nodeData->async->send();
  115.             } else {
  116.                 change(socket->nodeData->loop, socket, socket->getPoll());
  117.             }
  118.         }
  119.     }
  120.  
  121.     // clears user data!
  122.     template <void onTimeout(Socket *)>
  123.     void startTimeout(int timeoutMs = 15000) {
  124.         Timer *timer = new Timer(nodeData->loop);
  125.         timer->setData(this);
  126.         timer->start([](Timer *timer) {
  127.             Socket *s = (Socket *) timer->getData();
  128.             s->cancelTimeout();
  129.             onTimeout(s);
  130.         }, timeoutMs, 0);
  131.  
  132.         user = timer;
  133.     }
  134.  
  135.     void cancelTimeout() {
  136.         Timer *timer = (Timer *) getUserData();
  137.         if (timer) {
  138.             timer->stop();
  139.             timer->close();
  140.             user = nullptr;
  141.         }
  142.     }
  143.  
  144.     template <class STATE>
  145.     static void sslIoHandler(Poll *p, int status, int events) {
  146.         Socket *socket = (Socket *) p;
  147.  
  148.         if (status < 0) {
  149.             STATE::onEnd((Socket *) p);
  150.             return;
  151.         }
  152.  
  153.         if (!socket->messageQueue.empty() && ((events & UV_WRITABLE) || SSL_want(socket->ssl) == SSL_READING)) {
  154.             socket->cork(true);
  155.             while (true) {
  156.                 Queue::Message *messagePtr = socket->messageQueue.front();
  157.                 int sent = SSL_write(socket->ssl, messagePtr->data, messagePtr->length);
  158.                 if (sent == (ssize_t) messagePtr->length) {
  159.                     if (messagePtr->callback) {
  160.                         messagePtr->callback(p, messagePtr->callbackData, false, messagePtr->reserved);
  161.                     }
  162.                     socket->messageQueue.pop();
  163.                     if (socket->messageQueue.empty()) {
  164.                         if ((socket->state.poll & UV_WRITABLE) && SSL_want(socket->ssl) != SSL_WRITING) {
  165.                             socket->change(socket->nodeData->loop, socket, socket->setPoll(UV_READABLE));
  166.                         }
  167.                         break;
  168.                     }
  169.                 } else if (sent <= 0) {
  170.                     switch (SSL_get_error(socket->ssl, sent)) {
  171.                     case SSL_ERROR_WANT_READ:
  172.                         break;
  173.                     case SSL_ERROR_WANT_WRITE:
  174.                         if ((socket->getPoll() & UV_WRITABLE) == 0) {
  175.                             socket->change(socket->nodeData->loop, socket, socket->setPoll(socket->getPoll() | UV_WRITABLE));
  176.                         }
  177.                         break;
  178.                     default:
  179.                         STATE::onEnd((Socket *) p);
  180.                         return;
  181.                     }
  182.                     break;
  183.                 }
  184.             }
  185.             socket->cork(false);
  186.         }
  187.  
  188.         if (events & UV_READABLE) {
  189.             do {
  190.                 int length = SSL_read(socket->ssl, socket->nodeData->recvBuffer, socket->nodeData->recvLength);
  191.                 if (length <= 0) {
  192.                     switch (SSL_get_error(socket->ssl, length)) {
  193.                     case SSL_ERROR_WANT_READ:
  194.                         break;
  195.                     case SSL_ERROR_WANT_WRITE:
  196.                         if ((socket->getPoll() & UV_WRITABLE) == 0) {
  197.                             socket->change(socket->nodeData->loop, socket, socket->setPoll(socket->getPoll() | UV_WRITABLE));
  198.                         }
  199.                         break;
  200.                     default:
  201.                         STATE::onEnd((Socket *) p);
  202.                         return;
  203.                     }
  204.                     break;
  205.                 } else {
  206.                     // Warning: onData can delete the socket! Happens when HttpSocket upgrades
  207.                     socket = STATE::onData((Socket *) p, socket->nodeData->recvBuffer, length);
  208.                     if (socket->isClosed() || socket->isShuttingDown()) {
  209.                         return;
  210.                     }
  211.                 }
  212.             } while (SSL_pending(socket->ssl));
  213.         }
  214.     }
  215.  
  216.     template <class STATE>
  217.     static void ioHandler(Poll *p, int status, int events) {
  218.         Socket *socket = (Socket *) p;
  219.         NodeData *nodeData = socket->nodeData;
  220.         Context *netContext = nodeData->netContext;
  221.  
  222.         if (status < 0) {
  223.             STATE::onEnd((Socket *) p);
  224.             return;
  225.         }
  226.  
  227.         if (events & UV_WRITABLE) {
  228.             if (!socket->messageQueue.empty() && (events & UV_WRITABLE)) {
  229.                 socket->cork(true);
  230.                 while (true) {
  231.                     Queue::Message *messagePtr = socket->messageQueue.front();
  232.                     ssize_t sent = ::send(socket->getFd(), messagePtr->data, messagePtr->length, MSG_NOSIGNAL);
  233.                     if (sent == (ssize_t) messagePtr->length) {
  234.                         if (messagePtr->callback) {
  235.                             messagePtr->callback(p, messagePtr->callbackData, false, messagePtr->reserved);
  236.                         }
  237.                         socket->messageQueue.pop();
  238.                         if (socket->messageQueue.empty()) {
  239.                             // todo, remove bit, don't set directly
  240.                             socket->change(socket->nodeData->loop, socket, socket->setPoll(UV_READABLE));
  241.                             break;
  242.                         }
  243.                     } else if (sent == SOCKET_ERROR) {
  244.                         if (!netContext->wouldBlock()) {
  245.                             STATE::onEnd((Socket *) p);
  246.                             return;
  247.                         }
  248.                         break;
  249.                     } else {
  250.                         messagePtr->length -= sent;
  251.                         messagePtr->data += sent;
  252.                         break;
  253.                     }
  254.                 }
  255.                 socket->cork(false);
  256.             }
  257.         }
  258.  
  259.         if (events & UV_READABLE) {
  260.             int length = recv(socket->getFd(), nodeData->recvBuffer, nodeData->recvLength, 0);
  261.             if (length > 0) {
  262.                 STATE::onData((Socket *) p, nodeData->recvBuffer, length);
  263.             } else if (length <= 0 || (length == SOCKET_ERROR && !netContext->wouldBlock())) {
  264.                 STATE::onEnd((Socket *) p);
  265.             }
  266.         }
  267.  
  268.     }
  269.  
  270.     template<class STATE>
  271.     void setState() {
  272.         if (ssl) {
  273.             setCb(sslIoHandler<STATE>);
  274.         } else {
  275.             setCb(ioHandler<STATE>);
  276.         }
  277.     }
  278.  
  279.     bool hasEmptyQueue() {
  280.         return messageQueue.empty();
  281.     }
  282.  
  283.     void enqueue(Queue::Message *message) {
  284.         messageQueue.push(message);
  285.     }
  286.  
  287.     Queue::Message *allocMessage(size_t length, const char *data = 0) {
  288.         Queue::Message *messagePtr = (Queue::Message *) new char[sizeof(Queue::Message) + length];
  289.         messagePtr->length = length;
  290.         messagePtr->data = ((char *) messagePtr) + sizeof(Queue::Message);
  291.         messagePtr->nextMessage = nullptr;
  292.  
  293.         if (data) {
  294.             memcpy((char *) messagePtr->data, data, messagePtr->length);
  295.         }
  296.  
  297.         return messagePtr;
  298.     }
  299.  
  300.     void freeMessage(Queue::Message *message) {
  301.         delete [] (char *) message;
  302.     }
  303.  
  304.     bool write(Queue::Message *message, bool &wasTransferred) {
  305.         ssize_t sent = 0;
  306.         if (messageQueue.empty()) {
  307.  
  308.             if (ssl) {
  309.                 sent = SSL_write(ssl, message->data, message->length);
  310.                 if (sent == (ssize_t) message->length) {
  311.                     wasTransferred = false;
  312.                     return true;
  313.                 } else if (sent < 0) {
  314.                     switch (SSL_get_error(ssl, sent)) {
  315.                     case SSL_ERROR_WANT_READ:
  316.                         break;
  317.                     case SSL_ERROR_WANT_WRITE:
  318.                         if ((getPoll() & UV_WRITABLE) == 0) {
  319.                             setPoll(getPoll() | UV_WRITABLE);
  320.                             changePoll(this);
  321.                         }
  322.                         break;
  323.                     default:
  324.                         return false;
  325.                     }
  326.                 }
  327.             } else {
  328.                 sent = ::send(getFd(), message->data, message->length, MSG_NOSIGNAL);
  329.                 if (sent == (ssize_t) message->length) {
  330.                     wasTransferred = false;
  331.                     return true;
  332.                 } else if (sent == SOCKET_ERROR) {
  333.                     if (!nodeData->netContext->wouldBlock()) {
  334.                         return false;
  335.                     }
  336.                 } else {
  337.                     message->length -= sent;
  338.                     message->data += sent;
  339.                 }
  340.  
  341.                 if ((getPoll() & UV_WRITABLE) == 0) {
  342.                     setPoll(getPoll() | UV_WRITABLE);
  343.                     changePoll(this);
  344.                 }
  345.             }
  346.         }
  347.         messageQueue.push(message);
  348.         wasTransferred = true;
  349.         return true;
  350.     }
  351.  
  352.     template <class T, class D>
  353.     void sendTransformed(const char *message, size_t length, void(*callback)(void *socket, void *data, bool cancelled, void *reserved), void *callbackData, D transformData) {
  354.         size_t estimatedLength = T::estimate(message, length) + sizeof(Queue::Message);
  355.  
  356.         if (hasEmptyQueue()) {
  357.             if (estimatedLength <= uS::NodeData::preAllocMaxSize) {
  358.                 int memoryLength = estimatedLength;
  359.                 int memoryIndex = nodeData->getMemoryBlockIndex(memoryLength);
  360.  
  361.                 Queue::Message *messagePtr = (Queue::Message *) nodeData->getSmallMemoryBlock(memoryIndex);
  362.                 messagePtr->data = ((char *) messagePtr) + sizeof(Queue::Message);
  363.                 messagePtr->length = T::transform(message, (char *) messagePtr->data, length, transformData);
  364.  
  365.                 bool wasTransferred;
  366.                 if (write(messagePtr, wasTransferred)) {
  367.                     if (!wasTransferred) {
  368.                         nodeData->freeSmallMemoryBlock((char *) messagePtr, memoryIndex);
  369.                         if (callback) {
  370.                             callback(this, callbackData, false, nullptr);
  371.                         }
  372.                     } else {
  373.                         messagePtr->callback = callback;
  374.                         messagePtr->callbackData = callbackData;
  375.                     }
  376.                 } else {
  377.                     nodeData->freeSmallMemoryBlock((char *) messagePtr, memoryIndex);
  378.                     if (callback) {
  379.                         callback(this, callbackData, true, nullptr);
  380.                     }
  381.                 }
  382.             } else {
  383.                 Queue::Message *messagePtr = allocMessage(estimatedLength - sizeof(Queue::Message));
  384.                 messagePtr->length = T::transform(message, (char *) messagePtr->data, length, transformData);
  385.  
  386.                 bool wasTransferred;
  387.                 if (write(messagePtr, wasTransferred)) {
  388.                     if (!wasTransferred) {
  389.                         freeMessage(messagePtr);
  390.                         if (callback) {
  391.                             callback(this, callbackData, false, nullptr);
  392.                         }
  393.                     } else {
  394.                         messagePtr->callback = callback;
  395.                         messagePtr->callbackData = callbackData;
  396.                     }
  397.                 } else {
  398.                     freeMessage(messagePtr);
  399.                     if (callback) {
  400.                         callback(this, callbackData, true, nullptr);
  401.                     }
  402.                 }
  403.             }
  404.         } else {
  405.             Queue::Message *messagePtr = allocMessage(estimatedLength - sizeof(Queue::Message));
  406.             messagePtr->length = T::transform(message, (char *) messagePtr->data, length, transformData);
  407.             messagePtr->callback = callback;
  408.             messagePtr->callbackData = callbackData;
  409.             enqueue(messagePtr);
  410.         }
  411.     }
  412.  
  413. public:
  414.     Socket(NodeData *nodeData, Loop *loop, uv_os_sock_t fd, SSL *ssl) : Poll(loop, fd), ssl(ssl), nodeData(nodeData) {
  415.         if (ssl) {
  416.             // OpenSSL treats SOCKETs as int
  417.             SSL_set_fd(ssl, (int) fd);
  418.             SSL_set_mode(ssl, SSL_MODE_RELEASE_BUFFERS);
  419.         }
  420.     }
  421.  
  422.     NodeData *getNodeData() {
  423.         return nodeData;
  424.     }
  425.  
  426.     Poll *next = nullptr, *prev = nullptr;
  427.  
  428.     void *getUserData() {
  429.         return user;
  430.     }
  431.  
  432.     void setUserData(void *user) {
  433.         this->user = user;
  434.     }
  435.  
  436.     struct Address {
  437.         unsigned int port;
  438.         const char *address;
  439.         const char *family;
  440.     };
  441.  
  442.     Address getAddress();
  443.  
  444.     void setNoDelay(int enable) {
  445.         setsockopt(getFd(), IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(int));
  446.     }
  447.  
  448.     void cork(int enable) {
  449. #if defined(TCP_CORK)
  450.         // Linux & SmartOS have proper TCP_CORK
  451.         setsockopt(getFd(), IPPROTO_TCP, TCP_CORK, &enable, sizeof(int));
  452. #elif defined(TCP_NOPUSH)
  453.         // Mac OS X & FreeBSD have TCP_NOPUSH
  454.         setsockopt(getFd(), IPPROTO_TCP, TCP_NOPUSH, &enable, sizeof(int));
  455.         if (!enable) {
  456.             // Tested on OS X, FreeBSD situation is unclear
  457.             ::send(getFd(), "", 0, MSG_NOSIGNAL);
  458.         }
  459. #endif
  460.     }
  461.  
  462.     void shutdown() {
  463.         if (ssl) {
  464.             //todo: poll in/out - have the io_cb recall shutdown if failed
  465.             SSL_shutdown(ssl);
  466.         } else {
  467.             ::shutdown(getFd(), SHUT_WR);
  468.         }
  469.     }
  470.  
  471.     template <class T>
  472.     void closeSocket() {
  473.         uv_os_sock_t fd = getFd();
  474.         Context *netContext = nodeData->netContext;
  475.         stop(nodeData->loop);
  476.         netContext->closeSocket(fd);
  477.  
  478.         if (ssl) {
  479.             SSL_free(ssl);
  480.         }
  481.  
  482.         Poll::close(nodeData->loop, [](Poll *p) {
  483.             delete (T *) p;
  484.         });
  485.     }
  486.  
  487.     bool isShuttingDown() {
  488.         return state.shuttingDown;
  489.     }
  490.  
  491.     friend class Node;
  492.     friend struct NodeData;
  493. };
  494.  
  495. struct ListenSocket : Socket {
  496.  
  497.     ListenSocket(NodeData *nodeData, Loop *loop, uv_os_sock_t fd, SSL *ssl) : Socket(nodeData, loop, fd, ssl) {
  498.  
  499.     }
  500.  
  501.     Timer *timer = nullptr;
  502.     uS::TLS::Context sslContext;
  503. };
  504.  
  505. }
  506.  
  507. #endif // SOCKET_UWS_H
  508.  
downloadSocket.h Source code - Download uWebSockets Source code
Related Source Codes/Software:
realworld - TodoMVC for the RealWorld - Exemplary fullstack Me... 2017-06-11
goreplay - GoReplay is an open-source tool for capturing and ... 2017-06-10
pyenv - Simple Python version management 2017-06-10
redux-saga - An alternative side effect model for Redux apps ... 2017-06-10
angular-starter - 2017-06-10
rkt - rkt is a pod-native container engine for Linux. It... 2017-06-11
reactide - Reactide is the first dedicated IDE for React web ... 2017-06-11
postal - 2017-06-11
CRYENGINE - CRYENGINE is a powerful real-time game development... 2017-06-11
uWebSockets - Tiny WebSockets https://for... 2017-06-11

 Back to top