Source: lib/collection.js

  1. 'use strict';
  2. const checkCollectionName = require('./utils').checkCollectionName;
  3. const ObjectID = require('mongodb-core').BSON.ObjectID;
  4. const Long = require('mongodb-core').BSON.Long;
  5. const Code = require('mongodb-core').BSON.Code;
  6. const f = require('util').format;
  7. const AggregationCursor = require('./aggregation_cursor');
  8. const MongoError = require('mongodb-core').MongoError;
  9. const shallowClone = require('./utils').shallowClone;
  10. const isObject = require('./utils').isObject;
  11. const toError = require('./utils').toError;
  12. const normalizeHintField = require('./utils').normalizeHintField;
  13. const handleCallback = require('./utils').handleCallback;
  14. const decorateCommand = require('./utils').decorateCommand;
  15. const formattedOrderClause = require('./utils').formattedOrderClause;
  16. const ReadPreference = require('mongodb-core').ReadPreference;
  17. const CommandCursor = require('./command_cursor');
  18. const unordered = require('./bulk/unordered');
  19. const ordered = require('./bulk/ordered');
  20. const ChangeStream = require('./change_stream');
  21. const executeOperation = require('./utils').executeOperation;
  22. const applyWriteConcern = require('./utils').applyWriteConcern;
  23. /**
  24. * @fileOverview The **Collection** class is an internal class that embodies a MongoDB collection
  25. * allowing for insert/update/remove/find and other command operation on that MongoDB collection.
  26. *
  27. * **COLLECTION Cannot directly be instantiated**
  28. * @example
  29. * const MongoClient = require('mongodb').MongoClient;
  30. * const test = require('assert');
  31. * // Connection url
  32. * const url = 'mongodb://localhost:27017';
  33. * // Database Name
  34. * const dbName = 'test';
  35. * // Connect using MongoClient
  36. * MongoClient.connect(url, function(err, client) {
  37. * // Create a collection we want to drop later
  38. * const col = client.db(dbName).collection('createIndexExample1');
  39. * // Show that duplicate records got dropped
  40. * col.find({}).toArray(function(err, items) {
  41. * test.equal(null, err);
  42. * test.equal(4, items.length);
  43. * client.close();
  44. * });
  45. * });
  46. */
  47. var mergeKeys = ['readPreference', 'ignoreUndefined'];
  48. /**
  49. * Create a new Collection instance (INTERNAL TYPE, do not instantiate directly)
  50. * @class
  51. * @property {string} collectionName Get the collection name.
  52. * @property {string} namespace Get the full collection namespace.
  53. * @property {object} writeConcern The current write concern values.
  54. * @property {object} readConcern The current read concern values.
  55. * @property {object} hint Get current index hint for collection.
  56. * @return {Collection} a Collection instance.
  57. */
  58. var Collection = function(db, topology, dbName, name, pkFactory, options) {
  59. checkCollectionName(name);
  60. // Unpack variables
  61. var internalHint = null;
  62. var slaveOk = options == null || options.slaveOk == null ? db.slaveOk : options.slaveOk;
  63. var serializeFunctions =
  64. options == null || options.serializeFunctions == null
  65. ? db.s.options.serializeFunctions
  66. : options.serializeFunctions;
  67. var raw = options == null || options.raw == null ? db.s.options.raw : options.raw;
  68. var promoteLongs =
  69. options == null || options.promoteLongs == null
  70. ? db.s.options.promoteLongs
  71. : options.promoteLongs;
  72. var promoteValues =
  73. options == null || options.promoteValues == null
  74. ? db.s.options.promoteValues
  75. : options.promoteValues;
  76. var promoteBuffers =
  77. options == null || options.promoteBuffers == null
  78. ? db.s.options.promoteBuffers
  79. : options.promoteBuffers;
  80. var readPreference = null;
  81. var collectionHint = null;
  82. var namespace = f('%s.%s', dbName, name);
  83. // Get the promiseLibrary
  84. var promiseLibrary = options.promiseLibrary || Promise;
  85. // Assign the right collection level readPreference
  86. if (options && options.readPreference) {
  87. readPreference = options.readPreference;
  88. } else if (db.options.readPreference) {
  89. readPreference = db.options.readPreference;
  90. }
  91. // Set custom primary key factory if provided
  92. pkFactory = pkFactory == null ? ObjectID : pkFactory;
  93. // Internal state
  94. this.s = {
  95. // Set custom primary key factory if provided
  96. pkFactory: pkFactory,
  97. // Db
  98. db: db,
  99. // Topology
  100. topology: topology,
  101. // dbName
  102. dbName: dbName,
  103. // Options
  104. options: options,
  105. // Namespace
  106. namespace: namespace,
  107. // Read preference
  108. readPreference: readPreference,
  109. // SlaveOK
  110. slaveOk: slaveOk,
  111. // Serialize functions
  112. serializeFunctions: serializeFunctions,
  113. // Raw
  114. raw: raw,
  115. // promoteLongs
  116. promoteLongs: promoteLongs,
  117. // promoteValues
  118. promoteValues: promoteValues,
  119. // promoteBuffers
  120. promoteBuffers: promoteBuffers,
  121. // internalHint
  122. internalHint: internalHint,
  123. // collectionHint
  124. collectionHint: collectionHint,
  125. // Name
  126. name: name,
  127. // Promise library
  128. promiseLibrary: promiseLibrary,
  129. // Read Concern
  130. readConcern: options.readConcern
  131. };
  132. };
  133. Object.defineProperty(Collection.prototype, 'dbName', {
  134. enumerable: true,
  135. get: function() {
  136. return this.s.dbName;
  137. }
  138. });
  139. Object.defineProperty(Collection.prototype, 'collectionName', {
  140. enumerable: true,
  141. get: function() {
  142. return this.s.name;
  143. }
  144. });
  145. Object.defineProperty(Collection.prototype, 'namespace', {
  146. enumerable: true,
  147. get: function() {
  148. return this.s.namespace;
  149. }
  150. });
  151. Object.defineProperty(Collection.prototype, 'readConcern', {
  152. enumerable: true,
  153. get: function() {
  154. return this.s.readConcern || { level: 'local' };
  155. }
  156. });
  157. Object.defineProperty(Collection.prototype, 'writeConcern', {
  158. enumerable: true,
  159. get: function() {
  160. var ops = {};
  161. if (this.s.options.w != null) ops.w = this.s.options.w;
  162. if (this.s.options.j != null) ops.j = this.s.options.j;
  163. if (this.s.options.fsync != null) ops.fsync = this.s.options.fsync;
  164. if (this.s.options.wtimeout != null) ops.wtimeout = this.s.options.wtimeout;
  165. return ops;
  166. }
  167. });
  168. /**
  169. * @ignore
  170. */
  171. Object.defineProperty(Collection.prototype, 'hint', {
  172. enumerable: true,
  173. get: function() {
  174. return this.s.collectionHint;
  175. },
  176. set: function(v) {
  177. this.s.collectionHint = normalizeHintField(v);
  178. }
  179. });
  180. /**
  181. * Creates a cursor for a query that can be used to iterate over results from MongoDB
  182. * @method
  183. * @param {object} [query={}] The cursor query object.
  184. * @param {object} [options=null] Optional settings.
  185. * @param {number} [options.limit=0] Sets the limit of documents returned in the query.
  186. * @param {(array|object)} [options.sort=null] Set to sort the documents coming back from the query. Array of indexes, [['a', 1]] etc.
  187. * @param {object} [options.projection=null] The fields to return in the query. Object of fields to include or exclude (not both), {'a':1}
  188. * @param {object} [options.fields=null] **Deprecated** Use `options.projection` instead
  189. * @param {number} [options.skip=0] Set to skip N documents ahead in your query (useful for pagination).
  190. * @param {Object} [options.hint=null] Tell the query to use specific indexes in the query. Object of indexes to use, {'_id':1}
  191. * @param {boolean} [options.explain=false] Explain the query instead of returning the data.
  192. * @param {boolean} [options.snapshot=false] Snapshot query.
  193. * @param {boolean} [options.timeout=false] Specify if the cursor can timeout.
  194. * @param {boolean} [options.tailable=false] Specify if the cursor is tailable.
  195. * @param {number} [options.batchSize=0] Set the batchSize for the getMoreCommand when iterating over the query results.
  196. * @param {boolean} [options.returnKey=false] Only return the index key.
  197. * @param {number} [options.maxScan=null] Limit the number of items to scan.
  198. * @param {number} [options.min=null] Set index bounds.
  199. * @param {number} [options.max=null] Set index bounds.
  200. * @param {boolean} [options.showDiskLoc=false] Show disk location of results.
  201. * @param {string} [options.comment=null] You can put a $comment field on a query to make looking in the profiler logs simpler.
  202. * @param {boolean} [options.raw=false] Return document results as raw BSON buffers.
  203. * @param {boolean} [options.promoteLongs=true] Promotes Long values to number if they fit inside the 53 bits resolution.
  204. * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
  205. * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
  206. * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
  207. * @param {boolean} [options.partial=false] Specify if the cursor should return partial results when querying against a sharded system
  208. * @param {number} [options.maxTimeMS=null] Number of miliseconds to wait before aborting the query.
  209. * @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
  210. * @param {ClientSession} [options.session] optional session to use for this operation
  211. * @throws {MongoError}
  212. * @return {Cursor}
  213. */
  214. Collection.prototype.find = function(query, options, callback) {
  215. let selector = query;
  216. // figuring out arguments
  217. if (typeof callback !== 'function') {
  218. if (typeof options === 'function') {
  219. callback = options;
  220. options = undefined;
  221. } else if (options == null) {
  222. callback = typeof selector === 'function' ? selector : undefined;
  223. selector = typeof selector === 'object' ? selector : undefined;
  224. }
  225. }
  226. // Ensure selector is not null
  227. selector = selector == null ? {} : selector;
  228. // Validate correctness off the selector
  229. var object = selector;
  230. if (Buffer.isBuffer(object)) {
  231. var object_size = object[0] | (object[1] << 8) | (object[2] << 16) | (object[3] << 24);
  232. if (object_size !== object.length) {
  233. var error = new Error(
  234. 'query selector raw message size does not match message header size [' +
  235. object.length +
  236. '] != [' +
  237. object_size +
  238. ']'
  239. );
  240. error.name = 'MongoError';
  241. throw error;
  242. }
  243. }
  244. // Check special case where we are using an objectId
  245. if (selector != null && selector._bsontype === 'ObjectID') {
  246. selector = { _id: selector };
  247. }
  248. if (!options) options = {};
  249. let projection = options.projection || options.fields;
  250. if (projection && !Buffer.isBuffer(projection) && Array.isArray(projection)) {
  251. projection = projection.length
  252. ? projection.reduce((result, field) => {
  253. result[field] = 1;
  254. return result;
  255. }, {})
  256. : { _id: 1 };
  257. }
  258. var newOptions = {};
  259. // Make a shallow copy of the collection options
  260. for (var key in this.s.options) {
  261. if (mergeKeys.indexOf(key) !== -1) {
  262. newOptions[key] = this.s.options[key];
  263. }
  264. }
  265. // Make a shallow copy of options
  266. for (var optKey in options) {
  267. newOptions[optKey] = options[optKey];
  268. }
  269. // Unpack options
  270. newOptions.skip = options.skip ? options.skip : 0;
  271. newOptions.limit = options.limit ? options.limit : 0;
  272. newOptions.raw = typeof options.raw === 'boolean' ? options.raw : this.s.raw;
  273. newOptions.hint = options.hint != null ? normalizeHintField(options.hint) : this.s.collectionHint;
  274. newOptions.timeout = typeof options.timeout === 'undefined' ? undefined : options.timeout;
  275. // // If we have overridden slaveOk otherwise use the default db setting
  276. newOptions.slaveOk = options.slaveOk != null ? options.slaveOk : this.s.db.slaveOk;
  277. // Add read preference if needed
  278. newOptions = getReadPreference(this, newOptions, this.s.db);
  279. // Set slave ok to true if read preference different from primary
  280. if (
  281. newOptions.readPreference != null &&
  282. (newOptions.readPreference !== 'primary' || newOptions.readPreference.mode !== 'primary')
  283. ) {
  284. newOptions.slaveOk = true;
  285. }
  286. // Ensure the query is an object
  287. if (selector != null && typeof selector !== 'object') {
  288. throw MongoError.create({ message: 'query selector must be an object', driver: true });
  289. }
  290. // Build the find command
  291. var findCommand = {
  292. find: this.s.namespace,
  293. limit: newOptions.limit,
  294. skip: newOptions.skip,
  295. query: selector
  296. };
  297. // Ensure we use the right await data option
  298. if (typeof newOptions.awaitdata === 'boolean') {
  299. newOptions.awaitData = newOptions.awaitdata;
  300. }
  301. // Translate to new command option noCursorTimeout
  302. if (typeof newOptions.timeout === 'boolean') newOptions.noCursorTimeout = newOptions.timeout;
  303. // Merge in options to command
  304. for (var name in newOptions) {
  305. if (newOptions[name] != null && name !== 'session') {
  306. findCommand[name] = newOptions[name];
  307. }
  308. }
  309. if (projection) findCommand.fields = projection;
  310. // Add db object to the new options
  311. newOptions.db = this.s.db;
  312. // Add the promise library
  313. newOptions.promiseLibrary = this.s.promiseLibrary;
  314. // Set raw if available at collection level
  315. if (newOptions.raw == null && typeof this.s.raw === 'boolean') newOptions.raw = this.s.raw;
  316. // Set promoteLongs if available at collection level
  317. if (newOptions.promoteLongs == null && typeof this.s.promoteLongs === 'boolean')
  318. newOptions.promoteLongs = this.s.promoteLongs;
  319. if (newOptions.promoteValues == null && typeof this.s.promoteValues === 'boolean')
  320. newOptions.promoteValues = this.s.promoteValues;
  321. if (newOptions.promoteBuffers == null && typeof this.s.promoteBuffers === 'boolean')
  322. newOptions.promoteBuffers = this.s.promoteBuffers;
  323. // Sort options
  324. if (findCommand.sort) {
  325. findCommand.sort = formattedOrderClause(findCommand.sort);
  326. }
  327. // Set the readConcern
  328. decorateWithReadConcern(findCommand, this, options);
  329. // Decorate find command with collation options
  330. decorateWithCollation(findCommand, this, options);
  331. // Create the cursor
  332. if (typeof callback === 'function')
  333. return handleCallback(
  334. callback,
  335. null,
  336. this.s.topology.cursor(this.s.namespace, findCommand, newOptions)
  337. );
  338. return this.s.topology.cursor(this.s.namespace, findCommand, newOptions);
  339. };
  340. /**
  341. * Inserts a single document into MongoDB. If documents passed in do not contain the **_id** field,
  342. * one will be added to each of the documents missing it by the driver, mutating the document. This behavior
  343. * can be overridden by setting the **forceServerObjectId** flag.
  344. *
  345. * @method
  346. * @param {object} doc Document to insert.
  347. * @param {object} [options=null] Optional settings.
  348. * @param {(number|string)} [options.w=null] The write concern.
  349. * @param {number} [options.wtimeout=null] The write concern timeout.
  350. * @param {boolean} [options.j=false] Specify a journal write concern.
  351. * @param {boolean} [options.serializeFunctions=false] Serialize functions on any object.
  352. * @param {boolean} [options.forceServerObjectId=false] Force server to assign _id values instead of driver.
  353. * @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
  354. * @param {ClientSession} [options.session] optional session to use for this operation
  355. * @param {Collection~insertOneWriteOpCallback} [callback] The command result callback
  356. * @return {Promise} returns Promise if no callback passed
  357. */
  358. Collection.prototype.insertOne = function(doc, options, callback) {
  359. if (typeof options === 'function') (callback = options), (options = {});
  360. options = options || {};
  361. // Add ignoreUndfined
  362. if (this.s.options.ignoreUndefined) {
  363. options = shallowClone(options);
  364. options.ignoreUndefined = this.s.options.ignoreUndefined;
  365. }
  366. return executeOperation(this.s.topology, insertOne, [this, doc, options, callback]);
  367. };
  368. var insertOne = function(self, doc, options, callback) {
  369. if (Array.isArray(doc)) {
  370. return callback(
  371. MongoError.create({ message: 'doc parameter must be an object', driver: true })
  372. );
  373. }
  374. insertDocuments(self, [doc], options, function(err, r) {
  375. if (callback == null) return;
  376. if (err && callback) return callback(err);
  377. // Workaround for pre 2.6 servers
  378. if (r == null) return callback(null, { result: { ok: 1 } });
  379. // Add values to top level to ensure crud spec compatibility
  380. r.insertedCount = r.result.n;
  381. r.insertedId = doc._id;
  382. if (callback) callback(null, r);
  383. });
  384. };
  385. var mapInserManyResults = function(docs, r) {
  386. var finalResult = {
  387. result: { ok: 1, n: r.insertedCount },
  388. ops: docs,
  389. insertedCount: r.insertedCount,
  390. insertedIds: r.insertedIds
  391. };
  392. if (r.getLastOp()) {
  393. finalResult.result.opTime = r.getLastOp();
  394. }
  395. return finalResult;
  396. };
  397. /**
  398. * Inserts an array of documents into MongoDB. If documents passed in do not contain the **_id** field,
  399. * one will be added to each of the documents missing it by the driver, mutating the document. This behavior
  400. * can be overridden by setting the **forceServerObjectId** flag.
  401. *
  402. * @method
  403. * @param {object[]} docs Documents to insert.
  404. * @param {object} [options=null] Optional settings.
  405. * @param {(number|string)} [options.w=null] The write concern.
  406. * @param {number} [options.wtimeout=null] The write concern timeout.
  407. * @param {boolean} [options.j=false] Specify a journal write concern.
  408. * @param {boolean} [options.serializeFunctions=false] Serialize functions on any object.
  409. * @param {boolean} [options.forceServerObjectId=false] Force server to assign _id values instead of driver.
  410. * @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
  411. * @param {boolean} [options.ordered=true] If true, when an insert fails, don't execute the remaining writes. If false, continue with remaining inserts when one fails.
  412. * @param {ClientSession} [options.session] optional session to use for this operation
  413. * @param {Collection~insertWriteOpCallback} [callback] The command result callback
  414. * @return {Promise} returns Promise if no callback passed
  415. */
  416. Collection.prototype.insertMany = function(docs, options, callback) {
  417. var self = this;
  418. if (typeof options === 'function') (callback = options), (options = {});
  419. options = options ? shallowClone(options) : { ordered: true };
  420. if (!Array.isArray(docs) && typeof callback === 'function') {
  421. return callback(
  422. MongoError.create({ message: 'docs parameter must be an array of documents', driver: true })
  423. );
  424. } else if (!Array.isArray(docs)) {
  425. return new this.s.promiseLibrary(function(resolve, reject) {
  426. reject(
  427. MongoError.create({ message: 'docs parameter must be an array of documents', driver: true })
  428. );
  429. });
  430. }
  431. // If keep going set unordered
  432. options['serializeFunctions'] = options['serializeFunctions'] || self.s.serializeFunctions;
  433. // Set up the force server object id
  434. var forceServerObjectId =
  435. typeof options.forceServerObjectId === 'boolean'
  436. ? options.forceServerObjectId
  437. : self.s.db.options.forceServerObjectId;
  438. // Do we want to force the server to assign the _id key
  439. if (forceServerObjectId !== true) {
  440. // Add _id if not specified
  441. for (var i = 0; i < docs.length; i++) {
  442. if (docs[i]._id == null) docs[i]._id = self.s.pkFactory.createPk();
  443. }
  444. }
  445. // Generate the bulk write operations
  446. var operations = [
  447. {
  448. insertMany: docs
  449. }
  450. ];
  451. return executeOperation(this.s.topology, bulkWrite, [this, operations, options, callback], {
  452. resultMutator: result => mapInserManyResults(docs, result)
  453. });
  454. };
  455. /**
  456. * @typedef {Object} Collection~BulkWriteOpResult
  457. * @property {number} insertedCount Number of documents inserted.
  458. * @property {number} matchedCount Number of documents matched for update.
  459. * @property {number} modifiedCount Number of documents modified.
  460. * @property {number} deletedCount Number of documents deleted.
  461. * @property {number} upsertedCount Number of documents upserted.
  462. * @property {object} insertedIds Inserted document generated Id's, hash key is the index of the originating operation
  463. * @property {object} upsertedIds Upserted document generated Id's, hash key is the index of the originating operation
  464. * @property {object} result The command result object.
  465. */
  466. /**
  467. * The callback format for inserts
  468. * @callback Collection~bulkWriteOpCallback
  469. * @param {BulkWriteError} error An error instance representing the error during the execution.
  470. * @param {Collection~BulkWriteOpResult} result The result object if the command was executed successfully.
  471. */
  472. /**
  473. * Perform a bulkWrite operation without a fluent API
  474. *
  475. * Legal operation types are
  476. *
  477. * { insertOne: { document: { a: 1 } } }
  478. *
  479. * { updateOne: { filter: {a:2}, update: {$set: {a:2}}, upsert:true } }
  480. *
  481. * { updateMany: { filter: {a:2}, update: {$set: {a:2}}, upsert:true } }
  482. *
  483. * { deleteOne: { filter: {c:1} } }
  484. *
  485. * { deleteMany: { filter: {c:1} } }
  486. *
  487. * { replaceOne: { filter: {c:3}, replacement: {c:4}, upsert:true}}
  488. *
  489. * If documents passed in do not contain the **_id** field,
  490. * one will be added to each of the documents missing it by the driver, mutating the document. This behavior
  491. * can be overridden by setting the **forceServerObjectId** flag.
  492. *
  493. * @method
  494. * @param {object[]} operations Bulk operations to perform.
  495. * @param {object} [options=null] Optional settings.
  496. * @param {(number|string)} [options.w=null] The write concern.
  497. * @param {number} [options.wtimeout=null] The write concern timeout.
  498. * @param {boolean} [options.j=false] Specify a journal write concern.
  499. * @param {boolean} [options.serializeFunctions=false] Serialize functions on any object.
  500. * @param {boolean} [options.ordered=true] Execute write operation in ordered or unordered fashion.
  501. * @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
  502. * @param {ClientSession} [options.session] optional session to use for this operation
  503. * @param {Collection~bulkWriteOpCallback} [callback] The command result callback
  504. * @return {Promise} returns Promise if no callback passed
  505. */
  506. Collection.prototype.bulkWrite = function(operations, options, callback) {
  507. if (typeof options === 'function') (callback = options), (options = {});
  508. options = options || { ordered: true };
  509. if (!Array.isArray(operations)) {
  510. throw MongoError.create({ message: 'operations must be an array of documents', driver: true });
  511. }
  512. return executeOperation(this.s.topology, bulkWrite, [this, operations, options, callback]);
  513. };
  514. var bulkWrite = function(self, operations, options, callback) {
  515. // Add ignoreUndfined
  516. if (self.s.options.ignoreUndefined) {
  517. options = shallowClone(options);
  518. options.ignoreUndefined = self.s.options.ignoreUndefined;
  519. }
  520. // Create the bulk operation
  521. var bulk =
  522. options.ordered === true || options.ordered == null
  523. ? self.initializeOrderedBulkOp(options)
  524. : self.initializeUnorderedBulkOp(options);
  525. // Do we have a collation
  526. var collation = false;
  527. // for each op go through and add to the bulk
  528. try {
  529. for (var i = 0; i < operations.length; i++) {
  530. // Get the operation type
  531. var key = Object.keys(operations[i])[0];
  532. // Check if we have a collation
  533. if (operations[i][key].collation) {
  534. collation = true;
  535. }
  536. // Pass to the raw bulk
  537. bulk.raw(operations[i]);
  538. }
  539. } catch (err) {
  540. return callback(err, null);
  541. }
  542. // Final options for write concern
  543. var finalOptions = applyWriteConcern(
  544. shallowClone(options),
  545. { db: self.s.db, collection: self },
  546. options
  547. );
  548. var writeCon = finalOptions.writeConcern ? finalOptions.writeConcern : {};
  549. var capabilities = self.s.topology.capabilities();
  550. // Did the user pass in a collation, check if our write server supports it
  551. if (collation && capabilities && !capabilities.commandsTakeCollation) {
  552. return callback(new MongoError(f('server/primary/mongos does not support collation')));
  553. }
  554. // Execute the bulk
  555. bulk.execute(writeCon, finalOptions, function(err, r) {
  556. // We have connection level error
  557. if (!r && err) {
  558. return callback(err, null);
  559. }
  560. r.insertedCount = r.nInserted;
  561. r.matchedCount = r.nMatched;
  562. r.modifiedCount = r.nModified || 0;
  563. r.deletedCount = r.nRemoved;
  564. r.upsertedCount = r.getUpsertedIds().length;
  565. r.upsertedIds = {};
  566. r.insertedIds = {};
  567. // Update the n
  568. r.n = r.insertedCount;
  569. // Inserted documents
  570. var inserted = r.getInsertedIds();
  571. // Map inserted ids
  572. for (var i = 0; i < inserted.length; i++) {
  573. r.insertedIds[inserted[i].index] = inserted[i]._id;
  574. }
  575. // Upserted documents
  576. var upserted = r.getUpsertedIds();
  577. // Map upserted ids
  578. for (i = 0; i < upserted.length; i++) {
  579. r.upsertedIds[upserted[i].index] = upserted[i]._id;
  580. }
  581. // Return the results
  582. callback(null, r);
  583. });
  584. };
  585. var insertDocuments = function(self, docs, options, callback) {
  586. if (typeof options === 'function') (callback = options), (options = {});
  587. options = options || {};
  588. // Ensure we are operating on an array op docs
  589. docs = Array.isArray(docs) ? docs : [docs];
  590. // Get the write concern options
  591. var finalOptions = applyWriteConcern(
  592. shallowClone(options),
  593. { db: self.s.db, collection: self },
  594. options
  595. );
  596. // If keep going set unordered
  597. if (finalOptions.keepGoing === true) finalOptions.ordered = false;
  598. finalOptions['serializeFunctions'] = options['serializeFunctions'] || self.s.serializeFunctions;
  599. // Set up the force server object id
  600. var forceServerObjectId =
  601. typeof options.forceServerObjectId === 'boolean'
  602. ? options.forceServerObjectId
  603. : self.s.db.options.forceServerObjectId;
  604. // Add _id if not specified
  605. if (forceServerObjectId !== true) {
  606. for (var i = 0; i < docs.length; i++) {
  607. if (docs[i]._id === void 0) docs[i]._id = self.s.pkFactory.createPk();
  608. }
  609. }
  610. // File inserts
  611. self.s.topology.insert(self.s.namespace, docs, finalOptions, function(err, result) {
  612. if (callback == null) return;
  613. if (err) return handleCallback(callback, err);
  614. if (result == null) return handleCallback(callback, null, null);
  615. if (result.result.code) return handleCallback(callback, toError(result.result));
  616. if (result.result.writeErrors)
  617. return handleCallback(callback, toError(result.result.writeErrors[0]));
  618. // Add docs to the list
  619. result.ops = docs;
  620. // Return the results
  621. handleCallback(callback, null, result);
  622. });
  623. };
  624. /**
  625. * @typedef {Object} Collection~WriteOpResult
  626. * @property {object[]} ops All the documents inserted using insertOne/insertMany/replaceOne. Documents contain the _id field if forceServerObjectId == false for insertOne/insertMany
  627. * @property {object} connection The connection object used for the operation.
  628. * @property {object} result The command result object.
  629. */
  630. /**
  631. * The callback format for inserts
  632. * @callback Collection~writeOpCallback
  633. * @param {MongoError} error An error instance representing the error during the execution.
  634. * @param {Collection~WriteOpResult} result The result object if the command was executed successfully.
  635. */
  636. /**
  637. * @typedef {Object} Collection~insertWriteOpResult
  638. * @property {Number} insertedCount The total amount of documents inserted.
  639. * @property {object[]} ops All the documents inserted using insertOne/insertMany/replaceOne. Documents contain the _id field if forceServerObjectId == false for insertOne/insertMany
  640. * @property {Object.<Number, ObjectId>} insertedIds Map of the index of the inserted document to the id of the inserted document.
  641. * @property {object} connection The connection object used for the operation.
  642. * @property {object} result The raw command result object returned from MongoDB (content might vary by server version).
  643. * @property {Number} result.ok Is 1 if the command executed correctly.
  644. * @property {Number} result.n The total count of documents inserted.
  645. */
  646. /**
  647. * @typedef {Object} Collection~insertOneWriteOpResult
  648. * @property {Number} insertedCount The total amount of documents inserted.
  649. * @property {object[]} ops All the documents inserted using insertOne/insertMany/replaceOne. Documents contain the _id field if forceServerObjectId == false for insertOne/insertMany
  650. * @property {ObjectId} insertedId The driver generated ObjectId for the insert operation.
  651. * @property {object} connection The connection object used for the operation.
  652. * @property {object} result The raw command result object returned from MongoDB (content might vary by server version).
  653. * @property {Number} result.ok Is 1 if the command executed correctly.
  654. * @property {Number} result.n The total count of documents inserted.
  655. */
  656. /**
  657. * The callback format for inserts
  658. * @callback Collection~insertWriteOpCallback
  659. * @param {MongoError} error An error instance representing the error during the execution.
  660. * @param {Collection~insertWriteOpResult} result The result object if the command was executed successfully.
  661. */
  662. /**
  663. * The callback format for inserts
  664. * @callback Collection~insertOneWriteOpCallback
  665. * @param {MongoError} error An error instance representing the error during the execution.
  666. * @param {Collection~insertOneWriteOpResult} result The result object if the command was executed successfully.
  667. */
  668. /**
  669. * Inserts a single document or a an array of documents into MongoDB. If documents passed in do not contain the **_id** field,
  670. * one will be added to each of the documents missing it by the driver, mutating the document. This behavior
  671. * can be overridden by setting the **forceServerObjectId** flag.
  672. *
  673. * @method
  674. * @param {(object|object[])} docs Documents to insert.
  675. * @param {object} [options=null] Optional settings.
  676. * @param {(number|string)} [options.w=null] The write concern.
  677. * @param {number} [options.wtimeout=null] The write concern timeout.
  678. * @param {boolean} [options.j=false] Specify a journal write concern.
  679. * @param {boolean} [options.serializeFunctions=false] Serialize functions on any object.
  680. * @param {boolean} [options.forceServerObjectId=false] Force server to assign _id values instead of driver.
  681. * @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
  682. * @param {ClientSession} [options.session] optional session to use for this operation
  683. * @param {Collection~insertWriteOpCallback} [callback] The command result callback
  684. * @return {Promise} returns Promise if no callback passed
  685. * @deprecated Use insertOne, insertMany or bulkWrite
  686. */
  687. Collection.prototype.insert = function(docs, options, callback) {
  688. if (typeof options === 'function') (callback = options), (options = {});
  689. options = options || { ordered: false };
  690. docs = !Array.isArray(docs) ? [docs] : docs;
  691. if (options.keepGoing === true) {
  692. options.ordered = false;
  693. }
  694. return this.insertMany(docs, options, callback);
  695. };
  696. /**
  697. * @typedef {Object} Collection~updateWriteOpResult
  698. * @property {Object} result The raw result returned from MongoDB, field will vary depending on server version.
  699. * @property {Number} result.ok Is 1 if the command executed correctly.
  700. * @property {Number} result.n The total count of documents scanned.
  701. * @property {Number} result.nModified The total count of documents modified.
  702. * @property {Object} connection The connection object used for the operation.
  703. * @property {Number} matchedCount The number of documents that matched the filter.
  704. * @property {Number} modifiedCount The number of documents that were modified.
  705. * @property {Number} upsertedCount The number of documents upserted.
  706. * @property {Object} upsertedId The upserted id.
  707. * @property {ObjectId} upsertedId._id The upserted _id returned from the server.
  708. */
  709. /**
  710. * The callback format for inserts
  711. * @callback Collection~updateWriteOpCallback
  712. * @param {MongoError} error An error instance representing the error during the execution.
  713. * @param {Collection~updateWriteOpResult} result The result object if the command was executed successfully.
  714. */
  715. /**
  716. * Update a single document on MongoDB
  717. * @method
  718. * @param {object} filter The Filter used to select the document to update
  719. * @param {object} update The update operations to be applied to the document
  720. * @param {object} [options=null] Optional settings.
  721. * @param {boolean} [options.upsert=false] Update operation is an upsert.
  722. * @param {(number|string)} [options.w=null] The write concern.
  723. * @param {number} [options.wtimeout=null] The write concern timeout.
  724. * @param {boolean} [options.j=false] Specify a journal write concern.
  725. * @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
  726. * @param {Array} [options.arrayFilters=null] optional list of array filters referenced in filtered positional operators
  727. * @param {ClientSession} [options.session] optional session to use for this operation
  728. * @param {Collection~updateWriteOpCallback} [callback] The command result callback
  729. * @return {Promise} returns Promise if no callback passed
  730. */
  731. Collection.prototype.updateOne = function(filter, update, options, callback) {
  732. if (typeof options === 'function') (callback = options), (options = {});
  733. options = options || {};
  734. var err = checkForAtomicOperators(update);
  735. if (err) {
  736. if (typeof callback === 'function') return callback(err);
  737. return this.s.promiseLibrary.reject(err);
  738. }
  739. options = shallowClone(options);
  740. // Add ignoreUndfined
  741. if (this.s.options.ignoreUndefined) {
  742. options = shallowClone(options);
  743. options.ignoreUndefined = this.s.options.ignoreUndefined;
  744. }
  745. return executeOperation(this.s.topology, updateOne, [this, filter, update, options, callback]);
  746. };
  747. var checkForAtomicOperators = function(update) {
  748. var keys = Object.keys(update);
  749. // same errors as the server would give for update doc lacking atomic operators
  750. if (keys.length === 0) {
  751. return toError('The update operation document must contain at least one atomic operator.');
  752. }
  753. if (keys[0][0] !== '$') {
  754. return toError('the update operation document must contain atomic operators.');
  755. }
  756. };
  757. var updateOne = function(self, filter, update, options, callback) {
  758. // Set single document update
  759. options.multi = false;
  760. // Execute update
  761. updateDocuments(self, filter, update, options, function(err, r) {
  762. if (callback == null) return;
  763. if (err && callback) return callback(err);
  764. if (r == null) return callback(null, { result: { ok: 1 } });
  765. r.modifiedCount = r.result.nModified != null ? r.result.nModified : r.result.n;
  766. r.upsertedId =
  767. Array.isArray(r.result.upserted) && r.result.upserted.length > 0
  768. ? r.result.upserted[0]
  769. : null;
  770. r.upsertedCount =
  771. Array.isArray(r.result.upserted) && r.result.upserted.length ? r.result.upserted.length : 0;
  772. r.matchedCount =
  773. Array.isArray(r.result.upserted) && r.result.upserted.length > 0 ? 0 : r.result.n;
  774. if (callback) callback(null, r);
  775. });
  776. };
  777. /**
  778. * Replace a document on MongoDB
  779. * @method
  780. * @param {object} filter The Filter used to select the document to update
  781. * @param {object} doc The Document that replaces the matching document
  782. * @param {object} [options=null] Optional settings.
  783. * @param {boolean} [options.upsert=false] Update operation is an upsert.
  784. * @param {(number|string)} [options.w=null] The write concern.
  785. * @param {number} [options.wtimeout=null] The write concern timeout.
  786. * @param {boolean} [options.j=false] Specify a journal write concern.
  787. * @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
  788. * @param {ClientSession} [options.session] optional session to use for this operation
  789. * @param {Collection~updateWriteOpCallback} [callback] The command result callback
  790. * @return {Promise} returns Promise if no callback passed
  791. */
  792. Collection.prototype.replaceOne = function(filter, doc, options, callback) {
  793. if (typeof options === 'function') (callback = options), (options = {});
  794. options = shallowClone(options);
  795. // Add ignoreUndfined
  796. if (this.s.options.ignoreUndefined) {
  797. options = shallowClone(options);
  798. options.ignoreUndefined = this.s.options.ignoreUndefined;
  799. }
  800. return executeOperation(this.s.topology, replaceOne, [this, filter, doc, options, callback]);
  801. };
  802. var replaceOne = function(self, filter, doc, options, callback) {
  803. // Set single document update
  804. options.multi = false;
  805. // Execute update
  806. updateDocuments(self, filter, doc, options, function(err, r) {
  807. if (callback == null) return;
  808. if (err && callback) return callback(err);
  809. if (r == null) return callback(null, { result: { ok: 1 } });
  810. r.modifiedCount = r.result.nModified != null ? r.result.nModified : r.result.n;
  811. r.upsertedId =
  812. Array.isArray(r.result.upserted) && r.result.upserted.length > 0
  813. ? r.result.upserted[0]
  814. : null;
  815. r.upsertedCount =
  816. Array.isArray(r.result.upserted) && r.result.upserted.length ? r.result.upserted.length : 0;
  817. r.matchedCount =
  818. Array.isArray(r.result.upserted) && r.result.upserted.length > 0 ? 0 : r.result.n;
  819. r.ops = [doc];
  820. if (callback) callback(null, r);
  821. });
  822. };
  823. /**
  824. * Update multiple documents on MongoDB
  825. * @method
  826. * @param {object} filter The Filter used to select the documents to update
  827. * @param {object} update The update operations to be applied to the document
  828. * @param {object} [options=null] Optional settings.
  829. * @param {boolean} [options.upsert=false] Update operation is an upsert.
  830. * @param {(number|string)} [options.w=null] The write concern.
  831. * @param {number} [options.wtimeout=null] The write concern timeout.
  832. * @param {boolean} [options.j=false] Specify a journal write concern.
  833. * @param {Array} [options.arrayFilters=null] optional list of array filters referenced in filtered positional operators
  834. * @param {ClientSession} [options.session] optional session to use for this operation
  835. * @param {Collection~updateWriteOpCallback} [callback] The command result callback
  836. * @return {Promise} returns Promise if no callback passed
  837. */
  838. Collection.prototype.updateMany = function(filter, update, options, callback) {
  839. if (typeof options === 'function') (callback = options), (options = {});
  840. options = options || {};
  841. var err = checkForAtomicOperators(update);
  842. if (err) {
  843. if (typeof callback === 'function') return callback(err);
  844. return this.s.promiseLibrary.reject(err);
  845. }
  846. options = shallowClone(options);
  847. // Add ignoreUndfined
  848. if (this.s.options.ignoreUndefined) {
  849. options = shallowClone(options);
  850. options.ignoreUndefined = this.s.options.ignoreUndefined;
  851. }
  852. return executeOperation(this.s.topology, updateMany, [this, filter, update, options, callback]);
  853. };
  854. var updateMany = function(self, filter, update, options, callback) {
  855. // Set single document update
  856. options.multi = true;
  857. // Execute update
  858. updateDocuments(self, filter, update, options, function(err, r) {
  859. if (callback == null) return;
  860. if (err && callback) return callback(err);
  861. if (r == null) return callback(null, { result: { ok: 1 } });
  862. r.modifiedCount = r.result.nModified != null ? r.result.nModified : r.result.n;
  863. r.upsertedId =
  864. Array.isArray(r.result.upserted) && r.result.upserted.length > 0
  865. ? r.result.upserted[0]
  866. : null;
  867. r.upsertedCount =
  868. Array.isArray(r.result.upserted) && r.result.upserted.length ? r.result.upserted.length : 0;
  869. r.matchedCount =
  870. Array.isArray(r.result.upserted) && r.result.upserted.length > 0 ? 0 : r.result.n;
  871. if (callback) callback(null, r);
  872. });
  873. };
  874. var updateDocuments = function(self, selector, document, options, callback) {
  875. if ('function' === typeof options) (callback = options), (options = null);
  876. if (options == null) options = {};
  877. if (!('function' === typeof callback)) callback = null;
  878. // If we are not providing a selector or document throw
  879. if (selector == null || typeof selector !== 'object')
  880. return callback(toError('selector must be a valid JavaScript object'));
  881. if (document == null || typeof document !== 'object')
  882. return callback(toError('document must be a valid JavaScript object'));
  883. // Get the write concern options
  884. var finalOptions = applyWriteConcern(
  885. shallowClone(options),
  886. { db: self.s.db, collection: self },
  887. options
  888. );
  889. // Do we return the actual result document
  890. // Either use override on the function, or go back to default on either the collection
  891. // level or db
  892. finalOptions['serializeFunctions'] = options['serializeFunctions'] || self.s.serializeFunctions;
  893. // Execute the operation
  894. var op = { q: selector, u: document };
  895. op.upsert = options.upsert !== void 0 ? !!options.upsert : false;
  896. op.multi = options.multi !== void 0 ? !!options.multi : false;
  897. if (finalOptions.arrayFilters) {
  898. op.arrayFilters = finalOptions.arrayFilters;
  899. delete finalOptions.arrayFilters;
  900. }
  901. if (finalOptions.retryWrites && op.multi) {
  902. finalOptions.retryWrites = false;
  903. }
  904. // Have we specified collation
  905. decorateWithCollation(finalOptions, self, options);
  906. // Update options
  907. self.s.topology.update(self.s.namespace, [op], finalOptions, function(err, result) {
  908. if (callback == null) return;
  909. if (err) return handleCallback(callback, err, null);
  910. if (result == null) return handleCallback(callback, null, null);
  911. if (result.result.code) return handleCallback(callback, toError(result.result));
  912. if (result.result.writeErrors)
  913. return handleCallback(callback, toError(result.result.writeErrors[0]));
  914. // Return the results
  915. handleCallback(callback, null, result);
  916. });
  917. };
  918. /**
  919. * Updates documents.
  920. * @method
  921. * @param {object} selector The selector for the update operation.
  922. * @param {object} document The update document.
  923. * @param {object} [options=null] Optional settings.
  924. * @param {(number|string)} [options.w=null] The write concern.
  925. * @param {number} [options.wtimeout=null] The write concern timeout.
  926. * @param {boolean} [options.j=false] Specify a journal write concern.
  927. * @param {boolean} [options.upsert=false] Update operation is an upsert.
  928. * @param {boolean} [options.multi=false] Update one/all documents with operation.
  929. * @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
  930. * @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
  931. * @param {Array} [options.arrayFilters=null] optional list of array filters referenced in filtered positional operators
  932. * @param {ClientSession} [options.session] optional session to use for this operation
  933. * @param {Collection~writeOpCallback} [callback] The command result callback
  934. * @throws {MongoError}
  935. * @return {Promise} returns Promise if no callback passed
  936. * @deprecated use updateOne, updateMany or bulkWrite
  937. */
  938. Collection.prototype.update = function(selector, document, options, callback) {
  939. if (typeof options === 'function') (callback = options), (options = {});
  940. options = options || {};
  941. // Add ignoreUndfined
  942. if (this.s.options.ignoreUndefined) {
  943. options = shallowClone(options);
  944. options.ignoreUndefined = this.s.options.ignoreUndefined;
  945. }
  946. return executeOperation(this.s.topology, updateDocuments, [
  947. this,
  948. selector,
  949. document,
  950. options,
  951. callback
  952. ]);
  953. };
  954. /**
  955. * @typedef {Object} Collection~deleteWriteOpResult
  956. * @property {Object} result The raw result returned from MongoDB, field will vary depending on server version.
  957. * @property {Number} result.ok Is 1 if the command executed correctly.
  958. * @property {Number} result.n The total count of documents deleted.
  959. * @property {Object} connection The connection object used for the operation.
  960. * @property {Number} deletedCount The number of documents deleted.
  961. */
  962. /**
  963. * The callback format for inserts
  964. * @callback Collection~deleteWriteOpCallback
  965. * @param {MongoError} error An error instance representing the error during the execution.
  966. * @param {Collection~deleteWriteOpResult} result The result object if the command was executed successfully.
  967. */
  968. /**
  969. * Delete a document on MongoDB
  970. * @method
  971. * @param {object} filter The Filter used to select the document to remove
  972. * @param {object} [options=null] Optional settings.
  973. * @param {(number|string)} [options.w=null] The write concern.
  974. * @param {number} [options.wtimeout=null] The write concern timeout.
  975. * @param {boolean} [options.j=false] Specify a journal write concern.
  976. * @param {ClientSession} [options.session] optional session to use for this operation
  977. * @param {Collection~deleteWriteOpCallback} [callback] The command result callback
  978. * @return {Promise} returns Promise if no callback passed
  979. */
  980. Collection.prototype.deleteOne = function(filter, options, callback) {
  981. if (typeof options === 'function') (callback = options), (options = {});
  982. options = shallowClone(options);
  983. // Add ignoreUndfined
  984. if (this.s.options.ignoreUndefined) {
  985. options = shallowClone(options);
  986. options.ignoreUndefined = this.s.options.ignoreUndefined;
  987. }
  988. return executeOperation(this.s.topology, deleteOne, [this, filter, options, callback]);
  989. };
  990. var deleteOne = function(self, filter, options, callback) {
  991. options.single = true;
  992. removeDocuments(self, filter, options, function(err, r) {
  993. if (callback == null) return;
  994. if (err && callback) return callback(err);
  995. if (r == null) return callback(null, { result: { ok: 1 } });
  996. r.deletedCount = r.result.n;
  997. if (callback) callback(null, r);
  998. });
  999. };
  1000. Collection.prototype.removeOne = Collection.prototype.deleteOne;
  1001. /**
  1002. * Delete multiple documents on MongoDB
  1003. * @method
  1004. * @param {object} filter The Filter used to select the documents to remove
  1005. * @param {object} [options=null] Optional settings.
  1006. * @param {(number|string)} [options.w=null] The write concern.
  1007. * @param {number} [options.wtimeout=null] The write concern timeout.
  1008. * @param {boolean} [options.j=false] Specify a journal write concern.
  1009. * @param {ClientSession} [options.session] optional session to use for this operation
  1010. * @param {Collection~deleteWriteOpCallback} [callback] The command result callback
  1011. * @return {Promise} returns Promise if no callback passed
  1012. */
  1013. Collection.prototype.deleteMany = function(filter, options, callback) {
  1014. if (typeof options === 'function') (callback = options), (options = {});
  1015. options = shallowClone(options);
  1016. // Add ignoreUndfined
  1017. if (this.s.options.ignoreUndefined) {
  1018. options = shallowClone(options);
  1019. options.ignoreUndefined = this.s.options.ignoreUndefined;
  1020. }
  1021. return executeOperation(this.s.topology, deleteMany, [this, filter, options, callback]);
  1022. };
  1023. var deleteMany = function(self, filter, options, callback) {
  1024. options.single = false;
  1025. removeDocuments(self, filter, options, function(err, r) {
  1026. if (callback == null) return;
  1027. if (err && callback) return callback(err);
  1028. if (r == null) return callback(null, { result: { ok: 1 } });
  1029. r.deletedCount = r.result.n;
  1030. if (callback) callback(null, r);
  1031. });
  1032. };
  1033. var removeDocuments = function(self, selector, options, callback) {
  1034. if (typeof options === 'function') {
  1035. (callback = options), (options = {});
  1036. } else if (typeof selector === 'function') {
  1037. callback = selector;
  1038. options = {};
  1039. selector = {};
  1040. }
  1041. // Create an empty options object if the provided one is null
  1042. options = options || {};
  1043. // Get the write concern options
  1044. var finalOptions = applyWriteConcern(
  1045. shallowClone(options),
  1046. { db: self.s.db, collection: self },
  1047. options
  1048. );
  1049. // If selector is null set empty
  1050. if (selector == null) selector = {};
  1051. // Build the op
  1052. var op = { q: selector, limit: 0 };
  1053. if (options.single) {
  1054. op.limit = 1;
  1055. } else if (finalOptions.retryWrites) {
  1056. finalOptions.retryWrites = false;
  1057. }
  1058. // Have we specified collation
  1059. decorateWithCollation(finalOptions, self, options);
  1060. // Execute the remove
  1061. self.s.topology.remove(self.s.namespace, [op], finalOptions, function(err, result) {
  1062. if (callback == null) return;
  1063. if (err) return handleCallback(callback, err, null);
  1064. if (result == null) return handleCallback(callback, null, null);
  1065. if (result.result.code) return handleCallback(callback, toError(result.result));
  1066. if (result.result.writeErrors)
  1067. return handleCallback(callback, toError(result.result.writeErrors[0]));
  1068. // Return the results
  1069. handleCallback(callback, null, result);
  1070. });
  1071. };
  1072. Collection.prototype.removeMany = Collection.prototype.deleteMany;
  1073. /**
  1074. * Remove documents.
  1075. * @method
  1076. * @param {object} selector The selector for the update operation.
  1077. * @param {object} [options=null] Optional settings.
  1078. * @param {(number|string)} [options.w=null] The write concern.
  1079. * @param {number} [options.wtimeout=null] The write concern timeout.
  1080. * @param {boolean} [options.j=false] Specify a journal write concern.
  1081. * @param {boolean} [options.single=false] Removes the first document found.
  1082. * @param {ClientSession} [options.session] optional session to use for this operation
  1083. * @param {Collection~writeOpCallback} [callback] The command result callback
  1084. * @return {Promise} returns Promise if no callback passed
  1085. * @deprecated use deleteOne, deleteMany or bulkWrite
  1086. */
  1087. Collection.prototype.remove = function(selector, options, callback) {
  1088. if (typeof options === 'function') (callback = options), (options = {});
  1089. options = options || {};
  1090. // Add ignoreUndfined
  1091. if (this.s.options.ignoreUndefined) {
  1092. options = shallowClone(options);
  1093. options.ignoreUndefined = this.s.options.ignoreUndefined;
  1094. }
  1095. return executeOperation(this.s.topology, removeDocuments, [this, selector, options, callback]);
  1096. };
  1097. /**
  1098. * Save a document. Simple full document replacement function. Not recommended for efficiency, use atomic
  1099. * operators and update instead for more efficient operations.
  1100. * @method
  1101. * @param {object} doc Document to save
  1102. * @param {object} [options=null] Optional settings.
  1103. * @param {(number|string)} [options.w=null] The write concern.
  1104. * @param {number} [options.wtimeout=null] The write concern timeout.
  1105. * @param {boolean} [options.j=false] Specify a journal write concern.
  1106. * @param {ClientSession} [options.session] optional session to use for this operation
  1107. * @param {Collection~writeOpCallback} [callback] The command result callback
  1108. * @return {Promise} returns Promise if no callback passed
  1109. * @deprecated use insertOne, insertMany, updateOne or updateMany
  1110. */
  1111. Collection.prototype.save = function(doc, options, callback) {
  1112. if (typeof options === 'function') (callback = options), (options = {});
  1113. options = options || {};
  1114. // Add ignoreUndfined
  1115. if (this.s.options.ignoreUndefined) {
  1116. options = shallowClone(options);
  1117. options.ignoreUndefined = this.s.options.ignoreUndefined;
  1118. }
  1119. return executeOperation(this.s.topology, save, [this, doc, options, callback]);
  1120. };
  1121. var save = function(self, doc, options, callback) {
  1122. // Get the write concern options
  1123. var finalOptions = applyWriteConcern(
  1124. shallowClone(options),
  1125. { db: self.s.db, collection: self },
  1126. options
  1127. );
  1128. // Establish if we need to perform an insert or update
  1129. if (doc._id != null) {
  1130. finalOptions.upsert = true;
  1131. return updateDocuments(self, { _id: doc._id }, doc, finalOptions, callback);
  1132. }
  1133. // Insert the document
  1134. insertDocuments(self, [doc], finalOptions, function(err, r) {
  1135. if (callback == null) return;
  1136. if (doc == null) return handleCallback(callback, null, null);
  1137. if (err) return handleCallback(callback, err, null);
  1138. handleCallback(callback, null, r);
  1139. });
  1140. };
  1141. /**
  1142. * The callback format for results
  1143. * @callback Collection~resultCallback
  1144. * @param {MongoError} error An error instance representing the error during the execution.
  1145. * @param {object} result The result object if the command was executed successfully.
  1146. */
  1147. /**
  1148. * The callback format for an aggregation call
  1149. * @callback Collection~aggregationCallback
  1150. * @param {MongoError} error An error instance representing the error during the execution.
  1151. * @param {AggregationCursor} cursor The cursor if the aggregation command was executed successfully.
  1152. */
  1153. /**
  1154. * Fetches the first document that matches the query
  1155. * @method
  1156. * @param {object} query Query for find Operation
  1157. * @param {object} [options=null] Optional settings.
  1158. * @param {number} [options.limit=0] Sets the limit of documents returned in the query.
  1159. * @param {(array|object)} [options.sort=null] Set to sort the documents coming back from the query. Array of indexes, [['a', 1]] etc.
  1160. * @param {object} [options.projection=null] The fields to return in the query. Object of fields to include or exclude (not both), {'a':1}
  1161. * @param {object} [options.fields=null] **Deprecated** Use `options.projection` instead
  1162. * @param {number} [options.skip=0] Set to skip N documents ahead in your query (useful for pagination).
  1163. * @param {Object} [options.hint=null] Tell the query to use specific indexes in the query. Object of indexes to use, {'_id':1}
  1164. * @param {boolean} [options.explain=false] Explain the query instead of returning the data.
  1165. * @param {boolean} [options.snapshot=false] Snapshot query.
  1166. * @param {boolean} [options.timeout=false] Specify if the cursor can timeout.
  1167. * @param {boolean} [options.tailable=false] Specify if the cursor is tailable.
  1168. * @param {number} [options.batchSize=0] Set the batchSize for the getMoreCommand when iterating over the query results.
  1169. * @param {boolean} [options.returnKey=false] Only return the index key.
  1170. * @param {number} [options.maxScan=null] Limit the number of items to scan.
  1171. * @param {number} [options.min=null] Set index bounds.
  1172. * @param {number} [options.max=null] Set index bounds.
  1173. * @param {boolean} [options.showDiskLoc=false] Show disk location of results.
  1174. * @param {string} [options.comment=null] You can put a $comment field on a query to make looking in the profiler logs simpler.
  1175. * @param {boolean} [options.raw=false] Return document results as raw BSON buffers.
  1176. * @param {boolean} [options.promoteLongs=true] Promotes Long values to number if they fit inside the 53 bits resolution.
  1177. * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
  1178. * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
  1179. * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
  1180. * @param {boolean} [options.partial=false] Specify if the cursor should return partial results when querying against a sharded system
  1181. * @param {number} [options.maxTimeMS=null] Number of miliseconds to wait before aborting the query.
  1182. * @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
  1183. * @param {ClientSession} [options.session] optional session to use for this operation
  1184. * @param {Collection~resultCallback} [callback] The command result callback
  1185. * @return {Promise} returns Promise if no callback passed
  1186. */
  1187. Collection.prototype.findOne = function(query, options, callback) {
  1188. if (typeof query === 'function') (callback = query), (query = {}), (options = {});
  1189. if (typeof options === 'function') (callback = options), (options = {});
  1190. query = query || {};
  1191. options = options || {};
  1192. return executeOperation(this.s.topology, findOne, [this, query, options, callback]);
  1193. };
  1194. var findOne = function(self, query, options, callback) {
  1195. const cursor = self
  1196. .find(query, options)
  1197. .limit(-1)
  1198. .batchSize(1);
  1199. // Return the item
  1200. cursor.next(function(err, item) {
  1201. if (err != null) return handleCallback(callback, toError(err), null);
  1202. handleCallback(callback, null, item);
  1203. });
  1204. };
  1205. /**
  1206. * The callback format for the collection method, must be used if strict is specified
  1207. * @callback Collection~collectionResultCallback
  1208. * @param {MongoError} error An error instance representing the error during the execution.
  1209. * @param {Collection} collection The collection instance.
  1210. */
  1211. /**
  1212. * Rename the collection.
  1213. *
  1214. * @method
  1215. * @param {string} newName New name of of the collection.
  1216. * @param {object} [options=null] Optional settings.
  1217. * @param {boolean} [options.dropTarget=false] Drop the target name collection if it previously exists.
  1218. * @param {ClientSession} [options.session] optional session to use for this operation
  1219. * @param {Collection~collectionResultCallback} [callback] The results callback
  1220. * @return {Promise} returns Promise if no callback passed
  1221. */
  1222. Collection.prototype.rename = function(newName, options, callback) {
  1223. if (typeof options === 'function') (callback = options), (options = {});
  1224. options = Object.assign({}, options, { readPreference: ReadPreference.PRIMARY });
  1225. return executeOperation(this.s.topology, rename, [this, newName, options, callback]);
  1226. };
  1227. var rename = function(self, newName, options, callback) {
  1228. // Check the collection name
  1229. checkCollectionName(newName);
  1230. // Build the command
  1231. var renameCollection = f('%s.%s', self.s.dbName, self.s.name);
  1232. var toCollection = f('%s.%s', self.s.dbName, newName);
  1233. var dropTarget = typeof options.dropTarget === 'boolean' ? options.dropTarget : false;
  1234. var cmd = { renameCollection: renameCollection, to: toCollection, dropTarget: dropTarget };
  1235. // Decorate command with writeConcern if supported
  1236. applyWriteConcern(cmd, { db: self.s.db, collection: self }, options);
  1237. // Execute against admin
  1238. self.s.db.admin().command(cmd, options, function(err, doc) {
  1239. if (err) return handleCallback(callback, err, null);
  1240. // We have an error
  1241. if (doc.errmsg) return handleCallback(callback, toError(doc), null);
  1242. try {
  1243. return handleCallback(
  1244. callback,
  1245. null,
  1246. new Collection(
  1247. self.s.db,
  1248. self.s.topology,
  1249. self.s.dbName,
  1250. newName,
  1251. self.s.pkFactory,
  1252. self.s.options
  1253. )
  1254. );
  1255. } catch (err) {
  1256. return handleCallback(callback, toError(err), null);
  1257. }
  1258. });
  1259. };
  1260. /**
  1261. * Drop the collection from the database, removing it permanently. New accesses will create a new collection.
  1262. *
  1263. * @method
  1264. * @param {object} [options=null] Optional settings.
  1265. * @param {ClientSession} [options.session] optional session to use for this operation
  1266. * @param {Collection~resultCallback} [callback] The results callback
  1267. * @return {Promise} returns Promise if no callback passed
  1268. */
  1269. Collection.prototype.drop = function(options, callback) {
  1270. if (typeof options === 'function') (callback = options), (options = {});
  1271. options = options || {};
  1272. return executeOperation(this.s.topology, this.s.db.dropCollection.bind(this.s.db), [
  1273. this.s.name,
  1274. options,
  1275. callback
  1276. ]);
  1277. };
  1278. /**
  1279. * Returns the options of the collection.
  1280. *
  1281. * @method
  1282. * @param {Object} [options] Optional settings
  1283. * @param {ClientSession} [options.session] optional session to use for this operation
  1284. * @param {Collection~resultCallback} [callback] The results callback
  1285. * @return {Promise} returns Promise if no callback passed
  1286. */
  1287. Collection.prototype.options = function(opts, callback) {
  1288. if (typeof opts === 'function') (callback = opts), (opts = {});
  1289. opts = opts || {};
  1290. return executeOperation(this.s.topology, options, [this, opts, callback]);
  1291. };
  1292. var options = function(self, opts, callback) {
  1293. self.s.db.listCollections({ name: self.s.name }, opts).toArray(function(err, collections) {
  1294. if (err) return handleCallback(callback, err);
  1295. if (collections.length === 0) {
  1296. return handleCallback(
  1297. callback,
  1298. MongoError.create({ message: f('collection %s not found', self.s.namespace), driver: true })
  1299. );
  1300. }
  1301. handleCallback(callback, err, collections[0].options || null);
  1302. });
  1303. };
  1304. /**
  1305. * Returns if the collection is a capped collection
  1306. *
  1307. * @method
  1308. * @param {Object} [options] Optional settings
  1309. * @param {ClientSession} [options.session] optional session to use for this operation
  1310. * @param {Collection~resultCallback} [callback] The results callback
  1311. * @return {Promise} returns Promise if no callback passed
  1312. */
  1313. Collection.prototype.isCapped = function(options, callback) {
  1314. if (typeof options === 'function') (callback = options), (options = {});
  1315. options = options || {};
  1316. return executeOperation(this.s.topology, isCapped, [this, options, callback]);
  1317. };
  1318. var isCapped = function(self, options, callback) {
  1319. self.options(options, function(err, document) {
  1320. if (err) return handleCallback(callback, err);
  1321. handleCallback(callback, null, document && document.capped);
  1322. });
  1323. };
  1324. /**
  1325. * Creates an index on the db and collection collection.
  1326. * @method
  1327. * @param {(string|object)} fieldOrSpec Defines the index.
  1328. * @param {object} [options=null] Optional settings.
  1329. * @param {(number|string)} [options.w=null] The write concern.
  1330. * @param {number} [options.wtimeout=null] The write concern timeout.
  1331. * @param {boolean} [options.j=false] Specify a journal write concern.
  1332. * @param {boolean} [options.unique=false] Creates an unique index.
  1333. * @param {boolean} [options.sparse=false] Creates a sparse index.
  1334. * @param {boolean} [options.background=false] Creates the index in the background, yielding whenever possible.
  1335. * @param {boolean} [options.dropDups=false] A unique index cannot be created on a key that has pre-existing duplicate values. If you would like to create the index anyway, keeping the first document the database indexes and deleting all subsequent documents that have duplicate value
  1336. * @param {number} [options.min=null] For geospatial indexes set the lower bound for the co-ordinates.
  1337. * @param {number} [options.max=null] For geospatial indexes set the high bound for the co-ordinates.
  1338. * @param {number} [options.v=null] Specify the format version of the indexes.
  1339. * @param {number} [options.expireAfterSeconds=null] Allows you to expire data on indexes applied to a data (MongoDB 2.2 or higher)
  1340. * @param {string} [options.name=null] Override the autogenerated index name (useful if the resulting name is larger than 128 bytes)
  1341. * @param {object} [options.partialFilterExpression=null] Creates a partial index based on the given filter object (MongoDB 3.2 or higher)
  1342. * @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
  1343. * @param {ClientSession} [options.session] optional session to use for this operation
  1344. * @param {Collection~resultCallback} [callback] The command result callback
  1345. * @return {Promise} returns Promise if no callback passed
  1346. */
  1347. Collection.prototype.createIndex = function(fieldOrSpec, options, callback) {
  1348. if (typeof options === 'function') (callback = options), (options = {});
  1349. options = options || {};
  1350. return executeOperation(this.s.topology, createIndex, [this, fieldOrSpec, options, callback]);
  1351. };
  1352. var createIndex = function(self, fieldOrSpec, options, callback) {
  1353. self.s.db.createIndex(self.s.name, fieldOrSpec, options, callback);
  1354. };
  1355. /**
  1356. * Creates multiple indexes in the collection, this method is only supported for
  1357. * MongoDB 2.6 or higher. Earlier version of MongoDB will throw a command not supported
  1358. * error. Index specifications are defined at https://www.mongodb.com/docs/manual/reference/command/createIndexes/.
  1359. * @method
  1360. * @param {array} indexSpecs An array of index specifications to be created
  1361. * @param {Object} [options] Optional settings
  1362. * @param {ClientSession} [options.session] optional session to use for this operation
  1363. * @param {Collection~resultCallback} [callback] The command result callback
  1364. * @return {Promise} returns Promise if no callback passed
  1365. */
  1366. Collection.prototype.createIndexes = function(indexSpecs, options, callback) {
  1367. if (typeof options === 'function') (callback = options), (options = {});
  1368. options = options ? shallowClone(options) : {};
  1369. if (typeof options.maxTimeMS !== 'number') delete options.maxTimeMS;
  1370. return executeOperation(this.s.topology, createIndexes, [this, indexSpecs, options, callback]);
  1371. };
  1372. var createIndexes = function(self, indexSpecs, options, callback) {
  1373. var capabilities = self.s.topology.capabilities();
  1374. // Ensure we generate the correct name if the parameter is not set
  1375. for (var i = 0; i < indexSpecs.length; i++) {
  1376. if (indexSpecs[i].name == null) {
  1377. var keys = [];
  1378. // Did the user pass in a collation, check if our write server supports it
  1379. if (indexSpecs[i].collation && capabilities && !capabilities.commandsTakeCollation) {
  1380. return callback(new MongoError(f('server/primary/mongos does not support collation')));
  1381. }
  1382. for (var name in indexSpecs[i].key) {
  1383. keys.push(f('%s_%s', name, indexSpecs[i].key[name]));
  1384. }
  1385. // Set the name
  1386. indexSpecs[i].name = keys.join('_');
  1387. }
  1388. }
  1389. options = Object.assign({}, options, { readPreference: ReadPreference.PRIMARY });
  1390. // Execute the index
  1391. self.s.db.command(
  1392. {
  1393. createIndexes: self.s.name,
  1394. indexes: indexSpecs
  1395. },
  1396. options,
  1397. callback
  1398. );
  1399. };
  1400. /**
  1401. * Drops an index from this collection.
  1402. * @method
  1403. * @param {string} indexName Name of the index to drop.
  1404. * @param {object} [options=null] Optional settings.
  1405. * @param {(number|string)} [options.w=null] The write concern.
  1406. * @param {number} [options.wtimeout=null] The write concern timeout.
  1407. * @param {boolean} [options.j=false] Specify a journal write concern.
  1408. * @param {ClientSession} [options.session] optional session to use for this operation
  1409. * @param {number} [options.maxTimeMS] Number of miliseconds to wait before aborting the query.
  1410. * @param {Collection~resultCallback} [callback] The command result callback
  1411. * @return {Promise} returns Promise if no callback passed
  1412. */
  1413. Collection.prototype.dropIndex = function(indexName, options, callback) {
  1414. var args = Array.prototype.slice.call(arguments, 1);
  1415. callback = typeof args[args.length - 1] === 'function' ? args.pop() : undefined;
  1416. options = args.length ? args.shift() || {} : {};
  1417. // Run only against primary
  1418. options.readPreference = ReadPreference.PRIMARY;
  1419. return executeOperation(this.s.topology, dropIndex, [this, indexName, options, callback]);
  1420. };
  1421. var dropIndex = function(self, indexName, options, callback) {
  1422. // Delete index command
  1423. var cmd = { dropIndexes: self.s.name, index: indexName };
  1424. // Decorate command with writeConcern if supported
  1425. applyWriteConcern(cmd, { db: self.s.db, collection: self }, options);
  1426. // Execute command
  1427. self.s.db.command(cmd, options, function(err, result) {
  1428. if (typeof callback !== 'function') return;
  1429. if (err) return handleCallback(callback, err, null);
  1430. handleCallback(callback, null, result);
  1431. });
  1432. };
  1433. /**
  1434. * Drops all indexes from this collection.
  1435. * @method
  1436. * @param {Object} [options] Optional settings
  1437. * @param {ClientSession} [options.session] optional session to use for this operation
  1438. * @param {number} [options.maxTimeMS] Number of miliseconds to wait before aborting the query.
  1439. * @param {Collection~resultCallback} [callback] The command result callback
  1440. * @return {Promise} returns Promise if no callback passed
  1441. */
  1442. Collection.prototype.dropIndexes = function(options, callback) {
  1443. if (typeof options === 'function') (callback = options), (options = {});
  1444. options = options ? shallowClone(options) : {};
  1445. if (typeof options.maxTimeMS !== 'number') delete options.maxTimeMS;
  1446. return executeOperation(this.s.topology, dropIndexes, [this, options, callback]);
  1447. };
  1448. var dropIndexes = function(self, options, callback) {
  1449. self.dropIndex('*', options, function(err) {
  1450. if (err) return handleCallback(callback, err, false);
  1451. handleCallback(callback, null, true);
  1452. });
  1453. };
  1454. /**
  1455. * Drops all indexes from this collection.
  1456. * @method
  1457. * @deprecated use dropIndexes
  1458. * @param {Collection~resultCallback} callback The command result callback
  1459. * @return {Promise} returns Promise if no [callback] passed
  1460. */
  1461. Collection.prototype.dropAllIndexes = Collection.prototype.dropIndexes;
  1462. /**
  1463. * Reindex all indexes on the collection
  1464. * Warning: reIndex is a blocking operation (indexes are rebuilt in the foreground) and will be slow for large collections.
  1465. * @method
  1466. * @param {Object} [options] Optional settings
  1467. * @param {ClientSession} [options.session] optional session to use for this operation
  1468. * @param {Collection~resultCallback} [callback] The command result callback
  1469. * @return {Promise} returns Promise if no callback passed
  1470. */
  1471. Collection.prototype.reIndex = function(options, callback) {
  1472. if (typeof options === 'function') (callback = options), (options = {});
  1473. options = options || {};
  1474. return executeOperation(this.s.topology, reIndex, [this, options, callback]);
  1475. };
  1476. var reIndex = function(self, options, callback) {
  1477. // Reindex
  1478. var cmd = { reIndex: self.s.name };
  1479. // Execute the command
  1480. self.s.db.command(cmd, options, function(err, result) {
  1481. if (callback == null) return;
  1482. if (err) return handleCallback(callback, err, null);
  1483. handleCallback(callback, null, result.ok ? true : false);
  1484. });
  1485. };
  1486. /**
  1487. * Get the list of all indexes information for the collection.
  1488. *
  1489. * @method
  1490. * @param {object} [options=null] Optional settings.
  1491. * @param {number} [options.batchSize=null] The batchSize for the returned command cursor or if pre 2.8 the systems batch collection
  1492. * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
  1493. * @param {ClientSession} [options.session] optional session to use for this operation
  1494. * @return {CommandCursor}
  1495. */
  1496. Collection.prototype.listIndexes = function(options) {
  1497. options = options || {};
  1498. // Clone the options
  1499. options = shallowClone(options);
  1500. // Determine the read preference in the options.
  1501. options = getReadPreference(this, options, this.s.db, this);
  1502. // Set the CommandCursor constructor
  1503. options.cursorFactory = CommandCursor;
  1504. // Set the promiseLibrary
  1505. options.promiseLibrary = this.s.promiseLibrary;
  1506. if (!this.s.topology.capabilities()) {
  1507. throw new MongoError('cannot connect to server');
  1508. }
  1509. // We have a list collections command
  1510. if (this.s.topology.capabilities().hasListIndexesCommand) {
  1511. // Cursor options
  1512. var cursor = options.batchSize ? { batchSize: options.batchSize } : {};
  1513. // Build the command
  1514. var command = { listIndexes: this.s.name, cursor: cursor };
  1515. // Execute the cursor
  1516. cursor = this.s.topology.cursor(f('%s.$cmd', this.s.dbName), command, options);
  1517. // Do we have a readPreference, apply it
  1518. if (options.readPreference) cursor.setReadPreference(options.readPreference);
  1519. // Return the cursor
  1520. return cursor;
  1521. }
  1522. // Get the namespace
  1523. var ns = f('%s.system.indexes', this.s.dbName);
  1524. // Get the query
  1525. cursor = this.s.topology.cursor(ns, { find: ns, query: { ns: this.s.namespace } }, options);
  1526. // Do we have a readPreference, apply it
  1527. if (options.readPreference) cursor.setReadPreference(options.readPreference);
  1528. // Set the passed in batch size if one was provided
  1529. if (options.batchSize) cursor = cursor.batchSize(options.batchSize);
  1530. // Return the cursor
  1531. return cursor;
  1532. };
  1533. /**
  1534. * Ensures that an index exists, if it does not it creates it
  1535. * @method
  1536. * @deprecated use createIndexes instead
  1537. * @param {(string|object)} fieldOrSpec Defines the index.
  1538. * @param {object} [options=null] Optional settings.
  1539. * @param {(number|string)} [options.w=null] The write concern.
  1540. * @param {number} [options.wtimeout=null] The write concern timeout.
  1541. * @param {boolean} [options.j=false] Specify a journal write concern.
  1542. * @param {boolean} [options.unique=false] Creates an unique index.
  1543. * @param {boolean} [options.sparse=false] Creates a sparse index.
  1544. * @param {boolean} [options.background=false] Creates the index in the background, yielding whenever possible.
  1545. * @param {boolean} [options.dropDups=false] A unique index cannot be created on a key that has pre-existing duplicate values. If you would like to create the index anyway, keeping the first document the database indexes and deleting all subsequent documents that have duplicate value
  1546. * @param {number} [options.min=null] For geospatial indexes set the lower bound for the co-ordinates.
  1547. * @param {number} [options.max=null] For geospatial indexes set the high bound for the co-ordinates.
  1548. * @param {number} [options.v=null] Specify the format version of the indexes.
  1549. * @param {number} [options.expireAfterSeconds=null] Allows you to expire data on indexes applied to a data (MongoDB 2.2 or higher)
  1550. * @param {number} [options.name=null] Override the autogenerated index name (useful if the resulting name is larger than 128 bytes)
  1551. * @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
  1552. * @param {ClientSession} [options.session] optional session to use for this operation
  1553. * @param {Collection~resultCallback} [callback] The command result callback
  1554. * @return {Promise} returns Promise if no callback passed
  1555. */
  1556. Collection.prototype.ensureIndex = function(fieldOrSpec, options, callback) {
  1557. if (typeof options === 'function') (callback = options), (options = {});
  1558. options = options || {};
  1559. return executeOperation(this.s.topology, ensureIndex, [this, fieldOrSpec, options, callback]);
  1560. };
  1561. var ensureIndex = function(self, fieldOrSpec, options, callback) {
  1562. self.s.db.ensureIndex(self.s.name, fieldOrSpec, options, callback);
  1563. };
  1564. /**
  1565. * Checks if one or more indexes exist on the collection, fails on first non-existing index
  1566. * @method
  1567. * @param {(string|array)} indexes One or more index names to check.
  1568. * @param {Object} [options] Optional settings
  1569. * @param {ClientSession} [options.session] optional session to use for this operation
  1570. * @param {Collection~resultCallback} [callback] The command result callback
  1571. * @return {Promise} returns Promise if no callback passed
  1572. */
  1573. Collection.prototype.indexExists = function(indexes, options, callback) {
  1574. if (typeof options === 'function') (callback = options), (options = {});
  1575. options = options || {};
  1576. return executeOperation(this.s.topology, indexExists, [this, indexes, options, callback]);
  1577. };
  1578. var indexExists = function(self, indexes, options, callback) {
  1579. self.indexInformation(options, function(err, indexInformation) {
  1580. // If we have an error return
  1581. if (err != null) return handleCallback(callback, err, null);
  1582. // Let's check for the index names
  1583. if (!Array.isArray(indexes))
  1584. return handleCallback(callback, null, indexInformation[indexes] != null);
  1585. // Check in list of indexes
  1586. for (var i = 0; i < indexes.length; i++) {
  1587. if (indexInformation[indexes[i]] == null) {
  1588. return handleCallback(callback, null, false);
  1589. }
  1590. }
  1591. // All keys found return true
  1592. return handleCallback(callback, null, true);
  1593. });
  1594. };
  1595. /**
  1596. * Retrieves this collections index info.
  1597. * @method
  1598. * @param {object} [options=null] Optional settings.
  1599. * @param {boolean} [options.full=false] Returns the full raw index information.
  1600. * @param {ClientSession} [options.session] optional session to use for this operation
  1601. * @param {Collection~resultCallback} [callback] The command result callback
  1602. * @return {Promise} returns Promise if no callback passed
  1603. */
  1604. Collection.prototype.indexInformation = function(options, callback) {
  1605. var args = Array.prototype.slice.call(arguments, 0);
  1606. callback = typeof args[args.length - 1] === 'function' ? args.pop() : undefined;
  1607. options = args.length ? args.shift() || {} : {};
  1608. return executeOperation(this.s.topology, indexInformation, [this, options, callback]);
  1609. };
  1610. var indexInformation = function(self, options, callback) {
  1611. self.s.db.indexInformation(self.s.name, options, callback);
  1612. };
  1613. /**
  1614. * The callback format for results
  1615. * @callback Collection~countCallback
  1616. * @param {MongoError} error An error instance representing the error during the execution.
  1617. * @param {number} result The count of documents that matched the query.
  1618. */
  1619. /**
  1620. * Count number of matching documents in the db to a query.
  1621. * @method
  1622. * @param {object} query The query for the count.
  1623. * @param {object} [options=null] Optional settings.
  1624. * @param {boolean} [options.limit=null] The limit of documents to count.
  1625. * @param {boolean} [options.skip=null] The number of documents to skip for the count.
  1626. * @param {string} [options.hint=null] An index name hint for the query.
  1627. * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
  1628. * @param {number} [options.maxTimeMS=null] Number of miliseconds to wait before aborting the query.
  1629. * @param {ClientSession} [options.session] optional session to use for this operation
  1630. * @param {Collection~countCallback} [callback] The command result callback
  1631. * @return {Promise} returns Promise if no callback passed
  1632. */
  1633. Collection.prototype.count = function(query, options, callback) {
  1634. var args = Array.prototype.slice.call(arguments, 0);
  1635. callback = typeof args[args.length - 1] === 'function' ? args.pop() : undefined;
  1636. query = args.length ? args.shift() || {} : {};
  1637. options = args.length ? args.shift() || {} : {};
  1638. return executeOperation(this.s.topology, count, [this, query, options, callback]);
  1639. };
  1640. var count = function(self, query, options, callback) {
  1641. var skip = options.skip;
  1642. var limit = options.limit;
  1643. var hint = options.hint;
  1644. var maxTimeMS = options.maxTimeMS;
  1645. // Final query
  1646. var cmd = {
  1647. count: self.s.name,
  1648. query: query
  1649. };
  1650. // Add limit, skip and maxTimeMS if defined
  1651. if (typeof skip === 'number') cmd.skip = skip;
  1652. if (typeof limit === 'number') cmd.limit = limit;
  1653. if (typeof maxTimeMS === 'number') cmd.maxTimeMS = maxTimeMS;
  1654. if (hint) cmd.hint = hint;
  1655. options = shallowClone(options);
  1656. // Ensure we have the right read preference inheritance
  1657. options = getReadPreference(self, options, self.s.db);
  1658. // Do we have a readConcern specified
  1659. decorateWithReadConcern(cmd, self, options);
  1660. // Have we specified collation
  1661. decorateWithCollation(cmd, self, options);
  1662. // Execute command
  1663. self.s.db.command(cmd, options, function(err, result) {
  1664. if (err) return handleCallback(callback, err);
  1665. handleCallback(callback, null, result.n);
  1666. });
  1667. };
  1668. /**
  1669. * The distinct command returns returns a list of distinct values for the given key across a collection.
  1670. * @method
  1671. * @param {string} key Field of the document to find distinct values for.
  1672. * @param {object} query The query for filtering the set of documents to which we apply the distinct filter.
  1673. * @param {object} [options=null] Optional settings.
  1674. * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
  1675. * @param {number} [options.maxTimeMS=null] Number of miliseconds to wait before aborting the query.
  1676. * @param {ClientSession} [options.session] optional session to use for this operation
  1677. * @param {Collection~resultCallback} [callback] The command result callback
  1678. * @return {Promise} returns Promise if no callback passed
  1679. */
  1680. Collection.prototype.distinct = function(key, query, options, callback) {
  1681. var args = Array.prototype.slice.call(arguments, 1);
  1682. callback = typeof args[args.length - 1] === 'function' ? args.pop() : undefined;
  1683. var queryOption = args.length ? args.shift() || {} : {};
  1684. var optionsOption = args.length ? args.shift() || {} : {};
  1685. return executeOperation(this.s.topology, distinct, [
  1686. this,
  1687. key,
  1688. queryOption,
  1689. optionsOption,
  1690. callback
  1691. ]);
  1692. };
  1693. var distinct = function(self, key, query, options, callback) {
  1694. // maxTimeMS option
  1695. var maxTimeMS = options.maxTimeMS;
  1696. // Distinct command
  1697. var cmd = {
  1698. distinct: self.s.name,
  1699. key: key,
  1700. query: query
  1701. };
  1702. options = shallowClone(options);
  1703. // Ensure we have the right read preference inheritance
  1704. options = getReadPreference(self, options, self.s.db, self);
  1705. // Add maxTimeMS if defined
  1706. if (typeof maxTimeMS === 'number') cmd.maxTimeMS = maxTimeMS;
  1707. // Do we have a readConcern specified
  1708. decorateWithReadConcern(cmd, self, options);
  1709. // Have we specified collation
  1710. decorateWithCollation(cmd, self, options);
  1711. // Execute the command
  1712. self.s.db.command(cmd, options, function(err, result) {
  1713. if (err) return handleCallback(callback, err);
  1714. handleCallback(callback, null, result.values);
  1715. });
  1716. };
  1717. /**
  1718. * Retrieve all the indexes on the collection.
  1719. * @method
  1720. * @param {Object} [options] Optional settings
  1721. * @param {ClientSession} [options.session] optional session to use for this operation
  1722. * @param {Collection~resultCallback} [callback] The command result callback
  1723. * @return {Promise} returns Promise if no callback passed
  1724. */
  1725. Collection.prototype.indexes = function(options, callback) {
  1726. if (typeof options === 'function') (callback = options), (options = {});
  1727. options = options || {};
  1728. return executeOperation(this.s.topology, indexes, [this, options, callback]);
  1729. };
  1730. var indexes = function(self, options, callback) {
  1731. options = Object.assign({}, { full: true }, options);
  1732. self.s.db.indexInformation(self.s.name, options, callback);
  1733. };
  1734. /**
  1735. * Get all the collection statistics.
  1736. *
  1737. * @method
  1738. * @param {object} [options=null] Optional settings.
  1739. * @param {number} [options.scale=null] Divide the returned sizes by scale value.
  1740. * @param {ClientSession} [options.session] optional session to use for this operation
  1741. * @param {Collection~resultCallback} [callback] The collection result callback
  1742. * @return {Promise} returns Promise if no callback passed
  1743. */
  1744. Collection.prototype.stats = function(options, callback) {
  1745. var args = Array.prototype.slice.call(arguments, 0);
  1746. callback = typeof args[args.length - 1] === 'function' ? args.pop() : undefined;
  1747. options = args.length ? args.shift() || {} : {};
  1748. return executeOperation(this.s.topology, stats, [this, options, callback]);
  1749. };
  1750. var stats = function(self, options, callback) {
  1751. // Build command object
  1752. var commandObject = {
  1753. collStats: self.s.name
  1754. };
  1755. // Check if we have the scale value
  1756. if (options['scale'] != null) commandObject['scale'] = options['scale'];
  1757. options = shallowClone(options);
  1758. // Ensure we have the right read preference inheritance
  1759. options = getReadPreference(self, options, self.s.db, self);
  1760. // Execute the command
  1761. self.s.db.command(commandObject, options, callback);
  1762. };
  1763. /**
  1764. * @typedef {Object} Collection~findAndModifyWriteOpResult
  1765. * @property {object} value Document returned from findAndModify command.
  1766. * @property {object} lastErrorObject The raw lastErrorObject returned from the command.
  1767. * @property {Number} ok Is 1 if the command executed correctly.
  1768. */
  1769. /**
  1770. * The callback format for inserts
  1771. * @callback Collection~findAndModifyCallback
  1772. * @param {MongoError} error An error instance representing the error during the execution.
  1773. * @param {Collection~findAndModifyWriteOpResult} result The result object if the command was executed successfully.
  1774. */
  1775. /**
  1776. * Find a document and delete it in one atomic operation, requires a write lock for the duration of the operation.
  1777. *
  1778. * @method
  1779. * @param {object} filter Document selection filter.
  1780. * @param {object} [options=null] Optional settings.
  1781. * @param {object} [options.projection=null] Limits the fields to return for all matching documents.
  1782. * @param {object} [options.sort=null] Determines which document the operation modifies if the query selects multiple documents.
  1783. * @param {number} [options.maxTimeMS=null] The maximum amount of time to allow the query to run.
  1784. * @param {ClientSession} [options.session] optional session to use for this operation
  1785. * @param {Collection~findAndModifyCallback} [callback] The collection result callback
  1786. * @return {Promise} returns Promise if no callback passed
  1787. */
  1788. Collection.prototype.findOneAndDelete = function(filter, options, callback) {
  1789. if (typeof options === 'function') (callback = options), (options = {});
  1790. options = options || {};
  1791. // Basic validation
  1792. if (filter == null || typeof filter !== 'object')
  1793. throw toError('filter parameter must be an object');
  1794. return executeOperation(this.s.topology, findOneAndDelete, [this, filter, options, callback]);
  1795. };
  1796. var findOneAndDelete = function(self, filter, options, callback) {
  1797. // Final options
  1798. var finalOptions = shallowClone(options);
  1799. finalOptions['fields'] = options.projection;
  1800. finalOptions['remove'] = true;
  1801. // Execute find and Modify
  1802. self.findAndModify(filter, options.sort, null, finalOptions, callback);
  1803. };
  1804. /**
  1805. * Find a document and replace it in one atomic operation, requires a write lock for the duration of the operation.
  1806. *
  1807. * @method
  1808. * @param {object} filter Document selection filter.
  1809. * @param {object} replacement Document replacing the matching document.
  1810. * @param {object} [options=null] Optional settings.
  1811. * @param {object} [options.projection=null] Limits the fields to return for all matching documents.
  1812. * @param {object} [options.sort=null] Determines which document the operation modifies if the query selects multiple documents.
  1813. * @param {number} [options.maxTimeMS=null] The maximum amount of time to allow the query to run.
  1814. * @param {boolean} [options.upsert=false] Upsert the document if it does not exist.
  1815. * @param {boolean} [options.returnOriginal=true] When false, returns the updated document rather than the original. The default is true.
  1816. * @param {ClientSession} [options.session] optional session to use for this operation
  1817. * @param {Collection~findAndModifyCallback} [callback] The collection result callback
  1818. * @return {Promise} returns Promise if no callback passed
  1819. */
  1820. Collection.prototype.findOneAndReplace = function(filter, replacement, options, callback) {
  1821. if (typeof options === 'function') (callback = options), (options = {});
  1822. options = options || {};
  1823. // Basic validation
  1824. if (filter == null || typeof filter !== 'object')
  1825. throw toError('filter parameter must be an object');
  1826. if (replacement == null || typeof replacement !== 'object')
  1827. throw toError('replacement parameter must be an object');
  1828. return executeOperation(this.s.topology, findOneAndReplace, [
  1829. this,
  1830. filter,
  1831. replacement,
  1832. options,
  1833. callback
  1834. ]);
  1835. };
  1836. var findOneAndReplace = function(self, filter, replacement, options, callback) {
  1837. // Final options
  1838. var finalOptions = shallowClone(options);
  1839. finalOptions['fields'] = options.projection;
  1840. finalOptions['update'] = true;
  1841. finalOptions['new'] = options.returnOriginal !== void 0 ? !options.returnOriginal : false;
  1842. finalOptions['upsert'] = options.upsert !== void 0 ? !!options.upsert : false;
  1843. // Execute findAndModify
  1844. self.findAndModify(filter, options.sort, replacement, finalOptions, callback);
  1845. };
  1846. /**
  1847. * Find a document and update it in one atomic operation, requires a write lock for the duration of the operation.
  1848. *
  1849. * @method
  1850. * @param {object} filter Document selection filter.
  1851. * @param {object} update Update operations to be performed on the document
  1852. * @param {object} [options=null] Optional settings.
  1853. * @param {object} [options.projection=null] Limits the fields to return for all matching documents.
  1854. * @param {object} [options.sort=null] Determines which document the operation modifies if the query selects multiple documents.
  1855. * @param {number} [options.maxTimeMS=null] The maximum amount of time to allow the query to run.
  1856. * @param {boolean} [options.upsert=false] Upsert the document if it does not exist.
  1857. * @param {boolean} [options.returnOriginal=true] When false, returns the updated document rather than the original. The default is true.
  1858. * @param {ClientSession} [options.session] optional session to use for this operation
  1859. * @param {Collection~findAndModifyCallback} [callback] The collection result callback
  1860. * @return {Promise} returns Promise if no callback passed
  1861. */
  1862. Collection.prototype.findOneAndUpdate = function(filter, update, options, callback) {
  1863. if (typeof options === 'function') (callback = options), (options = {});
  1864. options = options || {};
  1865. // Basic validation
  1866. if (filter == null || typeof filter !== 'object')
  1867. throw toError('filter parameter must be an object');
  1868. if (update == null || typeof update !== 'object')
  1869. throw toError('update parameter must be an object');
  1870. return executeOperation(this.s.topology, findOneAndUpdate, [
  1871. this,
  1872. filter,
  1873. update,
  1874. options,
  1875. callback
  1876. ]);
  1877. };
  1878. var findOneAndUpdate = function(self, filter, update, options, callback) {
  1879. // Final options
  1880. var finalOptions = shallowClone(options);
  1881. finalOptions['fields'] = options.projection;
  1882. finalOptions['update'] = true;
  1883. finalOptions['new'] =
  1884. typeof options.returnOriginal === 'boolean' ? !options.returnOriginal : false;
  1885. finalOptions['upsert'] = typeof options.upsert === 'boolean' ? options.upsert : false;
  1886. // Execute findAndModify
  1887. self.findAndModify(filter, options.sort, update, finalOptions, callback);
  1888. };
  1889. /**
  1890. * Find and update a document.
  1891. * @method
  1892. * @param {object} query Query object to locate the object to modify.
  1893. * @param {array} sort If multiple docs match, choose the first one in the specified sort order as the object to manipulate.
  1894. * @param {object} doc The fields/vals to be updated.
  1895. * @param {object} [options=null] Optional settings.
  1896. * @param {(number|string)} [options.w=null] The write concern.
  1897. * @param {number} [options.wtimeout=null] The write concern timeout.
  1898. * @param {boolean} [options.j=false] Specify a journal write concern.
  1899. * @param {boolean} [options.remove=false] Set to true to remove the object before returning.
  1900. * @param {boolean} [options.upsert=false] Perform an upsert operation.
  1901. * @param {boolean} [options.new=false] Set to true if you want to return the modified object rather than the original. Ignored for remove.
  1902. * @param {object} [options.projection=null] Object containing the field projection for the result returned from the operation.
  1903. * @param {object} [options.fields=null] **Deprecated** Use `options.projection` instead
  1904. * @param {ClientSession} [options.session] optional session to use for this operation
  1905. * @param {Collection~findAndModifyCallback} [callback] The command result callback
  1906. * @return {Promise} returns Promise if no callback passed
  1907. * @deprecated use findOneAndUpdate, findOneAndReplace or findOneAndDelete instead
  1908. */
  1909. Collection.prototype.findAndModify = function(query, sort, doc, options, callback) {
  1910. var args = Array.prototype.slice.call(arguments, 1);
  1911. callback = typeof args[args.length - 1] === 'function' ? args.pop() : undefined;
  1912. sort = args.length ? args.shift() || [] : [];
  1913. doc = args.length ? args.shift() : null;
  1914. options = args.length ? args.shift() || {} : {};
  1915. // Clone options
  1916. options = shallowClone(options);
  1917. // Force read preference primary
  1918. options.readPreference = ReadPreference.PRIMARY;
  1919. return executeOperation(this.s.topology, findAndModify, [
  1920. this,
  1921. query,
  1922. sort,
  1923. doc,
  1924. options,
  1925. callback
  1926. ]);
  1927. };
  1928. var findAndModify = function(self, query, sort, doc, options, callback) {
  1929. // Create findAndModify command object
  1930. var queryObject = {
  1931. findandmodify: self.s.name,
  1932. query: query
  1933. };
  1934. sort = formattedOrderClause(sort);
  1935. if (sort) {
  1936. queryObject.sort = sort;
  1937. }
  1938. queryObject.new = options.new ? true : false;
  1939. queryObject.remove = options.remove ? true : false;
  1940. queryObject.upsert = options.upsert ? true : false;
  1941. const projection = options.projection || options.fields;
  1942. if (projection) {
  1943. queryObject.fields = projection;
  1944. }
  1945. if (options.arrayFilters) {
  1946. queryObject.arrayFilters = options.arrayFilters;
  1947. delete options.arrayFilters;
  1948. }
  1949. if (doc && !options.remove) {
  1950. queryObject.update = doc;
  1951. }
  1952. if (options.maxTimeMS) queryObject.maxTimeMS = options.maxTimeMS;
  1953. // Either use override on the function, or go back to default on either the collection
  1954. // level or db
  1955. if (options['serializeFunctions'] != null) {
  1956. options['serializeFunctions'] = options['serializeFunctions'];
  1957. } else {
  1958. options['serializeFunctions'] = self.s.serializeFunctions;
  1959. }
  1960. // No check on the documents
  1961. options.checkKeys = false;
  1962. // Get the write concern settings
  1963. var finalOptions = applyWriteConcern(options, { db: self.s.db, collection: self }, options);
  1964. // Decorate the findAndModify command with the write Concern
  1965. if (finalOptions.writeConcern) {
  1966. queryObject.writeConcern = finalOptions.writeConcern;
  1967. }
  1968. // Have we specified bypassDocumentValidation
  1969. if (typeof finalOptions.bypassDocumentValidation === 'boolean') {
  1970. queryObject.bypassDocumentValidation = finalOptions.bypassDocumentValidation;
  1971. }
  1972. // Have we specified collation
  1973. decorateWithCollation(queryObject, self, finalOptions);
  1974. // Execute the command
  1975. self.s.db.command(queryObject, finalOptions, function(err, result) {
  1976. if (err) return handleCallback(callback, err, null);
  1977. return handleCallback(callback, null, result);
  1978. });
  1979. };
  1980. /**
  1981. * Find and remove a document.
  1982. * @method
  1983. * @param {object} query Query object to locate the object to modify.
  1984. * @param {array} sort If multiple docs match, choose the first one in the specified sort order as the object to manipulate.
  1985. * @param {object} [options=null] Optional settings.
  1986. * @param {(number|string)} [options.w=null] The write concern.
  1987. * @param {number} [options.wtimeout=null] The write concern timeout.
  1988. * @param {boolean} [options.j=false] Specify a journal write concern.
  1989. * @param {ClientSession} [options.session] optional session to use for this operation
  1990. * @param {Collection~resultCallback} [callback] The command result callback
  1991. * @return {Promise} returns Promise if no callback passed
  1992. * @deprecated use findOneAndDelete instead
  1993. */
  1994. Collection.prototype.findAndRemove = function(query, sort, options, callback) {
  1995. var args = Array.prototype.slice.call(arguments, 1);
  1996. callback = typeof args[args.length - 1] === 'function' ? args.pop() : undefined;
  1997. sort = args.length ? args.shift() || [] : [];
  1998. options = args.length ? args.shift() || {} : {};
  1999. return executeOperation(this.s.topology, findAndRemove, [this, query, sort, options, callback]);
  2000. };
  2001. var findAndRemove = function(self, query, sort, options, callback) {
  2002. // Add the remove option
  2003. options['remove'] = true;
  2004. // Execute the callback
  2005. self.findAndModify(query, sort, null, options, callback);
  2006. };
  2007. function decorateWithCollation(command, self, options) {
  2008. // Do we support collation 3.4 and higher
  2009. var capabilities = self.s.topology.capabilities();
  2010. // Do we support write concerns 3.4 and higher
  2011. if (capabilities && capabilities.commandsTakeCollation) {
  2012. if (options.collation && typeof options.collation === 'object') {
  2013. command.collation = options.collation;
  2014. }
  2015. }
  2016. }
  2017. function decorateWithReadConcern(command, self, options) {
  2018. let readConcern = Object.assign({}, command.readConcern || {});
  2019. if (self.s.readConcern) {
  2020. Object.assign(readConcern, self.s.readConcern);
  2021. }
  2022. if (
  2023. options.session &&
  2024. options.session.supports.causalConsistency &&
  2025. options.session.operationTime
  2026. ) {
  2027. Object.assign(readConcern, { afterClusterTime: options.session.operationTime });
  2028. }
  2029. if (Object.keys(readConcern).length > 0) {
  2030. Object.assign(command, { readConcern: readConcern });
  2031. }
  2032. }
  2033. /**
  2034. * Execute an aggregation framework pipeline against the collection, needs MongoDB >= 2.2
  2035. * @method
  2036. * @param {object} pipeline Array containing all the aggregation framework commands for the execution.
  2037. * @param {object} [options=null] Optional settings.
  2038. * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
  2039. * @param {object} [options.cursor=null] Return the query as cursor, on 2.6 > it returns as a real cursor on pre 2.6 it returns as an emulated cursor.
  2040. * @param {number} [options.cursor.batchSize=null] The batchSize for the cursor
  2041. * @param {boolean} [options.explain=false] Explain returns the aggregation execution plan (requires mongodb 2.6 >).
  2042. * @param {boolean} [options.allowDiskUse=false] allowDiskUse lets the server know if it can use disk to store temporary results for the aggregation (requires mongodb 2.6 >).
  2043. * @param {number} [options.maxTimeMS=null] maxTimeMS specifies a cumulative time limit in milliseconds for processing operations on the cursor. MongoDB interrupts the operation at the earliest following interrupt point.
  2044. * @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
  2045. * @param {boolean} [options.raw=false] Return document results as raw BSON buffers.
  2046. * @param {boolean} [options.promoteLongs=true] Promotes Long values to number if they fit inside the 53 bits resolution.
  2047. * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
  2048. * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
  2049. * @param {object} [options.collation=null] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
  2050. * @param {string} [options.comment] Add a comment to an aggregation command
  2051. * @param {ClientSession} [options.session] optional session to use for this operation
  2052. * @param {Collection~aggregationCallback} callback The command result callback
  2053. * @return {(null|AggregationCursor)}
  2054. */
  2055. Collection.prototype.aggregate = function(pipeline, options, callback) {
  2056. var self = this;
  2057. if (Array.isArray(pipeline)) {
  2058. // Set up callback if one is provided
  2059. if (typeof options === 'function') {
  2060. callback = options;
  2061. options = {};
  2062. }
  2063. // If we have no options or callback we are doing
  2064. // a cursor based aggregation
  2065. if (options == null && callback == null) {
  2066. options = {};
  2067. }
  2068. } else {
  2069. // Aggregation pipeline passed as arguments on the method
  2070. var args = Array.prototype.slice.call(arguments, 0);
  2071. // Get the callback
  2072. callback = args.pop();
  2073. // Get the possible options object
  2074. var opts = args[args.length - 1];
  2075. // If it contains any of the admissible options pop it of the args
  2076. options =
  2077. opts &&
  2078. (opts.readPreference ||
  2079. opts.explain ||
  2080. opts.cursor ||
  2081. opts.out ||
  2082. opts.maxTimeMS ||
  2083. opts.hint ||
  2084. opts.allowDiskUse)
  2085. ? args.pop()
  2086. : {};
  2087. // Left over arguments is the pipeline
  2088. pipeline = args;
  2089. }
  2090. // Ignore readConcern option
  2091. var ignoreReadConcern = false;
  2092. // Build the command
  2093. var command = { aggregate: this.s.name, pipeline: pipeline };
  2094. // If out was specified
  2095. if (typeof options.out === 'string') {
  2096. pipeline.push({ $out: options.out });
  2097. // Ignore read concern
  2098. ignoreReadConcern = true;
  2099. } else if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out']) {
  2100. ignoreReadConcern = true;
  2101. }
  2102. // Decorate command with writeConcern if out has been specified
  2103. if (
  2104. pipeline.length > 0 &&
  2105. pipeline[pipeline.length - 1]['$out'] &&
  2106. self.s.topology.capabilities().commandsTakeWriteConcern
  2107. ) {
  2108. applyWriteConcern(command, { db: self.s.db, collection: self }, options);
  2109. }
  2110. // Have we specified collation
  2111. decorateWithCollation(command, self, options);
  2112. // If we have bypassDocumentValidation set
  2113. if (typeof options.bypassDocumentValidation === 'boolean') {
  2114. command.bypassDocumentValidation = options.bypassDocumentValidation;
  2115. }
  2116. // Do we have a readConcern specified
  2117. if (!ignoreReadConcern) {
  2118. decorateWithReadConcern(command, self, options);
  2119. }
  2120. // If we have allowDiskUse defined
  2121. if (options.allowDiskUse) command.allowDiskUse = options.allowDiskUse;
  2122. if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS;
  2123. // If we are giving a hint
  2124. if (options.hint) command.hint = options.hint;
  2125. options = shallowClone(options);
  2126. // Ensure we have the right read preference inheritance
  2127. options = getReadPreference(this, options, this.s.db, this);
  2128. // If explain has been specified add it
  2129. if (options.explain) {
  2130. if (command.readConcern || command.writeConcern) {
  2131. throw toError('"explain" cannot be used on an aggregate call with readConcern/writeConcern');
  2132. }
  2133. command.explain = options.explain;
  2134. }
  2135. if (typeof options.comment === 'string') command.comment = options.comment;
  2136. // Validate that cursor options is valid
  2137. if (options.cursor != null && typeof options.cursor !== 'object') {
  2138. throw toError('cursor options must be an object');
  2139. }
  2140. options.cursor = options.cursor || {};
  2141. if (options.batchSize) options.cursor.batchSize = options.batchSize;
  2142. command.cursor = options.cursor;
  2143. // promiseLibrary
  2144. options.promiseLibrary = this.s.promiseLibrary;
  2145. // Set the AggregationCursor constructor
  2146. options.cursorFactory = AggregationCursor;
  2147. if (typeof callback !== 'function') {
  2148. if (!this.s.topology.capabilities()) {
  2149. throw new MongoError('cannot connect to server');
  2150. }
  2151. // Allow disk usage command
  2152. if (typeof options.allowDiskUse === 'boolean') command.allowDiskUse = options.allowDiskUse;
  2153. if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS;
  2154. // Execute the cursor
  2155. return this.s.topology.cursor(this.s.namespace, command, options);
  2156. }
  2157. return handleCallback(callback, null, this.s.topology.cursor(this.s.namespace, command, options));
  2158. };
  2159. /**
  2160. * Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this collection.
  2161. * @method
  2162. * @since 3.0.0
  2163. * @param {Array} [pipeline=null] An array of {@link https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
  2164. * @param {object} [options=null] Optional settings
  2165. * @param {string} [options.fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
  2166. * @param {object} [options.resumeAfter=null] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
  2167. * @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query
  2168. * @param {number} [options.batchSize=null] The number of documents to return per batch. See {@link https://www.mongodb.com/docs/manual/reference/command/aggregate|aggregation documentation}.
  2169. * @param {object} [options.collation=null] Specify collation settings for operation. See {@link https://www.mongodb.com/docs/manual/reference/command/aggregate|aggregation documentation}.
  2170. * @param {ReadPreference} [options.readPreference=null] The read preference. Defaults to the read preference of the database or collection. See {@link https://www.mongodb.com/docs/manual/reference/read-preference|read preference documentation}.
  2171. * @param {ClientSession} [options.session] optional session to use for this operation
  2172. * @return {ChangeStream} a ChangeStream instance.
  2173. */
  2174. Collection.prototype.watch = function(pipeline, options) {
  2175. pipeline = pipeline || [];
  2176. options = options || {};
  2177. // Allow optionally not specifying a pipeline
  2178. if (!Array.isArray(pipeline)) {
  2179. options = pipeline;
  2180. pipeline = [];
  2181. }
  2182. return new ChangeStream(this, pipeline, options);
  2183. };
  2184. /**
  2185. * The callback format for results
  2186. * @callback Collection~parallelCollectionScanCallback
  2187. * @param {MongoError} error An error instance representing the error during the execution.
  2188. * @param {Cursor[]} cursors A list of cursors returned allowing for parallel reading of collection.
  2189. */
  2190. /**
  2191. * Return N number of parallel cursors for a collection allowing parallel reading of entire collection. There are
  2192. * no ordering guarantees for returned results.
  2193. * @method
  2194. * @param {object} [options=null] Optional settings.
  2195. * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
  2196. * @param {number} [options.batchSize=null] Set the batchSize for the getMoreCommand when iterating over the query results.
  2197. * @param {number} [options.numCursors=1] The maximum number of parallel command cursors to return (the number of returned cursors will be in the range 1:numCursors)
  2198. * @param {boolean} [options.raw=false] Return all BSON documents as Raw Buffer documents.
  2199. * @param {Collection~parallelCollectionScanCallback} [callback] The command result callback
  2200. * @return {Promise} returns Promise if no callback passed
  2201. */
  2202. Collection.prototype.parallelCollectionScan = function(options, callback) {
  2203. if (typeof options === 'function') (callback = options), (options = { numCursors: 1 });
  2204. // Set number of cursors to 1
  2205. options.numCursors = options.numCursors || 1;
  2206. options.batchSize = options.batchSize || 1000;
  2207. options = shallowClone(options);
  2208. // Ensure we have the right read preference inheritance
  2209. options = getReadPreference(this, options, this.s.db, this);
  2210. // Add a promiseLibrary
  2211. options.promiseLibrary = this.s.promiseLibrary;
  2212. if (options.session) {
  2213. options.session = undefined;
  2214. }
  2215. return executeOperation(this.s.topology, parallelCollectionScan, [this, options, callback], {
  2216. skipSessions: true
  2217. });
  2218. };
  2219. var parallelCollectionScan = function(self, options, callback) {
  2220. // Create command object
  2221. var commandObject = {
  2222. parallelCollectionScan: self.s.name,
  2223. numCursors: options.numCursors
  2224. };
  2225. // Do we have a readConcern specified
  2226. decorateWithReadConcern(commandObject, self, options);
  2227. // Store the raw value
  2228. var raw = options.raw;
  2229. delete options['raw'];
  2230. // Execute the command
  2231. self.s.db.command(commandObject, options, function(err, result) {
  2232. if (err) return handleCallback(callback, err, null);
  2233. if (result == null)
  2234. return handleCallback(
  2235. callback,
  2236. new Error('no result returned for parallelCollectionScan'),
  2237. null
  2238. );
  2239. var cursors = [];
  2240. // Add the raw back to the option
  2241. if (raw) options.raw = raw;
  2242. // Create command cursors for each item
  2243. for (var i = 0; i < result.cursors.length; i++) {
  2244. var rawId = result.cursors[i].cursor.id;
  2245. // Convert cursorId to Long if needed
  2246. var cursorId = typeof rawId === 'number' ? Long.fromNumber(rawId) : rawId;
  2247. // Add a command cursor
  2248. cursors.push(self.s.topology.cursor(self.s.namespace, cursorId, options));
  2249. }
  2250. handleCallback(callback, null, cursors);
  2251. });
  2252. };
  2253. /**
  2254. * Execute a geo search using a geo haystack index on a collection.
  2255. *
  2256. * @method
  2257. * @param {number} x Point to search on the x axis, ensure the indexes are ordered in the same order.
  2258. * @param {number} y Point to search on the y axis, ensure the indexes are ordered in the same order.
  2259. * @param {object} [options=null] Optional settings.
  2260. * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
  2261. * @param {number} [options.maxDistance=null] Include results up to maxDistance from the point.
  2262. * @param {object} [options.search=null] Filter the results by a query.
  2263. * @param {number} [options.limit=false] Max number of results to return.
  2264. * @param {ClientSession} [options.session] optional session to use for this operation
  2265. * @param {Collection~resultCallback} [callback] The command result callback
  2266. * @return {Promise} returns Promise if no callback passed
  2267. */
  2268. Collection.prototype.geoHaystackSearch = function(x, y, options, callback) {
  2269. var args = Array.prototype.slice.call(arguments, 2);
  2270. callback = typeof args[args.length - 1] === 'function' ? args.pop() : undefined;
  2271. options = args.length ? args.shift() || {} : {};
  2272. return executeOperation(this.s.topology, geoHaystackSearch, [this, x, y, options, callback]);
  2273. };
  2274. var geoHaystackSearch = function(self, x, y, options, callback) {
  2275. // Build command object
  2276. var commandObject = {
  2277. geoSearch: self.s.name,
  2278. near: [x, y]
  2279. };
  2280. // Remove read preference from hash if it exists
  2281. commandObject = decorateCommand(commandObject, options, { readPreference: true, session: true });
  2282. options = shallowClone(options);
  2283. // Ensure we have the right read preference inheritance
  2284. options = getReadPreference(self, options, self.s.db, self);
  2285. // Do we have a readConcern specified
  2286. decorateWithReadConcern(commandObject, self, options);
  2287. // Execute the command
  2288. self.s.db.command(commandObject, options, function(err, res) {
  2289. if (err) return handleCallback(callback, err);
  2290. if (res.err || res.errmsg) handleCallback(callback, toError(res));
  2291. // should we only be returning res.results here? Not sure if the user
  2292. // should see the other return information
  2293. handleCallback(callback, null, res);
  2294. });
  2295. };
  2296. /**
  2297. * Group function helper
  2298. * @ignore
  2299. */
  2300. // var groupFunction = function () {
  2301. // var c = db[ns].find(condition);
  2302. // var map = new Map();
  2303. // var reduce_function = reduce;
  2304. //
  2305. // while (c.hasNext()) {
  2306. // var obj = c.next();
  2307. // var key = {};
  2308. //
  2309. // for (var i = 0, len = keys.length; i < len; ++i) {
  2310. // var k = keys[i];
  2311. // key[k] = obj[k];
  2312. // }
  2313. //
  2314. // var aggObj = map.get(key);
  2315. //
  2316. // if (aggObj == null) {
  2317. // var newObj = Object.extend({}, key);
  2318. // aggObj = Object.extend(newObj, initial);
  2319. // map.put(key, aggObj);
  2320. // }
  2321. //
  2322. // reduce_function(obj, aggObj);
  2323. // }
  2324. //
  2325. // return { "result": map.values() };
  2326. // }.toString();
  2327. var groupFunction =
  2328. 'function () {\nvar c = db[ns].find(condition);\nvar map = new Map();\nvar reduce_function = reduce;\n\nwhile (c.hasNext()) {\nvar obj = c.next();\nvar key = {};\n\nfor (var i = 0, len = keys.length; i < len; ++i) {\nvar k = keys[i];\nkey[k] = obj[k];\n}\n\nvar aggObj = map.get(key);\n\nif (aggObj == null) {\nvar newObj = Object.extend({}, key);\naggObj = Object.extend(newObj, initial);\nmap.put(key, aggObj);\n}\n\nreduce_function(obj, aggObj);\n}\n\nreturn { "result": map.values() };\n}';
  2329. /**
  2330. * Run a group command across a collection
  2331. *
  2332. * @method
  2333. * @param {(object|array|function|code)} keys An object, array or function expressing the keys to group by.
  2334. * @param {object} condition An optional condition that must be true for a row to be considered.
  2335. * @param {object} initial Initial value of the aggregation counter object.
  2336. * @param {(function|Code)} reduce The reduce function aggregates (reduces) the objects iterated
  2337. * @param {(function|Code)} finalize An optional function to be run on each item in the result set just before the item is returned.
  2338. * @param {boolean} command Specify if you wish to run using the internal group command or using eval, default is true.
  2339. * @param {object} [options=null] Optional settings.
  2340. * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
  2341. * @param {ClientSession} [options.session] optional session to use for this operation
  2342. * @param {Collection~resultCallback} [callback] The command result callback
  2343. * @return {Promise} returns Promise if no callback passed
  2344. * @deprecated MongoDB 3.6 or higher will no longer support the group command. We recommend rewriting using the aggregation framework.
  2345. */
  2346. Collection.prototype.group = function(
  2347. keys,
  2348. condition,
  2349. initial,
  2350. reduce,
  2351. finalize,
  2352. command,
  2353. options,
  2354. callback
  2355. ) {
  2356. var args = Array.prototype.slice.call(arguments, 3);
  2357. callback = typeof args[args.length - 1] === 'function' ? args.pop() : undefined;
  2358. reduce = args.length ? args.shift() : null;
  2359. finalize = args.length ? args.shift() : null;
  2360. command = args.length ? args.shift() : null;
  2361. options = args.length ? args.shift() || {} : {};
  2362. // Make sure we are backward compatible
  2363. if (!(typeof finalize === 'function')) {
  2364. command = finalize;
  2365. finalize = null;
  2366. }
  2367. if (
  2368. !Array.isArray(keys) &&
  2369. keys instanceof Object &&
  2370. typeof keys !== 'function' &&
  2371. !(keys._bsontype === 'Code')
  2372. ) {
  2373. keys = Object.keys(keys);
  2374. }
  2375. if (typeof reduce === 'function') {
  2376. reduce = reduce.toString();
  2377. }
  2378. if (typeof finalize === 'function') {
  2379. finalize = finalize.toString();
  2380. }
  2381. // Set up the command as default
  2382. command = command == null ? true : command;
  2383. return executeOperation(this.s.topology, group, [
  2384. this,
  2385. keys,
  2386. condition,
  2387. initial,
  2388. reduce,
  2389. finalize,
  2390. command,
  2391. options,
  2392. callback
  2393. ]);
  2394. };
  2395. var group = function(self, keys, condition, initial, reduce, finalize, command, options, callback) {
  2396. // Execute using the command
  2397. if (command) {
  2398. var reduceFunction = reduce && reduce._bsontype === 'Code' ? reduce : new Code(reduce);
  2399. var selector = {
  2400. group: {
  2401. ns: self.s.name,
  2402. $reduce: reduceFunction,
  2403. cond: condition,
  2404. initial: initial,
  2405. out: 'inline'
  2406. }
  2407. };
  2408. // if finalize is defined
  2409. if (finalize != null) selector.group['finalize'] = finalize;
  2410. // Set up group selector
  2411. if ('function' === typeof keys || (keys && keys._bsontype === 'Code')) {
  2412. selector.group.$keyf = keys && keys._bsontype === 'Code' ? keys : new Code(keys);
  2413. } else {
  2414. var hash = {};
  2415. keys.forEach(function(key) {
  2416. hash[key] = 1;
  2417. });
  2418. selector.group.key = hash;
  2419. }
  2420. options = shallowClone(options);
  2421. // Ensure we have the right read preference inheritance
  2422. options = getReadPreference(self, options, self.s.db, self);
  2423. // Do we have a readConcern specified
  2424. decorateWithReadConcern(selector, self, options);
  2425. // Have we specified collation
  2426. decorateWithCollation(selector, self, options);
  2427. // Execute command
  2428. self.s.db.command(selector, options, function(err, result) {
  2429. if (err) return handleCallback(callback, err, null);
  2430. handleCallback(callback, null, result.retval);
  2431. });
  2432. } else {
  2433. // Create execution scope
  2434. var scope = reduce != null && reduce._bsontype === 'Code' ? reduce.scope : {};
  2435. scope.ns = self.s.name;
  2436. scope.keys = keys;
  2437. scope.condition = condition;
  2438. scope.initial = initial;
  2439. // Pass in the function text to execute within mongodb.
  2440. var groupfn = groupFunction.replace(/ reduce;/, reduce.toString() + ';');
  2441. self.s.db.eval(new Code(groupfn, scope), null, options, function(err, results) {
  2442. if (err) return handleCallback(callback, err, null);
  2443. handleCallback(callback, null, results.result || results);
  2444. });
  2445. }
  2446. };
  2447. /**
  2448. * Functions that are passed as scope args must
  2449. * be converted to Code instances.
  2450. * @ignore
  2451. */
  2452. function processScope(scope) {
  2453. if (!isObject(scope) || scope._bsontype === 'ObjectID') {
  2454. return scope;
  2455. }
  2456. var keys = Object.keys(scope);
  2457. var i = keys.length;
  2458. var key;
  2459. var new_scope = {};
  2460. while (i--) {
  2461. key = keys[i];
  2462. if ('function' === typeof scope[key]) {
  2463. new_scope[key] = new Code(String(scope[key]));
  2464. } else {
  2465. new_scope[key] = processScope(scope[key]);
  2466. }
  2467. }
  2468. return new_scope;
  2469. }
  2470. /**
  2471. * Run Map Reduce across a collection. Be aware that the inline option for out will return an array of results not a collection.
  2472. *
  2473. * @method
  2474. * @param {(function|string)} map The mapping function.
  2475. * @param {(function|string)} reduce The reduce function.
  2476. * @param {object} [options=null] Optional settings.
  2477. * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
  2478. * @param {object} [options.out=null] Sets the output target for the map reduce job. *{inline:1} | {replace:'collectionName'} | {merge:'collectionName'} | {reduce:'collectionName'}*
  2479. * @param {object} [options.query=null] Query filter object.
  2480. * @param {object} [options.sort=null] Sorts the input objects using this key. Useful for optimization, like sorting by the emit key for fewer reduces.
  2481. * @param {number} [options.limit=null] Number of objects to return from collection.
  2482. * @param {boolean} [options.keeptemp=false] Keep temporary data.
  2483. * @param {(function|string)} [options.finalize=null] Finalize function.
  2484. * @param {object} [options.scope=null] Can pass in variables that can be access from map/reduce/finalize.
  2485. * @param {boolean} [options.jsMode=false] It is possible to make the execution stay in JS. Provided in MongoDB > 2.0.X.
  2486. * @param {boolean} [options.verbose=false] Provide statistics on job execution time.
  2487. * @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
  2488. * @param {ClientSession} [options.session] optional session to use for this operation
  2489. * @param {Collection~resultCallback} [callback] The command result callback
  2490. * @throws {MongoError}
  2491. * @return {Promise} returns Promise if no callback passed
  2492. */
  2493. Collection.prototype.mapReduce = function(map, reduce, options, callback) {
  2494. if ('function' === typeof options) (callback = options), (options = {});
  2495. // Out must allways be defined (make sure we don't break weirdly on pre 1.8+ servers)
  2496. if (null == options.out) {
  2497. throw new Error(
  2498. 'the out option parameter must be defined, see mongodb docs for possible values'
  2499. );
  2500. }
  2501. if ('function' === typeof map) {
  2502. map = map.toString();
  2503. }
  2504. if ('function' === typeof reduce) {
  2505. reduce = reduce.toString();
  2506. }
  2507. if ('function' === typeof options.finalize) {
  2508. options.finalize = options.finalize.toString();
  2509. }
  2510. return executeOperation(this.s.topology, mapReduce, [this, map, reduce, options, callback]);
  2511. };
  2512. var mapReduce = function(self, map, reduce, options, callback) {
  2513. var mapCommandHash = {
  2514. mapreduce: self.s.name,
  2515. map: map,
  2516. reduce: reduce
  2517. };
  2518. // Exclusion list
  2519. var exclusionList = ['readPreference', 'session'];
  2520. // Add any other options passed in
  2521. for (var n in options) {
  2522. if ('scope' === n) {
  2523. mapCommandHash[n] = processScope(options[n]);
  2524. } else {
  2525. // Only include if not in exclusion list
  2526. if (exclusionList.indexOf(n) === -1) {
  2527. mapCommandHash[n] = options[n];
  2528. }
  2529. }
  2530. }
  2531. options = shallowClone(options);
  2532. // Ensure we have the right read preference inheritance
  2533. options = getReadPreference(self, options, self.s.db, self);
  2534. // If we have a read preference and inline is not set as output fail hard
  2535. if (
  2536. options.readPreference !== false &&
  2537. options.readPreference !== 'primary' &&
  2538. options['out'] &&
  2539. (options['out'].inline !== 1 && options['out'] !== 'inline')
  2540. ) {
  2541. // Force readPreference to primary
  2542. options.readPreference = 'primary';
  2543. // Decorate command with writeConcern if supported
  2544. applyWriteConcern(mapCommandHash, { db: self.s.db, collection: self }, options);
  2545. } else {
  2546. decorateWithReadConcern(mapCommandHash, self, options);
  2547. }
  2548. // Is bypassDocumentValidation specified
  2549. if (typeof options.bypassDocumentValidation === 'boolean') {
  2550. mapCommandHash.bypassDocumentValidation = options.bypassDocumentValidation;
  2551. }
  2552. // Have we specified collation
  2553. decorateWithCollation(mapCommandHash, self, options);
  2554. // Execute command
  2555. self.s.db.command(mapCommandHash, options, function(err, result) {
  2556. if (err) return handleCallback(callback, err);
  2557. // Check if we have an error
  2558. if (1 !== result.ok || result.err || result.errmsg) {
  2559. return handleCallback(callback, toError(result));
  2560. }
  2561. // Create statistics value
  2562. var stats = {};
  2563. if (result.timeMillis) stats['processtime'] = result.timeMillis;
  2564. if (result.counts) stats['counts'] = result.counts;
  2565. if (result.timing) stats['timing'] = result.timing;
  2566. // invoked with inline?
  2567. if (result.results) {
  2568. // If we wish for no verbosity
  2569. if (options['verbose'] == null || !options['verbose']) {
  2570. return handleCallback(callback, null, result.results);
  2571. }
  2572. return handleCallback(callback, null, { results: result.results, stats: stats });
  2573. }
  2574. // The returned collection
  2575. var collection = null;
  2576. // If we have an object it's a different db
  2577. if (result.result != null && typeof result.result === 'object') {
  2578. var doc = result.result;
  2579. // Return a collection from another db
  2580. var Db = require('./db');
  2581. collection = new Db(doc.db, self.s.db.s.topology, self.s.db.s.options).collection(
  2582. doc.collection
  2583. );
  2584. } else {
  2585. // Create a collection object that wraps the result collection
  2586. collection = self.s.db.collection(result.result);
  2587. }
  2588. // If we wish for no verbosity
  2589. if (options['verbose'] == null || !options['verbose']) {
  2590. return handleCallback(callback, err, collection);
  2591. }
  2592. // Return stats as third set of values
  2593. handleCallback(callback, err, { collection: collection, stats: stats });
  2594. });
  2595. };
  2596. /**
  2597. * Initiate a Out of order batch write operation. All operations will be buffered into insert/update/remove commands executed out of order.
  2598. *
  2599. * @method
  2600. * @param {object} [options=null] Optional settings.
  2601. * @param {(number|string)} [options.w=null] The write concern.
  2602. * @param {number} [options.wtimeout=null] The write concern timeout.
  2603. * @param {boolean} [options.j=false] Specify a journal write concern.
  2604. * @param {ClientSession} [options.session] optional session to use for this operation
  2605. * @return {UnorderedBulkOperation}
  2606. */
  2607. Collection.prototype.initializeUnorderedBulkOp = function(options) {
  2608. options = options || {};
  2609. options.promiseLibrary = this.s.promiseLibrary;
  2610. return unordered(this.s.topology, this, options);
  2611. };
  2612. /**
  2613. * Initiate an In order bulk write operation, operations will be serially executed in the order they are added, creating a new operation for each switch in types.
  2614. *
  2615. * @method
  2616. * @param {object} [options=null] Optional settings.
  2617. * @param {(number|string)} [options.w=null] The write concern.
  2618. * @param {number} [options.wtimeout=null] The write concern timeout.
  2619. * @param {boolean} [options.j=false] Specify a journal write concern.
  2620. * @param {ClientSession} [options.session] optional session to use for this operation
  2621. * @param {OrderedBulkOperation} callback The command result callback
  2622. * @return {null}
  2623. */
  2624. Collection.prototype.initializeOrderedBulkOp = function(options) {
  2625. options = options || {};
  2626. options.promiseLibrary = this.s.promiseLibrary;
  2627. return ordered(this.s.topology, this, options);
  2628. };
  2629. // Figure out the read preference
  2630. var getReadPreference = function(self, options, db) {
  2631. let r = null;
  2632. if (options.readPreference) {
  2633. r = options.readPreference;
  2634. } else if (self.s.readPreference) {
  2635. r = self.s.readPreference;
  2636. } else if (db.s.readPreference) {
  2637. r = db.s.readPreference;
  2638. } else {
  2639. return options;
  2640. }
  2641. if (typeof r === 'string') {
  2642. options.readPreference = new ReadPreference(r);
  2643. } else if (r && !(r instanceof ReadPreference) && typeof r === 'object') {
  2644. const mode = r.mode || r.preference;
  2645. if (mode && typeof mode === 'string') {
  2646. options.readPreference = new ReadPreference(mode, r.tags, {
  2647. maxStalenessSeconds: r.maxStalenessSeconds
  2648. });
  2649. }
  2650. } else if (!(r instanceof ReadPreference)) {
  2651. throw new TypeError('Invalid read preference: ' + r);
  2652. }
  2653. return options;
  2654. };
  2655. module.exports = Collection;