Evolving Models for ISV Software Delivery, Management, and Support

Understanding evolving models used by Independent Software Vendors for cloud-based software delivery, management, and support

Copyright: melpomen (123rf.com)

Introduction

As a Consultant, Enterprise Architect, Partner Solutions Architect, and Senior Solutions Architect, I have had the chance to work with many successful Independent Software Vendors (ISVs), from early-stage startups to large established enterprises. Based on my experience, I wrote two AWS Partner Network (APN) Blog posts: Architecting Successful SaaS: Understanding Cloud-Based Software-as-a-Service Models and Architecting Successful SaaS: Interacting with Your SaaS Customer’s Cloud Accounts. Continuing with that series, this post explores several existing and evolving models used by ISVs to deliver, manage, and support their software products for cloud-based customers.

Independent Software Vendors

An ISV, also known as a software publisher, specializes in making and selling software designed for mass or niche markets. This is in contrast to in-house software, which the organization develops for its internal use, or custom software designed for a single, specific third party. Although end-users consume ISV-provided software, it remains the property of the vendor (source: Wikipedia).

The ISV industry, especially SaaS-based products, has seen huge year-over-year (YOY) growth. VC firms continue to fuel industry growth (and valuations) with an unprecedentedly high level of capital investment throughout 2021. According to SaaS Industry, the total investment for Q1-2021 stood at $9.9B. B2B data industry resource, Datamation, examines prominent ISVs in their article, Top 75 SaaS Companies of 2022. SaaS management company, Cledara, produced a similar piece, The Top SaaS Companies in 2021.

Online Marketplaces

Cloud-based ISV software products are purchased directly from the vendor, or more recently, through marketplaces hosted by major cloud providers. In their Predicts 2022: SaaS Dominates Software Contracting by 2026 — and So Do Risks, Gartner observes, “Online marketplaces have become more prevalent (e.g., Amazon Web Services [AWS], Google, etc.). With easy access to these marketplaces, customers can and are purchasing marketplace products without the need to engage the software vendor directly or interact with sourcing or procurement within their organizations.” Examples of marketplaces include AWS Marketplace, Azure Marketplace, Google Cloud Marketplace, Salesforce AppExchange, and Oracle Cloud Marketplace.

Major Cloud Providers’ approximate market share, according to Statista and Canalys

AWS Marketplace, for example, describes itself as “a curated digital catalog that makes it easy for organizations to discover, procure, entitle, provision, and govern third-party software.” Company tackle.io, whose platform facilitates the process of listing, selling, and managing cloud marketplaces for ISVs, produced a report, State of Cloud Marketplaces 2021, detailing the leading cloud software sales and delivery platforms.

Purpose-built Products

Based on my observations, most ISV products can be classified as either purpose-built or general-purpose. Purpose-built ISV products are designed to address a specific customer need. Many are considered enterprise software, also known as Enterprise Application Software (EAS). Enterprise software includes Customer Relationship Management (CRM), Management Information Systems (MIS), Enterprise Resource Planning (ERP), Human Resource Management (HRM or HRIS), Content Management Systems (CMS), Learning Management Systems (LMS), Field Service Management (FSM), Knowledge Management Systems (KMS), Talent Management Systems (TMS), and Applicant Tracking Systems (ATS).

General-purpose Products

General-purpose ISV products often focus on a certain technology, such as security, identity management, databases, analytics, storage, AI/ML, and virtual desktops. These products are frequently used by customers as one part of a larger solution. Many of these products are hosted ‘as-a-Service,’ such as Database as a Service (DBaaS), Data Warehousing as a Service (DWaaS), Monitoring as a Service (MaaS), Analytics as a Service (AaaS), Machine Learning as a Service (MLaaS), Identity as a Service (IDaaS), Desktop as a Service (DaaS), and Storage as a Service (STaaS).

Examining the current 19,919 listings in the AWS Marketplace, by general category, we can see a mix of purpose-built (e.g., Business Applications, Industries) and general-purpose ISV products (e.g., DevOps, ML, IoT, Data, Infrastructure).

AWS Marketplace product by category (January 2022)

Below are all the categories of ISV products and services found on the AWS Marketplace.

AWS Marketplace product categories (January 2022)

Similarly, looking at the current 5,008 listings in the Google Cloud Marketplace by category, we can see both purpose-built and general-purpose ISV products.

Google Cloud Marketplace products by category (January 2022)

SaaS-as-a-Service

There is even an established market for SaaS-as-a-Service (SaaSaaS) — products and platforms designed to enable ISVs and SaaS providers. These products and platforms are designed to help overcome the inherent engineering complexities required to prepare, deliver, manage, bill, and support ISV products. Examples include services such as AWS SaaS Factory Program, AWS SaaS Boost, and Azure SaaS Development Kit (ASDK), as well as vendors, like tackle.io and AppDirect.

Current ISV Models

As organizations continue to move their IT infrastructure and workloads to cloud providers such as Amazon Web Services (AWS), Google Cloud, and Microsoft Azure, ISVs have had to evolve how they distribute, manage, and support their software products. Today, most ISVs use a variation of one of three models: Customer-deployed (aka self-hosted), Software as a Service (SaaS), and SaaS with Remote Agents.

These methods are evident from looking at the current listings in the AWS Marketplace by delivery method. Of the 14,444 products, 11.3% are categorized as SaaS. Many of the remaining delivery methods could be classified as Customer-deployed products. The most significant percentage of products are delivered as Amazon Machine Images (AMI). Custom-built VM images were traditionally the most common delivery forms. However, newer technologies, such as Container Images, Helm Charts, Data Exchange (Datasets), and SageMaker (ML) Algorithms and Models are quickly growing in popularity. Data Exchange products, for example, have doubled in 18 months.

AWS Marketplace products by delivery method (January 2022)

Customer-deployed Model

In a Customer-deployed ISV product model, the customer deploys the ISV’s software product into their own cloud environment. The ISV’s product is packaged as virtual machine images, such as Amazon Machine Images (AMIs), Docker container images, Helm Charts, licensed datasets, machine learning models, and infrastructure as code (IaC) files, such as AWS CloudFormation templates.

Customer-deployed (aka self-hosted) model

With Customer-deployed products, it is not required but also not uncommon for the ISV to have some connection to the customer’s cloud environment for break-the-glass (BTG) support, remote monitoring, or license management purposes.

Software as a Service (SaaS)

According to Wikipedia, SaaS is a software licensing and delivery model in which software is licensed on a subscription basis and is centrally hosted within the ISV’s cloud environment. SaaS is one of the three best-known cloud computing models, along with Platform as a Service (PaaS) and Infrastructure as a Service (IaaS).

Software as a Service (SaaS) model

With SaaS, the customer’s data can remain in the customer’s cloud environment. A secure connection, such as an Open Database Connectivity (ODBC) or Java Database Connectivity (JDBC) connection, can be made to the customer’s datasources. Alternatively, the customer’s data is securely copied in advance or just-in-time (JIT) to dedicated storage within the ISV’s cloud environment. Using caching technologies, such as RubiX, Databricks Delta caching, and Apache Spark caching, data can be cached as needed. Some caching technologies, such as Alluxio, even offer tiered caching (hot, warm, or cold) based on how frequently the data is accessed.

SaaS with Remote Agents Model

The SaaS with Remote Agents model is a variation of the pure SaaS model. In this scenario, the customer deploys ISV-supplied software agents within their cloud, on-premises, and edge (IoT) environments. Software agents can be language-specific libraries or modules added to an application, sidecar containers, serverless functions, or stand-alone VMs. These agents collect data, pre-optimize payloads, and push data back to the ISV’s cloud environment. The prototypical example of this model is monitoring/observability and Application Performance Monitoring (APM) vendors. They often use agents to collect and aggregate a customer’s telemetry (logs, metrics, events, traces) to the ISV’s external cloud environment. The ISV’s cloud environment acts as a centralized, single pane of glass for the customer to view their aggregated telemetry.

SaaS with Remote Agents model
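
As a rough illustration of the collect, pre-optimize, and push steps described above, here is a minimal Python sketch. It assumes, purely for illustration, that "pre-optimizing" means batching events and gzip-compressing them as JSON; the function names, batch size, and event fields are hypothetical and not taken from any particular vendor's agent.

```python
import gzip
import json


def build_payload(events, max_batch=100):
    """Batch telemetry events and gzip-compress them before they leave the customer's environment."""
    batch = events[:max_batch]
    body = json.dumps({"count": len(batch), "events": batch}).encode("utf-8")
    return gzip.compress(body)


def read_payload(payload):
    """What the ISV side might do on receipt: decompress and parse the batch."""
    return json.loads(gzip.decompress(payload).decode("utf-8"))


# Hypothetical telemetry collected by an agent
events = [{"metric": "cpu", "value": 0.42}, {"metric": "mem", "value": 0.73}]
payload = build_payload(events)
decoded = read_payload(payload)
```

A real agent would also handle retries, authentication, and back-pressure, all of which are omitted here.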

Some cloud providers offer products designed specifically to make a customer’s integration with SaaS products easier. With Amazon EventBridge, for example, you can “easily connect to and stream data from your SaaS applications without having to write any code.” Amazon EventBridge has established integrations with dozens of SaaS partners, including Auth0, Datadog, MongoDB, New Relic, Opsgenie, PagerDuty, Shopify, and Zendesk.

Evolving ISV Models

Remotely-managed Model

In addition to the Customer-deployed and SaaS models, some ISVs have developed new models for offering their software products. One such model is what I refer to as the Remotely-managed model. This hybrid model combines the best aspects of both the Customer-deployed and SaaS models. It is designed to address common customer concerns, such as security, speed, ease of use, and cost.

Remotely-managed model

With the Remotely-managed model, the ISV’s product is administered by the customer through a user interface (UI) hosted in the ISV’s cloud environment. The administrative actions of the customer are translated into commands, which are executed in the customer’s cloud environment. These remote commands are communicated using API calls or bi-directional message queues such as EventBridge. Often, the customer grants the ISV programmatic access to their environment. The ISV’s access is limited to a fine-grained set of permissions, based on the principle of least privilege (PoLP), to deploy and manage their product, usually isolated within a separate customer account or Virtual Private Cloud (VPC).

Deploying the ISV’s product to the customer’s environment adjacent to the data maximizes security by eliminating data movement external to the customer’s cloud environment. Instead, computations are done adjacent to data within the customer’s environment.
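
The command flow and least-privilege check described above can be sketched in a few lines of Python. This is a toy model under stated assumptions: the action names (such as product:Scale) and the granted set are hypothetical, and a real implementation would rely on the cloud provider's own access controls rather than an in-process allowlist.

```python
import json

# Hypothetical set of actions the customer has granted to the ISV (least privilege)
GRANTED_ACTIONS = frozenset({"product:Deploy", "product:Scale", "product:GetStatus"})


def to_command(ui_action, params):
    """ISV side: translate an administrative UI action into a serializable command."""
    return json.dumps({"action": ui_action, "params": params}, sort_keys=True)


def authorize(command_json, granted=GRANTED_ACTIONS):
    """Customer side: reject any command that falls outside the granted set."""
    command = json.loads(command_json)
    if command["action"] not in granted:
        raise PermissionError(f"action not granted: {command['action']}")
    return command


# An administrator scales the product from the ISV-hosted UI
cmd = to_command("product:Scale", {"replicas": 3})
accepted = authorize(cmd)
```

Anything outside the granted set, for example a command to create new identities, raises PermissionError instead of being executed.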

SaaS Façade Model

Recently, I have been developing some architectural thinking around a newer model that I call the SaaS Façade model. A façade or facade is generally the front part or exterior of a building. In software design, a facade is an object that serves as a front-facing interface masking more complex underlying or structural code (source: Wikipedia).

SaaS Façade model

The SaaS Façade model is a variation of the Remotely-managed model. Although architecturally more complex than the Remotely-managed model, the SaaS Façade model is simpler from a customer perspective. Both the customer’s administrators and end-users access the software product through the ISV’s cloud environment, but there is little to no data movement from the customer’s environment.

Separating Front-end from Back-end

The ISV’s product architecture is the most significant difference between the SaaS Façade model and the Remotely-managed model. Most modern software products are composed of multiple, decoupled components or tiers, including front-end/UI/presentation layer, back-end/services, and data. In the SaaS Façade model, the customer’s end-users access the ISV’s product through the ISV’s cloud environment, similar to SaaS. The ISV’s front-end is deployed to the ISV’s cloud environment. The ISV’s product’s back-end is deployed to the customer’s cloud environment, adjacent to the customer’s data. The ISV product’s data tier is deployed to either the ISV’s or customer’s cloud environment, depending on the product’s exact architectural requirements. This model requires a highly decoupled architecture and tolerance for moderate latency.

Decoupled User Management

A frequent request from customers of ISV software concerns user management. Customers want to allow approved external users to access read-only data, such as a sales report, without adding them to the customer’s cloud environment’s Identity and Access Management (IAM) system. Additionally, end-users do not need to access the software by first logging in through the customer’s cloud provider’s console and having an established IAM identity. The SaaS Façade model enables this capability.

Multi-Cloud

Another potential use case for the SaaS Façade model is implementing a multi-cloud customer architecture. Imagine an ISV’s cloud environment hosted on a single public cloud provider’s platform, while the customer has workloads and data housed on multiple cloud providers’ platforms. The ISV product’s back-end would be deployed to multiple cloud providers’ platforms using a common compute construct such as a Linux-based VM (e.g., Amazon EC2, Azure VM, or Google Cloud Compute Engine) or on Kubernetes (e.g., AWS’s EKS, Google Cloud’s GKE, or Azure’s AKS). The ISV product’s data-tier would also be built on a database engine common to most major cloud providers, such as MySQL or PostgreSQL. Similar to the SaaS with Remote Agents model, the ISV’s environment acts as a single portal to the customer’s multiple environments and decentralized data sources.

SaaS Façade model with a multi-cloud configuration

In this scenario, the ISV product’s front-end and back-end are common and independent of the cloud provider’s platform. The customer-managed administration interface is also common. Potentially, only the ISV’s deployment, configuration, and monitoring elements may need to have aspects specific to each cloud provider’s platform. For example, Kubernetes is common to AWS, Google Cloud, and Azure. However, the authentication methods, IaC, and API commands to provision a Kubernetes cluster or deploy a containerized application differ between EKS, GKE, and AKS.
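
To make the last point concrete, the sketch below shows how obtaining cluster credentials differs per provider, while everything after that step (for example, applying Kubernetes manifests) could stay common. The function name and its arguments are hypothetical; the CLI subcommands are the standard ones for each provider, though the flags an ISV needs will vary with their setup.

```python
def kubeconfig_command(provider, cluster, location=None, resource_group=None):
    """Return the provider-specific CLI command that fetches Kubernetes credentials."""
    if provider == "aws":
        return ["aws", "eks", "update-kubeconfig", "--name", cluster, "--region", location]
    if provider == "gcp":
        return ["gcloud", "container", "clusters", "get-credentials", cluster,
                "--region", location]
    if provider == "azure":
        if resource_group is None:
            raise ValueError("AKS credentials require a resource group")
        return ["az", "aks", "get-credentials", "--name", cluster,
                "--resource-group", resource_group]
    raise ValueError(f"unsupported provider: {provider}")


# Hypothetical cluster names and locations
eks = kubeconfig_command("aws", "isv-backend", location="us-east-1")
gke = kubeconfig_command("gcp", "isv-backend", location="us-central1")
aks = kubeconfig_command("azure", "isv-backend", resource_group="isv-rg")
```

Each list could be passed to subprocess.run; it is kept as data here so the sketch has no side effects.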

Conclusion

In this post, we briefly explored several models used by ISVs to deliver, manage, and support their software products for cloud-native customers. As cloud adoption continues to grow and the complexity of cloud-based application platforms continues to evolve, ISVs will continue to develop new models for distributing their software products in the cloud.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. Introduction image – Copyright: melpomen (123rf.com).


The Art of Building Open Data Lakes with Apache Hudi, Kafka, Hive, and Debezium

Build near real-time, open-source data lakes on AWS using a combination of Apache Kafka, Hudi, Spark, Hive, and Debezium

Introduction

In the following post, we will learn how to build a data lake on AWS using a combination of open-source software (OSS), including Red Hat’s Debezium, Apache Kafka, Kafka Connect, Apache Hive, Apache Spark, Apache Hudi, and Hudi DeltaStreamer. We will use fully-managed AWS services to host the datasource, the data lake, and the open-source tools. These services include Amazon RDS, MSK, EKS, EMR, and S3.

The architecture and workflow demonstrated in this post

This post is an in-depth follow-up to the video demonstration, Building Open Data Lakes on AWS with Debezium and Apache Hudi.

Workflow

As shown in the architectural diagram above, these are the high-level steps in the demonstration’s workflow:

  1. Changes (inserts, updates, and deletes) are made to the datasource, a PostgreSQL database running on Amazon RDS;
  2. Kafka Connect Source Connector, running on Amazon EKS (Kubernetes), continuously reads data from the PostgreSQL WAL using Debezium;
  3. Source Connector creates and stores message schemas in Apicurio Registry, also running on Amazon EKS, in Avro format;
  4. Source Connector transforms and writes data in Apache Avro format to Apache Kafka, running on Amazon MSK;
  5. Kafka Connect Sink Connector, using Confluent S3 Sink Connector, reads messages from Kafka topics using schemas from Apicurio Registry;
  6. Sink Connector writes data to Amazon S3 in Apache Avro format;
  7. Apache Spark, using Hudi DeltaStreamer and running on Amazon EMR, reads message schemas from Apicurio Registry;
  8. DeltaStreamer reads raw Avro-format data from Amazon S3;
  9. DeltaStreamer writes data to Amazon S3 as both Copy on Write (CoW) and Merge on Read (MoR) table types;
  10. DeltaStreamer syncs Hudi tables and partitions to Apache Hive running on Amazon EMR;
  11. Queries are executed against Apache Hive Metastore or directly against Hudi tables using Apache Spark, with data returned from Hudi tables in Amazon S3.

The workflow described above actually contains two independent processes running simultaneously. Steps 2–6 represent the first process, the change data capture (CDC) process. Kafka Connect is used to continuously move changes from the database to Amazon S3. Steps 7–10 represent the second process, the data lake ingestion process. Hudi’s DeltaStreamer reads raw CDC data from Amazon S3 and writes the data back to another location in S3 (the data lake) in Apache Hudi table format. When combined, these processes can give us near real-time, incremental data ingestion of changes from the datasource to the Hudi-managed data lake.

Alternatives

This demonstration’s workflow is only one of many possible workflows to achieve similar outcomes.

Source Code

All source code for this post and the previous posts in this series is open-sourced and located on GitHub. The specific resources used in this post are found in the debezium_hudi_demo directory of the GitHub repository. There are also two copies of the Museum of Modern Art (MoMA) Collection dataset from Kaggle, specifically prepared for this post, located in the moma_data directory. One copy is a nearly full dataset, and the other is a smaller, cost-effective dev/test version.

Kafka Connect

In this demonstration, Kafka Connect runs on Kubernetes, hosted on the fully-managed Amazon Elastic Kubernetes Service (Amazon EKS). Kafka Connect runs the Source and Sink Connectors.

Source Connector

The Kafka Connect Source Connector, source_connector_moma_postgres_kafka.json, used in steps 2–4 of the workflow, utilizes Debezium to continuously read changes to an Amazon RDS for PostgreSQL database. The PostgreSQL database hosts the MoMA Collection in two tables: artists and artworks.

{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "<your_database_hostname>",
"database.port": "5432",
"database.user": "<your_username>",
"database.password": "<your_password>",
"database.dbname": "moma",
"database.server.name": "moma",
"table.include.list": "public.artists,public.artworks",
"plugin.name": "pgoutput",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,db,table,schema,lsn,source.ts_ms"
}

The Debezium Connector for PostgreSQL reads record-level insert, update, and delete entries from PostgreSQL’s write-ahead log (WAL). According to the PostgreSQL documentation, changes to data files must be written only after log records describing the changes have been flushed to permanent storage, thus the name, write-ahead log. The Source Connector then creates and stores Apache Avro message schemas in Apicurio Registry also running on Amazon EKS.

Apicurio Registry UI showing Avro-format Kafka message schemas
Apicurio Registry UI showing part of Avro-format Kafka message value schema for artists

Finally, the Source Connector transforms and writes Avro format messages to Apache Kafka running on the fully-managed Amazon Managed Streaming for Apache Kafka (Amazon MSK). Assuming Kafka Connect’s topic.creation.enable property is set to true, Kafka Connect will create any necessary Kafka topics, one per database table.
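
As a small illustration of the one-topic-per-table convention, the sketch below derives the topic names used in this demonstration from the connector's database.server.name, the schema, and the table, and then checks them against the topics.regex value used by the Sink Connector. The helper function is hypothetical; the naming pattern is inferred from the topic names that appear in this post.

```python
import re


def topic_name(server_name, schema, table):
    """Build a topic name in the <server name>.<schema>.<table> form used in this post."""
    return f"{server_name}.{schema}.{table}"


# Pattern copied from the Sink Connector's topics.regex setting
TOPICS_REGEX = re.compile(r"moma.public.(.*)")

topics = [topic_name("moma", "public", table) for table in ("artists", "artworks")]
matched = [topic for topic in topics if TOPICS_REGEX.fullmatch(topic)]
```

Both topics match the pattern, so a single Sink Connector can consume from every table captured by the Source Connector.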

Below, we see an example of a Kafka message representing an insert of a record with the artist_id 1 in the MoMA Collection database’s artists table. The record was read from the PostgreSQL WAL, transformed, and written to a corresponding Kafka topic, using the Debezium Connector for PostgreSQL. The first version represents the raw message before the Debezium transformation is applied. Note that the type of operation (op) indicates a read (r). Possible values include c for create (or insert), u for update, d for delete, and r for read (applies to snapshots).

{
  "payload": {
    "before": null,
    "after": {
      "artist_id": 1,
      "name": "Robert Arneson",
      "nationality": "American",
      "gender": "Male",
      "birth_year": 1930,
      "death_year": 1992
    },
    "source": {
      "version": "1.7.0.Final",
      "connector": "postgresql",
      "name": "moma",
      "ts_ms": 1640703877051,
      "snapshot": "true",
      "db": "moma",
      "sequence": "[null,\"3668170506336\"]",
      "schema": "public",
      "table": "artists",
      "txId": 217094,
      "lsn": 3668170506336,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1640703877051,
    "transaction": null
  }
}

The next version represents the same record after being transformed by Debezium using the event flattening single message transformation (unwrap SMT). The final message structure represents the schema stored in Apicurio Registry. The message structure is identical to the structure of the data written to Amazon S3 by the Sink Connector.

{
  "payload": {
    "artist_id": 1,
    "name": "Robert Arneson",
    "nationality": "American",
    "gender": "Male",
    "birth_year": 1930,
    "death_year": 1992,
    "__op": "r",
    "__db": "moma",
    "__table": "artists",
    "__schema": "public",
    "__lsn": 3668438941792,
    "__source_ts_ms": 1640705109121,
    "__deleted": "false"
  }
}
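
To show the shape change between the two messages above, here is a toy Python function that flattens a raw change event into the unwrapped form. It is only an illustration of the before/after structure, not Debezium's implementation; in particular, falling back to the "before" row image for deletes is an assumption about how the rewrite delete-handling mode behaves.

```python
def flatten(envelope, add_fields=("op", "db", "table", "schema", "lsn", "source.ts_ms")):
    """Toy version of event flattening: keep the row image, add metadata as __ fields."""
    payload = envelope["payload"]
    deleted = payload["op"] == "d"
    # Assumption: for deletes, "after" is null, so the "before" row image is used instead
    row = dict(payload["before"] if deleted else payload["after"])
    for field in add_fields:
        if field == "op":
            value = payload["op"]
        elif field.startswith("source."):
            value = payload["source"][field.split(".", 1)[1]]
        else:
            value = payload["source"][field]
        row["__" + field.replace(".", "_")] = value
    row["__deleted"] = "true" if deleted else "false"
    return {"payload": row}


# Abbreviated copy of the raw message shown above
RAW = {
    "payload": {
        "before": None,
        "after": {"artist_id": 1, "name": "Robert Arneson"},
        "source": {"db": "moma", "schema": "public", "table": "artists",
                   "lsn": 3668170506336, "ts_ms": 1640703877051},
        "op": "r",
    }
}
flat = flatten(RAW)
```

The default add_fields tuple mirrors the transforms.unwrap.add.fields value from the Source Connector configuration.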

Sink Connector

The Kafka Connect Sink Connector, sink_connector_moma_kafka_s3.json, used in steps 5–6 of the workflow, uses the Confluent S3 Sink Connector. The Sink Connector reads the Avro-format messages from Kafka using the schemas stored in Apicurio Registry. It then writes the data to Amazon S3, also in Apache Avro format, based on the same schemas.

{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": 1,
"topics.regex": "moma.public.(.*)",
"table.name.format": "${topic}",
"s3.region": "us-east-1",
"s3.bucket.name": "<your_data_lake_bucket>",
"s3.part.size": 5242880,
"flush.size": 10000,
"rotate.schedule.interval.ms": 60000,
"timezone": "UTC",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE",
"behavior.on.null.values": "ignore",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}

Running Kafka Connect

We first start Kafka Connect in the background to begin the CDC process.

bin/connect-distributed.sh \
config/connect-distributed.properties \
> /dev/null 2>&1 &
tail -f logs/connect.log

Then, deploy the Kafka Connect Source and Sink Connectors using Kafka Connect’s RESTful API. Using the API, we can also confirm the status of the Connectors.

curl -s -d @"config/source_connector_moma_postgres_kafka.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/source_connector_moma_postgres_kafka/config | jq
curl -s -d @"config/sink_connector_moma_kafka_s3.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/sink_connector_moma_kafka_s3/config | jq
curl -s -X GET http://localhost:8083/connectors | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/source_connector_moma_postgres_kafka/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/sink_connector_moma_kafka_s3/status | jq
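
For readers who prefer to script these calls, the same requests can be sketched in Python with only the standard library. The endpoints and connector names are the ones used in the curl commands above; the helper function names are hypothetical, and the send function is defined but not called here because it needs a running Kafka Connect instance.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # same Kafka Connect endpoint as the curl commands


def put_config_request(name, config):
    """Build the PUT request that creates or updates a connector's configuration."""
    return urllib.request.Request(
        url=f"{CONNECT_URL}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def status_request(name):
    """Build the GET request that returns a connector's status."""
    return urllib.request.Request(url=f"{CONNECT_URL}/connectors/{name}/status", method="GET")


def send(request):
    """Send a request and decode the JSON response (requires Kafka Connect to be running)."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())


# Build, but do not send, the request for the Sink Connector
deploy = put_config_request("sink_connector_moma_kafka_s3", {"tasks.max": 1})
```

In practice, the config argument would be the parsed contents of the connector's JSON file rather than the one-key placeholder used here.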

To confirm the two Kafka topics, moma.public.artists and moma.public.artworks, were created and contain Avro messages, we can use Kafka’s command-line tools.

# list kafka topics
bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties
# read first 5 avro-format (binary) messages from topic
bin/kafka-console-consumer.sh \
--topic moma.public.artists \
--from-beginning \
--max-messages 5 \
--property print.value=true \
--property print.offset=true \
--bootstrap-server $BBROKERS \
--consumer.config config/client-iam.properties

In the short video-only clip below, we see the process of deploying the Kafka Connect Source and Sink Connectors and confirming they are working as expected.

Deploying and starting the Kafka Connect Source and Sink Connectors

The Sink Connector writes data to Amazon S3 in batches of 10k messages or every 60 seconds (one-minute intervals). These settings are configurable and highly dependent on your requirements, including message volume, message velocity, real-time analytics requirements, and available compute resources.

Amazon S3 objects containing MoMA Collection artwork records from PostgreSQL
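
The "10k messages or every 60 seconds" behavior described above can be reduced to a simple either/or rule. The sketch below is a deliberately simplified model using the flush.size and rotate.schedule.interval.ms values from the Sink Connector configuration; the function is hypothetical and ignores details of the real connector, such as per-partition buffering and wall-clock scheduling.

```python
def should_flush(buffered_records, ms_since_last_flush,
                 flush_size=10000, rotate_interval_ms=60000):
    """Write a new S3 object when either the record count or the time threshold is reached."""
    return buffered_records >= flush_size or ms_since_last_flush >= rotate_interval_ms


busy_topic = should_flush(buffered_records=10000, ms_since_last_flush=5000)    # size threshold met
quiet_topic = should_flush(buffered_records=500, ms_since_last_flush=60000)    # time threshold met
still_buffering = should_flush(buffered_records=500, ms_since_last_flush=1000)
```

Lower thresholds reduce latency but produce many small objects; higher thresholds do the opposite, which is why these settings depend on message volume and velocity.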

Since we will not be querying this raw Avro-format CDC data in Amazon S3 directly, there is no need to catalog this data in Apache Hive or AWS Glue Data Catalog, a fully-managed Hive-compatible metastore.

Apache Hudi

According to the overview, Apache Hudi (pronounced “hoodie”) is the next-generation streaming data lake platform. Apache Hudi brings core warehouse and database functionality to data lakes. Hudi provides tables, transactions, efficient upserts and deletes, advanced indexes, streaming ingestion services, data clustering, compaction optimizations, and concurrency, all while keeping data in open source file formats.

Without Hudi or an equivalent open-source data lake table format such as Apache Iceberg or Databricks’ Delta Lake, most data lakes are just a bunch of unmanaged flat files. Amazon S3 cannot natively maintain the latest view of the data, to the surprise of many who are more familiar with OLTP-style databases or OLAP-style data warehouses.

DeltaStreamer

DeltaStreamer, aka the HoodieDeltaStreamer utility (part of the hudi-utilities-bundle), used in steps 7–10 of the workflow, provides a way to perform streaming ingestion of data from different sources, such as a distributed file system (DFS) and Apache Kafka.

Optionally, HoodieMultiTableDeltaStreamer, a wrapper on top of HoodieDeltaStreamer, ingests multiple tables in a single Spark job, into Hudi datasets. Currently, it only supports sequential processing of tables to be ingested and Copy on Write table type.

We are using HoodieDeltaStreamer to write to both Merge on Read (MoR) and Copy on Write (CoW) table types for demonstration purposes only. The MoR table type is a superset of the CoW table type, which stores data using a combination of columnar-based (e.g., Apache Parquet) plus row-based (e.g., Apache Avro) file formats. Updates are logged to delta files and later compacted to produce new versions of columnar files synchronously or asynchronously. Again, the choice of table types depends on your requirements.

Trade-offs between Hudi table types (table courtesy Apache Hudi documentation)
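
The idea of logging updates to delta files and compacting them later can be pictured with a toy in-memory model. This is not how Hudi is implemented; the class and its method names are hypothetical, with read_optimized and snapshot loosely borrowing the names of Hudi's query types. A dictionary stands in for the columnar base files, and a list stands in for the row-based delta log.

```python
class ToyMorTable:
    """Toy Merge on Read table: base records plus a log of not-yet-compacted updates."""

    def __init__(self, base):
        self.base = dict(base)  # stands in for columnar base files
        self.log = []           # stands in for row-based delta log files

    def upsert(self, key, record):
        """Writes are cheap: append the update to the log instead of rewriting the base."""
        self.log.append((key, record))

    def read_optimized(self):
        """Read only the base: fast, but may miss updates that are still in the log."""
        return dict(self.base)

    def snapshot(self):
        """Merge the base and the log at read time to return the latest view."""
        merged = dict(self.base)
        for key, record in self.log:
            merged[key] = record
        return merged

    def compact(self):
        """Fold the log into a new version of the base and clear the log."""
        self.base = self.snapshot()
        self.log = []


table = ToyMorTable({1: {"name": "Robert Arneson", "death_year": None}})
table.upsert(1, {"name": "Robert Arneson", "death_year": 1992})
before_compaction = table.read_optimized()[1]["death_year"]
latest = table.snapshot()[1]["death_year"]
table.compact()
after_compaction = table.read_optimized()[1]["death_year"]
```

A Copy on Write table, by contrast, would rewrite the base on every upsert, trading slower writes for reads that never need a merge step.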

Amazon EMR

For this demonstration, I’ve used the recently released Amazon EMR version 6.5.0 configured with Apache Spark 3.1.2 and Apache Hive 3.1.2. EMR 6.5.0 runs Scala version 2.12.10, Python 3.7.10, and OpenJDK Corretto-8.312. I have included the AWS CloudFormation template and parameters file used to create the EMR cluster, on GitHub.

When choosing Apache Spark, Apache Hive, or Presto on EMR 6.5.0, Apache Hudi release 0.9.0 is automatically installed.

Amazon EMR Master Node showing Apache Hudi related resources

DeltaStreamer Configuration

Below, we see the DeltaStreamer properties file, deltastreamer_artists_apicurio_mor.properties. This properties file is referenced by the Spark job that runs DeltaStreamer, shown next. The file contains properties related to the datasource, the data sink, and Apache Hive. The source of the data for DeltaStreamer is the CDC data written to Amazon S3. In this case, the datasource is the objects located in the /topics/moma.public.artists/partition=0/ S3 object prefix. The data sink is a Hudi MoR table in Amazon S3. DeltaStreamer will write Parquet data, partitioned by the artist’s nationality, to the /moma_mor/artists/ S3 object prefix. Lastly, DeltaStreamer will sync all tables and table partitions to Apache Hive, including creating the Hive databases and tables if they do not already exist.

# Built for demo of Apache Hudi 0.9.0 (EMR 6.5.0) with Apache Hive and SchemaRegistryProvider
include=base.properties
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.database=moma_mor
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.partition_fields=nationality
hoodie.datasource.hive_sync.table=artists
hoodie.datasource.write.drop.partition.columns=true
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.write.partitionpath.field=nationality
hoodie.datasource.write.recordkey.field=artist_id
hoodie.deltastreamer.schemaprovider.registry.url=http://<your_registry_url:port>/apis/ccompat/v6/subjects/moma.public.artists-value/versions/latest
hoodie.deltastreamer.source.dfs.root=s3://<your_data_lake_bucket>/topics/moma.public.artists/partition=0/
# 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
hoodie.parquet.small.file.limit=134217728
# https://dacort.dev/posts/updating-partition-values-with-apache-hudi/
# This is required if we want to ensure we upsert a record, even if the partition changes
hoodie.index.type=GLOBAL_BLOOM
# This is required to write the data into the new partition
# defaults to false in Apache Hudi 0.8.0 (EMR 6.4.0), true in Hudi 0.9.0 (EMR 6.5.0)
# hoodie.bloom.index.update.partition.path=true

DeltaStreamer properties file for artists data using MoR table type
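
To visualize what the partitioning properties produce, the sketch below builds the object prefix for a single partition value. The helper is hypothetical, and the layout assumes hive_style_partitioning results in field=value folder names; the bucket name remains the placeholder used in the properties file.

```python
def partition_path(base_path, field, value, hive_style=True):
    """Build the prefix for one partition, optionally using Hive-style field=value folders."""
    folder = f"{field}={value}" if hive_style else str(value)
    return f"{base_path.rstrip('/')}/{folder}/"


BASE = "s3://<your_data_lake_bucket>/moma_mor/artists/"
american = partition_path(BASE, "nationality", "American")
plain = partition_path(BASE, "nationality", "American", hive_style=False)

# The small file limit in the properties file is 128 MB expressed in bytes
small_file_limit = 1024 * 1024 * 128
```

With Hive-style partitioning enabled, the American partition lands under .../moma_mor/artists/nationality=American/.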

Below, we see the equivalent DeltaStreamer properties file for the MoMA artworks, deltastreamer_artworks_apicurio_mor.properties. There are also comparable DeltaStreamer property files for the Hudi CoW tables on GitHub.

# Built for demo of Apache Hudi 0.9.0 (EMR 6.5.0) with Apache Hive and SchemaRegistryProvider
include=base.properties
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.database=moma_mor
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.partition_fields=classification
hoodie.datasource.hive_sync.table=artworks
hoodie.datasource.write.drop.partition.columns=true
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.write.partitionpath.field=classification
hoodie.datasource.write.recordkey.field=artwork_id
hoodie.deltastreamer.schemaprovider.registry.url=http://<your_registry_url:port>/apis/ccompat/v6/subjects/moma.public.artworks-value/versions/latest
hoodie.deltastreamer.source.dfs.root=s3://<your_data_lake_bucket>/topics/moma.public.artworks/partition=0/
# 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
hoodie.parquet.small.file.limit=134217728
# https://dacort.dev/posts/updating-partition-values-with-apache-hudi/
# This is required if we want to ensure we upsert a record, even if the partition changes
hoodie.index.type=GLOBAL_BLOOM
# This is required to write the data into the new partition
# defaults to false in Apache Hudi 0.8.0 (EMR 6.4.0), true in Hudi 0.9.0 (EMR 6.5.0)
# hoodie.bloom.index.update.partition.path=true
DeltaStreamer properties file for artworks data using MoR table type
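In both properties files above, the Hudi write partition field and the Hive sync partition field are set to the same column. As a minimal sketch (not part of the original demonstration), a small check like the one below could catch a mismatch before a job is submitted. The helper names parse_properties and check_partition_fields are hypothetical, and the parser only handles simple key=value lines and # comments; it does not follow the include directive.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, since values such as S3 paths may contain "=".
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_partition_fields(props):
    """Return True when the write partition field equals the Hive sync partition field."""
    write_field = props.get("hoodie.datasource.write.partitionpath.field")
    hive_field = props.get("hoodie.datasource.hive_sync.partition_fields")
    return write_field is not None and write_field == hive_field


# Excerpt of deltastreamer_artworks_apicurio_mor.properties, as shown above
sample = """
hoodie.datasource.hive_sync.partition_fields=classification
hoodie.datasource.write.partitionpath.field=classification
hoodie.datasource.write.recordkey.field=artwork_id
hoodie.deltastreamer.source.dfs.root=s3://<your_data_lake_bucket>/topics/moma.public.artworks/partition=0/
"""

props = parse_properties(sample)
print(check_partition_fields(props))
```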

All DeltaStreamer property files reference Apicurio Registry for the location of the Avro schemas. The schemas are used by both the Kafka Avro-format messages and the CDC-created Avro-format files in Amazon S3. Due to DeltaStreamer’s coupling with Confluent Schema Registry, as opposed to other registries, we must use Apicurio Registry’s Confluent Schema Registry API (Version 6) compatibility API endpoints (e.g., /apis/ccompat/v6/subjects/moma.public.artists-value/versions/latest) when using the org.apache.hudi.utilities.schema.SchemaRegistryProvider datasource option with DeltaStreamer. According to Apicurio, to provide compatibility with Confluent SerDes (Serializer/Deserializer) and other clients, Apicurio Registry implements the API defined by the Confluent Schema Registry.

Apicurio Registry exposes multiple APIs
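To make the compatibility endpoint more concrete, here is a minimal sketch of how a client might build the URL and unpack the response. In the Confluent Schema Registry API, the response for a subject version is a JSON document whose schema field holds the Avro schema as a JSON-encoded string. The function names, the http://localhost:8080 address, and the abbreviated sample response are assumptions for illustration only; the sample is not actual output from the demonstration’s registry.

```python
import json


def registry_subject_url(base_url, subject, version="latest"):
    """Build the Confluent-compatible (ccompat) URL for a subject version."""
    return f"{base_url.rstrip('/')}/apis/ccompat/v6/subjects/{subject}/versions/{version}"


def extract_avro_schema(response_body):
    """Return the Avro schema as a dict from a Confluent-style response body."""
    payload = json.loads(response_body)
    # The schema itself is carried as a JSON-encoded string inside the payload.
    return json.loads(payload["schema"])


# Hypothetical, abbreviated response body (illustration only)
sample_body = json.dumps({
    "subject": "moma.public.artists-value",
    "version": 1,
    "id": 1,
    "schema": json.dumps({
        "type": "record",
        "name": "Value",
        "fields": [{"name": "artist_id", "type": "int"}],
    }),
})

url = registry_subject_url("http://localhost:8080", "moma.public.artists-value")
schema = extract_avro_schema(sample_body)
print(url)
print([field["name"] for field in schema["fields"]])
```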

Running DeltaStreamer

The properties files are loaded by Spark jobs that call the DeltaStreamer library, using spark-submit. Below, we see an example Spark job that calls the DeltaStreamer class. DeltaStreamer reads the raw Avro-format CDC data from S3 and writes the data using the Hudi MoR table type into the /moma/artists_mor/ S3 object prefix. In this particular Spark job, we are using the continuous option. With this option, DeltaStreamer runs in continuous mode, running source-fetch, transform, and write in a loop. We are also using the UPSERT write operation (op). Operation options include UPSERT, INSERT, and BULK_INSERT. This set of options is ideal for inserting ongoing changes to CDC data into Hudi tables. You can run jobs in the foreground or background on EMR’s Master Node or as EMR Steps from the Amazon EMR console.

export DATA_LAKE_BUCKET="<your_data_lake_bucket_name>"
# artists data, MoR table type, continuous upserts
spark-submit \
--jars /usr/lib/spark/jars/spark-avro.jar,/usr/lib/hudi/hudi-utilities-bundle.jar \
--conf spark.sql.catalogImplementation=hive \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hudi/hudi-utilities-bundle.jar \
--table-type MERGE_ON_READ \
--source-ordering-field __source_ts_ms \
--props "s3://${DATA_LAKE_BUCKET}/hudi/deltastreamer_artists_apicurio_mor.properties" \
--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
--target-base-path "s3://${DATA_LAKE_BUCKET}/moma/artists_mor/" \
--target-table moma_mor.artists \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--enable-sync \
--continuous \
--op UPSERT

Below, we see another example DeltaStreamer Spark job that reads the raw Avro-format CDC data from S3 and writes the data using the MoR table type into the /moma/artworks_mor/ S3 object prefix. This example uses the BULK_INSERT write operation (op) and the filter-dupes option. The filter-dupes option ensures that duplicate records from the source are dropped (filtered out) before an INSERT or BULK_INSERT. This set of options is ideal for the initial bulk inserting of existing data into Hudi tables. The job runs one time and completes, unlike the previous example that ran continuously.

export DATA_LAKE_BUCKET="<your_data_lake_bucket_name>"
# artworks data, MoR table type, 1x bulk insert
spark-submit \
--jars /usr/lib/spark/jars/spark-avro.jar,/usr/lib/hudi/hudi-utilities-bundle.jar \
--conf spark.sql.catalogImplementation=hive \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hudi/hudi-utilities-bundle.jar \
--table-type MERGE_ON_READ \
--source-ordering-field __source_ts_ms \
--props "s3://${DATA_LAKE_BUCKET}/hudi/deltastreamer_artworks_apicurio_mor.properties" \
--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
--target-base-path "s3://${DATA_LAKE_BUCKET}/moma/artworks_mor/" \
--target-table moma_mor.artworks \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--enable-sync \
--op BULK_INSERT \
--filter-dupes
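The two jobs above differ only in the dataset name, the write operation, and the continuous and filter-dupes flags. As a rough sketch of that relationship, the hypothetical helper below assembles the same argument list in Python. The function name deltastreamer_args and the bucket name my-bucket are illustrative assumptions; the paths and class names are copied from the jobs shown above.

```python
import shlex

HUDI_BUNDLE = "/usr/lib/hudi/hudi-utilities-bundle.jar"


def deltastreamer_args(bucket, table, op, continuous=False, filter_dupes=False):
    """Assemble spark-submit arguments mirroring the two MoR jobs shown above."""
    args = [
        "spark-submit",
        "--jars", f"/usr/lib/spark/jars/spark-avro.jar,{HUDI_BUNDLE}",
        "--conf", "spark.sql.catalogImplementation=hive",
        "--class", "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
        HUDI_BUNDLE,
        "--table-type", "MERGE_ON_READ",
        "--source-ordering-field", "__source_ts_ms",
        "--props", f"s3://{bucket}/hudi/deltastreamer_{table}_apicurio_mor.properties",
        "--source-class", "org.apache.hudi.utilities.sources.AvroDFSSource",
        "--target-base-path", f"s3://{bucket}/moma/{table}_mor/",
        "--target-table", f"moma_mor.{table}",
        "--schemaprovider-class", "org.apache.hudi.utilities.schema.SchemaRegistryProvider",
        "--enable-sync",
        "--op", op,
    ]
    if continuous:
        args.append("--continuous")    # loop: source-fetch, transform, write
    if filter_dupes:
        args.append("--filter-dupes")  # drop duplicate source records before insert
    return args


upsert = deltastreamer_args("my-bucket", "artists", "UPSERT", continuous=True)
bulk = deltastreamer_args("my-bucket", "artworks", "BULK_INSERT", filter_dupes=True)
print(shlex.join(bulk))
```

The resulting list could be passed to subprocess.run on the EMR Master Node, although the demonstration itself uses shell scripts.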

Syncing with Hive

The following abridged, video-only clip demonstrates the differences between the Hudi CoW and MoR table types with respect to Apache Hive. In the video, we run the deltastreamer_jobs_bulk_bkgd.sh script, included on GitHub. This script runs four different Apache Spark jobs, using Hudi DeltaStreamer to bulk-ingest all the artists and artworks CDC data from Amazon S3 into both Hudi CoW and MoR table types. Once the four Spark jobs are complete, the script queries Apache Hive and displays the new Hive databases and database tables created by DeltaStreamer.

Hudi DeltaStreamer Spark jobs running on Amazon EMR

In both the video above and terminal screengrab below, note the difference in the tables created within the two Hive databases, the Hudi CoW table type (moma_cow) and the MoR table type (moma_mor). The MoR table type creates both a read-optimized table (_ro) as well as a real-time table (_rt) for each datasource (e.g., artists_ro and artists_rt).

View of the Apache Hive CoW and MoR database tables

According to the documentation, Hudi creates two tables in the Hive metastore for the MoR table type. The first is a read-optimized view, with _ro appended to the table name. The second is a real-time view, with _rt appended to the same name. According to Hudi, the read-optimized view exposes columnar Parquet while the real-time view exposes columnar Parquet and/or row-based logs; you can query both tables. The CoW table type creates a single table without a suffix for each datasource (e.g., artists). Below, we see the Hive table structure for the artists_rt table, created by DeltaStreamer, using SHOW CREATE TABLE moma_mor.artists_rt;.

CREATE EXTERNAL TABLE `moma_mor.artists_rt`(
`_hoodie_commit_time` string,
`_hoodie_commit_seqno` string,
`_hoodie_record_key` string,
`_hoodie_partition_path` string,
`_hoodie_file_name` string,
`artist_id` int,
`name` string,
`gender` string,
`birth_year` int,
`death_year` int,
`__op` string,
`__db` string,
`__table` string,
`__schema` string,
`__lsn` bigint,
`__source_ts_ms` bigint,
`__deleted` string)
PARTITIONED BY (
`nationality` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
'hoodie.query.as.ro.table'='false',
'path'='s3://<your_data_lake_bucket>/moma/artists_mor')
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://<your_data_lake_bucket>/moma/artists_mor'
TBLPROPERTIES (
'bucketing_version'='2',
'last_commit_time_sync'='20211230180429',
'spark.sql.partitionProvider'='catalog',
'spark.sql.sources.provider'='hudi',
'spark.sql.sources.schema.numPartCols'='1',
'spark.sql.sources.schema.numParts'='1',
'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"artist_id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"gender","type":"string","nullable":true,"metadata":{}},{"name":"birth_year","type":"integer","nullable":true,"metadata":{}},{"name":"death_year","type":"integer","nullable":true,"metadata":{}},{"name":"__op","type":"string","nullable":true,"metadata":{}},{"name":"__db","type":"string","nullable":true,"metadata":{}},{"name":"__table","type":"string","nullable":true,"metadata":{}},{"name":"__schema","type":"string","nullable":true,"metadata":{}},{"name":"__lsn","type":"long","nullable":true,"metadata":{}},{"name":"__source_ts_ms","type":"long","nullable":true,"metadata":{}},{"name":"__deleted","type":"string","nullable":true,"metadata":{}},{"name":"nationality","type":"string","nullable":true,"metadata":{}}]}',
'spark.sql.sources.schema.partCol.0'='nationality',
'transient_lastDdlTime'='1640919578')
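The naming rule described above (two suffixed Hive tables for MoR, one unsuffixed table for CoW) can be summarized in a few lines. This is a minimal sketch of the convention only; the function name hive_table_names is hypothetical and is not part of Hudi.

```python
def hive_table_names(table, table_type):
    """Return the Hive table names expected for a Hudi table of the given type."""
    if table_type == "MERGE_ON_READ":
        # Read-optimized (_ro) and real-time (_rt) views
        return [f"{table}_ro", f"{table}_rt"]
    if table_type == "COPY_ON_WRITE":
        # A single table without a suffix
        return [table]
    raise ValueError(f"Unknown Hudi table type: {table_type}")


print(hive_table_names("artists", "MERGE_ON_READ"))
print(hive_table_names("artists", "COPY_ON_WRITE"))
```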

Having run the demonstration’s deltastreamer_jobs_bulk_bkgd.sh script, the resulting object structure in the Hudi-managed section of the Amazon S3 bucket looks as follows.

S3 object structure in Hudi-managed Amazon S3 bucket

Below is an example of Hudi files created in the /moma/artists_cow/ S3 object prefix. When using data lake table formats like Hudi, given its specialized directory structure and the high number of objects, interactions with the data should be abstracted through Hudi’s programming interfaces. Generally speaking, you do not interact directly with the objects in a data lake.

"moma/artists_cow/.hoodie/.aux/.bootstrap/.fileids_$folder$",
"moma/artists_cow/.hoodie/.aux/.bootstrap/.partitions_$folder$",
"moma/artists_cow/.hoodie/.aux/.bootstrap_$folder$",
"moma/artists_cow/.hoodie/.aux_$folder$",
"moma/artists_cow/.hoodie/.temp_$folder$",
"moma/artists_cow/.hoodie/20211231203737.commit",
"moma/artists_cow/.hoodie/20211231203737.commit.requested",
"moma/artists_cow/.hoodie/20211231203737.inflight",
"moma/artists_cow/.hoodie/20211231203738.rollback",
"moma/artists_cow/.hoodie/20211231203738.rollback.inflight",
"moma/artists_cow/.hoodie/archived_$folder$",
"moma/artists_cow/.hoodie/hoodie.properties",
"moma/artists_cow/.hoodie_$folder$",
"moma/artists_cow/nationality=Afghan/.hoodie_partition_metadata",
"moma/artists_cow/nationality=Afghan/4f39e019-e3d7-4a6a-a7bd-6d07eddd495a-0_0-28-3352_20211231203737.parquet",
"moma/artists_cow/nationality=Afghan_$folder$",
"moma/artists_cow/nationality=Albanian/.hoodie_partition_metadata",
"moma/artists_cow/nationality=Albanian/4f39e019-e3d7-4a6a-a7bd-6d07eddd495a-1_0-28-3352_20211231203737.parquet",
"moma/artists_cow/nationality=Albanian_$folder$",
"moma/artists_cow/nationality=Algerian/.hoodie_partition_metadata",
"moma/artists_cow/nationality=Algerian/4f39e019-e3d7-4a6a-a7bd-6d07eddd495a-2_0-28-3352_20211231203737.parquet",
"moma/artists_cow/nationality=Algerian_$folder$",
"moma/artists_cow/nationality=American/.hoodie_partition_metadata",
"moma/artists_cow/nationality=American/0065ed77-4a6c-4755-b133-45126310936d-0_502-28-3854_20211231203737.parquet",
"moma/artists_cow/nationality=American/011d5c57-c918-40d8-8518-c3cb56747133-0_15-28-3367_20211231203737.parquet"
Example of Hudi files created in the /moma/artists_cow/ S3 object prefix
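Although you would not normally work with these objects directly, it can help to understand how the Parquet file names in the listing are composed. The names appear to follow the pattern fileId_writeToken_instantTime.parquet, where the instant time matches the commit time seen in the .hoodie directory. The sketch below splits a name on that assumption; the class and function names are hypothetical, and the field labels are my reading of the pattern rather than an official Hudi API.

```python
from typing import NamedTuple


class BaseFileName(NamedTuple):
    file_id: str
    write_token: str
    instant_time: str


def parse_base_file_name(object_key):
    """Split a Hudi Parquet base file name into its three underscore-separated parts."""
    name = object_key.rsplit("/", 1)[-1]
    if not name.endswith(".parquet"):
        raise ValueError(f"Not a Parquet base file: {name}")
    stem = name[: -len(".parquet")]
    file_id, write_token, instant_time = stem.rsplit("_", 2)
    return BaseFileName(file_id, write_token, instant_time)


key = (
    "moma/artists_cow/nationality=Afghan/"
    "4f39e019-e3d7-4a6a-a7bd-6d07eddd495a-0_0-28-3352_20211231203737.parquet"
)
parsed = parse_base_file_name(key)
print(parsed)
```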

Hudi CLI

Optionally, we can inspect the Hudi tables using the Hudi CLI (hudi-cli). The CLI offers an extensive list of available commands. Using the CLI, we can inspect the Hudi tables and their schemas, and review operational statistics like write amplification (the number of bytes written for 1 byte of incoming data), commits, and compactions.

> hudi-cli
help
connect --path s3://<your_data_lake_bucket>/moma/artworks_mor/
connect --path s3://<your_data_lake_bucket>/moma/artworks_cow/
desc
fetch table schema
commits show
stats wa
compactions show all
Hudi CLI commands used in the next video

The following short video-only clip shows the use of the Hudi CLI, running on the Amazon EMR Master Node, to inspect the Hudi tables in S3.

Using the Hudi CLI from the Amazon EMR Master Node

Hudi Data Structure

Recall the sample Kafka message we saw earlier in the post representing an insert of an artist record with the artist_id 1. Below, we see what the same record looks like after being ingested by Hudi DeltaStreamer. Note the five additional fields added by Hudi with the _hoodie_ prefix.

{
"_hoodie_commit_time": "20211227215352",
"_hoodie_commit_seqno": "20211227215352_63_7301",
"_hoodie_record_key": "1",
"_hoodie_partition_path": "nationality=American",
"_hoodie_file_name": "0e91bb5b-aa93-42a9-933d-242f5fda1b8f-0_63-24-4710_20211227215352.parquet",
"artist_id": 1,
"name": "Robert Arneson",
"nationality": "American",
"gender": "Male",
"birth_year": 1930,
"death_year": 1992,
"__op": "r",
"__db": "moma",
"__table": "artists",
"__schema": "public",
"__lsn": 3637434647944,
"__source_ts_ms": 1640566580452,
"__deleted": "false"
}
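The record above mixes three kinds of fields: Hudi metadata (the _hoodie_ prefix), Debezium CDC metadata (the double-underscore prefix), and the artist data itself. The following sketch separates them and converts the commit time into a datetime, assuming the commit time uses the yyyyMMddHHmmss layout seen throughout this post. The function name split_record is hypothetical.

```python
from datetime import datetime


def split_record(record):
    """Separate Hudi metadata, CDC metadata, and business fields by key prefix."""
    hudi = {k: v for k, v in record.items() if k.startswith("_hoodie_")}
    cdc = {k: v for k, v in record.items() if k.startswith("__")}
    data = {k: v for k, v in record.items() if k not in hudi and k not in cdc}
    return hudi, cdc, data


record = {
    "_hoodie_commit_time": "20211227215352",
    "_hoodie_commit_seqno": "20211227215352_63_7301",
    "_hoodie_record_key": "1",
    "_hoodie_partition_path": "nationality=American",
    "_hoodie_file_name": "0e91bb5b-aa93-42a9-933d-242f5fda1b8f-0_63-24-4710_20211227215352.parquet",
    "artist_id": 1,
    "name": "Robert Arneson",
    "nationality": "American",
    "gender": "Male",
    "birth_year": 1930,
    "death_year": 1992,
    "__op": "r",
    "__db": "moma",
    "__table": "artists",
    "__schema": "public",
    "__lsn": 3637434647944,
    "__source_ts_ms": 1640566580452,
    "__deleted": "false",
}

hudi, cdc, data = split_record(record)
commit_time = datetime.strptime(hudi["_hoodie_commit_time"], "%Y%m%d%H%M%S")
print(sorted(hudi))
print(commit_time)
```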

Querying Hudi-managed Data

With the initial data ingestion complete and the CDC and DeltaStreamer processes monitoring for future changes, we can query the resulting data stored in Hudi tables. First, we will make some changes to the PostgreSQL MoMA Collection database to see how Hudi manages the data mutations. We could also make changes directly to the Hudi tables using Hive, Spark, or Presto. However, that would cause our datasource to be out of sync with the Hudi tables, potentially negating the entire CDC process. When developing a data lake, this is a critically important consideration — how changes are introduced to Hudi tables, especially when CDC is involved, and whether data continuity between datasources and the data lake is essential.

For the demonstration, I have made a series of arbitrary updates to a piece of artwork in the MoMA Collection database, ‘Picador (La Pique)’ by Pablo Picasso.

-- 'Picador (La Pique)', by Pablo Picasso
SELECT *
FROM artworks
WHERE artwork_id = 128447 AND classification = 'Print';
-- first update (creation date)
UPDATE artworks
SET date = 1959
WHERE artwork_id = 128447;
-- second update (acquisition date)
UPDATE artworks
SET acquisition_date = '2009-04-15'
WHERE artwork_id = 128447;
-- third update (in vs. '')
UPDATE artworks
SET dimensions = 'composition: 20 13/16 x 25 3/16 in (52.9 x 64 cm); sheet: 24 7/16 x 29 1/2 in (62.1 x 75 cm)'
WHERE artwork_id = 128447;
-- fourth update (acquisition date)
UPDATE artworks
SET acquisition_date = '2009-04-19'
WHERE artwork_id = 128447;

Below, note the last four objects shown in S3. Judging by the file names and dates, we can see that the CDC process, using Kafka Connect, has picked up the four updates I made to the record in the database. The Source Connector first wrote the changes to Kafka. The Sink Connector then read those Kafka messages and wrote the data to Amazon S3 in Avro format, as shown below.

Looking again at S3, we can also observe that DeltaStreamer picked up the new CDC objects in Amazon S3 and wrote them to both the Hudi CoW and MoR tables. Note the file types shown below. Given Hudi’s MoR table type structure, Hudi first logged the changes to row-based delta files and later compacted them to produce a new version of the columnar-format Parquet file.

Hudi MoR row-based delta log files and compacted Parquet files

Querying Results from Apache Hive

There are several ways to query Hudi-managed data in S3. In this demonstration, we query Apache Hive using the hive client from the command line, query Hive using Spark, and query the Hudi tables directly using Spark. We could also install Presto on EMR to query the Hudi data directly or via Hive.

Querying the real-time artworks_rt table in Hive after we make each database change, we can observe that the data in Hudi reflects the updates. Note that the value of the _hoodie_file_name field for the first three updates is a Hudi delta log file, while the value for the last update is a Parquet file. The Parquet file signifies that compaction occurred between the time the fourth update was made and the time the Hive query was executed. Lastly, note the type of operation (__op) indicates an update change (u) for all records.

Querying the data in the Hudi MoR real-time table as we make changes to the database
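When scanning query results like these, the _hoodie_file_name value tells you whether a record is currently being served from a compacted base file or from a delta log. The sketch below encodes that rule of thumb. It assumes, as in the screenshots, that base files end in .parquet and that delta log file names contain a .log. segment; the function name and the example log file name are hypothetical.

```python
def storage_kind(hoodie_file_name):
    """Classify a _hoodie_file_name value as a base file or a delta log file."""
    if hoodie_file_name.endswith(".parquet"):
        return "base file (columnar Parquet, compacted)"
    if ".log." in hoodie_file_name:
        return "delta log file (row-based, not yet compacted)"
    return "unknown"


# Base file name taken from the artist record shown earlier in the post
print(storage_kind("0e91bb5b-aa93-42a9-933d-242f5fda1b8f-0_63-24-4710_20211227215352.parquet"))
# Hypothetical delta log file name, for illustration only
print(storage_kind(".0e91bb5b-aa93-42a9-933d-242f5fda1b8f-0_20211227215352.log.1_0-24-4710"))
```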

Once all four database updates are complete and compaction has occurred, we should observe identical results from all Hive tables. Below, note the _hoodie_file_name field for all three tables is a Parquet file. Logically, the Parquet file for the MoR read-optimized and real-time Hive tables is the same.

Querying the same record in all three Hive tables: Hudi MoR _ro and _rt tables and CoW table

Had we queried the data before compaction, the results would have differed. Below we have three queries. I further updated the artwork record, changing the date field from 1959 to 1960. The read-optimized MoR table, artworks_ro, still reflects the original date value, 1959, from before the update and prior to compaction. The real-time table, artworks_rt, reflects the latest update to the date field, 1960. Note that the value of the _hoodie_file_name field for the read-optimized table is a Parquet file, while the value for the real-time table (artworks_rt), the third and final query, is a delta log file. The delta log allows the real-time table to display the most current state of the data in Hudi.

Querying the same record in all three Hive tables
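The difference between the read-optimized and real-time views can be illustrated with a toy model: a base file plus a log of pending updates, where compaction folds the log into a new base file. This is a deliberately simplified sketch in plain Python, not Hudi’s actual implementation; the class ToyMorFileGroup and its methods are hypothetical.

```python
class ToyMorFileGroup:
    """Toy model of a MoR file group: one base 'file' plus a delta log."""

    def __init__(self, base_records):
        self.base = dict(base_records)  # record key -> record (the Parquet base file)
        self.log = []                   # pending (key, changed fields) entries

    def upsert(self, key, changes):
        # New changes are appended to the delta log, not written to the base file.
        self.log.append((key, changes))

    def read_optimized(self):
        # The _ro view only sees the base file.
        return dict(self.base)

    def real_time(self):
        # The _rt view merges the base file with the delta log at query time.
        merged = {key: dict(rec) for key, rec in self.base.items()}
        for key, changes in self.log:
            merged[key] = {**merged.get(key, {}), **changes}
        return merged

    def compact(self):
        # Compaction produces a new base file and clears the log.
        self.base = self.real_time()
        self.log = []


group = ToyMorFileGroup({128447: {"date": 1959}})
group.upsert(128447, {"date": 1960})
before_ro = group.read_optimized()[128447]["date"]
before_rt = group.real_time()[128447]["date"]
group.compact()
after_ro = group.read_optimized()[128447]["date"]
print(before_ro, before_rt, after_ro)
```

Before compaction, the two views disagree; after compaction, both return the updated value, which mirrors the behavior observed in the Hive queries above.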

Below are a few useful Hive commands to query the changes in Hudi.

-- beeline or hive
-- beeline connect
!connect jdbc:hive2://localhost:10000/default
SHOW DATABASES;
DESCRIBE DATABASE moma_mor;
USE moma_cow;SHOW TABLES;
USE moma_mor;SHOW TABLES;
USE moma_mor;DESCRIBE artworks_ro;
MSCK REPAIR TABLE moma_mor.artworks_ro;
SHOW PARTITIONS moma_mor.artworks_ro;
ANALYZE TABLE moma_mor.artists_rt COMPUTE STATISTICS;
DESCRIBE EXTENDED moma_mor.artists_rt;
-- test query performance without caching
set hive.query.results.cache.enabled=false;
-- 100 rows selected (1.394 seconds) <- read-optimized vs. real-time table
SELECT * FROM moma_mor.artworks_ro WHERE department='Prints & Illustrated Books' LIMIT 100;
-- 100 rows selected (2.371 seconds)
SELECT * FROM moma_mor.artworks_rt WHERE department='Prints & Illustrated Books' LIMIT 100;
-- 10 rows selected (0.719 seconds) <- read-optimized vs. real-time table, classification is partitioned
SELECT * FROM moma_mor.artworks_ro WHERE classification='Print' LIMIT 10;
-- 10 rows selected (1.482 seconds)
SELECT * FROM moma_mor.artworks_rt WHERE classification='Print' LIMIT 10;
EXPLAIN EXTENDED SELECT * FROM moma_mor.artworks_rt WHERE artwork_id=128447 AND classification='Print';
-- 1 row selected (14.126 seconds) <- read-optimized vs. real-time table
SELECT * FROM moma_mor.artworks_ro WHERE artwork_id=128447;
-- 1 row selected (32.877 seconds)
SELECT * FROM moma_mor.artworks_rt WHERE artwork_id=128447;
-- 1 row selected (1.491 seconds) <- classification is partitioned
SELECT * FROM moma_mor.artworks_rt WHERE artwork_id=128447 AND classification='Print';
-- 84 rows selected (8.618 seconds)
SELECT artworks.title AS title,
artworks.`date` AS created,
artworks.name AS artist,
artists.nationality AS nationality,
artworks.classification AS classification
FROM moma_cow.artworks artworks
JOIN moma_cow.artists artists ON (artworks.artist_id = artists.artist_id)
WHERE artworks.artist_id = 4609
AND nationality = 'Spanish'
AND classification = 'Print'
AND artworks.`date` IS NOT NULL
ORDER BY created, title;

Deletes with Hudi

In addition to inserts and updates (upserts), Apache Hudi can manage deletes. Hudi supports two types of deletes on data stored in Hudi tables: soft deletes and hard deletes. Given this demonstration’s specific configuration for CDC and DeltaStreamer, we will use soft deletes. Soft deletes retain the record key and nullify the values of all other fields. Hard deletes, a stronger form of deletion, physically remove any trace of the record from the Hudi table.

Below, we see the CDC record for the artist with artist_id 441. The event flattening single message transformation (SMT), used by the Debezium-based Kafka Connect Source Connector, adds the __deleted field with a value of true and nullifies all fields except the record’s key, artist_id, which is required.

{
"artist_id" : 441,
"name" : null,
"nationality" : null,
"gender" : null,
"birth_year" : null,
"death_year" : null,
"__op" : {
"string" : "d"
},
"__db" : {
"string" : "moma"
},
"__table" : {
"string" : "artists"
},
"__schema" : {
"string" : "public"
},
"__lsn" : {
"long" : 3692866569488
},
"__source_ts_ms" : {
"long" : 1640814436010
},
"__deleted" : {
"string" : "true"
}
}

Below, we see the same delete record for the artist with artist_id 441 in the Hudi MoR table. All the null fields have been removed.

{
"_hoodie_commit_time": "20211229225047",
"_hoodie_commit_seqno": "20211229225047_1_1",
"_hoodie_record_key": "441",
"_hoodie_partition_path": "nationality=default",
"_hoodie_file_name": "2a98931a-6015-438e-be78-1eff80a75f83-2_1-24-15431_20211229225047.parquet",
"artist_id": 441,
"__op": "d",
"__db": "moma",
"__table": "artists",
"__schema": "public",
"__lsn": 3692866569488,
"__source_ts_ms": 1640814436010,
"__deleted": "true"
}
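The relationship between the CDC delete record and the record stored in Hudi can be sketched as two steps: unwrap the Avro JSON union values (for example, {"string": "d"} becomes "d") and drop the null fields. The function below is an illustration of that mapping only, not how DeltaStreamer performs it internally; the name flatten_cdc_record is hypothetical.

```python
def flatten_cdc_record(record):
    """Unwrap single-key Avro union wrappers and drop fields whose value is null."""
    flat = {}
    for key, value in record.items():
        # Avro's JSON encoding wraps non-null union values as {"<type>": value}.
        if isinstance(value, dict) and len(value) == 1:
            value = next(iter(value.values()))
        if value is not None:
            flat[key] = value
    return flat


cdc_delete = {
    "artist_id": 441,
    "name": None,
    "nationality": None,
    "gender": None,
    "birth_year": None,
    "death_year": None,
    "__op": {"string": "d"},
    "__db": {"string": "moma"},
    "__table": {"string": "artists"},
    "__schema": {"string": "public"},
    "__lsn": {"long": 3692866569488},
    "__source_ts_ms": {"long": 1640814436010},
    "__deleted": {"string": "true"},
}

flat = flatten_cdc_record(cdc_delete)
print(flat)
```

Apart from the five _hoodie_ metadata fields that Hudi adds, the result has the same fields and values as the Hudi record shown above.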

Below, we see how the deleted record appears in the three Hive CoW and MoR artists tables. Note the query results from the read-optimized MoR table, artists_ro, contain two records: the original record (r) and the deleted record (d). The data is partitioned by nationality, and since the record was deleted, the nationality field is changed to null. In S3, Hudi represents this partition as nationality=default. The record now exists in two different Parquet files, within two separate partitions, something to be aware of when querying the read-optimized MoR table.

Results of a database delete as shown in Hive CoW and MoR tables

Time Travel

According to the documentation, Hudi has supported time travel queries since version 0.9.0. With time travel, you can query the previous state of your data. Time travel is particularly useful for use cases such as rollbacks, debugging, and audit history.

To demonstrate time travel queries in Hudi, we start by making some additional changes to the source database. For this demonstration, I made a series of five updates and finally a delete to the artist record with artist_id 299 in the PostgreSQL database over a few-hour period.

first update (birth)
UPDATE public.artists
SET birth_year = 1907
WHERE artist_id = 299;
second update (death)
UPDATE public.artists
SET death_year = 1989
WHERE artist_id = 299;
third update (middle initial)
UPDATE public.artists
SET name = 'Gerhard M. Bakker'
WHERE artist_id = 299;
fourth update (nationality – impacts partitions)
UPDATE public.artists
SET nationality = 'German'
WHERE artist_id = 299;
fifth update (birth)
UPDATE public.artists
SET birth_year = 1905
WHERE artist_id = 299;
delete
DELETE
FROM public.artists
WHERE artist_id = 299;

Once the CDC and DeltaStreamer ingestion processes are complete, we can use Hudi’s time travel query capability to view the state of data in Hudi at different points in time (instants). To do so, we need to provide an as.of.instant date/time value to Spark (see the as.of.instant option in the code below).

Based on the time period in which I made the five updates and the delete, I have chosen six instants during that period where I want to examine the state of the record. Below is an example of the PySpark code from a Jupyter Notebook used to perform the six time travel queries against the Hudi MoR artists table.

from datetime import timedelta
from dateutil import parser

# `spark` is the SparkSession provided by the Jupyter Notebook's PySpark kernel
base_path = "s3://open-data-lake-demo-us-east-1/moma/artists_mor"

instants = [  # times in EST
    "2021-12-30 08:00:00",  # reflects original record (r)
    "2021-12-30 09:00:00",  # reflects updates 1 and 2 (u)
    "2021-12-30 09:30:00",  # reflects update 3 (u)
    "2021-12-30 11:00:00",  # reflects update 4 (u)
    "2021-12-30 12:30:00",  # reflects update 5 (u)
    "2021-12-30 14:00:00",  # reflects delete (d)
]

for instant in instants:
    as_of_instant = parser.parse(instant) + timedelta(hours=5)  # adjust EST for UTC
    print(f"Record state as of: {as_of_instant}")

    # read the table as it existed at the given instant
    artistsSnapshotDF = (
        spark.read.format("hudi")
        .option("as.of.instant", str(as_of_instant))
        .load(base_path)
    )
    artistsSnapshotDF.createOrReplaceTempView("hudi_artists_snapshot")

    spark.sql(
        """
        SELECT _hoodie_commit_time, __op, _hoodie_partition_path, name, nationality, gender, birth_year, death_year
        FROM hudi_artists_snapshot
        WHERE artist_id=299;
        """
    ).show()

Below, we see the results of the time travel queries. At each instant, we can observe the mutating state of the data in the Hudi MoR artists table, including the initial bulk insert of the existing snapshot of data (r) and the delete record (d). Since the delete made in the PostgreSQL database was recorded as a soft delete in Hudi, as opposed to a hard delete, we are still able to retrieve the record at any instant.

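Record state as of: 2021-12-30 13:00:00
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|_hoodie_commit_time|__op|_hoodie_partition_path|             name|nationality|gender|birth_year|death_year|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|     20211230034812|   r|  nationality=American|Gerhard H. Bakker|   American|  Male|      1906|      1988|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
Record state as of: 2021-12-30 14:00:00
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|_hoodie_commit_time|__op|_hoodie_partition_path|             name|nationality|gender|birth_year|death_year|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|     20211230132628|   u|  nationality=American|Gerhard H. Bakker|   American|  Male|      1907|      1989|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
Record state as of: 2021-12-30 14:30:00
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|_hoodie_commit_time|__op|_hoodie_partition_path|             name|nationality|gender|birth_year|death_year|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|     20211230142035|   u|  nationality=American|Gerhard M. Bakker|   American|  Male|      1907|      1989|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
Record state as of: 2021-12-30 16:00:00
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|_hoodie_commit_time|__op|_hoodie_partition_path|             name|nationality|gender|birth_year|death_year|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|     20211230144237|   u|    nationality=German|Gerhard M. Bakker|     German|  Male|      1907|      1989|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
Record state as of: 2021-12-30 17:30:00
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|_hoodie_commit_time|__op|_hoodie_partition_path|             name|nationality|gender|birth_year|death_year|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|     20211230171925|   u|    nationality=German|Gerhard M. Bakker|     German|  Male|      1905|      1989|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
Record state as of: 2021-12-30 19:00:00
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|_hoodie_commit_time|__op|_hoodie_partition_path|             name|nationality|gender|birth_year|death_year|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|     20211230180429|   d|   nationality=default|             null|       null|  null|      null|      null|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+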
Results of the time travel queries, ordered by commit time
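The results above are consistent with a simple rule: a time travel query returns the state written by the latest commit at or before the requested instant. The sketch below reproduces that selection in plain Python using the six commit times from the results. It assumes the commit times are in UTC and uses a fixed five-hour EST offset (no daylight saving handling); the function names are hypothetical, and the commit list covers only the commits that touched this record.

```python
from bisect import bisect_right
from datetime import datetime, timedelta, timezone

EST = timezone(timedelta(hours=-5))  # fixed offset; ignores daylight saving time

# Commit times (yyyyMMddHHmmss) taken from the query results above, in ascending order
commit_times = [
    "20211230034812",
    "20211230132628",
    "20211230142035",
    "20211230144237",
    "20211230171925",
    "20211230180429",
]


def to_hudi_instant(local_time, tz=EST):
    """Convert a local 'YYYY-MM-DD HH:MM:SS' string to a UTC yyyyMMddHHmmss string."""
    parsed = datetime.strptime(local_time, "%Y-%m-%d %H:%M:%S").replace(tzinfo=tz)
    return parsed.astimezone(timezone.utc).strftime("%Y%m%d%H%M%S")


def visible_commit(commits, as_of):
    """Return the latest commit at or before as_of, or None if there is none."""
    index = bisect_right(commits, as_of)
    return commits[index - 1] if index else None


local_times = [
    "2021-12-30 08:00:00",
    "2021-12-30 09:00:00",
    "2021-12-30 09:30:00",
    "2021-12-30 11:00:00",
    "2021-12-30 12:30:00",
    "2021-12-30 14:00:00",
]
resolved = [visible_commit(commit_times, to_hudi_instant(t)) for t in local_times]
for local_time, commit in zip(local_times, resolved):
    print(local_time, "->", commit)
```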

In addition to time travel queries, Hudi also offers incremental queries and point-in-time queries.
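As a rough sketch of what such a query might look like, the helper below builds the Spark read options for an incremental query, and for a point-in-time query when an end instant is also supplied. The option keys are taken from my understanding of the Hudi Spark datasource configuration and should be verified against the documentation for your Hudi version; the function name and the instant values are illustrative.

```python
def incremental_read_options(begin_instant, end_instant=None):
    """Build Hudi read options for an incremental (or bounded point-in-time) query."""
    options = {
        "hoodie.datasource.query.type": "incremental",
        "hoodie.datasource.read.begin.instanttime": begin_instant,
    }
    if end_instant is not None:
        # Bounding the range with an end instant gives a point-in-time query.
        options["hoodie.datasource.read.end.instanttime"] = end_instant
    return options


options = incremental_read_options("20211230132628", "20211230171925")
print(options)

# Assumed usage inside a PySpark session with Hudi available:
# df = spark.read.format("hudi").options(**options).load(base_path)
```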

Conclusion

Although this post only scratches the surface of the capabilities of Debezium and Hudi, you can see the power of CDC using Kafka Connect and Debezium, combined with Hudi, to build and manage open data lakes on AWS.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


DevOps for DataOps: Building a CI/CD Pipeline for Apache Airflow DAGs

Build an effective CI/CD pipeline to test and deploy your Apache Airflow DAGs to Amazon MWAA using GitHub Actions

Introduction

In this post, we will learn how to use GitHub Actions to build an effective CI/CD workflow for our Apache Airflow DAGs. We will use the DevOps concepts of Continuous Integration and Continuous Delivery to automate the testing and deployment of Airflow DAGs to Amazon Managed Workflows for Apache Airflow (Amazon MWAA) on AWS.

Fork and pull model of collaborative Airflow development used in this post

Technologies

Apache Airflow

According to the documentation, Apache Airflow is an open-source platform to author, schedule, and monitor workflows programmatically. With Airflow, you author workflows as Directed Acyclic Graphs (DAGs) of tasks written in Python.

Amazon Managed Workflows for Apache Airflow

According to AWS, Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a highly available, secure, and fully-managed workflow orchestration for Apache Airflow. MWAA automatically scales its workflow execution capacity to meet your needs and is integrated with AWS security services to help provide fast and secure access to data.

Example of Apache Airflow UI within Amazon MWAA Environment

GitHub Actions

According to GitHub, GitHub Actions makes it easy to automate software workflows with CI/CD. GitHub Actions allow you to build, test, and deploy code right from GitHub. GitHub Actions are workflows triggered by GitHub events like push, issue creation, or a new release. You can leverage GitHub Actions prebuilt and maintained by the community.

Example of GitHub Action workflow running in the GitHub repository used in this post

If you are new to GitHub Actions, I recommend my previous post, Continuous Integration and Deployment of Docker Images using GitHub Actions.

Terminology

DataOps

According to Wikipedia, DataOps is an automated, process-oriented methodology used by analytic and data teams to improve the quality and reduce the cycle time of data analytics. While DataOps began as a set of best practices, it has now matured to become a new approach to data analytics.

DataOps applies to the entire data lifecycle from data preparation to reporting and recognizes the interconnected nature of the data analytics team and IT operations. DataOps incorporates the Agile methodology to shorten the software development life cycle (SDLC) of analytics development.

DevOps

According to Wikipedia, DevOps is a set of practices that combines software development (Dev) and IT operations (Ops). It aims to shorten the systems development life cycle and provide continuous delivery with high software quality.

DevOps is a set of practices intended to reduce the time between committing a change to a system and the change being placed into normal production, while ensuring high quality. -Wikipedia

Fail Fast

According to Wikipedia, a fail-fast system is one that immediately reports any condition that is likely to indicate a failure. Using the DevOps concept of fail fast, we build steps into our workflows to uncover errors sooner in the SDLC. We shift testing as far to the left as possible (referring to the pipeline of steps moving from left to right) and test at multiple points along the way.

Source Code

All source code for this demonstration, including the GitHub Actions, Pytest unit tests, and Git Hooks, is open-sourced and located on GitHub.

Architecture

The diagram below represents the architecture for a recent blog post and video demonstration, Lakehouse Automation on AWS with Apache Airflow. The post and video show how to programmatically load and upload data from Amazon Redshift to an Amazon S3-based data lake using Apache Airflow.

Architecture for the post and video, Lakehouse Automation on AWS with Apache Airflow

In this post, we will review how the DAGs from the previous post were developed, tested, and deployed to MWAA using a variety of progressively more effective CI/CD workflows. The workflows demonstrated could also be easily applied to other Airflow resources in addition to DAGs, such as SQL scripts, configuration and data files, Python requirement files, and plugins.

Workflows

No DevOps

Below we see a minimally viable workflow for loading DAGs into Amazon MWAA, which does not use the principles of CI/CD. Changes are made in the Airflow developer’s local environment. The modified DAGs are copied directly to the Amazon S3 bucket and are then automatically synced with Amazon MWAA, barring any errors. Those changes are also (hopefully) pushed back to the centralized version control or source code management (SCM) system, which is GitHub in this post.

Error-prone, non-DevOps workflow for modifying and syncing DAGs to MWAA

There are at least two significant issues with this error-prone workflow. First, the DAGs can easily fall out of sync between the Amazon S3 bucket and GitHub. Copying or syncing the DAGs to S3 and pushing the DAGs to GitHub are two independent steps. A developer might continue making changes and pushing DAGs to S3 without pushing to GitHub, or vice versa.

Secondly, the DevOps concept of fail-fast is missing. The first time you know your DAG contains errors is likely when it is synced to MWAA and throws an Import Error. By then, the DAG has already been copied to S3, synced to MWAA, and possibly pushed to GitHub, which other developers could then pull.

Example of a typical DAG Import Error, easily caught with a simple test

GitHub Actions

A significant step up from the previous workflow is using GitHub Actions to test and deploy your code after pushing it to GitHub. Although in this workflow, code is still ‘pushed straight to Trunk’ (the main branch in GitHub) and risks other developers in a collaborative environment pulling potentially erroneous code, you have far less chance of DAG errors making it to MWAA.

GitHub Actions allow you to fail faster and catch errors sooner

Using GitHub Actions, you also eliminate human error that could result in the changes to DAGs not being synced to Amazon S3. Lastly, using this workflow improves security by eliminating the need to provide direct access to the Airflow Amazon S3 bucket to Airflow Developers.

Fork and pull model of collaborative Airflow development used in this post (video only)

Types of Tests

The first GitHub Action, test_dags.yml, is triggered on a push to the dags directory in the main branch of the repository. It is also triggered whenever a pull request is made for the main branch. The first GitHub Action runs a battery of tests, including checking Python dependencies, code style, code quality, DAG import errors, and unit tests. The tests catch issues with DAGs before being synced to S3 by a second GitHub Action.

name: Test DAGs

on:
  push:
    paths:
      - 'dags/**'
  pull_request:
    branches:
      - main

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.7'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements/requirements.txt
        pip check
    - name: Lint with Flake8
      run: |
        pip install flake8
        flake8 --ignore E501 dags --benchmark -v
    - name: Confirm Black code compliance (psf/black)
      run: |
        pip install pytest-black
        pytest dags --black -v
    - name: Test with Pytest
      run: |
        pip install pytest
        cd tests || exit
        pytest tests.py -v

Successful runs of the ‘Test DAGs’ GitHub Action, shown in the Actions Console

Python Dependencies

The first test installs the modules listed in the requirements.txt file used locally to develop the application. This test is designed to uncover any missing or conflicting modules.

- name: Install dependencies
  run: |
    python -m pip install --upgrade pip
    pip install -r requirements/requirements.txt
    pip check

It is essential to develop your DAGs against the same version of Python and the same versions of the Python modules used in your Airflow environment. You can use the BashOperator to run shell commands to obtain the versions of Python and the modules installed in your Airflow environment:

python3 --version; python3 -m pip list

A snippet of log output from DAG showing Python version and Python modules available in MWAA 2.0.2:

Python version and Python modules available in MWAA 2.0.2

The latest stable release of Airflow is currently version 2.2.2, released 2021-11-15. However, as of December 2021, Amazon’s latest version of MWAA 2.x is version 2.0.2, released 2021-04-19. MWAA 2.0.2 currently runs Python3 version 3.7.10.
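One way to act on that version information is to compare the module list from the MWAA task log with the pins in your local requirements file. The following is a minimal sketch and not part of the original project: the function names and sample data are hypothetical, and it assumes the default two-column layout of pip list and simple name==version pins. It does not normalize hyphens and underscores in package names.

```python
from typing import Dict, Optional, Tuple


def parse_pip_list(output: str) -> Dict[str, str]:
    """Parse the default two-column output of 'pip list' into {name: version}."""
    versions = {}
    for line in output.splitlines():
        parts = line.split()
        # Skip blank lines, the "Package Version" header, and the dashed rule.
        if len(parts) < 2 or parts[0] == "Package" or set(parts[0]) == {"-"}:
            continue
        versions[parts[0].lower()] = parts[1]
    return versions


def parse_requirements(text: str) -> Dict[str, str]:
    """Collect 'name==version' pins, ignoring comments and unpinned lines."""
    pins = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()
        if "==" not in line:
            continue
        name, version = line.split("==", 1)
        pins[name.strip().lower()] = version.strip()
    return pins


def find_mismatches(
    pins: Dict[str, str], installed: Dict[str, str]
) -> Dict[str, Tuple[str, Optional[str]]]:
    """Return {name: (pinned, installed)} where they differ; None means not installed."""
    return {
        name: (wanted, installed.get(name))
        for name, wanted in pins.items()
        if installed.get(name) != wanted
    }


# Hypothetical sample data; real values come from the MWAA task log
# and from requirements/requirements.txt.
mwaa_log = """Package        Version
-------------- -------
apache-airflow 2.0.2
boto3          1.17.44
"""
local_pins = "apache-airflow==2.0.2\nboto3==1.18.0  # newer than MWAA\n"

print(find_mismatches(parse_requirements(local_pins), parse_pip_list(mwaa_log)))
```

Any module reported here is one where local development and MWAA could behave differently.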

Available versions of Amazon MWAA as of December 2021

Flake8

Known as ‘your tool for style guide enforcement,’ Flake8 is described as the modular source code checker. It is a command-line utility for enforcing style consistency across Python projects. Flake8 is a wrapper around PyFlakes, pycodestyle, and Ned Batchelder’s McCabe script. The module, pycodestyle, is a tool to check your Python code against some of the style conventions in PEP 8.

Flake8 is highly configurable, with options to ignore specific rules if not required by your development team. For example, in this demonstration, I intentionally ignored rule E501 (‘line too long’), which by default flags any line longer than 79 characters, the maximum line length recommended by PEP 8.

- name: Lint with Flake8
  run: |
    pip install flake8
    flake8 --ignore E501 dags --benchmark -v
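To make concrete what ignoring E501 skips, here is a rough sketch of a line-length check. It is not Flake8's implementation; find_long_lines is a hypothetical helper, and the 79-character default is assumed to mirror Flake8's default maximum line length.

```python
from typing import List, Tuple


def find_long_lines(source: str, max_length: int = 79) -> List[Tuple[int, int]]:
    """Return (line_number, length) for each line longer than max_length."""
    return [
        (number, len(line))
        for number, line in enumerate(source.splitlines(), start=1)
        if len(line) > max_length
    ]


# Hypothetical source text: a short line followed by an 86-character line.
sample = "x = 1\n" + "y = '" + "a" * 80 + "'\n"
print(find_long_lines(sample))
```

With E501 ignored, lines like the second one in the sample pass linting, leaving line length to Black.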

Black

Known as ‘the uncompromising code formatter,’ Python code formatted using Black (referred to as Blackened code) looks the same regardless of the project you’re reading. Formatting becomes transparent, allowing teams to focus on the content instead. Black makes code review faster by producing the smallest diffs possible, assuming all developers are using black to format their code.

The Airflow DAGs in this GitHub repository are automatically formatted with Black using a pre-commit Git Hook before being committed and pushed to GitHub. The test confirms Black code compliance.

- name: Confirm Black code compliance (psf/black)
  run: |
    pip install pytest-black
    pytest dags --black -v

Pytest

The pytest framework describes itself as a mature, fully-featured Python testing tool that helps you write better programs. The Pytest framework makes it easy to write small tests yet scales to support complex functional testing for applications and libraries.

The GitHub Action in the GitHub project, test_dags.yml, calls the tests.py file, also contained in the project.

- name: Test with Pytest
  run: |
    pip install pytest
    cd tests || exit
    pytest tests.py -v

The tests.py file contains several pytest unit tests. The tests are based on my project requirements; your tests will vary. These tests confirm that all DAGs:

  1. Do not contain DAG Import Errors (test catches 75% of my errors);
  2. Follow specific file naming conventions;
  3. Include a description and an owner other than ‘airflow’;
  4. Contain required project tags;
  5. Do not send emails (my projects use SNS or Slack for notifications);
  6. Do not retry more than three times.

import os
import sys

import pytest
from airflow.models import DagBag

sys.path.append(os.path.join(os.path.dirname(__file__), "../dags"))
sys.path.append(os.path.join(os.path.dirname(__file__), "../dags/utilities"))

# Airflow variables called from DAGs under test are stubbed out
os.environ["AIRFLOW_VAR_DATA_LAKE_BUCKET"] = "test_bucket"
os.environ["AIRFLOW_VAR_ATHENA_QUERY_RESULTS"] = "SELECT 1;"
os.environ["AIRFLOW_VAR_SNS_TOPIC"] = "test_topic"
os.environ["AIRFLOW_VAR_REDSHIFT_UNLOAD_IAM_ROLE"] = "test_role_1"
os.environ["AIRFLOW_VAR_GLUE_CRAWLER_IAM_ROLE"] = "test_role_2"


@pytest.fixture(params=["../dags/"])
def dag_bag(request):
    return DagBag(dag_folder=request.param, include_examples=False)


def test_no_import_errors(dag_bag):
    assert not dag_bag.import_errors


def test_requires_tags(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tags


def test_requires_specific_tag(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        try:
            assert dag.tags.index("data lake demo") >= 0
        except ValueError:
            assert dag.tags.index("redshift demo") >= 0


def test_desc_len_greater_than_fifteen(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert len(dag.description) > 15


def test_owner_len_greater_than_five(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert len(dag.owner) > 5


def test_owner_not_airflow(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert str.lower(dag.owner) != "airflow"


def test_no_emails_on_retry(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert not dag.default_args["email_on_retry"]


def test_no_emails_on_failure(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert not dag.default_args["email_on_failure"]


def test_three_or_less_retries(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert dag.default_args["retries"] <= 3


def test_dag_id_contains_prefix(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert str.lower(dag_id).find("__") != -1


def test_dag_id_requires_specific_prefix(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert str.lower(dag_id).startswith("data_lake__") \
            or str.lower(dag_id).startswith("redshift_demo__")

If you are building custom Airflow Operators, additional unit, functional, and integration tests are recommended.

Fork and Pull

We can improve on the practice of pushing directly to Trunk by implementing one of two collaborative development models, recommended by GitHub:

  1. The Shared repository model: uses ‘topic’ branches, which are reviewed, approved, and merged into the main branch.
  2. Fork and pull model: a repo is forked, changes are made, a pull request is created, the request is reviewed, and if approved, merged into the main branch.

In the fork and pull model, we create a fork of the DAG repository where we make our changes. We then commit and push those changes back to the forked repository. When ready, we create a pull request. If the pull request is approved and passes all the tests, it is manually or automatically merged into the main branch. DAGs are then synced to S3 and, eventually, to MWAA. I usually prefer to trigger merges manually once all tests have passed.

The fork and pull model greatly reduces the chance that bad code is merged to the main branch before passing all tests.

Errors are caught early in the fork and pull model prior to merging code changes

Syncing DAGs to S3

The second GitHub Action in the GitHub project, sync_dags.yml, is triggered when the previous Action, test_dags.yml, completes successfully, or in the case of the fork and pull model, when the merge to the main branch is successful.

name: Sync DAGs

on:
  workflow_run:
    workflows:
      - 'Test DAGs'
    types:
      - completed
  pull_request:
    types:
      - closed

jobs:
  deploy:
    runs-on: ubuntu-latest
    if: ${{ github.event.workflow_run.conclusion == 'success' }}
    steps:
    - uses: actions/checkout@master
    - uses: jakejarvis/s3-sync-action@master
      env:
        AWS_S3_BUCKET: ${{ secrets.AWS_S3_BUCKET }}
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        AWS_REGION: 'us-east-1'
        SOURCE_DIR: 'dags'
        DEST_DIR: 'dags'

The GitHub Action, sync_dags.yml, requires three GitHub encrypted secrets, created in advance and associated with the GitHub repository. According to GitHub, secrets are encrypted environment variables you create in an organization, repository, or repository environment. Encrypted secrets allow you to store sensitive information, such as access tokens, in your repository. The secrets that you create are available to use in GitHub Actions workflows.
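In the spirit of failing fast, a deployment script can check that these settings are present before attempting any sync. The sketch below is an illustration only and is not part of the project: missing_settings is a hypothetical helper, and the sample values are placeholders. It treats an empty value as missing, on the assumption that an undefined secret reaches the job as an empty string.

```python
from typing import List, Mapping, Sequence

# The three secret names used by sync_dags.yml.
REQUIRED_SECRETS = ("AWS_S3_BUCKET", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_settings(
    environ: Mapping[str, str], required: Sequence[str] = REQUIRED_SECRETS
) -> List[str]:
    """Return the names of required settings that are unset or empty."""
    return [name for name in required if not environ.get(name)]


# Hypothetical environment with one secret left unset.
example_env = {
    "AWS_S3_BUCKET": "example-airflow-bucket",
    "AWS_ACCESS_KEY_ID": "EXAMPLEKEYID",
}
print(missing_settings(example_env))
```

In a real script, you would pass os.environ and exit with a non-zero code when the returned list is not empty.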

Encrypted repository secrets used by GitHub Action to sync with Amazon S3

The DAGs are synced to Amazon S3 and, eventually, automatically synced to MWAA.

GitHub Action syncs DAGs to Amazon S3 if tests are successful

Local Testing and Git Hooks

To further improve your CI/CD workflows, you should consider using Git Hooks. Using Git Hooks, we can ensure code is tested locally before committing and pushing changes to GitHub. Testing locally allows us to fail faster, catching errors during development instead of after code is pushed to GitHub.

Errors are caught even earlier using Git Hooks

According to the documentation, Git has a way to fire off custom scripts when certain important actions occur. There are two types of hooks: client-side and server-side. Client-side hooks are triggered by operations such as committing and merging, while server-side hooks run on network operations such as receiving pushed commits.

You can use these hooks for all sorts of reasons. I often use a client-side pre-commit hook to format DAGs using Black. Using a client-side pre-push Git Hook, we will ensure that tests are run before pushing the DAGs to GitHub. According to Git, the pre-push hook runs during git push, after the remote refs have been updated but before any objects have been transferred. You can use it to validate a set of ref updates before a push occurs. A non-zero exit code will abort the push. The tests could instead be run as part of the pre-commit hook if they are not too time-consuming.

To use the pre-push hook, create the following file within the local repository, .git/hooks/pre-push:

#!/bin/sh
# do nothing if there are no commits to push
if [ -z "$(git log @{u}..)" ]; then
    exit 0
fi
sh ./run_tests_locally.sh

Then, run the following chmod command to make the hook executable:

chmod 755 .git/hooks/pre-push

The pre-push hook runs the shell script, run_tests_locally.sh. The script executes nearly identical tests, locally, as the GitHub Action, test_dags.yml, does remotely on GitHub:

#!/bin/sh
echo "Starting Flake8 test..."
flake8 --ignore E501 dags --benchmark || exit 1
echo "Starting Black test..."
python3 -m pytest --cache-clear
python3 -m pytest dags/ --black -v || exit 1
echo "Starting Pytest tests..."
cd tests || exit
python3 -m pytest tests.py -v || exit 1
echo "All tests completed successfully! 🥳"
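Git hooks can be any executable, so the same stop-at-first-failure logic could also be written in Python. The following is a minimal sketch under that assumption, not the project's script: run_checks and the CHECKS table are hypothetical names, and the commands simply mirror those in run_tests_locally.sh.

```python
import subprocess
import sys
from typing import Callable, List, Optional, Sequence, Tuple

# (label, command, working directory); None means the current directory.
Check = Tuple[str, List[str], Optional[str]]

CHECKS: List[Check] = [
    ("Flake8", ["flake8", "--ignore", "E501", "dags", "--benchmark"], None),
    ("Black", [sys.executable, "-m", "pytest", "dags/", "--black", "-v"], None),
    ("Pytest", [sys.executable, "-m", "pytest", "tests.py", "-v"], "tests"),
]


def run_command(command: List[str], cwd: Optional[str]) -> int:
    """Run one command and return its exit code."""
    return subprocess.call(command, cwd=cwd)


def run_checks(
    checks: Sequence[Check],
    runner: Callable[[List[str], Optional[str]], int] = run_command,
) -> int:
    """Run checks in order; return the first non-zero exit code, or 0 if all pass."""
    for label, command, cwd in checks:
        print(f"Starting {label} test...")
        code = runner(command, cwd)
        if code != 0:
            print(f"{label} test failed with exit code {code}")
            return code
    print("All tests completed successfully!")
    return 0


# In a hook script, the last line would be: sys.exit(run_checks(CHECKS))
```

Because a non-zero exit code from the hook aborts the push, returning the first failing code is enough to block it. The runner parameter exists only so the logic can be exercised without the real tools installed.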

Complete CI/CD workflow including running tests locally using a Git Hook (video only)

References

Here are some additional references for testing and deploying Airflow DAGs and the use of GitHub Actions:

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Video Demonstration: Lakehouse Automation on AWS with Apache Airflow

Programmatically load and upload data from Amazon Redshift to an Amazon S3-based Data Lake using Apache Airflow

Introduction

In the following video demonstration, we will learn how to programmatically load and upload data from Amazon Redshift to an Amazon S3-based Data Lake using Apache Airflow. Since we are on AWS, we will be using the fully-managed Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Using Airflow, we will COPY raw data into staging tables, then merge that staging data into a series of tables. We will then load incremental data into Redshift on a regular schedule. Next, we will join and aggregate data from several tables and UNLOAD the resulting dataset to an Amazon S3-based data lake. Lastly, we will catalog the data in S3 using AWS Glue and query with Amazon Athena.

Architecture and workflow demonstrated in the video

Demonstration

For best results, view at 1080p HD on YouTube

Source Code

The source code for this demonstration, including the Airflow DAGs, SQL statements, and data files, is open-sourced and located on GitHub.

DAGs

The DAGs included in the GitHub project are:

Demonstration DAGs as seen in MWAA Airflow UI



Implementing an Effective and Repeatable Approach for Capturing Data Analytics System Requirements

Implement an effective, consistent, and repeatable strategy for documenting data analytics workflows and capturing system requirements


Introduction

“Data analytics applications involve more than just analyzing data, particularly on advanced analytics projects. Much of the required work takes place upfront, in collecting, integrating, and preparing data and then developing, testing, and revising analytical models to ensure that they produce accurate results. In addition to data scientists and other data analysts, analytics teams often include data engineers, who create data pipelines and help prepare data sets for analysis.” -TechTarget

Successful consultants, project managers, and product owners take a well-defined and systematic approach to achieve desired outcomes. They use frameworks, templates, and proven patterns to accomplish their goals — consistently successful engagements, project outcomes, and product and service launches.

Given the complexity of modern data analytics workflows, the goal of this platform-agnostic discovery process is to lead an organization, department, team, or customer through an effective, efficient, consistent, and repeatable approach for capturing data analytics workflows and systems requirements. This process will result in a clear understanding of existing analytics workflows, current and desired future business and technical outcomes, and existing and anticipated future business and technical constraints.

If applicable, the discovery process serves as a foundation for architecting new data analytics workflows. Starting with a set of business and technical constraints, the approach facilitates the development of new data analytics workflows to achieve desired business and technical outcomes. Outcomes and constraints determine the requirements.

Analytics Workflow Stages

There are many patterns organizations use to delineate the stages of their analytics workflows. However, the granularity of the stages is not critical to this process as long as all major functionality is considered. This process utilizes six stages of a typical analytics workflow, from left to right: Generate, Collect, Prepare, Store, Analyze, and Deliver.

Visualization of the discovery process

The discovery process starts by working backward and from the outside in. First, the process identifies current and desired future outcomes (right side of the diagram above). Then, it identifies existing and anticipated future constraints (left side of the diagram). Next, it identifies the inputs and the outputs for the existing and re-architected workflows. Finally, the process identifies the current and re-architected analytics workflows (collect, prepare, store, and analyze): the steps required to get from data sources to deliverables.


Specifically, the process identifies and documents the following:

  1. Current business and technical outcomes
  2. Desired future business and technical outcomes
  3. Existing business and technical constraints
  4. Anticipated future business and technical constraints
  5. Inputs required to achieve desired outcomes
  6. Outputs needed to achieve desired outcomes
  7. Data producers and consumers
  8. Existing analytics tools, procedures, and people
  9. Measures of success for analytics workflows

Outcomes

Capture desired business and technical outcomes (goals and objectives), for example:

  • Re-architect analytics workflows to modernize, reduce costs, increase speed, reduce complexity, and add capabilities
  • Move analytics workflows to the Cloud (review 6 R’s strategies)
  • Migrate from another Cloud or SaaS provider
  • Shift away from proprietary platforms to an open-source analytics stack
  • Migrate away from custom in-house analytics tools
  • Add DataOps — CI/CD automation to existing workflows
  • Integrate hybrid on-prem, multi-cloud, and SaaS-based analytics architectures
  • Develop new greenfield analytics workflows, products, and services
  • Standardize analytics workflows
  • Develop machine learning models from the data
  • Provide key stakeholders with real-time business KPIs dashboards

Constraints

Identify the existing and potential future business and technical constraints that impact analytics workflows, for example:

  • Budget
  • Timeline
  • People, including lack of specific skills (need for training)
  • Internal and external regulatory, data governance, and data lineage requirements
  • SLAs and KPIs (measures of workflow effectiveness)
  • Current vendor, partner, Cloud-provider, and SaaS relationships
  • Licensing and contractual obligations
  • Must-keep aspects of current analytics workflows

Inputs

Capture sources of data that are required to produce the outputs, for example:

  • Batch sources such as databases and data feeds
  • Streaming sources such as IoT device telemetry, operational metrics, logs, clickstreams, and gaming stats
  • Database engines, including relational, key-value, document, in-memory, graph, time series, wide column, and ledger
  • Data warehouses
  • Data feeds, such as flat files from legacy or third-party systems
  • API endpoints
  • Public and licensed datasets

Use the 5 V’s of data, including Volume, Velocity, Variety, Veracity, and Value, to dive deep into each input. Capture existing and anticipated data access and usage patterns.

Outputs

Identify the deliverables required to meet the desired outcomes. For example, prepare and make data available for:

  • Further analysis
  • Business Intelligence (BI), visualizations, and dashboards
  • Machine Learning (ML) and Artificial Intelligence (AI)
  • Data exports, such as Excel or CSV-format flat files
  • Hosted datasets for external or internal consumption
  • Exposure via APIs

Analytics Workflows

Capture existing analytics workflows using the four stages of Collect, Prepare, Store, and Analyze as a way to organize this part of the process:

  • High- and low-level architecture, process flow, patterns, and external dependencies
  • Analytics tools, including hardware, commercial, custom, and OSS software, libraries, modules, source code
  • DataOps, CI/CD, DevOps, and Infrastructure-as-Code (IaC) automation
  • People, including Managed Service Providers (MSPs), roles and required skills
  • Partners, including consultants, vendors, SaaS-providers
  • Overall effectiveness and customer satisfaction with existing analytics workflows
  • Deficiencies with current workflows

Use checklists for each stage to ensure all potential features, functions, and capabilities of typical analytics workflows are considered and captured. For example, review a checklist of all possible ways data is typically collected to ensure nothing is missed.

Measures of Success

Identify how success is measured with existing analytics workflows as well as with new workflows, and by whom, for example:

  • Key Performance Indicators (KPIs)
  • Service Level Agreements (SLAs)
  • Customer Satisfaction Score (CSAT)
  • Net Promoter Score (NPS)
  • How measurements are determined, calculated, and weighted

Data Producers and Consumers

Capture the data producers and data consumers:

  • Data producers
  • Data consumers

Results

The output of the data analytics discovery process is a clear and concise document that captures all elements described herein. Additionally, the document contains any customer-supplied artifacts, such as architectural and process flow diagrams. The document is thoroughly reviewed for accuracy and completeness by the customer. This document serves as a record of current data analytics workflows and a basis for architecting new workflows.




LoRa and LoRaWAN for IoT: Getting Started with LoRa and LoRaWAN Protocols for Low Power, Wide Area Networking of IoT

Introduction

According to the LoRa Alliance, Low-Power, Wide-Area Networks (LPWAN) are projected to support a major portion of the billions of devices forecasted for the Internet of Things (IoT). LoRaWAN is designed from the bottom up to optimize LPWANs for battery lifetime, capacity, range, and cost. LoRa and LoRaWAN permit long-range connectivity for IoT devices in different types of industries. According to Wikipedia, LoRaWAN defines the communication protocol and system architecture for the network, while the LoRa physical layer enables the long-range communication link.

LoRa

Long Range (LoRa), the low-power wide-area network (LPWAN) protocol developed by Semtech, sits at layer 1, the physical layer, of the seven-layer OSI model (Open Systems Interconnection model) of computer networking. The physical layer defines the means of transmitting raw bits over a physical data link connecting network nodes. LoRa uses license-free sub-gigahertz radio frequency (RF) bands, including 433 MHz, 868 MHz (Europe), 915 MHz (Australia and North America), and 923 MHz (Asia). LoRa enables long-range transmissions with low power consumption.

LoRaWAN

LoRaWAN, maintained by the LoRa Alliance, is a cloud-based medium access control (MAC) sublayer (layer 2) protocol. In practice, it acts mainly as a network layer (layer 3) protocol, managing communication and routing between LPWAN gateways and end-node devices. The MAC sublayer and the logical link control (LLC) sublayer together make up layer 2, the data link layer, of the OSI model.

LoRaWAN is often cited as having greater than a 10-km-wide coverage area in rural locations. However, according to other sources, it is generally more limited. According to the Electronic Design article, 11 Myths About LoRaWAN, a typical LoRaWAN network range depends on numerous factors—indoor or outdoor gateways, the payload of the message, the antenna used, etc. On average, in an urban environment with an outdoor gateway, you can expect up to 2- to 3-km-wide coverage, while in the rural areas it can reach beyond 5 to 7 km. LoRa’s range depends on the “radio line-of-sight.” Radio waves in the 400 MHz to 900 MHz range may pass through some obstructions, depending on their composition, but will be absorbed or reflected otherwise. This means that the signal can potentially reach as far as the horizon, as long as there are no physical barriers to block it.
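To put a rough number on "as far as the horizon," the distance to the horizon can be estimated from antenna height. The sketch below is an idealized calculation over smooth terrain with no obstructions, not a LoRa link budget. The function names and the example antenna heights are hypothetical, and the k factor of 4/3 is a commonly used allowance for atmospheric refraction of radio waves.

```python
import math

EARTH_RADIUS_M = 6_371_000  # mean Earth radius in meters


def horizon_km(antenna_height_m: float, k: float = 1.0) -> float:
    """Approximate distance to the horizon, in km, from a given antenna height.

    Uses d = sqrt(2 * k * R * h), which holds when h is much smaller than R.
    k = 1 gives the geometric horizon; k = 4/3 approximates the radio horizon.
    """
    return math.sqrt(2 * k * EARTH_RADIUS_M * antenna_height_m) / 1000


def line_of_sight_km(gateway_height_m: float, device_height_m: float, k: float = 1.0) -> float:
    """Maximum line-of-sight distance between two antennas: the sum of both horizons."""
    return horizon_km(gateway_height_m, k) + horizon_km(device_height_m, k)


# Hypothetical heights: a gateway antenna at 10 m and a device at 1.5 m.
print(round(line_of_sight_km(10.0, 1.5), 1), "km (geometric)")
print(round(line_of_sight_km(10.0, 1.5, k=4 / 3), 1), "km (with refraction)")
```

Raising either antenna extends the line-of-sight limit, which is one reason outdoor, elevated gateways tend to achieve better coverage than indoor ones.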

In the following hands-on post, we will explore the use of the LoRa and LoRaWAN protocols to transmit and receive sensor data, over a substantial distance, between an IoT device, containing a number of embedded sensors, and an IoT gateway.


Recommended Hardware

For this post, I have used the following hardware.

IoT Device with Embedded Sensors

I have used an Arduino single-board microcontroller as an IoT sensor, actually an array of sensors. The 3.3V AI-enabled Arduino Nano 33 BLE Sense board (Amazon: USD 36.00), released in August 2019, comes with the powerful nRF52840 processor from Nordic Semiconductor, a 32-bit ARM Cortex-M4 CPU running at 64 MHz, 1MB of CPU Flash Memory, 256KB of SRAM, and a NINA-B306 stand-alone Bluetooth 5 low energy (BLE) module.


The Sense also contains an impressive array of embedded sensors:

  • 9-axis Inertial Sensor (LSM9DS1): 3D digital linear acceleration sensor, a 3D digital angular rate sensor, and a 3D digital magnetic sensor
  • Humidity and Temperature Sensor (HTS221): Capacitive digital sensor for relative humidity and temperature
  • Barometric Sensor (LPS22HB): MEMS nano pressure sensor: 260–1260 hectopascal (hPa) absolute digital output barometer
  • Microphone (MP34DT05): MEMS audio sensor omnidirectional digital microphone
  • Gesture, Proximity, Light Color, and Light Intensity Sensor (APDS9960): Advanced Gesture detection, Proximity detection, Digital Ambient Light Sense (ALS), and Color Sense (RGBC).

The Arduino Sense is an excellent, low-cost single-board microcontroller for learning about the collection and transmission of IoT sensor data.

IoT Gateway

An IoT Gateway, according to TechTarget, is a physical device or software program that serves as the connection point between the Cloud and controllers, sensors, and intelligent devices. All data moving to the Cloud, or vice versa goes through the gateway, which can be either a dedicated hardware appliance or software program.


I have used a third-generation Raspberry Pi 3 Model B+ single-board computer (SBC) to serve as an IoT Gateway. This Raspberry Pi model features a 1.4GHz Cortex-A53 (ARMv8) 64-bit quad-core processor System on a Chip (SoC), 1GB LPDDR2 SDRAM, dual-band wireless LAN, Bluetooth 4.2 BLE, and Gigabit Ethernet (Amazon: USD 42.99).

To follow along with the post, you could substitute any Linux-based machine for the Raspberry Pi to run the included sample Python script.


LoRa Transceiver Modules

To transmit the IoT sensor data between the IoT device, containing the embedded sensors, and the IoT gateway, I have used the REYAX RYLR896 LoRa transceiver module (Amazon: USD 19.50 x 2). The transceiver modules communicate over a universal asynchronous receiver-transmitter (UART) interface. A UART is a computer hardware device for asynchronous serial communication in which the data format and transmission speeds are configurable.


According to the manufacturer, REYAX, the RYLR896 contains the Semtech SX1276 long-range, low power transceiver. The RYLR896 module provides ultra-long range spread spectrum communication and high interference immunity while minimizing current consumption. This transceiver operates at both the 868 and 915 MHz frequency ranges. We will be transmitting at 915 MHz for North America, in this post. Each RYLR896 module contains a small, PCB integrated, helical antenna.

Security

The RYLR896 is capable of AES 128-bit data encryption. Using the Advanced Encryption Standard (AES), we will encrypt the data sent from the IoT device to the IoT gateway, using a 32 hex digit password (128 bits ÷ 4 bits per hex digit = 32 hex digits). Using hexadecimal notation, the password is limited to digits 0–9 and characters A–F.
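The arithmetic above maps directly to code: 16 random bytes are 128 bits, which print as 32 hex digits. The following is a small sketch for generating and checking a password in that format. The function names are hypothetical, and it assumes uppercase hex digits, matching the 0–9 and A–F range described above.

```python
import secrets

HEX_DIGITS = "0123456789ABCDEF"
PASSWORD_LENGTH = 128 // 4  # 128 bits at 4 bits per hex digit = 32 hex digits


def generate_password() -> str:
    """Return 16 cryptographically random bytes as 32 uppercase hex digits."""
    return secrets.token_hex(16).upper()


def is_valid_password(password: str) -> bool:
    """True if the password is exactly 32 characters, each in 0-9 or A-F."""
    return len(password) == PASSWORD_LENGTH and all(c in HEX_DIGITS for c in password)


print(generate_password())
print(is_valid_password("92A0ECEC9000DA0DCF0CAAB0ABA2E0EF"))  # password used in the sketch
print(is_valid_password("NOT-A-HEX-PASSWORD"))
```

Both the transmitting and receiving modules must be configured with the same password, so generate it once and reuse it on each side.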

USB to TTL Serial Converter Adapter

Optionally, to configure, test, and debug the RYLR896 LoRa transceiver module directly from your laptop, you can use a USB to TTL serial converter adapter. I currently use the IZOKEE FT232RL FTDI USB to TTL Serial Converter Adapter Module for 3.3V and 5V (Amazon: USD 9.49 for 2). The 3.3V RYLR896 module easily connects to the USB to TTL Serial Converter Adapter using the TXD/TX, RXD/RX, VDD/VCC, and GND pins. We use serial communication to send and receive data through TX (transmit) and RX (receive) pins. The wiring is shown below: VDD to VCC, GND to GND, TXD to RX, and RXD to TX.

UART Diagram

The FT232RL supports a baud rate of 115,200 bps, which is the speed we will use to communicate with the RYLR896 module.
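When configuring a module from a laptop through the adapter, the configuration amounts to writing a few AT commands over the serial link. The sketch below only builds and sends the command bytes. It uses the same AT+ADDRESS, AT+NETWORKID, and AT+CPIN commands as the Arduino sketch later in this post; the function names are hypothetical, and the write callable stands in for whatever serial library you use.

```python
from typing import Callable, List


def build_config_commands(address: int, network_id: int, password: str) -> List[bytes]:
    """Build the AT commands that set the address, network ID, and AES password."""
    lines = [
        f"AT+ADDRESS={address}",
        f"AT+NETWORKID={network_id}",
        f"AT+CPIN={password}",
        "AT+CPIN?",  # read the password back to confirm it was set
    ]
    # Each command is terminated with a carriage return and line feed.
    return [(line + "\r\n").encode("ascii") for line in lines]


def send_commands(commands: List[bytes], write: Callable[[bytes], object]) -> None:
    """Pass each command to the supplied write function, in order."""
    for command in commands:
        write(command)


# Example values taken from the Arduino sketch in this post.
commands = build_config_commands(116, 6, "92A0ECEC9000DA0DCF0CAAB0ABA2E0EF")
send_commands(commands, print)  # print stands in for a serial port's write method
```

With the pyserial package, for example, write could be the write method of a serial.Serial object opened at 115200 baud. That package and the port name are assumptions about your setup, and a short pause between commands, as the Arduino sketch uses, may also be needed.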

Arduino Sketch

For those not familiar with Arduino, a sketch is the name that Arduino uses for a program. It is the unit of code that is uploaded into non-volatile flash memory and runs on an Arduino board. The Arduino language is a set of C and C++ functions. All standard C and C++ constructs supported by the avr-g++ compiler should work in Arduino.

For this post, the sketch, lora_iot_demo.ino, contains all the code necessary to collect and securely transmit the environmental sensor data, including temperature, relative humidity, barometric pressure, RGB color, and ambient light intensity, using the LoRaWAN protocol. All code for this post, including the sketch, can be found on GitHub.


/*
Description: Transmits Arduino Nano 33 BLE Sense sensor telemetry over LoRaWAN,
including temperature, humidity, barometric pressure, and color,
using REYAX RYLR896 transceiver modules
http://reyax.com/wp-content/uploads/2020/01/Lora-AT-Command-RYLR40x_RYLR89x_EN.pdf
Author: Gary Stafford
*/
#include <Arduino_HTS221.h>
#include <Arduino_LPS22HB.h>
#include <Arduino_APDS9960.h>
const int UPDATE_FREQUENCY = 5000; // update frequency in ms
const float CALIBRATION_FACTOR = -4.0; // temperature calibration factor (Celsius)
const int ADDRESS = 116;
const int NETWORK_ID = 6;
const String PASSWORD = "92A0ECEC9000DA0DCF0CAAB0ABA2E0EF";
const String DELIMITER = "|";
void setup()
{
Serial.begin(9600);
Serial1.begin(115200); // default baud rate of module is 115200
delay(1000); // wait for LoRa module to be ready
// these settings must all be the same for receiver and transmitter
Serial1.print((String)"AT+ADDRESS=" + ADDRESS + "\r\n");
delay(200);
Serial1.print((String)"AT+NETWORKID=" + NETWORK_ID + "\r\n");
delay(200);
Serial1.print("AT+CPIN=" + PASSWORD + "\r\n");
delay(200);
Serial1.print("AT+CPIN?\r\n"); // confirm password is set
if (!HTS.begin())
{ // initialize HTS221 sensor
Serial.println("Failed to initialize humidity temperature sensor!");
while (1);
}
if (!BARO.begin())
{ // initialize LPS22HB sensor
Serial.println("Failed to initialize pressure sensor!");
while (1);
}
// avoid bad readings to start bug
// https://forum.arduino.cc/index.php?topic=660360.0
BARO.readPressure();
delay(1000);
if (!APDS.begin())
{ // initialize APDS9960 sensor
Serial.println("Failed to initialize color sensor!");
while (1);
}
}
void loop()
{
updateReadings();
delay(UPDATE_FREQUENCY);
}
void updateReadings()
{
float temperature = getTemperature(CALIBRATION_FACTOR);
float humidity = getHumidity();
float pressure = getPressure();
int colors[4];
getColor(colors);
String payload = buildPayload(temperature, humidity, pressure, colors);
// Serial.println("Payload: " + payload); // display the payload for debugging
Serial1.print(payload); // send the payload over LoRaWAN
displayResults(temperature, humidity, pressure, colors); // display the results for debugging
}
float getTemperature(float calibration)
{
return HTS.readTemperature() + calibration;
}
float getHumidity()
{
return HTS.readHumidity();
}
float getPressure()
{
return BARO.readPressure();
}
void getColor(int c[])
{
// check if a color reading is available
while (!APDS.colorAvailable())
{
delay(5);
}
int r, g, b, a;
APDS.readColor(r, g, b, a);
c[0] = r;
c[1] = g;
c[2] = b;
c[3] = a;
}
void displayResults(float t, float h, float p, int c[])
{
Serial.print("Temperature: ");
Serial.println(t);
Serial.print("Humidity: ");
Serial.println(h);
Serial.print("Pressure: ");
Serial.println(p);
Serial.print("Color (r, g, b, a): ");
Serial.print(c[0]);
Serial.print(", ");
Serial.print(c[1]);
Serial.print(", ");
Serial.print(c[2]);
Serial.print(", ");
Serial.println(c[3]);
Serial.println("----------");
}
String buildPayload(float t, float h, float p, int c[])
{
String readings = "";
readings += t;
readings += DELIMITER;
readings += h;
readings += DELIMITER;
readings += p;
readings += DELIMITER;
readings += c[0];
readings += DELIMITER;
readings += c[1];
readings += DELIMITER;
readings += c[2];
readings += DELIMITER;
readings += c[3];
String payload = "";
payload += "AT+SEND=";
payload += ADDRESS;
payload += ",";
payload += readings.length();
payload += ",";
payload += readings;
payload += "\r\n";
return payload;
}

AT Commands

Communication with the RYLR896’s long-range modem is done using AT commands, the instructions used to control a modem. AT is an abbreviation of ATtention, and every command line starts with “AT”, which is why modem commands are called AT commands, according to Developer’s Home. A complete list of AT commands can be downloaded as a PDF from the RYLR896 product page.

To efficiently transmit the environmental sensor data from the IoT sensor to the IoT gateway, the sketch concatenates the sensor values together in a single string. The string is incorporated into an AT command that sends the data to the RYLR896 LoRa transceiver module. To make it easier to parse the sensor data on the IoT gateway, we will delimit the sensor values with a pipe (|), as opposed to a comma. The maximum length of the payload (sensor data) is 240 bytes.
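
The sketch builds this command in Arduino C++. For readers who prefer Python, here is a rough equivalent of the same steps, with a check against the 240-byte limit added. The function name build_send_command and the length check are illustrative assumptions and do not appear in the post's code.

```python
DELIMITER = "|"
MAX_PAYLOAD_BYTES = 240  # maximum payload length stated in the post


def build_send_command(address, values):
    """Build an AT+SEND command from a list of sensor values."""
    # Join the readings with the pipe delimiter, e.g. "23.94|37.71|99.89"
    readings = DELIMITER.join(str(value) for value in values)
    length = len(readings.encode("ascii"))
    if length > MAX_PAYLOAD_BYTES:
        raise ValueError("payload is {} bytes, limit is {}".format(
            length, MAX_PAYLOAD_BYTES))
    # Format: AT+SEND=<address>,<payload length>,<payload>\r\n
    return "AT+SEND={},{},{}\r\n".format(address, length, readings)


if __name__ == "__main__":
    print(repr(build_send_command(116, [23.94, 37.71, 99.89, 16, 38, 53, 80])))
```

Because the command itself is comma-separated, using a pipe inside the payload keeps the two levels of structure easy to tell apart.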

Below, we see an example of an AT command used to send the sensor data from the IoT sensor and the corresponding unencrypted data received by the IoT gateway. Both strings contain the LoRa transmitter Address ID, payload length, and the payload. The data received by the IoT gateway also contains the Received signal strength indicator (RSSI), and Signal-to-noise ratio (SNR).

Message Diagram
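
To make the received message layout concrete, the following Python sketch splits a +RCV line into the fields described above. The ReceivedMessage type and the parse_rcv function are hypothetical names introduced here for illustration. The script later in the post extracts only the sensor values.

```python
from typing import List, NamedTuple


class ReceivedMessage(NamedTuple):
    address: int           # LoRa transmitter Address ID
    length: int            # payload length reported by the module
    readings: List[float]  # pipe-delimited sensor values
    rssi: int              # received signal strength indicator
    snr: int               # signal-to-noise ratio


def parse_rcv(line: str) -> ReceivedMessage:
    """Parse a line such as +RCV=116,29,23.94|37.71|...,-61,56."""
    text = line.strip()
    if not text.startswith("+RCV="):
        raise ValueError("not a +RCV line: {!r}".format(line))
    body = text[len("+RCV="):]
    # The first two fields and the last two fields are comma-separated;
    # everything in between is the payload.
    address, length, rest = body.split(",", 2)
    data, rssi, snr = rest.rsplit(",", 2)
    readings = [float(value) for value in data.split("|")]
    return ReceivedMessage(int(address), int(length), readings,
                           int(rssi), int(snr))


if __name__ == "__main__":
    print(parse_rcv("+RCV=116,29,23.94|37.71|99.89|16|38|53|80,-61,56"))
```

Splitting from both ends means the payload is taken whole, whatever delimiter it uses internally.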

Configure, Test, and Debug

As discussed earlier, to configure, test, and debug the RYLR896 LoRa transceiver modules without the use of the IoT gateway, you can use a USB to TTL serial converter adapter. The sketch is loaded on the Arduino Sense (the IoT device) and actively transmits data through one of the RYLR896 modules (shown below right). The other RYLR896 module is connected to your laptop’s USB port, via the USB to TTL serial converter adapter (shown below left). Using a terminal and the screen command, or the Arduino desktop application’s Serial Monitor, we can receive the sensor data from the Arduino Sense.

IMG_7027

Using a terminal on your laptop, we first need to locate the correct virtual console (aka virtual terminal). On Linux or Mac, the virtual consoles are represented by device special files, such as /dev/tty1, /dev/tty2, and so forth. To find the virtual console for the USB to TTL serial converter adapter plugged into the laptop, use the following command.

ls -alh /dev/tty.*

We should see a virtual console with a name beginning with /dev/tty.usbserial-, as in the last line of the sample output below.

... /dev/tty.Bluetooth-Incoming-Port
... /dev/tty.GarysBoseQC35II-SPPDev
... /dev/tty.a483e767cbac-Bluetooth-
... /dev/tty.usbserial-A50285BI

To connect to the RYLR896 module via the USB to TTL serial converter adapter, using the virtual terminal, we use the screen command and connect at a baud rate of 115,200 bps.

screen /dev/tty.usbserial-A50285BI 115200

If everything is configured and working correctly, we should see data being transmitted from the Arduino Sense and received by the local machine, at five-second intervals. Each line of unencrypted data transmitted will look similar to the following, +RCV=116,25,22.18|41.57|99.74|2343|1190|543|4011,-34,47. In the example below, AES 128-bit data encryption is not yet enabled on the Arduino. With encryption turned on, the sensor data (the payload) would appear garbled.

scree_mac_uart

Even easier than the screen command, we can also use the Arduino desktop application’s Serial Monitor, as shown in the following short screen recording. Select the correct Port (virtual console) from the Tools menu and open the Serial Monitor. Because the transmitted data should be secured using AES 128-bit data encryption, we need to send an AT command (AT+CPIN) containing the transceiver modules’ common password so that the data is correctly decrypted on the receiving device (e.g., AT+CPIN=92A0ECEC9000DA0DCF0CAAB0ABA2E0EF).

Receiving Data on IoT Gateway

The Raspberry Pi will act as an IoT gateway, receiving the environmental sensor data from the IoT device, the Arduino. The Raspberry Pi will run a Python script, rasppi_lora_receiver.py, which will receive and decrypt the data payload, parse the sensor values, and display the values in the terminal. The script uses pyserial, the Python Serial Port Extension, a Python module that encapsulates access to the serial port.


import logging
import time
from argparse import ArgumentParser
from datetime import datetime

import serial
from colr import color as colr

# LoRaWAN IoT Sensor Demo
# Using REYAX RYLR896 transceiver modules
# Author: Gary Stafford
# Requirements: python3 -m pip install --user -r requirements.txt
# To Run: python3 ./rasppi_lora_receiver.py --tty /dev/ttyAMA0 --baud-rate 115200

# constants
ADDRESS = 116
NETWORK_ID = 6
PASSWORD = "92A0ECEC9000DA0DCF0CAAB0ABA2E0EF"


def main():
    logging.basicConfig(filename='output.log', filemode='w', level=logging.DEBUG)

    args = get_args()  # get args
    payload = ""

    print("Connecting to REYAX RYLR896 transceiver module...")
    serial_conn = serial.Serial(
        port=args.tty,
        baudrate=int(args.baud_rate),
        timeout=5,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS
    )

    if serial_conn.isOpen():
        set_lora_config(serial_conn)
        check_lora_config(serial_conn)

        while True:
            serial_payload = serial_conn.readline()  # read data from serial port
            if len(serial_payload) > 0:
                try:
                    payload = serial_payload.decode(encoding="utf-8")
                except UnicodeDecodeError:  # receiving corrupt data?
                    logging.error("UnicodeDecodeError: {}".format(serial_payload))
                    continue  # skip data that cannot be decoded

                payload = payload[:-2]  # strip the trailing \r\n
                try:
                    data = parse_payload(payload)
                    print("\n----------")
                    print("Timestamp: {}".format(datetime.now()))
                    print("Payload: {}".format(payload))
                    print("Sensor Data: {}".format(data))
                    display_temperature(data[0])
                    display_humidity(data[1])
                    display_pressure(data[2])
                    display_color(data[3], data[4], data[5], data[6])
                except IndexError:
                    logging.error("IndexError: {}".format(payload))
                except ValueError:
                    logging.error("ValueError: {}".format(payload))

            # time.sleep(2) # transmission frequency set on IoT device


def eight_bit_color(value):
    # scale a reading in the observed 0-4097 range down to 0-255
    return int(round(value / (4097 / 255), 0))


def celsius_to_fahrenheit(value):
    return (value * 1.8) + 32


def display_color(r, g, b, a):
    print("12-bit Color values (r,g,b,a): {},{},{},{}".format(r, g, b, a))

    r = eight_bit_color(r)
    g = eight_bit_color(g)
    b = eight_bit_color(b)
    a = eight_bit_color(a)  # ambient light intensity

    print(" 8-bit Color values (r,g,b,a): {},{},{},{}".format(r, g, b, a))
    print("RGB Color")
    print(colr("\t\t", fore=(127, 127, 127), back=(r, g, b)))
    print("Light Intensity")
    print(colr("\t\t", fore=(127, 127, 127), back=(a, a, a)))


def display_pressure(value):
    print("Barometric Pressure: {} kPa".format(round(value, 2)))


def display_humidity(value):
    print("Humidity: {}%".format(round(value, 2)))


def display_temperature(value):
    temperature = celsius_to_fahrenheit(value)
    print("Temperature: {}°F".format(round(temperature, 2)))


def get_args():
    arg_parser = ArgumentParser(description="LoRaWAN IoT Sensor Demo")
    arg_parser.add_argument("--tty", help="serial tty", default="/dev/ttyAMA0")
    arg_parser.add_argument("--baud-rate", type=int, help="serial baud rate", default=115200)
    args = arg_parser.parse_args()
    return args


def parse_payload(payload):
    # input: +RCV=116,29,23.94|37.71|99.89|16|38|53|80,-61,56
    # output: [23.94, 37.71, 99.89, 16.0, 38.0, 53.0, 80.0]
    payload = payload.split(",")
    payload = payload[2].split("|")
    payload = [float(i) for i in payload]
    return payload


def set_lora_config(serial_conn):
    # configures the REYAX RYLR896 transceiver module
    serial_conn.write(str.encode("AT+ADDRESS=" + str(ADDRESS) + "\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    print("Address set?", serial_payload.decode(encoding="utf-8"))

    serial_conn.write(str.encode("AT+NETWORKID=" + str(NETWORK_ID) + "\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    print("Network Id set?", serial_payload.decode(encoding="utf-8"))

    serial_conn.write(str.encode("AT+CPIN=" + PASSWORD + "\r\n"))
    time.sleep(1)
    serial_payload = (serial_conn.readline())[:-2]
    print("AES-128 password set?", serial_payload.decode(encoding="utf-8"))


def check_lora_config(serial_conn):
    # queries the module and prints its current configuration
    serial_conn.write(str.encode("AT?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    print("Module responding?", serial_payload.decode(encoding="utf-8"))

    serial_conn.write(str.encode("AT+ADDRESS?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    print("Address:", serial_payload.decode(encoding="utf-8"))

    serial_conn.write(str.encode("AT+VER?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    print("Firmware version:", serial_payload.decode(encoding="utf-8"))

    serial_conn.write(str.encode("AT+NETWORKID?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    print("Network Id:", serial_payload.decode(encoding="utf-8"))

    serial_conn.write(str.encode("AT+IPR?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    print("UART baud rate:", serial_payload.decode(encoding="utf-8"))

    serial_conn.write(str.encode("AT+BAND?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    print("RF frequency", serial_payload.decode(encoding="utf-8"))

    serial_conn.write(str.encode("AT+CRFOP?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    print("RF output power", serial_payload.decode(encoding="utf-8"))

    serial_conn.write(str.encode("AT+MODE?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    print("Work mode", serial_payload.decode(encoding="utf-8"))

    serial_conn.write(str.encode("AT+PARAMETER?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    print("RF parameters", serial_payload.decode(encoding="utf-8"))

    serial_conn.write(str.encode("AT+CPIN?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    print("AES-128 password of the network",
          serial_payload.decode(encoding="utf-8"))


if __name__ == "__main__":
    main()

Prior to running the Python script, we can test and debug the connection from the Arduino Sense to the Raspberry Pi using a general application such as Minicom. Minicom is a text-based modem control and terminal emulator program. To install Minicom on the Raspberry Pi, use the following command.

sudo apt-get install minicom

To run Minicom or the Python script, we will need to know the virtual console of the serial connection (Serial1 in the script) used to communicate with the RYLR896 module, wired to the Raspberry Pi. This can be found using the following command.

dmesg | grep -E --color 'serial|tty'

Search for a line, similar to the last line, shown below. Note the name of the virtual console, in my case, ttyAMA0.

[    0.000000] Kernel command line: coherent_pool=1M bcm2708_fb.fbwidth=656 bcm2708_fb.fbheight=416 bcm2708_fb.fbswap=1 vc_mem.mem_base=0x1ec00000 vc_mem.mem_size=0x20000000  dwc_otg.lpm_enable=0 console=tty1 root=PARTUUID=509d1565-02 rootfstype=ext4 elevator=deadline fsck.repair=yes rootwait quiet splash plymouth.ignore-serial-consoles
[    0.000637] console [tty1] enabled
[    0.863147] uart-pl011 20201000.serial: cts_event_workaround enabled
[    0.863289] 20201000.serial: ttyAMA0 at MMIO 0x20201000 (irq = 81, base_baud = 0) is a PL011 rev2

To view the data received from the Arduino Sense, using Minicom, use the following command, substituting the virtual console value, found above.

minicom -b 115200 -o -D /dev/ttyAMA0

If successful, we should see output similar to the lower right terminal window. Data is being transmitted by the Arduino Sense and received by the Raspberry Pi, via LoRaWAN. In the example below, AES 128-bit data encryption is not yet enabled on the Arduino. With encryption turned on, the sensor data (the payload) would appear garbled.

lora_tx_rx

IoT Gateway Python Script

To run the Python script on the Raspberry Pi, use the following command, substituting the name of the virtual console (e.g., /dev/ttyAMA0).

python3 ./rasppi_lora_receiver.py \
  --tty /dev/ttyAMA0 --baud-rate 115200

The script starts by configuring the RYLR896 and outputting that configuration to the terminal. If successful, we should see the following informational output.

Connecting to REYAX RYLR896 transceiver module...

Address set? +OK
Network Id set? +OK
AES-128 password set? +OK
Module responding? +OK

Address: +ADDRESS=116
Firmware version: +VER=RYLR89C_V1.2.7
Network Id: +NETWORKID=6
UART baud rate: +IPR=115200
RF frequency +BAND=915000000
RF output power +CRFOP=15
Work mode +MODE=0
RF parameters +PARAMETER=12,7,1,4
AES-128 password of the network +CPIN=92A0ECEC9000DA0DCF0CAAB0ABA2E0EF
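
Each configuration response above follows a +KEY=VALUE pattern, so it can be split into its parts with a few lines of Python. The sketch below is illustrative only: the function names are hypothetical, and the four field names given to the +PARAMETER values (spreading factor, bandwidth, coding rate, preamble) are an assumption based on my reading of the manufacturer's AT command guide, which should be checked before relying on them.

```python
# Assumed meaning of the four +PARAMETER values (verify against the REYAX guide)
PARAMETER_FIELDS = ("spreading_factor", "bandwidth", "coding_rate", "preamble")


def parse_response(line):
    """Split a module response such as +BAND=915000000 into (key, value)."""
    text = line.strip()
    if not text.startswith("+"):
        raise ValueError("not a module response: {!r}".format(line))
    # partition() returns an empty value for responses without "=", e.g. +OK
    key, _, value = text[1:].partition("=")
    return key, value


def parse_parameter(value):
    """Convert a +PARAMETER value such as 12,7,1,4 into a dictionary."""
    numbers = [int(number) for number in value.split(",")]
    if len(numbers) != len(PARAMETER_FIELDS):
        raise ValueError("expected 4 values, got {}".format(len(numbers)))
    return dict(zip(PARAMETER_FIELDS, numbers))


if __name__ == "__main__":
    key, value = parse_response("+PARAMETER=12,7,1,4")
    print(key, parse_parameter(value))
```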

Once configured, the script will receive the data from the Arduino Sense, decrypt the data, parse the sensor values, and format and display the values within the terminal.

python_script_running

The following screen recording shows a parallel view of both the Arduino Serial Monitor (upper right window) and the Raspberry Pi’s terminal output (lower right window). The Raspberry Pi (receiver) receives data from the Arduino (transmitter). The Raspberry Pi successfully reads, decrypts, interprets, and displays the sensor data, including displaying color swatches for the RGB and light intensity sensor readings.

Conclusion

In this post, we explored the use of the LoRa and LoRaWAN protocols to transmit environmental sensor data from an IoT device to an IoT gateway. Given its low energy consumption, long-distance transmission capabilities, and well-developed protocols, LoRaWAN is an ideal long-range wireless protocol for IoT devices.

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


BLE and GATT for IoT: Getting Started with Bluetooth Low Energy and the Generic Attribute Profile Specification for IoT

Introduction

According to Wikipedia, Bluetooth is a wireless technology standard used for exchanging data between fixed and mobile devices over short distances. Bluetooth Low Energy (Bluetooth LE or BLE) is a wireless personal area network (WPAN) technology designed and marketed by the Bluetooth Special Interest Group (Bluetooth SIG). According to the Bluetooth SIG, BLE is designed for very low power operation. BLE supports data rates from 125 Kb/s to 2 Mb/s, with multiple power levels from 1 milliwatt (mW) to 100 mW. Several key factors influence the effective range of a reliable Bluetooth connection, which can vary from a kilometer down to less than a meter. The newer generation Bluetooth 5 provides a theoretical 4x range improvement over Bluetooth 4.2, from approximately 200 feet (60 meters) to 800 feet (240 meters).

Wikipedia currently lists 36 definitions of Bluetooth profiles defined and adopted by the Bluetooth SIG, including the Generic Attribute Profile (GATT) Specification. According to the Bluetooth SIG, GATT is built on top of the Attribute Protocol (ATT) and establishes common operations and a framework for the data transported and stored by the ATT. GATT provides profile discovery and description services for the BLE protocol. It defines how ATT attributes are grouped together into sets to form services.

Given its low energy consumption and well-developed profiles, such as GATT, BLE is an ideal short-range wireless protocol for Internet of Things (IoT) devices, when compared to competing protocols, such as ZigBee, Bluetooth classic, and Wi-Fi. In this post, we will explore the use of BLE and the GATT specification to transmit environmental sensor data from an IoT Sensor to an IoT Gateway.

IoT Sensor

In this post, we will use an Arduino single-board microcontroller to serve as an IoT sensor, actually an array of sensors. The 3.3V AI-enabled Arduino Nano 33 BLE Sense board, released in August 2019, comes with the powerful nRF52840 processor from Nordic Semiconductor, a 32-bit ARM Cortex-M4 CPU running at 64 MHz, 1MB of CPU Flash Memory, 256KB of SRAM, and a NINA-B306 stand-alone Bluetooth 5 low energy module.

IMG_7016

The Sense also contains an impressive array of embedded sensors:

  • 9-axis Inertial Sensor (LSM9DS1): 3D digital linear acceleration sensor, a 3D digital angular rate sensor, and a 3D digital magnetic sensor
  • Humidity and Temperature Sensor (HTS221): Capacitive digital sensor for relative humidity and temperature
  • Barometric Sensor (LPS22HB): MEMS nano pressure sensor: 260–1260 hectopascal (hPa) absolute digital output barometer
  • Microphone (MP34DT05): MEMS audio sensor omnidirectional digital microphone
  • Gesture, Proximity, Light Color, and Light Intensity Sensor (APDS9960): Advanced Gesture detection, Proximity detection, Digital Ambient Light Sense (ALS), and Color Sense (RGBC).

The Sense is an excellent, low-cost single-board microcontroller for learning about collecting and transmitting IoT sensor data.

IoT Gateway

An IoT Gateway, according to TechTarget, is a physical device or software program that serves as the connection point between the Cloud and controllers, sensors, and intelligent devices. All data moving to the Cloud, or vice versa goes through the gateway, which can be either a dedicated hardware appliance or software program.

ble_aws_iot_2

In this post, we will use a recent Raspberry Pi 3 Model B+ single-board computer (SBC), to serve as an IoT Gateway. This Raspberry Pi model features a 1.4GHz Cortex-A53 (ARMv8) 64-bit quad-core processor System on a Chip (SoC), 1GB LPDDR2 SDRAM, dual-band wireless LAN, Bluetooth 4.2 BLE, and Gigabit Ethernet. To follow along with the post, you could substitute any Linux-based machine for the Raspberry Pi to run the included sample Python script.

IMG_7025

The Arduino will transmit IoT sensor telemetry, over BLE, to the Raspberry Pi. The Raspberry Pi, using Wi-Fi or Ethernet, is then able to securely transmit the sensor telemetry data to the Cloud. In Bluetooth terminology, the Bluetooth Peripheral device (aka GATT Server), which is the Arduino, will transmit data to the Bluetooth Central device (aka GATT Client), which is the Raspberry Pi.

ble_aws_iot

Arduino Sketch

For those not familiar with Arduino, a sketch is the name that Arduino uses for a program. It is the unit of code that is uploaded into non-volatile flash memory and runs on an Arduino board. The Arduino language is a set of C/C++ functions. All standard C and C++ constructs supported by the avr-g++ compiler should work in Arduino.

For this post, the sketch, combo_sensor_ble.ino, contains all the code necessary to collect environmental sensor telemetry, including temperature, relative humidity, barometric pressure, and ambient light and RGB color. All code for this post, including the sketch, can be found on GitHub.

The sensor telemetry will be advertised by the Sense, over BLE, as a GATT Environmental Sensing Service (GATT Assigned Number 0x181A) with multiple GATT Characteristics. Each Characteristic represents a sensor reading and contains the most current sensor value(s), for example, Temperature (0x2A6E) or Humidity (0x2A6F).
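
The 16-bit GATT Assigned Numbers used here are shorthand for full 128-bit UUIDs built on the Bluetooth Base UUID (0000xxxx-0000-1000-8000-00805F9B34FB). The following Python sketch shows the expansion. The function name is hypothetical and is not part of the post's code.

```python
import uuid

# Bluetooth Base UUID with the 16-bit assigned number left out
BASE_UUID_SUFFIX = "-0000-1000-8000-00805f9b34fb"


def expand_assigned_number(assigned_number: int) -> uuid.UUID:
    """Expand a 16-bit GATT Assigned Number into its 128-bit UUID."""
    if not 0 <= assigned_number <= 0xFFFF:
        raise ValueError("assigned number must fit in 16 bits")
    return uuid.UUID("0000{:04x}{}".format(assigned_number, BASE_UUID_SUFFIX))


if __name__ == "__main__":
    for name, number in [("Environmental Sensing", 0x181A),
                         ("Temperature", 0x2A6E),
                         ("Humidity", 0x2A6F)]:
        print(name, expand_assigned_number(number))
```

Tools that list full UUIDs will show the Environmental Sensing Service as 0000181a-0000-1000-8000-00805f9b34fb, which is the same identifier as 0x181A.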

Each GATT Characteristic defines how the data should be represented. To represent the data accurately, the sensor readings need to be modified. For example, using the ArduinoHTS221 library, the temperature is captured with two decimal places of precision (e.g., 22.21 °C). However, the Temperature GATT Characteristic (0x2A6E) requires a signed 16-bit value (-32,768–32,767). To maintain precision, the captured value (e.g., 22.21 °C) is multiplied by 100 to convert it to an integer (e.g., 2221). The Raspberry Pi then converts the integer back to the original value with the correct precision.
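
The round trip described above can be sketched in Python as follows. This is an illustration of the scaling idea only, not the sketch's exact behavior: it assumes a two-byte, little-endian signed value, and it rounds where the Arduino code truncates. Both function names are hypothetical.

```python
import struct


def encode_temperature(celsius: float) -> bytes:
    """Scale by 100 and pack as a little-endian signed 16-bit integer."""
    raw = round(celsius * 100)      # e.g., 22.21 becomes 2221
    return struct.pack("<h", raw)   # raises struct.error if out of range


def decode_temperature(data: bytes) -> float:
    """Unpack a little-endian signed 16-bit integer and restore the scale."""
    (raw,) = struct.unpack("<h", data)
    return raw / 100


if __name__ == "__main__":
    encoded = encode_temperature(22.21)
    print(encoded.hex(), decode_temperature(encoded))
```

With a resolution of 0.01 °C, a signed 16-bit value covers roughly -327.68 °C to 327.67 °C, which is more than enough for an environmental sensor.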

The GATT specification has no current predefined Characteristic representing ambient light and RGB color. Therefore, I have created a custom Characteristic for the color values and assigned it a universally unique identifier (UUID).

According to the documentation, ambient light and RGB color are captured as 16-bit values (a range of 0–65,535). However, using the ArduinoAPDS9960 library, I have found the scale of the readings to be within a range of 0–4097. Without diving into the weeds, the maximum count (or saturation) value is variable. It can be calculated based upon the integration time and the size of the count register (e.g., 16 bits). The ADC integration time appears to be set to 10 ms in the library’s file, Arduino_APDS9960.cpp.

RGB values are typically represented as 8-bit color. We could convert the values to 8-bit before sending or handle the conversion later on the Raspberry Pi IoT Gateway. For demonstration purposes, rather than data transfer efficiency, the sketch concatenates the 12-bit values together as a string (e.g., 4097,2811,1500,4097). The values in the string are converted from 12-bit to 8-bit on the Raspberry Pi (e.g., 255,175,93,255).
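
As a rough illustration of that conversion, the Python sketch below scales each value in the string from the observed 0–4097 range down to 0–255. The function names and the clamping to 255 are assumptions made for this example.

```python
MAX_READING = 4097  # upper end of the range observed with the APDS9960 library


def eight_bit_color(value: int, max_reading: int = MAX_READING) -> int:
    """Scale a single color reading to the 0-255 range."""
    scaled = round(value * 255 / max_reading)
    return min(255, max(0, scaled))  # clamp in case a reading exceeds the range


def convert_color_string(colors: str) -> list:
    """Convert a string such as 4097,2811,1500,4097 to 8-bit values."""
    return [eight_bit_color(int(value)) for value in colors.split(",")]


if __name__ == "__main__":
    print(convert_color_string("4097,2811,1500,4097"))
```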


/*
Description: Transmits Arduino Nano 33 BLE Sense sensor readings over BLE,
including temperature, humidity, barometric pressure, and color,
using the Bluetooth Generic Attribute Profile (GATT) Specification
Author: Gary Stafford
Reference: Source code adapted from `Nano 33 BLE Sense Getting Started`
Adapted from Arduino BatteryMonitor example by Peter Milne
*/
/*
Generic Attribute Profile (GATT) Specifications
GATT Service: Environmental Sensing Service (ESS) Characteristics
Temperature
sint16 (decimalexponent -2)
Unit is in degrees Celsius with a resolution of 0.01 degrees Celsius
https://www.bluetooth.com/xml-viewer/?src=https://www.bluetooth.com/wp-content/uploads/Sitecore-Media-Library/Gatt/Xml/Characteristics/org.bluetooth.characteristic.temperature.xml
Humidity
uint16 (decimalexponent -2)
Unit is in percent with a resolution of 0.01 percent
https://www.bluetooth.com/xml-viewer/?src=https://www.bluetooth.com/wp-content/uploads/Sitecore-Media-Library/Gatt/Xml/Characteristics/org.bluetooth.characteristic.humidity.xml
Barometric Pressure
uint32 (decimalexponent -1)
Unit is in pascals with a resolution of 0.1 Pa
https://www.bluetooth.com/xml-viewer/?src=https://www.bluetooth.com/wp-content/uploads/Sitecore-Media-Library/Gatt/Xml/Characteristics/org.bluetooth.characteristic.pressure.xml
*/
#include <ArduinoBLE.h>
#include <Arduino_HTS221.h>
#include <Arduino_LPS22HB.h>
#include <Arduino_APDS9960.h>
const int UPDATE_FREQUENCY = 2000; // Update frequency in ms
const float CALIBRATION_FACTOR = -4.0; // Temperature calibration factor (Celsius)
int previousTemperature = 0;
unsigned int previousHumidity = 0;
unsigned int previousPressure = 0;
String previousColor = "";
int r, g, b, a;
long previousMillis = 0; // last time readings were checked, in ms
BLEService environmentService("181A"); // Standard Environmental Sensing service
BLEIntCharacteristic tempCharacteristic("2A6E", // Standard 16-bit Temperature characteristic
BLERead | BLENotify); // Remote clients can read and get updates
BLEUnsignedIntCharacteristic humidCharacteristic("2A6F", // Unsigned 16-bit Humidity characteristic
BLERead | BLENotify);
BLEUnsignedIntCharacteristic pressureCharacteristic("2A6D", // Unsigned 32-bit Pressure characteristic
BLERead | BLENotify); // Remote clients can read and get updates
BLECharacteristic colorCharacteristic("936b6a25-e503-4f7c-9349-bcc76c22b8c3", // Custom Characteristics
BLERead | BLENotify, 24); // 1234,5678,
BLEDescriptor colorLabelDescriptor("2901", "16-bit ints: r, g, b, a");
void setup() {
Serial.begin(9600); // Initialize serial communication
// while (!Serial); // only when connected to laptop
if (!HTS.begin()) { // Initialize HTS221 sensor
Serial.println("Failed to initialize humidity temperature sensor!");
while (1);
}
if (!BARO.begin()) { // Initialize LPS22HB sensor
Serial.println("Failed to initialize pressure sensor!");
while (1);
}
// Avoid bad readings to start bug
// https://forum.arduino.cc/index.php?topic=660360.0
BARO.readPressure();
delay(1000);
if (!APDS.begin()) { // Initialize APDS9960 sensor
Serial.println("Failed to initialize color sensor!");
while (1);
}
pinMode(LED_BUILTIN, OUTPUT); // Initialize the built-in LED pin
if (!BLE.begin()) { // Initialize NINA B306 BLE
Serial.println("starting BLE failed!");
while (1);
}
BLE.setLocalName("ArduinoNano33BLESense"); // Set name for connection
BLE.setAdvertisedService(environmentService); // Advertise environment service
environmentService.addCharacteristic(tempCharacteristic); // Add temperature characteristic
environmentService.addCharacteristic(humidCharacteristic); // Add humidity characteristic
environmentService.addCharacteristic(pressureCharacteristic); // Add pressure characteristic
environmentService.addCharacteristic(colorCharacteristic); // Add color characteristic
colorCharacteristic.addDescriptor(colorLabelDescriptor); // Add color characteristic descriptor
BLE.addService(environmentService); // Add environment service
tempCharacteristic.setValue(0); // Set initial temperature value
humidCharacteristic.setValue(0); // Set initial humidity value
pressureCharacteristic.setValue(0); // Set initial pressure value
colorCharacteristic.setValue(""); // Set initial color value
BLE.advertise(); // Start advertising
Serial.print("Peripheral device MAC: ");
Serial.println(BLE.address());
Serial.println("Waiting for connections…");
}
void loop() {
BLEDevice central = BLE.central(); // Wait for a BLE central to connect
// If central is connected to peripheral
if (central) {
Serial.print("Connected to central MAC: ");
Serial.println(central.address()); // Central's BT address:
digitalWrite(LED_BUILTIN, HIGH); // Turn on the LED to indicate the connection
while (central.connected()) {
long currentMillis = millis();
// After UPDATE_FREQUENCY ms have passed, check temperature & humidity
if (currentMillis - previousMillis >= UPDATE_FREQUENCY) {
previousMillis = currentMillis;
updateReadings();
}
}
digitalWrite(LED_BUILTIN, LOW); // When the central disconnects, turn off the LED
Serial.print("Disconnected from central MAC: ");
Serial.println(central.address());
}
}
int getTemperature(float calibration) {
// Get calibrated temperature as signed 16-bit int for BLE characteristic
return (int) (HTS.readTemperature() * 100) + (int) (calibration * 100);
}
unsigned int getHumidity() {
// Get humidity as unsigned 16-bit int for BLE characteristic
return (unsigned int) (HTS.readHumidity() * 100);
}
unsigned int getPressure() {
// Get pressure as unsigned 32-bit int for BLE characteristic
return (unsigned int) (BARO.readPressure() * 1000 * 10);
}
void getColor() {
// check if a color reading is available
while (!APDS.colorAvailable()) {
delay(5);
}
// Get color as (4) unsigned 16-bit ints
int tmp_r, tmp_g, tmp_b, tmp_a;
APDS.readColor(tmp_r, tmp_g, tmp_b, tmp_a);
r = tmp_r;
g = tmp_g;
b = tmp_b;
a = tmp_a;
}
void updateReadings() {
int temperature = getTemperature(CALIBRATION_FACTOR);
unsigned int humidity = getHumidity();
unsigned int pressure = getPressure();
getColor();
if (temperature != previousTemperature) { // If reading has changed
Serial.print("Temperature: ");
Serial.println(temperature);
tempCharacteristic.writeValue(temperature); // Update characteristic
previousTemperature = temperature; // Save value
}
if (humidity != previousHumidity) { // If reading has changed
Serial.print("Humidity: ");
Serial.println(humidity);
humidCharacteristic.writeValue(humidity);
previousHumidity = humidity;
}
if (pressure != previousPressure) { // If reading has changed
Serial.print("Pressure: ");
Serial.println(pressure);
pressureCharacteristic.writeValue(pressure);
previousPressure = pressure;
}
// Get color reading every time
// e.g. "12345,45678,89012,23456"
String stringColor = "";
stringColor += r;
stringColor += ",";
stringColor += g;
stringColor += ",";
stringColor += b;
stringColor += ",";
stringColor += a;
if (stringColor != previousColor) { // If reading has changed
byte bytes[stringColor.length() + 1];
stringColor.getBytes(bytes, stringColor.length() + 1);
Serial.print("r, g, b, a: ");
Serial.println(stringColor);
colorCharacteristic.writeValue(bytes, sizeof(bytes));
previousColor = stringColor;
}
}

Previewing and Debugging BLE Device Services

Before looking at the code running on the Raspberry Pi, we can use any number of mobile applications to preview and debug the Environmental Sensing service running on the Arduino and being advertised over BLE. A commonly recommended application is Nordic Semiconductor’s nRF Connect for Mobile, available on Google Play. I have found the Android version works better at correctly interpreting and displaying GATT Characteristic values than the iOS version of the app.

Below, we see a scan of my local vicinity for BLE devices being advertised, using the Android version of the nRF Connect mobile application. Note the BLE device, ArduinoNano33BLESense (indicated in red). Also, note the media access control address (MAC address) of that BLE device, in my case, d1:aa:89:0c:ee:82. The MAC address will be required later on the IoT Gateway.

Screenshot_20200731-185352

Connecting to the device, we see three Services. The Environmental Sensing Service (indicated in red) contains the sensor readings.

Screenshot_20200731-185659

Drilling down into the Environmental Sensing Service (0x181A), we see the four expected Characteristics: Temperature (0x2A6E), Humidity (0x2A6F), Pressure (0x2A6D), and Unknown Characteristic (936b6a25-e503-4f7c-9349-bcc76c22b8c3). Since nRF Connect cannot recognize the color sensor reading as a registered GATT Characteristic (no GATT Assigned Number), it is displayed as an Unknown Characteristic. Whereas the temperature, humidity, and pressure values (indicated in red) are interpreted and displayed correctly, the color sensor reading is left as raw hexadecimal text (e.g., 30-2c-30-2c-30-2c-30-00 or 0,0,0,0).
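To make the raw value concrete, the following minimal Python sketch decodes the dash-separated hexadecimal text, as displayed by nRF Connect, back into the comma-separated color string. The helper name, decode_nrf_hex, is hypothetical and used only for illustration.

```python
def decode_nrf_hex(hex_text):
    """Convert dash-separated hex bytes to text, dropping any trailing NUL terminator."""
    raw = bytes.fromhex(hex_text.replace("-", ""))
    return raw.rstrip(b"\x00").decode("utf-8")


print(decode_nrf_hex("30-2c-30-2c-30-2c-30-00"))  # prints 0,0,0,0
```

Each 0x30 byte is the ASCII character '0', each 0x2c byte is a comma, and the final 0x00 byte is the string terminator added on the Arduino side.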

Screenshot_20200731-185558

These results indicate everything is working as expected.

BLE Client Python Code

To act as the BLE Client (aka central device), the Raspberry Pi runs a Python script. The script, rasppi_ble_receiver.py, uses the bluepy Python module to interface with BLE devices through BlueZ on Linux.


import sys
import time
from argparse import ArgumentParser

from bluepy import btle  # linux only (no mac)
from colr import color as colr

# BLE IoT Sensor Demo
# Author: Gary Stafford
# Reference: https://elinux.org/RPi_Bluetooth_LE
# Requirements: python3 -m pip install --user -r requirements.txt
# To Run: python3 ./rasppi_ble_receiver.py d1:aa:89:0c:ee:82 <- MAC address - change me!


def main():
    # get args
    args = get_args()

    print("Connecting...")
    nano_sense = btle.Peripheral(args.mac_address)

    print("Discovering Services...")
    _ = nano_sense.services
    environmental_sensing_service = nano_sense.getServiceByUUID("181A")

    print("Discovering Characteristics...")
    _ = environmental_sensing_service.getCharacteristics()

    while True:
        print("\n")
        read_temperature(environmental_sensing_service)
        read_humidity(environmental_sensing_service)
        read_pressure(environmental_sensing_service)
        read_color(environmental_sensing_service)
        # time.sleep(2)  # transmission frequency set on IoT device


def byte_array_to_int(value):
    # Raw data is hexstring of int values, as a series of bytes, in little endian byte order
    # values are converted from bytes -> bytearray -> int
    # e.g., b'\xb8\x08\x00\x00' -> bytearray(b'\xb8\x08\x00\x00') -> 2232
    # print(f"{sys._getframe().f_code.co_name}: {value}")
    value = bytearray(value)
    value = int.from_bytes(value, byteorder="little")
    return value


def split_color_str_to_array(value):
    # e.g., b'2660,2059,1787,4097\x00' -> 2660,2059,1787,4097 ->
    #       [2660, 2059, 1787, 4097] -> 166.0,128.0,111.0,255.0
    # print(f"{sys._getframe().f_code.co_name}: {value}")

    # remove the extra terminating byte on the end ('\x00')
    value = value[0:-1]

    # split r, g, b, a values into array of 16-bit ints
    values = list(map(int, value.split(",")))

    # convert from 16-bit ints (2^16 or 0-65535) to 8-bit ints (2^8 or 0-255)
    # values[:] = [int(v) % 256 for v in values]

    # the actual sensor readings range from 0 - 4097
    print(f"12-bit Color values (r,g,b,a): {values}")
    values[:] = [round(int(v) / (4097 / 255), 0) for v in values]
    return values


def byte_array_to_char(value):
    # e.g., b'2660,2058,1787,4097\x00' -> '2660,2058,1787,4097\x00'
    value = value.decode("utf-8")
    return value


def decimal_exponent_two(value):
    # e.g., 2350 -> 23.5
    return value / 100


def decimal_exponent_one(value):
    # e.g., 988343 -> 98834.3
    return value / 10


def pascals_to_kilopascals(value):
    # 1 Kilopascal (kPa) is equal to 1000 pascals (Pa)
    # to convert kPa to pascal, multiply the kPa value by 1000
    # 98834.3 -> 98.8343
    return value / 1000


def celsius_to_fahrenheit(value):
    return (value * 1.8) + 32


def read_color(service):
    color_char = service.getCharacteristics("936b6a25-e503-4f7c-9349-bcc76c22b8c3")[0]
    color = color_char.read()
    color = byte_array_to_char(color)
    color = split_color_str_to_array(color)
    print(f" 8-bit Color values (r,g,b,a): {color[0]},{color[1]},{color[2]},{color[3]}")
    print("RGB Color")
    print(colr('\t\t', fore=(127, 127, 127), back=(color[0], color[1], color[2])))
    print("Light Intensity")
    print(colr('\t\t', fore=(127, 127, 127), back=(color[3], color[3], color[3])))


def read_pressure(service):
    pressure_char = service.getCharacteristics("2A6D")[0]
    pressure = pressure_char.read()
    pressure = byte_array_to_int(pressure)
    pressure = decimal_exponent_one(pressure)
    pressure = pascals_to_kilopascals(pressure)
    print(f"Barometric Pressure: {round(pressure, 2)} kPa")


def read_humidity(service):
    humidity_char = service.getCharacteristics("2A6F")[0]
    humidity = humidity_char.read()
    humidity = byte_array_to_int(humidity)
    humidity = decimal_exponent_two(humidity)
    print(f"Humidity: {round(humidity, 2)}%")


def read_temperature(service):
    temperature_char = service.getCharacteristics("2A6E")[0]
    temperature = temperature_char.read()
    temperature = byte_array_to_int(temperature)
    temperature = decimal_exponent_two(temperature)
    temperature = celsius_to_fahrenheit(temperature)
    print(f"Temperature: {round(temperature, 2)}°F")


def get_args():
    arg_parser = ArgumentParser(description="BLE IoT Sensor Demo")
    arg_parser.add_argument('mac_address', help="MAC address of device to connect")
    args = arg_parser.parse_args()
    return args


if __name__ == "__main__":
    main()

python3 ./rasppi_ble_receiver.py d1:aa:89:0c:ee:82

Unlike the nRF Connect app, the bluepy Python module is not capable of correctly interpreting and displaying the GATT Characteristic values. Hence, the script takes the raw, incoming hexadecimal text from the Arduino and coerces it to the correct values. For example, a temperature reading must be transformed from bytes, b'\xb8\x08\x00\x00', to a byte array, bytearray(b'\xb8\x08\x00\x00'), then to an integer, 2232, then to a decimal, 22.32, and finally to the Fahrenheit scale, 72.18°F.
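As an alternative illustration of the same conversion chain, the sketch below uses Python's struct module rather than the script's int.from_bytes approach. It assumes the Arduino sends the reading as a 4-byte, little-endian, signed integer in hundredths of a degree Celsius; the signed interpretation and the function name, decode_temperature_f, are my assumptions, not part of the original script.

```python
import struct


def decode_temperature_f(raw):
    """Decode a 4-byte little-endian integer (hundredths of a degree C) to degrees F."""
    (hundredths_c,) = struct.unpack("<i", raw)  # "<i" = little-endian signed 32-bit int
    celsius = hundredths_c / 100
    return round(celsius * 1.8 + 32, 2)


print(decode_temperature_f(b"\xb8\x08\x00\x00"))  # prints 72.18
```

Treating the value as signed would also decode sub-zero Celsius readings correctly, which an unsigned conversion would not.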

Sensor readings are retrieved from the BLE device every two seconds. In addition to displaying the numeric sensor readings, the Python script also displays a color swatch of the 8-bit RGB color, as well as a grayscale swatch representing the light intensity using the colr Python module.

ScreenShot2020-08-04at8.55.08AM

The following screen recording shows a parallel view of both the Arduino Serial Monitor and the Raspberry Pi’s terminal output. The Raspberry Pi (central device) connects to the Arduino (peripheral device) when the Python script is started. The Raspberry Pi successfully reads and interprets the telemetry data from the Environmental Sensing Service.

Conclusion

In this post, we explored the use of BLE and the GATT specification to transmit environmental sensor data from a peripheral device to a central device. Given its low energy consumption and well-developed profiles, such as GATT, Bluetooth Low Energy (BLE) is an ideal short-range wireless protocol for IoT devices.

This blog represents my own viewpoints and not those of my employer, Amazon Web Services.


Getting Started with IoT Analytics on AWS

Introduction

AWS defines AWS IoT as a set of managed services that enable ‘internet-connected devices to connect to the AWS Cloud and lets applications in the cloud interact with internet-connected devices.’ AWS IoT services span three categories: Device Software, Connectivity and Control, and Analytics. In this post, we will focus on AWS IoT Analytics, one of four services in the Analytics category. According to AWS, AWS IoT Analytics is a fully-managed service, designed specifically for IoT, which collects, pre-processes, enriches, stores, and analyzes IoT device data at scale.

Certainly, AWS IoT Analytics is not the only way to analyze Internet of Things (IoT) or Industrial Internet of Things (IIoT) data on AWS. It is common to see Data Analyst teams using a more general AWS data analytics stack, composed of Amazon S3, Amazon Kinesis, AWS Glue, and Amazon Athena or Amazon Redshift and Redshift Spectrum, for analyzing IoT data. So then why choose AWS IoT Analytics over a more traditional AWS data analytics stack? According to AWS, IoT Analytics was purpose-built to manage the complexities of IoT and IIoT data at petabyte scale. IoT data frequently has significant gaps, corrupted messages, and false readings that must be cleaned up before analysis can occur. Additionally, IoT data must often be enriched and transformed to be meaningful. IoT Analytics can filter, transform, and enrich IoT data before storing it in a time-series data store for analysis.

In the following post, we will explore the use of AWS IoT Analytics to analyze environmental sensor data, in near real-time, from a series of IoT devices. To follow along with the post’s demonstration, there is an option to use sample data to simulate the IoT devices (see the ‘Simulating IoT Device Messages’ section of this post).

IoT Devices

In this post, we will explore IoT Analytics using IoT data generated from a series of custom-built environmental sensor arrays. Each breadboard-based sensor array is connected to a Raspberry Pi single-board computer (SBC), the popular, low-cost, credit-card-sized Linux computer. The IoT devices were purposely placed in physical locations that vary in temperature, humidity, and other environmental conditions.

rasppi

Each device includes the following sensors:

  1. MQ135 Air Quality Sensor Hazardous Gas Detection Sensor: CO, LPG, Smoke (link)
    (requires an MCP3008 – 8-Channel 10-Bit ADC w/ SPI Interface (link))
  2. DHT22/AM2302 Digital Temperature and Humidity Sensor (link)
  3. Onyehn IR Pyroelectric Infrared PIR Motion Sensor (link)
  4. Anmbest Light Intensity Detection Photosensitive Sensor (link)

rasppi_detail

AWS IoT Device SDK

Each Raspberry Pi device runs a custom Python script, sensor_collector_v2.py. The script uses the AWS IoT Device SDK for Python v2 to communicate with AWS. The script collects a total of seven different readings from the four sensors at a regular interval. Sensor readings include temperature, humidity, carbon monoxide (CO), liquefied petroleum gas (LPG), smoke, light, and motion.

The script securely publishes the sensor readings, along with a device ID and timestamp, as a single message, to AWS using the ISO standard Message Queuing Telemetry Transport (MQTT) network protocol. Below is an example of an MQTT message payload, published by the collector script.


{
    "data": {
        "co": 0.006104480269226063,
        "humidity": 55.099998474121094,
        "light": true,
        "lpg": 0.008895956948783413,
        "motion": false,
        "smoke": 0.023978358312270912,
        "temp": 31.799999237060547
    },
    "device_id": "6e:81:c9:d4:9e:58",
    "ts": 1594419195.292461
}
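A payload with this shape could be assembled along the following lines. This is a minimal sketch, not the code from sensor_collector_v2.py, which is not shown in this post; the function name, build_payload, is hypothetical, and the readings are copied from the example message above.

```python
import json
import time


def build_payload(device_id, readings, ts=None):
    """Wrap sensor readings with a device ID and a timestamp (seconds since epoch)."""
    return {
        "data": readings,
        "device_id": device_id,
        "ts": time.time() if ts is None else ts,
    }


readings = {
    "co": 0.006104480269226063,
    "humidity": 55.099998474121094,
    "light": True,
    "lpg": 0.008895956948783413,
    "motion": False,
    "smoke": 0.023978358312270912,
    "temp": 31.799999237060547,
}
message = build_payload("6e:81:c9:d4:9e:58", readings, ts=1594419195.292461)

# Serialize to UTF-8 JSON, the form in which the payload would be published
encoded = json.dumps(message).encode("utf-8")
print(f"{len(encoded)} bytes")
```

Measuring the encoded length this way gives a quick estimate of payload size before inspecting real traffic on the device.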

As shown below, using tcpdump on the IoT device, the MQTT message payloads generated by the script average approximately 275 bytes. The complete MQTT messages average around 300 bytes.

screen_shot_2020-07-15_at_1_56_21_pm

AWS IoT Core

Each Raspberry Pi is registered with AWS IoT Core. IoT Core allows users to quickly and securely connect devices to AWS. According to AWS, IoT Core can reliably scale to billions of devices and trillions of messages. Registered devices are referred to as things in AWS IoT Core. A thing is a representation of a specific device or logical entity. Information about a thing is stored in the registry as JSON data.

IoT Core provides a Device Gateway, which manages all active device connections. The Gateway currently supports MQTT, WebSockets, and HTTP 1.1 protocols. Behind the Device Gateway is a high-throughput pub/sub Message Broker, which securely transmits messages to and from all IoT devices and applications with low latency. Below, we see a typical AWS IoT Core architecture.

AWS_IoT_Diagram_01_Ingest_blog

At a message frequency of five seconds, the three Raspberry Pi devices publish a total of roughly 50,000 IoT messages per day to AWS IoT Core.
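The daily message volume follows directly from the device count and the publishing interval, as this short calculation shows.

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400 seconds

devices = 3
interval_seconds = 5

messages_per_day = devices * SECONDS_PER_DAY // interval_seconds
print(messages_per_day)  # prints 51840
```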

monitoring

AWS IoT Security

AWS IoT Core provides mutual authentication and encryption, ensuring all data exchanged between AWS and the devices is secure by default. In the demo, all data is sent securely using Transport Layer Security (TLS) 1.2 with X.509 digital certificates on port 443. Authorization of the device to access any resource on AWS is controlled by individual AWS IoT Core Policies, similar to AWS IAM Policies. Below, we see an example of an X.509 certificate, assigned to a registered device.

thing_cert

AWS IoT Core Rules

Once an MQTT message is received from an IoT device (a thing), we use AWS IoT Rules to send message data to an AWS IoT Analytics Channel. Rules give your devices the ability to interact with AWS services. Rules are written in standard Structured Query Language (SQL). Rules are analyzed, and Actions are performed based on the MQTT topic stream. Below, we see an example rule that forwards our messages to IoT Analytics, in addition to AWS IoT Events and Amazon Kinesis Data Firehose.
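To give a sense of what such a rule contains, here is a hedged sketch of a topic rule payload expressed as a Python dictionary, in the shape I understand the AWS IoT CreateTopicRule API to accept. The SQL statement, the role ARN, and the function name are assumptions for illustration; the demonstration's actual rule is defined in its CloudFormation template and may differ.

```python
def build_topic_rule_payload(topic, channel_name, role_arn):
    """Build a rule that forwards every message on a topic to an IoT Analytics Channel."""
    return {
        "sql": f"SELECT * FROM '{topic}'",  # assumed query: select all attributes
        "awsIotSqlVersion": "2016-03-23",
        "ruleDisabled": False,
        "actions": [
            {"iotAnalytics": {"channelName": channel_name, "roleArn": role_arn}}
        ],
    }


payload = build_topic_rule_payload(
    topic="iot-device-data",
    channel_name="iot_analytics_channel",
    role_arn="arn:aws:iam::123456789012:role/example-role",  # placeholder ARN
)
print(payload["sql"])  # prints SELECT * FROM 'iot-device-data'
```

The FROM clause names the MQTT topic filter, and each entry in the actions list is one destination for the matching messages.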

rule

Simulating IoT Device Messages

Building and configuring multiple Raspberry Pi-based sensor arrays, and registering the devices with AWS IoT Core would require a lot of work just for this post. Therefore, I have provided everything you need to simulate the three IoT devices, on GitHub. Use the following command to git clone a local copy of the project.


git clone \
    --branch master --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/aws-iot-analytics-demo.git


AWS CloudFormation

Use the CloudFormation template, iot-analytics.yaml, to create an IoT Analytics stack containing (17) resources, including the following.

  • (3) AWS IoT Things
  • (1) AWS IoT Core Topic Rule
  • (1) AWS IoT Analytics Channel, Pipeline, Data store, and Data set
  • (1) AWS Lambda and Lambda Permission
  • (1) Amazon S3 Bucket
  • (1) Amazon SageMaker Notebook Instance
  • (5) AWS IAM Roles

Please be aware of the costs involved with the AWS resources used in the CloudFormation template before continuing. To build the AWS CloudFormation stack, run the following AWS CLI command.


aws cloudformation create-stack \
    --stack-name iot-analytics-demo \
    --template-body file://cloudformation/iot-analytics.yaml \
    --parameters ParameterKey=ProjectName,ParameterValue=iot-analytics-demo \
    ParameterKey=IoTTopicName,ParameterValue=iot-device-data \
    --capabilities CAPABILITY_NAMED_IAM


Below, we see a successful deployment of the IoT Analytics Demo CloudFormation Stack.

cfn_stack

Publishing Sample Messages

Once the CloudFormation stack is created successfully, use an included Python script, send_sample_messages.py, to send sample IoT data to an AWS IoT Topic, from your local machine. The script will use your AWS identity and credentials, instead of an actual IoT device registered with IoT Core. The IoT data will be intercepted by an IoT Topic Rule and redirected, using a Topic Rule Action, to the IoT Analytics Channel.
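The general pattern of publishing messages with AWS credentials, rather than from a registered device, might look like the sketch below. It is not the code from send_sample_messages.py; the function names are hypothetical, and the publisher is passed in as a callable so the loop can be tried without an AWS account. I am assuming the boto3 'iot-data' client's publish method, which takes topic, qos, and payload keyword arguments.

```python
import json


def send_messages(messages, topic, publish):
    """Publish each message as UTF-8 JSON via the supplied callable; return the count sent."""
    sent = 0
    for message in messages:
        publish(topic=topic, qos=1, payload=json.dumps(message).encode("utf-8"))
        sent += 1
    return sent


def make_boto3_publisher():
    """Return the publish method of a boto3 'iot-data' client (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so the rest of the sketch runs without boto3 installed

    return boto3.client("iot-data").publish


def print_publisher(topic, qos, payload):
    """Stand-in publisher for a dry run; prints instead of sending."""
    print(f"{topic} (qos={qos}): {payload.decode('utf-8')}")


sample = [{"device": "iot-device-01", "temp": 19.2}]
send_messages(sample, "iot-device-data", print_publisher)
```

To send to AWS instead of printing, pass make_boto3_publisher() as the third argument.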

First, we will ensure the IoT stack is running correctly on AWS by sending a few test messages. Go to the AWS IoT Core Test tab. Subscribe to the iot-device-data topic.

screen_shot_2020-07-13_at_2_06_32_pm

Then, run the following command using the smaller data file, raw_data_small.json.


cd sample_data/
time python3 ./send_sample_messages.py \
-f raw_data_small.json -t iot-device-data

If successful, you should see the five messages appear in the Test tab, shown above. Example output from the script is shown below.

screen_shot_2020-07-15_at_10.30.58_pm

Then, run the second command using the larger data file, raw_data_large.json, containing 9,995 messages (a few hours’ worth of data). The command will take approximately 12 minutes to complete.


time python3 ./send_sample_messages.py \
-f raw_data_large.json -t iot-device-data

Once the second command completes successfully, your IoT Analytics Channel should contain 10,000 unique messages. There is an optional extra-large data file containing approximately 50,000 IoT messages (24 hours of IoT messages).

AWS IoT Analytics

AWS IoT Analytics is composed of five primary components: Channels, Pipelines, Data stores, Data sets, and Notebooks. These components enable you to collect, prepare, store, analyze, and visualize your IoT data.

iot_analytics

Below, we see a typical AWS IoT Analytics architecture. IoT messages are pulled from AWS IoT Core, through a Rule Action. Amazon QuickSight provides business intelligence and visualization. Amazon QuickSight ML Insights adds anomaly detection and forecasting.

AWS_IoT_Diagram_02_IoT_Analytics_blog

IoT Analytics Channel

An AWS IoT Analytics Channel pulls messages or data into IoT Analytics from other AWS sources, such as Amazon S3, Amazon Kinesis, or AWS IoT Core. Channels store data for IoT Analytics Pipelines. Both Channels and Data stores support storing data in your own Amazon S3 bucket or in an IoT Analytics service-managed S3 bucket. In the demonstration, we are using a service-managed S3 bucket.

When creating a Channel, you also decide how long to retain the data. For the demonstration, we have set the data retention period to 14 days. Channels are generally not used for long-term storage of data. Typically, you would only retain data in the Channel for the time period you need to analyze. For long-term storage of IoT message data, I recommend using an AWS IoT Core Rule to send a copy of the raw IoT data to Amazon S3, using a service such as Amazon Kinesis Data Firehose.

screen_shot_2020-07-13_at_3_03_09_pm

IoT Analytics Pipeline

An AWS IoT Analytics Pipeline consumes messages from one or more Channels. Pipelines transform, filter, and enrich the messages before storing them in IoT Analytics Data stores. A Pipeline is composed of an array of activities. Logically, you must specify both a Channel (source) and a Datastore (destination) activity. Optionally, you may choose as many as 23 additional activities in the pipelineActivities array.
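The activity array can be pictured as a linked chain, where each activity names the one that follows it. The sketch below models a three-activity Pipeline as Python data, in the shape I understand the pipelineActivities parameter to take, and walks the chain from the Channel to the Data store. The activity names (source, to_fahrenheit, sink) and the math expression are hypothetical, not taken from the demonstration's template.

```python
def build_pipeline_activities(channel_name, datastore_name):
    """Return a minimal activity array: Channel -> Math -> Datastore."""
    return [
        {"channel": {"name": "source", "channelName": channel_name, "next": "to_fahrenheit"}},
        {"math": {"name": "to_fahrenheit", "attribute": "temp",
                  "math": "temp * 1.8 + 32", "next": "sink"}},
        {"datastore": {"name": "sink", "datastoreName": datastore_name}},
    ]


def activity_chain(activities):
    """Follow the 'next' pointers from the channel activity and return the names visited."""
    configs = {}
    start = None
    for activity in activities:
        for kind, config in activity.items():
            configs[config["name"]] = config
            if kind == "channel":
                start = config["name"]
    chain = []
    name = start
    while name is not None:
        chain.append(name)
        name = configs[name].get("next")  # the datastore activity has no 'next'
    return chain


activities = build_pipeline_activities("iot_analytics_channel", "iot_analytics_data_store")
print(" -> ".join(activity_chain(activities)))  # prints source -> to_fahrenheit -> sink
```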

In our demonstration’s Pipeline, iot_analytics_pipeline, we have specified five additional activities, including DeviceRegistryEnrich, Filter, Math, Lambda, and SelectAttributes. There are two additional Activity types we did not choose, RemoveAttributes and AddAttributes.

screen_shot_2020-07-14_at_3_11_01_pm

The demonstration’s Pipeline created by CloudFormation starts with messages from the demonstration’s Channel, iot_analytics_channel, similar to the following.


{
    "co": 0.004782974313835918,
    "device_id": "ae:c4:1d:34:1c:7b",
    "device": "iot-device-01",
    "humidity": 68.81000305175781,
    "light": true,
    "lpg": 0.007456714657976871,
    "msg_received": "2020-07-13T19:44:58.690+0000",
    "motion": false,
    "smoke": 0.019858593777432054,
    "temp": 19.200000762939453,
    "ts": 1594496359.235107
}

The demonstration’s Pipeline transforms the messages through a series of Pipeline Activities and then stores the resulting message in the demonstration’s Data store, iot_analytics_data_store. The resulting messages appear similar to the following.


{
    "co": 0.0048,
    "device": "iot-device-01",
    "humidity": 68.81,
    "light": true,
    "lpg": 0.0075,
    "metadata": "{defaultclientid=iot-device-01, thingname=iot-device-01, thingid=5de1c2af-14b4-49b5-b20b-b25cf251b01a, thingarn=arn:aws:iot:us-east-1:864887685992:thing/iot-device-01, thingtypename=null, attributes={installed=1594665292, latitude=37.4133144, longitude=-122.1513069}, version=2, billinggroupname=null}",
    "msg_received": "2020-07-13T19:44:58.690+0000",
    "motion": false,
    "smoke": 0.0199,
    "temp": 66.56,
    "ts": 1594496359.235107
}

In our demonstration, transformations to the messages include dropping the device_id attribute and converting the temp attribute value to Fahrenheit. In addition, the Lambda Activity rounds the temp, humidity, co, lpg, and smoke attribute values to between 2 and 4 decimal places of precision.
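The combined effect of these transformations can be reproduced locally with a few lines of Python. This sketch collapses what the Pipeline does across several separate activities into a single hypothetical function, transform, and is not the code of the demonstration's Lambda function; the input values are taken from the Channel message shown earlier.

```python
def transform(message):
    """Drop device_id, convert temp from Celsius to Fahrenheit, and round the readings."""
    result = {key: value for key, value in message.items() if key != "device_id"}
    result["temp"] = round(message["temp"] * 1.8 + 32, 2)
    result["humidity"] = round(message["humidity"], 2)
    for key in ("co", "lpg", "smoke"):
        result[key] = round(message[key], 4)
    return result


raw = {
    "co": 0.004782974313835918,
    "device_id": "ae:c4:1d:34:1c:7b",
    "device": "iot-device-01",
    "humidity": 68.81000305175781,
    "lpg": 0.007456714657976871,
    "smoke": 0.019858593777432054,
    "temp": 19.200000762939453,
}
print(transform(raw))
```

The rounded values match those in the stored message: 0.0048, 68.81, 0.0075, 0.0199, and 66.56.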

screen_shot_2020-07-14_at_4_38_33_pm

The demonstration’s Pipeline also enriches the message with the metadata attribute, containing metadata from the IoT device’s AWS IoT Core Registry. The metadata includes additional information about the device that generated the message, including custom attributes we input, such as location (longitude and latitude) and the device’s installation date.
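Because the metadata attribute arrives as a single string rather than as nested JSON, downstream code has to parse it to reach the custom attributes. One possible approach, sketched below with a regular expression, pulls out the latitude and longitude. The function name is hypothetical, and the metadata string is an abbreviated copy of the one in the example message.

```python
import re


def extract_coordinates(metadata):
    """Return (latitude, longitude) as floats, or None if either is missing."""
    lat = re.search(r"latitude=(-?\d+(?:\.\d+)?)", metadata)
    lon = re.search(r"longitude=(-?\d+(?:\.\d+)?)", metadata)
    if lat is None or lon is None:
        return None
    return float(lat.group(1)), float(lon.group(1))


# Abbreviated metadata string, based on the example message
metadata = (
    "{thingname=iot-device-01, attributes={installed=1594665292, "
    "latitude=37.4133144, longitude=-122.1513069}, version=2}"
)
print(extract_coordinates(metadata))  # prints (37.4133144, -122.1513069)
```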

screen_shot_2020-07-14_at_3_15_27_pm

A significant feature of Pipelines is the ability to reprocess messages. If you make a change to the Pipeline, which often happens during the data preparation stage, you can reprocess any or all messages in the associated Channel, and overwrite the messages in the Data store.

screen_shot_2020-07-14_at_4_55_24_pm

IoT Analytics Data store

An AWS IoT Analytics Data store stores prepared data from an AWS IoT Analytics Pipeline, in a fully-managed database. Both Channels and Data stores support storing data in your own Amazon S3 bucket or in an IoT Analytics service-managed S3 bucket. In the demonstration, we are using a service-managed S3 bucket to store messages in our Data store.

screen_shot_2020-07-13_at_3_03_22_pm

IoT Analytics Data set

An AWS IoT Analytics Data set automatically provides regular, up-to-date insights for data analysts by querying a Data store using standard SQL. Regular updates are provided through the use of a cron expression. For the demonstration, we are using a 15-minute interval.
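To illustrate what a 15-minute schedule means in practice, the sketch below lists the next few refresh times on quarter-hour boundaries. The expression string is the AWS-style cron form I would expect for this interval; it is an assumption, since the demonstration's exact expression is not shown in this post. The helper, next_runs, is hypothetical and only works for intervals that divide evenly into 60 minutes.

```python
from datetime import datetime, timedelta

# Assumed AWS-style schedule expression for "every 15 minutes"
SCHEDULE_EXPRESSION = "cron(0/15 * * * ? *)"


def next_runs(after, count, interval_minutes=15):
    """Return the next `count` run times on interval boundaries strictly after `after`."""
    base = after.replace(second=0, microsecond=0)
    minutes_past = base.minute % interval_minutes
    first = base + timedelta(minutes=interval_minutes - minutes_past)
    return [first + timedelta(minutes=interval_minutes * i) for i in range(count)]


for run in next_runs(datetime(2020, 7, 13, 19, 44, 58), 3):
    print(run.isoformat())
```

For a message received at 19:44:58, the next scheduled refreshes fall at 19:45, 20:00, and 20:15.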

Below, we see the sample messages in the Result preview pane of the Data set. These are the five test messages we sent to check the stack. Note the SQL query used to obtain the messages, which queries the Data store. The Data store, as you will recall, contains the transformed messages from the Pipeline.

screen_shot_2020-07-14_at_4_46_03_pm

IoT Analytics Data sets also support sending content results, which are materialized views of your IoT Analytics data, to an Amazon S3 bucket.

screen_shot_2020-07-14_at_3_32_18_pm

The CloudFormation stack contains an encrypted Amazon S3 Bucket. This bucket receives a copy of the messages from the IoT Analytics Data set whenever the scheduled update is run by the cron expression.

screen_shot_2020-07-14_at_3_38_40_pm

IoT Analytics Notebook

An AWS IoT Analytics Notebook allows users to perform statistical analysis and machine learning on IoT Analytics Data sets using Jupyter Notebooks. The IoT Analytics Notebook service includes a set of notebook templates that contain AWS-authored machine learning models and visualizations. Notebook Instances can be linked to a GitHub or other source code repository. Notebooks created with IoT Analytics Notebook can also be accessed directly through Amazon SageMaker. For the demonstration, the Notebook Instance is associated with the project’s GitHub repository.

screen_shot_2020-07-14_at_10_18_12_pm

The repository contains a sample Jupyter Notebook, IoT_Analytics_Demo_Notebook.ipynb, based on the conda_python3 kernel. This preinstalled environment includes the default Anaconda installation and Python 3. The Notebook uses pandas, matplotlib, and plotly to manipulate and visualize the sample IoT messages we published earlier and stored in the Data set.

screen_shot_2020-07-14_at_10_00_39_pm

screen_shot_2020-07-14_at_10_44_54_pm

screen_shot_2020-07-14_at_9_34_27_pm

screen_shot_2020-07-14_at_9_53_27_pm

Notebooks can be modified, and the changes pushed back to GitHub. You could easily fork a copy of my GitHub repository and modify the CloudFormation template, to include your own GitHub repository URL.

screen_shot_2020-07-14_at_10_14_51_pm

Amazon QuickSight

Amazon QuickSight provides business intelligence (BI) and visualization. Amazon QuickSight ML Insights adds anomaly detection and forecasting. We can use Amazon QuickSight to visualize the IoT message data, stored in the IoT Analytics Data set.

Amazon QuickSight has both a Standard and an Enterprise Edition. AWS provides a detailed product comparison of each edition. For the post, I am demonstrating the Enterprise Edition, which includes additional features, such as ML Insights, hourly refreshes of SPICE (super-fast, parallel, in-memory, calculation engine), and theme customization. Please be aware of the costs of Amazon QuickSight if you choose to follow along with this part of the demo. Amazon QuickSight is not enabled or configured by the demonstration’s CloudFormation template.

QuickSight Data Sets

Amazon QuickSight has a wide variety of data source options for creating Amazon QuickSight Data sets, including the ones shown below. Do not confuse Amazon QuickSight Data sets with IoT Analytics Data sets. These are two different, yet similar, constructs.

screen_shot_2020-07-15_at_8.50.26_am

For the demonstration, we will create an Amazon QuickSight Data set that will use our IoT Analytics Data set as a data source.

screen_shot_2020-07-15_at_9_06_50_am

Amazon QuickSight gives you the ability to modify QuickSight Data sets. For the demonstration, I have added two fields, converting the Boolean light and motion values of true and false to binary values of 1 and 0. I have also deselected two fields that I do not need for QuickSight Analysis.

screen_shot_2020-07-15_at_8.53.02_am

QuickSight provides a wide variety of functions, enabling us to perform dynamic calculations on the field values. Below, we see a new calculated field, light_dec, containing the original light field’s Boolean values converted to binary values. I am using an if...else formula to change the field’s value depending on the value in another field.
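The logic of the calculated field is simple enough to express in a few lines of Python. This is an equivalent sketch of the conversion, not QuickSight formula syntax; the function name is hypothetical, and the handling of string values is my assumption in case the Boolean arrives as text.

```python
def to_binary(value):
    """Map true/false (as a bool or as text) to 1/0."""
    if isinstance(value, str):
        return 1 if value.strip().lower() == "true" else 0
    return 1 if value else 0


print(to_binary(True), to_binary(False), to_binary("true"))  # prints 1 0 1
```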

screen_shot_2020-07-15_at_8.53.16_am

QuickSight Analysis

Using the QuickSight Data set, built from the IoT Analytics Data set as a data source, we create a QuickSight Analysis. The QuickSight Analysis user interface is shown below. An Analysis is primarily a collection of Visuals (Visual types). QuickSight provides a number of Visual types. Each visual is associated with a Data set. Data for the QuickSight Analysis or for each individual visual can be filtered. For the demo, I have created a QuickSight Analysis, including several typical QuickSight Visuals.

screen_shot_2020-07-10_at_12_21_32_pm

QuickSight Dashboards

To share a QuickSight Analysis, we can create a QuickSight Dashboard. Below, we see a few views of the QuickSight Analysis, shown above, as a Dashboard. A viewer of the Dashboard cannot edit the visuals, though they can apply filtering and interactively drill-down into data in the Visuals.

screen_shot_2020-07-15_at_11_38_17_am

screen_shot_2020-07-15_at_11_40_17_am

screen_shot_2020-07-15_at_11_43_23_am

Geospatial Data

Amazon QuickSight understands geospatial data. If you recall, in the IoT Analytics Pipeline, we enriched the messages with metadata from the device registry. The metadata attributes contained the device’s longitude and latitude. QuickSight will recognize those fields as geographic fields. In our QuickSight Analysis, we can visualize the geospatial data, using the geospatial chart (map) Visual type.

screen_shot_2020-07-10_at_12_22_12_pm

QuickSight Mobile App

Amazon QuickSight offers free iOS and Android versions of the Amazon QuickSight Mobile App. The mobile application makes it easy for registered QuickSight end-users to securely connect to QuickSight Dashboards, using their mobile devices. Below, we see two views of the same Dashboard, shown in the iOS version of the Amazon QuickSight Mobile App.

mobile_quicksight

Amazon QuickSight ML Insights

According to Amazon, ML Insights leverages AWS’s machine learning (ML) and natural language capabilities to gain deeper insights from data. QuickSight’s ML-powered Anomaly Detection continuously analyzes data to discover anomalies and variations inside of the aggregates, giving you the insights to act when business changes occur. QuickSight’s ML-powered Forecasting can be used to accurately predict your business metrics, and perform interactive what-if analysis with point-and-click simplicity. QuickSight’s built-in algorithms make it easy for anyone to use ML that learns from your data patterns to provide you with accurate predictions based on historical trends.

Below, we see the ML Insights tab in the demonstration’s QuickSight Analysis. Individually detected anomalies can be added to the QuickSight Analysis, similar to Visuals, and configured to tune the detection parameters.

screen_shot_2020-07-15_at_12_04_21_pm

Below, we see an example of humidity anomalies across all devices, ranked by their Anomaly Score, where the values are higher or lower than expected by a minimum delta of five percent.

screen_shot_2020-07-15_at_12_24_12_pm

Cleaning Up

You are charged hourly for the SageMaker Notebook Instance. Do not forget to delete your CloudFormation stack when you are done with the demonstration. Note the Amazon S3 bucket will not be deleted; you must do this manually.


aws cloudformation delete-stack \
    --stack-name iot-analytics-demo

Conclusion

In this post, we demonstrated how to use AWS IoT Analytics to analyze and visualize streaming messages from multiple IoT devices, in near real-time. Combined with other AWS IoT analytics services, such as AWS IoT SiteWise, AWS IoT Events, and AWS IoT Things Graph, you can create a robust, full-featured IoT Analytics platform, capable of handling millions of industrial, commercial, and residential IoT devices, generating petabytes of data.

This blog represents my own viewpoints and not those of my employer, Amazon Web Services.


Architecting a Successful SaaS: Understanding Cloud-based SaaS Models

Originally published on the AWS APN Blog.

Introduction

You’re a startup with an idea for a revolutionary new software product. You quickly build a beta version and deploy it to the cloud. After a successful social-marketing campaign and concerted sales effort, dozens of customers subscribe to your SaaS-based product. You’re ecstatic…until you realize you never architected your product for this level of success. You were so busy coding, raising capital, marketing, and selling, you never planned how you would scale your SaaS product. How you would ensure your customers’ security, as well as your own. How you would meet the product reliability, compliance, and performance you promised. And, how you would monitor and meter your customers’ usage, no matter how fast you or they grew.

I’ve often heard budding entrepreneurs jest, if only success was their biggest problem. Certainly, success won’t be their biggest problem. For many, the problems come afterward, when they disappoint their customers by failing to deliver the quality product they promised. Or worse, when they damage their customers’ reputations (and their own) by losing or exposing sensitive data. As the old saying goes, ‘you never get a second chance to make a first impression.’ Customer trust is hard-earned and easily lost. Properly architecting a scalable and secure SaaS-based product is just as important as feature development and sales. No one wants to fail on Day 1; you worked too hard to get there.

Architecting a Successful SaaS

In this series of posts, Architecting a Successful SaaS, we will explore how to properly plan and architect a SaaS product offering, designed for hosting on the cloud. We will start by answering basic questions, like, what is SaaS, what are the alternatives to SaaS for software distribution, and what are the most common SaaS product models. We will then examine different high-level SaaS architectures, review tenant isolation strategies, and explore how SaaS vendors securely interact with their customers’ cloud accounts. Finally, we will discuss how SaaS providers can meet established best practices, like those from AWS SaaS Factory and the AWS Well-Architected Framework.

For this post, I have chosen many examples of cloud services from AWS and vendors from AWS Marketplace. However, the principles discussed may be applied to other leading cloud providers, SaaS products, and cloud-based software marketplaces. All information in this post is publicly available.

What is SaaS?

According to AWS Marketplace, ‘SaaS [Software as a Service] is a delivery model for software applications whereby the vendor hosts and operates the application over the Internet. Customers pay for using the software without owning the underlying infrastructure.’ Another definition from AWS, ‘SaaS is a licensing and delivery model whereby software is centrally managed and hosted by a provider and available to customers on a subscription basis.’

A SaaS product, like other forms of software, is produced by what is commonly referred to as an Independent Software Vendor (ISV). According to Wikipedia, an Independent Software Vendor ‘is an organization specializing in making and selling software, as opposed to hardware, designed for mass or niche markets. This is in contrast to in-house software, which is developed by the organization that will use it, or custom software, which is designed or adapted for a single, specific third party. Although ISV-provided software is consumed by end-users, it remains the property of the vendor.’

Although estimates vary greatly, according to The Software as a Service (SaaS) Global Market Report 2020, the global SaaS market was valued at about $134.44B in 2018 and is expected to grow to $220.21B at a compound annual growth rate (CAGR) of 13.1% through 2022. Statista predicts SaaS revenues will grow even faster, forecasting revenues of $266B by 2022, with continued strong positive growth to $346B by 2027.
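As a quick sanity check on the first estimate, the compound growth arithmetic can be reproduced in a few lines. This is only a sketch: it assumes four annual compounding periods (2018 through 2022), and the function names are my own, not taken from the report.

```python
def project_value(start_value: float, annual_rate: float, years: int) -> float:
    """Compound start_value forward at annual_rate for the given number of years."""
    return start_value * (1 + annual_rate) ** years


def implied_cagr(start_value: float, end_value: float, years: int) -> float:
    """Return the compound annual growth rate that links start_value to end_value."""
    return (end_value / start_value) ** (1 / years) - 1


# Figures quoted above, in billions of US dollars.
# Assumption: 2018 -> 2022 is treated as four compounding periods.
projected_2022 = project_value(134.44, 0.131, 4)
growth_rate = implied_cagr(134.44, 220.21, 4)

print(f"Projected 2022 market size: ${projected_2022:.2f}B")
print(f"Implied CAGR, 2018 to 2022: {growth_rate:.1%}")
```

Under that assumption, the projected value lands within about one percent of the quoted $220.21B, so the report's figures are internally consistent.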

Cloud-based Usage Models

Let’s start by reviewing the three most common ways that individuals, businesses, academic institutions, the public sector, and government consume services from cloud providers such as Amazon Web Services (AWS), Microsoft Azure, Google Cloud, and IBM Cloud (now includes Red Hat).

Indirect Consumer

Indirect consumers are customers who consume cloud-based SaaS products. Indirect consumers are often unlikely to know which cloud provider hosts the SaaS products to which they subscribe. Many SaaS products can import and export data, as well as integrate with other SaaS products. Many successful companies run their entire business in the cloud using a combination of SaaS products from multiple vendors.

Examples

  • An advertising firm that uses Google G Suite for day-to-day communications and collaboration between its employees and clients.
  • A large automotive parts manufacturer that runs its business using the Workday cloud-based Enterprise Resource Planning (ERP) suite.
  • A software security company that uses Zendesk for customer support. They also use the Slack integration for Zendesk to view, create, and take action on support tickets, using Slack channels.
  • A recruiting firm that uses Zoom Meetings & Chat to interview remote candidates. They also use the Zoom integration with Lever recruiting software, to schedule interviews.

Direct Consumer

Direct consumers are customers who use cloud-based Infrastructure as a Service (IaaS) and Platform as a Service (PaaS) services to build and run their software; the DIY (do it yourself) model. The software deployed in the customer’s account may be created by the customer or purchased from a third-party software vendor and deployed within the customer’s cloud account. Direct users may purchase IaaS and PaaS services from multiple cloud providers.

Examples

Hybrid Consumer

Hybrid consumers are customers who use a combination of IaaS, PaaS, and SaaS services. Customers often connect multiple IaaS, PaaS, and SaaS services as part of larger enterprise software application platforms.

Examples

  • A payroll company that hosts its proprietary payroll software product, using IaaS products like Amazon EC2 and Elastic Load Balancing. In addition, they use an integrated SaaS-based fraud detection product, like Cequence Security CQ botDefense, to ensure the safety and security of payroll customers.
  • An online gaming company that operates its applications using the fully-managed container-based PaaS service, Amazon ECS. To promote their gaming products, they use a SaaS-based marketing product, like Mailchimp Marketing CRM.

Cloud-based Software

Most cloud-based software is sold in one of two ways, Customer-deployed or SaaS. Below, we see a breakdown by the method of product delivery on AWS Marketplace. All items in the chart, except SaaS, represent Customer-deployed products. Serverless applications are available elsewhere on AWS and are not represented in the AWS Marketplace statistics.

AWS Marketplace: All Products – Delivery Methods (February 2020)

Customer-deployed

An ISV sells customer-deployed software products to consumers of cloud-based IaaS and PaaS services. Products are installed by the customer, a Systems Integrator (SI), or the ISV into the customer’s cloud account. Customer-deployed products are reminiscent of traditional ‘boxed’ software.

Customers typically pay a recurring hourly, monthly, or annual subscription fee for the software product, commonly referred to as pay-as-you-go (PAYG). The subscription fee paid to the vendor is in addition to the fees charged to the customer by the cloud service provider for the underlying compute resources on which the customer-deployed product runs in the customer’s cloud account.
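To make the two separate charges concrete, here is a minimal sketch of how a customer might estimate the monthly bill for an hourly PAYG product. The rates are hypothetical placeholders, not actual AWS Marketplace or Amazon EC2 prices, and the 730-hour month is a common approximation rather than a billing rule.

```python
from dataclasses import dataclass

# Assumption: 730 hours approximates one month of continuous use.
HOURS_PER_MONTH = 730


@dataclass
class CustomerDeployedProduct:
    software_rate_per_hour: float        # PAYG fee paid to the ISV (hypothetical)
    infrastructure_rate_per_hour: float  # compute fee paid to the cloud provider (hypothetical)

    def monthly_costs(self, hours: float = HOURS_PER_MONTH) -> dict:
        """Split the estimated bill into the vendor's fee and the provider's fee."""
        software = self.software_rate_per_hour * hours
        infrastructure = self.infrastructure_rate_per_hour * hours
        return {
            "software": software,
            "infrastructure": infrastructure,
            "total": software + infrastructure,
        }


product = CustomerDeployedProduct(
    software_rate_per_hour=0.25,
    infrastructure_rate_per_hour=0.10,
)
costs = product.monthly_costs()
for line_item, amount in costs.items():
    print(f"{line_item:>15}: ${amount:,.2f}")
```

The point of keeping the two line items apart is that they are billed by different parties, even when both appear on the same cloud invoice.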

Some customer-deployed products may also require a software license. Software licenses are often purchased separately through other channels. Applying a license you already own to a newly purchased product is commonly referred to as bring your own license (BYOL). BYOL is common in larger enterprise customers, who may have entered into an Enterprise License Agreement (ELA) with the ISV.

AWS Marketplace: Customer-deployed Product Subscription Types (February 2020)

Customer-deployed cloud-based software products can take a variety of forms. The most common deliverables include some combination of virtual machines (VMs) such as Amazon Machine Images (AMIs), Docker images, Amazon SageMaker models, or Infrastructure as Code such as AWS CloudFormation, HashiCorp Terraform, or Helm Charts. Customers usually pull these deliverables from a vendor’s AWS account or other public or private source code or binary repositories. Below, we see the breakdown of customer-deployed products, by the method of delivery, on AWS Marketplace.

AWS Marketplace: Customer-deployed Product Delivery Methods (February 2020)

Although AMIs have historically been the predominant method of customer-deployed software delivery, newer technologies, such as Docker images, serverless, SageMaker models, and AWS Data Exchange datasets, will continue to grow in this segment. The AWS Serverless Application Repository (SAR) currently contains over 500 serverless applications, not reflected in this chart. AWS appears to be moving toward making it easier to sell serverless software applications in AWS Marketplace, according to one recent post.

Customer-deployed cloud-based software products may require a connection between the installed product and the ISV for product support, license verification, product upgrades, or security notifications.

Examples

SaaS

An ISV sells SaaS software products to customers. The SaaS product is deployed, managed, and sold by the ISV and hosted by a cloud provider, such as AWS. A SaaS product may or may not interact with a customer’s cloud account. SaaS products are similar to customer-deployed products with respect to their subscription-based fee structure. Subscriptions may be based on a unit of measure, often a period of time. Subscriptions may also be based on the number of users, requests, hosts, or the volume of data.

AWS Marketplace: SaaS Products – Delivery Methods (February 2020)
AWS Marketplace: SaaS Products – Pricing Plans (February 2020)

A significant difference between SaaS products and customer-deployed products is the lack of direct customer costs from the underlying cloud provider. The underlying costs are bundled into the subscription fee for the SaaS product.

Similar to Customer-deployed products, SaaS products target both consumers and businesses. SaaS products span a wide variety of consumer, business, industry-specific, and technical categories. AWS Marketplace offers products from vendors covering eight major categories and over 70 sub-categories.

AWS Marketplace: SaaS Product Categories (February 2020)

SaaS Product Variants

I regularly work with a wide variety of cloud-based software vendors. In my experience, most cloud-based SaaS products fit into one of four categories, based on the primary way a customer interacts with the SaaS product:

  • Stand-alone: A SaaS product that has no interaction with the customer’s cloud account;
  • Data Access: A SaaS product that connects to the customer’s cloud account to only obtain data;
  • Augmentation: A SaaS product that connects to the customer’s cloud account, interacting with and augmenting the customer’s software;
  • Discrete Service: A variation of augmentation, a SaaS product that provides a discrete service or function as opposed to a more complete software product.

Stand-alone

A stand-alone SaaS product has no interaction with a customer’s cloud account. Customers of stand-alone SaaS products interact with the product through an interface provided by the SaaS vendor. Many stand-alone SaaS products can import and export customer data, as well as integrate with other cloud-based SaaS products. Stand-alone SaaS products may target consumers, known as Business-to-Consumer (B2C SaaS). They may also target businesses, known as Business-to-Business (B2B SaaS).

Examples

Data Access

A Data Access SaaS product connects to a customer’s data sources in their cloud account or on-prem. These SaaS products often fall into the categories of Big Data and Data Analytics, Machine Learning and Artificial Intelligence, and IoT (Internet of Things). Products in these categories work with large quantities of data. Given the sheer quantity of data or real-time nature of the data, importing or manually inputting data directly into the SaaS product through the SaaS vendor’s user interface is unrealistic. Often, these SaaS products will cache some portion of the customer’s data to reduce the customer’s data transfer costs.

Similar to the previous stand-alone SaaS products, customers of these SaaS products interact with the product through a user interface provided by the SaaS vendor.

Examples

  • Zepl provides an enterprise data science analytics platform, which enables data exploration, analysis, and collaboration. Zepl sells its Zepl Science and Analytics Platform SaaS product on AWS Marketplace. The Zepl product provides integration to many types of customer data sources including Snowflake, Amazon S3, Amazon Redshift, Amazon Athena, Google BigQuery, Apache Cassandra (Amazon MCS), and other SQL databases.
  • Sisense provides an enterprise-grade, cloud-native business intelligence and analytics platform, powered by AI. Sisense offers its Sisense Business Intelligence SaaS product on AWS Marketplace. This product lets customers prepare and analyze disparate big datasets using Sisense’s Data Connectors. The wide array of connectors provides connectivity to dozens of different cloud-based and on-prem data sources.
  • Databricks provides a unified data analytics platform, designed for massive-scale data engineering and collaborative data science. Databricks offers its Databricks Unified Analytics Platform SaaS product on AWS Marketplace. Databricks allows customers to interact with data across many different data sources, data storage types, and data types, including batch and streaming.
  • DataRobot provides an enterprise AI platform, which enables global enterprises to collaboratively harness the power of AI. DataRobot sells its DataRobot Automated Machine Learning for AWS SaaS product on AWS Marketplace. Using connectors, like Skyvia’s OData connector, customers can connect their data sources to the DataRobot product.

Augmentation

An augmentation SaaS product interacts with, or augments, a customer’s application, which is managed by the customer in their own cloud account. These SaaS products often maintain secure, loosely-coupled, unidirectional or bidirectional connections between the vendor’s SaaS product and the customer’s account. Vendors on AWS often use services like Amazon EventBridge, AWS PrivateLink, VPC Peering, Amazon S3, Amazon Kinesis, Amazon SQS, and Amazon SNS to interact with customers’ accounts and exchange data. Often, these SaaS products fall within the categories of Security, Logging and Monitoring, and DevOps.

Customers of these types of SaaS products generally interact with their own software, as well as the SaaS product through an interface provided by the SaaS vendor.
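One loosely-coupled pattern that can be built from the services listed above is an Amazon S3 event notification published to an Amazon SNS topic, which in turn delivers to an Amazon SQS queue that the vendor polls. The sketch below shows only the consumer side, unwrapping such a message to find which objects were written. It assumes SNS raw message delivery is disabled (so the S3 event arrives as a JSON string in the `Message` field), and the bucket name, object key, and function name are hypothetical.

```python
import json
from typing import List, Tuple
from urllib.parse import unquote_plus


def extract_s3_objects(sqs_message_body: str) -> List[Tuple[str, str]]:
    """Return (bucket, key) pairs from an SQS message body that wraps an
    SNS notification, which in turn wraps an S3 event."""
    sns_envelope = json.loads(sqs_message_body)
    # SNS carries the original payload as a JSON string in "Message".
    s3_event = json.loads(sns_envelope["Message"])
    objects = []
    for record in s3_event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys in S3 event notifications are URL-encoded.
        key = unquote_plus(record["s3"]["object"]["key"])
        objects.append((bucket, key))
    return objects


# Hand-built sample message; a real one would be received from the SQS queue.
sample_s3_event = {
    "Records": [
        {
            "eventSource": "aws:s3",
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "example-customer-logs"},
                "object": {"key": "AWSLogs/2020/02/app+log.json.gz"},
            },
        }
    ]
}
sample_sqs_body = json.dumps(
    {"Type": "Notification", "Message": json.dumps(sample_s3_event)}
)

print(extract_s3_objects(sample_sqs_body))
```

Because the vendor only reads from a queue, the customer's application never calls the SaaS product directly, which is what keeps the two sides loosely coupled.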

Examples

  • CloudCheckr provides solutions that enable clients to optimize costs, security, and compliance on leading cloud providers. CloudCheckr sells its Cloud Management Platform SaaS product on AWS Marketplace. CloudCheckr uses an AWS IAM cross-account role and Amazon S3 to exchange data between the customer’s account and their SaaS product.
  • Splunk provides the leading software platform for real-time Operational Intelligence. Splunk sells its Splunk Cloud SaaS product on AWS Marketplace. Splunk Cloud enables rapid application troubleshooting, ensures security and compliance, and provides monitoring of business-critical services in real-time. According to their documentation, Splunk uses a combination of Amazon S3, Amazon SQS, and Amazon SNS services to transfer AWS CloudTrail logs from the customer’s accounts to Splunk Cloud.
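For readers unfamiliar with cross-account roles, like the one mentioned in the CloudCheckr example, the sketch below builds the two JSON policy documents a customer would typically attach to such a role: a trust policy that lets the vendor’s AWS account assume the role, and a permissions policy limited to reading one bucket. It is a generic illustration, not any vendor’s actual configuration. The account ID, external ID, bucket name, and function names are all hypothetical.

```python
import json


def build_trust_policy(vendor_account_id: str, external_id: str) -> dict:
    """Trust policy: who may assume the role, and under what condition."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": f"arn:aws:iam::{vendor_account_id}:root"},
                "Action": "sts:AssumeRole",
                # The external ID guards against the confused deputy problem.
                "Condition": {"StringEquals": {"sts:ExternalId": external_id}},
            }
        ],
    }


def build_read_only_bucket_policy(bucket_name: str) -> dict:
    """Permissions policy: what the role may do once assumed."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "s3:ListBucket",
                "Resource": f"arn:aws:s3:::{bucket_name}",
            },
            {
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": f"arn:aws:s3:::{bucket_name}/*",
            },
        ],
    }


# Hypothetical values for illustration only.
trust_policy = build_trust_policy("111122223333", "example-external-id")
bucket_policy = build_read_only_bucket_policy("example-billing-reports")

print(json.dumps(trust_policy, indent=2))
print(json.dumps(bucket_policy, indent=2))
```

Scoping the permissions policy to a single bucket and to read-only actions follows the principle of least privilege: the vendor gets only the access its product needs.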

Discrete Service

Discrete SaaS products are a variation of SaaS augmentation products. Discrete SaaS products provide specific, distinct functionality to a customer’s software application. These products may be an API, data source, or machine learning model, which is often accessed completely through a vendor’s API. The products have a limited or no visual user interface. These SaaS products are sometimes referred to as a ‘Service as a Service’. Discrete SaaS products often fall into the categories of Artificial Intelligence and Machine Learning, Financial Services, Reference Data, and Authentication and Authorization.

Examples

AWS Data Exchange

There is a new category of products on AWS Marketplace. Released in November 2019, AWS Data Exchange makes it easy to find, subscribe to, and use third-party data in the cloud. According to AWS, Data Exchange vendors can publish new data, as well as automatically publish revisions to existing data and notify subscribers. Once subscribed to a data product, customers can use the AWS Data Exchange API to load data into Amazon S3 and then analyze it with a wide variety of AWS analytics and machine learning services.

Data Exchange seems to best fit the description of a customer-deployed product. However, given the nature of the vendor-subscriber relationship, where data may be regularly exchanged—revised and published by the vendor and pulled by the subscriber—I would consider Data Exchange a cloud-based hybrid product.

AWS Data Exchange products are available on AWS Marketplace. The list of qualified data providers is growing and includes Reuters, Foursquare, TransUnion, Pitney Bowes, IMDb, Epsilon, ADP, Dun & Bradstreet, and others. As illustrated below, data sets are available in the categories of financial services, public sector, healthcare, media, telecommunications, and more.

AWS Marketplace: Data Exchange Product Categories (February 2020)

Examples

Conclusion

In this first post, we’ve become familiar with the common ways in which customers consume cloud-based IaaS, PaaS, and SaaS products and services. We also explored the different ways in which ISVs sell their software products to customers. In future posts, we will examine different high-level SaaS architectures, review tenant isolation strategies, and explore how SaaS vendors securely interact with their customer’s cloud accounts. Finally, we will discuss how SaaS providers can meet best practices, like those from AWS SaaS Factory and the AWS Well-Architected Framework.

This blog represents my own viewpoints and not those of my employer, Amazon Web Services.