'use strict';
const retrieveBSON = require('./connection/utils').retrieveBSON;
const EventEmitter = require('events');
const BSON = retrieveBSON();
const Binary = BSON.Binary;
const uuidV4 = require('./utils').uuidV4;
const MongoError = require('./error').MongoError;
const isRetryableError = require('././error').isRetryableError;
const MongoNetworkError = require('./error').MongoNetworkError;
const MongoWriteConcernError = require('./error').MongoWriteConcernError;
const Transaction = require('./transactions').Transaction;
const TxnState = require('./transactions').TxnState;
const isPromiseLike = require('./utils').isPromiseLike;
const ReadPreference = require('./topologies/read_preference');
const maybePromise = require('../utils').maybePromise;
const isTransactionCommand = require('./transactions').isTransactionCommand;
const resolveClusterTime = require('./topologies/shared').resolveClusterTime;
const isSharded = require('./wireprotocol/shared').isSharded;
const maxWireVersion = require('./utils').maxWireVersion;
const now = require('./../utils').now;
const calculateDurationInMs = require('./../utils').calculateDurationInMs;
// Lowest topology wire version for which `startTransaction` allows a transaction on a sharded
// topology; anything lower is rejected with the "MongoDB < 4.2" error.
const minWireVersionForShardedTransactions = 8;
/**
 * Verifies that a session can still be used.
 *
 * A session is unusable once it has been ended (`hasEnded`) or when it has no server session.
 *
 * @ignore
 * @param {ClientSession} session The session to check
 * @param {Function} [callback] When provided, receives the error instead of it being thrown
 * @return {boolean} `true` if the session is usable; `false` if it is not and `callback` was notified
 * @throws {MongoError} If the session is unusable and no callback was provided
 */
function assertAlive(session, callback) {
  // `hasEnded` must be checked first: `ClientSession#serverSession` is a lazy getter that
  // acquires a server session on access, so on its own it can never report an ended session
  // (and would acquire a fresh server session on behalf of a dead one).
  if (session.hasEnded || session.serverSession == null) {
    const error = new MongoError('Cannot use a session that has ended');
    if (typeof callback === 'function') {
      callback(error, null);
      return false;
    }

    throw error;
  }

  return true;
}
/**
* Options to pass when creating a Client Session
* @typedef {Object} SessionOptions
* @property {boolean} [causalConsistency=true] Whether causal consistency should be enabled on this session
* @property {TransactionOptions} [defaultTransactionOptions] The default TransactionOptions to use for transactions started on this session.
*/
/**
* A BSON document reflecting the lsid of a {@link ClientSession}
* @typedef {Object} SessionId
*/
// Symbol key under which a ClientSession stores its lazily acquired ServerSession.
const kServerSession = Symbol('serverSession');

/**
 * A class representing a client session on the server
 * WARNING: not meant to be instantiated directly.
 * @class
 * @hideconstructor
 */
class ClientSession extends EventEmitter {
  /**
   * Create a client session.
   * WARNING: not meant to be instantiated directly
   *
   * @param {Topology} topology The current client's topology (Internal Class)
   * @param {ServerSessionPool} sessionPool The server session pool (Internal Class)
   * @param {SessionOptions} [options] Optional settings
   * @param {Object} [clientOptions] Optional settings provided when creating a client in the porcelain driver
   */
  constructor(topology, sessionPool, options, clientOptions) {
    super();

    if (topology == null) {
      throw new Error('ClientSession requires a topology');
    }

    if (sessionPool == null || !(sessionPool instanceof ServerSessionPool)) {
      throw new Error('ClientSession requires a ServerSessionPool');
    }

    options = options || {};
    clientOptions = clientOptions || {};

    this.topology = topology;
    this.sessionPool = sessionPool;
    this.hasEnded = false;
    this.clientOptions = clientOptions;
    // the server session is acquired on first access of `serverSession`
    this[kServerSession] = undefined;

    this.supports = {
      causalConsistency:
        typeof options.causalConsistency !== 'undefined' ? options.causalConsistency : true
    };

    this.clusterTime = options.initialClusterTime;

    this.operationTime = null;
    this.explicit = !!options.explicit;
    this.owner = options.owner;
    this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
    this.transaction = new Transaction();
  }

  /**
   * The server id associated with this session
   * @type {SessionId}
   */
  get id() {
    return this.serverSession.id;
  }

