Grails + Axon = CQRS (part II)

As promised in one of the previous posts, I am continuing the series on Groovy, Grails and, in this case, the Axon framework. Part I said a few words about Grails, CQRS and Axon.
In this part I present a sample application that uses these technologies and this architecture.
The application is developed in Grails. The CQRS architecture is implemented with the support of the Axon framework.

There are two simple use cases in the application:

  • entering a new credit card transaction
  • canceling a previously created credit card transaction

If you need more information about the technologies used, go back to Part I. In this post I will concentrate mainly on the code base.

I will try to present whole classes, but some of them contain less important code that is not relevant to CQRS, so I will skip those parts. The entire project can be pulled from GitHub.

Let’s start with a quick project structure overview:

Picture 1. Project overview

The CQRS architecture divides the application into two separate parts: one responsible for data persistence and domain logic, and another responsible for fetching data in order to present it to the user.
I decided to use the Grails domain classes feature to implement the read part, and to implement the proper domain logic in separate classes placed outside grails-app, in the src folder. So grails-app/domain/grails/cqrs holds the read model, while the write model is located in src/grails/cqrs/domain.

The analysis will be done starting from the view part and going down to the backend.

Controller

A controller in Grails is a component that handles requests and prepares responses. In Grails, controllers are simple classes whose names end in Controller. They are placed in grails-app/controllers.
Have a look at part of the CreditCardTransactionController:

class CreditCardTransactionController {
  DefaultCommandGateway defaultCommandGateway; 
  ....
}

The command gateway is an interface to the command dispatching and handling mechanism. Axon provides the CommandGateway interface, which declares methods for sending commands synchronously, asynchronously, with a timeout and so on. You can implement that interface and provide your own mechanism for dispatching commands. However, Axon also provides a default implementation, DefaultCommandGateway, which can be used straight away.

In the controller we have two important methods: save and cancel. They are responsible for handling the requests for the two basic use cases of the application: creating and canceling a credit card transaction.

@Transactional
def save(CreateTransactionCommand command) {
  if (command == null) {
    notFound()
    return
  }
  if (command.hasErrors()) {
    respond command.errors, view: 'create'
    return
  }
  command.id = randomUUID()
  commandGateway.send(command)
  flash.message = 'Credit card transaction created'
  redirect(action: "index")
}

@Transactional
def cancel(CancelTransactionCommand command) {
  if (command == null) {
    notFound()
    return
  }
  CreditCardTransaction creditCardTransaction = CreditCardTransaction.findById(command.id)
  command.aggregateIdentifier = creditCardTransaction.aggregateIdentifier
  commandGateway.send(command)
  request.withFormat {
    form {
      flash.message = "Credit card transaction cancelled"
      redirect action: "index", method: "GET"
    }
    '*' { render status: NO_CONTENT }
  }
}

Command Object

The first interesting thing is the command parameter of each method. A command object in Grails, when used as a controller action parameter, is a convenient way to pass request data. Grails uses its data binding mechanism to bind request parameters to the command object's properties, so there is no longer any need to read data from the request itself. What is more, it allows data validation. It is also the mechanism to use when implementing the command design pattern.

As an example, have a look at the CreateTransactionCommand:

@grails.validation.Validateable
class CreateTransactionCommand {
  UUID id
  String creditCardNumber
  String creditCardCvvCode
  String creditCardOwner
  Date validDate
  BigDecimal amount

  static constraints = {
    creditCardNumber blank: false, size: 16..16
    creditCardCvvCode blank: false, size: 3..3
    creditCardOwner blank: false
    validDate nullable: false
    amount nullable: false
  }
}

It contains the request data, the validation rules and an id. Because these commands are dispatched to the backend, they should have a unique id. This id can be assigned in different parts of the application. I decided to assign it in the controller (the command.id = randomUUID() line in save). It will be used as the aggregate identifier later on, during command processing.
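
The binding and validation steps can be pictured without Grails. The sketch below is only an illustration under stated assumptions: the class name CreateTransactionCommandSketch and the validateFields method are hypothetical, the parameter values are made up, and Groovy's map constructor merely stands in for Grails data binding.

```groovy
// Framework-free sketch; Grails performs binding and validation itself.
class CreateTransactionCommandSketch {
    String creditCardNumber
    String creditCardCvvCode
    String creditCardOwner

    // Hypothetical stand-in for the constraints block:
    // returns the names of the fields that break a rule.
    List<String> validateFields() {
        List<String> errors = []
        if (!creditCardNumber || creditCardNumber.size() != 16) errors << 'creditCardNumber'
        if (!creditCardCvvCode || creditCardCvvCode.size() != 3) errors << 'creditCardCvvCode'
        if (!creditCardOwner?.trim()) errors << 'creditCardOwner'
        errors
    }
}

// Request parameters as they might arrive from a form (made-up values).
Map params = [creditCardNumber : '1234567890123456',
              creditCardCvvCode: '12',
              creditCardOwner  : 'John Smith']

// The map constructor plays the role of data binding here.
def command = new CreateTransactionCommandSketch(params)
def errors = command.validateFields()
println errors
```

In the real controller the equivalent check is command.hasErrors(), which Grails populates from the constraints block.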

Command Gateway

Commands are sent to the command handling mechanism using the command gateway mentioned earlier (the commandGateway.send(command) call). Apart from the simple send(Object command) method, which sends a command and returns immediately without waiting for its execution, CommandGateway also provides the following methods:

  • void send(Object command, CommandCallback<R> callback)
    • sends the command and has the result reported to the given callback
  • R sendAndWait(Object command)
    • sends the command and waits for its execution
  • R sendAndWait(Object command, long timeout, TimeUnit unit)
    • sends the command and waits for its execution until the given timeout is reached
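
The difference between these variants can be sketched with plain JDK futures. This is not Axon's implementation: GatewaySketch and its handler closure are hypothetical stand-ins for the gateway and for the command bus plus handler, and a Groovy closure replaces CommandCallback.

```groovy
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import java.util.function.Supplier

// Hypothetical, framework-free gateway illustrating the four calling styles.
class GatewaySketch {
    Closure handler   // stands in for the command bus and the command handler

    // Every variant dispatches the command on another thread.
    CompletableFuture dispatch(Object command) {
        CompletableFuture.supplyAsync({ handler.call(command) } as Supplier)
    }

    // Fire and forget: returns immediately.
    void send(Object command) { dispatch(command) }

    // The result is reported to the callback once the handler has finished.
    void send(Object command, Closure callback) {
        dispatch(command).thenAccept(callback as Consumer)
    }

    // Blocks until the handler has finished.
    Object sendAndWait(Object command) { dispatch(command).get() }

    // Blocks, but gives up with a TimeoutException after the given time.
    Object sendAndWait(Object command, long timeout, TimeUnit unit) {
        dispatch(command).get(timeout, unit)
    }
}

def gateway = new GatewaySketch(handler: { cmd -> "handled ${cmd}".toString() })

def viaWait = gateway.sendAndWait('create')
def viaTimeout = gateway.sendAndWait('cancel', 1, TimeUnit.SECONDS)

def reported = new CompletableFuture()
gateway.send('create') { result -> reported.complete(result) }
def viaCallback = reported.get(1, TimeUnit.SECONDS)
```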

Let us come back for a moment to the controller and its cancel method. It is responsible for canceling a credit card transaction. The problem here is that the read and write models need to be matched in some way. In the view we have data from the read model (grails-app/domain), while the command sent to the write model needs to identify the proper aggregate to update (delete in this case; this is for presentation purposes only, in the real world such a case should not delete an aggregate).
The matching is done through the read model, which holds the aggregate identifier.

Command Handler

So far we have sent a command using the command gateway. It is then forwarded to the appropriate command handler through the command bus. Infrastructure such as the command bus will be discussed a little later. A command handler is an object that receives commands of a specified type.
Command handlers can be created by implementing the CommandHandler interface, which defines a single method, Object handle(CommandMessage<T> command, UnitOfWork uow). Such a command handler needs to be subscribed to and unsubscribed from the command bus manually.
I decided to use annotation-based command handlers. In that case a command handler can be any POJO class with a handle method annotated with the CommandHandler annotation. That method should declare the command object as its first argument. It is possible to add more arguments, but the first one must be the command object.

@Component
class CreateTransactionCommandHandler {
  @CommandHandler 
  void handle(CreateTransactionCommand command) throws Throwable { 
    CreditCardTransactionAggregate creditCardTransactionAggregate = new CreditCardTransactionAggregate(command) 
    EventSourcingRepository eventSourcingRepository = SpringBeanResolver.resolve("creditCardTransactionRepository") 
    eventSourcingRepository.add(creditCardTransactionAggregate) 
  }
}

I created two separate command handlers, one for each command type. Annotation-based configuration also allows multiple methods to be defined in one object, each handling a specific command. What is more, aggregate methods can be annotated as command handlers. For example, an aggregate constructor can be annotated with @CommandHandler, so that sending that command results in creating a new aggregate.
In the sample application the aggregate is created in the command handler itself and saved to the event store using the repository.

The SpringBeanResolver.resolve(...) line in the above listing is specific to Grails. I needed to create a simple Spring bean resolver because Grails dependency injection (based on Spring) does not inject dependencies into objects located outside grails-app.
The repository will be discussed later on, during the Spring configuration analysis, since it is configured there.
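
To get a feeling for how annotation-based handlers can be routed by the type of their first argument, here is a small reflection-based sketch. It only illustrates the idea and is not how Axon does it internally: the @HandlesCommand annotation, CommandBusSketch and the command classes are all hypothetical.

```groovy
import java.lang.annotation.ElementType
import java.lang.annotation.Retention
import java.lang.annotation.RetentionPolicy
import java.lang.annotation.Target
import java.lang.reflect.Method

// Hypothetical marker annotation standing in for Axon's @CommandHandler.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface HandlesCommand {}

class CreateCommand { String id }
class CancelCommand { String id }

// One object with several handling methods, one per command type.
class TransactionHandlers {
    List<String> log = []

    @HandlesCommand
    void handle(CreateCommand command) { log << "created ${command.id}".toString() }

    @HandlesCommand
    void handle(CancelCommand command) { log << "cancelled ${command.id}".toString() }
}

class CommandBusSketch {
    Map<Class, Closure> handlers = [:]

    // Register every annotated method under the type of its first parameter.
    void subscribe(Object bean) {
        bean.getClass().declaredMethods.findAll { it.isAnnotationPresent(HandlesCommand) }.each { Method method ->
            handlers.put(method.parameterTypes[0], { command -> method.invoke(bean, command) })
        }
    }

    // Route the command to the handler registered for its exact class.
    void dispatch(Object command) {
        Closure handler = handlers.get(command.getClass())
        if (handler == null) {
            throw new IllegalStateException("No handler for ${command.getClass().simpleName}")
        }
        handler.call(command)
    }
}

def handlersBean = new TransactionHandlers()
def bus = new CommandBusSketch()
bus.subscribe(handlersBean)
bus.dispatch(new CreateCommand(id: 'tx-1'))
bus.dispatch(new CancelCommand(id: 'tx-1'))
println handlersBean.log
```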

Aggregate

Now that the aggregate has been created and saved to the store, let’s have a look at its implementation:

class CreditCardTransactionAggregate extends AbstractEventSourcedAggregateRoot {
  @AggregateIdentifier UUID identifier 
  String creditCardNumber 
  String creditCardCvvCode 
  Date validDate 
  BigDecimal amount 
  Date transactionDate 
  Date cancellationDate 
  CreditCardOwner creditCardOwner 

  CreditCardTransactionAggregate() {}

  CreditCardTransactionAggregate(CreateTransactionCommand command) {
    apply(new TransactionCreatedEvent(command.id, command.creditCardNumber, command.creditCardCvvCode, command.creditCardOwner, command.validDate, command.amount, new Date()))
  }

  void cancelTransaction(CancelTransactionCommand command) {
    apply(new TransactionCancelledEvent(command.id, command.aggregateIdentifier))
  }

  @Override 
  protected Iterable <? extends EventSourcedEntity > getChildEntities() {
    return null
  }

  @Override 
  protected void handle(DomainEventMessage event) {
    if (TransactionCreatedEvent.class.isAssignableFrom(event.getPayload().getClass())) {
      handleTransactionCreated(event.getPayload())
    }

    if (TransactionCancelledEvent.class.isAssignableFrom(event.getPayload().getClass())) {
      handleTransactionCancel(event.getPayload())
    }
  }

  private void handleTransactionCreated(TransactionCreatedEvent event) {
    this.identifier = event.id
    this.creditCardNumber = event.creditCardNumber
    this.creditCardCvvCode = event.creditCardCvvCode
    this.validDate = event.getValidDate()
    this.amount = event.getAmount()
    this.transactionDate = new Date()
    creditCardOwner = new CreditCardOwner(event.creditCardOwner)
  }

  private void handleTransactionCancel(TransactionCancelledEvent event) {
    cancellationDate = new Date()
    markDeleted()
  }
}

The aggregate extends AbstractEventSourcedAggregateRoot, a convenient abstract base class for aggregates that are stored in an event store. That class also tracks all uncommitted events. Moreover, it provides useful methods to initialize the aggregate state from the events retrieved from the event store.
As we will see in the configuration later on, events applied to the aggregate are stored on the file system, in a folder named after the aggregate, in files identified by the id the events carry (the aggregate identifier).
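
The two responsibilities mentioned above, tracking uncommitted events and rebuilding state from stored events, can be shown in a few lines of plain Groovy. This is a minimal sketch of the idea only; TransactionSketch, CreatedEvent and CancelledEvent are hypothetical names and none of this is Axon code.

```groovy
// Framework-free sketch of an event-sourced aggregate.
class CreatedEvent { String id; BigDecimal amount }
class CancelledEvent { String id }

class TransactionSketch {
    String id
    BigDecimal amount
    boolean cancelled = false
    List uncommittedEvents = []   // new events not yet written to the store

    // Rebuild state from stored history; replayed events are not recorded as new.
    static TransactionSketch fromHistory(List events) {
        def aggregate = new TransactionSketch()
        events.each { aggregate.handle(it) }
        aggregate
    }

    void create(String id, BigDecimal amount) { apply(new CreatedEvent(id: id, amount: amount)) }

    void cancel() { apply(new CancelledEvent(id: id)) }

    // A new change: update state through the handler and remember the event.
    void apply(Object event) {
        handle(event)
        uncommittedEvents << event
    }

    // State changes happen only here, so replaying events gives the same result.
    void handle(Object event) {
        if (event instanceof CreatedEvent) {
            id = event.id
            amount = event.amount
        } else if (event instanceof CancelledEvent) {
            cancelled = true
        }
    }
}

def original = new TransactionSketch()
original.create('tx-1', 100.00)
original.cancel()

// Pretend the two events were stored and read back later.
def restored = TransactionSketch.fromHistory(original.uncommittedEvents)
```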

Domain Event

Nowadays we live in an event-driven environment, so software, and especially its domain model, which is intended to reflect reality as closely as possible, should be event-driven as well. Following that path, all important changes in the domain should be represented by events. Axon wraps such events in a DomainEventMessage, which carries the event object as its payload.
Two domain events are applied to the sample aggregate: TransactionCreatedEvent and TransactionCancelledEvent. The AbstractEventSourcedAggregateRoot class declares a handle method (handle(DomainEventMessage event) in the listing above) whose responsibility is to handle events internally in the aggregate.
An event applied to the aggregate is dispatched on the event bus, which forwards it to the applicable event listeners.

class TransactionCreatedEvent { 
  UUID id 
  String creditCardNumber 
  String creditCardCvvCode 
  String creditCardOwner 
  Date validDate 
  BigDecimal amount 
  Date transactionDate
   
  TransactionCreatedEvent(UUID id, 
                          String creditCardNumber, 
                          String creditCardCvvCode, 
                          String creditCardOwner, 
                          Date validDate, 
                          BigDecimal amount, 
                          Date transactionDate) { 
    this.id = id 
    this.creditCardNumber = creditCardNumber 
    this.creditCardCvvCode = creditCardCvvCode 
    this.creditCardOwner = creditCardOwner 
    this.validDate = validDate 
    this.amount = amount 
    this.transactionDate = transactionDate 
  }
}

The domain event carries all the information that will be stored in the read part of the application. One important thing to notice is that its id property holds the aggregate identifier.

Domain Event Listener

Finally, there is an event listener which handles events and persists data in the store from which data is retrieved and presented to the user.

@Component
class CreditCardTransactionEventListener {

  @EventHandler 
  public void handle(TransactionCreatedEvent event) { 
    CreditCardTransaction creditCardTransaction = new CreditCardTransaction() 
    creditCardTransaction.creditCardNumber = event.creditCardNumber 
    creditCardTransaction.creditCardCvvCode = event.creditCardCvvCode 
    creditCardTransaction.creditCardOwner = event.creditCardOwner 
    creditCardTransaction.validDate = event.validDate 
    creditCardTransaction.amount = event.amount 
    creditCardTransaction.transactionDate = event.transactionDate 
    creditCardTransaction.aggregateIdentifier = event.id 
    creditCardTransaction.save flush: true 
  } 

  @EventHandler 
  public void handle(TransactionCancelledEvent event) { 
    CreditCardTransaction creditCardTransaction = CreditCardTransaction.findById(event.transactionId) 
    creditCardTransaction.delete flush: true 
  }
}

So we have events stored in the write part of the application and data stored in the read part based on the events dispatched by the domain model.
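
The flow from an event on the bus to a row in the read store can be pictured with a tiny in-memory sketch. It is an illustration under assumptions, not the sample project's code: EventBusSketch, the two event classes and the map used as the read model are all hypothetical.

```groovy
// Hypothetical in-memory event bus: listeners are registered per event type.
class EventBusSketch {
    Map<Class, List<Closure>> listeners = [:]

    void subscribe(Class eventType, Closure listener) {
        if (!listeners.containsKey(eventType)) listeners.put(eventType, [])
        listeners.get(eventType) << listener
    }

    void publish(Object event) {
        (listeners.get(event.getClass()) ?: []).each { it.call(event) }
    }
}

class TransactionCreated { String id; BigDecimal amount }
class TransactionCancelled { String id }

// The read model: one entry per transaction, keyed by aggregate identifier.
Map readModel = [:]

def bus = new EventBusSketch()
bus.subscribe(TransactionCreated) { event -> readModel[event.id] = [amount: event.amount] }
bus.subscribe(TransactionCancelled) { event -> readModel.remove(event.id) }

bus.publish(new TransactionCreated(id: 'tx-1', amount: 25.50))
bus.publish(new TransactionCreated(id: 'tx-2', amount: 10.00))
bus.publish(new TransactionCancelled(id: 'tx-1'))
println readModel.keySet()
```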

Spring configuration

Last but not least is the infrastructure. In the sample application it is configured using a Spring application context:

<axon:annotation-config />
<context:component-scan base-package="grails.cqrs" />
<bean class="org.axonframework.commandhandling.annotation.AnnotationCommandHandlerBeanPostProcessor">
    <property name="commandBus" ref="commandBus" />
</bean>
<bean id="defaultCommandGateway" class="org.axonframework.commandhandling.gateway.DefaultCommandGateway">
    <constructor-arg name="commandBus" ref="commandBus" />
    <constructor-arg name="commandDispatchInterceptors">
        <list></list>
    </constructor-arg>
</bean>
<axon:filesystem-event-store id="eventStore" base-dir="/events" />
<axon:command-bus id="commandBus" />
<axon:event-bus id="eventBus" />
<bean id="creditCardTransactionRepository" class="org.axonframework.eventsourcing.EventSourcingRepository">
    <constructor-arg value="grails.cqrs.domain.CreditCardTransactionAggregate"/>
    <property name="eventBus" ref="eventBus"/>
    <property name="eventStore" ref="eventStore"/>
</bean>

The <axon:annotation-config /> element allows Axon to process annotations and, for example, turn objects with the @EventHandler annotation into event handlers and objects with @CommandHandler into command handlers. Such beans need to be components managed by Spring.

The AnnotationCommandHandlerBeanPostProcessor bean registers classes containing command handling methods with the command bus provided through the commandBus property.

The defaultCommandGateway bean is the command gateway definition. It receives a reference to the command bus and a list of interceptors through constructor arguments.
Even though I haven’t provided any command dispatch interceptors, it is worth saying a few words about them. Their purpose is to modify the command message before it is dispatched to the command bus. Interceptors are executed only on the commands going through that particular gateway.
What can be done in command dispatch interceptors? Validation, for example, or enriching the command message with metadata.
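
As a rough picture of what a dispatch interceptor chain does, consider the following sketch. The classes CommandMessageSketch and InterceptingGatewaySketch are hypothetical, and closures stand in for Axon's interceptor objects; only the order of steps (intercept first, dispatch afterwards) is the point.

```groovy
// Hypothetical command message: a payload plus free-form metadata.
class CommandMessageSketch {
    Object payload
    Map metaData = [:]
}

class InterceptingGatewaySketch {
    List<Closure> interceptors = []
    List dispatched = []   // stands in for the command bus

    void send(Object command) {
        def message = new CommandMessageSketch(payload: command)
        // Each interceptor receives the message and returns the one to pass on.
        interceptors.each { interceptor -> message = interceptor.call(message) }
        dispatched << message
    }
}

// Validation: reject empty commands before they reach the bus.
def rejectEmpty = { msg ->
    if (!msg.payload) throw new IllegalArgumentException('empty command')
    msg
}

// Enrichment: add metadata (the user name is a made-up value).
def addUser = { msg ->
    new CommandMessageSketch(payload: msg.payload, metaData: msg.metaData + [user: 'alice'])
}

def gateway = new InterceptingGatewaySketch(interceptors: [rejectEmpty, addUser])
gateway.send('cancel tx-1')

def rejected = false
try {
    gateway.send('')
} catch (IllegalArgumentException e) {
    rejected = true
}
```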

A very significant element is the <axon:filesystem-event-store> definition. It is the event store, in this case a file-system-based one. It stores events applied to the aggregates in the /events folder.
Apart from the file system event store used in the example, the Axon framework provides other event store implementations, such as JpaEventStore.

The <axon:command-bus> and <axon:event-bus> elements use the Axon namespace to define the command bus and the event bus.

The creditCardTransactionRepository bean defines the repository used to persist and load aggregates. The EventSourcingRepository used in the sample automatically publishes domain events to the event bus. In addition, it delegates event storage to the provided event store.
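
The division of labour between repository, event store and event bus can be sketched as follows. This is a simplified illustration with hypothetical names (EventStoreSketch, RepositorySketch, AmountAggregate); events are plain maps and a closure stands in for the event bus.

```groovy
// Hypothetical in-memory event store: one list of events per aggregate id.
class EventStoreSketch {
    Map<String, List> streams = [:]

    void append(String aggregateId, List events) {
        if (!streams.containsKey(aggregateId)) streams.put(aggregateId, [])
        streams.get(aggregateId).addAll(events)
    }

    List read(String aggregateId) { streams.get(aggregateId) ?: [] }
}

// Minimal event-sourced aggregate that only sums up amounts.
class AmountAggregate {
    String id
    int total = 0
    List uncommitted = []

    void add(int amount) { applyEvent([type: 'added', amount: amount]) }

    void applyEvent(Map event) {
        handle(event)
        uncommitted << event
    }

    void handle(Map event) { total += event.amount }
}

class RepositorySketch {
    EventStoreSketch eventStore
    Closure publisher   // stands in for the event bus

    // Saving stores the new events and then publishes them.
    void save(AmountAggregate aggregate) {
        eventStore.append(aggregate.id, aggregate.uncommitted)
        aggregate.uncommitted.each { publisher.call(it) }
        aggregate.uncommitted.clear()
    }

    // Loading replays the stored events on a fresh instance.
    AmountAggregate load(String id) {
        def aggregate = new AmountAggregate(id: id)
        eventStore.read(id).each { aggregate.handle(it) }
        aggregate
    }
}

def published = []
def repository = new RepositorySketch(eventStore: new EventStoreSketch(),
                                      publisher: { published << it })

def aggregate = new AmountAggregate(id: 'tx-1')
aggregate.add(40)
aggregate.add(2)
repository.save(aggregate)

def reloaded = repository.load('tx-1')
```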

Summary

That is nearly all. As you can probably see from this simple example, the Axon framework does a brilliant job of providing a well-designed infrastructure and architecture to support a CQRS implementation.
A lot of boilerplate code is eliminated from the application and moved to the framework. That is exactly what we expect from a solid framework.
Apart from all the advantages mentioned in Part I, and the many more that can be found on the Internet, I would like to stress one more thing. Having the application separated into two loosely coupled parts, write and read, each with its own storage, gives you a unique ability to switch off either part without affecting the other. Think about a failure: this separation greatly reduces the risk of the whole application going down.
In my opinion that is another great advantage of the CQRS architecture, and one that is sometimes underestimated.

Grails + Axon = CQRS (part I)

I continue the series of posts on Groovy and Grails. The previous post gave some introductory information about the Groovy programming language. In this post and the next one I would like to present a sample application built with Grails and Axon which demonstrates basic usage of the CQRS architecture.

Grails

Grails is a rapid application development framework for the JVM. It is largely built on top of the Spring framework, and in particular, a lot of it is built on top of Spring MVC. So if you’re building a web application that targets any other platform, Grails is not for you. Grails is specifically written as a framework for building JVM web applications.
Grails has a lot of developer productivity gains to offer. It is very easy to get an application up and running quickly on Grails, thanks to things like convention over configuration and sensible defaults. Literally, in a couple of minutes you can have a simple Grails application up and running. From there Grails offers a lot to simplify the process of building serious enterprise web applications.

 

CQRS

First, let’s try to answer a simple question: why might we need CQRS?
Scalability. It is definitely a hard problem.
What can we do if we need to handle a lot of concurrent connections?
One way to deal with it is to use asynchronous concepts. Grails 2.3 brings some async improvements, including an API for asynchronous programming. What is more, Grails 2.3 introduces Promises, a concept similar to java.util.concurrent.Future instances.
That is one aspect: doing async work within the current programming model.
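
For readers who have not used java.util.concurrent.Future, the sketch below shows the underlying idea with the plain JDK API rather than with Grails Promises: work is submitted, the caller gets a handle back immediately, and the result is collected later. The thread pool size and the summed ranges are arbitrary.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit

def executor = Executors.newFixedThreadPool(2)
int total = 0
try {
    // Both tasks start right away and may run in parallel.
    Future first = executor.submit({ (1..100).sum() } as Callable)
    Future second = executor.submit({ (1..10).sum() } as Callable)

    // get() blocks until the corresponding task has finished.
    total = first.get(5, TimeUnit.SECONDS) + second.get(5, TimeUnit.SECONDS)
} finally {
    executor.shutdown()
}
println total
```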

The other way to deal with scalability is to change the architecture completely. This is where CQRS has its place. CQRS stands for Command Query Responsibility Segregation. It is very often associated with Domain Driven Design. While Domain Driven Design is mainly focused on getting the application domain model right, CQRS is a wider concept.
The basic idea behind it is that the part of the application that updates the database (call it the write model) has different requirements than the part that reads data from the database and reports it to the user (call it the read model).
CQRS embeds that idea into the application architecture. It clearly separates the part that processes commands from the part that queries data.
Someone may ask why we need such a separation, and why we need two storages in one application.
If we look closer at the two models (read and write), we notice that their requirements are different. The command part (write) is responsible for validating and executing commands, whereas the query part just reads data and transfers it to the view layer. Going further, the storage for each part can be optimized for the particular purpose it serves. For example, the read storage can hold denormalized data. What is more, database tables or documents can be designed to serve a particular view, so that no joins are needed. At the same time the write storage can persist data in normalized form. These are only a few examples that justify the model CQRS provides.

source: infoq.com

When an update comes in as a command (Grails supports this by providing command objects), the command is processed by a command handling component. Each command should have a specific processing component. The command interacts with the domain model. The domain model may change, and the changes go to the persistence model to be stored in the database. It does not need to be a database as we know it. In most cases it should be an Event Store.
The idea behind the Event Store is that the changes themselves are stored in the database. As a result, we never lose information. Every change is persisted, so we can go back, get an audit trail, produce complicated reports and so on.
When data is persisted in the write model, a message is produced that triggers an update in the other part of the application, the read model. The message is placed on the message/event bus. The update is done asynchronously. Then, using the read model and the storage on that side of the application, data is queried and presented to the user. Because of the asynchronous nature of the updates to the read model, we must assume that data presented to the user can sometimes be stale. However, that issue is present only when we use views generated on the server side. Modern web applications use a wide variety of technologies in the view layer apart from server-side generated pages, so it is pretty simple to trigger a view refresh when a data update is captured in the read model. By the way, there is the Grails Atmosphere Plugin, which provides smooth integration with the Atmosphere framework, an AjaxPush/Comet framework.
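
Why the read side can be stale is easiest to see in a toy example. In the sketch below a queue stands in for the event bus and a map for the read storage; all names are hypothetical and the "asynchronous" update is simulated by simply running it later.

```groovy
import java.util.concurrent.ConcurrentLinkedQueue

def eventQueue = new ConcurrentLinkedQueue()   // stands in for the event bus
Map readModel = [:]                            // stands in for the read storage

// Write side: record the change and queue a message; the read model is untouched.
def recordTransaction = { String id, BigDecimal amount ->
    eventQueue.add([id: id, amount: amount])
}

// Read side: drains the queue some time later.
def updateReadModel = {
    def event
    while ((event = eventQueue.poll()) != null) {
        readModel[event.id] = event.amount
    }
}

recordTransaction('tx-1', 99.90)

// A query at this point does not see the new transaction yet.
def beforeUpdate = readModel.containsKey('tx-1')

updateReadModel()

// Once the message has been processed the read model has caught up.
def afterUpdate = readModel.containsKey('tx-1')
```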

Axon

Implementing a CQRS architecture requires quite a lot of “plumbing” and boilerplate code. Much of that code is project-independent and quite error-prone to do right. Axon aims at providing the basic building blocks, which are needed in most CQRS-style applications. Event dispatching, for example, is typically an asynchronous process, but you do want guaranteed delivery and, in some cases, sequential delivery. Axon provides building blocks and guidelines for the implementation of these features. Besides these core-components, Axon wants to make integration with third party libraries possible. Think of frameworks and libraries such as JMS, MQ and Spring Integration.

The above was said by Allard Buijze, the author of the Axon Framework. It perfectly sums up the reasons behind creating the framework and the areas in which Axon can help us develop applications compliant with the CQRS model. In my opinion it is a brilliant framework, very well designed and open for extension. What is more, it provides simple implementations of particular CQRS building blocks, such as an event store and an event bus, which can be used directly in our application.

In the next part I will present a simple Grails application which uses the Axon framework to implement a basic CQRS model.

Stay tuned!

Greg Young in Lublin

A big event for Lublin’s IT community is approaching. Greg Young has already come to town. He is giving a talk on CQRS and Event Sourcing on Monday 22.03.
Besides having vast technical knowledge, he is a brilliant speaker who can really infect the people around him with his passion.
I am sure there is no need to introduce him to anyone, but it may be worth mentioning some web resources covering his work:

Greg Young blog 

His stuff on InfoQ:

     CQRS document by Greg Young

     DDDD by Greg Young