Source: lib/gridfs-stream/upload.js

  1. 'use strict';
  2. var core = require('mongodb-core');
  3. var crypto = require('crypto');
  4. var stream = require('stream');
  5. var util = require('util');
  6. var ERROR_NAMESPACE_NOT_FOUND = 26;
  7. module.exports = GridFSBucketWriteStream;
  8. /**
  9. * A writable stream that enables you to write buffers to GridFS.
  10. *
  11. * Do not instantiate this class directly. Use `openUploadStream()` instead.
  12. *
  13. * @class
  14. * @param {GridFSBucket} bucket Handle for this stream's corresponding bucket
  15. * @param {string} filename The value of the 'filename' key in the files doc
  16. * @param {object} [options=null] Optional settings.
  17. * @param {string|number|object} [options.id=null] Custom file id for the GridFS file.
  18. * @param {number} [options.chunkSizeBytes=null] The chunk size to use, in bytes
  19. * @param {number} [options.w=null] The write concern
  20. * @param {number} [options.wtimeout=null] The write concern timeout
  21. * @param {number} [options.j=null] The journal write concern
  22. * @fires GridFSBucketWriteStream#error
  23. * @fires GridFSBucketWriteStream#finish
  24. * @return {GridFSBucketWriteStream} a GridFSBucketWriteStream instance.
  25. */
  26. function GridFSBucketWriteStream(bucket, filename, options) {
  27. options = options || {};
  28. this.bucket = bucket;
  29. this.chunks = bucket.s._chunksCollection;
  30. this.filename = filename;
  31. this.files = bucket.s._filesCollection;
  32. this.options = options;
  33. // Signals the write is all done
  34. this.done = false;
  35. this.id = options.id ? options.id : core.BSON.ObjectId();
  36. this.chunkSizeBytes = this.options.chunkSizeBytes;
  37. this.bufToStore = new Buffer(this.chunkSizeBytes);
  38. this.length = 0;
  39. this.md5 = crypto.createHash('md5');
  40. this.n = 0;
  41. this.pos = 0;
  42. this.state = {
  43. streamEnd: false,
  44. outstandingRequests: 0,
  45. errored: false,
  46. aborted: false,
  47. promiseLibrary: this.bucket.s.promiseLibrary
  48. };
  49. if (!this.bucket.s.calledOpenUploadStream) {
  50. this.bucket.s.calledOpenUploadStream = true;
  51. var _this = this;
  52. checkIndexes(this, function() {
  53. _this.bucket.s.checkedIndexes = true;
  54. _this.bucket.emit('index');
  55. });
  56. }
  57. }
  58. util.inherits(GridFSBucketWriteStream, stream.Writable);
  59. /**
  60. * An error occurred
  61. *
  62. * @event GridFSBucketWriteStream#error
  63. * @type {Error}
  64. */
  65. /**
  66. * `end()` was called and the write stream successfully wrote the file
  67. * metadata and all the chunks to MongoDB.
  68. *
  69. * @event GridFSBucketWriteStream#finish
  70. * @type {object}
  71. */
  72. /**
  73. * Write a buffer to the stream.
  74. *
  75. * @method
  76. * @param {Buffer} chunk Buffer to write
  77. * @param {String} encoding Optional encoding for the buffer
  78. * @param {Function} callback Function to call when the chunk was added to the buffer, or if the entire chunk was persisted to MongoDB if this chunk caused a flush.
  79. * @return {Boolean} False if this write required flushing a chunk to MongoDB. True otherwise.
  80. */
  81. GridFSBucketWriteStream.prototype.write = function(chunk, encoding, callback) {
  82. var _this = this;
  83. return waitForIndexes(this, function() {
  84. return doWrite(_this, chunk, encoding, callback);
  85. });
  86. };
  87. /**
  88. * Places this write stream into an aborted state (all future writes fail)
  89. * and deletes all chunks that have already been written.
  90. *
  91. * @method
  92. * @param {GridFSBucket~errorCallback} callback called when chunks are successfully removed or error occurred
  93. * @return {Promise} if no callback specified
  94. */
  95. GridFSBucketWriteStream.prototype.abort = function(callback) {
  96. if (this.state.streamEnd) {
  97. var error = new Error('Cannot abort a stream that has already completed');
  98. if (typeof callback === 'function') {
  99. return callback(error);
  100. }
  101. return this.state.promiseLibrary.reject(error);
  102. }
  103. if (this.state.aborted) {
  104. error = new Error('Cannot call abort() on a stream twice');
  105. if (typeof callback === 'function') {
  106. return callback(error);
  107. }
  108. return this.state.promiseLibrary.reject(error);
  109. }
  110. this.state.aborted = true;
  111. this.chunks.deleteMany({ files_id: this.id }, function(error) {
  112. if (typeof callback === 'function') callback(error);
  113. });
  114. };
  115. /**
  116. * Tells the stream that no more data will be coming in. The stream will
  117. * persist the remaining data to MongoDB, write the files document, and
  118. * then emit a 'finish' event.
  119. *
  120. * @method
  121. * @param {Buffer} chunk Buffer to write
  122. * @param {String} encoding Optional encoding for the buffer
  123. * @param {Function} callback Function to call when all files and chunks have been persisted to MongoDB
  124. */
  125. GridFSBucketWriteStream.prototype.end = function(chunk, encoding, callback) {
  126. var _this = this;
  127. if (typeof chunk === 'function') {
  128. (callback = chunk), (chunk = null), (encoding = null);
  129. } else if (typeof encoding === 'function') {
  130. (callback = encoding), (encoding = null);
  131. }
  132. if (checkAborted(this, callback)) {
  133. return;
  134. }
  135. this.state.streamEnd = true;
  136. if (callback) {
  137. this.once('finish', function(result) {
  138. callback(null, result);
  139. });
  140. }
  141. if (!chunk) {
  142. waitForIndexes(this, function() {
  143. writeRemnant(_this);
  144. });
  145. return;
  146. }
  147. this.write(chunk, encoding, function() {
  148. writeRemnant(_this);
  149. });
  150. };
  151. /**
  152. * @ignore
  153. */
  154. function __handleError(_this, error, callback) {
  155. if (_this.state.errored) {
  156. return;
  157. }
  158. _this.state.errored = true;
  159. if (callback) {
  160. return callback(error);
  161. }
  162. _this.emit('error', error);
  163. }
  164. /**
  165. * @ignore
  166. */
  167. function createChunkDoc(filesId, n, data) {
  168. return {
  169. _id: core.BSON.ObjectId(),
  170. files_id: filesId,
  171. n: n,
  172. data: data
  173. };
  174. }
  175. /**
  176. * @ignore
  177. */
  178. function checkChunksIndex(_this, callback) {
  179. _this.chunks.listIndexes().toArray(function(error, indexes) {
  180. if (error) {
  181. // Collection doesn't exist so create index
  182. if (error.code === ERROR_NAMESPACE_NOT_FOUND) {
  183. var index = { files_id: 1, n: 1 };
  184. _this.chunks.createIndex(index, { background: false, unique: true }, function(error) {
  185. if (error) {
  186. return callback(error);
  187. }
  188. callback();
  189. });
  190. return;
  191. }
  192. return callback(error);
  193. }
  194. var hasChunksIndex = false;
  195. indexes.forEach(function(index) {
  196. if (index.key) {
  197. var keys = Object.keys(index.key);
  198. if (keys.length === 2 && index.key.files_id === 1 && index.key.n === 1) {
  199. hasChunksIndex = true;
  200. }
  201. }
  202. });
  203. if (hasChunksIndex) {
  204. callback();
  205. } else {
  206. index = { files_id: 1, n: 1 };
  207. var indexOptions = getWriteOptions(_this);
  208. indexOptions.background = false;
  209. indexOptions.unique = true;
  210. _this.chunks.createIndex(index, indexOptions, function(error) {
  211. if (error) {
  212. return callback(error);
  213. }
  214. callback();
  215. });
  216. }
  217. });
  218. }
  219. /**
  220. * @ignore
  221. */
  222. function checkDone(_this, callback) {
  223. if (_this.done) return true;
  224. if (_this.state.streamEnd && _this.state.outstandingRequests === 0 && !_this.state.errored) {
  225. // Set done so we dont' trigger duplicate createFilesDoc
  226. _this.done = true;
  227. // Create a new files doc
  228. var filesDoc = createFilesDoc(
  229. _this.id,
  230. _this.length,
  231. _this.chunkSizeBytes,
  232. _this.md5.digest('hex'),
  233. _this.filename,
  234. _this.options.contentType,
  235. _this.options.aliases,
  236. _this.options.metadata
  237. );
  238. if (checkAborted(_this, callback)) {
  239. return false;
  240. }
  241. _this.files.insert(filesDoc, getWriteOptions(_this), function(error) {
  242. if (error) {
  243. return __handleError(_this, error, callback);
  244. }
  245. _this.emit('finish', filesDoc);
  246. });
  247. return true;
  248. }
  249. return false;
  250. }
  251. /**
  252. * @ignore
  253. */
  254. function checkIndexes(_this, callback) {
  255. _this.files.findOne({}, { _id: 1 }, function(error, doc) {
  256. if (error) {
  257. return callback(error);
  258. }
  259. if (doc) {
  260. return callback();
  261. }
  262. _this.files.listIndexes().toArray(function(error, indexes) {
  263. if (error) {
  264. // Collection doesn't exist so create index
  265. if (error.code === ERROR_NAMESPACE_NOT_FOUND) {
  266. var index = { filename: 1, uploadDate: 1 };
  267. _this.files.createIndex(index, { background: false }, function(error) {
  268. if (error) {
  269. return callback(error);
  270. }
  271. checkChunksIndex(_this, callback);
  272. });
  273. return;
  274. }
  275. return callback(error);
  276. }
  277. var hasFileIndex = false;
  278. indexes.forEach(function(index) {
  279. var keys = Object.keys(index.key);
  280. if (keys.length === 2 && index.key.filename === 1 && index.key.uploadDate === 1) {
  281. hasFileIndex = true;
  282. }
  283. });
  284. if (hasFileIndex) {
  285. checkChunksIndex(_this, callback);
  286. } else {
  287. index = { filename: 1, uploadDate: 1 };
  288. var indexOptions = getWriteOptions(_this);
  289. indexOptions.background = false;
  290. _this.files.createIndex(index, indexOptions, function(error) {
  291. if (error) {
  292. return callback(error);
  293. }
  294. checkChunksIndex(_this, callback);
  295. });
  296. }
  297. });
  298. });
  299. }
  300. /**
  301. * @ignore
  302. */
  303. function createFilesDoc(_id, length, chunkSize, md5, filename, contentType, aliases, metadata) {
  304. var ret = {
  305. _id: _id,
  306. length: length,
  307. chunkSize: chunkSize,
  308. uploadDate: new Date(),
  309. md5: md5,
  310. filename: filename
  311. };
  312. if (contentType) {
  313. ret.contentType = contentType;
  314. }
  315. if (aliases) {
  316. ret.aliases = aliases;
  317. }
  318. if (metadata) {
  319. ret.metadata = metadata;
  320. }
  321. return ret;
  322. }
  323. /**
  324. * @ignore
  325. */
  326. function doWrite(_this, chunk, encoding, callback) {
  327. if (checkAborted(_this, callback)) {
  328. return false;
  329. }
  330. var inputBuf = Buffer.isBuffer(chunk) ? chunk : new Buffer(chunk, encoding);
  331. _this.length += inputBuf.length;
  332. // Input is small enough to fit in our buffer
  333. if (_this.pos + inputBuf.length < _this.chunkSizeBytes) {
  334. inputBuf.copy(_this.bufToStore, _this.pos);
  335. _this.pos += inputBuf.length;
  336. callback && callback();
  337. // Note that we reverse the typical semantics of write's return value
  338. // to be compatible with node's `.pipe()` function.
  339. // True means client can keep writing.
  340. return true;
  341. }
  342. // Otherwise, buffer is too big for current chunk, so we need to flush
  343. // to MongoDB.
  344. var inputBufRemaining = inputBuf.length;
  345. var spaceRemaining = _this.chunkSizeBytes - _this.pos;
  346. var numToCopy = Math.min(spaceRemaining, inputBuf.length);
  347. var outstandingRequests = 0;
  348. while (inputBufRemaining > 0) {
  349. var inputBufPos = inputBuf.length - inputBufRemaining;
  350. inputBuf.copy(_this.bufToStore, _this.pos, inputBufPos, inputBufPos + numToCopy);
  351. _this.pos += numToCopy;
  352. spaceRemaining -= numToCopy;
  353. if (spaceRemaining === 0) {
  354. _this.md5.update(_this.bufToStore);
  355. var doc = createChunkDoc(_this.id, _this.n, _this.bufToStore);
  356. ++_this.state.outstandingRequests;
  357. ++outstandingRequests;
  358. if (checkAborted(_this, callback)) {
  359. return false;
  360. }
  361. _this.chunks.insert(doc, getWriteOptions(_this), function(error) {
  362. if (error) {
  363. return __handleError(_this, error);
  364. }
  365. --_this.state.outstandingRequests;
  366. --outstandingRequests;
  367. if (!outstandingRequests) {
  368. _this.emit('drain', doc);
  369. callback && callback();
  370. checkDone(_this);
  371. }
  372. });
  373. spaceRemaining = _this.chunkSizeBytes;
  374. _this.pos = 0;
  375. ++_this.n;
  376. }
  377. inputBufRemaining -= numToCopy;
  378. numToCopy = Math.min(spaceRemaining, inputBufRemaining);
  379. }
  380. // Note that we reverse the typical semantics of write's return value
  381. // to be compatible with node's `.pipe()` function.
  382. // False means the client should wait for the 'drain' event.
  383. return false;
  384. }
  385. /**
  386. * @ignore
  387. */
  388. function getWriteOptions(_this) {
  389. var obj = {};
  390. if (_this.options.writeConcern) {
  391. obj.w = _this.options.writeConcern.w;
  392. obj.wtimeout = _this.options.writeConcern.wtimeout;
  393. obj.j = _this.options.writeConcern.j;
  394. }
  395. return obj;
  396. }
  397. /**
  398. * @ignore
  399. */
  400. function waitForIndexes(_this, callback) {
  401. if (_this.bucket.s.checkedIndexes) {
  402. return callback(false);
  403. }
  404. _this.bucket.once('index', function() {
  405. callback(true);
  406. });
  407. return true;
  408. }
  409. /**
  410. * @ignore
  411. */
  412. function writeRemnant(_this, callback) {
  413. // Buffer is empty, so don't bother to insert
  414. if (_this.pos === 0) {
  415. return checkDone(_this, callback);
  416. }
  417. ++_this.state.outstandingRequests;
  418. // Create a new buffer to make sure the buffer isn't bigger than it needs
  419. // to be.
  420. var remnant = new Buffer(_this.pos);
  421. _this.bufToStore.copy(remnant, 0, 0, _this.pos);
  422. _this.md5.update(remnant);
  423. var doc = createChunkDoc(_this.id, _this.n, remnant);
  424. // If the stream was aborted, do not write remnant
  425. if (checkAborted(_this, callback)) {
  426. return false;
  427. }
  428. _this.chunks.insert(doc, getWriteOptions(_this), function(error) {
  429. if (error) {
  430. return __handleError(_this, error);
  431. }
  432. --_this.state.outstandingRequests;
  433. checkDone(_this);
  434. });
  435. }
  436. /**
  437. * @ignore
  438. */
  439. function checkAborted(_this, callback) {
  440. if (_this.state.aborted) {
  441. if (typeof callback === 'function') {
  442. callback(new Error('this stream has been aborted'));
  443. }
  444. return true;
  445. }
  446. return false;
  447. }