  /**
   * The underlying server session. It is acquired from the session pool the first time this
   * property is read, and re-acquired if it is read again after having been released.
   * @type {ServerSession}
   */
  get serverSession() {
    if (this[kServerSession] == null) {
      this[kServerSession] = this.sessionPool.acquire();
    }

    return this[kServerSession];
  }

  /**
   * Ends this session on the server
   *
   * @param {Object} [options] Optional settings. Currently reserved for future use
   * @param {Function} [callback] Optional callback for completion of this operation
   */
  endSession(options, callback) {
    if (typeof options === 'function') (callback = options), (options = {});
    options = options || {};

    const session = this;
    return maybePromise(this, callback, done => {
      if (session.hasEnded) {
        return done();
      }

      function completeEndSession() {
        // Release the server session back to the pool, but only if one was ever acquired.
        // Reading `session.serverSession` here would lazily create a brand new server
        // session just to hand it to the pool.
        const serverSession = session[kServerSession];
        if (serverSession != null) {
          session.sessionPool.release(serverSession);
          session[kServerSession] = undefined;
        }

        // mark the session as ended, and emit a signal
        session.hasEnded = true;
        session.emit('ended', session);

        // spec indicates that we should ignore all errors for `endSessions`
        done();
      }

      // an active transaction is aborted before the session is torn down
      if (session.inTransaction()) {
        session.abortTransaction(err => {
          if (err) return done(err);
          completeEndSession();
        });

        return;
      }

      completeEndSession();
    });
  }

  /**
   * Advances the operationTime for a ClientSession.
   *
   * @param {Timestamp} operationTime the `BSON.Timestamp` to advance to; ignored unless it is greater than the current operationTime
   */
  advanceOperationTime(operationTime) {
    if (this.operationTime == null) {
      this.operationTime = operationTime;
      return;
    }

    if (operationTime.greaterThan(this.operationTime)) {
      this.operationTime = operationTime;
    }
  }

  /**
   * Used to determine if this session equals another
   * @param {ClientSession} session
   * @return {boolean} true if the sessions are equal
   */
  equals(session) {
    if (!(session instanceof ClientSession)) {
      return false;
    }

    return this.id.id.buffer.equals(session.id.id.buffer);
  }

  /**
   * Increment the transaction number on the internal ServerSession
   */
  incrementTransactionNumber() {
    this.serverSession.txnNumber++;
  }

  /**
   * @returns {boolean} whether this session is currently in a transaction or not
   */
  inTransaction() {
    return this.transaction.isActive;
  }

  /**
   * Starts a new transaction with the given options.
   *
   * @param {TransactionOptions} options Options for the transaction
   */
  startTransaction(options) {
    assertAlive(this);
    if (this.inTransaction()) {
      throw new MongoError('Transaction already in progress');
    }

    const topologyMaxWireVersion = maxWireVersion(this.topology);
    if (
      isSharded(this.topology) &&
      topologyMaxWireVersion != null &&
      topologyMaxWireVersion < minWireVersionForShardedTransactions
    ) {
      throw new MongoError('Transactions are not supported on sharded clusters in MongoDB < 4.2.');
    }

    // increment txnNumber
    this.incrementTransactionNumber();

    // create transaction state; explicit options replace (rather than merge with) the
    // session's default transaction options
    this.transaction = new Transaction(
      Object.assign({}, this.clientOptions, options || this.defaultTransactionOptions)
    );

    this.transaction.transition(TxnState.STARTING_TRANSACTION);
  }

  /**
   * Commits the currently active transaction in this session.
   *
   * @param {Function} [callback] optional callback for completion of this operation
   * @return {Promise} A promise is returned if no callback is provided
   */
  commitTransaction(callback) {
    return maybePromise(this, callback, done => endTransaction(this, 'commitTransaction', done));
  }

  /**
   * Aborts the currently active transaction in this session.
   *
   * @param {Function} [callback] optional callback for completion of this operation
   * @return {Promise} A promise is returned if no callback is provided
   */
  abortTransaction(callback) {
    return maybePromise(this, callback, done => endTransaction(this, 'abortTransaction', done));
  }

  /**
   * This is here to ensure that ClientSession is never serialized to BSON.
   * @ignore
   */
  toBSON() {
    throw new Error('ClientSession cannot be serialized to BSON.');
  }

