DeepCover
v0.1.16
| 1 | # frozen_string_literal: true |
| 2 | |
| 3 | require "thread" |
| 4 | require "concurrent/map" |
| 5 | require "monitor" |
| 6 | |
| 7 | module ActiveRecord |
| 8 | # Raised when a connection could not be obtained within the connection |
| 9 | # acquisition timeout period: because max connections in pool |
| 10 | # are in use. |
class ConnectionTimeoutError < ConnectionNotEstablished; end
| 13 | |
| 14 | # Raised when a pool was unable to get ahold of all its connections |
| 15 | # to perform a "group" action such as |
| 16 | # {ActiveRecord::Base.connection_pool.disconnect!}[rdoc-ref:ConnectionAdapters::ConnectionPool#disconnect!] |
| 17 | # or {ActiveRecord::Base.clear_reloadable_connections!}[rdoc-ref:ConnectionAdapters::ConnectionHandler#clear_reloadable_connections!]. |
class ExclusiveConnectionTimeoutError < ConnectionTimeoutError; end
| 20 | |
| 21 | module ConnectionAdapters |
| 22 | # Connection pool base class for managing Active Record database |
| 23 | # connections. |
| 24 | # |
| 25 | # == Introduction |
| 26 | # |
| 27 | # A connection pool synchronizes thread access to a limited number of |
| 28 | # database connections. The basic idea is that each thread checks out a |
| 29 | # database connection from the pool, uses that connection, and checks the |
| 30 | # connection back in. ConnectionPool is completely thread-safe, and will |
| 31 | # ensure that a connection cannot be used by two threads at the same time, |
| 32 | # as long as ConnectionPool's contract is correctly followed. It will also |
| 33 | # handle cases in which there are more threads than connections: if all |
| 34 | # connections have been checked out, and a thread tries to checkout a |
| 35 | # connection anyway, then ConnectionPool will wait until some other thread |
| 36 | # has checked in a connection. |
| 37 | # |
| 38 | # == Obtaining (checking out) a connection |
| 39 | # |
| 40 | # Connections can be obtained and used from a connection pool in several |
| 41 | # ways: |
| 42 | # |
| 43 | # 1. Simply use {ActiveRecord::Base.connection}[rdoc-ref:ConnectionHandling.connection] |
| 44 | # as with Active Record 2.1 and |
| 45 | # earlier (pre-connection-pooling). Eventually, when you're done with |
| 46 | # the connection(s) and wish it to be returned to the pool, you call |
| 47 | # {ActiveRecord::Base.clear_active_connections!}[rdoc-ref:ConnectionAdapters::ConnectionHandler#clear_active_connections!]. |
| 48 | # This will be the default behavior for Active Record when used in conjunction with |
| 49 | # Action Pack's request handling cycle. |
| 50 | # 2. Manually check out a connection from the pool with |
| 51 | # {ActiveRecord::Base.connection_pool.checkout}[rdoc-ref:#checkout]. You are responsible for |
| 52 | # returning this connection to the pool when finished by calling |
| 53 | # {ActiveRecord::Base.connection_pool.checkin(connection)}[rdoc-ref:#checkin]. |
| 54 | # 3. Use {ActiveRecord::Base.connection_pool.with_connection(&block)}[rdoc-ref:#with_connection], which |
| 55 | # obtains a connection, yields it as the sole argument to the block, |
| 56 | # and returns it to the pool after the block completes. |
| 57 | # |
| 58 | # Connections in the pool are actually AbstractAdapter objects (or objects |
| 59 | # compatible with AbstractAdapter's interface). |
| 60 | # |
| 61 | # == Options |
| 62 | # |
| 63 | # There are several connection-pooling-related options that you can add to |
| 64 | # your database connection configuration: |
| 65 | # |
| 66 | # * +pool+: maximum number of connections the pool may manage (default 5). |
| 67 | # * +idle_timeout+: number of seconds that a connection will be kept |
| 68 | # unused in the pool before it is automatically disconnected (default |
| 69 | # 300 seconds). Set this to zero to keep connections forever. |
| 70 | # * +checkout_timeout+: number of seconds to wait for a connection to |
| 71 | # become available before giving up and raising a timeout error (default |
| 72 | # 5 seconds). |
| 73 | # |
| 74 | #-- |
| 75 | # Synchronization policy: |
| 76 | # * all public methods can be called outside +synchronize+ |
| 77 | # * access to these instance variables needs to be in +synchronize+: |
| 78 | # * @connections |
| 79 | # * @now_connecting |
| 80 | # * private methods that must be called inside a +synchronize+ block |
| 81 | # are now explicitly documented |
| 82 | class ConnectionPool |
| 83 | # Threadsafe, fair, LIFO queue. Meant to be used by ConnectionPool |
| 84 | # with which it shares a Monitor. |
class Queue
  # +lock+ is the monitor guarding all queue state; the condition
  # variable used to park waiting threads is created from it.
  def initialize(lock = Monitor.new)
    @lock = lock
    @cond = @lock.new_cond
    @num_waiting = 0
    @queue = []
  end

  # Test if any threads are currently waiting on the queue.
  def any_waiting?
    synchronize do
      @num_waiting > 0
    end
  end

  # Returns the number of threads currently waiting on this
  # queue.
  def num_waiting
    synchronize do
      @num_waiting
    end
  end

  # Add +element+ to the queue. Never blocks.
  def add(element)
    synchronize do
      @queue.push element
      # wake one waiter so it can pick the element up
      @cond.signal
    end
  end

  # If +element+ is in the queue, remove and return it, or +nil+.
  def delete(element)
    synchronize do
      @queue.delete(element)
    end
  end

  # Remove all elements from the queue.
  def clear
    synchronize do
      @queue.clear
    end
  end

  # Remove the head of the queue.
  #
  # If +timeout+ is not given, remove and return the head of the
  # queue if the number of available elements is strictly
  # greater than the number of threads currently waiting (that
  # is, don't jump ahead in line). Otherwise, return +nil+.
  #
  # If +timeout+ is given, block if there is no element
  # available, waiting up to +timeout+ seconds for an element to
  # become available.
  #
  # Raises:
  # - ActiveRecord::ConnectionTimeoutError if +timeout+ is given and no element
  #   becomes available within +timeout+ seconds.
  def poll(timeout = nil)
    synchronize { internal_poll(timeout) }
  end

  private

    # Tries the non-blocking path first and only waits when a
    # +timeout+ was supplied. Must be called while holding the lock.
    def internal_poll(timeout)
      no_wait_poll || (timeout && wait_poll(timeout))
    end

    def synchronize(&block)
      @lock.synchronize(&block)
    end

    # Test if the queue currently contains any elements.
    def any?
      !@queue.empty?
    end

    # A thread can remove an element from the queue without
    # waiting if and only if the number of currently available
    # connections is strictly greater than the number of waiting
    # threads.
    def can_remove_no_wait?
      @queue.size > @num_waiting
    end

    # Removes and returns the most recently added element, or +nil+
    # when the queue is empty.
    def remove
      @queue.pop
    end

    # Remove and return the head of the queue if the number of
    # available elements is strictly greater than the number of
    # threads currently waiting. Otherwise, return +nil+.
    def no_wait_poll
      remove if can_remove_no_wait?
    end

    # Waits on the queue up to +timeout+ seconds, then removes and
    # returns the head of the queue.
    #
    # Elapsed time is measured on the monotonic clock so that
    # adjustments of the system (wall) clock can neither cut the
    # wait short nor extend it.
    #
    # Raises ConnectionTimeoutError when nothing became available
    # within +timeout+ seconds.
    def wait_poll(timeout)
      @num_waiting += 1

      started_at = monotonic_now
      elapsed = 0
      loop do
        @cond.wait(timeout - elapsed)

        return remove if any?

        # woken up (or timed out) without an element: re-check the budget
        elapsed = monotonic_now - started_at
        if elapsed >= timeout
          msg = "could not obtain a connection from the pool within %0.3f seconds (waited %0.3f seconds); all pooled connections were in use" %
            [timeout, elapsed]
          raise ConnectionTimeoutError, msg
        end
      end
    ensure
      @num_waiting -= 1
    end

    # Current reading of the monotonic clock, in (fractional) seconds.
    def monotonic_now
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
end
| 206 | |
| 207 | # Adds the ability to turn a basic fair LIFO queue into one |
| 208 | # biased to some thread. |
module BiasableQueue # :nodoc:
  # Condition variable wrapper: the preferred thread waits on a private
  # condition, every other thread waits on the wrapped +other_cond+.
  # +signal+ serves the private condition first while it has waiters.
  class BiasedConditionVariable # :nodoc:
    # As with any condition variable, +broadcast+, +broadcast_on_biased+,
    # +signal+ and +wait+ are expected to be called with the lock held.
    def initialize(lock, other_cond, preferred_thread)
      @real_cond = lock.new_cond
      @other_cond = other_cond
      @preferred_thread = preferred_thread
      @num_waiting_on_real_cond = 0
    end

    # Wakes everybody: the preferred thread's condition and the wrapped one.
    def broadcast
      broadcast_on_biased
      @other_cond.broadcast
    end

    # Wakes only waiters parked on the private (biased) condition.
    def broadcast_on_biased
      @num_waiting_on_real_cond = 0
      @real_cond.broadcast
    end

    # Signals the private condition if it has a recorded waiter,
    # otherwise falls through to the wrapped condition.
    def signal
      if @num_waiting_on_real_cond > 0
        @num_waiting_on_real_cond -= 1
        @real_cond.signal
      else
        @other_cond.signal
      end
    end

    # Parks the calling thread on the condition that matches its role.
    def wait(timeout)
      return @other_cond.wait(timeout) unless Thread.current == @preferred_thread

      @num_waiting_on_real_cond += 1
      @real_cond.wait(timeout)
    end
  end

  # Runs the block with the queue's condition variable temporarily
  # replaced by one biased towards +thread+; the previous condition is
  # put back afterwards, even if the block raises.
  def with_a_bias_for(thread)
    saved_cond = nil
    biased_cond = nil
    synchronize do
      saved_cond = @cond
      biased_cond = BiasedConditionVariable.new(@lock, saved_cond, thread)
      @cond = biased_cond
    end
    yield
  ensure
    synchronize do
      @cond = saved_cond if saved_cond
      # wake up any remaining sleepers
      biased_cond.broadcast_on_biased if biased_cond
    end
  end
