Testing your Domain when Event Sourcing

How do you test your domain when Event Sourcing? I find testing aggregates with Event Sourcing to be simpler than testing when you're storing the current state. The inputs to your aggregate are events, and its outputs are events.

Given a stream of events
When a valid command is performed
Then new event(s) occur

YouTube

Check out my YouTube channel where I post all kinds of content that accompanies my posts including this video showing everything that is in this post.

Event Sourcing

I’ve covered Event Sourcing in my Event Sourcing Example & Explained in plain English post. So check that out if you need a more detailed primer.

Generally, I use Aggregates as a way to model my domain. The aggregate is what exposes commands; when those commands are called/invoked, they result in creating new events. This general concept is how testing in event sourcing becomes simple.

In order to use an aggregate, you first need to pull all the existing events from the Event Store, replay them within the Aggregate to get to the current state, then return the Aggregate to the caller. To do this, I generally use a Repository that will do this work and build the Aggregate.
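To sketch what that rehydration looks like, here's a minimal TypeScript version. The original demo source isn't shown in this post, so the names below (`WarehouseProductRepository`, `DomainEvent`, the in-memory `Map` standing in for the Event Store) are illustrative, not the actual code.

```typescript
// Rehydrating: pull the stream's events, replay them into an empty
// aggregate, and hand the aggregate back to the caller.

type DomainEvent =
  | { type: "ProductReceived"; quantity: number }
  | { type: "ProductShipped"; quantity: number };

class WarehouseProduct {
  private quantityOnHand = 0;

  constructor(public readonly sku: string) {}

  // Replaying a past event only mutates state; it creates no new events.
  ApplyEvent(e: DomainEvent): void {
    this.quantityOnHand += e.type === "ProductReceived" ? e.quantity : -e.quantity;
  }

  QuantityOnHand(): number {
    return this.quantityOnHand;
  }
}

class WarehouseProductRepository {
  // One event stream per aggregate, keyed by SKU.
  constructor(private eventStore: Map<string, DomainEvent[]>) {}

  Get(sku: string): WarehouseProduct {
    const product = new WarehouseProduct(sku); // empty aggregate
    for (const e of this.eventStore.get(sku) ?? []) {
      product.ApplyEvent(e); // replay to the current state
    }
    return product;
  }
}
```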

To illustrate this, we have client code that is using the repository to get the Aggregate. The repository calls the Event Store to get the existing Events (if any) for this specific aggregate.

At this point the Repository will create an empty aggregate and replay all the events it received from the Event Store.

Once it has rebuilt the aggregate, it then returns it back to the Client. The client at this point will likely call various methods/commands on the aggregate.

This was the first stage of the process. I call this the rehydrating stage. You’re rehydrating your aggregate back to its current state with all the past events. Remember this first stage as the “Given” as we start testing in event sourcing.

Creating Events

The rest of the example is using a Warehouse Product, which is the concept of a product that is in a warehouse.

Now that the client code has an aggregate, it will likely perform one or more commands on the aggregate.

If the client calls the ShipProduct() command on the aggregate, and the aggregate is in a valid state to do so and passes all invariants, it will create a new event that it keeps internally.

If the client code then called another command, another event would be appended to the internal list.
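A sketch of that command path in TypeScript (the aggregate below is an illustrative stand-in, since the post's own listing isn't reproduced here):

```typescript
// The command path: validate invariants first, then create and apply a
// new event, keeping it in an internal list until it's persisted.

type DomainEvent =
  | { type: "ProductReceived"; quantity: number }
  | { type: "ProductShipped"; quantity: number };

class WarehouseProduct {
  private quantityOnHand = 0;
  private uncommittedEvents: DomainEvent[] = []; // new events, held internally

  constructor(public readonly sku: string) {}

  ShipProduct(quantity: number): void {
    // Invariant: you can't ship more than you have on hand.
    if (quantity > this.quantityOnHand) {
      throw new Error("Cannot ship more than the quantity on hand.");
    }
    this.Record({ type: "ProductShipped", quantity });
  }

  ReceiveProduct(quantity: number): void {
    this.Record({ type: "ProductReceived", quantity });
  }

  // Creating an event applies it to state AND appends it internally.
  private Record(e: DomainEvent): void {
    this.ApplyEvent(e);
    this.uncommittedEvents.push(e);
  }

  private ApplyEvent(e: DomainEvent): void {
    this.quantityOnHand += e.type === "ProductReceived" ? e.quantity : -e.quantity;
  }

  GetUncommittedEvents(): DomainEvent[] { return this.uncommittedEvents; }
  QuantityOnHand(): number { return this.quantityOnHand; }
}
```

Each additional command call appends another event to the same internal list.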

This is the second stage of the process where we’ve created new events which are the result of calling commands on our aggregate. Remember this second stage as the “When” stage of testing with event sourcing.

Saving Events

The last step is taking the newly created events that are in the aggregate and persisting those to the event store.

This means the client code will call back to our Repository, passing it the Aggregate.

The repository will get the new events and append them to the Event Store in that specific aggregate's event stream.
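Continuing the sketch (again with an in-memory `Map` standing in for the Event Store, and illustrative names), the save side looks like this:

```typescript
// Saving: take the aggregate's newly created events and append them
// to that aggregate's stream.

type DomainEvent = { type: string; quantity: number };

class WarehouseProduct {
  constructor(
    public readonly sku: string,
    private uncommittedEvents: DomainEvent[] = [],
  ) {}

  GetUncommittedEvents(): DomainEvent[] { return this.uncommittedEvents; }
  ClearUncommittedEvents(): void { this.uncommittedEvents = []; }
}

class WarehouseProductRepository {
  constructor(private eventStore: Map<string, DomainEvent[]>) {}

  Save(product: WarehouseProduct): void {
    const stream = this.eventStore.get(product.sku) ?? [];
    stream.push(...product.GetUncommittedEvents()); // append, never rewrite
    this.eventStore.set(product.sku, stream);
    product.ClearUncommittedEvents(); // the events are now persisted
  }
}
```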

Remember this stage as the “Then” in our tests when Event Sourcing.

Given, When, Then

If you take those three basic steps (loading an aggregate, calling commands, saving the new events), you can boil them down to:

Given a stream of events
When a valid command is performed
Then new event(s) occur

You can use this as the testing strategy for testing your Aggregates.

The WarehouseProduct above has 3 commands: ShipProduct, ReceiveProduct, and AdjustInventory. Each results in creating its respective event if it passes all invariants.

To illustrate this Given, When, Then, take the ShipProduct command, which should create a ProductShipped event.
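Written out directly, such a test might look like this TypeScript sketch (the aggregate here is a minimal illustrative stand-in for the post's own code):

```typescript
// A Given/When/Then test for ShipProduct, with every assertion spelled out.

type DomainEvent =
  | { type: "ProductReceived"; quantity: number }
  | { type: "ProductShipped"; quantity: number };

class WarehouseProduct {
  private quantityOnHand = 0;
  private uncommittedEvents: DomainEvent[] = [];

  ApplyEvent(e: DomainEvent): void {
    this.quantityOnHand += e.type === "ProductReceived" ? e.quantity : -e.quantity;
  }

  ShipProduct(quantity: number): void {
    if (quantity > this.quantityOnHand) throw new Error("Invalid quantity to ship.");
    const e: DomainEvent = { type: "ProductShipped", quantity };
    this.ApplyEvent(e);
    this.uncommittedEvents.push(e);
  }

  GetUncommittedEvents(): DomainEvent[] { return this.uncommittedEvents; }
}

function ShipProduct_Creates_ProductShipped(): void {
  // Given a stream of events
  const product = new WarehouseProduct();
  product.ApplyEvent({ type: "ProductReceived", quantity: 10 });

  // When a valid command is performed
  product.ShipProduct(4);

  // Then new event(s) occur
  const events = product.GetUncommittedEvents();
  if (events.length !== 1) throw new Error("Expected exactly one new event.");
  if (events[0].type !== "ProductShipped") throw new Error("Expected a ProductShipped event.");
  if (events[0].quantity !== 4) throw new Error("Expected a quantity of 4.");
}
```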

While that does satisfy our goal, it's a bit cumbersome to have to write this to verify the event. To simplify and make this a bit more natural to read, I created a base class to use within our tests.

Now using this abstract class, here are all the tests for the WarehouseProduct.
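A sketch of what that base class and the tests can look like, in TypeScript. The actual base class from the post isn't shown here, so these names (`AggregateTest`, `Record`, the assumption that AdjustInventory sets the quantity outright) are illustrative:

```typescript
type DomainEvent =
  | { type: "ProductReceived"; quantity: number }
  | { type: "ProductShipped"; quantity: number }
  | { type: "InventoryAdjusted"; quantity: number };

class WarehouseProduct {
  private quantityOnHand = 0;
  private uncommittedEvents: DomainEvent[] = [];

  ApplyEvent(e: DomainEvent): void {
    switch (e.type) {
      case "ProductReceived": this.quantityOnHand += e.quantity; break;
      case "ProductShipped": this.quantityOnHand -= e.quantity; break;
      case "InventoryAdjusted": this.quantityOnHand = e.quantity; break; // assumed semantics
    }
  }

  ReceiveProduct(quantity: number): void {
    this.Record({ type: "ProductReceived", quantity });
  }

  ShipProduct(quantity: number): void {
    if (quantity > this.quantityOnHand) throw new Error("Invalid quantity to ship.");
    this.Record({ type: "ProductShipped", quantity });
  }

  AdjustInventory(quantity: number): void {
    this.Record({ type: "InventoryAdjusted", quantity });
  }

  private Record(e: DomainEvent): void {
    this.ApplyEvent(e);
    this.uncommittedEvents.push(e);
  }

  GetUncommittedEvents(): DomainEvent[] { return this.uncommittedEvents; }
}

// Given = replay past events; When = call a command; Then = assert new events.
abstract class AggregateTest {
  private product = new WarehouseProduct();

  protected Given(...events: DomainEvent[]): this {
    this.product = new WarehouseProduct();
    for (const e of events) this.product.ApplyEvent(e);
    return this;
  }

  protected When(command: (p: WarehouseProduct) => void): this {
    command(this.product);
    return this;
  }

  protected Then(...expected: DomainEvent[]): void {
    const actual = this.product.GetUncommittedEvents();
    if (JSON.stringify(actual) !== JSON.stringify(expected)) {
      throw new Error(`Expected ${JSON.stringify(expected)}, got ${JSON.stringify(actual)}`);
    }
  }
}

class WarehouseProductTests extends AggregateTest {
  ShipProductCreatesProductShipped(): void {
    this.Given({ type: "ProductReceived", quantity: 10 })
        .When(p => p.ShipProduct(4))
        .Then({ type: "ProductShipped", quantity: 4 });
  }

  ReceiveProductCreatesProductReceived(): void {
    this.Given()
        .When(p => p.ReceiveProduct(7))
        .Then({ type: "ProductReceived", quantity: 7 });
  }
}
```

Each test now reads exactly like the Given/When/Then sentence it verifies.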

Source Code

Developer-level members of my CodeOpinion YouTube channel get access to the full source for any working demo application that I post on my blog or YouTube. Check out the membership for more info.

Follow @CodeOpinion on Twitter

Enjoy this post? Subscribe!

Subscribe to our weekly Newsletter and stay tuned.

Snapshots in Event Sourcing for Rehydrating Aggregates

Once you understand how Event Sourcing works, the most common thought is: “What happens when you have a lot of Events? Won’t it be inefficient to fetch every event from the event stream and replay all of them to get to the current state?”. It might be. But to combat this, you can use snapshots in event sourcing to rehydrate aggregates. Snapshots give you a representation of your aggregate's state at a point in time. You can then use this as a checkpoint and only replay the events since the snapshot.

Large Event Streams

I'd guess that in most situations you won't have a stream with a lot of events, because the stream will have a lifecycle. Meaning an event stream usually has a life where it begins and ends. For example, an Order has a lifecycle from OrderPlaced to perhaps OrderShipped. After the order has been shipped, there likely are no more events that will occur for that specific order. However, you could have a stream that is open-ended or has a really long life, such as a product in a warehouse. That event stream could be long-lived as long as you keep selling and stocking a given product in the warehouse. In these situations, you may have thousands or millions of events depending on the context.

When you have a lot of events, the challenge is that you generally will fetch from the Event Store all the events from the very beginning of the stream, and then replay them in your aggregate to build up to the current state. This could take an undesirable amount of time if you have to fetch and replay a significant number of events.

Snapshots in Event Sourcing

Snapshots are a way of solving this issue by recording the state of an aggregate at a point in time. You then store this state in a separate event stream. Along with the state, you record the version of the last event you processed that represents this state.

For illustration, you can see that the snapshot has our current state, in which we're recording the total quantity on hand. It also captures the version (3) of the last event in our event stream. In my example, I'm creating a snapshot on every 4th event. In reality, the interval at which you create snapshots is totally dependent on your situation; it could be every few hundred events.

Once more events are added to our event stream, we create new snapshots of the state and keep track of the version they pertain to.
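The bookkeeping can be sketched like this in TypeScript (the `maybeSnapshot` helper and the fixed interval are illustrative, following the post's every-4th-event example with zero-based versions):

```typescript
// A snapshot pairs the state with the version of the last event it reflects.
interface Snapshot {
  quantityOnHand: number;
  version: number;
}

const SNAPSHOT_INTERVAL = 4; // every 4th event, as in the example

// Called after appending the event at `version` (versions start at 0):
// write a snapshot whenever another full interval has accumulated.
function maybeSnapshot(
  quantityOnHand: number,
  version: number,
  snapshotStream: Snapshot[],
): void {
  if ((version + 1) % SNAPSHOT_INTERVAL === 0) {
    snapshotStream.push({ quantityOnHand, version });
  }
}
```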

Rehydrating Aggregate

Now, instead of replaying all the events from the beginning of the event stream, we first look at the snapshot stream to see if it contains any events. But you do so reading the stream backward! So in the example above, we would get the last snapshot in the stream, which has a state with Qty: 87 and Version: 11. We pass that state into our aggregate, then query the event stream starting at version 12. In my example above, it would then retrieve 3 events from that point, which would be replayed in our aggregate.

Here is some example code from a Repository that rehydrates an aggregate. That repository uses EventStoreDB as the Event Store.
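The post's repository targets EventStoreDB; as a language-agnostic sketch, here is the same logic in TypeScript with in-memory arrays standing in for the snapshot stream and the event stream (the EventStoreDB client calls are omitted, and an event's version equals its index in the stream):

```typescript
// Rehydrating with a snapshot: read the snapshot stream backward for the
// latest snapshot, load its state, then replay only the events after its
// version.

type DomainEvent = { type: "ProductReceived" | "ProductShipped"; quantity: number };

interface Snapshot {
  state: { quantityOnHand: number };
  version: number; // last event version the state reflects
}

class WarehouseProduct {
  quantityOnHand = 0;

  LoadSnapshot(s: Snapshot): void {
    this.quantityOnHand = s.state.quantityOnHand;
  }

  ApplyEvent(e: DomainEvent): void {
    this.quantityOnHand += e.type === "ProductReceived" ? e.quantity : -e.quantity;
  }
}

function rehydrate(snapshots: Snapshot[], events: DomainEvent[]): WarehouseProduct {
  const product = new WarehouseProduct();

  // "Read backward": the last snapshot in the stream is the latest.
  const latest = snapshots.length > 0 ? snapshots[snapshots.length - 1] : undefined;

  let startVersion = 0;
  if (latest !== undefined) {
    product.LoadSnapshot(latest);
    startVersion = latest.version + 1; // only replay what came after
  }

  for (let version = startVersion; version < events.length; version++) {
    product.ApplyEvent(events[version]);
  }
  return product;
}
```

With no snapshot the function falls back to replaying the full stream, so the aggregate behaves identically either way; the snapshot is purely an optimization.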

Before you go down this road, make sure you absolutely need snapshots; they might not be required. Or perhaps you only need to implement them for a specific type of event stream that is more open-ended. Keep track of metrics on how long it takes to replay your aggregates and how many events are typically in a stream, then decide if it's worth adding.

Projections in Event Sourcing: Build ANY model you want!

Projections in Event Sourcing are a way to derive the current state from an event stream. This can be done asynchronously: as events are persisted to an event stream, they can update a projection. You don't need to replay all the events in an event stream to get to the current state for a UI every time you need to display data; that could be very inefficient if you have a lot of events in a stream. Rather, create a projection that represents the current state and keep it updated as events occur.

Projections in Event Sourcing

Projections are a primary pattern you'll likely use with Event Sourcing. It's the answer to the most common question I get about event sourcing, which is: how do you build out a UI? If you have to replay all the events in an event stream to get to the current state, isn't that very inefficient when it comes to displaying in a UI or reporting?

The answer, depending on your situation, could be yes. It could be very inefficient to have to replay an entire event stream to get to the current state.

This is where projections come in.

Data Transformation

Really, a projection is a transformation of an event stream into another model. That other model could be almost anything, depending on the events in your stream.

This is a stream of events for a specific product in a warehouse. Our event streams are per unique aggregate; in this case, it's for the product identified with a SKU of ABC123.

We can turn this series of events into a model that could be used for display purposes. The most obvious is probably to show users the current quantity on hand.

If we process these events, we can derive a current state that looks like this:

Our current state for the quantity is 59. If we process each event and keep track of the quantity (10 + 5 - 6 + 50), we arrive at this final state.
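That derivation is just a fold over the stream; here's a TypeScript sketch using the example's numbers (the event names mirror the post, the function name is illustrative):

```typescript
// Deriving the current quantity on hand is a fold over the event stream.
type DomainEvent =
  | { type: "ProductReceived"; quantity: number }
  | { type: "ProductShipped"; quantity: number };

function quantityOnHand(events: DomainEvent[]): number {
  return events.reduce(
    (qty, e) => (e.type === "ProductReceived" ? qty + e.quantity : qty - e.quantity),
    0,
  );
}

// The stream from the example: 10 + 5 - 6 + 50 = 59
const stream: DomainEvent[] = [
  { type: "ProductReceived", quantity: 10 },
  { type: "ProductReceived", quantity: 5 },
  { type: "ProductShipped", quantity: 6 },
  { type: "ProductReceived", quantity: 50 },
];
```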

The beauty of event sourcing is that you can create many different models. For example, we could also derive the event stream into this state:

In the above, we're simply breaking the quantity out by keeping track of Received, Shipped, and Adjusted separately.

As another example, we could keep track of product aging, meaning how long the oldest product has been in the warehouse since it was received.

Event Consumers

Now before we get to actually building projections, you need to deliver/publish events to consumers that will process those events to update their projection of the current state.

There are a couple of ways to accomplish this.

The first is to simply use a message broker that publishes events after they are saved to the event stream.

If you're crossing a boundary, you likely don't want to expose the event you're persisting to your event stream. That would be leaking data internal to your domain. You'd likely want to transform that event into an integration event that you have versioning and contracts defined for.

The second option, if your consumers are within your boundary and your database/event store supports it, is to use the event stream directly.

Products like EventStoreDB support two types of subscriptions: Persistent and Catch-up.

Subscriptions

Persistent subscriptions mean that you have competing consumers in a subscription group. As events occur, each event is delivered to one consumer in the group, which processes the message. This is similar to how you would use a message broker, except the event store is the broker. The event store keeps track of which messages in the stream each subscription group has processed.

Catch-up subscriptions work a bit differently: the consumer must ask the event store to send events from a particular version onwards. The consumer must keep track of which message (version) of the stream it has processed. Once the consumer has caught up and processed all the messages that have occurred since the one it requested, the event store will send new messages to the consumer. Again, the consumer must keep track of which index/version it has processed, because once it re-connects (for whatever reason) it needs to tell the event store where to start in the event stream.
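The checkpoint bookkeeping can be sketched like this in TypeScript. This is not the EventStoreDB client API; it's an in-memory stand-in (real clients expose a subscribe-from-version call rather than polling like this), and the class names are illustrative:

```typescript
// A catch-up subscription sketch: the consumer, not the store, owns the
// checkpoint.

type DomainEvent = { type: string; quantity: number };

class EventStore {
  private stream: DomainEvent[] = [];

  Append(e: DomainEvent): void {
    this.stream.push(e);
  }

  // Return every event at `fromVersion` onward (version = index here).
  ReadFrom(fromVersion: number): DomainEvent[] {
    return this.stream.slice(fromVersion);
  }
}

class CatchUpConsumer {
  private checkpoint = 0; // next version to request; persist this in real life
  readonly processed: DomainEvent[] = [];

  constructor(private store: EventStore) {}

  CatchUp(): void {
    for (const e of this.store.ReadFrom(this.checkpoint)) {
      this.processed.push(e); // handle/project the event here
      this.checkpoint++;      // advance only after successful processing
    }
  }
}
```

Because the consumer stores the checkpoint itself, a restarted consumer resumes exactly where it left off instead of reprocessing the whole stream.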

Building Projections

For my example, I'm using the first example I showed: a projection that keeps track of the current quantity for a product (by SKU).

I’m using Entity Framework and here’s my simple Entity and DbContext.

Every time a new event is appended to our event stream, it will be published and consumed by our ProjectionBuilder.ReceiveEvent().

ReceiveEvent() will determine which event type it is, then call the appropriate Apply() method. Each Apply() method fetches the appropriate record from our database, then updates the appropriate property/column. Then, of course, it saves the changes.
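A TypeScript sketch of that dispatch, with an in-memory `Map` standing in for the Entity Framework entity and DbContext from the post (the row shape and method names are illustrative):

```typescript
// A ProjectionBuilder sketch: one row per SKU, tracking Received and
// Shipped separately, updated as each event arrives.

type DomainEvent =
  | { type: "ProductReceived"; sku: string; quantity: number }
  | { type: "ProductShipped"; sku: string; quantity: number };

interface ProductRow {
  sku: string;
  received: number;
  shipped: number;
}

class ProjectionBuilder {
  private table = new Map<string, ProductRow>(); // stand-in for the DB table

  // Determine the event type, then call the matching Apply method.
  ReceiveEvent(e: DomainEvent): void {
    switch (e.type) {
      case "ProductReceived": this.ApplyReceived(e); break;
      case "ProductShipped": this.ApplyShipped(e); break;
    }
  }

  private Row(sku: string): ProductRow {
    let row = this.table.get(sku);
    if (row === undefined) {
      row = { sku, received: 0, shipped: 0 };
      this.table.set(sku, row);
    }
    return row;
  }

  private ApplyReceived(e: { sku: string; quantity: number }): void {
    this.Row(e.sku).received += e.quantity; // in EF, save changes here
  }

  private ApplyShipped(e: { sku: string; quantity: number }): void {
    this.Row(e.sku).shipped += e.quantity;
  }

  // Query side: no replay needed, just Received minus Shipped.
  QuantityOnHand(sku: string): number {
    const row = this.table.get(sku);
    return row ? row.received - row.shipped : 0;
  }
}
```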

Now, if we wanted to display the quantity on hand for a product, we would simply query the DbContext by SKU and subtract the Shipped amount from the Received amount. We do not need to go to the event store and replay all the events to get to the current state; we already have it.

Demo App

I’ve created a simple console application that has all the code above in an interactive way.
