Building a task queue, Part 4

This post is part 4 of a series where I build a task queue system.


Before we continue: at this point, you'll have noticed the worker is getting fairly complex, so I've moved the details of job execution into an Executor class. This lets the worker focus on supervisory concerns, such as fetching jobs and managing process state.

def run
  # ...
  if (job = next_job)
    @job_thread = Thread.new do
-     job_class = Object.const_get(job.name)
-     error = execute_job(job, job_class)
-     cleanup(job, job_class, error)
+     Executor.execute(job)
    end
    @job_thread.join
    @job_thread = nil

    # ...
end
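The Executor itself is essentially the logic that used to live in the worker, moved into its own home. A minimal sketch (assuming the execute_job and cleanup steps from the previous parts now live on this class):

class Gator::Executor
  def self.execute(job)
    # Same steps as before: resolve the job class, run the job, then record the outcome
    job_class = Object.const_get(job.name)
    error = execute_job(job, job_class)
    cleanup(job, job_class, error)
  end
end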

Multithreading

Last time out, we moved job execution to a thread, but it's still fully synchronous: we wait for that thread to finish before picking up the next job. Let's take that further and make it multithreaded: we assign a job to a thread, and, without waiting, pick up the next job and run it on another thread. This way, we can have multiple jobs running at the same time.

There are many Ruby libraries that help manage such concurrency concerns, but I'd like to give this a shot from scratch.
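(For comparison, this is roughly the abstraction such a library would hand us; a sketch using concurrent-ruby's FixedThreadPool, which I'm deliberately not using here:)

require 'concurrent'

pool = Concurrent::FixedThreadPool.new(5)   # at most 5 tasks execute at once

10.times do |i|
  # post hands a block to the pool; it runs as soon as a thread is free
  pool.post { puts "task #{i} on thread #{Thread.current.object_id}" }
end

pool.shutdown               # stop accepting new work...
pool.wait_for_termination   # ...and wait for queued work to finish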

First, we remove the line where we wait on the job thread:

if (job = next_job)
-  @job_thread = Thread.new do
+  Thread.new do
     Executor.execute(job)
   end
-  @job_thread.join
-  @job_thread = nil

and we update our exit process to kill all active child threads:

def child_threads
  Thread.list.reject { |t| t == Thread.main }
end

def run
  # ...
  setup_signal_handlers
  loop do
    if @should_exit
      child_threads.each(&:join)
      break
    end

    if (job = next_job)
      Thread.new do
        Executor.execute(job)
      end
    else
      sleep polling_interval
    end
  end
end

def setup_signal_handlers
  Signal.trap("SIGINT") do
    if @should_exit
      puts "Force exiting"
      child_threads.each(&:kill)
      exit
    end

    puts "Received SIGINT; waiting for any executing jobs to finish before exiting..."
    @should_exit = true
  end
end

This works:

Jobs::RegularAssJob.dispatch_many([["arg"]] * 4)

Before you can blink, all jobs are processed:

INFO [2022-12-26 13:09:03] Processing job id=job_0a6133ea18456f3cc1712c8f class=Jobs::RegularAssJob queue=default
INFO [2022-12-26 13:09:03] Processing job id=job_e63ee46100a1d2cf65b13ec6 class=Jobs::RegularAssJob queue=default
INFO [2022-12-26 13:09:03] Processing job id=job_00e734da548093554e47fb47 class=Jobs::RegularAssJob queue=default
INFO [2022-12-26 13:09:03] Processed job id=job_a05078c9053614baa6c7822f result=succeeded class=Jobs::RegularAssJob queue=default
INFO [2022-12-26 13:09:03] Processed job id=job_0a6133ea18456f3cc1712c8f result=succeeded class=Jobs::RegularAssJob queue=default
INFO [2022-12-26 13:09:03] Processed job id=job_00e734da548093554e47fb47 result=succeeded class=Jobs::RegularAssJob queue=default
INFO [2022-12-26 13:09:04] Processed job id=job_e63ee46100a1d2cf65b13ec6 result=succeeded class=Jobs::RegularAssJob queue=default

However, this is unsafe. If 10k jobs are ready, our worker will start 10k concurrent threads. Computing resources are not infinite; eventually, we'll hit the limits of something:

  • the database may not be able to handle 10k concurrent connections
  • the machine may not have enough memory for 10k competing invocations
  • the operating system may not be able to handle 10k active threads (depending on how threads are implemented)

We'll likely see either unexplained failures or degraded performance. To avoid this, we need to limit our concurrency. We can do that by allowing only a maximum number of threads to run at any moment:

+ @max_concurrency = 5
  loop do
    if @should_exit
      child_threads.each(&:join)
      break
    end

-   if (job = next_job)
+   if (child_threads.size < @max_concurrency) && (job = next_job)
      Thread.new do
        Executor.execute(job)
      end
    else
      sleep polling_interval
    end
  end

This works too. The change here is that if our allotted concurrency is fully occupied, we sleep and then check back.

But we can go further. Right now, we're creating and destroying a thread for each job. I don't know the exact cost, but there is some overhead to that. What if we had a fixed set of already-running threads that we could send jobs to? Once a thread finishes its current job, it hangs around to receive a new one, rather than being killed and replaced with a fresh thread. This is called a thread pool, and its major advantage is that it cuts out the overhead of creating and destroying threads.

To implement a thread pool:

  • We have to start up all our threads before we even start checking for jobs.
  • We need a way to pass jobs to the threads after they've started. This means some sort of shared storage that the main thread can write to and child threads can read from.
  • When they're not busy with another job, the threads have to continually check this shared storage for new jobs to execute.

The good news is that Ruby comes with this shared storage we're looking for: Thread::Queue. It's perfect for us because our main thread can push jobs to it, while child threads listen for new jobs on it, with no need to poll. Calling queue.shift will either return the next item from the queue or, if the queue is empty, block (pause execution of the thread) until a new item is pushed to the queue.
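Here's a quick standalone illustration of that behaviour, including close, which we'll use when shutting down:

queue = Thread::Queue.new

consumer = Thread.new do
  # shift blocks while the queue is empty, and returns nil once the queue is closed and drained
  while (item = queue.shift)
    puts "got #{item}"
  end
  puts "queue closed, exiting"
end

queue.push(1)
queue.push(2)
queue.close
consumer.join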

Here we go:

def setup_thread_pool
  @max_concurrency.times do
    Thread.new do
      while (job = @work_queue.shift)
        Executor.execute(job)
      end
    end
  end

  logger.debug "Started #{@max_concurrency} executor threads"
end

def run
  # ...
+ @work_queue = Thread::Queue.new
+ setup_thread_pool
  setup_signal_handlers

  loop do
    if @should_exit
+     @work_queue.close
      child_threads.each(&:join)
      break
    end

-   if (child_threads.size < @max_concurrency) && (job = next_job)
+   if (@work_queue.size < @max_concurrency) && (job = next_job)
+     @work_queue.push(job)
    else
      sleep polling_interval
    end
  end
end

First up, we create the queue and start the threads that will consume jobs from it. Each thread uses shift to wait for new jobs.

You'll notice that, in our main thread, we don't push a new job to the queue if it's already at the concurrency limit (if (@work_queue.size < @max_concurrency)). We could do so without any problems, since the threads will only pick up the next job when they're done with the current one. However, in our implementation, calling next_job marks the job as reserved in the database. This means that, if the worker process gets shut down before any thread actually picks up the job, we'd have a job that was reserved but not processed. Hence, we limit the size of the queue to match the number of threads. There's actually a SizedQueue class for this, but its API doesn't give us any real advantages here.
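For reference, SizedQueue applies that limit at push time; the producer blocks once the queue is full:

queue = Thread::SizedQueue.new(5)
queue.push("some job")   # blocks if 5 items are already waiting

In our loop, though, we'd still have reserved a job via next_job before push could block, so checking the size upfront works out simpler.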

Finally, when shutting down, we call queue.close. This will make queue.shift return nil for all the threads waiting on it, allowing them to finish and exit.

That was actually easier than I expected going in. I'm sure there's more to consider, but I like how easy Ruby makes writing concurrent code. This also means that our jobs now have to be thread-safe; multiple jobs accessing the same resource at the same time in different threads could lead to problems.

Redis

At this point, I think I've implemented most of the features I was really curious about. There are still some more that I may or may not tackle in the future (I'll explain them below), but the final thing I'd like to do now is switch this to use Redis. As I've explained earlier, I think Redis is a better choice for intense queuing workloads: it's much faster, and you can fetch and delete old data in one go (for instance, LPOP will fetch and remove the first item from a list). With an SQL database, you'd need a separate job or check that runs at intervals and deletes old rows (and even then, they might not actually be deleted on disk).

Redis also has many atomic operations (like the aforementioned LPOP), which makes it a great fit for concurrent use, without having to implement locks like our reserved_by column. Finally, infrastructure: it's much easier to spin up, configure and manage a separate Redis instance (or a separate database in an existing instance) to isolate your queueing system from your app than it is to spin up a database server.

The main limitation of Redis is restricted data access. You can't exactly write ad hoc queries for what you want; you have to structure your data around how you plan to access it, rather than around how it's logically organized. And there are some operations (like counting the number of keys) that will probably be too expensive to ever do in production.

Alright, then. To port to Redis, we need to think about how we access our data and what we do with it.

First, enqueuing jobs. How we enqueue jobs determines how we can fetch them, so we must consider both. A good approach would be to push jobs into a list named after their queue. So if the user enqueues a job to the high queue (SomeJob.dispatch(queue: :high)), we save the job details as a new item in a Redis list called "queue-high" (RPUSH queue-high <SomeJob details>). This way, our worker can pick a queue and run LPOP to get the next job on that queue. This also means we don't need the reserved_by field anymore: since the worker pops the job from the list, there's no chance another worker might try to pick it up.
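In terms of raw Redis operations (illustrative, using the $redis client we'll set up shortly), the flow looks like this:

# Enqueuing: the dispatcher appends the serialized job to its queue's list
$redis.rpush "queue-high", { 'id' => 'job_123', 'name' => 'SomeJob', 'args' => ['arg'] }.to_json

# Fetching: the worker pops from the other end, so jobs come out in FIFO order
job_json = $redis.lpop "queue-high"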

Why not a single jobs list? Because of how Redis works: if we put all jobs in one list, the worker would have to load each job, check its queue to determine whether it can process it, and then either delete it or push it back onto the list. Theoretically doable, but more complicated, highly inefficient, and not atomic, so expect to run into concurrency problems.

The other part is saving the job data when we're done. For successful jobs, we can simply discard them, or if we wish, push them to a "successful" list. For failed jobs which are not to be retried, we can push to a "dead" list. But what about jobs which failed but need to be retried at a specific time...? We can't simply push them back onto the queues, because they won't be ready until a certain time. This also applies to scheduled jobs.

In fact, we have two big pieces of missing functionality:

  • scheduled jobs and failed jobs which need to be retried. These must be executed after a specific time, so we can't simply push them to the queue's ready list, because they're not ready yet. They probably have to be put somewhere else.
  • chains. This isn't an SQL database, so we can't find the next job by querying the next_job_id field. We'll need to store all information for a chain in one place.

Solutions? Restructure the data. For chains, this means that, rather than enqueuing all the jobs upfront, we only enqueue the first job, including its chain ID in its details. We store the rest of the chain's jobs under a separate Redis key, say chains-<chain_id>. When we finish executing the first job, we look up its chain, pop the next job, push it to its queue's ready list, and so on.

For scheduled/retrying jobs, we can use a sorted set. In a sorted set, every item has a sortable value or score (in our case, the items are the jobs, and the score is the timestamp they were scheduled for). We could have a scheduled sorted set where we put scheduled jobs. At intervals, our worker can then run ZRANGE scheduled 0 <current timestamp> BYSCORE to get all items with a timestamp equal to or less than now. (Redis.🥰) If there are any "ripe" jobs, the worker will push them onto their respective queues, from where they can be picked up.
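Sketched with the redis gem (illustrative; job_json stands in for the serialized job details):

# Scheduling: the score is the Unix timestamp at which the job becomes ready
$redis.zadd "scheduled", (Time.now + 3600).to_i, job_json

# Later, in the worker: everything scored at or below "now" is ripe
ripe_jobs = $redis.zrange "scheduled", 0, Time.now.to_i, byscore: true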

Another big change is our "Checking all queues" scenario. Remember that if the user doesn't tell the worker which queues to check, it checks all queues. In the database implementation, this is simple: don't add a WHERE queue IN (...) clause to the query. With Redis, it's tougher, since there's no general jobs list. However, BLPOP (the blocking variant of LPOP) accepts several lists at once, so if we know the names of all the queues, we can pass all of them (for instance, BLPOP queue-high queue-default queue-low 1, with a timeout as the final argument) to get whichever job becomes available first.

Unfortunately, we don't know the names of all the queues...or do we? We don't, but since we're the ones pushing the jobs, all we need is a record of every queue we've pushed jobs to. We'll use a known-queues key in Redis (a set, actually, since we don't need duplicates); every time we enqueue a new job, we add its queue to that set. This way, we can tell BLPOP to check all our known queues.

Note that BLPOP checks the specified lists in the order they're given, so we'll shuffle the order each time so we don't favour one queue (BLPOP queue-default queue-high queue-low, then BLPOP queue-low queue-high queue-default, and so on).

Phew. It's definitely more complex than the database solution, but it's a worthwhile challenge. Let's go for it.

require 'connection_pool'
require 'redis'

# A pool of 6 connections, so each executor thread (plus the main worker thread) gets its own
$redis = ConnectionPool::Wrapper.new(size: 6, timeout: 3) { Redis.new }

Now, for enqueuing: we serialize the job details to JSON, push them onto the appropriate queue list in Redis, and update the set of known queues.

Our Job model will change a lot. It's no longer a database-backed model, but a simple wrapper over the Redis client:

class Gator::Models::Job
  def self.save(details, at: nil, queue: nil, connection: nil)
    connection ||= $redis
    
    if at
      # Scheduled job; add to sorted set with execution time as score
      connection.zadd "scheduled", at.to_i, details.to_json
    else
      # Immediate job; push to the queue's ready list now
      connection.rpush "queue-#{details['queue']}", details.except('queue').to_json
    end

    # Update our known-queues set
    connection.sadd "known-queues", details['queue']
  end

  def self.generate_job_id
    "job_" + SecureRandom.hex(12)
  end
end

The save method is what we use for creating a new job. I've added a connection parameter that specifies the Redis connection to use, so I can use Redis transactions like this:

$redis.multi do |transaction|
  Gator::Models::Job.save(job_hash, connection: transaction)
  transaction.some_other_command
end

Next, our Queueable class. Not too complicated:

class Gator::Queueable
  def self.dispatch(*args, wait: nil, at: nil, queue: nil, connection: nil)
    at ||= wait ? (Time.now + wait) : nil
    queue ||= self.queue || 'default'
    job_hash = {
      'id' => Gator::Models::Job.generate_job_id,
      'name' => self.name,
      'args' => args,
      'queue' => queue,
    }
    Gator::Models::Job.save(job_hash, at:, connection:)
    Gator::Logger.new.info "Enqueued job id=#{job_hash['id']} args=#{args} queue=#{queue}"
  end

  def self.dispatch_many(args, wait: nil, at: nil, queue: nil)
    next_execution_at = wait ? (Time.now + wait) : (at ? at : nil)
    queue ||= self.queue

    $redis.multi do |transaction|
      args.each do |job_args|
        dispatch(*job_args, at: next_execution_at, queue:, connection: transaction)
      end
    end
  end
end

And for chains:

class Gator::QueueableChain
  def self.dispatch(*args, wait: nil, at: nil, queue: nil)
    at ||= wait ? (Time.now + wait) : nil
    chain_queue = queue || self.queue

    chain_id = SecureRandom.hex(12)
    jobs = component_jobs.map do |job_class|
      {
        'id' => Gator::Models::Job.generate_job_id,
        'name' => job_class.name,
        'args' => args,
        'queue' => chain_queue || job_class.queue || 'default',
        'chain_class' => self.name,
      }
    end

    $redis.multi do |transaction|
      # Enqueue the first job immediately
      Gator::Models::Job.save(
        jobs.shift.merge({ 'chain_id' => chain_id }), at:,
        connection: transaction
      )
      # Save the rest under the chain
      transaction.rpush "chains-#{chain_id}", jobs.map(&:to_json)
    end

    Gator::Logger.new.info "Enqueued chain #{self.name} queue=#{queue}"
  end
end

Over to our worker. Our run method stays largely the same: it's a loop that checks for new jobs and assigns them to a worker thread. The main changes: there's no more need for polling, since we're using the blocking Redis command BLPOP, which waits (up to a timeout) if the queues are empty; and on each iteration of the loop, we check for any "ripe" jobs (scheduled jobs whose time is due) and enqueue them.

def run
  # ...
  loop do
    # ...
+   enqueue_any_ripe_jobs

    if (@work_queue.size < @max_concurrency) && (job = next_job)
      @work_queue.push(job)
    end
  end
end

def enqueue_any_ripe_jobs
  ripe_scheduled_jobs = $redis.zrange "scheduled", 0, Time.now.to_i, byscore: true
  $redis.multi do |transaction|
    # Enqueue the jobs
    ripe_scheduled_jobs.each do |job_hash|
      Gator::Models::Job.save(JSON(job_hash), connection: transaction)
    end
    # Remove them from the `scheduled` set
    transaction.zpopmin "scheduled", ripe_scheduled_jobs.count
  end

  ripe_retry_jobs = $redis.zrange "retry", 0, Time.now.to_i, byscore: true
  $redis.multi do |transaction|
    ripe_retry_jobs.each do |job_hash|
      Gator::Models::Job.save(JSON(job_hash), connection: transaction)
    end
    transaction.zpopmin "retry", ripe_retry_jobs.count
  end
end

The way we check for jobs also has to change:

def check_for_jobs
  queues_to_check = queues_filter
  logger.info "Checking queues #{queues_to_check}"

  # Use BLPOP to fetch the next available job from any of the queues
  # (waiting for up to 2 seconds if they're all empty)
  $redis.blpop queues_to_check.map { "queue-#{_1}"}, timeout: 2
end

The queues_filter is a modification of the one we used for our database (Part 2):

def queues_filter
  return @queues.map(&:first).shuffle if @all_queues_have_same_priority

  if @queues.empty?
    all_queues = $redis.smembers "known-queues"
    return all_queues.empty? ? ['default'] : all_queues.shuffle
  end

  # Remember the weighted random sampling? This orders queues by priority
  @queues.sort_by { |(_name, priority)| -(rand ** (1.0 / priority)) }.map(&:first)
end

And finally, next_job prepares the job for sending to an executor thread:

def next_job
  queue, job_string = check_for_jobs
  return nil if queue == nil # No job, try again later

  defaults = {
    'attempts' => 0,
    'state' => 'ready',
    'queue' => queue.sub("queue-", ''),
    'chain_class' => nil,
    'chain_id' => nil,
    'error_details' => nil,
    'last_executed_at' => nil,
    'next_execution_at' => nil,
  }
  job_hash = defaults.merge(JSON(job_string))
  OpenStruct.new(**job_hash)
end

next_job sets some extra info and turns the job into an OpenStruct, which is like a data object wrapper for a hash. All this is to make it easier for the executor to work with the job object (and so I don't have to change most of the code I had in the database version😅).

And speaking of the executor, luckily, the only thing we need to change is the cleanup method (since that's the only place where it tries to save information about a job):

def cleanup(job, job_class, error = nil)
  job.attempts += 1
  job.last_executed_at = Time.now
  if error
    job.state = "failed"
    job.error_details = error
    job_class.retry_strategy ? set_retry_details(job_class.retry_strategy, job, error) : (job.state = "dead")
  else
    job.state = "succeeded"
  end

  if job.state == "failed"
    $redis.zadd "retry", job.next_execution_at.to_i, job.to_h.to_json
  elsif job.state == "dead"
    $redis.rpush "dead", job.to_h.to_json
  elsif job.chain_id
    next_job_in_chain = $redis.lpop "chains-#{job.chain_id}"
    if next_job_in_chain
      Gator::Models::Job.save(
        JSON(next_job_in_chain).merge({ 'chain_id' => job.chain_id })
      )
    end
  end
end

And that's all.

It's hard to show the full results here, but it worked beautifully, complete with discovering new queues as I enqueued jobs, and shuffling the list of queues.

Here's some of what's in my Redis instance after some queueing and executing (explored via redis-commander):

A big takeaway for me was how fast Redis is. I mean, I know it's fast, but it was still impressive to see. I hardly had time to inspect the Redis database after queuing a job before the job was popped.

Improvements? For one, reliability. There are a few places here where I (had to) use non-atomic operations. For instance, when enqueuing the next_job_in_chain, if the thread gets killed before Gator::Models::Job.save is run, we would lose that job and be unable to recover it. In fact, the whole lifecycle of a job is susceptible to this—the worker LPOPs the job from its queue to execute, but what happens if the worker suddenly gets killed before the job can be executed? A solution would be to have another queue where we temporarily save in-progress jobs; when the worker restarts, we re-queue jobs from there. Sidekiq Pro offers variations of this feature.
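For the record, that pattern is usually built on Redis' RPOPLPUSH (or LMOVE, its more flexible replacement in Redis 6.2+): instead of popping the job outright, you atomically move it onto an in-progress list and only remove it from there once it has finished. A rough sketch of the idea, not wired into the worker (build_job stands in for the parsing next_job does):

# Atomically move the next job from its queue onto an in-progress list.
# (RPOPLPUSH takes from the tail; LMOVE lets you choose the side, which fits our RPUSH/BLPOP ordering better.)
job_json = $redis.rpoplpush "queue-default", "in-progress"

if job_json
  Executor.execute(build_job(job_json))
  # Only once the job has actually finished do we drop it from the in-progress list
  $redis.lrem "in-progress", 1, job_json
end

# On startup, anything still sitting in "in-progress" belonged to a crashed worker and can be re-queued.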

A smaller improvement: I'd prefer to use a custom model or Struct for the job model rather than an OpenStruct, for better data integrity.

Epilogue

I'm really happy with what I've built over the course of this series. I probably wouldn't use it in production until I did some further testing. Speaking of tests, you'll notice I didn't write any (and I'm sure there are several bugs😅). I think writing tests for this could be an interesting article (or series) in itself. Will I write that article? Maybe, but no commitments at the moment!

More importantly, I learnt a lot (and I hope you did too). As always, going beyond the surface is a wonderful journey.

What else would I like to add?

  • A testing interface for end users. For instance, you should be able to switch from Redis to an in-memory queue in your app tests, without changing your app code.
  • Metrics: This would be fairly straightforward to do, I think. We could use Redis to store stats like number of jobs processed, average wait time (how long a job waits before it's picked up), memory usage, etc.
  • Speaking of metrics, I'd ideally want to run some benchmarks to have a concrete idea of my library's performance. How many jobs can it enqueue per second, how many can it process per second, how does it perform at different concurrency levels, and so on. I'd even like to benchmark the database version against the Redis version.
  • A web UI, or at the very least, an introspection API, that allows you to programmatically inspect the metrics generated above and internal state of your queues without having to dig into the Redis implementation details. In both cases, I'm inspired by Sidekiq, although I think the UI could offer more.
  • No-overlaps middleware: An idea from Laravel, a job middleware that stops multiple instances of the same job from being executed at the same time. Technically speaking, it's relatively easy to implement (a Redis lock should do it; there's a sketch after this list), but I find myself intrigued by the prospect.
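A sketch of that lock idea (the key naming here is made up):

lock_key = "lock-#{job.name}"

# NX: only set the key if it doesn't already exist; EX: expire it so a crashed worker can't hold it forever
if $redis.set(lock_key, job.id, nx: true, ex: 60)
  begin
    Executor.execute(job)
  ensure
    $redis.del lock_key
  end
else
  # Another instance of this job is already running; skip it or push it back onto its queue
end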

Anyway, that's it for now! Code on GitHub.


