Posts Tagged Data Catalog
End-to-End Data Discovery, Observability, and Governance on AWS with LinkedIn’s Open-source DataHub
Posted by Gary A. Stafford in Analytics, AWS, Azure, Bash Scripting, Build Automation, Cloud, DevOps, GCP, Kubernetes, Python, Software Development, SQL, Technology Consulting on March 26, 2022
Use DataHub’s data catalog capabilities to collect, organize, enrich, and search for metadata across multiple platforms
Introduction
According to Shirshanka Das, Founder of LinkedIn DataHub, Apache Gobblin, and Acryl Data, one of the simplest definitions for a data catalog can be found on the Oracle website: “Simply put, a data catalog is an organized inventory of data assets in the organization. It uses metadata to help organizations manage their data. It also helps data professionals collect, organize, access, and enrich metadata to support data discovery and governance.”
Another succinct description of a data catalog’s purpose comes from Alation: “a collection of metadata, combined with data management and search tools, that helps analysts and other data users to find the data that they need, serves as an inventory of available data, and provides information to evaluate the fitness of data for intended uses.”
Working with many organizations in the area of Analytics, one of the more common requests I receive concerns choosing and implementing a data catalog. Organizations have datasources hosted in corporate data centers, on AWS, by SaaS providers, and with other Cloud Service Providers. Several of these organizations have recently gravitated to DataHub, the open-source metadata platform for the modern data stack, originally developed by LinkedIn.

In this post, we will explore the capabilities of DataHub to build a centralized data catalog on AWS for datasources hosted in multiple AWS accounts, SaaS providers, cloud service providers, and corporate data centers. I will demonstrate how to build a DataHub data catalog using out-of-the-box data source plugins for automated metadata ingestion.

Data Catalog Competitors
Data catalogs are not new; technologies such as data dictionaries have been around as far back as the 1980s. Gartner publishes its Enterprise Metadata Management (EMM) Solutions Reviews and Ratings and its Magic Quadrant for Metadata Management Solutions. These reports contain a comprehensive list of traditional commercial enterprise players, modern cloud-native SaaS vendors, and Cloud Service Provider (CSP) offerings. DBMS Tools also hosts a comprehensive list of 30 data catalogs. A sampling of current data catalogs includes:
Open Source Software
Commercial
- Acryl Data (based on LinkedIn’s DataHub)
- Atlan
- Stemma (based on Lyft’s Amundsen)
- Talend
- Alation
- Collibra
- data.world
Cloud Service Providers
Data Catalog Features
DataHub describes itself as “a modern data catalog built to enable end-to-end data discovery, data observability, and data governance.” Once you sort through vendors’ marketing jargon and hype, the standard features of leading data catalogs include:
- Metadata ingestion
- Data discovery
- Data governance
- Data observability
- Data lineage
- Data dictionary
- Data classification
- Usage/popularity statistics
- Sensitive data handling
- Data fitness (aka data quality or data profiling)
- Manage both technical and business metadata
- Business glossary
- Tagging
- Natively supported datasource integrations
- Advanced metadata search
- Fine-grained authentication and authorization
- UI- and API-based interaction
Datasources
When considering a data catalog solution, in my experience, the most common datasources that customers want to discover, inventory, and search include:
- Relational databases and other OLTP datasources such as PostgreSQL, MySQL, Microsoft SQL Server, and Oracle
- Cloud Data Warehouses and other OLAP datasources such as Amazon Redshift, Snowflake, and Google BigQuery
- NoSQL datasources such as MongoDB, MongoDB Atlas, and Azure Cosmos DB
- Persistent event-streaming platforms such as Apache Kafka (Amazon MSK and Confluent)
- Distributed storage datasets (e.g., Data Lakes) such as Amazon S3, Apache Hive, and AWS Glue Data Catalogs
- Business Intelligence (BI), dashboards, and data visualization sources such as Looker, Tableau, and Microsoft Power BI
- ETL sources such as Apache Spark, Apache Airflow, Apache NiFi, and dbt
DataHub on AWS
DataHub’s convenient AWS setup guide covers options to deploy DataHub to AWS. For this post, I have hosted DataHub on Kubernetes, using Amazon Elastic Kubernetes Service (Amazon EKS). Alternatively, you could choose Google Kubernetes Engine (GKE) on Google Cloud or Azure Kubernetes Service (AKS) on Microsoft Azure.
Conveniently, DataHub offers a Helm chart, making deployment to Kubernetes straightforward. Furthermore, Helm charts are easily integrated with popular CI/CD tools. For this post, I’ve used ArgoCD, the declarative GitOps continuous delivery tool for Kubernetes, to deploy the DataHub Helm charts to Amazon EKS.

According to the documentation, DataHub consists of four main components: GMS, MAE Consumer (optional), MCE Consumer (optional), and Frontend. The Kubernetes deployment for each component is defined as a sub-chart under the main DataHub Helm chart.
External Storage Layer Dependencies
Four external storage layer dependencies power the main DataHub components: Kafka, Local DB (MySQL, Postgres, or MariaDB), Search Index (Elasticsearch), and Graph Index (Neo4j or Elasticsearch). DataHub has provided a separate DataHub Prerequisites Helm chart for the dependencies. The dependencies must be deployed before deploying DataHub.
Alternatively, you can substitute AWS managed services for the external storage layer dependencies, which is also detailed in the Deploying to AWS documentation. AWS managed service substitutions include Amazon RDS for MySQL, Amazon OpenSearch Service (fka Amazon Elasticsearch Service), and Amazon Managed Streaming for Apache Kafka (Amazon MSK). According to DataHub, support for using Amazon Neptune as the Graph Index is coming soon.
DataHub CLI and Plug-ins
DataHub comes with the datahub CLI, allowing you to perform many common operations on the command line. You can install and use the DataHub CLI within your development environment or integrate it with your CI/CD tooling.

DataHub uses a plugin architecture. Plugins allow you to install only the datasource dependencies you need. For example, if you want to ingest metadata from Amazon Athena, just install the Athena plugin: pip install 'acryl-datahub[athena]'. DataHub Source, Sink, and Transformer plugins can be displayed using the datahub check plugins CLI command.
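Because each plugin is a pip extra of the acryl-datahub package, several plugins can be installed in one step. The sketch below assembles that requirement string. The plugin names in the example are assumptions chosen for an AWS-centric catalog, so confirm the exact extras available in your version with datahub check plugins or the DataHub documentation.

```python
import shlex


def plugin_requirement(plugins: list[str], package: str = "acryl-datahub") -> str:
    """Build a pip requirement such as acryl-datahub[athena,postgres]."""
    # De-duplicate and sort so the requirement string is stable across runs.
    extras = sorted({p.strip() for p in plugins if p.strip()})
    if not extras:
        return package
    return f"{package}[{','.join(extras)}]"


def pip_install_command(plugins: list[str]) -> str:
    # Quote the requirement so the square brackets survive the shell.
    return "pip install " + shlex.quote(plugin_requirement(plugins))


if __name__ == "__main__":
    # Hypothetical plugin selection; verify extra names against your DataHub version.
    print(pip_install_command(["athena", "postgres", "redshift", "datahub-rest"]))
```

Quoting the requirement matters in shells such as zsh, where unquoted square brackets are treated as a glob pattern.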


Secure Metadata Ingestion
Often, datasources are not externally accessible for security reasons. Further, many datasources may not be accessible to individual users, especially in higher environments like UAT, Staging, and Production. They are only accessible to applications or CI/CD tooling. To overcome these limitations when extracting metadata with DataHub, I prefer to perform my DataHub-related development and testing locally but execute all DataHub ingestion securely on AWS.
In my local development environment, I use JetBrains PyCharm to author the Python and YAML-based DataHub configuration files and ingestion pipeline recipes, then commit those files to git and push them to a private GitHub repository. Finally, I use GitHub Actions to test DataHub files.
To run DataHub ingestion jobs and push the results to DataHub running in Kubernetes on Amazon EKS, I have built a custom Python-based Docker container. The container runs the DataHub CLI, the required DataHub plugins, and any additional Python dependencies. The container’s pod has the appropriate AWS IAM permissions, granted through IAM Roles for Service Accounts (IRSA), to securely access both the datasources being ingested and the DataHub application.
Schedule and Monitor Pipelines
Scheduling and managing multiple metadata ingestion jobs on AWS is best handled with Apache Airflow, using Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Ingestion jobs run as Airflow DAG tasks, which call the EKS-based DataHub CLI container. With MWAA, datasource connections, credentials, and other sensitive configurations can be kept secure rather than exposed externally or in plain text.
When running the ingestion pipelines on AWS with DataHub, all communications between AWS-based datasources, ingestion jobs running in Airflow, and DataHub should use secure private IP addressing and DNS resolution instead of transferring metadata over the Internet. Make sure to create all the necessary VPC peering connections, network route table configurations, and VPC endpoints to connect all relevant services.
SaaS services such as Snowflake or MongoDB Atlas, services provided by other Cloud Service Providers such as Google Cloud and Microsoft Azure, and datasources in corporate data centers require alternate networking and security strategies to access metadata securely.

Markup or Code?
According to the documentation, a DataHub recipe is a configuration file that tells ingestion scripts where to pull data from (source) and where to put it (sink). Recipes normally contain source, sink, and transformers configuration sections. Markup-language-based job automation written in YAML, JSON, or Domain Specific Languages (DSLs) is often an alternative to writing code. DataHub recipes can be written in YAML. The example recipe shown below is used to ingest metadata from an Amazon RDS for PostgreSQL database, running on AWS.
YAML-based recipes can also use automatic environment variable expansion for convenience, automation, and security. It is considered best practice to store sensitive configuration values, such as database credentials, in a secure location and reference them as environment variables. For example, note the server: ${DATAHUB_REST_ENDPOINT} entry in the sink section below. The DATAHUB_REST_ENDPOINT environment variable is set ahead of time and reused for all ingestion jobs. Sensitive database connection information has also been variablized and stored separately.
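To make the expansion concrete, here is a minimal standard-library sketch of how ${VAR} references in a YAML recipe could be resolved from the environment before the recipe is parsed. It uses Python's string.Template as a stand-in and is not DataHub's own implementation. The recipe text and the POSTGRES_* variable names are hypothetical; the host_port, database, username, and password keys are assumed from DataHub's postgres source documentation.

```python
import os
from string import Template

# Hypothetical recipe text; only DATAHUB_REST_ENDPOINT is named in the post.
# The postgres config keys are assumed from DataHub's postgres source docs.
RECIPE_TEXT = """\
source:
  type: postgres
  config:
    host_port: ${POSTGRES_HOST_PORT}
    database: ${POSTGRES_DATABASE}
    username: ${POSTGRES_USERNAME}
    password: ${POSTGRES_PASSWORD}
sink:
  type: datahub-rest
  config:
    server: ${DATAHUB_REST_ENDPOINT}
"""


def expand_recipe(text: str, env=None) -> str:
    """Replace ${VAR} placeholders with values from env (default: os.environ).

    Template.substitute raises KeyError for the first missing variable, so a
    job fails fast instead of connecting with an empty credential.
    """
    env = os.environ if env is None else env
    return Template(text).substitute(env)
```

Failing on a missing variable, rather than silently substituting an empty string, is the design choice that matters most for credentials.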
Using Python
You can configure and run a pipeline entirely from within a custom Python script using DataHub’s Python API as an alternative to YAML. Below, we see two ingestion recipes, written in Python, that are nearly identical to the YAML above. Writing ingestion pipeline logic programmatically gives you increased flexibility for automation, error checking, unit testing, and notification. Below is a basic pipeline written in Python. The code is functional, but not very Pythonic, secure, scalable, or Production ready.
The second version of the same pipeline is more Production ready. The code is more Pythonic in nature and makes use of error checking, logging, and the AWS Systems Manager (SSM) Parameter Store. Like recipes written in YAML, environment variables can be used for convenience and security. In this example, commonly reused and sensitive connection configuration items have been extracted and placed in the SSM Parameter Store. Additional configuration is pulled from the environment, such as AWS Account ID and AWS Region. The script loads these values at runtime.
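As a hedged sketch of that second approach, the function below assembles a recipe dictionary from a parameter lookup plus environment variables, with validation and logging. The parameter names under /datahub/ and the AWS_ACCOUNT_ID variable are hypothetical. The lookup is injected so it can be backed by boto3's SSM get_parameter call in production or by a plain dict in tests. The resulting dictionary is the kind of object DataHub's Pipeline.create accepts (from datahub.ingestion.run.pipeline import Pipeline, then Pipeline.create(recipe), run(), and raise_from_status()). That call is left out so the sketch runs without DataHub installed.

```python
import logging
import os
from typing import Callable, Mapping

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ingest_postgres")

# Hypothetical SSM Parameter Store names; adjust to your own naming scheme.
PARAM_NAMES = {
    "host_port": "/datahub/postgres/host_port",
    "database": "/datahub/postgres/database",
    "username": "/datahub/postgres/username",
    "password": "/datahub/postgres/password",
    "server": "/datahub/rest_endpoint",
}


def build_recipe(get_param: Callable[[str], str],
                 env: Mapping[str, str] = os.environ) -> dict:
    """Return a postgres -> datahub-rest recipe as a plain dict."""
    values = {}
    for key, name in PARAM_NAMES.items():
        value = get_param(name)
        if not value:
            raise ValueError(f"Parameter {name} is missing or empty")
        values[key] = value
    # Account and Region come from the environment (AWS_ACCOUNT_ID is hypothetical).
    account = env.get("AWS_ACCOUNT_ID", "unknown")
    region = env.get("AWS_REGION", "unknown")
    logger.info("Building recipe for database %s (account %s, %s)",
                values["database"], account, region)
    return {
        "source": {
            "type": "postgres",
            "config": {k: values[k] for k in ("host_port", "database", "username", "password")},
        },
        "sink": {"type": "datahub-rest", "config": {"server": values["server"]}},
    }


def ssm_getter(region: str) -> Callable[[str], str]:
    """Production lookup backed by SSM Parameter Store (requires boto3)."""
    import boto3  # imported lazily so tests do not need boto3

    client = boto3.client("ssm", region_name=region)

    def get(name: str) -> str:
        response = client.get_parameter(Name=name, WithDecryption=True)
        return response["Parameter"]["Value"]

    return get
```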
Sinking to DataHub
When syncing metadata to DataHub, you have two choices: the GMS REST API or Kafka. According to DataHub, the advantage of the REST-based interface is that any errors can be reported immediately. On the other hand, the advantage of the Kafka-based interface is that it is asynchronous and can handle higher throughput. For this post, I am using DataHub’s REST API.
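In a recipe, this choice appears only in the sink section. The sketch below returns either sink configuration. The hostnames and ports are placeholders. The datahub-kafka keys (connection.bootstrap and connection.schema_registry_url) reflect DataHub's Kafka sink options as I understand them, so verify them against your DataHub version.

```python
def sink_config(kind: str, *, rest_server: str = "http://datahub-gms:8080",
                bootstrap: str = "broker:9092",
                schema_registry_url: str = "http://schema-registry:8081") -> dict:
    """Return the sink section of a DataHub recipe.

    'rest'  -> synchronous writes to GMS; errors surface immediately.
    'kafka' -> asynchronous writes through Kafka; higher throughput.
    Hostnames and ports are placeholders.
    """
    if kind == "rest":
        return {"type": "datahub-rest", "config": {"server": rest_server}}
    if kind == "kafka":
        return {
            "type": "datahub-kafka",
            "config": {
                "connection": {
                    "bootstrap": bootstrap,
                    "schema_registry_url": schema_registry_url,
                }
            },
        }
    raise ValueError(f"Unknown sink kind: {kind!r}")
```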


Column-level Metadata
In addition to column names and data types, it is possible to extract column descriptions and key types from certain datasources. Column descriptions, tags, and glossary terms can also be input through the DataHub UI. Below, we see an example of an Amazon Redshift fact table, whose table and column descriptions were ingested as part of the metadata.

Business Glossary
DataHub can assign business glossary terms to entities. The DataHub Business Glossary plugin pulls business glossary metadata from a YAML-based configuration file.
Business glossary terms can be reviewed in the Glossary Terms tab of the DataHub UI. Below, we see the three terms associated with the Classification glossary node: Confidential, HighlyConfidential, and Sensitive.
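For reference, here is a sketch of the kind of structure the business glossary file holds, written as the equivalent Python data and printed as JSON for inspection. The top-level field names (version, source, owners, nodes, terms) follow the example glossary in DataHub's documentation as best I recall, so treat them as an assumption. The owner and descriptions are hypothetical. The helper lists each node's terms, which should correspond to what the Glossary Terms tab shows.

```python
import json

# Hypothetical glossary mirroring the Classification node described above.
# Field names are assumed from DataHub's example glossary file.
GLOSSARY = {
    "version": 1,
    "source": "DataHub",
    "owners": {"users": ["datahub"]},
    "nodes": [
        {
            "name": "Classification",
            "description": "Terms describing the sensitivity of data",
            "terms": [
                {"name": "Sensitive", "description": "Sensitive data"},
                {"name": "Confidential", "description": "Confidential data"},
                {"name": "HighlyConfidential", "description": "Highly confidential data"},
            ],
        }
    ],
}


def terms_by_node(glossary: dict) -> dict[str, list[str]]:
    """Map each glossary node name to its sorted term names."""
    result = {}
    for node in glossary.get("nodes", []):
        names = [term["name"] for term in node.get("terms", [])]
        # Duplicate names within one node would be ambiguous in search.
        if len(names) != len(set(names)):
            raise ValueError(f"Duplicate term in node {node['name']}")
        result[node["name"]] = sorted(names)
    return result


if __name__ == "__main__":
    print(json.dumps(GLOSSARY, indent=2))
    print(terms_by_node(GLOSSARY))
```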

We can search for entities inventoried in DataHub using their assigned business glossary terms.

Finally, we see an example of an Amazon Athena data catalog table with business glossary terms applied to columns within the table’s schema.

SQL-based Profiler
DataHub can also extract statistics about datasets using its SQL-based Profiler. According to the DataHub documentation, the Profiler can extract the following:
- Row and column counts for each table
- Column null counts and proportions
- Column distinct counts and proportions
- Column min, max, mean, median, standard deviation, quantile values
- Column histograms or frequencies of unique values
In addition, we can also track the historical stats for each profiled entity each time metadata is ingested.
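To show what those statistics mean for a single column, here is a small standard-library sketch that computes them over an in-memory list, with None standing in for SQL NULL. DataHub's profiler computes these with SQL against the source, so this only illustrates the definitions. Computing the distinct proportion over non-null values is an assumption of this sketch, and the sample values are made up.

```python
import statistics
from collections import Counter


def profile_column(values: list) -> dict:
    """Compute profiler-style statistics for one column (None = NULL)."""
    row_count = len(values)
    non_null = [v for v in values if v is not None]
    null_count = row_count - len(non_null)
    distinct = set(non_null)
    stats = {
        "row_count": row_count,
        "null_count": null_count,
        "null_proportion": null_count / row_count if row_count else 0.0,
        "distinct_count": len(distinct),
        # Assumption: proportion relative to non-null values.
        "distinct_proportion": len(distinct) / len(non_null) if non_null else 0.0,
        "value_frequencies": dict(Counter(non_null)),
    }
    if non_null:
        many = len(non_null) > 1
        stats.update(
            min=min(non_null),
            max=max(non_null),
            mean=statistics.mean(non_null),
            median=statistics.median(non_null),
            # Sample standard deviation needs at least two values.
            stdev=statistics.stdev(non_null) if many else 0.0,
            # Quartile cut points (25th, 50th, 75th percentiles).
            quantiles=statistics.quantiles(non_null, n=4) if many else [],
        )
    return stats


if __name__ == "__main__":
    print(profile_column([1, 2, 2, 3, None]))  # made-up sample column
```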


Data Lineage
DataHub’s data lineage features allow us to view upstream and downstream relationships between different types of entities. DataHub can trace lineage across multiple platforms, datasets, pipelines, charts, and dashboards.
Below, we see a simple example of dataset entity-to-entity lineage in Amazon Redshift and then Apache Spark on Amazon EMR. The fact table has a downstream relationship to four database views. The views are based on SQL queries that include the upstream table as a datasource.
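Conceptually, lineage is a directed graph of entities. The sketch below stores upstream-to-downstream edges and walks them breadth-first, which is the kind of traversal behind upstream and downstream views. The entity names are hypothetical stand-ins for a Spark output feeding a Redshift fact table and its four views. This illustrates the idea and is not DataHub's implementation.

```python
from collections import defaultdict, deque


class LineageGraph:
    """Directed graph of entity lineage (edges point downstream)."""

    def __init__(self) -> None:
        self._down = defaultdict(set)
        self._up = defaultdict(set)

    def add_edge(self, upstream: str, downstream: str) -> None:
        self._down[upstream].add(downstream)
        self._up[downstream].add(upstream)

    def _walk(self, start: str, edges) -> set[str]:
        # Breadth-first search; the seen set guards against cycles.
        seen, queue = set(), deque([start])
        while queue:
            node = queue.popleft()
            for nxt in edges.get(node, ()):
                if nxt not in seen:
                    seen.add(nxt)
                    queue.append(nxt)
        return seen

    def downstream(self, entity: str) -> set[str]:
        return self._walk(entity, self._down)

    def upstream(self, entity: str) -> set[str]:
        return self._walk(entity, self._up)


def build_example() -> LineageGraph:
    """Hypothetical lineage: a Spark job feeds a fact table read by four views."""
    graph = LineageGraph()
    graph.add_edge("emr.raw_readings", "redshift.fact_usage")
    for view in ("v_daily", "v_hourly", "v_by_device", "v_by_location"):
        graph.add_edge("redshift.fact_usage", f"redshift.{view}")
    return graph
```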


DataHub Analytics
DataHub provides basic metadata quality and usage analytics in the DataHub UI: user activity, counts of datasource types, business glossary terms, environments, and actions.


Conclusion
In this post, we explored the features of a data catalog and learned about some of the leading commercial and open-source data catalogs. Next, we learned how DataHub could collect, organize, enrich, and search metadata across multiple datasources. Lastly, we discovered how easy it is to catalog metadata from datasources spread across multiple CSPs, SaaS providers, and corporate data centers, and centralize those results in DataHub.
In addition to the basic features reviewed in this post, DataHub offers a growing number of additional capabilities, including GraphQL and Timeline APIs, robust authentication and authorization, application monitoring and observability, and Great Expectations integration. All these qualities make DataHub an excellent choice for a data catalog.
This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.
Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight: Part 2
Posted by Gary A. Stafford in AWS, Big Data, Cloud, Software Development, SQL on January 14, 2020
Introduction
In part one, we learned how to ingest, transform, and enrich raw, semi-structured data, in multiple formats, using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We built an S3-based data lake and learned how AWS leverages open-source technologies, including Presto, Apache Hive, and Apache Parquet. In part two of this post, we will use the transformed and enriched data sources, stored in the data lake, to create compelling visualizations using Amazon QuickSight.
High-level AWS architecture diagram of the demonstration.
Background
If you recall the demonstration from part one of the post, we had adopted the persona of a large, US-based electric energy provider. The energy provider had developed and sold its next-generation Smart Electrical Monitoring Hub (Smart Hub) to residential customers. Customers can analyze their electrical usage with a fine level of granularity, per device and over time. The goal of the Smart Hub is to enable the customers, using data, to reduce their electrical costs. The provider benefits from a reduction in load on the existing electrical grid and a better distribution of daily electrical load as customers shift usage to off-peak times to save money.
Data Visualization and BI
The data analysis process in the demonstration was divided into four logical stages: 1) Raw Data Ingestion, 2) Data Transformation, 3) Data Enrichment, and 4) Data Visualization and Business Intelligence (BI).
Full data analysis workflow diagram.
In the final, Data Visualization and Business Intelligence (BI) stage, the enriched data is presented and analyzed. There are many enterprise-grade services available for data visualization and business intelligence, which integrate with Amazon Athena. Amazon services include Amazon QuickSight, Amazon EMR, and Amazon SageMaker. Third-party solutions from AWS Partners, many available on the AWS Marketplace, include Tableau, Looker, Sisense, and Domo.
In this demonstration, we will focus on Amazon QuickSight. Amazon QuickSight is a fully managed business intelligence (BI) service. QuickSight lets you create and publish interactive dashboards that include ML Insights. Dashboards can be accessed from any device, and embedded into your applications, portals, and websites. QuickSight scales automatically and serverlessly from tens of users to tens of thousands, without any infrastructure management.
Using QuickSight
QuickSight APIs
Amazon recently added a full set of aws quicksight APIs for interacting with QuickSight. For example, to preview the three QuickSight data sets created for this part of the demo with the AWS CLI, we would use the list-data-sets command.
aws quicksight list-data-sets --aws-account-id 123456789012
{
  "Status": 200,
  "DataSetSummaries": [
    {
      "Arn": "arn:aws:quicksight:us-east-1:123456789012:dataset/9eb88a69-20de-d8be-aefd-2c7ac4e23748",
      "DataSetId": "9eb88a69-20de-d8be-aefd-2c7ac4e23748",
      "Name": "etl_output_parquet",
      "CreatedTime": 1578028774.897,
      "LastUpdatedTime": 1578955245.02,
      "ImportMode": "SPICE"
    },
    {
      "Arn": "arn:aws:quicksight:us-east-1:123456789012:dataset/78e81193-189c-6dd0-864fb-a33244c9654",
      "DataSetId": "78e81193-189c-6dd0-864fb-a33244c9654",
      "Name": "electricity_rates_parquet",
      "CreatedTime": 1578029224.996,
      "LastUpdatedTime": 1578945179.472,
      "ImportMode": "SPICE"
    },
    {
      "Arn": "arn:aws:quicksight:us-east-1:123456789012:dataset/a474214d-c838-b384-bcca-ea1fcd2dd094",
      "DataSetId": "a474214d-c838-b384-bcca-ea1fcd2dd094",
      "Name": "smart_hub_locations_parquet",
      "CreatedTime": 1578029124.565,
      "LastUpdatedTime": 1578888788.135,
      "ImportMode": "SPICE"
    }
  ],
  "RequestId": "2524e80c-7c67-7fbd-c3f1-b700c521badc"
}
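The same response can also be consumed programmatically. As a sketch, the function below parses the JSON returned by list-data-sets and converts the epoch-second timestamps to UTC datetimes. Only a trimmed copy of the first data set summary is embedded. In a Python script, you would more likely call boto3's quicksight client (list_data_sets(AwsAccountId=...)), which returns the same DataSetSummaries structure but already parses these timestamps into datetime objects, so the conversion here applies to the CLI's JSON output.

```python
import json
from datetime import datetime, timezone

# Trimmed copy of the CLI output (first data set only).
SAMPLE = """
{
  "Status": 200,
  "DataSetSummaries": [
    {
      "Arn": "arn:aws:quicksight:us-east-1:123456789012:dataset/9eb88a69-20de-d8be-aefd-2c7ac4e23748",
      "DataSetId": "9eb88a69-20de-d8be-aefd-2c7ac4e23748",
      "Name": "etl_output_parquet",
      "CreatedTime": 1578028774.897,
      "LastUpdatedTime": 1578955245.02,
      "ImportMode": "SPICE"
    }
  ]
}
"""


def summarize(response: dict) -> list[tuple[str, str, datetime]]:
    """Return (name, import mode, last updated in UTC) for each data set."""
    rows = []
    for summary in response.get("DataSetSummaries", []):
        updated = datetime.fromtimestamp(summary["LastUpdatedTime"], tz=timezone.utc)
        rows.append((summary["Name"], summary["ImportMode"], updated))
    return rows


if __name__ == "__main__":
    for name, mode, updated in summarize(json.loads(SAMPLE)):
        print(f"{name:<30} {mode:<6} {updated:%Y-%m-%d %H:%M} UTC")
```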
To examine the details of a single data set with the AWS CLI, we would use the describe-data-set command.
aws quicksight describe-data-set \
  --aws-account-id 123456789012 \
  --data-set-id 9eb88a69-20de-d8be-aefd-2c7ac4e23748
QuickSight Console
However, for this final part of the demonstration, we will be working from the Amazon QuickSight Console, as opposed to the AWS CLI, AWS CDK, or CloudFormation templates.
Signing Up for QuickSight
To use Amazon QuickSight, you must sign up for QuickSight.
There are two editions of Amazon QuickSight: Standard and Enterprise. For this demonstration, the Standard Edition will suffice.
QuickSight Data Sets
Amazon QuickSight uses Data Sets as the basis for all data visualizations. According to AWS, QuickSight data sets can be created from a wide variety of data sources, including Amazon RDS, Amazon Aurora, Amazon Redshift, Amazon Athena, and Amazon S3. You can also upload Excel spreadsheets or flat files (CSV, TSV, CLF, ELF, and JSON), connect to on-premises databases like SQL Server, MySQL, and PostgreSQL and import data from SaaS applications like Salesforce. Below, we see a list of the latest data sources available in the QuickSight New Data Set Console.
Demonstration Data Sets
For the demonstration, I have created three QuickSight data sets, all based on Amazon Athena. You have two options when using Amazon Athena as a data source. The first option is to select a table from an AWS Glue Data Catalog database, such as the database we created in part one of the post, ‘smart_hub_data_catalog.’ The second option is to create a custom SQL query, based on one or more tables in an AWS Glue Data Catalog database.
Of the three data sets created for part two of this demonstration, two data sets use tables directly from the Data Catalog, including ‘etl_output_parquet’ and ‘electricity_rates_parquet.’ The third data set uses a custom SQL query, based on the single Data Catalog table, ‘smart_hub_locations_parquet.’ All three tables used to create the data sets represent the enriched, highly efficient Parquet-format data sources in the S3-based Data Lake.
Data Set Features
There are a large number of features available when creating and configuring data sets. We cannot possibly cover all of them in this post. Let’s look at three features: geospatial field types, calculated fields, and custom SQL.
Geospatial Data Types
QuickSight can intelligently detect common types of geographic fields in a data source and assign a QuickSight geographic data type, including Country, County, City, Postcode, and State. QuickSight can also detect geospatial data, including Latitude and Longitude. We will take advantage of this QuickSight feature for our three data sets’ data sources, including the State, Postcode, Latitude, and Longitude field types.
Calculated Fields
A commonly used QuickSight data set feature is the ‘Calculated field.’ For the ‘etl_output_parquet’ data set, I have created a new field (column), cost_dollar.
The cost field is the electrical cost of the device, over a five-minute time interval, in cents (¢). The calculated cost_dollar field is the quotient of the cost field divided by 100. This value represents the electrical cost of the device, over a five-minute time interval, in dollars ($). This is a straightforward example. However, a calculated field can be very complex, built from multiple arithmetic, comparison, and conditional functions, string calculations, and data set fields.
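In QuickSight, this calculated field is an expression along the lines of {cost} / 100. The equivalent arithmetic is sketched below in Python, using Decimal to avoid binary floating-point rounding on currency values. The sample interval costs are made up for illustration.

```python
from decimal import Decimal

CENTS_PER_DOLLAR = Decimal(100)


def cost_dollar(cost_cents: Decimal) -> Decimal:
    """Mirror of the cost_dollar calculated field: cost (cents) / 100 = cost (dollars)."""
    return cost_cents / CENTS_PER_DOLLAR


# Hypothetical five-minute interval costs, in cents, for one device.
readings = [Decimal("1.25"), Decimal("0.80"), Decimal("2.10")]
total = sum(cost_dollar(c) for c in readings)

if __name__ == "__main__":
    print(total)
```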
Data set calculated fields can also be created and edited from the QuickSight Analysis Console (discussed later).
Custom SQL
The third QuickSight data set is based on an Amazon Athena custom SQL query.
SELECT lon, lat, postcode, hash, tz, state
FROM smart_hub_data_catalog.smart_hub_locations_parquet;
Although you can write queries in the QuickSight Data Prep Console, I prefer to write custom Athena queries using the Athena Query Editor. Using the Editor, you can write, run, debug, and optimize queries to ensure they function correctly first.
The Athena query can then be pasted into the Custom SQL window. Clicking ‘Finish’ in the window is the equivalent of ‘Run query’ in the Athena Query Editor Console. The query runs and returns data.
Similar to the Athena Query Editor, queries executed in the QuickSight Data Prep Console will show up in the Athena History tab, with a /* QuickSight */ comment prefix.
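Because of that comment prefix, QuickSight-issued queries can be picked out of the Athena query history programmatically. The sketch below filters query-execution records shaped like those returned by boto3's Athena batch_get_query_execution (each with QueryExecutionId and Query keys). The records are hypothetical, and the marker text is assumed from the console display described above; adjust the constant if your history shows a different comment.

```python
# Assumed marker, based on the /* QuickSight */ prefix shown in the History tab.
QUICKSIGHT_MARKER = "/* QuickSight"


def quicksight_queries(executions: list[dict]) -> list[str]:
    """Return the IDs of executions whose SQL starts with the QuickSight comment."""
    return [
        e["QueryExecutionId"]
        for e in executions
        if e.get("Query", "").lstrip().startswith(QUICKSIGHT_MARKER)
    ]


# Hypothetical records; in practice, collect IDs with list_query_executions()
# and fetch details with batch_get_query_execution(QueryExecutionIds=[...]).
SAMPLE_EXECUTIONS = [
    {"QueryExecutionId": "q-1",
     "Query": "/* QuickSight */ SELECT lon, lat, postcode, hash, tz, state "
              "FROM smart_hub_data_catalog.smart_hub_locations_parquet"},
    {"QueryExecutionId": "q-2",
     "Query": "SELECT count(*) FROM smart_hub_data_catalog.etl_output_parquet"},
]
```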
SPICE
You will notice the three QuickSight data sets are labeled, ‘SPICE.’ According to AWS, the acronym, SPICE, stands for ‘Super-fast, Parallel, In-memory, Calculation Engine.’ QuickSight’s in-memory calculation engine, SPICE, achieves blazing fast performance at scale. SPICE automatically replicates data for high availability, allowing thousands of users to simultaneously perform fast, interactive analysis while shielding your underlying data infrastructure, saving you time and resources. With the Standard Edition of QuickSight, as the first Author, you get 1 GB of SPICE in-memory data for free.
QuickSight Analysis
The QuickSight Analysis Console is where Analyses are created. A specific QuickSight Analysis will contain a collection of data sets and data visualizations (visuals). Each visual is associated with a single data set.
Types of QuickSight Analysis visuals include horizontal and vertical, single and stacked bar charts, line graphs, combination charts, area line charts, scatter plots, heat maps, pie and donut charts, tree maps, pivot tables, gauges, key performance indicators (KPI), geospatial diagrams, and word clouds. Individual visual titles, legends, axes, and other visual aspects can be easily modified. Visuals can contain drill-downs.
A data set’s fields can be modified from within the Analysis Console. Field types and formats, such as date, numeric, and currency fields, can be customized for display. The Analysis can include a title and subtitle. There are some customizable themes available to change the overall look of the Analysis.
Analysis Filters
Data displayed in the visuals can be further shaped using a combination of Filters, Conditional formatting, and Parameters. Below, we see an example of a typical filter based on a range of dates and times. The data set contains two full days’ worth of data. Here, we are filtering the data to a 14-hour peak electrical usage period, between 8 AM and 10 PM on the same day, 12/21/2019.
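The filter's logic amounts to a time window over the data set's timestamps. Here is a minimal sketch with made-up five-minute readings for two hypothetical days (December 21 and 22, 2019), keeping rows from 8 AM up to, but not including, 10 PM on 12/21/2019, a 14-hour span. The half-open window is a choice of this sketch; it does not assert how QuickSight treats the end bound.

```python
from datetime import datetime, timedelta

START = datetime(2019, 12, 21, 8, 0)   # 8 AM
END = datetime(2019, 12, 21, 22, 0)    # 10 PM


def in_peak_window(ts: datetime, start: datetime = START, end: datetime = END) -> bool:
    """True when ts falls in the half-open window [start, end)."""
    return start <= ts < end


# Hypothetical five-minute readings covering two full days (2 * 24 * 12 = 576).
readings = [datetime(2019, 12, 21) + timedelta(minutes=5 * i) for i in range(2 * 24 * 12)]
peak = [ts for ts in readings if in_peak_window(ts)]

if __name__ == "__main__":
    print(len(peak), peak[0], peak[-1])
```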
Drill-Down, Drill-Up, Focus, and Exclude
According to AWS, all visual types except pivot tables offer the ability to create a hierarchy of fields for a visual element. The hierarchy lets you drill down or up to see data at different levels of the hierarchy. Focus allows you to concentrate on a single element within a hierarchy of fields. Exclude allows you to remove an element from a hierarchy of fields. Below, we see an example of all four of these features, available to apply to the ‘Central Air Conditioner’. Since the AC unit is the largest consumer of electricity on average per day, applying these filters to understand its impact on the overall electrical usage may be useful to an analysis. We can also drill down to minutes from hours or up to days from hours.
Example QuickSight Analysis
A QuickSight Analysis is shared by the Analysis Author as a QuickSight Dashboard. Below, we see an example of a QuickSight Dashboard, built and shared for this demonstration. The ‘Residential Electrical Usage Analysis’ is built from the three data sets created earlier. From those data sets, we have constructed several visuals, including a geospatial diagram, donut chart, heat map, KPI, combination chart, stacked vertical bar chart, and line graph. Each visual’s title, layout, and field display have all been customized. The data displayed in the visuals has been filtered differently, including by date and time, by customer id (loc_id), and by state. Conditional formatting is used to enhance the appearance of visuals such as the ‘Total Electrical Cost’ KPI.
Conclusion
In part one, we learned how to ingest, transform, and enrich raw, semi-structured data, in multiple formats, using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We built an S3-based data lake and learned how AWS leverages open-source technologies, including Presto, Apache Hive, and Apache Parquet. In part two of this post, we used the transformed and enriched datasets, stored in the data lake, to create compelling visualizations using Amazon QuickSight.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight: Part 1
Posted by Gary A. Stafford in AWS, Bash Scripting, Big Data, Cloud, Python, Serverless, Software Development on January 5, 2020
Introduction
According to Wikipedia, data analysis is “a process of inspecting, cleansing, transforming, and modeling data with the goal of discovering useful information, informing conclusion, and supporting decision-making.” In this two-part post, we will explore how to get started with data analysis on AWS, using the serverless capabilities of Amazon Athena, AWS Glue, Amazon QuickSight, Amazon S3, and AWS Lambda. We will learn how to use these complementary services to transform, enrich, analyze, and visualize semi-structured data.
Data Analysis—discovering useful information, informing conclusion, and supporting decision-making. –Wikipedia
In part one, we will begin with raw, semi-structured data in multiple formats. We will discover how to ingest, transform, and enrich that data using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We will build an S3-based data lake, and learn how AWS leverages open-source technologies, such as Presto, Apache Hive, and Apache Parquet. In part two, we will learn how to further analyze and visualize the data using Amazon QuickSight. Here’s a quick preview of what we will build in part one of the post.
Demonstration
In this demonstration, we will adopt the persona of a large, US-based electric energy provider. The energy provider has developed its next-generation Smart Electrical Monitoring Hub (Smart Hub). They have sold the Smart Hub to a large number of residential customers throughout the United States. The hypothetical Smart Hub wirelessly collects detailed electrical usage data from individual, smart electrical receptacles and electrical circuit meters, spread throughout the residence. Electrical usage data is encrypted and securely transmitted from the customer’s Smart Hub to the electric provider, who is running their business on AWS.
Customers are able to analyze their electrical usage with fine granularity, per device, and over time. The goal of the Smart Hub is to enable the customers, using data, to reduce their electrical costs. The provider benefits from a reduction in load on the existing electrical grid and a better distribution of daily electrical load as customers shift usage to off-peak times to save money.
Preview of post’s data in Amazon QuickSight.
The original concept for the Smart Hub was developed as part of a multi-day training and hackathon I recently attended with an AWSome group of AWS Solutions Architects in San Francisco. As a team, we developed the concept of the Smart Hub integrated with a real-time, serverless, streaming data architecture, leveraging AWS IoT Core, Amazon Kinesis, AWS Lambda, and Amazon DynamoDB.
From left: Bruno Giorgini, Mahalingam (‘Mahali’) Sivaprakasam, Gary Stafford, Amit Kumar Agrawal, and Manish Agarwal.
This post will focus on data analysis, as opposed to the real-time streaming aspect of data capture or how the data is persisted on AWS.
High-level AWS architecture diagram of the demonstration.
Featured Technologies
The following AWS services and open-source technologies are featured prominently in this post.
Amazon S3-based Data Lake
An Amazon S3-based Data Lake uses Amazon S3 as its primary storage platform. Amazon S3 provides an optimal foundation for a data lake because of its virtually unlimited scalability, from gigabytes to petabytes of content. Amazon S3 provides ‘11 nines’ (99.999999999%) durability. It has scalable performance, ease-of-use features, and native encryption and access control capabilities.
AWS Glue
AWS Glue is a fully managed extract, transform, and load (ETL) service to prepare and load data for analytics. AWS Glue discovers your data and stores the associated metadata (e.g., table definition and schema) in the AWS Glue Data Catalog. Once cataloged, your data is immediately searchable, queryable, and available for ETL.
AWS Glue Data Catalog
The AWS Glue Data Catalog is an Apache Hive Metastore-compatible, central repository to store structural and operational metadata for data assets. For a given dataset, you can store its table definition and physical location, add business-relevant attributes, and track how the data has changed over time.
AWS Glue Crawler
An AWS Glue Crawler connects to a data store, progresses through a prioritized list of classifiers to extract the schema of your data and other statistics, and then populates the Glue Data Catalog with this metadata. Crawlers can run periodically to detect the availability of new data as well as changes to existing data, including table definition changes. Crawlers automatically add new tables, new partitions to an existing table, and new versions of table definitions. You can even customize Glue Crawlers to classify your own file types.
AWS Glue ETL Job
An AWS Glue ETL Job is the business logic that performs extract, transform, and load (ETL) work in AWS Glue. When you start a job, AWS Glue runs a script that extracts data from sources, transforms the data, and loads it into targets. AWS Glue generates a PySpark or Scala script, which runs on Apache Spark.
Amazon Athena
Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena supports and works with a variety of standard data formats, including CSV, JSON, Apache ORC, Apache Avro, and Apache Parquet. Athena is integrated, out-of-the-box, with AWS Glue Data Catalog. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.
The underlying technology behind Amazon Athena is Presto, the open-source distributed SQL query engine for big data, created by Facebook. According to AWS, the Athena query engine is based on Presto 0.172 (released April 9, 2017). In addition to Presto, Athena uses Apache Hive to define tables.
Amazon QuickSight
Amazon QuickSight is a fully managed business intelligence (BI) service. QuickSight lets you create and publish interactive dashboards that can then be accessed from any device, and embedded into your applications, portals, and websites.
AWS Lambda
AWS Lambda automatically runs code without requiring the provisioning or management of servers. AWS Lambda automatically scales applications by running code in response to triggers. Lambda code runs in parallel. With AWS Lambda, you are charged for every 100ms your code executes and the number of times your code is triggered. You pay only for the compute time you consume.
Smart Hub Data
Everything in this post revolves around data. For the post’s demonstration, we will start with four categories of raw, synthetic data. Those data categories include Smart Hub electrical usage data, Smart Hub sensor mapping data, Smart Hub residential locations data, and electrical rate data. To demonstrate the capabilities of AWS Glue to handle multiple data formats, the four categories of raw data consist of three distinct file formats: XML, JSON, and CSV. I have attempted to incorporate as many ‘real-world’ complexities into the data as possible without losing focus on the main subject of the post. The sample datasets are intentionally small to keep your AWS costs to a minimum for the demonstration.
To further reduce costs, we will use a variety of data partitioning schemes. According to AWS, by partitioning your data, you can restrict the amount of data scanned by each query, thus improving performance and reducing cost. We have very little data for the demonstration, in which case partitioning may negatively impact query performance. However, in a ‘real-world’ scenario, there would be millions of potential residential customers generating terabytes of data. In that case, data partitioning would be essential for both cost and performance.
Smart Hub Electrical Usage Data
The Smart Hub’s time-series electrical usage data is collected from the customer’s Smart Hub. In the demonstration’s sample electrical usage data, each row represents a completely arbitrary five-minute time interval. There are a total of ten electrical sensors whose electrical usage in kilowatt-hours (kWh) is recorded and transmitted. Each Smart Hub records and transmits electrical usage for 10 device sensors, 288 times per day (24 hr / 5 min intervals), for a total of 2,880 data points per day, per Smart Hub. There are two days’ worth of usage data for the demonstration, for a total of 5,760 data points per Smart Hub. The data is stored in JSON Lines format. The usage data will be partitioned in the Amazon S3-based data lake by date (e.g., ‘dt=2019-12-21’).
{"loc_id":"b6a8d42425fde548","ts":1576915200,"data":{"s_01":0,"s_02":0.00502,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04167}} | |
{"loc_id":"b6a8d42425fde548","ts":1576915500,"data":{"s_01":0,"s_02":0.00552,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04147}} | |
{"loc_id":"b6a8d42425fde548","ts":1576915800,"data":{"s_01":0.29267,"s_02":0.00642,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04207}} | |
{"loc_id":"b6a8d42425fde548","ts":1576916100,"data":{"s_01":0.29207,"s_02":0.00592,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04137}} | |
{"loc_id":"b6a8d42425fde548","ts":1576916400,"data":{"s_01":0.29217,"s_02":0.00622,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04157}} | |
{"loc_id":"b6a8d42425fde548","ts":1576916700,"data":{"s_01":0,"s_02":0.00562,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04197}} | |
{"loc_id":"b6a8d42425fde548","ts":1576917000,"data":{"s_01":0,"s_02":0.00512,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04257}} | |
{"loc_id":"b6a8d42425fde548","ts":1576917300,"data":{"s_01":0,"s_02":0.00522,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04177}} | |
{"loc_id":"b6a8d42425fde548","ts":1576917600,"data":{"s_01":0,"s_02":0.00502,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04267}} | |
{"loc_id":"b6a8d42425fde548","ts":1576917900,"data":{"s_01":0,"s_02":0.00612,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04237}} |
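The counts above, and the date partition for each record, follow directly from the five-minute interval and the epoch `ts` field. The Python sketch below works through that arithmetic. It is illustrative only: `partition_for` is a hypothetical helper, and the fixed UTC-8 offset is an assumption standing in for the America/Los_Angeles timezone of the sample residences in December.

```python
from datetime import datetime, timedelta, timezone

SENSORS_PER_HUB = 10       # s_01 through s_10
INTERVAL_SECONDS = 5 * 60  # one reading every five minutes
DAYS = 2                   # 2019-12-21 and 2019-12-22

intervals_per_day = 24 * 60 * 60 // INTERVAL_SECONDS  # 288 intervals
points_per_day = intervals_per_day * SENSORS_PER_HUB  # 2,880 data points
points_per_hub = points_per_day * DAYS                # 5,760 data points

# Assumption: partitions follow the residence's local calendar date.
# A fixed UTC-8 offset (Pacific Standard Time) stands in for America/Los_Angeles.
PST = timezone(timedelta(hours=-8), "PST")


def partition_for(ts: int) -> str:
    """Return a Hive-style date partition for an epoch timestamp (hypothetical helper)."""
    local = datetime.fromtimestamp(ts, tz=PST)
    return f"dt={local:%Y-%m-%d}"


if __name__ == "__main__":
    print(intervals_per_day, points_per_day, points_per_hub)
    print(partition_for(1576915200))  # first reading in the sample
```

With this offset, the first sample row (`ts` 1576915200, which is 08:00 UTC) falls at midnight local time on December 21, consistent with the `dt=2019-12-21` partition used later in the post.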
Note the electrical usage data contains nested data. The electrical usage for each of the ten sensors is contained in a nested JSON object within each time-series entry. The object contains ten numeric values of type double.
{
  "loc_id": "b6a8d42425fde548",
  "ts": 1576916400,
  "data": {
    "s_01": 0.29217,
    "s_02": 0.00622,
    "s_03": 0,
    "s_04": 0,
    "s_05": 0,
    "s_06": 0,
    "s_07": 0,
    "s_08": 0,
    "s_09": 0,
    "s_10": 0.04157
  }
}
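To make the nesting concrete, the sketch below parses the record shown above from its JSON Lines form and unnests the `data` object into one row per sensor. It uses only the Python standard library and is purely illustrative; it is not how Glue or Athena process the data (Glue typically catalogs a nested object like this as a `struct` column), and the `flatten_usage` name and the `sensor_id`/`kwh` output fields are hypothetical.

```python
import json

# One record from the sample electrical usage data (JSON Lines: one object per line).
LINE = (
    '{"loc_id":"b6a8d42425fde548","ts":1576916400,"data":{"s_01":0.29217,'
    '"s_02":0.00622,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,'
    '"s_09":0,"s_10":0.04157}}'
)


def flatten_usage(line: str) -> list:
    """Unnest the per-sensor readings into one row per (loc_id, ts, sensor)."""
    record = json.loads(line)
    return [
        {
            "loc_id": record["loc_id"],
            "ts": record["ts"],
            "sensor_id": sensor_id,
            # Integer zeros in the raw JSON are normalized to float (double).
            "kwh": float(value),
        }
        for sensor_id, value in sorted(record["data"].items())
    ]


if __name__ == "__main__":
    for row in flatten_usage(LINE):
        print(row)
```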
Real data is often complex and deeply nested. Later in the post, we will see that AWS Glue can map many common data types, including nested data objects, as illustrated below.
Smart Hub Sensor Mappings
The Smart Hub sensor mappings data maps a sensor column in the usage data (e.g., ‘s_01’) to the corresponding actual device (e.g., ‘Central Air Conditioner’). The data contains the device location, wattage, and the last time the record was modified. The data is also stored in JSON Lines format. The sensor mappings data will be partitioned in the Amazon S3-based data lake by the state of the residence (e.g., ‘state=or’ for Oregon).
{"loc_id":"b6a8d42425fde548","id":"s_01","description":"Central Air Conditioner","location":"N/A","watts":3500,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_02","description":"Ceiling Fan","location":"Master Bedroom","watts":65,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_03","description":"Clothes Dryer","location":"Basement","watts":5000,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_04","description":"Clothes Washer","location":"Basement","watts":1800,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_05","description":"Dishwasher","location":"Kitchen","watts":900,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_06","description":"Flat Screen TV","location":"Living Room","watts":120,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_07","description":"Microwave Oven","location":"Kitchen","watts":1000,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_08","description":"Coffee Maker","location":"Kitchen","watts":900,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_09","description":"Hair Dryer","location":"Master Bathroom","watts":2000,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_10","description":"Refrigerator","location":"Kitchen","watts":500,"last_modified":1559347200}
Smart Hub Locations
The Smart Hub locations data contains the geospatial coordinates, home address, and timezone for each residential Smart Hub. The data is stored in CSV format. The data for the four cities included in this demonstration originated from OpenAddresses, ‘the free and open global address collection.’ There are approximately 4k location records. The location data will be partitioned in the Amazon S3-based data lake by the state of the residence where the Smart Hub is installed (e.g., ‘state=or’ for Oregon).
lon,lat,number,street,unit,city,district,region,postcode,id,hash,tz
-122.8077278,45.4715614,6635,SW JUNIPER TER,,,,,97008,,b6a8d42425fde548,America/Los_Angeles
-122.8356634,45.4385864,11225,SW PINTAIL LOOP,,,,,97007,,08ae3df798df8b90,America/Los_Angeles
-122.8252379,45.4481709,9930,SW WRANGLER PL,,,,,97008,,1c7e1f7df752663e,America/Los_Angeles
-122.8354211,45.4535977,9174,SW PLATINUM PL,,,,,97007,,b364854408ee431e,America/Los_Angeles
-122.8315771,45.4949449,15040,SW MILLIKAN WAY,# 233,,,,97003,,0e97796ba31ba3b4,America/Los_Angeles
-122.7950339,45.4470259,10006,SW CONESTOGA DR,# 113,,,,97008,,2b5307be5bfeb026,America/Los_Angeles
-122.8072836,45.4908594,12600,SW CRESCENT ST,# 126,,,,97005,,4d74167f00f63f50,America/Los_Angeles
-122.8211801,45.4689303,7100,SW 140TH PL,,,,,97008,,c5568631f0b9de9c,America/Los_Angeles
-122.831154,45.4317057,15050,SW MALLARD DR,# 101,,,,97007,,dbd1321080ce9682,America/Los_Angeles
-122.8162856,45.4442878,10460,SW 136TH PL,,,,,97008,,008faab8a9a3e519,America/Los_Angeles
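Because several columns are empty, a CSV reader that keys each row by the header is a convenient way to pick out the fields the demonstration relies on. The standard-library sketch below uses two rows from the sample and builds a lookup from each Smart Hub ID to its timezone. It assumes the header layout shown above, with the 16-character Smart Hub ID (the `loc_id` in the usage data) in the `hash` column; `load_timezones` is a hypothetical helper.

```python
import csv
import io

# Header plus two rows from the sample locations data; empty fields are preserved.
SAMPLE_CSV = """lon,lat,number,street,unit,city,district,region,postcode,id,hash,tz
-122.8077278,45.4715614,6635,SW JUNIPER TER,,,,,97008,,b6a8d42425fde548,America/Los_Angeles
-122.8315771,45.4949449,15040,SW MILLIKAN WAY,# 233,,,,97003,,0e97796ba31ba3b4,America/Los_Angeles
"""


def load_timezones(text: str) -> dict:
    """Map each location hash (assumed to be the Smart Hub loc_id) to its timezone."""
    reader = csv.DictReader(io.StringIO(text))
    return {row["hash"]: row["tz"] for row in reader}


if __name__ == "__main__":
    print(load_timezones(SAMPLE_CSV))
```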
Electrical Rates
Lastly, the electrical rate data contains the cost of electricity. In this demonstration, the assumption is that the rate varies by state, by month, and by the hour of the day. The data is stored in XML, a data export format still common to older, legacy systems. The electrical rate data will not be partitioned in the Amazon S3-based data lake.
<?xml version="1.0" encoding="UTF-8"?>
<root>
  <row>
    <state>or</state>
    <year>2019</year>
    <month>12</month>
    <from>19:00:00</from>
    <to>19:59:59</to>
    <type>peak</type>
    <rate>12.623</rate>
  </row>
  <row>
    <state>or</state>
    <year>2019</year>
    <month>12</month>
    <from>20:00:00</from>
    <to>20:59:59</to>
    <type>partial-peak</type>
    <rate>7.232</rate>
  </row>
  <row>
    <state>or</state>
    <year>2019</year>
    <month>12</month>
    <from>21:00:00</from>
    <to>21:59:59</to>
    <type>partial-peak</type>
    <rate>7.232</rate>
  </row>
  <row>
    <state>or</state>
    <year>2019</year>
    <month>12</month>
    <from>22:00:00</from>
    <to>22:59:59</to>
    <type>off-peak</type>
    <rate>4.209</rate>
  </row>
</root>
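Python's standard `xml.etree.ElementTree` module is enough to read this structure. The sketch below parses an abbreviated copy of the sample (XML declaration omitted) into a lookup keyed by state, year, month, and starting hour. That key layout and the `load_rates` helper are assumptions for illustration, not part of the project, which converts this file with AWS Glue.

```python
import xml.etree.ElementTree as ET

# Abbreviated copy of the sample electrical rates XML (declaration omitted).
SAMPLE_XML = """<root>
  <row><state>or</state><year>2019</year><month>12</month>
       <from>19:00:00</from><to>19:59:59</to><type>peak</type><rate>12.623</rate></row>
  <row><state>or</state><year>2019</year><month>12</month>
       <from>22:00:00</from><to>22:59:59</to><type>off-peak</type><rate>4.209</rate></row>
</root>
"""


def load_rates(xml_text: str) -> dict:
    """Map (state, year, month, start hour) to a (rate type, rate) tuple."""
    root = ET.fromstring(xml_text)
    rates = {}
    for row in root.iter("row"):
        start_hour = int(row.findtext("from").split(":")[0])
        key = (
            row.findtext("state"),
            int(row.findtext("year")),
            int(row.findtext("month")),
            start_hour,
        )
        rates[key] = (row.findtext("type"), float(row.findtext("rate")))
    return rates


if __name__ == "__main__":
    print(load_rates(SAMPLE_XML))
```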
Data Analysis Process
Due to the number of steps involved in the data analysis process in the demonstration, I have divided the process into four logical stages: 1) Raw Data Ingestion, 2) Data Transformation, 3) Data Enrichment, and 4) Data Visualization and Business Intelligence (BI).
Full data analysis workflow diagram.
Raw Data Ingestion
In the Raw Data Ingestion stage, semi-structured CSV-, XML-, and JSON-format data files are copied to a secure Amazon Simple Storage Service (S3) bucket. Within the bucket, data files are organized into folders based on their physical data structure (schema). Due to the potentially unlimited number of data files, files are further organized (partitioned) into subfolders. Organizational strategies for data files may be based on date, time, geographic location, customer ID, or other common data characteristics.
This collection of semi-structured data files, S3 buckets, and partitions forms what is referred to as a Data Lake. According to AWS, a data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale.
A series of AWS Glue Crawlers process the raw CSV-, XML-, and JSON-format files, extracting metadata, and creating table definitions in the AWS Glue Data Catalog. According to AWS, an AWS Glue Data Catalog contains metadata tables, where each table specifies a single data store.
Data Transformation
In the Data Transformation stage, the raw data from the previous stage is transformed. Data transformation may include both modifying the data and changing the data format. Data modifications include data cleansing, re-casting data types, changing date formats, field-level computations, and field concatenation.
The data is then converted from CSV-, XML-, and JSON-format to Apache Parquet format and written back to the Amazon S3-based data lake. Apache Parquet is a compressed, efficient columnar storage format. Amazon Athena, like many Cloud-based services, charges you by the amount of data scanned per query. Hence, using data partitioning, bucketing, compression, and columnar storage formats, like Parquet, will reduce query cost.
Lastly, the transformed Parquet-format data is cataloged to new tables, alongside the raw CSV, XML, and JSON data, in the Glue Data Catalog.
Data Enrichment
According to ScienceDirect, data enrichment or augmentation is the process of enhancing existing information by supplementing missing or incomplete data. Typically, data enrichment is achieved by using external data sources, but that is not always the case.
In the Data Enrichment stage, the Parquet-format Smart Hub usage data is augmented with related data from the three other data sources: sensor mappings, locations, and electrical rates. The customer’s Smart Hub usage data is enriched with the customer’s device types, the customer’s timezone, and the customer’s electricity cost per monitored period based on the customer’s geographic location and time of day.
Once the data is enriched, it is converted to Parquet and optimized for query performance, stored in the data lake, and cataloged. At this point, the original CSV-, XML-, and JSON-format raw data files, the transformed Parquet-format data files, and the Parquet-format enriched data files are all stored in the Amazon S3-based data lake and cataloged in the Glue Data Catalog.
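The enrichment idea can be sketched as a simple in-memory join in Python. This is not the Athena SQL the demonstration uses; it only illustrates how one usage reading picks up its device name, local time, and rate. The 19:20 local-time reading is hypothetical (the sample usage rows fall around midnight), the rate is assumed to be in cents per kWh, and a fixed UTC-8 offset stands in for the America/Los_Angeles timezone.

```python
from datetime import datetime, timedelta, timezone

# Small lookups drawn from the sample data sets.
SENSOR_MAPPINGS = {("b6a8d42425fde548", "s_01"): "Central Air Conditioner"}
LOCATIONS = {"b6a8d42425fde548": {"state": "or", "tz_offset_hours": -8}}  # assumed fixed offset
RATES = {("or", 2019, 12, 19): ("peak", 12.623)}  # assumed cents per kWh


def enrich(loc_id: str, ts: int, sensor_id: str, kwh: float) -> dict:
    """Join one usage reading with its device, local time, and electricity rate."""
    location = LOCATIONS[loc_id]
    tz = timezone(timedelta(hours=location["tz_offset_hours"]))
    local = datetime.fromtimestamp(ts, tz=tz)
    rate_type, rate = RATES[(location["state"], local.year, local.month, local.hour)]
    return {
        "loc_id": loc_id,
        "device": SENSOR_MAPPINGS[(loc_id, sensor_id)],
        "local_time": local.isoformat(),
        "rate_type": rate_type,
        "kwh": kwh,
        "cost_cents": round(kwh * rate, 5),
    }


if __name__ == "__main__":
    # Hypothetical reading: 2019-12-21 19:20 Pacific time (1576984800 epoch seconds).
    print(enrich("b6a8d42425fde548", 1576984800, "s_01", 0.29217))
```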
Data Visualization
In the final Data Visualization and Business Intelligence (BI) stage, the enriched data is presented and analyzed. There are many enterprise-grade services available for visualization and Business Intelligence, which integrate with Athena. Amazon services include Amazon QuickSight, Amazon EMR, and Amazon SageMaker. Third-party solutions from AWS Partners, available on the AWS Marketplace, include Tableau, Looker, Sisense, and Domo. In this demonstration, we will focus on Amazon QuickSight.
Getting Started
Requirements
To follow along with the demonstration, you will need an AWS Account and a current version of the AWS CLI. To get the most from the demonstration, you should also have Python 3 and jq installed in your work environment.
Source Code
All source code for this post can be found on GitHub. Use the following command to clone a copy of the project.
git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/athena-glue-quicksight-demo.git
Source code samples in this post are displayed as GitHub Gists, which will not display correctly on some mobile and social media browsers.
TL;DR?
Just want to jump in without reading the instructions? All the AWS CLI commands found within the post are consolidated in the GitHub project’s README file.
CloudFormation Stack
To start, create the ‘smart-hub-athena-glue-stack’ CloudFormation stack using the smart-hub-athena-glue.yml template. The template will create (3) Amazon S3 buckets, (1) AWS Glue Data Catalog Database, (5) Data Catalog Database Tables, (6) AWS Glue Crawlers, (1) AWS Glue ETL Job, and (1) IAM Service Role for AWS Glue.
First, make sure to change the DATA_BUCKET, SCRIPT_BUCKET, and LOG_BUCKET variables to your own unique S3 bucket names; in the script below, all three are derived from BUCKET_SUFFIX. I always suggest using the standard AWS 3-part convention of 1) a descriptive name, 2) the AWS Account ID or Account Alias, and 3) the AWS Region to name your bucket (e.g., ‘smart-hub-data-123456789012-us-east-1’).
# *** CHANGE ME ***
BUCKET_SUFFIX="123456789012-us-east-1"
DATA_BUCKET="smart-hub-data-${BUCKET_SUFFIX}"
SCRIPT_BUCKET="smart-hub-scripts-${BUCKET_SUFFIX}"
LOG_BUCKET="smart-hub-logs-${BUCKET_SUFFIX}"

aws cloudformation create-stack \
  --stack-name smart-hub-athena-glue-stack \
  --template-body file://cloudformation/smart-hub-athena-glue.yml \
  --parameters ParameterKey=DataBucketName,ParameterValue=${DATA_BUCKET} \
               ParameterKey=ScriptBucketName,ParameterValue=${SCRIPT_BUCKET} \
               ParameterKey=LogBucketName,ParameterValue=${LOG_BUCKET} \
  --capabilities CAPABILITY_NAMED_IAM
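The same deployment can be sketched with boto3, the AWS SDK for Python, including a check that each bucket name follows the three-part convention and the core Amazon S3 bucket naming rules (3 to 63 characters; lowercase letters, numbers, periods, and hyphens; beginning and ending with a letter or number). This is an illustrative alternative to the CLI, not part of the project: the helper names are hypothetical, only those core naming rules are checked, and `create_glue_stack` requires boto3 and configured AWS credentials.

```python
import re

# Core S3 naming rules only: 3-63 characters of lowercase letters, digits,
# periods, and hyphens, beginning and ending with a letter or digit.
BUCKET_RE = re.compile(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]")


def bucket_name(description: str, bucket_suffix: str) -> str:
    """Append the '<account>-<region>' suffix and validate the result."""
    name = f"{description}-{bucket_suffix}"
    if not BUCKET_RE.fullmatch(name):
        raise ValueError(f"invalid S3 bucket name: {name!r}")
    return name


def stack_parameters(bucket_suffix: str) -> list:
    """Build the Parameters list for CloudFormation's CreateStack API."""
    values = {
        "DataBucketName": bucket_name("smart-hub-data", bucket_suffix),
        "ScriptBucketName": bucket_name("smart-hub-scripts", bucket_suffix),
        "LogBucketName": bucket_name("smart-hub-logs", bucket_suffix),
    }
    return [{"ParameterKey": key, "ParameterValue": value} for key, value in values.items()]


def create_glue_stack(bucket_suffix: str) -> None:
    """Create the stack and block until it completes (requires boto3 and AWS credentials)."""
    import boto3  # imported lazily so the helpers above run without boto3

    with open("cloudformation/smart-hub-athena-glue.yml") as template:
        template_body = template.read()

    cloudformation = boto3.client("cloudformation")
    cloudformation.create_stack(
        StackName="smart-hub-athena-glue-stack",
        TemplateBody=template_body,
        Parameters=stack_parameters(bucket_suffix),
        Capabilities=["CAPABILITY_NAMED_IAM"],
    )
    # Built-in waiter: polls the stack until CREATE_COMPLETE, raising if creation fails.
    cloudformation.get_waiter("stack_create_complete").wait(
        StackName="smart-hub-athena-glue-stack"
    )
```

Calling `create_glue_stack("123456789012-us-east-1")` from the project root would mirror the CLI command above.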
Raw Data Files
Next, copy the raw CSV-, XML-, and JSON-format data files from the local project to the DATA_BUCKET S3 bucket (steps 1a-1b in the workflow diagram). These files represent the beginnings of the S3-based data lake. Each category of data uses a different strategy for organizing and separating the files. Note the use of Apache Hive-style partitions (e.g., /smart_hub_data_json/dt=2019-12-21). As discussed earlier, the assumption is that the actual, large volume of data in the data lake would necessitate using partitioning to improve query performance.
# location data
aws s3 cp data/locations/denver_co_1576656000.csv \
  s3://${DATA_BUCKET}/smart_hub_locations_csv/state=co/
aws s3 cp data/locations/palo_alto_ca_1576742400.csv \
  s3://${DATA_BUCKET}/smart_hub_locations_csv/state=ca/
aws s3 cp data/locations/portland_metro_or_1576742400.csv \
  s3://${DATA_BUCKET}/smart_hub_locations_csv/state=or/
aws s3 cp data/locations/stamford_ct_1576569600.csv \
  s3://${DATA_BUCKET}/smart_hub_locations_csv/state=ct/

# sensor mapping data
aws s3 cp data/mappings/ \
  s3://${DATA_BUCKET}/sensor_mappings_json/state=or/ \
  --recursive

# electrical usage data
aws s3 cp data/usage/2019-12-21/ \
  s3://${DATA_BUCKET}/smart_hub_data_json/dt=2019-12-21/ \
  --recursive
aws s3 cp data/usage/2019-12-22/ \
  s3://${DATA_BUCKET}/smart_hub_data_json/dt=2019-12-22/ \
  --recursive

# electricity rates data
aws s3 cp data/rates/ \
  s3://${DATA_BUCKET}/electricity_rates_xml/ \
  --recursive
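The copy commands follow a consistent pattern: each data category gets its own top-level prefix, and most add a Hive-style `key=value` partition folder. The sketch below reproduces a few of those destination keys in Python and, optionally, uploads the files with boto3's `upload_file`. The `UPLOADS` list covers only three of the fourteen files, the helper names are hypothetical, and running `upload_all` requires boto3 and AWS credentials.

```python
from pathlib import PurePosixPath

# (local file, table prefix, partition or None) for a few of the raw files.
UPLOADS = [
    ("data/locations/denver_co_1576656000.csv", "smart_hub_locations_csv", "state=co"),
    ("data/usage/2019-12-21/b6a8d42425fde548_1576915200.json", "smart_hub_data_json", "dt=2019-12-21"),
    ("data/rates/2019_12_1575270000.xml", "electricity_rates_xml", None),
]


def s3_key(local_path: str, prefix: str, partition) -> str:
    """Build the S3 object key, inserting a Hive-style partition folder when given."""
    parts = [prefix] + ([partition] if partition else []) + [PurePosixPath(local_path).name]
    return "/".join(parts)


def upload_all(bucket: str) -> None:
    """Upload each listed file to its key (requires boto3 and AWS credentials)."""
    import boto3

    s3 = boto3.client("s3")
    for local_path, prefix, partition in UPLOADS:
        s3.upload_file(local_path, bucket, s3_key(local_path, prefix, partition))
```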
Confirm the contents of the DATA_BUCKET S3 bucket with the following command.
aws s3 ls s3://${DATA_BUCKET}/ \
  --recursive --human-readable --summarize
There should be a total of (14) raw data files in the DATA_BUCKET S3 bucket.
2020-01-04 14:39:51 20.0 KiB electricity_rates_xml/2019_12_1575270000.xml
2020-01-04 14:39:46 1.3 KiB sensor_mappings_json/state=or/08ae3df798df8b90_1550908800.json
2020-01-04 14:39:46 1.3 KiB sensor_mappings_json/state=or/1c7e1f7df752663e_1559347200.json
2020-01-04 14:39:46 1.3 KiB sensor_mappings_json/state=or/b6a8d42425fde548_1568314800.json
2020-01-04 14:39:47 44.9 KiB smart_hub_data_json/dt=2019-12-21/08ae3df798df8b90_1576915200.json
2020-01-04 14:39:47 44.9 KiB smart_hub_data_json/dt=2019-12-21/1c7e1f7df752663e_1576915200.json
2020-01-04 14:39:47 44.9 KiB smart_hub_data_json/dt=2019-12-21/b6a8d42425fde548_1576915200.json
2020-01-04 14:39:49 44.6 KiB smart_hub_data_json/dt=2019-12-22/08ae3df798df8b90_15770016000.json
2020-01-04 14:39:49 44.6 KiB smart_hub_data_json/dt=2019-12-22/1c7e1f7df752663e_1577001600.json
2020-01-04 14:39:49 44.6 KiB smart_hub_data_json/dt=2019-12-22/b6a8d42425fde548_15770016001.json
2020-01-04 14:39:39 89.7 KiB smart_hub_locations_csv/state=ca/palo_alto_ca_1576742400.csv
2020-01-04 14:39:37 84.2 KiB smart_hub_locations_csv/state=co/denver_co_1576656000.csv
2020-01-04 14:39:44 78.6 KiB smart_hub_locations_csv/state=ct/stamford_ct_1576569600.csv
2020-01-04 14:39:42 91.6 KiB smart_hub_locations_csv/state=or/portland_metro_or_1576742400.csv

Total Objects: 14
   Total Size: 636.7 KiB
Lambda Functions
Next, package the (5) Python 3.8-based AWS Lambda functions for deployment.
pushd lambdas/athena-json-to-parquet-data || exit
zip -r package.zip index.py
popd || exit

pushd lambdas/athena-csv-to-parquet-locations || exit
zip -r package.zip index.py
popd || exit

pushd lambdas/athena-json-to-parquet-mappings || exit
zip -r package.zip index.py
popd || exit

pushd lambdas/athena-complex-etl-query || exit
zip -r package.zip index.py
popd || exit

pushd lambdas/athena-parquet-to-parquet-elt-data || exit
zip -r package.zip index.py
popd || exit
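If the `zip` utility is not available, Python's standard `zipfile` module can produce an equivalent package. This is a minimal sketch assuming the same `lambdas/<function-name>/index.py` layout; `package_lambda` is a hypothetical helper that writes `index.py` at the root of the archive, matching what `zip -r package.zip index.py` produces.

```python
import zipfile
from pathlib import Path

FUNCTIONS = [
    "athena-json-to-parquet-data",
    "athena-csv-to-parquet-locations",
    "athena-json-to-parquet-mappings",
    "athena-complex-etl-query",
    "athena-parquet-to-parquet-elt-data",
]


def package_lambda(function_dir: Path) -> Path:
    """Zip index.py at the archive root, like `zip -r package.zip index.py`."""
    archive = function_dir / "package.zip"
    with zipfile.ZipFile(archive, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.write(function_dir / "index.py", arcname="index.py")
    return archive
```

From the project root, looping over `FUNCTIONS` and calling `package_lambda(Path("lambdas") / name)` would package all five functions.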
Copy the five Lambda packages to the SCRIPT_BUCKET S3 bucket. The ZIP archive Lambda packages are accessed by the second CloudFormation stack, smart-hub-lambda-stack. This CloudFormation stack, which creates the Lambda functions, will fail to deploy if the packages are not found in the SCRIPT_BUCKET S3 bucket.
I have chosen to place the packages in a different S3 bucket than the raw data files. In a real production environment, these two types of files would, at a minimum, be kept in separate buckets for security. Remember, only data should go into the data lake.
aws s3 cp lambdas/athena-json-to-parquet-data/package.zip \
  s3://${SCRIPT_BUCKET}/lambdas/athena_json_to_parquet_data/
aws s3 cp lambdas/athena-csv-to-parquet-locations/package.zip \
  s3://${SCRIPT_BUCKET}/lambdas/athena_csv_to_parquet_locations/
aws s3 cp lambdas/athena-json-to-parquet-mappings/package.zip \
  s3://${SCRIPT_BUCKET}/lambdas/athena_json_to_parquet_mappings/
aws s3 cp lambdas/athena-complex-etl-query/package.zip \
  s3://${SCRIPT_BUCKET}/lambdas/athena_complex_etl_query/
aws s3 cp lambdas/athena-parquet-to-parquet-elt-data/package.zip \
  s3://${SCRIPT_BUCKET}/lambdas/athena_parquet_to_parquet_elt_data/
Create the second ‘smart-hub-lambda-stack’ CloudFormation stack using the smart-hub-lambda.yml CloudFormation template. The template will create (5) AWS Lambda functions and (1) Lambda execution IAM Service Role.
aws cloudformation create-stack \
  --stack-name smart-hub-lambda-stack \
  --template-body file://cloudformation/smart-hub-lambda.yml \
  --capabilities CAPABILITY_NAMED_IAM
At this point, we have deployed all of the AWS resources required for the demonstration using CloudFormation. We have also copied all of the raw CSV-, XML-, and JSON-format data files to the Amazon S3-based data lake.
AWS Glue Crawlers
If you recall, we created five tables in the Glue Data Catalog database as part of the CloudFormation stack: one table for each of the four raw data types and one table to hold temporary ELT data later in the demonstration. To confirm the five tables were created in the Glue Data Catalog database, use the Glue Data Catalog Console, or run the following AWS CLI / jq command.
aws glue get-tables \
  --database-name smart_hub_data_catalog \
  | jq -r '.TableList[].Name'
The five data catalog tables should be as follows.
electricity_rates_xml
etl_tmp_output_parquet
sensor_mappings_json
smart_hub_data_json
smart_hub_locations_csv
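The same check can be scripted with boto3's Glue client, which provides a paginator for `GetTables`. This is a sketch assuming boto3 and configured credentials; `table_names` mirrors the jq filter `.TableList[].Name` and is a hypothetical helper that can be exercised on plain response dictionaries.

```python
EXPECTED_TABLES = {
    "electricity_rates_xml",
    "etl_tmp_output_parquet",
    "sensor_mappings_json",
    "smart_hub_data_json",
    "smart_hub_locations_csv",
}


def table_names(pages) -> list:
    """Collect table names from GetTables response pages (like jq '.TableList[].Name')."""
    return sorted(table["Name"] for page in pages for table in page.get("TableList", []))


def list_catalog_tables(database: str = "smart_hub_data_catalog") -> list:
    """Fetch all table names in a Glue database (requires boto3 and AWS credentials)."""
    import boto3

    paginator = boto3.client("glue").get_paginator("get_tables")
    return table_names(paginator.paginate(DatabaseName=database))
```

Once the stack exists, `EXPECTED_TABLES - set(list_catalog_tables())` should be an empty set.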
We also created six Glue Crawlers as part of the CloudFormation template. Four of these Crawlers are responsible for cataloging the raw CSV-, XML-, and JSON-format data from S3 into the corresponding, existing Glue Data Catalog database tables; each of these Crawlers corresponds to one of the four raw data types. The Crawlers will also detect any new partitions and add them to the tables. Crawlers can be scheduled to run periodically, cataloging new data and updating data partitions. Crawlers can also create new Data Catalog database tables, which we will use later in the post.
Run the four Glue Crawlers using the AWS CLI (step 1c in workflow diagram).
aws glue start-crawler --name smart-hub-locations-csv
aws glue start-crawler --name smart-hub-sensor-mappings-json
aws glue start-crawler --name smart-hub-data-json
aws glue start-crawler --name smart-hub-rates-xml
You can check the Glue Crawler Console to ensure the four Crawlers finished successfully.
Alternatively, use another AWS CLI / jq command.
aws glue get-crawler-metrics \
  | jq -r '.CrawlerMetricsList[] | "\(.CrawlerName): \(.StillEstimating), \(.TimeLeftSeconds)"' \
  | grep "^smart-hub-[A-Za-z-]*"
When complete, all Crawlers should be in a state of ‘Still Estimating = false’ and ‘TimeLeftSeconds = 0’. In my experience, the Crawlers can take up to one minute to start, after the estimation stage, and one minute to stop when complete.
smart-hub-data-json: true, 0
smart-hub-etl-tmp-output-parquet: false, 0
smart-hub-locations-csv: false, 15
smart-hub-rates-parquet: false, 0
smart-hub-rates-xml: false, 15
smart-hub-sensor-mappings-json: false, 15
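Rather than re-running the command by hand, a short polling loop can wait for the four raw-data Crawlers. This sketch assumes boto3, configured credentials, and that the Crawlers were already started with the commands above. It uses Glue's `GetCrawler` API, whose `Crawler.State` is `READY` when a crawler is idle (as opposed to `RUNNING` or `STOPPING`), and reports each crawler's `LastCrawl.Status`. The polling interval, timeout, and helper names are assumptions.

```python
import time

RAW_CRAWLERS = (
    "smart-hub-locations-csv",
    "smart-hub-sensor-mappings-json",
    "smart-hub-data-json",
    "smart-hub-rates-xml",
)


def all_ready(states: dict) -> bool:
    """True when every crawler reports READY (neither RUNNING nor STOPPING)."""
    return all(state == "READY" for state in states.values())


def wait_for_crawlers(names=RAW_CRAWLERS, poll_seconds: int = 30, timeout: int = 1800) -> dict:
    """Poll GetCrawler until all crawlers are READY; return each last crawl status."""
    import boto3

    glue = boto3.client("glue")
    deadline = time.monotonic() + timeout
    while True:
        crawlers = {name: glue.get_crawler(Name=name)["Crawler"] for name in names}
        if all_ready({name: c["State"] for name, c in crawlers.items()}):
            # Expected values include SUCCEEDED, CANCELLED, or FAILED.
            return {name: c.get("LastCrawl", {}).get("Status") for name, c in crawlers.items()}
        if time.monotonic() > deadline:
            raise TimeoutError("Crawlers did not finish before the timeout")
        time.sleep(poll_seconds)
```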
Successfully running the four Crawlers completes the Raw Data Ingestion stage of the demonstration.
Converting to Parquet with CTAS
With the Raw Data Ingestion stage completed, we will now transform the raw Smart Hub usage data, sensor mapping data, and locations data into Parquet format using three AWS Lambda functions. Each Lambda calls Athena, which executes a CREATE TABLE AS SELECT SQL statement (aka CTAS). Each Lambda executes a similar command, varying only by data source, data destination, and partitioning scheme. Below is an example of the command used for the Smart Hub electrical usage data, taken from the Python-based Lambda, athena-json-to-parquet-data/index.py.
query = \
    "CREATE TABLE IF NOT EXISTS " + data_catalog + "." + output_directory + " " \
    "WITH ( " \
    " format = 'PARQUET', " \
    " parquet_compression = 'SNAPPY', " \
    " partitioned_by = ARRAY['dt'], " \
    " external_location = 's3://" + data_bucket + "/" + output_directory + "' " \
    ") AS " \
    "SELECT * " \
    "FROM " + data_catalog + "." + input_directory + ";"
This compact yet powerful CTAS statement converts a copy of the raw JSON- and CSV-format data files into Parquet format, and partitions and stores the resulting files back into the S3-based data lake. Additionally, the CTAS SQL statement catalogs the Parquet-format data files as new tables in the Glue Data Catalog database. Unfortunately, this method will not work for the XML-format raw data files, which we will tackle next.
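For comparison, the same CTAS statement can be built with an f-string and submitted through boto3's Athena client. This is a hedged sketch, not the project's actual athena-json-to-parquet-data/index.py: the `build_ctas` and `handler` names, the `DATA_BUCKET` and `ATHENA_OUTPUT` environment variables, and the query-results location are hypothetical. Athena's `StartQueryExecution` call is asynchronous, so this handler returns the query execution ID rather than waiting for the CTAS to finish.

```python
import os


def build_ctas(data_catalog: str, data_bucket: str, input_directory: str,
               output_directory: str, partition_column: str = "dt") -> str:
    """Return a CTAS statement converting a catalog table to partitioned, Snappy-compressed Parquet."""
    return (
        f"CREATE TABLE IF NOT EXISTS {data_catalog}.{output_directory} "
        "WITH ( "
        "format = 'PARQUET', "
        "parquet_compression = 'SNAPPY', "
        f"partitioned_by = ARRAY['{partition_column}'], "
        f"external_location = 's3://{data_bucket}/{output_directory}' "
        ") AS "
        "SELECT * "
        f"FROM {data_catalog}.{input_directory};"
    )


def handler(event, context):
    """Hypothetical Lambda entry point; boto3 is bundled in the Python Lambda runtime."""
    import boto3

    query = build_ctas(
        "smart_hub_data_catalog",
        os.environ["DATA_BUCKET"],  # hypothetical environment variable
        "smart_hub_data_json",
        "smart_hub_data_parquet",
    )
    response = boto3.client("athena").start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": "smart_hub_data_catalog"},
        ResultConfiguration={"OutputLocation": os.environ["ATHENA_OUTPUT"]},  # hypothetical
    )
    return {"QueryExecutionId": response["QueryExecutionId"]}
```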
The five deployed Lambda functions should be visible from the Lambda Console’s Functions tab.
Invoke the three Lambda functions using the AWS CLI (part of step 2a in the workflow diagram).
aws lambda invoke \
  --function-name athena-json-to-parquet-data \
  response.json
aws lambda invoke \
  --function-name athena-csv-to-parquet-locations \
  response.json
aws lambda invoke \
  --function-name athena-json-to-parquet-mappings \
  response.json
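The same three invocations can also be scripted from Python. This sketch assumes a boto3 Lambda client (`boto3.client("lambda")`), whose `invoke` method accepts `FunctionName` and returns a dict containing `StatusCode` and, when the function itself raised an error, `FunctionError`. The helper name `invoke_all` is hypothetical. Passing the client in as a parameter keeps the function testable without AWS credentials.

```python
from typing import Any, Dict, Iterable

CTAS_FUNCTIONS = [
    "athena-json-to-parquet-data",
    "athena-csv-to-parquet-locations",
    "athena-json-to-parquet-mappings",
]


def invoke_all(lambda_client: Any, function_names: Iterable[str]) -> Dict[str, int]:
    """Invoke each function synchronously and return {name: StatusCode}.

    lambda_client is expected to behave like boto3.client("lambda").
    """
    statuses = {}
    for name in function_names:
        response = lambda_client.invoke(FunctionName=name)  # RequestResponse by default
        if "FunctionError" in response:
            # Errors raised inside the function still come back with StatusCode 200
            raise RuntimeError(f"{name} failed: {response['FunctionError']}")
        statuses[name] = response["StatusCode"]
    return statuses

# Usage, with AWS credentials configured:
#   import boto3
#   print(invoke_all(boto3.client("lambda"), CTAS_FUNCTIONS))
```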
Here is the same CTAS command shown above for the Smart Hub electrical usage data, as it was successfully executed by Athena.
CREATE TABLE IF NOT EXISTS smart_hub_data_catalog.smart_hub_data_parquet
WITH (format = 'PARQUET',
      parquet_compression = 'SNAPPY',
      partitioned_by = ARRAY['dt'],
      external_location = 's3://smart-hub-data-demo-account-1-us-east-1/smart_hub_data_parquet')
AS
SELECT *
FROM smart_hub_data_catalog.smart_hub_data_json
We can view any Athena SQL query from the Athena Console’s History tab. Clicking on a query (in pink) will copy it to the Query Editor tab and execute it. Below, we see the three SQL statements executed by the Lambda functions.
AWS Glue ETL Job for XML
If you recall, the electrical rate data is in XML format. The Lambda functions we just executed converted the CSV and JSON data to Parquet using Athena. Unlike CSV, JSON, ORC, Parquet, and Avro, Athena does not currently support the older XML data format. For the XML data files, we will use an AWS Glue ETL Job to convert the XML data to Parquet. The Glue ETL Job is written in Python and uses Apache Spark, along with several AWS Glue PySpark extensions. For this job, I used an existing script created in the Glue ETL Jobs Console as a base, then modified the script to meet my needs.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    's3_output_path',
    'source_glue_database',
    'source_glue_table'
])
s3_output_path = args['s3_output_path']
source_glue_database = args['source_glue_database']
source_glue_table = args['source_glue_table']
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext. \
    create_dynamic_frame. \
    from_catalog(database=source_glue_database,
                 table_name=source_glue_table,
                 transformation_ctx="datasource0")
applymapping1 = ApplyMapping.apply(
    frame=datasource0,
    mappings=[("from", "string", "from", "string"),
              ("to", "string", "to", "string"),
              ("type", "string", "type", "string"),
              ("rate", "double", "rate", "double"),
              ("year", "int", "year", "int"),
              ("month", "int", "month", "int"),
              ("state", "string", "state", "string")],
    transformation_ctx="applymapping1")
resolvechoice2 = ResolveChoice.apply(
    frame=applymapping1,
    choice="make_struct",
    transformation_ctx="resolvechoice2")
dropnullfields3 = DropNullFields.apply(
    frame=resolvechoice2,
    transformation_ctx="dropnullfields3")
datasink4 = glueContext.write_dynamic_frame.from_options(
    frame=dropnullfields3,
    connection_type="s3",
    connection_options={
        "path": s3_output_path,
        "partitionKeys": ["state"]
    },
    format="parquet",
    transformation_ctx="datasink4")
job.commit()
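To make two of the script’s transforms more concrete, here is a plain-Python analogy that works on lists of dicts. It is not the Glue API: `apply_mapping` and `drop_null_fields` are hypothetical helpers. `apply_mapping` mimics ApplyMapping, which keeps, renames, and casts only the mapped fields and drops the rest. `drop_null_fields` mimics DropNullFields, which drops fields that are null in every record. ResolveChoice is left out because plain dicts have no ambiguous "choice" types, and the cast table covers only the type names used in this script.

```python
from typing import Any, Callable, Dict, List, Tuple

# (source name, source type, target name, target type), as in the Glue script
FieldMapping = Tuple[str, str, str, str]

# Casts for the target type names used in the script's mappings (assumed subset)
CASTS: Dict[str, Callable[[Any], Any]] = {"string": str, "double": float, "int": int}


def apply_mapping(records: List[Dict[str, Any]],
                  mappings: List[FieldMapping]) -> List[Dict[str, Any]]:
    """Keep only mapped fields, renaming and casting each; unmapped fields are dropped."""
    result = []
    for record in records:
        row = {}
        for source, _source_type, target, target_type in mappings:
            value = record.get(source)
            row[target] = None if value is None else CASTS[target_type](value)
        result.append(row)
    return result


def drop_null_fields(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Remove every field that is None (or missing) in all records."""
    all_keys = set().union(*records)
    all_null = {k for k in all_keys if all(r.get(k) is None for r in records)}
    return [{k: v for k, v in r.items() if k not in all_null} for r in records]
```

Glue performs the same operations on distributed DynamicFrames, which is why the real job needs a Spark environment even for small inputs.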
The three Python command-line arguments the script expects, s3_output_path, source_glue_database, and source_glue_table, are defined in the CloudFormation template, smart-hub-athena-glue.yml. Below, we see them in the DefaultArguments section of the CloudFormation snippet. They are injected automatically when the job is run and can be overridden from the command line when starting the job.
GlueJobRatesToParquet:
  Type: AWS::Glue::Job
  Properties:
    GlueVersion: 1.0
    Command:
      Name: glueetl
      PythonVersion: 3
      ScriptLocation: !Sub "s3://${ScriptBucketName}/glue_scripts/rates_xml_to_parquet.py"
    DefaultArguments: {
      "--s3_output_path": !Sub "s3://${DataBucketName}/electricity_rates_parquet",
      "--source_glue_database": !Ref GlueDatabase,
      "--source_glue_table": "electricity_rates_xml",
      "--job-bookmark-option": "job-bookmark-enable",
      "--enable-spark-ui": "true",
      "--spark-event-logs-path": !Sub "s3://${LogBucketName}/glue-etl-jobs/"
    }
    Description: "Convert electrical rates XML data to Parquet"
    ExecutionProperty:
      MaxConcurrentRuns: 2
    MaxRetries: 0
    Name: rates-xml-to-parquet
    Role: !GetAtt "CrawlerRole.Arn"
  DependsOn:
    - CrawlerRole
    - GlueDatabase
    - DataBucket
    - ScriptBucket
    - LogBucket
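Glue passes DefaultArguments to the script as `--name value` pairs on the command line, and `getResolvedOptions` picks out the names the script asks for. As a rough local stand-in, here is an analogy using the standard library’s argparse. It is not Glue’s implementation, and `resolve_options` is a hypothetical name. It returns only the requested names and ignores other flags such as `--job-bookmark-option`.

```python
import argparse
from typing import Dict, List


def resolve_options(argv: List[str], names: List[str]) -> Dict[str, str]:
    """Return {name: value} for each required --name in argv (argv[0] is the script).

    Unknown flags are ignored; a missing required name makes argparse exit with an error.
    """
    parser = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    for name in names:
        parser.add_argument(f"--{name}", dest=name, required=True)
    known, _unknown = parser.parse_known_args(argv[1:])
    return {name: getattr(known, name) for name in names}

# Usage inside a script:
#   import sys
#   opts = resolve_options(sys.argv, ["JOB_NAME", "s3_output_path",
#                                     "source_glue_database", "source_glue_table"])
```

`allow_abbrev=False` prevents a short unknown flag from being silently matched as a prefix of a required one.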
First, copy the Glue ETL Job Python script to the SCRIPT_BUCKET S3 bucket.
aws s3 cp glue-scripts/rates_xml_to_parquet.py \
  s3://${SCRIPT_BUCKET}/glue_scripts/
Next, start the Glue ETL Job (part of step 2a in the workflow diagram). Although the conversion is a relatively simple set of tasks, creating the Apache Spark environment to execute them takes several minutes. Whereas the Glue Crawlers took about 2 minutes on average, the Glue ETL Job could take 10–15 minutes in my experience, of which the actual execution accounts for only about 1–2 minutes. In my opinion, waiting up to 15 minutes is too long to be viable for ad-hoc jobs against smaller datasets; Glue ETL Jobs are clearly targeted at big data.
aws glue start-job-run --job-name rates-xml-to-parquet
To check on the status of the job, use the Glue ETL Jobs Console, or use the AWS CLI.
# get status of most recent job (the one that is running)
aws glue get-job-run \
  --job-name rates-xml-to-parquet \
  --run-id "$(aws glue get-job-runs \
    --job-name rates-xml-to-parquet \
    | jq -r '.JobRuns[0].Id')"
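To wait on the job from a script instead of re-running the command above, you can use a polling loop. This sketch assumes a boto3 Glue client (`boto3.client("glue")`), whose `get_job_run(JobName=..., RunId=...)` returns the `JobRun` structure shown below. The helper name, the set of terminal states, and the 30-second interval are illustrative choices. The `sleep` parameter exists only so the loop can be tested without waiting.

```python
import time
from typing import Any, Callable

# Glue job run states after which the run will not change again
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"}


def wait_for_job_run(glue_client: Any, job_name: str, run_id: str,
                     poll_seconds: float = 30.0,
                     sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll a Glue job run until it reaches a terminal state; return that state."""
    while True:
        run = glue_client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
        state = run["JobRunState"]
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)

# Usage, with AWS credentials configured:
#   import boto3
#   glue = boto3.client("glue")
#   run_id = glue.start_job_run(JobName="rates-xml-to-parquet")["JobRunId"]
#   print(wait_for_job_run(glue, "rates-xml-to-parquet", run_id))
```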
When complete, you should see results similar to the following. Note the ‘JobRunState’ is ‘SUCCEEDED.’ This particular job ran for a total of 14.92 minutes, while the actual execution time was 2.25 minutes.
{
  "JobRun": {
    "Id": "jr_f7186b26bf042ea7773ad08704d012d05299f080e7ac9b696ca8dd575f79506b",
    "Attempt": 0,
    "JobName": "rates-xml-to-parquet",
    "StartedOn": 1578022390.301,
    "LastModifiedOn": 1578023285.632,
    "CompletedOn": 1578023285.632,
    "JobRunState": "SUCCEEDED",
    "PredecessorRuns": [],
    "AllocatedCapacity": 10,
    "ExecutionTime": 135,
    "Timeout": 2880,
    "MaxCapacity": 10.0,
    "LogGroupName": "/aws-glue/jobs",
    "GlueVersion": "1.0"
  }
}
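The two durations quoted for this run can be derived from the JSON. Wall-clock time is `CompletedOn - StartedOn`, both epoch seconds in the CLI output, and execution time is `ExecutionTime`, in seconds. The helper below is a hypothetical illustration. With boto3 the timestamps arrive as `datetime` objects instead, so you would take `.total_seconds()` of their difference.

```python
import json
from typing import Tuple


def run_minutes(job_run: dict) -> Tuple[float, float]:
    """Return (wall-clock minutes, execution minutes) for a Glue JobRun dict from the CLI."""
    total = (job_run["CompletedOn"] - job_run["StartedOn"]) / 60
    execution = job_run["ExecutionTime"] / 60
    return round(total, 2), round(execution, 2)


response = json.loads("""{"JobRun": {"StartedOn": 1578022390.301,
    "CompletedOn": 1578023285.632, "ExecutionTime": 135}}""")
print(run_minutes(response["JobRun"]))  # → (14.92, 2.25)
```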
The job’s progress and the results are also visible in the AWS Glue Console’s ETL Jobs tab.
Detailed Apache Spark logs are also available in the Amazon CloudWatch Management Console, which is accessible directly from the Logs link in the AWS Glue Console’s ETL Jobs tab.
The last step in the Data Transformation stage is to catalog the Parquet-format electrical rates data, created with the previous Glue ETL Job, using yet another Glue Crawler (part of step 2b in the workflow diagram). Start the following Glue Crawler to catalog the Parquet-format electrical rates data.
aws glue start-crawler --name smart-hub-rates-parquet
This concludes the Data Transformation stage. The raw and transformed data is in the data lake, and the following nine tables should exist in the Glue Data Catalog.
electricity_rates_parquet
electricity_rates_xml
etl_tmp_output_parquet
sensor_mappings_json
sensor_mappings_parquet
smart_hub_data_json
smart_hub_data_parquet
smart_hub_locations_csv
smart_hub_locations_parquet
If we examine the tables, we should observe that the data partitions we used to organize the data files in the Amazon S3-based data lake are reflected in the table metadata. Below, we see the four partitions, based on state, of the Parquet-format locations data.
Data Enrichment
To begin the Data Enrichment stage, we will invoke the AWS Lambda, athena-complex-etl-query/index.py. This Lambda accepts three input parameters, loc_id, date_from, and date_to, passed in the Lambda handler’s event parameter: the Smart Hub location ID, the start date of the requested data, and the end date of the requested data. The scenario for the demonstration is that a customer with that location ID, using the electrical provider’s application, has requested data for a particular range of days (start date and end date) to visualize and analyze.
The Lambda executes a series of Athena INSERT INTO SQL statements, one statement for each of the possible Smart Hub connected electrical sensors, s_01 through s_10, for which there are values in the Smart Hub electrical usage data. In September 2019, Amazon released the Amazon Athena capability to INSERT INTO a table using the results of a SELECT query, an essential addition to Athena. New Athena features are listed in the release notes.
Here, the SELECT query is actually a series of chained subqueries, using Presto SQL’s WITH clause capability. The queries join the Parquet-format Smart Hub electrical usage data in the S3-based data lake with the other three Parquet-format, S3-based data sources: sensor mappings, locations, and electrical rates. The Parquet-format data is written as individual files to S3 and inserted into the existing ‘etl_tmp_output_parquet’ Glue Data Catalog database table. Compared to traditional relational database-based queries, the ability of Glue and Athena to run complex SQL queries across multiple semi-structured data files stored in S3 is truly amazing!
Below, we see the Lambda’s code; the SQL statement is assembled inside the athena_query function.
import boto3
import os
import logging
import json
from typing import Dict
# environment variables
data_catalog = os.getenv('DATA_CATALOG')
data_bucket = os.getenv('DATA_BUCKET')
# variables
output_directory = 'etl_tmp_output_parquet'
# uses list comprehension to generate the equivalent of:
# ['s_01', 's_02', …, 's_09', 's_10']
sensors = [f's_{i:02d}' for i in range(1, 11)]
# logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# athena client
athena_client = boto3.client('athena')
def handler(event, context):
    args = {
        "loc_id": event['loc_id'],
        "date_from": event['date_from'],
        "date_to": event['date_to']
    }
    athena_query(args)
    return {
        'statusCode': 200,
        'body': json.dumps("function 'athena-complex-etl-query' complete")
    }
def athena_query(args: Dict[str, str]):
    for sensor in sensors:
        query = \
            "INSERT INTO " + data_catalog + "." + output_directory + " " \
            "WITH " \
            " t1 AS " \
            " (SELECT d.loc_id, d.ts, d.data." + sensor + " AS kwh, l.state, l.tz " \
            " FROM smart_hub_data_catalog.smart_hub_data_parquet d " \
            " LEFT OUTER JOIN smart_hub_data_catalog.smart_hub_locations_parquet l " \
            " ON d.loc_id = l.hash " \
            " WHERE d.loc_id = '" + args['loc_id'] + "' " \
            " AND d.dt BETWEEN cast('" + args['date_from'] + \
            "' AS date) AND cast('" + args['date_to'] + "' AS date)), " \
            " t2 AS " \
            " (SELECT at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz) AS ts, " \
            " date_format(at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz), '%H') AS rate_period, " \
            " m.description AS device, m.location, t1.loc_id, t1.state, t1.tz, t1.kwh " \
            " FROM t1 LEFT OUTER JOIN smart_hub_data_catalog.sensor_mappings_parquet m " \
            " ON t1.loc_id = m.loc_id " \
            " WHERE t1.loc_id = '" + args['loc_id'] + "' " \
            " AND m.state = t1.state " \
            " AND m.description = (SELECT m2.description " \
            " FROM smart_hub_data_catalog.sensor_mappings_parquet m2 " \
            " WHERE m2.loc_id = '" + args['loc_id'] + "' AND m2.id = '" + sensor + "')), " \
            " t3 AS " \
            " (SELECT substr(r.to, 1, 2) AS rate_period, r.type, r.rate, r.year, r.month, r.state " \
            " FROM smart_hub_data_catalog.electricity_rates_parquet r " \
            " WHERE r.year BETWEEN cast(date_format(cast('" + args['date_from'] + \
            "' AS date), '%Y') AS integer) AND cast(date_format(cast('" + args['date_to'] + \
            "' AS date), '%Y') AS integer)) " \
            "SELECT replace(cast(t2.ts AS VARCHAR), concat(' ', t2.tz), '') AS ts, " \
            " t2.device, t2.location, t3.type, t2.kwh, t3.rate AS cents_per_kwh, " \
            " round(t2.kwh * t3.rate, 4) AS cost, t2.state, t2.loc_id " \
            "FROM t2 LEFT OUTER JOIN t3 " \
            " ON t2.rate_period = t3.rate_period " \
            "WHERE t3.state = t2.state " \
            "ORDER BY t2.ts, t2.device;"
        logger.info(query)
        response = athena_client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={
                'Database': data_catalog
            },
            ResultConfiguration={
                'OutputLocation': 's3://' + data_bucket + '/tmp/' + output_directory
            },
            WorkGroup='primary'
        )
        logger.info(response)
Below is an example of one of the final queries, for the s_10 sensor, as executed by Athena. All the input parameter values, Python variables, and environment variables have been resolved into the query.
INSERT INTO smart_hub_data_catalog.etl_tmp_output_parquet
WITH t1 AS (SELECT d.loc_id, d.ts, d.data.s_10 AS kwh, l.state, l.tz
            FROM smart_hub_data_catalog.smart_hub_data_parquet d
                     LEFT OUTER JOIN smart_hub_data_catalog.smart_hub_locations_parquet l ON d.loc_id = l.hash
            WHERE d.loc_id = 'b6a8d42425fde548'
              AND d.dt BETWEEN cast('2019-12-21' AS date) AND cast('2019-12-22' AS date)),
     t2 AS (SELECT at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz) AS ts,
                   date_format(at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz), '%H') AS rate_period,
                   m.description AS device,
                   m.location,
                   t1.loc_id,
                   t1.state,
                   t1.tz,
                   t1.kwh
            FROM t1
                     LEFT OUTER JOIN smart_hub_data_catalog.sensor_mappings_parquet m ON t1.loc_id = m.loc_id
            WHERE t1.loc_id = 'b6a8d42425fde548'
              AND m.state = t1.state
              AND m.description = (SELECT m2.description
                                   FROM smart_hub_data_catalog.sensor_mappings_parquet m2
                                   WHERE m2.loc_id = 'b6a8d42425fde548'
                                     AND m2.id = 's_10')),
     t3 AS (SELECT substr(r.to, 1, 2) AS rate_period, r.type, r.rate, r.year, r.month, r.state
            FROM smart_hub_data_catalog.electricity_rates_parquet r
            WHERE r.year BETWEEN cast(date_format(cast('2019-12-21' AS date), '%Y') AS integer)
                      AND cast(date_format(cast('2019-12-22' AS date), '%Y') AS integer))
SELECT replace(cast(t2.ts AS VARCHAR), concat(' ', t2.tz), '') AS ts,
       t2.device,
       t2.location,
       t3.type,
       t2.kwh,
       t3.rate AS cents_per_kwh,
       round(t2.kwh * t3.rate, 4) AS cost,
       t2.state,
       t2.loc_id
FROM t2
         LEFT OUTER JOIN t3 ON t2.rate_period = t3.rate_period
WHERE t3.state = t2.state
ORDER BY t2.ts, t2.device;
Along with enriching the data, the query performs additional data transformation using the other data sources. For example, the Unix timestamp is converted to a localized timestamp containing the date and time, according to the customer’s location (at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz) in the t2 subquery). Transforming dates and times is a frequent, often painful, data analysis task. Another example of data enrichment is the augmentation of the data with a new, computed column, cost, whose values are calculated from two other columns (round(t2.kwh * t3.rate, 4) AS cost in the final SELECT).
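Both enrichments can be sketched in plain Python to show the arithmetic. The sketch rests on a few assumptions. `ts` is Unix epoch seconds, as `from_unixtime` implies, and the helper `enrich` is hypothetical. The example passes a fixed UTC-8 offset instead of a named zone from the locations table’s `tz` column so it runs without a time-zone database; in practice you would pass `zoneinfo.ZoneInfo(tz)`. The epoch value 1576986000 is 2019-12-22 03:40 UTC, which is 19:40 on December 21 at UTC-8. It was chosen to line up with a row in the sample output below and is not taken from the raw data.

```python
from datetime import datetime, timedelta, timezone, tzinfo


def enrich(ts: int, kwh: float, cents_per_kwh: float, tz: tzinfo) -> dict:
    """Localize a Unix timestamp and compute cost in cents, mirroring the Athena query."""
    local = datetime.fromtimestamp(ts, tz=timezone.utc).astimezone(tz)
    return {
        # timestamp rendered with millisecond precision, like the query output
        "ts": local.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
        # hour of day, used to join against the rate periods
        "rate_period": local.strftime("%H"),
        # kWh times cents per kWh, rounded to 4 places like round(t2.kwh * t3.rate, 4)
        "cost": round(kwh * cents_per_kwh, 4),
    }


utc_minus_8 = timezone(timedelta(hours=-8))
print(enrich(1576986000, 0.1501, 12.623, utc_minus_8))
# → {'ts': '2019-12-21 19:40:00.000', 'rate_period': '19', 'cost': 1.8947}
```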
Invoke the Lambda with the following three parameters in the payload (step 3a in the workflow diagram).
aws lambda invoke \
  --function-name athena-complex-etl-query \
  --payload "{ \"loc_id\": \"b6a8d42425fde548\",
    \"date_from\": \"2019-12-21\", \"date_to\": \"2019-12-22\"}" \
  response.json
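Because the handler concatenates `loc_id`, `date_from`, and `date_to` directly into SQL, it is worth validating the payload before it reaches Athena. The defensive sketch below uses a hypothetical helper name. Its rule that a location ID is exactly 16 lowercase hexadecimal characters is an assumption inferred from the sample value b6a8d42425fde548. Returning the dates re-serialized by `date.isoformat()` means only normalized values are interpolated into the query.

```python
import re
from datetime import date
from typing import Dict

LOC_ID_PATTERN = re.compile(r"[0-9a-f]{16}")  # assumed format, inferred from sample IDs


def validate_payload(event: Dict[str, str]) -> Dict[str, str]:
    """Raise ValueError unless loc_id looks like a hex ID and the dates are valid and ordered."""
    loc_id = event["loc_id"]
    if not LOC_ID_PATTERN.fullmatch(loc_id):
        raise ValueError(f"unexpected loc_id: {loc_id!r}")
    start = date.fromisoformat(event["date_from"])  # raises ValueError if malformed
    end = date.fromisoformat(event["date_to"])
    if start > end:
        raise ValueError("date_from must not be after date_to")
    return {"loc_id": loc_id, "date_from": start.isoformat(), "date_to": end.isoformat()}
```

In the Lambda, `handler` could call `validate_payload(event)` before building `args`.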
The result statuses of the ten INSERT INTO SQL statements (one per device sensor) are visible from the Athena Console’s History tab.
Each Athena query execution saves that query’s results to the S3-based data lake as individual, uncompressed Parquet-format data files. The data is partitioned in the Amazon S3-based data lake by the Smart Hub location ID (e.g., ‘loc_id=b6a8d42425fde548’).
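The `loc_id=b6a8d42425fde548` folder name follows the Hive-style `key=value` partition convention, which Glue and Athena use to derive partition columns from S3 prefixes. The small hypothetical helper below shows how such an object key breaks down into partition values. The file name in the example is illustrative, not an actual Athena output name.

```python
from typing import Dict
from urllib.parse import unquote


def partition_values(s3_key: str) -> Dict[str, str]:
    """Extract Hive-style key=value partition segments from an S3 object key."""
    values = {}
    for segment in s3_key.split("/")[:-1]:  # the last segment is the file name
        if "=" in segment:
            name, _, value = segment.partition("=")
            values[name] = unquote(value)  # special characters are percent-encoded in paths
    return values


print(partition_values("etl_tmp_output_parquet/loc_id=b6a8d42425fde548/part-00000.parquet"))
# → {'loc_id': 'b6a8d42425fde548'}
```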
Below is a snippet of the enriched data for a customer’s clothes washer (sensor ‘s_04’). Note the timestamp is now an actual date and time in the local timezone of the customer (e.g., ‘2019-12-21 20:10:00.000’). The sensor ID (‘s_04’) is replaced with the actual device name (‘Clothes Washer’). The location of the device (‘Basement’) and the type of electrical usage period (e.g., ‘peak’ or ‘partial-peak’) have been added. Finally, the cost column has been computed.
ts | device | location | type | kwh | cents_per_kwh | cost | state | loc_id
---|---|---|---|---|---|---|---|---
2019-12-21 19:40:00.000 | Clothes Washer | Basement | peak | 0.0 | 12.623 | 0.0 | or | b6a8d42425fde548
2019-12-21 19:45:00.000 | Clothes Washer | Basement | peak | 0.0 | 12.623 | 0.0 | or | b6a8d42425fde548
2019-12-21 19:50:00.000 | Clothes Washer | Basement | peak | 0.1501 | 12.623 | 1.8947 | or | b6a8d42425fde548
2019-12-21 19:55:00.000 | Clothes Washer | Basement | peak | 0.1497 | 12.623 | 1.8897 | or | b6a8d42425fde548
2019-12-21 20:00:00.000 | Clothes Washer | Basement | partial-peak | 0.1501 | 7.232 | 1.0855 | or | b6a8d42425fde548
2019-12-21 20:05:00.000 | Clothes Washer | Basement | partial-peak | 0.2248 | 7.232 | 1.6258 | or | b6a8d42425fde548
2019-12-21 20:10:00.000 | Clothes Washer | Basement | partial-peak | 0.2247 | 7.232 | 1.625 | or | b6a8d42425fde548
2019-12-21 20:15:00.000 | Clothes Washer | Basement | partial-peak | 0.2248 | 7.232 | 1.6258 | or | b6a8d42425fde548
2019-12-21 20:20:00.000 | Clothes Washer | Basement | partial-peak | 0.2253 | 7.232 | 1.6294 | or | b6a8d42425fde548
2019-12-21 20:25:00.000 | Clothes Washer | Basement | partial-peak | 0.151 | 7.232 | 1.092 | or | b6a8d42425fde548
Before optimizing the enriched Parquet-format data, we need to catalog the results using another Crawler (step 3d in the workflow diagram).
aws glue start-crawler --name smart-hub-etl-tmp-output-parquet
Optimizing Enriched Data
The previous step created enriched Parquet-format data. However, this data is not as optimized for query efficiency as it should be. Using the Athena INSERT INTO WITH SQL statement allowed the data to be partitioned. However, the method does not allow the Parquet data to be easily combined into larger files and compressed. To perform both these optimizations, we will use one last Lambda, athena-parquet-to-parquet-elt-data/index.py. The Lambda will create a new location in the Amazon S3-based data lake containing all the enriched data in a single file, compressed using Snappy compression.
aws lambda invoke \
  --function-name athena-parquet-to-parquet-elt-data \
  response.json
The resulting Parquet file is visible in the S3 Management Console.
The final step in the Data Enrichment stage is to catalog the optimized Parquet-format enriched ETL data. To catalog the data, run the following Glue Crawler (step 3i in the workflow diagram).
aws glue start-crawler --name smart-hub-etl-output-parquet
Final Data Lake and Data Catalog
We should now have the following ten top-level folders of partitioned data in the S3-based data lake. The ‘tmp’ folder may be ignored.
aws s3 ls s3://${DATA_BUCKET}/
PRE electricity_rates_parquet/
PRE electricity_rates_xml/
PRE etl_output_parquet/
PRE etl_tmp_output_parquet/
PRE sensor_mappings_json/
PRE sensor_mappings_parquet/
PRE smart_hub_data_json/
PRE smart_hub_data_parquet/
PRE smart_hub_locations_csv/
PRE smart_hub_locations_parquet/
Similarly, we should now have the following ten corresponding tables in the Glue Data Catalog. Use the AWS Glue Console to confirm the tables exist.
Alternatively, use the following AWS CLI and jq command to list the table names.
aws glue get-tables \
  --database-name smart_hub_data_catalog \
  | jq -r '.TableList[].Name'
electricity_rates_parquet
electricity_rates_xml
etl_output_parquet
etl_tmp_output_parquet
sensor_mappings_json
sensor_mappings_parquet
smart_hub_data_json
smart_hub_data_parquet
smart_hub_locations_csv
smart_hub_locations_parquet
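The AWS CLI pages through get-tables results automatically. With boto3, the caller follows `NextToken` or uses `get_paginator("get_tables")`. The sketch below follows the token explicitly. It assumes a boto3 Glue client, whose `get_tables` accepts `DatabaseName` and `NextToken` and returns `TableList`. The helper name is hypothetical.

```python
from typing import Any, Dict, List


def list_table_names(glue_client: Any, database: str) -> List[str]:
    """Return every table name in a Glue database, following NextToken pagination."""
    names: List[str] = []
    kwargs: Dict[str, str] = {"DatabaseName": database}
    while True:
        response = glue_client.get_tables(**kwargs)
        names.extend(table["Name"] for table in response["TableList"])
        token = response.get("NextToken")
        if not token:
            return names
        kwargs["NextToken"] = token

# Usage, with AWS credentials configured:
#   import boto3
#   print(list_table_names(boto3.client("glue"), "smart_hub_data_catalog"))
```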
‘Unknown’ Bug
You may have noticed that the four tables created with the AWS Lambda functions, using the CTAS SQL statement, erroneously have the ‘Classification’ of ‘Unknown’ as opposed to ‘parquet’. I am not sure why; I believe it is a possible bug with the CTAS feature. It seems to have no adverse impact on the tables’ functionality. However, to fix the issue, run the following set of commands. This aws glue update-table hack will switch each table’s ‘Classification’ to ‘parquet’.
database=smart_hub_data_catalog
tables=(smart_hub_locations_parquet sensor_mappings_parquet smart_hub_data_parquet etl_output_parquet)
# "${tables[@]}" expands to every array element; in bash, ${tables} alone yields only the first
for table in "${tables[@]}"; do
  # set the classification and strip fields that are not valid in a TableInput
  fixed_table=$(aws glue get-table \
    --database-name "${database}" \
    --name "${table}" \
    | jq '.Table.Parameters.classification = "parquet" | del(.Table.DatabaseName) | del(.Table.CreateTime) | del(.Table.UpdateTime) | del(.Table.CreatedBy) | del(.Table.IsRegisteredWithLakeFormation) | del(.Table.CatalogId) | del(.Table.VersionId)')
  fixed_table=$(echo "${fixed_table}" | jq '.Table')
  aws glue update-table \
    --database-name "${database}" \
    --table-input "${fixed_table}"
  echo "table '${table}' classification changed to 'parquet'"
done
The results of the fix may be seen from the AWS Glue Console. All ten tables are now classified correctly.
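The same fix can be written as a pure function that turns a get-table response into an update-table `TableInput`. Instead of deleting read-only fields one by one, this sketch keeps an allow-list of keys. The list is a subset of the fields Glue’s `TableInput` structure accepts, chosen for this post’s tables, and the helper name is hypothetical. With boto3 you would pass the result to `glue.update_table(DatabaseName=..., TableInput=...)`.

```python
import copy
from typing import Any, Dict

# Fields accepted by Glue's TableInput (a subset); read-only fields such as
# DatabaseName, CreateTime, UpdateTime, and CreatedBy are deliberately absent
TABLE_INPUT_KEYS = {
    "Name", "Description", "Owner", "LastAccessTime", "LastAnalyzedTime",
    "Retention", "StorageDescriptor", "PartitionKeys", "ViewOriginalText",
    "ViewExpandedText", "TableType", "Parameters", "TargetTable",
}


def to_table_input(get_table_response: Dict[str, Any],
                   classification: str = "parquet") -> Dict[str, Any]:
    """Build an update_table TableInput from a get_table response, setting the classification."""
    table = copy.deepcopy(get_table_response["Table"])  # leave the caller's dict untouched
    table_input = {k: v for k, v in table.items() if k in TABLE_INPUT_KEYS}
    table_input.setdefault("Parameters", {})["classification"] = classification
    return table_input
```

An allow-list tends to keep working when the service adds new read-only fields to the get-table response.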
Explore the Data
Before starting to visualize and analyze the data with Amazon QuickSight, try executing a few Athena queries against the tables in the Glue Data Catalog database, using the Athena Query Editor. Working in the Editor is the best way to understand the data, learn Athena, and debug SQL statements and queries. The Athena Query Editor has convenient developer features like SQL auto-complete and query formatting capabilities.
Be mindful when writing queries and searching the Internet for SQL references: the Athena query engine is based on Presto 0.172. At the time of writing, the current version of Presto, 0.229, is more than 50 releases ahead of the Athena version. Athena and Presto functionality have both changed and diverged. There are additional considerations and limitations for SQL queries in Athena to be aware of.
Here are a few simple, ad-hoc queries to run in the Athena Query Editor.
-- preview the final etl data
SELECT *
FROM smart_hub_data_catalog.etl_output_parquet
LIMIT 10;
-- total cost in $'s for each device, at location 'b6a8d42425fde548'
-- from high to low, on December 21, 2019
-- (cost is stored in cents; sort on the numeric sum, not the formatted string)
SELECT device,
       concat('$', cast(cast(sum(cost) / 100 AS decimal(10, 2)) AS varchar)) AS total_cost
FROM smart_hub_data_catalog.etl_tmp_output_parquet
WHERE loc_id = 'b6a8d42425fde548'
  AND date(cast(ts AS timestamp)) = date '2019-12-21'
GROUP BY device
ORDER BY sum(cost) DESC;
-- count of smart hub residential locations in Oregon and California,
-- grouped by zip code, sorted by count
SELECT DISTINCT postcode, upper(state), count(postcode) AS smart_hub_count
FROM smart_hub_data_catalog.smart_hub_locations_parquet
WHERE state IN ('or', 'ca')
  AND length(cast(postcode AS varchar)) >= 5
GROUP BY state, postcode
ORDER BY smart_hub_count DESC, postcode;
-- electrical usage for the clothes washer
-- over a 30-minute period, on December 21, 2019
SELECT ts, device, location, type, cost
FROM smart_hub_data_catalog.etl_tmp_output_parquet
WHERE loc_id = 'b6a8d42425fde548'
  AND device = 'Clothes Washer'
  AND cast(ts AS timestamp)
    BETWEEN timestamp '2019-12-21 08:45:00'
    AND timestamp '2019-12-21 09:15:00'
ORDER BY ts;
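The second query’s arithmetic is easy to check by hand. Cost is stored in cents (kWh times cents per kWh), so dividing the sum by 100 gives dollars. The sketch below applies that to the ten 5-minute clothes-washer readings shown earlier, not to the whole day. It uses Decimal to avoid floating-point noise and rounds half-up to cents. The helper name is hypothetical. The last line shows why ordering belongs on the numeric total rather than on the formatted dollar string.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Iterable

# cost column (in cents) from the ten sample Clothes Washer rows shown earlier
costs_in_cents = ["0.0", "0.0", "1.8947", "1.8897", "1.0855",
                  "1.6258", "1.625", "1.6258", "1.6294", "1.092"]


def cents_to_dollars(cents_values: Iterable[str]) -> Decimal:
    """Sum costs in cents and convert to dollars, rounded half-up to two places."""
    total_cents = sum(Decimal(v) for v in cents_values)
    return (total_cents / 100).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


print(f"${cents_to_dollars(costs_in_cents)}")  # → $0.12

# Formatted strings sort lexicographically, so '$9.00' lands above '$10.00'
print(sorted(["$10.00", "$9.00"], reverse=True))  # → ['$9.00', '$10.00']
```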
Cleaning Up
You may choose to save the AWS resources created in part one of this demonstration, to be used in part two. Since you are not actively running queries against the data, ongoing AWS costs will be minimal. If you eventually choose to clean up the AWS resources created in part one of this demonstration, execute the following AWS CLI commands. To avoid failures, make sure each command completes before running the subsequent command. You will need to confirm the CloudFormation stacks are deleted using the AWS CloudFormation Console or the AWS CLI. These commands will not remove Amazon QuickSight data sets, analyses, and dashboards created in part two. However, deleting the AWS Glue Data Catalog and the underlying data sources will impact the ability to visualize the data in QuickSight.
# delete s3 contents first
aws s3 rm s3://${DATA_BUCKET} --recursive
aws s3 rm s3://${SCRIPT_BUCKET} --recursive
aws s3 rm s3://${LOG_BUCKET} --recursive
# then, delete lambda cfn stack
aws cloudformation delete-stack --stack-name smart-hub-lambda-stack
# finally, delete athena-glue-s3 stack
aws cloudformation delete-stack --stack-name smart-hub-athena-glue-stack
Part Two
In part one, starting with raw, semi-structured data in multiple formats, we learned how to ingest, transform, and enrich that data using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We built an S3-based data lake and learned how AWS leverages open-source technologies, including Presto, Apache Hive, and Apache Parquet. In part two of this post, we will use the transformed and enriched datasets, stored in the data lake, to create compelling visualizations using Amazon QuickSight.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
Getting Started with Apache Zeppelin on Amazon EMR, using AWS Glue, RDS, and S3: Part 2
Posted by Gary A. Stafford in AWS, Bash Scripting, Big Data, Build Automation, Cloud, DevOps, Python, Software Development on November 30, 2019
Introduction
In Part 1 of this two-part post, we created and configured the AWS resources required to demonstrate the use of Apache Zeppelin on Amazon Elastic MapReduce (EMR). Further, we configured Zeppelin integrations with AWS Glue Data Catalog, Amazon Relational Database Service (RDS) for PostgreSQL, and an Amazon Simple Storage Service (S3) data lake. We also covered how to obtain the project’s source code from the two GitHub repositories, zeppelin-emr-demo and zeppelin-emr-config. Below is a high-level architectural diagram of the infrastructure we constructed in Part 1 for this demonstration.
Part 2
In Part 2 of this post, we will explore Apache Zeppelin’s features and integration capabilities with a variety of AWS services using a series of four Zeppelin notebooks. Below is an overview of each Zeppelin notebook with a link to view it using Zepl’s free Notebook Explorer. Zepl was founded by the same engineers that developed Apache Zeppelin. Zepl’s enterprise collaboration platform, built on Apache Zeppelin, enables both Data Science and AI/ML teams to collaborate around data.
Notebook 1
The first notebook uses a small, 21k-row Kaggle dataset, Transactions from a Bakery. The notebook demonstrates Zeppelin’s integration capabilities with the Helium plugin system for adding new chart types, the use of Amazon S3 for data storage and retrieval, the use of Apache Parquet, a compressed and efficient columnar data storage format, and Zeppelin’s storage integration with GitHub for notebook version control.
Interpreters
When you open a notebook for the first time, you are given the choice of interpreters to bind to and unbind from the notebook. The last interpreter in the list shown below, postgres, is the new PostgreSQL JDBC Zeppelin interpreter we created in Part 1 of this post. We will use this interpreter in Notebook 4.
Application Versions
The first two paragraphs of the notebook are used to confirm the versions of Spark, Scala, OpenJDK, and Python we are using. Recall that we updated the Spark and Python interpreters to use Python 3.
Helium Visualizations
If you recall from Part 1 of the post, we pre-installed several additional Helium visualizations, including the Ultimate Pie Chart. Below, we see the use of the Spark SQL (%sql) interpreter to query a Spark DataFrame, return results, and visualize the data using the Ultimate Pie Chart. In addition to the pie chart, we see the other pre-installed Helium visualizations in the menu bar, following the five default visualizations. With Zeppelin, all we have to do is write Spark SQL queries against the Spark DataFrame created earlier in the notebook, and Zeppelin will handle the visualization. You have some basic control over charts using the ‘settings’ option.
Building the Data Lake
Notebook 1 demonstrates how to read and write data to S3. We read and write the Bakery dataset to both CSV-format and Apache Parquet-format, using Spark (PySpark). We also write the results of Spark SQL queries, like the one above, in Parquet, to S3.
With Parquet, data may be split into multiple files, as shown in the S3 bucket directory below. Parquet is much faster to read into a Spark DataFrame than CSV. Spark provides support for both reading and writing Parquet files. We will write all of our data to Parquet in S3, making future reuse of the data much more efficient than downloading data from the Internet, like GroupLens or Kaggle, or consuming CSV from S3.
Preview S3 Data
In addition to using the Zeppelin notebook, we can preview data right in the S3 bucket web interface using the Amazon S3 Select feature. This query-in-place feature helps you quickly understand the structure and content of new data files you want to work with in Zeppelin.
Saving Changes to GitHub
In Part 1, we configured Zeppelin to read and write the notebooks from your own copy of the GitHub notebook repository. Using the ‘version control’ menu item, changes made to the notebooks can be committed directly to GitHub.
In GitHub, note that the committer is the zeppelin user.
Notebook 2
The second notebook demonstrates the use of a single-node and multi-node Amazon EMR cluster for the exploration and analysis of public datasets ranging from approximately 100k rows up to 27MM rows, using Zeppelin. We will use the latest GroupLens MovieLens rating datasets to examine the performance characteristics of Zeppelin, using Spark, on single- versus multi-node EMR clusters for analyzing big data using a variety of Amazon EC2 Instance Types.
Multi-Node EMR Cluster
If you recall from Part 1, we waited to create this cluster due to the compute costs of running the cluster’s large EC2 instances. You should understand the cost of these resources before proceeding and ensure they are destroyed immediately upon completion of the demonstration to minimize your expenses.
Normalized Instance Hours
Understanding the costs of EMR requires understanding the concept of normalized instance hours. The cluster list in the EMR console contains two columns, ‘Elapsed time’ and ‘Normalized instance hours’. The ‘Elapsed time’ column reflects the actual wall-clock time the cluster was used. The ‘Normalized instance hours’ column indicates the approximate number of compute hours the cluster has used, rounded up to the nearest hour.
Normalized instance hours calculations are based on a normalization factor that depends on instance size, for example, 1 for a small instance and 64 for an 8xlarge. Based on the type and quantity of instances in our multi-node cluster, we would use approximately 56 compute hours (aka normalized instance hours) for every one hour of wall-clock time our EMR cluster is running. Note the multi-node cluster used in our demo, highlighted in yellow above. The cluster ran for two hours, which equated to 112 normalized instance hours.
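The arithmetic above can be reproduced with a short Python sketch. It assumes the default instance types from this post’s multi-node create-stack command (one m5.xlarge master node and three m5.2xlarge Core nodes) and the size-based normalization factors from the EMR documentation (only a subset is listed). Rounding the total elapsed time up to whole hours is a simplification of how EMR reports the figure, and the function names are illustrative.

```python
import math

# Size-based normalization factors (subset of the EMR documentation's table).
NORMALIZATION_FACTORS = {
    "small": 1,
    "medium": 2,
    "large": 4,
    "xlarge": 8,
    "2xlarge": 16,
    "4xlarge": 32,
    "8xlarge": 64,
}


def normalization_factor(instance_type: str) -> int:
    """Return the factor for an EC2 instance type such as 'm5.2xlarge'."""
    size = instance_type.split(".", 1)[1]
    return NORMALIZATION_FACTORS[size]


def normalized_instance_hours(instances: dict, elapsed_hours: float) -> int:
    """Approximate normalized instance hours for a cluster.

    `instances` maps instance type -> instance count. Elapsed wall-clock
    time is rounded up to the next whole hour before multiplying.
    """
    per_hour = sum(normalization_factor(t) * n for t, n in instances.items())
    return per_hour * math.ceil(elapsed_hours)


cluster = {"m5.xlarge": 1, "m5.2xlarge": 3}
print(normalized_instance_hours(cluster, 1))  # 56
print(normalized_instance_hours(cluster, 2))  # 112
```

The 56-per-hour figure comes from 8 (m5.xlarge) plus 3 × 16 (m5.2xlarge).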
Create the Multi-Node Cluster
Create the multi-node EMR cluster using CloudFormation. Change the following nine variable values, then run the aws cloudformation create-stack command, using the AWS CLI.
# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
EC2_KEY_NAME="your-key-name"
LOG_BUCKET="aws-logs-your_aws_account_id-your_region"
GITHUB_ACCOUNT="your-account-name"
GITHUB_REPO="your-new-project-name"
GITHUB_TOKEN="your-token-value"
MASTER_INSTANCE_TYPE="m5.xlarge" # optional
CORE_INSTANCE_TYPE="m5.2xlarge" # optional
CORE_INSTANCE_COUNT=3 # optional
aws cloudformation create-stack \
  --stack-name zeppelin-emr-prod-stack \
  --template-body file://cloudformation/emr_cluster.yml \
  --parameters ParameterKey=ZeppelinDemoBucket,ParameterValue=${ZEPPELIN_DEMO_BUCKET} \
    ParameterKey=Ec2KeyName,ParameterValue=${EC2_KEY_NAME} \
    ParameterKey=LogBucket,ParameterValue=${LOG_BUCKET} \
    ParameterKey=MasterInstanceType,ParameterValue=${MASTER_INSTANCE_TYPE} \
    ParameterKey=CoreInstanceType,ParameterValue=${CORE_INSTANCE_TYPE} \
    ParameterKey=CoreInstanceCount,ParameterValue=${CORE_INSTANCE_COUNT} \
    ParameterKey=GitHubAccount,ParameterValue=${GITHUB_ACCOUNT} \
    ParameterKey=GitHubRepository,ParameterValue=${GITHUB_REPO} \
    ParameterKey=GitHubToken,ParameterValue=${GITHUB_TOKEN}
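If you prefer to drive the stack creation from Python, the same CLI call can be assembled as an argument list and handed to subprocess. This is a minimal sketch: the helper name create_stack_command is hypothetical, and the parameter values are the placeholders from the command above. It renders each parameter in the CLI shorthand form ParameterKey=...,ParameterValue=...; values containing commas would need escaping, which this sketch does not handle.

```python
import shlex


def create_stack_command(stack_name: str, template_path: str, params: dict) -> list:
    """Build an `aws cloudformation create-stack` argument list."""
    cmd = [
        "aws", "cloudformation", "create-stack",
        "--stack-name", stack_name,
        "--template-body", f"file://{template_path}",
        "--parameters",
    ]
    # One shorthand argument per CloudFormation parameter.
    cmd += [f"ParameterKey={k},ParameterValue={v}" for k, v in params.items()]
    return cmd


if __name__ == "__main__":
    params = {
        "ZeppelinDemoBucket": "your-bucket-name",
        "Ec2KeyName": "your-key-name",
        "LogBucket": "aws-logs-your_aws_account_id-your_region",
        "MasterInstanceType": "m5.xlarge",
        "CoreInstanceType": "m5.2xlarge",
        "CoreInstanceCount": 3,
        "GitHubAccount": "your-account-name",
        "GitHubRepository": "your-new-project-name",
        "GitHubToken": "your-token-value",
    }
    cmd = create_stack_command(
        "zeppelin-emr-prod-stack", "cloudformation/emr_cluster.yml", params)
    # Inspect the command first; run it with subprocess.run(cmd, check=True).
    print(shlex.join(cmd))
```

Passing a list rather than a single shell string avoids shell quoting issues with token values.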
Use the Amazon EMR web interface to confirm the success of the CloudFormation stack. The fully-provisioned cluster should be in the ‘Waiting’ state when ready.
Configuring the EMR Cluster
Refer to Part 1 for the configuration steps necessary to prepare the EMR cluster and Zeppelin before continuing. Repeat all the steps used for the single-node cluster.
Monitoring with Ganglia
In Part 1, we installed Ganglia as part of creating the EMR cluster. Ganglia, according to its website, is a scalable distributed monitoring system for high-performance computing systems such as clusters and grids. Ganglia can be used to evaluate the performance of the single-node and multi-node EMR clusters. With Ganglia, we can easily view cluster and individual instance CPU, memory, and network I/O performance.
Ganglia Example: Cluster Memory
Ganglia Example: Cluster Network I/O
YARN Resource Manager
The YARN Resource Manager Web UI is also available on our EMR cluster. Using the Resource Manager, we can view the compute resource load on the cluster, as well as the individual EMR Core nodes. Below, we see that the multi-node cluster has 24 vCPUs and 72 GiB of memory available, split evenly across the three Core cluster nodes.
You might recall that the m5.2xlarge EC2 instance type, used for the three Core nodes, contains 8 vCPUs and 32 GiB of memory per node. However, by default, although all 8 vCPUs are available for computation on each node, only 24 GiB of the node’s 32 GiB of memory are available for computation; EMR reserves a portion of the memory on each node for other system processes. The available memory is controlled by YARN memory configuration options: yarn.nodemanager.resource.memory-mb sets the memory available to containers on each node, and yarn.scheduler.maximum-allocation-mb caps the size of any single container request.
The YARN Resource Manager preview above shows the load on the Core nodes as Notebook 2 executes the Spark SQL queries on the large MovieLens dataset with 27MM ratings. Note that only 4 of the 24 vCPUs (16.7%) are in use, but that 70.25 of the 72 GiB (97.6%) of available memory is being used. According to the Spark documentation, because of the in-memory nature of most Spark computations, Spark programs can be bottlenecked by any resource in the cluster: CPU, network bandwidth, or memory. Most often, if the data fits in memory, the bottleneck is network bandwidth. In this case, memory appears to be the most constrained resource. Using memory-optimized instances, such as r4 or r5 instance types, might be more effective for the Core nodes than the m5 instance types.
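The capacity and utilization figures above follow from simple arithmetic. This sketch assumes 8 vCPUs and 24 GiB (24,576 MiB) of YARN memory per m5.2xlarge Core node, as described above; the function names are illustrative.

```python
def yarn_capacity(nodes: int, vcpus_per_node: int, yarn_mb_per_node: int):
    """Total vCPUs and memory (GiB) YARN can schedule across the Core nodes."""
    return nodes * vcpus_per_node, nodes * yarn_mb_per_node / 1024


def utilization(used: float, available: float) -> float:
    """Percentage of a resource in use, rounded to one decimal place."""
    return round(100 * used / available, 1)


total_vcpus, total_gib = yarn_capacity(nodes=3, vcpus_per_node=8, yarn_mb_per_node=24576)
print(total_vcpus, total_gib)         # 24 72.0
print(utilization(4, total_vcpus))    # 16.7
print(utilization(70.25, total_gib))  # 97.6
```

Swapping in an r5.2xlarge (8 vCPUs, 64 GiB) only requires changing the per-node YARN memory value, which you can read from the Resource Manager UI after the cluster starts.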
MovieLens Datasets
By changing one variable in the notebook, we can work with the latest, smaller GroupLens MovieLens dataset containing approximately 100k rows (ml-latest-small) or the larger dataset containing approximately 27MM rows (ml-latest). For this demo, try both datasets on both the single-node and multi-node clusters. Compare the Spark SQL paragraph execution times for each of the four variations: single-node with the small dataset, single-node with the large dataset, multi-node with the small dataset, and multi-node with the large dataset. Observe how fast the SQL queries are executed on the single-node versus multi-node cluster. Try switching to a different Core node instance type, such as r5.2xlarge. Try creating a cluster with additional Core nodes. How is the compute time affected?
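Switching datasets amounts to changing a single name. As a sketch only: this assumes the notebook downloads the zipped datasets from GroupLens using the URL pattern shown, and the constant and function names are hypothetical rather than taken from the notebook.

```python
# Hypothetical helper: the notebook's actual variable and download logic may differ.
MOVIELENS_BASE_URL = "https://files.grouplens.org/datasets/movielens"
DATASETS = {"ml-latest-small", "ml-latest"}  # ~100k rows vs. ~27MM rows


def movielens_url(dataset: str) -> str:
    """Return the download URL for one of the two MovieLens datasets."""
    if dataset not in DATASETS:
        raise ValueError(f"unknown dataset: {dataset}")
    return f"{MOVIELENS_BASE_URL}/{dataset}.zip"


DATASET = "ml-latest-small"  # change to "ml-latest" for the large dataset
print(movielens_url(DATASET))
```

Validating the name up front fails fast on a typo instead of partway through a long download.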
Terminate the multi-node EMR cluster to save yourself the expense before continuing to Notebook 3.
aws cloudformation delete-stack \
  --stack-name=zeppelin-emr-prod-stack
Notebook 3
The third notebook demonstrates Amazon EMR and Zeppelin’s integration capabilities with AWS Glue Data Catalog as an Apache Hive-compatible metastore for Spark SQL. We will create an Amazon S3-based Data Lake using the AWS Glue Data Catalog and a set of AWS Glue Crawlers.
Glue Crawlers
Before continuing with Notebook 3, run the two Glue Crawlers using the AWS CLI.
aws glue start-crawler --name bakery-transactions-crawler
aws glue start-crawler --name movie-ratings-crawler
The two Crawlers will create a total of seven tables in the Glue Data Catalog database.
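Crawlers run asynchronously, so it can help to wait for both to finish before querying the new tables. A minimal sketch, assuming a boto3 Glue client (for example, boto3.client("glue")) is passed in; it uses the client’s start_crawler and get_crawler calls, and assumes each crawler reports RUNNING or STOPPING until its crawl finishes and then returns to READY. The helper name, polling interval, and timeout are illustrative choices.

```python
import time


def run_crawlers(glue, names, poll_seconds=30, timeout_seconds=1800):
    """Start each Glue crawler, then poll until all are back in READY.

    Returns a dict of crawler name -> last crawl status (e.g. 'SUCCEEDED').
    """
    for name in names:
        glue.start_crawler(Name=name)

    pending = set(names)
    results = {}
    deadline = time.monotonic() + timeout_seconds
    while True:
        for name in sorted(pending):  # iterate over a copy while discarding
            crawler = glue.get_crawler(Name=name)["Crawler"]
            if crawler["State"] == "READY":
                results[name] = crawler.get("LastCrawl", {}).get("Status")
                pending.discard(name)
        if not pending:
            return results
        if time.monotonic() > deadline:
            raise TimeoutError(f"crawlers still running: {sorted(pending)}")
        time.sleep(poll_seconds)


# Usage (requires AWS credentials):
# import boto3
# run_crawlers(boto3.client("glue"),
#              ["bakery-transactions-crawler", "movie-ratings-crawler"])
```

Injecting the client keeps the helper easy to test without AWS credentials.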
If we examine the Glue Data Catalog database, we should now observe several tables, one for each dataset found in the S3 bucket. The location of each dataset is shown in the ‘Location’ column of the tables view.
From the Zeppelin notebook, we can even use Spark SQL to query the AWS Glue Data Catalog, itself, for its databases and the tables within them.
According to Amazon, the Glue Data Catalog tables and databases are containers for the metadata definitions that define a schema for underlying source data. Using Zeppelin’s SQL interpreter, we can query the Data Catalog database and return the underlying source data. The SQL query example, below, demonstrates how we can perform a join across two tables in the data catalog database, representing two different data sources, and return results.
Notebook 4
The fourth notebook demonstrates Zeppelin’s ability to integrate with an external data source. In this case, we will interact with data in an Amazon RDS PostgreSQL relational database using three methods, including the Psycopg 2 PostgreSQL adapter for Python, Spark’s native JDBC capability, and Zeppelin’s JDBC Interpreter.
First, we create a new schema and four related tables for the RDS PostgreSQL movie ratings database, using the Psycopg 2 PostgreSQL adapter for Python and the SQL file we copied to S3 in Part 1.
The RDS database’s schema, shown below, approximates the schema of the four CSV files from the GroupLens MovieLens rating dataset we used in Notebook 2.
Since the schema of the PostgreSQL database matches the MovieLens dataset files, we can import the data from the CSV files, downloaded from GroupLens, directly into the RDS database, again using the Psycopg 2 PostgreSQL adapter for Python.
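One efficient way to bulk-load a CSV file with Psycopg 2 is PostgreSQL’s COPY ... FROM STDIN, exposed through the cursor’s copy_expert method. This is a sketch, not necessarily the notebook’s approach: it assumes an open psycopg2 connection, an existing table whose column order matches the CSV file, and a header row in the file. The helper name and file path are hypothetical. The table name is interpolated directly into the SQL, so only pass trusted names.

```python
def load_csv(conn, table: str, csv_path: str) -> None:
    """Bulk-load a CSV file (with header row) into an existing table via COPY.

    `conn` is an open psycopg2 connection; `table` must be a trusted name.
    """
    sql = f"COPY {table} FROM STDIN WITH (FORMAT csv, HEADER true)"
    with open(csv_path, encoding="utf-8") as f, conn.cursor() as cur:
        cur.copy_expert(sql, f)  # streams the file to the server
    conn.commit()


# Usage (connection details are placeholders):
# import psycopg2
# conn = psycopg2.connect(host="your-rds-endpoint", dbname="your-db",
#                         user="your-db-username", password="your-db-password")
# load_csv(conn, "movies", "ml-latest-small/movies.csv")
```

COPY avoids one round trip per row, which matters for the roughly 27MM-row ratings file.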
According to the Spark documentation, Spark SQL also includes a data source that can read data from other databases using JDBC. Using Spark’s JDBC capability and the PostgreSQL JDBC Driver we installed in Part 1, we can perform Spark SQL queries against the RDS database using PySpark (%spark.pyspark). Below, we see a paragraph example of reading the RDS database’s movies table, using Spark.
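Spark’s JDBC data source is configured through a handful of options. A sketch of building them, assuming the PostgreSQL driver class org.postgresql.Driver and the default port 5432; the helper name, host, and database name are placeholders, not values from the post.

```python
def jdbc_options(host: str, database: str, table: str,
                 user: str, password: str, port: int = 5432) -> dict:
    """Options for spark.read.format("jdbc") against a PostgreSQL database."""
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }


# In a %spark.pyspark paragraph, where `spark` is Zeppelin's SparkSession:
# opts = jdbc_options("your-rds-endpoint", "your-db", "movies",
#                     "your-db-username", "your-db-password")
# movies = spark.read.format("jdbc").options(**opts).load()
# movies.show(5)
```

Keeping the options in one place makes it easy to point the same paragraph at other tables in the schema.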
As a third method of querying the RDS database, we can use the custom Zeppelin PostgreSQL JDBC interpreter (%postgres) we created in Part 1. Although the JDBC interpreter’s default driver is PostgreSQL and the associated JAR is included with Zeppelin, we overrode that older JAR with the latest PostgreSQL JDBC Driver JAR.
Using the %postgres interpreter, we query the RDS database’s public schema and return the four database tables we created earlier in the notebook.
Again, below, using the %postgres interpreter in the notebook’s paragraph, we query the RDS database and return data, which we then visualize using Zeppelin’s bar chart. Finally, note the use of Zeppelin Dynamic Forms in this example. Dynamic Forms allow Zeppelin to dynamically create input forms whose input values are then available to use programmatically. Here, we use two form input values to control the data returned from our query and the resulting visualization.
Conclusion
In this two-part post, we learned how effectively Apache Zeppelin integrates with Amazon EMR. We also learned how to extend Zeppelin’s capabilities, using AWS Glue, Amazon RDS, and Amazon S3 as a Data Lake. Beyond what was covered in this post, there are dozens more Zeppelin and EMR features, as well as dozens more AWS services that integrate with Zeppelin and EMR, for you to discover.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
Getting Started with Apache Zeppelin on Amazon EMR, using AWS Glue, RDS, and S3: Part 1
Posted by Gary A. Stafford in AWS, Bash Scripting, Big Data, Build Automation, Cloud, DevOps, Python, Software Development on November 22, 2019
Introduction
There is little question that big data analytics, data science, artificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last 3–5 years. Behind the hype cycles and marketing buzz, these technologies are having a significant influence on many aspects of our modern lives. Due to their popularity, commercial enterprises, academic institutions, and the public sector have all rushed to develop hardware and software solutions to decrease the barrier to entry and increase the velocity of data scientists, data engineers, and ML engineers.
Data Science: 5-Year Search Trend (courtesy Google Trends)
Machine Learning: 5-Year Search Trend (courtesy Google Trends)
Technologies
All three major cloud providers, Amazon Web Services (AWS), Microsoft Azure, and Google Cloud, have rapidly maturing big data analytics, data science, and AI and ML services. AWS, for example, introduced Amazon Elastic MapReduce (EMR) in 2009, primarily as an Apache Hadoop-based big data processing service. Since then, according to Amazon, EMR has evolved into a service that uses Apache Spark, Apache Hadoop, and several other leading open-source frameworks to quickly and cost-effectively process and analyze vast amounts of data. More recently, in late 2017, Amazon released SageMaker, a service that provides the ability to build, train, and deploy machine learning models quickly and securely.
Simultaneously, organizations are building solutions that integrate and enhance these Cloud-based big data analytics, data science, AI, and ML services. One such example is Apache Zeppelin. Similar to the immensely popular Project Jupyter and Netflix’s newly open-sourced Polynote, Apache Zeppelin is a web-based, polyglot, computational notebook. Zeppelin enables data-driven, interactive data analytics and document collaboration using a number of interpreters such as Scala (with Apache Spark), Python (with Apache Spark), Spark SQL, JDBC, Markdown, Shell and so on. Zeppelin is one of the core applications supported natively by Amazon EMR.
In the following two-part post, we will explore the use of Apache Zeppelin on EMR for data science and data analytics using a series of Zeppelin notebooks. The notebooks feature the use of AWS Glue, the fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load data for analytics. The notebooks also feature the use of Amazon Relational Database Service (RDS) for PostgreSQL and Amazon Simple Storage Service (S3). Amazon S3 will serve as a Data Lake to store our unstructured data. Of Zeppelin’s more than twenty available interpreters, we will use Python 3 and Apache Spark, specifically Spark SQL and PySpark, for all notebooks.
We will build an economical single-node EMR cluster for data exploration, as well as a larger multi-node EMR cluster for analyzing large data sets. Amazon S3 will be used to store input and output data, while intermediate results are stored in the Hadoop Distributed File System (HDFS) on the EMR cluster. Amazon provides a good overview of EMR architecture. Below is a high-level architectural diagram of the infrastructure we will construct during Part 1 for this demonstration.
Notebook Features
Below is an overview of each Zeppelin notebook with a link to view it using Zepl’s free Notebook Explorer. Zepl was founded by the same engineers that developed Apache Zeppelin, including Moonsoo Lee, Zepl CTO and creator of Apache Zeppelin. Zepl’s enterprise collaboration platform, built on Apache Zeppelin, enables both Data Science and AI/ML teams to collaborate around data.
Notebook 1
The first notebook uses a small, 21k-row Kaggle dataset, Transactions from a Bakery. The notebook demonstrates Zeppelin’s integration capabilities with the Helium plugin system for adding new chart types, the use of Amazon S3 for data storage and retrieval, the use of Apache Parquet, a compressed and efficient columnar data storage format, and Zeppelin’s storage integration with GitHub for notebook version control.
Notebook 2
The second notebook demonstrates the use of a single-node and multi-node Amazon EMR cluster for the exploration and analysis of public datasets ranging from approximately 100k rows up to 27MM rows, using Zeppelin. We will use the latest GroupLens MovieLens rating datasets to examine the performance characteristics of Zeppelin, using Spark, on single- versus multi-node EMR clusters for analyzing big data using a variety of Amazon EC2 Instance Types.
Notebook 3
The third notebook demonstrates Amazon EMR and Zeppelin’s integration capabilities with AWS Glue Data Catalog as an Apache Hive-compatible metastore for Spark SQL. We will create an Amazon S3-based Data Lake using the AWS Glue Data Catalog and a set of AWS Glue Crawlers.
Notebook 4
The fourth notebook demonstrates Zeppelin’s ability to integrate with an external data source. In this case, we will interact with data in an Amazon RDS PostgreSQL relational database using three methods, including the Psycopg 2 PostgreSQL adapter for Python, Spark’s native JDBC capability, and Zeppelin’s JDBC Interpreter.
Demonstration
In Part 1 of the post, as a DataOps Engineer, we will create and configure the AWS resources required to demonstrate the use of Apache Zeppelin on EMR, using an AWS Glue Data Catalog, Amazon RDS PostgreSQL database, and an S3-based data lake. In Part 2 of this post, as a Data Scientist, we will explore Apache Zeppelin’s features and integration capabilities with a variety of AWS services using the Zeppelin notebooks.
Source Code
The demonstration’s source code is contained in two public GitHub repositories. The first repository, zeppelin-emr-demo, includes the four Zeppelin notebooks, organized according to the conventions of Zeppelin’s pluggable notebook storage mechanisms.
.
├── 2ERVVKTCG
│   └── note.json
├── 2ERYY923A
│   └── note.json
├── 2ESH8DGFS
│   └── note.json
├── 2EUZKQXX7
│   └── note.json
├── LICENSE
└── README.md
Zeppelin GitHub Storage
During the demonstration, changes made to your copy of the Zeppelin notebooks running on EMR will be automatically pushed back to GitHub when a commit occurs. To accomplish this, instead of just cloning a local copy of my zeppelin-emr-demo project repository, you will want your own copy within your personal GitHub account. You could fork my zeppelin-emr-demo GitHub repository or copy a clone into your own GitHub repository.
To make a copy of the project in your own GitHub account, first, create a new empty repository on GitHub, for example, ‘my-zeppelin-emr-demo-copy’. Then, execute the following commands from your terminal, to clone the original project repository to your local environment, and finally, push it to your GitHub account.
# change me
GITHUB_ACCOUNT="your-account-name" # e.g. garystafford
GITHUB_REPO="your-new-project-name" # e.g. my-zeppelin-emr-demo-copy
# shallow clone into new directory
git clone --branch master \
  --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/zeppelin-emr-demo.git \
  ${GITHUB_REPO}
# re-initialize repository
cd ${GITHUB_REPO}
rm -rf .git
git init
# re-commit code
git add -A
git commit -m "Initial commit of my copy of zeppelin-emr-demo"
# push to your repo
git remote add origin \
  https://github.com/$GITHUB_ACCOUNT/$GITHUB_REPO.git
git push -u origin master
GitHub Personal Access Token
To automatically push changes to your GitHub repository when a commit occurs, Zeppelin will need a GitHub personal access token. Create a personal access token with the scope shown below. Be sure to keep the token secret. Make sure you do not accidentally check your token value into your source code on GitHub. To minimize the risk, change or delete the token after completing the demo.
The second repository, zeppelin-emr-config, contains the necessary bootstrap files, CloudFormation templates, and PostgreSQL DDL (Data Definition Language) SQL script.
.
├── LICENSE
├── README.md
├── bootstrap
│   ├── bootstrap.sh
│   ├── emr-config.json
│   └── helium.json
├── cloudformation
│   ├── crawler.yml
│   ├── emr_single_node.yml
│   ├── emr_cluster.yml
│   └── rds_postgres.yml
└── sql
    └── ratings.sql
Use the following git command to clone the GitHub repository to your local environment.
git clone --branch master \
  --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/zeppelin-emr-demo-setup.git
Requirements
To follow along with the demonstration, you will need an AWS Account, an existing Amazon S3 bucket to store EMR configuration and data, and an EC2 key pair. You will also need a current version of the AWS CLI installed in your work environment. Due to the particular EMR features we will be using, I recommend using the us-east-1 AWS Region to create the demonstration’s resources.
# create secure emr config and data bucket
# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
aws s3api create-bucket \
  --bucket ${ZEPPELIN_DEMO_BUCKET}
aws s3api put-public-access-block \
  --bucket ${ZEPPELIN_DEMO_BUCKET} \
  --public-access-block-configuration \
  BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true
Copy Configuration Files to S3
To start, we need to copy three configuration files, bootstrap.sh, helium.json, and ratings.sql, from the zeppelin-emr-demo-setup project directory to our S3 bucket. Change the ZEPPELIN_DEMO_BUCKET variable value, then run the following aws s3 cp commands, using the AWS CLI. The three files will be copied to a bootstrap directory within your S3 bucket.
# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
aws s3 cp bootstrap/bootstrap.sh s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/
aws s3 cp bootstrap/helium.json s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/
aws s3 cp sql/ratings.sql s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/
Below is sample output from copying local files to S3.
Create AWS Resources
We will start by creating most of the required AWS resources for the demonstration using three CloudFormation templates. We will create a single-node Amazon EMR cluster, an Amazon RDS PostgreSQL database, an AWS Glue Data Catalog database, two AWS Glue Crawlers, and a Glue IAM Role. We will wait to create the multi-node EMR cluster due to the compute costs of running large EC2 instances in the cluster. You should understand the cost of these resources before proceeding and ensure they are destroyed immediately upon completion of the demonstration to minimize your expenses.
Single-Node EMR Cluster
We will start by creating the single-node Amazon EMR cluster, consisting of just one master node with no core or task nodes (a cluster of one). All operations will take place on the master node.
Default EMR Resources
The following EMR instructions assume you have already created at least one EMR cluster in the past, in your current AWS Region, using the EMR web interface with the ‘Create Cluster – Quick Options’ option. Creating a cluster this way creates several additional AWS resources, such as the EMR_EC2_DefaultRole EC2 instance profile, the default EMR_DefaultRole EMR IAM Role, and the default EMR S3 log bucket.
If you haven’t created any EMR clusters using the EMR ‘Create Cluster – Quick Options’ in the past, don’t worry; you can also create the required resources with a few quick AWS CLI commands. Change the following LOG_BUCKET variable value, then run the aws emr and aws s3api commands, using the AWS CLI. The LOG_BUCKET variable value follows the convention of aws-logs-awsaccount-region, for example, aws-logs-012345678901-us-east-1.
# create emr roles
aws emr create-default-roles
# create log secure bucket
# change me
LOG_BUCKET="aws-logs-your_aws_account_id-your_region"
aws s3api create-bucket --bucket ${LOG_BUCKET}
aws s3api put-public-access-block --bucket ${LOG_BUCKET} \
  --public-access-block-configuration \
  BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true
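The log bucket naming convention is easy to get wrong by hand. A small sketch that builds the name from an account ID and Region; the helper name is hypothetical, and it only checks that the account ID is the 12-digit form AWS uses. The account ID itself could come from STS, as noted in the comment.

```python
import re


def emr_log_bucket_name(account_id: str, region: str) -> str:
    """Return a bucket name following aws-logs-<account-id>-<region>."""
    if not re.fullmatch(r"\d{12}", account_id):
        raise ValueError("AWS account IDs are 12 digits")
    return f"aws-logs-{account_id}-{region}"


# The account ID could be looked up with boto3, e.g.:
# boto3.client("sts").get_caller_identity()["Account"]
print(emr_log_bucket_name("012345678901", "us-east-1"))  # aws-logs-012345678901-us-east-1
```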
The new EMR IAM Roles can be viewed in the IAM Roles web interface.
Often, I see tutorials that reference these default EMR resources from the AWS CLI or CloudFormation without any explanation of how they are created.
EMR Bootstrap Script
As part of creating our EMR cluster, the CloudFormation template, emr_single_node.yml, will call the bootstrap script, bootstrap.sh, which we copied to S3 earlier. The bootstrap script pre-installs required Python and Linux software packages and the PostgreSQL driver JAR. It also clones your copy of the zeppelin-emr-demo GitHub repository.
#!/bin/bash
set -ex
if [[ $# -ne 2 ]] ; then
  echo "Script requires two arguments"
  exit 1
fi
GITHUB_ACCOUNT=$1
GITHUB_REPO=$2
# install extra python packages
sudo python3 -m pip install psycopg2-binary boto3
# install extra linux packages
yes | sudo yum install git htop
# clone github repo
cd /tmp
git clone "https://github.com/${GITHUB_ACCOUNT}/${GITHUB_REPO}.git"
# install extra jars
POSTGRES_JAR="postgresql-42.2.8.jar"
wget -nv "https://jdbc.postgresql.org/download/${POSTGRES_JAR}"
sudo chown -R hadoop:hadoop ${POSTGRES_JAR}
mkdir -p /home/hadoop/extrajars/
cp ${POSTGRES_JAR} /home/hadoop/extrajars/
EMR Application Configuration
The EMR CloudFormation template will also modify the EMR cluster’s Spark and Zeppelin application configurations. Amongst other configuration properties, the template sets the default Python version to Python3, instructs Zeppelin to use the cloned GitHub notebook directory path, and adds the PostgreSQL Driver JAR to the JVM ClassPath. Below we can see the configuration properties applied to an existing EMR cluster.
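EMR applies application settings like these as a list of configuration classifications. The sketch below shows what such a list could look like; it is not the contents of the post’s emr-config.json or template. The classifications (spark-env, zeppelin-env, spark-defaults) and the PYSPARK_PYTHON and ZEPPELIN_NOTEBOOK_DIR variables are standard, but using spark.jars to put the PostgreSQL JAR on the classpath is an assumption; the template may set a different property. The paths are taken from the bootstrap script above.

```python
import json


def emr_configurations(notebook_dir: str, extra_jar: str) -> list:
    """Build a hypothetical EMR configuration list (classification objects)."""
    return [
        {   # make Python 3 the default interpreter for PySpark
            "Classification": "spark-env",
            "Configurations": [{
                "Classification": "export",
                "Properties": {"PYSPARK_PYTHON": "/usr/bin/python3"},
            }],
        },
        {   # point Zeppelin at the cloned GitHub notebook directory
            "Classification": "zeppelin-env",
            "Configurations": [{
                "Classification": "export",
                "Properties": {"ZEPPELIN_NOTEBOOK_DIR": notebook_dir},
            }],
        },
        {   # add the PostgreSQL JDBC driver to Spark's driver/executor classpath
            "Classification": "spark-defaults",
            "Properties": {"spark.jars": extra_jar},
        },
    ]


config = emr_configurations(
    "/tmp/zeppelin-emr-demo/",
    "/home/hadoop/extrajars/postgresql-42.2.8.jar",
)
print(json.dumps(config, indent=2))
```

The printed JSON has the shape accepted by the AWS CLI’s aws emr create-cluster --configurations option.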

EMR Application Versions
As of the date of this post, EMR is at version 5.28.0. Below, as shown in the EMR web interface, are the current (21) applications and frameworks available for installation on EMR.
For this demo, we will install Apache Spark v2.4.4, Ganglia v3.7.2, and Zeppelin v0.8.2.
Apache Zeppelin: Web Interface
Apache Spark: DAG Visualization
Ganglia: Cluster CPU Monitoring
Create the EMR CloudFormation Stack
Change the following (7) variable values, then run the aws cloudformation create-stack command, using the AWS CLI.
# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
EC2_KEY_NAME="your-key-name"
LOG_BUCKET="aws-logs-your_aws_account_id-your_region"
GITHUB_ACCOUNT="your-account-name" # e.g. garystafford
GITHUB_REPO="your-new-project-name" # e.g. my-zeppelin-emr-demo
GITHUB_TOKEN="your-token-value"
MASTER_INSTANCE_TYPE="m5.xlarge" # optional
aws cloudformation create-stack \
  --stack-name zeppelin-emr-dev-stack \
  --template-body file://cloudformation/emr_single_node.yml \
  --parameters ParameterKey=ZeppelinDemoBucket,ParameterValue=${ZEPPELIN_DEMO_BUCKET} \
    ParameterKey=Ec2KeyName,ParameterValue=${EC2_KEY_NAME} \
    ParameterKey=LogBucket,ParameterValue=${LOG_BUCKET} \
    ParameterKey=MasterInstanceType,ParameterValue=${MASTER_INSTANCE_TYPE} \
    ParameterKey=GitHubAccount,ParameterValue=${GITHUB_ACCOUNT} \
    ParameterKey=GitHubRepository,ParameterValue=${GITHUB_REPO} \
    ParameterKey=GitHubToken,ParameterValue=${GITHUB_TOKEN}
You can use the Amazon EMR web interface to confirm the results of the CloudFormation stack. The cluster should be in the ‘Waiting’ state.
PostgreSQL on Amazon RDS
Next, create a simple, single-AZ, single-master, non-replicated Amazon RDS PostgreSQL database, using the included CloudFormation template, rds_postgres.yml. We will use this database in Notebook 4. For the demo, I have selected the current-generation general purpose db.m4.large DB instance class to run PostgreSQL. You can easily change it to another RDS-supported instance class to suit your own needs.
Change the following (3) variable values, then run the aws cloudformation create-stack command, using the AWS CLI.
# change me
DB_MASTER_USER="your-db-username" # e.g. masteruser
DB_MASTER_PASSWORD="your-db-password" # e.g. 5up3r53cr3tPa55w0rd
MASTER_INSTANCE_TYPE="db.m4.large" # optional
aws cloudformation create-stack \
  --stack-name zeppelin-rds-stack \
  --template-body file://cloudformation/rds_postgres.yml \
  --parameters ParameterKey=DBUser,ParameterValue=${DB_MASTER_USER} \
    ParameterKey=DBPassword,ParameterValue=${DB_MASTER_PASSWORD} \
    ParameterKey=DBInstanceClass,ParameterValue=${MASTER_INSTANCE_TYPE}
You can use the Amazon RDS web interface to confirm the results of the CloudFormation stack.
AWS Glue
Next, create the AWS Glue Data Catalog database, the Apache Hive-compatible metastore for Spark SQL, two AWS Glue Crawlers, and a Glue IAM Role (ZeppelinDemoCrawlerRole), using the included CloudFormation template, crawler.yml. The AWS Glue Data Catalog database will be used in Notebook 3.
Change the following variable value, then run the aws cloudformation create-stack command, using the AWS CLI.
# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
aws cloudformation create-stack \
  --stack-name zeppelin-crawlers-stack \
  --template-body file://cloudformation/crawler.yml \
  --parameters ParameterKey=ZeppelinDemoBucket,ParameterValue=${ZEPPELIN_DEMO_BUCKET} \
  --capabilities CAPABILITY_NAMED_IAM
You can use the AWS Glue web interface to confirm the results of the CloudFormation stack. Note the Data Catalog database and the two Glue Crawlers. We will not run the two crawlers until Part 2 of the post, so no tables will exist in the Data Catalog database, yet.
At this point in the demonstration, you should have successfully created a single-node Amazon EMR cluster, an Amazon RDS PostgreSQL database, and several AWS Glue resources, all using CloudFormation templates.
Post-EMR Creation Configuration
RDS Security
For the new EMR cluster to communicate with the RDS PostgreSQL database, the RDS database’s VPC security group, which is the default VPC security group, must allow inbound traffic on port 5432 from the security groups of the EMR nodes. Obtain the Group IDs of the ElasticMapReduce-master and ElasticMapReduce-slave security groups from the EMR web interface.
Access the Security Group for the RDS database using the RDS web interface. Change the inbound rule for port 5432 to include both Security Group IDs.
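If you prefer the AWS CLI to the console, the following sketch adds the same inbound rules with aws ec2 authorize-security-group-ingress, using each EMR security group as the traffic source. The allow_postgres_from helper and the group IDs in the usage line are hypothetical placeholders; substitute the RDS security group ID and the two EMR Group IDs you collected above.

```bash
#!/usr/bin/env bash
# Hypothetical helper: allow inbound PostgreSQL (TCP 5432) on the RDS security
# group from each EMR node security group passed as an argument.
allow_postgres_from() {
  if [ "$#" -lt 2 ]; then
    echo "usage: allow_postgres_from RDS_SG_ID EMR_SG_ID [EMR_SG_ID...]" >&2
    return 2
  fi
  local rds_sg="$1"
  shift
  local emr_sg
  for emr_sg in "$@"; do
    # --source-group references another security group instead of a CIDR range.
    aws ec2 authorize-security-group-ingress \
      --group-id "${rds_sg}" \
      --protocol tcp \
      --port 5432 \
      --source-group "${emr_sg}" || return 1
  done
}

# Example usage (placeholder IDs):
# allow_postgres_from sg-RDS_GROUP_ID sg-MASTER_GROUP_ID sg-SLAVE_GROUP_ID
```

The command adds rules rather than replacing existing ones, and it returns an error if an identical rule already exists, in which case the helper stops at that group.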
SSH to EMR Master Node
In addition to the bootstrap script and configurations we already applied to the EMR cluster, we need to make several post-creation configuration changes to the EMR cluster for our demonstration. These changes require SSH’ing into the EMR cluster. Using the master node’s public DNS address and the SSH command provided in the EMR web console, SSH into the master node.
If you cannot access the node using SSH, check that the EMR master node’s security group (ElasticMapReduce-master) allows inbound traffic on port 22 from your IP address or address range.
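Rather than copying the master node’s DNS name from the console, you can look it up with aws emr describe-cluster. A minimal sketch follows; master_dns is a hypothetical helper, and the cluster ID and key file in the usage line are placeholders. On EMR, SSH logins use the hadoop user.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print the EMR master node's public DNS name.
master_dns() {
  local cluster_id="$1"
  if [ -z "${cluster_id}" ]; then
    echo "usage: master_dns CLUSTER_ID" >&2
    return 2
  fi
  aws emr describe-cluster \
    --cluster-id "${cluster_id}" \
    --query 'Cluster.MasterPublicDnsName' \
    --output text
}

# Example usage (placeholder cluster ID and key pair file):
# ssh -i ~/.ssh/your-key-pair.pem hadoop@"$(master_dns j-XXXXXXXXXXXXX)"
```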
Git Permissions
We need to change the ownership of the git repository we installed during the EMR bootstrapping phase. Typically, with an EC2 instance, you perform operations as the ec2-user user. With Amazon EMR, you often perform actions as the hadoop user. With Zeppelin on EMR, the notebooks perform operations, including interacting with the git repository, as the zeppelin user. As a result of the bootstrap.sh script, the contents of the git repository directory, /tmp/zeppelin-emr-demo/, are owned by the hadoop user and group by default.
We will change their owner to the zeppelin user and group. We could not perform this step as part of the bootstrap script since the zeppelin user and group did not exist at the time the script was executed.
cd /tmp/zeppelin-emr-demo/
sudo chown -R zeppelin:zeppelin .
The results should look similar to the following output.
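To verify the ownership change from the command line, a sketch like the following counts any files or directories under the repository, including the hidden .git directory, that are still not owned by a given user. The count_not_owned_by helper is hypothetical; after the chown above, it should print 0 for the zeppelin user.

```bash
#!/usr/bin/env bash
# Hypothetical helper: count entries under DIR whose owner is not USER.
count_not_owned_by() {
  local dir="$1" user="$2"
  if [ ! -d "${dir}" ] || [ -z "${user}" ]; then
    echo "usage: count_not_owned_by DIR USER" >&2
    return 2
  fi
  # '! -user USER' matches entries owned by anyone else; wc -l counts them.
  find "${dir}" ! -user "${user}" | wc -l | tr -d ' '
}

# Example usage on the master node:
# count_not_owned_by /tmp/zeppelin-emr-demo zeppelin
```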
Pre-Install Visualization Packages
Next, we will pre-install several Apache Zeppelin Visualization packages. According to the Zeppelin website, an Apache Zeppelin Visualization is a pluggable package that can be loaded and unloaded at runtime through the Helium framework in Zeppelin. We can use them just like any other built-in visualization in the notebook. A Visualization is a JavaScript npm package. For example, the ultimate-pie-chart package is published on the public npm registry.
We can pre-load plugins by replacing the /usr/lib/zeppelin/conf/helium.json file with the version of helium.json we copied to S3 earlier, and then restarting Zeppelin. If you have a lot of Visualizations or package types, or use DataOps automation to create EMR clusters, this approach is much more efficient and repeatable than manually loading plugins through the Zeppelin UI each time you create a new EMR cluster. Below is the helium.json file, which pre-loads (8) Visualization packages.
{
  "enabled": {
    "ultimate-pie-chart": "ultimate-pie-chart@0.0.2",
    "ultimate-column-chart": "ultimate-column-chart@0.0.2",
    "ultimate-scatter-chart": "ultimate-scatter-chart@0.0.2",
    "ultimate-range-chart": "ultimate-range-chart@0.0.2",
    "ultimate-area-chart": "ultimate-area-chart@0.0.1",
    "ultimate-line-chart": "ultimate-line-chart@0.0.1",
    "zeppelin-bubblechart": "zeppelin-bubblechart@0.0.4",
    "zeppelin-highcharts-scatterplot": "zeppelin-highcharts-scatterplot@0.0.2"
  },
  "packageConfig": {},
  "bundleDisplayOrder": [
    "ultimate-pie-chart",
    "ultimate-column-chart",
    "ultimate-scatter-chart",
    "ultimate-range-chart",
    "ultimate-area-chart",
    "ultimate-line-chart",
    "zeppelin-bubblechart",
    "zeppelin-highcharts-scatterplot"
  ]
}
Run the following commands to copy the helium.json file into place and change its ownership to the zeppelin user and group.
# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"

sudo aws s3 cp s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/helium.json \
  /usr/lib/zeppelin/conf/helium.json
sudo chown zeppelin:zeppelin /usr/lib/zeppelin/conf/helium.json
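A malformed helium.json is easy to miss until Zeppelin restarts. As a precaution, a sketch like the following checks that the copied file parses as JSON, using Python's standard json.tool module, and contains an "enabled" section. The validate_helium helper is hypothetical, and the sketch assumes python3 is available on the master node.

```bash
#!/usr/bin/env bash
# Hypothetical helper: sanity-check a helium.json file before restarting Zeppelin.
validate_helium() {
  local file="$1"
  if [ ! -f "${file}" ]; then
    echo "missing file: ${file}" >&2
    return 2
  fi
  # json.tool exits non-zero if the file is not valid JSON.
  if ! python3 -m json.tool "${file}" >/dev/null 2>&1; then
    echo "invalid JSON: ${file}" >&2
    return 1
  fi
  # Crude presence check for the top-level "enabled" map of packages.
  if ! grep -q '"enabled"' "${file}"; then
    echo "no \"enabled\" section: ${file}" >&2
    return 1
  fi
  echo "ok: ${file}"
}

# Example usage on the master node:
# validate_helium /usr/lib/zeppelin/conf/helium.json
```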
Create New JDBC Interpreter
Lastly, we need to create a new Zeppelin JDBC Interpreter to connect to our RDS database. By default, Zeppelin has several interpreters installed. You can review a list of available interpreters using the following command.
sudo sh /usr/lib/zeppelin/bin/install-interpreter.sh --list
The new JDBC interpreter will allow us to connect to our RDS PostgreSQL database using Java Database Connectivity (JDBC). First, ensure all available interpreters are installed, including the current Zeppelin JDBC interpreter (org.apache.zeppelin:zeppelin-jdbc:0.8.0) in /usr/lib/zeppelin/interpreter/jdbc.
Creating a new interpreter is a two-part process. In the first part, we install the required interpreter files on the master node using the following commands. Later, in the Zeppelin web interface, we will configure the new PostgreSQL JDBC interpreter. Note that we must provide a unique name for the interpreter (I chose ‘postgres’ in this case), which we will refer to in the second part of the interpreter creation process.
sudo sh /usr/lib/zeppelin/bin/install-interpreter.sh --all

sudo sh /usr/lib/zeppelin/bin/install-interpreter.sh \
  --name "postgres" \
  --artifact org.apache.zeppelin:zeppelin-jdbc:0.8.0
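To confirm the second command did what we expect, the sketch below checks for an interpreter directory containing at least one JAR file. It assumes install-interpreter.sh places a named interpreter in a subdirectory of the same name under /usr/lib/zeppelin/interpreter, as it does for the bundled jdbc interpreter; interpreter_installed is a hypothetical helper name.

```bash
#!/usr/bin/env bash
# Hypothetical helper: check that a named interpreter directory exists
# and contains at least one JAR file.
interpreter_installed() {
  local name="$1"
  local base_dir="${2:-/usr/lib/zeppelin/interpreter}"
  local dir="${base_dir}/${name}"
  if [ -z "${name}" ] || [ ! -d "${dir}" ]; then
    echo "interpreter directory not found: ${dir}" >&2
    return 1
  fi
  # grep -q . succeeds only if find printed at least one matching path.
  if ! find "${dir}" -name '*.jar' | grep -q .; then
    echo "no JAR files in: ${dir}" >&2
    return 1
  fi
  echo "installed: ${dir}"
}

# Example usage on the master node:
# interpreter_installed postgres
```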
To complete the post-EMR creation configuration on the master node, we must restart Zeppelin for our changes to take effect.
sudo stop zeppelin && sudo start zeppelin
In my experience, it could take 2–3 minutes for the Zeppelin UI to become fully responsive after a restart.
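Rather than refreshing the browser, you can poll Zeppelin from the master node until it responds. This is a minimal sketch assuming curl is installed and that Zeppelin listens on port 8890, Amazon EMR's default port for Zeppelin; wait_for_url is a hypothetical helper. Its defaults, 36 attempts five seconds apart, cover roughly three minutes. Note that the stop/start syntax above applies to older, upstart-based EMR releases; on Amazon Linux 2 based releases (EMR 5.30.0 and later, and 6.x), my understanding is that the equivalent is sudo systemctl restart zeppelin.

```bash
#!/usr/bin/env bash
# Hypothetical helper: poll a URL until it answers successfully
# or the attempts run out.
wait_for_url() {
  local url="$1"
  local attempts="${2:-36}"
  local interval="${3:-5}"
  local i=1
  while [ "${i}" -le "${attempts}" ]; do
    # -s silences progress output; -f makes HTTP errors (>= 400) return non-zero.
    if curl -sf -o /dev/null "${url}"; then
      echo "up after ${i} attempt(s): ${url}"
      return 0
    fi
    sleep "${interval}"
    i=$((i + 1))
  done
  echo "gave up after ${attempts} attempt(s): ${url}" >&2
  return 1
}

# Example usage on the master node, after restarting Zeppelin:
# wait_for_url http://localhost:8890
```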
Zeppelin Web Interface Access
With all the EMR application configuration complete, we will access the Zeppelin web interface running on the master node. Use the Zeppelin connection information provided in the EMR web interface to set up SSH tunneling to the Zeppelin web interface. Using this method, we can also access the Spark History Server, Ganglia, and Hadoop Resource Manager web interfaces; links to all of them are provided in the EMR web interface.
To set up a web connection to the applications installed on the EMR cluster, I am using FoxyProxy as a proxy management tool with Google Chrome.
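The tunneling instructions in the EMR console come down to SSH dynamic port forwarding, which turns the SSH session into a local SOCKS proxy that FoxyProxy can route the cluster's web interfaces through. The sketch below wraps that ssh command; open_emr_tunnel is a hypothetical helper, and the default local port 8157 is an assumption, so use whichever port your FoxyProxy configuration expects.

```bash
#!/usr/bin/env bash
# Hypothetical helper: open a SOCKS proxy to the EMR master node
# via SSH dynamic port forwarding.
open_emr_tunnel() {
  local key_file="$1" master_host="$2" local_port="${3:-8157}"
  if [ -z "${key_file}" ] || [ -z "${master_host}" ]; then
    echo "usage: open_emr_tunnel KEY_FILE MASTER_DNS [LOCAL_PORT]" >&2
    return 2
  fi
  # -N: run no remote command; -D: listen locally as a SOCKS proxy.
  ssh -i "${key_file}" -N -D "${local_port}" "hadoop@${master_host}"
}

# Example usage (placeholder key file and DNS name); blocks while the tunnel is open:
# open_emr_tunnel ~/.ssh/your-key-pair.pem ec2-xx-xx-xx-xx.compute-1.amazonaws.com
```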
If everything is working so far, you should see the Zeppelin web interface with all four Zeppelin notebooks available from the cloned GitHub repository. You will be logged in as the anonymous user. Zeppelin offers authentication for accessing notebooks on the EMR cluster; for brevity, we will not cover setting up Shiro authentication in Zeppelin.
To confirm that the path to the local, cloned copy of the GitHub notebook repository is correct, check the Notebook Repos interface, accessible under the Settings dropdown (anonymous user) in the upper right of the screen. The value should match the ZEPPELIN_NOTEBOOK_DIR configuration property value in the emr_single_node.yml CloudFormation template we executed earlier.
To confirm that the Helium Visualizations were pre-installed correctly using the helium.json file, open the Helium interface, accessible under the Settings dropdown (anonymous user) in the upper right of the screen.
Note the enabled visualizations. Additional plugins can also be enabled easily through the web interface.
New PostgreSQL JDBC Interpreter
Recall that earlier, we installed the required interpreter files on the master node using the install-interpreter.sh script. We will now complete the process of configuring the new PostgreSQL JDBC interpreter. Open the Interpreter interface, accessible under the Settings dropdown (anonymous user) in the upper right of the screen.
The title of the new interpreter must match the name we used to install the interpreter files, ‘postgres’. The interpreter group will be ‘jdbc’. There are, minimally, three properties we need to configure for your specific RDS database instance: default.url, default.user, and default.password. These should match the values you used to create your RDS instance earlier. Make sure to include the database name in the default.url. An example is shown below.
default.url: jdbc:postgresql://zeppelin-demo.abcd1234efg56.us-east-1.rds.amazonaws.com:5432/ratings
default.user: masteruser
default.password: 5up3r53cr3tPa55w0rd
We also need to provide a path to the PostgreSQL driver JAR dependency. This path is the location where we placed the JAR using the bootstrap.sh script earlier, /home/hadoop/extrajars/postgresql-42.2.8.jar. Save the new interpreter and make sure it starts successfully (shows a green icon).
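Assembling default.url by hand is error-prone, so here is a small sketch that builds it from the RDS endpoint address, the database name, and an optional port defaulting to PostgreSQL's 5432. The jdbc_url helper is hypothetical. The endpoint address can be looked up with aws rds describe-db-instances; the instance identifier zeppelin-demo in the usage comment is an assumption inferred from the example endpoint above.

```bash
#!/usr/bin/env bash
# Hypothetical helper: build a PostgreSQL JDBC URL for the default.url property.
jdbc_url() {
  local host="$1" database="$2" port="${3:-5432}"
  if [ -z "${host}" ] || [ -z "${database}" ]; then
    echo "usage: jdbc_url HOST DATABASE [PORT]" >&2
    return 2
  fi
  # The database name must be part of the URL path, e.g. .../ratings.
  echo "jdbc:postgresql://${host}:${port}/${database}"
}

# Example usage (instance identifier assumed):
# jdbc_url "$(aws rds describe-db-instances \
#   --db-instance-identifier zeppelin-demo \
#   --query 'DBInstances[0].Endpoint.Address' --output text)" ratings
```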
Switch Interpreters to Python 3
The last thing we need to do is change the Spark and Python interpreters to use Python 3 instead of the default Python 2. On the same screen you used to create a new interpreter, modify the Spark and Python interpreters. First, for the Python interpreter, change the zeppelin.python property to python3.
Next, for the Spark interpreter, change the zeppelin.pyspark.python property to python3.
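Both properties simply name an executable, so python3 must resolve on the master node's PATH. A quick way to check is the shell's built-in command -v, sketched below with a hypothetical require_cmd helper. Keep in mind that Zeppelin runs as the zeppelin user, whose PATH may differ from the hadoop user's.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print where an executable resolves on the PATH,
# or fail if it is not found.
require_cmd() {
  local cmd="$1"
  local resolved
  # Assign separately from 'local' so the exit status of command -v is kept.
  if ! resolved="$(command -v "${cmd}")"; then
    echo "not found on PATH: ${cmd}" >&2
    return 1
  fi
  echo "${resolved}"
}

# Example usage on the master node:
# require_cmd python3 && python3 --version
```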
Part 2
In Part 1 of this post, we created and configured the AWS resources required to demonstrate the use of Apache Zeppelin on EMR, using an AWS Glue Data Catalog, Amazon RDS PostgreSQL database, and an S3 data lake. In Part 2 of this post, we will explore some of Apache Zeppelin’s features and integration capabilities with a variety of AWS services using the notebooks.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.