  /**
   * A user provided function to be run within a transaction
   *
   * @callback WithTransactionCallback
   * @param {ClientSession} session The parent session of the transaction running the operation. This should be passed into each operation within the lambda.
   * @returns {Promise} The resulting Promise of operations run within this transaction
   */

  /**
   * Runs a provided lambda within a transaction, retrying either the commit operation
   * or entire transaction as needed (and when the error permits) to better ensure that
   * the transaction can complete successfully.
   *
   * IMPORTANT: This method requires the user to return a Promise, all lambdas that do not
   * return a Promise will result in undefined behavior.
   *
   * @param {WithTransactionCallback} fn
   * @param {TransactionOptions} [options] Optional settings for the transaction
   */
  withTransaction(fn, options) {
    const startTime = now();
    return attemptTransaction(this, startTime, fn, options);
  }
}
// Upper bound, in milliseconds, on the elapsed time since `withTransaction` started for which
// the retry helpers below will still retry.
const MAX_WITH_TRANSACTION_TIMEOUT = 120000;
// Numeric error codes consulted when classifying a failed commit.
const UNSATISFIABLE_WRITE_CONCERN_CODE = 100;
const UNKNOWN_REPL_WRITE_CONCERN_CODE = 79;
const MAX_TIME_MS_EXPIRED_CODE = 50;
// Error `codeName`s for which `isUnknownTransactionCommitResult` reports `false`
// (unless the error is also a maxTimeMS expiry).
const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([
'CannotSatisfyWriteConcern',
'UnknownReplWriteConcern',
'UnsatisfiableWriteConcern'
]);
/**
 * Reports whether less than `max` milliseconds have elapsed since `startTime`.
 *
 * @ignore
 * @param {number} startTime The reference point handed to `calculateDurationInMs`
 * @param {number} max The allowed duration in milliseconds (exclusive)
 * @return {boolean} true while the elapsed time is still below `max`
 */
function hasNotTimedOut(startTime, max) {
  const elapsedMs = calculateDurationInMs(startTime);
  return elapsedMs < max;
}
/**
 * Decides whether a failed commit leaves the outcome of the transaction unknown.
 *
 * A maxTimeMS expiry always counts as unknown. Otherwise the result is unknown unless the
 * error is one of the listed write concern failures (matched by `codeName` or by `code`).
 *
 * @ignore
 * @param {Object} err The error produced by the commit
 * @return {boolean} true if the commit result is unknown
 */
function isUnknownTransactionCommitResult(err) {
  if (isMaxTimeMSExpiredError(err)) {
    return true;
  }

  const isListedWriteConcernFailure =
    NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName) ||
    err.code === UNSATISFIABLE_WRITE_CONCERN_CODE ||
    err.code === UNKNOWN_REPL_WRITE_CONCERN_CODE;

  return !isListedWriteConcernFailure;
}
/**
 * Checks whether an error, or the write concern error nested inside it, carries the
 * maxTimeMS-expired error code.
 *
 * @ignore
 * @param {Object} [err] The error to inspect
 * @return {boolean} truthy when the error (or its `writeConcernError`) has the maxTimeMS-expired code
 */
function isMaxTimeMSExpiredError(err) {
  if (err == null) {
    return false;
  }

  if (err.code === MAX_TIME_MS_EXPIRED_CODE) {
    return true;
  }

  const nestedError = err.writeConcernError;
  return nestedError && nestedError.code === MAX_TIME_MS_EXPIRED_CODE;
}
/**
 * Commits the session's transaction on behalf of `withTransaction`, retrying as the error allows.
 *
 * While the overall time budget has not been exhausted, a `MongoError` that is not a maxTimeMS
 * expiry is retried: the commit alone when labelled `UnknownTransactionCommitResult`, or the
 * whole transaction when labelled `TransientTransactionError`. Any other failure is rethrown.
 *
 * @ignore
 * @param {ClientSession} session The session owning the transaction
 * @param {number} startTime When the enclosing `withTransaction` call started
 * @param {Function} fn The user function, needed to re-run the whole transaction
 * @param {Object} [options] The transaction options, needed to re-run the whole transaction
 * @return {Promise} Settles with the outcome of the commit (or of the retried attempt)
 */
