Fly.io's Broadcast Challenges
Apr 09, 2026

This is another post about the series of distributed systems challenges by Fly.io. I’ve talked about using Snowflake and Raft consensus to solve Challenge #2: Unique ID Generation, in Implementing Snowflake Unique ID Generation and Generating Unique IDs with Raft Consensus. Now we’re going to discuss a set of Broadcast challenges that start at Challenge #3a: Single-Node Broadcast.
In this set, we are given the task of implementing a gossip algorithm that is fault-tolerant and efficient. We start at a simple single-node implementation, move towards a multi-node one, then implement fault tolerance, and last, try to improve its efficiency. It is really nice how the challenges are set up and how you can build on top of the previous solution.
Single-node
There’s not much to discuss here. The solution is straightforward: receive the message, record it, and send broadcast_ok back. The only thing to watch out for is duplicates: make sure the same message is never recorded twice.
code: single-node
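The heart of that handler is just a deduplicating store. Here is a minimal sketch of that piece as plain Go, independent of the maelstrom library (the type and method names are mine):

```go
package main

import (
	"fmt"
	"sync"
)

// store keeps the set of seen broadcast values.
// A map doubles as a set and gives O(1) duplicate checks;
// the mutex makes it safe for concurrent handlers.
type store struct {
	mu   sync.Mutex
	seen map[int]struct{}
}

func newStore() *store {
	return &store{seen: map[int]struct{}{}}
}

// add records a value and reports whether it was new.
func (s *store) add(v int) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.seen[v]; ok {
		return false
	}
	s.seen[v] = struct{}{}
	return true
}

// values returns a snapshot for the read handler.
func (s *store) values() []int {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make([]int, 0, len(s.seen))
	for v := range s.seen {
		out = append(out, v)
	}
	return out
}

func main() {
	s := newStore()
	fmt.Println(s.add(1), s.add(1)) // true false
	fmt.Println(len(s.values()))    // 1
}
```

The boolean returned by `add` becomes useful in the multi-node challenge: it tells us whether a message is worth forwarding.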
Multi-node
Keeping it simple is enough for this challenge. We can broadcast any new message to every neighbor we know. The interesting part is that only new messages may be forwarded. Without that check, messages would loop forever, hopping from node to node and congesting the network. This hints at the importance of keeping network congestion low.
This algorithm is enough to pass the challenge, but what happens if we add network partitions?
code: multi-node
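To see why the forward-only-if-new check matters, here is a toy in-memory simulation (my own sketch, not the actual challenge code): each node forwards a value to its neighbors only on first sight, so propagation terminates instead of looping forever.

```go
package main

import "fmt"

// node is an in-memory stand-in for a maelstrom node.
type node struct {
	id        int
	seen      map[int]bool
	neighbors []*node
	sent      int // network messages this node generated
}

// broadcast delivers v to n; n forwards it only if it's new.
func (n *node) broadcast(v int) {
	if n.seen[v] {
		return // already have it: forwarding again would loop forever
	}
	n.seen[v] = true
	for _, nb := range n.neighbors {
		n.sent++
		nb.broadcast(v)
	}
}

func main() {
	// fully connected 3-node cluster
	nodes := make([]*node, 3)
	for i := range nodes {
		nodes[i] = &node{id: i, seen: map[int]bool{}}
	}
	for _, a := range nodes {
		for _, b := range nodes {
			if a != b {
				a.neighbors = append(a.neighbors, b)
			}
		}
	}

	nodes[0].broadcast(42)
	total := 0
	for _, n := range nodes {
		total += n.sent
		fmt.Printf("n%d seen: %v\n", n.id, n.seen[42])
	}
	fmt.Println("messages:", total)
}
```

Remove the `n.seen[v]` guard and this recursion never returns; with it, every node still receives the value, but the message count stays finite.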
Fault tolerance
Network partitions are introduced: for periods of time, some nodes will be unreachable. We need some mechanism to make sure messages always reach every node eventually, once the partition heals.
Two possible approaches:
- **Add a retry queue for timed-out messages.** When you try to send a message to a neighbor and it times out, add that message to a retry queue; a background job then periodically resends the queued messages, and a message only leaves the queue once the neighbor acks it. There are multiple flavors of this approach; what matters is the strategy: make sure every message you send is eventually acknowledged by your neighbors. If every message sent is acknowledged, all nodes will eventually have all messages.
- **Periodic anti-entropy.** We don’t have to forward new messages immediately. Instead, nodes can periodically exchange their message sets with their neighbors, detect missing entries, and fill the gaps. Because every node keeps running this repair mechanism, everyone eventually catches up.
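The core of the anti-entropy idea fits in a few lines. A minimal sketch (my own, independent of maelstrom): on each round a node sends its whole set to a neighbor, and the neighbor merges in whatever it was missing.

```go
package main

import "fmt"

// merge copies every value from src that dst is missing,
// returning how many gaps were filled.
func merge(dst, src map[int]bool) int {
	filled := 0
	for v := range src {
		if !dst[v] {
			dst[v] = true
			filled++
		}
	}
	return filled
}

func main() {
	a := map[int]bool{1: true, 2: true}
	b := map[int]bool{2: true, 3: true}

	// one anti-entropy round in each direction
	fmt.Println(merge(b, a)) // b learns 1
	fmt.Println(merge(a, b)) // a learns 3
	fmt.Println(len(a), len(b))
}
```

Sending the whole set every round is wasteful; real systems exchange digests or Merkle trees first, but the repair loop is the same shape.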
For this challenge I went with the first approach. It is not a very robust retry queue, and its retry intervals are simplistic, but it is good enough for our purposes.
```go
retries := make(chan RetryMessage, 100)

// have 5 background jobs periodically retrying messages
for i := 0; i < 5; i++ {
	go func() {
		for job := range retries {
			type req struct {
				Type    string `json:"type"`
				Message int    `json:"message"`
			}
			r := req{Type: "broadcast", Message: job.message}

			ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
			_, err := n.SyncRPC(ctx, job.destination, r)
			cancel()

			if err != nil {
				// re-enqueue after a delay instead of retrying immediately
				job := job
				time.AfterFunc(RetryInterval, func() {
					retries <- job
				})
			}
		}
	}()
}
```
We create five goroutines that listen on the retry queue and resend failed messages; when a send fails again, the message is re-enqueued after RetryInterval. The issue with this approach is the unbounded retry: retrying forever assumes nodes never crash permanently and the topology is stable, which is not true in real life. But if we add a retry limit, we give up the broadcast guarantee that all nodes eventually see all messages. So adding anti-entropy to the mix is probably a good idea, and something I would like to explore in the future.
code: fault-tolerant
Efficiency
When we’re building an efficient broadcast algorithm, we’re usually trying to optimize for two metrics: low latency and low messages per operation.

- **Latency** is how long it takes for a message to be seen by all nodes. An algorithm that converges quickly is desirable.
- **Messages per operation** (msgs/op) is the average number of network messages generated per broadcast operation. The lower the msgs/op, the less congested the network and the fewer resources each node needs: a more scalable solution.
Essentially, two things have a major impact on both metrics and deserve further investigation: topology and batching.
Topology
Let’s look at the following 7-node topology:
Every node is connected to every other node. What happens when a new message arrives at n0? It broadcasts it to the other 6 nodes. And when that message arrives at n1, n1 broadcasts the same message to all other nodes (except n0). But that was unnecessary, because n0 had already sent the message to them. That redundancy generates more messages than required.
In contrast to that, let’s look at the other topology:
When a message arrives at n0, it sends to n1 and n2, then n1 sends to n3 and n4, and n2 sends to n5 and n6. No redundancy.
What is happening here is that in the first topology, there are multiple paths between any pair of nodes. So a message will travel through all of those paths. In the second one, there’s only one path between any pair of nodes, meaning there’s no way for a node to ever receive the same message from a different path. The second topology is a graph with no cycles, that is, a tree.
From that we can conclude that a tree is the minimum structure needed to reach every node, and adding extra edges only increases msgs/op. So we should aim for a topology as close as possible to a tree.
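One simple way to build such a topology: index the nodes 0..n-1 and use the array layout of a b-ary heap, where node i's children are b·i+1 through b·i+b. A sketch (my own helper, not the challenge code):

```go
package main

import "fmt"

// children returns the neighbors of node i in a balanced
// b-ary tree over nodes 0..n-1, using heap-style indexing.
func children(i, n, b int) []int {
	var out []int
	for c := b*i + 1; c <= b*i+b && c < n; c++ {
		out = append(out, c)
	}
	return out
}

func main() {
	// the 7-node binary tree example: n0 -> n1,n2; n1 -> n3,n4; n2 -> n5,n6
	for i := 0; i < 3; i++ {
		fmt.Printf("n%d -> %v\n", i, children(i, 7, 2))
	}
}
```

Because the layout is purely index-based, every node can compute its own neighbors from just its id, the cluster size, and the branching factor, with no coordination.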
Within a tree topology, there are multiple configurations. For example, at one extreme we have the line topology:
An operation at node n0 will hop from node to node until it reaches the last one. It takes quite a while for an operation to travel through the network.
At the other extreme, we have the star topology:
It is natural to see that the latency in the line topology will be higher than in the star topology. And the balanced tree is a middle ground.
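The latency difference can be made concrete: the worst-case number of hops equals the depth of the tree. A line over n nodes has depth n-1, while a balanced b-ary tree has depth roughly log_b(n). A small sketch (assuming the heap-style layout where node i's parent is (i-1)/b; the helper is mine):

```go
package main

import "fmt"

// depth returns the number of hops from the root to the
// deepest node in a b-ary tree with n nodes (heap indexing),
// by walking the parent chain of the last node.
func depth(n, b int) int {
	d := 0
	for last := n - 1; last > 0; last = (last - 1) / b {
		d++
	}
	return d
}

func main() {
	// 25 nodes, matching the benchmark cluster size below
	for _, b := range []int{1, 2, 4, 8, 16} {
		fmt.Printf("branch %2d: depth %d\n", b, depth(25, b))
	}
}
```

For 25 nodes this gives depths 24, 4, 3, 2, 2 for branching factors 1, 2, 4, 8, 16: the jump from a line to any tree is dramatic, while going from 8 to 16 barely changes the depth, which is consistent with the diminishing returns in the benchmark.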
The branching factor also affects msgs/op under failures. Imagine a network partition isolating n1. In the line topology, a message from n0 cannot reach any node past n1 and must be retried until the partition heals. That does not happen in the star topology: a partition at n1 only blocks n1, and n0 still reaches everyone else directly, so fewer retries are needed.
Batching
The second thing to look into is batching. Instead of generating one message per operation, we can batch multiple broadcasts into a single message, decreasing msgs/op.
But introducing batching necessarily introduces latency: a node has to wait a bit for a batch of broadcasts to accumulate before sending. So there’s a trade-off between msgs/op and latency.
To implement batching, I built on top of our fault-tolerant strategy. We now keep track of all unacked messages per neighbor and periodically send a neighbor’s unacked messages in a single request. That required a new internal maelstrom message, which we called gossip.
Another thing worth considering is whether we really need to broadcast to the neighbors immediately when a new message arrives. Since we have a background routine, we can simply mark new incoming messages as unacked and let that routine deal with them. Whether or not to keep the immediate call also affects msgs/op and latency.
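A minimal sketch of the per-neighbor batching state (my own names; the timing loop and the maelstrom RPC are omitted, a background goroutine would call flush on an interval and ack on a successful reply):

```go
package main

import (
	"fmt"
	"sort"
)

// batcher tracks unacked values per neighbor and flushes
// them as a single gossip payload instead of one RPC each.
type batcher struct {
	unacked map[string]map[int]bool // neighbor -> pending values
}

func newBatcher(neighbors []string) *batcher {
	b := &batcher{unacked: map[string]map[int]bool{}}
	for _, nb := range neighbors {
		b.unacked[nb] = map[int]bool{}
	}
	return b
}

// record marks v as pending for every neighbor instead of
// sending it immediately.
func (b *batcher) record(v int) {
	for _, pending := range b.unacked {
		pending[v] = true
	}
}

// flush returns one batched "gossip" payload for a neighbor.
func (b *batcher) flush(neighbor string) []int {
	var batch []int
	for v := range b.unacked[neighbor] {
		batch = append(batch, v)
	}
	sort.Ints(batch) // deterministic order for the example
	return batch
}

// ack clears the values a neighbor has confirmed.
func (b *batcher) ack(neighbor string, vals []int) {
	for _, v := range vals {
		delete(b.unacked[neighbor], v)
	}
}

func main() {
	b := newBatcher([]string{"n1"})
	b.record(7)
	b.record(8)
	b.record(9)
	batch := b.flush("n1")
	fmt.Println(batch) // three broadcasts, one network message
	b.ack("n1", batch)
	fmt.Println(len(b.flush("n1")))
}
```

Because entries are only cleared on ack, a partitioned neighbor simply accumulates pending values and receives them all in one request after healing, so this subsumes the retry queue from the fault-tolerance section.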
With these two mechanisms in hand, topology and batching, we can tweak a couple of parameters to figure out reasonable values of latency and msgs/op that satisfy the requirements.
code: fault-tolerant-efficient-1 and fault-tolerant-efficient-2
Benchmark
To understand the trade-offs discussed above and find a reasonable topology and batch interval, I ran the same test with different batch intervals and topologies. The batch interval is in the Interval column, and the topology is defined by the branching factor of the tree in the Branch column.
| Interval (ms) | Branch | msgs/op | p50 | p95 | p99 |
|---|---|---|---|---|---|
| 100 | 1 | 13.31 | 2281 | 3363 | 3646 |
| 100 | 2 | 10.70 | 867 | 1154 | 1214 |
| 100 | 4 | 9.95 | 512 | 704 | 795 |
| 100 | 8 | 9.33 | 416 | 587 | 599 |
| 100 | 16 | 9.28 | 348 | 438 | 457 |
| 200 | 1 | 8.80 | 3046 | 4397 | 4640 |
| 200 | 2 | 7.30 | 1155 | 1466 | 1567 |
| 200 | 4 | 7.02 | 708 | 955 | 981 |
| 200 | 8 | 6.74 | 566 | 749 | 791 |
| 200 | 16 | 6.74 | 414 | 608 | 636 |
| 300 | 1 | 7.26 | 4365 | 6580 | 6775 |
| 300 | 2 | 6.33 | 1667 | 2114 | 2222 |
| 300 | 4 | 5.91 | 958 | 1249 | 1331 |
| 300 | 8 | 5.80 | 723 | 981 | 1056 |
| 300 | 16 | 5.89 | 484 | 711 | 780 |
| 500 | 1 | 6.21 | 6661 | 10857 | 11287 |
| 500 | 2 | 5.47 | 2733 | 3467 | 3584 |
| 500 | 4 | 5.30 | 1648 | 2029 | 2143 |
| 500 | 8 | 5.24 | 1170 | 1532 | 1655 |
| 500 | 16 | 5.24 | 794 | 1112 | 1177 |
The test used was:
```shell
./maelstrom test -w broadcast \
  --bin ~/go/bin/fault-tolerant-efficient-2 \
  --node-count 25 \
  --time-limit 20 \
  --rate 100 \
  --latency 100
```
It’s nice to see that our understanding of the trade-offs checks out: higher branching implies lower latency, and, at the same branching factor, a higher interval implies lower msgs/op but higher latency. We can also see diminishing returns on branching.
Back to the challenge. Given the requirements:

- Messages-per-operation is below 20
- Median latency is below 1 second
- Maximum latency is below 2 seconds
We see that 11 out of 20 configurations pass the test. Awesome. We would not be able to achieve that without a tree topology and a batching strategy.
And that’s all I got for this one.