from Bunker Labs
I saw people being confused about message queues, so I figured I'd implement one in shell, just for fun. This would also let us explore the common pitfalls in implementing them.
What is a Message Queue?
A message queue is conceptually simple – it's a queue. You know, like the opposite of a stack. You put stuff in it, and later you can take stuff out. The “stuff” are messages. Of course, there is a very limited use-case to having a single queue. So instead, you want to have multiple, “named” queues (i.e., “collections”).
Another part of a message queue is how you access it. There is a common type of MQ (Message Queue) called “pub/sub” – in this scheme, the MQ server keeps open connections to all the “subscribers” and sends all of them the message whenever it arrives. The other one is poll-based – the queue keeps each message until it gets explicitly read by a connection, via some sort of “reserve” keyword. This latter type is what we'll be implementing.
So, we have a few basic operations to implement:
- Write a message to a named queue.
- Read a message from a named queue.
That's really all there is to it! So let's get to implementing.
We can keep a queue in a flat file. We'll call it "$queue".queue. This allows us to have almost free writes – we just append to the file. Let's not worry about networking for now and write this down as our push script:

```sh
# add message from stdin to queue
cat >> "$queue".queue
```
This has an obvious potential issue: shell operates line-wise, so what if the message we're writing spans more than one line? We'll use the base64 utility. Note that it isn't part of POSIX, but neither is nmap's ncat (which we're going to be using for networking later), and both are extremely common.
We can now rewrite the above like this:

```sh
# add message from stdin to queue, encoded as a single base64 line
# (some base64 implementations, GNU coreutils among them, wrap output
# at 76 columns, so strip those newlines and end the record with one)
{ base64 | tr -d '\n'; echo; } >> "$queue".queue
```
We're still assuming that we're going to get the message via stdin (somehow), and that the queue environment variable will be populated with the queue name.
Still, this storage format is pretty simple – the messages are listed, in order, oldest first, one per line. We can guarantee the “one per line” part because we base64-encode the messages.
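To see the one-per-line guarantee in action, here is a small self-contained check. It's a sketch under a few assumptions: push() is a hypothetical stand-in for the push script, mktemp -d provides a scratch directory, and base64 accepts -d for decoding (GNU coreutils, BusyBox and recent macOS do). It strips the newlines that some base64 implementations insert every 76 characters, since otherwise a long message would spill onto several lines.

```sh
#!/bin/sh
# Sketch: check that each base64-encoded message occupies exactly one line.
cd "$(mktemp -d)" || exit 1
queue=demo

# hypothetical push: encode stdin, delete any wrapping newlines, end with one newline
push() { { base64 | tr -d '\n'; echo; } >> "$queue".queue; }

# a two-line message whose encoding is longer than 76 characters
printf 'line one\nline two is padded so that its base64 form is longer than 76 chars\n' > msg1
printf 'second message\n' > msg2
push < msg1
push < msg2

# two messages in, so the queue file should hold exactly two lines
lines=$(wc -l < "$queue".queue | tr -d ' ')
echo "stored lines: $lines"   # stored lines: 2

# the first line decodes back to the first message, byte for byte
head -n 1 "$queue".queue | base64 -d > decoded1
if cmp -s msg1 decoded1; then echo "round trip ok"; fi
```

If you drop the tr -d '\n' step on a system whose base64 wraps its output, the first message alone takes two lines and the count comes out as 3.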
Reading Just One Message
We need a way to pop items off the queue. Since there's exactly one message per line, popping means taking the first line (the oldest message) and keeping everything after it. Let's write a simple utility, ./lib/pop, that will print the topmost message (decoded) and drop it from the queue file.

```sh
# print message
head -n 1 "$queue".queue | base64 -d
# get remaining messages
tail -n +2 "$queue".queue > "$queue".cut
# move post-cut queue into the true queue
mv "$queue".cut "$queue".queue
```
This has a few obvious disadvantages – it's full of crash-related race conditions. It does do the job, though, so we'll keep it for now.
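Before adding any networking, we can exercise push and pop locally. In this sketch, push() and pop() are hypothetical shell-function stand-ins for the two scripts (with base64 wrapping stripped on push), and mktemp -d supplies a scratch directory. It only checks the happy path, not the crash cases just mentioned.

```sh
#!/bin/sh
# Sketch: push two messages, then pop them back in FIFO order.
cd "$(mktemp -d)" || exit 1
queue=default

# hypothetical stand-in for the push script
push() { { base64 | tr -d '\n'; echo; } >> "$queue".queue; }

# hypothetical stand-in for the pop script
pop() {
	head -n 1 "$queue".queue | base64 -d
	tail -n +2 "$queue".queue > "$queue".cut
	mv "$queue".cut "$queue".queue
}

printf 'package 1\n' | push
printf 'package 2\n' | push

first=$(pop)
second=$(pop)
echo "$first"    # package 1 (oldest first)
echo "$second"   # package 2
```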
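Now for networking. Our launcher loop looks like this:

```sh
# make the scripts in lib/ (next to this launcher) findable via PATH
export PATH="$(dirname "$0")/lib:$PATH"
while ncat -l 1234 -c handler; do :; done
```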
This will repeatedly make ncat listen on port 1234. Once a connection arrives, it'll run the handler script found in PATH. The handler's stdin will be filled with data from the network connection, and whatever handler prints to stdout will be sent back. Notably, stderr will not be sent over.
Let's write this handler, then:
```sh
#!/bin/sh
# first line of the request: a command, then an optional queue name
read -r cmd queue
[ -z "$queue" ] && queue=default
export queue
case "$cmd" in
	pop) . pop ;;
	push) . push ;;
esac
exit 0
```
This determines our wire format. The first line sent by the client will contain the command, followed by spaces or tabs, followed by an optional queue name. If there is no queue name, we assume the name is “default”. Currently, valid commands are “pop” and “push”, which source our previously made scripts in lib/. Finally, after handling is done, we exit successfully.
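Since the handler only talks to stdin and stdout, we can feed it requests through a pipe instead of a socket and see the wire format without ncat. This is a sketch: it recreates lib/push, lib/pop and lib/handler in a scratch directory with contents based on the scripts above (base64 wrapping stripped on push), and it relies on the shell's . command searching PATH, which POSIX specifies.

```sh
#!/bin/sh
# Sketch: drive the handler through a pipe, standing in for ncat's connection.
cd "$(mktemp -d)" || exit 1
mkdir lib

cat > lib/push <<'EOF'
# add message from stdin to queue, one base64 line per message
{ base64 | tr -d '\n'; echo; } >> "$queue".queue
EOF

cat > lib/pop <<'EOF'
head -n 1 "$queue".queue | base64 -d
tail -n +2 "$queue".queue > "$queue".cut
mv "$queue".cut "$queue".queue
EOF

cat > lib/handler <<'EOF'
#!/bin/sh
read -r cmd queue
[ -z "$queue" ] && queue=default
export queue
case "$cmd" in
	pop) . pop ;;
	push) . push ;;
esac
exit 0
EOF
chmod +x lib/handler

export PATH="$PWD/lib:$PATH"

# first line: command plus optional queue name; the rest is the payload
printf 'push jobs\nhello from jobs\n' | handler
printf 'push\nhello from default\n' | handler

jobs_msg=$(printf 'pop jobs\n' | handler)
default_msg=$(printf 'pop\n' | handler)
echo "$jobs_msg"      # hello from jobs
echo "$default_msg"   # hello from default
```

This works because read consumes only the first line from a pipe, leaving the payload on stdin for the sourced push script.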
If we want to add more commands, we can do it in the case "$cmd" section later.
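As an illustration, here is what one extra command could look like. The size command is hypothetical, not part of the original server; it reports how many messages a queue holds, which is simply its line count. To keep the sketch short, the handler is condensed into a shell function, handle(), with only push and size.

```sh
#!/bin/sh
# Sketch: a handler case statement extended with a hypothetical "size" command.
cd "$(mktemp -d)" || exit 1

handle() {
	read -r cmd queue
	[ -z "$queue" ] && queue=default
	case "$cmd" in
		push) { base64 | tr -d '\n'; echo; } >> "$queue".queue ;;
		size)
			# one message per line, so counting lines counts messages
			if [ -f "$queue".queue ]; then
				wc -l < "$queue".queue | tr -d ' '
			else
				echo 0
			fi
			;;
	esac
}

printf 'push\nmessage a\n' | handle
printf 'push\nmessage b\n' | handle
count=$(printf 'size\n' | handle)
echo "$count"   # 2
```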
Trying it Out
Let's launch it!
We can connect and see how things behave:
```
ncat localhost 1234
pop
^D
# no output

ncat localhost 1234
push
package 1
^D

ncat localhost 1234
push
package 2
^D

ncat localhost 1234
pop
^D
# package 1
```
Well, that's fun, it's already functional! We can improve it, however.
Crash Race Conditions
We're designing a serious MQ, so we need to think about potential failures. What if the process crashes during a transaction!? We should consider this.
If the launcher loop dies, the server is dead, not much surprise there, so we can ignore it. But what if we pop a message and crash after sending the data back, but before removing it from the queue file? The next pop would send the same message again. This is actually relatively likely with larger queues, because we need to process the entire file every time. Let's fix this by implementing a rudimentary rollback journal (first, without actually using it):
```sh
CUT="$queue".cut
OUT="$queue".out
QUEUE="$queue".queue
# determine message to send
head -n 1 "$QUEUE" | base64 -d > "$OUT"
# calculate remaining messages
tail -n +2 "$QUEUE" > "$CUT"
# move post-queue into the true queue
cp "$CUT" "$QUEUE"
# send the message
cat "$OUT"
# delete rollback journal
rm "$CUT" "$OUT"
```
We now have a multipart rollback journal. Let's say we crashed before sending the message and wanted to manually roll back the transaction. We could do that! We would need to write a base64 encoding of $OUT to a file, then append $CUT to that file, and we would have the old state back.
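The manual rollback described above can be sketched like this. The scenario is simulated: the script builds a two-message queue, performs the first half of a pop by hand, and then rebuilds the queue from $OUT and $CUT. enc() is a hypothetical helper that encodes one message as one line, and original.queue is kept only so we can check the result.

```sh
#!/bin/sh
# Sketch: manually roll back a pop that crashed before sending its message.
cd "$(mktemp -d)" || exit 1
queue=default
CUT="$queue".cut
OUT="$queue".out
QUEUE="$queue".queue

enc() { base64 | tr -d '\n'; echo; }   # hypothetical: one message -> one line

printf 'job 1\n' | enc > "$QUEUE"
printf 'job 2\n' | enc >> "$QUEUE"
cp "$QUEUE" original.queue

# interrupted pop: journal written, queue already cut, message never sent
head -n 1 "$QUEUE" | base64 -d > "$OUT"
tail -n +2 "$QUEUE" > "$CUT"
cp "$CUT" "$QUEUE"

# rollback: re-encode $OUT, append $CUT after it, put the result in place
enc < "$OUT" > rollback.tmp
cat "$CUT" >> rollback.tmp
mv rollback.tmp "$QUEUE"
rm "$CUT" "$OUT"

if cmp -s original.queue "$QUEUE"; then echo "queue restored"; fi
```

Because base64 encoding is deterministic, re-encoding $OUT reproduces the exact line that was removed.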
It bears noting that this is not how rollback journals are typically implemented – usually they're implemented by making a copy of the data, then operating on the “real” dataset, with rollback triggering a copy back, and a commit triggering a deletion of the journal. This non-traditional approach also lets us remember the last transaction so that it can be repeated, since we want to avoid dropping any jobs.
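For comparison, a conventional rollback journal applied to pop might look like the sketch below. The names journaled_pop, recover and the .journal suffix are hypothetical. The journal copy is written under a temporary name and renamed, so that a half-written copy never looks valid; a leftover journal at startup means an unfinished transaction, which gets copied back.

```sh
#!/bin/sh
# Sketch: a traditional copy-first rollback journal around pop.
cd "$(mktemp -d)" || exit 1
queue=default
QUEUE="$queue".queue
JOURNAL="$queue".journal

enc() { base64 | tr -d '\n'; echo; }   # hypothetical: one message -> one line

# roll back: a journal left behind means an unfinished transaction
recover() {
	if [ -f "$JOURNAL" ]; then
		cp "$JOURNAL" "$QUEUE"
		rm "$JOURNAL"
	fi
}

journaled_pop() {
	recover
	# 1. copy the data (temp name + rename, so a partial copy is never used)
	cp "$QUEUE" "$JOURNAL".tmp && mv "$JOURNAL".tmp "$JOURNAL"
	# 2. operate on the real dataset (reading from the copy)
	head -n 1 "$QUEUE" | base64 -d
	tail -n +2 "$JOURNAL" > "$QUEUE"
	# 3. commit: deleting the journal makes the change permanent
	rm "$JOURNAL"
}

printf 'a\n' | enc > "$QUEUE"
printf 'b\n' | enc >> "$QUEUE"
cp "$QUEUE" before

# simulate a crash mid-transaction: journal exists, real file half-done
cp "$QUEUE" "$JOURNAL"
: > "$QUEUE"
recover
if cmp -s before "$QUEUE"; then restored=yes; fi

msg=$(journaled_pop)
echo "$msg"   # a
```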
Of course, because we have a queue, the actual state never has to be rolled back. New writes can be added to the state with no problem, and new reads can simply use the rollback journal's data as-is. With this understanding, we can now put the journal to use:
```sh
CUT="$queue".cut
OUT="$queue".out
QUEUE="$queue".queue
# create rollback journal if we don't have one yet
if ! [ -f "$CUT" ]; then
	# this step is idempotent
	head -n 1 "$QUEUE" | base64 -d > "$OUT"
	# write $CUT under a temporary name and rename it, so a crash mid-write
	# can't leave a truncated journal that looks complete
	tail -n +2 "$QUEUE" > "$CUT".tmp && mv "$CUT".tmp "$CUT"
fi
# we might have been interrupted last round
# but this is idempotent
# so always do it
cp "$CUT" "$QUEUE"
# finish transaction and delete the rollback journal
cat "$OUT"
rm "$CUT" "$OUT"
```
Now the operations that take a while (the cp invocation) are guarded by the rollback journal. The only place where corruption can occur is between sending the message over and deleting the rollback journal. Furthermore, the consequence of this crash would simply be a repeat send of the message (a much less disastrous consequence than dropping a message).
We didn't eliminate the crash race condition per se; we simply reduced the odds of it triggering dramatically, with only a handful of additional lines of code.
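To check that claim, this sketch fakes two crashes around the journaled pop. pop() follows the script above (including writing $CUT via a temporary file), enc() encodes one message per line, and begin_pop() is a hypothetical helper that runs only the first steps of a pop so we can stop there. The first crash loses nothing; the second makes the next pop send the same message again.

```sh
#!/bin/sh
# Sketch: simulate crashes in the journaled pop and see what the next pop does.
cd "$(mktemp -d)" || exit 1
queue=default
CUT="$queue".cut
OUT="$queue".out
QUEUE="$queue".queue

enc() { base64 | tr -d '\n'; echo; }   # hypothetical: one message -> one line

# the journaled pop from above
pop() {
	if ! [ -f "$CUT" ]; then
		head -n 1 "$QUEUE" | base64 -d > "$OUT"
		tail -n +2 "$QUEUE" > "$CUT".tmp && mv "$CUT".tmp "$CUT"
	fi
	cp "$CUT" "$QUEUE"
	cat "$OUT"
	rm "$CUT" "$OUT"
}

# hypothetical helper: the first steps of a pop, so we can "crash" after them
begin_pop() {
	head -n 1 "$QUEUE" | base64 -d > "$OUT"
	tail -n +2 "$QUEUE" > "$CUT"
	cp "$CUT" "$QUEUE"
}

for m in 'job 1' 'job 2' 'job 3'; do
	printf '%s\n' "$m" | enc >> "$QUEUE"
done

# crash 1: journal written and queue cut, but the message was never sent
begin_pop
first=$(pop)     # resumes from the journal: job 1 is delivered, not lost

second=$(pop)    # a normal pop: job 2

# crash 2: message sent, but the journal was never deleted
begin_pop
sent=$(cat "$OUT")
resent=$(pop)    # the journal is still there, so job 3 goes out again

echo "$first, $second, $sent, $resent"   # job 1, job 2, job 3, job 3
```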
Let's take a similar approach for the push operation, but with a copy-on-write (CoW) write-ahead log (WAL). The idea behind the write-ahead log is that doing a verbatim write is faster than an append with post-processing, and that we can resume the post-processing later if need be. Let's look at what kind of workflow we expect to have:
```sh
QUEUE="$queue".queue
WAL="$queue".wal
# we perform a fast write
cat > "$WAL".1
# then we do the processing (one base64 line, wrapping stripped)
{ base64 < "$WAL".1 | tr -d '\n'; echo; } > "$WAL".2
# and the appending
cat "$QUEUE" "$WAL".2 > "$WAL".3
# then we commit
cp "$WAL".3 "$QUEUE"
rm "$WAL".*
```
As far as the client is concerned, as long as we do those other steps later, the push is done as soon as $WAL.1 has been written. The processing can be done “in the background”, between invocations. Let's write the processor, which we'll name wal:
```sh
QUEUE="$queue".queue
WAL="$queue".wal
# there's a transaction to handle
if [ -f "$WAL".1 ]; then
	[ -f "$WAL".2 ] || { base64 < "$WAL".1 | tr -d '\n'; echo; } > "$WAL".2
	# we always repeat this step,
	# in case a read has already changed the queue
	cat "$QUEUE" "$WAL".2 > "$WAL".3
	cp "$WAL".3 "$QUEUE"
	rm "$WAL".*
fi
```
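Here is a sketch of the processor finishing interrupted pushes. wal() follows the processor above (with base64 wrapping stripped), and the two "crashes" are simulated by writing the WAL files by hand: one right after the fast write, one after the encoding step. The queue file is created up front only to keep cat from complaining about a missing file.

```sh
#!/bin/sh
# Sketch: interrupted push transactions completed later by the processor.
cd "$(mktemp -d)" || exit 1
queue=default
QUEUE="$queue".queue
WAL="$queue".wal

# the processor from above, as a function
wal() {
	if [ -f "$WAL".1 ]; then
		[ -f "$WAL".2 ] || { base64 < "$WAL".1 | tr -d '\n'; echo; } > "$WAL".2
		cat "$QUEUE" "$WAL".2 > "$WAL".3
		cp "$WAL".3 "$QUEUE"
		rm "$WAL".*
	fi
}

: > "$QUEUE"   # start with an existing, empty queue file

# push 1 "crashes" right after the fast write
printf 'first push\n' > "$WAL".1
wal

# push 2 "crashes" after encoding, before appending
printf 'second push\n' > "$WAL".1
{ base64 < "$WAL".1 | tr -d '\n'; echo; } > "$WAL".2
wal

first=$(sed -n 1p "$QUEUE" | base64 -d)
second=$(sed -n 2p "$QUEUE" | base64 -d)
echo "$first / $second"   # first push / second push
```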
Now we can call it as a part of our launcher loop:
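```sh
#!/bin/sh
export PATH="$PWD/lib:$PATH"
# process any remaining transactions
checkpoint() (
	# look for pending WAL entries rather than existing *.queue files,
	# so that the first push to a brand-new queue gets processed too
	for pending in *.wal.1; do
		[ -f "$pending" ] || continue # the glob matched nothing
		queue=${pending%.wal.1}
		. wal
	done
)
checkpoint
while ncat -l 1234 -c handler; do
	# if the handler crashed, we can catch it here
	checkpoint
done
```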
Just as before – we didn't entirely eliminate the crash race condition. After all, the server could crash in the middle of a push. And if we added a notification for completed pushes, the notification could fail to come, while the push would happen. However, we've significantly reduced the odds of queue corruption, to the point where we can avoid worrying about it as much.
Notably, this approach results in potentially missed or corrupted writes, as opposed to potential double-writes. This is deliberate, to demonstrate how that's done, as opposed to the double-read philosophy we took with pop.
At this point, the server is done, if we're content with a single thread! What if two clients connect simultaneously? Currently, they can't.
I'm done messing around for today, though, so maybe a follow-up (in a branch) will be made to provide parallelism.
You can find this version of the server over on GitHub.