Amazon QuickSight Identity Federation with Auth0: Managing QuickSight users with a third-party enterprise identity provider (IdP)

Introduction

As a Solutions Architect working with Analytics customers, I am often asked about integrating Amazon QuickSight with Active Directory or single sign-on with third-party identity providers for user management.

Amazon QuickSight

Amazon QuickSight, according to AWS, is a scalable, serverless, embeddable, machine learning-powered business intelligence (BI) service built for the cloud. QuickSight lets you easily create and publish interactive BI dashboards that include machine learning-powered insights. QuickSight dashboards can be accessed from any device and seamlessly embedded into your applications, portals, and websites.

Auth0

Auth0 is an easy-to-implement, adaptable authentication and authorization platform. Auth0’s identity and management platform, according to Auth0, provides greater control, superior security, and ease of use. Single Sign-On (SSO), whether through enterprise federation, social log-in, or username and password authentication, according to Auth0, allows users to simply log in once and use all applications they have been granted access to.

Identity Federation

According to AWS, Amazon QuickSight supports identity federation in both Standard and Enterprise editions. With federated identities, you manage users with your enterprise identity provider (IdP) and use AWS Identity and Access Management (IAM) to authenticate users when they sign in to Amazon QuickSight. You can use a third-party identity provider that supports Security Assertion Markup Language 2.0 (SAML 2.0) to provide a simple onboarding flow for your QuickSight users. Such identity providers include Microsoft Active Directory Federation Services (AD FS), Okta, Ping Identity, Duo, Azure AD, and Auth0.

With identity federation, your users get one-click access to their Amazon QuickSight applications using their existing identity credentials. You also have the security benefit of identity authentication by your identity provider. You can control which users have access to Amazon QuickSight using your existing identity provider. Authenticated users can log directly into QuickSight, bypassing the AWS Management Console.

Initiating Sign-On from Amazon QuickSight

In this post’s scenario, a user initiates the sign-on process from the Amazon QuickSight application portal without being signed on to Auth0, the identity provider. The user has an existing federated account managed by Auth0. The user may or may not already have an account on QuickSight. QuickSight sends an authentication request to the IdP, Auth0. After the user is successfully authenticated, QuickSight opens.

For this post, we will assume that you have signed up for the Enterprise Edition of Amazon QuickSight and chosen Use Role Based Federation (SSO) as opposed to Use Active Directory. The Use Role Based Federation (SSO) option will allow us to configure a third-party IdP for identity authentication.

Auth0 Users and Roles

In Auth0’s User Management interface, create three users and their associated roles representing three QuickSight personas: Admin, Author, and Reader. For demonstration purposes, I chose to name the three users based on their QuickSight personas: QuickSightAdmin1, QuickSightAuthor1, and QuickSightReader1.

Next, create three roles: QuickSight-Admin-Role, QuickSight-Author-Role, and QuickSight-Reader-Role. Role names are arbitrary as long as they are identical to the equivalent IAM roles in AWS (created later in the post).

Associate each user with their corresponding role, one user per role. For example, associate the QuickSightDemoAdmin1 user with the QuickSight-Admin-Role role.

Auth0 Application

Next, in Auth0’s Application interface, create a new Regular Web Application. Name the new application, Amazon QuickSight.

On the new application’s Addons tab, enable the SAML2 Web App option.

On the SAML2 Web App Addon’s Settings tab, set the Application Callback URL value to https://signin.aws.amazon.com/saml. Change the JSON blob value for Settings to the following:

{
"audience": "urn:amazon:webservices"
}

The final configuration should match the example shown below.

Switch to the Usage tab. Download the Identity Provider Metadata XML file and note the Identity Provider Login URL value. You will need the metadata file and the URL to configure QuickSight later in the post.

Next, on the Connections tab of the new Amazon QuickSight application, ensure only Username-Password-Authentication is enabled.

Lastly, on the Settings tab, in the Application URIs configuration section, ensure the Allowed Callback URLs value is also set to https://signin.aws.amazon.com/saml.

Auth Pipeline Rule

Next, in Auth0’s Auth Pipeline Rules interface, create a new Empty rule.

Name the new rule: Change QuickSight SAML configuration.

For the Script field value, use the following JavaScript code snippet.

function changeSamlConfiguration(user, context, callback) {
if (context.clientID !== '<your_web_client_id>')
return callback(null, user, context);
const assignedRoles = (context.authorization || {}).roles;
const accountId = '<your_aws_account_id>';
const provider = 'saml-provider/Auth0';
user.awsRole = 'arn:aws:iam::' + accountId + ':role/' + assignedRoles[0] +
',arn:aws:iam::' + accountId + ':' + provider;
user.quickSightUser = user.name.replace(/@.*/, '');
context.samlConfiguration.mappings = {
'https://aws.amazon.com/SAML/Attributes/Role': 'awsRole',
'https://aws.amazon.com/SAML/Attributes/RoleSessionName': 'quickSightUser',
};
callback(null, user, context);
}

Replace the <your_web_client_id> placeholder with the Client ID of the Amazon QuickSight regular web application you created previously. The Client ID is listed in the Application interface, alongside the application’s name. Also, replace the <your_aws_account_id> placeholder with your twelve digital AWS account Id.

This rule will be used to modify the SAML assertion, as shown in the SAML assertion snippet example below, returned by Auth0 as part of the authentication process. The rule will inject the Amazon Resource Name (ARN) of the IAM role, to which the Auth0 user should be associated: QuickSight-Admin-Role, QuickSight-Author-Role, or QuickSight-Reader-Role. Note that the rule assumes one role per user. Additional logic would be required if the user is assigned to multiple roles.

AWS IAM Identity Provider

Back in the AWS Management Console, add an AWS IAM Identity Provider for Auth0. From the IAM console’s Identity providers interface, click Add provider. For Provider type choose SAML. Name the provider, Auth0. Click Choose file in the Metadata document section. Select the metadata document you downloaded earlier from Auth0. Click Add provider to finish.

The resulting Auth0 IAM Identity provider should be similar to the below example.

AWS IAM Policies and Roles

Next, create three AWS IAM roles that correspond to the three QuickSight personas of Administrator, Author, and Reader. These three roles also correspond to the three Auth0 roles we created previously. The Auth0 user will pass the Auth0 role name in the SAML document. The Auth0 role name will correspond to the IAM role. The IAM role defines the permissions the Auth0 user will have for QuickSight. We can associate many Auth0 users to one Auth0 Role and correspondingly with one IAM role.

First, create three IAM policies: QuickSight-Admin-Policy, QuickSight-Author-Policy, and QuickSight-Reader-Policy. These policies will each be associated with a corresponding IAM role. Create the policy, QuickSight-Admin-Policy. Replace the <your_aws_account_id> placeholder with your twelve digital AWS account Id.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "quicksight:CreateAdmin",
"Resource": "arn:aws:quicksight::<your_aws_account_id>:user/${aws:userid}"
}
]
}

Then, QuickSight-Author-Policy.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "quicksight:CreateUser",
"Resource": "arn:aws:quicksight::<your_aws_account_id>:user/${aws:userid}"
}
]
}

Finally, QuickSight-Reader-Policy.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "quicksight:CreateReader",
"Resource": "arn:aws:quicksight::<your_aws_account_id>:user/${aws:userid}"
}
]
}

Next, create the three corresponding IAM roles and associate the corresponding IAM policy: QuickSight-Admin-Role (QuickSight-Admin-Policy), QuickSight-Author-Role (QuickSight-Author-Policy), and QuickSight-Reader-Role (QuickSight-Reader-Policy).

The role’s Trust relationships establishes a trust relationship between the role and the Auth0 IAM Identity provider, as shown below.

For each of the three roles, click on Edit trust relationship and modify the access control policy document as shown below. Replace the <your_aws_account_id> placeholder with your twelve digital AWS account Id.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": "arn:aws:iam::<your_aws_account_id>:saml-provider/Auth0"
},
"Action": "sts:AssumeRoleWithSAML",
"Condition": {
"StringEquals": {
"saml:aud": "https://signin.aws.amazon.com/saml"
}
}
}
]
}

The role’s access control policy document also includes a Condition policy element. According to AWS IAM documentation, for security reasons, AWS should be included as an audience in the SAML assertion your IdP sends to AWS. For the value of the Audience element, specify either https://signin.aws.amazon.com/saml or urn:amazon:webservices.

We can inspect the SAML assertion returned from Auth0 by decoding the form data in the response payload from Base64 format to XML. Below, we see the SAML assertion using Chrome’s Developer Tools to inspect network activity.

Note that the AWS IAM documentation states that the SAML AudienceRestriction value in the SAML assertion from the IdP does not map to the saml:aud context key that you can test in an IAM policy. Instead, the saml:aud context key comes from the SAML recipient attribute because it is the SAML equivalent to the OIDC audience field, for example, by accounts.google.com:aud. These are shown in the SAML assertion XML snippet, below.

QuickSight IdP Configuration

The last step in integrating QuickSight and Auth0 is to configure QuickSight with Auth0’s specific IdP information. From the QuickSight Management interface, select Single sign-on (SSO). Switch the Status to ON.

Add the IdP URL value. As a reminder, this value came from the Auth0 Amazon QuickSight application’s SAML2 Web App Addon’s Usage tab, the Identity Provider Login URL value (see below). Make sure to copy the actual link associated with the URL shown.

Lastly, for the IdP redirect URL parameter value, use RelayState. Select Save to save the IdP configuration.

Testing Identity Federation

The easiest way to test the integration is to open a new Incognito window and point your browser to https://quicksight.aws.amazon.com. You should be prompted for your QuickSight account name. You created your account name when you signed up for QuickSight (e.g., acme-corp-sales).

Once you have entered your QuickSight account name, you should be redirected to Auth0 and presented with the Amazon QuickSight application’s log-in screen. Enter any of the three Auth0 user’s email addresses and passwords.

If the Auth0 log-in is successful, you will be redirected back and into the QuickSight application portal. If this is the first time the user has logged into QuickSight, you will be prompted for the user’s email address. Use the same email address the user is associated with in Auth0. In this scenario, the user will be self-registered with QuickSight and associated with the default namespace.

Your QuickSight experience and available features will vary, depending on the IAM role associated with the user, either Reader, Author, or Admin.

Close the Incognito browser window to end the current user session. Open a new Incognito browser window and repeat the process with the two remaining users, ensuring each can log-in successfully. Also, ensure the user’s experience in QuickSight matches the associated role, either Reader, Author, or Admin.

User Management

As a QuickSight Admin, log back into QuickSight and open the Manage users interface. All three Auth0 users should be registered with QuickSight, as shown in the example below. The users should be associated with the correct IAM role (column 1, below, left of the forward slash): QuickSight-Admin-Role, QuickSight-Author-Role, and QuickSight-Reader-Role. Users should also be associated with the correct QuickSight role (column 3, below): Reader, Author, or Admin.

Users can have custom permissions applied using the Manage permissions option.

Conclusion

In this post, we learned about how Amazon QuickSight supports identity federation. We learned how to manage users with a third-party enterprise identity provider (IdP), Auth0, and use AWS Identity and Access Management (IAM) to authenticate users when they sign in to Amazon QuickSight.

References


This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.

, , , ,

Leave a comment

Amazon Managed Workflows for Apache Airflow — Configuration: Understanding Amazon MWAA’s Configuration Options

Introduction

For anyone new to Amazon Managed Workflows for Apache Airflow (Amazon MWAA), especially those used to managing their own Apache Airflow platform, Amazon MWAA’s configuration might appear to be a bit of a black box at first. This brief post will explore Amazon MWAA’s configuration — how to inspect it and how to modify it. We will use Airflow DAGs to review an MWAA environment’s airflow.cfg file, environment variables, and Python packages.

Amazon MWAA

Apache Airflow is a popular open-source platform designed to schedule and monitor workflows. According to Wikipedia, Airflow was created at Airbnb in 2014 to manage the company’s increasingly complex workflows. From the beginning, the project was made open source, becoming an Apache Incubator project in 2016 and a top-level Apache Software Foundation project in 2019.

With the announcement of Amazon MWAA in November 2020, AWS customers can now focus on developing workflow automation while leaving the management of Airflow to AWS. Amazon MWAA can be used as an alternative to AWS Step Functions for workflow automation on AWS.

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI. For more information on Amazon MWAA, read my last post, Running Spark Jobs on Amazon EMR with Apache Airflow.

Image for post
Apache Airflow UI

Source Code

The DAGs referenced in this post are available on GitHub. Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/aws-airflow-demo.git

Accessing Configuration

Environment Variables

Environment variables are an essential part of an MWAA environment’s configuration. There are various ways to examine the environment variables. You could use Airflow’s BashOperator to simply call the command, env, or the PythonOperator to call a Python iterator function, as shown below. A sample DAG, dags/get_env_vars.py, is included in the project.

