
Getting Started with Data Analytics using Jupyter Notebooks, PySpark, and Docker

Introduction

There is little question that big data analytics, data science, artificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last few years. Behind the marketing hype, these technologies are having a significant influence on all aspects of our modern lives. Due to their popularity and potential benefits, commercial enterprises, academic institutions, and the public sector are rushing to develop hardware and software solutions that lower the barriers to entry and increase the velocity of ML engineers, data scientists, and data engineers.

Machine Learning and Data Science Search Results_ 5-Year Trend r2
(courtesy Google Trends and Plotly)

Many open-source software projects are also lowering the barriers to entry into these technologies. An excellent example of one such open-source project working on this challenge is Project Jupyter. Similar to Apache Zeppelin and Netflix’s newly open-sourced Polynote, Jupyter Notebooks enable data-driven, interactive, and collaborative data analytics.

This post will demonstrate the creation of a containerized data analytics environment using Jupyter Docker Stacks. The particular environment will be suited for learning and developing applications for Apache Spark using the Python, Scala, and R programming languages. We will focus on Python and Spark, using PySpark.

Featured Technologies

pyspark_article_00b_feature

The following technologies are featured prominently in this post.

Jupyter Notebooks

According to Project Jupyter, the Jupyter Notebook, formerly known as the IPython Notebook, is an open-source web application that allows users to create and share documents that contain live code, equations, visualizations, and narrative text. Uses include data cleansing and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more. The word Jupyter is a loose acronym for Julia, Python, and R, but today Jupyter supports many programming languages.

Interest in Jupyter Notebooks has grown dramatically over the last 3–5 years, fueled in part by the major Cloud providers, AWS, Google Cloud, and Azure. Amazon SageMaker, Amazon EMR (Elastic MapReduce), Google Cloud Dataproc, Google Colab (Colaboratory), and Microsoft Azure Notebooks all have direct integrations with Jupyter notebooks for big data analytics and machine learning.

Jupyter Search Results_ 5-Year Trend
(courtesy Google Trends and Plotly)

Jupyter Docker Stacks

To enable quick and easy access to Jupyter Notebooks, Project Jupyter has created Jupyter Docker Stacks. The stacks are ready-to-run Docker images containing Jupyter applications, along with accompanying technologies. Currently, the Jupyter Docker Stacks focus on a variety of specializations, including the r-notebook, scipy-notebook, tensorflow-notebook, datascience-notebook, pyspark-notebook, and the subject of this post, the all-spark-notebook. The stacks include a wide variety of well-known packages to extend their functionality, such as scikit-learn, pandas, Matplotlib, Bokeh, NumPy, and Facets.

Apache Spark

According to Apache, Spark is a unified analytics engine for large-scale data processing. Starting as a research project at the UC Berkeley AMPLab in 2009, Spark was open-sourced in early 2010 and moved to the Apache Software Foundation in 2013. Reviewing the postings on any major career site will confirm that Spark is widely used by well-known modern enterprises, such as Netflix, Adobe, Capital One, Lockheed Martin, JetBlue Airways, Visa, and Databricks. At the time of this post, LinkedIn, alone, had approximately 3,500 listings for jobs that reference the use of Apache Spark, just in the United States.

With speeds up to 100 times faster than Hadoop, Apache Spark achieves high performance for static, batch, and streaming data, using a state-of-the-art DAG (Directed Acyclic Graph) scheduler, a query optimizer, and a physical execution engine. Spark’s polyglot programming model allows users to write applications quickly in Scala, Java, Python, R, and SQL. Spark includes libraries for Spark SQL (DataFrames and Datasets), MLlib (Machine Learning), GraphX (Graph Processing), and DStreams (Spark Streaming). You can run Spark using its standalone cluster mode, Apache Hadoop YARN, Apache Mesos, or Kubernetes.

PySpark

The Spark Python API, PySpark, exposes the Spark programming model to Python. PySpark is built on top of Spark’s Java API and uses Py4J. According to Apache, Py4J, a bridge between Python and Java, enables Python programs running in a Python interpreter to dynamically access Java objects in a JVM. Data is processed in Python and cached and shuffled in the JVM.

Docker

According to Docker, their technology gives developers and IT the freedom to build, manage, and secure business-critical applications without the fear of technology or infrastructure lock-in. For this post, I am using the current stable version of Docker Desktop Community version for macOS.

screen_shot_2019-12-04_at_10_42_45_pm

Docker Swarm

Current versions of Docker include both a Kubernetes and Swarm orchestrator for deploying and managing containers. We will choose Swarm for this demonstration. According to Docker, the cluster management and orchestration features embedded in the Docker Engine are built using swarmkit. Swarmkit is a separate project which implements Docker’s orchestration layer and is used directly within Docker.

PostgreSQL

PostgreSQL is a powerful, open-source, object-relational database system. According to their website, PostgreSQL comes with many features aimed to help developers build applications, administrators to protect data integrity and build fault-tolerant environments, and help manage data no matter how big or small the dataset.

Demonstration

In this demonstration, we will explore the capabilities of the Spark Jupyter Docker Stack to provide an effective data analytics development environment. We will explore a few everyday uses, including executing Python scripts, submitting PySpark jobs, working with Jupyter Notebooks, and reading and writing data to and from different file formats and a database. We will be using the latest jupyter/all-spark-notebook Docker Image. This image includes Python, R, and Scala support for Apache Spark, using Apache Toree.

Architecture

As shown below, we will deploy a Docker stack to a single-node Docker swarm. The stack consists of a Jupyter All-Spark-Notebook container, a PostgreSQL container (Alpine Linux version 12), and an Adminer container. The Docker stack will have two local directories bind-mounted into the containers. Files from our GitHub project will be shared with the Jupyter application container through a bind-mounted directory. Our PostgreSQL data will also be persisted through a bind-mounted directory. This allows us to persist data external to the ephemeral containers. If the containers are restarted or recreated, the data is preserved locally.

JupyterDiagram

Source Code

All source code for this post can be found on GitHub. Use the following command to clone the project. Note this post uses the v2 branch.

git clone \
  --branch v2 --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/pyspark-setup-demo.git

Source code samples are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers.

Deploy Docker Stack

To start, create the $HOME/data/postgres directory to store PostgreSQL data files.

mkdir -p ~/data/postgres

This directory will be bind-mounted into the PostgreSQL container on line 41 of the stack.yml file, $HOME/data/postgres:/var/lib/postgresql/data. The HOME environment variable assumes you are working on Linux or macOS and is equivalent to HOMEPATH on Windows.

The Jupyter container’s working directory is set on line 15 of the stack.yml file, working_dir: /home/$USER/work. The local bind-mounted working directory is $PWD/work. This path is bind-mounted to the working directory in the Jupyter container, on line 29 of the Docker stack file, $PWD/work:/home/$USER/work. The PWD environment variable assumes you are working on Linux or macOS (CD on Windows).

By default, the user within the Jupyter container is jovyan. We will override that user with our own local host’s user account, as shown on line 21 of the Docker stack file, NB_USER: $USER. We will use the host’s USER environment variable value (equivalent to USERNAME on Windows). There are additional options for configuring the Jupyter container. Several of those options are used on lines 17–22 of the Docker stack file (gist).
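The stack file depends on three host values: HOME, PWD, and USER, or their Windows equivalents. As a rough illustration only, the following Python sketch resolves those values portably and prints the two bind-mount strings described above. The function name is hypothetical and the code is not part of the project.

```python
import os
from pathlib import Path


def bind_mounts(user: str, home: Path, project_dir: Path) -> dict:
    """Return the host:container bind-mount strings described above."""
    return {
        # PostgreSQL data files persisted on the host
        "postgres": f"{home / 'data' / 'postgres'}:/var/lib/postgresql/data",
        # Project work/ directory shared with the Jupyter container
        "jupyter": f"{project_dir / 'work'}:/home/{user}/work",
    }


# USER on Linux and macOS, USERNAME on Windows; jovyan is the image default.
host_user = os.environ.get("USER") or os.environ.get("USERNAME", "jovyan")

# Path.home() and Path.cwd() stand in for $HOME and $PWD on any platform.
for name, mount in bind_mounts(host_user, Path.home(), Path.cwd()).items():
    print(f"{name}: {mount}")
```

Running it from the project root before deploying is a quick way to check which host directories the containers will see.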

Assuming you have a recent version of Docker installed on your local development machine and running in swarm mode, standing up the stack is as easy as running the following docker command from the root directory of the project.

docker stack deploy -c stack.yml jupyter

The Docker stack consists of a new overlay network, jupyter_demo-net, and three containers. To confirm the stack deployed successfully, run the following docker command.

docker stack ps jupyter --no-trunc

screen-shot-2019-12-04-at-10_34_40-am

The jupyter/all-spark-notebook Docker image is large, approximately 5 GB. Depending on your Internet connection, if this is the first time you have pulled this image, the stack may take several minutes to enter a running state. Although not required, I usually pull Docker images in advance.

docker pull jupyter/all-spark-notebook:latest
docker pull postgres:12-alpine
docker pull adminer:latest

To access the Jupyter Notebook application, you need to obtain the Jupyter URL and access token. The Jupyter URL and the access token are output to the Jupyter container log, which can be accessed with the following command.

docker logs $(docker ps | grep jupyter_spark | awk '{print $NF}')

You should observe log output similar to the following. Retrieve the complete URL, for example, http://127.0.0.1:8888/?token=f78cbe..., to access the Jupyter web-based user interface.
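If you would rather not scan the log by eye, the URL can be pulled out with a regular expression. The sketch below assumes the log text has already been captured, for example by redirecting the docker logs output to a file. The sample log lines and the helper name are hypothetical, and the exact log layout may differ between Jupyter versions.

```python
import re
from typing import Optional

# Matches a Jupyter login URL with its token query parameter.
TOKEN_URL = re.compile(r"https?://[^\s?]+/\?token=[0-9a-f]+")


def find_jupyter_url(log_text: str, host: str = "127.0.0.1") -> Optional[str]:
    """Return the first tokenized URL for the given host, or None."""
    for match in TOKEN_URL.finditer(log_text):
        if host in match.group(0):
            return match.group(0)
    return None


# Hypothetical log excerpt; a real token is a long hexadecimal string.
sample_log = """
[I 10:34:51 NotebookApp] The Jupyter Notebook is running at:
[I 10:34:51 NotebookApp] http://0a1b2c3d4e5f:8888/?token=abc123
[I 10:34:51 NotebookApp]  or http://127.0.0.1:8888/?token=abc123
"""
print(find_jupyter_url(sample_log))
```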

screen-shot-2019-12-04-at-10_34_52-am

From the Jupyter dashboard landing page, you should see all the files in the project’s work/ directory. Note the types of files you can create from the dashboard, including Python 3, R, and Scala (using Apache Toree or spylon-kernel) notebooks, and text. You can also open a Jupyter terminal or create a new Folder from the drop-down menu. At the time of this post, the latest jupyter/all-spark-notebook Docker Image runs Spark 2.4.4, Scala 2.11.12, Python 3.7.3, and OpenJDK 64-Bit Server VM, Java 1.8.0 Update 222.

screen_shot_2019-12-01_at_4_40_12_pm

Bootstrap Environment

Included in the project is a bootstrap script, bootstrap_jupyter.sh. The script will install the required Python packages using pip, the Python package installer, and a requirements.txt file. The bootstrap script also installs the latest PostgreSQL driver JAR, configures Apache Log4j to reduce log verbosity when submitting Spark jobs, and installs htop. Although these tasks could also be done from a Jupyter terminal or from within a Jupyter notebook, using a bootstrap script ensures you will have a consistent work environment every time you spin up the Jupyter Docker stack. Add or remove items from the bootstrap script as necessary.

That’s it, our new Jupyter environment is ready to start exploring.

Running Python Scripts

One of the simplest activities we could perform in our new Jupyter environment is running Python scripts. Instead of worrying about installing and maintaining the correct versions of Python and multiple Python packages on your own development machine, we can run Python scripts from within the Jupyter container. At the time of this post update, the latest jupyter/all-spark-notebook Docker image runs Python 3.7.3 and Conda 4.7.12. Let’s start with a simple Python script, 01_simple_script.py.

From a Jupyter terminal window, use the following command to run the script.

python3 01_simple_script.py

You should observe the following output.

screen_shot_2019-12-01_at_4_46_38_pm

Kaggle Datasets

To explore more features of Jupyter and PySpark, we will use a publicly available dataset from Kaggle. Kaggle is an excellent open-source resource for datasets used for big-data and ML projects. Their tagline is ‘Kaggle is the place to do data science projects’. For this demonstration, we will use the Transactions from a bakery dataset from Kaggle. The dataset is available as a single CSV-format file. A copy is also included in the project.

pyspark_article_03_kaggle

The ‘Transactions from a bakery’ dataset contains 21,294 rows with 4 columns of data. Although certainly not big data, the dataset is large enough to test out the Spark Jupyter Docker Stack functionality. The data consists of 9,531 customer transactions for 21,294 bakery items between 2016-10-30 and 2017-04-09 (gist).
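Figures like these are easy to check with the standard library before involving Spark. The following is a minimal sketch that assumes the four columns are named Date, Time, Transaction, and Item. The inline sample rows are made up for illustration and are not taken from the dataset.

```python
import csv
import io

# Hypothetical sample in the assumed Date,Time,Transaction,Item layout.
SAMPLE = """Date,Time,Transaction,Item
2016-10-30,09:58:11,1,Bread
2016-10-30,10:05:34,2,Scandinavian
2016-10-30,10:05:34,2,Scandinavian
2016-10-30,10:07:57,3,Hot chocolate
"""


def summarize(csv_file) -> dict:
    """Count rows and distinct transactions, and find the date range."""
    rows = list(csv.DictReader(csv_file))
    dates = [row["Date"] for row in rows]  # ISO dates sort lexicographically
    return {
        "rows": len(rows),
        "transactions": len({row["Transaction"] for row in rows}),
        "first_date": min(dates),
        "last_date": max(dates),
    }


print(summarize(io.StringIO(SAMPLE)))
```

To run it against the real file, pass an open file handle instead of the io.StringIO wrapper.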

Submitting Spark Jobs

We are not limited to Jupyter notebooks to interact with Spark. We can also submit scripts directly to Spark from the Jupyter terminal. This is typically how Spark is used in Production for performing analysis on large datasets, often on a regular schedule, using tools such as Apache Airflow. With Spark, you load data from one or more data sources. After performing operations and transformations on the data, the data is persisted to a datastore, such as a file or a database, or conveyed to another system for further processing.

The project includes a simple Python PySpark ETL script, 02_pyspark_job.py. The ETL script loads the original Kaggle Bakery dataset from the CSV file into memory, into a Spark DataFrame. The script then performs a simple Spark SQL query, calculating the total quantity of each type of bakery item sold, sorted in descending order. Finally, the script writes the results of the query to a new CSV file, output/items-sold.csv.
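The three steps just described (load, query, write) might look roughly like the sketch below. It is not the project’s 02_pyspark_job.py. The column name Item, the view name bakery, and the function names are assumptions made for illustration.

```python
def build_session(app_name: str = "pyspark_job_sketch"):
    """Create a SparkSession; imported lazily so Spark is only needed at run time."""
    from pyspark.sql import SparkSession

    return SparkSession.builder.appName(app_name).getOrCreate()


# Spark SQL for "total quantity of each item sold, in descending order".
ITEMS_SOLD_SQL = """
    SELECT Item, COUNT(*) AS quantity
    FROM bakery
    GROUP BY Item
    ORDER BY quantity DESC, Item
"""


def run_job(spark, input_csv: str, output_dir: str) -> None:
    """Load the CSV, aggregate with Spark SQL, and write the result as CSV."""
    df = spark.read.csv(input_csv, header=True, inferSchema=True)
    df.createOrReplaceTempView("bakery")  # hypothetical view name
    result = spark.sql(ITEMS_SOLD_SQL)
    result.show(10)
    # Spark writes a directory of part files, not a single CSV file.
    result.write.csv(output_dir, header=True, mode="overwrite")
```

A call such as run_job(build_session(), "your-input.csv", "output/items-sold.csv") would run it, with the input path replaced by the location of the dataset. Because Spark writes a directory of part files, output/items-sold.csv ends up being a directory rather than a single file.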

Run the script directly from a Jupyter terminal using the following command.

python3 02_pyspark_job.py

An example of the output of the Spark job is shown below.
screen-shot-2019-12-03-at-9_31_38-am

Typically, you would submit the Spark job using the spark-submit command. Use a Jupyter terminal to run the following command.

$SPARK_HOME/bin/spark-submit 02_pyspark_job.py

Below, we see the output from the spark-submit command. Printing the results in the output is merely for the purposes of the demo. Typically, Spark jobs are submitted non-interactively, and the results are persisted directly to a datastore or conveyed to another system.
screen-shot-2019-12-03-at-9_32_25-am

Using the following commands, we can view the resulting CSV file, created by the Spark job.

ls -alh output/items-sold.csv/
head -5 output/items-sold.csv/*.csv

An example of the files created by the Spark job is shown below. We should have discovered that coffee is the most commonly sold bakery item with 5,471 sales, followed by bread with 3,325 sales.
screen-shot-2019-12-03-at-9_32_52-am

Interacting with Databases

To demonstrate the flexibility of Jupyter to work with databases, PostgreSQL is part of the Docker Stack. We can read and write data from the Jupyter container to the PostgreSQL instance, running in a separate container. To begin, we will run a SQL script, executed from Python, to create our database schema and some test data in a new database table. To do so, we will use psycopg2, the PostgreSQL database adapter package for Python, which we previously installed into our Jupyter container using the bootstrap script. The below Python script, 03_load_sql.py, will execute a set of SQL statements contained in a SQL file, bakery.sql, against the PostgreSQL container instance.

The SQL file, bakery.sql.
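For readers who cannot see the embedded scripts, here is a minimal sketch of how a loader of this kind can be written with psycopg2. It is not the project’s 03_load_sql.py. The environment variable names, the default database name and user, and the placeholder password are all assumptions; the real credentials are in stack.yml.

```python
import os


def connection_settings() -> dict:
    """Read connection values from the environment, with placeholder defaults."""
    return {
        "host": os.environ.get("PGHOST", "postgres"),  # PostgreSQL container name
        "port": int(os.environ.get("PGPORT", "5432")),
        "dbname": os.environ.get("PGDATABASE", "bakery"),  # assumed name
        "user": os.environ.get("PGUSER", "postgres"),  # assumed user
        # Placeholder only; the real credentials are in stack.yml.
        "password": os.environ.get("PGPASSWORD", "change-me"),
    }


def run_sql_file(path: str) -> None:
    """Execute every statement in a SQL file inside one transaction."""
    import psycopg2  # imported lazily; installed by the bootstrap script

    with open(path, encoding="utf-8") as sql_file:
        sql = sql_file.read()

    conn = psycopg2.connect(**connection_settings())
    try:
        with conn:  # commits on success, rolls back on error
            with conn.cursor() as cur:
                cur.execute(sql)
    finally:
        conn.close()
```

Calling run_sql_file("bakery.sql") from a Jupyter terminal session would apply the file in one transaction, so a failing statement leaves the database unchanged.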

To execute the script, run the following command.

python3 03_load_sql.py

This should result in the following output, if successful.
screen-shot-2019-12-03-at-9_34_26-am

Adminer

To confirm the SQL script’s success, use Adminer. Adminer (formerly phpMinAdmin) is a full-featured database management tool written in PHP. Adminer natively recognizes PostgreSQL, MySQL, SQLite, and MongoDB, among other database engines.

Adminer should be available on localhost port 8080. The password credentials, shown below, are located in the stack.yml file. The server name, postgres, is the name of the PostgreSQL Docker container. This is the domain name the Jupyter container will use to communicate with the PostgreSQL container.
screen_shot_2019-12-01_at_6_09_57_pm

Connecting to the new bakery database with Adminer, we should see the transactions table.
screen_shot_2019-12-01_at_6_10_20_pm

The table should contain 21,293 rows, each with 5 columns of data.

screen_shot_2019-12-01_at_6_11_32_pm

pgAdmin

Another excellent choice for interacting with PostgreSQL, in particular, is pgAdmin 4. It is my favorite tool for the administration of PostgreSQL. Although limited to PostgreSQL, the user interface and administrative capabilities of pgAdmin are superior to those of Adminer, in my opinion. For brevity, I chose not to include pgAdmin in this post. The Docker stack also contains a pgAdmin container, which has been commented out. To use pgAdmin, just uncomment the container and re-run the Docker stack deploy command. pgAdmin should then be available on localhost port 81. The pgAdmin login credentials are in the Docker stack file.

screen_shot_2019-12-05_at_10_11_15_am
screen_shot_2019-12-05_at_10_11_44_am

Developing Jupyter Notebooks

The real power of the Jupyter Docker Stacks containers is Jupyter Notebooks. According to the Jupyter Project, the notebook extends the console-based approach to interactive computing in a qualitatively new direction, providing a web-based application suitable for capturing the whole computation process, including developing, documenting, and executing code, as well as communicating the results. Notebook documents contain the inputs and outputs of an interactive session as well as additional text that accompanies the code but is not meant for execution.

To explore the capabilities of Jupyter notebooks, the project includes two simple Jupyter notebooks. The first notebook, 04_notebook.ipynb, demonstrates typical PySpark functions, such as loading data from a CSV file and from the PostgreSQL database, performing basic data analysis with Spark SQL including the use of PySpark user-defined functions (UDF), graphing the data using BokehJS, and finally, saving data back to the database, as well as to the fast and efficient Apache Parquet file format. Below we see several notebook cells demonstrating these features.

screen_shot_2019-12-04_at_11_05_00_pm
screen_shot_2019-12-04_at_11_05_07_pm
screen_shot_2019-12-04_at_11_05_22_pm
screen_shot_2019-12-05_at_3_54_11_pm
screen_shot_2019-12-04_at_11_14_22_pm
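Reading from and writing back to PostgreSQL in PySpark goes through Spark’s JDBC data source, which relies on the PostgreSQL driver JAR installed by the bootstrap script. The following is a minimal sketch of that pattern rather than the notebook’s actual cells. The function names are hypothetical, and the table name and credentials are left as parameters.

```python
JDBC_DRIVER = "org.postgresql.Driver"


def jdbc_url(host: str = "postgres", port: int = 5432, database: str = "bakery") -> str:
    """Build a PostgreSQL JDBC URL for the database container."""
    return f"jdbc:postgresql://{host}:{port}/{database}"


def jdbc_options(table: str, user: str, password: str) -> dict:
    """Collect the options shared by JDBC reads and writes."""
    return {
        "url": jdbc_url(),
        "driver": JDBC_DRIVER,
        "dbtable": table,
        "user": user,
        "password": password,
    }


def read_table(spark, table: str, user: str, password: str):
    """Load one PostgreSQL table into a Spark DataFrame."""
    return spark.read.format("jdbc").options(**jdbc_options(table, user, password)).load()


def append_table(df, table: str, user: str, password: str) -> None:
    """Append the rows of a DataFrame to a PostgreSQL table."""
    df.write.format("jdbc").options(**jdbc_options(table, user, password)).mode("append").save()
```

Here spark is an existing SparkSession, such as one created in a notebook cell. The host name postgres is the container name used elsewhere in this post.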

IDE Integration

Recall, the working directory, containing the GitHub source code for the project, is bind-mounted to the Jupyter container. Therefore, you can also edit any of the files, including notebooks, in your favorite IDE, such as JetBrains PyCharm and Microsoft Visual Studio Code. PyCharm has built-in language support for Jupyter Notebooks, as shown below.
screen_shot_2019-12-01_at_9_21_49_pm

As does Visual Studio Code using the Python extension.

screen_shot_2019-12-08_at_8_02_43_pm.png

Using Additional Packages

As mentioned in the Introduction, the Jupyter Docker Stacks come ready-to-run, with a wide variety of Python packages to extend their functionality. To demonstrate the use of these packages, the project contains a second Jupyter notebook document, 05_notebook.ipynb. This notebook uses SciPy, the well-known Python package for mathematics, science, and engineering, NumPy, the well-known Python package for scientific computing, and the Plotly Python Graphing Library. While NumPy and SciPy are included on the Jupyter Docker Image, the bootstrap script uses pip to install the required Plotly packages. Similar to Bokeh, shown in the previous notebook, we can use these libraries to create richly interactive data visualizations.

Plotly

To use Plotly from within the notebook, you will first need to sign up for a free account and obtain a username and API key. To ensure we do not accidentally save sensitive Plotly credentials in the notebook, we are using python-dotenv. This Python package reads key/value pairs from a .env file, making them available as environment variables to our notebook. Modify and run the following two commands from a Jupyter terminal to create the .env file and set your Plotly username and API key credentials. Note that the .env file is listed in the .gitignore file, so it will not be committed back to git, which would potentially compromise the credentials.

echo "PLOTLY_USERNAME=your-username" >> .env
echo "PLOTLY_API_KEY=your-api-key" >> .env

The notebook expects to find the two environment variables, which it uses to authenticate with Plotly.
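To make the mechanism concrete, the sketch below parses a .env file by hand and exports its values as environment variables. It is a simplified stand-in for what python-dotenv’s load_dotenv() does, not that library’s implementation. The helper name is hypothetical, and the demo writes placeholder credentials to a temporary file.

```python
import os
import tempfile


def load_env_file(path: str) -> dict:
    """Parse simple KEY=VALUE lines and export them as environment variables."""
    loaded = {}
    with open(path, encoding="utf-8") as env_file:
        for line in env_file:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue  # skip blanks, comments, and malformed lines
            key, value = line.split("=", 1)
            loaded[key.strip()] = value.strip().strip('"')
    os.environ.update(loaded)
    return loaded


# Demonstrate with a throwaway file holding placeholder credentials.
with tempfile.TemporaryDirectory() as tmp:
    env_path = os.path.join(tmp, ".env")
    with open(env_path, "w", encoding="utf-8") as f:
        f.write("PLOTLY_USERNAME=your-username\nPLOTLY_API_KEY=your-api-key\n")
    load_env_file(env_path)

print(os.environ["PLOTLY_USERNAME"])
```

In the notebook itself, importing load_dotenv from the dotenv package and calling it plays this role, after which the values can be read with os.environ.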

screen_shot_2019-12-04_at_11_20_06_pm

Shown below, we use Plotly to construct a bar chart of daily bakery items sold. The chart uses SciPy and NumPy to compute a linear fit (regression) and plot a line of best fit for the bakery data, overlaying the vertical bars. The chart also uses SciPy’s Savitzky-Golay Filter to plot a second line, illustrating a smoothing of our bakery data.
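The two overlays are simple to reason about once written out. The sketch below uses pure Python stand-ins rather than the notebook’s SciPy and NumPy calls: an ordinary least-squares line, and a Savitzky-Golay filter fixed to a 5-point window and a quadratic polynomial. SciPy offers scipy.stats.linregress and scipy.signal.savgol_filter for these tasks; whether the notebook uses exactly those functions is an assumption. The daily counts are hypothetical.

```python
def linear_fit(xs, ys):
    """Ordinary least-squares line; returns (slope, intercept)."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x


# Savitzky-Golay coefficients for a 5-point window and a quadratic fit.
SG5 = (-3 / 35, 12 / 35, 17 / 35, 12 / 35, -3 / 35)


def savgol5(ys):
    """Smooth interior points with a 5-point quadratic Savitzky-Golay filter.

    The first and last two points are returned unchanged in this sketch.
    """
    smoothed = list(ys)
    for i in range(2, len(ys) - 2):
        window = ys[i - 2 : i + 3]
        smoothed[i] = sum(c * y for c, y in zip(SG5, window))
    return smoothed


# Hypothetical daily item counts, not taken from the bakery dataset.
days = list(range(8))
items = [120, 135, 128, 160, 150, 171, 165, 190]
slope, intercept = linear_fit(days, items)
trend = [slope * d + intercept for d in days]  # line of best fit
smooth = savgol5(items)  # smoothed series
print(f"slope={slope:.2f}, intercept={intercept:.2f}")
```

The trend list corresponds to the line of best fit and the smooth list to the smoothed line. Each would be drawn as a line trace over the bars.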

screen_shot_2019-12-05_at_11_14_27_am

Plotly also provides Chart Studio Online Chart Maker. Plotly describes Chart Studio as the world’s most modern enterprise data visualization solutions. We can enhance, stylize, and share our data visualizations using the free version of Chart Studio Cloud.

screen_shot_2019-12-05_at_11_15_55_am

Jupyter Notebook Viewer

Notebooks can also be viewed using nbviewer, an open-source project under Project Jupyter. Thanks to Rackspace hosting, the nbviewer instance is a free service.

screen_shot_2019-12-05_at_11_28_01_am.png

Using nbviewer, below, we see the output of a cell within the 04_notebook.ipynb notebook. View this notebook, here, using nbviewer.

screen_shot_2019-12-04_at_11_39_28_pm

Monitoring Spark Jobs

The Jupyter Docker container exposes Spark’s monitoring and instrumentation web user interface. We can review each completed Spark Job in detail.
screen-shot-2019-12-04-at-11_56_19-pm

We can review details of each stage of the Spark job, including a visualization of the DAG (Directed Acyclic Graph), which Spark constructs as part of the job execution plan, using the DAG Scheduler.
screen-shot-2019-12-04-at-11_57_18-pm

We can also review the task composition and timing of each event occurring as part of the stages of the Spark job.
screen-shot-2019-12-04-at-11_57_48-pm

We can also use the Spark interface to review and confirm the runtime environment configuration, including versions of Java, Scala, and Spark, as well as packages available on the Java classpath.
screen-shot-2019-12-04-at-11_58_02-pm

Local Spark Performance

Running Spark on a single node within the Jupyter Docker container on your local development system is not a substitute for a true Spark cluster: a Production-grade, multi-node cluster running on bare metal or robust virtualized hardware, and managed with Apache Hadoop YARN, Apache Mesos, or Kubernetes. In my opinion, you should only adjust your Docker resource limits to support an acceptable level of Spark performance for running small exploratory workloads. You will not realistically replace the need to process big data and execute jobs requiring complex calculations on a Production-grade, multi-node Spark cluster.
screen_shot_2019-12-03_at_9_18_36_am

We can use the following docker stats command to examine the container’s CPU and memory metrics.

docker stats --format "table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}"

Below, we see the stats from the Docker stack’s three containers showing little or no activity.
Screen Shot 2019-12-05 at 12.16.14 AM

Compare those stats with the ones shown below, recorded while a notebook was reading and writing data, and executing Spark SQL queries. The CPU and memory output show spikes, but both appear to be within acceptable ranges.
Screen Shot 2019-12-05 at 12.13.49 AM
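To compare idle and busy readings programmatically, the same fields can be parsed in Python. The sketch below assumes the stats were captured as tab-separated rows, that is, with the same format string but without the leading table directive, and with docker stats --no-stream so the command exits after one sample. The container names and numbers in the sample are hypothetical.

```python
def parse_percent(text: str) -> float:
    """Convert a value such as '12.34%' to the float 12.34."""
    return float(text.strip().rstrip("%"))


def parse_stats(output: str) -> list:
    """Parse tab-separated 'name, cpu, mem usage, mem percent' rows."""
    rows = []
    for line in output.strip().splitlines():
        name, cpu, mem_usage, mem_perc = line.split("\t")
        rows.append(
            {
                "name": name,
                "cpu_percent": parse_percent(cpu),
                "mem_usage": mem_usage,
                "mem_percent": parse_percent(mem_perc),
            }
        )
    return rows


# Hypothetical output, as produced without the leading "table" directive.
sample = (
    "jupyter_spark.1.abc\t101.25%\t2.1GiB / 7.8GiB\t26.92%\n"
    "jupyter_postgres.1.def\t0.00%\t18MiB / 7.8GiB\t0.23%\n"
)
busiest = max(parse_stats(sample), key=lambda row: row["cpu_percent"])
print(busiest["name"])
```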

Linux Process Monitors

Another option to examine container performance metrics is top, which is pre-installed in our Jupyter container. For example, execute the following top command from a Jupyter terminal, sorting processes by CPU usage.

top -o %CPU

We should observe the individual performance of each process running in the Jupyter container.

screen_shot_2019-12-05_at_12_25_51_pm.png

A step up from top is htop, an interactive process viewer for Unix. It was installed in the container by the bootstrap script. For example, we can execute the htop command from a Jupyter terminal, sorting processes by CPU % usage.

htop --sort-key PERCENT_CPU

With htop, observe the individual CPU activity. Here, the four CPUs at the top left of the htop window are the CPUs assigned to Docker. We get insight into the way Spark is using multiple CPUs, as well as other performance metrics, such as memory and swap.

screen_shot_2019-12-02_at_12_08_18_pm.png

Assuming your development machine is robust, it is easy to allocate and deallocate additional compute resources to Docker if required. Be careful not to allocate excessive resources to Docker, starving your host machine of available compute resources for other applications.
screen_shot_2019-12-03_at_9_18_50_am

Notebook Extensions

There are many ways to extend the Jupyter Docker Stacks. A popular option is jupyter-contrib-nbextensions. According to their website, the jupyter-contrib-nbextensions package contains a collection of community-contributed unofficial extensions that add functionality to the Jupyter notebook. These extensions are mostly written in JavaScript and will be loaded locally in your browser. Installed notebook extensions can be enabled, either by using built-in Jupyter commands, or more conveniently by using the jupyter_nbextensions_configurator server extension.

The project contains an alternate Docker stack file, stack-nbext.yml. The stack uses an alternative Docker image, garystafford/all-spark-notebook-nbext:latest. The Dockerfile that builds it, shown below, uses the jupyter/all-spark-notebook:latest image as a base image. The alternate image adds in the jupyter-contrib-nbextensions and jupyter_nbextensions_configurator packages. Since Jupyter would need to be restarted after nbextensions is deployed, it cannot be done from within a running jupyter/all-spark-notebook container.

FROM        jupyter/all-spark-notebook:latest

USER        root

RUN         pip install jupyter_contrib_nbextensions \
                && jupyter contrib nbextension install --system \
                && pip install jupyter_nbextensions_configurator \
                && jupyter nbextensions_configurator enable --system

USER        $NB_UID

Using this alternate stack, below in our Jupyter container, we see the sizable list of extensions available. Some of my favorite extensions include ‘spellchecker’, ‘Codefolding’, ‘Code pretty’, ‘ExecutionTime’, ‘Table of Contents’, and ‘Toggle all line numbers’.

screen_shot_2019-12-05_at_7_53_17_pm.png

Below, we see five new extension icons that have been added to the menu bar of 04_notebook.ipynb. You can also observe the extensions have been applied to the notebook, including the table of contents, code-folding, execution time, and line numbering. The spellchecking and pretty code extensions were both also applied.

screen_shot_2019-12-05_at_7_47_12_pm

Conclusion

In this brief post, we have seen how easy it is to get started learning and performing data analytics using Jupyter notebooks, Python, Spark, and PySpark, all thanks to the Jupyter Docker Stacks. We could use this same stack to learn and perform machine learning using Scala and R. Extending the stack’s capabilities is as simple as swapping out this Jupyter image for another, with a different set of tools, as well as adding additional containers to the stack, such as MySQL, MongoDB, RabbitMQ, Apache Kafka, and Apache Cassandra.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.


Getting Started with Apache Zeppelin on Amazon EMR, using AWS Glue, RDS, and S3: Part 2

Introduction

In Part 1 of this two-part post, we created and configured the AWS resources required to demonstrate the use of Apache Zeppelin on Amazon Elastic MapReduce (EMR). Further, we configured Zeppelin integrations with AWS Glue Data Catalog, Amazon Relational Database Service (RDS) for PostgreSQL, and an Amazon Simple Storage Service (S3) Data Lake. We also covered how to obtain the project’s source code from the two GitHub repositories, zeppelin-emr-demo and zeppelin-emr-config. Below is a high-level architectural diagram of the infrastructure we constructed in Part 1 for this demonstration.

EMR-Zeppelin

Part 2

In Part 2 of this post, we will explore Apache Zeppelin’s features and integration capabilities with a variety of AWS services using a series of four Zeppelin notebooks. Below is an overview of each Zeppelin notebook with a link to view it using Zepl’s free Notebook Explorer. Zepl was founded by the same engineers that developed Apache Zeppelin. Zepl’s enterprise collaboration platform, built on Apache Zeppelin, enables both Data Science and AI/ML teams to collaborate around data.

Notebook 1

The first notebook uses a small 21k row Kaggle dataset, Transactions from a Bakery. The notebook demonstrates Zeppelin’s integration capabilities with the Helium plugin system for adding new chart types, the use of Amazon S3 for data storage and retrieval, the use of Apache Parquet, a compressed and efficient columnar data storage format, and Zeppelin’s storage integration with GitHub for notebook version control.

Interpreters

When you open a notebook for the first time, you are given the choice of interpreters to bind and unbind to the notebook. The last interpreter in the list shown below, postgres, is the new PostgreSQL JDBC Zeppelin interpreter we created in Part 1 of this post. We will use this interpreter in Notebook 3.

screen-shot-2019-11-24-at-8_03_50-pm

Application Versions

The first two paragraphs of the notebook are used to confirm the version of Spark, Scala, OpenJDK, and Python we are using. Recall we updated the Spark and Python interpreters to use Python 3.

screen_shot_2019-11-26_at_6_58_33_pm

Helium Visualizations

If you recall from Part 1 of the post, we pre-installed several additional Helium Visualizations, including the Ultimate Pie Chart. Below, we see the use of the Spark SQL (%sql) interpreter to query a Spark DataFrame, return results, and visualize the data using the Ultimate Pie Chart. In addition to the pie chart, we see the other pre-installed Helium visualizations following the five default visualizations in the menu bar. With Zeppelin, all we have to do is write Spark SQL queries against the Spark DataFrame created earlier in the notebook, and Zeppelin will handle the visualization. You have some basic controls over charts using the ‘settings’ option.

screen-shot-2019-11-24-at-8_06_56-pm

Building the Data Lake

Notebook 1 demonstrates how to read and write data to S3. We read and write the Bakery dataset to both CSV-format and Apache Parquet-format, using Spark (PySpark). We also write the results of Spark SQL queries, like the one above, in Parquet, to S3.

screen-shot-2019-11-24-at-8_20_18-pm

With Parquet, data may be split into multiple files, as shown in the S3 bucket directory below. Parquet is much faster to read into a Spark DataFrame than CSV. Spark provides support for both reading and writing Parquet files. We will write all of our data to Parquet in S3, making future re-use of the data much more efficient than downloading data from Internet sources, like GroupLens or Kaggle, or consuming CSV from S3.
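The CSV-to-Parquet pattern described here can be sketched in a few lines of PySpark. This is an illustration, not the notebook’s code. It assumes a SparkSession named spark is already available, as it is in a Zeppelin PySpark paragraph. The bucket and key names are placeholders.

```python
def s3_path(bucket: str, *parts: str) -> str:
    """Join a bucket name and key parts into an S3 URI."""
    return "s3://" + "/".join([bucket.strip("/"), *[p.strip("/") for p in parts]])


def csv_to_parquet(spark, csv_uri: str, parquet_uri: str):
    """Read a CSV file from S3 once, then keep a Parquet copy for re-use."""
    df = spark.read.csv(csv_uri, header=True, inferSchema=True)
    df.write.parquet(parquet_uri, mode="overwrite")
    # Later paragraphs can load the columnar copy instead of the CSV.
    return spark.read.parquet(parquet_uri)


# Bucket and key names below are placeholders, not the project's values.
print(s3_path("your-data-bucket", "bakery", "parquet"))
```

Because Spark writes one file per partition, the Parquet location is a directory of part files, which matches the multiple files visible in the bucket.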

screen-shot-2019-11-24-at-8_22_46-pm

Preview S3 Data

In addition to using the Zeppelin notebook, we can preview data right in the S3 bucket web interface using the Amazon S3 Select feature. This query-in-place feature is helpful for quickly understanding the structure and content of new data files you want to work with in Zeppelin.

screen-shot-2019-11-24-at-8_23_33-pm

screen-shot-2019-11-24-at-8_23_40-pm

screen_shot_2019-11-28_at_7_41_49_pm.png

Saving Changes to GitHub

In Part 1, we configured Zeppelin to read and write the notebooks from your own copy of the GitHub notebook repository. Using the ‘version control’ menu item, changes made to the notebooks can be committed directly to GitHub.

screen-shot-2019-11-24-at-8_16_36-pm

screen-shot-2019-11-24-at-8_38_19-pm

In GitHub, note the committer is the zeppelin user.

screen_shot_2019-11-26_at_7_48_42_pm.png

Notebook 2

The second notebook demonstrates the use of a single-node and multi-node Amazon EMR cluster for the exploration and analysis of public datasets ranging from approximately 100k rows up to 27MM rows, using Zeppelin. We will use the latest GroupLens MovieLens rating datasets to examine the performance characteristics of Zeppelin, using Spark, on single- versus multi-node EMR clusters for analyzing big data using a variety of Amazon EC2 Instance Types.

screen-shot-2019-11-24-at-8_41_31-pm

Multi-Node EMR Cluster

If you recall from Part 1, we waited to create this cluster due to the compute costs of running the cluster’s large EC2 instances. You should understand the cost of these resources before proceeding, and ensure they are destroyed immediately upon completion of the demonstration to minimize your expenses.

Normalized Instance Hours
Understanding the costs of EMR requires understanding the concept of normalized instance hours. The cluster list displayed in the EMR AWS Console contains two columns, ‘Elapsed time’ and ‘Normalized instance hours’. The ‘Elapsed time’ column reflects the actual wall-clock time the cluster was used. The ‘Normalized instance hours’ column indicates the approximate number of compute hours the cluster has used, rounded up to the nearest hour.

screen_shot_2019-11-28_at_6_09_38_pm

Normalized instance hours calculations are based on a normalization factor. The normalization factor ranges from 1 for a small instance, up to 64 for an 8xlarge. Based on the type and quantity of instances in our multi-node cluster, we would use approximately 56 compute hours (aka normalized instance hours) for every one hour of wall-clock time. Note the multi-node cluster used in our demo, highlighted in yellow above. The cluster ran for two hours, which equated to 112 normalized instance hours.

Instance Type    Normalization Factor    Quantity    Normalized Instance Hours
m5.xlarge        8                       1           8
m5.2xlarge       16                      3           48
Total                                                56
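The arithmetic behind these figures can be sketched in a few lines of Python. This is only an illustration. The function name is hypothetical, only the two normalization factors quoted above are included, and rounding the elapsed time up to whole hours is an assumption based on the description above rather than AWS’s exact formula.

```python
import math

# Normalization factors quoted above (other instance sizes omitted).
NORMALIZATION_FACTOR = {"m5.xlarge": 8, "m5.2xlarge": 16}


def normalized_instance_hours(instances, elapsed_hours):
    """Estimate normalized instance hours for a cluster.

    instances: mapping of instance type -> quantity
    elapsed_hours: wall-clock hours; rounded up to whole hours here, which
    is an assumption based on the 'rounded up' description in the text.
    """
    hours = math.ceil(elapsed_hours)
    per_hour = sum(NORMALIZATION_FACTOR[t] * qty for t, qty in instances.items())
    return per_hour * hours


# One m5.xlarge master node plus three m5.2xlarge core nodes, as in the demo.
demo_cluster = {"m5.xlarge": 1, "m5.2xlarge": 3}
print(normalized_instance_hours(demo_cluster, 1))  # 56
print(normalized_instance_hours(demo_cluster, 2))  # 112
```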

Create the Multi-Node Cluster

Create the multi-node EMR cluster using CloudFormation. Change the following nine variable values, then run the aws cloudformation create-stack command, using the AWS CLI.

# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
EC2_KEY_NAME="your-key-name"
LOG_BUCKET="aws-logs-your_aws_account_id-your_region"
GITHUB_ACCOUNT="your-account-name"
GITHUB_REPO="your-new-project-name"
GITHUB_TOKEN="your-token-value"
MASTER_INSTANCE_TYPE="m5.xlarge" # optional
CORE_INSTANCE_TYPE="m5.2xlarge" # optional
CORE_INSTANCE_COUNT=3 # optional

aws cloudformation create-stack \
    --stack-name zeppelin-emr-prod-stack \
    --template-body file://cloudformation/emr_cluster.yml \
    --parameters ParameterKey=ZeppelinDemoBucket,ParameterValue=${ZEPPELIN_DEMO_BUCKET} \
                 ParameterKey=Ec2KeyName,ParameterValue=${EC2_KEY_NAME} \
                 ParameterKey=LogBucket,ParameterValue=${LOG_BUCKET} \
                 ParameterKey=MasterInstanceType,ParameterValue=${MASTER_INSTANCE_TYPE} \
                 ParameterKey=CoreInstanceType,ParameterValue=${CORE_INSTANCE_TYPE} \
                 ParameterKey=CoreInstanceCount,ParameterValue=${CORE_INSTANCE_COUNT} \
                 ParameterKey=GitHubAccount,ParameterValue=${GITHUB_ACCOUNT} \
                 ParameterKey=GitHubRepository,ParameterValue=${GITHUB_REPO} \
                 ParameterKey=GitHubToken,ParameterValue=${GITHUB_TOKEN}

Use the Amazon EMR web interface to confirm the success of the CloudFormation stack. The fully-provisioned cluster should be in the ‘Waiting’ state when ready.

screen_shot_2019-11-26_at_4_58_05_pm

Configuring the EMR Cluster

Refer to Part 1 for the configuration steps necessary to prepare the EMR cluster and Zeppelin before continuing. Repeat all the steps used for the single-node cluster.

Monitoring with Ganglia

In Part 1, we installed Ganglia as part of creating the EMR cluster. Ganglia, according to its website, is a scalable distributed monitoring system for high-performance computing systems such as clusters and grids. Ganglia can be used to evaluate the performance of the single-node and multi-node EMR clusters. With Ganglia, we can easily view cluster and individual instance CPU, memory, and network I/O performance.

screen-shot-2019-11-24-at-8_46_46-pm
Ganglia Example: Cluster CPU

screen-shot-2019-11-24-at-8_48_44-pm
Ganglia Example: Cluster Memory

screen_shot_2019-11-26_at_5_18_51_pm
Ganglia Example: Cluster Network I/O

YARN Resource Manager

The YARN Resource Manager Web UI is also available on our EMR cluster. Using the Resource Manager, we can view the compute resource load on the cluster, as well as the individual EMR Core nodes. Below, we see that the multi-node cluster has 24 vCPUs and 72 GiB of memory available, split evenly across the three Core cluster nodes.

You might recall that each m5.2xlarge EC2 instance, the type used for the three Core nodes, contains 8 vCPUs and 32 GiB of memory. However, by default, although all 8 vCPUs are available for computation per node, only 24 GiB of each node’s 32 GiB of memory is available for computation. EMR ensures a portion of the memory on each node is reserved for other system processes. The maximum available memory is controlled by the YARN memory configuration option, yarn.scheduler.maximum-allocation-mb.

screen_shot_2019-11-26_at_5_15_00_pm

The YARN Resource Manager preview above shows the load on the Core nodes as Notebook 2 executes the Spark SQL queries on the large MovieLens dataset with 27MM ratings. Note that only 4 of the 24 vCPUs (about 16.7%) are in use, but that 70.25 of the 72 GiB (97.6%) of available memory is being used. According to Spark, because of the in-memory nature of most Spark computations, Spark programs can be bottlenecked by any resource in the cluster: CPU, network bandwidth, or memory. Most often, if the data fits in memory, the bottleneck is network bandwidth. In this case, memory appears to be the most constrained resource. Using memory-optimized instances, such as r4 or r5 instance types, might be more effective for the core nodes than the m5 instance types.
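For readers who want to check the utilization figures, here is a small Python sketch using only the numbers quoted above: three Core nodes, each exposing 8 vCPUs and 24 GiB to YARN. The helper name is hypothetical.

```python
# Figures taken from the text above: three m5.2xlarge Core nodes, each
# exposing 8 vCPUs and 24 GiB (of 32 GiB) of memory to YARN.
CORE_NODES = 3
VCPUS_PER_NODE = 8
YARN_GIB_PER_NODE = 24

total_vcpus = CORE_NODES * VCPUS_PER_NODE   # cluster-wide vCPUs
total_gib = CORE_NODES * YARN_GIB_PER_NODE  # cluster-wide YARN memory


def utilization(used, total):
    """Return utilization as a percentage of the total."""
    return 100.0 * used / total


cpu_pct = utilization(4, total_vcpus)
mem_pct = utilization(70.25, total_gib)
print(f"vCPU: {cpu_pct:.1f}%  memory: {mem_pct:.1f}%")
```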

MovieLens Datasets

By changing one variable in the notebook, we can work with the latest, smaller GroupLens MovieLens dataset containing approximately 100k rows (ml-latest-small) or the larger dataset, containing approximately 27MM rows (ml-latest). For this demo, try both datasets on both the single-node and multi-node clusters. Compare the Spark SQL paragraph execution times for each of the four variations: single-node with the small dataset, single-node with the large dataset, multi-node with the small dataset, and multi-node with the large dataset. Observe how fast the SQL queries are executed on the single-node versus multi-node cluster. Try switching to a different Core node instance type, such as r5.2xlarge. Try creating a cluster with additional Core nodes. How is the compute time affected?

screen_shot_2019-11-26_at_5_02_34_pm
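Zeppelin reports each paragraph’s execution time in the notebook UI. If you would rather collect the timings programmatically to compare the four variations side by side, a minimal sketch might look like the following. The timed helper and the labels are hypothetical. Because Spark evaluates lazily, the timed block must include an action, such as collect() or count(), for the measurement to mean anything.

```python
import time
from contextlib import contextmanager

# Collected timings, keyed by a free-form label such as
# "multi-node / ml-latest" (labels are illustrative only).
timings = {}


@contextmanager
def timed(label):
    """Record the wall-clock duration of the enclosed block in `timings`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start


# Usage inside a %spark.pyspark paragraph (assumes `spark` and `query` exist):
#   with timed("single-node / ml-latest-small"):
#       spark.sql(query).collect()
```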

Terminate the multi-node EMR cluster to save yourself the expense before continuing to Notebook 3.

aws cloudformation delete-stack \
    --stack-name=zeppelin-emr-prod-stack

Notebook 3

The third notebook demonstrates Amazon EMR and Zeppelin’s integration capabilities with AWS Glue Data Catalog as an Apache Hive-compatible metastore for Spark SQL. We will create an Amazon S3-based Data Lake using the AWS Glue Data Catalog and a set of AWS Glue Crawlers.

screen_shot_2019-11-27_at_11_44_44_pm

Glue Crawlers

Before continuing with Notebook 3, run the two Glue Crawlers using the AWS CLI.

aws glue start-crawler --name bakery-transactions-crawler
aws glue start-crawler --name movie-ratings-crawler

The two Crawlers will create a total of seven tables in the Glue Data Catalog database.

screen_shot_2019-11-27_at_8_50_09_pm

If we examine the Glue Data Catalog database, we should now observe several tables, one for each dataset found in the S3 bucket. The location of each dataset is shown in the ‘Location’ column of the tables view.

screen-shot-2019-11-24-at-9_14_19-pm

From the Zeppelin notebook, we can even use Spark SQL to query the AWS Glue Data Catalog, itself, for its databases and the tables within them.

screen-shot-2019-11-24-at-9_12_52-pm
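The same catalog queries can be issued from a PySpark paragraph. The sketch below assumes an existing SparkSession on a cluster configured to use the Glue Data Catalog as its metastore. The function name is hypothetical, and the database name is a placeholder to replace with your own.

```python
def list_catalog(spark, database):
    """Return DataFrames listing the catalog's databases and one database's tables.

    `spark` is an existing SparkSession; `database` is the name of a
    database in the Glue Data Catalog (a placeholder in this sketch).
    """
    databases = spark.sql("SHOW DATABASES")
    tables = spark.sql(f"SHOW TABLES IN {database}")
    return databases, tables


# Usage inside a %spark.pyspark paragraph:
#   databases, tables = list_catalog(spark, "your_glue_database")
#   databases.show()
#   tables.show()
```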

According to Amazon, the Glue Data Catalog tables and databases are containers for the metadata definitions that define a schema for underlying source data. Using Zeppelin’s SQL interpreter, we can query the Data Catalog database and return the underlying source data. The SQL query example, below, demonstrates how we can perform a join across two tables in the data catalog database, representing two different data sources, and return results.

screen-shot-2019-11-24-at-9_09_26-pm

Notebook 4

The fourth notebook demonstrates Zeppelin’s ability to integrate with an external data source. In this case, we will interact with data in an Amazon RDS PostgreSQL relational database using three methods, including the Psycopg 2 PostgreSQL adapter for Python, Spark’s native JDBC capability, and Zeppelin’s JDBC Interpreter.

screen-shot-2019-11-27-at-11_26_34-am.png

First, we create a new schema and four related tables for the RDS PostgreSQL movie ratings database, using the Psycopg 2 PostgreSQL adapter for Python and the SQL file we copied to S3 in Part 1.

screen_shot_2019-11-27_at_11_09_42_am

The RDS database’s schema, shown below, approximates the schema of the four CSV files from the GroupLens MovieLens rating dataset we used in Notebook 2.

screen_shot_2019-11-22_at_12_55_25_pm

Since the schema of the PostgreSQL database matches the MovieLens dataset files, we can import the data from the CSV files, downloaded from GroupLens, directly into the RDS database, again using the Psycopg 2 PostgreSQL adapter for Python.

screen_shot_2019-11-27_at_11_09_52_am

According to the Spark documentation, Spark SQL also includes a data source that can read data from other databases using JDBC. Using Spark’s JDBC capability and the PostgreSQL JDBC Driver we installed in Part 1, we can perform Spark SQL queries against the RDS database using PySpark (%spark.pyspark). Below, we see a paragraph example of reading the RDS database’s movies table, using Spark.

screen_shot_2019-11-27_at_11_10_01_am
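For reference, a JDBC read along these lines can be sketched as follows. The helper names, host, database, and credentials are placeholders rather than values from the notebook. The sketch assumes the PostgreSQL JDBC Driver JAR is already on Spark’s classpath, as configured in Part 1.

```python
def postgres_jdbc_options(host, database, table, user, password, port=5432):
    """Build the options Spark's JDBC data source expects for PostgreSQL."""
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }


def read_table(spark, options):
    """Load a database table into a Spark DataFrame over JDBC.

    `spark` is an existing SparkSession, e.g. the one provided by
    Zeppelin's %spark.pyspark interpreter.
    """
    return spark.read.format("jdbc").options(**options).load()


# Usage (all values are placeholders):
#   opts = postgres_jdbc_options("your-rds-endpoint", "your-db-name",
#                                "public.movies", "masteruser", "your-password")
#   movies_df = read_table(spark, opts)
```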

As a third method of querying the RDS database, we can use the custom Zeppelin PostgreSQL JDBC interpreter (%postgres) we created in Part 1. Although the default driver of the JDBC interpreter is set as PostgreSQL, and the associated JAR is included with Zeppelin, we replaced that older JAR with the latest PostgreSQL JDBC Driver JAR.

Using the %postgres interpreter, we query the RDS database’s public schema, and return the four database tables we created earlier in the notebook.

screen_shot_2019-11-27_at_11_10_26_am

Again, below, using the %postgres interpreter in the notebook’s paragraph, we query the RDS database and return data, which we then visualize using Zeppelin’s bar chart. Finally, note the use of Zeppelin Dynamic Forms in this example. Dynamic Forms allow Zeppelin to dynamically create input forms, whose input values are then available to use programmatically. Here, we use two form input values to control the data returned from our query and the resulting visualization.

screen_shot_2019-11-27_at_11_10_54_am

Conclusion

In this two-part post, we learned how effectively Apache Zeppelin integrates with Amazon EMR. We also learned how to extend Zeppelin’s capabilities, using AWS Glue, Amazon RDS, and Amazon S3 as a Data Lake. Beyond what was covered in this post, there are dozens more Zeppelin and EMR features, as well as dozens more AWS services that integrate with Zeppelin and EMR, for you to discover.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.


Getting Started with Apache Zeppelin on Amazon EMR, using AWS Glue, RDS, and S3: Part 1 – Setup

Introduction

There is little question that big data analytics, data science, artificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last 3–5 years. Behind the hype cycles and marketing buzz, these technologies are having a significant influence on all aspects of our modern lives. Due to their popularity, commercial enterprises, academic institutions, and the public sector have all rushed to develop hardware and software solutions to decrease the barrier to entry and increase the velocity of ML and Data Scientists and Engineers.

screen_shot_2019-11-17_at_7_24_10_am1
Data Science: 5-Year Search Trend (courtesy Google Trends)

screen_shot_2019-11-17_at_7_24_10_am2
Machine Learning: 5-Year Search Trend (courtesy Google Trends)

Technologies

All three major cloud providers, Amazon Web Services (AWS), Microsoft Azure, and Google Cloud, have rapidly maturing big data analytics, data science, and AI and ML services. AWS, for example, introduced Amazon Elastic MapReduce (EMR) in 2009, primarily as an Apache Hadoop-based big data processing service. Since then, according to Amazon, EMR has evolved into a service that uses Apache Spark, Apache Hadoop, and several other leading open-source frameworks to quickly and cost-effectively process and analyze vast amounts of data. More recently, in late 2017, Amazon released SageMaker, a service that provides the ability to build, train, and deploy machine learning models quickly and securely.

Simultaneously, organizations are building solutions that integrate and enhance these Cloud-based big data analytics, data science, AI, and ML services. One such example is Apache Zeppelin. Similar to the immensely popular Project Jupyter and the newly open-sourced Netflix’s Polynote, Apache Zeppelin is a web-based, polyglot, computational notebook. Zeppelin enables data-driven, interactive data analytics and document collaboration using a number of interpreters such as Scala (with Apache Spark), Python (with Apache Spark), Spark SQL, JDBC, Markdown, Shell and so on. Zeppelin is one of the core applications supported natively by Amazon EMR.

screen_shot_2019-11-15_at_11_59_16_pm

In the following two-part post, we will explore the use of Apache Zeppelin on EMR for data science and data analytics using a series of Zeppelin notebooks. The notebooks feature the use of AWS Glue, the fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load data for analytics. The notebooks also feature the use of Amazon Relational Database Service (RDS) for PostgreSQL and Amazon Simple Storage Service (S3). Amazon S3 will serve as a Data Lake to store our unstructured data. Given the current choice of Zeppelin’s more than twenty different interpreters, we will use Python3 and Apache Spark, specifically Spark SQL and PySpark, for all notebooks.

zeppelin_header

We will build an economical single-node EMR cluster for data exploration, as well as a larger multi-node EMR cluster for analyzing large data sets. Amazon S3 will be used to store input and output data, while intermediate results are stored in the Hadoop Distributed File System (HDFS) on the EMR cluster. Amazon provides a good overview of EMR architecture. Below is a high-level architectural diagram of the infrastructure we will construct during Part 1 for this demonstration.

EMR-Zeppelin.png

Notebook Features

Below is an overview of each Zeppelin notebook with a link to view it using Zepl’s free Notebook Explorer. Zepl was founded by the same engineers who developed Apache Zeppelin, including Moonsoo Lee, Zepl CTO and creator of Apache Zeppelin. Zepl’s enterprise collaboration platform, built on Apache Zeppelin, enables both Data Science and AI/ML teams to collaborate around data.

Notebook 1

The first notebook uses a small 21k row Kaggle dataset, Transactions from a Bakery. The notebook demonstrates Zeppelin’s integration capabilities with the Helium plugin system for adding new chart types, the use of Amazon S3 for data storage and retrieval, the use of Apache Parquet, a compressed and efficient columnar data storage format, and Zeppelin’s storage integration with GitHub for notebook version control.

screen_shot_2019-11-27_at_11_13_53_am

Notebook 2

The second notebook demonstrates the use of a single-node and multi-node Amazon EMR cluster for the exploration and analysis of public datasets ranging from approximately 100k rows up to 27MM rows, using Zeppelin. We will use the latest GroupLens MovieLens rating datasets to examine the performance characteristics of Zeppelin, using Spark, on single- versus multi-node EMR clusters for analyzing big data using a variety of Amazon EC2 Instance Types.

screen_shot_2019-11-27_at_11_14_01_am

Notebook 3

The third notebook demonstrates Amazon EMR and Zeppelin’s integration capabilities with AWS Glue Data Catalog as an Apache Hive-compatible metastore for Spark SQL. We will create an Amazon S3-based Data Lake using the AWS Glue Data Catalog and a set of AWS Glue Crawlers.

screen_shot_2019-11-27_at_11_44_44_pm.png

Notebook 4

The fourth notebook demonstrates Zeppelin’s ability to integrate with an external data source. In this case, we will interact with data in an Amazon RDS PostgreSQL relational database using three methods, including the Psycopg 2 PostgreSQL adapter for Python, Spark’s native JDBC capability, and Zeppelin’s JDBC Interpreter.

screen-shot-2019-11-27-at-11_26_34-am

Demonstration

In Part 1 of the post, as a DataOps Engineer, we will create and configure the AWS resources required to demonstrate the use of Apache Zeppelin on EMR, using an AWS Glue Data Catalog, Amazon RDS PostgreSQL database, and an S3-based data lake. In Part 2 of this post, as a Data Scientist, we will explore Apache Zeppelin’s features and integration capabilities with a variety of AWS services using the Zeppelin notebooks.

Source Code

The demonstration’s source code is contained in two public GitHub repositories. The first repository, zeppelin-emr-demo, includes the four Zeppelin notebooks, organized according to the conventions of Zeppelin’s pluggable notebook storage mechanisms.

.
├── 2ERVVKTCG
│   └── note.json
├── 2ERYY923A
│   └── note.json
├── 2ESH8DGFS
│   └── note.json
├── 2EUZKQXX7
│   └── note.json
├── LICENSE
└── README.md

Zeppelin GitHub Storage

During the demonstration, changes made to your copy of the Zeppelin notebooks running on EMR will be automatically pushed back to GitHub when a commit occurs. To accomplish this, instead of just cloning a local copy of my zeppelin-emr-demo project repository, you will want your own copy, within your personal GitHub account. You could fork my zeppelin-emr-demo GitHub repository or push a clone of it to your own GitHub repository.

To make a copy of the project in your own GitHub account, first, create a new empty repository on GitHub, for example, ‘my-zeppelin-emr-demo-copy’. Then, execute the following commands from your terminal, to clone the original project repository to your local environment, and finally, push it to your GitHub account.

# change me
GITHUB_ACCOUNT="your-account-name" # i.e. garystafford
GITHUB_REPO="your-new-project-name" # i.e. my-zeppelin-emr-demo-copy

# shallow clone into new directory
git clone --branch master \
    --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/zeppelin-emr-demo.git \
    ${GITHUB_REPO}

# re-initialize repository
cd ${GITHUB_REPO}
rm -rf .git
git init

# re-commit code
git add -A
git commit -m "Initial commit of my copy of zeppelin-emr-demo"

# push to your repo
git remote add origin \
    https://github.com/$GITHUB_ACCOUNT/$GITHUB_REPO.git
git push -u origin master

GitHub Personal Access Token

To automatically push changes to your GitHub repository when a commit occurs, Zeppelin will need a GitHub personal access token. Create a personal access token with the scope shown below. Be sure to keep the token secret. Make sure you do not accidentally check your token value into your source code on GitHub. To minimize the risk, change or delete the token after completing the demo.

screen_shot_2019-11-18_at_8_44_27_pm

The second repository, zeppelin-emr-config, contains the necessary bootstrap files, CloudFormation templates, and PostgreSQL DDL (Data Definition Language) SQL script.

.
├── LICENSE
├── README.md
├── bootstrap
│   ├── bootstrap.sh
│   ├── emr-config.json
│   └── helium.json
├── cloudformation
│   ├── crawler.yml
│   ├── emr_single_node.yml
│   ├── emr_cluster.yml
│   └── rds_postgres.yml
└── sql
    └── ratings.sql

Use the following git command to clone the GitHub repository to your local environment.

git clone --branch master \
    --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/zeppelin-emr-demo-setup.git

Requirements

To follow along with the demonstration, you will need an AWS Account, an existing Amazon S3 bucket to store EMR configuration and data, and an EC2 key pair. You will also need a current version of the AWS CLI installed in your work environment. Due to the particular EMR features we will be using, I recommend using the us-east-1 AWS Region to create the demonstration’s resources.

# create secure emr config and data bucket
# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"

aws s3api create-bucket \
    --bucket ${ZEPPELIN_DEMO_BUCKET}
aws s3api put-public-access-block \
    --bucket ${ZEPPELIN_DEMO_BUCKET} \
    --public-access-block-configuration \
    BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true

Copy Configuration Files to S3

To start, we need to copy three configuration files, bootstrap.sh, helium.json, and ratings.sql, from the zeppelin-emr-demo-setup project directory to our S3 bucket. Change the ZEPPELIN_DEMO_BUCKET variable value, then run the following s3 cp API command, using the AWS CLI. The three files will be copied to a bootstrap directory within your S3 bucket.

# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
 
aws s3 cp bootstrap/bootstrap.sh s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/
aws s3 cp bootstrap/helium.json s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/
aws s3 cp sql/ratings.sql s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/

Below is sample output from copying the local files to S3.

screen-shot-2019-11-21-at-2_03_49-pm

Create AWS Resources

We will start by creating most of the required AWS resources for the demonstration using three CloudFormation templates. We will create a single-node Amazon EMR cluster, an Amazon RDS PostgreSQL database, an AWS Glue Data Catalog database, two AWS Glue Crawlers, and a Glue IAM Role. We will wait to create the multi-node EMR cluster due to the compute costs of running large EC2 instances in the cluster. You should understand the cost of these resources before proceeding, and ensure they are destroyed immediately upon completion of the demonstration to minimize your expenses.

Single-Node EMR Cluster

We will start by creating the single-node Amazon EMR cluster, consisting of just one master node with no core or task nodes (a cluster of one). All operations will take place on the master node.

Default EMR Resources

The following EMR instructions assume you have already created at least one EMR cluster in the past, in your current AWS Region, using the EMR web interface with the ‘Create Cluster – Quick Options’ option. Creating a cluster this way creates several additional AWS resources, such as the EMR_EC2_DefaultRole EC2 instance profile, the default EMR_DefaultRole EMR IAM Role, and the default EMR S3 log bucket.

screen_shot_2019-11-17_at_5_46_56_pm.png

If you haven’t created any EMR clusters using the EMR ‘Create Cluster – Quick Options’ in the past, don’t worry, you can also create the required resources with a few quick AWS CLI commands. Change the following LOG_BUCKET variable value, then run the aws emr and aws s3api API commands, using the AWS CLI. The LOG_BUCKET variable value follows the convention of aws-logs-awsaccount-region. For example, aws-logs-012345678901-us-east-1.

# create emr roles
aws emr create-default-roles

# create secure log bucket
# change me
LOG_BUCKET="aws-logs-your_aws_account_id-your_region"

aws s3api create-bucket --bucket ${LOG_BUCKET}
aws s3api put-public-access-block --bucket ${LOG_BUCKET} \
    --public-access-block-configuration \
    BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true
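If you script this setup, the LOG_BUCKET value can be derived from the account ID and region instead of being typed by hand. The helper below is a hypothetical convenience, not part of the demonstration’s source code. It simply applies the aws-logs-awsaccount-region convention described above.

```python
def default_emr_log_bucket(account_id, region):
    """Return a bucket name following the aws-logs-<account>-<region> convention.

    `account_id` should be passed as a string so that leading zeros
    (as in 012345678901) are preserved.
    """
    account = str(account_id)
    if not (account.isdigit() and len(account) == 12):
        raise ValueError("AWS account IDs are 12-digit numbers")
    return f"aws-logs-{account}-{region}"


print(default_emr_log_bucket("012345678901", "us-east-1"))
# aws-logs-012345678901-us-east-1
```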

The new EMR IAM Roles can be viewed in the IAM Roles web interface.

screen-shot-2019-11-21-at-2_05_35-pm

Often, I see tutorials that reference these default EMR resources from the AWS CLI or CloudFormation, without any understanding or explanation of how they are created.

EMR Bootstrap Script

As part of creating our EMR cluster, the CloudFormation template, emr_single_node.yml, will call the bootstrap script we copied to S3, earlier, bootstrap.sh. The bootstrap script pre-installs required Python and Linux software packages, and the PostgreSQL driver JAR. The bootstrap script also clones your copy of the zeppelin-emr-demo GitHub repository.

#!/bin/bash
set -ex

if [[ $# -ne 2 ]] ; then
    echo "Script requires two arguments"
    exit 1
fi

GITHUB_ACCOUNT=$1
GITHUB_REPO=$2

# install extra python packages
sudo python3 -m pip install psycopg2-binary boto3

# install extra linux packages
yes | sudo yum install git htop

# clone github repo
cd /tmp
git clone "https://github.com/${GITHUB_ACCOUNT}/${GITHUB_REPO}.git"

# install extra jars
POSTGRES_JAR="postgresql-42.2.8.jar"
wget -nv "https://jdbc.postgresql.org/download/${POSTGRES_JAR}"
sudo chown -R hadoop:hadoop ${POSTGRES_JAR}
mkdir -p /home/hadoop/extrajars/
cp ${POSTGRES_JAR} /home/hadoop/extrajars/

EMR Application Configuration

The EMR CloudFormation template will also modify the EMR cluster’s Spark and Zeppelin application configurations. Amongst other configuration properties, the template sets the default Python version to Python3, instructs Zeppelin to use the cloned GitHub notebook directory path, and adds the PostgreSQL Driver JAR to the JVM ClassPath. Below we can see the configuration properties applied to an existing EMR cluster.

screen-shot-2019-11-21-at-2_24_43-pm

EMR Application Versions

As of the date of this post, EMR is at version 5.28.0. Below, as shown in the EMR web interface, are the 21 applications and frameworks currently available for installation on EMR.

emr-28.png

For this demo, we will install Apache Spark v2.4.4, Ganglia v3.7.2, and Zeppelin 0.8.2.

screen_shot_2019-11-17_at_8_32_17_pm
Apache Zeppelin: Web Interface

screen_shot_2019-11-13_at_5_40_12_pm
Apache Spark: DAG Visualization

screen_shot_2019-11-13_at_8_31_13_pm
Ganglia: Cluster CPU Monitoring

Create the EMR CloudFormation Stack

Change the following seven variable values, then run the aws cloudformation create-stack command, using the AWS CLI.

# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
EC2_KEY_NAME="your-key-name"
LOG_BUCKET="aws-logs-your_aws_account_id-your_region"
GITHUB_ACCOUNT="your-account-name" # i.e. garystafford
GITHUB_REPO="your-new-project-name" # i.e. my-zeppelin-emr-demo
GITHUB_TOKEN="your-token-value"
MASTER_INSTANCE_TYPE="m5.xlarge" # optional
 
aws cloudformation create-stack \
    --stack-name zeppelin-emr-dev-stack \
    --template-body file://cloudformation/emr_single_node.yml \
    --parameters ParameterKey=ZeppelinDemoBucket,ParameterValue=${ZEPPELIN_DEMO_BUCKET} \
                 ParameterKey=Ec2KeyName,ParameterValue=${EC2_KEY_NAME} \
                 ParameterKey=LogBucket,ParameterValue=${LOG_BUCKET} \
                 ParameterKey=MasterInstanceType,ParameterValue=${MASTER_INSTANCE_TYPE} \
                 ParameterKey=GitHubAccount,ParameterValue=${GITHUB_ACCOUNT} \
                 ParameterKey=GitHubRepository,ParameterValue=${GITHUB_REPO} \
                 ParameterKey=GitHubToken,ParameterValue=${GITHUB_TOKEN}

You can use the Amazon EMR web interface to confirm the results of the CloudFormation stack. The cluster should be in the ‘Waiting’ state.

screen_shot_2019-11-15_at_7_42_09_pm

PostgreSQL on Amazon RDS

Next, create a simple, single-AZ, single-master, non-replicated Amazon RDS PostgreSQL database, using the included CloudFormation template, rds_postgres.yml. We will use this database in Notebook 4. For the demo, I have selected the current-generation general purpose db.m4.large EC2 instance type to run PostgreSQL. You can easily change the instance type to another RDS-supported instance type to suit your own needs.

Change the following (3) variable values, then run the cloudformation create-stack API command, using the AWS CLI.

# change me
DB_MASTER_USER="your-db-username" # i.e. masteruser
DB_MASTER_PASSWORD="your-db-password" # i.e. 5up3r53cr3tPa55w0rd
MASTER_INSTANCE_TYPE="db.m4.large" # optional
 
aws cloudformation create-stack \
    --stack-name zeppelin-rds-stack \
    --template-body file://cloudformation/rds_postgres.yml \
    --parameters ParameterKey=DBUser,ParameterValue=${DB_MASTER_USER} \
                 ParameterKey=DBPassword,ParameterValue=${DB_MASTER_PASSWORD} \
                 ParameterKey=DBInstanceClass,ParameterValue=${MASTER_INSTANCE_TYPE}

You can use the Amazon RDS web interface to confirm the results of the CloudFormation stack.

screen_shot_2019-11-17_at_8_06_44_pm.png

AWS Glue

Next, create the AWS Glue Data Catalog database, the Apache Hive-compatible metastore for Spark SQL, two AWS Glue Crawlers, and a Glue IAM Role (ZeppelinDemoCrawlerRole), using the included CloudFormation template, crawler.yml. The AWS Glue Data Catalog database will be used in Notebook 3.

Change the following variable value, then run the cloudformation create-stack API command, using the AWS CLI.

# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
 
aws cloudformation create-stack \
    --stack-name zeppelin-crawlers-stack \
    --template-body file://cloudformation/crawler.yml \
    --parameters ParameterKey=ZeppelinDemoBucket,ParameterValue=${ZEPPELIN_DEMO_BUCKET} \
    --capabilities CAPABILITY_NAMED_IAM

You can use the AWS Glue web interface to confirm the results of the CloudFormation stack. Note the Data Catalog database and the two Glue Crawlers. We will not run the two crawlers until Part 2 of the post, so no tables will exist in the Data Catalog database, yet.

screen_shot_2019-11-27_at_8_50_09_pm

screen_shot_2019-11-27_at_8_56_47_pm

At this point in the demonstration, you should have successfully created a single-node Amazon EMR cluster, an Amazon RDS PostgreSQL database, and several AWS Glue resources, all using CloudFormation templates.

screen_shot_2019-11-21_at_4_19_01_pm

Post-EMR Creation Configuration

RDS Security

For the new EMR cluster to communicate with the RDS PostgreSQL database, we need to ensure that port 5432 is open from the RDS database’s VPC security group, which is the default VPC security group, to the security groups of the EMR nodes. Obtain the Group ID of the ElasticMapReduce-master and ElasticMapReduce-slave Security Groups from the EMR web interface.

screen_shot_2019-11-20_at_11_51_10_am

Access the Security Group for the RDS database using the RDS web interface. Change the inbound rule for port 5432 to include both Security Group IDs.
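The same inbound rule could also be added programmatically rather than through the web interface. The following is a minimal sketch, assuming the boto3 library is installed and AWS credentials are configured on the machine running it. The function names and all security group IDs are hypothetical placeholders, not values from this demonstration.

```python
# Sketch: allow the EMR node security groups to reach PostgreSQL on port 5432.
# All security group IDs used here are hypothetical placeholders.

POSTGRES_PORT = 5432


def build_ingress_rule(emr_group_ids):
    """Build one TCP rule that admits traffic from each EMR security group."""
    return {
        'IpProtocol': 'tcp',
        'FromPort': POSTGRES_PORT,
        'ToPort': POSTGRES_PORT,
        'UserIdGroupPairs': [{'GroupId': group_id} for group_id in emr_group_ids],
    }


def open_postgres_port(rds_group_id, emr_group_ids):
    """Apply the rule to the RDS security group (needs boto3 and credentials)."""
    import boto3  # imported lazily so the rule can be built without boto3

    ec2 = boto3.client('ec2')
    ec2.authorize_security_group_ingress(
        GroupId=rds_group_id,
        IpPermissions=[build_ingress_rule(emr_group_ids)],
    )


if __name__ == '__main__':
    # Print the rule only; call open_postgres_port() to actually apply it.
    print(build_ingress_rule(['sg-0aaa1111bbbb2222c', 'sg-0ddd3333eeee4444f']))
```

Building the rule separately from applying it makes it easy to inspect exactly what will be sent before touching the security group.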

screen_shot_2019-11-20_at_11_52_23_am

SSH to EMR Master Node

In addition to the bootstrap script and configurations we already applied to the EMR cluster, we need to make several post-EMR creation configuration changes for our demonstration. These changes require SSH’ing to the EMR cluster. Using the master node’s public DNS address and SSH command provided in the EMR web console, SSH into the master node.

screen_shot_2019-11-15_at_7_42_09_pm_v3

If you cannot access the node using SSH, check that port 22 is open to your IP address or address range on the Security Group associated with the EMR master node (ElasticMapReduce-master).

screen_shot_2019-11-15_at_7_42_09_pm_v2

screen_shot_2019-11-21_at_4_51_01_pm.png

Git Permissions

We need to change permissions on the git repository we installed during the EMR bootstrapping phase. Typically, with an EC2 instance, you perform operations as the ec2-user user. With Amazon EMR, you often perform actions as the hadoop user. With Zeppelin on EMR, the notebooks perform operations, including interacting with the git repository as the zeppelin user. As a result of the bootstrap.sh script, the contents of the git repository directory, /tmp/zeppelin-emr-demo/, are owned by the hadoop user and group by default.

screen_shot_2019-11-17_at_8_01_24_pm

We will change their owner to the zeppelin user and group. We could not perform this step as part of the bootstrap script since the zeppelin user and group did not exist at the time the script was executed.

cd /tmp/zeppelin-emr-demo/
sudo chown -R zeppelin:zeppelin .

The results should look similar to the following output.

screen_shot_2019-11-17_at_8_02_16_pm

Pre-Install Visualization Packages

Next, we will pre-install several Apache Zeppelin Visualization packages. According to the Zeppelin website, an Apache Zeppelin Visualization is a pluggable package that can be loaded/unloaded on runtime through the Helium framework in Zeppelin. We can use them just like any other built-in visualization in the notebook. A Visualization is a JavaScript npm package. For example, here is a link to the ultimate-pie-chart on the public npm registry.

We can pre-load plugins by replacing the /usr/lib/zeppelin/conf/helium.json file with the version of helium.json we copied to S3 earlier, and then restarting Zeppelin. If you have a lot of Visualizations or package types, or use any DataOps automation to create EMR clusters, this approach is much more efficient and repeatable than manually loading plugins using the Zeppelin UI each time you create a new EMR cluster. Below is the helium.json file, which pre-loads eight Visualization packages.

{
    "enabled": {
        "ultimate-pie-chart": "ultimate-pie-chart@0.0.2",
        "ultimate-column-chart": "ultimate-column-chart@0.0.2",
        "ultimate-scatter-chart": "ultimate-scatter-chart@0.0.2",
        "ultimate-range-chart": "ultimate-range-chart@0.0.2",
        "ultimate-area-chart": "ultimate-area-chart@0.0.1",
        "ultimate-line-chart": "ultimate-line-chart@0.0.1",
        "zeppelin-bubblechart": "zeppelin-bubblechart@0.0.4",
        "zeppelin-highcharts-scatterplot": "zeppelin-highcharts-scatterplot@0.0.2"
    },
    "packageConfig": {},
    "bundleDisplayOrder": [
        "ultimate-pie-chart",
        "ultimate-column-chart",
        "ultimate-scatter-chart",
        "ultimate-range-chart",
        "ultimate-area-chart",
        "ultimate-line-chart",
        "zeppelin-bubblechart",
        "zeppelin-highcharts-scatterplot"
    ]
}
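Since a malformed helium.json is easy to miss until Zeppelin restarts, a quick consistency check before uploading the file to S3 can help. The sketch below uses an abbreviated copy of the file above. The check_helium function and the rules it applies are my own assumptions about what a consistent file looks like, based only on the file shown here. They are not validation rules taken from Zeppelin.

```python
import json

# Abbreviated copy of the helium.json shown above (two of the eight packages).
HELIUM_JSON = """
{
    "enabled": {
        "ultimate-pie-chart": "ultimate-pie-chart@0.0.2",
        "zeppelin-bubblechart": "zeppelin-bubblechart@0.0.4"
    },
    "packageConfig": {},
    "bundleDisplayOrder": [
        "ultimate-pie-chart",
        "zeppelin-bubblechart"
    ]
}
"""


def check_helium(config):
    """Return a list of inconsistencies found in a parsed helium.json dict."""
    problems = []
    enabled = config.get('enabled', {})
    display_order = config.get('bundleDisplayOrder', [])
    for name, artifact in enabled.items():
        # In the file above, every value has the form "<name>@<version>".
        if not artifact.startswith(name + '@'):
            problems.append('unexpected artifact for %s: %s' % (name, artifact))
        if name not in display_order:
            problems.append('%s missing from bundleDisplayOrder' % name)
    for name in display_order:
        if name not in enabled:
            problems.append('%s is ordered but not enabled' % name)
    return problems


config = json.loads(HELIUM_JSON)
print(check_helium(config))  # an empty list means no inconsistencies were found
```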

Run the following commands to load the plugins and adjust the permissions on the file.

# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
 
sudo aws s3 cp s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/helium.json \
    /usr/lib/zeppelin/conf/helium.json
sudo chown zeppelin:zeppelin /usr/lib/zeppelin/conf/helium.json

Create New JDBC Interpreter

Lastly, we need to create a new Zeppelin JDBC Interpreter to connect to our RDS database. By default, Zeppelin has several interpreters installed. You can review a list of available interpreters using the following command.

sudo sh /usr/lib/zeppelin/bin/install-interpreter.sh --list

screen_shot_2019-11-18_at_6_29_40_am

The new JDBC interpreter will allow us to connect to our RDS PostgreSQL database, using Java Database Connectivity (JDBC). First, ensure all available interpreters are installed, including the current Zeppelin JDBC driver (org.apache.zeppelin:zeppelin-jdbc:0.8.0) to /usr/lib/zeppelin/interpreter/jdbc.

Creating a new interpreter is a two-part process. In this stage, we install the required interpreter files on the master node using the following command. Then later, in the Zeppelin web interface, we will configure the new PostgreSQL JDBC interpreter. Note we must provide a unique name for the interpreter (I chose ‘postgres’ in this case), which we will refer to in part two of the interpreter creation process.

sudo sh /usr/lib/zeppelin/bin/install-interpreter.sh --all
 
sudo sh /usr/lib/zeppelin/bin/install-interpreter.sh \
    --name "postgres" \
    --artifact org.apache.zeppelin:zeppelin-jdbc:0.8.0

To complete the post-EMR creation configuration on the master node, we must restart Zeppelin for our changes to take effect.

sudo stop zeppelin && sudo start zeppelin

In my experience, it could take 2–3 minutes for the Zeppelin UI to become fully responsive after a restart.

screen_shot_2019-11-18_at_10_01_54_pm

Zeppelin Web Interface Access

With all the EMR application configuration complete, we will access the Zeppelin web interface running on the master node. Use the Zeppelin connection information provided in the EMR web interface to set up SSH tunneling to the Zeppelin web interface, running on the master node. Using this method, we can also access the Spark History Server, Ganglia, and Hadoop Resource Manager web interfaces; all links are provided from EMR.

screen_shot_2019-11-15_at_7_42_09_pm

To set up a web connection to the applications installed on the EMR cluster, I am using FoxyProxy as a proxy management tool with Google Chrome.

screen_shot_2019-11-17_at_8_22_09_pm.png

If everything is working so far, you should see the Zeppelin web interface with all four Zeppelin notebooks available from the cloned GitHub repository. You will be logged in as the anonymous user. Zeppelin offers authentication for accessing notebooks on the EMR cluster. For brevity, we will not cover setting up authentication in Zeppelin, using Shiro Authentication.

screen_shot_2019-11-17_at_8_32_17_pm.png

To confirm that the path to the local, cloned copy of the GitHub notebook repository is correct, check the Notebook Repos interface, accessible under the Settings dropdown (anonymous user) in the upper right of the screen. The value should match the ZEPPELIN_NOTEBOOK_DIR configuration property value in the emr_single_node.yml CloudFormation template we executed earlier.

screen_shot_2019-11-18_at_10_04_23_pm

To confirm the Helium Visualizations were pre-installed correctly, using the helium.json file, open the Helium interface, accessible under the Settings dropdown (anonymous user) in the upper right of the screen.

screen_shot_2019-11-15_at_7_45_28_pm

Note the enabled visualizations. It is also easy to enable additional plugins through the web interface.

screen_shot_2019-11-15_at_7_45_33_pm

New PostgreSQL JDBC Interpreter

If you recall, earlier, we installed the required interpreter files on the master node using the install-interpreter.sh script. We will now complete the process of configuring the new PostgreSQL JDBC interpreter. Open the Interpreter interface, accessible under the Settings dropdown (anonymous user) in the upper right of the screen.

The title of the new interpreter must match the name we used to install the interpreter files, ‘postgres’. The interpreter group will be ‘jdbc’. There are, minimally, three properties we need to configure for your specific RDS database instance: default.url, default.user, and default.password. These should match the values you used to create your RDS instance, earlier. Make sure to include the database name in the default.url. An example is shown below.

default.url: jdbc:postgresql://zeppelin-demo.abcd1234efg56.us-east-1.rds.amazonaws.com:5432/ratings
default.user: masteruser
default.password: 5up3r53cr3tPa55w0rd
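A missing database name in default.url is an easy mistake to make. As a rough illustration, the sketch below splits a PostgreSQL JDBC URL into its host, port, and database name using only the Python standard library, and raises an error if the database name is absent. The function name parse_postgres_jdbc_url is hypothetical, and the URL is the example value shown above.

```python
from urllib.parse import urlsplit


def parse_postgres_jdbc_url(jdbc_url):
    """Split a PostgreSQL JDBC URL into (host, port, database)."""
    prefix = 'jdbc:'
    if not jdbc_url.startswith(prefix + 'postgresql://'):
        raise ValueError('not a PostgreSQL JDBC URL: %s' % jdbc_url)
    # Drop the "jdbc:" prefix so the rest parses like an ordinary URL.
    parts = urlsplit(jdbc_url[len(prefix):])
    database = parts.path.lstrip('/')
    if not database:
        raise ValueError('default.url is missing the database name')
    # PostgreSQL listens on port 5432 unless another port is given.
    return parts.hostname, parts.port or 5432, database


url = ('jdbc:postgresql://zeppelin-demo.abcd1234efg56'
       '.us-east-1.rds.amazonaws.com:5432/ratings')
print(parse_postgres_jdbc_url(url))
```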

We also need to provide a path to the PostgreSQL driver JAR dependency. This path is the location where we placed the JAR using the bootstrap.sh script, earlier, /home/hadoop/extrajars/postgresql-42.2.8.jar. Save the new interpreter and make sure it starts successfully (shows a green icon).

screen_shot_2019-11-17_at_9_03_02_pm.png

screen_shot_2019-11-18_at_6_42_13_am

Switch Interpreters to Python 3

The last thing we need to do is change the Spark and Python interpreters to use Python 3 instead of the default Python 2. On the same screen you used to create a new interpreter, modify the Spark and Python interpreters. First, for the Python interpreter, change the zeppelin.python property to python3.

screen_shot_2019-11-18_at_3_30_39_pm

Next, for the Spark interpreter, change the zeppelin.pyspark.python property to python3.

screen_shot_2019-11-18_at_3_32_34_pm

Part 2

In Part 1 of this post, we created and configured the AWS resources required to demonstrate the use of Apache Zeppelin on EMR, using an AWS Glue Data Catalog, Amazon RDS PostgreSQL database, and an S3 data lake. In Part 2 of this post, we will explore some of Apache Zeppelin’s features and integration capabilities with a variety of AWS services using the notebooks.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.


IoT Telemetry Collection using Google Protocol Buffers, Google Cloud Functions, Cloud Pub/Sub, and MongoDB Atlas

Collect IoT sensor telemetry using Google Protocol Buffers’ serialized binary format over HTTPS, serverless Google Cloud Functions, Google Cloud Pub/Sub, and MongoDB Atlas on GCP, as an alternative to integrated Cloud IoT platforms and standard IoT protocols. Aggregate, analyze, and build machine learning models with the data using tools such as MongoDB Compass, Jupyter Notebooks, and Google’s AI Platform Notebooks.

Introduction

Most of the dominant Cloud providers offer IoT (Internet of Things) and IIoT (Industrial IoT) integrated services. Amazon has AWS IoT, Microsoft Azure has multiple offerings, including IoT Central, IBM’s offerings include the IBM Watson IoT Platform, Alibaba Cloud has multiple IoT/IIoT solutions for different vertical markets, and Google offers the Google Cloud IoT platform. All of these solutions are marketed as industrial-grade, highly-performant, scalable technology stacks. They are capable of scaling to tens-of-thousands of IoT devices or more and massive amounts of streaming telemetry.

In reality, not everyone needs a fully integrated IoT solution. Academic institutions, research labs, tech start-ups, and many commercial enterprises want to leverage the Cloud for IoT applications, but may not be ready for a fully-integrated IoT platform or are resistant to Cloud vendor platform lock-in.

Similarly, depending on the performance requirements and the type of application, organizations may not need or want to start out using IoT/IIoT industry-standard data and transport protocols, such as MQTT (Message Queuing Telemetry Transport), which typically runs over TCP, or CoAP (Constrained Application Protocol), which runs over UDP (User Datagram Protocol). They may prefer to transmit telemetry over HTTP using TCP, or securely, using HTTPS (HTTP over TLS).

Demonstration

In this demonstration, we will collect environmental sensor data from a number of IoT device sensors and stream that telemetry over the Internet to Google Cloud. Each IoT device is installed in a different physical location. The devices contain a variety of common sensors, including humidity and temperature, motion, and light intensity.

iot_3.jpg

Prototype IoT Devices used in this Demonstration

We will transmit the sensor telemetry data as JSON over HTTP to serverless Google Cloud Function HTTPS endpoints. We will then switch to using Google’s Protocol Buffers to transmit binary data over HTTP. We should observe a reduction in the message size contained in the request payload as we move from JSON to Protobuf, which should reduce system latency and cost.

Data received by Cloud Functions over HTTP will be published asynchronously to Google Cloud Pub/Sub. A second Cloud Function will respond to all published events and push the messages to MongoDB Atlas on GCP. Once in Atlas, we will aggregate, transform, analyze, and build machine learning models with the data, using tools such as MongoDB Compass, Jupyter Notebooks, and Google’s AI Platform Notebooks.

For this demonstration, the architecture for JSON over HTTP will look as follows. All sensors will transmit data to a single Cloud Function HTTPS endpoint.

JSON IoT Basic Icons.png

For Protobuf over HTTP, the architecture will look as follows in the demonstration. Each type of sensor will transmit data to a different Cloud Function HTTPS endpoint.

Demo IoT Diagram Icons.png

Although the Cloud Functions will automatically scale horizontally to accommodate additional load created by the volume of telemetry being received, there are also other options to scale the system. For example, we could create individual pipelines of functions and topic/subscriptions for each sensor type. We could also split the telemetry data across multiple MongoDB Atlas Collections, based on sensor type, instead of a single collection. In all cases, we will still benefit from the Cloud Function’s horizontal scaling capabilities.

Complex IoT Diagram Icons.png

Source Code

All source code is available on GitHub. Use the following command to clone the project.

git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/iot-protobuf-demo.git

You will need to adjust the project’s environment variables to fit your own development and Cloud environments. All source code for this post is written in Python. It is intended for Python 3 interpreters but has been tested using Python 2 interpreters. The project’s Jupyter Notebooks can be viewed from within the project on GitHub or using the free, online Jupyter nbviewer.

screen_shot_2019-05-21_at_12_55_25_pm

Technologies

Protocol Buffers

According to Google, Protocol Buffers (aka Protobuf) are a language- and platform-neutral, efficient, extensible, automated mechanism for serializing structured data for use in communications protocols, data storage, and more. Protocol Buffers are 3 to 10 times smaller and 20 to 100 times faster than XML.

Each protocol buffer message is a small logical record of information, containing a series of strongly-typed name-value pairs. Once you have defined your messages, you run the protocol buffer compiler for your application’s language on your .proto file to generate data access classes.

Google Cloud Functions

Cloud-Functions.png

According to Google, Cloud Functions is Google’s event-driven, serverless compute platform. Key features of Cloud Functions include automatic scaling, high-availability, and fault-tolerance; no servers to provision, manage, patch, or update; paying only while your code runs; and easy connection to and extension of other cloud services. Cloud Functions natively support multiple event-types, including HTTP, Cloud Pub/Sub, Cloud Storage, and Firebase. Current language support includes Python, Go, and Node.js.

Google Cloud Pub/Sub

According to Google, Cloud Pub/Sub is an enterprise message-oriented middleware for the Cloud. It is a scalable, durable event ingestion and delivery system. By providing many-to-many, asynchronous messaging that decouples senders and receivers, it allows for secure and highly available communication among independent applications. Cloud Pub/Sub delivers low-latency, durable messaging that integrates with systems hosted on the Google Cloud Platform and externally.

MongoDB Atlas

MongoDB Atlas is a fully-managed MongoDB-as-a-Service, available on AWS, Azure, and GCP. Atlas, a mature SaaS product, offers high-availability, uptime service-level agreements, elastic scalability, cross-region replication, enterprise-grade security, LDAP integration, BI Connector, and much more.

MongoDB Atlas currently offers four pricing plans, Free, Basic, Pro, and Enterprise. Plans range from the smallest, free M0-sized MongoDB cluster, with shared RAM and 512 MB storage, up to the massive M400 MongoDB cluster, with 488 GB of RAM and 3 TB of storage.

Cost Effectiveness of Cloud Functions

At true IIoT scale, Google Cloud Functions may not be the most efficient or cost-effective method of ingesting telemetry data. Based on Google’s pricing model, you get two million free function invocations per month, with each additional million invocations costing USD $0.40. The total cost also includes memory usage, total compute time, and outbound data transfer. If your system is comprised of tens or hundreds of IoT devices, Cloud Functions may prove cost-effective.

However, with thousands of devices or more, each transmitting data multiple times per minute, you could quickly outgrow the cost-effectiveness of Google Cloud Functions. In that case, you might look to the Google Cloud IoT platform. Alternately, you can build your own platform with Google products such as Knative, letting you choose to run your containers either fully managed with the newly-released Cloud Run, or in your Google Kubernetes Engine cluster with Cloud Run on GKE.
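To get a rough feel for how invocation charges grow with the number of devices, the arithmetic can be sketched as follows. The free tier and per-million price are the figures quoted above. The 30-day month and the 15-second transmission interval per device are assumptions made for illustration, and the estimate deliberately ignores the memory, compute time, and outbound data transfer charges that also contribute to the total cost.

```python
FREE_INVOCATIONS = 2_000_000            # free invocations per month (quoted above)
PRICE_PER_MILLION = 0.40                # USD per additional million (quoted above)
SECONDS_PER_MONTH = 30 * 24 * 60 * 60   # assumption: a 30-day month


def monthly_invocations(devices, interval_seconds):
    """One function invocation per device per transmission interval."""
    return devices * (SECONDS_PER_MONTH // interval_seconds)


def invocation_cost(invocations):
    """Invocation charge only; excludes memory, compute time, and networking."""
    billable = max(0, invocations - FREE_INVOCATIONS)
    return billable / 1_000_000 * PRICE_PER_MILLION


for devices in (10, 100, 1000):
    count = monthly_invocations(devices, interval_seconds=15)
    print(devices, count, round(invocation_cost(count), 2))
```

Under these assumptions, ten devices stay within the free tier at 1,728,000 invocations per month, while one thousand devices generate 172,800,000 invocations per month.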

Sensor Scripts

For each sensor type, I have developed separate Python scripts, which run on each IoT device. There are two versions of each script, one for JSON over HTTP and one for Protobuf over HTTP.

JSON over HTTPS

Below we see the script, dht_sensor_http_json.py, used to transmit humidity and temperature data via JSON over HTTP to a Google Cloud Function running on GCP. The JSON request payload contains a timestamp, IoT device ID, device type, and the temperature and humidity sensor readings. The URL for the Google Cloud Function is stored as an environment variable, local to the IoT devices, and set when the script is deployed.

import json
import logging
import os
import socket
import sys
import time

import Adafruit_DHT
import requests

URL = os.environ.get('GCF_URL')
JWT = os.environ.get('JWT')
SENSOR = Adafruit_DHT.DHT22
TYPE = 'DHT22'
PIN = 18
FREQUENCY = 15


def main():
    if not URL or not JWT:
        sys.exit("Are the Environment Variables set?")
    get_sensor_data(socket.gethostname())


def get_sensor_data(device_id):
    while True:
        humidity, temperature = Adafruit_DHT.read_retry(SENSOR, PIN)
        payload = {'device': device_id,
                   'type': TYPE,
                   'timestamp': time.time(),
                   'data': {'temperature': temperature,
                            'humidity': humidity}}
        post_data(payload)
        time.sleep(FREQUENCY)


def post_data(payload):
    payload = json.dumps(payload)

    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': JWT
    }

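    try:
        # payload is already a JSON string, so json= encodes it a second time;
        # the Cloud Function's get_json() then returns a str rather than a dict
        requests.post(URL, json=payload, headers=headers)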
    except requests.exceptions.ConnectionError:
        logging.error('Error posting data to Cloud Function!')
    except requests.exceptions.MissingSchema:
        logging.error('Error posting data to Cloud Function! Are Environment Variables set?')


if __name__ == '__main__':
    sys.exit(main())

Telemetry Frequency

Although the sensors are capable of producing data many times per minute, for this demonstration, sensor telemetry is intentionally limited to only being transmitted every 15 seconds. To reduce system complexity, potential latency, back-pressure, and cost, in my opinion, you should only produce telemetry data at the frequency your requirements dictate.

JSON Web Tokens

For security, in addition to the HTTPS endpoints exposed by the Google Cloud Functions, I have incorporated the use of a JSON Web Token (JWT). JSON Web Tokens are an open, industry standard RFC 7519 method for representing claims securely between two parties. In this case, the JWT is used to verify the identity of the sensor scripts sending telemetry to the Cloud Functions. The JWT contains an id, password, and expiration, all signed with a secret key, which is known to each Cloud Function, in order to verify the IoT device’s identity. Note that the claims in a signed JWT are only Base64URL-encoded, not encrypted, so they are readable by anyone who holds the token. Without the correct JWT being passed in the Authorization header, the request to the Cloud Function will fail with an HTTP status code of 401 Unauthorized. Below is an example of the JWT’s payload data.

{
  "sub": "IoT Protobuf Serverless Demo",
  "id": "iot-demo-key",
  "password": "t7J2gaQHCFcxMD6584XEpXyzWhZwRrNJ",
  "iat": 1557407124,
  "exp": 1564664724
}

For this demonstration, I created a temporary JWT using jwt.io. The HTTP Functions are using PyJWT, a Python library which allows you to encode and decode the JWT. The PyJWT library allows the Function to decode and validate the JWT (Bearer Token) from the incoming request’s Authorization header. The JWT token is stored as an environment variable. Deployment instructions are included in the GitHub project.
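To make the structure of a signed token more concrete, here is a minimal, standard-library-only sketch of how an HS256 JWT is assembled and checked. It assumes the HS256 algorithm, which this post does not confirm, and it uses a hypothetical secret and an abbreviated set of claims. It does not check the exp claim or any other registered claim. In the Cloud Functions themselves, PyJWT handles all of this.

```python
import base64
import hashlib
import hmac
import json


def b64url(data):
    """Base64url-encode bytes without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')


def b64url_decode(text):
    """Decode base64url text, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + '=' * (-len(text) % 4))


def sign_hs256(claims, secret):
    """Build a compact token of the form header.payload.signature."""
    header = {'alg': 'HS256', 'typ': 'JWT'}
    signing_input = '.'.join(
        b64url(json.dumps(part, separators=(',', ':')).encode('utf-8'))
        for part in (header, claims))
    signature = hmac.new(secret.encode('utf-8'),
                         signing_input.encode('ascii'),
                         hashlib.sha256).digest()
    return signing_input + '.' + b64url(signature)


def verify_hs256(token, secret):
    """Return the claims if the signature matches, otherwise None."""
    signing_input, _, signature = token.rpartition('.')
    expected = hmac.new(secret.encode('utf-8'),
                        signing_input.encode('ascii'),
                        hashlib.sha256).digest()
    if not hmac.compare_digest(b64url(expected), signature):
        return None
    return json.loads(b64url_decode(signing_input.split('.')[1]))


# Hypothetical secret and abbreviated claims, for illustration only.
claims = {'sub': 'IoT Protobuf Serverless Demo', 'id': 'iot-demo-key'}
token = sign_hs256(claims, 'hypothetical-secret')

# Anyone holding the token can read the payload without the secret...
print(json.loads(b64url_decode(token.split('.')[1])))
# ...but only a holder of the secret can produce a signature that verifies.
print(verify_hs256(token, 'hypothetical-secret') == claims)
print(verify_hs256(token, 'wrong-secret'))
```

The point of the sketch is that the signature protects the token against tampering, while the claims themselves remain readable, which is worth keeping in mind when deciding what to place in the payload.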

screen_shot_2019-05-09_at_5_13_28_pm

JSON Payload

Below is a typical JSON request payload (pretty-printed), containing DHT sensor data. This particular message is 148 bytes in size. The message format is intentionally reader-friendly. We could certainly shorten the message’s key fields, to reduce the payload size by an additional 15-20 bytes.

{
  "device": "rp829c7e0e",
  "type": "DHT22",
  "timestamp": 1557585090.476025,
  "data": {
    "temperature": 17.100000381469727,
    "humidity": 68.0999984741211
  }
}
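The 148-byte figure can be reproduced with a few lines of Python, assuming the payload is serialized with json.dumps and its default separators, as the sensor script does. The second measurement, using compact separators, is my own addition to show one further way to trim the payload without renaming any keys.

```python
import json

# The same values as the pretty-printed payload above.
payload = {
    'device': 'rp829c7e0e',
    'type': 'DHT22',
    'timestamp': 1557585090.476025,
    'data': {
        'temperature': 17.100000381469727,
        'humidity': 68.0999984741211,
    },
}

# Default separators are ', ' and ': ', each of which includes a space.
default_size = len(json.dumps(payload).encode('utf-8'))
# Compact separators drop those spaces.
compact_size = len(json.dumps(payload, separators=(',', ':')).encode('utf-8'))

print(default_size)   # 148
print(compact_size)   # 138
```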

Protocol Buffers

For the demonstration, I have built a Protocol Buffers file, sensors.proto, to support the data output by three sensor types: digital humidity and temperature (DHT), passive infrared sensor (PIR), and digital light intensity (DLI). I am using the newer proto3 version of the protocol buffers language. I have created a common Protobuf sensor message schema, with the variable sensor telemetry stored in the nested data object, within each message type.

It is important to use the correct Protobuf Scalar Value Type to maintain numeric precision in the language you compile for. For simplicity, I am using a double to represent the timestamp, as well as the numeric humidity and temperature readings. Alternately, you could choose Google’s Protobuf Well-Known Types Timestamp (google.protobuf.Timestamp) to store the timestamp.

syntax = "proto3";

package sensors;

// DHT22
message SensorDHT {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataDHT data = 4;
}

message DataDHT {
    double temperature = 1;
    double humidity = 2;
}

// Onyehn_PIR
message SensorPIR {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataPIR data = 4;
}

message DataPIR {
    bool motion = 1;
}

// Anmbest_MD46N
message SensorDLI {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataDLI data = 4;
}

message DataDLI {
    bool light = 1;
}

Since the sensor data will be captured with scripts written in Python 3, the Protocol Buffers file is compiled for Python, resulting in the file, sensors_pb2.py.

protoc --python_out=. sensors.proto

Protocol Buffers over HTTPS

Below we see the alternate DHT sensor script, dht_sensor_http_pb.py, which transmits a Protocol Buffers-based binary request payload over HTTPS to a Google Cloud Function running on GCP. Note the request’s Content-Type header has been changed from application/json to application/x-protobuf. In this case, instead of JSON, the same data fields are stored in an instance of the Protobuf’s SensorDHT message type (sensors_pb2.SensorDHT()). Note the import sensors_pb2 statement. This statement imports the compiled Protocol Buffers file, which is stored locally to the script on the IoT device.

import logging
import os
import socket
import sys
import time

import Adafruit_DHT
import requests
import sensors_pb2

URL = os.environ.get('GCF_DHT_URL')
JWT = os.environ.get('JWT')
SENSOR = Adafruit_DHT.DHT22
TYPE = 'DHT22'
PIN = 18
FREQUENCY = 15


def main():
    if not URL or not JWT:
        sys.exit("Are the Environment Variables set?")
    get_sensor_data(socket.gethostname())


def get_sensor_data(device_id):
    while True:
        try:
            humidity, temperature = Adafruit_DHT.read_retry(SENSOR, PIN)

            sensor_dht = sensors_pb2.SensorDHT()
            sensor_dht.device = device_id
            sensor_dht.type = TYPE
            sensor_dht.timestamp = time.time()
            sensor_dht.data.temperature = temperature
            sensor_dht.data.humidity = humidity

            payload = sensor_dht.SerializeToString()

            post_data(payload)
            time.sleep(FREQUENCY)
        except TypeError:
            logging.error('Error getting sensor data!')


def post_data(payload):
    headers = {
        'Content-Type': 'application/x-protobuf',
        'Authorization': JWT
    }

    try:
        requests.post(URL, data=payload, headers=headers)
    except requests.exceptions.ConnectionError:
        logging.error('Error posting data to Cloud Function!')
    except requests.exceptions.MissingSchema:
        logging.error('Error posting data to Cloud Function! Are Environment Variables set?')


if __name__ == '__main__':
    sys.exit(main())

Protobuf Binary Payload

To understand the binary Protocol Buffers-based payload, we can write a sample SensorDHT message to a file on disk as a byte array.

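# sensorDHT is a populated sensors_pb2.SensorDHT message
message = sensorDHT.SerializeToString()

# SerializeToString() returns bytes; the with block closes (and flushes) the file
with open("./data_binary.txt", "wb") as binary_file_output:
    binary_file_output.write(message)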

Then, using the hexdump command, we can view a representation of the binary data file.

> hexdump -C data_binary.txt
00000000  0a 08 38 32 39 63 37 65  30 65 12 05 44 48 54 32  |..829c7e0e..DHT2|
00000010  32 1d 05 a0 b9 4e 22 0a  0d ec 51 b2 41 15 cd cc  |2....N"...Q.A...|
00000020  38 42                                             |8B|
00000022

The binary data file size is 48 bytes on disk, as compared to the equivalent JSON file size of 148 bytes on disk (32% the size). As a test, we could then send that binary data file as the payload of a POST to the Cloud Function, as shown below using Postman. Postman will serialize the binary data file’s contents to a binary string before transmitting.

screen_shot_2019-05-14_at_7_00_39_am.png
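As a cross-check on the 48-byte figure, the Protobuf wire format for a SensorDHT message can be reproduced by hand with the standard library. This is only an illustrative sketch of the encoding rules for the field types used in sensors.proto (strings, doubles, and one nested message). The helper names are hypothetical, and in practice the compiled sensors_pb2 module does this work. One simplification to note: proto3 serializers omit fields set to their default value, whereas this sketch always writes every field.

```python
import struct


def varint(value):
    """Encode a non-negative integer as a Protobuf varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)   # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def tag(field_number, wire_type):
    """A field key is the field number and wire type packed into a varint."""
    return varint((field_number << 3) | wire_type)


def string_field(field_number, text):
    data = text.encode('utf-8')
    return tag(field_number, 2) + varint(len(data)) + data   # 2: length-delimited


def double_field(field_number, value):
    return tag(field_number, 1) + struct.pack('<d', value)   # 1: 64-bit, little-endian


def message_field(field_number, body):
    return tag(field_number, 2) + varint(len(body)) + body   # nested message


def encode_sensor_dht(device, sensor_type, timestamp, temperature, humidity):
    """Field numbers follow the SensorDHT and DataDHT definitions above."""
    data = double_field(1, temperature) + double_field(2, humidity)
    return (string_field(1, device) + string_field(2, sensor_type)
            + double_field(3, timestamp) + message_field(4, data))


message = encode_sensor_dht('rp829c7e0e', 'DHT22', 1557585090.476025,
                            17.100000381469727, 68.0999984741211)
print(len(message))   # 48
```

Each double costs nine bytes (a one-byte key plus eight bytes of data) regardless of its value, which is why the field names, so prominent in the JSON payload, contribute nothing to the Protobuf payload size.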

Similarly, we can serialize the same binary Protocol Buffers-based SensorDHT message to a binary string using the SerializeToString method.

message = sensorDHT.SerializeToString()
print(message)

The resulting binary string resembles the following.

b'\n\nrp829c7e0e\x12\x05DHT22\x19c\xee\xbcg\xf5\x8e\xccA"\x12\t\x00\x00\x00\xa0\x99\x191@\x11\x00\x00\x00`f\x06Q@'

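The printed representation above is 111 characters long, but each escape sequence, such as \x12 or \n, stands for a single byte. The serialized message itself, and therefore the request payload sent by Postman and received by the Cloud Function for this particular message, is 48 bytes, matching the binary file size above, as compared to the JSON payload size of 148 bytes (32% the size).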

Validate Protobuf Payload

To validate the data contained in the Protobuf payload is identical to the JSON payload, we can parse the payload from the serialized binary string using the Protobuf ParseFromString method. We then convert it to JSON using the Protobuf MessageToJson method.

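import sensors_pb2
from google.protobuf.json_format import MessageToJson

# sensorDHT is a populated sensors_pb2.SensorDHT message
message = sensorDHT.SerializeToString()
message_parsed = sensors_pb2.SensorDHT()
message_parsed.ParseFromString(message)
print(MessageToJson(message_parsed))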

The resulting JSON object is identical to the JSON payload sent using JSON over HTTPS, earlier in the demonstration.

{
  "device": "rp829c7e0e",
  "type": "DHT22",
  "timestamp": 1557585090.476025,
  "data": {
    "temperature": 17.100000381469727,
    "humidity": 68.0999984741211
  }
}

Google Cloud Functions

There are a series of Google Cloud Functions, specifically four HTTP Functions, which accept the sensor data over HTTP from the IoT devices. Each function exposes an HTTPS endpoint. According to Google, you use HTTP functions when you want to invoke your function via an HTTP(S) request. To allow for HTTP semantics, HTTP function signatures accept HTTP-specific arguments.

Below, I have deployed a single function that accepts JSON sensor telemetry from all sensor types, and three functions for Protobuf, one for each sensor type: DHT, PIR, and DLI.

screen-shot-2019-05-13-at-8_34_49-pm

JSON Message Processing

Below, we see the Cloud Function, main.py, which processes the incoming JSON over HTTPS payload from all sensor types. Once the request’s JWT is validated, the JSON message payload is serialized to a byte string and sent to a common Google Cloud Pub/Sub Topic. Note the JWT secret key, id, and password, and the Google Cloud Pub/Sub Topic are all stored as environment variables, local to the Cloud Functions. In my tests, the JSON-based HTTP Functions took an average of 9–18 ms to execute successfully.

import logging
import os

import jwt
from flask import make_response, jsonify
from flask_api import status
from google.cloud import pubsub_v1

TOPIC = os.environ.get('TOPIC')
SECRET_KEY = os.getenv('SECRET_KEY')
ID = os.getenv('ID')
PASSWORD = os.getenv('PASSWORD')


def incoming_message(request):
    # Reject requests that do not carry a valid JWT
    if not validate_token(request):
        return make_response(jsonify({'success': False}),
                             status.HTTP_401_UNAUTHORIZED,
                             {'Content-Type': 'application/json'})

    # Reject requests without a JSON body
    request_json = request.get_json()
    if not request_json:
        return make_response(jsonify({'success': False}),
                             status.HTTP_400_BAD_REQUEST,
                             {'Content-Type': 'application/json'})

    send_message(request_json)

    return make_response(jsonify({'success': True}),
                         status.HTTP_201_CREATED,
                         {'Content-Type': 'application/json'})


def validate_token(request):
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return False

    # Expect a header of the form "Bearer <token>"
    parts = auth_header.split(" ")
    if len(parts) != 2 or not parts[1]:
        return False
    auth_token = parts[1]

    try:
        # Note: PyJWT 2.x also requires an explicit algorithms=[...] argument
        payload = jwt.decode(auth_token, SECRET_KEY)
        return payload.get('id') == ID and payload.get('password') == PASSWORD
    except jwt.ExpiredSignatureError:
        return False
    except jwt.InvalidTokenError:
        return False


def send_message(message):
    publisher = pubsub_v1.PublisherClient()
    publisher.publish(topic=TOPIC, 
                      data=bytes(str(message), 'utf-8'))
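One detail in send_message is worth a closer look. When the value passed in is a Python dict, str() produces Python's own representation of that dict, with single-quoted keys, rather than JSON. The short sketch below, using a made-up abbreviated message, shows the difference and how json.dumps could be used instead if downstream consumers expect to parse JSON. Whether this matters in the function above depends on what request.get_json() actually returns for the incoming request, so treat this as an illustration rather than a required change.

```python
import json

# Abbreviated, made-up message for illustration.
message = {'device': 'rp829c7e0e', 'type': 'DHT22', 'data': {'humidity': 68.1}}

as_repr = bytes(str(message), 'utf-8')          # Python repr: single-quoted keys
as_json = json.dumps(message).encode('utf-8')   # valid JSON: double-quoted keys

try:
    json.loads(as_repr)
    repr_is_json = True
except json.JSONDecodeError:
    repr_is_json = False

print(repr_is_json)                      # False
print(json.loads(as_json) == message)    # True
```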

The Cloud Functions are deployed to GCP using the gcloud functions deploy CLI command (I use Jenkins to automate the deployments). I have wrapped the deploy commands into bash scripts. The script also copies over a common environment variables YAML file, consumed by the Cloud Function. Each Function has a deployment script, included in the project.

# get latest env vars file
cp -f ./../env_vars_file/env.yaml .

# deploy function
gcloud functions deploy http_json_to_pubsub \
  --runtime python37 \
  --trigger-http \
  --region us-central1 \
  --memory 256 \
  --entry-point incoming_message \
  --env-vars-file env.yaml

Using a .gcloudignore file, the gcloud functions deploy CLI command deploys three files: the cloud function (main.py), the required Python packages file (requirements.txt), and the environment variables file (env.yaml). Google automatically installs dependencies using the requirements.txt file.

Protobuf Message Processing

Below, we see the Cloud Function, main.py, which processes the incoming Protobuf over HTTPS payload from DHT sensor types. Once the sensor data Protobuf message payload is received by the HTTP Function, it is deserialized to JSON and then serialized to a byte string. The byte string is then sent to a Google Cloud Pub/Sub Topic. In my tests, the Protobuf-based HTTP Functions took an average of 7–14 ms to execute successfully.

As before, note the import sensors_pb2 statement. This statement imports the compiled Protocol Buffers file, which is stored locally to the script on the IoT device. It is used to parse a serialized message into its original Protobuf’s SensorDHT message type.

import logging
import os

import jwt
import sensors_pb2
from flask import make_response, jsonify
from flask_api import status
from google.cloud import pubsub_v1
from google.protobuf.json_format import MessageToJson

TOPIC = os.environ.get('TOPIC')
SECRET_KEY = os.getenv('SECRET_KEY')
ID = os.getenv('ID')
PASSWORD = os.getenv('PASSWORD')


def incoming_message(request):
    if not validate_token(request):
        return make_response(jsonify({'success': False}),
                             status.HTTP_401_UNAUTHORIZED,
                             {'ContentType': 'application/json'})

    data = request.get_data()
    if not data:
        return make_response(jsonify({'success': False}),
                             status.HTTP_400_BAD_REQUEST,
                             {'ContentType': 'application/json'})

    sensor_pb = sensors_pb2.SensorDHT()
    sensor_pb.ParseFromString(data)
    sensor_json = MessageToJson(sensor_pb)
    send_message(sensor_json)

    return make_response(jsonify({'success': True}),
                         status.HTTP_201_CREATED,
                         {'ContentType': 'application/json'})


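def validate_token(request):
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return False
    # Expect "Bearer <token>"; reject headers that carry no token part
    parts = auth_header.split(" ")
    if len(parts) < 2 or not parts[1]:
        return False
    auth_token = parts[1]

    try:
        # Note: PyJWT 2.x also requires an explicit algorithms=[...] argument
        payload = jwt.decode(auth_token, SECRET_KEY)
        # Always return a bool, including when the claims do not match
        return payload.get('id') == ID and payload.get('password') == PASSWORD
    except jwt.ExpiredSignatureError:
        return False
    except jwt.InvalidTokenError:
        return False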


def send_message(message):
    publisher = pubsub_v1.PublisherClient()
    publisher.publish(topic=TOPIC, data=bytes(message, 'utf-8'))

Cloud Pub/Sub Functions

In addition to HTTP Functions, the demonstration uses a function triggered by Google Cloud Pub/Sub Triggers. According to Google, Cloud Functions can be triggered by messages published to Cloud Pub/Sub Topics in the same GCP project as the function. The function automatically subscribes to the Topic. Below, we see that the function has automatically subscribed to the iot-data-demo Cloud Pub/Sub Topic.

screen_shot_2019-05-09_at_2_41_17_pm

Sending Telemetry to MongoDB Atlas

The common Cloud Function, triggered by messages published to Cloud Pub/Sub, then sends the messages to MongoDB Atlas. There is a minimal amount of cleanup required to re-format the Cloud Pub/Sub messages to BSON (binary JSON). Interestingly, according to bsonspec.org, BSON can be compared to binary interchange formats, like Protocol Buffers. BSON is more schema-less than Protocol Buffers, which can give it an advantage in flexibility but also a slight disadvantage in space efficiency (BSON has overhead for field names within the serialized data).

The function uses PyMongo to connect to MongoDB Atlas. According to their website, PyMongo is a Python distribution containing tools for working with MongoDB and is the recommended way to work with MongoDB from Python.

import base64
import json
import logging
import os
import pymongo

MONGODB_CONN = os.environ.get('MONGODB_CONN')
MONGODB_DB = os.environ.get('MONGODB_DB')
MONGODB_COL = os.environ.get('MONGODB_COL')


def read_message(event, context):
    message = base64.b64decode(event['data']).decode('utf-8')
    message = message.replace("'", '"')
    message = message.replace('True', 'true')
    message = json.loads(message)

    client = pymongo.MongoClient(MONGODB_CONN)
    db = client[MONGODB_DB]
    col = db[MONGODB_COL]
    col.insert_one(message)
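The two replace calls above are enough for the payloads in this demonstration, but they would corrupt any value that contains an apostrophe or the text True. As a minimal sketch of an alternative, assuming each payload is either valid JSON (as produced by MessageToJson) or the str() form of a Python dict (as produced by the JSON function's str(message)), the message could be parsed without string substitution. The helper name parse_payload is hypothetical and not part of the project.

```python
import ast
import base64
import json


def parse_payload(encoded):
    """Decode a base64 Pub/Sub payload into a dict (hypothetical helper)."""
    text = base64.b64decode(encoded).decode('utf-8')
    try:
        # Payloads produced by MessageToJson are already valid JSON
        return json.loads(text)
    except ValueError:
        # Payloads produced by str(dict) use Python literal syntax;
        # literal_eval parses literals only and never executes code
        return ast.literal_eval(text)
```

Inside read_message, this would replace the decode, replace, and json.loads lines with a single call, message = parse_payload(event['data']).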

The function responds to the published events and sends the messages to the MongoDB Atlas cluster, running in the same Region, us-central1, as the Cloud Functions and Pub/Sub Topic. Below, we see the current options available when provisioning an Atlas cluster.

screen_shot_2019-05-09_at_6_17_18_pm

MongoDB Atlas provides a rich, web-based UI for managing and monitoring MongoDB clusters, databases, collections, security, and performance.

screen_shot_2019-05-10_at_10_08_14_pm

Although Cloud Pub/Sub to Atlas function execution times are longer in duration than the HTTP functions, the latency is greatly reduced by locating the Cloud Pub/Sub Topic, Cloud Functions, and MongoDB Atlas cluster in the same GCP Region. Cross-region execution times were as high as 500-600 ms, while same-region execution times averaged 200-225 ms. Selecting a more performant Atlas cluster would likely result in even lower function execution times.

screen_shot_2019-05-10_at_10_16_49_pm

Aggregating Data with MongoDB Compass

MongoDB Compass is a free, convenient, desktop application for interacting with your MongoDB databases. You can view the collected sensor data, review message (document) schema, manage indexes, and build complex MongoDB aggregations.

screen_shot_2019-05-14_at_5_19_17_pm

screen_shot_2019-05-21_at_1_17_40_pm

screen_shot_2019-05-21_at_1_15_09_pm

When performing analytics or machine learning, I primarily use MongoDB Compass to preview the captured telemetry data and build aggregation pipelines. Aggregation operations process data records and return computed results. This feature saves a ton of time when filtering and preparing data for further analysis, visualization, and machine learning with Jupyter Notebooks.

screen_shot_2019-05-14_at_5_22_58_pm

Aggregation pipelines can be directly exported to Java, Node, C#, and Python 3. The exported aggregation pipeline code can be placed directly into your Python applications and Jupyter Notebooks.

screen_shot_2019-05-14_at_5_23_39_pm

Below, the aggregation pipeline code exported from MongoDB Compass is used to load a resultset directly into a Pandas DataFrame. This particular aggregation returns time-series DHT sensor data from a specific IoT device; the timestamp bounds shown span 48 hours (2019-05-12 to 2019-05-14 UTC).

DEVICE_1 = 'rp59adf374'
pipeline = [
    {
        '$match': {
            'type': 'DHT22', 
            'device': DEVICE_1, 
            'timestamp': {
                '$gt': 1557619200,
                '$lt': 1557792000
            }
        }
    }, {
        '$project': {
            '_id': 0,
            'timestamp': 1, 
            'temperature': '$data.temperature', 
            'humidity': '$data.humidity'
        }
    }, {
        '$sort': {
            'timestamp': 1
        }
    }
]
aggResult = iot_data.aggregate(pipeline)
df1 = pd.DataFrame(list(aggResult))
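The literal $gt and $lt values in the pipeline are Unix epoch seconds, which are hard to read and easy to mistype. A minimal sketch of deriving them from dates instead, assuming the timestamp field stores epoch seconds in UTC as the literal values suggest. The helper name epoch_window is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def epoch_window(start, hours):
    """Return ($gt, $lt) Unix timestamps for a window starting at `start`."""
    end = start + timedelta(hours=hours)
    return int(start.timestamp()), int(end.timestamp())


# Midnight UTC on 2019-05-12, the lower bound used in the pipeline above
start = datetime(2019, 5, 12, tzinfo=timezone.utc)
gt, lt = epoch_window(start, hours=48)

# Drop-in replacement for the 'timestamp' clause of the $match stage
time_filter = {'$gt': gt, '$lt': lt}
```

Changing the hours argument widens or narrows the window without recomputing the epoch values by hand.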

MongoDB Atlas Performance

In this demonstration, from Python3-based Jupyter Notebooks, I was able to consistently query a MongoDB Atlas collection of almost 70k documents for resultsets containing 3 days (72 hours) worth of digital temperature and humidity data, roughly 10.2k documents, in an average of 825 ms. That is round trip from my local development laptop to MongoDB Atlas running on GCP, in a different geographic region.

Query times on GCP are much faster, such as when running a Notebook in JupyterLab on Google’s AI Platform, or a PySpark job with Cloud Dataproc, against Atlas. Running the same Jupyter Notebook directly on Google’s AI Platform, the same MongoDB Atlas query took an average of 450 ms versus 825 ms (1.83x faster). This was across two different GCP Regions; same Region times should be even faster.

screen-shot-2019-05-13-at-9_09_52-pm

GCP Observability

There are several choices for observing the system’s Google Cloud Functions, Google Cloud Pub/Sub, and MongoDB Atlas. As shown above, the GCP Cloud Functions interface lets you see the individual function executions, execution times, memory usage, and active instances, over varying time intervals.

For a more detailed view of Google Cloud Functions and Google Cloud Pub/Sub, I built two custom dashboards using Stackdriver. According to Google, Stackdriver aggregates metrics, logs, and events from infrastructure, giving developers and operators a rich set of observable signals. I built a custom Stackdriver Cloud Functions dashboard (shown below) and a Cloud Pub/Sub Topics and Subscriptions dashboard.

For functions, I chose to display execution times, memory usage, the number of executions, and network egress, all in a single pane of glass, using four graphs. Below, I am using the 95th percentile average for monitoring. The 95th percentile asserts that 95% of the time, the observed values are below this amount and the remaining 5% of the time, the observed values are above that amount.

screen_shot_2019-05-10_at_10_13_37_pm
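To make the percentile definition concrete, here is a minimal sketch using the nearest-rank method. This is one common definition; Stackdriver's own aggregation may compute or interpolate percentiles differently. The sample values are hypothetical and are not measurements from this demonstration.

```python
import math


def percentile(values, pct):
    """Nearest-rank percentile: smallest value with at least pct% of samples at or below it."""
    if not values:
        raise ValueError('values must not be empty')
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


# Hypothetical execution times in milliseconds, including one slow outlier
execution_ms = [7, 8, 8, 9, 9, 10, 10, 11, 12, 14,
                9, 8, 10, 11, 9, 8, 10, 9, 13, 250]
p95 = percentile(execution_ms, 95)
mean = sum(execution_ms) / len(execution_ms)
```

With these sample values, the single 250 ms outlier pulls the mean well above the typical execution time, while the 95th percentile stays close to it, which is why percentiles are often preferred for latency dashboards.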

Data Analysis using Jupyter Notebooks

According to jupyter.org, the Jupyter Notebook is an open-source web application that allows you to create and share documents that contain live code, equations, visualizations, and narrative text. Uses include data cleaning and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more. The use of Jupyter Notebooks has grown significantly as Big Data, AI, and ML have all experienced explosive growth.

PyCharm

JetBrains PyCharm, my favorite Python IDE, has direct integrations with Jupyter Notebooks. In fact, PyCharm’s most recent updates to the Professional Edition greatly enhanced those integrations. PyCharm offers round-trip editing in the IDE and the Jupyter Notebook web browser interface. PyCharm allows you to run and debug individual cells within the notebook. PyCharm automatically starts the Jupyter Server and appropriate kernel for the Notebook you have opened. And, one of my favorite features, PyCharm’s variable viewer tracks the current value of a variable, automatically.

screen_shot_2019-05-10_at_10_38_42_am

Below, we see the example Analytics Notebook, included in the demonstration’s project, displayed in PyCharm 2019.1.2 (Professional Edition). Working effectively with Notebooks in PyCharm really requires a full-size monitor. PyCharm’s crowded Notebook UI is workable on a laptop, but certainly not as effective as on a larger monitor.

screen_shot_2019-05-20_at_2_23_46_pm.png

Jupyter Notebook Server

Below, we see the same Analytics Notebook, shown above in PyCharm, opened in Jupyter Notebook Server’s web-based client interface, running locally on the development workstation. The web browser-based interface also offers a rich set of features for Notebook development.

From within the Notebook, we are able to query the data from MongoDB Atlas, again using PyMongo, and load the resultsets into Pandas DataFrames. As an alternative to hard-coded values and environment variables, with Notebooks, I use the python-dotenv Python package. This package allows me to place my environment variables in a common .env file and reference them from any Notebook. The package has many options for managing environment variables.

screen_shot_2019-05-19_at_9_46_32_am.png

We can then analyze the data using a number of common frameworks, including Pandas, Matplotlib, SciPy, PySpark, and NumPy, to name but a few. Below, we see time series data from four different sensors, on the same IoT device. Viewing the data together, we can study the effect of one environment variable on another, such as the impact of light on temperature or humidity.

screen_shot_2019-05-23_at_5_25_44_pm

Below, we can use histograms to visualize temperature frequencies for intervals, over time, for a given device location.

screen-shot-2019-05-13-at-9_13_35-pm

Machine Learning using Jupyter Notebooks

In addition to data analytics, we can use Jupyter Notebooks with tools such as scikit-learn to build machine learning models based on our sensor telemetry.  Scikit-learn is a set of machine learning tools in Python, built on NumPy, SciPy, and matplotlib. Below, I have used JupyterLab on Google’s AI Platform and scikit-learn to build several models, based on the sensor data.

screen_shot_2019-05-19_at_12_25_45_pm

screen_shot_2019-05-19_at_12_26_45_pm

Using scikit-learn, we can build models to predict such things as which IoT device generated a specific temperature and humidity reading, or the temperature and humidity, given the time of day, device location, and external environment variables, or discover anomalies in the sensor telemetry.

Scikit-learn makes it easy to construct randomized training and test datasets, to build models, using data from multiple IoT devices, as shown below.

screen_shot_2019-05-19_at_12_27_39_pm
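The idea behind a randomized training and test split can be sketched in a few lines of plain Python. This is only an illustration of the concept under assumed data; it is not the Notebook's code, which uses scikit-learn (whose train_test_split function serves this purpose). The helper name split_rows and the sample readings are hypothetical.

```python
import random


def split_rows(rows, test_fraction=0.25, seed=42):
    """Shuffle rows reproducibly and split them into (train, test) lists."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    test_size = int(round(len(shuffled) * test_fraction))
    return shuffled[test_size:], shuffled[:test_size]


# Hypothetical (temperature, humidity, device) readings from three devices
readings = [(20.0 + i * 0.1, 40.0 + i, 'device-%d' % (i % 3)) for i in range(20)]
train, test = split_rows(readings)
```

Fixing the seed makes the split repeatable, which helps when comparing several models against the same held-out readings.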

The project includes a Jupyter Notebook that demonstrates how to build several ML models using sensor data. Examples of supervised learning algorithms used to build the classification models in this demonstration include Support Vector Machine (SVM), k-nearest neighbors (k-NN), and Random Forest Classifier.

screen_shot_2019-05-19_at_12_30_38_pm

Having data from multiple sensors, we are able to enrich the ML models by adding additional categorical (discrete) features to our training data. For example, we could look at the effect of light, motion, and time of day on temperature and humidity.

screen_shot_2019-05-19_at_12_29_25_pm

Conclusion

Hopefully, this post has demonstrated how to efficiently collect telemetry data from IoT devices using Google Protocol Buffers over HTTPS, serverless Google Cloud Functions, Cloud Pub/Sub, and MongoDB Atlas, all on the Google Cloud Platform. Once captured, the telemetry data was easily aggregated and analyzed using common tools, such as MongoDB Compass and Jupyter Notebooks. Further, we used the data and tools to build machine learning models for prediction and anomaly detection.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

Image: everythingpossible © 123RF.com


Using the Google Cloud Dataproc WorkflowTemplates API to Automate Spark and Hadoop Workloads on GCP

In the previous post, Big Data Analytics with Java and Python, using Cloud Dataproc, Google’s Fully-Managed Spark and Hadoop Service, we explored Google Cloud Dataproc using the Google Cloud Console as well as the Google Cloud SDK and Cloud Dataproc API. We created clusters, then uploaded and ran Spark and PySpark jobs, then deleted clusters, each as discrete tasks. Although each task could be done via the Dataproc API and was therefore automatable, they were independent tasks, without awareness of the previous task’s state.

Screen Shot 2018-12-15 at 11.39.26 PM.png

In this brief follow-up post, we will examine the Cloud Dataproc WorkflowTemplates API to more efficiently and effectively automate Spark and Hadoop workloads. According to Google, the Cloud Dataproc WorkflowTemplates API provides a flexible and easy-to-use mechanism for managing and executing Dataproc workflows. A Workflow Template is a reusable workflow configuration. It defines a graph of jobs with information on where to run those jobs. A Workflow is an operation that runs a Directed Acyclic Graph (DAG) of jobs on a cluster. Shown below, we see one of the Workflows that will be demonstrated in this post, displayed in Spark History Server Web UI.

screen-shot-2018-12-16-at-11.07.29-am.png

Here we see a four-stage DAG of one of the three jobs in the workflow, displayed in Spark History Server Web UI.

screen-shot-2018-12-16-at-11.18.45-am

Workflows are ideal for automating large batches of dynamic Spark and Hadoop jobs, and for long-running and unattended job execution, such as overnight.
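To illustrate what running a DAG of jobs means in general, the sketch below orders a set of steps so that every step comes after the steps it depends on. The step names and dependencies are hypothetical and are not the jobs from this post, and the code shows generic topological ordering with Python's standard library, not how Dataproc schedules work internally.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical steps: each key maps to the set of steps it depends on
steps = {
    'load-data': set(),
    'summarize-small': {'load-data'},
    'summarize-large': {'load-data'},
    'publish-report': {'summarize-small', 'summarize-large'},
}

# A valid execution order: dependencies always appear before dependents
order = list(TopologicalSorter(steps).static_order())
```

Steps with no dependency between them, such as the two summarize steps here, have no fixed relative order and could run in parallel.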

Demonstration

Using the Python and Java projects from the previous post, we will first create workflow templates using just the WorkflowTemplates API. We will create the template, set a managed cluster, add jobs to the template, and instantiate the workflow. Next, we will further optimize and simplify our workflow by using a YAML-based workflow template file. The YAML-based template file eliminates the need to make API calls to set the template’s cluster and add the jobs to the template. Finally, to further enhance the workflow and promote re-use of the template, we will incorporate parameterization. Parameters will allow us to pass key/value pairs from the command line to the workflow template, and on to the Python script as input arguments.

It is not necessary to use the Google Cloud Console for this post. All steps will be done using Google Cloud SDK shell commands. This means all steps may be automated using CI/CD DevOps tools, like Jenkins and Spinnaker on GKE.

Source Code

All open-sourced code for this post can be found on GitHub within three repositories: dataproc-java-demo, dataproc-python-demo, and dataproc-workflow-templates. Source code samples are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.

WorkflowTemplates API

Always start by ensuring you have the latest Google Cloud SDK updates and are working within the correct Google Cloud project.

gcloud components update

export PROJECT_ID=your-project-id
gcloud config set project $PROJECT_ID

Set the following variables based on your Google environment. The variables will be reused throughout the post for multiple commands.

export REGION=your-region
export ZONE=your-zone
export BUCKET_NAME=gs://your-bucket

The post assumes you still have the Cloud Storage bucket we created in the previous post. In the bucket, you will need the two Kaggle IBRD CSV files, available on Kaggle, the compiled Java JAR file from the dataproc-java-demo project, and a new Python script, international_loans_dataproc.py, from the dataproc-python-demo project.

screen-shot-2018-12-16-at-12.03.51-pm

Use gsutil with the copy (cp) command to upload the four files to your Storage bucket.

gsutil cp data/ibrd-statement-of-loans-*.csv $BUCKET_NAME
gsutil cp build/libs/dataprocJavaDemo-1.0-SNAPSHOT.jar $BUCKET_NAME
gsutil cp international_loans_dataproc.py $BUCKET_NAME

Following Google’s suggested process, we create a workflow template using the workflow-templates create command.

export TEMPLATE_ID=template-demo-1
  
gcloud dataproc workflow-templates create \
  $TEMPLATE_ID --region $REGION

Adding a Cluster

Next, we need to set a cluster for the workflow to use, in order to run the jobs. Cloud Dataproc will create and use a Managed Cluster for your workflow or use an existing cluster. If the workflow uses a managed cluster, it creates the cluster, runs the jobs, and then deletes the cluster when the jobs are finished. This means, for many use cases, there is no need to maintain long-lived clusters; they become just an ephemeral part of the workflow.

We set a managed cluster for our Workflow using the workflow-templates set-managed-cluster command. We will re-use the same cluster specifications we used in the previous post: the Standard cluster type, with 1 master node and 2 worker nodes.

gcloud dataproc workflow-templates set-managed-cluster \
  $TEMPLATE_ID \
  --region $REGION \
  --zone $ZONE \
  --cluster-name three-node-cluster \
  --master-machine-type n1-standard-4 \
  --master-boot-disk-size 500 \
  --worker-machine-type n1-standard-4 \
  --worker-boot-disk-size 500 \
  --num-workers 2 \
  --image-version 1.3-deb9

Alternatively, if we already had an existing cluster, we would use the workflow-templates set-cluster-selector command, to associate that cluster with the workflow template.

gcloud dataproc workflow-templates set-cluster-selector \
  $TEMPLATE_ID \
  --region $REGION \
  --cluster-labels goog-dataproc-cluster-uuid=$CLUSTER_UUID

To get the existing cluster’s UUID label value, you could use a command similar to the following.

CLUSTER_UUID=$(gcloud dataproc clusters describe $CLUSTER_2 \
  --region $REGION \
  | grep 'goog-dataproc-cluster-uuid:' \
  | sed 's/.* //')

echo $CLUSTER_UUID

1c27efd2-f296-466e-b14e-c4263d0d7e19
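For automation written in Python rather than bash, the same grep and sed extraction can be sketched with a regular expression. The describe output below is a hypothetical, shortened excerpt built around the label shown above, and the helper name cluster_uuid is an assumption for illustration.

```python
import re

# Hypothetical excerpt of `gcloud dataproc clusters describe` YAML output
describe_output = """\
clusterName: example-cluster
labels:
  goog-dataproc-cluster-name: example-cluster
  goog-dataproc-cluster-uuid: 1c27efd2-f296-466e-b14e-c4263d0d7e19
"""


def cluster_uuid(text):
    """Return the value of the goog-dataproc-cluster-uuid label, or None."""
    match = re.search(r'^\s*goog-dataproc-cluster-uuid:\s*(\S+)\s*$',
                      text, re.MULTILINE)
    return match.group(1) if match else None
```

Returning None when the label is missing lets a calling script fail with a clear message instead of passing an empty selector to set-cluster-selector.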

Adding Jobs

Next, we add the jobs we want to run to the template. Each job is considered a step in the template, and each step requires a unique step id. We will add three jobs to the template: two Java-based Spark jobs from the previous post and a new Python-based PySpark job.

First, we add the two Java-based Spark jobs, using the workflow-templates add-job spark command. This command’s flags are nearly identical to the dataproc jobs submit spark command, used in the previous post.

export STEP_ID=ibrd-small-spark
  
gcloud dataproc workflow-templates add-job spark \
  --region $REGION \
  --step-id $STEP_ID \
  --workflow-template $TEMPLATE_ID \
  --class org.example.dataproc.InternationalLoansAppDataprocSmall \
  --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar

export STEP_ID=ibrd-large-spark
  
gcloud dataproc workflow-templates add-job spark \
  --region $REGION \
  --step-id $STEP_ID \
  --workflow-template $TEMPLATE_ID \
  --class org.example.dataproc.InternationalLoansAppDataprocLarge \
  --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar

Next, we add the Python-based PySpark job, international_loans_dataproc.py, as the third job in the template. This Python script requires three input arguments, on lines 15–17, which are the bucket where the data is located and the results are placed, the name of the data file, and the directory in the bucket where the results will be placed (gist).
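The gist is not reproduced here. As a rough sketch only, a script could map three positional arguments to input and output paths as follows; the function name, dictionary keys, and path layout are assumptions for illustration and may differ from the actual script.

```python
def parse_args(argv):
    """Map three positional arguments to input and output paths (hypothetical names)."""
    if len(argv) != 3:
        raise SystemExit(
            'usage: script.py <storage_bucket> <data_file> <results_directory>')
    storage_bucket, data_file, results_directory = argv
    return {
        'input_path': storage_bucket + '/' + data_file,
        'output_path': storage_bucket + '/' + results_directory,
    }


# In a real script, the list would come from sys.argv[1:]
paths = parse_args([
    'gs://your-bucket',
    'ibrd-statement-of-loans-historical-data.csv',
    'ibrd-summary-large-python',
])
```

Validating the argument count up front gives a readable usage message in the job's output instead of an IndexError deep inside the Spark code.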

We pass the arguments to the Python script as part of the PySpark job, using the workflow-templates add-job pyspark command.

export STEP_ID=ibrd-large-pyspark
  
gcloud dataproc workflow-templates add-job pyspark \
  $BUCKET_NAME/international_loans_dataproc.py \
  --step-id $STEP_ID \
  --workflow-template $TEMPLATE_ID \
  --region $REGION \
  -- $BUCKET_NAME \
     ibrd-statement-of-loans-historical-data.csv \
     ibrd-summary-large-python

That’s it, we have created our first Cloud Dataproc Workflow Template using the Dataproc WorkflowTemplates API. To view our template, we can use the following two commands. First, use the workflow-templates list command to display a list of available templates. The list command output displays the version of the workflow template and how many jobs are in the template.

gcloud dataproc workflow-templates list --region $REGION
  
ID               JOBS  UPDATE_TIME               VERSION
template-demo-1  3     2018-12-15T16:32:06.508Z  5

Then, we use the workflow-templates describe command to show the details of a specific template.

gcloud dataproc workflow-templates describe \
  $TEMPLATE_ID --region $REGION

Using the workflow-templates describe command, we should see output similar to the following (gist).

In the template description, notice the template’s id, the managed cluster in the placement section, and the three jobs, all of which we added using the above series of workflow-templates commands. Also, notice the creation and update timestamps and version number, which were automatically generated by Dataproc. Lastly, notice the name, which refers to the GCP project and region where this copy of the template is located. Had we used an existing cluster with our workflow, as opposed to a managed cluster, the placement section would have looked as follows.

placement:
  clusterSelector:
    clusterLabels:
      goog-dataproc-cluster-uuid: your_clusters_uuid_label_value

To instantiate the workflow, we use the workflow-templates instantiate command. This command will create the managed cluster, run all the steps (jobs), then delete the cluster. I have added the time command to see how long the workflow takes to complete.

time gcloud dataproc workflow-templates instantiate \
  $TEMPLATE_ID --region $REGION #--async

We can observe the progress from the Google Cloud Dataproc Console, or from the command line by omitting the --async flag. Below we see the three jobs completed successfully on the managed cluster.

Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/e720bb96-9c87-330e-b1cd-efa4612b3c57].
WorkflowTemplate [template-demo-1] RUNNING
Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/e1fe53de-92f2-4f8c-8b3a-fda5e13829b6].
Created cluster: three-node-cluster-ugdo4ygpl52bo.
Job ID ibrd-small-spark-ugdo4ygpl52bo RUNNING
Job ID ibrd-large-spark-ugdo4ygpl52bo RUNNING
Job ID ibrd-large-pyspark-ugdo4ygpl52bo RUNNING
Job ID ibrd-small-spark-ugdo4ygpl52bo COMPLETED
Job ID ibrd-large-spark-ugdo4ygpl52bo COMPLETED
Job ID ibrd-large-pyspark-ugdo4ygpl52bo COMPLETED
Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/f2a40c33-3cdf-47f5-92d6-345463fbd404].
WorkflowTemplate [template-demo-1] DONE
Deleted cluster: three-node-cluster-ugdo4ygpl52bo.

1.02s user 0.35s system 0% cpu 5:03.55 total

In the output, you see the creation of the cluster, the three jobs running and completing successfully, and finally the cluster deletion. The entire workflow took approximately 5 minutes to complete. Below is the view of the workflow’s results from the Dataproc Clusters Console Jobs tab.

screen_shot_2018-12-15_at_11.42.44_am

Below we see the output from the PySpark job, run as part of the workflow template, shown in the Dataproc Clusters Console Output tab. Notice the three input arguments we passed to the Python script from the workflow template, listed in the output.

screen_shot_2018-12-15_at_11.43.56_am

We see the arguments passed to the job, from the Jobs Configuration tab.

screen_shot_2018-12-15_at_1.11.11_pm.png

Examining the Google Cloud Dataproc Jobs Console, we will observe that the WorkflowTemplates API automatically adds a unique alphanumeric extension to the name of each managed cluster we create, as well as to the name of each job that is run. The extension on the cluster name matches the extension on the jobs run on that cluster.

screen_shot_2018-12-15_at_1.05.41_pm
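The matching extension can also be checked programmatically, for example when correlating jobs with the cluster they ran on. A minimal sketch, using lines copied from the workflow output earlier in this post; the helper name name_suffix is hypothetical, and the parsing assumes the log line layout shown in that output.

```python
# Lines copied from the first workflow's command-line output
log_lines = [
    'Created cluster: three-node-cluster-ugdo4ygpl52bo.',
    'Job ID ibrd-small-spark-ugdo4ygpl52bo RUNNING',
    'Job ID ibrd-large-spark-ugdo4ygpl52bo RUNNING',
    'Job ID ibrd-large-pyspark-ugdo4ygpl52bo RUNNING',
]


def name_suffix(line):
    """Return the extension after the last hyphen of the name in a log line."""
    # The cluster or job name is the third whitespace-separated token
    name = line.split()[2].rstrip('.')
    return name.rsplit('-', 1)[1]


suffixes = {name_suffix(line) for line in log_lines}
```

A set with exactly one element confirms that the cluster and all of its jobs share the same extension.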

YAML-based Workflow Template

The above WorkflowTemplates API-based workflow was certainly more convenient than using the individual Cloud Dataproc API commands. At a minimum, we don’t have to remember to delete our cluster when the jobs are complete, something I often forget to do. To further optimize the workflow, we will introduce a YAML-based Workflow Template. According to Google, you can define a workflow template in a YAML file, then instantiate the template to run the workflow. You can also import and export a workflow template YAML file to create and update a Cloud Dataproc workflow template resource.

We can export our first workflow template to create our YAML-based template file.

gcloud dataproc workflow-templates export template-demo-1 \
  --destination template-demo-2.yaml \
  --region $REGION

Below is our first YAML-based template, template-demo-2.yaml. You will need to replace the values in the template with your own values, based on your environment (gist).

Note the template looks nearly identical to the template we just created using the WorkflowTemplates API. The YAML-based template requires the placement and jobs fields. All the available fields are detailed here.

To run the template we use the workflow-templates instantiate-from-file command. Again, I will use the time command to measure performance.

time gcloud dataproc workflow-templates instantiate-from-file \
  --file template-demo-2.yaml \
  --region $REGION

Running the workflow-templates instantiate-from-file command will run a workflow, nearly identical to the workflow we ran in the previous example, with a similar timing. Below we see the three jobs completed successfully on the managed cluster, in approximately the same time as the previous workflow.

Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/7ba3c28e-ebfa-32e7-9dd6-d938a1cfe23b].
WorkflowTemplate RUNNING
Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/8d05199f-ed36-4787-8a28-ae784c5bc8ae].
Created cluster: three-node-cluster-5k3bdmmvnna2y.
Job ID ibrd-small-spark-5k3bdmmvnna2y RUNNING
Job ID ibrd-large-spark-5k3bdmmvnna2y RUNNING
Job ID ibrd-large-pyspark-5k3bdmmvnna2y RUNNING
Job ID ibrd-small-spark-5k3bdmmvnna2y COMPLETED
Job ID ibrd-large-spark-5k3bdmmvnna2y COMPLETED
Job ID ibrd-large-pyspark-5k3bdmmvnna2y COMPLETED
Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/a436ae82-f171-4b0a-9b36-5e16406c75d5].
WorkflowTemplate DONE
Deleted cluster: three-node-cluster-5k3bdmmvnna2y.

1.16s user 0.44s system 0% cpu 4:48.84 total

Parameterization of Templates

To further optimize the workflow template process for re-use, we have the option of passing parameters to our template. Imagine you now receive new loan snapshot data files every night. Imagine you need to run the same data analysis on the financial transactions of thousands of your customers, nightly. Parameterizing templates makes them more flexible and reusable. By removing hard-coded values, such as Storage bucket paths and data file names, a single template may be re-used for multiple variations of the same job. Parameterization allows you to automate hundreds or thousands of Spark and Hadoop jobs in a workflow or workflows, each with different parameters, programmatically.

To demonstrate the parameterization of a workflow template, we create another YAML-based template with just the Python/PySpark job, template-demo-3.yaml. If you recall from our first example, the Python script, international_loans_dataproc.py, requires three input arguments: the bucket where the data is located and the results are placed, the name of the data file, and the directory in the bucket where the results will be placed.

We will replace four of the values in the template with parameters. We will inject those parameters’ values when we instantiate the workflow. Below is the new parameterized template. The template now has a parameters section on lines 26–46. These lines define the parameters that replace the four values on lines 3–7 (gist).

Note the PySpark job’s three arguments and the location of the Python script have been parameterized. Parameters may include validation. As an example of validation, the template uses regex to validate the format of the Storage bucket path. The regex follows Google’s RE2 regular expression library syntax. If you need help with regex, the Regex Tester – Golang website is a convenient way to test your parameters’ regex validations.
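A candidate pattern can also be tried locally before it goes into a template. The sketch below uses Python's re module with a hypothetical pattern that is not the one from this post's template. Python's regex syntax is not identical to RE2, so this is only a first check, although this particular pattern uses constructs common to both.

```python
import re

# Hypothetical pattern, not the one from the post's template:
# gs:// followed by lowercase letters, digits, dots, underscores, or hyphens,
# starting and ending with a letter or digit
BUCKET_PATTERN = re.compile(r'gs://[a-z0-9][a-z0-9._-]*[a-z0-9]')


def is_valid_bucket(path):
    """Return True if the whole path matches the hypothetical bucket pattern."""
    return BUCKET_PATTERN.fullmatch(path) is not None
```

Using fullmatch rather than search ensures that a value with extra leading or trailing characters is rejected rather than partially matched.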

First, we import the new parameterized YAML-based workflow template, using the workflow-templates import command. Then, we instantiate the template using the workflow-templates instantiate command. The workflow-templates instantiate command will run the single PySpark job, analyzing the smaller IBRD data file, and placing the resulting Parquet-format file in a directory within the Storage bucket. We pass the Python script location, bucket link, smaller IBRD data file name, and output directory, as parameters to the template, and therefore indirectly, three of these, as input arguments to the Python script.

export TEMPLATE_ID=template-demo-3

gcloud dataproc workflow-templates import $TEMPLATE_ID \
   --region $REGION --source template-demo-3.yaml
  
gcloud dataproc workflow-templates instantiate \
  $TEMPLATE_ID --region $REGION --async \
  --parameters MAIN_PYTHON_FILE="$BUCKET_NAME/international_loans_dataproc.py",STORAGE_BUCKET=$BUCKET_NAME,IBRD_DATA_FILE="ibrd-statement-of-loans-latest-available-snapshot.csv",RESULTS_DIRECTORY="ibrd-summary-small-python"

Next, we will analyze the larger historic data file, using the same parameterized YAML-based workflow template, but changing two of the four parameters we are passing to the template with the workflow-templates instantiate command. This will run a single PySpark job on the larger IBRD data file and place the resulting Parquet-format file in a different directory within the Storage bucket.

time gcloud dataproc workflow-templates instantiate \
  $TEMPLATE_ID --region $REGION \
  --parameters MAIN_PYTHON_FILE="$BUCKET_NAME/international_loans_dataproc.py",STORAGE_BUCKET=$BUCKET_NAME,IBRD_DATA_FILE="ibrd-statement-of-loans-historical-data.csv",RESULTS_DIRECTORY="ibrd-summary-large-python"

This is the power of parameterization—one workflow template and one job script, but two different datasets and two different results.
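To make the idea concrete, the following Python sketch assembles the two instantiate commands from one shared set of parameters, changing only the data file and results directory. The build_instantiate_command helper is hypothetical, and the bucket link is a placeholder; the flags and parameter names are the ones used in the commands above.

```python
from typing import Dict, List


def build_instantiate_command(template_id: str, region: str,
                              parameters: Dict[str, str]) -> List[str]:
    """Build the argument list for `gcloud dataproc workflow-templates instantiate`."""
    # gcloud expects one comma-separated list of KEY=value pairs
    joined = ",".join(f"{key}={value}" for key, value in parameters.items())
    return [
        "gcloud", "dataproc", "workflow-templates", "instantiate", template_id,
        "--region", region,
        "--parameters", joined,
    ]


bucket = "gs://your_bucket_name"  # placeholder bucket link
common = {
    "MAIN_PYTHON_FILE": f"{bucket}/international_loans_dataproc.py",
    "STORAGE_BUCKET": bucket,
}
# Only these two parameters differ between the two runs
small = {**common,
         "IBRD_DATA_FILE": "ibrd-statement-of-loans-latest-available-snapshot.csv",
         "RESULTS_DIRECTORY": "ibrd-summary-small-python"}
large = {**common,
         "IBRD_DATA_FILE": "ibrd-statement-of-loans-historical-data.csv",
         "RESULTS_DIRECTORY": "ibrd-summary-large-python"}

for params in (small, large):
    print(" ".join(build_instantiate_command("template-demo-3", "us-east1", params)))
```

The resulting list could be handed to subprocess.run. Note that this simple join assumes no parameter value contains a comma.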

Below, we see the single PySpark job running on the managed cluster.

Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/b3c5063f-e3cf-3833-b613-83db12b82f32].
WorkflowTemplate [template-demo-3] RUNNING
Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/896b7922-da8e-49a9-bd80-b1ac3fda5105].
Created cluster: three-node-cluster-j6q2al2mkkqck.
Job ID ibrd-pyspark-j6q2al2mkkqck RUNNING
Job ID ibrd-pyspark-j6q2al2mkkqck COMPLETED
Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/fe4a263e-7c6d-466e-a6e2-52292cbbdc9b].
WorkflowTemplate [template-demo-3] DONE
Deleted cluster: three-node-cluster-j6q2al2mkkqck.

0.98s user 0.40s system 0% cpu 4:19.42 total

Running the workflow-templates list command again should display a list of two workflow templates.

gcloud dataproc workflow-templates list --region $REGION
  
ID               JOBS  UPDATE_TIME               VERSION
template-demo-3  1     2018-12-15T17:04:39.064Z  2
template-demo-1  3     2018-12-15T16:32:06.508Z  5

Looking within the Google Cloud Storage bucket, we should now see four different folders, the results of the workflows.

screen-shot-2018-12-16-at-11.58.32-am.png

Job Results and Testing

To check on the status of a job, we use the dataproc jobs wait command. This returns the standard output (stdout) and standard error (stderr) for that specific job.

export SET_ID=ibrd-large-dataset-pyspark-cxzzhr2ro3i54
  
gcloud dataproc jobs wait $SET_ID \
  --project $PROJECT_ID \
  --region $REGION

The dataproc jobs wait command is frequently used for automated testing of jobs, often within a CI/CD pipeline. Assume we expect part of the job output to indicate success, such as a string, boolean, or numeric value. We could use any number of test frameworks or other methods to confirm the existence of that expected value or values. Below is a simple example of using the grep command to check for the existence of the expected line ‘ state: FINISHED’ in the standard output of the dataproc jobs wait command.

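# capture the job details printed by the wait command
command=$(gcloud dataproc jobs wait $SET_ID \
--project $PROJECT_ID \
--region $REGION) &>/dev/null

# -F fixed string, -q quiet, -x match the whole line;
# quote the variable so line breaks and leading spaces are preserved
if grep -Fqx "  state: FINISHED" <<< "$command" &>/dev/null; then
  echo "Job Success!"
else
  echo "Job Failure?"
fi

# single line alternative
if grep -Fqx "  state: FINISHED" <<< "$command" &>/dev/null;then echo "Job Success!";else echo "Job Failure?";fi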

Job Success!
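If the check lives in a Python-based test suite rather than a shell script, the same exact-line match can be sketched as follows. The has_exact_line function is a hypothetical helper, and the sample text is a synthetic stand-in for the captured command output; only the ‘state: FINISHED’ line comes from the example above.

```python
def has_exact_line(output: str, expected: str) -> bool:
    """Mimic `grep -Fqx`: True if any line equals `expected` exactly."""
    return any(line == expected for line in output.splitlines())


# Synthetic stand-in for the text captured from `gcloud dataproc jobs wait`
sample_output = "some other line\n  state: FINISHED\nanother line\n"

if has_exact_line(sample_output, "  state: FINISHED"):
    print("Job Success!")
else:
    print("Job Failure?")
```

Like grep -x, the comparison is against the whole line, so leading spaces matter.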

Individual Operations

To view individual workflow operations, use the operations list and operations describe commands. The operations list command will list all operations.

Notice the three distinct series of operations within each workflow, shown with the operations list command: WORKFLOW, CREATE, and DELETE. In the example below, I’ve separated the operations by workflow, for better clarity.

gcloud dataproc operations list --region $REGION

NAME                                  TIMESTAMP                 TYPE      STATE  ERROR  WARNINGS
fe4a263e-7c6d-466e-a6e2-52292cbbdc9b  2018-12-15T17:11:45.178Z  DELETE    DONE
896b7922-da8e-49a9-bd80-b1ac3fda5105  2018-12-15T17:08:38.322Z  CREATE    DONE
b3c5063f-e3cf-3833-b613-83db12b82f32  2018-12-15T17:08:37.497Z  WORKFLOW  DONE
---
be0e5293-275f-46ad-b1f4-696ba44c222e  2018-12-15T17:07:26.305Z  DELETE    DONE
6784078c-cbe3-4c1e-a56e-217149f555a4  2018-12-15T17:04:40.613Z  CREATE    DONE
fcd8039e-a260-3ab3-ad31-01abc1a524b4  2018-12-15T17:04:40.007Z  WORKFLOW  DONE
---
b4b23ca6-9442-4ffb-8aaf-460bac144dd8  2018-12-15T17:02:16.744Z  DELETE    DONE
89ef9c7c-f3c9-4d01-9091-61ed9e1f085d  2018-12-15T17:01:45.514Z  CREATE    DONE
243fa7c1-502d-3d7a-aaee-b372fe317570  2018-12-15T17:01:44.895Z  WORKFLOW  DONE
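The manual grouping shown above can also be sketched in Python. This is a minimal example, assuming workflows do not overlap in time, so each WORKFLOW operation starts a new group and the CREATE and DELETE operations that follow belong to it. The group_by_workflow function is hypothetical; the rows are copied from the listing above.

```python
from typing import List, Tuple

# (NAME, TIMESTAMP, TYPE) rows copied from the operations list output
OPERATIONS = [
    ("fe4a263e-7c6d-466e-a6e2-52292cbbdc9b", "2018-12-15T17:11:45.178Z", "DELETE"),
    ("896b7922-da8e-49a9-bd80-b1ac3fda5105", "2018-12-15T17:08:38.322Z", "CREATE"),
    ("b3c5063f-e3cf-3833-b613-83db12b82f32", "2018-12-15T17:08:37.497Z", "WORKFLOW"),
    ("be0e5293-275f-46ad-b1f4-696ba44c222e", "2018-12-15T17:07:26.305Z", "DELETE"),
    ("6784078c-cbe3-4c1e-a56e-217149f555a4", "2018-12-15T17:04:40.613Z", "CREATE"),
    ("fcd8039e-a260-3ab3-ad31-01abc1a524b4", "2018-12-15T17:04:40.007Z", "WORKFLOW"),
    ("b4b23ca6-9442-4ffb-8aaf-460bac144dd8", "2018-12-15T17:02:16.744Z", "DELETE"),
    ("89ef9c7c-f3c9-4d01-9091-61ed9e1f085d", "2018-12-15T17:01:45.514Z", "CREATE"),
    ("243fa7c1-502d-3d7a-aaee-b372fe317570", "2018-12-15T17:01:44.895Z", "WORKFLOW"),
]


def group_by_workflow(operations) -> List[List[Tuple[str, str]]]:
    """Group operations chronologically; each WORKFLOW entry opens a new group."""
    groups: List[List[Tuple[str, str]]] = []
    # Timestamps share one ISO 8601 format, so string order is time order
    for name, _timestamp, op_type in sorted(operations, key=lambda op: op[1]):
        if op_type == "WORKFLOW":
            groups.append([])
        if groups:  # ignore operations that precede the first workflow
            groups[-1].append((op_type, name))
    return groups


for group in group_by_workflow(OPERATIONS):
    print([op_type for op_type, _name in group])
```

If workflows ran concurrently, this time-based grouping would no longer hold, and the operations describe output would be needed to tie each operation to its workflow.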

We use the results of the operations list command to execute the operations describe command to describe a specific operation.

gcloud dataproc operations describe \
  projects/$PROJECT_ID/regions/$REGION/operations/896b7922-da8e-49a9-bd80-b1ac3fda5105

Each type of operation contains different details. Note the fine-grained detail we get from Dataproc using the operations describe command for a CREATE operation (gist).

Conclusion

In this brief, follow-up post to the previous post, Big Data Analytics with Java and Python, using Cloud Dataproc, Google’s Fully-Managed Spark and Hadoop Service, we have seen how easy the WorkflowTemplates API and YAML-based workflow templates make automating our analytics jobs. This post only scratched the surface of the complete functionality of the WorkflowTemplates API and parameterization of templates.

In a future post, we will leverage the automation capabilities of the Google Cloud Platform, the WorkflowTemplates API, YAML-based workflow templates, and parameterization, to develop a fully-automated DevOps for Big Data workflow, capable of running hundreds of Spark and Hadoop jobs.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.


Big Data Analytics with Java and Python, using Cloud Dataproc, Google’s Fully-Managed Spark and Hadoop Service

There is little question, big data analytics, data science, artificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last few years. Behind the hype curves and marketing buzz, these technologies are having a significant influence on all aspects of our modern lives.

However, installing, configuring, and managing the technologies that support big data analytics, data science, ML, and AI, at scale and in Production, often demands an advanced level of familiarity with Linux, distributed systems, cloud- and container-based platforms, databases, and data-streaming applications. The mere ability to manage terabytes and petabytes of transient data is beyond the capability of many enterprises, let alone performing analysis of that data.

To ease the burden of implementing these technologies, the three major cloud providers, AWS, Azure, and Google Cloud, all have multiple Big Data Analytics-, AI-, and ML-as-a-Service offerings. In this post, we will explore one such cloud-based service offering in the field of big data analytics, Google Cloud Dataproc. We will focus on Cloud Dataproc’s ability to quickly and efficiently run Spark jobs written in Java and Python, two widely adopted enterprise programming languages.

Featured Technologies

The following technologies are featured prominently in this post.

Google Cloud Dataproc

According to Google, Cloud Dataproc is a fast, easy-to-use, fully-managed cloud service for running the Apache Spark and Apache Hadoop ecosystem on Google Cloud Platform. Dataproc is a complete platform for data processing, analytics, and machine learning. Dataproc offers per-second billing, so you only pay for exactly the resources you consume. Dataproc offers frequently updated and native versions of Apache Spark, Hadoop, Pig, and Hive, as well as other related applications. Dataproc has built-in integrations with other Google Cloud Platform (GCP) services, such as Cloud Storage, BigQuery, Bigtable, Stackdriver Logging, and Stackdriver Monitoring. Dataproc’s clusters are configurable and resizable from three to hundreds of nodes, and each cluster action takes less than 90 seconds on average.

Platform as a Service (PaaS) offerings similar to Dataproc include Amazon Elastic MapReduce (EMR), Microsoft Azure HDInsight, and Qubole Data Service. Qubole is offered on AWS, Azure, and Oracle Cloud Infrastructure (Oracle OCI).

According to Google, Cloud Dataproc and Cloud Dataflow, both part of GCP’s Data Analytics/Big Data Product offerings, can both be used for data processing, and there’s overlap in their batch and streaming capabilities. Cloud Dataflow is a fully-managed service for transforming and enriching data in stream and batch modes. Dataflow uses the Apache Beam SDK to provide developers with Java and Python APIs, similar to Spark.

Apache Spark

According to Apache, Spark is a unified analytics engine for large-scale data processing, used by well-known, modern enterprises, such as Netflix, Yahoo, and eBay. With in-memory speeds up to 100x faster than Hadoop, Apache Spark achieves high performance for static, batch, and streaming data, using a state-of-the-art DAG (Directed Acyclic Graph) scheduler, a query optimizer, and a physical execution engine.

According to a post by DataFlair, ‘the DAG in Apache Spark is a set of Vertices and Edges, where vertices represent the RDDs and the edges represent the Operation to be applied on RDD (Resilient Distributed Dataset). In Spark DAG, every edge directs from earlier to later in the sequence. On the calling of Action, the created DAG submits to DAG Scheduler which further splits the graph into the stages of the task.’ Below, we see a three-stage DAG visualization, displayed using the Spark History Server Web UI, from a job demonstrated in this post.

Screen Shot 2018-12-15 at 11.20.57 PM

Spark’s polyglot programming model allows users to write applications in Scala, Java, Python, R, and SQL. Spark includes libraries for Spark SQL (DataFrames and Datasets), MLlib (Machine Learning), GraphX (Graph Processing), and DStreams (Spark Streaming). Spark may be run using its standalone cluster mode or on Apache Hadoop YARN, Mesos, or Kubernetes.

PySpark

The Spark Python API, PySpark, exposes the Spark programming model to Python. PySpark is built on top of Spark’s Java API. Data is processed in Python and cached and shuffled in the JVM. According to Apache, Py4J enables Python programs running in a Python interpreter to dynamically access Java objects in a JVM.

Apache Hadoop

According to Apache, the Apache Hadoop project develops open-source software for reliable, scalable, distributed computing. The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. This is a rather modest description of such a significant and transformative project. When we talk about Hadoop, often it is in the context of the project’s well-known modules, which include:

  • Hadoop Common: The common utilities that support the other Hadoop modules
  • Hadoop Distributed File System (HDFS): A distributed file system that provides high-throughput access to application data
  • Hadoop YARN (Yet Another Resource Negotiator): A framework for job scheduling and cluster resource management, also known as ‘Hadoop NextGen’
  • Hadoop MapReduce: A YARN-based system for parallel processing of large datasets
  • Hadoop Ozone: An object store for Hadoop

Based on the Powered by Apache Hadoop list, there are many well-known enterprises and academic institutions using Apache Hadoop, including Adobe, eBay, Facebook, Hulu, LinkedIn, and The New York Times.

Spark vs. Hadoop

There are many articles and posts that delve into the Spark versus Hadoop debate; this post is not one of them. Although both are mature technologies, Spark, the new kid on the block, reached version 1.0.0 in May 2014, whereas Hadoop reached version 1.0.0 earlier, in December 2011. According to Google Trends, interest in both technologies has remained relatively high over the last three years. However, interest in Spark, based on the volume of searches, has been steadily outpacing Hadoop for well over a year now. The in-memory speed of Spark over HDFS-based Hadoop and ease of Spark SQL for working with structured data are likely big differentiators for many users coming from a traditional relational database background and users with large or streaming datasets, requiring near real-time processing.

spark-to-hadoop

In this post, all examples are built to run on Spark. This is not meant to suggest Spark is necessarily superior or that Spark runs better on Dataproc than Hadoop. In fact, Dataproc’s implementation of Spark relies on Hadoop’s core HDFS and YARN technologies to run.

Demonstration

To show the capabilities of Cloud Dataproc, we will create both a single-node Dataproc cluster and a three-node cluster, upload Java- and Python-based analytics jobs and data to Google Cloud Storage, and execute the jobs on the Spark cluster. Finally, we will enable monitoring and notifications for the Dataproc clusters and the jobs running on the clusters with Stackdriver. The post will demonstrate the use of the Google Cloud Console, as well as Google’s Cloud SDK’s command line tools, for all tasks.

In this post, we will be uploading and running individual jobs on the Dataproc Spark cluster, as opposed to using Cloud Dataproc Workflow Templates. According to Google, a Workflow Template is a reusable workflow configuration. It defines a graph of jobs with information on where to run those jobs. Workflow Templates are useful for automating your Dataproc workflows; however, automation is not the primary topic of this post.

Source Code

All open-sourced code for this post can be found on GitHub in two repositories, one for Java with Spark and one for Python with PySpark. Source code samples are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.

Cost

Of course, there is a cost associated with provisioning cloud services. However, if you manage the Google Cloud Dataproc resources prudently, the costs are negligible. Regarding pricing, according to Google, Cloud Dataproc pricing is based on the size of Cloud Dataproc clusters and the duration of time that they run. The size of a cluster is based on the aggregate number of virtual CPUs (vCPUs) across the entire cluster, including the master and worker nodes. The duration of a cluster is the length of time, measured in minutes, between cluster creation and cluster deletion.
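As a back-of-the-envelope illustration of that pricing model, the sketch below multiplies total vCPUs by running time and a per-vCPU hourly rate. The dataproc_fee function is hypothetical, and the default rate of US$0.010 per vCPU per hour is an assumption to be checked against Google's current pricing page. The estimate covers only the Dataproc fee, not the underlying Compute Engine and storage charges.

```python
def dataproc_fee(total_vcpus: int, minutes: float,
                 rate_per_vcpu_hour: float = 0.010) -> float:
    """Estimate the Dataproc fee in US dollars.

    rate_per_vcpu_hour is an assumed rate, not a quoted price.
    """
    return total_vcpus * (minutes / 60.0) * rate_per_vcpu_hour


# Example: a cluster with 12 vCPUs in total, running for 10 minutes
print(f"US${dataproc_fee(12, minutes=10):.4f}")
```

Under these assumptions, short-lived clusters cost very little, which is why deleting a cluster as soon as its workload completes keeps costs low.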

Over the course of writing the code for this post, as well as writing the post itself, the entire cost of all the related resources was a minuscule US$7.50. The cost includes creating, running, and deleting more than a dozen Dataproc clusters and uploading and executing approximately 75-100 Spark and PySpark jobs. Given the quick creation time of a cluster, 2 minutes on average or less in this demonstration, there is no reason to leave a cluster running longer than it takes to complete your workloads.

Kaggle Datasets

To explore the features of Dataproc, we will use a publicly-available dataset from Kaggle. Kaggle is a popular online resource for datasets used for big-data and ML applications. Their tagline is ‘Kaggle is the place to do data science projects’.

For this demonstration, I chose the IBRD Statement Of Loans Data dataset, from World Bank Financial Open Data, and available on Kaggle. The International Bank for Reconstruction and Development (IBRD) loans are public and publicly guaranteed debt extended by the World Bank Group. IBRD loans are made to, or guaranteed by, countries that are members of IBRD. This dataset contains historical snapshots of the Statement of Loans including the latest available snapshots.

screen_shot_2018-12-04_at_7.02.53_pm

There are two data files available. The ‘Statement of Loans’ latest available snapshots data file contains 8,713 rows of loan data (~3 MB), ideal for development and testing. The ‘Statement of Loans’ historic data file contains approximately 750,000 rows of data (~265 MB). Although not exactly ‘big data’, the historic dataset is large enough to sufficiently explore Dataproc. Both IBRD files have an identical schema with 33 columns of data (gist).

In this demonstration, both the Java and Python jobs will perform the same simple analysis of the larger historic dataset. For the analysis, we will ascertain the top 25 historic IBRD borrowers and determine their total loan disbursements, current loan obligations, and the average interest rates they were charged for all loans. This simple analysis will be performed using Spark’s SQL capabilities. The results of the analysis, a Spark DataFrame containing 25 rows, will be saved as a CSV-format data file.

SELECT country, country_code,
       Format_number(total_disbursement, 0) AS total_disbursement,
       Format_number(total_obligation, 0) AS total_obligation,
       Format_number(avg_interest_rate, 2) AS avg_interest_rate
FROM   (SELECT country,
               country_code,
               Sum(disbursed) AS total_disbursement,
               Sum(obligation) AS total_obligation,
               Avg(interest_rate) AS avg_interest_rate
        FROM   loans
        GROUP  BY country, country_code
        ORDER  BY total_disbursement DESC
        LIMIT  25)
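To see the shape of this aggregation without a Spark cluster, here is a small sketch that runs the inner query against an in-memory SQLite table using Python's sqlite3 module. This swaps SQLite in for Spark SQL purely for illustration: the rows are invented placeholders, the limit is reduced to 2, and the outer Format_number step is omitted because it is a Spark SQL function.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE loans (country TEXT, country_code TEXT, "
    "disbursed REAL, obligation REAL, interest_rate REAL)"
)
# Invented placeholder rows, not real IBRD data
rows = [
    ("Country A", "AA", 100.0, 40.0, 2.0),
    ("Country A", "AA", 300.0, 60.0, 4.0),
    ("Country B", "BB", 250.0, 10.0, 1.0),
    ("Country C", "CC", 50.0, 5.0, 3.0),
]
conn.executemany("INSERT INTO loans VALUES (?, ?, ?, ?, ?)", rows)

# Same grouping, ordering, and limiting as the inner query above
query = """
SELECT country, country_code,
       SUM(disbursed) AS total_disbursement,
       SUM(obligation) AS total_obligation,
       AVG(interest_rate) AS avg_interest_rate
FROM   loans
GROUP  BY country, country_code
ORDER  BY total_disbursement DESC
LIMIT  2
"""
top_borrowers = conn.execute(query).fetchall()
for row in top_borrowers:
    print(row)
```

Each output row holds one borrower's summed disbursements and obligations and its average interest rate, ordered from the largest total disbursement down.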

Google Cloud Storage

First, we need a location to store our Spark jobs, data files, and results, which will be accessible to Dataproc. Although there are a number of choices, the simplest and most convenient location for Dataproc is a Google Cloud Storage bucket. According to Google, Cloud Storage offers the highest level of availability and performance within a single region and is ideal for compute, analytics, and ML workloads in a particular region. Cloud Storage buckets are very similar to buckets in Amazon Simple Storage Service (Amazon S3), Amazon’s object storage service.

Using the Google Cloud Console, Google’s Web Admin UI, create a new, uniquely named Cloud Storage bucket. Our Dataproc clusters will eventually be created in a single regional location. We need to ensure our new bucket is created in the same regional location as the clusters; I chose us-east1.

screen_shot_2018-12-04_at_7.04.45_pm

We will need the new bucket’s link, to use within the Java and Python code as well as from the command line with gsutil. The gsutil tool is a Python application that lets you access Cloud Storage from the command line. The bucket’s link may be found on the Storage Browser Console’s Overview tab. A bucket’s link is always in the format, gs://bucket-name.

screen_shot_2018-12-04_at_7.06.06_pm

Alternatively, we may also create the Cloud Storage bucket using gsutil with the make buckets (mb) command, as follows:

# Always best practice since features are updated frequently
gcloud components update
  
export PROJECT=your_project_name
export REGION=us-east1
export BUCKET_NAME=gs://your_bucket_name
  
# Make sure you are creating resources in the correct project
gcloud config set project $PROJECT
  
gsutil mb -p $PROJECT -c regional -l $REGION $BUCKET_NAME

Cloud Dataproc Cluster

Next, we will create two different Cloud Dataproc clusters for demonstration purposes. If you have not used Cloud Dataproc previously in your GCP Project, you will first need to enable the API for Cloud Dataproc.

screen_shot_2018-12-04_at_7.15.05_pm

Single Node Cluster

We will start with a single node cluster with no worker nodes, suitable for development and testing Spark and Hadoop jobs, using small datasets. Create a single-node Dataproc cluster using the Single Node Cluster mode option. Create the cluster in the same region as the new Cloud Storage bucket. This will allow the Dataproc cluster access to the bucket without additional security or IAM configuration. I used the n1-standard-1 machine type, with 1 vCPU and 3.75 GB of memory. Observe the resources assigned to Hadoop YARN for Spark job scheduling and cluster resource management.

screen_shot_2018-12-04_at_7.19.37_pm

The new cluster, consisting of a single node and no worker nodes, should be ready for use in a few minutes or less.

screen_shot_2018-12-04_at_7.38.23_pm

Note the Image version, 1.3.16-deb9. According to Google, Dataproc uses image versions to bundle operating system, big data components, and Google Cloud Platform connectors into one package that is deployed on a cluster. This image, released in November 2018, is the latest available version at the time of this post. The image contains:

  • Apache Spark 2.3.1
  • Apache Hadoop 2.9.0
  • Apache Pig 0.17.0
  • Apache Hive 2.3.2
  • Apache Tez 0.9.0
  • Cloud Storage connector 1.9.9-hadoop2
  • Scala 2.11.8
  • Python 2.7

To avoid lots of troubleshooting, make sure your code is compatible with the image’s versions. It is important to note the image does not contain a version of Python 3. You will need to ensure your Python code is built to run with Python 2.7. Alternatively, use Dataproc’s --initialization-actions flag along with bootstrap and setup shell scripts to install Python 3 on the cluster using pip or conda. Tips for installing Python 3 on Dataproc can be found on Stack Overflow and elsewhere on the Internet.

As an alternative to the Google Cloud Console, we are able to create the cluster using a REST command. Google provides the Google Cloud Console’s equivalent REST request, as shown in the example below.

screen_shot_2018-12-04_at_7.20.07_pm

Additionally, we have the option of using the gcloud command line tool. This tool provides the primary command-line interface to Google Cloud Platform and is part of Google’s Cloud SDK, which also includes the aforementioned gsutil. Here again, Google provides the Google Cloud Console’s equivalent gcloud command. This is a great way to learn to use the command line.

screen_shot_2018-12-04_at_7.20.21_pm

Using the dataproc clusters create command, we are able to create the same cluster as shown above from the command line, as follows:

export PROJECT=your_project_name
export CLUSTER_1=your_single_node_cluster_name 
export REGION=us-east1
export ZONE=us-east1-b
export MACHINE_TYPE_SMALL=n1-standard-1
  
gcloud dataproc clusters create $CLUSTER_1 \
  --region $REGION \
  --zone $ZONE \
  --single-node \
  --master-machine-type $MACHINE_TYPE_SMALL \
  --master-boot-disk-size 500 \
  --image-version 1.3-deb9 \
  --project $PROJECT

There are a few useful commands to inspect your running Dataproc clusters. The dataproc clusters describe command, in particular, provides detailed information about all aspects of the cluster’s configuration and current state.

gcloud dataproc clusters list --region $REGION

gcloud dataproc clusters describe $CLUSTER_1 \
  --region $REGION --format json

Standard Cluster

In addition to the single node cluster, we will create a second three-node Dataproc cluster. We will compare the speed of a single-node cluster to that of a true cluster with multiple worker nodes. Create a new Dataproc cluster using the Standard Cluster mode option. Again, make sure to create the cluster in the same region as the new Storage bucket.

screen_shot_2018-12-04_at_10.15.05_pm

The second cluster contains a single master node and two worker nodes. All three nodes use the n1-standard-4 machine type, with 4 vCPU and 15 GB of memory. Although still considered a minimally-sized cluster, this cluster represents a significant increase in compute power over the first single-node cluster, which had a total of 1 vCPU, 3.75 GB of memory, and no worker nodes on which to distribute processing. Between the two workers in the second cluster, we have 8 vCPU and 30 GB of memory for computation.

screen_shot_2018-12-04_at_10.18.54_pm

Again, we have the option of using the gcloud command line tool to create the cluster:

export PROJECT=your_project_name
export CLUSTER_2=your_three_node_cluster_name 
export REGION=us-east1
export ZONE=us-east1-b
export NUM_WORKERS=2
export MACHINE_TYPE_LARGE=n1-standard-4
  
gcloud dataproc clusters create $CLUSTER_2 \
  --region $REGION \
  --zone $ZONE \
  --master-machine-type $MACHINE_TYPE_LARGE \
  --master-boot-disk-size 500 \
  --num-workers $NUM_WORKERS \
  --worker-machine-type $MACHINE_TYPE_LARGE \
  --worker-boot-disk-size 500 \
  --image-version 1.3-deb9 \
  --project $PROJECT

Cluster Creation Speed: Cloud Dataproc versus Amazon EMR?

In a series of rather unscientific tests, I found the three-node Dataproc cluster took less than two minutes on average to be created. Compare that time to a similar three-node cluster built with Amazon’s EMR service using their general purpose m4.4xlarge Amazon EC2 instance type. In a similar series of tests, I found the EMR cluster took seven minutes on average to be created. The EMR cluster took 3.5 times longer to create than the comparable Dataproc cluster. Again, although not a totally accurate comparison, since both services offer different features, it gives you a sense of the speed of Dataproc as compared to Amazon EMR.

Staging Buckets

According to Google, when you create a cluster, Cloud Dataproc creates a Cloud Storage staging bucket in your project or reuses an existing Cloud Dataproc-created bucket from a previous cluster creation request. Staging buckets are used to stage miscellaneous configuration and control files that are needed by your cluster. Below, we see the staging buckets created for the two Dataproc clusters.

screen_shot_2018-12-04_at_10.26.49_pm

Project Files

Before uploading the jobs and running them on the Cloud Dataproc clusters, we need to understand what is included in the two GitHub projects. If you recall from the Kaggle section of the post, both projects are basically the same, but written in different languages, Java and Python. The jobs they contain all perform the same basic analysis on the dataset.

Java Project

The dataproc-java-demo Java-based GitHub project contains three classes, each of which is a job to be run by Spark. The InternationalLoansApp Java class is only intended to be run locally with the smaller 8.7K rows of data in the snapshot CSV file (gist).

On line 20, the Spark Session’s Master URL, .master("local[*]"), directs Spark to run locally with as many worker threads as logical cores on the machine. There are several options for setting the Master URL, detailed here.

On line 30, the path to the data file, and on line 84, the output path for the data file, are local relative file paths.

On lines 38–42, we do a bit of clean up on the column names, for only those columns we are interested in for the analysis. Be warned, the column names of the IBRD data are less than ideal for SQL-based analysis, containing mixed-cased characters, word spaces, and brackets.

On line 79, we call Spark DataFrame’s repartition method, dfDisbursement.repartition(1). The repartition method allows us to recombine the results of our analysis and output a single CSV file to the bucket. Ordinarily, Spark splits the data into partitions and executes computations on the partitions in parallel. Each partition’s data is written to separate CSV files when a DataFrame is written back to the bucket.

Using coalesce(1) or repartition(1) to recombine the resulting 25-Row DataFrame on a single node is okay for the sake of this demonstration, but is not practical for recombining partitions from larger DataFrames. There are more efficient and less costly ways to manage the results of computations, depending on the intended use of the resulting data.
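The relationship between partitions and output files can be imitated in plain Python, without Spark. The sketch below is only a simulation: the write_partitions function is hypothetical, the rows are placeholders, and the part-file naming merely resembles Spark's. It splits 25 rows into a given number of partitions and writes one CSV file per partition, so a single partition yields a single file.

```python
import csv
import tempfile
from pathlib import Path
from typing import List, Sequence


def write_partitions(rows: Sequence[Sequence[str]], num_partitions: int,
                     out_dir: Path) -> List[Path]:
    """Split rows round-robin into partitions; write one CSV file per partition."""
    out_dir.mkdir(parents=True, exist_ok=True)
    paths = []
    for index in range(num_partitions):
        path = out_dir / f"part-{index:05d}.csv"
        with path.open("w", newline="") as handle:
            # Every num_partitions-th row, starting at index, forms one partition
            csv.writer(handle).writerows(rows[index::num_partitions])
        paths.append(path)
    return paths


rows = [[f"borrower-{i}", str(i)] for i in range(25)]  # placeholder 25-row result
with tempfile.TemporaryDirectory() as tmp:
    four_files = write_partitions(rows, 4, Path(tmp) / "four")
    one_file = write_partitions(rows, 1, Path(tmp) / "one")
    print(len(four_files), len(one_file))
```

In Spark, collapsing to one partition also funnels all the data through a single task, which is why it only suits small results like this one.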

screen_shot_2018-12-05_at_4.04.24_pm

The InternationalLoansAppDataprocSmall class is intended to be run on the Dataproc clusters, analyzing the same smaller CSV data file. The InternationalLoansAppDataprocLarge class is also intended to be run on the Dataproc clusters; however, it analyzes the larger 750K rows of data in the IBRD historic CSV file (gist).

On line 20, note the Spark Session’s Master URL, .master("yarn"), directs Spark to connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode when submitting the job. The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable. Recall, the Dataproc cluster runs Spark on YARN.

Also, note on line 30, the path to the data file, and on line 63, the output path for the data file, both point to the Cloud Storage bucket we created earlier (.load("gs://your-bucket-name/your-data-file.csv")). Cloud Dataproc clusters automatically install the Cloud Storage connector. According to Google, there are a number of benefits to choosing Cloud Storage over traditional HDFS including data persistence, reliability, and performance.

These are the only two differences between the local version of the Spark job and the version of the Spark job intended for Dataproc. To build the project’s JAR file, which you will later upload to the Cloud Storage bucket, compile the Java project using the gradle build command from the root of the project. For convenience, the JAR file is also included in the GitHub repository.

screen_shot_2018-12-07_at_12.57.55_pm

Python Project

The dataproc-python-demo Python-based GitHub project contains two Python scripts to be run using PySpark for this post. The international_loans_local.py Python script is only intended to be run locally with the smaller 8.7K rows of data in the snapshot CSV file. It performs a few different analyses on the smaller dataset (gist).

Identical to the corresponding Java class, note on line 12, the Spark Session’s Master URL, .master("local[*]"), directs Spark to run locally with as many worker threads as logical cores on the machine.

Also identical to the corresponding Java class, note on line 26, the path to the data file, and on line 66, the output path for the resulting data file, are local relative file paths.

screen_shot_2018-12-05_at_4.02.50_pm

The international_loans_dataproc_large.py Python script is intended to be run on the Dataproc clusters, analyzing the larger 750K rows of data in the IBRD historic CSV file (gist).

On line 12, note the Spark Session’s Master URL, .master("yarn"), directs Spark to connect to a YARN cluster.

Again, note on line 26, the path to the data file, and on line 59, the output path for the data file, both point to the Cloud Storage bucket we created earlier (.load("gs://your-bucket-name/your-data-file.csv")).

These are the only two differences between the local version of the PySpark job and the version of the PySpark job intended for Dataproc. With Python, there is no pre-compilation necessary. We will upload the second script directly.
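The two differences, the Master URL and the file paths, could also be captured in a small settings object instead of being hard-coded in separate scripts. The following is a minimal sketch of that idea, not how the post's scripts are written: JobSettings, settings_for, and all path values are hypothetical placeholders.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class JobSettings:
    master: str       # Spark Master URL
    input_path: str   # where the CSV data file is read from
    output_path: str  # where the results are written


def settings_for(target: str, bucket: str = "gs://your-bucket-name") -> JobSettings:
    """Return the settings that differ between local and Dataproc runs."""
    if target == "local":
        # Local run: all logical cores, relative file paths
        return JobSettings("local[*]", "data/your-data-file.csv", "data/your-output")
    if target == "dataproc":
        # Dataproc run: Spark on YARN, paths in the Cloud Storage bucket
        return JobSettings("yarn", f"{bucket}/your-data-file.csv", f"{bucket}/your-output")
    raise ValueError(f"unknown target: {target}")


print(settings_for("local"))
print(settings_for("dataproc"))
```

A PySpark script could then pass settings.master to .master(...) and settings.input_path to .load(...), leaving the analysis code identical in both environments.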

Uploading Job Resources to Cloud Storage

In total, we need to upload four items to the new Cloud Storage bucket we created previously. The items include the two Kaggle IBRD CSV files, the compiled Java JAR file from the dataproc-java-demo project, and the Python script from the dataproc-python-demo project. Using the Google Cloud Console, upload the four files to the new Google Storage bucket, as shown below. Make sure you unzip the two Kaggle IBRD CSV data files before uploading.

screen_shot_2018-12-05_at_12.52.51_pm

Like before, we also have the option of using gsutil with the copy (cp) command to upload the four files. The cp command accepts wildcards, as shown below.

export PROJECT=your_project_name
export BUCKET_NAME=gs://your_bucket_name
  
gsutil cp data/ibrd-statement-of-loans-*.csv $BUCKET_NAME
gsutil cp build/libs/dataprocJavaDemo-1.0-SNAPSHOT.jar $BUCKET_NAME
gsutil cp international_loans_dataproc_large.py $BUCKET_NAME

If our Java or Python jobs were larger, or more complex and required multiple files to run, we could also choose to upload ZIP or other common compression formatted archives and pass them to the job using the --archives flag.

Running Jobs on Dataproc

The easiest way to run a job on the Dataproc cluster is by submitting a job through the Dataproc Jobs UI, part of the Google Cloud Console.

screen_shot_2018-12-05_at_11.29.34_pm

Dataproc has the capability of running multiple types of jobs, including:

  • Hadoop
  • Spark
  • SparkR
  • PySpark
  • Hive
  • SparkSql
  • Pig

We will be running both Spark and PySpark jobs as part of this demonstration.

Spark Jobs

To run a Spark job using the JAR file, select Job type Spark. The Region will match your Dataproc cluster and bucket locations, us-east1 in my case. You should have a choice of both clusters in your chosen region. Run both jobs at least twice, once on each cluster, for a total of four jobs.

screen_shot_2018-12-05_at_12.57.55_pm

Lastly, you will need to input the main class and the path to the JAR file. The JAR location will be:

gs://your_bucket_name/dataprocJavaDemo-1.0-SNAPSHOT.jar

The main class for the smaller dataset will be:

org.example.dataproc.InternationalLoansAppDataprocSmall

The main class for the larger dataset will be:

org.example.dataproc.InternationalLoansAppDataprocLarge

During or after job execution, you may view details in the Output tab of the Dataproc Jobs console.

screen_shot_2018-12-04_at_7.53.27_pm

Like every other step in this demonstration, we can also use the gcloud command line tool, instead of the web console, to submit our Spark jobs to each cluster. Here, I am submitting the larger dataset Spark job to the three-node cluster.

export CLUSTER_2=your_three_node_cluster_name
export REGION=us-east1
export BUCKET_NAME=gs://your_bucket_name
  
gcloud dataproc jobs submit spark \
  --region $REGION \
  --cluster $CLUSTER_2 \
  --class org.example.dataproc.InternationalLoansAppDataprocLarge \
  --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar \
  --async

PySpark Jobs

To run a Spark job using the Python script, select Job type PySpark. The Region will match your Dataproc cluster and bucket locations, us-east1 in my case. You should have a choice of both clusters. Run the job at least twice, once on each cluster.

screen_shot_2018-12-05_at_12.53.36_pm

Lastly, you will need to input the main Python file path. There is only one Dataproc Python script, which analyzes the larger dataset. The script location will be:

gs://your_bucket_name/international_loans_dataproc_large.py

Like every other step in this demonstration, we can also use the gcloud command line tool instead of the web console to submit our PySpark jobs to each cluster. Below, I am submitting the PySpark job to the three-node cluster.

export CLUSTER_2=your_three_node_cluster_name
export REGION=us-east1
export BUCKET_NAME=gs://your_bucket_name
  
gcloud dataproc jobs submit pyspark \
  $BUCKET_NAME/international_loans_dataproc_large.py \
  --region $REGION \
  --cluster $CLUSTER_2 \
  --async

When the optional --async flag is included with any of the dataproc jobs submit commands, the job is sent to the Dataproc cluster and the terminal is immediately released back to the user. If you do not use the --async flag, the terminal will be unavailable until the job is finished.

However, without the flag, we will get the standard output (stdout) and standard error (stderr) from Dataproc. The output includes some useful information, including different stages of the job execution lifecycle and execution times.

screen_shot_2018-12-05_at_10.38.52_pm
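
To summarize how the Spark and PySpark submissions differ, here is a small Python sketch that assembles the same gcloud dataproc jobs submit commands shown above. The function name build_submit_command and its parameters are hypothetical, chosen for illustration; only the gcloud flags themselves come from the commands in this post. The sketch returns an argument list and leaves the choice of --async to the caller.

```python
import shlex


def build_submit_command(job_type, cluster, region, main=None,
                         main_class=None, jars=None, run_async=False):
    """Build a `gcloud dataproc jobs submit` argument list (hypothetical helper)."""
    if job_type not in ("spark", "pyspark"):
        raise ValueError("job_type must be 'spark' or 'pyspark'")

    cmd = ["gcloud", "dataproc", "jobs", "submit", job_type]

    if job_type == "pyspark":
        if not main:
            raise ValueError("a PySpark job needs the path to its main Python file")
        cmd.append(main)  # the script is a positional argument

    cmd += ["--region", region, "--cluster", cluster]

    if job_type == "spark":
        if not (main_class and jars):
            raise ValueError("a Spark job needs both main_class and jars")
        cmd += ["--class", main_class, "--jars", jars]

    if run_async:
        cmd.append("--async")  # return immediately instead of streaming output
    return cmd


# Placeholder cluster and bucket names, as used in the commands above.
pyspark_cmd = build_submit_command(
    "pyspark",
    cluster="your_three_node_cluster_name",
    region="us-east1",
    main="gs://your_bucket_name/international_loans_dataproc_large.py",
    run_async=True,
)
print(shlex.join(pyspark_cmd))
```

Leaving run_async at False mirrors running the command without --async, in which case the caller would wait for the job and see its stdout and stderr.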

File Output

During development and testing, outputting results to the console is useful. However, in Production, the output from jobs is most often written to Apache Parquet, Apache Avro, CSV, JSON, or XML format files, persisted to an Apache Hive, SQL, or NoSQL database, or streamed to another system for post-processing, using technologies such as Apache Kafka.

Once both the Java and Python jobs have run successfully on the Dataproc cluster, you should observe the results have been saved back to the Storage bucket. Each script saves its results to a single CSV file in separate directories, as shown below.

screen_shot_2018-12-05_at_4.09.31_pm.png

The final dataset, written to the CSV file, contains the results of the analysis (gist).

Cleaning Up

When you are finished, make sure to delete your running clusters. This may be done through the Google Cloud Console. Deletion of the three-node cluster took, on average, slightly more than one minute.

screen_shot_2018-12-04_at_11.11.40_pm

As usual, we can also use the gcloud command line tool instead of the web console to delete the Dataproc clusters.

export CLUSTER_1=your_single_node_cluster_name
export CLUSTER_2=your_three_node_cluster_name 
export REGION=us-east1
  
yes | gcloud dataproc clusters delete $CLUSTER_1 --region $REGION
yes | gcloud dataproc clusters delete $CLUSTER_2 --region $REGION

Results

Some observations, based on approximately 75 successful jobs. First, both the Python job and the Java jobs ran in nearly the same amount of time on the single-node cluster and then on the three-node cluster. This is beneficial since, although a lot of big data analysis is performed with Python, Java is still the lingua franca of many large enterprises.

screen_shot_2018-12-05_at_1.49.01_pm

Consecutive Execution

Below are the average times for running the larger dataset on both clusters, in Java, and in Python. The jobs were all run consecutively as opposed to concurrently. The best time was 59 seconds on the three-node cluster compared to the best time of 150 seconds on the single-node cluster, a difference of 256%. Given the differences in the two clusters, this large variation is expected. The average difference between the two clusters for running the large dataset was 254%.

chart2
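
For readers who want to check the comparison themselves, the sketch below expresses the slower run time as a percentage of the faster one, using the two best times quoted above. The function name percent_of is my own. With the rounded times of 150 and 59 seconds, the result is roughly 254%; the small gap from the 256% quoted above may come from rounding in the reported times.

```python
def percent_of(slower_seconds, faster_seconds):
    """Express the slower run time as a percentage of the faster run time."""
    return slower_seconds / faster_seconds * 100


best_single_node = 150  # seconds, best time on the single-node cluster (from the text)
best_three_node = 59    # seconds, best time on the three-node cluster (from the text)

print(round(percent_of(best_single_node, best_three_node)))  # 254
```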

Concurrent Execution

It is important to understand the impact of concurrently running multiple jobs on the same Dataproc cluster. To demonstrate this, both the Java and Python jobs were also run concurrently. In one such test, ten copies of the Python job were run concurrently on the three-node cluster.

concurrent-jobs

Observe that the execution times of the concurrent jobs increase nearly linearly. The first job completes in roughly the same time as the consecutively executed jobs, shown above, but each succeeding job’s execution time increases linearly.

chart1

According to Apache, when running on a cluster, each Spark application gets an independent set of executor JVMs that only run tasks and store data for that application. Each application is given a maximum amount of resources it can use and holds onto them for its whole duration. Note no tuning was done to the Dataproc clusters to optimize for concurrent execution.

Really Big Data?

Although there is no exact definition of ‘big data’, 750K rows of data at 265 MB is probably not generally considered big data. Likewise, the three-node cluster used in this demonstration is still pretty diminutive. Lastly, the SQL query was less than complex. To really test the abilities of Dataproc would require a multi-gigabyte or multi-terabyte-sized dataset, divided amongst multiple files, computed on a much beefier cluster with more worker nodes and more compute resources.

Monitoring and Instrumentation

In addition to viewing the results of running and completed jobs, there are a number of additional monitoring resources, including the Hadoop Yarn Resource Manager, HDFS NameNode, and Spark History Server Web UIs, and Google Stackdriver. I will only briefly introduce these resources, and not examine any of these interfaces in detail. You’re welcome to investigate the resources for your own clusters. Apache lists other Spark monitoring and instrumentation resources in their documentation.

To access the Hadoop Yarn Resource Manager, HDFS NameNode, and Spark History Server Web UIs, you must create an SSH tunnel and run Chrome through a proxy. In the Dataproc Cluster tab, Google Dataproc provides both the commands and a link to the documentation needed to connect.

screen_shot_2018-12-15_at_9.09.00_pm

Hadoop Yarn Resource Manager Web UI

Once you are connected to the Dataproc cluster, via the SSH tunnel and proxy, the Hadoop Yarn Resource Manager Web UI is accessed on port 8088. The UI allows you to view all aspects of the YARN cluster and the distributed applications running on the YARN system.

screen_shot_2018-12-15_at_9.15.27_pm

HDFS NameNode Web UI

Once you are connected to the Dataproc cluster, via the SSH tunnel and proxy, the HDFS NameNode Web UI is accessed on port 9870. According to the Hadoop Wiki, the NameNode is the centerpiece of an HDFS file system. It keeps the directory tree of all files in the file system, and tracks where across the cluster the file data is kept. It does not store the data of these files itself.

screen_shot_2018-12-15_at_9.41.19_pm

Spark History Server Web UI

We can view the details of all completed jobs using the Spark History Server Web UI. Once you are connected to the cluster, via the SSH tunnel and proxy, the Spark History Server Web UI is accessed on port 18080. Of all the methods of reviewing aspects of a completed Spark job, the History Server provides the most detail.

screen_shot_2018-12-15_at_9.15.09_pm
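
As a quick reference, the three web UI ports mentioned in this section can be collected in one place. The sketch below builds the URLs you would open in the proxied browser. It assumes the master node's hostname is the cluster name followed by -m, which is the usual Dataproc naming convention but should be confirmed for your own cluster; the function name web_ui_urls is hypothetical.

```python
# Ports as described in the sections above.
WEB_UI_PORTS = {
    "YARN Resource Manager": 8088,
    "HDFS NameNode": 9870,
    "Spark History Server": 18080,
}


def web_ui_urls(master_host):
    """Map each web UI name to its URL on the given master node (hypothetical helper)."""
    return {
        name: f"http://{master_host}:{port}"
        for name, port in WEB_UI_PORTS.items()
    }


# Assumption: the master node is named <cluster name>-m.
for name, url in web_ui_urls("your_three_node_cluster_name-m").items():
    print(f"{name}: {url}")
```

These URLs only resolve once the SSH tunnel and proxy described above are in place.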

Using the History Server UI, we can drill into fine-grained details of each job, including the event timeline.

screen_shot_2018-12-15_at_9.22.32_pm

Also, using the History Server UI, we can see a visualization of the Spark job’s DAG (Directed Acyclic Graph). DataBricks published an excellent post on learning how to interpret the information visualized in the Spark UI.

screen_shot_2018-12-15_at_9.19.15_pm

Not only can we view the DAG, but we can also drill into each Stage of the DAG from the UI.

screen_shot_2018-12-15_at_10.22.33_pm

Stackdriver

We can also enable Google Stackdriver for monitoring and management of services, containers, applications, and infrastructure. Stackdriver offers an impressive array of services, including debugging, error reporting, monitoring, alerting, tracing, logging, and dashboards, to mention only a few Stackdriver features.

screen_shot_2018-12-05_at_3.18.31_pm

There are dozens of metrics available, which collectively reflect the health of the Dataproc clusters. Below we see the states of one such metric, the YARN virtual cores (vcores). A YARN vcore, introduced in Hadoop 2.4, is a usage share of a host CPU. The number of YARN virtual cores is equivalent to the number of worker nodes (2) times the number of vCPUs per node (4), for a total of eight YARN virtual cores. Below, we see that at one point in time, 5 of the 8 vcores have been allocated, with 2 more available.

screen_shot_2018-12-05_at_3.29.47_pm

Next, we see the states of the YARN memory size. YARN memory size is calculated as the number of worker nodes (2) times the amount of memory on each node (15 GB) times the fraction given to YARN (0.8), for a total of 24 GB (2 x 15 GB x 0.8). Below, we see that at one point in time, 20 GB of RAM is allocated with 4 GB available. At that instant in time, the workload does not appear to be exhausting the cluster’s memory.

screen_shot_2018-12-05_at_3.30.39_pm
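
The two capacity calculations above are simple enough to capture in a few lines of Python. This is only a worked version of the arithmetic described in the text, using the same figures (2 worker nodes, 4 vCPUs and 15 GB per node, 0.8 of memory given to YARN); the function names are my own.

```python
def yarn_vcores(worker_nodes, vcpus_per_node):
    """Total YARN virtual cores across the worker nodes."""
    return worker_nodes * vcpus_per_node


def yarn_memory_gb(worker_nodes, memory_gb_per_node, yarn_fraction):
    """Total memory (GB) made available to YARN across the worker nodes."""
    return worker_nodes * memory_gb_per_node * yarn_fraction


print(yarn_vcores(2, 4))                     # 8 vcores
print(round(yarn_memory_gb(2, 15, 0.8), 1))  # 24.0 GB
```

Plugging in the node count and machine type of a larger cluster gives a quick estimate of what Stackdriver should report for it.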

Notifications

Since no one actually watches dashboards all day, waiting for something to fail, how do we know when we have an issue with Dataproc? Stackdriver offers integrations with most popular notification channels, including email, SMS, Slack, PagerDuty, HipChat, and Webhooks. With Stackdriver, we define a condition which describes when a service is considered unhealthy. When triggered, Stackdriver sends a notification to one or more channels.

notifications

Below is a preview of two alert notifications in Slack. I enabled Slack as a notification channel and created an alert which is triggered each time a Dataproc job fails. Whenever a job fails, such as the two examples below, I receive a Slack notification through the Slack Channel defined in Stackdriver.

slack.png

Slack notifications contain a link, which routes you back to Stackdriver, to an incident which was opened on your behalf, due to the job failure.

incident

For convenience, the incident also includes a pre-filtered link directly to the log entries at the time of the policy violation. Stackdriver logging offers advanced filtering capabilities to quickly find log entries, as shown below.

screen_shot_2018-12-09_at_12.52.51_pm

With Stackdriver, you get monitoring, logging, alerting, notification, and incident management as a service, with minimal cost and upfront configuration. Think about how much time and effort it takes the average enterprise to achieve this level of infrastructure observability on their own; most never do.

Conclusion

In this post, we have seen the ease-of-use, extensive feature-set, out-of-the-box integration ability with other cloud services, low cost, and speed of Google Cloud Dataproc, to run big data analytics workloads. Couple this with the ability of Stackdriver to provide monitoring, logging, alerting, notification, and incident management for Dataproc with minimal up-front configuration. In my opinion, based on these features, Google Cloud Dataproc leads other cloud competitors for fully-managed Spark and Hadoop Cluster management.

In future posts, we will examine the use of Cloud Dataproc Workflow Templates for process automation, the integration capabilities of Dataproc with services such as BigQuery, Bigtable, Cloud Dataflow, and Google Cloud Pub/Sub, and finally, DevOps for Big Data with Dataproc and tools like Spinnaker and Jenkins on GKE.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers, their clients, nor Apache or Google.

Getting Started with PySpark for Big Data Analytics using Jupyter Notebooks and Jupyter Docker Stacks

There is little question, big data analytics, data science, artificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last few years. Behind the hype curves and marketing buzz, these technologies are having a significant influence on all aspects of our modern lives. Due to their popularity and potential benefits, academic institutions and commercial enterprises are rushing to train large numbers of Data Scientists and ML and AI Engineers.

google_terms2

Learning popular programming languages and frameworks, such as Python, Scala, R, Apache Hadoop, Apache Spark, and Apache Kafka, requires the use of multiple complex technologies. Installing, configuring, and managing these technologies often demands an advanced level of familiarity with Linux, distributed systems, cloud- and container-based platforms, databases, and data-streaming applications. These barriers may prove a deterrent to Students, Mathematicians, Statisticians, and Data Scientists.

google_terms3

Driven by the explosive growth of these technologies and the need to train individuals, many commercial enterprises are lowering the barriers to entry, making it easier to get started. The three major cloud providers, AWS, Azure, and Google Cloud, all have multiple Big Data-, AI- and ML-as-a-Service offerings.

Similarly, many open-source projects are also lowering the barriers to entry into these technologies. An excellent example of an open-source project working on this challenge is Project Jupyter. Similar to the Spark Notebook and Apache Zeppelin projects, Jupyter Notebooks enables data-driven, interactive, and collaborative data analytics with Julia, Scala, Python, R, and SQL.

This post will demonstrate the creation of a containerized development environment, using Jupyter Docker Stacks. The environment will be suited for learning and developing applications for Apache Spark, using the Python, Scala, and R programming languages. This post is not intended to be a tutorial on Spark, PySpark, or Jupyter Notebooks.

Featured Technologies

The following technologies are featured prominently in this post.

pyspark_article_00b_feature

Jupyter Notebooks

According to Project Jupyter, the Jupyter Notebook, formerly known as the IPython Notebook, is an open-source web application that allows users to create and share documents that contain live code, equations, visualizations, and narrative text. Uses include data cleaning and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more. The word, Jupyter, is a loose acronym for Julia, Python, and R, but today, Jupyter supports many programming languages. Interest in Jupyter Notebooks has grown dramatically.

google_terms4

Jupyter Docker Stacks

To enable quick and easy access to Jupyter Notebooks, Project Jupyter has created Jupyter Docker Stacks. The stacks are ready-to-run Docker images containing Jupyter applications, along with accompanying technologies. Currently, there are eight different Jupyter Docker Stacks, each focused on a particular area of practice. They include SciPy (Python-based mathematics, science, and engineering), TensorFlow, R Project for statistical computing, Data Science with Julia, and the main subject of this post, PySpark. The stacks also include a rich variety of well-known packages to extend their functionality, such as scikit-learn, pandas, Matplotlib, Bokeh, ipywidgets (interactive HTML widgets), and Facets.

Apache Spark

According to Apache, Spark is a unified analytics engine for large-scale data processing, used by well-known, modern enterprises, such as Netflix, Yahoo, and eBay. With speeds up to 100x faster than Hadoop, Apache Spark achieves high performance for static, batch, and streaming data, using a state-of-the-art DAG (Directed Acyclic Graph) scheduler, a query optimizer, and a physical execution engine.

Spark’s polyglot programming model allows users to write applications quickly in Scala, Java, Python, R, and SQL. Spark includes libraries for Spark SQL (DataFrames and Datasets), MLlib (Machine Learning), GraphX (Graph Processing), and DStreams (Spark Streaming). You can run Spark using its standalone cluster mode, on Amazon EC2, Apache Hadoop YARN, Mesos, or Kubernetes.

PySpark

The Spark Python API, PySpark, exposes the Spark programming model to Python. PySpark is built on top of Spark’s Java API. Data is processed in Python and cached and shuffled in the JVM. According to Apache, Py4J enables Python programs running in a Python interpreter to dynamically access Java objects in a JVM.

Docker

According to Docker, their technology gives developers and IT the freedom to build, manage, and secure business-critical applications without the fear of technology or infrastructure lock-in. Although Kubernetes is now the leading open-source container orchestration platform, Docker is still the predominant underlying container engine technology. For this post, I am using Docker Desktop Community version for macOS.

screen_shot_2019-06-09_at_7_41_12_am.png

Docker Swarm

Current versions of Docker include both a Kubernetes and Swarm orchestrator for deploying and managing containers. We will choose Swarm for this demonstration. According to Docker, the cluster management and orchestration features embedded in the Docker Engine are built using swarmkit. Swarmkit is a separate project which implements Docker’s orchestration layer and is used directly within Docker.

PostgreSQL

PostgreSQL is a powerful, open-source object-relational database system. According to their website, PostgreSQL comes with many features aimed to help developers build applications, administrators to protect data integrity and build fault-tolerant environments, and help manage data no matter how big or small the dataset.

Demonstration

To show the capabilities of the Jupyter development environment, I will demonstrate a few typical use cases, such as executing Python scripts, submitting PySpark jobs, working with Jupyter Notebooks, and reading and writing data to and from different format files and to a database. We will be using the jupyter/all-spark-notebook:latest Docker Image. This image includes Python, R, and Scala support for Apache Spark, using Apache Toree.

Architecture

As shown below, we will stand up a Docker stack, consisting of Jupyter All-Spark-Notebook, PostgreSQL 10.5, and Adminer containers. The Docker stack will have local directories bind-mounted into the containers. Files from our GitHub project will be shared with the Jupyter application container through a bind-mounted directory. Our PostgreSQL data will also be persisted through a bind-mounted directory. This allows us to persist data external to the ephemeral containers.

PySparkDocker.png

Source Code

All open-sourced code for this post can be found on GitHub. Use the following command to clone the project. The post and project code was last updated on 9/28/2019.

git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/pyspark-setup-demo.git

Source code samples are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers.

Deploy Docker Stack

To start, create the $HOME/data/postgres directory to store PostgreSQL data files. This directory will be bind-mounted into the PostgreSQL container on line 36 of the stack.yml file, $HOME/data/postgres:/var/lib/postgresql/data. The HOME environment variable assumes you are working on Linux or macOS and is equivalent to HOMEPATH on Windows.

The Jupyter container’s working directory is set on line 10 of the stack.yml file, working_dir: /home/$USER/work. The local bind-mounted working directory is $PWD/work. This path is bind-mounted to the working directory in the Jupyter container, on line 24 of the stack.yml file, $PWD/work:/home/$USER/work. The PWD environment variable assumes you are working on Linux or macOS (CD on Windows).

By default, the user within the Jupyter container is jovyan. Optionally, I have chosen to override that user with my own local host’s user account, as shown on line 16 of the stack.yml file, NB_USER: $USER. I have used the macOS host’s USER environment variable value (equivalent to USERNAME on Windows). There are many options for configuring the Jupyter container, detailed here. Several of those options are shown on lines 12-18 of the stack.yml file (gist).

Assuming you have a recent version of Docker installed on your local development machine, and running in swarm mode, standing up the stack is as easy as running the following command from the root directory of the project:

docker stack deploy -c stack.yml pyspark

The Docker stack consists of a new overlay network, pyspark-net, and three containers. To confirm the stack deployed, you can run the following command:

docker stack ps pyspark --no-trunc

pyspark_article_01_stack_deploy

Note the jupyter/all-spark-notebook container is quite large. Depending on your Internet connection, if this is the first time you have pulled this Docker image, the stack may take several minutes to enter a running state.

To access the Jupyter Notebook application, you need to obtain the Jupyter URL and access token (read more here). This information is output in the Jupyter container log, which can be accessed with the following command:

docker logs $(docker ps | grep pyspark_pyspark | awk '{print $NF}')

pyspark_article_02_pyspark_logs

Using the URL and token shown in the log output, you will be able to access the Jupyter web-based user interface on localhost port 8888. Once there, from the Jupyter dashboard landing page, you should see all the files in the project’s work/ directory.

Also shown below, note the types of files you are able to create from the dashboard, including Python 3, R, Scala (using Apache Toree or spylon-kernel), and text. You can also open a Jupyter Terminal or create a new Folder.

pyspark_article_27_browser.png
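
If you find yourself copying the tokenized URL out of the container log often, a few lines of Python can pull it out for you. This is a rough sketch under a couple of assumptions: the log contains a URL of the form http://host:port/?token=..., and the sample text below is a made-up stand-in rather than real Jupyter output. The function name find_token_urls is hypothetical.

```python
import re

# Matches URLs such as http://127.0.0.1:8888/?token=<hex digits>
TOKEN_URL = re.compile(r"https?://[^\s/]+:\d+/\?token=[0-9a-f]+")


def find_token_urls(log_text):
    """Return the unique tokenized URLs found in the log text, in order of appearance."""
    found = []
    for url in TOKEN_URL.findall(log_text):
        if url not in found:
            found.append(url)
    return found


# Made-up stand-in for the container log, for illustration only.
sample_log = "example log line: http://127.0.0.1:8888/?token=0123abcd\n"
print(find_token_urls(sample_log))
```

In practice, the log text could be captured with subprocess.run(["docker", "logs", container_name], capture_output=True, text=True). The server typically writes its log to stderr, so it is worth checking both the stdout and stderr attributes of the result.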

Running Python Scripts

Instead of worrying about installing and maintaining the latest version of Python and packages on your own development machine, we can run our Python scripts from the Jupyter container. At the time of this post update, the latest jupyter/all-spark-notebook Docker Image runs Python 3.7.3 and Conda 4.6.14. Let’s start with a simple example of the Jupyter container’s capabilities by running a Python script. I have included a sample Python script, 01_simple_script.py.

Run the script from within the Jupyter container, from a Jupyter Terminal window:

python ./01_simple_script.py

You should observe the following output.
pyspark_article_08_simple_script

Kaggle Datasets

To explore the features of the Jupyter Notebook container and PySpark, we will use a publicly available dataset from Kaggle. Kaggle is a fantastic open-source resource for datasets used for big-data and ML applications. Their tagline is ‘Kaggle is the place to do data science projects’.

For this demonstration, I chose the ‘Transactions from a Bakery’ dataset from Kaggle.

pyspark_article_03_kaggle

The dataset contains 21,294 rows, each with four columns of data. Although certainly nowhere near ‘big data’, the dataset is large enough to test out the Jupyter container functionality (gist).

Submitting Spark Jobs

We are not limited to Jupyter Notebooks to interact with Spark. We can also submit scripts directly to Spark from a Jupyter Terminal, or from our IDE. I have included a simple Python script, 02_bakery_dataframes.py. The script loads the Kaggle Bakery dataset from the CSV file into a Spark DataFrame. The script then prints out the top ten rows of data, along with a count of the total number of rows in the DataFrame.

Run the script directly from a Jupyter Terminal window:

python ./02_bakery_dataframes.py

An example of the output of the Spark job is shown below. At the time of this post update (6/7/2019), the latest jupyter/all-spark-notebook Docker Image runs Spark 2.4.3, Scala 2.11.12, and Java 1.8.0_191 using the OpenJDK.
pyspark_article_09_simple_spark

More typically, you would submit the Spark job, using the spark-submit command. Use a Jupyter Terminal window to run the following command:

$SPARK_HOME/bin/spark-submit 02_bakery_dataframes.py

Below, we see the beginning of the output from Spark, using the spark-submit command.
pyspark_article_09B1_spark_submit

Below, we see the scheduled tasks executing and the output of the print statement, displaying the top 10 rows of bakery data.

Interacting with Databases

Often with Spark, you are loading data from one or more data sources (input). After performing operations and transformations on the data, the data is persisted or conveyed to another system for further processing (output).

To demonstrate the flexibility of the Jupyter Docker Stacks to work with databases, I have added PostgreSQL to the Docker Stack. We can read and write data from the Jupyter container to the PostgreSQL instance, running in a separate container.

To begin, we will run a SQL script, written in Python, to create our database schema and some test data in a new database table. To do so, we will need to install the psycopg2 package into our Jupyter container. You can use the docker exec command from your terminal. Alternatively, as a superuser, your user has administrative access to install Python packages within the Jupyter container using the Jupyter Terminal window. Both pip and conda are available to install packages, see details here.

Run the following command to install psycopg2:

# using pip
docker exec -it \
  $(docker ps | grep pyspark_pyspark | awk '{print $NF}') \
  pip install psycopg2-binary

This package gives Python the ability to interact with PostgreSQL. The included Python script, 03_load_sql.py, will execute a set of SQL statements, contained in a SQL file, bakery_sample.sql, against the PostgreSQL container instance.

To execute the script, run the following command:

python ./03_load_sql.py

This should result in the following output, if successful.
pyspark_article_10_run_sql_py
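
To give a feel for what a script like 03_load_sql.py does, here is a minimal sketch of the general pattern: read a block of SQL, split it into statements, and execute each one over a database connection. It is not the project's actual script. To keep the example self-contained, it swaps in Python's built-in sqlite3 module for PostgreSQL, and the table columns and three sample rows are illustrative assumptions, not the contents of bakery_sample.sql.

```python
import sqlite3


def run_sql_script(connection, sql_text):
    """Execute each semicolon-terminated statement in sql_text, then commit.

    Naive splitting: this would break on semicolons inside string literals.
    """
    cursor = connection.cursor()
    for statement in sql_text.split(";"):
        if statement.strip():
            cursor.execute(statement)
    connection.commit()


# Illustrative schema and rows (assumptions, not the real bakery_sample.sql).
sample_sql = """
CREATE TABLE bakery_basket (date TEXT, time TEXT, transaction_id INTEGER, item TEXT);
INSERT INTO bakery_basket VALUES ('2016-10-30', '09:58:11', 1, 'Bread');
INSERT INTO bakery_basket VALUES ('2016-10-30', '10:05:34', 2, 'Scandinavian');
INSERT INTO bakery_basket VALUES ('2016-10-30', '10:07:57', 3, 'Hot chocolate');
"""

# sqlite3 in-memory database used as a stand-in for the PostgreSQL container.
connection = sqlite3.connect(":memory:")
run_sql_script(connection, sample_sql)
row_count = connection.execute("SELECT COUNT(*) FROM bakery_basket").fetchone()[0]
print(row_count)  # 3
```

With psycopg2, the same function should work largely unchanged, since both libraries follow Python's DB-API. The connection would instead come from psycopg2.connect, pointed at the postgres host with the credentials from the stack.yml file.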

To confirm the SQL script’s success, I have included Adminer. Adminer (formerly phpMinAdmin) is a full-featured database management tool written in PHP. Adminer natively recognizes PostgreSQL, MySQL, SQLite, and MongoDB, among other database engines.

Adminer should be available on localhost port 8080. The password credentials, shown below, are available in the stack.yml file. The server name, postgres, is the name of the PostgreSQL container. This is the domain name the Jupyter container will use to communicate with the PostgreSQL container.
pyspark_article_06_adminer_login

Connecting to the demo database with Adminer, we should see the bakery_basket table. The table should contain three rows of data, as shown below.
pyspark_article_07_bakery_data

Developing Jupyter Notebooks

The true power of the Jupyter Docker Stacks containers is Jupyter Notebooks. According to the Jupyter Project, the notebook extends the console-based approach to interactive computing in a qualitatively new direction, providing a web-based application suitable for capturing the whole computation process: developing, documenting, and executing code, as well as communicating the results. Notebook documents contain the inputs and outputs of an interactive session as well as additional text that accompanies the code but is not meant for execution.

To see the power of Jupyter Notebooks, I have written a basic notebook document, 04_pyspark_demo_notebook.ipynb. The document performs some typical PySpark functions, such as loading data from a CSV file and from the PostgreSQL database, performing some basic data analytics with Spark SQL, graphing the data using BokehJS, and finally, saving data back to the database, as well as to the popular Apache Parquet file format. Below we see the notebook document, using the Jupyter Notebook user interface.

pyspark_article_11_notebook.png

PostgreSQL Driver

The only notebook document dependency, not natively part of the Jupyter Image, is the PostgreSQL JDBC driver. The driver, postgresql-42.2.8.jar, is included in the project and referenced in the configuration of the notebook’s Spark Session. The JAR is added to the spark.driver.extraClassPath runtime environment property. This ensures the JAR is available to Spark (written in Scala) when the job is run.
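
Once the driver JAR is on the classpath, Spark's JDBC data source needs a handful of connection options. The sketch below assembles them as a plain dictionary. The host name postgres, the demo database, and the bakery_basket table come from this post; the port 5432 is PostgreSQL's default and is assumed here, the credentials are placeholders, and the function name postgres_jdbc_options is hypothetical.

```python
def postgres_jdbc_options(host, database, table, user, password, port=5432):
    """Build the option dictionary for Spark's JDBC data source (hypothetical helper)."""
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        "driver": "org.postgresql.Driver",  # class provided by the PostgreSQL JDBC JAR
        "dbtable": table,
        "user": user,
        "password": password,
    }


# Placeholder credentials; the real values are in the stack.yml file.
options = postgres_jdbc_options(
    host="postgres",
    database="demo",
    table="bakery_basket",
    user="your_user",
    password="your_password",
)
print(options["url"])  # jdbc:postgresql://postgres:5432/demo
```

In a notebook with an active SparkSession named spark, these options could then be passed along the lines of spark.read.format("jdbc").options(**options).load().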

PyCharm

Since the working directory for the project is shared with the container, you can also edit files, including notebook documents, in your favorite IDE, such as JetBrains PyCharm. PyCharm has built-in language support for Jupyter Notebooks, as shown below.
pyspark_article_11_notebook_pycharm.png

As mentioned earlier, a key feature of Jupyter Notebooks is their ability to save the output from each Cell as part of the notebook document. Below, we see the notebook document on GitHub. The output is saved, as part of the notebook document. Not only can you distribute the notebook document, but you can also preserve and share the output from each cell.
pyspark_article_17_github

Using Additional Packages

As mentioned in the Introduction, the Jupyter Docker Stacks come ready-to-run, with a rich variety of Python packages to extend their functionality. To demonstrate the use of these packages, I have created a second Jupyter notebook document, 05_pyspark_demo_notebook.ipynb. This notebook document uses SciPy (Python-based mathematics, science, and engineering), NumPy (Python-based scientific computing), and the Plotly Python Graphing Library. While NumPy and SciPy are included on the Jupyter Docker Image, the notebook uses pip to install Plotly. Similar to Bokeh, shown previously, we can combine these libraries to create richly interactive data visualizations. To use Plotly, you will need to sign up for a free account and obtain a username and API key.

Shown below, we use Plotly to construct a bar chart of daily bakery items sold for the year 2017 based on the Kaggle dataset. The chart uses SciPy and NumPy to construct a linear fit (regression) and plot a line of best fit for the bakery data. The chart also uses SciPy’s Savitzky-Golay Filter to plot the second line, illustrating a smoothing of our bakery data.

pyspark_article_23a_plotly
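
For readers curious about what the line of best fit involves, the sketch below computes an ordinary least-squares line in plain Python. It is a stand-in for illustration, not the notebook's code, which relies on SciPy and NumPy. The data points are made up and deliberately lie on the line y = 2x + 1, so the result is easy to check by eye. It does not cover the Savitzky-Golay smoothing.

```python
def linear_fit(xs, ys):
    """Return (slope, intercept) of the ordinary least-squares line through the points."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of squared deviations of x, and of the x-y cross deviations
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept


# Made-up values for illustration: day index vs. items sold, on the line y = 2x + 1.
days = [0, 1, 2, 3, 4]
items_sold = [1, 3, 5, 7, 9]

slope, intercept = linear_fit(days, items_sold)
print(slope, intercept)  # 2.0 1.0
```

With NumPy available, as it is in the Jupyter container, numpy.polyfit(days, items_sold, 1) should return the same slope and intercept.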

Plotly also provides Chart Studio Online Chart Maker. Plotly describes Chart Studio as the world’s most sophisticated editor for creating d3.js and WebGL charts. Shown below, we have the ability to enhance, stylize, and share our bakery data visualization using the free version of Chart Studio Cloud.

pyspark_article_23b_plotly

nbviewer

Notebooks can also be viewed using Jupyter nbviewer, hosted on Rackspace. Below, we see the output of a cell from this project’s notebook document, showing a BokehJS chart, using nbviewer. You can view this project’s actual notebook document, using nbviewer, here.

pyspark_article_26_nbviewer.png

Monitoring Spark Jobs

The Jupyter Docker container exposes Spark’s monitoring and instrumentation web user interface. We can observe each Spark Job in great detail.
pyspark_article_12_spark_jobs

We can review details of each stage of the Spark job, including a visualization of the DAG, which Spark constructs as part of the job execution plan, using the DAG Scheduler.
pyspark_article_12_spark_dag

We can also review the timing of each event, occurring as part of the stages of the Spark job.
pyspark_article_12_timeline

We can also use the Spark interface to review and confirm the runtime environment, including versions of Java, Scala, and Spark, as well as packages available on the Java classpath.
pyspark_article_13_enviornment

Spark Performance

Spark, running on a single node within the Jupyter container, on your development system, is not a substitute for a full Spark cluster, running on bare metal or robust virtualized hardware, with YARN, Mesos, or Kubernetes. In my opinion, you should adjust Docker to support an acceptable performance profile for the stack, running only a modest workload. You are not trying to replace the need to run real jobs on a Production Spark cluster.
screen_shot_2019-06-07_at_4_50_25_pm

We can use the docker stats command to examine the container’s CPU and memory metrics:

docker stats --format "table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}"

Below, we see the stats from the stack’s three containers immediately after being deployed, showing little or no activity. Here, Docker has been allocated 2 CPUs, 3 GB of RAM, and 2 GB of swap space from the host machine.
pyspark_article_16a_perf

Compare the stats above with the same three containers, while the example notebook document is running on Spark. The CPU shows a spike, but memory usage appears to be within acceptable ranges.
pyspark_article_16b_perf
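
If you would rather compare such readings programmatically than by eye, the output of docker stats is straightforward to parse. The sketch below assumes the same format string as above, but without the table prefix (so no header row is printed) and with the --no-stream flag to take a single snapshot. The container names and values in sample_output are hypothetical, not measurements from this stack.

```python
from typing import NamedTuple


class ContainerStats(NamedTuple):
    name: str
    cpu_percent: float
    mem_usage: str
    mem_percent: float


def parse_stats_line(line: str) -> ContainerStats:
    """Parse one tab-separated line: name, CPU %, memory usage, memory %."""
    name, cpu, mem_usage, mem_pct = line.rstrip("\n").split("\t")
    return ContainerStats(
        name, float(cpu.rstrip("%")), mem_usage, float(mem_pct.rstrip("%"))
    )


# Hypothetical output of:
#   docker stats --no-stream \
#     --format "{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}"
sample_output = (
    "pyspark_pyspark.1.example\t98.52%\t1.21GiB / 2.93GiB\t41.30%\n"
    "pyspark_postgres.1.example\t0.01%\t9.5MiB / 2.93GiB\t0.32%\n"
)

readings = [parse_stats_line(line) for line in sample_output.splitlines()]

# Pick the container using the most CPU in this snapshot.
busiest = max(readings, key=lambda r: r.cpu_percent)
print(busiest.name, busiest.cpu_percent)
```

In practice, sample_output would be replaced by the captured output of the docker stats command, for example using Python's subprocess module.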

Linux top and htop

Another option to examine container performance metrics is with top. We can use the docker exec command to execute the top command within the Jupyter container, sorting processes by CPU usage:

docker exec -it \
  $(docker ps | grep pyspark_pyspark | awk '{print $NF}') \
  top -o %CPU

With top, we can observe the individual performance of each process running in the Jupyter container.

pyspark_article_20_top.png

Lastly, htop, an interactive process viewer for Unix, can be installed into the container and run with the following set of bash commands, from a Jupyter Terminal window or using docker exec:

docker exec -it \
  $(docker ps | grep pyspark_pyspark | awk '{print $NF}') \
  sh -c "apt-get update && apt-get install htop && htop --sort-key PERCENT_CPU"

With htop, we can observe individual CPU activity. The two CPUs at the top left of the htop window are the two CPUs assigned to Docker. We get insight into the way Docker is using each CPU, as well as other basic performance metrics, like memory and swap.

pyspark_article_16f_htop.png

Assuming your development machine has them available, it is easy to allocate more compute resources to Docker if required. However, in my opinion, this stack is optimized for development and learning, using reasonably sized datasets for data analysis and ML. It should not be necessary to allocate excessive resources to Docker, possibly starving your host machine of its own compute capabilities.
screen_shot_2019-06-07_at_4_50_45_pm

Conclusion

In this brief post, we have seen how easy it is to get started learning and developing applications for big data analytics, using Python, Spark, and PySpark, thanks to the Jupyter Docker Stacks. We could use the same stack to learn and develop for machine learning, using Python, Scala, and R. Extending the stack’s capabilities is as simple as swapping out this Jupyter image for another, with a different set of tools, as well as adding additional containers to the stack, such as Apache Kafka or Apache Cassandra.

Search results courtesy Google Trends (https://trends.google.com)

All opinions expressed in this post are my own and not necessarily the views of my current or past employers, or their clients.
