Software: Apache/2.4.41 (Ubuntu). PHP/8.0.30 uname -a: Linux apirnd 5.4.0-204-generic #224-Ubuntu SMP Thu Dec 5 13:38:28 UTC 2024 x86_64 uid=33(www-data) gid=33(www-data) groups=33(www-data) Safe-mode: OFF (not secure) /var/www/html/wincloud_gateway/node_modules/mongodb/lib/core/topologies/ drwxr-xr-x | |
| Viewing file: Select action/file-type: 'use strict';
const ReadPreference = require('./read_preference');
const TopologyType = require('../sdam/common').TopologyType;
const MongoError = require('../error').MongoError;
const isRetryableWriteError = require('../error').isRetryableWriteError;
const maxWireVersion = require('../utils').maxWireVersion;
const MongoNetworkError = require('../error').MongoNetworkError;
// Error code that getMMAPError looks for (together with a 'Transaction numbers'
// errmsg) before replacing the error with the "retryable writes not supported" message.
const MMAPv1_RETRY_WRITES_ERROR_CODE = 20;
/**
 * Emits `event` on `self` with `description` as its payload, but only when at
 * least one listener is currently registered for that event.
 *
 * @method
 * @param {object} self Emitter exposing `listeners(event)` and `emit(event, payload)`
 * @param {string} event Name of the event to emit
 * @param {object} description Payload handed to the listeners
 */
function emitSDAMEvent(self, event, description) {
  const hasListeners = self.listeners(event).length > 0;
  if (!hasListeners) {
    return;
  }

  self.emit(event, description);
}
/**
 * Validates and returns the compressors requested in the connection options.
 *
 * @param {object} options Options that may carry `compression.compressors`
 * @returns {string[]} The supplied `compressors` array itself, or a new empty
 *   array when no compressors are configured
 * @throws {Error} If any supplied compressor is neither 'snappy' nor 'zlib'
 */
function createCompressionInfo(options) {
  const compression = options.compression;
  if (!compression || !compression.compressors) {
    return [];
  }

  const supported = ['snappy', 'zlib'];

  // Reject the whole list as soon as one unsupported compressor is found
  compression.compressors.forEach(name => {
    if (supported.indexOf(name) === -1) {
      throw new Error('compressors must be at least one of snappy or zlib');
    }
  });

  return compression.compressors;
}
/**
 * Deep-copies `object` by serializing it to JSON and parsing the result back.
 * Only data that survives a JSON round trip is present in the copy.
 *
 * @param {*} object JSON-serializable value to copy
 * @returns {*} The parsed copy
 */
function clone(object) {
  const serialized = JSON.stringify(object);
  return JSON.parse(serialized);
}
/**
 * Returns the server description cached on `self.s.serverDescription`. When
 * nothing is cached yet, an 'Unknown' placeholder addressed at `self.name` is
 * stored there first and then returned.
 *
 * @param {object} self Object carrying `name` and the state bag `s`
 * @returns {object} The cached (or freshly seeded) server description
 */
function getPreviousDescription(self) {
  const state = self.s;
  if (state.serverDescription) {
    return state.serverDescription;
  }

  state.serverDescription = {
    address: self.name,
    arbiters: [],
    hosts: [],
    passives: [],
    type: 'Unknown'
  };
  return state.serverDescription;
}
/**
 * Emits 'serverDescriptionChanged' on `self` and caches `description` as the
 * current server description. Both steps are skipped when nobody listens for
 * the event.
 *
 * @param {object} self Emitter carrying `id`, `name` and the state bag `s`
 * @param {object} description The new server description
 */
function emitServerDescriptionChanged(self, description) {
  const listenerCount = self.listeners('serverDescriptionChanged').length;
  if (!(listenerCount > 0)) {
    return;
  }

  // A topologyId of -1 means "not set"; fall back to the emitter's own id
  const topologyId = self.s.topologyId !== -1 ? self.s.topologyId : self.id;
  const change = {
    topologyId: topologyId,
    address: self.name,
    previousDescription: getPreviousDescription(self),
    newDescription: description
  };

  self.emit('serverDescriptionChanged', change);
  self.s.serverDescription = description;
}
/**
 * Returns the topology description cached on `self.s.topologyDescription`.
 * When nothing is cached yet, an 'Unknown' placeholder containing a single
 * 'Unknown' server addressed at `self.name` is stored there first and returned.
 *
 * @param {object} self Object carrying `name` and the state bag `s`
 * @returns {object} The cached (or freshly seeded) topology description
 */
