Source: lib/core/sessions.js

  1. 'use strict';
  2. const retrieveBSON = require('./connection/utils').retrieveBSON;
  3. const EventEmitter = require('events');
  4. const BSON = retrieveBSON();
  5. const Binary = BSON.Binary;
  6. const uuidV4 = require('./utils').uuidV4;
  7. const MongoError = require('./error').MongoError;
  8. const isRetryableError = require('././error').isRetryableError;
  9. const MongoNetworkError = require('./error').MongoNetworkError;
  10. const MongoWriteConcernError = require('./error').MongoWriteConcernError;
  11. const Transaction = require('./transactions').Transaction;
  12. const TxnState = require('./transactions').TxnState;
  13. const isPromiseLike = require('./utils').isPromiseLike;
  14. const ReadPreference = require('./topologies/read_preference');
  15. const isTransactionCommand = require('./transactions').isTransactionCommand;
  16. const resolveClusterTime = require('./topologies/shared').resolveClusterTime;
  17. const isSharded = require('./wireprotocol/shared').isSharded;
  18. const maxWireVersion = require('./utils').maxWireVersion;
  19. const minWireVersionForShardedTransactions = 8;
  20. function assertAlive(session, callback) {
  21. if (session.serverSession == null) {
  22. const error = new MongoError('Cannot use a session that has ended');
  23. if (typeof callback === 'function') {
  24. callback(error, null);
  25. return false;
  26. }
  27. throw error;
  28. }
  29. return true;
  30. }
  31. /**
  32. * Options to pass when creating a Client Session
  33. * @typedef {Object} SessionOptions
  34. * @property {boolean} [causalConsistency=true] Whether causal consistency should be enabled on this session
  35. * @property {TransactionOptions} [defaultTransactionOptions] The default TransactionOptions to use for transactions started on this session.
  36. */
  37. /**
  38. * A BSON document reflecting the lsid of a {@link ClientSession}
  39. * @typedef {Object} SessionId
  40. */
  41. /**
  42. * A class representing a client session on the server
  43. * WARNING: not meant to be instantiated directly.
  44. * @class
  45. * @hideconstructor
  46. */
  47. class ClientSession extends EventEmitter {
  48. /**
  49. * Create a client session.
  50. * WARNING: not meant to be instantiated directly
  51. *
  52. * @param {Topology} topology The current client's topology (Internal Class)
  53. * @param {ServerSessionPool} sessionPool The server session pool (Internal Class)
  54. * @param {SessionOptions} [options] Optional settings
  55. * @param {Object} [clientOptions] Optional settings provided when creating a client in the porcelain driver
  56. */
  57. constructor(topology, sessionPool, options, clientOptions) {
  58. super();
  59. if (topology == null) {
  60. throw new Error('ClientSession requires a topology');
  61. }
  62. if (sessionPool == null || !(sessionPool instanceof ServerSessionPool)) {
  63. throw new Error('ClientSession requires a ServerSessionPool');
  64. }
  65. options = options || {};
  66. clientOptions = clientOptions || {};
  67. this.topology = topology;
  68. this.sessionPool = sessionPool;
  69. this.hasEnded = false;
  70. this.serverSession = sessionPool.acquire();
  71. this.clientOptions = clientOptions;
  72. this.supports = {
  73. causalConsistency:
  74. typeof options.causalConsistency !== 'undefined' ? options.causalConsistency : true
  75. };
  76. this.clusterTime = options.initialClusterTime;
  77. this.operationTime = null;
  78. this.explicit = !!options.explicit;
  79. this.owner = options.owner;
  80. this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
  81. this.transaction = new Transaction();
  82. }
  83. /**
  84. * The server id associated with this session
  85. * @type {SessionId}
  86. */
  87. get id() {
  88. return this.serverSession.id;
  89. }
  90. /**
  91. * Ends this session on the server
  92. *
  93. * @param {Object} [options] Optional settings. Currently reserved for future use
  94. * @param {Function} [callback] Optional callback for completion of this operation
  95. */
  96. endSession(options, callback) {
  97. if (typeof options === 'function') (callback = options), (options = {});
  98. options = options || {};
  99. if (this.hasEnded) {
  100. if (typeof callback === 'function') callback(null, null);
  101. return;
  102. }
  103. if (this.serverSession && this.inTransaction()) {
  104. this.abortTransaction(); // pass in callback?
  105. }
  106. // mark the session as ended, and emit a signal
  107. this.hasEnded = true;
  108. this.emit('ended', this);
  109. // release the server session back to the pool
  110. this.sessionPool.release(this.serverSession);
  111. this.serverSession = null;
  112. // spec indicates that we should ignore all errors for `endSessions`
  113. if (typeof callback === 'function') callback(null, null);
  114. }
  115. /**
  116. * Advances the operationTime for a ClientSession.
  117. *
  118. * @param {Timestamp} operationTime the `BSON.Timestamp` of the operation type it is desired to advance to
  119. */
  120. advanceOperationTime(operationTime) {
  121. if (this.operationTime == null) {
  122. this.operationTime = operationTime;
  123. return;
  124. }
  125. if (operationTime.greaterThan(this.operationTime)) {
  126. this.operationTime = operationTime;
  127. }
  128. }
  129. /**
  130. * Used to determine if this session equals another
  131. * @param {ClientSession} session
  132. * @return {boolean} true if the sessions are equal
  133. */
  134. equals(session) {
  135. if (!(session instanceof ClientSession)) {
  136. return false;
  137. }
  138. return this.id.id.buffer.equals(session.id.id.buffer);
  139. }
  140. /**
  141. * Increment the transaction number on the internal ServerSession
  142. */
  143. incrementTransactionNumber() {
  144. this.serverSession.txnNumber++;
  145. }
  146. /**
  147. * @returns {boolean} whether this session is currently in a transaction or not
  148. */
  149. inTransaction() {
  150. return this.transaction.isActive;
  151. }
  152. /**
  153. * Starts a new transaction with the given options.
  154. *
  155. * @param {TransactionOptions} options Options for the transaction
  156. */
  157. startTransaction(options) {
  158. assertAlive(this);
  159. if (this.inTransaction()) {
  160. throw new MongoError('Transaction already in progress');
  161. }
  162. const topologyMaxWireVersion = maxWireVersion(this.topology);
  163. if (
  164. isSharded(this.topology) &&
  165. topologyMaxWireVersion != null &&
  166. topologyMaxWireVersion < minWireVersionForShardedTransactions
  167. ) {
  168. throw new MongoError('Transactions are not supported on sharded clusters in MongoDB < 4.2.');
  169. }
  170. // increment txnNumber
  171. this.incrementTransactionNumber();
  172. // create transaction state
  173. this.transaction = new Transaction(
  174. Object.assign({}, this.clientOptions, options || this.defaultTransactionOptions)
  175. );
  176. this.transaction.transition(TxnState.STARTING_TRANSACTION);
  177. }
  178. /**
  179. * Commits the currently active transaction in this session.
  180. *
  181. * @param {Function} [callback] optional callback for completion of this operation
  182. * @return {Promise} A promise is returned if no callback is provided
  183. */
  184. commitTransaction(callback) {
  185. if (typeof callback === 'function') {
  186. endTransaction(this, 'commitTransaction', callback);
  187. return;
  188. }
  189. return new Promise((resolve, reject) => {
  190. endTransaction(
  191. this,
  192. 'commitTransaction',
  193. (err, reply) => (err ? reject(err) : resolve(reply))
  194. );
  195. });
  196. }
  197. /**
  198. * Aborts the currently active transaction in this session.
  199. *
  200. * @param {Function} [callback] optional callback for completion of this operation
  201. * @return {Promise} A promise is returned if no callback is provided
  202. */
  203. abortTransaction(callback) {
  204. if (typeof callback === 'function') {
  205. endTransaction(this, 'abortTransaction', callback);
  206. return;
  207. }
  208. return new Promise((resolve, reject) => {
  209. endTransaction(
  210. this,
  211. 'abortTransaction',
  212. (err, reply) => (err ? reject(err) : resolve(reply))
  213. );
  214. });
  215. }
  216. /**
  217. * This is here to ensure that ClientSession is never serialized to BSON.
  218. * @ignore
  219. */
  220. toBSON() {
  221. throw new Error('ClientSession cannot be serialized to BSON.');
  222. }
  223. /**
  224. * A user provided function to be run within a transaction
  225. *
  226. * @callback WithTransactionCallback
  227. * @param {ClientSession} session The parent session of the transaction running the operation. This should be passed into each operation within the lambda.
  228. * @returns {Promise} The resulting Promise of operations run within this transaction
  229. */
  230. /**
  231. * Runs a provided lambda within a transaction, retrying either the commit operation
  232. * or entire transaction as needed (and when the error permits) to better ensure that
  233. * the transaction can complete successfully.
  234. *
  235. * IMPORTANT: This method requires the user to return a Promise, all lambdas that do not
  236. * return a Promise will result in undefined behavior.
  237. *
  238. * @param {WithTransactionCallback} fn
  239. * @param {TransactionOptions} [options] Optional settings for the transaction
  240. */
  241. withTransaction(fn, options) {
  242. const startTime = Date.now();
  243. return attemptTransaction(this, startTime, fn, options);
  244. }
  245. }
  246. const MAX_WITH_TRANSACTION_TIMEOUT = 120000;
  247. const UNSATISFIABLE_WRITE_CONCERN_CODE = 100;
  248. const UNKNOWN_REPL_WRITE_CONCERN_CODE = 79;
  249. const MAX_TIME_MS_EXPIRED_CODE = 50;
  250. const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([
  251. 'CannotSatisfyWriteConcern',
  252. 'UnknownReplWriteConcern',
  253. 'UnsatisfiableWriteConcern'
  254. ]);
  255. function hasNotTimedOut(startTime, max) {
  256. return Date.now() - startTime < max;
  257. }
  258. function isUnknownTransactionCommitResult(err) {
  259. return (
  260. isMaxTimeMSExpiredError(err) ||
  261. (!NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName) &&
  262. err.code !== UNSATISFIABLE_WRITE_CONCERN_CODE &&
  263. err.code !== UNKNOWN_REPL_WRITE_CONCERN_CODE)
  264. );
  265. }
  266. function isMaxTimeMSExpiredError(err) {
  267. return (
  268. err.code === MAX_TIME_MS_EXPIRED_CODE ||
  269. (err.writeConcernError && err.writeConcernError.code === MAX_TIME_MS_EXPIRED_CODE)
  270. );
  271. }
  272. function attemptTransactionCommit(session, startTime, fn, options) {
  273. return session.commitTransaction().catch(err => {
  274. if (
  275. err instanceof MongoError &&
  276. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
  277. !isMaxTimeMSExpiredError(err)
  278. ) {
  279. if (err.hasErrorLabel('UnknownTransactionCommitResult')) {
  280. return attemptTransactionCommit(session, startTime, fn, options);
  281. }
  282. if (err.hasErrorLabel('TransientTransactionError')) {
  283. return attemptTransaction(session, startTime, fn, options);
  284. }
  285. }
  286. throw err;
  287. });
  288. }
  289. const USER_EXPLICIT_TXN_END_STATES = new Set([
  290. TxnState.NO_TRANSACTION,
  291. TxnState.TRANSACTION_COMMITTED,
  292. TxnState.TRANSACTION_ABORTED
  293. ]);
  294. function userExplicitlyEndedTransaction(session) {
  295. return USER_EXPLICIT_TXN_END_STATES.has(session.transaction.state);
  296. }
  297. function attemptTransaction(session, startTime, fn, options) {
  298. session.startTransaction(options);
  299. let promise;
  300. try {
  301. promise = fn(session);
  302. } catch (err) {
  303. promise = Promise.reject(err);
  304. }
  305. if (!isPromiseLike(promise)) {
  306. session.abortTransaction();
  307. throw new TypeError('Function provided to `withTransaction` must return a Promise');
  308. }
  309. return promise
  310. .then(() => {
  311. if (userExplicitlyEndedTransaction(session)) {
  312. return;
  313. }
  314. return attemptTransactionCommit(session, startTime, fn, options);
  315. })
  316. .catch(err => {
  317. function maybeRetryOrThrow(err) {
  318. if (
  319. err instanceof MongoError &&
  320. err.hasErrorLabel('TransientTransactionError') &&
  321. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT)
  322. ) {
  323. return attemptTransaction(session, startTime, fn, options);
  324. }
  325. if (isMaxTimeMSExpiredError(err)) {
  326. if (err.errorLabels == null) {
  327. err.errorLabels = [];
  328. }
  329. err.errorLabels.push('UnknownTransactionCommitResult');
  330. }
  331. throw err;
  332. }
  333. if (session.transaction.isActive) {
  334. return session.abortTransaction().then(() => maybeRetryOrThrow(err));
  335. }
  336. return maybeRetryOrThrow(err);
  337. });
  338. }
  339. function endTransaction(session, commandName, callback) {
  340. if (!assertAlive(session, callback)) {
  341. // checking result in case callback was called
  342. return;
  343. }
  344. // handle any initial problematic cases
  345. let txnState = session.transaction.state;
  346. if (txnState === TxnState.NO_TRANSACTION) {
  347. callback(new MongoError('No transaction started'));
  348. return;
  349. }
  350. if (commandName === 'commitTransaction') {
  351. if (
  352. txnState === TxnState.STARTING_TRANSACTION ||
  353. txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
  354. ) {
  355. // the transaction was never started, we can safely exit here
  356. session.transaction.transition(TxnState.TRANSACTION_COMMITTED_EMPTY);
  357. callback(null, null);
  358. return;
  359. }
  360. if (txnState === TxnState.TRANSACTION_ABORTED) {
  361. callback(new MongoError('Cannot call commitTransaction after calling abortTransaction'));
  362. return;
  363. }
  364. } else {
  365. if (txnState === TxnState.STARTING_TRANSACTION) {
  366. // the transaction was never started, we can safely exit here
  367. session.transaction.transition(TxnState.TRANSACTION_ABORTED);
  368. callback(null, null);
  369. return;
  370. }
  371. if (txnState === TxnState.TRANSACTION_ABORTED) {
  372. callback(new MongoError('Cannot call abortTransaction twice'));
  373. return;
  374. }
  375. if (
  376. txnState === TxnState.TRANSACTION_COMMITTED ||
  377. txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
  378. ) {
  379. callback(new MongoError('Cannot call abortTransaction after calling commitTransaction'));
  380. return;
  381. }
  382. }
  383. // construct and send the command
  384. const command = { [commandName]: 1 };
  385. // apply a writeConcern if specified
  386. let writeConcern;
  387. if (session.transaction.options.writeConcern) {
  388. writeConcern = Object.assign({}, session.transaction.options.writeConcern);
  389. } else if (session.clientOptions && session.clientOptions.w) {
  390. writeConcern = { w: session.clientOptions.w };
  391. }
  392. if (txnState === TxnState.TRANSACTION_COMMITTED) {
  393. writeConcern = Object.assign({ wtimeout: 10000 }, writeConcern, { w: 'majority' });
  394. }
  395. if (writeConcern) {
  396. Object.assign(command, { writeConcern });
  397. }
  398. if (commandName === 'commitTransaction' && session.transaction.options.maxTimeMS) {
  399. Object.assign(command, { maxTimeMS: session.transaction.options.maxTimeMS });
  400. }
  401. function commandHandler(e, r) {
  402. if (commandName === 'commitTransaction') {
  403. session.transaction.transition(TxnState.TRANSACTION_COMMITTED);
  404. if (
  405. e &&
  406. (e instanceof MongoNetworkError ||
  407. e instanceof MongoWriteConcernError ||
  408. isRetryableError(e) ||
  409. isMaxTimeMSExpiredError(e))
  410. ) {
  411. if (e.errorLabels) {
  412. const idx = e.errorLabels.indexOf('TransientTransactionError');
  413. if (idx !== -1) {
  414. e.errorLabels.splice(idx, 1);
  415. }
  416. } else {
  417. e.errorLabels = [];
  418. }
  419. if (isUnknownTransactionCommitResult(e)) {
  420. e.errorLabels.push('UnknownTransactionCommitResult');
  421. // per txns spec, must unpin session in this case
  422. session.transaction.unpinServer();
  423. }
  424. }
  425. } else {
  426. session.transaction.transition(TxnState.TRANSACTION_ABORTED);
  427. }
  428. callback(e, r);
  429. }
  430. // The spec indicates that we should ignore all errors on `abortTransaction`
  431. function transactionError(err) {
  432. return commandName === 'commitTransaction' ? err : null;
  433. }
  434. if (
  435. // Assumption here that commandName is "commitTransaction" or "abortTransaction"
  436. session.transaction.recoveryToken &&
  437. supportsRecoveryToken(session)
  438. ) {
  439. command.recoveryToken = session.transaction.recoveryToken;
  440. }
  441. // send the command
  442. session.topology.command('admin.$cmd', command, { session }, (err, reply) => {
  443. if (err && isRetryableError(err)) {
  444. // SPEC-1185: apply majority write concern when retrying commitTransaction
  445. if (command.commitTransaction) {
  446. // per txns spec, must unpin session in this case
  447. session.transaction.unpinServer();
  448. command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
  449. w: 'majority'
  450. });
  451. }
  452. return session.topology.command('admin.$cmd', command, { session }, (_err, _reply) =>
  453. commandHandler(transactionError(_err), _reply)
  454. );
  455. }
  456. commandHandler(transactionError(err), reply);
  457. });
  458. }
  459. function supportsRecoveryToken(session) {
  460. const topology = session.topology;
  461. return !!topology.s.options.useRecoveryToken;
  462. }
  463. /**
  464. * Reflects the existence of a session on the server. Can be reused by the session pool.
  465. * WARNING: not meant to be instantiated directly. For internal use only.
  466. * @ignore
  467. */
  468. class ServerSession {
  469. constructor() {
  470. this.id = { id: new Binary(uuidV4(), Binary.SUBTYPE_UUID) };
  471. this.lastUse = Date.now();
  472. this.txnNumber = 0;
  473. this.isDirty = false;
  474. }
  475. /**
  476. * Determines if the server session has timed out.
  477. * @ignore
  478. * @param {Date} sessionTimeoutMinutes The server's "logicalSessionTimeoutMinutes"
  479. * @return {boolean} true if the session has timed out.
  480. */
  481. hasTimedOut(sessionTimeoutMinutes) {
  482. // Take the difference of the lastUse timestamp and now, which will result in a value in
  483. // milliseconds, and then convert milliseconds to minutes to compare to `sessionTimeoutMinutes`
  484. const idleTimeMinutes = Math.round(
  485. (((Date.now() - this.lastUse) % 86400000) % 3600000) / 60000
  486. );
  487. return idleTimeMinutes > sessionTimeoutMinutes - 1;
  488. }
  489. }
  490. /**
  491. * Maintains a pool of Server Sessions.
  492. * For internal use only
  493. * @ignore
  494. */
  495. class ServerSessionPool {
  496. constructor(topology) {
  497. if (topology == null) {
  498. throw new Error('ServerSessionPool requires a topology');
  499. }
  500. this.topology = topology;
  501. this.sessions = [];
  502. }
  503. /**
  504. * Ends all sessions in the session pool.
  505. * @ignore
  506. */
  507. endAllPooledSessions() {
  508. if (this.sessions.length) {
  509. this.topology.endSessions(this.sessions.map(session => session.id));
  510. this.sessions = [];
  511. }
  512. }
  513. /**
  514. * Acquire a Server Session from the pool.
  515. * Iterates through each session in the pool, removing any stale sessions
  516. * along the way. The first non-stale session found is removed from the
  517. * pool and returned. If no non-stale session is found, a new ServerSession
  518. * is created.
  519. * @ignore
  520. * @returns {ServerSession}
  521. */
  522. acquire() {
  523. const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes;
  524. while (this.sessions.length) {
  525. const session = this.sessions.shift();
  526. if (!session.hasTimedOut(sessionTimeoutMinutes)) {
  527. return session;
  528. }
  529. }
  530. return new ServerSession();
  531. }
  532. /**
  533. * Release a session to the session pool
  534. * Adds the session back to the session pool if the session has not timed out yet.
  535. * This method also removes any stale sessions from the pool.
  536. * @ignore
  537. * @param {ServerSession} session The session to release to the pool
  538. */
  539. release(session) {
  540. const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes;
  541. while (this.sessions.length) {
  542. const pooledSession = this.sessions[this.sessions.length - 1];
  543. if (pooledSession.hasTimedOut(sessionTimeoutMinutes)) {
  544. this.sessions.pop();
  545. } else {
  546. break;
  547. }
  548. }
  549. if (!session.hasTimedOut(sessionTimeoutMinutes)) {
  550. if (session.isDirty) {
  551. return;
  552. }
  553. // otherwise, readd this session to the session pool
  554. this.sessions.unshift(session);
  555. }
  556. }
  557. }
  558. // TODO: this should be codified in command construction
  559. // @see https://github.com/mongodb/specifications/blob/master/source/read-write-concern/read-write-concern.rst#read-concern
  560. function commandSupportsReadConcern(command, options) {
  561. if (
  562. command.aggregate ||
  563. command.count ||
  564. command.distinct ||
  565. command.find ||
  566. command.parallelCollectionScan ||
  567. command.geoNear ||
  568. command.geoSearch
  569. ) {
  570. return true;
  571. }
  572. if (command.mapReduce && options.out && (options.out.inline === 1 || options.out === 'inline')) {
  573. return true;
  574. }
  575. return false;
  576. }
  577. /**
  578. * Optionally decorate a command with sessions specific keys
  579. *
  580. * @ignore
  581. * @param {ClientSession} session the session tracking transaction state
  582. * @param {Object} command the command to decorate
  583. * @param {Object} topology the topology for tracking the cluster time
  584. * @param {Object} [options] Optional settings passed to calling operation
  585. * @return {MongoError|null} An error, if some error condition was met
  586. */
  587. function applySession(session, command, options) {
  588. const serverSession = session.serverSession;
  589. if (serverSession == null) {
  590. // TODO: merge this with `assertAlive`, did not want to throw a try/catch here
  591. return new MongoError('Cannot use a session that has ended');
  592. }
  593. // mark the last use of this session, and apply the `lsid`
  594. serverSession.lastUse = Date.now();
  595. command.lsid = serverSession.id;
  596. // first apply non-transaction-specific sessions data
  597. const inTransaction = session.inTransaction() || isTransactionCommand(command);
  598. const isRetryableWrite = options.willRetryWrite;
  599. const shouldApplyReadConcern = commandSupportsReadConcern(command);
  600. if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
  601. command.txnNumber = BSON.Long.fromNumber(serverSession.txnNumber);
  602. }
  603. // now attempt to apply transaction-specific sessions data
  604. if (!inTransaction) {
  605. if (session.transaction.state !== TxnState.NO_TRANSACTION) {
  606. session.transaction.transition(TxnState.NO_TRANSACTION);
  607. }
  608. // TODO: the following should only be applied to read operation per spec.
  609. // for causal consistency
  610. if (session.supports.causalConsistency && session.operationTime && shouldApplyReadConcern) {
  611. command.readConcern = command.readConcern || {};
  612. Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
  613. }
  614. return;
  615. }
  616. if (options.readPreference && !options.readPreference.equals(ReadPreference.primary)) {
  617. return new MongoError(
  618. `Read preference in a transaction must be primary, not: ${options.readPreference.mode}`
  619. );
  620. }
  621. // `autocommit` must always be false to differentiate from retryable writes
  622. command.autocommit = false;
  623. if (session.transaction.state === TxnState.STARTING_TRANSACTION) {
  624. session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
  625. command.startTransaction = true;
  626. const readConcern =
  627. session.transaction.options.readConcern || session.clientOptions.readConcern;
  628. if (readConcern) {
  629. command.readConcern = readConcern;
  630. }
  631. if (session.supports.causalConsistency && session.operationTime) {
  632. command.readConcern = command.readConcern || {};
  633. Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
  634. }
  635. }
  636. }
  637. function updateSessionFromResponse(session, document) {
  638. if (document.$clusterTime) {
  639. resolveClusterTime(session, document.$clusterTime);
  640. }
  641. if (document.operationTime && session && session.supports.causalConsistency) {
  642. session.advanceOperationTime(document.operationTime);
  643. }
  644. if (document.recoveryToken && session && session.inTransaction()) {
  645. session.transaction._recoveryToken = document.recoveryToken;
  646. }
  647. }
  648. module.exports = {
  649. ClientSession,
  650. ServerSession,
  651. ServerSessionPool,
  652. TxnState,
  653. applySession,
  654. updateSessionFromResponse,
  655. commandSupportsReadConcern
  656. };