Archive for category SQL

Ten Ways to Leverage Generative AI for Development on AWS

Explore ten ways you can use Generative AI coding tools to accelerate development and increase your productivity on AWS

Licensed image: PopTika/Shutterstock.com
Licensed image: PopTika/Shutterstock.com

Generative AI coding tools are a new class of software development tools that leverage machine learning algorithms to assist developers in writing code. These tools use AI models trained on vast amounts of code to offer suggestions for completing code snippets, writing functions, and even entire blocks of code.

Quote generated by OpenAI ChatGPT

Introduction

Combining the latest Generative AI coding tools with a feature-rich and extensible IDE and your coding skills will accelerate development and increase your productivity. In this post, we will look at ten examples of how you can use Generative AI coding tools on AWS:

  1. Application Development: Code, unit tests, and documentation
  2. Infrastructure as Code (IaC): AWS CloudFormation, AWS CDK, Terraform, and Ansible
  3. AWS Lambda: Serverless, event-driven functions
  4. IAM Policies: AWS IAM policies and Amazon S3 bucket policies
  5. Structured Query Language (SQL): Amazon RDS, Amazon Redshift, Amazon Athena, and Amazon EMR
  6. Big Data: Apache Spark and Flink on Amazon EMR, AWS Glue, and Kinesis Data Analytics
  7. Configuration and Properties files: Amazon MSK, Amazon EMR, and Amazon OpenSearch
  8. Apache Airflow DAGs: Amazon MWAA
  9. Containerization: Kubernetes resources, Helm Charts, Dockerfiles for Amazon EKS
  10. Utility Scripts: PowerShell, Bash, Shell, and Python

Choosing a Generative AI Coding Tool

In my recent post, Accelerating Development with Generative AI-Powered Coding Tools, I reviewed six popular tools: ChatGPT, Copilot, CodeWhisperer, Tabnine, Bing, and ChatSonic.

For this post, we will use GitHub Copilot, powered by OpenAI Codex, a new AI system created by OpenAI. Copilot suggests code and entire functions in real-time, right from your IDE. Copilot is trained in all languages that appear in GitHub’s public repositories. GitHub points out that the quality of suggestions you receive may depend on the volume and diversity of training data for that language. Similar tools in this category are limited in the number of languages they support compared to Copilot.

GitHub Copilot, your AI pair programmer

Copilot is currently available as an extension for Visual Studio Code, Visual Studio, Neovim, and JetBrains suite of IDEs. The GitHub Copilot extension for Visual Studio Code (VS Code) already has 4.8 million downloads, and the GitHub Copilot Nightly extension, used for this post, has almost 280,000 downloads. I am also using the GitHub Copilot Labs extension in this post.

GitHub Copilot Nightly VS Code extensions used in this post

Ten Ways to Leverage Generative AI

Take a look at ten examples of how you can use Generative AI coding tools to increase your development productivity on AWS. All the code samples in this post can be found on GitHub.

1. Application Development

According to GitHub, trained on billions of lines of code, GitHub Copilot turns natural language prompts into coding suggestions across dozens of languages. These features make Copilot ideal for developing applications, writing unit tests, and authoring documentation. You can use GitHub Copilot to assist with writing software applications in nearly any popular language, including Go.

Code generation of a Go application using GitHub Copilot

The final application, which uses the AWS SDK for Go to create an Amazon DynamoDB table, shown below, was formatted using the Go extension by Google and optimized using the ‘Readable,’ ‘Make Robust,’ and ‘Fix Bug’ GitHub Code Brushes.

The final Go application ran in the VS Code terminal

Generating Unit Tests

Using JavaScript and TypeScript, you can take advantage of TestPilot to generate unit tests based on your existing code and documentation. TestPilot, part of GitHub Copilot Labs, uses GitHub Copilot’s AI technology.

Generating unit tests with GitHub TestPilot, part of Copilot

2. Infrastructure as Code (IaC)

Widespread Infrastructure as Code (IaC) tools include Pulumi, AWS CloudFormation, Azure ARM Templates, Google Deployment Manager, AWS Cloud Development Kit (AWS CDK), Microsoft Bicep, and Ansible. Many IaC tools, except AWS CDK, use JSON- or YAML-based domain-specific languages (DSLs).

AWS CloudFormation
AWS CloudFormation is an Infrastructure as Code (IaC) service that allows you to easily model, provision, and manage AWS and third-party resources. The CloudFormation template is a JSON or YAML formatted text file. You can use GitHub Copilot to assist with writing IaC, including AWS CloudFormation in either JSON or YAML.

Code generation of an AWS CloudFormation template using GitHub Copilot

You can use the YAML Language Support by Red Hat extension to write YAML in VS Code.

Final YAML-based AWS CloudFormation template using GitHub Copilot

VS Code has native JSON support with JSON Schema Store, which includes AWS CloudFormation. VS Code uses the CloudFormation schema for IntelliSense and flag schema errors in templates.

Final JSON-based AWS CloudFormation template using GitHub Copilot

HashiCorp Terraform

In addition to AWS CloudFormation, HashiCorp Terraform is an extremely popular IaC tool. According to HashiCorp, Terraform lets you define resources and infrastructure in human-readable, declarative configuration files and manages your infrastructure’s lifecycle. Using Terraform has several advantages over manually managing your infrastructure.

Terraform plugins called providers let Terraform interact with cloud platforms and other services via their application programming interfaces (APIs). You can use the AWS Provider to interact with the many resources supported by AWS.

Code generation of a HashiCorp Terraform file using GitHub Copilot

3. AWS Lambda

Lambda, according to AWS, is a serverless, event-driven compute service that lets you run code for virtually any application or backend service without provisioning or managing servers. You can trigger Lambda from over 200 AWS services and software as a service (SaaS) applications and only pay for what you use. AWS Lambda natively supports Java, Go, PowerShell, Node.js, C#, Python, and Ruby. AWS Lambda also provides a Runtime API allowing you to use additional programming languages to author your functions.

You can use GitHub Copilot to assist with writing AWS Lambda functions in any of the natively supported languages. You can further optimize the resulting Lambda code with GitHub’s Code Brushes.

Code generation of a Python-based AWS Lambda using GitHub Copilot

The final Python-based AWS Lambda, below, was formatted using the Black Formatter and Flake8 extensions and optimized using the ‘Readable,’ ‘Debug,’ ‘Make Robust,’ and ‘Fix Bug’ GitHub Code Brushes.

Final AWS Python-based Lambda using GitHub Copilot and optimized with Code Brushes

You can easily convert the Python-based AWS Lambda to Java using GitHub Copilot Lab’s ability to translate code between languages. Install the GitHub Copilot Labs extension for VS Code to try out language translation.

Final AWS Java-based Lambda using GitHub Copilot converted from Python by Copilot

4. IAM Policies

AWS Identity and Access Management (AWS IAM) is a web service that helps you securely control access to AWS resources. According to AWS, you manage access in AWS by creating policies and attaching them to IAM identities (users, groups of users, or roles) or AWS resources. A policy is an object in AWS that defines its permissions when associated with an identity or resource. IAM policies are stored on AWS as JSON documents. You can use GitHub Copilot to assist in writing IAM Policies.

Code generation of an AWS IAM Policy using GitHub Copilot

The final AWS IAM Policy, below, was formatted using VS Code’s built-in JSON support.

Final AWS IAM Policy using GitHub Copilot

5. Structured Query Language (SQL)

SQL has many use cases on AWS, including Amazon Relational Database Service (RDS) for MySQL, PostgreSQL, MariaDB, Oracle, and SQL Server databases. SQL is also used with Amazon Aurora, Amazon Redshift, Amazon Athena, Apache Presto, Trino (PrestoSQL), and Apache Hive on Amazon EMR.

You can use IDEs like VS Code with its SQL dialect-specific language support and formatted extensions. You can further optimize the resulting SQL statements with GitHub’s Code Brushes.

Code generation of a PostgreSQL script using GitHub Copilot

The final PostgreSQL script, below, was formatted using the Sql Formatter extension and optimized using the ‘Readable’ and ‘Fix Bug’ GitHub Code Brushes.

PostgreSQL script generated with GitHub Copilot and optimized with Code Brushes

6. Big Data

Big Data, according to AWS, can be described in terms of data management challenges that — due to increasing volume, velocity, and variety of data — cannot be solved with traditional databases. AWS offers managed versions of Apache Spark, Apache Flink, Apache Zepplin, and Jupyter Notebooks on Amazon EMR, AWS Glue, and Amazon Kinesis Data Analytics (KDA).

Apache Spark
According to their website, Apache Spark is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters. Spark jobs can be written in various languages, including Python (PySpark), SQL, Scala, Java, and R. Apache Spark is available on a growing number of AWS services, including Amazon EMR and AWS Glue.

Code generation of a Python-based Apache Spark job using GitHub Copilot

The final Python-based Apache Spark job, below, was formatted using the Black Formatter extension and optimized using the ‘Readable,’ ‘Document,’ ‘Make Robust,’ and ‘Fix Bug’ GitHub Code Brushes.

Final Python-based Apache Spark job using GitHub Copilot

7. Configuration and Properties Files

According to TechTarget, a configuration file (aka config) defines the parameters, options, settings, and preferences applied to operating systems, infrastructure devices, and applications. There are many examples of configuration and properties files on AWS, including Amazon MSK Connect (Kafka Connect Source/Sink Connectors), Amazon OpenSearch (Filebeat, Logstash), and Amazon EMR (Apache Log4j, Hive, and Spark).

Kafka Connect
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. AWS offers a fully-managed version of Kafka Connect: Amazon MSK Connect. You can use GitHub Copilot to write Kafka Connect Source and Sink Connectors with Kafka Connect and Amazon MSK Connect.

Code generation of a Kafka Connect Source Connector using GitHub Copilot

The final Kafka Connect Source Connector, below, was formatted using VS Code’s built-in JSON support. It incorporates the Debezium connector for MySQL, Avro file format, schema registry, and message transformation. Debezium is a popular open source distributed platform for performing change data capture (CDC) with Kafka Connect.

Final Kafka Connect Source Connector using GitHub Copilot

8. Apache Airflow DAGs

Apache Airflow is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows. Airflow’s extensible Python framework enables you to build workflows connecting with virtually any technology. DAG (Directed Acyclic Graph) is the core concept of Airflow, collecting Tasks together, organized with dependencies and relationships to say how they should run.

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow. You can use GitHub Copilot to assist in writing DAGs for Apache Airflow, to be used with Amazon MWAA.

Code generation of an Airflow DAG using GitHub Copilot

The final Python-based Apache Spark job, below, was formatted using the Black Formatter extension. Unfortunately, based on my testing, code optimization with GitHub’s Code Brushes is impossible with Airflow DAGs.

Final Airflow DAG using GitHub Copilot

9. Containerization

According to Check Point Software, Containerization is a type of virtualization in which all the components of an application are bundled into a single container image and can be run in isolated user space on the same shared operating system. Containers are lightweight, portable, and highly conducive to automation. AWS describes containerization as a software deployment process that bundles an application’s code with all the files and libraries it needs to run on any infrastructure.

AWS has several container services, including Amazon Elastic Container Service (Amazon ECS), Amazon Elastic Kubernetes Service (Amazon EKS), Amazon Elastic Container Registry (Amazon ECR), and AWS Fargate. Several code-based resources can benefit from a Generative AI coding tool like GitHub Copilot, including Dockerfiles, Kubernetes resources, Helm Charts, Weaveworks Flux, and ArgoCD configuration.

Kubernetes

Kubernetes objects are represented in the Kubernetes API and expressed in YAML format. Below is a Kubernetes Deployment resource file, which creates a ReplicaSet to bring up multiple replicas of nginx Pods.

Code generation of Kubernetes resources using GitHub Copilot

The final Kubernetes resource file below contains Deployment and Service resources. In addition to GitHub Copilot, you can use Microsoft’s Kubernetes extension for VS Code to use IntelliSense and flag schema errors in the file.

Final Kubernetes resource file using GitHub Copilot

10. Utility Scripts

According to Bing AI — Search, utility scripts are small, simple snippets of code written as independent code files designed to perform a particular task. Utility scripts are commonly written in Bash, Shell, Python, Ruby, PowerShell, and PHP.

AWS utility scripts leverage the AWS Command Line Interface (AWS CLI) for Bash and Shell and AWS SDK for other programming languages. SDKs take the complexity out of coding by providing language-specific APIs for AWS services. For example, Boto3, AWS’s Python SDK, easily integrates your Python application, library, or script with AWS services, including Amazon S3, Amazon EC2, Amazon DynamoDB, and more.

Code generation of Python-based utility script using GitHub Copilot

An example of a Python script to calculate the total size of an Amazon S3 bucket, below, was inspired by 100daysofdevops/N-days-of-automation, a fantastic set of open source AWS-oriented automation scripts.

Final Python-based utility script using GitHub Copilot

Conclusion

In this post, you learned ten ways to leverage Generative AI coding tools like GitHub Copilot for development on AWS. You saw how combining the latest generation of Generative AI coding tools, a mature and extensible IDE, and your coding experience will accelerate development, increase productivity, and reduce cost.

🔔 To keep up with future content, follow Gary Stafford on LinkedIn.


This blog represents my viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , ,

Leave a comment

Exploring Popular Open-source Stream Processing Technologies: Part 2 of 2

A brief demonstration of Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot with Apache Superset

Introduction

According to TechTarget, “Stream processing is a data management technique that involves ingesting a continuous data stream to quickly analyze, filter, transform or enhance the data in real-time. Once processed, the data is passed off to an application, data store, or another stream processing engine.Confluent, a fully-managed Apache Kafka market leader, defines stream processing as “a software paradigm that ingests, processes, and manages continuous streams of data while they’re still in motion.

This two-part post series and forthcoming video explore four popular open-source software (OSS) stream processing projects: Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot.

This post uses the open-source projects, making it easier to follow along with the demonstration and keeping costs to a minimum. However, you could easily substitute the open-source projects for your preferred SaaS, CSP, or COSS service offerings.

Part Two

We will continue our exploration in part two of this two-part post, covering Apache Flink and Apache Pinot. In addition, we will incorporate Apache Superset into the demonstration to visualize the real-time results of our stream processing pipelines as a dashboard.

Demonstration #3: Apache Flink

In the third demonstration of four, we will examine Apache Flink. For this part of the post, we will also use the third of the three GitHub repository projects, flink-kafka-demo. The project contains a Flink application written in Java, which performs stream processing, incremental aggregation, and multi-stream joins.

High-level workflow for Apache Flink demonstration

New Streaming Stack

To get started, we need to replace the first streaming Docker Swarm stack, deployed in part one, with the second streaming Docker Swarm stack. The second stack contains Apache Kafka, Apache Zookeeper, Apache Flink, Apache Pinot, Apache Superset, UI for Apache Kafka, and Project Jupyter (JupyterLab).

https://programmaticponderings.wordpress.com/media/601efca17604c3a467a4200e93d7d3ff

The stack will take a few minutes to deploy fully. When complete, there should be ten containers running in the stack.

Viewing the Docker streaming stack’s ten containers

Flink Application

The Flink application has two entry classes. The first class, RunningTotals, performs an identical aggregation function as the previous KStreams demo.