function attemptTransactionCommit(session, startTime, fn, options) {
  const onCommitFailure = commitError => {
    const mayRetry =
      commitError instanceof MongoError &&
      hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
      !isMaxTimeMSExpiredError(commitError);

    if (!mayRetry) {
      throw commitError;
    }

    // the commit itself may still have succeeded: only the commit is re-sent
    if (commitError.hasErrorLabel('UnknownTransactionCommitResult')) {
      return attemptTransactionCommit(session, startTime, fn, options);
    }

    // the transaction as a whole can be retried from scratch
    if (commitError.hasErrorLabel('TransientTransactionError')) {
      return attemptTransaction(session, startTime, fn, options);
    }

    throw commitError;
  };

  return session.commitTransaction().catch(onCommitFailure);
}
// Transaction states which `userExplicitlyEndedTransaction` treats as "already ended";
// `attemptTransaction` skips its own commit when the transaction is in one of them.
const USER_EXPLICIT_TXN_END_STATES = new Set([
TxnState.NO_TRANSACTION,
TxnState.TRANSACTION_COMMITTED,
TxnState.TRANSACTION_ABORTED
]);
/**
 * Reports whether the session's transaction is in one of the states listed in
 * `USER_EXPLICIT_TXN_END_STATES`.
 *
 * @ignore
 * @param {ClientSession} session The session whose transaction state is inspected
 * @return {boolean} true if the current transaction state is one of the listed end states
 */
function userExplicitlyEndedTransaction(session) {
  const currentState = session.transaction.state;
  return USER_EXPLICIT_TXN_END_STATES.has(currentState);
}
/**
 * Runs one attempt of a `withTransaction` call: starts a transaction, invokes the user
 * function, then commits unless the user function already ended the transaction.
 *
 * On failure the transaction is aborted if it is still active. The whole attempt is then
 * retried when the error is a `MongoError` labelled `TransientTransactionError` and the time
 * budget has not been exhausted; otherwise the error is rethrown (after being labelled
 * `UnknownTransactionCommitResult` when it is a maxTimeMS expiry).
 *
 * @ignore
 * @param {ClientSession} session The session to run the transaction on
 * @param {number} startTime When the enclosing `withTransaction` call started
 * @param {Function} fn The user function; must return a Promise
 * @param {Object} [options] Options passed to `startTransaction`
 * @return {Promise} Settles with the outcome of the attempt
 * @throws {TypeError} Synchronously, if `fn` does not return a Promise
 */
function attemptTransaction(session, startTime, fn, options) {
  session.startTransaction(options);

  // a synchronous throw from the user function is treated like a rejected promise
  let userPromise;
  try {
    userPromise = fn(session);
  } catch (thrown) {
    userPromise = Promise.reject(thrown);
  }

  if (!isPromiseLike(userPromise)) {
    session.abortTransaction();
    throw new TypeError('Function provided to `withTransaction` must return a Promise');
  }

  const commitUnlessUserEnded = () =>
    userExplicitlyEndedTransaction(session)
      ? undefined
      : attemptTransactionCommit(session, startTime, fn, options);

  const retryOrRethrow = failure => {
    const canRetry =
      failure instanceof MongoError &&
      failure.hasErrorLabel('TransientTransactionError') &&
      hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT);

    if (canRetry) {
      return attemptTransaction(session, startTime, fn, options);
    }

    if (isMaxTimeMSExpiredError(failure)) {
      failure.addErrorLabel('UnknownTransactionCommitResult');
    }

    throw failure;
  };

  const onFailure = failure => {
    if (!session.transaction.isActive) {
      return retryOrRethrow(failure);
    }

    // the transaction is still open: abort it before deciding what to do with the error
    return session.abortTransaction().then(() => retryOrRethrow(failure));
  };

  return userPromise.then(commitUnlessUserEnded).catch(onFailure);
}
/**
 * Shared implementation of `commitTransaction` and `abortTransaction`.
 *
 * Validates the current transaction state, short-circuits when no command needs to be sent,
 * and otherwise sends `commandName` against `admin.$cmd`, retrying it once when the first
 * attempt fails with a retryable error.
 *
 * @ignore
 * @param {ClientSession} session The session whose transaction is being ended
 * @param {string} commandName Either "commitTransaction" or "abortTransaction"
 * @param {Function} callback Invoked as `callback(err, reply)` once the operation completes
 */
