NodesCharsBranches
1230 / 12989765 / 1021645 / 64
DeepCover v0.1.16
1# frozen_string_literal: true
2
3require "thread"
4require "concurrent/map"
5require "monitor"
6
7module ActiveRecord
# Raised when no connection could be obtained from the pool within the
# checkout timeout because every pooled connection was already in use.
class ConnectionTimeoutError < ConnectionNotEstablished; end
13
# Raised when a pool could not take ownership of all of its connections in
# order to perform a "group" action such as
# {ActiveRecord::Base.connection_pool.disconnect!}[rdoc-ref:ConnectionAdapters::ConnectionPool#disconnect!]
# or {ActiveRecord::Base.clear_reloadable_connections!}[rdoc-ref:ConnectionAdapters::ConnectionHandler#clear_reloadable_connections!].
class ExclusiveConnectionTimeoutError < ConnectionTimeoutError; end
20
21 module ConnectionAdapters
22 # Connection pool base class for managing Active Record database
23 # connections.
24 #
25 # == Introduction
26 #
27 # A connection pool synchronizes thread access to a limited number of
28 # database connections. The basic idea is that each thread checks out a
29 # database connection from the pool, uses that connection, and checks the
30 # connection back in. ConnectionPool is completely thread-safe, and will
31 # ensure that a connection cannot be used by two threads at the same time,
32 # as long as ConnectionPool's contract is correctly followed. It will also
33 # handle cases in which there are more threads than connections: if all
34 # connections have been checked out, and a thread tries to checkout a
35 # connection anyway, then ConnectionPool will wait until some other thread
36 # has checked in a connection.
37 #
38 # == Obtaining (checking out) a connection
39 #
40 # Connections can be obtained and used from a connection pool in several
41 # ways:
42 #
43 # 1. Simply use {ActiveRecord::Base.connection}[rdoc-ref:ConnectionHandling.connection]
44 # as with Active Record 2.1 and
45 # earlier (pre-connection-pooling). Eventually, when you're done with
46 # the connection(s) and wish it to be returned to the pool, you call
47 # {ActiveRecord::Base.clear_active_connections!}[rdoc-ref:ConnectionAdapters::ConnectionHandler#clear_active_connections!].
48 # This will be the default behavior for Active Record when used in conjunction with
49 # Action Pack's request handling cycle.
50 # 2. Manually check out a connection from the pool with
51 # {ActiveRecord::Base.connection_pool.checkout}[rdoc-ref:#checkout]. You are responsible for
52 # returning this connection to the pool when finished by calling
53 # {ActiveRecord::Base.connection_pool.checkin(connection)}[rdoc-ref:#checkin].
54 # 3. Use {ActiveRecord::Base.connection_pool.with_connection(&block)}[rdoc-ref:#with_connection], which
55 # obtains a connection, yields it as the sole argument to the block,
56 # and returns it to the pool after the block completes.
57 #
58 # Connections in the pool are actually AbstractAdapter objects (or objects
59 # compatible with AbstractAdapter's interface).
60 #
61 # == Options
62 #
63 # There are several connection-pooling-related options that you can add to
64 # your database connection configuration:
65 #
66 # * +pool+: maximum number of connections the pool may manage (default 5).
67 # * +idle_timeout+: number of seconds that a connection will be kept
68 # unused in the pool before it is automatically disconnected (default
69 # 300 seconds). Set this to zero to keep connections forever.
70 # * +checkout_timeout+: number of seconds to wait for a connection to
71 # become available before giving up and raising a timeout error (default
72 # 5 seconds).
73 #
74 #--
75 # Synchronization policy:
76 # * all public methods can be called outside +synchronize+
77 # * access to these instance variables needs to be in +synchronize+:
78 # * @connections
79 # * @now_connecting
80 # * private methods that require being called in a +synchronize+ blocks
81 # are now explicitly documented
82 class ConnectionPool
83 # Threadsafe, fair, LIFO queue. Meant to be used by ConnectionPool
84 # with which it shares a Monitor.
class Queue
  # +lock+ is the monitor guarding every operation on this queue; the
  # condition variable used to park waiting threads is created from it.
  def initialize(lock = Monitor.new)
    @lock = lock
    @cond = @lock.new_cond
    @num_waiting = 0
    @queue = []
  end

  # Test if any threads are currently waiting on the queue.
  def any_waiting?
    synchronize { @num_waiting > 0 }
  end

  # Returns the number of threads currently waiting on this
  # queue.
  def num_waiting
    synchronize { @num_waiting }
  end

  # Add +element+ to the queue and wake up one waiting thread. Never blocks.
  def add(element)
    synchronize do
      @queue.push element
      @cond.signal
    end
  end

  # If +element+ is in the queue, remove and return it, or +nil+.
  def delete(element)
    synchronize { @queue.delete(element) }
  end

  # Remove all elements from the queue.
  def clear
    synchronize { @queue.clear }
  end

  # Remove the head of the queue.
  #
  # If +timeout+ is not given, remove and return the head of the
  # queue if the number of available elements is strictly
  # greater than the number of threads currently waiting (that
  # is, don't jump ahead in line). Otherwise, return +nil+.
  #
  # If +timeout+ is given, block if there is no element
  # available, waiting up to +timeout+ seconds for an element to
  # become available.
  #
  # Raises:
  # - ActiveRecord::ConnectionTimeoutError if +timeout+ is given and no element
  #   becomes available within +timeout+ seconds.
  def poll(timeout = nil)
    synchronize { internal_poll(timeout) }
  end

  private

  # Try the non-blocking path first; only wait when a +timeout+ was given.
  def internal_poll(timeout)
    no_wait_poll || (timeout && wait_poll(timeout))
  end

  def synchronize(&block)
    @lock.synchronize(&block)
  end

  # Test if the queue currently contains any elements.
  def any?
    !@queue.empty?
  end

  # A thread can remove an element from the queue without
  # waiting if and only if the number of currently available
  # connections is strictly greater than the number of waiting
  # threads.
  def can_remove_no_wait?
    @queue.size > @num_waiting
  end

  # Removes and returns the most recently added element, or +nil+ when
  # the queue is empty.
  def remove
    @queue.pop
  end

  # Remove and return the head of the queue if the number of
  # available elements is strictly greater than the number of
  # threads currently waiting. Otherwise, return +nil+.
  def no_wait_poll
    remove if can_remove_no_wait?
  end

  # Waits on the queue up to +timeout+ seconds, then removes and
  # returns the head of the queue.
  #
  # Elapsed time is measured on the monotonic clock so that adjustments to
  # the system (wall-clock) time can neither cut the wait short nor extend it.
  def wait_poll(timeout)
    @num_waiting += 1

    started_at = monotonic_now
    elapsed = 0
    loop do
      @cond.wait(timeout - elapsed)

      return remove if any?

      # Woken up (or timed out) without an element being available:
      # either give up or go back to waiting for the remaining time.
      elapsed = monotonic_now - started_at
      if elapsed >= timeout
        msg = "could not obtain a connection from the pool within %0.3f seconds (waited %0.3f seconds); all pooled connections were in use" %
          [timeout, elapsed]
        raise ConnectionTimeoutError, msg
      end
    end
  ensure
    @num_waiting -= 1
  end

  # Current reading of the monotonic clock, in (fractional) seconds.
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
206
# Adds the ability to temporarily bias a queue's wake-ups towards one
# particular thread. (The Queue this is mixed into pushes and pops at the
# same end, i.e. it hands out elements in LIFO order.)
module BiasableQueue # :nodoc:
  # Condition variable wrapper that parks the preferred thread on its own
  # private condition variable and everybody else on the wrapped one, so
  # that +signal+ can wake the preferred thread first.
  class BiasedConditionVariable # :nodoc:
    # semantics of condition variables guarantee that +broadcast+, +broadcast_on_biased+,
    # +signal+ and +wait+ methods are only called while holding a lock
    def initialize(lock, other_cond, preferred_thread)
      @real_cond = lock.new_cond
      @other_cond = other_cond
      @preferred_thread = preferred_thread
      @num_waiting_on_real_cond = 0
    end

    # Wake everybody: the preferred thread's waiters first, then the rest.
    def broadcast
      broadcast_on_biased
      @other_cond.broadcast
    end

    # Wake every waiter parked on the preferred thread's condition variable.
    def broadcast_on_biased
      @num_waiting_on_real_cond = 0
      @real_cond.broadcast
    end

    # Wake one waiter, giving priority to the preferred thread if it is
    # currently waiting.
    def signal
      if @num_waiting_on_real_cond > 0
        @num_waiting_on_real_cond -= 1
        @real_cond.signal
      else
        @other_cond.signal
      end
    end

    # Park the calling thread for up to +timeout+ seconds on whichever
    # condition variable corresponds to it.
    def wait(timeout)
      target_cond = @other_cond
      if Thread.current == @preferred_thread
        @num_waiting_on_real_cond += 1
        target_cond = @real_cond
      end
      target_cond.wait(timeout)
    end
  end

  # Runs the block with this queue's condition variable swapped for one
  # biased towards +thread+, restoring the previous one afterwards.
  def with_a_bias_for(thread)
    saved_cond = nil
    biased_cond = nil
    synchronize do
      saved_cond = @cond
      biased_cond = BiasedConditionVariable.new(@lock, saved_cond, thread)
      @cond = biased_cond
    end
    yield
  ensure
    synchronize do
      @cond = saved_cond if saved_cond
      # wake up any remaining sleepers
      biased_cond.broadcast_on_biased if biased_cond
    end
  end