function getPreviousTopologyDescription(self) {
  const state = self.s;
  if (state.topologyDescription) {
    return state.topologyDescription;
  }

  const unknownServer = {
    address: self.name,
    arbiters: [],
    hosts: [],
    passives: [],
    type: 'Unknown'
  };
  state.topologyDescription = {
    topologyType: 'Unknown',
    servers: [unknownServer]
  };
  return state.topologyDescription;
}
/**
 * Emits 'topologyDescriptionChanged' on `self` and caches `description` as the
 * current topology description. Both steps are skipped when nobody listens for
 * the event.
 *
 * The description is cached on `self.s.topologyDescription`, the slot that
 * getPreviousTopologyDescription reads, so the next event reports it as its
 * `previousDescription`. It is deliberately NOT written to
 * `self.s.serverDescription`, which holds the per-server description.
 *
 * @param {object} self Emitter carrying `id`, `name` and the state bag `s`
 * @param {object} description The new topology description
 */
var emitTopologyDescriptionChanged = function(self, description) {
  if (self.listeners('topologyDescriptionChanged').length > 0) {
    // Emit the topology description changed event
    self.emit('topologyDescriptionChanged', {
      // A topologyId of -1 means "not set"; fall back to the emitter's own id
      topologyId: self.s.topologyId !== -1 ? self.s.topologyId : self.id,
      address: self.name,
      previousDescription: getPreviousTopologyDescription(self),
      newDescription: description
    });

    // Remember the description in the topology slot (not the server slot)
    self.s.topologyDescription = description;
  }
};
/**
 * Tells whether the topology type derived from a new ismaster reply differs
 * from the type derived from the current one.
 *
 * @param {object} self Server passed through to getTopologyType
 * @param {object} currentIsmaster The ismaster reply currently held
 * @param {object} ismaster The newly received ismaster reply
 * @returns {boolean} true when the two derived types differ
 */
function changedIsMaster(self, currentIsmaster, ismaster) {
  const previousType = getTopologyType(self, currentIsmaster);
  const nextType = getTopologyType(self, ismaster);
  return nextType !== previousType;
}
/**
 * Maps an ismaster reply to a server type name.
 *
 * @param {object} self Server whose `ismaster` property is used when no reply is passed
 * @param {object} [ismaster] The ismaster reply to classify
 * @returns {string} One of 'Mongos', 'Standalone', 'RSPrimary', 'RSSecondary',
 *   'RSArbiter' or 'Unknown'
 */
function getTopologyType(self, ismaster) {
  const reply = ismaster || self.ismaster;
  if (!reply) {
    return 'Unknown';
  }

  if (reply.ismaster) {
    if (reply.msg === 'isdbgrid') {
      return 'Mongos';
    }
    // A master that reports no `hosts` list is treated as a standalone server
    return reply.hosts ? 'RSPrimary' : 'Standalone';
  }

  if (reply.secondary) {
    return 'RSSecondary';
  }
  return reply.arbiterOnly ? 'RSArbiter' : 'Unknown';
}
/**
 * Builds a heartbeat function for `self`. Each invocation runs an `ismaster`
 * command against 'admin.$cmd' and reports the outcome via SDAM events.
 *
 * The returned function does nothing when `self.s.state` is 'destroyed'.
 * Otherwise:
 *  - on success it emits the legacy 'ismaster' event and 'serverHeartbeatSucceeded',
 *    emits a server description change when the derived type changed, and stores
 *    the reply and its latency on `self.s`;
 *  - on failure it emits 'serverHeartbeatFailed' with the elapsed time as `durationMS`;
 *  - finally it either hands `(err, r)` to `callback`, or, when no callback was
 *    given, schedules the next heartbeat after `self.s.haInterval` milliseconds.
 *
 * @param {object} self The server to monitor
 * @returns {function} `function(callback)` performing a single heartbeat
 */
