
Getting Started with PySpark for Big Data Analytics, using Jupyter Notebooks and Docker

Introduction

There is little question that big data analytics, data science, artificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last few years. Behind the hype curves and marketing buzz, these technologies are having a significant influence on many aspects of our modern lives. Due to their popularity and potential benefits, academic institutions and commercial enterprises are rushing to train large numbers of Data Scientists and ML and AI Engineers.

[Google Trends chart: search interest in big data analytics, data science, AI, and ML]

Learning popular data technologies, such as Python, Scala, R, Apache Hadoop, Apache Spark, and Apache Kafka, requires working with multiple complex, interrelated systems. Installing, configuring, and managing these technologies often demands an advanced level of familiarity with Linux, distributed systems, cloud- and container-based platforms, databases, and data-streaming applications. These barriers may prove a deterrent to Students, Mathematicians, Statisticians, and Data Scientists.

[Google Trends chart: search interest in these data technologies]

Driven by the explosive growth of these technologies and the need to train individuals, many commercial enterprises are lowering the barriers to entry, making it easier to get started. The three major cloud providers, AWS, Azure, and Google Cloud, all have multiple Big Data-, AI- and ML-as-a-Service offerings.

Similarly, many open-source projects are also lowering the barriers to entry into these technologies. An excellent example of an open-source project working on this challenge is Project Jupyter. Similar to the Spark Notebook and Apache Zeppelin projects, Jupyter Notebooks enables data-driven, interactive, and collaborative data analytics with Julia, Scala, Python, R, and SQL.

This post will demonstrate the creation of a containerized development environment, using Jupyter Docker Stacks. The environment will be suited for learning and developing applications for Apache Spark, using the Python, Scala, and R programming languages. This post is not intended to be a tutorial on Spark, PySpark, or Jupyter Notebooks.

Technologies

The following technologies are featured prominently in this post.


Jupyter Notebooks

According to Project Jupyter, the Jupyter Notebook, formerly known as the IPython Notebook, is an open-source web application that allows users to create and share documents that contain live code, equations, visualizations, and narrative text. Uses include data cleaning and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more. The word Jupyter is a loose acronym for Julia, Python, and R, but today, Jupyter supports many programming languages. Interest in Jupyter Notebooks has grown dramatically.

[Google Trends chart: search interest in Jupyter Notebooks]

Jupyter Docker Stacks

To enable quick and easy access to Jupyter Notebooks, Project Jupyter has created Jupyter Docker Stacks. The stacks are ready-to-run Docker images containing Jupyter applications, along with accompanying technologies. Currently, there are eight different Jupyter Docker Stacks, each focused on a particular area of practice. They include SciPy (Python-based mathematics, science, and engineering), TensorFlow, R Project for statistical computing, Data Science with Julia, and the main subject of this post, PySpark. The stacks also include a rich variety of well-known packages to extend their functionality, such as scikit-learn, pandas, Matplotlib, Bokeh, ipywidgets (interactive HTML widgets), and Facets.

Apache Spark

According to Apache, Spark is a unified analytics engine for large-scale data processing, used by well-known, modern enterprises, such as Netflix, Yahoo, and eBay. With speeds up to 100x faster than Hadoop, Apache Spark achieves high performance for static, batch, and streaming data, using a state-of-the-art DAG (Directed Acyclic Graph) scheduler, a query optimizer, and a physical execution engine.

Spark’s polyglot programming model allows users to write applications quickly in Scala, Java, Python, R, and SQL. Spark includes libraries for Spark SQL (DataFrames and Datasets), MLlib (Machine Learning), GraphX (Graph Processing), and DStreams (Spark Streaming). You can run Spark using its standalone cluster mode, on Amazon EC2, on Apache Hadoop YARN, on Mesos, or on Kubernetes.

PySpark

The Spark Python API, PySpark, exposes the Spark programming model to Python. PySpark is built on top of Spark’s Java API. Data is processed in Python and cached and shuffled in the JVM. According to Apache, Py4J enables Python programs running in a Python interpreter to dynamically access Java objects in a JVM.
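For those new to the API, below is a minimal, illustrative PySpark script (not part of this post's project), assuming only a Spark installation such as the one included in the Jupyter image. The DataFrame operations written in Python are relayed, via Py4J, to Spark running in the JVM.

from pyspark.sql import SparkSession

# create (or retrieve) a SparkSession, the entry point to the Spark programming model
spark = SparkSession.builder \
    .appName('pyspark_hello_world') \
    .getOrCreate()

# the Python calls below are relayed, via Py4J, to Spark objects in the JVM
df = spark.createDataFrame([(1, 'Bread'), (2, 'Coffee')], ['id', 'item'])
df.show()

spark.stop()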

Docker

According to Docker, their technology gives developers and IT the freedom to build, manage, and secure business-critical applications without the fear of technology or infrastructure lock-in. Although Kubernetes is now the leading open-source container orchestration platform, Docker is still the predominant underlying container engine.

Docker Swarm

Current versions of Docker include swarm mode for natively managing a cluster of Docker Engines, called a swarm. The Docker CLI is used to create a swarm, deploy application services to a swarm, and manage swarm behavior.

PostgreSQL

PostgreSQL is a powerful, open-source object-relational database system. According to their website, PostgreSQL comes with many features aimed at helping developers build applications, helping administrators protect data integrity and build fault-tolerant environments, and helping you manage data no matter how big or small the dataset.

Demonstration

To show the capabilities of the Jupyter development environment, I will demonstrate a few typical use cases, such as executing Python scripts, submitting PySpark jobs, working with Jupyter Notebooks, and reading and writing data to and from different format files and to a database. We will be using the jupyter/all-spark-notebook Docker Image. This image includes Python, R, and Scala support for Apache Spark, using Apache Toree.

Architecture

As shown below, we will stand up a Docker stack consisting of Jupyter All-Spark-Notebook, PostgreSQL 10.5, and Adminer containers. The Docker stack will have local directories bind-mounted into the containers. Files from our GitHub project will be shared with the Jupyter application container through a bind-mounted directory. Our PostgreSQL data will also be persisted through a bind-mounted directory. This allows us to persist data external to the ephemeral containers.

[Diagram: Docker stack architecture with Jupyter All-Spark-Notebook, PostgreSQL, and Adminer containers and bind-mounted local directories]

Source Code

All open-sourced code for this post can be found on GitHub. Source code samples are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers.

Deploy Docker Stack

To start, create the $HOME/data/postgre directory to store PostgreSQL data files. This directory will be bind-mounted into the PostgreSQL container on line 36 of the stack.yml file, $HOME/data/postgre:/var/lib/postgresql/data. The HOME environment variable assumes you are working on Linux or MacOS and is equivalent to HOMEPATH on Windows.

The Jupyter container’s working directory is set on line 10 of the stack.yml file, working_dir: /home/$USER/work. The local bind-mounted working directory is $PWD/work. This path is bind-mounted to the working directory in the Jupyter container, on line 24 of the stack.yml file, $PWD/work:/home/$USER/work. The PWD environment variable assumes you are working on Linux or MacOS (CD on Windows).

By default, the user within the Jupyter container is jovyan. Optionally, I have chosen to override that user with my own local host’s user account, as shown on line 16 of the stack.yml file, NB_USER: $USER. I have used the MacOS host’s USER environment variable value (equivalent to USERNAME on Windows). There are many options for configuring the Jupyter container, detailed here. Several of those options are shown on lines 12-18 of the stack.yml file.

Assuming you have a recent version of Docker installed on your local development machine, and running in swarm mode, standing up the stack is as easy as running the following command from the root directory of the project:

docker stack deploy -c stack.yml pyspark

The Docker stack consists of a new overlay network, pyspark-net, and the three containers. To confirm the stack deployed successfully, you can run the following command:

docker stack ps pyspark --no-trunc

[Screenshot: output of docker stack ps pyspark]

To access the Jupyter Notebook application, you need to obtain the Jupyter URL and access token (read more here). This information is output in the Jupyter container log, which can be accessed with the following command:

docker logs $(docker ps | grep _pyspark | awk '{print $NF}')

[Screenshot: Jupyter container log showing the URL and access token]

Using the URL and token shown in the log output, you will be able to access the Jupyter web-based user interface on localhost port 8888. Once there, from the Jupyter dashboard landing page, you should see all the files in the project’s work/ directory.

Also shown below are the types of files you are able to create from the dashboard, including Python 3, R, Scala (using Apache Toree or spylon-kernel), and text. You can also open a Jupyter Terminal or create a new folder.

[Screenshot: Jupyter dashboard showing the project’s work/ directory]

Running Python Scripts

Instead of worrying about installing and maintaining the latest version of Python and packages on your own development machine, we can run our Python scripts from the Jupyter container. At the time of this post, the latest jupyter/all-spark-notebook Docker Image runs Python 3.6.6. Let’s start with a simple example of the Jupyter container’s capabilities by running a Python script. I’ve included a sample Python script, 01_simple_script.py.
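The script's contents are not reproduced here; as a rough, hypothetical stand-in (the project's actual 01_simple_script.py differs), a script run this way needs nothing more than the container's Python interpreter and standard library:

# hypothetical stand-in for a simple Python script; the project's 01_simple_script.py differs
import platform
from datetime import datetime

print('Python version: %s' % platform.python_version())
print('Current time:   %s' % datetime.now())
print('Sum of 1..100:  %d' % sum(range(1, 101)))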

Run the script from within the Jupyter container, from a Jupyter Terminal window:

python ./01_simple_script.py

You should observe the following output.
[Screenshot: output of 01_simple_script.py]

Kaggle Datasets

To explore the features of the Jupyter Notebook container and PySpark, we will use a publicly available dataset from Kaggle. Kaggle is a fantastic source of open datasets for big data and ML applications. Their tagline is ‘Kaggle is the place to do data science projects’.

For this demonstration, I chose the ‘Transactions from a Bakery’ dataset from Kaggle.

[Screenshot: the ‘Transactions from a Bakery’ dataset on Kaggle]

The dataset contains 21,294 rows, each with four columns of data. Although certainly nowhere near ‘big data’, the dataset is large enough to test out the Jupyter container functionality.

Submitting Spark Jobs

We are not limited to Jupyter Notebooks to interact with Spark; we can also submit scripts directly to Spark from a Jupyter Terminal or from our IDE. I have included a simple Python script, 02_bakery_dataframes.py. The script loads the Kaggle Bakery dataset from the CSV file into a Spark DataFrame. The script then prints out the top ten rows of data, along with a count of the total number of rows in the DataFrame.
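While the full script is in the project, a minimal sketch of the same approach is shown below. The CSV file name (BreadBasket_DMS.csv) and the read options are assumptions here and may differ from the project's actual script.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('bakery_dataframes') \
    .getOrCreate()

# load the Kaggle bakery dataset from CSV into a Spark DataFrame
# (file name and options assumed; adjust to match the project's copy of the dataset)
df_bakery = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .load('BreadBasket_DMS.csv')

# print the top ten rows and a count of the total number of rows in the DataFrame
df_bakery.show(10)
print('Total rows: %d' % df_bakery.count())

spark.stop()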

Run the script directly from a Jupyter Terminal window:

python ./02_bakery_dataframes.py

An example of the output of the Spark job is shown below. At the time of this post, the latest jupyter/all-spark-notebook Docker Image runs Spark 2.3.1, Scala 2.11.8, Conda 4.5.11, and Java 1.8.0 using the OpenJDK.
[Screenshot: output of 02_bakery_dataframes.py]

More typically, you would submit the Spark job, using the spark-submit command. Use a Jupyter Terminal window to run the following command:

$SPARK_HOME/bin/spark-submit 02_bakery_dataframes.py

Below, we see the beginning of the output from Spark, using the spark-submit command.
[Screenshot: beginning of the spark-submit output]

Below, we see the scheduled tasks executing and the output of the print statement, displaying the top 10 rows of bakery data.

Interacting with Databases

Often with Spark, you are loading data from one or more data sources (input). After performing operations and transformations on the data, the data is persisted or conveyed to another system for further processing (output).

To demonstrate the flexibility of the Jupyter Docker Stacks to work with databases, I have added PostgreSQL to the Docker Stack. We can read and write data from the Jupyter container to the PostgreSQL instance, running in a separate container.

To begin, we will run a Python script that executes a series of SQL statements to create our database schema and load some test data into a new database table. To do so, we will need to install the psycopg2 package into our Jupyter container. You can install Python packages within the container using the docker exec command or from a Jupyter Terminal window, since your user has administrative access inside the container. Both pip and conda are available to install packages; see details here.

Run the following command to install psycopg2:

docker exec -it \
  $(docker ps | grep _pyspark | awk '{print $NF}') \
  pip install psycopg2 psycopg2-binary

These packages give Python the ability to interact with PostgreSQL. The included Python script, 03_load_sql.py, will execute a set of SQL statements, contained in a SQL file, bakery_sample.sql, against the PostgreSQL container instance.
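A rough sketch of that approach follows. The host name (postgres) and database name (demo) come from the stack, while the user and password placeholders stand in for the credentials defined in the stack.yml file; the actual 03_load_sql.py may differ in its details.

import psycopg2

# connection values: host and database from the stack; user/password are placeholders
# for the credentials defined in the stack.yml file
connect_str = "host='postgres' port='5432' dbname='demo' user='<user>' password='<password>'"

conn = psycopg2.connect(connect_str)
conn.autocommit = True

with conn.cursor() as cursor:
    # read the SQL file and execute its statements against the PostgreSQL container
    with open('bakery_sample.sql', 'r') as sql_file:
        cursor.execute(sql_file.read())

conn.close()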

To execute the script, run the following command:

python ./03_load_sql.py

This should result in the following output, if successful.
[Screenshot: output of 03_load_sql.py]

To confirm the SQL script’s success, I have included Adminer. Adminer (formerly phpMinAdmin) is a full-featured database management tool written in PHP. Adminer natively recognizes PostgreSQL, MySQL, SQLite, and MongoDB, among other database engines.

Adminer should be available on localhost port 8080. The password credentials, shown below, are available in the stack.yml file. The server name, postgres, is the name of the PostgreSQL container. This is the domain name the Jupyter container will use to communicate with the PostgreSQL container.
[Screenshot: Adminer login page]

Connecting to the demo database with Adminer, we should see the bakery_basket table. The table should contain three rows of data, as shown below.
[Screenshot: bakery_basket table shown in Adminer]

Developing Jupyter Notebooks

The true power of the Jupyter Docker Stacks containers is Jupyter Notebooks. According to the Jupyter Project, the notebook extends the console-based approach to interactive computing in a qualitatively new direction, providing a web-based application suitable for capturing the whole computation process: developing, documenting, and executing code, as well as communicating the results. Notebook documents contain the inputs and outputs of an interactive session as well as additional text that accompanies the code but is not meant for execution.

To see the power of Jupyter Notebooks, I have written a basic notebook document, 04_pyspark_demo_notebook.ipynb. The document performs some typical PySpark functions, such as loading data from a CSV file and from the PostgreSQL database, performing some basic data analytics with Spark SQL, graphing the data using BokehJS, and finally, saving data back to the database, as well as to the popular Apache Parquet file format. Below we see the notebook document, using the Jupyter Notebook user interface.

[Screenshot: 04_pyspark_demo_notebook.ipynb in the Jupyter Notebook user interface]
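To give a sense of the notebook's contents, the hypothetical cell below illustrates the general shape of the CSV load, the Spark SQL analysis, and the Parquet write; the notebook's actual queries, file names, and options differ.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('pyspark_demo_notebook') \
    .getOrCreate()

# load the bakery data from CSV (file name assumed) and register a temporary view
df_bakery = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .load('BreadBasket_DMS.csv')
df_bakery.createOrReplaceTempView('bakery_table')

# basic data analytics with Spark SQL: top-selling bakery items
df_sorted = spark.sql("""SELECT item, count(*) AS count
                         FROM bakery_table
                         WHERE item NOT LIKE 'NONE'
                         GROUP BY item
                         ORDER BY count DESC""")
df_sorted.show(10, False)

# save the results to the popular Apache Parquet file format
df_sorted.write.mode('overwrite').parquet('output/bakery_parquet')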

PostgreSQL Driver

The only notebook document dependency, not natively part of the Jupyter Image, is the PostgreSQL JDBC driver. The driver, postgresql-42.2.5.jar, is included in the project and referenced in the configuration of the notebook’s Spark Session. The JAR is added to the spark.driver.extraClassPath runtime environment property. This ensures the JAR is available to Spark (written in Scala) when the job is run.
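A sketch of that Spark Session configuration, along with a JDBC read of the bakery_basket table, is shown below. The JDBC URL uses the postgres host and demo database from the stack; the user and password placeholders stand in for the credentials in the stack.yml file.

from pyspark.sql import SparkSession

# add the PostgreSQL JDBC driver to the Spark driver's classpath
spark = SparkSession.builder \
    .appName('pyspark_demo_app') \
    .config('spark.driver.extraClassPath', 'postgresql-42.2.5.jar') \
    .getOrCreate()

# JDBC connection properties; user/password placeholders come from the stack.yml file
url = 'jdbc:postgresql://postgres:5432/demo'
properties = {
    'driver': 'org.postgresql.Driver',
    'user': '<user>',
    'password': '<password>'
}

# read the bakery_basket table from the PostgreSQL container into a Spark DataFrame
df_basket = spark.read.jdbc(url=url, table='bakery_basket', properties=properties)
df_basket.show()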

PyCharm

Since the working directory for the project is shared with the container, you can also edit files, including notebook documents, in your favorite IDE, such as JetBrains PyCharm. PyCharm has built-in language support for Jupyter Notebooks, as shown below.
[Screenshot: notebook document open in JetBrains PyCharm]

As mentioned earlier, a key feature of Jupyter Notebooks is their ability to save the output from each Cell as part of the notebook document. Below, we see the notebook document on GitHub. The output is saved, as part of the notebook document. Not only can you distribute the notebook document, but you can also preserve and share the output from each cell.
[Screenshot: notebook document, with saved cell output, on GitHub]

Using Additional Packages

As mentioned in the Introduction, the Jupyter Docker Stacks come ready-to-run, with a rich variety of Python packages to extend their functionality. To demonstrate the use of these packages, I have created a second Jupyter notebook document, 05_pyspark_demo_notebook.ipynb. This notebook document uses SciPy (Python-based mathematics, science, and engineering), NumPy (Python-based scientific computing), and the Plotly Python Graphing Library. While NumPy and SciPy are included in the Jupyter Docker Image, the notebook uses pip to install Plotly. Similar to Bokeh, shown previously, we can combine these libraries to create a rich interactive data visualization.

Shown below, we use Plotly to construct a bar chart of daily bakery items sold for the year 2017 based on the Kaggle dataset. The chart uses SciPy and NumPy to construct a linear fit (regression) and plot a line of best fit to the bakery data. The chart also uses SciPy’s Savitzky-Golay Filter to plot the second line, illustrating a smoothing of our bakery data.

[Chart: Plotly bar chart of daily bakery items sold in 2017, with linear fit and Savitzky-Golay smoothing]
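An illustrative sketch of combining these libraries is shown below; it uses randomly generated daily counts rather than the actual bakery data, so the figures differ from the chart above, but the linear fit and Savitzky-Golay smoothing follow the same pattern.

import numpy as np
import plotly.graph_objs as go
from plotly.offline import init_notebook_mode, iplot
from scipy import signal, stats

init_notebook_mode(connected=True)

# randomly generated daily sales counts, standing in for the bakery data
days = np.arange(1, 31)
sales = np.random.randint(80, 160, size=days.size)

# linear fit (regression) of daily sales using SciPy
fit = stats.linregress(days, sales)
best_fit = fit.slope * days + fit.intercept

# Savitzky-Golay filter to smooth the daily sales
smoothed = signal.savgol_filter(sales, window_length=7, polyorder=3)

data = [
    go.Bar(x=days, y=sales, name='Items Sold'),
    go.Scatter(x=days, y=best_fit, name='Linear Fit'),
    go.Scatter(x=days, y=smoothed, name='Savitzky-Golay')
]
iplot({'data': data, 'layout': go.Layout(title='Daily Bakery Items Sold (illustrative data)')})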

Plotly also provides Chart Studio, an online chart maker. Plotly describes Chart Studio as the world’s most sophisticated editor for creating D3.js and WebGL charts. Shown below, we have the ability to enhance, stylize, and share our bakery data visualization using the free version of Chart Studio Cloud.

[Screenshot: bakery data visualization in Plotly Chart Studio]

nbviewer

Notebooks can also be viewed using Jupyter nbviewer, hosted on Rackspace. Below, we see the output of a cell from this project’s notebook document, showing a BokehJS chart, using nbviewer. You can view this project’s actual notebook document, using nbviewer, here.

[Screenshot: BokehJS chart from the notebook, rendered in nbviewer]

Monitoring Spark Jobs

The Jupyter Docker container exposes Spark’s monitoring and instrumentation web user interface. We can observe each Spark Job in great detail.
[Screenshot: Spark Jobs view in the Spark web UI]

We can review details of each stage of the Spark job, including a visualization of the DAG, which Spark constructs as part of the job execution plan, using the DAG Scheduler.
[Screenshot: DAG visualization for a Spark job stage]

We can also review the timing of each event, occurring as part of the stages of the Spark job.
[Screenshot: event timeline for a Spark job]

We can also use the Spark interface to review and confirm the runtime environment, including versions of Java, Scala, and Spark, as well as packages available on the Java classpath.
[Screenshot: Environment view in the Spark web UI]
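The same information can also be checked programmatically from a notebook cell or script; the short sketch below (not part of the project) prints the Spark version, the web UI location, and the active Spark configuration.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('env_check').getOrCreate()
sc = spark.sparkContext

# confirm the Spark version, web UI location, and runtime configuration
print('Spark version: %s' % spark.version)
print('Spark web UI:  %s' % sc.uiWebUrl)
for key, value in sorted(sc.getConf().getAll()):
    print('%s = %s' % (key, value))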

Spark Performance

Spark, running on a single node within the Jupyter container on your development system, is not a substitute for a full Spark cluster running on bare metal or robust virtualized hardware, with YARN, Mesos, or Kubernetes. In my opinion, you should adjust Docker to support an acceptable performance profile for the stack while running only a modest workload; you are not trying to replace the need to run real jobs on a production Spark cluster.
[Screenshot: Docker resource settings]

We can use the docker stats command to examine the container’s CPU and memory metrics:

docker stats --format "table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}"

Below, we see the stats from the stack’s three containers immediately after being deployed, showing little or no activity. Here, Docker has been allocated 2 CPUs, 3 GB of RAM, and 2 GB of swap space from the host machine.
[Screenshot: docker stats output immediately after deployment]

Compare the stats above with the same three containers, while the example notebook document is running on Spark. The CPU shows a spike, but memory usage appears to be within acceptable ranges.
[Screenshot: docker stats output while the notebook is running]

Linux top and htop

Another option to examine container performance metrics is with top. We can use the docker exec command to execute the top command within the Jupyter container, sorting processes by CPU usage:

docker exec -it \
  $(docker ps | grep _pyspark | awk '{print $NF}') \
  top -o %CPU

With top, we can observe the individual performance of each process running in the Jupyter container.

[Screenshot: top running inside the Jupyter container]

Lastly, htop, an interactive process viewer for Unix, can be installed into the container and run with the following set of bash commands, from a Jupyter Terminal window or using docker exec:

docker exec -it \
  $(docker ps | grep _pyspark | awk '{print $NF}') \
  sh -c "apt-get update && apt-get install htop && htop --sort-key PERCENT_CPU"

With htop, we can observe individual CPU activity. The two CPUs at the top left of the htop window are the two CPUs assigned to Docker. We get insight into the way Docker is using each CPU, as well as other basic performance metrics, like memory and swap.

[Screenshot: htop running inside the Jupyter container]

Assuming your development machine has the resources available, it is easy to allocate more compute resources to Docker if required. However, in my opinion, this stack is optimized for development and learning, using reasonably sized datasets for data analysis and ML. It should not be necessary to allocate excessive resources to Docker, possibly starving your host machine’s own compute capabilities.
[Screenshot: Docker resource allocation]

Conclusion

In this brief post, we have seen how easy it is to get started learning and developing applications for big data analytics, using Python, Spark, and PySpark, thanks to the Jupyter Docker Stacks. We could use the same stack to learn and develop for machine learning, using Python, Scala, and R. Extending the stack’s capabilities is as simple as swapping out this Jupyter image for another, with a different set of tools, as well as adding additional containers to the stack, such as Apache Kafka or Apache Cassandra.

 

Search results courtesy of Google Trends (https://trends.google.com)

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

