This post is part 2 of a series where I build a task queue system.
Next, I'll beef up my queue library with some extra features.
Bulk dispatch
First, I'll add the ability to dispatch jobs in bulk. This way, rather than calling Queueable.dispatch in a loop n times and issuing n database queries, the user can make a single call which issues a single query. This part is pretty easy:
# Add a `dispatch_many` method to our Queueable class
module Gator
  class Queueable
    # ...

    def self.dispatch_many(args, **options)
      jobs = args.map do |job_args|
        {
          id: Gator::Models::Job.generate_job_id,
          name: self.name,
          args: job_args.to_json,
        }
      end
      Gator::Models::Job.multi_insert(jobs)
      Gator::Logger.new.info "Enqueued #{args.size} #{self.name} jobs"
    end
  end
end
# Usage:
get "/queue/:count?" do
  count = Integer(params[:count] || 1)
  args = count.times.map { |i| ["Nellie #{i}", "Buster #{i}"] }
  Jobs::DoSomeStuff.dispatch_many(args)
  "Queued #{count} jobs"
end
Delay
Next up: supporting a delay before a dispatched job is executed. Say we want to send a welcome email to a user two hours after they signed up. This should look like:
def signup
  # Create user...
  SendWelcomeEmail.dispatch(user.id, wait: 2 * 60 * 60)
end
Or just sending an email at a specific date/time (like a payment reminder on a certain date):
SendPaymentReminder.dispatch(subscription.id, at: reminder_date)
Since we planned ahead for this with the next_execution_at column, this is also straightforward:
def self.dispatch(*args, wait: nil, at: nil)
  job = Gator::Models::Job.new(
    name: self.name, args:,
    next_execution_at: wait ? (Time.now + wait) : (at ? at : nil),
  )
  job.save
end

def self.dispatch_many(args, wait: nil, at: nil)
  next_execution_at = wait ? (Time.now + wait) : (at ? at : nil)
  jobs = args.map do |job_args|
    {
      id: Gator::Models::Job.generate_job_id,
      name: self.name,
      args: job_args.to_json,
      next_execution_at:,
    }
  end
  Gator::Models::Job.multi_insert(jobs)
end
# Usage:
delay = count % 15
Jobs::DoSomeStuff.dispatch("I was dispatched #{delay} minutes ago — at #{Time.now}", wait: delay * 60)

INFO [2022-11-01 23:40:43] HIIII I was dispatched 3 minutes ago — at 2022-11-01 23:37:40 +0100
INFO [2022-11-01 23:40:43] Processed job id=job_0426d65614121577f4d3aba1 result=succeeded args=["I was dispatched 3 minutes ago — at 2022-11-01 23:37:40 +0100"]
Of course, there's no guarantee the job will execute at exactly that time. Our workers only poll the database at intervals, so if the execution time arrives while the worker is sleeping, the job has to wait for the next poll. The worker might also be busy with another job that takes a while. Overall, most timing systems can only guarantee that a task won't run before its timer is up, not that it will run immediately after.
Named queues
Not all jobs are equal. Some are business-critical, some take longer, and so on. In an app with a lot of background jobs, jobs are often assigned to different queues, which are typically named by priority (eg "critical", "high", "default", "low") or by function ("emails", "transactions"), or a mix of both.
By default, all jobs are pushed to the default queue, and the worker executes all jobs on all queues, but using multiple queues allows for greater flexibility and control over how jobs are processed:
- You can spin up a new worker process and assign it to a queue that gets a lot of jobs, so they aren't kept waiting for long.
- You could even have a separate server dedicated to a queue with jobs that use a lot of resources (CPU/memory), so they don't affect other jobs.
- You can tell a worker which queues to prioritize, so jobs on those queues get processed sooner.
- Workers can be independent, so a problem with one worker doesn't mean there's a backlog on all your jobs.
Different job systems do this differently.
- BullMQ and Que set the priority directly on a job. You can set this to any number, and jobs with a higher priority are executed before any other waiting jobs with a lower (or no) priority.
- Resque does not support explicit priorities, but instead checks queues in the order you list them. So, for instance, if you specify high,default for a worker, the default queue will only be checked when the high queue is empty. One big weakness of this implicit ordering is queue starvation: in a single-threaded worker, if you keep enqueuing high jobs at a high rate, default jobs will never be processed. The only way to get around this is to start up a dedicated worker for each queue.
- Sidekiq supports Resque-like implicit ordering as well as explicitly assigning priorities to queues (["critical", 7], ["default", 2]). In that case, when picking a job, the worker is likely to pick a critical job 7 times out of 9, and a default job 2 times out of 9 (see the sketch after this list).
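To make that weighted pick concrete, here's a rough sketch of what a Sidekiq-style weighted choice boils down to (illustrative only, not Sidekiq's actual code):

# Pick a queue with probability proportional to its weight.
# With weights 7 and 2, "critical" is chosen ~7 times out of 9.
QUEUES = [["critical", 7], ["default", 2]]

def pick_queue
  total = QUEUES.sum { |(_name, weight)| weight }
  roll = rand(total) # integer in 0...total
  QUEUES.each do |(name, weight)|
    return name if roll < weight
    roll -= weight
  end
end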
I'll be implementing the explicit queue priority approach, as I think it's the clearest and most predictable.
To implement this, we allow a job class to specify its queue via a class-level queue_on declaration:
class SendStorageAlmostUsedUpEmail < Gator::Queueable
  queue_on :critical
end
We'll also support overriding it when enqueuing a job; you might have a job that should only be dispatched as high priority in certain scenarios:
case percent_storage_used
when 95
  SendStorageUsageEmail.dispatch(account.id, 95, queue: :critical)
else
  SendStorageUsageEmail.dispatch(account.id, 80)
end
(Neat trick: this lets us achieve Que's "set a priority on a job" approach, since we can use the queue itself as the priority.)
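For example (hypothetical job and queue names), you could map numeric priorities straight onto queue names, and weight the worker's queues to match (we'll see how to pass those weights to a worker shortly):

# Hypothetical: queue names standing in for numeric priorities
GenerateUrgentReport.dispatch(report.id, queue: :p100)
NightlyCleanup.dispatch(queue: :p10)
# Then start a worker that favours p100 ten times as much as p10:
#   bin/work.rb -q p100,100 -q p10,10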
Right, let's go. First, we update the Queueable class so a job can specify its queue, and so the queue can be overridden at dispatch time.
def self.queue_on(queue)
  @queue = queue
end

class << self
  attr_reader :queue
end

def self.dispatch(*args, wait: nil, at: nil, queue: nil)
  job = Gator::Models::Job.new(
    name: self.name, args:,
    next_execution_at: wait ? (Time.now + wait) : (at ? at : nil),
    queue: queue ? queue : (self.queue || 'default'),
  )
  job.save
  Gator::Logger.new.info "Enqueued job id=#{job.id} args=#{job.args} queue=#{job.queue}"
end
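dispatch_many gets the same treatment, since we'll bulk-dispatch to a specific queue later on. A quick sketch along the same lines:

def self.dispatch_many(args, wait: nil, at: nil, queue: nil)
  next_execution_at = wait ? (Time.now + wait) : (at ? at : nil)
  jobs = args.map do |job_args|
    {
      id: Gator::Models::Job.generate_job_id,
      name: self.name,
      args: job_args.to_json,
      next_execution_at:,
      queue: queue ? queue : (self.queue || 'default'),
    }
  end
  Gator::Models::Job.multi_insert(jobs)
end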
Now, over to the job server. Our CLI work script (bin/work.rb) needs to change to accept a list of queues:
options = {}
OptionParser.new do |parser|
  parser.on("-r", "--require FILE", "File to load at startup. Use this to boot your app.")
  parser.on("-q", "--queue QUEUE", Array,
    "A queue for this worker to check. Pass this flag multiple times to set multiple queues. " +
    "Separate a queue from its priority with a comma. Example: '-q critical,8 -q default,1'"
  ) do |q, priority|
    options[:queue] ||= []
    options[:queue] << [q, priority ? Integer(priority) : 1]
  end
end.parse!(into: options)

require options.delete(:require) if options[:require]
require_relative '../worker'

# CLI option name is `queue`,
# but Worker argument name is `queues`
options[:queues] = options.delete(:queue)

w = Gator::Worker.new(**options)
w.run
As for the worker itself, we need to adjust it to take an optional list of queues to check, along with their priorities. The queue list should be composed of [<queue_name>, <priority>] tuples. You can also pass just a queue name, and its priority will default to 1. For example, in the list [[:critical, 6], [:high, 4], [:default, 2], :low], :low has a priority of 1.
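For illustration, if you construct the worker directly (as bin/work.rb does), these two calls end up equivalent after normalization:

Gator::Worker.new(queues: [[:critical, 6], [:high, 4], [:default, 2], :low])
Gator::Worker.new(queues: [[:critical, 6], [:high, 4], [:default, 2], [:low, 1]])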
We'll also adjust our check_for_jobs query to respect the queues the worker is assigned to. There are three rules:
- If no queues were specified, just pick the next job as usual.
- If queues were specified, but they all have the same priority, only pick up jobs that are in any of the queues.
- If queues were specified, and have different priorities, pick a job randomly using the queue priorities as weights.
So here we go (showing only the changed bits):
module Gator
  class Worker
    attr_reader :polling_interval, :logger, :worker_id, :queues

    def initialize(queues: [])
      @polling_interval = 5
      @logger = Gator::Logger.new
      @worker_id = "wrk_" + SecureRandom.hex(8)
      # Normalize the list of queues
      @queues = (queues || []).map do |q|
        q.kind_of?(Array) ? q : [q, 1]
      end
      # If all queues have the same priority,
      # we can precompute the query, to save time
      all_queues_have_same_priority = @queues.map { _1[1] }.uniq.size == 1
      @queues_filter = @queues.map(&:first) if all_queues_have_same_priority
    end

    def run
      logger.info "Worker #{worker_id} ready"
      queues.empty? ?
        logger.info("Watching all queues") :
        logger.info("Watching queues: #{queues.map { |q, p| "#{q} (priority=#{p})" }.join(', ')}")

      loop do
        if (job = next_job)
          error = execute_job(job)
          cleanup(job, error)
        else
          sleep polling_interval
        end
      end
    end

    # ...

    def check_for_jobs
      query = Models::Job.where(state: "waiting", reserved_by: nil)
      query = query.where { (next_execution_at =~ nil) | (next_execution_at <= Time.now) }

      if queues.empty?
        logger.info "Checking all queues"
      else
        queues_to_check = queues_filter
        query = query.where(queue: queues_to_check)
        logger.info "Checking queues #{queues_to_check}"
      end

      logger.debug query.sql
      query.first
    end

    def queues_filter
      return @queues_filter if @queues_filter

      # Weighted random sampling formula from Efraimidis and Spirakis,
      # thanks to https://gist.github.com/O-I/3e0654509dd8057b539a
      queue_to_check, _priority = queues.max_by(1) { |(_name, priority)| rand ** (1.0/priority) }.first
      [queue_to_check]
    end

    def execute_job(job)
      Object.const_get(job.name).new.handle(*job.args)
      logger.info "Processed job id=#{job.id} result=succeeded queue=#{job.queue}"
      nil
    rescue => e
      logger.info "Processed job id=#{job.id} result=failed queue=#{job.queue}"
      e
    end

    # ...
  end
end
I've added a bunch of logging statements so we can see what's going on at this stage.
Now, here's what happens when we start the worker without specifying any queues (bin/work.rb).

When we specify queues with the same priority (bin/work.rb -q batch,2 -q default,2):

And when we specify different priorities (bin/work.rb -q critical,3 -q default):
Neat!
Just to be sure our weighted random algorithm works, let's let it run for a bit. I've turned off the SQL logging, so all that shows is the queue being checked.
We're specifying a priority of 3 for critical and 1 for default. This means jobs processed from these queues should be in the ratio 3:1. How did we do?
Our screenshot has 20 lines of "Checking..." logs, and 14 of them are for critical, while 6 are for default. So we have a ratio of 14:6, which is 2.3:1. Not exact, but good enough. (In fact, if the worker had checked critical just once more instead of default, we'd have had 15:5, which is exactly 3:1 😄.)
Anyway, this is a small sample size. As the number of jobs grows larger, the ratio will tend towards the expected value. (We could also write a test for it, but I've intentionally ignored testing for now.)
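If you want a quicker sanity check than counting log lines, here's a standalone simulation of the same sampling formula (just a sketch to run in pry, not part of the worker):

queues = [["critical", 3], ["default", 1]]
picks = Hash.new(0)
10_000.times do
  # Same Efraimidis and Spirakis trick the worker uses
  name, _priority = queues.max_by { |(_q, priority)| rand ** (1.0 / priority) }
  picks[name] += 1
end
picks # => something like {"critical"=>7490, "default"=>2510}, ie roughly 3:1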
Okay, finally, let's watch it in action. I'm creating two jobs, a critical one and a regular-ass one.
class Jobs::CriticalJob < Gator::Queueable
  queue_on :critical

  def handle
    sleep(rand(3))
  end
end

class Jobs::RegularAssJob < Gator::Queueable
  def handle
    sleep(rand(3))
  end
end
Let's do some dispatching (while the worker is stopped), just to see it in action:
3.times { Jobs::RegularAssJob.dispatch("arg") }
Jobs::CriticalJob.dispatch
Jobs::RegularAssJob.dispatch_many([["arg"]] * 4, queue: :critical)
(By the way, I've switched from making a route for everything I need to running things in pry: pry -r ./app/boot.rb.)
That's a total of 3 jobs on the default queue, and 5 on the critical queue. Now, if we start the worker, we can see it processes the 5 critical jobs before it even gets to the default queue.
Note: it isn't always like this! There's every chance the weighted sample gives us the lower priority first, or even twice or three times in a row. As mentioned earlier, the effects are more obvious with a large sample size.
In fact, here's my test with a larger (albeit still small) sample size. I've modified the worker to loop only 200 times and log to stdout:
def run
-  loop do
+  200.times do
    if (job = next_job)
      # ...
    end
  end
end
➜ ruby .\gator\bin\work.rb -q critical,3 -q default `
| Select-String -Pattern '["critical"]' -SimpleMatch `
| Measure-Object -Line
Lines Words Characters Property
----- ----- ---------- --------
143
(It's PowerShell, so sorry if the syntax looks strange. 🤣)
critical was checked 143 times out of 200, so that's a ratio of 143:57, which is 2.51:1.
Retries
The final feature for the day is automatic retries. Here's the plan:
- We'll support different retry policies.
- The user can specify the retry policy per job class (or write a custom policy).
- After a job is executed, if it fails, we'll use the specified retry policy to calculate the next time we should try again.
Here's what the API will look like:
# Retry every 5 minutes until success, or at most 5 times
retry_with interval: 5 * 60, max_retries: 5

# Retry with exponential backoff
retry_with interval: :exponential

# Put retries on a different queue
retry_with interval: :exponential, queue: :retries

# Custom retry logic
retry_with(queue: :retries) do |exception, retry_count|
  # Don't retry if the record has been deleted
  next false if exception.is_a?(RecordNotFound)
  next :exponential if exception.is_a?(ExternalServiceDown)

  # Retry after 5 minutes, then 10, then 25
  retry_intervals = [5 * 60, 10 * 60, 25 * 60]
  retry_intervals[retry_count]
end
In summary, your choices:
- Specify an interval, either as a number or :exponential for exponential backoff, along with optional max_retries and queue parameters.
- Pass a block that will be called after every failure. The block can return the next interval (or :exponential), or false to stop retrying.
As usual, we update the Queueable class:
def self.retry_with(
  interval: nil, max_retries: 10, queue: nil, &block
)
  @retry_strategy = { interval:, max_retries:, queue:, block: }
end

class << self
  attr_reader :queue, :retry_strategy
end
In the worker, the main change is in our cleanup method:
def cleanup(job, error = nil)
  job.reserved_by = nil
  job.attempts += 1
  job.last_executed_at = Time.now
  job_class = Object.const_get(job.name)

  if error
    job.state = "failed"
    job.error_details = error
    set_retry_details(job_class.retry_strategy, job, error) if job_class.retry_strategy
  else
    job.state = "succeeded"
  end

  job.save
end

def set_retry_details(retry_strategy, job, error)
  retry_strategy => { interval:, queue:, max_retries:, block: } # Ruby 3 "destructuring" 😍
  retry_count = job.attempts - 1

  if max_retries && retry_count >= max_retries
    job.state = "dead"
    return
  end

  if block
    decision = block.call(error, retry_count)
    if decision == false
      job.state = "dead"
      return
    end

    interval = decision
  end

  interval = (30 + (retry_count) ** 5) if interval == :exponential
  job.next_execution_at = job.last_executed_at + interval
  job.queue = queue if queue
end
Let's talk about the :exponential option there. Exponential backoff is a common strategy for retrying a task at progressively longer intervals. A good reason for this might be to reduce the load on the resource being accessed, or to give the problem time to be resolved.

It's called "exponential" because the delay typically follows the shape of an exponential curve (something like y = a + bˣ): y increases slowly at first, then faster and faster as x grows. In the real world, though, no one is checking your maths. The important thing is that the intervals increase at a reasonable rate. In fact, the Resque backoffs plugin uses a fixed list of intervals (1m, 10m, 1h, 3h, 6h) rather than a specific formula. Sidekiq's formula is approximately next_interval = n⁴ + 15, where n is the current retry count. So the first retry happens after 15s, the second happens 16s after the first, the third happens 31s after the second, and so on.
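You can check those numbers quickly against the simplified formula:

(0..3).map { |n| n**4 + 15 } # => [15, 16, 31, 96]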
I think Sidekiq's intervals are too close together, at least in the early stages; 3 retries within a minute is overkill. I went with interval = n⁵ + 30, because I like the spacing of the intervals. Let's check the first 15 retry intervals:
def y(x) = 30 + (x ** 5)

ONE_HOUR = 60 * 60
ONE_DAY = ONE_HOUR * 24

def print_duration(v)
  return "#{v} seconds" if v < 60
  return "#{v/60.0} minutes" if v < ONE_HOUR
  return "#{v/ONE_HOUR} hours, #{(v % ONE_HOUR)/60.0} minutes" if v < ONE_DAY

  "#{v/ONE_DAY} days, #{(v % ONE_DAY)/ONE_HOUR} hours, #{(v % ONE_HOUR)/60.0} minutes"
end

(0...15).map { print_duration y(_1) }
[
"30 seconds",
"31 seconds",
"1.0333333333333334 minutes",
"4.55 minutes",
"17.566666666666666 minutes",
"52.583333333333336 minutes",
"2 hours, 10.1 minutes",
"4 hours, 40.61666666666667 minutes",
"9 hours, 6.633333333333334 minutes",
"16 hours, 24.65 minutes",
"1 days, 3 hours, 47.166666666666664 minutes",
"1 days, 20 hours, 44.68333333333333 minutes",
"2 days, 21 hours, 7.7 minutes",
"4 days, 7 hours, 8.716666666666667 minutes",
"6 days, 5 hours, 24.233333333333334 minutes"
]
Finally, we have to update the query in our check_for_jobs method to also look for failed jobs whose retry time is due.
query = Models::Job.where(state: "waiting").
  where { (next_execution_at =~ nil) | (next_execution_at <= Time.now) }.
  or(state: "failed", next_execution_at: (..Time.now)).
  where(reserved_by: nil)
And finally, demo time! With a two-minute interval and a custom queue:
class Jobs::WithRetryInterval < Gator::Queueable
  queue_on :interval
  retry_with interval: 2 * 60, queue: :retries

  def handle(*)
    logger.warn "Going to fail..."
    raise "Always fails"
  end
end
Jobs::WithRetryInterval.dispatch
Retries every 2 minutes and keeps going, since we didn't set a max_retries (it'll only stop when it hits the default limit of 10). ✅
With exponential retry and a limit:
class Jobs::WithExponentialRetry < Gator::Queueable
  queue_on :exponential
  retry_with interval: :exponential, max_retries: 4

  def handle(*)
    logger.warn "Going to fail..."
    raise "Always fails"
  end
end
Jobs::WithExponentialRetry.dispatch
You can see the time intervals between retries match up with our earlier calculations. ✅
With a block:
class Jobs::WithCustomRetryLogic < Gator::Queueable
  queue_on :custom

  retry_with do |exception, retry_count|
    [10, 20, 30][retry_count] || false
  end

  def handle(*)
    logger.warn "Going to fail..."
    raise "Always fails"
  end
end
Jobs::WithCustomRetryLogic.dispatch
Uses our custom retry intervals (10s, 20s, 30s), then stops. ✅
Phew, that's a lot! See you in Part 3! Code on GitHub.