var inquireServerState = function(self) {
  return function(callback) {
    if (self.s.state === 'destroyed') return;

    // Record the start so both outcomes can report how long the heartbeat took
    var start = new Date().getTime();
    var elapsedMS = function() {
      return new Date().getTime() - start;
    };

    emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: self.name });

    // Attempt to execute ismaster command
    self.command('admin.$cmd', { ismaster: true }, { monitoring: true }, function(err, r) {
      if (!err) {
        // Legacy event sender
        self.emit('ismaster', r, self);

        // Calculate latencyMS
        var latencyMS = elapsedMS();

        // Server heart beat event
        emitSDAMEvent(self, 'serverHeartbeatSucceeded', {
          durationMS: latencyMS,
          reply: r.result,
          connectionId: self.name
        });

        // Did the server change
        if (changedIsMaster(self, self.s.ismaster, r.result)) {
          // Emit server description changed if something listening
          emitServerDescriptionChanged(self, {
            address: self.name,
            arbiters: [],
            hosts: [],
            passives: [],
            type: !self.s.inTopology ? 'Standalone' : getTopologyType(self)
          });
        }

        // Update ismaster view
        self.s.ismaster = r.result;

        // Set server response time
        self.s.isMasterLatencyMS = latencyMS;
      } else {
        // The duration must be measured here too: no latency is computed on
        // the failure path otherwise, and the event would carry `undefined`.
        emitSDAMEvent(self, 'serverHeartbeatFailed', {
          durationMS: elapsedMS(),
          failure: err,
          connectionId: self.name
        });
      }

      // Performing an ismaster monitoring callback operation
      if (typeof callback === 'function') {
        return callback(err, r);
      }

      // Perform another sweep
      self.s.inquireServerStateTimeout = setTimeout(inquireServerState(self), self.s.haInterval);
    });
  };
};
/**
 * Makes a shallow copy of `options`: every property enumerated by `for...in`
 * is assigned by reference onto a new plain object.
 *
 * @param {object} options The options to copy
 * @returns {object} A new object holding the same property values
 */
function cloneOptions(options) {
  const copy = {};

  for (const key in options) {
    copy[key] = options[key];
  }

  return copy;
}
/**
 * Restartable wrapper around `setInterval`.
 *
 * @param {function} fn Callback handed to `setInterval`
 * @param {number} time Delay handed to `setInterval`
 */
function Interval(fn, time) {
  // `false` is the sentinel for "no interval scheduled"
  let handle = false;

  /** Schedules the interval unless it is already running; returns `this`. */
  this.start = function() {
    if (this.isRunning()) {
      return this;
    }
    handle = setInterval(fn, time);
    return this;
  };

  /** Cancels the interval and resets the sentinel; returns `this`. */
  this.stop = function() {
    clearInterval(handle);
    handle = false;
    return this;
  };

  /** @returns {boolean} true while an interval is scheduled */
  this.isRunning = function() {
    return handle !== false;
  };
}
/**
 * Restartable wrapper around `setTimeout`.
 *
 * @param {function} fn Callback run when the timeout fires
 * @param {number} time Delay handed to `setTimeout`
 */
function Timeout(fn, time) {
  // `false` is the sentinel for "no timeout scheduled"
  let handle = false;

  // Runs when the timer fires: the timeout is marked as stopped before `fn` is
  // invoked, and nothing happens at all if it was stopped in the meantime.
  const fire = () => {
    if (!handle) {
      return;
    }
    clearTimeout(handle);
    handle = false;
    fn();
  };

  /** Schedules the timeout unless it is already running; returns `this`. */
  this.start = function() {
    if (this.isRunning()) {
      return this;
    }
    handle = setTimeout(fire, time);
    return this;
  };

  /** Cancels the timeout and resets the sentinel; returns `this`. */
  this.stop = function() {
    clearTimeout(handle);
    handle = false;
    return this;
  };

  /** @returns {boolean} true while a timeout is scheduled */
  this.isRunning = function() {
    return handle !== false;
  };
}
/**
 * Computes the server-level differences between two topology descriptions.
 * Addresses are compared case-insensitively. The result lists, in this order:
 *  1. servers present in `previous` but not in `current` (`to: 'Unknown'`),
 *  2. servers present in `current` but not in `previous` (`from: 'Unknown'`),
 *  3. servers present in both whose `type` differs.
 *
 * @param {object} [previous] Earlier description with a `servers` array; a
 *   falsy value is treated as having no servers
 * @param {object} current Later description with a `servers` array
 * @returns {{servers: Array<{address: string, from: string, to: string}>}}
 */