end
| 264 | |
| 265 | # Connections must be leased while holding the main pool mutex. This is |
| 266 | # an internal subclass that also +.leases+ returned connections while |
| 267 | # still in queue's critical section (queue synchronizes with the same |
| 268 | # <tt>@lock</tt> as the main pool) so that a returned connection is already |
| 269 | # leased and there is no need to re-enter synchronized block. |
class ConnectionLeasingQueue < Queue # :nodoc:
  include BiasableQueue

  private
    # Polls like the parent queue, then leases whatever connection came
    # back before handing it to the caller. A falsy result (nothing
    # available) is passed through untouched.
    def internal_poll(timeout)
      super.tap { |conn| conn.lease if conn }
    end
end
| 280 | |
| 281 | # Every +frequency+ seconds, the reaper will call +reap+ and +flush+ on |
| 282 | # +pool+. A reaper instantiated with a zero frequency will never reap |
| 283 | # the connection pool. |
| 284 | # |
| 285 | # Configure the frequency by setting +reaping_frequency+ in your database |
| 286 | # yaml file (default 60 seconds). |
class Reaper
  attr_reader :pool, :frequency

  # +pool+ is the object to be reaped and flushed, +frequency+ the pause
  # in seconds between two passes.
  def initialize(pool, frequency)
    @pool = pool
    @frequency = frequency
  end

  # Starts a background thread that, forever, sleeps for +frequency+
  # seconds and then calls +reap+ and +flush+ on the pool. Returns that
  # thread, or +nil+ (starting nothing) when +frequency+ is missing or
  # not positive.
  def run
    if frequency && frequency > 0
      Thread.new(frequency, pool) do |interval, target_pool|
        loop do
          sleep interval
          target_pool.reap
          target_pool.flush
        end
      end
    end
  end
