Implementing and testing a gossip protocol with Rust
Catalin December 23, 2024 #RustIntro
With the recent AI and ML boom, there's a lot of uncertainty around what technologies and paradigms will be most relevant to the market. One thing is clear though, the scale of computing tasks that we're dealing with is constantly increasing. It will be more common to have computational tasks that span multiple machines. In this context, I thought it would be good to try out an interesting challenge with great educational potential: Fly.io's distributed systems challenge: Gossip Glomers.
The challenge centers around the concept of gossip protocols, a paradigm for replicating data across a distributed system. It involves using Maelstrom, a workbench for writing your own implementations of distributed systems. Maelstrom relies on Jepsen - a testing library to check your implementations. Keep in mind, these frameworks are for building and testing educational versions of distributed systems, and are not tools that you would use in a production system.
Setup
Before explaining gossip protocols, let's cover what we need to get going.
Maelstrom is the framework in charge of running our distributed application as a cluster of nodes. We will package our application as a binary and Maelstrom will run instances of our binaries in the cluster while also emulating a network between the nodes. Network communication is handled via JSON messages passed through STDIN and STDOUT. A node will send messages on STDOUT containing a destination field - Maelstrom will take this output and deliver it to the destination nodes’ STDIN.
Running the application will involve two node types - The nodes of the actual distributed system being tested and client nodes that Maelstrom uses to interact with our system. Client nodes will only call available methods on the distributed nodes of our app, while the app's nodes will communicate between each other to synchronize state. An important point here is that we're only programming the logic of a single node, so we have no other way to control the flow of information, other than the protocol defined for the messages that a particular node handles. There is no dedicated coordination layer.
The first challenge is a simple echo exercise that's designed just to get you set up with everything needed. We have to create a node that responds to a simple echo request. At this point, we're not actually running a distributed system. For the test, Maelstrom will instantiate a single app node and then also create a client node that will make the echo call to our application.
Language and runtime choice
The documentation gives examples in the Go language. I chose to do it in Rust to make it more interesting. Both languages allow us to easily export light-weight binaries to be used by the simulation framework. If you're starting out and just want to learn distributed systems concepts, I recommend sticking with Go, as the Rust borrow checker limits the way you can use threads.
Maelstrom provides a library that helps with reading and sending messages without having to worry about JSON, serialization and deserialization. There's also a Rust variant of the library from Maelstrom but I didn't want to use it because it's using async Rust. It doesn't really help with performance in this case and just adds unnecessary complexity.
In our case, we have two options - We can either write our own synchronous library to interact with Maelstrom or we can look for an existing one. I opted for the latter (I wanted to avoid spending too much time on serialization logic instead of the protocol itself). I chose this crate which uses an actor model with synchronous Rust. Later, I observed that I would need to extend this crate, but let's not get ahead of ourselves.
First Challenge: Echo
Let's use the crate (library) to solve the first challenge. All we need to do is implement a node that takes an “echo” type message and replies with the same data but with an “echo_ok” type. Fortunately, the crate that we're using already handles appending "ok" automatically to the message type for us when we respond. So, all we need to do is copy the message data to our response.
The crate uses an actor model, so to run the program, we need to implement the Actor trait for our node and then run the actor on a runtime. The runtime is basically a loop on the main thread that reads from stdin and writes to stdout. The runtime knows how to initialize a node and how to create and then send out a response.
Now, back to the Actor trait. We just need to implement init which will simply store the id of the node in our struct and receive, which will handle an echo request and use handle_echo to respond to it. Handle_echo will just take the contents of the message from the echo field and then write it out to a response. Again, the crate handles the message type for us. And that's all there is to the first challenge.
Second challenge: Unique ID
The second challenge revolves around unique ID generation. In this case, we'll be actually running a distributed system, but we don't need to actually use a distributed algorithm to solve it.
Maelstrom will run three of our nodes for a period of time and have client nodes send generate requests to them. In all cases we should respond with a unique ID. If there's any clash between ids the test will fail.
The hard way to do this would be to create a consensus algorithm where the nodes negotiate the IDs between themselves, but this is way too complex at this point in the challenge. We'll go for a simpler option of having each node return a UUID V4 value. There's enough randomness to UUID V4 to guarantee that there won't be clashes for the scenario we're running. In this case, we'll have the receive method of the Actor trait - we can just delegate to this handle_generate method. This will simply use the UUID crate to generate a value and return a response.
Challenge 3a: Single-node broadcast
Now we're getting closer to the real challenge. Part three will have us implement a broadcast system. This first part of the challenge (3a) will just have us handle a single node, so we don't need to worry about communication with the other nodes in the system for now. This will involve handling three special message types that the client nodes send to the app nodes - broadcast, read, and topology.
The broadcast message has a value inside it that needs to be transmitted and stored in the entire distributed system. Since we're only handing a single node at this point, we just need to store the value in the current node:
pub
In this case, we just respond with "broadcast_ok" (appending ok to the request type is handled automatically by our Response type). The body of the response will be empty.
Read is a method for getting all the values that were stored in a specific node:
pub
Here we're reading the stored list of values in the node and serializing it to a JSON response.
Topology is a message containing the layout of the distributed systems - a list of nodes, with links to their neighbors. These are the nodes that in a physical system would be closest to the current node. We'll cover this concept more in a bit. Since we're in the single-node scenario, we just need to reply with "topology_ok". We won't even store the topology information for now.
There's also a more general part that the framework crate handles for us: the init message. This message will inform the current node of what its id is along with the ids of the other nodes in the system.
Challenge 3b: Multi-node broadcast
Now that we can handle client requests for a single node, let's handle communication for a multi-node setup. We now need to actually handle the topology call. This will provide the current node with information about the layout of the system.
pub
In this case, we're extracting the corresponding entry for our node (because the topology message contains the map of the entire network, not just for this particular node), and saving the list of neighbors to the neighbors field on the struct.
For now, we can take a naive approach, where on each broadcast requested from the client nodes, the current app node will re-trigger (or replicate) the broadcast to the neighboring app nodes, and this will propagate through the network in a cascading fashion.
pub
Challenge 3c: Fault-tolerant communication
This new step of the challenge will introduce network partitions in the test run - everything is exactly the same otherwise. This means that communication will fail between some nodes. So, our previous strategy of a cascading broadcast started from the client will no longer be reliable, as a dropped message will stop the broadcast from propagating in some areas of the network.
To get around this, we need to use a gossip protocol.
Gossip protocols
Up until now, we've been relying on the requests from the clients as a trigger, but this only works in an ideal case, where there are no network issues. We can't rely on external triggers in order to propagate the data through the network. Some transmissions will fail, and we need a way to ensure consistency no matter how frequent or infrequent the client calls are. We will use a separate mechanism that periodically communicates with other nodes in order to sync state.
This is our gossip protocol - At specific intervals, each node will pick another node at random and gossip with it. Gossiping will share the current node's state with the other node. For this exercise, we'll keep propagating the broadcast message to neighbors, but in parallel, we'll also have the gossip thread running, gossiping between neighbors.
For these exercises I decided to use a push model for gossip, as it requires less communication. This means that a node will send its state to the other node, and if the send is successful, the receiver node will merge its state with that of the sender. If we were using a pull model, nodes would ask for updates from other nodes and receive the info when the request gets a response.
Framework refactor
How do we implement this?
Since we're using an actor model with synchronous Rust and the library doesn't let us run multiple actors on the same node, we're going to either have to switch to something else, or extend the library - I decided on the latter.
The library uses a setup where a single thread loops through the text received on STDIN, and responds accordingly on STDOUT. I needed to have at least another thread running in parallel that would handle the periodic gossip. Since Rust is very restrictive about what data you can send between threads, we don't have a good way of passing the node data to the gossip thread. Inspired by Jon Gjengset's stream on the topic, I decided to use an event system together with a mpsc channel.
The library runtime will no longer be looping over the raw request text coming from STDIN, but instead will be looping on the events coming from the channel.
We will have a new reader thread spawned at the start of the runtime which will read request text from STDIN and parse this into request events. These will be sent to the receiver in the main runtime thread. There will also be another gossip thread that gets spawned during the initialization of a node. This thread will regularly send gossip events through the channel. The sender in this thread is a clone of the sender in the reader thread.
Thread initialization:
The gossip code:
With these changes implemented, we successfully pass the fault-tolerant communication section. Link to framework refactor commit
Challenge 3d: Efficient broadcast
In this challenge, we have to reach a certain message-per-operation and latency threshold. In other words, we have to make sure we're not sending to many unnecessary messages, and that the system state syncs up in a good amount of time.
The targets are:
- Messages-per-operation is below
30 - Median latency is below
400ms - Maximum latency is below
600ms
When running the test against the previous solution, I saw that we're using a couple hundreds of messages-per-op, when we should be below 30. I figured out that the cascading broadcast is the main culprit, so I decided to remove it.
pub
Now, when a broadcast request is received by the node, it just adds the broadcast value to its state, without sending broadcast signals to its neighbors. The gossip between nodes will propagate the data on its own. All we have to do now is find a good frequency for the gossip signal.
The latency still wasn't good enough, so I updated the protocol to gossip with two other nodes instead of a single one - a random node from the network and one of its neighbors (as before). This way, messages spread in a larger vicinity, helping with convergence speed and fault tolerance, while not increasing network usage too much.
The number of nodes that we gossip to is called "fanout" - in this case we're opting for a fanout of: 2
With a timer of 50 ms for the gossip trigger we get:
- Messages-per-operation:
15.31 - Median latency:
377ms - Maximum latency:
575ms
We also need to make sure that our solution works with the --nemesis-partion option, which it does.
Great! On to the last part of this challenge
Challenge 3e: Efficient broadcast II
In the last section of this challenge, we need to optimize the solution to satisfy these targets:
- Messages-per-operation is below
20 - Median latency is below
1 second - Maximum latency is below
2 seconds
Oh, and the previous solution is already performant enough. The point of this challenge is to test and arrive at a compromise between latency and messages per operation. The state can converge faster, if the system communicates more, but this of course, impacts bandwidth usage.
In any case, we already have a solution that satisfies both parts of the challenge, so we're good.
Caveat
For those who want to try their hand at this, a small caveat: there is a more naive way to handle this, which is to not use gossip protocols at all.
Maelstrom will not handle a large number of nodes, so our test cases will be on a small scale (essentially a simulation for educational purposes). This means that you can tackle it as a deterministic multi-threaded system, avoid gossip protocols altogether and solve the challenges without incurring a huge performance penalty. However, this would strip the challenge of most of its educational value - the whole point of it is to learn about gossip protocols (hence the Gossip Glomers name).
Gossip protocols are probabilistic by design - in the sense that there's a percentage chance for the node states to be fully consistent, and the distributed system will converge towards consistency over a length of time. We gossip the state to randomly selected nodes, and trust that the state will eventually converge in an acceptable amount of time.
We can solve the challenges in a deterministic way where each nodes checks for an acknowledge response, keeps track of what other nodes successfully received its message and keeps retrying if a failure was encountered.
This can be done by spawning threads or co-routines for each message, and having each thread retry sending the messages that failed to go through. This works for the small number of nodes in the Maelstrom test and the small amount of information that we're sending, but in a real world scenario with thousands of nodes and more complex messages, this will put a lot of strain on an application node when we encounter network issues.
In the case of long interruptions, nodes can end up out of resources, due to having to remember all the failed messages that they need to resend. Also, since we're not disseminating the state in any other way, we have to be sure that the retry mechanism will not fail, as this will lead to nodes that will never sync their state.
I opted to go for a fire-and-forget approach, where nodes regularly push their state to other nodes. The distributed system will converge a bit slower to a synchronized state, but the nodes will have less work to do, and the communication protocol will be simpler. This will sadly use a bit more bandwidth when the network is fully healthy but it will be more resilient to network disruptions.
Thanks for reading!