Source: lib/core/sessions.js

  1. 'use strict';
  2. const retrieveBSON = require('./connection/utils').retrieveBSON;
  3. const EventEmitter = require('events');
  4. const BSON = retrieveBSON();
  5. const Binary = BSON.Binary;
  6. const uuidV4 = require('./utils').uuidV4;
  7. const MongoError = require('./error').MongoError;
  8. const isRetryableError = require('././error').isRetryableError;
  9. const MongoNetworkError = require('./error').MongoNetworkError;
  10. const MongoWriteConcernError = require('./error').MongoWriteConcernError;
  11. const Transaction = require('./transactions').Transaction;
  12. const TxnState = require('./transactions').TxnState;
  13. const isPromiseLike = require('./utils').isPromiseLike;
  14. const ReadPreference = require('./topologies/read_preference');
  15. const isTransactionCommand = require('./transactions').isTransactionCommand;
  16. const resolveClusterTime = require('./topologies/shared').resolveClusterTime;
  17. const isSharded = require('./wireprotocol/shared').isSharded;
  18. const maxWireVersion = require('./utils').maxWireVersion;
  19. const minWireVersionForShardedTransactions = 8;
  20. function assertAlive(session, callback) {
  21. if (session.serverSession == null) {
  22. const error = new MongoError('Cannot use a session that has ended');
  23. if (typeof callback === 'function') {
  24. callback(error, null);
  25. return false;
  26. }
  27. throw error;
  28. }
  29. return true;
  30. }
  31. /**
  32. * Options to pass when creating a Client Session
  33. * @typedef {Object} SessionOptions
  34. * @property {boolean} [causalConsistency=true] Whether causal consistency should be enabled on this session
  35. * @property {TransactionOptions} [defaultTransactionOptions] The default TransactionOptions to use for transactions started on this session.
  36. */
  37. /**
  38. * A BSON document reflecting the lsid of a {@link ClientSession}
  39. * @typedef {Object} SessionId
  40. */
  41. /**
  42. * A class representing a client session on the server
  43. * WARNING: not meant to be instantiated directly.
  44. * @class
  45. * @hideconstructor
  46. */
  47. class ClientSession extends EventEmitter {
  48. /**
  49. * Create a client session.
  50. * WARNING: not meant to be instantiated directly
  51. *
  52. * @param {Topology} topology The current client's topology (Internal Class)
  53. * @param {ServerSessionPool} sessionPool The server session pool (Internal Class)
  54. * @param {SessionOptions} [options] Optional settings
  55. * @param {Object} [clientOptions] Optional settings provided when creating a client in the porcelain driver
  56. */
  57. constructor(topology, sessionPool, options, clientOptions) {
  58. super();
  59. if (topology == null) {
  60. throw new Error('ClientSession requires a topology');
  61. }
  62. if (sessionPool == null || !(sessionPool instanceof ServerSessionPool)) {
  63. throw new Error('ClientSession requires a ServerSessionPool');
  64. }
  65. options = options || {};
  66. clientOptions = clientOptions || {};
  67. this.topology = topology;
  68. this.sessionPool = sessionPool;
  69. this.hasEnded = false;
  70. this.serverSession = sessionPool.acquire();
  71. this.clientOptions = clientOptions;
  72. this.supports = {
  73. causalConsistency:
  74. typeof options.causalConsistency !== 'undefined' ? options.causalConsistency : true
  75. };
  76. this.clusterTime = options.initialClusterTime;
  77. this.operationTime = null;
  78. this.explicit = !!options.explicit;
  79. this.owner = options.owner;
  80. this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
  81. this.transaction = new Transaction();
  82. }
  83. /**
  84. * The server id associated with this session
  85. * @type {SessionId}
  86. */
  87. get id() {
  88. return this.serverSession.id;
  89. }
  90. /**
  91. * Ends this session on the server
  92. *
  93. * @param {Object} [options] Optional settings. Currently reserved for future use
  94. * @param {Function} [callback] Optional callback for completion of this operation
  95. */
  96. endSession(options, callback) {
  97. if (typeof options === 'function') (callback = options), (options = {});
  98. options = options || {};
  99. if (this.hasEnded) {
  100. if (typeof callback === 'function') callback(null, null);
  101. return;
  102. }
  103. if (this.serverSession && this.inTransaction()) {
  104. this.abortTransaction(); // pass in callback?
  105. }
  106. // mark the session as ended, and emit a signal
  107. this.hasEnded = true;
  108. this.emit('ended', this);
  109. // release the server session back to the pool
  110. this.sessionPool.release(this.serverSession);
  111. this.serverSession = null;
  112. // spec indicates that we should ignore all errors for `endSessions`
  113. if (typeof callback === 'function') callback(null, null);
  114. }
  115. /**
  116. * Advances the operationTime for a ClientSession.
  117. *
  118. * @param {Timestamp} operationTime the `BSON.Timestamp` of the operation type it is desired to advance to
  119. */
  120. advanceOperationTime(operationTime) {
  121. if (this.operationTime == null) {
  122. this.operationTime = operationTime;
  123. return;
  124. }
  125. if (operationTime.greaterThan(this.operationTime)) {
  126. this.operationTime = operationTime;
  127. }
  128. }
  129. /**
  130. * Used to determine if this session equals another
  131. * @param {ClientSession} session
  132. * @return {boolean} true if the sessions are equal
  133. */
  134. equals(session) {
  135. if (!(session instanceof ClientSession)) {
  136. return false;
  137. }
  138. return this.id.id.buffer.equals(session.id.id.buffer);
  139. }
  140. /**
  141. * Increment the transaction number on the internal ServerSession
  142. */
  143. incrementTransactionNumber() {
  144. this.serverSession.txnNumber++;
  145. }
  146. /**
  147. * @returns {boolean} whether this session is currently in a transaction or not
  148. */
  149. inTransaction() {
  150. return this.transaction.isActive;
  151. }
  152. /**
  153. * Starts a new transaction with the given options.
  154. *
  155. * @param {TransactionOptions} options Options for the transaction
  156. */
  157. startTransaction(options) {
  158. assertAlive(this);
  159. if (this.inTransaction()) {
  160. throw new MongoError('Transaction already in progress');
  161. }
  162. const topologyMaxWireVersion = maxWireVersion(this.topology);
  163. if (
  164. isSharded(this.topology) &&
  165. topologyMaxWireVersion != null &&
  166. topologyMaxWireVersion < minWireVersionForShardedTransactions
  167. ) {
  168. throw new MongoError('Transactions are not supported on sharded clusters in MongoDB < 4.2.');
  169. }
  170. // increment txnNumber
  171. this.incrementTransactionNumber();
  172. // create transaction state
  173. this.transaction = new Transaction(
  174. Object.assign({}, this.clientOptions, options || this.defaultTransactionOptions)
  175. );
  176. this.transaction.transition(TxnState.STARTING_TRANSACTION);
  177. }
  178. /**
  179. * Commits the currently active transaction in this session.
  180. *
  181. * @param {Function} [callback] optional callback for completion of this operation
  182. * @return {Promise} A promise is returned if no callback is provided
  183. */
  184. commitTransaction(callback) {
  185. if (typeof callback === 'function') {
  186. endTransaction(this, 'commitTransaction', callback);
  187. return;
  188. }
  189. return new Promise((resolve, reject) => {
  190. endTransaction(this, 'commitTransaction', (err, reply) =>
  191. err ? reject(err) : resolve(reply)
  192. );
  193. });
  194. }
  195. /**
  196. * Aborts the currently active transaction in this session.
  197. *
  198. * @param {Function} [callback] optional callback for completion of this operation
  199. * @return {Promise} A promise is returned if no callback is provided
  200. */
  201. abortTransaction(callback) {
  202. if (typeof callback === 'function') {
  203. endTransaction(this, 'abortTransaction', callback);
  204. return;
  205. }
  206. return new Promise((resolve, reject) => {
  207. endTransaction(this, 'abortTransaction', (err, reply) =>
  208. err ? reject(err) : resolve(reply)
  209. );
  210. });
  211. }
  212. /**
  213. * This is here to ensure that ClientSession is never serialized to BSON.
  214. * @ignore
  215. */
  216. toBSON() {
  217. throw new Error('ClientSession cannot be serialized to BSON.');
  218. }
  219. /**
  220. * A user provided function to be run within a transaction
  221. *
  222. * @callback WithTransactionCallback
  223. * @param {ClientSession} session The parent session of the transaction running the operation. This should be passed into each operation within the lambda.
  224. * @returns {Promise} The resulting Promise of operations run within this transaction
  225. */
  226. /**
  227. * Runs a provided lambda within a transaction, retrying either the commit operation
  228. * or entire transaction as needed (and when the error permits) to better ensure that
  229. * the transaction can complete successfully.
  230. *
  231. * IMPORTANT: This method requires the user to return a Promise, all lambdas that do not
  232. * return a Promise will result in undefined behavior.
  233. *
  234. * @param {WithTransactionCallback} fn
  235. * @param {TransactionOptions} [options] Optional settings for the transaction
  236. */
  237. withTransaction(fn, options) {
  238. const startTime = Date.now();
  239. return attemptTransaction(this, startTime, fn, options);
  240. }
  241. }
  242. const MAX_WITH_TRANSACTION_TIMEOUT = 120000;
  243. const UNSATISFIABLE_WRITE_CONCERN_CODE = 100;
  244. const UNKNOWN_REPL_WRITE_CONCERN_CODE = 79;
  245. const MAX_TIME_MS_EXPIRED_CODE = 50;
  246. const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([
  247. 'CannotSatisfyWriteConcern',
  248. 'UnknownReplWriteConcern',
  249. 'UnsatisfiableWriteConcern'
  250. ]);
  251. function hasNotTimedOut(startTime, max) {
  252. return Date.now() - startTime < max;
  253. }
  254. function isUnknownTransactionCommitResult(err) {
  255. return (
  256. isMaxTimeMSExpiredError(err) ||
  257. (!NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName) &&
  258. err.code !== UNSATISFIABLE_WRITE_CONCERN_CODE &&
  259. err.code !== UNKNOWN_REPL_WRITE_CONCERN_CODE)
  260. );
  261. }
  262. function isMaxTimeMSExpiredError(err) {
  263. return (
  264. err.code === MAX_TIME_MS_EXPIRED_CODE ||
  265. (err.writeConcernError && err.writeConcernError.code === MAX_TIME_MS_EXPIRED_CODE)
  266. );
  267. }
  268. function attemptTransactionCommit(session, startTime, fn, options) {
  269. return session.commitTransaction().catch(err => {
  270. if (
  271. err instanceof MongoError &&
  272. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
  273. !isMaxTimeMSExpiredError(err)
  274. ) {
  275. if (err.hasErrorLabel('UnknownTransactionCommitResult')) {
  276. return attemptTransactionCommit(session, startTime, fn, options);
  277. }
  278. if (err.hasErrorLabel('TransientTransactionError')) {
  279. return attemptTransaction(session, startTime, fn, options);
  280. }
  281. }
  282. throw err;
  283. });
  284. }
  285. const USER_EXPLICIT_TXN_END_STATES = new Set([
  286. TxnState.NO_TRANSACTION,
  287. TxnState.TRANSACTION_COMMITTED,
  288. TxnState.TRANSACTION_ABORTED
  289. ]);
  290. function userExplicitlyEndedTransaction(session) {
  291. return USER_EXPLICIT_TXN_END_STATES.has(session.transaction.state);
  292. }
  293. function attemptTransaction(session, startTime, fn, options) {
  294. session.startTransaction(options);
  295. let promise;
  296. try {
  297. promise = fn(session);
  298. } catch (err) {
  299. promise = Promise.reject(err);
  300. }
  301. if (!isPromiseLike(promise)) {
  302. session.abortTransaction();
  303. throw new TypeError('Function provided to `withTransaction` must return a Promise');
  304. }
  305. return promise
  306. .then(() => {
  307. if (userExplicitlyEndedTransaction(session)) {
  308. return;
  309. }
  310. return attemptTransactionCommit(session, startTime, fn, options);
  311. })
  312. .catch(err => {
  313. function maybeRetryOrThrow(err) {
  314. if (
  315. err instanceof MongoError &&
  316. err.hasErrorLabel('TransientTransactionError') &&
  317. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT)
  318. ) {
  319. return attemptTransaction(session, startTime, fn, options);
  320. }
  321. if (isMaxTimeMSExpiredError(err)) {
  322. if (err.errorLabels == null) {
  323. err.errorLabels = [];
  324. }
  325. err.errorLabels.push('UnknownTransactionCommitResult');
  326. }
  327. throw err;
  328. }
  329. if (session.transaction.isActive) {
  330. return session.abortTransaction().then(() => maybeRetryOrThrow(err));
  331. }
  332. return maybeRetryOrThrow(err);
  333. });
  334. }
  335. function endTransaction(session, commandName, callback) {
  336. if (!assertAlive(session, callback)) {
  337. // checking result in case callback was called
  338. return;
  339. }
  340. // handle any initial problematic cases
  341. let txnState = session.transaction.state;
  342. if (txnState === TxnState.NO_TRANSACTION) {
  343. callback(new MongoError('No transaction started'));
  344. return;
  345. }
  346. if (commandName === 'commitTransaction') {
  347. if (
  348. txnState === TxnState.STARTING_TRANSACTION ||
  349. txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
  350. ) {
  351. // the transaction was never started, we can safely exit here
  352. session.transaction.transition(TxnState.TRANSACTION_COMMITTED_EMPTY);
  353. callback(null, null);
  354. return;
  355. }
  356. if (txnState === TxnState.TRANSACTION_ABORTED) {
  357. callback(new MongoError('Cannot call commitTransaction after calling abortTransaction'));
  358. return;
  359. }
  360. } else {
  361. if (txnState === TxnState.STARTING_TRANSACTION) {
  362. // the transaction was never started, we can safely exit here
  363. session.transaction.transition(TxnState.TRANSACTION_ABORTED);
  364. callback(null, null);
  365. return;
  366. }
  367. if (txnState === TxnState.TRANSACTION_ABORTED) {
  368. callback(new MongoError('Cannot call abortTransaction twice'));
  369. return;
  370. }
  371. if (
  372. txnState === TxnState.TRANSACTION_COMMITTED ||
  373. txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
  374. ) {
  375. callback(new MongoError('Cannot call abortTransaction after calling commitTransaction'));
  376. return;
  377. }
  378. }
  379. // construct and send the command
  380. const command = { [commandName]: 1 };
  381. // apply a writeConcern if specified
  382. let writeConcern;
  383. if (session.transaction.options.writeConcern) {
  384. writeConcern = Object.assign({}, session.transaction.options.writeConcern);
  385. } else if (session.clientOptions && session.clientOptions.w) {
  386. writeConcern = { w: session.clientOptions.w };
  387. }
  388. if (txnState === TxnState.TRANSACTION_COMMITTED) {
  389. writeConcern = Object.assign({ wtimeout: 10000 }, writeConcern, { w: 'majority' });
  390. }
  391. if (writeConcern) {
  392. Object.assign(command, { writeConcern });
  393. }
  394. if (commandName === 'commitTransaction' && session.transaction.options.maxTimeMS) {
  395. Object.assign(command, { maxTimeMS: session.transaction.options.maxTimeMS });
  396. }
  397. function commandHandler(e, r) {
  398. if (commandName === 'commitTransaction') {
  399. session.transaction.transition(TxnState.TRANSACTION_COMMITTED);
  400. if (
  401. e &&
  402. (e instanceof MongoNetworkError ||
  403. e instanceof MongoWriteConcernError ||
  404. isRetryableError(e) ||
  405. isMaxTimeMSExpiredError(e))
  406. ) {
  407. if (e.errorLabels) {
  408. const idx = e.errorLabels.indexOf('TransientTransactionError');
  409. if (idx !== -1) {
  410. e.errorLabels.splice(idx, 1);
  411. }
  412. } else {
  413. e.errorLabels = [];
  414. }
  415. if (isUnknownTransactionCommitResult(e)) {
  416. e.errorLabels.push('UnknownTransactionCommitResult');
  417. // per txns spec, must unpin session in this case
  418. session.transaction.unpinServer();
  419. }
  420. }
  421. } else {
  422. session.transaction.transition(TxnState.TRANSACTION_ABORTED);
  423. }
  424. callback(e, r);
  425. }
  426. // The spec indicates that we should ignore all errors on `abortTransaction`
  427. function transactionError(err) {
  428. return commandName === 'commitTransaction' ? err : null;
  429. }
  430. if (
  431. // Assumption here that commandName is "commitTransaction" or "abortTransaction"
  432. session.transaction.recoveryToken &&
  433. supportsRecoveryToken(session)
  434. ) {
  435. command.recoveryToken = session.transaction.recoveryToken;
  436. }
  437. // send the command
  438. session.topology.command('admin.$cmd', command, { session }, (err, reply) => {
  439. if (err && isRetryableError(err)) {
  440. // SPEC-1185: apply majority write concern when retrying commitTransaction
  441. if (command.commitTransaction) {
  442. // per txns spec, must unpin session in this case
  443. session.transaction.unpinServer();
  444. command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
  445. w: 'majority'
  446. });
  447. }
  448. return session.topology.command('admin.$cmd', command, { session }, (_err, _reply) =>
  449. commandHandler(transactionError(_err), _reply)
  450. );
  451. }
  452. commandHandler(transactionError(err), reply);
  453. });
  454. }
  455. function supportsRecoveryToken(session) {
  456. const topology = session.topology;
  457. return !!topology.s.options.useRecoveryToken;
  458. }
  459. /**
  460. * Reflects the existence of a session on the server. Can be reused by the session pool.
  461. * WARNING: not meant to be instantiated directly. For internal use only.
  462. * @ignore
  463. */
  464. class ServerSession {
  465. constructor() {
  466. this.id = { id: new Binary(uuidV4(), Binary.SUBTYPE_UUID) };
  467. this.lastUse = Date.now();
  468. this.txnNumber = 0;
  469. this.isDirty = false;
  470. }
  471. /**
  472. * Determines if the server session has timed out.
  473. * @ignore
  474. * @param {Date} sessionTimeoutMinutes The server's "logicalSessionTimeoutMinutes"
  475. * @return {boolean} true if the session has timed out.
  476. */
  477. hasTimedOut(sessionTimeoutMinutes) {
  478. // Take the difference of the lastUse timestamp and now, which will result in a value in
  479. // milliseconds, and then convert milliseconds to minutes to compare to `sessionTimeoutMinutes`
  480. const idleTimeMinutes = Math.round(
  481. (((Date.now() - this.lastUse) % 86400000) % 3600000) / 60000
  482. );
  483. return idleTimeMinutes > sessionTimeoutMinutes - 1;
  484. }
  485. }
  486. /**
  487. * Maintains a pool of Server Sessions.
  488. * For internal use only
  489. * @ignore
  490. */
  491. class ServerSessionPool {
  492. constructor(topology) {
  493. if (topology == null) {
  494. throw new Error('ServerSessionPool requires a topology');
  495. }
  496. this.topology = topology;
  497. this.sessions = [];
  498. }
  499. /**
  500. * Ends all sessions in the session pool.
  501. * @ignore
  502. */
  503. endAllPooledSessions() {
  504. if (this.sessions.length) {
  505. this.topology.endSessions(this.sessions.map(session => session.id));
  506. this.sessions = [];
  507. }
  508. }
  509. /**
  510. * Acquire a Server Session from the pool.
  511. * Iterates through each session in the pool, removing any stale sessions
  512. * along the way. The first non-stale session found is removed from the
  513. * pool and returned. If no non-stale session is found, a new ServerSession
  514. * is created.
  515. * @ignore
  516. * @returns {ServerSession}
  517. */
  518. acquire() {
  519. const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes;
  520. while (this.sessions.length) {
  521. const session = this.sessions.shift();
  522. if (!session.hasTimedOut(sessionTimeoutMinutes)) {
  523. return session;
  524. }
  525. }
  526. return new ServerSession();
  527. }
  528. /**
  529. * Release a session to the session pool
  530. * Adds the session back to the session pool if the session has not timed out yet.
  531. * This method also removes any stale sessions from the pool.
  532. * @ignore
  533. * @param {ServerSession} session The session to release to the pool
  534. */
  535. release(session) {
  536. const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes;
  537. while (this.sessions.length) {
  538. const pooledSession = this.sessions[this.sessions.length - 1];
  539. if (pooledSession.hasTimedOut(sessionTimeoutMinutes)) {
  540. this.sessions.pop();
  541. } else {
  542. break;
  543. }
  544. }
  545. if (!session.hasTimedOut(sessionTimeoutMinutes)) {
  546. if (session.isDirty) {
  547. return;
  548. }
  549. // otherwise, readd this session to the session pool
  550. this.sessions.unshift(session);
  551. }
  552. }
  553. }
  554. // TODO: this should be codified in command construction
  555. // @see https://github.com/mongodb/specifications/blob/master/source/read-write-concern/read-write-concern.rst#read-concern
  556. function commandSupportsReadConcern(command, options) {
  557. if (
  558. command.aggregate ||
  559. command.count ||
  560. command.distinct ||
  561. command.find ||
  562. command.parallelCollectionScan ||
  563. command.geoNear ||
  564. command.geoSearch
  565. ) {
  566. return true;
  567. }
  568. if (command.mapReduce && options.out && (options.out.inline === 1 || options.out === 'inline')) {
  569. return true;
  570. }
  571. return false;
  572. }
  573. /**
  574. * Optionally decorate a command with sessions specific keys
  575. *
  576. * @ignore
  577. * @param {ClientSession} session the session tracking transaction state
  578. * @param {Object} command the command to decorate
  579. * @param {Object} topology the topology for tracking the cluster time
  580. * @param {Object} [options] Optional settings passed to calling operation
  581. * @return {MongoError|null} An error, if some error condition was met
  582. */
  583. function applySession(session, command, options) {
  584. const serverSession = session.serverSession;
  585. if (serverSession == null) {
  586. // TODO: merge this with `assertAlive`, did not want to throw a try/catch here
  587. return new MongoError('Cannot use a session that has ended');
  588. }
  589. // mark the last use of this session, and apply the `lsid`
  590. serverSession.lastUse = Date.now();
  591. command.lsid = serverSession.id;
  592. // first apply non-transaction-specific sessions data
  593. const inTransaction = session.inTransaction() || isTransactionCommand(command);
  594. const isRetryableWrite = options.willRetryWrite;
  595. const shouldApplyReadConcern = commandSupportsReadConcern(command);
  596. if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
  597. command.txnNumber = BSON.Long.fromNumber(serverSession.txnNumber);
  598. }
  599. // now attempt to apply transaction-specific sessions data
  600. if (!inTransaction) {
  601. if (session.transaction.state !== TxnState.NO_TRANSACTION) {
  602. session.transaction.transition(TxnState.NO_TRANSACTION);
  603. }
  604. // TODO: the following should only be applied to read operation per spec.
  605. // for causal consistency
  606. if (session.supports.causalConsistency && session.operationTime && shouldApplyReadConcern) {
  607. command.readConcern = command.readConcern || {};
  608. Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
  609. }
  610. return;
  611. }
  612. if (options.readPreference && !options.readPreference.equals(ReadPreference.primary)) {
  613. return new MongoError(
  614. `Read preference in a transaction must be primary, not: ${options.readPreference.mode}`
  615. );
  616. }
  617. // `autocommit` must always be false to differentiate from retryable writes
  618. command.autocommit = false;
  619. if (session.transaction.state === TxnState.STARTING_TRANSACTION) {
  620. session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
  621. command.startTransaction = true;
  622. const readConcern =
  623. session.transaction.options.readConcern || session.clientOptions.readConcern;
  624. if (readConcern) {
  625. command.readConcern = readConcern;
  626. }
  627. if (session.supports.causalConsistency && session.operationTime) {
  628. command.readConcern = command.readConcern || {};
  629. Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
  630. }
  631. }
  632. }
  633. function updateSessionFromResponse(session, document) {
  634. if (document.$clusterTime) {
  635. resolveClusterTime(session, document.$clusterTime);
  636. }
  637. if (document.operationTime && session && session.supports.causalConsistency) {
  638. session.advanceOperationTime(document.operationTime);
  639. }
  640. if (document.recoveryToken && session && session.inTransaction()) {
  641. session.transaction._recoveryToken = document.recoveryToken;
  642. }
  643. }
  644. module.exports = {
  645. ClientSession,
  646. ServerSession,
  647. ServerSessionPool,
  648. TxnState,
  649. applySession,
  650. updateSessionFromResponse,
  651. commandSupportsReadConcern
  652. };