public static void flinkKafkaPipeline(Properties prop) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// assumes PLAINTEXT authentication
KafkaSource<Purchase> source = KafkaSource.<Purchase>builder()
.setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
.setTopics(prop.getProperty("PURCHASES_TOPIC"))
.setGroupId("flink_reduce_demo")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new PurchaseDeserializationSchema())
.build();
DataStream<Purchase> purchases = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
DataStream<RunningTotal> runningTotals = purchases
.flatMap((FlatMapFunction<Purchase, RunningTotal>) (purchase, out) -> out.collect(
new RunningTotal(
purchase.getTransactionTime(),
purchase.getProductId(),
1,
purchase.getQuantity(),
purchase.getTotalPurchase()
))
).returns(RunningTotal.class)
.keyBy(RunningTotal::getProductId)
.reduce((runningTotal1, runningTotal2) -> {
runningTotal2.setTransactions(runningTotal1.getTransactions() + runningTotal2.getTransactions());
runningTotal2.setQuantities(runningTotal1.getQuantities() + runningTotal2.getQuantities());
runningTotal2.setSales(runningTotal1.getSales().add(runningTotal2.getSales()));
return runningTotal2;
});
KafkaSink<RunningTotal> sink = KafkaSink.<RunningTotal>builder()
.setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(prop.getProperty("RUNNING_TOTALS_TOPIC"))
.setValueSerializationSchema(new RunningTotalSerializationSchema())
.build()
).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
runningTotals.sinkTo(sink);
env.execute("Flink Running Totals Demo");
}

The second class, JoinStreams, joins the stream of data from the demo.purchases topic and the demo.products topic, processing and combining them, in real-time, into an enriched transaction and publishing the results to a new topic, demo.purchases.enriched.

public static void flinkKafkaPipeline(Properties prop) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// assumes PLAINTEXT authentication
KafkaSource<Product> productSource = KafkaSource.<Product>builder()
.setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
.setTopics(prop.getProperty("PRODUCTS_TOPIC"))
.setGroupId("flink_join_demo")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new ProductDeserializationSchema())
.build();
DataStream<Product> productsStream = env.fromSource(
productSource, WatermarkStrategy.noWatermarks(), "Kafka Products Source");
tableEnv.createTemporaryView("products", productsStream);
KafkaSource<Purchase> purchasesSource = KafkaSource.<Purchase>builder()
.setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
.setTopics(prop.getProperty("PURCHASES_TOPIC"))
.setGroupId("flink_join_demo")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new PurchaseDeserializationSchema())
.build();
DataStream<Purchase> purchasesStream = env.fromSource(
purchasesSource, WatermarkStrategy.noWatermarks(), "Kafka Purchases Source");
tableEnv.createTemporaryView("purchases", purchasesStream);
Table result =
tableEnv.sqlQuery(
"SELECT " +
"purchases.transactionTime, " +
"TO_TIMESTAMP(purchases.transactionTime), " +
"purchases.transactionId, " +
"purchases.productId, " +
"products.category, " +
"products.item, " +
"products.size, " +
"products.cogs, " +
"products.price, " +
"products.containsFruit, " +
"products.containsVeggies, " +
"products.containsNuts, " +
"products.containsCaffeine, " +
"purchases.price, " +
"purchases.quantity, " +
"purchases.isMember, " +
"purchases.memberDiscount, " +
"purchases.addSupplements, " +
"purchases.supplementPrice, " +
"purchases.totalPurchase " +
"FROM " +
"products " +
"JOIN purchases " +
"ON products.productId = purchases.productId"
);
DataStream<PurchaseEnriched> purchasesEnrichedTable = tableEnv.toDataStream(result,
PurchaseEnriched.class);
KafkaSink<PurchaseEnriched> sink = KafkaSink.<PurchaseEnriched>builder()
.setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(prop.getProperty("PURCHASES_ENRICHED_TOPIC"))
.setValueSerializationSchema(new PurchaseEnrichedSerializationSchema())
.build()
).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
purchasesEnrichedTable.sinkTo(sink);
env.execute("Flink Streaming Join Demo");
}

The resulting enriched purchases messages look similar to the following:

{
"transaction_time": "2022-09-25 01:58:11.714838",
"transaction_id": "4068565608708439642",
"product_id": "CS08",
"product_category": "Classic Smoothies",
"product_name": "Rockin’ Raspberry",
"product_size": "24 oz.",
"product_cogs": 1.5,
"product_price": 4.99,
"contains_fruit": true,
"contains_veggies": false,
"contains_nuts": false,
"contains_caffeine": false,
"purchase_price": 4.99,
"purchase_quantity": 2,
"is_member": false,
"member_discount": 0,
"add_supplements": false,
"supplement_price": 0,
"total_purchase": 9.98
}
Sample enriched purchase message

Running the Flink Job

To run the Flink application, we must first compile it into an uber JAR.

We can copy the JAR into the Flink container or upload it through the Apache Flink Dashboard, a browser-based UI. For this demonstration, we will upload it through the Apache Flink Dashboard, accessible on port 8081.

The project’s build.gradle file has preset the Main class (Flink’s Entry class) to org.example.JoinStreams. Optionally, to run the Running Totals demo, we could change the build.gradle file and recompile, or simply change Flink’s Entry class to org.example.RunningTotals.

Uploading the JAR to Apache Flink

Before running the Flink job, restart the sales generator in the background (nohup python3 ./producer.py &) to generate a new stream of data. Then start the Flink job.

Apache Flink job running successfully

To confirm the Flink application is running, we can check the contents of the new demo.purchases.enriched topic using the Kafka CLI.

The new demo.purchases.enriched topic populated with messages from Apache Flink

Alternatively, you can use the UI for Apache Kafka, accessible on port 9080.

Viewing messages in the UI for Apache Kafka

Demonstration #4: Apache Pinot

In the fourth and final demonstration, we will explore Apache Pinot. First, we will query the unbounded data streams from Apache Kafka, generated by both the sales generator and the Apache Flink application, using SQL. Then, we build a real-time dashboard in Apache Superset, with Apache Pinot as our datasource.

Creating Tables

According to the Apache Pinot documentation, “a table is a logical abstraction that represents a collection of related data. It is composed of columns and rows (known as documents in Pinot).” There are three types of Pinot tables: Offline, Realtime, and Hybrid. For this demonstration, we will create three Realtime tables. Realtime tables ingest data from streams — in our case, Kafka — and build segments from the consumed data. Further, according to the documentation, “each table in Pinot is associated with a Schema. A schema defines what fields are present in the table along with the data types. The schema is stored in Zookeeper, along with the table configuration.

Below, we see the schema and config for one of the three Realtime tables, purchasesEnriched. Note how the columns are divided into three categories: Dimension, Metric, and DateTime.

{
"schemaName": "purchasesEnriched",
"dimensionFieldSpecs": [
{
"name": "transaction_id",
"dataType": "STRING"
},
{
"name": "product_id",
"dataType": "STRING"
},
{
"name": "product_category",
"dataType": "STRING"
},
{
"name": "product_name",
"dataType": "STRING"
},
{
"name": "product_size",
"dataType": "STRING"
},
{
"name": "product_cogs",
"dataType": "FLOAT"
},
{
"name": "product_price",
"dataType": "FLOAT"
},
{
"name": "contains_fruit",
"dataType": "BOOLEAN"
},
{
"name": "contains_veggies",
"dataType": "BOOLEAN"
},
{
"name": "contains_nuts",
"dataType": "BOOLEAN"
},
{
"name": "contains_caffeine",
"dataType": "BOOLEAN"
},
{
"name": "purchase_price",
"dataType": "FLOAT"
},
{
"name": "is_member",
"dataType": "BOOLEAN"
},
{
"name": "member_discount",
"dataType": "FLOAT"
},
{
"name": "add_supplements",
"dataType": "BOOLEAN"
},
{
"name": "supplement_price",
"dataType": "FLOAT"
}
],
"metricFieldSpecs": [
{
"name": "purchase_quantity",
"dataType": "INT"
},
{
"name": "total_purchase",
"dataType": "FLOAT"
}
],
"dateTimeFieldSpecs": [
{
"name": "transaction_time",
"dataType": "TIMESTAMP",
"format": "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSSSSS",
"granularity": "1:MILLISECONDS"
}
]
}
Schema file for purchasesEnriched Realtime table
{
"tableName": "purchasesEnriched",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "transaction_time",
"timeType": "MILLISECONDS",
"schemaName": "purchasesEnriched",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.topic.name": "demo.purchases.enriched",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "kafka:29092",
"realtime.segment.flush.threshold.time": "3600000",
"realtime.segment.flush.threshold.rows": "50000",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
},
"metadata": {}
}
Config file for purchasesEnriched Realtime table

To begin, copy the three Pinot Realtime table schemas and configurations from the streaming-sales-generator GitHub project into the Apache Pinot Controller container. Next, use a docker exec command to call the Pinot Command Line Interface’s (CLI) AddTable command to create the three tables: products, purchases, and purchasesEnriched.

# copy pinot table schema and config files to pinot controller
CONTROLLER_CONTAINER=$(docker container ls –filter name=streaming-stack_pinot-controller.1 –format "{{.ID}}")
cd ~/streaming-sales-generator/apache_pinot_examples
docker cp configs_schemas/ ${CONTROLLER_CONTAINER}:/tmp/
# create three tables
docker exec -it ${CONTROLLER_CONTAINER} \
bin/pinot-admin.sh AddTable \
-tableConfigFile /tmp/configs_schemas/purchases-config.json \
-schemaFile /tmp/configs_schemas/purchases-schema.json -exec
docker exec -it ${CONTROLLER_CONTAINER} \
bin/pinot-admin.sh AddTable \
-tableConfigFile /tmp/configs_schemas/products-config.json \
-schemaFile /tmp/configs_schemas/products-schema.json -exec
docker exec -it ${CONTROLLER_CONTAINER} \
bin/pinot-admin.sh AddTable \
-tableConfigFile /tmp/configs_schemas/purchases-enriched-config.json \
-schemaFile /tmp/configs_schemas/purchases-enriched-schema.json -exec

To confirm the three tables were created correctly, use the Apache Pinot Data Explorer accessible on port 9000. Use the Tables tab in the Cluster Manager.

Cluster Manager’s Tables tab shows the three Realtime tables and corresponding schemas

We can further inspect and edit the table’s config and schema from the Tables tab in the Cluster Manager.

Realtime table’s editable config and schema

The three tables are configured to read the unbounded stream of data from the corresponding Kafka topics: demo.products, demo.purchases, and demo.purchases.enriched.

Querying with Pinot

We can use Pinot’s Query Console to query the Realtime tables using SQL. According to the documentation, “Pinot provides a SQL interface for querying. It uses the [Apache] Calcite SQL parser to parse queries and uses MYSQL_ANSI dialect.

Schema and query results for the purchases table

With the generator still running, re-query the purchases table in the Query Console (select count(*) from purchases). You should notice the document count increasing each time you re-run the query since new messages are published to the demo.purchases topic by the sales generator.

If you do not observe the count increasing, ensure the sales generator and Flink enrichment job are running.

Query Console showing the purchases table’s document count continuing to increase

Table Joins?

It might seem logical to want to replicate the same multi-stream join we performed with Apache Flink in part three of the demonstration on the demo.products and demo.purchases topics. Further, we might presume to join the products and purchases realtime tables by writing a SQL statement in Pinot’s Query Console. However, according to the documentation, at the time of this post, version 0.11.0 of Pinot did not [currently] support joins or nested subqueries.

This current join limitation is why we created the Realtime table, purchasesEnriched, allowing us to query Flink’s real-time results in the demo.purchases.enriched topic. We will use both Flink and Pinot as part of our stream processing pipeline, taking advantage of each tool’s individual strengths and capabilities.

Note, according to the documentation for the latest release of Pinot on the main branch, “the latest Pinot multi-stage supports inner join, left-outer, semi-join, and nested queries out of the box. It is optimized for in-memory process and latency.” For more information on joins as part of Pinot’s new multi-stage query execution engine, read the documentation, Multi-Stage Query Engine.

Query showing results from the demo.purchases.enriched topic in real-time

Aggregations

We can perform real-time aggregations using Pinot’s rich SQL query interface. For example, like previously with Spark and Flink, we can calculate running totals for the number of items sold and the total sales for each product in real time.

Aggregating running totals for each product

We can do the same with the purchasesEnriched table, which will use the continuous stream of enriched transaction data from our Apache Flink application. With the purchasesEnriched table, we can add the product name and product category for richer results. Each time we run the query, we get real-time results based on the running sales generator and Flink enrichment job.

Aggregating running totals for each product

Query Options and Indexing

Note the reference to the Star-Tree index at the start of the SQL query shown above. Pinot provides several query options, including useStarTree (true by default).

Multiple indexing techniques are available in Pinot, including Forward Index, Inverted Index, Star-tree Index, Bloom Filter, and Range Index, among others. Each has advantages in different query scenarios. According to the documentation, by default, Pinot creates a dictionary-encoded forward index for each column.

SQL Examples

Here are a few examples of SQL queries you can try in Pinot’s Query Console:

products
SELECT
COUNT(product_id) AS product_count,
AVG(price) AS avg_price,
AVG(cogs) AS avg_cogs,
AVG(price) AVG(cogs) AS avg_gross_profit
FROM
products;
purchases
SELECT
product_id,
SUMPRECISION(quantity, 10, 0) AS quantity,
SUMPRECISION(total_purchase, 10, 2) AS sales
FROM
purchases
GROUP BY
product_id
ORDER BY
sales DESC;
purchasesEnriched
SELECT
product_id,
product_name,
product_category,
SUMPRECISION(purchase_quantity, 10, 0) AS quantity,
SUMPRECISION(total_purchase, 10, 2) AS sales
FROM
purchasesEnriched
GROUP BY
product_id,
product_name,
product_category
ORDER BY
sales DESC;

Troubleshooting Pinot

If have issues with creating the tables or querying the real-time data, you can start by reviewing the Apache Pinot logs:

CONTROLLER_CONTAINER=$(docker container ls –filter name=streaming-stack_pinot-controller.1 –format "{{.ID}}")
docker exec -it ${CONTROLLER_CONTAINER} cat logs/pinot-all.log
view raw pinot_logs.sh hosted with ❤ by GitHub

Real-time Dashboards with Apache Superset

To display the real-time stream of data produced results of our Apache Flink stream processing job and made queriable by Apache Pinot, we can use Apache Superset. Superset positions itself as “a modern data exploration and visualization platform.” Superset allows users “to explore and visualize their data, from simple line charts to highly detailed geospatial charts.

According to the documentation, “Superset requires a Python DB-API database driver and a SQLAlchemy dialect to be installed for each datastore you want to connect to.” In the case of Apache Pinot, we can use pinotdb as the Python DB-API and SQLAlchemy dialect for Pinot. Since the existing Superset Docker container does not have pinotdb installed, I have built and published a Docker Image with the driver and deployed it as part of the second streaming stack of containers.

# Custom Superset build to add apache pinot driver
# Gary A. Stafford (2022-09-25)
# Updated: 2022-12-18
FROM apache/superset:66138b0ca0b82a94404e058f0cc55517b2240069
# Switching to root to install the required packages
USER root
# Find which driver you need based on the analytics database:
# https://superset.apache.org/docs/databases/installing-database-drivers
RUN pip install mysqlclient psycopg2-binary pinotdb
# Switching back to using the `superset` user
USER superset
view raw Dockerfile hosted with ❤ by GitHub

First, we much configure the Superset container instance. These instructions are documented as part of the Superset Docker Image repository.

# establish an interactive session with the superset container
SUPERSET_CONTAINER=$(docker container ls –filter name=streaming-stack_superset.1 –format "{{.ID}}")
# initialize superset (see superset documentation)
docker exec -it ${SUPERSET_CONTAINER} \
superset fab create-admin \
–username admin \
–firstname Superset \
–lastname Admin \
–email admin@superset.com \
–password sUp3rS3cREtPa55w0rD1
docker exec -it ${SUPERSET_CONTAINER} superset db upgrade
docker exec -it ${SUPERSET_CONTAINER} superset init

Once the configuration is complete, we can log into the Superset web-browser-based UI accessible on port 8088.

