
Distributed systems pose all kinds of challenges. And we’ve built them in the
web age, when the tech of the wider Internet is what we use in microcosm to
build the underpinnings of our own systems. Our industry has done somersaults
to try to make these systems work well with synchronous calls built on top of
HTTP. This is working at scale for a number of companies and that’s just fine.
But if you were to start from scratch, is that what you would build? A few years
ago, we had that opportunity and we decided, no, that’s not what we would
build. Instead, we built a distributed, asynchronous system centered around an
event bus, and it has been one of the best decisions we’ve made. Most of the
things that make a service architecture painful are either completely
alleviated or mostly so, with only a few tradeoffs. Here’s what we’ve built at
Community, and some of what we have learned.
Beyond The Monolith
The advice du jour is that you should start with a monolith because it allows
you to iterate quickly, change big things more easily, and avoid any serious
consistency issues—assuming you back your service with an ACID-compliant DB.
That’s good advice that I also give to people, and we did that. That scales
pretty well, but becomes an issue as you grow the team and need to iterate on
independent work streams.
Queuing
But what’s the next step? For us, the next step was to stop building things in
the monolith and to build some async services that could work semi-autonomously
alongside it. Our platform is a messaging platform so this was already a
reasonably good fit: messages aren’t delivered synchronously and some parts of
the workflow can operate like a pipeline.
We needed at least a queuing system to do that, something that would buffer
calls between services and which would guarantee reliable delivery. We are
primarily an Elixir shop so we picked RabbitMQ because of the good driver
support on our stack: RabbitMQ is written in Erlang and Elixir runs on the same
VM and can leverage any Erlang libraries. This has turned out to be a really
good choice for the long term. RabbitMQ is super reliable, can have very good
throughput, is available in various hosted forms, and has a lot of different
topologies and functionality that make it a Swiss army knife of async systems.
We paid a third-party vendor to host it and began building on top of it.
Initially we used a very common pattern and just queued work for other
services, using JSON payloads. This was great for passing things between
services where fire-and-forget was adequate. Being able to rely on delivery
once RabbitMQ accepted the message from the publisher means you don’t deal with
retries on the sender side, you almost never lose messages, and the consumer of
the messages can determine how it wants retries to be handled. Deploys never
interrupt messaging. A service can treat each message as a transaction and only
ack the message once work has been completed successfully. All good stuff.
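As a rough illustration of treating each message as a transaction, here is a minimal sketch that uses an in-memory queue instead of RabbitMQ. `ToyQueue`, `handler`, and the retry limit are hypothetical names and choices made up for this example, not part of our system or of any RabbitMQ client.

```python
from collections import deque


class ToyQueue:
    """In-memory stand-in for a broker queue (hypothetical, not RabbitMQ's API)."""

    def __init__(self, messages):
        self.ready = deque(messages)
        self.acked = []
        self.dead = []

    def consume(self, handler, max_attempts=3):
        attempts = {}
        while self.ready:
            message = self.ready.popleft()
            attempts[message] = attempts.get(message, 0) + 1
            try:
                handler(message)
            except Exception:
                # The work failed, so the message is not acked. The consumer
                # decides the retry policy: requeue until attempts run out.
                if attempts[message] < max_attempts:
                    self.ready.append(message)
                else:
                    self.dead.append(message)
                continue
            # Ack only once the work has completed successfully.
            self.acked.append(message)


processed = []
failed_once = set()


def handler(message):
    # Simulate a transient failure the first time "b" is handled.
    if message == "b" and message not in failed_once:
        failed_once.add(message)
        raise RuntimeError("transient failure")
    processed.append(message)


queue = ToyQueue(["a", "b", "c"])
queue.consume(handler)
print(processed)    # ['a', 'c', 'b']
print(queue.acked)  # ['a', 'c', 'b']
```

The sender never sees the failure. The message simply stays unacked and comes around again, which is the property that makes deploys and transient errors uneventful.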
But the core data and associated models were still locked up inside the
monolith. And we needed to access that data from other services fairly often.
The first pass was to just look at the messages passed from the monolith to
other services and do a local DB call to enrich them with required fields
before passing them on. That works for a few cases just fine.
Other Paradigms
We built other kinds of async messaging between services on top of those same
ad hoc JSON messages, knowing full well that wasn’t what we wanted long term,
but learning about the interaction patterns, and getting our product into the
market.
But eventually you litter the code with DB enrichment calls and other
complexity. And with no fixed schemas, the JSON messaging rapidly outscales
your ability to reason about it. Instead, wouldn’t it be nice if a new service
could also get a copy of those same messages? And wouldn’t it be really great
if the people writing that new service didn’t have to talk to the people
emitting those messages to make sure the schema wouldn’t change on them? And
what if you could get a history of all the major things that ever happened in
the system in the form of messages? And maybe a new service could have access
to that history to bootstrap it?
Events!
Yes, to all of the above. That’s what a truly event-based system offers. And
that’s what we transformed this async system into.
Building the async system in the first place made this much easier and I want
to shout out to Tomas Koci, Jeffrey Matthias, and Joe Merriweather-Webb who
designed and built most of that and who have made many contributions to the
events system as
well. Once we were in the market with our product, we all agreed it was time
for the next phase.
In mid-2019, Andrea Leopardi, Roland Tritsch, and I met up in Dublin and
plotted the course for the next year or so. The plans from that meeting turned
into the structure of the events system we have now. A lot of people have
contributed since! This has been a big team effort from the folks in Community
Engineering. I have attempted to name names here wherever possible, but there
are a million contributions that have been important.
Since building the bus, we’ve grown to about 146 services running in
production, of which 106 are core business services (61 Elixir, 20 Go, 7
Python, remainder 3rd party or other tech). Most core business logic lives in
Elixir with Go in a supporting role, and Python focused on data science. This
is nearly 2 services per engineer in the company. On most stacks that would be
an enormous burden. On our system, as described below, it’s fairly painless.
We still have the monolith, but it’s a lot smaller now. It gets smaller all the
time, thanks to the drive of Jeffrey Matthias, Geoff Smith, Lee Marlow, and Joe
Lepper among others.
So, back to the story…

The Public Event Bus
The next step was the move to a system with a public event bus. We made some
pretty good decisions but still live with a few small mistakes. So I’ll
describe a simpler version of what we have now and gloss over the iterations on
getting to this point, and I’ll call out mistakes later.
If you aren’t familiar with the idea of an event bus, it boils down to this:
any service can listen on the bus for events that happen anywhere in the
system, and then do whatever it needs to do in response. Services that do
important things publish those events in a defined schema so that anyone can
use them as they need. You archive all of those events and make them
arbitrarily queryable and replayable. To be clear about what we mean when we
say “events”: events are something that happened; commands are a request for
something to happen. We started with
only events, and that was a good choice. That approach allowed us to more
carefully iterate on the events side so that we didn’t make the same mistakes
in two places at once.
For other interactions between services we built async RPC functionality
over RabbitMQ that essentially provides a very lightweight Protobuf wrapper
around arbitrary data. This enabled us to largely punt on Commands
implementation while we got good at events. It allowed us to identify sensible
practices in a private way between services before making those commands
public system-wide.
So let’s talk about the event bus and event store since that’s the core of the
system.
System Overview
Our public bus is RabbitMQ, with a common top-level exchange. We separated it
into two clusters: one built around text messages (e.g. SMS) that we process
and the main one around events that are the real core of the system. This
allowed us to have very low latency on core data events while allowing more
latency on the high-throughput messaging cluster. You could run it on a
single cluster, but we have enough messaging throughput that separation was a
good choice. It also divides the fault domain along lines that make sense for
our system.
We publish events to the main bus (a topic exchange), using event type as the
routing key. Services that want to subscribe to them do so. Those services may
then event-source a projection into a DB of their own based on those events, or
simply take action as required. DB tech is whatever the service requires. We
have Postgres, Vitess (MySQL), Redis, Cassandra, ElasticSearch, etc. For
services that do event source, we have standardized a set of rules about how
they must interact with the bus, how they handle out-of-order events,
duplicates, etc. We have a LADR that defines how this must work. The technical
side of this is built into a shared Elixir library that most of the services
use. This wraps the excellent Broadway library, the Broadway AMQP integration,
and our generated Protobuf library containing the schemas. It provides things
like validation and sane RabbitMQ bindings, and publishes an events manifest we
can use to build maps of who produces and consumes which events. Dan Selans
worked for us back then, and built a frontend that makes those manifests human
consumable, and draws an events map. This is very useful!
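To give a feel for how per-service manifests can be folded into a map of producers and consumers, here is a small sketch. The manifest shape (`publishes` / `consumes` lists keyed by service name) and the service and event names are assumptions invented for the example. Our real manifests are generated by the shared library and are not shown here.

```python
def build_event_map(manifests):
    """Fold per-service manifests into {event_type: producers/consumers}."""
    event_map = {}
    for service, manifest in sorted(manifests.items()):
        for event_type in manifest.get("publishes", []):
            entry = event_map.setdefault(
                event_type, {"producers": [], "consumers": []}
            )
            entry["producers"].append(service)
        for event_type in manifest.get("consumes", []):
            entry = event_map.setdefault(
                event_type, {"producers": [], "consumers": []}
            )
            entry["consumers"].append(service)
    return event_map


def multi_producer_events(event_map):
    """List event types that more than one service claims to publish."""
    return sorted(
        event_type
        for event_type, entry in event_map.items()
        if len(entry["producers"]) > 1
    )


# Hypothetical manifests for three made-up services.
manifests = {
    "billing": {"publishes": ["invoice.created"], "consumes": ["user.created"]},
    "users": {"publishes": ["user.created"], "consumes": []},
    "search": {"publishes": [], "consumes": ["user.created", "invoice.created"]},
}

event_map = build_event_map(manifests)
print(event_map["user.created"])
print(multi_producer_events(event_map))
```

A map like this is also a cheap place to flag event types that more than one service publishes.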
Because some of the services in our system are written in Go, we have built
some of the same logic in that language and leverage the Benthos project (which
we sponsor) for the work pipelining, similar to how we use Broadway in Elixir.
Benthos is an unbelievable jack-of-all-trades that you should take a look at if
you don’t know it. We additionally build all the Protobuf in Python for use in
data science activities, but don’t have a full events library implementation
yet.
We archive everything that is published on the bus and put it into the event
store. This then enables replays.
Sourcing
When we start a new service that needs to source events, we bootstrap it from a
replay of historical events and it writes them to its DB using the same code it
will use in production. Because our services must handle out-of-order events,
we can generally replay any part of the history necessary at any future point.
Simple rules about idempotency and a little bit of state keeping solve this
out-of-order handling for the 95% case. The remaining 5% tend to be one-off
solutions for each use case.
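One way to picture those rules is a projection that skips duplicates by event id and ignores events older than what it has already applied for the same entity. This is only a sketch under assumptions: the field names (`id`, `entity_id`, `timestamp`, `state`) are hypothetical, and last-write-wins by timestamp only suits events that carry the full current state. It is not a description of our actual rules.

```python
from dataclasses import dataclass, field


@dataclass
class Projection:
    rows: dict = field(default_factory=dict)      # entity_id -> (timestamp, state)
    seen_ids: set = field(default_factory=set)    # the "little bit of state"

    def apply(self, event):
        # Idempotency: an event id is applied at most once.
        if event["id"] in self.seen_ids:
            return "duplicate"
        self.seen_ids.add(event["id"])

        # Out-of-order handling: never let an older event overwrite newer state.
        current = self.rows.get(event["entity_id"])
        if current is not None and current[0] >= event["timestamp"]:
            return "stale"

        self.rows[event["entity_id"]] = (event["timestamp"], event["state"])
        return "applied"


e1 = {"id": "e1", "entity_id": "u1", "timestamp": 1, "state": {"name": "Ann"}}
e2 = {"id": "e2", "entity_id": "u1", "timestamp": 2, "state": {"name": "Anne"}}

in_order = Projection()
for event in [e1, e2, e1]:          # e1 is delivered twice
    in_order.apply(event)

out_of_order = Projection()
for event in [e2, e1]:              # e1 arrives late
    out_of_order.apply(event)

print(in_order.rows == out_of_order.rows)  # True
```

Because the result does not depend on delivery order or repetition, the same code path can serve live traffic, bootstrapping, and replays.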
Entropy
Services now have their own projection(s) of what things look like from the
events they have consumed. Because this is so distributed, even with RabbitMQ
ensuring deliverability, there are still many ways the system could introduce
drift.
Underpinning all of the other mechanisms is another simple rule: we have worked
hard to guarantee that each event type is published by exactly one service.
This vastly reduces the complexity of working with them.
We handle anti-entropy by several means:
-
We have a fallback for RabbitMQ. If it’s not available for publishing,
services will fall back to Amazon SQS. This is an extremely rare occurrence,
but ensures we don’t lose events. We can then play events from SQS into
RabbitMQ when it comes back up. Thus, services don’t subscribe to SQS.
-
Services must handle all events that are of the type they subscribe to.
This means any failure to do so is an error that must be handled. This is
pretty rare in production because it generally gets caught pretty early on,
and Protobuf and our events library help guarantee correctness.
-
We run daily replays of all of the previous day’s core events, on the
production bus. We don’t replay message events, but all of the core system
events are replayed. This means that a service has a maximum window of 24
hours to have missed an event before it sees it again. We’ll be adding 1 hour
replays or similar in the next few quarters.
-
We run Spark jobs that compare drift on some event-sourced DBs against the
event store. This allows us to track how widespread any issues may be. We have
dashboards that let us see what this looks like. It’s extremely small drift,
and is generally insignificant. Recent runs show that average drift is
0.005%, which is already very good, but is better than it looks because it
also reflects changes that happen while the run is in flight. For all
practical purposes this then simply reflects eventual consistency and in
absolute numbers is basically zero.
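The publishing fallback in the first item above can be sketched roughly like this. Everything here is hypothetical: `ToyBroker` stands in for RabbitMQ, a plain list stands in for the SQS queue, and `drain` lives on the publisher only to keep the example short. The point is the shape: publishers write to the fallback when the bus is down, something later plays the fallback into the bus, and consumers only ever read from the bus.

```python
class BrokerUnavailable(Exception):
    pass


class ToyBroker:
    """Stand-in for the main bus; flips between up and down."""

    def __init__(self):
        self.up = True
        self.delivered = []

    def publish(self, event):
        if not self.up:
            raise BrokerUnavailable("broker is down")
        self.delivered.append(event)


class FallbackPublisher:
    def __init__(self, primary_publish, fallback_buffer):
        self.primary_publish = primary_publish
        self.fallback_buffer = fallback_buffer   # stand-in for the fallback queue

    def publish(self, event):
        try:
            self.primary_publish(event)
            return "primary"
        except BrokerUnavailable:
            # Keep the event instead of dropping it.
            self.fallback_buffer.append(event)
            return "fallback"

    def drain(self):
        """Play buffered events into the bus; stop at the first failure."""
        drained = 0
        while self.fallback_buffer:
            try:
                self.primary_publish(self.fallback_buffer[0])
            except BrokerUnavailable:
                break
            self.fallback_buffer.pop(0)
            drained += 1
        return drained


broker = ToyBroker()
publisher = FallbackPublisher(broker.publish, fallback_buffer=[])

publisher.publish("e1")          # goes to the bus
broker.up = False
publisher.publish("e2")          # lands in the fallback buffer
broker.up = True
publisher.drain()                # e2 is played into the bus

print(broker.delivered)          # ['e1', 'e2']
```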
Consistency
We handle consistency by assuming eventual consistency whenever possible.
Where it’s not possible, we do allow querying of a remote service to get data.
Events related to that data should only be published by one service. So it’s
possible to get an authoritative source when strictly necessary. This is done
over a synchronous RPC implementation on top of RabbitMQ with Protobuf thanks
to Tom Patterer and Andrea Leopardi. Andrea wrote about this implementation.
Many of the frontend calls go through the monolith. This then provides some
consistency for the models it contains. We sometimes use the monolith as a
BFF (backend for frontend) where necessary, to provide some consistency. For
all other cases we query directly against the services themselves via an API
gateway.
Commands
Following on the two-year success of the event bus, we introduced commands,
which work quite similarly but which express a different idea. Commands are a
request for something in the system to happen (intent). Services may or may not
take action as a result. If they do, they may or may not generate events.
Commands are also public, archived, and replayable. This is so far working
quite well, but runs alongside other interaction methods described below. We’ll
phase out much of the async RPC in favor of Commands now that we have them.
Current State
We continue to iterate and improve on this system. Some more improvements and
efficiencies are slated for this year already. We’ll write more about those
when we get there. If you are a person who wants MOAR detail, keep reading
below. I’ll attempt to describe some of this in more technical detail.

More Details
That gives a pretty good overview. But if you want to know more, read on.
Routing, Exchanges, Queues
We don’t leverage headers for much routing, but there are a few places in the
system that add them and use them (e.g. for a consistent-hash exchange). But
most routing is by event type and that’s our key. We learned early on that each
service should attach its own exchange to the top-level exchange and hang its
queue off of that. Exchanges are really cheap for RabbitMQ and this allows
services to use a different exchange type if necessary—and it also prevents
rapid connects/disconnects from impacting the main bus. This happened once when
a service that was bound directly to the top-level exchange went nuts. It
hasn’t happened since.
Most services will bind a single durable queue to their exchange to make sure
that they receive events even when down, and the work is spread across the
instances of the service when up. Some services use ephemeral queues that go
away when they are not running. Others use different topologies. RabbitMQ is
very flexible here and has been the bedrock of our implementation.
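The topology described above (top-level topic exchange, then a per-service exchange, then the service’s queue) can be modeled in a few lines. This is a toy model, not a RabbitMQ client: `Exchange`, the `billing` service, and dotted routing keys like `user.created` are assumptions made up for illustration. The matcher follows the usual AMQP topic rules, where `*` matches exactly one word and `#` matches zero or more.

```python
def topic_matches(pattern, routing_key):
    """AMQP-style topic match: '*' is one word, '#' is zero or more words."""
    p, k = pattern.split("."), routing_key.split(".")

    def match(i, j):
        if i == len(p):
            return j == len(k)
        if p[i] == "#":
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j < len(k) and p[i] in ("*", k[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


class Exchange:
    """Toy topic exchange that can route to queues (lists) or other exchanges."""

    def __init__(self, name):
        self.name = name
        self.bindings = []   # (pattern, destination)

    def bind(self, pattern, destination):
        self.bindings.append((pattern, destination))

    def publish(self, routing_key, body):
        delivered = []
        for pattern, destination in self.bindings:
            if not topic_matches(pattern, routing_key):
                continue
            # Deliver at most once per destination, even if several bindings match.
            if any(destination is seen for seen in delivered):
                continue
            delivered.append(destination)
            if isinstance(destination, Exchange):
                destination.publish(routing_key, body)
            else:
                destination.append((routing_key, body))


events = Exchange("events")      # top-level bus
billing = Exchange("billing")    # exchange owned by one service
billing_queue = []               # stands in for the service's durable queue

events.bind("user.*", billing)   # the service exchange hangs off the bus
billing.bind("#", billing_queue) # the queue hangs off the service exchange

events.publish("user.created", b"payload-1")
events.publish("message.sent", b"payload-2")

print(billing_queue)             # [('user.created', b'payload-1')]
```

The service only ever touches its own exchange and queue, so its connects and disconnects stay one hop away from the top-level exchange.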
Archiving
Once things are on the bus, we archive every single one. We have a service that
subscribes to every event type on the two buses, aggregates them in memory into
file blobs, then uploads them to Amazon S3 (the “event store”). This archiver
only acks them from Rabbit once they are uploaded, so we don’t lose events.
Those files on S3 are in a run-length-encoded raw Protobuf format, straight
from the wire, grouped into directories by timestamp and event class. Filenames
are generated in a consistent way, and include some hashing of the contents so
that we prevent overwrites.
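Here is a rough sketch of the batch, upload, then ack flow, including content-hashed filenames. It makes several assumptions that are not taken from our archiver: payloads are framed with a simple 4-byte length prefix (which may differ from the on-disk encoding described above), the key layout and the truncated SHA-256 are invented, and each batch is assumed to hold a single hour and event class. `upload` and `ack` are injected callables rather than real S3 or RabbitMQ calls.

```python
import hashlib
import struct


def frame(payloads):
    """Join payloads into one blob, each prefixed with a 4-byte length."""
    return b"".join(struct.pack(">I", len(p)) + p for p in payloads)


def unframe(blob):
    """Split a blob produced by frame() back into the original payloads."""
    payloads, offset = [], 0
    while offset < len(blob):
        (length,) = struct.unpack_from(">I", blob, offset)
        offset += 4
        payloads.append(blob[offset:offset + length])
        offset += length
    return payloads


def blob_key(hour, event_class, blob):
    """Build a key from timestamp, event class, and a hash of the contents."""
    digest = hashlib.sha256(blob).hexdigest()[:16]
    return f"{hour}/{event_class}/{digest}.bin"


class Archiver:
    def __init__(self, upload, ack, batch_size):
        self.upload = upload          # callable(key, blob); raises on failure
        self.ack = ack                # callable(delivery_tag)
        self.batch_size = batch_size
        self.pending = []             # (delivery_tag, payload), not yet acked

    def handle(self, delivery_tag, payload, hour, event_class):
        self.pending.append((delivery_tag, payload))
        if len(self.pending) >= self.batch_size:
            self.flush(hour, event_class)

    def flush(self, hour, event_class):
        blob = frame([payload for _, payload in self.pending])
        self.upload(blob_key(hour, event_class, blob), blob)
        # Only reached if the upload did not raise: now it is safe to ack.
        for delivery_tag, _ in self.pending:
            self.ack(delivery_tag)
        self.pending = []


store, acked = {}, []
archiver = Archiver(upload=store.__setitem__, ack=acked.append, batch_size=2)
archiver.handle(1, b"first", "2021-06-01T10", "user")
archiver.handle(2, b"second", "2021-06-01T10", "user")
print(acked)   # [1, 2]
```

If the upload raises, nothing is acked and the batch stays pending, so the broker still owns those messages.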
Like the majority of the services operating on the events store and bus, this
service is written in Elixir and leverages all of the badassery of Erlang’s OTP
and supervision trees to be always working. It doesn’t break.
Event Store Iteration
The main event store is the archive of raw Protobuf that was sent on the wire,
encoded with a run length encoding into blob files that are posted to S3, as
mentioned. After the first year, Tom Patterer and Aki Colovic here in Europe
built Apache Spark jobs and a Go service to transform those into Parquet files
on S3 more or less
as they get written—so there is very little latency between the two. We can
then leverage AWS Athena (Presto) for ad hoc queries, monitoring of events, and
understanding what is there.
And the ability to query all of history for debugging is an amazing thing that
I don’t think I’d ever want to live without again. It takes a lot of pressure
off of distributed tracing, although we do have that for complex flows on OTel
thanks to Jaden Grossman, Tatsuro Alpert, and Bradley Smith.
Replays
We built a replayer service that initially could only do replays by event type
and time range, played straight from S3 to a private copy of the bus (“the
replay bus”). Services can hook up to that bus to get the replayed data. We
usually do this with a one-off deploy of the same code. That got us a long way:
it was enough for the first year.
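A replay by event type and time range boils down to a filter over the archive followed by a publish to the replay bus. The sketch below assumes archived events are already decoded into dicts with `type` and `timestamp` fields and uses a plain list as the replay bus. Those names and the half-open time range are choices made for the example.

```python
def select_for_replay(archive, event_types, start, end):
    """Pick archived events of the given types with start <= timestamp < end."""
    wanted = set(event_types)
    selected = [
        event
        for event in archive
        if event["type"] in wanted and start <= event["timestamp"] < end
    ]
    return sorted(selected, key=lambda event: event["timestamp"])


def replay(archive, event_types, start, end, publish):
    """Publish the selected events to the replay bus and return the count."""
    count = 0
    for event in select_for_replay(archive, event_types, start, end):
        publish(event)
        count += 1
    return count


archive = [
    {"id": "e1", "type": "user.created", "timestamp": 100},
    {"id": "e2", "type": "message.sent", "timestamp": 150},
    {"id": "e3", "type": "user.created", "timestamp": 200},
    {"id": "e4", "type": "user.created", "timestamp": 300},
]

replay_bus = []
replayed = replay(archive, ["user.created"], 100, 300, replay_bus.append)
print(replayed)                          # 2
print([e["id"] for e in replay_bus])     # ['e1', 'e3']
```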
Later on, we built further integration with AWS Athena that allows us to
arbitrarily query events from the event store in Athena from Parquet files on
S3, and also allows for the results of a query to be used in an event replay.
This allows for very targeted bootstrapping of new services, repairing outages,
fixing state when a bug caused a service to behave badly, etc. The ability to
arbitrarily query all of history also helps when looking for any issues in the
event store or your service. Athena is pretty quick, even with several years of
data. Partitioning by event date and type really helps. We actually use our Go
libs from Spark and I wrote about how to do that.
Snapshots
An additional step we took later (thanks to Moritz Mack) was to use Spark to
build daily snapshots in the form of Parquet tables of events that can also
then be replayed. This
also speeds up querying and consistency checking by vastly reducing the amount
of queried data. We currently rebuild those snapshots nightly from all of
history so that there is no drift. We will move to incremental snapshotting at
some point, but Spark and S3 are hugely fast, and we have enough individual
files to run in parallel nicely.
Events and Event Schemas
The best decision we made was to use Protobuf for messaging. Protobuf is one
of the few modern encoding systems that had good support across all three of
our core languages. There were two Protobuf libraries in Elixir at the time. We
picked one, then later switched to elixir-protobuf, to which we are now major
contributors thanks to Andrea Leopardi, João Britto, and Eric Meadows-Jönsson.
Using Protobuf means that we can
guarantee compatibility on schemas going forward, have deprecation ability, and
because it is widely supported, we have access to it in all of the toolsets
where we need it. It also converts back and forth to JSON nicely when needed.
Protobuf schemas, unlike Avro, for example, don’t accompany the data payload.
This means that you need to provide them to the software that needs them out of
band. Andrea Leopardi wrote about how we do that so I won’t detail it here. We
took most of the pain out of this by designing the schemas in a way that means
most services don’t always need the latest schemas. And because Protobuf allows
decoding all of a payload that you do know, it means as long as we’re only
adding fields we don’t have any issues.
To do this, we designed a schema where there is a common Event base, an
Envelope with a separate set of guaranteed fields for all events (e.g. id,
timestamp, and type). This allows systems to process events without always
having the very latest schemas unless they need to access the new event type or
new fields.
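To show why added fields are harmless to older consumers, here is a hand-rolled sketch of a small subset of the Protobuf wire format (varint and length-delimited fields only). It is not how you would decode Protobuf in practice, where a generated library does this for you, and the field numbers 1 to 3 for `id`, `timestamp`, and `type` are assumptions, not our real Envelope schema. The decoder keeps the fields it knows and skips the rest.

```python
def write_varint(value):
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)   # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def read_varint(buf, pos):
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def encode_field(number, value):
    """Encode an int as wire type 0 (varint) or a str as wire type 2."""
    if isinstance(value, int):
        return write_varint((number << 3) | 0) + write_varint(value)
    data = value.encode("utf-8")
    return write_varint((number << 3) | 2) + write_varint(len(data)) + data


# Hypothetical field numbers for the guaranteed envelope fields.
KNOWN_FIELDS = {1: "id", 2: "timestamp", 3: "type"}


def decode_known(buf):
    """Decode the fields this consumer knows about and skip everything else."""
    decoded, pos = {}, 0
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        number, wire_type = key >> 3, key & 0x07
        if wire_type == 0:
            value, pos = read_varint(buf, pos)
        elif wire_type == 2:
            length, pos = read_varint(buf, pos)
            value = buf[pos:pos + length]
            pos += length
        else:
            raise ValueError(f"wire type {wire_type} not handled in this sketch")
        if number in KNOWN_FIELDS:
            if isinstance(value, bytes):
                value = value.decode("utf-8")
            decoded[KNOWN_FIELDS[number]] = value
    return decoded


payload = (
    encode_field(1, "evt-1")
    + encode_field(2, 1700000000)
    + encode_field(3, "user.created")
    + encode_field(9, "a field added after this consumer was built")
)
print(decode_known(payload))
```

The consumer still gets `id`, `timestamp`, and `type` even though field 9 did not exist when it was written.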
Other Communication Methods
Events make this all so much easier. But it’s hard to always use them (or
commands). It’s possible but there are just places where you need something
else. We have some get out of jail free cards. When we model new solutions in
our space we push the following hierarchy:
- Events and Commands first. If it can be done with events/commands, we do
that.
- Async, stateless RPC over a queue. For things like queueing work to
yourself or another service. Private calls that don’t need archiving or
replays.
- Synchronous RPC, using queues for call-response
- HTTP as a last resort
I believe that in the core system there are only two remaining places that
widely leverage HTTP for things other than serving APIs to frontends.
Things We Learned
You don’t build an entire system without running into issues. Here are some
things we learned. They are not strictly ordered but somewhat in order of
importance.
-
Putting events on S3 was a really good decision. It has all the S3
goodness, and also unlocks all of the “Big Data” tools like Spark, Athena,
etc.
-
People will find every way possible to publish events that aren’t right
in one way or another. This is no surprise. But really, if you are deciding
where to allocate time, it’s worth all the effort you can muster to put
validation into the publishing side.
-
Being able to throw away your datastore and start over at any time is
powerful. For example: we run ElasticSearch but we never re-index. We just
throw away the cluster and make a new one from a replay thanks to Alec Rubin,
Joe Lepper, and Brian Jones. If you have to swap from one kind of
datastore to another (e.g. Redis -> Postgres) you can just rehydrate the new
store from a replay using the same event sourcing code you would have to
write anyway. Very little migration code.
-
Sometimes you want to represent events as things that occurred. Sometimes
you want to represent them as the current state of something after the
occurrence. Some people will tell you not to do the latter. Making this a
first class behavior and clearly identifying which pattern an event is using
has been super helpful. We do both.
-
Use the wrapper types in Protobuf. One major shortcoming of Protobuf is that
you can’t tell if something is null or the zero value for its type. The
wrappers fix that.
-
If you have to interact with a DB and publish an event, sometimes the
right pattern is to publish the event, and consume it yourself before
writing to your DB. This helps with consistency issues and allows you to
replay your own events to fix bugs. Lee Marlow, Geoff Smith, and Jeffrey
Matthias called this pattern “dogfooding”. Sometimes the right pattern is to
send yourself a command instead.
-
Protobuf supports custom annotations. Those can be really helpful for
encoding things like which events are allowed on which bus, which actions are
allowed on the event, etc. Especially helpful when building supporting
libraries in more than one language.
-
Daily replays give you both an anti-entropy method and a daily
stress test of the whole system. This has been great for hammering out
issues. It also guarantees that services can deal with out-of-order events.
They get them at least every day. The main gate to rolling it out was fixing
all of the places this wasn’t right. Now it stays right.
-
Event-based systems make reporting so much easier! And data exports. And
external integrations. And webhooks, and a million other things.
-
The event store is a fantastic audit trail.
-
We sometimes rewrite our event store if we’ve messed something up. We save
up a list of the screw-ups and then do it in batch. We leave the old copy on
S3 and make a new root in the bucket (e.g. v4/). We use Spark to do it. We
don’t remove the old store so it’s there for auditing.
-
Write some tools to make working with your events nicer. We have things to
listen on a bus, to download and decode files from the raw store, etc.
-
Local development seeds are easy when you have events. Just seed the DB with
a local replay of staging/dev data.
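The “dogfooding” pattern from the list above is easy to show in miniature: the request path only publishes, and the service writes to its DB from its own event handler. `ToyBus`, `UserService`, and the `user.renamed` event are hypothetical names for this sketch, and the bus’s `log` list stands in for the archive.

```python
class ToyBus:
    """In-memory bus that records everything published (stand-in for the archive)."""

    def __init__(self):
        self.subscribers = []
        self.log = []

    def subscribe(self, callback):
        self.subscribers.append(callback)

    def publish(self, event):
        self.log.append(event)
        for callback in self.subscribers:
            callback(event)


class UserService:
    def __init__(self, bus):
        self.bus = bus
        self.db = {}
        bus.subscribe(self.on_event)

    def rename_user(self, user_id, name):
        # No DB write here: the request path only publishes the event.
        self.bus.publish({"type": "user.renamed", "user_id": user_id, "name": name})

    def on_event(self, event):
        # The single write path, used for live traffic and replays alike.
        if event["type"] == "user.renamed":
            self.db[event["user_id"]] = event["name"]


bus = ToyBus()
service = UserService(bus)
service.rename_user("u1", "Ann")
service.rename_user("u1", "Anne")

# Rebuild a fresh copy of the state by replaying the service's own events.
rebuilt = UserService(ToyBus())
for event in bus.log:
    rebuilt.on_event(event)

print(service.db == rebuilt.db)   # True
```

Since there is only one write path, fixing a bug in `on_event` and replaying is enough to repair the stored state.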
Future
On the roadmap for the next year is to take our existing service cookie cutter
repo and enable it to maintain live event-sourced projections in the most
common format for a few very commonly used event types. We’ll snapshot those
nightly and when standing up a new service, we can start from the latest DB
snapshot and replay only the events since then. This will make things even more
efficient.
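Since this is still on the roadmap, the following is only a sketch of the idea, with made-up field names (`taken_at`, `rows`, `entity_id`, `state`): starting from a snapshot and replaying only newer events should land in the same place as replaying everything.

```python
def apply(state, event):
    """Toy projection: keep the latest state per entity."""
    state[event["entity_id"]] = event["state"]


def full_replay(events):
    state = {}
    for event in sorted(events, key=lambda e: e["timestamp"]):
        apply(state, event)
    return state


def bootstrap_from_snapshot(snapshot, events):
    """Start from snapshot rows, then apply only events newer than the snapshot."""
    state = dict(snapshot["rows"])
    for event in sorted(events, key=lambda e: e["timestamp"]):
        if event["timestamp"] > snapshot["taken_at"]:
            apply(state, event)
    return state


events = [
    {"entity_id": "u1", "timestamp": 1, "state": "created"},
    {"entity_id": "u2", "timestamp": 2, "state": "created"},
    {"entity_id": "u1", "timestamp": 3, "state": "renamed"},
]

# A hypothetical nightly snapshot taken after the first two events.
snapshot = {"taken_at": 2, "rows": full_replay(events[:2])}

print(bootstrap_from_snapshot(snapshot, events) == full_replay(events))  # True
```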