In the first part of this series we developed a simple blogging application that uses event sourcing to keep track of all changes made by the users. However, we did not yet write these events to durable storage. This means all data is lost when the application is restarted, which is clearly not acceptable. Saving and restoring events will be the responsibility of the event store, which we’ll start implementing in this part. But before we get to actually writing events to disk, we must first tackle the problem of maintaining data consistency when using event sourcing.
You can find the code on github in the part-2 branch. Use git clone -b part-2 https://github.com/zilverline/event-sourced-blog-example.git to check out the correct branch. If you already retrieved the previous part, then using git pull and git checkout part-2 should do the trick.
Something you’ll quickly have to learn when you develop web applications is that the internet is a truly distributed environment. You’ll often run your application across multiple servers (for performance and/or fault-tolerance), while users access your application from different devices all across the globe. All of these servers and devices will have a different view of the world. In a distributed world there is no such thing as a single, global truth. As soon as a web page is rendered by a server and the page is displayed by the client, the data on the page is already out of date. If you do not handle this correctly, some surprising things may happen.
An easy way to see this is to start editing a blog post, then use another browser window to delete the same blog post, and then submit the original edit form. In the application from part 1 this results in a NoSuchElementException while processing the event to update the current state. In the Rails getting started application you get an ActiveRecord::RecordNotFound. Other frameworks that I know of aren't any better at handling this. Back button usage and double-clicking also cause many of the same problems. And the more collaborative your application is, the more likely it is that these inconsistencies and conflicts will occur for real.
Many current web applications deal with this by relying on a combination of ad-hoc solutions, prayer, and luck¹. You just have to hope that the occasional lost update, exception, or other glitch won't cause too much trouble.
Since events are forever, we cannot afford to let these glitches go undetected. Otherwise we could get strange, meaningless event sequences, such as a blog post being edited after it has been deleted! So one major responsibility of an event store is to detect these conflicts and prevent inconsistent events from being stored.
To implement this our event store is going to keep track of multiple event streams. Each event stream has an associated stream revision, which is defined to be equal to the number of commits for that particular event stream. This makes the stream revision a gapless, strictly increasing sequence number.
When you commit additional events to an event stream, you have to specify the stream revision that you expect the stream to currently have. If the expected stream revision is lower than the current stream revision, there is a conflict. For example, if two users both load a post whose stream is at revision 3, the first successful edit moves the stream to revision 4; the second edit still expects revision 3 and is therefore rejected as a conflict. This approach is also known as optimistic concurrency control.
We use multiple event streams so that commits to different event streams never conflict: each event stream defines a consistency boundary. In our example application we'll map each blog post to its own event stream, so editing or deleting two different posts never causes a conflict.
There is another problem that we have to solve, related to using a memory image. In a database-oriented application the central database is considered to give a single, up-to-date, consistent view of the current state of the application. Database transactions are used to control concurrency conflicts (but make sure you choose the correct isolation level!). Since the database is a single, centralized component, any read after a transaction commit will return the updated data (until you introduce master-slave replication or caching).
With a memory image the situation is slightly different. After changes are successfully committed to the event store, the memory image of each application server is not immediately updated! The newly committed events must first be transferred to the application servers and applied to the memory images before the effects of the change become visible.
So to avoid a fast client not seeing the results of the action it just performed, we'll also keep track of the event store revision, which is defined to be equal to the total number of commits to the event store. Again, this produces a gapless, strictly increasing sequence number. Whenever a client is redirected to see the results of a commit, we compare the memory image's revision against the current store revision to see if the memory image is up-to-date.
There are a couple of ways to implement this; the approach taken below is to simply have reads block until the memory image has caught up with the event store.
If the above got you thoroughly confused, don’t worry too much. The event store implementation is actually not that complicated. To make the first implementation easier to understand we will not worry about durability yet. In the next part of this series we’ll build an implementation that uses an actual database (Redis). We’ll then compare the fake implementation to the Redis implementation to check that both behave the same.
The store and stream revisions are represented by the StoreRevision and StreamRevision classes and can be found in EventStore.scala. They simply wrap a Long value. The main reason not to use plain Longs is to avoid accidental mixups: plain numbers that are related but actually mean different things are hard to keep straight otherwise!
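To give you an idea of what these wrappers look like, here's a minimal sketch of StreamRevision (the actual definition in EventStore.scala may differ in detail; StoreRevision is analogous):
// Minimal sketch of a revision wrapper.
case class StreamRevision(value: Long) extends Ordered[StreamRevision] {
  require(value >= 0, "revision must be non-negative")

  // The revision after one more commit.
  def next: StreamRevision = StreamRevision(value + 1)

  override def compare(that: StreamRevision): Int = value compare that.value
}
object StreamRevision {
  val Initial = StreamRevision(0)
}
With this in place the compiler stops us from accidentally passing a stream revision where a store revision is expected.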
Committing events to the event store uses the EventCommitter interface:
/**
 * Commits events to an event store.
 */
trait EventCommitter[Event] {
  def tryCommit(streamId: String, expected: StreamRevision, event: Event): CommitResult[Event]
}
The tryCommit method will attempt to commit the given event to the event stream. The result type is defined as follows:
/**
 * The result of a commit attempt is either a `Conflict` or a successful `Commit`.
 */
type CommitResult[+Event] = Either[Conflict[Event], Commit[Event]]

/**
 * A successful commit to `streamId`.
 */
case class Commit[+Event](
  storeRevision: StoreRevision,
  timestamp: Long,
  streamId: String,
  streamRevision: StreamRevision,
  events: Seq[Event])

/**
 * The conflict that occurred while trying to commit to `streamId`.
 */
case class Conflict[+Event](
  streamId: String,
  actual: StreamRevision,
  expected: StreamRevision,
  conflicting: Seq[Commit[Event]])
In other words, tryCommit returns either a Conflict or a Commit. The conflict will contain the actual stream revision and all conflicting commits (any commit that happened since the expected stream revision). When a commit succeeds, it will contain the committed event as well as some metadata, such as the timestamp of the commit.
You may have noticed that the Commit class allows for multiple events in a single commit. This can often be useful if you want a group of events to be treated atomically by subscribers. Another reason is that as your application evolves some events may no longer be of interest. These events can then be filtered out without altering the store or stream revisions of the commits they were part of. In other words, you may have commits that no longer hold any events!
To rebuild the memory image and receive commits as they happen, the CommitPublisher interface is defined:
/**
 * Publishes successful commits to subscribers.
 */
trait CommitPublisher[Event] {
  /**
   * Notifies `listener` of all commits that happened `since`. Notification happens asynchronously.
   */
  def subscribe(since: StoreRevision)(listener: Commit[Event] => Unit): Subscription
}

/**
 * A subscription that can be cancelled.
 */
trait Subscription {
  def cancel(): Unit
}
After subscribing to the event store, all commits that happened after the since store revision will be passed to the listener callback. The event store will ensure that commits are sent to the listener in order and without any gaps or duplicates. Notice that notification of commits happens asynchronously.
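As a hypothetical usage example (the eventStore value is assumed to be in scope), a subscriber that prints every commit from the very beginning of the store looks like this:
// Subscribe starting from the first commit; the listener is invoked
// asynchronously on the subscription's own thread.
val subscription = eventStore.publisher.subscribe(StoreRevision.Initial) { commit =>
  println("commit " + commit.storeRevision + " to " + commit.streamId + ": " + commit.events)
}

// Cancel when we're no longer interested.
subscription.cancel()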
Two more interfaces are defined that complete the event store API:
/**
 * Reads commits from the event store.
 */
trait CommitReader[Event] {
  def storeRevision: StoreRevision
  def readCommits(since: StoreRevision, to: StoreRevision): Stream[Commit[Event]]
  def streamRevision(streamId: String): StreamRevision
  def readStream(streamId: String, since: StreamRevision, to: StreamRevision): Stream[Commit[Event]]
}

/**
 * The event store API.
 */
trait EventStore[Event] {
  def reader: CommitReader[Event]
  def committer: EventCommitter[Event]
  def publisher: CommitPublisher[Event]
  def close(): Unit
}
The CommitReader interface allows access to stored commits (using Scala's lazy Streams to avoid having to load all commits into memory at once) and the EventStore interface simply combines the other interfaces and a close method into a single unit.
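Rebuilding state from history is then just a fold over the stored commits. Here's a sketch, assuming an initialState value and an update function of the same shape the memory image uses later on:
// Replay every commit made so far, folding each one into the state.
val reader = eventStore.reader
val replayed = reader
  .readCommits(StoreRevision.Initial, reader.storeRevision)
  .foldLeft(initialState)(update)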
The FakeEventStore implementation can be found in FakeEventStore.scala. It uses a simple Vector (Scala's immutable equivalent to Java's ArrayList) to store all commits made to the store and a map of streamId to Vector for the commits associated with each event stream. All commits are stored in their original order. Scala's Software Transactional Memory (STM) is again used to ensure consistency:
class FakeEventStore[Event] extends EventStore[Event] {
  // [...]
  private[this] val commits = Ref(Vector.empty[Commit[Event]]).single
  private[this] val streams = Ref(Map.empty[String, Vector[Commit[Event]]]).single

  override object reader extends CommitReader[Event] {
    override def storeRevision = StoreRevision(commits().size)

    override def readCommits(since: StoreRevision, to: StoreRevision): Stream[Commit[Event]] = {
      commits().slice((since.value min Int.MaxValue).toInt, (to.value min Int.MaxValue).toInt).toStream
    }

    // [...]
  }
  // [...]
}
The CommitReader implementation just uses regular Scala collection manipulation functions. Notice that the implementation of storeRevision is precisely the definition of a store revision as discussed under "read consistency" above! The streamRevision and readStream methods (not shown) are implemented similarly.
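They might look something like the following sketch, which directly mirrors the definitions above (the actual code in FakeEventStore.scala may differ; the default value for the to parameter is an assumption, made so that callers such as tryCommit below can omit it):
// The stream revision is simply the number of commits in the stream.
override def streamRevision(streamId: String): StreamRevision =
  StreamRevision(streams().get(streamId).map(_.size.toLong).getOrElse(0L))

// Read a slice of the commits belonging to a single stream.
override def readStream(streamId: String, since: StreamRevision,
    to: StreamRevision = StreamRevision(Long.MaxValue)): Stream[Commit[Event]] = {
  val stream = streams().getOrElse(streamId, Vector.empty)
  stream.slice((since.value min Int.MaxValue).toInt, (to.value min Int.MaxValue).toInt).toStream
}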
The EventCommitter's tryCommit method is also straightforward:
override def tryCommit(streamId: String, expected: StreamRevision, event: Event): CommitResult[Event] = {
  require(Txn.findCurrent.isEmpty, "the fake event store cannot participate in an STM transaction, just like a real event store")

  atomic { implicit txn =>
    val actual = streamRevision(streamId)

    if (expected < actual) {
      val conflicting = readStream(streamId, since = expected)
      Left(Conflict(streamId, actual, expected, conflicting))
    } else if (expected > actual) {
      throw new IllegalArgumentException("expected revision %d greater than actual revision %d" format (expected.value, actual.value))
    } else {
      val commit = Commit(storeRevision.next, DateTimeUtils.currentTimeMillis, streamId, actual.next, Seq(event))
      commits.transform(_ :+ commit)
      streams.transform(streams => streams.updated(streamId, streams.getOrElse(streamId, Vector.empty) :+ commit))
      Right(commit)
    }
  }
}
First a check is done to ensure that no STM transaction is currently active, to make sure that we correctly mimic a real event store. Then the expected stream revision is checked against the actual revision. If there is a mismatch, a conflict or an error is reported. If they match, a new Commit is instantiated, appended to the commits and streams collections, and returned. All this is done inside an STM transaction to ensure atomicity.
The final part of the event store implementation deals with notification and is the trickiest. STM again helps to keep the complexity under control:
private[this] val closed = Ref(false).single
private[this] val executor = Executors.newCachedThreadPool

override object publisher extends CommitPublisher[Event] {
  override def subscribe(since: StoreRevision)(listener: Commit[Event] => Unit): Subscription = {
    val cancelled = Ref(false).single
    val last = Ref(since).single

    executor.execute(new Runnable {
      @tailrec override def run {
        // Wait for new commits or subscription termination.
        val pending = atomic { implicit txn =>
          if (closed() || cancelled()) None else {
            val pending = commits().drop(last().value.toInt)
            if (pending.isEmpty) retry
            else Some(pending)
          }
        }
        pending match {
          case None => // Stop.
          case Some(commits) =>
            // Notify listener and go back to run.
            commits.foreach { commit =>
              listener(commit)
              last() = commit.storeRevision
            }
            run
        }
      }
    })

    // Return a subscription instance that can be used for cancellation.
    new Subscription {
      override def cancel() = cancelled.set(true)
      override def toString = "Subscription(" + last() + ", " + cancelled() + ", " + FakeEventStore.this + ")"
    }
  }
}
The closed reference is used to communicate that the event store has been closed. The executor thread pool is used to run each subscription on its own background thread² so that we can correctly mimic a real event store. When a subscription is made, we create two more references: cancelled to communicate that this subscription has been cancelled, and last to keep track of the last commit that has been passed to the listener.
A notification thread is then started that continuously checks whether there are any new commits the listener needs to be notified of. If there are none, the STM transaction is retried; retry internally blocks until one of the involved references changes, avoiding needless looping to check for new event store commits. If the event store is closed or the subscription is cancelled, the subscription thread terminates.
The tests for the event store can be found in EventStoreSpec.scala. The tests are implemented as a trait and the FakeEventStoreSpec simply extends this trait. This allows us to use the same tests for all event store implementations.
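The pattern looks roughly like the following sketch (the trait and example names here are assumed, as is specs2 as the test framework):
import org.specs2.mutable.Specification

// The behavior is specified once against the abstract EventStore API...
trait EventStoreBehavior { this: Specification =>
  def makeEventStore: EventStore[String]

  "an event store" should {
    "start out at the initial store revision" in {
      makeEventStore.reader.storeRevision must_== StoreRevision.Initial
    }
  }
}

// ...so every implementation gets the exact same test suite for free.
class FakeEventStoreSpec extends Specification with EventStoreBehavior {
  def makeEventStore = new FakeEventStore[String]
}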
The memory image itself is now implemented in its own class. It basically wraps an EventStore while allowing access to the current state:
/**
 * A `MemoryImage` tracks an underlying event store and uses the provided
 * `initialState` and `update` to project the committed events onto the
 * current state.
 */
class MemoryImage[State, Event] private
    (eventStore: EventStore[Event])
    (initialState: State)
    (update: (State, Commit[Event]) => State)
  extends EventCommitter[Event] {

  private[this] val state = Ref(initialState)
  private[this] val revision = Ref(StoreRevision.Initial)

  /**
   * The current state of the memory image with at least all commits applied
   * that have been committed to the underlying event store.
   */
  def get: State = {
    val minimum = eventStore.reader.storeRevision
    atomic { implicit txn =>
      if (revision() < minimum) retry
      else state()
    }
  }

  /**
   * Commits an event to the underlying event store. The memory image will be
   * updated if the commit succeeds.
   */
  override def tryCommit(streamId: String, expected: StreamRevision, event: Event): CommitResult[Event] =
    eventStore.committer.tryCommit(streamId, expected, event)

  override def toString = "MemoryImage(%s, %s)".format(revision.single.get, eventStore)

  // Subscribe to the underlying event store and apply every commit to the
  // current state using the provided `update` function.
  eventStore.publisher.subscribe(StoreRevision.Initial) { commit =>
    atomic { implicit txn =>
      require(revision().next == commit.storeRevision, "expected: " + revision().next + ", got " + commit.storeRevision)
      state.transform(s => update(s, commit))
      revision() = commit.storeRevision
    }
  }
}
The memory image takes three constructor parameters: the event store, the initial state, and the update function that takes the current state and a commit to produce the updated state. The memory image also implements the EventCommitter interface and simply passes any commit attempts to the underlying event store.
The get method first checks if the current memory image is up-to-date. If it isn't, it blocks until the required number of commits have been applied. Otherwise it simply returns the current state. This ensures the required level of read consistency.
When the memory image is instantiated it also subscribes to the underlying event store and uses the provided update function to ensure the current state reflects all commits received from the event store.
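Put together, this gives us the read-your-own-writes behavior described under read consistency. A hypothetical sequence (the postId, expectedRevision, and content values are assumed to be in scope):
// Commit an event through the memory image...
memoryImage.tryCommit(postId.toString, expectedRevision, PostEdited(postId, content))

// ...and a subsequent read blocks (briefly) until the background
// subscription has applied the new commit, so a client is guaranteed
// to see the effect of its own successful commit.
val current: Posts = memoryImage.get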
Now that we have a (fake) implementation of the event store, we can start adjusting the PostsController to use it to store events. Before, we simply had a posts reference to the latest data and a commit method to commit events. We'll re-implement these to use the memory image:
class PostsController(memoryImage: MemoryImage[Posts, PostEvent]) extends Controller {
  /**
   * The current blog posts from the memory image.
   */
  def posts(): Posts = memoryImage.get

  /**
   * Commits an event and applies it to the current state. If successful the
   * provided callback `f` is applied to the `commit`. Otherwise a conflict
   * result is returned.
   */
  private[this] def commit(expected: StreamRevision, event: PostEvent)(f: Commit[PostEvent] => Result): Result = {
    memoryImage.tryCommit(event.postId.toString, expected, event) match {
      case Left(conflict) => Conflict(todo())
      case Right(commit)  => f(commit)
    }
  }
As you can see, the commit method's parameter list was expanded to include the expected stream revision and a callback used to render an HTTP response if the commit succeeds. If a conflict is detected, an HTTP 409 (Conflict) status code is returned instead. Giving the client detailed information on the conflict is not implemented yet, so we simply render a todo page.
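Wiring all this together at application startup might look like the following sketch (since the MemoryImage constructor is private, a companion apply is assumed here, as is an empty Posts() constructor):
// Build the memory image on top of the (fake) event store, projecting
// PostEvents onto the Posts state using the update method shown below.
val eventStore = new FakeEventStore[PostEvent]
val memoryImage = MemoryImage(eventStore)(Posts()) { (posts, commit) =>
  commit.events.foldLeft(posts) { (posts, event) => posts.update(event, commit.streamRevision) }
}
val postsController = new PostsController(memoryImage)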
The controller actions that generate events are all related to HTTP POST requests and need to be adjusted to handle the new expected stream revision parameter and the conflict response. Since only the client knows the revision used to render the page by the time the POST request is submitted, the expected stream revision is added as a new URL parameter. Listed below are the show and submit actions related to the "edit post" page:
def show(id: PostId) = Action { implicit request =>
  posts().get(id) match {
    case Some(post) => Ok(views.html.posts.edit(post.id, post.revision, postContentForm.fill(post.content)))
    case None       => NotFound(notFound(request, None))
  }
}

def submit(id: PostId, expected: StreamRevision) = Action { implicit request =>
  postContentForm.bindFromRequest.fold(
    formWithErrors => BadRequest(views.html.posts.edit(id, expected, formWithErrors)),
    postContent =>
      commit(expected, PostEdited(id, postContent)) { commit =>
        Redirect(routes.PostsController.show(id)).flashing("info" -> "Post saved.")
      }
  )
}
Compared to the previous version there are two main changes: the show action now passes the post's current stream revision to the edit form template, and the submit action takes the expected stream revision as an additional parameter, which it passes on to the commit method.
Since Play! 2 view templates and URLs are strongly typed, it is also impossible to forget to pass this additional expected revision parameter, something that can easily happen with many other solutions, such as hidden "version" form fields.
Since we need to know the current revision of a blog post when rendering the edit or delete actions, we need to keep track of it in our model classes. This is done by adding a new field to the Post class and setting it based on the stream revision associated with the event:
def update(event: PostEvent, revision: StreamRevision): Posts = event match {
  case PostAdded(id, content) =>
    this.copy(byId = byId.updated(id, Post(id, revision, content)), orderedByTimeAdded = orderedByTimeAdded :+ id)
  case PostEdited(id, content) =>
    this.copy(byId = byId.updated(id, byId(id).copy(revision = revision, content = content)))
  case PostDeleted(id) =>
    this.copy(byId = byId - id, orderedByTimeAdded = orderedByTimeAdded.filterNot(_ == id))
}
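The Post class itself presumably ends up along these lines (a sketch based on the constructor calls above):
// Each post remembers the stream revision of the last commit applied to it,
// so views can render the expected revision into edit and delete URLs.
case class Post(id: PostId, revision: StreamRevision, content: PostContent)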
This concludes our tour of the code. There were only minor changes to the code related to the application functionality, as most of the code is related to introducing the event store as a piece of application infrastructure.
Introducing the fake event store causes about a 3%-10% drop in performance compared to having no event store at all. Writes were affected the most, while read performance is almost the same. This is to be expected, since writes now use a background thread to update the memory image, while reads only add a quick check to the memory image to see if it is up-to-date with respect to the event store. Of course, we’re still not persisting the events to durable storage so we are mostly CPU bound. Writing to disk will obviously introduce new bottlenecks and we’ll go into a more detailed performance analysis in the next part.
With the introduction of the fake event store implementation our application’s architecture is now in place. The fake event store has the same behavior as a real event store (except for durability) and is very useful in combination with automated testing. The tests for the fake event store will also be used to check that the real event store behaves correctly. No architectural changes will be needed to our application when this new event store is available.
In the next part we’ll use Redis to implement an event store that will actually write the events to durable storage and will be used to replay the events to restore the memory image when the application server is started.
Footnotes:
1. Primarily by defining various database constraints and by using row-level optimistic locking. These can help detect and prevent consistency problems, but do not help much with resolving these conflicts, something we'll come back to in a later part of this series.
2. Since we do not expect to have many concurrent subscriptions to a single event store from a single JVM, threads are perfectly fine and do not limit scalability or performance. If you need many concurrent subscribers (for example, to push events to clients) then you should probably put an event bus or actor between the event store subscription and the clients.