Source: lib/core/sessions.js

  1. 'use strict';
  2. const retrieveBSON = require('./connection/utils').retrieveBSON;
  3. const EventEmitter = require('events');
  4. const BSON = retrieveBSON();
  5. const Binary = BSON.Binary;
  6. const uuidV4 = require('./utils').uuidV4;
  7. const MongoError = require('./error').MongoError;
  8. const isRetryableError = require('././error').isRetryableError;
  9. const MongoNetworkError = require('./error').MongoNetworkError;
  10. const MongoWriteConcernError = require('./error').MongoWriteConcernError;
  11. const Transaction = require('./transactions').Transaction;
  12. const TxnState = require('./transactions').TxnState;
  13. const isPromiseLike = require('./utils').isPromiseLike;
  14. const ReadPreference = require('./topologies/read_preference');
  15. const isTransactionCommand = require('./transactions').isTransactionCommand;
  16. const resolveClusterTime = require('./topologies/shared').resolveClusterTime;
  17. const isSharded = require('./wireprotocol/shared').isSharded;
  18. const maxWireVersion = require('./utils').maxWireVersion;
  19. const now = require('./../utils').now;
  20. const calculateDurationInMs = require('./../utils').calculateDurationInMs;
// Lowest topology wire version for which `startTransaction` allows a transaction on a
// sharded topology; per the error raised there, anything lower is "MongoDB < 4.2".
const minWireVersionForShardedTransactions = 8;
  22. function assertAlive(session, callback) {
  23. if (session.serverSession == null) {
  24. const error = new MongoError('Cannot use a session that has ended');
  25. if (typeof callback === 'function') {
  26. callback(error, null);
  27. return false;
  28. }
  29. throw error;
  30. }
  31. return true;
  32. }
  33. /**
  34. * Options to pass when creating a Client Session
  35. * @typedef {Object} SessionOptions
  36. * @property {boolean} [causalConsistency=true] Whether causal consistency should be enabled on this session
  37. * @property {TransactionOptions} [defaultTransactionOptions] The default TransactionOptions to use for transactions started on this session.
  38. */
  39. /**
  40. * A BSON document reflecting the lsid of a {@link ClientSession}
  41. * @typedef {Object} SessionId
  42. */