function endTransaction(session, commandName, callback) {
if (!assertAlive(session, callback)) {
// checking result in case callback was called
return;
}
// handle any initial problematic cases
let txnState = session.transaction.state;
if (txnState === TxnState.NO_TRANSACTION) {
callback(new MongoError('No transaction started'));
return;
}
if (commandName === 'commitTransaction') {
if (
txnState === TxnState.STARTING_TRANSACTION ||
txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
) {
// the transaction was never started, we can safely exit here
session.transaction.transition(TxnState.TRANSACTION_COMMITTED_EMPTY);
callback(null, null);
return;
}
if (txnState === TxnState.TRANSACTION_ABORTED) {
callback(new MongoError('Cannot call commitTransaction after calling abortTransaction'));
return;
}
} else {
if (txnState === TxnState.STARTING_TRANSACTION) {
// the transaction was never started, we can safely exit here
session.transaction.transition(TxnState.TRANSACTION_ABORTED);
callback(null, null);
return;
}
if (txnState === TxnState.TRANSACTION_ABORTED) {
callback(new MongoError('Cannot call abortTransaction twice'));
return;
}
if (
txnState === TxnState.TRANSACTION_COMMITTED ||
txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
) {
callback(new MongoError('Cannot call abortTransaction after calling commitTransaction'));
return;
}
}
// construct and send the command
const command = { [commandName]: 1 };
// apply a writeConcern if specified
// (the transaction's own write concern takes precedence over the client-level `w`)
let writeConcern;
if (session.transaction.options.writeConcern) {
writeConcern = Object.assign({}, session.transaction.options.writeConcern);
} else if (session.clientOptions && session.clientOptions.w) {
writeConcern = { w: session.clientOptions.w };
}
// the state was already TRANSACTION_COMMITTED on entry, i.e. the commit is being re-sent:
// force `w: 'majority'`, with a 10000 wtimeout unless the write concern already sets one
if (txnState === TxnState.TRANSACTION_COMMITTED) {
writeConcern = Object.assign({ wtimeout: 10000 }, writeConcern, { w: 'majority' });
}
if (writeConcern) {
Object.assign(command, { writeConcern });
}
// maxTimeMS is only forwarded for commitTransaction
if (commandName === 'commitTransaction' && session.transaction.options.maxTimeMS) {
Object.assign(command, { maxTimeMS: session.transaction.options.maxTimeMS });
}
// Final handler for the command result: moves the transaction to its end state, applies
// error labels / unpins the server for failed commits, then reports to the caller.
function commandHandler(e, r) {
if (commandName === 'commitTransaction') {
// the state becomes TRANSACTION_COMMITTED even when the commit failed
session.transaction.transition(TxnState.TRANSACTION_COMMITTED);
if (e) {
if (
e instanceof MongoNetworkError ||
e instanceof MongoWriteConcernError ||
isRetryableError(e) ||
isMaxTimeMSExpiredError(e)
) {
if (isUnknownTransactionCommitResult(e)) {
e.addErrorLabel('UnknownTransactionCommitResult');
// per txns spec, must unpin session in this case
session.transaction.unpinServer();
}
} else if (e.hasErrorLabel('TransientTransactionError')) {
session.transaction.unpinServer();
}
}
} else {
session.transaction.transition(TxnState.TRANSACTION_ABORTED);
}
callback(e, r);
}
// The spec indicates that we should ignore all errors on `abortTransaction`
function transactionError(err) {
return commandName === 'commitTransaction' ? err : null;
}
if (
// Assumption here that commandName is "commitTransaction" or "abortTransaction"
session.transaction.recoveryToken &&
supportsRecoveryToken(session)
) {
command.recoveryToken = session.transaction.recoveryToken;
}
// send the command
session.topology.command('admin.$cmd', command, { session }, (err, reply) => {
if (err && isRetryableError(err)) {
// SPEC-1185: apply majority write concern when retrying commitTransaction
if (command.commitTransaction) {
// per txns spec, must unpin session in this case
session.transaction.unpinServer();
command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
w: 'majority'
});
}
// single retry: its result goes straight to the handler, whatever the outcome
return session.topology.command('admin.$cmd', command, { session }, (_err, _reply) =>
commandHandler(transactionError(_err), _reply)
);
}
commandHandler(transactionError(err), reply);
});
}
/**
 * Reports whether the session's topology was configured with the `useRecoveryToken` option.
 *
 * @ignore
 * @param {ClientSession} session The session whose topology options are inspected
 * @return {boolean} true if `topology.s.options.useRecoveryToken` is truthy
 */