def print_env_vars():
keys = str(os.environ.keys().replace("', '", "'|'").split("|")
keys.sort()
for key in keys:
print(key)
get_env_vars_operator = PythonOperator(
task_id='get_env_vars_task',
python_callable=print_env_vars
)
view raw get_env_vars.py hosted with ❤ by GitHub

The DAG’s PythonOperator will iterate over the MWAA environment’s environment variables and output them to the task’s log. Below is a snippet of an example task’s log.

[2020-12-25 23:59:07,170] {{standard_task_runner.py:78}} INFO – Job 272: Subtask get_env_vars_task
[2020-12-25 23:59:08,423] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONN_AWS_DEFAULT': 'aws://'
[2020-12-25 23:59:08,516] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONSOLE_LOGS_ENABLED': 'false'
[2020-12-25 23:59:08,689] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONSOLE_LOG_LEVEL': 'WARNING'
[2020-12-25 23:59:08,777] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CTX_DAG_EMAIL': 'airflow@example.com'
[2020-12-25 23:59:08,877] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CTX_DAG_ID': 'get_env_vars'
[2020-12-25 23:59:08,970] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CTX_DAG_OWNER': 'airflow'
[2020-12-25 23:59:09,269] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CTX_TASK_ID': 'get_env_vars_task'
[2020-12-25 23:59:09,357] {{logging_mixin.py:112}} INFO – 'AIRFLOW_DAG_PROCESSING_LOGS_ENABLED': 'false'
[2020-12-25 23:59:09,552] {{logging_mixin.py:112}} INFO – 'AIRFLOW_DAG_PROCESSING_LOG_LEVEL': 'WARNING'
[2020-12-25 23:59:09,647] {{logging_mixin.py:112}} INFO – 'AIRFLOW_ENV_NAME': 'MyAirflowEnvironment'
[2020-12-25 23:59:09,729] {{logging_mixin.py:112}} INFO – 'AIRFLOW_HOME': '/usr/local/airflow'
[2020-12-25 23:59:09,827] {{logging_mixin.py:112}} INFO – 'AIRFLOW_SCHEDULER_LOGS_ENABLED': 'false'
[2020-12-25 23:59:12,915] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__DAG_CONCURRENCY': '10000'
[2020-12-25 23:59:12,986] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor'
[2020-12-25 23:59:13,136] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__LOAD_EXAMPLES': 'False'
[2020-12-25 23:59:13,217] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__PARALLELISM': '10000'
[2020-12-25 23:59:14,531] {{logging_mixin.py:112}} INFO – 'AWS_DEFAULT_REGION': 'us-east-1'
[2020-12-25 23:59:14,565] {{logging_mixin.py:112}} INFO – 'AWS_EXECUTION_ENV': 'AWS_ECS_FARGATE'
[2020-12-25 23:59:14,616] {{logging_mixin.py:112}} INFO – 'AWS_REGION': 'us-east-1'
[2020-12-25 23:59:14,647] {{logging_mixin.py:112}} INFO – 'CELERY_LOG_FILE': ''
[2020-12-25 23:59:14,679] {{logging_mixin.py:112}} INFO – 'CELERY_LOG_LEVEL': '20'
[2020-12-25 23:59:14,711] {{logging_mixin.py:112}} INFO – 'CELERY_LOG_REDIRECT': '1'
[2020-12-25 23:59:14,747] {{logging_mixin.py:112}} INFO – 'CELERY_LOG_REDIRECT_LEVEL': 'WARNING'
view raw airflow_env_vars.txt hosted with ❤ by GitHub

Airflow Configuration File

According to Airflow, the airflow.cfg file contains Airflow’s configuration. You can edit it to change any of the settings. The first time you run Apache Airflow, it creates an airflow.cfg configuration file in your AIRFLOW_HOME directory and attaches the configurations to your environment as environment variables.

Amazon MWAA doesn’t expose the airflow.cfg in the Apache Airflow UI of an environment. Although you can’t access it directly, you can view the airflow.cfg file. The configuration file is located in your AIRFLOW_HOME directory, /usr/local/airflow (~/airflow by default).

There are multiple ways to examine your MWAA environment’s airflow.cfg file. You could use Airflow’s PythonOperator to call a Python function that reads the contents of the file, as shown below. The function uses the AIRFLOW_HOME environment variable to locate and read the airflow.cfg. A sample DAG, dags/get_airflow_cfg.py, is included in the project.

def print_airflow_cfg():
with open(f"{os.getenv('AIRFLOW_HOME')}/airflow.cfg", 'r') as airflow_cfg:
file_contents = airflow_cfg.read()
print(f'\n{file_contents}')
get_airflow_cfg_operator = PythonOperator(
task_id='get_airflow_cfg_task',
python_callable=print_airflow_cfg
)
view raw get_airflow_cfg.py hosted with ❤ by GitHub

The DAG’s task will read the MWAA environment’s airflow.cfg file and output it to the task’s log. Below is a snippet of an example task’s log.

[2020-12-26 00:02:57,163] {{standard_task_runner.py:78}} INFO – Job 274: Subtask get_airflow_cfg_task
[2020-12-26 00:02:57,583] {{logging_mixin.py:112}} INFO –
[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
# This path must be absolute
dags_folder = /usr/local/airflow/dags
# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /usr/local/airflow/logs
# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Set this to True if you want to enable remote logging.
remote_logging = True
# Users must supply an Airflow connection id that provides access to the storage
# location.
remote_log_conn_id = aws_default
remote_base_log_folder = cloudwatch://arn:aws:logs:::log-group:airflow-logs:*
encrypt_s3_logs = False
# Logging level
logging_level = INFO
# Logging level for Flask-appbuilder UI
fab_logging_level = WARN
# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# Example: logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class = log_config.LOGGING_CONFIG
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
[aws_mwaa]
redirect_url = https://console.aws.amazon.com/
session_duration_minutes = 720
[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is used in automated emails that
# airflow sends to point links to the right web server
base_url = http://localhost:8080
# Default timezone to display all dates in the RBAC UI, can be UTC, system, or
# any IANA timezone string (e.g. Europe/Amsterdam). If left empty the
# default value of core/default_timezone will be used
# Example: default_ui_timezone = America/New_York
default_ui_timezone = UTC
# The ip specified when starting the web server
web_server_host = 0.0.0.0
# The port on which to run the web server
web_server_port = 8080
view raw airflow_cfg_log.txt hosted with ❤ by GitHub

Customizing Airflow Configurations

While AWS doesn’t expose the airflow.cfg in the Apache Airflow UI of your environment, you can change the default Apache Airflow configuration options directly within the Amazon MWAA console and continue using all other settings in airflow.cfg. The configuration options changed in the Amazon MWAA console are translated into environment variables.

To customize the Apache Airflow configuration, change the default options directly on the Amazon MWAA console. Select Edit, add or modify configuration options and values in the Airflow configuration options menu, then select Save. For example, we can change Airflow’s default timezone (core.default_ui_timezone) to America/New_York.

Image for post
Amazon MWAA’s Airflow configuration options

Once the MWAA environment is updated, which may take several minutes, view your changes by re-running the DAG,dags/get_env_vars.py. Note the new configuration item on both lines 2 and 6 of the log snippet shown below. The configuration item appears on its own (AIRFLOW__CORE_DEFAULT__UI_TIMEZONE), as well as part of the AIRFLOW_CONFIG_SECRETS dictionary environment variable.

[2020-12-26 05:00:57,756] {{standard_task_runner.py:78}} INFO – Job 293: Subtask get_env_vars_task
[2020-12-26 05:00:58,158] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONFIG_SECRETS': '{"AIRFLOW__CORE__DEFAULT_UI_TIMEZONE":"America/New_York"}'
[2020-12-26 05:00:58,190] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONN_AWS_DEFAULT': 'aws://'
[2020-12-26 05:01:00,537] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__DAG_CONCURRENCY': '10000'
[2020-12-26 05:01:00,578] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__DEFAULT_UI_TIMEZONE': 'America/New_York'
[2020-12-26 05:01:00,630] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor'

Using the MWAA API

We can also make configuration changes using the MWAA API. For example, to change the default Airflow UI timezone, call the MWAA API’s update-environment command using the AWS CLI. Include the --airflow-configuration-option parameter, passing the core.default_ui_timezone key/value pair as a JSON blob.

aws mwaa update-environment \
–name <your_environment_name> \
–airflow-configuration-options """{
\"core.default_ui_timezone\": \"America/Los_Angeles\"
}"""

To review an environment’s configuration, use the get-environment command in combination with jq.

aws mwaa get-environment \
–name <your_environment_name> | \
jq -r '.Environment.AirflowConfigurationOptions'

Below, we see an example of the output.

{
"core.default_ui_timezone": "America/Los_Angeles"
}

Python Packages

Airflow is written in Python, and workflows are created via Python scripts. Python packages are a crucial part of an MWAA environment’s configuration. According to the documentation, an ‘extra package’, is a Python subpackage that is not included in the Apache Airflow base, installed on your MWAA environment. As part of setting up an MWAA environment, you can specify the location of the requirements.txt file in the Airflow S3 bucket. Extra packages are installed using the requirements.txt file.

Image for post
Amazon MWAA environment’s configuration

There are several ways to check your MWAA environment’s installed Python packages and versions. You could use Airflow’s BashOperator to call the command, python3 -m pip list. A sample DAG, dags/get_py_pkgs.py, is included in the project.

list_python_packages_operator = BashOperator(
task_id='list_python_packages',
bash_command='python3 -m pip list'
)
view raw get_py_pkgs.py hosted with ❤ by GitHub

The DAG’s task will output a list of all Python packages and package versions to the task’s log. Below is a snippet of an example task’s log.

[2020-12-26 21:53:06,310] {{bash_operator.py:136}} INFO – Temporary script location: /tmp/airflowtmp2whgp_p8/list_python_packagesxo8slhc6
[2020-12-26 21:53:06,350] {{bash_operator.py:146}} INFO – Running command: python3 -m pip list
[2020-12-26 21:53:06,395] {{bash_operator.py:153}} INFO – Output:
[2020-12-26 21:53:06,750] {{bash_operator.py:157}} INFO – Package Version
[2020-12-26 21:53:06,786] {{bash_operator.py:157}} INFO – ———————- ———
[2020-12-26 21:53:06,815] {{bash_operator.py:157}} INFO – alembic 1.4.2
[2020-12-26 21:53:06,856] {{bash_operator.py:157}} INFO – amqp 2.6.1
[2020-12-26 21:53:06,898] {{bash_operator.py:157}} INFO – apache-airflow 1.10.12
[2020-12-26 21:53:06,929] {{bash_operator.py:157}} INFO – apispec 1.3.3
[2020-12-26 21:53:06,960] {{bash_operator.py:157}} INFO – argcomplete 1.12.0
[2020-12-26 21:53:07,002] {{bash_operator.py:157}} INFO – attrs 19.3.0
[2020-12-26 21:53:07,036] {{bash_operator.py:157}} INFO – Babel 2.8.0
[2020-12-26 21:53:07,071] {{bash_operator.py:157}} INFO – billiard 3.6.3.0
[2020-12-26 21:53:07,960] {{bash_operator.py:157}} INFO – boto3 1.16.10
[2020-12-26 21:53:07,993] {{bash_operator.py:157}} INFO – botocore 1.19.10
[2020-12-26 21:53:08,028] {{bash_operator.py:157}} INFO – cached-property 1.5.1
[2020-12-26 21:53:08,061] {{bash_operator.py:157}} INFO – cattrs 1.0.0
[2020-12-26 21:53:08,096] {{bash_operator.py:157}} INFO – celery 4.4.7
[2020-12-26 21:53:08,130] {{bash_operator.py:157}} INFO – certifi 2020.6.20
[2020-12-26 21:53:12,260] {{bash_operator.py:157}} INFO – pandas 1.1.0
[2020-12-26 21:53:12,289] {{bash_operator.py:157}} INFO – pendulum 1.4.4
[2020-12-26 21:53:12,490] {{bash_operator.py:157}} INFO – pip 9.0.3
[2020-12-26 21:53:12,522] {{bash_operator.py:157}} INFO – prison 0.1.3
[2020-12-26 21:53:12,550] {{bash_operator.py:157}} INFO – prometheus-client 0.8.0
[2020-12-26 21:53:12,580] {{bash_operator.py:157}} INFO – psutil 5.7.2
[2020-12-26 21:53:12,613] {{bash_operator.py:157}} INFO – pycparser 2.20
[2020-12-26 21:53:12,641] {{bash_operator.py:157}} INFO – pycurl 7.43.0.5
[2020-12-26 21:53:12,676] {{bash_operator.py:157}} INFO – Pygments 2.6.1
[2020-12-26 21:53:12,710] {{bash_operator.py:157}} INFO – PyGreSQL 5.2.1
[2020-12-26 21:53:12,746] {{bash_operator.py:157}} INFO – PyJWT 1.7.1

Conclusion

Understanding your Amazon MWAA environment’s airflow.cfg file, environment variables, and Python packages are all important for proper Airflow platform management. This brief post learned more about Amazon MWAA’s configuration — how to inspect it using DAGs and how to modify it through the Amazon MWAA console.

, , , ,

Leave a comment

Running Spark Jobs on Amazon EMR with Apache Airflow: Using the new Amazon Managed Workflows for Apache Airflow (Amazon MWAA) Service on AWS

Introduction

In the first post of this series, we explored several ways to run PySpark applications on Amazon EMR using AWS services, including AWS CloudFormation, AWS Step Functions, and the AWS SDK for Python. This second post in the series will examine running Spark jobs on Amazon EMR using the recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA) service.

Amazon EMR

According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a Cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache SparkHiveHBaseFlinkHudi, and ZeppelinJupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Amazon EMR Console’s Cluster Summary tab

Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including the use of EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.

AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.

Amazon MWAA

Apache Airflow is a popular open-source platform designed to schedule and monitor workflows. According to Wikipedia, Airflow was created at Airbnb in 2014 to manage the company’s increasingly complex workflows. From the beginning, the project was made open source, becoming an Apache Incubator project in 2016 and a top-level Apache Software Foundation project (TLP) in 2019.

Many organizations build, manage, and maintain Apache Airflow on AWS using compute services such as Amazon EC2 or Amazon EKS. Amazon recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA). With the announcement of Amazon MWAA in November 2020, AWS customers can now focus on developing workflow automation, while leaving the management of Airflow to AWS. Amazon MWAA can be used as an alternative to AWS Step Functions for workflow automation on AWS.

Apache Airflow’s UI

Apache recently announced the release of Airflow 2.0.0 on December 17, 2020. The latest 1.x version of Airflow is 1.10.14, released December 12, 2020. However, at the time of this post, Amazon MWAA was running Airflow 1.10.12, released August 25, 2020. Ensure that when you are developing workflows for Amazon MWAA, you are using the correct Apache Airflow 1.10.12 documentation.

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI.

Airflow has a mechanism that allows you to expand its functionality and integrate with other systems. Given its integration capabilities, Airflow has extensive support for AWS, including Amazon EMR, Amazon S3, AWS Batch, Amazon RedShift, Amazon DynamoDB, AWS Lambda, Amazon Kinesis, and Amazon SageMaker. Outside of support for Amazon S3, most AWS integrations can be found in the HooksSecretsSensors, and Operators of Airflow codebase’s contrib section.

Getting Started

Source Code

Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/aws-airflow-demo.git

Preliminary Steps

This post assumes the reader has completed the demonstration in the previous post, Running PySpark Applications on Amazon EMR Methods for Interacting with PySpark on Amazon Elastic MapReduce. This post will re-use many of the last post’s AWS resources, including the EMR VPC, Subnets, Security Groups, AWS Glue Data Catalog, Amazon S3 buckets, EMR Roles, EC2 key pair, AWS Systems Manager Parameter Store parameters, PySpark applications, and Kaggle datasets.

Configuring Amazon MWAA

The easiest way to create a new MWAA Environment is through the AWS Management Console. I strongly suggest that you review the pricing for Amazon MWAA before continuing. The service can be quite costly to operate, even when idle, with the smallest Environment class potentially running into the hundreds of dollars per month.

Amazon MWAA Environment Creation Process

Using the Console, create a new Amazon MWAA Environment. The Amazon MWAA interface will walk you through the creation process. Note the current ‘Airflow version’, 1.10.12.

Amazon MWAA Environment Creation Process

Amazon MWAA requires an Amazon S3 bucket to store Airflow assets. Create a new Amazon S3 bucket. According to the documentation, the bucket must start with the prefix airflow-. You must also enable Bucket Versioning on the bucket. Specify a dags folder within the bucket to store Airflow’s Directed Acyclic Graphs (DAG). You can leave the next two options blank since we have no additional Airflow plugins or additional Python packages to install.

Amazon MWAA Environment Creation Process

With Amazon MWAA, your data is secure by default as workloads run within their own Amazon Virtual Private Cloud (Amazon VPC). As part of the MWAA Environment creation process, you are given the option to have AWS create an MWAA VPC CloudFormation stack.

Amazon MWAA Environment Creation Process

For this demonstration, choose to have MWAA create a new VPC and associated networking resources.

AWS CloudFormation Create Stack Console

The MWAA CloudFormation stack contains approximately 22 AWS resources, including a VPC, a pair of public and private subnets, route tables, an Internet Gateway, two NAT Gateways, and associated Elastic IPs (EIP). See the MWAA documentation for more details.

AWS CloudFormation Create Stack Console
Amazon MWAA Environment Creation Process

As part of the Amazon MWAA Networking configuration, you must decide if you want web access to Airflow to be public or private. The details of the network configuration can be found in the MWAA documentation. I am choosing public webserver access for this demonstration, but the recommended choice is private for greater security. With the public option, AWS still requires IAM authentication to sign in to the AWS Management Console in order to access the Airflow UI.

You must select an existing VPC Security Group or have MWAA create a new one. For this demonstration, choose to have MWAA create a Security Group for you.

Lastly, select an appropriately-sized Environment class for Airflow based on the scale of your needs. The mw1.small class will be sufficient for this demonstration.

Amazon MWAA Environment Creation Process

Finally, for Permissions, you must select an existing Airflow execution service role or create a new role. For this demonstration, create a new Airflow service role. We will later add additional permissions.

Amazon MWAA Environment Creation Process

Airflow Execution Role

As part of this demonstration, we will be using Airflow to run Spark jobs on EMR (EMR Steps). To allow Airflow to interact with EMR, we must increase the new Airflow execution role’s default permissions. Additional permissions include allowing the new Airflow role to assume the EMR roles using iam:PassRole. For this demonstration, we will include the two default EMR Service and JobFlow roles, EMR_DefaultRole and EMR_EC2_DefaultRole. We will also include the corresponding custom EMR roles created in the previous post, EMR_DemoRole and EMR_EC2_DemoRole. For this demonstration, the Airflow service role also requires three specific EMR permissions as shown below. Later in the post, Airflow will also read files from S3, which requires s3:GetObject permission.

Create a new policy by importing the project’s JSON file, iam_policy/airflow_emr_policy.json, and attach the new policy to the Airflow service role. Be sure to update the AWS Account ID in the file with your own Account ID.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"elasticmapreduce:DescribeStep",
"elasticmapreduce:AddJobFlowSteps",
"elasticmapreduce:RunJobFlow"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": [
"arn:aws:iam::123412341234:role/EMR_DemoRole",
"arn:aws:iam::123412341234:role/EMR_EC2_DemoRole",
"arn:aws:iam::123412341234:role/EMR_EC2_DefaultRole",
"arn:aws:iam::123412341234:role/EMR_DefaultRole"
]
}
]
}

The Airflow service role, created by MWAA, is shown below with the new policy attached.

Airflow Execution Service Role with the new Policy Attached

Final Architecture

Below is the final high-level architecture for the post’s demonstration. The diagram shows the approximate route of a DAG Run request, in red. The diagram includes an optional S3 Gateway VPC endpoint, not detailed in the post, but recommended for additional security. According to AWS, a VPC endpoint enables you to privately connect your VPC to supported AWS services and VPC endpoint services powered by AWS PrivateLink without requiring an internet gateway. In this case a private connection between the MWAA VPC and Amazon S3. It is also possible to create an EMR Interface VPC Endpoint to securely route traffic directly to EMR from MWAA, instead of connecting over the Internet.

Demonstration’s Amazon MWAA and Amazon EMR Architecture

Amazon MWAA Environment

The new MWAA Environment will include a link to the Airflow UI.

Amazon MWAA Environment Console

Airflow UI

Using the supplied link, you should be able to access the Airflow UI using your web browser.

Apache Airflow UI

Our First DAG

The Amazon MWAA documentation includes an example DAG, which contains one of several sample programs, SparkPi, which comes with Spark. I have created a similar DAG that is included in the GitHub project, dags/emr_steps_demo.py. The DAG will create a minimally-sized single-node EMR cluster with no Core or Task nodes. The DAG will then use that cluster to submit the calculate_pi job to Spark. Once the job is complete, the DAG will terminate the EMR cluster.

import os
from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.utils.dates import days_ago
DAG_ID = os.path.basename(__file__).replace('.py', '')
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
}
SPARK_STEPS = [
{
'Name': 'calculate_pi',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
},
}
]
JOB_FLOW_OVERRIDES = {
'Name': 'demo-cluster-airflow',
'ReleaseLabel': 'emr-6.2.0',
'Applications': [
{
'Name': 'Spark'
},
],
'Instances': {
'InstanceGroups': [
{
'Name': 'Master nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1,
}
],
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False,
},
'VisibleToAllUsers': True,
'JobFlowRole': 'EMR_EC2_DefaultRole',
'ServiceRole': 'EMR_DefaultRole',
'Tags': [
{
'Key': 'Environment',
'Value': 'Development'
},
{
'Key': 'Name',
'Value': 'Airflow EMR Demo Project'
},
{
'Key': 'Owner',
'Value': 'Data Analytics Team'
}
]
}
with DAG(
dag_id=DAG_ID,
description='Run built-in Spark app on Amazon EMR',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
start_date=days_ago(1),
schedule_interval='@once',
tags=['emr'],
) as dag:
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES
)
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=SPARK_STEPS,
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
)
cluster_creator >> step_adder >> step_checker
view raw spark_pi_example.py hosted with ❤ by GitHub

Upload the DAG to the Airflow S3 bucket’s dags directory. Substitute your Airflow S3 bucket name in the AWS CLI command below, then run it from the project’s root.

aws s3 cp dags/spark_pi_example.py \
s3://<your_airflow_bucket_name>/dags/

The DAG, spark_pi_example, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job.

Apache Airflow UI’s DAGs tab

The DAG has no optional configuration to input as JSON. Select ‘Trigger’ to submit the job, as shown below.

Apache Airflow UI’s Trigger DAG Page

The DAG should complete all three tasks successfully, as shown in the DAG’s ‘Graph View’ tab below.

Apache Airflow UI’s DAG Graph View

Switching to the EMR Console, you should see the single-node EMR cluster being created.

Amazon EMR Console’s Summary tab

On the ‘Steps’ tab, you should see that the ‘calculate_pi’ Spark job has been submitted and is waiting for the cluster to be ready to be run.

Amazon EMR Console’s Steps tab

Triggering DAGs Programmatically

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI. To automate the DAG Run, we could use the AWS CLI and invoke the Airflow CLI via an endpoint on the Apache Airflow Webserver. The Amazon MWAA documentation and Airflow’s CLI documentation explains how.

Below is an example of triggering the spark_pi_example DAG programmatically using Airflow’s trigger_dag CLI command. You will need to replace the WEB_SERVER_HOSTNAME variable with your own Airflow Web Server’s hostname. The ENVIROMENT_NAME variable assumes only one MWAA environment is returned by jq.

export WEB_SERVER_HOSTNAME="<your_airflow_web_server.us-east-1.airflow.amazonaws.com>"
export ENVIRONMENT_NAME=$(aws mwaa list-environments | jq -r '.Environments | .[]')
export DAG_NAME=spark_pi_example
aws mwaa create-cli-token –name "${ENVIRONMENT_NAME}" | \
export CLI_TOKEN=$(jq -r .CliToken)
curl –request POST "https://${WEB_SERVER_HOSTNAME}/aws_mwaa/cli" \
–header "Authorization: Bearer ${CLI_TOKEN}" \
–header "Content-Type: text/plain" \
–data-raw "trigger_dag ${DAG_NAME}"
view raw trigger_dag.sh hosted with ❤ by GitHub

Analytics Job with Airflow

Next, we will submit an actual analytics job to EMR. If you recall from the previous post, we had four different analytics PySpark applications, which performed analyses on the three Kaggle datasets. For the next DAG, we will run a Spark job that executes the bakery_sales_ssm.py PySpark application. This job should already exist in the processed data S3 bucket.

The DAG, dags/bakery_sales.py, creates an EMR cluster identical to the EMR cluster created with the run_job_flow.py Python script in the previous post. All EMR configuration options available when using AWS Step Functions are available with Airflow’s airflow.contrib.operators and airflow.contrib.sensors packages for EMR.

Airflow leverages Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. The Bakery Sales DAG contains eleven Jinja template variables. Seven variables will be configured in the Airflow UI by importing a JSON file into the ‘Admin’ ⇨ ‘Variables’ tab. These template variables are prefixed with var.value in the DAG. The other three variables will be passed as a DAG Run configuration as a JSON blob, similar to the previous DAG example. These template variables are prefixed with dag_run.conf.

import os
from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.models import Variable
from airflow.utils.dates import days_ago
# ************** AIRFLOW VARIABLES **************
bootstrap_bucket = Variable.get('bootstrap_bucket')
emr_ec2_key_pair = Variable.get('emr_ec2_key_pair')
job_flow_role = Variable.get('job_flow_role')
logs_bucket = Variable.get('logs_bucket')
release_label = Variable.get('release_label')
service_role = Variable.get('service_role')
work_bucket = Variable.get('work_bucket')
# ***********************************************
DAG_ID = os.path.basename(__file__).replace('.py', '')
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'email': ["{{ dag_run.conf['airflow_email'] }}"],
'email_on_failure': ["{{ dag_run.conf['email_on_failure'] }}"],
'email_on_retry': ["{{ dag_run.conf['email_on_retry'] }}"],
}
SPARK_STEPS = [
{
'Name': 'Bakery Sales',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'–deploy-mode',
'cluster',
'–master',
'yarn',
'–conf',
'spark.yarn.submit.waitAppCompletion=true',
's3a://{{ var.value.work_bucket }}/analyze/bakery_sales_ssm.py'
]
}
}
]
JOB_FLOW_OVERRIDES = {
'Name': 'demo-cluster-airflow',
'ReleaseLabel': '{{ var.value.release_label }}',
'LogUri': 's3n://{{ var.value.logs_bucket }}',
'Applications': [
{
'Name': 'Spark'
},
],
'Instances': {
'InstanceFleets': [
{
'Name': 'MASTER',
'InstanceFleetType': 'MASTER',
'TargetSpotCapacity': 1,
'InstanceTypeConfigs': [
{
'InstanceType': 'm5.xlarge',
},
]
},
{
'Name': 'CORE',
'InstanceFleetType': 'CORE',
'TargetSpotCapacity': 2,
'InstanceTypeConfigs': [
{
'InstanceType': 'r5.xlarge',
},
],
},
],
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False,
'Ec2KeyName': '{{ var.value.emr_ec2_key_pair }}',
},
'BootstrapActions': [
{
'Name': 'string',
'ScriptBootstrapAction': {
'Path': 's3://{{ var.value.bootstrap_bucket }}/bootstrap_actions.sh',
}
},
],
'Configurations': [
{
'Classification': 'spark-hive-site',
'Properties': {
'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'
}
}
],
'VisibleToAllUsers': True,
'JobFlowRole': '{{ var.value.job_flow_role }}',
'ServiceRole': '{{ var.value.service_role }}',
'EbsRootVolumeSize': 32,
'StepConcurrencyLevel': 1,
'Tags': [
{
'Key': 'Environment',
'Value': 'Development'
},
{
'Key': 'Name',
'Value': 'Airflow EMR Demo Project'
},
{
'Key': 'Owner',
'Value': 'Data Analytics Team'
}
]
}
with DAG(
dag_id=DAG_ID,
description='Analyze Bakery Sales with Amazon EMR',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
start_date=days_ago(1),
schedule_interval='@once',
tags=['emr'],
) as dag:
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES
)
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=SPARK_STEPS,
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
)
cluster_creator >> step_adder >> step_checker
view raw bakery_sales.py hosted with ❤ by GitHub

Import Variables into Airflow UI

First, to import the required variables, change the values in the project’s airflow_variables/admin_variables_bakery.json file. You will need to update the values for bootstrap_bucket, emr_ec2_key_pair, logs_bucket, and work_bucket. The three S3 buckets should all exist from the previous post.

{
"bootstrap_bucket": "emr-demo-bootstrap-123412341234-us-east-1",
"emr_ec2_key_pair": "emr-demo-123412341234-us-east-1",
"job_flow_role": "EMR_EC2_DemoRole",
"logs_bucket": "emr-demo-logs-123412341234-us-east-1",
"release_label": "emr-6.2.0",
"service_role": "EMR_DemoRole",
"work_bucket": "emr-demo-work-123412341234-us-east-1",
"ec2_subnet_id": "subnet-012abc456efg78900"
}

Next, import the variables file from the ‘Admin’ ⇨ ‘Variables’ tab of the Airflow UI.

Apache Airflow UI’s Admin > Variables tab

Upload the DAG, dags/bakery_sales.py, to the Airflow S3 bucket, similar to the first DAG.

aws s3 cp dags/bakery_sales.py \
s3://<your_airflow_bucket_name>/dags/

The second DAG, bakery_sales, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job.

Apache Airflow UI’s DAGs tab

Input the three required parameters in the ‘Trigger DAG’ interface, used to pass the DAG Run configuration, and select ‘Trigger’. A sample of the JSON blob can be found in the project, airflow_variables/dag_run.conf_bakery.json.

{
    "airflow_email": "analytics_team@example.com",
    "email_on_failure": false,
    "email_on_retry": false
}

This is just for demonstration purposes. To send and receive emails, you will need to configure Airflow.

Apache Airflow UI’s Trigger DAG Screen

Switching to the EMR Console, you should see the ‘Bakery Sales’ Spark job in the ‘Steps’ tab.

Amazon EMR Console’s Steps tab

Multi-Step DAG

In our last example, we will use a single DAG to run four Spark jobs in parallel. The Spark job arguments (EmrAddStepsOperator steps parameter) will be loaded from an external JSON file residing in Amazon S3, instead of defined in the DAG, as in the previous two DAG examples. Additionally, the EMR cluster specifications (EmrCreateJobFlowOperator job_flow_overrides parameter) will also be loaded from an external JSON file. Using this method, we decouple the EMR provisioning and job details from the DAG. DataOps or DevOps Engineers might manage the EMR cluster specifications as code, while Data Analysts manage the Spark job arguments, separately. A third team might manage the DAG itself.

