Posts Tagged DAG

Running Spark Jobs on Amazon EMR with Apache Airflow: Using the new Amazon Managed Workflows for Apache Airflow (Amazon MWAA) Service on AWS

Introduction

In the first post of this series, we explored several ways to run PySpark applications on Amazon EMR using AWS services, including AWS CloudFormation, AWS Step Functions, and the AWS SDK for Python. This second post in the series will examine running Spark jobs on Amazon EMR using the recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA) service.

Amazon EMR

According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a Cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache SparkHiveHBaseFlinkHudi, and ZeppelinJupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Amazon EMR Console’s Cluster Summary tab

Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including the use of EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.

AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.

Amazon MWAA

Apache Airflow is a popular open-source platform designed to schedule and monitor workflows. According to Wikipedia, Airflow was created at Airbnb in 2014 to manage the company’s increasingly complex workflows. From the beginning, the project was made open source, becoming an Apache Incubator project in 2016 and a top-level Apache Software Foundation project (TLP) in 2019.

Many organizations build, manage, and maintain Apache Airflow on AWS using compute services such as Amazon EC2 or Amazon EKS. Amazon recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA). With the announcement of Amazon MWAA in November 2020, AWS customers can now focus on developing workflow automation, while leaving the management of Airflow to AWS. Amazon MWAA can be used as an alternative to AWS Step Functions for workflow automation on AWS.

Apache Airflow’s UI

Apache recently announced the release of Airflow 2.0.0 on December 17, 2020. The latest 1.x version of Airflow is 1.10.14, released December 12, 2020. However, at the time of this post, Amazon MWAA was running Airflow 1.10.12, released August 25, 2020. Ensure that when you are developing workflows for Amazon MWAA, you are using the correct Apache Airflow 1.10.12 documentation.

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI.

Airflow has a mechanism that allows you to expand its functionality and integrate with other systems. Given its integration capabilities, Airflow has extensive support for AWS, including Amazon EMR, Amazon S3, AWS Batch, Amazon RedShift, Amazon DynamoDB, AWS Lambda, Amazon Kinesis, and Amazon SageMaker. Outside of support for Amazon S3, most AWS integrations can be found in the HooksSecretsSensors, and Operators of Airflow codebase’s contrib section.

Getting Started

Source Code

Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/aws-airflow-demo.git

Preliminary Steps

This post assumes the reader has completed the demonstration in the previous post, Running PySpark Applications on Amazon EMR Methods for Interacting with PySpark on Amazon Elastic MapReduce. This post will re-use many of the last post’s AWS resources, including the EMR VPC, Subnets, Security Groups, AWS Glue Data Catalog, Amazon S3 buckets, EMR Roles, EC2 key pair, AWS Systems Manager Parameter Store parameters, PySpark applications, and Kaggle datasets.

Configuring Amazon MWAA

The easiest way to create a new MWAA Environment is through the AWS Management Console. I strongly suggest that you review the pricing for Amazon MWAA before continuing. The service can be quite costly to operate, even when idle, with the smallest Environment class potentially running into the hundreds of dollars per month.

Amazon MWAA Environment Creation Process

Using the Console, create a new Amazon MWAA Environment. The Amazon MWAA interface will walk you through the creation process. Note the current ‘Airflow version’, 1.10.12.

Amazon MWAA Environment Creation Process

Amazon MWAA requires an Amazon S3 bucket to store Airflow assets. Create a new Amazon S3 bucket. According to the documentation, the bucket must start with the prefix airflow-. You must also enable Bucket Versioning on the bucket. Specify a dags folder within the bucket to store Airflow’s Directed Acyclic Graphs (DAG). You can leave the next two options blank since we have no additional Airflow plugins or additional Python packages to install.

Amazon MWAA Environment Creation Process

With Amazon MWAA, your data is secure by default as workloads run within their own Amazon Virtual Private Cloud (Amazon VPC). As part of the MWAA Environment creation process, you are given the option to have AWS create an MWAA VPC CloudFormation stack.

