Eventual Consistency with Spring for Apache Kafka: Part 1 of 2

Using Spring for Apache Kafka to manage a Distributed Data Model in MongoDB across multiple microservices

Given a modern distributed system composed of multiple microservices, each possessing a sub-set of a domain’s aggregate data, the system will almost assuredly have some data duplication. Given this duplication, how do we maintain data consistency? In this two-part post, we will explore one possible solution to this challenge — Apache Kafka and the model of eventual consistency.

Introduction

Apache Kafka is an open-source distributed event streaming platform capable of handling trillions of messages. According to Confluent, initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open-sourced by LinkedIn in 2011, Kafka has quickly evolved from a messaging queue to a full-fledged event streaming platform.

Eventual consistency, according to Wikipedia, is a consistency model used in distributed computing to achieve high availability that informally guarantees that if no new updates are made to a given data item, eventually all accesses to that item will return the last updated value. I previously covered the topic of eventual consistency in a distributed system using RabbitMQ in the May 2017 post, Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ. The post was featured on Pivotal’s RabbitMQ website.

Domain-driven Design

To ground the discussion, let’s examine a common example — an online storefront. Using a domain-driven design (DDD) approach, we would expect our problem domain, the online storefront, to be composed of multiple bounded contexts. Bounded contexts would likely include Shopping, Customer Service, Marketing, Security, Fulfillment, Accounting, and so forth, as shown in the context map, below.

Given this problem domain, we can assume we have the concept of a Customer. Further, we can assume the unique properties that define a Customer are likely to be spread across several bounded contexts. A complete view of the Customer will require you to aggregate data from multiple contexts. For example, the Accounting context may be the system of record for primary customer information, such as the customer’s name, contact information, contact preferences, and billing and shipping addresses. Marketing may possess additional information about the customer’s use of the store’s loyalty program and online shopping activity. Fulfillment may maintain a record of all orders being shipped to the customer. Security likely holds the customer’s access credentials, account access history, and privacy settings.

Below are the Customer data objects are shown in yellow. Orange represents the logical divisions of responsibility within each bounded context. These divisions will manifest themselves as individual microservices in our online storefront example.

Distributed Data Consistency

If we agree that the architecture of our domain’s data model requires some duplication of data across bounded contexts or even between services within the same context, then we must ensure data consistency. Take, for example, the case where a customer changes their home address or email. Let us assume that the Accounting context is the system of record for these data fields. However, to fulfill orders, the Shipping context might also need to maintain the customer’s current home address. Likewise, the Marketing context, responsible for opt-in email advertising, also needs to be aware of the email change and update its customer records.

If a piece of shared data is changed, then the party making the change should be responsible for communicating the change without expecting a response. They are stating a fact, not asking a question. Interested parties can choose if and how to act upon the change notification. This decoupled communication model is often described as Event-Carried State Transfer, defined by Martin Fowler of ThoughtWorks in his insightful post, What do you mean by “Event-Driven”?. Changes to a piece of data can be thought of as a state change event — events that contain details of the data that changed. Coincidentally, Fowler uses a customer’s address change as an example of Event-Carried State Transfer in the post. Fellow former ThoughtWorker Graham Brooks also detailed the concept in his post, Event-Carried State Transfer Pattern.

Consistency Strategies

Multiple architectural approaches can be taken to solve for data consistency in a distributed system. For example, you could use a single relational database with shared schemas to persist data, avoiding the distributed data model altogether. However, it could be argued that using a single database just turned your distributed system back into a monolith.

You could use Change Data Capture (CDC) to track changes to each database and send a record of those changes to Kafka topics for consumption by interested parties. Kafka Connect is an excellent choice for this, as explained in the article, No More Silos: How to Integrate your Databases with Apache Kafka and CDC, by Robin Moffatt of Confluent.

Alternately, we could use a separate data service, independent of the domain’s other business services, whose sole role is to ensure data consistency across domains. If messages persist in Kafka, the service has the added ability to provide data auditability through message replay. Of course, another set of services adds additional operational complexity to the system.

In this post’s somewhat simplistic architecture, the business microservices will maintain consistency across their respective domains by producing and consuming messages from multiple Kafka topics to which they are subscribed. Kafka Producers may also be Consumers within our domain.

Storefront Example

