Source: lib/gridfs-stream/upload.js

  1. var core = require('mongodb-core');
  2. var crypto = require('crypto');
  3. var stream = require('stream');
  4. var util = require('util');
  5. var ERROR_NAMESPACE_NOT_FOUND = 26;
  6. module.exports = GridFSBucketWriteStream;
  7. /**
  8. * A writable stream that enables you to write buffers to GridFS.
  9. *
  10. * Do not instantiate this class directly. Use `openUploadStream()` instead.
  11. *
  12. * @class
  13. * @param {GridFSBucket} bucket Handle for this stream's corresponding bucket
  14. * @param {string} filename The value of the 'filename' key in the files doc
  15. * @param {object} [options=null] Optional settings.
  16. * @param {string|number|object} [options.id=null] Custom file id for the GridFS file.
  17. * @param {number} [options.chunkSizeBytes=null] The chunk size to use, in bytes
  18. * @param {number} [options.w=null] The write concern
  19. * @param {number} [options.wtimeout=null] The write concern timeout
  20. * @param {number} [options.j=null] The journal write concern
  21. * @fires GridFSBucketWriteStream#error
  22. * @fires GridFSBucketWriteStream#finish
  23. * @return {GridFSBucketWriteStream} a GridFSBucketWriteStream instance.
  24. */
  25. function GridFSBucketWriteStream(bucket, filename, options) {
  26. options = options || {};
  27. this.bucket = bucket;
  28. this.chunks = bucket.s._chunksCollection;
  29. this.filename = filename;
  30. this.files = bucket.s._filesCollection;
  31. this.options = options;
  32. // Signals the write is all done
  33. this.done = false;
  34. this.id = options.id ? options.id : core.BSON.ObjectId();
  35. this.chunkSizeBytes = this.options.chunkSizeBytes;
  36. this.bufToStore = new Buffer(this.chunkSizeBytes);
  37. this.length = 0;
  38. this.md5 = crypto.createHash('md5');
  39. this.n = 0;
  40. this.pos = 0;
  41. this.state = {
  42. streamEnd: false,
  43. outstandingRequests: 0,
  44. errored: false,
  45. aborted: false,
  46. promiseLibrary: this.bucket.s.promiseLibrary
  47. };
  48. if (!this.bucket.s.calledOpenUploadStream) {
  49. this.bucket.s.calledOpenUploadStream = true;
  50. var _this = this;
  51. checkIndexes(this, function() {
  52. _this.bucket.s.checkedIndexes = true;
  53. _this.bucket.emit('index');
  54. });
  55. }
  56. }
  57. util.inherits(GridFSBucketWriteStream, stream.Writable);
  58. /**
  59. * An error occurred
  60. *
  61. * @event GridFSBucketWriteStream#error
  62. * @type {Error}
  63. */
  64. /**
  65. * `end()` was called and the write stream successfully wrote the file
  66. * metadata and all the chunks to MongoDB.
  67. *
  68. * @event GridFSBucketWriteStream#finish
  69. * @type {object}
  70. */
  71. /**
  72. * Write a buffer to the stream.
  73. *
  74. * @method
  75. * @param {Buffer} chunk Buffer to write
  76. * @param {String} encoding Optional encoding for the buffer
  77. * @param {Function} callback Function to call when the chunk was added to the buffer, or if the entire chunk was persisted to MongoDB if this chunk caused a flush.
  78. * @return {Boolean} False if this write required flushing a chunk to MongoDB. True otherwise.
  79. */
  80. GridFSBucketWriteStream.prototype.write = function(chunk, encoding, callback) {
  81. var _this = this;
  82. return waitForIndexes(this, function() {
  83. return doWrite(_this, chunk, encoding, callback);
  84. });
  85. };
  86. /**
  87. * Places this write stream into an aborted state (all future writes fail)
  88. * and deletes all chunks that have already been written.
  89. *
  90. * @method
  91. * @param {GridFSBucket~errorCallback} callback called when chunks are successfully removed or error occurred
  92. * @return {Promise} if no callback specified
  93. */
  94. GridFSBucketWriteStream.prototype.abort = function(callback) {
  95. if (this.state.streamEnd) {
  96. var error = new Error('Cannot abort a stream that has already completed');
  97. if (typeof callback == 'function') {
  98. return callback(error);
  99. }
  100. return this.state.promiseLibrary.reject(error);
  101. }
  102. if (this.state.aborted) {
  103. error = new Error('Cannot call abort() on a stream twice');
  104. if (typeof callback == 'function') {
  105. return callback(error);
  106. }
  107. return this.state.promiseLibrary.reject(error);
  108. }
  109. this.state.aborted = true;
  110. this.chunks.deleteMany({ files_id: this.id }, function(error) {
  111. if(typeof callback == 'function') callback(error);
  112. });
  113. };
  114. /**
  115. * Tells the stream that no more data will be coming in. The stream will
  116. * persist the remaining data to MongoDB, write the files document, and
  117. * then emit a 'finish' event.
  118. *
  119. * @method
  120. * @param {Buffer} chunk Buffer to write
  121. * @param {String} encoding Optional encoding for the buffer
  122. * @param {Function} callback Function to call when all files and chunks have been persisted to MongoDB
  123. */
  124. GridFSBucketWriteStream.prototype.end = function(chunk, encoding, callback) {
  125. var _this = this;
  126. if(typeof chunk == 'function') {
  127. callback = chunk, chunk = null, encoding = null;
  128. } else if(typeof encoding == 'function') {
  129. callback = encoding, encoding = null;
  130. }
  131. if (checkAborted(this, callback)) {
  132. return;
  133. }
  134. this.state.streamEnd = true;
  135. if (callback) {
  136. this.once('finish', function(result) {
  137. callback(null, result);
  138. });
  139. }
  140. if (!chunk) {
  141. waitForIndexes(this, function() {
  142. writeRemnant(_this);
  143. });
  144. return;
  145. }
  146. this.write(chunk, encoding, function() {
  147. writeRemnant(_this);
  148. });
  149. };
  150. /**
  151. * @ignore
  152. */
  153. function __handleError(_this, error, callback) {
  154. if (_this.state.errored) {
  155. return;
  156. }
  157. _this.state.errored = true;
  158. if (callback) {
  159. return callback(error);
  160. }
  161. _this.emit('error', error);
  162. }
  163. /**
  164. * @ignore
  165. */
  166. function createChunkDoc(filesId, n, data) {
  167. return {
  168. _id: core.BSON.ObjectId(),
  169. files_id: filesId,
  170. n: n,
  171. data: data
  172. };
  173. }
  174. /**
  175. * @ignore
  176. */
  177. function checkChunksIndex(_this, callback) {
  178. _this.chunks.listIndexes().toArray(function(error, indexes) {
  179. if (error) {
  180. // Collection doesn't exist so create index
  181. if (error.code === ERROR_NAMESPACE_NOT_FOUND) {
  182. var index = { files_id: 1, n: 1 };
  183. _this.chunks.createIndex(index, { background: false, unique: true }, function(error) {
  184. if (error) {
  185. return callback(error);
  186. }
  187. callback();
  188. });
  189. return;
  190. }
  191. return callback(error);
  192. }
  193. var hasChunksIndex = false;
  194. indexes.forEach(function(index) {
  195. if (index.key) {
  196. var keys = Object.keys(index.key);
  197. if (keys.length === 2 && index.key.files_id === 1 &&
  198. index.key.n === 1) {
  199. hasChunksIndex = true;
  200. }
  201. }
  202. });
  203. if (hasChunksIndex) {
  204. callback();
  205. } else {
  206. index = { files_id: 1, n: 1 };
  207. var indexOptions = getWriteOptions(_this);
  208. indexOptions.background = false;
  209. indexOptions.unique = true;
  210. _this.chunks.createIndex(index, indexOptions, function(error) {
  211. if (error) {
  212. return callback(error);
  213. }
  214. callback();
  215. });
  216. }
  217. });
  218. }
  219. /**
  220. * @ignore
  221. */
  222. function checkDone(_this, callback) {
  223. if(_this.done) return true;
  224. if (_this.state.streamEnd &&
  225. _this.state.outstandingRequests === 0 &&
  226. !_this.state.errored) {
  227. // Set done so we dont' trigger duplicate createFilesDoc
  228. _this.done = true;
  229. // Create a new files doc
  230. var filesDoc = createFilesDoc(_this.id, _this.length, _this.chunkSizeBytes,
  231. _this.md5.digest('hex'), _this.filename, _this.options.contentType,
  232. _this.options.aliases, _this.options.metadata);
  233. if (checkAborted(_this, callback)) {
  234. return false;
  235. }
  236. _this.files.insert(filesDoc, getWriteOptions(_this), function(error) {
  237. if (error) {
  238. return __handleError(_this, error, callback);
  239. }
  240. _this.emit('finish', filesDoc);
  241. });
  242. return true;
  243. }
  244. return false;
  245. }
  246. /**
  247. * @ignore
  248. */
  249. function checkIndexes(_this, callback) {
  250. _this.files.findOne({}, { _id: 1 }, function(error, doc) {
  251. if (error) {
  252. return callback(error);
  253. }
  254. if (doc) {
  255. return callback();
  256. }
  257. _this.files.listIndexes().toArray(function(error, indexes) {
  258. if (error) {
  259. // Collection doesn't exist so create index
  260. if (error.code === ERROR_NAMESPACE_NOT_FOUND) {
  261. var index = { filename: 1, uploadDate: 1 };
  262. _this.files.createIndex(index, { background: false }, function(error) {
  263. if (error) {
  264. return callback(error);
  265. }
  266. checkChunksIndex(_this, callback);
  267. });
  268. return;
  269. }
  270. return callback(error);
  271. }
  272. var hasFileIndex = false;
  273. indexes.forEach(function(index) {
  274. var keys = Object.keys(index.key);
  275. if (keys.length === 2 && index.key.filename === 1 &&
  276. index.key.uploadDate === 1) {
  277. hasFileIndex = true;
  278. }
  279. });
  280. if (hasFileIndex) {
  281. checkChunksIndex(_this, callback);
  282. } else {
  283. index = { filename: 1, uploadDate: 1 };
  284. var indexOptions = getWriteOptions(_this);
  285. indexOptions.background = false;
  286. _this.files.createIndex(index, indexOptions, function(error) {
  287. if (error) {
  288. return callback(error);
  289. }
  290. checkChunksIndex(_this, callback);
  291. });
  292. }
  293. });
  294. });
  295. }
  296. /**
  297. * @ignore
  298. */
  299. function createFilesDoc(_id, length, chunkSize, md5, filename, contentType,
  300. aliases, metadata) {
  301. var ret = {
  302. _id: _id,
  303. length: length,
  304. chunkSize: chunkSize,
  305. uploadDate: new Date(),
  306. md5: md5,
  307. filename: filename
  308. };
  309. if (contentType) {
  310. ret.contentType = contentType;
  311. }
  312. if (aliases) {
  313. ret.aliases = aliases;
  314. }
  315. if (metadata) {
  316. ret.metadata = metadata;
  317. }
  318. return ret;
  319. }
  320. /**
  321. * @ignore
  322. */
  323. function doWrite(_this, chunk, encoding, callback) {
  324. if (checkAborted(_this, callback)) {
  325. return false;
  326. }
  327. var inputBuf = (Buffer.isBuffer(chunk)) ?
  328. chunk : new Buffer(chunk, encoding);
  329. _this.length += inputBuf.length;
  330. // Input is small enough to fit in our buffer
  331. if (_this.pos + inputBuf.length < _this.chunkSizeBytes) {
  332. inputBuf.copy(_this.bufToStore, _this.pos);
  333. _this.pos += inputBuf.length;
  334. callback && callback();
  335. // Note that we reverse the typical semantics of write's return value
  336. // to be compatible with node's `.pipe()` function.
  337. // True means client can keep writing.
  338. return true;
  339. }
  340. // Otherwise, buffer is too big for current chunk, so we need to flush
  341. // to MongoDB.
  342. var inputBufRemaining = inputBuf.length;
  343. var spaceRemaining = _this.chunkSizeBytes - _this.pos;
  344. var numToCopy = Math.min(spaceRemaining, inputBuf.length);
  345. var outstandingRequests = 0;
  346. while (inputBufRemaining > 0) {
  347. var inputBufPos = inputBuf.length - inputBufRemaining;
  348. inputBuf.copy(_this.bufToStore, _this.pos,
  349. inputBufPos, inputBufPos + numToCopy);
  350. _this.pos += numToCopy;
  351. spaceRemaining -= numToCopy;
  352. if (spaceRemaining === 0) {
  353. _this.md5.update(_this.bufToStore);
  354. var doc = createChunkDoc(_this.id, _this.n, _this.bufToStore);
  355. ++_this.state.outstandingRequests;
  356. ++outstandingRequests;
  357. if (checkAborted(_this, callback)) {
  358. return false;
  359. }
  360. _this.chunks.insert(doc, getWriteOptions(_this), function(error) {
  361. if (error) {
  362. return __handleError(_this, error);
  363. }
  364. --_this.state.outstandingRequests;
  365. --outstandingRequests;
  366. if (!outstandingRequests) {
  367. _this.emit('drain', doc);
  368. callback && callback();
  369. checkDone(_this);
  370. }
  371. });
  372. spaceRemaining = _this.chunkSizeBytes;
  373. _this.pos = 0;
  374. ++_this.n;
  375. }
  376. inputBufRemaining -= numToCopy;
  377. numToCopy = Math.min(spaceRemaining, inputBufRemaining);
  378. }
  379. // Note that we reverse the typical semantics of write's return value
  380. // to be compatible with node's `.pipe()` function.
  381. // False means the client should wait for the 'drain' event.
  382. return false;
  383. }
  384. /**
  385. * @ignore
  386. */
  387. function getWriteOptions(_this) {
  388. var obj = {};
  389. if (_this.options.writeConcern) {
  390. obj.w = _this.options.writeConcern.w;
  391. obj.wtimeout = _this.options.writeConcern.wtimeout;
  392. obj.j = _this.options.writeConcern.j;
  393. }
  394. return obj;
  395. }
  396. /**
  397. * @ignore
  398. */
  399. function waitForIndexes(_this, callback) {
  400. if (_this.bucket.s.checkedIndexes) {
  401. return callback(false);
  402. }
  403. _this.bucket.once('index', function() {
  404. callback(true);
  405. });
  406. return true;
  407. }
  408. /**
  409. * @ignore
  410. */
  411. function writeRemnant(_this, callback) {
  412. // Buffer is empty, so don't bother to insert
  413. if (_this.pos === 0) {
  414. return checkDone(_this, callback);
  415. }
  416. ++_this.state.outstandingRequests;
  417. // Create a new buffer to make sure the buffer isn't bigger than it needs
  418. // to be.
  419. var remnant = new Buffer(_this.pos);
  420. _this.bufToStore.copy(remnant, 0, 0, _this.pos);
  421. _this.md5.update(remnant);
  422. var doc = createChunkDoc(_this.id, _this.n, remnant);
  423. // If the stream was aborted, do not write remnant
  424. if (checkAborted(_this, callback)) {
  425. return false;
  426. }
  427. _this.chunks.insert(doc, getWriteOptions(_this), function(error) {
  428. if (error) {
  429. return __handleError(_this, error);
  430. }
  431. --_this.state.outstandingRequests;
  432. checkDone(_this);
  433. });
  434. }
  435. /**
  436. * @ignore
  437. */
  438. function checkAborted(_this, callback) {
  439. if (_this.state.aborted) {
  440. if(typeof callback == 'function') {
  441. callback(new Error('this stream has been aborted'));
  442. }
  443. return true;
  444. }
  445. return false;
  446. }