Source: lib/replset.js

  1. "use strict";
  2. var EventEmitter = require('events').EventEmitter
  3. , inherits = require('util').inherits
  4. , f = require('util').format
  5. , Server = require('./server')
  6. , Cursor = require('./cursor')
  7. , AggregationCursor = require('./aggregation_cursor')
  8. , CommandCursor = require('./command_cursor')
  9. , ReadPreference = require('./read_preference')
  10. , MongoError = require('mongodb-core').MongoError
  11. , ServerCapabilities = require('./topology_base').ServerCapabilities
  12. , Store = require('./topology_base').Store
  13. , Define = require('./metadata')
  14. , CReplSet = require('mongodb-core').ReplSet
  15. , CoreReadPreference = require('mongodb-core').ReadPreference
  16. , MAX_JS_INT = require('./utils').MAX_JS_INT
  17. , translateOptions = require('./utils').translateOptions
  18. , filterOptions = require('./utils').filterOptions
  19. , getReadPreference = require('./utils').getReadPreference
  20. , mergeOptions = require('./utils').mergeOptions
  21. , os = require('os');
  22. /**
  23. * @fileOverview The **ReplSet** class is a class that represents a Replicaset topology and is
  24. * used to construct connections.
  25. *
  26. * **ReplSet Should not be used, use MongoClient.connect**
  27. * @example
  28. * var Db = require('mongodb').Db,
  29. * ReplSet = require('mongodb').ReplSet,
  30. * Server = require('mongodb').Server,
  31. * test = require('assert');
  32. * // Connect using ReplSet
  33. * var server = new Server('localhost', 27017);
  34. * var db = new Db('test', new ReplSet([server]));
  35. * db.open(function(err, db) {
  36. * // Get an additional db
  37. * db.close();
  38. * });
  39. */
  40. // Allowed parameters
  41. var legalOptionNames = ['ha', 'haInterval', 'replicaSet', 'rs_name', 'secondaryAcceptableLatencyMS'
  42. , 'connectWithNoPrimary', 'poolSize', 'ssl', 'checkServerIdentity', 'sslValidate'
  43. , 'sslCA', 'sslCert', 'sslCRL', 'sslKey', 'sslPass', 'socketOptions', 'bufferMaxEntries'
  44. , 'store', 'auto_reconnect', 'autoReconnect', 'emitError'
  45. , 'keepAlive', 'noDelay', 'connectTimeoutMS', 'socketTimeoutMS', 'strategy', 'debug', 'family'
  46. , 'loggerLevel', 'logger', 'reconnectTries', 'appname', 'domainsEnabled'
  47. , 'servername', 'promoteLongs', 'promoteValues', 'promoteBuffers', 'maxStalenessSeconds'];
  48. // Get package.json variable
  49. var driverVersion = require('../package.json').version;
  50. var nodejsversion = f('Node.js %s, %s', process.version, os.endianness());
  51. var type = os.type();
  52. var name = process.platform;
  53. var architecture = process.arch;
  54. var release = os.release();
  55. /**
  56. * Creates a new ReplSet instance
  57. * @class
  58. * @deprecated
  59. * @param {Server[]} servers A seedlist of servers participating in the replicaset.
  60. * @param {object} [options=null] Optional settings.
  61. * @param {boolean} [options.ha=true] Turn on high availability monitoring.
  62. * @param {number} [options.haInterval=10000] Time between each replicaset status check.
  63. * @param {string} [options.replicaSet] The name of the replicaset to connect to.
  64. * @param {number} [options.secondaryAcceptableLatencyMS=15] Sets the range of servers to pick when using NEAREST (lowest ping ms + the latency fence, ex: range of 1 to (1 + 15) ms)
  65. * @param {boolean} [options.connectWithNoPrimary=false] Sets if the driver should connect even if no primary is available
  66. * @param {number} [options.poolSize=5] Number of connections in the connection pool for each server instance, set to 5 as default for legacy reasons.
  67. * @param {boolean} [options.ssl=false] Use ssl connection (needs to have a mongod server with ssl support)
  68. * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
  69. * @param {object} [options.sslValidate=true] Validate mongod server certificate against ca (needs to have a mongod server with ssl support, 2.4 or higher)
  70. * @param {array} [options.sslCA=null] Array of valid certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher)
  71. * @param {array} [options.sslCRL=null] Array of revocation certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher)
  72. * @param {(Buffer|string)} [options.sslCert=null] String or buffer containing the certificate we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
  73. * @param {(Buffer|string)} [options.sslKey=null] String or buffer containing the certificate private key we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
  74. * @param {(Buffer|string)} [options.sslPass=null] String or buffer containing the certificate password (needs to have a mongod server with ssl support, 2.4 or higher)
  75. * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
  76. * @param {object} [options.socketOptions=null] Socket options
  77. * @param {boolean} [options.socketOptions.noDelay=true] TCP Socket NoDelay option.
  78. * @param {number} [options.socketOptions.keepAlive=0] TCP KeepAlive on the socket with a X ms delay before start.
  79. * @param {number} [options.socketOptions.connectTimeoutMS=10000] TCP Connection timeout setting
  80. * @param {number} [options.socketOptions.socketTimeoutMS=0] TCP Socket timeout setting
  81. * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
  82. * @param {number} [options.maxStalenessSeconds=undefined] The max staleness to secondary reads (values under 10 seconds cannot be guaranteed);
  83. * @fires ReplSet#connect
  84. * @fires ReplSet#ha
  85. * @fires ReplSet#joined
  86. * @fires ReplSet#left
  87. * @fires ReplSet#fullsetup
  88. * @fires ReplSet#open
  89. * @fires ReplSet#close
  90. * @fires ReplSet#error
  91. * @fires ReplSet#timeout
  92. * @fires ReplSet#parseError
  93. * @property {string} parserType the parser type used (c++ or js).
  94. * @return {ReplSet} a ReplSet instance.
  95. */
  96. var ReplSet = function(servers, options) {
  97. if(!(this instanceof ReplSet)) return new ReplSet(servers, options);
  98. options = options || {};
  99. var self = this;
  100. // Set up event emitter
  101. EventEmitter.call(this);
  102. // Filter the options
  103. options = filterOptions(options, legalOptionNames);
  104. // Ensure all the instances are Server
  105. for(var i = 0; i < servers.length; i++) {
  106. if(!(servers[i] instanceof Server)) {
  107. throw MongoError.create({message: "all seed list instances must be of the Server type", driver:true});
  108. }
  109. }
  110. // Stored options
  111. var storeOptions = {
  112. force: false
  113. , bufferMaxEntries: typeof options.bufferMaxEntries == 'number' ? options.bufferMaxEntries : MAX_JS_INT
  114. }
  115. // Shared global store
  116. var store = options.store || new Store(self, storeOptions);
  117. // Build seed list
  118. var seedlist = servers.map(function(x) {
  119. return {host: x.host, port: x.port}
  120. });
  121. // Clone options
  122. var clonedOptions = mergeOptions({}, {
  123. disconnectHandler: store,
  124. cursorFactory: Cursor,
  125. reconnect: false,
  126. emitError: typeof options.emitError == 'boolean' ? options.emitError : true,
  127. size: typeof options.poolSize == 'number' ? options.poolSize : 5
  128. });
  129. // Translate any SSL options and other connectivity options
  130. clonedOptions = translateOptions(clonedOptions, options);
  131. // Socket options
  132. var socketOptions = options.socketOptions && Object.keys(options.socketOptions).length > 0
  133. ? options.socketOptions : options;
  134. // Translate all the options to the mongodb-core ones
  135. clonedOptions = translateOptions(clonedOptions, socketOptions);
  136. if(typeof clonedOptions.keepAlive == 'number') {
  137. clonedOptions.keepAliveInitialDelay = clonedOptions.keepAlive;
  138. clonedOptions.keepAlive = clonedOptions.keepAlive > 0;
  139. }
  140. // Client info
  141. this.clientInfo = {
  142. driver: {
  143. name: "nodejs",
  144. version: driverVersion
  145. },
  146. os: {
  147. type: type,
  148. name: name,
  149. architecture: architecture,
  150. version: release
  151. },
  152. platform: nodejsversion
  153. }
  154. // Build default client information
  155. clonedOptions.clientInfo = this.clientInfo;
  156. // Do we have an application specific string
  157. if(options.appname) {
  158. clonedOptions.clientInfo.application = { name: options.appname };
  159. }
  160. // Create the ReplSet
  161. var replset = new CReplSet(seedlist, clonedOptions);
  162. // Listen to reconnect event
  163. replset.on('reconnect', function() {
  164. self.emit('reconnect');
  165. store.execute();
  166. });
  167. // Internal state
  168. this.s = {
  169. // Replicaset
  170. replset: replset
  171. // Server capabilities
  172. , sCapabilities: null
  173. // Debug tag
  174. , tag: options.tag
  175. // Store options
  176. , storeOptions: storeOptions
  177. // Cloned options
  178. , clonedOptions: clonedOptions
  179. // Store
  180. , store: store
  181. // Options
  182. , options: options
  183. }
  184. // Debug
  185. if(clonedOptions.debug) {
  186. // Last ismaster
  187. Object.defineProperty(this, 'replset', {
  188. enumerable:true, get: function() { return replset; }
  189. });
  190. }
  191. }
  192. /**
  193. * @ignore
  194. */
  195. inherits(ReplSet, EventEmitter);
  196. // Last ismaster
  197. Object.defineProperty(ReplSet.prototype, 'isMasterDoc', {
  198. enumerable:true, get: function() { return this.s.replset.lastIsMaster(); }
  199. });
  200. Object.defineProperty(ReplSet.prototype, 'parserType', {
  201. enumerable:true, get: function() {
  202. return this.s.replset.parserType;
  203. }
  204. });
  205. // BSON property
  206. Object.defineProperty(ReplSet.prototype, 'bson', {
  207. enumerable: true, get: function() {
  208. return this.s.replset.s.bson;
  209. }
  210. });
  211. Object.defineProperty(ReplSet.prototype, 'haInterval', {
  212. enumerable:true, get: function() { return this.s.replset.s.haInterval; }
  213. });
  214. var define = ReplSet.define = new Define('ReplSet', ReplSet, false);
  215. // Ensure the right read Preference object
  216. var translateReadPreference = function(options) {
  217. if(typeof options.readPreference == 'string') {
  218. options.readPreference = new CoreReadPreference(options.readPreference);
  219. } else if(options.readPreference instanceof ReadPreference) {
  220. options.readPreference = new CoreReadPreference(options.readPreference.mode
  221. , options.readPreference.tags, {maxStalenessSeconds: options.readPreference.maxStalenessSeconds});
  222. }
  223. return options;
  224. }
  225. // Connect method
  226. ReplSet.prototype.connect = function(db, _options, callback) {
  227. var self = this;
  228. if('function' === typeof _options) callback = _options, _options = {};
  229. if(_options == null) _options = {};
  230. if(!('function' === typeof callback)) callback = null;
  231. self.s.options = _options;
  232. // Update bufferMaxEntries
  233. self.s.storeOptions.bufferMaxEntries = db.bufferMaxEntries;
  234. // Actual handler
  235. var errorHandler = function(event) {
  236. return function(err) {
  237. if(event != 'error') {
  238. self.emit(event, err);
  239. }
  240. }
  241. }
  242. // Clear out all the current handlers left over
  243. var events = ["timeout", "error", "close", 'serverOpening', 'serverDescriptionChanged', 'serverHeartbeatStarted',
  244. 'serverHeartbeatSucceeded', 'serverHeartbeatFailed', 'serverClosed', 'topologyOpening',
  245. 'topologyClosed', 'topologyDescriptionChanged', 'joined', 'left', 'ping', 'ha'];
  246. events.forEach(function(e) {
  247. self.s.replset.removeAllListeners(e);
  248. });
  249. // relay the event
  250. var relay = function(event) {
  251. return function(t, server) {
  252. self.emit(event, t, server);
  253. }
  254. }
  255. // Replset events relay
  256. var replsetRelay = function(event) {
  257. return function(t, server) {
  258. self.emit(event, t, server.lastIsMaster(), server);
  259. }
  260. }
  261. // Relay ha
  262. var relayHa = function(t, state) {
  263. self.emit('ha', t, state);
  264. if(t == 'start') {
  265. self.emit('ha_connect', t, state);
  266. } else if(t == 'end') {
  267. self.emit('ha_ismaster', t, state);
  268. }
  269. }
  270. // Set up serverConfig listeners
  271. self.s.replset.on('joined', replsetRelay('joined'));
  272. self.s.replset.on('left', relay('left'));
  273. self.s.replset.on('ping', relay('ping'));
  274. self.s.replset.on('ha', relayHa);
  275. // Set up SDAM listeners
  276. self.s.replset.on('serverDescriptionChanged', relay('serverDescriptionChanged'));
  277. self.s.replset.on('serverHeartbeatStarted', relay('serverHeartbeatStarted'));
  278. self.s.replset.on('serverHeartbeatSucceeded', relay('serverHeartbeatSucceeded'));
  279. self.s.replset.on('serverHeartbeatFailed', relay('serverHeartbeatFailed'));
  280. self.s.replset.on('serverOpening', relay('serverOpening'));
  281. self.s.replset.on('serverClosed', relay('serverClosed'));
  282. self.s.replset.on('topologyOpening', relay('topologyOpening'));
  283. self.s.replset.on('topologyClosed', relay('topologyClosed'));
  284. self.s.replset.on('topologyDescriptionChanged', relay('topologyDescriptionChanged'));
  285. self.s.replset.on('fullsetup', function() {
  286. self.emit('fullsetup', self, self);
  287. });
  288. self.s.replset.on('all', function() {
  289. self.emit('all', null, self);
  290. });
  291. // Connect handler
  292. var connectHandler = function() {
  293. // Set up listeners
  294. self.s.replset.once('timeout', errorHandler('timeout'));
  295. self.s.replset.once('error', errorHandler('error'));
  296. self.s.replset.once('close', errorHandler('close'));
  297. // Emit open event
  298. self.emit('open', null, self);
  299. // Return correctly
  300. try {
  301. callback(null, self);
  302. } catch(err) {
  303. process.nextTick(function() { throw err; })
  304. }
  305. }
  306. // Error handler
  307. var connectErrorHandler = function() {
  308. return function(err) {
  309. ['timeout', 'error', 'close'].forEach(function(e) {
  310. self.s.replset.removeListener(e, connectErrorHandler);
  311. });
  312. self.s.replset.removeListener('connect', connectErrorHandler);
  313. // Destroy the replset
  314. self.s.replset.destroy();
  315. // Try to callback
  316. try {
  317. callback(err);
  318. } catch(err) {
  319. if(!self.s.replset.isConnected())
  320. process.nextTick(function() { throw err; })
  321. }
  322. }
  323. }
  324. // Set up listeners
  325. self.s.replset.once('timeout', connectErrorHandler('timeout'));
  326. self.s.replset.once('error', connectErrorHandler('error'));
  327. self.s.replset.once('close', connectErrorHandler('close'));
  328. self.s.replset.once('connect', connectHandler);
  329. // Start connection
  330. self.s.replset.connect(_options);
  331. }
  332. // Server capabilities
  333. ReplSet.prototype.capabilities = function() {
  334. if(this.s.sCapabilities) return this.s.sCapabilities;
  335. if(this.s.replset.lastIsMaster() == null) return null;
  336. this.s.sCapabilities = new ServerCapabilities(this.s.replset.lastIsMaster());
  337. return this.s.sCapabilities;
  338. }
  339. define.classMethod('capabilities', {callback: false, promise:false, returns: [ServerCapabilities]});
  340. // Command
  341. ReplSet.prototype.command = function(ns, cmd, options, callback) {
  342. this.s.replset.command(ns, cmd, getReadPreference(options), callback);
  343. }
  344. define.classMethod('command', {callback: true, promise:false});
  345. // Insert
  346. ReplSet.prototype.insert = function(ns, ops, options, callback) {
  347. this.s.replset.insert(ns, ops, options, callback);
  348. }
  349. define.classMethod('insert', {callback: true, promise:false});
  350. // Update
  351. ReplSet.prototype.update = function(ns, ops, options, callback) {
  352. this.s.replset.update(ns, ops, options, callback);
  353. }
  354. define.classMethod('update', {callback: true, promise:false});
  355. // Remove
  356. ReplSet.prototype.remove = function(ns, ops, options, callback) {
  357. this.s.replset.remove(ns, ops, options, callback);
  358. }
  359. define.classMethod('remove', {callback: true, promise:false});
  360. // Destroyed
  361. ReplSet.prototype.isDestroyed = function() {
  362. return this.s.replset.isDestroyed();
  363. }
  364. // IsConnected
  365. ReplSet.prototype.isConnected = function(options) {
  366. options = options || {};
  367. // If we passed in a readPreference, translate to
  368. // a CoreReadPreference instance
  369. if(options.readPreference) {
  370. options.readPreference = translateReadPreference(options.readPreference);
  371. }
  372. return this.s.replset.isConnected(options);
  373. }
