Amazon Managed Workflows for Apache Airflow — Configuration: Understanding Amazon MWAA’s Configuration Options
Posted by Gary A. Stafford in Software Development on December 29, 2020
Introduction
For anyone new to Amazon Managed Workflows for Apache Airflow (Amazon MWAA), especially those used to managing their own Apache Airflow platform, Amazon MWAA’s configuration might appear to be a bit of a black box at first. This brief post will explore Amazon MWAA’s configuration — how to inspect it and how to modify it. We will use Airflow DAGs to review an MWAA environment’s airflow.cfg
file, environment variables, and Python packages.
Amazon MWAA
Apache Airflow is a popular open-source platform designed to schedule and monitor workflows. According to Wikipedia, Airflow was created at Airbnb in 2014 to manage the company’s increasingly complex workflows. From the beginning, the project was made open source, becoming an Apache Incubator project in 2016 and a top-level Apache Software Foundation project in 2019.
With the announcement of Amazon MWAA in November 2020, AWS customers can now focus on developing workflow automation while leaving the management of Airflow to AWS. Amazon MWAA can be used as an alternative to AWS Step Functions for workflow automation on AWS.
The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI. For more information on Amazon MWAA, read my last post, Running Spark Jobs on Amazon EMR with Apache Airflow.

Source Code
The DAGs referenced in this post are available on GitHub. Using this git clone
command, download a copy of this post’s GitHub repository to your local environment.
git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/aws-airflow-demo.git
Accessing Configuration
Environment Variables
Environment variables are an essential part of an MWAA environment’s configuration. There are various ways to examine the environment variables. You could use Airflow’s BashOperator to simply call the command env, or the PythonOperator to call a Python iterator function, as shown below. A sample DAG, dags/get_env_vars.py, is included in the project.
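A minimal sketch of such a DAG is shown below; it assumes Airflow 1.10.x’s PythonOperator, and the task and function names are illustrative rather than copied from the repository.

import os
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def _print_env_vars():
    # Iterate over the environment variables and write each one to the task log
    for key, value in sorted(os.environ.items()):
        print(f"{key}: {value}")

with DAG(
    dag_id="get_env_vars",
    schedule_interval=None,
    start_date=datetime(2020, 12, 1),
    catchup=False,
) as dag:
    get_env_vars = PythonOperator(
        task_id="get_env_vars_task",
        python_callable=_print_env_vars,
    )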
The DAG’s PythonOperator will iterate over the MWAA environment’s environment variables and output them to the task’s log. Below is a snippet of an example task’s log.
Airflow Configuration File
According to Airflow, the airflow.cfg file contains Airflow’s configuration. You can edit it to change any of the settings. The first time you run Apache Airflow, it creates an airflow.cfg configuration file in your AIRFLOW_HOME directory and attaches the configurations to your environment as environment variables.
Amazon MWAA doesn’t expose the airflow.cfg file in an environment’s Apache Airflow UI. Although you can’t access it directly, you can still view the file’s contents. On MWAA, the configuration file is located in the AIRFLOW_HOME directory, /usr/local/airflow (the generic Airflow default is ~/airflow).
There are multiple ways to examine your MWAA environment’s airflow.cfg file. You could use Airflow’s PythonOperator to call a Python function that reads the contents of the file, as shown below. The function uses the AIRFLOW_HOME environment variable to locate and read the airflow.cfg file. A sample DAG, dags/get_airflow_cfg.py, is included in the project.
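Below is a minimal sketch of the Python callable such a DAG might use; the function name is illustrative and the repository’s version may differ.

import os

def _print_airflow_cfg():
    # Locate airflow.cfg via the AIRFLOW_HOME environment variable and dump its contents to the task log
    airflow_home = os.getenv("AIRFLOW_HOME", "/usr/local/airflow")
    with open(os.path.join(airflow_home, "airflow.cfg"), "r") as airflow_cfg:
        print(airflow_cfg.read())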
The DAG’s task will read the MWAA environment’s airflow.cfg file and output it to the task’s log. Below is a snippet of an example task’s log.
Customizing Airflow Configurations
While AWS doesn’t expose the airflow.cfg file in the Apache Airflow UI of your environment, you can change the default Apache Airflow configuration options directly within the Amazon MWAA console and continue using all other settings in airflow.cfg. The configuration options changed in the Amazon MWAA console are translated into environment variables.
To customize the Apache Airflow configuration, change the default options directly on the Amazon MWAA console. Select Edit, add or modify configuration options and values in the Airflow configuration options menu, then select Save. For example, we can change Airflow’s default timezone (core.default_ui_timezone) to America/New_York.

Once the MWAA environment is updated, which may take several minutes, view your changes by re-running the DAG, dags/get_env_vars.py. Note the new configuration item on both lines 2 and 6 of the log snippet shown below. The configuration item appears on its own (AIRFLOW__CORE__DEFAULT_UI_TIMEZONE), as well as part of the AIRFLOW_CONFIG_SECRETS dictionary environment variable.
Using the MWAA API
We can also make configuration changes using the MWAA API. For example, to change the default Airflow UI timezone, call the MWAA API’s update-environment command using the AWS CLI. Include the --airflow-configuration-options parameter, passing the core.default_ui_timezone key/value pair as a JSON blob.
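A hedged example of the call, using a placeholder environment name, might look like the following.

aws mwaa update-environment \
--name <your_mwaa_environment_name> \
--airflow-configuration-options '{"core.default_ui_timezone": "America/New_York"}'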
To review an environment’s configuration, use the get-environment command in combination with jq.
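For example, something along these lines works, assuming the response nests the options under Environment.AirflowConfigurationOptions.

aws mwaa get-environment --name <your_mwaa_environment_name> | \
jq -r '.Environment.AirflowConfigurationOptions'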
Below, we see an example of the output.
Python Packages
Airflow is written in Python, and workflows are created via Python scripts. Python packages are a crucial part of an MWAA environment’s configuration. According to the documentation, an ‘extra package’ is a Python subpackage that is not included in the Apache Airflow base install and that you install on your MWAA environment. As part of setting up an MWAA environment, you can specify the location of a requirements.txt file in the Airflow S3 bucket. Extra packages are installed using this requirements.txt file.

There are several ways to check your MWAA environment’s installed Python packages and versions. You could use Airflow’s BashOperator to call the command python3 -m pip list. A sample DAG, dags/get_py_pkgs.py, is included in the project.
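A minimal sketch of the BashOperator task follows; the DAG and task names are illustrative.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(
    dag_id="get_py_pkgs",
    schedule_interval=None,
    start_date=datetime(2020, 12, 1),
    catchup=False,
) as dag:
    # List every installed Python package and version in the task log
    list_packages = BashOperator(
        task_id="list_python_packages",
        bash_command="python3 -m pip list",
    )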
The DAG’s task will output a list of all Python packages and package versions to the task’s log. Below is a snippet of an example task’s log.
Conclusion
Understanding your Amazon MWAA environment’s airflow.cfg file, environment variables, and Python packages is important for proper Airflow platform management. In this brief post, we learned more about Amazon MWAA’s configuration: how to inspect it using DAGs and how to modify it through the Amazon MWAA console.
Running Spark Jobs on Amazon EMR with Apache Airflow: Using the new Amazon Managed Workflows for Apache Airflow (Amazon MWAA) Service on AWS
Posted by Gary A. Stafford in AWS, Bash Scripting, Big Data, Build Automation, Cloud, DevOps, Python, Software Development on December 24, 2020
Introduction
In the first post of this series, we explored several ways to run PySpark applications on Amazon EMR using AWS services, including AWS CloudFormation, AWS Step Functions, and the AWS SDK for Python. This second post in the series will examine running Spark jobs on Amazon EMR using the recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA) service.
Amazon EMR
According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a Cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache Spark, Hive, HBase, Flink, Hudi, Zeppelin, Jupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including the use of EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.
AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.
Amazon MWAA
Apache Airflow is a popular open-source platform designed to schedule and monitor workflows. According to Wikipedia, Airflow was created at Airbnb in 2014 to manage the company’s increasingly complex workflows. From the beginning, the project was made open source, becoming an Apache Incubator project in 2016 and a top-level Apache Software Foundation project (TLP) in 2019.
Many organizations build, manage, and maintain Apache Airflow on AWS using compute services such as Amazon EC2 or Amazon EKS. Amazon recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA). With the announcement of Amazon MWAA in November 2020, AWS customers can now focus on developing workflow automation, while leaving the management of Airflow to AWS. Amazon MWAA can be used as an alternative to AWS Step Functions for workflow automation on AWS.

Apache recently announced the release of Airflow 2.0.0 on December 17, 2020. The latest 1.x version of Airflow is 1.10.14, released December 12, 2020. However, at the time of this post, Amazon MWAA was running Airflow 1.10.12, released August 25, 2020. Ensure that when you are developing workflows for Amazon MWAA, you are using the correct Apache Airflow 1.10.12 documentation.
The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI.
Airflow has a mechanism that allows you to expand its functionality and integrate with other systems. Given its integration capabilities, Airflow has extensive support for AWS, including Amazon EMR, Amazon S3, AWS Batch, Amazon RedShift, Amazon DynamoDB, AWS Lambda, Amazon Kinesis, and Amazon SageMaker. Outside of support for Amazon S3, most AWS integrations can be found in the Hooks, Secrets, Sensors, and Operators of the Airflow codebase’s contrib section.
Getting Started
Source Code
Using this git clone command, download a copy of this post’s GitHub repository to your local environment.
git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/aws-airflow-demo.git
Preliminary Steps
This post assumes the reader has completed the demonstration in the previous post, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce. This post will re-use many of the last post’s AWS resources, including the EMR VPC, Subnets, Security Groups, AWS Glue Data Catalog, Amazon S3 buckets, EMR Roles, EC2 key pair, AWS Systems Manager Parameter Store parameters, PySpark applications, and Kaggle datasets.
Configuring Amazon MWAA
The easiest way to create a new MWAA Environment is through the AWS Management Console. I strongly suggest that you review the pricing for Amazon MWAA before continuing. The service can be quite costly to operate, even when idle, with the smallest Environment class potentially running into the hundreds of dollars per month.

Using the Console, create a new Amazon MWAA Environment. The Amazon MWAA interface will walk you through the creation process. Note the current ‘Airflow version’, 1.10.12.

Amazon MWAA requires an Amazon S3 bucket to store Airflow assets. Create a new Amazon S3 bucket. According to the documentation, the bucket name must start with the prefix airflow-. You must also enable Bucket Versioning on the bucket. Specify a dags folder within the bucket to store Airflow’s Directed Acyclic Graphs (DAGs). You can leave the next two options blank since we have no additional Airflow plugins or additional Python packages to install.

With Amazon MWAA, your data is secure by default as workloads run within their own Amazon Virtual Private Cloud (Amazon VPC). As part of the MWAA Environment creation process, you are given the option to have AWS create an MWAA VPC CloudFormation stack.

For this demonstration, choose to have MWAA create a new VPC and associated networking resources.

The MWAA CloudFormation stack contains approximately 22 AWS resources, including a VPC, a pair of public and private subnets, route tables, an Internet Gateway, two NAT Gateways, and associated Elastic IPs (EIP). See the MWAA documentation for more details.


As part of the Amazon MWAA Networking configuration, you must decide if you want web access to Airflow to be public or private. The details of the network configuration can be found in the MWAA documentation. I am choosing public webserver access for this demonstration, but the recommended choice is private for greater security. With the public option, AWS still requires IAM authentication to sign in to the AWS Management Console in order to access the Airflow UI.
You must select an existing VPC Security Group or have MWAA create a new one. For this demonstration, choose to have MWAA create a Security Group for you.
Lastly, select an appropriately-sized Environment class for Airflow based on the scale of your needs. The mw1.small class will be sufficient for this demonstration.

Finally, for Permissions, you must select an existing Airflow execution service role or create a new role. For this demonstration, create a new Airflow service role. We will later add additional permissions.

Airflow Execution Role
As part of this demonstration, we will be using Airflow to run Spark jobs on EMR (EMR Steps). To allow Airflow to interact with EMR, we must increase the new Airflow execution role’s default permissions. Additional permissions include allowing the new Airflow role to assume the EMR roles using iam:PassRole. For this demonstration, we will include the two default EMR Service and JobFlow roles, EMR_DefaultRole and EMR_EC2_DefaultRole. We will also include the corresponding custom EMR roles created in the previous post, EMR_DemoRole and EMR_EC2_DemoRole. For this demonstration, the Airflow service role also requires three specific EMR permissions, as shown below. Later in the post, Airflow will also read files from S3, which requires s3:GetObject permission.
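The resulting policy resembles the following sketch. The specific EMR actions and the placeholder Account ID shown here are assumptions for illustration only; the project’s iam_policy/airflow_emr_policy.json file is authoritative.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "elasticmapreduce:RunJobFlow",
        "elasticmapreduce:AddJobFlowSteps",
        "elasticmapreduce:DescribeStep"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": [
        "arn:aws:iam::123412341234:role/EMR_DefaultRole",
        "arn:aws:iam::123412341234:role/EMR_EC2_DefaultRole",
        "arn:aws:iam::123412341234:role/EMR_DemoRole",
        "arn:aws:iam::123412341234:role/EMR_EC2_DemoRole"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "*"
    }
  ]
}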
Create a new policy by importing the project’s JSON file, iam_policy/airflow_emr_policy.json, and attach the new policy to the Airflow service role. Be sure to update the AWS Account ID in the file with your own Account ID.
The Airflow service role, created by MWAA, is shown below with the new policy attached.

Final Architecture
Below is the final high-level architecture for the post’s demonstration. The diagram shows the approximate route of a DAG Run request, in red. The diagram includes an optional S3 Gateway VPC endpoint, not detailed in the post, but recommended for additional security. According to AWS, a VPC endpoint enables you to privately connect your VPC to supported AWS services and VPC endpoint services powered by AWS PrivateLink without requiring an internet gateway. In this case, it provides a private connection between the MWAA VPC and Amazon S3. It is also possible to create an EMR Interface VPC Endpoint to securely route traffic directly to EMR from MWAA, instead of connecting over the Internet.

Amazon MWAA Environment
The new MWAA Environment will include a link to the Airflow UI.

Airflow UI
Using the supplied link, you should be able to access the Airflow UI using your web browser.

Our First DAG
The Amazon MWAA documentation includes an example DAG, which contains one of several sample programs, SparkPi, which comes with Spark. I have created a similar DAG that is included in the GitHub project, dags/emr_steps_demo.py. The DAG will create a minimally-sized, single-node EMR cluster with no Core or Task nodes. The DAG will then use that cluster to submit the calculate_pi job to Spark. Once the job is complete, the DAG will terminate the EMR cluster.
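A condensed sketch of what such a DAG might look like is shown below, condensed here to two tasks and using Airflow 1.10.x’s contrib EMR operators. The cluster and step definitions are abbreviated and illustrative; refer to the project’s DAG for the working version.

from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor

SPARK_STEPS = [{
    "Name": "calculate_pi",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
    },
}]

JOB_FLOW_OVERRIDES = {
    "Name": "demo-cluster-airflow",
    "ReleaseLabel": "emr-6.2.0",
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "InstanceGroups": [{
            "Name": "Master node",
            "Market": "ON_DEMAND",
            "InstanceRole": "MASTER",
            "InstanceType": "m5.xlarge",
            "InstanceCount": 1,
        }],
        # Auto-terminate the cluster once all steps are complete
        "KeepJobFlowAliveWhenNoSteps": False,
        "TerminationProtected": False,
    },
    "Steps": SPARK_STEPS,
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

with DAG(
    dag_id="spark_pi_example",
    schedule_interval=None,
    start_date=datetime(2020, 12, 1),
    catchup=False,
) as dag:
    create_cluster = EmrCreateJobFlowOperator(
        task_id="create_emr_cluster",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id="aws_default",
        emr_conn_id="emr_default",
    )
    # Wait until the job flow (cluster plus embedded step) finishes and auto-terminates
    wait_for_completion = EmrJobFlowSensor(
        task_id="watch_step",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        aws_conn_id="aws_default",
    )
    create_cluster >> wait_for_completion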
Upload the DAG to the Airflow S3 bucket’s dags directory. Substitute your Airflow S3 bucket name in the AWS CLI command below, then run it from the project’s root.
aws s3 cp dags/spark_pi_example.py \
s3://<your_airflow_bucket_name>/dags/
The DAG, spark_pi_example, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job.

The DAG has no optional configuration to input as JSON. Select ‘Trigger’ to submit the job, as shown below.

The DAG should complete all three tasks successfully, as shown in the DAG’s ‘Graph View’ tab below.

Switching to the EMR Console, you should see the single-node EMR cluster being created.

On the ‘Steps’ tab, you should see that the ‘calculate_pi’ Spark job has been submitted and is waiting for the cluster to be ready to be run.

Triggering DAGs Programmatically
The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI. To automate the DAG Run, we could use the AWS CLI and invoke the Airflow CLI via an endpoint on the Apache Airflow Webserver. The Amazon MWAA documentation and Airflow’s CLI documentation explains how.
Below is an example of triggering the spark_pi_example DAG programmatically using Airflow’s trigger_dag CLI command. You will need to replace the WEB_SERVER_HOSTNAME variable with your own Airflow Web Server’s hostname. The ENVIROMENT_NAME variable assumes only one MWAA environment is returned by jq.
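A hedged sketch of the general pattern follows: obtain a short-lived CLI token with the MWAA create-cli-token command, then POST the Airflow CLI command to the web server’s /aws_mwaa/cli endpoint. The variable names and jq expressions here are illustrative and may differ from the project’s version.

# Assume the first (and only) MWAA environment returned is the one we want
ENVIRONMENT_NAME=$(aws mwaa list-environments | jq -r '.Environments[0]')

# Request a short-lived CLI token and the web server hostname
CLI_JSON=$(aws mwaa create-cli-token --name "${ENVIRONMENT_NAME}")
CLI_TOKEN=$(echo "${CLI_JSON}" | jq -r '.CliToken')
WEB_SERVER_HOSTNAME=$(echo "${CLI_JSON}" | jq -r '.WebServerHostname')

# POST the Airflow CLI command to the MWAA web server's CLI endpoint
curl --request POST "https://${WEB_SERVER_HOSTNAME}/aws_mwaa/cli" \
--header "Authorization: Bearer ${CLI_TOKEN}" \
--header "Content-Type: text/plain" \
--data-raw "trigger_dag spark_pi_example"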
Analytics Job with Airflow
Next, we will submit an actual analytics job to EMR. If you recall from the previous post, we had four different analytics PySpark applications, which performed analyses on the three Kaggle datasets. For the next DAG, we will run a Spark job that executes the bakery_sales_ssm.py PySpark application. The processed bakery data this application analyzes should already exist in the processed data S3 bucket.
The DAG, dags/bakery_sales.py, creates an EMR cluster identical to the EMR cluster created with the run_job_flow.py Python script in the previous post. All EMR configuration options available when using AWS Step Functions are available with Airflow’s airflow.contrib.operators and airflow.contrib.sensors packages for EMR.
Airflow leverages Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. The Bakery Sales DAG contains eleven Jinja template variables. Seven variables will be configured in the Airflow UI by importing a JSON file into the ‘Admin’ ⇨ ‘Variables’ tab. These template variables are prefixed with var.value in the DAG. The other three variables will be passed as a DAG Run configuration as a JSON blob, similar to the previous DAG example. These template variables are prefixed with dag_run.conf.
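As a brief, hypothetical illustration of the two prefixes, operator arguments in the DAG might reference them like this; the variable and key names below are examples only.

# Resolved from the Airflow UI's 'Admin' -> 'Variables' store
work_bucket = "{{ var.value.work_bucket }}"

# Resolved from the JSON blob supplied when the DAG Run is triggered
notification_email = "{{ dag_run.conf['airflow_email'] }}"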
Import Variables into Airflow UI
First, to import the required variables, change the values in the project’s airflow_variables/admin_variables_bakery.json file. You will need to update the values for bootstrap_bucket, emr_ec2_key_pair, logs_bucket, and work_bucket. The three S3 buckets should all exist from the previous post.
Next, import the variables file from the ‘Admin’ ⇨ ‘Variables’ tab of the Airflow UI.

Upload the DAG, dags/bakery_sales.py, to the Airflow S3 bucket, similar to the first DAG.
aws s3 cp dags/bakery_sales.py \
s3://<your_airflow_bucket_name>/dags/
The second DAG, bakery_sales, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job.

Input the three required parameters in the ‘Trigger DAG’ interface, used to pass the DAG Run configuration, and select ‘Trigger’. A sample of the JSON blob can be found in the project, airflow_variables/dag_run.conf_bakery.json.
{ "airflow_email": "analytics_team@example.com", "email_on_failure": false, "email_on_retry": false }
This is just for demonstration purposes. To send and receive emails, you will need to configure Airflow.

Switching to the EMR Console, you should see the ‘Bakery Sales’ Spark job in the ‘Steps’ tab.

Multi-Step DAG
In our last example, we will use a single DAG to run four Spark jobs in parallel. The Spark job arguments (the EmrAddStepsOperator steps parameter) will be loaded from an external JSON file residing in Amazon S3, instead of being defined in the DAG, as in the previous two DAG examples. Additionally, the EMR cluster specifications (the EmrCreateJobFlowOperator job_flow_overrides parameter) will also be loaded from an external JSON file. Using this method, we decouple the EMR provisioning and job details from the DAG. DataOps or DevOps Engineers might manage the EMR cluster specifications as code, while Data Analysts manage the Spark job arguments, separately. A third team might manage the DAG itself.
We still maintain the variables in the JSON files. The DAG will read the JSON file-based configuration into the tasks as JSON blobs, then replace the Jinja template variables (expressions) in the DAG with variable values defined in Airflow or input as parameters when the DAG is triggered.
Below we see a snippet of two of the four spark-submit job definitions (steps), which have been moved to a separate JSON file, emr_steps/emr_steps.json.
Below are the EMR cluster specifications (job_flow_overrides), which have been moved to a separate JSON file, job_flow_overrides/job_flow_overrides.json.
Decoupling the configurations reduces the DAG from well over 200 lines of code to less than 75 lines. Note lines 56 and 63 of the DAG below. Instead of referencing a local object variable, the parameters now reference the function, get_objects(key, bucket_name), which loads the JSON.
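A minimal sketch of what such a helper might look like, assuming boto3, is shown below; the real function in the project’s DAG may differ.

import json
import boto3

def get_objects(key, bucket_name):
    # Read a JSON document from S3 and return it as a Python object
    s3 = boto3.resource("s3")
    obj = s3.Object(bucket_name, key)
    return json.loads(obj.get()["Body"].read().decode("utf-8"))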
This time, we need to upload three files to S3, the DAG to the Airflow S3 bucket, and the two JSON files to the EMR Work S3 bucket. Change the bucket names to match your environment, then run the three AWS CLI commands shown below.
aws s3 cp emr_steps/emr_steps.json \
s3://emr-demo-work-123412341234-us-east-1/emr_steps/
aws s3 cp job_flow_overrides/job_flow_overrides.json \
s3://emr-demo-work-123412341234-us-east-1/job_flow_overrides/
aws s3 cp dags/multiple_steps.py \
s3://airflow-123412341234-us-east-1/dags/
The third DAG, multiple_steps, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark jobs. The three required input parameters in the ‘Trigger DAG’ interface are identical to the previous bakery_sales DAG. A sample of that JSON blob can be found in the project at airflow_variables/dag_run.conf_bakery.json.

Below we see that the EMR cluster has completed the four Spark jobs (EMR Steps) and has auto-terminated. Note that all four jobs were started at the exact same time. If you recall from the previous post, this is possible because we preset the ‘Concurrency’ level to 5.

Triggering DAGs Programmatically
Similar to the previous example, below we can trigger the multiple_steps DAG programmatically using Airflow’s trigger_dag CLI command. Note the addition of the --conf named argument, which passes the configuration, containing three key/value pairs, to the trigger command as a JSON blob.
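For reference, the underlying Airflow 1.10 CLI invocation resembles the following sketch; the JSON values are examples only, and when calling through the MWAA CLI endpoint shown earlier, the same command string (without the leading airflow) is sent as the request body.

airflow trigger_dag multiple_steps \
--conf '{"airflow_email": "analytics_team@example.com", "email_on_failure": false, "email_on_retry": false}'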
Cleaning Up
Once you are done with the MWAA Environment, be sure to delete it as soon as possible to save additional costs. Also, delete the MWAA-VPC CloudFormation stack. These resources, like the two NAT Gateways, will also continue to generate additional costs.
aws mwaa delete-environment --name <your_mwaa_environment_name>
aws cloudformation delete-stack --stack-name MWAA-VPC
Conclusion
In this second post in the series, we explored using the newly released Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to run PySpark applications on Amazon Elastic MapReduce (Amazon EMR). In future posts, we will explore the use of Jupyter and Zeppelin notebooks for data science, scientific computing, and machine learning on EMR.
If you are interested in learning more about configuring Amazon MWAA and Airflow, see my recent post, Amazon Managed Workflows for Apache Airflow — Configuration: Understanding Amazon MWAA’s Configuration Options.
This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.
Installing Apache Superset on Amazon EMR: Add data exploration and visualization to your analytics cluster
Posted by Gary A. Stafford in AWS, Bash Scripting, Big Data, Build Automation, Cloud, Python on December 24, 2020
Introduction
AWS provides nearly twenty-five different open-source data analytics applications that can be automatically installed and configured on Amazon Elastic MapReduce (Amazon EMR). However, none of those options is a general-purpose data exploration and visualization tool. With EMR, though, you can automate the installation of additional software as part of the cluster creation process or after the cluster is created. This brief post will explore how to install, configure, and access Apache Superset, a modern data exploration and visualization platform, on Amazon EMR’s Master Node as a post-cluster-creation step. You can use these same techniques to install other software packages on EMR as well, manually or as part of an automated data pipeline.
Amazon EMR
According to AWS, Amazon EMR is a cloud-based big data platform for processing vast amounts of data using open-source tools such as Apache Spark, Hive, HBase, Flink, Hudi, Zeppelin, Jupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

AWS currently offers 5.x and 6.x versions of Amazon EMR. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0. Each version of Amazon EMR offers incremental major and minor releases of nearly 25 different, popular open-source big-data software packages to choose from, which Amazon EMR will install and configure when the cluster is created.
Apache Superset
According to its website, Apache Superset is a modern data exploration and visualization platform. Superset is fast, lightweight, intuitive, and loaded with options that make it easy for users of all skill sets to explore and visualize their data, from simple line charts to highly detailed geospatial charts.
Superset natively supports over twenty-five data sources, including Amazon Athena and Redshift, Apache Drill, Druid, Hive, Impala, Kylin, Pinot, and Spark SQL, as well as Elasticsearch, Google BigQuery, Hana, MySQL, Oracle, Postgres, Presto, Snowflake, Microsoft SQL Server, and Teradata.
As shown in their Gallery, Superset includes dozens of visualization types, including Pivot Table, Line Chart, Markup, Pie Chart, Filter Box, Bubble Chart, Box Plot, Histogram, Heatmap, Sunburst, Calendar Heatmap, and several geospatial types.
Setup
Using this git clone command, download a copy of this post’s open-source GitHub repository to your local environment.
git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/emr-superset-demo.git
To demonstrate how to install Apache Superset on EMR, I have prepared an AWS CloudFormation template. Deploying the template, cloudformation/superset-emr-demo.yml, to AWS will result in the AWS CloudFormation stack, superset-emr-demo-dev. The stack creates a minimally-sized, two-node EMR cluster, two Amazon S3 buckets, and several AWS Systems Manager (SSM) Parameter Store parameters.
There is also a JSON-format CloudFormation parameters file, cloudformation/superset-emr-demo-params-dev.json. The parameters file contains values for eight of the ten required parameters in the CloudFormation template, all of which you can adjust. For the remaining two required parameters, you will need to supply the name of an existing EC2 key pair to access the EMR Master node. The key pair will need to be deployed to the same AWS Account into which you are deploying EMR. You will also need to supply a Subnet ID for the EMR cluster to be installed into. The subnet must have access to the Internet to install Superset’s required system and Python packages and to access Superset’s web-based user interface. If you need help creating a VPC and subnet to deploy EMR into, refer to my previous blog post, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce.
The CloudFormation stack is created using a Python script, create_cfn_stack.py. The Python script uses the AWS boto3 Python SDK.
To execute the Python script and create the CloudFormation stack, which will create the EMR cluster, run the following command. Remember to update the parameters to the name of your EC2 key pair and the Subnet ID for the EMR cluster.
python3 ./create_cfn_stack.py \
--ec2-key-name <your_key_pair_name> \
--ec2-subnet-id <your_subnet_id> \
--environment dev
Here is what the complete CloudFormation workflow looks like.
Security Group Ingress Rules
To install Superset on the EMR cluster’s Master node via SSH, you need to open port 22 on the Security Group associated with the EMR cluster’s Master Node, allowing access from your IP address. You can use the AWS Management Console or AWS CLI to open port 22. We will use jq and the AWS CLI’s ec2 API to get the Security Group ID associated with the EMR cluster’s Master Node and create the two ingress rules.
export EMR_MASTER_SG_ID=$(aws ec2 describe-security-groups | \
jq -r ".SecurityGroups[] | \
select(.GroupName==\"ElasticMapReduce-master\").GroupId" | \
head -n 1)
aws ec2 authorize-security-group-ingress \
--group-id ${EMR_MASTER_SG_ID} \
--protocol tcp \
--port 22 \
--cidr $(curl ipinfo.io/ip)/32
Superset Script
Once the CloudFormation stack is created and the ports are open, we can install Apache Superset on the EMR Master Node. The bootstrap script, bootstrap_emr/bootstrap_superset.sh, will be used to install Apache Superset onto the EMR cluster’s Master Node as the hadoop user. The script is roughly based on Superset’s Installing from Scratch instructions.
As part of installing Superset, the script will also deploy several common database drivers, including Amazon Athena, Amazon Redshift, Apache Spark SQL, Presto, PostgreSQL, and MySQL. The script will also create a Superset Admin role, and two Superset User roles — Alpha and Gamma.
To install Superset using the bootstrap script, we will use another Python script, install_superset.py. The script uses paramiko, a Python implementation of SSHv2. The script also uses scp, a module that uses a paramiko transport to send and receive files via the scp1 protocol.
The script requires a single input parameter, ec2-key-path, which is the full path to your EC2 key pair (e.g., ~/.ssh/my-key-pair.pem). Optionally, you can change the default Superset port of 8280, using the superset-port parameter.
python3 ./install_superset.py \
--ec2-key-path </path/to/my-key-pair.pem> \
--superset-port 8280
The script uses SSH and SCP to deploy and execute the bootstrap script, bootstrap_superset.sh. The output from the script includes the URL of Apache Superset running on the EMR cluster. The output also contains the username and password of the Superset Admin.
********************************************************************
Superset URL: http://ec2-111-22-333-44.compute-1.amazonaws.com:8280
Admin Username: SupersetAdmin
Admin Password: Admin1234
********************************************************************
SSH Tunnel
According to AWS, EMR applications publish user interfaces as websites hosted on the master node. For security reasons, these websites are only available on the master node’s local web server. To reach any of the web interfaces, you must establish an SSH tunnel with the master node using either dynamic or local port forwarding. If you are using dynamic port forwarding, you must also configure a proxy server to view the web interfaces.
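A hedged example of such a tunnel, assuming dynamic port forwarding (a SOCKS proxy) and the Master node’s public DNS name from the example output above, looks like the following; adjust the key path and hostname for your environment.

ssh -i ~/.ssh/my-key-pair.pem -N -D 8157 \
hadoop@ec2-111-22-333-44.compute-1.amazonaws.com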
Running the command in your terminal will start the SSH tunnel on port 8157. Once the tunnel is enabled, you can access Apache Superset in a web browser, using the URL shown in the script output above. Use the Admin credentials or either of the two User credentials to sign in to Superset.
Once signed in, you will have the ability to connect to your data sources and explore and visualize data. Below, we see an example of a SQL query executed against an Amazon RDS for PostgreSQL database, running in a separate VPC from EMR.
Conclusion
In this post, we learned how to install Apache Superset onto the Master Node of an Amazon EMR Cluster. If you want to install an application on all the nodes of an EMR cluster, you can add the commands to the bootstrap script, which runs when CloudFormation creates the cluster.
This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.
Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce
Posted by Gary A. Stafford in AWS, Build Automation, Cloud, Python, Software Development on December 2, 2020
Introduction
According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a Cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache Spark, Hive, HBase, Flink, Hudi, Zeppelin, Jupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.
With EMR, individuals and teams can also use notebooks, including EMR Notebooks, based on JupyterLab, the web-based interactive development environment for Jupyter notebooks for ad-hoc data analytics. Apache Zeppelin is also available to collaborate and interactively explore, process, and visualize data. With EMR notebooks and the EMR API, users can programmatically execute a notebook without the need to interact with the EMR console, referred to as headless execution.
AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. One major difference between EMR versions relevant to this post is EMR 6.x’s support for the latest Hadoop and Spark 3.x frameworks. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.
PySpark on EMR
In the following series of posts, we will focus on the options available to interact with Amazon EMR using the Python API for Apache Spark, known as PySpark. We will divide the methods for accessing PySpark on EMR into two categories: PySpark applications and notebooks. We will explore both interactive and automated patterns for running PySpark applications (Python scripts) and PySpark-based notebooks. In this first post, I will cover the first four PySpark Application Methods listed below. In part two, I will cover Amazon Managed Workflows for Apache Airflow (Amazon MWAA), and in part three, the use of notebooks.
PySpark Application Methods
- Add Job Flow Steps: Remote execution of EMR Steps on an existing EMR cluster using the add_job_flow_steps method;
- EMR Master Node: Remote execution over SSH of PySpark applications using spark-submit on an existing EMR cluster’s Master node;
- Run Job Flow: Remote execution of EMR Steps on a newly created long-lived or auto-terminating EMR cluster using the run_job_flow method;
- AWS Step Functions: Remote execution of EMR Steps using AWS Step Functions on an existing or newly created long-lived or auto-terminating EMR cluster;
- Apache Airflow: Remote execution of EMR Steps using the recently released Amazon MWAA on an existing or newly created long-lived or auto-terminating EMR cluster (see part two of this series);
Notebook Methods
- EMR Notebooks for Ad-hoc Analytics: Interactive, ad-hoc analytics and machine learning using Jupyter Notebooks on an existing EMR cluster;
- Headless Execution of EMR Notebooks: Headless execution of notebooks from an existing EMR cluster or newly created auto-terminating cluster;
- Apache Zeppelin for Ad-hoc Analytics: Interactive, ad-hoc analytics and machine learning using Zeppelin notebooks on an existing EMR cluster;
Note that wherever the AWS SDK for Python (boto3) is used in this post, we can substitute the AWS CLI or AWS Tools for PowerShell. Typically, these commands and Python scripts would be run as part of a DevOps or DataOps deployment workflow, using CI/CD platforms like AWS CodePipeline, Jenkins, Harness, CircleCI, Travis CI, or Spinnaker.
Preliminary Tasks
To prepare the AWS EMR environment for this post, we need to perform a few preliminary tasks.
- Download a copy of this post’s GitHub repository;
- Download three Kaggle datasets and organize locally;
- Create an Amazon EC2 key pair;
- Upload the EMR bootstrap script and create the CloudFormation Stack;
- Allow your IP address access to the EMR Master node on port 22;
- Upload CSV data files and PySpark applications to S3;
- Crawl the raw data and create a Data Catalog using AWS Glue;
Step 1: GitHub Repository
Using this git clone command, download a copy of this post’s GitHub repository to your local environment.
git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/emr-demo.git
Step 2: Kaggle Datasets
Kaggle is a well-known data science resource with 50,000 public datasets and 400,000 public notebooks. We will be using three Kaggle datasets in this post. You will need to join Kaggle to access these free datasets. Download the following three Kaggle datasets as CSV files. Since we are working with (moderately) big data, the total size of the datasets will be approximately 1 GB.
- Movie Ratings: https://www.kaggle.com/rounakbanik/the-movies-dataset
- Bakery: https://www.kaggle.com/sulmansarwar/transactions-from-a-bakery
- Stocks: https://www.kaggle.com/timoboz/stock-data-dow-jones
Organize the (38) downloaded CSV files into the raw_data directory of the locally cloned GitHub repository, exactly as shown below. We will upload these files to Amazon S3 in a subsequent step.
> tree raw_data --si -v -A
raw_data
├── [ 128] bakery
│ ├── [711k] BreadBasket_DMS.csv
├── [ 320] movie_ratings
│ ├── [190M] credits.csv
│ ├── [6.2M] keywords.csv
│ ├── [989k] links.csv
│ ├── [183k] links_small.csv
│ ├── [ 34M] movies_metadata.csv
│ ├── [710M] ratings.csv
│ └── [2.4M] ratings_small.csv
└── [1.1k] stocks
├── [151k] AAPL.csv
├── [146k] AXP.csv
├── [150k] BA.csv
├── [147k] CAT.csv
├── [146k] CSCO.csv
├── [149k] CVX.csv
├── [147k] DIS.csv
├── [ 42k] DWDP.csv
├── [150k] GS.csv
└── [...] abridged...
In this post, we will be using three different datasets. However, if you want to limit the potential costs associated with big data analytics on AWS, you can choose to limit job submissions to only one or two of the datasets. For example, the bakery and stocks datasets are fairly small yet effectively demonstrate most EMR features. In contrast, the movie rating dataset has nearly 27 million rows of ratings data, which starts to demonstrate the power of EMR and PySpark for big data.
Step 3: Amazon EC2 key pair
According to AWS, a key pair, consisting of a private key and a public key, is a set of security credentials that you use to prove your identity when connecting to an [EC2] instance. Amazon EC2 stores the public key, and you store the private key. To SSH into the EMR cluster, you will need an Amazon key pair. If you do not have an existing Amazon EC2 key pair, create one now. The easiest way to create a key pair is from the AWS Management Console.

Your private key is automatically downloaded when you create a key pair in the console. Store your private key somewhere safe. If you use an SSH client on a macOS or Linux computer to connect to EMR, use the following chmod command to set the correct permissions of your private key file so that only you can read it.
chmod 0400 /path/to/my-key-pair.pem
Step 4: Bootstrap Script and CloudFormation Stack
The bulk of the resources used as part of this demonstration are created using the CloudFormation stack, emr-demo-dev. The CloudFormation template that creates the stack, cloudformation/emr-demo.yml, is included in the repository. Please review all resources and understand the cost and security implications before continuing.
There is also a JSON-format CloudFormation parameters file, cloudformation/emr-demo-params-dev.json, containing values for all but two of the parameters in the CloudFormation template. The two parameters not in the parameter file are the name of the EC2 key pair you just created and the bootstrap bucket’s name. Both will be passed along with the CloudFormation template using the Python script, create_cfn_stack.py. For each type of environment, such as Development, Test, and Production, you could have a separate CloudFormation parameters file, with different configurations.
The template will create approximately (39) AWS resources, including a new AWS VPC, a public subnet, an internet gateway, route tables, a 3-node EMR v6.2.0 cluster, a series of Amazon S3 buckets, AWS Glue data catalog, AWS Glue crawlers, several Systems Manager Parameter Store parameters, and so forth.
The CloudFormation template includes the location of the EMR bootstrap script located on Amazon S3. Before creating the CloudFormation stack, the Python script creates an S3 bootstrap bucket and copies the bootstrap script, bootstrap_actions.sh, from the local project repository to the S3 bucket. The script will be used to install additional packages on EMR cluster nodes, which are required by our PySpark applications. The script also sets the default AWS Region for boto3.
From the GitHub repository’s local copy, run the following command, which will execute a Python script to create the bootstrap bucket, copy the bootstrap script, and provision the CloudFormation stack. You will need to pass the name of your EC2 key pair to the script as a command-line argument.
python3 ./scripts/create_cfn_stack.py \
--environment dev \
--ec2-key-name <my-key-pair-name>
The CloudFormation template should create a CloudFormation stack, emr-demo-dev, as shown below.

Step 5: SSH Access to EMR
For this demonstration, we will need access to the new EMR cluster’s Master EC2 node, using SSH and your key pair, on port 22. The easiest way to add a new inbound rule to the correct AWS Security Group is to use the AWS Management Console. First, find your EC2 Security Group named ElasticMapReduce-master.

Then, add a new Inbound rule for SSH (port 22) from your IP address, as shown below.

Alternately, you could use the AWS CLI or AWS SDK to create a new security group ingress rule.
export EMR_MASTER_SG_ID=$(aws ec2 describe-security-groups | \
jq -r '.SecurityGroups[] | select(.GroupName=="ElasticMapReduce-master").GroupId')
aws ec2 authorize-security-group-ingress \
--group-id ${EMR_MASTER_SG_ID} \
--protocol tcp \
--port 22 \
--cidr $(curl ipinfo.io/ip)/32
Step 6: Raw Data and PySpark Apps to S3
As part of the emr-demo-dev CloudFormation stack, we now have several new Amazon S3 buckets within our AWS Account. The naming conventions and intended usage of these buckets follow common organizational patterns for data lakes. The data buckets use the common naming convention of raw, processed, and analyzed data in reference to the data stored within them. We also use a widely used, corresponding naming convention of ‘bronze’, ‘silver’, and ‘gold’ when referring to these data buckets as parameters.
> aws s3api list-buckets | \
jq -r '.Buckets[] | select(.Name | startswith("emr-demo-")).Name'
emr-demo-raw-123456789012-us-east-1
emr-demo-processed-123456789012-us-east-1
emr-demo-analyzed-123456789012-us-east-1
emr-demo-work-123456789012-us-east-1
emr-demo-logs-123456789012-us-east-1
emr-demo-glue-db-123456789012-us-east-1
emr-demo-bootstrap-123456789012-us-east-1
There is a raw data bucket (aka bronze) that will contain the original CSV files. There is a processed data bucket (aka silver) that will contain data that might have had any number of actions applied: data cleansing, obfuscation, data transformation, file format changes, file compression, and data partitioning. Finally, there is an analyzed data bucket (aka gold) that has the results of the data analysis. We also have a work bucket that holds the PySpark applications, a logs bucket that holds EMR logs, and a glue-db bucket to hold the Glue Data Catalog metadata.
Whenever we submit PySpark jobs to EMR, the PySpark application files and data will always be accessed from Amazon S3. From the GitHub repository’s local copy, run the following command, which will execute a Python script to upload the approximately (38) Kaggle dataset CSV files to the raw S3 data bucket.
python3 ./scripts/upload_csv_files_to_s3.py
Next, run the following command, which will execute a Python script to upload a series of PySpark application files to the work S3 data bucket.
python3 ./scripts/upload_apps_to_s3.py
Step 7: Crawl Raw Data with Glue
The last preliminary step to prepare the EMR demonstration environment is to catalog the raw CSV data into an AWS Glue data catalog database, using one of the two Glue Crawlers we created. The three Kaggle datasets’ data will reside in Amazon S3, while their schema and metadata will reside within tables in the Glue data catalog database, emr_demo. When we eventually query the data from our PySpark applications, we will be querying the Glue data catalog’s database tables, which reference the underlying data in S3.
From the GitHub repository’s local copy, run the following command, which will execute a Python script to run the Glue Crawler and catalog the raw data’s schema and metadata information into the Glue data catalog database, emr_demo.
python3 ./scripts/crawl_raw_data.py --crawler-name emr-demo-raw
Once the crawler is finished, from the AWS Console, we should see a series of nine tables in the Glue data catalog database, emr_demo, all prefixed with raw_. The tables hold metadata and schema information for the three CSV-format Kaggle datasets loaded into S3.

Alternately, we can use the glue get-tables AWS CLI command to review the tables.
> aws glue get-tables --database emr_demo | \
jq -r '.TableList[] | select(.Name | startswith("raw_")).Name'
raw_bakery
raw_credits_csv
raw_keywords_csv
raw_links_csv
raw_links_small_csv
raw_movies_metadata_csv
raw_ratings_csv
raw_ratings_small_csv
raw_stocks
PySpark Applications
Let’s explore four methods to run PySpark applications on EMR.

1. Add Job Flow Steps to an Existing EMR Cluster
We will start by looking at running PySpark applications using EMR Steps. According to AWS, we can use Amazon EMR steps to submit work to the Spark framework installed on an EMR cluster. The EMR step for PySpark uses a spark-submit command. According to Spark’s documentation, the spark-submit script, located in Spark’s bin directory, is used to launch applications on a [EMR] cluster. A typical spark-submit command we will be using resembles the following example. This command runs a PySpark application in S3, bakery_sales_ssm.py.
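A hedged sketch of such a command follows; the deploy mode, extra configuration, and bucket name here are illustrative and may differ from the values used in the project.

spark-submit --deploy-mode cluster --master yarn \
--conf spark.yarn.submit.waitAppCompletion=true \
s3://emr-demo-work-123456789012-us-east-1/analyze/bakery_sales_ssm.py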
We will target the existing EMR cluster created by CloudFormation earlier to execute our PySpark applications using EMR Steps. We have two sets of PySpark applications. The first set of three PySpark applications will transform the raw CSV-format datasets into Apache Parquet, a more efficient file format for big data analytics. Alternately, for your workflows, you might prefer AWS Glue ETL Jobs, as opposed to PySpark on EMR, to perform nearly identical data processing tasks. The second set of four PySpark applications perform data analysis tasks on the data.
There are two versions of each PySpark application. Files with the suffix _ssm use the AWS Systems Manager (SSM) Parameter Store service to obtain dynamic parameter values at runtime on EMR. Corresponding non-SSM applications require those same parameter values to be passed on the command line when they are submitted to Spark. Therefore, those PySpark applications are not tightly coupled to boto3 or the SSM Parameter Store. We will use the _ssm versions of the scripts in this post’s demonstration.
> tree pyspark_apps --si -v -A
pyspark_apps
├── [ 320] analyze
│ ├── [1.4k] bakery_sales.py
│ ├── [1.5k] bakery_sales_ssm.py
│ ├── [2.6k] movie_choices.py
│ ├── [2.7k] movie_choices_ssm.py
│ ├── [2.0k] movies_avg_ratings.py
│ ├── [2.3k] movies_avg_ratings_ssm.py
│ ├── [2.2k] stock_volatility.py
│ └── [2.3k] stock_volatility_ssm.py
└── [ 256] process
├── [1.1k] bakery_csv_to_parquet.py
├── [1.3k] bakery_csv_to_parquet_ssm.py
├── [1.3k] movies_csv_to_parquet.py
├── [1.5k] movies_csv_to_parquet_ssm.py
├── [1.9k] stocks_csv_to_parquet.py
└── [2.0k] stocks_csv_to_parquet_ssm.py
We will start by executing the three PySpark processing applications. They will convert the CSV data to Parquet. Below, we see an example of one of the PySpark applications we will run, bakery_csv_to_parquet_ssm.py. The PySpark application will convert the Bakery Sales dataset’s CSV file to Parquet and write it to S3.
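A greatly simplified sketch of such an application is shown below; the SSM parameter names and S3 prefixes are assumptions for illustration, and the repository’s version is authoritative.

import boto3
from pyspark.sql import SparkSession

def main():
    # Resolve bucket names from the SSM Parameter Store at runtime (parameter names assumed)
    ssm = boto3.client("ssm")
    raw_bucket = ssm.get_parameter(Name="/emr_demo/bronze_bucket")["Parameter"]["Value"]
    processed_bucket = ssm.get_parameter(Name="/emr_demo/silver_bucket")["Parameter"]["Value"]

    spark = SparkSession.builder.appName("bakery-csv-to-parquet").getOrCreate()

    # Read the raw Bakery Sales CSV file and write it back out as Parquet
    df = (spark.read
          .option("header", "true")
          .option("inferSchema", "true")
          .csv(f"s3://{raw_bucket}/bakery/"))

    df.write.mode("overwrite").parquet(f"s3://{processed_bucket}/bakery/")

if __name__ == "__main__":
    main()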
The three PySpark data processing applications’ spark-submit commands are defined in a separate JSON-format file, job_flow_steps_process.json, a snippet of which is shown below. The same goes for the four analytics applications.
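A single step definition in that file likely resembles the following sketch; the exact spark-submit arguments and bucket name are illustrative, and the project file is authoritative.

[
  {
    "Name": "Bakery CSV to Parquet",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
      "Jar": "command-runner.jar",
      "Args": [
        "spark-submit",
        "--deploy-mode", "cluster",
        "--master", "yarn",
        "s3://emr-demo-work-123456789012-us-east-1/process/bakery_csv_to_parquet_ssm.py"
      ]
    }
  }
]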
Using this pattern of decoupling the Spark job command and arguments from the execution code, we can define and submit any number of Steps without changing the Python script, add_job_flow_steps_process.py, shown below. Note line 31, where the Steps are injected into the add_job_flow_steps method’s parameters.
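Stripped to its essentials, that call is a boto3 add_job_flow_steps invocation along these lines; the cluster ID is reduced to a placeholder here, and the real script resolves it dynamically.

import json
import boto3

emr_client = boto3.client("emr")

# Load the spark-submit step definitions from the JSON file
with open("job_flow_steps_process.json", "r") as f:
    steps = json.load(f)

# Submit the steps to the existing EMR cluster
response = emr_client.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",  # the existing cluster's ID (placeholder)
    Steps=steps,
)
print(response["StepIds"])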
The Python script used for this task takes advantage of AWS Systems Manager Parameter Store parameters. The parameters were placed in the Parameter Store, within the /emr_demo path, by CloudFormation. We will reference these parameters in several scripts throughout the post.
> aws ssm get-parameters-by-path --path '/emr_demo' | \
jq -r ".Parameters[] | {Name: .Name, Value: .Value}"
From the GitHub repository’s local copy, run the following command, which will execute a Python script to load the three spark-submit commands from the JSON-format file, job_flow_steps_process.json, and run the PySpark processing applications on the existing EMR cluster.
python3 ./scripts/add_job_flow_steps.py --job-type process
While the three Steps are running concurrently, the view from the Amazon EMR Console’s Cluster Steps tab should look similar to the example below.

Once the three Steps have been completed, we should note three sub-directories in the processed data bucket containing Parquet-format files.

Of special note is the Stocks dataset, which has been converted to Parquet and partitioned by stock symbol. According to AWS, by partitioning your data, we can restrict the amount of data scanned by each query by specifying filters based on the partition, thus improving performance and reducing cost.

Lastly, the movie ratings dataset has been divided into sub-directories, based on the schema of each table. Each sub-directory contains Parquet files specific to that unique schema.

Crawl Processed Data with Glue
Similar to the raw data earlier, we now catalog the newly processed Parquet data into the same AWS Glue data catalog database, using one of the two Glue Crawlers we created. The processed data will reside in the Amazon S3 processed data bucket, while its schemas and metadata will reside within tables in the Glue data catalog database, emr_demo.
From the GitHub repository’s local copy, run the following command, which will execute a Python script to run the Glue Crawler and catalog the processed data’s schema and metadata information into the Glue data catalog database, emr_demo.
python3 ./scripts/crawl_raw_data.py --crawler-name emr-demo-processed
Once the crawler has finished successfully, using the AWS Console, we should see a series of nine tables in the Glue data catalog database, emr_demo, all prefixed with processed_. The tables represent the three Kaggle datasets’ contents converted to Parquet and correspond to the equivalent tables with the raw_ prefix.

Alternately, we can use the glue get-tables AWS CLI command to review the tables.
> aws glue get-tables --database emr_demo | \
jq -r '.TableList[] | select(.Name | startswith("processed_")).Name'
processed_bakery
processed_credits
processed_keywords
processed_links
processed_links_small
processed_movies_metadata
processed_ratings
processed_ratings_small
processed_stocks
2. Run PySpark Jobs from EMR Master Node
Next, we will explore how to execute PySpark applications remotely on the Master node of the EMR cluster using boto3 and SSH. Although this method may be optimal for certain use cases, as opposed to using the EMR SDK, remote SSH execution does not scale as well in my opinion due to a lack of automation, and it exposes some potential security risks.
There are four PySpark analytics applications in the GitHub repository. For this part of the demonstration, we will just submit the bakery_sales_ssm.py application. This application will perform a simple analysis of the bakery sales data. While the other three PySpark applications use AWS Glue, the bakery_sales_ssm.py application reads data directly from the processed data S3 bucket.
The application writes its results into the analyzed data S3 bucket, in both Parquet and CSV formats. The CSV file is handy for business analysts and other non-technical stakeholders who might wish to import the results of the analysis into Excel or business applications.
Earlier, we created an inbound rule to allow your IP address to access the Master node on port 22. From the EMR Console’s Cluster Summary tab, note the command necessary to SSH into the Master node of the EMR cluster.

The Python script, submit_spark_ssh.py, shown below, will submit the PySpark job to the EMR Master Node, using paramiko, a Python implementation of SSHv2. The script replicates the same functionality as the shell-based SSH command above, executing a remote command on the EMR Master Node. The spark-submit command is on lines 36–38, below.
From the GitHub repository’s local copy, run the following command, which will execute a Python script to submit the job. The script requires one input parameter, which is the path to your EC2 key pair (e.g., ~/.ssh/my-key-pair.pem).
python3 ./scripts/submit_spark_ssh.py \
--ec2-key-path </path/to/my-key-pair.pem>
The spark-submit command will be executed remotely on the EMR cluster’s Master node over SSH. All variables in the commands will be replaced by the environment variables, set in advance, which use the AWS CLI’s emr and ssm commands.

Monitoring Spark Jobs
We set spark.yarn.submit.waitAppCompletion to true. According to Spark’s documentation, this property controls whether the client waits to exit in YARN cluster mode until the application is completed. If set to true, the client process will stay alive, reporting the application’s status. Otherwise, the client process will exit after submission. We can watch the job’s progress from the terminal.

We can also use the YARN Timeline Server and the Spark History Server in addition to the terminal. Links to both are shown on both the EMR Console’s Cluster ‘Summary’ and ‘Application user interfaces’ tabs. Unlike other EMR application web interfaces, using port forwarding, also known as creating an SSH tunnel, is not required for the YARN Timeline Server or the Spark History Server.

YARN Timeline Server
Below, we see that the job we submitted running on the YARN Timeline Server also includes useful tools like access to configuration, local logs, server stacks, and server metrics.

YARN Timeline Server allows us to drill down into individual jobs and view logs. Logs are ideal for troubleshooting failed jobs, especially the stdout logs.

Spark History Server
You can also view the PySpark application we submitted from the Master node using the Spark History Server. Below, we see completed Spark applications (aka Spark jobs) in the Spark History Server.

Below, we see more details about our Spark job using the Spark History Server.

We can even see visual representations of each Spark job’s Directed Acyclic Graph (DAG).

3. Run Job Flow on an Auto-Terminating EMR Cluster
The next option to run PySpark applications on EMR is to create a short-lived, auto-terminating EMR cluster using the run_job_flow
method. We will create a new EMR cluster, run a series of Steps (PySpark applications), and then auto-terminate the cluster. This is a cost-effective method of running PySpark applications on-demand.
We will create a second 3-node EMR v6.2.0 cluster to demonstrate this method, using Amazon EC2 Spot instances for all of the EMR cluster’s Master and Core nodes. Unlike the first, long-lived, more general-purpose EMR cluster, we will deploy only the Spark application to this cluster, as that is the only application needed to run the Steps.
Using the run_job_flow
method, we will execute the four PySpark data analysis applications. The PySpark application’s spark-submit
commands are defined in a separate JSON-format file, job_flow_steps_analyze.json
. Similar to the previous add_job_flow_steps.py
script, decoupling the Spark job commands and arguments from the execution code allows us to define and submit any number of Steps without changing the Python execution script. Like the previous script, this script also retrieves its parameter values from the SSM Parameter Store.
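As a rough sketch of what run_job_flow.py might look like, the boto3 call below creates a Spot-based, Spark-only cluster, loads the Steps from the JSON file, and lets EMR terminate the cluster when the final Step completes. The SSM parameter names, instance types, and cluster name are assumptions for illustration, not the project’s exact values.

import json
import boto3

emr_client = boto3.client("emr")
ssm_client = boto3.client("ssm")

def get_parameter(name):
    # retrieve a single value from the SSM Parameter Store (parameter names are assumed)
    return ssm_client.get_parameter(Name=name)["Parameter"]["Value"]

def run_job_flow(steps_file="job_flow_steps/job_flow_steps_analyze.json"):
    # the EMR Step definitions (spark-submit commands) live in a separate JSON file
    with open(steps_file, "r") as f:
        steps = json.load(f)

    response = emr_client.run_job_flow(
        Name="demo-cluster-analyze",
        ReleaseLabel="emr-6.2.0",
        Applications=[{"Name": "Spark"}],           # Spark is the only application needed
        LogUri=get_parameter("/emr_demo/logs_bucket"),
        JobFlowRole=get_parameter("/emr_demo/emr_ec2_role"),
        ServiceRole=get_parameter("/emr_demo/emr_role"),
        Instances={
            "InstanceGroups": [
                {"Name": "Master", "Market": "SPOT", "InstanceRole": "MASTER",
                 "InstanceType": "m5.xlarge", "InstanceCount": 1},
                {"Name": "Core", "Market": "SPOT", "InstanceRole": "CORE",
                 "InstanceType": "m5.xlarge", "InstanceCount": 2},
            ],
            # auto-terminate the cluster once the final Step completes
            "KeepJobFlowAliveWhenNoSteps": False,
        },
        Steps=steps,
    )
    return response["JobFlowId"]

if __name__ == "__main__":
    print(run_job_flow())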
From the GitHub repository’s local copy, run the following command, which will execute a Python script to create a new cluster, run the four PySpark applications as EMR Steps, and then auto-terminate the cluster.
python3 ./scripts/run_job_flow.py --job-type analyze
As shown below, we see the short-lived EMR cluster in the process of terminating after successfully running the PySpark applications as EMR Steps.


4. Using AWS Step Functions
According to AWS, AWS Step Functions is a serverless function orchestrator that makes it easy to sequence AWS Lambda functions and multiple AWS services. Step Functions manages sequencing, error handling, retry logic, and state, removing a significant operational burden from your team. Step Functions is based on state machines and tasks. A state machine is a workflow. A task is a state in a workflow that represents a single unit of work that another AWS service performs. Each step in a workflow is a state. Using AWS Step Functions, we define our workflows as state machines, which transform complex code into easy to understand statements and diagrams.

You can use AWS Step Functions to run PySpark applications as EMR Steps on an existing EMR cluster. Using Step Functions, we can also create the cluster, run multiple EMR Steps sequentially or in parallel, and finally, auto-terminate the cluster.
We will create two state machines for this demo, one for the PySpark data processing applications and one for the PySpark data analysis applications. To create state machines, we first need to create JSON-based state machine definition files. The files are written in Amazon States Language. According to AWS, Amazon States Language is a JSON-based, structured language used to define a state machine, a collection of states that can do work (Task states), determine which states to transition to next (Choice states), stop execution with an error (Fail states), and so on.
The definition files contain specific references to AWS resources deployed to your AWS account originally created by CloudFormation. Below is a snippet of the state machine definition file, step_function_emr_analyze.json
, showing part of the configuration of the EMR cluster. Note the parameterized key/value pairs (e.g., “Ec2KeyName.$”: “$.InstancesEc2KeyName” on line 5). The values will come from a JSON-formatted inputs file and are dynamically replaced upon the state machine’s execution.
Python Templating
To automate the process of adding dynamic resource references to the state machine’s inputs files, we will use Jinja, the modern and designer-friendly templating language for Python, modeled after Django’s templates. We will render the Jinja template to a JSON-based state machine inputs file, replacing the template’s resource tags (keys) with values from the SSM Parameter Store’s parameters. Below is a snippet from the inputs file Jinja template, step_function_inputs_analyze.j2
.
First, install Jinja2, then create two JSON-based state machine inputs files from the Jinja templates using the included Python file.
# install Jinja2
python3 -m pip install Jinja2

python3 ./scripts/create_inputs_files.py
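A minimal sketch of what the included create_inputs_files.py script might do is shown below: read the demo’s parameters from the SSM Parameter Store, render the Jinja template, and write the JSON inputs file. The SSM path, template directory, and variable-naming convention are assumptions for illustration.

import json
import boto3
from jinja2 import Environment, FileSystemLoader

ssm_client = boto3.client("ssm")

def get_ssm_parameters(path="/emr_demo/"):
    # collect every parameter under the demo's SSM path (path is an assumption)
    params = {}
    paginator = ssm_client.get_paginator("get_parameters_by_path")
    for page in paginator.paginate(Path=path, Recursive=True):
        for param in page["Parameters"]:
            # use the last segment of the parameter name as the Jinja variable name
            params[param["Name"].split("/")[-1]] = param["Value"]
    return params

def render_inputs_file(template_name="step_function_inputs_analyze.j2",
                       output_name="step_function_inputs_analyze.json"):
    env = Environment(loader=FileSystemLoader("./job_flow_inputs"))
    rendered = env.get_template(template_name).render(get_ssm_parameters())
    with open(output_name, "w") as f:
        # round-trip through json to confirm the rendered output is valid JSON
        json.dump(json.loads(rendered), f, indent=2)

if __name__ == "__main__":
    render_inputs_file()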
Below we see the same snippet of the final inputs file. Jinja tags have been replaced with values from the SSM Parameter Store.
Using the definition files, create two state machines using the included Python files.
python3 ./scripts/create_state_machine.py \
    --definition-file step_function_emr_process.json \
    --state-machine EMR-Demo-Process

python3 ./scripts/create_state_machine.py \
    --definition-file step_function_emr_analyze.json \
    --state-machine EMR-Demo-Analysis
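Under the hood, create_state_machine.py presumably wraps the Step Functions CreateStateMachine API. Below is a minimal boto3 sketch; the role ARN argument is an assumption, since Step Functions requires an IAM role it can assume to call Amazon EMR on your behalf.

import argparse
import boto3

sfn_client = boto3.client("stepfunctions")

def create_state_machine(definition_file, state_machine_name, role_arn):
    with open(definition_file, "r") as f:
        definition = f.read()  # Amazon States Language (JSON) document

    response = sfn_client.create_state_machine(
        name=state_machine_name,
        definition=definition,
        roleArn=role_arn,
    )
    return response["stateMachineArn"]

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--definition-file", required=True)
    parser.add_argument("--state-machine", required=True)
    parser.add_argument("--role-arn", required=True)  # assumed argument
    args = parser.parse_args()
    print(create_state_machine(args.definition_file, args.state_machine, args.role_arn))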
Both state machines should appear in the AWS Step Functions Console’s State Machines tab. Below, we see the ‘EMR-Demo-Analysis’ state machine’s definition, both as JSON and rendered visually as a workflow layout.

To execute either of the state machines, use the included Python file, passing in the exact name of the state machine to execute, either ‘EMR-Demo-Process’ or ‘EMR-Demo-Analysis’, and the name of the inputs file. I suggest running the EMR-Demo-Analysis version so as not to re-process all the raw data.
python3 ./scripts/execute_state_machine.py \
    --state-machine EMR-Demo-Process \
    --inputs-file step_function_inputs_process.json

python3 ./scripts/execute_state_machine.py \
    --state-machine EMR-Demo-Analysis \
    --inputs-file step_function_inputs_analyze.json
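Likewise, execute_state_machine.py presumably calls the StartExecution API with the rendered inputs file as the execution input. A minimal sketch, assuming the script looks up the state machine by name, follows.

import boto3

sfn_client = boto3.client("stepfunctions")

def get_state_machine_arn(name):
    # resolve the state machine's ARN from its name (pagination omitted for brevity)
    machines = sfn_client.list_state_machines()["stateMachines"]
    return next(m["stateMachineArn"] for m in machines if m["name"] == name)

def execute_state_machine(state_machine_name, inputs_file):
    with open(inputs_file, "r") as f:
        inputs = f.read()  # JSON inputs rendered earlier from the Jinja template

    response = sfn_client.start_execution(
        stateMachineArn=get_state_machine_arn(state_machine_name),
        input=inputs,
    )
    return response["executionArn"]

if __name__ == "__main__":
    print(execute_state_machine("EMR-Demo-Analysis", "step_function_inputs_analyze.json"))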
When the PySpark analysis application’s Step Function state machine is executed, a new EMR cluster is created, the PySpark applications are run, and finally, the cluster is auto-terminated. Below, we see a successfully executed state machine, which successfully ran the four PySpark analysis applications in parallel, on a new auto-terminating EMR cluster.

Conclusion
This post explored four methods for running PySpark applications on Amazon Elastic MapReduce (Amazon EMR). The key to scaling data analytics with PySpark on EMR is the use of automation. Therefore, we looked at ways to automate the deployment of EMR resources, create and submit PySpark jobs, and terminate EMR resources when the jobs are complete. Furthermore, we were able to decouple references to dynamic AWS resources within our PySpark applications using parameterization. This allows us to deploy and run PySpark resources across multiple AWS Accounts and AWS Regions without code changes.
In part two of the series, we will explore the use of the recently announced service, Amazon Managed Workflows for Apache Airflow (MWAA), and in part three, the use of Jupyter and Zeppelin notebooks for data science, scientific computing, and machine learning on EMR.
This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.
GTM Stack: Exploring IoT Data Analytics at the Edge with Grafana, Mosquitto, and TimescaleDB on ARM-based Architectures
Posted by Gary A. Stafford in Big Data, IoT, Python, Raspberry Pi, Software Development on October 12, 2020
In the following post, we will explore the integration of several open-source software applications to build an IoT edge analytics stack, designed to operate on ARM-based edge nodes. We will use the stack to collect, analyze, and visualize IoT data without first shipping the data to the Cloud or other external systems.

The Edge
Edge computing is a fast-growing technology trend, which involves pushing compute capabilities to the edge. Wikipedia describes edge computing as a distributed computing paradigm that brings computation and data storage closer to the location where it is needed to improve response times and save bandwidth. The term edge commonly refers to a compute node at the edge of a network (edge device), sitting in close proximity to the source of the data, between that data source and external systems such as the Cloud.
In his recent post, 3 Advantages (And 1 Disadvantage) Of Edge Computing, well-known futurist Bernard Marr argues that reduced bandwidth requirements, reduced latency, and enhanced security and privacy are three primary advantages of edge computing. Marr cautions that one potential disadvantage of edge computing is that important data could end up being overlooked and discarded in the quest to save bandwidth and reduce latency, due to techniques like data downsampling.
David Ricketts, Head of Marketing at Quiss Technology PLC, estimates in his post, Cloud and Edge Computing — The Stats You Need to Know for 2018, the global edge computing market is expected to reach USD 6.72 billion by 2022 at a compound annual growth rate of a whopping 35.4 percent. Realizing the market potential, many major Cloud providers, edge device manufacturers, and integrators are rapidly expanding their edge compute capabilities. AWS, for example, currently offers more than a dozen services in the edge computing category.
Internet of Things
Edge computing is frequently associated with the Internet of Things (IoT). IoT devices, industrial equipment, and sensors generate data, which is transmitted to other internal and external systems, often by way of edge nodes, such as an IoT Gateway. IoT devices typically generate time-series data. According to Wikipedia, a time series is a set of data points indexed in time order — a sequence taken at successive equally spaced points in time. IoT devices typically generate continuous high-volume streams of time-series data, often on the scale of millions of data points per second. IoT data characteristics require IoT platforms to minimally support temporal accuracy, high-volume ingestion and processing, efficient data compression and downsampling, and real-time querying capabilities.
The IoT devices and the edge devices, such as IoT Gateways, which aggregate and transmit IoT data from these devices to external systems, are generally lower-powered, with limited processor, memory, and storage capabilities. Accordingly, IoT platforms must satisfy all the requirements of IoT data while simultaneously supporting resource-constrained environments.
IoT Analytics at the Edge
Leading Cloud providers AWS, Azure, Google Cloud, IBM Cloud, Oracle Cloud, and Alibaba Cloud all offer IoT services. Many offer IoT services with edge computing capabilities. AWS offers AWS IoT Greengrass. Greengrass provides local compute, messaging, data management, sync, and ML inference capabilities to edge devices. Azure offers Azure IoT Edge. Azure IoT Edge provides the ability to run AI, Azure and third-party services, and custom business logic on edge devices using standard containers. Google Cloud offers Edge TPU. Edge TPU (Tensor Processing Unit) is Google’s purpose-built application-specific integrated circuit (ASIC), designed to run AI at the edge.
IoT Analytics
Many Cloud providers also offer IoT analytics as part of their suite of IoT services, although not at the edge. AWS offers AWS IoT Analytics, while Azure has Azure Time Series Insights. Google provides IoT analytics, indirectly, through downstream analytic systems and ad hoc analysis using Google BigQuery or advanced analytics and machine learning with Cloud Machine Learning Engine. These services generally all require data to be transmitted to the Cloud for analytics.

The ability to analyze IoT data at the edge, as data is streamed in real-time, is critical to a rapid feedback loop. IoT edge analytics can accelerate anomaly detection, improve predictive maintenance capabilities, and expedite proactive inventory replenishment.
The IoT Edge Analytics Stack
In my opinion, the ideal IoT edge analytics stack is comprised of lightweight, purpose-built, easily deployable and manageable, platform- and programming language-agnostic, open-source software components. The minimal IoT edge analytics stack should include a lightweight message broker, a time-series database, an ANSI-standard ad-hoc query engine, and a data visualization tool. Each component should be purpose-built for IoT.
Lightweight Message Broker
We will use Eclipse Mosquitto as our message broker. According to the project’s description, Mosquitto is an open-source message broker that implements the Message Queuing Telemetry Transport (MQTT) protocol versions 5.0, 3.1.1, and 3.1. Mosquitto is lightweight and suitable for use on all devices from low power single board computers (SBCs) to full servers.
MQTT Client Library
We will interact with Mosquitto using Eclipse Paho. According to the project, the Eclipse Paho project provides open-source, mainly client-side implementations of MQTT and MQTT-SN in a variety of programming languages. MQTT and MQTT for Sensor Networks (MQTT-SN) are lightweight publish/subscribe messaging transports for TCP/IP and connectionless protocols, such as UDP, respectively.
We will be using Paho’s Python Client. The Paho Python Client provides a client class with support for both MQTT v3.1 and v3.1.1 on Python 2.7 or 3.x. The client also provides helper functions to make publishing messages to an MQTT server straightforward.
Time-Series Database
Time-series databases are optimal for storing IoT data. According to InfluxData, makers of a leading time-series database, InfluxDB, a time-series database (TSDB) is a database optimized for time-stamped or time-series data. Time-series data are simply measurements or events that are tracked, monitored, downsampled, and aggregated over time. Jiao Xian, of Alibaba Cloud, has authored an insightful post on the time-series database ecosystem, What Are Time Series Databases? A few leading Cloud providers offer purpose-built time-series databases, though they are not available at the edge. AWS offers Amazon Timestream, and Alibaba Cloud offers Time Series Database.
InfluxDB is an excellent choice for a time-series database. It was my first choice, along with TimescaleDB, when developing this stack. However, InfluxDB Flux’s apparent incompatibilities with some ARM-based architectures ruled it out for inclusion in the stack for this particular post.
We will use TimescaleDB as our time-series database. TimescaleDB is the leading open-source relational database for time-series data. Described as ‘PostgreSQL for time-series,’ TimescaleDB is based on PostgreSQL, which provides full ANSI SQL, rock-solid reliability, and a massive ecosystem. TimescaleDB claims to achieve 10–100x faster queries than PostgreSQL, InfluxDB, and MongoDB, with native optimizations for time-series analytics.
TimescaleDB is designed for performing analytical queries, both through its native support for PostgreSQL’s full range of SQL functionality, as well as additional functions native to TimescaleDB. These time-series optimized functions include Median/Percentile, Cumulative Sum, Moving Average, Increase, Rate, Delta, Time Bucket, Histogram, and Gap Filling.
Ad-hoc Data Query Engine
We have the option of using psql
, the terminal-based front-end to PostgreSQL, to execute ad-hoc queries against TimescaleDB. The psql
front-end enables you to enter queries interactively, issue them to PostgreSQL, and see the query results.

We also have the option of using pgAdmin, specifically the biarms/pgadmin4 Docker version, to execute ad-hoc queries and perform most other database tasks. pgAdmin is the most popular open-source administration and development platform for PostgreSQL. While several popular Docker versions of pgAdmin only support Linux AMD64 architectures, the biarms/pgadmin4 Docker version supports ARM-based devices.


Data Visualization
For data visualization, we will use Grafana. Grafana allows you to query, visualize, alert on, and understand metrics no matter where they are stored. With Grafana, you can create, explore, and share dashboards, fostering a data-driven culture. Grafana allows you to define thresholds visually and get notified via Slack, PagerDuty, and more. Grafana supports dozens of data sources, including MySQL, PostgreSQL, Elasticsearch, InfluxDB, TimescaleDB, Graphite, Prometheus, Google BigQuery, GraphQL, and Oracle. Grafana is extensible through a large collection of plugins.

Edge Deployment and Management Platform
Docker introduced the current industry standard for containers in 2013. Docker containers are a standardized unit of software that allows developers to isolate apps from their environment. We will use Docker to deploy the IoT edge analytics stack, referred to herein as the GTM Stack, composed of containerized versions of Eclipse Mosquitto, TimescaleDB, and Grafana, along with pgAdmin, to an ARM-based edge node. The acronym, GTM, comes from the three primary OSS projects composing the stack. The acronym also suggests Greenwich Mean Time, relating to the precise time-series nature of IoT data.

Running Docker Engine in swarm mode, we can use Docker to deploy the complete IoT edge analytics stack to the swarm, running on the edge node. The deploy command accepts a stack description in the form of a Docker Compose file, a YAML file used to configure the application’s services. With a single command, we can create and start all the services from the configuration file.
Source Code
All source code for this post is available on GitHub. Use the following command to git clone
a local copy of the project.
IoT Devices
For this post, I have deployed three Linux ARM-based IoT devices, each connected to a sensor array. Each sensor array contains multiple analog and digital sensors. The sensors record temperature, humidity, air quality (liquefied petroleum gas (LPG), carbon monoxide (CO), and smoke), light, and motion. For more information on the IoT device and sensor hardware involved, please see my previous post: Getting Started with IoT Analytics on AWS.
Each ARM-based IoT device is running a small Python3-based script, sensor_data_to_mosquitto.py, shown below.
The IoT devices’ script implements the Eclipse Paho MQTT Python client library. An MQTT message containing simultaneous readings from each sensor is sent to a Mosquitto topic on the edge node, at a configurable frequency.
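The full script is in the GitHub repository; a stripped-down sketch of the publishing pattern is shown below. The broker address, topic name, and reduced set of sensor fields are illustrative assumptions, and the random-number generator stands in for the real sensor readings.

import json
import random
import time

import paho.mqtt.client as mqtt

MQTT_BROKER = "192.168.1.12"   # edge node running Mosquitto (placeholder address)
MQTT_TOPIC = "sensor/data"     # placeholder topic name
PUBLISH_FREQUENCY = 5          # seconds between readings

def read_sensors():
    # stand-in for readings from the real sensor array
    return {
        "device_id": "iot-device-01",
        "time": time.time(),
        "temperature": round(random.uniform(18.0, 28.0), 2),
        "humidity": round(random.uniform(30.0, 60.0), 2),
    }

def main():
    client = mqtt.Client()
    client.connect(MQTT_BROKER, port=1883)
    client.loop_start()
    while True:
        client.publish(MQTT_TOPIC, json.dumps(read_sensors()))
        time.sleep(PUBLISH_FREQUENCY)

if __name__ == "__main__":
    main()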
IoT Edge Node
For this post, I have deployed a single Linux ARM-based edge node. The three IoT devices, containing sensor arrays, communicate with the edge node over Wi-Fi. The IoT devices could easily use an alternative communication protocol, such as BLE, LoRaWAN, or Ethernet. For more information on BLE and LoRaWAN, please see some of my previous posts: LoRa and LoRaWAN for IoT: Getting Started with LoRa and LoRaWAN Protocols for Low Power, Wide Area Networking of IoT and BLE and GATT for IoT: Getting Started with Bluetooth Low Energy (BLE) and Generic Attribute Profile (GATT) Specification for IoT.
The edge node is also running a small Python3-based script, mosquitto_to_timescaledb.py, shown below.
Similar to the IoT devices, the edge node’s script implements the Eclipse Paho MQTT Python client library. The script pulls MQTT messages off a Mosquitto topic(s), deserializes the JSON message payload, and writes the payload’s data to the TimescaleDB database. The edge node’s script accepts several arguments, which allow you to configure the necessary Mosquitto and TimescaleDB variables.
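A stripped-down sketch of that subscribe-and-insert pattern, using Paho and psycopg2, is shown below. The connection details, topic, table name, and column list are illustrative assumptions; the real script accepts these as command-line arguments and handles the full set of sensor fields.

import json

import paho.mqtt.client as mqtt
import psycopg2

MQTT_BROKER = "localhost"      # Mosquitto runs on the same edge node (placeholder)
MQTT_TOPIC = "sensor/data"     # placeholder topic name
TIMESCALEDB_DSN = "host=localhost port=5432 dbname=demo_iot user=postgres password=changeme"

db_conn = psycopg2.connect(TIMESCALEDB_DSN)
db_conn.autocommit = True

def on_connect(client, userdata, flags, rc):
    client.subscribe(MQTT_TOPIC)

def on_message(client, userdata, msg):
    # deserialize the JSON payload and insert it into the sensor_data hypertable
    data = json.loads(msg.payload)
    with db_conn.cursor() as cur:
        cur.execute(
            "INSERT INTO sensor_data (time, device_id, temperature, humidity) "
            "VALUES (to_timestamp(%s), %s, %s, %s);",
            (data["time"], data["device_id"], data["temperature"], data["humidity"]),
        )

def main():
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(MQTT_BROKER, port=1883)
    client.loop_forever()

if __name__ == "__main__":
    main()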
Why not use Telegraf?
Telegraf is a plugin-driven agent that collects, processes, aggregates, and writes metrics. There is a Telegraf output plugin, the PostgreSQL and TimescaleDB Output Plugin for Telegraf, produced by TimescaleDB. The plugin could replace the need to manage and maintain the above script. However, I chose not to use it because it is not yet an official Telegraf plugin. If the plugin was included in a Telegraf release, I would certainly encourage its use.
Script Management
Both the Linux-based IoT devices and the edge node run the systemd
system and service manager. To ensure the Python scripts keep running in the case of a system restart, we define a systemd
unit. Units are the objects that systemd
knows how to manage. These are basically a standardized representation of system resources that can be managed by the suite of daemons and manipulated by the provided utilities. Each script has a systemd
unit file. Below, we see the gtm_stack_mosquitto
unit file, gtm_stack_mosquitto.service.
The gtm_stack_mosq_to_tmscl
unit file, gtm_stack_mosq_to_tmscl.service, is nearly identical.
To install the gtm_stack_mosquitto.service
systemd
unit file on each IoT device, use the following commands.
Installing the gtm_stack_mosq_to_tmscl.service
unit file on the edge node is nearly identical.
Docker Stack
The edge node runs the GTM Docker stack, stack.yml, in a swarm. As discussed earlier, the stack contains four containers: Eclipse Mosquitto, TimescaleDB, and Grafana, along with pgAdmin. The Mosquitto, TimescaleDB, and Grafana containers have paths within the containers, bind-mounted to directories on the edge device. With bind-mounting, the container’s data will persist if the containers are removed and re-created. The containers are running on their own isolated overlay network.
The GTM Docker stack is installed using the following commands on the edge node. We will assume Docker and git are pre-installed on the edge node for this post.
First, we create the proper local directories on the edge device, which will be used to bind-mount to the container’s directories. Below, we see the bind-mounted local directories with the eventual container’s contents stored within them.

Next, we copy the custom Mosquitto configuration file, mosquitto.conf, included in the project, to the correct location on the edge device. Lastly, we initialize the Docker swarm and deploy the stack.

docker container ls
command, showing the running GTM Stack containers
docker stats
command, showing the resource consumption of GTM Stack containers
TimescaleDB Setup
With the GTM stack running, we need to create a single Timescale hypertable, sensor_data
, in the TimescaleDB demo_iot
database, to hold the incoming IoT sensor data. Hypertables, according to TimescaleDB, are designed to be easy to manage and to behave like standard PostgreSQL tables. Hypertables are comprised of many interlinked “chunk” tables. Commands made to the hypertable automatically propagate changes down to all of the chunks belonging to that hypertable.
I suggest using psql
to execute the required DDL statements, which will create the hypertable, as well as the subsequent views and database user permissions. All SQL statements are included in the project’s statements.sql file. One way to use psql
is to install it on your local workstation, then use psql
to connect to the remote edge node. I prefer to instantiate a local PostgreSQL Docker container instance running psql
. I then use the local container’s psql
client to connect to the edge node’s TimescaleDB database. For example, from my local machine, I run the following docker run
command to connect to the edge node’s TimescaleDB database, located locally at 192.168.1.12
.
Although not always practical, you could also access psql
from within the TimescaleDB Docker container, running on the actual edge node, using the following docker exec
command.
TimescaleDB Continuous Aggregates
For this post’s demonstration, we need to create four TimescaleDB database views, which will be queried from an eventual Grafana Dashboard. The views are TimescaleDB Continuous Aggregates. According to Timescale, aggregate queries that touch large swathes of time-series data can take a long time to compute because the system needs to scan large amounts of data on every query execution. TimescaleDB continuous aggregates automatically calculate the results of a query in the background and materialize the results.
For example, in this post, we generate sensor data every five seconds from the three IoT devices. When visualizing a 24-hour period in Grafana, using continuous aggregates with an interval of one minute, we would reduce the total volume of data queried from ~51,840 rows to ~4,320 rows, a reduction of over 91%. The larger the time period or the number of IoT devices being analyzed, the more significantly these savings will improve query performance.
A time_bucket
on the time partitioning column of the hypertable is required in all continuous aggregate views. The time_bucket
function, in this case, has a bucket width (interval) of 1 minute. The interval is configurable.
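To make the DDL concrete, a minimal psycopg2 sketch of the table, hypertable, and a one-minute continuous aggregate is shown below. The column list is abbreviated and assumed, the DSN is a placeholder, and the continuous-aggregate statement uses TimescaleDB 1.x syntax (TimescaleDB 2.x uses CREATE MATERIALIZED VIEW instead); the project’s authoritative DDL is in statements.sql.

import psycopg2

DSN = "host=192.168.1.12 port=5432 dbname=demo_iot user=postgres password=changeme"  # placeholder

STATEMENTS = [
    # abbreviated schema; the real sensor_data table has additional columns
    """CREATE TABLE IF NOT EXISTS sensor_data (
           time        TIMESTAMPTZ NOT NULL,
           device_id   TEXT        NOT NULL,
           temperature DOUBLE PRECISION,
           humidity    DOUBLE PRECISION
       );""",
    # convert the plain table into a hypertable, partitioned on the time column
    "SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);",
    # one-minute continuous aggregate (TimescaleDB 1.x syntax)
    """CREATE VIEW temperature_humidity_summary_minute
       WITH (timescaledb.continuous) AS
       SELECT device_id,
              time_bucket(INTERVAL '1 minute', time) AS bucket,
              AVG(temperature) AS avg_temp,
              AVG(humidity)    AS avg_humidity
       FROM sensor_data
       GROUP BY device_id, bucket;""",
]

conn = psycopg2.connect(DSN)
conn.autocommit = True  # run each DDL statement outside of an explicit transaction
with conn.cursor() as cur:
    for statement in STATEMENTS:
        cur.execute(statement)
conn.close()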
Limiting Grafana’s Access to IoT Data
Following the Grafana recommendation for database user permissions, we create a grafanareader
PostgreSQL user and limit the user’s access to the sensor_data
table and the four views we created. Grafana will use this user’s credentials to perform SELECT
queries of the TimescaleDB demo_iot
database.
Grafana Dashboards
Using the TimescaleDB continuous aggregates we have created, we can quickly build a richly-featured dashboard in Grafana. Below we see a typical IoT Dashboard you might build to monitor the post’s IoT sensor data in near real-time. An exported version, dashboard_external_export.json, is included in the GitHub project.


Grafana’s documentation includes a comprehensive set of instructions for Using PostgreSQL in Grafana. To connect to the TimescaleDB database from Grafana, we use a PostgreSQL data source.

The data displayed in each Panel in the Grafana Dashboard is based on a SQL query. For example, the Average Temperature Panel might use a query similar to the example below. This particular query also converts Celsius to Fahrenheit. Note the use of Grafana Macros (e.g., $__time()
, $__timeFilter()
). Macros can be used within a query to simplify syntax and allow for dynamic parts.
SELECT
$__time(bucket),
device_id AS metric,
((avg_temp * 1.8) + 32) AS avg_temp
FROM temperature_humidity_summary_minute
WHERE
$__timeFilter(bucket)
ORDER BY 1,2
Below, we see another example from the Average Humidity Panel. In this particular query, we might choose to validate that the humidity data is within an acceptable range of 0%–100%.
SELECT
$__time(bucket),
device_id AS metric,
avg_humidity
FROM temperature_humidity_summary_minute
WHERE
$__timeFilter(bucket)
AND avg_humidity >= 0.0
AND avg_humidity <= 100.0
ORDER BY 1,2
Mobile Friendly
Grafana dashboards are mobile-friendly. Below we see two views of the dashboard, using the Chrome mobile browser on an Apple iPhone.

Grafana Alerts
Grafana allows Alerts to be created based on Rules you define in each Panel. If data values match the Rule’s conditions, which you pre-define, such as a temperature reading above a certain threshold for a set amount of time, an alert is sent to your choice of destinations. According to the Rule shown below, if the average temperature exceeds 75°F for a period of 5 minutes, an alert is sent.

As demonstrated below, when the temperature in the laboratory began to exceed 75°F, the alert entered a ‘Pending’ state. If the temperature exceeded 75°F for the pre-determined period of 5 minutes, the alert status changed to ‘Alerting’, and an alert was sent. When the temperature dropped back below 75°F for the pre-determined period of 5 minutes, the alert status changed from ‘Alerting’ to ‘OK’, and a subsequent notification was sent.

There are currently 18 destinations available out-of-the-box with Grafana, including Slack, email, PagerDuty, webhooks, HipChat, and Microsoft Teams. We can use Grafana Alerts to notify the proper resources, in near real-time, if an issue is detected, based on the data. Below, we see an actual series of high-temperature alerts sent by Grafana to a Slack channel, followed by subsequent notifications as the temperature returned to normal.

Ad-hoc Queries
The ability to perform ad-hoc queries on the time-series IoT data is an essential feature of the IoT edge analytics stack. We can use psql
or pgAdmin to perform ad-hoc queries against the TimescaleDB database. Below are examples of typical ad-hoc queries we might perform on the IoT sensor data. These example queries demonstrate TimescaleDB’s advanced analytical capabilities for working with time-series data, including Moving Average, Delta, Time Bucket, and Histogram.
Conclusion
In this post, we have explored the development of an IoT edge analytics stack comprised of lightweight, purpose-built, easily deployable and manageable, platform- and programming language-agnostic, open-source software components. These components included Docker containerized versions of Eclipse Mosquitto, TimescaleDB, Grafana, and pgAdmin, referred to as the GTM Stack. Using the GTM stack, we collected, analyzed, and visualized IoT data without first shipping the data to the Cloud or other external systems.
This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.
Java Development with Microsoft SQL Server: Calling Microsoft SQL Server Stored Procedures from Java Applications Using JDBC
Posted by Gary A. Stafford in AWS, Java Development, Software Development, SQL, SQL Server Development on September 9, 2020
Introduction
Enterprise software solutions often combine multiple technology platforms. Accessing an Oracle database from a Microsoft .NET application or, conversely, accessing Microsoft SQL Server from a Java-based application is common. In this post, we will explore the use of the JDBC (Java Database Connectivity) API to call stored procedures from a Microsoft SQL Server 2017 database and return data to a Java 11-based console application.

The objectives of this post include:
- Demonstrate the differences between using static SQL statements and stored procedures to return data.
- Demonstrate three types of JDBC statements to return data:
Statement
, PreparedStatement
, and CallableStatement
. - Demonstrate how to call stored procedures with input and output parameters.
- Demonstrate how to return single values and a result set from a database using stored procedures.
Why Stored Procedures?
To access data, many enterprise software organizations require their developers to call stored procedures within their code as opposed to executing static T-SQL (Transact-SQL) statements against the database. There are several reasons stored procedures are preferred:
- Optimization: Stored procedures are often written by DBAs or database developers who specialize in database development. They understand the best way to construct queries for optimal performance and minimal load on the database server. Think of it as a developer using an API to interact with the database.
- Safety and Security: Stored procedures are considered safer and more secure than static SQL statements. The stored procedure provides tight control over the content of the queries, preventing malicious or unintentionally destructive code from being executed against the database.
- Error Handling: Stored procedures can contain logic for handling errors before they bubble up to the application layer and possibly to the end-user.
AdventureWorks 2017 Database
For brevity, I will use an existing and well-known Microsoft SQL Server database, AdventureWorks. The AdventureWorks database was originally published by Microsoft for SQL Server 2008. Although a bit dated architecturally, the database comes prepopulated with plenty of data for demonstration purposes.

HumanResources
schema, one of five schemas within the AdventureWorks database
For the demonstration, I have created an Amazon RDS for SQL Server 2017 Express Edition instance on AWS. You have several options for deploying SQL Server, including AWS, Microsoft Azure, Google Cloud, or installing it on your local workstation.
There are many methods to deploy the AdventureWorks database to Microsoft SQL Server. For this post’s demonstration, I used the AdventureWorks2017.bak
backup file, which I copied to Amazon S3. Then, I enabled and configured the native backup and restore feature of Amazon RDS for SQL Server to import and install the backup.
DROP DATABASE IF EXISTS AdventureWorks;
GO
EXECUTE msdb.dbo.rds_restore_database
@restore_db_name='AdventureWorks',
@s3_arn_to_restore_from='arn:aws:s3:::my-bucket/AdventureWorks2017.bak',
@type='FULL',
@with_norecovery=0;
-- get task_id from output (e.g. 1)
EXECUTE msdb.dbo.rds_task_status
@db_name='AdventureWorks',
@task_id=1;
Install Stored Procedures
For the demonstration, I have added four stored procedures to the AdventureWorks database to use in this post. To follow along, you will need to install these stored procedures, which are included in the GitHub project.

Data Sources, Connections, and Properties
Using the latest Microsoft JDBC Driver 8.4 for SQL Server (ver. 8.4.1.jre11), we create a SQL Server data source, com.microsoft.sqlserver.jdbc.SQLServerDataSource
, and database connection, java.sql.Connection
. There are several patterns for creating and working with JDBC data sources and connections. This post does not necessarily focus on the best practices for creating or using either. In this example, the application instantiates a connection class, SqlConnection.java
, which in turn instantiates the java.sql.Connection
and com.microsoft.sqlserver.jdbc.SQLServerDataSource
objects. The data source’s properties are supplied from an instance of a singleton class, ProjectProperties.java
. This properties class instantiates the java.util.Properties
class, which reads values from a configuration properties file, config.properties
. On startup, the application creates the database connection, calls each of the example methods, and then closes the connection.
Examples
For each example, I will show the stored procedure, if applicable, followed by the Java method that calls the procedure or executes the static SQL statement. I have left out the data source and connection code in the article. Again, a complete copy of all the code for this article is available on GitHub, including Java source code, SQL statements, helper SQL scripts, and a set of basic JUnit tests.
To run the JUnit unit tests, using Gradle, which the project is based on, use the ./gradlew cleanTest test --warning-mode none
command.

To build and run the application, using Gradle, which the project is based on, use the ./gradlew run --warning-mode none
command.

Example 1: SQL Statement
Before jumping into stored procedures, we will start with a simple static SQL statement. This example’s method, getAverageProductWeightST
, uses the java.sql.Statement
class. According to Oracle’s JDBC documentation, the Statement
object is used for executing a static SQL statement and returning the results it produces. This SQL statement calculates the average weight of all products in the AdventureWorks database. It returns a solitary double
numeric value. This example demonstrates one of the simplest methods for returning data from SQL Server.
/**
* Statement example, no parameters, returns Integer
*
* @return Average weight of all products
*/
public double getAverageProductWeightST() {
double averageWeight = 0;
Statement stmt = null;
ResultSet rs = null;
try {
stmt = connection.getConnection().createStatement();
String sql = "WITH Weights_CTE(AverageWeight) AS" +
"(" +
" SELECT [Weight] AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'LB'" +
" UNION" +
" SELECT [Weight] * 0.00220462262185 AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'G')" +
"SELECT ROUND(AVG([AverageWeight]), 2)" +
"FROM [Weights_CTE];";
rs = stmt.executeQuery(sql);
if (rs.next()) {
averageWeight = rs.getDouble(1);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (stmt != null) {
try {
stmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}
Example 2: Prepared Statement
Next, we will execute almost the same static SQL statement as in Example 1. The only change is the addition of the column name, averageWeight
. This allows us to parse the results by column name, making the code easier to understand as opposed to using the numeric index of the column as in Example 1.
Also, instead of using the java.sql.Statement
class, we use the java.sql.PreparedStatement
class. According to Oracle’s documentation, a SQL statement is precompiled and stored in a PreparedStatement
object. This object can then be used to execute this statement multiple times efficiently.
/**
* PreparedStatement example, no parameters, returns Integer
*
* @return Average weight of all products
*/
public double getAverageProductWeightPS() {
double averageWeight = 0;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
String sql = "WITH Weights_CTE(averageWeight) AS" +
"(" +
" SELECT [Weight] AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'LB'" +
" UNION" +
" SELECT [Weight] * 0.00220462262185 AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'G')" +
"SELECT ROUND(AVG([AverageWeight]), 2) AS [averageWeight]" +
"FROM [Weights_CTE];";
pstmt = connection.getConnection().prepareStatement(sql);
rs = pstmt.executeQuery();
if (rs.next()) {
averageWeight = rs.getDouble("averageWeight");
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (pstmt != null) {
try {
pstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}
Example 3: Callable Statement
In this example, the average product weight query has been moved into a stored procedure. The procedure is identical in functionality to the static statement in the first two examples. To call the stored procedure, we use the java.sql.CallableStatement
class. According to Oracle’s documentation, the CallableStatement
extends PreparedStatement
. It is the interface used to execute SQL stored procedures. The CallableStatement
accepts both input and output parameters; however, this simple example does not use either. Like the previous two examples, the procedure returns a double
numeric value.
CREATE OR
ALTER PROCEDURE [Production].[uspGetAverageProductWeight]
AS
BEGIN
SET NOCOUNT ON;
WITH
Weights_CTE(AverageWeight)
AS
(
SELECT [Weight] AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'LB'
UNION
SELECT [Weight] * 0.00220462262185 AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'G'
)
SELECT ROUND(AVG([AverageWeight]), 2)
FROM [Weights_CTE];
END
GO
The calling Java method is shown below.
/**
* CallableStatement, no parameters, returns Integer
*
* @return Average weight of all products
*/
public double getAverageProductWeightCS() {
CallableStatement cstmt = null;
double averageWeight = 0;
ResultSet rs = null;
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetAverageProductWeight]}");
cstmt.execute();
rs = cstmt.getResultSet();
if (rs.next()) {
averageWeight = rs.getDouble(1);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}
Example 4: Calling a Stored Procedure with an Output Parameter
In this example, we use almost the same stored procedure as in Example 3. The only difference is the inclusion of an output parameter. This time, instead of returning a result set with a value in a single unnamed column, the procedure returns the value through a named output parameter, averageWeight
. We can now retrieve the value by referencing that parameter name.
The stored procedure patterns found in Examples 3 and 4 are both commonly used. One procedure uses an output parameter and one does not, but both return the same value(s). You can use the CallableStatement
for either type.
CREATE OR
ALTER PROCEDURE [Production].[uspGetAverageProductWeightOUT]
    @averageWeight DECIMAL(8, 2) OUT
AS
BEGIN
SET NOCOUNT ON;
WITH
Weights_CTE(AverageWeight)
AS
(
SELECT [Weight] AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'LB'
UNION
SELECT [Weight] * 0.00220462262185 AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'G'
)
SELECT @averageWeight = ROUND(AVG([AverageWeight]), 2)
FROM [Weights_CTE];
END
GO
The calling Java method is shown below.
/**
* CallableStatement example, (1) output parameter, returns Integer
*
* @return Average weight of all products
*/
public double getAverageProductWeightOutCS() {
CallableStatement cstmt = null;
double averageWeight = 0;
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetAverageProductWeightOUT](?)}");
cstmt.registerOutParameter("averageWeight", Types.DECIMAL);
cstmt.execute();
averageWeight = cstmt.getDouble("averageWeight");
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}
Example 5: Calling a Stored Procedure with an Input Parameter
In this example, the procedure returns a result set, java.sql.ResultSet
, of employees whose last name starts with a particular sequence of characters (e.g., ‘M’ or ‘Sa’). The sequence of characters is passed as an input parameter, lastNameStartsWith
, to the stored procedure using the CallableStatement
.
The method making the call iterates through the rows of the result set returned by the stored procedure, concatenating multiple columns to form the employee’s full name as a string. Each full name string is then added to an ordered collection of strings, a List<String>
object. The List instance is returned by the method. You will notice this procedure takes a little longer to run because of the use of the LIKE
operator. The database server has to perform pattern matching on each last name value in the table to determine the result set.
CREATE OR
ALTER PROCEDURE [HumanResources].[uspGetEmployeesByLastName]
@lastNameStartsWith VARCHAR(20) = 'A'
AS
BEGIN
SET NOCOUNT ON;
SELECT p.[FirstName], p.[MiddleName], p.[LastName], p.[Suffix], e.[JobTitle], m.[EmailAddress]
FROM [HumanResources].[Employee] AS e
LEFT JOIN [Person].[Person] p ON e.[BusinessEntityID] = p.[BusinessEntityID]
LEFT JOIN [Person].[EmailAddress] m ON e.[BusinessEntityID] = m.[BusinessEntityID]
WHERE e.[CurrentFlag] = 1
AND p.[PersonType] = 'EM'
AND p.[LastName] LIKE @lastNameStartsWith + '%'
ORDER BY p.[LastName], p.[FirstName], p.[MiddleName]
END
GO
The calling Java method is shown below.
/**
* CallableStatement example, (1) input parameter, returns ResultSet
*
* @param lastNameStartsWith
* @return List of employee names
*/
public List<String> getEmployeesByLastNameCS(String lastNameStartsWith) {
CallableStatement cstmt = null;
ResultSet rs = null;
List<String> employeeFullName = new ArrayList<>();
try {
cstmt = connection.getConnection().prepareCall(
"{call [HumanResources].[uspGetEmployeesByLastName](?)}",
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
cstmt.setString("lastNameStartsWith", lastNameStartsWith);
boolean results = cstmt.execute();
int rowsAffected = 0;
// Protects against lack of SET NOCOUNT in stored procedure
while (results || rowsAffected != -1) {
if (results) {
rs = cstmt.getResultSet();
break;
} else {
rowsAffected = cstmt.getUpdateCount();
}
results = cstmt.getMoreResults();
}
while (rs.next()) {
employeeFullName.add(
rs.getString("LastName") + ", "
+ rs.getString("FirstName") + " "
+ rs.getString("MiddleName"));
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return employeeFullName;
}
Example 6: Converting a Result Set to Ordered Collection of Objects
In this last example, we pass two input parameters, productColor
and productSize
, to a slightly more complex stored procedure. The stored procedure returns a result set containing several columns of product information. This time, the example’s method iterates through the result set returned by the procedure and constructs an ordered collection of products, List<Product>
object. The Product objects in the list are instances of the Product.java
POJO class. The method converts each result set row’s field values into Product
properties (e.g., Product.Size
, Product.Model
). Using a collection is a common method for persisting data from a result set in an application.
CREATE OR
ALTER PROCEDURE [Production].[uspGetProductsByColorAndSize]
@productColor VARCHAR(20),
@productSize INTEGER
AS
BEGIN
SET NOCOUNT ON;
SELECT p.[ProductNumber], m.[Name] AS [Model], p.[Name] AS [Product], p.[Color], p.[Size]
FROM [Production].[ProductModel] AS m
INNER JOIN
[Production].[Product] AS p ON m.[ProductModelID] = p.[ProductModelID]
WHERE (p.[Color] = @productColor)
AND (p.[Size] = @productSize)
ORDER BY p.[ProductNumber], [Model], [Product]
END
GO
The calling Java method is shown below.
/**
* CallableStatement example, (2) input parameters, returns ResultSet
*
* @param color
* @param size
* @return List of Product objects
*/
public List<Product> getProductsByColorAndSizeCS(String color, String size) {
CallableStatement cstmt = null;
ResultSet rs = null;
List<Product> productList = new ArrayList<>();
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetProductsByColorAndSize](?, ?)}",
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
cstmt.setString("productColor", color);
cstmt.setString("productSize", size);
boolean results = cstmt.execute();
int rowsAffected = 0;
// Protects against lack of SET NOCOUNT in stored procedure
while (results || rowsAffected != -1) {
if (results) {
rs = cstmt.getResultSet();
break;
} else {
rowsAffected = cstmt.getUpdateCount();
}
results = cstmt.getMoreResults();
}
while (rs.next()) {
Product product = new Product(
rs.getString("Product"),
rs.getString("ProductNumber"),
rs.getString("Color"),
rs.getString("Size"),
rs.getString("Model"));
productList.add(product);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return productList;
}
Proper T-SQL: Schema Reference and Brackets
You will notice in all T-SQL statements, I refer to the schema as well as the table or stored procedure name (e.g., {call [Production].[uspGetAverageProductWeightOUT](?)}
). According to Microsoft, it is always good practice to refer to database objects by a schema name and the object name, separated by a period; that even includes the default schema (e.g., dbo
).
You will also notice I wrap the schema and object names in square brackets (e.g., SELECT [ProductNumber] FROM [Production].[ProductModel]
). The square brackets indicate that the name represents an object and not a reserved word (e.g., CURRENT
or NATIONAL
). By default, SQL Server adds these to make sure the scripts it generates run correctly.
Running the Examples
The application will display the name of the method being called, a description, the duration of time it took to retrieve the data, and the results returned by the method.
Below, we see the results.

SQL Statement Performance
This post is certainly not about SQL performance, demonstrated by the fact that I am only using Amazon RDS for SQL Server 2017 Express Edition on a single, very underpowered db.t2.micro Amazon RDS instance type. However, I have added a timer feature, the ProcessTimer.java
class, to capture the duration of time each example takes to return data, measured in milliseconds. The ProcessTimer.java
class is part of the project code. Using the timer, you should observe significant differences between the first run and subsequent runs of the application for several of the called methods. The time difference is a result of several factors, primarily pre-compilation of the SQL statements and SQL Server plan caching.
The effects of these two factors are easily demonstrated by clearing the SQL Server plan cache (see the SQL script below) using DBCC (Database Console Commands) statements, and then running the application twice in a row. The second time, pre-compilation and plan caching should result in significantly faster times for the prepared statements and callable statements in Examples 2–6. In the two random runs shown below, we see up to a 497% improvement in query time.
USE AdventureWorks;
DBCC FREESYSTEMCACHE('SQL Plans');
GO
CHECKPOINT;
GO
-- Impossible to run with Amazon RDS for Microsoft SQL Server on AWS
-- DBCC DROPCLEANBUFFERS;
-- GO
The first run results are shown below.
SQL SERVER STATEMENT EXAMPLES
======================================
Method: GetAverageProductWeightST
Description: Statement, no parameters, returns Integer
Duration (ms): 122
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightPS
Description: PreparedStatement, no parameters, returns Integer
Duration (ms): 146
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightCS
Description: CallableStatement, no parameters, returns Integer
Duration (ms): 72
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightOutCS
Description: CallableStatement, (1) output parameter, returns Integer
Duration (ms): 623
Results: Average product weight (lb): 12.43
---
Method: GetEmployeesByLastNameCS
Description: CallableStatement, (1) input parameter, returns ResultSet
Duration (ms): 830
Results: Last names starting with 'Sa': 7
Last employee found: Sandberg, Mikael Q
---
Method: GetProductsByColorAndSizeCS
Description: CallableStatement, (2) input parameter, returns ResultSet
Duration (ms): 427
Results: Products found (color: 'Red', size: '44'): 7
First product: Road-650 Red, 44 (BK-R50R-44)
---
The second run results are shown below.
SQL SERVER STATEMENT EXAMPLES
======================================
Method: GetAverageProductWeightST
Description: Statement, no parameters, returns Integer
Duration (ms): 116
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightPS
Description: PreparedStatement, no parameters, returns Integer
Duration (ms): 89
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightCS
Description: CallableStatement, no parameters, returns Integer
Duration (ms): 80
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightOutCS
Description: CallableStatement, (1) output parameter, returns Integer
Duration (ms): 340
Results: Average product weight (lb): 12.43
---
Method: GetEmployeesByLastNameCS
Description: CallableStatement, (1) input parameter, returns ResultSet
Duration (ms): 139
Results: Last names starting with 'Sa': 7
Last employee found: Sandberg, Mikael Q
---
Method: GetProductsByColorAndSizeCS
Description: CallableStatement, (2) input parameter, returns ResultSet
Duration (ms): 208
Results: Products found (color: 'Red', size: '44'): 7
First product: Road-650 Red, 44 (BK-R50R-44)
---
Conclusion
This post has demonstrated several methods for querying and calling stored procedures from a SQL Server 2017 database using JDBC with the Microsoft JDBC Driver 8.4 for SQL Server. Although the examples are quite simple, the same patterns can be used with more complex stored procedures, with multiple input and output parameters, which not only select, but insert, update, and delete data.
There are some limitations of the Microsoft JDBC Driver for SQL Server, which you should be aware of; review the documentation for details. However, for most tasks that require database interaction, the Driver provides adequate functionality with SQL Server.
This blog represents my own viewpoints and not of my employer, Amazon Web Services.
Getting Started with Presto Federated Queries using Ahana’s PrestoDB Sandbox on AWS
Posted by Gary A. Stafford in AWS, Big Data, Software Development, SQL on September 3, 2020
Introduction
According to The Presto Foundation, Presto (aka PrestoDB), not to be confused with PrestoSQL, is an open-source, distributed, ANSI SQL compliant query engine. Presto is designed to run interactive ad-hoc analytic queries against data sources of all sizes ranging from gigabytes to petabytes. Presto is used in production at an immense scale by many well-known organizations, including Facebook, Twitter, Uber, Alibaba, Airbnb, Netflix, Pinterest, Atlassian, Nasdaq, and more.
In the following post, we will gain a better understanding of Presto’s ability to execute federated queries, which join multiple disparate data sources without having to move the data. Additionally, we will explore Apache Hive, the Hive Metastore, Hive partitioned tables, and the Apache Parquet file format.
Presto on AWS
There are several options for Presto on AWS. AWS recommends Amazon EMR and Amazon Athena. Presto comes pre-installed on EMR 5.0.0 and later. The Athena query engine is a derivation of Presto 0.172 and does not support all of Presto’s native features. However, Athena has many comparable features and deep integrations with other AWS services. If you need full, fine-grain control, you could deploy and manage Presto, yourself, on Amazon EC2, Amazon ECS, or Amazon EKS. Lastly, you may decide to purchase a Presto distribution with commercial support from an AWS Partner, such as Ahana or Starburst. If your organization needs 24x7x365 production-grade support from experienced Presto engineers, this is an excellent choice.
Federated Queries
In a modern Enterprise, it is rare to find all data living in a monolithic datastore. Given the multitude of available data sources, internal and external to an organization, and the growing number of purpose-built databases, analytics engines must be able to join and aggregate data across many sources efficiently. AWS defines a federated query as a capability that ‘enables data analysts, engineers, and data scientists to execute SQL queries across data stored in relational, non-relational, object, and custom data sources.’
Presto allows querying data where it lives, including Apache Hive, Thrift, Kafka, Kudu, and Cassandra, Elasticsearch, and MongoDB. In fact, there are currently 24 different Presto data source connectors available. With Presto, we can write queries that join multiple disparate data sources, without moving the data. Below is a simple example of a Presto federated query statement that correlates a customer’s credit rating with their age and gender. The query federates two different data sources, a PostgreSQL database table, postgresql.public.customer
, and an Apache Hive Metastore table, hive.default.customer_demographics
, whose underlying data resides in Amazon S3.
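The query itself is embedded in the original post; as an illustrative stand-in, the sketch below runs a comparable federated join from Python using the presto-python-client package. The host, join key, and column names (including the dob column used to derive age) are assumptions, not the demo’s exact schema.

import prestodb  # pip install presto-python-client

conn = prestodb.dbapi.connect(
    host="localhost",     # public DNS of the PrestoDB Sandbox EC2 instance (placeholder)
    port=8080,
    user="presto",
    catalog="hive",
    schema="default",
)

# join a PostgreSQL table with a Hive table whose data resides in Amazon S3
sql = """
SELECT cd.cd_credit_rating,
       cd.cd_gender,
       round(avg(date_diff('year', c.dob, current_date)), 1) AS avg_age,
       count(*) AS customer_count
FROM postgresql.public.customer AS c
JOIN hive.default.customer_demographics AS cd
    ON c.customer_demographics_id = cd.cd_demo_sk
GROUP BY cd.cd_credit_rating, cd.cd_gender
ORDER BY cd.cd_credit_rating, cd.cd_gender
"""

cur = conn.cursor()
cur.execute(sql)
for row in cur.fetchall():
    print(row)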
Ahana
The Linux Foundation’s Presto Foundation member, Ahana, was founded as the first company focused on bringing PrestoDB-based ad hoc analytics offerings to market and working to foster growth and evangelize the Presto community. Ahana’s mission is to simplify ad hoc analytics for organizations of all shapes and sizes. Ahana has been successful in raising seed funding, led by GV (formerly Google Ventures). Ahana’s founders have a wealth of previous experience in tech companies, including Alluxio, Kinetica, Couchbase, IBM, Apple, Splunk, and Teradata.
PrestoDB Sandbox
This post will use Ahana’s PrestoDB Sandbox, an Amazon Linux 2 AMI-based solution available on AWS Marketplace, to execute Presto federated queries.

Ahana’s PrestoDB Sandbox AMI allows you to easily get started with Presto and query data wherever it resides. The AMI configures a single EC2 instance Sandbox to act as both the Presto Coordinator and a Presto Worker, and it comes bundled with an Apache Hive Metastore backed by PostgreSQL. In addition, the following catalogs are included to try, test, and prototype with Presto (a few sample queries follow the list):
- JMX: useful for monitoring and debugging Presto
- Memory: stores data and metadata in RAM, which is discarded when Presto restarts
- TPC-DS: provides a set of schemas to support the TPC Benchmark DS
- TPC-H: provides a set of schemas to support the TPC Benchmark H
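Once the instance is running, you can verify these catalogs from any Presto client, for example the presto-cli (if included on the Sandbox) or a JDBC-based SQL tool, with a few statements like the ones below. This assumes the catalogs use the connectors’ default names (jmx, memory, tpcds, tpch); sf1 is one of the TPC-DS connector’s standard scale-factor schemas.

-- list the catalogs available in the Sandbox
SHOW CATALOGS;

-- explore one of the bundled benchmark catalogs
SHOW SCHEMAS FROM tpcds;
SHOW TABLES FROM tpcds.sf1;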
Apache Hive
In this demonstration, we will use Apache Hive and an Apache Hive Metastore backed by PostgreSQL. Apache Hive is data warehouse software that facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. The structure can be projected onto data already in storage. A command-line tool and JDBC driver are provided to connect users to Hive. The Metastore provides two essential features of a data warehouse: data abstraction and data discovery. Hive accomplishes both features by providing a metadata repository that is tightly integrated with the Hive query processing system so that data and metadata are in sync.
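To make the Hive side of the demonstration more concrete, a table registered in the Hive Metastore and exposed through Presto’s hive catalog might be declared roughly as follows. This is a sketch only, not the demonstration’s actual DDL; the bucket name is a placeholder and the columns are assumed from the TPC-DS schema.

-- sketch: an external, Parquet-format Hive table over data in Amazon S3
-- (<your-bucket> is a placeholder; columns are assumed, not the demo's actual DDL)
CREATE TABLE hive.default.customer_demographics (
    cd_demo_sk       bigint,
    cd_gender        varchar,
    cd_credit_rating varchar
)
WITH (
    format            = 'PARQUET',
    external_location = 's3a://<your-bucket>/customer_demographics/'
);

Because the table is external, dropping it from the Metastore leaves the underlying Parquet files in S3 untouched.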
Getting Started
To get started creating federated queries with Presto, we first need to create and configure our AWS environment, as shown below.

Subscribe to Ahana’s PrestoDB Sandbox
To start, subscribe to Ahana’s PrestoDB Sandbox on AWS Marketplace. Make sure you are aware of the costs involved. The current AWS pricing for the default, Linux-based r5.xlarge on-demand EC2 instance hosted in US East (N. Virginia) is USD 0.252 per hour. Since performance is not an issue for the demonstration, you could try a smaller EC2 instance, such as an r5.large, which costs USD 0.126 per hour.

The configuration process will lead you through the creation of an EC2 instance based on Ahana’s PrestoDB Sandbox AMI.

I chose to create the EC2 instance in my default VPC. Part of the demonstration includes connecting to Presto locally using JDBC. Therefore, it was also necessary to include a public IP address for the EC2 instance. If you choose to do so, I strongly recommend limiting the required ports, 22 and 8080, in the instance’s Security Group to just your IP address (a /32 CIDR block).

Lastly, we need to assign the EC2 instance an IAM Role that has access to Amazon S3. I assigned the AWS managed policy, AmazonS3FullAccess, to the EC2’s IAM Role.

Part of the configuration also asks for a key pair. You can use an existing key or create a new key for the demo. For reference in future commands, I am using a key named ahana-presto, with a key path of ~/.ssh/ahana-presto.pem. Be sure to update the commands to match your own key’s name and location.

Once complete, instructions for using the PrestoDB Sandbox EC2 are provided.



You can view the running EC2 instance, containing Presto, from the web-based AWS EC2 Management Console. Make sure to note the public IPv4 address or the public IPv4 DNS address as this value will be required during the demo.
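Since the demonstration includes connecting to Presto over JDBC, note that the PrestoDB JDBC driver’s connection URL takes the form jdbc:presto://<host>:<port>/<catalog>/<schema>. Below is a hedged example, assuming the Sandbox’s default Presto port of 8080 and the hive catalog’s default schema; substitute your instance’s public IPv4 DNS for the placeholder host.

jdbc:presto://<ec2-public-dns>:8080/hive/default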

AWS CloudFormation
We will use Amazon RDS for PostgreSQL and Amazon S3 as additional data sources for Presto. Included in the project files on GitHub is an AWS CloudFormation template, cloudformation/presto_ahana_demo.yaml
. The template creates a single RDS for PostgreSQL instance in the default VPC and an encrypted Amazon S3 bucket.
All the source code for this post is on GitHub. Use the following command to git clone
a local copy of the project.
git clone \
--branch master --single-branch --depth 1 --no-tags \
https://github.com/garystafford/presto-aws-federated-queries.git
To create the AWS CloudFormation stack from the template, cloudformation/presto_ahana_demo.yaml, execute the following aws cloudformation command. Make sure you change the DBAvailabilityZone parameter value (shown in bold) to match the AWS Availability Zone in which your Ahana PrestoDB Sandbox EC2 instance was created. In my case, us-east-1f.
aws cloudformation create-stack \
--stack-name ahana-prestodb-demo \
--template-body file://cloudformation/presto_ahana_demo.yaml \
--parameters ParameterKey=DBAvailabilityZone,ParameterValue=us-east-1f
To ensure the RDS for PostgreSQL database instance can be accessed by Presto running on the Ahana PrestoDB Sandbox EC2, manually add an Inbound rule for port 5432 to the database instance’s VPC Security Group, allowing traffic from the PrestoDB Sandbox EC2’s Security Group. I have also added my own IP address for port 5432, which enables me to connect to the RDS instance directly from my IDE using JDBC.

The AWS CloudFormation stack’s Outputs tab includes a set of values, including the JDBC connection string for the new RDS for PostgreSQL instance, JdbcConnString
, and the Amazon S3 bucket’s name, Bucket
. All these values will be required during the demonstration.

Preparing the PrestoDB Sandbox
There are a few steps we need to take to properly prepare the PrestoD