Building a task queue, Part 3

This post is part 3 of a series where I build a task queue system.


Error handling

Right now, when an error happens, we only mark the job as failed and save the exception. Let's improve that. We'll still do that, but we'll also allow the user to specify an error handler function that will be called when the job crashes.

First, we add an on_error method to our parent Queueable class:

# Class-level methods on Gator::Queueable, alongside queue_on and retry_with
attr_reader :queue, :retry_strategy, :error_handler

protected def on_error(&handler)
  @error_handler = handler
end

This method is used like this:

class Jobs::DoSomeStuff < Gator::Queueable
  on_error do |job|
    job.logger.error "Oh no, there's a problem with job #{job.job_id}!"
  end
end

In our worker:

def execute_job(job, job_class)
  job_instance = job_class.new(job_id: job.id, retry_count: job.attempts)
  job_instance.handle(*job.args)

  logger.info "Processed job id=#{job.id} result=succeeded queue=#{job.queue}"
  nil
rescue => e
  logger.info "Processed job id=#{job.id} result=failed queue=#{job.queue}"
  run_job_error_handler(job_instance, e) if job_instance # skip if instantiation itself failed
  e
end

def run_job_error_handler(job_instance, error)
  return unless job_instance.class.error_handler
  
  job_instance.class.error_handler.call(job_instance, error)
rescue => e
  logger.warn "Job error handler threw an error: #{e.message}"
end

A potential UX improvement here would be to allow the user to set a global error handler for all jobs, but I'm skipping that. More importantly, you'll notice that we've changed from job_class.new to job_class.new(job_id: job.id, retry_count: job.attempts). We're saving this information on the job instance because it might be useful to the error handler. I think this is a suboptimal API, though. Reusing the actual job class the user enqueues is a bit of a poor design, since it exposes methods like dispatch and whatever else the user has defined on it. Instead, we should be passing a custom wrapper class that provides only the necessary information (job ID, retry count, and configuration). But this will do for now.
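
For illustration, such a wrapper might look something like this (a rough sketch only; JobContext and its fields are hypothetical names, not part of the actual code):

# Hypothetical wrapper passed to error handlers instead of the full job instance
class JobContext
  attr_reader :job_id, :retry_count, :queue, :logger

  def initialize(job_id:, retry_count:, queue:, logger:)
    @job_id = job_id
    @retry_count = retry_count
    @queue = queue
    @logger = logger
  end
end

# The worker would then call something like:
#   error_handler.call(JobContext.new(job_id: job.id, retry_count: job.attempts, queue: job.queue, logger: logger), error)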

Middleware/Callbacks

I like Sidekiq's middleware system and ActiveJob's callback system, which are two different ways of inserting some custom code to run before/after your job execution (Sidekiq also supports "client middleware", which run when enqueuing). An example use of callbacks would be to notify a monitoring service that a scheduled job has run successfully:

# With ActiveJob
class SomeScheduledJob < ApplicationJob
  after_perform :ping_success
  
  def ping_success
    # Send a ping to the service
  end
end

# With Sidekiq
Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.add PingSuccess
  end
end

Laravel also has a middleware system:

class SomeScheduledJob implements ShouldQueue
{
  public function middleware()
  {
    return [new PingSuccess];
  }
}

I'll go with something similar: a with_middleware method that lets you pass in a list of middleware.

class SomeScheduledJob < Gator::Queueable
  with_middleware [PingSuccess]
end

A middleware can be anything (class, object, proc) that responds to #call. The call method will be passed a block that can be invoked to execute the job (or the next middleware in the stack). So a PingSuccess middleware could look like this:

class PingSuccess
  def self.call(job)
    yield # execute the job
    send_success_ping
  end
end

Let's implement this. I'll skip the definition of with_middleware, because it's the same as our previous DSL methods (queue_on, retry_with).
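
For reference, it probably looks something like this, following the same pattern (a sketch, not the exact code from the repo):

class << self
  attr_reader :middleware

  # Class-level DSL: record the middleware list for this job class
  def with_middleware(middleware)
    @middleware = middleware
  end
end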

In our worker, we only need to change our job executor; rather than a simple job_instance.handle, we have to execute any defined middleware, passing each one a block it can use to advance execution.

middleware = (job_class.middleware || []).dup
job_instance = job_class.new(job_id: job.id, retry_count: job.attempts)
# Each call to the executor pops the next middleware off the stack and invokes it,
# passing the executor itself as the block. When the stack is empty, we run the job.
executor = proc do
  next_middleware = middleware.shift
  next_middleware ? next_middleware.call(job_instance, &executor) : job_instance.handle(*job.args)
end
executor.call

And that's all. I'll create two sample middleware to test this:

class MeasureExecutionTime
  def self.call(job)
    start_time = Process::clock_gettime(Process::CLOCK_MONOTONIC)
    yield
    end_time = Process::clock_gettime(Process::CLOCK_MONOTONIC)
    job.logger.info("Took #{end_time - start_time} seconds")
  end
end

class SayGoodbye
  def self.call(job)
    yield
    job.logger.info("Goodbye from job #{job.class.name}")
  end
end

And in the job class:

class Jobs::DoSomeStuff < Gator::Queueable
  queue_on :high

  with_middleware [
    Jobs::Middleware::SayGoodbye, 
    Jobs::Middleware::MeasureExecutionTime
  ]

  def handle(arg1, arg2 = nil)
    sleep 1.5
    logger.info "HIIII #{arg1} and #{arg2}"
  end
end

So we dispatch a job:

Jobs::DoSomeStuff.dispatch("Tim", "Bob")

And the output in the console:

INFO [2022-12-24 20:29:59] HIIII Tim and Bob
INFO [2022-12-24 20:29:59] Took 1.50799410000036 seconds
INFO [2022-12-24 20:29:59] Goodbye from job Jobs::DoSomeStuff
INFO [2022-12-24 20:29:59] Processed job id=job_d4ba19849dccfa812acb539d result=succeeded queue=high
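
Note the ordering of the middleware output: SayGoodbye comes first in the middleware list, so it wraps MeasureExecutionTime, which in turn wraps the job itself. After the job runs, MeasureExecutionTime logs the timing first, and SayGoodbye logs its goodbye last.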

More middleware

We can refactor some of our internal worker code into middleware! For instance, the logging that we do before and after processing a job could be expressed as middleware.

Before:

def execute_job(job, job_class)
  job_class.new.handle(*job.args)

  logger.info "Processed job id=#{job.id} result=succeeded queue=#{job.queue}"
  nil
rescue => e
  logger.info "Processed job id=#{job.id} result=failed queue=#{job.queue}"
  e
end

After:

class LogJobExecution
  def self.call(job)
    yield
    job.logger.info "Processed job id=#{job.job_id} result=succeeded queue=#{job.queue}"
  rescue
    job.logger.info "Processed job id=#{job.job_id} result=failed queue=#{job.queue}"
    raise
  end
end

# in the worker

DEFAULT_MIDDLEWARE = [
  Middleware::LogJobExecution
]

def execute_job(job, job_class)
  middleware = DEFAULT_MIDDLEWARE + (job_class.middleware || [])
  # ...
end

We could even go further and implement things like retries and error handling as middleware.
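
For example, the error handler invocation could be expressed as a middleware roughly like this (a sketch only; the worker doesn't actually do this, and RunErrorHandler is a made-up name):

class RunErrorHandler
  def self.call(job)
    yield
  rescue => e
    # Call the job's error handler, then re-raise so the worker still marks the job as failed.
    job.class.error_handler&.call(job, e)
    raise
  end
end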

It's debatable whether this middleware approach is a good idea, though. One consequence of having these internals as part of the middleware chain is that they become subject to the regular pipeline that end-user middleware go through, which can be problematic. For instance, if an unexpected error is raised by the LogJobExecution middleware, it would bubble up and cause the job to be marked as failed. We could get around this with some smarter error handling, but then we'd be adding extra code to get ourselves out of a hole we dug ourselves.

Chaining

Chaining is a feature that lets you set up a "chain" of jobs. An example from the Laravel docs:

Bus::chain([
    new ProcessPodcast($podcast),
    new OptimizePodcast($podcast),
    new ReleasePodcast($podcast),
])->dispatch();

You could achieve this yourself (by having each job directly or indirectly enqueue the next one, or using a message queue), but it's an interesting challenge. For my queue library, I'd prefer a chain to be defined as a separate class. This way you can name it and specify its own behaviour (queue name, error handler, etc):

class PodcastPipeline < Gator::QueueableChain
  queue_on :priority
  
  on_error do |job|
    # ...
  end
  
  jobs [
    ProcessPodcast,
    OptimizePodcast,
    ReleasePodcast,
  ]
end

PodcastPipeline.dispatch(podcast_id)

There are a couple of ways to implement chaining, but I'll go with a simple approach:

  • When the chain is dispatched, we enqueue all of its jobs immediately.
  • The first job is set to ready (I renamed the default state from "waiting" to "ready", to indicate that these jobs are "ready to be picked up"). The other jobs are set to waiting.
  • Each job has a next_job_id that points to the next job in the chain. We'll also store the chain class so we can look up its error handler.
  • When one job executes successfully, the next job is set to ready, so it can be picked up, and so on.
  • If a job errors, we'll call the chain's error handler (if there's one) rather than the job's.

Okay, let's do this. First up, we update our database schema:

DB.create_table :jobs do
  # ...
  String :next_job_id, null: true
  String :chain_class, null: true
end

Next, the chain class:

module Gator
  class QueueableChain
    class << self
      # Other DSL methods (queue_on, on_error)...
      
      attr_reader :component_jobs

      def jobs(jobs)
        @component_jobs = jobs
      end
    end

    def self.dispatch(*args, wait: nil, at: nil, queue: nil)
      next_execution_at = wait ? (Time.now + wait) : (at ? at : nil)
      chain_queue = queue ? queue : self.queue

      jobs = []
      logs = []
      # Build the jobs in reverse so each one can point to the id of the job that comes after it.
      component_jobs.reverse_each.with_index do |job_class, index|
        is_first_in_chain = (index == component_jobs.size - 1)
        queue = (chain_queue || job_class.queue || 'default').to_s
        jobs << {
          id: Gator::Models::Job.generate_job_id,
          name: job_class.name,
          args: args.to_json,
          next_execution_at: is_first_in_chain ? next_execution_at : nil,
          state: is_first_in_chain ? "ready" : "waiting",
          queue:,
          chain_class: self.name,
          next_job_id: (jobs.last[:id] rescue nil),
        }
        logs << "Enqueued job #{job_class.name} in chain #{self.name} queue=#{queue}"
      end

      Gator::Models::Job.multi_insert(jobs.reverse!)

      Gator::Logger.new.info logs.reverse.join("\n")
    end
  end
end

Testing:

class Jobs::DoSomeStuffChain < Gator::QueueableChain
  on_error do |job|
    job.logger.info "Error in job #{job.class.name} in chain #{self.name}"
  end

  jobs [
    Jobs::DoSomeStuff,
    Jobs::RegularAssJob,
    Jobs::DoSomeStuff,
  ]
end
> Jobs::DoSomeStuffChain.dispatch("Bob", "Pete")
INFO [2022-12-25 21:51:07] Enqueued job Jobs::DoSomeStuff in chain Jobs::DoSomeStuffChain queue=high
Enqueued job Jobs::RegularAssJob in chain Jobs::DoSomeStuffChain queue=default
Enqueued job Jobs::DoSomeStuff in chain Jobs::DoSomeStuffChain queue=high

The last thing left is to adjust the worker: run_job_error_handler needs to check for the chain's error handler, and cleanup needs to mark the next job as ready:

def run_job_error_handler(job_instance, error)
+  error_handler = Object.const_get(job_instance.chain_class).error_handler if job_instance.chain_class
+  error_handler ||= job_instance.class.error_handler
+  return unless error_handler

  error_handler.call(job_instance, error)
rescue => e
  logger.warn "Job error handler threw an error: #{e.message}"
end

def cleanup(job, job_class, error = nil)
  job.reserved_by = nil
  job.attempts += 1
  job.last_executed_at = Time.now
  if error
    job.state = "failed"
    job.error_details = error
    set_retry_details(job_class.retry_strategy, job, error) if job_class.retry_strategy
  else
    job.state = "succeeded"
  end

+  DB.transaction do
     job.save
+    if job.state == "succeeded" && job.next_job_id
+      Models::Job.where(id: job.next_job_id).update(state: "ready")
+    end
+  end
end

And when we run our worker:

INFO [2022-12-25 21:57:12] HIIII Bob and Pete
INFO [2022-12-25 21:57:12] Took 1.501239400007762 seconds
INFO [2022-12-25 21:57:12] Goodbye from job Jobs::DoSomeStuff
INFO [2022-12-25 21:57:12] Processed job id=job_f6f8561eb0d4b9269bfa7767 result=succeeded queue=high
INFO [2022-12-25 21:57:12] Checking all queues
INFO [2022-12-25 21:57:14] Processed job id=job_290a8ebaca5e3541986f0e19 result=succeeded queue=default
INFO [2022-12-25 21:57:14] Checking all queues
INFO [2022-12-25 21:57:16] HIIII Bob and Pete
INFO [2022-12-25 21:57:16] Processed job id=job_94bfaa5f6c79b16d5663e8bf result=succeeded queue=high

This approach is very simple. An alternative approach I considered involved having a separate chains table, with each entry having its own state that got updated as the jobs were processed. A major advantage of this would be easy visualization of chains in a web UI.
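
For reference, that alternative might have looked something like this (a sketch only, not implemented; the table and column names are made up):

DB.create_table :chains do
  String :id, primary_key: true
  String :class_name, null: false
  String :state, null: false, default: "running" # running | failed | completed
  Integer :current_position, default: 0 # index of the job currently executing
  Time :created_at
end

Each job row could then carry a chain_id instead of a next_job_id, and the worker would update the chain's state and position as each job completes.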

Graceful shutdown

Finally, we'll teach our worker how to shut down gracefully. This means letting any currently executing jobs finish up before exiting. We can do this by implementing a handler for the "exit" signal, SIGINT (what the script receives when you press Ctrl-C). (There are other signals we could handle too, such as SIGTERM, which represents a different instruction; SIGKILL, on the other hand, can't be trapped at all.)

Unfortunately, this won't help with our current setup, since the job has to be interrupted in order to run the signal handler, and can't be resumed afterwards. What we need to do is move our job execution to a separate thread. Then when we receive the exit signal, we wait for the thread to finish, then exit.

def run
  # ...
  setup_signal_handlers # (1)
  loop do
    break if @should_exit # (2)

    if (job = next_job)
      @job_thread = Thread.new do # (3)
        job_class = Object.const_get(job.name)
        error = execute_job(job, job_class)
        cleanup(job, job_class, error)
      end
      @job_thread.join
      @job_thread = nil
    else
      sleep polling_interval
    end
  end
  
  puts "Exiting." # (4)
end

def setup_signal_handlers # (5)
  Signal.trap("SIGINT") do
    if @should_exit
      puts "Force exiting"
      Thread.kill(@job_thread) if @job_thread
      exit
    end

    puts "Received SIGINT; waiting for any executing jobs to finish before exiting..."
    @should_exit = true
  end
end

Let's break this down. The first piece of the puzzle is scheduling the job in another thread:

@job_thread = Thread.new do
  job_class = Object.const_get(job.name)
  error = execute_job(job, job_class)
  cleanup(job, job_class, error)
end
@job_thread.join
@job_thread = nil

We're explicitly waiting for the job to complete (with @job_thread.join) before checking for the next job, but we don't have to. In fact, from here we could switch to a multithreaded approach where we process n jobs concurrently on n threads. But that'll come later. For now, we want to stay single-threaded.

The next piece is handling SIGINT. At its core, all we do is set an instance variable, @should_exit. We don't need to do anything else, because the run method is already waiting on the thread (@job_thread.join); once the job thread (or sleep) is done, the loop sees @should_exit and exits. But if SIGINT is sent a second time, we kill the job thread and force an exit. This way, a job that somehow takes a very long time won't block the worker from shutting down.

I'm using puts here rather than our logger, since Ruby's logger can't be used in signal handlers.

I don't like that the exiting code is spread across two places (normal exit in the run method, and force exit in the signal handler), but I couldn't find a better way. To force exit, we have to interrupt the job thread, and the run method can't do that, because it's blocked at @job_thread.join. Alternatively, we could move all the exiting code to the signal handler like this:

def setup_signal_handlers
  Signal.trap("SIGINT") do
    if @should_exit
      puts "Force exiting"
      Thread.kill(@job_thread) if @job_thread
      exit
    end

    puts "Received SIGINT; waiting for any executing jobs to finish before exiting..."
    @should_exit = true
    @job_thread.join
    exit
  end
end

But this won't work, because the signal handler now blocks at @job_thread.join and can't even process a second SIGINT (force exit) until the job thread is done.


That's it for Part 3. Still more cool stuff to come (update: Part 4's out). Check out the code so far.



I write about my software engineering learnings and experiments. Stay updated with Tentacle: tntcl.app/blog.shalvah.me.
