Message Broker: Channel Naming

I’ve started building a message broker as a learning project. There are several out there such a RabbitMQ, Kafka, Redis’ pub/sub layer, and some brokerless message queue solutions like 0mq. Having used many of them over the years and studying the topic both from the classic “Enterprise Integration” side and the more modern/agile “Microservices” side, I figured I’d try my hand at implementing one.

In this blog post, I’m going to go over how I designed and implemented channel naming. Channel names fill the role of data descriptor used by the publishers and the role of query language for subscribers. This turned out to be a delightful series of problems to explore. Questions such as “how do people use them”, “what features are expected”, “what limitations are common” came up. Realizing that my message broker is essentially providing a naming framework that will have at least some opinion, I needed to ask “are there practices I want to encourage or discourage” and recognize my influence.

In almost all message queue systems, channels have a string based name. This name may be broken up by delimiters, and that delimiter is sometimes chosen by the client or in a config on the broker. Most support the ability to wild card parts of the channel when subscribing. Sometimes the wild card is purely string based, in delimited channels often the wild card is done by chunk. Wild cards are sometimes restricted in what positions they can be in, almost always allowed at the end, sometimes in the middle and sometimes disallowed at the start.

When deciding between these I also wanted to keep in mind what solutions are relatively easy to code, straightforward to debug, and allow for acceptable performance. Not allowing wild cards at all means that simple string comparison works, but omits a common feature. Using simple strings and a well known text query setup like regular expressions would give huge amounts of flexibility in selecting what messages you’d like, but comes with a large performance cost, libraries, and is much harder to spot debug. I decided that since I’m not going to use a common query language, that I’d want to make the names as simple as possible to parse.

A Rope is a data structure I’d heard about a few times and I knew it had something to do with making string operations faster, but I was hazy on the details. Since this is a project without deadlines, I set about reading a paper on them to learn more. Shortly into the paper it was clear this wasn’t the solution for me, ropes are designed for larger bodies of text, manipulating that text in a variety of ways, and making common editor features easier to implement. But it shared that part of the problem was breaking down the text, and part of it was comparing text.

Since channel names are often a hierarchy that uses the delimiter to separate the layers, I split the name using that delimiter into an array. But that then left me with a bunch of smaller strings I had to compare, which seemed much slower than just walking through the name and query once, using the wild cards to skip characters. To avoid a char by char parse on every channel name comparison, I drew on the common native layer practice of hashing strings then comparing the hashes. Hashing has an up front cost of processing the string into a numeric form, but then makes comparisons extremely fast. Since a message’s channel would be used to query for appropriate subscribers, it could be hashed once and compared many times. An unfortunate side effect of hashing the substrings though, I wouldn’t be able to allow partial segment matches. The wild card would be all or nothing in a given position, just a.* no a.b*.

I decided that side effect of only allowing wild cards at the segment layer was ultimately a good thing. While there may be transition times where the feature of partial segment matching would help, it would allow people to break the idea that each segment is a complete chunk of data. This similar reasoning is why the only selectors are exact and wild card, no numeric or alpha only sorts of selection.

After all this research, thinking, note taking, and general meandering thing the computer science fields, I started to implement my solution in Rust. My message broker isn’t actually ready for the channel names yet, as I’m still working on how to manage connections and properly do the various forms of store and forward. But this problem tickled me so I set about solving it anyway. I created a file channel.rs to try building these ideas.

I started with a basic struct with the string form of the name for debugging, and a hashed form of the name for comparisons.

struct Channel {
    raw_name: String,
    hashed_name: Vec<Option<u64>>
}

raw_name is a String so the struct can own the string without any lifetime concerns of a str, this value will mostly be used for debugging or admin purposes. The hashed_name is a Vec so it can be variable sized, currently I have no limit on the number of delimiters you can use. Option is what I used to handle the wild card. If it was Some<u64> then you have a hash to compare. If it was None then it was wild card and you don’t have to compare it. After thinking harder though, I realized that I didn’t want to have the binary Option as my indicator for whether to use a wild card or not. If I added a new type of wild card, for instance, one that allowed any number of segments, I’d have to replace my Option usage everywhere. So instead I’ve preemptively changed to using my own ChannelSegment type like so:

struct Channel {
    raw_name: String,
    hashed_name: Vec<ChannelSegment>
}

enum ChannelSegment {
    Wild,
    Hash(u64)
}

Next, I set about parsing the input. I knew that hard coding strings in tests meant that I wanted to have a from_str variant. People will often have hard coded channel names but there will also be generated ones, and for that allowing a from_string is nice. I also knew I was going to turn it into a String anyway to assign to raw_name. So I did the following to enable both:

pub fn from_str(input: &str) -> Result<Channel, String> {
    Channel::from_string(input.to_owned())
}

pub fn from_string(input: String) -> Result<Channel, String> {
    // parsing code goes here
}

Parsing was a pretty simple matter. Using String.split(char) to get an iterator returning each segment. Then relying on Rust’s pattern matching for cases I specifically cared about matching, like empty string (which I made into an error to prevent mistakes) and "*" which is the wild card character. Building up the hashed name, then returning it.

