In this post we will explore how we can use some of Elixir's more interesting features to implement event sourcing. OTP is the star of this show, enabling tricks that would be practically impossible in Ruby. OTP (historically short for Open Telecom Platform) is Erlang's set of libraries and design principles for building concurrent, fault-tolerant systems, and it leans on immutable data to provide abstractions that make concurrency very pleasant to work with.
I will assume that the reader is familiar with OTP, Elixir syntax, and Event Sourcing. If not, I would recommend at least skimming the following links:
Elixir Getting Started - Pattern Matching
Elixir Getting Started - Agent
Greg Young - CQRS and Event Sourcing - Code on the Beach 2014
If you are building an event-sourced system in Ruby, the usual way you would process a command looks something like this:

1. Load all of the aggregate's events from the event store.
2. Replay those events to rebuild the aggregate's current state.
3. Execute the command against the aggregate, producing new events.
4. Append the new events to the event store.
With OTP, we can skip steps 1 and 2 (by far the most expensive parts of the process for large aggregates), reducing the entire operation from O(N) in the number of events to O(1) for an aggregate that has already been loaded into memory.
This approach also eliminates the "optimistic locking" that you usually see at the DB layer in event-sourced solutions. With optimistic locking you generally accept that some requests will fail when they race one another. With this approach, they will all succeed (unless there is application logic that prevents that).
This is possible if we build our system with the following guidelines:

- Every aggregate lives in exactly one process, and every command for that aggregate executes inside that process.
- An aggregate keeps its current state in memory, so its events only need to be loaded once.
- Events are immutable and are only ever appended to the store.
Let's dive right into the code.
I've chosen Redis to back my event store for demonstration purposes, but it should be trivial to write an event store using any other storage system. At its most basic level, an event store is just a key-value store, with every key pointing to an append-only list of events.
The `EventStore` stores events and notifies listeners via the `EventHandler` (this will come back later). It has only two functions: `commit` adds events to a particular aggregate, identified by its UUID, and `load` simply loads the events of a particular aggregate.
```elixir
# lib/event_store.ex
defmodule EventStore do
  @event_store RedisEventStore

  # Serialize and store one or more events for the aggregate identified
  # by `uuid`, then notify the event handlers.
  def commit(uuid, events) when is_list(events) do
    @event_store.commit(uuid, Enum.map(events, &EventSerializer.serialize/1))
    notify(uuid, events)
  end
  def commit(uuid, event), do: commit(uuid, [event])

  # Load and deserialize all events for the aggregate identified by `uuid`.
  def load(uuid), do: @event_store.load(uuid) |> Enum.map(&EventSerializer.deserialize/1)

  defp notify(_uuid, []), do: :ok
  defp notify(uuid, [event | events]) do
    EventHandler.handle({uuid, event})
    notify(uuid, events)
  end
end
```
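Using it looks like this (a sketch, assuming the Redix connection registered as `:redix` and the Ecto repo are running; the `Model.Counter.Created` event is defined later in this post):

```elixir
uuid = "counter-123"

# A single event is wrapped in a list by commit/2.
:ok = EventStore.commit(uuid, %Model.Counter.Created{aggregate_id: uuid})

# load/1 gives the events back, oldest first.
[%Model.Counter.Created{aggregate_id: "counter-123"}] = EventStore.load(uuid)
```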
We also need a serializer to convert our events from structs to strings and back again.
```elixir
# lib/event_store.ex
defmodule EventSerializer do
  # Store the struct's module name under :__struct__ so we know which
  # struct to rebuild when deserializing.
  def serialize(event) do
    event
    |> Map.from_struct()
    |> Map.merge(%{__struct__: to_string(event.__struct__)})
    |> Poison.encode!()
  end

  # Note: `keys: :atoms` creates atoms at runtime, which is fine for a
  # demo but worth keeping in mind with untrusted input.
  def deserialize(serialized_event) do
    serialized_event |> Poison.decode!(keys: :atoms) |> decode()
  end

  # Resolve the module named under :__struct__ and build the struct from
  # the remaining keys (struct/2 ignores the :__struct__ key itself).
  def decode(event_map) do
    [Map.fetch!(event_map, :__struct__)]
    |> Module.concat()
    |> struct(event_map)
  end
end
```
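A quick round trip through the serializer (a sketch; the exact JSON key order may vary):

```elixir
event = %Model.Counter.Incremented{aggregate_id: "counter-123", count: 1}

json = EventSerializer.serialize(event)
# => "{\"count\":1,\"aggregate_id\":\"counter-123\",
#      \"__struct__\":\"Elixir.Model.Counter.Incremented\"}"

# Deserializing gives us back an identical struct.
true = EventSerializer.deserialize(json) == event
```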
Finally, the bit of code that writes to and reads from Redis. If you want to implement an event store using another back-end, this is the code you'll have to adapt.
```elixir
# lib/event_store.ex
defmodule RedisEventStore do
  # Append each serialized event to the Redis list stored under `uuid`.
  def commit(_uuid, []), do: nil
  def commit(uuid, [event | events]) do
    {:ok, _} = Redix.command(:redix, ["RPUSH", uuid, event])
    commit(uuid, events)
  end

  # Read back the whole list of serialized events, oldest first.
  def load(uuid) do
    {:ok, result} = Redix.command(:redix, ["LRANGE", uuid, "0", "-1"])
    result
  end
end
```
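To illustrate how little is involved in adapting this, here is a sketch of a hypothetical in-memory back-end built on `Agent` (handy for tests). It exposes the same `commit`/`load` contract, so swapping it in is just a matter of pointing the `@event_store` attribute in `EventStore` at this module:

```elixir
# Hypothetical in-memory event store; InMemoryEventStore is an assumed name.
defmodule InMemoryEventStore do
  def start_link do
    # The whole store is a map of uuid => list of serialized events.
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  # Append the serialized events to the list stored under the uuid.
  def commit(uuid, events) when is_list(events) do
    Agent.update(__MODULE__, fn store ->
      Map.update(store, uuid, events, &(&1 ++ events))
    end)
  end

  # Return all events for the aggregate, oldest first.
  def load(uuid) do
    Agent.get(__MODULE__, &Map.get(&1, uuid, []))
  end
end
```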
Note that we are not making use of OTP to guard consistency here; the event store is just storage. As you will see in a moment, we will isolate each aggregate into its own process, which will provide us with the consistency guarantee that we need. We have also made use of Elixir's pattern matching, both to work through lists of events and to check the responses of function calls.
The event handler is unremarkable. It has only one function, `handle`, which dispatches an event to multiple handlers. Again we are making use of pattern matching to help us process the list of handlers.
```elixir
# lib/event_handler.ex
defmodule EventHandler do
  @handlers [CounterEventHandler]

  # A bodiless function head is required to combine a default argument
  # with multiple clauses.
  def handle(event, handlers \\ @handlers)
  def handle(_, []), do: :ok
  def handle({uuid, event}, [handler | handlers]) do
    :ok = handler.handle(event)
    handle({uuid, event}, handlers)
  end
end
```
The event handler for a simple counter might look like this:
```elixir
# lib/counter.ex
defmodule CounterEventHandler do
  alias Model.Counter

  # Project the Created event into a new read-model row.
  def handle(%Counter.Created{aggregate_id: aggregate_id}) do
    %Eslixir.Counter{aggregate_id: aggregate_id, count: 0}
    |> Eslixir.Repo.insert()
    :ok
  end

  # Project the Incremented event by updating the stored count.
  def handle(%Counter.Incremented{aggregate_id: aggregate_id, count: count}) do
    import Ecto.Query, only: [from: 2]

    from(c in Eslixir.Counter, where: c.aggregate_id == ^aggregate_id)
    |> Eslixir.Repo.update_all(set: [count: count])
    :ok
  end

  def handle(_), do: :ok
end
```
Note the last function clause: `def handle(_), do: :ok`. It is necessary to prevent a `FunctionClauseError` when an event handler receives an event it has no clause for. So we simply return `:ok` for all events not otherwise handled by a particular event handler.
The aggregate is the first part of our system that will be built using OTP. It's important for our strategy to ensure that there is only ever one process running per aggregate. In order to do this, we are making use of `Registry`, which ensures that there is only one process alive per `model`/`aggregate_id` combination. This eliminates race conditions and guarantees data consistency.
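One detail worth spelling out: the `:aggregate_repository` registry must be started before any aggregate process, typically from the application's supervision tree. A minimal sketch (the `Eslixir.Application` module name and the sibling children are assumptions):

```elixir
defmodule Eslixir.Application do
  use Application

  def start(_type, _args) do
    children = [
      # Unique keys: at most one process per {model, aggregate_id}.
      {Registry, keys: :unique, name: :aggregate_repository}
      # ... plus the Redix connection, the Ecto repo, and so on.
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end
```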
```elixir
# lib/aggregate_root.ex
defmodule Aggregate do
  use GenServer

  # Run `function` inside the aggregate's own process, starting that
  # process first if it isn't already running.
  def with_aggregate(model, aggregate_id, function) do
    :ok = start_link(model, aggregate_id)
    via_tuple(model, aggregate_id) |> GenServer.call(function)
  end

  def start_link(model, aggregate_id) do
    name = via_tuple(model, aggregate_id)

    case GenServer.start_link(__MODULE__, {model, aggregate_id}, name: name) do
      {:ok, _} ->
        # Tell the new process to load its own events asynchronously.
        GenServer.cast(name, :finish_init)
        :ok

      {:error, {:already_started, _}} ->
        :ok
    end
  end

  defp via_tuple(model, aggregate_id) do
    {:via, Registry, {:aggregate_repository, {model, aggregate_id}}}
  end

  # callbacks

  def init({model, aggregate_id}) do
    {:ok, {model, %{aggregate_id: aggregate_id}}}
  end

  def handle_cast(:finish_init, {model, state}) do
    events = if state.aggregate_id, do: EventStore.load(state.aggregate_id), else: []
    {:noreply, {model, state_from_events(model, events)}}
  end

  def handle_call(function, _from, {model, state}) do
    case function.({state, []}) do
      {_, {:error, reason}} ->
        {:reply, {:error, reason}, {model, state}}

      {new_state, new_events} ->
        EventStore.commit(new_state.aggregate_id, Enum.reverse(new_events))
        {:reply, :ok, {model, new_state}}
    end
  end

  defp state_from_events(model, events) do
    Enum.reduce(events, nil, fn event, state -> model.next_state(event, state) end)
  end
end
```
The aggregate process is a `GenServer`, capturing the state of the aggregate and ensuring data consistency by executing all writes for a particular aggregate in a single process. This eliminates the race conditions present in typical CRUD applications (which may have multiple "read -> compute new state -> write" cycles running at once).
In `start_link`, we send an asynchronous `:finish_init` message telling the aggregate to finish its own initialization: it must read its events from the event store and build up its state before it is ready to do any computation. `start_link` always runs in the calling process (and `init` blocks it), but we want `EventStore.load` to execute in the aggregate's own process. Keeping `init` cheap and loading the events asynchronously keeps the startup path fast; the expensive part, replaying the event history, happens inside the aggregate process itself, so spinning up one large aggregate never blocks anything else.
Because Elixir lists are immutable singly linked lists, appending to them is an expensive O(N) operation, while prepending is O(1). So while it may seem logical to append new events, we will actually prepend them and restore the order later with `Enum.reverse`.
If the model returns `{:error, reason}`, the state reverts to the old state (or, more accurately, is simply not updated), and nothing is written to the event store. Only when the result of `function` is a valid new state does the state get updated and the new events get written to the event store.
In order to keep as many aggregates loaded in memory as possible, it makes sense to limit the state being tracked to only what is needed to perform business logic. Data that is not used in invariants should not be tracked in the `State` struct of an aggregate.
The function `Aggregate.with_aggregate/3` is worth mentioning. It takes a model, an aggregate id, and a function as arguments. It calls `GenServer.call` with the via tuple (which routes the call through `Registry`) and the function as arguments. This sends the function to the aggregate process, which then executes it. Because all commands for an aggregate are serialized through that one process, race conditions are eliminated and our data is guaranteed to be consistent.
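Calling it directly looks like this for a brand-new counter (a hypothetical invocation; the `CommandService` implementations below wrap exactly this pattern):

```elixir
# The callback receives the {state, new_events} accumulator from the
# aggregate process; the model functions pattern match on that tuple.
:ok =
  Aggregate.with_aggregate(Model.Counter, "counter-123", fn state ->
    state |> Model.Counter.create("counter-123")
  end)
```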
Of course, if you run this in a production environment, sooner or later you are going to run out of memory as your system loads more and more aggregates into memory. Setting an upper limit on the number of processes would be a good idea. There are many strategies possible when deciding which processes to kill when you reach the upper limit, but that's far outside of the scope of this post.
```elixir
# lib/counter.ex
defmodule Model.Counter do
  defmodule State, do: defstruct [:aggregate_id, :count]

  # Commands
  defmodule Create, do: defstruct [:aggregate_id]
  defmodule Increment, do: defstruct [:aggregate_id]

  # Events
  defmodule Created, do: defstruct [:aggregate_id]
  defmodule Incremented, do: defstruct [:aggregate_id, :count]

  def create({state, new_events}, aggregate_id) do
    if state do
      {state, {:error, "Already created"}}
    else
      {state, new_events} |> apply_event(%Created{aggregate_id: aggregate_id})
    end
  end

  def increment({state, new_events}) do
    {state, new_events}
    |> apply_event(%Incremented{aggregate_id: state.aggregate_id, count: state.count + 1})
  end

  def next_state(%Created{aggregate_id: aggregate_id}, _), do: %State{aggregate_id: aggregate_id, count: 0}
  def next_state(%Incremented{count: count}, state), do: %State{state | count: count}

  # Once an error is in the pipeline, pass it through untouched.
  defp apply_event({state, {:error, reason}}, _), do: {state, {:error, reason}}
  defp apply_event({state, new_events}, event) do
    {next_state(event, state), [event | new_events]}
  end
end
```
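Because the model functions are pure, we can exercise them without any process at all. Note how new events accumulate newest-first, which is exactly why `Aggregate` calls `Enum.reverse` before committing:

```elixir
{nil, []}
|> Model.Counter.create("counter-123")
|> Model.Counter.increment()
|> Model.Counter.increment()
# => {%Model.Counter.State{aggregate_id: "counter-123", count: 2},
#     [%Model.Counter.Incremented{aggregate_id: "counter-123", count: 2},
#      %Model.Counter.Incremented{aggregate_id: "counter-123", count: 1},
#      %Model.Counter.Created{aggregate_id: "counter-123"}]}
```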
This model defines a simple counter. Every model has a `State` struct; that's what will be held in memory in the aggregate process. There are also some commands defined here (`Create`, `Increment`) and some events (`Created`, `Incremented`).
`Model.Counter.create/2` applies `Created`, but only if the counter has not already been created.
`Model.Counter.increment/1` applies `Incremented`, increasing the count by 1.
`Model.Counter.next_state/2` is used to translate events to state. It is used both when the aggregate is being loaded from the event store and when a new event is applied to the aggregate. This way the state is always kept up to date.
`Model.Counter.apply_event/2` simply coordinates setting the new state and prepending the new event.
The command service will handle dispatching a command to the right command handler.
```elixir
# lib/command_service.ex
defprotocol CommandService do
  def execute(command)
end
```
`CommandService` is defined using a protocol (since we only want a command to be processed once). Each command will have its own implementation of the `CommandService` protocol.
```elixir
# lib/counter.ex
defimpl CommandService, for: Model.Counter.Create do
  alias Model.Counter

  def execute(command) do
    Aggregate.with_aggregate(Counter, command.aggregate_id, fn state ->
      state |> Counter.create(command.aggregate_id)
    end)
  end
end

defimpl CommandService, for: Model.Counter.Increment do
  alias Model.Counter

  def execute(command) do
    Aggregate.with_aggregate(Counter, command.aggregate_id, fn state ->
      state |> Counter.increment()
    end)
  end
end
```
One `execute` function can also describe multiple instructions. These are chainable, so the following code is also possible (with a hypothetical `Increment2` command):
```elixir
def execute(%Counter.Increment2{aggregate_id: aggregate_id}) do
  Aggregate.with_aggregate(Counter, aggregate_id, fn state ->
    state
    |> Counter.increment()
    |> Counter.increment()
  end)
end
```
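Putting it all together, a client just builds a command struct and hands it to the protocol (a sketch, assuming the registry, Redix, and Ecto processes are all running):

```elixir
:ok = CommandService.execute(%Model.Counter.Create{aggregate_id: "counter-123"})
:ok = CommandService.execute(%Model.Counter.Increment{aggregate_id: "counter-123"})

# Invariants are enforced by the model: a second Create for the same
# aggregate returns an error, and nothing is written to the event store.
{:error, "Already created"} =
  CommandService.execute(%Model.Counter.Create{aggregate_id: "counter-123"})
```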
With Elixir we can easily implement a robust event sourcing system. Thanks to OTP, we can maintain a single, lightweight process per aggregate that is accessible to the entire Elixir runtime (even transparently across multiple servers). We can avoid the complexity of snapshotting and increase throughput by requiring fewer database accesses (and in some cases -- if an invariant is violated -- none). Perhaps more importantly, because our data is immutable, this can all be done without locking, and concurrently across however many processes or servers we would like.
Edited 19 May 2017: Use `Registry` instead of the homemade `Aggregate.Repository` and implement `CommandService` as a protocol.