end
| 306 | |
| 307 | include MonitorMixin |
| 308 | include QueryCache::ConnectionPoolConfiguration |
| 309 | |
| 310 | attr_accessor :automatic_reconnect, :checkout_timeout, :schema_cache |
| 311 | attr_reader :spec, :connections, :size, :reaper |
| 312 | |
| 313 | # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification |
| 314 | # object which describes database connection information (e.g. adapter, |
| 315 | # host name, username, password, etc), as well as the maximum size for |
| 316 | # this ConnectionPool. |
| 317 | # |
| 318 | # The default ConnectionPool maximum size is 5. |
def initialize(spec)
  super()

  @spec = spec
  config = spec.config

  # seconds a checkout may wait; 5 when not configured
  timeout_setting = config[:checkout_timeout]
  @checkout_timeout = timeout_setting ? timeout_setting.to_f : 5

  # idle timeout defaults to 300 seconds; a value <= 0 disables it (nil)
  idle_setting = config.fetch(:idle_timeout, 300)
  if idle_setting
    idle_setting = idle_setting.to_f
    idle_setting = nil if idle_setting <= 0
  end
  @idle_timeout = idle_setting

  # default max pool size to 5
  pool_setting = config[:pool]
  @size = pool_setting ? pool_setting.to_i : 5

  # Cache of thread => reserved connection, kept only to make the
  # +connection+ method fast. It is not the authoritative ownership
  # registry: ownership lives in the +owner+ attr of each connection.
  # Invariant: a <tt>thread => conn</tt> entry means that +thread+ really
  # owns +conn+; a missing entry proves nothing, and +conn.owner+ must be
  # consulted instead. Reading and writing <tt>@thread_cached_conns</tt>
  # does not require synchronization.
  @thread_cached_conns = Concurrent::Map.new(initial_capacity: @size)

  @connections = []
  @automatic_reconnect = true

  # New connections may be established concurrently, outside the main
  # +synchronize+ section. This counts the threads that are currently in
  # the middle of establishing a connection to the DB.
  @now_connecting = 0

  @threads_blocking_new_connections = 0

  @available = ConnectionLeasingQueue.new self

  @lock_thread = false

  # +reaping_frequency+ is configurable mostly for historical reasons, but
  # it can also help when a very low +idle_timeout+ is wanted.
  reaping_frequency = config.fetch(:reaping_frequency, 60)
  @reaper = Reaper.new(self, reaping_frequency && reaping_frequency.to_f)
  @reaper.run
end
| 365 | |
def lock_thread=(lock_thread)
  # A truthy argument pins the pool to the calling thread; anything
  # else clears the pin.
  @lock_thread = lock_thread ? Thread.current : nil
end
| 373 | |
| 374 | # Retrieve the connection associated with the current thread, or call |
| 375 | # #checkout to obtain one if necessary. |
| 376 | # |
| 377 | # #connection can be called any number of times; the connection is |
| 378 | # held in a cache keyed by a thread. |
def connection
  # the pinned thread (if any) takes precedence over the calling thread
  cache_key = connection_cache_key(@lock_thread || Thread.current)
  @thread_cached_conns[cache_key] ||= checkout
end
| 382 | |
| 383 | # Returns true if there is an open connection being used for the current thread. |
| 384 | # |
| 385 | # This method only works for connections that have been obtained through |
| 386 | # #connection or #with_connection methods. Connections obtained through |
| 387 | # #checkout will not be detected by #active_connection? |
def active_connection?
  # truthy (the cached connection itself) when the calling thread has one
  cache_key = connection_cache_key(Thread.current)
  @thread_cached_conns[cache_key]