Amazon MWAA Environment Creation Process

For this demonstration, choose to have MWAA create a new VPC and associated networking resources.

AWS CloudFormation Create Stack Console

The MWAA CloudFormation stack contains approximately 22 AWS resources, including a VPC, a pair of public and private subnets, route tables, an Internet Gateway, two NAT Gateways, and associated Elastic IPs (EIP). See the MWAA documentation for more details.

AWS CloudFormation Create Stack Console
Amazon MWAA Environment Creation Process

As part of the Amazon MWAA Networking configuration, you must decide if you want web access to Airflow to be public or private. The details of the network configuration can be found in the MWAA documentation. I am choosing public webserver access for this demonstration, but the recommended choice is private for greater security. With the public option, AWS still requires IAM authentication to sign in to the AWS Management Console in order to access the Airflow UI.

You must select an existing VPC Security Group or have MWAA create a new one. For this demonstration, choose to have MWAA create a Security Group for you.

Lastly, select an appropriately-sized Environment class for Airflow based on the scale of your needs. The mw1.small class will be sufficient for this demonstration.

Amazon MWAA Environment Creation Process

Finally, for Permissions, you must select an existing Airflow execution service role or create a new role. For this demonstration, create a new Airflow service role. We will later add additional permissions.

Amazon MWAA Environment Creation Process

Airflow Execution Role

As part of this demonstration, we will be using Airflow to run Spark jobs on EMR (EMR Steps). To allow Airflow to interact with EMR, we must increase the new Airflow execution role’s default permissions. Additional permissions include allowing the new Airflow role to assume the EMR roles using iam:PassRole. For this demonstration, we will include the two default EMR Service and JobFlow roles, EMR_DefaultRole and EMR_EC2_DefaultRole. We will also include the corresponding custom EMR roles created in the previous post, EMR_DemoRole and EMR_EC2_DemoRole. For this demonstration, the Airflow service role also requires three specific EMR permissions as shown below. Later in the post, Airflow will also read files from S3, which requires s3:GetObject permission.

Create a new policy by importing the project’s JSON file, iam_policy/airflow_emr_policy.json, and attach the new policy to the Airflow service role. Be sure to update the AWS Account ID in the file with your own Account ID.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"elasticmapreduce:DescribeStep",
"elasticmapreduce:AddJobFlowSteps",
"elasticmapreduce:RunJobFlow"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": [
"arn:aws:iam::123412341234:role/EMR_DemoRole",
"arn:aws:iam::123412341234:role/EMR_EC2_DemoRole",
"arn:aws:iam::123412341234:role/EMR_EC2_DefaultRole",
"arn:aws:iam::123412341234:role/EMR_DefaultRole"
]
}
]
}

The Airflow service role, created by MWAA, is shown below with the new policy attached.

Airflow Execution Service Role with the new Policy Attached

Final Architecture

Below is the final high-level architecture for the post’s demonstration. The diagram shows the approximate route of a DAG Run request, in red. The diagram includes an optional S3 Gateway VPC endpoint, not detailed in the post, but recommended for additional security. According to AWS, a VPC endpoint enables you to privately connect your VPC to supported AWS services and VPC endpoint services powered by AWS PrivateLink without requiring an internet gateway. In this case a private connection between the MWAA VPC and Amazon S3. It is also possible to create an EMR Interface VPC Endpoint to securely route traffic directly to EMR from MWAA, instead of connecting over the Internet.

Demonstration’s Amazon MWAA and Amazon EMR Architecture

Amazon MWAA Environment

The new MWAA Environment will include a link to the Airflow UI.

Amazon MWAA Environment Console

Airflow UI

Using the supplied link, you should be able to access the Airflow UI using your web browser.

Apache Airflow UI

Our First DAG

The Amazon MWAA documentation includes an example DAG, which contains one of several sample programs, SparkPi, which comes with Spark. I have created a similar DAG that is included in the GitHub project, dags/emr_steps_demo.py. The DAG will create a minimally-sized single-node EMR cluster with no Core or Task nodes. The DAG will then use that cluster to submit the calculate_pi job to Spark. Once the job is complete, the DAG will terminate the EMR cluster.