Home page of the Superset web-browser-based UI

Pinot Database Connection and Dataset

Next, to connect to Pinot from Superset, we need to create a Database Connection and a Dataset.

Creating a new database connection to Pinot

The SQLAlchemy URI is shown below. Input the URI, test your connection (‘Test Connection’), make sure it succeeds, then hit ‘Connect’.

pinot+http://pinot-broker:8099/query?controller=http://pinot-controller:9000

Next, create a Dataset that references the purchasesEnriched Pinot table.

Creating a new dataset allowing us access to the purchasesEnriched Pinot table

Modify the dataset’s transaction_time column. Check the is_temporal and Default datetime options. Lastly, define the DateTime format as epoch_ms.

Modifying the dataset’s transaction_time column

Building a Real-time Dashboard

Using the new dataset, which connects Superset to the purchasesEnriched Pinot table, we can construct individual charts to be placed on a dashboard. Build a few charts to include on your dashboard.

Example of a chart whose data source is the new dataset
List of charts that included on the dashboard

Create a new Superset dashboard and add the charts and other elements, such as headlines, dividers, and tabs.

Apache Superset dashboard displaying data from Apache Pinot Realtime table

We can apply a refresh interval to the dashboard to continuously query Pinot and visualize the results in near real-time.

Configuring a refresh interval for the dashboard

Conclusion

In this two-part post series, we were introduced to stream processing. We explored four popular open-source stream processing projects: Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot. Next, we learned how we could solve similar stream processing and streaming analytics challenges using different streaming technologies. Lastly, we saw how these technologies, such as Kafka, Flink, Pinot, and Superset, could be integrated to create effective stream processing pipelines.


This blog represents my viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are the property of the author unless otherwise noted.

, , , , , , , , ,

Leave a comment

Exploring Popular Open-source Stream Processing Technologies: Part 1 of 2

A brief demonstration of Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot with Apache Superset

Introduction

According to TechTarget, “Stream processing is a data management technique that involves ingesting a continuous data stream to quickly analyze, filter, transform or enhance the data in real-time. Once processed, the data is passed off to an application, data store, or another stream processing engine.Confluent, a fully-managed Apache Kafka market leader, defines stream processing as “a software paradigm that ingests, processes, and manages continuous streams of data while they’re still in motion.

Batch vs. Stream Processing

Again, according to Confluent, “Batch processing is when the processing and analysis happens on a set of data that have already been stored over a period of time.” A batch processing example might include daily retail sales data, which is aggregated and tabulated nightly after the stores close. Conversely, “streaming data processing happens as the data flows through a system. This results in analysis and reporting of events as it happens.” To use a similar example, instead of nightly batch processing, the streams of sales data are processed, aggregated, and analyzed continuously throughout the day — sales volume, buying trends, inventory levels, and marketing program performance are tracked in real time.

Bounded vs. Unbounded Data

According to Packt Publishing’s book, Learning Apache Apex, “bounded data is finite; it has a beginning and an end. Unbounded data is an ever-growing, essentially infinite data set.” Batch processing is typically performed on bounded data, whereas stream processing is most often performed on unbounded data.

Stream Processing Technologies

There are many technologies available to perform stream processing. These include proprietary custom software, commercial off-the-shelf (COTS) software, fully-managed service offerings from Software as a Service (or SaaS) providers, Cloud Solution Providers (CSP), Commercial Open Source Software (COSS) companies, and popular open-source projects from the Apache Software Foundation and Linux Foundation.

The following two-part post and forthcoming video will explore four popular open-source software (OSS) stream processing projects, including Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot. Each of these projects has some equivalent SaaS, CSP, and COSS offerings.

This post uses the open-source projects, making it easier to follow along with the demonstration and keeping costs to a minimum. However, you could easily substitute the open-source projects for your preferred SaaS, CSP, or COSS service offerings.

Apache Spark Structured Streaming

According to the Apache Spark documentation, “Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data.” Further, “Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees.” In the post, we will examine both batch and stream processing using a series of Apache Spark Structured Streaming jobs written in PySpark.

Spark Structured Streaming job statistics as seen from the Spark UI

Apache Kafka Streams

According to the Apache Kafka documentation, “Kafka Streams [aka KStreams] is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology.” In the post, we will examine a KStreams application written in Java that performs stream processing and incremental aggregation.

Building the KStreams application’s uber JAR in JetBrains IntelliJ IDEA

Apache Flink

According to the Apache Flink documentation, “Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.” Further, “Apache Flink excels at processing unbounded and bounded data sets. Precise control of time and state enables Flink’s runtime to run any kind of application on unbounded streams. Bounded streams are internally processed by algorithms and data structures that are specifically designed for fixed-sized data sets, yielding excellent performance.” In the post, we will examine a Flink application written in Java, which performs stream processing, incremental aggregation, and multi-stream joins.

Apache Flink Dashboard showing Flink pipeline demonstrated in this post

Apache Pinot

According to Apache Pinot’s documentation, “Pinot is a real-time distributed OLAP datastore, purpose-built to provide ultra-low-latency analytics, even at extremely high throughput. It can ingest directly from streaming data sources — such as Apache Kafka and Amazon Kinesis — and make the events available for querying instantly. It can also ingest from batch data sources such as Hadoop HDFS, Amazon S3, Azure ADLS, and Google Cloud Storage.” In the post, we will query the unbounded data streams from Apache Kafka, generated by Apache Flink, using SQL.

Apache Pinot Query Console showing tables demonstrated in this post

Streaming Data Source

We must first find a good unbounded data source to explore or demonstrate these streaming technologies. Ideally, the streaming data source should be complex enough to allow multiple types of analyses and visualize different aspects with Business Intelligence (BI) and dashboarding tools. Additionally, the streaming data source should possess a degree of consistency and predictability while displaying a reasonable level of variability and randomness.

To this end, we will use the open-source Streaming Synthetic Sales Data Generator project, which I have developed and made available on GitHub. This project’s highly-configurable, Python-based, synthetic data generator generates an unbounded stream of product listings, sales transactions, and inventory restocking activities to a series of Apache Kafka topics.

Streaming Synthetic Sales Data Generator publishing messages to Apache Kafka

Source Code

All the source code demonstrated in this post is open source and available on GitHub. There are three separate GitHub projects:

# streaming data generator, Apache Spark and Apache Pinot examples
git clone –depth 1 -b main \
https://github.com/garystafford/streaming-sales-generator.git
# Apache Flink examples
git clone –depth 1 -b main \
https://github.com/garystafford/flink-kafka-demo.git
# Kafka Streams examples
git clone –depth 1 -b main \
https://github.com/garystafford/kstreams-kafka-demo.git

Docker

To make it easier to follow along with the demonstration, we will use Docker Swarm to provision the streaming tools. Alternatively, you could use Kubernetes (e.g., creating a Helm chart) or your preferred CSP or SaaS managed services. Nothing in this demonstration requires you to use a paid service.

The two Docker Swarm stacks are located in the Streaming Synthetic Sales Data Generator project:

  1. Streaming Stack — Part 1: Apache Kafka, Apache Zookeeper, Apache Spark, UI for Apache Kafka, and the KStreams application
  2. Streaming Stack — Part 2: Apache Kafka, Apache Zookeeper, Apache Flink, Apache Pinot, Apache Superset, UI for Apache Kafka, and Project Jupyter (JupyterLab).*

* the Jupyter container can be used as an alternative to the Spark container for running PySpark jobs (follow the same steps as for Spark, below)

Demonstration #1: Apache Spark

In the first of four demonstrations, we will examine two Apache Spark Structured Streaming jobs, written in PySpark, demonstrating both batch processing (spark_batch_kafka.py) and stream processing (spark_streaming_kafka.py). We will read from a single stream of data from a Kafka topic, demo.purchases, and write to the console.

High-level workflow for Apache Spark demonstration

Deploying the Streaming Stack

To get started, deploy the first streaming Docker Swarm stack containing the Apache Kafka, Apache Zookeeper, Apache Spark, UI for Apache Kafka, and the KStreams application containers.

# cd into project
cd streaming-sales-generator/
# initialize swarm stack – 1x only
docker swarm init
# optional: delete previous streaming-stack
docker stack rm streaming-stack
# deploy first streaming-stack
docker stack deploy streaming-stack –compose-file docker/spark-kstreams-stack.yml
# observe the deployment's progress
docker stack services streaming-stack

The stack will take a few minutes to deploy fully. When complete, there should be a total of six containers running in the stack.

Viewing the Docker streaming stack’s six containers

Sales Generator

Before starting the streaming data generator, confirm or modify the configuration/configuration.ini. Three configuration items, in particular, will determine how long the streaming data generator runs and how much data it produces. We will set the timing of transaction events to be generated relatively rapidly for test purposes. We will also set the number of events high enough to give us time to explore the Spark jobs. Using the below settings, the generator should run for an average of approximately 50–60 minutes: (((5 sec + 2 sec)/2)*1000 transactions)/60 sec=~58 min on average. You can run the generator again if necessary or increase the number of transactions.

[SALES]
# minimum sales frequency in seconds (debug with 1, typical min. 120)
min_sale_freq = 2
# maximum sales frequency in seconds (debug with 3, typical max. 300)
max_sale_freq = 5
# number of transactions to generate
number_of_sales = 1000
A code snippet from the project’s configuration.ini file

Start the streaming data generator as a background service:

# install required python packages (1x)
python3 -m pip install kafka-python
cd sales_generator/
# run in foreground
python3 ./producer.py
# better option, run as background process
nohup python3 ./producer.py &
# confirm process is running
ps -u

The streaming data generator will start writing data to three Apache Kafka topics: demo.products, demo.purchases, and demo.inventories. We can view these topics and their messages by logging into the Apache Kafka container and using the Kafka CLI:

# establish an interactive session with the spark container
KAFKA_CONTAINER=$(docker container ls –filter name=streaming-stack_kafka.1 –format "{{.ID}}")
docker exec -it ${KAFKA_CONTAINER} bash
# set environment variables used by jobs
export BOOTSTRAP_SERVERS="localhost:9092"
export TOPIC_PRODUCTS="demo.products"
export TOPIC_PURCHASES="demo.purchases"
export TOPIC_INVENTORIES="demo.inventories"
# list topics
kafka-topics.sh –list –bootstrap-server $BOOTSTRAP_SERVERS
# read topics from beginning
kafka-console-consumer.sh \
–topic $TOPIC_PRODUCTS –from-beginning \
–bootstrap-server $BOOTSTRAP_SERVERS
kafka-console-consumer.sh \
–topic $TOPIC_PURCHASES –from-beginning \
–bootstrap-server $BOOTSTRAP_SERVERS
kafka-console-consumer.sh \
–topic $TOPIC_INVENTORIES –from-beginning \
–bootstrap-server $BOOTSTRAP_SERVERS

Below, we see a few sample messages from the demo.purchases topic:

Consuming messages from Kafka’s demo.purchases topic

Alternatively, you can use the UI for Apache Kafka, accessible on port 9080.

Viewing demo.purchases topic in the UI for Apache Kafka
Viewing messages in the demo.purchases topic using the UI for Apache Kafka

Prepare Spark

Next, prepare the Spark container to run the Spark jobs:

# establish an interactive session with the spark container
SPARK_CONTAINER=$(docker container ls –filter name=streaming-stack_spark.1 –format "{{.ID}}")
docker exec -it -u 0 ${SPARK_CONTAINER} bash
# update and install wget
apt-get update && apt-get install wget vim -y
# install required job dependencies
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.3.1/kafka-clients-3.3.1.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.3.1/spark-sql-kafka-0-10_2.12-3.3.1.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.3.1/spark-token-provider-kafka-0-10_2.12-3.3.1.jar
mv *.jar /opt/bitnami/spark/jars/
exit
Preparing the Spark container instance as the root user

Running the Spark Jobs

Next, copy the jobs from the project to the Spark container, then exec back into the container:

# copy jobs to spark container
docker cp apache_spark_examples/ ${SPARK_CONTAINER}:/home/
# establish an interactive session with the spark container
docker exec -it ${SPARK_CONTAINER} bash

Batch Processing with Spark

The first Spark job, spark_batch_kafka.py, aggregates the number of items sold and the total sales for each product, based on existing messages consumed from the demo.purchases topic. We use the PySpark DataFrame class’s read() and write() methods in the first example, reading from Kafka and writing to the console. We could just as easily write the results back to Kafka.

ds_sales = (
df_sales.selectExpr("CAST(value AS STRING)")
.select(F.from_json("value", schema=schema).alias("data"))
.select("data.*")
.withColumn("row", F.row_number().over(window))
.withColumn("quantity", F.sum(F.col("quantity")).over(window_agg))
.withColumn("sales", F.sum(F.col("total_purchase")).over(window_agg))
.filter(F.col("row") == 1)
.drop("row")
.select(
"product_id",
F.format_number("sales", 2).alias("sales"),
F.format_number("quantity", 0).alias("quantity"),
)
.coalesce(1)
.orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False)
.write.format("console")
.option("numRows", 25)
.option("truncate", False)
.save()
)
A snippet of batch processing Spark job’s summarize_sales() method

The batch processing job sorts the results and outputs the top 25 items by total sales to the console. The job should run to completion and exit successfully.

Batch results for top 25 items by total sales

To run the batch Spark job, use the following commands:

# set environment variables used by jobs
export BOOTSTRAP_SERVERS="kafka:29092"
export TOPIC_PURCHASES="demo.purchases"
cd /home/apache_spark_examples/
# run batch processing job
spark-submit spark_batch_kafka.py
Run the batch Spark job

Stream Processing with Spark

The stream processing Spark job, spark_streaming_kafka.py, also aggregates the number of items sold and the total sales for each item, based on messages consumed from the demo.purchases topic. However, as shown in the code snippet below, this job continuously aggregates the stream of data from Kafka, displaying the top ten product totals within an arbitrary ten-minute sliding window, with a five-minute overlap, and updates output every minute to the console. We use the PySpark DataFrame class’s readStream() and writeStream() methods as opposed to the batch-oriented read() and write() methods in the first example.

ds_sales = (
df_sales.selectExpr("CAST(value AS STRING)")
.select(F.from_json("value", schema=schema).alias("data"))
.select("data.*")
.withWatermark("transaction_time", "10 minutes")
.groupBy("product_id", F.window("transaction_time", "10 minutes", "5 minutes"))
.agg(F.sum("total_purchase"), F.sum("quantity"))
.orderBy(F.col("window").desc(), F.col("sum(total_purchase)").desc())
.select(
"product_id",
F.format_number("sum(total_purchase)", 2).alias("sales"),
F.format_number("sum(quantity)", 0).alias("drinks"),
"window.start",
"window.end",
)
.coalesce(1)
.writeStream.queryName("streaming_to_console")
.trigger(processingTime="1 minute")
.outputMode("complete")
.format("console")
.option("numRows", 10)
.option("truncate", False)
.start()
)
ds_sales.awaitTermination()
A snippet of stream processing Spark job’s summarize_sales() method

Shorter event-time windows are easier for demonstrations — in Production, hourly, daily, weekly, or monthly windows are more typical for sales analysis.

Micro-batch representing real-time totals for the current ten-minute window

To run the stream processing Spark job, use the following commands:

# run stream processing job
spark-submit spark_streaming_kafka.py
Run the stream processing Spark job

We could just as easily calculate running totals for the stream of sales data versus aggregations over a sliding event-time window (example job included in project).

Micro-batch representing running totals for data stream as opposed to using event-time windows

Be sure to kill the stream processing Spark jobs when you are done, or they will continue to run, awaiting more data.

Demonstration #2: Apache Kafka Streams

Next, we will examine Apache Kafka Streams (aka KStreams). For this part of the post, we will also use the second of the three GitHub repository projects, kstreams-kafka-demo. The project contains a KStreams application written in Java that performs stream processing and incremental aggregation.

