After our deep dive into a Redis event store implementation we’re now getting back to actually adding functionality to the blogging application. Like the Getting started with Rails guide we’ll add the capability to add comments to blog posts. Adding this functionality is straightforward, but it will require us to look into resolving conflicts when multiple people make modifications to the same blog post or comment concurrently.
To add the new functionality to our application we will first define the new events and supporting data types. Notice that by focusing on the events we are actually thinking about the behaviour of the application first, instead of just the data model. In such a simple application as this blogging application it’s a rather subtle distinction, but when your application is more complicated events can help you to get a good understanding of what your application is supposed to do. Here are the event definitions for adding and deleting comments:
{% highlight scala linenos %} case class CommentId(value: Int) object CommentId { implicit val CommentIdFormat: Format[CommentId] = valueFormat(apply)(unapply) implicit val CommentIdOrdering: Ordering[CommentId] = Ordering.by(_.value) }
case class CommentContent(commenter: String, body: String)
// [...]
sealed trait PostCommentEvent extends PostEvent { def commentId: CommentId } case class CommentAdded(postId: PostId, commentId: CommentId, content: CommentContent) extends PostCommentEvent case class CommentDeleted(postId: PostId, commentId: CommentId) extends PostCommentEvent
object PostEvent { // [...]
implicit val CommentContentFormat: Format[CommentContent] = objectFormat(“commenter”, “body”)(CommentContent.apply)(CommentContent.unapply)
implicit val PostEventFormat: Format[PostEvent] = typeChoiceFormat( "PostAdded" -> objectFormat("postId", "content")(PostAdded.apply)(PostAdded.unapply), "PostEdited" -> objectFormat("postId", "content")(PostEdited.apply)(PostEdited.unapply), "PostDeleted" -> objectFormat("postId")(PostDeleted.apply)(PostDeleted.unapply), "CommentAdded" -> objectFormat("postId", "commentId", "content")(CommentAdded.apply)(CommentAdded.unapply), "CommentDeleted" -> objectFormat("postId", "commentId")(CommentDeleted.apply)(CommentDeleted.unapply)) } {% endhighlight %}
Since comments are always part of a blog post, we’ll can use a simple sequential comment identifier. The first comment of a post will have id
CommentId(1)
, the second one CommentId(2)
, etc. Obviously, we could also generate UUIDs for comments (and it would actually simplify the code),
but by working with a sequential identifier we’ll slowly introduce some "domain" logic into our example application. We’ll also use CommentId
as a
key in a sorted map, so we need to define the Ordering (line 4), which is
based on the underlying numeric value.
CommentContent
is a simple container for the name of the commenter and the comment text.
The CommentAdded
and CommentDeleted
events are both subtypes of the PostCommentEvent
trait, which in turn extends PostEvent
. This makes it
easier to distinguish comment related events which will be useful for resolving conflicts automatically.
To store the events in the event store, we also define and extend the
necessary Format
instances.
Now that we have defined the new events, we can adjust our view models to keep track of comments as part of the post class:
case class Post(
id: PostId,
revision: StreamRevision,
content: PostContent,
nextCommentId: CommentId = CommentId(1),
comments: SortedMap[CommentId, CommentContent] = SortedMap.empty)
case class Posts(byId: Map[PostId, Post] = Map.empty, orderedByTimeAdded: Seq[PostId] = Vector.empty) {
// [...]
def update(event: PostEvent, revision: StreamRevision): Posts = event match {
// [...]
case CommentAdded(id, commentId, content) =>
modify(id) { post =>
post.copy(
revision = revision,
nextCommentId = CommentId(commentId.value + 1),
comments = post.comments.updated(commentId, content))
}
case CommentDeleted(id, commentId) =>
modify(id) { post =>
post.copy(
revision = revision,
comments = post.comments – commentId)
}
}
private[this] def modify(id: PostId)(f: Post => Post) =
this.copy(byId = byId.updated(id, f(byId(id))))
}
The Post
class is modified to keep track of the next available CommentId
(starting at 1) and also tracks the comments, using a SortedMap
1
from CommentId to CommentContent. Remember that the representation of the view model is not tied to any kind of database schema, so we can easily
change it whenever we need to. The contents of the memory will be automatically rebuild whenever we restart the application!
The update
method also has to be changed to match against the new events and update the Post
class accordingly. Updating nested, immutable data
structures is a bit more involved than the mutable equivalent, so we use a simple helper method to take care of the first two levels of nesting. There
are more general solutions (PDF) to this problem, but for now the modify
method
will do.
Next we add the required routes to conf/routes:
POST /posts/:postId/comments ⏎
controllers.PostsController.comments.add(postId: PostId, r: StreamRevision)
POST /posts/:postId/comments/:commentId/delete ⏎
controllers.PostsController.comments.delete(postId: PostId, r: StreamRevision, commentId: CommentId)
Listing, deleting, and adding comments is all part of the show.scala.html template.
@if(post.comments.nonEmpty) {
## Comments
@for((commentId, commentContent) <- post.comments) {
<div style="display: inline-block;">
<form class="form-inline" style="display: inline-block;" action="@routes.PostsController.comments.delete(post.id, post.revision, commentId)" method="POST">
<fieldset><button>×</button></fieldset>
</form>
@commentContent.body — _by @commentContent.commenter_
</div>
}
}
## Add a comment:
@helper.form(action = routes.PostsController.comments.add(post.id, post.revision)) {
@globalErrorsPanel(form)
@conflictsMessagePanel(conflicts)
<fieldset>@helper.inputText(form("commenter"), ‘_label -> "Commenter", ‘required -> "required")
@helper.textarea(form("body"), ‘_label -> "Body", ‘cols -> 80, ‘rows -> 10)
</fieldset>
<fieldset><button class="btn btn-primary">Submit</button></fieldset>
}
As you can see, the template is quite straightforward. The only thing that might be unfamiliar is the invocation of the conflictsMessagePanel
template, which we’ll get back to later.
Finally, we need to implement two new controller methods, which can be found in
the comments singleton object
inside of PostsController
:
{% highlight scala linenos %} // [...]
private[this] def withPost(postId: PostId)(found: Post => Result)(implicit request: Request[_]) = { posts().get(postId).map(found).getOrElse(notFound(request)) }
object comments { val commentContentForm = Form(mapping( "commenter" -> trimmedText.verifying(minLength(3)), "body" -> trimmedText.verifying(minLength(3)))(CommentContent.apply)(CommentContent.unapply))
def add(postId: PostId, expected: StreamRevision) = Action { implicit request => withPost(postId) { post => commentContentForm.bindFromRequest.fold( formWithErrors => BadRequest(views.html.posts.show(post, formWithErrors)), commentContent => commit(expected, CommentAdded(postId, post.nextCommentId, commentContent))( onCommit = Redirect(routes.PostsController.show(postId)).flashing("info" -> "Comment added."), onConflict = (actual, conflicts) => Conflict(views.html.posts.show(post, commentContentForm.fill(commentContent), conflicts)))) } }
def delete(postId: PostId, expected: StreamRevision, commentId: CommentId) = Action { implicit request => withPost(postId) { post => def deletedResult = Redirect(routes.PostsController.show(postId)).flashing("info" -> "Comment deleted.") post.comments.get(commentId) match { case None => deletedResult case Some(comment) => commit(expected, CommentDeleted(postId, commentId))( onCommit = deletedResult, onConflict = (actual, conflicts) => Conflict(views.html.posts.show(post, commentContentForm, conflicts))) } } } } {% endhighlight %}
The code is a bit more complicated than the basic post actions, since adding and deleting comments requires the presence of a blog post instance. The
withPost
helper method is used to read the required post from the memory image, or render a 404 Not Found
result if the post is not present.
In the case of the add
method we then validate the submitted form (line 14). If incorrect, we rerender the page with the error messages (line
15). Otherwise we commit a new CommentAdded
event using the provided content and the next available comment id (line 17). If the commit succeeds
without conflict, we redirect the user to the blog post with the added comment (line 18). In case there is a conflict, we rerender the form but add
some information on the conflicts that occurred (line 19). The delete action is very similar.
This basically completes the addition of some new functionality. The effort required is comparable to a traditional database backed application, which is good to know. But there is one fly in the ointment we need to fix before we can call it a day...
Now that we can add comments to post you’ll soon discover that multiple concurrent users will quickly get a conflict. In part 2 we added conflict detection and rendered a placeholder page whenever a conflict occurred. Now that conflicts are more likely to occur, we need to be a bit smarter about resolving these conflicts.
The basic idea is that comment events usually do not conflict, even when they apply to the same post and therefore the same event stream. We can capture this knowledge in a method like this:
{% highlight scala linenos %} def conflictsWith(committed: PostEvent, attempted: PostEvent) = (committed, attempted) match { case (a: PostCommentEvent, b: PostCommentEvent) => a.commentId == b.commentId case (_: PostCommentEvent, _) => false case _ => true } {% endhighlight %}
This method states that any two PostCommentEvent
s only conflict when they affect the same comment (line 3). Furthermore, any new blog post related
event does not conflict with an already committed PostCommentEvent
(line 4). This allows the author to edit the blog post without getting conflicts
on added or removed comments. Any other event combination is considered to conflict (line 5). So if you add a comment while someone edited the post,
the system will give you a warning and ask you to resubmit your comment, if it is still applicable.
Notice that the main goal of this method is to decide whether we should ask the user for confirmation when a conflict occurs, or whether we should just proceed with the commit even though changes were made while the user was working on their request. This is rather subjective and the details will vary depending on your domain, your users, etc.
So now that we have a way to decide if two events conflict or not we need to modify our commit method to take this into account. This method is
defined in the PostsController
:
{% highlight scala linenos %}
/**
* Commits an event and applies it to the current state. If successful the
* provided onCommit
callback is invoked and its result returned. Otherwise
* the onConflict
is callback is invoked and its result returned.
*/
private[this] def commit
(expected: StreamRevision, event: PostEvent)
(onCommit: => Result,
onConflict: (StreamRevision, Seq[PostEvent]) => Result): Result = {
def resolveConflict(committed: Seq[PostEvent], attempted: PostEvent) = {
val conflicting = committed.filter(PostEvent.conflictsWith(_, attempted))
if (conflicting.isEmpty) Right(attempted)
else Left(conflicting)
}
@tailrec def run(expected: StreamRevision, event: PostEvent): Result = memoryImage.tryCommit(event.postId.toString, expected, event) match { case Right(commit) => onCommit case Left(conflict) => resolveConflict(conflict.conflicting.flatMap(_.events), event) match { case Right(event) => run(conflict.actual, event) case Left(conflicting) => onConflict(conflict.actual, conflicting) } }
run(expected, event) } {% endhighlight %}
The new commit
method implementation first defines a resolveConflict
helper method (line 10), which takes a list of already committed, potentially
conflicting events and uses the PostEvent.conflictsWith
method to see if there are any real conflicts. If there are none, the attempted event is
returned. Otherwise the conflicting events are returned.
The run
method (line 16) runs in a (tail-recursive) loop. It tries to commit against the expected revision of the event stream. If there is no
conflict, it returns the result of the provided onCommit
callback. Otherwise it tries to resolve the conflicts. If this succeeds, it tries invokes
itself again2, but now with the latest known event stream revision as the expected revision. If the conflict cannot be resolved, the result of the
onConflict
callback is returned instead.
Finally, line 27 simply kicks off the entire process using the provided revision and event.
With this in place actual conflicts should be quite rare. But we can still do better than just showing a generic "there was a conflict" error page. This is the job of the conflictsMessagePanel.scala.html template. This template shows a human readable version of the conflicts that occurred:
Besides adding support for conflict resolution, the implementation of the blog post comment functionality was quite straightforward. In a blogging application conflicts are probably quite rare, so it may not make sense to build in extensive UI support for this, but having this as an example hopefully gives you some idea of what is possible. In more collaborative applications this kind of functionality is much more interesting and you may prefer to immediately push updates directly to the client, instead of waiting for the client to submit a form. An example of this is Pivotal Tracker or Apache Wave.
If your application has extreme availability requirements, similar conflict resolution can also help you deal with recovering from network partitioning. Or applications that need to be able to run in disconnected mode. In these cases you will need to write an event stream function that merges divergent event streams. Version control systems are examples of this, and can be a source of inspiration, although they usually don’t have intention revealing events to work with.
But the main point is that the level of conflict resolution you need depends on your users and your application. Event sourcing gives you a great tool to make conflicts easier to resolve, without necessarily complicating your application if you do not need this kind of functionality.
In the next part we’ll take a look at another kind of concurrency conflict that can occur in the current application when two or more users committing events to the same event stream nearly simultaneously.
Footnotes: