Source: lib/bulk/unordered.js

  1. "use strict";
  2. var common = require('./common')
  3. , utils = require('../utils')
  4. , toError = require('../utils').toError
  5. , handleCallback = require('../utils').handleCallback
  6. , shallowClone = utils.shallowClone
  7. , BulkWriteResult = common.BulkWriteResult
  8. , ObjectID = require('mongodb-core').BSON.ObjectID
  9. , BSON = require('mongodb-core').BSON
  10. , Define = require('../metadata')
  11. , Batch = common.Batch
  12. , mergeBatchResults = common.mergeBatchResults;
  // Module-level BSON serializer, constructed with the list of BSON types below.
  // addToOperationsList uses it to measure the serialized size of each operation.
  var bson = new BSON([
    BSON.Binary,
    BSON.Code,
    BSON.DBRef,
    BSON.Decimal128,
    BSON.Double,
    BSON.Int32,
    BSON.Long,
    BSON.Map,
    BSON.MaxKey,
    BSON.MinKey,
    BSON.ObjectId,
    BSON.BSONRegExp,
    BSON.Symbol,
    BSON.Timestamp
  ]);
  /**
   * Create a FindOperatorsUnordered instance (INTERNAL TYPE, do not instantiate directly)
   *
   * The instance does not copy any state: it points at the very same internal
   * state object (`s`) as the object it is created from.
   * @class
   * @param {object} self the object whose internal state `s` is shared
   * @return {FindOperatorsUnordered} a FindOperatorsUnordered instance.
   */
  var FindOperatorsUnordered = function(self) {
    var sharedState = self.s;
    this.s = sharedState;
  };
  /**
   * Queue an update (multi: true) for the selector of the pending find operation
   *
   * @method
   * @param {object} updateDocument update operations
   * @throws {MongoError}
   * @return {FindOperatorsUnordered} the value returned by addToOperationsList, which is this instance
   */
  FindOperatorsUnordered.prototype.update = function(updateDocument) {
    var pendingOp = this.s.currentOp;
    // The upsert flag is only honoured when it is an actual boolean
    var upsert = false;
    if(typeof pendingOp.upsert == 'boolean') {
      upsert = pendingOp.upsert;
    }
    var selector = pendingOp.selector;
    // The pending find operation is consumed by this call
    this.s.currentOp = null;
    // Queue the update command
    return addToOperationsList(this, common.UPDATE, {
      q: selector,
      u: updateDocument,
      multi: true,
      upsert: upsert
    });
  };
  /**
   * Queue an update (multi: false) for the selector of the pending find operation
   *
   * @method
   * @param {object} updateDocument update operations
   * @throws {MongoError}
   * @return {FindOperatorsUnordered} the value returned by addToOperationsList, which is this instance
   */
  FindOperatorsUnordered.prototype.updateOne = function(updateDocument) {
    var state = this.s;
    // Anything but a boolean upsert flag counts as "no upsert"
    var wantsUpsert = typeof state.currentOp.upsert == 'boolean' ? state.currentOp.upsert : false;
    var command = {
      q: state.currentOp.selector,
      u: updateDocument,
      multi: false,
      upsert: wantsUpsert
    };
    // The pending find operation has been turned into a command, drop it
    state.currentOp = null;
    return addToOperationsList(this, common.UPDATE, command);
  };
  /**
   * Add a replace one operation to the bulk operation
   *
   * Delegates to updateOne, so the replacement is queued as an update with
   * multi: false against the selector of the pending find operation.
   *
   * @method
   * @param {object} updateDocument the new document to replace the existing one with
   * @throws {MongoError}
   * @return {FindOperatorsUnordered} whatever updateOne returns
   */
  FindOperatorsUnordered.prototype.replaceOne = function(updateDocument) {
    // Hand the result back to the caller; it used to be dropped, making
    // replaceOne the only operation of this type that returned undefined.
    return this.updateOne(updateDocument);
  };
  /**
   * Flag the pending find operation as an upsert
   *
   * The flag is stored on the pending operation and read by the update
   * methods of this type when the command is built.
   *
   * @method
   * @throws {MongoError}
   * @return {FindOperatorsUnordered} this instance, for chaining
   */
  FindOperatorsUnordered.prototype.upsert = function() {
    var pendingOp = this.s.currentOp;
    pendingOp.upsert = true;
    return this;
  };
  /**
   * Queue a remove (limit: 1) for the selector of the pending find operation
   *
   * @method
   * @throws {MongoError}
   * @return {FindOperatorsUnordered} the value returned by addToOperationsList, which is this instance
   */
  FindOperatorsUnordered.prototype.removeOne = function() {
    var selector = this.s.currentOp.selector;
    // The pending find operation is consumed by this call
    this.s.currentOp = null;
    // Queue the remove command
    return addToOperationsList(this, common.REMOVE, {
      q: selector,
      limit: 1
    });
  };
  /**
   * Queue a remove (limit: 0) for the selector of the pending find operation
   *
   * @method
   * @throws {MongoError}
   * @return {FindOperatorsUnordered} the value returned by addToOperationsList, which is this instance
   */
  FindOperatorsUnordered.prototype.remove = function() {
    var state = this.s;
    var command = {
      q: state.currentOp.selector,
      limit: 0
    };
    // The pending find operation has been turned into a command, drop it
    state.currentOp = null;
    return addToOperationsList(this, common.REMOVE, command);
  };
  //
  // Append one operation document to the current batch of its type
  // (insert, update or remove), starting a new batch of that type when the
  // current one would reach the configured operation-count or byte limits.
  // Returns _self.
  //
  var addToOperationsList = function(_self, docType, document) {
    var state = _self.s;
    // Serialized size of the operation, measured with the module-level serializer
    var bsonSize = bson.calculateObjectSize(document, {
      checkKeys: false
    });
    // A single operation at or above the byte limit can never be sent
    if(bsonSize >= state.maxBatchSizeBytes) {
      throw toError("document is larger than the maximum size " + state.maxBatchSizeBytes);
    }

    // Name of the state slot that holds the open batch for this operation type
    var slot = null;
    if(docType == common.INSERT) {
      slot = 'currentInsertBatch';
    } else if(docType == common.UPDATE) {
      slot = 'currentUpdateBatch';
    } else if(docType == common.REMOVE) {
      slot = 'currentRemoveBatch';
    }

    // Pick up the open batch of that type, or open the first one
    state.currentBatch = slot == null ? null : state[slot];
    if(state.currentBatch == null) {
      state.currentBatch = new Batch(docType, state.currentIndex);
    }

    // Roll over to a fresh batch when this operation would hit a limit
    var reachesOpLimit = (state.currentBatch.size + 1) >= state.maxWriteBatchSize;
    var reachesByteLimit = (state.currentBatch.sizeBytes + bsonSize) >= state.maxBatchSizeBytes;
    var typeMismatch = state.currentBatch.batchType != docType;
    if(reachesOpLimit || reachesByteLimit || typeMismatch) {
      // The full batch goes onto the execution stack
      state.batches.push(state.currentBatch);
      state.currentBatch = new Batch(docType, state.currentIndex);
    }

    // Arrays are rejected; this check deliberately stays after the batch bookkeeping above
    if(Array.isArray(document)) {
      throw toError("operation passed in cannot be an Array");
    }

    // Record the operation together with its position in the overall bulk
    state.currentBatch.operations.push(document);
    state.currentBatch.originalIndexes.push(state.currentIndex);
    state.currentIndex = state.currentIndex + 1;

    // Store the (possibly new) batch back into the slot of its type
    if(slot != null) {
      state[slot] = state.currentBatch;
    }
    // Inserts additionally remember the _id of the inserted document
    if(docType == common.INSERT) {
      state.bulkResult.insertedIds.push({index: state.bulkResult.insertedIds.length, _id: document._id});
    }

    // Account for the operation in the batch totals
    state.currentBatch.size = state.currentBatch.size + 1;
    state.currentBatch.sizeBytes = state.currentBatch.sizeBytes + bsonSize;
    return _self;
  };
  /**
   * Create a new UnorderedBulkOperation instance (INTERNAL TYPE, do not instantiate directly)
   *
   * All state is kept on `this.s`: the accumulated bulk result, one open batch
   * per operation type, the list of batches ready to execute, and the settings
   * taken from the topology, the collection and the options.
   * @class
   * @param {object} topology provides `bson` and, optionally, an `isMasterDoc` with size limits
   * @param {object} collection the target collection; `collectionName` is used as namespace
   * @param {object} [options] settings; `promiseLibrary` and `bypassDocumentValidation` are read here
   * @property {number} length Get the number of operations in the bulk.
   * @return {UnorderedBulkOperation} a UnorderedBulkOperation instance.
   */
  var UnorderedBulkOperation = function(topology, collection, options) {
    if(options == null) options = {};

    // Namespace for the write operations
    var namespace = collection.collectionName;
    // Handle to the bson serializer of the topology
    var bson = topology.bson;

    // Limits advertised by the topology win over the built-in fallbacks.
    // NOTE(review): the byte fallback evaluates to 16 MiB + 16 KiB (16793600); confirm that is intended
    var isMaster = topology.isMasterDoc;
    var maxBatchSizeBytes = (1024*1025*16);
    if(isMaster && isMaster.maxBsonObjectSize) {
      maxBatchSizeBytes = isMaster.maxBsonObjectSize;
    }
    var maxWriteBatchSize = 1000;
    if(isMaster && isMaster.maxWriteBatchSize) {
      maxWriteBatchSize = isMaster.maxWriteBatchSize;
    }

    // Resolve the write concern
    var writeConcern = common.writeConcern(shallowClone(options), collection, options);

    // Promise library: explicit option, then the global Promise, then es6-promise
    var promiseLibrary = options.promiseLibrary;
    if(!promiseLibrary) {
      if(typeof global.Promise == 'function') {
        promiseLibrary = global.Promise;
      } else {
        promiseLibrary = require('es6-promise').Promise;
      }
    }

    // Internal state
    this.s = {
      // Final result, filled in while batches are executed
      bulkResult: {
        ok: 1,
        writeErrors: [],
        writeConcernErrors: [],
        insertedIds: [],
        nInserted: 0,
        nUpserted: 0,
        nMatched: 0,
        nModified: 0,
        nRemoved: 0,
        upserted: []
      },
      // Open batch per operation type, plus the one currently being filled
      currentInsertBatch: null,
      currentUpdateBatch: null,
      currentRemoveBatch: null,
      currentBatch: null,
      // Number of operations added so far
      currentIndex: 0,
      // Batches ready for execution
      batches: [],
      // Write concern
      writeConcern: writeConcern,
      // Batch limits
      maxBatchSizeBytes: maxBatchSizeBytes,
      maxWriteBatchSize: maxWriteBatchSize,
      // Namespace
      namespace: namespace,
      // BSON
      bson: bson,
      // Topology
      topology: topology,
      // Options
      options: options,
      // Pending find operation, none yet
      currentOp: null,
      // Not executed yet
      executed: false,
      // Collection
      collection: collection,
      // Promise library
      promiseLibrary: promiseLibrary,
      // Bypass validation only when explicitly set to true
      bypassDocumentValidation: options.bypassDocumentValidation === true
    };
  };

  // Metadata registration for the class
  var define = UnorderedBulkOperation.define = new Define('UnorderedBulkOperation', UnorderedBulkOperation, false);
  /**
   * Queue a single document for insertion
   *
   * Unless the db options set forceServerObjectId to true, a document without
   * an _id gets a freshly generated ObjectID assigned (the passed-in document
   * itself is modified).
   *
   * @param {object} document the document to insert
   * @throws {MongoError}
   * @return {UnorderedBulkOperation} this bulk operation
   */
  UnorderedBulkOperation.prototype.insert = function(document) {
    var serverAssignsIds = this.s.collection.s.db.options.forceServerObjectId === true;
    if(!serverAssignsIds && document._id == null) {
      document._id = new ObjectID();
    }
    return addToOperationsList(this, common.INSERT, document);
  };
  /**
   * Start a find operation; the returned object is used to queue an
   * update/updateOne/remove/removeOne/replaceOne against the selector
   *
   * @method
   * @param {object} selector The selector for the bulk operation.
   * @throws {MongoError} when no selector is passed
   * @return {FindOperatorsUnordered}
   */
  UnorderedBulkOperation.prototype.find = function(selector) {
    if(!selector) throw toError("Bulk find operation must specify a selector");
    // Remember the selector as the pending operation
    var pendingOp = {selector: selector};
    this.s.currentOp = pendingOp;
    return new FindOperatorsUnordered(this);
  };
  // `length` reports how many operations have been added to the bulk so far
  Object.defineProperty(UnorderedBulkOperation.prototype, 'length', {
    get: function() {
      var state = this.s;
      return state.currentIndex;
    },
    enumerable: true
  });
  /**
   * Add a single operation, given in bulkWrite form, to the bulk operation
   *
   * The first key of `op` names the operation. Accepted forms:
   *  - updateOne / updateMany / replaceOne, either already in `{q, u, ...}` form
   *    or in `{filter, update|replacement, upsert}` form
   *  - removeOne / removeMany, and deleteOne / deleteMany in `{q}` form (the
   *    operation object itself is queued after `limit` has been set on it)
   *  - deleteOne / deleteMany in `{filter}` form
   *  - insertOne (the document itself, or `{document}`) and insertMany (an
   *    array of documents)
   *
   * Documents to insert, and operations passed in `{q, ...}` form, are
   * modified in place (generated `_id`, `multi`, `limit`).
   *
   * @method
   * @param {object} op the operation to add
   * @throws {MongoError} when `op` is none of the supported operation types
   * @return {UnorderedBulkOperation} this bulk operation
   */
  UnorderedBulkOperation.prototype.raw = function(op) {
    var key = Object.keys(op)[0];
    var operation;

    // A boolean forceServerObjectId in the bulk options wins over the db-level setting
    var forceServerObjectId = typeof this.s.options.forceServerObjectId == 'boolean'
      ? this.s.options.forceServerObjectId : this.s.collection.s.db.options.forceServerObjectId;

    // Update operations that already carry the command fields (q, u, ...)
    if((op.updateOne && op.updateOne.q)
      || (op.updateMany && op.updateMany.q)
      || (op.replaceOne && op.replaceOne.q)) {
      op[key].multi = op.updateOne || op.replaceOne ? false : true;
      return addToOperationsList(this, common.UPDATE, op[key]);
    }

    // Crud spec update format
    if(op.updateOne || op.updateMany || op.replaceOne) {
      var multi = op.updateOne || op.replaceOne ? false : true;
      operation = {q: op[key].filter, u: op[key].update || op[key].replacement, multi: multi};
      if(op[key].upsert) operation.upsert = true;
      return addToOperationsList(this, common.UPDATE, operation);
    }

    // Remove operations that already carry the command fields
    if(op.removeOne || op.removeMany || (op.deleteOne && op.deleteOne.q) || (op.deleteMany && op.deleteMany.q)) {
      // Both single-document spellings are limited to one match. Testing only
      // removeOne here used to give deleteOne a limit of 0, i.e. every
      // matching document would have been removed.
      op[key].limit = op.removeOne || op.deleteOne ? 1 : 0;
      return addToOperationsList(this, common.REMOVE, op[key]);
    }

    // Crud spec delete operations, less efficient
    if(op.deleteOne || op.deleteMany) {
      var limit = op.deleteOne ? 1 : 0;
      operation = {q: op[key].filter, limit: limit};
      return addToOperationsList(this, common.REMOVE, operation);
    }

    // Insert operations: either the document itself or wrapped as {document: ...}
    if(op.insertOne && op.insertOne.document == null) {
      if(forceServerObjectId !== true && op.insertOne._id == null) op.insertOne._id = new ObjectID();
      return addToOperationsList(this, common.INSERT, op.insertOne);
    } else if(op.insertOne && op.insertOne.document) {
      if(forceServerObjectId !== true && op.insertOne.document._id == null) op.insertOne.document._id = new ObjectID();
      return addToOperationsList(this, common.INSERT, op.insertOne.document);
    }

    if(op.insertMany) {
      for(var i = 0; i < op.insertMany.length; i++) {
        if(forceServerObjectId !== true && op.insertMany[i]._id == null) op.insertMany[i]._id = new ObjectID();
        addToOperationsList(this, common.INSERT, op.insertMany[i]);
      }
      // Return the bulk like every other branch does (this one used to return undefined)
      return this;
    }

    // No valid type of operation
    throw toError("bulkWrite only supports insertOne, insertMany, updateOne, updateMany, removeOne, removeMany, deleteOne, deleteMany");
  };
  //
  // Send one batch to the topology (insert, update or remove, depending on
  // the batch type) and report the outcome through callback. Results and
  // non-fatal errors are merged into self.s.bulkResult via mergeBatchResults.
  var executeBatch = function(self, batch, callback) {
    // Batches of this bulk are always sent with ordered: false
    var finalOptions = {ordered: false};
    if(self.s.writeConcern != null) finalOptions.writeConcern = self.s.writeConcern;

    var resultHandler = function(err, result) {
      // An error flagged as a driver error, or one carrying a message, is passed on as is
      if(err && (err.driver || err.message)) {
        return handleCallback(callback, err);
      }
      // Any other error is marked as failed and merged into the bulk result
      if(err) err.ok = 0;
      handleCallback(callback, null, mergeBatchResults(false, batch, self.s.bulkResult, err, result));
    };

    // Carry over an operationId when one is set
    if(self.operationId) resultHandler.operationId = self.operationId;
    // Serialize functions
    if(self.s.options.serializeFunctions) finalOptions.serializeFunctions = true;
    // Bypass document validation when requested
    if(self.s.bypassDocumentValidation == true) finalOptions.bypassDocumentValidation = true;

    try {
      // Name of the topology method that matches the batch type
      var method = null;
      if(batch.batchType == common.INSERT) {
        method = 'insert';
      } else if(batch.batchType == common.UPDATE) {
        method = 'update';
      } else if(batch.batchType == common.REMOVE) {
        method = 'remove';
      }
      if(method != null) {
        self.s.topology[method](self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
      }
    } catch(err) {
      // Anything thrown while sending becomes a top level error of the batch
      err.ok = 0;
      handleCallback(callback, null, mergeBatchResults(false, batch, self.s.bulkResult, err, null));
    }
  };
  //
  // Execute every queued batch and invoke callback once, after the last
  // batch has reported back.
  //
  // callback receives, in order of precedence:
  //  - a driver level error reported by ANY of the batches
  //  - otherwise an error built from the first entry of bulkResult.writeErrors
  //    (if there is one) together with the BulkWriteResult
  //  - otherwise null and the BulkWriteResult
  var executeBatches = function(self, callback) {
    var numberOfCommandsToExecute = self.s.batches.length;
    // Shared by all batch callbacks. It used to be declared inside the
    // callback, so a driver error was only seen when it came from the batch
    // that happened to finish last; errors of earlier batches were dropped.
    var error = null;

    var onBatchExecuted = function(err) {
      // Driver layer error, capture it
      if(err) error = err;
      // Count down the number of commands left to execute
      numberOfCommandsToExecute = numberOfCommandsToExecute - 1;
      if(numberOfCommandsToExecute != 0) return;

      // All batches are done: a driver level error takes precedence
      if(error) return handleCallback(callback, error);
      // Otherwise surface the first write error, if any, along with the result
      var writeError = self.s.bulkResult.writeErrors.length > 0
        ? toError(self.s.bulkResult.writeErrors[0]) : null;
      handleCallback(callback, writeError, new BulkWriteResult(self.s.bulkResult));
    };

    // Execute over all the batches
    for(var i = 0; i < self.s.batches.length; i++) {
      executeBatch(self, self.s.batches[i], onBatchExecuted);
    }
  };
  /**
   * The callback format for results
   * @callback UnorderedBulkOperation~resultCallback
   * @param {MongoError} error An error instance representing the error during the execution.
   * @param {BulkWriteResult} result The bulk write result.
   */
  /**
   * Execute the unordered bulk operation
   *
   * A bulk operation can be executed only once: after a successful start any
   * further call fails with "batch cannot be re-executed". Both that failure
   * and the "no operations specified" failure are delivered through the
   * callback when one was passed, otherwise through a rejected Promise.
   *
   * The first argument may be the callback itself. When it is an object it
   * replaces the write concern of the bulk operation.
   *
   * @method
   * @param {object} [_writeConcern=null] Optional settings.
   * @param {(number|string)} [_writeConcern.w=null] The write concern.
   * @param {number} [_writeConcern.wtimeout=null] The write concern timeout.
   * @param {boolean} [_writeConcern.j=false] Specify a journal write concern.
   * @param {boolean} [_writeConcern.fsync=false] Specify a file sync write concern.
   * @param {UnorderedBulkOperation~resultCallback} [callback] The result callback
   * @throws {MongoError}
   * @return {Promise} returns Promise if no callback passed
   */
  UnorderedBulkOperation.prototype.execute = function(_writeConcern, callback) {
    var self = this;

    // Sort out the arguments first, so that the failure paths below reach a
    // callback that was passed as the only argument
    var writeConcernOverride = null;
    if (typeof _writeConcern === 'function') {
      callback = _writeConcern;
    } else if (_writeConcern && typeof _writeConcern === 'object') {
      writeConcernOverride = _writeConcern;
    }

    if (this.s.executed) {
      var executedError = toError('batch cannot be re-executed');
      return (typeof callback === 'function') ?
        callback(executedError, null) : this.s.promiseLibrary.reject(executedError);
    }

    if (writeConcernOverride) this.s.writeConcern = writeConcernOverride;

    // Queue the batches that are still open
    if (this.s.currentInsertBatch) this.s.batches.push(this.s.currentInsertBatch);
    if (this.s.currentUpdateBatch) this.s.batches.push(this.s.currentUpdateBatch);
    if (this.s.currentRemoveBatch) this.s.batches.push(this.s.currentRemoveBatch);

    // If we have no operations in the bulk raise an error
    if (this.s.batches.length == 0) {
      var emptyBatchError = toError('Invalid Operation, no operations specified');
      return (typeof callback === 'function') ?
        callback(emptyBatchError, null) : this.s.promiseLibrary.reject(emptyBatchError);
    }

    // From here on the batches are being sent. The flag was never set before,
    // which left the guard above dead: a second call queued the open batches
    // again and sent everything a second time.
    this.s.executed = true;

    // Execute using callback
    if (typeof callback === 'function') return executeBatches(this, callback);

    // Return a Promise
    return new this.s.promiseLibrary(function(resolve, reject) {
      executeBatches(self, function(err, r) {
        if (err) return reject(err);
        resolve(r);
      });
    });
  };
  // Register `execute` with the class metadata
  define.classMethod('execute', {
    callback: true,
    promise: false
  });
  /**
   * Factory for unordered bulk operations: builds a new UnorderedBulkOperation
   * from the given arguments. The class is also reachable as a property of
   * the factory.
   * @ignore
   */
  var initializeUnorderedBulkOp = function(topology, collection, options) {
    var bulkOperation = new UnorderedBulkOperation(topology, collection, options);
    return bulkOperation;
  };

  initializeUnorderedBulkOp.UnorderedBulkOperation = UnorderedBulkOperation;
  472. module.exports = initializeUnorderedBulkOp;
  473. module.exports.Bulk = UnorderedBulkOperation;