This post is part 3 of a series where I build a task queue system.
Error handling
Right now, when an error happens, we only mark the job as failed and save the exception. Let's improve that. We'll still do that, but we'll also allow the user to specify an error handler function that will be called when the job crashes.
First, we add an on_error method to our parent Queueable class:
attr_reader :queue, :retry_strategy, :error_handler

protected def on_error(&handler)
  @error_handler = handler
end
This method is used like this:
class Jobs::DoSomeStuff < Gator::Queueable
  on_error do |job|
    job.logger.error "Oh no, there's a problem with job #{job.job_id}!"
  end
end
In our worker:
def execute_job(job, job_class)
  job_instance = job_class.new(job_id: job.id, retry_count: job.attempts)
  job_instance.handle(*job.args)
  logger.info "Processed job id=#{job.id} result=succeeded queue=#{job.queue}"
  nil
rescue => e
  logger.info "Processed job id=#{job.id} result=failed queue=#{job.queue}"
  run_job_error_handler(job_instance, e) if e
  e
end
def run_job_error_handler(job_instance, error)
  return unless job_instance.class.error_handler

  job_instance.class.error_handler.call(job_instance, error)
rescue => e
  logger.warn "Job error handler threw an error: #{e.message}"
end
A potential UX improvement here would be to allow the user to set a global error handler for all jobs, but I'm ignoring this. More importantly, you'll notice that we've changed from job_class.new to job_class.new(job_id: job.id, retry_count: job.attempts). We're saving this information to the job instance because it might be useful to the error handler. I think this is a suboptimal API, though. Reusing the actual job class the user enqueues is a bit of a poor design, since it exposes methods like dispatch and whatever else the user has defined on it. Instead, we should be passing a custom wrapper class that provides only the necessary information (job ID, retry count, and configuration). But this will do for now.
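By the way, for the job_class.new(job_id: ..., retry_count: ...) call to work, Queueable's initializer needs to accept these keyword arguments. A minimal sketch of what that might look like (the exact definition isn't shown in this post):

module Gator
  class Queueable
    attr_reader :job_id, :retry_count

    def initialize(job_id: nil, retry_count: 0)
      @job_id = job_id
      @retry_count = retry_count
    end
  end
end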
Middleware/Callbacks
I like Sidekiq's middleware system and ActiveJob's callback system, which are two different ways of inserting custom code to run before/after your job execution (Sidekiq also supports "client middleware", which runs when enqueuing). An example use of callbacks would be to notify a monitoring service that a scheduled job has run successfully:
# With ActiveJob
class SomeScheduledJob < ApplicationJob
  after_perform :ping_success

  def ping_success
    # Send a ping to the service
  end
end
# With Sidekiq
Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.add PingSuccess
  end
end
Laravel also has a middleware system:
class SomeScheduledJob implements ShouldQueue
{
    public function middleware()
    {
        return [new PingSuccess];
    }
}
I'll go with something similar: a with_middleware method that allows you to pass the list of middleware.
class SomeScheduledJob < Gator::Queueable
  with_middleware [PingSuccess]
end
A middleware can be anything (class, object, proc) that responds to #call. The call method will be passed a block that can be invoked to execute the job (or any other middleware left in the stack). So a PingSuccess middleware could look like this:
class PingSuccess
  def self.call(job)
    yield # execute the job
    send_success_ping
  end
end
Let's implement this. I'll skip the definition of with_middleware, because it's the same as our previous DSL methods (queue_on, retry_with).
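For completeness, here's roughly what it would look like, following the on_error pattern above (a sketch, sitting alongside the other class-level DSL methods):

attr_reader :middleware

protected def with_middleware(middleware)
  @middleware = middleware
end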
In our worker, we only need to change our job executor; rather than a simple job_instance.handle, we have to execute any defined middleware, passing each one a block it can use to advance execution.
middleware = (job_class.middleware || []).dup
job_instance = job_class.new(job_id: job.id, retry_count: job.attempts)
executor = proc do
  next_middleware = middleware.shift
  next_middleware ? next_middleware.call(job_instance, &executor) : job_instance.handle(*job.args)
end
executor.call
And that's all. I'll create two sample middleware to test this:
class MeasureExecutionTime
  def self.call(job)
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
    end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    job.logger.info("Took #{end_time - start_time} seconds")
  end
end
class SayGoodbye
  def self.call(job)
    yield
    job.logger.info("Goodbye from job #{job.class.name}")
  end
end
And in the job class:
class Jobs::DoSomeStuff < Gator::Queueable
  queue_on :high
  with_middleware [
    Jobs::Middleware::SayGoodbye,
    Jobs::Middleware::MeasureExecutionTime
  ]

  def handle(arg1, arg2 = nil)
    sleep 1.5
    logger.info "HIIII #{arg1} and #{arg2}"
  end
end
So we dispatch a job:
Jobs::DoSomeStuff.dispatch("Tim", "Bob")
And the output in the console:
INFO [2022-12-24 20:29:59] HIIII Tim and Bob
INFO [2022-12-24 20:29:59] Took 1.50799410000036 seconds
INFO [2022-12-24 20:29:59] Goodbye from job Jobs::DoSomeStuff
INFO [2022-12-24 20:29:59] Processed job id=job_d4ba19849dccfa812acb539d result=succeeded queue=high
More middleware
We can refactor some of our internal worker code into middleware! For instance, the logging that we do before and after processing a job could be expressed as middleware.
Before:
def execute_job(job, job_class)
  job_class.new.handle(*job.args)
  logger.info "Processed job id=#{job.id} result=succeeded queue=#{job.queue}"
  nil
rescue => e
  logger.info "Processed job id=#{job.id} result=failed queue=#{job.queue}"
  e
end
After:
class LogJobExecution
  def self.call(job)
    yield
    job.logger.info "Processed job id=#{job.job_id} result=succeeded queue=#{job.queue}"
  rescue
    job.logger.info "Processed job id=#{job.job_id} result=failed queue=#{job.queue}"
    raise
  end
end

# in the worker
DEFAULT_MIDDLEWARE = [
  Middleware::LogJobExecution
]

def execute_job(job, job_class)
  middleware = DEFAULT_MIDDLEWARE + (job_class.middleware || [])
  # ...
end
We could even go further and implement things like retries and error handling as middleware.
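For example, a rough sketch of error handling as a middleware (hypothetical; in our implementation it stays in the worker):

class RunErrorHandler
  def self.call(job)
    yield
  rescue => e
    # Call the job's error handler, then re-raise so the worker still marks the job as failed
    job.class.error_handler&.call(job, e)
    raise
  end
end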
It's debatable whether or not this middleware approach is a good idea, though. One consequence of having these internals as part of the middleware chain is that they become subject to the same pipeline that end-user middleware go through, which can be problematic. For instance, if an unexpected error is raised by the LogJobExecution middleware, it would bubble up and cause the job to be marked as failed. We could get around this with some smarter error handling, but then we'd be adding extra code to get ourselves out of the hole we put ourselves into.
Chaining
Chaining is a feature that lets you set up a "chain" of jobs. An example from the Laravel docs:
Bus::chain([
    new ProcessPodcast($podcast),
    new OptimizePodcast($podcast),
    new ReleasePodcast($podcast),
])->dispatch();
You could achieve this yourself (by having each job directly or indirectly enqueue the next one, or using a message queue), but it's an interesting challenge. For my queue library, I'd prefer a chain to be defined as a separate class. This way you can name it and specify its own behaviour (queue name, error handler, etc):
class PodcastPipeline < Gator::QueueableChain
  queue_on :priority

  on_error do |job|
    # ...
  end

  jobs [
    ProcessPodcast,
    OptimizePodcast,
    ReleasePodcast,
  ]
end

PodcastPipeline.dispatch(podcast_id)
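For contrast, the do-it-yourself version I mentioned would have each job enqueue the next one at the end of its handle method. Something like this (hypothetical job classes, just to illustrate):

class ProcessPodcast < Gator::Queueable
  def handle(podcast_id)
    # ...process the podcast...
    OptimizePodcast.dispatch(podcast_id)
  end
end

But then the chain only exists implicitly, spread across the individual job classes.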
There are a couple of ways to implement chaining, but I'll go with a simple approach:
- When the chain is dispatched, we enqueue all of its jobs immediately.
- The first job is set to ready (I renamed the default state from "waiting" to "ready", to indicate that these jobs are "ready to be picked up"). The other jobs are set to waiting.
- Each job has a next_job_id that points to the next job in the chain. We'll also store the chain class so we can look up its error handler.
- When one job executes successfully, the next job is set to ready, so it can be picked up, and so on.
- If a job errors, we'll call the chain's error handler (if there's one) rather than the job's.
Okay, let's do this. First up, we update our database schema:
DB.create_table :jobs do
  # ...
  String :next_job_id, null: true
  String :chain_class, null: true
end
Next, the chain class:
module Gator
  class QueueableChain
    class << self
      # Other DSL methods (queue_on, on_error)...

      attr_reader :component_jobs

      def jobs(jobs)
        @component_jobs = jobs
      end
    end

    def self.dispatch(*args, wait: nil, at: nil, queue: nil)
      next_execution_at = wait ? (Time.now + wait) : (at ? at : nil)
      chain_queue = queue ? queue : self.queue
      jobs = []
      logs = []
      component_jobs.reverse_each.with_index do |job_class, index|
        is_first_in_chain = (index == component_jobs.size - 1)
        queue = (chain_queue || job_class.queue || 'default').to_s
        jobs << {
          id: Gator::Models::Job.generate_job_id,
          name: job_class.name,
          args: args.to_json,
          next_execution_at: is_first_in_chain ? next_execution_at : nil,
          state: is_first_in_chain ? "ready" : "waiting",
          queue:,
          chain_class: self.name, # so the worker can find the chain's error handler
          next_job_id: (jobs.last[:id] rescue nil),
        }
        logs << ["Enqueued job #{job_class.name} in chain #{self.name} queue=#{queue}"]
      end
      Gator::Models::Job.multi_insert(jobs.reverse!)
      Gator::Logger.new.info logs.join("\n")
    end
  end
end
Testing:
class Jobs::DoSomeStuffChain < Gator::QueueableChain
  on_error do |job|
    job.logger.info "Error in job #{job.class.name} in chain #{self.name}"
  end

  jobs [
    Jobs::DoSomeStuff,
    Jobs::RegularAssJob,
    Jobs::DoSomeStuff,
  ]
end
> Jobs::DoSomeStuffChain.dispatch("Bob", "Pete")
INFO [2022-12-25 21:51:07] Enqueued job Jobs::DoSomeStuff in chain Jobs::DoSomeStuffChain queue=high
Enqueued job Jobs::RegularAssJob in chain Jobs::DoSomeStuffChain queue=default
Enqueued job Jobs::DoSomeStuff in chain Jobs::DoSomeStuffChain queue=high
The last thing left is to adjust the worker: run_job_error_handler needs to check for the chain's error handler, and cleanup needs to mark the next job as ready:
def run_job_error_handler(job_instance, error)
+  error_handler = Object.const_get(job_instance.chain_class).error_handler if job_instance.chain_class
+  error_handler ||= job_instance.class.error_handler
+  return unless error_handler
  error_handler.call(job_instance, error)
rescue => e
  logger.warn "Job error handler threw an error: #{e.message}"
end

def cleanup(job, job_class, error = nil)
  job.reserved_by = nil
  job.attempts += 1
  job.last_executed_at = Time.now
  if error
    job.state = "failed"
    job.error_details = error
    set_retry_details(job_class.retry_strategy, job, error) if job_class.retry_strategy
  else
    job.state = "succeeded"
  end
+  DB.transaction do
    job.save
+    if job.state == "succeeded" && job.next_job_id
+      Models::Job.where(id: job.next_job_id).update(state: "ready")
+    end
+  end
end
And when we run our worker:
INFO [2022-12-25 21:57:12] HIIII Bob and Pete
INFO [2022-12-25 21:57:12] Took 1.501239400007762 seconds
INFO [2022-12-25 21:57:12] Goodbye from job Jobs::DoSomeStuff
INFO [2022-12-25 21:57:12] Processed job id=job_f6f8561eb0d4b9269bfa7767 result=succeeded queue=high
INFO [2022-12-25 21:57:12] Checking all queues
INFO [2022-12-25 21:57:14] Processed job id=job_290a8ebaca5e3541986f0e19 result=succeeded queue=default
INFO [2022-12-25 21:57:14] Checking all queues
INFO [2022-12-25 21:57:16] HIIII Bob and Pete
INFO [2022-12-25 21:57:16] Processed job id=job_94bfaa5f6c79b16d5663e8bf result=succeeded queue=high
This approach is very simple. An alternative approach I considered involved having a separate chains table, with each entry having its own state that got updated as the jobs were processed. A major advantage of this would be easy visualization of chains in a web UI.
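A rough idea of what that schema might look like (not implemented; the columns are just illustrative):

DB.create_table :chains do
  String :id, primary_key: true
  String :chain_class, null: false
  String :state, null: false, default: "pending" # pending | running | succeeded | failed
  Time :created_at
end

Each job row would then carry a chain_id, and the worker would update the chain's state as its jobs get processed.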
Graceful shutdown
Finally, we'll teach our worker how to shut down gracefully. This means letting jobs that are currently executing finish up before exiting. We can do this by implementing a handler for the "exit" signal, SIGINT (what the script receives when you press Ctrl-C). (There are also other signals we could handle, such as SIGTERM; SIGKILL, on the other hand, can't be trapped.)
Unfortunately, this won't help with our current setup, since the job has to be interrupted in order to run the signal handler, and can't be resumed afterwards. What we need to do is move our job execution to a separate thread. Then when we receive the exit signal, we wait for the thread to finish, then exit.
def run
  # ...
  setup_signal_handlers # (1)

  loop do
    break if @should_exit # (2)

    if (job = next_job)
      @job_thread = Thread.new do # (3)
        job_class = Object.const_get(job.name)
        error = execute_job(job, job_class)
        cleanup(job, job_class, error)
      end
      @job_thread.join
      @job_thread = nil
    else
      sleep polling_interval
    end
  end

  puts "Exiting." # (4)
end
def setup_signal_handlers # (5)
  Signal.trap("SIGINT") do
    if @should_exit
      puts "Force exiting"
      Thread.kill(@job_thread) if @job_thread
      exit
    end

    puts "Received SIGINT; waiting for any executing jobs to finish before exiting..."
    @should_exit = true
  end
end
Let's break this down. The first piece of the puzzle is scheduling the job in another thread:
@job_thread = Thread.new do
  job_class = Object.const_get(job.name)
  error = execute_job(job, job_class)
  cleanup(job, job_class, error)
end
@job_thread.join
@job_thread = nil
We're explicitly waiting for the job to complete (with @job_thread.join) before checking for the next job, but we don't have to. In fact, from here we could switch to a multithreaded approach where we process n jobs concurrently on n threads. But that'll come later. For now, we want to stay single-threaded.
The next piece is handling SIGINT. At its core, all we do is set an instance variable, @should_exit. We don't need to do anything else, because the run method is already waiting on the thread (@job_thread.join); now that @should_exit is true, the loop will exit once the job thread (or sleep) is done. If SIGINT is sent a second time, we force the job to exit, so jobs that somehow take a long time can't block our worker from shutting down.
I'm using puts here rather than our logger, since Ruby's logger can't be used in signal handlers.
I don't like that the exiting code is spread across two places (normal exit in the run method, and force exit in the signal handler), but I couldn't find a better way. To force exit, we have to interrupt the job thread, and the run method can't do that, because it's blocked at @job_thread.join. Alternatively, we could move all the exiting code to the signal handler like this:
def setup_signal_handlers
  Signal.trap("SIGINT") do
    if @should_exit
      puts "Force exiting"
      Thread.kill(@job_thread) if @job_thread
      exit
    end

    puts "Received SIGINT; waiting for any executing jobs to finish before exiting..."
    @should_exit = true
    @job_thread.join
    exit
  end
end
But this won't work, because the signal handler now blocks at @job_thread.join and can't even process a second SIGINT (force exit) until the job thread is done.
That's it for Part 3. Still more cool stuff to come (update: Part 4's out). Check out the code so far.