mirror of
https://github.com/discourse/discourse.git
synced 2025-05-22 22:43:33 +08:00
FIX: handle more thread pool edge cases (#30392)
* Split `shutdown` into two separate methods for better control:
  - `shutdown` - signals threads to stop accepting new work
  - `wait_for_termination` - waits for threads to finish (with optional timeout)
* Add tracking of busy threads via `@busy_threads` Set
* Make idle_time parameter optional with 30-second default
* Improve thread spawning logic:
  - Spawn initial thread immediately when work is posted
  - Spawn additional threads when all threads are busy and work is queued
* Fix race condition in work distribution
* Add busy thread count to stats output
* Add test coverage for zero min_threads configuration

This commit makes the ThreadPool more reliable, easier to use, and adds better visibility into its internal state.

---------

Co-authored-by: Alan Guo Xiang Tan <gxtan1990@gmail.com>
This commit is contained in:
@ -8,15 +8,21 @@ module Scheduler
|
|||||||
# Usage:
|
# Usage:
|
||||||
# pool = ThreadPool.new(min_threads: 0, max_threads: 4, idle_time: 0.1)
|
# pool = ThreadPool.new(min_threads: 0, max_threads: 4, idle_time: 0.1)
|
||||||
# pool.post { do_something }
|
# pool.post { do_something }
|
||||||
|
# pool.stats (returns thread count, busy thread count, etc.)
|
||||||
#
|
#
|
||||||
# (optional)
|
# pool.shutdown (do not accept new tasks)
|
||||||
# pool.shutdown
|
# pool.wait_for_termination(timeout: 1) (optional timeout)
|
||||||
|
|
||||||
class ThreadPool
|
class ThreadPool
|
||||||
class ShutdownError < StandardError
|
class ShutdownError < StandardError
|
||||||
end
|
end
|
||||||
|
|
||||||
def initialize(min_threads:, max_threads:, idle_time:)
|
def initialize(min_threads:, max_threads:, idle_time: nil)
|
||||||
|
# 30 seconds is a reasonable default for idle time
|
||||||
|
# it is particularly useful for the use case of:
|
||||||
|
# ThreadPool.new(min_threads: 4, max_threads: 4)
|
||||||
|
# where operators would otherwise be confused about why idle time matters
|
||||||
|
idle_time ||= 30
|
||||||
raise ArgumentError, "min_threads must be 0 or larger" if min_threads < 0
|
raise ArgumentError, "min_threads must be 0 or larger" if min_threads < 0
|
||||||
raise ArgumentError, "max_threads must be 1 or larger" if max_threads < 1
|
raise ArgumentError, "max_threads must be 1 or larger" if max_threads < 1
|
||||||
raise ArgumentError, "max_threads must be >= min_threads" if max_threads < min_threads
|
raise ArgumentError, "max_threads must be >= min_threads" if max_threads < min_threads
|
||||||
@ -27,6 +33,7 @@ module Scheduler
|
|||||||
@idle_time = idle_time
|
@idle_time = idle_time
|
||||||
|
|
||||||
@threads = Set.new
|
@threads = Set.new
|
||||||
|
@busy_threads = Set.new
|
||||||
|
|
||||||
@queue = Queue.new
|
@queue = Queue.new
|
||||||
@mutex = Mutex.new
|
@mutex = Mutex.new
|
||||||
@ -45,37 +52,45 @@ module Scheduler
|
|||||||
|
|
||||||
@mutex.synchronize do
|
@mutex.synchronize do
|
||||||
@queue << wrapped_block
|
@queue << wrapped_block
|
||||||
|
spawn_thread if @threads.length == 0
|
||||||
spawn_thread if @queue.length > 1 && @threads.length < @max_threads
|
|
||||||
|
|
||||||
@new_work.signal
|
@new_work.signal
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def shutdown(timeout: 30)
|
def wait_for_termination(timeout: nil)
|
||||||
|
threads_to_join = nil
|
||||||
|
@mutex.synchronize { threads_to_join = @threads.to_a }
|
||||||
|
|
||||||
|
if timeout.nil?
|
||||||
|
threads_to_join.each(&:join)
|
||||||
|
else
|
||||||
|
failed_to_shutdown = false
|
||||||
|
|
||||||
|
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
|
||||||
|
threads_to_join.each do |thread|
|
||||||
|
remaining_time = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||||
|
break if remaining_time <= 0
|
||||||
|
if !thread.join(remaining_time)
|
||||||
|
Rails.logger.error "ThreadPool: Failed to join thread within timeout\n#{thread.backtrace.join("\n")}"
|
||||||
|
failed_to_shutdown = true
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
if failed_to_shutdown
|
||||||
|
@mutex.synchronize { @threads.each(&:kill) }
|
||||||
|
raise ShutdownError, "Failed to shutdown ThreadPool within timeout"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def shutdown
|
||||||
@mutex.synchronize do
|
@mutex.synchronize do
|
||||||
return if @shutdown
|
return if @shutdown
|
||||||
@shutdown = true
|
@shutdown = true
|
||||||
@threads.length.times { @queue << :shutdown }
|
@threads.length.times { @queue << :shutdown }
|
||||||
@new_work.broadcast
|
@new_work.broadcast
|
||||||
end
|
end
|
||||||
|
|
||||||
threads_to_join = nil
|
|
||||||
@mutex.synchronize { threads_to_join = @threads.to_a }
|
|
||||||
|
|
||||||
failed_to_shutdown = false
|
|
||||||
|
|
||||||
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
|
|
||||||
threads_to_join.each do |thread|
|
|
||||||
remaining_time = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
|
||||||
break if remaining_time <= 0
|
|
||||||
if !thread.join(remaining_time)
|
|
||||||
Rails.logger.error "ThreadPool: Failed to join thread within timeout\n#{thread.backtrace.join("\n")}"
|
|
||||||
failed_to_shutdown = true
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
raise ShutdownError, "Failed to shutdown ThreadPool within timeout" if failed_to_shutdown
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def shutdown?
|
def shutdown?
|
||||||
@ -90,6 +105,7 @@ module Scheduler
|
|||||||
shutdown: @shutdown,
|
shutdown: @shutdown,
|
||||||
min_threads: @min_threads,
|
min_threads: @min_threads,
|
||||||
max_threads: @max_threads,
|
max_threads: @max_threads,
|
||||||
|
busy_thread_count: @busy_threads.size,
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@ -115,12 +131,16 @@ module Scheduler
|
|||||||
work = nil
|
work = nil
|
||||||
|
|
||||||
@mutex.synchronize do
|
@mutex.synchronize do
|
||||||
@new_work.wait(@mutex, @idle_time)
|
# we may already have work so no need
|
||||||
|
# to wait for signals, this also handles the race
|
||||||
|
# condition between spinning up threads and posting work
|
||||||
|
work = @queue.pop(timeout: 0)
|
||||||
|
@new_work.wait(@mutex, @idle_time) if !work
|
||||||
|
|
||||||
if @queue.empty?
|
if !work && @queue.empty?
|
||||||
done = @threads.count > @min_threads
|
done = @threads.count > @min_threads
|
||||||
else
|
else
|
||||||
work = @queue.pop
|
work ||= @queue.pop
|
||||||
|
|
||||||
if work == :shutdown
|
if work == :shutdown
|
||||||
work = nil
|
work = nil
|
||||||
@ -128,11 +148,23 @@ module Scheduler
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@busy_threads << Thread.current if work
|
||||||
|
|
||||||
|
if !done && work && @queue.length > 0 && @threads.length < @max_threads &&
|
||||||
|
@busy_threads.length == @threads.length
|
||||||
|
spawn_thread
|
||||||
|
end
|
||||||
|
|
||||||
@threads.delete(Thread.current) if done
|
@threads.delete(Thread.current) if done
|
||||||
end
|
end
|
||||||
|
|
||||||
# could be nil if the thread just needs to idle
|
if work
|
||||||
work&.call if !done
|
begin
|
||||||
|
work.call
|
||||||
|
ensure
|
||||||
|
@mutex.synchronize { @busy_threads.delete(Thread.current) }
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -9,7 +9,10 @@ RSpec.describe Scheduler::ThreadPool, type: :multisite do
|
|||||||
described_class.new(min_threads: min_threads, max_threads: max_threads, idle_time: idle_time)
|
described_class.new(min_threads: min_threads, max_threads: max_threads, idle_time: idle_time)
|
||||||
end
|
end
|
||||||
|
|
||||||
after { pool.shutdown(timeout: 1) }
|
after do
|
||||||
|
pool.shutdown
|
||||||
|
pool.wait_for_termination(timeout: 1)
|
||||||
|
end
|
||||||
|
|
||||||
describe "initialization" do
|
describe "initialization" do
|
||||||
it "creates the minimum number of threads and validates parameters" do
|
it "creates the minimum number of threads and validates parameters" do
|
||||||
@ -70,7 +73,12 @@ RSpec.describe Scheduler::ThreadPool, type: :multisite do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# we spin up threads in the thread loop, so it can take
|
||||||
|
# a bit of time to react to work pressure
|
||||||
|
wait_for { pool.stats[:thread_count] == max_threads }
|
||||||
|
|
||||||
expect(pool.stats[:thread_count]).to eq(max_threads)
|
expect(pool.stats[:thread_count]).to eq(max_threads)
|
||||||
|
|
||||||
(max_threads + 1).times { blocker_queue << :continue }
|
(max_threads + 1).times { blocker_queue << :continue }
|
||||||
|
|
||||||
results = Array.new(max_threads + 1) { completion_queue.pop }
|
results = Array.new(max_threads + 1) { completion_queue.pop }
|
||||||
@ -119,7 +127,8 @@ RSpec.describe Scheduler::ThreadPool, type: :multisite do
|
|||||||
results2 = Array.new(3) { completion_queue2.pop }
|
results2 = Array.new(3) { completion_queue2.pop }
|
||||||
end
|
end
|
||||||
|
|
||||||
pool.shutdown(timeout: 1)
|
pool.shutdown
|
||||||
|
pool.wait_for_termination(timeout: 1)
|
||||||
|
|
||||||
expect(results1.size).to eq(3)
|
expect(results1.size).to eq(3)
|
||||||
expect(results1.sort).to eq([0, 1, 2])
|
expect(results1.sort).to eq([0, 1, 2])
|
||||||
@ -165,6 +174,24 @@ RSpec.describe Scheduler::ThreadPool, type: :multisite do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe "when thread pool has zero min threads" do
|
||||||
|
it "can quickly process and can be cleanly terminated" do
|
||||||
|
# setting idle time to 1000 to ensure that there are maximal delays waiting
|
||||||
|
# for jobs
|
||||||
|
pool = Scheduler::ThreadPool.new(min_threads: 0, max_threads: 5, idle_time: 1000)
|
||||||
|
|
||||||
|
done = Queue.new
|
||||||
|
pool.post { done << :done }
|
||||||
|
|
||||||
|
# should happen in less than 1 second
|
||||||
|
Timeout.timeout(1) { expect(done.pop).to eq(:done) }
|
||||||
|
|
||||||
|
pool.shutdown
|
||||||
|
pool.wait_for_termination
|
||||||
|
expect(pool.stats[:thread_count]).to eq(0)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe "stress test" do
|
describe "stress test" do
|
||||||
it "handles multiple task submissions correctly" do
|
it "handles multiple task submissions correctly" do
|
||||||
completion_queue = Queue.new
|
completion_queue = Queue.new
|
||||||
|
Reference in New Issue
Block a user