Decoupling Microservices using Message-based RPC IPC, with Spring, RabbitMQ, and AMQP


Introduction

There has been a considerable growth in modern, highly scalable, distributed application platforms, built around fine-grained RESTful microservices. Microservices generally use lightweight protocols to communicate with each other, such as HTTP, TCP, UDP, WebSockets, MQTT, and AMQP. Microservices commonly communicate with each other directly using REST-based HTTP, or indirectly, using messaging brokers.

There are several well-known, production-tested messaging queues, such as Apache Kafka, Apache ActiveMQ, Amazon Simple Queue Service (SQS), and Pivotal’s RabbitMQ. According to Pivotal, of these messaging brokers, RabbitMQ is the most widely deployed open source message broker.

RabbitMQ supports multiple messaging protocols. RabbitMQ’s primary protocol, the Advanced Message Queuing Protocol (AMQP), is an open standard wire-level protocol and semantic framework for high-performance enterprise messaging. According to Spring, ‘AMQP has exchanges, routes, and queues. Messages are first published to exchanges. Routes define on which queue(s) to pipe the message. Consumers subscribing to that queue then receive a copy of the message.’

Pivotal’s Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions. The project’s libraries facilitate management of AMQP resources while promoting the use of dependency injection and declarative configuration. The project provides a ‘template’ (RabbitTemplate) as a high-level abstraction for sending and receiving messages.

In this post, we will explore how to start moving Spring Boot Java services away from using synchronous REST HTTP for inter-process communications (IPC), and toward message-based IPC. Moving from synchronous IPC to messaging queues and asynchronous IPC decouples services from one another, allowing us to more easily build, test, and release individual microservices.

Message-Based RPC IPC

Many enterprise software architects consider decoupling services with asynchronous IPC to be optimal when developing modern distributed platforms. However, it is sometimes not easy, or even possible, to get away from synchronous communications. Rightly or wrongly, services are often architected such that one service needs to retrieve data from another service or services in order to process its own requests. That service can be said to have a direct dependency on the other services. Many would argue that services, especially RESTful microservices, should not be coupled in this way.

There are several ways to break direct service-to-service dependencies using asynchronous IPC. We might implement request/async response REST HTTP-based IPC. We could also use publish/subscribe or publish/async response messaging queue-based IPC. These are all described by NGINX, in their article, Building Microservices: Inter-Process Communication in a Microservices Architecture; a must-read for anyone working with microservices. We might also implement an architecture which supports eventual consistency, eliminating the need for one service to obtain data from another service.

So what if we cannot implement asynchronous methods to break direct service dependencies, but we want to move toward message-based IPC? One answer is message-based Remote Procedure Call (RPC) IPC. I realize the mention of RPC might send cold shivers down the spine of many seasoned architects. Traditional RPC has several challenges, many of which have been overcome with more modern architectural patterns.

According to Wikipedia, ‘in distributed computing, a remote procedure call (RPC) is when a computer program causes a procedure (subroutine) to execute in another address space (commonly on another computer on a shared network), which is coded as if it were a normal (local) procedure call, without the programmer explicitly coding the details for the remote interaction.’

Although still a form of RPC and not asynchronous, it is possible to replace REST HTTP IPC with message-based RPC IPC. Using message-based RPC, services have no direct dependencies on other services. A service only depends on a response to a request message it places on a queue. The services are now decoupled from one another. The requestor service (the client) has no direct knowledge of the respondent service (the server).
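To make the request/reply idea concrete, here is a minimal, in-memory sketch of message-based RPC using only the Java standard library. It is not RabbitMQ or Spring AMQP code: the class name RpcOverQueuesSketch, the "reply." queue naming, and the canned reply text are all assumptions made for illustration. What it does show is the shape of the pattern: the client knows only the request queue, names a reply queue in its request, and blocks until a reply arrives or a timeout expires.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RpcOverQueuesSketch {

    // A request carries its payload plus the name of the queue the reply should go to.
    static final class Request {
        final String replyTo;
        final String body;

        Request(String replyTo, String body) {
            this.replyTo = replyTo;
            this.body = body;
        }
    }

    // Stand-ins for the broker: one shared request queue and a set of named reply queues.
    static final BlockingQueue<Request> REQUESTS = new LinkedBlockingQueue<>();
    static final Map<String, BlockingQueue<String>> REPLY_QUEUES = new ConcurrentHashMap<>();

    // Client side: publish a request, then block until a reply arrives or the timeout expires.
    // Returns null when no reply is received in time.
    public static String call(String body, long timeoutMillis) {
        String replyTo = "reply." + UUID.randomUUID();
        BlockingQueue<String> replies = new LinkedBlockingQueue<>();
        REPLY_QUEUES.put(replyTo, replies);
        try {
            REQUESTS.put(new Request(replyTo, body));
            return replies.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            REPLY_QUEUES.remove(replyTo);
        }
    }

    // Server side: consume requests and send each answer to the queue named in replyTo.
    public static Thread startServer() {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Request request = REQUESTS.take();
                    BlockingQueue<String> replies = REPLY_QUEUES.get(request.replyTo);
                    if (replies != null) { // the client may already have timed out
                        replies.put("candidates for " + request.body);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
        return server;
    }

    public static void main(String[] args) {
        startServer();
        System.out.println(call("2016 Presidential Election", 1000));
    }
}
```

Note that the client and server never reference each other; they share only the queues. The call is still synchronous from the client’s point of view, which is exactly the trade-off described above.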

RPC with RabbitMQ and AMQP

RabbitMQ has an excellent set of six tutorials, which cover the basics of creating messaging applications, applying different architectural patterns, using RabbitMQ, in several different programming languages. The sixth and final tutorial covers using RabbitMQ for RPC-based IPC, with the request/reply architectural pattern.

Pivotal recently added Spring AMQP implementations to each RabbitMQ tutorial, based on their Spring AMQP project. If you recall, the Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions.

This post’s RPC IPC example is closely based on the architectural pattern found in the Spring AMQP RabbitMQ tutorial.

Sample Code

To demonstrate Spring AMQP-based RPC IPC messaging with RabbitMQ, we will use a pair of simple Spring Boot microservices. These services, the Voter and Candidate services, have been used in several previous posts, and for training and testing DevOps engineers. Both services are backed by MongoDB. The services and MongoDB, along with RabbitMQ, are all part of the Voter API project. The Voter API project also contains an HAProxy-based API Gateway, which provides indirect, load-balanced access to the two services.

All code necessary to build this post’s example is available on GitHub, within three projects. The Voter Service project repository contains the Voter service source code, along with the scripts and Docker Compose files required to deploy the project. The Candidate Service project repository and the Voter API Gateway project repository are also available on GitHub. For this post, you need only clone the Voter Service project repository.

Deploying Voter API

All components, including the two Spring services, MongoDB, RabbitMQ, and the API Gateway, are individually deployed using Docker. Each component is publicly available as a Docker Image, on Docker Hub.

The Voter Service repository contains scripts to deploy the entire set of Dockerized components, locally. The repository also contains optional scripts to provision a Docker Swarm, using Docker’s newer swarm mode, and deploy the components. We will only deploy the services locally for this post.

To clone and deploy the components locally, including the two Spring services, MongoDB, RabbitMQ, and the API Gateway, execute the following commands. If this is your first time running the commands, it may take a few minutes for your system to download all the required Docker Images from Docker Hub.


git clone --depth 1 --branch rabbitmq \
https://github.com/garystafford/voter-service.git
cd voter-service/scripts-services
sh ./stack_deploy_local.sh

If everything was deployed successfully, you should see the following output. You should observe five running Docker containers.


$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8ef4866984c3 garystafford/voter-api-gateway:rabbitmq "/docker-entrypoin…" 25 hours ago Up 25 hours 0.0.0.0:8080->8080/tcp voterstack_voter-api-gateway_1
cc28d084ab17 garystafford/candidate-service:rabbitmq "java -Dspring.pro…" 25 hours ago Up 25 hours 0.0.0.0:8097->8080/tcp voterstack_candidate_1
e4c22258b77b garystafford/voter-service:rabbitmq "java -Dspring.pro…" 25 hours ago Up 25 hours 0.0.0.0:8099->8080/tcp voterstack_voter_1
fdb4b9f58a53 rabbitmq:management-alpine "docker-entrypoint…" 25 hours ago Up 25 hours 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp voterstack_rabbitmq_1
1678227b143c mongo:latest "docker-entrypoint…" 25 hours ago Up 25 hours 0.0.0.0:27017->27017/tcp voterstack_mongodb_1

Using Voter API

The Voter Service and Candidate Service GitHub repositories both contain README files, which detail all the API endpoints each service exposes, and how to call them.

In addition to casting votes for candidates, the Voter service has the ability to simulate election results. By calling a /simulation endpoint, and indicating the desired election, the Voter service will randomly generate a number of votes for each candidate in that election. This will save us the burden of casting votes for this demonstration. However, the Voter service has no knowledge of elections or candidates. To obtain a list of candidates, the Voter service depends on the Candidate service.
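As a rough illustration of what such a simulation involves, the sketch below generates a random number of votes for each candidate in a list. It is a minimal, in-memory sketch and not the Voter service’s actual code: the class name VoteSimulationSketch, the simulateVotes method, and the vote bounds are all assumptions made for this example, and the real service writes vote documents to MongoDB rather than returning a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class VoteSimulationSketch {

    // Generates between minVotes and maxVotes (inclusive) votes for each candidate.
    // Each vote is represented here simply by the candidate's name.
    public static List<String> simulateVotes(List<String> candidates,
                                             int minVotes, int maxVotes, Random random) {
        List<String> votes = new ArrayList<>();
        for (String candidate : candidates) {
            int count = minVotes + random.nextInt(maxVotes - minVotes + 1);
            for (int i = 0; i < count; i++) {
                votes.add(candidate);
            }
        }
        return votes;
    }

    public static void main(String[] args) {
        // Hypothetical input; in the post, this list comes from the Candidate service.
        List<String> candidates = List.of("Jill Stein", "Gary Johnson");
        List<String> votes = simulateVotes(candidates, 2, 20, new Random());
        System.out.println(votes.size() + " votes generated");
    }
}
```

The important point is the input: the list of candidates has to come from somewhere, which is why the Voter service needs the Candidate service.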

The Candidate service manages electoral candidates, their political affiliation, and the election in which they are running. Like the Voter service, the Candidate service also has a /simulation endpoint. The service will create a list of candidates based on the 2012 and 2016 US Presidential Elections. The simulation capability of the service saves us the burden of inputting candidates for this demonstration.

REST HTTP Endpoint

The Voter service exposes two almost identical endpoints. Both endpoints generate random votes. However, under the covers, the two endpoints are very different. Calling the /voter/simulation/http/{election} endpoint prompts the Voter service to request a list of candidates from the Candidate service, based on the election parameter you input. This request is made using synchronous REST HTTP. The Voter service uses the HTTP GET method to request the data from the Candidate service. The Voter service then waits for a response.

The HTTP request is received by the Candidate service. The Candidate service responds to the Voter service with a list of candidates, in JSON format. The Voter service receives the response containing the list of candidates. The Voter service then proceeds to generate a random number of votes for each candidate. Finally, each new vote object (MongoDB document) is written back to the vote collection in the Voter service’s voter database.

Message Queue Diagram 1D

Message-based RPC Endpoint

Similarly, calling the /voter/simulation/rpc/{election} endpoint with a specific election prompts the Voter service to request the same list of candidates. However, this time, the Voter service (the client) produces a request message and places it in RabbitMQ’s voter.rpc.requests queue. The Voter service then waits for a response. The Voter service has no direct dependency on the Candidate service. It only depends on a response to its message request. In this way, it is still a form of synchronous IPC, but the Voter service is now decoupled from the Candidate service.

The request message is consumed by the Candidate service (the server), which is listening to that queue. In response, the Candidate service produces a message containing the list of candidates, serialized to JSON. The Candidate service (the server) sends a response back to the Voter service (the client), through RabbitMQ. This is done either using the Direct reply-to feature of RabbitMQ or using a unique response queue, specified in the reply-to header of the request message sent by the Voter service.

According to RabbitMQ, ‘the direct reply-to feature allows RPC clients to receive replies directly from their RPC server, without going through a reply queue. (“Directly” here still means going through AMQP and the RabbitMQ server; there is no separate network connection between RPC client and RPC server.)’

According to Spring, ‘starting with version 3.4.0, the RabbitMQ server now supports Direct reply-to; this eliminates the main reason for a fixed reply queue (to avoid the need to create a temporary queue for each request). Starting with Spring AMQP version 1.4.1 Direct reply-to will be used by default (if supported by the server) instead of creating temporary reply queues. When no replyQueue is provided (or it is set with the name amq.rabbitmq.reply-to), the RabbitTemplate will automatically detect whether Direct reply-to is supported and use it, or fall back to using a temporary reply queue. When using Direct reply-to, a reply-listener is not required and should not be configured.’ We are using the latest versions of both RabbitMQ and Spring AMQP, which should support Direct reply-to.

The Voter service receives the message containing the list of candidates. The Voter service deserializes the JSON payload to Candidate objects and proceeds to generate a random number of votes for each candidate in the list. Finally, each new vote object (MongoDB document) is written back to the vote collection in the Voter service’s voter database.

Message Queue Diagram 2D

Exploring the RPC Code

We will not examine the REST HTTP IPC code in this post. Instead, we will explore the RPC code. You are welcome to download the source code and explore the REST HTTP code pattern; it uses some advanced features of Spring Boot and Spring Data.

Spring Dependencies

In order to use RabbitMQ, we need to add a project dependency on org.springframework.boot.spring-boot-starter-amqp. Below is a snippet from the Candidate service’s build.gradle file, showing project dependencies. The Voter service’s dependencies are identical.


dependencies {
    compile group: 'org.springframework.boot', name: 'spring-boot-actuator-docs'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-actuator'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-amqp'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-mongodb'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-rest'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-hateoas'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-logging'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-web'
    compile group: 'org.webjars', name: 'hal-browser'
    testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-test'
}


AMQP Configuration

Next, we need to add a small amount of RabbitMQ AMQP configuration to both services. We accomplish this by using Spring’s @Configuration annotation on our configuration classes. Below is the configuration class for the Voter service.


package com.voterapi.voter.configuration;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class VoterConfig {

    // The exchange to which the Voter service publishes its RPC requests
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("voter.rpc");
    }
}

And here, the configuration class for the Candidate service.


package com.voterapi.candidate.configuration;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CandidateConfig {

    // The queue on which the Candidate service listens for RPC requests
    @Bean
    public Queue queue() {
        return new Queue("voter.rpc.requests");
    }

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange("voter.rpc");
    }

    // Routes messages published to the exchange with routing key "rpc" to the queue
    @Bean
    public Binding binding(DirectExchange exchange, Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("rpc");
    }
}
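To see what the exchange, queue, and binding do together, here is a minimal, in-memory model of direct-exchange routing, written with only the Java standard library. It is a sketch of the concept, not how RabbitMQ or Spring AMQP is implemented; the class DirectExchangeSketch and its bind and publish methods are hypothetical names. The comments map each object to the corresponding name from the configuration classes above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class DirectExchangeSketch {

    // Routing key -> queues bound to this exchange with that key
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    // Binds a queue to this exchange under the given routing key.
    public void bind(Queue<String> queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queue);
    }

    // Delivers the message to every queue whose binding key equals the routing key.
    // Returns the number of queues the message was routed to.
    public int publish(String routingKey, String message) {
        List<Queue<String>> targets = bindings.getOrDefault(routingKey, List.of());
        for (Queue<String> queue : targets) {
            queue.add(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        DirectExchangeSketch voterRpc = new DirectExchangeSketch(); // plays the role of "voter.rpc"
        Queue<String> requests = new ArrayDeque<>();                 // plays the role of "voter.rpc.requests"
        voterRpc.bind(requests, "rpc");

        voterRpc.publish("rpc", "2016 Presidential Election"); // routed to the queue
        voterRpc.publish("other", "not routed");               // no matching binding
        System.out.println(requests);
    }
}
```

The publisher only needs the exchange name and routing key, which is why the Voter service’s configuration declares the exchange but neither the queue nor the binding.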

Voter Service Code

With the dependencies and configuration in place, we define the method in the Voter service, which will request the candidates from the Candidate service, using RabbitMQ. Below is an abridged version of the Voter service’s CandidateListService class, containing the getCandidatesMessageRpc method. This method calls the rabbitTemplate.convertSendAndReceive method (see line 5, below).


public List<CandidateVoterView> getCandidatesMessageRpc(String election) {
    logger.debug("Sending RPC request message for list of candidates…");
    String requestMessage = election;
    String candidates = (String) rabbitTemplate.convertSendAndReceive(
            directExchange.getName(), "rpc", requestMessage);
    // convertSendAndReceive returns null if no reply arrives before the reply timeout
    if (candidates == null) {
        logger.error("No RPC reply received for election {}", election);
        return Collections.emptyList();
    }
    TypeReference<Map<String, List<CandidateVoterView>>> mapType =
            new TypeReference<Map<String, List<CandidateVoterView>>>() {};
    ObjectMapper objectMapper = new ObjectMapper();
    Map<String, List<CandidateVoterView>> candidatesMap;
    try {
        candidatesMap = objectMapper.readValue(candidates, mapType);
    } catch (IOException e) {
        // Return early; otherwise the lookup below would fail on a null map
        logger.error("Unable to deserialize list of candidates", e);
        return Collections.emptyList();
    }
    List<CandidateVoterView> candidatesList =
            candidatesMap.getOrDefault("candidates", Collections.emptyList());
    logger.debug("List of {} candidates received…", candidatesList.size());
    return candidatesList;
}

Candidate Service Code

Next, we define a method in the Candidate service, which will process the Voter service’s request. Below is an abridged version of the CandidateController class, containing the getCandidatesMessageRpc method. This method is decorated with Spring’s @RabbitListener annotation (see line 1, below). This annotation marks the method to be the target of a Rabbit message listener on the voter.rpc.requests queue.

Also shown, are the getCandidatesMessageRpc method’s two helper methods, getByElection and serializeToJson. These methods query MongoDB for the list of candidates and serialize the list to JSON.


@RabbitListener(queues = "voter.rpc.requests")
private String getCandidatesMessageRpc(String requestMessage) {
    logger.debug("Request message: {}", requestMessage);
    logger.debug("Sending RPC response message with list of candidates…");
    List<CandidateVoterView> candidates = getByElection(requestMessage);
    return serializeToJson(candidates);
}

// Queries MongoDB for the candidates in the given election, sorted by last name
private List<CandidateVoterView> getByElection(String election) {
    Aggregation aggregation = Aggregation.newAggregation(
            Aggregation.match(Criteria.where("election").is(election)),
            project("firstName", "lastName", "politicalParty", "election")
                    .andExpression("concat(firstName,' ', lastName)")
                    .as("fullName"),
            sort(Sort.Direction.ASC, "lastName")
    );
    AggregationResults<CandidateVoterView> groupResults
            = mongoTemplate.aggregate(aggregation, Candidate.class, CandidateVoterView.class);
    return groupResults.getMappedResults();
}

// Wraps the list in a map keyed by "candidates" and serializes it to JSON
private String serializeToJson(List<CandidateVoterView> candidates) {
    ObjectMapper mapper = new ObjectMapper();
    String jsonInString = "";
    final Map<String, List<CandidateVoterView>> dataMap = new HashMap<>();
    dataMap.put("candidates", candidates);
    try {
        jsonInString = mapper.writeValueAsString(dataMap);
    } catch (JsonProcessingException e) {
        logger.info(String.valueOf(e));
    }
    logger.debug(jsonInString);
    return jsonInString;
}
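If the MongoDB aggregation in getByElection is unfamiliar, the following sketch expresses roughly the same match, project, and sort steps over an in-memory list using Java streams. It is only an analogy, assuming a simplified Candidate class and a hypothetical fullNamesByElection method; it returns full names as plain strings instead of CandidateVoterView objects, and it sorts before projecting because the simplified projection drops lastName.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class CandidateQuerySketch {

    // Simplified stand-in for the Candidate document (hypothetical shape).
    public static class Candidate {
        final String firstName;
        final String lastName;
        final String politicalParty;
        final String election;

        public Candidate(String firstName, String lastName, String politicalParty, String election) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.politicalParty = politicalParty;
            this.election = election;
        }
    }

    public static List<String> fullNamesByElection(List<Candidate> candidates, String election) {
        return candidates.stream()
                .filter(c -> c.election.equals(election))                  // match on election
                .sorted(Comparator.comparing((Candidate c) -> c.lastName)) // sort by lastName, ascending
                .map(c -> c.firstName + " " + c.lastName)                  // project fullName via concat
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Candidate> all = List.of(
                new Candidate("Donald", "Trump", "Republican Party", "2016 Presidential Election"),
                new Candidate("Hillary", "Clinton", "Democratic Party", "2016 Presidential Election"),
                new Candidate("Barack", "Obama", "Democratic Party", "2012 Presidential Election"));
        System.out.println(fullNamesByElection(all, "2016 Presidential Election"));
    }
}
```

In the actual service, this work is pushed down to MongoDB, so only the matching, already-sorted documents are returned to the application.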

Demonstration

To demonstrate both the synchronous REST HTTP IPC code and the Spring AMQP-based RPC IPC code, we will make a few REST HTTP calls to the Voter API Gateway. For convenience, I have provided a shell script, demostrate_ipc.sh, which executes all the API calls necessary. I have added sleep commands to slow the output to the terminal down a bit, for easier analysis. The script requires HTTPie, a great time saver when working with RESTful services.

The demostrate_ipc.sh script does three things. First, it calls the Candidate service to generate a group of sample candidates. Next, the script calls the Voter service to simulate votes, using synchronous REST HTTP. Lastly, the script repeats the voter simulation, this time using message-based RPC IPC. All API calls are done through the Voter API Gateway on port 8080. To understand the API calls, examine the script, below.


#!/bin/sh
# Demonstrate API calls for REST HTTP IPC and RPC IPC via API Gateway
# Requires HTTPie
# Requires all services are running
set -e
HOST=${1:-localhost:8080}
API_GATEWAY="http://${HOST}"
ELECTION="2016%20Presidential%20Election"
echo "Simulating candidates…"
http ${API_GATEWAY}/candidate/simulation && sleep 2
http ${API_GATEWAY}/candidate/candidates/summary/${ELECTION} && sleep 2
echo "Simulating voting using REST HTTP IPC…"
http ${API_GATEWAY}/voter/simulation/http/${ELECTION} && sleep 2
http ${API_GATEWAY}/voter/results && sleep 4
http ${API_GATEWAY}/voter/winners && sleep 2
echo "Simulating voting using message-based RPC IPC…"
http ${API_GATEWAY}/voter/simulation/rpc/${ELECTION} && sleep 2
http ${API_GATEWAY}/voter/results && sleep 4
http ${API_GATEWAY}/voter/winners && sleep 2
echo "Script completed…"
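For readers who would rather drive the API from Java than from a shell script, the sketch below shows one way to build the same gateway URLs. The class GatewayUriSketch and the gatewayUri method are hypothetical names; the host and path are taken from the script above. The example relies on the multi-argument java.net.URI constructor, which percent-encodes characters such as spaces, so the election name does not need to be encoded by hand as it is in the script’s ELECTION variable.

```java
import java.net.URI;
import java.net.URISyntaxException;

public class GatewayUriSketch {

    // Builds an http URI for the API Gateway, percent-encoding illegal
    // characters (such as spaces) that appear in the path.
    public static URI gatewayUri(String host, String path) {
        try {
            return new URI("http", host, path, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid gateway URI: " + host + path, e);
        }
    }

    public static void main(String[] args) {
        URI uri = gatewayUri("localhost:8080",
                "/voter/simulation/rpc/2016 Presidential Election");
        System.out.println(uri.toASCIIString());
    }
}
```

The resulting URI could then be passed to an HTTP client of your choice, for example the java.net.http.HttpClient included with Java 11 and later.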

Below is the list of candidates for the 2016 Presidential Election, generated by the Candidate service. The JSON payload was retrieved using the Voter service’s /voter/candidates/rpc/{election} endpoint. This endpoint uses the same RPC IPC method as the Voter service’s /voter/simulation/rpc/{election} endpoint.


HTTP/1.1 200
Access-Control-Allow-Credentials: true
Access-Control-Allow-Headers: Content-Type, Accept, X-Requested-With, remember-me
Access-Control-Allow-Methods: POST, GET, OPTIONS, DELETE
Access-Control-Max-Age: 3600
Content-Type: application/json;charset=UTF-8
Date: Sun, 07 May 2017 19:10:22 GMT
Transfer-Encoding: chunked
X-Application-Context: Voter Service:docker-local:8099

{
    "candidates": [
        {
            "election": "2016 Presidential Election",
            "fullName": "Darrell Castle",
            "politicalParty": "Constitution Party"
        },
        {
            "election": "2016 Presidential Election",
            "fullName": "Hillary Clinton",
            "politicalParty": "Democratic Party"
        },
        {
            "election": "2016 Presidential Election",
            "fullName": "Gary Johnson",
            "politicalParty": "Libertarian Party"
        },
        {
            "election": "2016 Presidential Election",
            "fullName": "Chris Keniston",
            "politicalParty": "Veterans Party"
        },
        {
            "election": "2016 Presidential Election",
            "fullName": "Jill Stein",
            "politicalParty": "Green Party"
        },
        {
            "election": "2016 Presidential Election",
            "fullName": "Donald Trump",
            "politicalParty": "Republican Party"
        }
    ]
}

Based on the list of candidates, below are the simulated election results. This JSON payload was retrieved using the Voter service’s /voter/results endpoint.


HTTP/1.1 200
Access-Control-Allow-Credentials: true
Access-Control-Allow-Headers: Content-Type, Accept, X-Requested-With, remember-me
Access-Control-Allow-Methods: POST, GET, OPTIONS, DELETE
Access-Control-Max-Age: 3600
Content-Type: application/json;charset=UTF-8
Date: Sun, 07 May 2017 19:42:42 GMT
Transfer-Encoding: chunked
X-Application-Context: Voter Service:docker-local:8099

{
    "results": [
        {
            "candidate": "Jill Stein",
            "votes": 19
        },
        {
            "candidate": "Gary Johnson",
            "votes": 16
        },
        {
            "candidate": "Hillary Clinton",
            "votes": 12
        },
        {
            "candidate": "Donald Trump",
            "votes": 12
        },
        {
            "candidate": "Chris Keniston",
            "votes": 10
        },
        {
            "candidate": "Darrell Castle",
            "votes": 9
        }
    ]
}
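A result set like the one above is essentially a tally of votes per candidate, ordered by vote count. The sketch below shows that idea over an in-memory list using Java streams. It is an illustration only: the class VoteTallySketch and its tally method are hypothetical, each vote is reduced to a candidate name, and the alphabetical tie-break is an assumption of this example. How the Voter service actually computes its results is not shown in this post.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class VoteTallySketch {

    // Counts votes per candidate and returns them ordered by count, highest first.
    // Ties are broken alphabetically by candidate name (an arbitrary choice for this sketch).
    public static Map<String, Long> tally(List<String> votes) {
        Map<String, Long> counts = votes.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (first, second) -> first,
                        LinkedHashMap::new)); // preserves the sorted order
    }

    public static void main(String[] args) {
        List<String> votes = List.of("Jill Stein", "Gary Johnson", "Jill Stein");
        System.out.println(tally(votes));
    }
}
```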

RabbitMQ Management Console

The easiest way to observe what is happening with our messages is to use the RabbitMQ Management Console. To access the console, point your web browser to localhost, on port 15672. The default login credentials for the console are guest/guest.

As you successfully send and receive messages between the services through RabbitMQ, you should see activity on the Overview tab. In addition, you should see a number of Connections, Channels, Exchanges, Queues, and Consumers.

RabbitMQ_Screen_3

In the Queues tab, you should find a single queue, the voter.rpc.requests queue. This queue was configured in the Candidate service’s configuration class, shown previously.

RabbitMQ_Screen_2

In the Exchanges tab, you should see one exchange, voter.rpc, which we configured as a DirectExchange bean in both the Voter and Candidate services’ configuration classes. Also visible in the Exchanges tab should be the routing key, rpc, which we configured as a Binding bean in the Candidate service’s configuration class.

The route binds the exchange to the voter.rpc.requests queue. If you recall Spring’s description, AMQP has exchanges (DirectExchange), routes (Binding), and queues (Queue). Messages are first published to exchanges. Routes define on which queue(s) to pipe the message. Consumers subscribing to that queue then receive a copy of the message.

RabbitMQ_Screen_1

In the Channels tab, you should note two connections, one for each of the single instances of the Voter and Candidate services. Likewise, there are two channels, one for each service. You can differentiate the channels by the presence of the consumer tag. The consumer tag, in this example, amq.ctag-Anv7GXs7ZWVoznO64euyjQ, uniquely identifies the consumer. In this example, the consumer is the Candidate service, which listens on the voter.rpc.requests queue; the same tag appears in the Candidate service’s logs. For a more complete explanation of the consumer tag, check out RabbitMQ’s AMQP documentation.

RabbitMQ_Screen_4.png

Message Structure

Messages cannot be viewed directly in the RabbitMQ Management Console. One way I have found to view messages is to use your IDE’s debugger. Below, I have added a breakpoint on the Candidate service’s getCandidatesMessageRpc method, using IntelliJ IDEA. You can view the Voter service’s request message, as it is received by the Candidate service.

Debug_RPC_Message.png

Note the message payload, the requested election. Note the twelve message header elements. The headers include the AMQP exchange, queue, and binding. The message headers also include the consumer tag. The message also uniquely identifies the reply-to queue to use, if the server does not support Direct reply-to (see earlier explanation).

Service Logs

In addition to the RabbitMQ Management Console, we can observe communications between the two services by looking at the Voter and Candidate services’ logs. I have grabbed a snippet of both services’ logs and added a few comments to show where different processes are being executed. First, the Voter service logs.


# API request is made
2017-05-03 21:10:32.947 DEBUG 1 — [nio-8099-exec-3] s.w.s.m.m.a.RequestMappingHandlerMapping : Looking up handler method for path /simulation/rpc/2016 Presidential Election
2017-05-03 21:10:32.962 DEBUG 1 — [nio-8099-exec-3] s.w.s.m.m.a.RequestMappingHandlerMapping : Returning handler method [public org.springframework.http.ResponseEntity<java.util.Map<java.lang.String, java.lang.String>> com.voter_api.voter.controller.VoterController.getSimulationRpc(java.lang.String)]
2017-05-03 21:10:32.967 DEBUG 1 — [nio-8099-exec-3] o.s.b.f.s.DefaultListableBeanFactory : Returning cached instance of singleton bean 'voterController'
2017-05-03 21:10:32.969 DEBUG 1 — [nio-8099-exec-3] o.s.web.servlet.DispatcherServlet : Last-Modified value for [/voter/simulation/rpc/2016%20Presidential%20Election] is: -1
# Clearing out previous MongoDB data
2017-05-03 21:10:32.977 DEBUG 1 — [nio-8099-exec-3] o.s.data.mongodb.core.MongoDbUtils : Getting Mongo Database name=[voter]
2017-05-03 21:10:32.980 DEBUG 1 — [nio-8099-exec-3] o.s.data.mongodb.core.MongoTemplate : Remove using query: { } in collection: vote.
2017-05-03 21:10:32.985 DEBUG 1 — [nio-8099-exec-3] org.mongodb.driver.protocol.delete : Deleting documents from namespace voter.vote on connection [connectionId{localValue:2, serverValue:4}] to server mongodb:27017
2017-05-03 21:10:32.990 DEBUG 1 — [nio-8099-exec-3] org.mongodb.driver.protocol.delete : Delete completed
# Publishing request message to queue
2017-05-03 21:10:32.999 DEBUG 1 — [nio-8099-exec-3] o.s.amqp.rabbit.core.RabbitTemplate : Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@172.20.0.2:5672/,1), conn: Proxy@247be51c Shared Rabbit Connection: SimpleConnection@61797757 [delegate=amqp://guest@172.20.0.2:5672/, localPort= 57908]
2017-05-03 21:10:33.018 DEBUG 1 — [nio-8099-exec-3] o.s.amqp.rabbit.core.RabbitTemplate : Publishing message on exchange [voter.rpc], routingKey = [rpc]
# Receiving response
2017-05-03 21:10:33.109 DEBUG 1 — [nio-8099-exec-3] o.s.amqp.rabbit.core.RabbitTemplate : Reply: (Body:'[{"fullName":"Darrell Castle","politicalParty":"Constitution Party","election":"2016 Presidential Election"},{"fullName":"Hillary Clinton","politicalParty":"Democratic Party","election":"2016 Presidential Election"},{"fullName":"Gary Johnson","politicalParty":"Libertarian Party","election":"2016 Presidential Election"},{"fullName":"Chris Keniston","politicalParty":"Veterans Party","election":"2016 Presidential Election"},{"fullName":"Jill Stein","politicalParty":"Green Party","election":"2016 Presidential Election"},{"fullName":"Donald Trump","politicalParty":"Republican Party","election":"2016 Presidential Election"}]' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=amq.rabbitmq.reply-to.g2dkAA9yYWJiaXRAcmFiYml0bXEAAAH3AAAAAAI=.GREaYm1ow+4nMWzSClXlfQ==, receivedDelay=null, deliveryTag=1, messageCount=null, consumerTag=null, consumerQueue=null])
# Inserting simulation data into MongoDB
2017-05-03 21:10:33.154 DEBUG 1 — [nio-8099-exec-3] o.s.data.mongodb.core.MongoTemplate : Inserting list of DBObjects containing 34 items
2017-05-03 21:10:33.154 DEBUG 1 — [nio-8099-exec-3] o.s.data.mongodb.core.MongoDbUtils : Getting Mongo Database name=[voter]
2017-05-03 21:10:33.157 DEBUG 1 — [nio-8099-exec-3] org.mongodb.driver.protocol.insert : Inserting 34 documents into namespace voter.vote on connection [connectionId{localValue:2, serverValue:4}] to server mongodb:27017
2017-05-03 21:10:33.169 DEBUG 1 — [nio-8099-exec-3] org.mongodb.driver.protocol.insert : Insert completed
# Sending response to API call
2017-05-03 21:10:33.180 DEBUG 1 — [nio-8099-exec-3] o.s.b.f.s.DefaultListableBeanFactory : Returning cached instance of singleton bean 'org.springframework.boot.actuate.autoconfigure.EndpointWebMvcHypermediaManagementContextConfiguration$ActuatorEndpointLinksAdvice'
2017-05-03 21:10:33.182 DEBUG 1 — [nio-8099-exec-3] o.s.w.s.m.m.a.HttpEntityMethodProcessor : Written [{message=Simulation data created using RPC!}] as "application/json" using [org.springframework.http.converter.json.MappingJackson2HttpMessageConverter@387a8303]
2017-05-03 21:10:33.185 DEBUG 1 — [nio-8099-exec-3] o.s.web.servlet.DispatcherServlet : Null ModelAndView returned to DispatcherServlet with name 'dispatcherServlet': assuming HandlerAdapter completed request handling
2017-05-03 21:10:33.186 DEBUG 1 — [nio-8099-exec-3] o.s.web.servlet.DispatcherServlet : Successfully completed request


Next, the Candidate service logs.


# Listening for messages
2017-05-03 21:10:30.000 DEBUG 1 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Retrieving delivery for Consumer@662706a7: tags=[{amq.ctag-Anv7GXs7ZWVoznO64euyjQ=voter.rpc.requests}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@172.20.0.2:5672/,1), conn: Proxy@6f92666f Shared Rabbit Connection: SimpleConnection@6badffc2 [deleg
2017-05-03 21:10:31.001 DEBUG 1 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Retrieving delivery for Consumer@662706a7: tags=[{amq.ctag-Anv7GXs7ZWVoznO64euyjQ=voter.rpc.requests}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@172.20.0.2:5672/,1), conn: Proxy@6f92666f Shared Rabbit Connection: SimpleConnection@6badffc2 [delegate=amqp://guest@172.20.0.2:5672/, localPort= 33932], acknowledgeMode=AUTO local queue size=0
2017-05-03 21:10:32.003 DEBUG 1 — [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Retrieving delivery for Consumer@662706a7: tags=[{amq.ctag-Anv7GXs7ZWVoznO64euyjQ=voter.rpc.requests}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@172.20.0.2:5672/,1), conn: Proxy@6f92666f Shared Rabbit Connection: SimpleConnection@6badffc2 [delegate=amqp://guest@172.20.0.2:5672/, localPort= 33932], acknowledgeMode=AUTO local queue size=0
2017-05-03 21:10:33.005 DEBUG 1 — [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Retrieving delivery for Consumer@662706a7: tags=[{amq.ctag-Anv7GXs7ZWVoznO64euyjQ=voter.rpc.requests}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@172.20.0.2:5672/,1), conn: Proxy@6f92666f Shared Rabbit Connection: SimpleConnection@6badffc2 [delegate=amqp://guest@172.20.0.2:5672/, localPort= 33932], acknowledgeMode=AUTO local queue size=0
# Retrieving message
2017-05-03 21:10:33.044 DEBUG 1 — [pool-1-thread-5] o.s.a.r.listener.BlockingQueueConsumer : Storing delivery for Consumer@662706a7: tags=[{amq.ctag-Anv7GXs7ZWVoznO64euyjQ=voter.rpc.requests}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@172.20.0.2:5672/,1), conn: Proxy@6f92666f Shared Rabbit Connection: SimpleConnection@6badffc2 [delegate=amqp://guest@172.20.0.2:5672/, localPort= 33932], acknowledgeMode=AUTO local queue size=0
2017-05-03 21:10:33.049 DEBUG 1 — [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Received message: (Body:'2016 Presidential Election' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=amq.rabbitmq.reply-to.g2dkAA9yYWJiaXRAcmFiYml0bXEAAAH3AAAAAAI=.GREaYm1ow+4nMWzSClXlfQ==, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=voter.rpc, receivedRoutingKey=rpc, receivedDelay=null, deliveryTag=14, messageCount=0, consumerTag=amq.ctag-Anv7GXs7ZWVoznO64euyjQ, consumerQueue=voter.rpc.requests])
2017-05-03 21:10:33.054 DEBUG 1 — [cTaskExecutor-1] .a.r.l.a.MessagingMessageListenerAdapter : Processing [GenericMessage [payload=2016 Presidential Election, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=rpc, amqp_contentEncoding=UTF-8, amqp_receivedExchange=voter.rpc, amqp_deliveryTag=14, amqp_replyTo=amq.rabbitmq.reply-to.g2dkAA9yYWJiaXRAcmFiYml0bXEAAAH3AAAAAAI=.GREaYm1ow+4nMWzSClXlfQ==, amqp_consumerQueue=voter.rpc.requests, amqp_redelivered=false, id=bbd84286-fae6-36e2-f5e8-d8d9714cde6c, amqp_consumerTag=amq.ctag-Anv7GXs7ZWVoznO64euyjQ, contentType=text/plain, timestamp=1493845833053}]]
2017-05-03 21:10:33.057 DEBUG 1 — [cTaskExecutor-1] c.v.c.controller.CandidateController : Request message: 2016 Presidential Election
# Querying MongoDB for candidates
2017-05-03 21:10:33.063 DEBUG 1 — [cTaskExecutor-1] o.s.data.mongodb.core.MongoTemplate : Executing aggregation: { "aggregate" : "candidate" , "pipeline" : [ { "$match" : { "election" : "2016 Presidential Election"}} , { "$project" : { "firstName" : 1 , "lastName" : 1 , "politicalParty" : 1 , "election" : 1 , "fullName" : { "$concat" : [ "$firstName" , " " , "$lastName"]}}} , { "$sort" : { "lastName" : 1}}]}
2017-05-03 21:10:33.063 DEBUG 1 — [cTaskExecutor-1] o.s.data.mongodb.core.MongoDbUtils : Getting Mongo Database name=[candidates]
2017-05-03 21:10:33.064 DEBUG 1 — [cTaskExecutor-1] org.mongodb.driver.protocol.command : Sending command {aggregate : BsonString{value='candidate'}} to database candidates on connection [connectionId{localValue:2, serverValue:3}] to server mongodb:27017
2017-05-03 21:10:33.067 DEBUG 1 — [cTaskExecutor-1] org.mongodb.driver.protocol.command : Command execution completed
# Responding to queue with results
2017-05-03 21:10:33.094 DEBUG 1 — [cTaskExecutor-1] .a.r.l.a.MessagingMessageListenerAdapter : Listener method returned result [[{"fullName":"Darrell Castle","politicalParty":"Constitution Party","election":"2016 Presidential Election"},{"fullName":"Hillary Clinton","politicalParty":"Democratic Party","election":"2016 Presidential Election"},{"fullName":"Gary Johnson","politicalParty":"Libertarian Party","election":"2016 Presidential Election"},{"fullName":"Chris Keniston","politicalParty":"Veterans Party","election":"2016 Presidential Election"},{"fullName":"Jill Stein","politicalParty":"Green Party","election":"2016 Presidential Election"},{"fullName":"Donald Trump","politicalParty":"Republican Party","election":"2016 Presidential Election"}]] – generating response message for it
2017-05-03 21:10:33.096 DEBUG 1 — [cTaskExecutor-1] .a.r.l.a.MessagingMessageListenerAdapter : Publishing response to exchange = [], routingKey = [amq.rabbitmq.reply-to.g2dkAA9yYWJiaXRAcmFiYml0bXEAAAH3AAAAAAI=.GREaYm1ow+4nMWzSClXlfQ==]
# Returning to listening for messages
2017-05-03 21:10:33.123 DEBUG 1 — [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Retrieving delivery for Consumer@662706a7: tags=[{amq.ctag-Anv7GXs7ZWVoznO64euyjQ=voter.rpc.requests}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@172.20.0.2:5672/,1), conn: Proxy@6f92666f Shared Rabbit Connection: SimpleConnection@6badffc2 [delegate=amqp://guest@172.20.0.2:5672/, localPort= 33932], acknowledgeMode=AUTO local queue size=0
2017-05-03 21:10:34.125 DEBUG 1 — [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Retrieving delivery for Consumer@662706a7: tags=[{amq.ctag-Anv7GXs7ZWVoznO64euyjQ=voter.rpc.requests}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@172.20.0.2:5672/,1), conn: Proxy@6f92666f Shared Rabbit Connection: SimpleConnection@6badffc2 [delegate=amqp://guest@172.20.0.2:5672/, localPort= 33932], acknowledgeMode=AUTO local queue size=0
2017-05-03 21:10:35.126 DEBUG 1 — [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Retrieving delivery for Consumer@662706a7: tags=[{amq.ctag-Anv7GXs7ZWVoznO64euyjQ=voter.rpc.requests}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@172.20.0.2:5672/,1), conn: Proxy@6f92666f Shared Rabbit Connection: SimpleConnection@6badffc2 [delegate=amqp://guest@172.20.0.2:5672/, localPort= 33932], acknowledgeMode=AUTO local queue size=0

Performance

What about the performance of Spring AMQP RPC IPC versus REST HTTP IPC? RabbitMQ has proven to be very performant, having been clocked at one million messages per second on GCE. I performed a series of fairly ‘unscientific’ performance tests, completing 250, 500, and then 1,000 requests. The tests were performed on a six-node Docker Swarm cluster with three instances of each service in a round-robin load-balanced configuration, and a single instance of RabbitMQ. The scripts to create the swarm cluster can be found in the Voter service GitHub project.

Based on consistent test results, the speed of the two methods was almost identical. Both methods delivered between 3.1 and 3.2 responses per second. For example, the Spring AMQP RPC IPC method successfully completed 1,000 requests in 5 minutes and 11 seconds, while the REST HTTP IPC method successfully completed 1,000 requests in 5 minutes and 18 seconds, 7 seconds slower than the RPC method.
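As a quick sanity check on those figures, the throughput can be recomputed from the two reported run times. The following is only a small sketch; the class and method names are made up for illustration and are not part of either service.

```java
import java.time.Duration;

// Hypothetical helper, not part of the Voter or Candidate service.
final class ThroughputSketch {

    // Responses per second for a run that completed `requests` in `elapsed`.
    static double responsesPerSecond(int requests, Duration elapsed) {
        return requests / (double) elapsed.getSeconds();
    }

    public static void main(String[] args) {
        Duration rpc = Duration.ofMinutes(5).plusSeconds(11);  // 311 seconds
        Duration rest = Duration.ofMinutes(5).plusSeconds(18); // 318 seconds

        System.out.println("RPC responses/s:  " + responsesPerSecond(1000, rpc));
        System.out.println("REST responses/s: " + responsesPerSecond(1000, rest));
        System.out.println("Difference (s):   " + rest.minus(rpc).getSeconds());
    }
}
```

Both results land at roughly 3.2 and 3.1 responses per second respectively, which matches the range quoted above.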

RabbitMQ on Docker Swarm

There are many variables to consider, which could dramatically impact IPC performance. For example, RabbitMQ was not clustered. Also, we did not use any type of caching, such as Varnish, Memcached, or Redis. Either clustering or caching could dramatically increase IPC performance.

There are also several notable differences between the two methods from a code perspective. The REST HTTP method relies on Spring Data Projection combined with Spring Data MongoDB Repository, to obtain the candidate list from MongoDB. Somewhat differently, the RPC method makes use of Spring Data MongoDB Aggregation to return a list of candidates. Therefore, the test results should be taken with a grain of salt.

Production Considerations

This post demonstrated a simple example of RPC communications between two services using Spring AMQP. In an actual production environment, there are a few things that must be considered, as Pivotal points out:

  • How should either service react on startup if RabbitMQ is not available? What if RabbitMQ fails after the services have started?
  • How should the Voter service (the client) react if there are no Candidate service instances (the server) running?
  • Should the Voter service have a timeout for the RPC response to return? What should happen if the request times out?
  • If the Candidate service malfunctions and raises an exception, should it be forwarded to the Voter service?
  • How does the Voter service protect against invalid incoming messages (e.g., checking bounds of the candidate list) before processing?
  • In all of the above scenarios, what, if any, response is returned to the API end user?
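To make the timeout and exception questions above more concrete, here is a minimal, framework-free sketch of one possible policy: wait a bounded time for the RPC reply, and fall back to a default value if the reply is late or the call fails. It deliberately uses only the JDK, with a plain Supplier standing in for the blocking RPC call. The class name, method names, and the choice of an empty list as the fallback are all assumptions made for illustration, not code from the Voter service. In Spring AMQP itself, RabbitTemplate has a configurable reply timeout, and its send-and-receive methods return null when no reply arrives in time, so a null check plays a similar role there.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch; not part of the Voter or Candidate service.
final class RpcTimeoutSketch {

    // Runs a blocking RPC-style call on another thread and waits at most
    // timeoutMillis for it. If the call is late or throws, the fallback is
    // returned instead. The late call is not cancelled; its result is ignored.
    static <T> T callWithTimeout(Supplier<T> rpcCall, long timeoutMillis, T fallback) {
        return CompletableFuture.supplyAsync(rpcCall)
                .completeOnTimeout(fallback, timeoutMillis, TimeUnit.MILLISECONDS)
                .exceptionally(ex -> fallback)
                .join();
    }

    // Small helper used to simulate a slow reply.
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<String> noCandidates = List.of();

        // Stand-in for a Candidate service that replies promptly.
        List<String> fast = callWithTimeout(
                () -> List.of("Jill Stein", "Gary Johnson"), 1000, noCandidates);

        // Stand-in for a reply that does not arrive within the timeout.
        List<String> slow = callWithTimeout(
                () -> { sleep(2000); return List.of("late reply"); }, 100, noCandidates);

        System.out.println("fast call returned " + fast.size() + " candidates");
        System.out.println("slow call returned " + slow.size() + " candidates");
    }
}
```

Returning an empty list is only one option. Depending on what the API end user should see, the Voter service could instead return an error response or retry the request.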

Conclusion

Although in this post we did not achieve asynchronous inter-process communications, we did achieve a higher level of service decoupling, using message-based RPC IPC. Adopting a message-based, loosely coupled architecture, whether asynchronous or synchronous, wherever possible, will improve the overall functionality and deliverability of a microservices-based platform.

All opinions in this post are my own and not necessarily the views of my current employer or their clients.


  1. #1 by brunozambiazi on February 2, 2019 - 12:45 pm

    Congratulations on this article, very well explained. 🙂

    I’d like to share just one thought: by default, Spring AMQP creates only one consumer per queue, which means that if we perform, say, 100 simultaneous requests to the /rpc endpoint, all of them will be processed sequentially (on the candidates service), one by one, which, depending on the case, could degrade performance drastically.

    This always comes to my mind when I have to decide if I should go through REST vs Direct Reply-To. What do you think about that? How do you handle that kind of situation?

  2. #2 by Vivek on August 6, 2021 - 6:31 am

    Thanks for explaining this in detail.

  3. #3 by Faraz on December 29, 2021 - 3:00 am

    Thanks for a great article!
