Software: Apache/2.4.41 (Ubuntu). PHP/8.0.30 uname -a: Linux apirnd 5.4.0-204-generic #224-Ubuntu SMP Thu Dec 5 13:38:28 UTC 2024 x86_64 uid=33(www-data) gid=33(www-data) groups=33(www-data) Safe-mode: OFF (not secure) /var/www/html/wincloud_gateway/node_modules/mongodb/lib/core/sdam/ drwxr-xr-x | |
| Viewing file: Select action/file-type: 'use strict';
const ServerType = require('./common').ServerType;
const TopologyType = require('./common').TopologyType;
const ReadPreference = require('../topologies/read_preference');
const MongoError = require('../error').MongoError;
// max staleness constants
const IDLE_WRITE_PERIOD = 10000;
const SMALLEST_MAX_STALENESS_SECONDS = 90;
/**
* Returns a server selector that selects for writable servers
*/
function writableServerSelector() {
return function(topologyDescription, servers) {
return latencyWindowReducer(
topologyDescription,
servers.filter(s => s.isWritable)
);
};
}
/**
* Reduces the passed in array of servers by the rules of the "Max Staleness" specification
* found here: https://github.com/mongodb/specifications/blob/master/source/max-staleness/max-staleness.rst
*
* @param {ReadPreference} readPreference The read preference providing max staleness guidance
* @param {topologyDescription} topologyDescription The topology description
* @param {ServerDescription[]} servers The list of server descriptions to be reduced
* @return {ServerDescription[]} The list of servers that satisfy the requirements of max staleness
*/
function maxStalenessReducer(readPreference, topologyDescription, servers) {
if (readPreference.maxStalenessSeconds == null || readPreference.maxStalenessSeconds < 0) {
return servers;
}
const maxStaleness = readPreference.maxStalenessSeconds;
const maxStalenessVariance =
(topologyDescription.heartbeatFrequencyMS + IDLE_WRITE_PERIOD) / 1000;
if (maxStaleness < maxStalenessVariance) {
throw new MongoError(`maxStalenessSeconds must be at least ${maxStalenessVariance} seconds`);
}
if (maxStaleness < SMALLEST_MAX_STALENESS_SECONDS) {
throw new MongoError(
`maxStalenessSeconds must be at least ${SMALLEST_MAX_STALENESS_SECONDS} seconds`
);
}
if (topologyDescription.type === TopologyType.ReplicaSetWithPrimary) {
const primary = Array.from(topologyDescription.servers.values()).filter(primaryFilter)[0];
return servers.reduce((result, server) => {
const stalenessMS =
server.lastUpdateTime -
server.lastWriteDate -
(primary.lastUpdateTime - primary.lastWriteDate) +
topologyDescription.heartbeatFrequencyMS;
const staleness = stalenessMS / 1000;
if (staleness <= readPreference.maxStalenessSeconds) result.push(server);
return result;
}, []);
}
if (topologyDescription.type === TopologyType.ReplicaSetNoPrimary) {
if (servers.length === 0) {
return servers;
}
const sMax = servers.reduce((max, s) => (s.lastWriteDate > max.lastWriteDate ? s : max));
return servers.reduce((result, server) => {
const stalenessMS =
sMax.lastWriteDate - server.lastWriteDate + topologyDescription.heartbeatFrequencyMS;
const staleness = stalenessMS / 1000;
if (staleness <= readPreference.maxStalenessSeconds) result.push(server);
return result;
}, []);
}
return servers;
}
/**
* Determines whether a server's tags match a given set of tags
*
* @param {String[]} tagSet The requested tag set to match
* @param {String[]} serverTags The server's tags
*/
function tagSetMatch(tagSet, serverTags) {
const keys = Object.keys(tagSet);
const serverTagKeys = Object.keys(serverTags);
for (let i = 0; i < keys.length; ++i) {
const key = keys[i];
if (serverTagKeys.indexOf(key) === -1 || serverTags[key] !== tagSet[key]) {
return false;
}
}
return true;
}
/**
* Reduces a set of server descriptions based on tags requested by the read preference
*
* @param {ReadPreference} readPreference The read preference providing the requested tags
* @param {ServerDescription[]} servers The list of server descriptions to reduce
* @return {ServerDescription[]} The list of servers matching the requested tags
*/
function tagSetReducer(readPreference, servers) {
if (
readPreference.tags == null ||
(Array.isArray(readPreference.tags) && readPreference.tags.length === 0)
) {
return servers;
}
for (let i = 0; i < readPreference.tags.length; ++i) {
const tagSet = readPreference.tags[i];
const serversMatchingTagset = servers.reduce((matched, server) => {
if (tagSetMatch(tagSet, server.tags)) matched.push(server);
return matched;
}, []);
if (serversMatchingTagset.length) {
return serversMatchingTagset;
}
}
return [];
}
/**
* Reduces a list of servers to ensure they fall within an acceptable latency window. This is
* further specified in the "Server Selection" specification, found here:
* https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst
*
* @param {topologyDescription} topologyDescription The topology description
* @param {ServerDescription[]} servers The list of servers to reduce
* @returns {ServerDescription[]} The servers which fall within an acceptable latency window
*/
function latencyWindowReducer(topologyDescription, servers) {
const low = servers.reduce(
(min, server) => (min === -1 ? server.roundTripTime : Math.min(server.roundTripTime, min)),
-1
);
const high = low + topologyDescription.localThresholdMS;
return servers.reduce((result, server) => {
if (server.roundTripTime <= high && server.roundTripTime >= low) result.push(server);
return result;
}, []);
}
// filters
function primaryFilter(server) {
return server.type === ServerType.RSPrimary;
}
function secondaryFilter(server) {
return server.type === ServerType.RSSecondary;
}
function nearestFilter(server) {
return server.type === ServerType.RSSecondary || server.type === ServerType.RSPrimary;
}
function knownFilter(server) {
return server.type !== ServerType.Unknown;
}
/**
* Returns a function which selects servers based on a provided read preference
*
* @param {ReadPreference} readPreference The read preference to select with
*/
function readPreferenceServerSelector(readPreference) {
if (!readPreference.isValid()) {
throw new TypeError('Invalid read preference specified');
}
return function(topologyDescription, servers) {
const commonWireVersion = topologyDescription.commonWireVersion;
if (
commonWireVersion &&
readPreference.minWireVersion &&
readPreference.minWireVersion > commonWireVersion
) {
throw new MongoError(
`Minimum wire version '${readPreference.minWireVersion}' required, but found '${commonWireVersion}'`
);
}
if (topologyDescription.type === TopologyType.Unknown) {
return [];
}
if (
topologyDescription.type === TopologyType.Single ||
topologyDescription.type === TopologyType.Sharded
) {
return latencyWindowReducer(topologyDescription, servers.filter(knownFilter));
}
const mode = readPreference.mode;
if (mode === ReadPreference.PRIMARY) {
return servers.filter(primaryFilter);
}
if (mode === ReadPreference.PRIMARY_PREFERRED) {
const result = servers.filter(primaryFilter);
if (result.length) {
return result;
}
}
const filter = mode === ReadPreference.NEAREST ? nearestFilter : secondaryFilter;
const selectedServers = latencyWindowReducer(
topologyDescription,
tagSetReducer(
readPreference,
maxStalenessReducer(readPreference, topologyDescription, servers.filter(filter))
)
);
if (mode === ReadPreference.SECONDARY_PREFERRED && selectedServers.length === 0) {
return servers.filter(primaryFilter);
}
return selectedServers;
};
}
module.exports = {
writableServerSelector,
readPreferenceServerSelector
};
|
:: Command execute :: | |
--[ c99shell v. 2.5 [PHP 8 Update] [24.05.2025] | Generation time: 0.0189 ]-- |