end
264
265 # Connections must be leased while holding the main pool mutex. This is
266 # an internal subclass that also +.leases+ returned connections while
267 # still in queue's critical section (queue synchronizes with the same
268 # <tt>@lock</tt> as the main pool) so that a returned connection is already
269 # leased and there is no need to re-enter synchronized block.
class ConnectionLeasingQueue < Queue # :nodoc:
  include BiasableQueue

  private
    # Polls like Queue does, then leases whatever connection came back
    # before handing it to the caller. Runs inside the queue's
    # +synchronize+ section because it is invoked from Queue#poll.
    def internal_poll(timeout)
      super.tap { |conn| conn.lease if conn }
    end
end
280
281 # Every +frequency+ seconds, the reaper will call +reap+ and +flush+ on
282 # +pool+. A reaper instantiated with a zero frequency will never reap
283 # the connection pool.
284 #
285 # Configure the frequency by setting +reaping_frequency+ in your database
286 # yaml file (default 60 seconds).
class Reaper
  attr_reader :pool, :frequency

  # +pool+ is the object to reap and flush; +frequency+ is the number of
  # seconds to sleep between runs.
  def initialize(pool, frequency)
    @pool = pool
    @frequency = frequency
  end

  # Starts the background thread and returns it. Does nothing (returns
  # +nil+) when no positive frequency was configured.
  def run
    return unless frequency && frequency > 0

    Thread.new(frequency, pool) do |interval, target_pool|
      loop do
        sleep interval
        target_pool.reap
        target_pool.flush
      end
    end
  end
