Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ

RabbitMQEnventCons.png

Introduction

In a recent post, Decoupling Microservices using Message-based RPC IPC, with Spring, RabbitMQ, and AMPQ, we moved away from synchronous REST HTTP for inter-process communications (IPC) toward message-based IPC. Moving to asynchronous message-based communications allows us to decouple services from one another, which makes it easier to build, test, and release our individual services. In that post, we did not achieve fully asynchronous communications, although we did achieve a higher level of service decoupling using message-based Remote Procedure Call (RPC) IPC.

In this post, we will fully decouple our services using the distributed computing model of eventual consistency. More specifically, we will use a message-based, event-driven, loosely-coupled, eventually consistent architectural approach for communications between services.

What is eventual consistency? One of the best definitions of eventual consistency I have read was posted on microservices.io. To paraphrase, ‘using an event-driven, eventually consistent approach, each service publishes an event whenever it updates its data. Other services subscribe to events. When an event is received, a service updates its data.’

Example of Eventual Consistency

Imagine Service A, the Customer service, inserts a new customer record into its database. Based on that ‘customer created’ event, Service A publishes a message containing the new customer object, serialized to JSON, to the lightweight, persistent, New Customer message queue.

Service B, the Customer Onboarding service, a subscriber to the New Customer queue, consumes and deserializes Service A’s message. Service B may or may not perform a data transformation of the Customer object to its own Customer data model. Service B then inserts the new customer record into its own database.

In the above example, it can be said that the customer records in Service B’s database are eventually consistent with the customer records in Service A’s database. Service A makes a change and publishes a message in response to the event. Service B consumes the message and makes the same change. Eventually (likely within milliseconds), Service B’s customer records are consistent with Service A’s customer records.
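To make the sequence above concrete, here is a minimal, in-memory sketch. It is not the Voter API code: a `LinkedBlockingQueue` stands in for the New Customer queue, two maps stand in for the services' databases, and the class name, method names, payload format, and sample customers are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for the example above; no RabbitMQ or database is involved.
class EventualConsistencySketch {

    // Stand-in for the New Customer message queue.
    static final BlockingQueue<String> newCustomerQueue = new LinkedBlockingQueue<>();

    // Stand-ins for each service's own database, keyed by customer ID.
    static final Map<String, String> serviceADb = new LinkedHashMap<>();
    static final Map<String, String> serviceBDb = new LinkedHashMap<>();

    // Service A: insert the record, then publish a 'customer created' message.
    static void createCustomer(String id, String name) {
        serviceADb.put(id, name);
        newCustomerQueue.add(id + "|" + name); // simplistic payload in place of JSON
    }

    // Service B: consume every waiting message and apply it to its own database.
    static int consumePending() {
        int consumed = 0;
        String message;
        while ((message = newCustomerQueue.poll()) != null) {
            String[] parts = message.split("\\|", 2);
            serviceBDb.put(parts[0], parts[1]);
            consumed++;
        }
        return consumed;
    }

    public static void main(String[] args) {
        createCustomer("1", "Sample Customer One");
        createCustomer("2", "Sample Customer Two");
        System.out.println("Consistent before consuming? " + serviceADb.equals(serviceBDb));
        consumePending();
        System.out.println("Consistent after consuming?  " + serviceADb.equals(serviceBDb));
    }
}
```

Between `createCustomer` and `consumePending`, the two stores disagree. That window is the "eventual" part of eventual consistency.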

Why Eventual Consistency?

So what does this apparent added complexity and duplication of data buy us? Consider the advantages. Service B, the Onboarding service, requires no knowledge of, or dependency on, Service A, the Customer service. Still, Service B has a current record of all the customers that Service A maintains. Instead of making repeated and potentially costly RESTful HTTP calls or message-based RPC calls to Service A for new customers, Service B queries its own database for a list of customers.

The value of eventual consistency grows quickly as you scale a distributed system. Imagine dozens of distinct microservices, many requiring data from other microservices. Further, imagine multiple instances of each of those services all running in parallel. Decoupling services from one another, through asynchronous forms of IPC, messaging, and event-driven eventual consistency, greatly simplifies the software development lifecycle and operations.

Demonstration

In this post, we could use a few different architectural patterns to demonstrate message passing with RabbitMQ and Spring AMQP. These include Work Queues, Publish/Subscribe, Routing, and Topics. To keep things as simple as possible, we will have a single Producer publish messages to a single durable and persistent message queue. We will have a single Subscriber, a Consumer, consume the messages from that queue. We focus on a single type of event message.

Sample Code

To demonstrate Spring AMQP-based messaging with RabbitMQ, we will use a reference set of three Spring Boot microservices. The Election Service, Candidate Service, and Voter Service are all backed by MongoDB. The services and MongoDB, along with RabbitMQ and Voter API Gateway, are all part of the Voter API.

The Voter API Gateway, based on HAProxy, serves as a common entry point to all three services, as well as serving as a reverse proxy and load balancer. The API Gateway provides round-robin load-balanced access to multiple instances of each service.

Voter_API_Architecture

All the source code for this post’s example is available on GitHub, within a few different project repositories. The Voter Service repository contains the Voter service source code, along with the scripts and Docker Compose files required to deploy the project. The Election Service repository, Candidate Service repository, and Voter API Gateway repository are also available on GitHub. There is also a new AngularJS/Node.js Web Client, to demonstrate how to use the Voter API.

For this post, you only need to clone the Voter Service repository.

Deploying Voter API

All components, including the Spring Boot services, MongoDB, RabbitMQ, API Gateway, and the Web Client, are individually deployed using Docker. Each component is publicly available as a Docker Image, on Docker Hub. The Voter Service repository contains scripts to deploy the entire set of Dockerized components, locally. The repository also contains optional scripts to provision a Docker Swarm, using Docker’s newer swarm mode, and deploy the components. We will only deploy the services locally for this post.

To clone and deploy the components locally, including the Spring Boot services, MongoDB, RabbitMQ, and the API Gateway, execute the following commands. If this is your first time running the commands, it may take a few minutes for your system to download all the required Docker Images from Docker Hub.


git clone --depth 1 --branch rabbitmq \
https://github.com/garystafford/voter-service.git
cd voter-service/scripts-services
sh ./stack_deploy_local.sh

If everything was deployed successfully, you should observe six running Docker containers, similar to the output, below.


CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
32d73282ff3d garystafford/voter-api-gateway:rabbitmq "/docker-entrypoin…" 8 seconds ago Up 5 seconds 0.0.0.0:8080->8080/tcp voterstack_voter-api-gateway_1
1ece438c5da4 garystafford/candidate-service:rabbitmq "java -Dspring.pro…" 10 seconds ago Up 7 seconds 0.0.0.0:8097->8080/tcp voterstack_candidate_1
30391faa3422 garystafford/voter-service:rabbitmq "java -Dspring.pro…" 10 seconds ago Up 7 seconds 0.0.0.0:8099->8080/tcp voterstack_voter_1
35063ccfe706 garystafford/election-service:rabbitmq "java -Dspring.pro…" 12 seconds ago Up 10 seconds 0.0.0.0:8095->8080/tcp voterstack_election_1
23eae86967a2 rabbitmq:management-alpine "docker-entrypoint…" 14 seconds ago Up 11 seconds 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp voterstack_rabbitmq_1
7e77ddecddbb mongo:latest "docker-entrypoint…" 24 seconds ago Up 21 seconds 0.0.0.0:27017->27017/tcp voterstack_mongodb_1

Using Voter API

The Voter Service, Election Service, and Candidate Service GitHub repositories each contain README files, which detail all the API endpoints each service exposes, and how to call them.

In addition to casting votes for candidates, the Voter service can simulate election results. When the /simulation endpoint is called with the desired election, the Voter service randomly generates a number of votes for each candidate in that election. This will save us the burden of casting votes for this demonstration. However, the Voter service has no knowledge of elections or candidates. The Voter service depends on the Candidate service to obtain a list of candidates.

The Candidate service manages electoral candidates, their political affiliation, and the election in which they are running. Like the Voter service, the Candidate service also has a /simulation endpoint. The service will create a list of candidates based on the 2012 and 2016 US Presidential Elections. The simulation capability of the service saves us the burden of inputting candidates for this demonstration.

The Election service manages elections, their polling dates, and the type of election (federal, state, or local). Like the other services, the Election service also has a /simulation endpoint, which will create a list of sample elections. The Election service will not be discussed in this post’s demonstration. We will examine communications between the Candidate and Voter services, only.

REST HTTP Endpoint

As you recall from our previous post, Decoupling Microservices using Message-based RPC IPC, with Spring, RabbitMQ, and AMPQ, the Voter service exposes multiple, almost identical endpoints. Each endpoint uses a different means of IPC to retrieve candidates and generates random votes.

Calling the /voter/simulation/http/{election} endpoint and providing a specific election prompts the Voter service to request a list of candidates from the Candidate service, based on the election parameter you input. This request is done using synchronous REST HTTP. The Voter service uses the HTTP GET method to request the data from the Candidate service. The Voter service then waits for a response.

The Candidate service receives the HTTP request. The Candidate service responds to the Voter service with a list of candidates in JSON format. The Voter service receives the response payload containing the list of candidates. The Voter service then proceeds to generate a random number of votes for each candidate in the list. Finally, each new vote object (MongoDB document) is written back to the vote collection in the Voter service’s voters database.

Message-based RPC Endpoint

Similarly, calling the /voter/simulation/rpc/{election} endpoint and providing a specific election prompts the Voter service to request the same list of candidates. However, this time, the Voter service (the client) produces a request message and places it in RabbitMQ’s voter.rpc.requests queue. The Voter service then waits for a response. The Voter service has no direct dependency on the Candidate service; it only depends on a response to its request message. In this way, it is still a form of synchronous IPC, but the Voter service is now decoupled from the Candidate service.

The request message is consumed by the Candidate service (the server), which is listening to that queue. In response, the Candidate service produces a message containing the list of candidates serialized to JSON. The Candidate service (the server) sends a response back to the Voter service (the client) through RabbitMQ. This is done using the Direct reply-to feature of RabbitMQ or using a unique response queue, specified in the reply-to header of the request message sent by the Voter service.

The Voter service receives the message containing the list of candidates. The Voter service deserializes the JSON payload to candidate objects. The Voter service then proceeds to generate a random number of votes for each candidate in the list. Finally, identical to the previous example, each new vote object (MongoDB document) is written back to the vote collection in the Voter service’s voters database.
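The request/reply exchange described above can be sketched with plain in-memory queues. This is only an illustration of the pattern, not the Spring AMQP implementation from the previous post: the `Message` envelope, the class and method names, and the placeholder candidate names are assumptions, and a `LinkedBlockingQueue` stands in for both the voter.rpc.requests queue and the reply queue.

```java
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class RpcOverQueuesSketch {

    // Hypothetical envelope: a body plus the two headers message-based RPC relies on.
    static final class Message {
        final String body;
        final String correlationId;
        final BlockingQueue<Message> replyTo;

        Message(String body, String correlationId, BlockingQueue<Message> replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    // Stand-in for the voter.rpc.requests queue.
    static final BlockingQueue<Message> requests = new LinkedBlockingQueue<>();

    // Server side (Candidate service): answer one request on the queue named in reply-to.
    static void serveOne() {
        try {
            Message request = requests.take();
            String candidates = "Candidate A,Candidate B"; // placeholder data
            request.replyTo.add(new Message(candidates, request.correlationId, null));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Client side (Voter service): send a request, then block until the reply arrives.
    static List<String> requestCandidates(String election) {
        BlockingQueue<Message> replyQueue = new LinkedBlockingQueue<>();
        String correlationId = UUID.randomUUID().toString();
        requests.add(new Message(election, correlationId, replyQueue));
        try {
            Message reply = replyQueue.poll(5, TimeUnit.SECONDS);
            if (reply == null || !correlationId.equals(reply.correlationId)) {
                throw new IllegalStateException("No matching reply received");
            }
            return Arrays.asList(reply.body.split(","));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a reply", e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread server = new Thread(RpcOverQueuesSketch::serveOne);
        server.start();
        System.out.println(requestCandidates("2016 Presidential Election"));
        server.join();
    }
}
```

The client never references the server directly, yet it still blocks in `poll` until a reply arrives. That is why this style is decoupled but still synchronous.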

New Endpoint

Calling the new /voter/simulation/db/{election} endpoint and providing a specific election prompts the Voter service to query its own MongoDB database for a list of candidates.

But wait, where did the candidates come from? The Voter service didn’t call the Candidate service? The answer is message-based eventual consistency. Whenever a new candidate is created, using a REST HTTP POST request to the Candidate service’s /candidate/candidates endpoint, a Spring Data Rest Repository Event Handler responds. Responding to the candidate created event, the event handler publishes a message, containing a serialized JSON representation of the new candidate object, to a durable and persistent RabbitMQ queue.

The Voter service is listening to that queue. The Voter service consumes messages off the queue, deserializes the candidate object, and saves it to its own voters database, to the candidate collection. For this example, we are saving the incoming candidate object as is, with no transformations. The candidate object model for both services is identical.

When the /voter/simulation/db/{election} endpoint is called, the Voter service queries its voters database for a list of candidates. The Voter service then proceeds to generate a random number of votes for each candidate in the list. Finally, identical to the previous two examples, each new vote object (MongoDB document) is written back to the vote collection in the Voter service’s voters database.
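As a rough illustration of this flow, the sketch below looks up candidates in a local store and generates a random number of votes for each. It is not the Voter service's code: the map stands in for the candidate collection, each vote is reduced to the candidate's name, and the vote bounds, class name, and method name are assumptions, since the post only says the number of votes is random.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

class LocalVoteSimulationSketch {

    // Stand-in for the candidate collection in the Voter service's own database.
    static final Map<String, List<String>> candidatesByElection = new HashMap<>();

    // Hypothetical bounds; the real service may use different ones.
    static final int MIN_VOTES = 2;
    static final int MAX_VOTES = 20;

    // Query the local store only, then generate votes; no call to another service.
    static List<String> simulateVotes(String election, Random random) {
        List<String> candidates =
                candidatesByElection.getOrDefault(election, Collections.emptyList());
        List<String> votes = new ArrayList<>();
        for (String candidate : candidates) {
            int count = MIN_VOTES + random.nextInt(MAX_VOTES - MIN_VOTES + 1);
            for (int i = 0; i < count; i++) {
                votes.add(candidate); // one entry per vote cast
            }
        }
        return votes;
    }

    public static void main(String[] args) {
        candidatesByElection.put("2016 Presidential Election",
                Arrays.asList("Hillary Clinton", "Donald Trump"));
        List<String> votes = simulateVotes("2016 Presidential Election", new Random());
        System.out.println("Votes generated: " + votes.size());
    }
}
```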

Message_Queue_Diagram_Final3B

Exploring the Code

We will not review the REST HTTP or RPC IPC code in this post. It was covered in detail, in the previous post. Instead, we will explore the new code required for eventual consistency.

Spring Dependencies

To use AMQP with RabbitMQ, we need to add a project dependency on org.springframework.boot.spring-boot-starter-amqp. Below is a snippet from the Candidate service’s build.gradle file, showing project dependencies. The Voter service’s dependencies are identical.


dependencies {
    compile group: 'org.springframework.boot', name: 'spring-boot-actuator-docs'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-actuator'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-amqp'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-mongodb'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-rest'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-hateoas'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-logging'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-web'
    compile group: 'org.webjars', name: 'hal-browser'
    testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-test'
}


AMQP Configuration

Next, we need to add a small amount of RabbitMQ AMQP configuration to both services. We accomplish this by using Spring’s @Configuration annotation on our configuration classes. Below is the abridged configuration class for the Voter service.


package com.voterapi.voter.configuration;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class VoterConfig {

    // Declares the queue the Voter service listens on
    @Bean
    public Queue candidateQueue() {
        return new Queue("candidates.queue");
    }
}

And here, the abridged configuration class for the Candidate service.


package com.voterapi.candidate.configuration;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CandidateConfig {

    // Declares the queue the Candidate service publishes to
    @Bean
    public Queue candidateQueue() {
        return new Queue("candidates.queue");
    }
}
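The durability of this queue comes from a default: Spring AMQP's single-argument Queue constructor declares a durable, non-exclusive, non-auto-delete queue. As a sketch only, the same bean could spell those flags out. The class name ExplicitQueueConfig is hypothetical and is not part of the Voter API repositories; it would replace, not accompany, the bean shown above.

```java
package com.voterapi.candidate.configuration;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical alternative to CandidateConfig, shown only to make the defaults visible.
@Configuration
public class ExplicitQueueConfig {

    @Bean
    public Queue candidateQueue() {
        // Arguments: name, durable, exclusive, autoDelete
        return new Queue("candidates.queue", true, false, false);
    }
}
```

Being explicit costs one line and may make the intent clearer to someone reading the configuration for the first time.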

Event Handler

With our dependencies and configuration in place, we will define the CandidateEventHandler class. This class is annotated with the Spring Data Rest @RepositoryEventHandler and Spring’s @Component. The @Component annotation ensures the event handler is registered.

The class contains the handleCandidateSave method, which is annotated with the Spring Data Rest @HandleAfterCreate. The event handler acts on the Candidate object, which is the first parameter in the method signature.

Responding to the candidate created event, the event handler publishes a message, containing a serialized JSON representation of the new candidate object, to the candidates.queue queue. This was the queue we configured earlier.


package com.voterapi.candidate.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.voterapi.candidate.domain.Candidate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.rest.core.annotation.HandleAfterCreate;
import org.springframework.data.rest.core.annotation.RepositoryEventHandler;
import org.springframework.stereotype.Component;

@Component
@RepositoryEventHandler
public class CandidateEventHandler {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final RabbitTemplate rabbitTemplate;
    private final Queue candidateQueue;

    @Autowired
    public CandidateEventHandler(RabbitTemplate rabbitTemplate, Queue candidateQueue) {
        this.rabbitTemplate = rabbitTemplate;
        this.candidateQueue = candidateQueue;
    }

    // Invoked by Spring Data REST after a new Candidate has been created
    @HandleAfterCreate
    public void handleCandidateSave(Candidate candidate) {
        sendMessage(candidate);
    }

    // Publishes to the default exchange, using the queue name as the routing key
    private void sendMessage(Candidate candidate) {
        String payload = serializeToJson(candidate);
        if (payload.isEmpty()) {
            return; // serialization failed and was logged; do not publish an empty message
        }
        rabbitTemplate.convertAndSend(candidateQueue.getName(), payload);
    }

    // Returns the candidate as JSON, or an empty string if serialization fails
    private String serializeToJson(Candidate candidate) {
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            jsonInString = mapper.writeValueAsString(candidate);
        } catch (JsonProcessingException e) {
            logger.error("Unable to serialize candidate", e);
        }
        logger.debug("Serialized message payload: {}", jsonInString);
        return jsonInString;
    }
}

Consuming Messages

Next, let’s switch to the Voter service’s CandidateListService class. Below is an abridged version of the class with two new methods. First, the getCandidateMessage method listens to the candidates.queue queue. This was the queue we configured earlier. The method is annotated with the Spring AMQP Rabbit @RabbitListener annotation.

The getCandidateMessage method retrieves the new candidate object from the message, deserializes the message’s JSON payload, maps it to the candidate object model, and saves it to the Voter service’s database.

The second method, getCandidatesQueueDb, retrieves the candidates from the Voter service’s database. The method makes use of the Spring Data MongoDB Aggregation package to return a list of candidates from MongoDB.


/**
 * Consumes a new candidate message, deserializes it, and saves it to MongoDB
 * @param candidateMessage JSON representation of the new candidate
 */
@RabbitListener(queues = "#{candidateQueue.name}")
public void getCandidateMessage(String candidateMessage) {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
    TypeReference<Candidate> mapType = new TypeReference<Candidate>() {};
    Candidate candidate;
    try {
        candidate = objectMapper.readValue(candidateMessage, mapType);
    } catch (IOException e) {
        // Without a deserialized candidate there is nothing to save
        logger.error("Unable to deserialize candidate message", e);
        return;
    }
    candidateRepository.save(candidate);
    logger.debug("Candidate {} saved to MongoDB", candidate);
}

/**
 * Retrieves candidates from MongoDB and transforms to voter view
 * @param election the election whose candidates should be returned
 * @return List of candidates
 */
public List<CandidateVoterView> getCandidatesQueueDb(String election) {
    Aggregation aggregation = Aggregation.newAggregation(
            Aggregation.match(Criteria.where("election").is(election)),
            project("firstName", "lastName", "politicalParty", "election")
                    .andExpression("concat(firstName,' ', lastName)")
                    .as("fullName"),
            sort(Sort.Direction.ASC, "lastName")
    );
    AggregationResults<CandidateVoterView> groupResults
            = mongoTemplate.aggregate(aggregation, Candidate.class, CandidateVoterView.class);
    return groupResults.getMappedResults();
}
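For readers less familiar with MongoDB aggregation pipelines, the three stages in getCandidatesQueueDb (match, project with a concatenated fullName, sort by lastName) behave roughly like the plain Java stream pipeline below. This is an illustrative equivalent, not the service's code: both nested classes are simplified, hypothetical stand-ins for the real Candidate and CandidateVoterView, and the last sample row is made up to show the filter at work.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class CandidateViewSketch {

    // Simplified stand-in for the Candidate document.
    static final class Candidate {
        final String firstName;
        final String lastName;
        final String politicalParty;
        final String election;

        Candidate(String firstName, String lastName, String politicalParty, String election) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.politicalParty = politicalParty;
            this.election = election;
        }
    }

    // Hypothetical, simplified stand-in for the service's CandidateVoterView.
    static final class CandidateVoterView {
        final String fullName;
        final String politicalParty;
        final String election;

        CandidateVoterView(String fullName, String politicalParty, String election) {
            this.fullName = fullName;
            this.politicalParty = politicalParty;
            this.election = election;
        }
    }

    static List<CandidateVoterView> viewFor(List<Candidate> candidates, String election) {
        return candidates.stream()
                .filter(c -> c.election.equals(election))                  // match stage
                .sorted(Comparator.comparing((Candidate c) -> c.lastName)) // sort ASC by lastName
                .map(c -> new CandidateVoterView(                          // project + concat
                        c.firstName + " " + c.lastName, c.politicalParty, c.election))
                .collect(Collectors.toList());
    }

    static List<Candidate> sampleCandidates() {
        return Arrays.asList(
                new Candidate("Donald", "Trump", "Republican Party", "2016 Presidential Election"),
                new Candidate("Jill", "Stein", "Green Party", "2016 Presidential Election"),
                new Candidate("Hillary", "Clinton", "Democratic Party", "2016 Presidential Election"),
                new Candidate("Example", "Person", "Example Party", "Some Other Election")); // made-up row
    }

    public static void main(String[] args) {
        for (CandidateVoterView view : viewFor(sampleCandidates(), "2016 Presidential Election")) {
            System.out.println(view.fullName + " (" + view.politicalParty + ")");
        }
    }
}
```

In the real service the work happens inside MongoDB rather than in application memory, which matters once the collection grows.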

RabbitMQ Management Console

The easiest way to observe what is happening with the messages is using the RabbitMQ Management Console. To access the console, point your web browser to localhost, on port 15672. The default login credentials for the console are guest/guest. As you successfully produce and consume messages with RabbitMQ, you should see activity on the Overview tab.

RabbitMQ_EC_Durable3.png

Recall we said the queue, in this example, was durable and persistent. That means the queue and its persistent messages will survive the RabbitMQ broker stopping and starting. In the view of the RabbitMQ Management Console below, note the six persistent messages held in the queue. The Candidate service produced the messages in response to six new candidates being created. However, the Voter service was not running and, therefore, could not consume the messages. In addition, the RabbitMQ server was restarted after receiving the candidate messages. The messages were persisted and still present in the queue after the successful reboot of RabbitMQ.

RabbitMQ_EC_Durable

Once RabbitMQ and the Voter service instance were back online, the Voter service successfully consumed the six waiting messages from the queue.

RabbitMQ_EC_Durable2.png

Service Logs

In addition to using the RabbitMQ Management Console, we may observe communications between the two services by looking at the Voter and Candidate services’ logs. I have grabbed a snippet of both services’ logs and added a few comments to show where different processes are being executed.

First the Candidate service logs. We observe a REST HTTP POST request containing a new candidate. We then observe the creation of the new candidate object in the Candidate service’s database, followed by the event handler publishing a message on the queue. Finally, we observe the response is returned in reply to the initial REST HTTP POST request.


# REST HTTP POST Request received
2017-05-11 22:43:46.667 DEBUG 18702 --- [nio-8097-exec-5] o.a.coyote.http11.Http11InputBuffer : Received [POST /candidate/candidates HTTP/1.1
Host: localhost:8097
User-Agent: HTTPie/0.9.8
Accept-Encoding: gzip, deflate
Accept: application/json, */*
Connection: keep-alive
Content-Type: application/json
Content-Length: 127
{"firstName": "Hillary", "lastName": "Clinton", "politicalParty": "Democratic Party", "election": "2016 Presidential Election"}]
2017-05-11 22:43:46.667 DEBUG 18702 --- [nio-8097-exec-5] o.a.c.authenticator.AuthenticatorBase : Security checking request POST /candidate/candidates
# Inserting new Candidate into database
2017-05-11 22:43:46.674 DEBUG 18702 --- [nio-8097-exec-5] o.s.data.mongodb.core.MongoTemplate : Inserting DBObject containing fields: [_class, _id, firstName, lastName, politicalParty, election] in collection: candidate
2017-05-11 22:43:46.674 DEBUG 18702 --- [nio-8097-exec-5] o.s.data.mongodb.core.MongoDbUtils : Getting Mongo Database name=[candidates]
2017-05-11 22:43:46.674 DEBUG 18702 --- [nio-8097-exec-5] org.mongodb.driver.protocol.insert : Inserting 1 documents into namespace candidates.candidate on connection [connectionId{localValue:2, serverValue:147}] to server localhost:27017
2017-05-11 22:43:46.677 DEBUG 18702 --- [nio-8097-exec-5] org.mongodb.driver.protocol.insert : Insert completed
# Publishing message on queue
2017-05-11 22:43:46.678 DEBUG 18702 --- [nio-8097-exec-5] o.s.d.r.c.e.AnnotatedEventHandlerInvoker : Invoking AfterCreateEvent handler for Hillary Clinton (Democratic Party).
2017-05-11 22:43:46.679 DEBUG 18702 --- [nio-8097-exec-5] c.v.c.service.CandidateEventHandler : Serialized message payload: {"id":"591521621162e1490eb0d537","firstName":"Hillary","lastName":"Clinton","politicalParty":"Democratic Party","election":"2016 Presidential Election","fullName":"Hillary Clinton"}
2017-05-11 22:43:46.679 DEBUG 18702 --- [nio-8097-exec-5] o.s.amqp.rabbit.core.RabbitTemplate : Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@1bfb15f5 Shared Rabbit Connection: SimpleConnection@37d1ba14 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 59422]
2017-05-11 22:43:46.679 DEBUG 18702 --- [nio-8097-exec-5] o.s.amqp.rabbit.core.RabbitTemplate : Publishing message on exchange [], routingKey = [candidates.queue]
# Response to HTTP POST
2017-05-11 22:43:46.679 DEBUG 18702 --- [nio-8097-exec-5] o.s.b.f.s.DefaultListableBeanFactory : Returning cached instance of singleton bean 'persistentEntities'
2017-05-11 22:43:46.681 DEBUG 18702 --- [nio-8097-exec-5] o.s.b.f.s.DefaultListableBeanFactory : Returning cached instance of singleton bean 'org.springframework.boot.actuate.autoconfigure.EndpointWebMvcHypermediaManagementContextConfiguration$ActuatorEndpointLinksAdvice'
2017-05-11 22:43:46.682 DEBUG 18702 --- [nio-8097-exec-5] s.d.r.w.j.PersistentEntityJackson2Module : Serializing PersistentEntity org.springframework.data.mongodb.core.mapping.BasicMongoPersistentEntity@1a4d1ab7.
2017-05-11 22:43:46.683 DEBUG 18702 --- [nio-8097-exec-5] o.s.w.s.m.m.a.HttpEntityMethodProcessor : Written [Resource { content: Hillary Clinton (Democratic Party), links: [<http://localhost:8097/candidate/candidates/591521621162e1490eb0d537>;rel="self", <http://localhost:8097/candidate/candidates/591521621162e1490eb0d537{?projection}>;rel="candidate"] }] as "application/json" using [org.springframework.data.rest.webmvc.config.RepositoryRestMvcConfiguration$ResourceSupportHttpMessageConverter@27329d2a]
2017-05-11 22:43:46.683 DEBUG 18702 --- [nio-8097-exec-5] o.s.web.servlet.DispatcherServlet : Null ModelAndView returned to DispatcherServlet with name 'dispatcherServlet': assuming HandlerAdapter completed request handling
2017-05-11 22:43:46.683 DEBUG 18702 --- [nio-8097-exec-5] o.s.web.servlet.DispatcherServlet : Successfully completed request

Now the Voter service logs. Within milliseconds of the Candidate service publishing the message and returning its response, the Voter service consumes the message off the queue. The Voter service then deserializes the new candidate object and inserts it into its database.


# Retrieving message from queue
2017-05-11 22:43:46.242 DEBUG 19001 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Retrieving delivery for Consumer@78910096: tags=[{amq.ctag-WCLRWmQ6WRkGgxg-enVslA=candidates.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@386143c0 Shared Rabbit Connection: SimpleConnection@2d187d86 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 59586], acknowledgeMode=AUTO local queue size=0
2017-05-11 22:43:46.684 DEBUG 19001 --- [pool-1-thread-9] o.s.a.r.listener.BlockingQueueConsumer : Storing delivery for Consumer@78910096: tags=[{amq.ctag-WCLRWmQ6WRkGgxg-enVslA=candidates.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@386143c0 Shared Rabbit Connection: SimpleConnection@2d187d86 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 59586], acknowledgeMode=AUTO local queue size=0
2017-05-11 22:43:46.685 DEBUG 19001 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Received message: (Body:'{"id":"591521621162e1490eb0d537","firstName":"Hillary","lastName":"Clinton","politicalParty":"Democratic Party","election":"2016 Presidential Election","fullName":"Hillary Clinton"}' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=candidates.queue, receivedDelay=null, deliveryTag=6, messageCount=0, consumerTag=amq.ctag-WCLRWmQ6WRkGgxg-enVslA, consumerQueue=candidates.queue])
2017-05-11 22:43:46.686 DEBUG 19001 --- [cTaskExecutor-1] .a.r.l.a.MessagingMessageListenerAdapter : Processing [GenericMessage [payload={"id":"591521621162e1490eb0d537","firstName":"Hillary","lastName":"Clinton","politicalParty":"Democratic Party","election":"2016 Presidential Election","fullName":"Hillary Clinton"}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=candidates.queue, amqp_contentEncoding=UTF-8, amqp_deliveryTag=6, amqp_consumerQueue=candidates.queue, amqp_redelivered=false, id=608b990a-919b-52c1-fb64-4af4be03b306, amqp_consumerTag=amq.ctag-WCLRWmQ6WRkGgxg-enVslA, contentType=text/plain, timestamp=1494557026686}]]
# Inserting new Candidate into database
2017-05-11 22:43:46.687 DEBUG 19001 --- [cTaskExecutor-1] o.s.data.mongodb.core.MongoTemplate : Saving DBObject containing fields: [_class, _id, firstName, lastName, politicalParty, election]
2017-05-11 22:43:46.687 DEBUG 19001 --- [cTaskExecutor-1] o.s.data.mongodb.core.MongoDbUtils : Getting Mongo Database name=[voters]
2017-05-11 22:43:46.688 DEBUG 19001 --- [cTaskExecutor-1] org.mongodb.driver.protocol.update : Updating documents in namespace voters.candidate on connection [connectionId{localValue:2, serverValue:151}] to server localhost:27017
2017-05-11 22:43:46.703 DEBUG 19001 --- [cTaskExecutor-1] org.mongodb.driver.protocol.update : Update completed
2017-05-11 22:43:46.703 DEBUG 19001 --- [cTaskExecutor-1] c.v.voter.service.CandidateListService : Candidate Hillary Clinton (Democratic Party) saved to MongoDB

MongoDB

Using the mongo Shell, we can observe six new 2016 Presidential Election candidates in the Candidate service’s database.


> show dbs
candidates 0.000GB
voters 0.000GB
> use candidates
switched to db candidates
> show collections
candidate
> db.candidate.find({})
{ "_id" : ObjectId("5915220e1162e14b2a42e65e"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Donald", "lastName" : "Trump", "politicalParty" : "Republican Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("5915220f1162e14b2a42e65f"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Chris", "lastName" : "Keniston", "politicalParty" : "Veterans Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522101162e14b2a42e660"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Jill", "lastName" : "Stein", "politicalParty" : "Green Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522101162e14b2a42e661"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Gary", "lastName" : "Johnson", "politicalParty" : "Libertarian Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522111162e14b2a42e662"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Darrell", "lastName" : "Castle", "politicalParty" : "Constitution Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522111162e14b2a42e663"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Hillary", "lastName" : "Clinton", "politicalParty" : "Democratic Party", "election" : "2016 Presidential Election" }

Now, looking at the Voter service’s database, we should find the same six 2016 Presidential Election candidates. Note the Object IDs are the same between the two services’ document sets, as are the rest of the fields (first name, last name, political party, and election). However, the _class field is different between the two services’ records.


> show dbs
candidates 0.000GB
voters 0.000GB
> use voters
> db.candidate.find({})
{ "_id" : ObjectId("5915220e1162e14b2a42e65e"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Donald", "lastName" : "Trump", "politicalParty" : "Republican Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("5915220f1162e14b2a42e65f"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Chris", "lastName" : "Keniston", "politicalParty" : "Veterans Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522101162e14b2a42e660"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Jill", "lastName" : "Stein", "politicalParty" : "Green Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522101162e14b2a42e661"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Gary", "lastName" : "Johnson", "politicalParty" : "Libertarian Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522111162e14b2a42e662"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Darrell", "lastName" : "Castle", "politicalParty" : "Constitution Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522111162e14b2a42e663"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Hillary", "lastName" : "Clinton", "politicalParty" : "Democratic Party", "election" : "2016 Presidential Election" }
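Because the Voter service keeps the Candidate service's Object ID, and the Voter log above shows the save being issued as an update against voters.candidate, writing the same candidate twice should overwrite rather than duplicate. The post does not test redelivery, so treat the following as a sketch of that idea only: a map keyed by ID stands in for the collection, and the class and method names are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

class IdempotentConsumerSketch {

    // Stand-in for the voters.candidate collection, keyed by the shared Object ID.
    static final Map<String, String> candidateCollection = new HashMap<>();

    // Save keyed by ID: insert if the ID is new, overwrite if it already exists.
    static void onCandidateMessage(String id, String fullName) {
        candidateCollection.put(id, fullName);
    }

    public static void main(String[] args) {
        // The same message delivered twice leaves a single record behind.
        onCandidateMessage("591522111162e14b2a42e663", "Hillary Clinton");
        onCandidateMessage("591522111162e14b2a42e663", "Hillary Clinton");
        System.out.println("Records stored: " + candidateCollection.size());
    }
}
```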

Production Considerations

The post demonstrated a simple example of message-based, event-driven eventual consistency. In an actual Production environment, there are a few things that must be considered.

  • We only addressed a ‘candidate created’ event. We would also have to code for other types of events, such as a ‘candidate deleted’ event and a ‘candidate updated’ event.
  • If a candidate is added, deleted, then re-added, are the events published and consumed in the right order? What about with multiple instances of the Voter service running? Does this pattern guarantee event ordering?
  • How should the Candidate service react on startup if RabbitMQ is not available?
  • What if RabbitMQ fails after the Candidate services have started?
  • How should the Candidate service react if a new candidate record is added to the database, but a ‘candidate created’ event message cannot be published to RabbitMQ? The two actions are not wrapped in a single transaction.
  • In all of the above scenarios, what response should be returned to the API end user?
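As one possible starting point for the first consideration, the message could carry an event type alongside the candidate, so the consumer knows whether to save or remove a record. The sketch below is hypothetical and is not part of the Voter API: the envelope class, the enum, and the in-memory store are all assumptions, and it deliberately does not address ordering, broker outages, or transactions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CandidateEventSketch {

    enum EventType { CREATED, UPDATED, DELETED }

    // Hypothetical envelope: what happened, to which candidate, and its current name.
    static final class CandidateEvent {
        final EventType type;
        final String candidateId;
        final String fullName; // ignored for DELETED events

        CandidateEvent(EventType type, String candidateId, String fullName) {
            this.type = type;
            this.candidateId = candidateId;
            this.fullName = fullName;
        }
    }

    // Stand-in for the Voter service's candidate collection.
    static final Map<String, String> voterCandidates = new LinkedHashMap<>();

    // Consumer side: apply each event to the local copy of the data.
    static void apply(CandidateEvent event) {
        switch (event.type) {
            case CREATED:
            case UPDATED:
                voterCandidates.put(event.candidateId, event.fullName);
                break;
            case DELETED:
                voterCandidates.remove(event.candidateId);
                break;
            default:
                throw new IllegalArgumentException("Unknown event type: " + event.type);
        }
    }

    public static void main(String[] args) {
        apply(new CandidateEvent(EventType.CREATED, "1", "Candidate One"));
        apply(new CandidateEvent(EventType.UPDATED, "1", "Candidate One (updated)"));
        System.out.println("After update: " + voterCandidates);
        apply(new CandidateEvent(EventType.DELETED, "1", null));
        System.out.println("After delete: " + voterCandidates);
    }
}
```

If these three events were applied out of order, the store would end up wrong, which is exactly the ordering concern raised in the list above.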

Conclusion

In this post, using eventual consistency, we successfully decoupled our two microservices and achieved asynchronous inter-process communications. Adopting a message-based, event-driven, loosely-coupled architecture, wherever possible, in combination with REST HTTP when it makes sense, will improve the overall manageability and scalability of a microservices-based platform.

All opinions in this post are my own and not necessarily the views of my current employer or their clients.