High-level workflow for KStreams demonstration

KStreams Application

The KStreams application continuously consumes the stream of messages from the demo.purchases Kafka topic (source) using an instance of the StreamBuilder() class. It then aggregates the number of items sold and the total sales for each item, maintaining running totals, which are then streamed to a new demo.running.totals topic (sink). All of this using an instance of the KafkaStreams() Kafka client class.

private static void kStreamPipeline(Properties props) {
System.out.println("Starting…");
Properties kafkaStreamsProps = new Properties();
kafkaStreamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, props.getProperty("APPLICATION_ID"));
kafkaStreamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("BOOTSTRAP_SERVERS"));
kafkaStreamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, props.getProperty("AUTO_OFFSET_RESET_CONFIG"));
kafkaStreamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, props.getProperty("COMMIT_INTERVAL_MS_CONFIG"));
StreamsBuilder builder = new StreamsBuilder();
builder
.stream(props.getProperty("INPUT_TOPIC"), Consumed.with(Serdes.Void(), CustomSerdes.Purchase()))
.peek((unused, purchase) -> System.out.println(purchase.toString()))
.flatMap((KeyValueMapper<Void, Purchase, Iterable<KeyValue<String, Total>>>) (unused, purchase) -> {
List<KeyValue<String, Total>> result = new ArrayList<>();
result.add(new KeyValue<>(purchase.getProductId(), new Total(
purchase.getTransactionTime(),
purchase.getProductId(),
1,
purchase.getQuantity(),
purchase.getTotalPurchase()
)));
return result;
})
.groupByKey(Grouped.with(Serdes.String(), CustomSerdes.Total()))
.reduce((total1, total2) -> {
total2.setTransactions(total1.getTransactions() + total2.getTransactions());
total2.setQuantities(total1.getQuantities() + total2.getQuantities());
total2.setSales(total1.getSales().add(total2.getSales()));
return total2;
})
.toStream()
.peek((productId, total) -> System.out.println(total.toString()))
.to(props.getProperty("OUTPUT_TOPIC"), Produced.with(Serdes.String(), CustomSerdes.Total()));
KafkaStreams streams = new KafkaStreams(builder.build(), kafkaStreamsProps);
streams.start();
System.out.println("Running…");
}
A snippet of KStreams application’s kStreamPipeline() method

Running the Application

We have at least three choices to run the KStreams application for this demonstration: 1) running locally from our IDE, 2) a compiled JAR run locally from the command line, or 3) a compiled JAR copied into a Docker image, which is deployed as part of the Swarm stack. You can choose any of the options.

# set java version (v17 is latest compatible version with kstreams)
JAVA_HOME=~/Library/Java/JavaVirtualMachines/corretto-17.0.5/Contents/Home
$JAVA_HOME/bin/java -version
# compile to uber jar
./gradlew clean shadowJar
# run the streaming application
$JAVA_HOME/bin/java -jar build/libs/kstreams-kafka-demo-1.0.0-all.jar

Compiling and running the KStreams application locally

We will continue to use the same streaming Docker Swarm stack used for the Apache Spark demonstration. I have already compiled a single uber JAR file using OpenJDK 17 and Gradle from the project’s source code. I then created and published a Docker image, which is already part of the running stack.

FROM amazoncorretto:17.0.5
COPY build/libs/kstreams-kafka-demo-1.1.0-all.jar /tmp/kstreams-app.jar
CMD ["java", "-jar", "/tmp/kstreams-app.jar"]
view raw Dockerfile hosted with ❤ by GitHub
Dockerfile used to build KStreams app Docker image

Since we ran the sales generator earlier for the Spark demonstration, there is existing data in the demo.purchases topic. Re-run the sales generator (nohup python3 ./producer.py &) to generate a new stream of data. View the results of the KStreams application, which has been running since the stack was deployed using the Kafka CLI or UI for Apache Kafka:

# terminal 1: establish an interactive session with the kstreams app container
KSTREAMS_CONTAINER=$(docker container ls –filter name=streaming-stack_kstreams.1 –format "{{.ID}}")
docker logs ${KSTREAMS_CONTAINER} –follow
# terminal 2: establish an interactive session with the kafka container
KAFKA_CONTAINER=$(docker container ls –filter name=streaming-stack_kafka.1 –format "{{.ID}}")
docker exec -it ${KAFKA_CONTAINER} bash
# set environment variables used by jobs
export BOOTSTRAP_SERVERS="localhost:9092"
export INPUT_TOPIC="demo.purchases"
export OUTPUT_TOPIC="demo.running.totals"
# read topics from beginning
kafka-console-consumer.sh \
–topic $INPUT_TOPIC –from-beginning \
–bootstrap-server $BOOTSTRAP_SERVERS
kafka-console-consumer.sh \
–topic $OUTPUT_TOPIC –from-beginning \
–bootstrap-server $BOOTSTRAP_SERVERS

Below, in the top terminal window, we see the output from the KStreams application. Using KStream’s peek() method, the application outputs Purchase and Total instances to the console as they are processed and written to Kafka. In the lower terminal window, we see new messages being published as a continuous stream to output topic, demo.running.totals.

KStreams application performing stream processing and the resulting output stream

Part Two

In part two of this two-part post, we continue our exploration of the four popular open-source stream processing projects. We will cover Apache Flink and Apache Pinot. In addition, we will incorporate Apache Superset into the demonstration, building a real-time dashboard to visualize the results of our stream processing.

Apache Superset dashboard displaying data from Apache Pinot Realtime table

This blog represents my viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are the property of the author unless otherwise noted.

, , , , , , , ,

Leave a comment

Lakehouse Data Modeling using dbt, Amazon Redshift, Redshift Spectrum, and AWS Glue

Learn how dbt makes it easy to transform data and materialize models in a modern cloud data lakehouse built on AWS

Introduction

Data lakes have grabbed much of the analytics community’s attention in recent years, thanks to an overabundance of VC-backed analytics startups and marketing dollars. Nonetheless, data warehouses, specifically modern cloud data warehouses, continue to gain market share, led by SnowflakeAmazon RedshiftGoogle Cloud BigQuery, and Microsoft’s Azure Synapse Analytics.

Several factors have fostered the renewed interest and appeal of data warehouses, including the data lakehouse architecture. According to Databricks, “a lakehouse is a new, open architecture that combines the best elements of data lakes and data warehouses. Lakehouses are enabled by a new system design: implementing similar data structures and data management features to those in a data warehouse directly on top of low-cost cloud storage in open formats.” Similarly, Snowflake describes a lakehouse as “a data solution concept that combines elements of the data warehouse with those of the data lake. Data lakehouses implement data warehouses’ data structures and management features for data lakes, which are typically more cost-effective for data storage.

dbt

In the following post, we will explore the use of dbt (data build tool), developed by dbt Labs, to transform data in an AWS-based data lakehouse, built with Amazon Redshift, Redshift Spectrum, AWS Glue, and Amazon S3. According to dbt Labs, “dbt enables analytics engineers to transform data in their warehouses by simply writing select statements. dbt handles turning these select statements into tables and views.” Further, “dbt does the T in ELT (Extract, Load, Transform) processes — it doesn’t extract or load data, but it’s extremely good at transforming data that’s already loaded into your warehouse.

This post’s project, displayed in dbt Cloud

Amazon Redshift

According to AWS, “Amazon Redshift uses SQL to analyze structured and semi-structured data across data warehouses, operational databases, and data lakes using AWS-designed hardware and machine learning to deliver the best price-performance at any scale.” AWS claims Amazon Redshift is the most widely used cloud data warehouse.

Amazon Redshift Spectrum

According to AWS, “Redshift Spectrum allows you to efficiently query and retrieve structured and semi-structured data from files in Amazon S3 without having to load the data into Amazon Redshift tables.” Redshift Spectrum tables define the data structure for the files in Amazon S3. The external tables exist in an external data catalog, which can be AWS Glue, the data catalog that comes with Amazon Athena, or an Apache Hive metastore.

dbt can interact with Amazon Redshift Spectrum to create external tables, refresh external table partitions, and access raw data in an Amazon S3-based data lake from the data warehouse. We will use dbt along with the dbt package, dbt_external_tables, to create the external tables in an AWS Glue data catalog.

Prerequisites

Prerequisites to follow along with this post’s demonstration include:

  • Amazon S3 bucket to store raw data;
  • Amazon Redshift or Amazon Redshift Serverless cluster;
  • AWS IAM Role with permissions to Amazon Redshift, Amazon S3, and AWS Glue;
  • dbt Cloud account;
  • dbt CLI (dbt Core) and dbt Amazon Redshift adapter installed locally;
  • Microsoft Visual Studio Code (VS Code) with dbt extensions installed;

The post’s demonstration uses dbt Cloud, VS Code, and the dbt CLI interchangeably with the project’s GitHub repository as a source. Follow along with the demonstration using any or all of these three dbt options.

Example of this project in VS Code with dbt extensions installed

Cost Warning!

Be careful when creating a new, provisioned Amazon Redshift cluster for this demonstration. The suggested default Production cluster with two ra3.4xlarge on-demand compute nodes and AQUA (Redshift’s Advanced Query Accelerator) enabled is estimated at $4,694/month ($3.26/node/hour). For this demonstration, choose the minimum size provisioned Redshift cluster configuration of one dc2.large on-demand compute node, estimated to cost $180/month ($0.25/node/hour). Be sure to delete the cluster when the demonstration is complete.

Creating a new Amazon Redshift cluster

Amazon Redshift Serverless Option

AWS recently announced the general availability (GA) of Amazon Redshift Serverless on July 12, 2022. Amazon Redshift Serverless allows data analysts, developers, and data scientists to run and scale analytics without having to provision and manage data warehouse clusters. dbt is fully compatible with Amazon Redshift Serverless and is an alternative to provisioned Redshift for this demonstration. According to AWS, Amazon Redshift Serverless measures data warehouse capacity in Redshift Processing Units (RPUs). You pay for the workloads you run in RPU-hours on a per-second basis (with a 60-second minimum charge), including queries that access data in open file formats in Amazon S3.

Source Code

All the source code demonstrated in this post is open source and available on GitHub.

Sample Data

This demonstration uses the TICKIT sample database provided by AWS and designed for use with Amazon Redshift. This sample database application tracks sales activity for the fictional online TICKIT website, where users buy and sell tickets for sporting events, shows, and concerts. The database consists of seven tables in a star schema: five dimension tables and two fact tables. A clean copy of the raw TICKIT data, formatted as pipe-delimited text files, is included in this GitHub project. Use the following shell commands to copy the raw data to Amazon S3:

# Purpose: Unzip raw data files and copy to s3
# Author: Gary A. Stafford
# Date: 2022-12-29
# sh ./raw_date_to_s3.sh
# ** REPLACE ME! **
s3_bucket="<your_s3_bucket_name>"
pushd raw_tickit_data/
unzip tickit_data.zip
popd
declare -a TableArray=("category" "date" "event" "listing" "sale" "user" "venue")
for table in "${TableArray[@]}"; do
aws s3 cp ./raw_tickit_data/$table.txt s3://$s3_bucket/raw_tickit_data/$table/
done

Prepare Amazon Redshift for dbt

Create New Database

Create a new Redshift database to use for the demonstration, demo.

create new database
create database demo with owner admin;

Create Database Schemas

Within the new Redshift database,demo, create the external schema, tickit_external, and the corresponding external AWS Glue Data Catalogtickit_dbt, using the CREATE EXTERNAL SCHEMA Redshift SQL command. Make sure to update the command to reflect your IAM Role’s ARN. Next, create the schema that will hold our dbt models, tickit_dbt. Lastly, as a security best practice, drop the default public schema.

switch redshift database connection to new demo database before continuing!
create external tables schema and new glue data catalog
create external schema tickit_external
from data catalog
database 'tickit_dbt'
iam_role 'arn:aws:iam::<your_aws_acccount_id>:role/ClusterPermissionsRole'
create external database if not exists;
create schema tickit_dbt;
drop schema public;

From the AWS Glue console, we should observe a new tickit_dbt AWS Glue Data Catalog. The description shown below was manually added after the catalog was created.

Newly created AWS Glue Data Catalog

Create dbt Database User and Group

As a security best practice, create a separate database dbt user and dbt group. We are assigning a completely arbitrary connection limit of ten. Then, apply the grants to allow the dbt group access to the new database and schemas. Lastly, change the two schema’s owners to the dbt.

create dbt user and group
create user dbt with password 'CHANGE_ME!'
nocreatedb nocreateuser syslog access restricted
connection limit 10;
create dbt group
create group dbt with user dbt;
grants on tickit_external schema
grant usage on schema tickit_external to group dbt;
grant create on schema tickit_external to group dbt;
grant all on all tables in schema tickit_external to group dbt;
grants on tickit_dbt schema
grant usage on schema tickit_dbt to group dbt;
grant create on schema tickit_dbt to group dbt;
grant all on all tables in schema tickit_dbt to group dbt;
reassign schema ownership to dbt
alter schema tickit_dbt owner to dbt;
alter schema tickit_external owner to dbt;

Alternately, we could use an IAM Role with a SAML 2.0-compliant IdP.

Initialize and Configure dbt for Redshift

Next, configure your dbt Cloud account and dbt locally with your Amazon Redshift connection information using the dbt init command. On a Mac, this configuration is stored in the /Users/<your_usernama>/.dbt/profiles.yml file. You will need your Redshift cluster host URL, port, database, username, and password. With your local install of dbt, we can use the dbt debug command to confirm the new configuration.

Confirming configuration using dbt debug command

Project Structure

The GitHub project structure follows many of the best practices outlined in dbt Labs’ Best Practice Guide. Data models in the models directory is organized into the recommended stagingintermediate, and marts subdirectories (aka layers).

Project structure for data models

From a data lineage perspective, in this project, the staging layer’s data models depend on the external tables (AWS Glue/Amazon Redshift Spectrum). The intermediate layer’s data models depend on the staging models. The marts layer’s data models depend on staging and intermediate models.

dbt Cloud’s Lineage Graph showing an example of data model relationships

Install dbt Packages

The GitHub project’s packages.yml contains a few commonly recommended packages. The only one required for this post is the dbt-labs/dbt_external_tables package. Make sure your project is referring to the latest version of the package.

# updated 2022-12-29 (dbt=1.3.1 locally and dbt=1.3.1 in dbt cloud)
packages:
package: dbt-labs/dbt_external_tables
version: 0.8.2
package: dbt-labs/codegen
version: 0.9.0
package: dbt-labs/dbt_utils
version: 1.0.0
package: dbt-labs/redshift
version: 0.8.0
view raw packages.yml hosted with ❤ by GitHub

Use the dbt deps command to install the packages locally.

Installing dbt packages

External Tables

The _tickit__sources.yml file in the models/staging/tickit/external_tables/ model’s subdirectory defines the schema and S3 location for each of the seven external TICKIT database tables: category, date, event, listing, sale, user, and venue. You will need to update this file to reflect the name of your Amazon S3 bucket, in seven places.

version: 2
sources:
name: tickit_external
description: Sales activity for the fictional TICKIT web site, where users buy and sell tickets online for sporting events, shows, and concerts.
database: demo
schema: tickit_external
loader: s3
tables:
name: category
description: dimension table – TICKIT categories
external:
location: "s3://<your_s3_bucket_name>/raw_tickit_data/category/"
row_format: >
serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
with serdeproperties (
'separatorChar'='|'
)
table_properties: "('skip.header.line.count'='1')"
columns:
name: catid
data_type: int
description: primary key
tests:
unique
not_null
name: catgroup
data_type: varchar(20)
name: catname
data_type: varchar(20)
name: catdesc
data_type: varchar(50)