import os
from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.utils.dates import days_ago
DAG_ID = os.path.basename(__file__).replace('.py', '')
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
}
SPARK_STEPS = [
{
'Name': 'calculate_pi',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
},
}
]
JOB_FLOW_OVERRIDES = {
'Name': 'demo-cluster-airflow',
'ReleaseLabel': 'emr-6.2.0',
'Applications': [
{
'Name': 'Spark'
},
],
'Instances': {
'InstanceGroups': [
{
'Name': 'Master nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1,
}
],
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False,
},
'VisibleToAllUsers': True,
'JobFlowRole': 'EMR_EC2_DefaultRole',
'ServiceRole': 'EMR_DefaultRole',
'Tags': [
{
'Key': 'Environment',
'Value': 'Development'
},
{
'Key': 'Name',
'Value': 'Airflow EMR Demo Project'
},
{
'Key': 'Owner',
'Value': 'Data Analytics Team'
}
]
}
with DAG(
dag_id=DAG_ID,
description='Run built-in Spark app on Amazon EMR',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
start_date=days_ago(1),
schedule_interval='@once',
tags=['emr'],
) as dag:
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES
)
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=SPARK_STEPS,
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
)
cluster_creator >> step_adder >> step_checker
view raw spark_pi_example.py hosted with ❤ by GitHub

Upload the DAG to the Airflow S3 bucket’s dags directory. Substitute your Airflow S3 bucket name in the AWS CLI command below, then run it from the project’s root.

aws s3 cp dags/spark_pi_example.py \
s3://<your_airflow_bucket_name>/dags/

The DAG, spark_pi_example, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job.

Apache Airflow UI’s DAGs tab

The DAG has no optional configuration to input as JSON. Select ‘Trigger’ to submit the job, as shown below.

Apache Airflow UI’s Trigger DAG Page

The DAG should complete all three tasks successfully, as shown in the DAG’s ‘Graph View’ tab below.

Apache Airflow UI’s DAG Graph View

Switching to the EMR Console, you should see the single-node EMR cluster being created.

Amazon EMR Console’s Summary tab

On the ‘Steps’ tab, you should see that the ‘calculate_pi’ Spark job has been submitted and is waiting for the cluster to be ready to be run.

Amazon EMR Console’s Steps tab

Triggering DAGs Programmatically

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI. To automate the DAG Run, we could use the AWS CLI and invoke the Airflow CLI via an endpoint on the Apache Airflow Webserver. The Amazon MWAA documentation and Airflow’s CLI documentation explains how.

Below is an example of triggering the spark_pi_example DAG programmatically using Airflow’s trigger_dag CLI command. You will need to replace the WEB_SERVER_HOSTNAME variable with your own Airflow Web Server’s hostname. The ENVIROMENT_NAME variable assumes only one MWAA environment is returned by jq.

export WEB_SERVER_HOSTNAME="<your_airflow_web_server.us-east-1.airflow.amazonaws.com>"
export ENVIRONMENT_NAME=$(aws mwaa list-environments | jq -r '.Environments | .[]')
export DAG_NAME=spark_pi_example
aws mwaa create-cli-token –name "${ENVIRONMENT_NAME}" | \
export CLI_TOKEN=$(jq -r .CliToken)
curl –request POST "https://${WEB_SERVER_HOSTNAME}/aws_mwaa/cli" \
–header "Authorization: Bearer ${CLI_TOKEN}" \
–header "Content-Type: text/plain" \
–data-raw "trigger_dag ${DAG_NAME}"
view raw trigger_dag.sh hosted with ❤ by GitHub

Analytics Job with Airflow

Next, we will submit an actual analytics job to EMR. If you recall from the previous post, we had four different analytics PySpark applications, which performed analyses on the three Kaggle datasets. For the next DAG, we will run a Spark job that executes the bakery_sales_ssm.py PySpark application. This job should already exist in the processed data S3 bucket.

