** This post has been rewritten and updated in May 2021 **
Given a modern distributed system composed of multiple microservices, each possessing a subset of the domain’s aggregate data it needs to perform its functions autonomously, we will almost assuredly have some duplication of data. Given this duplication, how do we maintain data consistency? In this two-part post, we will explore one possible solution to this challenge, using Apache Kafka and the model of eventual consistency.
I previously covered the topic of eventual consistency in a distributed system, using RabbitMQ, in the post, Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ. This post is featured on Pivotal’s RabbitMQ website.
Introduction
To ground the discussion, let’s examine a common example of the online storefront. Using a domain-driven design (DDD) approach, we would expect our problem domain, the online storefront, to be composed of multiple bounded contexts. Bounded contexts would likely include Shopping, Customer Service, Marketing, Security, Fulfillment, Accounting, and so forth, as shown in the context map, below.
Given this problem domain, we can assume we have the concept of the Customer. Further, the unique properties that define a Customer are likely to be spread across several bounded contexts. A complete view of a Customer would require you to aggregate data from multiple contexts. For example, the Accounting context may be the system of record (SOR) for primary customer information, such as the customer’s name, contact information, contact preferences, and billing and shipping addresses. Marketing may possess additional information about the customer’s use of the store’s loyalty program. Fulfillment may maintain a record of all the orders shipped to the customer. Security likely holds the customer’s access credentials and privacy settings.
Below, Customer data objects are shown in yellow. Orange represents logical divisions of responsibility within each bounded context. These divisions will manifest themselves as individual microservices in our online storefront example.
Distributed Data Consistency
If we agree that the architecture of our domain’s data model requires some duplication of data across bounded contexts, or even between services within the same contexts, then we must ensure data consistency. Take, for example, a change in a customer’s address. The Accounting context is the system of record for the customer’s addresses. However, to fulfill orders, the Shipping context might also need to maintain the customer’s address. Likewise, the Marketing context, which is responsible for direct-mail advertising, also needs to be aware of the address change and update its own customer records.
If a piece of shared data is changed, then the party making the change should be responsible for communicating the change, without the expectation of a response. They are stating a fact, not asking a question. Interested parties can choose if, and how, to act upon the change notification. This decoupled communication model is often described as Event-Carried State Transfer, as defined by Martin Fowler, of ThoughtWorks, in his insightful post, What do you mean by “Event-Driven”?. A change to a piece of data can be thought of as a state change event. Coincidentally, Fowler also uses a customer’s address change as an example of Event-Carried State Transfer. The Event-Carried State Transfer Pattern is also detailed by fellow ThoughtWorker and noted Architect, Graham Brooks.
Consistency Strategies
Multiple architectural approaches could be taken to solve for data consistency in a distributed system. For example, you could use a single relational database to persist all data, avoiding the distributed data model altogether. However, I would argue that using a single database just turns your distributed system back into a monolith.
You could use Change Data Capture (CDC) to track changes to each database and send a record of those changes to Kafka topics for consumption by interested parties. Kafka Connect is an excellent choice for this, as explained in the article, No More Silos: How to Integrate your Databases with Apache Kafka and CDC, by Robin Moffatt of Confluent.
Alternatively, we could use a separate data service, independent of the domain’s other business services, whose sole role is to ensure data consistency across domains. If messages are persisted in Kafka, the service would have the added ability to provide data auditability through message replay. Of course, another set of services adds additional operational complexity.
Storefront Example
In this post, our online storefront’s services will be built using Spring Boot. Thus, we will ensure the uniformity of distributed data by using a Publish/Subscribe model with the Spring for Apache Kafka Project. When a piece of data is changed by one Spring Boot service, if appropriate, that state change will trigger an event, which will be shared with other services using Kafka topics.
We will explore different methods of leveraging Spring Kafka to communicate state change events, as they relate to the specific use case of a customer placing an order through the online storefront. An abridged view of the storefront ordering process is shown in the diagram below. The arrows represent the exchange of data. Kafka will serve as a means of decoupling services from one another, while still ensuring the data is exchanged.
Given the use case of placing an order, we will examine the interactions of three services, the Accounts service within the Accounting bounded context, the Fulfillment service within the Fulfillment context, and the Orders service within the Order Management context. We will examine how the three services use Kafka to communicate state changes (changes to their data) to each other, in a decoupled manner.
The diagram below shows the event flows between sub-systems discussed in the post. The numbering below corresponds to the numbering in the ordering process above. We will look at event flows 2, 5, and 6. We will simulate event flow 3, the order being created by the Shopping Cart service. Kafka Producers may also be Consumers within our domain.
Below is a view of the online storefront, through the lens of the major sub-systems involved. Although the diagram is overly simplified, it should give you the idea of where Kafka, and Zookeeper, Kafka’s cluster manager, might sit in a typical, highly-available, microservice-based, distributed, application platform.
This post will focus on the storefront’s services, database, and messaging sub-systems.
Storefront Microservices
First, we will explore the functionality of each of the three microservices. We will examine how they share state change events using Kafka. Each storefront service is built using Spring Boot 2.0 and Gradle. Each Spring Boot service includes Spring Data REST, Spring Data MongoDB, Spring for Apache Kafka, Spring Cloud Sleuth, SpringFox, Spring Cloud Netflix Eureka, and Spring Boot Actuator. For simplicity, Kafka Streams and the use of Spring Cloud Stream are not part of this post.
Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.
Accounts Service
The Accounts service is responsible for managing basic customer information, such as name, contact information, addresses, and credit cards for purchases. A partial view of the data model for the Accounts service is shown below. This cluster of domain objects represents the Customer Account Aggregate.
The Customer class, the Accounts service’s primary data entity, is persisted in the Accounts MongoDB database. A Customer, represented as a BSON document in the customer.accounts database collection, looks as follows (gist).
{
  "_id": ObjectId("5b189af9a8d05613315b0212"),
  "name": {
    "title": "Mr.",
    "firstName": "John",
    "middleName": "S.",
    "lastName": "Doe",
    "suffix": "Jr."
  },
  "contact": {
    "primaryPhone": "555-666-7777",
    "secondaryPhone": "555-444-9898",
    "email": "john.doe@internet.com"
  },
  "addresses": [{
    "type": "BILLING",
    "description": "My cc billing address",
    "address1": "123 Oak Street",
    "city": "Sunrise",
    "state": "CA",
    "postalCode": "12345-6789"
  },
  {
    "type": "SHIPPING",
    "description": "My home address",
    "address1": "123 Oak Street",
    "city": "Sunrise",
    "state": "CA",
    "postalCode": "12345-6789"
  }],
  "creditCards": [{
    "type": "PRIMARY",
    "description": "VISA",
    "number": "1234-6789-0000-0000",
    "expiration": "6/19",
    "nameOnCard": "John S. Doe"
  },
  {
    "type": "ALTERNATE",
    "description": "Corporate American Express",
    "number": "9999-8888-7777-6666",
    "expiration": "3/20",
    "nameOnCard": "John Doe"
  }],
  "_class": "com.storefront.model.Customer"
}
Along with the primary Customer entity, the Accounts service contains a CustomerChangeEvent class. As a Kafka producer, the Accounts service uses the CustomerChangeEvent domain event object to carry the state information about the customer that the Accounts service wishes to share when a new customer is added, or a change is made to an existing customer. The CustomerChangeEvent object is not an exact duplicate of the Customer object. For example, the CustomerChangeEvent object does not share sensitive credit card information with other message Consumers (the CreditCard data object).
Since the CustomerChangeEvent domain event object is not persisted in MongoDB, to examine its structure, we can look at its JSON message payload in Kafka. Note the differences in the data structure between the Customer document in MongoDB and the Kafka CustomerChangeEvent message payload (gist).
{
  "id": "5b189af9a8d05613315b0212",
  "name": {
    "title": "Mr.",
    "firstName": "John",
    "middleName": "S.",
    "lastName": "Doe",
    "suffix": "Jr."
  },
  "contact": {
    "primaryPhone": "555-666-7777",
    "secondaryPhone": "555-444-9898",
    "email": "john.doe@internet.com"
  },
  "addresses": [{
    "type": "BILLING",
    "description": "My cc billing address",
    "address1": "123 Oak Street",
    "address2": null,
    "city": "Sunrise",
    "state": "CA",
    "postalCode": "12345-6789"
  }, {
    "type": "SHIPPING",
    "description": "My home address",
    "address1": "123 Oak Street",
    "address2": null,
    "city": "Sunrise",
    "state": "CA",
    "postalCode": "12345-6789"
  }]
}
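For orientation only, a minimal sketch of what the CustomerChangeEvent class might look like is shown below. It is inferred from the Kafka payload above and the Accounts service’s listener code later in this post; it is an assumption, not the actual class from the GitHub repository, which likely uses Lombok to generate constructors and accessors.
import java.util.List;

// Hypothetical sketch of the CustomerChangeEvent domain event (not the repository's actual class).
// Field names are inferred from the Kafka JSON payload shown above.
public class CustomerChangeEvent {

    private String id;               // the Customer's MongoDB ObjectId, as a String
    private Name name;               // title, first, middle, and last name, plus suffix
    private Contact contact;         // phone numbers and email address
    private List<Address> addresses; // billing and shipping addresses; no CreditCard data is included

    // getters and setters omitted for brevity
}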
For simplicity, we will assume other services do not make changes to the customer’s name, contact information, or addresses; that is the sole responsibility of the Accounts service.
Source code for the Accounts service is available on GitHub.
Orders Service
The Orders service is responsible for managing a customer’s past and current orders; it is the system of record for the customer’s order history. A partial view of the data model for the Orders service is shown below. This cluster of domain objects represents the Customer Orders Aggregate.
The CustomerOrders
class, the Order service’s primary data entity, is persisted in MongoDB. This entity contains a history of all the customer’s orders (Order
data objects), along with the customer’s name, contact information, and addresses. In the Orders MongoDB database, a CustomerOrders
, represented as a BSON document in the customer.orders
database collection, looks as follows (gist).
{
  "_id": ObjectId("5b189af9a8d05613315b0212"),
  "name": {
    "title": "Mr.",
    "firstName": "John",
    "middleName": "S.",
    "lastName": "Doe",
    "suffix": "Jr."
  },
  "contact": {
    "primaryPhone": "555-666-7777",
    "secondaryPhone": "555-444-9898",
    "email": "john.doe@internet.com"
  },
  "addresses": [{
    "type": "BILLING",
    "description": "My cc billing address",
    "address1": "123 Oak Street",
    "city": "Sunrise",
    "state": "CA",
    "postalCode": "12345-6789"
  },
  {
    "type": "SHIPPING",
    "description": "My home address",
    "address1": "123 Oak Street",
    "city": "Sunrise",
    "state": "CA",
    "postalCode": "12345-6789"
  }],
  "orders": [{
    "guid": "df78784f-4d1d-48ad-a3e3-26a4fe7317a4",
    "orderStatusEvents": [{
      "timestamp": NumberLong("1528339278058"),
      "orderStatusType": "CREATED"
    },
    {
      "timestamp": NumberLong("1528339278058"),
      "orderStatusType": "APPROVED"
    },
    {
      "timestamp": NumberLong("1528339278058"),
      "orderStatusType": "PROCESSING"
    },
    {
      "timestamp": NumberLong("1528339278058"),
      "orderStatusType": "COMPLETED"
    }],
    "orderItems": [{
      "product": {
        "guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
        "title": "Green Widget",
        "description": "Gorgeous Green Widget",
        "price": "11.99"
      },
      "quantity": 2
    },
    {
      "product": {
        "guid": "d01fde07-7c24-49c5-a5f1-bc2ce1f14c48",
        "title": "Red Widget",
        "description": "Reliable Red Widget",
        "price": "3.99"
      },
      "quantity": 3
    }]
  },
  {
    "guid": "29692d7f-3ca5-4684-b5fd-51dbcf40dc1e",
    "orderStatusEvents": [{
      "timestamp": NumberLong("1528339278058"),
      "orderStatusType": "CREATED"
    },
    {
      "timestamp": NumberLong("1528339278058"),
      "orderStatusType": "APPROVED"
    }],
    "orderItems": [{
      "product": {
        "guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
        "title": "Yellow Widget",
        "description": "Amazing Yellow Widget",
        "price": "5.99"
      },
      "quantity": 1
    }]
  }],
  "_class": "com.storefront.model.CustomerOrders"
}
Along with the primary CustomerOrders entity, the Orders service contains the FulfillmentRequestEvent class. As a Kafka producer, the Orders service uses the FulfillmentRequestEvent domain event object to carry state information about an approved order, ready for fulfillment, which it sends to Kafka for consumption by the Fulfillment service. The FulfillmentRequestEvent object only contains the information it needs to share. In our example, it shares a single Order, along with the customer’s name, contact information, and shipping address.
Since the FulfillmentRequestEvent domain event object is not persisted in MongoDB, we can look at its JSON message payload in Kafka. Again, note the structural differences between the CustomerOrders document in MongoDB and the FulfillmentRequestEvent message payload in Kafka (gist).
{
  "timestamp": 1528334218821,
  "name": {
    "title": "Mr.",
    "firstName": "John",
    "middleName": "S.",
    "lastName": "Doe",
    "suffix": "Jr."
  },
  "contact": {
    "primaryPhone": "555-666-7777",
    "secondaryPhone": "555-444-9898",
    "email": "john.doe@internet.com"
  },
  "address": {
    "type": "SHIPPING",
    "description": "My home address",
    "address1": "123 Oak Street",
    "address2": null,
    "city": "Sunrise",
    "state": "CA",
    "postalCode": "12345-6789"
  },
  "order": {
    "guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
    "orderStatusEvents": [{
      "timestamp": 1528333926586,
      "orderStatusType": "CREATED",
      "note": null
    }, {
      "timestamp": 1528333926586,
      "orderStatusType": "APPROVED",
      "note": null
    }],
    "orderItems": [{
      "product": {
        "guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
        "title": "Green Widget",
        "description": "Gorgeous Green Widget",
        "price": 11.99
      },
      "quantity": 5
    }]
  }
}
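As a rough sketch, assuming only what is visible in the payload above and in the Fulfillment service’s Receiver shown later, the FulfillmentRequestEvent class might be shaped as follows; the actual class in the repository may differ.
// Hypothetical sketch of the FulfillmentRequestEvent domain event (not the repository's actual class).
// Fields mirror the Kafka JSON payload above: one Order plus the customer's contact and shipping details.
public class FulfillmentRequestEvent {

    private String id;       // assumed identifier; the Fulfillment service's Receiver reads it via getId()
    private long timestamp;  // time the fulfillment request was produced
    private Name name;       // customer's full name
    private Contact contact; // phone numbers and email address
    private Address address; // the single SHIPPING address
    private Order order;     // the approved order to be fulfilled

    // getters and setters omitted for brevity
}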
Source code for the Orders service is available on GitHub.
Fulfillment Service
Lastly, the Fulfillment service is responsible for fulfilling orders. A partial view of the data model for the Fulfillment service is shown below. This cluster of domain objects represents the Fulfillment Aggregate.
The Fulfillment service’s primary entity, the Fulfillment class, is persisted in MongoDB. This entity contains a single Order data object, along with the customer’s name, contact information, and shipping address. The Fulfillment service also uses the Fulfillment entity to store the latest shipping event, such as ‘Shipped’, ‘In Transit’, and ‘Received’. The customer’s name, contact information, and shipping addresses are managed by the Accounts service, replicated to the Orders service, and passed to the Fulfillment service, via Kafka, using the FulfillmentRequestEvent entity.
In the Fulfillment MongoDB database, a Fulfillment object, represented as a BSON document in the fulfillment.requests database collection, looks as follows (gist).
{
  "_id": ObjectId("5b1bf1b8a8d0562de5133d64"),
  "timestamp": NumberLong("1528553706260"),
  "name": {
    "title": "Ms.",
    "firstName": "Susan",
    "lastName": "Blackstone"
  },
  "contact": {
    "primaryPhone": "433-544-6555",
    "secondaryPhone": "223-445-6767",
    "email": "susan.m.blackstone@emailisus.com"
  },
  "address": {
    "type": "SHIPPING",
    "description": "Home Sweet Home",
    "address1": "33 Oak Avenue",
    "city": "Nowhere",
    "state": "VT",
    "postalCode": "444556-9090"
  },
  "order": {
    "guid": "2932a8bf-aa9c-4539-8cbf-133a5bb65e44",
    "orderStatusEvents": [{
      "timestamp": NumberLong("1528558453686"),
      "orderStatusType": "RECEIVED"
    }],
    "orderItems": [{
      "product": {
        "guid": "4efe33a1-722d-48c8-af8e-7879edcad2fa",
        "title": "Purple Widget"
      },
      "quantity": 2
    },
    {
      "product": {
        "guid": "b5efd4a0-4eb9-4ad0-bc9e-2f5542cbe897",
        "title": "Blue Widget"
      },
      "quantity": 5
    },
    {
      "product": {
        "guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
        "title": "Yellow Widget"
      },
      "quantity": 2
    }]
  },
  "shippingMethod": "Drone",
  "_class": "com.storefront.model.Fulfillment"
}
Along with the primary Fulfillment entity, the Fulfillment service has an OrderStatusChangeEvent class. As a Kafka producer, the Fulfillment service uses the OrderStatusChangeEvent domain event object to carry state information about an order’s fulfillment statuses. The OrderStatusChangeEvent object contains the order’s UUID, a timestamp, the shipping status, and an optional order status note.
Since the OrderStatusChangeEvent domain event object is not persisted in MongoDB, to examine it, we can again look at its JSON message payload in Kafka (gist).
{
  "guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
  "orderStatusEvent": {
    "timestamp": 1528334452746,
    "orderStatusType": "PROCESSING",
    "note": null
  }
}
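A minimal, assumed sketch of the OrderStatusChangeEvent class, based on the payload above and the Orders service’s Receiver shown later, might look like this; again, the actual class in the repository may differ.
// Hypothetical sketch of the OrderStatusChangeEvent domain event (not the repository's actual class).
public class OrderStatusChangeEvent {

    private String guid;                       // UUID of the Order whose status changed, serialized as a String
    private OrderStatusEvent orderStatusEvent; // timestamp, orderStatusType (e.g., PROCESSING), and optional note

    // getters and setters omitted for brevity
}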
Source code for the Fulfillment service is available on GitHub.
State Change Event Messaging Flows
There are three state change event messaging flows demonstrated in this post.
- Change to a Customer triggers an event message by the Accounts service;
- Order Approved triggers an event message by the Orders service;
- Change to the status of an Order triggers an event message by the Fulfillment service.
Each of these state change event messaging flows follows the same architectural pattern on both the Producer and Consumer sides of the Kafka topic.
Let’s examine each state change event messaging flow and the code behind them.
Customer State Change
When a new Customer entity is created or updated by the Accounts service, a CustomerChangeEvent message is produced and sent to the accounts.customer.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service eventually has a record of all customers who may place an order. It can be said that the Orders service’s Customer contact information is eventually consistent with the Accounts service’s Customer contact information, by way of Kafka.
There are different methods to trigger a message to be sent to Kafka. For this particular state change, the Accounts service uses a listener. The listener class, which extends AbstractMongoEventListener, listens for an onAfterSave event for a Customer entity (gist).
@Slf4j
@Controller
public class AfterSaveListener extends AbstractMongoEventListener<Customer> {

    @Value("${spring.kafka.topic.accounts-customer}")
    private String topic;

    private Sender sender;

    @Autowired
    public AfterSaveListener(Sender sender) {
        this.sender = sender;
    }

    @Override
    public void onAfterSave(AfterSaveEvent<Customer> event) {
        log.info("onAfterSave event='{}'", event);
        Customer customer = event.getSource();
        CustomerChangeEvent customerChangeEvent = new CustomerChangeEvent();
        customerChangeEvent.setId(customer.getId());
        customerChangeEvent.setName(customer.getName());
        customerChangeEvent.setContact(customer.getContact());
        customerChangeEvent.setAddresses(customer.getAddresses());
        sender.send(topic, customerChangeEvent);
    }
}
The listener handles the event by instantiating a new CustomerChangeEvent with the Customer’s information and passing it to the Sender class (gist).
@Slf4j
public class Sender {

    @Autowired
    private KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate;

    public void send(String topic, CustomerChangeEvent payload) {
        log.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}
The configuration of the Sender is handled by the SenderConfig class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the CustomerChangeEvent object into a JSON message payload (gist).
@Configuration
@EnableKafka
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, CustomerChangeEvent> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Sender sender() {
        return new Sender();
    }
}
The Sender uses a KafkaTemplate to send the message to the Kafka topic. Since message order is critical to ensure changes to a Customer’s information are processed in order, all messages are sent to a single topic with a single partition.
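The post’s source code does not show how the topic itself is provisioned, but as a hypothetical illustration, a single-partition topic could be declared in Spring Kafka as a NewTopic bean, which the auto-configured KafkaAdmin would create on the broker at startup if it does not already exist.
// Hypothetical example (not part of the post's source code): declaring the
// customer-change topic with a single partition so all CustomerChangeEvent
// messages remain strictly ordered.
@Configuration
public class TopicConfig {

    @Bean
    public NewTopic accountsCustomerChangeTopic() {
        // topic name, number of partitions, replication factor
        return new NewTopic("accounts.customer.change", 1, (short) 1);
    }
}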
The Orders service’s Receiver class consumes the CustomerChangeEvent messages produced by the Accounts service (gist).
@Slf4j
@Component
public class Receiver {

    @Autowired
    private CustomerOrdersRepository customerOrdersRepository;

    @Autowired
    private MongoTemplate mongoTemplate;

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
    public void receiveCustomerOrder(CustomerOrders customerOrders) {
        log.info("received payload='{}'", customerOrders);
        latch.countDown();
        customerOrdersRepository.save(customerOrders);
    }

    @KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
    public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
        log.info("received payload='{}'", orderStatusChangeEvent);
        latch.countDown();
        Criteria criteria = Criteria.where("orders.guid")
                .is(orderStatusChangeEvent.getGuid());
        Query query = Query.query(criteria);
        Update update = new Update();
        update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
        mongoTemplate.updateFirst(query, update, "customer.orders");
    }
}
The Orders service’s Receiver class is configured differently, compared to the Fulfillment service. The Orders service receives messages from multiple topics, each containing messages with different payload structures. Each type of message must be deserialized into a different object type. To accomplish this, the ReceiverConfig class uses Apache Kafka’s StringDeserializer. The Orders service’s ReceiverConfig references Spring Kafka’s AbstractKafkaListenerContainerFactory class’s setMessageConverter method, which allows for dynamic object type matching (gist).
@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Override
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    @Override
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new StringDeserializer()
        );
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // converts the JSON String payload into the type of each @KafkaListener method's parameter
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }

    @Override
    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}
Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). That method accepts a specific object type as input, denoting the object type into which the message payload needs to be deserialized. In this way, we can receive multiple message payloads, serialized from multiple object types, and successfully deserialize each type into the correct data object. In the case of a CustomerChangeEvent, the Orders service calls the receiveCustomerOrder method to consume the message and properly deserialize it.
For all services, a Spring application.yaml properties file, in each service’s resources directory, contains the Kafka configuration (gist).
server:
  port: 8080
spring:
  main:
    allow-bean-definition-overriding: true
  application:
    name: orders
  data:
    mongodb:
      uri: mongodb://mongo:27017/orders
  kafka:
    bootstrap-servers: kafka:9092
    topic:
      accounts-customer: accounts.customer.change
      orders-order: orders.order.fulfill
      fulfillment-order: fulfillment.order.change
    consumer:
      group-id: orders
      auto-offset-reset: earliest
  zipkin:
    sender:
      type: kafka
management:
  endpoints:
    web:
      exposure:
        include: '*'
logging:
  level:
    root: INFO
---
spring:
  config:
    activate:
      on-profile: local
  data:
    mongodb:
      uri: mongodb://localhost:27017/orders
  kafka:
    bootstrap-servers: localhost:9092
server:
  port: 8090
management:
  endpoints:
    web:
      exposure:
        include: '*'
logging:
  level:
    root: DEBUG
---
spring:
  config:
    activate:
      on-profile: confluent
server:
  port: 8080
logging:
  level:
    root: INFO
---
server:
  port: 8080
spring:
  config:
    activate:
      on-profile: minikube
  data:
    mongodb:
      uri: mongodb://mongo.dev:27017/orders
  kafka:
    bootstrap-servers: kafka-cluster.dev:9092
management:
  endpoints:
    web:
      exposure:
        include: '*'
logging:
  level:
    root: DEBUG
Order Approved for Fulfillment
When the status of the Order in a CustomerOrders entity is changed to ‘Approved’ from ‘Created’, a FulfillmentRequestEvent message is produced and sent to the orders.order.fulfill Kafka topic. This message is retrieved and consumed by the Fulfillment service. This is how the Fulfillment service has a record of which Orders are ready for fulfillment.
Since we did not create the Shopping Cart service for this post, the Orders service simulates an order approval event, containing an approved order, being received, through Kafka, from the Shopping Cart Service. To simulate order creation and approval, the Orders service can create a random order history for each customer. Further, the Orders service can scan all customer orders for orders that contain both a ‘Created’ and ‘Approved’ order status. This state is communicated as an event message to Kafka for all orders matching those criteria. A FulfillmentRequestEvent is produced, which contains the order to be fulfilled and the customer’s contact and shipping information. The FulfillmentRequestEvent is passed to the Sender class (gist).
@Slf4j
public class Sender {

    @Autowired
    private KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate;

    public void send(String topic, FulfillmentRequestEvent payload) {
        log.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}
The configuration of the Sender class is handled by the SenderConfig class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the FulfillmentRequestEvent object into a JSON message payload (gist).
@Configuration
@EnableKafka
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, FulfillmentRequestEvent> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Sender sender() {
        return new Sender();
    }
}
The Sender class uses a KafkaTemplate to send the message to the Kafka topic. Since message order is not critical, messages could be sent to a topic with multiple partitions if the volume of messages required it.
The Fulfillment service’s Receiver class consumes the FulfillmentRequestEvent from the Kafka topic and instantiates a Fulfillment object, containing the data passed in the FulfillmentRequestEvent message payload. This includes the order to be fulfilled and the customer’s contact and shipping information (gist).
@Slf4j
@Component
public class Receiver {

    @Autowired
    private FulfillmentRepository fulfillmentRepository;

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "${spring.kafka.topic.orders-order}")
    public void receive(FulfillmentRequestEvent fulfillmentRequestEvent) {
        log.info("received payload='{}'", fulfillmentRequestEvent.toString());
        latch.countDown();
        Fulfillment fulfillment = new Fulfillment();
        fulfillment.setId(fulfillmentRequestEvent.getId());
        fulfillment.setTimestamp(fulfillmentRequestEvent.getTimestamp());
        fulfillment.setName(fulfillmentRequestEvent.getName());
        fulfillment.setContact(fulfillmentRequestEvent.getContact());
        fulfillment.setAddress(fulfillmentRequestEvent.getAddress());
        fulfillment.setOrder(fulfillmentRequestEvent.getOrder());
        fulfillmentRepository.save(fulfillment);
    }
}
The Fulfillment service’s ReceiverConfig class defines the DefaultKafkaConsumerFactory and ConcurrentKafkaListenerContainerFactory, responsible for deserializing the message payload from JSON into a FulfillmentRequestEvent object (gist).
@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Override
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    @Override
    @Bean
    public ConsumerFactory<String, FulfillmentRequestEvent> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(FulfillmentRequestEvent.class));
    }

    @Override
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Override
    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}
Fulfillment Order Status State Change
When the status of the Order in a Fulfillment entity is changed to anything other than ‘Approved’, an OrderStatusChangeEvent message is produced by the Fulfillment service and sent to the fulfillment.order.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service tracks all order lifecycle events, from the initial ‘Created’ status to the final happy-path ‘Received’ status.
The Fulfillment service exposes several endpoints through the FulfillmentController class, which simulate a change in the status of an order. They allow an order status to be changed from ‘Approved’ to ‘Processing’, to ‘Shipped’, to ‘In Transit’, and to ‘Received’. This change is applied to all orders that meet the criteria.
Each of these state changes triggers a change to the Fulfillment document in MongoDB. Each change also generates a Kafka message, containing the OrderStatusChangeEvent in the message payload. This is handled by the Fulfillment service’s Sender class.
Note that in this example, these two events are not handled in an atomic transaction. Either updating the database or sending the message could fail independently, which would cause a loss of data consistency. In the real world, we must ensure that both these disparate actions succeed or fail as a single transaction to ensure data consistency (gist).
@Slf4j
public class Sender {

    @Autowired
    private KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate;

    public void send(String topic, OrderStatusChangeEvent payload) {
        log.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}
The configuration of the Sender class is handled by the SenderConfig class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the OrderStatusChangeEvent object into a JSON message payload. This class is almost identical to the SenderConfig class in the Orders and Accounts services (gist).
@Configuration
@EnableKafka
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, OrderStatusChangeEvent> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Sender sender() {
        return new Sender();
    }
}
The Sender class uses a KafkaTemplate to send the message to the Kafka topic. Message order is not critical since a timestamp is recorded, which ensures the proper sequence of order status events can be maintained. Messages could be sent to a topic with multiple partitions if the volume of messages required it.
The Orders service’s Receiver class is responsible for consuming the OrderStatusChangeEvent message produced by the Fulfillment service (gist).
@Slf4j
@Component
public class Receiver {

    @Autowired
    private CustomerOrdersRepository customerOrdersRepository;

    @Autowired
    private MongoTemplate mongoTemplate;

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
    public void receiveCustomerOrder(CustomerOrders customerOrders) {
        log.info("received payload='{}'", customerOrders);
        latch.countDown();
        customerOrdersRepository.save(customerOrders);
    }

    @KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
    public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
        log.info("received payload='{}'", orderStatusChangeEvent);
        latch.countDown();
        Criteria criteria = Criteria.where("orders.guid")
                .is(orderStatusChangeEvent.getGuid());
        Query query = Query.query(criteria);
        Update update = new Update();
        update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
        mongoTemplate.updateFirst(query, update, "customer.orders");
    }
}
As explained above, the Orders service is configured differently, compared to the Fulfillment service, to receive messages from Kafka. The Orders service needs to receive messages from more than one topic. The ReceiverConfig class deserializes all messages using the StringDeserializer. The Orders service’s ReceiverConfig class references Spring Kafka’s AbstractKafkaListenerContainerFactory class’s setMessageConverter method, which allows for dynamic object type matching (gist).
@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Override
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    @Override
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new StringDeserializer()
        );
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // converts the JSON String payload into the type of each @KafkaListener method's parameter
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }

    @Override
    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}
Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as an input parameter, denoting the object type into which the message payload needs to be deserialized. In the case of an OrderStatusChangeEvent message, the receiveOrderStatusChangeEvents method is called to consume a message from the fulfillment.order.change Kafka topic.
Part Two
In Part Two of this post, I will briefly cover how to deploy and run a local development version of the storefront components, using Docker. The storefront’s microservices will be exposed through an API Gateway, Netflix’s Zuul. Service discovery and load balancing will be handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. To provide operational visibility, we will add Yahoo’s Kafka Manager and Mongo Express to our system.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
#1 by mvk_tech on July 29, 2018 - 10:47 pm
Great article. I am currently working on a similar system. I have a question though – How do you handle exceptions at the consumer. Say for example, Orders service’s Receiver is not able to consume the message (bad JSON) or cannot update the data in database. Will the Accounts service ever know that the data has gone out of sync? How do we reconcile or even detect this divergence?
#2 by Richard Williams on October 2, 2018 - 7:46 am
I love the graphics you use to illustrate your articles. In particular, the data model for mongo is just what I’m looking for. Can you tell me where I could find the software you used to create the data diagram for the Accounting service. Thanks
#3 by Gary A. Stafford on October 2, 2018 - 6:40 pm
Thanks. I use JetBrains IntelliJ for data diagrams and draw.io for other diagrams.
#4 by Jon on January 5, 2019 - 7:03 pm
How do you guarantee that an event will be delivered to Kafka?
What if something goes wrong?
Do you track somehow which events were delivered to Kafka?
#5 by Gary A. Stafford on January 6, 2019 - 10:42 am
Jon, great questions. There are many aspects of Kafka and event-driven architectures in general, which, given their complexity, cannot be covered in a brief post. Error handling, retry logic, duplicate messages, message ordering, delivery auditing, DLQs, etc. are all considerations that would need to be layered on to this post’s demonstration to completely operationalize it for Production.