In the previous part we developed a fake event store for our blogging application. This event store just kept all events in memory, making it unsuitable for production use. But it did allow us to adapt our application to using an event store, and let us work out the details of the event store interface without having to worry about an actual persistence mechanism.
In this part we’ll develop an event store implementation on top of Redis, a key-value store with support for additional data structures (such as lists and hashes), publish/subscribe, and transactions. If you’re more interested in adding new application functionality, you can safely skip this part and go to part 4 – conflict resolution.
So we already have an event store specification and a reference implementation. Implementing a Redis-based event store should be pretty straightforward, right? Well, it mostly is, but before we dive into the details let’s take a look at what we’ll be able to do once everything is up and running.
First, make sure you have a working Redis installation (you can use the redis-cli command to connect to Redis and check that everything is working). Redis 2.6 introduced support for server-side Lua scripts, which the event store implementation uses automatically if available. Otherwise, the event store falls back to using the WATCH/MULTI/EXEC commands, so Redis 2.4 also works, although with a performance penalty when it comes to committing events.
If you use Mac OS X with Homebrew you can simply run brew install redis to install 2.4.15, or brew install redis --devel to install 2.6.0-RC5.
Now check out the part-3 branch of the project, adapt conf/application.conf to match your Redis configuration, and start the application using play run or sbt run. Any blog posts you add, edit, or delete will now have the resulting events committed to Redis. If you restart the application the events are replayed on start-up, so you will no longer lose your data.
Start the Scala console (play console or sbt console) and paste the following Scala code (adjust the connection settings as needed).
import events._, eventstore._
val es = redis.RedisEventStore[PostEvent]("blog", "localhost", 6379)
es.publisher.subscribe(StoreRevision.Initial)(println)
This code connects to the event store and subscribes the println method. You’ll see that all historical commits are printed first, and as you use the application you’ll notice that new commits are printed immediately as well. This is one of the major differences with more traditional database-backed systems: an event sourced system is open for extension and can communicate to the outside world what is going on right now, while a database-based system is usually much more of a black box.
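When you are done with the console, close the event store so the process can terminate. The close call below is an assumption based on the closing behavior described later in this part:

es.close() // assumed close method; stops the subscription and releases connections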
Now start a second instance of the application (play 'run 9001' or sbt 'run 9001'). This second instance also connects to the event store to replay and subscribe to the events being committed. The memory images of both instances stay synchronized as you make changes, while the Scala console keeps printing events as they are committed (make sure you close the event store before exiting the console, otherwise the process stays connected and will fail to terminate). The picture below shows the general setup:
So by using the Redis event store we can run multiple instances of the application, each running on a different server. Each commit from any of the application servers is pushed to all connected servers. This provides fault-tolerance and scalability at the application server level, and also makes it possible to upgrade application servers without affecting users (by doing rolling upgrades).
When data is stored in an external system we need to define some kind of serialization format. There is a wide variety of serialization formats and libraries available. For this example application we’ll use JSON. This may not be the most compact or fastest format, but it is easy to use and read, which is great for this example. Play! defines the Format trait for JSON serialization. We just have to provide an implementation of this trait for each data type that we want to serialize.
Although Play! predefines various JSON format implementations for many of the standard Scala classes, it does not include any helpers to make it easy to define formats for our own types. But it is easy to supply our own helpers, which you can find in the JsonMapping object.
The objectFormat methods are used to define the JSON format for case classes such as PostContent:
implicit val PostContentFormat: Format[PostContent] =
  objectFormat("author", "title", "body")(PostContent.apply)(PostContent.unapply)
Here we simply list the JSON field names and the Scala generated apply and unapply methods to construct and deconstruct PostContent instances. Formats for class hierarchies like PostEvent are a bit more difficult to get right, but easy enough to define with the typeChoiceFormat helper:
implicit val PostEventFormat: Format[PostEvent] = typeChoiceFormat(
  "PostAdded"   -> objectFormat("postId", "content")(PostAdded.apply)(PostAdded.unapply),
  "PostEdited"  -> objectFormat("postId", "content")(PostEdited.apply)(PostEdited.unapply),
  "PostDeleted" -> objectFormat("postId")(PostDeleted.apply)(PostDeleted.unapply))
This may not be quite as convenient as using a reflection-based serialization library, but it avoids having to deal with various JVM class loader problems and other reflection-related troubles.
The formats are defined as implicit values in the companion objects of the classes that we want to serialize, so that the Scala compiler can find these formats and automatically pass them as parameters where needed. (Yes, the type “checker” is filling in the blanks for you.)
Writing tests for correct serialization and deserialization is rather tedious, but very important. Fortunately, ScalaCheck makes writing these kinds of tests almost trivial:
"Post events" should {
"convert to and from JSON" in forAll(eventsForMultiplePosts.arbitrary) { events =>
Json.fromJson[List[PostEvent]](Json.toJson(events)) must_== events
}
"parse example Post Added event" in {
val event = PostAdded(PostId(UUID.fromString("5ab11526-477b-43b9-8fe6-4bb25a3dfcc6")), PostContent(author = "Author", title = "Title", body = "Body"))
val json = """{"type":"PostAdded","data":{"postId":"5ab11526-477b-43b9-8fe6-4bb25a3dfcc6","content":{"author":"Author","title":"Title","body":"Body"}}}"""
Json.fromJson[PostEvent](Json.parse(json)) must_== event
}
}
For the serialization tests I typically test using a few examples, but the main testing happens by defining the property fromJson(toJson(events)) == events. ScalaCheck will randomly generate data to verify that this property holds, so we can be confident serialization works as expected.
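The eventsForMultiplePosts generator comes from the project’s test code. As a rough illustration of the idea, a simpler generator for individual post events could look like this (the names are made up, and a recent ScalaCheck version is assumed for Gen.uuid):

import org.scalacheck.Gen

// Illustrative only: random post contents and events for round-trip testing.
val genPostContent: Gen[PostContent] = for {
  author <- Gen.alphaStr
  title  <- Gen.alphaStr
  body   <- Gen.alphaStr
} yield PostContent(author, title, body)

val genPostEvent: Gen[PostEvent] = for {
  id      <- Gen.uuid.map(PostId(_)) // a fresh post id per generated event
  content <- genPostContent
  event   <- Gen.oneOf(PostAdded(id, content), PostEdited(id, content), PostDeleted(id))
} yield event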
The Redis event store implementation uses the Jedis client library. For reading and committing we use a JedisPool connection pool. To use a connection and return it to the pool we’ve implemented the loan pattern in the withJedis method:
class RedisEventStore[Event](name: String, host: String, port: Int, config: Config)
    (implicit val eventFormat: Format[Event])
    extends EventStore[Event] {

  val jedisPool = new JedisPool(config, host, port)

  def withJedis[A](f: Jedis => A): A = {
    val jedis = jedisPool.getResource
    try {
      f(jedis)
    } finally {
      jedisPool.returnResource(jedis: BinaryJedis)
    }
  }

  // [...]
}
To store the commits we’ll use two Redis data structures:

- A hash mapping each commit id (a sequence number starting at 1) to the serialized commit. The key of this hash is the store name followed by ":commits" (for example, blog:commits).
- A list per event stream containing the commit ids of that stream’s commits, in order. The length of this list is the current stream revision.

This means that if we want to read all past commits we simply need to read each hash value starting with commit id 1 up to the commit id equal to the size of the hash. The code for this can be found in the reader implementation of RedisEventStore.scala:
override object reader extends CommitReader[Event] {
  override def storeRevision = withJedis { jedis =>
    StoreRevision(jedis.hlen(CommitsKey))
  }

  override def readCommits(since: StoreRevision, to: StoreRevision) = {
    val current = storeRevision
    if (since >= current) Stream.empty else {
      val revisionRange = (since.value + 1) to (to.value min current.value)
      doReadCommits(revisionRange.map(_.toString))
    }
  }
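You can also inspect these structures directly. Here is a quick sketch using Jedis from the Scala console; the key name assumes the store was created with the name "blog", as in the earlier console example:

import redis.clients.jedis.Jedis

val jedis = new Jedis("localhost", 6379)
val commitCount = jedis.hlen("blog:commits")      // current store revision
val firstCommit = jedis.hget("blog:commits", "1") // serialized JSON of the first commit
jedis.disconnect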
The current store revision is equal to the size of the commits hash (HLEN). The commits are all read in order using the doReadCommits method:
val KeyPrefix = name + ":"
val CommitsKey: String = KeyPrefix + "commits"

def doReadCommits(commitIds: Seq[String]): Stream[Commit[Event]] = {
  val chunks = commitIds.grouped(ChunkSize).map(_.toArray)
  chunks.flatMap { chunk =>
    val serializedCommits = withJedis { _.hmget(CommitsKey, chunk: _*) }
    serializedCommits.asScala.par.map(deserializeCommit)
  }.toStream
}

def deserializeCommit(serialized: String) =
  Json.fromJson[Commit[Event]](Json.parse(serialized))
The list of commit ids to read is first split into chunks, each at most ChunkSize (10,000) elements in size. Each chunk is then read using a single get-multiple-values-from-hash (HMGET) command. By reading many commits at a time we greatly reduce the number of process context switches. The commits are then deserialized using the deserializeCommit method, which is just a simple wrapper around the Play! JSON deserializer. Since deserialization is a CPU intensive process we use Scala’s parallel collections (par) to get a nice speed-up on a multi-core machine.
The resulting commits are then transformed into a lazy Stream. This way we do not have to read all the commits into memory at once, but can read them on demand, one chunk at a time.
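Because of this laziness a consumer can process the entire history with constant memory. For example, counting all events in the store, using the es store from the earlier console example (a small sketch, assuming the Commit class exposes its events as events, consistent with the commit construction shown later):

val reader = es.reader
val eventCount = reader
  .readCommits(StoreRevision.Initial, reader.storeRevision)
  .map(_.events.size)
  .sum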
There are two implementations of the EventCommitter interface. The RedisWatchMultiExecEventCommitter uses the Redis WATCH/MULTI/EXEC commands and is compatible with Redis 2.4 and higher, but it’s slower and more complicated than the RedisLuaEventCommitter that we’ll describe below.
The RedisLuaEventCommitter uses a Lua script to atomically commit events in Redis. The script is fairly similar to the commit implementation of the FakeEventStore:
trait RedisLuaEventCommitter[Event] { this: RedisEventStore[Event] =>
  val TryCommitScript: String = """
    | local commitsKey = KEYS[1]
    | local streamKey = KEYS[2]
    | local timestamp = tonumber(ARGV[1])
    | local streamId = ARGV[2]
    | local expected = tonumber(ARGV[3])
    | local events = ARGV[4]
    |
    | local actual = tonumber(redis.call('llen', streamKey))
    | if actual ~= expected then
    |   return {'conflict', tostring(actual)}
    | end
    |
    | local storeRevision = tonumber(redis.call('hlen', commitsKey))
    | local commitId = storeRevision + 1
    | local commitData = string.format('{"storeRevision":%d,"timestamp":%d,"streamId":%s,"streamRevision":%d,"events":%s}',
    |   commitId, timestamp, cjson.encode(streamId), actual + 1, events)
    |
    | redis.call('hset', commitsKey, commitId, commitData)
    | redis.call('rpush', streamKey, commitId)
    | redis.call('publish', commitsKey, commitData)
    |
    | return {'commit', tostring(commitId)}
    """.stripMargin

  // [...]
}
The RedisLuaEventCommitter must be mixed into the RedisEventStore class, so we specify this using a self type (line 1). This gives us access to all RedisEventStore methods.
The first section of the Lua script reads the command arguments (lines 3-8). The second section checks if the expected stream revision matches the actual stream revision. If not, a conflict is returned (lines 10-13).
Otherwise the current store revision is determined. The commit id is set to the next store revision and the serialized commit JSON is generated (lines 15-18). Then three commands are executed:

- The serialized commit is stored in the commits hash, keyed by its commit id (line 20).
- The commitId is appended to the commits of the affected stream (line 21).
- The serialized commit is published to all subscribers (line 22).

When starting the event store the above Lua script is uploaded to Redis (using SCRIPT LOAD) and the resulting script identifier (the SHA-1 hash of the script contents) is saved:
val TryCommitScriptId = withJedis { _.scriptLoad(TryCommitScript) }
The implementation of tryCommit is now quite straightforward. We simply need to invoke the Lua script using the EVALSHA command and translate the results to Scala:
object committer extends EventCommitter[Event] {
  override def tryCommit(streamId: String, expected: StreamRevision, event: Event): CommitResult[Event] = {
    // Prepare parameters.
    val timestamp = DateTimeUtils.currentTimeMillis
    val serializedEvents = Json.stringify(Json.toJson(Seq(event))(Writes.seqWrites(eventFormat)))

    // Invoke Lua script.
    val response = withJedis { _.evalsha(TryCommitScriptId, 2,
      /* KEYS */ CommitsKey, keyForStream(streamId),
      /* ARGV */ timestamp.toString, streamId, expected.value.toString, serializedEvents)
    }

    // Parse response.
    try {
      response.asInstanceOf[java.util.List[_]].asScala match {
        case Seq("conflict", actual: String) =>
          val conflicting = reader.readStream(streamId, since = expected)
          Left(Conflict(streamId, StreamRevision(actual.toLong), expected, conflicting))
        case Seq("commit", storeRevision: String) =>
          Right(Commit(StoreRevision(storeRevision.toLong), timestamp, streamId, expected.next, Seq(event)))
      }
    } catch {
      case e: Exception =>
        throw new EventStoreException("Error parsing response from Redis TryCommit script: " + response, e)
    }
  }
}
Redis has built-in support for publish/subscribe, which we will use to notify each subscriber when new events are committed. But before we actually subscribe to Redis, we first need to replay historical commits. The reason not to subscribe before replaying is that replaying might take some time (if you have many historical commits) and we do not want any new commits to queue up while we’re still processing the older commits:
override object publisher extends CommitPublisher[Event] {
  import reader._

  override def subscribe(since: StoreRevision)(listener: CommitListener[Event]): Subscription = {
    @volatile var cancelled = false
    @volatile var last = since
    val unsubscribeToken = UUID.randomUUID.toString

    executor.execute(new Runnable {
      private def replayCommitsTo(to: StoreRevision) {
        if (last < to) {
          Logger.info("Replaying commits since " + last + " to " + to)
          readCommits(last, to).takeWhile(_ => !closed && !cancelled).foreach(listener)
          last = to
        }
      }

      // [...]
    })

    // [...] the returned Subscription sets the cancelled flag and publishes
    // the unsubscribeToken on the control channel when cancelled.
  }
}
The last variable contains the store revision of the last commit we passed to the listener so far. It is initialized to the since parameter. We then start a new thread to perform the actual commit replay and Redis subscription. This is consistent with our fake event store implementation.
The cancelled flag is used to help with unsubscribing when the subscription is cancelled. In addition, the unsubscribeToken is sent to the subscriber so that it wakes up when required.
After replaying the historical commits (line 7 below) we perform the actual subscription using a new Redis connection (line 12):
override def run {
  val currentRevision = storeRevision
  if (last > currentRevision) {
    Logger.warn("Last " + last + " is in the future, resetting it to current " + currentRevision)
    last = currentRevision
  } else {
    replayCommitsTo(currentRevision)
  }

  val jedis = new Jedis(host, port)
  try {
    jedis.subscribe(Subscriber, ControlChannel, CommitsKey)
  } finally {
    jedis.disconnect
  }
}
The Subscriber object implements the JedisPubSub interface:
object Subscriber extends JedisPubSub {
When the subscription is confirmed we check if we missed any commits that happened while we were performing the initial replay, but before completing the subscription:
override def onSubscribe(channel: String, subscribedChannels: Int) = channel match {
  case ControlChannel =>
    // We may have missed the cancellation token while subscribing, so check the flag.
    if (closed || cancelled) unsubscribe
  case CommitsKey =>
    // We may have missed some commits while subscribing, so replay missing if needed.
    replayCommitsTo(storeRevision)
  case _ =>
    Logger.warn("message received on unknown channel '" + channel + "'")
}
Once we’re subscribed we’ll receive a notification for each commit:
override def onMessage(channel: String, message: String) = channel match {
  case ControlChannel =>
    if (message == CloseToken || message == unsubscribeToken) {
      unsubscribe
    }
  case CommitsKey =>
    val commit = deserializeCommit(message)
    if (last.next < commit.storeRevision) {
      Logger.warn("missing commits since " + last + " to " + commit.storeRevision + ", replaying...")
      replayCommitsTo(commit.storeRevision)
    } else if (last.next == commit.storeRevision) {
      listener(commit)
      last = commit.storeRevision
    } else {
      Logger.warn("Ignoring old commit " + commit.storeRevision + ", since we already processed everything up to " + last)
    }
  case _ =>
    Logger.warn("message received on unknown channel '" + channel + "'")
}
We deserialize and forward any new commit to the listener. In some cases we may receive old commits, which we ignore. If we missed any commits, we replay the missing ones. The control channel is monitored for event store closing or subscription cancellation messages.
This completes the implementation of the Redis event store. For full details, check the eventstore.redis package.
Now that we have multiple event store implementations we need to provide some configuration options. This is done in the controllers.Global object in the onStart method. The actual configuration can be found in the conf/application.conf file.
Since our event store exposes a blocking API (for ease of implementation) we also increase the Play! thread-pool sizes¹.
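For illustration only, such a thread-pool increase in a Play 2.0 application.conf might look roughly like the fragment below. The exact configuration keys differ between Play versions, so treat every key and value here as an assumption rather than the project’s actual settings:

# Illustrative sketch only: key names and structure vary between Play versions.
play {
  akka {
    actor {
      actions-dispatcher = {
        fork-join-executor {
          # Assumed pool bounds, not the project's actual values.
          parallelism-min = 64
          parallelism-max = 64
        }
      }
    }
  }
}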
The primary concern of an event store is to ensure events are written to durable storage, so no events get lost. To configure Redis for maximum reliability of written data we need to edit the redis.conf file to:

- Disable snapshotting by removing the save settings.
- Enable the append-only file (appendonly yes).
- Sync the append-only file to disk on every write with appendfsync always.
- Disable automatic rewriting of the append-only file (auto-aof-rewrite-percentage 0). We’ll only ever be adding new data to Redis, so rewriting will not result in a smaller append-only file anyway.
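Put together, the durability-related part of redis.conf then looks like this:

# Disable snapshotting (remove or omit all "save" lines).

# Enable the append-only file.
appendonly yes

# Sync the append-only file to disk on every write.
appendfsync always

# Disable automatic append-only file rewrites.
auto-aof-rewrite-percentage 0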
To test and compare the performance of the various event store implementations the following setup was used:
- The server: deler (Dutch for denominator), a 2010 dual-core 2.66GHz Core i7 with hyper-threading and a 7200-RPM HDD. The server was started using the start-server.sh script.
- The client: noemer (Dutch for numerator), a 2012 quad-core 2.6GHz Core i7 with hyper-threading. The benchmark was started using the run-benchmark.sh script with the following arguments: http://deler.local:9000 50 500000.

The server and client machine were directly connected with a 1 Gigabit ethernet cable and were otherwise idle. This setup ensures that neither the client nor the network was a bottleneck, so that we can fully test the performance of the server code. Both computers were running the Java SE 7u5 JDK.
The benchmark itself consists of a warmup phase (running 20,000 iterations of adding/listing/editing/reading/deleting blog posts) to ensure the server JVM has ample opportunity to optimize the generated code, and then 50 concurrent clients completed a total of 500,000 iterations (100,000 iterations for the Redis WATCH/MULTI/EXEC event store implementation) of benchmarking. Note that this is a full-stack benchmark, including the network, HTTP protocol, request routing, the event store, and template rendering.
The results are summarized below (first number is total iterations per second, second number is the 99th percentile latency in milliseconds):
| Event store | None | Fake | Redis Lua | Redis WME |
|---|---|---|---|---|
| Iterations | 500,000 | 500,000 | 500,000 | 100,000 |
| Add post (POST) | 5330.5/s (47 ms) | 5154.2/s (59 ms) | 2617.4/s (64 ms) | 583.7/s (762 ms) |
| Read post (GET) | 8284.1/s (31 ms) | 8176.1/s (32 ms) | 6755.8/s (36 ms) | 6641.7/s (38 ms) |
| Edit post (GET-POST-GET) | 2145.0/s (55 ms) | 1845.1/s (64 ms) | 1178.7/s (96 ms) | 359.5/s (511 ms) |
| List posts (GET) | 1290.8/s (98 ms) | 1259.6/s (147 ms) | 1174.7/s (155 ms) | 1165.5/s (150 ms) |
| Java memory usage | 700 MB | 1750 MB | 700 MB | 700 MB |
| Redis memory usage | N/A | N/A | 958 MB | 958 MB |
As you can see there is only a slight difference between having no event store at all and the fake event store. The switch to the Lua-based Redis event store causes a much bigger change. Committing events now requires us to wait for a disk write, and also adds the overhead of process switching and commit serialization and deserialization. Still, the performance is not bad at all. When falling back to the WATCH/MULTI/EXEC commands, commit performance drops again. With the Lua implementation Redis was still able to write multiple commits to disk using a single fsync²; without Lua, each commit needs to wait for a full fsync to complete due to the use of optimistic locking.
When a more complex page is rendered (listing the last 20 blog posts) the performance differences become negligible. It seems the bottleneck here is the speed at which we can render the page. I believe the Play! 2.0 template engine implementation could use a bit more optimization here.
After completing a benchmark run there are 1,060,000 events in the event store, almost 1 Gigabyte of data. How much time does it take to replay these events on startup? On deler it takes about 26 seconds to fully restore the memory image from the committed events (about 40,500 commits per second). Noemer is more than twice as fast, since it has more and faster processors to perform event deserialization, the primary bottleneck.
Now that we have a working event store implementation our application could go into production, except for the obvious lack of features. There are still some enhancements that could be made to the event store (the footnotes below suggest a couple), but for now this implementation is good enough.
If you feel that we needed to do a lot of work to get to this point, you’re partly right. But we did gain complete control and understanding of the basic infrastructure of our application! Compare this with having to understand SQL, database schemas, JDBC, your favorite Object-Relational Mapper, transaction managers, etc.
Of course, it still remains to be seen how easy or hard it is to add new application functionality. So in the next parts we’ll focus on adding new features, and some of the problems (with solutions) that you will quickly encounter when using event sourcing.
Footnotes:

1. A good exercise is to modify the event store’s tryCommit method to return the result asynchronously. The implementation can then use a dedicated thread pool for performing the actual commits, leaving the Play! action thread pool available for handling web requests. ↩
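A minimal sketch of what this could look like, assuming Scala 2.10+ for scala.concurrent.Future (the trait name, method name, and pool size are made up for illustration):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

trait AsyncCommitting[Event] { this: RedisEventStore[Event] =>
  // Dedicated pool for the blocking commits, keeping Play's action threads free.
  private val commitPool = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

  def tryCommitAsync(streamId: String, expected: StreamRevision, event: Event): Future[CommitResult[Event]] =
    Future(committer.tryCommit(streamId, expected, event))(commitPool)
}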
2. Another good exercise is to implement our own write-combining of commits. This should be pretty straightforward with the Lua event store implementation. All you need is a single, dedicated thread that continuously gathers all pending commits and sends these to Redis. Redis pipelining and the disruptor are a perfect match for this! ↩
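A rough sketch of such a gathering loop (all names are illustrative; a real implementation would also route each commit’s result back to its caller, for example via a promise per pending commit):

import java.util.concurrent.LinkedBlockingQueue
import scala.collection.JavaConverters._

case class PendingCommit[Event](streamId: String, expected: StreamRevision, event: Event)

class CommitBatcher[Event](sendBatch: Seq[PendingCommit[Event]] => Unit) {
  private val queue = new LinkedBlockingQueue[PendingCommit[Event]]

  def submit(commit: PendingCommit[Event]): Unit = queue.put(commit)

  private val worker = new Thread(new Runnable {
    override def run(): Unit = while (true) {
      val batch = new java.util.ArrayList[PendingCommit[Event]]
      batch.add(queue.take())  // block until at least one commit is pending
      queue.drainTo(batch)     // then gather everything else that queued up
      sendBatch(batch.asScala) // send the whole batch, e.g. in one Redis pipeline
    }
  })
  worker.setDaemon(true)
  worker.start()
}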