Source: lib/bulk/ordered.js

  1. "use strict";
  2. var common = require('./common')
  3. , utils = require('../utils')
  4. , toError = require('../utils').toError
  5. , handleCallback = require('../utils').handleCallback
  6. , shallowClone = utils.shallowClone
  7. , BulkWriteResult = common.BulkWriteResult
  8. , ObjectID = require('mongodb-core').BSON.ObjectID
  9. , Define = require('../metadata')
  10. , BSON = require('mongodb-core').BSON
  11. , Batch = common.Batch
  12. , mergeBatchResults = common.mergeBatchResults;
// Module-level BSON serializer, constructed with the list of BSON type
// constructors exported by mongodb-core. addToOperationsList uses it to
// measure the serialized size of every queued operation.
var bson = new BSON([BSON.Binary, BSON.Code, BSON.DBRef, BSON.Decimal128,
BSON.Double, BSON.Int32, BSON.Long, BSON.Map, BSON.MaxKey, BSON.MinKey,
BSON.ObjectId, BSON.BSONRegExp, BSON.Symbol, BSON.Timestamp]);
  16. /**
  17. * Create a FindOperatorsOrdered instance (INTERNAL TYPE, do not instantiate directly)
  18. * @class
  19. * @return {FindOperatorsOrdered} a FindOperatorsOrdered instance.
  20. */
  21. var FindOperatorsOrdered = function(self) {
  22. this.s = self.s;
  23. }
  24. /**
  25. * Add a single update document to the bulk operation
  26. *
  27. * @method
  28. * @param {object} doc update operations
  29. * @throws {MongoError}
  30. * @return {OrderedBulkOperation}
  31. */
  32. FindOperatorsOrdered.prototype.update = function(updateDocument) {
  33. // Perform upsert
  34. var upsert = typeof this.s.currentOp.upsert == 'boolean' ? this.s.currentOp.upsert : false;
  35. // Establish the update command
  36. var document = {
  37. q: this.s.currentOp.selector
  38. , u: updateDocument
  39. , multi: true
  40. , upsert: upsert
  41. }
  42. // Clear out current Op
  43. this.s.currentOp = null;
  44. // Add the update document to the list
  45. return addToOperationsList(this, common.UPDATE, document);
  46. }
  47. /**
  48. * Add a single update one document to the bulk operation
  49. *
  50. * @method
  51. * @param {object} doc update operations
  52. * @throws {MongoError}
  53. * @return {OrderedBulkOperation}
  54. */
  55. FindOperatorsOrdered.prototype.updateOne = function(updateDocument) {
  56. // Perform upsert
  57. var upsert = typeof this.s.currentOp.upsert == 'boolean' ? this.s.currentOp.upsert : false;
  58. // Establish the update command
  59. var document = {
  60. q: this.s.currentOp.selector
  61. , u: updateDocument
  62. , multi: false
  63. , upsert: upsert
  64. }
  65. // Clear out current Op
  66. this.s.currentOp = null;
  67. // Add the update document to the list
  68. return addToOperationsList(this, common.UPDATE, document);
  69. }
  70. /**
  71. * Add a replace one operation to the bulk operation
  72. *
  73. * @method
  74. * @param {object} doc the new document to replace the existing one with
  75. * @throws {MongoError}
  76. * @return {OrderedBulkOperation}
  77. */
  78. FindOperatorsOrdered.prototype.replaceOne = function(updateDocument) {
  79. this.updateOne(updateDocument);
  80. }
  81. /**
  82. * Upsert modifier for update bulk operation
  83. *
  84. * @method
  85. * @throws {MongoError}
  86. * @return {FindOperatorsOrdered}
  87. */
  88. FindOperatorsOrdered.prototype.upsert = function() {
  89. this.s.currentOp.upsert = true;
  90. return this;
  91. }
  92. /**
  93. * Add a remove one operation to the bulk operation
  94. *
  95. * @method
  96. * @throws {MongoError}
  97. * @return {OrderedBulkOperation}
  98. */
  99. FindOperatorsOrdered.prototype.deleteOne = function() {
  100. // Establish the update command
  101. var document = {
  102. q: this.s.currentOp.selector
  103. , limit: 1
  104. }
  105. // Clear out current Op
  106. this.s.currentOp = null;
  107. // Add the remove document to the list
  108. return addToOperationsList(this, common.REMOVE, document);
  109. }
  110. // Backward compatibility
  111. FindOperatorsOrdered.prototype.removeOne = FindOperatorsOrdered.prototype.deleteOne;
  112. /**
  113. * Add a remove operation to the bulk operation
  114. *
  115. * @method
  116. * @throws {MongoError}
  117. * @return {OrderedBulkOperation}
  118. */
  119. FindOperatorsOrdered.prototype.delete = function() {
  120. // Establish the update command
  121. var document = {
  122. q: this.s.currentOp.selector
  123. , limit: 0
  124. }
  125. // Clear out current Op
  126. this.s.currentOp = null;
  127. // Add the remove document to the list
  128. return addToOperationsList(this, common.REMOVE, document);
  129. }
  130. // Backward compatibility
  131. FindOperatorsOrdered.prototype.remove = FindOperatorsOrdered.prototype.delete;
  132. // Add to internal list of documents
  133. var addToOperationsList = function(_self, docType, document) {
  134. // Get the bsonSize
  135. var bsonSize = bson.calculateObjectSize(document, {
  136. checkKeys: false,
  137. });
  138. // Throw error if the doc is bigger than the max BSON size
  139. if(bsonSize >= _self.s.maxBatchSizeBytes) {
  140. throw toError("document is larger than the maximum size " + _self.s.maxBatchSizeBytes);
  141. }
  142. // Create a new batch object if we don't have a current one
  143. if(_self.s.currentBatch == null) _self.s.currentBatch = new Batch(docType, _self.s.currentIndex);
  144. // Check if we need to create a new batch
  145. if(((_self.s.currentBatchSize + 1) >= _self.s.maxWriteBatchSize)
  146. || ((_self.s.currentBatchSizeBytes + _self.s.currentBatchSizeBytes) >= _self.s.maxBatchSizeBytes)
  147. || (_self.s.currentBatch.batchType != docType)) {
  148. // Save the batch to the execution stack
  149. _self.s.batches.push(_self.s.currentBatch);
  150. // Create a new batch
  151. _self.s.currentBatch = new Batch(docType, _self.s.currentIndex);
  152. // Reset the current size trackers
  153. _self.s.currentBatchSize = 0;
  154. _self.s.currentBatchSizeBytes = 0;
  155. } else {
  156. // Update current batch size
  157. _self.s.currentBatchSize = _self.s.currentBatchSize + 1;
  158. _self.s.currentBatchSizeBytes = _self.s.currentBatchSizeBytes + bsonSize;
  159. }
  160. if(docType == common.INSERT) {
  161. _self.s.bulkResult.insertedIds.push({index: _self.s.currentIndex, _id: document._id});
  162. }
  163. // We have an array of documents
  164. if(Array.isArray(document)) {
  165. throw toError("operation passed in cannot be an Array");
  166. } else {
  167. _self.s.currentBatch.originalIndexes.push(_self.s.currentIndex);
  168. _self.s.currentBatch.operations.push(document)
  169. _self.s.currentBatchSizeBytes = _self.s.currentBatchSizeBytes + bsonSize;
  170. _self.s.currentIndex = _self.s.currentIndex + 1;
  171. }
  172. // Return self
  173. return _self;
  174. }
  175. /**
  176. * Create a new OrderedBulkOperation instance (INTERNAL TYPE, do not instantiate directly)
  177. * @class
  178. * @property {number} length Get the number of operations in the bulk.
  179. * @return {OrderedBulkOperation} a OrderedBulkOperation instance.
  180. */
  181. function OrderedBulkOperation(topology, collection, options) {
  182. options = options == null ? {} : options;
  183. // TODO Bring from driver information in isMaster
  184. var executed = false;
  185. // Current item
  186. var currentOp = null;
  187. // Handle to the bson serializer, used to calculate running sizes
  188. var bson = topology.bson;
  189. // Namespace for the operation
  190. var namespace = collection.collectionName;
  191. // Set max byte size
  192. var maxBatchSizeBytes = topology.isMasterDoc && topology.isMasterDoc.maxBsonObjectSize
  193. ? topology.isMasterDoc.maxBsonObjectSize : (1024*1025*16);
  194. var maxWriteBatchSize = topology.isMasterDoc && topology.isMasterDoc.maxWriteBatchSize
  195. ? topology.isMasterDoc.maxWriteBatchSize : 1000;
  196. // Get the write concern
  197. var writeConcern = common.writeConcern(shallowClone(options), collection, options);
  198. // Get the promiseLibrary
  199. var promiseLibrary = options.promiseLibrary;
  200. // No promise library selected fall back
  201. if(!promiseLibrary) {
  202. promiseLibrary = typeof global.Promise == 'function' ?
  203. global.Promise : require('es6-promise').Promise;
  204. }
  205. // Final results
  206. var bulkResult = {
  207. ok: 1
  208. , writeErrors: []
  209. , writeConcernErrors: []
  210. , insertedIds: []
  211. , nInserted: 0
  212. , nUpserted: 0
  213. , nMatched: 0
  214. , nModified: 0
  215. , nRemoved: 0
  216. , upserted: []
  217. };
  218. // Internal state
  219. this.s = {
  220. // Final result
  221. bulkResult: bulkResult
  222. // Current batch state
  223. , currentBatch: null
  224. , currentIndex: 0
  225. , currentBatchSize: 0
  226. , currentBatchSizeBytes: 0
  227. , batches: []
  228. // Write concern
  229. , writeConcern: writeConcern
  230. // Max batch size options
  231. , maxBatchSizeBytes: maxBatchSizeBytes
  232. , maxWriteBatchSize: maxWriteBatchSize
  233. // Namespace
  234. , namespace: namespace
  235. // BSON
  236. , bson: bson
  237. // Topology
  238. , topology: topology
  239. // Options
  240. , options: options
  241. // Current operation
  242. , currentOp: currentOp
  243. // Executed
  244. , executed: executed
  245. // Collection
  246. , collection: collection
  247. // Promise Library
  248. , promiseLibrary: promiseLibrary
  249. // Fundamental error
  250. , err: null
  251. // Bypass validation
  252. , bypassDocumentValidation: typeof options.bypassDocumentValidation == 'boolean' ? options.bypassDocumentValidation : false
  253. }
  254. }