The DAG, dags/bakery_sales.py, creates an EMR cluster identical to the EMR cluster created with the run_job_flow.py Python script in the previous post. All EMR configuration options available when using AWS Step Functions are available with Airflow’s airflow.contrib.operators and airflow.contrib.sensors packages for EMR.

Airflow leverages Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. The Bakery Sales DAG contains eleven Jinja template variables. Seven variables will be configured in the Airflow UI by importing a JSON file into the ‘Admin’ ⇨ ‘Variables’ tab. These template variables are prefixed with var.value in the DAG. The other three variables will be passed as a DAG Run configuration as a JSON blob, similar to the previous DAG example. These template variables are prefixed with dag_run.conf.

import os
from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.models import Variable
from airflow.utils.dates import days_ago
# ************** AIRFLOW VARIABLES **************
bootstrap_bucket = Variable.get('bootstrap_bucket')
emr_ec2_key_pair = Variable.get('emr_ec2_key_pair')
job_flow_role = Variable.get('job_flow_role')
logs_bucket = Variable.get('logs_bucket')
release_label = Variable.get('release_label')
service_role = Variable.get('service_role')
work_bucket = Variable.get('work_bucket')
# ***********************************************
DAG_ID = os.path.basename(__file__).replace('.py', '')
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'email': ["{{ dag_run.conf['airflow_email'] }}"],
'email_on_failure': ["{{ dag_run.conf['email_on_failure'] }}"],
'email_on_retry': ["{{ dag_run.conf['email_on_retry'] }}"],
}
SPARK_STEPS = [
{
'Name': 'Bakery Sales',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'–deploy-mode',
'cluster',
'–master',
'yarn',
'–conf',
'spark.yarn.submit.waitAppCompletion=true',
's3a://{{ var.value.work_bucket }}/analyze/bakery_sales_ssm.py'
]
}
}
]
JOB_FLOW_OVERRIDES = {
'Name': 'demo-cluster-airflow',
'ReleaseLabel': '{{ var.value.release_label }}',
'LogUri': 's3n://{{ var.value.logs_bucket }}',
'Applications': [
{
'Name': 'Spark'
},
],
'Instances': {
'InstanceFleets': [
{
'Name': 'MASTER',
'InstanceFleetType': 'MASTER',
'TargetSpotCapacity': 1,
'InstanceTypeConfigs': [
{
'InstanceType': 'm5.xlarge',
},
]
},
{
'Name': 'CORE',
'InstanceFleetType': 'CORE',
'TargetSpotCapacity': 2,
'InstanceTypeConfigs': [
{
'InstanceType': 'r5.xlarge',
},
],
},
],
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False,
'Ec2KeyName': '{{ var.value.emr_ec2_key_pair }}',
},
'BootstrapActions': [
{
'Name': 'string',
'ScriptBootstrapAction': {
'Path': 's3://{{ var.value.bootstrap_bucket }}/bootstrap_actions.sh',
}
},
],
'Configurations': [
{
'Classification': 'spark-hive-site',
'Properties': {
'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'
}
}
],
'VisibleToAllUsers': True,
'JobFlowRole': '{{ var.value.job_flow_role }}',
'ServiceRole': '{{ var.value.service_role }}',
'EbsRootVolumeSize': 32,
'StepConcurrencyLevel': 1,
'Tags': [
{
'Key': 'Environment',
'Value': 'Development'
},
{
'Key': 'Name',
'Value': 'Airflow EMR Demo Project'
},
{
'Key': 'Owner',
'Value': 'Data Analytics Team'
}
]
}
with DAG(
dag_id=DAG_ID,
description='Analyze Bakery Sales with Amazon EMR',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
start_date=days_ago(1),
schedule_interval='@once',
tags=['emr'],
) as dag:
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES
)
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=SPARK_STEPS,
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
)
cluster_creator >> step_adder >> step_checker
view raw bakery_sales.py hosted with ❤ by GitHub