end
306
307 include MonitorMixin
308 include QueryCache::ConnectionPoolConfiguration
309
310 attr_accessor :automatic_reconnect, :checkout_timeout, :schema_cache
311 attr_reader :spec, :connections, :size, :reaper
312
313 # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
314 # object which describes database connection information (e.g. adapter,
315 # host name, username, password, etc), as well as the maximum size for
316 # this ConnectionPool.
317 #
318 # The default ConnectionPool maximum size is 5.
def initialize(spec)
  super()

  @spec = spec
  config = spec.config

  # Seconds to wait for a connection before giving up (default 5).
  configured_timeout = config[:checkout_timeout]
  @checkout_timeout = configured_timeout ? configured_timeout.to_f : 5

  # Seconds a connection may sit idle before #flush disconnects it
  # (default 300); a non-positive value disables idle flushing.
  idle = config.fetch(:idle_timeout, 300)
  idle = idle.to_f if idle
  @idle_timeout = (idle && idle <= 0) ? nil : idle

  # default max pool size to 5
  configured_size = config[:pool]
  @size = configured_size ? configured_size.to_i : 5

  # This variable tracks the cache of threads mapped to reserved connections, with the
  # sole purpose of speeding up the +connection+ method. It is not the authoritative
  # registry of which thread owns which connection. Connection ownership is tracked by
  # the +connection.owner+ attr on each +connection+ instance.
  # The invariant works like this: if there is mapping of <tt>thread => conn</tt>,
  # then that +thread+ does indeed own that +conn+. However, an absence of a such
  # mapping does not mean that the +thread+ doesn't own the said connection. In
  # that case +conn.owner+ attr should be consulted.
  # Access and modification of <tt>@thread_cached_conns</tt> does not require
  # synchronization.
  @thread_cached_conns = Concurrent::Map.new(initial_capacity: @size)

  @connections = []
  @automatic_reconnect = true

  # Connection pool allows for concurrent (outside the main +synchronize+ section)
  # establishment of new connections. This variable tracks the number of threads
  # currently in the process of independently establishing connections to the DB.
  @now_connecting = 0

  # Number of callers currently inside with_new_connections_blocked.
  @threads_blocking_new_connections = 0

  # The queue shares this pool's monitor.
  @available = ConnectionLeasingQueue.new self

  @lock_thread = false

  # +reaping_frequency+ is configurable mostly for historical reasons, but it could
  # also be useful if someone wants a very low +idle_timeout+.
  reaping_frequency = config.fetch(:reaping_frequency, 60)
  @reaper = Reaper.new(self, reaping_frequency && reaping_frequency.to_f)
  @reaper.run
end
365
# Pins the pool to the calling thread when +lock_thread+ is truthy (so that
# #connection keys its cache on that thread), and unpins it otherwise.
def lock_thread=(lock_thread)
  @lock_thread = lock_thread ? Thread.current : nil
end
373
374 # Retrieve the connection associated with the current thread, or call
375 # #checkout to obtain one if necessary.
376 #
377 # #connection can be called any number of times; the connection is
378 # held in a cache keyed by a thread.
def connection
  # The pinned thread (see #lock_thread=), when set, takes precedence over
  # the calling thread as the cache key.
  cache_key = connection_cache_key(@lock_thread || Thread.current)
  @thread_cached_conns[cache_key] ||= checkout
end
382
383 # Returns true if there is an open connection being used for the current thread.
384 #
385 # This method only works for connections that have been obtained through
386 # #connection or #with_connection methods. Connections obtained through
387 # #checkout will not be detected by #active_connection?
def active_connection?
  # Returns the cached connection itself (truthy) or nil, not a strict boolean.
  cache_key = connection_cache_key(Thread.current)
  @thread_cached_conns[cache_key]