// Metadata descriptor for this class (built by ../metadata); the same object is
// kept in the local `define` and exposed as OrderedBulkOperation.define.
var define = OrderedBulkOperation.define = new Define('OrderedBulkOperation', OrderedBulkOperation, false);
  256. OrderedBulkOperation.prototype.raw = function(op) {
  257. var key = Object.keys(op)[0];
  258. // Set up the force server object id
  259. var forceServerObjectId = typeof this.s.options.forceServerObjectId == 'boolean'
  260. ? this.s.options.forceServerObjectId : this.s.collection.s.db.options.forceServerObjectId;
  261. // Update operations
  262. if((op.updateOne && op.updateOne.q)
  263. || (op.updateMany && op.updateMany.q)
  264. || (op.replaceOne && op.replaceOne.q)) {
  265. op[key].multi = op.updateOne || op.replaceOne ? false : true;
  266. return addToOperationsList(this, common.UPDATE, op[key]);
  267. }
  268. // Crud spec update format
  269. if(op.updateOne || op.updateMany || op.replaceOne) {
  270. var multi = op.updateOne || op.replaceOne ? false : true;
  271. var operation = {q: op[key].filter, u: op[key].update || op[key].replacement, multi: multi}
  272. operation.upsert = op[key].upsert ? true: false;
  273. if(op.collation) operation.collation = op.collation;
  274. return addToOperationsList(this, common.UPDATE, operation);
  275. }
  276. // Remove operations
  277. if(op.removeOne || op.removeMany || (op.deleteOne && op.deleteOne.q) || op.deleteMany && op.deleteMany.q) {
  278. op[key].limit = op.removeOne ? 1 : 0;
  279. return addToOperationsList(this, common.REMOVE, op[key]);
  280. }
  281. // Crud spec delete operations, less efficient
  282. if(op.deleteOne || op.deleteMany) {
  283. var limit = op.deleteOne ? 1 : 0;
  284. operation = {q: op[key].filter, limit: limit}
  285. if(op.collation) operation.collation = op.collation;
  286. return addToOperationsList(this, common.REMOVE, operation);
  287. }
  288. // Insert operations
  289. if(op.insertOne && op.insertOne.document == null) {
  290. if(forceServerObjectId !== true && op.insertOne._id == null) op.insertOne._id = new ObjectID();
  291. return addToOperationsList(this, common.INSERT, op.insertOne);
  292. } else if(op.insertOne && op.insertOne.document) {
  293. if(forceServerObjectId !== true && op.insertOne.document._id == null) op.insertOne.document._id = new ObjectID();
  294. return addToOperationsList(this, common.INSERT, op.insertOne.document);
  295. }
  296. if(op.insertMany) {
  297. for(var i = 0; i < op.insertMany.length; i++) {
  298. if(forceServerObjectId !== true && op.insertMany[i]._id == null) op.insertMany[i]._id = new ObjectID();
  299. addToOperationsList(this, common.INSERT, op.insertMany[i]);
  300. }
  301. return;
  302. }
  303. // No valid type of operation
  304. throw toError("bulkWrite only supports insertOne, insertMany, updateOne, updateMany, removeOne, removeMany, deleteOne, deleteMany");
  305. }
  306. /**
  307. * Add a single insert document to the bulk operation
  308. *
  309. * @param {object} doc the document to insert
  310. * @throws {MongoError}
  311. * @return {OrderedBulkOperation}
  312. */
  313. OrderedBulkOperation.prototype.insert = function(document) {
  314. if(this.s.collection.s.db.options.forceServerObjectId !== true && document._id == null) document._id = new ObjectID();
  315. return addToOperationsList(this, common.INSERT, document);
  316. }
  317. /**
  318. * Initiate a find operation for an update/updateOne/remove/removeOne/replaceOne
  319. *
  320. * @method
  321. * @param {object} selector The selector for the bulk operation.
  322. * @throws {MongoError}
  323. * @return {FindOperatorsOrdered}
  324. */
  325. OrderedBulkOperation.prototype.find = function(selector) {
  326. if (!selector) {
  327. throw toError("Bulk find operation must specify a selector");
  328. }
  329. // Save a current selector
  330. this.s.currentOp = {
  331. selector: selector
  332. }
  333. return new FindOperatorsOrdered(this);
  334. }
  335. Object.defineProperty(OrderedBulkOperation.prototype, 'length', {
  336. enumerable: true,
  337. get: function() {
  338. return this.s.currentIndex;
  339. }
  340. });
  341. //
  342. // Execute next write command in a chain
  343. var executeCommands = function(self, callback) {
  344. if(self.s.batches.length == 0) {
  345. return handleCallback(callback, null, new BulkWriteResult(self.s.bulkResult));
  346. }
  347. // Ordered execution of the command
  348. var batch = self.s.batches.shift();
  349. var resultHandler = function(err, result) {
  350. // Error is a driver related error not a bulk op error, terminate
  351. if(err && err.driver || err && err.message) {
  352. return handleCallback(callback, err);
  353. }
  354. // If we have and error
  355. if(err) err.ok = 0;
  356. // Merge the results together
  357. var mergeResult = mergeBatchResults(true, batch, self.s.bulkResult, err, result);
  358. if(mergeResult != null) {
  359. return handleCallback(callback, null, new BulkWriteResult(self.s.bulkResult));
  360. }
  361. // If we are ordered and have errors and they are
  362. // not all replication errors terminate the operation
  363. if(self.s.bulkResult.writeErrors.length > 0) {
  364. return handleCallback(callback, toError(self.s.bulkResult.writeErrors[0]), new BulkWriteResult(self.s.bulkResult));
  365. }
  366. // Execute the next command in line
  367. executeCommands(self, callback);
  368. }
  369. var finalOptions = {ordered: true}
  370. if(self.s.writeConcern != null) {
  371. finalOptions.writeConcern = self.s.writeConcern;
  372. }
  373. // Set an operationIf if provided
  374. if(self.operationId) {
  375. resultHandler.operationId = self.operationId;
  376. }
  377. // Serialize functions
  378. if(self.s.options.serializeFunctions) {
  379. finalOptions.serializeFunctions = true
  380. }
  381. // Serialize functions
  382. if(self.s.options.ignoreUndefined) {
  383. finalOptions.ignoreUndefined = true
  384. }
  385. // Is the bypassDocumentValidation options specific
  386. if(self.s.bypassDocumentValidation == true) {
  387. finalOptions.bypassDocumentValidation = true;
  388. }
  389. try {
  390. if(batch.batchType == common.INSERT) {
  391. self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
  392. } else if(batch.batchType == common.UPDATE) {
  393. self.s.topology.update(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
  394. } else if(batch.batchType == common.REMOVE) {
  395. self.s.topology.remove(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
  396. }
  397. } catch(err) {
  398. // Force top level error
  399. err.ok = 0;
  400. // Merge top level error and return
  401. handleCallback(callback, null, mergeBatchResults(false, batch, self.s.bulkResult, err, null));
  402. }
  403. }
  404. /**
  405. * The callback format for results
  406. * @callback OrderedBulkOperation~resultCallback
  407. * @param {MongoError} error An error instance representing the error during the execution.
  408. * @param {BulkWriteResult} result The bulk write result.
  409. */
  410. /**
  411. * Execute the ordered bulk operation
  412. *
  413. * @method
  414. * @param {object} [options=null] Optional settings.
  415. * @param {(number|string)} [options.w=null] The write concern.
  416. * @param {number} [options.wtimeout=null] The write concern timeout.
  417. * @param {boolean} [options.j=false] Specify a journal write concern.
  418. * @param {boolean} [options.fsync=false] Specify a file sync write concern.
  419. * @param {OrderedBulkOperation~resultCallback} [callback] The result callback
  420. * @throws {MongoError}
  421. * @return {Promise} returns Promise if no callback passed
  422. */
  423. OrderedBulkOperation.prototype.execute = function(_writeConcern, callback) {
  424. var self = this;
  425. if (this.s.executed) {
  426. var executedError = toError('batch cannot be re-executed');
  427. return (typeof callback === 'function') ?
  428. callback(executedError, null) : this.s.promiseLibrary.reject(executedError);
  429. }
  430. if (typeof _writeConcern === 'function') {
  431. callback = _writeConcern;
  432. } else if (_writeConcern && typeof _writeConcern === 'object') {
  433. this.s.writeConcern = _writeConcern;
  434. }
  435. // If we have current batch
  436. if (this.s.currentBatch) this.s.batches.push(this.s.currentBatch)
  437. // If we have no operations in the bulk raise an error
  438. if (this.s.batches.length == 0) {
  439. var emptyBatchError = toError('Invalid Operation, no operations specified');
  440. return (typeof callback === 'function') ?
  441. callback(emptyBatchError, null) : this.s.promiseLibrary.reject(emptyBatchError);
  442. }
  443. // Execute using callback
  444. if (typeof callback === 'function') {
  445. return executeCommands(this, callback);
  446. }
  447. // Return a Promise
  448. return new this.s.promiseLibrary(function(resolve, reject) {
  449. executeCommands(self, function(err, r) {
  450. if (err) return reject(err);
  451. resolve(r);
  452. });
  453. });
  454. }
// Register `execute` in the class metadata with the flags callback: true, promise: false.
// NOTE(review): execute() does return a promise when no callback is passed — confirm
// how ../metadata consumers interpret `promise: false`.
define.classMethod('execute', {callback: true, promise:false});
  456. /**
  457. * Returns an unordered batch object
  458. * @ignore
  459. */
  460. var initializeOrderedBulkOp = function(topology, collection, options) {
  461. return new OrderedBulkOperation(topology, collection, options);
  462. }
// The module exports the factory function; the constructor itself is reachable
// both as a property of the factory (OrderedBulkOperation) and as `Bulk`.
initializeOrderedBulkOp.OrderedBulkOperation = OrderedBulkOperation;
module.exports = initializeOrderedBulkOp;
module.exports.Bulk = OrderedBulkOperation;