Import Variables into Airflow UI

First, to import the required variables, change the values in the project’s airflow_variables/admin_variables_bakery.json file. You will need to update the values for bootstrap_bucket, emr_ec2_key_pair, logs_bucket, and work_bucket. The three S3 buckets should all exist from the previous post.

{
"bootstrap_bucket": "emr-demo-bootstrap-123412341234-us-east-1",
"emr_ec2_key_pair": "emr-demo-123412341234-us-east-1",
"job_flow_role": "EMR_EC2_DemoRole",
"logs_bucket": "emr-demo-logs-123412341234-us-east-1",
"release_label": "emr-6.2.0",
"service_role": "EMR_DemoRole",
"work_bucket": "emr-demo-work-123412341234-us-east-1",
"ec2_subnet_id": "subnet-012abc456efg78900"
}

Next, import the variables file from the ‘Admin’ ⇨ ‘Variables’ tab of the Airflow UI.

Apache Airflow UI’s Admin > Variables tab

Upload the DAG, dags/bakery_sales.py, to the Airflow S3 bucket, similar to the first DAG.

aws s3 cp dags/bakery_sales.py \
s3://<your_airflow_bucket_name>/dags/

The second DAG, bakery_sales, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job.

Apache Airflow UI’s DAGs tab

Input the three required parameters in the ‘Trigger DAG’ interface, used to pass the DAG Run configuration, and select ‘Trigger’. A sample of the JSON blob can be found in the project, airflow_variables/dag_run.conf_bakery.json.

{
    "airflow_email": "analytics_team@example.com",
    "email_on_failure": false,
    "email_on_retry": false
}

This is just for demonstration purposes. To send and receive emails, you will need to configure Airflow.

Apache Airflow UI’s Trigger DAG Screen

Switching to the EMR Console, you should see the ‘Bakery Sales’ Spark job in the ‘Steps’ tab.

Amazon EMR Console’s Steps tab

Multi-Step DAG

In our last example, we will use a single DAG to run four Spark jobs in parallel. The Spark job arguments (EmrAddStepsOperator steps parameter) will be loaded from an external JSON file residing in Amazon S3, instead of defined in the DAG, as in the previous two DAG examples. Additionally, the EMR cluster specifications (EmrCreateJobFlowOperator job_flow_overrides parameter) will also be loaded from an external JSON file. Using this method, we decouple the EMR provisioning and job details from the DAG. DataOps or DevOps Engineers might manage the EMR cluster specifications as code, while Data Analysts manage the Spark job arguments, separately. A third team might manage the DAG itself.

We still maintain the variables in the JSON files. The DAG will read the JSON file-based configuration into the tasks as JSON blobs, then replace the Jinja template variables (expressions) in the DAG with variable values defined in Airflow or input as parameters when the DAG is triggered.

Below we see a snippet of two of the four Spark submit-job job definitions (steps), which have been moved to a separate JSON file, emr_steps/emr_steps.json.

[
{
"Name": "Movie Ratings",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"–deploy-mode",
"cluster",
"–master",
"yarn",
"–conf",
"spark.yarn.submit.waitAppCompletion=true",
"s3a://{{ var.value.work_bucket }}/analyze/movies_avg_ratings_ssm.py",
"–start-date",
"2016-01-01 00:00:00",
"–end-date",
"2016-12-31 23:59:59"
]
}
},
{
"Name": "Stock Volatility",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"–deploy-mode",
"cluster",
"–master",
"yarn",
"–conf",
"spark.yarn.submit.waitAppCompletion=true",
"s3a://{{ var.value.work_bucket }}/analyze/stock_volatility_ssm.py",
"–start-date",
"2017-01-01",
"–end-date",
"2018-12-31"
]
}
}
]
view raw emr_steps.json hosted with ❤ by GitHub

Below are the EMR cluster specifications (job_flow_overrides), which have been moved to a separate JSON file, job_flow_overrides/job_flow_overrides.json.