We still maintain the variables in the JSON files. The DAG will read the JSON file-based configuration into the tasks as JSON blobs, then replace the Jinja template variables (expressions) in the DAG with variable values defined in Airflow or input as parameters when the DAG is triggered.

Below we see a snippet of two of the four Spark submit-job job definitions (steps), which have been moved to a separate JSON file, emr_steps/emr_steps.json.

[
{
"Name": "Movie Ratings",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"–deploy-mode",
"cluster",
"–master",
"yarn",
"–conf",
"spark.yarn.submit.waitAppCompletion=true",
"s3a://{{ var.value.work_bucket }}/analyze/movies_avg_ratings_ssm.py",
"–start-date",
"2016-01-01 00:00:00",
"–end-date",
"2016-12-31 23:59:59"
]
}
},
{
"Name": "Stock Volatility",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"–deploy-mode",
"cluster",
"–master",
"yarn",
"–conf",
"spark.yarn.submit.waitAppCompletion=true",
"s3a://{{ var.value.work_bucket }}/analyze/stock_volatility_ssm.py",
"–start-date",
"2017-01-01",
"–end-date",
"2018-12-31"
]
}
}
]
view raw emr_steps.json hosted with ❤ by GitHub

Below are the EMR cluster specifications (job_flow_overrides), which have been moved to a separate JSON file, job_flow_overrides/job_flow_overrides.json.

{
"Name": "demo-cluster-airflow",
"ReleaseLabel": "{{ var.value.release_label }}",
"LogUri": "s3n://{{ var.value.logs_bucket }}",
"Applications": [
{
"Name": "Spark"
}
],
"Instances": {
"InstanceFleets": [
{
"Name": "MASTER",
"InstanceFleetType": "MASTER",
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m5.xlarge"
}
]
},
{
"Name": "CORE",
"InstanceFleetType": "CORE",
"TargetSpotCapacity": 2,
"InstanceTypeConfigs": [
{
"InstanceType": "r5.2xlarge"
}
]
}
],
"Ec2SubnetId": "{{ var.value.ec2_subnet_id }}",
"KeepJobFlowAliveWhenNoSteps": false,
"TerminationProtected": false,
"Ec2KeyName": "{{ var.value.emr_ec2_key_pair }}"
},
"BootstrapActions": [
{
"Name": "string",
"ScriptBootstrapAction": {
"Path": "s3://{{ var.value.bootstrap_bucket }}/bootstrap_actions.sh"
}
}
],
"Configurations": [
{
"Classification": "spark-hive-site",
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
}
}
],
"VisibleToAllUsers": true,
"JobFlowRole": "{{ var.value.job_flow_role }}",
"ServiceRole": "{{ var.value.service_role }}",
"EbsRootVolumeSize": 32,
"StepConcurrencyLevel": 5,
"Tags": [
{
"Key": "Environment",
"Value": "Development"
},
{
"Key": "Name",
"Value": "Airflow EMR Demo Project"
},
{
"Key": "Owner",
"Value": "Data Analytics Team"
}
]
}

Decoupling the configurations reduces the DAG from well over 200 lines of code to less than 75 lines. Note lines 56 and 63 of the DAG below. Instead of referencing a local object variable, the parameters now reference the function, get_objects(key, bucket_name), which loads the JSON.

import json
import os
from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.hooks.S3_hook import S3Hook
from airflow.models import Variable
from airflow.utils.dates import days_ago
# ************** AIRFLOW VARIABLES **************
bootstrap_bucket = Variable.get('bootstrap_bucket')
emr_ec2_key_pair = Variable.get('emr_ec2_key_pair')
job_flow_role = Variable.get('job_flow_role')
logs_bucket = Variable.get('logs_bucket')
release_label = Variable.get('release_label')
service_role = Variable.get('service_role')
work_bucket = Variable.get('work_bucket')
# ***********************************************
DAG_ID = os.path.basename(__file__).replace('.py', '')
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'email': ["{{ dag_run.conf['airflow_email'] }}"],
'email_on_failure': ["{{ dag_run.conf['email_on_failure'] }}"],
'email_on_retry': ["{{ dag_run.conf['email_on_retry'] }}"],
}
def get_object(key, bucket_name):
"""
Load S3 object as JSON
"""
hook = S3Hook()
content_object = hook.get_key(key=key, bucket_name=bucket_name)
file_content = content_object.get()['Body'].read().decode('utf-8')
return json.loads(file_content)
with DAG(
dag_id=DAG_ID,
description='Run multiple Spark jobs with Amazon EMR',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
start_date=days_ago(1),
schedule_interval=None,
tags=['emr', 'spark', 'pyspark']
) as dag:
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=get_object('job_flow_overrides/job_flow_overrides.json', work_bucket)
)
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=get_object('emr_steps/emr_steps.json', work_bucket)
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default'
)
cluster_creator >> step_adder >> step_checker
view raw multiple_steps.py hosted with ❤ by GitHub

This time, we need to upload three files to S3, the DAG to the Airflow S3 bucket, and the two JSON files to the EMR Work S3 bucket. Change the bucket names to match your environment, then run the three AWS CLI commands shown below.

aws s3 cp emr_steps/emr_steps.json \
    s3://emr-demo-work-123412341234-us-east-1/emr_steps/
aws s3 cp job_flow_overrides/job_flow_overrides.json \
    s3://emr-demo-work-123412341234-us-east-1/job_flow_overrides/
aws s3 cp dags/multiple_steps.py \
s3://airflow-123412341234-us-east-1/dags/

The second DAG, multiple_steps, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job. The three required input parameters in the ‘Trigger DAG’ interface are identical to the previous bakery_sales DAG. A sample of that JSON blob can be found in the project at airflow_variables/dag_run.conf_bakery.json.

Apache Airflow UI’s DAGs tab

Below we see that the EMR cluster has completed the four Spark jobs (EMR Steps) and has auto-terminated. Note that all four jobs were started at the exact same time. If you recall from the previous post, this is possible because we preset the ‘Concurrency’ level to 5.

Amazon EMR Console’s Steps tab showing four Steps running in parallel

Triggering DAGs Programmatically

AWS CLI

Similar to the previous example, below we can trigger the multiple_steps DAG programmatically using Airflow’s trigger_dag CLI command. Note the addition of the —-conf named argument, which passes the configuration, containing three key/value pairs, to the trigger command as a JSON blob.

ENVIRONMENT_NAME=$(aws mwaa list-environments | jq -r '.Environments | .[]')
DAG_NAME="multiple_steps"
CONFIG="""'{
\"airflow_email\": \"analytics_team@example.com\",
\"email_on_failure\": true,
\"email_on_retry\": false
}'"""
CLI_JSON=$(aws mwaa create-cli-token –name ${ENVIRONMENT_NAME}) \
&& CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \
&& WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \
curl –request POST "https://${WEB_SERVER_HOSTNAME}/aws_mwaa/cli" \
–header "Authorization: Bearer ${CLI_TOKEN}" \
–header "Content-Type: text/plain" \
–data-raw "trigger_dag ${DAG_NAME} –conf ${CONFIG}"

AWS SDK

Airflow DAGs can also be triggered using the AWS SDK. For example, with boto3 for Python, we could use a script, similar to the following to remotely trigger a DAG.

#!/usr/bin/env python3
# MWAA: Trigger an Apache Airflow DAG using SDK
# Author: Gary A. Stafford (February 2021)
import logging
import boto3
import requests
logging.basicConfig(
format='[%(asctime)s] %(levelname)s – %(message)s', level=logging.INFO)
mwaa_client = boto3.client('mwaa')
ENVIRONMENT_NAME = 'Your_Airflow_Environment_Name'
DAG_NAME = 'your_dag_name'
CONFIG = '{"foo": "bar"}'
def main():
response = mwaa_client.create_cli_token(
Name=ENVIRONMENT_NAME
)
logging.info('response: ' + str(response))
token = response['CliToken']
url = 'https://{0}/aws_mwaa/cli'.format(response['WebServerHostname'])
headers = {'Authorization': 'Bearer ' + token, 'Content-Type': 'text/plain'}
payload = 'trigger_dag {0} –conf {1}'.format(DAG_NAME, CONFIG)
response = requests.post(url, headers=headers, data=payload)
logging.info('response: ' + str(response)) # should be <Response [200]>
if __name__ == '__main__':
main()
view raw trigger_dag.py hosted with ❤ by GitHub

Cleaning Up

Once you are done with the MWAA Environment, be sure to delete it as soon as possible to save additional costs. Also, delete the MWAA-VPC CloudFormation stack. These resources, like the two NAT Gateways, will also continue to generate additional costs.

aws mwaa delete-environment --name <your_mwaa_environment_name>
aws cloudformation delete-stack --stack-name MWAA-VPC

Conclusion

In this second post in the series, we explored using the newly released Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to run PySpark applications on Amazon Elastic MapReduce (Amazon EMR). In future posts, we will explore the use of Jupyter and Zeppelin notebooks for data science, scientific computing, and machine learning on EMR.

If you are interested in learning more about configuring Amazon MWAA and Airflow, see my recent post, Amazon Managed Workflows for Apache Airflow — Configuration: Understanding Amazon MWAA’s Configuration Options.


This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.

, , , , , , ,

1 Comment

Installing Apache Superset on Amazon EMR: Add data exploration and visualization to your analytics cluster

Introduction

AWS provides nearly twenty-five different open-source data analytics applications that can be automatically installed and configured on Amazon Elastic MapReduce (Amazon EMR). Of all those options, EMR doesn’t offer a general-purpose data exploration and visualization tool. However, with EMR, you can automate the installation of additional software as part of the cluster creation process or post cluster creation. This brief post will explore how to install, configure, and access Apache Superset, the modern data exploration and visualization platform on Amazon EMR’s Master Node, as a post-cluster creation step. You can use these same techniques to install other software packages on EMR as well, manually or as part of an automated Data Pipeline.

Amazon EMR

According to AWS, Amazon EMR is a cloud-based big data platform for processing vast amounts of data using open source tools such as Apache Spark, Hive, HBase, Flink, Hudi, and Zeppelin, Jupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Amazon EMR Console’s Cluster Summary tab

AWS currently offers 5.x and 6.x versions of Amazon EMR. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0. Each version of Amazon EMR offers incremental major and minor releases of nearly 25 different, popular open-source big-data software packages to choose from, which Amazon EMR will install and configure when the cluster is created.

Apache Superset

According to its website, Apache Superset is a modern data exploration and visualization platform. Superset is fast, lightweight, intuitive, and loaded with options that make it easy for users of all skill sets to explore and visualize their data, from simple line charts to highly detailed geospatial charts.

Superset natively supports over twenty-five data sources, including Amazon Athena and Redshift, Apache Drill, Druid, Hive, Impala, Kylin, Pinot, and Spark SQL, Elasticsearch, Google BigQuery, Hana, MySQL, Oracle, Postgres, Presto, Snowflake, Microsoft SQL Server, and Teradata.

As shown in their Gallery, Superset includes dozens of visualization types, including Pivot Table, Line Chart, Markup, Pie Chart, Filter Box, Bubble Chart, Box Plot, Histogram, Heatmap, Sunburst, Calendar Heatmap, and several geospatial types.

Apache Superset Visualization Gallery

Setup

Using this git clone command, download a copy of this post’s open-source GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/emr-superset-demo.git

To demonstrate how to install Apache Superset on EMR, I have prepared an AWS CloudFormation template. Deploying the template, cloudformation/superset-emr-demo.yml, to AWS will result in the AWS CloudFormation stack, superset-emr-demo-dev. The stack creates a minimally-sized, two-node EMR cluster, two Amazon S3 buckets, and several AWS Systems Manager (SSM) Parameter Store parameters.

There is also a JSON-format CloudFormation parameters file, cloudformation/superset-emr-demo-params-dev.json. The parameters file contains values for eight of the ten required parameters in the CloudFormation template, all of which you can adjust. For the remaining two required parameters, you will need to supply the name of an existing EC2 key pair to access the EMR Master node. The key pair will need to be deployed to the same AWS Account into which you are deploying EMR. You will also need to supply a Subnet ID for the EMR cluster to be installed into. The subnet must have access to the Internet to install Superset’s required system and Python packages and to access Superset’s web-based user interface. If you need help creating a VPC and subnet to deploy EMR into, refer to my previous blog post, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce.

The CloudFormation stack is created using a Python script, create_cfn_stack.py. The python script uses the AWS boto3 Python SDK.

#!/usr/bin/env python3
# Purpose: Create EMR bootstrap script bucket and deploy the cfn stack
# Author: Gary A. Stafford (December 2020)
# Reference: https://gist.github.com/svrist/73e2d6175104f7ab4d201280acba049c
# Usage Example: python3 ./create_cfn_stack.py \
# –ec2-key-name emr-demo-123456789012-us-east-1 \
# –ec2-subnet-id subnet-06aa61f790a932b32 \
# –environment dev
import argparse
import json
import logging
import os
import boto3
from botocore.exceptions import ClientError
sts_client = boto3.client('sts')
cfn_client = boto3.client('cloudformation')
region = boto3.DEFAULT_SESSION.region_name
s3_client = boto3.client('s3', region_name=region)
logging.basicConfig(format='[%(asctime)s] %(levelname)s – %(message)s', level=logging.INFO)
def main():
args = parse_args()
# create bootstrap bucket
account_id = sts_client.get_caller_identity()['Account']
bootstrap_bucket = f'superset-emr-demo-bootstrap-{account_id}{region}'
create_bucket(bootstrap_bucket)
# upload bootstrap script
dir_path = os.path.dirname(os.path.realpath(__file__))
upload_file(f'{dir_path}/bootstrap_emr/bootstrap_actions.sh', bootstrap_bucket, 'bootstrap_actions.sh')
# set variables
stack_name = f'emr-superset-demo-{args.environment}'
cfn_template_path = f'{dir_path}/cloudformation/superset-emr-demo.yml'
cfn_params_path = f'{dir_path}/cloudformation/superset-emr-demo-params-{args.environment}.json'
ec2_key_name = args.ec2_key_name
# append new parameters
cfn_params = _parse_parameters(cfn_params_path)
cfn_params.append({'ParameterKey': 'Ec2KeyName', 'ParameterValue': ec2_key_name})
cfn_params.append({'ParameterKey': 'Ec2SubnetId', 'ParameterValue': args.ec2_subnet_id})
cfn_params.append({'ParameterKey': 'BootstrapBucket', 'ParameterValue': bootstrap_bucket})
logging.info(json.dumps(cfn_params, indent=4))
# create the cfn stack
create_stack(stack_name, cfn_template_path, cfn_params)
def create_bucket(bootstrap_bucket):
"""Create an S3 bucket in a specified region
:param bootstrap_bucket: Bucket to create
:return: True if bucket created, else False
"""
try:
s3_client.create_bucket(Bucket=bootstrap_bucket)
logging.info(f'New bucket name: {bootstrap_bucket}')
except ClientError as e:
logging.error(e)
return False
return True
def upload_file(file_name, bootstrap_bucket, object_name):
"""Upload a file to an S3 bucket
:param file_name: File to upload
:param bootstrap_bucket: Bucket to upload to
:param object_name: S3 object name
:return: True if file was uploaded, else False
"""
# Upload the file
try:
response = s3_client.upload_file(file_name, bootstrap_bucket, object_name)
logging.info(f'File {file_name} uploaded to bucket {bootstrap_bucket} as object {object_name}')
except ClientError as e:
logging.error(e)
return False
return True
def create_stack(stack_name, cfn_template, cfn_params):
"""Create EMR Cluster CloudFormation stack"""
template_data = _parse_template(cfn_template)
create_stack_params = {
'StackName': stack_name,
'TemplateBody': template_data,
'Parameters': cfn_params,
'TimeoutInMinutes': 60,
'Capabilities': [
'CAPABILITY_NAMED_IAM',
],
'Tags': [
{
'Key': 'Project',
'Value': 'Superset EMR Demo'
},
]
}
try:
response = cfn_client.create_stack(**create_stack_params)
logging.info(f'Response: {response}')
except ClientError as e:
logging.error(e)
return False
return True
def _parse_template(template):
with open(template) as template_file_obj:
template_data = template_file_obj.read()
cfn_client.validate_template(TemplateBody=template_data)
return template_data
def _parse_parameters(parameters):
with open(parameters) as parameter_file_obj:
parameter_data = json.load(parameter_file_obj)
return parameter_data
def parse_args():
"""Parse argument values from command-line"""
parser = argparse.ArgumentParser(description='Arguments required for script.')
parser.add_argument('-e', '–environment', required=True, choices=['dev', 'test', 'prod'], help='Environment')
parser.add_argument('-k', '–ec2-key-name', required=True, help='Ec2KeyName: Name of EC2 Keypair')
parser.add_argument('-s', '–ec2-subnet-id', required=True, help='Ec2SubnetId: Name of EC2 Keypair')
args = parser.parse_args()
return args
if __name__ == '__main__':
main()
view raw create_cfn_stack.py hosted with ❤ by GitHub

To execute the Python script and create the CloudFormation stack, which will create the EMR cluster, run the following command. Remember to update the parameters to the name of your EC2 key pair and the Subnet ID for the EMR cluster.

python3 ./create_cfn_stack.py \
--ec2-key-name <your_key_pair_name> \
--ec2-subnet-id <your_subnet_id> \
--environment dev

Here is what the complete CloudFormation workflow looks like.

AWS CloudFormation stack creation

Security Group Ingress Rules

To install Superset on the EMR cluster’s Master node via SSH, you need to open port 22 on the Security Group associated with the EMR cluster’s Master Node, allowing access from your IP address. You can use the AWS Management Console or AWS CLI to open port 22. We will use jq and AWS ec2 API from the AWS CLI to get the Security Group ID associated with the EMR cluster’s Master Node and create the two ingress rules.

export EMR_MASTER_SG_ID=$(aws ec2 describe-security-groups | \
jq -r ".SecurityGroups[] | \
select(.GroupName==\"ElasticMapReduce-master\").GroupId" | \
head -n 1)
aws ec2 authorize-security-group-ingress \
--group-id ${EMR_MASTER_SG_ID} \
--protocol tcp \
--port 22 \
--cidr $(curl ipinfo.io/ip)/32

Superset Script

Once the CloudFormation stack is created and the ports are open, we can install Apache Superset on the EMR Master Node. The bootstrap script,bootstrap_emr/bootstrap_superset.sh, will be used to install Apache Superset onto the EMR cluster’s Master Node as the hadoop user. The script is roughly based on Superset’s Installing from Scratch instructions.

As part of installing Superset, the script will also deploy several common database drivers, including Amazon Athena, Amazon Redshift, Apache Spark SQL, Presto, PostgreSQL, and MySQL. The script will also create a Superset Admin role, and two Superset User roles — Alpha and Gamma.

#!/bin/bash
# Purpose: Installs Apache Superset on EMR
# Author: Gary A. Stafford (December 2020)
# Usage: sh ./bootstrap_superset.sh 8280
# Reference: https://superset.apache.org/docs/installation/installing-superset-from-scratch
# port for superset (default: 8280)
export SUPERSET_PORT="${1:-8280}"
# install required packages
sudo yum -y install gcc gcc-c++ libffi-devel python-devel python-pip python-wheel \
openssl-devel cyrus-sasl-devel openldap-devel python3-devel.x86_64
# optionally, update Master Node packages
sudo yum -y update
# install required Python package
python3 -m pip install –user –upgrade setuptools virtualenv
python3 -m venv venv
. venv/bin/activate
python3 -m pip install –upgrade apache-superset \
PyAthenaJDBC PyAthena sqlalchemy-redshift pyhive mysqlclient psycopg2-binary
command -v superset
superset db upgrade
export FLASK_APP=superset
echo "export FLASK_APP=superset" >>~/.bashrc
touch superset_config.py
echo "ENABLE_TIME_ROTATE = True" >>superset_config.py
echo "export SUPERSET_CONFIG_PATH=superset_config.py" >>~/.bashrc
export ADMIN_USERNAME="SupersetAdmin"
export ADMIN_PASSWORD="Admin1234"
# create superset admin
superset fab create-admin \
–username "${ADMIN_USERNAME}" \
–firstname Superset \
–lastname Admin \
–email superset_admin@example.com \
–password "${ADMIN_PASSWORD}"
superset init
# create two sample superset users
superset fab create-user \
–role Alpha \
–username SupersetUserAlpha \
–firstname Superset \
–lastname UserAlpha \
–email superset_user_alpha@example.com \
–password UserAlpha1234
superset fab create-user \
–role Gamma \
–username SupersetUserGamma \
–firstname Superset \
–lastname UserGamma \
–email superset_user_gamma@example.com \
–password UserGamma1234
# get instance id
INSTANCE_ID="$(curl –silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .instanceId)"
export INSTANCE_ID
echo "INSTANCE_ID: ${INSTANCE_ID}"
# use instance id to get public dns of master node
PUBLIC_MASTER_DNS="$(aws ec2 describe-instances –instance-id ${INSTANCE_ID} |
jq -r '.Reservations[0].Instances[0].PublicDnsName')"
export PUBLIC_MASTER_DNS
echo "PUBLIC_MASTER_DNS: ${PUBLIC_MASTER_DNS}"
# start superset in background
nohup superset run \
–host "${PUBLIC_MASTER_DNS}" \
–port "${SUPERSET_PORT}" \
–with-threads –reload –debugger \
>superset_output.log 2>&1 </dev/null &
# output connection info
printf %s """
**********************************************************************
Superset URL: http://${PUBLIC_MASTER_DNS}:${SUPERSET_PORT}
Admin Username: ${ADMIN_USERNAME}
Admin Password: ${ADMIN_PASSWORD}
**********************************************************************
"""

