
Running Spark Jobs on Amazon EMR with Apache Airflow: Using the new Amazon Managed Workflows for Apache Airflow (Amazon MWAA) Service on AWS

Introduction

In the first post of this series, we explored several ways to run PySpark applications on Amazon EMR using AWS services, including AWS CloudFormation, AWS Step Functions, and the AWS SDK for Python. This second post in the series will examine running Spark jobs on Amazon EMR using the recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA) service.

Amazon EMR

According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache Spark, Hive, HBase, Flink, Hudi, Zeppelin, Jupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Amazon EMR Console’s Cluster Summary tab

Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including the use of EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.

AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.

Amazon MWAA

Apache Airflow is a popular open-source platform designed to schedule and monitor workflows. According to Wikipedia, Airflow was created at Airbnb in 2014 to manage the company’s increasingly complex workflows. From the beginning, the project was made open source, becoming an Apache Incubator project in 2016 and a top-level Apache Software Foundation project (TLP) in 2019.

Many organizations build, manage, and maintain Apache Airflow on AWS using compute services such as Amazon EC2 or Amazon EKS. Amazon recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA). With the Amazon MWAA announcement in November 2020, AWS customers can now focus on developing workflow automation and leave the management of Airflow to AWS. Amazon MWAA can be used as an alternative to AWS Step Functions for workflow automation on AWS.

Apache Airflow’s UI

At the time of this post, Amazon MWAA was running version 1.10.12 of Apache Airflow, released August 25, 2020. Apache recently announced the release of version 2.0.0 on December 17, 2020. Ensure that when you are developing workflows for Amazon MWAA, you are using the correct Apache Airflow version 1.10.12 documentation.

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI.
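
For example, with a current version of the AWS CLI installed and configured, you can list your MWAA Environments and inspect a specific Environment from the command line; the environment name below is a placeholder.

aws mwaa list-environments

aws mwaa get-environment --name <your_mwaa_environment_name>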

Airflow has a mechanism that allows you to expand its functionality and integrate with other systems. Given its integration capabilities, Airflow has extensive support for AWS, including Amazon EMR, Amazon S3, AWS Batch, Amazon Redshift, Amazon DynamoDB, AWS Lambda, Amazon Kinesis, and Amazon SageMaker. Outside of support for Amazon S3, most AWS integrations can be found in the Hooks, Secrets, Sensors, and Operators of the Airflow codebase’s contrib section.

Getting Started

Source Code

Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/aws-airflow-demo.git

Preliminary Steps

This post assumes the reader has completed the demonstration in the previous post, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce. This post will re-use many of the last post’s AWS resources, including the EMR VPC, Subnets, Security Groups, AWS Glue Data Catalog, Amazon S3 buckets, EMR Roles, EC2 key pair, AWS Systems Manager Parameter Store parameters, PySpark applications, and Kaggle datasets.

Configuring Amazon MWAA

The easiest way to create a new MWAA Environment is through the AWS Management Console. I strongly suggest that you review the pricing for Amazon MWAA before continuing. The service can be quite costly to operate, even when idle, with the smallest Environment class potentially running into the hundreds of dollars per month.

Amazon MWAA Environment Creation Process

Using the Console, create a new Amazon MWAA Environment. The Amazon MWAA interface will walk you through the creation process. Note the current ‘Airflow version’, 1.10.12.

Amazon MWAA Environment Creation Process

Amazon MWAA requires an Amazon S3 bucket to store Airflow assets. Create a new Amazon S3 bucket. According to the documentation, the bucket must start with the prefix airflow-. You must also enable Bucket Versioning on the bucket. Specify a dags folder within the bucket to store Airflow’s Directed Acyclic Graphs (DAG). You can leave the next two options blank since we have no additional Airflow plugins or additional Python packages to install.
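
If you prefer the AWS CLI over the console, a bucket meeting these requirements could be created roughly as follows. The bucket name is a placeholder, and the commands assume the us-east-1 region.

aws s3api create-bucket --bucket airflow-demo-<your_account_id>-us-east-1

aws s3api put-bucket-versioning \
    --bucket airflow-demo-<your_account_id>-us-east-1 \
    --versioning-configuration Status=Enabled

aws s3api put-object \
    --bucket airflow-demo-<your_account_id>-us-east-1 \
    --key dags/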

Amazon MWAA Environment Creation Process

With Amazon MWAA, your data is secure by default as workloads run within their own Amazon Virtual Private Cloud (Amazon VPC). As part of the MWAA Environment creation process, you are given the option to have AWS create an MWAA VPC CloudFormation stack.

Amazon MWAA Environment Creation Process

For this demonstration, choose to have MWAA create a new VPC and associated networking resources.

AWS CloudFormation Create Stack Console

The MWAA CloudFormation stack contains approximately 22 AWS resources, including a VPC, a pair of public and private subnets, route tables, an Internet Gateway, two NAT Gateways, and associated Elastic IPs (EIP). See the MWAA documentation for more details.

AWS CloudFormation Create Stack Console
Amazon MWAA Environment Creation Process

As part of the Amazon MWAA Networking configuration, you must decide if you want web access to Airflow to be public or private. The details of the network configuration can be found in the MWAA documentation. I am choosing public webserver access for this demonstration, but the recommended choice is private for greater security. With the public option, AWS still requires IAM authentication to sign in to the AWS Management Console in order to access the Airflow UI.

You must select an existing VPC Security Group or have MWAA create a new one. For this demonstration, choose to have MWAA create a Security Group for you.

Lastly, select an appropriately-sized Environment class for Airflow based on the scale of your needs. The mw1.small class will be sufficient for this demonstration.

Amazon MWAA Environment Creation Process

Finally, for Permissions, you must select an existing Airflow execution service role or create a new role. For this demonstration, create a new Airflow service role. We will later add additional permissions.

Amazon MWAA Environment Creation Process

Airflow Execution Role

As part of this demonstration, we will be using Airflow to run Spark jobs on EMR (EMR Steps). To allow Airflow to interact with EMR, we must increase the new Airflow execution role’s default permissions. Additional permissions include allowing the new Airflow role to assume the EMR roles using iam:PassRole. For this demonstration, we will include the two default EMR Service and JobFlow roles, EMR_DefaultRole and EMR_EC2_DefaultRole. We will also include the corresponding custom EMR roles created in the previous post, EMR_DemoRole and EMR_EC2_DemoRole. For this demonstration, the Airflow service role also requires three specific EMR permissions as shown below. Later in the post, Airflow will also read files from S3, which requires s3:GetObject permission.

Create a new policy by importing the project’s JSON file, iam_policy/airflow_emr_policy.json, and attach the new policy to the Airflow service role. Be sure to update the AWS Account ID in the file with your own Account ID.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "elasticmapreduce:DescribeStep",
                "elasticmapreduce:AddJobFlowSteps",
                "elasticmapreduce:RunJobFlow"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": [
                "arn:aws:iam::123412341234:role/EMR_DemoRole",
                "arn:aws:iam::123412341234:role/EMR_EC2_DemoRole",
                "arn:aws:iam::123412341234:role/EMR_EC2_DefaultRole",
                "arn:aws:iam::123412341234:role/EMR_DefaultRole"
            ]
        }
    ]
}
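
If you prefer the AWS CLI, a sketch of creating the policy and attaching it to the Airflow execution role might look like the following. The policy name and role name are assumptions; substitute your own values.

aws iam create-policy \
    --policy-name AirflowEMRDemoPolicy \
    --policy-document file://iam_policy/airflow_emr_policy.json

aws iam attach-role-policy \
    --role-name <your_airflow_execution_role_name> \
    --policy-arn arn:aws:iam::<your_account_id>:policy/AirflowEMRDemoPolicy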

The Airflow service role, created by MWAA, is shown below with the new policy attached.

Airflow Execution Service Role with the new Policy Attached

Final Architecture

Below is the final high-level architecture for the post’s demonstration. The diagram shows the approximate route of a DAG Run request, in red. The diagram includes an optional S3 Gateway VPC endpoint, not detailed in the post, but recommended for additional security. According to AWS, a VPC endpoint enables you to privately connect your VPC to supported AWS services and VPC endpoint services powered by AWS PrivateLink without requiring an internet gateway. In this case, the endpoint provides a private connection between the MWAA VPC and Amazon S3. It is also possible to create an EMR Interface VPC Endpoint to securely route traffic directly to EMR from MWAA, instead of connecting over the Internet.
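
Although not detailed in the post, an S3 Gateway VPC endpoint for the MWAA VPC could be created with a single AWS CLI command, similar to the sketch below; the VPC ID and route table IDs are placeholders for the values created by the MWAA VPC CloudFormation stack.

aws ec2 create-vpc-endpoint \
    --vpc-endpoint-type Gateway \
    --vpc-id <mwaa_vpc_id> \
    --service-name com.amazonaws.us-east-1.s3 \
    --route-table-ids <mwaa_private_route_table_id_1> <mwaa_private_route_table_id_2>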

Demonstration’s Amazon MWAA and Amazon EMR Architecture

Amazon MWAA Environment

The new MWAA Environment will include a link to the Airflow UI.

Amazon MWAA Environment Console

Airflow UI

Using the supplied link, you should be able to access the Airflow UI using your web browser.

Apache Airflow UI

Our First DAG

The Amazon MWAA documentation includes an example DAG that runs SparkPi, one of the sample programs included with Spark. I have created a similar DAG, included in the GitHub project as dags/spark_pi_example.py. The DAG creates a minimally-sized, single-node EMR cluster with no Core or Task nodes. The DAG then uses that cluster to submit the calculate_pi job to Spark. Once the job is complete, the DAG terminates the EMR cluster.

import os
from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.utils.dates import days_ago
DAG_ID = os.path.basename(__file__).replace('.py', '')
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}

SPARK_STEPS = [
    {
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
        },
    }
]

JOB_FLOW_OVERRIDES = {
    'Name': 'demo-cluster-airflow',
    'ReleaseLabel': 'emr-6.2.0',
    'Applications': [
        {
            'Name': 'Spark'
        },
    ],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
    },
    'VisibleToAllUsers': True,
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'Tags': [
        {
            'Key': 'Environment',
            'Value': 'Development'
        },
        {
            'Key': 'Name',
            'Value': 'Airflow EMR Demo Project'
        },
        {
            'Key': 'Owner',
            'Value': 'Data Analytics Team'
        }
    ]
}

with DAG(
        dag_id=DAG_ID,
        description='Run built-in Spark app on Amazon EMR',
        default_args=DEFAULT_ARGS,
        dagrun_timeout=timedelta(hours=2),
        start_date=days_ago(1),
        schedule_interval='@once',
        tags=['emr'],
) as dag:
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )

    cluster_creator >> step_adder >> step_checker

Upload the DAG to the Airflow S3 bucket’s dags directory. Substitute your Airflow S3 bucket name in the AWS CLI command below, then run it from the project’s root.

aws s3 cp dags/spark_pi_example.py \
s3://<your_airflow_bucket_name>/dags/

The DAG, spark_pi_example, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job.

Apache Airflow UI’s DAGs tab

The DAG has no optional configuration to input as JSON. Select ‘Trigger’ to submit the job, as shown below.

Apache Airflow UI’s Trigger DAG Page

The DAG should complete all three tasks successfully, as shown in the DAG’s ‘Graph View’ tab below.

Apache Airflow UI’s DAG Graph View

Switching to the EMR Console, you should see the single-node EMR cluster being created.

Amazon EMR Console’s Summary tab

On the ‘Steps’ tab, you should see that the ‘calculate_pi’ Spark job has been submitted and is waiting for the cluster to be ready to run it.

Amazon EMR Console’s Steps tab

Triggering DAGs Programmatically

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI. To automate the DAG Run, we could use the AWS CLI and invoke the Airflow CLI via an endpoint on the Apache Airflow Webserver. The Amazon MWAA documentation and Airflow’s CLI documentation explain how.

Below is an example of triggering the spark_pi_example DAG programmatically using Airflow’s trigger_dag CLI command. You will need to replace the WEB_SERVER_HOSTNAME variable with your own Airflow Web Server’s hostname. The ENVIRONMENT_NAME variable assumes only one MWAA environment is returned by jq.

export WEB_SERVER_HOSTNAME="<your_airflow_web_server.us-east-1.airflow.amazonaws.com>"
export ENVIRONMENT_NAME=$(aws mwaa list-environments | jq -r '.Environments | .[]')
export DAG_NAME=spark_pi_example
export CLI_TOKEN=$(aws mwaa create-cli-token --name "${ENVIRONMENT_NAME}" | jq -r .CliToken)

curl --request POST "https://${WEB_SERVER_HOSTNAME}/aws_mwaa/cli" \
    --header "Authorization: Bearer ${CLI_TOKEN}" \
    --header "Content-Type: text/plain" \
    --data-raw "trigger_dag ${DAG_NAME}"

Analytics Job with Airflow

Next, we will submit an actual analytics job to EMR. If you recall from the previous post, we had four different analytics PySpark applications, which performed analyses on the three Kaggle datasets. For the next DAG, we will run a Spark job that executes the bakery_sales_ssm.py PySpark application. The PySpark application should already exist in the work S3 bucket from the previous post.

The DAG, dags/bakery_sales.py, creates an EMR cluster identical to the EMR cluster created with the run_job_flow.py Python script in the previous post. All EMR configuration options available when using AWS Step Functions are available with Airflow’s airflow.contrib.operators and airflow.contrib.sensors packages for EMR.

Airflow leverages Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. The Bakery Sales DAG contains eleven Jinja template variables. Seven variables will be configured in the Airflow UI by importing a JSON file into the ‘Admin’ ⇨ ‘Variables’ tab. These template variables are prefixed with var.value in the DAG. The other three variables will be passed as a DAG Run configuration as a JSON blob, similar to the previous DAG example. These template variables are prefixed with dag_run.conf.

import os
from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.models import Variable
from airflow.utils.dates import days_ago
# ************** AIRFLOW VARIABLES **************
bootstrap_bucket = Variable.get('bootstrap_bucket')
emr_ec2_key_pair = Variable.get('emr_ec2_key_pair')
job_flow_role = Variable.get('job_flow_role')
logs_bucket = Variable.get('logs_bucket')
release_label = Variable.get('release_label')
service_role = Variable.get('service_role')
work_bucket = Variable.get('work_bucket')
# ***********************************************
DAG_ID = os.path.basename(__file__).replace('.py', '')
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ["{{ dag_run.conf['airflow_email'] }}"],
    'email_on_failure': ["{{ dag_run.conf['email_on_failure'] }}"],
    'email_on_retry': ["{{ dag_run.conf['email_on_retry'] }}"],
}

SPARK_STEPS = [
    {
        'Name': 'Bakery Sales',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode',
                'cluster',
                '--master',
                'yarn',
                '--conf',
                'spark.yarn.submit.waitAppCompletion=true',
                's3a://{{ var.value.work_bucket }}/analyze/bakery_sales_ssm.py'
            ]
        }
    }
]

JOB_FLOW_OVERRIDES = {
    'Name': 'demo-cluster-airflow',
    'ReleaseLabel': '{{ var.value.release_label }}',
    'LogUri': 's3n://{{ var.value.logs_bucket }}',
    'Applications': [
        {
            'Name': 'Spark'
        },
    ],
    'Instances': {
        'InstanceFleets': [
            {
                'Name': 'MASTER',
                'InstanceFleetType': 'MASTER',
                'TargetSpotCapacity': 1,
                'InstanceTypeConfigs': [
                    {
                        'InstanceType': 'm5.xlarge',
                    },
                ]
            },
            {
                'Name': 'CORE',
                'InstanceFleetType': 'CORE',
                'TargetSpotCapacity': 2,
                'InstanceTypeConfigs': [
                    {
                        'InstanceType': 'r5.xlarge',
                    },
                ],
            },
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
        'Ec2KeyName': '{{ var.value.emr_ec2_key_pair }}',
    },
    'BootstrapActions': [
        {
            'Name': 'string',
            'ScriptBootstrapAction': {
                'Path': 's3://{{ var.value.bootstrap_bucket }}/bootstrap_actions.sh',
            }
        },
    ],
    'Configurations': [
        {
            'Classification': 'spark-hive-site',
            'Properties': {
                'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'
            }
        }
    ],
    'VisibleToAllUsers': True,
    'JobFlowRole': '{{ var.value.job_flow_role }}',
    'ServiceRole': '{{ var.value.service_role }}',
    'EbsRootVolumeSize': 32,
    'StepConcurrencyLevel': 1,
    'Tags': [
        {
            'Key': 'Environment',
            'Value': 'Development'
        },
        {
            'Key': 'Name',
            'Value': 'Airflow EMR Demo Project'
        },
        {
            'Key': 'Owner',
            'Value': 'Data Analytics Team'
        }
    ]
}

with DAG(
        dag_id=DAG_ID,
        description='Analyze Bakery Sales with Amazon EMR',
        default_args=DEFAULT_ARGS,
        dagrun_timeout=timedelta(hours=2),
        start_date=days_ago(1),
        schedule_interval='@once',
        tags=['emr'],
) as dag:
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )

    cluster_creator >> step_adder >> step_checker

Import Variables into Airflow UI

First, to import the required variables, change the values in the project’s airflow_variables/admin_variables_bakery.json file. You will need to update the values for bootstrap_bucket, emr_ec2_key_pair, logs_bucket, and work_bucket. The three S3 buckets should all exist from the previous post.

{
    "bootstrap_bucket": "emr-demo-bootstrap-123412341234-us-east-1",
    "emr_ec2_key_pair": "emr-demo-123412341234-us-east-1",
    "job_flow_role": "EMR_EC2_DemoRole",
    "logs_bucket": "emr-demo-logs-123412341234-us-east-1",
    "release_label": "emr-6.2.0",
    "service_role": "EMR_DemoRole",
    "work_bucket": "emr-demo-work-123412341234-us-east-1",
    "ec2_subnet_id": "subnet-012abc456efg78900"
}

Next, import the variables file from the ‘Admin’ ⇨ ‘Variables’ tab of the Airflow UI.

Apache Airflow UI’s Admin > Variables tab

Upload the DAG, dags/bakery_sales.py, to the Airflow S3 bucket, similar to the first DAG.

aws s3 cp dags/bakery_sales.py \
s3://<your_airflow_bucket_name>/dags/

The second DAG, bakery_sales, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job.

Apache Airflow UI’s DAGs tab

Input the three required parameters in the ‘Trigger DAG’ interface, used to pass the DAG Run configuration, and select ‘Trigger’. A sample of the JSON blob can be found in the project, airflow_variables/dag_run.conf_bakery.json.

{
    "airflow_email": "analytics_team@example.com",
    "email_on_failure": false,
    "email_on_retry": false
}

This is just for demonstration purposes. To send and receive emails, you will need to configure Airflow.

Apache Airflow UI’s Trigger DAG Screen

Switching to the EMR Console, you should see the ‘Bakery Sales’ Spark job in the ‘Steps’ tab.

Amazon EMR Console’s Steps tab

Multi-Step DAG

In our last example, we will use a single DAG to run four Spark jobs in parallel. The Spark job arguments (EmrAddStepsOperator steps parameter) will be loaded from an external JSON file residing in Amazon S3, instead of being defined in the DAG, as in the previous two DAG examples. Additionally, the EMR cluster specifications (EmrCreateJobFlowOperator job_flow_overrides parameter) will also be loaded from an external JSON file. Using this method, we decouple the EMR provisioning and job details from the DAG. DataOps or DevOps Engineers might manage the EMR cluster specifications as code, while Data Analysts separately manage the Spark job arguments. A third team might manage the DAG itself.

We still maintain the variables in the JSON files. The DAG will read the JSON file-based configuration into the tasks as JSON blobs, then replace the Jinja template variables (expressions) in the DAG with variable values defined in Airflow or input as parameters when the DAG is triggered.

Below we see a snippet of two of the four Spark job definitions (EMR Steps), which have been moved to a separate JSON file, emr_steps/emr_steps.json.

[
    {
        "Name": "Movie Ratings",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "cluster",
                "--master",
                "yarn",
                "--conf",
                "spark.yarn.submit.waitAppCompletion=true",
                "s3a://{{ var.value.work_bucket }}/analyze/movies_avg_ratings_ssm.py",
                "--start-date",
                "2016-01-01 00:00:00",
                "--end-date",
                "2016-12-31 23:59:59"
            ]
        }
    },
    {
        "Name": "Stock Volatility",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "cluster",
                "--master",
                "yarn",
                "--conf",
                "spark.yarn.submit.waitAppCompletion=true",
                "s3a://{{ var.value.work_bucket }}/analyze/stock_volatility_ssm.py",
                "--start-date",
                "2017-01-01",
                "--end-date",
                "2018-12-31"
            ]
        }
    }
]

Below are the EMR cluster specifications (job_flow_overrides), which have been moved to a separate JSON file, job_flow_overrides/job_flow_overrides.json.

{
    "Name": "demo-cluster-airflow",
    "ReleaseLabel": "{{ var.value.release_label }}",
    "LogUri": "s3n://{{ var.value.logs_bucket }}",
    "Applications": [
        {
            "Name": "Spark"
        }
    ],
    "Instances": {
        "InstanceFleets": [
            {
                "Name": "MASTER",
                "InstanceFleetType": "MASTER",
                "TargetSpotCapacity": 1,
                "InstanceTypeConfigs": [
                    {
                        "InstanceType": "m5.xlarge"
                    }
                ]
            },
            {
                "Name": "CORE",
                "InstanceFleetType": "CORE",
                "TargetSpotCapacity": 2,
                "InstanceTypeConfigs": [
                    {
                        "InstanceType": "r5.2xlarge"
                    }
                ]
            }
        ],
        "Ec2SubnetId": "{{ var.value.ec2_subnet_id }}",
        "KeepJobFlowAliveWhenNoSteps": false,
        "TerminationProtected": false,
        "Ec2KeyName": "{{ var.value.emr_ec2_key_pair }}"
    },
    "BootstrapActions": [
        {
            "Name": "string",
            "ScriptBootstrapAction": {
                "Path": "s3://{{ var.value.bootstrap_bucket }}/bootstrap_actions.sh"
            }
        }
    ],
    "Configurations": [
        {
            "Classification": "spark-hive-site",
            "Properties": {
                "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
            }
        }
    ],
    "VisibleToAllUsers": true,
    "JobFlowRole": "{{ var.value.job_flow_role }}",
    "ServiceRole": "{{ var.value.service_role }}",
    "EbsRootVolumeSize": 32,
    "StepConcurrencyLevel": 5,
    "Tags": [
        {
            "Key": "Environment",
            "Value": "Development"
        },
        {
            "Key": "Name",
            "Value": "Airflow EMR Demo Project"
        },
        {
            "Key": "Owner",
            "Value": "Data Analytics Team"
        }
    ]
}

Decoupling the configurations reduces the DAG from well over 200 lines of code to less than 75 lines. Note the job_flow_overrides and steps parameters in the DAG below. Instead of referencing a local object variable, these parameters now reference the function get_object(key, bucket_name), which loads the JSON from Amazon S3.

import json
import os
from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.hooks.S3_hook import S3Hook
from airflow.models import Variable
from airflow.utils.dates import days_ago
# ************** AIRFLOW VARIABLES **************
bootstrap_bucket = Variable.get('bootstrap_bucket')
emr_ec2_key_pair = Variable.get('emr_ec2_key_pair')
job_flow_role = Variable.get('job_flow_role')
logs_bucket = Variable.get('logs_bucket')
release_label = Variable.get('release_label')
service_role = Variable.get('service_role')
work_bucket = Variable.get('work_bucket')
# ***********************************************
DAG_ID = os.path.basename(__file__).replace('.py', '')
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ["{{ dag_run.conf['airflow_email'] }}"],
    'email_on_failure': ["{{ dag_run.conf['email_on_failure'] }}"],
    'email_on_retry': ["{{ dag_run.conf['email_on_retry'] }}"],
}


def get_object(key, bucket_name):
    """
    Load S3 object as JSON
    """
    hook = S3Hook()
    content_object = hook.get_key(key=key, bucket_name=bucket_name)
    file_content = content_object.get()['Body'].read().decode('utf-8')
    return json.loads(file_content)


with DAG(
        dag_id=DAG_ID,
        description='Run multiple Spark jobs with Amazon EMR',
        default_args=DEFAULT_ARGS,
        dagrun_timeout=timedelta(hours=2),
        start_date=days_ago(1),
        schedule_interval=None,
        tags=['emr', 'spark', 'pyspark']
) as dag:
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=get_object('job_flow_overrides/job_flow_overrides.json', work_bucket)
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=get_object('emr_steps/emr_steps.json', work_bucket)
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default'
    )

    cluster_creator >> step_adder >> step_checker

This time, we need to upload three files to S3, the DAG to the Airflow S3 bucket, and the two JSON files to the EMR Work S3 bucket. Change the bucket names to match your environment, then run the three AWS CLI commands shown below.

aws s3 cp emr_steps/emr_steps.json \
s3://emr-demo-work-123412341234-us-east-1/jobs/
aws s3 cp job_flow_overrides/job_flow_overrides.json \
s3://emr-demo-work-123412341234-us-east-1/jobs/
aws s3 cp dags/multiple_steps.py \
s3://airflow-123412341234-us-east-1/dags/

The third DAG, multiple_steps, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark jobs. The three required input parameters in the ‘Trigger DAG’ interface are identical to the previous bakery_sales DAG. A sample of that JSON blob can be found in the project at airflow_variables/dag_run.conf_bakery.json.

Apache Airflow UI’s DAGs tab

Below we see that the EMR cluster has completed the four Spark jobs (EMR Steps) and has auto-terminated. Note that all four jobs were started at the exact same time. If you recall from the previous post, this is possible because we preset the ‘Concurrency’ level to 5.

Amazon EMR Console’s Steps tab showing four Steps running in parallel

Triggering DAGs Programmatically

Similar to the previous example, below we trigger the multiple_steps DAG programmatically using Airflow’s trigger_dag CLI command. Note the addition of the --conf named argument, which passes the configuration, containing three key/value pairs, to the trigger command as a JSON blob.

export WEB_SERVER_HOSTNAME="<your_airflow_web_server.us-east-1.airflow.amazonaws.com>"
export ENVIRONMENT_NAME=$(aws mwaa list-environments | jq -r '.Environments | .[]')
export DAG_NAME="multiple_steps"
export CONFIG="""'{
\"airflow_email\": \"analytics_team@example.com\",
\"email_on_failure\": true,
\"email_on_retry\": false
}'"""
export CLI_TOKEN=$(aws mwaa create-cli-token --name "${ENVIRONMENT_NAME}" | jq -r .CliToken)

curl --request POST "https://${WEB_SERVER_HOSTNAME}/aws_mwaa/cli" \
    --header "Authorization: Bearer ${CLI_TOKEN}" \
    --header "Content-Type: text/plain" \
    --data-raw "trigger_dag ${DAG_NAME} --conf ${CONFIG}"

Cleaning Up

Once you are done with the MWAA Environment, be sure to delete it promptly to avoid additional costs. Also, delete the MWAA-VPC CloudFormation stack; its resources, such as the two NAT Gateways, will continue to generate costs as well.

aws mwaa delete-environment --name <your_mwaa_environment_name>
aws cloudformation delete-stack --stack-name MWAA-VPC

Conclusion

In this second post in the series, we explored using the newly released Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to run PySpark applications on Amazon Elastic MapReduce (Amazon EMR). In future posts, we will explore the use of Jupyter and Zeppelin notebooks for data science, scientific computing, and machine learning on EMR.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.



Installing Apache Superset on Amazon EMR: Add data exploration and visualization to your analytics cluster

Introduction

AWS provides nearly twenty-five different open-source data analytics applications that can be automatically installed and configured on Amazon Elastic MapReduce (Amazon EMR). However, among those options, EMR does not offer a general-purpose data exploration and visualization tool. With EMR, though, you can automate the installation of additional software as part of the cluster creation process or after the cluster is created. This brief post will explore how to install, configure, and access Apache Superset, the modern data exploration and visualization platform, on Amazon EMR’s Master Node as a post-cluster-creation step. You can use these same techniques to install other software packages on EMR as well, manually or as part of an automated data pipeline.

Amazon EMR

According to AWS, Amazon EMR is a cloud-based big data platform for processing vast amounts of data using open-source tools such as Apache Spark, Hive, HBase, Flink, Hudi, Zeppelin, Jupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Amazon EMR Console’s Cluster Summary tab

AWS currently offers 5.x and 6.x versions of Amazon EMR. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 popular open-source big-data software packages to choose from, which Amazon EMR will install and configure when the cluster is created.

Apache Superset

According to its website, Apache Superset is a modern data exploration and visualization platform. Superset is fast, lightweight, intuitive, and loaded with options that make it easy for users of all skill sets to explore and visualize their data, from simple line charts to highly detailed geospatial charts.

Superset natively supports over twenty-five data sources, including Amazon Athena, Amazon Redshift, Apache Drill, Druid, Hive, Impala, Kylin, Pinot, Spark SQL, Elasticsearch, Google BigQuery, Hana, MySQL, Oracle, Postgres, Presto, Snowflake, Microsoft SQL Server, and Teradata.

As shown in their Gallery, Superset includes dozens of visualization types, including Pivot Table, Line Chart, Markup, Pie Chart, Filter Box, Bubble Chart, Box Plot, Histogram, Heatmap, Sunburst, Calendar Heatmap, and several geospatial types.

Apache Superset Visualization Gallery

Setup

Using this git clone command, download a copy of this post’s open-source GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/emr-superset-demo.git

To demonstrate how to install Apache Superset on EMR, I have prepared an AWS CloudFormation template. Deploying the template, cloudformation/superset-emr-demo.yml, to AWS will result in the AWS CloudFormation stack, superset-emr-demo-dev. The stack creates a minimally-sized, two-node EMR cluster, two Amazon S3 buckets, and several AWS Systems Manager (SSM) Parameter Store parameters.

There is also a JSON-format CloudFormation parameters file, cloudformation/superset-emr-demo-params-dev.json. The parameters file contains values for eight of the ten required parameters in the CloudFormation template, all of which you can adjust. For the remaining two required parameters, you will need to supply the name of an existing EC2 key pair to access the EMR Master node. The key pair will need to be deployed to the same AWS Account into which you are deploying EMR. You will also need to supply a Subnet ID for the EMR cluster to be installed into. The subnet must have access to the Internet to install Superset’s required system and Python packages and to access Superset’s web-based user interface. If you need help creating a VPC and subnet to deploy EMR into, refer to my previous blog post, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce.

The CloudFormation stack is created using a Python script, create_cfn_stack.py. The Python script uses the AWS SDK for Python (boto3).

#!/usr/bin/env python3

# Purpose: Create EMR bootstrap script bucket and deploy the cfn stack
# Author: Gary A. Stafford (December 2020)
# Reference: https://gist.github.com/svrist/73e2d6175104f7ab4d201280acba049c
# Usage Example: python3 ./create_cfn_stack.py \
#   --ec2-key-name emr-demo-123456789012-us-east-1 \
#   --ec2-subnet-id subnet-06aa61f790a932b32 \
#   --environment dev

import argparse
import json
import logging
import os

import boto3
from botocore.exceptions import ClientError

sts_client = boto3.client('sts')
cfn_client = boto3.client('cloudformation')
region = boto3.DEFAULT_SESSION.region_name
s3_client = boto3.client('s3', region_name=region)

logging.basicConfig(format='[%(asctime)s] %(levelname)s - %(message)s', level=logging.INFO)


def main():
    args = parse_args()

    # create bootstrap bucket
    account_id = sts_client.get_caller_identity()['Account']
    bootstrap_bucket = f'superset-emr-demo-bootstrap-{account_id}-{region}'
    create_bucket(bootstrap_bucket)

    # upload bootstrap script
    dir_path = os.path.dirname(os.path.realpath(__file__))
    upload_file(f'{dir_path}/bootstrap_emr/bootstrap_actions.sh', bootstrap_bucket, 'bootstrap_actions.sh')

    # set variables
    stack_name = f'emr-superset-demo-{args.environment}'
    cfn_template_path = f'{dir_path}/cloudformation/superset-emr-demo.yml'
    cfn_params_path = f'{dir_path}/cloudformation/superset-emr-demo-params-{args.environment}.json'
    ec2_key_name = args.ec2_key_name

    # append new parameters
    cfn_params = _parse_parameters(cfn_params_path)
    cfn_params.append({'ParameterKey': 'Ec2KeyName', 'ParameterValue': ec2_key_name})
    cfn_params.append({'ParameterKey': 'Ec2SubnetId', 'ParameterValue': args.ec2_subnet_id})
    cfn_params.append({'ParameterKey': 'BootstrapBucket', 'ParameterValue': bootstrap_bucket})
    logging.info(json.dumps(cfn_params, indent=4))

    # create the cfn stack
    create_stack(stack_name, cfn_template_path, cfn_params)


def create_bucket(bootstrap_bucket):
    """Create an S3 bucket in a specified region

    :param bootstrap_bucket: Bucket to create
    :return: True if bucket created, else False
    """
    try:
        s3_client.create_bucket(Bucket=bootstrap_bucket)
        logging.info(f'New bucket name: {bootstrap_bucket}')
    except ClientError as e:
        logging.error(e)
        return False
    return True


def upload_file(file_name, bootstrap_bucket, object_name):
    """Upload a file to an S3 bucket

    :param file_name: File to upload
    :param bootstrap_bucket: Bucket to upload to
    :param object_name: S3 object name
    :return: True if file was uploaded, else False
    """
    # Upload the file
    try:
        response = s3_client.upload_file(file_name, bootstrap_bucket, object_name)
        logging.info(f'File {file_name} uploaded to bucket {bootstrap_bucket} as object {object_name}')
    except ClientError as e:
        logging.error(e)
        return False
    return True


def create_stack(stack_name, cfn_template, cfn_params):
    """Create EMR Cluster CloudFormation stack"""
    template_data = _parse_template(cfn_template)

    create_stack_params = {
        'StackName': stack_name,
        'TemplateBody': template_data,
        'Parameters': cfn_params,
        'TimeoutInMinutes': 60,
        'Capabilities': [
            'CAPABILITY_NAMED_IAM',
        ],
        'Tags': [
            {
                'Key': 'Project',
                'Value': 'Superset EMR Demo'
            },
        ]
    }

    try:
        response = cfn_client.create_stack(**create_stack_params)
        logging.info(f'Response: {response}')
    except ClientError as e:
        logging.error(e)
        return False
    return True


def _parse_template(template):
    with open(template) as template_file_obj:
        template_data = template_file_obj.read()
    cfn_client.validate_template(TemplateBody=template_data)
    return template_data


def _parse_parameters(parameters):
    with open(parameters) as parameter_file_obj:
        parameter_data = json.load(parameter_file_obj)
    return parameter_data


def parse_args():
    """Parse argument values from command-line"""
    parser = argparse.ArgumentParser(description='Arguments required for script.')
    parser.add_argument('-e', '--environment', required=True, choices=['dev', 'test', 'prod'], help='Environment')
    parser.add_argument('-k', '--ec2-key-name', required=True, help='Ec2KeyName: Name of EC2 key pair')
    parser.add_argument('-s', '--ec2-subnet-id', required=True, help='Ec2SubnetId: ID of the EMR cluster subnet')
    args = parser.parse_args()
    return args


if __name__ == '__main__':
    main()

To execute the Python script and create the CloudFormation stack, which will create the EMR cluster, run the following command. Remember to update the parameters to the name of your EC2 key pair and the Subnet ID for the EMR cluster.

python3 ./create_cfn_stack.py \
--ec2-key-name <your_key_pair_name> \
--ec2-subnet-id <your_subnet_id> \
--environment dev

Here is what the complete CloudFormation workflow looks like.

AWS CloudFormation stack creation

Security Group Ingress Rules

To install Superset on the EMR cluster’s Master Node via SSH, you need to open port 22 on the Security Group associated with the EMR cluster’s Master Node, allowing access from your IP address. You can use the AWS Management Console or AWS CLI to open port 22. Below, we use jq and the AWS CLI’s ec2 API to get the Security Group ID associated with the EMR cluster’s Master Node and then create the ingress rule.

export EMR_MASTER_SG_ID=$(aws ec2 describe-security-groups | \
jq -r ".SecurityGroups[] | \
select(.GroupName==\"ElasticMapReduce-master\").GroupId" | \
head -n 1)
aws ec2 authorize-security-group-ingress \
--group-id ${EMR_MASTER_SG_ID} \
--protocol tcp \
--port 22 \
--cidr $(curl ipinfo.io/ip)/32

Superset Script

Once the CloudFormation stack is created and the port is open, we can install Apache Superset on the EMR Master Node. The bootstrap script, bootstrap_emr/bootstrap_superset.sh, will be used to install Apache Superset onto the EMR cluster’s Master Node as the hadoop user. The script is roughly based on Superset’s Installing from Scratch instructions.

As part of installing Superset, the script will also deploy several common database drivers, including Amazon Athena, Amazon Redshift, Apache Spark SQL, Presto, PostgreSQL, and MySQL. The script will also create a Superset Admin role, and two Superset User roles — Alpha and Gamma.

#!/bin/bash

# Purpose: Installs Apache Superset on EMR
# Author: Gary A. Stafford (December 2020)
# Usage: sh ./bootstrap_superset.sh 8280
# Reference: https://superset.apache.org/docs/installation/installing-superset-from-scratch

# port for superset (default: 8280)
export SUPERSET_PORT="${1:-8280}"

# install required packages
sudo yum -y install gcc gcc-c++ libffi-devel python-devel python-pip python-wheel \
    openssl-devel cyrus-sasl-devel openldap-devel python3-devel.x86_64

# optionally, update Master Node packages
sudo yum -y update

# install required Python packages
python3 -m pip install --user --upgrade setuptools virtualenv
python3 -m venv venv
. venv/bin/activate
python3 -m pip install --upgrade apache-superset \
    PyAthenaJDBC PyAthena sqlalchemy-redshift pyhive mysqlclient psycopg2-binary

command -v superset
superset db upgrade

export FLASK_APP=superset
echo "export FLASK_APP=superset" >>~/.bashrc

touch superset_config.py
echo "ENABLE_TIME_ROTATE = True" >>superset_config.py
echo "export SUPERSET_CONFIG_PATH=superset_config.py" >>~/.bashrc

export ADMIN_USERNAME="SupersetAdmin"
export ADMIN_PASSWORD="Admin1234"

# create superset admin
superset fab create-admin \
    --username "${ADMIN_USERNAME}" \
    --firstname Superset \
    --lastname Admin \
    --email superset_admin@example.com \
    --password "${ADMIN_PASSWORD}"

superset init

# create two sample superset users
superset fab create-user \
    --role Alpha \
    --username SupersetUserAlpha \
    --firstname Superset \
    --lastname UserAlpha \
    --email superset_user_alpha@example.com \
    --password UserAlpha1234

superset fab create-user \
    --role Gamma \
    --username SupersetUserGamma \
    --firstname Superset \
    --lastname UserGamma \
    --email superset_user_gamma@example.com \
    --password UserGamma1234

# get instance id
INSTANCE_ID="$(curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .instanceId)"
export INSTANCE_ID
echo "INSTANCE_ID: ${INSTANCE_ID}"

# use instance id to get public dns of master node
PUBLIC_MASTER_DNS="$(aws ec2 describe-instances --instance-ids ${INSTANCE_ID} |
    jq -r '.Reservations[0].Instances[0].PublicDnsName')"
export PUBLIC_MASTER_DNS
echo "PUBLIC_MASTER_DNS: ${PUBLIC_MASTER_DNS}"

# start superset in background
nohup superset run \
    --host "${PUBLIC_MASTER_DNS}" \
    --port "${SUPERSET_PORT}" \
    --with-threads --reload --debugger \
    >superset_output.log 2>&1 </dev/null &

# output connection info
printf %s """
**********************************************************************
Superset URL: http://${PUBLIC_MASTER_DNS}:${SUPERSET_PORT}
Admin Username: ${ADMIN_USERNAME}
Admin Password: ${ADMIN_PASSWORD}
**********************************************************************
"""

To install Superset using the bootstrap script, we will use another Python script, install_superset.py. The script uses paramiko, a Python implementation of SSHv2. The script also uses scp, a module that uses a paramiko transport to send and receive files via the scp1 protocol.
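
The actual script is included in the GitHub project; below is only a minimal sketch of the paramiko/scp approach, assuming the EMR Master Node’s public DNS is already known (the hostname shown is a placeholder).

# a minimal sketch of the paramiko/scp approach, not the project's actual install_superset.py
import argparse

import paramiko
from scp import SCPClient


def main():
    parser = argparse.ArgumentParser(description='Install Apache Superset on an EMR Master Node.')
    parser.add_argument('--ec2-key-path', required=True, help='Full path to the EC2 key pair (.pem) file')
    parser.add_argument('--superset-port', default='8280', help='Port Superset will listen on')
    args = parser.parse_args()

    # assumption: the Master Node's public DNS is hard-coded here for illustration only;
    # the project's script resolves it for you
    master_public_dns = 'ec2-111-22-333-44.compute-1.amazonaws.com'

    # open an SSH connection to the Master Node as the hadoop user
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname=master_public_dns, username='hadoop', key_filename=args.ec2_key_path)

    # copy the bootstrap script to the Master Node over SCP
    with SCPClient(ssh.get_transport()) as scp:
        scp.put('bootstrap_emr/bootstrap_superset.sh', 'bootstrap_superset.sh')

    # execute the bootstrap script and print its output, including the Superset URL
    stdin, stdout, stderr = ssh.exec_command(f'sh ./bootstrap_superset.sh {args.superset_port}')
    print(stdout.read().decode('utf-8'))

    ssh.close()


if __name__ == '__main__':
    main()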


The script requires a single input parameter, ec2-key-path, which is the full path to your EC2 key pair (e.g., ~/.ssh/my-key-pair.pem). Optionally, you can change the default Superset port of 8280, using the superset-port parameter.

python3 ./install_superset.py \
--ec2-key-path </path/to/my-key-pair.pem> \
--superset-port 8280

The script uses SSH and SCP to deploy and execute the bootstrap script, bootstrap_superset.sh. The output from the script includes the URL of Apache Superset running on the EMR cluster. The output also contains the username and password of the Superset Admin.

********************************************************************
Superset URL: http://ec2-111-22-333-44.compute-1.amazonaws.com:8280
Admin Username: SupersetAdmin
Admin Password: Admin1234
********************************************************************

SSH Tunnel

According to AWS, EMR applications publish user interfaces as websites hosted on the master node. For security reasons, these websites are only available on the master node’s local web server. To reach any of the web interfaces, you must establish an SSH tunnel with the master node using either dynamic or local port forwarding. If you are using dynamic port forwarding, you must also configure a proxy server to view the web interfaces.

Instructions for creating an SSH tunnel to access UI’s on EMR
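
A typical dynamic port forwarding command looks similar to the following; the key pair path and Master Node hostname are placeholders matching the earlier examples.

ssh -i </path/to/my-key-pair.pem> -N -D 8157 \
    hadoop@ec2-111-22-333-44.compute-1.amazonaws.com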

Running the command in your terminal will start the SSH tunnel on port 8157. Once the tunnel is enabled, you can access Apache Superset in a web browser using the URL shown in the script output above. Use the Admin credentials or either of the two User credentials to sign in to Superset.

Apache Superset Sign In screen

Once signed in, you will have the ability to connect to your data sources and explore and visualize data. Below, we see an example of a SQL query executed against an Amazon RDS for PostgreSQL database, running in a separate VPC from EMR.
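
When adding a database connection in Superset, you supply a SQLAlchemy URI. For an Amazon RDS for PostgreSQL instance like the one queried below, the URI follows the standard postgresql+psycopg2 format; the host, credentials, and database name are placeholders.

postgresql+psycopg2://<username>:<password>@<your_rds_endpoint>:5432/<database_name>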

Sample query of an Amazon RDS PostgreSQL database using Superset’s SQL Editor

Conclusion

In this post, we learned how to install Apache Superset onto the Master Node of an Amazon EMR Cluster. If you want to install an application on all the nodes of an EMR cluster, you can add the commands to the bootstrap script, which runs when CloudFormation creates the cluster.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.



Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce

Introduction

According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache Spark, Hive, HBase, Flink, Hudi, Zeppelin, Jupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Amazon EMR Console’s Cluster Summary tab

Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.

With EMR, individuals and teams can also use notebooks, including EMR Notebooks, based on JupyterLab, the web-based interactive development environment for Jupyter notebooks for ad-hoc data analytics. Apache Zeppelin is also available to collaborate and interactively explore, process, and visualize data. With EMR notebooks and the EMR API, users can programmatically execute a notebook without the need to interact with the EMR console, referred to as headless execution.
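
As a brief preview of headless execution, covered in part three of this series, a minimal boto3 sketch might look like the following; the notebook (editor) ID, notebook file, cluster ID, and service role are placeholders.

import boto3

emr_client = boto3.client('emr')

# start a headless execution of an existing EMR Notebook (all IDs below are placeholders)
response = emr_client.start_notebook_execution(
    EditorId='e-XXXXXXXXXXXXXXXXXXXXXXXXX',
    RelativePath='demo.ipynb',
    ExecutionEngine={'Id': 'j-XXXXXXXXXXXXX'},
    ServiceRole='EMR_Notebooks_DefaultRole'
)

print(response['NotebookExecutionId'])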

AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. One major difference between EMR versions relevant to this post is EMR 6.x’s support for the latest Hadoop and Spark 3.x frameworks. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.

PySpark on EMR

In the following series of posts, we will focus on the options available to interact with Amazon EMR using the Python API for Apache Spark, known as PySpark. We will divide the methods for accessing PySpark on EMR into two categories: PySpark applications and notebooks. We will explore both interactive and automated patterns for running PySpark applications (Python scripts) and PySpark-based notebooks. In this first post, I will cover the first four PySpark Application Methods listed below. In part two, I will cover Amazon Managed Workflows for Apache Airflow (Amazon MWAA), and in part three, the use of notebooks.

PySpark Application Methods

  1. Add Job Flow Steps: Remote execution of EMR Steps on an existing EMR cluster using the add_job_flow_steps method (a brief boto3 sketch follows this list);
  2. EMR Master Node: Remote execution over SSH of PySpark applications using spark-submit on an existing EMR cluster’s Master node;
  3. Run Job Flow: Remote execution of EMR Steps on a newly created long-lived or auto-terminating EMR cluster using the run_job_flow method;
  4. AWS Step Functions: Remote execution of EMR Steps using AWS Step Functions on an existing or newly created long-lived or auto-terminating EMR cluster;
  5. Apache Airflow: Remote execution of EMR Steps using the recently released Amazon MWAA on an existing or newly created long-lived or auto-terminating EMR cluster (see part two of this series);
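
As a preview of method one, a minimal boto3 sketch of submitting an EMR Step to an existing cluster might look like the following; the cluster ID is a placeholder, and the step simply runs the SparkPi example.

import boto3

emr_client = boto3.client('emr')

# submit a single Spark step to an existing EMR cluster (cluster ID is a placeholder)
response = emr_client.add_job_flow_steps(
    JobFlowId='j-XXXXXXXXXXXXX',
    Steps=[{
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
        },
    }]
)

print(response['StepIds'])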

Notebook Methods

  1. EMR Notebooks for Ad-hoc Analytics: Interactive, ad-hoc analytics and machine learning using Jupyter Notebooks on an existing EMR cluster;
  2. Headless Execution of EMR Notebooks: Headless execution of notebooks from an existing EMR cluster or newly created auto-terminating cluster;
  3. Apache Zeppelin for Ad-hoc Analytics: Interactive, ad-hoc analytics and machine learning using Zeppelin notebooks on an existing EMR cluster;

Note that wherever the AWS SDK for Python (boto3) is used in this post, we can substitute the AWS CLI or AWS Tools for PowerShell. Typically, these commands and Python scripts would be run as part of a DevOps or DataOps deployment workflow, using CI/CD platforms like AWS CodePipeline, Jenkins, Harness, CircleCI, Travis CI, or Spinnaker.

Preliminary Tasks

To prepare the AWS EMR environment for this post, we need to perform a few preliminary tasks.

  1. Download a copy of this post’s GitHub repository;
  2. Download three Kaggle datasets and organize locally;
  3. Create an Amazon EC2 key pair;
  4. Upload the EMR bootstrap script and create the CloudFormation Stack;
  5. Allow your IP address access to the EMR Master node on port 22;
  6. Upload CSV data files and PySpark applications to S3;
  7. Crawl the raw data and create a Data Catalog using AWS Glue;

Step 1: GitHub Repository

Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/emr-demo.git

Step 2: Kaggle Datasets

Kaggle is a well-known data science resource with 50,000 public datasets and 400,000 public notebooks. We will be using three Kaggle datasets in this post. You will need to join Kaggle to access these free datasets. Download the following three Kaggle datasets as CSV files. Since we are working with (moderately) big data, the total size of the datasets will be approximately 1 GB.

  1. Movie Ratings: https://www.kaggle.com/rounakbanik/the-movies-dataset
  2. Bakery: https://www.kaggle.com/sulmansarwar/transactions-from-a-bakery
  3. Stocks: https://www.kaggle.com/timoboz/stock-data-dow-jones

Organize the (38) downloaded CSV files into the raw_data directory of the locally cloned GitHub repository, exactly as shown below. We will upload these files to Amazon S3 in a later step.

 > tree raw_data --si -v -A

 raw_data
├── [ 128]  bakery
│   ├── [711k]  BreadBasket_DMS.csv
├── [ 320]  movie_ratings
│   ├── [190M]  credits.csv
│   ├── [6.2M]  keywords.csv
│   ├── [989k]  links.csv
│   ├── [183k]  links_small.csv
│   ├── [ 34M]  movies_metadata.csv
│   ├── [710M]  ratings.csv
│   └── [2.4M]  ratings_small.csv
└── [1.1k]  stocks
    ├── [151k]  AAPL.csv
    ├── [146k]  AXP.csv
    ├── [150k]  BA.csv
    ├── [147k]  CAT.csv
    ├── [146k]  CSCO.csv
    ├── [149k]  CVX.csv
    ├── [147k]  DIS.csv
    ├── [ 42k]  DWDP.csv
    ├── [150k]  GS.csv
    └── [...]  abridged...

In this post, we will be using three different datasets. However, if you want to limit the potential costs associated with big data analytics on AWS, you can choose to limit job submissions to only one or two of the datasets. For example, the bakery and stocks datasets are fairly small yet effectively demonstrate most EMR features. In contrast, the movie rating dataset has nearly 27 million rows of ratings data, which starts to demonstrate the power of EMR and PySpark for big data.

Step 3: Amazon EC2 key pair

According to AWS, a key pair, consisting of a private key and a public key, is a set of security credentials that you use to prove your identity when connecting to an [EC2] instance. Amazon EC2 stores the public key, and you store the private key. To SSH into the EMR cluster, you will need an Amazon key pair. If you do not have an existing Amazon EC2 key pair, create one now. The easiest way to create a key pair is from the AWS Management Console.

Amazon EC2 Key pair Console

Your private key is automatically downloaded when you create a key pair in the console. Store your private key somewhere safe. If you use an SSH client on a macOS or Linux computer to connect to EMR, use the following chmod command to set the correct permissions of your private key file so that only you can read it.

chmod 0400 /path/to/my-key-pair.pem
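
If you prefer to script this step, the key pair can also be created with boto3. Below is a minimal sketch, not part of the post’s GitHub repository; the key pair name, emr-demo, and the local key path are placeholders you would substitute.

import os

import boto3

ec2_client = boto3.client('ec2')

# create the key pair; the private key material is only returned once
response = ec2_client.create_key_pair(KeyName='emr-demo')

# save the private key locally and restrict its permissions
key_path = os.path.expanduser('~/.ssh/emr-demo.pem')
with open(key_path, 'w') as key_file:
    key_file.write(response['KeyMaterial'])
os.chmod(key_path, 0o400)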

Step 4: Bootstrap Script and CloudFormation Stack

The bulk of the resources that are used as part of this demonstration are created using the CloudFormation stack, emr-demo-dev. The CloudFormation template that creates the stack, cloudformation/emr-demo.yml, is included in the repository. Please review all resources and understand the cost and security implications before continuing.

There is also a JSON-format CloudFormation parameters file, cloudformation/emr-demo-params-dev.json, containing values for all but two of the parameters in the CloudFormation template. The two parameters not in the parameter file are the name of the EC2 key pair you just created and the bootstrap bucket’s name. Both will be passed along with the CloudFormation template using the Python script, create_cfn_stack.py. For each type of environment, such as Development, Test, and Production, you could have a separate CloudFormation parameters file, with different configurations.

AWS CloudFormation stack creation

The template will create approximately (39) AWS resources, including a new AWS VPC, a public subnet, an internet gateway, route tables, a 3-node EMR v6.2.0 cluster, a series of Amazon S3 buckets, AWS Glue data catalog, AWS Glue crawlers, several Systems Manager Parameter Store parameters, and so forth.

The CloudFormation template includes the location of the EMR bootstrap script located on Amazon S3. Before creating the CloudFormation stack, the Python script creates an S3 bootstrap bucket and copies the bootstrap script, bootstrap_actions.sh, from the local project repository to the S3 bucket. The script will be used to install additional packages on EMR cluster nodes, which are required by our PySpark applications. The script also sets the default AWS Region for boto3.

#!/bin/bash
# Purpose: EMR bootstrap script
# Author: Gary A. Stafford (December 2020)
# update and install some useful yum packages
sudo yum install -y jq
# set region for boto3
aws configure set region \
"$(curl –silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .region)"
# install some useful python packages
sudo python3 -m pip install boto3 ec2-metadata awswrangler

From the GitHub repository’s local copy, run the following command, which will execute a Python script to create the bootstrap bucket, copy the bootstrap script, and provision the CloudFormation stack. You will need to pass the name of your EC2 key pair to the script as a command-line argument.

python3 ./scripts/create_cfn_stack.py \
    --environment dev \
    --ec2-key-name <my-key-pair-name>
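
Under the hood, the script’s core logic is roughly equivalent to the boto3 calls sketched below. This is a simplified sketch, not the repository’s actual create_cfn_stack.py; the bucket name, local file paths, and CloudFormation parameter keys are illustrative assumptions.

import json

import boto3

s3_client = boto3.client('s3')
cfn_client = boto3.client('cloudformation')

bootstrap_bucket = 'emr-demo-bootstrap-123456789012-us-east-1'  # assumed bucket name

# create the bootstrap bucket (us-east-1 assumed) and copy the bootstrap script to it
s3_client.create_bucket(Bucket=bootstrap_bucket)
s3_client.upload_file('bootstrap/bootstrap_actions.sh', bootstrap_bucket, 'bootstrap_actions.sh')

# load the JSON-format parameters file and append the two missing parameters
with open('cloudformation/emr-demo-params-dev.json') as f:
    stack_parameters = json.load(f)
stack_parameters.append({'ParameterKey': 'Ec2KeyName', 'ParameterValue': 'my-key-pair-name'})
stack_parameters.append({'ParameterKey': 'BootstrapBucket', 'ParameterValue': bootstrap_bucket})

# provision the CloudFormation stack from the local template
with open('cloudformation/emr-demo.yml') as f:
    template_body = f.read()

cfn_client.create_stack(
    StackName='emr-demo-dev',
    TemplateBody=template_body,
    Parameters=stack_parameters,
    Capabilities=['CAPABILITY_NAMED_IAM']
)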

The CloudFormation template should create a CloudFormation stack, emr-demo-dev, as shown below.

AWS CloudFormation Console’s Stacks tab

Step 5: SSH Access to EMR

For this demonstration, we will need access to the new EMR cluster’s Master EC2 node, using SSH and your key pair, on port 22. The easiest way to add a new inbound rule to the correct AWS Security Group is to use the AWS Management Console. First, find your EC2 Security Group named ElasticMapReduce-master.

Amazon EC2 Security Group Console

Then, add a new Inbound rule for SSH (port 22) from your IP address, as shown below.

Amazon EC2 Security Group Inbound rules

Alternately, you could use the AWS CLI or AWS SDK to create a new security group ingress rule.

export EMR_MASTER_SG_ID=$(aws ec2 describe-security-groups | \
jq -r '.SecurityGroups[] | select(.GroupName=="ElasticMapReduce-master").GroupId')
aws ec2 authorize-security-group-ingress \
--group-id ${EMR_MASTER_SG_ID} \
--protocol tcp \
--port 22 \
--cidr $(curl ipinfo.io/ip)/32
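
The boto3 equivalent takes only a few lines of Python. The sketch below looks up the security group by name and authorizes your current public IP address; it assumes the same ipinfo.io service used in the CLI example.

from urllib.request import urlopen

import boto3

ec2_client = boto3.client('ec2')

# look up the EMR Master node's security group by name
security_groups = ec2_client.describe_security_groups(
    Filters=[{'Name': 'group-name', 'Values': ['ElasticMapReduce-master']}])
master_sg_id = security_groups['SecurityGroups'][0]['GroupId']

# allow inbound SSH (port 22) from your current public IP address only
my_ip = urlopen('https://ipinfo.io/ip').read().decode().strip()
ec2_client.authorize_security_group_ingress(
    GroupId=master_sg_id,
    IpPermissions=[{
        'IpProtocol': 'tcp',
        'FromPort': 22,
        'ToPort': 22,
        'IpRanges': [{'CidrIp': f'{my_ip}/32'}]
    }]
)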

Step 6: Raw Data and PySpark Apps to S3

As part of the emr-demo-dev CloudFormation stack, we now have several new Amazon S3 buckets within our AWS Account. The naming conventions and intended usage of these buckets follow common organizational patterns for data lakes. The data buckets use the common naming convention of raw, processed, and analyzed data in reference to the data stored within them. We also use a widely used, corresponding naming convention of ‘bronze’, ‘silver’, and ‘gold’ when referring to these data buckets as parameters.

> aws s3api list-buckets | \
    jq -r '.Buckets[] | select(.Name | startswith("emr-demo-")).Name'

emr-demo-raw-123456789012-us-east-1
emr-demo-processed-123456789012-us-east-1
emr-demo-analyzed-123456789012-us-east-1
emr-demo-work-123456789012-us-east-1
emr-demo-logs-123456789012-us-east-1
emr-demo-glue-db-123456789012-us-east-1
emr-demo-bootstrap-123456789012-us-east-1

There is a raw data bucket (aka bronze) that will contain the original CSV files. There is a processed data bucket (aka silver) that will contain data that might have had any number of actions applied: data cleansing, obfuscation, data transformation, file format changes, file compression, and data partitioning. Finally, there is an analyzed data bucket (aka gold) that has the results of the data analysis. We also have a work bucket that holds the PySpark applications, a logs bucket that holds EMR logs, and a glue-db bucket to hold the Glue Data Catalog metadata.

Whenever we submit PySpark jobs to EMR, the PySpark application files and data will always be accessed from Amazon S3. From the GitHub repository’s local copy, run the following command, which will execute a Python script to upload the approximately (38) Kaggle dataset CSV files to the raw S3 data bucket.

python3 ./scripts/upload_csv_files_to_s3.py

Next, run the following command, which will execute a Python script to upload a series of PySpark application files to the work S3 data bucket.

python3 ./scripts/upload_apps_to_s3.py
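
Both upload scripts follow the same basic pattern: resolve the destination bucket, walk a local directory, and upload each file with boto3. The sketch below illustrates that pattern for the raw CSV files; the repository’s actual scripts may differ in the details.

import os

import boto3

s3_client = boto3.client('s3')
ssm_client = boto3.client('ssm')

# resolve the raw (bronze) data bucket name from the SSM Parameter Store
raw_bucket = ssm_client.get_parameter(Name='/emr_demo/bronze_bucket')['Parameter']['Value']

# walk the local raw_data directory and upload each CSV file,
# preserving the dataset sub-directory as the S3 key prefix
for root, _, files in os.walk('raw_data'):
    for file_name in files:
        if file_name.endswith('.csv'):
            local_path = os.path.join(root, file_name)
            s3_key = os.path.relpath(local_path, 'raw_data').replace(os.sep, '/')
            s3_client.upload_file(local_path, raw_bucket, s3_key)
            print(f'Uploaded s3://{raw_bucket}/{s3_key}')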

Step 7: Crawl Raw Data with Glue

The last preliminary step to prepare the EMR demonstration environment is to catalog the raw CSV data into an AWS Glue data catalog database, using one of the two Glue Crawlers we created. The three Kaggle datasets’ data will reside in Amazon S3, while their schema and metadata will reside within tables in the Glue data catalog database, emr_demo. When we eventually query the data from our PySpark applications, we will be querying the Glue data catalog’s database tables, which reference the underlying data in S3.

From the GitHub repository’s local copy, run the following command, which will execute a Python script to run the Glue Crawler and catalog the raw data’s schema and metadata information into the Glue data catalog database, emr_demo.

python3 ./scripts/crawl_raw_data.py --crawler-name emr-demo-raw
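
The script is a thin wrapper around the AWS Glue API. A minimal sketch of the equivalent boto3 logic, which starts the named crawler and polls until it returns to the READY state, is shown below; whether the repository’s script polls in exactly this way is an assumption.

import time

import boto3

glue_client = boto3.client('glue')


def crawl(crawler_name):
    """Start a Glue Crawler and wait for it to finish"""
    glue_client.start_crawler(Name=crawler_name)
    while True:
        state = glue_client.get_crawler(Name=crawler_name)['Crawler']['State']
        if state == 'READY':  # the crawler has finished and is idle again
            break
        print(f'Crawler {crawler_name} is {state}...')
        time.sleep(15)


crawl('emr-demo-raw')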

Once the crawler is finished, from the AWS Console, we should see a series of nine tables in the Glue data catalog database, emr_demo, all prefixed with raw_. The tables hold metadata and schema information for the three CSV-format Kaggle datasets loaded into S3.

AWS Glue Data Catalog Database Tables Console

Alternately, we can use the glue get-tables AWS CLI command to review the tables.

> aws glue get-tables --database emr_demo | \
    jq -r '.TableList[] | select(.Name | startswith("raw_")).Name'

raw_bakery
raw_credits_csv
raw_keywords_csv
raw_links_csv
raw_links_small_csv
raw_movies_metadata_csv
raw_ratings_csv
raw_ratings_small_csv
raw_stocks

PySpark Applications

Let’s explore four methods to run PySpark applications on EMR.

High-level architecture of this post’s data analytics platform

1. Add Job Flow Steps to an Existing EMR Cluster

We will start by looking at running PySpark applications using EMR Steps. According to AWS, we can use Amazon EMR steps to submit work to the Spark framework installed on an EMR cluster. The EMR step for PySpark uses a spark-submit command. According to Spark’s documentation, the spark-submit script, located in Spark’s bin directory, is used to launch applications on a [EMR] cluster. A typical spark-submit command we will be using resembles the following example. This command runs a PySpark application in S3, bakery_sales_ssm.py.

spark-submit --deploy-mode cluster --master yarn \
--conf spark.yarn.submit.waitAppCompletion=true \
s3a://emr-demo-work-123456789012-us-east-1/analyze/bakery_sales_ssm.py

We will target the existing EMR cluster created by CloudFormation earlier to execute our PySpark applications using EMR Steps. We have two sets of PySpark applications. The first set of three PySpark applications will transform the raw CSV-format datasets into Apache Parquet, a more efficient file format for big data analytics. Alternately, for your workflows, you might prefer AWS Glue ETL Jobs, as opposed to PySpark on EMR, to perform nearly identical data processing tasks. The second set of four PySpark applications perform data analysis tasks on the data.

There are two versions of each PySpark application. Files with suffix _ssm use the AWS Systems Manager (SSM) Parameter Store service to obtain dynamic parameter values at runtime on EMR. Corresponding non-SSM applications require those same parameter values to be passed on the command line when they are submitted to Spark. Therefore, these PySpark applications are not tightly coupled to boto3 or the SSM Parameter Store. We will use _ssm versions of the scripts in this post’s demonstration.
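
The difference between the two versions comes down to how values such as bucket names are obtained, as the simplified sketch below shows side by side; the parameter path matches the CloudFormation-created parameters, while the argparse flag name is purely illustrative.

import argparse

import boto3

# _ssm version: resolve the bucket name from the SSM Parameter Store at runtime
ssm_client = boto3.client('ssm')
work_bucket = ssm_client.get_parameter(Name='/emr_demo/work_bucket')['Parameter']['Value']

# non-SSM version: expect the same value to be passed on the command line instead
parser = argparse.ArgumentParser()
parser.add_argument('--work-bucket', required=True, help='S3 work bucket name')
args = parser.parse_args()
work_bucket = args.work_bucket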

 > tree pyspark_apps --si -v -A

 pyspark_apps
├── [ 320]  analyze
│   ├── [1.4k]  bakery_sales.py
│   ├── [1.5k]  bakery_sales_ssm.py
│   ├── [2.6k]  movie_choices.py
│   ├── [2.7k]  movie_choices_ssm.py
│   ├── [2.0k]  movies_avg_ratings.py
│   ├── [2.3k]  movies_avg_ratings_ssm.py
│   ├── [2.2k]  stock_volatility.py
│   └── [2.3k]  stock_volatility_ssm.py
└── [ 256]  process
    ├── [1.1k]  bakery_csv_to_parquet.py
    ├── [1.3k]  bakery_csv_to_parquet_ssm.py
    ├── [1.3k]  movies_csv_to_parquet.py
    ├── [1.5k]  movies_csv_to_parquet_ssm.py
    ├── [1.9k]  stocks_csv_to_parquet.py
    └── [2.0k]  stocks_csv_to_parquet_ssm.py 

We will start by executing the three PySpark processing applications. They will convert the CSV data to Parquet. Below, we see an example of one of the PySpark applications we will run, bakery_csv_to_parquet_ssm.py. The PySpark application will convert the Bakery Sales dataset’s CSV file to Parquet and write it to S3.

#!/usr/bin/env python3
# Process raw CSV data and output Parquet
# Author: Gary A. Stafford (November 2020)

import os

import boto3
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client('ssm')


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("bakery-csv-to-parquet") \
        .getOrCreate()

    convert_to_parquet(spark, "bakery", params)


def convert_to_parquet(spark, file, params):
    df_bakery = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("delimiter", ",") \
        .option("inferSchema", "true") \
        .load(f"s3a://{params['bronze_bucket']}/bakery/{file}.csv")

    write_parquet(df_bakery, params)


def write_parquet(df_bakery, params):
    df_bakery.write \
        .format("parquet") \
        .save(f"s3a://{params['silver_bucket']}/bakery/", mode="overwrite")


def get_parameters():
    params = {
        'bronze_bucket': ssm_client.get_parameter(Name='/emr_demo/bronze_bucket')['Parameter']['Value'],
        'silver_bucket': ssm_client.get_parameter(Name='/emr_demo/silver_bucket')['Parameter']['Value']
    }

    return params


if __name__ == "__main__":
    main()

The three PySpark data processing applications’ spark-submit commands are defined in a separate JSON-format file, job_flow_steps_process.json, a snippet of which is shown below. The same goes for the four analytics applications.

[
  {
    "Name": "Bakery CSV to Parquet",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
      "Jar": "command-runner.jar",
      "Args": [
        "spark-submit",
        "--deploy-mode",
        "cluster",
        "--master",
        "yarn",
        "--conf",
        "spark.yarn.submit.waitAppCompletion=true",
        "s3a://{{ work_bucket }}/process/bakery_csv_to_parquet_ssm.py"
      ]
    }
  },
  {
    "Name": "Stocks CSV to Parquet",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
      "Jar": "command-runner.jar",
      "Args": [
        "spark-submit",
        "--deploy-mode",
        "cluster",
        "--master",
        "yarn",
        "--conf",
        "spark.yarn.submit.waitAppCompletion=true",
        "s3a://{{ work_bucket }}/process/stocks_csv_to_parquet_ssm.py"
      ]
    }
  }
]

Using this pattern of decoupling the Spark job commands and arguments from the execution code, we can define and submit any number of Steps without changing the Python script, add_job_flow_steps.py, shown below. Note how the loaded Steps are injected into the add_job_flow_steps method’s parameters.

#!/usr/bin/env python3
# Purpose: Submit a variable number of Steps defined in a separate JSON file
# Author: Gary A. Stafford (November 2020)

import argparse
import json
import logging
import os

import boto3
from botocore.exceptions import ClientError

logging.basicConfig(format='[%(asctime)s] %(levelname)s – %(message)s', level=logging.INFO)

ssm_client = boto3.client('ssm')
emr_client = boto3.client('emr')


def main():
    args = parse_args()
    params = get_parameters()
    steps = get_steps(params, args.job_type)
    add_job_flow_steps(params['cluster_id'], steps)


def add_job_flow_steps(cluster_id, steps):
    """Add Steps to an existing EMR cluster"""
    try:
        response = emr_client.add_job_flow_steps(
            JobFlowId=cluster_id,
            Steps=steps
        )
        print(f'Response: {response}')
    except ClientError as e:
        logging.error(e)
        return False
    return True


def get_steps(params, job_type):
    """
    Load EMR Steps from a separate JSON-format file and substitutes tags for SSM parameter values
    """
    dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
    file = open(f'{dir_path}/job_flow_steps/job_flow_steps_{job_type}.json', 'r')
    steps = json.load(file)
    new_steps = []

    for step in steps:
        step['HadoopJarStep']['Args'] = list(
            map(lambda st: str.replace(st, '{{ work_bucket }}', params['work_bucket']),
                step['HadoopJarStep']['Args']))
        new_steps.append(step)

    return new_steps


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
    params = {
        'work_bucket': ssm_client.get_parameter(Name='/emr_demo/work_bucket')['Parameter']['Value'],
        'cluster_id': ssm_client.get_parameter(Name='/emr_demo/cluster_id')['Parameter']['Value']
    }

    return params


def parse_args():
    """Parse argument values from command-line"""
    parser = argparse.ArgumentParser(description='Arguments required for script.')
    parser.add_argument('-t', '--job-type', required=True, choices=['process', 'analyze'],
                        help='process or analysis')
    args = parser.parse_args()
    return args


if __name__ == '__main__':
    main()

The Python script used for this task takes advantage of AWS Systems Manager Parameter Store parameters. The parameters were placed in the Parameter Store, within the /emr_demo path, by CloudFormation. We will reference these parameters in several scripts throughout the post.

> aws ssm get-parameters-by-path --path '/emr_demo' | \
      jq -r ".Parameters[] | {Name: .Name, Value: .Value}"

{
"Name": "/emr_demo/bootstrap_bucket",
"Value": "emr-demo-bootstrap-123456789012-us-east-1"
}
{
"Name": "/emr_demo/ec2_key_name",
"Value": "emr-demo-123456789012-us-east-1"
}
{
"Name": "/emr_demo/ec2_subnet_id",
"Value": "subnet-06aa61f790a932b32"
}
{
"Name": "/emr_demo/emr_ec2_role",
"Value": "EMR_EC2_DemoRole"
}
{
"Name": "/emr_demo/emr_role",
"Value": "EMR_DemoRole"
}
{
"Name": "/emr_demo/gold_bucket",
"Value": "emr-demo-analyzed-123456789012-us-east-1"
}
{
"Name": "/emr_demo/silver_bucket",
"Value": "emr-demo-processed-123456789012-us-east-1"
}
{
"Name": "/emr_demo/sm_log_group_arn",
"Value": "arn:aws:logs:us-east-1:123456789012:log-group:EmrDemoStateMachineLogGroup:*"
}
{
"Name": "/emr_demo/sm_role_arn",
"Value": "arn:aws:iam::123456789012:role/State_ExecutionRole"
}
{
"Name": "/emr_demo/work_bucket",
"Value": "emr-demo-work-123456789012-us-east-1"
}
{
"Name": "/emr_demo/bronze_bucket",
"Value": "emr-demo-raw-123456789012-us-east-1"
}
{
"Name": "/emr_demo/cluster_id",
"Value": "j-3J44BFJXNVSCT"
}
{
"Name": "/emr_demo/glue_db_bucket",
"Value": "emr-demo-logs-123456789012-us-east-1"
}
{
"Name": "/emr_demo/logs_bucket",
"Value": "emr-demo-logs-123456789012-us-east-1"
}
{
"Name": "/emr_demo/vpc_id",
"Value": "vpc-01d4151396c119d3a"
}
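
The same parameters can also be fetched in bulk from Python. The post’s individual scripts call get_parameter per value; the short sketch below uses boto3’s get_parameters_by_path paginator instead.

import boto3

ssm_client = boto3.client('ssm')

# page through all parameters under the /emr_demo path
paginator = ssm_client.get_paginator('get_parameters_by_path')
params = {}
for page in paginator.paginate(Path='/emr_demo'):
    for parameter in page['Parameters']:
        params[parameter['Name']] = parameter['Value']

print(params['/emr_demo/work_bucket'])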

From the GitHub repository’s local copy, run the following command, which will execute a Python script to load the three spark-submit commands from JSON-format file, job_flow_steps_process.json, and run the PySpark processing applications on the existing EMR cluster.

python3 ./scripts/add_job_flow_steps.py --job-type process

While the three Steps are running concurrently, the view from the Amazon EMR Console’s Cluster Steps tab should look similar to the example below.

Amazon EMR Console’s Cluster Steps tab

Once the three Steps have been completed, we should note three sub-directories in the processed data bucket containing Parquet-format files.

Processed CSV data converted to Parquet and organized by dataset

Of special note is the Stocks dataset, which has been converted to Parquet and partitioned by stock symbol. According to AWS, by partitioning the data, we can restrict the amount of data scanned by each query by specifying filters based on the partition, thus improving performance and reducing cost.
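
Below is a minimal PySpark sketch of a partitioned write of this kind. The column name, symbol, and the exact read options are assumptions for illustration and may not match the actual stocks application.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("stocks-partitioned-write").getOrCreate()

# read the raw stock CSV files from the raw (bronze) data bucket
df_stocks = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("s3a://emr-demo-raw-123456789012-us-east-1/stocks/")

# write Parquet partitioned by stock symbol, producing one sub-directory per symbol
df_stocks.write \
    .partitionBy("symbol") \
    .format("parquet") \
    .save("s3a://emr-demo-processed-123456789012-us-east-1/stocks/", mode="overwrite")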

Processed stock data converted to Parquet and partitioned by stock symbol

Lastly, the movie ratings dataset has been divided into sub-directories, based on the schema of each table. Each sub-directory contains Parquet files specific to that unique schema.

Processed movie ratings data converted to Parquet and organized by schema

Crawl Processed Data with Glue

As with the raw data earlier, we now catalog the newly processed Parquet data into the same AWS Glue data catalog database, using the other of the two Glue Crawlers we created. The processed data will reside in the Amazon S3 processed data bucket, while its schemas and metadata will reside within tables in the Glue data catalog database, emr_demo.

From the GitHub repository’s local copy, run the following command, which will execute a Python script to run the Glue Crawler and catalog the processed data’s schema and metadata information into the Glue data catalog database, emr_demo.

python3 ./scripts/crawl_raw_data.py --crawler-name emr-demo-processed

Once the crawler has finished successfully, using the AWS Console, we should see a series of nine tables in the Glue data catalog database, emr_demo, all prefixed with processed_. The tables represent the three Kaggle datasets’ contents converted to Parquet and correspond to the equivalent tables with the raw_ prefix.

AWS Glue Data Catalog Database Tables Console

Alternately, we can use the glue get-tables AWS CLI command to review the tables.

> aws glue get-tables --database emr_demo | \
    jq -r '.TableList[] | select(.Name | startswith("processed_")).Name'

processed_bakery
processed_credits
processed_keywords
processed_links
processed_links_small
processed_movies_metadata
processed_ratings
processed_ratings_small
processed_stocks

2. Run PySpark Jobs from EMR Master Node

Next, we will explore how to execute PySpark applications remotely on the Master node of the EMR cluster using boto3 and SSH. Although this method may suit certain use cases better than using the EMR SDK, in my opinion, remote SSH execution does not scale as well due to its lack of automation, and it exposes some potential security risks.

There are four PySpark analysis applications in the GitHub repository. For this part of the demonstration, we will just submit the bakery_sales_ssm.py application. This application will perform a simple analysis of the bakery sales data. While the other three PySpark applications use AWS Glue, the bakery_sales_ssm.py application reads data directly from the processed data S3 bucket.

The application writes its results into the analyzed data S3 bucket, in both Parquet and CSV formats. The CSV file is handy for business analysts and other non-technical stakeholders who might wish to import the results of the analysis into Excel or business applications.
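
The bakery_sales_ssm.py source is not reproduced here, but its dual-format write looks roughly like the following sketch. The aggregation and the item column are assumptions for illustration only.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bakery-sales").getOrCreate()

# read the processed (silver) Parquet data and aggregate it (analysis details omitted)
df_bakery = spark.read.parquet("s3a://emr-demo-processed-123456789012-us-east-1/bakery/")
df_sales = df_bakery.groupBy("item").count().orderBy("count", ascending=False)

# write the results to the analyzed (gold) bucket in both Parquet and CSV formats
gold_path = "s3a://emr-demo-analyzed-123456789012-us-east-1/bakery_sales"
df_sales.write.format("parquet").save(f"{gold_path}/parquet/", mode="overwrite")
df_sales.coalesce(1).write.format("csv").option("header", "true") \
    .save(f"{gold_path}/csv/", mode="overwrite")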

Earlier, we created an inbound rule to allow your IP address to access the Master node on port 22. From the EMR Console’s Cluster Summary tab, note the command necessary to SSH into the Master node of the EMR cluster.

EMR Console’s Cluster Summary tab

The Python script, submit_spark_ssh.py, shown below, will submit the PySpark job to the EMR Master Node, using paramiko, a Python implementation of SSHv2. The script replicates the same functionality as the shell-based SSH command above, executing a remote command on the EMR Master Node. The spark-submit command can be seen in the submit_job function below.

#!/usr/bin/env python3
# Purpose: Submit Spark job to EMR Master Node
# Author: Gary A. Stafford (December 2020)
# Usage Example: python3 ./submit_spark_ssh.py \
#   --ec2-key-path ~/.ssh/emr-demo-123456789012-us-east-1.pem

import argparse
import logging

import boto3
from paramiko import SSHClient, AutoAddPolicy

logging.basicConfig(format='[%(asctime)s] %(levelname)s – %(message)s', level=logging.INFO)

ssm_client = boto3.client('ssm')


def main():
    args = parse_args()
    params = get_parameters()

    submit_job(params['master_public_dns'], 'hadoop', args.ec2_key_path, params['work_bucket'])


def submit_job(master_public_dns, username, ec2_key_path, work_bucket):
    """Submit job to EMR Master Node"""
    ssh = SSHClient()
    ssh.load_system_host_keys()
    ssh.set_missing_host_key_policy(AutoAddPolicy())
    ssh.connect(hostname=master_public_dns, username=username, key_filename=ec2_key_path)

    stdin_, stdout_, stderr_ = ssh.exec_command(
        command=f"""
            spark-submit --deploy-mode cluster --master yarn \
            --conf spark.yarn.submit.waitAppCompletion=true \
            s3a://{work_bucket}/analyze/bakery_sales_ssm.py"""
    )

    stdout_lines = ''
    while not stdout_.channel.exit_status_ready():
        if stdout_.channel.recv_ready():
            stdout_lines = stdout_.readlines()
            logging.info(' '.join(map(str, stdout_lines)))

    ssh.close()


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
    params = {
        'master_public_dns': ssm_client.get_parameter(Name='/emr_demo/master_public_dns')['Parameter']['Value'],
        'work_bucket': ssm_client.get_parameter(Name='/emr_demo/work_bucket')['Parameter']['Value']
    }

    return params


def parse_args():
    """Parse argument values from command-line"""
    parser = argparse.ArgumentParser(description='Arguments required for script.')
    parser.add_argument('-e', '--ec2-key-path', required=True, help='EC2 Key Path')
    args = parser.parse_args()
    return args


if __name__ == '__main__':
    main()

From the GitHub repository’s local copy, run the following command, which will execute a Python script to submit the job. The script requires one input parameter: the path to your EC2 key pair (e.g., ~/.ssh/my-key-pair.pem).

python3 ./scripts/submit_spark_ssh.py \
    --ec2-key-path </path/to/my-key-pair.pem>

The spark-submit command will be executed remotely on the EMR cluster’s Master node over SSH. All variables in the commands will be replaced by the environment variables, set in advance, which use AWS CLI emr and ssm commands.

Remote SSH submission of a Spark job

Monitoring Spark Jobs

We set spark.yarn.submit.waitAppCompletion to true. According to Spark’s documentation, this property controls whether the client waits to exit in YARN cluster mode until the application is completed. If set to true, the client process will stay alive, reporting the application’s status. Otherwise, the client process will exit after submission. We can watch the job’s progress from the terminal.

PySpark application shown running on EMR’s Master node

We can also use the YARN Timeline Server and the Spark History Server in addition to the terminal. Links to both are shown on both the EMR Console’s Cluster ‘Summary’ and ‘Application user interfaces’ tabs. Unlike other EMR application web interfaces, using port forwarding, also known as creating an SSH tunnel, is not required for the YARN Timeline Server or the Spark History Server.

EMR Console’s Cluster Application user interfaces tab

YARN Timeline Server

Below, we see that the job we submitted running on the YARN Timeline Server also includes useful tools like access to configuration, local logs, server stacks, and server metrics.

YARN Timeline Server

YARN Timeline Server allows us to drill down into individual jobs and view logs. Logs are ideal for troubleshooting failed jobs, especially the stdout logs.


Spark History Server

You can also view the PySpark application we submitted from the Master node using the Spark History Server. Below, we see completed Spark applications (aka Spark jobs) in the Spark History Server.

Spark History Server completed applications

Below, we see more details about our Spark job using the Spark History Server.

Spark History Server’s Jobs tab

We can even see visual representations of each Spark job’s Directed Acyclic Graph (DAG).

Spark History Server’s Jobs tab

3. Run Job Flow on an Auto-Terminating EMR Cluster

The next option to run PySpark applications on EMR is to create a short-lived, auto-terminating EMR cluster using the run_job_flow method. We will create a new EMR cluster, run a series of Steps (PySpark applications), and then auto-terminate the cluster. This is a cost-effective method of running PySpark applications on-demand.

We will create a second 3-node EMR v6.2.0 cluster to demonstrate this method, using Amazon EC2 Spot instances for all the EMR cluster’s Master and Core nodes. Unlike the first, long-lived, more general-purpose EMR cluster, we will only deploy the Spark application to this cluster as that is the only application we will need to run the Steps.

Using the run_job_flow method, we will execute the four PySpark data analysis applications. The PySpark applications’ spark-submit commands are defined in a separate JSON-format file, job_flow_steps_analyze.json. As with the previous add_job_flow_steps.py script, this pattern of decoupling the Spark job commands and arguments from the execution code lets us define and submit any number of Steps without changing the Python execution script. Also, like that script, this one retrieves its parameter values from the SSM Parameter Store.

#!/usr/bin/env python3
# Purpose: Create a new EMR cluster and submits a variable
#          number of Steps defined in a separate JSON file
# Author: Gary A. Stafford (November 2020)

import argparse
import json
import logging
import os

import boto3
from botocore.exceptions import ClientError

from scripts.parameters import parameters

logging.basicConfig(format='[%(asctime)s] %(levelname)s – %(message)s', level=logging.INFO)

emr_client = boto3.client('emr')


def main():
    args = parse_args()
    params = parameters.get_parameters()
    steps = get_steps(params, args.job_type)
    run_job_flow(params, steps)


def run_job_flow(params, steps):
    """Create EMR cluster, run Steps, and then terminate cluster"""
    try:
        response = emr_client.run_job_flow(
            Name='demo-cluster-run-job-flow',
            LogUri=f's3n://{params["logs_bucket"]}',
            ReleaseLabel='emr-6.2.0',
            Instances={
                'InstanceFleets': [
                    {
                        'Name': 'MASTER',
                        'InstanceFleetType': 'MASTER',
                        'TargetSpotCapacity': 1,
                        'InstanceTypeConfigs': [
                            {
                                'InstanceType': 'm5.xlarge',
                            },
                        ]
                    },
                    {
                        'Name': 'CORE',
                        'InstanceFleetType': 'CORE',
                        'TargetSpotCapacity': 2,
                        'InstanceTypeConfigs': [
                            {
                                'InstanceType': 'r5.2xlarge',
                            },
                        ],
                    },
                ],
                'Ec2KeyName': params['ec2_key_name'],
                'KeepJobFlowAliveWhenNoSteps': False,
                'TerminationProtected': False,
                'Ec2SubnetId': params['ec2_subnet_id'],
            },
            Steps=steps,
            BootstrapActions=[
                {
                    'Name': 'string',
                    'ScriptBootstrapAction': {
                        'Path': f's3://{params["bootstrap_bucket"]}/bootstrap_actions.sh',
                    }
                },
            ],
            Applications=[
                {
                    'Name': 'Spark'
                },
            ],
            Configurations=[
                {
                    'Classification': 'spark-hive-site',
                    'Properties': {
                        'hive.metastore.client.factory.class':
                            'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'
                    }
                }
            ],
            VisibleToAllUsers=True,
            JobFlowRole=params['emr_ec2_role'],
            ServiceRole=params['emr_role'],
            Tags=[
                {
                    'Key': 'Environment',
                    'Value': 'Development'
                },
                {
                    'Key': 'Name',
                    'Value': 'EMR Demo Project Cluster'
                },
                {
                    'Key': 'Owner',
                    'Value': 'Data Analytics'
                },
            ],
            EbsRootVolumeSize=32,
            StepConcurrencyLevel=5,
        )
        print(f'Response: {response}')
    except ClientError as e:
        logging.error(e)
        return False
    return True


def get_steps(params, job_type):
    """
    Load EMR Steps from a separate JSON-format file and substitutes tags for SSM parameter values
    """
    dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
    file = open(f'{dir_path}/job_flow_steps/job_flow_steps_{job_type}.json', 'r')
    steps = json.load(file)
    new_steps = []

    for step in steps:
        step['HadoopJarStep']['Args'] = list(
            map(lambda st: str.replace(st, '{{ work_bucket }}', params['work_bucket']),
                step['HadoopJarStep']['Args']))
        new_steps.append(step)

    return new_steps


def parse_args():
    """Parse argument values from command-line"""
    parser = argparse.ArgumentParser(description='Arguments required for script.')
    parser.add_argument('-t', '--job-type', required=True, choices=['process', 'analyze'], help='process or analysis')
    args = parser.parse_args()
    return args


if __name__ == '__main__':
    main()

From the GitHub repository’s local copy, run the following command, which will execute a Python script to create a new cluster, run the four PySpark analysis applications, and then auto-terminate.

python3 ./scripts/run_job_flow.py --job-type analyze

As shown below, we see the short-lived EMR cluster in the process of terminating after successfully running the PySpark applications as EMR Steps.

AWS EMR Console’s Cluster Steps tab
AWS EMR Console’s Cluster Summary tab

4. Using AWS Step Functions

According to AWS, AWS Step Functions is a serverless function orchestrator that makes it easy to sequence AWS Lambda functions and multiple AWS services. Step Functions manages sequencing, error handling, retry logic, and state, removing a significant operational burden from your team. Step Functions is based on state machines and tasks. A state machine is a workflow. A task is a state in a workflow that represents a single unit of work that another AWS service performs. Each step in a workflow is a state. Using AWS Step Functions, we define our workflows as state machines, which transform complex code into easy to understand statements and diagrams.

AWS Step Function Console’s State Machine Edit tab

You can use AWS Step Functions to run PySpark applications as EMR Steps on an existing EMR cluster. Using Step Functions, we can also create the cluster, run multiple EMR Steps sequentially or in parallel, and finally, auto-terminate the cluster.

We will create two state machines for this demo, one for the PySpark data processing applications and one for the PySpark data analysis applications. To create state machines, we first need to create JSON-based state machine definition files. The files are written in Amazon States Language. According to AWS, Amazon States Language is a JSON-based, structured language used to define a state machine, a collection of states that can do work (Task states), determine which states to transition to next (Choice states), stop execution with an error (Fail states), and so on.

The definition files contain specific references to AWS resources deployed to your AWS account originally created by CloudFormation. Below is a snippet of the state machine definition file, step_function_emr_analyze.json, showing part of the configuration of the EMR cluster. Note the parameterized key/value pairs (e.g., “Ec2KeyName.$”: “$.InstancesEc2KeyName” on line 5). The values will come from a JSON-formatted inputs file and are dynamically replaced upon the state machine’s execution.

"ServiceRole": "$.ServiceRole",
"JobFlowRole.$": "$.JobFlowRole",
"LogUri.$": "$.LogUri",
"Instances": {
"Ec2KeyName.$": "$.InstancesEc2KeyName",
"Ec2SubnetId.$": "$.InstancesEc2SubnetId",
"KeepJobFlowAliveWhenNoSteps": true,
"InstanceFleets": [
{
"InstanceFleetType": "MASTER",
"TargetOnDemandCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m5.xlarge"
}
]
},
{
"InstanceFleetType": "CORE",
"TargetSpotCapacity": 2,
"InstanceTypeConfigs": [
{
"InstanceType": "r5.2xlarge"
}
]
}
]
}

Python Templating

To automate the process of adding dynamic resource references to the state machine’s inputs files, we will use Jinja, the modern and designer-friendly templating language for Python, modeled after Django’s templates. We will render the Jinja template to a JSON-based state machine inputs file, replacing the template’s resource tags (keys) with values from the SSM Parameter Store’s parameters. Below is a snippet from the inputs file Jinja template, step_function_inputs_analyze.j2.

{
  "CreateCluster": true,
  "TerminateCluster": true,
  "ScriptBootstrapActionScript": "s3://{{ bootstrap_bucket }}/bootstrap_actions.sh",
  "LogUri": "s3n://{{ logs_bucket }}",
  "InstancesEc2KeyName": "{{ ec2_key_name }}",
  "InstancesEc2SubnetId": "{{ ec2_subnet_id }}",
  "JobFlowRole": "{{ emr_ec2_role }}",
  "ServiceRole": "{{ emr_role }}",
  "ArgsBakerySalesSsm": [
    "spark-submit",
    "--deploy-mode",
    "cluster",
    "--master",
    "yarn",
    "--conf",
    "spark.yarn.submit.waitAppCompletion=true",
    "s3a://{{ work_bucket }}/analyze/bakery_sales_ssm.py"
  ]
}

First, install Jinja2, then create two JSON-based state machine inputs files from the Jinja templates using the included Python file.

# install Jinja2
python3 -m pip install Jinja2

python3 ./scripts/create_inputs_files.py
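
A minimal sketch of the rendering logic is shown below. It is not the repository’s exact create_inputs_files.py; the template directory name and the output file location are assumptions.

import json

import boto3
from jinja2 import Environment, FileSystemLoader

ssm_client = boto3.client('ssm')


def get_ssm_value(name):
    return ssm_client.get_parameter(Name=f'/emr_demo/{name}')['Parameter']['Value']


# values that replace the {{ ... }} tags in the Jinja template
context = {
    'bootstrap_bucket': get_ssm_value('bootstrap_bucket'),
    'logs_bucket': get_ssm_value('logs_bucket'),
    'ec2_key_name': get_ssm_value('ec2_key_name'),
    'ec2_subnet_id': get_ssm_value('ec2_subnet_id'),
    'emr_ec2_role': get_ssm_value('emr_ec2_role'),
    'emr_role': get_ssm_value('emr_role'),
    'work_bucket': get_ssm_value('work_bucket'),
}

env = Environment(loader=FileSystemLoader('step_functions'))
template = env.get_template('step_function_inputs_analyze.j2')

# render the template and validate the result as JSON before writing the inputs file
rendered = template.render(**context)
with open('step_functions/step_function_inputs_analyze.json', 'w') as f:
    json.dump(json.loads(rendered), f, indent=2)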

Below we see the same snippet of the final inputs file. Jinja tags have been replaced with values from the SSM Parameter Store.

{
  "CreateCluster": true,
  "TerminateCluster": true,
  "ScriptBootstrapActionScript": "s3://emr-demo-bootstrap-123456789012-us-east-1/bootstrap_actions.sh",
  "LogUri": "s3n://emr-demo-logs-123456789012-us-east-1",
  "InstancesEc2KeyName": "emr-demo-123456789012-us-east-1",
  "InstancesEc2SubnetId": "subnet-06ab61f888a932d12",
  "JobFlowRole": "EMR_EC2_DemoRole",
  "ServiceRole": "EMR_DemoRole",
  "ArgsBakerySalesSsm": [
    "spark-submit",
    "--deploy-mode",
    "cluster",
    "--master",
    "yarn",
    "--conf",
    "spark.yarn.submit.waitAppCompletion=true",
    "s3a://emr-demo-work-123456789012-us-east-1/analyze/bakery_sales_ssm.py"
  ]
}

Using the definition files, create two state machines using the included Python files.

python3 ./scripts/create_state_machine.py \
    --definition-file step_function_emr_process.json \
    --state-machine EMR-Demo-Process

python3 ./scripts/create_state_machine.py \
    --definition-file step_function_emr_analyze.json \
    --state-machine EMR-Demo-Analysis

Both state machines should appear in the AWS Step Functions Console’s State Machines tab. Below, we see the ‘EMR-Demo-Analysis’ state machine’s definition both as JSON and rendered visually to a layout.

AWS Step Function Console’s State Machine Edit tab

To execute either of the state machines, use the included Python file, passing in the exact name of the state machine to execute, either ‘EMR-Demo-Process’ or ‘EMR-Demo-Analysis’, and the name of the inputs file. I suggest running the EMR-Demo-Analysis version so as not to re-process all the raw data.

python3 ./scripts/execute_state_machine.py \
    --state-machine EMR-Demo-Process \
    --inputs-file step_function_inputs_process.json

python3 ./scripts/execute_state_machine.py \
    --state-machine EMR-Demo-Analysis \
    --inputs-file step_function_inputs_analyze.json
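
Under the hood, executing a state machine amounts to a single Step Functions API call. Below is a sketch using boto3; the state machine ARN, account ID, and inputs file path are placeholders.

import boto3

sfn_client = boto3.client('stepfunctions')

# load the rendered inputs file created from the Jinja template
with open('step_functions/step_function_inputs_analyze.json') as f:
    inputs = f.read()

# start an execution of the analysis state machine with those inputs
response = sfn_client.start_execution(
    stateMachineArn='arn:aws:states:us-east-1:123456789012:stateMachine:EMR-Demo-Analysis',
    input=inputs
)
print(response['executionArn'])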

When the PySpark analysis application’s Step Function state machine is executed, a new EMR cluster is created, the PySpark applications are run, and finally, the cluster is auto-terminated. Below, we see a successfully executed state machine, which successfully ran the four PySpark analysis applications in parallel, on a new auto-terminating EMR cluster.

AWS Step Function Console’s State Machine Execution tab

Conclusion

This post explored four methods for running PySpark applications on Amazon Elastic MapReduce (Amazon EMR). The key to scaling data analytics with PySpark on EMR is the use of automation. Therefore, we looked at ways to automate the deployment of EMR resources, create and submit PySpark jobs, and terminate EMR resources when the jobs are complete. Furthermore, we were able to decouple references to dynamic AWS resources within our PySpark applications using parameterization. This allows us to deploy and run PySpark resources across multiple AWS Accounts and AWS Regions without code changes.

In part two of the series, we will explore the use of the recently announced service, Amazon Managed Workflows for Apache Airflow (MWAA), and in part three, the use of Jupyter and Zeppelin notebooks for data science, scientific computing, and machine learning on EMR.


This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.



Java Development with Microsoft SQL Server: Calling Microsoft SQL Server Stored Procedures from Java Applications Using JDBC

Introduction

Enterprise software solutions often combine multiple technology platforms. Accessing an Oracle database from a Microsoft .NET application, or conversely, accessing Microsoft SQL Server from a Java-based application, is common. In this post, we will explore the use of the JDBC (Java Database Connectivity) API to call stored procedures in a Microsoft SQL Server 2017 database and return data to a Java 11-based console application.

View of the post’s Java project from JetBrains’ IntelliJ IDE

The objectives of this post include:

  • Demonstrate the differences between using static SQL statements and stored procedures to return data.
  • Demonstrate three types of JDBC statements to return data: Statement, PreparedStatement, and CallableStatement.
  • Demonstrate how to call stored procedures with input and output parameters.
  • Demonstrate how to return single values and a result set from a database using stored procedures.

Why Stored Procedures?

To access data, many enterprise software organizations require their developers to call stored procedures within their code as opposed to executing static T-SQL (Transact-SQL) statements against the database. There are several reasons stored procedures are preferred:

  • Optimization: Stored procedures are often written by DBAs or database developers who specialize in database development. They understand the best way to construct queries for optimal performance and minimal load on the database server. Think of it as a developer using an API to interact with the database.
  • Safety and Security: Stored procedures are considered safer and more secure than static SQL statements. The stored procedure provides tight control over the content of the queries, preventing malicious or unintentionally destructive code from being executed against the database.
  • Error Handling: Stored procedures can contain logic for handling errors before they bubble up to the application layer and possibly to the end-user.

AdventureWorks 2017 Database

For brevity, I will use an existing and well-known Microsoft SQL Server database, AdventureWorks. The AdventureWorks database was originally published by Microsoft for SQL Server 2008. Although a bit dated architecturally, the database comes prepopulated with plenty of data for demonstration purposes.

The HumanResources schema, one of five schemas within the AdventureWorks database

For the demonstration, I have created an Amazon RDS for SQL Server 2017 Express Edition instance on AWS. You have several options for deploying SQL Server, including AWS, Microsoft Azure, Google Cloud, or installed on your local workstation.

There are many methods to deploy the AdventureWorks database to Microsoft SQL Server. For this post’s demonstration, I used the AdventureWorks2017.bak backup file, which I copied to Amazon S3. Then, I enabled and configured the native backup and restore feature of Amazon RDS for SQL Server to import and install the backup.

DROP DATABASE IF EXISTS AdventureWorks;
GO

EXECUTE msdb.dbo.rds_restore_database
@restore_db_name='AdventureWorks',
@s3_arn_to_restore_from='arn:aws:s3:::my-bucket/AdventureWorks2017.bak',
@type='FULL',
@with_norecovery=0;

-- get task_id from output (e.g. 1)

EXECUTE msdb.dbo.rds_task_status
@db_name='AdventureWorks',
@task_id=1;

Install Stored Procedures

For the demonstration, I have added four stored procedures to the AdventureWorks database to use in this post. To follow along, you will need to install these stored procedures, which are included in the GitHub project.

View of the new stored procedures from JetBrains’ IntelliJ IDE Database tab

Data Sources, Connections, and Properties

Using the latest Microsoft JDBC Driver 8.4 for SQL Server (ver. 8.4.1.jre11), we create a SQL Server data source, com.microsoft.sqlserver.jdbc.SQLServerDataSource, and database connection, java.sql.Connection. There are several patterns for creating and working with JDBC data sources and connections. This post does not necessarily focus on the best practices for creating or using either. In this example, the application instantiates a connection class, SqlConnection.java, which in turn instantiates the java.sql.Connection and com.microsoft.sqlserver.jdbc.SQLServerDataSource objects. The data source’s properties are supplied from an instance of a singleton class, ProjectProperties.java. This properties class instantiates the java.util.Properties class, which reads values from a configuration properties file, config.properties. On startup, the application creates the database connection, calls each of the example methods, and then closes the connection.

Examples

For each example, I will show the stored procedure, if applicable, followed by the Java method that calls the procedure or executes the static SQL statement. I have left out the data source and connection code in the article. Again, a complete copy of all the code for this article is available on GitHub, including Java source code, SQL statements, helper SQL scripts, and a set of basic JUnit tests.

To run the JUnit unit tests, using Gradle, which the project is based on, use the ./gradlew cleanTest test --warning-mode none command.

A successful run of the JUnit tests

To build and run the application, using Gradle, which the project is based on, use the ./gradlew run --warning-mode none command.

The output of the Java console application

Example 1: SQL Statement

Before jumping into stored procedures, we will start with a simple static SQL statement. This example’s method, getAverageProductWeightST, uses the java.sql.Statement class. According to Oracle’s JDBC documentation, the Statement object is used for executing a static SQL statement and returning the results it produces. This SQL statement calculates the average weight of all products in the AdventureWorks database. It returns a solitary double numeric value. This example demonstrates one of the simplest methods for returning data from SQL Server.

/**
 * Statement example, no parameters, returns double
 *
 * @return Average weight of all products
 */
public double getAverageProductWeightST() {
double averageWeight = 0;
Statement stmt = null;
ResultSet rs = null;
try {
stmt = connection.getConnection().createStatement();
String sql = "WITH Weights_CTE(AverageWeight) AS" +
"(" +
" SELECT [Weight] AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'LB'" +
" UNION" +
" SELECT [Weight] * 0.00220462262185 AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'G')" +
"SELECT ROUND(AVG([AverageWeight]), 2)" +
"FROM [Weights_CTE];";
rs = stmt.executeQuery(sql);
if (rs.next()) {
averageWeight = rs.getDouble(1);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (stmt != null) {
try {
stmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 2: Prepared Statement

Next, we will execute almost the same static SQL statement as in Example 1. The only change is the addition of the column name, averageWeight. This allows us to parse the results by column name, making the code easier to understand as opposed to using the numeric index of the column as in Example 1.

Also, instead of using the java.sql.Statement class, we use the java.sql.PreparedStatement class. According to Oracle’s documentation, a SQL statement is precompiled and stored in a PreparedStatement object. This object can then be used to execute this statement multiple times efficiently.

/**
 * PreparedStatement example, no parameters, returns double
 *
 * @return Average weight of all products
 */
public double getAverageProductWeightPS() {
double averageWeight = 0;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
String sql = "WITH Weights_CTE(averageWeight) AS" +
"(" +
" SELECT [Weight] AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'LB'" +
" UNION" +
" SELECT [Weight] * 0.00220462262185 AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'G')" +
"SELECT ROUND(AVG([AverageWeight]), 2) AS [averageWeight]" +
"FROM [Weights_CTE];";
pstmt = connection.getConnection().prepareStatement(sql);
rs = pstmt.executeQuery();
if (rs.next()) {
averageWeight = rs.getDouble("averageWeight");
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (pstmt != null) {
try {
pstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 3: Callable Statement

In this example, the average product weight query has been moved into a stored procedure. The procedure is identical in functionality to the static statement in the first two examples. To call the stored procedure, we use the java.sql.CallableStatement class. According to Oracle’s documentation, the CallableStatement extends PreparedStatement. It is the interface used to execute SQL stored procedures. The CallableStatement accepts both input and output parameters; however, this simple example does not use either. Like the previous two examples, the procedure returns a double numeric value.

CREATE OR
ALTER PROCEDURE [Production].[uspGetAverageProductWeight]
AS
BEGIN
SET NOCOUNT ON;
WITH
Weights_CTE(AverageWeight)
AS
(
SELECT [Weight] AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'LB'
UNION
SELECT [Weight] * 0.00220462262185 AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'G'
)
SELECT ROUND(AVG([AverageWeight]), 2)
FROM [Weights_CTE];
END
GO

The calling Java method is shown below.

/**
 * CallableStatement, no parameters, returns double
 *
 * @return Average weight of all products
 */
public double getAverageProductWeightCS() {
CallableStatement cstmt = null;
double averageWeight = 0;
ResultSet rs = null;
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetAverageProductWeight]}");
cstmt.execute();
rs = cstmt.getResultSet();
if (rs.next()) {
averageWeight = rs.getDouble(1);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 4: Calling a Stored Procedure with an Output Parameter

In this example, we use almost the same stored procedure as in Example 3. The only difference is the inclusion of an output parameter. This time, instead of returning a result set with a value in a single unnamed column, the value is returned through a named output parameter, averageWeight, which we can reference by name when retrieving the value.

The stored procedure patterns found in Examples 3 and 4 are both commonly used. One procedure uses an output parameter and one does not, yet both return the same value(s). You can use the CallableStatement for either type.

CREATE OR
ALTER PROCEDURE [Production].[uspGetAverageProductWeightOUT] @averageWeight DECIMAL(8, 2) OUT
AS
BEGIN
SET NOCOUNT ON;
WITH
Weights_CTE(AverageWeight)
AS
(
SELECT [Weight] AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'LB'
UNION
SELECT [Weight] * 0.00220462262185 AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'G'
)
SELECT @averageWeight = ROUND(AVG([AverageWeight]), 2)
FROM [Weights_CTE];
END
GO

The calling Java method is shown below.

/**
 * CallableStatement example, (1) output parameter, returns double
 *
 * @return Average weight of all products
 */
public double getAverageProductWeightOutCS() {
CallableStatement cstmt = null;
double averageWeight = 0;
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetAverageProductWeightOUT](?)}");
cstmt.registerOutParameter("averageWeight", Types.DECIMAL);
cstmt.execute();
averageWeight = cstmt.getDouble("averageWeight");
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 5: Calling a Stored Procedure with an Input Parameter

In this example, the procedure returns a result set, java.sql.ResultSet, of employees whose last name starts with a particular sequence of characters (e.g., ‘M’ or ‘Sa’). The sequence of characters is passed as an input parameter, lastNameStartsWith, to the stored procedure using the CallableStatement.

The method making the call iterates through the rows of the result set returned by the stored procedure, concatenating multiple columns to form the employee’s full name as a string. Each full name string is then added to an ordered collection of strings, a List<String> object. The List instance is returned by the method. You will notice this procedure takes a little longer to run because of the use of the LIKE operator. The database server has to perform pattern matching on each last name value in the table to determine the result set.

CREATE OR
ALTER PROCEDURE [HumanResources].[uspGetEmployeesByLastName]
@lastNameStartsWith VARCHAR(20) = 'A'
AS
BEGIN
SET NOCOUNT ON;
SELECT p.[FirstName], p.[MiddleName], p.[LastName], p.[Suffix], e.[JobTitle], m.[EmailAddress]
FROM [HumanResources].[Employee] AS e
LEFT JOIN [Person].[Person] p ON e.[BusinessEntityID] = p.[BusinessEntityID]
LEFT JOIN [Person].[EmailAddress] m ON e.[BusinessEntityID] = m.[BusinessEntityID]
WHERE e.[CurrentFlag] = 1
AND p.[PersonType] = 'EM'
AND p.[LastName] LIKE @lastNameStartsWith + '%'
ORDER BY p.[LastName], p.[FirstName], p.[MiddleName]
END
GO

The calling Java method is shown below.

/**
 * CallableStatement example, (1) input parameter, returns ResultSet
 *
 * @param lastNameStartsWith
 * @return List of employee names
 */
public List<String> getEmployeesByLastNameCS(String lastNameStartsWith) {
CallableStatement cstmt = null;
ResultSet rs = null;
List<String> employeeFullName = new ArrayList<>();
try {
cstmt = connection.getConnection().prepareCall(
"{call [HumanResources].[uspGetEmployeesByLastName](?)}",
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
cstmt.setString("lastNameStartsWith", lastNameStartsWith);
boolean results = cstmt.execute();
int rowsAffected = 0;
// Protects against lack of SET NOCOUNT in stored procedure
while (results || rowsAffected != -1) {
if (results) {
rs = cstmt.getResultSet();
break;
} else {
rowsAffected = cstmt.getUpdateCount();
}
results = cstmt.getMoreResults();
}
while (rs.next()) {
employeeFullName.add(
rs.getString("LastName") + ", "
+ rs.getString("FirstName") + " "
+ rs.getString("MiddleName"));
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return employeeFullName;
}

Example 6: Converting a Result Set to Ordered Collection of Objects

In this last example, we pass two input parameters, productColor and productSize, to a slightly more complex stored procedure. The stored procedure returns a result set containing several columns of product information. This time, the example’s method iterates through the result set returned by the procedure and constructs an ordered collection of products, a List<Product> object. The Product objects in the list are instances of the Product.java POJO class. The method converts each result set row’s field values into Product properties (e.g., Product.Size, Product.Model). Using a collection is a common method for persisting data from a result set in an application.

CREATE OR
ALTER PROCEDURE [Production].[uspGetProductsByColorAndSize]
@productColor VARCHAR(20),
@productSize INTEGER
AS
BEGIN
SET NOCOUNT ON;
SELECT p.[ProductNumber], m.[Name] AS [Model], p.[Name] AS [Product], p.[Color], p.[Size]
FROM [Production].[ProductModel] AS m
INNER JOIN
[Production].[Product] AS p ON m.[ProductModelID] = p.[ProductModelID]
WHERE (p.[Color] = @productColor)
AND (p.[Size] = @productSize)
ORDER BY p.[ProductNumber], [Model], [Product]
END
GO

The calling Java method is shown below.

/**
* CallableStatement example, (2) input parameters, returns ResultSet
*
* @param color
* @param size
* @return List of Product objects
*/
*/
public List<Product> getProductsByColorAndSizeCS(String color, String size) {
CallableStatement cstmt = null;
ResultSet rs = null;
List<Product> productList = new ArrayList<>();
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetProductsByColorAndSize](?, ?)}",
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
cstmt.setString("productColor", color);
cstmt.setString("productSize", size);
boolean results = cstmt.execute();
int rowsAffected = 0;
// Protects against lack of SET NOCOUNT in stored procedure
while (results || rowsAffected != -1) {
if (results) {
rs = cstmt.getResultSet();
break;
} else {
rowsAffected = cstmt.getUpdateCount();
}
results = cstmt.getMoreResults();
}
while (rs.next()) {
Product product = new Product(
rs.getString("Product"),
rs.getString("ProductNumber"),
rs.getString("Color"),
rs.getString("Size"),
rs.getString("Model"));
productList.add(product);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return productList;
}

Proper T-SQL: Schema Reference and Brackets

You will notice in all T-SQL statements, I refer to the schema as well as the table or stored procedure name (e.g., {call [Production].[uspGetAverageProductWeightOUT](?)}). According to Microsoft, it is always good practice to refer to database objects by a schema name and the object name, separated by a period; that even includes the default schema (e.g., dbo).

You will also notice I wrap the schema and object names in square brackets (e.g., SELECT [ProductNumber] FROM [Production].[ProductModel]). The square brackets indicate that the name represents an object and not a reserved word (e.g., CURRENT or NATIONAL). By default, SQL Server adds these brackets to make sure the scripts it generates run correctly.

Running the Examples

The application will display the name of the method being called, a description, the duration of time it took to retrieve the data, and the results returned by the method.

package com.article.examples;
import java.util.List;
/**
* Main class that calls all example methods
*
* @author Gary A. Stafford
*/
public class RunExamples {
private static final Examples examples = new Examples();
private static final ProcessTimer timer = new ProcessTimer();
/**
* @param args the command line arguments
* @throws Exception
*/
public static void main(String[] args) throws Exception {
System.out.println();
System.out.println("SQL SERVER STATEMENT EXAMPLES");
System.out.println("======================================");
// Statement example, no parameters, returns Integer
timer.setStartTime(System.nanoTime());
double averageWeight = examples.getAverageProductWeightST();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightST");
System.out.println("Description: Statement, no parameters, returns Integer");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.printf("Results: Average product weight (lb): %s%n", averageWeight);
System.out.println("");
// PreparedStatement example, no parameters, returns Integer
timer.setStartTime(System.nanoTime());
averageWeight = examples.getAverageProductWeightPS();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightPS");
System.out.println("Description: PreparedStatement, no parameters, returns Integer");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.printf("Results: Average product weight (lb): %s%n", averageWeight);
System.out.println("");
// CallableStatement, no parameters, returns Integer
timer.setStartTime(System.nanoTime());
averageWeight = examples.getAverageProductWeightCS();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightCS");
System.out.println("Description: CallableStatement, no parameters, returns Integer");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.println("");
// CallableStatement example, (1) output parameter, returns Integer
timer.setStartTime(System.nanoTime());
averageWeight = examples.getAverageProductWeightOutCS();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightOutCS");
System.out.println("Description: CallableStatement, (1) output parameter, returns Integer");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.printf("Results: Average product weight (lb): %s%n", averageWeight);
System.out.println("");
// CallableStatement example, (1) input parameter, returns ResultSet
timer.setStartTime(System.nanoTime());
String lastNameStartsWith = "Sa";
List<String> employeeFullName =
examples.getEmployeesByLastNameCS(lastNameStartsWith);
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetEmployeesByLastNameCS");
System.out.println("Description: CallableStatement, (1) input parameter, returns ResultSet");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.printf("Results: Last names starting with '%s': %d%n", lastNameStartsWith, employeeFullName.size());
if (employeeFullName.size() > 0) {
System.out.printf(" Last employee found: %s%n", employeeFullName.get(employeeFullName.size() 1));
} else {
System.out.printf("No employees found with last name starting with '%s'%n", lastNameStartsWith);
}
System.out.println("");
// CallableStatement example, (2) input parameters, returns ResultSet
timer.setStartTime(System.nanoTime());
String color = "Red";
String size = "44";
List<Product> productList =
examples.getProductsByColorAndSizeCS(color, size);
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetProductsByColorAndSizeCS");
System.out.println("Description: CallableStatement, (2) input parameter, returns ResultSet");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
if (productList.size() > 0) {
System.out.printf("Results: Products found (color: '%s', size: '%s'): %d%n", color, size, productList.size());
System.out.printf(" First product: %s (%s)%n", productList.get(0).getProduct(), productList.get(0).getProductNumber());
} else {
System.out.printf("No products found with color '%s' and size '%s'%n", color, size);
}
System.out.println("");
examples.closeConnection();
}
}

Below, we see the results.

SQL Statement Performance

This post is certainly not about SQL performance, as demonstrated by the fact that I am only using Amazon RDS for SQL Server 2017 Express Edition on a single, very underpowered db.t2.micro Amazon RDS instance. However, I have added a timer feature, the ProcessTimer.java class, to capture the time each example takes to return data, measured in milliseconds. The ProcessTimer.java class is part of the project code. Using the timer, you should observe significant differences between the first run and subsequent runs of the application for several of the called methods. The time difference is a result of several factors, primarily pre-compilation of the SQL statements and SQL Server plan caching.

The effects of these two factors are easily demonstrated by clearing the SQL Server plan cache using DBCC (Database Console Commands) statements (see the SQL script below) and then running the application twice in a row. The second time, pre-compilation and plan caching should result in significantly faster times for the prepared and callable statements in Examples 2–6. In the two runs shown below, we see up to a 497% improvement in query time.

USE AdventureWorks;
DBCC FREESYSTEMCACHE('SQL Plans');
GO
CHECKPOINT;
GO
-- Impossible to run with Amazon RDS for Microsoft SQL Server on AWS
-- DBCC DROPCLEANBUFFERS;
-- GO

The first run results are shown below.

SQL SERVER STATEMENT EXAMPLES
======================================
Method: GetAverageProductWeightST
Description: Statement, no parameters, returns Integer
Duration (ms): 122
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightPS
Description: PreparedStatement, no parameters, returns Integer
Duration (ms): 146
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightCS
Description: CallableStatement, no parameters, returns Integer
Duration (ms): 72
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightOutCS
Description: CallableStatement, (1) output parameter, returns Integer
Duration (ms): 623
Results: Average product weight (lb): 12.43
---
Method: GetEmployeesByLastNameCS
Description: CallableStatement, (1) input parameter, returns ResultSet
Duration (ms): 830
Results: Last names starting with 'Sa': 7
Last employee found: Sandberg, Mikael Q
---
Method: GetProductsByColorAndSizeCS
Description: CallableStatement, (2) input parameter, returns ResultSet
Duration (ms): 427
Results: Products found (color: 'Red', size: '44'): 7
First product: Road-650 Red, 44 (BK-R50R-44)
---

The second run results are shown below.

SQL SERVER STATEMENT EXAMPLES
======================================
Method: GetAverageProductWeightST
Description: Statement, no parameters, returns Integer
Duration (ms): 116
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightPS
Description: PreparedStatement, no parameters, returns Integer
Duration (ms): 89
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightCS
Description: CallableStatement, no parameters, returns Integer
Duration (ms): 80
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightOutCS
Description: CallableStatement, (1) output parameter, returns Integer
Duration (ms): 340
Results: Average product weight (lb): 12.43
---
Method: GetEmployeesByLastNameCS
Description: CallableStatement, (1) input parameter, returns ResultSet
Duration (ms): 139
Results: Last names starting with 'Sa': 7
Last employee found: Sandberg, Mikael Q
---
Method: GetProductsByColorAndSizeCS
Description: CallableStatement, (2) input parameter, returns ResultSet
Duration (ms): 208
Results: Products found (color: 'Red', size: '44'): 7
First product: Road-650 Red, 44 (BK-R50R-44)
---

Conclusion

This post has demonstrated several methods for querying and calling stored procedures from a SQL Server 2017 database using JDBC with the Microsoft JDBC Driver 8.4 for SQL Server. Although the examples are quite simple, the same patterns can be used with more complex stored procedures, with multiple input and output parameters, which not only select, but insert, update, and delete data.

There are some limitations of the Microsoft JDBC Driver for SQL Server that you should be aware of; review the driver’s documentation for details. However, for most tasks that require database interaction, the driver provides adequate functionality with SQL Server.


This blog represents my own viewpoints and not of my employer, Amazon Web Services.


Getting Started with Presto Federated Queries using Ahana’s PrestoDB Sandbox on AWS

Introduction

According to The Presto Foundation, Presto (aka PrestoDB), not to be confused with PrestoSQL, is an open-source, distributed, ANSI SQL compliant query engine. Presto is designed to run interactive ad-hoc analytic queries against data sources of all sizes ranging from gigabytes to petabytes. Presto is used in production at an immense scale by many well-known organizations, including Facebook, Twitter, Uber, Alibaba, Airbnb, Netflix, Pinterest, Atlassian, Nasdaq, and more.

In the following post, we will gain a better understanding of Presto’s ability to execute federated queries, which join multiple disparate data sources without having to move the data. Additionally, we will explore Apache Hive, the Hive Metastore, Hive partitioned tables, and the Apache Parquet file format.

Presto on AWS

There are several options for Presto on AWS. AWS recommends Amazon EMR and Amazon Athena. Presto comes pre-installed on EMR 5.0.0 and later. The Athena query engine is a derivation of Presto 0.172 and does not support all of Presto’s native features. However, Athena has many comparable features and deep integrations with other AWS services. If you need full, fine-grained control, you could deploy and manage Presto yourself on Amazon EC2, Amazon ECS, or Amazon EKS. Lastly, you may decide to purchase a Presto distribution with commercial support from an AWS Partner, such as Ahana or Starburst. If your organization needs 24x7x365 production-grade support from experienced Presto engineers, this is an excellent choice.

Federated Queries

In a modern enterprise, it is rare to find all data living in a monolithic datastore. Given the multitude of available data sources, internal and external to an organization, and the growing number of purpose-built databases, analytics engines must be able to join and aggregate data across many sources efficiently. AWS defines a federated query as a capability that ‘enables data analysts, engineers, and data scientists to execute SQL queries across data stored in relational, non-relational, object, and custom data sources.’

Presto allows querying data where it lives, including Apache Hive, Thrift, Kafka, Kudu, Cassandra, Elasticsearch, and MongoDB. In fact, there are currently 24 different Presto data source connectors available. With Presto, we can write queries that join multiple disparate data sources without moving the data. Below is a simple example of a Presto federated query statement that correlates a customer’s credit rating with their age and gender. The query federates two different data sources, a PostgreSQL database table, postgresql.public.customer, and an Apache Hive Metastore table, hive.default.customer_demographics, whose underlying data resides in Amazon S3.

WITH credit_demographics AS (
SELECT
(year(now()) - c_birth_year) AS age,
cd_credit_rating AS credit_rating,
cd_gender AS gender,
count(cd_gender) AS gender_count
FROM
postgresql.public.customer
LEFT JOIN hive.default.customer_demographics ON c_current_cdemo_sk = cd_demo_sk
WHERE
c_birth_year IS NOT NULL
AND cd_credit_rating IS NOT NULL
AND lower(cd_credit_rating) != 'unknown'
AND cd_gender IS NOT NULL
GROUP BY
cd_credit_rating,
c_birth_year,
cd_gender
)
SELECT
age,
credit_rating,
gender,
gender_count
FROM
credit_demographics
WHERE
age BETWEEN 21 AND 65
ORDER BY
age,
credit_rating,
gender;

Ahana

The Linux Foundation’s Presto Foundation member, Ahana, was founded as the first company focused on bringing PrestoDB-based ad hoc analytics offerings to market and working to foster growth and evangelize the Presto community. Ahana’s mission is to simplify ad hoc analytics for organizations of all shapes and sizes. Ahana has been successful in raising seed funding, led by GV (formerly Google Ventures). Ahana’s founders have a wealth of previous experience in tech companies, including Alluxio, Kinetica, Couchbase, IBM, Apple, Splunk, and Teradata.

PrestoDB Sandbox

This post will use Ahana’s PrestoDB Sandbox, an Amazon Linux 2 AMI-based solution available on AWS Marketplace, to execute Presto federated queries.

Ahana’s PrestoDB Sandbox AMI allows you to easily get started with Presto to query data wherever your data resides. This AMI configures a single EC2 instance Sandbox to be both the Presto Coordinator and a Presto Worker. It comes bundled with an Apache Hive Metastore backed by PostgreSQL. In addition, the following catalogs are included to try, test, and prototype with Presto:

  • JMX: useful for monitoring and debugging Presto
  • Memory: stores data and metadata in RAM, which is discarded when Presto restarts
  • TPC-DS: provides a set of schemas to support the TPC Benchmark DS
  • TPC-H: provides a set of schemas to support the TPC Benchmark H

Apache Hive

In this demonstration, we will use Apache Hive and an Apache Hive Metastore backed by PostgreSQL. Apache Hive is data warehouse software that facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. The structure can be projected onto data already in storage. A command-line tool and JDBC driver are provided to connect users to Hive. The Metastore provides two essential features of a data warehouse: data abstraction and data discovery. Hive accomplishes both features by providing a metadata repository that is tightly integrated with the Hive query processing system so that data and metadata are in sync.

Getting Started

To get started creating federated queries with Presto, we first need to create and configure our AWS environment, as shown below.

Architecture of the demonstration’s AWS environment and resources

Subscribe to Ahana’s PrestoDB Sandbox

To start, subscribe to Ahana’s PrestoDB Sandbox on AWS Marketplace. Make sure you are aware of the costs involved. The current AWS pricing for the default, Linux-based r5.xlarge on-demand EC2 instance hosted in US East (N. Virginia) is USD 0.252 per hour. For the demonstration, since performance is not an issue, you could try a smaller EC2 instance, such as the r5.large, which costs USD 0.126 per hour.

The configuration process will lead you through the creation of an EC2 instance based on Ahana’s PrestoDB Sandbox AMI.

I chose to create the EC2 instance in my default VPC. Part of the demonstration includes connecting to Presto locally using JDBC. Therefore, it was also necessary to include a public IP address for the EC2 instance. If you chose to do so, I strongly recommend limiting the required ports 22 and 8080 in the instance’s Security Group to just your IP address (a /32 CIDR block).
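
If you prefer the AWS CLI to the console for this step, the sketch below shows one way to restrict the two ports; the security group ID and IP address are placeholders you would replace with your own values.

# hypothetical values - substitute your instance's Security Group ID and your public IP
SG_ID=sg-0123456789abcdef0
MY_IP=203.0.113.10/32
aws ec2 authorize-security-group-ingress --group-id ${SG_ID} \
--protocol tcp --port 22 --cidr ${MY_IP}
aws ec2 authorize-security-group-ingress --group-id ${SG_ID} \
--protocol tcp --port 8080 --cidr ${MY_IP}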

Limiting access to ports 22 and 8080 from only my current IP address

Lastly, we need to assign an IAM Role to the EC2 instance, which has access to Amazon S3. I assigned the AWS managed policy, AmazonS3FullAccess, to the EC2’s IAM Role.

Attaching the AmazonS3FullAccess AWS managed policy to the Role

Part of the configuration also asks for a key pair. You can use an existing key or create a new key for the demo. For reference in future commands, I am using a key named ahana-presto and my key path of ~/.ssh/ahana-presto.pem. Be sure to update the commands to match your own key’s name and location.

Once complete, instructions for using the PrestoDB Sandbox EC2 are provided.

You can view the running EC2 instance, containing Presto, from the web-based AWS EC2 Management Console. Make sure to note the public IPv4 address or the public IPv4 DNS address as this value will be required during the demo.

AWS CloudFormation

We will use Amazon RDS for PostgreSQL and Amazon S3 as additional data sources for Presto. Included in the project files on GitHub is an AWS CloudFormation template, cloudformation/presto_ahana_demo.yaml. The template creates a single RDS for PostgreSQL instance in the default VPC and an encrypted Amazon S3 bucket.

AWSTemplateFormatVersion: "2010-09-09"
Description: "This template deploys a RDS PostgreSQL database and an Amazon S3 bucket"
Parameters:
DBInstanceIdentifier:
Type: String
Default: "ahana-prestodb-demo"
DBEngine:
Type: String
Default: "postgres"
DBEngineVersion:
Type: String
Default: "12.3"
DBAvailabilityZone:
Type: String
Default: "us-east-1f"
DBInstanceClass:
Type: String
Default: "db.t3.medium"
DBStorageType:
Type: String
Default: "gp2"
DBAllocatedStorage:
Type: Number
Default: 20
DBName:
Type: String
Default: "shipping"
DBUser:
Type: String
Default: "presto"
DBPassword:
Type: String
Default: "5up3r53cr3tPa55w0rd"
# NoEcho: True
Resources:
MasterDatabase:
Type: AWS::RDS::DBInstance
Properties:
DBInstanceIdentifier:
Ref: DBInstanceIdentifier
DBName:
Ref: DBName
AllocatedStorage:
Ref: DBAllocatedStorage
DBInstanceClass:
Ref: DBInstanceClass
StorageType:
Ref: DBStorageType
Engine:
Ref: DBEngine
EngineVersion:
Ref: DBEngineVersion
MasterUsername:
Ref: DBUser
MasterUserPassword:
Ref: DBPassword
AvailabilityZone: !Ref DBAvailabilityZone
PubliclyAccessible: true
Tags:
- Key: Project
  Value: "Demo of RDS PostgreSQL"
DataBucket:
DeletionPolicy: Retain
Type: AWS::S3::Bucket
Properties:
BucketEncryption:
ServerSideEncryptionConfiguration:
ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
Outputs:
Endpoint:
Description: "Endpoint of RDS PostgreSQL database"
Value: !GetAtt MasterDatabase.Endpoint.Address
Port:
Description: "Port of RDS PostgreSQL database"
Value: !GetAtt MasterDatabase.Endpoint.Port
JdbcConnString:
Description: "JDBC connection string of RDS PostgreSQL database"
Value: !Join
- ""
- - "jdbc:postgresql://"
  - !GetAtt MasterDatabase.Endpoint.Address
  - ":"
  - !GetAtt MasterDatabase.Endpoint.Port
  - "/"
  - !Ref DBName
  - "?user="
  - !Ref DBUser
  - "&password="
  - !Ref DBPassword
Bucket:
Description: "Name of Amazon S3 data bucket"
Value: !Ref DataBucket

All the source code for this post is on GitHub. Use the following command to git clone a local copy of the project.

git clone \
--branch master --single-branch --depth 1 --no-tags \
https://github.com/garystafford/presto-aws-federated-queries.git

To create the AWS CloudFormation stack from the template, cloudformation/presto_ahana_demo.yaml, execute the following aws cloudformation command. Make sure you change the DBAvailabilityZone parameter value (shown in bold) to match the AWS Availability Zone in which your Ahana PrestoDB Sandbox EC2 instance was created. In my case, us-east-1f.

aws cloudformation create-stack \
--stack-name ahana-prestodb-demo \
--template-body file://cloudformation/presto_ahana_demo.yaml \
--parameters ParameterKey=DBAvailabilityZone,ParameterValue=us-east-1f

To ensure the RDS for PostgreSQL database instance can be accessed by Presto running on the Ahana PrestoDB Sandbox EC2, manually add the PrestoDB Sandbox EC2’s Security Group to port 5432 within the database instance’s VPC Security Group’s Inbound rules. I have also added my own IP to port 5432, which enables me to connect to the RDS instance directly from my IDE using JDBC.

The AWS CloudFormation stack’s Outputs tab includes a set of values, including the JDBC connection string for the new RDS for PostgreSQL instance, JdbcConnString, and the Amazon S3 bucket’s name, Bucket. All these values will be required during the demonstration.
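
If you prefer to retrieve these values from the command line rather than the console, the aws cloudformation describe-stacks command below is one way to list the stack’s outputs (assuming the stack name used in the create-stack command above).

aws cloudformation describe-stacks \
--stack-name ahana-prestodb-demo \
--query "Stacks[0].Outputs" \
--output table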

Preparing the PrestoDB Sandbox

There are a few steps we need to take to properly prepare the PrestoDB Sandbox EC2 for our demonstration. First, use your PrestoDB Sandbox EC2 SSH key to scp the properties and sql directories to the Presto EC2 instance. You will need to set the EC2_ENDPOINT value (shown in bold) to your EC2’s public IPv4 address or public IPv4 DNS value. You can hardcode the value or use the aws ec2 API command shown below to retrieve it programmatically.

# on local workstation
EC2_ENDPOINT=$(aws ec2 describe-instances \
--filters "Name=product-code,Values=ejee5zzmv4tc5o3tr1uul6kg2" \
"Name=product-code.type,Values=marketplace" \
--query "Reservations[*].Instances[*].{Instance:PublicDnsName}" \
--output text)
scp -i "~/.ssh/ahana-presto.pem" \
-r properties/ sql/ \
ec2-user@${EC2_ENDPOINT}:~/
ssh -i "~/.ssh/ahana-presto.pem" ec2-user@${EC2_ENDPOINT}

Environment Variables

Next, we need to set several environment variables. First, replace the DATA_BUCKET and POSTGRES_HOST values below (shown in bold) to match your environment. The PGPASSWORD value should be correct unless you changed it in the CloudFormation template. Then, execute the command to add the variables to your .bash_profile file.

echo """
export DATA_BUCKET=prestodb-demo-databucket-CHANGE_ME
export POSTGRES_HOST=presto-demo.CHANGE_ME.us-east-1.rds.amazonaws.com
export PGPASSWORD=5up3r53cr3tPa55w0rd
export JAVA_HOME=/usr
export HADOOP_HOME=/home/ec2-user/hadoop
export HADOOP_CLASSPATH=$HADOOP_HOME/share/hadoop/tools/lib/*
export HIVE_HOME=/home/ec2-user/hive
export PATH=$HIVE_HOME/bin:$HADOOP_HOME/bin:$PATH
""" >>~/.bash_profile

Optionally, I suggest updating the EC2 instance with the available updates and installing your favorite tools, like htop, to monitor the EC2 performance.

yes | sudo yum update
yes | sudo yum install htop
View of htop running on an r5.xlarge EC2 instance

Before further configuration for the demonstration, let’s review a few aspects of the Ahana PrestoDB EC2 instance. There are several applications pre-installed on the instance, including Java, Presto, Hadoop, PostgreSQL, and Hive. Versions shown are current as of early September 2020.

java -version
# openjdk version "1.8.0_252"
# OpenJDK Runtime Environment (build 1.8.0_252-b09)
# OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)
hadoop version
# Hadoop 2.9.2
postgres --version
# postgres (PostgreSQL) 9.2.24
psql --version
# psql (PostgreSQL) 9.2.24
hive --version
# Hive 2.3.7
presto-cli --version
# Presto CLI 0.235-cb21100

The Presto configuration files are in the /etc/presto/ directory. The Hive configuration files are in the ~/hive/conf/ directory. Here are a few commands you can use to gain a better understanding of their configurations.

ls /etc/presto/
cat /etc/presto/jvm.config
cat /etc/presto/config.properties
cat /etc/presto/node.properties
# installed and configured catalogs
ls /etc/presto/catalog/
cat ~/hive/conf/hive-site.xml

Configure Presto

To configure Presto, we need to create and copy a new Presto postgresql catalog properties file for the newly created RDS for PostgreSQL instance. Modify the properties/rds_postgresql.properties file, replacing the value, connection-url (shown in bold), with your own JDBC connection string, shown in the CloudFormation Outputs tab.

connector.name=postgresql
connection-url=jdbc:postgresql://presto-demo.abcdefg12345.us-east-1.rds.amazonaws.com:5432/shipping
connection-user=presto
connection-password=5up3r53cr3tPa55w0rd

Move the rds_postgresql.properties file to its correct location using sudo.

sudo mv properties/rds_postgresql.properties /etc/presto/catalog/

We also need to modify the existing Hive catalog properties file, which will allow us to write to non-managed Hive tables from Presto.

connector.name=hive-hadoop2
hive.metastore.uri=thrift://localhost:9083
hive.non-managed-table-writes-enabled=true

The following command will overwrite the existing hive.properties file with the modified version containing the new property.

sudo mv properties/hive.properties \
/etc/presto/catalog/hive.properties

To finalize the configuration of the catalog properties files, we need to restart Presto. The easiest way is to reboot the EC2 instance, then SSH back into the instance. Since our environment variables are in the .bash_profile file, they will survive a restart and logging back into the EC2 instance.

sudo reboot
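
Once the instance is back up and you have reconnected over SSH, a quick sanity check is to list the catalogs Presto recognizes; the new rds_postgresql catalog should appear alongside the bundled catalogs.

presto-cli --execute "SHOW CATALOGS;"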

Add Tables to Apache Hive Metastore

We will use RDS for PostgreSQL and Apache Hive Metastore/Amazon S3 as additional data sources for our federated queries. The Ahana PrestoDB Sandbox instance comes pre-configured with Apache Hive and an Apache Hive Metastore, backed by PostgreSQL (a separate PostgreSQL 9.x instance pre-installed on the EC2).

The Sandbox’s instance of Presto comes pre-configured with schemas for the TPC Benchmark DS (TPC-DS). We will create identical tables in our Apache Hive Metastore, which correspond to three external tables in the TPC-DS data source’s sf1 schema: tpcds.sf1.customer, tpcds.sf1.customer_address, and tpcds.sf1.customer_demographics. A Hive external table describes the metadata/schema on external files. External table files can be accessed and managed by processes outside of Hive. As an example, here is the SQL statement that creates the external customer table in the Hive Metastore, whose data will be stored in the S3 bucket.

CREATE EXTERNAL TABLE IF NOT EXISTS `customer`(
`c_customer_sk` bigint,
`c_customer_id` char(16),
`c_current_cdemo_sk` bigint,
`c_current_hdemo_sk` bigint,
`c_current_addr_sk` bigint,
`c_first_shipto_date_sk` bigint,
`c_first_sales_date_sk` bigint,
`c_salutation` char(10),
`c_first_name` char(20),
`c_last_name` char(30),
`c_preferred_cust_flag` char(1),
`c_birth_day` integer,
`c_birth_month` integer,
`c_birth_year` integer,
`c_birth_country` char(20),
`c_login` char(13),
`c_email_address` char(50),
`c_last_review_date_sk` bigint)
STORED AS PARQUET
LOCATION
's3a://prestodb-demo-databucket-CHANGE_ME/customer'
TBLPROPERTIES ('parquet.compression'='SNAPPY');

The three CREATE EXTERNAL TABLE SQL statements are included in the sql/ directory: sql/hive_customer.sql, sql/hive_customer_address.sql, and sql/hive_customer_demographics.sql. The bucket name (shown in bold above) needs to be manually updated to your own bucket name in all three files before continuing.

Next, run the following hive commands to create the external tables in the Hive Metastore within the existing default schema/database.

hive --database default -f sql/hive_customer.sql
hive --database default -f sql/hive_customer_address.sql
hive --database default -f sql/hive_customer_demographics.sql

To confirm the tables were created successfully, we could use a variety of hive commands.

hive --database default -e "SHOW TABLES;"
hive --database default -e "DESCRIBE FORMATTED customer;"
hive --database default -e "SELECT * FROM customer LIMIT 5;"
Using the ‘DESCRIBE FORMATTED customer_address;’ Hive command

Alternatively, you can also create the external table interactively from within Hive, using the hive command to access the CLI. Copy and paste the contents of the SQL files to the hive CLI. To exit hive use quit;.

Interactively querying within Apache Hive

Amazon S3 Data Source Setup

With the external tables created, we will now select all the data from each of the three tables in the TPC-DS data source and insert that data into the equivalent Hive tables. The physical data will be written to Amazon S3 in the highly efficient, columnar Apache Parquet format with SNAPPY compression. Execute the following commands. I will explain why the customer_address table statements are a bit different next.

# inserts 100,000 rows
presto-cli --execute """
INSERT INTO hive.default.customer
SELECT * FROM tpcds.sf1.customer;
"""
# inserts 50,000 rows across 52 partitions
presto-cli --execute """
INSERT INTO hive.default.customer_address
SELECT ca_address_sk, ca_address_id, ca_street_number,
ca_street_name, ca_street_type, ca_suite_number,
ca_city, ca_county, ca_zip, ca_country, ca_gmt_offset,
ca_location_type, ca_state
FROM tpcds.sf1.customer_address
ORDER BY ca_address_sk;
"""
# add new partitions in metastore
hive -e "MSCK REPAIR TABLE default.customer_address;"
# inserts 1,920,800 rows
presto-cli --execute """
INSERT INTO hive.default.customer_demographics
SELECT * FROM tpcds.sf1.customer_demographics;
"""

Confirm the data has been loaded into the correct S3 bucket locations and is in Parquet format using the AWS Management Console or the AWS CLI. Rest assured, the Parquet-format data is SNAPPY-compressed even though the S3 console incorrectly displays Compression as None. You can easily confirm the compression codec with a utility like parquet-tools.
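
For example, assuming parquet-tools is installed on your workstation or the EC2 instance, the following commands are one way to spot-check the objects and their metadata; the object key in the copy command is only a placeholder for one of the listed Parquet files.

# list the Parquet objects written by Presto (DATA_BUCKET was set earlier)
aws s3 ls s3://${DATA_BUCKET}/customer/ --recursive --human-readable
# copy one of the listed objects locally and inspect its metadata,
# which includes the compression codec
aws s3 cp s3://${DATA_BUCKET}/customer/<object-key> .
parquet-tools meta <object-key>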

Data organized by key prefixes in Amazon S3
Using S3’s ‘Select from’ feature to preview the SNAPPY-compressed Parquet format data

Partitioned Tables

The customer_address table is unique in that it has been partitioned by the ca_state column. Partitioned tables are created using the PARTITIONED BY clause.

CREATE EXTERNAL TABLE `customer_address`(
`ca_address_sk` bigint,
`ca_address_id` char(16),
`ca_street_number` char(10),
`ca_street_name` char(60),
`ca_street_type` char(15),
`ca_suite_number` char(10),
`ca_city` varchar(60),
`ca_county` varchar(30),
`ca_zip` char(10),
`ca_country` char(20),
`ca_gmt_offset` double precision,
`ca_location_type` char(20)
)
PARTITIONED BY (`ca_state` char(2))
STORED AS PARQUET
LOCATION
's3a://prestodb-demo-databucket-CHANGE_ME/customer'
TBLPROPERTIES ('parquet.compression'='SNAPPY');

According to Apache Hive, a table can have one or more partition columns, and a separate data directory is created for each distinct value combination in the partition columns. Since the data for the Hive tables is stored in Amazon S3, when data is written to the customer_address table, it is automatically separated into different S3 key prefixes based on the state. The data is physically “partitioned”.

customer_address data, partitioned by the state, in Amazon S3

Whenever we add new partitions in S3, we need to run the MSCK REPAIR TABLE command to add that table’s new partitions to the Hive Metastore.

hive -e "MSCK REPAIR TABLE default.customer_address;"

In SQL, a predicate is a condition expression that evaluates to a Boolean value, either true or false. Defining the partitions aligned with the attributes that are frequently used in the conditions/filters (predicates) of the queries can significantly increase query efficiency. When we execute a query that uses an equality comparison condition, such as ca_state = 'TN', partitioning means the query will only work with a slice of the data in the corresponding ca_state=TN key prefix. There are 50,000 rows of data in the customer_address table, but only 1,418 rows (2.8% of the total data) in the ca_state=TN partition. With the additional advantage of Parquet format with SNAPPY compression, partitioning can significantly reduce query execution time.
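
As a rough illustration of partition pruning, the two presto-cli statements below count rows with and without the partition-column predicate; for the first query, Presto only needs to read the objects under the ca_state=TN key prefix.

# touches only the ca_state=TN partition
presto-cli --execute """
SELECT count(*) FROM hive.default.customer_address WHERE ca_state = 'TN';
"""
# scans all partitions
presto-cli --execute """
SELECT count(*) FROM hive.default.customer_address;
"""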

Adding Data to RDS for PostgreSQL Instance

For the demonstration, we will also replicate the schema and data of the tpcds.sf1.customer_address table to the new RDS for PostgreSQL instance’s shipping database.

CREATE TABLE customer_address (
ca_address_sk bigint,
ca_address_id char(16),
ca_street_number char(10),
ca_street_name char(60),
ca_street_type char(15),
ca_suite_number char(10),
ca_city varchar(60),
ca_county varchar(30),
ca_state char(2),
ca_zip char(10),
ca_country char(20),
ca_gmt_offset double precision,
ca_location_type char(20)
);

Like Hive and Presto, we can create the table programmatically from the command line or interactively; I prefer the programmatic approach. Using the following psql command, we can create the customer_address table in the public schema of the shipping database.

psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-f sql/postgres_customer_address.sql

Now, to insert the data into the new PostgreSQL table, run the following presto-cli command.

# inserts 50,000 rows
presto-cli --execute """
INSERT INTO rds_postgresql.public.customer_address
SELECT * FROM tpcds.sf1.customer_address;
"""

To confirm that the data was imported properly, we can use a variety of commands.

# Should be 50,000 rows in the table
psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-c "SELECT COUNT(*) FROM customer_address;"
psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-c "SELECT * FROM customer_address LIMIT 5;"

Alternatively, you could use the PostgreSQL client interactively by copying and pasting the contents of the sql/postgres_customer_address.sql file to the psql command prompt. To interact with PostgreSQL from the psql command prompt, use the following command.

psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto

Use the \dt command to list the PostgreSQL tables and the \q command to exit the PostgreSQL client. We now have all the new data sources created and configured for Presto!

Interacting with Presto

Presto provides a web interface for monitoring and managing queries. The interface provides dashboard-like insights into the Presto Cluster and queries running on the cluster. The Presto UI is available on port 8080 using the public IPv4 address or the public IPv4 DNS.

There are several ways to interact with Presto, via the PrestoDB Sandbox. The post will demonstrate how to execute ad-hoc queries against Presto from an IDE using a JDBC connection and the Presto CLI. Other options include running queries against Presto from Java and Python applications, Tableau, or Apache Spark/PySpark.

Below, we see a query being run against Presto from JetBrains PyCharm, using a Java Database Connectivity (JDBC) connection. The advantage of using an IDE like JetBrains is having a single visual interface, including all the project files, multiple JDBC configurations, output results, and the ability to run multiple ad hoc queries.

Below, we see an example of configuring the Presto Data Source using the JDBC connection string, supplied in the CloudFormation stack Outputs tab.

Make sure to download and use the latest Presto JDBC driver JAR.

With JetBrains’ IDEs, we can even limit the databases/schemas displayed by the Data Source. This is helpful when we have multiple Presto catalogs configured, but we are only interested in certain data sources.

We can also run queries using the Presto CLI, three different ways. We can pass a SQL statement to the Presto CLI, pass a file containing a SQL statement to the Presto CLI, or work interactively from the Presto CLI. Below, we see a query being run, interactively from the Presto CLI.

As the query is running, we can observe the live Presto query statistics (not very user friendly in my terminal).

And finally, we view the query results.
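
For reference, here is a sketch of the three presto-cli invocation styles mentioned above, reusing the catalog and schema options that appear throughout this post.

# 1. pass a SQL statement directly to the Presto CLI
presto-cli --catalog tpcds --schema sf1 \
--execute "SELECT count(*) FROM customer;"
# 2. pass a file containing a SQL statement to the Presto CLI
presto-cli --catalog tpcds --schema sf1 \
--file sql/presto_query1.sql
# 3. work interactively from the Presto CLI
presto-cli --catalog tpcds --schema sf1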

Federated Queries

The example queries used in the demonstration and included in the project were mainly extracted from the scholarly article, Why You Should Run TPC-DS: A Workload Analysis, available as a PDF on the tpc.org website. I have modified the SQL queries to work with Presto.

In the first example, we will run the three versions of the same basic query statement. Version 1 of the query is not a federated query; it only queries a single data source. Version 2 of the query queries two different data sources. Finally, version 3 of the query queries three different data sources. Each of the three versions of the SQL statement should return the same results — 93 rows of data.

Version 1: Single Data Source

The first version of the query statement, sql/presto_query2.sql, is not a federated query. Each of the query’s four tables (catalog_returns, date_dim, customer, and customer_address) reference the TPC-DS data source, which came pre-installed with the PrestoDB Sandbox. Note table references on lines 11–13 and 41–42 are all associated with the tpcds.sf1 schema.

-- Modified version of
-- Figure 7: Reporting Query (Query 40)
-- http://www.tpc.org/tpcds/presentations/tpcds_workload_analysis.pdf
WITH customer_total_return AS (
SELECT
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
FROM
catalog_returns,
date_dim,
customer_address
WHERE
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
GROUP BY
cr_returning_customer_sk,
ca_state
)
SELECT
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return
FROM
customer_total_return ctr1,
customer_address,
customer
WHERE
ctr1.ctr_return > (
SELECT
avg(ctr_return) * 1.2
FROM
customer_total_return ctr2
WHERE
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk
ORDER BY
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return;

We will run each query non-interactively using the presto-cli. We will choose the sf1 (scale factor of 1) tpcds schema. According to Presto, every unit in the scale factor (sf1, sf10, sf100) corresponds to a gigabyte of data.
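
If you are curious which scale factors are available in the Sandbox’s tpcds catalog, you can list its schemas first.

presto-cli --execute "SHOW SCHEMAS FROM tpcds;"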

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2.sql \
--output-format ALIGNED \
--client-tags "presto_query2"

Below, we see the query results in the presto-cli.

Below, we see the first query running in Presto’s web interface.

Below, we see the first query’s results detailed in Presto’s web interface.

Version 2: Two Data Sources

In the second version of the query statement, sql/presto_query2_federated_v1.sql, two of the tables (catalog_returns and date_dim) reference the TPC-DS data source. The other two tables (customer and customer_address) now reference the Apache Hive Metastore for their schema and underlying data in Amazon S3. Note table references on lines 11 and 12, as opposed to lines 13, 41, and 42.

-- Modified version of
-- Figure 7: Reporting Query (Query 40)
-- http://www.tpc.org/tpcds/presentations/tpcds_workload_analysis.pdf
WITH customer_total_return AS (
SELECT
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
FROM
tpcds.sf1.catalog_returns,
tpcds.sf1.date_dim,
hive.default.customer_address
WHERE
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
GROUP BY
cr_returning_customer_sk,
ca_state
)
SELECT
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return
FROM
customer_total_return ctr1,
hive.default.customer_address,
hive.default.customer
WHERE
ctr1.ctr_return > (
SELECT
avg(ctr_return) * 1.2
FROM
customer_total_return ctr2
WHERE
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk
ORDER BY
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return;

Again, run the query using the presto-cli.

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2_federated_v1.sql \
--output-format ALIGNED \
--client-tags "presto_query2_federated_v1"

Below, we see the second query’s results detailed in Presto’s web interface.

Even though the data is in two separate and physically different data sources, we can easily query it as though it were all in the same place.

Version 3: Three Data Sources

In the third version of the query statement, sql/presto_query2_federated_v2.sql, two of the tables (catalog_returns and date_dim) reference the TPC-DS data source. One of the tables (hive.default.customer) references the Apache Hive Metastore, whose underlying data is in Amazon S3. The fourth table (rds_postgresql.public.customer_address) references the new RDS for PostgreSQL database instance. Note table references on lines 11 and 12, and on lines 13 and 41, as opposed to line 42.

-- Modified version of
-- Figure 7: Reporting Query (Query 40)
-- http://www.tpc.org/tpcds/presentations/tpcds_workload_analysis.pdf
WITH customer_total_return AS (
SELECT
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
FROM
tpcds.sf1.catalog_returns,
tpcds.sf1.date_dim,
rds_postgresql.public.customer_address
WHERE
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
GROUP BY
cr_returning_customer_sk,
ca_state
)
SELECT
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return
FROM
customer_total_return ctr1,
rds_postgresql.public.customer_address,
hive.default.customer
WHERE
ctr1.ctr_return > (
SELECT
avg(ctr_return) * 1.2
FROM
customer_total_return ctr2
WHERE
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk
ORDER BY
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return;

Again, we run the query using the presto-cli.

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2_federated_v2.sql \
--output-format ALIGNED \
--client-tags "presto_query2_federated_v2"

Below, we see the third query’s results detailed in Presto’s web interface.

Again, even though the data is in three separate and physically different data sources, we can easily query it as though it were all in the same place.

Additional Query Examples

The project contains several additional query statements, which I have extracted from Why You Should Run TPC-DS: A Workload Analysis and modified to work with Presto and to federate across multiple data sources.

# non-federated
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query1.sql \
--output-format ALIGNED \
--client-tags "presto_query1"
# federated - two sources
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query1_federated.sql \
--output-format ALIGNED \
--client-tags "presto_query1_federated"
# non-federated
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query4.sql \
--output-format ALIGNED \
--client-tags "presto_query4"
# federated - three sources
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query4_federated.sql \
--output-format ALIGNED \
--client-tags "presto_query4_federated"
# non-federated
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query5.sql \
--output-format ALIGNED \
--client-tags "presto_query5"

Conclusion

In this post, we gained a better understanding of Presto using Ahana’s PrestoDB Sandbox product from AWS Marketplace. We learned how Presto queries data where it lives, including Apache Hive, Thrift, Kafka, Kudu, Cassandra, Elasticsearch, and MongoDB. We also learned about Apache Hive and the Apache Hive Metastore, the Apache Parquet file format, and how and why to partition Hive data in Amazon S3. Most importantly, we learned how to write federated queries that join multiple disparate data sources without having to move the data into a single monolithic data store.


This blog represents my own viewpoints and not of my employer, Amazon Web Services.


Collecting and Analyzing IoT Data in Near Real-Time with AWS IoT, LoRa, and LoRaWAN

Introduction

In a recent post published on ITNEXT, LoRa and LoRaWAN for IoT: Getting Started with LoRa and LoRaWAN Protocols for Low Power, Wide Area Networking of IoT, we explored the use of the LoRa (Long Range) and LoRaWAN protocols to transmit and receive sensor data, over a substantial distance, between an IoT device, containing several embedded sensors, and an IoT gateway. In this post, we will extend that architecture to the Cloud, using AWS IoT, a broad and deep set of IoT services, from the edge to the Cloud. We will securely collect, transmit, and analyze IoT data using the AWS cloud platform.

LoRa and LoRaWAN

According to the LoRa Alliance, Low-Power, Wide-Area Networks (LPWAN) are projected to support a major portion of the billions of devices forecasted for the Internet of Things (IoT). LoRaWAN is designed from the bottom up to optimize LPWANs for battery lifetime, capacity, range, and cost. LoRa and LoRaWAN permit long-range connectivity for IoT devices in different types of industries. According to Wikipedia, LoRaWAN defines the communication protocol and system architecture for the network, while the LoRa physical layer enables the long-range communication link.

AWS IoT

AWS describes AWS IoT as a set of managed services that enable ‘internet-connected devices to connect to the AWS Cloud and lets applications in the cloud interact with internet-connected devices.’ AWS IoT services span three categories: Device Software, Connectivity and Control, and Analytics.

In this post, we will focus on three AWS IoT services, one from each category, including AWS IoT Device SDKs, AWS IoT Core, and AWS IoT Analytics. According to AWS, the AWS IoT Device SDKs include open-source libraries and developer and porting guides with samples to help you build innovative IoT products or solutions on your choice of hardware platforms. AWS IoT Core is a managed cloud service that lets connected devices easily and securely interact with cloud applications and other devices. AWS IoT Core can process and route messages to AWS endpoints and other devices reliably and securely. Finally, AWS IoT Analytics is a fully-managed IoT analytics service, designed specifically for IoT, which collects, pre-processes, enriches, stores, and analyzes IoT device data at scale.

To learn more about AWS IoT, specifically the AWS IoT services we will be exploring within this post, I recommend reading my recent post published on Towards Data Science, Getting Started with IoT Analytics on AWS.

Hardware Selection

In this post, we will use the following hardware.

IoT Device with Embedded Sensors

An Arduino single-board microcontroller will serve as our IoT device. The 3.3V AI-enabled Arduino Nano 33 BLE Sense board (Amazon: USD 36.00), released in August 2019, comes with the powerful nRF52840 processor from Nordic Semiconductors, a 32-bit ARM Cortex-M4 CPU running at 64 MHz, 1MB of CPU Flash Memory, 256KB of SRAM, and a NINA-B306 stand-alone Bluetooth 5 low energy (BLE) module.

The Sense contains an impressive array of embedded sensors:

  • 9-axis Inertial Sensor (LSM9DS1): 3D digital linear acceleration sensor, a 3D digital
    angular rate sensor, and a 3D digital magnetic sensor
  • Humidity and Temperature Sensor (HTS221): Capacitive digital sensor for relative humidity and temperature
  • Barometric Sensor (LPS22HB): MEMS nano pressure sensor: 260–1260 hectopascal (hPa) absolute digital output barometer
  • Microphone (MP34DT05): MEMS audio sensor omnidirectional digital microphone
  • Gesture, Proximity, Light Color, and Light Intensity Sensor (APDS9960): Advanced Gesture detection, Proximity detection, Digital Ambient Light Sense (ALS), and Color Sense (RGBC).

The Arduino Sense is an excellent, low-cost single-board microcontroller for learning about the collection and transmission of IoT sensor data.

IoT Gateway

An IoT Gateway, according to TechTarget, is a physical device or software program that serves as the connection point between the Cloud and controllers, sensors, and intelligent devices. All data moving to the Cloud, or vice versa, goes through the gateway, which can be either a dedicated hardware appliance or software program.

LoRa Gateways, to paraphrase The Things Network, form the bridge between devices and the Cloud. Devices use low power networks like LoRaWAN to connect to the Gateway, while the Gateway uses high bandwidth networks like WiFi, Ethernet, or Cellular to connect to the Cloud.

A third-generation Raspberry Pi 3 Model B+ single-board computer (SBC) will serve as our LoRa IoT Gateway. This Raspberry Pi model features a 1.4GHz Cortex-A53 (ARMv8) 64-bit quad-core processor System on a Chip (SoC), 1GB LPDDR2 SDRAM, dual-band wireless LAN, Bluetooth 4.2 BLE, and Gigabit Ethernet (Amazon: USD 42.99).

LoRa Transceiver Modules

To transmit the IoT sensor data between the IoT device, containing the embedded sensors, and the IoT gateway, I have used the REYAX RYLR896 LoRa transceiver module (Amazon: USD 19.50 x 2). The transceiver modules are commonly referred to as a universal asynchronous receiver-transmitter (UART). A UART is a computer hardware device for asynchronous serial communication in which the data format and transmission speeds are configurable.

According to the manufacturer, REYAX, the RYLR896 contains the Semtech SX1276 long-range, low power transceiver. The RYLR896 module provides ultra-long range spread spectrum communication and high interference immunity while minimizing current consumption. Each RYLR896 module contains a small, PCB integrated, helical antenna. This transceiver operates at both the 868 and 915 MHz frequency ranges. In this demonstration, we will be transmitting at 915 MHz for North America.

The Arduino Sense (IoT device) transmits data, using one of the RYLR896 modules (shown below front). The Raspberry Pi (IoT Gateway), connected to the other RYLR896 module (shown below rear), receives the data.

LoRaWAN Security

The RYLR896 is capable of AES 128-bit data encryption. Using the Advanced Encryption Standard (AES), we will encrypt the data sent from the IoT device to the IoT gateway, using a 32 hex digit password (128 bits / 4 bits/hex digit).
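
If you need to generate your own random 32 hex digit password, one option is the openssl command-line tool, as sketched below.

# generate a random 128-bit key, expressed as 32 hex digits
openssl rand -hex 16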

Provisioning AWS Resources

To start, we will create the necessary AWS IoT and associated resources on the AWS cloud platform. Once these resources are in place, we can then proceed to configure the IoT device and IoT gateway to securely transmit the sensor data to the Cloud.

All the source code for this post is on GitHub. Use the following command to git clone a local copy of the project.

git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/aws-iot-analytics-demo.git

AWS CloudFormation

The CloudFormation template, iot-analytics.yaml, will create an AWS IoT CloudFormation stack containing the following resources.

  • AWS IoT Thing
  • AWS IoT Thing Policy
  • AWS IoT Core Topic Rule
  • AWS IoT Analytics Channel, Pipeline, Data store, and Data set
  • AWS Lambda and Lambda Permission
  • Amazon S3 Bucket
  • Amazon SageMaker Notebook Instance
  • AWS IAM Roles

Please be aware of the costs involved with the AWS resources used in the CloudFormation template before continuing. To create the AWS CloudFormation stack from the included CloudFormation template, execute the following AWS CLI command.

aws cloudformation create-stack \
--stack-name lora-iot-demo \
--template-body file://cloudformation/iot-analytics.yaml \
--parameters ParameterKey=ProjectName,ParameterValue=lora-iot-demo \
ParameterKey=IoTTopicName,ParameterValue=lora-iot-demo \
--capabilities CAPABILITY_NAMED_IAM

The resulting CloudFormation stack should contain 16 AWS resources.
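
To verify, you could list the stack’s resources from the command line once the stack reaches the CREATE_COMPLETE status.

aws cloudformation describe-stack-resources \
--stack-name lora-iot-demo \
--query "StackResources[].[ResourceType,ResourceStatus]" \
--output table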

Additional Resources

Unfortunately, AWS CloudFormation cannot create all the AWS IoT resources we require for this demonstration. To complete the AWS provisioning process, execute the following series of AWS CLI commands, aws_cli_commands.md. These commands will create the remaining resources, including an AWS IoT Thing Type, Thing Group, Thing Billing Group, and an X.509 Certificate.

# LoRaWAN / AWS IoT Demo
# Author: Gary Stafford
# Run AWS CLI commands after CloudFormation stack completes successfully
# variables
thingName=lora-iot-gateway-01
thingPolicy=LoRaDevicePolicy
thingType=LoRaIoTGateway
thingGroup=LoRaIoTGateways
thingBillingGroup=LoRaIoTGateways
mkdir ${thingName}
aws iot create-keys-and-certificate \
--certificate-pem-outfile "${thingName}/${thingName}.cert.pem" \
--public-key-outfile "${thingName}/${thingName}.public.key" \
--private-key-outfile "${thingName}/${thingName}.private.key" \
--set-as-active
# assuming you only have one certificate registered
certificate=$(aws iot list-certificates | jq '.[][] | .certificateArn')
## alternately, for a specific certificate if you have more than one
# aws iot list-certificates
## then change the value below
# certificate=arn:aws:iot:us-east-1:123456789012:cert/<certificate>
aws iot attach-policy \
--policy-name $thingPolicy \
--target $certificate
aws iot attach-thing-principal \
--thing-name $thingName \
--principal $certificate
aws iot create-thing-type \
--thing-type-name $thingType \
--thing-type-properties "thingTypeDescription=LoRaWAN IoT Gateway"
aws iot create-thing-group \
--thing-group-name $thingGroup \
--thing-group-properties "thingGroupDescription=\"LoRaWAN IoT Gateway Thing Group\", attributePayload={attributes={Manufacturer=RaspberryPiFoundation}}"
aws iot add-thing-to-thing-group \
--thing-name $thingName \
--thing-group-name $thingGroup
aws iot create-billing-group \
--billing-group-name $thingBillingGroup \
--billing-group-properties "billingGroupDescription=\"Gateway Billing Group\""
aws iot add-thing-to-billing-group \
--thing-name $thingName \
--billing-group-name $thingBillingGroup
aws iot update-thing \
--thing-name $thingName \
--thing-type-name $thingType \
--attribute-payload "{\"attributes\": {\"GatewayMfr\":\"RaspberryPiFoundation\", \"LoRaMfr\":\"REYAX\", \"LoRaModel\":\"RYLR896\"}}"
aws iot describe-thing \
--thing-name $thingName
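As an optional sanity check, not part of the original demo, a few read-only calls will confirm that the certificate is attached to the Thing and that the Thing was added to its group. This sketch reuses the shell variables defined above.

# confirm certificate attachment and group membership
aws iot list-thing-principals \
  --thing-name $thingName

aws iot list-things-in-thing-group \
  --thing-group-name $thingGroup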

IoT Device Configuration

With the AWS resources deployed, we can configure the IoT device and IoT Gateway.

Arduino Sketch

For those not familiar with Arduino, a sketch is the name that Arduino uses for a program. It is the unit of code that is uploaded into non-volatile flash memory and runs on an Arduino board. The Arduino language is a set of C and C++ functions. All standard C and C++ constructs supported by the avr-g++ compiler should work in Arduino.

For this post, the sketch, lora_iot_demo_aws.ino, contains the code necessary to collect and securely transmit the environmental sensor data, including temperature, relative humidity, barometric pressure, Red, Green, and Blue (RGB) color, and ambient light intensity, using the LoRaWAN protocol.

/*
Description: Transmits Arduino Nano 33 BLE Sense sensor telemetry over LoRaWAN,
including temperature, humidity, barometric pressure, and color,
using REYAX RYLR896 transceiver modules
http://reyax.com/wp-content/uploads/2020/01/Lora-AT-Command-RYLR40x_RYLR89x_EN.pdf
Author: Gary Stafford
*/
#include <Arduino_HTS221.h>
#include <Arduino_LPS22HB.h>
#include <Arduino_APDS9960.h>
const int UPDATE_FREQUENCY = 5000; // update frequency in ms
const float CALIBRATION_FACTOR = -4.0; // temperature calibration factor (Celsius)
const int ADDRESS = 116;
const int NETWORK_ID = 6;
const String PASSWORD = "92A0ECEC9000DA0DCF0CAAB0ABA2E0EF";
const String DELIMITER = "|";
String uid = "";
void setup()
{
Serial.begin(9600);
Serial1.begin(115200); // default baud rate of module is 115200
delay(1000); // wait for LoRa module to be ready
// get unique transceiver id to identify iot device on network
Serial1.print((String)"AT+UID?\r\n");
uid = Serial1.readString();
uid.replace("+UID=", ""); // trim off '+UID=' at start of line
uid.replace("\r\n", ""); // trim off CR/LF at end of line
// address, network ID, and password all need to be the same for receiver and transmitter
Serial1.print((String)"AT+ADDRESS=" + ADDRESS + "\r\n");
delay(200);
Serial1.print((String)"AT+NETWORKID=" + NETWORK_ID + "\r\n");
delay(200);
Serial1.print("AT+CPIN=" + PASSWORD + "\r\n");
delay(200);
Serial1.print("AT+CPIN?\r\n"); // confirm password is set
if (!HTS.begin())
{ // initialize HTS221 sensor
Serial.println("Failed to initialize humidity temperature sensor!");
while (1);
}
if (!BARO.begin())
{ // initialize LPS22HB sensor
Serial.println("Failed to initialize pressure sensor!");
while (1);
}
// discard the first reading to work around the bad-initial-readings bug
// https://forum.arduino.cc/index.php?topic=660360.0
BARO.readPressure();
delay(1000);
if (!APDS.begin())
{ // initialize APDS9960 sensor
Serial.println("Failed to initialize color sensor!");
while (1);
}
}
void loop()
{
updateReadings();
delay(UPDATE_FREQUENCY);
}
void updateReadings()
{
float temperature = getTemperature(CALIBRATION_FACTOR);
float humidity = getHumidity();
float pressure = getPressure();
int colors[4];
getColor(colors);
String payload = buildPayload(temperature, humidity, pressure, colors);
Serial.println("Payload: " + payload); // display the payload for debugging
Serial1.print(payload); // send the payload over LoRaWAN
displayResults(temperature, humidity, pressure, colors); // display the results for debugging
}
float getTemperature(float calibration)
{
return HTS.readTemperature() + calibration;
}
float getHumidity()
{
return HTS.readHumidity();
}
float getPressure()
{
return BARO.readPressure();
}
void getColor(int c[])
{
// check if a color reading is available
while (!APDS.colorAvailable())
{
delay(5);
}
int r, g, b, a;
APDS.readColor(r, g, b, a);
c[0] = r;
c[1] = g;
c[2] = b;
c[3] = a;
}
// display for debugging purposes
void displayResults(float t, float h, float p, int c[])
{
Serial.println((String)"UID: " + uid);
Serial.print("Temperature: ");
Serial.println(t);
Serial.print("Humidity: ");
Serial.println(h);
Serial.print("Pressure: ");
Serial.println(p);
Serial.print("Color (r, g, b, a): ");
Serial.print(c[0]);
Serial.print(", ");
Serial.print(c[1]);
Serial.print(", ");
Serial.print(c[2]);
Serial.print(", ");
Serial.println(c[3]);
Serial.println("----------");
}
String buildPayload(float t, float h, float p, int c[])
{
String readings = "";
readings += uid;
readings += DELIMITER;
readings += t;
readings += DELIMITER;
readings += h;
readings += DELIMITER;
readings += p;
readings += DELIMITER;
readings += c[0];
readings += DELIMITER;
readings += c[1];
readings += DELIMITER;
readings += c[2];
readings += DELIMITER;
readings += c[3];
String payload = "";
payload += "AT+SEND=";
payload += ADDRESS;
payload += ",";
payload += readings.length();
payload += ",";
payload += readings;
payload += "\r\n";
return payload;
}
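The sketch can be compiled and uploaded to the Nano 33 BLE Sense from the Arduino IDE. If you prefer the command line, something similar to the following should work with the Arduino CLI; the board FQBN, serial port, and sketch directory shown here are assumptions that may need adjusting for your environment and core version.

# compile and upload the sketch using the Arduino CLI (FQBN, port, and path are assumptions)
arduino-cli compile --fqbn arduino:mbed_nano:nano33ble lora_iot_demo_aws
arduino-cli upload --fqbn arduino:mbed_nano:nano33ble --port /dev/ttyACM0 lora_iot_demo_aws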

AT Commands

Communication with the RYLR896's long-range modem is done using AT commands. AT commands are instructions used to control a modem. AT is the abbreviation of ATtention; every command line starts with AT, which is why modem commands are called AT commands, according to Developer's Home. A complete list of AT commands can be downloaded as a PDF from the RYLR896 product page.

To efficiently transmit the environmental sensor data from the IoT sensor to the IoT gateway, the sketch concatenates the sensor ID and the sensor values into a single string. The string is incorporated into an AT command and sent to the RYLR896 LoRa transceiver module. To make it easier to parse the sensor data on the IoT gateway, the sensor values are delimited with a pipe (|), as opposed to a comma. According to REYAX, the maximum length of the LoRa payload is approximately 330 bytes.

Below, we see an example of the AT command used to send the sensor data from the IoT sensor and the corresponding unencrypted data received by the IoT gateway. Both contain the LoRa transmitter Address ID, the payload length (62 bytes in the example), and the payload itself. The data received by the IoT gateway also includes the received signal strength indicator (RSSI) and the signal-to-noise ratio (SNR).
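For illustration, the transmitted AT command and the corresponding received data take the following general form. The values shown are hypothetical (the UID, RSSI, and SNR are borrowed from the parsing example in the gateway script); your readings will differ.

# transmitted by the IoT device (Arduino)
AT+SEND=116,62,0447383033363932003C0034|23.94|37.71|99.89|2456|2389|2455|4311

# received by the IoT gateway (Raspberry Pi), with RSSI and SNR appended
+RCV=116,62,0447383033363932003C0034|23.94|37.71|99.89|2456|2389|2455|4311,-61,56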

Receiving Data on IoT Gateway

The Raspberry Pi will act as a LoRa IoT gateway, receiving the environmental sensor data from the IoT device, the Arduino, and sending the data to AWS. The Raspberry Pi runs a Python script, rasppi_lora_receiver_aws.py, which receives the data from the Arduino Nano 33 BLE Sense, decrypts it, parses the sensor values, serializes the data to a JSON payload, and finally transmits the payload in an MQTT message to AWS. The script uses pyserial, the Python serial port extension, which encapsulates access to the serial port for communication with the RYLR896 module. The script uses the AWS IoT Device SDK for Python v2 to communicate with AWS.
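The project repository includes a helper shell script, rasppi_lora_receiver_aws.sh, which wraps the Python script. Invoked directly, the command looks roughly like the sketch below; the endpoint, certificate locations, client ID, and topic are placeholders and assumptions you will need to adjust for your environment.

python3 rasppi_lora_receiver_aws.py \
  --endpoint <your-endpoint-id>-ats.iot.us-east-1.amazonaws.com \
  --cert lora-iot-gateway-01/lora-iot-gateway-01.cert.pem \
  --key lora-iot-gateway-01/lora-iot-gateway-01.private.key \
  --root-ca lora-iot-gateway-01/AmazonRootCA1.pem \
  --client-id lora-iot-gateway-01 \
  --topic lora-iot-demo \
  --gateway-id lora-iot-gateway-01 \
  --tty /dev/ttyAMA0 \
  --baud-rate 115200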

import json
import logging
import sys
import threading
import time
from argparse import ArgumentParser
import serial
from awscrt import io, mqtt, auth, http, exceptions
from awsiot import mqtt_connection_builder
# LoRaWAN IoT Sensor Demo
# Using REYAX RYLR896 transceiver modules
# http://reyax.com/wp-content/uploads/2020/01/Lora-AT-Command-RYLR40x_RYLR89x_EN.pdf
# Author: Gary Stafford
# Requirements: python3 -m pip install --user -r requirements.txt
# Usage:
# sh ./rasppi_lora_receiver_aws.sh \
# a1d0wxnxn1hs7m-ats.iot.us-east-1.amazonaws.com
# constants
ADDRESS = 116
NETWORK_ID = 6
PASSWORD = "92A0ECEC9000DA0DCF0CAAB0ABA2E0EF"
# global variables
count = 0 # from args
received_count = 0
received_all_event = threading.Event()
def main():
# get args
logging.basicConfig(filename='output.log',
filemode='w', level=logging.DEBUG)
args = get_args() # get args
payload = ""
lora_payload = {}
# set log level
io.init_logging(getattr(io.LogLevel, args.verbosity), 'stderr')
# spin up resources
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
# set MQTT connection
mqtt_connection = set_mqtt_connection(args, client_bootstrap)
logging.debug("Connecting to {} with client ID '{}'…".format(
args.endpoint, args.client_id))
connect_future = mqtt_connection.connect()
# future.result() waits until a result is available
connect_future.result()
logging.debug("Connecting to REYAX RYLR896 transceiver module…")
serial_conn = serial.Serial(
port=args.tty,
baudrate=int(args.baud_rate),
timeout=5,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS
)
if serial_conn.isOpen():
logging.debug("Connected!")
set_lora_config(serial_conn)
check_lora_config(serial_conn)
while True:
# read data from serial port
serial_payload = serial_conn.readline()
logging.debug(serial_payload)
if len(serial_payload) >= 1:
payload = serial_payload.decode(encoding="utf-8")
payload = payload[:-2] # strip trailing CR/LF before parsing
try:
data = parse_payload(payload)
lora_payload = {
"ts": time.time(),
"data": {
"device_id": str(data[0]),
"gateway_id": str(args.gateway_id),
"temperature": float(data[1]),
"humidity": float(data[2]),
"pressure": float(data[3]),
"color": {
"red": float(data[4]),
"green": float(data[5]),
"blue": float(data[6]),
"ambient": float(data[7])
}
}
}
logging.debug(lora_payload)
except IndexError:
logging.error("IndexError: {}".format(payload))
except ValueError:
logging.error("ValueError: {}".format(payload))
# publish mqtt message
message_json = json.dumps(
lora_payload,
sort_keys=True,
indent=None,
separators=(',', ':'))
try:
mqtt_connection.publish(
topic=args.topic,
payload=message_json,
qos=mqtt.QoS.AT_LEAST_ONCE)
except mqtt.SubscribeError as err:
logging.error(".SubscribeError: {}".format(err))
except exceptions.AwsCrtError as err:
logging.error("AwsCrtError: {}".format(err))
def set_mqtt_connection(args, client_bootstrap):
if args.use_websocket:
proxy_options = None
if args.proxy_host:
proxy_options = http.HttpProxyOptions(
host_name=args.proxy_host, port=args.proxy_port)
credentials_provider = auth.AwsCredentialsProvider.new_default_chain(
client_bootstrap)
mqtt_connection = mqtt_connection_builder.websockets_with_default_aws_signing(
endpoint=args.endpoint,
client_bootstrap=client_bootstrap,
region=args.signing_region,
credentials_provider=credentials_provider,
websocket_proxy_options=proxy_options,
ca_filepath=args.root_ca,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
client_id=args.client_id,
clean_session=False,
keep_alive_secs=6)
else:
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=args.endpoint,
cert_filepath=args.cert,
pri_key_filepath=args.key,
client_bootstrap=client_bootstrap,
ca_filepath=args.root_ca,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
client_id=args.client_id,
clean_session=False,
keep_alive_secs=6)
return mqtt_connection
def get_args():
parser = ArgumentParser(
description="Send and receive messages through an MQTT connection.")
parser.add_argument("--tty", required=True,
help="serial tty", default="/dev/ttyAMA0")
parser.add_argument("--baud-rate", required=True,
help="serial baud rate", default=115200)
parser.add_argument('--endpoint', required=True, help="Your AWS IoT custom endpoint, not including a port. " +
"Ex: \"abcd123456wxyz-ats.iot.us-east-1.amazonaws.com\"")
parser.add_argument('--cert', help="File path to your client certificate, in PEM format.")
parser.add_argument('--key', help="File path to your private key, in PEM format.")
parser.add_argument('--root-ca', help="File path to root certificate authority, in PEM format. " +
"Necessary if MQTT server uses a certificate that's not already in " +
"your trust store.")
parser.add_argument('--client-id', default='samples-client-id',
help="Client ID for MQTT connection.")
parser.add_argument('--topic', default="samples/test",
help="Topic to subscribe to, and publish messages to.")
parser.add_argument('--message', default="Hello World!", help="Message to publish. " +
"Specify empty string to publish nothing.")
parser.add_argument('--count', default=0, type=int, help="Number of messages to publish/receive before exiting. " +
"Specify 0 to run forever.")
parser.add_argument('--use-websocket', default=False, action='store_true',
help="To use a websocket instead of raw mqtt. If you specify this option you must "
"specify a region for signing, you can also enable proxy mode.")
parser.add_argument('--signing-region', default='us-east-1',
help="If you specify --use-websocket, this is the region that will be used for computing "
"the Sigv4 signature")
parser.add_argument('--proxy-host', help="Hostname for proxy to connect to. Note: if you use this feature, " +
"you will likely need to set --root-ca to the ca for your proxy.")
parser.add_argument('--proxy-port', type=int, default=8080,
help="Port for proxy to connect to.")
parser.add_argument('--verbosity', choices=[x.name for x in io.LogLevel], default=io.LogLevel.NoLogs.name,
help='Logging level')
parser.add_argument("--gateway-id", help="IoT Gateway serial number")
args = parser.parse_args()
return args
def parse_payload(payload):
# input: +RCV=116,29,0447383033363932003C0034|23.94|37.71|99.89|16|38|53|80,-61,56
# output: [0447383033363932003C0034, 23.94, 37.71, 99.89, 16.0, 38.0, 53.0, 80.0]
payload = payload.split(",")
payload = payload[2].split("|")
payload = [i for i in payload]
return payload
def set_lora_config(serial_conn):
# configures the REYAX RYLR896 transceiver module
serial_conn.write(str.encode("AT+ADDRESS=" + str(ADDRESS) + "\r\n"))
serial_payload = (serial_conn.readline())[:-2] # strip trailing CR/LF
logging.debug("Address set? {}".format(serial_payload.decode(encoding="utf-8")))
serial_conn.write(str.encode("AT+NETWORKID=" + str(NETWORK_ID) + "\r\n"))
serial_payload = (serial_conn.readline())[:-2] # strip trailing CR/LF
logging.debug("Network Id set? {}".format(serial_payload.decode(encoding="utf-8")))
serial_conn.write(str.encode("AT+CPIN=" + PASSWORD + "\r\n"))
time.sleep(1)
serial_payload = (serial_conn.readline())[:-2] # strip trailing CR/LF
logging.debug("AES-128 password set? {}".format(serial_payload.decode(encoding="utf-8")))
def check_lora_config(serial_conn):
serial_conn.write(str.encode("AT?\r\n"))