In this post, our online storefront API will be built in Java using Spring Boot and OpenJDK 16. We will ensure the uniformity of distributed data by using a publish/subscribe model with Spring for Apache Kafka Project. When a piece of data is changed by one Spring Boot microservice, if appropriate, that state change will trigger a state change event, which will be shared with other microservices using Kafka topics.

View of the Storefront API from Kiali

We will explore different methods of leveraging Spring Kafka to communicate state change events, as they relate to the specific use case of a customer placing an order through the online storefront. An abridged view of the storefront ordering process is shown in the diagram below. The arrows represent the exchange of data. Kafka will serve as a means of decoupling services from one another while still ensuring the data is distributed.

Given the use case of placing an order, we will examine the interactions of three services that compose our storefront API: the Accounts service within the Accounting bounded context, the Fulfillment service within the Fulfillment context, and the Orders service within the Order Management context. We will examine how the three services use Kafka to communicate state changes (changes to their data) to each other in a completely decoupled manner.

The diagram below shows the event flows between sub-systems discussed in the post. The numbering below corresponds to the numbering in the ordering process above. We will look at three event flows 2, 5, and 6. We will simulate event flow 3, the order being created by the Shopping Cart service.

Below is a view of the online storefront through the lens of the major sub-systems involved. Although the diagram is overly simplified, it should give you an idea of where Kafka and Zookeeper, Kafka’s current cluster manager, might sit in a typical, highly-available, microservice-based, distributed application platform.

This post will focus on the storefront’s backend API — its services, databases, and messaging sub-systems.

Storefront Microservices

We will explore the functionality of each of the three microservices and how they share state change events using Kafka 2.8. Each storefront API service is built using Spring Boot 2.0 and Gradle. Each Spring Boot service includes Spring Data REST, Spring Data MongoDB, Spring for Apache Kafka, Spring Cloud Sleuth, SpringFox, and Spring Boot Actuator. For simplicity, Kafka Streams and the use of Spring Cloud Stream are not part of this post.

Source Code

The storefront’s microservices source code is publicly available on GitHub. The four GitHub projects can be cloned using the following commands:

git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-accounts.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-orders.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-fulfillment.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo.git

Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Accounts Service

The Accounts service is responsible for managing basic customer information, such as name, contact information, addresses, and credit cards for purchases. A partial view of the data model for the Accounts service is shown below. This cluster of domain objects represents the Customer Account Aggregate.

The Customer class, the Accounts service’s primary data entity, is persisted in the Accounts MongoDB database. Below we see the representation of a Customer, as a BSON document in the customer.accounts MongoDB database collection.

{
"_id": ObjectId("5b189af9a8d05613315b0212"),
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
{
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}
],
"orders": [{
"guid": "df78784f-4d1d-48ad-a3e3-26a4fe7317a4",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "PROCESSING"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "COMPLETED"
}
],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": "11.99"
},
"quantity": 2
},
{
"product": {
"guid": "d01fde07-7c24-49c5-a5f1-bc2ce1f14c48",
"title": "Red Widget",
"description": "Reliable Red Widget",
"price": "3.99"
},
"quantity": 3
}
]
},
{
"guid": "29692d7f-3ca5-4684-b5fd-51dbcf40dc1e",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
}
],
"orderItems": [{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget",
"description": "Amazing Yellow Widget",
"price": "5.99"
},
"quantity": 1
}]
}
],
"_class": "com.storefront.model.CustomerOrders"
}
view raw customer.orders.bson hosted with ❤ by GitHub

Along with the primary Customer entity, the Accounts service contains a CustomerChangeEvent class. As a Kafka producer, the Accounts service uses the CustomerChangeEvent domain event object to carry state information about the client the Accounts service wishes to share when a new customer is added or a change is made to an existing customer. The CustomerChangeEvent object is not an exact duplicate of the Customer object. For example, the CustomerChangeEvent object does not share sensitive credit card information with other message Consumers (the CreditCard data object).

Since the CustomerChangeEvent domain event object does not persist in MongoDB, we can look at its JSON message payload in Kafka to examine its structure. Note the differences in the data structure (schema) between the Customer document in MongoDB and the Kafka CustomerChangeEvent message payload.

{
"id": "5b189af9a8d05613315b0212",
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}, {
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}]
}

For simplicity, we will assume that other services do not make changes to the customer’s name, contact information, or addresses — this is the sole responsibility of the Accounts service.

Source code for the Accounts service is available on GitHub. Use the latest 2021-istio branch of the project.