To install Superset using the bootstrap script, we will use another Python script, install_superset.py. The script uses paramiko, a Python implementation of SSHv2. The script also uses scp, a module that uses a paramiko transport to send and receive files via the scp1 protocol.

#!/bin/bash
# Purpose: Installs Apache Superset on EMR
# Author: Gary A. Stafford (December 2020)
# Usage: sh ./bootstrap_superset.sh 8280
# Reference: https://superset.apache.org/docs/installation/installing-superset-from-scratch
# port for superset (default: 8280)
SUPERSET_PORT="${1:-8280}"
# install required packages
sudo yum -y install gcc gcc-c++ libffi-devel python-devel python-pip python-wheel \
openssl-devel cyrus-sasl-devel openldap-devel python3-devel.x86_64
# optionally, update master node packages
sudo yum -y update
# set up a python virtual environment
python3 -m pip install –user –upgrade setuptools virtualenv
python3 -m venv venv
. venv/bin/activate
python3 -m pip install –upgrade apache-superset \
PyAthenaJDBC PyAthena sqlalchemy-redshift pyhive mysqlclient psycopg2-binary
command -v superset
superset db upgrade
FLASK_APP=superset
export "FLASK_APP=${FLASK_APP}"
echo "export FLASK_APP=superset" >>~/.bashrc
touch superset_config.py
echo "ENABLE_TIME_ROTATE = True" >>superset_config.py
echo "export SUPERSET_CONFIG_PATH=superset_config.py" >>~/.bashrc
ADMIN_USERNAME="SupersetAdmin"
ADMIN_PASSWORD="Admin1234"
# create superset admin
superset fab create-admin \
–username "${ADMIN_USERNAME}" \
–firstname Superset \
–lastname Admin \
–email superset_admin@example.com \
–password "${ADMIN_PASSWORD}"
superset init
# create two sample superset users
superset fab create-user \
–role Alpha \
–username SupersetUserAlpha \
–firstname Superset \
–lastname UserAlpha \
–email superset_user_alpha@example.com \
–password UserAlpha1234
superset fab create-user \
–role Gamma \
–username SupersetUserGamma \
–firstname Superset \
–lastname UserGamma \
–email superset_user_gamma@example.com \
–password UserGamma1234
# get ec2 instance id
INSTANCE_ID="$(curl –silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .instanceId)"
# use ec2 instance id to get public dns of master node
PUBLIC_MASTER_DNS="$(aws ec2 describe-instances –instance-id ${INSTANCE_ID} |
jq -r '.Reservations[0].Instances[0].PublicDnsName')"
# start superset in background
nohup superset run \
–host "${PUBLIC_MASTER_DNS}" \
–port "${SUPERSET_PORT}" \
–with-threads –reload –debugger \
>superset_output.log 2>&1 </dev/null &
# output superset connection info
printf %s """
**********************************************************************
Superset URL: http://${PUBLIC_MASTER_DNS}:${SUPERSET_PORT}
Admin Username: ${ADMIN_USERNAME}
Admin Password: ${ADMIN_PASSWORD}
**********************************************************************
"""
view raw install_superset.py hosted with ❤ by GitHub

The script requires a single input parameter, ec2-key-path, which is the full path to your EC2 key pair (e.g., ~/.ssh/my-key-pair.pem). Optionally, you can change the default Superset port of 8280, using the superset-port parameter.

python3 ./install_superset.py \
--ec2-key-path </path/to/my-key-pair.pem> \
--superset-port 8280

The script uses SSH and SCP to deploy and execute the bootstrap script,bootstrap_superset.sh. The output from the script includes the URL of Apache Superset running on the EMR cluster. The output also contains the username and password of the Superset Admin.

********************************************************************
Superset URL: http://ec2-111-22-333-44.compute-1.amazonaws.com:8280
Admin Username: SupersetAdmin
Admin Password: Admin1234
********************************************************************

SSH Tunnel

According to AWS, EMR applications publish user interfaces as websites hosted on the master node. For security reasons, these websites are only available on the master node’s local web server. To reach any of the web interfaces, you must establish an SSH tunnel with the master node using either dynamic or local port forwarding. If you are using dynamic port forwarding, you must also configure a proxy server to view the web interfaces.

Instructions for creating an SSH tunnel to access UI’s on EMR

Running the command in your terminal will start the SSH tunnel on port 8157. Once the tunnel is enabled, you can access Apache Superset in a web browser, using the script output’s URL shown in the script output above. Use the Admin credentials or either of the two User credentials to sign in to Superset.

Apache Superset Sign In screen

Once signed in, you will have the ability to connect to your data sources and explore and visualize data. Below, we see an example of a SQL query executed against an Amazon RDS for PostgreSQL database, running in a separate VPC from EMR.

Sample query of an Amazon RDS PostgreSQL database using Superset’s SQL Editor

Conclusion

In this post, we learned how to install Apache Superset onto the Master Node of an Amazon EMR Cluster. If you want to install an application on all the nodes of an EMR cluster, you can add the commands to the bootstrap script, which runs when CloudFormation creates the cluster.


This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.

, , ,

Leave a comment

Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce

Introduction

According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a Cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache SparkHiveHBaseFlinkHudi, and ZeppelinJupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics. 

Image for post
Amazon EMR Console’s Cluster Summary tab

Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.

With EMR, individuals and teams can also use notebooks, including EMR Notebooks, based on JupyterLab, the web-based interactive development environment for Jupyter notebooks for ad-hoc data analytics. Apache Zeppelin is also available to collaborate and interactively explore, process, and visualize data. With EMR notebooks and the EMR API, users can programmatically execute a notebook without the need to interact with the EMR console, referred to as headless execution.

AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. One major difference between EMR versions relevant to this post is EMR 6.x’s support for the latest Hadoop and Spark 3.x frameworks. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.

PySpark on EMR

In the following series of posts, we will focus on the options available to interact with Amazon EMR using the Python API for Apache Spark, known as PySpark. We will divide the methods for accessing PySpark on EMR into two categories: PySpark applications and notebooks. We will explore both interactive and automated patterns for running PySpark applications (Python scripts) and PySpark-based notebooks. In this first post, I will cover the first four PySpark Application Methods listed below. In part two, I will cover Amazon Managed Workflows for Apache Airflow (Amazon MWAA), and in part three, the use of notebooks.

PySpark Application Methods

  1. Add Job Flow Steps: Remote execution of EMR Steps on an existing EMR cluster using the add_job_flow_steps method;
  2. EMR Master Node: Remote execution over SSH of PySpark applications using spark-submit on an existing EMR cluster’s Master node;
  3. Run Job Flow: Remote execution of EMR Steps on a newly created long-lived or auto-terminating EMR cluster using the run_job_flow method;
  4. AWS Step Functions: Remote execution of EMR Steps using AWS Step Functions on an existing or newly created long-lived or auto-terminating EMR cluster;
  5. Apache Airflow: Remote execution of EMR Steps using the recently released Amazon MWAA on an existing or newly created long-lived or auto-terminating EMR cluster (see part two of this series);

Notebook Methods

  1. EMR Notebooks for Ad-hoc Analytics: Interactive, ad-hoc analytics and machine learning using Jupyter Notebooks on an existing EMR cluster;
  2. Headless Execution of EMR Notebooks: Headless execution of notebooks from an existing EMR cluster or newly created auto-terminating cluster;
  3. Apache Zeppelin for Ad-hoc Analytics: Interactive, ad-hoc analytics and machine learning using Zeppelin notebooks on an existing EMR cluster;

Note that wherever the AWS SDK for Python (boto3) is used in this post, we can substitute the AWS CLI or AWS Tools for PowerShell. Typically, these commands and Python scripts would be run as part of a DevOps or DataOps deployment workflow, using CI/CD platforms like AWS CodePipeline, Jenkins, Harness, CircleCI, Travis CI, or Spinnaker.

Preliminary Tasks

To prepare the AWS EMR environment for this post, we need to perform a few preliminary tasks.

  1. Download a copy of this post’s GitHub repository;
  2. Download three Kaggle datasets and organize locally;
  3. Create an Amazon EC2 key pair;
  4. Upload the EMR bootstrap script and create the CloudFormation Stack;
  5. Allow your IP address access to the EMR Master node on port 22;
  6. Upload CSV data files and PySpark applications to S3;
  7. Crawl the raw data and create a Data Catalog using AWS Glue;

Step 1: GitHub Repository

Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/emr-demo.git

Step 2: Kaggle Datasets

Kaggle is a well-known data science resource with 50,000 public datasets and 400,000 public notebooks. We will be using three Kaggle datasets in this post. You will need to join Kaggle to access these free datasets. Download the following three Kaggle datasets as CSV files. Since we are working with (moderately) big data, the total size of the datasets will be approximately 1 GB.

  1. Movie Ratings: https://www.kaggle.com/rounakbanik/the-movies-dataset
  2. Bakery: https://www.kaggle.com/sulmansarwar/transactions-from-a-bakery
  3. Stocks: https://www.kaggle.com/timoboz/stock-data-dow-jones

Organize the (38) downloaded CSV files into the raw_data directory of the locally cloned GitHub repository, exactly as shown below. We will upload these files to Amazon S3, in the proceeding step.

 > tree raw_data --si -v -A

 raw_data
├── [ 128]  bakery
│   ├── [711k]  BreadBasket_DMS.csv
├── [ 320]  movie_ratings
│   ├── [190M]  credits.csv
│   ├── [6.2M]  keywords.csv
│   ├── [989k]  links.csv
│   ├── [183k]  links_small.csv
│   ├── [ 34M]  movies_metadata.csv
│   ├── [710M]  ratings.csv
│   └── [2.4M]  ratings_small.csv
└── [1.1k]  stocks
    ├── [151k]  AAPL.csv
    ├── [146k]  AXP.csv
    ├── [150k]  BA.csv
    ├── [147k]  CAT.csv
    ├── [146k]  CSCO.csv
    ├── [149k]  CVX.csv
    ├── [147k]  DIS.csv
    ├── [ 42k]  DWDP.csv
    ├── [150k]  GS.csv
    └── [...]  abrdiged... 

In this post, we will be using three different datasets. However, if you want to limit the potential costs associated with big data analytics on AWS, you can choose to limit job submissions to only one or two of the datasets. For example, the bakery and stocks datasets are fairly small yet effectively demonstrate most EMR features. In contrast, the movie rating dataset has nearly 27 million rows of ratings data, which starts to demonstrate the power of EMR and PySpark for big data.

Step 3: Amazon EC2 key pair

According to AWS, a key pair, consisting of a private key and a public key, is a set of security credentials that you use to prove your identity when connecting to an [EC2] instance. Amazon EC2 stores the public key, and you store the private key. To SSH into the EMR cluster, you will need an Amazon key pair. If you do not have an existing Amazon EC2 key pair, create one now. The easiest way to create a key pair is from the AWS Management Console.

Image for post
Amazon EC2 Key pair Console

Your private key is automatically downloaded when you create a key pair in the console. Store your private key somewhere safe. If you use an SSH client on a macOS or Linux computer to connect to EMR, use the following chmod command to set the correct permissions of your private key file so that only you can read it.

chmod 0400 /path/to/my-key-pair.pem

Step 4: Bootstrap Script and CloudFormation Stack

The bulk of the resources that are used as part of this demonstration are created using the CloudFormation stack, emr-dem-dev. The CloudFormation template that creates the stack, cloudformation/emr-demo.yml, is included in the repository. Please review all resources and understand the cost and security implications before continuing.

There is also a JSON-format CloudFormation parameters file, cloudformation/emr-demo-params-dev.json, containing values for all but two of the parameters in the CloudFormation template. The two parameters not in the parameter file are the name of the EC2 key pair you just created and the bootstrap bucket’s name. Both will be passed along with the CloudFormation template using the Python script, create_cfn_stack.py. For each type of environment, such as Development, Test, and Production, you could have a separate CloudFormation parameters file, with different configurations.

AWS CloudFormation stack creation

The template will create approximately (39) AWS resources, including a new AWS VPC, a public subnet, an internet gateway, route tables, a 3-node EMR v6.2.0 cluster, a series of Amazon S3 buckets, AWS Glue data catalog, AWS Glue crawlers, several Systems Manager Parameter Store parameters, and so forth.

The CloudFormation template includes the location of the EMR bootstrap script located on Amazon S3. Before creating the CloudFormation stack, the Python script creates an S3 bootstrap bucket and copies the bootstrap script, bootstrap_actions.sh, from the local project repository to the S3 bucket. The script will be used to install additional packages on EMR cluster nodes, which are required by our PySpark applications. The script also sets the default AWS Region for boto3.

#!/bin/bash
# Purpose: EMR bootstrap script
# Author: Gary A. Stafford (December 2020)
# update and install some useful yum packages
sudo yum install -y jq
# set region for boto3
aws configure set region \
"$(curl –silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .region)"
# install some useful python packages
sudo python3 -m pip install boto3 ec2-metadata awswrangler
view raw bootstrap_actions.sh hosted with ❤ by GitHub

From the GitHub repository’s local copy, run the following command, which will execute a Python script to create the bootstrap bucket, copy the bootstrap script, and provision the CloudFormation stack. You will need to pass the name of your EC2 key pair to the script as a command-line argument.

python3 ./scripts/create_cfn_stack.py \
    --environment dev \
    --ec2-key-name <my-key-pair-name>

The CloudFormation template should create a CloudFormation stack, emr-demo-dev, as shown below.

Image for post
AWS CloudFormation Console’s Stacks tab

Step 5: SSH Access to EMR

For this demonstration, we will need access to the new EMR cluster’s Master EC2 node, using SSH and your key pair, on port 22. The easiest way to add a new inbound rule to the correct AWS Security Group is to use the AWS Management Console. First, find your EC2 Security Group named ElasticMapReduce-master.

Image for post
Amazon EC2 Security Group Console

Then, add a new Inbound rule for SSH (port 22) from your IP address, as shown below.

Image for post
Amazon EC2 Security Group Inbound rules

Alternately, you could use the AWS CLI or AWS SDK to create a new security group ingress rule.

export EMR_MASTER_SG_ID=$(aws ec2 describe-security-groups | \
jq -r '.SecurityGroups[] | select(.GroupName=="ElasticMapReduce-master").GroupId')
aws ec2 authorize-security-group-ingress \
--group-id ${EMR_MASTER_SG_ID} \
--protocol tcp \
--port 22 \
--cidr $(curl ipinfo.io/ip)/32

Step 6: Raw Data and PySpark Apps to S3

As part of the emr-demo-dev CloudFormation stack, we now have several new Amazon S3 buckets within our AWS Account. The naming conventions and intended usage of these buckets follow common organizational patterns for data lakes. The data buckets use the common naming convention of rawprocessed, and analyzed data in reference to the data stored within them. We also use a widely used, corresponding naming convention of ‘bronze’, ‘silver’, and ‘gold’ when referring to these data buckets as parameters.

> aws s3api list-buckets | \
    jq -r '.Buckets[] | select(.Name | startswith("emr-demo-")).Name'

emr-demo-raw-123456789012-us-east-1
emr-demo-processed-123456789012-us-east-1
emr-demo-analyzed-123456789012-us-east-1
emr-demo-work-123456789012-us-east-1
emr-demo-logs-123456789012-us-east-1
emr-demo-glue-db-123456789012-us-east-1
emr-demo-bootstrap-123456789012-us-east-1

There is a raw data bucket (aka bronze) that will contain the original CSV files. There is a processed data bucket (aka silver) that will contain data that might have had any number of actions applied: data cleansing, obfuscation, data transformation, file format changes, file compression, and data partitioning. Finally, there is an analyzed data bucket (aka gold) that has the results of the data analysis. We also have a work bucket that holds the PySpark applications, a logs bucket that holds EMR logs, and a glue-db bucket to hold the Glue Data Catalog metadata.

Whenever we submit PySpark jobs to EMR, the PySpark application files and data will always be accessed from Amazon S3. From the GitHub repository’s local copy, run the following command, which will execute a Python script to upload the approximately (38) Kaggle dataset CSV files to the raw S3 data bucket.

python3 ./scripts/upload_csv_files_to_s3.py

Next, run the following command, which will execute a Python script to upload a series of PySpark application files to the work S3 data bucket.

python3 ./scripts/upload_apps_to_s3.py

Step 7: Crawl Raw Data with Glue

The last preliminary step to prepare the EMR demonstration environment is to catalog the raw CSV data into an AWS Glue data catalog database, using one of the two Glue Crawlers we created. The three kaggle dataset’s data will reside in Amazon S3, while their schema and metadata will reside within tables in the Glue data catalog database, emr_demo. When we eventually query the data from our PySpark applications, we will be querying the Glue data catalog’s database tables, which reference the underlying data in S3.

From the GitHub repository’s local copy, run the following command, which will execute a Python script to run the Glue Crawler and catalog the raw data’s schema and metadata information into the Glue data catalog database, emr_demo.

python3 ./scripts/crawl_raw_data.py --crawler-name emr-demo-raw

Once the crawler is finished, from the AWS Console, we should see a series of nine tables in the Glue data catalog database, emr_demo, all prefixed with raw_. The tables hold metadata and schema information for the three CSV-format Kaggle datasets loaded into S3.

Image for post
AWS Glue Data Catalog Database Tables Console

Alternately, we can use the glue get-tables AWS CLI command to review the tables.

> aws glue get-tables --database emr_demo | \
    jq -r '.TableList[] | select(.Name | startswith("raw_")).Name'

raw_bakery
raw_credits_csv
raw_keywords_csv
raw_links_csv
raw_links_small_csv
raw_movies_metadata_csv
raw_ratings_csv
raw_ratings_small_csv
raw_stocks

PySpark Applications

Let’s explore four methods to run PySpark applications on EMR.

Image for post
High-level architecture of this post’s data analytics platform

1. Add Job Flow Steps to an Existing EMR Cluster

We will start by looking at running PySpark applications using EMR Steps. According to AWS, we can use Amazon EMR steps to submit work to the Spark framework installed on an EMR cluster. The EMR step for PySpark uses a spark-submit command. According to Spark’s documentation, the spark-submit script, located in Spark’s bin directory, is used to launch applications on a [EMR] cluster. A typical spark-submit command we will be using resembles the following example. This command runs a PySpark application in S3, bakery_sales_ssm.py.

spark-submit –deploy-mode cluster –master yarn \
–conf spark.yarn.submit.waitAppCompletion=true \
s3a://emr-demo-work-123456789012-us-east-1/analyze/bakery_sales_ssm.py

We will target the existing EMR cluster created by CloudFormation earlier to execute our PySpark applications using EMR Steps. We have two sets of PySpark applications. The first set of three PySpark applications will transform the raw CSV-format datasets into Apache Parquet, a more efficient file format for big data analytics. Alternately, for your workflows, you might prefer AWS Glue ETL Jobs, as opposed to PySpark on EMR, to perform nearly identical data processing tasks. The second set of four PySpark applications perform data analysis tasks on the data.

There are two versions of each PySpark application. Files with suffix _ssm use the AWS Systems Manager (SSM) Parameter Store service to obtain dynamic parameter values at runtime on EMR. Corresponding non-SSM applications require those same parameter values to be passed on the command line when they are submitted to Spark. Therefore, these PySpark applications are not tightly coupled to boto3 or the SSM Parameter Store. We will use _ssm versions of the scripts in this post’s demonstration.

 > tree pyspark_apps --si -v -A

 pyspark_apps
├── [ 320]  analyze
│   ├── [1.4k]  bakery_sales.py
│   ├── [1.5k]  bakery_sales_ssm.py
│   ├── [2.6k]  movie_choices.py
│   ├── [2.7k]  movie_choices_ssm.py
│   ├── [2.0k]  movies_avg_ratings.py
│   ├── [2.3k]  movies_avg_ratings_ssm.py
│   ├── [2.2k]  stock_volatility.py
│   └── [2.3k]  stock_volatility_ssm.py
└── [ 256]  process
    ├── [1.1k]  bakery_csv_to_parquet.py
    ├── [1.3k]  bakery_csv_to_parquet_ssm.py
    ├── [1.3k]  movies_csv_to_parquet.py
    ├── [1.5k]  movies_csv_to_parquet_ssm.py
    ├── [1.9k]  stocks_csv_to_parquet.py
    └── [2.0k]  stocks_csv_to_parquet_ssm.py 

We will start by executing the three PySpark processing applications. They will convert the CSV data to Parquet. Below, we see an example of one of the PySpark applications we will run, bakery_csv_to_parquet_ssm.py. The PySpark application will convert the Bakery Sales dataset’s CSV file to Parquet and write it to S3.

#!/usr/bin/env python3
# Process raw CSV data and output Parquet
# Author: Gary A. Stafford (November 2020)
import os
import boto3
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client('ssm')
def main():
params = get_parameters()
spark = SparkSession \
.builder \
.appName("bakery-csv-to-parquet") \
.getOrCreate()
convert_to_parquet(spark, "bakery", params)
def convert_to_parquet(spark, file, params):
df_bakery = spark.read \
.format("csv") \
.option("header", "true") \
.option("delimiter", ",") \
.option("inferSchema", "true") \
.load(f"s3a://{params['bronze_bucket']}/bakery/{file}.csv")
write_parquet(df_bakery, params)
def write_parquet(df_bakery, params):
df_bakery.write \
.format("parquet") \
.save(f"s3a://{params['silver_bucket']}/bakery/", mode="overwrite")
def get_parameters():
params = {
'bronze_bucket': ssm_client.get_parameter(Name='/emr_demo/bronze_bucket')['Parameter']['Value'],
'silver_bucket': ssm_client.get_parameter(Name='/emr_demo/silver_bucket')['Parameter']['Value']
}
return params
if __name__ == "__main__":
main()

