
Getting Started with Data Analytics using Jupyter Notebooks, PySpark, and Docker

There is little question that big data analytics, data science, artificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last few years. Behind the marketing hype, these technologies are having a significant influence on many aspects of our modern lives. Due to their popularity and potential benefits, commercial enterprises, academic institutions, and the public sector are rushing to develop hardware and software solutions that lower the barriers to entry and increase the velocity of ML engineers, data scientists, and data engineers.

Machine learning and data science search results, 5-year trend (courtesy Google Trends and Plotly)

Many open-source software projects are also lowering the barriers to entry into these technologies. An excellent example of one such open-source project working on this challenge is Project Jupyter. Similar to Apache Zeppelin and Netflix’s newly open-sourced Polynote, Jupyter Notebooks enable data-driven, interactive, and collaborative data analytics.

This post will demonstrate the creation of a containerized data analytics environment using Jupyter Docker Stacks. The particular environment will be suited for learning and developing applications for Apache Spark using the Python, Scala, and R programming languages. We will focus on Python and Spark, using PySpark.

Featured Technologies


The following technologies are featured prominently in this post.

Jupyter Notebooks

According to Project Jupyter, the Jupyter Notebook, formerly known as the IPython Notebook, is an open-source web application that allows users to create and share documents that contain live code, equations, visualizations, and narrative text. Uses include data cleansing and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more. The word, Jupyter, is a loose acronym for Julia, Python, and R, but today, Jupyter supports many programming languages.

Interest in Jupyter Notebooks has grown dramatically over the last 3–5 years, fueled in part by the major Cloud providers, AWS, Google Cloud, and Azure. Amazon SageMaker, Amazon EMR (Elastic MapReduce), Google Cloud Dataproc, Google Colab (Colaboratory), and Microsoft Azure Notebooks all have direct integrations with Jupyter notebooks for big data analytics and machine learning.

Jupyter search results, 5-year trend (courtesy Google Trends and Plotly)

Jupyter Docker Stacks

To enable quick and easy access to Jupyter Notebooks, Project Jupyter has created Jupyter Docker Stacks. The stacks are ready-to-run Docker images containing Jupyter applications, along with accompanying technologies. Currently, the Jupyter Docker Stacks focus on a variety of specializations, including the r-notebook, scipy-notebook, tensorflow-notebook, datascience-notebook, pyspark-notebook, and the subject of this post, the all-spark-notebook. The stacks include a wide variety of well-known packages to extend their functionality, such as scikit-learn, pandas, Matplotlib, Bokeh, NumPy, and Facets.

Apache Spark

According to Apache, Spark is a unified analytics engine for large-scale data processing. Starting as a research project at the UC Berkeley AMPLab in 2009, Spark was open-sourced in early 2010 and moved to the Apache Software Foundation in 2013. Reviewing the postings on any major career site will confirm that Spark is widely used by well-known modern enterprises, such as Netflix, Adobe, Capital One, Lockheed Martin, JetBlue Airways, Visa, and Databricks. At the time of this post, LinkedIn, alone, had approximately 3,500 listings for jobs that reference the use of Apache Spark, just in the United States.

With speeds up to 100 times faster than Hadoop, Apache Spark achieves high performance for static, batch, and streaming data, using a state-of-the-art DAG (Directed Acyclic Graph) scheduler, a query optimizer, and a physical execution engine. Spark’s polyglot programming model allows users to write applications quickly in Scala, Java, Python, R, and SQL. Spark includes libraries for Spark SQL (DataFrames and Datasets), MLlib (Machine Learning), GraphX (Graph Processing), and DStreams (Spark Streaming). You can run Spark using its standalone cluster mode, Apache Hadoop YARN, Apache Mesos, or Kubernetes.

PySpark

The Spark Python API, PySpark, exposes the Spark programming model to Python. PySpark is built on top of Spark’s Java API and uses Py4J. According to Apache, Py4J, a bridge between Python and Java, enables Python programs running in a Python interpreter to dynamically access Java objects in a Java Virtual Machine (JVM). Data is processed in Python and cached and shuffled in the JVM.

Docker

According to Docker, their technology gives developers and IT the freedom to build, manage, and secure business-critical applications without the fear of technology or infrastructure lock-in. For this post, I am using the current stable version of Docker Desktop Community version for macOS, as of March 2020.


Docker Swarm

Current versions of Docker include both a Kubernetes and Swarm orchestrator for deploying and managing containers. We will choose Swarm for this demonstration. According to Docker, the cluster management and orchestration features embedded in the Docker Engine are built using swarmkit. Swarmkit is a separate project that implements Docker’s orchestration layer and is used directly within Docker.

PostgreSQL

PostgreSQL is a powerful, open-source, object-relational database system. According to their website, PostgreSQL comes with many features aimed at helping developers build applications, helping administrators protect data integrity and build fault-tolerant environments, and helping manage data no matter how big or small the dataset.

Demonstration

In this demonstration, we will explore the capabilities of the Spark Jupyter Docker Stack to provide an effective data analytics development environment. We will explore a few everyday uses, including executing Python scripts, submitting PySpark jobs, working with Jupyter Notebooks, and reading and writing data to and from different file formats and a database. We will be using the latest jupyter/all-spark-notebook Docker Image. This image includes Python, R, and Scala support for Apache Spark, using Apache Toree.

Architecture

As shown below, we will deploy a Docker stack to a single-node Docker swarm. The stack consists of a Jupyter All-Spark-Notebook container, a PostgreSQL container (version 12, Alpine Linux-based image), and an Adminer container. The Docker stack will have two local directories bind-mounted into the containers. Files from our GitHub project will be shared with the Jupyter application container through a bind-mounted directory. Our PostgreSQL data will also be persisted through a bind-mounted directory. This allows us to persist data external to the ephemeral containers. If the containers are restarted or recreated, the data is preserved locally.


Source Code

All source code for this post can be found on GitHub. Use the following command to clone the project. Note this post uses the v2 branch.

git clone \
  --branch v2 --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/pyspark-setup-demo.git

Source code samples are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers.

Deploy Docker Stack

To start, create the $HOME/data/postgres directory to store PostgreSQL data files.

mkdir -p ~/data/postgres

This directory will be bind-mounted into the PostgreSQL container on line 41 of the stack.yml file, $HOME/data/postgres:/var/lib/postgresql/data. The HOME environment variable assumes you are working on Linux or macOS and is equivalent to HOMEPATH on Windows.

The Jupyter container’s working directory is set on line 15 of the stack.yml file, working_dir: /home/$USER/work. The local bind-mounted working directory is $PWD/work. This path is bind-mounted to the working directory in the Jupyter container, on line 29 of the Docker stack file, $PWD/work:/home/$USER/work. The PWD environment variable assumes you are working on Linux or macOS (CD on Windows).

By default, the user within the Jupyter container is jovyan. We will override that user with our own local host’s user account, as shown on line 21 of the Docker stack file, NB_USER: $USER. We will use the host’s USER environment variable value (equivalent to USERNAME on Windows). There are additional options for configuring the Jupyter container. Several of those options are used on lines 17–22 of the Docker stack file (gist).

# docker stack deploy -c stack.yml jupyter
# optional pgadmin container
version: "3.7"
networks:
  demo-net:
services:
  spark:
    image: jupyter/all-spark-notebook:latest
    ports:
      - "8888:8888/tcp"
      - "4040:4040/tcp"
    networks:
      - demo-net
    working_dir: /home/$USER/work
    environment:
      CHOWN_HOME: "yes"
      GRANT_SUDO: "yes"
      NB_UID: 1000
      NB_GID: 100
      NB_USER: $USER
      NB_GROUP: staff
    user: root
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
    volumes:
      - $PWD/work:/home/$USER/work
  postgres:
    image: postgres:12-alpine
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres1234
      POSTGRES_DB: bakery
    ports:
      - "5432:5432/tcp"
    networks:
      - demo-net
    volumes:
      - $HOME/data/postgres:/var/lib/postgresql/data
    deploy:
      restart_policy:
        condition: on-failure
  adminer:
    image: adminer:latest
    ports:
      - "8080:8080/tcp"
    networks:
      - demo-net
    deploy:
      restart_policy:
        condition: on-failure
#  pgadmin:
#    image: dpage/pgadmin4:latest
#    environment:
#      PGADMIN_DEFAULT_EMAIL: user@domain.com
#      PGADMIN_DEFAULT_PASSWORD: 5up3rS3cr3t!
#    ports:
#      - "8180:80/tcp"
#    networks:
#      - demo-net
#    deploy:
#      restart_policy:
#        condition: on-failure
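
To make the variable substitution in the stack file concrete, here is a minimal Python sketch of how the two bind-mount strings resolve from the host environment. It is an illustration only and not part of the project; the function name bind_mounts is hypothetical, and the Windows fallbacks simply follow the equivalents mentioned above (USERNAME for USER, HOMEPATH for HOME).

```python
def bind_mounts(env, cwd):
    """Return the two host:container bind-mount strings used by the stack.

    `env` is a mapping of environment variables; `cwd` stands in for $PWD.
    """
    # USER on Linux/macOS, USERNAME on Windows.
    user = env.get("USER") or env.get("USERNAME")
    # HOME on Linux/macOS, HOMEPATH on Windows.
    home = env.get("HOME") or env.get("HOMEPATH")
    if not user or not home:
        raise KeyError("USER/USERNAME and HOME/HOMEPATH must be set")
    return [
        f"{cwd}/work:/home/{user}/work",
        f"{home}/data/postgres:/var/lib/postgresql/data",
    ]


# Example values are placeholders, not taken from a real machine.
example_env = {"USER": "gary", "HOME": "/Users/gary"}
for mount in bind_mounts(example_env, "/src/pyspark-setup-demo"):
    print(mount)
```

Passing the environment in as a plain mapping keeps the sketch easy to try with different values without changing your actual shell environment.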


The jupyter/all-spark-notebook Docker image is large, approximately 5 GB. Depending on your Internet connection, if this is the first time you have pulled this image, the stack may take several minutes to enter a running state. Although not required, I usually pull new Docker images in advance.

docker pull jupyter/all-spark-notebook:latest
docker pull postgres:12-alpine
docker pull adminer:latest

Assuming you have a recent version of Docker installed on your local development machine and running in swarm mode, standing up the stack is as easy as running the following docker command from the root directory of the project.

docker stack deploy -c stack.yml jupyter

The Docker stack consists of a new overlay network, jupyter_demo-net, and three containers. To confirm the stack deployed successfully, run the following docker command.

docker stack ps jupyter --no-trunc


To access the Jupyter Notebook application, you need to obtain the Jupyter URL and access token. The Jupyter URL and the access token are output to the Jupyter container log, which can be accessed with the following command.

docker logs $(docker ps | grep jupyter_spark | awk '{print $NF}')

You should observe log output similar to the following. Retrieve the complete URL, for example, http://127.0.0.1:8888/?token=f78cbe..., to access the Jupyter web-based user interface.
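
If you prefer not to pick the URL out of the log by eye, a small Python sketch like the following can extract it from captured log text. This is an assumption-laden illustration: the function name find_jupyter_url is hypothetical, the token is assumed to be a lowercase hexadecimal string, and the sample text is a placeholder rather than real log output.

```python
import re

# Matches a URL of the form http://host:port/?token=<hex digits>.
TOKEN_URL = re.compile(r"https?://[^\s/]+:\d+/\?token=[0-9a-f]+")


def find_jupyter_url(log_text):
    """Return the first tokenized Jupyter URL found in log_text, or None."""
    match = TOKEN_URL.search(log_text)
    return match.group(0) if match else None


# Placeholder input with a made-up token, not real container log output.
sample_text = "or http://127.0.0.1:8888/?token=0123456789abcdef"
print(find_jupyter_url(sample_text))
```

You could feed this function the text captured from the docker logs command above, for example by piping the command output into a small script.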


From the Jupyter dashboard landing page, you should see all the files in the project’s work/ directory. Note the types of files you can create from the dashboard, including Python 3, R, and Scala (using Apache Toree or spylon-kernel) notebooks, and text. You can also open a Jupyter terminal or create a new folder from the drop-down menu. At the time of this post (March 2020), the latest jupyter/all-spark-notebook Docker Image runs Spark 2.4.5, Scala 2.11.12, Python 3.7.6, and OpenJDK 64-Bit Server VM, Java 1.8.0 Update 242.


Bootstrap Environment

Included in the project is a bootstrap script, bootstrap_jupyter.sh. The script will install the required Python packages using pip, the Python package installer, and a requirements.txt file. The bootstrap script also installs the latest PostgreSQL driver JAR, configures Apache Log4j to reduce log verbosity when submitting Spark jobs, and installs htop. Although these tasks could also be done from a Jupyter terminal or from within a Jupyter notebook, using a bootstrap script ensures you will have a consistent work environment every time you spin up the Jupyter Docker stack. Add or remove items from the bootstrap script as necessary.

#!/bin/bash
set -ex
# update/upgrade and install htop
sudo apt-get update -y && sudo apt-get upgrade -y
sudo apt-get install htop
# install required python packages
python3 -m pip install --user --upgrade pip
python3 -m pip install -r requirements.txt --upgrade
# download latest postgres driver jar
POSTGRES_JAR="postgresql-42.2.17.jar"
if [ -f "$POSTGRES_JAR" ]; then
  echo "$POSTGRES_JAR exist"
else
  wget -nv "https://jdbc.postgresql.org/download/${POSTGRES_JAR}"
fi
# spark-submit logging level from INFO to WARN
sudo cp log4j.properties /usr/local/spark/conf/log4j.properties


That’s it, our new Jupyter environment is ready to start exploring.

Running Python Scripts

One of the simplest tasks we could perform in our new Jupyter environment is running Python scripts. Instead of worrying about installing and maintaining the correct versions of Python and multiple Python packages on your own development machine, we can run Python scripts from within the Jupyter container. At the time of this post update, the latest jupyter/all-spark-notebook Docker image runs Python 3.7.3 and Conda 4.7.12. Let’s start with a simple Python script, 01_simple_script.py.

#!/usr/bin/python3
import random
technologies = [
    'PySpark', 'Python', 'Spark', 'Scala', 'Java', 'Project Jupyter', 'R'
]
print("Technologies: %s\n" % technologies)
technologies.sort()
print("Sorted: %s\n" % technologies)
print("I'm interested in learning about %s." % random.choice(technologies))


From a Jupyter terminal window, use the following command to run the script.

python3 01_simple_script.py

You should observe the following output.


Kaggle Datasets

To explore more features of Jupyter and PySpark, we will use a publicly available dataset from Kaggle. Kaggle is an excellent resource for datasets used for big-data and ML projects. Their tagline is ‘Kaggle is the place to do data science projects’. For this demonstration, we will use the ‘Transactions from a bakery’ dataset from Kaggle. The dataset is available as a single CSV-format file. A copy is also included in the project.


The ‘Transactions from a bakery’ dataset contains 21,294 rows with 4 columns of data. Although certainly not big data, the dataset is large enough to test out the Spark Jupyter Docker Stack functionality. The data consists of 9,531 customer transactions for 21,294 bakery items between 2016-10-30 and 2017-04-09 (gist).


Date,Time,Transaction,Item
2016-10-30,09:58:11,1,Bread
2016-10-30,10:05:34,2,Scandinavian
2016-10-30,10:05:34,2,Scandinavian
2016-10-30,10:07:57,3,Hot chocolate
2016-10-30,10:07:57,3,Jam
2016-10-30,10:07:57,3,Cookies
2016-10-30,10:08:41,4,Muffin
2016-10-30,10:13:03,5,Coffee
2016-10-30,10:13:03,5,Pastry
2016-10-30,10:13:03,5,Bread
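
To get a feel for the shape of the data before involving Spark, the sample rows can be inspected with Python's standard csv module. This is a minimal sketch using only the ten sample rows shown above, written in comma-separated form; the function name summarize is hypothetical, and the real file contains many more rows.

```python
import csv
import io

# The ten sample rows from the dataset, in comma-separated form.
SAMPLE = """Date,Time,Transaction,Item
2016-10-30,09:58:11,1,Bread
2016-10-30,10:05:34,2,Scandinavian
2016-10-30,10:05:34,2,Scandinavian
2016-10-30,10:07:57,3,Hot chocolate
2016-10-30,10:07:57,3,Jam
2016-10-30,10:07:57,3,Cookies
2016-10-30,10:08:41,4,Muffin
2016-10-30,10:13:03,5,Coffee
2016-10-30,10:13:03,5,Pastry
2016-10-30,10:13:03,5,Bread
"""


def summarize(csv_text):
    """Return row count, column names, and distinct transaction count."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    if not rows:
        return {"rows": 0, "columns": [], "transactions": 0}
    return {
        "rows": len(rows),
        "columns": list(rows[0].keys()),
        # Several rows can share one transaction number (one row per item).
        "transactions": len({row["Transaction"] for row in rows}),
    }


print(summarize(SAMPLE))
```

The sample illustrates why the row count and the transaction count differ: each row is one item sold, and a single transaction can span several rows.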


Submitting Spark Jobs

We are not limited to Jupyter notebooks to interact with Spark. We can also submit scripts directly to Spark from the Jupyter terminal. This is typically how Spark is used in Production for performing analysis on large datasets, often on a regular schedule, using tools such as Apache Airflow. With Spark, you load data from one or more data sources. After performing operations and transformations on the data, the data is persisted to a datastore, such as a file or a database, or conveyed to another system for further processing.

The project includes a simple Python PySpark ETL script, 02_pyspark_job.py. The ETL script loads the original Kaggle Bakery dataset from the CSV file into memory, into a Spark DataFrame. The script then performs a simple Spark SQL query, calculating the total quantity of each type of bakery item sold, sorted in descending order. Finally, the script writes the results of the query to a new CSV file, output/items-sold.csv.

#!/usr/bin/python3
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName('spark-demo') \
    .getOrCreate()
df_bakery = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .option('delimiter', ',') \
    .option('inferSchema', 'true') \
    .load('BreadBasket_DMS.csv')
df_sorted = df_bakery.cube('item').count() \
    .filter('item NOT LIKE \'NONE\'') \
    .filter('item NOT LIKE \'Adjustment\'') \
    .orderBy(['count', 'item'], ascending=[False, True])
df_sorted.show(10, False)
df_sorted.coalesce(1) \
    .write.format('csv') \
    .option('header', 'true') \
    .save('output/items-sold.csv', mode='overwrite')
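
For readers new to Spark, the aggregation in the script above can be mirrored in plain Python. The sketch below is not the Spark implementation; it is a stand-in that counts items, drops the NONE and Adjustment entries, and sorts by count descending and then item name ascending, as the orderBy call does. The function name items_sold and the sample list are made up for the example.

```python
from collections import Counter


def items_sold(items, excluded=("NONE", "Adjustment")):
    """Count items, skip excluded names, sort by count desc then name asc."""
    counts = Counter(item for item in items if item not in excluded)
    # Negating the count sorts it in descending order while the
    # item name remains in ascending order as the tie-breaker.
    return sorted(counts.items(), key=lambda pair: (-pair[1], pair[0]))


# Illustrative input: the ten sample items plus one NONE entry.
sample_items = [
    "Bread", "Scandinavian", "Scandinavian", "Hot chocolate", "Jam",
    "Cookies", "Muffin", "Coffee", "Pastry", "Bread", "NONE",
]
for item, count in items_sold(sample_items):
    print(item, count)
```

Because the LIKE patterns in the script contain no wildcards, they behave as exact matches, which is why a simple membership test is used here.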

Run the script directly from a Jupyter terminal using the following command.

python3 02_pyspark_job.py

An example of the output of the Spark job is shown below.

Typically, you would submit the Spark job using the spark-submit command. Use a Jupyter terminal to run the following command.

$SPARK_HOME/bin/spark-submit 02_pyspark_job.py

Below, we see the output from the spark-submit command. Printing the results in the output is merely for the purposes of the demo. Typically, Spark jobs are submitted non-interactively, and the results are persisted directly to a datastore or conveyed to another system.

Using the following commands, we can view the resulting CSV file, created by the Spark job.

ls -alh output/items-sold.csv/
head -5 output/items-sold.csv/*.csv

An example of the files created by the Spark job is shown below. We should have discovered that coffee is the most commonly sold bakery item with 5,471 sales, followed by bread with 3,325 sales.
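
Note that output/items-sold.csv is a directory rather than a single file: Spark writes one or more part files into it, along with a _SUCCESS marker. The following minimal sketch shows one way to locate the part files from Python. The function name find_part_files is hypothetical, and the demonstration uses a temporary directory with invented file names rather than real Spark output.

```python
import glob
import os
import tempfile


def find_part_files(output_dir):
    """Return the CSV part files found in output_dir, sorted by name."""
    return sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))


# A temporary directory stands in for output/items-sold.csv/.
# The file names are invented; Spark generates its own part-file names.
with tempfile.TemporaryDirectory() as output_dir:
    for name in ("part-00000-example.csv", "_SUCCESS"):
        with open(os.path.join(output_dir, name), "w") as handle:
            handle.write("item,count\n")
    print([os.path.basename(path) for path in find_part_files(output_dir)])
```

Since the job calls coalesce(1) before writing, you would expect a single part file in the real output directory.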

Interacting with Databases

To demonstrate the flexibility of Jupyter to work with databases, PostgreSQL is part of the Docker Stack. We can read and write data from the Jupyter container to the PostgreSQL instance, running in a separate container. To begin, we will run a Python script that executes a set of SQL statements to create our database schema and load some test data into a new database table. To do so, we will use psycopg2, the PostgreSQL database adapter package for Python, which we previously installed into our Jupyter container using the bootstrap script. The Python script below, 03_load_sql.py, will execute a set of SQL statements contained in a SQL file, bakery.sql, against the PostgreSQL container instance.

#!/usr/bin/python3
import psycopg2
# connect to database
connect_str = 'host=postgres port=5432 dbname=bakery user=postgres password=postgres1234'
conn = psycopg2.connect(connect_str)
conn.autocommit = True
cursor = conn.cursor()
# execute sql script
sql_file = open('bakery.sql', 'r')
sqlFile = sql_file.read()
sql_file.close()
sqlCommands = sqlFile.split(';')
for command in sqlCommands:
    print(command)
    if command.strip() != '':
        cursor.execute(command)
# import data from csv file
with open('BreadBasket_DMS.csv', 'r') as f:
    next(f)  # Skip the header row.
    cursor.copy_from(
        f,
        'transactions',
        sep=',',
        columns=('date', 'time', 'transaction', 'item')
    )
conn.commit()
# confirm by selecting record
command = 'SELECT COUNT(*) FROM public.transactions;'
cursor.execute(command)
recs = cursor.fetchall()
print('Row count: %d' % recs[0])
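
The script splits the SQL file on semicolons and skips empty fragments. The following sketch isolates that step so its behavior, and its main limitation, are easy to see. The function name split_sql_statements is hypothetical and the input strings are shortened examples.

```python
def split_sql_statements(script):
    """Split a SQL script on semicolons and drop empty fragments."""
    return [part.strip() for part in script.split(";") if part.strip()]


# Shortened example based on the first two statements of bakery.sql.
script = (
    'DROP TABLE IF EXISTS "transactions";\n'
    "DROP SEQUENCE IF EXISTS transactions_id_seq;\n"
)
for statement in split_sql_statements(script):
    print(statement)

# Limitation: a semicolon inside a string literal is also treated
# as a statement separator, which breaks the statement in two.
print(split_sql_statements("SELECT 'a;b';"))
```

This naive approach works for a simple schema file such as bakery.sql, but a script containing semicolons inside string literals or function bodies would need a real SQL parser or a different execution strategy.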


The SQL file, bakery.sql.

DROP TABLE IF EXISTS "transactions";
DROP SEQUENCE IF EXISTS transactions_id_seq;
CREATE SEQUENCE transactions_id_seq INCREMENT 1 MINVALUE 1 MAXVALUE 2147483647 START 1 CACHE 1;
CREATE TABLE "public"."transactions"
(
    "id" integer DEFAULT nextval('transactions_id_seq') NOT NULL,
    "date" character varying(10) NOT NULL,
    "time" character varying(8) NOT NULL,
    "transaction" integer NOT NULL,
    "item" character varying(50) NOT NULL
) WITH (oids = false);
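
If you want to experiment with the table layout without a running PostgreSQL container, the following sketch uses Python's built-in sqlite3 module as a stand-in. It is not PostgreSQL: SQLite has no sequences, so an AUTOINCREMENT primary key substitutes for transactions_id_seq, and the column types are simplified to TEXT and INTEGER. The function name load_sample and the three rows are chosen for illustration.

```python
import sqlite3


def load_sample(rows):
    """Create a simplified transactions table in memory and load rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE transactions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            date TEXT NOT NULL,
            time TEXT NOT NULL,
            "transaction" INTEGER NOT NULL,
            item TEXT NOT NULL
        )
        """
    )
    conn.executemany(
        'INSERT INTO transactions (date, time, "transaction", item) '
        "VALUES (?, ?, ?, ?)",
        rows,
    )
    cursor = conn.execute("SELECT * FROM transactions ORDER BY id")
    columns = [description[0] for description in cursor.description]
    records = cursor.fetchall()
    conn.close()
    return columns, records


sample_rows = [
    ("2016-10-30", "09:58:11", 1, "Bread"),
    ("2016-10-30", "10:05:34", 2, "Scandinavian"),
    ("2016-10-30", "10:05:34", 2, "Scandinavian"),
]
columns, records = load_sample(sample_rows)
print(columns)
print(len(records))
```

The generated id column explains why the table ends up with one more column than the CSV file.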


To execute the script, run the following command.

python3 03_load_sql.py

This should result in the following output, if successful.

Adminer

To confirm the SQL script’s success, use Adminer. Adminer (formerly phpMinAdmin) is a full-featured database management tool written in PHP. Adminer natively recognizes PostgreSQL, MySQL, SQLite, and MongoDB, among other database engines. The current version is 4.7.6 (March 2020).

Adminer should be available on localhost port 8080. The password credentials, shown below, are located in the stack.yml file. The server name, postgres, is the name of the PostgreSQL Docker container. This is the domain name the Jupyter container will use to communicate with the PostgreSQL container.
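
The host name you use depends on where the client runs. Inside the Docker network, containers reach the database by its service name, postgres. From the host machine, the published port 5432 is reachable on localhost. The sketch below builds the same style of connection string used in 03_load_sql.py for both cases; the function name build_dsn is hypothetical, and the defaults are the demo credentials from the stack file.

```python
def build_dsn(host, port=5432, dbname="bakery",
              user="postgres", password="postgres1234"):
    """Build a key=value connection string like the one in 03_load_sql.py."""
    return (
        f"host={host} port={port} dbname={dbname} "
        f"user={user} password={password}"
    )


# From another container on the same Docker network.
print(build_dsn("postgres"))
# From the host machine, assuming port 5432 is published as in the stack file.
print(build_dsn("localhost"))
```

These demo credentials are fine for a local sandbox but should not be reused for anything beyond it.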

Connecting to the new bakery database with Adminer, we should see the transactions table.

The table should contain 21,293 rows, each with 5 columns of data.


pgAdmin

Another excellent choice for interacting with PostgreSQL, in particular, is pgAdmin 4. It is my favorite tool for the administration of PostgreSQL. Although limited to PostgreSQL, the user interface and administrative capabilities of pgAdmin are superior to Adminer, in my opinion. For brevity, I chose not to cover pgAdmin in detail in this post. The Docker stack does contain a pgAdmin container, which has been commented out. To use pgAdmin, just uncomment the container and re-run the Docker stack deploy command. pgAdmin should then be available on localhost port 8180, the host port mapped in the stack file. The pgAdmin login credentials are in the Docker stack file.


Developing Jupyter Notebooks

The real power of the Jupyter Docker Stacks containers is Jupyter Notebooks. According to the Jupyter Project, the notebook extends the console-based approach to interactive computing in a qualitatively new direction, providing a web-based application suitable for capturing the whole computation process, including developing, documenting, and executing code, as well as communicating the results. Notebook documents contain the inputs and outputs of an interactive session as well as additional text that accompanies the code but is not meant for execution.

To explore the capabilities of Jupyter notebooks, the project includes two simple Jupyter notebooks. The first notebook, 04_notebook.ipynb, demonstrates typical PySpark functions, such as loading data from a CSV file and from the PostgreSQL database, performing basic data analysis with Spark SQL including the use of PySpark user-defined functions (UDF), graphing the data using BokehJS, and finally, saving data back to the database, as well as to the fast and efficient Apache Parquet file format. Below we see several notebook cells demonstrating these features.

IDE Integration

Recall, the working directory, containing the GitHub source code for the project, is bind-mounted to the Jupyter container. Therefore, you can also edit any of the files, including notebooks, in your favorite IDE, such as JetBrains PyCharm and Microsoft Visual Studio Code. PyCharm has built-in language support for Jupyter Notebooks, as shown below.

As does Visual Studio Code using the Python extension.


Using Additional Packages

As mentioned in the Introduction, the Jupyter Docker Stacks come ready-to-run, with a wide variety of Python packages to extend their functionality. To demonstrate the use of these packages, the project contains a second Jupyter notebook document, 05_notebook.ipynb. This notebook uses SciPy, the well-known Python package for mathematics, science, and engineering, NumPy, the well-known Python package for scientific computing, and the Plotly Python Graphing Library. While NumPy and SciPy are included on the Jupyter Docker Image, the bootstrap script uses pip to install the required Plotly packages. Similar to Bokeh, shown in the previous notebook, we can use these libraries to create richly interactive data visualizations.

Plotly

To use Plotly from within the notebook, you will first need to sign up for a free account and obtain a username and API key. To ensure we do not accidentally save sensitive Plotly credentials in the notebook, we are using python-dotenv. This Python package reads key/value pairs from a .env file, making them available as environment variables to our notebook. Modify and run the following two commands from a Jupyter terminal to create the .env file and set your Plotly username and API key credentials. Note that the .env file is listed in the .gitignore file, so it will not be committed back to git, which could otherwise compromise the credentials.

echo "PLOTLY_USERNAME=your-username" >> .env
echo "PLOTLY_API_KEY=your-api-key" >> .env

The notebook expects to find the two environment variables, which it uses to authenticate with Plotly.
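
To illustrate what reading a .env file involves, here is a deliberately simplified parser written with the standard library only. It is not python-dotenv and does not replicate its features such as quoting or variable expansion; the function name parse_env is hypothetical, and the values shown are the placeholders from the commands above.

```python
def parse_env(text):
    """Parse simple KEY=value lines, ignoring blanks and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first "=" only, so values may contain "=".
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


env_text = """# Plotly credentials (placeholders)
PLOTLY_USERNAME=your-username
PLOTLY_API_KEY=your-api-key
"""
settings = parse_env(env_text)
print(sorted(settings))
```

In the notebook itself, python-dotenv takes care of this step and exposes the values as environment variables.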


Shown below, we use Plotly to construct a bar chart of daily bakery items sold. The chart uses SciPy and NumPy to compute a linear fit (regression) and plot a line of best fit for the bakery data, overlaying the vertical bars. The chart also uses SciPy’s Savitzky-Golay filter to plot a second line, illustrating a smoothing of our bakery data.
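
For intuition about the line of best fit, the following sketch computes an ordinary least-squares slope and intercept in plain Python. It is a stand-in for the SciPy and NumPy calls used in the notebook, not a copy of them; the function name linear_fit is hypothetical and the daily counts are made-up numbers.

```python
def linear_fit(xs, ys):
    """Return (slope, intercept) of the least-squares line through the points."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of squared deviations of x, and of the x-y cross deviations.
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept


# Made-up daily item counts for five consecutive days.
days = [0, 1, 2, 3, 4]
items_per_day = [10, 12, 14, 16, 18]
slope, intercept = linear_fit(days, items_per_day)
print(slope, intercept)
```

The fitted line is then drawn as y = slope * x + intercept over the same x values as the bars.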


Plotly also provides Chart Studio Online Chart Maker. Plotly describes Chart Studio as the world’s most modern enterprise data visualization solutions. We can enhance, stylize, and share our data visualizations using the free version of Chart Studio Cloud.


Jupyter Notebook Viewer

Notebooks can also be viewed using nbviewer, an open-source project under Project Jupyter. Thanks to Rackspace hosting, the nbviewer instance is a free service.


Using nbviewer, below, we see the output of a cell within the 04_notebook.ipynb notebook. View this notebook, here, using nbviewer.


Monitoring Spark Jobs

The Jupyter Docker container exposes Spark’s monitoring and instrumentation web user interface. We can review each completed Spark Job in detail.

We can review details of each stage of the Spark job, including a visualization of the DAG (Directed Acyclic Graph), which Spark constructs as part of the job execution plan, using the DAG Scheduler.
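
To illustrate why a directed acyclic graph is a useful way to plan work, the sketch below orders a set of dependent steps so that each step runs only after the steps it depends on. This is a generic topological sort (Kahn's algorithm), not Spark's DAG Scheduler; the function name topological_order and the stage names are hypothetical.

```python
from collections import deque


def topological_order(dependencies):
    """Order stages so each comes after the stages it depends on.

    `dependencies` maps every stage to the list of stages it depends on;
    every stage mentioned must also appear as a key.
    """
    remaining = {stage: set(parents) for stage, parents in dependencies.items()}
    ready = deque(sorted(s for s, parents in remaining.items() if not parents))
    order = []
    while ready:
        stage = ready.popleft()
        order.append(stage)
        # Release any stage whose last outstanding dependency just finished.
        for other, parents in remaining.items():
            if stage in parents:
                parents.remove(stage)
                if not parents:
                    ready.append(other)
    if len(order) != len(remaining):
        raise ValueError("cycle detected or unknown dependency")
    return order


# Hypothetical stages loosely modeled on the ETL job in this post.
stages = {
    "read_csv": [],
    "count_by_item": ["read_csv"],
    "sort": ["count_by_item"],
    "show": ["sort"],
    "write_csv": ["sort"],
}
print(topological_order(stages))
```

The absence of cycles is what guarantees such an ordering exists, which is why the function raises an error when it cannot place every stage.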

We can also review the task composition and timing of each event occurring as part of the stages of the Spark job.

We can also use the Spark interface to review and confirm the runtime environment configuration, including versions of Java, Scala, and Spark, as well as packages available on the Java classpath.

Local Spark Performance

Running Spark on a single node within the Jupyter Docker container on your local development system is not a substitute for a true Spark cluster: a Production-grade, multi-node cluster running on bare metal or robust virtualized hardware and managed with Apache Hadoop YARN, Apache Mesos, or Kubernetes. In my opinion, you should only adjust your Docker resource limits to support an acceptable level of Spark performance for running small exploratory workloads. You will not realistically replace the need to process big data and execute jobs requiring complex calculations on a Production-grade, multi-node Spark cluster.

We can use the following docker stats command to examine the container’s CPU and memory metrics.

docker stats --format "table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}"

Below, we see the stats from the Docker stack’s three containers showing little or no activity.

Compare those stats with the ones shown below, recorded while a notebook was reading and writing data, and executing Spark SQL queries. The CPU and memory output show spikes, but both appear to be within acceptable ranges.

Linux Process Monitors

Another option to examine container performance metrics is top, which is pre-installed in our Jupyter container. For example, execute the following top command from a Jupyter terminal, sorting processes by CPU usage.

top -o %CPU

We should observe the individual performance of each process running in the Jupyter container.


A step up from top is htop, an interactive process viewer for Unix. It was installed in the container by the bootstrap script. For example, we can execute the htop command from a Jupyter terminal, sorting processes by CPU % usage.

htop --sort-key PERCENT_CPU

With htop, observe the individual CPU activity. Here, the four CPUs at the top left of the htop window are the CPUs assigned to Docker. We get insight into the way Spark is using multiple CPUs, as well as other performance metrics, such as memory and swap.


Assuming your development machine is robust, it is easy to allocate and deallocate additional compute resources to Docker if required. Be careful not to allocate excessive resources to Docker, starving your host machine of available compute resources for other applications.

Notebook Extensions

There are many ways to extend the Jupyter Docker Stacks. A popular option is jupyter-contrib-nbextensions. According to their website, the jupyter-contrib-nbextensions package contains a collection of community-contributed unofficial extensions that add functionality to the Jupyter notebook. These extensions are mostly written in JavaScript and will be loaded locally in your browser. Installed notebook extensions can be enabled, either by using built-in Jupyter commands, or more conveniently by using the jupyter_nbextensions_configurator server extension.

The project contains an alternate Docker stack file, stack-nbext.yml. The stack uses an alternative Docker image, garystafford/all-spark-notebook-nbext:latest. The Dockerfile that builds it uses the jupyter/all-spark-notebook:latest image as a base image. The alternate image adds the jupyter-contrib-nbextensions and jupyter_nbextensions_configurator packages. Since Jupyter would need to be restarted after nbextensions is deployed, it cannot be done from within a running jupyter/all-spark-notebook container.

FROM jupyter/all-spark-notebook:latest
USER root
RUN pip install jupyter_contrib_nbextensions \
    && jupyter contrib nbextension install --system \
    && pip install jupyter_nbextensions_configurator \
    && jupyter nbextensions_configurator enable --system \
    && pip install yapf # for code pretty
USER $NB_UID


Using this alternate stack, below in our Jupyter container, we see the sizable list of extensions available. Some of my favorite extensions include ‘spellchecker’, ‘Codefolding’, ‘Code pretty’, ‘ExecutionTime’, ‘Table of Contents’, and ‘Toggle all line numbers’.


Below, we see five new extension icons that have been added to the menu bar of 04_notebook.ipynb. You can also observe the extensions have been applied to the notebook, including the table of contents, code-folding, execution time, and line numbering. The spellchecking and pretty code extensions were both also applied.

screen_shot_2019-12-05_at_7_47_12_pm

Conclusion

In this brief post, we have seen how easy it is to get started learning and performing data analytics using Jupyter notebooks, Python, Spark, and PySpark, all thanks to the Jupyter Docker Stacks. We could use this same stack to learn and perform machine learning using Scala and R. Extending the stack’s capabilities is as simple as swapping out this Jupyter image for another, with a different set of tools, as well as adding additional containers to the stack, such as MySQL, MongoDB, RabbitMQ, Apache Kafka, and Apache Cassandra.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

Getting Started with PySpark for Big Data Analytics using Jupyter Notebooks and Jupyter Docker Stacks

There is little question, big data analytics, data science, artificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last few years. Behind the hype curves and marketing buzz, these technologies are having a significant influence on all aspects of our modern lives. Due to their popularity and potential benefits, academic institutions and commercial enterprises are rushing to train large numbers of Data Scientists and ML and AI Engineers.

google_terms2

Learning popular programming languages and frameworks, such as Python, Scala, R, Apache Hadoop, Apache Spark, and Apache Kafka, requires the use of multiple complex technologies. Installing, configuring, and managing these technologies often demands an advanced level of familiarity with Linux, distributed systems, cloud- and container-based platforms, databases, and data-streaming applications. These barriers may prove a deterrent to Students, Mathematicians, Statisticians, and Data Scientists.

google_terms3

Driven by the explosive growth of these technologies and the need to train individuals, many commercial enterprises are lowering the barriers to entry, making it easier to get started. The three major cloud providers, AWS, Azure, and Google Cloud, all have multiple Big Data-, AI- and ML-as-a-Service offerings.

Similarly, many open-source projects are also lowering the barriers to entry into these technologies. An excellent example of an open-source project working on this challenge is Project Jupyter. Similar to the Spark Notebook and Apache Zeppelin projects, Jupyter Notebooks enables data-driven, interactive, and collaborative data analytics with Julia, Scala, Python, R, and SQL.

This post will demonstrate the creation of a containerized development environment, using Jupyter Docker Stacks. The environment will be suited for learning and developing applications for Apache Spark, using the Python, Scala, and R programming languages. This post is not intended to be a tutorial on Spark, PySpark, or Jupyter Notebooks.

Featured Technologies

The following technologies are featured prominently in this post.

pyspark_article_00b_feature

Jupyter Notebooks

According to Project Jupyter, the Jupyter Notebook, formerly known as the IPython Notebook, is an open-source web application that allows users to create and share documents that contain live code, equations, visualizations, and narrative text. Uses include data cleaning and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more. The word, Jupyter, is a loose acronym for Julia, Python, and R, but today, Jupyter supports many programming languages. Interest in Jupyter Notebooks has grown dramatically.

google_terms4

Jupyter Docker Stacks

To enable quick and easy access to Jupyter Notebooks, Project Jupyter has created Jupyter Docker Stacks. The stacks are ready-to-run Docker images containing Jupyter applications, along with accompanying technologies. Currently, there are eight different Jupyter Docker Stacks, each focused on a particular area of practice. They include SciPy (Python-based mathematics, science, and engineering), TensorFlow, R Project for statistical computing, Data Science with Julia, and the main subject of this post, PySpark. The stacks also include a rich variety of well-known packages to extend their functionality, such as scikit-learn, pandas, Matplotlib, Bokeh, ipywidgets (interactive HTML widgets), and Facets.

Apache Spark

According to Apache, Spark is a unified analytics engine for large-scale data processing, used by well-known, modern enterprises, such as Netflix, Yahoo, and eBay. With speeds up to 100x faster than Hadoop, Apache Spark achieves high performance for static, batch, and streaming data, using a state-of-the-art DAG (Directed Acyclic Graph) scheduler, a query optimizer, and a physical execution engine.

Spark’s polyglot programming model allows users to write applications quickly in Scala, Java, Python, R, and SQL. Spark includes libraries for Spark SQL (DataFrames and Datasets), MLlib (Machine Learning), GraphX (Graph Processing), and DStreams (Spark Streaming). You can run Spark using its standalone cluster mode, on Amazon EC2, Apache Hadoop YARN, Mesos, or Kubernetes.

PySpark

The Spark Python API, PySpark, exposes the Spark programming model to Python. PySpark is built on top of Spark’s Java API. Data is processed in Python and cached and shuffled in the JVM. According to Apache, Py4J enables Python programs running in a Python interpreter to dynamically access Java objects in a JVM.

Docker

According to Docker, their technology gives developers and IT the freedom to build, manage, and secure business-critical applications without the fear of technology or infrastructure lock-in. Although Kubernetes is now the leading open-source container orchestration platform, Docker is still the predominant underlying container engine technology. For this post, I am using Docker Desktop Community version for macOS.

screen_shot_2019-06-09_at_7_41_12_am.png

Docker Swarm

Current versions of Docker include both a Kubernetes and a Swarm orchestrator for deploying and managing containers. We will choose Swarm for this demonstration. According to Docker, the cluster management and orchestration features embedded in the Docker Engine are built using swarmkit. Swarmkit is a separate project which implements Docker’s orchestration layer and is used directly within Docker.

PostgreSQL

PostgreSQL is a powerful, open-source object-relational database system. According to their website, PostgreSQL comes with many features aimed to help developers build applications, administrators to protect data integrity and build fault-tolerant environments, and help manage data no matter how big or small the dataset.

Demonstration

To show the capabilities of the Jupyter development environment, I will demonstrate a few typical use cases, such as executing Python scripts, submitting PySpark jobs, working with Jupyter Notebooks, and reading and writing data to and from different format files and to a database. We will be using the jupyter/all-spark-notebook:latest Docker Image. This image includes Python, R, and Scala support for Apache Spark, using Apache Toree.

Architecture

As shown below, we will stand up a Docker stack consisting of Jupyter All-Spark-Notebook, PostgreSQL 11.3 (the version pinned in the stack.yml file), and Adminer containers. The Docker stack will have local directories bind-mounted into the containers. Files from our GitHub project will be shared with the Jupyter application container through a bind-mounted directory. Our PostgreSQL data will also be persisted through a bind-mounted directory. This allows us to persist data external to the ephemeral containers.

PySparkDocker.png

Source Code

All open-sourced code for this post can be found on GitHub. Use the following command to clone the project. The post and project code was last updated on 9/28/2019.

git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/pyspark-setup-demo.git

Source code samples are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers.

Deploy Docker Stack

To start, create the $HOME/data/postgres directory to store PostgreSQL data files. This directory will be bind-mounted into the PostgreSQL container on line 36 of the stack.yml file, $HOME/data/postgres:/var/lib/postgresql/data. The HOME environment variable assumes you are working on Linux or macOS and is equivalent to HOMEPATH on Windows.
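The directory can be created by hand or, as a minimal sketch, from Python. The helper name `ensure_postgres_data_dir` is hypothetical and not part of the project; the only thing taken from the post is the `$HOME/data/postgres` path described above.

```python
from pathlib import Path


def ensure_postgres_data_dir(home: Path) -> Path:
    """Create <home>/data/postgres if it does not exist and return the path."""
    data_dir = home / "data" / "postgres"
    # parents=True also creates the intermediate 'data' directory;
    # exist_ok=True makes the call safe to repeat.
    data_dir.mkdir(parents=True, exist_ok=True)
    return data_dir
```

Calling `ensure_postgres_data_dir(Path.home())` would create the directory under the current user's home directory on Linux, macOS, or Windows.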

The Jupyter container’s working directory is set on line 10 of the stack.yml file, working_dir: /home/$USER/work. The local bind-mounted working directory is $PWD/work. This path is bind-mounted to the working directory in the Jupyter container, on line 24 of the stack.yml file, $PWD/work:/home/$USER/work. The PWD environment variable assumes you are working on Linux or macOS (CD on Windows).

By default, the user within the Jupyter container is jovyan. Optionally, I have chosen to override that user with my own local host’s user account, as shown on line 16 of the stack.yml file, NB_USER: $USER. I have used the macOS host’s USER environment variable value (equivalent to USERNAME on Windows). There are many options for configuring the Jupyter container, detailed here. Several of those options are shown on lines 12-18 of the stack.yml file (gist).

version: "3.7"
services:
  pyspark:
    image: jupyter/all-spark-notebook:latest
    ports:
      - "8888:8888/tcp"
      - "4040:4040/tcp"
    networks:
      - pyspark-net
    working_dir: /home/$USER/work
    environment:
      CHOWN_HOME: "yes"
      GRANT_SUDO: "yes"
      NB_UID: 1000
      NB_GID: 100
      NB_USER: $USER
      NB_GROUP: staff
    user: root
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
    volumes:
      - $PWD/work:/home/$USER/work
  postgres:
    image: postgres:11.3
    environment:
      POSTGRES_USERNAME: postgres
      POSTGRES_PASSWORD: postgres1234
      POSTGRES_DB: demo
    ports:
      - "5432:5432/tcp"
    networks:
      - pyspark-net
    volumes:
      - $HOME/data/postgres:/var/lib/postgresql/data
    deploy:
      restart_policy:
        condition: on-failure
  adminer:
    image: adminer:latest
    ports:
      - "8080:8080/tcp"
    networks:
      - pyspark-net
    deploy:
      restart_policy:
        condition: on-failure
networks:
  pyspark-net:

Assuming you have a recent version of Docker installed on your local development machine, and running in swarm mode, standing up the stack is as easy as running the following command from the root directory of the project:

docker stack deploy -c stack.yml pyspark

The Docker stack consists of a new overlay network, pyspark-net, and three containers. To confirm the stack deployed, you can run the following command:

docker stack ps pyspark --no-trunc

pyspark_article_01_stack_deploy

Note the jupyter/all-spark-notebook container is quite large. Depending on your Internet connection, if this is the first time you have pulled this Docker image, the stack may take several minutes to enter a running state.

To access the Jupyter Notebook application, you need to obtain the Jupyter URL and access token (read more here). This information is output in the Jupyter container log, which can be accessed with the following command:

docker logs $(docker ps | grep pyspark_pyspark | awk '{print $NF}')

pyspark_article_02_pyspark_logs

Using the URL and token shown in the log output, you will be able to access the Jupyter web-based user interface on localhost port 8888. Once there, from the Jupyter dashboard landing page, you should see all the files in the project’s work/ directory.
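If you prefer not to copy the token by eye, it can be pulled out of the log text with a short script. This is a minimal sketch: the sample log excerpt and its token are made up for illustration, the function name `find_token` is hypothetical, and the pattern assumes the token appears in the log as a `token=<hex>` query parameter.

```python
import re
from typing import Optional

# Hypothetical log excerpt; the token below is made up for illustration.
SAMPLE_LOG = """\
[I 12:00:00.000 NotebookApp] The Jupyter Notebook is running at:
[I 12:00:00.000 NotebookApp] http://127.0.0.1:8888/?token=0123456789abcdef
"""

# Assumption: the token is a hexadecimal string in a 'token=' query parameter.
TOKEN_PATTERN = re.compile(r"[?&]token=([0-9a-f]+)")


def find_token(log_text: str) -> Optional[str]:
    """Return the first access token found in the log text, or None."""
    match = TOKEN_PATTERN.search(log_text)
    return match.group(1) if match else None


print(find_token(SAMPLE_LOG))
```

The log text itself would come from the docker logs command shown earlier, for example by redirecting its output to a file and reading that file.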

Also shown below, note the types of files you are able to create from the dashboard, including Python 3, R, Scala (using Apache Toree or spylon-kernel), and text. You can also open a Jupyter Terminal or create a new Folder.

pyspark_article_27_browser.png

Running Python Scripts

Instead of worrying about installing and maintaining the latest version of Python and packages on your own development machine, we can run our Python scripts from the Jupyter container. At the time of this post update, the latest jupyter/all-spark-notebook Docker Image runs Python 3.7.3 and Conda 4.6.14. Let’s start with a simple example of the Jupyter container’s capabilities by running a Python script. I have included a sample Python script, 01_simple_script.py.

#!/usr/bin/python
import random
technologies = ['PySpark', 'Python', 'Spark', 'Scala', 'JVM',
'Project Jupyter', 'PostgreSQL']
print("Technologies: %s" % technologies)
technologies.sort()
print("Sorted: %s" % technologies)
print("I'm interested in learning %s." % random.choice(technologies))

Run the script from within the Jupyter container, from a Jupyter Terminal window:

python ./01_simple_script.py

You should observe the following output.
pyspark_article_08_simple_script

Kaggle Datasets

To explore the features of the Jupyter Notebook container and PySpark, we will use a publicly available dataset from Kaggle. Kaggle is a fantastic open-source resource for datasets used for big-data and ML applications. Their tagline is ‘Kaggle is the place to do data science projects’.

For this demonstration, I chose the ‘Transactions from a Bakery’ dataset from Kaggle.

pyspark_article_03_kaggle

The dataset contains 21,294 rows, each with four columns of data. Although certainly nowhere near ‘big data’, the dataset is large enough to test out the Jupyter container functionality (gist).


Date,Time,Transaction,Item
2016-10-30,09:58:11,1,Bread
2016-10-30,10:05:34,2,Scandinavian
2016-10-30,10:05:34,2,Scandinavian
2016-10-30,10:07:57,3,Hot chocolate
2016-10-30,10:07:57,3,Jam
2016-10-30,10:07:57,3,Cookies
2016-10-30,10:08:41,4,Muffin
2016-10-30,10:13:03,5,Coffee
2016-10-30,10:13:03,5,Pastry
2016-10-30,10:13:03,5,Bread
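Before involving Spark, it can help to see the shape of the data with nothing but the Python standard library. The sketch below embeds the ten sample rows shown above; the comma-separated layout is an assumption based on the .csv file name, and the function name `items_per_transaction` is hypothetical.

```python
import csv
import io
from collections import Counter

# The ten sample rows shown above, assumed to be comma-separated.
SAMPLE_CSV = """\
Date,Time,Transaction,Item
2016-10-30,09:58:11,1,Bread
2016-10-30,10:05:34,2,Scandinavian
2016-10-30,10:05:34,2,Scandinavian
2016-10-30,10:07:57,3,Hot chocolate
2016-10-30,10:07:57,3,Jam
2016-10-30,10:07:57,3,Cookies
2016-10-30,10:08:41,4,Muffin
2016-10-30,10:13:03,5,Coffee
2016-10-30,10:13:03,5,Pastry
2016-10-30,10:13:03,5,Bread
"""


def items_per_transaction(csv_text: str) -> Counter:
    """Count how many item rows belong to each transaction number."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return Counter(int(row["Transaction"]) for row in reader)


counts = items_per_transaction(SAMPLE_CSV)
for transaction, n_items in sorted(counts.items()):
    print(transaction, n_items)
```

Each row is one item sold, so a transaction with several items appears on several rows that share the same date, time, and transaction number.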

Submitting Spark Jobs

We are not limited to Jupyter Notebooks to interact with Spark; we can also submit scripts directly to Spark from a Jupyter Terminal or from our IDE. I have included a simple Python script, 02_bakery_dataframes.py. The script loads the Kaggle Bakery dataset from the CSV file into a Spark DataFrame. The script then prints out the top ten rows of data, along with a count of the total number of rows in the DataFrame.

#!/usr/bin/python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession \
    .builder \
    .getOrCreate()
sc = spark.sparkContext

# explicit schema for the four CSV columns
bakery_schema = StructType([
    StructField('date', StringType(), True),
    StructField('time', StringType(), True),
    StructField('transaction', IntegerType(), True),
    StructField('item', StringType(), True)
])

df3 = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .load('BreadBasket_DMS.csv', schema=bakery_schema)

df3.show(10)        # print the first ten rows
print(df3.count())  # count() only returns the row count, so print it

Run the script directly from a Jupyter Terminal window:

python ./02_bakery_dataframes.py

An example of the output of the Spark job is shown below. At the time of this post update (6/7/2019), the latest jupyter/all-spark-notebook Docker Image runs Spark 2.4.3, Scala 2.11.12, and Java 1.8.0_191 using the OpenJDK.
pyspark_article_09_simple_spark

More typically, you would submit the Spark job, using the spark-submit command. Use a Jupyter Terminal window to run the following command:

$SPARK_HOME/bin/spark-submit 02_bakery_dataframes.py

Below, we see the beginning of the output from Spark, using the spark-submit command.
pyspark_article_09B1_spark_submit

Below, we see the scheduled tasks executing and the output of the print statement, displaying the top 10 rows of bakery data.

Interacting with Databases

Often with Spark, you are loading data from one or more data sources (input). After performing operations and transformations on the data, the data is persisted or conveyed to another system for further processing (output).

To demonstrate the flexibility of the Jupyter Docker Stacks to work with databases, I have added PostgreSQL to the Docker Stack. We can read and write data from the Jupyter container to the PostgreSQL instance, running in a separate container.

To begin, we will run a SQL script, written in Python, to create our database schema and some test data in a new database table. To do so, we will need to install the psycopg2 package into our Jupyter container. You can use the docker exec command from your terminal. Alternatively, as a superuser, your user has administrative access to install Python packages within the Jupyter container using the Jupyter Terminal window. Both pip and conda are available to install packages; see details here.

Run the following command to install psycopg2:

# using pip
docker exec -it \
  $(docker ps | grep pyspark_pyspark | awk '{print $NF}') \
  pip install psycopg2-binary

This package gives Python the ability to interact with PostgreSQL. The included Python script, 03_load_sql.py, will execute a set of SQL statements, contained in a SQL file, bakery_sample.sql, against the PostgreSQL container instance.

#!/usr/bin/python
import psycopg2

# source: https://stackoverflow.com/questions/45805871/python3-psycopg2-execute-sql-file
connect_str = 'host=postgres port=5432 dbname=demo user=postgres password=postgres1234'
conn = psycopg2.connect(connect_str)
conn.autocommit = True
cursor = conn.cursor()

# read the whole SQL file, then split it into individual statements
sql_file = open('bakery_sample.sql', 'r')
sqlFile = sql_file.read()
sql_file.close()
sqlCommands = sqlFile.split(';')

for command in sqlCommands:
    print(command)
    # skip the empty fragment left after the final semicolon
    if command.strip() != '':
        cursor.execute(command)
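The split-and-execute approach used by the script can be tried without a running PostgreSQL container. The sketch below swaps in Python's built-in sqlite3 module and an in-memory database purely as a stand-in; the SQL text, the column names, and the helper name `run_statements` are assumptions for illustration and do not reproduce the project's bakery_sample.sql file.

```python
import sqlite3

# Hypothetical stand-in for bakery_sample.sql; the column names are assumptions.
SQL_TEXT = """
CREATE TABLE bakery_basket (date TEXT, time TEXT, transaction_id INTEGER, item TEXT);
INSERT INTO bakery_basket VALUES ('2016-10-30', '09:58:11', 1, 'Bread');
INSERT INTO bakery_basket VALUES ('2016-10-30', '10:05:34', 2, 'Scandinavian');
INSERT INTO bakery_basket VALUES ('2016-10-30', '10:07:57', 3, 'Hot chocolate');
"""


def run_statements(conn, sql_text):
    """Split on ';' and execute each non-empty statement; return how many ran."""
    executed = 0
    for statement in sql_text.split(";"):
        if statement.strip():
            conn.execute(statement)
            executed += 1
    return executed


conn = sqlite3.connect(":memory:")  # in-memory SQLite instead of PostgreSQL
ran = run_statements(conn, SQL_TEXT)
rows = conn.execute("SELECT COUNT(*) FROM bakery_basket").fetchone()[0]
print(ran, rows)
```

Splitting on every semicolon is simple but naive: a statement containing a semicolon inside a string literal would be cut in two, so this approach suits small, hand-written SQL files.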

To execute the script, run the following command:

python ./03_load_sql.py

This should result in the following output, if successful.
pyspark_article_10_run_sql_py

To confirm the SQL script’s success, I have included Adminer. Adminer (formerly phpMinAdmin) is a full-featured database management tool written in PHP. Adminer natively recognizes PostgreSQL, MySQL, SQLite, and MongoDB, among other database engines.

Adminer should be available on localhost port 8080. The password credentials, shown below, are available in the stack.yml file. The server name, postgres, is the name of the PostgreSQL container. This is the domain name the Jupyter container will use to communicate with the PostgreSQL container.
pyspark_article_06_adminer_login

Connecting to the demo database with Adminer, we should see the bakery_basket table. The table should contain three rows of data, as shown below.
pyspark_article_07_bakery_data

Developing Jupyter Notebooks

The true power of the Jupyter Docker Stacks containers is Jupyter Notebooks. According to the Jupyter Project, the notebook extends the console-based approach to interactive computing in a qualitatively new direction, providing a web-based application suitable for capturing the whole computation process: developing, documenting, and executing code, as well as communicating the results. Notebook documents contain the inputs and outputs of an interactive session as well as additional text that accompanies the code but is not meant for execution.

To see the power of Jupyter Notebooks, I have written a basic notebook document, 04_pyspark_demo_notebook.ipynb. The document performs some typical PySpark functions, such as loading data from a CSV file and from the PostgreSQL database, performing some basic data analytics with Spark SQL, graphing the data using BokehJS, and finally, saving data back to the database, as well as to the popular Apache Parquet file format. Below we see the notebook document, using the Jupyter Notebook user interface.

pyspark_article_11_notebook.png

PostgreSQL Driver

The only notebook document dependency, not natively part of the Jupyter Image, is the PostgreSQL JDBC driver. The driver, postgresql-42.2.8.jar, is included in the project and referenced in the configuration of the notebook’s Spark Session. The JAR is added to the spark.driver.extraClassPath runtime environment property. This ensures the JAR is available to Spark (written in Scala) when the job is run.
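As a rough sketch of what such a configuration can look like in PySpark, the code below builds the classpath setting, creates a session, and reads a table over JDBC. It is not copied from the notebook: the function names, the application name, and the assumption that the JAR sits in the working directory are all illustrative, while the host, port, database, and credentials come from the stack.yml file shown earlier.

```python
# Assumption: the driver JAR sits in the notebook's working directory.
JDBC_JAR = "postgresql-42.2.8.jar"

# Connection details taken from the stack.yml file.
JDBC_URL = "jdbc:postgresql://postgres:5432/demo"
JDBC_PROPERTIES = {
    "user": "postgres",
    "password": "postgres1234",
    "driver": "org.postgresql.Driver",
}


def spark_conf(jar_path):
    """Return Spark settings that put the JDBC driver on the driver classpath."""
    return {"spark.driver.extraClassPath": jar_path}


def build_session(conf):
    """Create (or reuse) a SparkSession with the given settings applied."""
    # Imported here so the settings above can be inspected without PySpark.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName("pyspark_demo_sketch")
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


def read_bakery_table(spark):
    """Load the bakery_basket table from PostgreSQL into a DataFrame."""
    return spark.read.jdbc(
        url=JDBC_URL, table="bakery_basket", properties=JDBC_PROPERTIES
    )
```

Inside the Jupyter container, `read_bakery_table(build_session(spark_conf(JDBC_JAR)))` would return a DataFrame backed by the table created earlier, provided the JAR path is correct.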

PyCharm

Since the working directory for the project is shared with the container, you can also edit files, including notebook documents, in your favorite IDE, such as JetBrains PyCharm. PyCharm has built-in language support for Jupyter Notebooks, as shown below.
pyspark_article_11_notebook_pycharm.png

As mentioned earlier, a key feature of Jupyter Notebooks is their ability to save the output from each Cell as part of the notebook document. Below, we see the notebook document on GitHub. The output is saved, as part of the notebook document. Not only can you distribute the notebook document, but you can also preserve and share the output from each cell.
pyspark_article_17_github

Using Additional Packages

As mentioned in the Introduction, the Jupyter Docker Stacks come ready-to-run, with a rich variety of Python packages to extend their functionality.  To demonstrate the use of these packages, I have created a second Jupyter notebook document, 05_pyspark_demo_notebook.ipynb. This notebook document uses SciPy (Python-based mathematics, science, and engineering), NumPy (Python-based scientific computing), and the Plotly Python Graphing Library. While NumPy and SciPy are included on the Jupyter Docker Image, the Notebook used pip to install Plotly. Similar to Bokeh, shown previously, we can combine these libraries to create richly interactive data visualizations. To use Plotly, you will need to sign up for a free account and obtain a username and API key.

Shown below, we use Plotly to construct a bar chart of daily bakery items sold for the year 2017 based on the Kaggle dataset. The chart uses SciPy and NumPy to construct a linear fit (regression) and plot a line of best fit for the bakery data. The chart also uses SciPy’s Savitzky-Golay Filter to plot the second line, illustrating a smoothing of our bakery data.
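To make the idea of a line of best fit concrete, here is a least-squares fit written in plain Python. It is a stand-in, not the notebook's code: the notebook relies on SciPy and NumPy, the daily counts below are made up rather than taken from the bakery data, and the function name `linear_fit` is hypothetical.

```python
def linear_fit(xs, ys):
    """Return (slope, intercept) of the least-squares line through the points."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of squared deviations in x, and of cross-deviations between x and y.
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x


# Made-up daily item counts for seven consecutive days, not real bakery data.
days = [0, 1, 2, 3, 4, 5, 6]
items_sold = [120, 135, 128, 150, 160, 155, 170]

slope, intercept = linear_fit(days, items_sold)
fitted = [slope * day + intercept for day in days]
print(round(slope, 2), round(intercept, 2))
```

With NumPy available, `numpy.polyfit(days, items_sold, 1)` computes the same kind of fit, and SciPy's `scipy.signal.savgol_filter` provides Savitzky-Golay smoothing.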

pyspark_article_23a_plotly

Plotly also provides Chart Studio Online Chart Maker. Plotly describes Chart Studio as the world’s most sophisticated editor for creating d3.js and WebGL charts. Shown below, we have the ability to enhance, stylize, and share our bakery data visualization using the free version of Chart Studio Cloud.

pyspark_article_23b_plotly

nbviewer

Notebooks can also be viewed using Jupyter nbviewer, hosted on Rackspace. Below, we see the output of a cell from this project’s notebook document, showing a BokehJS chart, using nbviewer. You can view this project’s actual notebook document, using nbviewer, here.

pyspark_article_26_nbviewer.png

Monitoring Spark Jobs

The Jupyter Docker container exposes Spark’s monitoring and instrumentation web user interface. We can observe each Spark Job in great detail.
pyspark_article_12_spark_jobs

We can review details of each stage of the Spark job, including a visualization of the DAG, which Spark constructs as part of the job execution plan, using the DAG Scheduler.
pyspark_article_12_spark_dag

We can also review the timing of each event, occurring as part of the stages of the Spark job.
pyspark_article_12_timeline

We can also use the Spark interface to review and confirm the runtime environment, including versions of Java, Scala, and Spark, as well as packages available on the Java classpath.
pyspark_article_13_enviornment

Spark Performance

Spark, running on a single node within the Jupyter container, on your development system, is not a substitute for a full Spark cluster, running on bare metal or robust virtualized hardware, with YARN, Mesos, or Kubernetes. In my opinion, you should adjust Docker to support an acceptable performance profile for the stack, running only a modest workload. You are not trying to replace the need to run real jobs on a Production Spark cluster.
screen_shot_2019-06-07_at_4_50_25_pm

We can use the docker stats command to examine the container’s CPU and memory metrics:

docker stats --format "table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}"

Below, we see the stats from the stack’s three containers immediately after being deployed, showing little or no activity. Here, Docker has been allocated 2 CPUs, 3 GB of RAM, and 2 GB of swap space from the host machine.
pyspark_article_16a_perf

Compare the stats above with the same three containers, while the example notebook document is running on Spark. The CPU shows a spike, but memory usage appears to be within acceptable ranges.
pyspark_article_16b_perf

Linux top and htop

Another option to examine container performance metrics is with top. We can use the docker exec command to execute the top command within the Jupyter container, sorting processes by CPU usage:

docker exec -it \
  $(docker ps | grep pyspark_pyspark | awk '{print $NF}') \
  top -o %CPU

With top, we can observe the individual performance of each process running in the Jupyter container.

pyspark_article_20_top.png

Lastly, htop, an interactive process viewer for Unix, can be installed into the container and run with the following set of bash commands, from a Jupyter Terminal window or using docker exec:

docker exec -it \
  $(docker ps | grep pyspark_pyspark | awk '{print $NF}') \
  sh -c "apt-get update && apt-get install htop && htop --sort-key PERCENT_CPU"

With htop, we can observe individual CPU activity. The two CPUs at the top left of the htop window are the two CPUs assigned to Docker. We get insight into the way Docker is using each CPU, as well as other basic performance metrics, like memory and swap.

pyspark_article_16f_htop.png

Assuming your development machine host has them available, it is easy to allocate more compute resources to Docker if required. However, in my opinion, this stack is optimized for development and learning, using reasonably sized datasets for data analysis and ML. It should not be necessary to allocate excessive resources to Docker, possibly starving your host machine’s own compute capabilities.
screen_shot_2019-06-07_at_4_50_45_pm

Conclusion

In this brief post, we have seen how easy it is to get started learning and developing applications for big data analytics, using Python, Spark, and PySpark, thanks to the Jupyter Docker Stacks. We could use the same stack to learn and develop for machine learning, using Python, Scala, and R. Extending the stack’s capabilities is as simple as swapping out this Jupyter image for another, with a different set of tools, as well as adding additional containers to the stack, such as Apache Kafka or Apache Cassandra.

Search results courtesy Google Trends (https://trends.google.com)

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

Integrating Search Capabilities with Actions for Google Assistant, using GKE and Elasticsearch: Part 2

Voice and text-based conversational interfaces, such as chatbots, have recently seen tremendous growth in popularity. Much of this growth can be attributed to leading Cloud providers, such as Google, Amazon, and Microsoft, who now provide affordable, end-to-end development, machine learning-based training, and hosting platforms for conversational interfaces.

Cloud-based machine learning services greatly improve a conversational interface’s ability to interpret user intent with greater accuracy. However, the ability to return relevant responses to user inquiries also requires that interfaces have access to rich informational datastores and the ability to quickly and efficiently query and analyze that data.

In this two-part post, we will enhance the capabilities of a voice and text-based conversational interface by integrating it with a search and analytics engine. By interfacing an Action for Google Assistant conversational interface with Elasticsearch, we will improve the Action’s ability to provide relevant results to the end-user. Instead of querying a traditional database for static responses to user intent, our Action will access a  Near Real-time (NRT) Elasticsearch index of searchable documents. The Action will leverage Elasticsearch’s advanced search and analytics capabilities to optimize and shape user responses, based on their intent.

Action Preview

Here is a brief YouTube video preview of the final Action for Google Assistant, integrated with Elasticsearch, running on an Apple iPhone.

Architecture

If you recall from part one of this post, the high-level architecture of our search engine-enhanced Action for Google Assistant resembles the following. Most of the components are running on Google Cloud.

Google Search Assistant Diagram GCP

Source Code

All open-sourced code for this post can be found on GitHub in two repositories, one for the Spring Boot Service and one for the Action for Google Assistant. Code samples in this post are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Development Process

In part two of this post, we will tie everything together by creating and integrating our Action for Google Assistant:

  • Create the new Actions for Google Assistant project using the Actions on Google console;
  • Develop the Action’s Intents and Entities using the Dialogflow console;
  • Develop, deploy, and test the Cloud Function to GCP;

Let’s explore each step in more detail.

New ‘Actions on Google’ Project

With Elasticsearch running and the Spring Boot Service deployed to our GKE cluster, we can start building our Actions for Google Assistant. Using the Actions on Google web console, we first create a new Actions project.

wp-search-021

The Directory Information tab is where we define metadata about the project. This information determines how it will look in the Actions directory and is required to publish your project. The Actions directory is where users discover published Actions on the web and mobile devices.

wp-search-019

The Directory Information tab also includes sample invocations, which may be used to invoke our Actions.

wp-search-020

Actions and Intents

Our project will contain a series of related Actions. According to Google, an Action is ‘an interaction you build for the Assistant that supports a specific intent and has a corresponding fulfillment that processes the intent.’ To build our Actions, we first want to create our Intents. To do so, we will want to switch from the Actions on Google console to the Dialogflow console. Actions on Google provides a link for switching to Dialogflow in the Actions tab.

wp-search-022

We will build our Action’s Intents in Dialogflow. The term Intent, used by Dialogflow, is standard terminology across other voice-assistant platforms, such as Amazon’s Alexa and Microsoft’s Azure Bot Service and LUIS. In Dialogflow, we will be building Intents: the Find Multiple Posts Intent, Find Post Intent, Find By ID Intent, and so forth.

wp-search-023

Below, we see the Find Post Intent. The Find Post Intent is responsible for handling our user’s requests for a single post about a topic, for example, ‘Find a post about Docker.’ The Intent shown below contains a fair number, but indeed not an exhaustive list, of training phrases. These represent possible ways a user might express intent when invoking the Action.

wp-search-026

Below, we see the Find Multiple Posts Intent. The Find Multiple Posts Intent is responsible for handling our user’s requests for a list of posts about a topic, for example, ‘I’m interested in Docker.’ Similar to the Find Post Intent above, the Find Multiple Posts Intent contains a list of training phrases.

wp-search-025

Dialog Model Training

According to Google, the greater the number of natural language examples in the Training Phrases section of Intents, the better the classification accuracy. Every time a user interacts with our Action, the user’s utterances are logged. Using the Training tab in the Dialogflow console, we can train our model by reviewing and approving or correcting how the Action handled the user’s utterances.

Below we see the user’s utterances, part of an interaction with the Action. We have the option to review and approve the Intent that was called to handle the utterance, re-assign it, or delete it. This helps improve the accuracy of our dialog model.

wp-search-039.png

Dialogflow Entities

Each of the highlighted words in the training phrases maps to the facts parameter, which maps to a collection of @topic Entities. Entities represent a list of intents the Action is trained to understand. According to Google, there are three types of entities: ‘system’ (defined by Dialogflow), ‘developer’ (defined by a developer), and ‘user’ (built for each individual end-user in every request) objects. We will be creating ‘developer’ type entities for our Action’s Intents.

wp-search-037.png

Automated Expansion

We do not have to define every possible topic a user might search for as an entity value. By enabling the Allow Automated Expansion option, an Agent will recognize values that have not been explicitly listed in the entity list. Google describes Agents as NLU (Natural Language Understanding) modules.

wp-search-042.png

Entity Synonyms

An entity may contain synonyms. Multiple synonyms are mapped to a single reference value. The reference value is the value passed to the Cloud Function by the Action. For example, take the reference value of ‘GCP’. The user might ask Google about ‘GCP’. However, the user might also substitute the words ‘Google Cloud’ or ‘Google Cloud Platform’. Using synonyms, if the user utters any of these three synonymous words or phrases in their intent, the reference value, ‘GCP’, is passed in the request.

But, what if the post contains the phrase, ‘Google Cloud Platform’ more frequently than, or instead of, ‘GCP’? If the acronym, ‘GCP’, is defined as the entity reference value, then it is the value passed to the function, even if you ask for ‘Google Cloud Platform’. In the use case of searching blog posts by topic, entity synonyms are not an effective search strategy.
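
To make this limitation concrete, here is a minimal sketch of how synonym resolution behaves. It is a toy model, not Dialogflow’s implementation; the TOPIC_ENTITY table, the ‘k8s’ synonym, and the resolveTopic function are hypothetical and used only for illustration.

```javascript
// Toy model of a 'developer' entity: each reference value lists the
// synonyms that resolve to it. All names and values are illustrative.
const TOPIC_ENTITY = {
  GCP: ['GCP', 'Google Cloud', 'Google Cloud Platform'],
  Kubernetes: ['Kubernetes', 'k8s'],
};

// Returns the reference value for an uttered phrase. With automated
// expansion enabled, an unlisted phrase is passed through unchanged;
// otherwise, no value is returned.
function resolveTopic(phrase, allowAutomatedExpansion = true) {
  const needle = phrase.trim().toLowerCase();
  for (const [reference, synonyms] of Object.entries(TOPIC_ENTITY)) {
    if (synonyms.some(s => s.toLowerCase() === needle)) {
      return reference;
    }
  }
  return allowAutomatedExpansion ? phrase.trim() : undefined;
}

console.log(resolveTopic('Google Cloud Platform'));
```

In this model, every synonym collapses to the acronym before the search is issued, so a post that only mentions ‘Google Cloud Platform’ is searched for using ‘GCP’.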

Elasticsearch Synonyms

A better way to solve for synonyms is by using the synonyms feature of Elasticsearch. Take, for example, the topic of ‘Istio’. Istio is also considered a Service Mesh. If I ask for posts about ‘Service Mesh’, I would like to get back posts that contain the phrase ‘Service Mesh’, but also the word ‘Istio’. To accomplish this, you would define an association between ‘Istio’ and ‘Service Mesh’, as part of the Elasticsearch WordPress posts index.

wp-search-041d
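
As a rough sketch of what such an association can look like, the object below describes index settings with a synonym token filter and a custom analyzer that applies it. The filter and analyzer names (blog_synonyms, blog_analyzer) are placeholders, and the settings actually used for the WordPress posts index may differ.

```javascript
// Sketch of index settings that register a synonym token filter.
// 'blog_synonyms' and 'blog_analyzer' are placeholder names.
const indexSettings = {
  settings: {
    analysis: {
      filter: {
        blog_synonyms: {
          type: 'synonym',
          // Comma-separated terms on one line are treated as equivalent.
          synonyms: ['istio, service mesh'],
        },
      },
      analyzer: {
        blog_analyzer: {
          type: 'custom',
          tokenizer: 'standard',
          // Lowercase first so the synonym rule matches regardless of case.
          filter: ['lowercase', 'blog_synonyms'],
        },
      },
    },
  },
};

// JSON body that could be supplied when the index is created.
console.log(JSON.stringify(indexSettings, null, 2));
```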

Searches for ‘Istio’ against that index would return results that contain ‘Istio’ and/or contain ‘Service Mesh’; the reverse is also true. Having created and applied a custom synonyms filter to the index, we see how Elasticsearch responds to an analysis of the natural language style phrase, ‘What is a Service Mesh?’. As shown by the tokens output in Kibana’s Dev Tools Console, Elasticsearch understands that ‘service mesh’ is synonymous with ‘istio’.

wp-search-041g

If we query the same five fields as our Action, for the topic of ‘service mesh’, we get four hits for posts (indexed documents) that contain ‘service mesh’ and/or ‘istio’.

wp-search-041c

Actions on Google Integration

Another configuration item in Dialogflow that needs to be completed is Dialogflow’s Actions on Google integration. This will integrate our Action with Google Assistant. Google currently provides more than fifteen different integrations, including Google Assistant, Slack, Facebook Messenger, Twitter, and Twilio, as shown below.

wp-search-028

To configure the Google Assistant integration, choose the Welcome Intent as our Action’s Explicit Invocation intent. Then we designate our other Intents as Implicit Invocation intents. According to Google, this Google Assistant Integration allows our Action to reach users on every device where the Google Assistant is available.

wp-search-029

Action Fulfillment

When a user’s intent is received, it is fulfilled by the Action. In the Dialogflow Fulfillment console, we see the Action has two fulfillment options: a Webhook or a Cloud Function edited inline. A Webhook allows us to pass information from a matched intent into a web service and get a result back from the service. Our Action’s Webhook will call our Cloud Function on GCP, using the Cloud Function’s URL endpoint (we’ll get this URL in the next section).

wp-search-030
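
To illustrate the exchange, here is a minimal sketch of what a webhook does with a Dialogflow (v2) request: it reads the matched intent and its parameters from queryResult and returns a response body. In this post, the actions-on-google library handles this parsing for us; the handleWebhook function and the sample request values are hypothetical.

```javascript
// Reads the matched intent and its parameters from a webhook request
// body and builds a simple text response.
function handleWebhook(requestBody) {
  const { intent, parameters } = requestBody.queryResult;
  const topic = parameters.topic || 'unknown';
  return {
    fulfillmentText: `Intent '${intent.displayName}' matched for topic '${topic}'.`,
  };
}

// Trimmed-down sample request; real requests carry many more fields.
const sampleRequest = {
  queryResult: {
    queryText: 'Find a post about Docker',
    parameters: { topic: 'Docker' },
    intent: { displayName: 'Find Post Intent' },
  },
};

console.log(handleWebhook(sampleRequest).fulfillmentText);
```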

Google Cloud Functions

Our Cloud Function, called by our Action, is written in Node.js. Our function, index.js, is divided into four sections, which are: constants and environment variables, intent handlers, helper functions, and the function’s entry point. The helper functions are part of the Helper module, contained in the helper.js file.

Constants and Environment Variables

This section, in both index.js and helper.js, defines the global constants and environment variables used within the function. Values that reference environment variables, such as SEARCH_API_HOSTNAME, are defined in the .env.yaml file. All environment variables in the .env.yaml file will be set during the Cloud Function’s deployment, described later in this post. Environment variables were recently released, and are still considered beta functionality (gist).

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
'use strict';
/* CONSTANTS AND GLOBAL VARIABLES */
const Helper = require('./helper');
let helper = new Helper();
const {
    dialogflow,
    Button,
    Suggestions,
    BasicCard,
    SimpleResponse,
    List
} = require('actions-on-google');
const functions = require('firebase-functions');
const app = dialogflow({debug: true});
app.middleware(conv => {
    conv.hasScreen =
        conv.surface.capabilities.has('actions.capability.SCREEN_OUTPUT');
    conv.hasAudioPlayback =
        conv.surface.capabilities.has('actions.capability.AUDIO_OUTPUT');
});
const SUGGESTION_1 = 'tell me about Docker';
const SUGGESTION_2 = 'help';
const SUGGESTION_3 = 'cancel';
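
The environment variables mentioned above are read from process.env at runtime. As a minimal sketch, assuming we want the function to fail fast when a variable is missing, a small check could look like the following. The requireEnv helper and the sample values are hypothetical; only the variable names come from helper.js.

```javascript
// Returns the named variables from env, or throws listing the missing ones.
// requireEnv is a hypothetical helper, not part of the original function.
function requireEnv(names, env = process.env) {
  const missing = names.filter(name => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  }
  const values = {};
  names.forEach(name => {
    values[name] = env[name];
  });
  return values;
}

// Example with an explicit object standing in for process.env;
// the values are placeholders.
const config = requireEnv(
  ['SEARCH_API_HOSTNAME', 'SEARCH_API_PORT', 'SEARCH_API_ENDPOINT'],
  {
    SEARCH_API_HOSTNAME: 'api.example.com',
    SEARCH_API_PORT: '8080',
    SEARCH_API_ENDPOINT: 'elastic',
  }
);
console.log(config.SEARCH_API_PORT);
```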

The npm module dependencies declared in this section are defined in the dependencies section of the package.json file. Function dependencies include Actions on Google, Firebase Functions, Winston, and Request (gist).

{
    "name": "functionBlogSearchAction",
    "description": "Programmatic Ponderings Search Action for Google Assistant",
    "version": "1.0.0",
    "private": true,
    "license": "MIT License",
    "author": "Gary A. Stafford",
    "engines": {
        "node": ">=8"
    },
    "scripts": {
        "deploy": "sh ./deploy-cloud-function.sh"
    },
    "dependencies": {
        "@google-cloud/logging-winston": "^0.9.0",
        "actions-on-google": "^2.2.0",
        "dialogflow": "^0.6.0",
        "dialogflow-fulfillment": "^0.5.0",
        "firebase-admin": "^6.0.0",
        "firebase-functions": "^2.0.2",
        "request": "^2.88.0",
        "request-promise-native": "^1.0.5",
        "winston": "2.4.4"
    }
}

Intent Handlers

The intent handlers in this section correspond to the intents in the Dialogflow console. Each handler responds with either the Simple Response, Basic Card, and Suggestion Chip response types, or the Simple Response, List, and Suggestion Chip response types. These response types were covered in part one of this post (gist).

/* INTENT HANDLERS */
app.intent('Welcome Intent', conv => {
    const WELCOME_TEXT_SHORT = 'What topic are you interested in reading about?';
    const WELCOME_TEXT_LONG = `You can say things like: \n` +
        ` _'Find a post about GCP'_ \n` +
        ` _'I'd like to read about Kubernetes'_ \n` +
        ` _'I'm interested in Docker'_`;
    conv.ask(new SimpleResponse({
        speech: WELCOME_TEXT_SHORT,
        text: WELCOME_TEXT_SHORT,
    }));
    if (conv.hasScreen) {
        conv.ask(new BasicCard({
            text: WELCOME_TEXT_LONG,
            title: 'Programmatic Ponderings Search',
        }));
        conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
    }
});
app.intent('Fallback Intent', conv => {
    const FACTS_LIST = "Kubernetes, Docker, Cloud, DevOps, AWS, Spring, Azure, Messaging, and GCP";
    const HELP_TEXT_SHORT = 'Need a little help?';
    const HELP_TEXT_LONG = `Some popular topics include: ${FACTS_LIST}.`;
    conv.ask(new SimpleResponse({
        speech: HELP_TEXT_LONG,
        text: HELP_TEXT_SHORT,
    }));
    if (conv.hasScreen) {
        conv.ask(new BasicCard({
            text: HELP_TEXT_LONG,
            title: 'Programmatic Ponderings Search Help',
        }));
        conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
    }
});
app.intent('Find Post Intent', async (conv, {topic}) => {
    let postTopic = topic.toString();
    let posts = await helper.getPostsByTopic(postTopic, 1);
    // Treat a missing result set the same as an empty one
    if (posts === undefined || posts.length < 1) {
        helper.topicNotFound(conv, postTopic);
        return;
    }
    let post = posts[0];
    let formattedDate = helper.convertDate(post.post_date);
    const POST_SPOKEN = `The top result for '${postTopic}' is the post, '${post.post_title}', published ${formattedDate}, with a relevance score of ${post._score.toFixed(2)}`;
    const POST_TEXT = `Description: ${post.post_excerpt} \nPublished: ${formattedDate} \nScore: ${post._score.toFixed(2)}`;
    conv.ask(new SimpleResponse({
        speech: POST_SPOKEN,
        text: post.post_title, // the indexed field is post_title, not title
    }));
    if (conv.hasScreen) {
        conv.ask(new BasicCard({
            title: post.post_title,
            text: POST_TEXT,
            buttons: new Button({
                title: `Read Post`,
                url: post.guid,
            }),
        }));
        conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
    }
});
app.intent('Find Multiple Posts Intent', async (conv, {topic}) => {
    let postTopic = topic.toString();
    let postCount = 6;
    let posts = await helper.getPostsByTopic(postTopic, postCount);
    // Treat a missing result set the same as an empty one
    if (posts === undefined || posts.length < 1) {
        helper.topicNotFound(conv, postTopic);
        return;
    }
    const POST_SPOKEN = `Here's a list of the top ${posts.length} posts about '${postTopic}'`;
    conv.ask(new SimpleResponse({
        speech: POST_SPOKEN,
    }));
    // List items are keyed by Post ID; the key is returned as the selected option
    let itemsArray = {};
    posts.forEach(function (post) {
        itemsArray[post.ID] = {
            title: `Post ID ${post.ID}`,
            description: `${post.post_title.substring(0,80)}... \nScore: ${post._score.toFixed(2)}`,
        };
    });
    if (conv.hasScreen) {
        conv.ask(new List({
            title: 'Top Results',
            items: itemsArray
        }));
        conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
    }
});
app.intent('Find By ID Intent', async (conv, {topic}) => {
    let postId = topic.toString();
    let post = await helper.getPostById(postId);
    if (post === undefined) {
        helper.postIdNotFound(conv, postId);
        return;
    }
    let formattedDate = helper.convertDate(post.post_date);
    const POST_SPOKEN = `Okay, I found that post`;
    const POST_TEXT = `Description: ${post.post_excerpt} \nPublished: ${formattedDate}`;
    conv.ask(new SimpleResponse({
        speech: POST_SPOKEN,
        text: post.post_title, // the indexed field is post_title, not title
    }));
    if (conv.hasScreen) {
        conv.ask(new BasicCard({
            title: post.post_title,
            text: POST_TEXT,
            buttons: new Button({
                title: `Read Post`,
                url: post.guid,
            }),
        }));
        conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
    }
});
app.intent('Option Intent', async (conv, params, option) => {
    let postId = option.toString();
    let post = await helper.getPostById(postId);
    if (post === undefined) {
        helper.postIdNotFound(conv, postId);
        return;
    }
    let formattedDate = helper.convertDate(post.post_date);
    const POST_SPOKEN = `Sure, here's that post`;
    const POST_TEXT = `Description: ${post.post_excerpt} \nPublished: ${formattedDate}`;
    conv.ask(new SimpleResponse({
        speech: POST_SPOKEN,
        text: post.post_title, // the indexed field is post_title, not title
    }));
    if (conv.hasScreen) {
        conv.ask(new BasicCard({
            title: post.post_title,
            text: POST_TEXT,
            buttons: new Button({
                title: `Read Post`,
                url: post.guid,
            }),
        }));
        conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
    }
});

The Welcome Intent handler handles explicit invocations of our Action. The Fallback Intent handler handles help requests, as well as cases when Dialogflow is unable to handle the user’s request.

As described above in the Dialogflow section, the Find Post Intent handler is responsible for handling our user’s requests for a single post about a topic, for example, ‘Find a post about Docker’. To fulfill the user request, the Find Post Intent handler calls the Helper module’s getPostsByTopic function, passing the requested topic and specifying a result set size of one. The function returns the post with the highest relevance score, provided that score is greater than an arbitrary minimum of 1.0.

Similarly, the Find Multiple Posts Intent handler is responsible for handling our user’s requests for a list of posts about a topic, for example, ‘I’m interested in Docker’. To fulfill the user request, the Find Multiple Posts Intent handler calls the Helper module’s getPostsByTopic function, passing the requested topic and specifying a maximum result set size of six. The function returns the posts with the highest relevance scores greater than 1.0.

The Find By ID Intent handler is responsible for handling our user’s requests for a specific post by its unique Post ID, for example, ‘Post ID 22141’. To fulfill the user request, the Find By ID Intent handler calls the Helper module’s getPostById function, passing the unique Post ID (gist).
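
The handlers need to cope with a search that returns nothing, whether the API responds with an empty list or with no list at all. Below is a minimal sketch of such a guard. The hasResults and topPost helpers are hypothetical names, and the sketch assumes each post carries the _score field used by the handlers.

```javascript
// True only when posts is a non-empty array, so both "no response body"
// and "empty list" are treated as not found.
function hasResults(posts) {
  return Array.isArray(posts) && posts.length > 0;
}

// Picks the highest-scoring post, or undefined when there is nothing to show.
function topPost(posts) {
  if (!hasResults(posts)) {
    return undefined;
  }
  return posts.reduce((best, post) => (post._score > best._score ? post : best));
}

// Sample data with made-up IDs and scores.
const samplePosts = [
  { ID: 1, _score: 1.2 },
  { ID: 2, _score: 3.4 },
];
console.log(topPost(samplePosts).ID);
```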

Entry Point

The entry point creates a way to handle the communication with Dialogflow’s fulfillment API (gist).

/* ENTRY POINT */
exports.functionBlogSearchAction = functions.https.onRequest(app);

Helper Functions

The helper functions are part of the Helper module, contained in the helper.js file. In addition to typical utility functions, such as formatting dates, there are two functions that interface with Elasticsearch via our Spring Boot API: getPostsByTopic and getPostById. As described above, the intent handlers call one of these functions to obtain search results from Elasticsearch.

The getPostsByTopic function handles both the Find Post Intent handler and Find Multiple Posts Intent handler, described above. The only difference in the two calls is the size of the response set, either one result or six results maximum (gist).

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
'use strict';
/* CONSTANTS AND GLOBAL VARIABLES */
const {
    dialogflow,
    BasicCard,
    SimpleResponse,
} = require('actions-on-google');
const app = dialogflow({debug: true});
app.middleware(conv => {
    conv.hasScreen =
        conv.surface.capabilities.has('actions.capability.SCREEN_OUTPUT');
    conv.hasAudioPlayback =
        conv.surface.capabilities.has('actions.capability.AUDIO_OUTPUT');
});
const SEARCH_API_HOSTNAME = process.env.SEARCH_API_HOSTNAME;
const SEARCH_API_PORT = process.env.SEARCH_API_PORT;
const SEARCH_API_ENDPOINT = process.env.SEARCH_API_ENDPOINT;
const rpn = require('request-promise-native');
const winston = require('winston');
const Logger = winston.Logger;
const Console = winston.transports.Console;
const {LoggingWinston} = require('@google-cloud/logging-winston');
const loggingWinston = new LoggingWinston();
const logger = new Logger({
    level: 'info', // log at 'info' and above
    transports: [
        new Console(),
        loggingWinston,
    ],
});
/* HELPER FUNCTIONS */
module.exports = class Helper {
    /**
     * Returns a collection of ElasticsearchPosts objects based on a topic
     * @param postTopic topic to search for
     * @param responseSize maximum number of posts to return
     * @returns {Promise<any>}
     */
    getPostsByTopic(postTopic, responseSize = 1) {
        return new Promise((resolve, reject) => {
            // Encode the topic so multi-word topics form a valid query string
            const SEARCH_API_RESOURCE = `dismax-search?value=${encodeURIComponent(postTopic)}&start=0&size=${responseSize}&minScore=1`;
            const SEARCH_API_URL = `http://${SEARCH_API_HOSTNAME}:${SEARCH_API_PORT}/${SEARCH_API_ENDPOINT}/${SEARCH_API_RESOURCE}`;
            logger.info(`getPostsByTopic API URL: ${SEARCH_API_URL}`);
            let options = {
                uri: SEARCH_API_URL,
                json: true
            };
            rpn(options)
                .then(function (posts) {
                    posts = posts.ElasticsearchPosts;
                    logger.info(`getPostsByTopic Posts: ${JSON.stringify(posts)}`);
                    resolve(posts);
                })
                .catch(function (err) {
                    logger.error(`Error: ${err}`);
                    reject(err);
                });
        });
    }
    // truncated for brevity
};
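
The two handlers differ only in the size value sent to the search API. As a small sketch, the URL construction can be isolated into a function that is easy to check on its own. The buildSearchUrl name and the base URL are hypothetical; the query parameters mirror those used by getPostsByTopic.

```javascript
// Builds the dismax-search URL for a topic. The topic is URL-encoded so
// that multi-word topics such as 'service mesh' form a valid query string.
function buildSearchUrl(baseUrl, postTopic, responseSize = 1) {
  const value = encodeURIComponent(postTopic);
  return `${baseUrl}/dismax-search?value=${value}&start=0&size=${responseSize}&minScore=1`;
}

// Placeholder base URL standing in for hostname, port, and endpoint.
const BASE_URL = 'http://api.example.com:8080/elastic';

// Find Post Intent: a single result
console.log(buildSearchUrl(BASE_URL, 'Docker'));
// Find Multiple Posts Intent: up to six results
console.log(buildSearchUrl(BASE_URL, 'service mesh', 6));
```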

Both functions use the request and request-promise-native npm modules to call the Spring Boot service’s RESTful API over HTTP. However, instead of relying on a callback, the request-promise-native module allows us to return a native ES6 Promise. By returning a promise, we can use async/await with our Intent handlers. Using async/await with Promises is a newer way of handling asynchronous operations in Node.js. The asynchronous programming model, using promises, is described in greater detail in my previous post, Building Serverless Actions for Google Assistant with Google Cloud Functions, Cloud Datastore, and Cloud Storage.
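
The difference between the callback and promise styles can be sketched without any HTTP library. In the example below, fakeRequest is a stand-in for a callback-based HTTP call (it is not the request module), and requestAsPromise and firstPostTitle are hypothetical names.

```javascript
// Callback style: the caller passes a function to receive the result.
function fakeRequest(url, callback) {
  const body = { ElasticsearchPosts: [{ post_title: `Result for ${url}` }] };
  setTimeout(() => callback(null, body), 10);
}

// Promise style: wrap the callback API once...
function requestAsPromise(url) {
  return new Promise((resolve, reject) => {
    fakeRequest(url, (err, body) => (err ? reject(err) : resolve(body)));
  });
}

// ...so that callers can use async/await with ordinary try/catch.
async function firstPostTitle(url) {
  try {
    const body = await requestAsPromise(url);
    return body.ElasticsearchPosts[0].post_title;
  } catch (err) {
    console.error(`Error: ${err}`);
    return undefined;
  }
}

firstPostTitle('/docker').then(title => console.log(title));
```

With the promise in place, the calling code reads top to bottom, much like the intent handlers shown earlier.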

The getPostById function handles both the Find By ID Intent handler and Option Intent handler, described above. This function is similar to the getPostsByTopic function, calling a Spring Boot service’s RESTful API endpoint and passing the Post ID (gist).

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
// truncated for brevity
module.exports = class Helper {
    /**
     * Returns a single result based on the Post ID
     * @param postId ID of the Post to search for
     * @returns {Promise<any>}
     */
    getPostById(postId) {
        return new Promise((resolve, reject) => {
            // Encode the ID so unexpected input cannot alter the URL path
            const SEARCH_API_RESOURCE = `${encodeURIComponent(postId)}`;
            const SEARCH_API_URL = `http://${SEARCH_API_HOSTNAME}:${SEARCH_API_PORT}/${SEARCH_API_ENDPOINT}/${SEARCH_API_RESOURCE}`;
            logger.info(`getPostById API URL: ${SEARCH_API_URL}`);
            let options = {
                uri: SEARCH_API_URL,
                json: true
            };
            rpn(options)
                .then(function (post) {
                    post = post.ElasticsearchPosts;
                    logger.info(`getPostById Post: ${JSON.stringify(post)}`);
                    resolve(post);
                })
                .catch(function (err) {
                    logger.error(`Error: ${err}`);
                    reject(err);
                });
        });
    }
    // truncated for brevity
};

Cloud Function Deployment

To deploy the Cloud Function to GCP, use the gcloud CLI with the beta version of the functions deploy command. According to Google, gcloud is a part of the Google Cloud SDK. You must download and install the SDK on your system and initialize it before you can use gcloud. Currently, Cloud Functions are only available in four regions. I have included a shell script, deploy-cloud-function.sh, to make this step easier. It is called using the npm run deploy command (gist).

#!/usr/bin/env sh
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
set -ex
# Set constants
REGION="us-east1"
FUNCTION_NAME="<your_function_name>"
# Deploy the Google Cloud Function
gcloud beta functions deploy "${FUNCTION_NAME}" \
    --runtime nodejs8 \
    --region "${REGION}" \
    --trigger-http \
    --memory 256MB \
    --env-vars-file .env.yaml

The creation or update of the Cloud Function can take up to two minutes. Note the output indicates the environment variables, contained in the .env.yaml file, have been deployed. The URL endpoint of the function and the function’s entry point are also both output.

wp-search-031.png

If you recall, the URL endpoint of the Cloud Function is required in the Dialogflow Fulfillment tab. The URL can be retrieved from the deployment output (shown above). The Cloud Function is now deployed and will be called by the Action when a user invokes the Action.

What is Deployed

The .gcloudignore file is created the first time you deploy a new function. Using the .gcloudignore file, you limit the files deployed to GCP. For this post, of all the files in the project, only four files, index.js, helper.js, package.json, and the PNG file used in the Action’s responses, need to be deployed. All other project files are listed in the .gcloudignore file to keep them from being deployed.

wp-search-038.png

Simulation Testing and Debugging

With our Action and all its dependencies deployed and configured, we can test the Action using the Simulation console on Actions on Google. According to Google, the Action Simulation console allows us to manually test our Action by simulating a variety of Google-enabled hardware devices and their settings.

Below, in the Simulation console, we see the successful display of our Programmatic Ponderings Search Action for Google Assistant containing the expected Simple Response, List, and Suggestion Chips response types, triggered by a user’s invocation of the Action.

wp-search-035

The simulated response indicates that the Google Cloud Function was called, and it responded successfully. That also indicates the Dialogflow-based Action successfully communicated with the Cloud Function, the Cloud Function successfully communicated with the Spring Boot service instances running on Google Kubernetes Engine, and finally, the Spring Boot services successfully communicated with Elasticsearch running on Google Compute Engine.

If we had issues with the testing, the Action Simulation console also contains tabs containing the request and response objects sent to and from the Cloud Function, the audio response, a debug console, any errors, and access to the logs.

Stackdriver Logging

In the log output below, from our Cloud Function, we see our Cloud Function’s activities. These activities include information log entries, which we explicitly defined in our Cloud Function using the winston and @google-cloud/logging-winston npm modules. According to Google, the author of the module, Stackdriver Logging for Winston provides an easy-to-use, higher-level layer (transport) for working with Stackdriver Logging, compatible with Winston. Developing an effective logging strategy is essential to maintaining and troubleshooting your code in Development, as well as Production.

wp-search-036

Conclusion

In this two-part post, we observed how the capabilities of a voice and text-based conversational interface, such as an Action for Google Assistant, may be enhanced through integration with a search and analytics engine, such as Elasticsearch. This post barely scratched the surface of what could be achieved with such an integration. Elasticsearch, as well as other leading Lucene-based search and analytics engines, such as Apache Solr, have tremendous capabilities, which are easily integrated with machine learning-based conversational interfaces, resulting in a more powerful and more intuitive end-user experience.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers, their clients, or Google.

Integrating Search Capabilities with Actions for Google Assistant, using GKE and Elasticsearch: Part 1

Voice and text-based conversational interfaces, such as chatbots, have recently seen tremendous growth in popularity. Much of this growth can be attributed to leading Cloud providers, such as Google, Amazon, and Microsoft, who now provide affordable, end-to-end development, machine learning-based training, and hosting platforms for conversational interfaces.

Cloud-based machine learning services greatly improve a conversational interface’s ability to interpret user intent with greater accuracy. However, returning relevant responses to user inquiries also requires that interfaces have access to rich informational datastores, and the ability to quickly and efficiently query and analyze that data.

In this two-part post, we will enhance the capabilities of a voice and text-based conversational interface by integrating it with a search and analytics engine. By interfacing an Action for Google Assistant conversational interface with Elasticsearch, we will improve the Action’s ability to provide relevant results to the end-user. Instead of querying a traditional database for static responses to user intent, our Action will access a Near Realtime (NRT) Elasticsearch index of searchable documents. The Action will leverage Elasticsearch’s advanced search and analytics capabilities to optimize and shape user responses, based on their intent.

Action Preview

Here is a brief YouTube video preview of the final Action for Google Assistant, integrated with Elasticsearch, running on an Apple iPhone.

Google Technologies

The high-level architecture of our search engine-enhanced Action for Google Assistant will look as follows.

Google Search Assistant Diagram GCP

Here is a brief overview of the key technologies we will incorporate into our architecture.

Actions on Google

According to Google, Actions on Google is the platform for developers to extend the Google Assistant. Actions on Google is a web-based platform that provides a streamlined user-experience to create, manage, and deploy Actions. We will use the Actions on Google platform to develop our Action in this post.

Dialogflow

According to Google, Dialogflow is an enterprise-grade NLU platform that makes it easy for developers to design and integrate conversational user interfaces into mobile apps, web applications, devices, and bots. Dialogflow is powered by Google’s machine learning for Natural Language Processing (NLP).

Google Cloud Functions

Google Cloud Functions are part of Google’s event-driven, serverless compute platform, part of the Google Cloud Platform (GCP). Google Cloud Functions are analogous to Amazon’s AWS Lambda and Azure Functions. Features include automatic scaling, high availability, fault tolerance, no servers to provision, manage, patch or update, and a payment model based on the function’s execution time.

Google Kubernetes Engine

Kubernetes Engine is a managed, production-ready environment, available on GCP, for deploying containerized applications. According to Google, Kubernetes Engine is a reliable, efficient, and secure way to run Kubernetes clusters in the Cloud.

Elasticsearch

Elasticsearch is a leading, distributed, RESTful search and analytics engine. Elasticsearch is a product of Elastic, the company behind the Elastic Stack, which includes Elasticsearch, Kibana, Beats, Logstash, X-Pack, and Elastic Cloud. Elasticsearch provides a distributed, multitenant-capable, full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is similar to Apache Solr in terms of features and functionality. Both Solr and Elasticsearch are based on Apache Lucene.

Other Technologies

In addition to the major technologies highlighted above, the project also relies on the following:

  • Google Container Registry – As an alternative to Docker Hub, we will store the Spring Boot API service’s Docker Image in Google Container Registry, making deployment to GKE a breeze.
  • Google Cloud Deployment Manager – Google Cloud Deployment Manager allows users to specify all the resources needed for an application in a declarative format using YAML. The Elastic Stack will be deployed with Deployment Manager.
  • Google Compute Engine – Google Compute Engine delivers scalable, high-performance virtual machines (VMs) running in Google’s data centers, on their worldwide fiber network.
  • Google Stackdriver – Stackdriver aggregates metrics, logs, and events from our Cloud-based project infrastructure, for troubleshooting. We are also integrating Stackdriver Logging for Winston into our Cloud Function for fast application feedback.
  • Google Cloud DNS – Hosts the primary project domain and subdomains for the search engine and API. Google Cloud DNS is a scalable, reliable and managed authoritative Domain Name System (DNS) service running on the same infrastructure as Google.
  • Google VPC Network Firewall – Firewall rules provide fine-grain, secure access controls to our API and search engine. We will need several firewall port openings to talk to the Elastic Stack.
  • Spring Boot – Pivotal’s Spring Boot project makes it easy to create stand-alone, production-grade Spring-based Java applications, such as our Spring Boot service.
  • Spring Data Elasticsearch – Pivotal Software’s Spring Data Elasticsearch project provides easy integration to Elasticsearch from our Java-based Spring Boot service.

Demonstration

To demonstrate an Action for Google Assistant with search engine integration, we need an index of content to search. In this post, we will build an informational Action, the Programmatic Ponderings Search Action, that responds to a user’s interests in certain technical topics, by returning post suggestions from the Programmatic Ponderings blog. For this demonstration, I have indexed the last two years’ worth of blog posts into Elasticsearch, using the ElasticPress WordPress plugin.

Source Code

All open-sourced code for this post can be found on GitHub in two repositories, one for the Spring Boot Service and one for the Action for Google Assistant. Code samples in this post are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Development Process

This post will focus on the development and integration of the Action for Google Assistant with Elasticsearch, via a Google Cloud Function, Kubernetes Engine, and the Spring Boot API service. The post is not intended to be a general how-to on developing for Actions for Google Assistant, Google Cloud Platform, Elasticsearch, or WordPress.

Building and integrating the Action will involve the following steps:

  • Design the Action’s conversation model;
  • Provision the Elastic Stack on Google Compute Engine using Deployment Manager;
  • Create an Elasticsearch index of blog posts;
  • Provision the Kubernetes cluster on GCP with GKE;
  • Develop and deploy the Spring Boot API service to Kubernetes;

Covered in Part Two of the Post:

  • Create a new Actions project using the Actions on Google console;
  • Develop the Action’s Intents using the Dialogflow console;
  • Develop, deploy, and test the Cloud Function to GCP;

Let’s explore each step in more detail.

Conversational Model

The conversational model design of the Programmatic Ponderings Search Action for Google Assistant will have the option to invoke the Action in two ways, with or without intent. Below on the left, we see an example of an invocation of the Action – ‘Talk to Programmatic Ponderings’. Google Assistant then prompts the user for more information (intent) – ‘What topic are you interested in reading about?’.

sample-dialog-1.png

Below on the left, we see an invocation of the Action, which includes the intent – ‘Ask Programmatic Ponderings to find a post about Kubernetes’. Google Assistant will respond directly, both verbally and visually with the most relevant post.

sample-dialog-2

When a user requests a single result, for example, ‘Find a post about Docker’, Google Assistant will include Simple Response, Basic Card, and Suggestion Chip response types for devices with a display. This is shown in the center, above. The user may continue to ask for additional facts or choose to cancel the Action at any time.

When a user requests multiple results, for example, ‘I’m interested in Docker’, Google Assistant will include Simple Response, List, and Suggestion Chip response types for devices with a display. An example of a List Response is shown in the center of the previous set of screengrabs, above. The user will receive up to six results in the list, with a relevance score of 1.0 or greater. The user may choose to click on any of the post results in the list, which will initiate a new search using the post’s unique ID, as shown on the right, in the first set of screengrabs, above.
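
The list behavior described above (at most six results, each with a relevance score of 1.0 or greater) can be sketched in plain Java. This is only an illustration of the rule, not code from the Action or the Spring Boot service, where the limit and minimum score are passed to Elasticsearch as request parameters. The ResultListSketch class, its Hit holder, and the topResults method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: these names are illustrative and do not come from the post's repositories.
final class ResultListSketch {

    // Minimal stand-in for a search hit: a post ID and its relevance score.
    static final class Hit {
        final long id;
        final float score;

        Hit(long id, float score) {
            this.id = id;
            this.score = score;
        }
    }

    // Keeps hits scoring at least minScore, orders them by descending score,
    // and returns at most maxResults of them.
    static List<Hit> topResults(List<Hit> hits, float minScore, int maxResults) {
        List<Hit> kept = new ArrayList<>();
        for (Hit hit : hits) {
            if (hit.score >= minScore) {
                kept.add(hit);
            }
        }
        Collections.sort(kept, new Comparator<Hit>() {
            @Override
            public int compare(Hit a, Hit b) {
                return Float.compare(b.score, a.score);
            }
        });
        return new ArrayList<>(kept.subList(0, Math.min(maxResults, kept.size())));
    }

    public static void main(String[] args) {
        List<Hit> hits = new ArrayList<>();
        hits.add(new Hit(1L, 0.4f)); // below the minimum score, dropped
        hits.add(new Hit(2L, 3.2f));
        hits.add(new Hit(3L, 1.0f)); // exactly at the minimum score, kept
        for (Hit hit : topResults(hits, 1.0f, 6)) {
            System.out.println(hit.id + " " + hit.score);
        }
    }
}
```

Raising the minimum score or lowering the limit shortens the list the user sees; neither changes how the individual posts are scored.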

The conversational model also understands a request for help and to cancel the interaction.

GCP Account and Project

The following steps assume you have an existing GCP account and you have created a project on GCP to house the Cloud Function, GKE Cluster, and Elastic Stack on Google Compute Engine. The post also assumes that you have the latest Google Cloud SDK installed on your development machine, and have authenticated your identity from the command line (gist).

# Authenticate with the Google Cloud SDK
export PROJECT_ID="<your_project_id>"
gcloud beta auth login
gcloud config set project ${PROJECT_ID}
# Update components or new runtime nodejs8 may be unknown
gcloud components update

Elasticsearch on GCP

There are a number of options available to host Elasticsearch. Elastic, the company behind Elasticsearch, offers the Elasticsearch Service, a fully managed, scalable, and reliable service on AWS and GCP. AWS also offers their own managed Elasticsearch Service. I found some limitations with AWS’ Elasticsearch Service, which made integration with Spring Data Elasticsearch difficult. According to AWS, the service supports HTTP but does not support TCP transport.

For this post, we will stand up the Elastic Stack on GCP using an offering from the Google Cloud Platform Marketplace. A well-known provider of packaged applications for multiple Cloud platforms, Bitnami, offers the ELK Stack (the previous name for the Elastic Stack), running on Google Compute Engine.

wp-search-004.png

GCP Marketplace Solutions are deployed using the Google Cloud Deployment Manager. The Bitnami ELK solution is a complete stack with all the necessary software and software-defined Cloud infrastructure to securely run Elasticsearch. You select the instance’s zone(s), machine type, boot disk size, and security and networking configurations. Using that configuration, the Deployment Manager will deploy the solution and provide you with information and credentials for accessing the Elastic Stack. For this demo, we will configure a minimally-sized, single VM instance to run the Elastic Stack.

wp-search-005.png

Below we see the Bitnami ELK stack’s components being created on GCP, by the Deployment Manager.

wp-search-006.png

Indexed Content

With the Elastic Stack fully provisioned, I then configured WordPress to index the last two years of the Programmatic Ponderings blog posts to Elasticsearch on GCP. If you want to follow along with this post and need content to index, there is plenty of open source and public domain indexable content available on the Internet – books, movie lists, government and weather data, online catalogs of products, and so forth. Anything in a document database is directly indexable in Elasticsearch. Elastic even provides a set of index samples, available on their GitHub site.

wp-search-009

Firewall Ports for Elasticsearch

The Deployment Manager opens up firewall ports 80 and 443. To index the WordPress posts, I also had to open port 9200. According to Elastic, Elasticsearch uses port 9200 for communicating with their RESTful API with JSON over HTTP. For security, I locked down this firewall opening to my WordPress server’s address as the source (gist).

SOURCE_IP="<wordpress_ip_address>"
PORT=9200
gcloud compute \
--project=wp-search-bot \
firewall-rules create elk-1-tcp-${PORT} \
--description=elk-1-tcp-${PORT} \
--direction=INGRESS \
--priority=1000 \
--network=default \
--action=ALLOW \
--rules=tcp:${PORT} \
--source-ranges=${SOURCE_IP} \
--target-tags=elk-1-tcp-${PORT}

The two existing firewall rules for ports 80 and 443 should also be locked down to your own IP address as the source. Common Elasticsearch ports are constantly scanned by hackers, who will quickly hijack your Elasticsearch contents and hold them for ransom, in addition to deleting your indexes. Similar tactics are used on well-known and unprotected ports for many platforms, including Redis, MySQL, PostgreSQL, MongoDB, and Microsoft SQL Server.
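
To illustrate what locking a rule down to a source address means in practice, here is a minimal plain-Java sketch of IPv4 source-range matching. It is not GCP’s implementation. The SourceRangeSketch class and its methods are hypothetical, the addresses are documentation-only example ranges, and treating a bare address as a /32 range is an assumption of the sketch.

```java
// Hypothetical sketch of IPv4 source-range matching; not how GCP evaluates firewall rules internally.
final class SourceRangeSketch {

    // Converts dotted-quad IPv4 text such as "203.0.113.7" to a 32-bit value held in a long.
    static long toBits(String ipv4) {
        String[] parts = ipv4.trim().split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Not an IPv4 address: " + ipv4);
        }
        long bits = 0L;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("Octet out of range: " + part);
            }
            bits = (bits << 8) | octet;
        }
        return bits;
    }

    // Returns true when the address falls inside the range, written either as a
    // bare address ("203.0.113.7") or in CIDR notation ("203.0.113.0/24").
    static boolean inRange(String address, String range) {
        String[] pieces = range.split("/");
        int prefix = pieces.length == 2 ? Integer.parseInt(pieces[1]) : 32; // assumption: bare address means /32
        if (prefix < 0 || prefix > 32) {
            throw new IllegalArgumentException("Prefix out of range: " + prefix);
        }
        // Build a mask with the top 'prefix' bits set; a /0 prefix yields an all-zero mask.
        long mask = (0xFFFFFFFFL << (32 - prefix)) & 0xFFFFFFFFL;
        return (toBits(address) & mask) == (toBits(pieces[0]) & mask);
    }

    public static void main(String[] args) {
        System.out.println(inRange("203.0.113.7", "203.0.113.7/32")); // the single allowed host
        System.out.println(inRange("203.0.113.8", "203.0.113.7/32")); // a neighbouring host
        System.out.println(inRange("198.51.100.1", "0.0.0.0/0"));     // a wide-open rule admits any address
    }
}
```

A single-host range admits only the WordPress server, whereas a range of 0.0.0.0/0 matches every address, which is exactly the exposure the paragraph above warns about.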

Kibana

Once the posts are indexed, the best way to view the resulting Elasticsearch documents is through Kibana, which is included as part of the Bitnami solution. Below we see approximately thirty posts, spread out across two years.

wp-search-010.png

Each Elasticsearch document, representing an indexed WordPress blog post, contains over 125 fields of information. Fields include a unique post ID, post title, content, publish date, excerpt, author, URL, and so forth. All these fields are exposed through Elasticsearch’s API and, as we will see, will be available to our Spring Boot service to query.

wp-search-011.png

Spring Boot Service

To ensure decoupling between the Action for Google Assistant and Elasticsearch, we will expose a RESTful search API, written in Java using Spring Boot and Spring Data Elasticsearch. The API will expose a tailored set of flexible endpoints to the Action. Google’s machine learning services will ensure our conversational model is trained to understand user intent. The API’s query algorithm and Elasticsearch’s rich Lucene-based search features will ensure the most relevant results are returned. We will host the Spring Boot service on Google Kubernetes Engine (GKE).

We will use a Spring Rest Controller to expose our RESTful web service’s resources to our Action’s Cloud Function. The current Spring Boot service contains five /elastic resource endpoints exposed by the ElasticsearchPostController class. Of those five, two endpoints will be called by our Action in this demo, the /{id} and the /dismax-search endpoints. The endpoints can be seen using the Swagger UI. Our Spring Boot service implements SpringFox, which has the option to expose the Swagger interactive API UI.

wp-search-017.png

The /{id} endpoint accepts a unique post ID as a path variable in the API call and returns a single ElasticsearchPost object wrapped in a Map object, and serialized to a JSON payload (gist).

@RequestMapping(value = "/{id}")
@ApiOperation(value = "Returns a post by id")
public Map<String, Optional<ElasticsearchPost>> findById(@PathVariable("id") long id) {
    Optional<ElasticsearchPost> elasticsearchPost = elasticsearchPostRepository.findById(id);
    Map<String, Optional<ElasticsearchPost>> elasticsearchPostMap = new HashMap<>();
    elasticsearchPostMap.put("ElasticsearchPosts", elasticsearchPost);
    return elasticsearchPostMap;
}

Below we see an example response from the Spring Boot service to an API call to the /{id} endpoint, for post ID 22141. Since we are returning a single post, based on ID, the relevance score will always be 0.0 (gist).

# http http://api.chatbotzlabs.com/blog/api/v1/elastic/22141
HTTP/1.1 200
Content-Type: application/json;charset=UTF-8
Date: Mon, 17 Sep 2018 23:15:01 GMT
Transfer-Encoding: chunked

{
    "ElasticsearchPosts": {
        "ID": 22141,
        "_score": 0.0,
        "guid": "https://programmaticponderings.com/?p=22141",
        "post_date": "2018-04-13 12:45:19",
        "post_excerpt": "Learn to manage distributed applications, spanning multiple Kubernetes environments, using Istio on GKE.",
        "post_title": "Managing Applications Across Multiple Kubernetes Environments with Istio: Part 1"
    }
}

This controller’s /{id} endpoint relies on a method exposed by the ElasticsearchPostRepository interface. The ElasticsearchPostRepository is a Spring Data Repository, which extends ElasticsearchRepository. The repository exposes the findById() method, which returns a single instance of the type, ElasticsearchPost, from Elasticsearch (gist).

package com.example.elasticsearch.repository;
import com.example.elasticsearch.model.ElasticsearchPost;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
public interface ElasticsearchPostRepository extends ElasticsearchRepository<ElasticsearchPost, Long> {
}

The ElasticsearchPost class is annotated as an Elasticsearch Document, similar to other Spring Data Document annotations, such as Spring Data MongoDB. The ElasticsearchPost class is instantiated to hold the deserialized JSON documents stored in Elasticsearch (gist).

package com.example.elasticsearch.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;

import java.io.Serializable;

@JsonIgnoreProperties(ignoreUnknown = true)
@Document(indexName = "<elasticsearch_index_name>", type = "post")
public class ElasticsearchPost implements Serializable {

    @Id
    @JsonProperty("ID")
    private long id;

    @JsonProperty("_score")
    private float score;

    @JsonProperty("post_title")
    private String title;

    @JsonProperty("post_date")
    private String publishDate;

    @JsonProperty("post_excerpt")
    private String excerpt;

    @JsonProperty("guid")
    private String url;

    // Setters removed for brevity...
}

Dis Max Query

The second API endpoint called by our Action is the /dismax-search endpoint. We use this endpoint to search for a particular post topic, such as ‘Docker’. This type of search, as opposed to the Spring Data Repository method used by the /{id} endpoint, requires the use of an ElasticsearchTemplate. The ElasticsearchTemplate allows us to form more complex Elasticsearch queries than is possible using an ElasticsearchRepository class. Below, the /dismax-search endpoint accepts four input request parameters in the API call, which are the topic to search for, the starting point and size of the response to return, and the minimum relevance score (gist).

@RequestMapping(value = "/dismax-search")
@ApiOperation(value = "Performs dismax search and returns a list of posts containing the value input")
public Map<String, List<ElasticsearchPost>> dismaxSearch(@RequestParam("value") String value,
                                                         @RequestParam("start") int start,
                                                         @RequestParam("size") int size,
                                                         @RequestParam("minScore") float minScore) {
    List<ElasticsearchPost> elasticsearchPosts = elasticsearchService.dismaxSearch(value, start, size, minScore);
    Map<String, List<ElasticsearchPost>> elasticsearchPostMap = new HashMap<>();
    elasticsearchPostMap.put("ElasticsearchPosts", elasticsearchPosts);
    return elasticsearchPostMap;
}
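
As a minimal sketch of how a caller might assemble a request to this endpoint from its four parameters, the plain-Java helper below builds the query string and URL-encodes the search topic. The DismaxSearchUrlSketch class and buildUrl method are hypothetical. The /blog/api/v1/elastic path is taken from the sample calls in this post, while the localhost:8080 host in the example is only an assumption for local testing.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper: not part of the Spring Boot service or the Action's Cloud Function.
final class DismaxSearchUrlSketch {

    // Builds the /dismax-search URL, encoding the topic so spaces and symbols survive the query string.
    static String buildUrl(String baseUrl, String value, int start, int size, float minScore) {
        try {
            return baseUrl + "/dismax-search"
                    + "?value=" + URLEncoder.encode(value, "UTF-8")
                    + "&start=" + start
                    + "&size=" + size
                    + "&minScore=" + minScore;
        } catch (UnsupportedEncodingException e) {
            // UTF-8 support is required of every Java platform, so this should not happen.
            throw new IllegalStateException("UTF-8 is not available", e);
        }
    }

    public static void main(String[] args) {
        // Assumed base URL for a locally running instance of the service.
        String baseUrl = "http://localhost:8080/blog/api/v1/elastic";
        System.out.println(buildUrl(baseUrl, "Istio", 0, 3, 2.0f));
        System.out.println(buildUrl(baseUrl, "Spring Boot", 0, 6, 1.0f));
    }
}
```

Encoding the topic matters once a user asks about a multi-word subject, since an unencoded space would produce an invalid URL.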

The logic to create and execute the Elasticsearch query is handled by the ElasticsearchService class. The ElasticsearchPostController calls the ElasticsearchService. The ElasticsearchService handles querying Elasticsearch and returning a list of ElasticsearchPost objects to the ElasticsearchPostController. The dismaxSearch method, called by the /dismax-search endpoint’s method, obtains a client from the ElasticsearchTemplate instance and uses it to build the search request sent to Elasticsearch (gist).

public List<ElasticsearchPost> dismaxSearch(String value, int start, int size, float minScore) {
    QueryBuilder queryBuilder = getQueryBuilder(value);
    Client client = elasticsearchTemplate.getClient();

    SearchResponse response = client.prepareSearch()
            .setQuery(queryBuilder)
            .setSize(size)
            .setFrom(start)
            .setMinScore(minScore)
            .addSort("_score", SortOrder.DESC)
            .setExplain(true)
            .execute()
            .actionGet();

    List<SearchHit> searchHits = Arrays.asList(response.getHits().getHits());
    ObjectMapper mapper = new ObjectMapper();
    List<ElasticsearchPost> elasticsearchPosts = new ArrayList<>();

    searchHits.forEach(hit -> {
        try {
            elasticsearchPosts.add(mapper.readValue(hit.getSourceAsString(), ElasticsearchPost.class));
            elasticsearchPosts.get(elasticsearchPosts.size() - 1).setScore(hit.getScore());
        } catch (IOException e) {
            e.printStackTrace();
        }
    });

    return elasticsearchPosts;
}

To obtain the most relevant search results, we will use Elasticsearch’s Dis Max Query combined with the Match Phrase Query. Elastic describes the Dis Max Query as:

‘a query that generates the union of documents produced by its subqueries, and that scores each document with the maximum score for that document as produced by any subquery, plus a tie breaking increment for any additional matching subqueries.’

In short, the Dis Max Query allows us to query and weight (boost importance) multiple indexed fields, across all documents. The Match Phrase Query analyzes the text (our topic) and creates a phrase query out of the analyzed text.
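
To make the quoted definition concrete, here is a minimal plain-Java sketch of how a Dis Max score can be combined from per-field subquery scores: the best score, plus the tie breaker multiplied by the sum of the remaining scores. The DisMaxScoreSketch class and combine method are hypothetical names, the input scores are made-up values, and this is an illustration of the definition rather than Elasticsearch’s or Lucene’s own code.

```java
// Hypothetical sketch of the Dis Max combination rule; the scores used here are illustrative only.
final class DisMaxScoreSketch {

    // Returns the highest subquery score plus tieBreaker times the sum of all other subquery scores.
    static double combine(double[] subqueryScores, double tieBreaker) {
        double best = 0.0;
        double sum = 0.0;
        for (double score : subqueryScores) {
            best = Math.max(best, score);
            sum += score;
        }
        return best + tieBreaker * (sum - best);
    }

    public static void main(String[] args) {
        double[] fieldScores = {3.0, 2.0, 1.0}; // e.g. title, tags, and content matches

        // With a tie breaker of 0.0, only the best-matching field counts.
        System.out.println(combine(fieldScores, 0.0));

        // A non-zero tie breaker lets the other matching fields add to the score.
        System.out.println(combine(fieldScores, 0.5));
    }
}
```

With a tie breaker of 0.0, a document simply takes its best field score, so the boosts decide which field wins. A value between 0.0 and 1.0 would also reward documents that match in several fields.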

After some experimentation, I found the most relevant search results were returned by applying greater weighting (boost) to the post’s title and excerpt, followed by the post’s tags and categories, and finally, the actual text of the post. I also limited results to a minimum score of 1.0. Just because a word or phrase is repeated in a post doesn’t mean it is indicative of the post’s subject matter. Setting a minimum score helps ensure the requested topic is featured more prominently in the resulting post or posts. Increasing the minimum score will decrease the number of search results, but theoretically, increase their relevance (gist).

private QueryBuilder getQueryBuilder(String value) {
    value = value.toLowerCase();
    return QueryBuilders.disMaxQuery()
            .add(matchPhraseQuery("post_title", value).boost(3))
            .add(matchPhraseQuery("post_excerpt", value).boost(3))
            .add(matchPhraseQuery("terms.post_tag.name", value).boost(2))
            .add(matchPhraseQuery("terms.category.name", value).boost(2))
            .add(matchPhraseQuery("post_content", value).boost(1));
}

Below we see the results of a /dismax-search API call to our service, querying for posts about the topic, ‘Istio’, with a minimum score of 2.0. The search resulted in a serialized JSON payload containing three ElasticsearchPost objects (gist).

http http://api.chatbotzlabs.com/blog/api/v1/elastic/dismax-search?minScore=2&size=3&start=0&value=Istio
HTTP/1.1 200
Content-Type: application/json;charset=UTF-8
Date: Tue, 18 Sep 2018 03:50:35 GMT
Transfer-Encoding: chunked

{
    "ElasticsearchPosts": [
        {
            "ID": 21867,
            "_score": 5.91989,
            "guid": "https://programmaticponderings.com/?p=21867",
            "post_date": "2017-12-22 16:04:17",
            "post_excerpt": "Learn to deploy and configure Istio on Google Kubernetes Engine (GKE).",
            "post_title": "Deploying and Configuring Istio on Google Kubernetes Engine (GKE)"
        },
        {
            "ID": 22313,
            "_score": 3.6616292,
            "guid": "https://programmaticponderings.com/?p=22313",
            "post_date": "2018-04-17 07:01:38",
            "post_excerpt": "Learn to manage distributed applications, spanning multiple Kubernetes environments, using Istio on GKE.",
            "post_title": "Managing Applications Across Multiple Kubernetes Environments with Istio: Part 2"
        },
        {
            "ID": 22141,
            "_score": 3.6616292,
            "guid": "https://programmaticponderings.com/?p=22141",
            "post_date": "2018-04-13 12:45:19",
            "post_excerpt": "Learn to manage distributed applications, spanning multiple Kubernetes environments, using Istio on GKE.",
            "post_title": "Managing Applications Across Multiple Kubernetes Environments with Istio: Part 1"
        }
    ]
}

Understanding Relevance Scoring

When returning search results, such as in the example above, the top result is the one with the highest score. The highest score should denote the most relevant result to the search query. According to Elastic, in their document titled, The Theory Behind Relevance Scoring, scoring is explained this way:

‘Lucene (and thus Elasticsearch) uses the Boolean model to find matching documents, and a formula called the practical scoring function to calculate relevance. This formula borrows concepts from term frequency/inverse document frequency and the vector space model but adds more-modern features like a coordination factor, field length normalization, and term or query clause boosting.’

In order to better understand this technical explanation of relevance scoring, it is much easier to see it applied to our example. Note the first search result above, Post ID 21867, has the highest score, 5.91989. Knowing that we are searching five fields (title, excerpt, tags, categories, and content), and boosting certain fields more than others, how was this score determined? Conveniently, Elasticsearch’s SearchRequestBuilder class exposes the setExplain method. We can see the call to it in the dismaxSearch method, shown above. By passing a boolean value of true to the setExplain method, we are able to see the detailed scoring algorithms used by Elasticsearch for the top result, shown above (gist).

5.9198895 = max of:
  5.8995476 = weight(post_title:istio in 3) [PerFieldSimilarity], result of:
    5.8995476 = score(doc=3,freq=1.0 = termFreq=1.0), product of:
      3.0 = boost
      1.6739764 = idf, computed as log(1 + (docCount - docFreq + 0.5) / (docFreq + 0.5)) from:
        1.0 = docFreq
        7.0 = docCount
      1.1747572 = tfNorm, computed as (freq * (k1 + 1)) / (freq + k1 * (1 - b + b * fieldLength / avgFieldLength)) from:
        1.0 = termFreq=1.0
        1.2 = parameter k1
        0.75 = parameter b
        11.0 = avgFieldLength
        7.0 = fieldLength
  5.9198895 = weight(post_excerpt:istio in 3) [PerFieldSimilarity], result of:
    5.9198895 = score(doc=3,freq=1.0 = termFreq=1.0), product of:
      3.0 = boost
      1.6739764 = idf, computed as log(1 + (docCount - docFreq + 0.5) / (docFreq + 0.5)) from:
        1.0 = docFreq
        7.0 = docCount
      1.1788079 = tfNorm, computed as (freq * (k1 + 1)) / (freq + k1 * (1 - b + b * fieldLength / avgFieldLength)) from:
        1.0 = termFreq=1.0
        1.2 = parameter k1
        0.75 = parameter b
        12.714286 = avgFieldLength
        8.0 = fieldLength
  3.3479528 = weight(terms.post_tag.name:istio in 3) [PerFieldSimilarity], result of:
    3.3479528 = score(doc=3,freq=1.0 = termFreq=1.0), product of:
      2.0 = boost
      1.6739764 = idf, computed as log(1 + (docCount - docFreq + 0.5) / (docFreq + 0.5)) from:
        1.0 = docFreq
        7.0 = docCount
      1.0 = tfNorm, computed as (freq * (k1 + 1)) / (freq + k1 * (1 - b + b * fieldLength / avgFieldLength)) from:
        1.0 = termFreq=1.0
        1.2 = parameter k1
        0.75 = parameter b
        16.0 = avgFieldLength
        16.0 = fieldLength
  2.52272 = weight(post_content:istio in 3) [PerFieldSimilarity], result of:
    2.52272 = score(doc=3,freq=100.0 = termFreq=100.0), product of:
      1.1631508 = idf, computed as log(1 + (docCount - docFreq + 0.5) / (docFreq + 0.5)) from:
        2.0 = docFreq
        7.0 = docCount
      2.1688676 = tfNorm, computed as (freq * (k1 + 1)) / (freq + k1 * (1 - b + b * fieldLength / avgFieldLength)) from:
        100.0 = termFreq=100.0
        1.2 = parameter k1
        0.75 = parameter b
        2251.1428 = avgFieldLength
        2840.0 = fieldLength

What this detail shows us is that the term ‘Istio’ was located in four of the five fields searched (all except ‘categories’). Using the scoring formulas shown in the explanation, and taking into account our boost values, we see that the post’s ‘excerpt’ field achieved the highest score of 5.9198895 (boost of 3.0 * idf of 1.6739764 * tfNorm of 1.1788079). Because the Dis Max Query takes the maximum of its subquery scores, this field’s score becomes the score of the post.
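
The arithmetic can be checked by hand. The plain-Java sketch below re-implements the two formulas printed in the explain output (idf and tfNorm) and multiplies them by the boost, using the values listed for the post_excerpt and post_title fields. The Bm25ExplainSketch class and its method names are hypothetical; only the formulas and input numbers come from the output above.

```java
// Hypothetical re-computation of the per-field scores from the explain output above.
final class Bm25ExplainSketch {

    // idf = log(1 + (docCount - docFreq + 0.5) / (docFreq + 0.5)), using the natural logarithm
    static double idf(double docFreq, double docCount) {
        return Math.log(1.0 + (docCount - docFreq + 0.5) / (docFreq + 0.5));
    }

    // tfNorm = (freq * (k1 + 1)) / (freq + k1 * (1 - b + b * fieldLength / avgFieldLength))
    static double tfNorm(double freq, double k1, double b, double fieldLength, double avgFieldLength) {
        return (freq * (k1 + 1.0)) / (freq + k1 * (1.0 - b + b * fieldLength / avgFieldLength));
    }

    // The field score is the product of the boost, idf, and tfNorm.
    static double fieldScore(double boost, double idf, double tfNorm) {
        return boost * idf * tfNorm;
    }

    public static void main(String[] args) {
        double k1 = 1.2;
        double b = 0.75;

        // post_excerpt: docFreq=1, docCount=7, freq=1, fieldLength=8, avgFieldLength=12.714286, boost=3
        double excerpt = fieldScore(3.0, idf(1.0, 7.0), tfNorm(1.0, k1, b, 8.0, 12.714286));

        // post_title: docFreq=1, docCount=7, freq=1, fieldLength=7, avgFieldLength=11, boost=3
        double title = fieldScore(3.0, idf(1.0, 7.0), tfNorm(1.0, k1, b, 7.0, 11.0));

        System.out.println("post_excerpt score: " + excerpt);
        System.out.println("post_title score:   " + title);
    }
}
```

Both results agree with the explain output to within rounding. The excerpt narrowly beats the title because its field length is slightly shorter relative to the average for that field, which raises tfNorm.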

Being able to view the scoring explanation helps us tune our search results. For example, according to the details, the term ‘Istio’ appeared 100 times (termFreq=100.0) in the main body of the post (the ‘content’ field). We might ask ourselves if we are giving enough relevance to the content as opposed to other fields. We might choose to increase the boost on the ‘content’ field, or decrease the boosts on the other fields, to produce higher quality search results.

Google Kubernetes Engine

With the Elastic Stack running on Google Compute Engine, and the Spring Boot API service built, we can now provision a Kubernetes cluster to run our Spring Boot service. The service will sit between our Action’s Cloud Function and Elasticsearch. We will use Google Kubernetes Engine (GKE) to manage our Kubernetes cluster on GCP. A GKE cluster is a managed group of uniform VM instances for running Kubernetes. The VMs are managed by Google Compute Engine. Google Compute Engine delivers virtual machines running in Google’s data centers, on their worldwide fiber network.

A GKE cluster can be provisioned using GCP’s Cloud Console or using the Cloud SDK, Google’s command-line interface for Google Cloud Platform products and services. I prefer using the CLI, which helps enable DevOps automation through tools like Jenkins and Travis CI (gist).

GCP_PROJECT="wp-search-bot"
GKE_CLUSTER="wp-search-cluster"
GCP_ZONE="us-east1-b"
NODE_COUNT="1"
INSTANCE_TYPE="n1-standard-1"
GKE_VERSION="1.10.7-gke.1"
gcloud beta container \
--project ${GCP_PROJECT} clusters create ${GKE_CLUSTER} \
--zone ${GCP_ZONE} \
--username "admin" \
--cluster-version ${GKE_VERSION} \
--machine-type ${INSTANCE_TYPE} --image-type "COS" \
--disk-type "pd-standard" --disk-size "100" \
--scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" \
--num-nodes ${NODE_COUNT} \
--enable-cloud-logging --enable-cloud-monitoring \
--network "projects/wp-search-bot/global/networks/default" \
--subnetwork "projects/wp-search-bot/regions/us-east1/subnetworks/default" \
--additional-zones "us-east1-b","us-east1-c","us-east1-d" \
--addons HorizontalPodAutoscaling,HttpLoadBalancing \
--no-enable-autoupgrade --enable-autorepair

The command above is the one I used to provision a minimally sized three-node GKE cluster, replete with the latest available version of Kubernetes. Although a one-node cluster is sufficient for early-stage development, testing should be done on a multi-node cluster to ensure the service will operate properly with multiple instances running behind a load-balancer.

Below, we see the three n1-standard-1 instance type worker nodes, one in each of three different specific geographical locations, referred to as zones. The three zones are in the us-east1 region. Multiple instances spread across multiple zones provide single-region high-availability for our Spring Boot service. With GKE, the Master Node is fully managed by Google.

wp-search-015

Building Service Image

In order to deploy our Spring Boot service, we must first build a Docker Image and make that image available to our Kubernetes cluster. For lowest latency, I’ve chosen to build and publish the image to Google Container Registry, in addition to Docker Hub. The Spring Boot service’s Docker image is built on the latest Debian-based OpenJDK 10 Slim base image, available on Docker Hub. The Spring Boot JAR file is copied into the image (gist).

FROM openjdk:10.0.2-13-jdk-slim
LABEL maintainer="Gary A. Stafford <garystafford@rochester.rr.com>"
ENV REFRESHED_AT 2018-09-08
EXPOSE 8080
WORKDIR /tmp
COPY /build/libs/*.jar app.jar
CMD ["java", "-jar", "-Djava.security.egd=file:/dev/./urandom", "-Dspring.profiles.active=gcp", "app.jar"]

To automate the build and publish processes with tools such as Jenkins or Travis CI, we will use a simple shell script. The script builds the Spring Boot service using Gradle, then builds the Docker Image containing the Spring Boot JAR file, tags and publishes the Docker image to the image repository, and finally, redeploys the Spring Boot service container to GKE using kubectl (gist).

#!/usr/bin/env sh
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
IMAGE_REPOSITORY="<your_image_repo>"
IMAGE_NAME="<your_image_name>"
GCP_PROJECT="<your_project>"
TAG="<your_image_tag>"
# Build Spring Boot app
./gradlew clean build
# Build Docker image
docker build -f Docker/Dockerfile --no-cache -t ${IMAGE_REPOSITORY}/${IMAGE_NAME}:${TAG} .
# Push image to Docker Hub
docker push ${IMAGE_REPOSITORY}/${IMAGE_NAME}:${TAG}
# Push image to GCP Container Registry (GCR)
docker tag ${IMAGE_REPOSITORY}/${IMAGE_NAME}:${TAG} gcr.io/${GCP_PROJECT}/${IMAGE_NAME}:${TAG}
docker push gcr.io/${GCP_PROJECT}/${IMAGE_NAME}:${TAG}
# Re-deploy Workload (containerized app) to GKE
kubectl replace --force -f gke/${IMAGE_NAME}.yaml

Below we see the latest version of our Spring Boot Docker image published to the Google Container Registry.

wp-search-016

Deploying the Service

To deploy the Spring Boot service’s container to GKE, we will use a Kubernetes Deployment Controller. The Deployment Controller manages the Pods and ReplicaSets. As a deployment alternative, you could choose to use CoreOS’ Operator Framework to create an Operator or use Helm to create a Helm Chart. Along with the Deployment Controller, there is a ConfigMap and a Horizontal Pod Autoscaler. The ConfigMap contains environment variables that will be available to the Spring Boot service instances running in the Kubernetes Pods. Variables include the host and port of the Elasticsearch cluster on GCP and the name of the Elasticsearch index created by WordPress. These values will override any configuration values set in the service’s application.yml Java properties file.

The Deployment Controller creates a ReplicaSet with three Pods, running the Spring Boot service, one on each worker node (gist).

---
apiVersion: "v1"
kind: "ConfigMap"
metadata:
  name: "wp-es-demo-config"
  namespace: "dev"
  labels:
    app: "wp-es-demo"
data:
  cluster_nodes: "<your_elasticsearch_instance_tcp_host_and_port>"
  cluster_name: "elasticsearch"
---
apiVersion: "extensions/v1beta1"
kind: "Deployment"
metadata:
  name: "wp-es-demo"
  namespace: "dev"
  labels:
    app: "wp-es-demo"
spec:
  replicas: 3
  selector:
    matchLabels:
      app: "wp-es-demo"
  template:
    metadata:
      labels:
        app: "wp-es-demo"
    spec:
      containers:
        - name: "wp-es-demo"
          image: "gcr.io/wp-search-bot/wp-es-demo"
          imagePullPolicy: Always
          env:
            - name: "SPRING_DATA_ELASTICSEARCH_CLUSTER-NODES"
              valueFrom:
                configMapKeyRef:
                  key: "cluster_nodes"
                  name: "wp-es-demo-config"
            - name: "SPRING_DATA_ELASTICSEARCH_CLUSTER-NAME"
              valueFrom:
                configMapKeyRef:
                  key: "cluster_name"
                  name: "wp-es-demo-config"
---
apiVersion: "autoscaling/v1"
kind: "HorizontalPodAutoscaler"
metadata:
  name: "wp-es-demo-hpa"
  namespace: "dev"
  labels:
    app: "wp-es-demo"
spec:
  scaleTargetRef:
    kind: "Deployment"
    name: "wp-es-demo"
    apiVersion: "apps/v1beta1"
  minReplicas: 1
  maxReplicas: 3
  targetCPUUtilizationPercentage: 80
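
As a rough guide to how the Horizontal Pod Autoscaler in the manifest above would behave, the Kubernetes documentation describes the target replica count as the current count scaled by the ratio of observed to target utilization, rounded up, and kept within minReplicas and maxReplicas. The plain-Java sketch below applies that formula with the 80 percent CPU target and the bounds of 1 and 3 replicas from the manifest. The HpaSketch class and desiredReplicas method are hypothetical names, and the real controller’s tolerance band and stabilization behavior are deliberately left out.

```java
// Hypothetical sketch of the autoscaler's replica calculation; it omits tolerance and stabilization.
final class HpaSketch {

    // desired = ceil(currentReplicas * currentCpuPercent / targetCpuPercent), clamped to [minReplicas, maxReplicas]
    static int desiredReplicas(int currentReplicas, double currentCpuPercent, double targetCpuPercent,
                               int minReplicas, int maxReplicas) {
        int desired = (int) Math.ceil(currentReplicas * (currentCpuPercent / targetCpuPercent));
        return Math.max(minReplicas, Math.min(maxReplicas, desired));
    }

    public static void main(String[] args) {
        // One Pod running at twice the 80% target: scale out.
        System.out.println(desiredReplicas(1, 160.0, 80.0, 1, 3));

        // Three mostly idle Pods: scale in toward minReplicas.
        System.out.println(desiredReplicas(3, 20.0, 80.0, 1, 3));

        // Heavy load: the result is capped at maxReplicas.
        System.out.println(desiredReplicas(2, 200.0, 80.0, 1, 3));
    }
}
```

With a maximum of three replicas, the service can never grow beyond one Pod per worker node in the cluster provisioned earlier.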

To properly load-balance the three Spring Boot service Pods, we will also deploy a Kubernetes Service of the Kubernetes ServiceType, LoadBalancer. According to Kubernetes, a Kubernetes Service is an abstraction which defines a logical set of Pods and a policy by which to access them (gist).

---
apiVersion: "v1"
kind: "Service"
metadata:
  name: "wp-es-demo-service"
  namespace: "dev"
  labels:
    app: "wp-es-demo"
spec:
  ports:
    - protocol: "TCP"
      port: 80
      targetPort: 8080
  selector:
    app: "wp-es-demo"
  type: "LoadBalancer"
  loadBalancerIP: ""

Below, we see three instances of the Spring Boot service deployed to the GKE cluster on GCP. Each Pod, containing an instance of the Spring Boot service, is in a load-balanced pool, behind our service load balancer, and exposed on port 80.

wp-search-014

Testing the API

We can test our API and ensure it is talking to Elasticsearch, and returning expected results using the Swagger UI, shown previously, or tools like Postman, shown below.

wp-search-018.png

Communication Between GKE and Elasticsearch

Similar to port 9200, which needed to be opened for indexing content over HTTP, we also need to open firewall port 9300 between the Spring Boot service on GKE and Elasticsearch. According to Elastic, Elasticsearch Java clients talk to the Elasticsearch cluster over port 9300, using the native Elasticsearch transport protocol (TCP).

Google Search Assistant Diagram WordPress Index

Again, locking this port down to the GKE cluster as the source is critical for security (gist).

SOURCE_IP="<gke_cluster_public_ip_address>"
PORT=9300
gcloud compute \
--project=wp-search-bot \
firewall-rules create elk-1-tcp-${PORT} \
--description=elk-1-tcp-${PORT} \
--direction=INGRESS \
--priority=1000 \
--network=default \
--action=ALLOW \
--rules=tcp:${PORT} \
--source-ranges=${SOURCE_IP} \
--target-tags=elk-1-tcp-${PORT}

Part Two

In Part One, we examined the creation of the Elastic Stack, the provisioning of the GKE cluster, and the development and deployment of the Spring Boot service to Kubernetes. In Part Two of this post, we will tie everything together by creating and integrating our Action for Google Assistant:

  • Create the new Actions project using the Actions on Google console;
  • Develop the Action’s Intents using the Dialogflow console;
  • Develop, deploy, and test the Cloud Function on GCP.

Google Search Assistant Diagram part 2b.png

Related Posts

If you’re interested in comparing the development of an Action for Google Assistant with that of Amazon’s Alexa and Microsoft’s LUIS-enabled chatbots, in addition to this post, I would recommend the previous three posts in this conversation interface series:

The demonstrations in all three articles leverage their respective Cloud platform’s machine learning-based natural language understanding (NLU) services. All three take advantage of their respective Cloud platform’s NoSQL database and object storage services. Lastly, all three demonstrations are written in a common language, Node.js.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers, their clients, or Google.


Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 2

Given a modern distributed system, composed of multiple microservices, each possessing a sub-set of the domain’s aggregate data they need to perform their functions autonomously, we will almost assuredly have some duplication of data. Given this duplication, how do we maintain data consistency? In this two-part post, we’ve been exploring one possible solution to this challenge, using Apache Kafka and the model of eventual consistency. In Part One, we examined the online storefront domain, the storefront’s microservices, and the system’s state change event message flows.

Part Two

In Part Two of this post, I will briefly cover how to deploy and run a local development version of the storefront components, using Docker. The storefront’s microservices will be exposed through an API Gateway, Netflix’s Zuul. Service discovery and load balancing will be handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. To provide operational visibility, we will add Yahoo’s Kafka Manager and Mongo Express to our system.

Kafka-Eventual-Cons-Swarm

Source code for deploying the Dockerized components of the online storefront, shown in this post, is available on GitHub. All Docker Images are available on Docker Hub. I have chosen the wurstmeister/kafka-docker version of Kafka, available on Docker Hub; it has 580+ stars and 10M+ pulls on Docker Hub. This version of Kafka works well, as long as you run it within a Docker Swarm, locally.

Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Deployment Options

For simplicity, I’ve used Docker’s native Docker Swarm Mode to support the deployed online storefront. Docker requires minimal configuration as opposed to other CaaS platforms. Usually, I would recommend Minikube for local development if the final destination of the storefront were Kubernetes in Production (AKS, EKS, or GKE). Alternatively, if the final destination of the storefront were Red Hat OpenShift in Production, I would recommend Minishift for local development.

Docker Deployment

We will break up our deployment into two parts. First, we will deploy everything except our services. We will allow Kafka, MongoDB, Eureka, and the other components to start up fully. Afterward, we will deploy the three online storefront services. The storefront-kafka-docker project on GitHub contains two Docker Compose files, one for each of the two tasks.

The middleware Docker Compose file (gist).

version: '3.2'
services:
  zuul:
    image: garystafford/storefront-zuul:latest
    expose:
      - "8080"
    ports:
      - "8080:8080/tcp"
    depends_on:
      - kafka
      - mongo
      - eureka
    hostname: zuul
    environment:
      # LOGGING_LEVEL_ROOT: DEBUG
      RIBBON_READTIMEOUT: 3000
      RIBBON_SOCKETTIMEOUT: 3000
      ZUUL_HOST_CONNECT_TIMEOUT_MILLIS: 3000
      ZUUL_HOST_CONNECT_SOCKET_MILLIS: 3000
    networks:
      - kafka-net
  eureka:
    image: garystafford/storefront-eureka:latest
    expose:
      - "8761"
    ports:
      - "8761:8761/tcp"
    hostname: eureka
    networks:
      - kafka-net
  mongo:
    image: mongo:latest
    command: --smallfiles
    # expose:
    #   - "27017"
    ports:
      - "27017:27017/tcp"
    hostname: mongo
    networks:
      - kafka-net
  mongo_express:
    image: mongo-express:latest
    expose:
      - "8081"
    ports:
      - "8081:8081/tcp"
    hostname: mongo_express
    networks:
      - kafka-net
  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181/tcp"
    hostname: zookeeper
    networks:
      - kafka-net
  kafka:
    image: wurstmeister/kafka:latest
    depends_on:
      - zookeeper
    # expose:
    #   - "9092"
    ports:
      - "9092:9092/tcp"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_CREATE_TOPICS: "accounts.customer.change:1:1,fulfillment.order.change:1:1,orders.order.fulfill:1:1"
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_DELETE_TOPIC_ENABLE: "true"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    hostname: kafka
    networks:
      - kafka-net
  kafka_manager:
    image: hlebalbau/kafka-manager:latest
    ports:
      - "9000:9000/tcp"
    expose:
      - "9000"
    depends_on:
      - kafka
    environment:
      ZK_HOSTS: "zookeeper:2181"
      APPLICATION_SECRET: "random-secret"
    command: -Dpidfile.path=/dev/null
    hostname: kafka_manager
    networks:
      - kafka-net
networks:
  kafka-net:
    driver: overlay

The services Docker Compose file (gist).

version: '3.2'
services:
  accounts:
    image: garystafford/storefront-accounts:latest
    depends_on:
      - kafka
      - mongo
    hostname: accounts
    # environment:
    #   LOGGING_LEVEL_ROOT: DEBUG
    networks:
      - kafka-net
  orders:
    image: garystafford/storefront-orders:latest
    depends_on:
      - kafka
      - mongo
      - eureka
    hostname: orders
    # environment:
    #   LOGGING_LEVEL_ROOT: DEBUG
    networks:
      - kafka-net
  fulfillment:
    image: garystafford/storefront-fulfillment:latest
    depends_on:
      - kafka
      - mongo
      - eureka
    hostname: fulfillment
    # environment:
    #   LOGGING_LEVEL_ROOT: DEBUG
    networks:
      - kafka-net
networks:
  kafka-net:
    driver: overlay

In the storefront-kafka-docker project, there is a shell script, stack_deploy_local.sh. This script will execute both Docker Compose files, in succession, with a pause in between. You may need to adjust the timing for your own system (gist).

#!/bin/sh
# Deploys the storefront Docker stack
# usage: sh ./stack_deploy_local.sh
set -e
docker stack deploy -c docker-compose-middleware.yml storefront
echo "Starting the stack: middleware...pausing for 30 seconds..."
sleep 30
docker stack deploy -c docker-compose-services.yml storefront
echo "Starting the stack: services...pausing for 10 seconds..."
sleep 10
docker stack ls
docker stack services storefront
docker container ls
echo "Script completed..."
echo "Services may take up to several minutes to start, fully..."
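If the fixed pauses in the script prove too short or too long on your machine, one alternative is to wait until the middleware ports actually accept connections. The sketch below is an assumption about how that could be done in Python, not part of the storefront-kafka-docker project; the function name is hypothetical.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 1.0) -> bool:
    """Poll a TCP port until it accepts a connection or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Refused or timed out: wait briefly and try again.
            time.sleep(interval)
    return False
```

For example, calling wait_for_port('localhost', 9092), wait_for_port('localhost', 27017), and wait_for_port('localhost', 8761) before deploying the services would cover the Kafka, MongoDB, and Eureka ports published in the middleware Compose file. An open port only shows the process is listening, not that it has finished initializing, so a short extra pause may still be needed.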

Start by running docker swarm init. This command will initialize a Docker Swarm. Next, execute the stack deploy script, using an sh ./stack_deploy_local.sh command. The script will deploy a new Docker Stack within the Docker Swarm. The Docker Stack will hold all storefront components, deployed as individual Docker containers. The stack is deployed within its own isolated Docker overlay network, kafka-net.

Note we are not using host-based persistent storage for this local development demo. Destroying the Docker stack or the individual Kafka, Zookeeper, or MongoDB Docker containers will result in a loss of data.

stack-deploy

Before completion, the stack deploy script runs the docker stack ls command, followed by a docker stack services storefront command. You should see one stack, named storefront, with ten services. You should also see that each of the ten services has 1/1 replicas running, indicating everything has started or is starting correctly, without failure. A failure would be reflected here as a service having 0/1 replicas.

docker-stack-ls
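The replica check described above can also be automated. The following sketch parses a listing of service names and replica counts and reports any service that is not fully running. It assumes the listing was produced with a command such as docker stack services storefront --format "{{.Name}} {{.Replicas}}"; the function name and the sample listing are hypothetical.

```python
def unhealthy_services(listing: str) -> list:
    """Return names of services whose running replica count is below the desired count.

    Each line is expected to look like '<service name> <running>/<desired>'.
    """
    failing = []
    for line in listing.strip().splitlines():
        # Only the first two fields matter; ignore any trailing annotations.
        name, replicas = line.split()[:2]
        running, desired = replicas.split("/")
        if int(running) < int(desired):
            failing.append(name)
    return failing


# Hypothetical listing: one service has failed to start.
sample = """
storefront_kafka 1/1
storefront_mongo 1/1
storefront_zuul 0/1
"""
print(unhealthy_services(sample))
```

An empty result means every listed service reports all of its replicas running.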

Before completion, the stack deploy script also runs the docker container ls command. You should observe each of the ten running containers (‘services’ in the Docker stack), along with their instance names and ports.

docker-container-ls

There is also a shell script, stack_delete_local.sh, which will issue a docker stack rm storefront command to destroy the stack when you are done.

Using the names of the storefront’s Docker containers, you can check the start-up logs of any of the components, using the docker logs command.

docker-logs

Testing the Stack

With the storefront stack deployed, we need to confirm that all the components have started correctly and are communicating with each other. To accomplish this, I’ve written a simple Python script, refresh.py. The refresh script has multiple uses. It deletes any existing storefront service MongoDB databases. It also deletes any existing Kafka topics; I call the Kafka Manager’s API to accomplish this. We have no databases or topics since our stack was just created. However, if you are actively developing your data models, you will likely want to purge the databases and topics regularly (gist).

#!/usr/bin/env python3
# Delete (3) MongoDB databases, (3) Kafka topics,
# create sample data by hitting Zuul API Gateway endpoints,
# and return MongoDB documents as verification.
# usage: python3 ./refresh.py
from pprint import pprint
from pymongo import MongoClient
import requests
import time

client = MongoClient('mongodb://localhost:27017/')


def main():
    delete_databases()
    delete_topics()
    create_sample_data()
    get_mongo_doc('accounts', 'customer.accounts')
    get_mongo_doc('orders', 'customer.orders')
    get_mongo_doc('fulfillment', 'fulfillment.requests')


def delete_databases():
    dbs = ['accounts', 'orders', 'fulfillment']
    for db in dbs:
        client.drop_database(db)
        print('MongoDB dropped: ' + db)
    # list_database_names() replaces the deprecated database_names()
    dbs = client.list_database_names()
    print('Remaining databases:')
    print(dbs)
    print('\n')


def delete_topics():
    # call Kafka Manager API
    topics = ['accounts.customer.change',
              'orders.order.fulfill',
              'fulfillment.order.change']
    for topic in topics:
        kafka_manager_url = 'http://localhost:9000/clusters/dev/topics/delete?t=' + topic
        r = requests.post(kafka_manager_url, data={'topic': topic})
        time.sleep(3)
        print('Kafka topic deleted: ' + topic)
    print('\n')


def create_sample_data():
    sample_urls = [
        'http://localhost:8080/accounts/customers/sample',
        'http://localhost:8080/orders/customers/sample/orders',
        'http://localhost:8080/orders/customers/sample/fulfill',
        'http://localhost:8080/fulfillment/fulfillments/sample/process',
        'http://localhost:8080/fulfillment/fulfillments/sample/ship',
        'http://localhost:8080/fulfillment/fulfillments/sample/in-transit',
        'http://localhost:8080/fulfillment/fulfillments/sample/receive']
    for sample_url in sample_urls:
        r = requests.get(sample_url)
        print(r.text)
        time.sleep(5)
    print('\n')


def get_mongo_doc(db_name, collection_name):
    db = client[db_name]
    collection = db[collection_name]
    pprint(collection.find_one())
    print('\n')


if __name__ == "__main__":
    main()

Next, the refresh script calls a series of RESTful HTTP endpoints, in a specific order, to create sample data. Our three storefront services each expose different endpoints. The different /sample endpoints create sample customers, orders, order fulfillment requests, and shipping notifications. The create sample data endpoints include, in order:

  1. Sample Customer: /accounts/customers/sample
  2. Sample Orders: /orders/customers/sample/orders
  3. Sample Fulfillment Requests: /orders/customers/sample/fulfill
  4. Sample Processed Order Events: /fulfillment/fulfillments/sample/process
  5. Sample Shipped Order Events: /fulfillment/fulfillments/sample/ship
  6. Sample In-Transit Order Events: /fulfillment/fulfillments/sample/in-transit
  7. Sample Received Order Events: /fulfillment/fulfillments/sample/receive

You could create data on your own, by POSTing to the exposed CRUD endpoints on each service. However, given the complex data objects required in the request payloads, it is too time-consuming for this demo.

To execute the script, use a python3 ./refresh.py command. I am using Python 3 in the demo, but the script should also work with Python 2.x if you change the shebang.

refresh-script

If everything was successful, the script returns one document from each of the three storefront services’ MongoDB database collections. A result of ‘None’ for any of the MongoDB documents usually indicates one of the earlier commands failed. Because of abnormally high response latency, caused by the load of the ten running containers on my laptop, I had to increase the Zuul/Ribbon timeouts.

Observing the System

We should now have the online storefront Docker stack running, three MongoDB databases created and populated with sample documents (data), and three Kafka topics, which have messages in them. Based on the fact we saw database documents printed out with our refresh script, we know the topics were used to pass data between the message producing and message consuming services.

In most enterprise environments, a developer may not have the access or the operational knowledge to interact with Kafka or MongoDB from within a container, on the command line. So how else can we interact with the system?

Kafka Manager

Kafka Manager gives us the ability to interact with Kafka via a convenient browser-based user interface. For this demo, the Kafka Manager UI is available on default port 9000.

kafka_manager_00

To make Kafka Manager useful, define the Kafka cluster. The Cluster Name is up to you. The Cluster Zookeeper Host should be zookeeper:2181, for our demo.

kafka_manager_01

Kafka Manager gives us useful insights into many aspects of our simple, single-broker cluster. You should observe three topics, created during the deployment of Kafka.

kafka_manager_02

Kafka Manager is an appealing alternative to connecting to the Kafka container with a docker exec command in order to interact with Kafka. A typical use case might be deleting a topic or adding partitions to a topic. We can also see which Consumers are consuming which topics from within Kafka Manager.

kafka_manager_03

Mongo Express

Similar to Kafka Manager, Mongo Express gives us the ability to interact with MongoDB via a user interface. For this demo, the Mongo Express browser-based user interface is available on default port 8081. The initial view displays each of the existing databases. Note our three services’ databases: accounts, orders, and fulfillment.

mongo-express-01

Drilling into an individual database, we can view each of the database’s collections. Digging in further, we can interact with individual database collection documents.

mongo-express-02

We may even edit and save the documents.

mongo-express-03

SpringFox and Swagger

Each of the storefront services also implements SpringFox, the automated JSON API documentation for APIs built with Spring. With SpringFox, each service exposes a rich Swagger UI. The Swagger UI allows us to interact with service endpoints.

Since each service exposes its own Swagger interface, we must access them through the Zuul API Gateway on port 8080. In our demo environment, the Swagger browser-based user interface is accessible at /swagger-ui.html. Below is a fully self-documented Orders service API, as seen through the Swagger UI.

I believe there are still some incompatibilities between the latest SpringFox release and Spring Boot 2, which prevent Swagger from showing the default Spring Data REST CRUD endpoints. Currently, you only see the API endpoints you explicitly declare in your Controller classes.

swagger-ui-1

The service’s data models (POJOs) are also exposed through the Swagger UI by default. Below we see the Orders service’s models.

swagger-ui-3

The Swagger UI allows you to drill down into the complex structure of the models, such as the CustomerOrder entity, exposing each of the entity’s nested data objects.

swagger-ui-2

Spring Cloud Netflix Eureka

This post does not cover the use of Eureka or Zuul. However, Eureka gives us further valuable insight into our storefront system. Eureka is our system’s service registry and would provide load balancing for our services if we had multiple instances.

For this demo, the Eureka browser-based user interface is available on default port 8761. Within the Eureka user interface, we should observe the three storefront services and Zuul, the API Gateway, registered with Eureka. If we had more than one instance of each service, we would see all of them listed here.

eureka-ui

Although of limited use in a local environment, we can observe some general information about our host.

eureka-ui-02

Interacting with the Services

The three storefront services are fully functional Spring Boot / Spring Data REST / Spring HATEOAS-enabled applications. Each service exposes a rich set of CRUD endpoints for interacting with the service’s data entities. Additionally, each service includes Spring Boot Actuator. Actuator exposes additional operational endpoints, allowing us to observe the running services. Again, this post is not intended to be a demonstration of Spring Boot or Spring Boot Actuator.

Using an application, such as Postman, we can interact with our services’ RESTful HTTP endpoints. Shown below, we are calling the Accounts service’s customers resource. The Accounts request is proxied through the Zuul API Gateway.

postman

The above Postman Storefront Collection and Postman Environment are both exported and saved with the project.

Some key endpoints to observe the entities that were created using Event-Carried State Transfer are as follows. They assume you are using localhost as a base URL.

References

Links to my GitHub projects for this post

Some additional references I found useful while authoring this post and the online storefront code:

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

 


Managing Applications Across Multiple Kubernetes Environments with Istio: Part 2

In this two-part post, we are exploring the creation of a GKE cluster, replete with the latest version of Istio, often referred to as IoK (Istio on Kubernetes). We will then deploy, perform integration testing, and promote an application across multiple environments within the cluster.

Part Two

In Part One of this post, we created a Kubernetes cluster on the Google Cloud Platform, installed Istio, provisioned a PostgreSQL database, and configured DNS for routing. Under the assumption that v1 of the Election microservice had already been released to Production, we deployed v1 to each of the three namespaces.

In Part Two of this post, we will learn how to utilize the advanced API testing capabilities of Postman and Newman to ensure v2 is ready for UAT and release to Production. We will deploy and perform integration testing of a new v2 of the Election microservice, locally on Kubernetes Minikube. Once confident v2 is functioning as intended, we will promote and test v2 across the dev, test, and uat namespaces.

Source Code

As a reminder, all source code for this post can be found on GitHub. The project’s README file contains a list of the Election microservice’s endpoints. To get started quickly, use one of the two following options (gist).

# clone the official v3.0.0 release for this post
git clone --depth 1 --branch v3.0.0 \
https://github.com/garystafford/spring-postgresql-demo.git \
&& cd spring-postgresql-demo \
&& git checkout -b v3.0.0
# clone the latest version of code (newer than article)
git clone --depth 1 --branch master \
https://github.com/garystafford/spring-postgresql-demo.git \
&& cd spring-postgresql-demo

Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

This project includes a kubernetes sub-directory, containing all the Kubernetes resource files and scripts necessary to recreate the example shown in the post.

Testing Locally with Minikube

Deploying to GKE, no matter how automated, takes time and resources, whether those resources are team members or just compute and system resources. Before deploying v2 of the Election service to the non-prod GKE cluster, we should ensure that it has been thoroughly tested locally. Local testing should include the following test criteria:

  1. Source code builds successfully
  2. All unit-tests pass
  3. A new Docker Image can be created from the build artifact
  4. The Service can be deployed to Kubernetes (Minikube)
  5. The deployed instance can connect to the database and execute the Liquibase changesets
  6. The deployed instance passes a minimal set of integration tests

Minikube gives us the ability to quickly iterate and test an application, as well as the Kubernetes and Istio resources required for its operation, before promoting to GKE. These resources include Kubernetes Namespaces, Secrets, Deployments, Services, Route Rules, and Istio Ingresses. Since Minikube is just that, a miniature version of our GKE cluster, we should be able to have a nearly one-to-one parity between the Kubernetes resources we apply locally and those applied to GKE. This post assumes you have the latest version of Minikube installed, and are familiar with its operation.

This project includes a minikube sub-directory, containing all the Kubernetes resource files and scripts necessary to recreate the Minikube deployment example shown in this post. The three included scripts are designed to be easily adapted to a CI/CD DevOps workflow. You may need to modify the scripts to match your environment’s configuration. Note this Minikube-deployed version of the Election service relies on the external Amazon RDS database instance.

Local Database Version

To eliminate the AWS costs, I have included a second, alternate version of the Minikube Kubernetes resource files, minikube_db_local. This version deploys a single containerized PostgreSQL database instance to Minikube, as opposed to relying on the external Amazon RDS instance. Be aware, the database does not have persistent storage or an Istio sidecar proxy.

istio_100.png

Minikube Cluster

If you do not have a running Minikube cluster, create one with the minikube start command.

istio_081

Minikube allows you to use normal kubectl CLI commands to interact with the Minikube cluster. Using the kubectl get nodes command, we should see a single Minikube node running the latest Kubernetes v1.10.0.

istio_082

Istio on Minikube

Next, install Istio following Istio’s online installation instructions. A basic Istio installation on Minikube, without the additional add-ons, should only require a single Istio install script.

istio_083

If successful, you should observe a new istio-system namespace, containing the four main Istio components: istio-ca, istio-ingress, istio-mixer, and istio-pilot.

istio_084

Deploy v2 to Minikube

Next, create a Minikube Development environment, consisting of a dev Namespace, Istio Ingress, and Secret, using the part1-create-environment.sh script. Then, deploy v2 of the Election service to the dev Namespace, along with an associated Route Rule, using the part2-deploy-v2.sh script. One v2 instance should be sufficient to satisfy the testing requirements.

istio_085

Access to v2 of the Election service on Minikube is a bit different than with GKE. When routing external HTTP requests, there is no load balancer, no external public IP address, and no public DNS or subdomains. To access the single instance of v2 running on Minikube, we use the local IP address of the Minikube cluster, obtained with the minikube ip command. The access port required is the Node Port (nodePort) of the istio-ingress Service. The command is shown below (gist) and included in the part3-smoke-test.sh script.

export GATEWAY_URL="$(minikube ip):"\
"$(kubectl get svc istio-ingress -n istio-system -o 'jsonpath={.spec.ports[0].nodePort}')"
echo $GATEWAY_URL
curl $GATEWAY_URL/v2/actuator/health && echo

The second part of our HTTP request routing is the same as with GKE, relying on Istio Route Rules. The /v2/ sub-collection resource in the HTTP request URL is rewritten and routed to the v2 election Pod by the Route Rule. To confirm v2 of the Election service is running and addressable, curl the /v2/actuator/health endpoint. Spring Actuator’s /health endpoint is frequently used at the end of a CI/CD server’s deployment pipeline to confirm success. The Spring Boot application can take a few minutes to fully start up and be responsive to requests, depending on the speed of your local machine.

istio_093.png
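For a CI/CD step that prefers Python over curl, the same health check could be sketched as follows. This is a minimal illustration using only the standard library; the function names are hypothetical, and it assumes the endpoint returns the usual Spring Boot Actuator JSON body with a top-level status field.

```python
import json
import urllib.request


def is_healthy(body: str) -> bool:
    """Return True when an Actuator health response body reports status UP."""
    try:
        return json.loads(body).get("status") == "UP"
    except (ValueError, AttributeError):
        # Not JSON, or JSON that is not an object.
        return False


def check_health(gateway_url: str, timeout: float = 5.0) -> bool:
    """Request /v2/actuator/health through the gateway ('host:port') and evaluate it."""
    url = f"http://{gateway_url}/v2/actuator/health"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return is_healthy(response.read().decode("utf-8"))
    except OSError:
        # Connection errors and HTTP error statuses both count as unhealthy.
        return False
```

Here, gateway_url would be the same host and port value held in GATEWAY_URL in the shell commands above. Because the service can take a few minutes to start, a pipeline would typically call check_health repeatedly until it returns True or a deadline passes.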

Using the Kubernetes Dashboard, we should see our deployment of the single Election service Pod is running successfully in Minikube’s dev namespace.

istio_087

Once deployed, we run a battery of integration tests to confirm that the new v2 functionality is working as intended before deploying to GKE. In the next section of this post, we will explore the process of creating and managing Postman Collections and Postman Environments, and how to automate those Collections of tests with Newman and Jenkins.

istio_088

Integration Testing

The typical reason an application is deployed to lower environments, prior to Production, is to perform application testing. Although definitions vary across organizations, testing commonly includes some or all of the following types: Integration Testing, Functional Testing, System Testing, Stress or Load Testing, Performance Testing, Security Testing, Usability Testing, Acceptance Testing, Regression Testing, Alpha and Beta Testing, and End-to-End Testing. Test teams may also refer to other testing forms, such as Whitebox (Glassbox), Blackbox Testing, Smoke, Validation, or Sanity Testing, and Happy Path Testing.

The site, softwaretestinghelp.com, defines integration testing as, ‘testing of all integrated modules to verify the combined functionality after integration is termed so. Modules are typically code modules, individual applications, client and server applications on a network, etc. This type of testing is especially relevant to client/server and distributed systems.’

In this post, we are concerned that our integrated modules are functioning cohesively, primarily the Election service, Amazon RDS database, DNS, Istio Ingress, Route Rules, and the Istio sidecar Proxy. Unlike Unit Testing and Static Code Analysis (SCA), which are done pre-deployment, integration testing requires an application to be deployed and running in an environment.

Postman

I have chosen Postman, along with Newman, to execute a Collection of integration tests before promoting to the next environment. The integration tests confirm the deployed application’s name and version. The integration tests then perform a series of HTTP GET, POST, PUT, PATCH, and DELETE actions against the service’s resources. The integration tests verify a successful HTTP response code is returned, based on the type of request made.

istio_055
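The idea of verifying the response code by request type can be sketched in a few lines of Python. The status codes in the table below are assumptions, based on common REST conventions, and are not taken from the project's Postman Collection; the names are hypothetical.

```python
# Assumed mapping of HTTP method to acceptable success codes (illustrative only).
EXPECTED_STATUS = {
    "GET": {200},
    "POST": {201},
    "PUT": {200, 204},
    "PATCH": {200, 204},
    "DELETE": {204},
}


def is_expected(method: str, status_code: int) -> bool:
    """Return True when the status code is an accepted success code for the method."""
    return status_code in EXPECTED_STATUS.get(method.upper(), set())


print(is_expected("POST", 201))
print(is_expected("DELETE", 200))
```

In Postman itself, the equivalent checks are written in JavaScript inside each request's test script.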

Postman tests are written in JavaScript, similar to other popular, modern testing frameworks. Postman offers advanced features such as test-chaining. Tests can be chained together through the use of environment variables to store response values and pass them on to other tests. Values shared between tests are also stored in the Postman Environments. Below, we store the ID of the new candidate, the result of an HTTP POST to the /candidates endpoint. We then use the stored candidate ID in subsequent HTTP GET, PUT, and PATCH test requests to the same /candidates endpoint.

istio_056

Environment-specific variables, such as the resource host, port, and environment sub-collection resource, are abstracted and stored as key/value pairs within Postman Environments, and called through variables in the request URL and within the tests. Thus, the same Postman Collection of tests may be run against multiple environments using different Postman Environments.

istio_057
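The two mechanisms described above, test-chaining and environment variables in request URLs, can be illustrated with a short Python sketch. It mimics Postman's double-curly-brace variable syntax; the environment keys, values, and the candidateId variable name are hypothetical, and this is not how Postman is implemented internally.

```python
import re


def resolve(template: str, environment: dict) -> str:
    """Replace each {{name}} placeholder with the matching environment value."""
    def lookup(match):
        return str(environment[match.group(1)])
    return re.sub(r"\{\{(\w+)\}\}", lookup, template)


# Hypothetical environment, standing in for a Postman Environment.
environment = {"host": "localhost", "port": 8080, "env": "v2"}

# Chaining: store a value from one response for use in later requests.
post_response = {"id": 42}  # hypothetical body returned by POST /candidates
environment["candidateId"] = post_response["id"]

url = resolve("http://{{host}}:{{port}}/{{env}}/candidates/{{candidateId}}", environment)
print(url)
```

Swapping in a different environment dictionary changes the resolved URL without touching the request template, which is the same reason one Collection can be run against several environments.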

Postman Runner allows us to run multiple iterations of our Collection. We also have the option to build in delays between tests. Lastly, Postman Runner can load external JSON and CSV formatted test data, which is beyond the scope of this post.

istio_058

Postman contains a simple Run Summary UI for viewing test results.

istio_060

Test Automation

To support running tests from the command line, Postman provides Newman. According to Postman, Newman is a command-line collection runner for Postman. Newman offers the same functionality as Postman’s Collection Runner, all part of the newman CLI. Newman is a Node.js module, installed globally as an npm package with npm install newman --global.

Typically, Development and Testing teams compose Postman Collections and define Postman Environments, locally. Teams run their tests locally in Postman, during their development cycle. Then, those same Postman Collections are executed from the command line, or more commonly as part of a CI/CD pipeline, such as with Jenkins.

Below, the same Collection of integration tests that was run in the Postman Runner UI is run from the command line, using Newman.

istio_061
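When a pipeline step is scripted in Python, the Newman invocation could be wrapped as in the sketch below. The file names are placeholders and the helper names are hypothetical; the flags used (--environment, --iteration-count, --delay-request) are standard newman run options, and Newman is assumed to be installed and on the PATH.

```python
import subprocess


def newman_command(collection: str, environment: str,
                   iterations: int = 1, delay_ms: int = 0) -> list:
    """Build the argument list for a 'newman run' invocation."""
    command = ["newman", "run", collection, "--environment", environment]
    if iterations > 1:
        command += ["--iteration-count", str(iterations)]
    if delay_ms > 0:
        command += ["--delay-request", str(delay_ms)]
    return command


def run_collection(collection: str, environment: str, **options) -> int:
    """Run the collection and return Newman's exit code (non-zero on test failure)."""
    return subprocess.run(newman_command(collection, environment, **options)).returncode


# Placeholder file names; substitute the exported Collection and Environment files.
print(" ".join(newman_command("collection.json", "environment.json", iterations=2)))
```

A CI job can fail the build whenever run_collection returns a non-zero value.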

Jenkins

Without a doubt, Jenkins is the leading open-source CI/CD automation server. The building, testing, publishing, and deployment of microservices to Kubernetes is relatively easy with Jenkins. Generally, you would build,