Message Broker: Maybe Invented Here

NIH (Not Invented Here) is a common initialism that refers to excluding solutions that the project itself did not create. For example, some game studios NIH their own game engines, where others license an existing engine. There are advantages to both and neither is always correct. For me, I have a tendency to NIH things when working in Rust. This post is to help me understand why.

When I started with Rust, I built an example bot for one of The AI Games challenges. The focus of that project was parsing the protocol and presenting it in a usable way for folks to get started with and extend into their bot. I built my parser from scratch, at first focusing on getting it working. Then I spent time looking at other parsers to see how they go faster and what their interfaces in Rust look like. I updated the implementation to account for this research, and I learned a lot about both Rust itself and implementing low level parsers.

I did similar for another couple of projects, spending a lot of time implementing things that there were libraries to do or at least assist with. Something I’ve started to notice over time: I’m using libraries for the parts I used to NIH, especially the more serious I am about completing the project.

In my broker I’m doing little parsing of my own. I resisted using PlainTalk for my text protocol because I didn’t want to write the parser in a few languages. I’m using JSON since most languages have an implementation already, even if it isn’t the best to type. My only libraries so far are for encoding and decoding the allowed formats automatically. This means my socket and thread handling has been all custom or built into Rust itself.

I definitely get joy out of working at those layers, which is an easy explanation for NIHing those parts while working on a project in general. I’m also learning a lot about the design as I implement my own. But I find myself at a crossroads. I can continue to NIH the layer and spend a week on getting a workable socket, thread, and job handling story. Or I can entangle the fate of my project with the Rust community more. To explain, let’s talk about some pros and cons of NIH versus using others’ projects.

NIHing something means you can build something custom to your situation. It often takes more up front time than using an external solution. Your project will need to bring in or build up the expertise to handle that solution. The more central to the heart of your project, the more you should NIH. If the heart of your project is learning, then it could make sense to NIH as much as possible.

Using something external means doing research into the many solutions that could fit, narrowing them down to a final few (or one) to try, then learning how to use that solution and adapt it to your project. Often the solution is not a perfect fit, but the savings on time and required expertise can make up for it. There is an accepted risk of the external project having different goals or being discontinued.

This morning I found myself staring down the problem of reading from my sockets in the server. Wanting to be efficient on resources, I didn’t want to rely only on interval polling. I started by looking in the Rust standard library for a solution. The recommendations are to create a thread per connection, use interval polling, or use external libraries. Thread per connection won’t work for me with my goals: the resource cost of switching between a lot of threads overshadows the cost of the work you are trying to perform. I had already ruled out interval polling. A less recommended path is wrapping the lower level mechanisms yourself.
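For reference, a single pass of interval polling can be sketched roughly as below. This is not code from my broker: PollResult and poll_once are names made up for this illustration, and the function is written against any std::io::Read so it can be tried without a live socket. With a real TcpStream you would call set_nonblocking(true) first, then run a pass like this over every connection on each interval.

```rust
use std::io::{self, ErrorKind, Read};

/// What one polling pass found (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum PollResult {
    /// This many bytes were read; more may arrive later.
    Data(usize),
    /// Nothing was available right now; try again on the next interval.
    Idle,
    /// The other side closed the connection.
    Closed,
}

/// Try a single read and append whatever arrived to `out`.
fn poll_once<R: Read>(source: &mut R, out: &mut Vec<u8>) -> io::Result<PollResult> {
    let mut buf = [0u8; 1024];
    match source.read(&mut buf) {
        // A zero-length read into a non-empty buffer means end of stream.
        Ok(0) => Ok(PollResult::Closed),
        Ok(n) => {
            out.extend_from_slice(&buf[..n]);
            Ok(PollResult::Data(n))
        }
        // A non-blocking source reports "no data yet" as WouldBlock.
        Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(PollResult::Idle),
        Err(e) => Err(e),
    }
}
```

The weakness is visible in the Idle case: every connection gets asked on every interval whether or not it has anything to say, which is the wasted work I wanted to avoid.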

So, I started looking into more and less complete solutions to these problems. When using less complete solutions, you can glue a few together, creating a normalized interface on top of them that your project can use. The more complete solutions will do that normalization for you, often at a cost of not closely matching your needs. This brings me to what I mean by entangling my project’s fate with the Rust community more.

Tokio is a project the Rust community has started to center around for building services, protocol parsers, and other tools. It is designed to handle a large part of the asynchronous layer for users. I heard about it at RustConf 2016 and read about it in This Week In Rust. My understanding stayed high level, as I’ve not had any serious Rust projects to apply it to. I began looking into it as a solution for my broker and was delighted. Their breakdown of this problem is similar to how I have been designing my broker already, the largest difference being their inclusion of futures.

The architecture match with Tokio, as well as the community’s energy, makes it a good choice for me. I’ll need to learn more about their framework and how to use it well as I go. But, I’m confident I’ll be able to refactor my broker to run on top of it in a day or so. Then I can get the rest of the minimal story for this message broker done this week. Once I have it doing the basics with at least the Rust driver, I’ll open source it.

Message Broker: Into String

Strings in most native or performance focused languages tend to present a fair amount of complexity and Rust is no exception to this. There are cases where you have a struct that needs to have a name, such as:

struct ServiceHandle {
    name: String,
}

The first ServiceHandle::new() function I would typically write when first learning Rust would look like this:

fn new(name: String) -> ServiceHandle {
    // other init stuff
    ServiceHandle { name: name }
}

In the real world I generally have a String to pass to ServiceHandle::new(String) so this API works well. But when I’m writing tests for my code, I want to pass hard coded values with type &'static str. In order to do that, I have to call one of the conversion functions.

ServiceHandle::new("listener".into());
ServiceHandle::new("listener".to_owned());
ServiceHandle::new(String::from("listener"));

If I change the signature to something like:

fn new(name: &str) -> ServiceHandle {
    // other init stuff
    ServiceHandle { name: name.to_owned() }
}

Then I have to remember to prefix a String with & when passing it to the function. Another possibility is to provide both from_string and from_str, which is what I started to rely on next. Then I don’t have to remember as much, just use the right one for the right type.

fn from_str(name: &str) -> ServiceHandle {
    // skip init stuff, let from_string do it
    ServiceHandle::from_string(name.to_owned())
}

fn from_string(name: String) -> ServiceHandle {
    // other init stuff
    ServiceHandle { name: name }
}

This gets me into a better state where it is easy to remember what string type it takes, but it feels clumsy. Rust provides the From and Into traits that types can implement to enable more generic coding. They are, as their names imply, a way to convert between types. The two are linked: the standard library provides a blanket implementation so that implementing From gives you the matching Into automatically (but not the other way around). For example, impl From<A> for B would allow you to write a function like fn thing<T: Into<B>>(arg: T) which could be called like thing(A {}).
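Spelled out as something that compiles, the A and B example might look like the sketch below. The fields on A and B and the label format are invented here purely so there is something to convert.

```rust
/// Source type (hypothetical, mirrors the `A` in the text).
struct A {
    id: u32,
}

/// Target type (hypothetical, mirrors the `B` in the text).
#[derive(Debug, PartialEq)]
struct B {
    label: String,
}

// Only `From` is written by hand here.
impl From<A> for B {
    fn from(a: A) -> B {
        B { label: format!("a-{}", a.id) }
    }
}

// `Into<B>` can still be used as a bound, because the standard library
// implements `Into<B>` for every type that `B` implements `From` for.
fn thing<T: Into<B>>(arg: T) -> B {
    arg.into()
}
```

Calling thing(A { id: 7 }) goes through the hand-written From. Passing a B directly also works, since every type can be converted into itself.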

So the next iteration of my ServiceHandle::new() went generic:

fn new<S: Into<String>>(name: S) -> ServiceHandle {
    ServiceHandle { name: name.into() }
}

This allows calling with String, &str, or several other types that can automatically be converted into a String. That makes writing test code with &'static str simple, while dynamically generated String objects remain first class citizens.
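As a quick illustration of the call sites this opens up, here is a small sketch. The handle_names helper and the example names are made up for the demonstration; Cow<str> is included only as one of the "several other types" the standard library can convert into a String.

```rust
use std::borrow::Cow;

struct ServiceHandle {
    name: String,
}

impl ServiceHandle {
    fn new<S: Into<String>>(name: S) -> ServiceHandle {
        ServiceHandle { name: name.into() }
    }
}

/// Hypothetical helper: build handles from three different string types
/// and return the names they ended up with.
fn handle_names() -> Vec<String> {
    let generated: String = format!("worker-{}", 3);
    let borrowed: Cow<'static, str> = Cow::Borrowed("cache");
    vec![
        ServiceHandle::new("listener").name, // &'static str, handy in tests
        ServiceHandle::new(generated).name,  // owned String, moved in without a copy
        ServiceHandle::new(borrowed).name,   // Cow<str>, converted by the same bound
    ]
}
```

One trade-off worth knowing about: a generic function gets compiled once per concrete type it is called with, which is usually fine for a small constructor like this.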

Message Broker: Goals and (De)Motivations

Recently, I read a Twitter rant that described message brokers as a poor combination of load balancer, database, and service discovery tool. It hit me hard since I’d just spent a week diving into writing my own message broker. While I have my dislikes of brokers, I think they are handy tools. The tweeter stated that many of these things should be built into the services, the goal being to keep the heavy work out of the center of the system. Message brokers do the opposite when used as a central bus.

Having this description of the problem space is turning out to be nice. It gives me some different framing for the various parts of the message broker I’ll be building and the underlying needs. It also pointed out a heavy flaw: message brokers used as a central bus can cause trouble in some systems. While that Twitter rant dismayed me at first, I now feel even more energized in building this tool.

This framing of load balancer, database, and service discovery reminds me to go read up on that tech as well, sourcing papers for those problems while looking into queuing related things. I can acknowledge these subproblems and make sure they get solved well enough for my intended scale. That will be a key part of my design going forward: keeping my decisions favoring small to medium scale. I’ve seen message brokers work well in those scenarios and want to make an even better one of those.

This doesn’t mean one couldn’t use the broker in a larger scale operation. But, I’m architecting it to encourage deliberate clustering beyond medium scale. Clustering acknowledges the fact that there are usually groups of services that are able to meet a work request without speaking outside of their group except for one or two edges. What I hope to discover as part of the development process is how to encourage this. Whether documenting and creating examples will be enough, or if I’ll need more core features.

I think keeping the message broker lightweight will be instrumental in encouraging clustering. If the message broker is heavy, folks won’t want to run too many instances. If it requires a lot of tuning to be useful, folks will want to tune it only once, as a central bus. Side note: as I typed this I realized this is why Redis is so good.

Among the lofty design and architecture goals I want to mention my motivations and put the goals in perspective. This project’s main goal is to be a learning project. I want to better understand the internals of message buses. Most green field backend projects will be utilizing a message bus and smaller services. Understanding the internals of the message bus and keeping them in mind will let me design better services.

I also want to build a complex, performance focused, realistic piece of software in Rust. I find the language fun to work with and writing my own thread orchestration that is safe is delightful. As I build up the basics in the broker and client, I’m learning a lot of practical Rust skills. Like many others writing and coding in Rust in their free time, I’m hoping this will help encourage more jobs writing Rust. If I’m lucky enough, I’ll get to secure one of those jobs.

Message Broker: Channel Naming

I’ve started building a message broker as a learning project. There are several out there such as RabbitMQ, Kafka, Redis’ pub/sub layer, and some brokerless message queue solutions like 0mq. Having used many of them over the years and studied the topic both from the classic “Enterprise Integration” side and the more modern/agile “Microservices” side, I figured I’d try my hand at implementing one.

In this blog post, I’m going to go over how I designed and implemented channel naming. Channel names fill the role of data descriptor used by the publishers and the role of query language for subscribers. This turned out to be a delightful series of problems to explore. Questions such as “how do people use them”, “what features are expected”, “what limitations are common” came up. Realizing that my message broker is essentially providing a naming framework that will have at least some opinion, I needed to ask “are there practices I want to encourage or discourage” and recognize my influence.

In almost all message queue systems, channels have a string based name. This name may be broken up by delimiters, and that delimiter is sometimes chosen by the client or in a config on the broker. Most support the ability to wild card parts of the channel when subscribing. Sometimes the wild card is purely string based; in delimited channels the wild card is often applied per chunk. Wild cards are sometimes restricted in what positions they can be in: almost always allowed at the end, sometimes in the middle, and sometimes disallowed at the start.
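To make the "wild card by chunk" flavor concrete, here is a minimal sketch that works directly on strings. It assumes '.' as the delimiter and "*" as a wild card standing in for exactly one whole chunk; segment_match is a name invented for this illustration, and real brokers differ in their exact rules.

```rust
/// Compare a subscription pattern to a channel name one chunk at a time.
/// Assumption for this sketch: '.' delimits chunks and "*" matches any
/// single whole chunk.
fn segment_match(pattern: &str, name: &str) -> bool {
    let mut pattern_parts = pattern.split('.');
    let mut name_parts = name.split('.');
    loop {
        match (pattern_parts.next(), name_parts.next()) {
            // Both ran out together: every chunk agreed.
            (None, None) => return true,
            (Some(p), Some(n)) => {
                if p != "*" && p != n {
                    return false;
                }
            }
            // One side has chunks left over, so the depths differ.
            _ => return false,
        }
    }
}
```

With these rules "orders.*.created" matches "orders.eu.created", while "orders.*" does not, because the wild card covers one chunk rather than the rest of the name.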

When deciding between these I also wanted to keep in mind what solutions are relatively easy to code, straightforward to debug, and allow for acceptable performance. Not allowing wild cards at all means that simple string comparison works, but omits a common feature. Using simple strings and a well known text query setup like regular expressions would give huge amounts of flexibility in selecting what messages you’d like, but comes with a large performance cost, requires extra libraries, and is much harder to debug at a glance. Since I’m not going to use a common query language, I decided I’d want to make the names as simple as possible to parse.

A Rope is a data structure I’d heard about a few times and I knew it had something to do with making string operations faster, but I was hazy on the details. Since this is a project without deadlines, I set about reading a paper on them to learn more. Shortly into the paper it was clear this wasn’t the solution for me: ropes are designed for larger bodies of text, manipulating that text in a variety of ways, and making common editor features easier to implement. But it did highlight that part of the problem was breaking down the text, and part of it was comparing text.

Since channel names are often a hierarchy that uses the delimiter to separate the layers, I split the name using that delimiter into an array. But that then left me with a bunch of smaller strings I had to compare, which seemed much slower than just walking through the name and query once, using the wild cards to skip characters. To avoid a char by char parse on every channel name comparison, I drew on the common native layer practice of hashing strings then comparing the hashes. Hashing has an up front cost of processing the string into a numeric form, but then makes comparisons extremely fast. Since a message’s channel would be used to query for appropriate subscribers, it could be hashed once and compared many times. An unfortunate side effect of hashing the substrings, though, is that I wouldn’t be able to allow partial segment matches. The wild card would be all or nothing in a given position: just a.*, no a.b*.

I decided that the side effect of only allowing wild cards at the segment layer was ultimately a good thing. While there may be transition times where partial segment matching would help, it would also let people break the idea that each segment is a complete chunk of data. Similar reasoning is why the only selectors are exact and wild card, with no numeric or alpha only sorts of selection.

After all this research, thinking, note taking, and general meandering through the computer science fields, I started to implement my solution in Rust. My message broker isn’t actually ready for the channel names yet, as I’m still working on how to manage connections and properly do the various forms of store and forward. But this problem tickled me so I set about solving it anyway. I created a file channel.rs to try building these ideas.

I started with a basic struct with the string form of the name for debugging, and a hashed form of the name for comparisons.

struct Channel {
    raw_name: String,
    hashed_name: Vec<Option<u64>>
}

raw_name is a String so the struct can own the string without any of the lifetime concerns of a &str; this value will mostly be used for debugging or admin purposes. The hashed_name is a Vec so it can be variable sized, as currently I have no limit on the number of delimiters you can use. Option is what I used to handle the wild card. If it was Some(u64) then you have a hash to compare. If it was None then it was a wild card and you don’t have to compare it. After thinking harder though, I realized that I didn’t want the binary Option as my indicator for whether to use a wild card or not. If I added a new type of wild card, for instance one that allowed any number of segments, I’d have to replace my Option usage everywhere. So instead I’ve preemptively changed to using my own ChannelSegment type like so:

struct Channel {
    raw_name: String,
    hashed_name: Vec<ChannelSegment>
}

enum ChannelSegment {
    Wild,
    Hash(u64)
}
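One practical payoff of a dedicated enum shows up when comparing segments with a match that has no catch-all arm. The sketch below is illustrative only: segments_match is a hypothetical helper, and the derives on ChannelSegment are added here so the values can be copied and compared in the example.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum ChannelSegment {
    Wild,
    Hash(u64),
}

/// Hypothetical helper: do two segments agree with each other?
fn segments_match(a: ChannelSegment, b: ChannelSegment) -> bool {
    match (a, b) {
        // A wild card on either side accepts anything.
        (ChannelSegment::Wild, _) | (_, ChannelSegment::Wild) => true,
        // Two concrete segments agree only when their hashes are equal.
        (ChannelSegment::Hash(x), ChannelSegment::Hash(y)) => x == y,
    }
}
```

Because every combination is listed explicitly, adding a third variant later would make this match fail to compile until the new case is handled, which is exactly the kind of reminder an Option could not give.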

Next, I set about parsing the input. I knew that hard coding strings in tests meant that I wanted to have a from_str variant. People will often have hard coded channel names but there will also be generated ones, and for that allowing a from_string is nice. I also knew I was going to turn it into a String anyway to assign to raw_name. So I did the following to enable both:

pub fn from_str(input: &str) -> Result<Channel, String> {
    Channel::from_string(input.to_owned())
}

pub fn from_string(input: String) -> Result<Channel, String> {
    // parsing code goes here
}

Parsing was a pretty simple matter. I used str::split(char) to get an iterator returning each segment, then relied on Rust’s pattern matching for the cases I specifically cared about, like the empty string (which I made into an error to prevent mistakes) and "*", which is the wild card character. The loop builds up the hashed name, then the function returns it.

let mut hashed_name = Vec::new();
for chunk in input.split('.') {
    match chunk {
        "" => {
            return Err("empty entry is invalid".to_owned());
        }
        "*" => {
            hashed_name.push(ChannelSegment::Wild);
        }
        _ => {
            hashed_name.push(ChannelSegment::Hash(calculate_hash(&chunk)));
        }
    }
}
Ok(Channel{
    raw_name: input,
    hashed_name: hashed_name
})

One could easily see using an LRU cache (a Least Recently Used cache, which evicts the entries that have gone unused the longest when it gets too full) to skip parsing the channel name for the most commonly used channels, but I’m not doing that until this proves to be a part that is slowing me down.

To compare between two Channels I added a .matches(&Channel) method. I decided against implementing PartialEq since I wouldn’t be testing for exact match, but instead match when considering wild cards, and most developers expect a more exact match when using ==.

pub fn matches(&self, other: &Channel) -> bool {
    if self.hashed_name.len() != other.hashed_name.len() {
        return false;
    }
    for (a, b) in self.hashed_name.iter().zip(&other.hashed_name) {
        if let (&ChannelSegment::Hash(inner_a), &ChannelSegment::Hash(inner_b)) = (a, b) {
            if inner_a != inner_b {
                return false;
            }
        }
    }
    
    true
}

Since my only wild card is for a single whole segment, I know I can immediately return if they have different lengths. Then I zip the two hashed_name vectors together so I can iterate through them at the same time. In other languages one would commonly just create an integer, increment it, and use it to index into both of the sequences at the same time, taking care not to walk past the end ourselves. In Rust we rely on iterators to give us fast access to our sequences with minimal checking, keeping us safe against others (or ourselves) mutating the sequences in dangerous ways while iterating. Using zip we can create an iterator that walks both sequences at the same time, keeping us in our land of safety and speed.
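To make the comparison concrete, here are the two styles side by side on plain slices of hashes. Both function names are invented for this sketch, and neither is taken from the broker.

```rust
/// Index-based walk, the way one might write it in a C-like language.
fn equal_by_index(a: &[u64], b: &[u64]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let mut i = 0;
    while i < a.len() {
        // Each `a[i]` and `b[i]` is bounds checked when it is indexed.
        if a[i] != b[i] {
            return false;
        }
        i += 1;
    }
    true
}

/// Iterator-based walk: `zip` pairs the elements up and stops when the
/// shorter sequence ends, so the length check still has to come first.
fn equal_by_zip(a: &[u64], b: &[u64]) -> bool {
    a.len() == b.len() && a.iter().zip(b).all(|(x, y)| x == y)
}
```

The detail that zip stops at the shorter input is easy to forget: without the length comparison, [1, 2] and [1, 2, 3] would be reported as equal.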

As a side note, the calculate_hash function is one I pulled from the docs but changed a little bit to fit my style better:

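use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Feed any hashable value through the default hasher and return the result.
fn calculate_hash<T: Hash>(t: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    t.hash(&mut hasher);
    hasher.finish()
}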

A feature of Rust I really enjoyed while developing this was the ability to write tests in the same file as I went. I could quickly write a few examples then make the code pass, focusing on just the higher level parts of the API and not testing the internals so much. Think more “this errors, this does not error” and less “this returns a vector of integers that are ascending…” while you are adding such tests, so you can quickly change out the implementation while the idea keeps working. Here are the tests I wrote as I was developing:

#[test]
fn create_basic_channel() {
    Channel::from_str("a.b.c").unwrap();
    Channel::from_str("name").unwrap();
}

#[test]
fn create_with_wildcard() {
    Channel::from_str("a.*.b").unwrap();
    Channel::from_str("*").unwrap();
    Channel::from_str("*.end").unwrap();
    Channel::from_str("start.*").unwrap();
}

#[test]
fn create_invalid() {
    assert!(Channel::from_str(".a.b").is_err());
    assert!(Channel::from_str("c.b.").is_err());
    assert!(Channel::from_str("g.l..b").is_err());
    assert!(Channel::from_str("").is_err());
}

#[test]
fn matches_with_self_exact() {
    let channel = Channel::from_str("a.b.c").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("a").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("dabbling.b.c").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("abba.bobble").unwrap();
    assert!(channel.matches(&channel));
}

#[test]
fn not_matches_exact() {
    let channel_full = Channel::from_str("s.t.r").unwrap();
    let channel_sub = Channel::from_str("s.t").unwrap();
    assert!(!channel_full.matches(&channel_sub));
    assert!(!channel_sub.matches(&channel_full));
}

#[test]
fn matches_with_self_wild() {
    let channel = Channel::from_str("a.*.c").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("*").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("*.b.c").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("abba.*").unwrap();
    assert!(channel.matches(&channel));
}

#[test]
fn matches_wild_card_one_side() {
    let wild_channel = Channel::from_str("alpha.beta.*").unwrap();
    let tame_channel = Channel::from_str("alpha.beta.charlie").unwrap();
    assert!(wild_channel.matches(&tame_channel));
    assert!(tame_channel.matches(&wild_channel));
}

Mostly mundane stuff: values all quickly typed in to ensure the general concept works and that common edge cases in parsing (start, end, middle) are covered. But note this isn’t combinatorial testing, and there isn’t Unicode or other trouble points; those sorts of tests can come with time. Being able to add a new test quickly when I notice an edge case is easily one of my favorite features of Rust.
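For anyone who hasn’t seen tests living in the same file before, the usual layout is sketched below. The segment_count function is a made-up stand-in for real code; the part that matters is the module wrapped around the tests.

```rust
/// Hypothetical function under test: count the segments in a dotted name.
fn segment_count(name: &str) -> usize {
    name.split('.').count()
}

// Only compiled when running `cargo test`, so it adds nothing to a normal build.
#[cfg(test)]
mod tests {
    // Pull in everything from the parent module, private items included.
    use super::*;

    #[test]
    fn counts_segments() {
        assert_eq!(segment_count("a.b.c"), 3);
        assert_eq!(segment_count("name"), 1);
    }
}
```

Since the test module is a child of the module it tests, it can reach private functions too, which is what makes it so quick to drop in a new case right next to the code.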

I’ll be open sourcing my work in progress soon, but for now it mostly lives in my notebook as some scribbles, and a smattering of mostly disconnected code on my laptop. If you have feedback on this blog post, how I’ve set up channels in my message broker, or generally about my Rust code, I’d love to hear it in the comments below. Before the comments roll in though, I know using Strings for errors is not great; I just don’t have an error system set up in my project yet.