end
391
392 # Signal that the thread is finished with the current connection.
393 # #release_connection releases the connection-thread association
394 # and returns the connection to the pool.
395 #
396 # This method only works for connections that have been obtained through
397 # #connection or #with_connection methods, connections obtained through
398 # #checkout will not be automatically released.
def release_connection(owner_thread = Thread.current)
  # Drop the thread => connection association first, then hand the
  # connection (if there was one) back to the pool.
  cached_conn = @thread_cached_conns.delete(connection_cache_key(owner_thread))
  checkin cached_conn if cached_conn
end
404
405 # If a connection obtained through #connection or #with_connection methods
406 # already exists yield it to the block. If no such connection
407 # exists checkout a connection, yield it to the block, and checkin the
408 # connection when finished.
def with_connection
  checked_out_here = false
  conn = @thread_cached_conns[connection_cache_key(Thread.current)]
  if conn.nil? || conn == false
    conn = connection
    # Only connections obtained by this call are released afterwards.
    checked_out_here = true
  end
  yield conn
ensure
  release_connection if checked_out_here
end
418
419 # Returns true if a connection has already been opened.
def connected?
  # @connections must only be read while holding the pool lock.
  synchronize do
    @connections.any?
  end
end
423
424 # Disconnects all connections in the pool, and clears the pool.
425 #
426 # Raises:
427 # - ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all
428 # connections in the pool within a timeout interval (default duration is
429 # <tt>spec.config[:checkout_timeout] * 2</tt> seconds).
def disconnect(raise_on_acquisition_timeout = true)
  with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
    synchronize do
      @connections.each do |pooled|
        # A connection still leased by someone else is taken over and
        # checked back in before being closed.
        if pooled.in_use?
          pooled.steal!
          checkin pooled
        end
        pooled.disconnect!
      end

      # Start over with an empty pool and an empty availability queue.
      @connections = []
      @available.clear
    end
  end
end
445
446 # Disconnects all connections in the pool, and clears the pool.
447 #
448 # The pool first tries to gain ownership of all connections. If unable to
449 # do so within a timeout interval (default duration is
450 # <tt>spec.config[:checkout_timeout] * 2</tt> seconds), then the pool is forcefully
451 # disconnected without any regard for other connection owning threads.
def disconnect!
  # Passing false for raise_on_acquisition_timeout.
  raise_on_acquisition_timeout = false
  disconnect(raise_on_acquisition_timeout)
end
455
456 # Discards all connections in the pool (even if they're currently
457 # leased!), along with the pool itself. Any further interaction with the
458 # pool (except #spec and #schema_cache) is undefined.
459 #
460 # See AbstractAdapter#discard!
def discard! # :nodoc:
  synchronize do
    return if @connections.nil? # already discarded

    @connections.each(&:discard!)

    # Drop every reference so the pool can no longer hand anything out.
    @connections = @available = @thread_cached_conns = nil
  end
end
470
471 # Clears the cache which maps classes and re-connects connections that
472 # require reloading.
473 #
474 # Raises:
475 # - ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all
476 # connections in the pool within a timeout interval (default duration is
477 # <tt>spec.config[:checkout_timeout] * 2</tt> seconds).
def clear_reloadable_connections(raise_on_acquisition_timeout = true)
  with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
    synchronize do
      @connections.each do |pooled|
        # Take over and check in anything still leased elsewhere.
        if pooled.in_use?
          pooled.steal!
          checkin pooled
        end
        pooled.disconnect! if pooled.requires_reloading?
      end

      # Forget the connections that were just disconnected; the
      # availability queue is emptied as well.
      @connections.delete_if { |pooled| pooled.requires_reloading? }
      @available.clear
    end
  end
end
493
494 # Clears the cache which maps classes and re-connects connections that
495 # require reloading.
496 #
497 # The pool first tries to gain ownership of all connections. If unable to
498 # do so within a timeout interval (default duration is
499 # <tt>spec.config[:checkout_timeout] * 2</tt> seconds), then the pool forcefully
500 # clears the cache and reloads connections without any regard for other
501 # connection owning threads.
def clear_reloadable_connections!
  # Passing false for raise_on_acquisition_timeout.
  raise_on_acquisition_timeout = false
  clear_reloadable_connections(raise_on_acquisition_timeout)
end
505
506 # Check-out a database connection from the pool, indicating that you want
507 # to use it. You should call #checkin when you no longer need this.
508 #
509 # This is done by either returning and leasing existing connection, or by
510 # creating a new connection and leasing it.
511 #
512 # If all connections are leased and the pool is at capacity (meaning the
513 # number of currently leased connections is greater than or equal to the
514 # size limit set), an ActiveRecord::ConnectionTimeoutError exception will be raised.
515 #
516 # Returns: an AbstractAdapter object.
517 #
518 # Raises:
519 # - ActiveRecord::ConnectionTimeoutError no connection can be obtained from the pool.
def checkout(checkout_timeout = @checkout_timeout)
  # Acquire first, then run the checkout callbacks / verification.
  acquired = acquire_connection(checkout_timeout)
  checkout_and_verify(acquired)