function supportsRecoveryToken(session) {
  return Boolean(session.topology.s.options.useRecoveryToken);
}
/**
 * Reflects the existence of a session on the server. Can be reused by the session pool.
 * WARNING: not meant to be instantiated directly. For internal use only.
 * @ignore
 */
class ServerSession {
  constructor() {
    this.id = { id: new Binary(uuidV4(), Binary.SUBTYPE_UUID) };
    this.lastUse = now();
    this.txnNumber = 0;
    this.isDirty = false;
  }

  /**
   * Determines if the server session has timed out.
   * @ignore
   * @param {number} sessionTimeoutMinutes The server's "logicalSessionTimeoutMinutes"
   * @return {boolean} true if the session has timed out.
   */
  hasTimedOut(sessionTimeoutMinutes) {
    // Convert the full idle duration from milliseconds to minutes. The duration must not be
    // reduced modulo an hour or a day first: that makes the idle time wrap around, so a
    // session idle for e.g. 65 minutes would look like it had been idle for only 5.
    const idleTimeMinutes = Math.round(calculateDurationInMs(this.lastUse) / 60000);

    // a session within one minute of the timeout is already treated as timed out
    return idleTimeMinutes > sessionTimeoutMinutes - 1;
  }
}
/**
 * Maintains a pool of Server Sessions.
 * For internal use only
 * @ignore
 */
class ServerSessionPool {
  /**
   * @param {Object} topology The topology this pool reads `logicalSessionTimeoutMinutes` from and ends sessions through
   */
  constructor(topology) {
    if (topology == null) {
      throw new Error('ServerSessionPool requires a topology');
    }

    this.topology = topology;
    this.sessions = [];
  }

  /**
   * Ends all sessions in the session pool.
   * @ignore
   * @param {Function} [callback] Invoked, without arguments, once the pool has been emptied
   */
  endAllPooledSessions(callback) {
    const notify = () => {
      if (typeof callback === 'function') {
        callback();
      }
    };

    // nothing pooled: no need to contact the topology
    if (!this.sessions.length) {
      notify();
      return;
    }

    const pooledIds = this.sessions.map(pooled => pooled.id);
    this.topology.endSessions(pooledIds, () => {
      this.sessions = [];
      notify();
    });
  }

  /**
   * Acquire a Server Session from the pool.
   * Sessions are taken from the front of the pool; stale ones are discarded until a
   * non-stale session is found, which is returned. When the pool runs empty a new
   * ServerSession is created instead.
   * @ignore
   * @returns {ServerSession}
   */
  acquire() {
    const timeoutMinutes = this.topology.logicalSessionTimeoutMinutes;

    while (this.sessions.length > 0) {
      const candidate = this.sessions.shift();
      if (candidate.hasTimedOut(timeoutMinutes)) {
        continue;
      }

      return candidate;
    }

    return new ServerSession();
  }

  /**
   * Release a session to the session pool
   * Stale sessions are first pruned from the back of the pool. The released session is then
   * put at the front of the pool, unless it has timed out or is marked dirty.
   * @ignore
   * @param {ServerSession} session The session to release to the pool
   */
  release(session) {
    const timeoutMinutes = this.topology.logicalSessionTimeoutMinutes;

    // prune from the tail until the first session that is still fresh
    let tail = this.sessions.length - 1;
    while (tail >= 0 && this.sessions[tail].hasTimedOut(timeoutMinutes)) {
      this.sessions.pop();
      tail -= 1;
    }

    if (session.hasTimedOut(timeoutMinutes) || session.isDirty) {
      return;
    }

    this.sessions.unshift(session);
  }
}
// TODO: this should be codified in command construction
// @see https://github.com/mongodb/specifications/blob/master/source/read-write-concern/read-write-concern.rst#read-concern
/**
 * Reports whether a command is one of those that accept a read concern.
 *
 * @ignore
 * @param {Object} command The command document to inspect
 * @param {Object} [options] The options of the calling operation; only `out` is consulted, for `mapReduce`
 * @return {boolean} true for the listed read commands, and for `mapReduce` with inline output
 */
