Posts Tagged Elastic MapReduce
Running Spark Jobs on Amazon EMR with Apache Airflow: Using the new Amazon Managed Workflows for Apache Airflow (Amazon MWAA) Service on AWS
Posted by Gary A. Stafford in AWS, Bash Scripting, Big Data, Build Automation, Cloud, DevOps, Python, Software Development on December 24, 2020
Introduction
In the first post of this series, we explored several ways to run PySpark applications on Amazon EMR using AWS services, including AWS CloudFormation, AWS Step Functions, and the AWS SDK for Python. This second post in the series will examine running Spark jobs on Amazon EMR using the recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA) service.
Amazon EMR
According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache Spark, Hive, HBase, Flink, Hudi, and Presto, along with Zeppelin and Jupyter notebooks. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including the use of EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.
AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.
Amazon MWAA
Apache Airflow is a popular open-source platform designed to schedule and monitor workflows. According to Wikipedia, Airflow was created at Airbnb in 2014 to manage the company’s increasingly complex workflows. From the beginning, the project was made open source, becoming an Apache Incubator project in 2016 and a top-level Apache Software Foundation project (TLP) in 2019.
Many organizations build, manage, and maintain Apache Airflow on AWS using compute services such as Amazon EC2 or Amazon EKS. Amazon recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA). With the announcement of Amazon MWAA in November 2020, AWS customers can now focus on developing workflow automation, while leaving the management of Airflow to AWS. Amazon MWAA can be used as an alternative to AWS Step Functions for workflow automation on AWS.

Apache recently announced the release of Airflow 2.0.0 on December 17, 2020. The latest 1.x version of Airflow is 1.10.14, released December 12, 2020. However, at the time of this post, Amazon MWAA was running Airflow 1.10.12, released August 25, 2020. Ensure that when you are developing workflows for Amazon MWAA, you are using the correct Apache Airflow 1.10.12 documentation.
The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI.
Airflow has a mechanism that allows you to expand its functionality and integrate with other systems. Given its integration capabilities, Airflow has extensive support for AWS, including Amazon EMR, Amazon S3, AWS Batch, Amazon RedShift, Amazon DynamoDB, AWS Lambda, Amazon Kinesis, and Amazon SageMaker. Outside of support for Amazon S3, most AWS integrations can be found in the Hooks, Secrets, Sensors, and Operators of the Airflow codebase’s contrib section.
Getting Started
Source Code
Using this git clone command, download a copy of this post’s GitHub repository to your local environment.
git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/aws-airflow-demo.git
Preliminary Steps
This post assumes the reader has completed the demonstration in the previous post, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce. This post will re-use many of the last post’s AWS resources, including the EMR VPC, Subnets, Security Groups, AWS Glue Data Catalog, Amazon S3 buckets, EMR Roles, EC2 key pair, AWS Systems Manager Parameter Store parameters, PySpark applications, and Kaggle datasets.
Configuring Amazon MWAA
The easiest way to create a new MWAA Environment is through the AWS Management Console. I strongly suggest that you review the pricing for Amazon MWAA before continuing. The service can be quite costly to operate, even when idle, with the smallest Environment class potentially running into the hundreds of dollars per month.

Using the Console, create a new Amazon MWAA Environment. The Amazon MWAA interface will walk you through the creation process. Note the current ‘Airflow version’, 1.10.12.

Amazon MWAA requires an Amazon S3 bucket to store Airflow assets. Create a new Amazon S3 bucket. According to the documentation, the bucket must start with the prefix airflow-. You must also enable Bucket Versioning on the bucket. Specify a dags folder within the bucket to store Airflow’s Directed Acyclic Graphs (DAG). You can leave the next two options blank since we have no additional Airflow plugins or additional Python packages to install.

With Amazon MWAA, your data is secure by default as workloads run within their own Amazon Virtual Private Cloud (Amazon VPC). As part of the MWAA Environment creation process, you are given the option to have AWS create an MWAA VPC CloudFormation stack.

For this demonstration, choose to have MWAA create a new VPC and associated networking resources.

The MWAA CloudFormation stack contains approximately 22 AWS resources, including a VPC, a pair of public and private subnets, route tables, an Internet Gateway, two NAT Gateways, and associated Elastic IPs (EIP). See the MWAA documentation for more details.


As part of the Amazon MWAA Networking configuration, you must decide if you want web access to Airflow to be public or private. The details of the network configuration can be found in the MWAA documentation. I am choosing public webserver access for this demonstration, but the recommended choice is private for greater security. With the public option, AWS still requires IAM authentication to sign in to the AWS Management Console in order to access the Airflow UI.
You must select an existing VPC Security Group or have MWAA create a new one. For this demonstration, choose to have MWAA create a Security Group for you.
Lastly, select an appropriately-sized Environment class for Airflow based on the scale of your needs. The mw1.small class will be sufficient for this demonstration.

Finally, for Permissions, you must select an existing Airflow execution service role or create a new role. For this demonstration, create a new Airflow service role. We will later add additional permissions.

Airflow Execution Role
As part of this demonstration, we will be using Airflow to run Spark jobs on EMR (EMR Steps). To allow Airflow to interact with EMR, we must increase the new Airflow execution role’s default permissions. Additional permissions include allowing the new Airflow role to assume the EMR roles using iam:PassRole. For this demonstration, we will include the two default EMR Service and JobFlow roles, EMR_DefaultRole and EMR_EC2_DefaultRole. We will also include the corresponding custom EMR roles created in the previous post, EMR_DemoRole and EMR_EC2_DemoRole. The Airflow service role also requires three specific EMR permissions, as shown below. Later in the post, Airflow will also read files from S3, which requires s3:GetObject permission.
Create a new policy by importing the project’s JSON file, iam_policy/airflow_emr_policy.json, and attach the new policy to the Airflow service role. Be sure to update the AWS Account ID in the file with your own Account ID.
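If you prefer to script this step, below is a minimal boto3 sketch of creating and attaching such a policy. The policy document shown only approximates the project’s airflow_emr_policy.json file; the role name, EMR actions, and Account ID are placeholders to substitute with your own values.
import json

import boto3

iam = boto3.client("iam")

# Hypothetical policy document approximating iam_policy/airflow_emr_policy.json
# (replace 123456789012 with your own Account ID).
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": [
                "arn:aws:iam::123456789012:role/EMR_DefaultRole",
                "arn:aws:iam::123456789012:role/EMR_EC2_DefaultRole",
                "arn:aws:iam::123456789012:role/EMR_DemoRole",
                "arn:aws:iam::123456789012:role/EMR_EC2_DemoRole",
            ],
        },
        {
            "Effect": "Allow",
            "Action": [
                # Illustrative EMR actions; the authoritative list is in the project's JSON file.
                "elasticmapreduce:RunJobFlow",
                "elasticmapreduce:AddJobFlowSteps",
                "elasticmapreduce:DescribeStep",
                "s3:GetObject",
            ],
            "Resource": "*",
        },
    ],
}

# Create the policy and attach it to the MWAA execution role.
response = iam.create_policy(
    PolicyName="AirflowEMRDemoPolicy",
    PolicyDocument=json.dumps(policy_document),
)
iam.attach_role_policy(
    RoleName="<your_mwaa_execution_role_name>",
    PolicyArn=response["Policy"]["Arn"],
)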
The Airflow service role, created by MWAA, is shown below with the new policy attached.

Final Architecture
Below is the final high-level architecture for the post’s demonstration. The diagram shows the approximate route of a DAG Run request, in red. The diagram includes an optional S3 Gateway VPC endpoint, not detailed in the post, but recommended for additional security. According to AWS, a VPC endpoint enables you to privately connect your VPC to supported AWS services and VPC endpoint services powered by AWS PrivateLink without requiring an internet gateway. In this case, it provides a private connection between the MWAA VPC and Amazon S3. It is also possible to create an EMR Interface VPC Endpoint to securely route traffic directly to EMR from MWAA, instead of connecting over the Internet.

Amazon MWAA Environment
The new MWAA Environment will include a link to the Airflow UI.

Airflow UI
Using the supplied link, you should be able to access the Airflow UI using your web browser.

Our First DAG
The Amazon MWAA documentation includes an example DAG that runs SparkPi, one of several sample programs that come with Spark. I have created a similar DAG that is included in the GitHub project, dags/emr_steps_demo.py. The DAG will create a minimally-sized single-node EMR cluster with no Core or Task nodes. The DAG will then use that cluster to submit the calculate_pi job to Spark. Once the job is complete, the DAG will terminate the EMR cluster.
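The complete DAG is in the repository; below is a condensed sketch of the pattern it follows, using the Airflow 1.10.12 contrib EMR operators. The cluster sizing, role names, and other values shown are illustrative defaults rather than the project’s exact configuration.
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.utils.dates import days_ago

# Single Spark step that runs the SparkPi sample program included with Spark.
SPARK_STEPS = [
    {
        "Name": "calculate_pi",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
        },
    }
]

# Minimal, single-node (Master-only) cluster that auto-terminates when the step finishes.
JOB_FLOW_OVERRIDES = {
    "Name": "demo-cluster-airflow",
    "ReleaseLabel": "emr-6.2.0",
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Master node",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            }
        ],
        "KeepJobFlowAliveWhenNoSteps": False,
        "TerminationProtected": False,
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

with DAG(
    dag_id="spark_pi_example",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:
    cluster_creator = EmrCreateJobFlowOperator(
        task_id="create_job_flow",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
    )
    step_adder = EmrAddStepsOperator(
        task_id="add_steps",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        steps=SPARK_STEPS,
    )
    step_checker = EmrStepSensor(
        task_id="watch_step",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    )

    cluster_creator >> step_adder >> step_checker
Setting KeepJobFlowAliveWhenNoSteps to False is what allows the cluster to terminate itself once the step completes.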
Upload the DAG to the Airflow S3 bucket’s dags directory. Substitute your Airflow S3 bucket name in the AWS CLI command below, then run it from the project’s root.
aws s3 cp dags/spark_pi_example.py \
s3://<your_airflow_bucket_name>/dags/
The DAG, spark_pi_example, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job.

The DAG has no optional configuration to input as JSON. Select ‘Trigger’ to submit the job, as shown below.

The DAG should complete all three tasks successfully, as shown in the DAG’s ‘Graph View’ tab below.

Switching to the EMR Console, you should see the single-node EMR cluster being created.

On the ‘Steps’ tab, you should see that the ‘calculate_pi’ Spark job has been submitted and is waiting for the cluster to be ready before it runs.

Triggering DAGs Programmatically
The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI. To automate the DAG Run, we could use the AWS CLI and invoke the Airflow CLI via an endpoint on the Apache Airflow Webserver. The Amazon MWAA documentation and Airflow’s CLI documentation explain how.
Below is an example of triggering the spark_pi_example DAG programmatically using Airflow’s trigger_dag CLI command. You will need to replace the WEB_SERVER_HOSTNAME variable with your own Airflow Web Server’s hostname. The ENVIRONMENT_NAME variable assumes only one MWAA environment is returned by jq.
Analytics Job with Airflow
Next, we will submit an actual analytics job to EMR. If you recall from the previous post, we had four different analytics PySpark applications, which performed analyses on the three Kaggle datasets. For the next DAG, we will run a Spark job that executes the bakery_sales_ssm.py PySpark application. This job should already exist in the processed data S3 bucket.
The DAG, dags/bakery_sales.py, creates an EMR cluster identical to the EMR cluster created with the run_job_flow.py Python script in the previous post. All EMR configuration options available when using AWS Step Functions are available with Airflow’s airflow.contrib.operators and airflow.contrib.sensors packages for EMR.
Airflow leverages Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. The Bakery Sales DAG contains eleven Jinja template variables. Seven variables will be configured in the Airflow UI by importing a JSON file into the ‘Admin’ ⇨ ‘Variables’ tab. These template variables are prefixed with var.value in the DAG. The other three variables will be passed as a DAG Run configuration as a JSON blob, similar to the previous DAG example. These template variables are prefixed with dag_run.conf.
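For illustration only, here is how the two prefixes might appear inside the DAG’s templated strings (the variable names below are examples, not the DAG’s exact definitions):
# Values imported via the Airflow UI ('Admin' > 'Variables') are referenced
# with the 'var.value' prefix inside the DAG's templated strings, for example:
work_bucket = "{{ var.value.work_bucket }}"
logs_bucket = "{{ var.value.logs_bucket }}"

# Values passed in the DAG Run configuration JSON blob are referenced with
# the 'dag_run.conf' prefix, for example:
notification_email = "{{ dag_run.conf['airflow_email'] }}"
Airflow resolves the var.value references from the imported Variables, while the dag_run.conf references are resolved from the JSON blob supplied when the DAG is triggered.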
Import Variables into Airflow UI
First, to import the required variables, change the values in the project’s airflow_variables/admin_variables_bakery.json file. You will need to update the values for bootstrap_bucket, emr_ec2_key_pair, logs_bucket, and work_bucket. The three S3 buckets should all exist from the previous post.
Next, import the variables file from the ‘Admin’ ⇨ ‘Variables’ tab of the Airflow UI.

Upload the DAG, dags/bakery_sales.py, to the Airflow S3 bucket, similar to the first DAG.
aws s3 cp dags/bakery_sales.py \
s3://<your_airflow_bucket_name>/dags/
The second DAG, bakery_sales, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job.

Input the three required parameters in the ‘Trigger DAG’ interface, used to pass the DAG Run configuration, and select ‘Trigger’. A sample of the JSON blob can be found in the project, airflow_variables/dag_run.conf_bakery.json.
{ "airflow_email": "analytics_team@example.com", "email_on_failure": false, "email_on_retry": false }
This is just for demonstration purposes. To send and receive emails, you will need to configure Airflow.

Switching to the EMR Console, you should see the ‘Bakery Sales’ Spark job in the ‘Steps’ tab.

Multi-Step DAG
In our last example, we will use a single DAG to run four Spark jobs in parallel. The Spark job arguments (EmrAddStepsOperator steps parameter) will be loaded from an external JSON file residing in Amazon S3, instead of being defined in the DAG, as in the previous two DAG examples. Additionally, the EMR cluster specifications (EmrCreateJobFlowOperator job_flow_overrides parameter) will also be loaded from an external JSON file. Using this method, we decouple the EMR provisioning and job details from the DAG. DataOps or DevOps Engineers might manage the EMR cluster specifications as code, while Data Analysts manage the Spark job arguments, separately. A third team might manage the DAG itself.
We still maintain the variables in the JSON files. The DAG will read the JSON file-based configuration into the tasks as JSON blobs, then replace the Jinja template variables (expressions) in the DAG with variable values defined in Airflow or input as parameters when the DAG is triggered.
Below, we see a snippet of two of the four spark-submit job definitions (steps), which have been moved to a separate JSON file, emr_steps/emr_steps.json.
Below are the EMR cluster specifications (job_flow_overrides), which have been moved to a separate JSON file, job_flow_overrides/job_flow_overrides.json.
Decoupling the configurations reduces the DAG from well over 200 lines of code to less than 75 lines. Note lines 56 and 63 of the DAG below. Instead of referencing a local object variable, the parameters now reference the function, get_objects(key, bucket_name), which loads the JSON.
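As a rough sketch of that approach (the implementation details are illustrative), the function might use Airflow’s S3Hook to read and parse the JSON object:
import json

from airflow.hooks.S3_hook import S3Hook


def get_objects(key, bucket_name):
    """Load a JSON configuration object (Steps or job flow overrides) from S3."""
    hook = S3Hook()
    content = hook.read_key(key=key, bucket_name=bucket_name)
    return json.loads(content)


# For example (key and bucket names illustrative):
# steps = get_objects(key="emr_steps/emr_steps.json", bucket_name=work_bucket)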
This time, we need to upload three files to S3: the DAG to the Airflow S3 bucket, and the two JSON files to the EMR Work S3 bucket. Change the bucket names to match your environment, then run the three AWS CLI commands shown below.
aws s3 cp emr_steps/emr_steps.json \
s3://emr-demo-work-123412341234-us-east-1/emr_steps/

aws s3 cp job_flow_overrides/job_flow_overrides.json \
s3://emr-demo-work-123412341234-us-east-1/job_flow_overrides/

aws s3 cp dags/multiple_steps.py \
s3://airflow-123412341234-us-east-1/dags/
The third DAG, multiple_steps, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark jobs. The three required input parameters in the ‘Trigger DAG’ interface are identical to the previous bakery_sales DAG. A sample of that JSON blob can be found in the project at airflow_variables/dag_run.conf_bakery.json.

Below we see that the EMR cluster has completed the four Spark jobs (EMR Steps) and has auto-terminated. Note that all four jobs were started at the exact same time. If you recall from the previous post, this is possible because we preset the ‘Concurrency’ level to 5.

Triggering DAGs Programmatically
AWS CLI
Similar to the previous example, below we can trigger the multiple_steps DAG programmatically using Airflow’s trigger_dag CLI command. Note the addition of the --conf named argument, which passes the configuration, containing three key/value pairs, to the trigger command as a JSON blob.
AWS SDK
Airflow DAGs can also be triggered using the AWS SDK. For example, with boto3 for Python, we could use a script similar to the following to remotely trigger a DAG.
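As a minimal sketch of that approach, assuming the boto3 mwaa client and the requests package, the script requests a short-lived CLI token for the environment and then POSTs the trigger_dag command to the web server’s CLI endpoint; the environment name and configuration values are placeholders.
import json

import boto3
import requests

# Assumed names for illustration; substitute your own MWAA environment and DAG.
MWAA_ENVIRONMENT_NAME = "<your_mwaa_environment_name>"
DAG_NAME = "multiple_steps"
CONF = {
    "airflow_email": "analytics_team@example.com",
    "email_on_failure": False,
    "email_on_retry": False,
}

# Request a short-lived CLI token and the web server hostname for the environment.
mwaa = boto3.client("mwaa")
token = mwaa.create_cli_token(Name=MWAA_ENVIRONMENT_NAME)

# POST an Airflow CLI command to the MWAA web server's CLI endpoint.
response = requests.post(
    f"https://{token['WebServerHostname']}/aws_mwaa/cli",
    headers={
        "Authorization": f"Bearer {token['CliToken']}",
        "Content-Type": "text/plain",
    },
    data=f"trigger_dag {DAG_NAME} --conf '{json.dumps(CONF)}'",
)
response.raise_for_status()
print(response.json())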
Cleaning Up
Once you are done with the MWAA Environment, be sure to delete it as soon as possible to avoid additional costs. Also, delete the MWAA-VPC CloudFormation stack. These resources, like the two NAT Gateways, will also continue to generate additional costs.
aws mwaa delete-environment --name <your_mwaa_environment_name>
aws cloudformation delete-stack --stack-name MWAA-VPC
Conclusion
In this second post in the series, we explored using the newly released Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to run PySpark applications on Amazon Elastic MapReduce (Amazon EMR). In future posts, we will explore the use of Jupyter and Zeppelin notebooks for data science, scientific computing, and machine learning on EMR.
If you are interested in learning more about configuring Amazon MWAA and Airflow, see my recent post, Amazon Managed Workflows for Apache Airflow — Configuration: Understanding Amazon MWAA’s Configuration Options.
This blog represents my own viewpoints and not those of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.
Installing Apache Superset on Amazon EMR: Add data exploration and visualization to your analytics cluster
Posted by Gary A. Stafford in AWS, Bash Scripting, Big Data, Build Automation, Cloud, Python on December 24, 2020
Introduction
AWS provides nearly twenty-five different open-source data analytics applications that can be automatically installed and configured on Amazon Elastic MapReduce (Amazon EMR). None of those options, however, is a general-purpose data exploration and visualization tool. With EMR, you can automate the installation of additional software as part of the cluster creation process or after cluster creation. This brief post will explore how to install, configure, and access Apache Superset, a modern data exploration and visualization platform, on Amazon EMR’s Master Node as a post-cluster-creation step. You can use these same techniques to install other software packages on EMR as well, manually or as part of an automated Data Pipeline.
Amazon EMR
According to AWS, Amazon EMR is a cloud-based big data platform for processing vast amounts of data using open-source tools such as Apache Spark, Hive, HBase, Flink, Hudi, and Presto, along with Zeppelin and Jupyter notebooks. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

AWS currently offers 5.x and 6.x versions of Amazon EMR. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data software packages to choose from, which Amazon EMR will install and configure when the cluster is created.
Apache Superset
According to its website, Apache Superset is a modern data exploration and visualization platform. Superset is fast, lightweight, intuitive, and loaded with options that make it easy for users of all skill sets to explore and visualize their data, from simple line charts to highly detailed geospatial charts.
Superset natively supports over twenty-five data sources, including Amazon Athena and Redshift, Apache Drill, Druid, Hive, Impala, Kylin, Pinot, and Spark SQL, Elasticsearch, Google BigQuery, Hana, MySQL, Oracle, Postgres, Presto, Snowflake, Microsoft SQL Server, and Teradata.
As shown in their Gallery, Superset includes dozens of visualization types, including Pivot Table, Line Chart, Markup, Pie Chart, Filter Box, Bubble Chart, Box Plot, Histogram, Heatmap, Sunburst, Calendar Heatmap, and several geospatial types.
Setup
Using this git clone command, download a copy of this post’s open-source GitHub repository to your local environment.
git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/emr-superset-demo.git
To demonstrate how to install Apache Superset on EMR, I have prepared an AWS CloudFormation template. Deploying the template, cloudformation/superset-emr-demo.yml, to AWS will result in the AWS CloudFormation stack, superset-emr-demo-dev. The stack creates a minimally-sized, two-node EMR cluster, two Amazon S3 buckets, and several AWS Systems Manager (SSM) Parameter Store parameters.
There is also a JSON-format CloudFormation parameters file, cloudformation/superset-emr-demo-params-dev.json. The parameters file contains values for eight of the ten required parameters in the CloudFormation template, all of which you can adjust. For the remaining two required parameters, you will need to supply the name of an existing EC2 key pair to access the EMR Master node. The key pair will need to be deployed to the same AWS Account into which you are deploying EMR. You will also need to supply a Subnet ID for the EMR cluster to be installed into. The subnet must have access to the Internet to install Superset’s required system and Python packages and to access Superset’s web-based user interface. If you need help creating a VPC and subnet to deploy EMR into, refer to my previous blog post, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce.
The CloudFormation stack is created using a Python script, create_cfn_stack.py. The Python script uses the AWS boto3 Python SDK.
To execute the Python script and create the CloudFormation stack, which will create the EMR cluster, run the following command. Remember to update the parameters to the name of your EC2 key pair and the Subnet ID for the EMR cluster.
python3 ./create_cfn_stack.py \
--ec2-key-name <your_key_pair_name> \
--ec2-subnet-id <your_subnet_id> \
--environment dev
Here is what the complete CloudFormation workflow looks like.
Security Group Ingress Rules
To install Superset on the EMR cluster’s Master node via SSH, you need to open port 22 on the Security Group associated with the EMR cluster’s Master Node, allowing access from your IP address. You can use the AWS Management Console or AWS CLI to open port 22. We will use jq and the AWS CLI’s ec2 API to get the Security Group ID associated with the EMR cluster’s Master Node and create the two ingress rules.
export EMR_MASTER_SG_ID=$(aws ec2 describe-security-groups | \
jq -r ".SecurityGroups[] | \
select(.GroupName==\"ElasticMapReduce-master\").GroupId" | \
head -n 1)
aws ec2 authorize-security-group-ingress \
--group-id ${EMR_MASTER_SG_ID} \
--protocol tcp \
--port 22 \
--cidr $(curl ipinfo.io/ip)/32
Superset Script
Once the CloudFormation stack is created and the ports are open, we can install Apache Superset on the EMR Master Node. The bootstrap script, bootstrap_emr/bootstrap_superset.sh, will be used to install Apache Superset onto the EMR cluster’s Master Node as the hadoop user. The script is roughly based on Superset’s Installing from Scratch instructions.
As part of installing Superset, the script will also deploy several common database drivers, including Amazon Athena, Amazon Redshift, Apache Spark SQL, Presto, PostgreSQL, and MySQL. The script will also create a Superset Admin role, and two Superset User roles — Alpha and Gamma.
To install Superset using the bootstrap script, we will use another Python script, install_superset.py. The script uses paramiko, a Python implementation of SSHv2. The script also uses scp, a module that uses a paramiko transport to send and receive files via the scp1 protocol.
The script requires a single input parameter, ec2-key-path, which is the full path to your EC2 key pair (e.g., ~/.ssh/my-key-pair.pem). Optionally, you can change the default Superset port of 8280, using the superset-port parameter.
python3 ./install_superset.py \
--ec2-key-path </path/to/my-key-pair.pem> \
--superset-port 8280
The script uses SSH and SCP to deploy and execute the bootstrap script, bootstrap_superset.sh. The output from the script includes the URL of Apache Superset running on the EMR cluster. The output also contains the username and password of the Superset Admin.
********************************************************************
Superset URL: http://ec2-111-22-333-44.compute-1.amazonaws.com:8280
Admin Username: SupersetAdmin
Admin Password: Admin1234
********************************************************************
SSH Tunnel
According to AWS, EMR applications publish user interfaces as websites hosted on the master node. For security reasons, these websites are only available on the master node’s local web server. To reach any of the web interfaces, you must establish an SSH tunnel with the master node using either dynamic or local port forwarding. If you are using dynamic port forwarding, you must also configure a proxy server to view the web interfaces.
Running the command in your terminal will start the SSH tunnel on port 8157. Once the tunnel is enabled, you can access Apache Superset in a web browser using the URL shown in the script output above. Use the Admin credentials or either of the two User credentials to sign in to Superset.
Once signed in, you will have the ability to connect to your data sources and explore and visualize data. Below, we see an example of a SQL query executed against an Amazon RDS for PostgreSQL database, running in a separate VPC from EMR.
Conclusion
In this post, we learned how to install Apache Superset onto the Master Node of an Amazon EMR Cluster. If you want to install an application on all the nodes of an EMR cluster, you can add the commands to the bootstrap script, which runs when CloudFormation creates the cluster.
This blog represents my own viewpoints and not those of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.
Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce
Posted by Gary A. Stafford in AWS, Build Automation, Cloud, Python, Software Development on December 2, 2020
Introduction
According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache Spark, Hive, HBase, Flink, Hudi, and Presto, along with Zeppelin and Jupyter notebooks. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.
With EMR, individuals and teams can also use notebooks, including EMR Notebooks, based on JupyterLab, the web-based interactive development environment for Jupyter notebooks, for ad-hoc data analytics. Apache Zeppelin is also available to collaborate and interactively explore, process, and visualize data. With EMR notebooks and the EMR API, users can programmatically execute a notebook without the need to interact with the EMR console, referred to as headless execution.
AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. One major difference between EMR versions relevant to this post is EMR 6.x’s support for the latest Hadoop and Spark 3.x frameworks. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.
PySpark on EMR
In the following series of posts, we will focus on the options available to interact with Amazon EMR using the Python API for Apache Spark, known as PySpark. We will divide the methods for accessing PySpark on EMR into two categories: PySpark applications and notebooks. We will explore both interactive and automated patterns for running PySpark applications (Python scripts) and PySpark-based notebooks. In this first post, I will cover the first four PySpark Application Methods listed below. In part two, I will cover Amazon Managed Workflows for Apache Airflow (Amazon MWAA), and in part three, the use of notebooks.
PySpark Application Methods
- Add Job Flow Steps: Remote execution of EMR Steps on an existing EMR cluster using the add_job_flow_steps method;
- EMR Master Node: Remote execution over SSH of PySpark applications using spark-submit on an existing EMR cluster’s Master node;
- Run Job Flow: Remote execution of EMR Steps on a newly created long-lived or auto-terminating EMR cluster using the run_job_flow method;
- AWS Step Functions: Remote execution of EMR Steps using AWS Step Functions on an existing or newly created long-lived or auto-terminating EMR cluster;
- Apache Airflow: Remote execution of EMR Steps using the recently released Amazon MWAA on an existing or newly created long-lived or auto-terminating EMR cluster (see part two of this series);
Notebook Methods
- EMR Notebooks for Ad-hoc Analytics: Interactive, ad-hoc analytics and machine learning using Jupyter Notebooks on an existing EMR cluster;
- Headless Execution of EMR Notebooks: Headless execution of notebooks from an existing EMR cluster or newly created auto-terminating cluster;
- Apache Zeppelin for Ad-hoc Analytics: Interactive, ad-hoc analytics and machine learning using Zeppelin notebooks on an existing EMR cluster;
Note that wherever the AWS SDK for Python (boto3) is used in this post, we can substitute the AWS CLI or AWS Tools for PowerShell. Typically, these commands and Python scripts would be run as part of a DevOps or DataOps deployment workflow, using CI/CD platforms like AWS CodePipeline, Jenkins, Harness, CircleCI, Travis CI, or Spinnaker.
Preliminary Tasks
To prepare the AWS EMR environment for this post, we need to perform a few preliminary tasks.
- Download a copy of this post’s GitHub repository;
- Download three Kaggle datasets and organize locally;
- Create an Amazon EC2 key pair;
- Upload the EMR bootstrap script and create the CloudFormation Stack;
- Allow your IP address access to the EMR Master node on port 22;
- Upload CSV data files and PySpark applications to S3;
- Crawl the raw data and create a Data Catalog using AWS Glue;
Step 1: GitHub Repository
Using this git clone command, download a copy of this post’s GitHub repository to your local environment.
git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/emr-demo.git
Step 2: Kaggle Datasets
Kaggle is a well-known data science resource with 50,000 public datasets and 400,000 public notebooks. We will be using three Kaggle datasets in this post. You will need to join Kaggle to access these free datasets. Download the following three Kaggle datasets as CSV files. Since we are working with (moderately) big data, the total size of the datasets will be approximately 1 GB.
- Movie Ratings: https://www.kaggle.com/rounakbanik/the-movies-dataset
- Bakery: https://www.kaggle.com/sulmansarwar/transactions-from-a-bakery
- Stocks: https://www.kaggle.com/timoboz/stock-data-dow-jones
Organize the (38) downloaded CSV files into the raw_data directory of the locally cloned GitHub repository, exactly as shown below. We will upload these files to Amazon S3 in the following step.
> tree raw_data --si -v -A
raw_data
├── [ 128] bakery
│ ├── [711k] BreadBasket_DMS.csv
├── [ 320] movie_ratings
│ ├── [190M] credits.csv
│ ├── [6.2M] keywords.csv
│ ├── [989k] links.csv
│ ├── [183k] links_small.csv
│ ├── [ 34M] movies_metadata.csv
│ ├── [710M] ratings.csv
│ └── [2.4M] ratings_small.csv
└── [1.1k] stocks
├── [151k] AAPL.csv
├── [146k] AXP.csv
├── [150k] BA.csv
├── [147k] CAT.csv
├── [146k] CSCO.csv
├── [149k] CVX.csv
├── [147k] DIS.csv
├── [ 42k] DWDP.csv
├── [150k] GS.csv
└── [...] abridged...
In this post, we will be using three different datasets. However, if you want to limit the potential costs associated with big data analytics on AWS, you can choose to limit job submissions to only one or two of the datasets. For example, the bakery and stocks datasets are fairly small yet effectively demonstrate most EMR features. In contrast, the movie rating dataset has nearly 27 million rows of ratings data, which starts to demonstrate the power of EMR and PySpark for big data.
Step 3: Amazon EC2 key pair
According to AWS, a key pair, consisting of a private key and a public key, is a set of security credentials that you use to prove your identity when connecting to an [EC2] instance. Amazon EC2 stores the public key, and you store the private key. To SSH into the EMR cluster, you will need an Amazon key pair. If you do not have an existing Amazon EC2 key pair, create one now. The easiest way to create a key pair is from the AWS Management Console.

Your private key is automatically downloaded when you create a key pair in the console. Store your private key somewhere safe. If you use an SSH client on a macOS or Linux computer to connect to EMR, use the following chmod command to set the correct permissions of your private key file so that only you can read it.
chmod 0400 /path/to/my-key-pair.pem
Step 4: Bootstrap Script and CloudFormation Stack
The bulk of the resources that are used as part of this demonstration are created using the CloudFormation stack, emr-demo-dev. The CloudFormation template that creates the stack, cloudformation/emr-demo.yml, is included in the repository. Please review all resources and understand the cost and security implications before continuing.
There is also a JSON-format CloudFormation parameters file, cloudformation/emr-demo-params-dev.json, containing values for all but two of the parameters in the CloudFormation template. The two parameters not in the parameter file are the name of the EC2 key pair you just created and the bootstrap bucket’s name. Both will be passed along with the CloudFormation template using the Python script, create_cfn_stack.py. For each type of environment, such as Development, Test, and Production, you could have a separate CloudFormation parameters file, with different configurations.
The template will create approximately (39) AWS resources, including a new AWS VPC, a public subnet, an internet gateway, route tables, a 3-node EMR v6.2.0 cluster, a series of Amazon S3 buckets, AWS Glue data catalog, AWS Glue crawlers, several Systems Manager Parameter Store parameters, and so forth.
The CloudFormation template includes the location of the EMR bootstrap script located on Amazon S3. Before creating the CloudFormation stack, the Python script creates an S3 bootstrap bucket and copies the bootstrap script, bootstrap_actions.sh, from the local project repository to the S3 bucket. The script will be used to install additional packages on EMR cluster nodes, which are required by our PySpark applications. The script also sets the default AWS Region for boto3.
From the GitHub repository’s local copy, run the following command, which will execute a Python script to create the bootstrap bucket, copy the bootstrap script, and provision the CloudFormation stack. You will need to pass the name of your EC2 key pair to the script as a command-line argument.
python3 ./scripts/create_cfn_stack.py \
--environment dev \
--ec2-key-name <my-key-pair-name>
The CloudFormation template should create a CloudFormation stack, emr-demo-dev, as shown below.

Step 5: SSH Access to EMR
For this demonstration, we will need access to the new EMR cluster’s Master EC2 node, using SSH and your key pair, on port 22. The easiest way to add a new inbound rule to the correct AWS Security Group is to use the AWS Management Console. First, find your EC2 Security Group named ElasticMapReduce-master.

Then, add a new Inbound rule for SSH (port 22) from your IP address, as shown below.

Alternately, you could use the AWS CLI or AWS SDK to create a new security group ingress rule.
export EMR_MASTER_SG_ID=$(aws ec2 describe-security-groups | \
jq -r '.SecurityGroups[] | select(.GroupName=="ElasticMapReduce-master").GroupId')
aws ec2 authorize-security-group-ingress \
--group-id ${EMR_MASTER_SG_ID} \
--protocol tcp \
--port 22 \
--cidr $(curl ipinfo.io/ip)/32
Step 6: Raw Data and PySpark Apps to S3
As part of the emr-demo-dev CloudFormation stack, we now have several new Amazon S3 buckets within our AWS Account. The naming conventions and intended usage of these buckets follow common organizational patterns for data lakes. The data buckets use the common naming convention of raw, processed, and analyzed data in reference to the data stored within them. We also use a widely used, corresponding naming convention of ‘bronze’, ‘silver’, and ‘gold’ when referring to these data buckets as parameters.
> aws s3api list-buckets | \
  jq -r '.Buckets[] | select(.Name | startswith("emr-demo-")).Name'

emr-demo-raw-123456789012-us-east-1
emr-demo-processed-123456789012-us-east-1
emr-demo-analyzed-123456789012-us-east-1
emr-demo-work-123456789012-us-east-1
emr-demo-logs-123456789012-us-east-1
emr-demo-glue-db-123456789012-us-east-1
emr-demo-bootstrap-123456789012-us-east-1
There is a raw data bucket (aka bronze) that will contain the original CSV files. There is a processed data bucket (aka silver) that will contain data that might have had any number of actions applied: data cleansing, obfuscation, data transformation, file format changes, file compression, and data partitioning. Finally, there is an analyzed data bucket (aka gold) that has the results of the data analysis. We also have a work bucket that holds the PySpark applications, a logs bucket that holds EMR logs, and a glue-db bucket to hold the Glue Data Catalog metadata.
Whenever we submit PySpark jobs to EMR, the PySpark application files and data will always be accessed from Amazon S3. From the GitHub repository’s local copy, run the following command, which will execute a Python script to upload the approximately (38) Kaggle dataset CSV files to the raw S3 data bucket.
python3 ./scripts/upload_csv_files_to_s3.py
Next, run the following command, which will execute a Python script to upload a series of PySpark application files to the work S3 data bucket.
python3 ./scripts/upload_apps_to_s3.py
Step 7: Crawl Raw Data with Glue
The last preliminary step to prepare the EMR demonstration environment is to catalog the raw CSV data into an AWS Glue data catalog database, using one of the two Glue Crawlers we created. The three Kaggle datasets’ data will reside in Amazon S3, while their schema and metadata will reside within tables in the Glue data catalog database, emr_demo. When we eventually query the data from our PySpark applications, we will be querying the Glue data catalog’s database tables, which reference the underlying data in S3.
From the GitHub repository’s local copy, run the following command, which will execute a Python script to run the Glue Crawler and catalog the raw data’s schema and metadata information into the Glue data catalog database, emr_demo.
python3 ./scripts/crawl_raw_data.py --crawler-name emr-demo-raw
Once the crawler is finished, from the AWS Console, we should see a series of nine tables in the Glue data catalog database, emr_demo, all prefixed with raw_. The tables hold metadata and schema information for the three CSV-format Kaggle datasets loaded into S3.

Alternately, we can use the glue get-tables AWS CLI command to review the tables.
> aws glue get-tables --database emr_demo | \
  jq -r '.TableList[] | select(.Name | startswith("raw_")).Name'

raw_bakery
raw_credits_csv
raw_keywords_csv
raw_links_csv
raw_links_small_csv
raw_movies_metadata_csv
raw_ratings_csv
raw_ratings_small_csv
raw_stocks
PySpark Applications
Let’s explore four methods to run PySpark applications on EMR.

1. Add Job Flow Steps to an Existing EMR Cluster
We will start by looking at running PySpark applications using EMR Steps. According to AWS, we can use Amazon EMR steps to submit work to the Spark framework installed on an EMR cluster. The EMR step for PySpark uses a spark-submit command. According to Spark’s documentation, the spark-submit script, located in Spark’s bin directory, is used to launch applications on an [EMR] cluster. A typical spark-submit command we will be using resembles the following example. This command runs a PySpark application in S3, bakery_sales_ssm.py.
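For reference, a representative EMR Step wrapping such a spark-submit command might look like the following when expressed as a Python dictionary; the bucket name, application path, and Spark options are illustrative, not the exact values used by the project.
# Representative EMR Step wrapping a spark-submit command (the bucket name,
# paths, and Spark options are illustrative placeholders).
SPARK_STEP = {
    "Name": "Bakery Sales",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": [
            "spark-submit",
            "--deploy-mode", "cluster",
            "--master", "yarn",
            "s3://<your_work_bucket>/analyze/bakery_sales_ssm.py",
        ],
    },
}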
We will target the existing EMR cluster created by CloudFormation earlier to execute our PySpark applications using EMR Steps. We have two sets of PySpark applications. The first set of three PySpark applications will transform the raw CSV-format datasets into Apache Parquet, a more efficient file format for big data analytics. Alternately, for your workflows, you might prefer AWS Glue ETL Jobs, as opposed to PySpark on EMR, to perform nearly identical data processing tasks. The second set of four PySpark applications perform data analysis tasks on the data.
There are two versions of each PySpark application. Files with suffix _ssm use the AWS Systems Manager (SSM) Parameter Store service to obtain dynamic parameter values at runtime on EMR. Corresponding non-SSM applications require those same parameter values to be passed on the command line when they are submitted to Spark. Therefore, these PySpark applications are not tightly coupled to boto3 or the SSM Parameter Store. We will use the _ssm versions of the scripts in this post’s demonstration.
> tree pyspark_apps --si -v -A
pyspark_apps
├── [ 320] analyze
│ ├── [1.4k] bakery_sales.py
│ ├── [1.5k] bakery_sales_ssm.py
│ ├── [2.6k] movie_choices.py
│ ├── [2.7k] movie_choices_ssm.py
│ ├── [2.0k] movies_avg_ratings.py
│ ├── [2.3k] movies_avg_ratings_ssm.py
│ ├── [2.2k] stock_volatility.py
│ └── [2.3k] stock_volatility_ssm.py
└── [ 256] process
├── [1.1k] bakery_csv_to_parquet.py
├── [1.3k] bakery_csv_to_parquet_ssm.py
├── [1.3k] movies_csv_to_parquet.py
├── [1.5k] movies_csv_to_parquet_ssm.py
├── [1.9k] stocks_csv_to_parquet.py
└── [2.0k] stocks_csv_to_parquet_ssm.py
We will start by executing the three PySpark processing applications. They will convert the CSV data to Parquet. Below, we see an example of one of the PySpark applications we will run, bakery_csv_to_parquet_ssm.py. The PySpark application will convert the Bakery Sales dataset’s CSV file to Parquet and write it to S3.
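Since the full application is in the repository, the following is only a condensed sketch of what it does; the SSM parameter names and S3 paths are assumptions for illustration.
import boto3
from pyspark.sql import SparkSession

# Parameter names and S3 paths below are assumptions for illustration;
# see pyspark_apps/process/bakery_csv_to_parquet_ssm.py for the actual application.


def main():
    spark = SparkSession.builder.appName("bakery-csv-to-parquet").getOrCreate()

    # Resolve bucket names at runtime from the SSM Parameter Store.
    ssm = boto3.client("ssm")
    raw_bucket = ssm.get_parameter(Name="/emr_demo/bronze_bucket")["Parameter"]["Value"]
    processed_bucket = ssm.get_parameter(Name="/emr_demo/silver_bucket")["Parameter"]["Value"]

    # Read the raw Bakery CSV file and write it back out as Parquet.
    df_bakery = (
        spark.read
        .option("header", "true")
        .option("delimiter", ",")
        .option("inferSchema", "true")
        .csv(f"s3://{raw_bucket}/bakery/")
    )

    df_bakery.write.mode("overwrite").parquet(f"s3://{processed_bucket}/bakery/")

    spark.stop()


if __name__ == "__main__":
    main()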
The three PySpark data processing applications’ spark-submit commands are defined in a separate JSON-format file, job_flow_steps_process.json, a snippet of which is shown below. The same goes for the four analytics applications.
Using this pattern of decoupling the Spark job command and arguments from the execution code, we can define and submit any number of Steps without changing the Python script, add_job_flow_steps_process.py, shown below. Note line 31, where the Steps are injected into the add_job_flow_steps method’s parameters.
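As a condensed sketch of the pattern (the file path and SSM parameter name below are assumptions, not the project’s exact values), the script loads the Step definitions from the JSON file and injects them into the running cluster with boto3’s add_job_flow_steps call:
import json

import boto3


def main():
    # Load the decoupled spark-submit Step definitions from the JSON file.
    with open("job_flow_steps/job_flow_steps_process.json", "r") as file:
        steps = json.load(file)

    # Resolve the existing EMR cluster's ID, e.g., from the SSM Parameter Store
    # (parameter name is hypothetical, for illustration).
    ssm = boto3.client("ssm")
    cluster_id = ssm.get_parameter(Name="/emr_demo/cluster_id")["Parameter"]["Value"]

    # Inject the Steps into the running cluster.
    emr = boto3.client("emr")
    response = emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=steps)
    print(response["StepIds"])


if __name__ == "__main__":
    main()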
The Python script used for this task takes advantage of AWS Systems Manager Parameter Store parameters. The parameters were placed in the Parameter Store, within the /emr_demo path, by CloudFormation. We will reference these parameters in several scripts throughout the post.
> aws ssm get-parameters-by-path --path '/emr_demo' | \
  jq -r ".Parameters[] | {Name: .Name, Value: .Value}"
From the GitHub repository’s local copy, run the following command, which will execute a Python script to load the three spark-submit commands from the JSON-format file, job_flow_steps_process.json, and run the PySpark processing applications on the existing EMR cluster.
python3 ./scripts/add_job_flow_steps.py --job-type process
While the three Steps are running concurrently, the view from the Amazon EMR Console’s Cluster Steps tab should look similar to the example below.

Once the three Steps have been completed, we should note three sub-directories in the processed data bucket containing Parquet-format files.

Of special note is the Stocks dataset, which has been converted to Parquet and partitioned by stock symbol. According to AWS, by partitioning your data, you can restrict the amount of data scanned by each query by specifying filters based on the partition, thus improving performance and reducing cost.

Lastly, the movie ratings dataset has been divided into sub-directories, based on the schema of each table. Each sub-directory contains Parquet files specific to that unique schema.

Crawl Processed Data with Glue
Similar to the raw data earlier, catalog the newly processed Parquet data into the same AWS Glue data catalog database using one of the two Glue Crawlers we created. The processed data will reside in the Amazon S3 processed data bucket, while its schemas and metadata will reside within tables in the Glue data catalog database, emr_demo.
From the GitHub repository’s local copy, run the following command, which will execute a Python script to run the Glue Crawler and catalog the processed data’s schema and metadata information into the Glue data catalog database, emr_demo.
python3 ./scripts/crawl_raw_data.py --crawler-name emr-demo-processed
Once the crawler has finished successfully, using the AWS Console, we should see a series of nine tables in the Glue data catalog database, emr_demo, all prefixed with processed_. The tables represent the three Kaggle datasets’ contents converted to Parquet and correspond to the equivalent tables with the raw_ prefix.

Alternately, we can use the glue get-tables AWS CLI command to review the tables.
> aws glue get-tables --database emr_demo | \
  jq -r '.TableList[] | select(.Name | startswith("processed_")).Name'

processed_bakery
processed_credits
processed_keywords
processed_links
processed_links_small
processed_movies_metadata
processed_ratings
processed_ratings_small
processed_stocks
2. Run PySpark Jobs from EMR Master Node
Next, we will explore how to execute PySpark applications remotely on the Master node of the EMR cluster using boto3 and SSH. Although this method may be optimal for certain use cases as opposed to using the EMR SDK, remote SSH execution does not scale as well, in my opinion, due to a lack of automation, and it exposes some potential security risks.
There are four PySpark applications in the GitHub repository. For this part of the demonstration, we will just submit the bakery_sales_ssm.py application. This application will perform a simple analysis of the bakery sales data. While the other three PySpark applications use AWS Glue, the bakery_sales_ssm.py application reads data directly from the processed data S3 bucket.
The application writes its results into the analyzed data S3 bucket, in both Parquet and CSV formats. The CSV file is handy for business analysts and other non-technical stakeholders who might wish to import the results of the analysis into Excel or business applications.
Earlier, we created an inbound rule to allow your IP address to access the Master node on port 22. From the EMR Console’s Cluster Summary tab, note the command necessary to SSH into the Master node of the EMR cluster.

The Python script, submit_spark_ssh.py, shown below, will submit the PySpark job to the EMR Master Node, using paramiko, a Python implementation of SSHv2. The script is replicating the same functionality as the shell-based SSH command above to execute a remote command on the EMR Master Node. The spark-submit command is on lines 36–38, below.
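Below is a condensed sketch of that approach; the Master node hostname, S3 path, and Spark options are placeholders rather than the script’s exact values.
import argparse

import paramiko

# A condensed sketch of the approach; the host name, S3 path, and Spark options
# are placeholders. See scripts/submit_spark_ssh.py for the working script.
EMR_MASTER_DNS = "<your_emr_master_public_dns>"
SPARK_SUBMIT_CMD = (
    "spark-submit --deploy-mode cluster --master yarn "
    "--conf spark.yarn.submit.waitAppCompletion=true "
    "s3a://<your_work_bucket>/analyze/bakery_sales_ssm.py"
)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--ec2-key-path", required=True, help="Path to your EC2 key pair file")
    args = parser.parse_args()

    # Connect to the EMR Master node as the 'hadoop' user and run spark-submit remotely.
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname=EMR_MASTER_DNS, username="hadoop", key_filename=args.ec2_key_path)

    stdin, stdout, stderr = ssh.exec_command(SPARK_SUBMIT_CMD)
    for line in stdout:
        print(line.rstrip())

    ssh.close()


if __name__ == "__main__":
    main()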
From the GitHub repository’s local copy, run the following command, which will execute a Python script to submit the job. The script requires one input parameter, which is the path to your EC2 key pair (e.g., ~/.ssh/my-key-pair.pem).
python3 ./scripts/submit_spark_ssh.py \
--ec2-key-path </path/to/my-key-pair.pem>
The spark-submit command will be executed remotely on the EMR cluster’s Master node over SSH. All variables in the commands will be replaced by the environment variables, set in advance, which use AWS CLI emr and ssm commands.

Monitoring Spark Jobs
We set spark.yarn.submit.waitAppCompletion to true. According to Spark’s documentation, this property controls whether the client waits to exit in YARN cluster mode until the application is completed. If set to true, the client process will stay alive, reporting the application’s status. Otherwise, the client process will exit after submission. We can watch the job’s progress from the terminal.

We can also use the YARN Timeline Server and the Spark History Server in addition to the terminal. Links to both are shown on both the EMR Console’s Cluster ‘Summary’ and ‘Application user interfaces’ tabs. Unlike other EMR application web interfaces, using port forwarding, also known as creating an SSH tunnel, is not required for the YARN Timeline Server or the Spark History Server.

YARN Timeline Server
Below, we see the job we submitted running on the YARN Timeline Server, which also includes useful tools like access to configuration, local logs, server stacks, and server metrics.

YARN Timeline Server allows us to drill down into individual jobs and view logs. Logs are ideal for troubleshooting failed jobs, especially the stdout logs.

Spark History Server
You can also view the PySpark application we submitted from the Master node using the Spark History Server. Below, we see completed Spark applications (aka Spark jobs) in the Spark History Server.

Below, we see more details about our Spark job using the Spark History Server.

We can even see visual representations of each Spark job’s Directed Acyclic Graph (DAG).

3. Run Job Flow on an Auto-Terminating EMR Cluster
The next option to run PySpark applications on EMR is to create a short-lived, auto-terminating EMR cluster using the run_job_flow method. We will create a new EMR cluster, run a series of Steps (PySpark applications), and then auto-terminate the cluster. This is a cost-effective method of running PySpark applications on-demand.
We will create a second 3-node EMR v6.2.0 cluster to demonstrate this method, using Amazon EC2 Spot instances for all the EMR cluster’s Master and Core nodes. Unlike the first, long-lived, more general-purpose EMR cluster, we will only deploy the Spark application to this cluster as that is the only application we will need to run the Steps.
Using the run_job_flow method, we will execute the four PySpark data analysis applications. The PySpark applications’ spark-submit commands are defined in a separate JSON-format file, job_flow_steps_analyze.json. As with the previous add_job_flow_steps.py script, this pattern of decoupling the Spark job command and arguments from the execution code lets us define and submit any number of Steps without changing the Python execution script. Also similar to that script, this one retrieves parameter values from the SSM Parameter Store.
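As a rough sketch of the underlying boto3 call (the cluster sizing, roles, and file paths shown are illustrative, not the project’s exact configuration):
import json

import boto3

# Load the decoupled Step definitions (file path is illustrative).
with open("job_flow_steps/job_flow_steps_analyze.json", "r") as file:
    steps = json.load(file)

emr = boto3.client("emr")

response = emr.run_job_flow(
    Name="emr-demo-analyze",
    ReleaseLabel="emr-6.2.0",
    Applications=[{"Name": "Spark"}],
    LogUri="s3://<your_logs_bucket>/",
    Instances={
        "InstanceGroups": [
            {
                "Name": "Master",
                "Market": "SPOT",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            },
            {
                "Name": "Core",
                "Market": "SPOT",
                "InstanceRole": "CORE",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 2,
            },
        ],
        "Ec2KeyName": "<your_ec2_key_pair_name>",
        "Ec2SubnetId": "<your_subnet_id>",
        "KeepJobFlowAliveWhenNoSteps": False,  # auto-terminate once the Steps complete
        "TerminationProtected": False,
    },
    Steps=steps,
    JobFlowRole="EMR_EC2_DemoRole",
    ServiceRole="EMR_DemoRole",
    VisibleToAllUsers=True,
)

print(response["JobFlowId"])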
From the GitHub repository’s local copy, run the following command, which will execute a Python script to create a new cluster, run the PySpark analysis applications, and then auto-terminate.
python3 ./scripts/run_job_flow.py --job-type analyze
As shown below, we see the short-lived EMR cluster in the process of terminating after successfully running the PySpark applications as EMR Steps.


4. Using AWS Step Functions
According to AWS, AWS Step Functions is a serverless function orchestrator that makes it easy to sequence AWS Lambda functions and multiple AWS services. Step Functions manages sequencing, error handling, retry logic, and state, removing a significant operational burden from your team. Step Functions is based on state machines and tasks. A state machine is a workflow. A task is a state in a workflow that represents a single unit of work that another AWS service performs. Each step in a workflow is a state. Using AWS Step Functions, we define our workflows as state machines, which transform complex code into easy to understand statements and diagrams.

You can use AWS Step Functions to run PySpark applications as EMR Steps on an existing EMR cluster. Using Step Functions, we can also create the cluster, run multiple EMR Steps sequentially or in parallel, and finally, auto-terminate the cluster.
We will create two state machines for this demo, one for the PySpark data processing applications and one for the PySpark data analysis applications. To create state machines, we first need to create JSON-based state machine definition files. The files are written in Amazon States Language. According to AWS, Amazon States Language is a JSON-based, structured language used to define a state machine, a collection of states that can do work (Task states), determine which states to transition to next (Choice states), stop execution with an error (Fail states), and so on.
The definition files contain specific references to AWS resources deployed to your AWS account originally created by CloudFormation. Below is a snippet of the state machine definition file, step_function_emr_analyze.json, showing part of the configuration of the EMR cluster. Note the parameterized key/value pairs (e.g., “Ec2KeyName.$”: “$.InstancesEc2KeyName” on line 5). The values will come from a JSON-formatted inputs file and are dynamically replaced upon the state machine’s execution.
Python Templating
To automate the process of adding dynamic resource references to the state machine’s inputs files, we will use Jinja, the modern and designer-friendly templating language for Python, modeled after Django’s templates. We will render the Jinja template to a JSON-based state machine inputs file, replacing the template’s resource tags (keys) with values from the SSM Parameter Store’s parameters. Below is a snippet from the inputs file Jinja template, step_function_inputs_analyze.j2.
First, install Jinja2, then create two JSON-based state machine inputs files from the Jinja templates using the included Python file.
# install Jinja2
python3 -m pip install Jinja2

python3 ./scripts/create_inputs_files.py
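As a rough sketch of what the inputs-file script does (the file names and parameter path below are assumptions), it reads the parameters from the SSM Parameter Store, renders the Jinja template, and writes out the JSON inputs file:
import json

import boto3
from jinja2 import Template

# Illustrative sketch of the templating approach; parameter path and file
# names are assumptions. See scripts/create_inputs_files.py for the real logic.
ssm = boto3.client("ssm")

# Build a dictionary of parameter names/values from the /emr_demo path.
params = {}
paginator = ssm.get_paginator("get_parameters_by_path")
for page in paginator.paginate(Path="/emr_demo"):
    for param in page["Parameters"]:
        params[param["Name"].split("/")[-1]] = param["Value"]

# Render the Jinja template into a JSON-based state machine inputs file.
with open("step_function_inputs_analyze.j2", "r") as file:
    template = Template(file.read())

rendered = template.render(params)

with open("step_function_inputs_analyze.json", "w") as file:
    file.write(rendered)

# Quick sanity check that the rendered output is valid JSON.
json.loads(rendered)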
Below we see the same snippet of the final inputs file. Jinja tags have been replaced with values from the SSM Parameter Store.
Using the definition files, create two state machines using the included Python files.
python3 ./scripts/create_state_machine.py \
    --definition-file step_function_emr_process.json \
    --state-machine EMR-Demo-Process

python3 ./scripts/create_state_machine.py \
    --definition-file step_function_emr_analyze.json \
    --state-machine EMR-Demo-Analysis
Both state machines should appear in the AWS Step Functions Console’s State Machines tab. Below, we see the ‘EMR-Demo-Analysis’ state machine’s definition both as JSON and rendered visually to a layout.

To execute either of the state machines, use the included Python file, passing in the exact name of the state machine to execute, either ‘EMR-Demo-Process’ or ‘EMR-Demo-Analysis’, and the name of the inputs file. I suggest running the EMR-Demo-Analysis version so as not to re-process all the raw data.
python3 ./scripts/execute_state_machine.py \
    --state-machine EMR-Demo-Process \
    --inputs-file step_function_inputs_process.json

python3 ./scripts/execute_state_machine.py \
    --state-machine EMR-Demo-Analysis \
    --inputs-file step_function_inputs_analyze.json
When the PySpark analysis application’s Step Function state machine is executed, a new EMR cluster is created, the PySpark applications are run, and finally, the cluster is auto-terminated. Below, we see a successfully executed state machine, which ran the four PySpark analysis applications in parallel on a new auto-terminating EMR cluster.

Conclusion
This post explored four methods for running PySpark applications on Amazon Elastic MapReduce (Amazon EMR). The key to scaling data analytics with PySpark on EMR is the use of automation. Therefore, we looked at ways to automate the deployment of EMR resources, create and submit PySpark jobs, and terminate EMR resources when the jobs are complete. Furthermore, we were able to decouple references to dynamic AWS resources within our PySpark applications using parameterization. This allows us to deploy and run PySpark resources across multiple AWS Accounts and AWS Regions without code changes.
In part two of the series, we will explore the use of the recently announced service, Amazon Managed Workflows for Apache Airflow (MWAA), and in part three, the use of Jupyter and Zeppelin notebooks for data science, scientific computing, and machine learning on EMR.
This blog represents my own viewpoints and not those of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.
Getting Started with Apache Zeppelin on Amazon EMR, using AWS Glue, RDS, and S3: Part 2
Posted by Gary A. Stafford in AWS, Bash Scripting, Big Data, Build Automation, Cloud, DevOps, Python, Software Development on November 30, 2019
Introduction
In Part 1 of this two-part post, we created and configured the AWS resources required to demonstrate the use of Apache Zeppelin on Amazon Elastic MapReduce (EMR). Further, we configured Zeppelin integrations with AWS Glue Data Catalog, Amazon Relational Database Service (RDS) for PostgreSQL, and an Amazon Simple Storage Service (S3)-based Data Lake. We also covered how to obtain the project’s source code from the two GitHub repositories, zeppelin-emr-demo and zeppelin-emr-config. Below is a high-level architectural diagram of the infrastructure we constructed in Part 1 for this demonstration.
Part 2
In Part 2 of this post, we will explore Apache Zeppelin’s features and integration capabilities with a variety of AWS services using a series of four Zeppelin notebooks. Below is an overview of each Zeppelin notebook with a link to view it using Zepl’s free Notebook Explorer. Zepl was founded by the same engineers that developed Apache Zeppelin. Zepl’s enterprise collaboration platform, built on Apache Zeppelin, enables both Data Science and AI/ML teams to collaborate around data.
Notebook 1
The first notebook uses a small 21k-row Kaggle dataset, Transactions from a Bakery. The notebook demonstrates Zeppelin’s integration capabilities with the Helium plugin system for adding new chart types, the use of Amazon S3 for data storage and retrieval, the use of Apache Parquet, a compressed and efficient columnar data storage format, and Zeppelin’s storage integration with GitHub for notebook version control.
Interpreters
When you open a notebook for the first time, you are given the choice of interpreters to bind and unbind to the notebook. The last interpreter in the list shown below, postgres, is the new PostgreSQL JDBC Zeppelin interpreter we created in Part 1 of this post. We will use this interpreter in Notebook 4.
Application Versions
The first two paragraphs of the notebook are used to confirm the version of Spark, Scala, OpenJDK, and Python we are using. Recall we updated the Spark and Python interpreters to use Python 3.
Helium Visualizations
If you recall from Part 1 of the post, we pre-installed several additional Helium Visualizations, including the Ultimate Pie Chart. Below, we see the use of the Spark SQL (%sql) interpreter to query a Spark DataFrame, return results, and visualize the data using the Ultimate Pie Chart. In addition to the pie chart, we see the other pre-installed Helium visualizations following the five default visualizations in the menu bar. With Zeppelin, all we have to do is write Spark SQL queries against the Spark DataFrame created earlier in the notebook, and Zeppelin will handle the visualization. You have some basic controls over charts using the ‘settings’ option.
Building the Data Lake
Notebook 1 demonstrates how to read and write data to S3. We read and write the Bakery dataset in both CSV and Apache Parquet formats, using Spark (PySpark). We also write the results of Spark SQL queries, like the one above, to S3 in Parquet format.
With Parquet, data may be split into multiple files, as shown in the S3 bucket directory below. Parquet is much faster to read into a Spark DataFrame than CSV. Spark provides support for both reading and writing Parquet files. We will write all of our data to Parquet in S3, making future re-use of the data much more efficient than downloading data from the Internet, like GroupLens or Kaggle, or consuming CSV from S3.
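As a rough sketch of that pattern, assuming a Zeppelin %spark.pyspark paragraph where the spark session is already available, and using placeholder bucket and object names:

# read the raw Bakery CSV from S3 (bucket and key are placeholders)
bakery_csv = 's3://your-bucket-name/bakery/bakery_transactions.csv'
bakery_parquet = 's3://your-bucket-name/bakery/parquet/'

df_bakery = spark.read \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .csv(bakery_csv)

# persist the data as Parquet for faster subsequent reads
df_bakery.write.mode('overwrite').parquet(bakery_parquet)

# later reads come from the Parquet copy
df_parquet = spark.read.parquet(bakery_parquet)
df_parquet.createOrReplaceTempView('bakery_table')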
Preview S3 Data
In addition to using the Zeppelin notebook, we can preview data right in the S3 bucket web interface using the Amazon S3 Select feature. This query-in-place feature is helpful to quickly understand the structure and content of new data files with which you want to interact within Zeppelin.
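S3 Select can also be called programmatically. A minimal boto3 sketch, using placeholder bucket and key names, might look like this:

import boto3

s3 = boto3.client('s3')

# preview the first few rows of a CSV object without downloading it
response = s3.select_object_content(
    Bucket='your-bucket-name',
    Key='bakery/bakery_transactions.csv',
    ExpressionType='SQL',
    Expression='SELECT * FROM s3object s LIMIT 5',
    InputSerialization={'CSV': {'FileHeaderInfo': 'USE'}},
    OutputSerialization={'CSV': {}},
)

for event in response['Payload']:
    if 'Records' in event:
        print(event['Records']['Payload'].decode('utf-8'))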
Saving Changes to GitHub
In Part 1, we configured Zeppelin to read and write the notebooks from your own copy of the GitHub notebook repository. Using the ‘version control’ menu item, changes made to the notebooks can be committed directly to GitHub.
In GitHub, note the committer is the zeppelin user.
Notebook 2
The second notebook demonstrates the use of a single-node and multi-node Amazon EMR cluster for the exploration and analysis of public datasets ranging from approximately 100k rows up to 27MM rows, using Zeppelin. We will use the latest GroupLens MovieLens rating datasets to examine the performance characteristics of Zeppelin, using Spark, on single- versus multi-node EMR clusters for analyzing big data using a variety of Amazon EC2 Instance Types.
Multi-Node EMR Cluster
If you recall from Part 1, we waited to create this cluster due to the compute costs of running the cluster’s large EC2 instances. You should understand the cost of these resources before proceeding and ensure they are destroyed immediately upon completion of the demonstration to minimize your expenses.
Normalized Instance Hours
Understanding the costs of EMR requires understanding the concept of normalized instance hours. The cluster list displayed in the EMR AWS Console contains two columns, ‘Elapsed time’ and ‘Normalized instance hours’. The ‘Elapsed time’ column reflects the actual wall-clock time the cluster was used. The ‘Normalized instance hours’ column indicates the approximate number of compute hours the cluster has used, rounded up to the nearest hour.
Normalized instance hours calculations are based on a normalization factor. The normalization factor ranges from 1 for a small instance, up to 64 for an 8xlarge. Based on the type and quantity of instances in our multi-node cluster, we would use approximately 56 compute hours (aka normalized instance hours) for every one hour of wall-clock time our EMR cluster is running. Note the multi-node cluster used in our demo, highlighted in yellow above. The cluster ran for two hours, which equated to 112 normalized instance hours.
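The arithmetic behind those numbers is easy to sketch. Assuming normalization factors of 8 for an xlarge instance and 16 for a 2xlarge instance, the demo cluster works out as follows:

# normalization factors: xlarge = 8, 2xlarge = 16
master_hours = 1 * 8   # one m5.xlarge master node
core_hours = 3 * 16    # three m5.2xlarge core nodes

per_hour = master_hours + core_hours  # 56 normalized instance hours per hour
wall_clock_hours = 2

print(per_hour * wall_clock_hours)    # 112 normalized instance hours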
Create the Multi-Node Cluster
Create the multi-node EMR cluster using CloudFormation. Change the following nine variable values, then run the aws cloudformation create-stack command, using the AWS CLI.
# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
EC2_KEY_NAME="your-key-name"
LOG_BUCKET="aws-logs-your_aws_account_id-your_region"
GITHUB_ACCOUNT="your-account-name"
GITHUB_REPO="your-new-project-name"
GITHUB_TOKEN="your-token-value"
MASTER_INSTANCE_TYPE="m5.xlarge" # optional
CORE_INSTANCE_TYPE="m5.2xlarge" # optional
CORE_INSTANCE_COUNT=3 # optional

aws cloudformation create-stack \
    --stack-name zeppelin-emr-prod-stack \
    --template-body file://cloudformation/emr_cluster.yml \
    --parameters ParameterKey=ZeppelinDemoBucket,ParameterValue=${ZEPPELIN_DEMO_BUCKET} \
                 ParameterKey=Ec2KeyName,ParameterValue=${EC2_KEY_NAME} \
                 ParameterKey=LogBucket,ParameterValue=${LOG_BUCKET} \
                 ParameterKey=MasterInstanceType,ParameterValue=${MASTER_INSTANCE_TYPE} \
                 ParameterKey=CoreInstanceType,ParameterValue=${CORE_INSTANCE_TYPE} \
                 ParameterKey=CoreInstanceCount,ParameterValue=${CORE_INSTANCE_COUNT} \
                 ParameterKey=GitHubAccount,ParameterValue=${GITHUB_ACCOUNT} \
                 ParameterKey=GitHubRepository,ParameterValue=${GITHUB_REPO} \
                 ParameterKey=GitHubToken,ParameterValue=${GITHUB_TOKEN}
Use the Amazon EMR web interface to confirm the success of the CloudFormation stack. The fully-provisioned cluster should be in the ‘Waiting’ state when ready.
Configuring the EMR Cluster
Refer to Part 1 for the configuration steps necessary to prepare the EMR cluster and Zeppelin before continuing. Repeat all the steps used for the single-node cluster.
Monitoring with Ganglia
In Part 1, we installed Ganglia as part of creating the EMR cluster. Ganglia, according to its website, is a scalable distributed monitoring system for high-performance computing systems such as clusters and grids. Ganglia can be used to evaluate the performance of the single-node and multi-node EMR clusters. With Ganglia, we can easily view cluster and individual instance CPU, memory, and network I/O performance.
Ganglia Example: Cluster Memory
Ganglia Example: Cluster Network I/O
YARN Resource Manager
The YARN Resource Manager Web UI is also available on our EMR cluster. Using the Resource Manager, we can view the compute resource load on the cluster, as well as the individual EMR Core nodes. Below, we see that the multi-node cluster has 24 vCPUs and 72 GiB of memory available, split evenly across the three Core cluster nodes.
You might recall that the m5.2xlarge EC2 instance type, used for the three Core nodes, contains 8 vCPUs and 32 GiB of memory per node. However, by default, although all 8 vCPUs are available for computation per node, only 24 GiB of the node’s 32 GiB of memory are available for computation. EMR ensures a portion of the memory on each node is reserved for other system processes. The maximum available memory is controlled by the YARN memory configuration option, yarn.scheduler.maximum-allocation-mb.
The YARN Resource Manager preview above shows the load on the Core nodes as Notebook 2 executes the Spark SQL queries on the large MovieLens dataset with 27MM ratings. Note that only 4 of the 24 vCPUs (16.6%) are in use, but that 70.25 of the 72 GiB (97.6%) of available memory is being used. According to Spark, because of the in-memory nature of most Spark computations, Spark programs can be bottlenecked by any resource in the cluster: CPU, network bandwidth, or memory. Most often, if the data fits in memory, the bottleneck is network bandwidth. In this case, memory appears to be the most constrained resource. Using memory-optimized instances, such as r4 or r5 instance types, might be more effective for the Core nodes than the m5 instance types.
MovieLens Datasets
By changing one variable in the notebook, we can work with the latest, smaller GroupLens MovieLens dataset containing approximately 100k rows (ml-latest-small) or the larger dataset, containing approximately 27MM rows (ml-latest). For this demo, try both datasets on both the single-node and multi-node clusters. Compare the Spark SQL paragraph execution times for each of the four variations: single-node with the small dataset, single-node with the large dataset, multi-node with the small dataset, and multi-node with the large dataset. Observe how fast the SQL queries are executed on the single-node versus the multi-node cluster. Try switching to a different Core node instance type, such as r5.2xlarge. Try creating a cluster with additional Core nodes. How is the compute time affected?
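The dataset switch boils down to a single variable in the notebook. A simplified sketch of the pattern, assuming a %spark.pyspark paragraph and placeholder S3 paths, is shown below:

import time

# switch between the ~100k-row and ~27MM-row MovieLens datasets
# by changing this one variable (the S3 path is a placeholder)
dataset = 'ml-latest-small'  # or 'ml-latest'
ratings_csv = 's3://your-bucket-name/{}/ratings.csv'.format(dataset)

df_ratings = spark.read \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .csv(ratings_csv)
df_ratings.createOrReplaceTempView('ratings')

# time a representative Spark SQL query on each cluster configuration
start = time.time()
spark.sql('SELECT COUNT(*) AS rating_count FROM ratings').show()
print('Elapsed: {:.1f}s'.format(time.time() - start))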
Terminate the multi-node EMR cluster to save yourself the expense before continuing to Notebook 3.
aws cloudformation delete-stack \
    --stack-name=zeppelin-emr-prod-stack
Notebook 3
The third notebook demonstrates Amazon EMR and Zeppelin’s integration capabilities with AWS Glue Data Catalog as an Apache Hive-compatible metastore for Spark SQL. We will create an Amazon S3-based Data Lake using the AWS Glue Data Catalog and a set of AWS Glue Crawlers.
Glue Crawlers
Before continuing with Notebook 3, run the two Glue Crawlers using the AWS CLI.
aws glue start-crawler --name bakery-transactions-crawler
aws glue start-crawler --name movie-ratings-crawler
The two Crawlers will create a total of seven tables in the Glue Data Catalog database.
If we examine the Glue Data Catalog database, we should now observe several tables, one for each dataset found in the S3 bucket. The location of each dataset is shown in the ‘Location’ column of the tables view.
From the Zeppelin notebook, we can even use Spark SQL to query the AWS Glue Data Catalog, itself, for its databases and the tables within them.
According to Amazon, the Glue Data Catalog tables and databases are containers for the metadata definitions that define a schema for underlying source data. Using Zeppelin’s SQL interpreter, we can query the Data Catalog database and return the underlying source data. The SQL query example, below, demonstrates how we can perform a join across two tables in the data catalog database, representing two different data sources, and return results.
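As a minimal sketch of that capability, from a %spark.pyspark paragraph, the queries might look like the following; the database, table, and column names are placeholders for whatever the Glue Crawlers actually create:

# list the databases and tables registered in the Glue Data Catalog
spark.sql('SHOW DATABASES').show()
spark.sql('SHOW TABLES IN zeppelin_demo_db').show()

# join two catalog tables backed by two different S3 datasets
# (database, table, and column names are placeholders)
spark.sql("""
    SELECT m.title, r.rating
    FROM zeppelin_demo_db.ratings AS r
    JOIN zeppelin_demo_db.movies AS m
        ON r.movieid = m.movieid
    LIMIT 10
""").show()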
Notebook 4
The fourth notebook demonstrates Zeppelin’s ability to integrate with an external data source. In this case, we will interact with data in an Amazon RDS PostgreSQL relational database using three methods, including the Psycopg 2 PostgreSQL adapter for Python, Spark’s native JDBC capability, and Zeppelin’s JDBC Interpreter.
First, we create a new schema and four related tables for the RDS PostgreSQL movie ratings database, using the Psycopg 2 PostgreSQL adapter for Python and the SQL file we copied to S3 in Part 1.
The RDS database’s schema, shown below, approximates the schema of the four CSV files from the GroupLens MovieLens rating dataset we used in Notebook 2.
Since the schema of the PostgreSQL database matches the MovieLens dataset files, we can import the data from the CSV files, downloaded from GroupLens, directly into the RDS database, again using the Psycopg 2 PostgreSQL adapter for Python.
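For illustration, the Psycopg 2 pattern used in the notebook might be sketched roughly as follows. The endpoint, credentials, and local file paths are placeholders; the actual notebook paragraphs may structure this differently.

import psycopg2

# connection details are placeholders; use your RDS instance's values
conn = psycopg2.connect(
    host='zeppelin-demo.abcd1234efg56.us-east-1.rds.amazonaws.com',
    port=5432,
    dbname='ratings',
    user='masteruser',
    password='5up3r53cr3tPa55w0rd',
)
conn.autocommit = True

with conn.cursor() as cur:
    # run the DDL from the ratings.sql file copied to S3 in Part 1
    # (assumed here to have been downloaded locally first)
    with open('/tmp/ratings.sql') as ddl:
        cur.execute(ddl.read())

    # bulk-load a MovieLens CSV file directly into its matching table
    with open('/tmp/ml-latest-small/movies.csv') as csv_file:
        cur.copy_expert('COPY movies FROM STDIN WITH CSV HEADER', csv_file)

conn.close()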
According to the Spark documentation, Spark SQL also includes a data source that can read data from other databases using JDBC. Using Spark’s JDBC capability and the PostgreSQL JDBC Driver we installed in Part 1, we can perform Spark SQL queries against the RDS database using PySpark (%spark.pyspark). Below, we see a paragraph example of reading the RDS database’s movies table, using Spark.
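A sketch of such a paragraph, assuming the spark session is available and using the same placeholder endpoint and credentials shown later for the JDBC interpreter:

# JDBC connection properties; endpoint and credentials are placeholders
jdbc_url = 'jdbc:postgresql://zeppelin-demo.abcd1234efg56.us-east-1.rds.amazonaws.com:5432/ratings'

df_movies = spark.read \
    .format('jdbc') \
    .option('url', jdbc_url) \
    .option('dbtable', 'movies') \
    .option('user', 'masteruser') \
    .option('password', '5up3r53cr3tPa55w0rd') \
    .option('driver', 'org.postgresql.Driver') \
    .load()

df_movies.show(5)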
As a third method of querying the RDS database, we can use the custom Zeppelin PostgreSQL JDBC interpreter (%postgres) we created in Part 1. Although the default driver of the JDBC interpreter is set as PostgreSQL, and the associated JAR is included with Zeppelin, we overrode that older JAR with the latest PostgreSQL JDBC Driver JAR.
Using the %postgres interpreter, we query the RDS database’s public schema and return the four database tables we created earlier in the notebook.
Again, below, using the %postgres interpreter in the notebook’s paragraph, we query the RDS database and return data, which we then visualize using Zeppelin’s bar chart. Finally, note the use of Zeppelin Dynamic Forms in this example. Dynamic Forms allow Zeppelin to dynamically create input forms, whose input values are then available to use programmatically. Here, we use two form input values to control the data returned from our query and the resulting visualization.
Conclusion
In this two-part post, we learned how effectively Apache Zeppelin integrates with Amazon EMR. We also learned how to extend Zeppelin’s capabilities, using AWS Glue, Amazon RDS, and Amazon S3 as a Data Lake. Beyond what was covered in this post, there are dozens more Zeppelin and EMR features, as well as dozens more AWS services that integrate with Zeppelin and EMR, for you to discover.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
Getting Started with Apache Zeppelin on Amazon EMR, using AWS Glue, RDS, and S3: Part 1
Posted by Gary A. Stafford in AWS, Bash Scripting, Big Data, Build Automation, Cloud, DevOps, Python, Software Development on November 22, 2019
Introduction
There is little question that big data analytics, data science, artificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last 3–5 years. Behind the hype cycles and marketing buzz, these technologies are having a significant influence on many aspects of our modern lives. Due to their popularity, commercial enterprises, academic institutions, and the public sector have all rushed to develop hardware and software solutions to decrease the barrier to entry and increase the velocity of ML practitioners, Data Scientists, and Engineers.
Data Science: 5-Year Search Trend (courtesy Google Trends)
Machine Learning: 5-Year Search Trend (courtesy Google Trends)
Technologies
All three major cloud providers, Amazon Web Services (AWS), Microsoft Azure, and Google Cloud, have rapidly maturing big data analytics, data science, and AI and ML services. AWS, for example, introduced Amazon Elastic MapReduce (EMR) in 2009, primarily as an Apache Hadoop-based big data processing service. Since then, according to Amazon, EMR has evolved into a service that uses Apache Spark, Apache Hadoop, and several other leading open-source frameworks to quickly and cost-effectively process and analyze vast amounts of data. More recently, in late 2017, Amazon released SageMaker, a service that provides the ability to build, train, and deploy machine learning models quickly and securely.
Simultaneously, organizations are building solutions that integrate and enhance these Cloud-based big data analytics, data science, AI, and ML services. One such example is Apache Zeppelin. Similar to the immensely popular Project Jupyter and Netflix’s newly open-sourced Polynote, Apache Zeppelin is a web-based, polyglot, computational notebook. Zeppelin enables data-driven, interactive data analytics and document collaboration using a number of interpreters, such as Scala (with Apache Spark), Python (with Apache Spark), Spark SQL, JDBC, Markdown, Shell, and so on. Zeppelin is one of the core applications supported natively by Amazon EMR.
In the following two-part post, we will explore the use of Apache Zeppelin on EMR for data science and data analytics using a series of Zeppelin notebooks. The notebooks feature the use of AWS Glue, the fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load data for analytics. The notebooks also feature the use of Amazon Relational Database Service (RDS) for PostgreSQL and Amazon Simple Storage Service (S3). Amazon S3 will serve as a Data Lake to store our unstructured data. Given the current choice of Zeppelin’s more than twenty different interpreters, we will use Python3 and Apache Spark, specifically Spark SQL and PySpark, for all notebooks.
We will build an economical single-node EMR cluster for data exploration, as well as a larger multi-node EMR cluster for analyzing large data sets. Amazon S3 will be used to store input and output data, while intermediate results are stored in the Hadoop Distributed File System (HDFS) on the EMR cluster. Amazon provides a good overview of EMR architecture. Below is a high-level architectural diagram of the infrastructure we will construct during Part 1 for this demonstration.
Notebook Features
Below is an overview of each Zeppelin notebook with a link to view it using Zepl’s free Notebook Explorer. Zepl was founded by the same engineers that developed Apache Zeppelin, including Moonsoo Lee, Zepl CTO and creator of Apache Zeppelin. Zepl’s enterprise collaboration platform, built on Apache Zeppelin, enables both Data Science and AI/ML teams to collaborate around data.
Notebook 1
The first notebook uses a small 21k-row Kaggle dataset, Transactions from a Bakery. The notebook demonstrates Zeppelin’s integration capabilities with the Helium plugin system for adding new chart types, the use of Amazon S3 for data storage and retrieval, the use of Apache Parquet, a compressed and efficient columnar data storage format, and Zeppelin’s storage integration with GitHub for notebook version control.
Notebook 2
The second notebook demonstrates the use of a single-node and multi-node Amazon EMR cluster for the exploration and analysis of public datasets ranging from approximately 100k rows up to 27MM rows, using Zeppelin. We will use the latest GroupLens MovieLens rating datasets to examine the performance characteristics of Zeppelin, using Spark, on single- versus multi-node EMR clusters for analyzing big data using a variety of Amazon EC2 Instance Types.
Notebook 3
The third notebook demonstrates Amazon EMR and Zeppelin’s integration capabilities with AWS Glue Data Catalog as an Apache Hive-compatible metastore for Spark SQL. We will create an Amazon S3-based Data Lake using the AWS Glue Data Catalog and a set of AWS Glue Crawlers.
Notebook 4
The fourth notebook demonstrates Zeppelin’s ability to integrate with an external data source. In this case, we will interact with data in an Amazon RDS PostgreSQL relational database using three methods, including the Psycopg 2 PostgreSQL adapter for Python, Spark’s native JDBC capability, and Zeppelin’s JDBC Interpreter.
Demonstration
In Part 1 of the post, as a DataOps Engineer, we will create and configure the AWS resources required to demonstrate the use of Apache Zeppelin on EMR, using an AWS Glue Data Catalog, Amazon RDS PostgreSQL database, and an S3-based data lake. In Part 2 of this post, as a Data Scientist, we will explore Apache Zeppelin’s features and integration capabilities with a variety of AWS services using the Zeppelin notebooks.
Source Code
The demonstration’s source code is contained in two public GitHub repositories. The first repository, zeppelin-emr-demo, includes the four Zeppelin notebooks, organized according to the conventions of Zeppelin’s pluggable notebook storage mechanisms.
.
├── 2ERVVKTCG
│   └── note.json
├── 2ERYY923A
│   └── note.json
├── 2ESH8DGFS
│   └── note.json
├── 2EUZKQXX7
│   └── note.json
├── LICENSE
└── README.md
Zeppelin GitHub Storage
During the demonstration, changes made to your copy of the Zeppelin notebooks running on EMR will be automatically pushed back to GitHub when a commit occurs. To accomplish this, instead of just cloning a local copy of my zeppelin-emr-demo project repository, you will want your own copy, within your personal GitHub account. You could fork my zeppelin-emr-demo GitHub repository or copy a clone into your own GitHub repository.
To make a copy of the project in your own GitHub account, first, create a new empty repository on GitHub, for example, ‘my-zeppelin-emr-demo-copy’. Then, execute the following commands from your terminal, to clone the original project repository to your local environment, and finally, push it to your GitHub account.
# change me
GITHUB_ACCOUNT="your-account-name" # i.e. garystafford
GITHUB_REPO="your-new-project-name" # i.e. my-zeppelin-emr-demo-copy

# shallow clone into new directory
git clone --branch master \
    --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/zeppelin-emr-demo.git \
    ${GITHUB_REPO}

# re-initialize repository
cd ${GITHUB_REPO}
rm -rf .git
git init

# re-commit code
git add -A
git commit -m "Initial commit of my copy of zeppelin-emr-demo"

# push to your repo
git remote add origin \
    https://github.com/$GITHUB_ACCOUNT/$GITHUB_REPO.git
git push -u origin master
GitHub Personal Access Token
To automatically push changes to your GitHub repository when a commit occurs, Zeppelin will need a GitHub personal access token. Create a personal access token with the scope shown below. Be sure to keep the token secret. Make sure you do not accidentally check your token value into your source code on GitHub. To minimize the risk, change or delete the token after completing the demo.
The second repository, zeppelin-emr-config, contains the necessary bootstrap files, CloudFormation templates, and PostgreSQL DDL (Data Definition Language) SQL script.
.
├── LICENSE
├── README.md
├── bootstrap
│   ├── bootstrap.sh
│   ├── emr-config.json
│   └── helium.json
├── cloudformation
│   ├── crawler.yml
│   ├── emr_single_node.yml
│   ├── emr_cluster.yml
│   └── rds_postgres.yml
└── sql
    └── ratings.sql
Use the following git command to clone the GitHub repository to your local environment.
git clone --branch master \
    --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/zeppelin-emr-demo-setup.git
Requirements
To follow along with the demonstration, you will need an AWS Account, an existing Amazon S3 bucket to store EMR configuration and data, and an EC2 key pair. You will also need a current version of the AWS CLI installed in your work environment. Due to the particular EMR features we will be using, I recommend using the us-east-1 AWS Region to create the demonstration’s resources.
# create secure emr config and data bucket
# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"

aws s3api create-bucket \
    --bucket ${ZEPPELIN_DEMO_BUCKET}

aws s3api put-public-access-block \
    --bucket ${ZEPPELIN_DEMO_BUCKET} \
    --public-access-block-configuration \
    BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true
Copy Configuration Files to S3
To start, we need to copy three configuration files, bootstrap.sh, helium.json, and ratings.sql, from the zeppelin-emr-demo-setup project directory to our S3 bucket. Change the ZEPPELIN_DEMO_BUCKET variable value, then run the following aws s3 cp commands, using the AWS CLI. The three files will be copied to a bootstrap directory within your S3 bucket.
# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"

aws s3 cp bootstrap/bootstrap.sh s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/
aws s3 cp bootstrap/helium.json s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/
aws s3 cp sql/ratings.sql s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/
Below is sample output from copying the local files to S3.
Create AWS Resources
We will start by creating most of the required AWS resources for the demonstration using three CloudFormation templates. We will create a single-node Amazon EMR cluster, an Amazon RDS PostgreSQL database, an AWS Glue Data Catalog database, two AWS Glue Crawlers, and a Glue IAM Role. We will wait to create the multi-node EMR cluster due to the compute costs of running large EC2 instances in the cluster. You should understand the cost of these resources before proceeding and ensure they are destroyed immediately upon completion of the demonstration to minimize your expenses.
Single-Node EMR Cluster
We will start by creating the single-node Amazon EMR cluster, consisting of just one master node with no core or task nodes (a cluster of one). All operations will take place on the master node.
Default EMR Resources
The following EMR instructions assume you have already created at least one EMR cluster in the past, in your current AWS Region, using the EMR web interface with the ‘Create Cluster – Quick Options’ option. Creating a cluster this way creates several additional AWS resources, such as the EMR_EC2_DefaultRole EC2 instance profile, the default EMR_DefaultRole EMR IAM Role, and the default EMR S3 log bucket.
If you haven’t created any EMR clusters using the EMR ‘Create Cluster – Quick Options’ in the past, don’t worry; you can also create the required resources with a few quick AWS CLI commands. Change the following LOG_BUCKET variable value, then run the aws emr and aws s3api commands, using the AWS CLI. The LOG_BUCKET variable value follows the convention of aws-logs-awsaccount-region. For example, aws-logs-012345678901-us-east-1.
# create emr roles
aws emr create-default-roles

# create log secure bucket
# change me
LOG_BUCKET="aws-logs-your_aws_account_id-your_region"

aws s3api create-bucket --bucket ${LOG_BUCKET}

aws s3api put-public-access-block --bucket ${LOG_BUCKET} \
    --public-access-block-configuration \
    BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true
The new EMR IAM Roles can be viewed in the IAM Roles web interface.
Often, I see tutorials that reference these default EMR resources from the AWS CLI or CloudFormation, without any understanding or explanation of how they are created.
EMR Bootstrap Script
As part of creating our EMR cluster, the CloudFormation template, emr_single_node.yml, will call the bootstrap script we copied to S3, earlier, bootstrap.sh. The bootstrap script pre-installs required Python and Linux software packages, and the PostgreSQL driver JAR. The bootstrap script also clones your copy of the zeppelin-emr-demo GitHub repository.
#!/bin/bash

set -ex

if [[ $# -ne 2 ]] ; then
    echo "Script requires two arguments"
    exit 1
fi

GITHUB_ACCOUNT=$1
GITHUB_REPO=$2

# install extra python packages
sudo python3 -m pip install psycopg2-binary boto3

# install extra linux packages
yes | sudo yum install git htop

# clone github repo
cd /tmp
git clone "https://github.com/${GITHUB_ACCOUNT}/${GITHUB_REPO}.git"

# install extra jars
POSTGRES_JAR="postgresql-42.2.8.jar"
wget -nv "https://jdbc.postgresql.org/download/${POSTGRES_JAR}"
sudo chown -R hadoop:hadoop ${POSTGRES_JAR}
mkdir -p /home/hadoop/extrajars/
cp ${POSTGRES_JAR} /home/hadoop/extrajars/
EMR Application Configuration
The EMR CloudFormation template will also modify the EMR cluster’s Spark and Zeppelin application configurations. Amongst other configuration properties, the template sets the default Python version to Python3, instructs Zeppelin to use the cloned GitHub notebook directory path, and adds the PostgreSQL Driver JAR to the JVM ClassPath. Below we can see the configuration properties applied to an existing EMR cluster.

EMR Application Versions
As of the date of this post, EMR is at version 5.28.0. Below, as shown in the EMR web interface, are the current (21) applications and frameworks available for installation on EMR.
For this demo, we will install Apache Spark v2.4.4, Ganglia v3.7.2, and Zeppelin 0.8.2.
Apache Zeppelin: Web Interface
Apache Spark: DAG Visualization
Ganglia: Cluster CPU Monitoring
Create the EMR CloudFormation Stack
Change the following (7) variable values, then run the aws cloudformation create-stack command, using the AWS CLI.
# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
EC2_KEY_NAME="your-key-name"
LOG_BUCKET="aws-logs-your_aws_account_id-your_region"
GITHUB_ACCOUNT="your-account-name" # i.e. garystafford
GITHUB_REPO="your-new-project-name" # i.e. my-zeppelin-emr-demo
GITHUB_TOKEN="your-token-value"
MASTER_INSTANCE_TYPE="m5.xlarge" # optional

aws cloudformation create-stack \
    --stack-name zeppelin-emr-dev-stack \
    --template-body file://cloudformation/emr_single_node.yml \
    --parameters ParameterKey=ZeppelinDemoBucket,ParameterValue=${ZEPPELIN_DEMO_BUCKET} \
                 ParameterKey=Ec2KeyName,ParameterValue=${EC2_KEY_NAME} \
                 ParameterKey=LogBucket,ParameterValue=${LOG_BUCKET} \
                 ParameterKey=MasterInstanceType,ParameterValue=${MASTER_INSTANCE_TYPE} \
                 ParameterKey=GitHubAccount,ParameterValue=${GITHUB_ACCOUNT} \
                 ParameterKey=GitHubRepository,ParameterValue=${GITHUB_REPO} \
                 ParameterKey=GitHubToken,ParameterValue=${GITHUB_TOKEN}
You can use the Amazon EMR web interface to confirm the results of the CloudFormation stack. The cluster should be in the ‘Waiting’ state.
PostgreSQL on Amazon RDS
Next, create a simple, single-AZ, single-master, non-replicated Amazon RDS PostgreSQL database, using the included CloudFormation template, rds_postgres.yml. We will use this database in Notebook 4. For the demo, I have selected the current-generation, general purpose db.m4.large RDS instance class to run PostgreSQL. You can easily change the instance class to another RDS-supported instance class to suit your own needs.
Change the following (3) variable values, then run the aws cloudformation create-stack command, using the AWS CLI.
# change me
DB_MASTER_USER="your-db-username" # i.e. masteruser
DB_MASTER_PASSWORD="your-db-password" # i.e. 5up3r53cr3tPa55w0rd
MASTER_INSTANCE_TYPE="db.m4.large" # optional

aws cloudformation create-stack \
    --stack-name zeppelin-rds-stack \
    --template-body file://cloudformation/rds_postgres.yml \
    --parameters ParameterKey=DBUser,ParameterValue=${DB_MASTER_USER} \
                 ParameterKey=DBPassword,ParameterValue=${DB_MASTER_PASSWORD} \
                 ParameterKey=DBInstanceClass,ParameterValue=${MASTER_INSTANCE_TYPE}
You can use the Amazon RDS web interface to confirm the results of the CloudFormation stack.
AWS Glue
Next, create the AWS Glue Data Catalog database, the Apache Hive-compatible metastore for Spark SQL, two AWS Glue Crawlers, and a Glue IAM Role (ZeppelinDemoCrawlerRole), using the included CloudFormation template, crawler.yml. The AWS Glue Data Catalog database will be used in Notebook 3.
Change the following variable value, then run the aws cloudformation create-stack command, using the AWS CLI.
# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"

aws cloudformation create-stack \
    --stack-name zeppelin-crawlers-stack \
    --template-body file://cloudformation/crawler.yml \
    --parameters ParameterKey=ZeppelinDemoBucket,ParameterValue=${ZEPPELIN_DEMO_BUCKET} \
    --capabilities CAPABILITY_NAMED_IAM
You can use the AWS Glue web interface to confirm the results of the CloudFormation stack. Note the Data Catalog database and the two Glue Crawlers. We will not run the two crawlers until Part 2 of the post, so no tables will exist in the Data Catalog database, yet.
At this point in the demonstration, you should have successfully created a single-node Amazon EMR cluster, an Amazon RDS PostgreSQL database, and several AWS Glue resources, all using CloudFormation templates.
Post-EMR Creation Configuration
RDS Security
For the new EMR cluster to communicate with the RDS PostgreSQL database, we need to ensure that port 5432 is open from the RDS database’s VPC security group, which is the default VPC security group, to the security groups of the EMR nodes. Obtain the Group IDs of the ElasticMapReduce-master and ElasticMapReduce-slave Security Groups from the EMR web interface.
Access the Security Group for the RDS database using the RDS web interface. Change the inbound rule for port 5432 to include both Security Group IDs.
SSH to EMR Master Node
In addition to the bootstrap script and configurations we already applied to the EMR cluster, we need to make several post-creation configuration changes to the EMR cluster for our demonstration. These changes require SSH’ing into the EMR cluster. Using the master node’s public DNS address and the SSH command provided in the EMR web console, SSH into the master node.
If you cannot access the node using SSH, check that port 22 is open on the associated EMR master node Security Group (ElasticMapReduce-master) to your IP address or address range.
Git Permissions
We need to change permissions on the git repository we installed during the EMR bootstrapping phase. Typically, with an EC2 instance, you perform operations as the ec2-user user. With Amazon EMR, you often perform actions as the hadoop user. With Zeppelin on EMR, the notebooks perform operations, including interacting with the git repository, as the zeppelin user. As a result of the bootstrap.sh script, the contents of the git repository directory, /tmp/zeppelin-emr-demo/, are owned by the hadoop user and group by default.
We will change their owner to the zeppelin user and group. We could not perform this step as part of the bootstrap script since the zeppelin user and group did not exist at the time the script was executed.
cd /tmp/zeppelin-emr-demo/
sudo chown -R zeppelin:zeppelin .
The results should look similar to the following output.
Pre-Install Visualization Packages
Next, we will pre-install several Apache Zeppelin Visualization packages. According to the Zeppelin website, an Apache Zeppelin Visualization is a pluggable package that can be loaded/unloaded on runtime through the Helium framework in Zeppelin. We can use them just like any other built-in visualization in the notebook. A Visualization is a javascript npm package. For example, here is a link to the ultimate-pie-chart on the public npm registry.
We can pre-load plugins by replacing the /usr/lib/zeppelin/conf/helium.json file with the version of helium.json we copied to S3 earlier, and restarting Zeppelin. If you have a lot of Visualizations or package types, or use any DataOps automation to create EMR clusters, this approach is much more efficient and repeatable than manually loading plugins using the Zeppelin UI each time you create a new EMR cluster. Below is the helium.json file, which pre-loads (8) Visualization packages.
{ "enabled": { "ultimate-pie-chart": "ultimate-pie-chart@0.0.2", "ultimate-column-chart": "ultimate-column-chart@0.0.2", "ultimate-scatter-chart": "ultimate-scatter-chart@0.0.2", "ultimate-range-chart": "ultimate-range-chart@0.0.2", "ultimate-area-chart": "ultimate-area-chart@0.0.1", "ultimate-line-chart": "ultimate-line-chart@0.0.1", "zeppelin-bubblechart": "zeppelin-bubblechart@0.0.4", "zeppelin-highcharts-scatterplot": "zeppelin-highcharts-scatterplot@0.0.2" }, "packageConfig": {}, "bundleDisplayOrder": [ "ultimate-pie-chart", "ultimate-column-chart", "ultimate-scatter-chart", "ultimate-range-chart", "ultimate-area-chart", "ultimate-line-chart", "zeppelin-bubblechart", "zeppelin-highcharts-scatterplot" ] }
Run the following commands to load the plugins and adjust the permissions on the file.
# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"

sudo aws s3 cp s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/helium.json \
    /usr/lib/zeppelin/conf/helium.json

sudo chown zeppelin:zeppelin /usr/lib/zeppelin/conf/helium.json
Create New JDBC Interpreter
Lastly, we need to create a new Zeppelin JDBC Interpreter to connect to our RDS database. By default, Zeppelin has several interpreters installed. You can review a list of available interpreters using the following command.
sudo sh /usr/lib/zeppelin/bin/install-interpreter.sh --list
The new JDBC interpreter will allow us to connect to our RDS PostgreSQL database, using Java Database Connectivity (JDBC). First, ensure all available interpreters are installed, including the current Zeppelin JDBC driver (org.apache.zeppelin:zeppelin-jdbc:0.8.0) to /usr/lib/zeppelin/interpreter/jdbc.
Creating a new interpreter is a two-part process. In this stage, we install the required interpreter files on the master node using the following command. Then later, in the Zeppelin web interface, we will configure the new PostgreSQL JDBC interpreter. Note we must provide a unique name for the interpreter (I chose ‘postgres’ in this case), which we will refer to in part two of the interpreter creation process.
sudo sh /usr/lib/zeppelin/bin/install-interpreter.sh --all

sudo sh /usr/lib/zeppelin/bin/install-interpreter.sh \
    --name "postgres" \
    --artifact org.apache.zeppelin:zeppelin-jdbc:0.8.0
To complete the post-EMR creation configuration on the master node, we must restart Zeppelin for our changes to take effect.
sudo stop zeppelin && sudo start zeppelin
In my experience, it could take 2–3 minutes for the Zeppelin UI to become fully responsive after a restart.
Zeppelin Web Interface Access
With all the EMR application configuration complete, we will access the Zeppelin web interface running on the master node. Use the Zeppelin connection information provided in the EMR web interface to set up SSH tunneling to the Zeppelin web interface, running on the master node. Using this method, we can also access the Spark History Server, Ganglia, and Hadoop Resource Manager web interfaces; all links are provided from EMR.
To set up a web connection to the applications installed on the EMR cluster, I am using FoxyProxy as a proxy management tool with Google Chrome.
If everything is working so far, you should see the Zeppelin web interface with all four Zeppelin notebooks available from the cloned GitHub repository. You will be logged in as the anonymous user. Zeppelin offers authentication for accessing notebooks on the EMR cluster. For brevity, we will not cover setting up authentication in Zeppelin, using Shiro Authentication.
To confirm that the path to the local, cloned copy of the GitHub notebook repository is correct, check the Notebook Repos interface, accessible under the Settings dropdown (anonymous user) in the upper right of the screen. The value should match the ZEPPELIN_NOTEBOOK_DIR configuration property value in the emr_single_node.yml CloudFormation template we executed earlier.
To confirm the Helium Visualizations were pre-installed correctly, using the helium.json file, open the Helium interface, accessible under the Settings dropdown (anonymous user) in the upper right of the screen.
Note the enabled visualizations. It is also easy to enable additional plugins through the web interface.
New PostgreSQL JDBC Interpreter
If you recall, earlier we installed the required interpreter files on the master node using the install-interpreter.sh command. We will now complete the process of configuring the new PostgreSQL JDBC interpreter. Open the Interpreter interface, accessible under the Settings dropdown (anonymous user) in the upper right of the screen.
The title of the new interpreter must match the name we used to install the interpreter files, ‘postgres’. The interpreter group will be ‘jdbc’. There are, minimally, three properties we need to configure for your specific RDS database instance, including default.url, default.user, and default.password. These should match the values you used to create your RDS instance earlier. Make sure to include the database name in the default.url. An example is shown below.
default.url: jdbc:postgresql://zeppelin-demo.abcd1234efg56.us-east-1.rds.amazonaws.com:5432/ratings
default.user: masteruser
default.password: 5up3r53cr3tPa55w0rd
We also need to provide a path to the PostgreSQL driver JAR dependency. This path is the location where we placed the JAR using the bootstrap.sh script earlier, /home/hadoop/extrajars/postgresql-42.2.8.jar. Save the new interpreter and make sure it starts successfully (shows a green icon).
Switch Interpreters to Python 3
The last thing we need to do is change the Spark and Python interpreters to use Python 3 instead of the default Python 2. On the same screen you used to create a new interpreter, modify the Spark and Python interpreters. First, for the Python interpreter, change the zeppelin.python property to python3.
Next, for the Spark interpreter, change the zeppelin.pyspark.python property to python3.
Part 2
In Part 1 of this post, we created and configured the AWS resources required to demonstrate the use of Apache Zeppelin on EMR, using an AWS Glue Data Catalog, Amazon RDS PostgreSQL database, and an S3 data lake. In Part 2 of this post, we will explore some of Apache Zeppelin’s features and integration capabilities with a variety of AWS services using the notebooks.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.