Skip to content

Commit

Permalink
Add lifecycle hooks to Dispatcher and Scheduler
Browse files Browse the repository at this point in the history
I'm switching over to Rails Semantic Logger and for that to work
with forking code it requires a post fork action.

With this change, all 4 process types have start and stop hooks and it becomes
possible to do on_*_start { post_fork_action } and on_*_stop { pre_stop_action }
  • Loading branch information
hms authored and rosa committed Jan 28, 2025
1 parent 44e4be2 commit dd2f0fe
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 5 deletions.
16 changes: 16 additions & 0 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@ def on_worker_stop(...)
Worker.on_stop(...)
end

def on_dispatcher_start(...)
Dispatcher.on_start(...)
end

def on_dispatcher_stop(...)
Dispatcher.on_stop(...)
end

def on_scheduler_start(...)
Scheduler.on_start(...)
end

def on_scheduler_stop(...)
Scheduler.on_stop(...)
end

def supervisor?
supervisor
end
Expand Down
3 changes: 3 additions & 0 deletions lib/solid_queue/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

module SolidQueue
class Dispatcher < Processes::Poller
include LifecycleHooks
attr_accessor :batch_size, :concurrency_maintenance

after_boot :run_start_hooks
after_boot :start_concurrency_maintenance
before_shutdown :stop_concurrency_maintenance
after_shutdown :run_stop_hooks

def initialize(**options)
options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS)
Expand Down
3 changes: 3 additions & 0 deletions lib/solid_queue/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
module SolidQueue
class Scheduler < Processes::Base
include Processes::Runnable
include LifecycleHooks

attr_accessor :recurring_schedule

after_boot :run_start_hooks
after_boot :schedule_recurring_tasks
before_shutdown :unschedule_recurring_tasks
before_shutdown :run_stop_hooks

def initialize(recurring_tasks:, **options)
@recurring_schedule = RecurringSchedule.new(recurring_tasks)
Expand Down
20 changes: 15 additions & 5 deletions test/integration/lifecycle_hooks_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,30 @@ class LifecycleHooksTest < ActiveSupport::TestCase
SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) }
SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) }

pid = run_supervisor_as_fork(workers: [ { queues: "*" } ])
SolidQueue.on_dispatcher_start { JobResult.create!(status: :hook_called, value: :dispatcher_start) }
SolidQueue.on_dispatcher_stop { JobResult.create!(status: :hook_called, value: :dispatcher_stop) }

SolidQueue.on_scheduler_start { JobResult.create!(status: :hook_called, value: :scheduler_start) }
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_stop) }

pid = run_supervisor_as_fork(workers: [ { queues: "*" } ], dispatchers: [ { batch_size: 100 } ], skip_recurring: false)
wait_for_registered_processes(4)

terminate_process(pid)
wait_for_registered_processes(0)

results = skip_active_record_query_cache do
assert_equal 4, JobResult.count
JobResult.last(4)
assert_equal 8, JobResult.count
JobResult.last(8)
end

assert_equal "hook_called", results.map(&:status).first
assert_equal [ "start", "stop", "worker_start", "worker_stop" ], results.map(&:value).sort
assert_equal({ "hook_called" => 8 }, results.map(&:status).tally)
assert_equal %w[start stop worker_start worker_stop dispatcher_start dispatcher_stop scheduler_start scheduler_stop].sort, results.map(&:value).sort
ensure
SolidQueue::Supervisor.clear_hooks
SolidQueue::Worker.clear_hooks
SolidQueue::Dispatcher.clear_hooks
SolidQueue::Scheduler.clear_hooks
end

test "handle errors on lifecycle hooks" do
Expand All @@ -48,5 +56,7 @@ class LifecycleHooksTest < ActiveSupport::TestCase
SolidQueue.on_thread_error = previous_on_thread_error
SolidQueue::Supervisor.clear_hooks
SolidQueue::Worker.clear_hooks
SolidQueue::Dispatcher.clear_hooks
SolidQueue::Scheduler.clear_hooks
end
end

0 comments on commit dd2f0fe

Please sign in to comment.