Source: lib/change_stream.js

  1. 'use strict';
  2. var EventEmitter = require('events'),
  3. inherits = require('util').inherits,
  4. MongoNetworkError = require('mongodb-core').MongoNetworkError;
  5. var cursorOptionNames = ['maxAwaitTimeMS', 'collation', 'readPreference'];
  6. /**
  7. * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
  8. * @class ChangeStream
  9. * @since 3.0.0
  10. * @param {(Db|Collection)} changeDomain The collection against which to create the change stream
  11. * @param {Array} pipeline An array of {@link https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
  12. * @param {object} [options=null] Optional settings
  13. * @param {string} [options.fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
  14. * @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query
  15. * @param {object} [options.resumeAfter=null] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
  16. * @param {number} [options.batchSize=null] The number of documents to return per batch. See {@link https://www.mongodb.com/docs/manual/reference/command/aggregate|aggregation documentation}.
  17. * @param {object} [options.collation=null] Specify collation settings for operation. See {@link https://www.mongodb.com/docs/manual/reference/command/aggregate|aggregation documentation}.
  18. * @param {ReadPreference} [options.readPreference=null] The read preference. Defaults to the read preference of the database or collection. See {@link https://www.mongodb.com/docs/manual/reference/read-preference|read preference documentation}.
  19. * @fires ChangeStream#close
  20. * @fires ChangeStream#change
  21. * @fires ChangeStream#end
  22. * @fires ChangeStream#error
  23. * @return {ChangeStream} a ChangeStream instance.
  24. */
  25. var ChangeStream = function(collection, pipeline, options) {
  26. var Collection = require('./collection');
  27. // Ensure the provided collection is actually a collection
  28. if (!(collection instanceof Collection)) {
  29. throw new Error(
  30. 'collection provided to ChangeStream constructor is not an instance of Collection'
  31. );
  32. }
  33. var self = this;
  34. self.pipeline = pipeline || [];
  35. self.options = options || {};
  36. self.promiseLibrary = collection.s.promiseLibrary;
  37. // Extract namespace and serverConfig from the collection
  38. self.namespace = {
  39. collection: collection.collectionName,
  40. database: collection.s.db.databaseName
  41. };
  42. self.serverConfig = collection.s.db.serverConfig;
  43. // Determine correct read preference
  44. self.options.readPreference = self.options.readPreference || collection.s.readPreference;
  45. // Create contained Change Stream cursor
  46. self.cursor = createChangeStreamCursor(self);
  47. // Listen for any `change` listeners being added to ChangeStream
  48. self.on('newListener', function(eventName) {
  49. if (eventName === 'change' && self.cursor && self.cursor.listenerCount('change') === 0) {
  50. self.cursor.on('data', function(change) {
  51. processNewChange(self, null, change);
  52. });
  53. }
  54. });
  55. // Listen for all `change` listeners being removed from ChangeStream
  56. self.on('removeListener', function(eventName) {
  57. if (eventName === 'change' && self.listenerCount('change') === 0 && self.cursor) {
  58. self.cursor.removeAllListeners('data');
  59. }
  60. });
  61. };
  62. inherits(ChangeStream, EventEmitter);
  63. // Create a new change stream cursor based on self's configuration
  64. var createChangeStreamCursor = function(self) {
  65. if (self.resumeToken) {
  66. self.options.resumeAfter = self.resumeToken;
  67. }
  68. var changeStreamCursor = buildChangeStreamAggregationCommand(
  69. self.serverConfig,
  70. self.namespace,
  71. self.pipeline,
  72. self.resumeToken,
  73. self.options
  74. );
  75. /**
  76. * Fired for each new matching change in the specified namespace. Attaching a `change` event listener to a Change Stream will switch the stream into flowing mode. Data will then be passed as soon as it is available.
  77. *
  78. * @event ChangeStream#change
  79. * @type {object}
  80. */
  81. if (self.listenerCount('change') > 0) {
  82. changeStreamCursor.on('data', function(change) {
  83. processNewChange(self, null, change);
  84. });
  85. }
  86. /**
  87. * Change stream close event
  88. *
  89. * @event ChangeStream#close
  90. * @type {null}
  91. */
  92. changeStreamCursor.on('close', function() {
  93. self.emit('close');
  94. });
  95. /**
  96. * Change stream end event
  97. *
  98. * @event ChangeStream#end
  99. * @type {null}
  100. */
  101. changeStreamCursor.on('end', function() {
  102. self.emit('end');
  103. });
  104. /**
  105. * Fired when the stream encounters an error.
  106. *
  107. * @event ChangeStream#error
  108. * @type {Error}
  109. */
  110. changeStreamCursor.on('error', function(error) {
  111. self.emit('error', error);
  112. });
  113. if (self.pipeDestinations) {
  114. const cursorStream = changeStreamCursor.stream(self.streamOptions);
  115. for (let pipeDestination in self.pipeDestinations) {
  116. cursorStream.pipe(pipeDestination);
  117. }
  118. }
  119. return changeStreamCursor;
  120. };
  121. var buildChangeStreamAggregationCommand = function(
  122. serverConfig,
  123. namespace,
  124. pipeline,
  125. resumeToken,
  126. options
  127. ) {
  128. var changeStreamStageOptions = {};
  129. if (options.fullDocument) {
  130. changeStreamStageOptions.fullDocument = options.fullDocument;
  131. }
  132. if (resumeToken || options.resumeAfter) {
  133. changeStreamStageOptions.resumeAfter = resumeToken || options.resumeAfter;
  134. }
  135. // Map cursor options
  136. var cursorOptions = {};
  137. cursorOptionNames.forEach(function(optionName) {
  138. if (options[optionName]) {
  139. cursorOptions[optionName] = options[optionName];
  140. }
  141. });
  142. var changeStreamPipeline = [{ $changeStream: changeStreamStageOptions }];
  143. changeStreamPipeline = changeStreamPipeline.concat(pipeline);
  144. var command = {
  145. aggregate: namespace.collection,
  146. pipeline: changeStreamPipeline,
  147. readConcern: { level: 'majority' },
  148. cursor: {
  149. batchSize: options.batchSize || 1
  150. }
  151. };
  152. // Create and return the cursor
  153. return serverConfig.cursor(
  154. namespace.database + '.' + namespace.collection,
  155. command,
  156. cursorOptions
  157. );
  158. };
  159. /**
  160. * Check if there is any document still available in the Change Stream
  161. * @function ChangeStream.prototype.hasNext
  162. * @param {ChangeStream~resultCallback} [callback] The result callback.
  163. * @throws {MongoError}
  164. * @return {Promise} returns Promise if no callback passed
  165. */
  166. ChangeStream.prototype.hasNext = function(callback) {
  167. return this.cursor.hasNext(callback);
  168. };
  169. /**
  170. * Get the next available document from the Change Stream, returns null if no more documents are available.
  171. * @function ChangeStream.prototype.next
  172. * @param {ChangeStream~resultCallback} [callback] The result callback.
  173. * @throws {MongoError}
  174. * @return {Promise} returns Promise if no callback passed
  175. */
  176. ChangeStream.prototype.next = function(callback) {
  177. var self = this;
  178. if (this.isClosed()) {
  179. if (callback) return callback(new Error('Change Stream is not open.'), null);
  180. return self.promiseLibrary.reject(new Error('Change Stream is not open.'));
  181. }
  182. return this.cursor
  183. .next()
  184. .then(function(change) {
  185. return processNewChange(self, null, change, callback);
  186. })
  187. .catch(function(err) {
  188. return processNewChange(self, err, null, callback);
  189. });
  190. };
  191. /**
  192. * Is the cursor closed
  193. * @method ChangeStream.prototype.isClosed
  194. * @return {boolean}
  195. */
  196. ChangeStream.prototype.isClosed = function() {
  197. if (this.cursor) {
  198. return this.cursor.isClosed();
  199. }
  200. return true;
  201. };
  202. /**
  203. * Close the Change Stream
  204. * @method ChangeStream.prototype.close
  205. * @param {ChangeStream~resultCallback} [callback] The result callback.
  206. * @return {Promise} returns Promise if no callback passed
  207. */
  208. ChangeStream.prototype.close = function(callback) {
  209. if (!this.cursor) {
  210. if (callback) return callback();
  211. return this.promiseLibrary.resolve();
  212. }
  213. // Tidy up the existing cursor
  214. var cursor = this.cursor;
  215. delete this.cursor;
  216. return cursor.close(callback);
  217. };
  218. /**
  219. * This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.
  220. * @method
  221. * @param {Writable} destination The destination for writing data
  222. * @param {object} [options] {@link https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options|Pipe options}
  223. * @return {null}
  224. */
  225. ChangeStream.prototype.pipe = function(destination, options) {
  226. if (!this.pipeDestinations) {
  227. this.pipeDestinations = [];
  228. }
  229. this.pipeDestinations.push(destination);
  230. return this.cursor.pipe(destination, options);
  231. };
  232. /**
  233. * This method will remove the hooks set up for a previous pipe() call.
  234. * @param {Writable} [destination] The destination for writing data
  235. * @return {null}
  236. */
  237. ChangeStream.prototype.unpipe = function(destination) {
  238. if (this.pipeDestinations && this.pipeDestinations.indexOf(destination) > -1) {
  239. this.pipeDestinations.splice(this.pipeDestinations.indexOf(destination), 1);
  240. }
  241. return this.cursor.unpipe(destination);
  242. };
  243. /**
  244. * This method will cause a stream in flowing mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
  245. * @return {null}
  246. */
  247. ChangeStream.prototype.pause = function() {
  248. return this.cursor.pause();
  249. };
  250. /**
  251. * This method will cause the readable stream to resume emitting data events.
  252. * @return {null}
  253. */
  254. ChangeStream.prototype.resume = function() {
  255. return this.cursor.resume();
  256. };
  257. /**
  258. * Return a modified Readable stream including a possible transform method.
  259. * @method
  260. * @param {object} [options=null] Optional settings.
  261. * @param {function} [options.transform=null] A transformation method applied to each document emitted by the stream.
  262. * @return {Cursor}
  263. */
  264. ChangeStream.prototype.stream = function(options) {
  265. this.streamOptions = options;
  266. return this.cursor.stream(options);
  267. };
  268. // Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
  269. var processNewChange = function(self, err, change, callback) {
  270. // Handle errors
  271. if (err) {
  272. // Handle resumable MongoNetworkErrors
  273. if (err instanceof MongoNetworkError && !self.attemptingResume) {
  274. self.attemptingResume = true;
  275. if (callback) {
  276. return self.cursor.close(function(closeErr) {
  277. if (closeErr) {
  278. return callback(err, null);
  279. }
  280. self.cursor = createChangeStreamCursor(self);
  281. return self.next(callback);
  282. });
  283. }
  284. return self.cursor
  285. .close()
  286. .then(() => (self.cursor = createChangeStreamCursor(self)))
  287. .then(() => self.next());
  288. }
  289. if (typeof callback === 'function') return callback(err, null);
  290. if (self.listenerCount('error')) return self.emit('error', err);
  291. return self.promiseLibrary.reject(err);
  292. }
  293. self.attemptingResume = false;
  294. // Cache the resume token if it is present. If it is not present return an error.
  295. if (!change || !change._id) {
  296. var noResumeTokenError = new Error(
  297. 'A change stream document has been received that lacks a resume token (_id).'
  298. );
  299. if (typeof callback === 'function') return callback(noResumeTokenError, null);
  300. if (self.listenerCount('error')) return self.emit('error', noResumeTokenError);
  301. return self.promiseLibrary.reject(noResumeTokenError);
  302. }
  303. self.resumeToken = change._id;
  304. // Return the change
  305. if (typeof callback === 'function') return callback(err, change);
  306. if (self.listenerCount('change')) return self.emit('change', change);
  307. return self.promiseLibrary.resolve(change);
  308. };
  309. /**
  310. * The callback format for results
  311. * @callback ChangeStream~resultCallback
  312. * @param {MongoError} error An error instance representing the error during the execution.
  313. * @param {(object|null)} result The result object if the command was executed successfully.
  314. */
  315. module.exports = ChangeStream;