This part presents a sample application built with the technologies and architecture described earlier.
The application is developed in Grails. The CQRS architecture is implemented with the support of the Axon framework.
There are two simple use cases in the application:
- entering new credit card transaction
- canceling created credit card transaction
If you need more information about the technologies used, go back to Part I. In this post I will concentrate mainly on the code base.
I will try to present whole classes, but some of them contain less important code that is not relevant to CQRS, so I will skip those parts. The entire project can be pulled from GitHub.
Let’s start with a quick project structure overview:
The CQRS architecture divides the application into two separate parts: one responsible for data persistence and domain logic, and another responsible for fetching data in order to present it to the user.
I decided to use the Grails domain classes feature to implement the read part, and to implement the proper domain logic in separate classes placed outside grails-app, in the src folder. So grails-app/domain/grails/cqrs contains the read model, while the write model is located in src/grails/cqrs/domain.
The analysis will be done starting from the view part down to the backend.
Controller
A controller in Grails is a component that handles requests and prepares responses. Grails controllers are simple classes whose names end in Controller. They are placed in the grails-app/controllers location.
Have a look at part of the CreditCardTransactionController:
class CreditCardTransactionController {

    DefaultCommandGateway defaultCommandGateway

    ....
}
The command gateway is an interface to the command dispatching and handling mechanism. Axon provides the CommandGateway interface, which declares methods for sending commands synchronously, asynchronously, with a timeout and so on. You can implement that interface yourself and provide your own mechanism for dispatching commands. However, Axon also provides a default implementation, DefaultCommandGateway, which can be used straight away.
In the controller we have two important methods: save and cancel. They are responsible for handling requests for the two basic use cases of the application: creating and canceling a credit card transaction.
@Transactional
def save(CreateTransactionCommand command) {
    if (command == null) {
        notFound()
        return
    }
    if (command.hasErrors()) {
        respond command.errors, view: 'create'
        return
    }

    // The id assigned here becomes the aggregate identifier later on
    command.id = randomUUID()
    // Uses the gateway field declared in the controller
    defaultCommandGateway.send(command)

    flash.message = 'Credit card transaction created'
    redirect(action: "index")
}

@Transactional
def cancel(CancelTransactionCommand command) {
    if (command == null) {
        notFound()
        return
    }

    // Look up the read model row to find the aggregate it was built from
    CreditCardTransaction creditCardTransaction = CreditCardTransaction.findById(command.id)
    if (creditCardTransaction == null) {
        notFound()
        return
    }
    command.aggregateIdentifier = creditCardTransaction.aggregateIdentifier
    defaultCommandGateway.send(command)

    request.withFormat {
        form {
            flash.message = "Credit card transaction cancelled"
            redirect action: "index", method: "GET"
        }
        '*' { render status: NO_CONTENT }
    }
}
Command Object
The first interesting thing is the command parameter of each method. A command object in Grails, when used as a controller action parameter, is a convenient way to pass request data. Grails uses its data binding mechanism to bind request parameters to the command object properties, so there is no need to read data from the request itself. It also supports data validation, and it is a natural fit when implementing the command design pattern.
As an example, have a look at the CreateTransactionCommand:
@grails.validation.Validateable
class CreateTransactionCommand {

    UUID id
    String creditCardNumber
    String creditCardCvvCode
    String creditCardOwner
    Date validDate
    BigDecimal amount

    static constraints = {
        creditCardNumber blank: false, size: 16..16
        creditCardCvvCode blank: false, size: 3..3
        creditCardOwner blank: false
        validDate nullable: false
        amount nullable: false
    }
}
It contains the request data, the validation rules and an id. Because these commands are dispatched to the backend, they should have a unique id. This id can be assigned in different parts of the application. I decided to assign it in the controller, in the save action (command.id = randomUUID()). It will be used as the aggregate identifier later on, during command processing.
Command Gateway
Commands are sent to the command handling mechanism through the command gateway mentioned earlier (the send call in the save and cancel actions). Apart from the simple send(Object command) method, which sends the command and returns immediately without waiting for its execution, CommandGateway also provides the following methods:
- <R> void send(Object command, CommandCallback<R> callback): sends the command and has the command result reported to the given callback
- <R> R sendAndWait(Object command): sends the command and waits for the command execution
- <R> R sendAndWait(Object command, long timeout, TimeUnit unit): sends the command and waits for the command execution until the given timeout is reached
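The difference between these sending styles can be illustrated without Axon at all. The following is only a minimal sketch in plain Java: SketchGateway, its single worker thread and the Function used as a handler are hypothetical stand-ins, not Axon classes, and throwing an exception on timeout is a choice made for this sketch rather than a statement about Axon's behavior.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for a command gateway; it does not use Axon.
final class SketchGateway {

    // A single daemon worker thread plays the role of the command bus.
    private final ExecutorService worker = Executors.newSingleThreadExecutor(task -> {
        Thread thread = new Thread(task, "sketch-command-bus");
        thread.setDaemon(true);
        return thread;
    });

    // The handler turns a command into a result.
    private final Function<Object, Object> handler;

    SketchGateway(Function<Object, Object> handler) {
        this.handler = handler;
    }

    private CompletableFuture<Object> dispatch(Object command) {
        return CompletableFuture.supplyAsync(() -> handler.apply(command), worker);
    }

    // Fire and forget: returns as soon as the command is queued.
    void send(Object command) {
        dispatch(command);
    }

    // Asynchronous with callback: the result is reported once handling finishes.
    void send(Object command, Consumer<Object> callback) {
        dispatch(command).thenAccept(callback);
    }

    // Synchronous: blocks until the command has been handled.
    Object sendAndWait(Object command) {
        return dispatch(command).join();
    }

    // Synchronous with an upper bound on the waiting time.
    Object sendAndWait(Object command, long timeout, TimeUnit unit) {
        try {
            return dispatch(command).get(timeout, unit);
        } catch (TimeoutException e) {
            throw new IllegalStateException("Command timed out", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Command handling failed", e.getCause());
        }
    }
}
```

In the sample controller the fire and forget variant is enough, because the user is redirected to the index page and the read model is updated by the event listener.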
Let's come back for a moment to the controller and its cancel method. It is responsible for canceling a credit card transaction. The problem here is that the read and write models need to be matched in some way. The view shows data from the read model (grails-app/domain), while the command sent to the write model needs to identify the proper aggregate to update (or delete, as in this case; this is for presentation purposes only, a real application should not delete an aggregate here).
The matching is done through the read model, which holds the aggregate identifier: the controller loads the CreditCardTransaction row and copies its aggregateIdentifier into the command.
Command Handler
So far we have sent a command using the command gateway. It is then forwarded to the appropriate command handler through the command bus. Infrastructure such as the command bus will be discussed a little later. A command handler is an object that receives commands of a specified type.
Command handlers can be created by implementing the CommandHandler interface, which defines a single method, Object handle(CommandMessage<T> command, UnitOfWork uow). Such a command handler needs to be subscribed to, and unsubscribed from, the command bus manually.
I decided to use annotation based command handlers. In that case, a command handler can be any POJO class with a method annotated with @CommandHandler. That method should declare the command object as its first argument. It is possible to add more arguments, but the first one must be the command object.
@Component
class CreateTransactionCommandHandler {

    @CommandHandler
    void handle(CreateTransactionCommand command) throws Throwable {
        CreditCardTransactionAggregate creditCardTransactionAggregate =
                new CreditCardTransactionAggregate(command)
        EventSourcingRepository eventSourcingRepository =
                SpringBeanResolver.resolve("creditCardTransactionRepository")
        eventSourcingRepository.add(creditCardTransactionAggregate)
    }
}
I created two separate command handlers, one for each command type. Annotation based configuration also allows you to define multiple handler methods in one object, each handling a specific command. What is more, aggregate methods can be annotated as command handlers. For example, an aggregate constructor can be annotated with @CommandHandler, so that sending the corresponding command results in a new aggregate being created.
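The routing idea behind this, one handler method per command type, can be sketched in a few lines of plain Java. This is not how Axon implements it (Axon discovers annotated methods for you); SketchCommandBus, the two command classes and SketchTransactionHandler are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical command bus: routes each command to the handler subscribed for its class.
final class SketchCommandBus {

    private final Map<Class<?>, Consumer<Object>> handlers = new HashMap<>();

    <C> void subscribe(Class<C> commandType, Consumer<C> handler) {
        // The cast is safe because dispatch() looks handlers up by the command's class.
        handlers.put(commandType, command -> handler.accept(commandType.cast(command)));
    }

    void dispatch(Object command) {
        Consumer<Object> handler = handlers.get(command.getClass());
        if (handler == null) {
            throw new IllegalStateException("No handler for " + command.getClass().getName());
        }
        handler.accept(command);
    }
}

// Hypothetical commands carrying only a transaction id.
final class SketchCreateCommand {
    final String id;
    SketchCreateCommand(String id) { this.id = id; }
}

final class SketchCancelCommand {
    final String id;
    SketchCancelCommand(String id) { this.id = id; }
}

// One object may handle several command types, one method per type.
final class SketchTransactionHandler {
    final List<String> log = new ArrayList<>();

    void handle(SketchCreateCommand command) { log.add("created " + command.id); }

    void handle(SketchCancelCommand command) { log.add("cancelled " + command.id); }
}
```

A handler object is wired with calls such as bus.subscribe(SketchCreateCommand.class, c -> handler.handle(c)). With annotation based handlers this subscription step is what the framework does on your behalf.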
In the sample application the aggregate is created in the command handler itself and saved to the event store using a repository.
The SpringBeanResolver.resolve(...) call in the above listing is specific to Grails. I needed to create a simple Spring bean resolver because Grails dependency injection (based on Spring) does not inject dependencies into objects located outside grails-app.
The repository will be discussed later on, during the Spring configuration analysis, since it is configured there.
Aggregate
Now that the aggregate has been created and saved to the store, let’s have a look at its implementation:
class CreditCardTransactionAggregate extends AbstractEventSourcedAggregateRoot {

    @AggregateIdentifier
    UUID identifier
    String creditCardNumber
    String creditCardCvvCode
    Date validDate
    BigDecimal amount
    Date transactionDate
    Date cancellationDate
    CreditCardOwner creditCardOwner

    // No-argument constructor used when the aggregate is rebuilt from its events
    CreditCardTransactionAggregate() {}

    CreditCardTransactionAggregate(CreateTransactionCommand command) {
        apply(new TransactionCreatedEvent(command.id, command.creditCardNumber,
                command.creditCardCvvCode, command.creditCardOwner,
                command.validDate, command.amount, new Date()))
    }

    void cancelTransaction(CancelTransactionCommand command) {
        apply(new TransactionCancelledEvent(command.id, command.aggregateIdentifier))
    }

    @Override
    protected Iterable<? extends EventSourcedEntity> getChildEntities() {
        return null
    }

    // Called for every applied or replayed event; state changes happen only here
    @Override
    protected void handle(DomainEventMessage event) {
        def payload = event.getPayload()
        if (payload instanceof TransactionCreatedEvent) {
            handleTransactionCreated(payload)
        }
        if (payload instanceof TransactionCancelledEvent) {
            handleTransactionCancel(payload)
        }
    }

    private void handleTransactionCreated(TransactionCreatedEvent event) {
        this.identifier = event.id
        this.creditCardNumber = event.creditCardNumber
        this.creditCardCvvCode = event.creditCardCvvCode
        this.validDate = event.getValidDate()
        this.amount = event.getAmount()
        // Taken from the event so that replaying it restores the original date
        this.transactionDate = event.transactionDate
        creditCardOwner = new CreditCardOwner(event.creditCardOwner)
    }

    private void handleTransactionCancel(TransactionCancelledEvent event) {
        cancellationDate = new Date()
        markDeleted()
    }
}
The aggregate extends AbstractEventSourcedAggregateRoot, a convenient abstract base class for aggregates that are stored in an event store. That class also tracks all uncommitted events. Moreover, it provides useful methods to initialize the aggregate state based on the events retrieved from the event store.
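The two responsibilities mentioned here, tracking uncommitted events and rebuilding state from stored events, can be shown with a stripped-down sketch in plain Java. It is not Axon's base class: SketchAggregate, SketchCreated and SketchCancelled are hypothetical names, and the "event store" is just a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical events; plain immutable value holders.
final class SketchCreated {
    final String id;
    final long amountCents;
    SketchCreated(String id, long amountCents) {
        this.id = id;
        this.amountCents = amountCents;
    }
}

final class SketchCancelled {
    final String id;
    SketchCancelled(String id) { this.id = id; }
}

// Hypothetical event-sourced aggregate: state changes only inside handle().
final class SketchAggregate {

    private final List<Object> uncommitted = new ArrayList<>();
    private String id;
    private long amountCents;
    private boolean cancelled;

    private SketchAggregate() { }

    // Creating the aggregate means applying its first event.
    static SketchAggregate create(String id, long amountCents) {
        SketchAggregate aggregate = new SketchAggregate();
        aggregate.apply(new SketchCreated(id, amountCents));
        return aggregate;
    }

    // Loading the aggregate means replaying its stored events in order.
    static SketchAggregate replay(List<Object> history) {
        SketchAggregate aggregate = new SketchAggregate();
        for (Object event : history) {
            aggregate.handle(event);
        }
        return aggregate;
    }

    void cancel() {
        if (!cancelled) {
            apply(new SketchCancelled(id));
        }
    }

    // New events change the state and are remembered until they are stored.
    private void apply(Object event) {
        handle(event);
        uncommitted.add(event);
    }

    private void handle(Object event) {
        if (event instanceof SketchCreated) {
            SketchCreated created = (SketchCreated) event;
            id = created.id;
            amountCents = created.amountCents;
        } else if (event instanceof SketchCancelled) {
            cancelled = true;
        }
    }

    List<Object> uncommittedEvents() { return new ArrayList<>(uncommitted); }
    String id() { return id; }
    long amountCents() { return amountCents; }
    boolean isCancelled() { return cancelled; }
}
```

Because replay calls only handle, a reloaded aggregate ends up in the same state as the original one without producing any new events.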
As we will see in the configuration later on, events applied to the aggregate are stored on the file system, in a folder named after the aggregate type. Axon's file system event store keeps the events of each aggregate instance in a file identified by the aggregate identifier.
Domain Event
We live in an event-driven environment, so software, and especially its domain model, which is intended to reflect reality as closely as possible, should be event-driven as well. Following that path, all important changes in the domain should be represented by events. In Axon, the event classes themselves are plain objects; the framework wraps them in a DomainEventMessage, which carries the event as its payload together with the identifier of the aggregate it belongs to.
Two domain events are applied to the sample aggregate: TransactionCreatedEvent and TransactionCancelledEvent. The AbstractEventSourcedAggregateRoot class declares a handle method (overridden in the aggregate listing above) whose responsibility is to handle events internally in the aggregate.
An event applied to the aggregate is dispatched on the event bus, which forwards it to the applicable event listeners.
class TransactionCreatedEvent {

    UUID id
    String creditCardNumber
    String creditCardCvvCode
    String creditCardOwner
    Date validDate
    BigDecimal amount
    Date transactionDate

    TransactionCreatedEvent(UUID id, String creditCardNumber, String creditCardCvvCode,
                            String creditCardOwner, Date validDate, BigDecimal amount,
                            Date transactionDate) {
        this.id = id
        this.creditCardNumber = creditCardNumber
        this.creditCardCvvCode = creditCardCvvCode
        this.creditCardOwner = creditCardOwner
        this.validDate = validDate
        this.amount = amount
        this.transactionDate = transactionDate
    }
}
The domain event carries all the information that will be stored in the read part of the application. One important thing to notice is that its id property holds the aggregate identifier.
Domain Event Listener
Finally, there is an event listener which handles events and persists data in the store from which data is retrieved and presented to the user.
@Component
class CreditCardTransactionEventListener {

    @EventHandler
    public void handle(TransactionCreatedEvent event) {
        CreditCardTransaction creditCardTransaction = new CreditCardTransaction()
        creditCardTransaction.creditCardNumber = event.creditCardNumber
        creditCardTransaction.creditCardCvvCode = event.creditCardCvvCode
        creditCardTransaction.creditCardOwner = event.creditCardOwner
        creditCardTransaction.validDate = event.validDate
        creditCardTransaction.amount = event.amount
        creditCardTransaction.transactionDate = event.transactionDate
        creditCardTransaction.aggregateIdentifier = event.id
        creditCardTransaction.save flush: true
    }

    @EventHandler
    public void handle(TransactionCancelledEvent event) {
        CreditCardTransaction creditCardTransaction =
                CreditCardTransaction.findById(event.transactionId)
        creditCardTransaction.delete flush: true
    }
}
So we have events stored in the write part of the application and data stored in the read part based on the events dispatched by the domain model.
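To make the relation between the two sides concrete, here is a minimal sketch of a read model kept up to date by events, again in plain Java rather than Grails or Axon. SketchProjection and SketchRow are hypothetical names, and an in-memory map stands in for the database table behind the Grails domain class. Each row remembers the identifier of the aggregate it was built from, which is what lets the controller's cancel action address the right aggregate.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical read model row; it keeps a reference to the write-side aggregate.
final class SketchRow {
    final UUID aggregateIdentifier;
    final String owner;
    final long amountCents;

    SketchRow(UUID aggregateIdentifier, String owner, long amountCents) {
        this.aggregateIdentifier = aggregateIdentifier;
        this.owner = owner;
        this.amountCents = amountCents;
    }
}

// Hypothetical projection: an in-memory table updated by event callbacks.
final class SketchProjection {

    private final Map<Long, SketchRow> rows = new LinkedHashMap<>();
    private long nextRowId = 1;

    // Reaction to a "transaction created" event: insert a row, return its id.
    long onCreated(UUID aggregateIdentifier, String owner, long amountCents) {
        long rowId = nextRowId++;
        rows.put(rowId, new SketchRow(aggregateIdentifier, owner, amountCents));
        return rowId;
    }

    // Reaction to a "transaction cancelled" event: remove the row.
    void onCancelled(long rowId) {
        rows.remove(rowId);
    }

    // Used on the command side to translate a row id into an aggregate identifier.
    UUID aggregateIdentifierFor(long rowId) {
        SketchRow row = rows.get(rowId);
        return row == null ? null : row.aggregateIdentifier;
    }

    int size() {
        return rows.size();
    }
}
```

The projection never touches the aggregate itself; it only reacts to events, which is why the read store can use a completely different shape and technology than the event store.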
Spring configuration
Last but not least is the infrastructure. In the sample application it is configured using the Spring application context:
<axon:annotation-config />

<context:component-scan base-package="grails.cqrs" />

<bean class="org.axonframework.commandhandling.annotation.AnnotationCommandHandlerBeanPostProcessor">
    <property name="commandBus" ref="commandBus" />
</bean>

<bean id="defaultCommandGateway" class="org.axonframework.commandhandling.gateway.DefaultCommandGateway">
    <constructor-arg name="commandBus" ref="commandBus" />
    <constructor-arg name="commandDispatchInterceptors">
        <list></list>
    </constructor-arg>
</bean>

<axon:filesystem-event-store id="eventStore" base-dir="/events" />

<axon:command-bus id="commandBus" />

<axon:event-bus id="eventBus" />

<bean id="creditCardTransactionRepository" class="org.axonframework.eventsourcing.EventSourcingRepository">
    <constructor-arg value="grails.cqrs.domain.CreditCardTransactionAggregate"/>
    <property name="eventBus" ref="eventBus"/>
    <property name="eventStore" ref="eventStore"/>
</bean>
The <axon:annotation-config /> element lets Axon process annotations and accordingly turn objects with @EventHandler methods into event handlers, or objects with @CommandHandler methods into command handlers. Such beans need to be components managed by Spring.
The AnnotationCommandHandlerBeanPostProcessor bean registers classes containing command handling methods with the command bus provided through its commandBus property.
The defaultCommandGateway bean is the command gateway definition. It receives a reference to the command bus and a list of interceptors through constructor arguments.
Even though I haven’t provided any command dispatch interceptors, it is worth saying a few words about them. Their purpose is to modify the command message before it is dispatched to the command bus. Interceptors are executed only for commands sent through that particular gateway.
What can be done in a command dispatch interceptor? Validation, for example, or enriching the command message with metadata.
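As an illustration of both uses, the sketch below passes every command through a chain of interceptors before it reaches the bus. It is written in plain Java and does not use Axon's interceptor interface: SketchMessage, SketchInterceptingGateway and the "issuedBy" metadata key are hypothetical, and a list stands in for the command bus.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical immutable command message: a payload plus metadata.
final class SketchMessage {
    final Object payload;
    final Map<String, String> metaData;

    SketchMessage(Object payload, Map<String, String> metaData) {
        this.payload = payload;
        this.metaData = Collections.unmodifiableMap(new HashMap<>(metaData));
    }

    // Returns a copy of this message with one more metadata entry.
    SketchMessage withMetaData(String key, String value) {
        Map<String, String> copy = new HashMap<>(metaData);
        copy.put(key, value);
        return new SketchMessage(payload, copy);
    }
}

// Hypothetical gateway that runs its interceptors before dispatching.
final class SketchInterceptingGateway {

    private final List<UnaryOperator<SketchMessage>> interceptors;
    // Stands in for the command bus: records what would be dispatched.
    private final List<SketchMessage> bus = new ArrayList<>();

    SketchInterceptingGateway(List<UnaryOperator<SketchMessage>> interceptors) {
        this.interceptors = new ArrayList<>(interceptors);
    }

    void send(Object command) {
        SketchMessage message =
                new SketchMessage(command, Collections.<String, String>emptyMap());
        // Each interceptor may reject the message (by throwing) or return a modified copy.
        for (UnaryOperator<SketchMessage> interceptor : interceptors) {
            message = interceptor.apply(message);
        }
        bus.add(message);
    }

    List<SketchMessage> dispatched() {
        return Collections.unmodifiableList(bus);
    }
}
```

A validating interceptor simply throws before the message reaches the bus, while an enriching one returns message.withMetaData(...). Because the chain belongs to the gateway, commands sent through another gateway are not affected.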
A very significant element is the event store, defined by the <axon:filesystem-event-store /> element. In this case it is a file system based store. It stores the events applied to the aggregates in the /events folder.
Apart from the file system event store used in the example, the Axon framework provides other event store implementations, for example JpaEventStore.
The <axon:command-bus /> and <axon:event-bus /> elements use the Axon namespace to define the command bus and the event bus.
The creditCardTransactionRepository bean defines the repository used to persist and load aggregates. The EventSourcingRepository used in the sample automatically publishes domain events to the event bus. In addition, it delegates event storage to the provided event store.
Summary
That is nearly all. As you can probably see from this simple example, the Axon framework does a very good job of providing a well designed infrastructure and architecture to support a CQRS implementation.
A lot of boilerplate code is eliminated from the application and moved to the framework. That is exactly what we expect from a solid framework.
Apart from all the advantages mentioned in Part I, and the many more that can be found on the Internet, I would like to stress one more thing. Having the application separated into two loosely coupled parts, write and read, each with its own storage, gives you the ability to switch off either part without affecting the other. Think about a failure: if one part goes down, the other can keep working, which greatly reduces the risk of the whole application going down.
In my opinion, that is another great advantage of the CQRS architecture, and one that is sometimes underestimated.