end
| 391 | |
| 392 | # Signal that the thread is finished with the current connection. |
| 393 | # #release_connection releases the connection-thread association |
| 394 | # and returns the connection to the pool. |
| 395 | # |
| 396 | # This method only works for connections that have been obtained through |
| 397 | # #connection or #with_connection methods, connections obtained through |
| 398 | # #checkout will not be automatically released. |
def release_connection(owner_thread = Thread.current)
  # drop the thread => connection association first, then hand the
  # connection (if there was one) back to the pool
  conn = @thread_cached_conns.delete(connection_cache_key(owner_thread))
  checkin conn if conn
end
| 404 | |
| 405 | # If a connection obtained through #connection or #with_connection methods |
| 406 | # already exists yield it to the block. If no such connection |
| 407 | # exists checkout a connection, yield it to the block, and checkin the |
| 408 | # connection when finished. |
def with_connection
  fresh_connection = false
  conn = @thread_cached_conns[connection_cache_key(Thread.current)]
  if !conn
    # nothing cached for this thread: obtain one and remember to give it back
    conn = connection
    fresh_connection = true
  end
  yield conn
ensure
  # only release what this call acquired itself
  release_connection if fresh_connection
end
| 418 | |
| 419 | # Returns true if a connection has already been opened. |
def connected?
  # @connections may only be read under the pool lock
  synchronize do
    @connections.any?
  end
end
| 423 | |
| 424 | # Disconnects all connections in the pool, and clears the pool. |
| 425 | # |
| 426 | # Raises: |
| 427 | # - ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all |
| 428 | # connections in the pool within a timeout interval (default duration is |
| 429 | # <tt>spec.config[:checkout_timeout] * 2</tt> seconds). |
def disconnect(raise_on_acquisition_timeout = true)
  with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
    synchronize do
      @connections.each do |pooled|
        # a connection that is still leased gets taken over and checked
        # back in before it is closed
        if pooled.in_use?
          pooled.steal!
          checkin pooled
        end
        pooled.disconnect!
      end

      # forget every connection, including those queued as available
      @connections = []
      @available.clear
    end
  end
end
| 445 | |
| 446 | # Disconnects all connections in the pool, and clears the pool. |
| 447 | # |
| 448 | # The pool first tries to gain ownership of all connections. If unable to |
| 449 | # do so within a timeout interval (default duration is |
| 450 | # <tt>spec.config[:checkout_timeout] * 2</tt> seconds), then the pool is forcefully |
| 451 | # disconnected without any regard for other connection owning threads. |
def disconnect!
  # false: do not raise when exclusive ownership cannot be acquired in time
  raise_on_acquisition_timeout = false
  disconnect(raise_on_acquisition_timeout)
end
| 455 | |
| 456 | # Discards all connections in the pool (even if they're currently |
| 457 | # leased!), along with the pool itself. Any further interaction with the |
| 458 | # pool (except #spec and #schema_cache) is undefined. |
| 459 | # |
| 460 | # See AbstractAdapter#discard! |
def discard! # :nodoc:
  synchronize do
    # a nil @connections marks a pool that was already discarded
    unless @connections.nil?
      @connections.each(&:discard!)
      @connections = @available = @thread_cached_conns = nil
    end
  end
end
| 470 | |
| 471 | # Clears the cache which maps classes and re-connects connections that |
| 472 | # require reloading. |
| 473 | # |
| 474 | # Raises: |
| 475 | # - ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all |
| 476 | # connections in the pool within a timeout interval (default duration is |
| 477 | # <tt>spec.config[:checkout_timeout] * 2</tt> seconds). |
def clear_reloadable_connections(raise_on_acquisition_timeout = true)
  with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
    synchronize do
      @connections.each do |pooled|
        # leased connections are taken over and checked back in first
        if pooled.in_use?
          pooled.steal!
          checkin pooled
        end
        pooled.disconnect! if pooled.requires_reloading?
      end

      # drop the connections that asked for a reload, then empty the
      # queue of available connections
      @connections.delete_if { |pooled| pooled.requires_reloading? }
      @available.clear
    end
  end
end
| 493 | |
| 494 | # Clears the cache which maps classes and re-connects connections that |
| 495 | # require reloading. |
| 496 | # |
| 497 | # The pool first tries to gain ownership of all connections. If unable to |
| 498 | # do so within a timeout interval (default duration is |
| 499 | # <tt>spec.config[:checkout_timeout] * 2</tt> seconds), then the pool forcefully |
| 500 | # clears the cache and reloads connections without any regard for other |
| 501 | # connection owning threads. |
def clear_reloadable_connections!
  # false: do not raise when exclusive ownership cannot be acquired in time
  raise_on_acquisition_timeout = false
  clear_reloadable_connections(raise_on_acquisition_timeout)
end
| 505 | |
| 506 | # Check-out a database connection from the pool, indicating that you want |
| 507 | # to use it. You should call #checkin when you no longer need this. |
| 508 | # |
| 509 | # This is done by either returning and leasing existing connection, or by |
| 510 | # creating a new connection and leasing it. |
| 511 | # |
| 512 | # If all connections are leased and the pool is at capacity (meaning the |
| 513 | # number of currently leased connections is greater than or equal to the |
| 514 | # size limit set), an ActiveRecord::ConnectionTimeoutError exception will be raised. |
| 515 | # |
| 516 | # Returns: an AbstractAdapter object. |
| 517 | # |
| 518 | # Raises: |
| 519 | # - ActiveRecord::ConnectionTimeoutError no connection can be obtained from the pool. |
def checkout(checkout_timeout = @checkout_timeout)
  # acquire first, then pass the result through +checkout_and_verify+
  acquired = acquire_connection(checkout_timeout)
  checkout_and_verify(acquired)
