Event Stream as a Message Queue

I was recently in a discussion about a system being built on Microsoft Azure.  Some of the concepts being discussed for this system were CQRS, Event Sourcing and Message Queues.

The diagram below is fairly typical when discussing CQRS and Event Sourcing.

Message Queue

One of the first things that stood out to me was the use of the Message Queue and Azure Service Bus.

For this blog post, I want to focus on the Service Bus, which is used for the publish-subscribe pattern.  The domain emits events that are stored to the event stream and then published to the Service Bus.  Subscribers, such as Projections or other Bounded Contexts, will process these events.

Forgotten Option

There is nothing wrong with using a service bus to publish domain events.

However, one option seems to always be forgotten by developers who are new to CQRS and Event Sourcing:

Your event stream can be used as a message queue

Other bounded contexts or projections can query/poll your event storage to retrieve new events that have been persisted.

At regular intervals, the event consumer could poll your event storage requesting any number of new events based on the last event it processed.

The consumer would be required to keep track of the last event it processed so that it can continue, in order, from that point.  This provides some benefits, as the consumer does not have to be a continuously running process.
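Here is a minimal sketch of what that polling could look like.  Everything in it is an assumption for illustration: StoredEvent, IEventStorage, InMemoryEventStorage and PollingConsumer are hypothetical names, not part of any real library, and the in-memory storage only stands in for whatever you actually store your events in.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event envelope: Position is the event's sequence number in the stream.
public class StoredEvent
{
    public StoredEvent(long position, string type, string data)
    {
        Position = position;
        Type = type;
        Data = data;
    }

    public long Position { get; }
    public string Type { get; }
    public string Data { get; }
}

// Hypothetical read-side contract for an event storage (an assumption, not a real API).
public interface IEventStorage
{
    IReadOnlyList<StoredEvent> ReadAfter(long position, int maxCount);
}

// In-memory stand-in so the sketch runs without a database.
public class InMemoryEventStorage : IEventStorage
{
    private readonly List<StoredEvent> _events = new List<StoredEvent>();

    public void Append(string type, string data)
    {
        _events.Add(new StoredEvent(_events.Count + 1, type, data));
    }

    public IReadOnlyList<StoredEvent> ReadAfter(long position, int maxCount)
    {
        return _events.Where(e => e.Position > position).Take(maxCount).ToList();
    }
}

public class PollingConsumer
{
    private readonly IEventStorage _storage;
    private readonly Action<StoredEvent> _handle;

    // Checkpoint: position of the last event handled. A real consumer would persist this.
    public long LastProcessedPosition { get; private set; }

    public PollingConsumer(IEventStorage storage, Action<StoredEvent> handle, long checkpoint = 0)
    {
        _storage = storage;
        _handle = handle;
        LastProcessedPosition = checkpoint;
    }

    // One polling pass: read events after the checkpoint, handle them in order,
    // and advance the checkpoint after each one. Returns the number handled.
    public int PollOnce(int batchSize = 100)
    {
        var batch = _storage.ReadAfter(LastProcessedPosition, batchSize);
        foreach (var e in batch)
        {
            _handle(e);
            LastProcessedPosition = e.Position;
        }
        return batch.Count;
    }
}
```

Calling PollOnce() from a timer or a scheduled job is enough to drive it.  Because the checkpoint is the only state the consumer holds, it can be stopped and started again later and simply pick up after the last position it saved.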

You may be thinking: Polling? Really?

If you rolled your own event storage, I could understand how this might be problematic, and it would likely be easier to use a service bus.  Or you may want your event consumers to process events as soon as possible.  How you handle this depends on how you are currently storing your events.

But the point still remains: Your event stream is a message queue.

As always, context is king.  Requirements and many other factors will play into how you want to handle messaging.

It is another option that may fit a scenario that you run into.

Event Store

If you are just thinking about getting into Event Sourcing, I would highly recommend looking at Event Store by Greg Young as your event storage.

Event Store supports multiple types of Subscriptions including Persistent Subscriptions for the Competing Consumers messaging pattern.

How to get started with CQRS & Event Sourcing

When I first heard about CQRS & Event Sourcing concepts through various blogs and videos, primarily from Greg Young and Udi Dahan, I wanted to apply it everywhere.

It seems really natural to want to take the limited knowledge we have about a new concept or technology and try to apply it to any problem.

Most of us know this is a terrible idea, but we are so tempted to push the concepts onto a problem that they just don’t fit.

I realize I’m grouping CQRS and Event Sourcing together in this post.  Generally, if you are applying Event Sourcing then you are doing CQRS.  The other way around isn’t always true.

The Problem

In the very beginning I had a difficult time seeing where CQRS and Event Sourcing were best applied.  Again, as beginners with our shiny new knowledge, we have our blinders on.

We look at any new problem and try to make it fit what we think CQRS and Event Sourcing solves.

There are some pretty good examples scattered around the internet of different situations in which CQRS and Event Sourcing have been applied.  You may be able to read these examples and get a better understanding of what the context was and why it was a good fit.  You may also read horror stories about how it was a complete failure and why.

To be honest, I haven’t read that many horror stories as I assume people are too afraid to admit their failures, although it would be very helpful to the community.

Feeling the Pain

I could read all kinds of blogs about where not to apply CQRS & Event Sourcing, but I need to feel the pain first hand to really understand where it’s best used.

I needed to apply the concepts I read about in a meaningful project.

For me, a meaningful project was a side project I worked on after hours at home that was large enough in scale that it wasn’t trivial.

The learning experience of failure (and success) gave me greater insights and a deeper understanding of the words I had previously read from others discussing CQRS & Event Sourcing.

Practice

Start using CQRS and Event Sourcing in a project that is meaningful to you.  Obviously, I’m not recommending it in a project at work.  Do it on a project where you can fail.  Feel the pain of applying these concepts where they don’t provide enough value or are cumbersome (hint: CRUD).

With real practice, your understanding matures and not everything will look like a nail with your shiny new hammer.

Clean up your Domain Model with Event Sourcing

I recently had a discussion with a developer who was new to Domain Driven Design, CQRS and Event Sourcing.  They were using Greg Young’s Simple CQRS Example to create their own simple app as a learning exercise.

After taking a quick look at their aggregate root, one thing that immediately stood out to me was a class field that contained current state.  This was interesting because the field was not used in any invariant.

When I first started reading and exploring Event Sourcing, this was a big “ah ha!” moment for me.

The realization that only certain data within my domain model was really important.

We will use a simple shopping cart example.

First, let’s look at what our shopping cart would look like without being event sourced.  We would probably be persisting this to a relational database using an ORM or perhaps storing it in a NoSQL document store.

using System;
using System.Collections.Generic;
using System.Linq;

public class ShoppingCart
{
    public Guid Id { get; set; }
    public IList<ShoppingCartLineItem> LineItems { get; set; }
    public bool IsComplete { get; set; }
    public DateTime CompleteDate { get; set; }

    public ShoppingCart(Guid id)
    {
        Id = id;
        LineItems = new List<ShoppingCartLineItem>();
    }

    public void AddProduct(string productSku, int quantity)
    {
        if (string.IsNullOrEmpty(productSku)) throw new ArgumentException("productSku");
        if (quantity <= 0) throw new ArgumentException("quantity");

        LineItems.Add(new ShoppingCartLineItem(productSku, quantity));
    }

    public void Complete()
    {
        if (IsComplete) throw new InvalidOperationException("Already completed.");
        if (LineItems.Any() == false) throw new InvalidOperationException("Cannot complete with no products.");

        IsComplete = true;
        CompleteDate = DateTime.UtcNow;
    }
}

There are a few things to point out in the above code.

Take note of the public properties which represent our current state.  These are the pieces of data that will be persisted to data storage.

Notice we have an IList<ShoppingCartLineItem> which is our collection of products that are in our shopping cart.  The ShoppingCartLineItem class contains the Product SKU and the quantity ordered.

One invariant in this example to mention is in the Complete() method.  There must be at least one product added to the shopping cart in order to mark it as complete.

if (LineItems.Any() == false) throw new InvalidOperationException("Cannot complete with no products.");

Next, let’s implement the above with event sourcing.

public class ShoppingCart : AggregateRoot
{
    private Guid _id;
    private bool _completed;
    private DateTime _completedDate;
    private readonly IList<ShoppingCartLineItem> _lineItems;
 
    public override Guid Id
    {
        get { return _id; }
    }

    private void Apply(ShoppingCartCreated e)
    {
        _id = e.Id;
    }

    private void Apply(ProductAdded e)
    {
        _lineItems.Add(new ShoppingCartLineItem(e.ProductSku, e.Quantity));
    }

    private void Apply(Completed e)
    {
        _completed = true;
        _completedDate = e.DateCompleted;
    }

    public ShoppingCart(Guid id)
    {
        _lineItems = new List<ShoppingCartLineItem>();

        ApplyChange(new ShoppingCartCreated(id));
    }

    public void AddProduct(string productSku, int quantity)
    {
        if (string.IsNullOrEmpty(productSku)) throw new ArgumentException("productSku");
        if (quantity <= 0) throw new ArgumentException("quantity");

        ApplyChange(new ProductAdded(_id, productSku, quantity));
    }

    public void Complete()
    {
        if (_completed) throw new InvalidOperationException("Already completed.");
        if (_lineItems.Any() == false) throw new InvalidOperationException("Cannot complete with no products.");

        ApplyChange(new Completed(_id, DateTime.UtcNow));
    }
}

In our event sourced implementation above we are still holding current state in the same fashion as our non-event-sourced version.  I’ve changed them from public properties to private fields since they are not being persisted to data storage.  Instead, the events raised through ApplyChange() are what get persisted to data storage.

The example above is typical of a first implementation of event sourcing.  We are taking the concept we already know and use, persisting current state, and combining it with this new idea of using events as state transitions.
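The AggregateRoot base class is not shown in this post, so here is a rough sketch of how ApplyChange() could work.  It is loosely modelled on the idea behind the Simple CQRS Example but it is my own simplified assumption, not that project's actual code: it finds the matching private Apply method by reflection, and DemoCart and ItemAdded are hypothetical types that exist only to exercise it.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Sketch of a base class: records new events and routes each one to a
// matching private Apply(TEvent) method on the concrete aggregate.
public abstract class AggregateRoot
{
    private readonly List<object> _changes = new List<object>();

    public abstract Guid Id { get; }

    // Events raised since the aggregate was loaded; these are what get persisted.
    public IReadOnlyList<object> GetUncommittedChanges()
    {
        return _changes;
    }

    public void MarkChangesAsCommitted()
    {
        _changes.Clear();
    }

    // Rebuilds state by replaying stored events; nothing is recorded as new.
    public void LoadFromHistory(IEnumerable<object> history)
    {
        foreach (var e in history) Dispatch(e);
    }

    // Applies a new event to in-memory state and records it for persistence.
    protected void ApplyChange(object @event)
    {
        Dispatch(@event);
        _changes.Add(@event);
    }

    private void Dispatch(object @event)
    {
        var apply = GetType().GetMethod(
            "Apply",
            BindingFlags.Instance | BindingFlags.NonPublic,
            null,
            new[] { @event.GetType() },
            null);

        // Events without an Apply method are still recorded, just not applied to state.
        if (apply != null) apply.Invoke(this, new[] { @event });
    }
}

// Hypothetical event and aggregate, only here to exercise the base class.
public class ItemAdded
{
    public ItemAdded(string sku) { Sku = sku; }
    public string Sku { get; }
}

public class DemoCart : AggregateRoot
{
    // Simplification: a real aggregate would take its id from a creation event.
    private readonly Guid _id = Guid.NewGuid();

    public int ItemCount { get; private set; }

    public override Guid Id { get { return _id; } }

    public void AddItem(string sku)
    {
        ApplyChange(new ItemAdded(sku));
    }

    private void Apply(ItemAdded e)
    {
        ItemCount++;
    }
}
```

The same Apply methods serve both paths: ApplyChange() uses them when a command raises a new event, and LoadFromHistory() uses them when the aggregate is rebuilt from its stored events.  That is why the state they set only has to be whatever the aggregate needs to make its next decision.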

There is actually a lot of code that can be removed from our event sourced example above.

The ShoppingCartLineItem class is no longer needed because the data it encapsulates is now encapsulated in our ProductAdded event.  So if we delete that class, then we have an issue with:

private readonly IList<ShoppingCartLineItem> _lineItems;

This private member will no longer be valid since we just deleted the ShoppingCartLineItem class.  So we can remove this private member as well.

Now let’s go back to that invariant that we have:

if (_lineItems.Any() == false) throw new InvalidOperationException("Cannot complete with no products.");

How can we maintain this invariant since we are no longer keeping track of the line items?

You only need the data applicable to an invariant

Because of this, we don’t really care what products or quantities are in the shopping cart, just that there is at least one.

Any data that is not used by an invariant does not need to be in your aggregate.

private DateTime _completedDate;

We also no longer need the date the shopping cart was completed because it is not used by an invariant and it is also contained in the Completed event.

Now let’s take a look at our shopping cart with these ideas applied:

public class ShoppingCart : AggregateRoot
{
    private Guid _id;
    private bool _containsProducts;
    private bool _completed;

    public override Guid Id
    {
        get { return _id; }
    }

    private void Apply(ShoppingCartCreated e)
    {
        _id = e.Id;
    }

    private void Apply(ProductAdded e)
    {
        _containsProducts = true;
    }

    private void Apply(Completed e)
    {
        _completed = true;
    }

    public ShoppingCart(Guid id)
    {
        ApplyChange(new ShoppingCartCreated(id));
    }

    public void AddProduct(string productSku, int quantity)
    {
        if (string.IsNullOrEmpty(productSku)) throw new ArgumentException("productSku");
        if (quantity <= 0) throw new ArgumentException("quantity");

        ApplyChange(new ProductAdded(_id, productSku, quantity));
    }

    public void Complete()
    {
        if (_completed) throw new InvalidOperationException("Already completed.");
        if (_containsProducts == false) throw new InvalidOperationException("Cannot complete with no products.");

        ApplyChange(new Completed(_id, DateTime.UtcNow));
    }
}

The invariant is now based on a boolean set when a product has been added.  The aggregate does not care about what products were added or their quantity.  It doesn’t even need to have them in a collection.

With the implementation of event sourcing in our aggregate, we get a much clearer view of what data drives business logic and is important in our domain model.
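If something else does need the products and quantities, for example a page that displays the cart, one place that data could live is a projection built from the same events.  The sketch below assumes that approach.  ShoppingCartContentsProjection is a hypothetical name, the shape of ProductAdded is inferred from the constructor call used above (the CartId property name is my assumption), and summing quantities per SKU is a design choice of this sketch rather than something the aggregate did.

```csharp
using System;
using System.Collections.Generic;

// Event shape assumed from: new ProductAdded(_id, productSku, quantity).
public class ProductAdded
{
    public ProductAdded(Guid cartId, string productSku, int quantity)
    {
        CartId = cartId;
        ProductSku = productSku;
        Quantity = quantity;
    }

    public Guid CartId { get; }
    public string ProductSku { get; }
    public int Quantity { get; }
}

// Hypothetical read model: keeps the line item detail the aggregate no longer holds.
public class ShoppingCartContentsProjection
{
    private readonly Dictionary<Guid, Dictionary<string, int>> _carts =
        new Dictionary<Guid, Dictionary<string, int>>();

    // Folds one event into the read model, summing quantities per SKU.
    public void Handle(ProductAdded e)
    {
        Dictionary<string, int> lines;
        if (!_carts.TryGetValue(e.CartId, out lines))
        {
            lines = new Dictionary<string, int>();
            _carts[e.CartId] = lines;
        }

        int current;
        lines.TryGetValue(e.ProductSku, out current); // current stays 0 if the SKU is new
        lines[e.ProductSku] = current + e.Quantity;
    }

    // Returns SKU -> quantity for a cart, or an empty result for an unknown cart.
    public IReadOnlyDictionary<string, int> GetLines(Guid cartId)
    {
        Dictionary<string, int> lines;
        return _carts.TryGetValue(cartId, out lines)
            ? lines
            : new Dictionary<string, int>();
    }
}
```

The aggregate keeps only the boolean it needs to protect its invariant, while anything that wants the full contents reads them from a model like this one.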