end
523
524 # Check-in a database connection back into the pool, indicating that you
525 # no longer need this connection.
526 #
527 # +conn+: an AbstractAdapter object, which was obtained by earlier by
528 # calling #checkout on this pool.
def checkin(conn)
  # Lock order: the connection's own lock first, then the pool lock.
  conn.lock.synchronize do
    synchronize do
      remove_connection_from_thread_cache conn

      conn._run_checkin_callbacks { conn.expire }

      # Make the connection available to other threads again.
      @available.add conn
    end
  end
end
542
543 # Remove a connection from the connection pool. The connection will
544 # remain open and active but will no longer be managed by this pool.
def remove(conn)
  waiters_present = synchronize do
    remove_connection_from_thread_cache conn

    @connections.delete conn
    @available.delete conn

    # @available.any_waiting? => true means that prior to removing this
    # conn, the pool was at its max size (@connections.size == @size).
    # Threads parked in the Queue cannot create connections themselves
    # (the Queue knows nothing about ConnectionPool), so they rely on us
    # to feed a replacement connection into the Queue on their behalf.
    @available.any_waiting?
  end

  # This is intentionally done outside of the synchronized section as we
  # would like not to hold the main mutex while checking out new connections.
  # The information may therefore be stale by now, which is acceptable:
  # bulk_make_new_connections will not exceed the pool's @size limit.
  bulk_make_new_connections(1) if waiters_present
end
573
574 # Recover lost connections for the pool. A lost connection can occur if
575 # a programmer forgets to checkin a connection at the end of a thread
576 # or a thread dies unexpectedly.
def reap
  stale_connections = synchronize do
    # #discard! sets @connections to nil; a reaper thread may still call us
    # afterwards, in which case there is nothing left to recover.
    return unless @connections

    # Take over every leased connection whose owning thread has died.
    @connections.select do |conn|
      conn.in_use? && !conn.owner.alive?
    end.each do |conn|
      conn.steal!
    end
  end

  # Done outside the pool lock: usable connections go back into the pool,
  # dead ones are dropped from it.
  stale_connections.each do |conn|
    if conn.active?
      conn.reset!
      checkin conn
    else
      remove conn
    end
  end
end
595
596 # Disconnect all connections that have been idle for at least
597 # +minimum_idle+ seconds. Connections currently checked out, or that were
598 # checked in less than +minimum_idle+ seconds ago, are unaffected.
def flush(minimum_idle = @idle_timeout)
  return if minimum_idle.nil?

  idle_connections = synchronize do
    # #discard! sets @connections to nil; a reaper thread may still call us
    # afterwards, in which case there is nothing left to flush.
    return unless @connections

    @connections.select do |conn|
      !conn.in_use? && conn.seconds_idle >= minimum_idle
    end.each do |conn|
      # Lease the connection so nobody else can check it out while it is
      # being taken out of the pool.
      conn.lease

      @available.delete conn
      @connections.delete conn
    end
  end

  # Closing the connections happens outside the pool lock.
  idle_connections.each do |conn|
    conn.disconnect!
  end
end
617
618 # Disconnect all currently idle connections. Connections currently checked
619 # out are unaffected.
def flush!
  # Recover lost connections first so they count as idle below.
  reap

  # A negative threshold is met by any non-negative seconds_idle value.
  flush(-1)
end
624
# Number of threads currently blocked waiting on the availability queue.
def num_waiting_in_queue # :nodoc:
  waiting_threads = @available.num_waiting
  waiting_threads
end
628
629 # Return connection pool's usage statistic
630 # Example:
631 #
632 # ActiveRecord::Base.connection_pool.stat # => { size: 15, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 }
def stat
  synchronize do
    # Split once into leased / idle, then split the leased ones by whether
    # their owning thread is still alive.
    leased, idle = @connections.partition(&:in_use?)
    busy, dead = leased.partition { |conn| conn.owner.alive? }

    {
      size: size,
      connections: @connections.size,
      busy: busy.size,
      dead: dead.size,
      idle: idle.size,
      waiting: num_waiting_in_queue,
      checkout_timeout: checkout_timeout
    }
  end
end
646
647 private
648 #--
649 # this is unfortunately not concurrent
def bulk_make_new_connections(num_new_conns_needed)
  num_new_conns_needed.times do
    # try_to_checkout_new_connection will not exceed pool's @size limit
    fresh_conn = try_to_checkout_new_connection

    # make the new connection available to the starving threads stuck in
    # the @available Queue
    checkin(fresh_conn) if fresh_conn
  end
end
659
660 #--
661 # From the discussion on GitHub:
662 # https://github.com/rails/rails/pull/14938#commitcomment-6601951
663 # This hook-in method allows for easier monkey-patching fixes needed by
664 # JRuby users that use Fibers.
def connection_cache_key(thread)
  # Identity mapping: the thread itself is the cache key.
  thread
