This blog introduces you to building event-driven microservices application using CQRS and Event sourcing patterns. Following is a brief definition of the concepts that would be discussed during the course of this blog, more details about these can be obtained from the resources provided at the end of this blog.
What is a Microservice?
While there is no single definition for this architectural style, Adrian Cockcroft defines microservices architecture as a service-oriented architecture composed of loosely coupled elements that have bounded contexts.
What is a Bounded Context?
A Bounded Context is a concept that encapsulates the details of a single domain, such as domain model, data model, application services, etc., and defines the integration points with other bounded contexts/domains.
What is CQRS?
Command Query Responsibility Segregation (CQRS) is an architectural pattern that segregates the domain operations into two categories – Queries and Commands. While queries just return some results without making any state changes, commands are the operations which change the state of the domain model.
Why CQRS?
During the lifecycle of an application, it is common that the logical model becomes more complicated and structured that could impact the user experience, which must be independent of the core system.
In order to have a scalable and easy to maintain application we need to reduce the constraints between the read model and the write model. Some reasons for keeping reads and writes apart could be:
- Scalability (read exceeds the write, so does the scaling requirements for each differs and can be addressed better)
- Flexibility (separate read / write models)
- Reduced Complexity (shifting complexity into separate concerns)
What is Event sourcing?
Event sourcing achieves atomicity by using a different, event-centric approach to persisting business entities. Instead of storing the current state of an entity, the application stores a sequence of ‘events’ that changed the entity’s state. The application can reconstruct an entity’s current state by replaying the events. Since saving an event is a single operation, it is inherently atomic and does not require 2PC (2-phase commit) which is typically associated with distributed transactions.
Overview
This blog explains how CQRS and Event Sourcing patterns can be applied to develop a simple microservice application that consists of a single bounded context called “Cart” with add, remove and read operations. The sample does not have any functional significance but should be good enough to understand the underlying patterns and their implementations. The following diagram depicts a high level flow of activities when using CQRS and Event sourcing patterns to build applications:

