Source: lib/bulk/ordered.js

  1. 'use strict';
  2. const common = require('./common');
  3. const utils = require('../utils');
  4. const toError = require('../utils').toError;
  5. const handleCallback = require('../utils').handleCallback;
  6. const shallowClone = utils.shallowClone;
  7. const BulkWriteResult = common.BulkWriteResult;
  8. const ObjectID = require('mongodb-core').BSON.ObjectID;
  9. const BSON = require('mongodb-core').BSON;
  10. const Batch = common.Batch;
  11. const mergeBatchResults = common.mergeBatchResults;
  12. const executeOperation = utils.executeOperation;
  13. const BulkWriteError = require('./common').BulkWriteError;
  14. const applyWriteConcern = utils.applyWriteConcern;
  15. var bson = new BSON([
  16. BSON.Binary,
  17. BSON.Code,
  18. BSON.DBRef,
  19. BSON.Decimal128,
  20. BSON.Double,
  21. BSON.Int32,
  22. BSON.Long,
  23. BSON.Map,
  24. BSON.MaxKey,
  25. BSON.MinKey,
  26. BSON.ObjectId,
  27. BSON.BSONRegExp,
  28. BSON.Symbol,
  29. BSON.Timestamp
  30. ]);
  31. /**
  32. * Create a FindOperatorsOrdered instance (INTERNAL TYPE, do not instantiate directly)
  33. * @class
  34. * @return {FindOperatorsOrdered} a FindOperatorsOrdered instance.
  35. */
  36. var FindOperatorsOrdered = function(self) {
  37. this.s = self.s;
  38. };
  39. /**
  40. * Add a single update document to the bulk operation
  41. *
  42. * @method
  43. * @param {object} doc update operations
  44. * @throws {MongoError}
  45. * @return {OrderedBulkOperation}
  46. */
  47. FindOperatorsOrdered.prototype.update = function(updateDocument) {
  48. // Perform upsert
  49. var upsert = typeof this.s.currentOp.upsert === 'boolean' ? this.s.currentOp.upsert : false;
  50. // Establish the update command
  51. var document = {
  52. q: this.s.currentOp.selector,
  53. u: updateDocument,
  54. multi: true,
  55. upsert: upsert
  56. };
  57. // Clear out current Op
  58. this.s.currentOp = null;
  59. // Add the update document to the list
  60. return addToOperationsList(this, common.UPDATE, document);
  61. };
  62. /**
  63. * Add a single update one document to the bulk operation
  64. *
  65. * @method
  66. * @param {object} doc update operations
  67. * @throws {MongoError}
  68. * @return {OrderedBulkOperation}
  69. */
  70. FindOperatorsOrdered.prototype.updateOne = function(updateDocument) {
  71. // Perform upsert
  72. var upsert = typeof this.s.currentOp.upsert === 'boolean' ? this.s.currentOp.upsert : false;
  73. // Establish the update command
  74. var document = {
  75. q: this.s.currentOp.selector,
  76. u: updateDocument,
  77. multi: false,
  78. upsert: upsert
  79. };
  80. // Clear out current Op
  81. this.s.currentOp = null;
  82. // Add the update document to the list
  83. return addToOperationsList(this, common.UPDATE, document);
  84. };
  85. /**
  86. * Add a replace one operation to the bulk operation
  87. *
  88. * @method
  89. * @param {object} doc the new document to replace the existing one with
  90. * @throws {MongoError}
  91. * @return {OrderedBulkOperation}
  92. */
  93. FindOperatorsOrdered.prototype.replaceOne = function(updateDocument) {
  94. this.updateOne(updateDocument);
  95. };
  96. /**
  97. * Upsert modifier for update bulk operation
  98. *
  99. * @method
  100. * @throws {MongoError}
  101. * @return {FindOperatorsOrdered}
  102. */
  103. FindOperatorsOrdered.prototype.upsert = function() {
  104. this.s.currentOp.upsert = true;
  105. return this;
  106. };
  107. /**
  108. * Add a remove one operation to the bulk operation
  109. *
  110. * @method
  111. * @throws {MongoError}
  112. * @return {OrderedBulkOperation}
  113. */
  114. FindOperatorsOrdered.prototype.deleteOne = function() {
  115. // Establish the update command
  116. var document = {
  117. q: this.s.currentOp.selector,
  118. limit: 1
  119. };
  120. // Clear out current Op
  121. this.s.currentOp = null;
  122. // Add the remove document to the list
  123. return addToOperationsList(this, common.REMOVE, document);
  124. };
  125. // Backward compatibility
  126. FindOperatorsOrdered.prototype.removeOne = FindOperatorsOrdered.prototype.deleteOne;
  127. /**
  128. * Add a remove operation to the bulk operation
  129. *
  130. * @method
  131. * @throws {MongoError}
  132. * @return {OrderedBulkOperation}
  133. */
  134. FindOperatorsOrdered.prototype.delete = function() {
  135. // Establish the update command
  136. var document = {
  137. q: this.s.currentOp.selector,
  138. limit: 0
  139. };
  140. // Clear out current Op
  141. this.s.currentOp = null;
  142. // Add the remove document to the list
  143. return addToOperationsList(this, common.REMOVE, document);
  144. };
  145. // Backward compatibility
  146. FindOperatorsOrdered.prototype.remove = FindOperatorsOrdered.prototype.delete;
  147. // Add to internal list of documents
  148. var addToOperationsList = function(_self, docType, document) {
  149. // Get the bsonSize
  150. var bsonSize = bson.calculateObjectSize(document, {
  151. checkKeys: false
  152. });
  153. // Throw error if the doc is bigger than the max BSON size
  154. if (bsonSize >= _self.s.maxBatchSizeBytes) {
  155. throw toError('document is larger than the maximum size ' + _self.s.maxBatchSizeBytes);
  156. }
  157. // Create a new batch object if we don't have a current one
  158. if (_self.s.currentBatch == null) _self.s.currentBatch = new Batch(docType, _self.s.currentIndex);
  159. // Check if we need to create a new batch
  160. if (
  161. _self.s.currentBatchSize + 1 >= _self.s.maxWriteBatchSize ||
  162. _self.s.currentBatchSizeBytes + _self.s.currentBatchSizeBytes >= _self.s.maxBatchSizeBytes ||
  163. _self.s.currentBatch.batchType !== docType
  164. ) {
  165. // Save the batch to the execution stack
  166. _self.s.batches.push(_self.s.currentBatch);
  167. // Create a new batch
  168. _self.s.currentBatch = new Batch(docType, _self.s.currentIndex);
  169. // Reset the current size trackers
  170. _self.s.currentBatchSize = 0;
  171. _self.s.currentBatchSizeBytes = 0;
  172. } else {
  173. // Update current batch size
  174. _self.s.currentBatchSize = _self.s.currentBatchSize + 1;
  175. _self.s.currentBatchSizeBytes = _self.s.currentBatchSizeBytes + bsonSize;
  176. }
  177. if (docType === common.INSERT) {
  178. _self.s.bulkResult.insertedIds.push({ index: _self.s.currentIndex, _id: document._id });
  179. }
  180. // We have an array of documents
  181. if (Array.isArray(document)) {
  182. throw toError('operation passed in cannot be an Array');
  183. } else {
  184. _self.s.currentBatch.originalIndexes.push(_self.s.currentIndex);
  185. _self.s.currentBatch.operations.push(document);
  186. _self.s.currentBatchSizeBytes = _self.s.currentBatchSizeBytes + bsonSize;
  187. _self.s.currentIndex = _self.s.currentIndex + 1;
  188. }
  189. // Return self
  190. return _self;
  191. };
  192. /**
  193. * Create a new OrderedBulkOperation instance (INTERNAL TYPE, do not instantiate directly)
  194. * @class
  195. * @property {number} length Get the number of operations in the bulk.
  196. * @return {OrderedBulkOperation} a OrderedBulkOperation instance.
  197. */
  198. function OrderedBulkOperation(topology, collection, options) {
  199. options = options == null ? {} : options;
  200. // TODO Bring from driver information in isMaster
  201. var executed = false;
  202. // Current item
  203. var currentOp = null;
  204. // Handle to the bson serializer, used to calculate running sizes
  205. var bson = topology.bson;
  206. // Namespace for the operation
  207. var namespace = collection.collectionName;
  208. // Set max byte size
  209. var maxBatchSizeBytes =
  210. topology.isMasterDoc && topology.isMasterDoc.maxBsonObjectSize
  211. ? topology.isMasterDoc.maxBsonObjectSize
  212. : 1024 * 1025 * 16;
  213. var maxWriteBatchSize =
  214. topology.isMasterDoc && topology.isMasterDoc.maxWriteBatchSize
  215. ? topology.isMasterDoc.maxWriteBatchSize
  216. : 1000;
  217. // Get the write concern
  218. var writeConcern = applyWriteConcern(shallowClone(options), { collection: collection }, options);
  219. writeConcern = writeConcern.writeConcern;
  220. // Get the promiseLibrary
  221. var promiseLibrary = options.promiseLibrary || Promise;
  222. // Final results
  223. var bulkResult = {
  224. ok: 1,
  225. writeErrors: [],
  226. writeConcernErrors: [],
  227. insertedIds: [],
  228. nInserted: 0,
  229. nUpserted: 0,
  230. nMatched: 0,
  231. nModified: 0,
  232. nRemoved: 0,
  233. upserted: []
  234. };
  235. // Internal state
  236. this.s = {
  237. // Final result
  238. bulkResult: bulkResult,
  239. // Current batch state
  240. currentBatch: null,
  241. currentIndex: 0,
  242. currentBatchSize: 0,
  243. currentBatchSizeBytes: 0,
  244. batches: [],
  245. // Write concern
  246. writeConcern: writeConcern,
  247. // Max batch size options
  248. maxBatchSizeBytes: maxBatchSizeBytes,
  249. maxWriteBatchSize: maxWriteBatchSize,
  250. // Namespace
  251. namespace: namespace,
  252. // BSON
  253. bson: bson,
  254. // Topology
  255. topology: topology,
  256. // Options
  257. options: options,
  258. // Current operation
  259. currentOp: currentOp,
  260. // Executed
  261. executed: executed,
  262. // Collection
  263. collection: collection,
  264. // Promise Library
  265. promiseLibrary: promiseLibrary,
  266. // Fundamental error
  267. err: null,
  268. // Bypass validation
  269. bypassDocumentValidation:
  270. typeof options.bypassDocumentValidation === 'boolean'
  271. ? options.bypassDocumentValidation
  272. : false,
  273. // check keys
  274. checkKeys: typeof options.checkKeys === 'boolean' ? options.checkKeys : true
  275. };
  276. }
  277. OrderedBulkOperation.prototype.raw = function(op) {
  278. var key = Object.keys(op)[0];
  279. // Set up the force server object id
  280. var forceServerObjectId =
  281. typeof this.s.options.forceServerObjectId === 'boolean'
  282. ? this.s.options.forceServerObjectId
  283. : this.s.collection.s.db.options.forceServerObjectId;
  284. // Update operations
  285. if (
  286. (op.updateOne && op.updateOne.q) ||
  287. (op.updateMany && op.updateMany.q) ||
  288. (op.replaceOne && op.replaceOne.q)
  289. ) {
  290. op[key].multi = op.updateOne || op.replaceOne ? false : true;
  291. return addToOperationsList(this, common.UPDATE, op[key]);
  292. }
  293. // Crud spec update format
  294. if (op.updateOne || op.updateMany || op.replaceOne) {
  295. var multi = op.updateOne || op.replaceOne ? false : true;
  296. var operation = { q: op[key].filter, u: op[key].update || op[key].replacement, multi: multi };
  297. operation.upsert = op[key].upsert ? true : false;
  298. if (op.collation) operation.collation = op.collation;
  299. if (op[key].arrayFilters) operation.arrayFilters = op[key].arrayFilters;
  300. return addToOperationsList(this, common.UPDATE, operation);
  301. }
  302. // Remove operations
  303. if (
  304. op.removeOne ||
  305. op.removeMany ||
  306. (op.deleteOne && op.deleteOne.q) ||
  307. (op.deleteMany && op.deleteMany.q)
  308. ) {
  309. op[key].limit = op.removeOne ? 1 : 0;
  310. return addToOperationsList(this, common.REMOVE, op[key]);
  311. }
  312. // Crud spec delete operations, less efficient
  313. if (op.deleteOne || op.deleteMany) {
  314. var limit = op.deleteOne ? 1 : 0;
  315. operation = { q: op[key].filter, limit: limit };
  316. if (op.collation) operation.collation = op.collation;
  317. return addToOperationsList(this, common.REMOVE, operation);
  318. }
  319. // Insert operations
  320. if (op.insertOne && op.insertOne.document == null) {
  321. if (forceServerObjectId !== true && op.insertOne._id == null) op.insertOne._id = new ObjectID();
  322. return addToOperationsList(this, common.INSERT, op.insertOne);
  323. } else if (op.insertOne && op.insertOne.document) {
  324. if (forceServerObjectId !== true && op.insertOne.document._id == null)
  325. op.insertOne.document._id = new ObjectID();
  326. return addToOperationsList(this, common.INSERT, op.insertOne.document);
  327. }
  328. if (op.insertMany) {
  329. for (var i = 0; i < op.insertMany.length; i++) {
  330. if (forceServerObjectId !== true && op.insertMany[i]._id == null)
  331. op.insertMany[i]._id = new ObjectID();
  332. addToOperationsList(this, common.INSERT, op.insertMany[i]);
  333. }
  334. return;
  335. }
  336. // No valid type of operation
  337. throw toError(
  338. 'bulkWrite only supports insertOne, insertMany, updateOne, updateMany, removeOne, removeMany, deleteOne, deleteMany'
  339. );
  340. };
  341. /**
  342. * Add a single insert document to the bulk operation
  343. *
  344. * @param {object} doc the document to insert
  345. * @throws {MongoError}
  346. * @return {OrderedBulkOperation}
  347. */
  348. OrderedBulkOperation.prototype.insert = function(document) {
  349. if (this.s.collection.s.db.options.forceServerObjectId !== true && document._id == null)
  350. document._id = new ObjectID();
  351. return addToOperationsList(this, common.INSERT, document);
  352. };
  353. /**
  354. * Initiate a find operation for an update/updateOne/remove/removeOne/replaceOne
  355. *
  356. * @method
  357. * @param {object} selector The selector for the bulk operation.
  358. * @throws {MongoError}
  359. * @return {FindOperatorsOrdered}
  360. */
  361. OrderedBulkOperation.prototype.find = function(selector) {
  362. if (!selector) {
  363. throw toError('Bulk find operation must specify a selector');
  364. }
  365. // Save a current selector
  366. this.s.currentOp = {
  367. selector: selector
  368. };
  369. return new FindOperatorsOrdered(this);
  370. };
  371. Object.defineProperty(OrderedBulkOperation.prototype, 'length', {
  372. enumerable: true,
  373. get: function() {
  374. return this.s.currentIndex;
  375. }
  376. });
  377. //
  378. // Execute next write command in a chain
  379. var executeCommands = function(self, options, callback) {
  380. if (self.s.batches.length === 0) {
  381. return handleCallback(callback, null, new BulkWriteResult(self.s.bulkResult));
  382. }
  383. // Ordered execution of the command
  384. var batch = self.s.batches.shift();
  385. var resultHandler = function(err, result) {
  386. // Error is a driver related error not a bulk op error, terminate
  387. if ((err && err.driver) || (err && err.message)) {
  388. return handleCallback(callback, err);
  389. }
  390. // If we have and error
  391. if (err) err.ok = 0;
  392. // Merge the results together
  393. var mergeResult = mergeBatchResults(true, batch, self.s.bulkResult, err, result);
  394. const writeResult = new BulkWriteResult(self.s.bulkResult);
  395. if (mergeResult != null) {
  396. return handleCallback(callback, null, writeResult);
  397. }
  398. // If we are ordered and have errors and they are
  399. // not all replication errors terminate the operation
  400. if (self.s.bulkResult.writeErrors.length > 0) {
  401. if (self.s.bulkResult.writeErrors.length === 1) {
  402. return handleCallback(
  403. callback,
  404. new BulkWriteError(toError(self.s.bulkResult.writeErrors[0]), writeResult),
  405. null
  406. );
  407. }
  408. return handleCallback(
  409. callback,
  410. new BulkWriteError(
  411. toError({
  412. message: 'write operation failed',
  413. code: self.s.bulkResult.writeErrors[0].code,
  414. writeErrors: self.s.bulkResult.writeErrors
  415. }),
  416. writeResult
  417. ),
  418. null
  419. );
  420. } else if (writeResult.getWriteConcernError()) {
  421. return handleCallback(
  422. callback,
  423. new BulkWriteError(toError(writeResult.getWriteConcernError()), writeResult),
  424. null
  425. );
  426. }
  427. // Execute the next command in line
  428. executeCommands(self, options, callback);
  429. };
  430. var finalOptions = Object.assign({ ordered: true }, options);
  431. if (self.s.writeConcern != null) {
  432. finalOptions.writeConcern = self.s.writeConcern;
  433. }
  434. // Set an operationIf if provided
  435. if (self.operationId) {
  436. resultHandler.operationId = self.operationId;
  437. }
  438. // Serialize functions
  439. if (self.s.options.serializeFunctions) {
  440. finalOptions.serializeFunctions = true;
  441. }
  442. // Ignore undefined
  443. if (self.s.options.ignoreUndefined) {
  444. finalOptions.ignoreUndefined = true;
  445. }
  446. // Is the bypassDocumentValidation options specific
  447. if (self.s.bypassDocumentValidation === true) {
  448. finalOptions.bypassDocumentValidation = true;
  449. }
  450. // Is the checkKeys option disabled
  451. if (self.s.checkKeys === false) {
  452. finalOptions.checkKeys = false;
  453. }
  454. if (finalOptions.retryWrites) {
  455. if (batch.batchType === common.UPDATE) {
  456. finalOptions.retryWrites = finalOptions.retryWrites && !batch.operations.some(op => op.multi);
  457. }
  458. if (batch.batchType === common.REMOVE) {
  459. finalOptions.retryWrites =
  460. finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0);
  461. }
  462. }
  463. try {
  464. if (batch.batchType === common.INSERT) {
  465. self.s.topology.insert(
  466. self.s.collection.namespace,
  467. batch.operations,
  468. finalOptions,
  469. resultHandler
  470. );
  471. } else if (batch.batchType === common.UPDATE) {
  472. self.s.topology.update(
  473. self.s.collection.namespace,
  474. batch.operations,
  475. finalOptions,
  476. resultHandler
  477. );
  478. } else if (batch.batchType === common.REMOVE) {
  479. self.s.topology.remove(
  480. self.s.collection.namespace,
  481. batch.operations,
  482. finalOptions,
  483. resultHandler
  484. );
  485. }
  486. } catch (err) {
  487. // Force top level error
  488. err.ok = 0;
  489. // Merge top level error and return
  490. handleCallback(callback, null, mergeBatchResults(false, batch, self.s.bulkResult, err, null));
  491. }
  492. };
  493. /**
  494. * The callback format for results
  495. * @callback OrderedBulkOperation~resultCallback
  496. * @param {MongoError} error An error instance representing the error during the execution.
  497. * @param {BulkWriteResult} result The bulk write result.
  498. */
  499. /**
  500. * Execute the ordered bulk operation
  501. *
  502. * @method
  503. * @param {object} [options=null] Optional settings.
  504. * @param {(number|string)} [options.w=null] The write concern.
  505. * @param {number} [options.wtimeout=null] The write concern timeout.
  506. * @param {boolean} [options.j=false] Specify a journal write concern.
  507. * @param {boolean} [options.fsync=false] Specify a file sync write concern.
  508. * @param {OrderedBulkOperation~resultCallback} [callback] The result callback
  509. * @throws {MongoError}
  510. * @return {Promise} returns Promise if no callback passed
  511. */
  512. OrderedBulkOperation.prototype.execute = function(_writeConcern, options, callback) {
  513. if (typeof options === 'function') (callback = options), (options = {});
  514. options = options || {};
  515. if (this.s.executed) {
  516. var executedError = toError('batch cannot be re-executed');
  517. return typeof callback === 'function'
  518. ? callback(executedError, null)
  519. : this.s.promiseLibrary.reject(executedError);
  520. }
  521. if (typeof _writeConcern === 'function') {
  522. callback = _writeConcern;
  523. } else if (_writeConcern && typeof _writeConcern === 'object') {
  524. this.s.writeConcern = _writeConcern;
  525. }
  526. // If we have current batch
  527. if (this.s.currentBatch) this.s.batches.push(this.s.currentBatch);
  528. // If we have no operations in the bulk raise an error
  529. if (this.s.batches.length === 0) {
  530. var emptyBatchError = toError('Invalid Operation, no operations specified');
  531. return typeof callback === 'function'
  532. ? callback(emptyBatchError, null)
  533. : this.s.promiseLibrary.reject(emptyBatchError);
  534. }
  535. return executeOperation(this.s.topology, executeCommands, [this, options, callback]);
  536. };
  537. /**
  538. * Returns an unordered batch object
  539. * @ignore
  540. */
  541. var initializeOrderedBulkOp = function(topology, collection, options) {
  542. return new OrderedBulkOperation(topology, collection, options);
  543. };
  544. initializeOrderedBulkOp.OrderedBulkOperation = OrderedBulkOperation;
  545. module.exports = initializeOrderedBulkOp;
  546. module.exports.Bulk = OrderedBulkOperation;