BVB Source Codes

parse-server Show ParseLiveQueryServer.js Source code

Return Download parse-server: download ParseLiveQueryServer.js Source code - Download parse-server Source code - Type: .js
  1. import tv4 from 'tv4';
  2. import Parse from 'parse/node';
  3. import { Subscription } from './Subscription';
  4. import { Client } from './Client';
  5. import { ParseWebSocketServer } from './ParseWebSocketServer';
  6. import logger from '../logger';
  7. import RequestSchema from './RequestSchema';
  8. import { matchesQuery, queryHash } from './QueryTools';
  9. import { ParsePubSub } from './ParsePubSub';
  10. import { SessionTokenCache } from './SessionTokenCache';
  11. import _ from 'lodash';
  12.  
  13. class ParseLiveQueryServer {
  14.   clientId: number;
  15.   clients: Object;
  16.   // className -> (queryHash -> subscription)
  17.   subscriptions: Object;
  18.   parseWebSocketServer: Object;
  19.   keyPairs : any;
  20.   // The subscriber we use to get object update from publisher
  21.   subscriber: Object;
  22.  
  23.   constructor(server: any, config: any) {
  24.     this.clientId = 0;
  25.     this.clients = new Map();
  26.     this.subscriptions = new Map();
  27.  
  28.     config = config || {};
  29.  
  30.     // Store keys, convert obj to map
  31.     const keyPairs = config.keyPairs || {};
  32.     this.keyPairs = new Map();
  33.     for (const key of Object.keys(keyPairs)) {
  34.       this.keyPairs.set(key, keyPairs[key]);
  35.     }
  36.     logger.verbose('Support key pairs', this.keyPairs);
  37.  
  38.     // Initialize Parse
  39.     Parse.Object.disableSingleInstance();
  40.  
  41.     const serverURL = config.serverURL || Parse.serverURL;
  42.     Parse.serverURL = serverURL;
  43.     const appId = config.appId || Parse.applicationId;
  44.     const javascriptKey = Parse.javaScriptKey;
  45.     const masterKey = config.masterKey || Parse.masterKey;
  46.     Parse.initialize(appId, javascriptKey, masterKey);
  47.  
  48.     // Initialize websocket server
  49.     this.parseWebSocketServer = new ParseWebSocketServer(
  50.       server,
  51.       (parseWebsocket) => this._onConnect(parseWebsocket),
  52.       config.websocketTimeout
  53.     );
  54.  
  55.     // Initialize subscriber
  56.     this.subscriber = ParsePubSub.createSubscriber(config);
  57.     this.subscriber.subscribe(Parse.applicationId + 'afterSave');
  58.     this.subscriber.subscribe(Parse.applicationId + 'afterDelete');
  59.     // Register message handler for subscriber. When publisher get messages, it will publish message
  60.     // to the subscribers and the handler will be called.
  61.     this.subscriber.on('message', (channel, messageStr) => {
  62.       logger.verbose('Subscribe messsage %j', messageStr);
  63.       let message;
  64.       try {
  65.         message = JSON.parse(messageStr);
  66.       } catch(e) {
  67.         logger.error('unable to parse message', messageStr, e);
  68.         return;
  69.       }
  70.       this._inflateParseObject(message);
  71.       if (channel === Parse.applicationId + 'afterSave') {
  72.         this._onAfterSave(message);
  73.       } else if (channel === Parse.applicationId + 'afterDelete') {
  74.         this._onAfterDelete(message);
  75.       } else {
  76.         logger.error('Get message %s from unknown channel %j', message, channel);
  77.       }
  78.     });
  79.  
  80.     // Initialize sessionToken cache
  81.     this.sessionTokenCache = new SessionTokenCache(config.cacheTimeout);
  82.   }
  83.  
  84.   // Message is the JSON object from publisher. Message.currentParseObject is the ParseObject JSON after changes.
  85.   // Message.originalParseObject is the original ParseObject JSON.
  86.   _inflateParseObject(message: any): void {
  87.     // Inflate merged object
  88.     const currentParseObject = message.currentParseObject;
  89.     let className = currentParseObject.className;
  90.     let parseObject = new Parse.Object(className);
  91.     parseObject._finishFetch(currentParseObject);
  92.     message.currentParseObject = parseObject;
  93.     // Inflate original object
  94.     const originalParseObject = message.originalParseObject;
  95.     if (originalParseObject) {
  96.       className = originalParseObject.className;
  97.       parseObject = new Parse.Object(className);
  98.       parseObject._finishFetch(originalParseObject);
  99.       message.originalParseObject = parseObject;
  100.     }
  101.   }
  102.  
  103.   // Message is the JSON object from publisher after inflated. Message.currentParseObject is the ParseObject after changes.
  104.   // Message.originalParseObject is the original ParseObject.
  105.   _onAfterDelete(message: any): void {
  106.     logger.verbose(Parse.applicationId + 'afterDelete is triggered');
  107.  
  108.     const deletedParseObject = message.currentParseObject.toJSON();
  109.     const className = deletedParseObject.className;
  110.     logger.verbose('ClassName: %j | ObjectId: %s', className, deletedParseObject.id);
  111.     logger.verbose('Current client number : %d', this.clients.size);
  112.  
  113.     const classSubscriptions = this.subscriptions.get(className);
  114.     if (typeof classSubscriptions === 'undefined') {
  115.       logger.debug('Can not find subscriptions under this class ' + className);
  116.       return;
  117.     }
  118.     for (const subscription of classSubscriptions.values()) {
  119.       const isSubscriptionMatched = this._matchesSubscription(deletedParseObject, subscription);
  120.       if (!isSubscriptionMatched) {
  121.         continue;
  122.       }
  123.       for (const [clientId, requestIds] of _.entries(subscription.clientRequestIds)) {
  124.         const client = this.clients.get(clientId);
  125.         if (typeof client === 'undefined') {
  126.           continue;
  127.         }
  128.         for (const requestId of requestIds) {
  129.           const acl = message.currentParseObject.getACL();
  130.           // Check ACL
  131.           this._matchesACL(acl, client, requestId).then((isMatched) => {
  132.             if (!isMatched) {
  133.               return null;
  134.             }
  135.             client.pushDelete(requestId, deletedParseObject);
  136.           }, (error) => {
  137.             logger.error('Matching ACL error : ', error);
  138.           });
  139.         }
  140.       }
  141.     }
  142.   }
  143.  
  144.   // Message is the JSON object from publisher after inflated. Message.currentParseObject is the ParseObject after changes.
  145.   // Message.originalParseObject is the original ParseObject.
  146.   _onAfterSave(message: any): void {
  147.     logger.verbose(Parse.applicationId + 'afterSave is triggered');
  148.  
  149.     let originalParseObject = null;
  150.     if (message.originalParseObject) {
  151.       originalParseObject = message.originalParseObject.toJSON();
  152.     }
  153.     const currentParseObject = message.currentParseObject.toJSON();
  154.     const className = currentParseObject.className;
  155.     logger.verbose('ClassName: %s | ObjectId: %s', className, currentParseObject.id);
  156.     logger.verbose('Current client number : %d', this.clients.size);
  157.  
  158.     const classSubscriptions = this.subscriptions.get(className);
  159.     if (typeof classSubscriptions === 'undefined') {
  160.       logger.debug('Can not find subscriptions under this class ' + className);
  161.       return;
  162.     }
  163.     for (const subscription of classSubscriptions.values()) {
  164.       const isOriginalSubscriptionMatched = this._matchesSubscription(originalParseObject, subscription);
  165.       const isCurrentSubscriptionMatched = this._matchesSubscription(currentParseObject, subscription);
  166.       for (const [clientId, requestIds] of _.entries(subscription.clientRequestIds)) {
  167.         const client = this.clients.get(clientId);
  168.         if (typeof client === 'undefined') {
  169.           continue;
  170.         }
  171.         for (const requestId of requestIds) {
  172.           // Set orignal ParseObject ACL checking promise, if the object does not match
  173.           // subscription, we do not need to check ACL
  174.           let originalACLCheckingPromise;
  175.           if (!isOriginalSubscriptionMatched) {
  176.             originalACLCheckingPromise = Parse.Promise.as(false);
  177.           } else {
  178.             let originalACL;
  179.             if (message.originalParseObject) {
  180.               originalACL = message.originalParseObject.getACL();
  181.             }
  182.             originalACLCheckingPromise = this._matchesACL(originalACL, client, requestId);
  183.           }
  184.           // Set current ParseObject ACL checking promise, if the object does not match
  185.           // subscription, we do not need to check ACL
  186.           let currentACLCheckingPromise;
  187.           if (!isCurrentSubscriptionMatched) {
  188.             currentACLCheckingPromise = Parse.Promise.as(false);
  189.           } else {
  190.             const currentACL = message.currentParseObject.getACL();
  191.             currentACLCheckingPromise = this._matchesACL(currentACL, client, requestId);
  192.           }
  193.  
  194.           Parse.Promise.when(
  195.             originalACLCheckingPromise,
  196.             currentACLCheckingPromise
  197.           ).then((isOriginalMatched, isCurrentMatched) => {
  198.             logger.verbose('Original %j | Current %j | Match: %s, %s, %s, %s | Query: %s',
  199.               originalParseObject,
  200.               currentParseObject,
  201.               isOriginalSubscriptionMatched,
  202.               isCurrentSubscriptionMatched,
  203.               isOriginalMatched,
  204.               isCurrentMatched,
  205.               subscription.hash
  206.             );
  207.  
  208.             // Decide event type
  209.             let type;
  210.             if (isOriginalMatched && isCurrentMatched) {
  211.               type = 'Update';
  212.             } else if (isOriginalMatched && !isCurrentMatched) {
  213.               type = 'Leave';
  214.             } else if (!isOriginalMatched && isCurrentMatched) {
  215.               if (originalParseObject) {
  216.                 type = 'Enter';
  217.               } else {
  218.                 type = 'Create';
  219.               }
  220.             } else {
  221.               return null;
  222.             }
  223.             const functionName = 'push' + type;
  224.             client[functionName](requestId, currentParseObject);
  225.           }, (error) => {
  226.             logger.error('Matching ACL error : ', error);
  227.           });
  228.         }
  229.       }
  230.     }
  231.   }
  232.  
  233.   _onConnect(parseWebsocket: any): void {
  234.     parseWebsocket.on('message', (request) => {
  235.       if (typeof request === 'string') {
  236.         try {
  237.           request = JSON.parse(request);
  238.         } catch(e) {
  239.           logger.error('unable to parse request', request, e);
  240.           return;
  241.         }
  242.       }
  243.       logger.verbose('Request: %j', request);
  244.  
  245.       // Check whether this request is a valid request, return error directly if not
  246.       if (!tv4.validate(request, RequestSchema['general']) || !tv4.validate(request, RequestSchema[request.op])) {
  247.         Client.pushError(parseWebsocket, 1, tv4.error.message);
  248.         logger.error('Connect message error %s', tv4.error.message);
  249.         return;
  250.       }
  251.  
  252.       switch(request.op) {
  253.       case 'connect':
  254.         this._handleConnect(parseWebsocket, request);
  255.         break;
  256.       case 'subscribe':
  257.         this._handleSubscribe(parseWebsocket, request);
  258.         break;
  259.       case 'update':
  260.         this._handleUpdateSubscription(parseWebsocket, request);
  261.         break;
  262.       case 'unsubscribe':
  263.         this._handleUnsubscribe(parseWebsocket, request);
  264.         break;
  265.       default:
  266.         Client.pushError(parseWebsocket, 3, 'Get unknown operation');
  267.         logger.error('Get unknown operation', request.op);
  268.       }
  269.     });
  270.  
  271.     parseWebsocket.on('disconnect', () => {
  272.       logger.info('Client disconnect: %d', parseWebsocket.clientId);
  273.       const clientId = parseWebsocket.clientId;
  274.       if (!this.clients.has(clientId)) {
  275.         logger.error('Can not find client %d on disconnect', clientId);
  276.         return;
  277.       }
  278.  
  279.       // Delete client
  280.       const client = this.clients.get(clientId);
  281.       this.clients.delete(clientId);
  282.  
  283.       // Delete client from subscriptions
  284.       for (const [requestId, subscriptionInfo] of _.entries(client.subscriptionInfos)) {
  285.         const subscription = subscriptionInfo.subscription;
  286.         subscription.deleteClientSubscription(clientId, requestId);
  287.  
  288.         // If there is no client which is subscribing this subscription, remove it from subscriptions
  289.         const classSubscriptions = this.subscriptions.get(subscription.className);
  290.         if (!subscription.hasSubscribingClient()) {
  291.           classSubscriptions.delete(subscription.hash);
  292.         }
  293.         // If there is no subscriptions under this class, remove it from subscriptions
  294.         if (classSubscriptions.size === 0) {
  295.           this.subscriptions.delete(subscription.className);
  296.         }
  297.       }
  298.  
  299.       logger.verbose('Current clients %d', this.clients.size);
  300.       logger.verbose('Current subscriptions %d', this.subscriptions.size);
  301.     });
  302.   }
  303.  
  304.   _matchesSubscription(parseObject: any, subscription: any): boolean {
  305.     // Object is undefined or null, not match
  306.     if (!parseObject) {
  307.       return false;
  308.     }
  309.     return matchesQuery(parseObject, subscription.query);
  310.   }
  311.  
  312.   _matchesACL(acl: any, client: any, requestId: number): any {
  313.     // If ACL is undefined or null, or ACL has public read access, return true directly
  314.     if (!acl || acl.getPublicReadAccess()) {
  315.       return Parse.Promise.as(true);
  316.     }
  317.     // Check subscription sessionToken matches ACL first
  318.     const subscriptionInfo = client.getSubscriptionInfo(requestId);
  319.     if (typeof subscriptionInfo === 'undefined') {
  320.       return Parse.Promise.as(false);
  321.     }
  322.  
  323.     const subscriptionSessionToken = subscriptionInfo.sessionToken;
  324.     return this.sessionTokenCache.getUserId(subscriptionSessionToken).then((userId) => {
  325.       return acl.getReadAccess(userId);
  326.     }).then((isSubscriptionSessionTokenMatched) => {
  327.       if (isSubscriptionSessionTokenMatched) {
  328.         return Parse.Promise.as(true);
  329.       }
  330.  
  331.       // Check if the user has any roles that match the ACL
  332.       return new Parse.Promise((resolve, reject) => {
  333.  
  334.         // Resolve false right away if the acl doesn't have any roles
  335.         const acl_has_roles = Object.keys(acl.permissionsById).some(key => key.startsWith("role:"));
  336.         if (!acl_has_roles) {
  337.           return resolve(false);
  338.         }
  339.  
  340.         this.sessionTokenCache.getUserId(subscriptionSessionToken)
  341.         .then((userId) => {
  342.  
  343.             // Pass along a null if there is no user id
  344.           if (!userId) {
  345.             return Parse.Promise.as(null);
  346.           }
  347.  
  348.             // Prepare a user object to query for roles
  349.             // To eliminate a query for the user, create one locally with the id
  350.           var user = new Parse.User();
  351.           user.id = userId;
  352.           return user;
  353.  
  354.         })
  355.         .then((user) => {
  356.  
  357.             // Pass along an empty array (of roles) if no user
  358.           if (!user) {
  359.             return Parse.Promise.as([]);
  360.           }
  361.  
  362.             // Then get the user's roles
  363.           var rolesQuery = new Parse.Query(Parse.Role);
  364.           rolesQuery.equalTo("users", user);
  365.           return rolesQuery.find({useMasterKey:true});
  366.         }).
  367.         then((roles) => {
  368.  
  369.             // Finally, see if any of the user's roles allow them read access
  370.           for (const role of roles) {
  371.             if (acl.getRoleReadAccess(role)) {
  372.               return resolve(true);
  373.             }
  374.           }
  375.           resolve(false);
  376.         })
  377.         .catch((error) => {
  378.           reject(error);
  379.         });
  380.  
  381.       });
  382.     }).then((isRoleMatched) => {
  383.  
  384.       if(isRoleMatched) {
  385.         return Parse.Promise.as(true);
  386.       }
  387.  
  388.       // Check client sessionToken matches ACL
  389.       const clientSessionToken = client.sessionToken;
  390.       return this.sessionTokenCache.getUserId(clientSessionToken).then((userId) => {
  391.         return acl.getReadAccess(userId);
  392.       });
  393.     }).then((isMatched) => {
  394.       return Parse.Promise.as(isMatched);
  395.     }, () => {
  396.       return Parse.Promise.as(false);
  397.     });
  398.   }
  399.  
  400.   _handleConnect(parseWebsocket: any, request: any): any {
  401.     if (!this._validateKeys(request, this.keyPairs)) {
  402.       Client.pushError(parseWebsocket, 4, 'Key in request is not valid');
  403.       logger.error('Key in request is not valid');
  404.       return;
  405.     }
  406.     const client = new Client(this.clientId, parseWebsocket);
  407.     parseWebsocket.clientId = this.clientId;
  408.     this.clientId += 1;
  409.     this.clients.set(parseWebsocket.clientId, client);
  410.     logger.info('Create new client: %d', parseWebsocket.clientId);
  411.     client.pushConnect();
  412.   }
  413.  
  414.   _validateKeys(request: any, validKeyPairs: any): boolean {
  415.     if (!validKeyPairs || validKeyPairs.size == 0) {
  416.       return true;
  417.     }
  418.     let isValid = false;
  419.     for (const [key, secret] of validKeyPairs) {
  420.       if (!request[key] || request[key] !== secret) {
  421.         continue;
  422.       }
  423.       isValid = true;
  424.       break;
  425.     }
  426.     return isValid;
  427.   }
  428.  
  429.   _handleSubscribe(parseWebsocket: any, request: any): any {
  430.     // If we can not find this client, return error to client
  431.     if (!parseWebsocket.hasOwnProperty('clientId')) {
  432.       Client.pushError(parseWebsocket, 2, 'Can not find this client, make sure you connect to server before subscribing');
  433.       logger.error('Can not find this client, make sure you connect to server before subscribing');
  434.       return;
  435.     }
  436.     const client = this.clients.get(parseWebsocket.clientId);
  437.  
  438.     // Get subscription from subscriptions, create one if necessary
  439.     const subscriptionHash = queryHash(request.query);
  440.     // Add className to subscriptions if necessary
  441.     const className = request.query.className;
  442.     if (!this.subscriptions.has(className)) {
  443.       this.subscriptions.set(className, new Map());
  444.     }
  445.     const classSubscriptions = this.subscriptions.get(className);
  446.     let subscription;
  447.     if (classSubscriptions.has(subscriptionHash)) {
  448.       subscription = classSubscriptions.get(subscriptionHash);
  449.     } else {
  450.       subscription = new Subscription(className, request.query.where, subscriptionHash);
  451.       classSubscriptions.set(subscriptionHash, subscription);
  452.     }
  453.  
  454.     // Add subscriptionInfo to client
  455.     const subscriptionInfo = {
  456.       subscription: subscription
  457.     };
  458.     // Add selected fields and sessionToken for this subscription if necessary
  459.     if (request.query.fields) {
  460.       subscriptionInfo.fields = request.query.fields;
  461.     }
  462.     if (request.sessionToken) {
  463.       subscriptionInfo.sessionToken = request.sessionToken;
  464.     }
  465.     client.addSubscriptionInfo(request.requestId, subscriptionInfo);
  466.  
  467.     // Add clientId to subscription
  468.     subscription.addClientSubscription(parseWebsocket.clientId, request.requestId);
  469.  
  470.     client.pushSubscribe(request.requestId);
  471.  
  472.     logger.verbose('Create client %d new subscription: %d', parseWebsocket.clientId, request.requestId);
  473.     logger.verbose('Current client number: %d', this.clients.size);
  474.   }
  475.  
  476.   _handleUpdateSubscription(parseWebsocket: any, request: any): any {
  477.     this._handleUnsubscribe(parseWebsocket, request, false);
  478.     this._handleSubscribe(parseWebsocket, request);
  479.   }
  480.  
  481.   _handleUnsubscribe(parseWebsocket: any, request: any, notifyClient: bool = true): any {
  482.     // If we can not find this client, return error to client
  483.     if (!parseWebsocket.hasOwnProperty('clientId')) {
  484.       Client.pushError(parseWebsocket, 2, 'Can not find this client, make sure you connect to server before unsubscribing');
  485.       logger.error('Can not find this client, make sure you connect to server before unsubscribing');
  486.       return;
  487.     }
  488.     const requestId = request.requestId;
  489.     const client = this.clients.get(parseWebsocket.clientId);
  490.     if (typeof client === 'undefined') {
  491.       Client.pushError(parseWebsocket, 2, 'Cannot find client with clientId '  + parseWebsocket.clientId +
  492.         '. Make sure you connect to live query server before unsubscribing.');
  493.       logger.error('Can not find this client ' + parseWebsocket.clientId);
  494.       return;
  495.     }
  496.  
  497.     const subscriptionInfo = client.getSubscriptionInfo(requestId);
  498.     if (typeof subscriptionInfo === 'undefined') {
  499.       Client.pushError(parseWebsocket, 2, 'Cannot find subscription with clientId '  + parseWebsocket.clientId +
  500.         ' subscriptionId ' + requestId + '. Make sure you subscribe to live query server before unsubscribing.');
  501.       logger.error('Can not find subscription with clientId ' + parseWebsocket.clientId +  ' subscriptionId ' + requestId);
  502.       return;
  503.     }
  504.  
  505.     // Remove subscription from client
  506.     client.deleteSubscriptionInfo(requestId);
  507.     // Remove client from subscription
  508.     const subscription = subscriptionInfo.subscription;
  509.     const className = subscription.className;
  510.     subscription.deleteClientSubscription(parseWebsocket.clientId, requestId);
  511.     // If there is no client which is subscribing this subscription, remove it from subscriptions
  512.     const classSubscriptions = this.subscriptions.get(className);
  513.     if (!subscription.hasSubscribingClient()) {
  514.       classSubscriptions.delete(subscription.hash);
  515.     }
  516.     // If there is no subscriptions under this class, remove it from subscriptions
  517.     if (classSubscriptions.size === 0) {
  518.       this.subscriptions.delete(className);
  519.     }
  520.  
  521.     if (!notifyClient) {
  522.       return;
  523.     }
  524.  
  525.     client.pushUnsubscribe(request.requestId);
  526.  
  527.     logger.verbose('Delete client: %d | subscription: %d', parseWebsocket.clientId, request.requestId);
  528.   }
  529. }
  530.  
  531. export {
  532.   ParseLiveQueryServer
  533. }
  534.  