Figure 1 CQRS and Event sourcing
The sample referred in this blog uses the following technology stack:
- Spring Boot for building and packaging the application
- Axon framework with Spring for CQRS and Event Sourcing. Axon is an open source CQRS framework for Java which provides implementations of the most important building blocks, such as aggregates, repositories and event buses that help us build applications using CQRS and Event sourcing architectural patterns. It also allows you to provide your own implementation of the above mentioned building blocks.
- Oracle Application Container cloud for application deployment
With this background, let us start building the sample.
Identify Aggregate Root
First step is to identify the bounded context and domain entities in the bounded context. This will help us define the Aggregate Root (for example, an ‘account’, an ‘order’…etc.). An aggregate is an entity or group of entities that is always kept in a consistent state. The aggregate root is the object on top of the aggregate tree that is responsible for maintaining this consistent state.
To keep things simple for this blog, we consider ‘Cart’ as the only Aggregate Root in the domain model. Just like the usual shopping cart, the items in the cart are adjusted based on the additions or removals happening on that cart.
Define Commands
This aggregate root has 2 commands associated with it:
- Add to Cart Command – Modeled by AddToCartCommand class
- Remove from Cart Command – Modeled by RemoveFromCartCommand class
public****class AddToCartCommand {
**private** **final** String cartId;
**private** **final** **int** item;
**public** AddToCartCommand(String cartId, **int** item) {
**this**.cartId = cartId;
**this**.item = item;
}
**public** String getCartId() {
**return** cartId;
}
**public** **int** getItem() {
**return** item;
}
}
public class RemoveFromCartCommand {
private final String cartId;
private final int item;
public RemoveFromCartCommand(String cartId, int item) {
**this**.cartId = cartId;
**this**.item = item;
}
public String getCartId() {
**return** cartId;
}
public int getItem() {
**return** item;
}
}
As you notice, these commands are just POJOs used to capture the intent of what needs to happen within a system along with the necessary information that is required. Axon Framework does not require commands to implement any interface nor extend any class.
Define Command Handlers
A command is intended to have only one handler, the following classes represent the handlers for Add to Cart and Remove from Cart commands:
@Component
public class AddToCartCommandHandler {
private Repository repository;
@Autowired
public AddToCartCommandHandler(Repository repository) {
**this**.repository = repository;
}
@CommandHandler
public void handle(AddToCartCommand addToCartCommand){
Cart cartToBeAdded = (Cart) repository.load(addToCartCommand.getCartId());
cartToBeAdded.addCart(addToCartCommand.getItem());
}
}
@Component
public class RemoveFromCartHandler {
private Repository repository;
@Autowired
public RemoveFromCartHandler(Repository repository) {
**this**.repository = repository;
}
@CommandHandler
public void handle(RemoveFromCartCommand removeFromCartCommand){
Cart cartToBeRemoved = (Cart) repository.load(removeFromCartCommand.getCartId());
cartToBeRemoved.removeCart(removeFromCartCommand.getItem());
}
}
We use Axon with Spring framework, so the Spring beans defined above have methods annotated with @CommandHandler which makes them as command handlers. @Component annotation ensures that these beans are scanned during application startup and any auto wired resources are injected into this bean. Instead of accessing the Aggregates directly, Repository which is a domain object in Axon framework abstracts retrieving and persisting of aggregates.
Application Startup
Following is the AppConfiguration class which is a Spring configuration class that gets initialized upon application deployment and creates the components required for implementing the patterns.
@Configuration
@AnnotationDriven
public class AppConfiguration {
@Bean
public DataSource dataSource() {
**return** DataSourceBuilder
._create_()
.username("sa")
.password("")
.url("jdbc:h2:mem:axonappdb")
.driverClassName("org.h2.Driver")
.build();
}
/**
* Event store to store events
*/
@Bean
public EventStore jdbcEventStore() {
**return** **new** JdbcEventStore(dataSource());
}
@Bean
public SimpleCommandBus commandBus() {
SimpleCommandBus simpleCommandBus = **new** SimpleCommandBus();
**return** simpleCommandBus;
}
/**
* Cluster event handlers that listens to events thrown in the application.
*/
@Bean
public Cluster normalCluster() {
SimpleCluster simpleCluster = **new** SimpleCluster("simpleCluster");
**return** simpleCluster;
}
/**
* This configuration registers event handlers with defined clusters
*/
@Bean
public ClusterSelector clusterSelector() {
Map\<String, Cluster> clusterMap = **new** HashMap\<>();
clusterMap.put("msacqrses.eventhandler", normalCluster());
**return** **new** ClassNamePrefixClusterSelector(clusterMap);
}
/**
*The clustering event bus is needed to route events to event handlers in the clusters.
*/
@Bean
public EventBus clusteringEventBus() {
ClusteringEventBus clusteringEventBus = **new** ClusteringEventBus(clusterSelector(), terminal());
**return** clusteringEventBus;
}
/**
* Event Bus Terminal publishes domain events to the cluster
*
*/
@Bean
public EventBusTerminal terminal() {
**return** **new** EventBusTerminal() {
@Override
**public** **void** publish(EventMessage... events) {
normalCluster().publish(events);
}
@Override
**public** **void** onClusterCreated(Cluster cluster) {
}
};
}
/**
* Command gateway through which all commands in the application are submitted
*
*/
@Bean
public DefaultCommandGateway commandGateway() {
**return** **new** DefaultCommandGateway(commandBus());
}
/**
* Event Repository that handles retrieving of entity from the stream of events.
*/
@Bean
public Repository<Cart> eventSourcingRepository() {
EventSourcingRepository eventSourcingRepository = new EventSourcingRepository(Cart.class, jdbcEventStore());
eventSourcingRepository.setEventBus(clusteringEventBus());
**return** eventSourcingRepository;
}
}
Let us take a look at the key Axon provided infrastructure components that are initialized in this class:
Command bus
As represented in “Figure 1” above, command bus is the component that routes commands to their respective command handlers. Axon Framework comes with different types of Command Bus out of the box that can be used to dispatch commands to command handlers. Please refer here for more details on Axon’s Command Bus implementations. In our example, we use SimpleCommandBus which is configured as a bean in Spring's application context.
Command Gateway
Command bus can directly send commands but it is usually recommended to use a command gateway. Using a command gateway allows developers to perform certain functionalities like intercepting commands, setting retry in failure scenarios…etc. In our example, we use Axon provided default which is DefaultCommandGateway that is configured as a Spring bean to send commands instead of directly using a command bus.
Event Bus
As depicted in “Figure 1”, the commands initiated on an Aggregate root are sent as events to the Event store where they get persisted. Event Bus is the infrastructure that routes events to event handlers. Event Bus may look similar to command bus from a message dispatching perspective but they vary fundamentally.
Command Bus works with commands that define what happen in the near future and there is only one command handler that interprets the command. However in case of Event Bus, it routes events and events define actions that happened in the past with zero or more event handlers for an event.
Axon defines multiple implementations of Event Bus, in our example we use ClusteringEventBus which is again wired up as a Spring bean. Please refer here for more details on Axon’s Event Bus implementations.
Event Store
We need to configure an event store as our repository will store domain events instead of the current state of our domain objects. Axon framework allows storing the events using multiple persistent mechanisms like JDBC, JPA, file system etc. In this example we use a JDBC event store.
Event Sourcing Repository
In our example, the aggregate root is not created from a representation in a persistent mechanism, instead is created from stream of events which is achieved through an Event sourcing repository. We configure the repository with the event bus that we defined earlier since it will be publishing the domain events.
Database
We use in memory database (h2) in our example as the data store. The Spring Boot’s application.properties contains the data source configuration settings:
# Datasource configuration
spring.datasource.url=jdbc:h2:mem:axonappdb
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.datasource.validation-query=SELECT 1;
spring.datasource.initial-size=2
spring.datasource.sql-script-encoding=UTF-8
spring.jpa.database=h2
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=create
As mentioned above, this example uses a JDBC event store to store the domain events generated in the system. These events are stored in a default tables (part of Axon framework event infrastructure) specified by the Axon framework. We use the following startup class for creating the database tables required by this example:
@Component
public class Datastore {
@Autowired
@Qualifier("transactionManager")
protected PlatformTransactionManager txManager;
@Autowired
private Repository repository;
@Autowired
private javax.sql.DataSource dataSource;
// create two cart entries in the repository used for command processing
@PostConstruct
private void init(){
TransactionTemplate transactionTmp = new TransactionTemplate(txManager);
transactionTmp.execute(new TransactionCallbackWithoutResult() {
@Override
**protected** **void** doInTransactionWithoutResult(TransactionStatus status) {
UnitOfWork uow = DefaultUnitOfWork._startAndGet_();
repository.add(**new** Cart("cart1"));
repository.add(**new** Cart("cart2"));
uow.commit();
}
});
// create a database table for querying and add two cart entries
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
jdbcTemplate.execute("create table cartview (cartid VARCHAR , items NUMBER )");
jdbcTemplate.update("insert into cartview (cartid, items) values (?, ?)", new Object[]{"cart1", 0});
jdbcTemplate.update("insert into cartview (cartid, items) values (?, ?)", new Object[]{"cart2", 0});
}
This startup class creates two cart entries in the repository used for command processing and creates a database table called “cartview” which is used for processing the queries.
A quick recap on what we did so far:
- We have identified “Cart” as our Aggregate root and have defined commands and command handlers for adding and removing items from the Cart.
- We have defined a startup class which initializes the infrastructure components required for CQRS and Event sourcing.
- A startup class has also been defined to create the database tables and setup the data required by this sample.
Let us now look at our AggregateRoot - “Cart” which is defined as below:
Aggregate Root
public****class Cart extends AbstractAnnotatedAggregateRoot {
@AggregateIdentifier
private String cartid;
private int items;
public<