end
| 523 | |
| 524 | # Check-in a database connection back into the pool, indicating that you |
| 525 | # no longer need this connection. |
| 526 | # |
| 527 | # +conn+: an AbstractAdapter object, which was obtained earlier by |
| 528 | # calling #checkout on this pool. |
def checkin(conn)
  # lock order: the connection's own lock first, then the pool lock
  conn.lock.synchronize do
    synchronize do
      remove_connection_from_thread_cache conn

      # +expire+ runs wrapped in the connection's checkin callbacks
      conn._run_checkin_callbacks { conn.expire }

      @available.add conn
    end
  end
end
| 542 | |
| 543 | # Remove a connection from the connection pool. The connection will |
| 544 | # remain open and active but will no longer be managed by this pool. |
def remove(conn)
  waiters_present = synchronize do
    remove_connection_from_thread_cache conn

    @connections.delete conn
    @available.delete conn

    # Waiting threads at this point mean the pool was at its max size
    # (@connections.size == @size) before this removal. Threads parked
    # inside the Queue cannot create connections themselves -- the Queue
    # knows nothing about ConnectionPool -- and depend entirely on
    # connections being fed into it, so a replacement is created on
    # their behalf below.
    @available.any_waiting?
  end

  # Deliberately outside the synchronized section so the main mutex is
  # not held while a new connection is being established. The flag may
  # be stale by now, which is acceptable: bulk_make_new_connections
  # never exceeds the pool's @size limit.
  bulk_make_new_connections(1) if waiters_present
end
| 573 | |
| 574 | # Recover lost connections for the pool. A lost connection can occur if |
| 575 | # a programmer forgets to checkin a connection at the end of a thread |
| 576 | # or a thread dies unexpectedly. |
def reap
  # under the lock: find leased connections whose owner thread is no
  # longer alive and take them over
  orphaned = synchronize do
    dead_owned = @connections.select { |conn| conn.in_use? && !conn.owner.alive? }
    dead_owned.each(&:steal!)
  end

  # outside the lock: recycle the ones still usable, drop the rest
  orphaned.each do |conn|
    if conn.active?
      conn.reset!
      checkin conn
    else
      remove conn
    end
  end
end
| 595 | |
| 596 | # Disconnect all connections that have been idle for at least |
| 597 | # +minimum_idle+ seconds. Connections currently checked out, or that were |
| 598 | # checked in less than +minimum_idle+ seconds ago, are unaffected. |
def flush(minimum_idle = @idle_timeout)
  # nil means idle connections are never flushed
  return if minimum_idle.nil?

  expired = synchronize do
    candidates = @connections.select do |conn|
      !conn.in_use? && conn.seconds_idle >= minimum_idle
    end

    candidates.each do |conn|
      # lease it so nobody else can pick it up, then detach it from the pool
      conn.lease

      @available.delete conn
      @connections.delete conn
    end
  end

  # the actual disconnects happen outside the pool lock
  expired.each(&:disconnect!)
end
| 617 | |
| 618 | # Disconnect all currently idle connections. Connections currently checked |
| 619 | # out are unaffected. |
def flush!
  # recover connections held by dead threads first, so they count as idle
  reap
  # -1 makes every idle connection satisfy the "idle for at least" test
  minimum_idle = -1
  flush(minimum_idle)
end
| 624 | |
def num_waiting_in_queue # :nodoc:
  # delegates to the queue of available connections
  waiting_queue = @available
  waiting_queue.num_waiting
end
| 628 | |
| 629 | # Return connection pool's usage statistic |
| 630 | # Example: |
| 631 | # |
| 632 | # ActiveRecord::Base.connection_pool.stat # => { size: 15, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 } |
def stat
  synchronize do
    # leased and owner thread alive / leased but owner thread dead / not leased
    busy_count = @connections.count { |c| c.in_use? && c.owner.alive? }
    dead_count = @connections.count { |c| c.in_use? && !c.owner.alive? }
    idle_count = @connections.count { |c| !c.in_use? }

    {
      size: size,
      connections: @connections.size,
      busy: busy_count,
      dead: dead_count,
      idle: idle_count,
      waiting: num_waiting_in_queue,
      checkout_timeout: checkout_timeout
    }
  end
end
| 646 | |
| 647 | private |
| 648 | #-- |
| 649 | # this is unfortunately not concurrent |
def bulk_make_new_connections(num_new_conns_needed)
  num_new_conns_needed.times do
    # try_to_checkout_new_connection will not exceed pool's @size limit,
    # so it may come back empty-handed
    new_conn = try_to_checkout_new_connection

    # checking the new connection in makes it available to threads
    # stuck waiting on the @available queue
    checkin(new_conn) if new_conn
  end
end
| 659 | |
| 660 | #-- |
| 661 | # From the discussion on GitHub: |
| 662 | # https://github.com/rails/rails/pull/14938#commitcomment-6601951 |
| 663 | # This hook-in method allows for easier monkey-patching fixes needed by |
| 664 | # JRuby users that use Fibers. |
def connection_cache_key(thread)
  # identity mapping: the thread object itself is the cache key
  thread