Orders Service

The Orders service is responsible for managing a customer’s past and current orders; it is the system of record for the customer’s order history. A partial view of the data model for the Orders service is shown below. This cluster of domain objects represents the Customer Orders Aggregate.

The CustomerOrders class, the Order service’s primary data entity, is persisted in MongoDB. This entity contains a history of all the customer’s orders (Order data objects), along with the customer’s name, contact information, and addresses. In the Orders MongoDB database, a CustomerOrders, represented as a BSON document in the customer.orders database collection, looks as follows:

{
"_id": ObjectId("5b189af9a8d05613315b0212"),
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
{
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}
],
"orders": [{
"guid": "df78784f-4d1d-48ad-a3e3-26a4fe7317a4",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "PROCESSING"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "COMPLETED"
}
],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": "11.99"
},
"quantity": 2
},
{
"product": {
"guid": "d01fde07-7c24-49c5-a5f1-bc2ce1f14c48",
"title": "Red Widget",
"description": "Reliable Red Widget",
"price": "3.99"
},
"quantity": 3
}
]
},
{
"guid": "29692d7f-3ca5-4684-b5fd-51dbcf40dc1e",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
}
],
"orderItems": [{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget",
"description": "Amazing Yellow Widget",
"price": "5.99"
},
"quantity": 1
}]
}
],
"_class": "com.storefront.model.CustomerOrders"
}
view raw customer.orders.bson hosted with ❤ by GitHub

Along with the primary CustomerOrders entity, the Orders service contains the FulfillmentRequestEvent class. As a Kafka producer, the Orders service uses the FulfillmentRequestEvent domain event object to carry state information about an approved order, ready for fulfillment, which it sends to Kafka for consumption by the Fulfillment service. The FulfillmentRequestEvent object only contains the information it needs to share. Our example shares a single Order, along with the customer’s name, contact information, and shipping address.

Since the FulfillmentRequestEvent domain event object is not persisted in MongoDB, we can look at its JSON message payload in Kafka. Again, note the schema differences between the CustomerOrders document in MongoDB and the FulfillmentRequestEvent message payload in Kafka.

{
"timestamp": 1528334218821,
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"address": {
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
"order": {
"guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
"orderStatusEvents": [{
"timestamp": 1528333926586,
"orderStatusType": "CREATED",
"note": null
}, {
"timestamp": 1528333926586,
"orderStatusType": "APPROVED",
"note": null
}],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": 11.99
},
"quantity": 5
}]
}
}

Source code for the Orders service is available on GitHub. Use the latest 2021-istio branch of the project.

Fulfillment Service

Lastly, the Fulfillment service is responsible for fulfilling orders. A partial view of the data model for the Fulfillment service is shown below. This cluster of domain objects represents the Fulfillment Aggregate.

The Fulfillment service’s primary entity, the Fulfillment class, is persisted in MongoDB. This entity contains a single Order data object, along with the customer’s name, contact information, and shipping address. The Fulfillment service also uses the Fulfillment entity to store the latest shipping status, such as ‘Shipped’, ‘In Transit’, and ‘Received’. The customer’s name, contact information, and shipping address are managed by the Accounts service, replicated to the Orders service, and passed to the Fulfillment service via Kafka, using the FulfillmentRequestEvent entity.

In the Fulfillment MongoDB database, a Fulfillment object represented as a BSON document in the fulfillment.requests database collection looks as follows:

{
"_id": ObjectId("5b1bf1b8a8d0562de5133d64"),
"timestamp": NumberLong("1528553706260"),
"name": {
"title": "Ms.",
"firstName": "Susan",
"lastName": "Blackstone"
},
"contact": {
"primaryPhone": "433-544-6555",
"secondaryPhone": "223-445-6767",
"email": "susan.m.blackstone@emailisus.com"
},
"address": {
"type": "SHIPPING",
"description": "Home Sweet Home",
"address1": "33 Oak Avenue",
"city": "Nowhere",
"state": "VT",
"postalCode": "444556-9090"
},
"order": {
"guid": "2932a8bf-aa9c-4539-8cbf-133a5bb65e44",
"orderStatusEvents": [{
"timestamp": NumberLong("1528558453686"),
"orderStatusType": "RECEIVED"
}],
"orderItems": [{
"product": {
"guid": "4efe33a1-722d-48c8-af8e-7879edcad2fa",
"title": "Purple Widget"
},
"quantity": 2
},
{
"product": {
"guid": "b5efd4a0-4eb9-4ad0-bc9e-2f5542cbe897",
"title": "Blue Widget"
},
"quantity": 5
},
{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget"
},
"quantity": 2
}
]
},
"shippingMethod": "Drone",
"_class": "com.storefront.model.Fulfillment"
}