end
668
669 # Take control of all existing connections so a "group" action such as
670 # reload/disconnect can be performed safely. It is no longer enough to
671 # wrap it in +synchronize+ because some pool's actions are allowed
672 # to be performed outside of the main +synchronize+ block.
def with_exclusively_acquired_all_connections(raise_on_acquisition_timeout = true)
  # New connections are blocked for the whole duration, so that the set of
  # connections being collected cannot grow underneath us.
  with_new_connections_blocked do
    attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout)

    # The caller's group action runs once collection has finished.
    yield
  end
end
679
# Tries to check out every connection the pool currently manages, giving up
# once <tt>@checkout_timeout * 2</tt> seconds have passed.
#
# The deadline is tracked on the monotonic clock so that adjustments to the
# system (wall-clock) time can neither shorten nor extend it.
#
# Raises ExclusiveConnectionTimeoutError on timeout when
# +raise_on_acquisition_timeout+ is true; otherwise a timeout is swallowed.
def attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout = true)
  collected_conns = synchronize do
    # account for our own connections
    @connections.select { |conn| conn.owner == Thread.current }
  end

  newly_checked_out = []
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + (@checkout_timeout * 2)

  @available.with_a_bias_for(Thread.current) do
    loop do
      synchronize do
        # Done once we hold every connection and nobody is in the middle
        # of establishing a new one.
        return if collected_conns.size == @connections.size && @now_connecting == 0
        remaining_timeout = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        remaining_timeout = 0 if remaining_timeout < 0
        conn = checkout_for_exclusive_access(remaining_timeout)
        collected_conns << conn
        newly_checked_out << conn
      end
    end
  end
rescue ExclusiveConnectionTimeoutError
  # <tt>raise_on_acquisition_timeout == false</tt> means we are directed to ignore any
  # timeouts and are expected to just give up: we've obtained as many connections
  # as possible, note that in a case like that we don't return any of the
  # +newly_checked_out+ connections.

  if raise_on_acquisition_timeout
    release_newly_checked_out = true
    raise
  end
rescue Exception # if something else went wrong
  # this can't be a "naked" rescue, because we should return conns
  # even for non-StandardErrors
  release_newly_checked_out = true
  raise
ensure
  if release_newly_checked_out && newly_checked_out
    # releasing only those conns that were checked out in this method, conns
    # checked outside this method (before it was called) are not for us to release
    newly_checked_out.each { |conn| checkin(conn) }
  end
end
723
724 #--
725 # Must be called in a synchronize block.
def checkout_for_exclusive_access(checkout_timeout)
  checkout(checkout_timeout)
rescue ConnectionTimeoutError
  # this block can't be easily moved into attempt_to_checkout_all_existing_connections's
  # rescue block, because doing so would put it outside of synchronize section, without
  # being in a critical section thread_report might become inaccurate
  held_by_others = @connections.reject { |conn| conn.owner == Thread.current }
  thread_report = held_by_others.map { |conn| "#{conn} is owned by #{conn.owner}" }

  msg = "could not obtain ownership of all database connections in #{checkout_timeout} seconds"
  msg += " (#{thread_report.join(', ')})" if thread_report.any?

  # Translate the plain timeout into the "exclusive" flavour of the error.
  raise ExclusiveConnectionTimeoutError, msg
end
745
# Runs the block while the establishment of new connections is blocked
# (see try_to_checkout_new_connection). When the last blocker leaves, the
# availability queue is rebuilt from the idle connections and replacement
# connections are created for any waiting threads still left without one.
def with_new_connections_blocked
  synchronize { @threads_blocking_new_connections += 1 }

  yield
ensure
  shortfall = 0

  synchronize do
    @threads_blocking_new_connections -= 1

    if @threads_blocking_new_connections.zero?
      @available.clear

      waiting_threads = num_waiting_in_queue

      # Every connection nobody is using goes back into the queue.
      idle_conns = @connections.reject(&:in_use?)
      idle_conns.each { |conn| @available.add conn }

      shortfall = waiting_threads - idle_conns.size
    end
  end

  # Created outside the pool lock.
  bulk_make_new_connections(shortfall) if shortfall > 0
end
774
775 # Acquire a connection by one of 1) immediately removing one
776 # from the queue of available connections, 2) creating a new
777 # connection if the pool is not at capacity, 3) waiting on the
778 # queue for a connection to become available.
779 #
780 # Raises:
781 # - ActiveRecord::ConnectionTimeoutError if a connection could not be acquired
782 #
783 #--
784 # Implementation detail: the connection returned by +acquire_connection+
785 # will already be "+connection.lease+ -ed" to the current thread.
def acquire_connection(checkout_timeout)
  # NOTE: we rely on <tt>@available.poll</tt> and +try_to_checkout_new_connection+ to
  # +conn.lease+ the returned connection (and to do this in a +synchronized+
  # section). This is not the cleanest implementation, as ideally we would
  # <tt>synchronize { conn.lease }</tt> in this method, but by leaving it to <tt>@available.poll</tt>
  # and +try_to_checkout_new_connection+ we can piggyback on +synchronize+ sections
  # of the said methods and avoid an additional +synchronize+ overhead.
  immediate = @available.poll || try_to_checkout_new_connection
  return immediate if immediate

  # Nothing available right now: recover lost connections, then wait.
  reap
  @available.poll(checkout_timeout)