// Symbol key under which a ClientSession keeps its lazily acquired ServerSession.
const kServerSession = Symbol('serverSession');
  44. /**
  45. * A class representing a client session on the server
  46. * WARNING: not meant to be instantiated directly.
  47. * @class
  48. * @hideconstructor
  49. */
  50. class ClientSession extends EventEmitter {
  51. /**
  52. * Create a client session.
  53. * WARNING: not meant to be instantiated directly
  54. *
  55. * @param {Topology} topology The current client's topology (Internal Class)
  56. * @param {ServerSessionPool} sessionPool The server session pool (Internal Class)
  57. * @param {SessionOptions} [options] Optional settings
  58. * @param {Object} [clientOptions] Optional settings provided when creating a client in the porcelain driver
  59. */
  60. constructor(topology, sessionPool, options, clientOptions) {
  61. super();
  62. if (topology == null) {
  63. throw new Error('ClientSession requires a topology');
  64. }
  65. if (sessionPool == null || !(sessionPool instanceof ServerSessionPool)) {
  66. throw new Error('ClientSession requires a ServerSessionPool');
  67. }
  68. options = options || {};
  69. clientOptions = clientOptions || {};
  70. this.topology = topology;
  71. this.sessionPool = sessionPool;
  72. this.hasEnded = false;
  73. this.clientOptions = clientOptions;
  74. this[kServerSession] = undefined;
  75. this.supports = {
  76. causalConsistency:
  77. typeof options.causalConsistency !== 'undefined' ? options.causalConsistency : true
  78. };
  79. this.clusterTime = options.initialClusterTime;
  80. this.operationTime = null;
  81. this.explicit = !!options.explicit;
  82. this.owner = options.owner;
  83. this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
  84. this.transaction = new Transaction();
  85. }
  86. /**
  87. * The server id associated with this session
  88. * @type {SessionId}
  89. */
  90. get id() {
  91. return this.serverSession.id;
  92. }
  93. get serverSession() {
  94. if (this[kServerSession] == null) {
  95. this[kServerSession] = this.sessionPool.acquire();
  96. }
  97. return this[kServerSession];
  98. }
  99. /**
  100. * Ends this session on the server
  101. *
  102. * @param {Object} [options] Optional settings. Currently reserved for future use
  103. * @param {Function} [callback] Optional callback for completion of this operation
  104. */
  105. endSession(options, callback) {
  106. if (typeof options === 'function') (callback = options), (options = {});
  107. options = options || {};
  108. if (this.hasEnded) {
  109. if (typeof callback === 'function') callback(null, null);
  110. return;
  111. }
  112. if (this.serverSession && this.inTransaction()) {
  113. this.abortTransaction(); // pass in callback?
  114. }
  115. // release the server session back to the pool
  116. this.sessionPool.release(this.serverSession);
  117. this[kServerSession] = undefined;
  118. // mark the session as ended, and emit a signal
  119. this.hasEnded = true;
  120. this.emit('ended', this);
  121. // spec indicates that we should ignore all errors for `endSessions`
  122. if (typeof callback === 'function') callback(null, null);
  123. }
  124. /**
  125. * Advances the operationTime for a ClientSession.
  126. *
  127. * @param {Timestamp} operationTime the `BSON.Timestamp` of the operation type it is desired to advance to
  128. */
  129. advanceOperationTime(operationTime) {
  130. if (this.operationTime == null) {
  131. this.operationTime = operationTime;
  132. return;
  133. }
  134. if (operationTime.greaterThan(this.operationTime)) {
  135. this.operationTime = operationTime;
  136. }
  137. }
  138. /**
  139. * Used to determine if this session equals another
  140. * @param {ClientSession} session
  141. * @return {boolean} true if the sessions are equal
  142. */
  143. equals(session) {
  144. if (!(session instanceof ClientSession)) {
  145. return false;
  146. }
  147. return this.id.id.buffer.equals(session.id.id.buffer);
  148. }
  149. /**
  150. * Increment the transaction number on the internal ServerSession
  151. */
  152. incrementTransactionNumber() {
  153. this.serverSession.txnNumber++;
  154. }
  155. /**
  156. * @returns {boolean} whether this session is currently in a transaction or not
  157. */
  158. inTransaction() {
  159. return this.transaction.isActive;
  160. }
  161. /**
  162. * Starts a new transaction with the given options.
  163. *
  164. * @param {TransactionOptions} options Options for the transaction
  165. */
  166. startTransaction(options) {
  167. assertAlive(this);
  168. if (this.inTransaction()) {
  169. throw new MongoError('Transaction already in progress');
  170. }
  171. const topologyMaxWireVersion = maxWireVersion(this.topology);
  172. if (
  173. isSharded(this.topology) &&
  174. topologyMaxWireVersion != null &&
  175. topologyMaxWireVersion < minWireVersionForShardedTransactions
  176. ) {
  177. throw new MongoError('Transactions are not supported on sharded clusters in MongoDB < 4.2.');
  178. }
  179. // increment txnNumber
  180. this.incrementTransactionNumber();
  181. // create transaction state
  182. this.transaction = new Transaction(
  183. Object.assign({}, this.clientOptions, options || this.defaultTransactionOptions)
  184. );
  185. this.transaction.transition(TxnState.STARTING_TRANSACTION);
  186. }
  187. /**
  188. * Commits the currently active transaction in this session.
  189. *
  190. * @param {Function} [callback] optional callback for completion of this operation
  191. * @return {Promise} A promise is returned if no callback is provided
  192. */
  193. commitTransaction(callback) {
  194. if (typeof callback === 'function') {
  195. endTransaction(this, 'commitTransaction', callback);
  196. return;
  197. }
  198. return new Promise((resolve, reject) => {
  199. endTransaction(this, 'commitTransaction', (err, reply) =>
  200. err ? reject(err) : resolve(reply)
  201. );
  202. });
  203. }
  204. /**
  205. * Aborts the currently active transaction in this session.
  206. *
  207. * @param {Function} [callback] optional callback for completion of this operation
  208. * @return {Promise} A promise is returned if no callback is provided
  209. */
  210. abortTransaction(callback) {
  211. if (typeof callback === 'function') {
  212. endTransaction(this, 'abortTransaction', callback);
  213. return;
  214. }
  215. return new Promise((resolve, reject) => {
  216. endTransaction(this, 'abortTransaction', (err, reply) =>
  217. err ? reject(err) : resolve(reply)
  218. );
  219. });
  220. }
  221. /**
  222. * This is here to ensure that ClientSession is never serialized to BSON.
  223. * @ignore
  224. */
  225. toBSON() {
  226. throw new Error('ClientSession cannot be serialized to BSON.');
  227. }
  228. /**
  229. * A user provided function to be run within a transaction
  230. *
  231. * @callback WithTransactionCallback
  232. * @param {ClientSession} session The parent session of the transaction running the operation. This should be passed into each operation within the lambda.
  233. * @returns {Promise} The resulting Promise of operations run within this transaction
  234. */
  235. /**
  236. * Runs a provided lambda within a transaction, retrying either the commit operation
  237. * or entire transaction as needed (and when the error permits) to better ensure that
  238. * the transaction can complete successfully.
  239. *
  240. * IMPORTANT: This method requires the user to return a Promise, all lambdas that do not
  241. * return a Promise will result in undefined behavior.
  242. *
  243. * @param {WithTransactionCallback} fn
  244. * @param {TransactionOptions} [options] Optional settings for the transaction
  245. */
  246. withTransaction(fn, options) {
  247. const startTime = now();
  248. return attemptTransaction(this, startTime, fn, options);
  249. }
  250. }