Along with the primary Fulfillment entity, the Fulfillment service has an OrderStatusChangeEvent class. As a Kafka producer, the Fulfillment service uses the OrderStatusChangeEvent domain event object to carry state information about an order’s fulfillment statuses. The OrderStatusChangeEvent object contains the order’s UUID, a timestamp, shipping status, and an option for order status notes.

Since the OrderStatusChangeEvent domain event object is not persisted in MongoDB, again, we can again look at its JSON message payload in Kafka.

{
"guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
"orderStatusEvent": {
"timestamp": 1528334452746,
"orderStatusType": "PROCESSING",
"note": null
}
}

Source code for the Fulfillment service is available on GitHub. Use the latest 2021-istio branch of the project.

State Change Event Messaging Flows

There are three state change event messaging flows illustrated in this post.

  1. Changes to a Customer triggers an event message produced by the Accounts service, which is published on the accounts.customer.change Kafka topic and consumed by the Orders service;
  2. Order Approved triggers an event message produced by the Orders service, which is published on the orders.order.fulfill Kafka topic, and is consumed by the Fulfillment service;
  3. Changes to the status of an Order triggers an event message produced by the Fulfillment Service, which is published on the fulfillment.order.change Kafka topic, and is consumed by the Orders service;

Each of these state change event messaging flows follows the same architectural pattern on both the Kafka topic’s producer and consumer sides.

Let us examine each state change event messaging flow and the code behind it.

Customer State Change

When a new Customer entity is created or updated by the Accounts service, a CustomerChangeEvent message is produced and sent to the accounts.customer.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service eventually has a record of all customers who may place an order. By way of Kafka, it can be said that the Order’s Customer contact information is eventually consistent with the Account’s Customer contact information.

There are different methods to trigger a message to be sent to Kafka. For this particular state change, the Accounts service uses a listener. The listener class, which extends AbstractMongoEventListener, listens for an onAfterSave event for a Customer entity.

@Slf4j
@Controller
public class AfterSaveListener extends AbstractMongoEventListener<Customer> {
@Value("${spring.kafka.topic.accounts-customer}")
private String topic;
private Sender sender;
@Autowired
public AfterSaveListener(Sender sender) {
this.sender = sender;
}
@Override
public void onAfterSave(AfterSaveEvent<Customer> event) {
log.info("onAfterSave event='{}'", event);
Customer customer = event.getSource();
CustomerChangeEvent customerChangeEvent = new CustomerChangeEvent();
customerChangeEvent.setId(customer.getId());
customerChangeEvent.setName(customer.getName());
customerChangeEvent.setContact(customer.getContact());
customerChangeEvent.setAddresses(customer.getAddresses());
sender.send(topic, customerChangeEvent);
}
}

The listener handles the event by instantiating a new CustomerChangeEvent with the Customer’s information and passes it to the Sender class.

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate;
public void send(String topic, CustomerChangeEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The SenderConfig class handles the configuration of the Sender. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the CustomerChangeEvent object into a JSON message payload.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, CustomerChangeEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}
view raw SenderConfig.java hosted with ❤ by GitHub

The Sender uses a KafkaTemplate to send the message to the accounts.customer.change Kafka topic, as shown below. Since message order is critical to ensure changes to a Customer’s information are processed in order, all messages are sent to a single topic with a single partition.

The Orders service’s Receiver class consumes the CustomerChangeEvent messages produced by the Accounts service.

@Slf4j
@Component
public class Receiver {
@Autowired
private CustomerOrdersRepository customerOrdersRepository;
@Autowired
private MongoTemplate mongoTemplate;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
public void receiveCustomerOrder(CustomerOrders customerOrders) {
log.info("received payload='{}'", customerOrders);
latch.countDown();
customerOrdersRepository.save(customerOrders);
}
@KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
log.info("received payload='{}'", orderStatusChangeEvent);
latch.countDown();
Criteria criteria = Criteria.where("orders.guid")
.is(orderStatusChangeEvent.getGuid());
Query query = Query.query(criteria);
Update update = new Update();
update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
mongoTemplate.updateFirst(query, update, "customer.orders");
}
}
view raw Receiver.java hosted with ❤ by GitHub

