!C99Shell v. 2.5 [PHP 8 Update] [24.05.2025]!

Software: Apache/2.4.41 (Ubuntu). PHP/8.0.30 

uname -a: Linux apirnd 5.4.0-204-generic #224-Ubuntu SMP Thu Dec 5 13:38:28 UTC 2024 x86_64 

uid=33(www-data) gid=33(www-data) groups=33(www-data) 

Safe-mode: OFF (not secure)

/var/www/html/wincloud_gateway/node_modules/mongodb/lib/core/connection/   drwxr-xr-x
Free 13.13 GB of 57.97 GB (22.66%)
Home    Back    Forward    UPDIR    Refresh    Search    Buffer    Encoder    Tools    Proc.    FTP brute    Sec.    SQL    PHP-code    Update    Self remove    Logout    


Viewing file:     pool.js (36.75 KB)      -rw-r--r--
Select action/file-type:
(+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
'use strict';

const inherits = require('util').inherits;
const EventEmitter = require('events').EventEmitter;
const MongoError = require('../error').MongoError;
const MongoTimeoutError = require('../error').MongoTimeoutError;
const MongoWriteConcernError = require('../error').MongoWriteConcernError;
const Logger = require('./logger');
const f = require('util').format;
const Msg = require('./msg').Msg;
const CommandResult = require('./command_result');
const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE;
const COMPRESSION_DETAILS_SIZE = require('../wireprotocol/shared').COMPRESSION_DETAILS_SIZE;
const opcodes = require('../wireprotocol/shared').opcodes;
const compress = require('../wireprotocol/compression').compress;
const compressorIDs = require('../wireprotocol/compression').compressorIDs;
const uncompressibleCommands = require('../wireprotocol/compression').uncompressibleCommands;
const apm = require('./apm');
const Buffer = require('safe-buffer').Buffer;
const connect = require('./connect');
const updateSessionFromResponse = require('../sessions').updateSessionFromResponse;
const eachAsync = require('../utils').eachAsync;
const makeStateMachine = require('../utils').makeStateMachine;
const now = require('../../utils').now;

// Names of the lifecycle states a pool can be in (stored on `pool.s.state`).
const DISCONNECTED = 'disconnected';
const CONNECTING = 'connecting';
const CONNECTED = 'connected';
const DRAINING = 'draining';
const DESTROYING = 'destroying';
const DESTROYED = 'destroyed';
// Called throughout this file as `stateTransition(pool, NEXT_STATE)`.
// NOTE(review): the table presumably maps each state to the states it may
// legally move to — confirm against `makeStateMachine` in ../utils.
const stateTransition = makeStateMachine({
  [DISCONNECTED]: [CONNECTING, DRAINING, DISCONNECTED],
  [CONNECTING]: [CONNECTING, CONNECTED, DRAINING, DISCONNECTED],
  [CONNECTED]: [CONNECTED, DISCONNECTED, DRAINING],
  [DRAINING]: [DRAINING, DESTROYING, DESTROYED],
  [DESTROYING]: [DESTROYING, DESTROYED],
  [DESTROYED]: [DESTROYED]
});

// Event names whose listeners are stripped from a connection (via
// `removeAllListeners`) before it is torn down by destroy/reset.
const CONNECTION_EVENTS = new Set([
  'error',
  'close',
  'timeout',
  'parseError',
  'connect',
  'message'
]);

// Monotonic counter used to give every Pool instance a distinct `id`.
var _id = 0;

/**
 * Creates a new Pool instance
 * @class
 * @param {object} topology The owning topology; stored as `this.topology` for later use
 * @param {object} options Pool options; must include a valid `bson` parser
 * @param {object} options.bson BSON parser exposing `serialize` and `deserialize` functions
 * @param {string} options.host The server host
 * @param {number} options.port The server port
 * @param {number} [options.size=5] Max server connection pool size
 * @param {number} [options.minSize=0] Minimum server connection pool size
 * @param {boolean} [options.reconnect=true] Server will attempt to reconnect on loss of connection
 * @param {number} [options.reconnectTries=30] Server attempt to reconnect #times
 * @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries
 * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
 * @param {number} [options.keepAliveInitialDelay=120000] Initial delay before TCP keep alive enabled
 * @param {boolean} [options.noDelay=true] TCP Connection no delay
 * @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting
 * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
 * @param {number} [options.monitoringSocketTimeout=0] TCP Socket timeout setting for replicaset monitoring socket
 * @param {boolean} [options.ssl=false] Use SSL for connection
 * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
 * @param {Buffer} [options.ca] SSL Certificate store binary buffer
 * @param {Buffer} [options.crl] SSL Certificate revocation store binary buffer
 * @param {Buffer} [options.cert] SSL Certificate binary buffer
 * @param {Buffer} [options.key] SSL Key file binary buffer
 * @param {string} [options.passphrase] SSL Certificate pass phrase
 * @param {boolean} [options.rejectUnauthorized=false] Reject unauthorized server certificates
 * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
 * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
 * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
 * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
 * @throws {Error} If `options` is missing or does not carry a usable `bson` parser
 * @fires Pool#connect
 * @fires Pool#close
 * @fires Pool#error
 * @fires Pool#timeout
 * @fires Pool#parseError
 * @return {Pool} A pool instance
 */
var Pool = function(topology, options) {
  // Add event listener
  EventEmitter.call(this);

  // Store topology for later use
  this.topology = topology;

  this.s = {
    state: DISCONNECTED,
    cancellationToken: new EventEmitter()
  };

  // we don't care how many connections are listening for cancellation
  this.s.cancellationToken.setMaxListeners(Infinity);

  // Add the options
  this.options = Object.assign(
    {
      // Host and port settings
      host: 'localhost',
      port: 27017,
      // Pool default max size
      size: 5,
      // Pool default min size
      minSize: 0,
      // socket settings
      connectionTimeout: 30000,
      socketTimeout: 0,
      keepAlive: true,
      keepAliveInitialDelay: 120000,
      noDelay: true,
      // SSL Settings
      ssl: false,
      checkServerIdentity: true,
      ca: null,
      crl: null,
      cert: null,
      key: null,
      passphrase: null,
      rejectUnauthorized: false,
      promoteLongs: true,
      promoteValues: true,
      promoteBuffers: false,
      // Reconnection options
      reconnect: true,
      reconnectInterval: 1000,
      reconnectTries: 30,
      // Enable domains
      domainsEnabled: false,
      // feature flag for determining if we are running with the unified topology or not
      legacyCompatMode: true
    },
    options
  );

  // Identification information
  this.id = _id++;
  // Current reconnect retries
  this.retriesLeft = this.options.reconnectTries;
  this.reconnectId = null;
  this.reconnectError = null;

  // A bson parser is mandatory. Guard `options` itself so that a missing
  // options object produces the intended error rather than a TypeError.
  const bson = options ? options.bson : null;
  if (!bson || typeof bson.serialize !== 'function' || typeof bson.deserialize !== 'function') {
    throw new Error('must pass in valid bson parser');
  }

  // Logger instance
  this.logger = Logger('Pool', options);
  // Connections
  this.availableConnections = [];
  this.inUseConnections = [];
  this.connectingConnections = 0;
  // Currently executing
  this.executing = false;
  // Operation work queue
  this.queue = [];

  // Number of consecutive timeouts caught
  this.numberOfConsecutiveTimeouts = 0;
  // Current pool Index
  this.connectionIndex = 0;

  // Event handlers. They are attached to connections with `connection.on`,
  // so inside each handler `this` is the emitting connection while `pool`
  // is captured from the constructor.
  const pool = this;
  this._messageHandler = messageHandler(this);
  this._connectionCloseHandler = function(err) {
    connectionFailureHandler(pool, 'close', err, this);
  };

  this._connectionErrorHandler = function(err) {
    connectionFailureHandler(pool, 'error', err, this);
  };

  this._connectionTimeoutHandler = function(err) {
    connectionFailureHandler(pool, 'timeout', err, this);
  };

  this._connectionParseErrorHandler = function(err) {
    connectionFailureHandler(pool, 'parseError', err, this);
  };
};

// Pool emits events, so it derives from EventEmitter
inherits(Pool, EventEmitter);

// Read-only, enumerable views onto the matching entries of `this.options`
['size', 'minSize', 'connectionTimeout', 'socketTimeout'].forEach(optionName => {
  Object.defineProperty(Pool.prototype, optionName, {
    enumerable: true,
    get() {
      return this.options[optionName];
    }
  });
});

// Read-only, enumerable view onto the current lifecycle state
Object.defineProperty(Pool.prototype, 'state', {
  enumerable: true,
  get() {
    return this.s.state;
  }
});

/**
 * Puts the pool's bookkeeping back to its initial values: empty connection
 * lists, zeroed counters, a full reconnect budget and no pending reconnect
 * timer id. The work queue (`pool.queue`) is not touched here.
 *
 * @param {Pool} pool The pool whose state is reset in place
 */
function resetPoolState(pool) {
  Object.assign(pool, {
    inUseConnections: [],
    availableConnections: [],
    connectingConnections: 0,
    executing: false,
    numberOfConsecutiveTimeouts: 0,
    connectionIndex: 0,
    retriesLeft: pool.options.reconnectTries,
    reconnectId: null
  });
}

/**
 * Shared handler for a connection's 'close', 'error', 'timeout' and
 * 'parseError' events. Tears the connection down, updates pool state,
 * re-emits the event on the pool when no sockets remain, schedules a
 * reconnect and tops the pool back up to its minimum size.
 *
 * @param {Pool} pool The pool that owns the connection
 * @param {string} event Name of the connection event being handled
 * @param {Error} err The error passed along with the event
 * @param {Connection} [conn] The connection that failed, if any
 */
function connectionFailureHandler(pool, event, err, conn) {
  if (conn) {
    // A connection can report several failure events; only act on the first
    if (conn._connectionFailHandled) {
      return;
    }

    conn._connectionFailHandled = true;
    conn.destroy();

    // Remove the connection
    removeConnection(pool, conn);

    // flush remaining work items
    conn.flush(err);
  }

  // Did we catch a timeout, increment the numberOfConsecutiveTimeouts
  if (event === 'timeout') {
    pool.numberOfConsecutiveTimeouts = pool.numberOfConsecutiveTimeouts + 1;

    // Have we timed out more than reconnectTries in a row ?
    // Force close the pool as we are trying to connect to tcp sink hole
    if (pool.numberOfConsecutiveTimeouts > pool.options.reconnectTries) {
      pool.numberOfConsecutiveTimeouts = 0;
      // Destroy all connections and pool
      pool.destroy(true);
      // Emit close event
      return pool.emit('close', pool);
    }
  }

  // No more sockets available, propagate the event
  if (pool.socketCount() === 0) {
    // Only fall back to DISCONNECTED when the pool is not already shutting
    // down and reconnection is enabled
    if (pool.state !== DESTROYED && pool.state !== DESTROYING && pool.state !== DRAINING) {
      if (pool.options.reconnect) {
        stateTransition(pool, DISCONNECTED);
      }
    }

    // Do not emit error events, they are always close events
    // do not trigger the low level error handler in node
    event = event === 'error' ? 'close' : event;
    pool.emit(event, err);
  }

  // Start reconnection attempts (at most one timer pending at a time)
  if (!pool.reconnectId && pool.options.reconnect) {
    pool.reconnectError = err;
    pool.reconnectId = setTimeout(attemptReconnect(pool), pool.options.reconnectInterval);
  }

  // Do we need to do anything to maintain the minimum pool size
  const totalConnections = totalConnectionCount(pool);
  if (totalConnections < pool.minSize) {
    createConnection(pool);
  }
}

/**
 * Builds the timer callback that performs a single reconnect attempt.
 *
 * When run, the returned function emits 'attemptReconnect', gives up if the
 * pool is destroyed/destroying, destroys the pool and emits 'reconnectFailed'
 * once the retry budget is used up, and otherwise tries to create a new
 * connection (emitting 'reconnect' on success).
 *
 * @param {Pool} pool The pool to reconnect
 * @param {function} [callback] Receives `(err, connection)` with the outcome
 * @return {function} Zero-argument function suitable for `setTimeout`
 */
function attemptReconnect(pool, callback) {
  const hasCallback = typeof callback === 'function';

  return function() {
    pool.emit('attemptReconnect', pool);

    const poolIsGone = pool.state === DESTROYED || pool.state === DESTROYING;
    if (poolIsGone) {
      if (hasCallback) {
        callback(new MongoError('Cannot create connection when pool is destroyed'));
      }

      return;
    }

    pool.retriesLeft -= 1;
    if (pool.retriesLeft <= 0) {
      // out of retries: tear the pool down and report the failure
      pool.destroy();

      const failure = new MongoTimeoutError(
        `failed to reconnect after ${pool.options.reconnectTries} attempts with interval ${pool.options.reconnectInterval} ms`,
        pool.reconnectError
      );

      pool.emit('reconnectFailed', failure);
      if (hasCallback) {
        callback(failure);
      }

      return;
    }

    // forget the timer id so that a failed attempt can schedule the next one
    pool.reconnectId = null;

    createConnection(pool, (err, conn) => {
      const succeeded = err == null;
      if (succeeded) {
        pool.reconnectId = null;
        pool.retriesLeft = pool.options.reconnectTries;
        pool.emit('reconnect', pool);
      }

      if (hasCallback) {
        callback(err, conn);
      }
    });
  };
}

/**
 * Moves `connection` from one list to another. Does nothing when the
 * connection is not present in `from`.
 *
 * @param {Connection} connection The connection to move
 * @param {Connection[]} from List the connection is removed from
 * @param {Connection[]} to List the connection is appended to
 */
function moveConnectionBetween(connection, from, to) {
  const position = from.indexOf(connection);
  if (position === -1) {
    return;
  }

  from.splice(position, 1);
  to.push(connection);
}

/**
 * Builds the 'message' listener that the pool attaches to every connection.
 * The listener matches an incoming message to the work item that requested
 * it, parses the reply and hands the result (or an error) to that work
 * item's callback.
 *
 * @param {Pool} self The pool the handler belongs to
 * @return {function} Listener invoked as `(message, connection)`
 */
function messageHandler(self) {
  return function(message, connection) {
    // workItem to execute
    var workItem = null;

    // Locate the workItem whose requestId this message responds to
    for (var i = 0; i < connection.workItems.length; i++) {
      if (connection.workItems[i].requestId === message.responseTo) {
        // Get the callback
        workItem = connection.workItems[i];
        // Remove from list of workItems
        connection.workItems.splice(i, 1);
      }
    }

    // A monitoring operation hands its connection back to the available list
    if (workItem && workItem.monitoring) {
      moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
    }

    // Reset timeout counter
    self.numberOfConsecutiveTimeouts = 0;

    // Reset the connection timeout if we modified it for
    // this operation
    if (workItem && workItem.socketTimeout) {
      connection.resetSocketTimeout();
    }

    // Log if debug enabled
    if (self.logger.isDebug()) {
      self.logger.debug(
        f(
          'message [%s] received from %s:%s',
          message.raw.toString('hex'),
          self.options.host,
          self.options.port
        )
      );
    }

    // Invokes `cb(err, result)`: deferred to the next tick unless domains
    // are enabled, in which case it is called synchronously
    function handleOperationCallback(self, cb, err, result) {
      // No domain enabled
      if (!self.options.domainsEnabled) {
        return process.nextTick(function() {
          return cb(err, result);
        });
      }

      // Domain enabled just call the callback
      cb(err, result);
    }

    // Keep executing, ensure current message handler does not stop execution
    if (!self.executing) {
      process.nextTick(function() {
        _execute(self)();
      });
    }

    // Time to dispatch the message if we have a callback
    if (workItem && !workItem.immediateRelease) {
      try {
        // Parse the message according to the provided options
        message.parse(workItem);
      } catch (err) {
        return handleOperationCallback(self, workItem.cb, new MongoError(err));
      }

      // Propagate session and cluster-time information from the first document
      if (message.documents[0]) {
        const document = message.documents[0];
        const session = workItem.session;
        if (session) {
          updateSessionFromResponse(session, document);
        }

        if (self.topology && document.$clusterTime) {
          self.topology.clusterTime = document.$clusterTime;
        }
      }

      // Establish if we have an error
      if (workItem.command && message.documents[0]) {
        const responseDoc = message.documents[0];

        // A write concern error takes precedence over the generic error check
        if (responseDoc.writeConcernError) {
          const err = new MongoWriteConcernError(responseDoc.writeConcernError, responseDoc);
          return handleOperationCallback(self, workItem.cb, err);
        }

        if (responseDoc.ok === 0 || responseDoc.$err || responseDoc.errmsg || responseDoc.code) {
          return handleOperationCallback(self, workItem.cb, new MongoError(responseDoc));
        }
      }

      // Add the connection details
      message.hashedName = connection.hashedName;

      // Return the documents (the whole message when `fullResult` is set,
      // otherwise only the first document)
      handleOperationCallback(
        self,
        workItem.cb,
        null,
        new CommandResult(workItem.fullResult ? message : message.documents[0], connection, message)
      );
    }
  };
}

/**
 * Count the established sockets in the pool: available plus in-use
 * connections. Connections still being established are not included.
 * @method
 * @return {Number} The number of established sockets.
 */
Pool.prototype.socketCount = function() {
  const idle = this.availableConnections.length;
  const busy = this.inUseConnections.length;
  return idle + busy;
};

/**
 * Counts every connection the pool is responsible for: available, in use,
 * and those still being established.
 *
 * @param {Pool} pool The pool to inspect
 * @return {number} Total number of connections
 */
function totalConnectionCount(pool) {
  const established = pool.availableConnections.length + pool.inUseConnections.length;
  return established + pool.connectingConnections;
}

/**
 * Return all established pool connections as a new array: the available
 * connections first, followed by those in use.
 * @method
 * @return {Connection[]} The pool connections
 */
Pool.prototype.allConnections = function() {
  const idle = this.availableConnections;
  const busy = this.inUseConnections;
  return idle.concat(busy);
};

/**
 * Get a pool connection. This returns the first entry of `allConnections()`
 * (no rotation is performed), or `undefined` when the pool has none.
 * @method
 * @return {Connection}
 */
Pool.prototype.get = function() {
  const connections = this.allConnections();
  return connections[0];
};

/**
 * Is the pool connected, i.e. not destroyed/destroying and holding at least
 * one connection that reports itself as connected
 * @method
 * @return {boolean}
 */
Pool.prototype.isConnected = function() {
  // A destroyed (or destroying) pool is never connected
  if (this.state === DESTROYED || this.state === DESTROYING) {
    return false;
  }

  // Connected as soon as any known connection says so
  return this.availableConnections
    .concat(this.inUseConnections)
    .some(conn => conn.isConnected());
};

/**
 * Was the pool destroyed (or is it currently being destroyed)
 * @method
 * @return {boolean}
 */
Pool.prototype.isDestroyed = function() {
  if (this.state === DESTROYED) {
    return true;
  }

  return this.state === DESTROYING;
};

/**
 * Is the pool in a disconnected state
 * @method
 * @return {boolean} true only while the state is exactly 'disconnected'
 */
Pool.prototype.isDisconnected = function() {
  const currentState = this.state;
  return currentState === DISCONNECTED;
};

/**
 * Connect pool: opens the first connection and, once it is up, starts
 * `minSize` further connections.
 *
 * The outcome is reported through `callback` when one is supplied; without
 * a callback the pool emits 'connect' on success and 'error' on failure
 * (the latter only while still in the connecting state).
 *
 * @param {function} [callback] Receives `(err, connection)`
 * @throws {MongoError} If the pool is not in the disconnected state
 */
Pool.prototype.connect = function(callback) {
  if (this.state !== DISCONNECTED) {
    throw new MongoError(`connection in unlawful state ${this.state}`);
  }

  const hasCallback = typeof callback === 'function';

  const onFirstConnection = (err, conn) => {
    if (err) {
      if (hasCallback) {
        this.destroy();
        callback(err);
        return;
      }

      if (this.state === CONNECTING) {
        this.emit('error', err);
      }

      this.destroy();
      return;
    }

    stateTransition(this, CONNECTED);

    // create min connections
    if (this.minSize) {
      let started = 0;
      while (started < this.minSize) {
        createConnection(this);
        started += 1;
      }
    }

    if (hasCallback) {
      callback(null, conn);
      return;
    }

    this.emit('connect', this, conn);
  };

  stateTransition(this, CONNECTING);
  createConnection(this, onFirstConnection);
};

/**
 * Authenticate using a specified mechanism. At pool level this performs no
 * work: the callback, if given, is invoked immediately with `(null, null)`.
 * @param {object} credentials Unused by this implementation
 * @param {authResultCallback} callback A callback function
 */
Pool.prototype.auth = function(credentials, callback) {
  if (typeof callback !== 'function') {
    return;
  }

  callback(null, null);
};

/**
 * Logout all users against a database. At pool level this performs no work:
 * the callback, if given, is invoked immediately with `(null, null)`.
 * @param {string} dbName Unused by this implementation
 * @param {authResultCallback} callback A callback function
 */
Pool.prototype.logout = function(dbName, callback) {
  if (typeof callback !== 'function') {
    return;
  }

  callback(null, null);
};

/**
 * Unref the pool: calls `unref()` on every available and in-use connection
 * @method
 */
Pool.prototype.unref = function() {
  const knownConnections = this.availableConnections.concat(this.inUseConnections);

  for (const conn of knownConnections) {
    conn.unref();
  }
};

/**
 * Destroys the given connections and then marks the pool as destroyed.
 *
 * Moves the pool to DESTROYING, signals cancellation to in-flight connection
 * attempts, tears each connection down, and on success resets the pool
 * state, empties the queue and moves the pool to DESTROYED. If tearing down
 * a connection reports an error the pool is left in DESTROYING and the error
 * is passed to the callback.
 *
 * @param {Pool} self The pool being destroyed
 * @param {Connection[]} connections The connections to destroy
 * @param {object} options Passed through to each `connection.destroy`
 * @param {function} [callback] Receives `(err, null)`
 */
function destroy(self, connections, options, callback) {
  const hasCallback = typeof callback === 'function';

  const teardown = (conn, cb) => {
    CONNECTION_EVENTS.forEach(eventName => {
      conn.removeAllListeners(eventName);
    });

    // ignore any errors during destruction
    conn.on('error', () => {});

    conn.destroy(options, cb);
  };

  const finish = err => {
    if (err) {
      if (hasCallback) callback(err, null);
      return;
    }

    resetPoolState(self);
    self.queue = [];

    stateTransition(self, DESTROYED);
    if (hasCallback) callback(null, null);
  };

  stateTransition(self, DESTROYING);

  // indicate that in-flight connections should cancel
  self.s.cancellationToken.emit('cancel');

  eachAsync(connections, teardown, finish);
}

/**
 * Destroy pool
 *
 * With `force` set, queued operations are failed immediately with
 * 'Pool was force destroyed' and all connections are destroyed right away.
 * Otherwise the pool moves to the draining state and polls until the queue
 * and every connection's in-flight work items are empty before destroying
 * the connections.
 *
 * Calling this on a pool that is already destroyed or being destroyed only
 * invokes the callback.
 *
 * @method
 * @param {boolean} [force=false] Skip draining and destroy immediately
 * @param {function} [callback] Invoked once the pool has been destroyed
 */
Pool.prototype.destroy = function(force, callback) {
  var self = this;
  if (typeof force === 'function') {
    callback = force;
    force = false;
  }

  // Do not try again if the pool is already dead
  if (this.state === DESTROYED || self.state === DESTROYING) {
    if (typeof callback === 'function') callback(null, null);
    return;
  }

  // Set state to draining
  stateTransition(this, DRAINING);

  // Clear out the reconnect if set. This has to happen for forced destroys
  // as well, otherwise the pending timer stays alive after the pool is gone
  // and later fires a pointless reconnect attempt.
  if (this.reconnectId) {
    clearTimeout(this.reconnectId);
  }

  // Are we force closing
  if (force) {
    // Get all the known connections
    var connections = self.availableConnections.concat(self.inUseConnections);

    // Flush any remaining work items with
    // an error
    while (self.queue.length > 0) {
      var workItem = self.queue.shift();
      if (typeof workItem.cb === 'function') {
        workItem.cb(new MongoError('Pool was force destroyed'));
      }
    }

    // Destroy the topology
    return destroy(self, connections, { force: true }, callback);
  }

  // Wait for the operations to drain before we close the pool
  function checkStatus() {
    // Someone else (e.g. a forced destroy) finished the job in the meantime
    if (self.state === DESTROYED || self.state === DESTROYING) {
      if (typeof callback === 'function') {
        callback();
      }

      return;
    }

    // Monitoring operations are failed rather than waited for
    flushMonitoringOperations(self.queue);

    if (self.queue.length === 0) {
      // Get all the known connections
      var connections = self.availableConnections.concat(self.inUseConnections);

      // Check if we have any in flight operations
      for (var i = 0; i < connections.length; i++) {
        // There is an operation still in flight, reschedule a
        // check waiting for it to drain
        if (connections[i].workItems.length > 0) {
          return setTimeout(checkStatus, 1);
        }
      }

      destroy(self, connections, { force: false }, callback);
    } else {
      // Ensure we empty the queue
      _execute(self)();
      // Set timeout
      setTimeout(checkStatus, 1);
    }
  }

  // Initiate drain of operations
  checkStatus();
};

/**
 * Reset all connections of this pool: cancels in-flight connection attempts,
 * force-destroys every known connection, clears the pool state and opens a
 * fresh connection.
 *
 * Only allowed while the pool is connected; otherwise the callback (if any)
 * receives a MongoError and nothing else happens.
 *
 * @param {function} [callback] Invoked with `(err, null)` when the reset is done
 */
Pool.prototype.reset = function(callback) {
  const hasCallback = typeof callback === 'function';

  if (this.s.state !== CONNECTED) {
    if (hasCallback) {
      callback(new MongoError('pool is not connected, reset aborted'));
    }

    return;
  }

  const forceClose = (conn, cb) => {
    CONNECTION_EVENTS.forEach(eventName => {
      conn.removeAllListeners(eventName);
    });

    conn.destroy({ force: true }, cb);
  };

  const rebuild = err => {
    // A teardown error aborts the reset only when there is a callback to
    // report it to; without one the reset carries on regardless.
    if (err && hasCallback) {
      callback(err, null);
      return;
    }

    resetPoolState(this);

    // create a new connection, this will ultimately trigger execution
    createConnection(this, () => {
      if (hasCallback) {
        callback(null, null);
      }
    });
  };

  // signal in-flight connections should be cancelled
  this.s.cancellationToken.emit('cancel');

  // destroy existing connections
  const connections = this.availableConnections.concat(this.inUseConnections);
  eachAsync(connections, forceClose, rebuild);
};

/**
 * Prepare the buffers that Pool.prototype.write() sends to the server.
 *
 * Without an agreed compressor, or for commands that must not be
 * compressed, the command's own serialization is passed through unchanged.
 * Otherwise the message body is compressed and wrapped in an OP_COMPRESSED
 * message made of three buffers: message header, compression details and
 * the compressed body.
 *
 * @param {Pool} self The pool; `self.options.agreedCompressor` selects the compressor
 * @param {object} command The command; must provide `toBin()` and `requestId`
 * @param {function} callback Receives `(err, buffers)`
 */
function serializeCommand(self, command, callback) {
  const originalCommandBuffer = command.toBin();

  // Nothing to do unless a compressor was agreed and the command allows it
  if (!self.options.agreedCompressor || !canCompress(command)) {
    return callback(null, originalCommandBuffer);
  }

  // Flatten the serialized command so the header can be separated from the body
  const wholeMessage = Buffer.concat(originalCommandBuffer);
  const uncompressedBody = wholeMessage.slice(MESSAGE_HEADER_SIZE);

  // The original opcode sits at byte offset 12 of the message header
  const originalOpCode = wholeMessage.readInt32LE(12);

  compress(self, uncompressedBody, (err, compressedBody) => {
    if (err) return callback(err, null);

    // msgHeader of OP_COMPRESSED: messageLength, requestID, responseTo (zero), opCode
    const totalLength = MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedBody.length;
    const header = Buffer.alloc(MESSAGE_HEADER_SIZE);
    header.writeInt32LE(totalLength, 0);
    header.writeInt32LE(command.requestId, 4);
    header.writeInt32LE(0, 8);
    header.writeInt32LE(opcodes.OP_COMPRESSED, 12);

    // compression details: originalOpcode, uncompressed size (excluding the
    // MsgHeader), compressorID
    const details = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
    details.writeInt32LE(originalOpCode, 0);
    details.writeInt32LE(uncompressedBody.length, 4);
    details.writeUInt8(compressorIDs[self.options.agreedCompressor], 8);

    return callback(null, [header, details, compressedBody]);
  });
}

/**
 * Write a message to MongoDB
 *
 * Builds a work item for `command`, serializes it (compressing it when a
 * compressor was agreed) and queues it for execution. Monitoring operations
 * go to the front of the queue, everything else to the back.
 *
 * If the pool is destroyed, being destroyed or draining, the operation is
 * rejected through the callback (when one was supplied) and never queued.
 *
 * @method
 * @param {object} command The command to send; must expose `requestId` and `toBin()`
 * @param {object|function} [options] Per-operation options, or the callback
 * @param {boolean} [options.noResponse] The message returns no response, so no callback is required
 * @param {boolean} [options.monitoring] Schedule as the very first operation
 * @param {function} [cb] Receives `(err, reply)`; required unless `options.noResponse` is set
 * @throws {MongoError} If no callback is supplied for an operation that expects a response
 */
Pool.prototype.write = function(command, options, cb) {
  var self = this;
  // Support the (command, cb) call shape; the options must not stay bound
  // to the callback function
  if (typeof options === 'function') {
    cb = options;
    options = {};
  }

  // Always have options
  options = options || {};

  const hasCallback = typeof cb === 'function';

  // We need to have a callback function unless the message returns no response
  if (!hasCallback && !options.noResponse) {
    throw new MongoError('write method must provide a callback');
  }

  // Pool was destroyed error out. A `noResponse` write may legitimately
  // come without a callback, so only report when there is one.
  if (this.state === DESTROYED || this.state === DESTROYING) {
    if (hasCallback) cb(new MongoError('pool destroyed'));
    return;
  }

  if (this.state === DRAINING) {
    if (hasCallback) cb(new MongoError('pool is draining, new operations prohibited'));
    return;
  }

  if (this.options.domainsEnabled && process.domain && hasCallback) {
    // if we have a domain bind to it
    var oldCb = cb;
    cb = process.domain.bind(function() {
      // v8 - argumentsToArray one-liner
      var args = new Array(arguments.length);
      for (var i = 0; i < arguments.length; i++) {
        args[i] = arguments[i];
      }
      // bounce off event loop so domain switch takes place
      process.nextTick(function() {
        oldCb.apply(null, args);
      });
    });
  }

  // Do we have an operation
  var operation = {
    cb: cb,
    raw: false,
    promoteLongs: true,
    promoteValues: true,
    promoteBuffers: false,
    fullResult: false
  };

  // Set the options for the parsing
  operation.promoteLongs = typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true;
  operation.promoteValues =
    typeof options.promoteValues === 'boolean' ? options.promoteValues : true;
  operation.promoteBuffers =
    typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false;
  operation.raw = typeof options.raw === 'boolean' ? options.raw : false;
  operation.immediateRelease =
    typeof options.immediateRelease === 'boolean' ? options.immediateRelease : false;
  operation.documentsReturnedIn = options.documentsReturnedIn;
  operation.command = typeof options.command === 'boolean' ? options.command : false;
  operation.fullResult = typeof options.fullResult === 'boolean' ? options.fullResult : false;
  operation.noResponse = typeof options.noResponse === 'boolean' ? options.noResponse : false;
  operation.session = options.session || null;

  // Optional per operation socketTimeout
  operation.socketTimeout = options.socketTimeout;
  operation.monitoring = options.monitoring;

  // Get the requestId
  operation.requestId = command.requestId;

  // If command monitoring is enabled we need to modify the callback here
  if (self.options.monitorCommands) {
    this.emit('commandStarted', new apm.CommandStartedEvent(this, command));

    operation.started = now();
    operation.cb = (err, reply) => {
      if (err) {
        self.emit(
          'commandFailed',
          new apm.CommandFailedEvent(this, command, err, operation.started)
        );
      } else {
        // A reply can still describe a failure even though no error was raised
        if (reply && reply.result && (reply.result.ok === 0 || reply.result.$err)) {
          self.emit(
            'commandFailed',
            new apm.CommandFailedEvent(this, command, reply.result, operation.started)
          );
        } else {
          self.emit(
            'commandSucceeded',
            new apm.CommandSucceededEvent(this, command, reply, operation.started)
          );
        }
      }

      if (typeof cb === 'function') cb(err, reply);
    };
  }

  // Prepare the operation buffer
  serializeCommand(self, command, (err, serializedBuffers) => {
    // NOTE(review): if the compressor reports its error asynchronously this
    // throw cannot be caught by the caller of write() — confirm whether it
    // should be routed to the operation callback instead.
    if (err) throw err;

    // Set the operation's buffer to the serialization of the commands
    operation.buffer = serializedBuffers;

    // If we have a monitoring operation schedule as the very first operation
    // Otherwise add to back of queue
    if (options.monitoring) {
      self.queue.unshift(operation);
    } else {
      self.queue.push(operation);
    }

    // Attempt to execute the operation
    if (!self.executing) {
      process.nextTick(function() {
        _execute(self)();
      });
    }
  });
};

/**
 * Decide whether a command may be compressed.
 *
 * The command name is the first key of the command document (`command` for
 * a Msg, `query` otherwise). Returns false when that name is listed in
 * `uncompressibleCommands`, true otherwise.
 *
 * @param {object} command The command about to be written
 * @return {boolean}
 */
function canCompress(command) {
  let commandDoc;
  if (command instanceof Msg) {
    commandDoc = command.command;
  } else {
    commandDoc = command.query;
  }

  const [commandName] = Object.keys(commandDoc);
  return !uncompressibleCommands.has(commandName);
}

/**
 * Removes the first occurrence of `connection` from `connections`, in place.
 *
 * @param {Connection} connection The connection to remove
 * @param {Connection[]} connections The list to remove it from
 * @return {boolean|undefined} true when an entry was removed, otherwise undefined
 */
function remove(connection, connections) {
  const position = connections.indexOf(connection);
  if (position === -1) {
    return;
  }

  connections.splice(position, 1);
  return true;
}

/**
 * Drops a connection from the pool's bookkeeping. The available list is
 * tried first; the in-use list is only searched when the connection was not
 * found there.
 *
 * @param {Pool} self The pool that tracks the connection
 * @param {Connection} connection The connection to forget
 */
function removeConnection(self, connection) {
  const wasAvailable = remove(connection, self.availableConnections);
  if (!wasAvailable) {
    remove(connection, self.inUseConnections);
  }
}

/**
 * Opens a new connection and adds it to the pool's available connections.
 *
 * On failure a reconnect attempt is scheduled when reconnection is enabled
 * and none is pending — except while the pool is still connecting in legacy
 * compat mode, where the error is reported straight away. The callback is
 * optional on every path.
 *
 * @param {Pool} pool The pool to add the connection to
 * @param {function} [callback] Receives `(err, connection)`
 */
function createConnection(pool, callback) {
  const hasCallback = typeof callback === 'function';

  if (pool.state === DESTROYED || pool.state === DESTROYING) {
    if (hasCallback) {
      callback(new MongoError('Cannot create connection when pool is destroyed'));
    }

    return;
  }

  // Counted while the attempt is in flight
  pool.connectingConnections++;
  connect(pool.options, pool.s.cancellationToken, (err, connection) => {
    pool.connectingConnections--;

    if (err) {
      if (pool.logger.isDebug()) {
        pool.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
      }

      // check if reconnect is enabled, and attempt retry if so
      if (!pool.reconnectId && pool.options.reconnect) {
        if (pool.state === CONNECTING && pool.options.legacyCompatMode) {
          // report the initial connect failure immediately; callers such as
          // the min-size top-up do not pass a callback, so guard the call
          if (hasCallback) {
            callback(err);
          }

          return;
        }

        pool.reconnectError = err;
        pool.reconnectId = setTimeout(
          attemptReconnect(pool, callback),
          pool.options.reconnectInterval
        );

        return;
      }

      if (hasCallback) {
        callback(err);
      }

      return;
    }

    // the pool might have been closed since we started creating the connection
    if (pool.state === DESTROYED || pool.state === DESTROYING) {
      if (hasCallback) {
        callback(new MongoError('Pool was destroyed after connection creation'));
      }

      connection.destroy();
      return;
    }

    // otherwise, connect relevant event handlers and add it to our available connections
    connection.on('error', pool._connectionErrorHandler);
    connection.on('close', pool._connectionCloseHandler);
    connection.on('timeout', pool._connectionTimeoutHandler);
    connection.on('parseError', pool._connectionParseErrorHandler);
    connection.on('message', pool._messageHandler);

    pool.availableConnections.push(connection);

    // if a callback was provided, return the connection
    if (hasCallback) {
      callback(null, connection);
    }

    // immediately execute any waiting work
    _execute(pool)();
  });
}

/**
 * Removes every monitoring operation from the queue (in place) and fails
 * each one with a 'no connection available for monitoring' error.
 *
 * All monitoring items are taken out of the queue before any callback runs.
 * Removing while walking forward by index would skip the item following
 * each removal, and running callbacks mid-scan would let a callback that
 * queues new work disturb the scan.
 *
 * @param {object[]} queue The pool's work queue; mutated in place
 */
function flushMonitoringOperations(queue) {
  const flushed = [];

  // Walk backwards so that splicing never shifts an unvisited entry
  for (let i = queue.length - 1; i >= 0; i--) {
    if (queue[i].monitoring) {
      flushed.unshift(queue.splice(i, 1)[0]);
    }
  }

  // Report in original queue order
  flushed.forEach(workItem => {
    if (typeof workItem.cb === 'function') {
      workItem.cb(
        new MongoError({ message: 'no connection available for monitoring', driver: true })
      );
    }
  });
}

/**
 * Builds the pool's execution loop: a function that takes work items off
 * `self.queue` and writes their buffers to connections taken from
 * `self.availableConnections`.
 *
 * The returned function does nothing when the pool state is DESTROYED, when a
 * previous invocation is still running (`self.executing`), or while new
 * connections are still being established (`self.connectingConnections > 0`).
 * Otherwise it loops until the queue is empty, no connection is available, a
 * connection turns out to be unusable, or the pool needs to grow first.
 *
 * @param {Pool} self The pool whose queue should be processed
 * @returns {function} A zero-argument function that runs one pass of the loop
 */
function _execute(self) {
  return function() {
    if (self.state === DESTROYED) return;
    // Already executing, skip
    if (self.executing) return;
    // Set pool as executing
    self.executing = true;

    // New pool connections are in progress, wait for them to finish
    // before executing any more operations to ensure distribution of
    // operations
    if (self.connectingConnections > 0) {
      self.executing = false;
      return;
    }

    // As long as we have available connections
    // eslint-disable-next-line
    while (true) {
      // Total connection count as reported by totalConnectionCount (defined
      // elsewhere in this file); compared against options.size below to
      // decide whether the pool may still grow
      const totalConnections = totalConnectionCount(self);

      // No available connections, flush any monitoring ops
      if (self.availableConnections.length === 0) {
        // Flush any monitoring operations
        flushMonitoringOperations(self.queue);

        // Try to create a new connection to execute stuck operation
        if (totalConnections < self.options.size && self.queue.length > 0) {
          createConnection(self);
        }

        break;
      }

      // No queue break
      if (self.queue.length === 0) {
        break;
      }

      var connection = null;
      // Prefer connections that currently have no outstanding work items
      const connections = self.availableConnections.filter(conn => conn.workItems.length === 0);

      // No connection found that has no work on it, just pick one for pipelining
      // (connectionIndex is post-incremented on every pick, rotating the choice)
      if (connections.length === 0) {
        connection =
          self.availableConnections[self.connectionIndex++ % self.availableConnections.length];
      } else {
        connection = connections[self.connectionIndex++ % connections.length];
      }

      // Is the connection connected
      if (!connection.isConnected()) {
        // Remove the disconnected connection
        removeConnection(self, connection);
        // Flush any monitoring operations in the queue, failing fast
        flushMonitoringOperations(self.queue);
        break;
      }

      // Get the next work item
      var workItem = self.queue.shift();

      // If we are monitoring we need to use a connection that is not
      // running another operation to avoid socket timeout changes
      // affecting an existing operation
      if (workItem.monitoring) {
        var foundValidConnection = false;

        for (let i = 0; i < self.availableConnections.length; i++) {
          // If the connection is connected
          // And there are no pending workItems on it
          // Then we can safely use it for monitoring.
          if (
            self.availableConnections[i].isConnected() &&
            self.availableConnections[i].workItems.length === 0
          ) {
            foundValidConnection = true;
            connection = self.availableConnections[i];
            break;
          }
        }

        // No safe connection found, attempt to grow the connections
        // if possible and break from the loop
        if (!foundValidConnection) {
          // Put workItem back on the queue
          self.queue.unshift(workItem);

          // Attempt to grow the pool if it's not yet maxsize
          if (totalConnections < self.options.size && self.queue.length > 0) {
            // Create a new connection
            createConnection(self);
          }

          // Re-execute the operation after a short (10ms) delay
          setTimeout(() => _execute(self)(), 10);
          break;
        }
      }

      // Don't execute operation until we have a full pool
      if (totalConnections < self.options.size) {
        // Connection has work items, then put it back on the queue
        // and create a new connection
        if (connection.workItems.length > 0) {
          // Lets put the workItem back on the list
          self.queue.unshift(workItem);
          // Create a new connection
          createConnection(self);
          // Break from the loop
          break;
        }
      }

      // Get actual binary commands
      var buffer = workItem.buffer;

      // If we are monitoring, move the connection from availableConnections
      // to inUseConnections
      if (workItem.monitoring) {
        moveConnectionBetween(connection, self.availableConnections, self.inUseConnections);
      }

      // Track the executing commands on the mongo server
      // as long as there is an expected response
      if (!workItem.noResponse) {
        connection.workItems.push(workItem);
      }

      // We have a custom socketTimeout
      if (!workItem.immediateRelease && typeof workItem.socketTimeout === 'number') {
        connection.setSocketTimeout(workItem.socketTimeout);
      }

      // Capture if write was successful
      var writeSuccessful = true;

      // Put operation on the wire
      // NOTE(review): for an array of buffers only the result of the LAST
      // write is kept; an earlier failed write is overwritten by a later one
      if (Array.isArray(buffer)) {
        for (let i = 0; i < buffer.length; i++) {
          writeSuccessful = connection.write(buffer[i]);
        }
      } else {
        writeSuccessful = connection.write(buffer);
      }

      // if the command is designated noResponse, call the callback immediately
      // NOTE(review): this runs before the write result is checked, so a
      // noResponse item whose write failed is both acknowledged here and
      // re-queued below - confirm this is intended
      if (workItem.noResponse && typeof workItem.cb === 'function') {
        workItem.cb(null, null);
      }

      if (writeSuccessful === false) {
        // If write not successful put back on queue
        self.queue.unshift(workItem);
        // Remove the disconnected connection
        removeConnection(self, connection);
        // Flush any monitoring operations in the queue, failing fast
        flushMonitoringOperations(self.queue);
        break;
      }
    }

    // Loop finished (or bailed out); allow the next invocation to run
    self.executing = false;
  };
}

// Make execution loop available for testing
Pool._execute = _execute;

/**
 * A server connect event, used to verify that the connection is up and running
 *
 * @event Pool#connect
 * @type {Pool}
 */

/**
 * A server reconnect event, used to verify that pool reconnected.
 *
 * @event Pool#reconnect
 * @type {Pool}
 */

/**
 * The server connection closed, all pool connections closed
 *
 * @event Pool#close
 * @type {Pool}
 */

/**
 * The server connection caused an error, all pool connections closed
 *
 * @event Pool#error
 * @type {Pool}
 */

/**
 * The server connection timed out, all pool connections closed
 *
 * @event Pool#timeout
 * @type {Pool}
 */

/**
 * The driver experienced an invalid message, all pool connections closed
 *
 * @event Pool#parseError
 * @type {Pool}
 */

/**
 * The driver attempted to reconnect
 *
 * @event Pool#attemptReconnect
 * @type {Pool}
 */

/**
 * The driver exhausted all reconnect attempts
 *
 * @event Pool#reconnectFailed
 * @type {Pool}
 */

// Expose Pool as this module's export
module.exports = Pool;

:: Command execute ::

Enter:
 
Select:
 

:: Search ::
  - regexp 

:: Upload ::
 
[ Read-Only ]

:: Make Dir ::
 
[ Read-Only ]
:: Make File ::
 
[ Read-Only ]

:: Go Dir ::
 
:: Go File ::
 

--[ c99shell v. 2.5 [PHP 8 Update] [24.05.2025] | Generation time: 0.0061 ]--