
Unlocking the Potential of Generative AI for Synthetic Data Generation

Explore the capabilities and applications of generative AI to create realistic synthetic data for software development, analytics, and machine learning

Licensed image: Yurchanka Siarhei/Shutterstock

Introduction

Generative AI refers to a class of artificial intelligence algorithms capable of generating new data similar to a given dataset. These algorithms learn the underlying patterns and relationships in the data and use this knowledge to create new data consistent with the original dataset. Generative AI is a rapidly evolving field that has the potential to revolutionize the way we generate and use data.

Generative AI can generate synthetic data based on patterns and relationships learned from actual data. This ability to generate synthetic data has numerous applications, from creating realistic virtual environments for training and simulation to generating new data for machine learning models. In this article, we will explore the capabilities of generative AI and its potential to generate synthetic data, both directly and indirectly, for software development, data analytics, and machine learning.

Common Forms of Synthetic Data

According to AltexSoft, in their article Synthetic Data for Machine Learning: its Nature, Types, and Ways of Generation, common forms of synthetic data include:

  1. Tabular data: This type of synthetic data is often used to generate datasets that resemble real-world data in terms of structure and statistical properties.
  2. Time series data: This type of synthetic data generates datasets that resemble real-world time series data. It is commonly used when real-world time series data is unavailable or too expensive.
  3. Image and video data: This synthetic data is used to generate realistic images and videos for training machine learning models or simulations.
  4. Text data: This synthetic data generates realistic text for natural language processing tasks or for generating training data for machine learning models.
  5. Sound data: This synthetic data generates realistic sound for training machine learning models or simulations.

Synthetic Tabular Data Types

Synthetic tabular data refers to artificially generated datasets that resemble real-world tabular data in terms of structure and statistical properties. Tabular data is organized into rows and columns, like tables or spreadsheets. Some specific types of synthetic tabular data include:

  1. Financial data: Synthetic datasets that resemble real-world financial data such as bank transactions, stock prices, or credit card information.
  2. Customer data: Synthetic datasets that resemble real-world customer data, such as purchase history, demographic information, or customer behavior.
  3. Medical data: Synthetic datasets that resemble real-world medical data, such as patient records, medical test results, or treatment history.
  4. Sensor data: Synthetic datasets that resemble real-world sensor data such as temperature readings, humidity levels, or air quality measurements.
  5. Sales data: Synthetic datasets that resemble real-world sales data, such as sales transactions, product information, or customer behavior.
  6. Inventory data: Synthetic datasets that resemble real-world inventory data, such as stock levels, product information, or supplier information.
  7. Marketing data: Synthetic datasets that resemble real-world marketing data, such as campaign performance, customer behavior, or market trends.
  8. Human resources data: Synthetic datasets that resemble real-world human resources data, such as employee records, performance evaluations, or salary information.

Challenges with Creating Synthetic Data

According to sources including Towards Data Science, enov8, and J.P. Morgan, there are several challenges in creating synthetic data, including:

  1. Technical difficulty: Properly modeling complex real-world behaviors as synthetic data is challenging, given available technologies.
  2. Biased behavior: The flexible nature of synthetic data makes it prone to potentially biased results.
  3. Privacy concerns: Care must be taken to ensure synthetic data does not reveal sensitive information.
  4. Quality of the data model: If the quality of the data model is not high, wrong conclusions can be reached.
  5. Time and effort: Synthetic data generation requires time and effort.

Difficult Patterns and Behaviors to Model

Many patterns and behaviors can be challenging to model in synthetic data, for example:

  1. Rare events: If certain events rarely occur in the real world, generating synthetic data that accurately reflects their distribution can be difficult.
  2. Complex relationships: Synthetic data generators may struggle to capture complex relationships between variables, such as non-linear interactions, feedback loops, or conditional dependencies.
  3. Contextual variability: Contextual factors, such as time, location, or individual differences, can have a significant impact on the distribution of data. Modeling this variability accurately can be challenging.
  4. Outliers and anomalies: Synthetic data generators may be unable to generate outliers or anomalies that are realistic and representative of the real world.
  5. Dynamic data: If the data is dynamic and changes over time, it can be challenging to generate synthetic data that captures these changes accurately.
  6. Unobserved variables: Sometimes, variables may be necessary for understanding the data distribution but are not directly observable. These variables can be challenging to model in synthetic data.
  7. Data bias: If the real-world data is biased in some way, such as over-representation of certain groups or under-representation of others, it can be challenging to generate synthetic data that is unbiased and representative of the population.
  8. Time-dependent patterns: If the data exhibits time-dependent patterns, such as seasonality or trends, it can be challenging to generate synthetic data that accurately reflects them.
  9. Spatial patterns: If the data has a spatial component, such as location data or images, it can be challenging to generate synthetic data that captures the spatial patterns realistically.
  10. Data sparsity: If the data is sparse or incomplete, it can be difficult to generate synthetic data that accurately reflects the distribution of the entire dataset.
  11. Human behavior: If the data involves human behavior, such as in social science or behavioral economics, it can be challenging to model the complex and nuanced behaviors of individuals and groups.
  12. Sensitive or confidential information: In some cases, the data may contain sensitive or confidential information that cannot be shared, making it challenging to generate synthetic data that preserves privacy while accurately reflecting the underlying distribution.

Overall, many patterns can be challenging to model accurately in synthetic data. Doing so often requires careful consideration of both the specific characteristics of the data and the limitations of the synthetic data generation techniques.

Easily Modeled Patterns and Behaviors

There are many simple and well-understood patterns that can be easily modeled in synthetic data, for example:

  1. Randomness: If the data is purely random, it can be easily generated using a random number generator or other simple techniques.
  2. Gaussian distribution: If the data follows a Gaussian or normal distribution, it can be generated using a Gaussian random number generator.
  3. Uniform distribution: If the data follows a uniform distribution, it can be easily generated using a uniform random number generator.
  4. Linear relationships: If the data follows a linear relationship between variables, it can be modeled using simple linear regression techniques.
  5. Categorical variables: If the data consists of categorical variables, such as gender or occupation, it can be generated using a categorical distribution.
  6. Text data: If the data consists of text, it can be generated using natural language processing techniques, such as language models or text generation algorithms.
  7. Time series: If the data consists of time series data, such as stock prices or weather data, it can be generated using time series models, such as Autoregressive integrated moving average (ARIMA) or Long short-term memory (LSTM).
  8. Seasonality: If the data exhibits seasonal patterns, such as higher sales data during holiday periods, it can be generated using seasonal time series models.
  9. Proportions and percentages: If the data consists of proportions or percentages, such as product sales distribution across different regions, it can be generated using beta or Dirichlet distributions.
  10. Multivariate normal distribution: If the data follows a multivariate normal distribution, it can be generated using a multivariate Gaussian random number generator.
  11. Networks: If the data consists of network or graph data, such as social networks or transportation networks, it can be generated using network models, such as Erdos-Renyi or Barabasi-Albert models.
  12. Binary data: If the data consists of binary data, such as whether a customer churned, it can be generated using a Bernoulli distribution.
  13. Geospatial data: If the data involves geospatial data, such as the location of points of interest, it can be generated using geospatial models, such as point processes or spatial point patterns.
  14. Customer behaviors: If the data involves customer behaviors, such as browsing or purchase histories, it can be generated using customer journey models, such as Markov models.

In general, simple and well-understood patterns can be easily modeled using synthetic data techniques. In contrast, more complex and nuanced patterns may require more sophisticated modeling techniques and a deeper understanding of the underlying data characteristics.
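
As a minimal sketch of a few of the simple patterns listed above, the following uses only Python's standard `random` module to draw from Gaussian, uniform, categorical, and Bernoulli distributions. The field names, means, ranges, and weights are illustrative assumptions, not values taken from any real dataset.

```python
import random


def make_customer(rng):
    """Return one synthetic customer record built from simple distributions.

    All field names and parameters here are hypothetical, chosen for illustration.
    """
    return {
        # Gaussian (normal) distribution, clamped to a minimum of 18
        "age": max(18, round(rng.gauss(40, 12))),
        # Uniform distribution between 0% and 25%
        "discount": round(rng.uniform(0.0, 0.25), 2),
        # Categorical distribution with assumed weights
        "occupation": rng.choices(
            ["Engineer", "Teacher", "Nurse"], weights=[0.5, 0.3, 0.2], k=1
        )[0],
        # Bernoulli distribution with an assumed 10% churn probability
        "churned": rng.random() < 0.1,
    }


# A seeded generator makes the synthetic data reproducible between runs
rng = random.Random(42)
customers = [make_customer(rng) for _ in range(5)]

for customer in customers:
    print(customer)
```

Seeding the generator is a design choice that makes test data repeatable; drop the seed if different data is wanted on every run.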

Creating Synthetic Data with Generative AI

We can use many popular generative AI-powered tools to create synthetic data for testing applications, constructing analytics pipelines, and building machine learning models. Tools include OpenAI ChatGPT, Microsoft’s all-new Bing Chat, ChatSonic, Tabnine, GitHub Copilot, and Amazon CodeWhisperer. For more information on these tools, check out my recent blog post:

Accelerating Development with Generative AI-Powered Coding Tools
Explore six popular generative AI-powered tools, including ChatGPT, Copilot, CodeWhisperer, Tabnine, Bing, and… (garystafford.medium.com)

Let’s start with a simple example of generating synthetic sales data. Suppose we have created a new sales forecasting application for coffee shops that we need to test using synthetic data. We might start by prompting a generative AI tool like OpenAI’s ChatGPT for some data:

Create a CSV file with 25 random sales records for a coffee shop.
Each record should include the following fields:
- id (incrementing integer starting at 1)
- date (random date between 1/1/2022 and 12/31/2022)
- time (random time between 6:00am and 9:00pm in 1-minute increments)
- product_id (incrementing integer starting at 1)
- product
- calories
- price in USD
- type (drink or food)
- quantity (random integer between 1 and 3)
- amount (price * quantity)
- payment type (cash, credit, debit, or gift card)

The content and structure of a prompt can vary, and this can strongly influence ChatGPT’s response. Based on the above prompt, the results were accurate but, in this format, not very useful for testing our application. ChatGPT cannot create a physical CSV file. Furthermore, ChatGPT’s response length is limited; only about twenty of the 25 requested records were returned. According to ChatGPT, in general, it can generate responses of up to 2,048 tokens, the maximum output length allowed by the GPT-3 model.

OpenAI ChatGPT UI

Instead of outputting the actual synthetic data, we could ask ChatGPT to write a program that can, in turn, generate the synthetic data. This option is certainly more scalable. Let’s prompt ChatGPT to write a Python program to generate synthetic sales data with the same characteristics as before:

Create a Python3 program to generate 100 sales of common items 
sold in a coffee shop. The data should be written to a CSV file
and include a header row. Each record should include the following fields:
- id (incrementing integer starting at 1)
- date (random date between 1/1/2022 and 12/31/2022)
- time (random time between 6:00am and 9:00pm in 1-minute increments)
- product_id (incrementing integer starting at 1)
- product
- calories
- price in USD
- type (drink or food)
- quantity (random integer between 1 and 3)
- amount (price * quantity)
- payment type (cash, credit, debit, or gift card)

Using a single concise prompt, ChatGPT generated a complete Python program, including code comments, to generate synthetic sales data. Unfortunately, given ChatGPT’s response size limitation, the coffee shop menu was limited to just six items. Reprompting for more items would result in the truncation of the output and, thus, the program, making it unrunnable. Instead, we could use an additional prompt to generate a longer Python list of menu items and combine the two pieces of code in our IDE. Regardless, we will still need to copy and paste the code into our IDE to review, debug, test, and run.

OpenAI ChatGPT UI

ChatGPT’s Python program, copied and pasted into VS Code, ran without modifications, and wrote 100 synthetic sales records to a CSV file!

Running ChatGPT’s Python program in VS Code

Using IDE-based Generative AI Tools

Although generating synthetic data or snippets of code directly in chat-based generative AI tools is helpful for limited use cases, writing code in an IDE gives us several advantages:

  1. Code does not need to be copied and pasted from external sources into an IDE
  2. Completion of consecutive lines, methods, and blocks of code overcomes the single-response size limits of chat-based tools like OpenAI ChatGPT
  3. Code can be reused and adapted to evolving use cases over time
  4. Python interpreter and debugger or equivalent for other languages
  5. Automatic code formatting, linting, and code style enforcement
  6. Unit, integration, and functional testing
  7. Static code analysis (SCA)
  8. Vulnerability scanning
  9. IntelliSense for code completion
  10. Source code management (SCM) / version control

Let’s use the same techniques we used with ChatGPT, but from within an IDE to generate three types of synthetic data. We will choose Microsoft’s VS Code with GitHub Copilot and Python as our programming language.

VS Code IDE with GitHub Copilot Nightly extension installed

Source Code

All the code examples shown in this post can be found on GitHub.

Example #1: Coffee Shop Sales Data

First, we will start by outlining the program’s objective using code comments at the top of our Python file. This detailed context helps us to clearly express our goal and enables Copilot to generate an accurate response.

# Write a program that creates synthetic sales data for a coffee shop.
# The program should accept a command line argument that specifies the number of records to generate.
# The program should write the sales data to a file called 'coffee_shop_sales_data.csv'.
# The program should contain the following functions:
# - main() function that calls the other functions
# - function that returns one random product from a list of dictionaries
# - function that returns a dictionary containing one sales record
# - function that writes the sales records to a file

Following the import statements, which were also generated with the assistance of Copilot, we will write the first function to return a random product from a list of 25 products. Again, we will use code comments as a prompt to generate the code. Copilot was able to generate 100% of the function’s code from the comments.

# Write a function to create list of dictionaries.
# The list of dictionaries should contain 15 drink items and 10 food items sold in a coffee shop.
# Include the product id, product name, calories, price, and type (Food or Drink).
# Capitalize the first letter of each product name.
# Return a random item from the list of dictionaries.

Below is an example of Copilot’s ability to generate complete lines of code. Ultimately, it generated 100% of the function including choosing the items sold in a coffee shop, with a reasonable price and caloric count. Copilot is not limited to just understanding code.

Copilot can generate entire lines of code

Next, we will write a function to return a random sales record.

# Write a function to return a random sales record.
# The record should be a dictionary with the following fields:
# - id (an incrementing integer starting at 1)
# - date (a random date between 1/1/2022 and 12/31/2022)
# - time (a random time between 6:00am and 9:00pm in 1 minute increments)
# - product_id, product, calories, price, and type (from the get_product function)
# - quantity (a random integer between 1 and 3)
# - amount (price * quantity)
# - payment type (Cash, Credit, Debit, Gift Card, Apple Pay, Google Pay, or Venmo)

Again, Copilot generated 100% of the function’s code as a single block based on the code comments.

Copilot can generate entire blocks of code or methods

Lastly, we will create a function to write the sales data into a CSV file using Copilot’s help.

# Write a function to write the sales records to a CSV file called 'coffee_shop_sales.csv'.
# Use an input parameter to specify the number of records to write.
# The CSV file must have a header row and be comma delimited.
# All string values must be enclosed in double quotes.

Again below, we see an example of Copilot’s ability to generate an entire Python function. I needed to correct a few problems with the generated code. First, there was a lack of quotes for string values, which I added to the function (quotechar='"', quoting=csv.QUOTE_NONNUMERIC). Also, the function was missing a key line of code, sale = get_sales_record(), which would have caused the code to fail. Remember, just because the code was generated does not mean it is correct.

Copilot can generate entire blocks of code or methods

Here is the complete program that creates synthetic sales data for a coffee shop with Copilot assistance:

# Purpose: Generate coffee shop sales data
# Author: Gary A. Stafford and GitHub Copilot
# Date: 2023-04-12
# Usage: python3 coffee_shop_data_gen.py 100
# Command-line argument(s): num_recs (number of records to generate as an integer)

# Write a program that creates synthetic sales data for a coffee shop.
# The program should accept a command line argument that specifies the number of records to generate.
# The program should write the sales data to a file called 'coffee_shop_sales_data.csv'.
# The program should contain the following functions:
# - main() function that calls the other functions
# - function that returns one random product from a list of dictionaries
# - function that returns a dictionary containing one sales record
# - function that writes the sales records to a file

import argparse
import csv
import hashlib
import random
from datetime import datetime, timedelta


def main():
    # create a parser object
    parser = argparse.ArgumentParser(
        description="Generate coffee shop sales data")

    # add a command line argument to specify the number of records to generate
    # nargs="?" makes the positional argument optional so the default can apply
    parser.add_argument("num_recs",
                        type=int,
                        nargs="?",
                        help="The number of records to generate",
                        default=100)

    num_recs = parser.parse_args().num_recs

    write_data(num_recs)


# Write a function to create list of dictionaries.
# The list of dictionaries should contain 15 drink items and 10 food items sold in a coffee shop.
# Include the product id, product name, calories, price, and type (Food or Drink).
# Capitalize the first letter of each product name.
# Return a random item from the list of dictionaries.
def get_product():
    products = [
        {"id": 1, "product": "Latte", "calories": 120, "price": 3.50, "type": "Drink"},
        {"id": 2, "product": "Cappuccino", "calories": 100, "price": 3.00, "type": "Drink"},
        {"id": 3, "product": "Americano", "calories": 5, "price": 2.50, "type": "Drink"},
        {"id": 4, "product": "Espresso", "calories": 10, "price": 2.00, "type": "Drink"},
        {"id": 5, "product": "Mocha", "calories": 250, "price": 4.00, "type": "Drink"},
        {"id": 6, "product": "Iced Coffee", "calories": 80, "price": 2.50, "type": "Drink"},
        {"id": 7, "product": "Hot Chocolate", "calories": 300, "price": 3.50, "type": "Drink"},
        {"id": 8, "product": "Tea", "calories": 0, "price": 2.00, "type": "Drink"},
        {"id": 9, "product": "Frappe", "calories": 450, "price": 5.00, "type": "Drink"},
        {"id": 10, "product": "Smoothie", "calories": 200, "price": 4.00, "type": "Drink"},
        {"id": 11, "product": "Iced Tea", "calories": 0, "price": 2.50, "type": "Drink"},
        {"id": 12, "product": "Lemonade", "calories": 120, "price": 3.00, "type": "Drink"},
        {"id": 13, "product": "Hot Tea", "calories": 0, "price": 2.00, "type": "Drink"},
        {"id": 14, "product": "Chai Tea", "calories": 200, "price": 3.50, "type": "Drink"},
        {"id": 15, "product": "Iced Chai", "calories": 250, "price": 4.00, "type": "Drink"},
        {"id": 16, "product": "Croissant", "calories": 231, "price": 2.99, "type": "Food"},
        {"id": 17, "product": "Bagel", "calories": 289, "price": 3.49, "type": "Food"},
        {"id": 18, "product": "Muffin", "calories": 426, "price": 3.99, "type": "Food"},
        {"id": 19, "product": "Sandwich", "calories": 512, "price": 6.99, "type": "Food"},
        {"id": 20, "product": "Wrap", "calories": 388, "price": 5.99, "type": "Food"},
        {"id": 21, "product": "Salad", "calories": 231, "price": 7.99, "type": "Food"},
        {"id": 22, "product": "Quiche", "calories": 456, "price": 4.99, "type": "Food"},
        {"id": 23, "product": "Scone", "calories": 335, "price": 2.49, "type": "Food"},
        {"id": 24, "product": "Pastry", "calories": 397, "price": 3.99, "type": "Food"},
        {"id": 25, "product": "Cake", "calories": 512, "price": 5.99, "type": "Food"},
    ]

    # return one random item from list of dictionaries
    return random.choice(products)


# Write a function to return a random sales record.
# The record should be a dictionary with the following fields:
# - transaction_id (a hash of the date, time, and product_id)
# - date (a random date between 1/1/2022 and 12/31/2022)
# - time (a random time between 6:00am and 9:00pm in 1 minute increments)
# - product_id, product, calories, price, and type (from the get_product function)
# - quantity (a random integer between 1 and 3)
# - amount (price * quantity)
# - payment type (Cash, Credit, Debit, Gift Card, Apple Pay, Google Pay, or Venmo)
def get_sales_record():
    # get a random product
    product = get_product()

    # get a random date between 1/1/2022 and 12/31/2022
    start_date = datetime(2022, 1, 1)
    end_date = datetime(2022, 12, 31)
    random_date = start_date + timedelta(
        # Get a random number of seconds between 0 and the number of seconds between start_date and end_date
        seconds=random.randint(0, int(
            (end_date - start_date).total_seconds())), )

    # get a random time between 6:00am and 9:00pm
    start_time = datetime.strptime("6:00am", "%I:%M%p")
    end_time = datetime.strptime("9:00pm", "%I:%M%p")
    random_time = start_time + timedelta(
        # Get a random number of seconds between 0 and the number of seconds between start_time and end_time
        seconds=random.randint(0, int(
            (end_time - start_time).total_seconds())), )

    # get a random quantity between 1 and 3
    random_quantity = random.randint(1, 3)

    # get a random payment type:
    # Cash, Credit, Debit, Gift card, Apple Pay, Google Pay, Venmo
    random_payment_type = random.choice(
        ["Cash", "Credit", "Debit", "Gift card", "Apple Pay", "Google Pay", "Venmo"])

    sales_record = {
        "date": random_date.strftime("%m/%d/%Y"),
        "time": random_time.strftime("%H:%M:%S"),
        "product_id": product["id"],
        "product": product["product"],
        "calories": product["calories"],
        "price": product["price"],
        "type": product["type"],
        "quantity": random_quantity,
        "amount": product["price"] * random_quantity,
        "payment_type": random_payment_type,
    }

    return sales_record


# Write a function to write the sales records to a CSV file called 'coffee_shop_sales_data.csv'.
# Use an input parameter to specify the number of records to write.
# Call the get_sales_record function once for each record to write.
# The CSV file must have a header row and be comma delimited.
# All string values must be enclosed in double quotes.
def write_data(rec_count):
    # open the file for writing
    with open("output/coffee_shop_sales_data.csv", "w", newline="") as csv_file:
        # create a csv writer object
        csv_writer = csv.writer(csv_file,
                                delimiter=",",
                                quotechar='"',
                                quoting=csv.QUOTE_NONNUMERIC)

        # write the header row
        # transaction_id,date,time,product_id,product,calories,price,type,quantity,amount,payment_type
        csv_writer.writerow([
            "transaction_id",
            "date",
            "time",
            "product_id",
            "product",
            "calories",
            "price",
            "type",
            "quantity",
            "amount",
            "payment_type",
        ])

        # write the sales records
        for i in range(rec_count):
            sale = get_sales_record()
            transaction_id = hashlib.md5((f'{sale["date"]} {sale["time"]} {sale["product_id"]}').encode()).hexdigest()
            csv_writer.writerow([
                transaction_id,
                sale["date"],
                sale["time"],
                sale["product_id"],
                sale["product"],
                sale["calories"],
                sale["price"],
                sale["type"],
                sale["quantity"],
                sale["amount"],
                sale["payment_type"],
            ])


if __name__ == "__main__":
    main()

Copilot generated an astounding 80–85% of the program’s final code. The initial program took 10–15 minutes to write using code comments. I then added a few new features, including the ability to pass in the record count on the command line and the hash-based transaction id, which took another 5 minutes. Finally, I used GitHub Code Brushes to optimize the code and generate the Python docstrings, and Black Formatter and Flake8 extensions to format and lint, all of which took less than 5 minutes. With testing and debugging, the total time was about 25–30 minutes.

The most significant difference with Copilot was that I never had to leave the IDE to look up code references or find existing sales datasets or even a coffee shop menu to duplicate. The code, as well as the list of products, price, calories, and product type, were all generated by Copilot.

To make this example more realistic, you could use Copilot’s assistance to write algorithms capable of reflecting daily, weekly, and seasonal variations in product choice and sales volumes. This might include simulating increased sales during the busy morning rush hour or a preference for iced drinks in the summer months versus hot drinks during the winter months.
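
One way to approach this, sketched below under stated assumptions, is to replace uniform random choices with weighted ones. The hourly weights, the summer months, and the iced-drink shares are invented for illustration and are not part of the original program; `pick_hour` and `pick_drink_style` are hypothetical helper names.

```python
import random

# Hypothetical relative sales weights per opening hour (6:00am through 8:59pm).
# Higher weights between 7:00am and 9:00am simulate a morning rush.
HOUR_WEIGHTS = {
    6: 3, 7: 5, 8: 5, 9: 3, 10: 2, 11: 2, 12: 3, 13: 2,
    14: 1, 15: 1, 16: 1, 17: 1, 18: 1, 19: 1, 20: 1,
}


def pick_hour(rng):
    """Return an hour of the day, favoring the assumed morning rush."""
    hours = list(HOUR_WEIGHTS)
    weights = list(HOUR_WEIGHTS.values())
    return rng.choices(hours, weights=weights, k=1)[0]


def pick_drink_style(month, rng):
    """Return 'Iced' or 'Hot', favoring iced drinks in June through August.

    The 70% and 20% shares are assumptions chosen for illustration.
    """
    iced_share = 0.7 if month in (6, 7, 8) else 0.2
    return "Iced" if rng.random() < iced_share else "Hot"


rng = random.Random(7)
print(pick_hour(rng), pick_drink_style(7, rng))
print(pick_hour(rng), pick_drink_style(1, rng))
```

The same weighting idea could be applied to the product list itself, for example by giving each product a per-season weight and passing those weights to `random.choices`.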

Here is an example of the synthetic sales data output by the example application:

transaction_id,date,time,product_id,product,calories,price,type,quantity,amount,payment_type
47a157f84e727fe3335db1519ee736a6,06/27/2022,19:59:42,22,Quiche,456,4.99,Food,2,9.98,Debit
1bf01013e699ca0f804650ea50826c82,11/20/2022,06:21:14,22,Quiche,456,4.99,Food,3,14.97,Cash
84f41c15749090d1e79bf9a48a58d6c3,08/18/2022,11:50:22,14,Chai Tea,200,3.5,Drink,2,7.0,Apple Pay
ef1845b8438bf3b5b99d2f4891a48f03,11/13/2022,17:20:51,12,Lemonade,120,3.0,Drink,2,6.0,Debit
9863de11be3099d6361392584e30e624,06/03/2022,18:27:03,18,Muffin,426,3.99,Food,2,7.98,Gift card
f50ed8878250bc06f66b97f5cd2f6df7,02/21/2022,17:02:18,7,Hot Chocolate,300,3.5,Drink,2,7.0,Credit
1903169473f41a0275ee702f2c6b1dd6,05/24/2022,14:58:25,10,Smoothie,200,4.0,Drink,3,12.0,Venmo
164a9519fd3db952e721e9f55dc1be74,01/07/2022,14:19:35,14,Chai Tea,200,3.5,Drink,2,7.0,Debit
dc85a202143de48ad4646190cdc0bf5c,01/28/2022,08:52:38,20,Wrap,388,5.99,Food,1,5.99,Venmo
7d182be793ab4b3290d14968e9c9a3e3,02/20/2022,10:16:10,16,Croissant,231,2.99,Food,2,5.98,Google Pay
f524811f456481f5a4a2f0c09dcafa28,10/03/2022,16:19:35,10,Smoothie,200,4.0,Drink,3,12.0,Debit
f2f63eacd33cb9f13849b08638e6fc3d,07/06/2022,16:14:20,10,Smoothie,200,4.0,Drink,3,12.0,Cash
7435e8cc8771a8b538529ff6f873cc05,04/12/2022,16:35:51,14,Chai Tea,200,3.5,Drink,3,10.5,Venmo
0e1154abb6a79b9ece570d51e3116846,11/28/2022,14:44:05,21,Salad,231,7.99,Food,2,15.98,Debit
d5926d58011a2a25798bf2aa72552cf0,04/19/2022,16:06:09,22,Quiche,456,4.99,Food,1,4.99,Venmo

Example #2: Residential Address Data

We could use these same techniques to generate a list of residential addresses. To start, we can prompt Copilot for the values in a list of common street names and street types in the United States:

# Write a function that creates a list of common street names
# in the United States, in alphabetical order.
# List should be in alphabetical order. Each name should be unique.
# Return a random street name.
def get_street_name():
    street_names = [
        "Ash", "Bend", "Bluff", "Branch", "Bridge", "Broadway", "Brook", "Burg",
        "Bury", "Canyon", "Cape", "Cedar", "Cove", "Creek", "Crest", "Crossing",
        "Dale", "Dam", "Divide", "Downs", "Elm", "Estates", "Falls", "Fifth",
        "First", "Fork", "Fourth", "Glen", "Green", "Grove", "Harbor", "Heights",
        "Hickory", "Hill", "Hollow", "Island", "Isle", "Knoll", "Lake", "Landing",
        ...
    ]

    return random.choice(street_names)


# Write a function that creates a list of common street types
# in the United States, in alphabetical order.
# List should be in alphabetical order. Each name should be unique.
# Return a random street type.
def get_street_type():
    street_types = [
        "Alley", "Avenue", "Bend", "Bluff", "Boulevard", "Branch", "Bridge", "Brook",
        "Burg", "Circle", "Commons", "Court", "Drive", "Highway", "Lane", "Parkway",
        "Place", "Road", "Square", "Street", "Terrace", "Trail", "Way"
    ]

    return random.choice(street_types)

Next, we can create a function that returns a property type based on a categorical distribution of common residential property types with the prompt:

# Write a function to return a random property type.
# Accept a random value between 0 and 1 as an input parameter.
# The function must return one of the following values based on the %:
# 63% Single-family, 26% Multi-family, 4% Condo,
# 3% Townhouse, 2% Mobile home, 1% Farm, 1% Other.

Again, Copilot generated 100% of the function’s code as a single block based on the code comments.

Copilot generating a categorical distribution of common residential property types
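
For readers without access to the screenshot, here is a minimal sketch of how such a function could be written. It is not the code Copilot produced; the function name `get_property_type` and the cumulative-threshold approach are assumptions, while the percentages come from the prompt above.

```python
import random

# (property type, share) pairs taken from the prompt; the shares sum to 1.0
PROPERTY_TYPES = [
    ("Single-family", 0.63),
    ("Multi-family", 0.26),
    ("Condo", 0.04),
    ("Townhouse", 0.03),
    ("Mobile home", 0.02),
    ("Farm", 0.01),
    ("Other", 0.01),
]


def get_property_type(value):
    """Map a value between 0 and 1 to a property type using cumulative shares."""
    running_total = 0.0
    for name, share in PROPERTY_TYPES:
        running_total += share
        if value < running_total:
            return name
    # Guard against floating-point rounding when value is very close to 1
    return PROPERTY_TYPES[-1][0]


# Usage: pass in a random value, as the prompt describes
print(get_property_type(random.random()))
```

Accepting the random value as a parameter, rather than generating it inside the function, keeps the mapping deterministic and easy to unit test.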

Additionally, we could have Copilot help us generate a list of the 50 largest cities in the United States with state, zip code, and population, with the prompt:

# Write a function to return the 50 largest cities in the United States.
# List should be sorted in descending order by population.
# Include the city, state abbreviation, zip code, and population.
# Return a list of dictionaries.

Once again, Copilot generated 100% of the function’s code using a combination of single lines and code blocks based on the code comments.

Copilot generating a block of US cities with state, zip code, and populations

When randomly choosing a city, we can use a categorical distribution based on each city’s share of the total population to control the distribution of cities in the final synthetic dataset. For example, there will be more addresses in larger cities like New York City or Los Angeles than in smaller cities like Buffalo or Virginia Beach. We can add the required population calculations with the prompt:

# Write a function that calculates the total population of the list of cities.
# Add a 'pcnt_of_total_population' and 'pcnt_running_total' columns to list.
# Returns a sorted list of cities by population.
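
The following is a rough sketch of what that calculation and a population-weighted city choice might look like. The two field names come from the prompt; the implementation, the hypothetical `pick_city` helper, the use of fractions rather than percentages, and the placeholder cities are all assumptions for illustration.

```python
import random


def prepare_cities(cities):
    """Add share-of-total and running-total fields; return cities sorted by population."""
    total_population = sum(city["population"] for city in cities)
    ordered = sorted(cities, key=lambda city: city["population"], reverse=True)

    running_total = 0.0
    for city in ordered:
        # Stored as fractions between 0 and 1 (an assumption of this sketch)
        city["pcnt_of_total_population"] = city["population"] / total_population
        running_total += city["pcnt_of_total_population"]
        city["pcnt_running_total"] = running_total

    return ordered


def pick_city(cities, value):
    """Return the first city whose running total exceeds value (between 0 and 1)."""
    for city in cities:
        if value < city["pcnt_running_total"]:
            return city
    # Guard against floating-point rounding when value is very close to 1
    return cities[-1]


# Placeholder data, not real cities or populations
sample_cities = [
    {"city": "City B", "population": 300},
    {"city": "City A", "population": 600},
    {"city": "City C", "population": 100},
]

prepared = prepare_cities(sample_cities)
print(pick_city(prepared, random.random())["city"])
```

With these placeholder values, "City A" holds 60% of the total population, so it would be expected in roughly 60% of the generated addresses.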

Here is the complete program that creates synthetic US-based address data with Copilot assistance:

# Purpose: Generate US residential address data
# Author: Gary A. Stafford and GitHub Copilot
# Date: 2023-04-13
# Usage: python3 residential_address_data_gen.py 100
# Command-line argument(s): rec_count (number of records to generate as an integer)
# Write an application that create a random list of united states addresses.
# The application should accept a command line argument that specifies the number of records to generate.
# Include address, city, state, zip code, country, and property type.
# Write the data to a csv file named 'address_data.csv'.
# The application should contain the following functions:
# – main() function that calls the other functions
# – function that returns a list of common street names in the United States
# – function that returns a list of common street types in the United States
# – function that returns a list of common city, state, zip code, and population in the United States
# – function that returns a property type
import csv
import random
import argparse
cities_final = []
def main():
    parser = argparse.ArgumentParser(description="Generate US residential address data")
    # nargs="?" makes the positional argument optional so the default applies
    parser.add_argument(
        "rec_count",
        type=int,
        nargs="?",
        default=100,
        help="The number of records to generate",
    )
    # add population calculations to the city data
    cities = get_cities()
    prepare_cities(cities)
    rec_count = parser.parse_args().rec_count
    write_data(rec_count)
# Write a function that creates a list of common street names
# in the United States, in alphabetical order.
# Each one should be unique.
# Return a random street name.
def get_street_name():
    street_names = [
        "Ash", "Bend", "Bluff", "Branch", "Bridge", "Broadway", "Brook", "Burg",
        "Bury", "Canyon", "Cape", "Cedar", "Cove", "Creek", "Crest", "Crossing",
        "Dale", "Dam", "Divide", "Downs", "Elm", "Estates", "Falls", "Fifth",
        "First", "Fork", "Fourth", "Glen", "Green", "Grove", "Harbor", "Heights",
        "Hickory", "Hill", "Hollow", "Island", "Isle", "Knoll", "Lake", "Landing",
        "Lawn", "Main", "Manor", "Maple", "Meadow", "Meadows", "Mill", "Mills",
        "Mission", "Mount", "Mountain", "Oak", "Oaks", "Orchard", "Park", "Parkway",
        "Pass", "Path", "Pike", "Pine", "Place", "Plain", "Plains", "Port", "Prairie",
        "Ridge", "River", "Road", "Rock", "Rocks", "Second", "Seventh", "Shoals",
        "Shore", "Shores", "Sixth", "Skyway", "Spring", "Springs", "Spur", "Station",
        "Summit", "Sunset", "Terrace", "Third", "Trace", "Track", "Trail", "Tunnel",
        "Turnpike", "Vale", "Valley", "View", "Village", "Ville", "Vista", "Walk",
        "Way", "Well", "Wells", "Wood", "Woods", "Worth"
    ]
    return random.choice(street_names)
# Write a function that creates a list of common street types
# in the United States, in alphabetical order.
# Each one should be unique.
# Return a random street type.
def get_street_type():
    street_types = [
        "Alley", "Avenue", "Bend", "Bluff", "Boulevard", "Branch", "Bridge", "Brook",
        "Burg", "Circle", "Commons", "Court", "Drive", "Highway", "Lane", "Parkway",
        "Place", "Road", "Square", "Street", "Terrace", "Trail", "Way"
    ]
    return random.choice(street_types)
# Write a function that calculates the total population of the list of cities.
# Add a 'pcnt_of_total_population' and 'pcnt_running_total' columns to list.
# Returns a sorted list of cities by population.
def prepare_cities(cities):
    total_population = 0  # 51,035,885
    for city in cities:
        total_population += city["population"]
    for city in cities:
        city["pcnt_of_total_population"] = city["population"] / total_population
    global cities_final
    cities_final = sorted(cities, key=lambda d: d["population"], reverse=True)
    running_total = 1
    for city in cities_final:
        running_total -= city["pcnt_of_total_population"]
        city["pcnt_running_total"] = running_total
# Write a function to return the 50 largest cities in the United States.
# Include the city, state abbreviation, zip code, and population.
# List should be sorted in descending order by population.
# Return a list of dictionaries.
def get_cities():
    cities = [
        {"city": "Albuquerque", "state": "NM", "zip": "87102", "population": 559277},
        {"city": "Anaheim", "state": "CA", "zip": "92801", "population": 345012},
        {"city": "Anchorage", "state": "AK", "zip": "99501", "population": 291826},
        {"city": "Arlington", "state": "TX", "zip": "76010", "population": 398121},
        {"city": "Atlanta", "state": "GA", "zip": "30303", "population": 486290},
        {"city": "Aurora", "state": "CO", "zip": "80010", "population": 325078},
        {"city": "Austin", "state": "TX", "zip": "78701", "population": 931830},
        {"city": "Bakersfield", "state": "CA", "zip": "93301", "population": 372576},
        {"city": "Baltimore", "state": "MD", "zip": "21202", "population": 602495},
        {"city": "Boston", "state": "MA", "zip": "02108", "population": 667137},
        {"city": "Buffalo", "state": "NY", "zip": "14202", "population": 258959},
        {"city": "Charlotte", "state": "NC", "zip": "28202", "population": 872498},
        {"city": "Chicago", "state": "IL", "zip": "60602", "population": 2695598},
        {"city": "Cincinnati", "state": "OH", "zip": "45202", "population": 296943},
        {"city": "Cleveland", "state": "OH", "zip": "44113", "population": 390113},
        {"city": "Colorado Springs", "state": "CO", "zip": "80903", "population": 456568},
        {"city": "Columbus", "state": "OH", "zip": "43215", "population": 822553},
        {"city": "Dallas", "state": "TX", "zip": "75201", "population": 1345047},
        {"city": "Denver", "state": "CO", "zip": "80202", "population": 682545},
        {"city": "Detroit", "state": "MI", "zip": "48226", "population": 672662},
        {"city": "El Paso", "state": "TX", "zip": "79901", "population": 674433},
        {"city": "Fort Worth", "state": "TX", "zip": "76102", "population": 792727},
        {"city": "Fresno", "state": "CA", "zip": "93721", "population": 509924},
        {"city": "Houston", "state": "TX", "zip": "77002", "population": 2296224},
        {"city": "Indianapolis", "state": "IN", "zip": "46204", "population": 843393},
        {"city": "Jacksonville", "state": "FL", "zip": "32202", "population": 842583},
        {"city": "Kansas City", "state": "MO", "zip": "64102", "population": 467007},
        {"city": "Las Vegas", "state": "NV", "zip": "89101", "population": 603488},
        {"city": "Long Beach", "state": "CA", "zip": "90802", "population": 462257},
        {"city": "Los Angeles", "state": "CA", "zip": "90001", "population": 3971883},
        {"city": "Louisville", "state": "KY", "zip": "40202", "population": 609893},
        {"city": "Memphis", "state": "TN", "zip": "38103", "population": 653450},
        {"city": "Mesa", "state": "AZ", "zip": "85201", "population": 508958},
        {"city": "Miami", "state": "FL", "zip": "33128", "population": 463347},
        {"city": "Milwaukee", "state": "WI", "zip": "53202", "population": 594833},
        {"city": "Minneapolis", "state": "MN", "zip": "55402", "population": 410939},
        {"city": "Nashville", "state": "TN", "zip": "37203", "population": 654610},
        {"city": "New York", "state": "NY", "zip": "10007", "population": 8405837},
        {"city": "Newark", "state": "NJ", "zip": "07102", "population": 281944},
        {"city": "Oakland", "state": "CA", "zip": "94607", "population": 406253},
        {"city": "Oklahoma City", "state": "OK", "zip": "73102", "population": 631346},
        {"city": "Omaha", "state": "NE", "zip": "68102", "population": 434353},
        {"city": "Philadelphia", "state": "PA", "zip": "19107", "population": 1526006},
        {"city": "Phoenix", "state": "AZ", "zip": "85003", "population": 1445632},
        {"city": "Pittsburgh", "state": "PA", "zip": "15222", "population": 305841},
        {"city": "Portland", "state": "OR", "zip": "97201", "population": 609456},
        {"city": "Raleigh", "state": "NC", "zip": "27601", "population": 403892},
        {"city": "Sacramento", "state": "CA", "zip": "95814", "population": 479686},
        {"city": "San Antonio", "state": "TX", "zip": "78205", "population": 1327407},
        {"city": "San Diego", "state": "CA", "zip": "92101", "population": 1307402},
        {"city": "San Francisco", "state": "CA", "zip": "94102", "population": 805235},
        {"city": "San Jose", "state": "CA", "zip": "95113", "population": 998537},
        {"city": "Seattle", "state": "WA", "zip": "98101", "population": 608660},
        {"city": "St. Louis", "state": "MO", "zip": "63102", "population": 319294},
        {"city": "Tampa", "state": "FL", "zip": "33602", "population": 335709},
        {"city": "Tucson", "state": "AZ", "zip": "85701", "population": 520116},
        {"city": "Virginia Beach", "state": "VA", "zip": "23451", "population": 448479},
        {"city": "Washington", "state": "DC", "zip": "20001", "population": 601723},
    ]
    return cities
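# write a function to return a random city
# accept a random value between 0 and 1 as an input parameter
def get_city(rnd_value):
    for city in cities_final:
        if rnd_value >= city["pcnt_running_total"]:
            return city
    # fall back to the last city if floating-point rounding leaves the
    # final running total slightly above zero
    return cities_final[-1]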
# Write a function to return a random property type.
# Accept a random value between 0 and 1 as an input parameter.
# The function must return one of the following values based on the %:
# 63% Single-family, 26% Multi-family, 4% Condo,
# 3% Townhouse, 2% Mobile home, 1% Farm, 1% Other.
def get_property_type(rnd_value):
    if rnd_value < 0.63:
        return "Single-family"
    elif rnd_value < 0.89:
        return "Multi-family"
    elif rnd_value < 0.93:
        return "Condo"
    elif rnd_value < 0.96:
        return "Townhouse"
    elif rnd_value < 0.98:
        return "Mobile home"
    elif rnd_value < 0.99:
        return "Farm"
    else:
        return "Other"
# Create a function to write the address records to a csv file called 'address_data.csv'.
# Use an input parameter to specify the number of records to write.
# The csv file must have a header row and be comma delimited.
# All string values must be enclosed in double quotes.
def write_data(rec_count):
    address_id = 0
    with open("output/address_data.csv", "w", newline="") as csv_file:
        csv_writer = csv.writer(
            csv_file, delimiter=",", quotechar='"', quoting=csv.QUOTE_NONNUMERIC
        )
        csv_writer.writerow(
            [
                "id",
                "address",
                "city",
                "state",
                "zip",
                "country",
                "property_type",
                "assessed_value",
            ]
        )
        for i in range(rec_count):
            address_id += 1
            street_name = get_street_name()
            street_type = get_street_type()
            street_address = f"{random.randint(1, 9999)} {street_name} {street_type}"
            city_state_zip = get_city(random.random())
            country = "United States"
            property_type = get_property_type(random.random())
            assessed_value = random.randint(52500, 1950000)
            csv_writer.writerow(
                [
                    address_id,
                    street_address,
                    city_state_zip["city"],
                    city_state_zip["state"],
                    city_state_zip["zip"],
                    country,
                    property_type,
                    assessed_value,
                ]
            )
if __name__ == "__main__":
    main()

To make this example more realistic, you could use Copilot’s assistance to write algorithms capable of accurately reflecting assessed property values based on the type of residence and the zip code.
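As one possible starting point for that idea, the sketch below draws an assessed value from a triangular distribution whose range depends on the property type, then scales it by a per-zip multiplier. Every figure in VALUE_RANGES and ZIP_MULTIPLIERS is an illustrative assumption rather than real market data, and the get_assessed_value name is hypothetical.

```python
import random

# Hypothetical (low, mode, high) value ranges in USD per property type.
# These figures are illustrative assumptions, not real market data.
VALUE_RANGES = {
    "Single-family": (150_000, 400_000, 1_950_000),
    "Multi-family": (250_000, 700_000, 1_950_000),
    "Condo": (100_000, 300_000, 1_200_000),
    "Townhouse": (120_000, 350_000, 1_000_000),
    "Mobile home": (52_500, 90_000, 250_000),
    "Farm": (200_000, 600_000, 1_950_000),
    "Other": (52_500, 300_000, 1_950_000),
}

# Hypothetical per-zip multipliers; zip codes not listed default to 1.0.
ZIP_MULTIPLIERS = {"10007": 1.8, "94102": 1.7, "14202": 0.7}


def get_assessed_value(property_type, zip_code, rng=random):
    """Return an assessed value skewed by property type and zip code."""
    low, mode, high = VALUE_RANGES[property_type]
    # random.triangular takes (low, high, mode), in that order.
    base = rng.triangular(low, high, mode)
    return round(base * ZIP_MULTIPLIERS.get(zip_code, 1.0))


print(get_assessed_value("Condo", "10007"))
```

In the program above, a function like this could replace the uniform random.randint(52500, 1950000) call in write_data, using the property type and zip code already generated for each record.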

Here is an example of the synthetic US-based residential address data output by the example application:

id | address | city | state | zip | country | property_type | assessed_value
1 | 1008 Walk Burg | Houston | TX | 77002 | United States | Multi-family | 1122321
2 | 7088 Second Square | Oklahoma City | OK | 73102 | United States | Single-family | 261940
3 | 1425 Ridge Terrace | Indianapolis | IN | 46204 | United States | Single-family | 1030391
4 | 982 Way Lane | New York | NY | 10007 | United States | Multi-family | 95499
5 | 9404 Port Court | Columbus | OH | 43215 | United States | Single-family | 922404
6 | 7135 Crossing Trail | Virginia Beach | VA | 23451 | United States | Single-family | 272910
7 | 9481 Harbor Brook | New York | NY | 10007 | United States | Multi-family | 232795
8 | 8585 Manor Branch | Raleigh | NC | 27601 | United States | Single-family | 701217
9 | 7703 Bluff Boulevard | Las Vegas | NV | 89101 | United States | Single-family | 530581
10 | 4186 Worth Circle | New York | NY | 10007 | United States | Townhouse | 626577
11 | 8739 Prairie Trail | Portland | OR | 97201 | United States | Single-family | 316167
12 | 4 Shores Road | Virginia Beach | VA | 23451 | United States | Single-family | 925711
13 | 3148 Rocks Road | Mesa | AZ | 85201 | United States | Single-family | 182479
14 | 1545 Wells Drive | Chicago | IL | 60602 | United States | Multi-family | 1000777
15 | 1699 Ash Brook | Seattle | WA | 98101 | United States | Single-family | 605392

Example #3: Demographic Data

We could use the same techniques again to generate synthetic demographic data. With the assistance of Copilot, we can write functions that randomly return typical feminine or masculine first names (forenames) and common last names (surnames) found in the United States.

# Write a function that generates a list of common feminine first names in the United States.
# List should be in alphabetical order.
# Each name should be unique.
# Return random first name.
def get_first_name_feminine():
    first_name_feminine = [
        "Alice", "Amanda", "Amy", "Angela", "Ann", "Anna", "Barbara", "Betty",
        "Brenda", "Carol", "Carolyn", "Catherine", "Christine", "Cynthia", "Deborah", "Debra",
        "Diane", "Donna", "Doris", "Dorothy", "Elizabeth", "Frances", "Gloria", "Heather",
        "Helen", "Janet", "Jennifer", "Jessica", "Joyce", "Julie", "Karen", "Kathleen", "Kimberly",
    ]

    return random.choice(first_name_feminine)

With the assistance of Copilot, we can also write functions that return demographic information, such as age, gender, race, marital status, religion, and political affiliation. Similar to the previous sales data example, we can influence the final synthetic dataset based on categorical distributions of different demographic categories, for instance, with the prompt:

# Write a function that returns a person's marital status.
# Accept a random value between 0 and 1 as an input parameter.
# The function must return one of the following values based on the %:
# 50% Married, 33% Single, 17% Unknown.
def get_marital_status(rnd_value):
    if rnd_value < 0.50:
        return "Married"
    elif rnd_value < 0.83:
        return "Single"
    else:
        return "Unknown"

By altering the categorical distributions, we can quickly alter the resulting synthetic dataset to reflect differing demographic characteristics: an older or younger population, the predominance of a single race, religious affiliation, or marital status, or the ratio of males to females.
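One way to make such changes easier is to keep each distribution in a table and derive the thresholds from it, instead of hard-coding them in if/elif chains. The sketch below is a hedged illustration of that idea using only the standard library; the pick_category name and the dictionary layout are assumptions, while the weights mirror the marital status example above.

```python
import bisect
import itertools

# Category weights as percentages; values mirror the marital status example above.
MARITAL_STATUS = {"Married": 50, "Single": 33, "Unknown": 17}


def pick_category(distribution, rnd_value):
    """Map a random value in [0, 1) to a category using cumulative weights."""
    labels = list(distribution)
    cumulative = list(itertools.accumulate(distribution.values()))
    # Scale the random value to the total weight, then find its bucket.
    position = bisect.bisect_right(cumulative, rnd_value * cumulative[-1])
    # Guard against rnd_value == 1.0 landing past the last bucket.
    return labels[min(position, len(labels) - 1)]


print(pick_category(MARITAL_STATUS, 0.25))  # Married
print(pick_category(MARITAL_STATUS, 0.60))  # Single
print(pick_category(MARITAL_STATUS, 0.90))  # Unknown
```

With this layout, reflecting a different population only requires editing the dictionary, for example raising the "Single" weight to model a younger population.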

Next, we can use a Gaussian distribution (aka normal distribution) to return the year of birth in a bell-shaped curve, given a mean year and a standard deviation, using Python’s random.normalvariate function.

# Write a function that generates a normal distribution of dates of birth,
# with a mean year of 1975 and a standard deviation of 10.
# Return random date of birth as a string in the format YYYY-MM-DD
def get_dob():
    day_of_year = random.randint(1, 365)
    year_of_birth = int(random.normalvariate(1975, 10))
    dob = date(int(year_of_birth), 1, 1) + timedelta(day_of_year - 1)
    dob = dob.strftime("%Y-%m-%d")
    return dob
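A normal distribution has no hard limits, so a small fraction of generated years will fall far from the mean. The variation below is a sketch under stated assumptions: it clamps the year to a range (the 1925 and 2005 bounds are arbitrary illustrative choices) and accounts for leap years so that December 31 can be generated. The get_dob_bounded name and its parameters are hypothetical.

```python
import calendar
import random
from datetime import date, timedelta


def get_dob_bounded(mean_year=1975, std_dev=10, min_year=1925, max_year=2005, rng=random):
    """Return a date of birth (YYYY-MM-DD) with a normally distributed, clamped year."""
    year = round(rng.normalvariate(mean_year, std_dev))
    # Clamp rare outliers so the year stays in an assumed plausible range.
    year = max(min_year, min(max_year, year))
    # Leap years have 366 days, so December 31 remains reachable.
    days_in_year = 366 if calendar.isleap(year) else 365
    day_of_year = rng.randint(1, days_in_year)
    dob = date(year, 1, 1) + timedelta(days=day_of_year - 1)
    return dob.strftime("%Y-%m-%d")


print(get_dob_bounded())
```

Clamping moves the rare out-of-range years onto the boundary years; with bounds five standard deviations from the mean, as here, the effect on the overall shape is negligible.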

Here is the complete program that creates synthetic demographic data with Copilot’s assistance:

# Purpose: Generate demographic data
# Author: Gary A. Stafford and GitHub Copilot
# Date: 2023-04-14
# Usage: python3 demographic_data_gen.py 100
# Command-line argument(s): rec_count (number of records to generate as an integer)
# Write an application that creates a file containing demographic data.
# The application should accept a command line argument that specifies the number of records to generate.
# The application should write the demographic data to a file called 'demographic_data.csv'.
# The application should contain the following functions:
# – main() function that calls the other functions
# – function that returns a random first name
# – function that returns a random last name
# – function that returns a random date of birth
# – function that returns a random gender
# – function that returns a random religious affiliation
# – function that returns a random race
import random
import argparse
import csv
from datetime import date, timedelta
def main():
    parser = argparse.ArgumentParser(description="Generate demographic data")
    # nargs="?" makes the positional argument optional so the default applies
    parser.add_argument(
        "rec_count",
        type=int,
        nargs="?",
        default=100,
        help="The number of records to generate",
    )
    rec_count = parser.parse_args().rec_count
    write_data(rec_count)
# Write a function that generates a list of common feminine first names in the United States.
# List should be in alphabetical order.
# Each name should be unique.
# Return random first name.
def get_first_name_feminine():
    first_name_feminine = [
        "Alice", "Amanda", "Amy", "Angela", "Ann", "Anna", "Barbara", "Betty",
        "Brenda", "Carol", "Carolyn", "Catherine", "Christine", "Cynthia", "Deborah", "Debra",
        "Diane", "Donna", "Doris", "Dorothy", "Elizabeth", "Frances", "Gloria", "Heather",
        "Helen", "Janet", "Jennifer", "Jessica", "Joyce", "Julie", "Karen", "Kathleen", "Kimberly",
        "Laura", "Linda", "Lisa", "Margaret", "Maria", "Marie", "Martha", "Mary", "Melissa",
        "Michelle", "Nancy", "Pamela", "Patricia", "Rebecca", "Ruth", "Sandra", "Sarah", "Sharon",
        "Shirley", "Stephanie", "Susan", "Teresa", "Virginia",
    ]
    return random.choice(first_name_feminine)
# Write a function that generates a list of common masculine first names in the United States.
# List should be in alphabetical order.
# Each name should be unique.
# Return random first name.
def get_first_name_masculine():
    first_names_masculine = [
        "Adams", "Alexander", "Allen", "Anderson", "Bailey", "Baker", "Barnes", "Bell",
        "Bennett", "Brooks", "Brown", "Bryant", "Butler", "Campbell", "Carter", "Clark",
        "Coleman", "Collins", "Cook", "Cooper", "Cox", "Davis", "Edwards", "Evans",
        "Flores", "Foster", "Garcia", "Gonzales", "Gonzalez", "Gray", "Green", "Griffin",
        "Hall", "Harris", "Henderson", "Hernandez", "Hill", "Howard", "Hughes", "Jackson",
        "James", "Jenkins", "Johnson", "Jones", "Kelly", "King", "Lee", "Lewis", "Long",
        "Lopez", "Martin", "Martinez", "Miller", "Mitchell", "Moore", "Morgan", "Morris",
        "Murphy", "Nelson", "Parker", "Patterson", "Perez", "Perry", "Peterson", "Phillips",
        "Powell", "Price", "Ramirez", "Reed", "Richardson", "Rivera", "Roberts", "Robinson",
        "Rodriguez", "Rogers", "Ross", "Russell", "Sanchez", "Sanders", "Scott", "Simmons",
        "Smith", "Stewart", "Taylor", "Thomas", "Thompson", "Torres", "Turner", "Walker",
        "Ward", "Washington", "Watson", "White", "Williams", "Wilson", "Wood", "Wright", "Young"
    ]
    return random.choice(first_names_masculine)
# Write a function that returns a person's gender.
# Return a random gender.
# Accept a random value between 0 and 1 as an input parameter.
# The function must return one of the following values based on the %:
# 53% Male, 40% Female, 6% Other, 1% Transgender
def get_gender(rnd_value):
    if rnd_value < 0.53:
        return "Male"
    elif rnd_value < 0.93:
        return "Female"
    elif rnd_value < 0.99:
        return "Other"
    else:
        return "Transgender"
# Write a function that returns a feminine or masculine first name.
# Return random first name.
# Accept a random value between 0 and 1 as an input parameter.
# The function must return one of the following values based on the %:
# 53% chance of being masculine, 40% chance of being feminine;
# the remaining 7% is split 4% masculine and 3% feminine.
def get_first_name(rnd_value):
    if rnd_value < 0.53:
        return get_first_name_masculine()
    elif rnd_value < 0.93:
        return get_first_name_feminine()
    elif rnd_value < 0.97:
        return get_first_name_masculine()
    else:
        return get_first_name_feminine()
# Write a function that generates a list of common last names in the United States.
# List should be in alphabetical order.
# Each name should be unique.
# Return random last name.
def get_last_name():
    last_names = [
        "Adams", "Alexander", "Allen", "Anderson", "Bailey", "Baker", "Barnes", "Bell",
        "Bennett", "Brooks", "Brown", "Bryant", "Butler", "Campbell", "Carter", "Clark",
        "Coleman", "Collins", "Cook", "Cooper", "Cox", "Davis", "Edwards", "Evans", "Flores",
        "Foster", "Garcia", "Gonzales", "Gonzalez", "Gray", "Green", "Griffin", "Hall",
        "Harris", "Henderson", "Hernandez", "Hill", "Howard", "Hughes", "Jackson", "James",
        "Jenkins", "Johnson", "Jones", "Kelly", "King", "Lee", "Lewis", "Long", "Lopez",
        "Martin", "Martinez", "Miller", "Mitchell", "Moore", "Morgan", "Morris", "Murphy",
        "Nelson", "Parker", "Patterson", "Perez", "Perry", "Peterson", "Phillips", "Powell",
        "Price", "Ramirez", "Reed", "Richardson", "Rivera", "Roberts", "Robinson", "Rodriguez",
        "Rogers", "Ross", "Russell", "Sanchez", "Sanders", "Scott", "Simmons", "Smith", "Stewart",
        "Taylor", "Thomas", "Thompson", "Torres", "Turner", "Walker", "Ward", "Washington", "Watson",
        "White", "Williams", "Wilson", "Wood", "Wright", "Young",
    ]
    return random.choice(last_names)
# Write a function that returns a marital status.
# Return random marital status.
# Accept a random value between 0 and 1 as an input parameter.
# The function must return one of the following values based on the %:
# 50% Married, 33% Single, 17% Unknown
def get_marital_status(rnd_value):
    if rnd_value < 0.50:
        return "Married"
    elif rnd_value < 0.83:
        return "Single"
    else:
        return "Unknown"
# Write a function that returns a person's race.
# Return random race.
# Accept a random value between 0 and 1 as an input parameter.
# The function must return one of the following values based on the %:
# 58% White, 19% Hispanic, 12% Black, 6% Asian, 4% Multiracial
def get_race(rnd_value):
    if rnd_value < 0.58:
        return "White"
    elif rnd_value < 0.77:
        return "Hispanic"
    elif rnd_value < 0.89:
        return "Black"
    elif rnd_value < 0.95:
        return "Asian"
    else:
        return "Multiracial"
# Write a function that returns a person's religious affiliation.
# Return random religion.
# Accept a random value between 0 and 1 as an input parameter.
# The function must return one of the following values based on the %:
# 70% Christian, 20% Agnostic, 3% Atheist, 2% Jewish,
# 2% Other, 1% Muslim, 1% Hindu, 1% Buddhist
def get_religion(rnd_value):
    if rnd_value < 0.7:
        return "Christian"
    elif rnd_value < 0.9:
        return "Agnostic"
    elif rnd_value < 0.93:
        return "Atheist"
    elif rnd_value < 0.95:
        return "Jewish"
    elif rnd_value < 0.97:
        return "Other"
    elif rnd_value < 0.98:
        return "Muslim"
    elif rnd_value < 0.99:
        return "Hindu"
    else:
        return "Buddhist"
# Write a function that generates a normal distribution of ages.
# Using normalvariate() from the random module
# with a mean of 40 and a standard deviation of 10.
# Return random age as an integer.
def get_age():
    return int(random.normalvariate(40, 10))
# Write a function that generates a normal distribution of dates of birth,
# with a mean year of 1975 and a standard deviation of 10.
# Return random date of birth as a string in the format YYYY-MM-DD
def get_dob():
    day_of_year = random.randint(1, 365)
    year_of_birth = int(random.normalvariate(1975, 10))
    dob = date(int(year_of_birth), 1, 1) + timedelta(day_of_year - 1)
    dob = dob.strftime("%Y-%m-%d")
    return dob
# Create a function to write the demographic records to a csv file called 'demographic_data.csv'.
# Use an input parameter to specify the number of records to write.
# The csv file must have a header row and be comma delimited.
# String values must be enclosed in double quotes.
def write_data(rec_count):
    id = 0
    with open("output/demographic_data.csv", "w", newline="") as csv_file:
        csv_writer = csv.writer(
            csv_file, delimiter=",", quotechar='"', quoting=csv.QUOTE_NONNUMERIC
        )
        csv_writer.writerow(
            [
                "id",
                "first_name",
                "last_name",
                "dob",
                "gender",
                "marital_status",
                "race",
                "religion"
            ]
        )
        for i in range(rec_count):
            # reuse one random value so the first name and gender agree
            rnd_gender = random.random()
            id += 1
            first_name = get_first_name(rnd_gender)
            last_name = get_last_name()
            dob = get_dob()
            gender = get_gender(rnd_gender)
            marital_status = get_marital_status(random.random())
            race = get_race(random.random())
            religion = get_religion(random.random())
            csv_writer.writerow(
                [
                    id,
                    first_name,
                    last_name,
                    dob,
                    gender,
                    marital_status,
                    race,
                    religion
                ]
            )
if __name__ == "__main__":
    main()

To make this example more realistic, you could use Copilot’s assistance to write algorithms capable of more accurately representing the nuanced associations and correlations between age, gender, race, marital status, religion, and political affiliation.
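A simple way to introduce such a correlation is to generate one attribute first and make the distribution of the next depend on it. The sketch below conditions marital status on age; the age bands and rates in MARRIED_RATE_BY_AGE are illustrative assumptions rather than census statistics, and both function names are hypothetical.

```python
import random

# Hypothetical probability of being married, by age band (upper bound exclusive).
# These figures are illustrative assumptions, not census statistics.
MARRIED_RATE_BY_AGE = [(25, 0.10), (35, 0.45), (50, 0.60), (200, 0.55)]


def get_marital_status_for_age(age, rnd_value):
    """Return a marital status whose likelihood depends on the person's age."""
    for upper_age, married_rate in MARRIED_RATE_BY_AGE:
        if age < upper_age:
            return "Married" if rnd_value < married_rate else "Single"
    return "Unknown"


def get_person(rng=random):
    """Generate an age first, then draw the correlated marital status."""
    age = max(18, round(rng.normalvariate(40, 10)))
    return {"age": age, "marital_status": get_marital_status_for_age(age, rng.random())}


print(get_person())
```

The same pattern extends to other pairs of attributes, at the cost of maintaining one conditional table per dependency.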

Here is an example of the synthetic demographic data output by the example application:

user_id | first_name | last_name | dob | gender | marital_status | race | religion
1 | Thomas | Powell | 1967-06-10 | Male | Married | Black | Christian
2 | Ward | Williams | 1973-07-22 | Male | Single | Asian | Christian
3 | Martha | Watson | 1975-02-28 | Female | Single | Hispanic | Agnostic
4 | Brenda | Bailey | 1979-07-07 | Female | Married | Black | Christian
5 | Parker | Johnson | 1955-07-14 | Male | Married | White | Christian
6 | Rebecca | Wilson | 1972-05-27 | Female | Married | White | Christian
7 | Doris | Allen | 1956-07-09 | Female | Married | Multiracial | Christian
8 | Rebecca | Sanchez | 1965-09-16 | Female | Single | White | Christian
9 | Mary | Johnson | 1971-04-04 | Female | Single | White | Christian
10 | Anderson | Roberts | 1983-11-02 | Male | Single | Hispanic | Christian
11 | Robinson | Peterson | 1974-10-25 | Male | Single | White | Jewish
12 | Lopez | Ross | 1985-04-30 | Male | Married | White | Christian
13 | Flores | Reed | 1977-09-01 | Male | Married | Black | Agnostic
14 | Martin | Phillips | 1960-03-24 | Male | Single | White | Christian
15 | Carter | Torres | 1983-07-31 | Male | Married | White | Christian

Generative AI Tools for Unit Testing

In addition to writing code and documentation, a common use of generative AI code assistants like Copilot is writing unit tests. For example, we can create unit tests for each function in our coffee shop sales data code generator, using the same method of prompting with code comments.

Copilot can also assist with writing unit tests
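The same approach applies to any of the generator functions in this post. As a hedged sketch of what such tests might look like, the example below uses Python's built-in unittest module against a copy of get_property_type from the address example; the test class and method names are hypothetical, and this is not Copilot's actual output.

```python
import unittest


def get_property_type(rnd_value):
    """Property type function, copied from the address data example."""
    if rnd_value < 0.63:
        return "Single-family"
    elif rnd_value < 0.89:
        return "Multi-family"
    elif rnd_value < 0.93:
        return "Condo"
    elif rnd_value < 0.96:
        return "Townhouse"
    elif rnd_value < 0.98:
        return "Mobile home"
    elif rnd_value < 0.99:
        return "Farm"
    else:
        return "Other"


class GetPropertyTypeTest(unittest.TestCase):
    def test_threshold_boundaries(self):
        # Each pair is (input value, expected category) at or near a threshold.
        cases = [
            (0.0, "Single-family"),
            (0.629, "Single-family"),
            (0.63, "Multi-family"),
            (0.89, "Condo"),
            (0.93, "Townhouse"),
            (0.96, "Mobile home"),
            (0.98, "Farm"),
            (0.99, "Other"),
        ]
        for rnd_value, expected in cases:
            with self.subTest(rnd_value=rnd_value):
                self.assertEqual(get_property_type(rnd_value), expected)

    def test_always_returns_known_type(self):
        known = {"Single-family", "Multi-family", "Condo", "Townhouse",
                 "Mobile home", "Farm", "Other"}
        for step in range(100):
            self.assertIn(get_property_type(step / 100), known)
```

Saved to a file, the tests can be run with python3 -m unittest followed by the file name. Because the functions accept the random value as a parameter, the tests are deterministic and need no mocking of the random module.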

Conclusion

In this post, we learned how Generative AI could assist us in creating synthetic data for software development, analytics, and machine learning. The examples herein generated data using simple techniques. Using advanced modeling techniques, we could generate increasingly complex, realistic synthetic data.

To learn about other ways Generative AI can be used to assist in writing code, please read my previous article, Ten Ways to Leverage Generative AI for Development on AWS.

🔔 To keep up with future content, follow Gary Stafford on LinkedIn.


This blog represents my viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Ten Ways to Leverage Generative AI for Development on AWS

Explore ten ways you can use Generative AI coding tools to accelerate development and increase your productivity on AWS

Licensed image: PopTika/Shutterstock.com

Generative AI coding tools are a new class of software development tools that leverage machine learning algorithms to assist developers in writing code. These tools use AI models trained on vast amounts of code to offer suggestions for completing code snippets, writing functions, and even entire blocks of code.

Quote generated by OpenAI ChatGPT

Introduction

Combining the latest Generative AI coding tools with a feature-rich and extensible IDE and your coding skills will accelerate development and increase your productivity. In this post, we will look at ten examples of how you can use Generative AI coding tools on AWS:

  1. Application Development: Code, unit tests, and documentation
  2. Infrastructure as Code (IaC): AWS CloudFormation, AWS CDK, Terraform, and Ansible
  3. AWS Lambda: Serverless, event-driven functions
  4. IAM Policies: AWS IAM policies and Amazon S3 bucket policies
  5. Structured Query Language (SQL): Amazon RDS, Amazon Redshift, Amazon Athena, and Amazon EMR
  6. Big Data: Apache Spark and Flink on Amazon EMR, AWS Glue, and Kinesis Data Analytics
  7. Configuration and Properties files: Amazon MSK, Amazon EMR, and Amazon OpenSearch
  8. Apache Airflow DAGs: Amazon MWAA
  9. Containerization: Kubernetes resources, Helm Charts, Dockerfiles for Amazon EKS
  10. Utility Scripts: PowerShell, Bash, Shell, and Python

Choosing a Generative AI Coding Tool

In my recent post, Accelerating Development with Generative AI-Powered Coding Tools, I reviewed six popular tools: ChatGPT, Copilot, CodeWhisperer, Tabnine, Bing, and ChatSonic.

For this post, we will use GitHub Copilot, powered by OpenAI Codex, a new AI system created by OpenAI. Copilot suggests code and entire functions in real-time, right from your IDE. Copilot is trained on all languages that appear in GitHub’s public repositories. GitHub points out that the quality of suggestions you receive may depend on the volume and diversity of training data for that language. Similar tools in this category are limited in the number of languages they support compared to Copilot.

GitHub Copilot, your AI pair programmer

Copilot is currently available as an extension for Visual Studio Code, Visual Studio, Neovim, and the JetBrains suite of IDEs. The GitHub Copilot extension for Visual Studio Code (VS Code) already has 4.8 million downloads, and the GitHub Copilot Nightly extension, used for this post, has almost 280,000 downloads. I am also using the GitHub Copilot Labs extension in this post.

GitHub Copilot Nightly VS Code extensions used in this post

Ten Ways to Leverage Generative AI

Take a look at ten examples of how you can use Generative AI coding tools to increase your development productivity on AWS. All the code samples in this post can be found on GitHub.

1. Application Development

According to GitHub, Copilot is trained on billions of lines of code and turns natural language prompts into coding suggestions across dozens of languages. These features make Copilot ideal for developing applications, writing unit tests, and authoring documentation. You can use GitHub Copilot to assist with writing software applications in nearly any popular language, including Go.

Code generation of a Go application using GitHub Copilot

The final application, which uses the AWS SDK for Go to create an Amazon DynamoDB table, shown below, was formatted using the Go extension by Google and optimized using the ‘Readable,’ ‘Make Robust,’ and ‘Fix Bug’ GitHub Code Brushes.

The final Go application ran in the VS Code terminal

Generating Unit Tests

Using JavaScript and TypeScript, you can take advantage of TestPilot to generate unit tests based on your existing code and documentation. TestPilot, part of GitHub Copilot Labs, uses GitHub Copilot’s AI technology.

Generating unit tests with GitHub TestPilot, part of Copilot

2. Infrastructure as Code (IaC)

Widely used Infrastructure as Code (IaC) tools include Pulumi, AWS CloudFormation, Azure ARM Templates, Google Deployment Manager, AWS Cloud Development Kit (AWS CDK), Microsoft Bicep, and Ansible. Many IaC tools, with exceptions such as AWS CDK and Pulumi, use JSON- or YAML-based domain-specific languages (DSLs).

AWS CloudFormation

AWS CloudFormation is an Infrastructure as Code (IaC) service that allows you to easily model, provision, and manage AWS and third-party resources. The CloudFormation template is a JSON or YAML formatted text file. You can use GitHub Copilot to assist with writing IaC, including AWS CloudFormation in either JSON or YAML.

Code generation of an AWS CloudFormation template using GitHub Copilot

You can use the YAML Language Support by Red Hat extension to write YAML in VS Code.

Final YAML-based AWS CloudFormation template using GitHub Copilot

VS Code has native JSON support with JSON Schema Store, which includes AWS CloudFormation. VS Code uses the CloudFormation schema for IntelliSense and to flag schema errors in templates.

Final JSON-based AWS CloudFormation template using GitHub Copilot
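For readers who want to see the general shape of a JSON-based template, here is a minimal sketch that builds one with Python's json module. It is not the template from the screenshots; the logical ID ExampleBucket, the description text, and the render_template helper are hypothetical, and the single S3 bucket resource is chosen only for brevity.

```python
import json

# Minimal illustrative template: one S3 bucket with versioning enabled.
template = {
    "AWSTemplateFormatVersion": "2010-09-09",
    "Description": "Minimal example template with a single S3 bucket",
    "Resources": {
        "ExampleBucket": {  # hypothetical logical ID
            "Type": "AWS::S3::Bucket",
            "Properties": {
                "VersioningConfiguration": {"Status": "Enabled"},
            },
        }
    },
    "Outputs": {
        # Ref on an S3 bucket resource resolves to the bucket name.
        "BucketName": {"Value": {"Ref": "ExampleBucket"}},
    },
}


def render_template(template):
    """Serialize the template dictionary to formatted JSON text."""
    return json.dumps(template, indent=2)


print(render_template(template))
```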

HashiCorp Terraform

In addition to AWS CloudFormation, HashiCorp Terraform is an extremely popular IaC tool. According to HashiCorp, Terraform lets you define resources and infrastructure in human-readable, declarative configuration files and manages your infrastructure’s lifecycle. Using Terraform has several advantages over manually managing your infrastructure.

Terraform plugins called providers let Terraform interact with cloud platforms and other services via their application programming interfaces (APIs). You can use the AWS Provider to interact with the many resources supported by AWS.

Code generation of a HashiCorp Terraform file using GitHub Copilot

3. AWS Lambda

Lambda, according to AWS, is a serverless, event-driven compute service that lets you run code for virtually any application or backend service without provisioning or managing servers. You can trigger Lambda from over 200 AWS services and software as a service (SaaS) applications and only pay for what you use. AWS Lambda natively supports Java, Go, PowerShell, Node.js, C#, Python, and Ruby. AWS Lambda also provides a Runtime API allowing you to use additional programming languages to author your functions.

You can use GitHub Copilot to assist with writing AWS Lambda functions in any of the natively supported languages. You can further optimize the resulting Lambda code with GitHub’s Code Brushes.

Code generation of a Python-based AWS Lambda using GitHub Copilot

The final Python-based AWS Lambda, below, was formatted using the Black Formatter and Flake8 extensions and optimized using the ‘Readable,’ ‘Debug,’ ‘Make Robust,’ and ‘Fix Bug’ GitHub Code Brushes.

Final AWS Python-based Lambda using GitHub Copilot and optimized with Code Brushes
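
For readers unfamiliar with the shape of a Python-based Lambda function, here is a minimal sketch using the standard handler signature. It is not the function generated in this post; the `name` event field and the response body are hypothetical and only illustrate the structure.

```python
import json


def lambda_handler(event, context):
    """Minimal AWS Lambda handler sketch; the 'name' event field is hypothetical."""
    # Tolerate a missing or empty event instead of raising.
    name = (event or {}).get("name", "world")
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"message": f"Hello, {name}!"}),
    }
```

Because the handler is a plain function, it can be called locally with a dictionary and `None` for the context before it is deployed.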

You can easily convert the Python-based AWS Lambda to Java using GitHub Copilot Labs’ ability to translate code between languages. Install the GitHub Copilot Labs extension for VS Code to try out language translation.

Final AWS Java-based Lambda using GitHub Copilot converted from Python by Copilot

4. IAM Policies

AWS Identity and Access Management (AWS IAM) is a web service that helps you securely control access to AWS resources. According to AWS, you manage access in AWS by creating policies and attaching them to IAM identities (users, groups of users, or roles) or AWS resources. A policy is an object in AWS that, when associated with an identity or resource, defines their permissions. IAM policies are stored on AWS as JSON documents. You can use GitHub Copilot to assist in writing IAM Policies.

Code generation of an AWS IAM Policy using GitHub Copilot

The final AWS IAM Policy, below, was formatted using VS Code’s built-in JSON support.

Final AWS IAM Policy using GitHub Copilot
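
To make the JSON structure of a policy concrete, the sketch below builds a small identity-based policy as a Python dictionary. It is an illustration rather than the policy generated in this post; the bucket name, the statement IDs, and the helper function name are hypothetical.

```python
import json


def s3_read_only_policy(bucket_name: str) -> dict:
    """Build a policy document granting read-only access to one S3 bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ListBucket",
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                # Listing applies to the bucket itself.
                "Resource": [f"arn:aws:s3:::{bucket_name}"],
            },
            {
                "Sid": "ReadObjects",
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                # Reading applies to the objects inside the bucket.
                "Resource": [f"arn:aws:s3:::{bucket_name}/*"],
            },
        ],
    }


policy_json = json.dumps(s3_read_only_policy("example-bucket"), indent=2)
```

The resulting `policy_json` string is the kind of document you would paste into the IAM console or reference from an IaC template.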

5. Structured Query Language (SQL)

SQL has many use cases on AWS, including Amazon Relational Database Service (RDS) for MySQL, PostgreSQL, MariaDB, Oracle, and SQL Server databases. SQL is also used with Amazon Aurora, Amazon Redshift, Amazon Athena, Apache Presto, Trino (PrestoSQL), and Apache Hive on Amazon EMR.

You can use IDEs like VS Code with its SQL dialect-specific language support and formatter extensions. You can further optimize the resulting SQL statements with GitHub’s Code Brushes.

Code generation of a PostgreSQL script using GitHub Copilot

The final PostgreSQL script, below, was formatted using the Sql Formatter extension and optimized using the ‘Readable’ and ‘Fix Bug’ GitHub Code Brushes.

PostgreSQL script generated with GitHub Copilot and optimized with Code Brushes

6. Big Data

Big Data, according to AWS, can be described in terms of data management challenges that, due to the increasing volume, velocity, and variety of data, cannot be solved with traditional databases. AWS offers managed versions of Apache Spark, Apache Flink, Apache Zeppelin, and Jupyter Notebooks on Amazon EMR, AWS Glue, and Amazon Kinesis Data Analytics (KDA).

Apache Spark
According to their website, Apache Spark is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters. Spark jobs can be written in various languages, including Python (PySpark), SQL, Scala, Java, and R. Apache Spark is available on a growing number of AWS services, including Amazon EMR and AWS Glue.

Code generation of a Python-based Apache Spark job using GitHub Copilot

The final Python-based Apache Spark job, below, was formatted using the Black Formatter extension and optimized using the ‘Readable,’ ‘Document,’ ‘Make Robust,’ and ‘Fix Bug’ GitHub Code Brushes.

Final Python-based Apache Spark job using GitHub Copilot

7. Configuration and Properties Files

According to TechTarget, a configuration file (aka config) defines the parameters, options, settings, and preferences applied to operating systems, infrastructure devices, and applications. There are many examples of configuration and properties files on AWS, including Amazon MSK Connect (Kafka Connect Source/Sink Connectors), Amazon OpenSearch (Filebeat, Logstash), and Amazon EMR (Apache Log4j, Hive, and Spark).

Kafka Connect
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. AWS offers a fully-managed version of Kafka Connect: Amazon MSK Connect. You can use GitHub Copilot to write Kafka Connect Source and Sink Connectors with Kafka Connect and Amazon MSK Connect.

Code generation of a Kafka Connect Source Connector using GitHub Copilot

The final Kafka Connect Source Connector, below, was formatted using VS Code’s built-in JSON support. It incorporates the Debezium connector for MySQL, Avro file format, schema registry, and message transformation. Debezium is a popular open source distributed platform for performing change data capture (CDC) with Kafka Connect.

Final Kafka Connect Source Connector using GitHub Copilot

8. Apache Airflow DAGs

Apache Airflow is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows. Airflow’s extensible Python framework enables you to build workflows connecting with virtually any technology. DAG (Directed Acyclic Graph) is the core concept of Airflow, collecting Tasks together, organized with dependencies and relationships to say how they should run.

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow. You can use GitHub Copilot to assist in writing DAGs for Apache Airflow, to be used with Amazon MWAA.
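
The idea of tasks ordered by their dependencies can be sketched without Airflow at all. The example below uses Python's standard-library `graphlib` module, not the Airflow API, and the task names are hypothetical; it only shows how a directed acyclic graph determines a valid run order.

```python
from graphlib import TopologicalSorter

# Hypothetical tasks; each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "notify": {"load"},
}


def run_order(deps):
    """Return one valid execution order for the tasks in the graph."""
    return list(TopologicalSorter(deps).static_order())
```

A graph containing a cycle has no valid order, which is why the "acyclic" part matters: `run_order` raises `graphlib.CycleError` in that case.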

Code generation of an Airflow DAG using GitHub Copilot

The final Airflow DAG, below, was formatted using the Black Formatter extension. Unfortunately, based on my testing, GitHub’s Code Brushes could not optimize Airflow DAGs.

Final Airflow DAG using GitHub Copilot

9. Containerization

According to Check Point Software, Containerization is a type of virtualization in which all the components of an application are bundled into a single container image and can be run in isolated user space on the same shared operating system. Containers are lightweight, portable, and highly conducive to automation. AWS describes containerization as a software deployment process that bundles an application’s code with all the files and libraries it needs to run on any infrastructure.

AWS has several container services, including Amazon Elastic Container Service (Amazon ECS), Amazon Elastic Kubernetes Service (Amazon EKS), Amazon Elastic Container Registry (Amazon ECR), and AWS Fargate. Several code-based resources can benefit from a Generative AI coding tool like GitHub Copilot, including Dockerfiles, Kubernetes resources, Helm Charts, Weaveworks Flux, and ArgoCD configuration.

Kubernetes

Kubernetes objects are represented in the Kubernetes API and expressed in YAML format. Below is a Kubernetes Deployment resource file, which creates a ReplicaSet to bring up multiple replicas of nginx Pods.

Code generation of Kubernetes resources using GitHub Copilot

The final Kubernetes resource file below contains Deployment and Service resources. In addition to GitHub Copilot, you can use Microsoft’s Kubernetes extension for VS Code to use IntelliSense and flag schema errors in the file.

Final Kubernetes resource file using GitHub Copilot

10. Utility Scripts

According to Bing AI — Search, utility scripts are small, simple snippets of code written as independent code files designed to perform a particular task. Utility scripts are commonly written in Bash, Shell, Python, Ruby, PowerShell, and PHP.

AWS utility scripts leverage the AWS Command Line Interface (AWS CLI) for Bash and Shell and AWS SDK for other programming languages. SDKs take the complexity out of coding by providing language-specific APIs for AWS services. For example, Boto3, AWS’s Python SDK, easily integrates your Python application, library, or script with AWS services, including Amazon S3, Amazon EC2, Amazon DynamoDB, and more.

Code generation of Python-based utility script using GitHub Copilot

An example of a Python script to calculate the total size of an Amazon S3 bucket, below, was inspired by 100daysofdevops/N-days-of-automation, a fantastic set of open source AWS-oriented automation scripts.

Final Python-based utility script using GitHub Copilot
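
As a rough sketch of what such a utility script can look like (not the script generated in this post), the function below sums object sizes using Boto3's `list_objects_v2` paginator. The function name and the bucket name are hypothetical, and the client is passed in so the logic can be exercised without AWS credentials.

```python
def total_bucket_size(s3_client, bucket_name: str) -> int:
    """Sum the sizes, in bytes, of all objects in a bucket."""
    paginator = s3_client.get_paginator("list_objects_v2")
    total = 0
    for page in paginator.paginate(Bucket=bucket_name):
        # Pages for an empty bucket carry no "Contents" key.
        for obj in page.get("Contents", []):
            total += obj["Size"]
    return total
```

With Boto3 installed and credentials configured, a call might look like `total_bucket_size(boto3.client("s3"), "example-bucket")`. Paginating matters because a single list call returns at most 1,000 objects.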

Conclusion

In this post, you learned ten ways to leverage Generative AI coding tools like GitHub Copilot for development on AWS. You saw how combining the latest generation of Generative AI coding tools, a mature and extensible IDE, and your coding experience will accelerate development, increase productivity, and reduce cost.

🔔 To keep up with future content, follow Gary Stafford on LinkedIn.


This blog represents my viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

Exploring Popular Open-source Stream Processing Technologies: Part 2 of 2

A brief demonstration of Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot with Apache Superset

Introduction

According to TechTarget, “Stream processing is a data management technique that involves ingesting a continuous data stream to quickly analyze, filter, transform or enhance the data in real-time. Once processed, the data is passed off to an application, data store, or another stream processing engine.” Confluent, a fully-managed Apache Kafka market leader, defines stream processing as “a software paradigm that ingests, processes, and manages continuous streams of data while they’re still in motion.”

This two-part post series and forthcoming video explore four popular open-source software (OSS) stream processing projects: Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot.

This post uses the open-source projects, making it easier to follow along with the demonstration and keeping costs to a minimum. However, you could easily substitute your preferred SaaS, CSP, or COSS service offerings for the open-source projects.

Part Two

We will continue our exploration in part two of this two-part post, covering Apache Flink and Apache Pinot. In addition, we will incorporate Apache Superset into the demonstration to visualize the real-time results of our stream processing pipelines as a dashboard.

Demonstration #3: Apache Flink

In the third demonstration of four, we will examine Apache Flink. For this part of the post, we will also use the third of the three GitHub repository projects, flink-kafka-demo. The project contains a Flink application written in Java, which performs stream processing, incremental aggregation, and multi-stream joins.

High-level workflow for Apache Flink demonstration

New Streaming Stack

To get started, we need to replace the first streaming Docker Swarm stack, deployed in part one, with the second streaming Docker Swarm stack. The second stack contains Apache Kafka, Apache Zookeeper, Apache Flink, Apache Pinot, Apache Superset, UI for Apache Kafka, and Project Jupyter (JupyterLab).

The stack will take a few minutes to deploy fully. When complete, there should be ten containers running in the stack.

Viewing the Docker streaming stack’s ten containers

Flink Application

The Flink application has two entry classes. The first class, RunningTotals, performs the same aggregation as the previous KStreams demo.

public static void flinkKafkaPipeline(Properties prop) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // assumes PLAINTEXT authentication
    KafkaSource<Purchase> source = KafkaSource.<Purchase>builder()
            .setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
            .setTopics(prop.getProperty("PURCHASES_TOPIC"))
            .setGroupId("flink_reduce_demo")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new PurchaseDeserializationSchema())
            .build();

    DataStream<Purchase> purchases = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

    DataStream<RunningTotal> runningTotals = purchases
            .flatMap((FlatMapFunction<Purchase, RunningTotal>) (purchase, out) -> out.collect(
                    new RunningTotal(
                            purchase.getTransactionTime(),
                            purchase.getProductId(),
                            1,
                            purchase.getQuantity(),
                            purchase.getTotalPurchase()
                    ))
            ).returns(RunningTotal.class)
            .keyBy(RunningTotal::getProductId)
            .reduce((runningTotal1, runningTotal2) -> {
                runningTotal2.setTransactions(runningTotal1.getTransactions() + runningTotal2.getTransactions());
                runningTotal2.setQuantities(runningTotal1.getQuantities() + runningTotal2.getQuantities());
                runningTotal2.setSales(runningTotal1.getSales().add(runningTotal2.getSales()));
                return runningTotal2;
            });

    KafkaSink<RunningTotal> sink = KafkaSink.<RunningTotal>builder()
            .setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(prop.getProperty("RUNNING_TOTALS_TOPIC"))
                    .setValueSerializationSchema(new RunningTotalSerializationSchema())
                    .build()
            ).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();

    runningTotals.sinkTo(sink);

    env.execute("Flink Running Totals Demo");
}
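
To see what the keyBy and reduce steps above compute, here is a plain-Python sketch of the same running aggregation. It is a conceptual illustration only and does not use the Flink API; the dictionary field names (`product_id`, `quantity`, `total_purchase`) are hypothetical stand-ins for the Purchase getters.

```python
from decimal import Decimal


def running_totals(purchases):
    """Yield an updated per-product total after every purchase (keyBy + reduce)."""
    state = {}  # product_id -> (transactions, quantities, sales)
    for purchase in purchases:
        key = purchase["product_id"]
        transactions, quantities, sales = state.get(key, (0, 0, Decimal("0")))
        state[key] = (
            transactions + 1,
            quantities + purchase["quantity"],
            sales + Decimal(str(purchase["total_purchase"])),
        )
        # Like Flink's reduce, emit the new aggregate for this key immediately.
        yield key, state[key]
```

Each incoming purchase produces one output record carrying the latest totals for its product, which is why the output topic holds many messages per product rather than a single final total.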

The second class, JoinStreams, joins the stream of data from the demo.purchases topic and the demo.products topic, processing and combining them, in real-time, into an enriched transaction and publishing the results to a new topic, demo.purchases.enriched.

public static void flinkKafkaPipeline(Properties prop) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // assumes PLAINTEXT authentication
    KafkaSource<Product> productSource = KafkaSource.<Product>builder()
            .setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
            .setTopics(prop.getProperty("PRODUCTS_TOPIC"))
            .setGroupId("flink_join_demo")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new ProductDeserializationSchema())
            .build();

    DataStream<Product> productsStream = env.fromSource(
            productSource, WatermarkStrategy.noWatermarks(), "Kafka Products Source");

    tableEnv.createTemporaryView("products", productsStream);

    KafkaSource<Purchase> purchasesSource = KafkaSource.<Purchase>builder()
            .setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
            .setTopics(prop.getProperty("PURCHASES_TOPIC"))
            .setGroupId("flink_join_demo")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new PurchaseDeserializationSchema())
            .build();

    DataStream<Purchase> purchasesStream = env.fromSource(
            purchasesSource, WatermarkStrategy.noWatermarks(), "Kafka Purchases Source");

    tableEnv.createTemporaryView("purchases", purchasesStream);

    Table result =
            tableEnv.sqlQuery(
                    "SELECT " +
                            "purchases.transactionTime, " +
                            "TO_TIMESTAMP(purchases.transactionTime), " +
                            "purchases.transactionId, " +
                            "purchases.productId, " +
                            "products.category, " +
                            "products.item, " +
                            "products.size, " +
                            "products.cogs, " +
                            "products.price, " +
                            "products.containsFruit, " +
                            "products.containsVeggies, " +
                            "products.containsNuts, " +
                            "products.containsCaffeine, " +
                            "purchases.price, " +
                            "purchases.quantity, " +
                            "purchases.isMember, " +
                            "purchases.memberDiscount, " +
                            "purchases.addSupplements, " +
                            "purchases.supplementPrice, " +
                            "purchases.totalPurchase " +
                            "FROM " +
                            "products " +
                            "JOIN purchases " +
                            "ON products.productId = purchases.productId"
            );

    DataStream<PurchaseEnriched> purchasesEnrichedTable = tableEnv.toDataStream(result,
            PurchaseEnriched.class);

    KafkaSink<PurchaseEnriched> sink = KafkaSink.<PurchaseEnriched>builder()
            .setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(prop.getProperty("PURCHASES_ENRICHED_TOPIC"))
                    .setValueSerializationSchema(new PurchaseEnrichedSerializationSchema())
                    .build()
            ).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();

    purchasesEnrichedTable.sinkTo(sink);

    env.execute("Flink Streaming Join Demo");
}

The resulting enriched purchases messages look similar to the following:

{
"transaction_time": "2022-09-25 01:58:11.714838",
"transaction_id": "4068565608708439642",
"product_id": "CS08",
"product_category": "Classic Smoothies",
"product_name": "Rockin’ Raspberry",
"product_size": "24 oz.",
"product_cogs": 1.5,
"product_price": 4.99,
"contains_fruit": true,
"contains_veggies": false,
"contains_nuts": false,
"contains_caffeine": false,
"purchase_price": 4.99,
"purchase_quantity": 2,
"is_member": false,
"member_discount": 0,
"add_supplements": false,
"supplement_price": 0,
"total_purchase": 9.98
}
Sample enriched purchase message
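
Conceptually, an unwindowed inner join over two unbounded streams has to remember records from both sides, since a purchase may arrive before or after its product. The plain-Python sketch below illustrates that idea only; it is not how Flink implements the join internally, and the event tuples and field names are hypothetical.

```python
from collections import defaultdict


def streaming_join(events):
    """Join interleaved ('product' | 'purchase', record) events on product_id.

    Both sides are buffered, so a match is emitted regardless of which
    side arrives first.
    """
    products = defaultdict(list)
    purchases = defaultdict(list)
    for kind, record in events:
        key = record["product_id"]
        if kind == "product":
            products[key].append(record)
            # Match the new product against purchases seen so far.
            for purchase in purchases[key]:
                yield {**record, **purchase}
        else:
            purchases[key].append(record)
            # Match the new purchase against products seen so far.
            for product in products[key]:
                yield {**product, **record}
```

The buffers in this sketch grow without bound, which hints at why state size is a practical concern for long-running streaming joins.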

Running the Flink Job

To run the Flink application, we must first compile it into an uber JAR.

We can copy the JAR into the Flink container or upload it through the Apache Flink Dashboard, a browser-based UI. For this demonstration, we will upload it through the Apache Flink Dashboard, accessible on port 8081.

The project’s build.gradle file has preset the Main class (Flink’s Entry class) to org.example.JoinStreams. Optionally, to run the Running Totals demo, we could change the build.gradle file and recompile, or simply change Flink’s Entry class to org.example.RunningTotals.

Uploading the JAR to Apache Flink

Before running the Flink job, restart the sales generator in the background (nohup python3 ./producer.py &) to generate a new stream of data. Then start the Flink job.

Apache Flink job running successfully

To confirm the Flink application is running, we can check the contents of the new demo.purchases.enriched topic using the Kafka CLI.

The new demo.purchases.enriched topic populated with messages from Apache Flink

Alternatively, you can use the UI for Apache Kafka, accessible on port 9080.

Viewing messages in the UI for Apache Kafka

Demonstration #4: Apache Pinot

In the fourth and final demonstration, we will explore Apache Pinot. First, we will use SQL to query the unbounded data streams from Apache Kafka, generated by both the sales generator and the Apache Flink application. Then, we will build a real-time dashboard in Apache Superset, with Apache Pinot as our data source.

Creating Tables

According to the Apache Pinot documentation, “a table is a logical abstraction that represents a collection of related data. It is composed of columns and rows (known as documents in Pinot).” There are three types of Pinot tables: Offline, Realtime, and Hybrid. For this demonstration, we will create three Realtime tables. Realtime tables ingest data from streams (in our case, Kafka) and build segments from the consumed data. Further, according to the documentation, “each table in Pinot is associated with a Schema. A schema defines what fields are present in the table along with the data types. The schema is stored in Zookeeper, along with the table configuration.”

Below, we see the schema and config for one of the three Realtime tables, purchasesEnriched. Note how the columns are divided into three categories: Dimension, Metric, and DateTime.

{
"schemaName": "purchasesEnriched",
"dimensionFieldSpecs": [
{
"name": "transaction_id",
"dataType": "STRING"
},
{
"name": "product_id",
"dataType": "STRING"
},
{
"name": "product_category",
"dataType": "STRING"
},
{
"name": "product_name",
"dataType": "STRING"
},
{
"name": "product_size",
"dataType": "STRING"
},
{
"name": "product_cogs",
"dataType": "FLOAT"
},
{
"name": "product_price",
"dataType": "FLOAT"
},
{
"name": "contains_fruit",
"dataType": "BOOLEAN"
},
{
"name": "contains_veggies",
"dataType": "BOOLEAN"
},
{
"name": "contains_nuts",
"dataType": "BOOLEAN"
},
{
"name": "contains_caffeine",
"dataType": "BOOLEAN"
},
{
"name": "purchase_price",
"dataType": "FLOAT"
},
{
"name": "is_member",
"dataType": "BOOLEAN"
},
{
"name": "member_discount",
"dataType": "FLOAT"
},
{
"name": "add_supplements",
"dataType": "BOOLEAN"
},
{
"name": "supplement_price",
"dataType": "FLOAT"
}
],
"metricFieldSpecs": [
{
"name": "purchase_quantity",
"dataType": "INT"
},
{
"name": "total_purchase",
"dataType": "FLOAT"
}
],
"dateTimeFieldSpecs": [
{
"name": "transaction_time",
"dataType": "TIMESTAMP",
"format": "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSSSSS",
"granularity": "1:MILLISECONDS"
}
]
}
Schema file for purchasesEnriched Realtime table
{
"tableName": "purchasesEnriched",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "transaction_time",
"timeType": "MILLISECONDS",
"schemaName": "purchasesEnriched",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.topic.name": "demo.purchases.enriched",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "kafka:29092",
"realtime.segment.flush.threshold.time": "3600000",
"realtime.segment.flush.threshold.rows": "50000",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
},
"metadata": {}
}
Config file for purchasesEnriched Realtime table

To begin, copy the three Pinot Realtime table schemas and configurations from the streaming-sales-generator GitHub project into the Apache Pinot Controller container. Next, use a docker exec command to call the Pinot Command Line Interface’s (CLI) AddTable command to create the three tables: products, purchases, and purchasesEnriched.

# copy pinot table schema and config files to pinot controller
CONTROLLER_CONTAINER=$(docker container ls --filter name=streaming-stack_pinot-controller.1 --format "{{.ID}}")
cd ~/streaming-sales-generator/apache_pinot_examples
docker cp configs_schemas/ ${CONTROLLER_CONTAINER}:/tmp/
# create three tables
docker exec -it ${CONTROLLER_CONTAINER} \
bin/pinot-admin.sh AddTable \
-tableConfigFile /tmp/configs_schemas/purchases-config.json \
-schemaFile /tmp/configs_schemas/purchases-schema.json -exec
docker exec -it ${CONTROLLER_CONTAINER} \
bin/pinot-admin.sh AddTable \
-tableConfigFile /tmp/configs_schemas/products-config.json \
-schemaFile /tmp/configs_schemas/products-schema.json -exec
docker exec -it ${CONTROLLER_CONTAINER} \
bin/pinot-admin.sh AddTable \
-tableConfigFile /tmp/configs_schemas/purchases-enriched-config.json \
-schemaFile /tmp/configs_schemas/purchases-enriched-schema.json -exec

To confirm the three tables were created correctly, use the Apache Pinot Data Explorer accessible on port 9000. Use the Tables tab in the Cluster Manager.

Cluster Manager’s Tables tab shows the three Realtime tables and corresponding schemas

We can further inspect and edit the table’s config and schema from the Tables tab in the Cluster Manager.

Realtime table’s editable config and schema

The three tables are configured to read the unbounded stream of data from the corresponding Kafka topics: demo.products, demo.purchases, and demo.purchases.enriched.

Querying with Pinot

We can use Pinot’s Query Console to query the Realtime tables using SQL. According to the documentation, “Pinot provides a SQL interface for querying. It uses the [Apache] Calcite SQL parser to parse queries and uses MYSQL_ANSI dialect.”

Schema and query results for the purchases table

With the generator still running, re-query the purchases table in the Query Console (select count(*) from purchases). You should notice the document count increasing each time you re-run the query since new messages are published to the demo.purchases topic by the sales generator.

If you do not observe the count increasing, ensure the sales generator and Flink enrichment job are running.

Query Console showing the purchases table’s document count continuing to increase
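
The same query can also be issued outside the Query Console. The sketch below posts SQL to the Pinot broker's `/query/sql` REST endpoint using only the Python standard library. It assumes the broker's port 8099 is published on localhost, which depends on how the stack is deployed; the helper function names are hypothetical.

```python
import json
from urllib import request


def query_pinot(sql: str, broker_url: str = "http://localhost:8099") -> dict:
    """POST a SQL query to a Pinot broker and return the decoded JSON response."""
    req = request.Request(
        f"{broker_url}/query/sql",
        data=json.dumps({"sql": sql}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with request.urlopen(req, timeout=10) as resp:
        return json.load(resp)


def rows_as_dicts(response: dict) -> list:
    """Pair each result row with the column names from the response schema."""
    table = response.get("resultTable") or {}
    columns = table.get("dataSchema", {}).get("columnNames", [])
    return [dict(zip(columns, row)) for row in table.get("rows", [])]
```

Calling `rows_as_dicts(query_pinot("select count(*) from purchases"))` a few seconds apart should show the count rising while the sales generator is running.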

Table Joins?

It might seem logical to replicate the multi-stream join we performed with Apache Flink in the third demonstration on the demo.products and demo.purchases topics by joining the products and purchases Realtime tables with a SQL statement in Pinot’s Query Console. However, according to the documentation, at the time of this post, version 0.11.0 of Pinot did not support joins or nested subqueries.

This current join limitation is why we created the Realtime table, purchasesEnriched, allowing us to query Flink’s real-time results in the demo.purchases.enriched topic. We will use both Flink and Pinot as part of our stream processing pipeline, taking advantage of each tool’s individual strengths and capabilities.

Note, according to the documentation for the latest release of Pinot on the main branch, “the latest Pinot multi-stage supports inner join, left-outer, semi-join, and nested queries out of the box. It is optimized for in-memory process and latency.” For more information on joins as part of Pinot’s new multi-stage query execution engine, read the documentation, Multi-Stage Query Engine.

Query showing results from the demo.purchases.enriched topic in real-time

Aggregations

We can perform real-time aggregations using Pinot’s rich SQL query interface. For example, as we did previously with Spark and Flink, we can calculate running totals for the number of items sold and the total sales for each product in real time.

Aggregating running totals for each product

We can do the same with the purchasesEnriched table, which will use the continuous stream of enriched transaction data from our Apache Flink application. With the purchasesEnriched table, we can add the product name and product category for richer results. Each time we run the query, we get real-time results based on the running sales generator and Flink enrichment job.

Aggregating running totals for each product

Query Options and Indexing

Note the reference to the Star-Tree index at the start of the SQL query shown above. Pinot provides several query options, including useStarTree (true by default).

Multiple indexing techniques are available in Pinot, including Forward Index, Inverted Index, Star-tree Index, Bloom Filter, and Range Index, among others. Each has advantages in different query scenarios. According to the documentation, by default, Pinot creates a dictionary-encoded forward index for each column.

SQL Examples

Here are a few examples of SQL queries you can try in Pinot’s Query Console:

-- products
SELECT
    COUNT(product_id) AS product_count,
    AVG(price) AS avg_price,
    AVG(cogs) AS avg_cogs,
    AVG(price) - AVG(cogs) AS avg_gross_profit
FROM
    products;

-- purchases
SELECT
    product_id,
    SUMPRECISION(quantity, 10, 0) AS quantity,
    SUMPRECISION(total_purchase, 10, 2) AS sales
FROM
    purchases
GROUP BY
    product_id
ORDER BY
    sales DESC;

-- purchasesEnriched
SELECT
    product_id,
    product_name,
    product_category,
    SUMPRECISION(purchase_quantity, 10, 0) AS quantity,
    SUMPRECISION(total_purchase, 10, 2) AS sales
FROM
    purchasesEnriched
GROUP BY
    product_id,
    product_name,
    product_category
ORDER BY
    sales DESC;

Troubleshooting Pinot

If you have issues creating the tables or querying the real-time data, you can start by reviewing the Apache Pinot logs:

CONTROLLER_CONTAINER=$(docker container ls --filter name=streaming-stack_pinot-controller.1 --format "{{.ID}}")
docker exec -it ${CONTROLLER_CONTAINER} cat logs/pinot-all.log

Real-time Dashboards with Apache Superset

To display the real-time results produced by our Apache Flink stream processing job and made queryable by Apache Pinot, we can use Apache Superset. Superset positions itself as “a modern data exploration and visualization platform.” Superset allows users “to explore and visualize their data, from simple line charts to highly detailed geospatial charts.”

According to the documentation, “Superset requires a Python DB-API database driver and a SQLAlchemy dialect to be installed for each datastore you want to connect to.” In the case of Apache Pinot, we can use pinotdb as the Python DB-API and SQLAlchemy dialect for Pinot. Since the existing Superset Docker container does not have pinotdb installed, I have built and published a Docker Image with the driver and deployed it as part of the second streaming stack of containers.

# Custom Superset build to add apache pinot driver
# Gary A. Stafford (2022-09-25)
# Updated: 2022-12-18
FROM apache/superset:66138b0ca0b82a94404e058f0cc55517b2240069
# Switching to root to install the required packages
USER root
# Find which driver you need based on the analytics database:
# https://superset.apache.org/docs/databases/installing-database-drivers
RUN pip install mysqlclient psycopg2-binary pinotdb
# Switching back to using the `superset` user
USER superset

First, we must configure the Superset container instance. These instructions are documented as part of the Superset Docker Image repository.

# establish an interactive session with the superset container
SUPERSET_CONTAINER=$(docker container ls --filter name=streaming-stack_superset.1 --format "{{.ID}}")
# initialize superset (see superset documentation)
docker exec -it ${SUPERSET_CONTAINER} \
    superset fab create-admin \
    --username admin \
    --firstname Superset \
    --lastname Admin \
    --email admin@superset.com \
    --password sUp3rS3cREtPa55w0rD1
docker exec -it ${SUPERSET_CONTAINER} superset db upgrade
docker exec -it ${SUPERSET_CONTAINER} superset init

Once the configuration is complete, we can log into the Superset web-browser-based UI accessible on port 8088.

Home page of the Superset web-browser-based UI

Pinot Database Connection and Dataset

Next, to connect to Pinot from Superset, we need to create a Database Connection and a Dataset.

Creating a new database connection to Pinot

The SQLAlchemy URI is shown below. Input the URI, test your connection (‘Test Connection’), make sure it succeeds, then hit ‘Connect’.

pinot+http://pinot-broker:8099/query?controller=http://pinot-controller:9000
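
To see which part of the URI points at which Pinot component, the sketch below splits it with Python's standard library. This is only an illustration of the URI's structure, not how Superset or pinotdb parse it internally; the function name and dictionary keys are hypothetical.

```python
from urllib.parse import parse_qs, urlsplit

uri = "pinot+http://pinot-broker:8099/query?controller=http://pinot-controller:9000"


def describe_pinot_uri(uri: str) -> dict:
    """Break the SQLAlchemy URI into its dialect, broker, and controller parts."""
    parts = urlsplit(uri)
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver,
        "broker_host": parts.hostname,
        "broker_port": parts.port,
        "query_path": parts.path,
        "controller": parse_qs(parts.query)["controller"][0],
    }
```

Queries go to the broker (here, `pinot-broker` on port 8099), while the `controller` parameter names the Pinot controller. Both hostnames are container names on the stack's network, so they resolve only from inside that network.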

Next, create a Dataset that references the purchasesEnriched Pinot table.

Creating a new dataset allowing us access to the purchasesEnriched Pinot table

Modify the dataset’s transaction_time column. Check the is_temporal and Default datetime options. Lastly, define the DateTime format as epoch_ms.

Modifying the dataset’s transaction_time column
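
For reference, epoch_ms means milliseconds elapsed since 1970-01-01 UTC. The sketch below shows how a timestamp in the format used by the sample messages maps to that representation. It assumes the generated timestamps are in UTC, which the post does not state, and the function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(transaction_time: str) -> int:
    """Convert 'yyyy-MM-dd HH:mm:ss.SSSSSS' text to epoch milliseconds (assumes UTC)."""
    parsed = datetime.strptime(transaction_time, "%Y-%m-%d %H:%M:%S.%f")
    # Integer division by one millisecond drops the sub-millisecond digits.
    return (parsed.replace(tzinfo=timezone.utc) - EPOCH) // timedelta(milliseconds=1)
```

Note that the conversion truncates microseconds, so two purchases within the same millisecond map to the same value.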

Building a Real-time Dashboard

Using the new dataset, which connects Superset to the purchasesEnriched Pinot table, we can construct individual charts to be placed on a dashboard. Build a few charts to include on your dashboard.

Example of a chart whose data source is the new dataset
List of charts included on the dashboard

Create a new Superset dashboard and add the charts and other elements, such as headlines, dividers, and tabs.

Apache Superset dashboard displaying data from Apache Pinot Realtime table

We can apply a refresh interval to the dashboard to continuously query Pinot and visualize the results in near real-time.

Configuring a refresh interval for the dashboard

Conclusion

In this two-part post series, we were introduced to stream processing. We explored four popular open-source stream processing projects: Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot. Next, we learned how we could solve similar stream processing and streaming analytics challenges using different streaming technologies. Lastly, we saw how these technologies, such as Kafka, Flink, Pinot, and Superset, could be integrated to create effective stream processing pipelines.


This blog represents my viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are the property of the author unless otherwise noted.

Exploring Popular Open-source Stream Processing Technologies: Part 1 of 2

A brief demonstration of Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot with Apache Superset

Introduction

According to TechTarget, “Stream processing is a data management technique that involves ingesting a continuous data stream to quickly analyze, filter, transform or enhance the data in real-time. Once processed, the data is passed off to an application, data store, or another stream processing engine.” Confluent, a fully-managed Apache Kafka market leader, defines stream processing as “a software paradigm that ingests, processes, and manages continuous streams of data while they’re still in motion.”

Batch vs. Stream Processing

Again, according to Confluent, “Batch processing is when the processing and analysis happens on a set of data that have already been stored over a period of time.” A batch processing example might include daily retail sales data, which is aggregated and tabulated nightly after the stores close. Conversely, “streaming data processing happens as the data flows through a system. This results in analysis and reporting of events as it happens.” To use a similar example, instead of nightly batch processing, the streams of sales data are processed, aggregated, and analyzed continuously throughout the day — sales volume, buying trends, inventory levels, and marketing program performance are tracked in real time.

Bounded vs. Unbounded Data

According to Packt Publishing’s book, Learning Apache Apex, “bounded data is finite; it has a beginning and an end. Unbounded data is an ever-growing, essentially infinite data set.” Batch processing is typically performed on bounded data, whereas stream processing is most often performed on unbounded data.
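The distinction can be sketched in a few lines of plain Python. This is only an illustration, not code from any of the projects in this post: the function names and sample amounts are made up. A bounded dataset is a finite list we can aggregate in one pass, while an unbounded dataset is modeled as a generator that never ends, so we can only aggregate over what has arrived so far.

```python
import itertools
import random
from typing import Iterator, List


def bounded_sales() -> List[float]:
    """A bounded dataset: finite, with a beginning and an end (made-up values)."""
    return [4.99, 5.99, 3.49, 6.99]


def unbounded_sales(seed: int = 42) -> Iterator[float]:
    """An unbounded dataset: a generator that never terminates on its own."""
    rng = random.Random(seed)
    while True:
        yield round(rng.uniform(1.0, 10.0), 2)


# Batch: the whole dataset is available, so one pass gives the final answer.
batch_total = sum(bounded_sales())

# Stream: we only ever see a prefix of the data, so we keep a running total.
first_five = list(itertools.islice(unbounded_sales(), 5))
running_total = list(itertools.accumulate(first_five))
```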

Stream Processing Technologies

There are many technologies available to perform stream processing. These include proprietary custom software, commercial off-the-shelf (COTS) software, fully-managed service offerings from Software as a Service (or SaaS) providers, Cloud Solution Providers (CSP), Commercial Open Source Software (COSS) companies, and popular open-source projects from the Apache Software Foundation and Linux Foundation.

The following two-part post and forthcoming video will explore four popular open-source software (OSS) stream processing projects, including Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot. Each of these projects has some equivalent SaaS, CSP, and COSS offerings.

This post uses the open-source projects, making it easier to follow along with the demonstration and keeping costs to a minimum. However, you could easily substitute your preferred SaaS, CSP, or COSS service offerings for the open-source projects.

Apache Spark Structured Streaming

According to the Apache Spark documentation, “Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data.” Further, “Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees.” In the post, we will examine both batch and stream processing using a series of Apache Spark Structured Streaming jobs written in PySpark.

Spark Structured Streaming job statistics as seen from the Spark UI

Apache Kafka Streams

According to the Apache Kafka documentation, “Kafka Streams [aka KStreams] is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology.” In the post, we will examine a KStreams application written in Java that performs stream processing and incremental aggregation.

Building the KStreams application’s uber JAR in JetBrains IntelliJ IDEA

Apache Flink

According to the Apache Flink documentation, “Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.” Further, “Apache Flink excels at processing unbounded and bounded data sets. Precise control of time and state enables Flink’s runtime to run any kind of application on unbounded streams. Bounded streams are internally processed by algorithms and data structures that are specifically designed for fixed-sized data sets, yielding excellent performance.” In the post, we will examine a Flink application written in Java, which performs stream processing, incremental aggregation, and multi-stream joins.

Apache Flink Dashboard showing Flink pipeline demonstrated in this post

Apache Pinot

According to Apache Pinot’s documentation, “Pinot is a real-time distributed OLAP datastore, purpose-built to provide ultra-low-latency analytics, even at extremely high throughput. It can ingest directly from streaming data sources — such as Apache Kafka and Amazon Kinesis — and make the events available for querying instantly. It can also ingest from batch data sources such as Hadoop HDFS, Amazon S3, Azure ADLS, and Google Cloud Storage.” In the post, we will query the unbounded data streams from Apache Kafka, generated by Apache Flink, using SQL.

Apache Pinot Query Console showing tables demonstrated in this post

Streaming Data Source

We must first find a good unbounded data source to explore or demonstrate these streaming technologies. Ideally, the streaming data source should be complex enough to allow multiple types of analyses and visualize different aspects with Business Intelligence (BI) and dashboarding tools. Additionally, the streaming data source should possess a degree of consistency and predictability while displaying a reasonable level of variability and randomness.

To this end, we will use the open-source Streaming Synthetic Sales Data Generator project, which I have developed and made available on GitHub. This project’s highly-configurable, Python-based, synthetic data generator generates an unbounded stream of product listings, sales transactions, and inventory restocking activities to a series of Apache Kafka topics.

Streaming Synthetic Sales Data Generator publishing messages to Apache Kafka

Source Code

All the source code demonstrated in this post is open source and available on GitHub. There are three separate GitHub projects:

# streaming data generator, Apache Spark and Apache Pinot examples
git clone --depth 1 -b main \
https://github.com/garystafford/streaming-sales-generator.git
# Apache Flink examples
git clone --depth 1 -b main \
https://github.com/garystafford/flink-kafka-demo.git
# Kafka Streams examples
git clone --depth 1 -b main \
https://github.com/garystafford/kstreams-kafka-demo.git

Docker

To make it easier to follow along with the demonstration, we will use Docker Swarm to provision the streaming tools. Alternatively, you could use Kubernetes (e.g., creating a Helm chart) or your preferred CSP or SaaS managed services. Nothing in this demonstration requires you to use a paid service.

The two Docker Swarm stacks are located in the Streaming Synthetic Sales Data Generator project:

  1. Streaming Stack — Part 1: Apache Kafka, Apache Zookeeper, Apache Spark, UI for Apache Kafka, and the KStreams application
  2. Streaming Stack — Part 2: Apache Kafka, Apache Zookeeper, Apache Flink, Apache Pinot, Apache Superset, UI for Apache Kafka, and Project Jupyter (JupyterLab).*

* the Jupyter container can be used as an alternative to the Spark container for running PySpark jobs (follow the same steps as for Spark, below)

Demonstration #1: Apache Spark

In the first of four demonstrations, we will examine two Apache Spark Structured Streaming jobs, written in PySpark, demonstrating both batch processing (spark_batch_kafka.py) and stream processing (spark_streaming_kafka.py). We will read from a single stream of data from a Kafka topic, demo.purchases, and write to the console.

High-level workflow for Apache Spark demonstration

Deploying the Streaming Stack

To get started, deploy the first streaming Docker Swarm stack containing the Apache Kafka, Apache Zookeeper, Apache Spark, UI for Apache Kafka, and the KStreams application containers.

# cd into project
cd streaming-sales-generator/
# initialize swarm stack – 1x only
docker swarm init
# optional: delete previous streaming-stack
docker stack rm streaming-stack
# deploy first streaming-stack
docker stack deploy streaming-stack --compose-file docker/spark-kstreams-stack.yml
# observe the deployment's progress
docker stack services streaming-stack

The stack will take a few minutes to deploy fully. When complete, there should be a total of six containers running in the stack.

Viewing the Docker streaming stack’s six containers

Sales Generator

Before starting the streaming data generator, confirm or modify the configuration/configuration.ini. Three configuration items, in particular, will determine how long the streaming data generator runs and how much data it produces. We will set the timing of transaction events to be generated relatively rapidly for test purposes. We will also set the number of events high enough to give us time to explore the Spark jobs. Using the below settings, the generator should run for an average of approximately 50–60 minutes: (((5 sec + 2 sec)/2)*1000 transactions)/60 sec=~58 min on average. You can run the generator again if necessary or increase the number of transactions.

[SALES]
# minimum sales frequency in seconds (debug with 1, typical min. 120)
min_sale_freq = 2
# maximum sales frequency in seconds (debug with 3, typical max. 300)
max_sale_freq = 5
# number of transactions to generate
number_of_sales = 1000
A code snippet from the project’s configuration.ini file
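The runtime estimate above can be checked with a short calculation. The sketch below is not part of the generator project. It parses a copy of the three settings with Python's standard configparser and assumes, as the formula in the text does, that the average delay between sales is the midpoint of the minimum and maximum frequency. The function name is hypothetical.

```python
import configparser

# A copy of the three settings from the configuration.ini snippet.
CONFIG_TEXT = """
[SALES]
min_sale_freq = 2
max_sale_freq = 5
number_of_sales = 1000
"""


def expected_runtime_minutes(config_text: str) -> float:
    """Estimate the generator's runtime, assuming the mean delay is the midpoint."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    sales = parser["SALES"]
    min_freq = sales.getint("min_sale_freq")
    max_freq = sales.getint("max_sale_freq")
    count = sales.getint("number_of_sales")
    mean_delay_sec = (min_freq + max_freq) / 2
    return mean_delay_sec * count / 60


print(f"{expected_runtime_minutes(CONFIG_TEXT):.1f} minutes")  # prints 58.3 minutes
```

Doubling number_of_sales to 2000 would roughly double the estimate to about 117 minutes.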

Start the streaming data generator as a background service:

# install required python packages (1x)
python3 -m pip install kafka-python
cd sales_generator/
# run in foreground
python3 ./producer.py
# better option, run as background process
nohup python3 ./producer.py &
# confirm process is running
ps -u

The streaming data generator will start writing data to three Apache Kafka topics: demo.products, demo.purchases, and demo.inventories. We can view these topics and their messages by logging into the Apache Kafka container and using the Kafka CLI:

# establish an interactive session with the kafka container
KAFKA_CONTAINER=$(docker container ls --filter name=streaming-stack_kafka.1 --format "{{.ID}}")
docker exec -it ${KAFKA_CONTAINER} bash
# set environment variables used by jobs
export BOOTSTRAP_SERVERS="localhost:9092"
export TOPIC_PRODUCTS="demo.products"
export TOPIC_PURCHASES="demo.purchases"
export TOPIC_INVENTORIES="demo.inventories"
# list topics
kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVERS
# read topics from beginning
kafka-console-consumer.sh \
--topic $TOPIC_PRODUCTS --from-beginning \
--bootstrap-server $BOOTSTRAP_SERVERS
kafka-console-consumer.sh \
--topic $TOPIC_PURCHASES --from-beginning \
--bootstrap-server $BOOTSTRAP_SERVERS
kafka-console-consumer.sh \
--topic $TOPIC_INVENTORIES --from-beginning \
--bootstrap-server $BOOTSTRAP_SERVERS

Below, we see a few sample messages from the demo.purchases topic:

Consuming messages from Kafka’s demo.purchases topic

Alternatively, you can use the UI for Apache Kafka, accessible on port 9080.

Viewing demo.purchases topic in the UI for Apache Kafka
Viewing messages in the demo.purchases topic using the UI for Apache Kafka

Prepare Spark

Next, prepare the Spark container to run the Spark jobs:

# establish an interactive session with the spark container
SPARK_CONTAINER=$(docker container ls --filter name=streaming-stack_spark.1 --format "{{.ID}}")
docker exec -it -u 0 ${SPARK_CONTAINER} bash
# update and install wget and vim
apt-get update && apt-get install wget vim -y
# install required job dependencies
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.3.1/kafka-clients-3.3.1.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.3.1/spark-sql-kafka-0-10_2.12-3.3.1.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.3.1/spark-token-provider-kafka-0-10_2.12-3.3.1.jar
mv *.jar /opt/bitnami/spark/jars/
exit
Preparing the Spark container instance as the root user

Running the Spark Jobs

Next, copy the jobs from the project to the Spark container, then exec back into the container:

# copy jobs to spark container
docker cp apache_spark_examples/ ${SPARK_CONTAINER}:/home/
# establish an interactive session with the spark container
docker exec -it ${SPARK_CONTAINER} bash

Batch Processing with Spark

The first Spark job, spark_batch_kafka.py, aggregates the number of items sold and the total sales for each product, based on existing messages consumed from the demo.purchases topic. In this first example, we read from Kafka using the SparkSession’s read property (a DataFrameReader) and write to the console using the DataFrame’s write property (a DataFrameWriter). We could just as easily write the results back to Kafka.

ds_sales = (
    df_sales.selectExpr("CAST(value AS STRING)")
    .select(F.from_json("value", schema=schema).alias("data"))
    .select("data.*")
    .withColumn("row", F.row_number().over(window))
    .withColumn("quantity", F.sum(F.col("quantity")).over(window_agg))
    .withColumn("sales", F.sum(F.col("total_purchase")).over(window_agg))
    .filter(F.col("row") == 1)
    .drop("row")
    .select(
        "product_id",
        F.format_number("sales", 2).alias("sales"),
        F.format_number("quantity", 0).alias("quantity"),
    )
    .coalesce(1)
    .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False)
    .write.format("console")
    .option("numRows", 25)
    .option("truncate", False)
    .save()
)
A snippet of batch processing Spark job’s summarize_sales() method

The batch processing job sorts the results and outputs the top 25 items by total sales to the console. The job should run to completion and exit successfully.

Batch results for top 25 items by total sales
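To make the aggregation logic concrete without a Spark cluster, here is a minimal plain-Python sketch of the same idea: sum the quantity and sales per product over a bounded set of records, then rank by total sales. It is not the project's code. The field names follow the Spark snippet, but the sample records, product IDs, and function signature are invented for illustration.

```python
from collections import defaultdict
from decimal import Decimal

# Hypothetical purchase records; only the field names come from the Spark snippet.
purchases = [
    {"product_id": "SF07", "quantity": 2, "total_purchase": Decimal("11.98")},
    {"product_id": "CS04", "quantity": 1, "total_purchase": Decimal("4.99")},
    {"product_id": "SF07", "quantity": 1, "total_purchase": Decimal("5.99")},
    {"product_id": "IS02", "quantity": 3, "total_purchase": Decimal("16.47")},
]


def summarize_sales(records, top_n=25):
    """Return (product_id, sales, quantity) tuples, highest total sales first."""
    totals = defaultdict(lambda: {"quantity": 0, "sales": Decimal("0")})
    for record in records:
        total = totals[record["product_id"]]
        total["quantity"] += record["quantity"]
        total["sales"] += record["total_purchase"]
    ranked = sorted(totals.items(), key=lambda item: item[1]["sales"], reverse=True)
    return [(pid, t["sales"], t["quantity"]) for pid, t in ranked[:top_n]]


for product_id, sales, quantity in summarize_sales(purchases):
    print(product_id, sales, quantity)
```

Because the input is bounded, the function can see every record before ranking, which is exactly what makes this a batch computation.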

To run the batch Spark job, use the following commands:

# set environment variables used by jobs
export BOOTSTRAP_SERVERS="kafka:29092"
export TOPIC_PURCHASES="demo.purchases"
cd /home/apache_spark_examples/
# run batch processing job
spark-submit spark_batch_kafka.py
Run the batch Spark job

Stream Processing with Spark

The stream processing Spark job, spark_streaming_kafka.py, also aggregates the number of items sold and the total sales for each item, based on messages consumed from the demo.purchases topic. However, as shown in the code snippet below, this job continuously aggregates the stream of data from Kafka, displaying the top ten product totals within an arbitrary ten-minute sliding window, with a five-minute overlap, and updates the console output every minute. We use the SparkSession’s readStream property and the DataFrame’s writeStream property, as opposed to the batch-oriented read and write properties in the first example.

ds_sales = (
    df_sales.selectExpr("CAST(value AS STRING)")
    .select(F.from_json("value", schema=schema).alias("data"))
    .select("data.*")
    .withWatermark("transaction_time", "10 minutes")
    .groupBy("product_id", F.window("transaction_time", "10 minutes", "5 minutes"))
    .agg(F.sum("total_purchase"), F.sum("quantity"))
    .orderBy(F.col("window").desc(), F.col("sum(total_purchase)").desc())
    .select(
        "product_id",
        F.format_number("sum(total_purchase)", 2).alias("sales"),
        F.format_number("sum(quantity)", 0).alias("drinks"),
        "window.start",
        "window.end",
    )
    .coalesce(1)
    .writeStream.queryName("streaming_to_console")
    .trigger(processingTime="1 minute")
    .outputMode("complete")
    .format("console")
    .option("numRows", 10)
    .option("truncate", False)
    .start()
)
ds_sales.awaitTermination()
A snippet of stream processing Spark job’s summarize_sales() method

Shorter event-time windows are easier for demonstrations; in production, hourly, daily, weekly, or monthly windows are more typical for sales analysis.

Micro-batch representing real-time totals for the current ten-minute window
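With a ten-minute window that slides every five minutes, each event falls into two overlapping windows. The sketch below works through that assignment in plain Python. It is an illustration of the concept rather than Spark's implementation, and it assumes window boundaries are aligned to the Unix epoch with no start offset. The function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(event_time, window=timedelta(minutes=10), slide=timedelta(minutes=5)):
    """Return every [start, end) window containing event_time, earliest first."""
    # Latest window start at or before the event, aligned to the slide interval.
    start = event_time - (event_time - EPOCH) % slide
    windows = []
    # Walk backwards one slide at a time while the window still covers the event.
    while start > event_time - window:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


event = datetime(2023, 1, 1, 12, 7, tzinfo=timezone.utc)
for start, end in sliding_windows(event):
    print(start.time(), "to", end.time())
```

For the 12:07 event, the two windows are 12:00 to 12:10 and 12:05 to 12:15, so the same sale contributes to both totals.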

To run the stream processing Spark job, use the following commands:

# run stream processing job
spark-submit spark_streaming_kafka.py
Run the stream processing Spark job

We could just as easily calculate running totals for the stream of sales data versus aggregations over a sliding event-time window (example job included in project).

Micro-batch representing running totals for data stream as opposed to using event-time windows

Be sure to kill the stream processing Spark jobs when you are done, or they will continue to run, awaiting more data.

Demonstration #2: Apache Kafka Streams

Next, we will examine Apache Kafka Streams (aka KStreams). For this part of the post, we will also use the kstreams-kafka-demo project, one of the three GitHub repositories listed earlier. The project contains a KStreams application written in Java that performs stream processing and incremental aggregation.

High-level workflow for KStreams demonstration

KStreams Application

The KStreams application continuously consumes the stream of messages from the demo.purchases Kafka topic (source) using an instance of the StreamsBuilder class. It then aggregates the number of items sold and the total sales for each item, maintaining running totals, which are then streamed to a new demo.running.totals topic (sink). All of this runs within an instance of the KafkaStreams client class.

private static void kStreamPipeline(Properties props) {
    System.out.println("Starting…");
    Properties kafkaStreamsProps = new Properties();
    kafkaStreamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, props.getProperty("APPLICATION_ID"));
    kafkaStreamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("BOOTSTRAP_SERVERS"));
    kafkaStreamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, props.getProperty("AUTO_OFFSET_RESET_CONFIG"));
    kafkaStreamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, props.getProperty("COMMIT_INTERVAL_MS_CONFIG"));
    StreamsBuilder builder = new StreamsBuilder();
    builder
            .stream(props.getProperty("INPUT_TOPIC"), Consumed.with(Serdes.Void(), CustomSerdes.Purchase()))
            .peek((unused, purchase) -> System.out.println(purchase.toString()))
            .flatMap((KeyValueMapper<Void, Purchase, Iterable<KeyValue<String, Total>>>) (unused, purchase) -> {
                List<KeyValue<String, Total>> result = new ArrayList<>();
                result.add(new KeyValue<>(purchase.getProductId(), new Total(
                        purchase.getTransactionTime(),
                        purchase.getProductId(),
                        1,
                        purchase.getQuantity(),
                        purchase.getTotalPurchase()
                )));
                return result;
            })
            .groupByKey(Grouped.with(Serdes.String(), CustomSerdes.Total()))
            .reduce((total1, total2) -> {
                total2.setTransactions(total1.getTransactions() + total2.getTransactions());
                total2.setQuantities(total1.getQuantities() + total2.getQuantities());
                total2.setSales(total1.getSales().add(total2.getSales()));
                return total2;
            })
            .toStream()
            .peek((productId, total) -> System.out.println(total.toString()))
            .to(props.getProperty("OUTPUT_TOPIC"), Produced.with(Serdes.String(), CustomSerdes.Total()));
    KafkaStreams streams = new KafkaStreams(builder.build(), kafkaStreamsProps);
    streams.start();
    System.out.println("Running…");
}
A snippet of KStreams application’s kStreamPipeline() method
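The key-then-reduce pattern in the pipeline above can be sketched in plain Python, the language of the post's other examples. This is a conceptual illustration only, not a port of the Java application. The Total dataclass mirrors the fields used in the snippet, while the sample purchases and function names are invented. Each incoming purchase becomes a Total with a transaction count of one, which is then merged with the running Total held for that product.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Dict, Iterable, Iterator, Tuple


@dataclass
class Total:
    product_id: str
    transactions: int
    quantities: int
    sales: Decimal


def reduce_totals(previous: Total, latest: Total) -> Total:
    """Merge the stored running total with the newest single-purchase total."""
    return Total(
        product_id=latest.product_id,
        transactions=previous.transactions + latest.transactions,
        quantities=previous.quantities + latest.quantities,
        sales=previous.sales + latest.sales,
    )


def running_totals(purchases: Iterable[Tuple[str, int, Decimal]]) -> Iterator[Total]:
    """Yield the updated running total after every purchase, keyed by product."""
    state: Dict[str, Total] = {}
    for product_id, quantity, total_purchase in purchases:
        latest = Total(product_id, 1, quantity, total_purchase)
        previous = state.get(product_id)
        state[product_id] = latest if previous is None else reduce_totals(previous, latest)
        yield state[product_id]


# Hypothetical purchases: (product_id, quantity, total_purchase)
sample = [("SF07", 2, Decimal("11.98")), ("CS04", 1, Decimal("4.99")), ("SF07", 1, Decimal("5.99"))]
for total in running_totals(sample):
    print(total)
```

Unlike this sketch, which emits one update per input record, a real Kafka Streams application may coalesce updates depending on its caching and commit interval settings.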

Running the Application

We have at least three choices for running the KStreams application in this demonstration: 1) run it locally from our IDE, 2) run a compiled JAR locally from the command line, or 3) copy a compiled JAR into a Docker image, which is deployed as part of the Swarm stack. You can choose any of the options.

# set java version (v17 is latest compatible version with kstreams)
JAVA_HOME=~/Library/Java/JavaVirtualMachines/corretto-17.0.5/Contents/Home
$JAVA_HOME/bin/java -version
# compile to uber jar
./gradlew clean shadowJar
# run the streaming application
$JAVA_HOME/bin/java -jar build/libs/kstreams-kafka-demo-1.0.0-all.jar

Compiling and running the KStreams application locally

We will continue to use the same streaming Docker Swarm stack used for the Apache Spark demonstration. I have already compiled a single uber JAR file using OpenJDK 17 and Gradle from the project’s source code. I then created and published a Docker image, which is already part of the running stack.

FROM amazoncorretto:17.0.5
COPY build/libs/kstreams-kafka-demo-1.1.0-all.jar /tmp/kstreams-app.jar
CMD ["java", "-jar", "/tmp/kstreams-app.jar"]
Dockerfile used to build KStreams app Docker image

Since we ran the sales generator earlier for the Spark demonstration, there is existing data in the demo.purchases topic. Re-run the sales generator (nohup python3 ./producer.py &) to generate a new stream of data. Then, using the Kafka CLI or UI for Apache Kafka, view the results of the KStreams application, which has been running since the stack was deployed:

# terminal 1: follow the logs of the kstreams app container
KSTREAMS_CONTAINER=$(docker container ls --filter name=streaming-stack_kstreams.1 --format "{{.ID}}")
docker logs ${KSTREAMS_CONTAINER} --follow
# terminal 2: establish an interactive session with the kafka container
KAFKA_CONTAINER=$(docker container ls --filter name=streaming-stack_kafka.1 --format "{{.ID}}")
docker exec -it ${KAFKA_CONTAINER} bash
# set environment variables used by jobs
export BOOTSTRAP_SERVERS="localhost:9092"
export INPUT_TOPIC="demo.purchases"
export OUTPUT_TOPIC="demo.running.totals"
# read topics from beginning
kafka-console-consumer.sh \
--topic $INPUT_TOPIC --from-beginning \
--bootstrap-server $BOOTSTRAP_SERVERS
kafka-console-consumer.sh \
--topic $OUTPUT_TOPIC --from-beginning \
--bootstrap-server $BOOTSTRAP_SERVERS

Below, in the top terminal window, we see the output from the KStreams application. Using KStream’s peek() method, the application outputs Purchase and Total instances to the console as they are processed and written to Kafka. In the lower terminal window, we see new messages being published as a continuous stream to the output topic, demo.running.totals.

KStreams application performing stream processing and the resulting output stream

Part Two

In part two of this two-part post, we continue our exploration of the four popular open-source stream processing projects. We will cover Apache Flink and Apache Pinot. In addition, we will incorporate Apache Superset into the demonstration, building a real-time dashboard to visualize the results of our stream processing.

Apache Superset dashboard displaying data from Apache Pinot Realtime table



Serverless Analytics on AWS: Getting Started with Amazon EMR Serverless and Amazon MSK Serverless

Utilizing the recently released Amazon EMR Serverless and Amazon MSK Serverless for batch and streaming analytics with Apache Spark and Apache Kafka

Introduction

Amazon EMR Serverless

AWS recently announced the general availability (GA) of Amazon EMR Serverless on June 1, 2022. EMR Serverless is a new serverless deployment option in Amazon EMR, in addition to EMR on EC2, EMR on EKS, and EMR on AWS Outposts. EMR Serverless provides a serverless runtime environment that simplifies the operation of analytics applications that use the latest open source frameworks, such as Apache Spark and Apache Hive. According to AWS, with EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications with these frameworks.

Amazon MSK Serverless

Similarly, on April 28, 2022, AWS announced the general availability of Amazon MSK Serverless. According to AWS, Amazon MSK Serverless is a cluster type for Amazon MSK that makes it easy to run Apache Kafka without managing and scaling cluster capacity. MSK Serverless automatically provisions and scales compute and storage resources, so you can use Apache Kafka on demand and only pay for the data you stream and retain.

Serverless Analytics

In the following post, we will learn how to use these two new, powerful, cost-effective, and easy-to-operate serverless technologies to perform batch and streaming analytics. The PySpark examples used in this post are similar to those featured in two earlier posts, which featured non-serverless alternatives Amazon EMR on EC2 and Amazon MSK: Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR and Stream Processing with Apache Spark, Kafka, Avro, and Apicurio Registry on AWS using Amazon MSK and EMR.

Source Code

All the source code demonstrated in this post is open-source and available on GitHub.

git clone --depth 1 -b main \
https://github.com/garystafford/emr-msk-serverless-demo.git

Architecture

The post’s high-level architecture consists of an Amazon EMR Serverless Application, Amazon MSK Serverless Cluster, and Amazon EC2 Kafka client instance. To support these three resources, we will need two Amazon Virtual Private Clouds (VPCs), a minimum of three subnets, an AWS Internet Gateway (IGW) or equivalent, an Amazon S3 Bucket, multiple AWS Identity and Access Management (IAM) Roles and Policies, Security Groups, and Route Tables, and a VPC Gateway Endpoint for S3. All resources are constrained to a single AWS account and a single AWS Region, us-east-1.

High-level AWS serverless analytics architecture used in this post

Prerequisites

As a prerequisite for this post, you will need to create the following resources:

  1. (1) Amazon EMR Serverless Application;
  2. (1) Amazon MSK Serverless Cluster;
  3. (1) Amazon S3 Bucket;
  4. (1) VPC Endpoint for S3;
  5. (3) Apache Kafka topics;
  6. PySpark applications, related JAR dependencies, and sample data files uploaded to Amazon S3 Bucket;

Let’s walk through each of these prerequisites.

Amazon EMR Serverless Application

Before continuing, I suggest familiarizing yourself with the AWS documentation for Amazon EMR Serverless, especially, What is Amazon EMR Serverless? Create a new EMR Serverless Application by following the AWS documentation, Getting started with Amazon EMR Serverless. The creation of the EMR Serverless Application includes the following resources:

  1. Amazon S3 bucket for storage of Spark resources;
  2. Amazon VPC with at least two private subnets and associated Security Group(s);
  3. EMR Serverless runtime AWS IAM Role and associated IAM Policy;
  4. Amazon EMR Serverless Application;

For this post, use the latest version of EMR available in the EMR Studio Serverless Application console, the newly released version 6.7.0, to create a Spark application.

EMR Studio Serverless Application creation console

Keep the default pre-initialized capacity, application limits, and application behavior settings.

EMR Studio Serverless Application creation console

Since we are connecting to MSK Serverless from EMR Serverless, we need to configure VPC access. Select the new VPC and at least two private subnets in different Availability Zones (AZs).

EMR Studio Serverless Application creation console

According to the documentation, the subnets selected for EMR Serverless must be private subnets. The associated route tables for the subnets should not contain direct routes to the Internet.

Error resulting from trying to associate a public subnet with EMR Serverless
EMR Studio Serverless Application details console showing new Application

Amazon MSK Serverless Cluster

Similarly, before continuing, I suggest familiarizing yourself with the AWS documentation for Amazon MSK Serverless, especially MSK Serverless. Create a new MSK Serverless Cluster by following the AWS documentation, Getting started using MSK Serverless clusters. The creation of the MSK Serverless Cluster includes the following resources:

  1. AWS IAM Role and associated IAM Policy for the Amazon EC2 Kafka client instance;
  2. VPC with at least one public subnet and associated Security Group(s);
  3. Amazon EC2 instance used as Apache Kafka client, provisioned in the public subnet of the above VPC;
  4. Amazon MSK Serverless Cluster;
Amazon MSK Serverless Create cluster console

Associate the new MSK Serverless Cluster with the EMR Serverless Application’s VPC and two private subnets. Also, associate the cluster with the EC2-based Kafka client instance’s VPC and its public subnet.

Amazon MSK Serverless Create cluster console — VPC 1
Amazon MSK Serverless Create cluster console — VPC 2

According to the AWS documentation, Amazon MSK does not support all AZs. For example, trying to use a subnet in us-east-1e threw an error. If this happens, choose an alternative AZ.

Error resulting from using an unsupported AZ
Successfully created Amazon MSK Serverless Cluster

VPC Endpoint for S3

To access the Spark resources in Amazon S3 from EMR Serverless running in the two private subnets, we need a VPC Endpoint for S3, specifically a Gateway Endpoint, which sends traffic to Amazon S3 or DynamoDB using private IP addresses. A gateway endpoint for Amazon S3 enables you to use private IP addresses to access Amazon S3 without exposure to the public Internet. EMR Serverless does not require public IP addresses, and you don’t need an internet gateway (IGW), a NAT device, or a virtual private gateway in your VPC to connect to S3.

VPC Endpoint for S3 associated with route table for private subnets

Create the VPC Endpoint for S3 (Gateway Endpoint) and add the route table for the two EMR Serverless private subnets. You can add additional routes to that route table, such as VPC peering connections to data sources such as Amazon Redshift or Amazon RDS. However, do not add routes that provide direct Internet access.

Route table for private subnets showing VPC Endpoint to S3 route (first route shown)

Kafka Topics and Sample Messages

Once the MSK Serverless Cluster and EC2-based Kafka client instance are provisioned and running, create the three required Kafka topics using the EC2-based Kafka client instance. I recommend using AWS Systems Manager Session Manager to connect to the client instance as the ec2-user user. Session Manager provides secure and auditable node management without the need to open inbound ports, maintain bastion hosts, or manage SSH keys. Alternatively, you can SSH into the client instance.

Before creating the topics, use a utility like telnet to confirm connectivity between the Kafka client and the MSK Serverless Cluster. Verifying connectivity will save you a lot of frustration with potential security and networking issues.

sudo yum install telnet -y
telnet <your_bootstrap_server_host> 9098
# > Trying 192.168.XX.XX…
# > Connected to boot-12ab34cd.c2.kafka-serverless.us-east-1.amazonaws.com.
# > Escape character is '^]'.
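If telnet is not available on the client instance, a TCP connectivity check like the one above can be sketched with Python's standard socket module. This is an alternative I am suggesting, not part of the AWS tutorial or the project, and the function name is hypothetical. It only confirms that a TCP connection can be opened, so it says nothing about IAM authentication or TLS.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False
```

For example, calling can_connect("<your_bootstrap_server_host>", 9098) from the Kafka client instance should return True once the security groups and routing are configured correctly.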

With MSK Serverless Cluster connectivity confirmed, create the three Kafka topics: topicA, topicB, and topicC. I am using the default partitioning and replication settings from the AWS Getting Started Tutorial.

cd kafka_2.12-2.8.1
# *** CHANGE ME ***
export BOOTSTRAP_SERVER=<your_bootstrap_server> # e.g., boot-12ab34cd.c2.kafka-serverless.us-east-1.amazonaws.com:9098
bin/kafka-topics.sh --create --topic topicA \
--partitions 6 \
--bootstrap-server $BOOTSTRAP_SERVER \
--command-config config/client.properties
bin/kafka-topics.sh --create --topic topicB \
--partitions 6 \
--bootstrap-server $BOOTSTRAP_SERVER \
--command-config config/client.properties
bin/kafka-topics.sh --create --topic topicC \
--partitions 6 \
--bootstrap-server $BOOTSTRAP_SERVER \
--command-config config/client.properties
# list topics to confirm creation
bin/kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP_SERVER \
--command-config config/client.properties

To create some quick sample data, we will copy and paste 250 messages from a file included in the GitHub project, sample_data/sales_messages.txt, into topicA. The messages are simple mock sales transactions.

{"payment_id":16940,"customer_id":130,"amount":5.99,"payment_date":"2021-05-08 21:21:56.996577 +00:00","city":"guas Lindas de Gois","district":"Gois","country":"Brazil"}
{"payment_id":16406,"customer_id":459,"amount":5.99,"payment_date":"2021-05-08 21:22:59.996577 +00:00","city":"Qomsheh","district":"Esfahan","country":"Iran"}
{"payment_id":16315,"customer_id":408,"amount":6.99,"payment_date":"2021-05-08 21:32:05.996577 +00:00","city":"Jaffna","district":"Northern","country":"Sri Lanka"}
{"payment_id":16185,"customer_id":333,"amount":7.99,"payment_date":"2021-05-08 21:33:07.996577 +00:00","city":"Baku","district":"Baki","country":"Azerbaijan"}
{"payment_id":17097,"customer_id":222,"amount":9.99,"payment_date":"2021-05-08 21:33:47.996577 +00:00","city":"Jaroslavl","district":"Jaroslavl","country":"Russian Federation"}
{"payment_id":16579,"customer_id":549,"amount":3.99,"payment_date":"2021-05-08 21:36:33.996577 +00:00","city":"Santiago de Compostela","district":"Galicia","country":"Spain"}
{"payment_id":16050,"customer_id":269,"amount":4.99,"payment_date":"2021-05-08 21:40:19.996577 +00:00","city":"Salinas","district":"California","country":"United States"}
{"payment_id":17126,"customer_id":239,"amount":7.99,"payment_date":"2021-05-08 22:00:12.996577 +00:00","city":"Ciomas","district":"West Java","country":"Indonesia"}
{"payment_id":16933,"customer_id":126,"amount":7.99,"payment_date":"2021-05-08 22:29:06.996577 +00:00","city":"Po","district":"So Paulo","country":"Brazil"}
{"payment_id":16297,"customer_id":399,"amount":8.99,"payment_date":"2021-05-08 22:30:47.996577 +00:00","city":"Okara","district":"Punjab","country":"Pakistan"}
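Before pasting messages into the console producer, it can be worth confirming that each line is valid JSON and carries the fields the Spark jobs expect. The sketch below is one way to do that; the function name and the REQUIRED_FIELDS set are my own, chosen to match the non-nullable fields in the schema used by the PySpark examples later in this post.

```python
import json

# Fields declared as non-nullable in the PySpark schema used by the examples
REQUIRED_FIELDS = {"payment_id", "customer_id", "amount", "payment_date", "country"}


def parse_sales_message(line: str) -> dict:
    """Parse one JSON message and fail loudly if a required field is missing."""
    record = json.loads(line)
    missing = REQUIRED_FIELDS - record.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    return record


# First sample message from the listing above
sample = (
    '{"payment_id":16940,"customer_id":130,"amount":5.99,'
    '"payment_date":"2021-05-08 21:21:56.996577 +00:00",'
    '"city":"guas Lindas de Gois","district":"Gois","country":"Brazil"}'
)

record = parse_sales_message(sample)
print(record["country"], record["amount"])  # prints: Brazil 5.99
```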

Use the kafka-console-producer shell script to publish the messages to the Kafka topic. Then use the kafka-console-consumer shell script to confirm that the messages reached the topic by consuming a few of them.

bin/kafka-console-producer.sh \
    --topic topicA \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --producer.config config/client.properties

# copy and paste contents of 'sales_messages.txt' and then Ctrl+C to exit

# check for messages in topic
bin/kafka-console-consumer.sh \
    --topic topicA \
    --from-beginning --max-messages 5 \
    --property print.value=true \
    --property print.offset=true \
    --property print.partition=true \
    --property print.timestamp=true \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --consumer.config config/client.properties

The output should look similar to the following example.

Sample message output from Kafka topic

Spark Resources in Amazon S3

To submit and run the five Spark Jobs included in the project, you will need to copy the following resources to your Amazon S3 bucket: (5) Apache Spark jobs, (5) related JAR dependencies, and (2) sample data files.

PySpark Applications
To start, copy the five PySpark applications to a scripts/ subdirectory within your Amazon S3 bucket.

PySpark applications uploaded to the Amazon S3 bucket

Sample Data
Next, copy the two sample data files to a sample_data/ subdirectory within your Amazon S3 bucket. The large file contains 2,000 messages, while the small file contains 600 messages. These two files can be used interchangeably with the post’s final streaming example.

Sample sales data uploaded to the Amazon S3 bucket

PySpark Dependencies
Lastly, the PySpark applications have a handful of JAR dependencies that must be available when the job runs but are not on the EMR Serverless classpath by default. If you are unsure which JARs are already on the EMR Serverless classpath, check the Classpath Entries section of the Spark UI’s Environment tab. Accessing the Spark UI is demonstrated in the first PySpark application example, below.

Spark UI’s Environment tab showing Classpath Entries

It is critical to choose the correct version of each JAR dependency based on the versions of the libraries used with EMR and MSK. Using the wrong version or inconsistent versions, especially of Scala, can result in job failures. Specifically, we are targeting Spark 3.2.1 and Scala 2.12 (EMR v6.7.0: Amazon’s Spark 3.2.1, Scala 2.12.15, Amazon Corretto 8 version of OpenJDK), and Apache Kafka 2.8.1 (MSK Serverless: Kafka 2.8.1).
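Scala-based artifacts published to Maven conventionally carry the Scala binary version as a suffix in the file name, for example spark-sql-kafka-0-10_2.12-3.2.1.jar, while pure Java libraries such as kafka-clients-2.8.1.jar have no such suffix. The sketch below uses that convention to flag JARs built for a different Scala version than the one you are targeting. The helper names and the file names in the usage example are illustrative only, not the list of JARs used in this post.

```python
import re
from typing import List, Optional

# Matches the "_<scala binary version>-<artifact version>" part of a JAR file name
_SCALA_SUFFIX = re.compile(r"_(\d+\.\d+)-\d")


def scala_binary_version(jar_name: str) -> Optional[str]:
    """Return the Scala binary version embedded in a JAR file name, or None."""
    match = _SCALA_SUFFIX.search(jar_name)
    return match.group(1) if match else None


def mismatched_jars(jar_names: List[str], expected: str = "2.12") -> List[str]:
    """Return the JARs whose Scala suffix differs from the expected version."""
    return [
        name for name in jar_names
        if scala_binary_version(name) not in (None, expected)
    ]


# Illustrative file names only
jars = [
    "spark-sql-kafka-0-10_2.12-3.2.1.jar",
    "spark-sql-kafka-0-10_2.13-3.2.1.jar",
    "kafka-clients-2.8.1.jar",
]
print(mismatched_jars(jars))  # prints: ['spark-sql-kafka-0-10_2.13-3.2.1.jar']
```

A check like this only inspects file names, so it will not catch a JAR that was renamed or one whose artifact version is incompatible with Spark 3.2.1.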

Download the seven JAR files locally, then copy them to a jars/ subdirectory within your Amazon S3 bucket.

Dependency JARs uploaded to the Amazon S3 bucket

PySpark Applications Examples

With the EMR Serverless Application, MSK Serverless Cluster, Kafka topics, and sample data created, and the Spark resources uploaded to Amazon S3, we are ready to explore four different Spark examples.

Example 1: Kafka Batch Aggregation to the Console

The first PySpark application, 01_example_console.py, reads the same 250 sample sales messages from topicA you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to the console (stdout).

There are no hard-coded values in any of the PySpark application examples. All required environment-specific variables, such as your MSK Serverless bootstrap server (host and port) and Amazon S3 bucket name, will be passed to the running Spark jobs as arguments from the spark-submit command.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
#          Reads messages from Kafka topicA and writes aggregated messages to the console (stdout)
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers argument

import argparse

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType
from pyspark.sql.window import Window


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("01-example-console") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    df_sales = read_from_kafka(spark, args)

    summarize_sales(df_sales)


def read_from_kafka(spark, args):
    # Options prefixed with "kafka." are passed through to the Kafka consumer
    options_read = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "subscribe":
            args.read_topic,
        "startingOffsets":
            "earliest",
        "endingOffsets":
            "latest",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark \
        .read \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    # "window" numbers the rows within each country; "window_agg" aggregates per country
    window = Window.partitionBy("country").orderBy("amount")
    window_agg = Window.partitionBy("country")

    df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .withColumn("row", F.row_number().over(window)) \
        .withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
        .withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
        .filter(F.col("row") == 1).drop("row") \
        .select("country",
                F.format_number("sales", 2).alias("sales"),
                F.format_number("orders", 0).alias("orders")) \
        .coalesce(1) \
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False) \
        .write \
        .format("console") \
        .option("numRows", 25) \
        .option("truncate", False) \
        .save()


def parse_args():
    """Parse argument values from command-line"""

    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--read_topic", default="topicA", required=False, help="Kafka topic to read from")

    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()
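To make the aggregation easier to reason about, here is a plain-Python sketch of what the job computes: total sales and number of orders per country, sorted by sales in descending order. It is not the Spark implementation. It uses a dictionary and Decimal arithmetic rather than window functions and FloatType, and the three input messages are trimmed to just the country and amount fields for brevity.

```python
import json
from collections import defaultdict
from decimal import Decimal


def summarize_sales(lines):
    """Return (country, sales, orders) tuples sorted by sales, highest first."""
    totals = defaultdict(lambda: {"sales": Decimal("0"), "orders": 0})
    for line in lines:
        record = json.loads(line)
        entry = totals[record["country"]]
        # Convert through str so 5.99 becomes exactly Decimal("5.99")
        entry["sales"] += Decimal(str(record["amount"]))
        entry["orders"] += 1
    rows = [(country, v["sales"], v["orders"]) for country, v in totals.items()]
    return sorted(rows, key=lambda row: row[1], reverse=True)


# Trimmed-down messages (only the fields the aggregation needs)
sample_messages = [
    '{"amount": 5.99, "country": "Brazil"}',
    '{"amount": 5.99, "country": "Iran"}',
    '{"amount": 7.99, "country": "Brazil"}',
]

for country, sales, orders in summarize_sales(sample_messages):
    print(f"{country}|{sales:.2f}|{orders}")
# prints:
# Brazil|13.98|2
# Iran|5.99|1
```

In the Spark job, the same per-country totals are attached to every row with window functions, and the row_number() filter then keeps a single row per country.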

To submit your first PySpark job to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. You will need (4) values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

aws emr-serverless start-job-run \
    --application-id <your_application_id> \
    --execution-role-arn <your_execution_role_arn> \
    --name 01-example-console \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://<your_s3_bucket>/scripts/01_example_console.py",
            "entryPointArguments": [
                "--bootstrap_servers=<your_bootstrap_server>"
            ],
            "sparkSubmitParameters": "--conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
        }
    }'
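Hand-editing the JSON passed to --job-driver is an easy place to introduce quoting mistakes. One option is to generate it, as in the sketch below. The build_job_driver function is a hypothetical helper of my own, and it assumes the scripts/ and jars/ layout described earlier in this post; the placeholder values are the same ones used in the command above.

```python
import json
from typing import List


def build_job_driver(s3_bucket: str, script: str, arguments: List[str]) -> dict:
    """Build the sparkSubmit job driver structure for a script stored in S3."""
    return {
        "sparkSubmit": {
            "entryPoint": f"s3://{s3_bucket}/scripts/{script}",
            "entryPointArguments": list(arguments),
            "sparkSubmitParameters": f"--conf spark.jars=s3://{s3_bucket}/jars/*.jar",
        }
    }


job_driver = build_job_driver(
    s3_bucket="<your_s3_bucket>",  # placeholder
    script="01_example_console.py",
    arguments=["--bootstrap_servers=<your_bootstrap_server>"],  # placeholder
)

# The printed JSON can be pasted as the value of --job-driver
print(json.dumps(job_driver, indent=2))
```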

Switching to the EMR Serverless Application console, you should see the new Spark job you just submitted in one of several job states.

EMR Studio Serverless Application details console

You can click on the Spark job to get more details. Note the Script arguments and Spark properties passed in from the spark-submit command.

EMR Studio Serverless Application details Job details view

From the Spark job details tab, access the Spark UI, aka Spark Web UI, from a button in the upper right corner of the screen. If you have experience with Spark, you are most likely familiar with the Spark Web UI to monitor and tune Spark jobs.

Spark History Server UI

From the initial screen, the Spark History Server tab, click on the App ID. You can access an enormous amount of Spark-related information about your job and EMR environment from the Spark Web UI.

Spark UI’s Stages tab
Spark UI’s Stages tab showing a Directed acyclic graph (DAG) Visualization
Spark UI’s Environment tab showing environment variables, including versions of Spark, Java, and Scala

The Executors tab will give you access to the Spark job’s output. The output we are most interested in is the driver executor’s stderr and stdout (first row of the second table, shown below).

Spark UI’s Executors tab

The stderr contains output related to the running Spark job. Below we see an example of Kafka consumer configuration values output to stderr. Several of these values were passed in from the Spark job, including items such as kafka.bootstrap.servers, security.protocol, sasl.mechanism, and sasl.jaas.config.

driver executor’s stderr output to the console
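The reason most of these values appear without the kafka. prefix is that Spark's Kafka source strips the prefix from options before handing them to the underlying Kafka consumer, while options without the prefix (such as subscribe) are interpreted by Spark itself. The sketch below mimics that split in plain Python. It is an illustration of the behavior, not Spark's actual code, and the function name is my own.

```python
# A subset of the read options used in the PySpark example (placeholder server)
options_read = {
    "kafka.bootstrap.servers": "<your_bootstrap_server>",
    "subscribe": "topicA",
    "startingOffsets": "earliest",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "AWS_MSK_IAM",
}


def kafka_client_config(options: dict) -> dict:
    """Keep only 'kafka.'-prefixed options and drop the prefix."""
    prefix = "kafka."
    return {
        key[len(prefix):]: value
        for key, value in options.items()
        if key.startswith(prefix)
    }


for key, value in kafka_client_config(options_read).items():
    print(f"{key} = {value}")
# prints:
# bootstrap.servers = <your_bootstrap_server>
# security.protocol = SASL_SSL
# sasl.mechanism = AWS_MSK_IAM
```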

The stdout from the driver executor contains the console output as directed from the Spark job. Below we see the successfully aggregated results of the first Spark job, output to stdout.

+------------------+------+------+
|country           |sales |orders|
+------------------+------+------+
|India             |138.80|20    |
|China             |133.80|20    |
|Mexico            |106.86|14    |
|Japan             |100.86|14    |
|Brazil            |96.87 |13    |
|Russian Federation|94.87 |13    |
|United States     |92.86 |14    |
|Nigeria           |58.93 |7     |
|Philippines       |58.92 |8     |
|South Africa      |46.94 |6     |
|Argentina         |42.93 |7     |
|Germany           |39.96 |4     |
|Indonesia         |38.95 |5     |
|Italy             |35.95 |5     |
|Iran              |33.95 |5     |
|South Korea       |33.94 |6     |
|Poland            |30.97 |3     |
|Pakistan          |25.97 |3     |
|Taiwan            |25.96 |4     |
|Mozambique        |23.97 |3     |
|Ukraine           |23.96 |4     |
|Vietnam           |23.96 |4     |
|Venezuela         |22.97 |3     |
|France            |20.98 |2     |
|Peru              |19.98 |2     |
+------------------+------+------+
only showing top 25 rows

Example 2: Kafka Batch Aggregation to CSV in S3

Although the console is useful for development and debugging, it is typically not used in production. Instead, Spark typically sends results to S3 as CSV, JSON, Parquet, or Avro formatted files, to Kafka, to a database, or to an API endpoint. The second PySpark application, 02_example_csv_s3.py, reads the same 250 sample sales messages from topicA you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to a CSV file in Amazon S3.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
#          Reads messages from Kafka topicA and writes aggregated messages to CSV file in Amazon S3
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers and --s3_bucket arguments

import argparse

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType
from pyspark.sql.window import Window


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("02-example-csv-s3") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    df_sales = read_from_kafka(spark, args)

    summarize_sales(df_sales, args)


def read_from_kafka(spark, args):
    # Options prefixed with "kafka." are passed through to the Kafka consumer
    options_read = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "subscribe":
            args.read_topic,
        "startingOffsets":
            "earliest",
        "endingOffsets":
            "latest",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark \
        .read \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales, args):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    # "window" numbers the rows within each country; "window_agg" aggregates per country
    window = Window.partitionBy("country").orderBy("amount")
    window_agg = Window.partitionBy("country")

    df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .withColumn("row", F.row_number().over(window)) \
        .withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
        .withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
        .filter(F.col("row") == 1).drop("row") \
        .select("country",
                F.format_number("sales", 2).alias("sales"),
                F.format_number("orders", 0).alias("orders")) \
        .coalesce(1) \
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False) \
        .write \
        .csv(path=f"s3a://{args.s3_bucket}/output/", header=True, sep="|")


def parse_args():
    """Parse argument values from command-line"""

    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--s3_bucket", required=True, help="Amazon S3 bucket")
    parser.add_argument("--read_topic", default="topicA", required=False, help="Kafka topic to read from")

    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()

To submit your second PySpark job to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. Similar to the first example, you will need (4) values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

aws emr-serverless start-job-run \
    --application-id <your_application_id> \
    --execution-role-arn <your_execution_role_arn> \
    --name 02-example-csv-s3 \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://<your_s3_bucket>/scripts/02_example_csv_s3.py",
            "entryPointArguments": [
                "--bootstrap_servers=<your_bootstrap_server>",
                "--s3_bucket=<your_s3_bucket>"
            ],
            "sparkSubmitParameters": "--conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
        }
    }'

If successful, the Spark job should create a single CSV file under the designated Amazon S3 key prefix (directory path), along with an empty _SUCCESS indicator file. The presence of the empty _SUCCESS file signifies that the write operation completed normally.

Amazon S3 bucket showing CSV file output by Spark job

Below we see the expected pipe-delimited output from the second Spark job.

country|sales|orders
India|138.80|20
China|133.80|20
Mexico|106.86|14
Japan|100.86|14
Brazil|96.87|13
Russian Federation|94.87|13
United States|92.86|14
Nigeria|58.93|7
Philippines|58.92|8
South Africa|46.94|6
Argentina|42.93|7
Germany|39.96|4
Indonesia|38.95|5
Italy|35.95|5
Iran|33.95|5
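If you want to work with this output downstream, note that both sales and orders were written as formatted strings by format_number, which inserts thousands separators once values reach 1,000. The sketch below reads the pipe-delimited text with Python's standard csv module and converts the two columns back to numbers. The function name is my own, and the sample text is the first three rows of the output above.

```python
import csv
import io

sample_output = """country|sales|orders
India|138.80|20
China|133.80|20
Mexico|106.86|14
"""


def read_sales_csv(text: str) -> list:
    """Parse pipe-delimited output into dicts with numeric sales and orders."""
    reader = csv.DictReader(io.StringIO(text), delimiter="|")
    return [
        {
            "country": row["country"],
            # Strip thousands separators added by format_number before converting
            "sales": float(row["sales"].replace(",", "")),
            "orders": int(row["orders"].replace(",", "")),
        }
        for row in reader
    ]


rows = read_sales_csv(sample_output)
print(rows[0])  # prints: {'country': 'India', 'sales': 138.8, 'orders': 20}
```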

Example 3: Kafka Batch Aggregation to Kafka

The third PySpark application, 03_example_kafka.py, reads the same 250 sample sales messages from topicA you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to a second Kafka topic, topicB. This job now has both read and write options.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
#          Reads messages from Kafka topicA and writes aggregated messages to topicB
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers argument

import argparse

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType
from pyspark.sql.window import Window


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("03-example-kafka") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    df_sales = read_from_kafka(spark, args)

    summarize_sales(df_sales, args)


def read_from_kafka(spark, args):
    # Options prefixed with "kafka." are passed through to the Kafka consumer
    options_read = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "subscribe":
            args.read_topic,
        "startingOffsets":
            "earliest",
        "endingOffsets":
            "latest",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark \
        .read \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales, args):
    options_write = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,