Using Spring for Apache Kafka to Manage a Distributed Data Model in MongoDB Across Multiple Microservices
Given a modern distributed system composed of multiple microservices, each possessing a subset of a domain's aggregate data, the system will almost assuredly have some data duplication. Given this duplication, how do we maintain data consistency? In this two-part post, we will explore one possible solution to this challenge: Apache Kafka and the model of eventual consistency.
Introduction
Apache Kafka is an open-source distributed event streaming platform capable of handling trillions of messages. According to Confluent, initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open-sourced by LinkedIn in 2011, Kafka has quickly evolved from a messaging queue to a full-fledged event streaming platform.
Eventual consistency, according to Wikipedia, is a consistency model used in distributed computing to achieve high availability that informally guarantees that if no new updates are made to a given data item, eventually all accesses to that item will return the last updated value. I previously covered the topic of eventual consistency in a distributed system using RabbitMQ in the May 2017 post, Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ. The post was featured on Pivotal’s RabbitMQ website.
Domain-driven Design
To ground the discussion, let’s examine a common example — an online storefront. Using a domain-driven design (DDD) approach, we would expect our problem domain, the online storefront, to be composed of multiple bounded contexts. Bounded contexts would likely include Shopping, Customer Service, Marketing, Security, Fulfillment, Accounting, and so forth, as shown in the context map, below.

Given this problem domain, we can assume we have the concept of a Customer. Further, we can assume the unique properties that define a Customer are likely to be spread across several bounded contexts. A complete view of the Customer will require you to aggregate data from multiple contexts. For example, the Accounting context may be the system of record for primary customer information, such as the customer’s name, contact information, contact preferences, and billing and shipping addresses. Marketing may possess additional information about the customer’s use of the store’s loyalty program and online shopping activity. Fulfillment may maintain a record of all orders being shipped to the customer. Security likely holds the customer’s access credentials, account access history, and privacy settings.
Below, the Customer data objects are shown in yellow. Orange represents the logical divisions of responsibility within each bounded context. These divisions will manifest themselves as individual microservices in our online storefront example.

Distributed Data Consistency
If we agree that the architecture of our domain’s data model requires some duplication of data across bounded contexts or even between services within the same context, then we must ensure data consistency. Take, for example, the case where a customer changes their home address or email. Let us assume that the Accounting context is the system of record for these data fields. However, to fulfill orders, the Shipping context might also need to maintain the customer’s current home address. Likewise, the Marketing context, responsible for opt-in email advertising, also needs to be aware of the email change and update its customer records.
If a piece of shared data is changed, then the party making the change should be responsible for communicating the change without expecting a response. They are stating a fact, not asking a question. Interested parties can choose if and how to act upon the change notification. This decoupled communication model is often described as Event-Carried State Transfer, defined by Martin Fowler of ThoughtWorks in his insightful post, What do you mean by “Event-Driven”?. Changes to a piece of data can be thought of as a state change event — events that contain details of the data that changed. Coincidentally, Fowler uses a customer’s address change as an example of Event-Carried State Transfer in the post. Fellow former ThoughtWorker Graham Brooks also detailed the concept in his post, Event-Carried State Transfer Pattern.
Consistency Strategies
Multiple architectural approaches can be taken to solve for data consistency in a distributed system. For example, you could use a single relational database with shared schemas to persist data, avoiding the distributed data model altogether. However, it could be argued that using a single database just turned your distributed system back into a monolith.
You could use Change Data Capture (CDC) to track changes to each database and send a record of those changes to Kafka topics for consumption by interested parties. Kafka Connect is an excellent choice for this, as explained in the article, No More Silos: How to Integrate your Databases with Apache Kafka and CDC, by Robin Moffatt of Confluent.
Alternately, we could use a separate data service, independent of the domain’s other business services, whose sole role is to ensure data consistency across domains. If messages persist in Kafka, the service has the added ability to provide data auditability through message replay. Of course, another set of services adds additional operational complexity to the system.
In this post’s somewhat simplistic architecture, the business microservices will maintain consistency across their respective domains by producing and consuming messages from multiple Kafka topics to which they are subscribed. Kafka Producers may also be Consumers within our domain.
Storefront Example
In this post, our online storefront API will be built in Java using Spring Boot and OpenJDK 16. We will ensure the consistency of distributed data by using a publish/subscribe model with the Spring for Apache Kafka project. When a piece of data is changed by one Spring Boot microservice, if appropriate, that state change will trigger a state change event, which will be shared with other microservices using Kafka topics.

We will explore different methods of leveraging Spring Kafka to communicate state change events, as they relate to the specific use case of a customer placing an order through the online storefront. An abridged view of the storefront ordering process is shown in the diagram below. The arrows represent the exchange of data. Kafka will serve as a means of decoupling services from one another while still ensuring the data is distributed.

Given the use case of placing an order, we will examine the interactions of three services that compose our storefront API: the Accounts service within the Accounting bounded context, the Fulfillment service within the Fulfillment context, and the Orders service within the Order Management context. We will examine how the three services use Kafka to communicate state changes (changes to their data) to each other in a completely decoupled manner.
The diagram below shows the event flows between sub-systems discussed in the post. The numbering below corresponds to the numbering in the ordering process above. We will look at three event flows: 2, 5, and 6. We will simulate event flow 3, the order being created by the Shopping Cart service.

Below is a view of the online storefront through the lens of the major sub-systems involved. Although the diagram is overly simplified, it should give you an idea of where Kafka and Zookeeper, Kafka’s current cluster manager, might sit in a typical, highly-available, microservice-based, distributed application platform.

This post will focus on the storefront’s backend API — its services, databases, and messaging sub-systems.

Storefront Microservices
We will explore the functionality of each of the three microservices and how they share state change events using Kafka 2.8. Each storefront API service is built using Spring Boot 2.0 and Gradle. Each Spring Boot service includes Spring Data REST, Spring Data MongoDB, Spring for Apache Kafka, Spring Cloud Sleuth, SpringFox, and Spring Boot Actuator. For simplicity, Kafka Streams and the use of Spring Cloud Stream are not part of this post.
Source Code
The storefront’s microservices source code is publicly available on GitHub. The four GitHub projects can be cloned using the following commands:
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-accounts.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-orders.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-fulfillment.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo.git
Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.
Accounts Service
The Accounts service is responsible for managing basic customer information, such as name, contact information, addresses, and credit cards for purchases. A partial view of the data model for the Accounts service is shown below. This cluster of domain objects represents the Customer Account Aggregate.

The Customer class, the Accounts service's primary data entity, is persisted in the Accounts MongoDB database. Below we see the representation of a Customer, as a BSON document in the customer.accounts MongoDB database collection.
Along with the primary Customer entity, the Accounts service contains a CustomerChangeEvent class. As a Kafka producer, the Accounts service uses the CustomerChangeEvent domain event object to carry the state information about the customer that the Accounts service wishes to share when a new customer is added or a change is made to an existing customer. The CustomerChangeEvent object is not an exact duplicate of the Customer object. For example, the CustomerChangeEvent object does not share sensitive credit card information with other message Consumers (the CreditCard data object).

Since the CustomerChangeEvent domain event object does not persist in MongoDB, we can look at its JSON message payload in Kafka to examine its structure. Note the differences in the data structure (schema) between the Customer document in MongoDB and the Kafka CustomerChangeEvent message payload.
For simplicity, we will assume that other services do not make changes to the customer’s name, contact information, or addresses — this is the sole responsibility of the Accounts service.
Source code for the Accounts service is available on GitHub. Use the latest 2021-istio branch of the project.
Orders Service
The Orders service is responsible for managing a customer’s past and current orders; it is the system of record for the customer’s order history. A partial view of the data model for the Orders service is shown below. This cluster of domain objects represents the Customer Orders Aggregate.

The CustomerOrders class, the Orders service's primary data entity, is persisted in MongoDB. This entity contains a history of all the customer's orders (Order data objects), along with the customer's name, contact information, and addresses. In the Orders MongoDB database, a CustomerOrders, represented as a BSON document in the customer.orders database collection, looks as follows:
Along with the primary CustomerOrders entity, the Orders service contains the FulfillmentRequestEvent class. As a Kafka producer, the Orders service uses the FulfillmentRequestEvent domain event object to carry state information about an approved order, ready for fulfillment, which it sends to Kafka for consumption by the Fulfillment service. The FulfillmentRequestEvent object only contains the information it needs to share. Our example shares a single Order, along with the customer's name, contact information, and shipping address.

Since the FulfillmentRequestEvent domain event object is not persisted in MongoDB, we can look at its JSON message payload in Kafka. Again, note the schema differences between the CustomerOrders document in MongoDB and the FulfillmentRequestEvent message payload in Kafka.
Source code for the Orders service is available on GitHub. Use the latest 2021-istio branch of the project.
Fulfillment Service
Lastly, the Fulfillment service is responsible for fulfilling orders. A partial view of the data model for the Fulfillment service is shown below. This cluster of domain objects represents the Fulfillment Aggregate.

The Fulfillment service's primary entity, the Fulfillment class, is persisted in MongoDB. This entity contains a single Order data object, along with the customer's name, contact information, and shipping address. The Fulfillment service also uses the Fulfillment entity to store the latest shipping status, such as 'Shipped', 'In Transit', and 'Received'. The customer's name, contact information, and shipping address are managed by the Accounts service, replicated to the Orders service, and passed to the Fulfillment service via Kafka, using the FulfillmentRequestEvent entity.
In the Fulfillment MongoDB database, a Fulfillment object, represented as a BSON document in the fulfillment.requests database collection, looks as follows:
Along with the primary Fulfillment entity, the Fulfillment service has an OrderStatusChangeEvent class. As a Kafka producer, the Fulfillment service uses the OrderStatusChangeEvent domain event object to carry state information about an order's fulfillment statuses. The OrderStatusChangeEvent object contains the order's UUID, a timestamp, shipping status, and an option for order status notes.
Since the OrderStatusChangeEvent domain event object is not persisted in MongoDB, we can again look at its JSON message payload in Kafka.
Source code for the Fulfillment service is available on GitHub. Use the latest 2021-istio branch of the project.
State Change Event Messaging Flows
There are three state change event messaging flows illustrated in this post.
- Changes to a Customer trigger an event message produced by the Accounts service, which is published on the accounts.customer.change Kafka topic and consumed by the Orders service;
- Order Approved triggers an event message produced by the Orders service, which is published on the orders.order.fulfill Kafka topic and consumed by the Fulfillment service;
- Changes to the status of an Order trigger an event message produced by the Fulfillment service, which is published on the fulfillment.order.change Kafka topic and consumed by the Orders service.
Each of these state change event messaging flows follows the same architectural pattern on both the Kafka topic’s producer and consumer sides.

Let us examine each state change event messaging flow and the code behind it.
Customer State Change
When a new Customer entity is created or updated by the Accounts service, a CustomerChangeEvent message is produced and sent to the accounts.customer.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service eventually has a record of all customers who may place an order. By way of Kafka, it can be said that the Orders service's customer contact information is eventually consistent with the Accounts service's customer contact information.

There are different methods to trigger a message to be sent to Kafka. For this particular state change, the Accounts service uses a listener. The listener class, which extends AbstractMongoEventListener, listens for an onAfterSave event for a Customer entity.
The listener handles the event by instantiating a new CustomerChangeEvent with the Customer's information and passing it to the Sender class.
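Below is a minimal sketch of such a listener, assuming illustrative accessor methods on Customer and a simple CustomerChangeEvent constructor (the actual classes live in the GitHub project):

import org.springframework.data.mongodb.core.mapping.event.AbstractMongoEventListener;
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
import org.springframework.stereotype.Component;

@Component
public class CustomerChangeEventListener extends AbstractMongoEventListener<Customer> {

    private final Sender sender;

    public CustomerChangeEventListener(Sender sender) {
        this.sender = sender;
    }

    @Override
    public void onAfterSave(AfterSaveEvent<Customer> event) {
        Customer customer = event.getSource();
        // Build the outbound domain event, omitting sensitive fields such as credit cards
        // (getName(), getContactInfo(), and getAddresses() are illustrative accessors)
        CustomerChangeEvent changeEvent = new CustomerChangeEvent(
                customer.getId(), customer.getName(), customer.getContactInfo(), customer.getAddresses());
        sender.send(changeEvent);
    }
}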
The SenderConfig class handles the configuration of the Sender. This Spring Kafka producer configuration class uses Spring Kafka's JsonSerializer class to serialize the CustomerChangeEvent object into a JSON message payload.
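A minimal sketch of such a producer configuration, assuming the bootstrap servers are supplied through the standard spring.kafka.bootstrap-servers property, might look like this:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, CustomerChangeEvent> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // JsonSerializer turns the CustomerChangeEvent object into a JSON message payload
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}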
The Sender uses a KafkaTemplate to send the message to the accounts.customer.change Kafka topic, as shown below. Since message order is critical to ensure changes to a Customer's information are processed in order, all messages are sent to a single topic with a single partition.
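In outline, the Sender is little more than a thin wrapper around the KafkaTemplate (a sketch; the exact method name and wiring are assumptions):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Sender {

    @Autowired
    private KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate;

    public void send(CustomerChangeEvent customerChangeEvent) {
        // All customer change events go to one topic with a single partition to preserve ordering
        kafkaTemplate.send("accounts.customer.change", customerChangeEvent);
    }
}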

The Orders service's Receiver class consumes the CustomerChangeEvent messages produced by the Accounts service.
The Orders service's Receiver class is configured differently compared to the Fulfillment service. The Orders service receives messages from multiple topics, each containing messages with different payload structures. Each type of message must be deserialized into a different object type. To accomplish this, the ReceiverConfig class uses Apache Kafka's StringDeserializer. The Orders service's ReceiverConfig references Spring Kafka's AbstractKafkaListenerContainerFactory class's setMessageConverter method, which allows for dynamic object type matching.
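A sketch of this consumer configuration, assuming the group ID and bootstrap servers come from standard Spring Boot properties, could look like the following:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;

@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Every payload arrives as a JSON string; conversion to a target type happens later
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // The JSON message converter maps each payload to the listener method's parameter type
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }
}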
Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as input, denoting the object type into which the message payload needs to be deserialized. This way, we can receive multiple message payloads, serialized from multiple object types, and successfully deserialize each type into the correct data object. In the case of a CustomerChangeEvent, the Orders service calls the receiveCustomerOrder method to consume the message and properly deserialize it.
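As an illustration of that pattern, the Orders service's listener methods might be shaped roughly as follows (the method names and topics come from the post; the bodies are placeholders):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

    private static final Logger log = LoggerFactory.getLogger(Receiver.class);

    // The parameter type tells the JSON message converter which class to deserialize into
    @KafkaListener(topics = "accounts.customer.change")
    public void receiveCustomerOrder(CustomerChangeEvent customerChangeEvent) {
        log.info("Received CustomerChangeEvent: {}", customerChangeEvent);
        // Create or update the matching CustomerOrders document here
    }

    @KafkaListener(topics = "fulfillment.order.change")
    public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
        log.info("Received OrderStatusChangeEvent: {}", orderStatusChangeEvent);
        // Append the new status to the matching order's status history here
    }
}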
For all services, a Spring application.yaml properties file in each service's resources directory contains the Kafka configuration (lines 11–19).
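The Kafka-related portion of that file is conceptually similar to the sketch below (the host name and group ID are placeholders, not the project's actual values):

spring:
  kafka:
    bootstrap-servers: kafka:9092   # placeholder broker address
    consumer:
      group-id: orders              # placeholder consumer group
      auto-offset-reset: earliest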
Order Approved for Fulfillment
When the status of the Order in a CustomerOrders entity is changed to 'Approved' from 'Created', a FulfillmentRequestEvent message is produced and sent to the orders.order.fulfill Kafka topic. This message is retrieved and consumed by the Fulfillment service. This is how the Fulfillment service has a record of what Orders are ready for fulfillment.

Since we did not create the Shopping Cart service for this post, the Orders service simulates receiving an order approval event, containing an approved order, from the Shopping Cart service through Kafka. To simulate order creation and approval, the Orders service can create a random order history for each customer. Further, the Orders service can scan all customer orders for orders that contain both a 'Created' and an 'Approved' order status. This state is communicated as an event message to Kafka for all orders matching those criteria. A FulfillmentRequestEvent is produced, which contains the order to be fulfilled and the customer's contact and shipping information. The FulfillmentRequestEvent is passed to the Sender class.
The SenderConfig class handles the configuration of the Sender class. This Spring Kafka producer configuration class uses Spring Kafka's JsonSerializer class to serialize the FulfillmentRequestEvent object into a JSON message payload.
The Sender class uses a KafkaTemplate to send the message to the orders.order.fulfill Kafka topic, as shown below. Since message order is not critical, messages can be sent to a topic with multiple partitions if the volume of messages requires it.
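Structurally, this Sender mirrors the one in the Accounts service, only with a different event type and topic (again, a sketch rather than the project's exact code):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Sender {

    @Autowired
    private KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate;

    public void send(FulfillmentRequestEvent fulfillmentRequestEvent) {
        // Ordering is not critical here, so the topic may have multiple partitions
        kafkaTemplate.send("orders.order.fulfill", fulfillmentRequestEvent);
    }
}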

The Fulfillment service's Receiver class consumes the FulfillmentRequestEvent from the Kafka topic and instantiates a Fulfillment object, containing the data passed in the FulfillmentRequestEvent message payload. The Fulfillment object includes the order to be fulfilled and the customer's contact and shipping information.
The Fulfillment service's ReceiverConfig class defines the DefaultKafkaConsumerFactory and ConcurrentKafkaListenerContainerFactory, responsible for deserializing the message payload from JSON into a FulfillmentRequestEvent object.
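Because the Fulfillment service consumes only one payload type, its consumer factory can deserialize directly into the target class. A sketch, assuming Spring Kafka's JsonDeserializer and standard Spring Boot properties:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, FulfillmentRequestEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Deserialize the JSON payload straight into a FulfillmentRequestEvent
        JsonDeserializer<FulfillmentRequestEvent> valueDeserializer =
                new JsonDeserializer<>(FulfillmentRequestEvent.class);
        valueDeserializer.addTrustedPackages("*"); // trust the producer's package for type headers
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}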
Fulfillment Order Status State Change
When the Order status in a Fulfillment entity is changed to anything other than Approved, an OrderStatusChangeEvent message is produced by the Fulfillment service and sent to the fulfillment.order.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service tracks all CustomerOrders lifecycle events from the initial Created status to the final Received status.

The Fulfillment service exposes several endpoints via the FulfillmentController class that simulate changes in order status. They allow an order's status to be changed from Approved to Processing, to Shipped, to In Transit, and finally to Received. This change applies to all orders that meet the criteria.
Each of these state changes triggers a change to the Fulfillment document in MongoDB. Each change also generates a Kafka message, containing the OrderStatusChangeEvent in the message payload. The Fulfillment service's Sender class handles this.
Note that, in this example, these two events are not handled in an atomic transaction. Either updating the database or sending the message could fail independently, which would cause a loss of data consistency. In the real world, we must ensure that both these independent actions succeed or fail as a single transaction to ensure data consistency, using any of a handful of common architectural patterns, such as the transactional outbox pattern.
The SenderConfig class handles the configuration of the Sender class. This Spring Kafka producer configuration class uses Spring Kafka's JsonSerializer class to serialize the OrderStatusChangeEvent object into a JSON message payload. This class is almost identical to the SenderConfig class in the Orders and Accounts services.
The Sender class uses a KafkaTemplate to send the message to the fulfillment.order.change Kafka topic, as shown below. Message order is not critical since a timestamp is recorded, which ensures the proper sequence of order status events can be maintained. Messages can be sent to a topic with multiple partitions if the volume of messages requires it.

The Orders service's Receiver class is responsible for consuming the OrderStatusChangeEvent message produced by the Fulfillment service.
As explained above, the Orders service is configured differently from the Fulfillment service for receiving messages from Kafka. The Orders service receives messages from more than one topic. The ReceiverConfig class deserializes all messages using the StringDeserializer. The Orders service's ReceiverConfig class references the Spring Kafka AbstractKafkaListenerContainerFactory class's setMessageConverter method, which allows for dynamic object type matching.
Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as an input parameter, denoting the object type the message payload needs to be deserialized into. In the case of an OrderStatusChangeEvent message, the receiveOrderStatusChangeEvents method is called to consume a message from the fulfillment.order.change Kafka topic.
Part Two
In Part Two of this post, we will review how to deploy and run the storefront API components into a local development environment running on Kubernetes with Istio, using Minikube. To provide operational visibility, we will add observability tools, like Yahoo’s CMAK (Cluster Manager for Apache Kafka), Mongo Express, Kiali, Prometheus, and Grafana to our system.

This blog represents my own viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.