let mut hashed_name = Vec::new();
for chunk in input.split('.') {
    match chunk {
        "" => {
            return Err("empty entry is invalid".to_owned());
        }
        "*" => {
            hashed_name.push(ChannelSegment::Wild);
        }
        _ => {
            hashed_name.push(ChannelSegment::Hash(calculate_hash(&chunk)));
        }
    }
}
Ok(Channel{
    raw_name: input,
    hashed_name: hashed_name
})

One could easily see using a LRU Cache (Least Recently Used cache that removes the oldest entries when it gets too full) to skip parsing the channel name for the most commonly used channels, but I’m not doing that yet until this proves to be a part that is slowing me down.

To compare between two Channels I added a .matches(&Channel) method. I decided against implementing PartialEq since I wouldn’t be testing for exact match, but instead match when considering wild cards, and most developers expect a more exact match when using ==.

pub fn matches(&self, other: &Channel) -> bool {
    if self.hashed_name.len() != other.hashed_name.len() {
        return false;
    }
    for (a, b) in self.hashed_name.iter().zip(&other.hashed_name) {
        if let (&ChannelSegment::Hash(inner_a), &ChannelSegment::Hash(inner_b)) = (a, b) {
            if inner_a != inner_b {
                return false;
            }
        }
    }
    
    true
}

Since my only wild card is for a single whole segment, I know I can immediately return if they have different lengths. Then I zip the two hashed_name together so I can iterate through them at the same time. In other languages one would commonly just create an integer, increment that and use it to index into both of the sequences at the same time, ensuring to not walk past the end ourselves. In Rust we rely on iterators to give us fast access to our sequences with minimal checking, keeping us safe against others (or ourselves) mutating the sequences in dangerous ways while iterating. Using zip we can create an iterator that walks both sequences at the same time keeping our to our land of safety and speed.

As a side note, the calculate_hash function is one I pulled from the docs but changed a little bit to fit my style better:

fn calculate_hash<T: Hash>(t: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    t.hash(&mut hasher);
    hasher.finish()
}

A feature of Rust I really enjoyed while developing this, was the ability to write tests in the same file as I went. I could quickly write a few examples then make the code pass, focusing on just the higher level parts of the API and not testing the internals so much. Think more “this errors, this does not error” and less “this returns a vector of integers that are ascending…” while you are adding such tests, to make sure you can quickly change out the implementation while the idea keeps working, here are the tests I wrote as I was developing:

#[test]
fn create_basic_channel() {
    Channel::from_str("a.b.c").unwrap();
    Channel::from_str("name").unwrap();
}

#[test]
fn create_with_wildcard() {
    Channel::from_str("a.*.b").unwrap();
    Channel::from_str("*").unwrap();
    Channel::from_str("*.end").unwrap();
    Channel::from_str("start.*").unwrap();
}

#[test]
fn create_invalid() {
    assert!(Channel::from_str(".a.b").is_err());
    assert!(Channel::from_str("c.b.").is_err());
    assert!(Channel::from_str("g.l..b").is_err());
    assert!(Channel::from_str("").is_err());
}

#[test]
fn matches_with_self_exact() {
    let channel = Channel::from_str("a.b.c").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("a").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("dabbling.b.c").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("abba.bobble").unwrap();
    assert!(channel.matches(&channel));
}

#[test]
fn not_matches_exact() {
    let channel_full = Channel::from_str("s.t.r").unwrap();
    let channel_sub = Channel::from_str("s.t").unwrap();
    assert!(!channel_full.matches(&channel_sub));
    assert!(!channel_sub.matches(&channel_full));
}

#[test]
fn matches_with_self_wild() {
    let channel = Channel::from_str("a.*.c").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("*").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("*.b.c").unwrap();
    assert!(channel.matches(&channel));
    let channel = Channel::from_str("abba.*").unwrap();
    assert!(channel.matches(&channel));
}

#[test]
fn matches_wild_card_one_side() {
    let wild_channel = Channel::from_str("alpha.beta.*").unwrap();
    let tame_channel = Channel::from_str("alpha.beta.charlie").unwrap();
    assert!(wild_channel.matches(&tame_channel));
    assert!(tame_channel.matches(&wild_channel));
}

Mostly mundane stuff, values all quickly typed in to ensure the general concept works, and common edge cases in parsing (start, end, middle) are covered. But note this isn’t combinatorially testing this, there isn’t unicode or other trouble points, those sorts of tests can come with time. Being able to add in a new test quickly when I noticed an edge case is easily one of my favorite features of Rust.

I’ll be open sourcing my work in progress soon, but for now it mostly lives in my notebook as some scribbles, and a smattering of mostly disconnected code on my laptop. If you have feedback on this blog post, how I’ve setup channels in my message broker, or generally about my rust code I’d love to hear it in comments below. Before the comments roll in though, I know using Strings for errors is not great, I just don’t have an error system setup in my project yet.

One thought on “Message Broker: Channel Naming

Leave a comment