Source: lib/gridfs-stream/index.js

  1. 'use strict';
  2. var Emitter = require('events').EventEmitter;
  3. var GridFSBucketReadStream = require('./download');
  4. var GridFSBucketWriteStream = require('./upload');
  5. var shallowClone = require('../utils').shallowClone;
  6. var toError = require('../utils').toError;
  7. var util = require('util');
  8. var executeOperation = require('../utils').executeOperation;
  // Bucket-level fallbacks: the GridFSBucket constructor substitutes these
  // for any of the two options that the caller omits or passes as falsy.
  var DEFAULT_GRIDFS_BUCKET_OPTIONS = {
    // Prefix used for the '<bucketName>.files' and '<bucketName>.chunks' collections
    bucketName: 'fs',
    // 255KB per chunk
    chunkSizeBytes: 255 * 1024
  };
  13. module.exports = GridFSBucket;
  /**
   * Constructor for a streaming GridFS interface
   * @class
   * @param {Db} db A db handle
   * @param {object} [options=null] Optional settings. The object is copied, never mutated.
   * @param {string} [options.bucketName="fs"] The 'files' and 'chunks' collections will be prefixed with the bucket name followed by a dot.
   * @param {number} [options.chunkSizeBytes=255 * 1024] Number of bytes stored in each chunk. Defaults to 255KB
   * @param {object} [options.writeConcern=null] Optional write concern to be passed to write operations, for instance `{ w: 1 }`
   * @param {object} [options.readPreference=null] Optional read preference to be passed to read operations
   * @fires GridFSBucketWriteStream#index
   * @return {GridFSBucket}
   */
  function GridFSBucket(db, options) {
    Emitter.apply(this);
    // 0 removes the listener-count limit (and its warning) for this emitter.
    this.setMaxListeners(0);

    // Always build a private options object. Storing the module-level
    // defaults object directly would let a write through one bucket's
    // `s.options` change the defaults seen by every later bucket.
    options = options && typeof options === 'object' ? shallowClone(options) : {};

    // A missing or falsy bucketName / chunkSizeBytes falls back to the default.
    Object.keys(DEFAULT_GRIDFS_BUCKET_OPTIONS).forEach(function(key) {
      if (!options[key]) {
        options[key] = DEFAULT_GRIDFS_BUCKET_OPTIONS[key];
      }
    });

    this.s = {
      db: db,
      options: options,
      _chunksCollection: db.collection(options.bucketName + '.chunks'),
      _filesCollection: db.collection(options.bucketName + '.files'),
      checkedIndexes: false,
      calledOpenUploadStream: false,
      promiseLibrary: db.s.promiseLibrary || Promise
    };
  }

  util.inherits(GridFSBucket, Emitter);
  51. /**
  52. * When the first call to openUploadStream is made, the upload stream will
  53. * check to see if it needs to create the proper indexes on the chunks and
  54. * files collections. This event is fired either 1) when it determines that
  55. * no index creation is necessary, or 2) when it successfully creates the
  56. * necessary indexes.
  57. *
  58. * @event GridFSBucket#index
  59. * @type {Error}
  60. */
  /**
   * Opens a writable stream (GridFSBucketWriteStream) that stores the
   * buffers written to it in GridFS. The resulting file's id is exposed on
   * the stream's 'id' property.
   * @method
   * @param {string} filename The value of the 'filename' key in the files doc
   * @param {object} [options=null] Optional settings. The object is copied, not modified.
   * @param {number} [options.chunkSizeBytes=null] Optional overwrite this bucket's chunkSizeBytes for this file
   * @param {object} [options.metadata=null] Optional object to store in the file document's `metadata` field
   * @param {string} [options.contentType=null] Optional string to store in the file document's `contentType` field
   * @param {array} [options.aliases=null] Optional array of strings to store in the file document's `aliases` field
   * @return {GridFSBucketWriteStream}
   */
  GridFSBucket.prototype.openUploadStream = function(filename, options) {
    var streamOptions = options ? shallowClone(options) : {};

    // No (or a falsy) per-file chunk size: inherit the bucket's.
    if (!streamOptions.chunkSizeBytes) {
      streamOptions.chunkSizeBytes = this.s.options.chunkSizeBytes;
    }

    return new GridFSBucketWriteStream(this, filename, streamOptions);
  };
  /**
   * Opens a writable stream (GridFSBucketWriteStream) that stores the
   * buffers written to it in GridFS under a caller-supplied file id. The
   * id is exposed on the stream's 'id' property.
   * @method
   * @param {string|number|object} id A custom id used to identify the file
   * @param {string} filename The value of the 'filename' key in the files doc
   * @param {object} [options=null] Optional settings. The object is copied, not modified.
   * @param {number} [options.chunkSizeBytes=null] Optional overwrite this bucket's chunkSizeBytes for this file
   * @param {object} [options.metadata=null] Optional object to store in the file document's `metadata` field
   * @param {string} [options.contentType=null] Optional string to store in the file document's `contentType` field
   * @param {array} [options.aliases=null] Optional array of strings to store in the file document's `aliases` field
   * @return {GridFSBucketWriteStream}
   */
  GridFSBucket.prototype.openUploadStreamWithId = function(id, filename, options) {
    var streamOptions = options ? shallowClone(options) : {};

    // No (or a falsy) per-file chunk size: inherit the bucket's.
    if (!streamOptions.chunkSizeBytes) {
      streamOptions.chunkSizeBytes = this.s.options.chunkSizeBytes;
    }

    // The explicit `id` argument always wins over any `id` key in `options`.
    streamOptions.id = id;

    return new GridFSBucketWriteStream(this, filename, streamOptions);
  };
  /**
   * Opens a readable stream (GridFSBucketReadStream) over the file whose
   * files-collection `_id` equals the given id.
   * @method
   * @param {ObjectId} id The id of the file doc
   * @param {Object} [options=null] Optional settings. Only `start` and `end` are forwarded.
   * @param {Number} [options.start=null] Optional 0-based offset in bytes to start streaming from
   * @param {Number} [options.end=null] Optional 0-based offset in bytes to stop streaming before
   * @return {GridFSBucketReadStream}
   */
  GridFSBucket.prototype.openDownloadStream = function(id, options) {
    var state = this.s;

    // Build a fresh options object so nothing but the byte range reaches the stream.
    var range = {
      start: options && options.start,
      end: options && options.end
    };

    return new GridFSBucketReadStream(
      state._chunksCollection,
      state._filesCollection,
      state.options.readPreference,
      { _id: id },
      range
    );
  };
  /**
   * Deletes the file with the given id. The work is done by `_delete`,
   * dispatched through `executeOperation` with sessions skipped.
   * @method
   * @param {ObjectId} id The id of the file doc
   * @param {GridFSBucket~errorCallback} [callback]
   */
  GridFSBucket.prototype.delete = function(id, callback) {
    var topology = this.s.db.s.topology;
    var operationArgs = [this, id, callback];

    return executeOperation(topology, _delete, operationArgs, { skipSessions: true });
  };
  /**
   * Removes the files document with the given id, then every chunk whose
   * `files_id` matches it. The callback receives an error if either removal
   * fails or if no files document was deleted.
   * @ignore
   */
  function _delete(_this, id, callback) {
    _this.s._filesCollection.deleteOne({ _id: id }, function(fileError, fileResult) {
      if (fileError) {
        return callback(fileError);
      }

      _this.s._chunksCollection.deleteMany({ files_id: id }, function(chunkError) {
        if (chunkError) {
          return callback(chunkError);
        }

        // The chunk removal above runs even when no files document matched,
        // so orphaned chunks are cleaned up before FileNotFound is reported.
        var fileWasDeleted = Boolean(fileResult.result.n);
        if (!fileWasDeleted) {
          return callback(new Error('FileNotFound: no file with id ' + id + ' found'));
        }

        callback();
      });
    });
  }
  /**
   * Runs `find` on the files collection and applies the supplied cursor
   * settings to the resulting cursor before returning it.
   * @method
   * @param {Object} filter Query filter; a falsy value is treated as `{}`
   * @param {Object} [options=null] Optional settings for cursor
   * @param {number} [options.batchSize=null] Optional batch size for cursor
   * @param {number} [options.limit=null] Optional limit for cursor
   * @param {number} [options.maxTimeMS=null] Optional maxTimeMS for cursor
   * @param {boolean} [options.noCursorTimeout=null] Optionally set cursor's `noCursorTimeout` flag
   * @param {number} [options.skip=null] Optional skip for cursor
   * @param {object} [options.sort=null] Optional sort for cursor
   * @return {Cursor}
   */
  GridFSBucket.prototype.find = function(filter, options) {
    var settings = options || {};
    var cursor = this.s._filesCollection.find(filter || {});

    // Option name -> how to apply it. The list order is the order in which
    // the cursor methods are invoked.
    var appliers = [
      ['batchSize', function(value) { cursor.batchSize(value); }],
      ['limit', function(value) { cursor.limit(value); }],
      ['maxTimeMS', function(value) { cursor.maxTimeMS(value); }],
      ['noCursorTimeout', function(value) { cursor.addCursorFlag('noCursorTimeout', value); }],
      ['skip', function(value) { cursor.skip(value); }],
      ['sort', function(value) { cursor.sort(value); }]
    ];

    appliers.forEach(function(entry) {
      var value = settings[entry[0]];
      // Loose comparison on purpose: skips both null and undefined only.
      if (value != null) {
        entry[1](value);
      }
    });

    return cursor;
  };
  /**
   * Opens a readable stream (GridFSBucketReadStream) over a file selected
   * by name. When several files share the name, the newest one (by
   * `uploadDate`) is streamed unless the `revision` option picks another.
   * @method
   * @param {String} filename The name of the file to stream
   * @param {Object} [options=null] Optional settings
   * @param {number} [options.revision=-1] The revision number relative to the oldest file with the given filename. 0 gets you the oldest file, 1 gets you the 2nd oldest, -1 gets you the newest.
   * @param {Number} [options.start=null] Optional 0-based offset in bytes to start streaming from
   * @param {Number} [options.end=null] Optional 0-based offset in bytes to stop streaming before
   * @return {GridFSBucketReadStream}
   */
  GridFSBucket.prototype.openDownloadStreamByName = function(filename, options) {
    var revision = options ? options.revision : null;
    var hasRevision = revision != null;

    // Non-negative revisions count forward from the oldest upload; negative
    // ones count backward from the newest (-1 -> skip 0, -2 -> skip 1, ...).
    var countFromOldest = hasRevision && revision >= 0;
    var sort = { uploadDate: countFromOldest ? 1 : -1 };
    var skip = null;
    if (hasRevision) {
      skip = countFromOldest ? revision : -revision - 1;
    }

    var streamOptions = {
      sort: sort,
      skip: skip,
      start: options && options.start,
      end: options && options.end
    };

    return new GridFSBucketReadStream(
      this.s._chunksCollection,
      this.s._filesCollection,
      this.s.options.readPreference,
      { filename: filename },
      streamOptions
    );
  };
  /**
   * Gives the file with the given _id a new filename. The work is done by
   * `_rename`, dispatched through `executeOperation` with sessions skipped.
   * @method
   * @param {ObjectId} id the id of the file to rename
   * @param {String} filename new name for the file
   * @param {GridFSBucket~errorCallback} [callback]
   */
  GridFSBucket.prototype.rename = function(id, filename, callback) {
    var topology = this.s.db.s.topology;
    var operationArgs = [this, id, filename, callback];

    return executeOperation(topology, _rename, operationArgs, { skipSessions: true });
  };
  /**
   * Sets the `filename` field of the files document with the given id. The
   * callback receives an error if the update fails or matches no document.
   * @ignore
   */
  function _rename(_this, id, filename, callback) {
    var files = _this.s._filesCollection;

    files.updateOne({ _id: id }, { $set: { filename: filename } }, function(error, res) {
      if (error) {
        return callback(error);
      }

      // n === 0 means no files document has this id.
      var matched = Boolean(res.result.n);
      if (!matched) {
        return callback(toError('File with id ' + id + ' not found'));
      }

      callback();
    });
  }
  /**
   * Drops this bucket's files collection and then its chunks collection.
   * The work is done by `_drop`, dispatched through `executeOperation` with
   * sessions skipped.
   * @method
   * @param {GridFSBucket~errorCallback} [callback]
   */
  GridFSBucket.prototype.drop = function(callback) {
    var topology = this.s.db.s.topology;
    var operationArgs = [this, callback];

    return executeOperation(topology, _drop, operationArgs, { skipSessions: true });
  };
  /**
   * Drops the files collection first and, only if that succeeds, the chunks
   * collection. The first error encountered is handed to the callback.
   * @ignore
   */
  function _drop(_this, callback) {
    var state = _this.s;

    state._filesCollection.drop(function(filesError) {
      if (filesError) {
        return callback(filesError);
      }

      state._chunksCollection.drop(function(chunksError) {
        // Success is signalled by calling back with no arguments at all.
        return chunksError ? callback(chunksError) : callback();
      });
    });
  }
  298. /**
  299. * Callback format for all GridFSBucket methods that can accept a callback.
  300. * @callback GridFSBucket~errorCallback
  301. * @param {MongoError} error An error instance representing any errors that occurred
  302. */