// Register isConnected in the class metadata: no callback, no promise, returns a Boolean
define.classMethod('isConnected', {callback: false, promise:false, returns: [Boolean]});
  375. // Insert
  376. ReplSet.prototype.cursor = function(ns, cmd, options) {
  377. options = translateReadPreference(options);
  378. options.disconnectHandler = this.s.store;
  379. return this.s.replset.cursor(ns, cmd, options);
  380. }
  381. define.classMethod('cursor', {callback: false, promise:false, returns: [Cursor, AggregationCursor, CommandCursor]});
  382. ReplSet.prototype.lastIsMaster = function() {
  383. return this.s.replset.lastIsMaster();
  384. }
  385. /**
  386. * Unref all sockets
  387. * @method
  388. */
  389. ReplSet.prototype.unref = function() {
  390. return this.s.replset.unref();
  391. }
  392. ReplSet.prototype.close = function(forceClosed) {
  393. var self = this;
  394. // Call destroy on the topology
  395. this.s.replset.destroy({
  396. force: typeof forceClosed == 'boolean' ? forceClosed : false,
  397. });
  398. // We need to wash out all stored processes
  399. if(forceClosed == true) {
  400. this.s.storeOptions.force = forceClosed;
  401. this.s.store.flush();
  402. }
  403. var events = ['timeout', 'error', 'close', 'joined', 'left'];
  404. events.forEach(function(e) {
  405. self.removeAllListeners(e);
  406. });
  407. }
  408. define.classMethod('close', {callback: false, promise:false});
  409. ReplSet.prototype.auth = function() {
  410. var args = Array.prototype.slice.call(arguments, 0);
  411. this.s.replset.auth.apply(this.s.replset, args);
  412. }
  413. define.classMethod('auth', {callback: true, promise:false});
  414. ReplSet.prototype.logout = function() {
  415. var args = Array.prototype.slice.call(arguments, 0);
  416. this.s.replset.logout.apply(this.s.replset, args);
  417. }
  418. define.classMethod('logout', {callback: true, promise:false});
  419. /**
  420. * All raw connections
  421. * @method
  422. * @return {array}
  423. */
  424. ReplSet.prototype.connections = function() {
  425. return this.s.replset.connections();
  426. }
  427. define.classMethod('connections', {callback: false, promise:false, returns:[Array]});
  428. /**
  429. * A replset connect event, used to verify that the connection is up and running
  430. *
  431. * @event ReplSet#connect
  432. * @type {ReplSet}
  433. */
  434. /**
  435. * The replset high availability event
  436. *
  437. * @event ReplSet#ha
  438. * @type {function}
  439. * @param {string} type The stage in the high availability event (start|end)
  440. * @param {boolean} data.norepeat This is a repeating high availability process or a single execution only
  441. * @param {number} data.id The id for this high availability request
  442. * @param {object} data.state An object containing the information about the current replicaset
  443. */
  444. /**
  445. * A server member left the replicaset
  446. *
  447. * @event ReplSet#left
  448. * @type {function}
  449. * @param {string} type The type of member that left (primary|secondary|arbiter)
  450. * @param {Server} server The server object that left
  451. */
  452. /**
  453. * A server member joined the replicaset
  454. *
  455. * @event ReplSet#joined
  456. * @type {function}
  457. * @param {string} type The type of member that joined (primary|secondary|arbiter)
  458. * @param {Server} server The server object that joined
  459. */
/**
 * ReplSet open event, emitted when replicaset can start processing commands.
 *
 * @event ReplSet#open
 * @type {ReplSet}
 */
/**
 * ReplSet fullsetup event, emitted when all servers in the topology have been connected to.
 *
 * @event ReplSet#fullsetup
 * @type {ReplSet}
 */
  472. /**
  473. * ReplSet close event
  474. *
  475. * @event ReplSet#close
  476. * @type {object}
  477. */
  478. /**
  479. * ReplSet error event, emitted if there is an error listener.
  480. *
  481. * @event ReplSet#error
  482. * @type {MongoError}
  483. */
  484. /**
  485. * ReplSet timeout event
  486. *
  487. * @event ReplSet#timeout
  488. * @type {object}
  489. */
  490. /**
  491. * ReplSet parseError event
  492. *
  493. * @event ReplSet#parseError
  494. * @type {object}
  495. */
  496. module.exports = ReplSet;