
Spring and Data Synchronization Between Queries and Commands

16 Oct 2018 10:36am, by Jakub Pilimon

Pivotal sponsored this post.

Jakub Pilimon
Jakub is a Spring developer advocate at Pivotal, a blogger, a passionate programmer and a trainer. He loves to tackle complex architectures with domain-driven design, test-driven development and Spring. Being a microservices freak, architecture is his main area of interest. When he does not program, he rides his motorbike, skis or practices kitesurfing. He also writes the MVB-awarded blog pillopl.github.io.

In this article, we describe CQRS and why it might be useful to you for microservices deployments and management. We also outline, in a very hands-on and comprehensive way, different ways of using CQRS with Spring tools, and we'll consider the pros and cons of each solution as we go through that journey. Note that these synchronization techniques are agnostic. You might find them useful even if you don't use CQRS. By the same token, you might implement CQRS in a different manner.

What is CQRS? This pattern divides your system in two distinct parts. CQRS separates the components used for writing (executing commands) from those for querying. As such, the Command Service and Query Service are decoupled, and can be operated separately.

Let’s Start with CQS (We’ll Add the R Later)

On our journey to understanding CQRS, let’s start with CQS — the famous Command-Query Separation. (Note the lack of an “R” in this approach — it’s intentional.)

The CQS pattern says that an object's methods should be grouped into:

  • Commands: can change the state of the system and do not return a value.
  • Queries: cannot change the state of the system (they are free of side effects) and return a value.

This clear separation is useful for developers. Here's why: queries and commands are fundamentally different operations with different scalability characteristics, so it's useful to decouple them. Queries can be repeated; they are safe and idempotent. (That is to say, asking a query a second time doesn't change the answer.) You can process queries in any order without worrying about consistency. By contrast, commands are not safe to repeat, and the order of commands matters.
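To make the split concrete, here is a minimal, framework-free sketch. The `Account` name and its methods are hypothetical, not part of the article's example; they only illustrate the command/query shape:

```java
import java.math.BigDecimal;

// CQS in miniature: commands return nothing and may change state;
// queries return a value and never change state.
class Account {
    private BigDecimal balance = BigDecimal.ZERO;

    // Command: mutates state, returns no value
    void deposit(BigDecimal amount) {
        balance = balance.add(amount);
    }

    // Query: side-effect free; repeating it is always safe
    BigDecimal balance() {
        return balance;
    }
}
```

Calling `balance()` a hundred times leaves the account untouched; calling `deposit()` twice does not.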

We can further our understanding by building a sample app with these principles in mind. Let's build a credit card object following CQS tips, with these requirements:

  • The user can withdraw a given amount when there is enough money in the account tied to their credit card. We'll call this the Withdraw command.
  • The user can view a list of all withdrawals from the account linked to their credit card. We'll call this the Show Me Withdrawals query.

The Withdraw command and Show Me Withdrawals query are two basic use-cases. Also, querying for withdrawals should return the details of all past Withdraw commands. Implementing these commands and queries is simple when they both use the same data store.

You could use separate data stores, provided you had a mechanism to keep the sources for commands and queries consistent.

On to the code!

In One Class

Here is a simple class that deals with the two use-cases:

public class CreditCard {

    @OneToMany(cascade = CascadeType.ALL)
    private List<Withdrawal> withdrawals = new ArrayList<>();
    @Id @GeneratedValue @Getter
    private UUID id;
    private BigDecimal initialLimit;
    private BigDecimal usedLimit = BigDecimal.ZERO;

    public void withdraw(BigDecimal amount) {
        if (thereIsMoneyToWithdraw(amount)) {
            usedLimit = usedLimit.add(amount);
            withdrawals.add(new Withdrawal(amount, id));
        } else {
            throw new NotEnoughMoneyException(id, amount, availableBalance());
        }
    }

    // ... private methods

    public List<Withdrawal> getWithdrawals() {
        return Collections.unmodifiableList(withdrawals);
    }
}

Note that the primary goal of this article is to describe synchronization. For this reason, we will keep things simple, and represent monetary values as BigDecimals. In the real world, you might want to spend more time modeling concepts represented by this class.

Please note there is the "withdraw" command and the "getWithdrawals" query. The separation is clear. The credit card class is responsible for checking the business invariants when it receives a new command; the query immediately returns all past withdrawals. One model deals with both commands and queries, and the credit card class itself is the synchronization mechanism mentioned above.

The business invariants and synchronization can be tested with a simple unit test. You can find it here.
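In case the link is not handy, such a unit test might look roughly like the following. The trimmed, persistence-free `SimpleCreditCard` below is a stand-in for the article's JPA entity (the exception type is simplified to `IllegalStateException` for the sketch):

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Trimmed, JPA-free stand-in for the article's CreditCard, just enough
// to exercise the invariant and the command/query synchronization.
class SimpleCreditCard {
    private final BigDecimal initialLimit;
    private BigDecimal usedLimit = BigDecimal.ZERO;
    private final List<BigDecimal> withdrawals = new ArrayList<>();

    SimpleCreditCard(BigDecimal initialLimit) {
        this.initialLimit = initialLimit;
    }

    // Command: accept or reject, and keep the query side in sync
    void withdraw(BigDecimal amount) {
        if (initialLimit.subtract(usedLimit).compareTo(amount) < 0) {
            throw new IllegalStateException("Not enough money");
        }
        usedLimit = usedLimit.add(amount);
        withdrawals.add(amount);
    }

    // Query: safe to repeat, no side effects
    List<BigDecimal> getWithdrawals() {
        return Collections.unmodifiableList(withdrawals);
    }
}
```

A test then simply withdraws within the limit, asserts the query side sees it, and asserts that an over-limit withdrawal is rejected without touching the withdrawals list.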

Using spring-mvc, we can expose a RESTful API for the two use-cases: HTTP GET for the query (the list of withdrawals) and HTTP POST for the Withdraw command.

@RestController("/withdrawals")
class WithdrawalsController {

    // fields, constructors

    @PostMapping
    ResponseEntity withdraw(@RequestBody WithdrawalCommand withdrawalCommand) {
        CreditCard creditCard = creditCardRepository.findById(withdrawalCommand.getCard())
            .orElseThrow(() -> new IllegalStateException("Cannot find card with id " + withdrawalCommand.getCard()));
        creditCard.withdraw(withdrawalCommand.getAmount());
        return ResponseEntity.ok().build();
    }

    @GetMapping
    ResponseEntity<List<Withdrawal>> withdrawals(@RequestParam("cardId") String cardId) {
        return ResponseEntity.ok()
            .body(creditCardRepository.getOne(UUID.fromString(cardId)).getWithdrawals());
    }
}

An end-to-end test of our REST API, using spring-test, looks like this:

@Test
public void shouldSynchronizeQuerySideAfterSendingACommand() {
    // given
    UUID cardUUid = thereIsCreditCardWithLimit(new BigDecimal(100)); // HTTP POST
    // when
    clientWantsToWithdraw(TEN, cardUUid); // HTTP POST
    // then
    thereIsOneWithdrawalOf(TEN, cardUUid); // HTTP GET
}

// private methods
The entire application can be found here.

Even though there is a separation of queries and commands, there is still coupling between them, because one object handles both. Is this approach a problem? It might not be the best answer for us. Let's take a closer look at why.

  • If we use an Object-Relational Mapper and lazy loading for withdrawals, we could run into trouble and create the object in an inconsistent state. Imagine loading the initial state (with the used limit), a concurrent thread creating a new withdrawal, and then loading the lazy collection, which now contains that new withdrawal. Of course, there are solutions to this. For instance, you can load eagerly, or calculate the used limit by summing the values of the withdrawals;
  • The withdraw command adds a new withdrawal to the list of current withdrawals. This operation is not needed to accept or deny the command; it is needed only to answer queries accurately in the future;
  • Adding a new withdrawal requires accessing the list of withdrawals. What about lazy loading? Will an Object-Relational Mapper fetch the entire collection from the database simply to add a new withdrawal? Some will treat this operation as a simple insert, but some will not. There is a risk of loading a huge number of superfluous withdrawals into the application's memory.

Most systems have far more reads than writes. That means your system is likely to benefit from different models and different data structures. In other words, those different non-functional requirements may lead to different design decisions.

Remember these two truths:

  • Reads are safe to repeat. Commands are not.
  • Reads are cacheable. Commands are not.

Let’s say that we want to handle those use cases with different objects. We say nothing about separate storages. At least, not yet.

You may also face a situation with existing code and data that you need to synchronize in order to deliver new value. Some of the synchronization techniques in the next sections might help there too.

Now, Let’s Separate Our Use Cases

One way to break our credit card into separate objects:

  • Let's remove the withdrawals collection from the original class.
  • And introduce a new concept that represents the list of a client's withdrawals. Withdrawals seems like a good choice.

public class CreditCard {

    @Id @GeneratedValue @Getter private UUID id;
    private BigDecimal initialLimit;
    private BigDecimal usedLimit = BigDecimal.ZERO;

    public CreditCard(BigDecimal limit) {
        this.initialLimit = limit;
    }

    public void withdraw(BigDecimal amount) {
        if (thereIsMoneyToWithdraw(amount)) {
            usedLimit = usedLimit.add(amount);
        } else {
            throw new NotEnoughMoneyException(id, amount, availableBalance());
        }
    }

    // private methods
}

class Withdrawals {

    private List<WithdrawalDto> withdrawals;
}

class WithdrawalDto {

    private UUID cardId;
    private BigDecimal amount;
}

The REST API does not change from a client's perspective. However, the list of withdrawals is no longer part of the credit card; the credit card has no idea about withdrawals. The success of ORMs, and of the abstractions built over them, has had a cost: we tend to forget that in most cases there is an RDBMS powering the system. When this is the case, we can use a direct query. There's no need to go through the credit card class. Let's see how simple this is:

@GetMapping
ResponseEntity<List<WithdrawalDto>> withdrawals(@RequestParam("cardId") String cardId) {
    return ResponseEntity.ok().body(loadWithdrawalsFor(UUID.fromString(cardId)));
}

private List<WithdrawalDto> loadWithdrawalsFor(UUID cardId) {
    return withdrawalsRepository.queryForWithdrawals(cardId);
}

Inside the withdrawals repository, we could use a direct query to our database, or just use a read-only transaction. The important thing is that we avoid the dirty-checking penalty. What is this, and why do you want to avoid it? Dirty checking is a mechanism built into an object-relational mapper to detect changes to our objects. Queries do not change state, so for them dirty checking is useless work. Spring allows us to disable it by using read-only transactions.

Mapping a query result to an immutable data structure is more than enough.

Quick aside: you might have thought about mapping part of an entity's data for read purposes and the rest for writes. It is possible to map two JPA entities to one database table (by mapping different columns). But is that necessary? Entities can be modified; with two entities mapped to the same table, you could introduce frequent optimistic-locking exceptions. And again, the entity that serves queries does not need to change. An immutable projection is a natural solution.
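For illustration, an immutable projection can be as small as a final class with final fields and no setters; there is nothing for an ORM to track or dirty-check. The class name here is hypothetical:

```java
import java.math.BigDecimal;
import java.util.UUID;

// An immutable read-side projection: built once from a query result,
// never modified, so it can never trigger dirty checking or
// optimistic-locking conflicts.
final class ImmutableWithdrawal {
    private final UUID cardId;
    private final BigDecimal amount;

    ImmutableWithdrawal(UUID cardId, BigDecimal amount) {
        this.cardId = cardId;
        this.amount = amount;
    }

    UUID getCardId() { return cardId; }

    BigDecimal getAmount() { return amount; }
}
```

A row mapper (or a JPQL constructor expression) can populate such objects directly from the query result.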

But now the end-to-end test fails: after a successful Withdraw command, zero withdrawals are returned when querying. Why? Because we removed the code that used to synchronize commands and queries. The list of withdrawals in CreditCard disappeared.

In the remainder of the post, let's look at different ways of synchronizing the state of our CreditCard and Withdrawals. First up: explicit synchronization.

Explicit Synchronization

Synchronization can be moved one layer up from the CreditCard object to the application layer. After all, we want to keep our credit card simpler. Similarly, we want to avoid dirty checking when querying. Here’s how it can work.

Let’s update the credit card’s limit during a withdrawal. Let’s insert a new withdrawal into the database too. We can create a simple application service that does the job. Its name is WithdrawalProcess. It makes our test green again.

@Service
public class WithdrawalProcess {

    private final CreditCardRepository creditCardRepository;
    private final WithdrawalsService withdrawals;

    WithdrawalProcess(CreditCardRepository creditCardRepository, WithdrawalsService withdrawals) {
        this.creditCardRepository = creditCardRepository;
        this.withdrawals = withdrawals;
    }

    @Transactional
    public void withdraw(UUID cardId, BigDecimal amount) {
        CreditCard creditCard = creditCardRepository.findById(cardId)
            .orElseThrow(() -> new IllegalStateException("Cannot find card with id " + cardId));
        creditCard.withdraw(amount);
        withdrawals.add(new Withdrawal(UUID.randomUUID(), cardId, amount));
    }
}

Commands and queries are not coupled; instead, an application service ensures that state stays consistent. As in the previous example, the synchronization is immediate and the change is atomic: the system performs the modification of the credit card limit and the insertion of a new withdrawal in a single transaction.

Before, we had implicit synchronization done by the ORM. Now we have explicit synchronization done in the application layer. Immediate synchronization wouldn't be possible with a separate database just for withdrawals; we cannot beat the CAP theorem and atomically change two different databases. But this approach delivers the desired business outcome.

The whole application can be found here (with projection at query side) and here (with entity at query side).

We continue on with a look at a kind of implicit synchronization, one featuring Spring application events.

Implicit Synchronization with Spring Application Events

Some people may not like the manual, explicit work done in the application service above. Could there be a different way to insert a new withdrawal? Yes, there is! To invert the control, our credit card class can finish by publishing an event. This event can then be handled in a different part of our application. To publish and listen to events, Spring application events can be used.

Here is how to publish the event in WithdrawalProcess.

@Transactional
public void withdraw(UUID cardId, BigDecimal amount) {
    CreditCard creditCard = creditCardRepository.findById(cardId)
        .orElseThrow(() -> new IllegalStateException("Cannot find card with id " + cardId));
    CardWithdrawn event = creditCard.withdraw(amount);
    applicationEventPublisher.publishEvent(event);
}

And here is how to subscribe to it and create a new withdrawal:

@EventListener
public void addWithdrawalOnCardWithdrawn(CardWithdrawn event) {
    withdrawalsRepository.add(new Withdrawal(UUID.randomUUID(), event.getCardId(), event.getAmount()));
}

As you can see, the only difference from the previous example is how we divide the identical work among components. The application service used to insert the withdrawal; now it publishes an event, and the ReadModelUpdater inserts the withdrawal. The conceptual flow is still the same: we update the credit card and withdrawal tables. The application events are there only to invert the control. Remember that everything runs in a single transaction: if inserting the new withdrawal fails, the change to CreditCard is rolled back.
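The inversion of control can be seen without any framework at all. Here is a deliberately simplified, Spring-free sketch; `TinyEventBus`, `InMemoryReadModel` and the event class are hypothetical stand-ins, not the article's actual types:

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Consumer;

// A stand-in for the domain event
class CardWithdrawnEvent {
    final UUID cardId;
    final BigDecimal amount;

    CardWithdrawnEvent(UUID cardId, BigDecimal amount) {
        this.cardId = cardId;
        this.amount = amount;
    }
}

// A toy synchronous event bus: publish() simply invokes every listener
// in the caller's thread, mimicking Spring's in-memory application events.
class TinyEventBus {
    private final List<Consumer<CardWithdrawnEvent>> listeners = new ArrayList<>();

    void subscribe(Consumer<CardWithdrawnEvent> listener) {
        listeners.add(listener);
    }

    void publish(CardWithdrawnEvent event) {
        listeners.forEach(l -> l.accept(event));
    }
}

// A stand-in for the withdrawals read model
class InMemoryReadModel {
    final List<CardWithdrawnEvent> withdrawals = new ArrayList<>();
}
```

Because the toy bus is synchronous, a listener failure propagates straight back to the publisher, which mirrors the single-transaction coupling that Spring's in-memory events also exhibit.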

If you want to avoid manual publication and a dependency on ApplicationEventPublisher, Spring can do all the work for you. If you are interested, read here.

The two previous synchronizations operated at the application level, which gave us explicit control. However, there is also an implicit way of doing this, and it operates at the database level: we can use a database trigger.

Database Trigger


Here is how to do that with an in-memory version of an H2 database:

public class CreditCardUsedTrigger implements Trigger {

    @Override
    public void fire(Connection connection, Object[] before, Object[] after) throws SQLException {
        try (PreparedStatement stmt = connection.prepareStatement(
                "INSERT INTO WITHDRAWAL (ID, CARD_ID, AMOUNT) VALUES (?, ?, ?)")) {
            stmt.setObject(1, UUID.randomUUID()); // generate withdrawal id
            stmt.setObject(2, cardId(after));
            stmt.setObject(3, getUsedLimitChange(before, after));
            stmt.executeUpdate();
        }
    }

    private Object cardId(Object[] cardRow) {
        return cardRow[0];
    }

    private BigDecimal getUsedLimitChange(Object[] oldCardRow, Object[] newCardRow) {
        return ((BigDecimal) newCardRow[2]).subtract((BigDecimal) oldCardRow[2]);
    }
}

The synchronization is immediate. A change to the credit card's limit and the addition of a new withdrawal are still atomic, because the trigger is part of the transaction.
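For completeness, an H2 trigger class like this is registered with DDL along the following lines (a sketch: the table and column names are assumed to match the entities above, and the package in the fully qualified class name is hypothetical):

```sql
CREATE TRIGGER credit_card_used_trigger
AFTER UPDATE ON CREDIT_CARD
FOR EACH ROW
CALL "io.example.CreditCardUsedTrigger";
```

H2 will then instantiate the named Java class and call its fire() method for every update of the CREDIT_CARD table.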

This solution has some drawbacks. As mentioned, it is database-specific, which means that application logic is no longer kept in the application. The solution might also not be portable to a different database.

We mentioned that the application service and the trigger decouple commands and queries. Well, this is not entirely true. What happens if the trigger fails? Everything is one transaction, so the Withdraw operation will fail for reasons unrelated to executing the command.

Moving synchronization from the credit card class upward (from the domain layer to the application layer, explicitly or implicitly with application events, and then to the database layer) did not solve all issues. We still have coupling to a degree. Some may say events give us decoupling. Here's why I disagree.

Note that the application event from the previous solution exists only in memory. Any code invoked before and after the event's publication is handled within one database transaction; hence, the synchronization is immediate. This has a hidden, and significant, drawback: it may look as if commands and queries are decoupled, but in fact the single transaction creates a strong implicit coupling. This is not obvious from reviewing the code.

The whole application can be found here.

Transaction Log Tailing

Now, onto the next step, which will be a bit counterintuitive. Sometimes it might be useful to synchronize this way, especially when we don't have control over the application that produces the initial state. Let's get back to the example. We need to synchronize when the thread responsible for the Withdraw command finishes. One option is to use the database transaction journal and look for updates to the credit card's limit. A transaction journal (or transaction log) is a history of executed actions. Before, a trigger fired on changes to the credit card's limit, in the same transaction and thread. Now, our application can scan the transaction log and react to past updates of the credit card's limit. In this setup we will use:

  • MySQL to keep withdrawals and credit cards;
  • Apache Kafka for pub/sub of messages read from the database transaction log (MySQL's, in this case);
  • Kafka Connect with Debezium to read MySQL’s transaction log and stream messages to Kafka’s topic;
  • Spring Cloud Stream to read messages from Kafka’s topic.
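A Debezium MySQL connector for such a setup is registered with Kafka Connect via a JSON configuration roughly like the one below. All hostnames, credentials, topic and database names here are placeholders, and the property names match the Debezium versions current when this article was written:

```json
{
  "name": "credit-cards-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "1",
    "database.server.name": "credit-cards",
    "table.whitelist": "cards.credit_card",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.cards"
  }
}
```

Posting this document to Kafka Connect's REST API starts streaming the table's row-level changes into a Kafka topic.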

ReadModelUpdater is now responsible for creating a new withdrawal. It subscribes to Kafka’s topic and looks for messages read from the transaction log:

@Service
@Slf4j
class ReadModelUpdater {

    private final WithdrawalRepository withdrawalRepository;

    ReadModelUpdater(WithdrawalRepository withdrawalRepository) {
        this.withdrawalRepository = withdrawalRepository;
    }

    @StreamListener(Sink.INPUT)
    public void handle(Envelope message) {
        if (message.isUpdate()) {
            saveWithdrawalFrom(message);
        }
    }

    private void saveWithdrawalFrom(Envelope message) {
        String cardId = message.getPayload().getBefore().getId();
        BigDecimal withdrawalAmount = balanceAfter(message).subtract(balanceBefore(message));
        withdrawalRepository.save(new Withdrawal(withdrawalAmount, cardId));
    }

    private BigDecimal balanceAfter(Envelope message) {
        return message.getPayload().getAfter().getUsed_limit();
    }

    private BigDecimal balanceBefore(Envelope message) {
        return message.getPayload().getBefore().getUsed_limit();
    }
}

At this point, ReadModelUpdater could save the new withdrawal in separate storage. A failure to insert a new withdrawal does not affect the command side. But the change is no longer atomic: it takes some time to insert a new withdrawal after a successful Withdraw command. The solution is eventually consistent.

Pub/sub allows us to add new read models without affecting the command model. But introducing messaging has a penalty: now we have to think about the order of messages and about duplicates. And like triggers, transaction logs are database-specific solutions. Testing also became harder due to the new dependencies (Kafka, Kafka Connect, the transaction log). At least we can test whether a message delivery results in a new withdrawal in our database:

@Test
public void shouldSynchronizeQuerySideAfterLogTailing() {
    // given
    String cardUUid = thereIsCreditCardWithLimit(new BigDecimal(100));
    // when
    creditCardUpdateReadFromDbTransactionLog(TEN, cardUUid); // message came from input channel
    // then
    thereIsOneWithdrawalOf(TEN, cardUUid); // database was populated
}
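The duplicate-delivery problem mentioned above is commonly mitigated with an idempotent handler that remembers which message ids it has already processed. A framework-free sketch, with hypothetical names (in production the processed-id set would live in durable storage, not in memory):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;

// Wraps a handler so that redelivered messages (same id) are ignored.
class IdempotentHandler {
    private final Set<UUID> processed = new HashSet<>();
    private final Consumer<String> delegate;

    IdempotentHandler(Consumer<String> delegate) {
        this.delegate = delegate;
    }

    void handle(UUID messageId, String payload) {
        // Set.add returns false if the id was already present
        if (processed.add(messageId)) {
            delegate.accept(payload);
        }
    }
}
```

With this in place, a message redelivered by the broker produces no second withdrawal on the read side.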


Actually, there is a bigger problem with transaction log tailing, and with triggers too. Let's try to add a new feature to the credit card system: chargebacks. The implementation is simple: we lower the used limit whenever there is a chargeback.


public class CreditCard {

    // ... fields

    public void chargeBack(BigDecimal amount) {
        usedLimit = usedLimit.subtract(amount);
    }
}

Guess what happens to our read models when there is a chargeback? You guessed it: the trigger will run, and a message will appear in the transaction log. The consequence is a withdrawal with a negative amount. We could fix that with an if statement allowing only positive amounts for a withdrawal, but that is a short-term fix; the real problem is somewhere else. It relates to one of the most common misconceptions in software development: our solution suffers from "business-intent amnesia."

This solution focuses on data instead of behaviors. Many different business cases (withdrawals, chargebacks) can lead to the same change in state (i.e., changing the used limit). When we rely on state alone, it might be impossible to recover the real business reason behind a change. Using database triggers or transaction log tailing throws that knowledge away.

The whole application can be found here.

Introducing Domain Events

So now we need something that looks like a transaction log, but with business intent. Ideally, we'd have an append-only log of immutable changes that carry business context; in other words, a stream of domain events. Our transaction log was populated automatically by the database management system, but our events will be published from application code. This requires some extra care: an event should be published only when the command was successful. (Want to know more about that? Read this article.)

Note: Domain events represent all changes that happened to a credit card. This means we could keep the domain events in some kind of storage and recreate the state of any credit card from them. A command could then complete by just publishing a set of events; there would be no need to keep credit card state, and no problem with reliable event publication. This technique is called Event Sourcing. The Axon framework makes it easier to use Event Sourcing and CQRS with Spring Boot.
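As a sketch, such a domain event is just a small immutable class that records the business fact. The `CardWithdrawn` name matches the handler shown further below; the exact fields (cardNo, amount, a timestamp) are assumptions for illustration:

```java
import java.math.BigDecimal;
import java.time.Instant;
import java.util.UUID;

// A domain event captures the business fact itself ("money was withdrawn
// from a card"), not just the resulting state change. Immutable by design.
final class CardWithdrawn {
    private final UUID cardNo;
    private final BigDecimal amount;
    private final Instant when;

    CardWithdrawn(UUID cardNo, BigDecimal amount, Instant when) {
        this.cardNo = cardNo;
        this.amount = amount;
        this.when = when;
    }

    UUID getCardNo() { return cardNo; }

    BigDecimal getAmount() { return amount; }

    Instant getWhen() { return when; }
}
```

Unlike a row-level change in the transaction log, this event cannot be confused with a chargeback: the type itself names the business behavior.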

In this setup we use:

  • MySQL to store credit cards data;
  • Apache Kafka for pub/sub of domain events;
  • Spring Cloud Stream for publishing to and listening on Kafka's topic;
  • MongoDB as separate storage for withdrawals. To connect to MongoDB we use the reactive driver from Spring Data Reactive MongoDB. Please note that the use of MongoDB (especially with a reactive driver) is optional; the point is that we are free to use whatever best fits the query side in this example. We can choose wisely!

The solution is distributed: we have separate Spring Boot applications and separate storage for commands and for queries. What if we have many more queries than commands? The query side can be horizontally scaled, independent of MySQL. Plus, selecting a different database for the read side has advantages: we may opt for a more performant model for querying.

This solution differs from transaction log tailing in three aspects:

  1. There is separate storage for withdrawals;
  2. The transaction log contains state changes, whereas domain events focus on business behaviours;
  3. The transaction log is populated by the database management system, whereas domain events must be published by the application's code.

We need to remember to atomically publish an event after changing internal state. One idea is to store events internally in the same database and then send them with a scheduled publisher. If you would like to know how to implement that, check here.
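That idea is often called the outbox pattern, and it can be sketched without any framework: events are appended to an outbox in the same transaction as the state change, and a separate publisher drains the outbox later. Everything below is a simplified, in-memory stand-in with hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// In-memory stand-in for an outbox table living in the same database as
// the aggregate, so writing state + event is one local transaction.
class Outbox {
    private final List<String> pending = new ArrayList<>();

    // Called inside the same transaction as the state change
    void append(String event) {
        pending.add(event);
    }

    // A scheduled publisher would call this periodically, sending each
    // stored event to the broker and removing it once handed over.
    void drainTo(Consumer<String> broker) {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        batch.forEach(broker);
    }
}
```

In a real system `append` would be an INSERT into an outbox table and `drainTo` would run on a schedule (for example via Spring's @Scheduled), publishing to Kafka.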

ReadModelUpdater is now responsible for creating a new withdrawal. It subscribes to Kafka’s topic and looks for domain events:

@Service
class ReadModelUpdater {

    private final WithdrawalsRepository withdrawalsRepository;

    ReadModelUpdater(WithdrawalsRepository withdrawalsRepository) {
        this.withdrawalsRepository = withdrawalsRepository;
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['type'] == 'card-withdrawn'")
    public void handle(CardWithdrawn cardWithdrawn) {
        withdrawalsRepository
            .save(new WithdrawalDto(UUID.randomUUID().toString(),
                cardWithdrawn.getCardNo().toString(), cardWithdrawn.getAmount()))
            .subscribe();
    }
}

Two applications representing synchronization done by events can be found here: query and command.

Summary

All of the discussed approaches have their pros and cons. The important thing is to choose the option that works for your use case. My advice: pick one option, then consider these factors:

Implementation: How easy is it to implement the proposed solution?

Testability: How easy is it to test the solution?

Complexity: How many technologies are needed to build the solution?

Consistency: Is the query side immediately consistent after a successful command, or is it eventually consistent?

Portability: Is it easy to port the solution to a different vendor? For instance, some of the solutions rely on specific databases, which creates vendor lock-in.

Scalability: Is it easy to horizontally scale one of the sides, independently of the other?

Distribution: Is the solution distributed? Are the use-cases handled by different processes?

Extensibility: Is it easy to add a new read model?

 

