Building a task queue, Part 2

This post is part 2 of a series where I build a task queue system.


Next, I'll beef up my queue library with some extra features.

Bulk dispatch

First, I'll add the ability to dispatch jobs in bulk. This way, rather than calling Queueable.dispatch in a loop n times and issuing n database queries, the user can make a single call which issues a single query. This part is pretty easy:

# Add a `dispatch_many` method to our Queueable class
module Gator
  class Queueable
    # ...
    
    def self.dispatch_many(args, **options)
      jobs = args.map do |job_args|
        {
          id: Gator::Models::Job.generate_job_id,
          name: self.name,
          args: job_args.to_json,
        }
      end
      Gator::Models::Job.multi_insert(jobs)

      Gator::Logger.new.info "Enqueued #{args.size} #{self.name} jobs"
    end
  end
end
# Usage:
get "/queue/:count?" do
  count = Integer(params[:count] || 1)
  args = count.times.map { |i| ["Nellie #{i}", "Buster #{i}"] }
  Jobs::DoSomeStuff.dispatch_many(args)
  "Queued #{count} jobs"
end

Delay

Next up, supporting a delay: the job is enqueued now, but only executed after some time. Say we want to send a welcome email to a user two hours after they sign up. This should look like:

def signup
  # Create user...
  SendWelcomeEmail.dispatch(user.id, wait: 2 * 60 * 60)
end

Or just sending an email at a specific date/time (like a payment reminder on a certain date):

SendPaymentReminder.dispatch(subscription.id, at: reminder_date)

Since we planned ahead for this with the next_execution_at column, this is also straightforward:

def self.dispatch(*args, wait: nil, at: nil)
  job = Gator::Models::Job.new(
    name: self.name, args:, 
    next_execution_at: wait ? (Time.now + wait) : (at ? at : nil),
  )
  job.save
end

def self.dispatch_many(args, wait: nil, at: nil)
  next_execution_at = wait ? (Time.now + wait) : (at ? at : nil)

  jobs = args.map do |job_args|
    {
      id: Gator::Models::Job.generate_job_id,
      name: self.name,
      args: job_args.to_json,
      next_execution_at:,
    }
  end
  Gator::Models::Job.multi_insert(jobs)
end
# Usage:
delay = count % 15
Jobs::DoSomeStuff.dispatch("I was dispatched #{delay} minutes ago — at #{Time.now}", wait: delay * 60)

# Worker output, a few minutes later:
INFO [2022-11-01 23:40:43] HIIII I was dispatched 3 minutes ago — at 2022-11-01 23:37:40 +0100 and 
INFO [2022-11-01 23:40:43] Processed job id=job_0426d65614121577f4d3aba1 result=succeeded args=["I was dispatched 3 minutes ago — at 2022-11-01 23:37:40 +0100"]

Of course, there's no guarantee the job will execute at exactly that time. Our workers only poll the database at intervals, so if the execution time arrives while a worker is sleeping, the job has to wait for the next poll. A worker might also be busy with another job that takes a while. Overall, most timing systems can only guarantee that the task won't be executed until after the timer is up, not that it will be executed immediately afterwards.

Named queues

Not all jobs are equal. Some are business-critical, some take longer, and so on. In an app with a lot of background jobs, jobs are often assigned to different queues, typically named by priority (e.g. "critical", "high", "default", "low") or by function ("emails", "transactions"), or a mix of both.

By default, all jobs are pushed to the default queue, and the worker executes all jobs on all queues, but using multiple queues allows for greater flexibility and control over how jobs are processed:

  • You can spin up a new worker process and assign it to a queue that gets a lot of jobs, so they aren't kept waiting for long.
  • You could even have a separate server dedicated to a queue with jobs that use a lot of resources (CPU/memory), so they don't affect other jobs.
  • You can tell a worker which queues to prioritize, so jobs on those queues get processed sooner.
  • Workers can be independent, so a problem with one worker doesn't mean there's a backlog across all your jobs.

Different job systems do this differently.

  • BullMQ and Que set the priority directly on a job. You can set this to any number, and jobs with a higher priority are executed before any other waiting jobs of a lower (or no) priority.
  • Resque does not support explicit priorities, but instead will check queues in the order you list them. So, for instance, if you specify high,default for a worker, the default queue will only be checked when the high queue is empty. One big weakness of this implicit ordering is queue starvation: in a single-threaded worker, if you keep enqueuing high jobs at a high rate, default jobs will never be processed. The only way to get around this is to start up a dedicated worker for each queue.
  • Sidekiq supports Resque-like implicit ordering as well as explicitly assigning priorities to queues (["critical", 7], ["default", 2]). In this case, this means that when picking a job from the queue, the worker is likely to pick a critical job 7 times out of 9, and a default job 2 times.

I'll be implementing the explicit queue priority approach, as I think it's the clearest and most predictable of the three.

To implement this, we allow the user to specify the queue by defining a static queue property:

class SendStorageAlmostUsedUpEmail < Gator::Queueable
  queue_on :critical
end

We'll also support overriding it when enqueuing a job; you might have a job that should only be sent as high priority in certain scenarios:

case percent_storage_used
when 95
  SendStorageUsageEmail.dispatch(account.id, 95, queue: :critical)
else
  SendStorageUsageEmail.dispatch(account.id, 80)
end

(Neat trick: this allows us to achieve Que's "set a priority on a job" approach—just use the queue as the priority.)
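For example (hypothetical job and queue names), you could name queues after priority levels and dispatch to whichever level fits:

# Hypothetical: express a per-job "priority" through the queue name
SendInvoice.dispatch(invoice.id, queue: :p1)  # most urgent
SendInvoice.dispatch(invoice.id, queue: :p3)  # least urgent
# A worker can then be told to prioritize :p1 over :p2 over :p3.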

Right, let's go. First, we update the Queueable class so a job can specify its queue, and so the caller can override it at dispatch time.

def self.queue_on(queue)
  @queue = queue
end

class << self
  attr_reader :queue
end

def self.dispatch(*args, wait: nil, at: nil, queue: nil)
  job = Gator::Models::Job.new(
    name: self.name, args:, 
    next_execution_at: wait ? (Time.now + wait) : (at ? at : nil),
    queue: queue ? queue : (self.queue || 'default'),
  )
  job.save
  Gator::Logger.new.info "Enqueued job id=#{job.id} args=#{job.args} queue=#{job.queue}"
end

Now, over to the job server. Our CLI work script needs to change to accept a list of queues:


options = {}

OptionParser.new do |parser|
  parser.on("-r", "--require FILE", "File to load at startup. Use this to boot your app.")

  parser.on("-q", "--queue QUEUE", Array,
    "A queue for this worker to check. Pass this flag multiple times to set multiple queues. " +
      "Separate a queue from its priority with a comma. Example: '-q critical,8 -q default,1'"
  ) do |q, priority|
    options[:queue] ||= []
    options[:queue] << [q, priority ? Integer(priority) : 1]
  end
end.parse!(into: options)

require options.delete(:require) if options[:require]

require_relative '../worker'

# CLI option name is `queue`, 
# but Worker argument name is `queues`
options[:queues] = options.delete(:queue)

w = Gator::Worker.new(**options)
w.run

As for the worker itself, we adjust it to take an optional list of queues to check, along with their priorities. The queue list is composed of [<queue_name>, <priority>] tuples; you can also pass just a queue name to default its priority to 1. For example, in the list [[:critical, 6], [:high, 4], [:default, 2], :low], :low has a priority of 1.
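To make that concrete, here's what the normalization (shown in the worker's constructor below) turns that example list into:

queues = [[:critical, 6], [:high, 4], [:default, 2], :low]
queues.map { |q| q.kind_of?(Array) ? q : [q, 1] }
# => [[:critical, 6], [:high, 4], [:default, 2], [:low, 1]]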

We'll also adjust our check_for_jobs query to respect the queues the worker is assigned to. There are three rules:

  • If no queues were specified, just pick the next job as usual.
  • If queues were specified, but they all have the same priority, only pick up jobs from any of those queues.
  • If queues were specified, and have different priorities, pick a job randomly using the queue priorities as weights.

So here we go (showing only the changed bits):

module Gator
  class Worker
    attr_reader :polling_interval, :logger, :worker_id, :queues

    def initialize(queues: [])
      @polling_interval = 5
      @logger = Gator::Logger.new
      @worker_id = "wrk_" + SecureRandom.hex(8)
      
      # Normalize the list of queues
      @queues = (queues || []).map do |q|
        q.kind_of?(Array) ? q : [q, 1]
      end
      
      # If all queues have the same priority,
      # we can precompute the query, to save time
      all_queues_have_same_priority = @queues.map { _1[1] }.uniq.size == 1
      @queues_filter = @queues.map(&:first) if all_queues_have_same_priority
    end
    
    def run
      logger.info "Worker #{worker_id} ready"
      queues.empty? ?
        logger.info("Watching all queues") :
        logger.info("Watching queues: #{queues.map { |q, p| "#{q} (priority=#{p})" }.join(', ')}")

      loop do
        if (job = next_job)
          error = execute_job(job)
          cleanup(job, error)
        else
          sleep polling_interval
        end
      end
    end
    
    # ...

    def check_for_jobs
      query = Models::Job.where(state: "waiting", reserved_by: nil)
      query = query.where { (next_execution_at =~ nil) | (next_execution_at <= Time.now) }
      
      if queues.empty?
        logger.info "Checking all queues"
      else
        queues_to_check = queues_filter
        query = query.where(queue: queues_to_check)
        logger.info "Checking queues #{queues_to_check}"
      end
      
      logger.debug query.sql
      query.first
    end

    def queues_filter
      return @queues_filter if @queues_filter

      # Weighted random sampling formula from Efraimidis and Spirakis,
      # thanks to https://gist.github.com/O-I/3e0654509dd8057b539a
      queue_to_check, _priority = queues.max_by(1) { |(_name, priority)| rand ** (1.0/priority) }.first
      [queue_to_check]
    end

    def execute_job(job)
      Object.const_get(job.name).new.handle(*job.args)
      logger.info "Processed job id=#{job.id} result=succeeded queue=#{job.queue}"
      nil
    rescue => e
      logger.info "Processed job id=#{job.id} result=failed queue=#{job.queue}"
      e
    end
    
    # ...
  end
end

I've added a bunch of logging statements so we can see what's going on at this stage.

Now, here's what happens when we start the worker without specifying any queues (bin/work.rb).

When we specify queues with the same priority (bin/work.rb -q batch,2 -q default,2):

And when we specify different priorities (bin/work.rb -q critical,3 -q default):

Neat!

Just to be sure our weighted random algorithm works, let's let it run for a bit. I've turned off the SQL logging, so all that shows is the queue being checked.

We're specifying a priority of 3 for critical and 1 for default. This means checks of these queues (and hence the jobs processed from them) should be roughly in the ratio 3:1. How did we do?

Our screenshot has 20 lines of "Checking..." logs, and 14 of them are for critical, while 6 are for default. So we have a ratio of 14:6, which is 2.3:1. Not exact, but good enough. (In fact, if the worker had checked critical just once more instead of default, we'd have had 15:5, which is exactly 3:1 😄.)

Anyway, this is a small sample size. As the number of jobs grows larger, the ratio will tend towards the expected value. (We could also write a test for it, but I've intentionally ignored testing for now.)
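If you'd rather not wait on the worker to convince yourself, here's a quick standalone simulation of the same sampling formula (just a sanity check, not part of the library), using the same priorities of 3 and 1:

# Simulate 10,000 queue checks with priorities critical=3, default=1
queues = [["critical", 3], ["default", 1]]
picks = Hash.new(0)

10_000.times do
  name, _priority = queues.max_by { |(_q, priority)| rand ** (1.0 / priority) }
  picks[name] += 1
end

p picks
# => something like {"critical"=>7500, "default"=>2500}, i.e. roughly 3:1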

Okay, finally, let's watch it in action. I'm creating two jobs, a critical one and a regular-ass one.

class Jobs::CriticalJob < Gator::Queueable
  queue_on :critical

  def handle
    sleep(rand(3))
  end
end

class Jobs::RegularAssJob < Gator::Queueable
  def handle
    sleep(rand(3))
  end
end

Let's do some dispatching (while the worker is stopped), just to see it in action:

3.times { Jobs::RegularAssJob.dispatch("arg") }
Jobs::CriticalJob.dispatch
Jobs::RegularAssJob.dispatch_many([["arg"]] * 4, queue: :critical)
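One catch: as defined earlier, dispatch_many only takes wait: and at:, so the queue: override in that last call needs the same treatment we gave dispatch. A minimal sketch of what that could look like (not necessarily the final version):

def self.dispatch_many(args, wait: nil, at: nil, queue: nil)
  next_execution_at = wait ? (Time.now + wait) : (at ? at : nil)

  jobs = args.map do |job_args|
    {
      id: Gator::Models::Job.generate_job_id,
      name: self.name,
      args: job_args.to_json,
      next_execution_at:,
      queue: queue ? queue : (self.queue || 'default'),
    }
  end
  Gator::Models::Job.multi_insert(jobs)

  Gator::Logger.new.info "Enqueued #{args.size} #{self.name} jobs"
end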

(By the way, I've switched from making a route for everything I need to running things directly in pry: pry -r ./app/boot.rb.)

That's a total of 3 jobs to the default queue, and 5 to the critical queue. Now, if we start the worker, we can see it processes the 5 critical jobs before it even gets to the default queue.

Note: it isn't always like this! There's every chance the weighted sample gives us the lower priority first, or even twice or three times in a row. As mentioned earlier, the effects are more obvious with a large sample size.

In fact, here's my test with a larger (albeit still small) sample size. I've modified the worker to loop only 200 times and log to stdout:

def run
- loop do
+ 200.times do
    if (job = next_job)
      # ...
    end
  end
end
 ruby .\gator\bin\work.rb -q critical,3 -q default `
  | Select-String -Pattern '["critical"]' -SimpleMatch `
  | Measure-Object -Line

Lines Words Characters Property
----- ----- ---------- --------
  143

(It's PowerShell, so sorry if the syntax looks strange. 🤣)

critical was checked 143 times out of 200, so that's a ratio of 143:57, which is 2.51:1.

Retries

Final feature for the day is automatic retries. Implementation:

  • We'll support different retry policies.
  • The user can specify the retry policy per job class (or write a custom policy).
  • After a job is executed, if it fails, we'll use the specified retry policy to calculate the next time we should try again.

Here's what the API will look like:

# Retry every 5 minutes until success, or at most 5 times
retry_with interval: 5 * 60, max_retries: 5

# Retry with exponential backoff
retry_with interval: :exponential

# Put retries on a different queue
retry_with interval: :exponential, queue: :retries

# Custom retry logic
retry_with(queue: :retries) do |exception, retry_count|
  # Don't retry if the record has been deleted
  next false if exception.is_a?(RecordNotFound)
  
  next :exponential if exception.is_a?(ExternalServiceDown)
  
  # Retry after 5 minutes, then 10, then 25
  retry_intervals = [5 * 60, 10 * 60, 25 * 60]
  retry_intervals[retry_count]
end

In summary, your choices:

  • Specify an interval, either as a number or :exponential for exponential backoff, along with optional max_retries and queue parameters.
  • Pass a block that will be called after every failure. The block can return the next interval (or :exponential), or false to stop retrying.

As usual, we update the Queueable class:

def self.retry_with(
  interval: nil, max_retries: 10, queue: nil, &block
)
  @retry_strategy = { interval:, max_retries:, queue:, block: }
end

class << self
  attr_reader :queue, :retry_strategy
end

In the worker, the main change is in our cleanup method:

    def cleanup(job, error = nil)
      job.reserved_by = nil
      job.attempts += 1
      job.last_executed_at = Time.now
      job_class = Object.const_get(job.name)
      if error
        job.state = "failed"
        job.error_details = error
        set_retry_details(job_class.retry_strategy, job, error) if job_class.retry_strategy
      else
        job.state = "succeeded"
      end

      job.save
    end
    
    
    def set_retry_details(retry_strategy, job, error)
      retry_strategy => { interval:, queue:, max_retries:, block: } # Ruby 3 "destructuring" 😍

      retry_count = job.attempts - 1
      if max_retries && retry_count >= max_retries
        job.state = "dead"
        return
      end

      if block
        decision = block.call(error, retry_count)
        if decision == false
          job.state = "dead"
          return
        end

        interval = decision
      end

      interval = (30 + (retry_count) ** 5) if interval == :exponential
      job.next_execution_at = job.last_executed_at + interval

      job.queue = queue if queue
    end

Let's talk about the exponential option there. Exponential backoff is a common strategy used to retry a task at progressively longer intervals. A good reason for this might be to reduce the load on the resource being accessed or give the problem time to be resolved.

It's called "exponential" because it's typically in the shape of an exponential curve, y = a + bᶜ—y increases slowly at first, but then faster and faster as b (or c) increases. In the real world, though, no one is checking your maths. The important thing is that the intervals increase at a reasonable rate. In fact, the Resque backoffs plugin uses a fixed list of intervals 1m, 10m, 1h, 3h, 6h, rather than a specific formula. Sidekiq's formula is approximately next_interval = n⁴ + 15, where n is the current retry count. So the first retry happens after 15s, the second happens 16s after the first, the third happens 31s after the second, and so on.

I think Sidekiq's intervals are too close together, at least in the early stages; 3 retries within a minute is overkill. I went with interval = n⁵ + 30, because I like the spacing of the intervals. Let's check the first 15 retry intervals:

def y(x) = 30 + (x ** 5)

ONE_HOUR = 60 * 60
ONE_DAY = ONE_HOUR * 24

def print_duration(v)
  return "#{v} seconds" if v < 60
  return "#{v/60.0} minutes" if v < ONE_HOUR
  return "#{v/ONE_HOUR} hours, #{(v % ONE_HOUR)/60.0} minutes" if v < ONE_DAY
  "#{v/ONE_DAY} days, #{(v % ONE_DAY)/ONE_HOUR} hours, #{(v % ONE_HOUR)/60.0} minutes"
end

(0...15).map { print_duration y(_1) }
# =>
[
 "30 seconds",                                                 
 "31 seconds",                                                 
 "1.0333333333333334 minutes",                                 
 "4.55 minutes",                                               
 "17.566666666666666 minutes",                                 
 "52.583333333333336 minutes",                                 
 "2 hours, 10.1 minutes",                                      
 "4 hours, 40.61666666666667 minutes",                         
 "9 hours, 6.633333333333334 minutes",
 "16 hours, 24.65 minutes",
 "1 days, 3 hours, 47.166666666666664 minutes",
 "1 days, 20 hours, 44.68333333333333 minutes",
 "2 days, 21 hours, 7.7 minutes",
 "4 days, 7 hours, 8.716666666666667 minutes",
 "6 days, 5 hours, 24.233333333333334 minutes"
 ]

Finally, we have to update the query in our check_for_jobs method to also look for failed jobs whose retry time is due.

query = Models::Job.where(state: "waiting").
  where { (next_execution_at =~ nil) | (next_execution_at <= Time.now) }.
  or(state: "failed", next_execution_at: (..Time.now)).
  where(reserved_by: nil)
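If you want to double-check how Sequel's or combines these conditions, you can print the generated SQL. Roughly (exact quoting and timestamp literals depend on your adapter, and I'm assuming the table is named jobs):

puts query.sql
# SELECT * FROM jobs
# WHERE (((state = 'waiting') AND ((next_execution_at IS NULL) OR (next_execution_at <= ...)))
#    OR ((state = 'failed') AND (next_execution_at <= ...)))
#   AND (reserved_by IS NULL)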

Now, demo time! First up, a two-minute retry interval and a custom retry queue:

class Jobs::WithRetryInterval < Gator::Queueable
  queue_on :interval
  retry_with interval: 2 * 60, queue: :retries

  def handle(*)
    logger.warn "Going to fail..."
    raise "Always fails"
  end
end

Jobs::WithRetryInterval.dispatch

Retries every 2 minutes, and keeps going until it hits the default max_retries of 10, since we didn't set one explicitly. ✅

With exponential retry and a limit:

class Jobs::WithExponentialRetry < Gator::Queueable
  queue_on :exponential
  retry_with interval: :exponential, max_retries: 4

  def handle(*)
    logger.warn "Going to fail..."
    raise "Always fails"
  end
end

Jobs::WithExponentialRetry.dispatch

You can see the time intervals between retries match up with our earlier calculations. ✅

With a block:

class Jobs::WithCustomRetryLogic < Gator::Queueable
  queue_on :custom
  retry_with do |exception, retry_count|
    [10, 20, 30][retry_count] || false
  end

  def handle(*)
    logger.warn "Going to fail..."
    raise "Always fails"
  end
end

Jobs::WithCustomRetryLogic.dispatch

Uses our custom retry intervals (10s, 20s, 30s), then stops. ✅

Phew, that's a lot! See you in Part 3! Code on GitHub.



