A Message Queue in Shell

I saw people being confused about message queues, so I figured I'd implement one in shell, just for fun. This would also let us explore the common pitfalls in implementing them.

What is a Message Queue?

A message queue is conceptually simple – it's a queue. You know, like the opposite of a stack. You put stuff in it, and later you can take stuff out. The “stuff” is messages. Of course, a single queue on its own has a very limited use-case, so instead you want to have multiple, “named” queues (i.e. “collections”).

Another part of a message queue is how you access it. There is a common type of MQ (Message Queue) called “pub/sub” – in this scheme, the MQ server keeps open connections to all the “subscribers” and sends all of them the message whenever it arrives. The other one is poll-based – the queue keeps each message until it gets explicitly read by a connection, via some sort of “reserve” keyword. This latter type is what we'll be implementing.

So, we have a few basic operations to implement:

- push – append a message to a named queue
- pop – read the oldest message off a named queue, removing it in the process

That's really all there is to it! So let's get to implementing.

Storage Format

We can keep a queue in a flat file. We'll call it "$queue".queue. This allows us to have almost free writes – we just append to the file. Let's not worry about networking for now and write this down in ./lib/push.

# add message from stdin to queue
cat >> "$queue".queue

This has an obvious potential issue: shell operates line-wise, so what if the message we're writing spans more than one line? We'll use the base64 utility to encode each message onto a single line. Note that base64 isn't part of POSIX, but neither is nmap's ncat (which we're going to be using for networking later), and both are extremely common.
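
To see the trick in action, here's a quick round trip you can try in any shell (output shown as comments):

# a two-line message becomes a single base64 line...
printf 'line 1\nline 2\n' | base64
# bGluZSAxCmxpbmUgMgo=

# ...and decodes back losslessly
printf 'bGluZSAxCmxpbmUgMgo=\n' | base64 -d
# line 1
# line 2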

We can now rewrite the above like this:

# add message from stdin to queue
base64 >> "$queue".queue

We're still assuming that we're going to get the message via stdin (somehow), and that the queue environment variable will be populated with the queue name.

Still, this storage format is pretty simple – the messages are listed, in order, oldest first, one per line. We can guarantee the “one per line” part because we base64-encode the messages. (One caveat: GNU coreutils' base64 wraps its output at 76 columns by default, so a very long message would still span multiple lines; the GNU-specific -w0 flag disables that, but we'll gloss over it here.)
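
For example, after pushing “package 1” and “package 2” (each ending in a newline), default.queue would contain:

cGFja2FnZSAxCg==
cGFja2FnZSAyCg==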

Reading Just One Message

We need a way to pop items off the queue. Since there's exactly one message per line, popping means splitting the file into its first line (the oldest message) and everything else. Let's write a simple utility ./lib/pop that will print the topmost message (decoded) and remove it from the queue file.

# print message
head -n1 "$queue".queue | base64 -d
# get remaining messages
tail -n+2 "$queue".queue > "$queue".cut
# move post-cut queue into the true queue
mv "$queue".cut "$queue".queue

This has a few obvious disadvantages – it's full of crash-related race conditions. It does do the job, though, so we'll keep it for now.

Networking

We're going to use nmap's netcat implementation to handle networking for us. Initially, it'll look roughly like so, in ./launch:

#!/bin/sh
export PATH="$(dirname "$0")/lib:$PATH"
while ncat -l 1234 -c handler; do :; done

This will repeatedly make netcat listen on port 1234. Once a connection arrives, it'll run the handler script found in PATH. The handler's stdin will be fed data from the network, and whatever it prints to stdout will be sent back. Notably, stderr will not be sent over.
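
That last property makes stderr handy for server-side debugging: anything the handler writes there shows up on the server's terminal instead of going to the client (assuming the handler inherits ncat's stderr, which is the usual behavior). For example, a hypothetical trace line:

# goes to the server's terminal, not over the wire
echo "got a connection" >&2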

Let's write this handler, then: ./lib/handler:

#!/bin/sh
# first line of input: a command, optionally followed by a queue name
read -r cmd queue
[ -z "$queue" ] && queue=default
export queue

case "$cmd" in
pop) . pop ;;
push) . push ;;
esac

exit 0

This determines our wire format. The first line sent by the client will contain the command, followed by spaces or tabs, followed by an optional queue name. If there is no queue name, we assume the name is “default”. Currently, the valid commands are “pop” and “push”, which source our previously written scripts via the shell's . (dot) command. Finally, after handling is done, we exit successfully.

If we want to add more commands, we can do it in the case "$cmd" section later.

Trying it Out

Let's launch it! ./launch

We can connect and see how things behave:

ncat localhost 1234
pop
^D # no output

ncat localhost 1234
push
package 1
^D

ncat localhost 1234
push
package 2
^D

ncat localhost 1234
pop
^D # package 1

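Since ncat just relays stdin, the same exchange works non-interactively too – piping input in has the same effect as typing the lines and pressing ^D (using a hypothetical queue named “jobs” here):

printf 'push jobs\npackage 3\n' | ncat localhost 1234
printf 'pop jobs\n' | ncat localhost 1234 # prints "package 3"
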
Well, that's fun, it's already functional! We can improve it, however.

Crash Race Conditions

We're designing a serious MQ, so we need to think about potential failures. What if the process crashes during a transaction!? We should consider this.

If the launcher loop dies, the server is dead, not much surprise there, so we can ignore it. What if we pop, but crash after sending the data back, but before truncating the old data? This is actually relatively likely with larger queues because we need to process the entire file every time. Let's fix this by implementing a rudimentary rollback journal (first, without actually using it):

CUT="$queue".cut
OUT="$queue".out
QUEUE="$queue".queue

# determine message to send
head -n1 "$QUEUE" | base64 -d > "$OUT"

# calculate remaining messages
tail -n+2 "$QUEUE" > "$CUT"

# move the post-cut queue into the true queue
cp "$CUT" "$QUEUE"

# send the message
cat "$OUT"

# delete rollback journal
rm "$CUT" "$OUT"

We now have a multipart rollback journal. Let's say we crashed before sending the message and wanted to manually roll back the transaction. We could do that! We would write a base64 encoding of $OUT to a file, then append $CUT to it, and we would have the old state back.
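
As a sketch, a hypothetical ./lib/rollback (reusing the same file names as pop) could look like this:

CUT="$queue".cut
OUT="$queue".out
QUEUE="$queue".queue

# re-encode the message we were about to send
base64 "$OUT" > "$QUEUE"
# re-attach the remaining messages
cat "$CUT" >> "$QUEUE"
# the old state is restored; drop the journal
rm "$CUT" "$OUT"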

It bears noting that this is not how rollback journals are typically implemented – usually they work by making a copy of the data, then operating on the “real” dataset, with a rollback triggering a copy back, and a commit triggering a deletion of the journal. Our non-traditional approach also lets us keep the last transaction around for potential repeating, since we want to avoid dropping any jobs.
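
For comparison, the traditional scheme applied to our pop would look roughly like this (just a sketch, not what we're building):

QUEUE="$queue".queue
JOURNAL="$queue".journal

# journal: a pristine copy of the data, taken before touching it
cp "$QUEUE" "$JOURNAL"

# operate on the “real” dataset
head -n1 "$JOURNAL" | base64 -d
tail -n+2 "$JOURNAL" > "$QUEUE"

# commit: delete the journal
# (a rollback would instead be: cp "$JOURNAL" "$QUEUE")
rm "$JOURNAL"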

Of course because we have a queue, the actual state never has to be rolled back. New writes can be added to the state with no problem, and new reads can simply use the rollback journal's data as-is. With this understanding, we can now utilize it:

CUT="$queue".cut
OUT="$queue".out
QUEUE="$queue".queue

# create rollback journal if we don't have one yet
if ! [ -f "$CUT" ]; then
    # this step is idempotent
    head -n1 "$QUEUE" | base64 -d > "$OUT"
    tail -n+2 "$QUEUE" > "$CUT"
fi

# we might have been interrupted last round
# but this is idempotent
# so always do it
cp "$CUT" "$QUEUE"

# finish transaction and delete the rollback journal
cat "$OUT"
rm "$CUT" "$OUT"

Now the operations that take a while (the head, tail, and cp invocations) are guarded by the rollback journal. The only place where corruption can occur is between sending the message over and deleting the rollback journal. Furthermore, the consequence of a crash there would simply be a repeat send of the message (a much less disastrous outcome than dropping a message).

We didn't eliminate the crash race condition per se, but we dramatically reduced the odds of it triggering, with only a handful of additional lines of code.

Let's take a similar approach for the push operation, but with a copy-on-write (CoW) write-ahead log (WAL). The idea behind the write-ahead log is that doing a verbatim write is faster than an append with post-processing, and that we can resume the post-processing later if need be. Let's look at what kind of workflow we expect to have:

QUEUE="$queue".queue
WAL="$queue".wal

# we perform a fast write
cat > "$WAL".1

# then we do the processing
base64 "$WAL".1 > "$WAL".2

# and the appending
cat "$QUEUE" "$WAL".2 > "$WAL".3

# then we commit
cp "$WAL".3 "$QUEUE"
rm "$WAL".*

As far as the client is concerned, as long as we do those other steps later, the push is done as soon as $WAL.1 is created. The processing can be done “in the background”, between invocations. Let's write the processor, ./lib/wal, first:

QUEUE="$queue".queue
WAL="$queue".wal

# there's a transaction to handle
if [ -f "$WAL".1 ]; then
    [ -f "$WAL".2 ] || base64 "$WAL".1 > "$WAL".2
    # we always repeat this step,
    # in case a read has already changed the queue
    cat "$QUEUE" "$WAL".2 > "$WAL".3
    cp "$WAL".3 "$QUEUE"
    rm "$WAL".*
fi

Now we can call it as a part of our launcher loop:

#!/bin/sh
export PATH="$PWD/lib:$PATH"
# process any remaining transactions
checkpoint() (
    for queue in *.queue; do
        queue=$(basename "$queue" .queue)
        . wal
    done
)
checkpoint
while ncat -l 1234 -c handler; do
    # if the handler crashed, we can catch it here
    checkpoint
done

Just as before – we didn't entirely eliminate the crash race condition. After all, the server could crash in the middle of a push. And if we added a notification for completed pushes, the notification could fail to arrive even though the push went through. However, we've significantly reduced the odds of queue corruption, to the point where we don't have to worry about it as much.

Notably, this approach can result in missed or corrupted writes rather than double-writes; I chose it to demonstrate that side of the trade-off, in contrast to the double-read philosophy we took with pop.

Parallelism

At this point, the server is done, if we're content with a single thread! What if two clients connect simultaneously? As it stands, they can't – ncat accepts a single connection at a time, and the loop only restarts once the handler finishes.

I'm done messing around for today, though, so maybe a follow-up (in a branch) will be made to provide parallelism.

You can find this version of the server over on GitHub.