end
800
801 #--
802 # if owner_thread param is omitted, this must be called in synchronize block
def remove_connection_from_thread_cache(conn, owner_thread = conn.owner)
  # delete_pair only removes the entry when it still maps to +conn+.
  cache_key = connection_cache_key(owner_thread)
  @thread_cached_conns.delete_pair(cache_key, conn)
end
alias_method :release, :remove_connection_from_thread_cache
807
# Builds a connection by calling the adapter method named by the spec on
# Base, and hands it a copy of the pool's schema cache when one is set.
def new_connection
  conn = Base.send(spec.adapter_method, spec.config)
  conn.schema_cache = schema_cache.dup if schema_cache
  conn
end
813
814 # If the pool is not at a <tt>@size</tt> limit, establish new connection. Connecting
815 # to the DB is done outside main synchronized section.
816 #--
817 # Implementation constraint: a newly established connection returned by this
818 # method must be in the +.leased+ state.
def try_to_checkout_new_connection
  # first in synchronized section check if establishing new conns is allowed
  # and increment @now_connecting, to prevent overstepping this pool's @size
  # constraint
  slot_reserved = synchronize do
    room_left = (@connections.size + @now_connecting) < @size
    @now_connecting += 1 if @threads_blocking_new_connections.zero? && room_left
  end
  return unless slot_reserved

  begin
    # the slot is reserved, so the connection can be established
    # outside of the synchronized section
    conn = checkout_new_connection
  ensure
    synchronize do
      if conn
        adopt_connection(conn)
        # returned conn needs to be already leased
        conn.lease
      end
      # Give the reserved slot back whether or not connecting succeeded.
      @now_connecting -= 1
    end
  end
end
845
# Makes this pool the owner of +conn+ and starts tracking it in
# @connections. Touches @connections, so call it inside +synchronize+.
def adopt_connection(conn)
  conn.pool = self
  @connections.push(conn)
end
850
# Establishes a brand new connection, unless automatic reconnection has
# been switched off, in which case ConnectionNotEstablished is raised.
def checkout_new_connection
  if @automatic_reconnect
    new_connection
  else
    raise ConnectionNotEstablished
  end
end
855
# Runs the checkout callbacks around <tt>c.verify!</tt> and returns +c+.
# If that fails, +c+ is removed from the pool and disconnected before the
# error is re-raised.
def checkout_and_verify(c)
  c._run_checkout_callbacks { c.verify! }
  c
rescue
  remove c
  c.disconnect!
  raise
end
866 end
867
868 # ConnectionHandler is a collection of ConnectionPool objects. It is used
869 # for keeping separate connection pools that connect to different databases.
870 #
871 # For example, suppose that you have 5 models, with the following hierarchy:
872 #
873 # class Author < ActiveRecord::Base
874 # end
875 #
876 # class BankAccount < ActiveRecord::Base
877 # end
878 #
879 # class Book < ActiveRecord::Base
880 # establish_connection :library_db
881 # end
882 #
883 # class ScaryBook < Book
884 # end
885 #
886 # class GoodBook < Book
887 # end
888 #
889 # And a database.yml that looked like this:
890 #
891 # development:
892 # database: my_application
893 # host: localhost
894 #
895 # library_db:
896 # database: library
897 # host: some.library.org
898 #
899 # Your primary database in the development environment is "my_application"
900 # but the Book model connects to a separate database called "library_db"
901 # (this can even be a database on a different machine).
902 #
903 # Book, ScaryBook and GoodBook will all use the same connection pool to
904 # "library_db" while Author, BankAccount, and any other models you create
905 # will use the default connection pool to "my_application".
906 #
907 # The various connection pools are managed by a single instance of
908 # ConnectionHandler accessible via ActiveRecord::Base.connection_handler.
909 # All Active Record models use this handler to determine the connection pool that they
910 # should use.
911 #
912 # The ConnectionHandler class is not coupled with the Active models, as it has no knowledge
913 # about the model. The model needs to pass a specification name to the handler,
914 # in order to look up the correct connection pool.
915 class ConnectionHandler
# Builds the finalizer callback: a lambda that ignores its argument and
# discards the pools in +pid_map+ that belong to other processes.
def self.unowned_pool_finalizer(pid_map) # :nodoc:
  ->(_) { discard_unowned_pools(pid_map) }
end
921
# Discards every pool registered under a process id other than the
# current process's.
def self.discard_unowned_pools(pid_map) # :nodoc:
  pid_map.each do |pid, pools|
    next if pid == Process.pid

    pools.values.compact.each { |pool| pool.discard! }
  end