function diff(previous, current) {
  const changes = { servers: [] };
  const before = previous || { servers: [] };

  // Case-insensitive address comparison between two server entries
  const sameAddress = (left, right) =>
    left.address.toLowerCase() === right.address.toLowerCase();

  // 1. Servers that disappeared from the current description
  for (const prevServer of before.servers) {
    const stillPresent = current.servers.findIndex(s => sameAddress(s, prevServer)) !== -1;
    if (!stillPresent) {
      changes.servers.push({
        address: prevServer.address,
        from: prevServer.type,
        to: 'Unknown'
      });
    }
  }

  // 2. Servers that did not exist in the previous description
  for (const currServer of current.servers) {
    const knownBefore = before.servers.findIndex(s => sameAddress(s, currServer)) !== -1;
    if (!knownBefore) {
      changes.servers.push({
        address: currServer.address,
        from: 'Unknown',
        to: currServer.type
      });
    }
  }

  // 3. Servers present in both descriptions whose type changed
  for (const prevServer of before.servers) {
    for (const currServer of current.servers) {
      if (sameAddress(prevServer, currServer) && prevServer.type !== currServer.type) {
        changes.servers.push({
          address: prevServer.address,
          from: prevServer.type,
          to: currServer.type
        });
      }
    }
  }

  return changes;
}
/**
 * Shared function to determine clusterTime for a given topology.
 *
 * Stores `$clusterTime` on `topology.clusterTime` when the topology has none
 * yet, or when the incoming value's `clusterTime` is greater than the stored one.
 *
 * @param {*} topology Object whose `clusterTime` property is updated in place
 * @param {*} $clusterTime Candidate value; its `clusterTime` must expose `greaterThan`
 */
function resolveClusterTime(topology, $clusterTime) {
  const known = topology.clusterTime;
  const shouldAdopt =
    known == null || $clusterTime.clusterTime.greaterThan(known.clusterTime);

  if (shouldAdopt) {
    topology.clusterTime = $clusterTime;
  }
}
// NOTE: this is a temporary move until the topologies can be more formally refactored
//       to share code.
const SessionMixins = {
  /**
   * Sends an `endSessions` command for the given session(s) through
   * `this.command` on 'admin.$cmd'. The command's outcome is ignored.
   *
   * @param {*|Array} sessions A single session or an array of sessions
   * @param {function} [callback] Invoked without arguments once the command returns
   */
  endSessions(sessions, callback) {
    const sessionList = Array.isArray(sessions) ? sessions : [sessions];

    const done = () => {
      // intentionally ignored, per spec
      if (typeof callback === 'function') callback();
    };

    // TODO:
    //   When connected to a sharded cluster the endSessions command
    //   can be sent to any mongos. When connected to a replica set the
    //   endSessions command MUST be sent to the primary if the primary
    //   is available, otherwise it MUST be sent to any available secondary.
    //   Is it enough to use: ReadPreference.primaryPreferred ?
    this.command(
      'admin.$cmd',
      { endSessions: sessionList },
      { readPreference: ReadPreference.primaryPreferred },
      done
    );
  }
};
/**
 * Resolves the TopologyType of a topology.
 *
 * A topology that carries a `description` reports `description.type`.
 * Otherwise the `type` string decides: 'mongos' maps to Sharded, 'replset' to
 * ReplicaSetWithPrimary, and anything else to Single.
 *
 * @param {object} topology The topology to inspect
 * @returns {*} `description.type`, or one of the TopologyType values
 */