Download ParseLiveQueryServer.js Source code - Download parse-server Source code
Related Source Codes/Software:
react-boilerplate - 2017-06-07
webtorrent - Streaming torrent client for the web ... 2017-06-06
machine-learning-for-software-engineers - A complete daily plan for studying to become a mac... 2017-06-06
upterm - A terminal emulator for the 21st century. 2017-06-06
lottie-android - Render After Effects animations natively on Androi... 2017-06-07
AsyncDisplayKit - Smooth asynchronous user interfaces for iOS apps. ... 2017-06-07
ionicons - The premium icon font for Ionic ... 2017-06-07
storybook - 2017-06-07
prettier - Prettier is an opinionated JavaScript formatter. ... 2017-06-08
CRYENGINE - CRYENGINE is a powerful real-time game development... 2017-06-11
postal - 2017-06-11
reactide - Reactide is the first dedicated IDE for React web ... 2017-06-11
rkt - rkt is a pod-native container engine for Linux. It... 2017-06-11
uWebSockets - Tiny WebSockets https://for... 2017-06-11
realworld - TodoMVC for the RealWorld - Exemplary fullstack Me... 2017-06-11
goreplay - GoReplay is an open-source tool for capturing and ... 2017-06-10
pyenv - Simple Python version management 2017-06-10
redux-saga - An alternative side effect model for Redux apps ... 2017-06-10
angular-starter - 2017-06-10

 Back to top