{
"Name": "demo-cluster-airflow",
"ReleaseLabel": "{{ var.value.release_label }}",
"LogUri": "s3n://{{ var.value.logs_bucket }}",
"Applications": [
{
"Name": "Spark"
}
],
"Instances": {
"InstanceFleets": [
{
"Name": "MASTER",
"InstanceFleetType": "MASTER",
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m5.xlarge"
}
]
},
{
"Name": "CORE",
"InstanceFleetType": "CORE",
"TargetSpotCapacity": 2,
"InstanceTypeConfigs": [
{
"InstanceType": "r5.2xlarge"
}
]
}
],
"Ec2SubnetId": "{{ var.value.ec2_subnet_id }}",
"KeepJobFlowAliveWhenNoSteps": false,
"TerminationProtected": false,
"Ec2KeyName": "{{ var.value.emr_ec2_key_pair }}"
},
"BootstrapActions": [
{
"Name": "string",
"ScriptBootstrapAction": {
"Path": "s3://{{ var.value.bootstrap_bucket }}/bootstrap_actions.sh"
}
}
],
"Configurations": [
{
"Classification": "spark-hive-site",
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
}
}
],
"VisibleToAllUsers": true,
"JobFlowRole": "{{ var.value.job_flow_role }}",
"ServiceRole": "{{ var.value.service_role }}",
"EbsRootVolumeSize": 32,
"StepConcurrencyLevel": 5,
"Tags": [
{
"Key": "Environment",
"Value": "Development"
},
{
"Key": "Name",
"Value": "Airflow EMR Demo Project"
},
{
"Key": "Owner",
"Value": "Data Analytics Team"
}
]
}

Decoupling the configurations reduces the DAG from well over 200 lines of code to less than 75 lines. Note lines 56 and 63 of the DAG below. Instead of referencing a local object variable, the parameters now reference the function, get_objects(key, bucket_name), which loads the JSON.

import json
import os
from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.hooks.S3_hook import S3Hook
from airflow.models import Variable
from airflow.utils.dates import days_ago
# ************** AIRFLOW VARIABLES **************
bootstrap_bucket = Variable.get('bootstrap_bucket')
emr_ec2_key_pair = Variable.get('emr_ec2_key_pair')
job_flow_role = Variable.get('job_flow_role')
logs_bucket = Variable.get('logs_bucket')
release_label = Variable.get('release_label')
service_role = Variable.get('service_role')
work_bucket = Variable.get('work_bucket')
# ***********************************************
DAG_ID = os.path.basename(__file__).replace('.py', '')
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'email': ["{{ dag_run.conf['airflow_email'] }}"],
'email_on_failure': ["{{ dag_run.conf['email_on_failure'] }}"],
'email_on_retry': ["{{ dag_run.conf['email_on_retry'] }}"],
}
def get_object(key, bucket_name):
"""
Load S3 object as JSON
"""
hook = S3Hook()
content_object = hook.get_key(key=key, bucket_name=bucket_name)
file_content = content_object.get()['Body'].read().decode('utf-8')
return json.loads(file_content)
with DAG(
dag_id=DAG_ID,
description='Run multiple Spark jobs with Amazon EMR',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
start_date=days_ago(1),
schedule_interval=None,
tags=['emr', 'spark', 'pyspark']
) as dag:
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=get_object('job_flow_overrides/job_flow_overrides.json', work_bucket)
)
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=get_object('emr_steps/emr_steps.json', work_bucket)
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default'
)
cluster_creator >> step_adder >> step_checker
view raw multiple_steps.py hosted with ❤ by GitHub

This time, we need to upload three files to S3, the DAG to the Airflow S3 bucket, and the two JSON files to the EMR Work S3 bucket. Change the bucket names to match your environment, then run the three AWS CLI commands shown below.

aws s3 cp emr_steps/emr_steps.json \
    s3://emr-demo-work-123412341234-us-east-1/emr_steps/
aws s3 cp job_flow_overrides/job_flow_overrides.json \
    s3://emr-demo-work-123412341234-us-east-1/job_flow_overrides/
