Solving distributed systems challenges with Rust

Catalin December 23, 2024 #Rust

Intro

With the recent AI and ML boom, there's a lot of uncertainty around what technologies and paradigms will be most relevant to the market. One thing is clear though, the scale of computing tasks that we're dealing with is constantly increasing. It will be more common to have computational tasks that can't operate on a single machine. In this context, I thought it would be good to try out an interesting challenge with great educational potential: Fly.io's distributed systems challenge: Gossip Glomers.

The challenge centers around the concept of gossip protocols, a paradigm for replicating data across a distributed system. It involves using Maelstrom, a workbench for writing your own implementations of distributed systems. Maelstrom relies on Jepsen - a testing library to check your implementations. Keep in mind, these frameworks are for building and testing educational versions of distributed systems, and are not tools that you would use in a production system.

Setup

Before explaining gossip protocols, let's cover what we need to get going.

Maelstrom is the framework in charge of running our distributed application in the form of a cluster of nodes. We will package our application as a binary, and Maelstrom will run instances of that binary in the cluster. It will also emulate a network between the nodes. The nodes communicate with each other via JSON messages passed through STDIN and STDOUT. A message on STDOUT will have a destination field; Maelstrom takes this output and delivers it to the destination node's STDIN.

Running the application involves two node types: the nodes of the actual distributed system being tested, and client nodes that Maelstrom uses to interact with our system. Client nodes will always call a procedure on one of the app's nodes, while the app's nodes communicate with each other to synchronize state. An important point here is that we're only programming a single node, so the only way we control the flow of information is through the protocol we define for the messages a node handles.

The first challenge is a simple echo exercise that's designed just to get you set up with everything you need. We have to create a node that responds to a simple echo request. At this point, we're not actually running a distributed system. For the test, Maelstrom will instantiate a single app node and then also create a client node that will make the echo call to our application.
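
A single echo exchange looks roughly like this on the wire (the node ids and msg_id here are illustrative; the shape follows the Maelstrom message format):

{"src": "c1", "dest": "n1", "body": {"type": "echo", "msg_id": 1, "echo": "hello"}}
{"src": "n1", "dest": "c1", "body": {"type": "echo_ok", "in_reply_to": 1, "echo": "hello"}}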

Language and runtime choice

The documentation gives its examples in Go. I chose to do it in Rust to make things more interesting, while still being a good fit for the domain of distributed systems.

If you're starting out and just want to learn distributed systems concepts, I recommend sticking with Go. The library Maelstrom provides is very easy to use and the examples are also very helpful. The role of the library is to provide an easy way to read and send messages without having to worry about JSON serialization and deserialization. There's also a Rust variant of the library from Maelstrom, but I didn't want to use it because it uses async Rust, which doesn't really help with performance in this case and just adds unnecessary complexity.

Now, we have two options: we can either write our own library to interact with Maelstrom, or we can look for a crate. Jon Gjengset had a stream a while back where he tackled part of this challenge (linked in the description) and decided to create his own helper library for it. I really recommend the stream if you're interested in that side of things. I wasn't, so I went with a crate. I chose this crate, which uses an actor model with synchronous Rust. Later I found that I would need to extend this crate, but let's not get ahead of ourselves.

First Challenge: Echo

Let's use the crate to solve the first challenge. All we need to do is implement a node that takes an "echo" type message and replies with the same data but with an "echo_ok" type. Fortunately, the crate we're using already handles appending "_ok" to the message type for us when we respond. So, all we need to do is copy the message data into our response.

The crate uses an actor model, so to run the program, we need to implement the Actor trait for our node and then run the actor on a runtime. The runtime is basically a loop on the main thread that reads from stdin and writes to stdout. The runtime knows how to initialize a node and how to create and send out a response.
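
To make that concrete, here is a simplified sketch of what such a runtime loop does. This is not the crate's actual API, just the general shape: read one JSON message per line from STDIN, let the actor produce replies, and write each reply as a JSON line on STDOUT.

use std::io::{self, BufRead, Write};
use serde_json::Value;

// Illustrative runtime loop; the real crate wraps this in its own Request/Response types.
fn run(mut handle: impl FnMut(&Value) -> Vec<Value>) -> io::Result<()> {
    let stdin = io::stdin();
    let stdout = io::stdout();
    let mut out = stdout.lock();

    for line in stdin.lock().lines() {
        let request: Value = serde_json::from_str(&line?).expect("invalid JSON message");
        for response in handle(&request) {
            // Each response is written as one JSON object per line on STDOUT.
            writeln!(out, "{}", response)?;
        }
        out.flush()?;
    }
    Ok(())
}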

Now back to the Actor trait. We just need to implement init, which simply stores the id of the node in our struct, and receive, which handles an echo request by delegating to handle_echo. handle_echo takes the contents of the message's echo field and writes it out to a response. Again, the crate handles the message type for us. And that's all there is to the first challenge.

struct EchoActor {
    node_id: Option<String>,
}

impl Actor for EchoActor {
    fn init(
        &mut self,
        node_id: &str,
        _node_ids: Vec<String>,
    ) -> Result<(), maelstrom_rs::error::Error> {
        self.node_id = Some(String::from(node_id));
        eprintln!("node {} initiated", node_id);
        Ok(())
    }

    fn receive(&mut self, request: &Request) -> Result<Vec<Response>, Error> {
        match request.message_type.as_str() {
            "echo" => self.handle_echo(request),
            _ => unimplemented!(
                "unimplemented message type {}",
                request.message_type.as_str()
            ),
        }
    }
}

impl EchoActor {
    pub(crate) fn handle_echo(&self, request: &Request) -> Result<Vec<Response>, Error> {
        let echo = request.body.get("echo").unwrap().as_str().unwrap();
        let mut body = Map::new();

        body.insert("echo".to_string(), Value::from(String::from(echo)));
        Ok(vec![Response::new_from_request(request, body)])
    }
}

Second challenge: Unique ID

The second challenge revolves around unique ID generation. This time we'll actually be running a distributed system, but we don't need a distributed algorithm to solve it.

Maelstrom will run three of our nodes for a period of time and have client nodes send generate requests to them. In all cases we should respond with a unique ID; if there's any clash between IDs, the test fails.

The hard way to do this would be to create a consensus algorithm where the nodes negotiate the IDs between themselves, but that's way too complex at this point of the challenge. We'll go for the simpler option of having each node return a UUID v4 value. There's enough randomness in UUID v4 to guarantee that there won't be clashes for the scenario we're running. In this case, the receive method of the Actor trait just delegates to a handle_generate method, which uses the uuid crate to generate a value and return a response.

struct UuidActor {
    node_id: Option<String>,
}

impl Actor for UuidActor {
    fn init(&mut self, node_id: &str, _node_ids: Vec<String>) -> Result<(), Error> {
        self.node_id = Some(String::from(node_id));
        eprintln!("node {} initialized", node_id);
        Ok(())
    }

    fn receive(&mut self, request: &Request) -> Result<Vec<Response>, Error> {
        match request.message_type.as_str() {
            "generate" => self.handle_generate(request),
            _ => unimplemented!("not implemented"),
        }
    }
}

impl UuidActor {
    pub(crate) fn handle_generate(&self, request: &Request) -> Result<Vec<Response>, Error> {
        let mut body = Map::new();
        let uuid = uuid::Uuid::new_v4().to_string();
        eprintln!("generated id: {}", uuid);
        body.insert("id".to_string(), Value::String(uuid));
        Ok(vec![Response::new_from_request(request, body)])
    }
}

Challenge 3a: Single-node broadcast

OK, now we're getting closer to the real challenge. Part three will have us implement a broadcast system. This first part of the challenge only involves a single node, so we don't need to worry about communication with the other nodes in the system for now. It requires handling three special message types that the client nodes send to the app nodes: broadcast, read, and topology.

The broadcast message carries a value that needs to be transmitted and stored across the entire distributed system. Since we're only handling a single node at this point, we just need to store the value in the current node:

pub(crate) fn handle_broadcast(&mut self, request: &Request) -> Result<Vec<Response>, Error> {
    self.messages
        .push(request.body.get("message").unwrap().clone());
    let body = Map::new();
    Ok(vec![Response::new_from_request(request, body)])
}

In this case we just respond with "broadcast_ok" (appending ok to the request type is handled automatically by our Response type). The body of the response will be empty.

Read is a method for getting all the values that were stored in a specific node:

pub(crate) fn handle_read(&self, request: &Request) -> Result<Vec<Response>, Error> {
    let mut body = Map::new();
    body.insert("messages".to_string(), Value::from(self.messages.clone()));
    Ok(vec![Response::new_from_request(request, body)])
}

Here we're reading the stored list of values in the node and serializing it to a JSON response.

Topology is a message containing the layout of the distributed system: the neighbors of each node. These are the nodes that, in a physical system, would be closest to the current node. We'll cover this concept more in a bit. Since we're in the single-node scenario, we just need to reply with "topology_ok". We won't even store the topology information for now.
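
For reference, the body of a topology message looks roughly like this (node names illustrative; the shape follows Maelstrom's broadcast workload):

{"type": "topology", "topology": {"n1": ["n2", "n3"], "n2": ["n1"], "n3": ["n1"]}}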

There's also a more general part that the framework crate handles for us: the "init" message. This message will inform the current node of what its id is along with the ids of the other nodes in the system.

fn init(&mut self, node_id: &str, node_ids: Vec<String>) -> Result<(), Error> {
    self.node_id = Some(node_id.to_string());
    self.node_ids = node_ids;
    eprintln!("Node {} started!", node_id);
    Ok(())
}

Challenge 3b: Multi-node broadcast

Now that we can handle client requests for a single node, let's handle communication in a multi-node setup. We now need to actually handle the topology call, which provides the current node with information about the layout of the system.

pub(crate) fn handle_topology(&mut self, request: &Request) -> Result<Vec<Response>, Error> {
    let body = Map::new();
    let topology = match request.body.get("topology") {
        Some(Value::Object(t)) => t,
        _ => unreachable!(),
    };

    self.neighbours = match topology.get(self.node_id.as_ref().unwrap()) {
        Some(Value::Array(n)) => n
            .iter()
            .filter_map(|s| s.as_str())
            .map(String::from)
            .collect(),
        _ => return Err(Error::CustomError((1001, String::from("bad topology")))),
    };

    Ok(vec![Response::new_from_request(request, body)])
}

In this case, we're extracting the corresponding entry for our node, and saving the list of neighbors to the neighbors field on the struct.

For now, we can take a naive approach: on each broadcast request from the client nodes, the current app node re-triggers (or replicates) the broadcast to its neighboring app nodes, and the value propagates through the network in a cascading fashion.

pub(crate) fn handle_broadcast(&mut self, request: &Request) -> Result<Vec<Response>, Error> {
    let mut responses = vec![];

    let value = match request.body.get("message") {
        Some(value) => value,
        _ => unreachable!(),
    };

    if !self.messages.contains(value) {
        self.messages.push(value.clone());

        for neighbor in &self.neighbours {
            let mut body = Map::new();
            body.insert(String::from("message"), value.clone());
            responses.push(Response {
                destination: neighbor.to_string(),
                message_type: String::from("broadcast"),
                message_id: None,
                in_reply_to: None,
                body,
            });
        }
    }

    if request.message_id.is_some() {
        responses.push(Response::new_from_request(request, Default::default()))
    }
    Ok(responses)
}

Challenge 3c: Fault-tolerant communication

This new step of the challenge will introduce network partitions in the test run - everything is exactly the same otherwise. This means that communication will fail between some nodes. So, our previous strategy of a cascading broadcast started from the client will no longer be reliable, as a dropped message will stop the broadcast from propagating in some areas of the network.

To get around this, we need to use a gossip protocol.

Gossip protocols

Up until now, we've been relying on the requests from the clients as a trigger, but this only works in an ideal case where there are no network issues. We can't rely on external triggers to propagate the data through the network: some transmissions will fail, and we need a way to ensure consistency no matter how frequent or infrequent the client calls are. Instead, we will use a separate mechanism that periodically communicates with other nodes in order to sync state.

This is our gossip protocol. At specific intervals, each node will pick another node at random and gossip with it, sharing the current node's state with the other node. For this exercise, we'll keep propagating the broadcast message to neighbors, but in parallel we'll also have a gossip thread running, gossiping between neighbors.

For these exercises I decided to use a push model for gossip, as it requires less communication. This means that a node sends its state to another node, and if the send is successful, the receiver merges the sender's state into its own. If we were using a pull model, nodes would instead ask other nodes for updates and receive the info when the request gets a response.
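
On the receiving side, push gossip just means merging whatever arrives into our own state. A hypothetical handle_gossip for this (the handler name and body layout are my own assumptions; the Request/Response types match the other handlers) could look like this:

pub(crate) fn handle_gossip(&mut self, request: &Request) -> Result<Vec<Response>, Error> {
    // Merge the sender's list of messages into our own, skipping anything we've already seen.
    if let Some(Value::Array(messages)) = request.body.get("messages") {
        for value in messages {
            if !self.messages.contains(value) {
                self.messages.push(value.clone());
            }
        }
    }
    // Fire-and-forget: a gossip message doesn't need a reply.
    Ok(vec![])
}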

Framework refactor

How do we implement this?

Since we're using an actor model with synchronous Rust and the library doesn't let us run multiple actors on the same node, we're going to either have to switch to something else, or extend the library. I decided on the latter.

The library uses a setup where a single thread loops over the text received on STDIN and responds accordingly on STDOUT. I needed at least one more thread running in parallel to handle the periodic gossip. Since Rust is very restrictive about what data you can send between threads, we don't have a good way of passing the node's data to the gossip thread. Inspired by Jon Gjengset's stream on the topic, I decided to use an event system together with an mpsc channel.

The library runtime will no longer loop over the raw request text coming from STDIN; instead it will loop over the events coming from the channel.

Initial loop

We will have a new reader thread, spawned at the start of the runtime, which reads request text from STDIN and parses it into request events. These are sent to the receiver in the main runtime thread. There is also a gossip thread that gets spawned during the initialization of a node. This thread regularly sends gossip events through the channel; its sender is a clone of the sender used in the reader thread.

Refactored loop
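
Conceptually, the refactored runtime looks something like this (a simplified sketch; the Event enum and the exact types are assumptions, not the crate's original code):

use std::io::{self, BufRead};
use std::sync::mpsc;
use std::thread;

// Events the main runtime loop reacts to (illustrative).
enum Event {
    Request(String), // a raw JSON request line read from STDIN
    Trigger,         // a periodic signal telling the node to gossip
}

fn run_loop() {
    let (sender, receiver) = mpsc::channel::<Event>();

    // Reader thread: forwards every STDIN line to the main loop as a Request event.
    let reader_sender = sender.clone();
    thread::spawn(move || {
        let stdin = io::stdin();
        for line in stdin.lock().lines().flatten() {
            reader_sender.send(Event::Request(line)).expect("channel closed");
        }
    });

    // A clone of `sender` is also handed to the gossip thread spawned in `init`,
    // which periodically sends Event::Trigger (see the init code below).

    // The main loop no longer touches STDIN directly; it only consumes events.
    for event in receiver {
        match event {
            // Parse the raw request and dispatch it to the actor's receive().
            Event::Request(_raw) => { /* actor.receive(...), then write responses */ }
            // Time to gossip: call the actor's gossip() and write out its responses.
            Event::Trigger => { /* actor.gossip(), then write responses */ }
        }
    }
}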

Thread initialization:

    fn init(&mut self, node_id: &str, node_ids: Vec<String>) -> Result<(), Error> {
        self.node_id = Some(node_id.to_string());
        self.node_ids = node_ids;
        let sender = self.sender.clone();

        thread::spawn(move || loop {
            thread::sleep(Duration::from_millis(100));
            sender
                .as_ref()
                .expect("no sender")
                .send(Event::Trigger)
                .expect("unable to send trigger");
        });
        eprintln!("Node {} started!", node_id);
        Ok(())
    }

The gossip code:

fn gossip(&mut self) -> Result<Vec<Response>, Error> {
    let mut responses = vec![];
    let len = self.neighbours.len();
    let mut rng = thread_rng();
    // pick one neighbour at random to gossip with
    let neigh_index = rng.gen_range(0..len);
    let dest = self.neighbours.get(neigh_index).unwrap();

    // share this node's full list of seen messages with the chosen neighbour
    let mut body = Map::new();
    body.insert("messages".to_string(), Value::from(self.messages.clone()));

    responses.push(Response {
        destination: dest.clone(),
        message_type: String::from("gossip"),
        message_id: None,
        in_reply_to: None,
        body,
    });
    Ok(responses)
}
With these changes implemented, we successfully pass the fault-tolerant communication section. Link to framework refactor commit

Challenge 3d: Efficient broadcast

In this challenge, we have to reach certain messages-per-operation and latency thresholds. In other words, we have to make sure we're not sending too many unnecessary messages, and that the system state syncs up in a reasonable amount of time.

The targets are:

When running the test against the previous solution, I saw that we were using a couple hundred messages per operation, when we should be below 30. I figured out that the cascading broadcast was the main culprit, so I decided to remove it.

    pub(crate) fn handle_broadcast(&mut self, request: &Request) -> Result<Vec<Response>, Error> {
        let mut responses = vec![];

        let value = match request.body.get("message") {
            Some(value) => value.as_i64().expect("not a correct value"),
            _ => unreachable!(),
        };

        if !self.messages.contains(&value) {
            self.messages.replace(value);
        }

        if request.message_id.is_some() {
            responses.push(Response::new_from_request(request, Default::default()))
        }
        Ok(responses)
    }

Now, when a broadcast request is received by the node, it just adds the broadcast value to its state, without sending broadcast messages to its neighbors. The gossip between nodes will propagate the data on its own. All we have to do now is find a good frequency for the gossip signal.

The latency still wasn't good enough, so I updated the gossip protocol to gossip with two other nodes instead of one: one neighbor, and one other random node from the network (which doesn't have to be a neighbor).

    fn gossip(&mut self) -> Result<Vec<Response>, Error> {
        let mut responses = vec![];
        let len = self.neighbours.len();
        let mut rng = thread_rng();
        let neigh_index = rng.gen_range(0..len);
        let node_index = rng.gen_range(0..self.node_ids.len());
        let neigh_dest = self.neighbours.get(neigh_index).unwrap();
        let node_dest = format!("n{}", node_index);
        self.send_to_node(&neigh_dest.clone(), &mut responses);
        self.send_to_node(&node_dest, &mut responses);
        Ok(responses)
    }

The number of nodes that we gossip with is called the "fanout".

With a timer of 50 ms for the gossip trigger we get:

We also need to make sure that our solution works with the --nemesis partition option, which it does.

Great! On to the last part of this challenge.

Challenge 3e: Efficient broadcast II

In the last section of this challenge, we need to optimize the solution to satisfy these targets:

Oh, and the previous solution already does. The point of this challenge is to experiment and arrive at a compromise between latency and messages per operation: the state can converge faster if the system communicates more, but that, of course, impacts bandwidth usage.

In any case, we already have a solution that satisfies both parts of the challenge, so we're good.

Caveat

An important caveat to mention is that Maelstrom will not handle a large number of nodes, so our test cases will be on a very small scale. This means that you don't actually need to implement a gossip protocol to solve the challenges.

Gossip protocols are normally probabilistic - in the sense that there's a percentage chance for the node states to be consistent, and the distributed system will converge towards consistency over a length of time. We gossip the state to randomly selected nodes, and trust that the state will eventually converge in an acceptable amount of time.

We could instead solve the challenges in a deterministic way, where each node checks for an acknowledgment response, keeps track of which other nodes successfully received its messages, and keeps retrying when a failure is encountered.

This can be done by spawning threads or coroutines for each message, and having each thread retry sending the messages that failed to go through. This works for the small number of nodes in the Maelstrom test and the small amount of information we're sending, but in a real-world scenario with thousands of nodes and more complex messages, it would put a lot of strain on the application whenever we encounter network issues.

In the case of long interruptions, nodes can run out of resources, because they have to remember all the failed messages that they need to resend. Also, since we're not disseminating the state in any other way, we have to be sure that the retry mechanism never fails, as that would leave some nodes with state that never syncs.
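
A rough sketch of that bookkeeping, assuming each node tracks which neighbors still owe an acknowledgment for each value (the PendingAcks type and its methods are hypothetical, not code from my solution):

use std::collections::{HashMap, HashSet};

// Hypothetical bookkeeping for the deterministic, acknowledgment-based approach.
// Each broadcast value is remembered until every neighbor has confirmed it; anything
// still unacknowledged gets re-sent on the next retry tick. During a long partition
// this table only grows, which is exactly the resource problem described above.
struct PendingAcks {
    // value -> neighbors that still have to acknowledge it
    waiting: HashMap<i64, HashSet<String>>,
}

impl PendingAcks {
    // Start tracking a value that was just broadcast to all neighbors.
    fn broadcast(&mut self, value: i64, neighbors: &[String]) {
        self.waiting
            .insert(value, neighbors.iter().cloned().collect());
    }

    // Called when a neighbor confirms it received a value.
    fn acknowledge(&mut self, value: i64, from: &str) {
        let done = match self.waiting.get_mut(&value) {
            Some(nodes) => {
                nodes.remove(from);
                nodes.is_empty()
            }
            None => false,
        };
        if done {
            // Every neighbor has confirmed this value; stop tracking it.
            self.waiting.remove(&value);
        }
    }

    // Called on a retry tick: every (neighbor, value) pair that must be re-sent.
    fn to_retry(&self) -> Vec<(String, i64)> {
        self.waiting
            .iter()
            .flat_map(|(&value, nodes)| nodes.iter().map(move |n| (n.clone(), value)))
            .collect()
    }
}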

I opted to go for a fire-and-forget approach, where nodes regularly push their state to other nodes. The distributed system converges a bit more slowly to a synchronized state, but the nodes have less work to do, and the communication protocol is simpler. This sadly uses more bandwidth, but it is more resilient to network disruptions.

Thanks for reading! See you in part 2 where we'll use Go for the remaining challenges.

Link to my repo with the code for the first challenges