Big Data Analytics with Java and Python, using Cloud Dataproc, Google’s Fully-Managed Spark and Hadoop Service

Introduction

There is little question that big data analytics, data science, artificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last few years. Behind the hype curves and marketing buzz, these technologies are having a significant influence on all aspects of our modern lives.

However, installing, configuring, and managing the technologies that support big data analytics, data science, ML, and AI, at scale and in Production, often demands an advanced level of familiarity with Linux, distributed systems, cloud- and container-based platforms, databases, and data-streaming applications. The mere ability to manage terabytes and petabytes of transient data is beyond the capability of many enterprises, let alone performing analysis of that data.

To ease the burden of implementing these technologies, the three major cloud providers, AWS, Azure, and Google Cloud, all have multiple Big Data Analytics-, AI-, and ML-as-a-Service offerings. In this post, we will explore one such cloud-based product offering in the field of big data analytics, Google Cloud Dataproc. We will focus on Cloud Dataproc’s ability to quickly and efficiently run Spark jobs written in Java and Python, two widely adopted enterprise programming languages.

Technologies

The following technologies are featured prominently in this post.

Google Cloud Dataproc

According to Google, Cloud Dataproc is a fast, easy-to-use, fully-managed cloud service for running the Apache Spark and Apache Hadoop ecosystem on Google Cloud Platform. Dataproc is a complete platform for data processing, analytics, and machine learning. Dataproc offers per-second billing, so you only pay for exactly the resources you consume. Dataproc offers frequently updated and native versions of Apache Spark, Hadoop, Pig, and Hive, as well as other related applications. Dataproc has built-in integrations with other Google Cloud Platform (GCP) services, such as Cloud Storage, BigQuery, Bigtable, Stackdriver Logging, and Stackdriver Monitoring. Dataproc’s clusters are configurable and resizable from three to hundreds of nodes, and each cluster action takes less than 90 seconds on average.

Platform as a Service (PaaS) offerings similar to Dataproc include Amazon Elastic MapReduce (EMR), Microsoft Azure HDInsight, and Qubole Data Service. Qubole is offered on AWS, Azure, and Oracle Cloud Infrastructure (Oracle OCI).

According to Google, Cloud Dataproc and Cloud Dataflow, both part of GCP’s Data Analytics/Big Data Product offerings, can both be used for data processing, and there’s overlap in their batch and streaming capabilities. Cloud Dataflow is a fully-managed service for transforming and enriching data in stream and batch modes. Dataflow uses the Apache Beam SDK to provide developers with Java and Python APIs, similar to Spark.

Apache Spark

According to Apache, Spark is a unified analytics engine for large-scale data processing, used by well-known, modern enterprises, such as Netflix, Yahoo, and eBay. With in-memory speeds up to 100x faster than Hadoop, Apache Spark achieves high performance for static, batch, and streaming data, using a state-of-the-art DAG (Directed Acyclic Graph) scheduler, a query optimizer, and a physical execution engine.

Spark’s polyglot programming model allows users to write applications in Scala, Java, Python, R, and SQL. Spark includes libraries for Spark SQL (DataFrames and Datasets), MLlib (Machine Learning), GraphX (Graph Processing), and DStreams (Spark Streaming). Spark may be run using its standalone cluster mode or on Apache Hadoop YARN, Mesos, and Kubernetes.

PySpark

The Spark Python API, PySpark, exposes the Spark programming model to Python. PySpark is built on top of Spark’s Java API. Data is processed in Python and cached and shuffled in the JVM. According to Apache, Py4J enables Python programs running in a Python interpreter to dynamically access Java objects in a JVM.

Apache Hadoop

According to Apache, the Apache Hadoop project develops open-source software for reliable, scalable, distributed computing. The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. This is a rather modest description of such a significant and transformative project. When we talk about Hadoop, often it is in the context of the project’s well-known modules, which include:

  • Hadoop Common: The common utilities that support the other Hadoop modules
  • Hadoop Distributed File System (HDFS): A distributed file system that provides high-throughput access to application data
  • Hadoop YARN (Yet Another Resource Negotiator): A framework for job scheduling and cluster resource management, also known as ‘Hadoop NextGen’
  • Hadoop MapReduce: A YARN-based system for parallel processing of large datasets
  • Hadoop Ozone: An object store for Hadoop

Based on the Powered by Apache Hadoop list, there are many well-known enterprises and academic institutions using Apache Hadoop. Users include Adobe, eBay, Facebook, Hulu, LinkedIn, and The New York Times.

Spark vs. Hadoop

There are many articles and posts available that delve into the Spark versus Hadoop debate; this post is not one of them. Although both are mature technologies, Spark, the new kid on the block, reached version 1.0.0 in May 2014, whereas Hadoop reached version 1.0.0 earlier, in December 2011. According to Google Trends, interest in both technologies has remained relatively high over the last three years. However, interest in Spark, based on the volume of searches, has been steadily outpacing Hadoop for well over a year now. The in-memory speed of Spark over HDFS-based Hadoop is likely a big differentiator for many users with large or streaming datasets that require near real-time processing.

In this post, all examples are built to run on Spark. This is not meant to suggest Spark is necessarily superior or that Spark runs better on Dataproc than Hadoop. In fact, Dataproc’s implementation of Spark relies on Hadoop’s core YARN technology to run.

Demonstration

To show the capabilities of Cloud Dataproc, we will create both a single-node Dataproc cluster and a three-node cluster, upload Java- and Python-based analytics jobs and data to Google Cloud Storage, and execute the jobs on the Spark cluster. Finally, we will enable monitoring and notifications for the Dataproc clusters and the jobs running on the clusters with Stackdriver. The post will demonstrate the use of the Google Cloud Console, as well as the command line tools in Google’s Cloud SDK, for all tasks.

In this post, we will be uploading and running individual jobs on the Dataproc Spark cluster, as opposed to using the Cloud Dataproc Workflow Templates. According to Google, a Workflow Template is a reusable workflow configuration. It defines a graph of jobs with information on where to run those jobs. Workflow Templates are useful for automating your Dataproc workflows. However, automation is not the primary topic of this post.

Source Code

All open-sourced code for this post can be found on GitHub in two repositories, one for Java with Spark and one for Python with PySpark. Source code samples are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.

Cost

Of course, there is a cost associated with provisioning cloud services. However, if you manage the Google Cloud Dataproc resources prudently, the costs are negligible. Regarding pricing, according to Google, Cloud Dataproc pricing is based on the size of Cloud Dataproc clusters and the duration of time that they run. The size of a cluster is based on the aggregate number of virtual CPUs (vCPUs) across the entire cluster, including the master and worker nodes. The duration of a cluster is the length of time, measured in minutes, between cluster creation and cluster deletion.

Over the course of writing the code for this post, as well as writing the post itself, the entire cost of all the related resources was a minuscule US$7.50. The cost includes creating, running, and deleting more than a dozen Dataproc clusters and uploading and executing approximately 75-100 Spark and PySpark jobs. Given the quick creation time of a cluster, 2 minutes on average or less in this demonstration, there is no reason to leave a cluster running longer than it takes to complete your workloads.
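
As a rough illustration of the pricing model described above, the Dataproc service fee can be approximated from the aggregate vCPU count and the cluster's lifetime. The sketch below is only an estimate under stated assumptions, not Google's billing logic: the function name is hypothetical, the per-vCPU-hour rate is a placeholder parameter that should be checked against the current pricing page, and the separate Compute Engine and Cloud Storage charges are ignored.

```python
def estimate_dataproc_fee(total_vcpus, minutes, rate_per_vcpu_hour=0.01):
    """Approximate the Dataproc service fee in US dollars.

    rate_per_vcpu_hour is an assumed placeholder value. Compute Engine and
    Cloud Storage charges are billed separately and are not included.
    """
    if total_vcpus < 0 or minutes < 0:
        raise ValueError("total_vcpus and minutes must be non-negative")
    hours = minutes / 60.0
    return total_vcpus * hours * rate_per_vcpu_hour


# Three n1-standard-4 nodes (4 vCPUs each) kept alive for 30 minutes.
three_node_vcpus = 3 * 4
fee = estimate_dataproc_fee(three_node_vcpus, minutes=30)
print("Estimated service fee: ${:.2f}".format(fee))
```

Because the fee scales with both cluster size and lifetime, deleting a cluster as soon as its jobs finish is the simplest way to keep the total low.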

Kaggle Datasets

To explore the features of Dataproc, we will use a publicly-available dataset from Kaggle. Kaggle is a wildly-popular open-source resource for datasets used for big-data and ML applications. Their tagline is ‘Kaggle is the place to do data science projects’.

For this demonstration, I chose the IBRD Statement Of Loans Data dataset, from World Bank Financial Open Data, and available on Kaggle. The International Bank for Reconstruction and Development (IBRD) loans are public and publicly guaranteed debt extended by the World Bank Group. IBRD loans are made to, or guaranteed by, countries that are members of IBRD. This dataset contains historical snapshots of the Statement of Loans including the latest available snapshots.

There are two data files available. The Statement of Loans latest available snapshots data file contains 8,713 rows of loan data (~3 MB), ideal for development and testing. The Statement of Loans historic data file contains approximately 750,000 rows of data (~265 MB). Although not exactly ‘big data’, the historic dataset is large enough to sufficiently explore Dataproc. Both IBRD files have an identical schema with 33 columns of data (gist).
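
Before uploading a file, it can be worth confirming its shape locally. The following is a minimal sketch using only Python's standard library; the helper name csv_shape and the three-column sample are made up for illustration and are not the real IBRD schema.

```python
import csv
import io


def csv_shape(lines):
    """Return (data_row_count, column_count) for CSV input with a header row."""
    reader = csv.reader(lines)
    header = next(reader)
    row_count = sum(1 for _ in reader)
    return row_count, len(header)


# Tiny made-up stand-in for a downloaded file. With a real file, pass
# open(path, newline="") instead of the StringIO object.
sample = io.StringIO(
    "Country,Country Code,Interest Rate\n"
    "Country A,AA,3.5\n"
    "Country B,BB,4.0\n"
)
rows, columns = csv_shape(sample)
print(rows, columns)
```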

In this demonstration, both the Java and Python jobs will perform the same simple analysis of the larger historic dataset. For the analysis, we will ascertain the top 25 historic IBRD borrowers; we will determine their total disbursements, current obligations, and the average interest rates they were charged. This simple analysis will be performed using Spark’s SQL capabilities. The results of the analysis, a Spark DataFrame containing 25 Rows, will be saved as a CSV file.

SELECT country, country_code,
       Format_number(total_disbursement, 0) AS total_disbursement,
       Format_number(total_obligation, 0) AS total_obligation,
       Format_number(avg_interest_rate, 2) AS avg_interest_rate
FROM   (SELECT country,
               country_code,
               Sum(disbursed) AS total_disbursement,
               Sum(obligation) AS total_obligation,
               Avg(interest_rate) AS avg_interest_rate
        FROM   loans
        GROUP  BY country, country_code
        ORDER  BY total_disbursement DESC
        LIMIT  25)
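
To get a feel for what the inner query above computes without a Spark cluster, here is a small sketch that runs the same GROUP BY, ORDER BY, and LIMIT pattern against an in-memory SQLite table. SQLite is only a stand-in for Spark SQL here, the rows are invented, and the outer Format_number formatting step is omitted because it is not a SQLite function.

```python
import sqlite3

# Toy rows, invented for illustration; they are not taken from the IBRD data.
rows = [
    ("Country A", "AA", 100.0, 40.0, 2.0),
    ("Country A", "AA", 50.0, 10.0, 4.0),
    ("Country B", "BB", 300.0, 120.0, 5.0),
    ("Country C", "CC", 20.0, 5.0, 1.0),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE loans (country TEXT, country_code TEXT, "
    "disbursed REAL, obligation REAL, interest_rate REAL)"
)
conn.executemany("INSERT INTO loans VALUES (?, ?, ?, ?, ?)", rows)

# Same shape as the inner query above, with LIMIT 2 instead of 25.
top = conn.execute(
    """
    SELECT country, country_code,
           SUM(disbursed) AS total_disbursement,
           SUM(obligation) AS total_obligation,
           AVG(interest_rate) AS avg_interest_rate
    FROM loans
    GROUP BY country, country_code
    ORDER BY total_disbursement DESC
    LIMIT 2
    """
).fetchall()
conn.close()

for row in top:
    print(row)
```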

Google Cloud Storage

First, we need a location to store our Spark jobs, data files, and results, which will be accessible to Dataproc. Although there are a number of choices, the most convenient location for Dataproc is a Google Cloud Storage bucket. According to Google, Cloud Storage offers the highest level of availability and performance within a single region and is ideal for compute, analytics, and ML workloads in a particular region. Cloud Storage buckets are nearly identical to the buckets in Amazon Simple Storage Service (Amazon S3), Amazon’s object storage service.

Using the Google Cloud Console, Google’s Web Admin UI, create a new, uniquely named Cloud Storage bucket. Since our Dataproc clusters will eventually be created in a single regional location, we will create the new bucket in the same region. I chose us-east1.

We will need the new bucket’s link, to use within the Java and Python code as well as from the command line with gsutil. The gsutil tool is a Python application that lets you access Cloud Storage from the command line. The bucket’s link may be found on the Browser Overview console. A bucket’s link is always in the format, gs://bucket-name.
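
Since every path we hand to Spark or gsutil follows this gs:// format, a small helper for splitting such a link into its bucket and object parts can be handy in scripts. This is a minimal sketch using only the standard library; the function name split_gs_uri is hypothetical and not part of any Google SDK.

```python
from urllib.parse import urlparse


def split_gs_uri(uri):
    """Split 'gs://bucket/path/to/object' into (bucket, object_path)."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError("not a Cloud Storage URI: {!r}".format(uri))
    return parsed.netloc, parsed.path.lstrip("/")


print(split_gs_uri("gs://your-bucket-name/your-data-file.csv"))
print(split_gs_uri("gs://your-bucket-name"))
```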

Alternatively, we may also create the Cloud Storage bucket using gsutil with the make buckets (mb) command, as follows:

# Always best practice since features are updated frequently
gcloud components update
  
export PROJECT=your_project_name
export REGION=us-east1
export BUCKET_NAME=gs://your_bucket_name
  
# Make sure you are creating resources in the correct project
gcloud config set project $PROJECT
  
gsutil mb -p $PROJECT -c regional -l $REGION $BUCKET_NAME

Cloud Dataproc Cluster

Next, we will create two different Cloud Dataproc clusters for demonstration purposes. If you have not used Cloud Dataproc previously in your GCP Project, you will first need to enable the API for Cloud Dataproc.

Single Node Cluster

We will start with a single node cluster with no worker nodes, suitable for development and testing Spark and Hadoop jobs, using small datasets. Create a single-node Dataproc cluster using the Single Node Cluster mode option. Create the cluster in the same region as the new Cloud Storage bucket. This will allow the Dataproc cluster access to the bucket without additional security or IAM configuration. I used the n1-standard-1 machine type, with 1 vCPU and 3.75 GB of memory. Observe the resources assigned to Hadoop YARN for Spark job scheduling and cluster resource management.

The new cluster, consisting of a single node and no worker nodes, should be ready for use in a few minutes or less.

Note the Image version, 1.3.16-deb9. According to Google, Dataproc uses image versions to bundle operating system, big data components, and Google Cloud Platform connectors into one package that is deployed on a cluster.  This image, released in November 2018, is the latest available version at the time of this post. The image contains:

  • Apache Spark 2.3.1
  • Apache Hadoop 2.9.0
  • Apache Pig 0.17.0
  • Apache Hive 2.3.2
  • Apache Tez 0.9.0
  • Cloud Storage connector 1.9.9-hadoop2
  • Scala 2.11.8
  • Python 2.7

To avoid lots of troubleshooting, make sure your code is compatible with the image’s versions. It is important to note the image does not contain a version of Python 3. You will need to ensure your Python code is built to run with Python 2.7. Alternatively, use Dataproc’s --initialization-actions flag along with bootstrap and setup shell scripts to install Python 3 on the cluster using pip or conda. Tips for installing Python 3 on Dataproc can be found on Stack Overflow and elsewhere on the Internet.
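
The image version string itself is worth a closer look, since this post uses both the full form (1.3.16-deb9) and a shorter form (1.3-deb9). The sketch below splits such strings into their parts. The regular expression reflects only the two forms seen in this post and is an assumption, not an official grammar; my understanding is that omitting the third number leaves the choice of sub-minor release to the service.

```python
import re

# Pattern for strings such as "1.3.16-deb9" or "1.3-deb9". It reflects the
# two forms used in this post and is not an official grammar.
IMAGE_VERSION = re.compile(
    r"^(?P<major>\d+)\.(?P<minor>\d+)"
    r"(?:\.(?P<subminor>\d+))?"
    r"(?:-(?P<os>[a-z]+\d+))?$"
)


def parse_image_version(text):
    """Return a dict with major, minor, subminor (or None), and os (or None)."""
    match = IMAGE_VERSION.match(text)
    if match is None:
        raise ValueError("unrecognized image version: {!r}".format(text))
    parts = match.groupdict()
    for key in ("major", "minor", "subminor"):
        if parts[key] is not None:
            parts[key] = int(parts[key])
    return parts


print(parse_image_version("1.3.16-deb9"))
print(parse_image_version("1.3-deb9"))
```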

As an alternative to the Google Cloud Console, we are able to create the cluster using a REST command. Google provides the Google Cloud Console’s equivalent REST request, as shown in the example below.

Additionally, we have the option of using the gcloud command line tool. This tool provides the primary command-line interface to Google Cloud Platform and is part of Google’s Cloud SDK, which also includes the aforementioned gsutil. Here again, Google provides the Google Cloud Console’s equivalent gcloud command. This is a great way to learn to use the command line.

Using the dataproc clusters create command, we are able to create the same cluster as shown above from the command line, as follows:

export PROJECT=your_project_name
export CLUSTER_1=your_single_node_cluster_name 
export REGION=us-east1
export ZONE=us-east1-b
export MACHINE_TYPE_SMALL=n1-standard-1
  
gcloud dataproc clusters create $CLUSTER_1 \
  --region $REGION \
  --zone $ZONE \
  --single-node \
  --master-machine-type $MACHINE_TYPE_SMALL \
  --master-boot-disk-size 500 \
  --image-version 1.3-deb9 \
  --project $PROJECT

There are a few useful commands to inspect your running Dataproc clusters. The dataproc clusters describe command, in particular, provides detailed information about all aspects of the cluster’s configuration and current state.

gcloud dataproc clusters list --region $REGION

gcloud dataproc clusters describe $CLUSTER_1 \
  --region $REGION --format json
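
Because the describe command can emit JSON, its output is easy to consume from a script. The sketch below pulls a few fields out of a hand-written, heavily abbreviated payload. The field names are modeled on the Dataproc cluster resource as I understand it, so verify them against real output, and note that the helper summarize_cluster is hypothetical.

```python
import json

# Hand-written, heavily abbreviated payload for illustration. Real output from
# "gcloud dataproc clusters describe --format json" contains many more fields.
sample_output = """
{
  "clusterName": "your_single_node_cluster_name",
  "status": {"state": "RUNNING"},
  "config": {
    "masterConfig": {
      "numInstances": 1,
      "machineTypeUri": "n1-standard-1"
    }
  }
}
"""


def summarize_cluster(describe_json):
    """Pull a few fields out of the JSON text describing one cluster."""
    cluster = json.loads(describe_json)
    master = cluster.get("config", {}).get("masterConfig", {})
    return {
        "name": cluster.get("clusterName"),
        "state": cluster.get("status", {}).get("state"),
        "master_machine_type": master.get("machineTypeUri"),
    }


print(summarize_cluster(sample_output))
```

On a machine with the Cloud SDK installed, the same function could be fed the captured standard output of the gcloud command instead of the sample string.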

Standard Cluster

In addition to the single node cluster, we will create a second three-node Dataproc cluster. We will compare the speed of a single-node cluster to that of a true cluster with multiple worker nodes. Create a new Dataproc cluster using the Standard Cluster mode option. Again, make sure to create the cluster in the same region as the new Storage bucket.

The second cluster contains a single master node and two worker nodes. All three nodes use the n1-standard-4 machine type, with 4 vCPU and 15 GB of memory. Although still considered a minimally-sized cluster, this cluster represents a significant increase in compute power over the first single-node cluster, which had a total of 1 vCPU, 3.75 GB of memory, and no worker nodes on which to distribute processing. Between the two workers in the second cluster, we have 8 vCPU and 30 GB of memory for computation.

Again, we have the option of using the gcloud command line tool to create the cluster:

export PROJECT=your_project_name
export CLUSTER_2=your_three_node_cluster_name 
export REGION=us-east1
export ZONE=us-east1-b
export NUM_WORKERS=2
export MACHINE_TYPE_LARGE=n1-standard-4
  
gcloud dataproc clusters create $CLUSTER_2 \
  --region $REGION \
  --zone $ZONE \
  --master-machine-type $MACHINE_TYPE_LARGE \
  --master-boot-disk-size 500 \
  --num-workers $NUM_WORKERS \
  --worker-machine-type $MACHINE_TYPE_LARGE \
  --worker-boot-disk-size 500 \
  --image-version 1.3-deb9 \
  --project $PROJECT

Cluster Creation Speed: Cloud Dataproc versus Amazon EMR

In a series of rather unscientific tests, I found the three-node Dataproc cluster took less than two minutes on average to be created. Compare that time to a similar three-node cluster built with Amazon’s EMR service using their general purpose m4.4xlarge Amazon EC2 instance type. In a similar series of tests, I found the EMR cluster took seven minutes on average to be created. The EMR cluster took 3.5 times as long to create as the comparable Dataproc cluster. Again, although this is not a totally accurate comparison, since the two services offer different features, it gives you a sense of the speed of Dataproc as compared to Amazon EMR.

Staging Buckets

According to Google, when you create a cluster, Cloud Dataproc creates a Cloud Storage staging bucket in your project or reuses an existing Cloud Dataproc-created bucket from a previous cluster creation request. Staging buckets are used to stage miscellaneous configuration and control files that are needed by your cluster. Below, we see the staging buckets created for the two Dataproc clusters.

Project Files

Before uploading the jobs and running them on the Cloud Dataproc clusters, we need to understand what is included in the two GitHub projects. If you recall from the Kaggle section of the post, both projects are basically the same, but written in different languages, Java and Python. The jobs they contain all perform the same basic analysis on the dataset.

Java Project

The dataproc-java-demo Java-based GitHub project contains three classes, each of which is a job to be run by Spark. The InternationalLoansApp Java class is only intended to be run locally with the smaller 8.7K rows of data in the snapshot CSV file (gist).

On line 20, the Spark Session’s Master URL, .master("local[*]"), directs Spark to run locally with as many worker threads as logical cores on the machine. There are several options for setting the Master URL, detailed here.

On line 30, the path to the data file, and on line 84, the output path for the data file, are local relative file paths.

On lines 38-42, we do a bit of clean up on the column names, for only those columns we are interested in for the analysis. Be warned, the column names of the IBRD data are less than ideal for SQL-based analysis, containing mixed-cased characters, word spaces, and brackets.
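
The kind of clean up described above can also be automated. The following is a minimal sketch of one possible normalization rule, lower-casing each name and collapsing spaces and punctuation into underscores. The sample names are illustrative stand-ins in the style described, not a verified list of IBRD columns, and the function is not taken from the GitHub projects.

```python
import re


def to_snake_case(column_name):
    """Lower-case a column name and collapse non-alphanumerics to underscores."""
    cleaned = re.sub(r"[^0-9a-zA-Z]+", "_", column_name)
    return cleaned.strip("_").lower()


# Illustrative names in the style described above, not a verified column list.
for name in ["Country Code", "Interest Rate", "Disbursed Amount (US$)"]:
    print(name, "->", to_snake_case(name))
```

Names cleaned this way can be referenced in SQL without quoting, which keeps queries like the one shown earlier readable.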

On line 79, we call Spark DataFrame’s repartition method, dfDisbursement.repartition(1). The repartition method allows us to recombine the results of our analysis and output a single CSV file to the bucket. Ordinarily, Spark splits the data into partitions and executes computations on the partitions in parallel. Each partition’s data is written to separate CSV files when a DataFrame is written back to the bucket.

Using coalesce(1) or repartition(1) to recombine the resulting 25-Row DataFrame on a single node is okay for the sake of this demonstration, but is not practical for recombining partitions from larger DataFrames. There are more efficient and less costly ways to manage the results of computations, depending on the intended use of the resulting data.
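
For small results, one alternative is to let Spark write its partitions separately, copy the part files to a local directory, and concatenate them afterwards. The sketch below shows that idea with the standard library only. It assumes the part files are already local, are named part-*.csv, and each start with a header row; the helper name merge_part_files and the demo data are made up for illustration.

```python
import csv
import glob
import os
import tempfile


def merge_part_files(part_dir, merged_path):
    """Concatenate part-*.csv files (each with a header row) into one CSV.

    Returns the number of data rows written.
    """
    written = 0
    header_written = False
    with open(merged_path, "w", newline="") as out_file:
        writer = csv.writer(out_file)
        for part in sorted(glob.glob(os.path.join(part_dir, "part-*.csv"))):
            with open(part, newline="") as in_file:
                reader = csv.reader(in_file)
                header = next(reader, None)
                if header is None:
                    continue  # skip empty part files
                if not header_written:
                    writer.writerow(header)
                    header_written = True
                for row in reader:
                    writer.writerow(row)
                    written += 1
    return written


# Demo with two made-up part files in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    bodies = ["country,total\nA,1\n", "country,total\nB,2\nC,3\n"]
    for index, body in enumerate(bodies):
        with open(os.path.join(tmp, "part-0000{}.csv".format(index)), "w") as f:
            f.write(body)
    merged = os.path.join(tmp, "merged.csv")
    merged_rows = merge_part_files(tmp, merged)
    with open(merged) as f:
        merged_text = f.read()

print(merged_rows)
```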

The InternationalLoansAppDataproc class is intended to be run on the Dataproc clusters, analyzing the same smaller CSV data file. The InternationalLoansAppDataprocLarge class is also intended to be run on the Dataproc clusters. However, it analyzes the larger 750K rows of data in the IBRD historic CSV file (gist).

On line 20, note the Spark Session’s Master URL, .master("yarn"), directs Spark to connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode when submitting the job. The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable. Recall, the Dataproc cluster runs Spark on YARN.

Also, note on line 30, the path to the data file, and on line 63, the output path for the data file, both point to the Cloud Storage bucket we created earlier (.load("gs://your-bucket-name/your-data-file.csv")). Cloud Dataproc clusters automatically install the Cloud Storage connector. According to Google, there are a number of benefits to choosing Cloud Storage over traditional HDFS including data persistence, reliability, and performance.

These are the only two differences between the local version of the Spark job and the version of the Spark job intended for Dataproc. To build the project’s JAR file, which you will later upload to the Cloud Storage bucket, compile the Java project using the gradle build command from the root of the project.

Python Project

The dataproc-python-demo Python-based GitHub project contains two Python scripts to be run using PySpark. The international_loans_local.py Python script is only intended to be run locally with the smaller 8.7K rows of data in the snapshot CSV file. It performs a few different analyses with the smaller dataset (gist).

Identical to the corresponding Java class, note on line 9, the Spark Session’s Master URL, .master("local[*]"), directs Spark to run locally with as many worker threads as logical cores on the machine.

Also identical to the corresponding Java class, note on line 23, the path to the data file, and on line 63, the output path for the resulting data file, are local relative file paths.

The international_loans_dataproc_large.py Python script is intended to be run on the Dataproc clusters, analyzing the larger 750K rows of data in the IBRD historic CSV file (gist).

On line 9, note the Spark Session’s Master URL, .master("yarn"), directs Spark to connect to a YARN cluster.

Again, note on line 23, the path to the data file, and on line 56, the output path for the data file, both point to the Cloud Storage bucket we created earlier (.load("gs://your-bucket-name/your-data-file.csv")).

These are the only two differences between the local version of the PySpark job and the version of the PySpark job intended for Dataproc. With Python, there is no pre-compilation necessary. We will upload the second script directly.

Uploading Job Resources to Cloud Storage

In total, we need to upload four items to the new Cloud Storage bucket we created previously. The items include the two Kaggle IBRD CSV files, the compiled Java JAR file from the dataproc-java-demo project, and the Python script from the dataproc-python-demo project. Using the Google Cloud Console, upload the four files to the new Google Storage bucket, as shown below. Make sure you unzip the two Kaggle IBRD CSV data files before uploading.

Like before, we also have the option of using gsutil with the copy (cp) command to upload the four files. The cp command accepts wildcards, as shown below.

export PROJECT=your_project_name
export BUCKET_NAME=gs://your_bucket_name
  
gsutil cp data/ibrd-statement-of-loans-*.csv $BUCKET_NAME
gsutil cp build/libs/dataprocJavaDemo-1.0-SNAPSHOT.jar $BUCKET_NAME
gsutil cp international_loans_dataproc_large.py $BUCKET_NAME

If our Java or Python jobs were larger, or more complex and required multiple files to run, we could also choose to upload ZIP or other common compression formatted archives using the --archives flag.

Running Jobs on Dataproc

The easiest way to run a job on the Dataproc cluster is by submitting a job through the Dataproc Jobs UI, part of the Google Cloud Console.

Dataproc has the capability of running multiple types of jobs, including:

  • Hadoop
  • Spark
  • SparkR
  • PySpark
  • Hive
  • SparkSql
  • Pig

We will be running both Spark and PySpark jobs as part of this demonstration.

Spark Jobs

To run a Spark job using the JAR file, select Job type Spark. The Region will match your Dataproc cluster and bucket locations, us-east1 in my case. You should have a choice of both clusters in your chosen region. Run both jobs at least twice, once on each cluster, for a total of four jobs.

Lastly, you will need to input the main class and the path to the JAR file. The JAR location will be:

gs://your_bucket_name/dataprocJavaDemo-1.0-SNAPSHOT.jar

The main class for the smaller dataset will be:

org.example.dataproc.InternationalLoansAppDataproc

The main class for the larger dataset will be:

org.example.dataproc.InternationalLoansAppDataprocLarge

During or after job execution, you may view details in the Output tab of the Dataproc Jobs console.

Like every other step in this demonstration, we can also use the gcloud command line tool, instead of the web console, to submit our Spark jobs to each cluster. Here, I am submitting the larger dataset Spark job to the three-node cluster.

export CLUSTER_2=your_three_node_cluster_name
export REGION=us-east1
export BUCKET_NAME=gs://your_bucket_name
  
gcloud dataproc jobs submit spark \
  --region $REGION \
  --cluster $CLUSTER_2 \
  --class org.example.dataproc.InternationalLoansAppDataprocLarge \
  --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar \
  --async

PySpark Jobs

To run a Spark job using the Python script, select Job type PySpark. The Region will match your Dataproc cluster and bucket locations, us-east1 in my case. You should have a choice of both clusters. Run the job at least twice, once on each cluster.

Lastly, you will need to input the main Python file path. There is only one Dataproc Python script, which analyzes the larger dataset. The script location will be:

gs://your_bucket_name/international_loans_dataproc_large.py

Like every other step in this demonstration, we can also use the gcloud command line tool instead of the web console to submit our PySpark jobs to each cluster. Below, I am submitting the PySpark job to the three-node cluster.

export CLUSTER_2=your_three_node_cluster_name
export REGION=us-east1
export BUCKET_NAME=gs://your_bucket_name
  
gcloud dataproc jobs submit pyspark \
  $BUCKET_NAME/international_loans_dataproc_large.py \
  --region $REGION \
  --cluster $CLUSTER_2 \
  --async

When the optional --async flag is included with any of the dataproc jobs submit commands, the job will be sent to the Dataproc cluster and the terminal will be released back to the user immediately. If you do not use the --async flag, the terminal will be unavailable until the job is finished.

However, without the flag, we will get the standard output (stdout) and standard error (stderr) from Dataproc. The output includes some useful information, including different stages of the job execution lifecycle and execution times.
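
When submitting many jobs, as was done for this post, it can be convenient to assemble the gcloud commands from a script. The sketch below only builds the argument lists and prints them, using just the flags that appear in the commands above. The helper name build_submit_command is hypothetical, and nothing is sent to Dataproc unless the resulting list is passed to something like subprocess.run() on a machine with the Cloud SDK installed.

```python
import shlex


def build_submit_command(job_type, region, cluster, main=None,
                         main_class=None, jars=None, run_async=False):
    """Build an argument list for 'gcloud dataproc jobs submit'.

    Only flags that appear in this post are used.
    """
    command = ["gcloud", "dataproc", "jobs", "submit", job_type]
    if main:
        command.append(main)  # for example, the PySpark script's gs:// path
    command += ["--region", region, "--cluster", cluster]
    if main_class:
        command += ["--class", main_class]
    if jars:
        command += ["--jars", ",".join(jars)]
    if run_async:
        command.append("--async")
    return command


bucket = "gs://your_bucket_name"
spark_cmd = build_submit_command(
    "spark", "us-east1", "your_three_node_cluster_name",
    main_class="org.example.dataproc.InternationalLoansAppDataprocLarge",
    jars=[bucket + "/dataprocJavaDemo-1.0-SNAPSHOT.jar"],
    run_async=True,
)
pyspark_cmd = build_submit_command(
    "pyspark", "us-east1", "your_three_node_cluster_name",
    main=bucket + "/international_loans_dataproc_large.py",
)
print(" ".join(shlex.quote(part) for part in spark_cmd))
print(" ".join(shlex.quote(part) for part in pyspark_cmd))
```

Leaving run_async at its default keeps the terminal attached, so the job's stdout and stderr remain visible as described above.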

File Output

During development and testing, outputting results to the console is useful. However, in Production, the output from jobs is most often written to Apache Parquet, Apache Avro, CSV, JSON, or XML format files, persisted to an Apache Hive, SQL, or NoSQL database, or streamed to another system for post-processing, using technologies such as Apache Kafka.

Once both the Java and Python jobs have run successfully on the Dataproc cluster, you should observe the results have been saved back to the Storage bucket. Each script saves its results to a single CSV file in separate directories, as shown below.

The final dataset, written to the CSV file, contains the results of the analysis (gist).

Cleaning Up

When you are finished, make sure to delete your running clusters. This may be done through the Google Cloud Console. Deletion of the three-node cluster took, on average, slightly more than one minute.

As usual, we can also use the gcloud command line tool instead of the web console to delete the Dataproc clusters.

export CLUSTER_1=your_single_node_cluster_name
export CLUSTER_2=your_three_node_cluster_name 
export REGION=us-east1
  
yes | gcloud dataproc clusters delete $CLUSTER_1 --region $REGION
yes | gcloud dataproc clusters delete $CLUSTER_2 --region $REGION

Results

Here are some observations, based on approximately 75 successful jobs. First, both the Python job and the Java jobs ran in nearly the same amount of time on the single-node cluster and then on the three-node cluster. This is beneficial since, although a lot of big data analysis is performed with Python, Java is still the lingua franca of many large enterprises.

Consecutive Execution

Below are the average times for running the larger dataset on both clusters, in Java and in Python. The jobs were all run consecutively as opposed to concurrently. The best time was 59 seconds on the three-node cluster, compared to a best time of 150 seconds on the single-node cluster, a difference of 256%; in other words, the single-node cluster took roughly 2.5 times as long. Given the differences between the two clusters, this large variation is expected. The average difference between the two clusters for running the large dataset was 254%.

Concurrent Execution

It is important to understand the impact of concurrently running multiple jobs on the same Dataproc cluster. To demonstrate this, both the Java and Python jobs were also run concurrently. In one such test, ten copies of the Python job were run concurrently on the three-node cluster.

Observe that the execution times of the concurrent jobs increase nearly linearly. The first job completes in roughly the same time as the consecutively executed jobs, shown above, but each succeeding job’s execution time increases linearly.

According to Apache, when running on a cluster, each Spark application gets an independent set of executor JVMs that only run tasks and store data for that application. Each application is given a maximum amount of resources it can use and holds onto them for its whole duration. Note no tuning was done to the Dataproc clusters to optimize for concurrent execution.

Really Big Data?

Although there is no exact definition of ‘big data’, 750K rows of data at 265 MB is probably not generally considered big data. Likewise, the three-node cluster used in this demonstration is still pretty diminutive. Lastly, the SQL query was less than complex. To really test the abilities of Dataproc would require a multi-gigabyte or multi-terabyte-sized dataset, divided amongst multiple files, computed on a much beefier cluster with more worker nodes and more compute resources.

Monitoring

In addition to viewing the results of running and completed jobs, we have the option of enabling Google Stackdriver for monitoring and management of services, containers, applications, and infrastructure. Stackdriver offers an impressive array of services, including debugging, error reporting, monitoring, alerting, tracing, logging, and dashboards, to mention only a few Stackdriver features.

screen_shot_2018-12-05_at_3.18.31_pm

There are dozens of metrics available, which collectively reflect the health of the Dataproc clusters. Below we see the states of one such metric, the YARN virtual cores (vcores). A YARN vcore, introduced in Hadoop 2.4, is a usage share of a host CPU. The number of YARN virtual cores is equivalent to the number of worker nodes (2) times the number of vCPUs per node (4), for a total of eight YARN virtual cores. Below, we see that at one point in time, 5 of the 8 vcores have been allocated, with 2 more available.

screen_shot_2018-12-05_at_3.29.47_pm

Next, we see the states of the YARN memory size. YARN memory size is calculated as the number of worker nodes (2) times the amount of memory on each node (15 GB) times the fraction given to YARN (0.8), for a total of 24 GB (2 x 15 GB x 0.8). Below, we see that at one point in time, 20 GB of RAM is allocated with 4 GB available. At that instant in time, the workload does not appear to be exhausting the cluster’s memory.

screen_shot_2018-12-05_at_3.30.39_pm
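The two capacity calculations described above can be reproduced in a few lines. This is a minimal sketch using the cluster figures from the text (two worker nodes, 4 vCPUs and 15 GB of RAM each, and a 0.8 memory fraction for YARN); the variable names are illustrative.

```python
# Cluster shape as described above: two worker nodes, each with 4 vCPUs and 15 GB of RAM.
worker_nodes = 2
vcpus_per_node = 4
memory_per_node_gb = 15
yarn_memory_fraction = 0.8  # fraction of node memory given to YARN, per the text

# Total YARN vcores: one vcore per worker vCPU.
yarn_vcores = worker_nodes * vcpus_per_node

# Total YARN memory: worker memory scaled by the YARN fraction.
yarn_memory_gb = worker_nodes * memory_per_node_gb * yarn_memory_fraction

print(f"YARN vcores: {yarn_vcores}")
print(f"YARN memory: {yarn_memory_gb:.0f} GB")
```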

Notifications

Since no one actually watches dashboards all day, waiting for something to fail, how do we know when we have an issue with Dataproc? Stackdriver offers integrations with most popular notification channels, including email, SMS, Slack, PagerDuty, HipChat, and Webhooks. With Stackdriver, we define a condition which describes when a service is considered unhealthy. When triggered, Stackdriver sends a notification to one or more channels.

notifications

Below is a preview of two alert notifications in Slack. I enabled Slack as a notification channel and created an alert which is triggered each time a Dataproc job fails. Whenever a job fails, such as the two examples below, I receive a Slack notification through the Slack Channel defined in Stackdriver.

slack.png

Slack notifications contain a link, which routes you back to Stackdriver, to an incident which was opened on your behalf, due to the job failure.

incident

For convenience, the incident also includes a pre-filtered link directly to the log entries at the time of the policy violation. Stackdriver logging offers advanced filtering capabilities to quickly find log entries, as shown below.

screen_shot_2018-12-09_at_12.52.51_pm

With Stackdriver, you get monitoring, logging, alerting, notification, and incident management as a service, with minimal cost and upfront configuration. Think about how much time and effort it takes the average enterprise to achieve this level of infrastructure observability on their own; most never do.

Conclusion

In this post, we have seen the ease-of-use, extensive feature-set, out-of-the-box integration ability with other cloud services, low cost, and speed of Google Cloud Dataproc, to run big data analytics workloads. Couple this with the ability of Stackdriver to provide monitoring, logging, alerting, notification, and incident management for Dataproc with minimal up-front configuration. In my opinion, based on these features, Google Cloud Dataproc leads other cloud competitors for fully-managed Spark and Hadoop Cluster management.

In future posts, we will examine the use of Cloud Dataproc Workflow Templates for process automation, the integration capabilities of Dataproc with services such as BigQuery, Bigtable, Cloud Dataflow, and Google Cloud Pub/Sub, and finally, DevOps for Big Data with Dataproc and tools like Spinnaker and Jenkins on GKE.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers, their clients, nor Apache or Google.


Integrating Search Capabilities with Actions for Google Assistant, using GKE and Elasticsearch: Part 2

Introduction

Voice and text-based conversational interfaces, such as chatbots, have recently seen tremendous growth in popularity. Much of this growth can be attributed to leading Cloud providers, such as Google, Amazon, and Microsoft, who now provide affordable, end-to-end development, machine learning-based training, and hosting platforms for conversational interfaces.

Cloud-based machine learning services greatly improve a conversational interface’s ability to interpret user intent with greater accuracy. However, the ability to return relevant responses to user inquiries also requires that interfaces have access to rich informational datastores, and the ability to quickly and efficiently query and analyze that data.

In this two-part post, we will enhance the capabilities of a voice and text-based conversational interface by integrating it with a search and analytics engine. By interfacing an Action for Google Assistant conversational interface with Elasticsearch, we will improve the Action’s ability to provide relevant results to the end-user. Instead of querying a traditional database for static responses to user intent, our Action will access a Near Realtime (NRT) Elasticsearch index of searchable documents. The Action will leverage Elasticsearch’s advanced search and analytics capabilities to optimize and shape user responses, based on their intent.

Action Preview

Here is a brief YouTube video preview of the final Action for Google Assistant, integrated with Elasticsearch, running on an Apple iPhone.

Architecture

If you recall from part one of this post, the high-level architecture of our search engine-enhanced Action for Google Assistant resembles the following. Most of the components are running on Google Cloud.

Google Search Assistant Diagram GCP

Source Code

All open-sourced code for this post can be found on GitHub in two repositories, one for the Spring Boot Service and one for the Action for Google Assistant. Code samples in this post are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Development Process

In part two of this post, we will tie everything together by creating and integrating our Action for Google Assistant:

  • Create the new Actions for Google Assistant project using the Actions on Google console;
  • Develop the Action’s Intents and Entities using the Dialogflow console;
  • Develop, deploy, and test the Cloud Function to GCP;

Let’s explore each step in more detail.

New ‘Actions on Google’ Project

With Elasticsearch running and the Spring Boot Service deployed to our GKE cluster, we can start building our Actions for Google Assistant. Using the Actions on Google web console, we first create a new Actions project.

wp-search-021

The Directory Information tab is where we define metadata about the project. This information determines how it will look in the Actions directory and is required to publish your project. The Actions directory is where users discover published Actions on the web and mobile devices.

wp-search-019

The Directory Information tab also includes sample invocations, which may be used to invoke our Actions.

wp-search-020

Actions and Intents

Our project will contain a series of related Actions. According to Google, an Action is ‘an interaction you build for the Assistant that supports a specific intent and has a corresponding fulfillment that processes the intent.’ To build our Actions, we first want to create our Intents. To do so, we will want to switch from the Actions on Google console to the Dialogflow console. Actions on Google provides a link for switching to Dialogflow in the Actions tab.

wp-search-022

We will build our Action’s Intents in Dialogflow. The term Intent, used by Dialogflow, is standard terminology across other voice-assistant platforms, such as Amazon’s Alexa and Microsoft’s Azure Bot Service and LUIS. In Dialogflow, we will be building several Intents: the Find Multiple Posts Intent, Find Post Intent, Find By ID Intent, and so forth.

wp-search-023

Below, we see the Find Post Intent. The Find Post Intent is responsible for handling our user’s requests for a single post about a topic, for example, ‘Find a post about Docker.’ The Intent shown below contains a fair number of training phrases, though certainly not an exhaustive list. These represent possible ways a user might express intent when invoking the Action.

wp-search-026

Below, we see the Find Multiple Posts Intent. The Find Multiple Posts Intent is responsible for handling our user’s requests for a list of posts about a topic, for example, ‘I’m interested in Docker.’ Similar to the Find Post Intent above, the Find Multiple Posts Intent contains a list of training phrases.

wp-search-025

Dialog Model Training

According to Google, the greater the number of natural language examples in the Training Phrases section of Intents, the better the classification accuracy. Every time a user interacts with our Action, the user’s utterances are logged. Using the Training tab in the Dialogflow console, we can train our model by reviewing and approving or correcting how the Action handled the user’s utterances.

Below we see the user’s utterances, part of an interaction with the Action. We have the option to review and approve the Intent that was called to handle the utterance, re-assign it, or delete it. This helps improve the accuracy of our dialog model.

wp-search-039.png

Dialogflow Entities

Each of the highlighted words in the training phrases maps to the facts parameter, which maps to a collection of @topic Entities. Entities represent the list of values, in this case topics, that the Action is trained to recognize. According to Google, there are three types of entities: ‘system’ (defined by Dialogflow), ‘developer’ (defined by a developer), and ‘user’ (built for each individual end-user in every request). We will be creating ‘developer’ type entities for our Action’s Intents.

wp-search-037.png

Automated Expansion

We do not have to define, as an entity, all possible topics a user might search for. By enabling the Allow Automated Expansion option, an Agent will recognize values that have not been explicitly listed in the entity list. Google describes Agents as NLU (Natural Language Understanding) modules.

wp-search-042.png

Entity Synonyms

An entity may contain synonyms. Multiple synonyms are mapped to a single reference value. The reference value is the value passed to the Cloud Function by the Action. For example, take the reference value of ‘GCP.’ The user might ask Google about ‘GCP’. However, the user might also substitute the words ‘Google Cloud’ or ‘Google Cloud Platform.’ Using synonyms, if the user utters any of these three synonymous words or phrases in their intent, the reference value, ‘GCP’, is passed in the request.

But, what if the post contains the phrase, ‘Google Cloud Platform’ more frequently than, or instead of, ‘GCP’? If the acronym, ‘GCP’, is defined as the entity reference value, then it is the value passed to the function, even if you ask for ‘Google Cloud Platform’. In the use case of searching blog posts by topic, entity synonyms are not an effective search strategy.
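The limitation described above can be illustrated with a simplified model of synonym resolution. This is only a sketch of the mapping behavior, not Dialogflow’s implementation; the `TOPIC_ENTITY` structure and the `resolve_reference_value` function are hypothetical names introduced for illustration.

```python
# Hypothetical entity definition: one reference value and its synonyms.
TOPIC_ENTITY = {
    "GCP": ["GCP", "Google Cloud", "Google Cloud Platform"],
}


def resolve_reference_value(phrase, entity=TOPIC_ENTITY):
    """Return the reference value for a spoken phrase, or None if unmatched."""
    normalized = phrase.strip().lower()
    for reference_value, synonyms in entity.items():
        if normalized in (synonym.lower() for synonym in synonyms):
            return reference_value
    return None


# Every synonym collapses to the same reference value before the search runs.
for spoken in ("GCP", "Google Cloud", "Google Cloud Platform"):
    print(f"{spoken!r} -> {resolve_reference_value(spoken)!r}")
```

Whichever phrase the user speaks, only ‘GCP’ reaches the search, so posts that use the longer phrase exclusively may be missed.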

Elasticsearch Synonyms

A better way to solve for synonyms is by using the synonyms feature of Elasticsearch. Take, for example, the topic of ‘Istio’. Istio is also considered a Service Mesh. If I ask for posts about ‘Service Mesh’, I would like to get back posts that contain the phrase ‘Service Mesh’, but also the word ‘Istio’. To accomplish this, you would define an association between ‘Istio’ and ‘Service Mesh’, as part of the Elasticsearch WordPress posts index.

wp-search-041d
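An association like the one described above is typically expressed as a synonym token filter in the index’s analysis settings. The sketch below builds such a settings body as a Python dictionary and prints it as JSON. The filter and analyzer names (`topic_synonyms`, `topic_analyzer`) are hypothetical, and the actual settings of the WordPress posts index may well differ.

```python
import json

# Hypothetical filter and analyzer names. The 'synonym' filter type and the
# comma-separated equivalence rule follow Elasticsearch's analysis settings.
index_settings = {
    "settings": {
        "analysis": {
            "filter": {
                "topic_synonyms": {
                    "type": "synonym",
                    # Terms on one line separated by commas are treated as equivalent.
                    "synonyms": ["istio, service mesh"],
                }
            },
            "analyzer": {
                "topic_analyzer": {
                    "tokenizer": "standard",
                    # Lowercase first so the synonym rule matches regardless of case.
                    "filter": ["lowercase", "topic_synonyms"],
                }
            },
        }
    }
}

print(json.dumps(index_settings, indent=2))
```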

Searches for ‘Istio’ against that index would return results that contain ‘Istio’ and/or contain ‘Service Mesh’; the reverse is also true. Having created and applied a custom synonyms filter to the index, we see how Elasticsearch responds to an analysis of the natural language style phrase, ‘What is a Service Mesh?’. As shown by the tokens output in Kibana’s Dev Tools Console, Elasticsearch understands that ‘service mesh’ is synonymous with ‘istio’.

wp-search-041g

If we query the same five fields as our Action, for the topic of ‘service mesh’, we get four hits for posts (indexed documents) that contain ‘service mesh’ and/or ‘istio’.

wp-search-041c

Actions on Google Integration

Another configuration item in Dialogflow that needs to be completed is Dialogflow’s Actions on Google integration. This will integrate our Action with Google Assistant. Google currently provides more than fifteen different integrations, including Google Assistant, Slack, Facebook Messenger, Twitter, and Twilio, as shown below.

wp-search-028

To configure the Google Assistant integration, choose the Welcome Intent as our Action’s Explicit Invocation intent. Then we designate our other Intents as Implicit Invocation intents. According to Google, this Google Assistant Integration allows our Action to reach users on every device where the Google Assistant is available.

wp-search-029

Action Fulfillment

When a user’s intent is received, it is fulfilled by the Action. In the Dialogflow Fulfillment console, we see the Action has two fulfillment options, a Webhook or a Cloud Function edited inline. A Webhook allows us to pass information from a matched intent into a web service and get a result back from the service. Our Action’s Webhook will call our Cloud Function on GCP, using the Cloud Function’s URL endpoint (we’ll get this URL in the next section).

wp-search-030
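To make the Webhook exchange more concrete, here is a minimal sketch of a handler that reads the matched intent and its parameters from a request body and returns a text response. It uses the Dialogflow v2 webhook field names (`queryResult`, `intent.displayName`, `parameters`, `fulfillmentText`); the `handle_webhook` function and the `topic` parameter name are assumptions for illustration. The Action’s real fulfillment is a Node.js Cloud Function, so this Python version only illustrates the shape of the exchange.

```python
# Minimal sketch of webhook fulfillment using Dialogflow v2 request/response
# field names. The 'topic' parameter name is an assumption for illustration.
def handle_webhook(request_body):
    query_result = request_body.get("queryResult", {})
    intent_name = query_result.get("intent", {}).get("displayName", "")
    parameters = query_result.get("parameters", {})

    if intent_name == "Find Post Intent":
        topic = parameters.get("topic", "")
        text = f"Searching for a post about {topic}."
    else:
        # Anything unrecognized falls through to a generic reply.
        text = "Sorry, I did not understand that request."

    return {"fulfillmentText": text}


sample_request = {
    "queryResult": {
        "queryText": "Find a post about Docker",
        "parameters": {"topic": "Docker"},
        "intent": {"displayName": "Find Post Intent"},
    }
}
print(handle_webhook(sample_request))
```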

Google Cloud Functions

Our Cloud Function, called by our Action, is written in Node.js. Our function, index.js, is divided into four sections, which are: constants and environment variables, intent handlers, helper functions, and the function’s entry point. The helper functions are part of the Helper module, contained in the helper.js file.

Constants and Environment Variables

This section, in both index.js and helper.js, defines the global constants and environment variables used within the function. Values that reference environment variables, such as SEARCH_API_HOSTNAME, are defined in the .env.yaml file. All environment variables in the .env.yaml file will be set during the Cloud Function’s deployment, described later in this post. Environment variables were recently released, and are still considered beta functionality (gist).

The npm module dependencies declared in this section are defined in the dependencies section of the package.json file. Function dependencies include Actions on Google, Firebase Functions, Winston, and Request (gist).
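The pattern of reading deployment-time configuration from environment variables can be sketched as follows, here in Python rather than the function’s Node.js. `SEARCH_API_HOSTNAME` is the variable named in the post; `SEARCH_API_PORT`, the default values, and the `load_config` function are hypothetical placeholders.

```python
import os


# SEARCH_API_HOSTNAME is the variable named in the post. SEARCH_API_PORT and
# both default values are hypothetical placeholders.
def load_config(environ=os.environ):
    """Read service settings from the environment, falling back to defaults."""
    return {
        "search_api_hostname": environ.get("SEARCH_API_HOSTNAME", "localhost"),
        "search_api_port": int(environ.get("SEARCH_API_PORT", "8080")),
    }


# Passing a plain dict stands in for the environment set at deployment time.
config = load_config({"SEARCH_API_HOSTNAME": "api.example.com"})
print(config)
```

Keeping such values out of the source means the same code can be deployed against different backends without modification.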

Intent Handlers

The intent handlers in this section correspond to the intents in the Dialogflow console. Each handler responds with either the Simple Response, Basic Card, and Suggestion Chip response types, or the Simple Response, List, and Suggestion Chip response types. These response types were covered in part one of this post (gist).

The Welcome Intent handler handles explicit invocations of our Action. The Fallback Intent handler handles both help requests, as well as cases when Dialogflow is unable to handle the user’s request.

As described above in the Dialogflow section, the Find Post Intent handler is responsible for handling our user’s requests for a single post about a topic, for example, ‘Find a post about Docker’. To fulfill the user request, the Find Post Intent handler calls the Helper module’s getPostsByTopic function, passing the topic requested and specifying a result set size of one post, the post with the highest relevance score above an arbitrary value of 1.0.

Similarly, the Find Multiple Posts Intent handler is responsible for handling our user’s requests for a list of posts about a topic, for example, ‘I’m interested in Docker’. To fulfill the user request, the Find Multiple Posts Intent handler calls the Helper module’s getPostsByTopic function, passing the topic requested and specifying a result set size of a maximum of six posts with the highest relevance scores greater than 1.0.

The Find By ID Intent handler is responsible for handling our user’s requests for a specific, unique post ID, for example, ‘Post ID 22141’. To fulfill the user request, the Find By ID Intent handler calls the Helper module’s getPostById function, passing the unique Post ID (gist).

Entry Point

The entry point creates a way to handle the communication with Dialogflow’s fulfillment API (gist).

Helper Functions

The helper functions are part of the Helper module, contained in the helper.js file. In addition to typical utility functions like formatting dates, there are two functions, which interface with Elasticsearch, via our Spring Boot API, getPostsByTopic and getPostById. As described above, the intent handlers call one of these functions to obtain search results from Elasticsearch.

The getPostsByTopic function handles both the Find Post Intent handler and Find Multiple Posts Intent handler, described above. The only difference in the two calls is the size of the response set, either one result or six results maximum (gist).
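The selection rule shared by the two handlers (keep only results above a minimum relevance score, best first, capped at a maximum size) can be sketched as follows. In the post, the size and minimum score are passed along with the request and are presumably applied by the search service. This Python version applies the same rule in memory purely for illustration; the `select_posts` function and the shape of each hit are hypothetical.

```python
# Hypothetical hit shape: each search hit carries a title and a relevance score.
def select_posts(hits, size, min_score=1.0):
    """Keep hits scoring above min_score, best first, capped at `size` results."""
    relevant = [hit for hit in hits if hit["score"] > min_score]
    relevant.sort(key=lambda hit: hit["score"], reverse=True)
    return relevant[:size]


hits = [
    {"title": "Post A", "score": 3.2},
    {"title": "Post B", "score": 0.7},
    {"title": "Post C", "score": 5.1},
]

single = select_posts(hits, size=1)   # Find Post Intent: one result
several = select_posts(hits, size=6)  # Find Multiple Posts Intent: up to six
print([hit["title"] for hit in single])
print([hit["title"] for hit in several])
```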

Both functions use the request and request-promise-native npm modules to call the Spring Boot service’s RESTful API over HTTP. However, instead of relying on a callback, the request-promise-native module allows us to return a native ES6 Promise. By returning a promise, we can use async/await with our Intent handlers. Using async/await with Promises is a newer way of handling asynchronous operations in Node.js. The asynchronous programming model, using promises, is described in greater detail in my previous post, Building Serverless Actions for Google Assistant with Google Cloud Functions, Cloud Datastore, and Cloud Storage.

The getPostById function handles both the Find By ID Intent handler and Option Intent handler, described above. This function is similar to the getPostsByTopic function, calling a Spring Boot service’s RESTful API endpoint and passing the Post ID (gist).

Cloud Function Deployment

To deploy the Cloud Function to GCP, use the gcloud CLI with the beta version of the functions deploy command. According to Google, gcloud is a part of the Google Cloud SDK. You must download and install the SDK on your system and initialize it before you can use gcloud. Currently, Cloud Functions are only available in four regions. I have included a shell script, deploy-cloud-function.sh, to make this step easier. It is called using the npm run deploy command (gist).

The creation or update of the Cloud Function can take up to two minutes. Note the output indicates the environment variables, contained in the .env.yaml file, have been deployed. The URL endpoint of the function and the function’s entry point are also both output.

wp-search-031.png

If you recall, the URL endpoint of the Cloud Function is required in the Dialogflow Fulfillment tab. The URL can be retrieved from the deployment output (shown above). The Cloud Function is now deployed and will be called by the Action when a user invokes the Action.

What is Deployed

The .gcloudignore file is created the first time you deploy a new function. Using the .gcloudignore file, you limit the files deployed to GCP. For this post, of all the files in the project, only four files, index.js, helper.js, package.json, and the PNG file used in the Action’s responses, need to be deployed. All other project files are listed in the .gcloudignore file to avoid being deployed.

wp-search-038.png

Simulation Testing and Debugging

With our Action and all its dependencies deployed and configured, we can test the Action using the Simulation console on Actions on Google. According to Google, the Action Simulation console allows us to manually test our Action by simulating a variety of Google-enabled hardware devices and their settings.

Below, in the Simulation console, we see the successful display of our Programmatic Ponderings Search Action for Google Assistant containing the expected Simple Response, List, and Suggestion Chips response types, triggered by a user’s invocation of the Action.

wp-search-035

The simulated response indicates that the Google Cloud Function was called, and it responded successfully. That also indicates the Dialogflow-based Action successfully communicated with the Cloud Function, the Cloud Function successfully communicated with the Spring Boot service instances running on Google Kubernetes Engine, and finally, the Spring Boot services successfully communicated with Elasticsearch running on Google Compute Engine.

If we had issues with the testing, the Action Simulation console also contains tabs containing the request and response objects sent to and from the Cloud Function, the audio response, a debug console, any errors, and access to the logs.

Stackdriver Logging

In the log output below, from our Cloud Function, we see our Cloud Function’s activities. These activities include informational log entries, which we explicitly defined in our Cloud Function using the winston and @google-cloud/logging-winston npm modules. According to Google, the author of the module, Stackdriver Logging for Winston provides an easy to use, higher-level layer (transport) for working with Stackdriver Logging, compatible with Winston. Developing an effective logging strategy is essential to maintaining and troubleshooting your code in Development, as well as Production.

wp-search-036

Conclusion

In this two-part post, we observed how the capabilities of a voice and text-based conversational interface, such as an Action for Google Assistant, may be enhanced through integration with a search and analytics engine, such as Elasticsearch. This post barely scratched the surface of what could be achieved with such an integration. Elasticsearch, as well as other leading Lucene-based search and analytics engines, such as Apache Solr, have tremendous capabilities, which are easily integrated with machine learning-based conversational interfaces, resulting in a more powerful and more intuitive end-user experience.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers, their clients, or Google.


Integrating Search Capabilities with Actions for Google Assistant, using GKE and Elasticsearch: Part 1

Introduction

Voice and text-based conversational interfaces, such as chatbots, have recently seen tremendous growth in popularity. Much of this growth can be attributed to leading Cloud providers, such as Google, Amazon, and Microsoft, who now provide affordable, end-to-end development, machine learning-based training, and hosting platforms for conversational interfaces.

Cloud-based machine learning services greatly improve a conversational interface’s ability to interpret user intent with greater accuracy. However, the ability to return relevant responses to user inquiries also requires that interfaces have access to rich informational datastores, and the ability to quickly and efficiently query and analyze that data.

In this two-part post, we will enhance the capabilities of a voice and text-based conversational interface by integrating it with a search and analytics engine. By interfacing an Action for Google Assistant conversational interface with Elasticsearch, we will improve the Action’s ability to provide relevant results to the end-user. Instead of querying a traditional database for static responses to user intent, our Action will access a Near Realtime (NRT) Elasticsearch index of searchable documents. The Action will leverage Elasticsearch’s advanced search and analytics capabilities to optimize and shape user responses, based on their intent.

Action Preview

Here is a brief YouTube video preview of the final Action for Google Assistant, integrated with Elasticsearch, running on an Apple iPhone.

Google Technologies

The high-level architecture of our search engine-enhanced Action for Google Assistant will look as follows.

Google Search Assistant Diagram GCP

Here is a brief overview of the key technologies we will incorporate into our architecture.

Actions on Google

According to Google, Actions on Google is the platform for developers to extend the Google Assistant. Actions on Google is a web-based platform that provides a streamlined user-experience to create, manage, and deploy Actions. We will use the Actions on Google platform to develop our Action in this post.

Dialogflow

According to Google, Dialogflow is an enterprise-grade NLU platform that makes it easy for developers to design and integrate conversational user interfaces into mobile apps, web applications, devices, and bots. Dialogflow is powered by Google’s machine learning for Natural Language Processing (NLP).

Google Cloud Functions

Google Cloud Functions are part of Google’s event-driven, serverless compute platform, part of the Google Cloud Platform (GCP). Google Cloud Functions are analogous to Amazon’s AWS Lambda and Azure Functions. Features include automatic scaling, high availability, fault tolerance, no servers to provision, manage, patch or update, and a payment model based on the function’s execution time.

Google Kubernetes Engine

Kubernetes Engine is a managed, production-ready environment, available on GCP, for deploying containerized applications. According to Google, Kubernetes Engine is a reliable, efficient, and secure way to run Kubernetes clusters in the Cloud.

Elasticsearch

Elasticsearch is a leading, distributed, RESTful search and analytics engine. Elasticsearch is a product of Elastic, the company behind the Elastic Stack, which includes Elasticsearch, Kibana, Beats, Logstash, X-Pack, and Elastic Cloud. Elasticsearch provides a distributed, multitenant-capable, full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is similar to Apache Solr in terms of features and functionality. Both Solr and Elasticsearch are based on Apache Lucene.

Other Technologies

In addition to the major technologies highlighted above, the project also relies on the following:

  • Google Container Registry – As an alternative to Docker Hub, we will store the Spring Boot API service’s Docker Image in Google Container Registry, making deployment to GKE a breeze.
  • Google Cloud Deployment Manager – Google Cloud Deployment Manager allows users to specify all the resources needed for an application in a declarative format using YAML. The Elastic Stack will be deployed with Deployment Manager.
  • Google Compute Engine – Google Compute Engine delivers scalable, high-performance virtual machines (VMs) running in Google’s data centers, on their worldwide fiber network.
  • Google Stackdriver – Stackdriver aggregates metrics, logs, and events from our Cloud-based project infrastructure, for troubleshooting. We are also integrating Stackdriver Logging for Winston into our Cloud Function for fast application feedback.
  • Google Cloud DNS – Hosts the primary project domain and subdomains for the search engine and API. Google Cloud DNS is a scalable, reliable and managed authoritative Domain Name System (DNS) service running on the same infrastructure as Google.
  • Google VPC Network Firewall – Firewall rules provide fine-grained, secure access controls to our API and search engine. We will need several firewall port openings to talk to the Elastic Stack.
  • Spring Boot – Pivotal’s Spring Boot project makes it easy to create stand-alone, production-grade Spring-based Java applications, such as our Spring Boot service.
  • Spring Data Elasticsearch – Pivotal Software’s Spring Data Elasticsearch project provides easy integration to Elasticsearch from our Java-based Spring Boot service.

Demonstration

To demonstrate an Action for Google Assistant with search engine integration, we need an index of content to search. In this post, we will build an informational Action, the Programmatic Ponderings Search Action, that responds to a user’s interests in certain technical topics, by returning post suggestions from the Programmatic Ponderings blog. For this demonstration, I have indexed the last two years’ worth of blog posts into Elasticsearch, using the ElasticPress WordPress plugin.

Source Code

All open-sourced code for this post can be found on GitHub in two repositories, one for the Spring Boot Service and one for the Action for Google Assistant. Code samples in this post are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Development Process

This post will focus on the development and integration of the Action for Google Assistant with Elasticsearch, via a Google Cloud Function, Kubernetes Engine, and the Spring Boot API service. The post is not intended to be a general how-to on developing for Actions for Google Assistant, Google Cloud Platform, Elasticsearch, or WordPress.

Building and integrating the Action will involve the following steps:

  • Design the Action’s conversation model;
  • Provision the Elastic Stack on Google Compute Engine using Deployment Manager;
  • Create an Elasticsearch index of blog posts;
  • Provision the Kubernetes cluster on GCP with GKE;
  • Develop and deploy the Spring Boot API service to Kubernetes;

Covered in Part Two of the Post:

  • Create the new Actions project using the Actions on Google console;
  • Develop the Action’s Intents using the Dialogflow console;
  • Develop, deploy, and test the Cloud Function to GCP;

Let’s explore each step in more detail.

Conversational Model

The conversational model of the Programmatic Ponderings Search Action for Google Assistant allows the user to invoke the Action in two ways, with or without intent. Below on the left, we see an example of an invocation of the Action – ‘Talk to Programmatic Ponderings’. Google Assistant then prompts the user for more information (intent) – ‘What topic are you interested in reading about?’.

sample-dialog-1.png

Below on the left, we see an invocation of the Action, which includes the intent – ‘Ask Programmatic Ponderings to find a post about Kubernetes’. Google Assistant will respond directly, both verbally and visually with the most relevant post.

sample-dialog-2

When a user requests a single result, for example, ‘Find a post about Docker’, Google Assistant will include Simple Response, Basic Card, and Suggestion Chip response types for devices with a display. This is shown in the center, above. The user may continue to ask for additional facts or choose to cancel the Action at any time.

When a user requests multiple results, for example, ‘I’m interested in Docker’, Google Assistant will include Simple Response, List, and Suggestion Chip response types for devices with a display. An example of a List Response is shown in the center of the previous set of screengrabs, above. The user will receive up to six results in the list, with a relevance score of 1.0 or greater. The user may choose to click on any of the post results in the list, which will initiate a new search using the post’s unique ID, as shown on the right, in the first set of screengrabs, above.

The conversational model also understands a request for help and to cancel the interaction.

GCP Account and Project

The following steps assume you have an existing GCP account and you have created a project on GCP to house the Cloud Function, GKE Cluster, and Elastic Stack on Google Compute Engine. The post also assumes that you have the latest Google Cloud SDK installed on your development machine, and have authenticated your identity from the command line (gist).

Elasticsearch on GCP

There are a number of options available to host Elasticsearch. Elastic, the company behind Elasticsearch, offers the Elasticsearch Service, a fully managed, scalable, and reliable service on AWS and GCP. AWS also offers their own managed Elasticsearch Service. I found some limitations with AWS’ Elasticsearch Service, which made integration with Spring Data Elasticsearch difficult. According to AWS, the service supports HTTP but does not support TCP transport.

For this post, we will stand up the Elastic Stack on GCP using an offering from the Google Cloud Platform Marketplace. A well-known provider of packaged applications for multiple Cloud platforms, Bitnami, offers the ELK Stack (the previous name for the Elastic Stack), running on Google Compute Engine.

wp-search-004.png

GCP Marketplace Solutions are deployed using the Google Cloud Deployment Manager. The Bitnami ELK solution is a complete stack with all the necessary software and software-defined Cloud infrastructure to securely run Elasticsearch. You select the instance’s zone(s), machine type, boot disk size, and security and networking configurations. Using that configuration, the Deployment Manager will deploy the solution and provide you with information and credentials for accessing the Elastic Stack. For this demo, we will configure a minimally-sized, single VM instance to run the Elastic Stack.

wp-search-005.png

Below we see the Bitnami ELK stack’s components being created on GCP, by the Deployment Manager.

wp-search-006.png

Indexed Content

With the Elastic Stack fully provisioned, I then configured WordPress to index the last two years of the Programmatic Ponderings blog posts to Elasticsearch on GCP. If you want to follow along with this post and need content to index, there is plenty of open source and public domain indexable content available on the Internet: books, movie lists, government and weather data, online catalogs of products, and so forth. Anything in a document database is directly indexable in Elasticsearch. Elastic even provides a set of index samples, available on their GitHub site.

wp-search-009

Firewall Ports for Elasticsearch

The Deployment Manager opens up firewall ports 80 and 443. To index the WordPress posts, I also had to open port 9200. According to Elastic, Elasticsearch uses port 9200 for communicating with their RESTful API with JSON over HTTP. For security, I locked down this firewall opening to my WordPress server’s address as the source (gist).

The two existing firewall rules opening ports 80 and 443 should also be locked down to your own IP address as the source. Common Elasticsearch ports are constantly scanned by hackers, who will quickly hijack your Elasticsearch contents and hold them for ransom, in addition to deleting your indexes. Similar tactics are used on well-known and unprotected ports for many platforms, including Redis, MySQL, PostgreSQL, MongoDB, and Microsoft SQL Server.

Kibana

Once the posts are indexed, the best way to view the resulting Elasticsearch documents is through Kibana, which is included as part of the Bitnami solution. Below we see approximately thirty posts, spread out across two years.

wp-search-010.png

Each Elasticsearch document, representing an indexed WordPress blog post, contains over 125 fields of information. Fields include a unique post ID, post title, content, publish date, excerpt, author, URL, and so forth. All these fields are exposed through Elasticsearch’s API and, as we will see, will be available to our Spring Boot service to query.

wp-search-011.png

Spring Boot Service

To ensure decoupling between the Action for Google Assistant and Elasticsearch, we will expose a RESTful search API, written in Java using Spring Boot and Spring Data Elasticsearch. The API will expose a tailored set of flexible endpoints to the Action. Google’s machine learning services will ensure our conversational model is trained to understand user intent. The API’s query algorithm and Elasticsearch’s rich Lucene-based search features will ensure the most relevant results are returned. We will host the Spring Boot service on Google Kubernetes Engine (GKE).

We will use a Spring Rest Controller to expose our RESTful web service’s resources to our Action’s Cloud Function. The current Spring Boot service contains five /elastic resource endpoints exposed by the ElasticsearchPostController class. Of those five, two endpoints will be called by our Action in this demo, the /{id} and the /dismax-search endpoints. The endpoints can be seen using the Swagger UI. Our Spring Boot service implements SpringFox, which has the option to expose the Swagger interactive API UI.

wp-search-017.png

The /{id} endpoint accepts a unique post ID as a path variable in the API call and returns a single ElasticsearchPost object wrapped in a Map object, and serialized to a JSON payload (gist).

Below we see an example response from the Spring Boot service to an API call to the /{id} endpoint, for post ID 22141. Since we are returning a single post, based on ID, the relevance score will always be 0.0 (gist).

This controller’s /{id} endpoint relies on a method exposed by the ElasticsearchPostRepository interface. The ElasticsearchPostRepository is a Spring Data Repository, which extends ElasticsearchRepository. The repository exposes the findById() method, which returns a single instance of the type, ElasticsearchPost, from Elasticsearch (gist).

The ElasticsearchPost class is annotated as an Elasticsearch Document, similar to other Spring Data Document annotations, such as Spring Data MongoDB. The ElasticsearchPost class is instantiated to hold the deserialized JSON documents stored in Elasticsearch (gist).

Dis Max Query

The second API endpoint called by our Action is the /dismax-search endpoint. We use this endpoint to search for a particular post topic, such as ‘Docker’. This type of search, as opposed to the Spring Data Repository method used by the /{id} endpoint, requires the use of an ElasticsearchTemplate. The ElasticsearchTemplate allows us to form more complex Elasticsearch queries than is possible using an ElasticsearchRepository class. Below, the /dismax-search endpoint accepts four input request parameters in the API call, which are the topic to search for, the starting point and size of the response to return, and the minimum relevance score (gist).

The logic to create and execute the ElasticsearchTemplate is handled by the ElasticsearchService class. The ElasticsearchPostController calls the ElasticsearchService. The ElasticsearchService handles querying Elasticsearch and returning a list of ElasticsearchPost objects to the ElasticsearchPostController. The dismaxSearch method, called by the /dismax-search endpoint’s method, constructs the ElasticsearchTemplate instance, used to build the request to Elasticsearch’s RESTful API (gist).
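To make the four request parameters concrete, here is a minimal Python sketch of how a client might build a /dismax-search request URL. The query parameter names (value, start, size, minScore), the base URL, and the assumption that the endpoint sits under the /elastic resource path are all placeholders for illustration; the actual names are defined in the controller gist.

```python
from urllib.parse import urlencode


def build_dismax_search_url(base_url, topic, start=0, size=10, min_score=1.0):
    """Return the URL for a /dismax-search call.

    The query parameter names below are hypothetical placeholders;
    substitute the names the service actually declares.
    """
    params = {
        "value": topic,         # topic to search for (assumed name)
        "start": start,         # offset of the first result (assumed name)
        "size": size,           # maximum number of results (assumed name)
        "minScore": min_score,  # minimum relevance score (assumed name)
    }
    # urlencode escapes the topic, so multi-word topics are safe to pass in.
    return f"{base_url}/elastic/dismax-search?{urlencode(params)}"
```

Calling build_dismax_search_url("http://localhost:8080", "Docker") yields a URL with all four parameters in the query string, ready to be used from Postman, curl, or the Action’s Cloud Function.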

To obtain the most relevant search results, we will use Elasticsearch’s Dis Max Query combined with the Match Phrase Query. Elastic describes the Dis Max Query as:

‘a query that generates the union of documents produced by its subqueries, and that scores each document with the maximum score for that document as produced by any subquery, plus a tie breaking increment for any additional matching subqueries.’

In short, the Dis Max Query allows us to query and weight (boost importance) multiple indexed fields, across all documents. The Match Phrase Query analyzes the text (our topic) and creates a phrase query out of the analyzed text.

After some experimentation, I found that the most relevant search results were returned by applying greater weighting (boost) to the post’s title and excerpt, followed by the post’s tags and categories, and finally, the actual text of the post. I also limited results to a minimum score of 1.0. Just because a word or phrase is repeated in a post doesn’t mean it is indicative of the post’s subject matter. Setting a minimum score helps ensure the requested topic is featured more prominently in the resulting post or posts. Increasing the minimum score will decrease the number of search results, but theoretically, increase their relevance (gist).

Below we see the results of a /dismax-search API call to our service, querying for posts about the topic, ‘Istio’, with a minimum score of 2.0. The search resulted in a serialized JSON payload containing three ElasticsearchPost objects (gist).
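As a rough illustration of the query described above, the following Python sketch builds the JSON body of an Elasticsearch search request that combines a Dis Max Query with one Match Phrase subquery per field. It is not the service’s Java implementation. The field names and most boost values are hypothetical; only the ordering of the weights (title and excerpt first, then tags and categories, then content) follows the text.

```python
import json

# Hypothetical field names and boosts, ordered as described in the text:
# title and excerpt weighted highest, then tags and categories, then content.
FIELD_BOOSTS = {
    "title": 3.0,
    "excerpt": 3.0,
    "tags": 2.0,
    "categories": 2.0,
    "content": 1.0,
}


def build_dismax_query(topic, start=0, size=10, min_score=1.0):
    """Return a search request body using dis_max over match_phrase subqueries."""
    subqueries = [
        {"match_phrase": {field: {"query": topic, "boost": boost}}}
        for field, boost in FIELD_BOOSTS.items()
    ]
    return {
        "from": start,           # offset of the first hit to return
        "size": size,            # maximum number of hits to return
        "min_score": min_score,  # drop hits scoring below this threshold
        "query": {"dis_max": {"queries": subqueries}},
    }


def to_json(body):
    """Serialize the request body for sending to the _search endpoint."""
    return json.dumps(body, indent=2)
```

Printing to_json(build_dismax_query("Istio", min_score=2.0)) shows the request body that could be posted to an index’s _search endpoint. Raising min_score in this body is the same lever described above for trading fewer results for higher relevance.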

Understanding Relevance Scoring

When returning search results, such as in the example above, the top result is the one with the highest score. The highest score should denote the most relevant result to the search query. According to Elastic, in their document titled, The Theory Behind Relevance Scoring, scoring is explained this way:

‘Lucene (and thus Elasticsearch) uses the Boolean model to find matching documents, and a formula called the practical scoring function to calculate relevance. This formula borrows concepts from term frequency/inverse document frequency and the vector space model but adds more-modern features like a coordination factor, field length normalization, and term or query clause boosting.’

In order to better understand this technical explanation of relevance scoring, it is much easier to see it applied to our example. Note the first search result above, Post ID 21867, has the highest score, 5.91989. Knowing that we are searching five fields (title, excerpt, tags, categories, and content), and boosting certain fields more than others, how was this score determined? Conveniently, Spring Data Elasticsearch’s SearchRequestBuilder class exposes the setExplain method. We can see this on line 12 of the dimaxQuery method, shown above. By passing a boolean value of true to the setExplain method, we are able to see the detailed scoring algorithms used by Elasticsearch for the top result, shown above (gist).

What this detail shows us is that of the five fields searched, the term ‘Istio’ was located in four of the five fields (all except ‘categories’). Using the practical scoring function described by Elasticsearch, and taking into account our boost values, we see that the post’s ‘excerpt’ field achieved the highest score of 5.9198895 (score of 1.6739764 * boost of 3.0).

Being able to view the scoring explanation helps us tune our search results. For example, according to the details, the term ‘Istio’ appeared 100 times (termFreq=100.0) in the main body of the post (the ‘content’ field). We might ask ourselves if we are giving enough relevance to the content as opposed to other fields. We might choose to increase the boost or decrease other fields with respect to the ‘content’ field, to produce higher quality search results.
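To reason about how changing a boost would shift the final score, it can help to model the Dis Max combination step in isolation. The Python sketch below is a simplification under stated assumptions: the per-field scores and boosts are made-up numbers, not values from the explanation output, and the boost is applied as a simple multiplier to an already-computed field score. It follows Elastic’s description quoted earlier, taking the best subquery score and adding a tie-breaking increment for the other matching subqueries.

```python
def dis_max_score(field_scores, boosts, tie_breaker=0.0):
    """Combine per-field scores the way a Dis Max Query is described to.

    field_scores: mapping of field name to unboosted score (matching fields only)
    boosts: mapping of field name to boost; fields not listed default to 1.0
    tie_breaker: weight given to every matching field other than the best one
    """
    boosted = [score * boosts.get(field, 1.0) for field, score in field_scores.items()]
    if not boosted:
        return 0.0
    best = max(boosted)
    # The best field counts in full; the rest are scaled by the tie breaker.
    return best + tie_breaker * (sum(boosted) - best)


# Hypothetical numbers for illustration only.
example_scores = {"title": 1.0, "excerpt": 2.0, "content": 0.5}
example_boosts = {"title": 3.0, "excerpt": 3.0, "content": 1.0}
```

With a tie breaker of 0.0, only the best boosted field decides the score, so raising the boost on a field that is not the maximum changes nothing until it overtakes the current best. A non-zero tie breaker lets the other matching fields contribute as well.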

Google Kubernetes Engine

With the Elastic Stack running on Google Compute Engine, and the Spring Boot API service built, we can now provision a Kubernetes cluster to run our Spring Boot service. The service will sit between our Action’s Cloud Function and Elasticsearch. We will use Google Kubernetes Engine (GKE) to manage our Kubernetes cluster on GCP. A GKE cluster is a managed group of uniform VM instances for running Kubernetes. The VMs are managed by Google Compute Engine. Google Compute Engine delivers virtual machines running in Google’s data centers, on their worldwide fiber network.

A GKE cluster can be provisioned using GCP’s Cloud Console or using the Cloud SDK, Google’s command-line interface for Google Cloud Platform products and services. I prefer using the CLI, which helps enable DevOps automation through tools like Jenkins and Travis CI (gist).

Below is the command I used to provision a minimally sized three-node GKE cluster, replete with the latest available version of Kubernetes. Although a one-node cluster is sufficient for early-stage development, testing should be done on a multi-node cluster to ensure the service will operate properly with multiple instances running behind a load-balancer (gist).

Below, we see the three n1-standard-1 instance type worker nodes, one in each of three different specific geographical locations, referred to as zones. The three zones are in the us-east1 region. Multiple instances spread across multiple zones provide single-region high-availability for our Spring Boot service. With GKE, the Master Node is fully managed by Google.

wp-search-015

Building Service Image

In order to deploy our Spring Boot service, we must first build a Docker Image and make that image available to our Kubernetes cluster. For lowest latency, I’ve chosen to build and publish the image to Google Container Registry, in addition to Docker Hub. The Spring Boot service’s Docker image is built on the latest Debian-based OpenJDK 10 Slim base image, available on Docker Hub. The Spring Boot JAR file is copied into the image (gist).

To automate the build and publish processes with tools such as Jenkins or Travis CI, we will use a simple shell script. The script builds the Spring Boot service using Gradle, then builds the Docker Image containing the Spring Boot JAR file, tags and publishes the Docker image to the image repository, and finally, redeploys the Spring Boot service container to GKE using kubectl (gist).

Below we see the latest version of our Spring Boot Docker image published to the Google Container Registry.

wp-search-016

Deploying the Service

To deploy the Spring Boot service’s container to GKE, we will use a Kubernetes Deployment Controller. The Deployment Controller manages the Pods and ReplicaSets. As a deployment alternative, you could choose to use CoreOS’ Operator Framework to create an Operator or use Helm to create a Helm Chart. Along with the Deployment Controller, there is a ConfigMap and a Horizontal Pod Autoscaler. The ConfigMap contains environment variables that will be available to the Spring Boot service instances running in the Kubernetes Pods. Variables include the host and port of the Elasticsearch cluster on GCP and the name of the Elasticsearch index created by WordPress. These values will override any configuration values set in the service’s application.yml Java properties file.

The Deployment Controller creates a ReplicaSet with three Pods, running the Spring Boot service, one on each worker node (gist).

To properly load-balance the three Spring Boot service Pods, we will also deploy a Kubernetes Service of the Kubernetes ServiceType, LoadBalancer. According to Kubernetes, a Kubernetes Service is an abstraction which defines a logical set of Pods and a policy by which to access them (gist).

Below, we see three instances of the Spring Boot service deployed to the GKE cluster on GCP. Each Pod, containing an instance of the Spring Boot service, is in a load-balanced pool, behind our service load balancer, and exposed on port 80.

wp-search-014

Testing the API

We can test our API and ensure it is talking to Elasticsearch, and returning expected results using the Swagger UI, shown previously, or tools like Postman, shown below.

wp-search-018.png

Communication Between GKE and Elasticsearch

Similar to port 9200, which needed to be opened for indexing content over HTTP, we also need to open firewall port 9300 between the Spring Boot service on GKE and Elasticsearch. According to Elastic, Elasticsearch Java clients talk to the Elasticsearch cluster over port 9300, using the native Elasticsearch transport protocol (TCP).

Google Search Assistant Diagram WordPress Index

Again, locking this port down to the GKE cluster as the source is critical for security (gist).

Part Two

In part one we have examined the creation of the Elastic Stack, the provisioning of the GKE cluster, and the development and deployment of the Spring Boot service to Kubernetes. In part two of this post, we will tie everything together by creating and integrating our Action for Google Assistant:

  • Create the new Actions project using the Actions on Google console;
  • Develop the Action’s Intents using the Dialogflow console;
  • Develop, deploy, and test the Cloud Function to GCP.

Google Search Assistant Diagram part 2b.png

Related Posts

If you’re interested in comparing the development of an Action for Google Assistant with that of Amazon’s Alexa and Microsoft’s LUIS-enabled chatbots, in addition to this post, I would recommend the previous three posts in this conversation interface series:

All three articles’ demonstrations leverage their respective Cloud platform’s machine learning-based natural language understanding (NLU) services. All three take advantage of their respective Cloud platform’s NoSQL database and object storage services. Lastly, all three of the articles’ demonstrations are written in a common language, Node.js.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers, their clients, or Google.


Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 2

Given a modern distributed system, composed of multiple microservices, each possessing a sub-set of the domain’s aggregate data they need to perform their functions autonomously, we will almost assuredly have some duplication of data. Given this duplication, how do we maintain data consistency? In this two-part post, we’ve been exploring one possible solution to this challenge, using Apache Kafka and the model of eventual consistency. In Part One, we examined the online storefront domain, the storefront’s microservices, and the system’s state change event message flows.

Part Two

In Part Two of this post, I will briefly cover how to deploy and run a local development version of the storefront components, using Docker. The storefront’s microservices will be exposed through an API Gateway, Netflix’s Zuul. Service discovery and load balancing will be handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. To provide operational visibility, we will add Yahoo’s Kafka Manager and Mongo Express to our system.

docker-system-diagram

Source code for deploying the Dockerized components of the online storefront, shown in this post, is available on GitHub. All Docker Images are available on Docker Hub. I have chosen the wurstmeister/kafka-docker version of Kafka, available on Docker Hub; it has 580+ stars and 10M+ pulls on Docker Hub. This version of Kafka works well, as long as you run it within a Docker Swarm, locally.

Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Deployment Options

For simplicity, I’ve used Docker’s native Docker Swarm Mode to support the deployed online storefront. Docker requires minimal configuration as opposed to other CaaS platforms. Usually, I would recommend Minikube for local development if the final destination of the storefront were Kubernetes in Production (AKS, EKS, or GKE). Alternatively, if the final destination of the storefront were Red Hat OpenShift in Production, I would recommend Minishift for local development.

Docker Deployment

We will break up our deployment into two parts. First, we will deploy everything, except our services. We will allow Kafka, MongoDB, Eureka, and the other components to start up fully. Afterward, we will deploy the three online storefront services. The storefront-kafka-docker project on GitHub contains two Docker Compose files, which are divided between the two tasks.

The middleware Docker Compose file (gist).

The services Docker Compose file (gist).

In the storefront-kafka-docker project, there is a shell script, stack_deploy_local.sh. This script will execute both Docker Compose files, in succession, with a pause in between. You may need to adjust the timing for your own system (gist).
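The deploy-pause-deploy sequence can be sketched in a few lines of Python. This is not the project’s shell script, only an illustration of the same idea: each Compose file is deployed to the same stack with docker stack deploy, with a pause before every file after the first. The Compose file names in the usage note and the default pause length are assumptions; the stack name, storefront, comes from the text.

```python
import subprocess
import time


def deploy_stack(compose_files, stack_name="storefront", pause_seconds=60,
                 run=subprocess.run, sleep=time.sleep):
    """Deploy each Compose file to the same Docker stack, pausing in between.

    run and sleep are injectable so the sequence can be exercised without
    Docker; by default they call the real docker CLI and really wait.
    """
    commands = []
    for index, compose_file in enumerate(compose_files):
        if index > 0:
            # Give the previously deployed components time to start up fully.
            sleep(pause_seconds)
        command = ["docker", "stack", "deploy", "-c", compose_file, stack_name]
        run(command, check=True)  # raises CalledProcessError if docker fails
        commands.append(command)
    return commands
```

A call such as deploy_stack(["docker-compose-middleware.yml", "docker-compose-services.yml"]) would deploy the middleware first and the services second. Both file names are hypothetical. As with the shell script, the pause length is the value you may need to tune for your own machine.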

Start by running docker swarm init. This command will initialize a Docker Swarm. Next, execute the stack deploy script, using an sh ./stack_deploy_local.sh command. The script will deploy a new Docker Stack, within the Docker Swarm. The Docker Stack will hold all storefront components, deployed as individual Docker containers. The stack is deployed within its own isolated Docker overlay network, kafka-net.

Note we are not using host-based persistent storage for this local development demo. Destroying the Docker stack or the individual Kafka, Zookeeper, or MongoDB Docker containers will result in a loss of data.

stack-deploy

Before completion, the stack deploy script runs a docker stack ls command, followed by a docker stack services storefront command. You should see one stack, named storefront, with ten services. You should also see each of the ten services has 1/1 replicas running, indicating everything has started or is starting correctly, without failure. A failure would be reflected here as a service having 0/1 replicas.
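If you would rather check the replica counts programmatically than by eye, a small helper like the Python sketch below could parse the output. It assumes each input line has the form "name running/desired", which is what docker stack services storefront --format "{{.Name}} {{.Replicas}}" prints for simple replicated services. The service names in the usage note are hypothetical.

```python
def find_unhealthy_services(lines):
    """Return the names of services running fewer replicas than desired.

    Each non-empty line is expected to look like "<name> <running>/<desired>",
    for example "storefront_orders 0/1".
    """
    unhealthy = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines, such as a trailing newline
        name, replicas = line.rsplit(None, 1)
        running, desired = replicas.split("/")
        if int(running) < int(desired):
            unhealthy.append(name)
    return unhealthy
```

For example, passing the lines "storefront_kafka 1/1" and "storefront_orders 0/1" returns a list containing only storefront_orders, the service that failed to start.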

docker-stack-ls

Before completion, the stack deploy script also runs a docker container ls command. You should observe each of the ten running containers (‘services’ in the Docker stack), along with their instance names and ports.

docker-container-ls

There is also a shell script, stack_delete_local.sh, which will issue a docker stack rm storefront command to destroy the stack when you are done.

Using the names of the storefront’s Docker containers, you can check the start-up logs of any of the components, using the docker logs command.

docker-logs

Testing the Stack

With the storefront stack deployed, we need to confirm that all the components have started correctly and are communicating with each other. To accomplish this, I’ve written a simple Python script, refresh.py. The refresh script has multiple uses. It deletes any existing storefront service MongoDB databases. It also deletes any existing Kafka topics; I call the Kafka Manager’s API to accomplish this. We have no databases or topics since our stack was just created. However, if you are actively developing your data models, you will likely want to purge the databases and topics regularly (gist).

Next, the refresh script calls a series of RESTful HTTP endpoints, in a specific order, to create sample data. Our three storefront services each expose different endpoints. The different /sample endpoints create sample customers, orders, order fulfillment requests, and shipping notifications. The create sample data endpoints include, in order:

  1. Sample Customer: /accounts/customers/sample
  2. Sample Orders: /orders/customers/sample/orders
  3. Sample Fulfillment Requests: /orders/customers/sample/fulfill
  4. Sample Processed Order Events: /fulfillment/fulfillment/sample/process
  5. Sample Shipped Order Events: /fulfillment/fulfillment/sample/ship
  6. Sample In-Transit Order Events: /fulfillment/fulfillment/sample/in-transit
  7. Sample Received Order Events: /fulfillment/fulfillment/sample/receive

You could create data on your own, by POSTing to the exposed CRUD endpoints on each service. However, given the complex data objects required in the request payloads, it is too time-consuming for this demo.
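The ordered sequence of calls listed above can be sketched as follows. This is not the refresh.py script itself, only a minimal illustration of the ordering. The base URL (the Zuul API Gateway on localhost, port 8080) and the use of plain GET requests are assumptions; the endpoint paths are taken from the list.

```python
from urllib.request import urlopen

# Sample data endpoints, in the order they must be called.
SAMPLE_ENDPOINTS = [
    "/accounts/customers/sample",
    "/orders/customers/sample/orders",
    "/orders/customers/sample/fulfill",
    "/fulfillment/fulfillment/sample/process",
    "/fulfillment/fulfillment/sample/ship",
    "/fulfillment/fulfillment/sample/in-transit",
    "/fulfillment/fulfillment/sample/receive",
]


def http_get(url):
    """Issue a GET request (assumed method) and return the HTTP status code."""
    with urlopen(url, timeout=30) as response:
        return response.status


def create_sample_data(base_url="http://localhost:8080", call=http_get):
    """Call each sample endpoint in order and collect (path, status) pairs.

    Any exception raised by a call, such as an HTTP error status, propagates
    and stops the sequence, because later steps depend on earlier ones.
    """
    results = []
    for path in SAMPLE_ENDPOINTS:
        results.append((path, call(base_url + path)))
    return results
```

The order matters because each step consumes data produced by the previous one: orders need a customer, fulfillment requests need orders, and so on. Since the services exchange data through Kafka, a real script may also need a short pause between calls so the consuming service has time to process each message.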

To execute the script, use a python3 ./refresh.py command. I am using Python 3 in the demo, but the script should also work with Python 2.x, if you change the shebang.

refresh-script

If everything was successful, the script returns one document from each of the three storefront services’ MongoDB database collections. A result of ‘None’ for any of the MongoDB documents usually indicates one of the earlier commands failed. Given an abnormally high response latency, due to the load of the ten running containers on my laptop, I had to increase the Zuul/Ribbon timeouts.

Observing the System

We should now have the online storefront Docker stack running, three MongoDB databases created and populated with sample documents (data), and three Kafka topics, which have messages in them. Based on the fact we saw database documents printed out with our refresh script, we know the topics were used to pass data between the message producing and message consuming services.

In most enterprise environments, a developer may not have the access, nor the operational knowledge, to interact with Kafka or MongoDB from within a container, on the command line. So how else can we interact with the system?

Kafka Manager

Kafka Manager gives us the ability to interact with Kafka via a convenient browser-based user interface. For this demo, the Kafka Manager UI is available on default port 9000.

kafka_manager_00

To make Kafka Manager useful, define the Kafka cluster. The Cluster Name is up to you. The Cluster Zookeeper Host should be zookeeper:2181, for our demo.

kafka_manager_01

Kafka Manager gives us useful insights into many aspects of our simple, single-broker cluster. You should observe three topics, created during the deployment of Kafka.

kafka_manager_02

Kafka Manager is an appealing alternative, as opposed to connecting with the Kafka container, with a docker exec command, to interact with Kafka. A typical use case might be deleting a topic or adding partitions to a topic. We can also see which Consumers are consuming which topics, from within Kafka Manager.

kafka_manager_03

Mongo Express

Similar to Kafka Manager, Mongo Express gives us the ability to interact with MongoDB via a user interface. For this demo, the Mongo Express browser-based user interface is available on default port 8081. The initial view displays each of the existing databases. Note our three services’ databases, including accounts, orders, and fulfillment.

mongo-express-01

Drilling into an individual database, we can view each of the database’s collections. Digging in further, we can interact with individual database collection documents.

mongo-express-02

We may even edit and save the documents.

mongo-express-03

SpringFox and Swagger

Each of the storefront services also implements SpringFox, the automated JSON API documentation for APIs built with Spring. With SpringFox, each service exposes a rich Swagger UI. The Swagger UI allows us to interact with service endpoints.

Since each service exposes its own Swagger interface, we must access them through the Zuul API Gateway on port 8080. In our demo environment, the Swagger browser-based user interface is accessible at /swagger-ui.html. Below is a fully self-documented Orders service API, as seen through the Swagger UI.

I believe there are still some incompatibilities with the latest SpringFox release and Spring Boot 2, which prevent Swagger from showing the default Spring Data REST CRUD endpoints. Currently, you only see the API endpoints you explicitly declare in your Controller classes.

swagger-ui-1

The service’s data models (POJOs) are also exposed through the Swagger UI by default. Below we see the Orders service’s models.

swagger-ui-3

The Swagger UI allows you to drill down into the complex structure of the models, such as the CustomerOrder entity, exposing each of the entity’s nested data objects.

swagger-ui-2

Spring Cloud Netflix Eureka

Although this post does not cover the use of Eureka or Zuul, Eureka gives us further valuable insight into our storefront system. Eureka is our system’s service registry and would provide load-balancing for our services if we had multiple instances.

For this demo, the Eureka browser-based user interface is available on default port 8761. Within the Eureka user interface, we should observe the three storefront services and Zuul, the API Gateway, registered with Eureka. If we had more than one instance of each service, we would see all of them listed here.

eureka-ui

Although of limited use in a local environment, we can observe some general information about our host.

eureka-ui-02

Interacting with the Services

The three storefront services are fully functional Spring Boot / Spring Data REST / Spring HATEOAS-enabled applications. Each service exposes a rich set of CRUD endpoints for interacting with the service’s data entities. Additionally, each service includes Spring Boot Actuator. Actuator exposes additional operational endpoints, allowing us to observe the running services. Again, this post is not intended to be a demonstration of Spring Boot or Spring Boot Actuator.

Using an application, such as Postman, we can interact with our services’ RESTful HTTP endpoints. Shown below, we are calling the Accounts service’s customers resource. The Accounts request is proxied through the Zuul API Gateway.

postman

The above Postman Storefront Collection and Postman Environment are both exported and saved with the project.

Some key endpoints to observe the entities that were created using Event-Carried State Transfer are as follows. They assume you are using localhost as a base URL.

References

Links to my GitHub projects for this post

Some additional references I found useful while authoring this post and the online storefront code:

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

 


Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1

Given a modern distributed system, composed of multiple microservices, each possessing a sub-set of the domain’s aggregate data they need to perform their functions autonomously, we will almost assuredly have some duplication of data. Given this duplication, how do we maintain data consistency? In this two-part post, we will explore one possible solution to this challenge, using Apache Kafka and the model of eventual consistency.

I previously covered the topic of eventual consistency in a distributed system, using RabbitMQ, in the post, Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ. This post is featured on Pivotal’s RabbitMQ website.

Introduction

To ground the discussion, let’s examine a common example of the online storefront. Using a domain-driven design (DDD) approach, we would expect our problem domain, the online storefront, to be composed of multiple bounded contexts. Bounded contexts would likely include Shopping, Customer Service, Marketing, Security, Fulfillment, Accounting, and so forth, as shown in the context map, below.

mid-map-final-03

Given this problem domain, we can assume we have the concept of the Customer. Further, the unique properties that define a Customer are likely to be spread across several bounded contexts. A complete view of a Customer would require you to aggregate data from multiple contexts. For example, the Accounting context may be the system of record (SOR) for primary customer information, such as the customer’s name, contact information, contact preferences, and billing and shipping addresses. Marketing may possess additional information about the customer’s use of the store’s loyalty program. Fulfillment may maintain a record of all the orders shipped to the customer. Security likely holds the customer’s access credentials and privacy settings.

Below, Customer data objects are shown in yellow. Orange represents logical divisions of responsibility within each bounded context. These divisions will manifest themselves as individual microservices in our online storefront example.

mid-map-final-01

Distributed Data Consistency

If we agree that the architecture of our domain’s data model requires some duplication of data across bounded contexts, or even between services within the same contexts, then we must ensure data consistency. Take, for example, a change in a customer’s address. The Accounting context is the system of record for the customer’s addresses. However, to fulfill orders, the Shipping context might also need to maintain the customer’s address. Likewise, the Marketing context, which is responsible for direct-mail advertising, also needs to be aware of the address change, and update its own customer records.

If a piece of shared data is changed, then the party making the change should be responsible for communicating the change, without the expectation of a response. They are stating a fact, not asking a question. Interested parties can choose if, and how, to act upon the change notification. This decoupled communication model is often described as Event-Carried State Transfer, as defined by Martin Fowler, of ThoughtWorks, in his insightful post, What do you mean by “Event-Driven”?. A change to a piece of data can be thought of as a state change event. Coincidentally, Fowler also uses a customer’s address change as an example of Event-Carried State Transfer. The Event-Carried State Transfer Pattern is also detailed by fellow ThoughtWorker and noted Architect, Graham Brooks.
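To illustrate the shape of Event-Carried State Transfer before introducing Kafka, here is a minimal in-memory Python sketch of the address change example. It is a toy stand-in, not the storefront’s implementation: the EventBus class, the topic name, and the event field names are all hypothetical, and delivery here is synchronous, whereas a real broker delivers messages asynchronously.

```python
from collections import defaultdict


class EventBus:
    """In-memory stand-in for a message broker such as Kafka."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, handler):
        self._subscribers[topic].append(handler)

    def publish(self, topic, event):
        # Fire and forget: the publisher states a fact and expects no reply.
        for handler in self._subscribers[topic]:
            handler(dict(event))  # each subscriber receives its own copy


class ContextStore:
    """A bounded context's local copy of customer addresses."""

    def __init__(self):
        self.addresses = {}

    def on_address_changed(self, event):
        # The event carries the full new state, so no callback to the
        # system of record is needed to apply the change locally.
        self.addresses[event["customer_id"]] = event["address"]


bus = EventBus()
shipping = ContextStore()
marketing = ContextStore()
bus.subscribe("customer.address.changed", shipping.on_address_changed)
bus.subscribe("customer.address.changed", marketing.on_address_changed)

# Accounting, the system of record, announces the change.
bus.publish("customer.address.changed",
            {"customer_id": "c-1", "address": "123 Main St"})
```

After the publish call, both the shipping and marketing stores hold the new address, yet the publisher never learned who was listening or what they did with the event. That is the decoupling the pattern is after.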

Consistency Strategies

Multiple architectural approaches could be taken to solve for data consistency in a distributed system. For example, you could use a single relational database to persist all data, avoiding the distributed data model altogether. However, I would argue that using a single database just turns your distributed system back into a monolith.

You could use Change Data Capture (CDC) to track changes to each database and send a record of those changes to Kafka topics for consumption by interested parties. Kafka Connect is an excellent choice for this, as explained in the article, No More Silos: How to Integrate your Databases with Apache Kafka and CDC, by Robin Moffatt of Confluent.

Alternately, we could use a separate data service, independent of the domain’s other business services, whose sole role is to ensure data consistency across domains. If messages are persisted in Kafka, the service has the added ability to provide data auditability through message replay. Of course, another set of services adds additional operational complexity.
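As a rough illustration of auditability through replay, the sketch below rebuilds state from an append-only log, up to any chosen offset. The ReplayableLog class and AddressEvent record are hypothetical and live entirely in memory; they only model the idea of a persisted, ordered message log and make no claim about how Kafka implements it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical persisted message: a customer's address as of a given log offset.
record AddressEvent(long offset, String customerId, String address) {}

final class ReplayableLog {
    private final List<AddressEvent> log = new ArrayList<>();

    // Appends a message and assigns it the next offset, as an append-only log would.
    void append(String customerId, String address) {
        log.add(new AddressEvent(log.size(), customerId, address));
    }

    // Replays messages from the start, up to and including maxOffset,
    // rebuilding the address book as it stood at that point.
    Map<String, String> replay(long maxOffset) {
        Map<String, String> state = new LinkedHashMap<>();
        for (AddressEvent e : log) {
            if (e.offset() > maxOffset) {
                break;
            }
            state.put(e.customerId(), e.address());
        }
        return state;
    }
}
```

Because nothing is ever overwritten in the log, an auditor can ask what a customer’s address was at any earlier point simply by replaying fewer messages.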

Storefront Example

In this post, our online storefront’s services will be built using Spring Boot. Thus, we will ensure the uniformity of distributed data by using a Publish/Subscribe model with the Spring for Apache Kafka Project. When a piece of data is changed by one Spring Boot service, if appropriate, that state change will trigger an event, which will be shared with other services using Kafka topics.

We will explore different methods of leveraging Spring Kafka to communicate state change events, as they relate to the specific use case of a customer placing an order through the online storefront. An abridged view of the storefront ordering process is shown in the diagram below. The arrows represent the exchange of data. Kafka will serve as a means of decoupling services from one another, while still ensuring the data is exchanged.

order-process-flow

Given the use case of placing an order, we will examine the interactions of three services, the Accounts service within the Accounting bounded context, the Fulfillment service within the Fulfillment context, and the Orders service within the Order Management context. We will examine how the three services use Kafka to communicate state changes (changes to their data) to each other, in a decoupled manner.

The diagram below shows the event flows between sub-systems discussed in the post. The numbering below corresponds to the numbering in the ordering process above. We will look at event flows 2, 5, and 6. We will simulate event flow 3, the order being created by the Shopping Cart service. Kafka Producers may also be Consumers within our domain.

kafka-data-flow-diagram

Below is a view of the online storefront, through the lens of the major sub-systems involved. Although the diagram is overly simplified, it should give you the idea of where Kafka, and Zookeeper, Kafka’s cluster manager, might sit in a typical, highly-available, microservice-based, distributed, application platform.

kafka-based-systems-diagram

This post will focus on the storefront’s services, database, and messaging sub-systems.

full-system-partial-view.png

Storefront Microservices

First, we will explore the functionality of each of the three microservices. Then, we will examine how they share state change events using Kafka. Each storefront service is built using Spring Boot 2.0 and Gradle. Each Spring Boot service includes Spring Data REST, Spring Data MongoDB, Spring for Apache Kafka, Spring Cloud Sleuth, SpringFox, Spring Cloud Netflix Eureka, and Spring Boot Actuator. For simplicity, Kafka Streams and the use of Spring Cloud Stream are not part of this post.

Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Accounts Service

The Accounts service is responsible for managing basic customer information, such as name, contact information, addresses, and credit cards for purchases. A partial view of the data model for the Accounts service is shown below. This cluster of domain objects represents the Customer Account Aggregate.

accounts-diagram

The Customer class, the Accounts service’s primary data entity, is persisted in the Accounts MongoDB database. A Customer, represented as a BSON document in the customer.accounts database collection, looks as follows (gist).

Along with the primary Customer entity, the Accounts service contains a CustomerChangeEvent class. As a Kafka producer, the Accounts service uses the CustomerChangeEvent domain event object to carry the customer state information it wishes to share when a new customer is added, or a change is made to an existing customer. The CustomerChangeEvent object is not an exact duplicate of the Customer object. For example, the CustomerChangeEvent object does not share sensitive credit card information (the CreditCard data object) with other message Consumers.
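The relationship between the entity and its event can be sketched in plain Java as follows. The record shapes here are simplified and hypothetical (the real classes in the Accounts service carry more fields), and the CustomerEventMapper helper is an assumption made for illustration, not a class from the project.

```java
import java.util.List;

// Simplified, hypothetical shapes; the real classes in the Accounts service have more fields.
record Name(String first, String last) {}
record Address(String type, String street, String city) {}
record CreditCard(String number, String expiration) {}
record Customer(String id, Name name, List<Address> addresses, List<CreditCard> creditCards) {}

// The event deliberately has no credit card field.
record CustomerChangeEvent(String id, Name name, List<Address> addresses) {}

final class CustomerEventMapper {
    // Copies only the state that other services are allowed to see.
    static CustomerChangeEvent toEvent(Customer customer) {
        return new CustomerChangeEvent(
                customer.id(), customer.name(), List.copyOf(customer.addresses()));
    }
}
```

Since the event type has no field for credit cards, sensitive data cannot leak into a message payload by accident, even if the Customer entity later grows new sensitive fields.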

accounts-events-diagram.png

Since the CustomerChangeEvent domain event object is not persisted in MongoDB, to examine its structure, we can look at its JSON message payload in Kafka. Note the differences in the data structure between the Customer document in MongoDB and the Kafka CustomerChangeEvent message payload (gist).

For simplicity, we will assume other services do not make changes to the customer’s name, contact information, or addresses. That is the sole responsibility of the Accounts service.

Source code for the Accounts service is available on GitHub.

Orders Service

The Orders service is responsible for managing a customer’s past and current orders; it is the system of record for the customer’s order history. A partial view of the data model for the Orders service is shown below. This cluster of domain objects represents the Customer Orders Aggregate.

orders-diagram

The CustomerOrders class, the Orders service’s primary data entity, is persisted in MongoDB. This entity contains a history of all the customer’s orders (Order data objects), along with the customer’s name, contact information, and addresses. In the Orders MongoDB database, a CustomerOrders, represented as a BSON document in the customer.orders database collection, looks as follows (gist).

Along with the primary CustomerOrders entity, the Orders service contains the FulfillmentRequestEvent class. As a Kafka producer, the Orders service uses the FulfillmentRequestEvent domain event object to carry state information about an approved order, ready for fulfillment, which it sends to Kafka for consumption by the Fulfillment service. The FulfillmentRequestEvent object only contains the information it needs to share. In our example, it shares a single Order, along with the customer’s name, contact information, and shipping address.

orders-event-diagram

Since the FulfillmentRequestEvent domain event object is not persisted in MongoDB, we can look at its JSON message payload in Kafka. Again, note the structural differences between the CustomerOrders document in MongoDB and the FulfillmentRequestEvent message payload in Kafka (gist).

Source code for the Orders service is available on GitHub.

Fulfillment Service

Lastly, the Fulfillment service is responsible for fulfilling orders. A partial view of the data model for the Fulfillment service is shown below. This cluster of domain objects represents the Fulfillment Aggregate.

fulfillment-diagram

The Fulfillment service’s primary entity, the Fulfillment class, is persisted in MongoDB. This entity contains a single Order data object, along with the customer’s name, contact information, and shipping address. The Fulfillment service also uses the Fulfillment entity to store the latest shipping event, such as ‘Shipped’, ‘In Transit’, and ‘Received’. The customer’s name, contact information, and shipping addresses are managed by the Accounts service, replicated to the Orders service, and passed to the Fulfillment service, via Kafka, using the FulfillmentRequestEvent entity.

In the Fulfillment MongoDB database, a Fulfillment object, represented as a BSON document in the fulfillment.requests database collection, looks as follows (gist).

Along with the primary Fulfillment entity, the Fulfillment service has an OrderStatusChangeEvent class. As a Kafka producer, the Fulfillment service uses the OrderStatusChangeEvent domain event object to carry state information about an order’s fulfillment statuses. The OrderStatusChangeEvent object contains the order’s UUID, a timestamp, shipping status, and an option for order status notes.

fulfillment-event-diagram

Since the OrderStatusChangeEvent domain event object is not persisted in MongoDB, to examine it, we can again look at its JSON message payload in Kafka (gist).

Source code for the Fulfillment service is available on GitHub.

State Change Event Messaging Flows

There are three state change event messaging flows demonstrated in this post.

  1. Change to a Customer triggers an event message by the Accounts service;
  2. Order approved triggers an event message by the Orders service;
  3. Change to the status of an Order triggers an event message by the Fulfillment service;

Each of these state change event messaging flows follows the exact same architectural pattern on both the Producer and Consumer sides of the Kafka topic.

kafka-event-flow

Let’s examine each state change event messaging flow and the code behind them.

Customer State Change

When a Customer entity is created or updated by the Accounts service, a CustomerChangeEvent message is produced and sent to the accounts.customer.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service eventually has a record of all customers who may place an order. It can be said that the Orders service’s Customer contact information is eventually consistent with the Accounts service’s Customer contact information, by way of Kafka.

kafka-topic-01

There are different methods to trigger a message to be sent to Kafka. For this particular state change, the Accounts service uses a listener. The listener class, which extends AbstractMongoEventListener, listens for an onAfterSave event for a Customer entity (gist).

The listener handles the event by instantiating a new CustomerChangeEvent with the Customer’s information and passes it to the Sender class (gist).

The configuration of the Sender is handled by the SenderConfig class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the CustomerChangeEvent object into a JSON message payload (gist).

The Sender uses a KafkaTemplate to send the message to the Kafka topic, as shown below. Since message order is critical to ensure changes to a Customer’s information are processed in order, all messages are sent to a single topic with a single partition.

kafka-events-01.png

The Orders service’s Receiver class consumes the CustomerChangeEvent messages, produced by the Accounts service (gist).

The Orders service’s Receiver class is configured differently, compared to the Fulfillment service. The Orders service receives messages from multiple topics, each containing messages with different payload structures. Each type of message must be deserialized into a different object type. To accomplish this, the ReceiverConfig class uses Apache Kafka’s StringDeserializer. The Orders service’s ReceiverConfig references the setMessageConverter method of Spring Kafka’s AbstractKafkaListenerContainerFactory class, which allows for dynamic object type matching (gist).

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). That method accepts a specific object type as input, denoting the object type the message payload needs to be deserialized into. In this way, we can receive multiple message payloads, serialized from multiple object types, and successfully deserialize each type into the correct data object. In the case of a CustomerChangeEvent, the Orders service calls the receiveCustomerOrder method to consume the message and properly deserialize it.
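The general idea of per-topic type matching can be modeled without any framework. The sketch below is not how Spring Kafka implements it; the TopicRouter class, its Route record, and the PayloadConverter interface are all hypothetical names used only to show a String payload being converted to whatever type the handler registered for that topic expects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

final class TopicRouter {
    // Pairs the target type for a topic with the handler that accepts it.
    private record Route<T>(Class<T> type, Consumer<T> handler) {
        void dispatch(String payload, PayloadConverter converter) {
            handler.accept(converter.convert(payload, type));
        }
    }

    // Pluggable conversion step (hypothetical); a JSON mapper would normally sit here.
    interface PayloadConverter {
        <T> T convert(String payload, Class<T> type);
    }

    private final Map<String, Route<?>> routes = new HashMap<>();
    private final PayloadConverter converter;

    TopicRouter(PayloadConverter converter) {
        this.converter = converter;
    }

    // Associates a topic with the object type its payloads should become.
    <T> void register(String topic, Class<T> type, Consumer<T> handler) {
        routes.put(topic, new Route<>(type, handler));
    }

    // Every message arrives as a String; the topic decides which type it is converted to.
    void onMessage(String topic, String payload) {
        Route<?> route = routes.get(topic);
        if (route == null) {
            throw new IllegalArgumentException("No handler registered for topic " + topic);
        }
        route.dispatch(payload, converter);
    }
}
```

In the Orders service this work is done for us by the listener container factory and its message converter; the sketch only shows why deserializing everything as a String first leaves the final type decision to the receiving method.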

For all services, a Spring application.yaml properties file, in each service’s resources directory, contains the Kafka configuration (gist).

Order Approved for Fulfillment

When the status of the Order in a CustomerOrders entity is changed to ‘Approved’ from ‘Created’, a FulfillmentRequestEvent message is produced and sent to the orders.order.fulfill Kafka topic. This message is retrieved and consumed by the Fulfillment service. This is how the Fulfillment service has a record of what Orders are ready for fulfillment.

Kafka-Eventual-Cons Order Flow 2

Since we did not create the Shopping Cart service for this post, the Orders service simulates an order approval event, containing an approved order, being received, through Kafka, from the Shopping Cart Service. To simulate order creation and approval, the Orders service can create a random order history for each customer. Further, the Orders service can scan all customer orders for orders that contain both a ‘Created’ and ‘Approved’ order status. This state is communicated as an event message to Kafka for all orders matching those criteria. A FulfillmentRequestEvent is produced, which contains the order to be fulfilled, and the customer’s contact and shipping information. The FulfillmentRequestEvent is passed to the Sender class (gist).
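The scan for qualifying orders might look something like the following sketch. The Order and OrderStatusEntry records are hypothetical, simplified shapes, and the predicate follows the wording above literally: an order qualifies when its status history contains both ‘Created’ and ‘Approved’. Whether orders that have already moved past ‘Approved’ should be excluded is not addressed here.

```java
import java.util.List;

// Hypothetical, simplified shapes for illustration.
record OrderStatusEntry(String status) {}
record Order(String guid, List<OrderStatusEntry> statusHistory) {}

final class ApprovedOrderScanner {
    // True when the order's history holds the given status.
    private static boolean hasStatus(Order order, String status) {
        return order.statusHistory().stream().anyMatch(s -> s.status().equals(status));
    }

    // An order qualifies when its history holds both a 'Created' and an 'Approved' status.
    static boolean isReadyForFulfillment(Order order) {
        return hasStatus(order, "Created") && hasStatus(order, "Approved");
    }

    // Filters a customer's orders down to those that should produce a fulfillment request.
    static List<Order> findReadyForFulfillment(List<Order> orders) {
        return orders.stream()
                .filter(ApprovedOrderScanner::isReadyForFulfillment)
                .toList();
    }
}
```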

The configuration of the Sender class is handled by the SenderConfig class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the FulfillmentRequestEvent object into a JSON message payload (gist).

The Sender class uses a KafkaTemplate to send the message to the Kafka topic, as shown below. Since message order is not critical, messages could be sent to a topic with multiple partitions if the volume of messages required it.

kafka-events-02

The Fulfillment service’s Receiver class consumes the FulfillmentRequestEvent from the Kafka topic and instantiates a Fulfillment object, containing the data passed in the FulfillmentRequestEvent message payload. This includes the order to be fulfilled, and the customer’s contact and shipping information (gist).

The Fulfillment service’s ReceiverConfig class defines the DefaultKafkaConsumerFactory and ConcurrentKafkaListenerContainerFactory, responsible for deserializing the message payload from JSON into a FulfillmentRequestEvent object (gist).

Fulfillment Order Status State Change

When the status of the Order in a Fulfillment entity is changed to anything other than ‘Approved’, an OrderStatusChangeEvent message is produced by the Fulfillment service and sent to the fulfillment.order.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service tracks all CustomerOrder lifecycle events from the initial ‘Created’ status to the final happy path ‘Received’ status.

kafka-topic-03

The Fulfillment service exposes several endpoints through the FulfillmentController class, which simulate a change to the status of an order. They allow an order status to be changed from ‘Approved’ to ‘Processing’, to ‘Shipped’, to ‘In Transit’, and to ‘Received’. This change is applied to all orders that meet the criteria.
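The happy-path sequence of statuses can be modeled as a small state machine. This is a sketch only: the FulfillmentStatus enum and its methods are hypothetical names, and the rule that a status may only advance to the one immediately after it is an assumption made for the example, not something the Fulfillment service is known to enforce.

```java
import java.util.Optional;

// Order statuses in the sequence described above; the constant names are this sketch's own.
enum FulfillmentStatus {
    APPROVED, PROCESSING, SHIPPED, IN_TRANSIT, RECEIVED;

    // Returns the next status on the happy path, or empty once the order is received.
    Optional<FulfillmentStatus> next() {
        FulfillmentStatus[] all = values();
        int i = ordinal() + 1;
        return i < all.length ? Optional.of(all[i]) : Optional.empty();
    }

    // Assumption: a transition is allowed only to the immediately following status.
    boolean canMoveTo(FulfillmentStatus target) {
        return next().map(n -> n == target).orElse(false);
    }
}
```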

Each of these state changes triggers a change to the Fulfillment document in MongoDB. Each change also generates a Kafka message, containing the OrderStatusChangeEvent in the message payload. This is handled by the Fulfillment service’s Sender class.

Note that in this example, these two events are not handled in an atomic transaction. Either updating the database or sending the message could fail independently, which would cause a loss of data consistency. In the real world, we must ensure both these disparate actions succeed or fail as a single transaction, to ensure data consistency (gist).
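One commonly discussed way to approach this problem is a transactional outbox: the state change and the pending message are written together in one local transaction, and a separate relay publishes the pending messages afterwards. This is not what the storefront code does. The in-memory sketch below, with its hypothetical OutboxSketch class, only illustrates the shape of the idea; the synchronized methods stand in for a database transaction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class OutboxSketch {
    private final Map<String, String> orderStatusById = new HashMap<>(); // stands in for the database
    private final List<String> outbox = new ArrayList<>();               // pending messages, same "database"

    // Both writes happen in one critical section, standing in for one local transaction.
    synchronized void changeStatus(String orderId, String status) {
        orderStatusById.put(orderId, status);
        outbox.add(orderId + ":" + status);
    }

    // A separate relay drains the outbox in order. A message is removed only after the
    // send reports success, so a failed send is retried on the next pass.
    synchronized int relay(Predicate<String> send) {
        int sent = 0;
        while (!outbox.isEmpty() && send.test(outbox.get(0))) {
            outbox.remove(0);
            sent++;
        }
        return sent;
    }

    synchronized String status(String orderId) {
        return orderStatusById.get(orderId);
    }

    synchronized int pending() {
        return outbox.size();
    }
}
```

With this arrangement a broker outage delays messages rather than losing them, at the cost of possible duplicate deliveries, which consumers would need to tolerate.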

The configuration of the Sender class is handled by the SenderConfig class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the OrderStatusChangeEvent object into a JSON message payload. This class is almost identical to the SenderConfig class in the Orders and Accounts services (gist).

The Sender class uses a KafkaTemplate to send the message to the Kafka topic, as shown below. Message order is not critical since a timestamp is recorded, which ensures the proper sequence of order status events can be maintained. Messages could be sent to a topic with multiple partitions if the volume of messages required it.
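To illustrate why the timestamp makes arrival order less important, the sketch below restores the sequence of status events after they arrive out of order. The StatusEvent record is a hypothetical, trimmed-down stand-in for the OrderStatusChangeEvent, and the StatusTimeline helper is an assumption made for this example.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;

// Hypothetical, trimmed-down event; the real OrderStatusChangeEvent also carries notes.
record StatusEvent(String orderGuid, Instant timestamp, String status) {}

final class StatusTimeline {
    // Orders events by their recorded timestamp, regardless of the order in which they arrived.
    static List<StatusEvent> inSequence(List<StatusEvent> received) {
        return received.stream()
                .sorted(Comparator.comparing(StatusEvent::timestamp))
                .toList();
    }
}
```

This relies on the producer’s clock being trustworthy; events stamped by different machines with skewed clocks could still sort incorrectly.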

kafka-events-03

The Orders service’s Receiver class is responsible for consuming the OrderStatusChangeEvent message, produced by the Fulfillment service (gist).

As explained above, the Orders service is configured differently compared to the Fulfillment service, to receive messages from Kafka. The Orders service needs to receive messages from more than one topic. The ReceiverConfig class deserializes all messages using the StringDeserializer. The Orders service’s ReceiverConfig class references the setMessageConverter method of Spring Kafka’s AbstractKafkaListenerContainerFactory class, which allows for dynamic object type matching (gist).

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). That method accepts a specific object type as an input parameter, denoting the object type the message payload needs to be deserialized into. In the case of an OrderStatusChangeEvent message, the receiveOrderStatusChangeEvents method is called to consume a message from the fulfillment.order.change Kafka topic.

Part Two

In Part Two of this post, I will briefly cover how to deploy and run a local development version of the storefront components, using Docker. The storefront’s microservices will be exposed through an API Gateway, Netflix’s Zuul. Service discovery and load balancing will be handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. To provide operational visibility, we will add Yahoo’s Kafka Manager and Mongo Express to our system.

docker-environment.png

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

Deploying Spring Boot Apps to AWS with Netflix Nebula and Spinnaker: Part 2 of 2

In Part One of this post, we examined enterprise deployment tools and introduced two of Netflix’s open-source deployment tools, the Nebula Gradle plugins, and Spinnaker. In Part Two, we will deploy a production-ready Spring Boot application, the Election microservice, to multiple Amazon EC2 instances, behind an Elastic Load Balancer (ELB). We will use a fully automated DevOps workflow. The build, test, package, bake, deploy process will be handled by the Netflix Nebula Gradle Linux Packaging Plugin, Jenkins, and Spinnaker. The high-level process will involve the following steps:

  • Configure Gradle to build a production-ready fully executable application for Unix systems (executable JAR)
  • Using deb-s3 and GPG Suite, create a secure, signed APT (Debian) repository on Amazon S3
  • Using Jenkins and the Netflix Nebula plugin, build a Debian package, containing the executable JAR and configuration files
  • Using Jenkins and deb-s3, publish the package to the S3-based APT repository
  • Using Spinnaker (HashiCorp Packer under the covers), bake an Ubuntu Amazon Machine Image (AMI), replete with the executable JAR installed from the Debian package
  • Deploy an auto-scaling set of Amazon EC2 instances from the baked AMI, behind an ELB, running the Spring Boot application using both the Red/Black and Highlander deployment strategies
  • Be able to repeat the entire automated build, test, package, bake, deploy process, triggered by a new code push to GitHub

The overall build, test, package, bake, deploy process will look as follows.

DebianPackageWorkflow12.png

DevOps Architecture

Spinnaker’s modern architecture is comprised of several independent microservices. The codebase is written in Java and Groovy. It leverages the Spring Boot framework¹. Spinnaker’s configuration, startup, updates, and rollbacks are centrally managed by Halyard. Halyard provides a single point of contact for command line interaction with Spinnaker’s microservices.

Spinnaker can be installed on most private or public infrastructure, either containerized or virtualized. Spinnaker has links to a number of Quickstart installations on their website. For this demonstration, I deployed and configured Spinnaker on Azure, starting with one of the Azure Spinnaker quick-start ARM templates. The template provisions all the necessary Azure resources. For better performance, I chose to upgrade the default VM to a larger Standard D4 v3, which contains 4 vCPUs and 16 GB of memory. I would recommend a minimum of 2 vCPUs and 8 GB of memory for Spinnaker.

Another Azure VM, in the same virtual network as the Spinnaker VM, already hosts Jenkins, SonarQube, and Nexus Repository OSS.

From Spinnaker on Azure, Debian Packages are uploaded to the APT package repository on AWS S3. Spinnaker also bakes Amazon Machine Images (AMI) on AWS. Spinnaker provisions the AWS resources, including EC2 instances, Load Balancers, Auto Scaling Groups, Launch Configurations, and Security Groups. The only resources you need on AWS to get started with Spinnaker are a VPC and Subnets. There are some minor, yet critical prerequisites for naming your VPC and Subnets.

Other external tools include GitHub for source control and Slack for notifications. I have built and managed everything from a Mac; however, all tools are platform agnostic. The Spring Boot application was developed in JetBrains IntelliJ.

Spinnaker Architecture 2.png

Source Code

All source code for this post can be found on GitHub. The project’s README file contains a list of the Election service’s endpoints.

Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

APT Repository

After setting up Spinnaker on Azure, I created an APT repository on Amazon S3, using the instructions provided by Netflix, in their Code Lab, An Introduction to Spinnaker: Hello Deployment. The setup involves creating an Amazon S3 bucket to serve as an APT (Debian) repository, creating a GPG key for signing, and using deb-s3 to manage the repository. The Code Lab also uses Aptly, a great tool, which I skipped for brevity.

spin19

GPG Key

On the Mac, I used GPG Suite to create a GPG (GNU Privacy Guard or GnuPG) automatic signing key for my APT repository. The key is required by Spinnaker to verify the Debian packages in the repository, before installation.

The Ruby Gem, deb-s3, makes management of the Debian packages easy and automatable with Jenkins. Jenkins uploads the Debian packages, using a deb-s3 command, such as the following (gist). In this post, Jenkins calls the command from the shell script, upload-deb-package.sh, which is included in the GitHub project.

The Jenkins user requires access to the signing key, to build and upload the Debian packages. I created my GPG key on my Mac, securely copied the key to my Ubuntu-based Jenkins VM, and then imported the key for the Jenkins user. You could also create your key on Ubuntu, directly. Make sure you back up your private key in a secure location!

Nebula Packaging Plugin

Next, I set up a Gradle task in my build.gradle file to build my Debian packages using the Netflix Nebula Gradle Linux Packaging Plugin. Although Debian packaging tasks could become complex for larger application installations, this task for this post is pretty simple. I used many of the best-practices suggested by Spring for Production-grade deployments. The best-practices guide recommends file location, file modes, and file user and group ownership. I create the JAR as a fully executable JAR, meaning it is started like any other executable and does not have to be started with the standard java -jar command.

In the task, shown below (gist), the JAR and the external configuration file (optional) are copied to specific locations during the deployment and symlinked, as required. I used the older SysVInit system (init.d) to enable the application to automatically start on boot. You should probably use systemd (systemctl) for your services with Ubuntu 16.04.

You can use the ar (archive) command (e.g., ar -x spring-postgresql-demo_4.5.0_all.deb), to extract and inspect the structure of a Debian package. The data.tar.gz file, displayed below in Atom, shows the final package structure.

spin47.png

Base AMI

Next, I baked a base AMI for Spinnaker to use. This base AMI is used by Spinnaker to bake (re-bake) the final AMI(s) used for provisioning the EC2 instances, containing the Spring Boot Application. The Spinnaker base AMI is built from another base AMI, the official Ubuntu 16.04 LTS image. I installed the OpenJDK 8 package on the AMI, which is required to run the Java-based Election service. Lastly and critically, I added information about the location of my S3-based APT Debian package repository to the list of configured APT data sources, and the GPG key required for package verification. This information and key will be used later by Spinnaker to bake AMIs, using this base AMI. The set-up script, base_ubuntu_ami_setup.sh, is included in the GitHub project.

Jenkins

This post uses a single Jenkins CI/CD pipeline. Using a Webhook, the pipeline is automatically triggered by every git push to the GitHub project. The pipeline pulls the source code, builds the application, and performs unit-tests and static code analysis with SonarQube. If the build succeeds and the tests pass, the build artifact (JAR file) is bundled into a Debian package using the Nebula Packaging plugin, uploaded to the S3 APT repository using deb-s3, and archived locally for Spinnaker to reference. Once the pipeline is completed, on success or on failure, a Slack notification is sent. The Jenkinsfile used for this post is available in the project on GitHub.

Below is a traditional Jenkins view of the CI/CD pipeline, with links to unit test reports, SonarQube results, build artifacts, and GitHub source code.

spin01

Below is the same pipeline viewed using the Jenkins Blue Ocean plugin.

spin02

It is important to perform sufficient testing before building the Debian package. You donʼt want to bake an AMI and deploy EC2 instances, at a cost, before finding out the application has bugs.

spin03

Spinnaker Setup

First, I set up a new Spinnaker Slack channel and a custom bot user. Spinnaker details the Slack set up in their Notifications and Events Guide. You can configure what type of Spinnaker events trigger Slack notifications.

spin46.png

AWS Spinnaker User

Next, I added the required Spinnaker User, Policy, and Roles to AWS. Spinnaker uses this access to query and provision infrastructure on your behalf. The Spinnaker User requires Power User level access to perform all its necessary tasks. AWS IAM set up is detailed by Spinnaker in their Cloud Providers Setup for AWS. They also describe the setup of other cloud providers. You need to be reasonably familiar with AWS IAM, including the PassRole permission, to set up this part. As part of the setup, you enable AWS for Spinnaker and add your AWS account using the Halyard interface.

spin45

Spinnaker Security Groups

Next, I set up two Spinnaker Security Groups, corresponding to two AWS Security Groups, one for the load balancer and one for the Election service. The load balancer security group exposes port 80, and the Election service security group exposes port 8080.

spin36

Spinnaker Load Balancer

Next, I created a Spinnaker Load Balancer, corresponding to an Amazon Classic Load Balancer. The Load Balancer will load-balance the Election service EC2 instances. Below you see a Load Balancer, balancing a pair of active EC2 instances, the result of a Red/Black deployment.

spin37

Spinnaker can currently create both AWS Classic Load Balancers as well as Application Load Balancers (ALB).

spin25

Spinnaker Pipeline

This post uses a single, basic Spinnaker Pipeline. The pipeline bakes a new AMI from the Debian package generated by the Jenkins pipeline. After a manual approval stage, Spinnaker deploys a set of EC2 instances, behind the Load Balancer, which contains the latest version of the Election service. Spinnaker finishes the pipeline by sending a Slack notification.

spin26

Jenkins Integration

The pipeline is triggered by the successful completion of the Jenkins pipeline. This is set in the Configuration stage of the pipeline. The integration with Jenkins is managed through Spinnaker’s Igor service.

spin22.png

Bake Stage

Next, in the Bake stage, Spinnaker bakes a new AMI, containing the Debian package generated by the Jenkins pipeline. The stageʼs configuration contains the package name to reference.

spin29

The stageʼs configuration also includes a reference to which Base AMI to use, to bake the new AMIs. Here I have used the AMI ID of the base Spinnaker AMI I created previously.

spin27

Deploy Stage

Next, the Deploy stage deploys the Election service, running on EC2 instances, provisioned from the new AMI, which was baked in the last stage. To configure the Deploy stage, you define a Spinnaker Server Group. According to Spinnaker, the Server Group identifies the deployable artifact, VM image type, the number of instances, autoscaling policies, metadata, Load Balancer, and a Security Group.

spin32

The Server Group also defines the Deployment Strategy. Below, I chose the Red/Black Deployment Strategy (also referred to as Blue/Green). This strategy will disable, not terminate, the active Server Group. If the new deployment fails, we can manually or automatically perform a Rollback to the previous, currently disabled Server Group.

spin11

Letʼs Start Baking!

With set up complete, letʼs kick off a git push, trigger and complete the Jenkins pipeline, and finally trigger the Spinnaker pipeline. Below we see the pipelineʼs Bake stage has been started. Spinnakerʼs UI lets us view the Bakery Details. The Bakery, provided by Spinnakerʼs Rosco service, bakes the AMIs. Rosco uses HashiCorp Packer to bake the AMIs, using standard Packer templates.

spin04

Below we see Spinnaker (Rosco/Packer) locating the Base Spinnaker AMI we configured in the Pipelineʼs Bake stage. Next, we see Spinnaker sshʼing into a new EC2 instance with a temporary keypair and Security Group and starting the Election service Debian package installation.

spin23

Continuing, we see the latest Debian package, derived from the Jenkins pipelineʼs archive, being pulled from the S3-based APT repo. The package is verified using the GPG key and then installed. Lastly, we see a new AMI is created, containing the deployed Election service, which was initially built and packaged by Jenkins. Note the AWS Resource Tags created by Spinnaker, as shown in the Bakery output.

spin24

The base Spinnaker AMI and the AMIs baked by Spinnaker are visible in the AWS Console. Note the naming conventions used by Spinnaker for the AMIs, the Source AMI used to build the new AMIs, and the addition of the Tags, which we saw being applied in the Bakery output above. The use of Tags indirectly allows full traceability from the deployed EC2 instance all the way back to the original code commit to git by the Developer.

spin48.png

Red/Black Deployments

With the new AMI baked successfully, and a required manual approval, using a Manual Judgement type pipeline stage, we can now begin a Red/Black deployment to AWS.

spin07

Using the Server Group configuration in the Deploy stage, Spinnaker deploys two EC2 instances, behind the ELB.

spin08

Below, we see the successful results of the Red/Black deployment. The single Spinnaker Cluster contains two deployed Server Groups. One group, the previously active Server Group (RED), comprised of two EC2 instances, is disabled. The ‘RED’ EC2 instances are unregistered with the load balancer but still running. The new Server Group (BLACK), also comprised of two EC2 instances, is now active and registered with the Load Balancer. Spinnaker will spread EC2 instances evenly across all Availability Zones in the US East (N. Virginia) Region.

spin38

From the AWS Console, we can observe four running instances, though only two are registered with the load-balancer.

spin34
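To make the Red/Black behavior concrete, here is a minimal Python sketch of the bookkeeping involved. It is a toy model, not Spinnaker's implementation: the `ServerGroup` and `Cluster` classes and the server group names are hypothetical, and "enabled" simply stands in for "registered with the load balancer."

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ServerGroup:
    name: str
    instances: int
    enabled: bool = True  # enabled stands in for "registered with the load balancer"


@dataclass
class Cluster:
    server_groups: List[ServerGroup] = field(default_factory=list)

    def deploy_red_black(self, name: str, instances: int) -> ServerGroup:
        """Add a new, enabled server group, then disable (not destroy) the older ones."""
        new_group = ServerGroup(name, instances)
        self.server_groups.append(new_group)
        for group in self.server_groups:
            if group is not new_group:
                group.enabled = False  # instances keep running but receive no traffic
        return new_group

    def running_instances(self) -> int:
        return sum(group.instances for group in self.server_groups)

    def registered_instances(self) -> int:
        return sum(group.instances for group in self.server_groups if group.enabled)


cluster = Cluster()
cluster.deploy_red_black("election-test-v000", 2)  # hypothetical names
cluster.deploy_red_black("election-test-v001", 2)
print(cluster.running_instances(), cluster.registered_instances())  # 4 2
```

Because the older group is only disabled, rolling back amounts to re-enabling it and disabling the newer one, which is what makes this strategy attractive for Production.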

Here we see each deployed Server Group has a different Auto Scaling Group and Launch Configuration. Note the continued use of naming conventions by Spinnaker.

spin33
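Those naming conventions can be taken apart programmatically. The sketch below assumes names of the form application-stack-detail-vNNN, where stack and detail are optional and NNN is a three-digit sequence number. The example name is hypothetical, and the parser is an illustration, not Spinnaker's own naming library.

```python
from typing import NamedTuple


class ServerGroupName(NamedTuple):
    application: str
    stack: str
    detail: str
    sequence: int


def parse_server_group_name(name: str) -> ServerGroupName:
    """Split an assumed application-stack-detail-vNNN name into its parts."""
    base, separator, version = name.rpartition("-v")
    if not separator or not base or len(version) != 3 or not version.isdigit():
        raise ValueError(f"not a versioned server group name: {name!r}")
    # Stack and detail are optional; detail may itself contain hyphens.
    parts = base.split("-", 2)
    parts += [""] * (3 - len(parts))
    application, stack, detail = parts
    return ServerGroupName(application, stack, detail, int(version))


parsed = parse_server_group_name("election-test-v001")  # hypothetical name
print(parsed.application, parsed.stack, parsed.sequence)  # election test 1
```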

There can be only one, Highlander!

Now, in the Deploy stage of the pipeline, we will switch the Server Groupʼs Strategy to Highlander. The Highlander strategy will, as you probably guessed by the name, destroy all other Server Groups in the Cluster. This is more typically used for lower environments, like Development or Test, where you are only interested in the next version of the application for testing. The Red/Black strategy is more applicable to Production, where you want the opportunity to quickly roll back to the previous deployment, if necessary.

spin12

Following a successful deployment, below, we now see the first two Server Groups have been terminated, and a third Server Group in the Cluster is active.

spin40.png
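The Highlander outcome can be modeled in a few lines of Python. This is only a sketch of the end state, with a hypothetical function and hypothetical server group names; it does not reflect how Spinnaker orchestrates the teardown.

```python
from typing import List, Tuple


def deploy_highlander(existing_groups: List[str], new_group: str) -> Tuple[List[str], List[str]]:
    """Return (remaining, destroyed) server groups after a Highlander-style deployment."""
    destroyed = list(existing_groups)  # every previous group is destroyed, enabled or not
    remaining = [new_group]            # there can be only one
    return remaining, destroyed


remaining, destroyed = deploy_highlander(
    ["election-test-v000", "election-test-v001"],  # hypothetical names
    "election-test-v002",
)
print(remaining)   # ['election-test-v002']
print(destroyed)   # ['election-test-v000', 'election-test-v001']
```

Unlike Red/Black, nothing is left to roll back to, which is why the strategy suits Development or Test more than Production.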

In the AWS Console, we can confirm the four previous EC2 instances have been successfully terminated as a result of the Highlander deployment strategy, and two new instances are running.

spin39

As well, the previous Auto Scaling Groups and Launch Configurations have been deleted from AWS by Spinnaker.

spin44.png

As expected, the Classic Load Balancer only contains the two most recent EC2 instances from the last Server Group deployed.

spin41

Confirming the Deployment

Using the DNS address of the load balancer, we can hit the Election service endpoints, on either of the EC2 instances. All API endpoints are listed in the Projectʼs README file. Below, from a web browser, we see the candidates resource returning candidate information, retrieved from the Electionʼs PostgreSQL RDS database Test instance.

spin42

Similarly, from Postman, we can hit the load balancer and get back election information from the elections resource, using an HTTP GET.

spin43.png
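The same check can be scripted. The following Python sketch uses only the standard library; the load balancer DNS name and the `candidates` and `elections` resource paths are assumptions for illustration, so substitute the values from your own ELB and the Projectʼs README.

```python
import json
from urllib.parse import urljoin
from urllib.request import Request, urlopen

# Hypothetical ELB DNS name; replace with your load balancer's address.
BASE_URL = "http://election-test-elb.example.com/"


def build_get(resource: str, base_url: str = BASE_URL) -> Request:
    """Build an HTTP GET request for a resource behind the load balancer."""
    return Request(
        urljoin(base_url, resource),
        headers={"Accept": "application/json"},
        method="GET",
    )


def fetch_json(resource: str, base_url: str = BASE_URL, timeout: float = 10.0):
    """Send the GET request and decode the JSON response body."""
    with urlopen(build_get(resource, base_url), timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


request = build_get("candidates")  # assumed resource path
print(request.get_method(), request.full_url)
```

Calling `fetch_json("elections")` against a live load balancer would return the decoded response; it is not invoked here because the address above is a placeholder.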

I intentionally left out a discussion of the service’s RDS database and how configuration management was handled with Spring Profiles and Spring Cloud Config. Both topics were out of scope for this post.

Conclusion

Although this was a brief, whirlwind overview of deployment tools, it shows the power of delivery tools like Spinnaker, when seamlessly combined with other tools, like Jenkins and the Nebula plugins. Together, these tools are capable of efficiently, repeatably, and securely deploying large numbers of containerized and non-containerized applications to a variety of private, public, and hybrid cloud infrastructure.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

¹ Running Spinnaker on Compute Engine


Deploying Spring Boot Apps to AWS with Netflix Nebula and Spinnaker: Part 1 of 2

Listening to DevOps industry pundits, you might be convinced everyone is running containers in Production (or by now, serverless). Although containerization is growing at a phenomenal rate, several recent surveys¹ indicate less than 50% of enterprises are deploying containers in Production. Filter those results further by the fact that, within those enterprises, only a small percentage of the total application portfolio is containerized, let alone running in Production.

As a DevOps Consultant, I regularly work with corporations whose global portfolios number in the thousands of applications. Indeed, some percentage of their applications are containerized, with fewer running in Production. However, a majority of those applications, even those built on modern, lightweight, distributed architectures, are still being deployed to bare-metal and virtualized public cloud and private data center infrastructure, for a variety of reasons.

Enterprise Deployment

Due to the scale and complexity of application portfolios, many organizations have invested in enterprise deployment tools, either commercially available or developed in-house. The enterprise deployment tool’s primary objective is to standardize the process of securely, reliably, and repeatably packaging, publishing, and deploying both containerized and non-containerized applications to large fleets of virtual machines and bare-metal servers, across multiple, geographically dispersed data centers and cloud providers. Enterprise deployment tools are particularly common in tightly regulated and compliance-driven organizations, as well as organizations that have undertaken large amounts of M&A, resulting in vastly different application technology stacks.

Enterprise CI/CD/Release Workflow

Better-known examples of commercially available enterprise deployment tools include IBM UrbanCode Deploy (aka uDeploy), XebiaLabs XL Deploy, CA Automic Release Automation, Octopus Deploy, and Electric Cloud ElectricFlow. While commercial tools continue to gain market share³, many organizations are tightly coupled to their in-house solutions through years of use and fear of widespread process disruption, given current economic, security, compliance, and skills-gap sensitivities.

Deployment Tool Anatomy

Most enterprise deployment tools are compatible with standard binary package types, including Debian (.deb) and Red Hat Package Manager (.rpm) packages for Linux, NuGet (.nupkg) packages for Windows, and Node Package Manager (npm) and Bower packages for JavaScript. There are equivalent package types for other popular languages and formats, such as Go, Python, Ruby, SQL, Android, Objective-C, Swift, and Docker. Packages usually contain application metadata, a signature to ensure the integrity and/or authenticity², and a compressed payload.
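As a rough illustration of the integrity half of that check, the Python sketch below compares a payloadʼs SHA-256 digest against a published value. It is a simplification under stated assumptions: real package managers typically verify a cryptographic signature (for example, GPG) over repository metadata that lists such digests, which also covers authenticity. The function names and payload are hypothetical.

```python
import hashlib
import hmac


def sha256_hex(payload: bytes) -> str:
    """Return the SHA-256 digest of a payload as a lowercase hex string."""
    return hashlib.sha256(payload).hexdigest()


def payload_is_intact(payload: bytes, expected_sha256: str) -> bool:
    """Check a payload against its published digest using a constant-time comparison."""
    return hmac.compare_digest(sha256_hex(payload), expected_sha256.lower())


payload = b"example package payload"  # stand-in for a package's compressed payload
published_digest = sha256_hex(payload)

print(payload_is_intact(payload, published_digest))                # True
print(payload_is_intact(payload + b" tampered", published_digest)) # False
```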

Enterprise deployment tools are normally integrated with open-source packaging and publishing tools, such as Apache Maven, Apache Ivy/Ant, Gradle, NPM, NuGet, Bundler, PIP, and Docker.

Binary packages (and images), built with enterprise deployment tools, are typically stored in private, open-source or commercial binary (artifact) repositories, such as Spacewalk, JFrog Artifactory, and Sonatype Nexus Repository. The latter two, Artifactory and Nexus, support a multitude of modern package types and repository structures, including Maven, NuGet, PyPI, NPM, Bower, Ruby Gems, CocoaPods, Puppet, Chef, and Docker.

Mature binary repositories provide many features in addition to package management, including role-based access control, vulnerability scanning, rich APIs, DevOps integration, and fault-tolerant, high-availability architectures.

Lastly, enterprise deployment tools generally rely on standard package management systems to retrieve and install cryptographically verifiable packages and images. These include YUM (Yellowdog Updater, Modified), APT (Advanced Package Tool), APK (Alpine Linux), NuGet, Chocolatey, NPM, PIP, Bundler, and Docker. Packages are deployed directly to running infrastructure, or indirectly to intermediate deployable components such as Amazon Machine Images (AMI), Google Compute Engine machine images, VMware machines, Docker Images, or CoreOS rkt.

Open-Source Alternative

One such enterprise with an extensive portfolio of both containerized and non-containerized applications is Netflix. To standardize their deployments to multiple types of cloud infrastructure, Netflix has developed several well-known open-source software (OSS) tools, including the Nebula Gradle plugins and Spinnaker. I discussed Spinnaker in my previous post, Managing Applications Across Multiple Kubernetes Environments with Istio, as an alternative to Jenkins for deploying container workloads to Kubernetes on Google (GKE).

As a leader in OSS, Netflix has documented their deployment process in several articles and presentations, including a post from 2016, ‘How We Build Code at Netflix.’ According to the article, the high-level process for deployment to Amazon EC2 instances involves the following steps:

  • Code is built and tested locally using Nebula
  • Changes are committed to a central git repository
  • Jenkins job executes Nebula, which builds, tests, and packages the application for deployment
  • Builds are “baked” into Amazon Machine Images (using Spinnaker)
  • Spinnaker pipelines are used to deploy and promote the code change

The Nebula plugins and Spinnaker leverage many underlying, open-source technologies, including Pivotal Spring, Java, Groovy, Gradle, Maven, Apache Commons, Redline RPM, HashiCorp Packer, Redis, HashiCorp Consul, Cassandra, and Apache Thrift.

Both the Nebula plugins and Spinnaker have been battle-tested in Production by Netflix, as well as by many other industry leaders after Netflix open-sourced the tools in 2014 (Nebula) and 2015 (Spinnaker). Currently, there are approximately 20 Nebula Gradle plugins available on GitHub. Notable core contributors to the development of Spinnaker include Google, Microsoft, Pivotal, Target, Veritas, and Oracle, to name a few. As a sign of its success, Spinnaker currently has over 4,600 Stars on GitHub!

Part Two: Demonstration

In Part Two, we will deploy a production-ready Spring Boot application, the Election microservice, to multiple Amazon EC2 instances, behind an Elastic Load Balancer (ELB). We will use a fully automated DevOps workflow. The build, test, package, bake, deploy process will be handled by the Netflix Nebula Gradle Linux Packaging Plugin, Jenkins, and Spinnaker. The high-level process will involve the following steps:

  • Configure Gradle to build a production-ready fully executable application for Unix systems (executable JAR)
  • Using deb-s3 and GPG Suite, create a secure, signed APT (Debian) repository on Amazon S3
  • Using Jenkins and the Netflix Nebula plugin, build a Debian package, containing the executable JAR and configuration files
  • Using Jenkins and deb-s3, publish the package to the S3-based APT repository
  • Using Spinnaker (HashiCorp Packer under the covers), bake an Ubuntu Amazon Machine Image (AMI), replete with the executable JAR installed from the Debian package
  • Deploy an auto-scaling set of Amazon EC2 instances from the baked AMI, behind an ELB, running the Spring Boot application using both the Red/Black and Highlander deployment strategies
  • Be able to repeat the entire automated build, test, package, bake, deploy process, triggered by a new code push to GitHub

The overall build, test, package, bake, deploy process will look as follows.

DebianPackageWorkflow12
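The ordering of those stages, and the idea that a failure stops everything downstream, can be sketched in Python. This is a conceptual model only: the stage names come from the process described above, while the `run_pipeline` function and the placeholder actions are hypothetical and do not call Jenkins or Spinnaker.

```python
from typing import Callable, List, Tuple

Stage = Tuple[str, Callable[[], bool]]

STAGE_NAMES = ["build", "test", "package", "bake", "deploy"]


def run_pipeline(stages: List[Stage]) -> List[str]:
    """Run stages in order, stopping at the first failure; return the stages that completed."""
    completed = []
    for name, action in stages:
        if not action():
            break  # a failed stage blocks everything after it
        completed.append(name)
    return completed


def succeed() -> bool:
    return True


def fail() -> bool:
    return False


all_green = [(name, succeed) for name in STAGE_NAMES]
bake_fails = [(name, fail if name == "bake" else succeed) for name in STAGE_NAMES]

print(run_pipeline(all_green))   # ['build', 'test', 'package', 'bake', 'deploy']
print(run_pipeline(bake_fails))  # ['build', 'test', 'package']
```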

References

 

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

¹ Recent Surveys: Forrester, Portworx, Cloud Foundry Survey
² Courtesy Wikipedia – rpm
³ XebiaLabs Kicks Off 2017 with Triple-Digit Growth in Enterprise DevOps