aws s3 cp dags/multiple_steps.py \
s3://airflow-123412341234-us-east-1/dags/

The second DAG, multiple_steps, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job. The three required input parameters in the ‘Trigger DAG’ interface are identical to the previous bakery_sales DAG. A sample of that JSON blob can be found in the project at airflow_variables/dag_run.conf_bakery.json.

Apache Airflow UI’s DAGs tab

Below we see that the EMR cluster has completed the four Spark jobs (EMR Steps) and has auto-terminated. Note that all four jobs were started at the exact same time. If you recall from the previous post, this is possible because we preset the ‘Concurrency’ level to 5.

Amazon EMR Console’s Steps tab showing four Steps running in parallel

Triggering DAGs Programmatically

AWS CLI

Similar to the previous example, below we can trigger the multiple_steps DAG programmatically using Airflow’s trigger_dag CLI command. Note the addition of the —-conf named argument, which passes the configuration, containing three key/value pairs, to the trigger command as a JSON blob.

ENVIRONMENT_NAME=$(aws mwaa list-environments | jq -r '.Environments | .[]')
DAG_NAME="multiple_steps"
CONFIG="""'{
\"airflow_email\": \"analytics_team@example.com\",
\"email_on_failure\": true,
\"email_on_retry\": false
}'"""
CLI_JSON=$(aws mwaa create-cli-token –name ${ENVIRONMENT_NAME}) \
&& CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \
&& WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \
curl –request POST "https://${WEB_SERVER_HOSTNAME}/aws_mwaa/cli" \
–header "Authorization: Bearer ${CLI_TOKEN}" \
–header "Content-Type: text/plain" \
–data-raw "trigger_dag ${DAG_NAME} –conf ${CONFIG}"

AWS SDK

Airflow DAGs can also be triggered using the AWS SDK. For example, with boto3 for Python, we could use a script, similar to the following to remotely trigger a DAG.

#!/usr/bin/env python3
# MWAA: Trigger an Apache Airflow DAG using SDK
# Author: Gary A. Stafford (February 2021)
import logging
import boto3
import requests
logging.basicConfig(
format='[%(asctime)s] %(levelname)s – %(message)s', level=logging.INFO)
mwaa_client = boto3.client('mwaa')
ENVIRONMENT_NAME = 'Your_Airflow_Environment_Name'
DAG_NAME = 'your_dag_name'
CONFIG = '{"foo": "bar"}'
def main():
response = mwaa_client.create_cli_token(
Name=ENVIRONMENT_NAME
)
logging.info('response: ' + str(response))
token = response['CliToken']
url = 'https://{0}/aws_mwaa/cli'.format(response['WebServerHostname'])
headers = {'Authorization': 'Bearer ' + token, 'Content-Type': 'text/plain'}
payload = 'trigger_dag {0} –conf {1}'.format(DAG_NAME, CONFIG)
response = requests.post(url, headers=headers, data=payload)
logging.info('response: ' + str(response)) # should be <Response [200]>
if __name__ == '__main__':
main()
view raw trigger_dag.py hosted with ❤ by GitHub

Cleaning Up

Once you are done with the MWAA Environment, be sure to delete it as soon as possible to save additional costs. Also, delete the MWAA-VPC CloudFormation stack. These resources, like the two NAT Gateways, will also continue to generate additional costs.

aws mwaa delete-environment --name <your_mwaa_environment_name>
aws cloudformation delete-stack --stack-name MWAA-VPC

Conclusion

In this second post in the series, we explored using the newly released Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to run PySpark applications on Amazon Elastic MapReduce (Amazon EMR). In future posts, we will explore the use of Jupyter and Zeppelin notebooks for data science, scientific computing, and machine learning on EMR.

If you are interested in learning more about configuring Amazon MWAA and Airflow, see my recent post, Amazon Managed Workflows for Apache Airflow — Configuration: Understanding Amazon MWAA’s Configuration Options.


This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.

, , , , , , ,

1 Comment