end
| 668 | |
| 669 | # Take control of all existing connections so a "group" action such as |
| 670 | # reload/disconnect can be performed safely. It is no longer enough to |
| 671 | # wrap it in +synchronize+ because some pool's actions are allowed |
| 672 | # to be performed outside of the main +synchronize+ block. |
def with_exclusively_acquired_all_connections(raise_on_acquisition_timeout = true)
  with_new_connections_blocked do
    # with new connections blocked, try to take over the existing ones
    # before running the caller's block
    attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout)
    yield
  end
end
| 679 | |
# Checks out every connection the pool currently has, so that the calling
# thread ends up owning all of them.
#
# The overall time budget is twice <tt>@checkout_timeout</tt>. The deadline
# is tracked on the monotonic clock, so adjustments of the system (wall)
# clock can neither collapse nor extend the remaining timeout.
#
# When the budget runs out, ExclusiveConnectionTimeoutError is re-raised
# only if +raise_on_acquisition_timeout+ is true; otherwise the method
# gives up quietly and keeps whatever it managed to check out.
def attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout = true)
  collected_conns = synchronize do
    # account for our own connections
    @connections.select { |conn| conn.owner == Thread.current }
  end

  newly_checked_out = []
  timeout_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + (@checkout_timeout * 2)

  @available.with_a_bias_for(Thread.current) do
    loop do
      synchronize do
        # done once we hold everything and nobody is mid-way through
        # establishing another connection
        return if collected_conns.size == @connections.size && @now_connecting == 0
        remaining_timeout = timeout_time - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        remaining_timeout = 0 if remaining_timeout < 0
        conn = checkout_for_exclusive_access(remaining_timeout)
        collected_conns << conn
        newly_checked_out << conn
      end
    end
  end
rescue ExclusiveConnectionTimeoutError
  # <tt>raise_on_acquisition_timeout == false</tt> means we are directed to ignore any
  # timeouts and are expected to just give up: we've obtained as many connections
  # as possible. Note that in a case like that we don't return any of the
  # +newly_checked_out+ connections.

  if raise_on_acquisition_timeout
    release_newly_checked_out = true
    raise
  end
rescue Exception # if something else went wrong
  # this can't be a "naked" rescue, because we should return conns
  # even for non-StandardErrors
  release_newly_checked_out = true
  raise
ensure
  if release_newly_checked_out && newly_checked_out
    # releasing only those conns that were checked out in this method, conns
    # checked outside this method (before it was called) are not for us to release
    newly_checked_out.each { |conn| checkin(conn) }
  end
end
| 723 | |
| 724 | #-- |
| 725 | # Must be called in a synchronize block. |
def checkout_for_exclusive_access(checkout_timeout)
  checkout(checkout_timeout)
rescue ConnectionTimeoutError
  # The report is built here rather than in the rescue block of
  # attempt_to_checkout_all_existing_connections because that would be
  # outside the synchronize section, and outside a critical section the
  # thread report might become inaccurate.
  msg = "could not obtain ownership of all database connections in #{checkout_timeout} seconds".dup

  # one entry per connection not owned by the calling thread
  foreign = @connections.reject { |conn| conn.owner == Thread.current }
  thread_report = foreign.map { |conn| "#{conn} is owned by #{conn.owner}" }

  msg << " (#{thread_report.join(', ')})" if thread_report.any?

  raise ExclusiveConnectionTimeoutError, msg
end
| 745 | |
def with_new_connections_blocked
  # a non-zero counter blocks the creation of new connections
  synchronize { @threads_blocking_new_connections += 1 }

  yield
ensure
  shortfall = 0

  synchronize do
    @threads_blocking_new_connections -= 1

    # the last blocker to leave rebuilds the queue of available connections
    if @threads_blocking_new_connections.zero?
      @available.clear

      # start from the number of waiting threads and subtract every
      # connection that can be handed out right away
      shortfall = num_waiting_in_queue

      @connections.each do |conn|
        unless conn.in_use?
          @available.add conn
          shortfall -= 1
        end
      end
    end
  end

  # whatever is still missing is created outside the lock
  bulk_make_new_connections(shortfall) if shortfall > 0
end
| 774 | |
| 775 | # Acquire a connection by one of 1) immediately removing one |
| 776 | # from the queue of available connections, 2) creating a new |
| 777 | # connection if the pool is not at capacity, 3) waiting on the |
| 778 | # queue for a connection to become available. |
| 779 | # |
| 780 | # Raises: |
| 781 | # - ActiveRecord::ConnectionTimeoutError if a connection could not be acquired |
| 782 | # |
| 783 | #-- |
| 784 | # Implementation detail: the connection returned by +acquire_connection+ |
| 785 | # will already be "+connection.lease+ -ed" to the current thread. |
def acquire_connection(checkout_timeout)
  # NOTE: leasing the returned connection (inside a +synchronized+
  # section) is left to <tt>@available.poll</tt> and
  # +try_to_checkout_new_connection+. Doing <tt>synchronize { conn.lease }</tt>
  # here would be cleaner, but piggybacking on the +synchronize+ sections
  # of those methods saves an extra +synchronize+ round.
  conn = @available.poll || try_to_checkout_new_connection
  return conn if conn

  # nothing free and no room to grow: recover connections from dead
  # threads, then wait on the queue
  reap
  @available.poll(checkout_timeout)
