Where We Left Off

In the previous architecture spotlight entry, we discussed Event Sourcing and illustrated the concept with a simple banking account example. We laid out many of its pros and cons to help readers decide if the pattern would be useful to them.

In this post, we will be expanding the example and showing a working code implementation using a popular event streaming technology, Apache Kafka.

Why Kafka?

Kafka provides persistent messaging infrastructure and key/value data storage, supporting both event sourcing and derived state storage within a single package. These are the two primary pillars that make up a real-world event sourcing application. Kafka’s central construct of a messaging topic is used to support both concepts – its storage APIs act as a higher-level construct on top of its topic APIs.

However, event sourcing is not Kafka’s only use case. It was originally designed to support event streaming (high-volume messaging, processing and analytics), so it also works well as the general messaging fabric within your microservice system. It scales horizontally, and in doing so provides partitioned, replicated storage for the messages you send through it.

Scaffolding the Basic Example

We opt for a couple of abstractions to reduce boilerplate and allow us to focus on the event sourcing implementation. We will be using Spring Cloud Stream to provide the basis of an integration-enabled microservice app, together with its Kafka and Kafka Streams binders to allow us to operate with Kafka. Input, output and processing functions within the sample application will be designed as Spring Cloud Functions, both simplifying the processing code as well as allowing for easier deployment to serverless infrastructure on a cloud provider, if desired. We also use Lombok to reduce domain object boilerplate and to provide us with readily-available logging, as well as Jackson to handle marshaling data to and from JSON.

The project described in this article will be built using Spring Cloud, so the first step is to download a project scaffold from Spring Initializr. Once downloaded, edit the project’s pom.xml and add the Spring Cloud Stream Kafka binders to the project’s dependencies list:

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    </dependency>
    ...

You can now build and run the project from the command line, as follows (on Windows, use mvnw.cmd in place of ./mvnw):

./mvnw clean package && java -jar target/es-example*.jar

Running this will show typical Maven build output (including test executions), followed by log output from the running app. To start with, the log output will consist of the Spring Boot banner and a few log messages indicating application startup and then shutdown, as the application does not yet contain any runtime processing logic.

Running Kafka

We will also need a running Kafka instance that our application can interact with to send and receive messages, as well as to access our persistent derived-state storage.

The easiest way to get started locally is by using docker-compose. A suitable docker-compose.yml file is available (along with several other Spring Cloud Stream Kafka examples for you to take a look at). Once you have downloaded this docker-compose.yml file to your project, simply run:

docker-compose up -d

Running this will start a ZooKeeper instance (which Kafka relies on in this setup to orchestrate its clustering), as well as Kafka itself.

Adding the Domain Model

Before we can add some processing logic to our example application, we will need a few domain objects to model our Commands, Events and our Bank Account Entity. These objects will represent the data we pass as messages over Kafka, as well as general state containers, and will be what our processing logic operates on.

Entity

The basic data entity we will deal with is a bank account, described by the BankAccount class. Note that we include an account ID here, as Kafka needs a way to correlate messages by some form of key. All messages with the same key are assumed to operate on the same data object. Newer messages could also represent the complete updated state for the specified object, depending on how your system is designed. Kafka also routes all messages with the same key to the same topic partition, and therefore to the same node, which makes processing the complete message history for that specific data object more efficient.

package com.example.esexample.model.entities;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import lombok.Builder;
import lombok.Value;

@Value
@JsonDeserialize(builder = BankAccount.BankAccountBuilder.class)
@Builder(builderClassName = "BankAccountBuilder", toBuilder = true)
public class BankAccount {

   String accountId;
   Long balance;

   @JsonPOJOBuilder(withPrefix = "")
   public static class BankAccountBuilder {}

}
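
To make the role of the key concrete, here is a minimal sketch of how a record key can be mapped deterministically onto a partition. It is an illustration only: Kafka’s default partitioner hashes the serialized key bytes with murmur2, whereas this sketch substitutes String.hashCode(), and the class name KeyPartitioningSketch and any partition counts passed to it are assumptions made for the example.

```java
// Illustrative only: Kafka's default partitioner hashes the serialized key
// with murmur2; String.hashCode() is used here as a simple stand-in.
final class KeyPartitioningSketch {

    private KeyPartitioningSketch() {
    }

    // Maps a record key to a partition index in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

Because the mapping depends only on the key and the partition count, every message keyed with the same account ID ends up in the same partition, which is what keeps an account’s history together.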

Events

We opt for a single event object together with an enum to differentiate between multiple event types. This is by no means the only way to model such a domain.

The event type enum:

package com.example.esexample.model.events;

public enum AccountEventType {

   ACCOUNT_OPENED, ACCOUNT_CREDITED, ACCOUNT_DEBITED

}

In addition to credit and debit events, we also include ACCOUNT_OPENED. This is used as a marker to ‘open’ an account, meaning that an initial account object (with zero starting balance) is created within the system. Credit and debit events can only operate on an already-existing account.

Account events are represented by an AccountEvent class:

package com.example.esexample.model.events;

import com.example.esexample.model.entities.BankAccount;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import lombok.Builder;
import lombok.Value;

@Value
@JsonDeserialize(builder = AccountEvent.AccountEventBuilder.class)
@Builder(builderClassName = "AccountEventBuilder", toBuilder = true)
public class AccountEvent {

   String accountId;
   AccountEventType eventType;
   Long amount;
   BankAccount bankAccount;

   @JsonPOJOBuilder(withPrefix = "")
   public static class AccountEventBuilder {}

}

Notice the event contains a representation of the latest bank account state within it. This is only done for point-in-time processing convenience and is by no means required. With event sourcing, it’s key to remember that the sequence of all events ultimately represents the master state record. Any other state representation is purely for convenience.
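
The idea that the event sequence is the master record can be sketched as a simple fold over the history. The types below are hypothetical, stripped-down stand-ins for the article’s AccountEvent and AccountEventType (no Lombok, no Jackson), and the balance is tracked as a plain long purely for illustration.

```java
import java.util.List;

// Hypothetical, simplified stand-ins for the article's event types.
final class ReplaySketch {

    enum EventType { ACCOUNT_OPENED, ACCOUNT_CREDITED, ACCOUNT_DEBITED }

    static final class Event {
        final EventType type;
        final long amount;

        Event(EventType type, long amount) {
            this.type = type;
            this.amount = amount;
        }
    }

    private ReplaySketch() {
    }

    // Folds the full event history, in order, into the current balance.
    static long replayBalance(List<Event> history) {
        long balance = 0L;
        for (Event event : history) {
            switch (event.type) {
                case ACCOUNT_OPENED:
                    balance = 0L; // accounts open with a zero balance
                    break;
                case ACCOUNT_CREDITED:
                    balance += event.amount;
                    break;
                case ACCOUNT_DEBITED:
                    balance -= event.amount;
                    break;
            }
        }
        return balance;
    }
}
```

Replaying an open, a credit of 10 and a debit of 3 yields a balance of 7, the same value a snapshot embedded in the last event would carry. The snapshot simply saves the replay.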

Commands

Similar to the events, we use a single container object with an enum to differentiate between command types. The enum also contains most of the processing logic to apply a command to an account, and by doing so, generate an event.

The command type enum:

package com.example.esexample.model.commands;

import com.example.esexample.model.entities.BankAccount;
import com.example.esexample.model.events.AccountEvent;
import com.example.esexample.model.events.AccountEventType;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public enum CommandType {

   DEPOSIT {
       @Override
       protected AccountEvent processForAccount(String accountId, Long amount, BankAccount bankAccount) {
           validateAccountPreconditions(accountId, amount, bankAccount);

           log.info("Processing {} (amount: {}) for account {}", this, amount, bankAccount);

           final BankAccount newAccountState = BankAccount.builder().accountId(bankAccount.getAccountId())
                   .balance(bankAccount.getBalance() + amount).build();

           return AccountEvent.builder().accountId(bankAccount.getAccountId())
                   .eventType(AccountEventType.ACCOUNT_CREDITED).amount(amount).bankAccount(newAccountState).build();
       }

       @Override
       protected void validateAccountPreconditions(String accountId, Long amount, BankAccount bankAccount) {
           if (!accountId.equals(bankAccount.getAccountId())) {
               throw new IllegalStateException("Command request is not for the specified account");
           }
       }
   }, WITHDRAW {
       @Override
       protected AccountEvent processForAccount(String accountId, Long amount, BankAccount bankAccount) {
           validateAccountPreconditions(accountId, amount, bankAccount);

           log.info("Processing {} (amount: {}) for account {}", this, amount, bankAccount);

           final BankAccount newAccountState = BankAccount.builder().accountId(bankAccount.getAccountId())
                   .balance(bankAccount.getBalance() - amount).build();

           return AccountEvent.builder().accountId(bankAccount.getAccountId())
                   .eventType(AccountEventType.ACCOUNT_DEBITED).amount(amount).bankAccount(newAccountState).build();
       }

       @Override
       protected void validateAccountPreconditions(String accountId, Long amount, BankAccount bankAccount) {
           if (!accountId.equals(bankAccount.getAccountId())) {
               throw new IllegalStateException("Withdrawal request is not for the specified account");
           }

           if (bankAccount.getBalance() < amount) {
               throw new IllegalStateException("Insufficient funds to process withdrawal request");
           }
       }

   };

   abstract protected AccountEvent processForAccount(String accountId, Long amount, BankAccount bankAccount);

   abstract protected void validateAccountPreconditions(String accountId, Long amount, BankAccount bankAccount);

   boolean canProcessForAccount(String accountId, Long amount, BankAccount bankAccount) {
       try {
           validateAccountPreconditions(accountId, amount, bankAccount);
           return true;
       } catch (IllegalStateException e) {
           return false;
       }
   }

}

Of particular interest above is the validateAccountPreconditions() method in the WITHDRAW enum value. This holds the business rule that the account a withdrawal is applied to must contain sufficient funds for the withdrawal to succeed. An IllegalStateException is thrown if this condition is not met.

The command container object:

package com.example.esexample.model.commands;

import com.example.esexample.model.entities.BankAccount;
import com.example.esexample.model.events.AccountEvent;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import lombok.Builder;
import lombok.Value;

@Value
@JsonDeserialize(builder = Command.CommandBuilder.class)
@Builder(builderClassName = "CommandBuilder", toBuilder = true)
public class Command {

   String accountId;
   CommandType commandType;
   Long amount;

   public boolean canProcessForAccount(BankAccount bankAccount) {
       return null != commandType && commandType.canProcessForAccount(accountId, amount, bankAccount);
   }

   public AccountEvent processForAccount(BankAccount bankAccount) {
       return null == commandType ? null : commandType.processForAccount(accountId, amount, bankAccount);
   }

   @JsonPOJOBuilder(withPrefix = "")
   public static class CommandBuilder {}

}

Utility Classes

A Constants class stores constants used across the app:

package com.example.esexample.util;

public class Constants {

   public static final String UNITY_ACCOUNT_ID = "unity";

   public static final String HEADER_ACCOUNT_ID = "accountId";
   public static final String HEADER_EVENT_TYPE = "eventType";
   public static final String HEADER_CORRELATION_UUID = "correlationUuid";

   public static final String EVENT_STORE_ACCOUNTS = "stores.accounts";

   public static final String RESOURCE_ACCOUNT_GET_PREFIX = "/account/";
   public static final String RESOURCE_ACCOUNT_GET = RESOURCE_ACCOUNT_GET_PREFIX + "{accountId}";

   private Constants() throws IllegalAccessException {
       throw new IllegalAccessException("Utility class");
   }

}

Of particular interest is the UNITY_ACCOUNT_ID value. This is the ID of the single account we operate on in this example, and it is only present to allow Kafka to correlate messages intended for this particular account entity.

The AccountPrinter class will periodically output the current state of the bank account:

package com.example.esexample.util;

import com.example.esexample.service.BankAccountService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestClientException;
import org.springframework.web.server.ResponseStatusException;

@Component
@Slf4j
public class AccountPrinter {

   @Autowired
   private BankAccountService bankAccountService;

   @Scheduled(fixedRate = 5_000)
   public void printAccountState() {
       log.info("-----------------------------------");
       try {
           log.info("Account state: {}", bankAccountService.getAccount(Constants.UNITY_ACCOUNT_ID));
       } catch (RestClientException e) {
           log.warn(
                   "Account service still starting up (unable to parse response for account " + Constants.UNITY_ACCOUNT_ID + ")");
       } catch (ResponseStatusException e) {
           if (e.getStatus() == HttpStatus.SERVICE_UNAVAILABLE) {
               log.warn("Account [id: '{}'] not yet available", Constants.UNITY_ACCOUNT_ID);
           } else if (e.getStatus() == HttpStatus.NOT_FOUND) {
               log.warn("Account [id: '{}'] not found", Constants.UNITY_ACCOUNT_ID);
           } else {
               log.error("Failed to fetch account " + Constants.UNITY_ACCOUNT_ID, e);
           }
       } catch (RuntimeException e) {
           log.error("Failed to fetch account " + Constants.UNITY_ACCOUNT_ID, e);
       }
       log.info("-----------------------------------");
   }

}

The Service and Data Layers

The AccountPrinter needs a BankAccountService to fetch the latest available account status:

package com.example.esexample.service;

import com.example.esexample.dl.BankAccountRepository;
import com.example.esexample.model.entities.BankAccount;
import com.example.esexample.util.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;

@RestController()
@Slf4j
public class BankAccountService {

   @Autowired
   private BankAccountRepository bankAccountRepository;

   // The account ID is part of the URL path (/account/{accountId}), so it is bound as a path variable
   @RequestMapping(Constants.RESOURCE_ACCOUNT_GET)
   public BankAccount getAccount(@PathVariable(value = "accountId") String accountId) {
       if (null == accountId || accountId.isEmpty()) {
           throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "No account ID specified");
       }

       try {
           final BankAccount account = bankAccountRepository.getAccount(accountId);
           if (account == null) {
               throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Specified account not found");
           }
           return account;
       } catch (IllegalStateException e) {
           throw new ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE,
                   "This server is currently unable to process the request - please try again later");
       }
   }

}

The service provides a REST endpoint to fetch an account, given its account ID. It is simply a transport wrapper around the data layer repository, which is:

package com.example.esexample.dl;

import com.example.esexample.model.entities.BankAccount;
import com.example.esexample.util.Constants;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.stereotype.Repository;
import org.springframework.web.client.RestTemplate;

@Slf4j
@Repository
public class BankAccountRepository {

   @Autowired
   private InteractiveQueryService interactiveQueryService;

   public BankAccount getAccount(String accountId) {
       HostInfo hostInfo = interactiveQueryService
               .getHostInfo(Constants.EVENT_STORE_ACCOUNTS, accountId, new StringSerializer());

       if (null == hostInfo || null == hostInfo.host()) {
           throw new IllegalStateException("Unable to determine host responsible for the account");
       }

       if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
           log.debug("Fetching account from local host {}:{}", hostInfo.host(), hostInfo.port());
           final ReadOnlyKeyValueStore<String, BankAccount> accountStore = interactiveQueryService
                   .getQueryableStore(Constants.EVENT_STORE_ACCOUNTS,
                           QueryableStoreTypes.<String, BankAccount>keyValueStore());

           return accountStore.get(accountId);
       } else {
           log.debug("Fetching account from remote host {}:{}", hostInfo.host(), hostInfo.port());

           RestTemplate restTemplate = new RestTemplate();
           return restTemplate.getForEntity(String.format("http://%s:%d%s%s", hostInfo.host(), hostInfo.port(),
                   Constants.RESOURCE_ACCOUNT_GET_PREFIX, accountId), BankAccount.class).getBody();
       }
   }

}

This repository uses Kafka’s interactive query service to gain access to a KeyValueStore, from which the latest derived account state can be fetched. As previously mentioned, Kafka identifies and correlates data by key, storing all data for a given key locally to a given node. The repository queries for the HostInfo for the specified bank account ID – if Kafka responds that the data for the given key is being stored locally, the repository can then request a queryable key/value store through which it can fetch the account data directly.

When scaling up and running the services and Kafka across multiple nodes, the HostInfo returned from the interactive query service may indicate that the data for the given key is being stored on an external node. In this case, the repository acts as a proxy to fetch the account data from the identified node’s REST service (specifically, its BankAccountService.getAccount() method). This topology allows for any data to be requested from any REST node, and the data layer and Kafka will, behind the scenes, orchestrate which particular service node the request is routed to, depending on where Kafka is storing the data for the specified message key.
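
The local-versus-remote decision described above can be reduced to a few lines. This is a sketch under stated assumptions: Host is a hypothetical stand-in for Kafka Streams’ HostInfo, the "/account/" prefix mirrors RESOURCE_ACCOUNT_GET_PREFIX, and returning null to mean "query the local store" is a convention chosen only for this example.

```java
// Hypothetical sketch of the routing decision made by the repository.
final class AccountRoutingSketch {

    // Stand-in for Kafka Streams' HostInfo (host name plus port).
    static final class Host {
        final String name;
        final int port;

        Host(String name, int port) {
            this.name = name;
            this.port = port;
        }

        boolean sameAs(Host other) {
            return name.equals(other.name) && port == other.port;
        }
    }

    private AccountRoutingSketch() {
    }

    // Returns null when the key is owned by the current node (read the local
    // store), otherwise the URL of the owning node's REST endpoint.
    static String remoteUrlOrNull(Host current, Host owner, String accountId) {
        if (current.sameAs(owner)) {
            return null;
        }
        return String.format("http://%s:%d/account/%s", owner.name, owner.port, accountId);
    }
}
```

A request arriving at node-a for a key owned by node-b would be proxied to node-b’s /account/ endpoint, while a request for a key owned by node-a never leaves the process.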

Message Integration and Processing Layers

Now that we have the complete domain, data and service layers in place, we can add our message processing nodes. These will inject messages into the system (acting as simulated user input), process messages within the topology, as well as act as persistence sinks for Kafka’s key/value stores.

Input

This is an account opener, which will send an initial ACCOUNT_OPENED event to instantiate the initial account object within Kafka’s storage:

package com.example.esexample.pipeline.input;

import com.example.esexample.model.entities.BankAccount;
import com.example.esexample.model.events.AccountEvent;
import com.example.esexample.model.events.AccountEventType;
import com.example.esexample.service.BankAccountService;
import com.example.esexample.util.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClientException;
import org.springframework.web.server.ResponseStatusException;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

@Slf4j
@Service
public class AccountOpener {

   @Autowired
   private BankAccountService bankAccountService;

   private AtomicBoolean accountOpened = new AtomicBoolean(false);

   @Bean
   public Supplier<Message<AccountEvent>> openAccount() {
       return () -> {
           final String accountId = Constants.UNITY_ACCOUNT_ID;

           try {
               bankAccountService.getAccount(accountId);
               return null;
           } catch (RestClientException e) {
               log.warn("Account service still starting up (unable to parse response for account " + accountId + ")");
               return null;
           } catch (ResponseStatusException e) {
               if (e.getStatus() != HttpStatus.NOT_FOUND) {
                   return null;
               }
           }

           if (!accountOpened.compareAndSet(false, true)) {
               return null;
           }
           log.info("Requesting opening of account '{}'", accountId);

           final BankAccount bankAccount = BankAccount.builder().accountId(accountId).balance(0L).build();

           final AccountEvent accountEvent = AccountEvent.builder().accountId(accountId)
                   .eventType(AccountEventType.ACCOUNT_OPENED).bankAccount(bankAccount).build();

           log.info("Sending account event: {}", accountEvent);

           return MessageBuilder.withPayload(accountEvent).setHeader("messageKey", accountId)
                   .setHeader(Constants.HEADER_ACCOUNT_ID, accountId)
                   .setHeader(Constants.HEADER_EVENT_TYPE, accountEvent.getEventType())
                   .setHeader(Constants.HEADER_CORRELATION_UUID, UUID.randomUUID().toString()).build();
       };
   }

}

This is a command requestor, which simulates user input of depositing and withdrawing money from the account:

package com.example.esexample.pipeline.input;

import com.example.esexample.model.commands.Command;
import com.example.esexample.model.commands.CommandType;
import com.example.esexample.model.entities.BankAccount;
import com.example.esexample.service.BankAccountService;
import com.example.esexample.util.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClientException;
import org.springframework.web.server.ResponseStatusException;

import java.util.UUID;
import java.util.function.Supplier;

@Slf4j
@Service
public class CommandRequestor {

   @Autowired
   private BankAccountService bankAccountService;

   @Bean
   public Supplier<Message<Command>> depositSource() {
       return () -> {
           final String accountId = Constants.UNITY_ACCOUNT_ID;

           try {
               final BankAccount bankAccount = bankAccountService.getAccount(accountId);
               if (null == bankAccount) {
                   return null;
               }
           } catch (RestClientException e) {
               log.warn("Account service still starting up (unable to parse response for account " + accountId + ")");
               return null;
           } catch (ResponseStatusException e) {
               if (e.getStatus() != HttpStatus.NOT_FOUND && e.getStatus() != HttpStatus.SERVICE_UNAVAILABLE) {
                   return null;
               }
           }

           Long amount = Math.round(Math.random() * 9.0d + 1.0d);
           Command depositRequest = Command.builder().commandType(CommandType.DEPOSIT).accountId(accountId)
                   .amount(amount).build();

           log.info("Sending deposit request: {}", depositRequest);

           return MessageBuilder.withPayload(depositRequest).setHeader(Constants.HEADER_ACCOUNT_ID, accountId)
                   .setHeader(Constants.HEADER_CORRELATION_UUID, UUID.randomUUID().toString()).build();
       };
   }

   @Bean
   public Supplier<Message<Command>> withdrawalSource() {
       return () -> {
           final String accountId = Constants.UNITY_ACCOUNT_ID;

           try {
               final BankAccount bankAccount = bankAccountService.getAccount(accountId);
               if (null == bankAccount) {
                   return null;
               }
           } catch (RestClientException e) {
               log.warn("Account service still starting up (unable to parse response for account " + accountId + ")");
               return null;
           } catch (ResponseStatusException e) {
               if (e.getStatus() != HttpStatus.NOT_FOUND && e.getStatus() != HttpStatus.SERVICE_UNAVAILABLE) {
                   return null;
               }
           }

           Long amount = Math.round(Math.random() * 49.0d + 1.0d);
           Command withdrawRequest = Command.builder().commandType(CommandType.WITHDRAW).accountId(accountId)
                   .amount(amount).build();

           log.info("Sending withdrawal request: {}", withdrawRequest);

           return MessageBuilder.withPayload(withdrawRequest).setHeader(Constants.HEADER_ACCOUNT_ID, accountId)
                   .setHeader(Constants.HEADER_CORRELATION_UUID, UUID.randomUUID().toString()).build();
       };
   }

}

Withdrawal requests are for amounts between 1 and 50, a range five times as wide as the 1 to 10 used for deposits, so some withdrawals will fail for lack of funds. These failures will show up in the application’s log output once the fully working example is running.
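
To see where those ranges come from, the sketch below reproduces the amount formula used in CommandRequestor with the random input made explicit. The class and method names are hypothetical; only the arithmetic is taken from the code above.

```java
// Reproduces the amount formula from CommandRequestor with the random input
// passed in, so the resulting range can be checked directly.
final class AmountRangeSketch {

    private AmountRangeSketch() {
    }

    // random is expected to lie in [0.0, 1.0), as returned by Math.random().
    // spread is 9.0 for deposits and 49.0 for withdrawals.
    static long amountFor(double random, double spread) {
        return Math.round(random * spread + 1.0d);
    }
}
```

With a spread of 9.0 the result lies between 1 and 10, and with 49.0 between 1 and 50. Because of the rounding, the two endpoint values of each range come up roughly half as often as the values in between.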

Command and Event Processing

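Let’s look in greater detail at the command handler. It delegates to the command’s business logic to check whether the command can be processed against the latest available account state. If it can, processing occurs and an event is generated. If it cannot, null is returned instead and no event is generated. In our processing topology, null indicates the termination of a given messaging flow. The event handler below follows the same convention: it joins each incoming event against the account table and forwards the event only when no account is stored yet for that key, returning null (which is then filtered out) otherwise.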

package com.example.esexample.pipeline.processing;

import com.example.esexample.model.entities.BankAccount;
import com.example.esexample.model.events.AccountEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

import java.util.function.BiFunction;

@Service
@Slf4j
public class EventHandler {

   @Bean
   public BiFunction<KStream<String, AccountEvent>, KTable<String, BankAccount>, KStream<String, AccountEvent>> processEvent() {
       return (eventStream, accountTable) -> eventStream
               .leftJoin(accountTable, (event, bankAccount) -> null == bankAccount ? event : null)
               .filter((s, event) -> {
                   if (null != event) {
                       log.info("Sending account event {}", event);
                   }
                   return null != event;
               });
   }

}
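
The check-then-emit behaviour described for the command handler can be sketched without Kafka. This is not the article’s processCommand function: the class, enum and method names are hypothetical, the account state is reduced to a nullable balance, and an empty Optional stands in for the null that terminates a message flow.

```java
import java.util.Optional;

// Hypothetical sketch: decide whether a command can be applied to the latest
// known balance, and produce the resulting balance only if it can.
final class CommandDecisionSketch {

    enum Kind { DEPOSIT, WITHDRAW }

    private CommandDecisionSketch() {
    }

    // An empty Optional plays the role of the null that ends a message flow.
    static Optional<Long> apply(Kind kind, long amount, Long currentBalance) {
        if (currentBalance == null) {
            return Optional.empty(); // no stored account state to check against
        }
        if (kind == Kind.WITHDRAW && currentBalance < amount) {
            return Optional.empty(); // insufficient funds, command rejected
        }
        long newBalance = kind == Kind.DEPOSIT ? currentBalance + amount : currentBalance - amount;
        return Optional.of(newBalance);
    }
}
```

In a Kafka Streams topology the same decision would typically sit inside a stream-table join followed by a filter that drops the rejected results, much like the leftJoin and filter used in the event handler.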

Below are storage sinks, which act as terminator nodes within the messaging topology. These just log any messages they receive – the real magic of using these for persistent key/value storage is done via Spring Cloud Stream Kafka configuration:

package com.example.esexample.pipeline.processing;

import com.example.esexample.model.commands.Command;
import com.example.esexample.model.entities.BankAccount;
import com.example.esexample.model.events.AccountEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
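import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

import java.util.function.Consumer;
import java.util.function.Function;

// Registered as a Spring component so that the @Bean methods below are discovered
@Slf4j
@Service
public class StorageSinks {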

   @Bean
   public Function<KStream<String, AccountEvent>, KStream<String, BankAccount>> getAccountFromEvent() {
       return (eventStream) -> eventStream.mapValues((s, event) -> event.getBankAccount());
   }

   @Bean
   public Consumer<KTable<String, BankAccount>> accountStorageSink() {
       return accountTable -> {
           accountTable.mapValues((accountId, bankAccount) -> {
               log.info("Sinking account #{} to persistent state store: {} [{}]", accountId,
                       accountTable.queryableStoreName(), bankAccount);
               return bankAccount;
           });
       };
   }

   @Bean
   public Consumer<KTable<String, Command>> commandStorageSink() {
       return commandTable -> {
           commandTable.mapValues((accountId, command) -> {
               log.info("Sinking command to persistent state store: {} [{}]", commandTable.queryableStoreName(),
                       command);
               return command;
           });
       };
   }

}

Tying It All Together – Application Configuration

A basic Lombok configuration file is needed to properly support our JSON serialization. Create a lombok.config file within the root of your src/main/java folder and add the following to it:

config.stopBubbling = true
lombok.copyableAnnotations += com.fasterxml.jackson.annotation.JsonProperty

Now that we have all the code in place, we need to wire each individual component into a complete processing topology. This is done with an application.yml configuration file. Replace the application.properties file in your project with the application.yml below:

spring.application.name: es-example

spring.cloud.stream:
 default.content-type: application/json
 kafka:
   streams.binder.configuration:
     processing.guarantee: exactly_once
###############
# SET THIS TO A VALID HOSTNAME BEFORE RUNNING THE APP!
###############
     application.server: CHANGEME.LOCAL.HOST:8080
     commit.interval.ms: 1000
   streams.binder.stateStoreRetry.maxAttempts: 5
   default.producer:
     messageKeyExpression: headers['accountId'].getBytes('UTF-8')
     sync: true
     configuration:
       acks: all
       retries: 1
       enable.idempotence: true
       max.block.ms: 5000
# Enable indefinite topic message retention
     topic.properties.retention:
       ms: -1
       bytes: -1
   default.consumer.partitioned: true
# Which spring cloud stream processing functions should be activated at runtime
 function.definition: openAccount;processEvent;getAccountFromEvent;accountStorageSink;commandStorageSink;depositSource;withdrawalSource;processCommand
 bindings:
   openAccount-out-0.destination: source.accounts
   processEvent-in-0.destination: source.accounts
   processEvent-in-1.destination: stores.accounts
   processEvent-out-0.destination: events
   depositSource-out-0.destination: commands
   withdrawalSource-out-0.destination: commands
   processCommand-in-0.destination: commands
   processCommand-in-1.destination: stores.accounts
   processCommand-out-0.destination: events
   getAccountFromEvent-in-0.destination: events
   getAccountFromEvent-out-0.destination: stores.accounts
   accountStorageSink-in-0.destination: stores.accounts
   commandStorageSink-in-0.destination: commands
 kafka.streams.bindings:
   accountStorageSink-in-0.consumer.materializedAs: stores.accounts
   getAccountFromEvent-in-0.consumer.materializedAs: stores.events
   commandStorageSink-in-0.consumer.materializedAs: stores.commands
# Each processor bean within a Kafka Streams topology requires a unique application ID
 kafka.streams.binder.functions:
   openAccount.applicationId: openAccount
   processEvent.applicationId: processEvent
   depositSource.applicationId: depositSource
   withdrawalSource.applicationId: withdrawalSource
   processCommand.applicationId: processCommand
   getAccountFromEvent.applicationId: getAccountFromEvent
   accountStorageSink.applicationId: accountStorageSink
   commandStorageSink.applicationId: commandSink

Ensure you set a value for application.server before running the app. This should correspond to your local hostname and port. Kafka Streams uses this value for its HostInfo-based routing of state store queries between multiple application instances.

The main points of interest in the config are:

spring.cloud.stream.function.definition

This indicates which of the given Spring Cloud Function processing components should be enabled within the running application. Here we are listing all the functions added so far in this example.

spring.cloud.stream.bindings

These configure our messaging topology – how all the inputs, outputs and intermediate processing functions are connected. Each named destination corresponds to a distinct Kafka topic which is used to route messages between processing functions.
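
To make the wiring easier to read, the binding names can be grouped by topic. Spring Cloud Stream names functional bindings as <functionName>-in-<index> or <functionName>-out-<index>, so a small helper can recover which function reads from or writes to each topic. The sketch below is plain Java and is not part of Spring; the BindingTopology class and its describe method are hypothetical names used only for illustration, and it assumes function names contain no hyphens.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not a Spring API): groups binding names by destination topic.
class BindingTopology {

    // Maps each topic to the functions that read from or write to it.
    // Assumes binding names follow <functionName>-<in|out>-<index>
    // and that function names contain no hyphens.
    static Map<String, List<String>> describe(Map<String, String> bindings) {
        Map<String, List<String>> byTopic = new LinkedHashMap<>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            String[] parts = binding.getKey().split("-");
            String function = parts[0];
            String role = parts[1].equals("out") ? "writes" : "reads";
            byTopic.computeIfAbsent(binding.getValue(), topic -> new ArrayList<>())
                   .add(function + " " + role);
        }
        return byTopic;
    }

    public static void main(String[] args) {
        // A subset of the bindings from application.yml
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("depositSource-out-0", "commands");
        bindings.put("withdrawalSource-out-0", "commands");
        bindings.put("processCommand-in-0", "commands");
        bindings.put("processCommand-in-1", "stores.accounts");
        bindings.put("processCommand-out-0", "events");
        describe(bindings).forEach((topic, users) -> System.out.println(topic + ": " + users));
    }
}
```

Reading the output topic by topic shows, for example, that both source functions feed the commands topic and that processCommand is its only consumer in this subset.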

spring.cloud.stream.kafka.streams.bindings

We are using our trivial storage sink functions to act as markers for named key/value stores within Kafka; these configuration options indicate which stores should be created and used. You may also notice that some of these store names are included in the spring.cloud.stream.bindings – as mentioned at the beginning of the article, Kafka’s storage APIs act as layers above its messaging topics. This means a named key/value store corresponds to a message topic, so it can also be used within the Kafka Streams message processing topology, via the KStream and KTable APIs. We use this mechanism within the CommandHandler, effectively joining on the latest available account data when attempting to process a command. This allows us to quickly check if a command can be processed by looking at the latest stored account state, without needing to re-process every event to determine what the current account balance is.
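
The idea of joining a command against the latest stored account state can be sketched in plain Java. This is only an analogy: it uses an in-memory map in place of the stores.accounts key/value store and does not use the Kafka Streams API. The CommandJoinSketch class, its method names and the "acc-1" account ID are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java analogy of a stream-table join; it does not use Kafka Streams.
class CommandJoinSketch {

    // Stands in for the stores.accounts key/value store: accountId -> latest balance.
    private final Map<String, Long> latestBalances = new HashMap<>();

    // Stands in for the storage sink recording the latest derived state.
    void storeBalance(String accountId, long balance) {
        latestBalances.put(accountId, balance);
    }

    // Looks up the latest stored balance for the command's account ID.
    // Returns the resulting balance if the withdrawal is acceptable,
    // or empty if the account is unknown or the funds are insufficient.
    // No event history is replayed to make this decision.
    Optional<Long> withdraw(String accountId, long amount) {
        Long balance = latestBalances.get(accountId);
        if (balance == null || amount <= 0 || balance < amount) {
            return Optional.empty();
        }
        return Optional.of(balance - amount);
    }

    public static void main(String[] args) {
        CommandJoinSketch handler = new CommandJoinSketch();
        handler.storeBalance("acc-1", 100L);
        System.out.println(handler.withdraw("acc-1", 30L));   // accepted
        System.out.println(handler.withdraw("acc-1", 500L));  // rejected: insufficient funds
    }
}
```

In this sketch, the withdraw method only validates the command. Updating the stored balance is left to storeBalance, mirroring how the resulting event flows back into the account store through a separate function.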

spring.cloud.stream.kafka.streams.binder.functions

Each Spring Cloud Function within the application requires its own application ID for unique identification within the Kafka processing topology. In this example, we include all processing functions within a single application; however, these would typically be separated out into their own independent microservices. Where an application contains only a single processing function, this configuration option can be omitted, in which case Spring will use the main spring.application.name config property to identify the processing function within the Kafka topology.

System Review

With everything now in place, we can take a step back and review the system just created. The following diagram illustrates how a command and a query each flow through the system, and how Kafka is leveraged along the way.

Each event flows through a series of independent functions – the Command Handler, Event Handler, and Storage Sink – until the resulting entity is persisted, marking the end of the event’s modification to the system.

Each query is much simpler as only the current system state is needed, which we’ve conveniently persisted alongside the corresponding event. The query request flows through a traditional service and repository implementation which ultimately leverages Kafka’s interactive query API to produce the latest system state.
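
The difference between the write path and the query path can be illustrated with an in-memory sketch. It assumes a simplified event carrying an account ID and a signed amount; the AccountQuerySketch class and its members are hypothetical and stand in for the events topic and the account store rather than reproducing the Kafka-backed implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for the event log and the derived account store.
class AccountQuerySketch {

    // Simplified event: positive amounts are deposits, negative are withdrawals.
    static final class Event {
        final String accountId;
        final long amount;

        Event(String accountId, long amount) {
            this.accountId = accountId;
            this.amount = amount;
        }
    }

    private final List<Event> eventLog = new ArrayList<>();         // stands in for the events topic
    private final Map<String, Long> accountStore = new HashMap<>(); // stands in for stores.accounts

    // Write path: append the event, then update the derived state alongside it.
    void handle(Event event) {
        eventLog.add(event);
        accountStore.merge(event.accountId, event.amount, Long::sum);
    }

    // Query path: a single lookup against the derived state.
    long currentBalance(String accountId) {
        return accountStore.getOrDefault(accountId, 0L);
    }

    // Full replay of the log, shown only for comparison with the lookup above.
    long replayBalance(String accountId) {
        long balance = 0;
        for (Event event : eventLog) {
            if (event.accountId.equals(accountId)) {
                balance += event.amount;
            }
        }
        return balance;
    }
}
```

Both currentBalance and replayBalance return the same value for a given account, but the former costs one lookup regardless of how many events have been recorded.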

Conclusion

This article has shown a complete working application for a Kafka-based event sourcing system that implements our simple banking account example. It has also shared several efficiency paradigms that are useful for real-world event sourcing applications, specifically derived state stores that provide easy access to the latest available state without needing to reprocess (“source”) all the stored events for each and every processing operation.

Once you build and run the completed application, you will see log output indicating the sending and receiving of commands and events, together with the periodic output of the latest account balance.

It is worth mentioning that this example is by no means canonical – it only shows one way of designing and building such a system. Hopefully, seeing a fully-working example like this can provide inspiration for implementing your own event sourcing system.