Execute the command, dbt run-operation stage_external_sources, to create the seven external tables in the AWS Glue Data Catalog. This command is part of the dbt_external_tables package we installed earlier. It iterates through all source nodes, creates the tables if missing, and refreshes metadata.

Creating external tables in AWS Glue Data Catalog

If we failed to run the previous SQL statements to set schema ownership to the dbt user, the following error will likely occur.

Typical error resulting from incorrect ownership of the external_table schema

Once the command completes, we should observe seven new tables in the AWS Glue Data Catalog.

Seven new tables in the AWS Glue Data Catalog

Examining one of the AWS Glue data catalog tables, we can observe how the configuration in the _tickit__sources.yml file was used to define the table’s properties and schema. Note the Location field indicates where the underlying data is located in our Amazon S3 bucket.

The ‘category’ table in the AWS Glue Data Catalog

Staging Layer

In their best practices guide, dbt describes the staging layer in the following manner: “you can think of the staging layer as condensing and refining this material into the individual atoms we’ll later build more intricate and useful structures with.” The staging data models are the base tables and views we will use to build more complex aggregations and analytics queries in Redshift. The schema.yml file, also in the models/staging/tickit/ model’s subdirectory, defines seven late-binding views, modeled by dbt, to be created in Amazon Redshift.

version: 2
models:
name: stg_tickit__categories
description: Late binding view of external dimension table – TICKIT category
name: stg_tickit__dates
description: Late binding view of external dimension table – TICKIT date
name: stg_tickit__events
description: Late binding view of external dimension table – TICKIT event
name: stg_tickit__listings
description: Late binding view of external fact table – TICKIT listing
name: stg_tickit__sales
description: Late binding view of external fact table – TICKIT sale
name: stg_tickit__users
description: Late binding view of external dimension table – TICKIT user
name: stg_tickit__venues
description: Late binding view of external dimension table – TICKIT venue
view raw schema.yml hosted with ❤ by GitHub

The staging model’s SQL statements also follow many of dbt’s best practices. Below, we see an example of the stg_tickit__sales model (stg_tickit__sales.sql). This model performs a SELECT from the external sale table in the external_table schema. The model performs column renaming and basic calculations.

{{ config(materialized='view', bind=False) }}
with source as (
select * from {{ source('tickit_external', 'sale') }}
),
renamed as (
select
saleid as sale_id,
listid as list_id,
sellerid as seller_id,
buyerid as buyer_id,
eventid as event_id,
dateid as date_id,
qtysold as qty_sold,
round (pricepaid / qtysold, 2) as ticket_price,
pricepaid as price_paid,
round((commission / pricepaid) * 100, 2) as commission_prcnt,
commission,
(pricepaid commission) as earnings,
saletime as sale_time
from
source
where
sale_id IS NOT NULL
order by
sale_id
)
select * from renamed

The the dbt run command, according to dbt, “executes compiled SQL model files against the current target database. dbt connects to the target database and runs the relevant SQL, required to materialize all data models using the specified materialization strategies.” Instead of using the dbt run command to create all the project’s tables and views at once, for now, we are limiting the command to just the models in the ./models/staging/tickit/ directory using the --select optional argument. Execute the dbt run --select staging command to materialize the seven corresponding staging tables in Amazon Redshift.

The Staging layer’s data models successfully materialized in Redshift

Once the command completes, we should observe seven new views in Amazon Redshift demo database’s tickit_dbt schema with the stg_ prefix.

Amazon Redshift Query Editor v2 Console

Selecting from any of the views should return data.

Amazon Redshift Query Editor v2 Console

Late Binding Views

This demonstration uses late binding views for staging and intermediate layer models. According to dbt, “using late-binding views in a production deployment of dbt can vastly improve the availability of data in the warehouse, especially for models that are materialized as late-binding views and are queried by end-users, since they won’t be dropped when upstream models are updated. Additionally, late binding views can be used with external tables via Redshift Spectrum.

Alternatively, we could define the seven staging models as tables instead of late binding views. Once created as tables, the dependent intermediate and marts views will not require a late-binding reference, as in this project.

Intermediate Layer

In their best practices guide, dbt describes the intermediate layer as “purpose-built transformation steps.” Further, “the best guiding principle is to think about verbs (e.g. pivotedaggregated_to_userjoinedfanned_out_by_quanityfunnel_created, etc.) in the intermediate layer.

The project’s intermediate layer consists of two models related to users. The sample TICKIT database lumps all users into a single table. However, for analytics purposes, different user personas might interest marketing teams, such as buyers, sellers, sellers who also buy, and non-buyers (users who have never purchased tickets). The two models in the project’s intermediate layer filter for buyers and for sellers, resulting in two separate views of user personas.

{{ config(materialized='view', bind=False) }}
with sales as (
select * from {{ ref('stg_tickit__sales') }}
),
users as (
select * from {{ ref('stg_tickit__users') }}
),
first_purchase as (
select min(date(sale_time)) as first_purchase_date, buyer_id
from sales
group by buyer_id
),
final as (
select distinct
u.user_id,
u.username,
cast((u.last_name||', '||u.first_name) as varchar(100)) as full_name,
f.first_purchase_date,
u.city,
u.state,
u.email,
u.phone,
u.like_broadway,
u.like_classical,
u.like_concerts,
u.like_jazz,
u.like_musicals,
u.like_opera,
u.like_rock,
u.like_sports,
u.like_theatre,
u.like_vegas
from
sales as s
join users as u on u.user_id = s.buyer_id
join first_purchase as f on f.buyer_id = s.buyer_id
order by
user_id
)
select * from final

To materialize the intermediate layer’s two data models into views, execute the command, dbt run --select intermediate.

The Intermediate layer’s data models successfully materialized in Redshift

Once the command completes, we should observe a total of nine views in Amazon Redshift demo database’s tickit_dbt schema — seven staging and two intermediate, identified with the int_ prefix.

Amazon Redshift Query Editor v2 Console

Marts Layer

In their best practices guide, dbt describes the marts layer as “business defined entities.” Further, “this is the layer where everything comes together and we start to arrange all of our atoms (staging models) and molecules (intermediate models) into full-fledged cells that have identity and purpose. We sometimes like to call this the entity layer or concept layer, to emphasize that all our marts are meant to represent a specific entity or concept at its unique grain.

The project’s marts layer consists of four data models across marketing and sales. The models are materialized as two dimension tables and two fact tables. Although it is common practice to describe and label these as traditional star schema dimension (dim_) or fact (fct_) tables, in reality, the fact tables in this demonstration are actually flat, de-normalized, wide tables. Wide tables generally have better analytics performance in a modern data warehouse, according to Fivetran and others.

{{ config(materialized='table', sort='sale_id', dist='sale_id') }}
with categories as (
select * from {{ ref('stg_tickit__categories') }}
),
dates as (
select * from {{ ref('stg_tickit__dates') }}
),
events as (
select * from {{ ref('stg_tickit__events') }}
),
listings as (
select * from {{ ref('stg_tickit__listings') }}
),
sales as (
select * from {{ ref('stg_tickit__sales') }}
),
sellers as (
select * from {{ ref('int_sellers_extracted_from_users') }}
),
buyers as (
select * from {{ ref('int_buyers_extracted_from_users') }}
),
event_categories as (
select
e.event_id,
e.event_name,
c.cat_group,
c.cat_name
from events as e
join categories as c on c.cat_id = e.cat_id
),
final as (
select
s.sale_id,
s.sale_time,
d.qtr,
ec.cat_group,
ec.cat_name,
ec.event_name,
b.username as buyer_username,
b.full_name as buyer_name,
b.state as buyer_state,
b.first_purchase_date as buyer_first_purchase_date,
se.username as seller_username,
se.full_name as seller_name,
se.state as seller_state,
se.first_sale_date as seller_first_sale_date,
s.ticket_price,
s.qty_sold,
s.price_paid,
s.commission_prcnt,
s.commission,
s.earnings
from
sales as s
join listings as l on l.list_id = s.list_id
join buyers as b on b.user_id = s.buyer_id
join sellers as se on se.user_id = s.seller_id
join event_categories as ec on ec.event_id = s.event_id
join dates as d on d.date_id = s.date_id
order by
sale_id
)
select * from final
view raw fct_sales.sql hosted with ❤ by GitHub

The marts layer’s models take various dependencies through joins on staging and intermediate models. The data model above, fct_sales, has dependencies on multiple staging and intermediate models.

Lineage graph (dependency graph) of a data model, displayed in dbt Cloud

To materialize the marts layer’s four data models into tables, execute the command, dbt run --select marts.

The Marts layer’s data models successfully materialized in Redshift as tables

Once the command completes, we should observe four tables and nine views in the Redshift demo database’s tickit_dbt schema. Note how the dbt model for fct_sales (shown above), with its Jinja templating and multiple CTEs have been compiled into the resulting table in Redshift, this is the real magic of dbt!

Amazon Redshift Query Editor v2 Console showing fct_sales table

At this point, all of the project’s models have been compiled and created in the Redshift demo database by dbt.

Analyses

The demonstration’s project also contains example analyses. dbt allows us to version control more analytical-oriented SQL files within our dbt project using the analyses functionality of dbt. These analyses do not fit the fairly opinionated dbt model definition. We can compile the analyses SQL file using the dbt compile command, then copy and paste the resulting SQL statements from the target/compiled/ subdirectory into our data warehouse’s query tool of choice.

Analysis shown in dbt Cloud interface
Compiled analysis SQL executed in Amazon Redshift

Project Documentation

Using the dbt docs generate command will automatically generate the project’s documentation website from the SQL and YAML files. Documentations can be generated and displayed from your dbt Cloud account or hosted locally.

Project’s documentation website

Testing

According to dbt, “Tests are assertions you make about your models and other resources in your dbt project (e.g. sources, seeds, and snapshots). When you run dbt test, dbt will tell you if each test in your project passes or fails.” The project contains over 50 tests, split between the _tickit__sources.yml file and individual tests in the test/ directory. Typical dbt tests check for non-null and unique values, values within an expected numeric range, and values from a known list of strings. Any SELECT statement written in SQL can be tested.

name: user
description: dimension table – TICKIT users
columns:
name: userid
data_type: int
description: primary key
tests:
unique
not_null
name: username
data_type: char(8)
tests:
unique
not_null
name: firstname
data_type: varchar(30)
tests:
not_null
Snippet of tests in the _tickit__sources.sql file
all prices paid for tickets should be a positive value of $1 or greater
there are no credits (negative values)
select price_paid
from {{ ref('stg_tickit__sales') }}
group by price_paid
having not (price_paid >= 1)

Execute the project’s tests using the dbt test command. We can execute individual tests using the --select optional argument, for example, dbt test --select assert_all_sale_amounts_are_positive. We can also use the --threads optional argument with most dbt commands, including dbt test, increasing parallelism and reducing execution time. The example below uses 10 threads, the arbitrary maximum configured for the Amazon Redshift dbt user.

Running dbt tests
Successful test run

Jobs

According to dbt, Jobs are a set of dbt commands that you want to run on a schedule. For example, dbt run and dbt test. Jobs can load packages, run tests, materialize models, check source freshness (dbt source freshness), and regenerate documentation. Below, we have created a daily job to test, refresh, and document our project as the data is updated in the data lake.

dbt Cloud’s Job Run Overview

Notifications

According to dbt, Setting up notifications in dbt Cloud will allow you to receive alerts via Email or a chosen Slack channel when a job run succeeds, fails, or is canceled.

dbt Cloud’s Notifications interface for configuring email and Slack

The Slack notifications include run status, timings, and a link to open the job in dbt Cloud. Below, we see a notification regarding our project’s daily job run.

Notification of successful job run in Slack from dbt Cloud

Exposures

Exposures are a recent addition to dbt. Exposures make it possible to define and describe a downstream use of our dbt project, such as in a dashboard, application, or data science pipeline. Below we see an example of an exposure describing a sales dashboard created in Amazon QuickSight.

version: 2
exposures:
name: tickit_sales_summary
type: dashboard
maturity: medium
url: https://us-east-1.quicksight.aws.amazon.com
description: >
TICKIT sales summary dashboard, authored in Amazon QuickSight
depends_on:
ref('fct_sales')
ref('fct_listings')
owner:
name: Gary A. Stafford
email: gary.a.stafford@gmail.com
view raw dashboard.yml hosted with ❤ by GitHub

The exposure YAML file shown above describes the Amazon QuickSight dashboard shown below.

Amazon QuickSight dashboard

Exposures work with dbt’s auto-documentation feature. dbt populates a dedicated page in the auto-generated documentation site with context relevant to data consumers.

Project’s documentation website, showing dashboard exposure
Project’s documentation website, showing dashboard exposure’s lineage graph

Conclusion

In this post, we covered some of the basic functionality of dbt. We learned how dbt enables analysts to work more like software engineers. We also learned how dbt makes it easy to codify data models in SQL, to version control and manage data models as code with git, and collaborate on data models with other data team members.

Topics not explored in this post but critical to most large-scale dbt-managed production environments include advanced Jinja templating and macrosmodel freshnessorchestrationjob schedulingContinuous Integration and GitOpsnotificationsenvironment variables, and incremental models. We will explore these additional dbt capabilities in future posts.


This blog represents my viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are the property of the author unless otherwise noted.

, , , , , ,

1 Comment

Utilizing In-memory Data Caching to Enhance the Performance of Data Lake-based Applications

Significantly improve the performance and reduce the cost of data lake-based analytics applications using Amazon ElastiCache for Redis

Introduction

The recent post, Developing Spring Boot Applications for Querying Data Lakes on AWS using Amazon Athena, demonstrated how to develop a Cloud-native analytics application using Spring Boot. The application queried data in an Amazon S3-based data lake via an AWS Glue Data Catalog utilizing the Amazon Athena API.

Securely exposing data in a data lake using RESTful APIs can fulfill many data-consumer needs. However, access to that data can be significantly slower than access from a database or data warehouse. For example, in the previous post, we imported the OpenAPI v3 specification from the Spring Boot service into Postman. The API specification contained approximately 17 endpoints.

Running a suite of integration tests against the Spring Boot service using Postman

From my local development laptop, the Postman API test run times for all service endpoints took an average of 32.4 seconds. The Spring Boot service was running three Kubernetes pod replicas on Amazon EKS in the AWS US East (N. Virginia) Region.

Sample of test results ran from service (no Redis cache)

Compare the data lake query result times to equivalent queries made against a minimally-sized Amazon RDS for PostgreSQL database instance containing the same data. The average run times for all PostgreSQL queries averaged 10.8 seconds from a similar Spring Boot service. Although not a precise benchmark, we can clearly see that access to the data in the Amazon S3-based data lake is substantially slower, approximately 3x slower, than that of the PostgreSQL database. Tuning the database would easily create an even greater disparity.

Sample of test results for queries ran from service against Athena vs. PostgreSQL (quicker is better)

Caching for Data Lakes

According to AWS, the speed and throughput of a database can be the most impactful factor for overall application performance. Consequently, in-memory data caching can be one of the most effective strategies to improve overall application performance and reduce database costs. This same caching strategy can be applied to analytics applications built atop data lakes, as this post will demonstrate.

High-level AWS architecture demonstrated in this post

In-memory Caching

According to Hazelcast, memory caching (aka in-memory caching), often simply referred to as caching, is a technique in which computer applications temporarily store data in a computer’s main memory (e.g., RAM) to enable fast retrievals of that data. The RAM used for the temporary storage is known as the cache. As an application tries to read data, typically from a data storage system like a database, it checks to see if the desired record already exists in the cache. If it does, the application will read the data from the cache, thus eliminating the slower access to the database. If the desired record is not in the cache, then the application reads the record from the source. When it retrieves that data, it writes it to the cache so that when the application needs that same data in the future, it can quickly retrieve it from the cache.

