Class | Mongo::Pool |
In: |
lib/mongo/util/pool.rb
|
Parent: | Object |
PING_ATTEMPTS | = | 6 |
checked_out | [RW] | |
connection | [RW] | |
host | [RW] | |
port | [RW] | |
safe | [RW] | |
size | [RW] | |
timeout | [RW] |
Create a new pool of connections.
# File lib/mongo/util/pool.rb, line 26
#
# Build an empty pool bound to one host/port pair.
#
# connection - the owning connection object; stored as-is.
# host, port - endpoint handed to the socket class when sockets are opened.
# opts       - optional settings:
#              :node    - a Mongo::Node object (may be nil)
#              :size    - maximum number of sockets (defaults to 1)
#              :timeout - seconds to wait for a socket in checkout (defaults to 5.0)
def initialize(connection, host, port, opts={})
  @connection = connection
  @host = host
  @port = port

  # A Mongo::Node object.
  @node = opts[:node]

  # Pool size and timeout.
  @size    = opts[:size] || 1
  @timeout = opts[:timeout] || 5.0

  # The mutex guards the pool's bookkeeping; the condition variable is
  # what waiting checkouts sleep on until a socket is returned.
  @connection_mutex = Mutex.new
  @queue            = ConditionVariable.new

  # Deferred operations per socket; each key lazily gets its own array.
  @socket_ops = Hash.new { |ops, sock| ops[sock] = [] }

  # Bookkeeping: every socket, the pid that opened it, and those in use.
  @sockets     = []
  @pids        = {}
  @checked_out = []

  # Ping statistics start out unknown.
  @ping_time = nil
  @last_ping = nil
end
If a user calls DB#authenticate and several sockets already exist, we need a way to apply the authentication to each of them. So we queue a call to apply_saved_authentication for every existing socket, and it is run right before the next use of that socket.
# File lib/mongo/util/pool.rb, line 160
#
# Queue a deferred authentication step for every socket currently in the
# pool. Nothing is sent here: each queued proc calls
# @connection.apply_saved_authentication for its socket when it is invoked
# (checkout runs the queued procs for a socket before handing it out).
def authenticate_existing
  @connection_mutex.synchronize do
    @sockets.each do |sock|
      reauthenticate = Proc.new { @connection.apply_saved_authentication(:socket => sock) }
      @socket_ops[sock] << reauthenticate
    end
  end
end
Return a socket to the pool.
# File lib/mongo/util/pool.rb, line 124
#
# Hand a socket back to the pool.
#
# Under the pool mutex the socket is removed from the checked-out list and
# the condition variable is signalled so that one waiting checkout can
# retry. Always returns true.
def checkin(socket)
  @connection_mutex.synchronize {
    @checked_out.delete(socket)
    # Wake a single waiter; it will re-test availability itself.
    @queue.signal
  }

  true
end
Check out an existing socket or create a new socket if the maximum pool size has not been exceeded. Otherwise, wait for the next available socket.
# File lib/mongo/util/pool.rb, line 205
#
# Check out an existing socket, or create a new one if the pool has not yet
# reached its maximum size. Otherwise wait for a socket to be checked in.
#
# Connects @connection first if it reports that it is not connected.
#
# Returns the checked-out socket.
# Raises ConnectionTimeoutError once more than @timeout seconds have elapsed
# without obtaining a socket.
def checkout
  @connection.connect unless @connection.connected?
  start_time = Time.now

  loop do
    if (Time.now - start_time) > @timeout
      raise ConnectionTimeoutError, "could not obtain connection within " +
        "#{@timeout} seconds. The max pool size is currently #{@size}; " +
        "consider increasing the pool size or timeout."
    end

    @connection_mutex.synchronize do
      socket = if @checked_out.size < @sockets.size
                 checkout_existing_socket
               elsif @sockets.size < @size
                 checkout_new_socket
               end

      if socket
        # This calls all procs, in order, scoped to existing sockets.
        # At the moment, we use this to lazily authenticate and
        # logout existing socket connections. Procs returning a truthy
        # value are dropped; the others stay queued.
        @socket_ops[socket].reject! { |op| op.call }
        return socket
      else
        # Wait for a checkin, but never longer than the time left before
        # the deadline. An unbounded wait would block forever when no
        # socket is ever returned, and the timeout above would never fire.
        remaining = @timeout - (Time.now - start_time)
        @queue.wait(@connection_mutex, remaining) if remaining > 0
      end
    end
  end
end
Checks out the first available socket from the pool.
If the pid has changed, remove the socket and check out a new one.
This method is called exclusively from checkout; therefore, it runs within a mutex.
# File lib/mongo/util/pool.rb, line 189
#
# Checks out the first socket in the pool that is not already checked out.
#
# If the socket was recorded under a different pid than the current
# process, it is discarded and a new socket is checked out in its place.
#
# The documentation states this is called only from checkout, so it runs
# with the pool mutex already held.
#
# Returns the checked-out socket.
def checkout_existing_socket
  socket = (@sockets - @checked_out).first

  if @pids[socket] == Process.pid
    @checked_out << socket
    return socket
  end

  # The socket belongs to another process (or is untracked). Drop every
  # reference the pool holds to it. Removing the bookkeeping entries,
  # rather than setting them to nil, keeps the closed socket from staying
  # referenced by @pids and @socket_ops.
  @pids.delete(socket)
  @socket_ops.delete(socket)
  @sockets.delete(socket)
  socket.close if socket
  checkout_new_socket