The three PySpark data processing application’s spark-submit commands are defined in a separate JSON-format file, job_flow_steps_process.json, a snippet of which is shown below. The same goes for the four analytics applications.

[
{
"Name": "Bakery CSV to Parquet",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"–deploy-mode",
"cluster",
"–master",
"yarn",
"–conf",
"spark.yarn.submit.waitAppCompletion=true",
"s3a://{{ work_bucket }}/process/bakery_csv_to_parquet_ssm.py"
]
}
},
{
"Name": "Stocks CSV to Parquet",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"–deploy-mode",
"cluster",
"–master",
"yarn",
"–conf",
"spark.yarn.submit.waitAppCompletion=true",
"s3a://{{ work_bucket }}/process/stocks_csv_to_parquet_ssm.py"
]
}
}
]

Using this pattern of decoupling the Spark job command and arguments from the execution code, we can define and submit any number of Steps without changing the Python script, add_job_flow_steps_process.py, shown below. Note line 31, where the Steps are injected into the add_job_flow_steps method’s parameters.

#!/usr/bin/env python3
# Purpose: Submit a variable number of Steps defined in a separate JSON file
# Author: Gary A. Stafford (November 2020)
import argparse
import json
import logging
import os
import boto3
from botocore.exceptions import ClientError
logging.basicConfig(format='[%(asctime)s] %(levelname)s – %(message)s', level=logging.INFO)
ssm_client = boto3.client('ssm')
emr_client = boto3.client('emr')
def main():
args = parse_args()
params = get_parameters()
steps = get_steps(params, args.job_type)
add_job_flow_steps(params['cluster_id'], steps)
def add_job_flow_steps(cluster_id, steps):
"""Add Steps to an existing EMR cluster"""
try:
response = emr_client.add_job_flow_steps(
JobFlowId=cluster_id,
Steps=steps
)
print(f'Response: {response}')
except ClientError as e:
logging.error(e)
return False
return True
def get_steps(params, job_type):
"""
Load EMR Steps from a separate JSON-format file and substitutes tags for SSM parameter values
"""
dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
file = open(f'{dir_path}/job_flow_steps/job_flow_steps_{job_type}.json', 'r')
steps = json.load(file)
new_steps = []
for step in steps:
step['HadoopJarStep']['Args'] = list(
map(lambda st: str.replace(st, '{{ work_bucket }}', params['work_bucket']), step['HadoopJarStep']['Args']))
new_steps.append(step)
return new_steps
def get_parameters():
"""Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
params = {
'work_bucket': ssm_client.get_parameter(Name='/emr_demo/work_bucket')['Parameter']['Value'],
'cluster_id': ssm_client.get_parameter(Name='/emr_demo/cluster_id')['Parameter']['Value']
}
return params
def parse_args():
"""Parse argument values from command-line"""
parser = argparse.ArgumentParser(description='Arguments required for script.')
parser.add_argument('-t', '–job-type', required=True, choices=['process', 'analyze'],
help='process or analysis')
args = parser.parse_args()
return args
if __name__ == '__main__':
main()

The Python script used for this task takes advantage of AWS Systems Manager Parameter Store parameters. The parameters were placed in the Parameter Store, within the /emr_demo path, by CloudFormation. We will reference these parameters in several scripts throughout the post.

> aws ssm get-parameters-by-path --path '/emr_demo' | \
      jq -r ".Parameters[] | {Name: .Name, Value: .Value}"

{
"Name": "/emr_demo/bootstrap_bucket",
"Value": "emr-demo-bootstrap-123456789012-us-east-1"
}
{
"Name": "/emr_demo/ec2_key_name",
"Value": "emr-demo-123456789012-us-east-1"
}
{
"Name": "/emr_demo/ec2_subnet_id",
"Value": "subnet-06aa61f790a932b32"
}
{
"Name": "/emr_demo/emr_ec2_role",
"Value": "EMR_EC2_DemoRole"
}
{
"Name": "/emr_demo/emr_role",
"Value": "EMR_DemoRole"
}
{
"Name": "/emr_demo/gold_bucket",
"Value": "emr-demo-analyzed-123456789012-us-east-1"
}
{
"Name": "/emr_demo/silver_bucket",
"Value": "emr-demo-processed-123456789012-us-east-1"
}
{
"Name": "/emr_demo/sm_log_group_arn",
"Value": "arn:aws:logs:us-east-1:123456789012:log-group:EmrDemoStateMachineLogGroup:*"
}
{
"Name": "/emr_demo/sm_role_arn",
"Value": "arn:aws:iam::123456789012:role/State_ExecutionRole"
}
{
"Name": "/emr_demo/work_bucket",
"Value": "emr-demo-work-123456789012-us-east-1"
}
{
"Name": "/emr_demo/bronze_bucket",
"Value": "emr-demo-raw-123456789012-us-east-1"
}
{
"Name": "/emr_demo/cluster_id",
"Value": "j-3J44BFJXNVSCT"
}
{
"Name": "/emr_demo/glue_db_bucket",
"Value": "emr-demo-logs-123456789012-us-east-1"
}
{
"Name": "/emr_demo/logs_bucket",
"Value": "emr-demo-logs-123456789012-us-east-1"
}
{
"Name": "/emr_demo/vpc_id",
"Value": "vpc-01d4151396c119d3a"
}
view raw ssm_parameters.json hosted with ❤ by GitHub

From the GitHub repository’s local copy, run the following command, which will execute a Python script to load the three spark-submit commands from JSON-format file, job_flow_steps_process.json, and run the PySpark processing applications on the existing EMR cluster.

python3 ./scripts/add_job_flow_steps.py --job-type process

While the three Steps are running concurrently, the view from the Amazon EMR Console’s Cluster Steps tab should look similar to the example below.

Image for post
Amazon EMR Console’s Cluster Steps tab

Once the three Steps have been completed, we should note three sub-directories in the processed data bucket containing Parquet-format files.

Image for post
Processed CSV data converted to Parquet and organized by dataset

Of special note is the Stocks dataset, which has been converted to Parquet and partitioned by stock symbol. According to AWS, by partitioning your data, we can restrict the amount of data scanned by each query by specifying filters based on the partition, thus improving performance and reducing cost.

Image for post
Processed stock data converted to Parquet and partitioned by stock symbol

Lastly, the movie ratings dataset has been divided into sub-directories, based on the schema of each table. Each sub-directory contains Parquet files specific to that unique schema.

Image for post
Processed movie ratings data converted to Parquet and organized by schema

Crawl Processed Data with Glue

Similar to the raw data earlier, catalog the newly processed Parquet data into the same AWS Glue data catalog database using one of the two Glue Crawlers we created. Similar to the raw data, earlier, processed data will reside in the Amazon S3 processed data bucket while their schemas and metadata will reside within tables in the Glue data catalog database, emr_demo.

From the GitHub repository’s local copy, run the following command, which will execute a Python script to run the Glue Crawler and catalog the processed data’s schema and metadata information into the Glue data catalog database, emr_demo.

python3 ./scripts/crawl_raw_data.py --crawler-name emr-demo-processed

Once the crawler has finished successfully, using the AWS Console, we should see a series of nine tables in the Glue data catalog database, emr_demo, all prefixed with processed_. The tables represent the three kaggle dataset’s contents converted to Parquet and correspond to the equivalent tables with the raw_ prefix.

Image for post
AWS Glue Data Catalog Database Tables Console

Alternately, we can use the glue get-tables AWS CLI command to review the tables.

> aws glue get-tables --database emr_demo | \
    jq -r '.TableList[] | select(.Name | startswith("processed_")).Name'

processed_bakery
processed_credits
processed_keywords
processed_links
processed_links_small
processed_movies_metadata
processed_ratings
processed_ratings_small
processed_stocks

2. Run PySpark Jobs from EMR Master Node

Next, we will explore how to execute PySpark applications remotely on the Master node on the EMR cluster using boto3 and SSH. Although this method may be optimal for certain use cases as opposed to using the EMR SDK, remote SSH execution does not scale as well in my opinion due to a lack of automation, and it exposes some potential security risks.

There are four PySpark applications in the GitHub repository. For this part of the demonstration, we will just submit the bakery_sales_ssm.py application. This application will perform a simple analysis of the bakery sales data. While the other three PySpark applications use AWS Glue, the bakery_sales_ssm.py application reads data directly from the processed data S3 bucket.

The application writes its results into the analyzed data S3 bucket, in both Parquet and CSV formats. The CSV file is handy for business analysts and other non-technical stakeholders who might wish to import the results of the analysis into Excel or business applications.

#!/usr/bin/env python3
# Purpose: Submit Spark job to EMR Master Node
# Author: Gary A. Stafford (December 2020)
# Usage Example: python3 ./submit_spark_ssh.py \
# –ec2-key-path ~/.ssh/emr-demo-123456789012-us-east-1.pem
import argparse
import logging
import boto3
from paramiko import SSHClient, AutoAddPolicy
logging.basicConfig(format='[%(asctime)s] %(levelname)s – %(message)s', level=logging.INFO)
ssm_client = boto3.client('ssm')
def main():
args = parse_args()
params = get_parameters()
submit_job(params['master_public_dns'], 'hadoop', args.ec2_key_path, params['work_bucket'])
def submit_job(master_public_dns, username, ec2_key_path, work_bucket):
"""Submit job to EMR Master Node"""
ssh = SSHClient()
ssh.load_system_host_keys()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect(hostname=master_public_dns, username=username, key_filename=ec2_key_path)
stdin_, stdout_, stderr_ = ssh.exec_command(
command=f"""
spark-submit –deploy-mode cluster –master yarn \
–conf spark.yarn.submit.waitAppCompletion=true \
s3a://{work_bucket}/analyze/bakery_sales_ssm.py"""
)
stdout_lines = ''
while not stdout_.channel.exit_status_ready():
if stdout_.channel.recv_ready():
stdout_lines = stdout_.readlines()
logging.info(' '.join(map(str, stdout_lines)))
ssh.close()
def get_parameters():
"""Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
params = {
'master_public_dns': ssm_client.get_parameter(Name='/emr_demo/master_public_dns')['Parameter']['Value'],
'work_bucket': ssm_client.get_parameter(Name='/emr_demo/work_bucket')['Parameter']['Value']
}
return params
def parse_args():
"""Parse argument values from command-line"""
parser = argparse.ArgumentParser(description='Arguments required for script.')
parser.add_argument('-e', '–ec2-key-path', required=True, help='EC2 Key Path')
args = parser.parse_args()
return args
if __name__ == '__main__':
main()
view raw submit_spark_ssh.py hosted with ❤ by GitHub

Earlier, we created an inbound rule to allow your IP address to access the Master node on port 22. From the EMR Console’s Cluster Summary tab, note the command necessary to SSH into the Master node of the EMR cluster.

EMR Console’s Cluster Summary tab

The Python script, submit_spark_ssh.py, shown below, will submit the PySpark job to the EMR Master Node, using paramiko, a Python implementation of SSHv2. The script is replicating the same functionality as the shell-based SSH command above to execute a remote command on the EMR Master Node. The spark-submit command is on lines 36–38, below.

#!/usr/bin/env python3
# Purpose: Submit Spark job to EMR Master Node
# Author: Gary A. Stafford (December 2020)
# Usage Example: python3 ./submit_spark_ssh.py \
# –ec2-key-path ~/.ssh/emr-demo-123456789012-us-east-1.pem
import argparse
import logging
import boto3
from paramiko import SSHClient, AutoAddPolicy
logging.basicConfig(format='[%(asctime)s] %(levelname)s – %(message)s', level=logging.INFO)
ssm_client = boto3.client('ssm')
def main():
args = parse_args()
params = get_parameters()
submit_job(params['master_public_dns'], 'hadoop', args.ec2_key_path, params['work_bucket'])
def submit_job(master_public_dns, username, ec2_key_path, work_bucket):
"""Submit job to EMR Master Node"""
ssh = SSHClient()
ssh.load_system_host_keys()
ssh.set_missing_host_key_policy(AutoAddPolicy())
ssh.connect(hostname=master_public_dns, username=username, key_filename=ec2_key_path)
stdin_, stdout_, stderr_ = ssh.exec_command(
command=f"""
spark-submit –deploy-mode cluster –master yarn \
–conf spark.yarn.submit.waitAppCompletion=true \
s3a://{work_bucket}/analyze/bakery_sales_ssm.py"""
)
stdout_lines = ''
while not stdout_.channel.exit_status_ready():
if stdout_.channel.recv_ready():
stdout_lines = stdout_.readlines()
logging.info(' '.join(map(str, stdout_lines)))
ssh.close()
def get_parameters():
"""Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
params = {
'master_public_dns': ssm_client.get_parameter(Name='/emr_demo/master_public_dns')['Parameter']['Value'],
'work_bucket': ssm_client.get_parameter(Name='/emr_demo/work_bucket')['Parameter']['Value']
}
return params
def parse_args():
"""Parse argument values from command-line"""
parser = argparse.ArgumentParser(description='Arguments required for script.')
parser.add_argument('-e', '–ec2-key-path', required=True, help='EC2 Key Path')
args = parser.parse_args()
return args
if __name__ == '__main__':
main()
view raw submit_spark_ssh.py hosted with ❤ by GitHub

From the GitHub repository’s local copy, run the following command, which will execute a Python script to submit the job. The script requires one input parameter, which is the path to your EC2 key pair (e.g., ~/.ssh/my-key-pair.pem)

python3 ./scripts/submit_spark_ssh.py \
    --ec2-key-path </path/to/my-key-pair.pem>

The spark-submit command will be executed remotely on the EMR cluster’s Master node over SSH. All variables in the commands will be replaced by the environment variables, set in advance, which use AWS CLI emr and ssm commands.

Image for post
Remote SSH submission of a Spark job

Monitoring Spark Jobs

We set spark.yarn.submit.waitAppCompletion to true. According to Spark’s documentation, this property controls whether the client waits to exit in YARN cluster mode until the application is completed. If set to true, the client process will stay alive, reporting the application’s status. Otherwise, the client process will exit after submission. We can watch the job’s progress from the terminal.

Image for post
PySpark application shown running on EMR’s Master node

We can also use the YARN Timeline Server and the Spark History Server in addition to the terminal. Links to both are shown on both the EMR Console’s Cluster ‘Summary’ and ‘Application user interfaces’ tabs. Unlike other EMR application web interfaces, using port forwarding, also known as creating an SSH tunnel, is not required for the YARN Timeline Server or the Spark History Server.

Image for post
EMR Console’s Cluster Application user interfaces tab

YARN Timeline Server

Below, we see that the job we submitted running on the YARN Timeline Server also includes useful tools like access to configuration, local logs, server stacks, and server metrics.

Image for post
YARN Timeline Server

YARN Timeline Server allows us to drill down into individual jobs and view logs. Logs are ideal for troubleshooting failed jobs, especially the stdout logs.

Image for post

Spark History Server

You can also view the PySpark application we submitted from the Master node using the Spark History Server. Below, we see completed Spark applications (aka Spark jobs) in the Spark History Server.

Image for post
Spark History Server completed applications

Below, we see more details about our Spark job using the Spark History Server.

Image for post
Spark History Server’s Jobs tab

We can even see visual representations of each Spark job’s Directed Acyclic Graph (DAG).

Image for post
Spark History Server’s Jobs tab

3. Run Job Flow on an Auto-Terminating EMR Cluster

The next option to run PySpark applications on EMR is to create a short-lived, auto-terminating EMR cluster using the run_job_flow method. We will create a new EMR cluster, run a series of Steps (PySpark applications), and then auto-terminate the cluster. This is a cost-effective method of running PySpark applications on-demand.

We will create a second 3-node EMR v6.2.0 cluster to demonstrate this method, using Amazon EC2 Spot instances for all the EMR cluster’s Master and Core nodes. Unlike the first, long-lived, more general-purpose EMR cluster, we will only deploy the Spark application to this cluster as that is the only application we will need to run the Steps.

Using the run_job_flow method, we will execute the four PySpark data analysis applications. The PySpark application’s spark-submit commands are defined in a separate JSON-format file, job_flow_steps_analyze.json. Similar to the previous add_job_flow_steps.py script, this pattern of decoupling the Spark job command and arguments from the execution code, we can define and submit any number of Steps without changing the Python execution script. Also similar, this script retrieves parameter values from the SSM Parameter Store.

#!/usr/bin/env python3
# Purpose: Create a new EMR cluster and submits a variable
# number of Steps defined in a separate JSON file
# Author: Gary A. Stafford (November 2020)
import argparse
import json
import logging
import os
import boto3
from botocore.exceptions import ClientError
from scripts.parameters import parameters
logging.basicConfig(format='[%(asctime)s] %(levelname)s – %(message)s', level=logging.INFO)
emr_client = boto3.client('emr')
def main():
args = parse_args()
params = parameters.get_parameters()
steps = get_steps(params, args.job_type)
run_job_flow(params, steps)
def run_job_flow(params, steps):
"""Create EMR cluster, run Steps, and then terminate cluster"""
try:
response = emr_client.run_job_flow(
Name='demo-cluster-run-job-flow',
LogUri=f's3n://{params["logs_bucket"]}',
ReleaseLabel='emr-6.2.0',
Instances={
'InstanceFleets': [
{
'Name': 'MASTER',
'InstanceFleetType': 'MASTER',
'TargetSpotCapacity': 1,
'InstanceTypeConfigs': [
{
'InstanceType': 'm5.xlarge',
},
]
},
{
'Name': 'CORE',
'InstanceFleetType': 'CORE',
'TargetSpotCapacity': 2,
'InstanceTypeConfigs': [
{
'InstanceType': 'r5.2xlarge',
},
],
},
],
'Ec2KeyName': params['ec2_key_name'],
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False,
'Ec2SubnetId': params['ec2_subnet_id'],
},
Steps=steps,
BootstrapActions=[
{
'Name': 'string',
'ScriptBootstrapAction': {
'Path': f's3://{params["bootstrap_bucket"]}/bootstrap_actions.sh',
}
},
],
Applications=[
{
'Name': 'Spark'
},
],
Configurations=[
{
'Classification': 'spark-hive-site',
'Properties': {
'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'
}
}
],
VisibleToAllUsers=True,
JobFlowRole=params['emr_ec2_role'],
ServiceRole=params['emr_role'],
Tags=[
{
'Key': 'Environment',
'Value': 'Development'
},
{
'Key': 'Name',
'Value': 'EMR Demo Project Cluster'
},
{
'Key': 'Owner',
'Value': 'Data Analytics'
},
],
EbsRootVolumeSize=32,
StepConcurrencyLevel=5,
)
print(f'Response: {response}')
except ClientError as e:
logging.error(e)
return False
return True
def get_steps(params, job_type):
"""
Load EMR Steps from a separate JSON-format file and substitutes tags for SSM parameter values
"""
dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
file = open(f'{dir_path}/job_flow_steps/job_flow_steps_{job_type}.json', 'r')
steps = json.load(file)
new_steps = []
for step in steps:
step['HadoopJarStep']['Args'] = list(
map(lambda st: str.replace(st, '{{ work_bucket }}', params['work_bucket']), step['HadoopJarStep']['Args']))
new_steps.append(step)
return new_steps
def parse_args():
"""Parse argument values from command-line"""
parser = argparse.ArgumentParser(description='Arguments required for script.')
parser.add_argument('-t', '–job-type', required=True, choices=['process', 'analyze'], help='process or analysis')
args = parser.parse_args()
return args
if __name__ == '__main__':
main()
view raw run_job_flow.py hosted with ❤ by GitHub

From the GitHub repository’s local copy, run the following command, which will execute a Python script to create a new cluster, run the two PySpark applications, and then auto-terminate.

python3 ./scripts/run_job_flow.py --job-type analyze

As shown below, we see the short-lived EMR cluster in the process of terminating after successfully running the PySpark applications as EMR Steps.

Image for post
AWS EMR Console’s Cluster Steps tab
Image for post
AWS EMR Console’s Cluster Summary tab

4. Using AWS Step Functions

According to AWS, AWS Step Functions is a serverless function orchestrator that makes it easy to sequence AWS Lambda functions and multiple AWS services. Step Functions manages sequencing, error handling, retry logic, and state, removing a significant operational burden from your team. Step Functions is based on state machines and tasks. A state machine is a workflow. A task is a state in a workflow that represents a single unit of work that another AWS service performs. Each step in a workflow is a state. Using AWS Step Functions, we define our workflows as state machines, which transform complex code into easy to understand statements and diagrams.

Image for post
AWS Step Function Console’s State Machine Edit tab

You can use AWS Step Functions to run PySpark applications as EMR Steps on an existing EMR cluster. Using Step Functions, we can also create the cluster, run multiple EMR Steps sequentially or in parallel, and finally, auto-terminate the cluster.

We will create two state machines for this demo, one for the PySpark data processing applications and one for the PySpark data analysis applications. To create state machines, we first need to create JSON-based state machine definition files. The files are written in Amazon States Language. According to AWS, Amazon States Language is a JSON-based, structured language used to define a state machine, a collection of states that can do work (Task states), determine which states to transition to next (Choice states), stop execution with an error (Fail states), and so on.

The definition files contain specific references to AWS resources deployed to your AWS account originally created by CloudFormation. Below is a snippet of the state machine definition file, step_function_emr_analyze.json, showing part of the configuration of the EMR cluster. Note the parameterized key/value pairs (e.g., “Ec2KeyName.$”: “$.InstancesEc2KeyName” on line 5). The values will come from a JSON-formatted inputs file and are dynamically replaced upon the state machine’s execution.

"ServiceRole": "$.ServiceRole",
"JobFlowRole.$": "$.JobFlowRole",
"LogUri.$": "$.LogUri",
"Instances": {
"Ec2KeyName.$": "$.InstancesEc2KeyName",
"Ec2SubnetId.$": "$.InstancesEc2SubnetId",
"KeepJobFlowAliveWhenNoSteps": true,
"InstanceFleets": [
{
"InstanceFleetType": "MASTER",
"TargetOnDemandCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m5.xlarge"
}
]
},
{
"InstanceFleetType": "CORE",
"TargetSpotCapacity": 2,
"InstanceTypeConfigs": [
{
"InstanceType": "r5.2xlarge"
}
]
}
]
}

Python Templating

To automate the process of adding dynamic resource references to the state machine’s inputs files, we will use Jinja, the modern and designer-friendly templating language for Python, modeled after Django’s templates. We will render the Jinja template to a JSON-based state machine inputs file, replacing the template’s resource tags (keys) with values from the SSM Parameter Store’s parameters. Below is a snippet from the inputs file Jinja template, step_function_inputs_analyze.j2.

{
"CreateCluster": true,
"TerminateCluster": true,
"ScriptBootstrapActionScript": "s3://{{ bootstrap_bucket }}/bootstrap_actions.sh",
"LogUri": "s3n://{{ logs_bucket }}",
"InstancesEc2KeyName": "{{ ec2_key_name }}",
"InstancesEc2SubnetId": "{{ ec2_subnet_id}}",
"JobFlowRole": "{{ emr_ec2_role }}",
"ServiceRole": "{{ emr_role }}",
"ArgsBakerySalesSsm": [
"spark-submit",
"–deploy-mode",
"cluster",
"–master",
"yarn",
"–conf",
"spark.yarn.submit.waitAppCompletion=true",
"s3a://{{ work_bucket }}/analyze/bakery_sales_ssm.py"
]
}

First, install Jinja2, then create two JSON-based state machine inputs files from the Jinja templates using the included Python file.

# install Jinja2
python3 -m pip install Jinja2

python3 ./scripts/create_inputs_files.py

Below we see the same snippet of the final inputs file. Jinja tags have been replaced with values from the SSM Parameter Store.

{
"CreateCluster": true,
"TerminateCluster": true,
"ScriptBootstrapActionScript": "s3://emr-demo-bootstrap-123456789012-us-east-1/bootstrap_actions.sh",
"LogUri": "s3n://emr-demo-logs-123456789012-us-east-1",
"InstancesEc2KeyName": "emr-demo-123456789012-us-east-1",
"InstancesEc2SubnetId": "subnet-06ab61f888a932d12",
"JobFlowRole": "EMR_EC2_DemoRole",
"ServiceRole": "EMR_DemoRole",
"ArgsBakerySalesSsm": [
"spark-submit",
"–deploy-mode",
"cluster",
"–master",
"yarn",
"–conf",
"spark.yarn.submit.waitAppCompletion=true",
"s3a://emr-demo-work-123456789012-us-east-1/analyze/bakery_sales_ssm.py"
]
}

Using the definition files, create two state machines using the included Python files.

python3 ./scripts/create_state_machine.py \
    --definition-file step_function_emr_process.json \
    --state-machine EMR-Demo-Process

python3 ./scripts/create_state_machine.py \
    --definition-file step_function_emr_analyze.json \
    --state-machine EMR-Demo-Analysis

Both state machines should appear in the AWS Step Functions Console’s State Machines tab. Below, we see the ‘EMR-Demo-Analysis’ state machine’s definition both as JSON and rendered visually to a layout.

Image for post
AWS Step Function Console’s State Machine Edit tab

To execute either of the state machines, use the included Python file, passing in the exact name of the state machine to execute, either ‘EMR-Demo-Process’ or ‘EMR-Demo-Analysis’, and the name of the inputs file. I suggest running the EMR-Demo-Analysis version so as not to re-process all the raw data.

python3 ./scripts/execute_state_machine.py \
    --state-machine EMR-Demo-Process \
    --inputs-file step_function_inputs_process.json

python3 ./scripts/execute_state_machine.py \
    --state-machine EMR-Demo-Analysis \
    --inputs-file step_function_inputs_analyze.json

When the PySpark analysis application’s Step Function state machine is executed, a new EMR cluster is created, the PySpark applications are run, and finally, the cluster is auto-terminated. Below, we see a successfully executed state machine, which successfully ran the four PySpark analysis applications in parallel, on a new auto-terminating EMR cluster.

Image for post
AWS Step Function Console’s State Machine Execution tab

Conclusion

This post explored four methods for running PySpark applications on Amazon Elastic MapReduce (Amazon EMR). The key to scaling data analytics with PySpark on EMR is the use of automation. Therefore, we looked at ways to automate the deployment of EMR resources, create and submit PySpark jobs, and terminate EMR resources when the jobs are complete. Furthermore, we were able to decouple references to dynamic AWS resources within our PySpark applications using parameterization. This allows us to deploy and run PySpark resources across multiple AWS Accounts and AWS Regions without code changes.

In part two of the series, we will explore the use of the recently announced service, Amazon Managed Workflows for Apache Airflow (MWAA), and in part three, the use of Juypter and Zeppelin notebooks for data science, scientific computing, and machine learning on EMR.


This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.

, , , , , ,

1 Comment

GTM Stack: Exploring IoT Data Analytics at the Edge with Grafana, Mosquitto, and TimescaleDB on ARM-based Architectures

In the following post, we will explore the integration of several open-source software applications to build an IoT edge analytics stack, designed to operate on ARM-based edge nodes. We will use the stack to collect, analyze, and visualize IoT data without first shipping the data to the Cloud or other external systems.

Image for post
GMT IoT Edge Analytics Stack architecture (Image by author)

The Edge

Edge computing is a fast-growing technology trend, which involves pushing compute capabilities to the edgeWikipedia describes edge computing as a distributed computing paradigm that brings computation and data storage closer to the location needed to improve response times and save bandwidth. The term edge commonly refers to a compute node at the edge of a network (edge device), sitting in close proximity to the source a data and between that data source and external system such as the Cloud.

In his recent post, 3 Advantages (And 1 Disadvantage) Of Edge Computing, well-known futurist Bernard Marr argues reduced bandwidth requirements, reduced latency, and enhanced security and privacy as three primary advantages of edge computing. Due to techniques like data downsampling, Marr advises one potential disadvantage of edge computing is that important data could end up being overlooked and discarded in the quest to save bandwidth and reduce latency.

David Ricketts, Head of Marketing at Quiss Technology PLC, estimates in his post, Cloud and Edge Computing — The Stats You Need to Know for 2018, the global edge computing market is expected to reach USD 6.72 billion by 2022 at a compound annual growth rate of a whopping 35.4 percent. Realizing the market potential, many major Cloud providers, edge device manufacturers, and integrators are rapidly expanding their edge compute capabilities. AWS, for example, currently offers more than a dozen services in the edge computing category.

Internet of Things

Edge computing is frequently associated with the Internet of Things (IoT). IoT devices, industrial equipment, and sensors generate data, which is transmitted to other internal and external systems, often by way of edge nodes, such as an IoT Gateway. IoT devices typically generate time-series data. According to Wikipedia, a time series is a set of data points indexed in time order — a sequence taken at successive equally spaced points in time. IoT devices typically generate continuous high-volume streams of time-series data, often on the scale of millions of data points per second. IoT data characteristics require IoT platforms to minimally support temporal accuracy, high-volume ingestion and processing, efficient data compression and downsampling, and real-time querying capabilities.

The IoT devices and the edge devices, such as IoT Gateways, which aggregate and transmit IoT data from these devices to external systems, are generally lower-powered, with limited processor, memory, and storage capabilities. Accordingly, IoT platforms must satisfy all the requirements of IoT data while simultaneously supporting resource-constrained environments.

IoT Analytics at the Edge

Leading Cloud providers AWS, Azure, Google Cloud, IBM Cloud, Oracle Cloud, and Alibaba Cloud all offer IoT services. Many offer IoT services with edge computing capabilities. AWS offers AWS IoT Greengrass. Greengrass provides local compute, messaging, data management, sync, and ML inference capabilities to edge devices. Azure offers Azure IoT Edge. Azure IoT Edge provides the ability to run AI, Azure and third-party services, and custom business logic on edge devices using standard containers. Google Cloud offers Edge TPU. Edge TPU (Tensor Processing Unit) is Google’s purpose-built application-specific integrated circuit (ASIC), designed to run AI at the edge.

IoT Analytics

Many Cloud providers also offer IoT analytics as part of their suite of IoT services, although not at the edge. AWS offers AWS IoT Analytics, while Azure has Azure Time Series Insights. Google provides IoT analytics, indirectly, through downstream analytic systems and ad hoc analysis using Google BigQuery or advanced analytics and machine learning with Cloud Machine Learning Engine. These services generally all require data to be transmitted to the Cloud for analytics.

Image for post
Cloud-centric IoT platform data flow (Image by author)

The ability to analyze IoT data at the edge, as data is streamed in real-time, is critical to a rapid feedback loop. IoT edge analytics can accelerate anomaly detection, improve predictive maintenance capabilities, and expedite proactive inventory replenishment.

The IoT Edge Analytics Stack

In my opinion, the ideal IoT edge analytics stack is comprised of lightweight, purpose-built, easily deployable and manageable, platform- and programming language-agnostic, open-source software components. The minimal IoT edge analytics stack should include a lightweight message broker, a time-series database, an ANSI-standard ad-hoc query engine, and a data visualization tool. Each component should be purpose-built for IoT.

Lightweight Message Broker

We will use Eclipse Mosquitto as our message broker. According to the project’s description, Mosquitto is an open-source message broker that implements the Message Queuing Telemetry Transport (MQTT) protocol versions 5.0, 3.1.1, and 3.1. Mosquitto is lightweight and suitable for use on all devices from low power single board computers (SBCs) to full servers.

MQTT Client Library

We will interact with Mosquitto using Eclipse Paho. According to the project, the Eclipse Paho project provides open-source, mainly client-side implementations of MQTT and MQTT-SN in a variety of programming languages. MQTT and MQTT for Sensor Networks (MQTT-SN) are lightweight publish/subscribe messaging transports for TCP/IP and connectionless protocols, such as UDP, respectively.

We will be using Paho’s Python Client. The Paho Python Client provides a client class with support for both MQTT v3.1 and v3.1.1 on Python 2.7 or 3.x. The client also provides helper functions to make publishing messages to an MQTT server straightforward.

Time-Series Database

Time-series databases are optimal for storing IoT data. According to InfluxData, makers of a leading time-series database, InfluxDB, a time-series database (TSDB), is a database optimized for time-stamped or time-series data. Time series data are simply measurements or events that are tracked, monitored, downsampled, and aggregated over time. Jiao Xian, of Alibaba Cloud, has authored an insightful post on the time-series database ecosystem, What Are Time Series Databases? A few leading Cloud providers offer purpose-built time-series databases, though they are not available at the edge. AWS offers Amazon Timestream and Alibaba Cloud offers Time Series Database.

InfluxDB is an excellent choice for a time-series database. It was my first choice, along with TimescaleDB, when developing this stack. However, InfluxDB Flux’s apparent incompatibilities with some ARM-based architectures ruled it out for inclusion in the stack for this particular post.

We will use TimescaleDB as our time-series database. TimescaleDB is the leading open-source relational database for time-series data. Described as ‘PostgreSQL for time-series,’ TimescaleDB is based on PostgreSQL, which provides full ANSI SQL, rock-solid reliability, and a massive ecosystem. TimescaleDB claims to achieve 10–100x faster queries than PostgreSQL, InfluxDB, and MongoDB, with native optimizations for time-series analytics.

TimescaleDB claims to achieve 10–100x faster queries than PostgreSQL, InfluxDB, and MongoDB, with native optimizations for time-series analytics.

TimescaleDB is designed for performing analytical queries, both through its native support for PostgreSQL’s full range of SQL functionality, as well as additional functions native to TimescaleDB. These time-series optimized functions include Median/Percentile, Cumulative Sum, Moving Average, Increase, Rate, Delta, Time Bucket, Histogram, and Gap Filling.

Ad-hoc Data Query Engine

We have the option of using psql, the terminal-based front-end to PostgreSQL, to execute ad-hoc queries against TimescaleDB. The psql front-end enables you to enter queries interactively, issue them to PostgreSQL, and see the query results.

Image for post
View of psql terminal-based interface for querying the TimescaleDB database

We also have the option of using pgAdmin, specifically the biarms/pgadmin4 Docker version, to execute ad-hoc queries and perform most other database tasks. pgAdmin is the most popular open-source administration and development platform for PostgreSQL. While several popular Docker versions of pgAdmin only support Linux AMD64 architectures, the biarms/pgadmin4 Docker version supports ARM-based devices.

Image for post
Dashboard view of TimescaleDB database from within pgAdmin UI

Image for post
Executing a query against the TimescaleDB database using pgAdmin’s Query Tool

Data Visualization

For data visualization, we will use Grafana. Grafana allows you to query, visualize, alert on, and understand metrics no matter where they are stored. With Grafana, you can create, explore, and share dashboards, fostering a data-driven culture. Grafana allows you to define thresholds visually and get notified via Slack, PagerDuty, and more. Grafana supports dozens of data sources, including MySQL, PostgreSQL, Elasticsearch, InfluxDB, TimescaleDB, Graphite, Prometheus, Google BigQuery, GraphQL, and Oracle. Grafana is extensible through a large collection of plugins.

Image for post
Example of Grafana dashboard showing the post’s IoT sensor data

Edge Deployment and Management Platform

Docker introduced the current industry standard for containers in 2013. Docker containers are a standardized unit of software that allows developers to isolate apps from their environment. We will use Docker to deploy the IoT edge analytics stack, referred to herein as the GTM Stack, composed of containerized versions of Eclipse Mosquitto, TimescaleDB, and Grafana, and pgAdmin, to an ARM-based edge node. The acronym, GTM, comes from the three primary OSS projects composing the stack. The acronym also suggests Greenwich Mean Time, relating to the precise time-series nature of IoT data.

Image for post
GMT IoT Edge Analytics Stack architecture (Image by author)

Running Docker Engine in swarm mode, we can use Docker to deploy the complete IoT edge analytics stack to the swarm, running on the edge node. The deploy command accepts a stack description in the form of a Docker Compose file, a YAML file used to configure the application’s services. With a single command, we can create and start all the services from the configuration file.

Source Code

All source code for this post is available on GitHub. Use the following command to git clone a local copy of the project.

git clone –branch main –single-branch –depth 1 –no-tags \
https://github.com/garystafford/iot-analytics-at-the-edge.git
view raw github_gtm.sh hosted with ❤ by GitHub

IoT Devices

For this post, I have deployed three Linux ARM-based IoT devices, each connected to a sensor array. Each sensor array contains multiple analog and digital sensors. The sensors record temperature, humidity, air quality (liquefied petroleum gas (LPG), carbon monoxide (CO), and smoke), light, and motion. For more information on the IoT device and sensor hardware involved, please see my previous post: Getting Started with IoT Analytics on AWS.

Each ARM-based IoT device is running a small Python3-based script, sensor_data_to_mosquitto.py, shown below.

import argparse
import json
import logging
import sys
import time
from datetime import datetime
import paho.mqtt.client as client
import paho.mqtt.publish as publish
from Sensors import Sensors
from getmac import get_mac_address
from pytz import timezone
# Author: Gary A. Stafford
# Date: 10/11/2020
# Usage: python3 sensor_data_to_mosquitto.py \
# –host "192.168.1.12" –port 1883 \
# –topic "sensor/output" –frequency 10
sensors = Sensors()
logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
def main():
args = parse_args()
publish_message_to_db(args)
def get_readings():
sensors.led_state(0)
# Retrieve sensor readings
payload_dht = sensors.get_sensor_data_dht()
payload_gas = sensors.get_sensor_data_gas()
payload_light = sensors.get_sensor_data_light()
payload_motion = sensors.get_sensor_data_motion()
message = {
"device_id": get_mac_address(),
"time": datetime.now(timezone("UTC")),
"data": {
"temperature": payload_dht["temperature"],
"humidity": payload_dht["humidity"],
"lpg": payload_gas["lpg"],
"co": payload_gas["co"],
"smoke": payload_gas["smoke"],
"light": payload_light["light"],
"motion": payload_motion["motion"]
}
}
return message
def date_converter(o):
if isinstance(o, datetime):
return o.__str__()
def publish_message_to_db(args):
while True:
message = get_readings()
message_json = json.dumps(message, default=date_converter, sort_keys=True,
indent=None, separators=(',', ':'))
logger.debug(message_json)
try:
publish.single(args.topic, payload=message_json, qos=0, retain=False,
hostname=args.host, port=args.port, client_id="",
keepalive=60, will=None, auth=None, tls=None,
protocol=client.MQTTv311, transport="tcp")
except Exception as error:
logger.error("Exception: {}".format(error))
finally:
time.sleep(args.frequency)
# Read in command-line parameters
def parse_args():
parser = argparse.ArgumentParser(description='Script arguments')
parser.add_argument('–host', help='Mosquitto host', default='localhost')
parser.add_argument('–port', help='Mosquitto port', type=int, default=1883)
parser.add_argument('–topic', help='Mosquitto topic', default='paho/test')
parser.add_argument('–frequency', help='Message frequency in seconds', type=int, default=5)
return parser.parse_args()
if __name__ == "__main__":
main()

The IoT devices’ script implements the Eclipse Paho MQTT Python client library. An MQTT message containing simultaneous readings from each sensor is sent to a Mosquitto topic on the edge node, at a configurable frequency.

message = {
"device_id": get_mac_address(),
"time": datetime.now(timezone("UTC")),
"data": {
"temperature": payload_dht["temperature"],
"humidity": payload_dht["humidity"],
"lpg": payload_gas["lpg"],
"co": payload_gas["co"],
"smoke": payload_gas["smoke"],
"light": payload_light["light"],
"motion": payload_motion["motion"]
}
}
view raw sensor_message.py hosted with ❤ by GitHub

IoT Edge Node

For this post, I have deployed a single Linux ARM-based edge node. The three IoT devices, containing sensor arrays, communicate with the edge node over Wi-Fi. The IoT devices could easily use an alternative communication protocol, such as BLE, LoRaWAN, or Ethernet. For more information on BLE and LoRaWAN, please see some of my previous posts: LoRa and LoRaWAN for IoT: Getting Started with LoRa and LoRaWAN Protocols for Low Power, Wide Area Networking of IoT and BLE and GATT for IoT: Getting Started with Bluetooth Low Energy (BLE) and Generic Attribute Profile (GATT) Specification for IoT.

The edge node is also running a small Python3-based script, mosquitto_to_timescaledb.py, shown below.

import argparse
import json
import logging
import sys
from datetime import datetime
import paho.mqtt.client as mqtt
import psycopg2
# Author: Gary A. Stafford
# Date: 10/11/2020
# Usage: python3 mosquitto_to_timescaledb.py \
# –msqt_topic "sensor/output –msqt_host "192.168.1.12" –msqt_port 1883 \
# –ts_host "192.168.1.12" –ts_port 5432 \
# –ts_username postgres –ts_password postgres1234 –ts_database demo_iot
logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
args = argparse.Namespace
ts_connection: str = ""
def main():
global args
args = parse_args()
global ts_connection
ts_connection = "postgres://{}:{}@{}:{}/{}".format(args.ts_username, args.ts_password, args.ts_host,
args.ts_port, args.ts_database)
logger.debug("TimescaleDB connection: {}".format(ts_connection))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(args.msqt_host, args.msqt_port, 60)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
logger.debug("Connected with result code {}".format(str(rc)))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe(args.msqt_topic)
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
logger.debug("Topic: {}, Message Payload: {}".format(msg.topic, str(msg.payload)))
publish_message_to_db(msg)
def date_converter(o):
if isinstance(o, datetime):
return o.__str__()
def publish_message_to_db(message):
message_payload = json.loads(message.payload)
# logger.debug("message.payload: {}".format(json.dumps(message_payload, default=date_converter)))
sql = """INSERT INTO sensor_data(time, device_id, temperature, humidity, lpg, co, smoke, light, motion)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);"""
data = (
message_payload["time"], message_payload["device_id"], message_payload["data"]["temperature"],
message_payload["data"]["humidity"], message_payload["data"]["lpg"], message_payload["data"]["co"],
message_payload["data"]["smoke"], message_payload["data"]["light"], message_payload["data"]["motion"])
try:
with psycopg2.connect(ts_connection, connect_timeout=3) as conn:
with conn.cursor() as curs:
try:
curs.execute(sql, data)
except psycopg2.Error as error:
logger.error("Exception: {}".format(error.pgerror))
except Exception as error:
logger.error("Exception: {}".format(error))
except psycopg2.OperationalError as error:
logger.error("Exception: {}".format(error.pgerror))
finally:
conn.close()
# Read in command-line parameters
def parse_args():
parser = argparse.ArgumentParser(description='Script arguments')
parser.add_argument('–msqt_topic', help='Mosquitto topic', default='paho/test')
parser.add_argument('–msqt_host', help='Mosquitto host', default='localhost')
parser.add_argument('–msqt_port', help='Mosquitto port', type=int, default=1883)
parser.add_argument('–ts_host', help='TimescaleDB host', default='localhost')
parser.add_argument('–ts_port', help='TimescaleDB port', type=int, default=5432)
parser.add_argument('–ts_username', help='TimescaleDB username', default='postgres')
parser.add_argument('–ts_password', help='TimescaleDB password', default='postgres1234')
parser.add_argument('–ts_database', help='TimescaleDB password', default='demo_iot')
return parser.parse_args()
if __name__ == "__main__":
main()

Similar to the IoT devices, the edge node’s script implements the Eclipse Paho MQTT Python client library. The script pulls MQTT messages off a Mosquitto topic(s), serializes the message payload to JSON, and writes the payload’s data to the TimescaleDB database. The edge node’s script accepts several arguments, which allow you to configure necessary Mosquitto and TimescaleDB variables.

Why not use Telegraf?

Telegraf is a plugin-driven agent that collects, processes, aggregates, and writes metrics. There is a Telegraf output plugin, the PostgreSQL and TimescaleDB Output Plugin for Telegraf, produced by TimescaleDB. The plugin could replace the need to manage and maintain the above script. However, I chose not to use it because it is not yet an official Telegraf plugin. If the plugin was included in a Telegraf release, I would certainly encourage its use.

Script Management

Both the Linux-based IoT devices and the edge node run systemd system and service manager. To ensure the Python scripts keep running in the case of a system restart, we define a systemd unit. Units are the objects that systemd knows how to manage. These are basically a standardized representation of system resources that can be managed by the suite of daemons and manipulated by the provided utilities. Each script has a systemd unit files. Below, we see the gtm_stack_mosquitto unit file, gtm_stack_mosquitto.service.

[Unit]
Description=GTM Stack – Mosquitto Script
After=network.target
[Service]
ExecStart=/usr/bin/python3 -u sensor_data_to_mosquitto.py \
–host ${MOSQUITTO_HOST} –port ${MOSQUITTO_PORT} –topic ${MOSQUITTO_TOPIC}
WorkingDirectory=/home/pi/iot-analytics-at-the-edge
StandardOutput=inherit
StandardError=inherit
Restart=always
User=pi
[Install]
WantedBy=multi-user.target

The gtm_stack_mosq_to_tmscl unit file, gtm_stack_mosq_to_tmscl.service, is nearly identical.

To install the gtm_stack_mosquitto.service systemd unit file on each IoT device, use the following commands.

SERVICE=gtm_stack_mosquitto
sudo cp ${SERVICE}.service /etc/systemd/system/
sudo systemctl start ${SERVICE}.service
sudo systemctl enable ${SERVICE}.service
# check status
systemctl status ${SERVICE}
view raw systemd.sh hosted with ❤ by GitHub

Installing the gtm_stack_mosq_to_tmscl.service unit file on the edge node is nearly identical.

Docker Stack

The edge node runs the GTM Docker stack, stack.yml, in a swarm. As discussed earlier, the stack contains four containers: Eclipse Mosquitto, TimescaleDB, and Grafana, along with pgAdmin. The Mosquitto, TimescaleDB, and Grafana containers have paths within the containers, bind-mounted to directories on the edge device. With bind-mounting, the container’s data will persist if the containers are removed and re-created. The containers are running on their own isolated overlay network.

version: "3.8"
services:
timescaledb:
image: timescale/timescaledb:1.7.4-pg12
ports:
"5432:5432/tcp"
networks:
demo-iot-net
environment:
POSTGRES_USERNAME: postgres
POSTGRES_PASSWORD: postgres1234
POSTGRES_DB: demo_iot
deploy:
restart_policy:
condition: on-failure
volumes:
$HOME/data/postgres:/var/lib/postgresql/data
grafana:
image: grafana/grafana:7.1.5
ports:
"3000:3000/tcp"
networks:
demo-iot-net
deploy:
restart_policy:
condition: on-failure
volumes:
$HOME/data/grafana:/var/lib/grafana
user: $ID
mosquitto:
image: eclipse-mosquitto:1.6.12
ports:
"1883:1883/tcp"
# – "9001:9001/tcp"
networks:
demo-iot-net
deploy:
restart_policy:
condition: on-failure
volumes:
$HOME/data/mosquitto:/mosquitto
pgadmin:
image: biarms/pgadmin4:4.21
ports:
"5050:5050/tcp"
networks:
demo-iot-net
deploy:
restart_policy:
condition: on-failure
networks:
demo-iot-net:
view raw gtm_stack.yml hosted with ❤ by GitHub

The GTM Docker stack is installed using the following commands on the edge node. We will assume Docker and git are pre-installed on the edge node for this post.

# on edge node
git clone https://github.com/garystafford/iot-analytics-at-the-edge.git
# build directories
mkdir -p ~/data/postgres
mkdir -p ~/data/grafana
mkdir -p ~/data/mosquitto/config
# move mosquitto config
cd iot-analytics-at-the-edge
cp mosquitto.conf ~/data/mosquitto/config/
# deploy stack
docker swarm init
docker stack deploy -c stack.yml iot
# check status of stack
docker stack ps iot –no-trunc
docker stack services iot
view raw gtm_stack.sh hosted with ❤ by GitHub

First, we create the proper local directories on the edge device, which will be used to bind-mount to the container’s directories. Below, we see the bind-mounted local directories with the eventual container’s contents stored within them.

Image for post
The bind-mounted local directories on the edge device from the stack

Next, we copy the custom Mosquitto configuration file, mosquitto.conf, included in the project, to the correct location on the edge device. Lastly, we initialize the Docker swarm and deploy the stack.

Image for post
Output of docker container ls command, showing the running GTM Stack containers
Image for post
Output of docker stats command, showing the resource consumption of GTM Stack containers

TimescaleDB Setup

With the GTM stack running, we need to create a single Timescale hypertablesensor_data, in the TimescaleDB demo_iot database, to hold the incoming IoT sensor data. Hypertables, according to TimescaleDB, are designed to be easy to manage and to behave like standard PostgreSQL tables. Hypertables are comprised of many interlinked “chunk” tables. Commands made to the hypertable automatically propagate changes down to all of the chunks belonging to that hypertable.

CREATE TABLE IF NOT EXISTS sensor_data (
time timestamptz NOT NULL,
device_id text NOT NULL,
temperature double PRECISION NOT NULL,
humidity double PRECISION NOT NULL,
lpg double PRECISION NOT NULL,
co double PRECISION NOT NULL,
smoke double PRECISION NOT NULL,
light boolean NOT NULL,
motion boolean NOT NULL
);
SELECT create_hypertable('sensor_data', 'time');
view raw sensor_data.sql hosted with ❤ by GitHub

I suggest using psql to execute the required DDL statements, which will create the hypertable, as well as the proceeding views and database user permissions. All SQL statements are included in the project’s statements.sql file. One way to use psql is to install it on your local workstation, then use psql to connect to the remote edge node. I prefer to instantiate a local PostgreSQL Docker container instance running psql. I then use the local container’s psql client to connect to the edge node’s TimescaleDB database. For example, from my local machine, I run the following docker run command to connect to the edge node’s TimescaleDB database, on the edge node, located locally at 192.168.1.12.

docker run -it –rm postgres psql \
-U postgres -h 192.168.1.12 -p 5432 -d demo_iot
view raw docker_run.sh hosted with ❤ by GitHub

Although not always practical, could also access psql from within the TimescaleDB Docker container, running on the actual edge node, using the following docker exec command.

TIMESCALEDB_CONTAINER=$(docker ps -q \
–filter='name=iot_timescaledb.1' –format '{{.Names}}')
docker exec -it ${TIMESCALEDB_CONTAINER} psql \
-U postgres -h localhost -d demo_iot
view raw access_psql.sh hosted with ❤ by GitHub

TimescaleDB Continuous Aggregates

For this post’s demonstration, we need to create four TimescaleDB database views, which will be queried from an eventual Grafana Dashboard. The views are TimescaleDB Continuous Aggregates. According to Timescale, aggregate queries that touch large swathes of time-series data can take a long time to compute because the system needs to scan large amounts of data on every query execution. TimescaleDB continuous aggregates automatically calculate the results of a query in the background and materialize the results.

For example, in this post, we generate sensor data every five seconds from the three IoT devices. When visualizing a 24-hour period in Grafana, using continuous aggregates with an interval of one minute, we would reduce the total volume of data queried from ~51,840 rows to ~4,320 rows, a reduction of over 91%. The larger the time period or the number of IoT devices being analyzed, the more significant these savings will positively impact query performance.

time_bucket on the time partitioning column of the hypertable is required in all continuous aggregate views. The time_bucket function, in this case, has a bucket width (interval) of 1 minute. The interval is configurable.

temperature and humidity
CREATE VIEW temperature_humidity_summary_minute WITH (timescaledb.continuous)
AS
SELECT device_id,
time_bucket(INTERVAL '1 minute', time) AS bucket,
AVG(temperature) AS avg_temp,
AVG(humidity) AS avg_humidity
FROM sensor_data
GROUP BY device_id,
bucket;
air quality (lpg, co, smoke)
CREATE VIEW air_quality_summary_minute WITH (timescaledb.continuous)
AS
SELECT device_id,
time_bucket(INTERVAL '1 minute', time) AS bucket,
AVG(lpg) AS avg_lpg,
MAX(co) AS avg_co,
MIN(smoke) AS avg_smoke
FROM sensor_data
GROUP BY device_id,
bucket;
light
CREATE VIEW light_summary_minute WITH (timescaledb.continuous)
AS
SELECT device_id,
time_bucket(INTERVAL '1 minute', time) AS bucket,
AVG(
case
when light = 't' then 1
else 0
end
) AS avg_light
FROM sensor_data
GROUP BY device_id,
bucket;
motion
CREATE VIEW motion_summary_minute WITH (timescaledb.continuous)
AS
SELECT device_id,
time_bucket(INTERVAL '1 minute', time) AS bucket,
AVG(
case
when motion = 't' then 1
else 0
end
) AS avg_motion
FROM sensor_data
GROUP BY device_id,
bucket;
view raw view.sql hosted with ❤ by GitHub

Limiting Grafana’s Access to IoT Data

Following the Grafana recommendation for database user permissions, we create a grafanareader PostgresSQL user, and limit the user’s access to the sensor_data table and the four views we created. Grafana will use this user’s credentials to perform SELECT queries of the TimescaleDB demo_iot database.

CREATE USER grafanareader WITH PASSWORD 'grafana1234';
GRANT USAGE ON SCHEMA public TO grafanareader;
GRANT SELECT ON public.sensor_data TO grafanareader;
GRANT SELECT ON public.temperature_humidity_summary_minute TO grafanareader;
GRANT SELECT ON public.air_quality_summary_minute TO grafanareader;
GRANT SELECT ON public.light_summary_minute TO grafanareader;
GRANT SELECT ON public.motion_summary_minute TO grafanareader;
view raw grafanareader.sql hosted with ❤ by GitHub

Grafana Dashboards

Using the TimescaleDB continuous aggregates we have created, we can quickly build a richly-featured dashboard in Grafana. Below we see a typical IoT Dashboard you might build to monitor the post’s IoT sensor data in near real-time. An exported version, dashboard_external_export.json, is included in the GitHub project.

Image for post
Example of Grafana dashboard showing the post’s IoT sensor data
Image for post
Example of Grafana IoT Demo Dashboard showing sensor data

Grafana’s documentation includes a comprehensive set of instructions for Using PostgreSQL in Grafana. To connect to the TimescaleDB database from Grafana, we use a PostgreSQL data source.

Image for post
Configuring the TimescaleDB database connection in Grafana

The data displayed in each Panel in the Grafana Dashboard is based on a SQL query. For example, the Average Temperature Panel might use a query similar to the example below. This particular query also converts Celcius to Fahrenheit. Note the use of Grafana Macros (e.g., $__time()$__timeFilter()). Macros can be used within a query to simplify syntax and allow for dynamic parts.

SELECT
$__time(bucket),
device_id AS metric,
((avg_temp * 1.9) + 32) AS avg_temp
FROM temperature_humidity_summary_minute
WHERE
$__timeFilter(bucket)
ORDER BY 1,2

Below, we see another example from the Average Humidity Panel. In this particular query, we might choose to validate the humidity data is within an acceptable range of 0%–100%.

SELECT
$__time(bucket),
device_id AS metric,
avg_humidity
FROM temperature_humidity_summary_minute
WHERE
$__timeFilter(bucket)
AND avg_humidity >= 0.0
AND avg_humidity <= 100.0
ORDER BY 1,2

Mobile Friendly

Grafana dashboards are mobile-friendly. Below we see two views of the dashboard, using the Chrome mobile browser on an Apple iPhone.

Image for post

Grafana Alerts

Grafana allows Alerts to be created based on Rules you define in each Panel. If data values match the Rule’s conditions, which you pre-define, such as a temperature reading above a certain threshold for a set amount of time, an alert is sent to your choice of destinations. According to the Rule shown below, If the average temperature exceeds 75°F for a period of 5 minutes, an alert is sent.

Image for post
High-temperature rule configuration

As demonstrated below, when the temperature in the laboratory began to exceed 75°F, the alert entered a ‘Pending’ state. If the temperature exceeded 75°F for the pre-determined period of 5 minutes, the alert status changed to ‘Alerting’, and an alert was sent. When the temperature dropped back below 75°F for the pre-determined period of 5 minutes, the alert status changed from ‘Alerting’ to ‘OK’, and a subsequent notification was sent.

Image for post
Average temperature graph showing the various alert status changes

There are currently 18 destinations available out-of-the-box with Grafana, including Slack, email, PagerDuty, webhooks, HipChat, and Microsoft Teams. We can use Grafana Alerts to notify the proper resources, in near real-time, if an issue is detected, based on the data. Below, we see an actual series of high-temperature alerts sent by Grafana to a Slack channel, followed by subsequent notifications as the temperature returned to normal.

Image for post
Grafana alert notifications in Slack channel

Ad-hoc Queries

The ability to perform ad-hoc queries on the time-series IoT data is an essential feature of the IoT edge analytics stack. We can use psql or pgAdmin to perform ad-hoc queries against the TimescaleDB database. Below are examples of typical ad-hoc queries we might perform on the IoT sensor data. These example queries demonstrate TimescaleDB’s advanced analytical capabilities for working with time-series data, including Moving Average, Delta, Time Bucket, and Histogram.

find max temperature (°C) and humidity (%) for last 3 hours in 15 minute time periods
https://docs.timescale.com/latest/using-timescaledb/reading-data#select
SELECT time_bucket('15 minutes', time) AS fifteen_min,
device_id,
COUNT(*),
MAX(temperature) AS max_temp,
MAX(humidity) AS max_hum
FROM sensor_data
WHERE time > NOW() INTERVAL '3 hours'
AND humidity BETWEEN 0 AND 100
GROUP BY fifteen_min, device_id
ORDER BY fifteen_min DESC, max_temp DESC;
find temperature (°C) anomalies (delta > ~5°F)
https://docs.timescale.com/latest/using-timescaledb/reading-data#delta
SELECT ht.time, ht.temperature, ht.delta
FROM (
SELECT time,
temperature,
abs(temperature LAG(temperature) OVER (ORDER BY time)) AS delta
FROM sensor_data) AS ht
WHERE ht.delta > 2.63
ORDER BY ht.time;
find three minute moving average of temperature (°F) for last day
(5 sec. interval * 36 rows = 3 min.)
https://docs.timescale.com/latest/using-timescaledb/reading-data#moving-average
SELECT time,
AVG((temperature * 1.9) + 32) OVER (ORDER BY time
ROWS BETWEEN 35 PRECEDING AND CURRENT ROW)
AS smooth_temp
FROM sensor_data
WHERE device_id = 'Manufacturing Plant'
AND time > NOW() INTERVAL '1 day'
ORDER BY time DESC;
find average humidity (%) for last 12 hours in 5-minute time periods
https://docs.timescale.com/latest/using-timescaledb/reading-data#time-bucket
SELECT time_bucket('5 minutes', time) AS time_period,
AVG(humidity) AS avg_humidity
FROM sensor_data
WHERE device_id = 'Main Warehouse'
AND humidity BETWEEN 0 AND 100
AND time > NOW() INTERVAL '12 hours'
GROUP BY time_period
ORDER BY time_period DESC;
calculate histograms of avg. temperature (°F) between 55-85°F in 5°F buckets during last 2 days
https://docs.timescale.com/latest/using-timescaledb/reading-data#histogram
SELECT device_id,
COUNT(*),
histogram((temperature * 1.9) + 32, 55.0, 85.0, 5)
FROM sensor_data
WHERE temperature is not Null
AND time > NOW() INTERVAL '2 days'
GROUP BY device_id;
find average light value for last 90 minutes in 5-minute time periods
https://docs.timescale.com/latest/using-timescaledb/reading-data#time-bucket
SELECT device_id,
time_bucket('5 minutes', time) AS five_min,
AVG(case when light = 't' then 1 else 0 end) AS avg_light
FROM sensor_data
WHERE device_id = 'Manufacturing Plant'
AND time > NOW() INTERVAL '90 minutes'
GROUP BY device_id, five_min
ORDER BY five_min DESC;
view raw ad_hoc_timescale.sql hosted with ❤ by GitHub

Conclusion

In this post, we have explored the development of an IoT edge analytics stack, comprised of lightweight, purpose-built, easily deployable and manageable, platform- and programming language-agnostic, open-source software components. These components included Docker containerized versions of Eclipse Mosquitto, TimescaleDB, Grafana, and pgAdmin, referred to as the GTM Stack. Using the GTM stack, we collected, analyzed, and visualized IoT data, without first shipping the data to Cloud or other external systems.


This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.

, , , , , , ,

1 Comment

Java Development with Microsoft SQL Server: Calling Microsoft SQL Server Stored Procedures from Java Applications Using JDBC

Introduction

Enterprise software solutions often combine multiple technology platforms. Accessing an Oracle database via a Microsoft .NET application and vice-versa, accessing Microsoft SQL Server from a Java-based application is common. In this post, we will explore the use of the JDBC (Java Database Connectivity) API to call stored procedures from a Microsoft SQL Server 2017 database and return data to a Java 11-based console application.

View of the post’s Java project from JetBrains’ IntelliJ IDE

The objectives of this post include:

  • Demonstrate the differences between using static SQL statements and stored procedures to return data.
  • Demonstrate three types of JDBC statements to return data: Statement, PreparedStatement, and CallableStatement.
  • Demonstrate how to call stored procedures with input and output parameters.
  • Demonstrate how to return single values and a result set from a database using stored procedures.

Why Stored Procedures?

To access data, many enterprise software organizations require their developers to call stored procedures within their code as opposed to executing static T-SQL (Transact-SQL) statements against the database. There are several reasons stored procedures are preferred:

  • Optimization: Stored procedures are often written by DBAs or database developers who specialize in database development. They understand the best way to construct queries for optimal performance and minimal load on the database server. Think of it as a developer using an API to interact with the database.
  • Safety and Security: Stored procedures are considered safer and more secure than static SQL statements. The stored procedure provides tight control over the content of the queries, preventing malicious or unintentionally destructive code from being executed against the database.
  • Error Handling: Stored procedures can contain logic for handling errors before they bubble up to the application layer and possibly to the end-user.

AdventureWorks 2017 Database

For brevity, I will use an existing and well-known Microsoft SQL Server database, AdventureWorks. The AdventureWorks database was originally published by Microsoft for SQL Server 2008. Although a bit dated architecturally, the database comes prepopulated with plenty of data for demonstration purposes.

The HumanResources schema, one of five schemas within the AdventureWorks database

For the demonstration, I have created an Amazon RDS for SQL Server 2017 Express Edition instance on AWS. You have several options for deploying SQL Server, including AWS, Microsoft Azure, Google Cloud, or installed on your local workstation.

There are many methods to deploy the AdventureWorks database to Microsoft SQL Server. For this post’s demonstration, I used the AdventureWorks2017.bak backup file, which I copied to Amazon S3. Then, I enabled and configured the native backup and restore feature of Amazon RDS for SQL Server to import and install the backup.

DROP DATABASE IF EXISTS AdventureWorks;
GO

EXECUTE msdb.dbo.rds_restore_database
@restore_db_name='AdventureWorks',
@s3_arn_to_restore_from='arn:aws:s3:::my-bucket/AdventureWorks2017.bak',
@type='FULL',
@with_norecovery=0;

-- get task_id from output (e.g. 1)

EXECUTE msdb.dbo.rds_task_status
@db_name='AdventureWorks',
@task_id=1;

Install Stored Procedures

For the demonstration, I have added four stored procedures to the AdventureWorks database to use in this post. To follow along, you will need to install these stored procedures, which are included in the GitHub project.

View of the new stored procedures from JetBrains’ IntelliJ IDE Database tab

Data Sources, Connections, and Properties

Using the latest Microsoft JDBC Driver 8.4 for SQL Server (ver. 8.4.1.jre11), we create a SQL Server data source, com.microsoft.sqlserver.jdbc.SQLServerDataSource, and database connection, java.sql.Connection. There are several patterns for creating and working with JDBC data sources and connections. This post does not necessarily focus on the best practices for creating or using either. In this example, the application instantiates a connection class, SqlConnection.java, which in turn instantiates the java.sql.Connection and com.microsoft.sqlserver.jdbc.SQLServerDataSource objects. The data source’s properties are supplied from an instance of a singleton class, ProjectProperties.java. This properties class instantiates the java.util.Properties class, which reads values from a configuration properties file, config.properties. On startup, the application creates the database connection, calls each of the example methods, and then closes the connection.

Examples

For each example, I will show the stored procedure, if applicable, followed by the Java method that calls the procedure or executes the static SQL statement. I have left out the data source and connection code in the article. Again, a complete copy of all the code for this article is available on GitHub, including Java source code, SQL statements, helper SQL scripts, and a set of basic JUnit tests.

To run the JUnit unit tests, using Gradle, which the project is based on, use the ./gradlew cleanTest test --warning-mode none command.

A successful run of the JUnit tests

To build and run the application, using Gradle, which the project is based on, use the ./gradlew run --warning-mode none command.

The output of the Java console application

Example 1: SQL Statement

Before jumping into stored procedures, we will start with a simple static SQL statement. This example’s method, getAverageProductWeightST, uses the java.sql.Statement class. According to Oracle’s JDBC documentation, the Statement object is used for executing a static SQL statement and returning the results it produces. This SQL statement calculates the average weight of all products in the AdventureWorks database. It returns a solitary double numeric value. This example demonstrates one of the simplest methods for returning data from SQL Server.

/**
* Statement example, no parameters, returns Integer
*
*
@return Average weight of all products
*/
public double getAverageProductWeightST() {
double averageWeight = 0;
Statement stmt = null;
ResultSet rs = null;
try {
stmt = connection.getConnection().createStatement();
String sql = "WITH Weights_CTE(AverageWeight) AS" +
"(" +
" SELECT [Weight] AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'LB'" +
" UNION" +
" SELECT [Weight] * 0.00220462262185 AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'G')" +
"SELECT ROUND(AVG([AverageWeight]), 2)" +
"FROM [Weights_CTE];";
rs = stmt.executeQuery(sql);
if (rs.next()) {
averageWeight = rs.getDouble(1);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (stmt != null) {
try {
stmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 2: Prepared Statement

Next, we will execute almost the same static SQL statement as in Example 1. The only change is the addition of the column name, averageWeight. This allows us to parse the results by column name, making the code easier to understand as opposed to using the numeric index of the column as in Example 1.

Also, instead of using the java.sql.Statement class, we use the java.sql.PreparedStatement class. According to Oracle’s documentation, a SQL statement is precompiled and stored in a PreparedStatement object. This object can then be used to execute this statement multiple times efficiently.

/**
* PreparedStatement example, no parameters, returns Integer
*
*
@return Average weight of all products
*/
public double getAverageProductWeightPS() {
double averageWeight = 0;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
String sql = "WITH Weights_CTE(averageWeight) AS" +
"(" +
" SELECT [Weight] AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'LB'" +
" UNION" +
" SELECT [Weight] * 0.00220462262185 AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'G')" +
"SELECT ROUND(AVG([AverageWeight]), 2) AS [averageWeight]" +
"FROM [Weights_CTE];";
pstmt = connection.getConnection().prepareStatement(sql);
rs = pstmt.executeQuery();
if (rs.next()) {
averageWeight = rs.getDouble("averageWeight");
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (pstmt != null) {
try {
pstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 3: Callable Statement

In this example, the average product weight query has been moved into a stored procedure. The procedure is identical in functionality to the static statement in the first two examples. To call the stored procedure, we use the java.sql.CallableStatement class. According to Oracle’s documentation, the CallableStatement extends PreparedStatement. It is the interface used to execute SQL stored procedures. The CallableStatement accepts both input and output parameters; however, this simple example does not use either. Like the previous two examples, the procedure returns a double numeric value.

CREATE OR
ALTER PROCEDURE [Production].[uspGetAverageProductWeight]
AS
BEGIN
SET NOCOUNT ON;
WITH
Weights_CTE(AverageWeight)
AS
(
SELECT [Weight] AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'LB'
UNION
SELECT [Weight] * 0.00220462262185 AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'G'
)
SELECT ROUND(AVG([AverageWeight]), 2)
FROM [Weights_CTE];
END
GO

The calling Java method is shown below.

/**
* CallableStatement, no parameters, returns Integer
*
*
@return Average weight of all products
*/
public double getAverageProductWeightCS() {
CallableStatement cstmt = null;
double averageWeight = 0;
ResultSet rs = null;
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetAverageProductWeight]}");
cstmt.execute();
rs = cstmt.getResultSet();
if (rs.next()) {
averageWeight = rs.getDouble(1);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 4: Calling a Stored Procedure with an Output Parameter

In this example, we use almost the same stored procedure as in Example 3. The only difference is the inclusion of an output parameter. This time, instead of returning a result set with a value in a single unnamed column, the column has a name, averageWeight. We can now call that column by name when retrieving the value.

The stored procedure patterns found in Examples 3 and 4 are both commonly used. One procedure uses an output parameter, and one not, both return the same value(s). You can use the CallableStatement to for either type.

CREATE OR
ALTER PROCEDURE [Production].[uspGetAverageProductWeightOUT]@averageWeight DECIMAL(8, 2) OUT
AS
BEGIN
SET NOCOUNT ON;
WITH
Weights_CTE(AverageWeight)
AS
(
SELECT [Weight] AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'LB'
UNION
SELECT [Weight] * 0.00220462262185 AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'G'
)
SELECT @averageWeight = ROUND(AVG([AverageWeight]), 2)
FROM [Weights_CTE];
END
GO

The calling Java method is shown below.

/**
* CallableStatement example, (1) output parameter, returns Integer
*
*
@return Average weight of all products
*/
public double getAverageProductWeightOutCS() {
CallableStatement cstmt = null;
double averageWeight = 0;
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetAverageProductWeightOUT](?)}");
cstmt.registerOutParameter("averageWeight", Types.DECIMAL);
cstmt.execute();
averageWeight = cstmt.getDouble("averageWeight");
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 5: Calling a Stored Procedure with an Input Parameter

In this example, the procedure returns a result set, java.sql.ResultSet, of employees whose last name starts with a particular sequence of characters (e.g., ‘M’ or ‘Sa’). The sequence of characters is passed as an input parameter, lastNameStartsWith, to the stored procedure using the CallableStatement.

The method making the call iterates through the rows of the result set returned by the stored procedure, concatenating multiple columns to form the employee’s full name as a string. Each full name string is then added to an ordered collection of strings, a List<String> object. The List instance is returned by the method. You will notice this procedure takes a little longer to run because of the use of the LIKE operator. The database server has to perform pattern matching on each last name value in the table to determine the result set.

CREATE OR
ALTER PROCEDURE [HumanResources].[uspGetEmployeesByLastName]
@lastNameStartsWith VARCHAR(20) = 'A'
AS
BEGIN
SET NOCOUNT ON;
SELECT p.[FirstName], p.[MiddleName], p.[LastName], p.[Suffix], e.[JobTitle], m.[EmailAddress]
FROM [HumanResources].[Employee] AS e
LEFT JOIN [Person].[Person] p ON e.[BusinessEntityID] = p.[BusinessEntityID]
LEFT JOIN [Person].[EmailAddress] m ON e.[BusinessEntityID] = m.[BusinessEntityID]
WHERE e.[CurrentFlag] = 1
AND p.[PersonType] = 'EM'
AND p.[LastName] LIKE @lastNameStartsWith + '%'
ORDER BY p.[LastName], p.[FirstName], p.[MiddleName]
END
GO

The calling Java method is shown below.

/**
* CallableStatement example, (1) input parameter, returns ResultSet
*
*
@param lastNameStartsWith
*
@return List of employee names
*/
public List<String> getEmployeesByLastNameCS(String lastNameStartsWith) {
CallableStatement cstmt = null;
ResultSet rs = null;
List<String> employeeFullName = new ArrayList<>();
try {
cstmt = connection.getConnection().prepareCall(
"{call [HumanResources].[uspGetEmployeesByLastName](?)}",
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
cstmt.setString("lastNameStartsWith", lastNameStartsWith);
boolean results = cstmt.execute();
int rowsAffected = 0;
// Protects against lack of SET NOCOUNT in stored procedure
while (results || rowsAffected != -1) {
if (results) {
rs = cstmt.getResultSet();
break;
} else {
rowsAffected = cstmt.getUpdateCount();
}
results = cstmt.getMoreResults();
}
while (rs.next()) {
employeeFullName.add(
rs.getString("LastName") + ", "
+ rs.getString("FirstName") + " "
+ rs.getString("MiddleName"));
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return employeeFullName;
}

Example 6: Converting a Result Set to Ordered Collection of Objects

In this last example, we pass two input parameters, productColor and productSize, to a slightly more complex stored procedure. The stored procedure returns a result set containing several columns of product information. This time, the example’s method iterates through the result set returned by the procedure and constructs an ordered collection of products, List<Product> object. The Product objects in the list are instances of the Product.java POJO class. The method converts each results set’s row’s field value into a Product property (e.g., Product.Size, Product.Model). Using a collection is a common method for persisting data from a result set in an application.

CREATE OR
ALTER PROCEDURE [Production].[uspGetProductsByColorAndSize]
@productColor VARCHAR(20),
@productSize INTEGER
AS
BEGIN
SET NOCOUNT ON;
SELECT p.[ProductNumber], m.[Name] AS [Model], p.[Name] AS [Product], p.[Color], p.[Size]
FROM [Production].[ProductModel] AS m
INNER JOIN
[Production].[Product] AS p ON m.[ProductModelID] = p.[ProductModelID]
WHERE (p.[Color] = @productColor)
AND (p.[Size] = @productSize)
ORDER BY p.[ProductNumber], [Model], [Product]
END
GO

The calling Java method is shown below.

/**
* CallableStatement example, (2) input parameters, returns ResultSet
*
*
@param color
*
@param size
*
@return List of Product objects
*/
public List<Product> getProductsByColorAndSizeCS(String color, String size) {
CallableStatement cstmt = null;
ResultSet rs = null;
List<Product> productList = new ArrayList<>();
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetProductsByColorAndSize](?, ?)}",
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
cstmt.setString("productColor", color);
cstmt.setString("productSize", size);
boolean results = cstmt.execute();
int rowsAffected = 0;
// Protects against lack of SET NOCOUNT in stored procedure
while (results || rowsAffected != -1) {
if (results) {
rs = cstmt.getResultSet();
break;
} else {
rowsAffected = cstmt.getUpdateCount();
}
results = cstmt.getMoreResults();
}
while (rs.next()) {
Product product = new Product(
rs.getString("Product"),
rs.getString("ProductNumber"),
rs.getString("Color"),
rs.getString("Size"),
rs.getString("Model"));
productList.add(product);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return productList;
}

Proper T-SQL: Schema Reference and Brackets

You will notice in all T-SQL statements, I refer to the schema as well as the table or stored procedure name (e.g., {call [Production].[uspGetAverageProductWeightOUT](?)}). According to Microsoft, it is always good practice to refer to database objects by a schema name and the object name, separated by a period; that even includes the default schema (e.g., dbo).

You will also notice I wrap the schema and object names in square brackets (e.g., SELECT [ProductNumber] FROM [Production].[ProductModel]). The square brackets are to indicate that the name represents an object and not a reserved word (e.g, CURRENT or NATIONAL). By default, SQL Server adds these to make sure the scripts it generates run correctly.

Running the Examples

The application will display the name of the method being called, a description, the duration of time it took to retrieve the data, and the results returned by the method.

package com.article.examples;
import java.util.List;
/**
* Main class that calls all example methods
*
* @author Gary A. Stafford
*/
public class RunExamples {
private static final Examples examples = new Examples();
private static final ProcessTimer timer = new ProcessTimer();
/**
* @param args the command line arguments
* @throws Exception
*/
public static void main(String[] args) throws Exception {
System.out.println();
System.out.println("SQL SERVER STATEMENT EXAMPLES");
System.out.println("======================================");
// Statement example, no parameters, returns Integer
timer.setStartTime(System.nanoTime());
double averageWeight = examples.getAverageProductWeightST();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightST");
System.out.println("Description: Statement, no parameters, returns Integer");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.printf("Results: Average product weight (lb): %s%n", averageWeight);
System.out.println("");
// PreparedStatement example, no parameters, returns Integer
timer.setStartTime(System.nanoTime());
averageWeight = examples.getAverageProductWeightPS();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightPS");
System.out.println("Description: PreparedStatement, no parameters, returns Integer");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.printf("Results: Average product weight (lb): %s%n", averageWeight);
System.out.println("");
// CallableStatement, no parameters, returns Integer
timer.setStartTime(System.nanoTime());
averageWeight = examples.getAverageProductWeightCS();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightCS");
System.out.println("Description: CallableStatement, no parameters, returns Integer");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.println("");
// CallableStatement example, (1) output parameter, returns Integer
timer.setStartTime(System.nanoTime());
averageWeight = examples.getAverageProductWeightOutCS();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightOutCS");
System.out.println("Description: CallableStatement, (1) output parameter, returns Integer");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.printf("Results: Average product weight (lb): %s%n", averageWeight);
System.out.println("");
// CallableStatement example, (1) input parameter, returns ResultSet
timer.setStartTime(System.nanoTime());
String lastNameStartsWith = "Sa";
List<String> employeeFullName =
examples.getEmployeesByLastNameCS(lastNameStartsWith);
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetEmployeesByLastNameCS");
System.out.println("Description: CallableStatement, (1) input parameter, returns ResultSet");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.printf("Results: Last names starting with '%s': %d%n", lastNameStartsWith, employeeFullName.size());
if (employeeFullName.size() > 0) {
System.out.printf(" Last employee found: %s%n", employeeFullName.get(employeeFullName.size() 1));
} else {
System.out.printf("No employees found with last name starting with '%s'%n", lastNameStartsWith);
}
System.out.println("");
// CallableStatement example, (2) input parameters, returns ResultSet
timer.setStartTime(System.nanoTime());
String color = "Red";
String size = "44";
List<Product> productList =
examples.getProductsByColorAndSizeCS(color, size);
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetProductsByColorAndSizeCS");
System.out.println("Description: CallableStatement, (2) input parameter, returns ResultSet");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
if (productList.size() > 0) {
System.out.printf("Results: Products found (color: '%s', size: '%s'): %d%n", color, size, productList.size());
System.out.printf(" First product: %s (%s)%n", productList.get(0).getProduct(), productList.get(0).getProductNumber());
} else {
System.out.printf("No products found with color '%s' and size '%s'%n", color, size);
}
System.out.println("");
examples.closeConnection();
}
}
view raw RunExamples.java hosted with ❤ by GitHub

Below, we see the results.

SQL Statement Performance

This post is certainly not about SQL performance, demonstrated by the fact I am only using Amazon RDS for SQL Server 2017 Express Edition on a single, very underpowered db.t2.micro Amazon RDS instance types. However, I have added a timer feature, ProcessTimer.java class, to capture the duration of time each example takes to return data, measured in milliseconds. The ProcessTimer.java class is part of the project code. Using the timer, you should observe significant differences between the first run and proceeding runs of the application for several of the called methods. The time difference is a result of several factors, primarily pre-compilation of the SQL statements and SQL Server plan caching.

The effects of these two factors are easily demonstrated by clearing the SQL Server plan cache (see SQL script below) using DBCC (Database Console Commands) statements. and then running the application twice in a row. The second time, pre-compilation and plan caching should result in significantly faster times for the prepared statements and callable statements, in Examples 2–6. In the two random runs shown below, we see up to a 497% improvement in query time.

USE AdventureWorks;
DBCC FREESYSTEMCACHE('SQL Plans');
GO
CHECKPOINT;
GO
-- Impossible to run with Amazon RDS for Microsoft SQL Server on AWS
-- DBCC DROPCLEANBUFFERS;
-- GO

The first run results are shown below.

SQL SERVER STATEMENT EXAMPLES
======================================
Method: GetAverageProductWeightST
Description: Statement, no parameters, returns Integer
Duration (ms): 122
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightPS
Description: PreparedStatement, no parameters, returns Integer
Duration (ms): 146
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightCS
Description: CallableStatement, no parameters, returns Integer
Duration (ms): 72
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightOutCS
Description: CallableStatement, (1) output parameter, returns Integer
Duration (ms): 623
Results: Average product weight (lb): 12.43
---
Method: GetEmployeesByLastNameCS
Description: CallableStatement, (1) input parameter, returns ResultSet
Duration (ms): 830
Results: Last names starting with 'Sa': 7
Last employee found: Sandberg, Mikael Q
---
Method: GetProductsByColorAndSizeCS
Description: CallableStatement, (2) input parameter, returns ResultSet
Duration (ms): 427
Results: Products found (color: 'Red', size: '44'): 7
First product: Road-650 Red, 44 (BK-R50R-44)
---

The second run results are shown below.

SQL SERVER STATEMENT EXAMPLES
======================================
Method: GetAverageProductWeightST
Description: Statement, no parameters, returns Integer
Duration (ms): 116
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightPS
Description: PreparedStatement, no parameters, returns Integer
Duration (ms): 89
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightCS
Description: CallableStatement, no parameters, returns Integer
Duration (ms): 80
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightOutCS
Description: CallableStatement, (1) output parameter, returns Integer
Duration (ms): 340
Results: Average product weight (lb): 12.43
---
Method: GetEmployeesByLastNameCS
Description: CallableStatement, (1) input parameter, returns ResultSet
Duration (ms): 139
Results: Last names starting with 'Sa': 7
Last employee found: Sandberg, Mikael Q
---
Method: GetProductsByColorAndSizeCS
Description: CallableStatement, (2) input parameter, returns ResultSet
Duration (ms): 208
Results: Products found (color: 'Red', size: '44'): 7
First product: Road-650 Red, 44 (BK-R50R-44)
---

Conclusion

This post has demonstrated several methods for querying and calling stored procedures from a SQL Server 2017 database using JDBC with the Microsoft JDBC Driver 8.4 for SQL Server. Although the examples are quite simple, the same patterns can be used with more complex stored procedures, with multiple input and output parameters, which not only select, but insert, update, and delete data.

There are some limitations of the Microsoft JDBC Driver for SQL Server you should be aware of by reading the documentation. However, for most tasks that require database interaction, the Driver provides adequate functionality with SQL Server.


This blog represents my own viewpoints and not of my employer, Amazon Web Services.

, , , ,

Leave a comment

Getting Started with Presto Federated Queries using Ahana’s PrestoDB Sandbox on AWS

Introduction

According to The Presto Foundation, Presto (aka PrestoDB), not to be confused with PrestoSQL, is an open-source, distributed, ANSI SQL compliant query engine. Presto is designed to run interactive ad-hoc analytic queries against data sources of all sizes ranging from gigabytes to petabytes. Presto is used in production at an immense scale by many well-known organizations, including Facebook, Twitter, Uber, Alibaba, Airbnb, Netflix, Pinterest, Atlassian, Nasdaq, and more.

In the following post, we will gain a better understanding of Presto’s ability to execute federated queries, which join multiple disparate data sources without having to move the data. Additionally, we will explore Apache Hive, the Hive Metastore, Hive partitioned tables, and the Apache Parquet file format.

Presto on AWS

There are several options for Presto on AWS. AWS recommends Amazon EMR and Amazon Athena. Presto comes pre-installed on EMR 5.0.0 and later. The Athena query engine is a derivation of Presto 0.172 and does not support all of Presto’s native features. However, Athena has many comparable features and deep integrations with other AWS services. If you need full, fine-grain control, you could deploy and manage Presto, yourself, on Amazon EC2, Amazon ECS, or Amazon EKS. Lastly, you may decide to purchase a Presto distribution with commercial support from an AWS Partner, such as Ahana or Starburst. If your organization needs 24x7x365 production-grade support from experienced Presto engineers, this is an excellent choice.

Federated Queries

In a modern Enterprise, it is rare to find all data living in a monolithic datastore. Given the multitude of available data sources, internal and external to an organization, and the growing number of purpose-built databases, analytics engines must be able to join and aggregate data across many sources efficiently. AWS defines a federated query as a capability that ‘enables data analysts, engineers, and data scientists to execute SQL queries across data stored in relational, non-relational, object, and custom data sources.

Presto allows querying data where it lives, including Apache Hive, Thrift, Kafka, Kudu, and Cassandra, Elasticsearch, and MongoDB. In fact, there are currently 24 different Presto data source connectors available. With Presto, we can write queries that join multiple disparate data sources, without moving the data. Below is a simple example of a Presto federated query statement that correlates a customer’s credit rating with their age and gender. The query federates two different data sources, a PostgreSQL database table, postgresql.public.customer, and an Apache Hive Metastore table, hive.default.customer_demographics, whose underlying data resides in Amazon S3.

WITH credit_demographics AS (
SELECT
(year (now()) c_birth_year) AS age,
cd_credit_rating AS credit_rating,
cd_gender AS gender,
count(cd_gender) AS gender_count
FROM
postgresql.public.customer
LEFT JOIN hive.default.customer_demographics ON c_current_cdemo_sk = cd_demo_sk
WHERE
c_birth_year IS NOT NULL
AND cd_credit_rating IS NOT NULL
AND lower(cd_credit_rating) != 'unknown'
AND cd_gender IS NOT NULL
GROUP BY
cd_credit_rating,
c_birth_year,
cd_gender
)
SELECT
age,
credit_rating,
gender,
gender_count
FROM
credit_demographics
WHERE
age BETWEEN 21 AND 65
ORDER BY
age,
credit_rating,
gender;

Ahana

The Linux Foundation’s Presto Foundation member, Ahana, was founded as the first company focused on bringing PrestoDB-based ad hoc analytics offerings to market and working to foster growth and evangelize the Presto community. Ahana’s mission is to simplify ad hoc analytics for organizations of all shapes and sizes. Ahana has been successful in raising seed funding, led by GV (formerly Google Ventures). Ahana’s founders have a wealth of previous experience in tech companies, including Alluxio, Kinetica, Couchbase, IBM, Apple, Splunk, and Teradata.

PrestoDB Sandbox

This post will use Ahana’s PrestoDB Sandbox, an Amazon Linux 2, AMI-based solution available on AWS Marketplace, to execute Presto federated queries.

Ahana’s PrestoDB Sandbox AMI allows you to easily get started with Presto to query data wherever your data resides. This AMI configures a single EC2 instance Sandbox to be both the Presto Coordinator and a Presto Worker. It comes with an Apache Hive Metastore backed by PostgreSQL bundled in. In addition, the following catalogs are bundled in to try, test, and prototype with Presto:

  • JMX: useful for monitoring and debugging Presto
  • Memory: stores data and metadata in RAM, which is discarded when Presto restarts
  • TPC-DS: provides a set of schemas to support the TPC Benchmark DS
  • TPC-H: provides a set of schemas to support the TPC Benchmark H

Apache Hive

In this demonstration, we will use Apache Hive and an Apache Hive Metastore backed by PostgreSQL. Apache Hive is data warehouse software that facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. The structure can be projected onto data already