Further, according to Hazelcast, as an application tries to read data, typically from a data storage system like a database, it checks to see if the desired record already exists in the cache. If it does, the application will read the data from the cache, thus eliminating the slower access to the database. If the desired record is not in the cache, then the application reads the record from the source. When it retrieves that data, it writes it to the cache so that when the application needs that same data in the future, it can quickly retrieve it from the cache.

Redis In-memory Data Store

According to their website, Redis is the open-source, in-memory data store used by millions of developers as a database, cache, streaming engine, and message broker. Redis provides data structures such as stringshasheslistssetssorted sets with range queries, bitmapshyperloglogsgeospatial indexes, and streams. In addition, Redis has built-in replicationLua scriptingLRU evictiontransactions, and different levels of on-disk persistence and provides high availability via Redis Sentinel and automatic partitioning with Redis Cluster.

Amazon ElastiCache for Redis

According to AWS, Amazon ElastiCache for Redis, the fully-managed version of Redis, according to AWS, is a blazing fast in-memory data store that provides sub-millisecond latency to power internet-scale real-time applications. Redis applications can work seamlessly with ElastiCache for Redis without any code changes. ElastiCache for Redis combines the speed, simplicity, and versatility of open-source Redis with manageability, security, and scalability from AWS. As a result, Redis is an excellent choice for implementing a highly available in-memory cache to decrease data access latency, increase throughput, and ease the load on relational and NoSQL databases.

ElastiCache Performance Results

In the following post, we will add in-memory caching to the Spring Boot service introduced in the previous post. In preliminary tests with Amazon ElastiCache for Redis, the Spring Boot service delivered a 34x improvement in average response times. For example, test runs with the best-case scenario of a Redis cache hit ratio of 100% averaged 0.95 seconds compared to 32.4 seconds without Redis.

Sample of uncached versus cached test results (quicker is better)

Source Code

All the source code and Docker and Kubernetes resources are open-source and available on GitHub.

git clone --depth 1 -b redis \
https://github.com/garystafford/athena-spring-app.git

In addition, a Docker image for the Redis-base Spring Boot service is available on Docker Hub. For this post, use the latest tag with the .redis suffix.

Spring Boot service images are available on Docker Hub

Code Changes

The following code changes are required to the Spring Boot service to implement Spring Boot Cache with Redis.

Gradle Build

The gradle.build file now implements two additional dependencies, Spring Boot’s spring-boot-starter-cache and spring-boot-starter-data-redis (lines 45–46).

plugins {
id 'org.springframework.boot' version '2.7.1'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
id 'io.freefair.lombok' version '6.5.0-rc1'
}
group = 'aws.example'
version = '2.0.0'
sourceCompatibility = '17'
def awsSdkVersion = '2.17.225'
def springBootVersion = '2.7.1'
def restAssuredVersion = '5.1.1'
repositories {
mavenCentral()
}
dependencies {
// aws sdk
runtimeOnly "software.amazon.awssdk:bom:${awsSdkVersion}"
implementation "software.amazon.awssdk:athena:${awsSdkVersion}"
// spring
annotationProcessor "org.springframework.boot:spring-boot-configuration-processor:${springBootVersion}"
implementation "org.springframework.boot:spring-boot-starter-web:${springBootVersion}"
implementation "org.springframework.boot:spring-boot-starter-web:${springBootVersion}"
implementation "org.springframework.boot:spring-boot-starter-actuator:${springBootVersion}"
developmentOnly "org.springframework.boot:spring-boot-devtools:${springBootVersion}"
implementation 'org.springdoc:springdoc-openapi-ui:1.6.9'
implementation 'org.springframework:spring-context:5.3.20'
// testings
testImplementation "org.springframework.boot:spring-boot-starter-test:${springBootVersion}"
testImplementation "io.rest-assured:rest-assured:${restAssuredVersion}"
testImplementation "io.rest-assured:json-path:${restAssuredVersion}"
testImplementation "io.rest-assured:xml-path:${restAssuredVersion}"
testImplementation "io.rest-assured:json-schema-validator:${restAssuredVersion}"
// monitoring
implementation 'io.micrometer:micrometer-registry-prometheus:1.9.1'
// spring caching with redis
implementation "org.springframework.boot:spring-boot-starter-cache:${springBootVersion}"
implementation "org.springframework.boot:spring-boot-starter-data-redis:${springBootVersion}"
}
tasks.named('test') {
useJUnitPlatform()
}
view raw build.gradle hosted with ❤ by GitHub

Application Properties

The application properties file, application.yml, has been modified for both the dev and prod Spring Profiles. The dev Spring Profile expects Redis to be running on localhost. Correspondingly, the project’s docker-compose.yml file now includes a Redis container for local development. The time-to-live (TTL) for all Redis caches is arbitrarily set to one minute for dev and five minutes for prod. To increase application performance and reduce the cost of querying the data lake using Athena, increase Redis’s TTL. Note that increasing the TTL will reduce data freshness.

spring:
profiles:
active: dev
server:
port: 8080
servlet:
contextPath: /v1
athena:
region: us-east-1
workgroup: primary
catalog: AwsDataCatalog
database: tickit_demo
limit: 25
client-execution-timeout: 100000
retry-sleep: 1000
results-bucket: ${RESULTS_BUCKET}
named-query-id: ${NAMED_QUERY_ID}
spring:
config:
activate:
on-profile: dev
redis:
host: localhost
port: 6379
cache:
redis:
time-to-live: 60000 # 1000 ms * 60 * 1 = 1 min
enable-statistics: true
server:
port: 8080
logging:
level:
root: DEBUG
management:
endpoints:
web:
exposure:
include: '*'
jmx:
exposure:
include: '*'
spring:
config:
activate:
on-profile: prod
redis:
host: ${REDIS_HOST}
port: 6379
cache:
redis:
time-to-live: 300000 # 1000 ms * 60 * 5 = 5 min
enable-statistics: true
logging:
level:
root: INFO
management:
endpoints:
web:
exposure:
include: health, prometheus
jmx:
exposure:
include: health
view raw application.yml hosted with ❤ by GitHub

Athena Application Class

The AthenaApplication class declaration is now decorated with Spring Framework’s EnableCaching annotation (line 22). Additionally, two new Beans have been added (lines 58–68). Spring Redis provides an implementation for the Spring cache abstraction through the org.springframework.data.redis.cache package. The RedisCacheManager cache manager creates caches by default upon the first write. The RedisCacheConfiguration cache configuration helps to customize RedisCache behavior such as caching null values, cache key prefixes, and binary serialization.

package com.example.athena;
import com.example.athena.common.PreparedStatement;
import com.example.athena.common.View;
import com.example.athena.config.ConfigProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import java.time.Duration;
@SpringBootApplication
@EnableConfigurationProperties(ConfigProperties.class)
@EnableCaching
public class AthenaApplication {
private final PreparedStatement preparedStatement;
private final View view;
@Autowired
public AthenaApplication(PreparedStatement preparedStatement, View view) {
this.preparedStatement = preparedStatement;
this.view = view;
}
public static void main(String[] args) {
SpringApplication.run(AthenaApplication.class, args);
}
@Bean
void CreatePreparedStatement() {
preparedStatement.CreatePreparedStatement();
}
@Bean
void createView() {
view.CreateView();
}
@Bean
public WebMvcConfigurer corsConfigurer() {
return new WebMvcConfigurer() {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**").allowedOrigins("*");
}
};
}
@Bean
public RedisCacheManager cacheManager(RedisConnectionFactory connectionFactory) {
return RedisCacheManager.create(connectionFactory);
}
@Bean
public RedisCacheConfiguration cacheConfiguration() {
return RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(5))
.disableCachingNullValues();
}
}

POJO Data Model Classes

Spring Boot Redis caching uses Java serialization and deserialization. Therefore, all the POJO data model classes must implement Serializable (line 14).

package com.example.athena.tickit.model.ecomm;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Listing implements Serializable {
private int id;
private int sellerId;
private int eventId;
private int dateId;
private int numTickets;
private BigDecimal pricePerTicket;
private LocalDateTime listTime;
}
view raw Listing.java hosted with ❤ by GitHub

Service Classes

Each public method in the Service classes is now decorated with Spring Framework’s Cachable annotation (lines 42 and 66). For example, the findById(int id) method in the CategoryServiceImp class is annotated with @Cacheable(value = "categories", key = "#id"). The method’s key parameter uses Spring Expression Language (SpEL) expression for computing the key dynamically. Default is null, meaning all method parameters are considered as a key unless a custom keyGenerator has been configured. If no value is found in the Redis cache for the computed key, the target method will be invoked, and the returned value will be stored in the associated cache.

package com.example.athena.tickit.service;
import com.example.athena.common.AthenaClientFactory;
import com.example.athena.common.AthenaCommon;
import com.example.athena.config.ConfigProperties;
import com.example.athena.tickit.model.ecomm.Listing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.*;
import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import static java.lang.Integer.parseInt;
@Service
public class ListingServiceImp implements ListingService {
private static final Logger logger = LoggerFactory.getLogger(ListingServiceImp.class);
private final ConfigProperties configProperties;
private final AthenaClientFactory athenaClientFactory;
private final AthenaCommon athenaCommon;
@Autowired
public ListingServiceImp(ConfigProperties configProperties, AthenaClientFactory athenaClientFactory, AthenaCommon athenaCommon) {
this.configProperties = configProperties;
this.athenaClientFactory = athenaClientFactory;
this.athenaCommon = athenaCommon;
}
@Cacheable(value = "listings")
public List<Listing> findAll(Integer limit, Integer offset) {
if (limit == null || limit < 1 || limit > configProperties.getLimit()) {
limit = configProperties.getLimit();
}
if (offset == null || offset < 1) {
offset = 0;
}
String whereClause = "WHERE listid IS NOT NULL";
String query = String.format("""
SELECT *
FROM refined_tickit_public_listing
%s
ORDER BY listid
OFFSET %s
LIMIT %s;""", whereClause, offset, limit);
return startQuery(query);
}
@Cacheable(value = "listing", key = "#id")
public Listing findById(int id) {
String query = String.format("""
SELECT DISTINCT *
FROM refined_tickit_public_listing
WHERE listid=%s""", id);
Listing listing;
try {
listing = startQuery(query).get(0);
} catch (java.lang.IndexOutOfBoundsException e) {
logger.error(e.getMessage());
return null;
}
return listing;
}
private List<Listing> startQuery(String query) {
logger.debug(String.format("Query: %s", query.replace("\n", " ")));
AthenaClient athenaClient = athenaClientFactory.createClient();
String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, query);
athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);
List<Listing> listings = processResultRows(athenaClient, queryExecutionId);
athenaClient.close();
return listings;
}
private List<Listing> processResultRows(AthenaClient athenaClient, String queryExecutionId) {
List<Listing> listings = new ArrayList<>();
try {
GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
.queryExecutionId(queryExecutionId).build();
GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);
List<Row> rows;
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S");
for (GetQueryResultsResponse result : getQueryResultsResults) {
rows = result.resultSet().rows();
for (Row myRow : rows.subList(1, rows.size())) { // skip first row – column names
List<Datum> allData = myRow.data();
Listing listing = new Listing();
listing.setId(parseInt(allData.get(0).varCharValue()));
listing.setSellerId(parseInt(allData.get(1).varCharValue()));
listing.setEventId(parseInt(allData.get(2).varCharValue()));
listing.setDateId(parseInt(allData.get(3).varCharValue()));
listing.setNumTickets(Integer.parseInt(allData.get(4).varCharValue()));
listing.setPricePerTicket(new BigDecimal(allData.get(5).varCharValue()));
listing.setListTime(LocalDateTime.parse(allData.get(6).varCharValue(), formatter));
listings.add(listing);
}
}
} catch (AthenaException e) {
logger.error(e.getMessage());
}
return listings;
}
}

Controller Classes

There are no changes required to the Controller classes.

Amazon ElastiCache for Redis

Multiple options are available for creating an Amazon ElastiCache for Redis cluster, including cluster mode, multi-AZ option, auto-failover option, node type, number of replicas, number of shards, replicas per shard, Availability Zone placements, and encryption at rest and encryption in transit options. The results in this post are based on a minimally-configured Redis version 6.2.6 cluster, with one shard, two cache.r6g.large nodes, and cluster mode, multi-AZ option, and auto-failover all disabled. In addition, encryption at rest and encryption in transit were also disabled. This cluster configuration is sufficient for development and testing, but not Production.

Amazon ElastiCache for Redis cluster used for this post
Amazon ElastiCache for Redis cluster monitoring console showing caching activity

Testing the Cache

To test Amazon ElastiCache for Redis, we will use Postman again with the imported OpenAPI v3 specification. With all data evicted from existing Redis caches, the first time the Postman tests run, they cause the service’s target methods to be invoked and the returned data stored in the associated caches.

Running a suite of integration tests against the Spring Boot service using Postman

To confirm this caching behavior, use the Redis CLI’s --scan option. To access the redis-cli, I deployed a single Redis pod to Amazon EKS. The first time the --scan command runs, we should get back an empty list of keys. After the first Postman test run, the same --scan command should return a list of cached keys.

List of cached keys in Redis

Use the Redis CLI’s MONITOR option to further confirm data is being cached, as indicated by the set command.

Athena query results being cached in Redis

After the initial caching of data, use the Redis CLI’s MONITOR option, again, to confirm the cache is being hit instead of calling the target methods, which would then invoke the Athena API. Rerunning the Postman tests, we should see get commands as opposed to set commands.

Monitoring cache hits in Redis

Lastly, to confirm the Spring Boot service is effectively using Redis to cache data, we can also check Amazon Athena’s Recent queries tab in the AWS Management Console. After repeated sequential test runs within the TTL window, we should only see one Athena query per endpoint.

Amazon Athena Recent queries tab in the AWS Management Console

Conclusion

In this brief follow-up to the recent post, Developing Spring Boot Applications for Querying Data Lakes on AWS using Amazon Athena, we saw how to substantially increase data lake application performance using Amazon ElastiCache for Redis. Although this caching technique is often associated with databases, it can also be effectively applied to data lake-based applications, as demonstrated in the post.


This blog represents my viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are property of the author unless otherwise noted.

, , , , ,

Leave a comment

Monolith to Microservices: Refactoring Relational Databases

Exploring common patterns for refactoring relational database models as part of a microservices architecture

Introduction

There is no shortage of books, articles, tutorials, and presentations on migrating existing monolithic applications to microservices, nor designing new applications using a microservices architecture. It has been one of the most popular IT topics for the last several years. Unfortunately, monolithic architectures often have equally monolithic database models. As organizations evolve from monolithic to microservices architectures, refactoring the application’s database model is often overlooked or deprioritized. Similarly, as organizations develop new microservices-based applications, they frequently neglect to apply a similar strategy to their databases.

The following post will examine several basic patterns for refactoring relational databases for microservices-based applications.

Terminology

Monolithic Architecture

A monolithic architecture is “the traditional unified model for the design of a software program. Monolithic, in this context, means composed all in one piece.” (TechTarget). A monolithic application “has all or most of its functionality within a single process or container, and it’s componentized in internal layers or libraries” (Microsoft). A monolith is usually built, deployed, and upgraded as a single unit of code.

Microservices Architecture

A microservices architecture (aka microservices) refers to “an architectural style for developing applications. Microservices allow a large application to be separated into smaller independent parts, with each part having its own realm of responsibility” (Google Cloud).

According to microservices.io, the advantages of microservices include:

  • Highly maintainable and testable
  • Loosely coupled
  • Independently deployable
  • Organized around business capabilities
  • Owned by a small team
  • Enables rapid, frequent, and reliable delivery
  • Allows an organization to [more easily] evolve its technology stack

