Competing Consumers Pattern for Scalability

The Competing Consumers Pattern enables messages from Message Queues (or Topics) to be processed concurrently by multiple Consumers. This improves scalability and availability, but it also introduces issues you need to consider, such as message ordering and moving the bottleneck.

YouTube

Check out my YouTube channel where I post all kinds of content that accompanies my posts including this video showing everything that is in this post.

Competing Consumers Pattern

Message Queues are a great way to offload work so that it can be handled separately by another process. For example, if you have a web application that takes an HTTP request, it can add a new message to the queue and then immediately return to the client. Because the work is done asynchronously in another process, the web application doesn’t block waiting for it.

In this example, the client/browser makes an HTTP call to the web application.

If the request itself can be done asynchronously and the client doesn’t need an immediately consistent response, the web application creates a message instead of performing the actual work.

The API, which is our Producer, then sends that message to the message queue.

The API/Producer can then return to the client.

The actual work to be done will be handled by a Consumer. A consumer will get the message from the Message Queue and perform the work required.

While the consumer is processing that message, our API can still be creating new messages from incoming HTTP requests, putting more and more messages into the queue.

At this point, we have 3 messages in the queue. Once the consumer is done processing the message, it will go and grab the next message in the queue to process.
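Here is a minimal sketch of that flow. It assumes an in-process `queue.Queue` stands in for a real message broker and a thread stands in for the separate consumer process. The names `handle_http_request` and `consume` are hypothetical, and returning 202 is only an illustration of responding before the work is done.

```python
import queue
import threading


def handle_http_request(q: queue.Queue, order_id: int) -> int:
    """Producer: enqueue a message describing the work, then return immediately."""
    q.put({"order_id": order_id})
    return 202  # e.g. HTTP 202 Accepted: the work itself happens later


def consume(q: queue.Queue, processed: list) -> None:
    """Consumer: take messages off the queue one at a time and do the work."""
    while True:
        message = q.get()  # blocks until a message is available
        if message is None:  # sentinel telling this sketch's consumer to stop
            break
        processed.append(message["order_id"])  # the real work would go here


def run_demo(requests: int = 3) -> tuple:
    q: queue.Queue = queue.Queue()
    processed: list = []
    worker = threading.Thread(target=consume, args=(q, processed))
    worker.start()
    statuses = [handle_http_request(q, i) for i in range(requests)]
    q.put(None)  # no more work for this demo
    worker.join()
    return statuses, processed


statuses, processed = run_demo()
print(statuses, processed)  # [202, 202, 202] [0, 1, 2]
```

With a single consumer, messages are handled one after another in the order they were queued, while the producer never waits for them.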


In this illustration, the rate at which we are producing messages and adding them to the queue exceeds how fast we can consume messages.

This may or may not be an issue, depending on how quickly these messages need to be processed. If they are time-sensitive, then we must increase our throughput.

The simplest way to do that is to increase the number of consumers processing messages off the queue.


This is called the Competing Consumers Pattern because each consumer is competing for the next message in the queue.
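The competition can be sketched with several consumers pulling from one shared queue. This again assumes `queue.Queue` and threads in place of a broker and separate processes. Each message is taken by exactly one consumer, and which consumer gets which message depends on who asks first.

```python
import queue
import threading
from collections import Counter

STOP = object()  # sentinel; one is queued per consumer so each thread exits


def competing_consumer(name: str, q: queue.Queue, results: list, lock: threading.Lock) -> None:
    """Each consumer loops, grabbing whatever message is next in the shared queue."""
    while True:
        message = q.get()  # blocks until a message is available
        if message is STOP:
            return
        with lock:
            results.append((name, message))


def run(num_consumers: int, num_messages: int) -> list:
    q: queue.Queue = queue.Queue()
    for i in range(num_messages):
        q.put(i)
    for _ in range(num_consumers):
        q.put(STOP)  # queued after the work, so every message is dequeued before any consumer stops
    results: list = []
    lock = threading.Lock()
    threads = [
        threading.Thread(target=competing_consumer, args=(f"consumer-{n}", q, results, lock))
        for n in range(num_consumers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


results = run(num_consumers=3, num_messages=10)
print(Counter(name for name, _ in results))  # the split varies from run to run
```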

Suppose we have two Consumers that are both busy processing a message, and two messages waiting in the queue.


The moment one of the consumers finishes processing its message, it gets the next message in the queue.


When the second consumer becomes free, it gets the next message in the queue.


Increasing Throughput

The Competing Consumers Pattern allows you to scale horizontally by adding more consumers to process more messages concurrently. Increasing the number of consumers increases throughput and improves scalability and availability, which helps you manage the length of the queue.

In most systems the number of messages fluctuates; you may see more messages during business hours than after hours. Competing consumers let you scale the number of consumers, and the resources they use, to match: at peak hours you increase the number of deployed Consumers, and at off-peak hours you reduce it.
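To illustrate scaling on queue length, here is a hypothetical rule that picks a consumer count from the current backlog. The thresholds (`messages_per_consumer`, `min_consumers`, `max_consumers`) are assumptions. In practice, a rule like this would live in whatever autoscaling your hosting platform provides.

```python
import math


def desired_consumers(
    queue_length: int,
    messages_per_consumer: int = 100,
    min_consumers: int = 1,
    max_consumers: int = 10,
) -> int:
    """Hypothetical rule: one consumer per `messages_per_consumer` queued messages,
    clamped so off-peak never drops below the minimum and peak never exceeds the maximum."""
    if messages_per_consumer <= 0:
        raise ValueError("messages_per_consumer must be positive")
    wanted = math.ceil(queue_length / messages_per_consumer)
    return max(min_consumers, min(max_consumers, wanted))


print(desired_consumers(0))     # 1  (off-peak: keep the minimum)
print(desired_consumers(250))   # 3
print(desired_consumers(5000))  # 10 (peak: capped at the maximum)
```

The upper bound matters for the next issue: every consumer you add is more load on whatever those consumers talk to.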

Moving the Bottleneck

There are a couple of issues with the Competing Consumers Pattern, and the first is moving the bottleneck.

If you have a lot of messages in the queue and you add more consumers to try to increase throughput, you’re going to be processing more work concurrently. This can have a negative effect on any downstream resources used while processing messages.

For example, if processing the messages involves interacting with a database, you’ve moved the bottleneck to the database.


Any resources (databases, caches, services) will feel the effects of adding more consumers and processing more messages concurrently. Beware of moving the bottleneck: make sure downstream resources can handle the increased load.
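Here is a small sketch of the bottleneck moving. `FakeDatabase` is a hypothetical stand-in that records how many calls overlap, and a `ThreadPoolExecutor` plays the role of N competing consumers. The more consumers you run, the more concurrent calls the downstream resource has to absorb.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class FakeDatabase:
    """Hypothetical downstream resource that records how many calls overlap."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._active = 0
        self.peak_concurrency = 0

    def save(self, message) -> None:
        with self._lock:
            self._active += 1
            self.peak_concurrency = max(self.peak_concurrency, self._active)
        time.sleep(0.01)  # simulate the time a real write would take
        with self._lock:
            self._active -= 1


def process_all(messages, num_consumers: int) -> int:
    """Process messages with N competing consumers; return peak concurrent DB calls."""
    db = FakeDatabase()
    with ThreadPoolExecutor(max_workers=num_consumers) as consumers:
        list(consumers.map(db.save, messages))  # list() surfaces any exceptions
    return db.peak_concurrency


for n in (1, 4, 8):
    print(n, "consumers ->", process_all(range(40), n), "concurrent database calls at peak")
```

The peak can never exceed the number of consumers. In a real system, that peak is the load your database, cache, or downstream service has to be sized for.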

Message Ordering

The second common issue with the Competing Consumers Pattern is the expectation of processing messages in a particular order. For example, if the Producer is creating messages based on a workflow the client is performing, then the messages are created, and added to the queue, in a particular order.

Most message brokers support FIFO (First In First Out), which means that consumers will take the oldest message available to be processed.

However, this does not mean that correlated messages won’t be processed at the same time.

Here’s an example of a yellow message being processed by a consumer.


If the producer then produces another message related to the first one, and a consumer is available to process it, that consumer will pick it up, even while the first message is still being processed.


This can have negative consequences for your system if you expect messages to be processed sequentially in a particular order. Although you receive messages in order, you won’t necessarily finish processing them in that order, because they’re being processed concurrently.
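You can reproduce the ordering problem with two consumers and two related messages. `OrderPlaced` and `OrderCancelled` are hypothetical message names, and the sleep durations simulate one message taking longer to process. The queue hands the messages out in FIFO order, yet the cancellation will usually finish first.

```python
import queue
import threading
import time


def run_ordering_demo() -> list:
    q: queue.Queue = queue.Queue()
    # Two related messages for the same order, enqueued in FIFO order.
    q.put(("OrderPlaced", 0.5))     # simulated slow handler
    q.put(("OrderCancelled", 0.0))  # simulated fast handler
    completed: list = []
    lock = threading.Lock()

    def consumer() -> None:
        while True:
            try:
                name, duration = q.get_nowait()
            except queue.Empty:
                return  # nothing left to compete for
            time.sleep(duration)  # "process" the message
            with lock:
                completed.append(name)

    consumers = [threading.Thread(target=consumer) for _ in range(2)]
    for c in consumers:
        c.start()
    for c in consumers:
        c.join()
    return completed


print(run_ordering_demo())
```

Delivery order was OrderPlaced then OrderCancelled, but completion order is decided by how long each consumer spends on its message.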

Source Code

Developer-level members of my CodeOpinion YouTube channel get access to the full source for any working demo application that I post on my blog or YouTube. Check out the membership for more info.


Follow @CodeOpinion on Twitter


Highly COHESIVE Software Design to tame Complexity

What is cohesion and why should you care? Highly cohesive software design can reduce complexity and coupling. But what is cohesion? It’s the degree to which the elements inside a module belong together. How you group operations can lead to very different levels of cohesion. Informational Cohesion groups operations by the data they work on. Functional Cohesion groups operations by the task they perform. Cohesion is directly related to the Single Responsibility Principle, which you might also define differently.


Cohesion

Most people generally hear about Cohesion in relation to Coupling. I’ve done a post about how to Write Stable Code using Coupling Metrics, but I haven’t yet touched directly on Cohesion. To me, Cohesion and Coupling are like the yin and yang of software design.

To give it a simple definition:

“degree to which the elements inside a module belong together”

Structured Design: Fundamentals of a Discipline of Computer Program and Systems Design

However, most people are probably more familiar with the Single Responsibility Principle, which is directly related to Cohesion. The Single Responsibility Principle states:

A class should have one, and only one, reason to change

Agile Software Development, Principles, Patterns, and Practices

But what does that really mean? What is “one reason”? Does it mean that a class/module should only have one job? What about dependencies? If they change, doesn’t that affect other classes (coupling) and force them to change too?

Robert C. Martin, the author, clarified this years ago on his blog:

When you write a software module, you want to make sure that when changes are requested, those changes can only originate from a single person, or rather, a single tightly coupled group of people representing a single narrowly defined business function. You want to isolate your modules from the complexities of the organization as a whole, and design your systems such that each module is responsible (responds to) the needs of just that one business function.

https://blog.cleancoder.com/uncle-bob/2014/05/08/SingleReponsibilityPrinciple.html

What’s funny about this is that most classes/modules are not organized by business function, but rather they are organized by data.

If you read any of my other posts, you know that I advocate thinking about business capabilities and not technical concerns, which I describe more in my post AVOID Entity Services by Focusing on Capabilities.

Informational Cohesion

I think the common practice is to organize operations (methods) in a class/module using an informational cohesion approach. Informational cohesion groups operations by the data they operate on.

For example, consider a ProductService class that does data access for a Product Entity (Data Model).

These operations are grouped in this Interface/Class because they operate on the same information/data.
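The original code isn't reproduced here. This is a Python sketch of what an informationally cohesive ProductService might look like. Only `get_product_by_sku` corresponds to the post's GetProductBySku. The other methods, the `Product` fields, and the use of an abstract base class in place of a C# interface are assumptions.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Product:
    sku: str
    name: str
    price: float


class ProductService(ABC):
    """Informational cohesion: these operations live together only because
    they all read or write Product data."""

    @abstractmethod
    def get_product_by_sku(self, sku: str) -> Optional[Product]: ...

    @abstractmethod
    def get_all_products(self) -> List[Product]: ...

    @abstractmethod
    def add_product(self, product: Product) -> None: ...

    @abstractmethod
    def update_product(self, product: Product) -> None: ...

    @abstractmethod
    def delete_product(self, sku: str) -> None: ...
```

Nothing ties these methods to a single task or role. The only common thread is the Product data.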

Functional Cohesion

Functional Cohesion groups operations by the task they belong to. The Single Responsibility Principle, as Robert C. Martin clarified, is likewise about a narrowly defined business function.

This means you don’t group by Entity/Data/Information, but rather by the boundary within which users perform actions in your system; in other words, more by roles. Cohesive software design, to me, means focusing on functional cohesion.

Example

To illustrate this more, here’s an example of a class that depends on the IProductService I’ve defined above.

The problem with the Informational Cohesion in IProductService is that this class doesn’t require anything more than a single method (GetProductBySku).

To test this class, we have a couple of options. We can create a stub of the interface or we can mock it. Most would probably choose to use a mocking library so we only have to mock the one method.

However, if we change the actual implementation to use a different method on our IProductService interface, this test is going to fail, because we only mocked the one method and not every method on the interface.
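The stub option for testing can be sketched like this. `GetPriceHandler` is a hypothetical handler that needs only `get_product_by_sku`. Even so, a stub of the (trimmed-down, hypothetical) ProductService has to implement every abstract method before Python will let you instantiate it.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional


@dataclass
class Product:
    sku: str
    price: float


class ProductService(ABC):
    """Trimmed-down, hypothetical version of an informationally cohesive interface."""

    @abstractmethod
    def get_product_by_sku(self, sku: str) -> Optional[Product]: ...

    @abstractmethod
    def add_product(self, product: Product) -> None: ...

    @abstractmethod
    def delete_product(self, sku: str) -> None: ...


class GetPriceHandler:
    """Hypothetical handler that needs only one method of ProductService."""

    def __init__(self, products: ProductService) -> None:
        self._products = products

    def handle(self, sku: str) -> float:
        product = self._products.get_product_by_sku(sku)
        if product is None:
            raise KeyError(sku)
        return product.price


class StubProductService(ProductService):
    """Test stub: every abstract method must exist, even the ones the handler never calls."""

    def get_product_by_sku(self, sku: str) -> Optional[Product]:
        return Product(sku=sku, price=9.99)

    def add_product(self, product: Product) -> None:
        raise NotImplementedError("not used by GetPriceHandler")

    def delete_product(self, sku: str) -> None:
        raise NotImplementedError("not used by GetPriceHandler")


handler = GetPriceHandler(StubProductService())
assert handler.handle("ABC-123") == 9.99
```

Two of the three stub methods exist only to satisfy the interface. That is the cost of depending on a whole informationally cohesive interface for one operation.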

Do we really want a dependency on that whole interface just to use one method? When you need a class for only one method, you don’t really want the interface; you want a function.

In C#, Delegates are… functions!

Instead of an interface, I’ve defined delegates within a static class that are specific for the Catalog.

Now instead of depending on the interface, we depend on the delegate.

Now our test becomes much more explicit in how we create this Handler to test. We need to stub the delegate. There is no mock. There is no mocking framework.

If we change the implementation to use a different delegate with the same signature, the test will still pass. If the signature changes, our code won’t even compile. We’re being as explicit as we can about what we depend on.
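Python has no delegates, so this sketch uses a `Callable` type alias as the closest analogue. `GetProductBySku` and `GetPriceHandler` are hypothetical names. The handler depends on a single function, and the test passes a lambda as the stub. One difference from C#: Python itself won't refuse to run a mismatched signature, although a static type checker such as mypy would report it.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Product:
    sku: str
    price: float


# Stand-in for a delegate: "a function that takes a SKU and may return a Product".
GetProductBySku = Callable[[str], Optional[Product]]


class GetPriceHandler:
    """Hypothetical handler that depends on exactly one function, not an interface."""

    def __init__(self, get_product_by_sku: GetProductBySku) -> None:
        self._get_product_by_sku = get_product_by_sku

    def handle(self, sku: str) -> float:
        product = self._get_product_by_sku(sku)
        if product is None:
            raise KeyError(sku)
        return product.price


# Test: stub the function directly; no mock object, no mocking framework.
handler = GetPriceHandler(lambda sku: Product(sku=sku, price=9.99))
assert handler.handle("ABC-123") == 9.99
```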

Cohesive Software Design

When I’m thinking about Cohesive Software Design, I’m thinking about grouping by business functions and capabilities: by tasks, by roles, and by what the business processes and workflows actually are.

By increasing cohesion you can reduce coupling.


Testing your Domain when Event Sourcing

How do you test your domain when Event Sourcing? I find testing aggregates with Event Sourcing to be simpler than testing them when you’re storing the current state. The inputs to your aggregate are events, and the outputs of your aggregate are events.

Given a stream of events
When a valid command is performed
Then new event(s) occur


Event Sourcing

I’ve covered Event Sourcing in my Event Sourcing Example & Explained in plain English post. So check that out if you need a more detailed primer.

Generally, I use Aggregates as a way to model my domain. The aggregate exposes commands, and invoking those commands results in new events being created. This general concept is what makes testing with event sourcing simple.

In order to use an aggregate, you first need to pull all the existing events from the Event Store, replay them within the Aggregate to get to the current state, then return the Aggregate to the caller. To do this, I generally use a Repository that will do this work and build the Aggregate.

To illustrate this, we have client code that is using the repository to get the Aggregate. The repository calls the Event Store to get the existing Events (if any) for this specific aggregate.


At this point the Repository will create an empty aggregate and replay all the events it received from the Event Store.


Once it has rebuilt the aggregate, it returns it to the Client. At this point the client will likely call various methods/commands on the aggregate.

This was the first stage of the process. I call this the rehydrating stage. You’re rehydrating your aggregate back to its current state with all the past events. Remember this first stage as the “Given” as we start testing in event sourcing.

Creating Events

The rest of the example uses a Warehouse Product, which represents a product that is in a warehouse.

Now that the client code has an aggregate, it will likely perform one or more commands on the aggregate.

If the client calls the ShipProduct() command on the aggregate, and the aggregate is in a valid state to do so and passes all invariants, then the aggregate creates a new event and keeps it internally.

If the client code then calls another command, another event is appended to that internal list.

This is the second stage of the process where we’ve created new events which are the result of calling commands on our aggregate. Remember this second stage as the “When” stage of testing with event sourcing.

Saving Events

The last step is taking the newly created events that are in the aggregate and persisting those to the event store.


This means the client code will call back to our Repository, passing it the Aggregate.


The repository gets the new events and appends them to the Event Store, in that specific aggregate’s event stream.

Remember this stage as the “Then” in our tests when Event Sourcing.
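The three stages (rehydrate, call commands, save new events) can be sketched end to end with an in-memory event store. `InMemoryEventStore`, `WarehouseProductRepository`, the `ProductReceived` event, and using the SKU as the stream id are assumptions for illustration. Only ShipProduct/ProductShipped come from the post.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ProductReceived:
    sku: str
    quantity: int


@dataclass(frozen=True)
class ProductShipped:
    sku: str
    quantity: int


class WarehouseProduct:
    def __init__(self, sku: str) -> None:
        self.sku = sku
        self.quantity_on_hand = 0
        self._uncommitted: list = []

    def apply(self, event) -> None:
        """Update current state from one event (used for replay and for new events)."""
        if isinstance(event, ProductReceived):
            self.quantity_on_hand += event.quantity
        elif isinstance(event, ProductShipped):
            self.quantity_on_hand -= event.quantity

    def ship_product(self, quantity: int) -> None:
        """Command: check the invariant, then keep the new event internally."""
        if quantity > self.quantity_on_hand:
            raise ValueError("cannot ship more than is on hand")
        event = ProductShipped(self.sku, quantity)
        self.apply(event)
        self._uncommitted.append(event)

    def uncommitted_events(self) -> list:
        return list(self._uncommitted)

    def mark_committed(self) -> None:
        self._uncommitted.clear()


class InMemoryEventStore:
    """Stand-in for a real event store: one list of events per stream id."""

    def __init__(self) -> None:
        self._streams: dict = {}

    def read_stream(self, stream_id: str) -> list:
        return list(self._streams.get(stream_id, []))

    def append(self, stream_id: str, events: list) -> None:
        self._streams.setdefault(stream_id, []).extend(events)


class WarehouseProductRepository:
    def __init__(self, store: InMemoryEventStore) -> None:
        self._store = store

    def get(self, sku: str) -> WarehouseProduct:
        product = WarehouseProduct(sku)  # start from an empty aggregate
        for event in self._store.read_stream(sku):
            product.apply(event)  # rehydrate by replaying past events
        return product

    def save(self, product: WarehouseProduct) -> None:
        self._store.append(product.sku, product.uncommitted_events())
        product.mark_committed()


store = InMemoryEventStore()
store.append("SKU-1", [ProductReceived("SKU-1", 10)])
repository = WarehouseProductRepository(store)

product = repository.get("SKU-1")  # rehydrate: 10 on hand
product.ship_product(3)  # command: records a ProductShipped event
repository.save(product)  # persist: append the new events to the stream
print(store.read_stream("SKU-1"))
# [ProductReceived(sku='SKU-1', quantity=10), ProductShipped(sku='SKU-1', quantity=3)]
```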

Given, When, Then

If you take those three basic steps of loading an aggregate, calling commands, and saving the new events, you can boil them down to:

Given a stream of events
When a valid command is performed
Then new event(s) occur

You can use this as the testing strategy for testing your Aggregates.

The WarehouseProduct above has 3 commands: ShipProduct, ReceiveProduct, and AdjustInventory. Each one creates its respective event if it passes its invariants.

Here’s how Given, When, Then looks for the ShipProduct command, which should create a ProductShipped Event.

While that does satisfy our goal, it’s a bit cumbersome to write all of this just to verify the event. To simplify it and make it a bit more natural to read, I created a base class to use within our tests.

Now using this abstract class, here are all the tests for the WarehouseProduct.
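The post's actual base class isn't shown here, so this is a hypothetical Given/When/Then helper built on `unittest`. In it, `given` replays events without recording them, `when` invokes a command, and `then` asserts on the aggregate's new events. The event names other than ProductShipped, and the invariants, are assumptions.

```python
import unittest
from dataclasses import dataclass


@dataclass(frozen=True)
class ProductReceived:
    sku: str
    quantity: int


@dataclass(frozen=True)
class ProductShipped:
    sku: str
    quantity: int


@dataclass(frozen=True)
class InventoryAdjusted:
    sku: str
    quantity: int  # negative for a reduction
    reason: str


class WarehouseProduct:
    def __init__(self, sku: str) -> None:
        self.sku = sku
        self.quantity_on_hand = 0
        self._uncommitted: list = []

    def apply(self, event) -> None:
        if isinstance(event, ProductShipped):
            self.quantity_on_hand -= event.quantity
        else:  # ProductReceived and InventoryAdjusted both add their quantity
            self.quantity_on_hand += event.quantity

    def _record(self, event) -> None:
        self.apply(event)
        self._uncommitted.append(event)

    def uncommitted_events(self) -> list:
        return list(self._uncommitted)

    def ship_product(self, quantity: int) -> None:
        if quantity > self.quantity_on_hand:
            raise ValueError("cannot ship more than is on hand")
        self._record(ProductShipped(self.sku, quantity))

    def receive_product(self, quantity: int) -> None:
        self._record(ProductReceived(self.sku, quantity))

    def adjust_inventory(self, quantity: int, reason: str) -> None:
        if self.quantity_on_hand + quantity < 0:
            raise ValueError("adjustment would make inventory negative")
        self._record(InventoryAdjusted(self.sku, quantity, reason))


class AggregateTest(unittest.TestCase):
    """Hypothetical Given/When/Then helper for aggregate tests."""

    def given(self, aggregate, *events):
        for event in events:  # Given: replay history without recording new events
            aggregate.apply(event)
        self.aggregate = aggregate
        return self

    def when(self, command):
        command(self.aggregate)  # When: invoke one command on the aggregate
        return self

    def then(self, *expected_events):
        # Then: exactly these new events were created, in this order
        self.assertEqual(list(expected_events), self.aggregate.uncommitted_events())


class WarehouseProductTests(AggregateTest):
    def test_ship_product(self):
        (self.given(WarehouseProduct("SKU-1"), ProductReceived("SKU-1", 10))
            .when(lambda p: p.ship_product(3))
            .then(ProductShipped("SKU-1", 3)))

    def test_receive_product(self):
        (self.given(WarehouseProduct("SKU-1"))
            .when(lambda p: p.receive_product(5))
            .then(ProductReceived("SKU-1", 5)))

    def test_adjust_inventory(self):
        (self.given(WarehouseProduct("SKU-1"), ProductReceived("SKU-1", 10))
            .when(lambda p: p.adjust_inventory(-2, "damaged"))
            .then(InventoryAdjusted("SKU-1", -2, "damaged")))

    def test_cannot_ship_more_than_on_hand(self):
        with self.assertRaises(ValueError):
            self.given(WarehouseProduct("SKU-1")).when(lambda p: p.ship_product(1))
```

Saved as a module, this runs with `python -m unittest`. Each test reads as one Given, When, Then sentence.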