end
| 800 | |
| 801 | #-- |
| 802 | # if owner_thread param is omitted, this must be called in synchronize block |
def remove_connection_from_thread_cache(conn, owner_thread = conn.owner)
  # delete_pair only removes the entry if the cache slot for that owner
  # still holds this exact connection.
  cache_key = connection_cache_key(owner_thread)
  @thread_cached_conns.delete_pair(cache_key, conn)
end
alias_method :release, :remove_connection_from_thread_cache
| 807 | |
# Builds a brand-new adapter connection by calling, on Base, the adapter
# factory method named by the pool's spec with the spec's config. When the
# pool has a schema cache, the connection receives its own copy of it.
#
# Returns the new connection.
def new_connection
  fresh = Base.send(spec.adapter_method, spec.config)
  fresh.schema_cache = schema_cache.dup if schema_cache
  fresh
end
| 813 | |
| 814 | # If the pool is not at a <tt>@size</tt> limit, establish new connection. Connecting |
| 815 | # to the DB is done outside main synchronized section. |
| 816 | #-- |
| 817 | # Implementation constraint: a newly established connection returned by this |
| 818 | # method must be in the +.leased+ state. |
def try_to_checkout_new_connection
  # First, inside the synchronized section, check that establishing new
  # connections is allowed and reserve a slot by incrementing
  # @now_connecting, so concurrent callers cannot overstep this pool's
  # @size constraint.
  slot_reserved = synchronize do
    if @threads_blocking_new_connections.zero? && (@connections.size + @now_connecting) < @size
      @now_connecting += 1
    end
  end
  return unless slot_reserved

  begin
    # The slot is reserved; establish the connection outside of the
    # synchronized section.
    conn = checkout_new_connection
  ensure
    synchronize do
      # Release the reserved slot before anything else: if adopting or
      # leasing raises, the slot must not stay counted forever, otherwise
      # the pool would permanently lose capacity.
      @now_connecting -= 1

      if conn
        adopt_connection(conn)
        # returned conn needs to be already leased
        conn.lease
      end
    end
  end
end
| 845 | |
# Makes +conn+ a member of this pool: points the connection's +pool+ at
# this object and appends it to the pool's list of connections.
#
# Returns the connection list.
def adopt_connection(conn)
  conn.pool = self
  @connections.push(conn)
end
| 850 | |
# Creates a connection via +new_connection+, but only while
# <tt>@automatic_reconnect</tt> is truthy.
#
# Raises ConnectionNotEstablished when automatic reconnection is off.
def checkout_new_connection
  if @automatic_reconnect
    new_connection
  else
    raise ConnectionNotEstablished
  end
end
| 855 | |
# Runs the connection's checkout callbacks around a +verify!+ call and
# returns the connection.
#
# If anything raises a StandardError along the way, the connection is
# passed to +remove+, disconnected, and the original error is re-raised.
def checkout_and_verify(c)
  c._run_checkout_callbacks { c.verify! }
  c
rescue StandardError
  remove(c)
  c.disconnect!
  raise
