Experiments in concurrency 3: Event loops
New to this series? Check out Parts 1 and 2 first.
Last time, I wrote about how coroutines help enable concurrency, and how they work in JavaScript. I spent some time after that trying to implement coroutines in Ruby and PHP, but inspired by these two articles, I decided to go bigger—implement an event loop.
What's an event loop?
An event loop is a loop that runs your code and does things based on some events. That's vague, I know, but it'll become clearer as we go.
It's called a loop because it really is a loop. In fact, this is basically what happens in an event loop:
function runEventLoop() {
  runUserCode();
  while (taskQueue.isNotEmpty()) {
    runNextTask();
  }
}
First, it executes your code, statement-by-statement. Then, if there are any tasks in the task queue (which your code might have added), it executes them one-by-one. It'll keep looping until there are no more tasks, and then it exits. For instance, take this JS code:
// index.js
const fs = require("fs");

const x = 3 + 6;
setTimeout(() => {
  console.log("I'm a task!");
}, 2000);
// Node passes the error first, then the file contents
fs.readFile("some-file.txt", (err, contents) => {
  console.log("Also a task!");
});
console.log(x);
When you run this with node index.js, it executes each statement in the file. By the time that's done, there are now two tasks in the queue: the tasks added by setTimeout() and fs.readFile(). The event loop waits until those tasks are ready and executes them. When they're completed, the task queue is empty, and the event loop exits, ending the script.
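To make the "run your code first, then drain the queue" order concrete, here's a toy sketch in Ruby. It's only an illustration of the idea: TinyLoop and defer are names I made up for this snippet, and there are no timers or I/O involved, just a plain FIFO array.

```ruby
# A toy event loop: run the main code, then drain a FIFO task queue.
# `TinyLoop` and `defer` are hypothetical names used only for this sketch.
class TinyLoop
  attr_reader :log

  def initialize
    @tasks = []
    @log = []
  end

  # Queue a block to run after the main code has finished.
  def defer(&task)
    @tasks << task
  end

  def run
    yield self                             # 1. run the "user code"
    @tasks.shift.call until @tasks.empty?  # 2. drain the queue, oldest task first
  end
end

tiny = TinyLoop.new
tiny.run do |l|
  l.log << "main: start"
  l.defer { l.log << "task 1" }
  l.defer { l.log << "task 2" }
  l.log << "main: end"
end

puts tiny.log
```

Both "main" lines are logged before either task, even though the tasks were queued in between them.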
JavaScript's event loop is implicit, so you don't have to run it manually. We'll be implementing an event loop in Ruby, so we'll have to start the loop explicitly and pass it our code. Here's what using our event loop will look like:
loop = EventLoop.new
loop.run do
  # do stuff
  loop.set_timeout(2000) do
    puts "I'm a task!"
  end
  # do stuff
end
Task queues
As we've seen, event loops are powered by a task queue. This queue is typically FIFO (first in, first out), so tasks that get added earlier get executed first. Let's implement our EventLoop class: its constructor creates a new task queue, while its run method executes the block that it's given, then keeps running tasks until the task queue is empty.
class EventLoop
  def initialize
    @queue = TaskQueue.new
  end

  def run(&entrypoint)
    entrypoint.call
    until @queue.empty?
      callback = @queue.next
      callback.call if callback
    end
  end
end
(We're going to be passing blocks and procs around a lot with and without the & notation; check out my article on function handles if you're not familiar with those in Ruby.)
Now, let's implement our TaskQueue. Normally, you'd use a simple array and manipulate it as needed, but there's an important detail: not all tasks are equal. For example, a task on a timer (like setTimeout()) should be executed when the timer runs out, even if it was added to the queue last. To make things worse, JavaScript has a "microtask" queue: tasks on this queue get run before any other tasks. And then there's process.nextTick() in Node.js, which runs before the microtask queue. That's a lot of queues!
Node.js' event loop handles this by running in phases: there's a "timers" phase where it runs any timers that are due, a "poll" phase where it picks up I/O events and runs their callbacks (like the callback you passed to fs.readFile()), and more. The phases have a specific order, so that the tasks with higher priority get executed first.
However, we'll keep things simple: we'll still expose a single queue to our event loop, but that queue actually holds two sub-queues: one for timers, and one for any other callbacks.
class TaskQueue
  def initialize
    @callbacks = []
    @timers = []
  end

  def empty?
    @timers.empty? && @callbacks.empty?
  end

  # Get the next task from the queue
  def next
    # TODO
  end
end
Timers
Let's see if we can implement our own version of JavaScript's setTimeout. As we saw earlier, it should take a time and a block to be executed when the time is due:
loop.set_timeout(2000) do
  puts "This task will be executed after 2s"
end
Okay, implementation. Running set_timeout should add a timer task to the queue, telling the queue the time when the task should be executed:
class EventLoop
  # ...

  def set_timeout(timeout, &callback)
    # Work in milliseconds, like JavaScript's setTimeout
    current_time = Time.now.to_f * 1000
    @queue.add_timer(current_time + timeout, callback)
  end
end
Now, back to the queue. We know that timers have higher priority than other tasks: they should be executed when the execution time is due, even if there are tasks in the queue ahead of them. This is why they have their separate queue. But they also have priority amongst themselves. If I run set_timeout(2000) and then set_timeout(1000), I'd expect the second timer to be executed first, since it has a shorter timeout. There's a data structure for this: a priority queue.
A priority queue is like a regular queue, but it sorts items by their priority. When adding an item you specify its priority; when you ask for the next item, it gives you the one with the highest priority. This fits our use case—we can use the scheduled time as the priority for each timer, so when we ask the queue for a timer, we get the timer that's supposed to fire next.
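To make that concrete, here's a minimal sketch of a priority queue backed by a sorted array. SimplePriorityQueue is a stand-in I wrote for illustration (it is not the library's class, and a real implementation would typically use a heap), and the "current time" is a made-up number. Note the negated times used as priorities, so that the earliest timer has the highest priority.

```ruby
# A deliberately simple priority queue backed by an array kept in ascending
# priority order. `SimplePriorityQueue` is a stand-in written for this sketch.
class SimplePriorityQueue
  def initialize
    @items = [] # [priority, value] pairs, lowest priority first
  end

  def push(value, priority)
    # Binary search for the first slot whose priority is >= the new one.
    index = @items.bsearch_index { |entry| entry[0] >= priority } || @items.length
    @items.insert(index, [priority, value])
    value
  end

  # Peek at the highest-priority value without removing it.
  def next
    @items.last&.last
  end

  # Remove and return the highest-priority value.
  def pop
    @items.pop&.last
  end

  def empty?
    @items.empty?
  end
end

timers = SimplePriorityQueue.new
now = 10_000 # pretend "current time" in milliseconds

# Priority is the negated scheduled time, so the soonest timer comes out first.
timers.push("fires at +2000", -(now + 2000))
timers.push("fires at +1000", -(now + 1000))
timers.push("fires at +3000", -(now + 3000))

order = []
order << timers.pop until timers.empty?
puts order
```

The timers come out in the order +1000, +2000, +3000, regardless of the order they were added in.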
Rather than implementing my own, I'll be using the priority queue from this library. We'll replace our @timers array with a priority queue. Here's how our queue will work:
- We add timers to the timers queue using the negative of their scheduled time (-scheduled_time) as priority. We negate it because this PriorityQueue gives us the highest-priority items first, but we want the items with the smallest time first (i.e. the timers that are due soonest).
- Whenever next() is called on our TaskQueue, we look at the next timer. If its time is due, we return it; otherwise we return the next callback, or nil if there isn't one, and try again later.
require 'algorithms'

class TaskQueue
  def initialize
    @callbacks = []
    @timers = Containers::PriorityQueue.new
  end

  def empty?
    @timers.empty? && @callbacks.empty?
  end

  def next
    next_timer = @timers.next
    current_time = Time.now.to_f * 1000
    if next_timer && (current_time >= next_timer[:scheduled_time])
      # Timer's due; remove it from the queue
      @timers.pop
      return next_timer[:callback]
    end

    # No timer is due: return the oldest callback, or nil if there are none
    @callbacks.shift
  end

  def add_timer(scheduled_time, callback)
    priority = -scheduled_time
    @timers.push({ scheduled_time: scheduled_time, callback: callback }, priority)
  end
end
All good!
Coroutines
Let's head back to coroutines, to see how they can be used to implement "asynchronous" operations with an event loop. Here's a Ruby version of our synchronous copy-and-stringify example from last time:
require 'json'

array = 8_000_000.times.map { |i| {a: i} }

copied = [*array]
puts "Copied"

stringified = copied.to_json
puts "Stringified"
Now let's convert this to a coroutine. Ruby doesn't have generators, but it has something called fibers that lets us achieve the same thing (pause and resume execution of a block). Here's how you'd write a fiber:
example_fiber = Fiber.new do
  puts "Executing Part 1"
  Fiber.yield
  puts "Executing Part 2"
  Fiber.yield
  puts "Executing Part 3"
  Fiber.yield
  "Finished" # the block's last value is what the final resume returns
end

example_fiber.resume # Prints "Executing Part 1"
example_fiber.resume # Prints "Executing Part 2"
example_fiber.resume # Prints "Executing Part 3"
You create a fiber by passing in a block to Fiber.new. Inside that block, you call Fiber.yield to pause execution, like JavaScript's yield. The caller can start or resume the fiber by calling resume.
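resume and Fiber.yield can also pass values in both directions, and alive? tells you whether the fiber has anything left to run. Here's a small sketch of that behaviour (the counter fiber and the variable names are just for illustration):

```ruby
log = []

counter = Fiber.new do |start|
  log << "started with #{start}"
  step = Fiber.yield(start + 1) # pauses; start + 1 is returned by the first resume
  log << "resumed with #{step}"
  :finished                     # the block's last value is returned by the final resume
end

first = counter.resume(10)      # runs until the first Fiber.yield
alive_midway = counter.alive?   # still paused, so there's more to run
second = counter.resume(5)      # the argument becomes the value of Fiber.yield
alive_after = counter.alive?    # the block has finished

p first        # 11
p alive_midway # true
p second       # :finished
p alive_after  # false
```

The arguments of the first resume become the block's parameters, while the arguments of later resume calls become the return value of the Fiber.yield that paused the fiber.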
Here's how we would write our copying and stringifying tasks as coroutines using fibers.
copy_fiber = Fiber.new do |arr|
  copied_array = []
  arr.each_with_index do |item, index|
    if index > 0 && (index % 50_000).zero?
      Fiber.yield # Pause every 50_000 items
    end
    copied_array.push item
  end
  puts "Done with copying"
  copied_array
end

stringify_fiber = Fiber.new do |arr|
  result = "["
  arr.each_with_index do |item, index|
    if index > 0 && (index % 50_000).zero?
      Fiber.yield # Pause every 50_000 items
    end
    result += item.to_json
    result += ","
  end
  # Replace the trailing comma with the closing bracket
  result[result.length - 1] = "]"
  puts "Done with stringifying"
  result
end
Now let's implement the dispatcher as part of our event loop. We can add a new run_coroutine method to our EventLoop class:
class EventLoop
  # ...

  def run_coroutine(fiber, *args)
    fiber.resume(*args)
    if fiber.alive?
      # Not finished yet: queue a task that resumes the fiber later
      @queue.add { self.run_coroutine(fiber) }
    end
  end
end
Key details:
- The run_coroutine method takes in the fiber and any arguments to be passed to it, then starts the fiber by running fiber.resume with those arguments.
- We use the fiber.alive? method to check if there's still more of the fiber left to execute. If there is, then we add a new task to the task queue.
With that, we've achieved concurrency. By adding a new task to the queue, we're handing control to the event loop to continue the coroutine whenever it's free.
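To see the interleaving without the rest of the event loop, here's a self-contained sketch. A plain array stands in for the task queue, and make_worker and the worker names are made up for this snippet; the run_coroutine lambda follows the same resume-then-requeue idea as the method above.

```ruby
# Self-contained sketch: a plain array stands in for the task queue.
queue = []
log = []

# Builds a fiber that logs `steps` lines, pausing between them.
make_worker = lambda do |name, steps|
  Fiber.new do
    steps.times do |i|
      log << "#{name} step #{i + 1}"
      Fiber.yield unless i == steps - 1 # hand control back between steps
    end
  end
end

# Resume the fiber once, then re-queue it if it hasn't finished.
run_coroutine = lambda do |fiber|
  fiber.resume
  queue << -> { run_coroutine.call(fiber) } if fiber.alive?
end

run_coroutine.call(make_worker.call("copy", 3))
run_coroutine.call(make_worker.call("stringify", 2))

# The "event loop": keep running queued tasks until none are left.
queue.shift.call until queue.empty?

puts log
```

The log alternates between the two workers (copy, stringify, copy, stringify, copy): neither one has to finish before the other gets a turn.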
Let's implement the add method in our TaskQueue. Its job is very simple: add a new task to the @callbacks array:
class TaskQueue
  def add(&callback)
    @callbacks << callback
  end
end
Let's test out what we have so far:
loop = EventLoop.new
loop.run do
  loop.run_coroutine(copy_fiber, array)
  loop.run_coroutine(stringify_fiber, array)
  loop.set_timeout(1200) { puts "Executed after 1.2s" }
  loop.set_timeout(800) { puts "Executed next" }
  loop.set_timeout(2200) { puts "Executed last" }
  puts "Executed first"
end
(Note: when I ran this on repl.it, I reduced the size of the array because the Ruby version there is somewhat slow.)
Sweet!
Continuations
We've implemented coroutines, but we can go a step further: what if we add a piece of code we want to run after the coroutine is done? Think about fs.readFile() in Node.js: you pass in the file you want to read, and then a function you want to execute when that is done (known as a callback or continuation).
fs.readFile(fileName, (err, contents) => {
  // do stuff with the file
});
This is helpful in case we have some other operation that depends on the result of the coroutine (for example, our stringifying operation could use the result of the copy operation).
To do this, we add another argument to our run_coroutine method to represent the continuation. Once the coroutine finishes (fiber.alive? returns false), we capture its return value and add a new task to the queue to run the continuation later, passing it the return value.
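class EventLoop
  # ...

  def run_coroutine(fiber, *args, &continuation)
    return_val = fiber.resume(*args)
    if fiber.alive?
      # Not finished yet: resume later, carrying the continuation along
      @queue.add { self.run_coroutine(fiber, &continuation) }
    elsif continuation
      # Finished: schedule the continuation with the fiber's return value
      @queue.add { continuation.call(return_val) }
    end
  end
end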
Now we can do this:
loop.run do
  loop.run_coroutine(copy_fiber, array) do |copied|
    # Continuation after copy
    puts "Copy is DONE."
    loop.run_coroutine(stringify_fiber, copied) do
      # Continuation after stringify
      puts "Stringify is DONE."
    end
  end
end
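Here's a small, self-contained version of the same flow that you can run without the timers code. MiniLoop is a stripped-down stand-in for our EventLoop (callbacks only), and the doubler and summer fibers are made-up examples; the second coroutine consumes the result of the first through its continuation.

```ruby
# `MiniLoop` is a stripped-down stand-in for the EventLoop in this post (no timers).
class MiniLoop
  def initialize
    @callbacks = []
  end

  def add_callback(&callback)
    @callbacks << callback
  end

  def run_coroutine(fiber, *args, &continuation)
    return_val = fiber.resume(*args)
    if fiber.alive?
      add_callback { run_coroutine(fiber, &continuation) }
    elsif continuation
      add_callback { continuation.call(return_val) }
    end
  end

  def run
    yield
    @callbacks.shift.call until @callbacks.empty?
  end
end

# Doubles each number, pausing before every item.
doubler = Fiber.new do |numbers|
  numbers.map do |n|
    Fiber.yield
    n * 2
  end
end

# Pauses once, then sums the numbers.
summer = Fiber.new do |numbers|
  Fiber.yield
  numbers.sum
end

results = []
mini = MiniLoop.new
mini.run do
  mini.run_coroutine(doubler, [1, 2, 3]) do |doubled|
    results << doubled
    mini.run_coroutine(summer, doubled) { |total| results << total }
  end
end

p results
```

The continuation only ever sees the value returned by the final resume, so the intermediate pauses stay invisible to the calling code.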
If we wanted to avoid the extra indentation and potential "callback hell", we could wrap this in a custom Promise implementation, complete with .then/.catch and even async/await, but I'll stop here.
For completeness, we can add an add_callback method to the event loop to provide an easy way for external code to directly add new tasks to the queue. It'll come in handy later.
def add_callback(&callback)
  @queue.add(&callback)
end
Handling requests
Now let's make our event loop actually useful: we'll write a basic HTTP server. Ruby's default execution model is similar to PHP's (blocking and synchronous), so a single-threaded server would have to finish handling one request before it can accept new ones. For example, here's a simple HTTP server:
require 'socket'

server = TCPServer.new("127.0.0.1", 5678)
puts "Listening on localhost:5678"

while socket = server.accept # Wait for a new connection
  request = socket.gets.strip
  puts "Started handling req #{request}: #{Time.now}"
  sleep 5
  puts "Responding 5 seconds later: #{Time.now}"
  socket.puts <<~HTTP
    HTTP/1.1 200 OK
    Content-Type: text/html

    Hii 👋
  HTTP
  socket.close
end
When I test this with autocannon making three simultaneous requests (autocannon --connections 3 --amount 3 --timeout 10000 --no-progress http://localhost:5678/):
Listening on localhost:5678
Started handling req GET / HTTP/1.1: 2021-04-29 10:23:31 +0100
Responding 5 seconds later: 2021-04-29 10:23:36 +0100
Started handling req GET / HTTP/1.1: 2021-04-29 10:23:36 +0100
Responding 5 seconds later: 2021-04-29 10:23:41 +0100
Started handling req GET / HTTP/1.1: 2021-04-29 10:23:41 +0100
Responding 5 seconds later: 2021-04-29 10:23:46 +0100
As expected, requests are handled sequentially. Each request does something that takes 5 seconds, and the thread has to wait for that to finish before handling the next request, so it takes 15 seconds in total. Let's change this to a concurrent setup using our event loop.
def handle_next_request(server, &handler)
  begin
    socket = server.accept_nonblock
    request = socket.gets.strip
    puts "Started handling req #{request}: #{Time.now}"
    handler.call(socket)
    # Check for the next connection on a later turn of the loop
    add_callback { handle_next_request(server, &handler) }
  rescue IO::WaitReadable, Errno::EINTR
    # No connection waiting yet; try again later
    add_callback { handle_next_request(server, &handler) }
  end
end

run_loop do
  server = TCPServer.new("127.0.0.1", 5678)
  puts "Listening on localhost:5678"

  handle_next_request(server) do |socket|
    set_timeout(5000) do
      puts "Responding 5 seconds later: #{Time.now}"
      socket.puts <<~HTTP
        HTTP/1.1 200 OK
        Content-Type: text/html

        Hii 👋
      HTTP
      socket.close
    end

    # A few extra timers, to look cool 😎
    set_timeout(0) { puts "0ms later: #{Time.now}" }
    set_timeout(1500) { puts "1.5s later: #{Time.now}" }
  end
end
It looks a little complicated at first, but it's not. Here are the important bits:
- begin/rescue is Ruby's try/catch.
- server.accept_nonblock is key here. server.accept (the earlier version) checks to see if there's a new incoming request, and waits until there is one. accept_nonblock, however, throws an error if there's no incoming connection (this is why we use the begin/rescue), giving you the freedom to do other things before checking again. And that's what we're doing here: if there isn't any new connection, we add a task to check again, allowing the event loop to execute any pending tasks first. On the flip side, if there's a request, handle_next_request will call the handler block you passed in, allowing you to respond to the request.
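If you'd like to see accept_nonblock's behaviour in isolation, here's a minimal sketch using only Ruby's socket library. The port is picked by the OS (port 0) and the :nothing_yet symbol is just a marker I chose for this snippet; the IO.select call is only there so the demo doesn't race the client's connection.

```ruby
require 'socket'

server = TCPServer.new("127.0.0.1", 0) # port 0: let the OS pick a free port
port = server.addr[1]

# No client has connected yet, so accept_nonblock raises instead of waiting.
first_attempt =
  begin
    server.accept_nonblock
  rescue IO::WaitReadable
    :nothing_yet
  end

client = TCPSocket.new("127.0.0.1", port)
IO.select([server], nil, nil, 1) # wait up to 1s for the connection to be acceptable

# Now there is a pending connection, so we get a socket back immediately.
second_attempt =
  begin
    server.accept_nonblock
  rescue IO::WaitReadable
    :nothing_yet
  end

puts first_attempt.inspect
puts second_attempt.class

second_attempt.close if second_attempt.respond_to?(:close)
client.close
server.close
```

Ruby also lets you call accept_nonblock(exception: false), which returns :wait_readable instead of raising when nothing is waiting.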
Note: I made the code a little cleaner by using some Ruby abstractions to hide the creation of the loop, and to make our set_timeout, run_coroutine, and add_callback methods global. See the full code in this gist.
Now, when I try making three concurrent requests:
Listening on localhost:5678
Started handling req GET / HTTP/1.1: 2021-04-29 11:02:24 +0100
0ms later: 2021-04-29 11:02:24 +0100
Started handling req GET / HTTP/1.1: 2021-04-29 11:02:24 +0100
0ms later: 2021-04-29 11:02:24 +0100
Started handling req GET / HTTP/1.1: 2021-04-29 11:02:24 +0100
0ms later: 2021-04-29 11:02:24 +0100
1.5s later: 2021-04-29 11:02:26 +0100
1.5s later: 2021-04-29 11:02:26 +0100
1.5s later: 2021-04-29 11:02:26 +0100
Responding 5 seconds later: 2021-04-29 11:02:29 +0100
Responding 5 seconds later: 2021-04-29 11:02:29 +0100
Responding 5 seconds later: 2021-04-29 11:02:29 +0100
Sweet! The server starts handling one request, schedules the timeouts and switches to another, so no connection is kept waiting. And, in this case, we get a speed boost—the server responds to all requests in 5s total (rather than 15s), since it doesn't wait for one to finish.
Why use an event loop?
You might already be convinced, but let's formally answer this. Why use an event loop? Why try to make the language behave differently? The answer is concurrency (duh).
Event loops are a way to achieve concurrency without having to add more threads or processes. Remember in part 1 of this series, we saw how a single-threaded server in PHP has zero concurrency? With an event loop, we can split up handling one request into separate tasks, allowing for our app to switch over to handling a new request. In fact, the example of a concurrent webserver in PHP uses Amp's event loop, along with asynchronous coroutines like \Amp\File\get():
\Amp\Loop::run(function () {
    $sockets = [
        Server::listen("0.0.0.0:8080"),
    ];

    $server = new HttpServer($sockets, new CallableRequestHandler(function (Request $request) {
        $file = yield \Amp\File\get(__FILE__);

        return new Response(Status::OK, ["content-type" => "text/plain"], "Hello, World!");
    }), new NullLogger);

    yield $server->start();
});
Many other popular async libraries implement an event loop, including ReactPHP, async (Ruby), and EventMachine (Ruby).
But it's not just the idea of doing multiple things at once that makes event loops cool. It's the ability to utilise "idle time". For example, with server.accept, we spend a good deal of time waiting for the next connection. What if we could do something else while waiting? server.accept_nonblock + the event loop allows for that. If we go further and implement asynchronous I/O like file reads, database queries, network calls, we'll see the difference even more: in the time our code spends waiting for the OS/database/remote server to return results, we can switch to doing something else.
Event loops shine in event-driven contexts. New events (like new requests or completed database queries) add new items to the task queue, and the event loop is able to switch over and handle those. Nginx uses an event loop, and so do many GUI systems.
I've mostly mentioned webservers in this series, but concurrency is needed in a lot of places. For instance, games have to process user input while rendering the game world and making a bunch of calculations. Mobile apps have to handle user input, fetch data from remote APIs, draw UIs and more. An event loop is one way of making this work.