end
Adds a new socket to the pool and checks it out.
This method is called exclusively from checkout; therefore, it runs within a mutex.
# File lib/mongo/util/pool.rb, line 136
#
# Adds a new socket to the pool and checks it out.
#
# The socket is created with the connection's socket class for @host and
# @port, and TCP_NODELAY is set on it. Saved authentications are applied
# to the new socket before it is registered in the pool.
#
# The documentation states this is called only from checkout, so it runs
# with the pool mutex already held.
#
# Returns the new socket.
# Raises ConnectionFailure if the socket cannot be created or configured.
def checkout_new_socket
  begin
    socket = connection.socket_class.new(@host, @port)
    socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
  rescue => ex
    socket.close if socket
    # Close the node BEFORE raising. The original placed this after the
    # raise, where it could never run.
    @node.close if @node
    raise ConnectionFailure, "Failed to connect to host #{@host} and port #{@port}: #{ex}"
  end

  # If any saved authentications exist, we want to apply those
  # when creating new sockets.
  @connection.apply_saved_authentication(:socket => socket)

  @sockets << socket
  @pids[socket] = Process.pid
  @checked_out << socket
  socket
end
# File lib/mongo/util/pool.rb, line 54
#
# Close every socket in the pool and reset the pool's bookkeeping.
#
# An IOError raised while closing an individual socket is reported with
# `warn` and does not stop the remaining sockets from being closed.
# Afterwards @host and @port are nil and the socket, pid and checked-out
# collections are empty. All of this happens under the pool mutex.
def close
  @connection_mutex.synchronize do
    @sockets.each do |open_socket|
      begin
        open_socket.close
      rescue IOError => err
        warn "IOError when attempting to close socket connected to #{@host}:#{@port}: #{err.inspect}"
      end
    end

    # Forget the endpoint, then empty the bookkeeping collections.
    @host = nil
    @port = nil
    @sockets.clear
    @pids.clear
    @checked_out.clear
  end
end
# File lib/mongo/util/pool.rb, line 70
#
# Human-readable summary of the pool: object id, host, port, last measured
# ping time and how many of the pool's @size slots are currently free.
#
# The figure shown before the slash is the number of sockets still
# available (@size minus the checked-out count). The original printed the
# checked-out count there while labelling it "available".
def inspect
  available = @size - @checked_out.size

  "#<Mongo::Pool:0x#{object_id.to_s(16)} @host=#{@host} @port=#{@port} " +
    "@ping_time=#{@ping_time} #{available}/#{@size} sockets available.>"
end
Store the logout op for each existing socket to be applied before the next use of each socket.
# File lib/mongo/util/pool.rb, line 172
#
# Queue a deferred logout from the given database for every socket
# currently in the pool. Nothing is sent here: each queued proc calls
# @connection.db(db).issue_logout for its socket when it is invoked
# (checkout runs the queued procs for a socket before handing it out).
#
# db - passed through unchanged to @connection.db.
def logout_existing(db)
  @connection_mutex.synchronize do
    @sockets.each do |sock|
      deferred_logout = Proc.new { @connection.db(db).issue_logout(:socket => sock) }
      @socket_ops[sock] << deferred_logout
    end
  end
end
Refresh ping time only if we haven't checked within the last five minutes.
# File lib/mongo/util/pool.rb, line 85
#
# Return the cached ping time, re-measuring it via refresh_ping_time when
# it has never been measured or the last measurement is more than 300
# seconds old. The measurement timestamp is recorded before the refresh
# runs.
def ping_time
  stale = !@last_ping || (Time.now - @last_ping > 300)
  return @ping_time unless stale

  @last_ping = Time.now
  @ping_time = refresh_ping_time
end
Return the time it takes on average to do a round-trip against this node.
# File lib/mongo/util/pool.rb, line 99
#
# Measure the average round-trip time to this node.
#
# Issues a {:ping => 1} command against the 'admin' database PING_ATTEMPTS
# times over the node's socket, timing each call in milliseconds. The
# fastest and slowest samples are discarded and the mean of the rest is
# returned, rounded up to an integer.
#
# Returns nil if any ping raises OperationFailure, SocketError,
# SystemCallError or IOError.
def refresh_ping_time
  samples = []

  begin
    PING_ATTEMPTS.times do
      started_at = Time.now
      connection['admin'].command({:ping => 1}, :socket => @node.socket)
      samples << (Time.now - started_at) * 1000
    end
  rescue OperationFailure, SocketError, SystemCallError, IOError
    return nil
  end

  # Drop the outliers at both ends of the sorted samples.
  samples.sort!
  samples.pop
  samples.shift

  sum = samples.inject(0.0) { |acc, millis| acc + millis }
  (sum / samples.length).ceil
end