end
| 866 | end |
| 867 | |
| 868 | # ConnectionHandler is a collection of ConnectionPool objects. It is used |
| 869 | # for keeping separate connection pools that connect to different databases. |
| 870 | # |
| 871 | # For example, suppose that you have 5 models, with the following hierarchy: |
| 872 | # |
| 873 | # class Author < ActiveRecord::Base |
| 874 | # end |
| 875 | # |
| 876 | # class BankAccount < ActiveRecord::Base |
| 877 | # end |
| 878 | # |
| 879 | # class Book < ActiveRecord::Base |
| 880 | # establish_connection :library_db |
| 881 | # end |
| 882 | # |
| 883 | # class ScaryBook < Book |
| 884 | # end |
| 885 | # |
| 886 | # class GoodBook < Book |
| 887 | # end |
| 888 | # |
| 889 | # And a database.yml that looked like this: |
| 890 | # |
| 891 | # development: |
| 892 | # database: my_application |
| 893 | # host: localhost |
| 894 | # |
| 895 | # library_db: |
| 896 | # database: library |
| 897 | # host: some.library.org |
| 898 | # |
| 899 | # Your primary database in the development environment is "my_application" |
| 900 | # but the Book model connects to a separate database called "library_db" |
| 901 | # (this can even be a database on a different machine). |
| 902 | # |
| 903 | # Book, ScaryBook and GoodBook will all use the same connection pool to |
| 904 | # "library_db" while Author, BankAccount, and any other models you create |
| 905 | # will use the default connection pool to "my_application". |
| 906 | # |
| 907 | # The various connection pools are managed by a single instance of |
| 908 | # ConnectionHandler accessible via ActiveRecord::Base.connection_handler. |
| 909 | # All Active Record models use this handler to determine the connection pool that they |
| 910 | # should use. |
| 911 | # |
| 912 | # The ConnectionHandler class is not coupled with the Active Record models, as it has no knowledge |
| 913 | # about the model. The model needs to pass a specification name to the handler, |
| 914 | # in order to look up the correct connection pool. |
class ConnectionHandler
  # Builds the finalizer proc for a handler. It only closes over +pid_map+
  # (not the handler itself) and discards pools owned by other processes.
  def self.unowned_pool_finalizer(pid_map) # :nodoc:
    lambda do |_|
      discard_unowned_pools(pid_map)
    end
  end

  # Calls +discard!+ on every pool in +pid_map+ that is registered under a
  # pid other than the current process.
  def self.discard_unowned_pools(pid_map) # :nodoc:
    pid_map.each do |pid, pools|
      pools.values.compact.each(&:discard!) unless pid == Process.pid
    end
  end

  def initialize
    # Two-level cache: pid => (spec.name => pool). The inner maps are keyed
    # by spec.name (ConnectionSpecification#name).
    @owner_to_pool = Concurrent::Map.new(initial_capacity: 2) do |h, k|
      # Discard the parent's connection pools immediately; we have no need
      # of them
      ConnectionHandler.discard_unowned_pools(h)

      h[k] = Concurrent::Map.new(initial_capacity: 2)
    end

    # Backup finalizer: if the forked child never needed a pool, the above
    # early discard has not occurred
    ObjectSpace.define_finalizer self, ConnectionHandler.unowned_pool_finalizer(@owner_to_pool)
  end

  # Returns the pools registered for the current process, skipping the
  # +nil+ placeholders cached for unknown spec names.
  def connection_pool_list
    owner_to_pool.values.compact
  end
  alias :connection_pools :connection_pool_list

  # Resolves +config+ into a spec, tears down any pool already registered
  # under that spec's name, and registers a fresh ConnectionPool for it.
  # The pool creation is instrumented as "!connection.active_record".
  #
  # Returns the newly registered pool.
  def establish_connection(config)
    resolver = ConnectionSpecification::Resolver.new(Base.configurations)
    spec = resolver.spec(config)

    remove_connection(spec.name)

    message_bus = ActiveSupport::Notifications.instrumenter
    # spec has already been dereferenced above, so it is known to be present.
    payload = {
      connection_id: object_id,
      spec_name: spec.name,
      config: spec.config
    }

    message_bus.instrument("!connection.active_record", payload) do
      owner_to_pool[spec.name] = ConnectionAdapters::ConnectionPool.new(spec)
    end

    owner_to_pool[spec.name]
  end

  # Returns true if there are any active connections among the connection
  # pools that the ConnectionHandler is managing.
  def active_connections?
    connection_pool_list.any?(&:active_connection?)
  end

  # Returns any connections in use by the current thread back to the pool,
  # and also returns connections to the pool cached by threads that are no
  # longer alive.
  def clear_active_connections!
    connection_pool_list.each(&:release_connection)
  end

  # Clears reloadable connections in every managed pool.
  #
  # See ConnectionPool#clear_reloadable_connections! for details.
  def clear_reloadable_connections!
    connection_pool_list.each(&:clear_reloadable_connections!)
  end

  # Disconnects every managed pool.
  def clear_all_connections!
    connection_pool_list.each(&:disconnect!)
  end

  # Disconnects all currently idle connections.
  #
  # See ConnectionPool#flush! for details.
  def flush_idle_connections!
    connection_pool_list.each(&:flush!)
  end

  # Returns the connection of the pool registered under +spec_name+.
  #
  # Raises ConnectionNotEstablished when no pool is registered under that
  # name, or when the pool yields no connection.
  def retrieve_connection(spec_name) #:nodoc:
    pool = retrieve_connection_pool(spec_name)
    raise ConnectionNotEstablished, "No connection pool with '#{spec_name}' found." unless pool
    conn = pool.connection
    raise ConnectionNotEstablished, "No connection for '#{spec_name}' in connection pool" unless conn
    conn
  end

  # Returns true if the pool registered under +spec_name+ reports that it
  # is connected; falsy when there is no such pool.
  def connected?(spec_name)
    pool = retrieve_connection_pool(spec_name)
    pool && pool.connected?
  end

  # Unregisters the pool for +spec_name+, if any: automatic reconnection is
  # switched off and the pool is disconnected. The result (the removed
  # pool's spec config, or +nil+ when nothing was registered) can be used
  # as an argument for #establish_connection, for easily re-establishing
  # the connection.
  def remove_connection(spec_name)
    if pool = owner_to_pool.delete(spec_name)
      pool.automatic_reconnect = false
      pool.disconnect!
      pool.spec.config
    end
  end

  # Retrieving the connection pool happens a lot, so we cache it in @owner_to_pool.
  # This makes retrieving the connection pool O(1) once the process is warm.
  # When a connection is established or removed, we invalidate the cache.
  def retrieve_connection_pool(spec_name)
    owner_to_pool.fetch(spec_name) do
      # Check if a connection was previously established in an ancestor process,
      # which may have been forked.
      ancestor_pool = pool_from_any_process_for(spec_name)

      if ancestor_pool
        # A connection was established in an ancestor process that must have
        # subsequently forked. We can't reuse the connection, but we can copy
        # the specification and establish a new connection with it.
        establish_connection(ancestor_pool.spec.to_hash).tap do |pool|
          pool.schema_cache = ancestor_pool.schema_cache if ancestor_pool.schema_cache
        end
      else
        # Cache the miss so later lookups return immediately.
        owner_to_pool[spec_name] = nil
      end
    end
  end

  private

    # The spec.name => pool map belonging to the current process.
    def owner_to_pool
      @owner_to_pool[Process.pid]
    end

    # Searches the per-process maps, most recently listed first, and returns
    # the first pool registered under +spec_name+, or +nil+ if none has one.
    def pool_from_any_process_for(spec_name)
      @owner_to_pool.values.reverse_each do |pools|
        pool = pools[spec_name]
        return pool if pool
      end
      nil
    end
end
| 1063 | end |
| 1064 | end |