Class Mongo::Pool
In: lib/mongo/util/pool.rb
Parent: Object

Methods

Constants

PING_ATTEMPTS = 6

Attributes

checked_out  [RW] 
connection  [RW] 
host  [RW] 
port  [RW] 
safe  [RW] 
size  [RW] 
timeout  [RW] 

Public Class methods

Create a new pool of connections.

[Source]

# File lib/mongo/util/pool.rb, line 26
    def initialize(connection, host, port, opts={})
      @connection  = connection

      @host, @port = host, port

      # A Mongo::Node object.
      @node = opts[:node]

      # Pool size and timeout.
      @size      = opts[:size] || 1
      @timeout   = opts[:timeout]   || 5.0

      # Mutex for synchronizing pool access
      @connection_mutex = Mutex.new

      # Condition variable for signal and wait
      @queue = ConditionVariable.new

      # Operations to perform on a socket
      @socket_ops = Hash.new { |h, k| h[k] = [] }

      @sockets      = []
      @pids         = {}
      @checked_out  = []
      @ping_time    = nil
      @last_ping    = nil
    end

Public Instance methods

If a user calls DB#authenticate, and several sockets exist, then we need a way to apply the authentication on each socket. So we store the apply_authentication method, and this will be applied right before the next use of each socket.

[Source]

# File lib/mongo/util/pool.rb, line 160
    def authenticate_existing
      @connection_mutex.synchronize do
        @sockets.each do |socket|
          @socket_ops[socket] << Proc.new do
            @connection.apply_saved_authentication(:socket => socket)
          end
        end
      end
    end

Return a socket to the pool.

[Source]

# File lib/mongo/util/pool.rb, line 124
    def checkin(socket)
      @connection_mutex.synchronize do
        @checked_out.delete(socket)
        @queue.signal
      end
      true
    end

Check out an existing socket or create a new socket if the maximum pool size has not been exceeded. Otherwise, wait for the next available socket.

[Source]

# File lib/mongo/util/pool.rb, line 205
    def checkout
      @connection.connect if !@connection.connected?
      start_time = Time.now
      loop do
        if (Time.now - start_time) > @timeout
            raise ConnectionTimeoutError, "could not obtain connection within " +
              "#{@timeout} seconds. The max pool size is currently #{@size}; " +
              "consider increasing the pool size or timeout."
        end

        @connection_mutex.synchronize do
          socket = if @checked_out.size < @sockets.size
                     checkout_existing_socket
                   elsif @sockets.size < @size
                     checkout_new_socket
                   end

          if socket

          # This calls all procs, in order, scoped to existing sockets.
          # At the moment, we use this to lazily authenticate and
          # logout existing socket connections.
          @socket_ops[socket].reject! do |op|
            op.call
          end

            return socket
          else
            # Otherwise, wait
            @queue.wait(@connection_mutex)
          end
        end
      end
    end

Checks out the first available socket from the pool.

If the pid has changed, remove the socket and check out new one.

This method is called exclusively from checkout; therefore, it runs within a mutex.

[Source]

# File lib/mongo/util/pool.rb, line 189
    def checkout_existing_socket
      socket = (@sockets - @checked_out).first
      if @pids[socket] != Process.pid
         @pids[socket] = nil
         @sockets.delete(socket)
         socket.close if socket
         checkout_new_socket
      else
        @checked_out << socket
        socket
      end
    end

Adds a new socket to the pool and checks it out.

This method is called exclusively from checkout; therefore, it runs within a mutex.

[Source]

# File lib/mongo/util/pool.rb, line 136
    def checkout_new_socket
      begin
        socket = self.connection.socket_class.new(@host, @port)
        socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
      rescue => ex
        socket.close if socket
        raise ConnectionFailure, "Failed to connect to host #{@host} and port #{@port}: #{ex}"
        @node.close if @node
      end

      # If any saved authentications exist, we want to apply those
      # when creating new sockets.
      @connection.apply_saved_authentication(:socket => socket)

      @sockets << socket
      @pids[socket] = Process.pid
      @checked_out << socket
      socket
    end

[Source]

# File lib/mongo/util/pool.rb, line 54
    def close
      @connection_mutex.synchronize do
        @sockets.each do |sock|
          begin
            sock.close
          rescue IOError => ex
            warn "IOError when attempting to close socket connected to #{@host}:#{@port}: #{ex.inspect}"
          end
        end
        @host = @port = nil
        @sockets.clear
        @pids.clear
        @checked_out.clear
      end
    end

[Source]

# File lib/mongo/util/pool.rb, line 79
    def host_port
      [@host, @port]
    end

[Source]

# File lib/mongo/util/pool.rb, line 75
    def host_string
      "#{@host}:#{@port}"
    end

[Source]

# File lib/mongo/util/pool.rb, line 70
    def inspect
      "#<Mongo::Pool:0x#{self.object_id.to_s(16)} @host=#{@host} @port=#{port} " +
        "@ping_time=#{@ping_time} #{@checked_out.size}/#{@size} sockets available.>"
    end

Store the logout op for each existing socket to be applied before the next use of each socket.

[Source]

# File lib/mongo/util/pool.rb, line 172
    def logout_existing(db)
      @connection_mutex.synchronize do
        @sockets.each do |socket|
          @socket_ops[socket] << Proc.new do
            @connection.db(db).issue_logout(:socket => socket)
          end
        end
      end
    end

Refresh ping time only if we haven‘t checked within the last five minutes.

[Source]

# File lib/mongo/util/pool.rb, line 85
    def ping_time
      if !@last_ping
        @last_ping = Time.now
        @ping_time = refresh_ping_time
      elsif Time.now - @last_ping > 300
        @last_ping = Time.now
        @ping_time = refresh_ping_time
      else
        @ping_time
      end
    end

Return the time it takes on average to do a round-trip against this node.

[Source]

# File lib/mongo/util/pool.rb, line 99
    def refresh_ping_time
      trials = []
      begin
         PING_ATTEMPTS.times do
           t1 = Time.now
           self.connection['admin'].command({:ping => 1}, :socket => @node.socket)
           trials << (Time.now - t1) * 1000
         end
       rescue OperationFailure, SocketError, SystemCallError, IOError => ex
         return nil
      end

      trials.sort!

      # Delete shortest and longest times
      trials.delete_at(trials.length-1)
      trials.delete_at(0)

      total = 0.0
      trials.each { |t| total += t }

      (total / trials.length).ceil
    end

[Validate]