Database

A database is “an organized collection of structured information, or data, typically stored electronically in a computer system” (Oracle). There are many types of databases. The most common database engines include relational, NoSQL, key-value, document, in-memory, graph, time series, wide column, and ledger.

PostgreSQL

In this post, we will use PostgreSQL (aka Postgres), a popular open-source object-relational database. A relational database is “a collection of data items with pre-defined relationships between them. These items are organized as a set of tables with columns and rows. Tables are used to hold information about the objects to be represented in the database” (AWS).

Amazon RDS for PostgreSQL

We will use the fully managed Amazon RDS for PostgreSQL in this post. Amazon RDS makes it easy to set up, operate, and scale PostgreSQL deployments in the cloud. With Amazon RDS, you can deploy scalable PostgreSQL deployments in minutes with cost-efficient and resizable hardware capacity. In addition, Amazon RDS offers multiple versions of PostgreSQL, including the latest version used for this post, 14.2.

The patterns discussed here are not specific to Amazon RDS for PostgreSQL. There are many options for using PostgreSQL on the public cloud or within your private data center. Alternately, you could choose Amazon Aurora PostgreSQL-Compatible Edition, Google Cloud’s Cloud SQL for PostgreSQL, Microsoft’s Azure Database for PostgreSQLElephantSQL, or your own self-manage PostgreSQL deployed to bare metal servers, virtual machine (VM), or container.

Database Refactoring Patterns

There are many ways in which a relational database, such as PostgreSQL, can be refactored to optimize efficiency in microservices-based application architectures. As stated earlier, a database is an organized collection of structured data. Therefore, most refactoring patterns reorganize the data to optimize for an organization’s functional requirements, such as database access efficiency, performance, resilience, security, compliance, and manageability.

The basic building block of Amazon RDS is the DB instance, where you create your databases. You choose the engine-specific characteristics of the DB instance when you create it, such as storage capacity, CPU, memory, and EC2 instance type on which the database server runs. A single Amazon RDS database instance can contain multiple databases. Those databases contain numerous object types, including tables, views, functions, procedures, and types. Tables and other object types are organized into schemas. These hierarchal constructs — instances, databases, schemas, and tables — can be arranged in different ways depending on the requirements of the database data producers and consumers.

Basic relational database refactoring patterns

Sample Database

To demonstrate different patterns, we need data. Specifically, we need a database with data. Conveniently, due to the popularity of PostgreSQL, there are many available sample databases, including the Pagila database. I have used it in many previous articles and demonstrations. The Pagila database is available for download from several sources.

Database diagram showing the relations between Pagila’s tables

The Pagila database represents a DVD rental business. The database is well-built, small, and adheres to a third normal form (3NF) database schema design. The Pagila database has many objects, including 1 schema, 15 tables, 1 trigger, 7 views, 8 functions, 1 domain, 1 type, 1 aggregate, and 13 sequences. Pagila’s tables contain between 2 and 16K rows.

Pattern 1: Single Schema

Pattern 1: Single Schema is one of the most basic database patterns. There is one database instance containing a single database. That database has a single schema containing all tables and other database objects.

Pattern 1: Single Schema

As organizations begin to move from monolithic to microservices architectures, they often retain their monolithic database architecture for some time.

Beginning to decompose the monolith application

Frequently, the monolithic database’s data model is equally monolithic, lacking proper separation of concerns using simple database constructs such as schemas. The Pagila database is an example of this first pattern. The Pagila database has a single schema containing all database object types, including tables, functions, views, procedures, sequences, and triggers.

To create a copy of the Pagila database, we can use pg_restore to restore any of several publically available custom-format database archive files. If you already have the Pagila database running, simply create a copy with pg_dump.

# set postgres environment variables
# ** CHANGE ME **
export PGHOST="postgres1.abcxyzdef.us-east-1.rds.amazonaws.com"
export PGPORT=5432
export PGDATABASE="postgres"
export PGUSER="admin"
export PGPASSWORD="change_me!"
# create new v1 of pagila database
export PGDATABASE="postgres"
psql -c "CREATE DATABASE pagila_v1;"
# restore original version of pagila database
pg_restore -d pagila_v1 pagila.dump
# confirm pagila tables in public schema
export PGDATABASE="pagila_v1"
psql -c "\dt"
Create a new version of the Pagila database for Pattern 1

Below we see the table layout of the Pagila database, which contains the single, default public schema.

-----------+----------+--------+------------
Instance | Database | Schema | Table
-----------+----------+--------+------------
postgres1 | pagila | public | actor
postgres1 | pagila | public | address
postgres1 | pagila | public | category
postgres1 | pagila | public | city
postgres1 | pagila | public | country
postgres1 | pagila | public | customer
postgres1 | pagila | public | film
postgres1 | pagila | public | film_actor
postgres1 | pagila | public | film_category
postgres1 | pagila | public | inventory
postgres1 | pagila | public | language
postgres1 | pagila | public | payment
postgres1 | pagila | public | rental
postgres1 | pagila | public | staff
postgres1 | pagila | public | store

Using a single schema to house all tables, especially the public schema is generally considered poor database design. As a database grows in complexity, creating, organizing, managing, and securing dozens, hundreds, or thousands of database objects, including tables, within a single schema becomes impossible. For example, given a single schema, the only way to organize large numbers of database objects is by using lengthy and cryptic naming conventions.

Public Schema

According to the PostgreSQL docs, if tables or other object types are created without specifying a schema name, they are automatically assigned to the default public schema. Every new database contains a public schema. By default, users cannot access any objects in schemas they do not own. To allow that, the schema owner must grant the USAGE privilege on the schema. by default, everyone has CREATE and USAGE privileges on the schema public. These default privileges enable all users to connect to a given database to create objects in its public schema. Some usage patterns call for revoking that privilege, which is a compelling reason not to use the public schema as part of your database design.

Pattern 2: Multiple Schemas

Separating tables and other database objects into multiple schemas is an excellent first step to refactoring a database to support microservices. As application complexity and databases naturally grow over time, schemas to separate functionality by business subdomain or teams will benefit significantly.

According to the PostgreSQL docs, there are several reasons why one might want to use schemas:

  • To allow many users to use one database without interfering with each other.
  • To organize database objects into logical groups to make them more manageable.
  • Third-party applications can be put into separate schemas, so they do not collide with the names of other objects.

Schemas are analogous to directories at the operating system level, except schemas cannot be nested.

Pattern 2: Multiple Schemas

With Pattern 2, as an organization continues to decompose its monolithic application architecture to a microservices-based application, it could transition to a schema-per-microservice or similar level or organizational granularity.

Continuing to decompose the monolith into microservices

Applying Domain-driven Design Principles

Domain-driven design (DDD) is “a software design approach focusing on modeling software to match a domain according to input from that domain’s experts” (Wikipedia). Architects often apply DDD principles to decompose a monolithic application into microservices. For example, a microservice or set of related microservices might represent a Bounded Context. In DDD, a Bounded Context is “a description of a boundary, typically a subsystem or the work of a particular team, within which a particular model is defined and applicable.” (hackernoon.com). Examples of Bounded Context might include Sales, Shipping, and Support.

One technique to apply schemas when refactoring a database is to mirror the Bounded Contexts, which reflect the microservices. For each microservice or set of closely related microservices, there is a schema. Unfortunately, there is no absolute way to define the Bounded Contexts of a Domain, and henceforth, schemas to a database. It depends on many factors, including your application architecture, features, security requirements, and often an organization’s functional team structure.

Reviewing the purpose of each table in the Pagila database and their relationships to each other, we could infer Bounded Contexts, such as Films, Stores, Customers, and Sales. We can represent these Bounded Contexts as schemas within the database as a way to organize the data. The individual tables in a schema mirror DDD concepts, such as aggregates, entities, or value objects.

# dump v1 of pagila database
pg_dump -Fc -d pagila_v1 -f pagila_v1.dump
# create new v2 of pagila database
psql -c "CREATE DATABASE pagila_v2;"
# restore v1 of pagila database
pg_restore -d pagila_v2 pagila_v1.dump
# connect to new pagila database
export PGDATABASE="pagila_v2"
psql
Create a new version of the Pagila database for Pattern 2
wrap in transaction
BEGIN;
optional, should be set to public by default
SET search_path TO public;
create new schemas
CREATE SCHEMA common;
CREATE SCHEMA customers;
CREATE SCHEMA films;
CREATE SCHEMA sales;
CREATE SCHEMA staff;
CREATE SCHEMA stores;
common
ALTER TABLE address SET SCHEMA common;
ALTER TABLE city SET SCHEMA common;
ALTER TABLE country SET SCHEMA common;
customers
ALTER TABLE customer SET SCHEMA customers;
films
ALTER TABLE actor SET SCHEMA films;
ALTER TABLE category SET SCHEMA films;
ALTER TABLE film SET SCHEMA films;
ALTER TABLE language SET SCHEMA films;
ALTER TABLE film_actor SET SCHEMA films;
ALTER TABLE film_category SET SCHEMA films;
sales
ALTER TABLE payment SET SCHEMA sales;
ALTER TABLE rental SET SCHEMA sales;
staff
ALTER TABLE staff SET SCHEMA staff;
stores
ALTER TABLE store SET SCHEMA stores;
ALTER TABLE inventory SET SCHEMA stores;
COMMIT;
confirm all tables are removed from public schema
\dt
view raw pagila_v2.sql hosted with ❤ by GitHub
Add the new schemas and move tables and objects accordingly

As shown below, the tables of the Pagila database have been relocated into six new schemas: commoncustomersfilmssalesstaff, and stores. The common schema contains tables with address data references tables in several other schemas. There are now no tables left in the public schema. We will assume other database objects (e.g., functions, views, and triggers) have also been moved and modified if necessary to reflect new table locations.

-----------+----------+-----------+---------------
Instance | Database | Schema | Table
-----------+----------+-----------+---------------
postgres1 | pagila | common | address
postgres1 | pagila | common | city
postgres1 | pagila | common | country
-----------+----------+-----------+---------------
postgres1 | pagila | customers | customer
-----------+----------+-----------+---------------
postgres1 | pagila | films | actor
postgres1 | pagila | films | category
postgres1 | pagila | films | film
postgres1 | pagila | films | film_actor
postgres1 | pagila | films | film_category
postgres1 | pagila | films | language
-----------+----------+-----------+---------------
postgres1 | pagila | sales | payment
postgres1 | pagila | sales | rental
-----------+----------+-----------+---------------
postgres1 | pagila | staff | staff
-----------+----------+-----------+---------------
postgres1 | pagila | stores | inventory
postgres1 | pagila | stores | store

By applying schemas, we align tables and other database objects to individual microservices or functional teams that own the microservices and the associated data. Schemas allow us to apply fine-grain access control over objects and data within the database more effectively.

Refactoring other Database Objects

Typically with psql, when moving tables across schemas using an ALTER TABLE...SET SCHEMA... SQL statement, objects such as database views will be updated to the table’s new location. For example, take Pagila’s sales_by_store view. Note the schemas have been automatically updated for multiple tables from their original location in the public schema. The view was also moved to the sales schema.

CREATE OR REPLACE VIEW sales.sales_by_store AS
SELECT (c.city || ','::text) || cy.country AS store,
(m.first_name || ' '::text) || m.last_name AS manager,
sum(p.amount) AS total_sales
FROM sales.payment p
JOIN sales.rental r ON p.rental_id = r.rental_id
JOIN stores.inventory i ON r.inventory_id = i.inventory_id
JOIN stores.store s ON i.store_id = s.store_id
JOIN common.address a ON s.address_id = a.address_id
JOIN common.city c ON a.city_id = c.city_id
JOIN common.country cy ON c.country_id = cy.country_id
JOIN staff.staff m ON s.manager_staff_id = m.staff_id
GROUP BY cy.country, c.city, s.store_id,
m.first_name, m.last_name
ORDER BY cy.country, c.city;
Pagila’s sales_by_store database view with new schema pattern

Splitting Table Data Across Multiple Schemas

When refactoring a database, you may have to split data by replicating table definitions across multiple schemas. Take, for example, Pagila’s address table, which contains the addresses of customers, staff, and stores. The customers.customerstores.staff, and stores.store all have foreign key relationships with the common.address table. The address table has a foreign key relationship with both the city and country tables. Thus for convenience, the addresscity, and country tables were all placed into the common schema in the example above.

Although, at first, storing all the addresses in a single table might appear to be sound database normalization, consider the risks of having the address table’s data exposed. The store addresses are not considered sensitive data. However, the home addresses of customers and staff are likely considered sensitive personally identifiable information (PII). Also, consider as an application evolves, you may have fields unique to one type of address that does not apply to other categories of addresses. The table definitions for a store’s address may be defined differently than the address of a customer. For example, we might choose to add a county column to the customers.address table for e-commerce tax purposes, or an on_site_parking boolean column to the stores.address table.

In the example below, a new staff schema was added. The address table definition was replicated in the customersstaff, and stores schemas. The assumption is that the mixed address data in the original table was distributed to the appropriate address tables. Note the way schemas help us avoid table name collisions.

-----------+----------+-----------+---------------
Instance | Database | Schema | Table
-----------+----------+-----------+---------------
postgres1 | pagila | common | city
postgres1 | pagila | common | country
-----------+----------+-----------+---------------
postgres1 | pagila | customers | address
postgres1 | pagila | customers | customer
-----------+----------+-----------+---------------
postgres1 | pagila | films | actor
postgres1 | pagila | films | category
postgres1 | pagila | films | film
postgres1 | pagila | films | film_actor
postgres1 | pagila | films | film_category
postgres1 | pagila | films | language
-----------+----------+-----------+---------------
postgres1 | pagila | sales | payment
postgres1 | pagila | sales | rental
-----------+----------+-----------+---------------
postgres1 | pagila | staff | address
postgres1 | pagila | staff | staff
-----------+----------+-----------+---------------
postgres1 | pagila | stores | address
postgres1 | pagila | stores | inventory
postgres1 | pagila | stores | store

To create the new customers.address table, we could use the following SQL statements. The statements to create the other two address tables are nearly identical.

wrap in transaction
BEGIN;
create new customers.address table
CREATE SEQUENCE IF NOT EXISTS customers.address_address_id_seq
INCREMENT 1
START 1
MINVALUE 1
MAXVALUE 9223372036854775807
CACHE 1;
ALTER SEQUENCE customers.address_address_id_seq
OWNER TO pagila_admin;
CREATE TABLE IF NOT EXISTS customers.address (
address_id integer DEFAULT nextval('address_address_id_seq'::regclass) NOT NULL PRIMARY KEY,
address text NOT NULL,
address2 text,
district text NOT NULL,
city_id smallint NOT NULL REFERENCES common.city ON UPDATE CASCADE ON DELETE RESTRICT,
postal_code text,
phone text NOT NULL,
last_update timestamp with time zone DEFAULT now() NOT NULL
);
ALTER TABLE customers.address
OWNER TO pagila_admin;
CREATE INDEX IF NOT EXISTS idx_fk_city_id ON customers.address(city_id);
CREATE TRIGGER last_updated
BEFORE UPDATE ON customers.address FOR EACH ROW
EXECUTE PROCEDURE last_updated();
COMMIT;
Creating new customers.address table and associated objects

Although we now have two additional tables with identical table definitions, we do not duplicate any data. We could use the following SQL statements to migrate unique address data into the appropriate tables and confirm the results.

