BVB Source Codes

uWebSockets Show WebSocket.cpp Source code

Return Download uWebSockets: download WebSocket.cpp Source code - Download uWebSockets Source code - Type: .cpp
  1. #include "WebSocket.h"
  2. #include "Group.h"
  3. #include "Hub.h"
  4.  
  5. namespace uWS {
  6.  
  7. /*
  8.  * Frames and sends a WebSocket message.
  9.  *
  10.  * Hints: Consider using any of the prepare function if any of their
  11.  * use cases match what you are trying to achieve (pub/sub, broadcast)
  12.  *
  13.  * Thread safe
  14.  *
  15.  */
  16. template <bool isServer>
  17. void WebSocket<isServer>::send(const char *message, size_t length, OpCode opCode, void(*callback)(WebSocket<isServer> *webSocket, void *data, bool cancelled, void *reserved), void *callbackData) {
  18.  
  19. #ifdef UWS_THREADSAFE
  20.     std::lock_guard<std::recursive_mutex> lockGuard(*nodeData->asyncMutex);
  21.     if (isClosed()) {
  22.         if (callback) {
  23.             callback(this, callbackData, true, nullptr);
  24.         }
  25.         return;
  26.     }
  27. #endif
  28.  
  29.     const int HEADER_LENGTH = WebSocketProtocol<!isServer, WebSocket<!isServer>>::LONG_MESSAGE_HEADER;
  30.  
  31.     struct TransformData {
  32.         OpCode opCode;
  33.     } transformData = {opCode};
  34.  
  35.     struct WebSocketTransformer {
  36.         static size_t estimate(const char *data, size_t length) {
  37.             return length + HEADER_LENGTH;
  38.         }
  39.  
  40.         static size_t transform(const char *src, char *dst, size_t length, TransformData transformData) {
  41.             return WebSocketProtocol<isServer, WebSocket<isServer>>::formatMessage(dst, src, length, transformData.opCode, length, false);
  42.         }
  43.     };
  44.  
  45.     sendTransformed<WebSocketTransformer>((char *) message, length, (void(*)(void *, void *, bool, void *)) callback, callbackData, transformData);
  46. }
  47.  
  48. /*
  49.  * Prepares a single message for use with sendPrepared.
  50.  *
  51.  * Hints: Useful in cases where you need to send the same message to many
  52.  * recipients. Do not use when only sending one message.
  53.  *
  54.  * Thread safe
  55.  *
  56.  */
  57. template <bool isServer>
  58. typename WebSocket<isServer>::PreparedMessage *WebSocket<isServer>::prepareMessage(char *data, size_t length, OpCode opCode, bool compressed, void(*callback)(WebSocket<isServer> *webSocket, void *data, bool cancelled, void *reserved)) {
  59.     PreparedMessage *preparedMessage = new PreparedMessage;
  60.     preparedMessage->buffer = new char[length + 10];
  61.     preparedMessage->length = WebSocketProtocol<isServer, WebSocket<isServer>>::formatMessage(preparedMessage->buffer, data, length, opCode, length, compressed);
  62.     preparedMessage->references = 1;
  63.     preparedMessage->callback = (void(*)(void *, void *, bool, void *)) callback;
  64.     return preparedMessage;
  65. }
  66.  
  67. /*
  68.  * Prepares a batch of messages to send as one single TCP packet / syscall.
  69.  *
  70.  * Hints: Useful when doing pub/sub-like broadcasts where many recipients should receive many
  71.  * messages. Do not use if only sending one message.
  72.  *
  73.  * Thread safe
  74.  *
  75.  */
  76. template <bool isServer>
  77. typename WebSocket<isServer>::PreparedMessage *WebSocket<isServer>::prepareMessageBatch(std::vector<std::string> &messages, std::vector<int> &excludedMessages, OpCode opCode, bool compressed, void (*callback)(WebSocket<isServer> *, void *, bool, void *))
  78. {
  79.     // should be sent in!
  80.     size_t batchLength = 0;
  81.     for (size_t i = 0; i < messages.size(); i++) {
  82.         batchLength += messages[i].length();
  83.     }
  84.  
  85.     PreparedMessage *preparedMessage = new PreparedMessage;
  86.     preparedMessage->buffer = new char[batchLength + 10 * messages.size()];
  87.  
  88.     int offset = 0;
  89.     for (size_t i = 0; i < messages.size(); i++) {
  90.         offset += WebSocketProtocol<isServer, WebSocket<isServer>>::formatMessage(preparedMessage->buffer + offset, messages[i].data(), messages[i].length(), opCode, messages[i].length(), compressed);
  91.     }
  92.     preparedMessage->length = offset;
  93.     preparedMessage->references = 1;
  94.     preparedMessage->callback = (void(*)(void *, void *, bool, void *)) callback;
  95.     return preparedMessage;
  96. }
  97.  
  98. /*
  99.  * Sends a prepared message.
  100.  *
  101.  * Hints: Used to improve broadcasting and similar use cases where the same
  102.  * message is sent to multiple recipients. Do not used if only sending one message
  103.  * in total.
  104.  *
  105.  * Warning: Modifies passed PreparedMessage and is thus not thread safe. Other
  106.  * data is also modified and it makes sense to not make this function thread-safe
  107.  * since it is a central part in broadcasting and other high-perf code paths.
  108.  *
  109.  */
  110. template <bool isServer>
  111. void WebSocket<isServer>::sendPrepared(typename WebSocket<isServer>::PreparedMessage *preparedMessage, void *callbackData) {
  112.     // todo: see if this can be made a transformer instead
  113.     preparedMessage->references++;
  114.     void (*callback)(void *webSocket, void *userData, bool cancelled, void *reserved) = [](void *webSocket, void *userData, bool cancelled, void *reserved) {
  115.         PreparedMessage *preparedMessage = (PreparedMessage *) userData;
  116.         bool lastReference = !--preparedMessage->references;
  117.  
  118.         if (preparedMessage->callback) {
  119.             preparedMessage->callback(webSocket, reserved, cancelled, (void *) lastReference);
  120.         }
  121.  
  122.         if (lastReference) {
  123.             delete [] preparedMessage->buffer;
  124.             delete preparedMessage;
  125.         }
  126.     };
  127.  
  128.     // candidate for fixed size pool allocator
  129.     int memoryLength = sizeof(Queue::Message);
  130.     int memoryIndex = nodeData->getMemoryBlockIndex(memoryLength);
  131.  
  132.     Queue::Message *messagePtr = (Queue::Message *) nodeData->getSmallMemoryBlock(memoryIndex);
  133.     messagePtr->data = preparedMessage->buffer;
  134.     messagePtr->length = preparedMessage->length;
  135.  
  136.     bool wasTransferred;
  137.     if (write(messagePtr, wasTransferred)) {
  138.         if (!wasTransferred) {
  139.             nodeData->freeSmallMemoryBlock((char *) messagePtr, memoryIndex);
  140.             if (callback) {
  141.                 callback(this, preparedMessage, false, callbackData);
  142.             }
  143.         } else {
  144.             messagePtr->callback = callback;
  145.             messagePtr->callbackData = preparedMessage;
  146.             messagePtr->reserved = callbackData;
  147.         }
  148.     } else {
  149.         nodeData->freeSmallMemoryBlock((char *) messagePtr, memoryIndex);
  150.         if (callback) {
  151.             callback(this, preparedMessage, true, callbackData);
  152.         }
  153.     }
  154. }
  155.  
  156. /*
  157.  * Decrements the reference count of passed PreparedMessage. On zero references
  158.  * the memory will be deleted.
  159.  *
  160.  * Hints: Used together with prepareMessage, prepareMessageBatch and similar calls.
  161.  *
  162.  * Warning: Will modify passed PrepareMessage and is thus not thread safe by itself.
  163.  *
  164.  */
  165. template <bool isServer>
  166. void WebSocket<isServer>::finalizeMessage(typename WebSocket<isServer>::PreparedMessage *preparedMessage) {
  167.     if (!--preparedMessage->references) {
  168.         delete [] preparedMessage->buffer;
  169.         delete preparedMessage;
  170.     }
  171. }
  172.  
  173. template <bool isServer>
  174. uS::Socket *WebSocket<isServer>::onData(uS::Socket *s, char *data, size_t length) {
  175.     WebSocket<isServer> *webSocket = static_cast<WebSocket<isServer> *>(s);
  176.  
  177.     webSocket->hasOutstandingPong = false;
  178.     if (!webSocket->isShuttingDown()) {
  179.         webSocket->cork(true);
  180.         WebSocketProtocol<isServer, WebSocket<isServer>>::consume(data, length, webSocket);
  181.         if (!webSocket->isClosed()) {
  182.             webSocket->cork(false);
  183.         }
  184.     }
  185.  
  186.     return webSocket;
  187. }
  188.  
  189. /*
  190.  * Immediately terminates this WebSocket. Will call onDisconnection of its Group.
  191.  *
  192.  * Hints: Close code will be 1006 and message will be empty.
  193.  *
  194.  */
  195. template <bool isServer>
  196. void WebSocket<isServer>::terminate() {
  197.  
  198. #ifdef UWS_THREADSAFE
  199.     std::lock_guard<std::recursive_mutex> lockGuard(*nodeData->asyncMutex);
  200.     if (isClosed()) {
  201.         return;
  202.     }
  203. #endif
  204.  
  205.     WebSocket<isServer>::onEnd(this);
  206. }
  207.  
  208. /*
  209.  * Transfers this WebSocket from its current Group to specified Group.
  210.  *
  211.  * Receiving Group has to have called listen(uWS::TRANSFERS) prior.
  212.  *
  213.  * Hints: Useful to implement subprotocols on the same thread and Loop
  214.  * or to transfer WebSockets between threads at any point (dynamic load balancing).
  215.  *
  216.  * Warning: From the point of call to the point of onTransfer, this WebSocket
  217.  * is invalid and cannot be used. What you put in is not guaranteed to be what you
  218.  * get in onTransfer, the only guaranteed consistency is passed userData is the userData
  219.  * of given WebSocket in onTransfer. Use setUserData and getUserData to identify the WebSocket.
  220.  */
  221. template <bool isServer>
  222. void WebSocket<isServer>::transfer(Group<isServer> *group) {
  223.     Group<isServer>::from(this)->removeWebSocket(this);
  224.     if (group->loop == Group<isServer>::from(this)->loop) {
  225.         // fast path
  226.         this->nodeData = group;
  227.         Group<isServer>::from(this)->addWebSocket(this);
  228.         Group<isServer>::from(this)->transferHandler(this);
  229.     } else {
  230.         // slow path
  231.         uS::Socket::transfer((uS::NodeData *) group, [](Poll *p) {
  232.             WebSocket<isServer> *webSocket = (WebSocket<isServer> *) p;
  233.             Group<isServer>::from(webSocket)->addWebSocket(webSocket);
  234.             Group<isServer>::from(webSocket)->transferHandler(webSocket);
  235.         });
  236.     }
  237. }
  238.  
  239. /*
  240.  * Immediately calls onDisconnection of its Group and begins a passive
  241.  * WebSocket closedown handshake in the background (might succeed or not,
  242.  * we don't care).
  243.  *
  244.  * Hints: Close code and message will be what you pass yourself.
  245.  *
  246.  */
  247. template <bool isServer>
  248. void WebSocket<isServer>::close(int code, const char *message, size_t length) {
  249.  
  250.     // startTimeout is not thread safe
  251.  
  252.     static const int MAX_CLOSE_PAYLOAD = 123;
  253.     length = std::min<size_t>(MAX_CLOSE_PAYLOAD, length);
  254.     Group<isServer>::from(this)->removeWebSocket(this);
  255.     Group<isServer>::from(this)->disconnectionHandler(this, code, (char *) message, length);
  256.     setShuttingDown(true);
  257.  
  258.     // todo: using the shared timer in the group, we can skip creating a new timer per socket
  259.     // only this line and the one in Hub::connect uses the timeout feature
  260.     startTimeout<WebSocket<isServer>::onEnd>();
  261.  
  262.     char closePayload[MAX_CLOSE_PAYLOAD + 2];
  263.     int closePayloadLength = WebSocketProtocol<isServer, WebSocket<isServer>>::formatClosePayload(closePayload, code, message, length);
  264.     send(closePayload, closePayloadLength, OpCode::CLOSE, [](WebSocket<isServer> *p, void *data, bool cancelled, void *reserved) {
  265.         if (!cancelled) {
  266.             p->shutdown();
  267.         }
  268.     });
  269. }
  270.  
  271. template <bool isServer>
  272. void WebSocket<isServer>::onEnd(uS::Socket *s) {
  273.     WebSocket<isServer> *webSocket = static_cast<WebSocket<isServer> *>(s);
  274.  
  275.     if (!webSocket->isShuttingDown()) {
  276.         Group<isServer>::from(webSocket)->removeWebSocket(webSocket);
  277.         Group<isServer>::from(webSocket)->disconnectionHandler(webSocket, 1006, nullptr, 0);
  278.     } else {
  279.         webSocket->cancelTimeout();
  280.     }
  281.  
  282.     webSocket->template closeSocket<WebSocket<isServer>>();
  283.  
  284.     while (!webSocket->messageQueue.empty()) {
  285.         Queue::Message *message = webSocket->messageQueue.front();
  286.         if (message->callback) {
  287.             message->callback(nullptr, message->callbackData, true, nullptr);
  288.         }
  289.         webSocket->messageQueue.pop();
  290.     }
  291.  
  292.     webSocket->nodeData->clearPendingPollChanges(webSocket);
  293. }
  294.  
  295. template <bool isServer>
  296. bool WebSocket<isServer>::handleFragment(char *data, size_t length, unsigned int remainingBytes, int opCode, bool fin, WebSocketState<isServer> *webSocketState) {
  297.     WebSocket<isServer> *webSocket = static_cast<WebSocket<isServer> *>(webSocketState);
  298.     Group<isServer> *group = Group<isServer>::from(webSocket);
  299.  
  300.     if (opCode < 3) {
  301.         if (!remainingBytes && fin && !webSocket->fragmentBuffer.length()) {
  302.             if (webSocket->compressionStatus == WebSocket<isServer>::CompressionStatus::COMPRESSED_FRAME) {
  303.                     webSocket->compressionStatus = WebSocket<isServer>::CompressionStatus::ENABLED;
  304.                     data = group->hub->inflate(data, length, group->maxPayload);
  305.                     if (!data) {
  306.                         forceClose(webSocketState);
  307.                         return true;
  308.                     }
  309.             }
  310.  
  311.             if (opCode == 1 && !WebSocketProtocol<isServer, WebSocket<isServer>>::isValidUtf8((unsigned char *) data, length)) {
  312.                 forceClose(webSocketState);
  313.                 return true;
  314.             }
  315.  
  316.             group->messageHandler(webSocket, data, length, (OpCode) opCode);
  317.             if (webSocket->isClosed() || webSocket->isShuttingDown()) {
  318.                 return true;
  319.             }
  320.         } else {
  321.             webSocket->fragmentBuffer.append(data, length);
  322.             if (!remainingBytes && fin) {
  323.                 length = webSocket->fragmentBuffer.length();
  324.                 if (webSocket->compressionStatus == WebSocket<isServer>::CompressionStatus::COMPRESSED_FRAME) {
  325.                         webSocket->compressionStatus = WebSocket<isServer>::CompressionStatus::ENABLED;
  326.                         webSocket->fragmentBuffer.append("....");
  327.                         data = group->hub->inflate((char *) webSocket->fragmentBuffer.data(), length, group->maxPayload);
  328.                         if (!data) {
  329.                             forceClose(webSocketState);
  330.                             return true;
  331.                         }
  332.                 } else {
  333.                     data = (char *) webSocket->fragmentBuffer.data();
  334.                 }
  335.  
  336.                 if (opCode == 1 && !WebSocketProtocol<isServer, WebSocket<isServer>>::isValidUtf8((unsigned char *) data, length)) {
  337.                     forceClose(webSocketState);
  338.                     return true;
  339.                 }
  340.  
  341.                 group->messageHandler(webSocket, data, length, (OpCode) opCode);
  342.                 if (webSocket->isClosed() || webSocket->isShuttingDown()) {
  343.                     return true;
  344.                 }
  345.                 webSocket->fragmentBuffer.clear();
  346.             }
  347.         }
  348.     } else {
  349.         if (!remainingBytes && fin && !webSocket->controlTipLength) {
  350.             if (opCode == CLOSE) {
  351.                 typename WebSocketProtocol<isServer, WebSocket<isServer>>::CloseFrame closeFrame = WebSocketProtocol<isServer, WebSocket<isServer>>::parseClosePayload(data, length);
  352.                 webSocket->close(closeFrame.code, closeFrame.message, closeFrame.length);
  353.                 return true;
  354.             } else {
  355.                 if (opCode == PING) {
  356.                     webSocket->send(data, length, (OpCode) OpCode::PONG);
  357.                     group->pingHandler(webSocket, data, length);
  358.                     if (webSocket->isClosed() || webSocket->isShuttingDown()) {
  359.                         return true;
  360.                     }
  361.                 } else if (opCode == PONG) {
  362.                     group->pongHandler(webSocket, data, length);
  363.                     if (webSocket->isClosed() || webSocket->isShuttingDown()) {
  364.                         return true;
  365.                     }
  366.                 }
  367.             }
  368.         } else {
  369.             webSocket->fragmentBuffer.append(data, length);
  370.             webSocket->controlTipLength += length;
  371.  
  372.             if (!remainingBytes && fin) {
  373.                 char *controlBuffer = (char *) webSocket->fragmentBuffer.data() + webSocket->fragmentBuffer.length() - webSocket->controlTipLength;
  374.                 if (opCode == CLOSE) {
  375.                     typename WebSocketProtocol<isServer, WebSocket<isServer>>::CloseFrame closeFrame = WebSocketProtocol<isServer, WebSocket<isServer>>::parseClosePayload(controlBuffer, webSocket->controlTipLength);
  376.                     webSocket->close(closeFrame.code, closeFrame.message, closeFrame.length);
  377.                     return true;
  378.                 } else {
  379.                     if (opCode == PING) {
  380.                         webSocket->send(controlBuffer, webSocket->controlTipLength, (OpCode) OpCode::PONG);
  381.                         group->pingHandler(webSocket, controlBuffer, webSocket->controlTipLength);
  382.                         if (webSocket->isClosed() || webSocket->isShuttingDown()) {
  383.                             return true;
  384.                         }
  385.                     } else if (opCode == PONG) {
  386.                         group->pongHandler(webSocket, controlBuffer, webSocket->controlTipLength);
  387.                         if (webSocket->isClosed() || webSocket->isShuttingDown()) {
  388.                             return true;
  389.                         }
  390.                     }
  391.                 }
  392.  
  393.                 webSocket->fragmentBuffer.resize(webSocket->fragmentBuffer.length() - webSocket->controlTipLength);
  394.                 webSocket->controlTipLength = 0;
  395.             }
  396.         }
  397.     }
  398.  
  399.     return false;
  400. }
  401.  
  // Explicit instantiation of both WebSocket variants, so the member
  // definitions in this file are emitted for SERVER and for CLIENT.
  template struct WebSocket<SERVER>;
  template struct WebSocket<CLIENT>;
  404.  
  405. }
  406.  
Download WebSocket.cpp Source code - Download uWebSockets Source code
Related Source Codes/Software:
realworld - TodoMVC for the RealWorld - Exemplary fullstack Me... 2017-06-11
goreplay - GoReplay is an open-source tool for capturing and ... 2017-06-10
pyenv - Simple Python version management 2017-06-10
redux-saga - An alternative side effect model for Redux apps ... 2017-06-10
angular-starter - 2017-06-10
rkt - rkt is a pod-native container engine for Linux. It... 2017-06-11
reactide - Reactide is the first dedicated IDE for React web ... 2017-06-11
postal - 2017-06-11
CRYENGINE - CRYENGINE is a powerful real-time game development... 2017-06-11
uWebSockets - Tiny WebSockets https://for... 2017-06-11

 Back to top