function topologyType(topology) {
  if (topology.description) {
    return topology.description.type;
  }

  switch (topology.type) {
    case 'mongos':
      return TopologyType.Sharded;
    case 'replset':
      return TopologyType.ReplicaSetWithPrimary;
    default:
      return TopologyType.Single;
  }
}
const RETRYABLE_WIRE_VERSION = 6;

/**
 * Determines whether the provided topology supports retryable writes.
 *
 * All of the following must hold: the last ismaster's `maxWireVersion` is not
 * below RETRYABLE_WIRE_VERSION, the topology has a truthy
 * `logicalSessionTimeoutMinutes`, and its topology type is not Single.
 *
 * @param {Mongos|Replset} topology
 * @returns {boolean} true when retryable writes are supported
 */
const isRetryableWritesSupported = function(topology) {
  const reportedWireVersion = topology.lastIsMaster().maxWireVersion;
  const wireVersionTooOld = reportedWireVersion < RETRYABLE_WIRE_VERSION;

  return (
    !wireVersionTooOld &&
    Boolean(topology.logicalSessionTimeoutMinutes) &&
    topologyType(topology) !== TopologyType.Single
  );
};
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
  'This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.';

/**
 * Replaces the "Transaction numbers" error with one that tells the user to
 * disable retryable writes; any other error is returned untouched.
 *
 * An error is only replaced when its `code` equals MMAPv1_RETRY_WRITES_ERROR_CODE
 * and its `errmsg` is a string containing 'Transaction numbers'. An error with
 * that code but no string `errmsg` is returned as-is instead of raising a
 * TypeError that would hide the original failure.
 *
 * @param {object} err The error to inspect
 * @returns {object} `err` itself, or a new MongoError carrying `err` as `originalError`
 */
function getMMAPError(err) {
  const isTransactionNumberError =
    err.code === MMAPv1_RETRY_WRITES_ERROR_CODE &&
    typeof err.errmsg === 'string' &&
    err.errmsg.includes('Transaction numbers');

  if (!isTransactionNumberError) {
    return err;
  }

  // According to the retryable writes spec, we must replace the error message in this case.
  // We need to replace err.message so the thrown message is correct and we need to replace err.errmsg to meet the spec requirement.
  return new MongoError({
    message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
    errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
    originalError: err
  });
}
/**
 * Decides whether `err` counts as a retryable write error, adding the
 * 'RetryableWriteError' label to it when the driver has to do so itself.
 *
 * NOTE: only used for legacy topology types
 *
 * @param {*} err The error to classify; anything that is not a MongoError yields false
 * @param {object} topology The topology the failed operation ran against
 * @returns {boolean} true when `err` carries the 'RetryableWriteError' label
 */
function legacyIsRetryableWriteError(err, topology) {
  const isMongoError = err instanceof MongoError;
  if (!isMongoError) {
    return false;
  }

  // if pre-4.4 server, then add error label if its a retryable write error
  if (isRetryableWritesSupported(topology)) {
    const needsLabel =
      err instanceof MongoNetworkError ||
      (maxWireVersion(topology) < 9 && isRetryableWriteError(err));

    if (needsLabel) {
      err.addErrorLabel('RetryableWriteError');
    }
  }

  return err.hasErrorLabel('RetryableWriteError');
}
// Public surface of this module: the session mixin, cluster-time helper,
// monitoring/SDAM helpers, option and diff utilities, timer wrappers and
// retryable-write helpers defined above.
module.exports = {
SessionMixins,
resolveClusterTime,
inquireServerState,
getTopologyType,
emitServerDescriptionChanged,
emitTopologyDescriptionChanged,
cloneOptions,
createCompressionInfo,
clone,
diff,
Interval,
Timeout,
isRetryableWritesSupported,
getMMAPError,
topologyType,
legacyIsRetryableWriteError
};
|
:: Command execute :: | |
--[ c99shell v. 2.5 [PHP 8 Update] [24.05.2025] | Generation time: 0.0095 ]-- |