function commandSupportsReadConcern(command, options) {
  const readCommandNames = [
    'aggregate',
    'count',
    'distinct',
    'find',
    'parallelCollectionScan',
    'geoNear',
    'geoSearch'
  ];

  if (readCommandNames.some(name => command[name])) {
    return true;
  }

  // mapReduce only qualifies when its results are returned inline
  if (!command.mapReduce || !options || !options.out) {
    return false;
  }

  const out = options.out;
  return out.inline === 1 || out === 'inline';
}
/**
 * Optionally decorate a command with sessions specific keys
 *
 * Always applies the session's `lsid`. Depending on the session and transaction state it may
 * also apply `txnNumber`, `autocommit`, `startTransaction` and a `readConcern` (including
 * `afterClusterTime` for causally consistent sessions).
 *
 * @ignore
 * @param {ClientSession} session the session tracking transaction state
 * @param {Object} command the command to decorate
 * @param {Object} [options] Optional settings passed to calling operation
 * @return {MongoError|undefined} An error, if some error condition was met
 */
function applySession(session, command, options) {
  if (session.hasEnded) {
    // TODO: merge this with `assertAlive`, did not want to throw a try/catch here
    return new MongoError('Cannot use a session that has ended');
  }

  // `options` is optional, so every later read must be safe without it
  options = options || {};

  // SPEC-1019: silently ignore explicit session with unacknowledged write for backwards compatibility
  if (options.writeConcern && options.writeConcern.w === 0) {
    return;
  }

  const serverSession = session.serverSession;
  serverSession.lastUse = now();
  command.lsid = serverSession.id;

  // first apply non-transaction-specific sessions data
  const inTransaction = session.inTransaction() || isTransactionCommand(command);
  const isRetryableWrite = options.willRetryWrite;
  const shouldApplyReadConcern = commandSupportsReadConcern(command, options);

  if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
    command.txnNumber = BSON.Long.fromNumber(serverSession.txnNumber);
  }

  // now attempt to apply transaction-specific sessions data
  if (!inTransaction) {
    if (session.transaction.state !== TxnState.NO_TRANSACTION) {
      session.transaction.transition(TxnState.NO_TRANSACTION);
    }

    // TODO: the following should only be applied to read operation per spec.
    // for causal consistency
    if (session.supports.causalConsistency && session.operationTime && shouldApplyReadConcern) {
      // build a new object: the command's readConcern may be shared with the caller's options
      command.readConcern = Object.assign({}, command.readConcern, {
        afterClusterTime: session.operationTime
      });
    }

    return;
  }

  if (options.readPreference && !options.readPreference.equals(ReadPreference.primary)) {
    return new MongoError(
      `Read preference in a transaction must be primary, not: ${options.readPreference.mode}`
    );
  }

  // `autocommit` must always be false to differentiate from retryable writes
  command.autocommit = false;

  if (session.transaction.state === TxnState.STARTING_TRANSACTION) {
    session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
    command.startTransaction = true;

    const readConcern =
      session.transaction.options.readConcern || session.clientOptions.readConcern;
    if (readConcern) {
      command.readConcern = readConcern;
    }

    if (session.supports.causalConsistency && session.operationTime) {
      // build a new object: `readConcern` above is the transaction's / client's own options
      // object, and writing `afterClusterTime` into it would leak into later transactions
      command.readConcern = Object.assign({}, command.readConcern, {
        afterClusterTime: session.operationTime
      });
    }
  }
}
/**
 * Updates a session with the gossip fields found in a server response.
 *
 * Forwards `$clusterTime` to `resolveClusterTime`, advances the session's operationTime when
 * the session is causally consistent, and records a `recoveryToken` while a transaction is active.
 *
 * @ignore
 * @param {ClientSession} [session] The session to update; nothing is done when it is missing
 * @param {Object} document The server response to read from
 */
function updateSessionFromResponse(session, document) {
  // the operationTime and recoveryToken branches always tolerated a missing session;
  // apply the same guard to `$clusterTime` instead of handing a missing session to
  // `resolveClusterTime`
  if (!session) {
    return;
  }

  if (document.$clusterTime) {
    resolveClusterTime(session, document.$clusterTime);
  }

  if (document.operationTime && session.supports.causalConsistency) {
    session.advanceOperationTime(document.operationTime);
  }

  if (document.recoveryToken && session.inTransaction()) {
    session.transaction._recoveryToken = document.recoveryToken;
  }
}
// Names exported by this module; `TxnState` is re-exported from `./transactions`.
module.exports = {
ClientSession,
ServerSession,
ServerSessionPool,
TxnState,
applySession,
updateSessionFromResponse,
commandSupportsReadConcern
};