// Retry budget passed to `hasNotTimedOut` by the `withTransaction` helpers; it is compared
// against the result of `calculateDurationInMs`.
const MAX_WITH_TRANSACTION_TIMEOUT = 120000;
// Error codes consulted by `isUnknownTransactionCommitResult`.
const UNSATISFIABLE_WRITE_CONCERN_CODE = 100;
const UNKNOWN_REPL_WRITE_CONCERN_CODE = 79;
// Error code that `isMaxTimeMSExpiredError` looks for, on the error or its `writeConcernError`.
const MAX_TIME_MS_EXPIRED_CODE = 50;
// Error code names for which a commit result is NOT treated as unknown.
const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([
  'CannotSatisfyWriteConcern',
  'UnknownReplWriteConcern',
  'UnsatisfiableWriteConcern'
]);
  260. function hasNotTimedOut(startTime, max) {
  261. return calculateDurationInMs(startTime) < max;
  262. }
  263. function isUnknownTransactionCommitResult(err) {
  264. return (
  265. isMaxTimeMSExpiredError(err) ||
  266. (!NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName) &&
  267. err.code !== UNSATISFIABLE_WRITE_CONCERN_CODE &&
  268. err.code !== UNKNOWN_REPL_WRITE_CONCERN_CODE)
  269. );
  270. }
  271. function isMaxTimeMSExpiredError(err) {
  272. if (err == null) return false;
  273. return (
  274. err.code === MAX_TIME_MS_EXPIRED_CODE ||
  275. (err.writeConcernError && err.writeConcernError.code === MAX_TIME_MS_EXPIRED_CODE)
  276. );
  277. }
  278. function attemptTransactionCommit(session, startTime, fn, options) {
  279. return session.commitTransaction().catch(err => {
  280. if (
  281. err instanceof MongoError &&
  282. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
  283. !isMaxTimeMSExpiredError(err)
  284. ) {
  285. if (err.hasErrorLabel('UnknownTransactionCommitResult')) {
  286. return attemptTransactionCommit(session, startTime, fn, options);
  287. }
  288. if (err.hasErrorLabel('TransientTransactionError')) {
  289. return attemptTransaction(session, startTime, fn, options);
  290. }
  291. }
  292. throw err;
  293. });
  294. }
