Event Store Persistent Subscriptions Demo

In my previous blog post, I talked about Catch-Up Subscriptions in comparison to Persistent Subscriptions in Event Store.

I’ve been meaning to create a little demo as I didn’t find very much in my limited searching.

Specifically, I wanted to create a console app that would contain the subscription client and another console app (event writer) that would write events to a stream.

This way you could run the subscription client multiple times, then run the event writer and see how each event is received by only one subscription client.

Source

All the source code for this demo is available on GitHub. Please take a look to follow along, as the rest of this post describes how it works.

Although I’ve added this as a new repo on GitHub, it is also available in the EventStore.Samples.DotNet repo on GitHub along with other subscription model examples.

Server

If you don’t have it already, download a copy of Event Store and run EventStore.ClusterNode.exe with default arguments.

This will start the Event Store server listening on TCP port 1113. The web UI will be accessible at http://localhost:2113

Create Subscription

First you need to create a subscription to a stream. You also provide a group name, which you will use again when connecting to the subscription.

In my demo I’ve included code to create the subscription. This is simply because I don’t want anyone running the demo to have to create it manually. The documentation discourages creating it in your general application code:

Normally the creating of the subscription group is not done in your general executable code. Instead it is normally done as a step during an install or as an admin task when setting things up. You should assume the subscription exists in your code.

The web UI has a pretty nice interface for viewing your subscriptions as well as creating and updating them.

Connecting to Persistent Subscription

When connecting to a persistent subscription, you must specify the stream and the group name you used when creating the subscription.

There are two other optional parameters which I’ve found to be important: bufferSize and autoAck.

bufferSize: The number of in-flight messages the server will send to the client.

autoAck: The client API will automatically acknowledge an event after the EventAppeared method returns. If an exception is thrown in the EventAppeared method, the subscription will be dropped.
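To make the autoAck behaviour concrete, here is a minimal Python sketch of the idea. It is not the Event Store client API: `deliver`, `handler` and the status strings are hypothetical names made up for this illustration, and the real client does this work for you.

```python
def deliver(events, handler, auto_ack=True):
    """Toy model of autoAck: ack after the handler returns, drop on exception."""
    acked = []
    for event in events:
        try:
            handler(event)
        except Exception as exc:
            # An exception inside the handler drops the whole subscription.
            return acked, f"dropped: {exc}"
        if auto_ack:
            acked.append(event)  # acknowledged automatically on return
    return acked, "live"


def handler(event):
    # Hypothetical handler that cannot process one particular event.
    if event == "bad":
        raise ValueError("cannot handle event")


print(deliver(["a", "b"], handler))
print(deliver(["a", "bad", "c"], handler))
```

In the second call the event after the failing one is never delivered, which is why reconnecting after a drop matters.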

Dropped Subscription

I found it helpful to have the subscription reconnect when it is dropped.

Another case where the subscription can be dropped is if changes to the subscription are made from another client or the web UI.
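The reconnect idea can be sketched roughly as a loop that connects again whenever a drop is reported. This is a simulation in Python, not the .NET API: `run_with_reconnect`, `connect` and the drop reasons are assumptions made up for the example.

```python
def run_with_reconnect(connect, max_attempts=5):
    """Call connect() again each time the subscription reports a drop."""
    attempts = 0
    while attempts < max_attempts:
        attempts += 1
        reason = connect()  # returns a drop reason, or None if it ended cleanly
        if reason is None:
            return attempts
        print(f"subscription dropped ({reason}), reconnecting")
    raise RuntimeError("giving up after repeated drops")


# Hypothetical connection that is dropped twice before staying up.
outcomes = iter(["settings changed", "connection lost", None])
attempts_used = run_with_reconnect(lambda: next(outcomes))
print(attempts_used)
```

A real client would probably also wait between attempts and cap the retries differently; that is left out here to keep the sketch short.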

EventAppeared

This method is invoked each time the subscription receives an event. As you might expect, the ResolvedEvent contains the event, and the EventStorePersistentSubscriptionBase allows you to acknowledge or not acknowledge (fail) a ResolvedEvent or an array of ResolvedEvents. If you have autoAck enabled, you do not need to acknowledge.

Running Demo

To run the demo, simply build the solution to generate the executables.

Run multiple instances of PersistentSubscription.exe

Running multiple instances lets you see how each one connects to the subscription and how any given event written to the stream is received by only one of them.

You can also now see in the Web UI that there are two connections.

Now if you run the WritingEvents.exe it will write 100 events to the stream.

The PersistentSubscription.exe instances will now have received the events, but each event will be sent to only one client. You can see this in the output, as our subscription is set up for Round Robin.
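As a rough illustration of what Round Robin means for the 100 events, here is a small Python simulation. The function and client names are hypothetical, and the real server also takes each client’s buffer into account, so actual counts may not split this evenly.

```python
from collections import Counter
from itertools import cycle


def round_robin(event_count, clients):
    """Hand each event to exactly one client, taking the clients in turn."""
    received = Counter()
    turn = cycle(clients)
    for _ in range(event_count):
        received[next(turn)] += 1
    return received


counts = round_robin(100, ["client-1", "client-2"])
print(dict(counts))
```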

Source Code

The source code for this demo is available on GitHub.

Event Store Persistent Subscriptions

One of the really nice features of Event Store is Persistent Subscriptions, which were implemented in v3.2.0. I was previously using catch-up subscriptions but needed the ability to have many worker processes handle events from the same stream.

First, let’s take a look at a couple of the subscription models Event Store supports.

Catch Up Subscriptions

As mentioned, I previously would use catch-up subscriptions for various use cases.  One of them would be for sending emails on specific events occurring in the system.

A worker process would subscribe to the $all event stream and handle incoming messages accordingly.

The issue is that catch-up subscriptions are controlled by the client. The start position of the subscription is stored, and must be maintained, by the client.

This means I did not have a built-in way to have multiple worker processes handle events from the same stream while processing each event only once.

Notice I wrote built-in above. Yes, I could have created a shared data source that each worker process would use to determine which messages were being handled by which process.

I didn’t want to go down this road.

The next option would be to have multiple worker processes, with each one handling only a single event or a small subset of events.
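The client-owned position described above can be sketched as follows. This is a toy Python model, not the Event Store API: `CatchUpClient` and its fields are hypothetical. It shows why two workers on the same stream both end up handling every event.

```python
class CatchUpClient:
    """Toy catch-up subscriber that owns its own checkpoint."""

    def __init__(self):
        self.checkpoint = 0  # index of the next event to read
        self.handled = []

    def catch_up(self, stream):
        for event in stream[self.checkpoint:]:
            self.handled.append(event)
            self.checkpoint += 1  # the client, not the server, records progress


stream = ["e1", "e2", "e3"]
worker_a, worker_b = CatchUpClient(), CatchUpClient()
worker_a.catch_up(stream)
worker_b.catch_up(stream)

stream.append("e4")
worker_a.catch_up(stream)  # resumes from its own checkpoint

print(worker_a.handled, worker_b.handled)
```

Because each worker keeps a separate checkpoint, nothing stops both from processing e1 to e3, which is exactly the duplication I wanted to avoid.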

Persistent Subscriptions

The competing consumers messaging pattern is implemented using Persistent Subscriptions.

They differ from catch-up subscriptions in that the position of the next event to publish is kept on the server rather than the client.

If you have used RabbitMQ or ActiveMQ, the Persistent Subscriptions in Event Store will feel similar.

I think the title Competing Consumers really does illustrate how it works.

You can have many worker processes (consumers), but only one of them will receive any given event from the server.

Let’s say we have 3 worker processes which are all connected to the same subscription. When an event is published from the server, only one worker process will receive that specific event. Not all 3.

In the catch-up subscription model, all 3 would receive the same event.
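Here is a minimal sketch of the difference, assuming a toy `PersistentSubscription` class that I made up for illustration (it is not the real client or server). The position lives in the subscription, so each event is handed out once no matter how many workers are attached.

```python
class PersistentSubscription:
    """Toy server-side subscription: the position is kept here, not by workers."""

    def __init__(self, stream):
        self.stream = stream
        self.position = 0

    def next_event(self):
        if self.position >= len(self.stream):
            return None
        event = self.stream[self.position]
        self.position += 1
        return event


subscription = PersistentSubscription(["e1", "e2", "e3", "e4", "e5", "e6"])
workers = {"worker-1": [], "worker-2": [], "worker-3": []}
names = list(workers)

i = 0
while (event := subscription.next_event()) is not None:
    # Only one worker receives each event; here they simply take turns.
    workers[names[i % len(names)]].append(event)
    i += 1

print(workers)
```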

What this allowed me to do in my email example is have multiple worker processes handle specific events. And if I wanted to create multiple subscriptions, each for a single event or a small subset of events, as I could with catch-up, I can do the same with persistent subscriptions but now have multiple worker processes as well.

Groups

When you create a persistent subscription, you will define a group name and the event stream you want to subscribe to.

When your clients (consumers) want to use this subscription, they specify the same group name and stream.
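One way to picture groups is as a registry keyed by stream and group name. The Python sketch below is only a mental model: the `Server` class, the "orders" stream and the "email-senders" group are all hypothetical names.

```python
class Server:
    """Toy registry of persistent subscriptions keyed by (stream, group)."""

    def __init__(self):
        self.subscriptions = {}

    def create_subscription(self, stream, group):
        self.subscriptions[(stream, group)] = []

    def connect(self, stream, group, consumer):
        key = (stream, group)
        if key not in self.subscriptions:
            # Consumers assume the group already exists; they do not create it.
            raise KeyError(f"no subscription group {group!r} on {stream!r}")
        self.subscriptions[key].append(consumer)
        return len(self.subscriptions[key])


server = Server()
server.create_subscription("orders", "email-senders")
server.connect("orders", "email-senders", "worker-1")
connected = server.connect("orders", "email-senders", "worker-2")
print(connected)
```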

Acknowledge/Not Acknowledge/Timeout

The server will dispatch N events to a consumer at any given time. The number of events is configurable when connecting to the subscription: you define a buffer size, which is the number of in-flight messages the client is allowed. (I originally wrote that only one event is dispatched at a time. I’ve rewritten this thanks to Greg Young’s correction in the comments.)

Each dispatched event is marked as in-process until the client acknowledges that it has processed the message. The client can also not acknowledge the message, and if a timeout period expires without either, a retry can occur.
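The interplay of buffer size, acknowledge and retry can be sketched like this. It is a simplified Python model with hypothetical names (`Subscription`, `dispatch`, `nak_or_timeout`): retried events go straight back to the front of the queue, which may not match how the real server schedules retries.

```python
from collections import deque


class Subscription:
    """Toy model: at most buffer_size events may be unacknowledged at once."""

    def __init__(self, events, buffer_size):
        self.pending = deque(events)
        self.buffer_size = buffer_size
        self.in_flight = []

    def dispatch(self):
        # Send events until the client's buffer of in-flight messages is full.
        sent = []
        while self.pending and len(self.in_flight) < self.buffer_size:
            event = self.pending.popleft()
            self.in_flight.append(event)
            sent.append(event)
        return sent

    def ack(self, event):
        self.in_flight.remove(event)  # processed, frees a buffer slot

    def nak_or_timeout(self, event):
        self.in_flight.remove(event)
        self.pending.appendleft(event)  # queued again for a retry


sub = Subscription(["e1", "e2", "e3", "e4"], buffer_size=2)
first = sub.dispatch()
sub.ack("e1")
sub.nak_or_timeout("e2")
second = sub.dispatch()
print(first, second)
```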

Demo

I’m in the middle of putting together a demo of persistent subscriptions in action. If this is something you have been looking for, let me know, as I would love to discuss some of the use cases and a few of the quirks I have discovered while starting to dive into the .NET API.

Event Stream as a Message Queue

I was recently having a discussion around a system being built using Microsoft Azure. Some concepts being discussed for this system were CQRS, Event Sourcing and Message Queues.

The diagram below is fairly typical when discussing CQRS and Event Sourcing.

[Diagram: Message Queue]

One of the first things that stood out to me was the use of the Message Queue and Azure Service Bus.

For this blog post, I want to focus on the Service Bus, which is used for the publish-subscribe pattern. The domain will emit events that are stored to the event stream and then published to the Service Bus. Subscribers, such as Projections or other Bounded Contexts, will process these events.

Forgotten Option

There is nothing wrong with using a service bus to publish domain events.

However, there is one option that seems to always be forgotten by developers who are new to CQRS and Event Sourcing:

Your event stream can be used as a message queue

Other bounded contexts or projections can query/poll your event storage to retrieve new events that have been persisted.

At regular intervals, the event consumer could poll your event storage requesting any number of new events based on the last event it processed.

The consumer would be required to keep track of the last event it processed so it knows where to resume. This provides some benefits, as the consumer does not need to be a continuously running process.
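A polling consumer along these lines might look roughly like the Python sketch below. The `poll` function, the in-memory `event_log` list and the event names are assumptions for illustration. A real event storage would be queried over its own API, and the checkpoint would need to be stored somewhere durable.

```python
def poll(event_log, last_processed, batch_size=10):
    """Return the events after last_processed, plus the new checkpoint."""
    batch = event_log[last_processed:last_processed + batch_size]
    return batch, last_processed + len(batch)


event_log = ["OrderPlaced", "OrderPaid"]
checkpoint = 0

batch, checkpoint = poll(event_log, checkpoint)   # first poll
event_log.append("OrderShipped")                  # written while the consumer is idle
later, checkpoint = poll(event_log, checkpoint)   # next poll sees only the new event

print(batch, later, checkpoint)
```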

You may be thinking: Polling? Really?

If you rolled your own event storage, I could understand how this might be problematic, and it would likely be easier to use a service bus. Or you may want your event consumers to process each event as soon as possible. How you handle this is dependent on how you are currently storing your events.

But the point still remains: Your event stream is a message queue.

As always, context is king.  Requirements and many other factors will play into how you want to handle messaging.

It is another option that may fit a scenario that you run into.

Event Store

If you are just thinking about getting into Event Sourcing, I would highly recommend looking at Event Store by Greg Young as your event storage.

Event Store supports multiple types of Subscriptions including Persistent Subscriptions for the Competing Consumers messaging pattern.