The Orders service’s Receiver class is configured differently compared to the Fulfillment service. The Orders service receives messages from multiple topics, each containing messages with different payload structures. Each type of message must be deserialized into different object types. To accomplish this, the ReceiverConfig class uses Apache Kafka’s StringDeserializer. The Orders service’s ReceiverConfig references Spring Kafka’s AbstractKafkaListenerContainerFactory classes setMessageConverter method, which allows for dynamic object type matching.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new StringDeserializer()
);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as input, denoting the object type into which the message payload needs to be deserialized. This way, we can receive multiple message payloads, serialized from multiple object types, and successfully deserialize each type into the correct data object. In the case of a CustomerChangeEvent, the Orders service calls the receiveCustomerOrder method to consume the message and properly deserialize it.

For all services, a Spring application.yaml properties file in each service’s resources directory contains the Kafka configuration (lines 11–19).

server:
port: 8080
spring:
main:
allow-bean-definition-overriding: true
application:
name: orders
data:
mongodb:
uri: mongodb://mongo:27017/orders
kafka:
bootstrap-servers: kafka:9092
topic:
accounts-customer: accounts.customer.change
orders-order: orders.order.fulfill
fulfillment-order: fulfillment.order.change
consumer:
group-id: orders
auto-offset-reset: earliest
zipkin:
sender:
type: kafka
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: INFO
spring:
config:
activate:
on-profile: local
data:
mongodb:
uri: mongodb://localhost:27017/orders
kafka:
bootstrap-servers: localhost:9092
server:
port: 8090
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: DEBUG
spring:
config:
activate:
on-profile: confluent
server:
port: 8080
logging:
level:
root: INFO
server:
port: 8080
spring:
config:
activate:
on-profile: minikube
data:
mongodb:
uri: mongodb://mongo.dev:27017/orders
kafka:
bootstrap-servers: kafka-cluster.dev:9092
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: DEBUG
view raw application.yaml hosted with ❤ by GitHub

Order Approved for Fulfillment

When the status of the Order in a CustomerOrders entity is changed to ‘Approved’ from ‘Created’, a FulfillmentRequestEvent message is produced and sent to the orders.order.fulfill Kafka topic. This message is retrieved and consumed by the Fulfillment service. This is how the Fulfillment service has a record of what Orders are ready for fulfillment.

Since we did not create the Shopping Cart service for this post, the Orders service simulates an order approval event, containing an approved order, being received, through Kafka, from the Shopping Cart Service. To simulate order creation and approval, the Orders service can create a random order history for each customer. Further, the Orders service can scan all customer orders for orders that contain both a ‘Created’ and ‘Approved’ order status. This state is communicated as an event message to Kafka for all orders matching those criteria. A FulfillmentRequestEvent is produced, which contains the order to be fulfilled, and the customer’s contact and shipping information. The FulfillmentRequestEvent is passed to the Sender class.

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate;
public void send(String topic, FulfillmentRequestEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The SenderConfig class handles the configuration of the Sender class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the FulfillmentRequestEvent object into a JSON message payload.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, FulfillmentRequestEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}
view raw SenderConfig.java hosted with ❤ by GitHub

The Sender class uses a KafkaTemplate to send the message to the orders.order.fulfill Kafka topic, as shown below. Since message order is not critical, messages can be sent to a topic with multiple partitions if the volume of messages required it.

The Fulfillment service’s Receiver class consumes the FulfillmentRequestEvent from the Kafka topic and instantiates a Fulfillment object, containing the data passed in the FulfillmentRequestEvent message payload. The Fulfillment object includes the order to be fulfilled and the customer’s contact and shipping information.

@Slf4j
@Component
public class Receiver {
@Autowired
private FulfillmentRepository fulfillmentRepository;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.orders-order}")
public void receive(FulfillmentRequestEvent fulfillmentRequestEvent) {
log.info("received payload='{}'", fulfillmentRequestEvent.toString());
latch.countDown();
Fulfillment fulfillment = new Fulfillment();
fulfillment.setId(fulfillmentRequestEvent.getId());
fulfillment.setTimestamp(fulfillmentRequestEvent.getTimestamp());
fulfillment.setName(fulfillmentRequestEvent.getName());
fulfillment.setContact(fulfillmentRequestEvent.getContact());
fulfillment.setAddress(fulfillmentRequestEvent.getAddress());
fulfillment.setOrder(fulfillmentRequestEvent.getOrder());
fulfillmentRepository.save(fulfillment);
}
}
view raw Receiver.java hosted with ❤ by GitHub