// Transaction states that `userExplicitlyEndedTransaction` treats as "already ended";
// `attemptTransaction` skips its own commit when the session is in one of them.
const USER_EXPLICIT_TXN_END_STATES = new Set([
  TxnState.NO_TRANSACTION,
  TxnState.TRANSACTION_COMMITTED,
  TxnState.TRANSACTION_ABORTED
]);
  300. function userExplicitlyEndedTransaction(session) {
  301. return USER_EXPLICIT_TXN_END_STATES.has(session.transaction.state);
  302. }
  303. function attemptTransaction(session, startTime, fn, options) {
  304. session.startTransaction(options);
  305. let promise;
  306. try {
  307. promise = fn(session);
  308. } catch (err) {
  309. promise = Promise.reject(err);
  310. }
  311. if (!isPromiseLike(promise)) {
  312. session.abortTransaction();
  313. throw new TypeError('Function provided to `withTransaction` must return a Promise');
  314. }
  315. return promise
  316. .then(() => {
  317. if (userExplicitlyEndedTransaction(session)) {
  318. return;
  319. }
  320. return attemptTransactionCommit(session, startTime, fn, options);
  321. })
  322. .catch(err => {
  323. function maybeRetryOrThrow(err) {
  324. if (
  325. err instanceof MongoError &&
  326. err.hasErrorLabel('TransientTransactionError') &&
  327. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT)
  328. ) {
  329. return attemptTransaction(session, startTime, fn, options);
  330. }
  331. if (isMaxTimeMSExpiredError(err)) {
  332. if (err.errorLabels == null) {
  333. err.errorLabels = [];
  334. }
  335. err.errorLabels.push('UnknownTransactionCommitResult');
  336. }
  337. throw err;
  338. }
  339. if (session.transaction.isActive) {
  340. return session.abortTransaction().then(() => maybeRetryOrThrow(err));
  341. }
  342. return maybeRetryOrThrow(err);
  343. });
  344. }
/**
 * Shared implementation behind `commitTransaction` and `abortTransaction`.
 *
 * Validates the current transaction state, short-circuits when no command needs to be
 * sent, and otherwise builds the command (write concern, maxTimeMS, recovery token) and
 * runs it against `admin.$cmd`. A retryable error triggers exactly one retry of the command.
 *
 * @ignore
 * @param {ClientSession} session The session whose transaction is being ended
 * @param {string} commandName Either 'commitTransaction' or 'abortTransaction'
 * @param {Function} callback Invoked as `callback(err, reply)` once the operation finishes
 */
