Posts Tagged DataOps

Capturing Data Analytics Workflows and System Requirements

Implement an effective, consistent, and repeatable strategy for documenting data analytics workflows and capturing system requirements

“Data analytics applications involve more than just analyzing data, particularly on advanced analytics projects. Much of the required work takes place upfront, in collecting, integrating, and preparing data and then developing, testing, and revising analytical models to ensure that they produce accurate results. In addition to data scientists and other data analysts, analytics teams often include data engineers, who create data pipelines and help prepare data sets for analysis.” — TechTarget


Introduction

Successful consultants, project managers, and product owners use well-proven and systematic approaches to achieve desired outcomes, including successful customer engagements, project results, and product and service launches. Modern data stacks and analytics workflows are increasingly complex. This technology-agnostic discovery process aims to help an organization efficiently and repeatably capture a concise record of existing analytics workflows, business and technical goals and constraints, and measures of success. If applicable, the discovery process is also used to compile and clarify requirements for new data analytics workflows.

Animation of the discovery process

Analytics Workflow Stages

There are many patterns organizations use to delineate the stages of their analytics workflows. This process utilizes six stages of a typical analytics workflow:

  1. Generate: All the ways data is generated and the systems of record where it is stored or originates from, also referred to as data ingress
  2. Collect: All the ways data is collected or ingested
  3. Prepare: All the ways data is transformed, including ETL, ELT, reverse ETL, and ML
  4. Store: All the ways data is stored, organized, and secured for analytics purposes
  5. Analyze: All the ways data is analyzed
  6. Deliver: All the ways data is delivered and how it is consumed, also referred to as data egress or data products

The precise nomenclature is not critical to this process as long as all major functionality is considered.

The Process

The discovery process starts by working backward. It first identifies existing goals and desired outcomes. It then identifies existing and anticipated future constraints. Next, it breaks down the current analytics workflows, examining the four stages of collect, prepare, store, and analyze, the steps required to get from data sources to deliverables. Finally, it captures the inputs and the outputs for the workflows and the data producers and consumers.

Collect, prepare, store, and analyze — the steps required to get from data sources to deliverables.

Specifically, the process identifies and documents the following:

  1. Business and technical goals and desired outcomes
  2. Business and technical constraints, also referred to as limitations or restrictions
  3. Analytics workflows: tools, techniques, procedures, and organizational structure
  4. Outputs, also referred to as deliverables, required to achieve desired outcomes
  5. Inputs, also referred to as data sources, required to achieve desired outcomes
  6. Data producers and consumers
  7. Measures of success
  8. Recommended next steps

Outcomes

Capture the business and technical goals and desired outcomes driving the need to re-architect current analytics processes. For example:

  1. Re-architect analytics processes to modernize, reduce complexity, or add new capabilities
  2. Reduce or control costs
  3. Increase performance, scalability, or speed
  4. Migrate on-premises workloads, workflows, and processes to the cloud
  5. Migrate from one cloud provider or SaaS provider to another
  6. Move away from proprietary software products to open source software (OSS) or commercial open source software (COSS)
  7. Migrate away from custom-built software to commercial off-the-shelf (COTS), OSS, or COSS solutions
  8. Integrate DevOps, GitOps, DataOps, or MLOps practices
  9. Integrate on-premises, multi-cloud, and SaaS-based hybrid architectures
  10. Develop new analytics product or service offerings
  11. Standardize analytics processes
  12. Leverage the data for AI and ML purposes
  13. Provide stakeholders with a real-time business KPIs dashboard
  14. Construct a data lake, data warehouse, data lakehouse, or data mesh

If migration is involved, review the 6 R’s of Cloud Migration: Rehost, Replatform, Repurchase, Refactor, Retain, or Retire.

Constraints

Identify the existing and potential future business and technical constraints that impact analytics workflows. For example:

  1. Budgets
  2. Cost attribution
  3. Timelines
  4. Access to skilled resources
  5. Internal and external regulatory requirements, such as HIPAA, SOC2, FedRAMP, GDPR, PCI DSS, CCPA, and FISMA
  6. Business Continuity and Disaster Recovery (BCDR) requirements
  7. Architecture Review Board (ARB), Center of Excellence (CoE), Change Advisory Board (CAB), and Release Management standards and guidelines
  8. Data residency and data sovereignty requirements
  9. Security policies
  10. Service Level Agreements (SLAs); see ‘Measures of Success’ section
  11. Existing vendor, partner, cloud-provider, and SaaS relationships
  12. Existing licensing and contractual obligations
  13. Deprecated code dependencies and other technical debt
  14. Must-keep aspects of existing processes
  15. Build versus buy propensity
  16. Proprietary versus open source software propensity
  17. Insourcing versus outsourcing propensity
  18. Managed, hosted, SaaS versus self-managed software propensity

Analytics Workflows

Capture analytics workflows using the four stages of collect, prepare, store, and analyze, as a way to organize the discussion:

  1. High- and low-level architecture, process flow diagrams, sequence diagrams
  2. Recent architectural assessments such as reviews based on the AWS Data Analytics Lens, AWS Well-Architected Framework, Microsoft Azure Well-Architected Framework, or Google Cloud Architecture Framework
  3. Analytics tools, including hardware and commercial, custom, and open-source software
  4. Security policies, processes, standards, and technologies
  5. Observability, logging, monitoring, alerting, and notification
  6. Teams, including roles, responsibilities, and skillsets
  7. Partners, including consultants, vendors, SaaS providers, and Managed Service Providers (MSP)
  8. SDLC environments, such as Local, Sandbox, Development, Testing, Staging, Production, and Disaster Recovery (DR)
  9. Business Continuity Planning (BCP) policies, processes, standards, and technologies
  10. Primary analytics programming languages
  11. External system dependencies
  12. DataOps, MLOps, DevOps, CI/CD, SCM/VCS, and Infrastructure-as-Code (IaC) automation policies, processes, standards, and technologies
  13. Data governance and data lineage policies, processes, standards, and technologies
  14. Data quality (or data assurance) policies, processes, standards, technologies, and testing methodologies
  15. Data anomaly detection policies, processes, standards, and technologies
  16. Intellectual property (IP), the ‘secret sauce’ that differentiates the organization’s processes and provides a competitive advantage, such as ML models, proprietary algorithms, datasets, highly specialized knowledge, and patents
  17. Overall effectiveness of and customer satisfaction with the existing analytics platform (document sources of customer feedback)
  18. Known deficiencies with current analytics processes

Outputs

Identify the deliverables required to meet the desired outcomes. For example, prepare and provide data for:

  1. Data analytics purposes
  2. Business Intelligence (BI), visualizations, and dashboards
  3. Machine Learning (ML) and Artificial Intelligence (AI)
  4. Data exports and data feeds, such as Excel or CSV-format files
  5. Hosted datasets for external or internal consumption
  6. Data APIs for external or internal consumption
  7. Documentation, Data API guides, data dictionaries, example code such as Notebooks
  8. SaaS-based product offering

Inputs

Capture sources of data that are required to produce the outputs. For example:

  1. Batch sources such as flat files from legacy systems, third-party providers, and enterprise platforms
  2. Streaming sources such as message queues, change data capture (CDC), IoT device telemetry, operational metrics, real-time logs, clickstream data, connected devices, mobile, and gaming feeds
  3. Databases, including relational, NoSQL, key-value, document, in-memory, graph, time series, and wide column (OLTP data stores)
  4. Data warehouses (OLAP data stores)
  5. Data lakes
  6. API endpoints
  7. Internal, public, and licensed datasets

Use the 5 V’s of big data to dive deep into each data source: Volume, Velocity, Variety, Veracity (or Validity), and Value.

Data Producers and Consumers

Capture all producers and consumers of data:

  1. Data producers
  2. Data consumers
  3. Data access patterns
  4. Data usage patterns
  5. Consumer and producer requirements and constraints

Measures of Success

Identify how success is measured for the analytics workflows and by whom. For example:

  1. Key Performance Indicators (KPIs)
  2. Service Level Agreements (SLAs)
  3. Customer Satisfaction Score (CSAT)
  4. Net Promoter Score (NPS)
  5. SaaS growth metrics: churn, activation rate, MRR, ARR, CAC, CLV, expansion revenue (source: appcues.com)
  6. Data quality guarantees
  7. How are measurements determined, calculated, and weighted?
  8. What are the business and technical actions resulting from missed measures of success?

Results

The immediate artifact of the data analytics discovery process is a clear and concise document that captures all feedback and inputs. In addition, the document contains all customer-supplied artifacts, such as architectural and process flow diagrams. The document should be thoroughly reviewed for accuracy and completeness by the process participants. This artifact serves as a record of current data analytics workflows and a basis for making workflow improvement recommendations or architecting new workflows.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.



DevOps for DataOps: Building a CI/CD Pipeline for Apache Airflow DAGs

Build an effective CI/CD pipeline to test and deploy your Apache Airflow DAGs to Amazon MWAA using GitHub Actions

Introduction

In this post, we will learn how to use GitHub Actions to build an effective CI/CD workflow for our Apache Airflow DAGs. We will use the DevOps concepts of Continuous Integration and Continuous Delivery to automate the testing and deployment of Airflow DAGs to Amazon Managed Workflows for Apache Airflow (Amazon MWAA) on AWS.

Fork and pull model of collaborative Airflow development used in this post

Technologies

Apache Airflow

According to the documentation, Apache Airflow is an open-source platform to author, schedule, and monitor workflows programmatically. With Airflow, you author workflows as Directed Acyclic Graphs (DAGs) of tasks written in Python.

Amazon Managed Workflows for Apache Airflow

According to AWS, Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a highly available, secure, and fully managed workflow orchestration service for Apache Airflow. MWAA automatically scales its workflow execution capacity to meet your needs and is integrated with AWS security services to help provide fast and secure access to data.

Example of Apache Airflow UI within Amazon MWAA Environment

GitHub Actions

According to GitHub, GitHub Actions makes it easy to automate software workflows with CI/CD. GitHub Actions allow you to build, test, and deploy code right from GitHub. GitHub Actions are workflows triggered by GitHub events like push, issue creation, or a new release. You can leverage GitHub Actions prebuilt and maintained by the community.

Example of GitHub Action workflow running in the GitHub repository used in this post

If you are new to GitHub Actions, I recommend my previous post, Continuous Integration and Deployment of Docker Images using GitHub Actions.

Terminology

DataOps

According to Wikipedia, DataOps is an automated, process-oriented methodology used by analytic and data teams to improve the quality and reduce the cycle time of data analytics. While DataOps began as a set of best practices, it has now matured to become a new approach to data analytics.

DataOps applies to the entire data lifecycle from data preparation to reporting and recognizes the interconnected nature of the data analytics team and IT operations. DataOps incorporates the Agile methodology to shorten the software development life cycle (SDLC) of analytics development.

DevOps

According to Wikipedia, DevOps is a set of practices that combines software development (Dev) and IT operations (Ops). It aims to shorten the systems development life cycle and provide continuous delivery with high software quality.

DevOps is a set of practices intended to reduce the time between committing a change to a system and the change being placed into normal production, while ensuring high quality. -Wikipedia

Fail Fast

According to Wikipedia, a fail-fast system is one that immediately reports any condition that is likely to indicate a failure. Using the DevOps concept of fail fast, we build steps into our workflows to uncover errors sooner in the SDLC. We shift testing as far to the left as possible (referring to the pipeline of steps moving from left to right) and test at multiple points along the way.

Source Code

All source code for this demonstration, including the GitHub Actions, Pytest unit tests, and Git Hooks, is open-sourced and located on GitHub.

Architecture

The diagram below represents the architecture for a recent blog post and video demonstration, Lakehouse Automation on AWS with Apache Airflow. The post and video show how to programmatically load and upload data from Amazon Redshift to an Amazon S3-based data lake using Apache Airflow.

Architecture for the post and video, Lakehouse Automation on AWS with Apache Airflow

In this post, we will review how the DAGs from the previous post were developed, tested, and deployed to MWAA using a variety of progressively more effective CI/CD workflows. The workflows demonstrated could also easily be applied to other Airflow resources in addition to DAGs, such as SQL scripts, configuration and data files, Python requirements files, and plugins.

Workflows

No DevOps

Below we see a minimally viable workflow for loading DAGs into Amazon MWAA, which does not use the principles of CI/CD. Changes are made in the local Airflow developer’s environment. The modified DAGs are copied directly to the Amazon S3 bucket and then, barring any errors, automatically synced with Amazon MWAA. Those changes are also (hopefully) pushed back to the centralized version control or source code management (SCM) system, which is GitHub in this post.

Error-prone, non-DevOps workflow for modifying and syncing DAGs to MWAA

There are at least two significant issues with this error-prone workflow. First, the DAGs in the Amazon S3 bucket and in GitHub can easily fall out of sync. Copying or syncing the DAGs to S3 and pushing the DAGs to GitHub are two independent steps; a developer might continue making changes and pushing DAGs to S3 without pushing to GitHub, or vice versa.

Secondly, the DevOps concept of fail-fast is missing. The first time you know your DAG contains errors is likely when it is synced to MWAA and throws an Import Error. By then, the DAG has already been copied to S3, synced to MWAA, and possibly pushed to GitHub, which other developers could then pull.

Example of a typical DAG Import Error, easily caught with a simple test

GitHub Actions

A significant step up from the previous workflow is using GitHub Actions to test and deploy your code after pushing it to GitHub. Although in this workflow, code is still ‘pushed straight to Trunk’ (the main branch in GitHub) and risks other developers in a collaborative environment pulling potentially erroneous code, you have far less chance of DAG errors making it to MWAA.

GitHub Actions allow you to fail faster and catch errors sooner

Using GitHub Actions, you also eliminate human error that could result in the changes to DAGs not being synced to Amazon S3. Lastly, using this workflow improves security by eliminating the need to provide direct access to the Airflow Amazon S3 bucket to Airflow Developers.

Fork and pull model of collaborative Airflow development used in this post (video only)

Types of Tests

The first GitHub Action, test_dags.yml, is triggered on a push to the dags directory in the main branch of the repository. It is also triggered whenever a pull request is made for the main branch. The first GitHub Action runs a battery of tests, including checking Python dependencies, code style, code quality, DAG import errors, and unit tests. The tests catch issues with DAGs before being synced to S3 by a second GitHub Action.

name: Test DAGs

on:
  push:
    paths:
      - 'dags/**'
  pull_request:
    branches:
      - main

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.7'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements/requirements.txt
        pip check
    - name: Lint with Flake8
      run: |
        pip install flake8
        flake8 --ignore E501 dags --benchmark -v
    - name: Confirm Black code compliance (psf/black)
      run: |
        pip install pytest-black
        pytest dags --black -v
    - name: Test with Pytest
      run: |
        pip install pytest
        cd tests || exit
        pytest tests.py -v

Successful runs of the ‘Test DAGs’ GitHub Action, shown in the Actions Console

Python Dependencies

The first test installs the modules listed in the requirements.txt file used locally to develop the application. This test is designed to uncover any missing or conflicting modules.

- name: Install dependencies
  run: |
    python -m pip install --upgrade pip
    pip install -r requirements/requirements.txt
    pip check

It is essential to develop your DAGs against the same version of Python and the same versions of the Python modules used in your Airflow environment. You can use the BashOperator to run shell commands to obtain the versions of Python and the modules installed in your Airflow environment:

python3 --version; python3 -m pip list
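For example, here is a minimal sketch of a utility DAG that logs this information on demand; the DAG and task names are hypothetical and not part of the project’s repository:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical utility DAG that logs the MWAA environment's Python and package versions
with DAG(
    dag_id="data_lake__log_environment_versions",  # hypothetical dag_id
    schedule_interval=None,  # trigger manually when needed
    start_date=datetime(2021, 12, 1),
    catchup=False,
    tags=["data lake demo"],
) as dag:
    log_versions = BashOperator(
        task_id="log_versions",
        bash_command="python3 --version; python3 -m pip list",
    )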

A snippet of log output from a DAG showing the Python version and Python modules available in MWAA 2.0.2:

Python version and Python modules available in MWAA 2.0.2

The latest stable release of Airflow is currently version 2.2.2, released 2021-11-15. However, as of December 2021, Amazon’s latest version of MWAA 2.x is version 2.0.2, released 2021-04-19. MWAA 2.0.2 currently runs Python3 version 3.7.10.

Available versions of Amazon MWAA as of December 2021

Flake8

Known as ‘your tool for style guide enforcement,’ Flake8 is described as the modular source code checker. It is a command-line utility for enforcing style consistency across Python projects. Flake8 is a wrapper around PyFlakes, pycodestyle, and Ned Batchelder’s McCabe script. The module, pycodestyle, is a tool to check your Python code against some of the style conventions in PEP 8.

Flake8 is highly configurable, with options to ignore specific rules if they are not required by your development team. For example, in this demonstration, I intentionally ignored rule E501, ‘line too long,’ which by default limits lines to 79 characters.

- name: Lint with Flake8
  run: |
    pip install flake8
    flake8 --ignore E501 dags --benchmark -v

Black

Known as ‘the uncompromising code formatter,’ Python code formatted using Black (referred to as Blackened code) looks the same regardless of the project you’re reading. Formatting becomes transparent, allowing teams to focus on the content instead. Black makes code review faster by producing the smallest diffs possible, assuming all developers are using black to format their code.

The Airflow DAGs in this GitHub repository are automatically formatted with Black using a pre-commit Git Hook before being committed and pushed to GitHub. The test confirms Black code compliance.

- name: Confirm Black code compliance (psf/black)
  run: |
    pip install pytest-black
    pytest dags --black -v
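As a simple illustration (not taken from the project’s code), Black converts single quotes to double quotes and normalizes spacing and line wrapping:

# Before formatting: single quotes, irregular spacing, manual line break (illustrative)
args = {'owner':'data-team','retries':1,
    'email_on_failure':False}

# After running black: double quotes and normalized spacing on a single line
args = {"owner": "data-team", "retries": 1, "email_on_failure": False}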

Pytest

The pytest framework describes itself as a mature, fully-featured Python testing tool that helps you write better programs. The Pytest framework makes it easy to write small tests yet scales to support complex functional testing for applications and libraries.

The GitHub Action in the GitHub project, test_dags.yml, calls the tests.py file, also contained in the project.

- name: Test with Pytest
  run: |
    pip install pytest
    cd tests || exit
    pytest tests.py -v

The tests.py file contains several pytest unit tests. The tests are based on my project requirements; your tests will vary. These tests confirm that all DAGs:

  1. Do not contain DAG Import Errors (test catches 75% of my errors);
  2. Follow specific file naming conventions;
  3. Include a description and an owner other than ‘airflow’;
  4. Contain required project tags;
  5. Do not send emails (my projects use SNS or Slack for notifications);
  6. Do not retry more than three times.

import os
import sys

import pytest
from airflow.models import DagBag

sys.path.append(os.path.join(os.path.dirname(__file__), "../dags"))
sys.path.append(os.path.join(os.path.dirname(__file__), "../dags/utilities"))

# Airflow variables called from DAGs under test are stubbed out
os.environ["AIRFLOW_VAR_DATA_LAKE_BUCKET"] = "test_bucket"
os.environ["AIRFLOW_VAR_ATHENA_QUERY_RESULTS"] = "SELECT 1;"
os.environ["AIRFLOW_VAR_SNS_TOPIC"] = "test_topic"
os.environ["AIRFLOW_VAR_REDSHIFT_UNLOAD_IAM_ROLE"] = "test_role_1"
os.environ["AIRFLOW_VAR_GLUE_CRAWLER_IAM_ROLE"] = "test_role_2"


@pytest.fixture(params=["../dags/"])
def dag_bag(request):
    return DagBag(dag_folder=request.param, include_examples=False)


def test_no_import_errors(dag_bag):
    assert not dag_bag.import_errors


def test_requires_tags(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tags


def test_requires_specific_tag(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        try:
            assert dag.tags.index("data lake demo") >= 0
        except ValueError:
            assert dag.tags.index("redshift demo") >= 0


def test_desc_len_greater_than_fifteen(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert len(dag.description) > 15


def test_owner_len_greater_than_five(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert len(dag.owner) > 5


def test_owner_not_airflow(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert str.lower(dag.owner) != "airflow"


def test_no_emails_on_retry(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert not dag.default_args["email_on_retry"]


def test_no_emails_on_failure(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert not dag.default_args["email_on_failure"]


def test_three_or_less_retries(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert dag.default_args["retries"] <= 3


def test_dag_id_contains_prefix(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert str.lower(dag_id).find("__") != -1


def test_dag_id_requires_specific_prefix(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert str.lower(dag_id).startswith("data_lake__") \
            or str.lower(dag_id).startswith("redshift_demo__")

If you are building custom Airflow Operators, additional unit, functional, and integration tests are recommended.
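For reference, here is a sketch of a hypothetical DAG that would satisfy the conventions tested above; the names and values are illustrative and not part of the project:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical default_args that satisfy the email and retry tests above
default_args = {
    "owner": "data-engineering-team",  # longer than five characters and not 'airflow'
    "email_on_retry": False,
    "email_on_failure": False,
    "retries": 2,  # three or fewer retries
}

with DAG(
    dag_id="data_lake__example_dag",  # required 'data_lake__' prefix
    description="Illustrative DAG that follows the project's conventions",  # > 15 characters
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2021, 12, 1),
    catchup=False,
    tags=["data lake demo"],  # required project tag
) as dag:
    check = BashOperator(task_id="conventions_check", bash_command="echo 'ok'")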

Fork and Pull

We can improve on the practice of pushing directly to Trunk by implementing one of two collaborative development models, recommended by GitHub:

  1. The Shared repository model: uses ‘topic’ branches, which are reviewed, approved, and merged into the main branch.
  2. Fork and pull model: a repo is forked, changes are made, a pull request is created, the request is reviewed, and if approved, merged into the main branch.

In the fork and pull model, we create a fork of the DAG repository where we make our changes. We then commit and push those changes back to the forked repository. When ready, we create a pull request. If the pull request is approved and passes all the tests, it is manually or automatically merged into the main branch. DAGs are then synced to S3 and, eventually, to MWAA. I usually prefer to trigger merges manually once all tests have passed.

The fork and pull model greatly reduces the chance that bad code is merged to the main branch before passing all tests.

Errors are caught early in the fork and pull model prior to merging code changes

Syncing DAGs to S3

The second GitHub Action in the GitHub project, sync_dags.yml, is triggered when the previous Action, test_dags.yml, completes successfully, or, in the case of the fork and pull method, when the merge to the main branch succeeds.

name: Sync DAGs

on:
  workflow_run:
    workflows:
      - 'Test DAGs'
    types:
      - completed
  pull_request:
    types:
      - closed

jobs:
  deploy:
    runs-on: ubuntu-latest
    if: ${{ github.event.workflow_run.conclusion == 'success' }}
    steps:
    - uses: actions/checkout@master
    - uses: jakejarvis/s3-sync-action@master
      env:
        AWS_S3_BUCKET: ${{ secrets.AWS_S3_BUCKET }}
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        AWS_REGION: 'us-east-1'
        SOURCE_DIR: 'dags'
        DEST_DIR: 'dags'

The GitHub Action, sync_dags.yml, requires three GitHub encrypted secrets, created in advance and associated with the GitHub repository. According to GitHub, secrets are encrypted environment variables you create in an organization, repository, or repository environment. Encrypted secrets allow you to store sensitive information, such as access tokens, in your repository. The secrets that you create are available to use in GitHub Actions workflows.

Encrypted repository secrets used by GitHub Action to sync with Amazon S3

The DAGs are synced to Amazon S3 and, eventually, automatically synced to MWAA.

GitHub Action syncs DAGs to Amazon S3 if tests are successful

Local Testing and Git Hooks

To further improve your CI/CD workflows, you should consider using Git Hooks. Using Git Hooks, we can ensure code is tested locally before committing and pushing changes to GitHub. Testing locally allows us to fail faster, catching errors during development instead of after code is pushed to GitHub.

Errors are caught even earlier using Git Hooks

According to the documentation, Git has a way to fire off custom scripts when certain important actions occur. There are two types of hooks: client-side and server-side. Client-side hooks are triggered by operations such as committing and merging, while server-side hooks run on network operations such as receiving pushed commits.

You can use these hooks for all sorts of reasons. I often use a client-side pre-commit hook to format DAGs using black. Using a client-side pre-push Git Hook, we will ensure that tests are run before pushing the DAGs to GitHub. According to Git, ‘The pre-push hook runs when the git push command is executed after the remote refs have been updated but before any objects have been transferred. You can use it to validate a set of ref updates before a push occurs. A non-zero exit code will abort the push.’ The tests could instead be run as part of the pre-commit hook if they are not too time-consuming.

To use the pre-push hook, create the following file within the local repository, .git/hooks/pre-push:

#!/bin/sh

# do nothing if there are no commits to push
if [ -z "$(git log @{u}..)" ]; then
    exit 0
fi

sh ./run_tests_locally.sh

Then, run the following chmod command to make the hook executable:

chmod 755 .git/hooks/pre-push

The pre-push hook runs the shell script, run_tests_locally.sh. The script executes nearly identical tests locally as the GitHub Action, test_dags.yml, does remotely on GitHub:

#!/bin/sh
echo "Starting Flake8 test..."
flake8 --ignore E501 dags --benchmark || exit 1
echo "Starting Black test..."
python3 -m pytest --cache-clear
python3 -m pytest dags/ --black -v || exit 1
echo "Starting Pytest tests..."
cd tests || exit
python3 -m pytest tests.py -v || exit 1
echo "All tests completed successfully! 🥳"

Complete CI/CD workflow including running tests locally using a Git Hook (video only)


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.



Amazon Managed Workflows for Apache Airflow — Configuration: Understanding Amazon MWAA’s Configuration Options

Introduction

For anyone new to Amazon Managed Workflows for Apache Airflow (Amazon MWAA), especially those used to managing their own Apache Airflow platform, Amazon MWAA’s configuration might appear to be a bit of a black box at first. This brief post will explore Amazon MWAA’s configuration — how to inspect it and how to modify it. We will use Airflow DAGs to review an MWAA environment’s airflow.cfg file, environment variables, and Python packages.

Amazon MWAA

Apache Airflow is a popular open-source platform designed to schedule and monitor workflows. According to Wikipedia, Airflow was created at Airbnb in 2014 to manage the company’s increasingly complex workflows. From the beginning, the project was made open source, becoming an Apache Incubator project in 2016 and a top-level Apache Software Foundation project in 2019.

With the announcement of Amazon MWAA in November 2020, AWS customers can now focus on developing workflow automation while leaving the management of Airflow to AWS. Amazon MWAA can be used as an alternative to AWS Step Functions for workflow automation on AWS.

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI. For more information on Amazon MWAA, read my last post, Running Spark Jobs on Amazon EMR with Apache Airflow.

Apache Airflow UI

Source Code

The DAGs referenced in this post are available on GitHub. Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/aws-airflow-demo.git

Accessing Configuration

Environment Variables

Environment variables are an essential part of an MWAA environment’s configuration. There are various ways to examine the environment variables. You could use Airflow’s BashOperator to simply call the command, env, or the PythonOperator to call a Python iterator function, as shown below. A sample DAG, dags/get_env_vars.py, is included in the project.

def print_env_vars():
    keys = str(os.environ.keys()).replace("', '", "'|'").split("|")
    keys.sort()
    for key in keys:
        print(key)


get_env_vars_operator = PythonOperator(
    task_id='get_env_vars_task',
    python_callable=print_env_vars
)

The DAG’s PythonOperator will iterate over the MWAA environment’s environment variables and output them to the task’s log. Below is a snippet of an example task’s log.

[2020-12-25 23:59:07,170] {{standard_task_runner.py:78}} INFO – Job 272: Subtask get_env_vars_task
[2020-12-25 23:59:08,423] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONN_AWS_DEFAULT': 'aws://'
[2020-12-25 23:59:08,516] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONSOLE_LOGS_ENABLED': 'false'
[2020-12-25 23:59:08,689] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONSOLE_LOG_LEVEL': 'WARNING'
[2020-12-25 23:59:08,777] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CTX_DAG_EMAIL': 'airflow@example.com'
[2020-12-25 23:59:08,877] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CTX_DAG_ID': 'get_env_vars'
[2020-12-25 23:59:08,970] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CTX_DAG_OWNER': 'airflow'
[2020-12-25 23:59:09,269] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CTX_TASK_ID': 'get_env_vars_task'
[2020-12-25 23:59:09,357] {{logging_mixin.py:112}} INFO – 'AIRFLOW_DAG_PROCESSING_LOGS_ENABLED': 'false'
[2020-12-25 23:59:09,552] {{logging_mixin.py:112}} INFO – 'AIRFLOW_DAG_PROCESSING_LOG_LEVEL': 'WARNING'
[2020-12-25 23:59:09,647] {{logging_mixin.py:112}} INFO – 'AIRFLOW_ENV_NAME': 'MyAirflowEnvironment'
[2020-12-25 23:59:09,729] {{logging_mixin.py:112}} INFO – 'AIRFLOW_HOME': '/usr/local/airflow'
[2020-12-25 23:59:09,827] {{logging_mixin.py:112}} INFO – 'AIRFLOW_SCHEDULER_LOGS_ENABLED': 'false'
[2020-12-25 23:59:12,915] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__DAG_CONCURRENCY': '10000'
[2020-12-25 23:59:12,986] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor'
[2020-12-25 23:59:13,136] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__LOAD_EXAMPLES': 'False'
[2020-12-25 23:59:13,217] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__PARALLELISM': '10000'
[2020-12-25 23:59:14,531] {{logging_mixin.py:112}} INFO – 'AWS_DEFAULT_REGION': 'us-east-1'
[2020-12-25 23:59:14,565] {{logging_mixin.py:112}} INFO – 'AWS_EXECUTION_ENV': 'AWS_ECS_FARGATE'
[2020-12-25 23:59:14,616] {{logging_mixin.py:112}} INFO – 'AWS_REGION': 'us-east-1'
[2020-12-25 23:59:14,647] {{logging_mixin.py:112}} INFO – 'CELERY_LOG_FILE': ''
[2020-12-25 23:59:14,679] {{logging_mixin.py:112}} INFO – 'CELERY_LOG_LEVEL': '20'
[2020-12-25 23:59:14,711] {{logging_mixin.py:112}} INFO – 'CELERY_LOG_REDIRECT': '1'
[2020-12-25 23:59:14,747] {{logging_mixin.py:112}} INFO – 'CELERY_LOG_REDIRECT_LEVEL': 'WARNING'

Airflow Configuration File

According to Airflow, the airflow.cfg file contains Airflow’s configuration. You can edit it to change any of the settings. The first time you run Apache Airflow, it creates an airflow.cfg configuration file in your AIRFLOW_HOME directory and attaches the configurations to your environment as environment variables.

Amazon MWAA doesn’t expose the airflow.cfg file in an environment’s Apache Airflow UI. Although you can’t edit the file directly, you can view its contents. The configuration file is located in your AIRFLOW_HOME directory, /usr/local/airflow (Airflow’s default is ~/airflow).

There are multiple ways to examine your MWAA environment’s airflow.cfg file. You could use Airflow’s PythonOperator to call a Python function that reads the contents of the file, as shown below. The function uses the AIRFLOW_HOME environment variable to locate and read the airflow.cfg. A sample DAG, dags/get_airflow_cfg.py, is included in the project.

def print_airflow_cfg():
    with open(f"{os.getenv('AIRFLOW_HOME')}/airflow.cfg", 'r') as airflow_cfg:
        file_contents = airflow_cfg.read()
        print(f'\n{file_contents}')


get_airflow_cfg_operator = PythonOperator(
    task_id='get_airflow_cfg_task',
    python_callable=print_airflow_cfg
)

The DAG’s task will read the MWAA environment’s airflow.cfg file and output it to the task’s log. Below is a snippet of an example task’s log.

[2020-12-26 00:02:57,163] {{standard_task_runner.py:78}} INFO – Job 274: Subtask get_airflow_cfg_task
[2020-12-26 00:02:57,583] {{logging_mixin.py:112}} INFO –
[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
# This path must be absolute
dags_folder = /usr/local/airflow/dags
# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /usr/local/airflow/logs
# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Set this to True if you want to enable remote logging.
remote_logging = True
# Users must supply an Airflow connection id that provides access to the storage
# location.
remote_log_conn_id = aws_default
remote_base_log_folder = cloudwatch://arn:aws:logs:::log-group:airflow-logs:*
encrypt_s3_logs = False
# Logging level
logging_level = INFO
# Logging level for Flask-appbuilder UI
fab_logging_level = WARN
# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# Example: logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class = log_config.LOGGING_CONFIG
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
[aws_mwaa]
redirect_url = https://console.aws.amazon.com/
session_duration_minutes = 720
[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is used in automated emails that
# airflow sends to point links to the right web server
base_url = http://localhost:8080
# Default timezone to display all dates in the RBAC UI, can be UTC, system, or
# any IANA timezone string (e.g. Europe/Amsterdam). If left empty the
# default value of core/default_timezone will be used
# Example: default_ui_timezone = America/New_York
default_ui_timezone = UTC
# The ip specified when starting the web server
web_server_host = 0.0.0.0
# The port on which to run the web server
web_server_port = 8080

Customizing Airflow Configurations

While AWS doesn’t expose the airflow.cfg in the Apache Airflow UI of your environment, you can change the default Apache Airflow configuration options directly within the Amazon MWAA console and continue using all other settings in airflow.cfg. The configuration options changed in the Amazon MWAA console are translated into environment variables.

To customize the Apache Airflow configuration, change the default options directly on the Amazon MWAA console. Select Edit, add or modify configuration options and values in the Airflow configuration options menu, then select Save. For example, we can change Airflow’s default timezone (core.default_ui_timezone) to America/New_York.

Amazon MWAA’s Airflow configuration options

Once the MWAA environment is updated, which may take several minutes, view your changes by re-running the DAG, dags/get_env_vars.py. Note the new configuration item in the log snippet shown below: it appears both on its own (AIRFLOW__CORE__DEFAULT_UI_TIMEZONE) and as part of the AIRFLOW_CONFIG_SECRETS dictionary environment variable.

[2020-12-26 05:00:57,756] {{standard_task_runner.py:78}} INFO – Job 293: Subtask get_env_vars_task
[2020-12-26 05:00:58,158] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONFIG_SECRETS': '{"AIRFLOW__CORE__DEFAULT_UI_TIMEZONE":"America/New_York"}'
[2020-12-26 05:00:58,190] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONN_AWS_DEFAULT': 'aws://'
[2020-12-26 05:01:00,537] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__DAG_CONCURRENCY': '10000'
[2020-12-26 05:01:00,578] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__DEFAULT_UI_TIMEZONE': 'America/New_York'
[2020-12-26 05:01:00,630] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor'
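Rather than dumping every environment variable, a task can target just the option of interest. Below is a minimal, hypothetical sketch; the task name and function are illustrative, not part of the project:

import os

from airflow.operators.python_operator import PythonOperator


def print_ui_timezone():
    # Configuration options set in the MWAA console surface as
    # AIRFLOW__{SECTION}__{KEY} environment variables
    print(os.getenv('AIRFLOW__CORE__DEFAULT_UI_TIMEZONE', 'not set'))


print_ui_timezone_operator = PythonOperator(
    task_id='print_ui_timezone_task',
    python_callable=print_ui_timezone
)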

Using the MWAA API

We can also make configuration changes using the MWAA API. For example, to change the default Airflow UI timezone, call the MWAA API’s update-environment command using the AWS CLI. Include the --airflow-configuration-options parameter, passing the core.default_ui_timezone key/value pair as a JSON blob.

aws mwaa update-environment \
    --name <your_environment_name> \
    --airflow-configuration-options """{
        \"core.default_ui_timezone\": \"America/Los_Angeles\"
    }"""

To review an environment’s configuration, use the get-environment command in combination with jq.

aws mwaa get-environment \
    --name <your_environment_name> | \
    jq -r '.Environment.AirflowConfigurationOptions'

Below, we see an example of the output.

{
"core.default_ui_timezone": "America/Los_Angeles"
}
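The same calls can be made from Python using boto3. Here is a minimal sketch, assuming AWS credentials and a default region are already configured and using a placeholder environment name:

import boto3

mwaa = boto3.client('mwaa')

# Update the environment's Airflow configuration options
# (the environment update may take several minutes to complete)
mwaa.update_environment(
    Name='your_environment_name',  # placeholder environment name
    AirflowConfigurationOptions={'core.default_ui_timezone': 'America/Los_Angeles'}
)

# Review the environment's current Airflow configuration options
response = mwaa.get_environment(Name='your_environment_name')
print(response['Environment']['AirflowConfigurationOptions'])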

Python Packages

Airflow is written in Python, and workflows are created via Python scripts. Python packages are a crucial part of an MWAA environment’s configuration. According to the documentation, an ‘extra package’ is a Python subpackage that is not included in the Apache Airflow base install on your MWAA environment. As part of setting up an MWAA environment, you can specify the location of the requirements.txt file in the Airflow S3 bucket. Extra packages are installed using the requirements.txt file.

Amazon MWAA environment’s configuration
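Once the environment finishes updating, a simple task can confirm that an extra package from your requirements.txt installed correctly. A hypothetical sketch; substitute a package from your own requirements file:

from airflow.operators.python_operator import PythonOperator


def confirm_extra_package():
    # Replace 'great_expectations' with a package listed in your requirements.txt
    import great_expectations
    print(great_expectations.__version__)


confirm_extra_package_operator = PythonOperator(
    task_id='confirm_extra_package_task',
    python_callable=confirm_extra_package
)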

There are several ways to check your MWAA environment’s installed Python packages and versions. You could use Airflow’s BashOperator to call the command, python3 -m pip list. A sample DAG, dags/get_py_pkgs.py, is included in the project.

list_python_packages_operator = BashOperator(
    task_id='list_python_packages',
    bash_command='python3 -m pip list'
)

The DAG’s task will output a list of all Python packages and package versions to the task’s log. Below is a snippet of an example task’s log.

[2020-12-26 21:53:06,310] {{bash_operator.py:136}} INFO – Temporary script location: /tmp/airflowtmp2whgp_p8/list_python_packagesxo8slhc6
[2020-12-26 21:53:06,350] {{bash_operator.py:146}} INFO – Running command: python3 -m pip list
[2020-12-26 21:53:06,395] {{bash_operator.py:153}} INFO – Output:
[2020-12-26 21:53:06,750] {{bash_operator.py:157}} INFO – Package Version
[2020-12-26 21:53:06,786] {{bash_operator.py:157}} INFO – ———————- ———
[2020-12-26 21:53:06,815] {{bash_operator.py:157}} INFO – alembic 1.4.2
[2020-12-26 21:53:06,856] {{bash_operator.py:157}} INFO – amqp 2.6.1
[2020-12-26 21:53:06,898] {{bash_operator.py:157}} INFO – apache-airflow 1.10.12
[2020-12-26 21:53:06,929] {{bash_operator.py:157}} INFO – apispec 1.3.3
[2020-12-26 21:53:06,960] {{bash_operator.py:157}} INFO – argcomplete 1.12.0
[2020-12-26 21:53:07,002] {{bash_operator.py:157}} INFO – attrs 19.3.0
[2020-12-26 21:53:07,036] {{bash_operator.py:157}} INFO – Babel 2.8.0
[2020-12-26 21:53:07,071] {{bash_operator.py:157}} INFO – billiard 3.6.3.0
[2020-12-26 21:53:07,960] {{bash_operator.py:157}} INFO – boto3 1.16.10
[2020-12-26 21:53:07,993] {{bash_operator.py:157}} INFO – botocore 1.19.10
[2020-12-26 21:53:08,028] {{bash_operator.py:157}} INFO – cached-property 1.5.1
[2020-12-26 21:53:08,061] {{bash_operator.py:157}} INFO – cattrs 1.0.0
[2020-12-26 21:53:08,096] {{bash_operator.py:157}} INFO – celery 4.4.7
[2020-12-26 21:53:08,130] {{bash_operator.py:157}} INFO – certifi 2020.6.20
[2020-12-26 21:53:12,260] {{bash_operator.py:157}} INFO – pandas 1.1.0
[2020-12-26 21:53:12,289] {{bash_operator.py:157}} INFO – pendulum 1.4.4
[2020-12-26 21:53:12,490] {{bash_operator.py:157}} INFO – pip 9.0.3
[2020-12-26 21:53:12,522] {{bash_operator.py:157}} INFO – prison 0.1.3
[2020-12-26 21:53:12,550] {{bash_operator.py:157}} INFO – prometheus-client 0.8.0
[2020-12-26 21:53:12,580] {{bash_operator.py:157}} INFO – psutil 5.7.2
[2020-12-26 21:53:12,613] {{bash_operator.py:157}} INFO – pycparser 2.20
[2020-12-26 21:53:12,641] {{bash_operator.py:157}} INFO – pycurl 7.43.0.5
[2020-12-26 21:53:12,676] {{bash_operator.py:157}} INFO – Pygments 2.6.1
[2020-12-26 21:53:12,710] {{bash_operator.py:157}} INFO – PyGreSQL 5.2.1
[2020-12-26 21:53:12,746] {{bash_operator.py:157}} INFO – PyJWT 1.7.1

Conclusion

Understanding your Amazon MWAA environment’s airflow.cfg file, environment variables, and Python packages is important for proper Airflow platform management. In this brief post, we learned more about Amazon MWAA’s configuration: how to inspect it using DAGs and how to modify it through the Amazon MWAA console.