end
927
def initialize
  # Two-level map; the inner caches are keyed by spec.name
  # (ConnectionSpecification#name). The outer keys are presumably process
  # ids (discard_unowned_pools compares them to Process.pid) -- confirm
  # against #owner_to_pool.
  @owner_to_pool = Concurrent::Map.new(initial_capacity: 2) do |pid_map, key|
    # Discard the parent's connection pools immediately; we have no need
    # of them
    ConnectionHandler.discard_unowned_pools(pid_map)

    pid_map[key] = Concurrent::Map.new(initial_capacity: 2)
  end

  # Backup finalizer: if the forked child never needed a pool, the above
  # early discard has not occurred
  finalizer = ConnectionHandler.unowned_pool_finalizer(@owner_to_pool)
  ObjectSpace.define_finalizer(self, finalizer)
end
942
943 def connection_pool_list
944 owner_to_pool.values.compact
945 end
946 alias :connection_pools :connection_pool_list
947
948 def establish_connection(config)
949 resolver = ConnectionSpecification::Resolver.new(Base.configurations)
950 spec = resolver.spec(config)
951
952 remove_connection(spec.name)
953
954 message_bus = ActiveSupport::Notifications.instrumenter
955 payload = {
956 connection_id: object_id
957 }
958 if spec
959 payload[:spec_name] = spec.name
960 payload[:config] = spec.config
961 end
962
963 message_bus.instrument("!connection.active_record", payload) do
964 owner_to_pool[spec.name] = ConnectionAdapters::ConnectionPool.new(spec)
965 end
966
967 owner_to_pool[spec.name]
968 end
969
970 # Returns true if there are any active connections among the connection
971 # pools that the ConnectionHandler is managing.
972 def active_connections?
973 connection_pool_list.any?(&:active_connection?)
974 end
975
976 # Returns any connections in use by the current thread back to the pool,
977 # and also returns connections to the pool cached by threads that are no
978 # longer alive.
979 def clear_active_connections!
980 connection_pool_list.each(&:release_connection)
981 end
982
983 # Clears the cache which maps classes.
984 #
985 # See ConnectionPool#clear_reloadable_connections! for details.
986 def clear_reloadable_connections!
987 connection_pool_list.each(&:clear_reloadable_connections!)
988 end
989
990 def clear_all_connections!
991 connection_pool_list.each(&:disconnect!)
992 end
993
994 # Disconnects all currently idle connections.
995 #
996 # See ConnectionPool#flush! for details.
997 def flush_idle_connections!
998 connection_pool_list.each(&:flush!)
999 end
1000
1001 # Locate the connection of the nearest super class. This can be an
1002 # active or defined connection: if it is the latter, it will be
1003 # opened and set as the active connection for the class it was defined
1004 # for (not necessarily the current class).
1005 def retrieve_connection(spec_name) #:nodoc:
1006 pool = retrieve_connection_pool(spec_name)
1007 raise ConnectionNotEstablished, "No connection pool with '#{spec_name}' found." unless pool
1008 conn = pool.connection
1009 raise ConnectionNotEstablished, "No connection for '#{spec_name}' in connection pool" unless conn
1010 conn
1011 end
1012
1013 # Returns true if a connection that's accessible to this class has
1014 # already been opened.
1015 def connected?(spec_name)
1016 conn = retrieve_connection_pool(spec_name)
1017 conn && conn.connected?
1018 end
1019
1020 # Remove the connection for this class. This will close the active
1021 # connection and the defined connection (if they exist). The result
1022 # can be used as an argument for #establish_connection, for easily
1023 # re-establishing the connection.
1024 def remove_connection(spec_name)
1025 if pool = owner_to_pool.delete(spec_name)
1026 pool.automatic_reconnect = false
1027 pool.disconnect!
1028 pool.spec.config
1029 end
1030 end
1031
1032 # Retrieving the connection pool happens a lot, so we cache it in @owner_to_pool.
1033 # This makes retrieving the connection pool O(1) once the process is warm.
1034 # When a connection is established or removed, we invalidate the cache.
1035 def retrieve_connection_pool(spec_name)
1036 owner_to_pool.fetch(spec_name) do
1037 # Check if a connection was previously established in an ancestor process,
1038 # which may have been forked.
1039 if ancestor_pool = pool_from_any_process_for(spec_name)
1040 # A connection was established in an ancestor process that must have
1041 # subsequently forked. We can't reuse the connection, but we can copy
1042 # the specification and establish a new connection with it.
1043 establish_connection(ancestor_pool.spec.to_hash).tap do |pool|
1044 pool.schema_cache = ancestor_pool.schema_cache if ancestor_pool.schema_cache
1045 end
1046 else
1047 owner_to_pool[spec_name] = nil
1048 end
1049 end
1050 end
1051
1052 private
1053
1054 def owner_to_pool
1055 @owner_to_pool[Process.pid]
1056 end
1057
1058 def pool_from_any_process_for(spec_name)
1059 owner_to_pool = @owner_to_pool.values.reverse.find { |v| v[spec_name] }
1060 owner_to_pool && owner_to_pool[spec_name]
1061 end
1062 end
1063 end
1064end