function endTransaction(session, commandName, callback) {
  if (!assertAlive(session, callback)) {
    // checking result in case callback was called
    return;
  }

  // handle any initial problematic cases
  let txnState = session.transaction.state;

  if (txnState === TxnState.NO_TRANSACTION) {
    callback(new MongoError('No transaction started'));
    return;
  }

  if (commandName === 'commitTransaction') {
    if (
      txnState === TxnState.STARTING_TRANSACTION ||
      txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
    ) {
      // the transaction was never started, we can safely exit here
      session.transaction.transition(TxnState.TRANSACTION_COMMITTED_EMPTY);
      callback(null, null);
      return;
    }

    if (txnState === TxnState.TRANSACTION_ABORTED) {
      callback(new MongoError('Cannot call commitTransaction after calling abortTransaction'));
      return;
    }
  } else {
    if (txnState === TxnState.STARTING_TRANSACTION) {
      // the transaction was never started, we can safely exit here
      session.transaction.transition(TxnState.TRANSACTION_ABORTED);
      callback(null, null);
      return;
    }

    if (txnState === TxnState.TRANSACTION_ABORTED) {
      callback(new MongoError('Cannot call abortTransaction twice'));
      return;
    }

    if (
      txnState === TxnState.TRANSACTION_COMMITTED ||
      txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
    ) {
      callback(new MongoError('Cannot call abortTransaction after calling commitTransaction'));
      return;
    }
  }

  // construct and send the command
  const command = { [commandName]: 1 };

  // apply a writeConcern if specified
  // (the transaction's own write concern takes precedence over the client-level `w`)
  let writeConcern;
  if (session.transaction.options.writeConcern) {
    writeConcern = Object.assign({}, session.transaction.options.writeConcern);
  } else if (session.clientOptions && session.clientOptions.w) {
    writeConcern = { w: session.clientOptions.w };
  }

  // The state was already TRANSACTION_COMMITTED on entry, i.e. the commit is being run
  // again: force `w: 'majority'` and default `wtimeout` to 10000 unless one was given.
  if (txnState === TxnState.TRANSACTION_COMMITTED) {
    writeConcern = Object.assign({ wtimeout: 10000 }, writeConcern, { w: 'majority' });
  }

  if (writeConcern) {
    Object.assign(command, { writeConcern });
  }

  if (commandName === 'commitTransaction' && session.transaction.options.maxTimeMS) {
    Object.assign(command, { maxTimeMS: session.transaction.options.maxTimeMS });
  }

  // Final handler: records the new transaction state, adjusts error labels for commit
  // failures, then reports to the caller.
  function commandHandler(e, r) {
    if (commandName === 'commitTransaction') {
      // the state moves to TRANSACTION_COMMITTED whether or not the command failed
      session.transaction.transition(TxnState.TRANSACTION_COMMITTED);

      if (
        e &&
        (e instanceof MongoNetworkError ||
          e instanceof MongoWriteConcernError ||
          isRetryableError(e) ||
          isMaxTimeMSExpiredError(e))
      ) {
        // strip a `TransientTransactionError` label, or start an empty label list
        if (e.errorLabels) {
          const idx = e.errorLabels.indexOf('TransientTransactionError');
          if (idx !== -1) {
            e.errorLabels.splice(idx, 1);
          }
        } else {
          e.errorLabels = [];
        }

        if (isUnknownTransactionCommitResult(e)) {
          e.errorLabels.push('UnknownTransactionCommitResult');

          // per txns spec, must unpin session in this case
          session.transaction.unpinServer();
        }
      }
    } else {
      session.transaction.transition(TxnState.TRANSACTION_ABORTED);
    }

    callback(e, r);
  }

  // The spec indicates that we should ignore all errors on `abortTransaction`
  function transactionError(err) {
    return commandName === 'commitTransaction' ? err : null;
  }

  if (
    // Assumption here that commandName is "commitTransaction" or "abortTransaction"
    session.transaction.recoveryToken &&
    supportsRecoveryToken(session)
  ) {
    command.recoveryToken = session.transaction.recoveryToken;
  }

  // send the command
  session.topology.command('admin.$cmd', command, { session }, (err, reply) => {
    if (err && isRetryableError(err)) {
      // SPEC-1185: apply majority write concern when retrying commitTransaction
      if (command.commitTransaction) {
        // per txns spec, must unpin session in this case
        session.transaction.unpinServer();

        command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
          w: 'majority'
        });
      }

      // single retry: its outcome goes straight to the final handler
      return session.topology.command('admin.$cmd', command, { session }, (_err, _reply) =>
        commandHandler(transactionError(_err), _reply)
      );
    }

    commandHandler(transactionError(err), reply);
  });
}
  465. function supportsRecoveryToken(session) {
  466. const topology = session.topology;
  467. return !!topology.s.options.useRecoveryToken;
  468. }
  469. /**
  470. * Reflects the existence of a session on the server. Can be reused by the session pool.
  471. * WARNING: not meant to be instantiated directly. For internal use only.
  472. * @ignore
  473. */
  474. class ServerSession {
  475. constructor() {
  476. this.id = { id: new Binary(uuidV4(), Binary.SUBTYPE_UUID) };
  477. this.lastUse = now();
  478. this.txnNumber = 0;
  479. this.isDirty = false;
  480. }
  481. /**
  482. * Determines if the server session has timed out.
  483. * @ignore
  484. * @param {Date} sessionTimeoutMinutes The server's "logicalSessionTimeoutMinutes"
  485. * @return {boolean} true if the session has timed out.
  486. */
  487. hasTimedOut(sessionTimeoutMinutes) {
  488. // Take the difference of the lastUse timestamp and now, which will result in a value in
  489. // milliseconds, and then convert milliseconds to minutes to compare to `sessionTimeoutMinutes`
  490. const idleTimeMinutes = Math.round(
  491. ((calculateDurationInMs(this.lastUse) % 86400000) % 3600000) / 60000
  492. );
  493. return idleTimeMinutes > sessionTimeoutMinutes - 1;
  494. }
  495. }
  496. /**
  497. * Maintains a pool of Server Sessions.
  498. * For internal use only
  499. * @ignore
  500. */
  501. class ServerSessionPool {
  502. constructor(topology) {
  503. if (topology == null) {
  504. throw new Error('ServerSessionPool requires a topology');
  505. }
  506. this.topology = topology;
  507. this.sessions = [];
  508. }
  509. /**
  510. * Ends all sessions in the session pool.
  511. * @ignore
  512. */
  513. endAllPooledSessions(callback) {
  514. if (this.sessions.length) {
  515. this.topology.endSessions(
  516. this.sessions.map(session => session.id),
  517. () => {
  518. this.sessions = [];
  519. if (typeof callback === 'function') {
  520. callback();
  521. }
  522. }
  523. );
  524. return;
  525. }
  526. if (typeof callback === 'function') {
  527. callback();
  528. }
  529. }
  530. /**
  531. * Acquire a Server Session from the pool.
  532. * Iterates through each session in the pool, removing any stale sessions
  533. * along the way. The first non-stale session found is removed from the
  534. * pool and returned. If no non-stale session is found, a new ServerSession
  535. * is created.
  536. * @ignore
  537. * @returns {ServerSession}
  538. */
  539. acquire() {
  540. const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes;
  541. while (this.sessions.length) {
  542. const session = this.sessions.shift();
  543. if (!session.hasTimedOut(sessionTimeoutMinutes)) {
  544. return session;
  545. }
  546. }
  547. return new ServerSession();
  548. }
  549. /**
  550. * Release a session to the session pool
  551. * Adds the session back to the session pool if the session has not timed out yet.
  552. * This method also removes any stale sessions from the pool.
  553. * @ignore
  554. * @param {ServerSession} session The session to release to the pool
  555. */
  556. release(session) {
  557. const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes;
  558. while (this.sessions.length) {
  559. const pooledSession = this.sessions[this.sessions.length - 1];
  560. if (pooledSession.hasTimedOut(sessionTimeoutMinutes)) {
  561. this.sessions.pop();
  562. } else {
  563. break;
  564. }
  565. }
  566. if (!session.hasTimedOut(sessionTimeoutMinutes)) {
  567. if (session.isDirty) {
  568. return;
  569. }
  570. // otherwise, readd this session to the session pool
  571. this.sessions.unshift(session);
  572. }
  573. }
  574. }
  575. // TODO: this should be codified in command construction
  576. // @see https://github.com/mongodb/specifications/blob/master/source/read-write-concern/read-write-concern.rst#read-concern
  577. function commandSupportsReadConcern(command, options) {
  578. if (
  579. command.aggregate ||
  580. command.count ||
  581. command.distinct ||
  582. command.find ||
  583. command.parallelCollectionScan ||
  584. command.geoNear ||
  585. command.geoSearch
  586. ) {
  587. return true;
  588. }
  589. if (command.mapReduce && options.out && (options.out.inline === 1 || options.out === 'inline')) {
  590. return true;
  591. }
  592. return false;
  593. }
  594. /**
  595. * Optionally decorate a command with sessions specific keys
  596. *
  597. * @ignore
  598. * @param {ClientSession} session the session tracking transaction state
  599. * @param {Object} command the command to decorate
  600. * @param {Object} topology the topology for tracking the cluster time
  601. * @param {Object} [options] Optional settings passed to calling operation
  602. * @return {MongoError|null} An error, if some error condition was met
  603. */
  604. function applySession(session, command, options) {
  605. if (session.hasEnded) {
  606. // TODO: merge this with `assertAlive`, did not want to throw a try/catch here
  607. return new MongoError('Cannot use a session that has ended');
  608. }
  609. // SPEC-1019: silently ignore explicit session with unacknowledged write for backwards compatibility
  610. if (options && options.writeConcern && options.writeConcern.w === 0) {
  611. return;
  612. }
  613. const serverSession = session.serverSession;
  614. serverSession.lastUse = now();
  615. command.lsid = serverSession.id;
  616. // first apply non-transaction-specific sessions data
  617. const inTransaction = session.inTransaction() || isTransactionCommand(command);
  618. const isRetryableWrite = options.willRetryWrite;
  619. const shouldApplyReadConcern = commandSupportsReadConcern(command);
  620. if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
  621. command.txnNumber = BSON.Long.fromNumber(serverSession.txnNumber);
  622. }
  623. // now attempt to apply transaction-specific sessions data
  624. if (!inTransaction) {
  625. if (session.transaction.state !== TxnState.NO_TRANSACTION) {
  626. session.transaction.transition(TxnState.NO_TRANSACTION);
  627. }
  628. // TODO: the following should only be applied to read operation per spec.
  629. // for causal consistency
  630. if (session.supports.causalConsistency && session.operationTime && shouldApplyReadConcern) {
  631. command.readConcern = command.readConcern || {};
  632. Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
  633. }
  634. return;
  635. }
  636. if (options.readPreference && !options.readPreference.equals(ReadPreference.primary)) {
  637. return new MongoError(
  638. `Read preference in a transaction must be primary, not: ${options.readPreference.mode}`
  639. );
  640. }
  641. // `autocommit` must always be false to differentiate from retryable writes
  642. command.autocommit = false;
  643. if (session.transaction.state === TxnState.STARTING_TRANSACTION) {
  644. session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
  645. command.startTransaction = true;
  646. const readConcern =
  647. session.transaction.options.readConcern || session.clientOptions.readConcern;
  648. if (readConcern) {
  649. command.readConcern = readConcern;
  650. }
  651. if (session.supports.causalConsistency && session.operationTime) {
  652. command.readConcern = command.readConcern || {};
  653. Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
  654. }
  655. }
  656. }
  657. function updateSessionFromResponse(session, document) {
  658. if (document.$clusterTime) {
  659. resolveClusterTime(session, document.$clusterTime);
  660. }
  661. if (document.operationTime && session && session.supports.causalConsistency) {
  662. session.advanceOperationTime(document.operationTime);
  663. }
  664. if (document.recoveryToken && session && session.inTransaction()) {
  665. session.transaction._recoveryToken = document.recoveryToken;
  666. }
  667. }
// Public surface of this module. `TxnState` is re-exported from `./transactions`.
module.exports = {
  ClientSession,
  ServerSession,
  ServerSessionPool,
  TxnState,
  applySession,
  updateSessionFromResponse,
  commandSupportsReadConcern
};