wrap in transaction
BEGIN;
copy only customer addresses to new customers.address table
INSERT INTO customers.address
SELECT *
FROM common.address
WHERE common.address.address_id IN (
SELECT DISTINCT address_id
FROM customers.customer
);
copy only staff addresses to new staff.address table
INSERT INTO staff.address
SELECT COUNT(*)
FROM common.address
WHERE common.address.address_id IN (
SELECT DISTINCT address_id
FROM staff.staff
);
copy only store addresses to new stores.address table
INSERT INTO stores.address
SELECT *
FROM common.address
WHERE common.address.address_id IN (
SELECT DISTINCT address_id
FROM stores.store
);
check for extraneous data in common.address before deleting
SELECT *
FROM common.address
WHERE common.address.address_id NOT IN
(SELECT DISTINCT address_id FROM customers.customer)
AND common.address.address_id NOT IN
(SELECT DISTINCT address_id FROM staff.staff)
AND common.address.address_id NOT IN
(SELECT DISTINCT address_id FROM stores.store);
COMMIT;
Migrating unique address data into the appropriate tables

Lastly, alter the existing foreign key constraints to point to the new address tables. The SQL statements for the other two address tables are nearly identical.

wrap in transaction
BEGIN;
customers.customer
ALTER TABLE IF EXISTS customers.customer
DROP CONSTRAINT IF EXISTS customer_address_id_fkey;
ALTER TABLE IF EXISTS customers.customer
ADD CONSTRAINT customer_address_id_fkey FOREIGN KEY (address_id)
REFERENCES customers.address (address_id) MATCH SIMPLE
ON UPDATE CASCADE
ON DELETE RESTRICT;
COMMIT;
Updating the existing foreign key constraints

There is now a reduced risk of exposing sensitive customer or staff data when querying store addresses, and the three address entities can evolve independently. Individual functional teams separately responsible customersstaff, and stores, can own and manage just the data within their domain.

Before dropping the common.address tables, you would still need to modify the remaining database objects that have dependencies on this table, such as views and functions. For example, take Pagila’s sales_by_store view we saw previously. Note line 9, below, the schema of the address table has been updated from common.address to stores.address. The stores.address table only contains addresses of stores, not customers or staff.

CREATE OR REPLACE VIEW sales.sales_by_store AS
SELECT (c.city || ','::text) || cy.country AS store,
(m.first_name || ' '::text) || m.last_name AS manager,
sum(p.amount) AS total_sales
FROM sales.payment p
JOIN sales.rental r ON p.rental_id = r.rental_id
JOIN stores.inventory i ON r.inventory_id = i.inventory_id
JOIN stores.store s ON i.store_id = s.store_id
JOIN stores.address a ON s.address_id = a.address_id
JOIN common.city c ON a.city_id = c.city_id
JOIN common.country cy ON c.country_id = cy.country_id
JOIN staff.staff m ON s.manager_staff_id = m.staff_id
GROUP BY cy.country, c.city, s.store_id,
m.first_name, m.last_name
ORDER BY cy.country, c.city;
Pagila’s sales_by_store database view with the new schema pattern

Below, we see the final table structure for the Pagila database after refactoring. Tables have been loosely grouped together schema in the diagram.

Database diagram showing new table relationships

Pattern 3: Multiple Databases

Similar to how individual schemas allow us to organize tables and other database objects and provide better separation of concerns, we can use databases the same way. For example, we could choose to spread the Pagila data across more than one database within a single RDS database instance. Again, using DDD concepts, while schemas might represent Bounded Contexts, databases most closely align to Domains, which are “spheres of knowledge and activity where the application logic revolves” (hackernoon.com).

Pattern 3: Multiple Databases

With Pattern 3, as an organization continues to refine its microservices-based application architecture, it might find that multiple databases within the same database instance are advantageous to further separate and organize application data.

Moving from a single- to multi-database architecture

Let’s assume that the data in the films schema is owned and managed by a completely separate team who should never have access to sensitive data stored in the customersstores, and sales schemas. According to the PostgreSQL docs, database access permissions are managed using the concept of roles. Depending on how the role is set up, a role can be thought of as either a database user or a group of users.

To provide greater separation of concerns than just schemas, we can create a second, completely separate database within the same RDS database instance for data related to films. With two separate databases, it is easier to create and manage distinct roles and ensure access to customersstores, or sales data is only accessible to teams that need access.

# dump v2 of pagila database
pg_dump -Fc -d pagila_v2 -f pagila_v2.dump
# create 2 new v3 databases
export PGDATABASE="postgres"
psql << EOF
\x
CREATE DATABASE pagila_v3;
CREATE DATABASE products_v3;
EOF
# restore v2 of pagila database
pg_restore -d pagila_v3 pagila_v2.dump
pg_restore -d products_v3 -n films pagila_v2.dump
# connect to new pagila database
export PGDATABASE="pagila_v3"
psql
Create a new version of the Pagila and Products database for Pattern 3

Below, we see the new layout of tables now spread across two databases within the same RDS database instance. Two new tables, highlighted in bold, are explained below.

-----------+----------+-----------+---------------
Instance | Database | Schema | Table
-----------+----------+-----------+---------------
postgres1 | pagila | common | city
postgres1 | pagila | common | country
-----------+----------+-----------+---------------
postgres1 | pagila | customers | address
postgres1 | pagila | customers | customer
-----------+----------+-----------+---------------
postgres1 | pagila | films | film
-----------+----------+-----------+---------------
postgres1 | pagila | sales | payment
postgres1 | pagila | sales | rental
-----------+----------+-----------+---------------
postgres1 | pagila | staff | address
postgres1 | pagila | staff | staff
-----------+----------+-----------+---------------
postgres1 | pagila | stores | address
postgres1 | pagila | stores | inventory
postgres1 | pagila | stores | store
-----------+----------+-----------+---------------
postgres1 | products | films | actor
postgres1 | products | films | category
postgres1 | products | films | film
postgres1 | products | films | film_actor
postgres1 | products | films | film_category
postgres1 | products | films | language
postgres1 | products | films | outbox

Change Data Capture and the Outbox Pattern

Inserts, updates, and deletes of film data can be replicated between the two databases using several methods, including Change Data Capture (CDC) with the Outbox Pattern. CDC is “a pattern that enables database changes to be monitored and propagated to downstream systems” (RedHat). The Outbox Pattern uses the PostgreSQL database’s ability to perform an commit to two tables atomically using a transaction. Transactions bundles multiple steps into a single, all-or-nothing operation.

In this example, data is written to existing tables in the products.films schema (updated aggregate’s state) as well as a new products.films.outbox table (new domain events), wrapped in a transaction. Using CDC, the domain events from the products.films.outbox table are replicated to the pagila.films.film table. The replication of data between the two databases using CDC is also referred to as eventual consistency.

Change Data Capture (CDC) with the Outbox Pattern

In this example, films in the pagila.films.film and products.films.outbox tables are represented in a denormalized, aggregated view of a film instead of the original, normalized relational multi-table structure. The table definition of the new pagila.films.film table is very different than that of the original Pagila products.films.films table. A concept such as a film, represented as an aggregate or entity, can be common to multiple Bounded Contexts, yet have a different definition.

CREATE TABLE IF NOT EXISTS films.outbox
(
film_id integer NOT NULL,
title character varying(50) NOT NULL,
release_year smallint NOT NULL,
film_language character varying(20) NOT NULL,
rating character varying(5) COLLATE NOT NULL,
categories character varying(100) NOT NULL,
actors character varying NOT NULL,
rental_duration smallint NOT NULL,
length_minutes smallint NOT NULL,
replacement_cost numeric(5,2) NOT NULL,
rental_rate numeric(4,2) NOT NULL,
last_update timestamp with time zone NOT NULL DEFAULT now(),
CONSTRAINT outbox_pkey PRIMARY KEY (film_id)
)
TABLESPACE pg_default;
ALTER TABLE IF EXISTS films.outbox
OWNER to products_admin;
Example products.films.outbox table definition (similar for pagila.films.film)

Note the Confluent JDBC Source Connector  (io.confluent.connect.jdbc.JdbcSourceConnector) used here will not work with PostgreSQL arrays, which would be ideal for one-to-many categories and actors columns. Arrays can be converted to text using ::text or by building value-delimited strings using string_agg aggregate function.

PROCEDURE: films.insert_into_outbox(integer)
DROP PROCEDURE IF EXISTS films.insert_into_outbox(integer);
EXAMPLE: "CALL films.insert_into_outbox(100);"
CREATE OR REPLACE PROCEDURE films.insert_into_outbox(IN filmid integer)
LANGUAGE 'sql'
BEGIN ATOMIC
delete existing record
DELETE
FROM films.outbox
WHERE (outbox.film_id = insert_into_outbox.filmid);
insert new record
INSERT INTO films.outbox (film_id, title, release_year,
film_language, rating, categories,
actors, rental_duration, length_minutes,
replacement_cost, rental_rate)
SELECT f.film_id,
initcap(f.title) AS title,
f.release_year,
trim(BOTH FROM l.name) AS film_language,
f.rating,
(SELECT array
(SELECT c.name
FROM films.film_category AS fc
JOIN films.category AS c ON fc.category_id = c.category_id
WHERE film_id = f.film_id)::text AS categories),
(SELECT array
(SELECT initcap(concat(a.first_name, ' ', a.last_name)) AS actors
FROM films.film_actor AS fa
JOIN films.actor AS a ON fa.actor_id = a.actor_id
WHERE film_id = f.film_id)::text AS actor_array),
f.rental_duration,
f.length AS length_minutes,
f.replacement_cost,
f.rental_rate
FROM films.film f
JOIN films.language l ON f.language_id = l.language_id
WHERE (f.film_id = insert_into_outbox.filmid)
GROUP BY f.film_id, (trim(BOTH FROM l.name));
END;
ALTER PROCEDURE films.insert_into_outbox (integer)
OWNER TO products_admin;
An example query to insert data into the products.films.outbox table

Given this table definition, the resulting data would look as follows.

film_id title release_year film_language rating categories actor_array rental_duration length_minutes replacement_cost rental_rate
389 Gunfighter Mussolini 2006 English PG-13 {Sports} {"Audrey Olivier","Judy Dean","Scarlett Damon","Russell Close"} 3 127 9.99 2.99
581 Minority Kiss 2006 English G {Music} {"Vivien Basinger"} 4 59 16.99 0.99
598 Mosquito Armageddon 2006 English G {Sports} {"Goldie Brody","Kirk Jovovich","Nick Stallone","Reese West"} 6 57 22.99 0.99
943 Villain Desperate 2006 English PG-13 {Documentary} {"Dustin Tautou","Cary Mcconaughey"} 4 76 27.99 4.99
490 Jumanji Blade 2006 English G {New} {"Jennifer Davis","Bob Fawcett","Nick Stallone","Gary Phoenix","Mena Temple","Jim Mostel"} 4 121 13.99 2.99
243 Doors President 2006 English NC-17 {Animation} {"Karl Berry","Lucille Tracy","Natalie Hopkins","Christian Akroyd","Sylvester Dern","Gene Hopkins","Ed Mansfield","Kim Allen","Reese West"} 3 49 22.99 4.99
40 Army Flintstones 2006 English R {Documentary} {"Ed Chase","Cary Mcconaughey","Mae Hoffman","Gene Willis","Penelope Cronyn","Matthew Carrey","Russell Close"} 4 148 22.99 0.99
317 Fireball Philadelphia 2006 English PG {Comedy} {"Val Bolger","Jude Cruise","Adam Grant","James Pitt","Frances Tomei"} 4 148 25.99 0.99
17 Alone Trip 2006 English R {Music} {"Ed Chase","Karl Berry","Uma Wood","Woody Jolie","Spencer Depp","Chris Depp","Laurence Bullock","Renee Ball"} 3 82 14.99 0.99
195 Crowds Telemark 2006 English R {Sci-Fi} {"Matthew Johansson","Anne Cronyn","Jeff Silverstone","Matthew Carrey"} 3 112 16.99 4.99
Example of data in the pagila.films.film and products.films.outbox tables

The existing pagila.stores.inventory table has a foreign key constraint on the the pagila.films.film table. However, the films schema and associated tables have been migrated to the products database’s films schema. To overcome this challenge, we can:

  1. Create a new pagila.films.film table
  2. Continuously replicate data from the products database to the pagila.films.film table data using CDC (see below)
  3. Modify the pagila.stores.inventory table to take a dependency on the new film table
  4. Drop the duplicate tables and other objects from the pagila.films schema

Debezium and Confluent for CDC

There are several technology choices for performing CDC. For this post, I have used RedHat’s Debezium connector for PostgreSQL and Debezium Outbox Event Router, and Confluent’s JDBC Sink Connector. Below, we see a typical example of a Kafka Connect Source Connector using the Debezium connector for PostgreSQL and a Sink Connector using the Confluent JDBC Sink Connector. The Source Connector streams changes from the products logs, using PostgreSQL’s Write-Ahead Logging (WAL) feature, to an Apache Kafka topic. A corresponding Sink Connector streams the changes from the Kafka topic to the pagila database.

{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres1.abcxyzdef.us-east-1.rds.amazonaws.com",
"database.port": "5432",
"database.user": "cdc_source_user",
"database.password": "change_me!",
"database.dbname": "products",
"database.server.name": "products",
"table.include.list": "films.outbox",
"plugin.name": "pgoutput",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"slot.name": "debezium_source_connector"
}
Debezium connector for PostgreSQL example
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "products.films.outbox",
"connection.url": "jdbc:postgresql://postgres1.abcxyzdef.us-east-1.rds.amazonaws.com:5432/pagila?stringtype=unspecified",
"connection.user": "cdc_sink_user",
"connection.password": "change_me!",
"dialect.name": "PostgreSqlDatabaseDialect",
"table.name.format": "films.film",
"auto-evolve": "true",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "film_id",
"pk.mode": "record_key",
"delete.enabled": "true",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite"
}
Confluent JDBC Sink Connector example

Pattern 4: Multiple Database Instances

At some point in the evolution of a microservices-based application, it might become advantageous to separate the data into multiple database instances using the same database engine. Although managing numerous database instances may require more resources, there are also advantages. Each database instance will have independent connection configurations, roles, and administrators. Each database instance could run different versions of the database engine, and each could be upgraded and maintained independently.

Pattern 4: Multiple Database Instances

With Pattern 4, as an organization continues to refine its application architecture, it might find that multiple database instances are beneficial to further separate and organize application data.

Moving from multiple databases to multiple DB instances

Below is one possible refactoring of the Pagila database, splitting the data between two database engines. The first database instance, postgres1, contains two databases, pagila and products. The second database instance, postgres2, contains a single database, products.

-----------+----------+-----------+---------------
Instance | Database | Schema | Table
-----------+----------+-----------+---------------
postgres1 | pagila | common | city
postgres1 | pagila | common | country
-----------+----------+-----------+---------------
postgres1 | pagila | customers | address
postgres1 | pagila | customers | customer
-----------+----------+-----------+---------------
postgres1 | pagila | films | actor
postgres1 | pagila | films | category
postgres1 | pagila | films | film
postgres1 | pagila | films | film_actor
postgres1 | pagila | films | film_category
postgres1 | pagila | films | language
-----------+----------+-----------+---------------
postgres1 | pagila | staff | address
postgres1 | pagila | staff | staff
-----------+----------+-----------+---------------
postgres1 | pagila | stores | address
postgres1 | pagila | stores | inventory
postgres1 | pagila | stores | store
-----------+----------+-----------+---------------
postgres1 | pagila | sales | payment
postgres1 | pagila | sales | rental
-----------+----------+-----------+---------------
postgres2 | products | films | actor
postgres2 | products | films | category
postgres2 | products | films | film
postgres2 | products | films | film_actor
postgres2 | products | films | film_category
postgres2 | products | films | language

Data Replication with CDC

Note the films schema is duplicated between the two databases, shown above. Again, using the CDC allows us to keep the six postgres1.pagila.films tables in sync with the six  postgres2.produc