The Fulfillment service’s ReceiverConfig class defines the DefaultKafkaConsumerFactory and ConcurrentKafkaListenerContainerFactory, responsible for deserializing the message payload from JSON into a FulfillmentRequestEvent object.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, FulfillmentRequestEvent> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(FulfillmentRequestEvent.class));
}
@Override
@Bean
public ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Fulfillment Order Status State Change

When the Order status in a Fulfillment entity is changed to anything other than Approved, an OrderStatusChangeEvent message is produced by the Fulfillment service and sent to the fulfillment.order.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service tracks all CustomerOrder lifecycle events from the initial Created status to the final Received status.

The Fulfillment service exposes several endpoints via the FulfillmentController class, which simulates a change in order status. They allow an order’s status to be changed from Approved to Processing, to Shipped, to In Transit, and finally to Received. This change applies to all orders that meet the criteria.

Each of these state changes triggers a change to the Fulfillment document in MongoDB. Each change also generates a Kafka message, containing the OrderStatusChangeEvent in the message payload. The Fulfillment service’s Sender class handles this.

Note in this example that these two events are not handled in an atomic transaction. Either updating the database or sending the message could fail independently, which would cause a loss of data consistency. In the real world, we must ensure that both these independent actions succeed or fail as a single transaction to ensure data consistency, using any of a handful of common architectural patterns.

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate;
public void send(String topic, OrderStatusChangeEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The SenderConfig class handles the configuration of the Sender class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the OrderStatusChangeEvent object into a JSON message payload. This class is almost identical to the SenderConfig class in the Orders and Accounts services.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, OrderStatusChangeEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}
view raw SenderConfig.java hosted with ❤ by GitHub

The Sender class uses a KafkaTemplate to send the message to the fulfillment.order.change Kafka topic, as shown below. Message order is not critical since a timestamp is recorded, which ensures the proper sequence of order status events can be maintained. Messages can be sent to a topic with multiple partitions if the volume of messages requires it.

The Orders service’s Receiver class is responsible for consuming the OrderStatusChangeEvent message produced by the Fulfillment service.

@Slf4j
@Component
public class Receiver {
@Autowired
private CustomerOrdersRepository customerOrdersRepository;
@Autowired
private MongoTemplate mongoTemplate;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
public void receiveCustomerOrder(CustomerOrders customerOrders) {
log.info("received payload='{}'", customerOrders);
latch.countDown();
customerOrdersRepository.save(customerOrders);
}
@KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
log.info("received payload='{}'", orderStatusChangeEvent);
latch.countDown();
Criteria criteria = Criteria.where("orders.guid")
.is(orderStatusChangeEvent.getGuid());
Query query = Query.query(criteria);
Update update = new Update();
update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
mongoTemplate.updateFirst(query, update, "customer.orders");
}
}
view raw Receiver.java hosted with ❤ by GitHub

As explained above, the Orders service is configured differently compared to the Fulfillment service, to receive messages from Kafka. The Orders service receives messages from more than one topic. The ReceiverConfig class deserializes all messages using the StringDeserializer. The Orders service’s ReceiverConfig class references the Spring Kafka AbstractKafkaListenerContainerFactory class’s setMessageConverter method, which allows for dynamic object type matching.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new StringDeserializer()
);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as an input parameter, denoting the object type the message payload needs to be deserialized into. In the case of an OrderStatusChangeEvent message, the receiveOrderStatusChangeEvents method is called to consume a message from the fulfillment.order.change Kafka topic.

Part Two

In Part Two of this post, we will review how to deploy and run the storefront API components into a local development environment running on Kubernetes with Istio, using Minikube. To provide operational visibility, we will add observability tools, like Yahoo’s CMAK (Cluster Manager for Apache Kafka), Mongo Express, Kiali, Prometheus, and Grafana to our system.

View of the Storefront API from Kiali

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , ,

  1. Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1 | Programmatic Ponderings

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.

%d bloggers like this: