Streaming Data on AWS: Amazon Kinesis Data Streams or Amazon MSK?

Given similar functionality, what differences make one AWS-managed streaming service a better choice over the other?

Kinesis Data Streams and Amazon MSK can often be used interchangeably in streaming data workflows

Data streaming has emerged as a powerful tool in the last few years thanks to its ability to quickly and efficiently process large volumes of data, provide real-time insights, and scale and adapt to meet changing needs. As IoT, social media, and mobile devices continue to generate vast amounts of data, it has become imperative to have platforms that can handle the real-time ingestion, processing, and analysis of this data.

Key Differentiators

Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK) are two managed streaming services offered by AWS. While both platforms offer similar features, choosing the right service largely depends on your specific use cases and business requirements.

Amazon Kinesis Data Streams

  1. Simplicity: Kinesis Data Streams is generally considered a less complicated service than Amazon MSK, which requires you to manage more of the underlying infrastructure. This can make setting up and managing your streaming data pipeline easier, especially if you have limited experience with Apache Kafka. Amazon MSK Serverless, which went GA in April 2022, is a cluster type for Amazon MSK that allows you to run Apache Kafka without managing and scaling cluster capacity. Unlike Amazon MSK provisioned, Amazon MSK Serverless greatly reduces the effort required to use Amazon MSK, making ‘Simplicity’ less of a Kinesis differentiator.
  2. Integration with AWS services: Kinesis Data Streams integrates well with other AWS services, such as AWS Lambda, Amazon S3, and Amazon OpenSearch. This can make building end-to-end data processing pipelines easier using these services.
  3. Low latency: Kinesis Data Streams is designed to deliver low-latency processing of streaming data, which can be important for applications that require near real-time processing.
  4. Predictable pricing: Kinesis Data Streams is generally considered to have a more predictable pricing model than Amazon MSK, whose pricing is based on instance sizes and hourly usage. With Kinesis Data Streams, you pay for the data you process, making estimating and managing fees easier (additional fees may apply).
Example Architecture #1: Ingestion of features into SageMaker Feature Store using Amazon Kinesis

Amazon MSK

  1. Compatibility with Apache Kafka: Amazon MSK may be a better choice if you have an existing Apache Kafka deployment or are already familiar with Kafka. Amazon MSK is a fully managed version of Apache Kafka, which you can use with existing Kafka applications and tools.
  2. Customization: With Amazon MSK, you have more control over the underlying cluster infrastructure, configuration, deployment, and version of Kafka, which means you can customize the cluster to meet your needs. This can be important if you have specialized requirements or want to optimize performance (e.g., high-volume financial trading, real-time gaming).
  3. Larger ecosystem: Apache Kafka has a large ecosystem of tools and integrations compared to Kinesis Data Streams. This can provide flexibility and choice when building and managing your streaming data pipeline. Some common tools include MirrorMaker, Kafka Connect, LinkedIn’s Cruise Control, kcat (fka kafkacat), Lenses, Confluent Schema Registry, and Apicurio Registry.
  4. Preference for Open Source: You may prefer the flexibility, transparency, pace of innovation, and interoperability of employing open source software (OSS) over proprietary software and services for your streaming solution.
Example Architecture #2: Event-driven microservices using Amazon MSK

Ultimately, the choice between Amazon Kinesis Data Streams and Amazon MSK will depend on your specific needs and priorities. Kinesis Data Streams might be better if you prioritize simplicity, integration with other AWS services, and low latency. If you have an existing Kafka deployment, require more customization, or need access to a larger ecosystem of tools and integrations, Amazon MSK might be a better fit. In my opinion, the newer Amazon MSK Serverless option lessens several traditional differentiators between the two services.

Scaling Capabilities

Amazon Kinesis Data Streams and Amazon MSK are designed to be scalable streaming services that can handle large volumes of data. However, there are some differences in their scaling capabilities.

Amazon Kinesis Data Streams

  1. Scalability: Kinesis Data Streams has two capacity modes, on-demand and provisioned. With the on-demand mode, Kinesis Data Streams automatically manages the shards to provide the necessary throughput based on the amount of data you process. This means the service can automatically adjust the number of shards based on the incoming data volume, allowing you to handle increased traffic without manually adjusting the infrastructure.
  2. Limitations: Per the documentation, there is no upper quota on the number of streams with the provisioned mode you can have in an account. A shard can ingest up to 1 MB of data per second (including partition keys) or 1,000 records per second for writes. The maximum size of the data payload of a record before base64-encoding is up to 1 MB. GetRecords can retrieve up to 10 MB of data per call from a single shard and up to 10,000 records per call. Each call to GetRecords is counted as one read transaction. Each shard can support up to five read transactions per second. Each read transaction can provide up to 10,000 records with an upper quota of 10 MB per transaction. Each shard can support a maximum total data read rate of 2 MB per second via GetRecords. If a call to GetRecords returns 10 MB, subsequent calls made within the next 5 seconds throw an exception.
  3. Cost: Kinesis Data Streams has two capacity modes, on-demand and provisioned, with different pricing models. With on-demand capacity mode, you pay per GB of data written and read from your data streams. You do not need to specify how much read and write throughput you expect your application to perform. With provisioned capacity mode, you select the number of shards necessary for your application based on its write and read request rate. There are additional fees for PUT Payload Units, enhanced fan-out, extended data retention, and retrieval of long-term retention data.
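
As a rough illustration of how the per-shard quotas above translate into a provisioned shard count, here is a minimal sketch. The function name and the sample workload are hypothetical; the constants are the write and read quotas quoted in the list above.

```python
import math

# Per-shard quotas quoted in the limits above (provisioned mode).
WRITE_MB_PER_SHARD = 1.0         # MB/s ingested per shard
WRITE_RECORDS_PER_SHARD = 1000   # records/s ingested per shard
READ_MB_PER_SHARD = 2.0          # MB/s read per shard via GetRecords


def estimate_shards(write_mb_s: float, write_records_s: float, read_mb_s: float) -> int:
    """Return the smallest shard count that satisfies all three quotas."""
    needed = max(
        write_mb_s / WRITE_MB_PER_SHARD,
        write_records_s / WRITE_RECORDS_PER_SHARD,
        read_mb_s / READ_MB_PER_SHARD,
    )
    return max(1, math.ceil(needed))


# Hypothetical workload: 4.5 MB/s in, 3,000 records/s in, 12 MB/s out.
# The read rate dominates here: 12 / 2 = 6 shards.
print(estimate_shards(4.5, 3000, 12.0))
```

This is only a starting point. It ignores traffic spikes, uneven partition key distribution, and the five read transactions per second quota, all of which can push the real number higher.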

Amazon MSK

  1. Scalability: Amazon MSK is designed to be highly scalable and can handle millions of messages per second. With Amazon MSK provisioned, you can scale your Kafka cluster by adding or removing instances (brokers) and storage as needed. Amazon MSK can automatically rebalance partitions across instances. Alternatively, Amazon MSK Serverless automatically provisions and scales capacity while managing the partitions in your topic, so you can stream data without thinking about right-sizing or scaling clusters.
  2. Flexibility: With Amazon MSK, you have more control over the underlying infrastructure, which means you can customize the deployment to meet your needs. This can be important if you have specialized requirements or want to optimize performance.
  3. Authentication: Amazon MSK offers multiple authentication methods. You can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, with Amazon MSK provisioned, you can use TLS or SASL/SCRAM to authenticate clients and Apache Kafka ACLs to allow or deny actions.
  4. Cost: Scaling up or down with Amazon MSK can impact the cost based on instance sizes and hourly usage. Therefore, adding more instances can increase the overall cost of the service. Pricing models for Amazon MSK and Amazon MSK Serverless vary.

Amazon Kinesis Data Streams and Amazon MSK are both highly scalable services. Kinesis Data Streams can scale automatically based on the amount of data you process, while Amazon MSK allows you to scale your Kafka cluster by adding or removing instances and adding storage as needed. However, adding shards to a Kinesis stream in provisioned mode is a more manual process that can take some time to propagate and affects cost, while the cost of scaling Amazon MSK up or down depends on instance sizes and hourly usage. Ultimately, the choice between the two will depend on your specific use case and requirements.

Throughput

Throughput can be measured as the maximum MB/s of data and the maximum number of records per second. The maximum throughput of Amazon Kinesis Data Streams and Amazon MSK is not a hard limit. Depending on the service, you can exceed it by adding more resources, such as shards or brokers. Total maximum system throughput is also affected by the maximum throughput of the upstream producing and downstream consuming components.

Amazon Kinesis Data Streams

The maximum throughput of Kinesis Data Streams depends on the number of shards and the size of the data being processed. Each shard in a Kinesis stream can handle up to 1 MB/s of data input and up to 2 MB/s of data output, or up to 1,000 records per second for writes and up to 10,000 records per second for reads. When a consumer uses enhanced fan-out, it gets its own 2 MB/s allotment of read throughput, allowing multiple consumers to read data from the same stream in parallel without contending for read throughput with other consumers.

The maximum throughput of a Kinesis stream is determined by the number of shards you have multiplied by the maximum throughput per shard. For example, if you have a stream with 10 shards, the maximum throughput of the stream would be 10 MB/s for data input and 20 MB/s for data output, or up to 10,000 records per second for writes and up to 100,000 records per second for reads.
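
The arithmetic above, including the effect of enhanced fan-out, can be sketched as follows. The class and function names are hypothetical; the per-shard figures are the ones given in this section.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamThroughput:
    write_mb_s: float          # total ingest capacity
    write_records_s: int       # total ingest capacity in records
    shared_read_mb_s: float    # shared by all standard consumers
    fan_out_read_mb_s: float   # combined capacity of enhanced fan-out consumers


def stream_throughput(shards: int, fan_out_consumers: int = 0) -> StreamThroughput:
    """Multiply the per-shard figures from the text by the shard count."""
    return StreamThroughput(
        write_mb_s=shards * 1.0,
        write_records_s=shards * 1000,
        shared_read_mb_s=shards * 2.0,
        # Each enhanced fan-out consumer gets its own 2 MB/s per shard.
        fan_out_read_mb_s=shards * 2.0 * fan_out_consumers,
    )


# The 10-shard example from the text, plus three enhanced fan-out consumers.
print(stream_throughput(10, fan_out_consumers=3))
```

With standard consumers, the 20 MB/s of read capacity is divided among every application reading the stream. With enhanced fan-out, each of the three consumers in this example can read the full 20 MB/s.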

The maximum throughput is not a hard limit, and you can exceed these limits by adding more shards to your stream. However, adding more shards can impact the cost of the service, and you should consider the optimal shard count for your use case to ensure efficient and cost-effective processing of your data.

Amazon MSK

As discussed in the Amazon MSK best practices documentation, the maximum throughput of Amazon MSK depends on the number of brokers and the instance type of those brokers. Amazon MSK allows you to scale the number of instances in a Kafka cluster up or down based on your needs.

The maximum throughput of an Amazon MSK cluster depends on the number of brokers and the performance characteristics of the instance types you are using. Each broker in an Amazon MSK cluster can handle tens of thousands of messages per second, depending on the instance type and configuration. The actual throughput you can achieve will depend on your specific use case and the message size. The AWS blog post, Best practices for right-sizing your Apache Kafka clusters to optimize performance and cost, is an excellent reference.

The maximum throughput is not a hard limit, and you can exceed these limits by adding more brokers or upgrading to more powerful instances. However, adding more instances or upgrading to more powerful instances can impact the service’s cost. Therefore, consider your use case’s optimal instance count and type to ensure efficient and cost-effective data processing.

Writing Messages

Compatibility with multiple producers and consumers is essential when choosing a streaming technology. There are multiple ways to write messages to Amazon Kinesis Data Streams and Amazon MSK.

Amazon Kinesis Data Streams

  1. AWS SDK: Use the AWS SDK for your preferred programming language.
  2. Kinesis Producer Library (KPL): KPL is a high-performance library that allows you to write data to Kinesis Data Streams at a high rate. KPL handles all heavy lifting, including batching, retrying failed records, and load balancing across shards.
  3. Amazon Kinesis Data Firehose: Kinesis Data Firehose is a fully managed service that can ingest and transform streaming data in real-time. It can be used to write data to Kinesis Data Streams, as well as to other AWS services such as S3, Redshift, and Elasticsearch.
  4. Amazon Kinesis Data Analytics: Kinesis Data Analytics is a fully managed service that allows you to process and analyze streaming data in real-time. It can read data from Kinesis Data Streams, perform real-time analytics and transformations, and write the results to another Kinesis stream or an external data store.
  5. Kinesis Agent: Kinesis Agent is a standalone Java application that collects and sends data to Kinesis Data Streams. It can monitor log files or other data sources and automatically send data to Kinesis Data Streams as it is generated.
  6. Third-party libraries and tools: There are many third-party libraries and tools available for writing data to Kinesis Data Streams, including Apache Kafka Connect, Apache Storm, and Fluentd. These tools can integrate Kinesis Data Streams with existing data processing pipelines or build custom streaming applications.
Example Architecture #3: Example of using Kinesis Producer Library to write messages to Kinesis Data Streams
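
For the first option in the list above, the AWS SDK, a minimal Python sketch using boto3's `put_records` call might look like the following. The helper names, stream name, and event shape are hypothetical, and the code assumes boto3 is installed and AWS credentials are configured.

```python
import json
from typing import Iterable, Iterator

MAX_RECORDS_PER_CALL = 500  # PutRecords accepts at most 500 records per request


def to_entries(events: Iterable[dict], key_field: str) -> list[dict]:
    """Convert events into PutRecords entries (Data bytes plus PartitionKey)."""
    return [
        {"Data": json.dumps(e).encode("utf-8"), "PartitionKey": str(e[key_field])}
        for e in events
    ]


def batches(entries: list[dict], size: int = MAX_RECORDS_PER_CALL) -> Iterator[list[dict]]:
    """Yield consecutive slices that fit within one PutRecords request."""
    for start in range(0, len(entries), size):
        yield entries[start:start + size]


def send(stream_name: str, events: Iterable[dict], key_field: str) -> int:
    """Write events to the stream and return how many records were rejected."""
    import boto3  # imported lazily so the helpers above work without AWS access

    client = boto3.client("kinesis")
    failed = 0
    for batch in batches(to_entries(events, key_field)):
        response = client.put_records(StreamName=stream_name, Records=batch)
        failed += response["FailedRecordCount"]
    return failed
```

A call such as `send("my-stream", events, key_field="device_id")` would write the events in batches. This sketch only counts rejected records. A production producer would retry them, which is part of the heavy lifting the KPL handles for you.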

Amazon MSK

  1. Kafka command line tools: The Kafka command line tools (e.g., kafka-console-producer.sh) can be used to write messages to a Kafka topic in an Amazon MSK cluster. These tools are part of the Kafka distribution and are typically run from a client machine that has network access to the cluster.
  2. Kafka client libraries: You can use Kafka client libraries in your preferred programming language (e.g., Java, Python, C#) to write messages to an Amazon MSK cluster. These libraries provide a more flexible and customizable way to produce messages to Kafka topics.
  3. AWS SDKs: You can use AWS SDKs (e.g., AWS SDK for Java, AWS SDK for Python) to interact with Amazon MSK and write messages to Kafka topics. These SDKs provide a higher-level abstraction over the Kafka client libraries, making integrating Amazon MSK into your AWS infrastructure easier.
  4. Third-party libraries and tools: There are many third-party tools and frameworks, including Apache NiFi, Apache Camel, and Apache Beam. They provide Kafka connectors and producers, which can be used to write messages to Kafka topics in Amazon MSK. These tools can simplify the process of writing messages and provide additional features such as data transformation and routing.
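
As an illustration of the client library option above, here is a minimal sketch using the open-source kafka-python package (an assumption; any Kafka client would work). The function names, topic, and event shape are hypothetical, and the code assumes the cluster exposes a TLS listener that the client can reach.

```python
import json


def serialize(event: dict) -> bytes:
    """Encode an event as UTF-8 JSON with stable key order."""
    return json.dumps(event, sort_keys=True).encode("utf-8")


def produce(bootstrap_servers: str, topic: str, events: list[dict], key_field: str) -> None:
    """Send each event to the topic, keyed by one of its fields."""
    from kafka import KafkaProducer  # kafka-python, imported lazily

    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        security_protocol="SSL",  # assumes TLS encryption in transit
        key_serializer=lambda k: str(k).encode("utf-8"),
        value_serializer=serialize,
    )
    for event in events:
        # Records with the same key land on the same partition.
        producer.send(topic, key=event[key_field], value=event)
    producer.flush()  # block until buffered records are delivered
    producer.close()
```

A call might look like `produce("<broker-host>:9094", "orders", events, key_field="order_id")`, where the broker address is a placeholder for your cluster's bootstrap string. IAM or SASL/SCRAM authentication would need additional producer settings that are not shown here.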

Schema Registry

You can use AWS Glue Schema Registry with Amazon Kinesis Data Streams and Amazon MSK. AWS Glue Schema Registry is a fully managed service that provides a central schema repository for organizing, validating, and tracking the evolution of your data schemas. It enables you to store, manage, and discover schemas for your data in a single, centralized location.

With AWS Glue Schema Registry, you can define and register schemas for your data in the registry. You can then use these schemas to validate the data being ingested into your streaming applications, ensuring that the data conforms to the expected structure and format.

Both Kinesis Data Streams and Amazon MSK support the use of AWS Glue Schema Registry through the use of Apache Avro schemas. Avro is a compact, fast, binary data format that can improve the performance of your streaming applications. You can configure your streaming applications to use the registry to validate incoming data, ensuring that it conforms to the schema before processing.

Using AWS Glue Schema Registry can help ensure the consistency and quality of your data across your streaming applications and provide a centralized location for managing and tracking schema changes. Amazon MSK is also compatible with popular alternative schema registries, such as Confluent Schema Registry and Red Hat’s open-source Apicurio Registry.
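
To make the idea of validating data against a registered schema concrete, here is a hand-rolled sketch that checks a record against an Avro-style record schema. It is a stand-in for illustration only and does not use the AWS Glue Schema Registry API or an Avro library. The `Sale` schema and the function name are hypothetical, and only a few primitive types are handled.

```python
# Map a few Avro primitive types to the Python types that satisfy them.
AVRO_TO_PYTHON = {
    "string": str,
    "int": int,
    "long": int,
    "double": (int, float),
    "boolean": bool,
}

SALE_SCHEMA = {  # hypothetical Avro record schema
    "type": "record",
    "name": "Sale",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "product", "type": "string"},
        {"name": "amount", "type": "double"},
    ],
}


def violations(record: dict, schema: dict) -> list[str]:
    """Return a list of problems; an empty list means the record conforms."""
    problems = []
    for field in schema["fields"]:
        name, avro_type = field["name"], field["type"]
        expected = AVRO_TO_PYTHON[avro_type]
        if name not in record:
            problems.append(f"missing field '{name}'")
            continue
        value = record[name]
        # bool is a subclass of int in Python, so reject it for numeric fields.
        is_stray_bool = isinstance(value, bool) and expected is not bool
        if is_stray_bool or not isinstance(value, expected):
            problems.append(f"field '{name}' should be {avro_type}")
    return problems


print(violations({"id": "1", "product": "Latte"}, SALE_SCHEMA))
```

In practice, a serializer integrated with the registry performs this kind of check on the producer side, so malformed records are rejected before they reach the stream.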

Example Architecture #4: Change Data Capture (CDC) using Amazon MSK and Glue Schema Registry

Stream Processing

According to TechTarget, stream processing is a data management technique that involves ingesting a continuous data stream to quickly analyze, filter, transform, or enhance the data in real time. Several leading stream processing tools are compatible with Amazon Kinesis Data Streams and Amazon MSK, each with its own strengths and use cases. Some of the more popular tools include:

  1. Apache Flink: Apache Flink is a distributed stream processing framework that provides fast, scalable, and fault-tolerant data processing for real-time and batch data streams. It supports a variety of data sources and sinks and provides a powerful stream processing API and SQL interface. In addition, Amazon offers its managed version of Apache Flink, Amazon Kinesis Data Analytics (KDA), which is compatible with both Amazon Kinesis Data Streams and Amazon MSK.
  2. Apache Spark Structured Streaming: Apache Spark Structured Streaming is a stream processing framework that allows developers to build real-time stream processing applications using the familiar Spark API. It provides high-level APIs for processing data streams and supports integration with various data sources and sinks. Apache Spark is compatible with both Amazon Kinesis Data Streams and Amazon MSK. Spark Streaming is available as a managed service on AWS via AWS Glue Studio and Amazon EMR.
  3. Apache NiFi: Apache NiFi is an open-source data integration and processing tool that provides a web-based UI for building data pipelines. It supports batch and stream processing and offers a variety of processors for data ingestion, transformation, and delivery. Apache NiFi is compatible with both Amazon Kinesis Data Streams and Amazon MSK.
  4. Amazon Kinesis Data Firehose: Kinesis Data Firehose is a fully managed service that can ingest and transform streaming data in real time. It can be used to write data to Kinesis Data Streams, as well as to other AWS services such as S3, Redshift, and Elasticsearch. Kinesis Data Firehose is compatible with Amazon Kinesis Data Streams and Amazon MSK.
  5. Apache Kafka Streams (aka KStreams): Apache Kafka Streams is a lightweight stream processing library that allows developers to build scalable and fault-tolerant real-time applications and microservices. KStreams integrates seamlessly with Amazon MSK and provides a high-level DSL for stream processing.
  6. ksqlDB: ksqlDB is a database for building stream processing applications on top of Apache Kafka. It is distributed, scalable, reliable, and real-time. ksqlDB combines the power of real-time stream processing with the approachable feel of a relational database through a familiar, lightweight SQL syntax. ksqlDB is compatible with Amazon MSK.
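
To illustrate the filter, transform, and aggregate steps that the tools above perform, here is a minimal plain-Python sketch of a tumbling-window aggregation. It does not use the API of Flink, Spark, Kafka Streams, or any other tool listed. The function name and the event tuples are hypothetical.

```python
from collections import defaultdict
from typing import Iterable


def tumbling_window_totals(
    events: Iterable[tuple[int, str, float]], window_seconds: int
) -> dict[tuple[int, str], float]:
    """Sum amounts per (window start, key) for events of (timestamp, key, amount)."""
    totals: dict[tuple[int, str], float] = defaultdict(float)
    for timestamp, key, amount in events:
        if amount <= 0:  # filter: drop refunds and empty events
            continue
        # Assign the event to the fixed-size window that contains its timestamp.
        window_start = timestamp - (timestamp % window_seconds)
        totals[(window_start, key)] += amount
    return dict(totals)


events = [
    (0, "latte", 4.5),
    (30, "latte", 4.5),
    (61, "muffin", 3.0),
    (65, "latte", -4.5),  # filtered out
]
print(tumbling_window_totals(events, window_seconds=60))
# → {(0, 'latte'): 9.0, (60, 'muffin'): 3.0}
```

Real stream processors add what this sketch leaves out: unbounded input, late and out-of-order events, distributed state, and fault tolerance.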

Several stream-processing tools are detailed in my recent two-part blog post, Exploring Popular Open-source Stream Processing Technologies.

Conclusion

Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK) are managed streaming services. While they offer similar functionality, some differences might make one a better choice, depending on your use cases and experience. Ensure you understand your streaming requirements and each service’s capabilities before making a final architectural decision.

🔔 To keep up with future content, follow Gary Stafford on LinkedIn.


This blog represents my viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Unlocking the Potential of Generative AI for Synthetic Data Generation

Explore the capabilities and applications of generative AI to create realistic synthetic data for software development, analytics, and machine learning

Licensed image: Yurchanka Siarhei/Shutterstock

Introduction

Generative AI refers to a class of artificial intelligence algorithms capable of generating new data similar to a given dataset. These algorithms learn the underlying patterns and relationships in the data and use this knowledge to create new data consistent with the original dataset. Generative AI is a rapidly evolving field that has the potential to revolutionize the way we generate and use data.

Generative AI can generate synthetic data based on patterns and relationships learned from actual data. This ability to generate synthetic data has numerous applications, from creating realistic virtual environments for training and simulation to generating new data for machine learning models. In this article, we will explore the capabilities of generative AI and its potential to generate synthetic data, both directly and indirectly, for software development, data analytics, and machine learning.

Common Forms of Synthetic Data

According to AltexSoft, in their article Synthetic Data for Machine Learning: its Nature, Types, and Ways of Generation, common forms of synthetic data include:

  1. Tabular data: This type of synthetic data is often used to generate datasets that resemble real-world data in terms of structure and statistical properties.
  2. Time series data: This type of synthetic data generates datasets that resemble real-world time series data. It is commonly used when real-world time series data is unavailable or too expensive.
  3. Image and video data: This synthetic data is used to generate realistic images and videos for training machine learning models or simulations.
  4. Text data: This synthetic data generates realistic text for natural language processing tasks or for generating training data for machine learning models.
  5. Sound data: This synthetic data generates realistic sound for training machine learning models or simulations.

Synthetic Tabular Data Types

Synthetic tabular data refers to artificially generated datasets that resemble real-world tabular data in terms of structure and statistical properties. Tabular data is organized into rows and columns, like tables or spreadsheets. Some specific types of synthetic tabular data include:

  1. Financial data: Synthetic datasets that resemble real-world financial data such as bank transactions, stock prices, or credit card information.
  2. Customer data: Synthetic datasets that resemble real-world customer data, such as purchase history, demographic information, or customer behavior.
  3. Medical data: Synthetic datasets that resemble real-world medical data, such as patient records, medical test results, or treatment history.
  4. Sensor data: Synthetic datasets that resemble real-world sensor data such as temperature readings, humidity levels, or air quality measurements.
  5. Sales data: Synthetic datasets that resemble real-world sales data, such as sales transactions, product information, or customer behavior.
  6. Inventory data: Synthetic datasets that resemble real-world inventory data, such as stock levels, product information, or supplier information.
  7. Marketing data: Synthetic datasets that resemble real-world marketing data, such as campaign performance, customer behavior, or market trends.
  8. Human resources data: Synthetic datasets that resemble real-world human resources data, such as employee records, performance evaluations, or salary information.

Challenges with Creating Synthetic Data

According to sources including Towards Data Science, enov8, and J.P. Morgan, there are several challenges in creating synthetic data, including:

  1. Technical difficulty: Properly modeling complex real-world behaviors as synthetic data is challenging, given available technologies.
  2. Biased behavior: The flexible nature of synthetic data makes it prone to potentially biased results.
  3. Privacy concerns: Care must be taken to ensure synthetic data does not reveal sensitive information.
  4. Quality of the data model: If the quality of the data model is not high, wrong conclusions can be reached.
  5. Time and effort: Synthetic data generation requires time and effort.

Difficult Patterns and Behaviors to Model

Many patterns and behaviors can be challenging to model in synthetic data, for example:

  1. Rare events: If certain events rarely occur in the real world, generating synthetic data that accurately reflects their distribution can be difficult.
  2. Complex relationships: Synthetic data generators may struggle to capture complex relationships between variables, such as non-linear interactions, feedback loops, or conditional dependencies.
  3. Contextual variability: Contextual factors, such as time, location, or individual differences, can have a significant impact on the distribution of data. Modeling this variability accurately can be challenging.
  4. Outliers and anomalies: Synthetic data generators may be unable to generate outliers or anomalies that are realistic and representative of the real world.
  5. Dynamic data: If the data is dynamic and changes over time, it can be challenging to generate synthetic data that captures these changes accurately.
  6. Unobserved variables: Sometimes, variables may be necessary for understanding the data distribution but are not directly observable. These variables can be challenging to model in synthetic data.
  7. Data bias: If the real-world data is biased in some way, such as over-representation of certain groups or under-representation of others, it can be challenging to generate synthetic data that is unbiased and representative of the population.
  8. Time-dependent patterns: If the data exhibits time-dependent patterns, such as seasonality or trends, it can be challenging to generate synthetic data that accurately reflects them.
  9. Spatial patterns: If the data has a spatial component, such as location data or images, it can be challenging to generate synthetic data that captures the spatial patterns realistically.
  10. Data sparsity: If the data is sparse or incomplete, it can be difficult to generate synthetic data that accurately reflects the distribution of the entire dataset.
  11. Human behavior: If the data involves human behavior, such as in social science or behavioral economics, it can be challenging to model the complex and nuanced behaviors of individuals and groups.
  12. Sensitive or confidential information: In some cases, the data may contain sensitive or confidential information that cannot be shared, making it challenging to generate synthetic data that preserves privacy while accurately reflecting the underlying distribution.

Overall, many patterns can be challenging to model accurately in synthetic data. Doing so often requires careful consideration of the specific data characteristics and the limitations of the synthetic data generation techniques.
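
The first item in the list, rare events, can be made concrete with a little arithmetic. The sketch below uses a hypothetical event rate of 0.1% (for example, fraudulent transactions) to show how easily a modest sample contains no examples of the event at all.

```python
def expected_events(event_rate: float, sample_size: int) -> float:
    """Expected number of rare events in an independent sample."""
    return event_rate * sample_size


def probability_of_no_events(event_rate: float, sample_size: int) -> float:
    """Chance that an independent sample contains zero rare events."""
    return (1.0 - event_rate) ** sample_size


rate, size = 0.001, 1000  # hypothetical: 0.1% event rate, 1,000 records
print(expected_events(rate, size))                     # 1.0
print(round(probability_of_no_events(rate, size), 3))  # 0.368
```

With these numbers, more than a third of such samples would contain no rare events, leaving a generator trained on them nothing to learn from.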

Easily Modeled Patterns and Behaviors

There are many simple and well-understood patterns that can be easily modeled in synthetic data, for example:

  1. Randomness: If the data is purely random, it can be easily generated using a random number generator or other simple techniques.
  2. Gaussian distribution: If the data follows a Gaussian or normal distribution, it can be generated using a Gaussian random number generator.
  3. Uniform distribution: If the data follows a uniform distribution, it can be easily generated using a uniform random number generator.
  4. Linear relationships: If the data follows a linear relationship between variables, it can be modeled using simple linear regression techniques.
  5. Categorical variables: If the data consists of categorical variables, such as gender or occupation, it can be generated using a categorical distribution.
  6. Text data: If the data consists of text, it can be generated using natural language processing techniques, such as language models or text generation algorithms.
  7. Time series: If the data consists of time series data, such as stock prices or weather data, it can be generated using time series models, such as Autoregressive integrated moving average (ARIMA) or Long short-term memory (LSTM).
  8. Seasonality: If the data exhibits seasonal patterns, such as higher sales data during holiday periods, it can be generated using seasonal time series models.
  9. Proportions and percentages: If the data consists of proportions or percentages, such as product sales distribution across different regions, it can be generated using beta or Dirichlet distributions.
  10. Multivariate normal distribution: If the data follows a multivariate normal distribution, it can be generated using a multivariate Gaussian random number generator.
  11. Networks: If the data consists of network or graph data, such as social networks or transportation networks, it can be generated using network models, such as Erdos-Renyi or Barabasi-Albert models.
  12. Binary data: If the data consists of binary data, such as whether a customer churned, it can be generated using a Bernoulli distribution.
  13. Geospatial data: If the data involves geospatial data, such as the location of points of interest, it can be easily generated using geospatial models, such as point processes or spatial point patterns.
  14. Customer behaviors: If the data involves customer behaviors, such as browsing or purchase histories, it can be generated using customer journey models, such as Markov models.
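
Several of the distributions in the list above need nothing more than Python's standard `random` module. The following sketch is illustrative only: the field names, means, weights, and rates are all hypothetical.

```python
import random

PAYMENT_TYPES = ["cash", "credit", "debit", "gift card"]


def sample_customer(rng: random.Random) -> dict:
    """Draw one synthetic customer using simple, well-understood distributions."""
    return {
        # Gaussian: age centered on 40, floored at 18
        "age": max(18, round(rng.gauss(40, 12))),
        # Uniform: discount between 0% and 30%
        "discount": round(rng.uniform(0.0, 0.3), 2),
        # Categorical: weighted choice of payment type
        "payment_type": rng.choices(PAYMENT_TYPES, weights=[0.1, 0.5, 0.3, 0.1])[0],
        # Bernoulli: churned with probability 0.2
        "churned": rng.random() < 0.2,
    }


def dirichlet(rng: random.Random, alphas: list[float]) -> list[float]:
    """Draw proportions that sum to 1 by normalizing gamma variates."""
    draws = [rng.gammavariate(alpha, 1.0) for alpha in alphas]
    total = sum(draws)
    return [draw / total for draw in draws]


rng = random.Random(42)  # fixed seed so runs are reproducible
print(sample_customer(rng))
print(dirichlet(rng, [2.0, 3.0, 5.0]))  # e.g., sales share across three regions
```

Seeding the generator is a deliberate choice for test data, since it lets a failing test be reproduced with exactly the same records.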

In general, simple and well-understood patterns can be easily modeled using synthetic data techniques. In contrast, more complex and nuanced patterns may require more sophisticated modeling techniques and a deeper understanding of the underlying data characteristics.
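
Slightly more structured patterns from the list, such as customer journeys, are still straightforward to sketch. Below is a minimal Markov chain simulation. The states and transition probabilities are hypothetical and would normally be estimated from real clickstream data.

```python
import random

# Hypothetical transition probabilities between journey states.
TRANSITIONS = {
    "browse": [("browse", 0.5), ("add_to_cart", 0.3), ("exit", 0.2)],
    "add_to_cart": [("browse", 0.3), ("purchase", 0.5), ("exit", 0.2)],
}
TERMINAL_STATES = {"purchase", "exit"}


def simulate_journey(rng: random.Random, start: str = "browse", max_steps: int = 50) -> list[str]:
    """Walk the chain from the start state until a terminal state or the step cap."""
    state, path = start, [start]
    while state not in TERMINAL_STATES and len(path) < max_steps:
        next_states, weights = zip(*TRANSITIONS[state])
        state = rng.choices(next_states, weights=weights)[0]
        path.append(state)
    return path


rng = random.Random(7)
for _ in range(3):
    print(" -> ".join(simulate_journey(rng)))
```

Because each step depends only on the current state, this model cannot capture longer-range behavior such as a customer returning days later, which is where the more sophisticated techniques come in.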

Creating Synthetic Data with Generative AI

We can use many popular generative AI-powered tools to create synthetic data for testing applications, constructing analytics pipelines, and building machine learning models. Tools include OpenAI ChatGPT, Microsoft’s all-new Bing Chat, ChatSonic, Tabnine, GitHub Copilot, and Amazon CodeWhisperer. For more information on these tools, check out my recent blog post:

Accelerating Development with Generative AI-Powered Coding Tools
Explore six popular generative AI-powered tools, including ChatGPT, Copilot, CodeWhisperer, Tabnine, Bing, and…garystafford.medium.com

Let’s start with a simple example of generating synthetic sales data. Suppose we have created a new sales forecasting application for coffee shops that we need to test using synthetic data. We might start by prompting a generative AI tool like OpenAI’s ChatGPT for some data:

Create a CSV file with 25 random sales records for a coffee shop.
Each record should include the following fields:
- id (incrementing integer starting at 1)
- date (random date between 1/1/2022 and 12/31/2022)
- time (random time between 6:00am and 9:00pm in 1-minute increments)
- product_id (incrementing integer starting at 1)
- product
- calories
- price in USD
- type (drink or food)
- quantity (random integer between 1 and 3)
- amount (price * quantity)
- payment type (cash, credit, debit, or gift card)

The content and structure of a prompt can vary, and this can strongly influence ChatGPT’s response. Based on the above prompt, the results were accurate but, in this format, not very useful for testing our application. ChatGPT cannot create a physical CSV file. Furthermore, ChatGPT’s response length is limited; only about twenty records were returned. According to ChatGPT itself, it can generally generate responses of up to 2,048 tokens, the maximum output length allowed by the GPT-3 model.

OpenAI ChatGPT UI

Instead of outputting the actual synthetic data, we could ask ChatGPT to write a program that can, in turn, generate the synthetic data. This option is certainly more scalable. Let’s prompt ChatGPT to write a Python program to generate synthetic sales data with the same characteristics as before:

Create a Python3 program to generate 100 sales of common items 
sold in a coffee shop. The data should be written to a CSV file
and include a header row. Each record should include the following fields:
- id (incrementing integer starting at 1)
- date (random date between 1/1/2022 and 12/31/2022)
- time (random time between 6:00am and 9:00pm in 1-minute increments)
- product_id (incrementing integer starting at 1)
- product
- calories
- price in USD
- type (drink or food)
- quantity (random integer between 1 and 3)
- amount (price * quantity)
- payment type (cash, credit, debit, or gift card)

Using a single concise prompt, ChatGPT generated a complete Python program, including code comments, to generate synthetic sales data. Unfortunately, given ChatGPT’s response size limitation, the coffee shop menu was limited to just six items. Reprompting for more items would result in the truncation of the output and, thus, the program, making it unrunnable. Instead, we could use an additional prompt to generate a longer Python list of menu items and combine the two pieces of code in our IDE. Regardless, we will still need to copy and paste the code into our IDE to review, debug, test, and run.

OpenAI ChatGPT UI

ChatGPT’s Python program, copied and pasted into VS Code, ran without modifications, and wrote 100 synthetic sales records to a CSV file!

Running ChatGPT’s Python program in VS Code

Using IDE-based Generative AI Tools

Although generating synthetic data or snippets of code directly in chat-based generative AI tools is helpful for limited use cases, writing code in an IDE gives us several advantages:

  1. Code does not need to be copied and pasted from external sources into an IDE
  2. Line-by-line, method, and block code completion overcomes the single-response size limits of chat-based tools like OpenAI ChatGPT
  3. Code can be reused and adapted to evolving use cases over time
  4. A Python interpreter and debugger, or the equivalent for other languages
  5. Automatic code formatting, linting, and code style enforcement
  6. Unit, integration, and functional testing
  7. Static code analysis (SCA)
  8. Vulnerability scanning
  9. IntelliSense for code completion
  10. Source code management (SCM) / version control

Let’s use the same techniques we used with ChatGPT, but from within an IDE to generate three types of synthetic data. We will choose Microsoft’s VS Code with GitHub Copilot and Python as our programming language.

VS Code IDE with GitHub Copilot Nightly extension installed

Source Code

All the code examples shown in this post can be found on GitHub.

Example #1: Coffee Shop Sales Data

First, we will start by outlining the program’s objective using code comments at the top of our Python file. This detailed context helps us to clearly express our goal and enables Copilot to generate an accurate response.

# Write a program that creates synthetic sales data for a coffee shop.
# The program should accept a command line argument that specifies the number of records to generate.
# The program should write the sales data to a file called 'coffee_shop_sales_data.csv'.
# The program should contain the following functions:
# - main() function that calls the other functions
# - function that returns one random product from a list of dictionaries
# - function that returns a dictionary containing one sales record
# - function that writes the sales records to a file

Following the import statements, which were also generated with Copilot’s assistance, we will write the first function to return a random product from a list of 25 products. Again, we will use code comments as a prompt to generate the code. Copilot was able to generate 100% of the function’s code from the comments.

# Write a function to create list of dictionaries.
# The list of dictionaries should contain 15 drink items and 10 food items sold in a coffee shop.
# Include the product id, product name, calories, price, and type (Food or Drink).
# Capitalize the first letter of each product name.
# Return a random item from the list of dictionaries.

Below is an example of Copilot’s ability to generate complete lines of code. Ultimately, it generated 100% of the function, including choosing the items sold in a coffee shop, each with a reasonable price and calorie count. Copilot is not limited to just understanding code.

Copilot can generate entire lines of code

Next, we will write a function to return a random sales record.

# Write a function to return a random sales record.
# The record should be a dictionary with the following fields:
# - id (an incrementing integer starting at 1)
# - date (a random date between 1/1/2022 and 12/31/2022)
# - time (a random time between 6:00am and 9:00pm in 1 minute increments)
# - product_id, product, calories, price, and type (from the get_product function)
# - quantity (a random integer between 1 and 3)
# - amount (price * quantity)
# - payment type (Cash, Credit, Debit, Gift Card, Apple Pay, Google Pay, or Venmo)

Again, Copilot generated 100% of the function’s code as a single block based on the code comments.

Copilot can generate entire blocks of code or methods

Lastly, we will create a function to write the sales data into a CSV file using Copilot’s help.

# Write a function to write the sales records to a CSV file called 'coffee_shop_sales.csv'.
# Use an input parameter to specify the number of records to write.
# The CSV file must have a header row and be comma delimited.
# All string values must be enclosed in double quotes.

Below, we again see an example of Copilot’s ability to generate an entire Python function. I needed to correct a few problems with the generated code. First, string values were not quoted, so I added the quoting arguments to the csv.writer call (quotechar='"', quoting=csv.QUOTE_NONNUMERIC). Also, the function was missing a key line of code, sale = get_sales_record(), which would have caused the code to fail. Remember, just because the code was generated does not mean it is correct.

Copilot can generate entire blocks of code or methods
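To see what the quoting fix changes, here is a minimal sketch, separate from the program itself, that writes two rows to an in-memory buffer using the same csv.writer arguments. The helper name rows_to_csv and the sample rows are illustrative assumptions, not part of the original code.

```python
import csv
import io


def rows_to_csv(rows):
    """Serialize rows to CSV text, quoting every non-numeric value."""
    buffer = io.StringIO()
    writer = csv.writer(
        buffer,
        delimiter=",",
        quotechar='"',
        quoting=csv.QUOTE_NONNUMERIC,  # strings are quoted, numbers are not
        lineterminator="\n",
    )
    writer.writerows(rows)
    return buffer.getvalue()


# Hypothetical sample rows: a header and one sales line
sample = rows_to_csv([
    ["product", "price", "quantity"],
    ["Latte", 3.5, 2],
])
print(sample)
```

With csv.QUOTE_NONNUMERIC, the header and the product name are wrapped in double quotes, while the price and quantity are written as bare numbers.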

Here is the complete program that creates synthetic sales data for a coffee shop with Copilot assistance:

# Purpose: Generate coffee shop sales data
# Author: Gary A. Stafford and GitHub Copilot
# Date: 2023-04-12
# Usage: python3 coffee_shop_data_gen.py 100
# Command-line argument(s): rec_count (number of records to generate as an integer)

# Write a program that creates synthetic sales data for a coffee shop.
# The program should accept a command line argument that specifies the number of records to generate.
# The program should write the sales data to a file called 'coffee_shop_sales_data.csv'.
# The program should contain the following functions:
# - main() function that calls the other functions
# - function that returns one random product from a list of dictionaries
# - function that returns a dictionary containing one sales record
# - function that writes the sales records to a file

import argparse
import csv
import hashlib
import random
from datetime import datetime, timedelta


def main():
    # create a parser object
    parser = argparse.ArgumentParser(
        description="Generate coffee shop sales data")

    # add a command line argument to specify the number of records to generate
    # (nargs="?" makes the positional argument optional so the default applies)
    parser.add_argument("num_recs",
                        type=int,
                        nargs="?",
                        help="The number of records to generate",
                        default=100)

    num_recs = parser.parse_args().num_recs

    write_data(num_recs)


# Write a function to create list of dictionaries.
# The list of dictionaries should contain 15 drink items and 10 food items sold in a coffee shop.
# Include the product id, product name, calories, price, and type (Food or Drink).
# Capitalize the first letter of each product name.
# Return a random item from the list of dictionaries.
def get_product():
    products = [
        {"id": 1, "product": "Latte", "calories": 120, "price": 3.50, "type": "Drink"},
        {"id": 2, "product": "Cappuccino", "calories": 100, "price": 3.00, "type": "Drink"},
        {"id": 3, "product": "Americano", "calories": 5, "price": 2.50, "type": "Drink"},
        {"id": 4, "product": "Espresso", "calories": 10, "price": 2.00, "type": "Drink"},
        {"id": 5, "product": "Mocha", "calories": 250, "price": 4.00, "type": "Drink"},
        {"id": 6, "product": "Iced Coffee", "calories": 80, "price": 2.50, "type": "Drink"},
        {"id": 7, "product": "Hot Chocolate", "calories": 300, "price": 3.50, "type": "Drink"},
        {"id": 8, "product": "Tea", "calories": 0, "price": 2.00, "type": "Drink"},
        {"id": 9, "product": "Frappe", "calories": 450, "price": 5.00, "type": "Drink"},
        {"id": 10, "product": "Smoothie", "calories": 200, "price": 4.00, "type": "Drink"},
        {"id": 11, "product": "Iced Tea", "calories": 0, "price": 2.50, "type": "Drink"},
        {"id": 12, "product": "Lemonade", "calories": 120, "price": 3.00, "type": "Drink"},
        {"id": 13, "product": "Hot Tea", "calories": 0, "price": 2.00, "type": "Drink"},
        {"id": 14, "product": "Chai Tea", "calories": 200, "price": 3.50, "type": "Drink"},
        {"id": 15, "product": "Iced Chai", "calories": 250, "price": 4.00, "type": "Drink"},
        {"id": 16, "product": "Croissant", "calories": 231, "price": 2.99, "type": "Food"},
        {"id": 17, "product": "Bagel", "calories": 289, "price": 3.49, "type": "Food"},
        {"id": 18, "product": "Muffin", "calories": 426, "price": 3.99, "type": "Food"},
        {"id": 19, "product": "Sandwich", "calories": 512, "price": 6.99, "type": "Food"},
        {"id": 20, "product": "Wrap", "calories": 388, "price": 5.99, "type": "Food"},
        {"id": 21, "product": "Salad", "calories": 231, "price": 7.99, "type": "Food"},
        {"id": 22, "product": "Quiche", "calories": 456, "price": 4.99, "type": "Food"},
        {"id": 23, "product": "Scone", "calories": 335, "price": 2.49, "type": "Food"},
        {"id": 24, "product": "Pastry", "calories": 397, "price": 3.99, "type": "Food"},
        {"id": 25, "product": "Cake", "calories": 512, "price": 5.99, "type": "Food"},
    ]

    # return one random item from list of dictionaries
    return random.choice(products)


# Write a function to return a random sales record.
# The record should be a dictionary with the following fields:
# - transaction_id (a hash of the date, time, and product_id)
# - date (a random date between 1/1/2022 and 12/31/2022)
# - time (a random time between 6:00am and 9:00pm in 1 minute increments)
# - product_id, product, calories, price, and type (from the get_product function)
# - quantity (a random integer between 1 and 3)
# - amount (price * quantity)
# - payment type (Cash, Credit, Debit, Gift Card, Apple Pay, Google Pay, or Venmo)
def get_sales_record():
    # get a random product
    product = get_product()

    # get a random date between 1/1/2022 and 12/31/2022
    start_date = datetime(2022, 1, 1)
    end_date = datetime(2022, 12, 31)
    random_date = start_date + timedelta(
        # Get a random number of seconds between 0 and the number of seconds between start_date and end_date
        seconds=random.randint(0, int(
            (end_date - start_date).total_seconds())), )

    # get a random time between 6:00am and 9:00pm
    start_time = datetime.strptime("6:00am", "%I:%M%p")
    end_time = datetime.strptime("9:00pm", "%I:%M%p")
    random_time = start_time + timedelta(
        # Get a random number of seconds between 0 and the number of seconds between start_time and end_time
        seconds=random.randint(0, int(
            (end_time - start_time).total_seconds())), )

    # get a random quantity between 1 and 3
    random_quantity = random.randint(1, 3)

    # get a random payment type:
    # Cash, Credit, Debit, Gift card, Apple Pay, Google Pay, Venmo
    random_payment_type = random.choice(
        ["Cash", "Credit", "Debit", "Gift card", "Apple Pay", "Google Pay", "Venmo"])

    sales_record = {
        "date": random_date.strftime("%m/%d/%Y"),
        "time": random_time.strftime("%H:%M:%S"),
        "product_id": product["id"],
        "product": product["product"],
        "calories": product["calories"],
        "price": product["price"],
        "type": product["type"],
        "quantity": random_quantity,
        "amount": product["price"] * random_quantity,
        "payment_type": random_payment_type,
    }

    return sales_record


# Write a function to write the sales records to a CSV file called 'coffee_shop_sales_data.csv'.
# Use an input parameter to specify the number of records to write.
# Call the get_sales_record function once for each record to write.
# The CSV file must have a header row and be comma delimited.
# All string values must be enclosed in double quotes.
def write_data(rec_count):
    # open the file for writing
    with open("output/coffee_shop_sales_data.csv", "w", newline="") as csv_file:
        # create a csv writer object
        csv_writer = csv.writer(csv_file,
                                delimiter=",",
                                quotechar='"',
                                quoting=csv.QUOTE_NONNUMERIC)

        # write the header row
        # transaction_id,date,time,product_id,product,calories,price,type,quantity,amount,payment_type
        csv_writer.writerow([
            "transaction_id",
            "date",
            "time",
            "product_id",
            "product",
            "calories",
            "price",
            "type",
            "quantity",
            "amount",
            "payment_type",
        ])

        # write the sales records
        for i in range(rec_count):
            sale = get_sales_record()
            transaction_id = hashlib.md5((f'{sale["date"]} {sale["time"]} {sale["product_id"]}').encode()).hexdigest()
            csv_writer.writerow([
                transaction_id,
                sale["date"],
                sale["time"],
                sale["product_id"],
                sale["product"],
                sale["calories"],
                sale["price"],
                sale["type"],
                sale["quantity"],
                sale["amount"],
                sale["payment_type"],
            ])


if __name__ == "__main__":
    main()

Copilot generated an astounding 80–85% of the program’s final code. The initial program took 10–15 minutes to write using code comments. I then added a few new features, including the ability to pass in the record count on the command line and the hash-based transaction id, which took another 5 minutes. Finally, I used GitHub Code Brushes to optimize the code and generate the Python docstrings, and Black Formatter and Flake8 extensions to format and lint, all of which took less than 5 minutes. With testing and debugging, the total time was about 25–30 minutes.

The most significant difference with Copilot was that I never had to leave the IDE to look up code references or find existing sales datasets or even a coffee shop menu to duplicate. The code, as well as the list of products, price, calories, and product type, were all generated by Copilot.

To make this example more realistic, you could use Copilot’s assistance to write algorithms capable of reflecting daily, weekly, and seasonal variations in product choice and sales volumes. This might include simulating increased sales during the busy morning rush hour or a preference for iced drinks in the summer months versus hot drinks during the winter months.
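As a rough sketch of the time-of-day idea, the function below draws the hour of a sale from a weighted distribution instead of a uniform one, so the morning rush produces more records. The HOUR_WEIGHTS values and the get_weighted_time name are hypothetical and chosen only for illustration; they are not taken from real sales data or from the original program.

```python
import random

# Hypothetical relative weights per opening hour (6:00am through 8:59pm).
# Higher numbers mean more sales in that hour; values are illustrative only.
HOUR_WEIGHTS = {
    6: 3, 7: 6, 8: 8, 9: 6, 10: 4, 11: 4, 12: 5, 13: 4,
    14: 3, 15: 3, 16: 3, 17: 2, 18: 2, 19: 1, 20: 1,
}


def get_weighted_time(rng=random):
    """Return a sale time as HH:MM:SS, biased toward the busier hours."""
    hours = list(HOUR_WEIGHTS)
    weights = list(HOUR_WEIGHTS.values())
    hour = rng.choices(hours, weights=weights, k=1)[0]
    minute = rng.randint(0, 59)  # 1-minute increments within the hour
    return f"{hour:02d}:{minute:02d}:00"


if __name__ == "__main__":
    print([get_weighted_time() for _ in range(5)])
```

The same approach could be extended to seasonal effects, for example by keeping one set of product weights per month and passing the sale's month to the product function.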

Here is an example of the synthetic sales data output by the example application:

"transaction_id","date","time","product_id","product","calories","price","type","quantity","amount","payment_type"
"47a157f84e727fe3335db1519ee736a6","06/27/2022","19:59:42",22,"Quiche",456,4.99,"Food",2,9.98,"Debit"
"1bf01013e699ca0f804650ea50826c82","11/20/2022","06:21:14",22,"Quiche",456,4.99,"Food",3,14.97,"Cash"
"84f41c15749090d1e79bf9a48a58d6c3","08/18/2022","11:50:22",14,"Chai Tea",200,3.5,"Drink",2,7.0,"Apple Pay"
"ef1845b8438bf3b5b99d2f4891a48f03","11/13/2022","17:20:51",12,"Lemonade",120,3.0,"Drink",2,6.0,"Debit"
"9863de11be3099d6361392584e30e624","06/03/2022","18:27:03",18,"Muffin",426,3.99,"Food",2,7.98,"Gift card"
"f50ed8878250bc06f66b97f5cd2f6df7","02/21/2022","17:02:18",7,"Hot Chocolate",300,3.5,"Drink",2,7.0,"Credit"
"1903169473f41a0275ee702f2c6b1dd6","05/24/2022","14:58:25",10,"Smoothie",200,4.0,"Drink",3,12.0,"Venmo"
"164a9519fd3db952e721e9f55dc1be74","01/07/2022","14:19:35",14,"Chai Tea",200,3.5,"Drink",2,7.0,"Debit"
"dc85a202143de48ad4646190cdc0bf5c","01/28/2022","08:52:38",20,"Wrap",388,5.99,"Food",1,5.99,"Venmo"
"7d182be793ab4b3290d14968e9c9a3e3","02/20/2022","10:16:10",16,"Croissant",231,2.99,"Food",2,5.98,"Google Pay"
"f524811f456481f5a4a2f0c09dcafa28","10/03/2022","16:19:35",10,"Smoothie",200,4.0,"Drink",3,12.0,"Debit"
"f2f63eacd33cb9f13849b08638e6fc3d","07/06/2022","16:14:20",10,"Smoothie",200,4.0,"Drink",3,12.0,"Cash"
"7435e8cc8771a8b538529ff6f873cc05","04/12/2022","16:35:51",14,"Chai Tea",200,3.5,"Drink",3,10.5,"Venmo"
"0e1154abb6a79b9ece570d51e3116846","11/28/2022","14:44:05",21,"Salad",231,7.99,"Food",2,15.98,"Debit"
"d5926d58011a2a25798bf2aa72552cf0","04/19/2022","16:06:09",22,"Quiche",456,4.99,"Food",1,4.99,"Venmo"

Example #2: Residential Address Data

We could use these same techniques to generate a list of residential addresses. To start, we can prompt Copilot for the values in a list of common street names and street types in the United States:

# Write a function that creates a list of common street names
# in the United States, in alphabetical order.
# List should be in alphabetical order. Each name should be unique.
# Return a random street name.
def get_street_name():
    street_names = [
        "Ash", "Bend", "Bluff", "Branch", "Bridge", "Broadway", "Brook", "Burg",
        "Bury", "Canyon", "Cape", "Cedar", "Cove", "Creek", "Crest", "Crossing",
        "Dale", "Dam", "Divide", "Downs", "Elm", "Estates", "Falls", "Fifth",
        "First", "Fork", "Fourth", "Glen", "Green", "Grove", "Harbor", "Heights",
        "Hickory", "Hill", "Hollow", "Island", "Isle", "Knoll", "Lake", "Landing",
        ...
    ]

    return random.choice(street_names)


# Write a function that creates a list of common street types
# in the United States, in alphabetical order.
# List should be in alphabetical order. Each name should be unique.
# Return a random street type.
def get_street_type():
    street_types = [
        "Alley", "Avenue", "Bend", "Bluff", "Boulevard", "Branch", "Bridge", "Brook",
        "Burg", "Circle", "Commons", "Court", "Drive", "Highway", "Lane", "Parkway",
        "Place", "Road", "Square", "Street", "Terrace", "Trail", "Way"
    ]

    return random.choice(street_types)

Next, we can create a function that returns a property type based on a categorical distribution of common residential property types with the prompt:

# Write a function to return a random property type.
# Accept a random value between 0 and 1 as an input parameter.
# The function must return one of the following values based on the %:
# 63% Single-family, 26% Multi-family, 4% Condo,
# 3% Townhouse, 2% Mobile home, 1% Farm, 1% Other.

Again, Copilot generated 100% of the function’s code as a single block based on the code comments.

Copilot generating a categorical distribution of common residential property types
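For comparison, the same categorical distribution can also be sampled with Python’s built-in random.choices, which accepts relative weights directly. This is an alternative sketch, not the function Copilot generated; the names PROPERTY_TYPES, PROPERTY_WEIGHTS, and get_property_types are assumptions made for this example.

```python
import random
from collections import Counter

# Percentages taken from the prompt above; they sum to 100
PROPERTY_TYPES = [
    "Single-family", "Multi-family", "Condo", "Townhouse",
    "Mobile home", "Farm", "Other",
]
PROPERTY_WEIGHTS = [63, 26, 4, 3, 2, 1, 1]


def get_property_types(count, rng=random):
    """Return `count` property types drawn according to PROPERTY_WEIGHTS."""
    return rng.choices(PROPERTY_TYPES, weights=PROPERTY_WEIGHTS, k=count)


if __name__ == "__main__":
    # Tally a large sample to confirm the proportions look right
    counts = Counter(get_property_types(10_000))
    for property_type, count in counts.most_common():
        print(f"{property_type}: {count / 10_000:.1%}")
```

Tallying a large sample like this is a quick way to check that a generated function actually produces the distribution requested in the prompt.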

Additionally, we could have Copilot help us generate a list of the 50 largest cities in the United States with state, zip code, and population, with the prompt:

# Write a function to return the 50 largest cities in the United States.
# List should be sorted in descending order by population.
# Include the city, state abbreviation, zip code, and population.
# Return a list of dictionaries.

Once again, Copilot generated 100% of the function’s code using a combination of single lines and code blocks based on the code comments.

Copilot generating a block of US cities with state, zip code, and populations

When randomly choosing a city, we can use a categorical distribution based on each city’s share of the total population to control the distribution of cities in the final synthetic dataset. For example, there will be more addresses in larger cities like New York City or Los Angeles than in smaller cities like Buffalo or Virginia Beach. We can prepare the city data with the prompt:

# Write a function that calculates the total population of the list of cities.
# Add a 'pcnt_of_total_population' and 'pcnt_running_total' columns to list.
# Returns a sorted list of cities by population.
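To illustrate the idea behind a population-weighted choice, here is a small standalone sketch that uses a cumulative (running-total) distribution and a binary search. It is a different technique from the running-total loop in the complete program, and the function names build_cumulative and pick_city are assumptions for this example. Only three cities are used, with population figures taken from the city list in this post.

```python
import bisect
import itertools
import random

# Three cities from the post's list, largest first
cities = [
    {"city": "New York", "population": 8405837},
    {"city": "Los Angeles", "population": 3971883},
    {"city": "Buffalo", "population": 258959},
]


def build_cumulative(cities):
    """Return the running total of each city's share of the population."""
    total = sum(city["population"] for city in cities)
    shares = (city["population"] / total for city in cities)
    return list(itertools.accumulate(shares))


def pick_city(cities, cumulative, rnd_value):
    """Map a random value in [0, 1) to a city via the cumulative shares."""
    index = bisect.bisect_right(cumulative, rnd_value)
    # Guard against floating-point rounding at the top of the range
    return cities[min(index, len(cities) - 1)]


if __name__ == "__main__":
    cumulative = build_cumulative(cities)
    print(pick_city(cities, cumulative, random.random())["city"])
```

Because New York holds roughly two-thirds of the combined population in this small sample, about two-thirds of the random values will map to it.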

Here is the complete program that creates synthetic US-based address data with Copilot assistance:

# Purpose: Generate US residential address data
# Author: Gary A. Stafford and GitHub Copilot
# Date: 2023-04-13
# Usage: python3 residential_address_data_gen.py 100
# Command-line argument(s): rec_count (number of records to generate as an integer)

# Write an application that creates a random list of United States addresses.
# The application should accept a command line argument that specifies the number of records to generate.
# Include address, city, state, zip code, country, and property type.
# Write the data to a csv file named 'address_data.csv'.
# The application should contain the following functions:
# - main() function that calls the other functions
# - function that returns a list of common street names in the United States
# - function that returns a list of common street types in the United States
# - function that returns a list of common city, state, zip code, and population in the United States
# - function that returns a property type

import csv
import random
import argparse

cities_final = []


def main():
    parser = argparse.ArgumentParser(description="Generate US residential address data")
    # nargs="?" makes the positional argument optional so the default applies
    parser.add_argument(
        "rec_count", type=int, nargs="?", help="The number of records to generate", default=100
    )

    # add population calculations to the city data
    cities = get_cities()
    prepare_cities(cities)

    rec_count = parser.parse_args().rec_count
    write_data(rec_count)


# Write a function that creates a list of common street names
# in the United States, in alphabetical order.
# Each one should be unique.
# Return a random street name.
def get_street_name():
    street_names = [
        "Ash", "Bend", "Bluff", "Branch", "Bridge", "Broadway", "Brook", "Burg",
        "Bury", "Canyon", "Cape", "Cedar", "Cove", "Creek", "Crest", "Crossing",
        "Dale", "Dam", "Divide", "Downs", "Elm", "Estates", "Falls", "Fifth",
        "First", "Fork", "Fourth", "Glen", "Green", "Grove", "Harbor", "Heights",
        "Hickory", "Hill", "Hollow", "Island", "Isle", "Knoll", "Lake", "Landing",
        "Lawn", "Main", "Manor", "Maple", "Meadow", "Meadows", "Mill", "Mills",
        "Mission", "Mount", "Mountain", "Oak", "Oaks", "Orchard", "Park", "Parkway",
        "Pass", "Path", "Pike", "Pine", "Place", "Plain", "Plains", "Port", "Prairie",
        "Ridge", "River", "Road", "Rock", "Rocks", "Second", "Seventh", "Shoals",
        "Shore", "Shores", "Sixth", "Skyway", "Spring", "Springs", "Spur", "Station",
        "Summit", "Sunset", "Terrace", "Third", "Trace", "Track", "Trail", "Tunnel",
        "Turnpike", "Vale", "Valley", "View", "Village", "Ville", "Vista", "Walk",
        "Way", "Well", "Wells", "Wood", "Woods", "Worth"
    ]

    return random.choice(street_names)


# Write a function that creates a list of common street types
# in the United States, in alphabetical order.
# Each one should be unique.
# Return a random street type.
def get_street_type():
    street_types = [
        "Alley", "Avenue", "Bend", "Bluff", "Boulevard", "Branch", "Bridge", "Brook",
        "Burg", "Circle", "Commons", "Court", "Drive", "Highway", "Lane", "Parkway",
        "Place", "Road", "Square", "Street", "Terrace", "Trail", "Way"
    ]

    return random.choice(street_types)


# Write a function that calculates the total population of the list of cities.
# Add a 'pcnt_of_total_population' and 'pcnt_running_total' columns to list.
# Returns a sorted list of cities by population.
def prepare_cities(cities):
    total_population = 0  # 51,035,885
    for city in cities:
        total_population += city["population"]

    for city in cities:
        city["pcnt_of_total_population"] = city["population"] / total_population

    global cities_final
    cities_final = sorted(cities, key=lambda d: d["population"], reverse=True)

    running_total = 1
    for city in cities_final:
        running_total -= city["pcnt_of_total_population"]
        city["pcnt_running_total"] = running_total


# Write a function to return the 50 largest cities in the United States.
# Include the city, state abbreviation, zip code, and population.
# List should be sorted in descending order by population.
# Return a list of dictionaries.
def get_cities():
    cities = [
        {"city": "Albuquerque", "state": "NM", "zip": "87102", "population": 559277},
        {"city": "Anaheim", "state": "CA", "zip": "92801", "population": 345012},
        {"city": "Anchorage", "state": "AK", "zip": "99501", "population": 291826},
        {"city": "Arlington", "state": "TX", "zip": "76010", "population": 398121},
        {"city": "Atlanta", "state": "GA", "zip": "30303", "population": 486290},
        {"city": "Aurora", "state": "CO", "zip": "80010", "population": 325078},
        {"city": "Austin", "state": "TX", "zip": "78701", "population": 931830},
        {"city": "Bakersfield", "state": "CA", "zip": "93301", "population": 372576},
        {"city": "Baltimore", "state": "MD", "zip": "21202", "population": 602495},
        {"city": "Boston", "state": "MA", "zip": "02108", "population": 667137},
        {"city": "Buffalo", "state": "NY", "zip": "14202", "population": 258959},
        {"city": "Charlotte", "state": "NC", "zip": "28202", "population": 872498},
        {"city": "Chicago", "state": "IL", "zip": "60602", "population": 2695598},
        {"city": "Cincinnati", "state": "OH", "zip": "45202", "population": 296943},
        {"city": "Cleveland", "state": "OH", "zip": "44113", "population": 390113},
        {"city": "Colorado Springs", "state": "CO", "zip": "80903", "population": 456568},
        {"city": "Columbus", "state": "OH", "zip": "43215", "population": 822553},
        {"city": "Dallas", "state": "TX", "zip": "75201", "population": 1345047},
        {"city": "Denver", "state": "CO", "zip": "80202", "population": 682545},
        {"city": "Detroit", "state": "MI", "zip": "48226", "population": 672662},
        {"city": "El Paso", "state": "TX", "zip": "79901", "population": 674433},
        {"city": "Fort Worth", "state": "TX", "zip": "76102", "population": 792727},
        {"city": "Fresno", "state": "CA", "zip": "93721", "population": 509924},
        {"city": "Houston", "state": "TX", "zip": "77002", "population": 2296224},
        {"city": "Indianapolis", "state": "IN", "zip": "46204", "population": 843393},
        {"city": "Jacksonville", "state": "FL", "zip": "32202", "population": 842583},
        {"city": "Kansas City", "state": "MO", "zip": "64102", "population": 467007},
        {"city": "Las Vegas", "state": "NV", "zip": "89101", "population": 603488},
        {"city": "Long Beach", "state": "CA", "zip": "90802", "population": 462257},
        {"city": "Los Angeles", "state": "CA", "zip": "90001", "population": 3971883},
        {"city": "Louisville", "state": "KY", "zip": "40202", "population": 609893},
        {"city": "Memphis", "state": "TN", "zip": "38103", "population": 653450},
        {"city": "Mesa", "state": "AZ", "zip": "85201", "population": 508958},
        {"city": "Miami", "state": "FL", "zip": "33128", "population": 463347},
        {"city": "Milwaukee", "state": "WI", "zip": "53202", "population": 594833},
        {"city": "Minneapolis", "state": "MN", "zip": "55402", "population": 410939},
        {"city": "Nashville", "state": "TN", "zip": "37203", "population": 654610},
        {"city": "New York", "state": "NY", "zip": "10007", "population": 8405837},
        {"city": "Newark", "state": "NJ", "zip": "07102", "population": 281944},
        {"city": "Oakland", "state": "CA", "zip": "94607", "population": 406253},
        {"city": "Oklahoma City", "state": "OK", "zip": "73102", "population": 631346},
        {"city": "Omaha", "state": "NE", "zip": "68102", "population": 434353},
        {"city": "Philadelphia", "state": "PA", "zip": "19107", "population": 1526006},
        {"city": "Phoenix", "state": "AZ", "zip": "85003", "population": 1445632},
        {"city": "Pittsburgh", "state": "PA", "zip": "15222", "population": 305841},
        {"city": "Portland", "state": "OR", "zip": "97201", "population": 609456},
        {"city": "Raleigh", "state": "NC", "zip": "27601", "population": 403892},
        {"city": "Sacramento", "state": "CA", "zip": "95814", "population": 479686},
        {"city": "San Antonio", "state": "TX", "zip": "78205", "population": 1327407},
        {"city": "San Diego", "state": "CA", "zip": "92101", "population": 1307402},
        {"city": "San Francisco", "state": "CA", "zip": "94102", "population": 805235},
        {"city": "San Jose", "state": "CA", "zip": "95113", "population": 998537},
        {"city": "Seattle", "state": "WA", "zip": "98101", "population": 608660},
        {"city": "St. Louis", "state": "MO", "zip": "63102", "population": 319294},
        {"city": "Tampa", "state": "FL", "zip": "33602", "population": 335709},
        {"city": "Tucson", "state": "AZ", "zip": "85701", "population": 520116},
        {"city": "Virginia Beach", "state": "VA", "zip": "23451", "population": 448479},
        {"city": "Washington", "state": "DC", "zip": "20001", "population": 601723},
    ]

    return cities


# write a function to return a random city
# accept a random value between 0 and 1 as an input parameter
def get_city(rnd_value):
    for city in cities_final:
        if rnd_value >= city["pcnt_running_total"]:
            return city


# Write a function to return a random property type.
# Accept a random value between 0 and 1 as an input parameter.
# The function must return one of the following values based on the %:
# 63% Single-family, 26% Multi-family, 4% Condo,
# 3% Townhouse, 2% Mobile home, 1% Farm, 1% Other.
def get_property_type(rnd_value):
    if rnd_value < 0.63:
        return "Single-family"
    elif rnd_value < 0.89:
        return "Multi-family"
    elif rnd_value < 0.93:
        return "Condo"
    elif rnd_value < 0.96:
        return "Townhouse"
    elif rnd_value < 0.98:
        return "Mobile home"
    elif rnd_value < 0.99:
        return "Farm"
    else:
        return "Other"


# Create a function to write the address records to a csv file called 'address_data.csv'.
# Use an input parameter to specify the number of records to write.
# The csv file must have a header row and be comma delimited.
# All string values must be enclosed in double quotes.
def write_data(rec_count):
    address_id = 0

    with open("output/address_data.csv", "w", newline="") as csv_file:
        csv_writer = csv.writer(
            csv_file, delimiter=",", quotechar='"', quoting=csv.QUOTE_NONNUMERIC
        )
        csv_writer.writerow(
            [
                "id",
                "address",
                "city",
                "state",
                "zip",
                "country",
                "property_type",
                "assessed_value",
            ]
        )

        for i in range(rec_count):
            address_id += 1
            street_name = get_street_name()
            street_type = get_street_type()
            street_address = f"{random.randint(1, 9999)} {street_name} {street_type}"
            city_state_zip = get_city(random.random())
            country = "United States"
            property_type = get_property_type(random.random())
            assessed_value = random.randint(52500, 1950000)

            csv_writer.writerow(
                [
                    address_id,
                    street_address,
                    city_state_zip["city"],
                    city_state_zip["state"],
                    city_state_zip["zip"],
                    country,
                    property_type,
                    assessed_value,
                ]
            )


if __name__ == "__main__":
    main()

To make this example more realistic, you could use Copilot’s assistance to write algorithms capable of accurately reflecting assessed property values based on the type of residence and the zip code.
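A minimal sketch of that idea follows. It picks a value range based on the property type and then scales it by a per-zip-code multiplier. Every number in VALUE_RANGES and ZIP_MULTIPLIERS is a hypothetical placeholder rather than real assessment data, and get_assessed_value is an illustrative name that does not appear in the original program.

```python
import random

# Hypothetical (low, high) assessed-value ranges in USD for each property type
VALUE_RANGES = {
    "Single-family": (150_000, 950_000),
    "Multi-family": (300_000, 1_950_000),
    "Condo": (100_000, 700_000),
    "Townhouse": (120_000, 800_000),
    "Mobile home": (52_500, 150_000),
    "Farm": (200_000, 1_500_000),
    "Other": (52_500, 500_000),
}

# Hypothetical multipliers by zip code; zip codes not listed default to 1.0
ZIP_MULTIPLIERS = {"10007": 2.0, "94102": 1.8, "14202": 0.7}


def get_assessed_value(property_type, zip_code, rng=random):
    """Return an assessed value scaled by property type and zip code."""
    low, high = VALUE_RANGES.get(property_type, VALUE_RANGES["Other"])
    multiplier = ZIP_MULTIPLIERS.get(zip_code, 1.0)
    return int(rng.randint(low, high) * multiplier)


if __name__ == "__main__":
    print(get_assessed_value("Condo", "10007"))
    print(get_assessed_value("Single-family", "73102"))
```

In the complete program, a function like this would stand in for the single random.randint(52500, 1950000) call in write_data.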

Here is an example of the synthetic US-based residential address data output by the example application:

"id","address","city","state","zip","country","property_type","assessed_value"
1,"1008 Walk Burg","Houston","TX","77002","United States","Multi-family",1122321
2,"7088 Second Square","Oklahoma City","OK","73102","United States","Single-family",261940
3,"1425 Ridge Terrace","Indianapolis","IN","46204","United States","Single-family",1030391
4,"982 Way Lane","New York","NY","10007","United States","Multi-family",95499
5,"9404 Port Court","Columbus","OH","43215","United States","Single-family",922404
6,"7135 Crossing Trail","Virginia Beach","VA","23451","United States","Single-family",272910
7,"9481 Harbor Brook","New York","NY","10007","United States","Multi-family",232795
8,"8585 Manor Branch","Raleigh","NC","27601","United States","Single-family",701217
9,"7703 Bluff Boulevard","Las Vegas","NV","89101","United States","Single-family",530581
10,"4186 Worth Circle","New York","NY","10007","United States","Townhouse",626577
11,"8739 Prairie Trail","Portland","OR","97201","United States","Single-family",316167
12,"4 Shores Road","Virginia Beach","VA","23451","United States","Single-family",925711
13,"3148 Rocks Road","Mesa","AZ","85201","United States","Single-family",182479
14,"1545 Wells Drive","Chicago","IL","60602","United States","Multi-family",1000777
15,"1699 Ash Brook","Seattle","WA","98101","United States","Single-family",605392

Example #3: Demographic Data

We could use the same techniques again to generate synthetic demographic data. With the assistance of Copilot, we can write functions that randomly return typical feminine or masculine first names (forenames) and common last names (surnames) found in the United States, for this use case.

# Write a function that generates a list of common feminine first names in the United States.
# List should be in alphabetical order.
# Each name should be unique.
# Return random first name.
def get_first_name_feminine():
    first_name_feminine = [
        "Alice", "Amanda", "Amy", "Angela", "Ann", "Anna", "Barbara", "Betty",
        "Brenda", "Carol", "Carolyn", "Catherine", "Christine", "Cynthia", "Deborah", "Debra",
        "Diane", "Donna", "Doris", "Dorothy", "Elizabeth", "Frances", "Gloria", "Heather",
        "Helen", "Janet", "Jennifer", "Jessica", "Joyce", "Julie", "Karen", "Kathleen", "Kimberly",
    ]

    return random.choice(first_name_feminine)

With the assistance of Copilot, we can also write functions that return demographic information, such as age, gender, race, marital status, religion, and political affiliation. Similar to the previous residential address example, we can influence the final synthetic dataset based on categorical distributions of different demographic categories, for instance, with the prompt:

# Write a function that returns a person's marital status.
# Accept a random value between 0 and 1 as an input parameter.
# The function must return one of the following values based on the %:
# 50% Married, 33% Single, 17% Unknown.
def get_marital_status(rnd_value):
    if rnd_value < 0.50:
        return "Married"
    elif rnd_value < 0.83:
        return "Single"
    else:
        return "Unknown"

By altering the categorical distributions, we can quickly alter the resulting synthetic dataset to reflect differing demographic characteristics: an older or younger population, the predominance of a single race, religious affiliation, or marital status, or the ratio of males to females.
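As a minimal sketch of that idea, the thresholds can be moved out of the if/elif chain and into a table of weights, so that reshaping the population means editing data rather than code. The helper name pick_category and the weights dictionary below are hypothetical and are not part of the original program; the sampling itself uses random.choices from Python's standard library.

```python
import random

# Hypothetical weights; edit these to reshape the synthetic population.
MARITAL_STATUS_WEIGHTS = {"Married": 0.50, "Single": 0.33, "Unknown": 0.17}


def pick_category(weights, rng=random):
    """Return one key from `weights`, chosen in proportion to its weight."""
    categories = list(weights)
    return rng.choices(categories, weights=list(weights.values()), k=1)[0]


if __name__ == "__main__":
    rng = random.Random(42)  # seeded so repeated runs give the same sample
    sample = [pick_category(MARITAL_STATUS_WEIGHTS, rng) for _ in range(10_000)]
    for status in MARITAL_STATUS_WEIGHTS:
        # Observed share of each category; should land close to its weight
        print(status, round(sample.count(status) / len(sample), 2))
```

Switching to, say, a younger and mostly single population is then a one-line change to the dictionary.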

Next, we can use a Gaussian distribution (aka normal distribution) to return the year of birth in a bell-shaped curve, given a mean year and a standard deviation, using Python’s random.normalvariate function.

# Write a function that generates a normal distribution of dates of birth
# with a mean year of 1975 and a standard deviation of 10.
# Return random date of birth as a string in the format YYYY-MM-DD
def get_dob():
    day_of_year = random.randint(1, 365)
    year_of_birth = int(random.normalvariate(1975, 10))
    dob = date(year_of_birth, 1, 1) + timedelta(day_of_year - 1)
    dob = dob.strftime("%Y-%m-%d")
    return dob
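A quick way to check that the generated years really follow the requested bell curve is to draw a few thousand samples and compare their mean and standard deviation with the inputs. The sketch below is a parameterized variant of the function above, not the original code: the parameter names are assumptions, and it uses round() rather than int(), because int() truncates and so pulls the average year down by roughly half a year.

```python
import random
import statistics
from datetime import date, timedelta


def get_dob(mean_year=1975, std_dev=10, rng=random):
    """Return a date of birth (YYYY-MM-DD) whose year is normally distributed."""
    year_of_birth = round(rng.normalvariate(mean_year, std_dev))
    day_of_year = rng.randint(1, 365)
    dob = date(year_of_birth, 1, 1) + timedelta(days=day_of_year - 1)
    return dob.strftime("%Y-%m-%d")


if __name__ == "__main__":
    rng = random.Random(2023)  # seeded for repeatable results
    years = [int(get_dob(rng=rng)[:4]) for _ in range(5_000)]
    # Both values should be close to the requested mean and standard deviation
    print("mean year:", round(statistics.mean(years), 1))
    print("std dev:  ", round(statistics.stdev(years), 1))
```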

Here is the complete program that creates synthetic demographic data with Copilot’s assistance:

# Purpose: Generate demographic data
# Author: Gary A. Stafford and GitHub Copilot
# Date: 2023-04-14
# Usage: python3 demographic_data_gen.py 100
# Command-line argument(s): rec_count (number of records to generate as an integer)

# Write an application that creates a file containing demographic data.
# The application should accept a command line argument that specifies the number of records to generate.
# The application should write the demographic data to a file called 'demographic_data.csv'.
# The application should contain the following functions:
# - main() function that calls the other functions
# - function that returns a random first name
# - function that returns a random last name
# - function that returns a random date of birth
# - function that returns a random gender
# - function that returns a random religious affiliation
# - function that returns a random race

import argparse
import csv
import os
import random
from datetime import date, timedelta


def main():
    parser = argparse.ArgumentParser(description="Generate demographic data")
    # nargs="?" makes the positional argument optional, so the default applies
    parser.add_argument(
        "rec_count",
        type=int,
        nargs="?",
        default=100,
        help="The number of records to generate",
    )
    rec_count = parser.parse_args().rec_count
    write_data(rec_count)


# Write a function that generates a list of common feminine first names in the United States.
# List should be in alphabetical order.
# Each name should be unique.
# Return random first name.
def get_first_name_feminine():
    first_name_feminine = [
        "Alice", "Amanda", "Amy", "Angela", "Ann", "Anna", "Barbara", "Betty",
        "Brenda", "Carol", "Carolyn", "Catherine", "Christine", "Cynthia", "Deborah", "Debra",
        "Diane", "Donna", "Doris", "Dorothy", "Elizabeth", "Frances", "Gloria", "Heather",
        "Helen", "Janet", "Jennifer", "Jessica", "Joyce", "Julie", "Karen", "Kathleen", "Kimberly",
        "Laura", "Linda", "Lisa", "Margaret", "Maria", "Marie", "Martha", "Mary", "Melissa",
        "Michelle", "Nancy", "Pamela", "Patricia", "Rebecca", "Ruth", "Sandra", "Sarah", "Sharon",
        "Shirley", "Stephanie", "Susan", "Teresa", "Virginia",
    ]
    return random.choice(first_name_feminine)


# Write a function that generates a list of common masculine first names in the United States.
# List should be in alphabetical order.
# Each name should be unique.
# Return random first name.
def get_first_name_masculine():
    first_names_masculine = [
        "Adams", "Alexander", "Allen", "Anderson", "Bailey", "Baker", "Barnes", "Bell",
        "Bennett", "Brooks", "Brown", "Bryant", "Butler", "Campbell", "Carter", "Clark",
        "Coleman", "Collins", "Cook", "Cooper", "Cox", "Davis", "Edwards", "Evans",
        "Flores", "Foster", "Garcia", "Gonzales", "Gonzalez", "Gray", "Green", "Griffin",
        "Hall", "Harris", "Henderson", "Hernandez", "Hill", "Howard", "Hughes", "Jackson",
        "James", "Jenkins", "Johnson", "Jones", "Kelly", "King", "Lee", "Lewis", "Long",
        "Lopez", "Martin", "Martinez", "Miller", "Mitchell", "Moore", "Morgan", "Morris",
        "Murphy", "Nelson", "Parker", "Patterson", "Perez", "Perry", "Peterson", "Phillips",
        "Powell", "Price", "Ramirez", "Reed", "Richardson", "Rivera", "Roberts", "Robinson",
        "Rodriguez", "Rogers", "Ross", "Russell", "Sanchez", "Sanders", "Scott", "Simmons",
        "Smith", "Stewart", "Taylor", "Thomas", "Thompson", "Torres", "Turner", "Walker",
        "Ward", "Washington", "Watson", "White", "Williams", "Wilson", "Wood", "Wright", "Young",
    ]
    return random.choice(first_names_masculine)


# Write a function that returns a person's gender.
# Return a random gender.
# Accept a random value between 0 and 1 as an input parameter.
# The function must return one of the following values based on the %:
# 53% Male, 40% Female, 6% Other, 1% Transgender
def get_gender(rnd_value):
    if rnd_value < 0.53:
        return "Male"
    elif rnd_value < 0.93:
        return "Female"
    elif rnd_value < 0.99:
        return "Other"
    else:
        return "Transgender"


# Write a function that returns a feminine or masculine first name.
# Return random first name.
# Accept a random value between 0 and 1 as an input parameter.
# The function must return one of the following values based on the %:
# 53% chance of being masculine, 40% chance of being feminine;
# the remaining 7% is split 4% masculine and 3% feminine
def get_first_name(rnd_value):
    if rnd_value < 0.53:
        return get_first_name_masculine()
    elif rnd_value < 0.93:
        return get_first_name_feminine()
    elif rnd_value < 0.97:
        return get_first_name_masculine()
    else:
        return get_first_name_feminine()


# Write a function that generates a list of common last names in the United States.
# List should be in alphabetical order.
# Each name should be unique.
# Return random last name.
def get_last_name():
    last_names = [
        "Adams", "Alexander", "Allen", "Anderson", "Bailey", "Baker", "Barnes", "Bell",
        "Bennett", "Brooks", "Brown", "Bryant", "Butler", "Campbell", "Carter", "Clark",
        "Coleman", "Collins", "Cook", "Cooper", "Cox", "Davis", "Edwards", "Evans", "Flores",
        "Foster", "Garcia", "Gonzales", "Gonzalez", "Gray", "Green", "Griffin", "Hall",
        "Harris", "Henderson", "Hernandez", "Hill", "Howard", "Hughes", "Jackson", "James",
        "Jenkins", "Johnson", "Jones", "Kelly", "King", "Lee", "Lewis", "Long", "Lopez",
        "Martin", "Martinez", "Miller", "Mitchell", "Moore", "Morgan", "Morris", "Murphy",
        "Nelson", "Parker", "Patterson", "Perez", "Perry", "Peterson", "Phillips", "Powell",
        "Price", "Ramirez", "Reed", "Richardson", "Rivera", "Roberts", "Robinson", "Rodriguez",
        "Rogers", "Ross", "Russell", "Sanchez", "Sanders", "Scott", "Simmons", "Smith", "Stewart",
        "Taylor", "Thomas", "Thompson", "Torres", "Turner", "Walker", "Ward", "Washington", "Watson",
        "White", "Williams", "Wilson", "Wood", "Wright", "Young",
    ]
    return random.choice(last_names)


# Write a function that returns a marital status.
# Return random marital status.
# Accept a random value between 0 and 1 as an input parameter.
# The function must return one of the following values based on the %:
# 50% Married, 33% Single, 17% Unknown
def get_marital_status(rnd_value):
    if rnd_value < 0.50:
        return "Married"
    elif rnd_value < 0.83:
        return "Single"
    else:
        return "Unknown"


# Write a function that returns a person's race.
# Return random race.
# Accept a random value between 0 and 1 as an input parameter.
# The function must return one of the following values based on the %:
# 58% White, 19% Hispanic, 12% Black, 6% Asian, 4% Multiracial
def get_race(rnd_value):
    if rnd_value < 0.58:
        return "White"
    elif rnd_value < 0.77:
        return "Hispanic"
    elif rnd_value < 0.89:
        return "Black"
    elif rnd_value < 0.95:
        return "Asian"
    else:
        return "Multiracial"


# Write a function that returns a person's religious affiliation.
# Return random religion.
# Accept a random value between 0 and 1 as an input parameter.
# The function must return one of the following values based on the %:
# 70% Christian, 20% Agnostic, 3% Atheist, 2% Jewish,
# 2% Other, 1% Muslim, 1% Hindu, 1% Buddhist
def get_religion(rnd_value):
    if rnd_value < 0.7:
        return "Christian"
    elif rnd_value < 0.9:
        return "Agnostic"
    elif rnd_value < 0.93:
        return "Atheist"
    elif rnd_value < 0.95:
        return "Jewish"
    elif rnd_value < 0.97:
        return "Other"
    elif rnd_value < 0.98:
        return "Muslim"
    elif rnd_value < 0.99:
        return "Hindu"
    else:
        return "Buddhist"


# Write a function that generates a normal distribution of ages.
# Using normalvariate() from the random module
# with a mean of 40 and a standard deviation of 10.
# Return random age as an integer.
def get_age():
    return int(random.normalvariate(40, 10))


# Write a function that generates a normal distribution of dates of birth
# with a mean year of 1975 and a standard deviation of 10.
# Return random date of birth as a string in the format YYYY-MM-DD
def get_dob():
    day_of_year = random.randint(1, 365)
    year_of_birth = int(random.normalvariate(1975, 10))
    dob = date(year_of_birth, 1, 1) + timedelta(day_of_year - 1)
    dob = dob.strftime("%Y-%m-%d")
    return dob


# Create a function to write the demographic records to a csv file called 'demographic_data.csv'.
# Use an input parameter to specify the number of records to write.
# The csv file must have a header row and be comma delimited.
# String values must be enclosed in double quotes.
def write_data(rec_count):
    # Create the output directory if it does not already exist
    os.makedirs("output", exist_ok=True)
    with open("output/demographic_data.csv", "w", newline="") as csv_file:
        csv_writer = csv.writer(
            csv_file, delimiter=",", quotechar='"', quoting=csv.QUOTE_NONNUMERIC
        )
        csv_writer.writerow(
            [
                "id",
                "first_name",
                "last_name",
                "dob",
                "gender",
                "marital_status",
                "race",
                "religion",
            ]
        )
        for rec_id in range(1, rec_count + 1):
            # Use the same random value for first name and gender so they agree
            rnd_gender = random.random()
            first_name = get_first_name(rnd_gender)
            last_name = get_last_name()
            dob = get_dob()
            gender = get_gender(rnd_gender)
            marital_status = get_marital_status(random.random())
            race = get_race(random.random())
            religion = get_religion(random.random())
            csv_writer.writerow(
                [
                    rec_id,
                    first_name,
                    last_name,
                    dob,
                    gender,
                    marital_status,
                    race,
                    religion,
                ]
            )


if __name__ == "__main__":
    main()

To make this example more realistic, you could use Copilot’s assistance to write algorithms capable of more accurately representing the nuanced associations and correlations between age, gender, race, marital status, religion, and political affiliation.
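One simple way to introduce such a correlation is to make one attribute's categorical distribution depend on another, for example choosing marital status from a table keyed by age band. The sketch below is only an illustration: the function name, the age bands, and every percentage in it are hypothetical and are not drawn from census data or from the original program.

```python
import random

# Hypothetical conditional distributions: P(marital status | age band).
# Each entry is (exclusive upper age bound, weights for that band).
MARITAL_STATUS_BY_AGE = [
    (25, {"Married": 0.10, "Single": 0.80, "Unknown": 0.10}),
    (40, {"Married": 0.50, "Single": 0.40, "Unknown": 0.10}),
    (200, {"Married": 0.65, "Single": 0.25, "Unknown": 0.10}),
]


def get_marital_status_for_age(age, rng=random):
    """Pick a marital status using the weights of the first band containing `age`."""
    for upper_bound, weights in MARITAL_STATUS_BY_AGE:
        if age < upper_bound:
            return rng.choices(list(weights), weights=list(weights.values()), k=1)[0]
    raise ValueError(f"age out of range: {age}")


if __name__ == "__main__":
    rng = random.Random(42)  # seeded for repeatable output
    for age in (20, 35, 60):
        sample = [get_marital_status_for_age(age, rng) for _ in range(5_000)]
        # Share of married records should rise with the age band
        print(age, round(sample.count("Married") / len(sample), 2))
```

The same pattern extends to other pairs of attributes, at the cost of maintaining one weight table per combination.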

Here is an example of the synthetic demographic data output by the example application:

user_id first_name last_name dob gender marital_status race religion
1 Thomas Powell 1967-06-10 Male Married Black Christian
2 Ward Williams 1973-07-22 Male Single Asian Christian
3 Martha Watson 1975-02-28 Female Single Hispanic Agnostic
4 Brenda Bailey 1979-07-07 Female Married Black Christian
5 Parker Johnson 1955-07-14 Male Married White Christian
6 Rebecca Wilson 1972-05-27 Female Married White Christian
7 Doris Allen 1956-07-09 Female Married Multiracial Christian
8 Rebecca Sanchez 1965-09-16 Female Single White Christian
9 Mary Johnson 1971-04-04 Female Single White Christian
10 Anderson Roberts 1983-11-02 Male Single Hispanic Christian
11 Robinson Peterson 1974-10-25 Male Single White Jewish
12 Lopez Ross 1985-04-30 Male Married White Christian
13 Flores Reed 1977-09-01 Male Married Black Agnostic
14 Martin Phillips 1960-03-24 Male Single White Christian
15 Carter Torres 1983-07-31 Male Married White Christian

Generative AI Tools for Unit Testing

In addition to writing code and documentation, a common use of generative AI code assistants like Copilot is writing unit tests. For example, we can create unit tests for each function in our coffee shop sales data code generator, using the same method of prompting with code comments.

Copilot can also assist with writing unit tests
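To give a sense of what such a test might look like, here is a minimal sketch using Python's built-in unittest module. It is written by hand rather than captured from Copilot, and it targets a copy of the marital status function from the demographic example (spelled get_marital_status here) so that the block is self-contained. The boundary values follow from the 50%, 33%, and 17% split.

```python
import unittest


def get_marital_status(rnd_value):
    """Copy of the function under test: 50% Married, 33% Single, 17% Unknown."""
    if rnd_value < 0.50:
        return "Married"
    elif rnd_value < 0.83:
        return "Single"
    else:
        return "Unknown"


class TestGetMaritalStatus(unittest.TestCase):
    def test_threshold_boundaries(self):
        # Each pair is (input value, expected category), chosen around the cut-offs
        cases = [
            (0.00, "Married"),
            (0.49, "Married"),
            (0.50, "Single"),
            (0.82, "Single"),
            (0.83, "Unknown"),
            (0.99, "Unknown"),
        ]
        for rnd_value, expected in cases:
            with self.subTest(rnd_value=rnd_value):
                self.assertEqual(get_marital_status(rnd_value), expected)


if __name__ == "__main__":
    unittest.main()
```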

Conclusion

In this post, we learned how Generative AI could assist us in creating synthetic data for software development, analytics, and machine learning. The examples herein generated data using simple techniques. Using advanced modeling techniques, we could generate increasingly complex, realistic synthetic data.

To learn about other ways Generative AI can be used to assist in writing code, please read my previous article, Ten Ways to Leverage Generative AI for Development on AWS.

🔔 To keep up with future content, follow Gary Stafford on LinkedIn.


This blog represents my viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Navigating the World of Generative AI: A Guide to Essential Terminology

Learn the essential terms and concepts you need to know to navigate the rapidly evolving world of generative AI

Licensed image: MMD Creative/Shutterstock.com

Generative AI is a fascinating and rapidly evolving field that has the potential to transform the way we interact with technology. However, with so much buzz and hype surrounding this topic, making sense of it all can be challenging. In this article, we’ll cut through the noise and gain a clear understanding of the essential terminology you need to know to navigate the world of generative AI.

According to a variety of sources, including McKinsey & Company and Vox Media, the critical difference between generative AI and other emerging technologies is that millions of people can use it, and already do, to create new content, such as text, photos, video, code, and 3D renderings, from data it is trained on. Recent breakthroughs in the field have the potential to drastically change the way we approach content creation. This has led to widespread excitement and some understandable apprehension about the potential for generative AI to impact virtually every aspect of society and disrupt industries, including media and entertainment, healthcare and life sciences, education, advertising, legal services, and finance.

Even if your current role is not in technology, it is highly likely that generative AI will have a direct impact on both your personal and professional life. Familiarizing yourself with basic terminology related to generative AI can help you better comprehend the discussions on social media and in the news.

Terminology

Let’s explore the following terminology (in alphabetical order):

Below is a knowledge graph, created with OpenAI ChatGPT, showing the approximate relationships between the post’s terms.

Knowledge graph showing relationships between the post’s terminology (image by author)

Artificial General Intelligence (AGI)

According to the all-new Bing Chat, based on ChatGPT, artificial general intelligence (AGI) is the ability of an intelligent agent to understand or learn any intellectual task that human beings or other animals can. It is a primary goal of some artificial intelligence research and a common topic in science fiction and Futurism. According to Forbes, which also prompted ChatGPT, Artificial General Intelligence (AGI) refers to a theoretical type of artificial intelligence that possesses human-like cognitive abilities, such as the ability to learn, reason, solve problems, and communicate in natural language.

Eliezer Yudkowsky is an American researcher, writer, and philosopher on the topic of AI. The podcast Eliezer Yudkowsky: Dangers of AI and the End of Human Civilization, by prominent MIT Research Scientist Lex Fridman, explores various aspects of artificial general intelligence against the backdrop of the recent release of OpenAI’s GPT-4.

Artificial Intelligence (AI)

According to the Brookings Institute, AI is generally thought to refer to machines that respond to stimulation consistent with traditional responses from humans, given the human capacity for contemplation, judgment, and intention. Similarly, according to the U.S. Department of State, the term artificial intelligence refers to a machine-based system that can, for a given set of human-defined objectives, make predictions, recommendations, or decisions influencing real or virtual environments.

Relationship between AI, ML, and DL (image by author)

ChatGPT

ChatGPT, according to ChatGPT, is a large language model developed by OpenAI. It was trained on a massive dataset of human-written text using a deep neural network (DNN) architecture called GPT (Generative Pre-trained Transformer). Its purpose is to generate human-like responses to questions and prompts, engage in conversations, and perform various language-related tasks. It is a virtual assistant capable of understanding and generating natural language.

DALL·E

According to Wikipedia, DALL·E is a deep learning model developed by OpenAI to generate digital images from natural language descriptions, called prompts. DALL·E is a portmanteau of the names of the animated robot Pixar character WALL-E and the Spanish surrealist artist Salvador Dalí. According to OpenAI, DALL·E is an AI system that can create realistic images and art from a description in natural language. OpenAI introduced DALL·E in January 2021. A little over a year later, in April 2022, they announced their newest system, DALL·E 2, which generates more realistic and accurate images with 4x greater resolution. DALL·E 2 can create original, realistic images and art from a text description. It can combine concepts, attributes, and styles.

Deep Learning (DL)

According to IBM, deep learning is a subset of machine learning (ML), which is essentially a neural network with three or more layers. These neural networks attempt to simulate the behavior of the human brain — albeit far from matching its ability — allowing it to “learn” from large amounts of data. While a neural network with a single layer can still make approximate predictions, additional hidden layers can help to optimize and refine for accuracy.

Generative AI

According to Wikipedia, generative artificial intelligence (AI), aka generative AI, is a type of AI system capable of generating text, images, or other media in response to prompts. Generative AI systems use generative models such as large language models (LLMs) to statistically sample new data based on the training data set used to create them.

Generative Pre-trained Transformer (GPT)

According to ChatGPT, Generative Pre-trained Transformer (GPT) is a deep learning architecture used for natural language processing (NLP) tasks, such as text generation, summarization, and question-answering. It uses a transformer neural network architecture with a self-attention mechanism, allowing the model to understand each word’s context in a sentence or text. The success of GPT models lies in their ability to generate natural-sounding and coherent text similar to human-written language. The term “pre-trained” refers to the fact that the model is trained on large amounts of unlabeled text data, such as books or web pages, to learn general language patterns and features, before being fine-tuned on smaller labeled datasets for specific tasks.

According to ZDNET, GPT-4, announced on March 14, 2023, is the newest version of OpenAI’s language model systems. Its previous version, GPT-3.5, powered the company’s wildly popular ChatGPT chatbot when it launched in November 2022. According to OpenAI, GPT-4 is the latest milestone in OpenAI’s effort to scale up deep learning. GPT-4 is a large multimodal model (accepting image and text inputs, emitting text outputs) that, while less capable than humans in many real-world scenarios, exhibits human-level performance on various professional and academic benchmarks.

Intelligence Amplification

According to Wikipedia, intelligence amplification (IA) (aka cognitive augmentation, machine-augmented intelligence, or enhanced intelligence) refers to the effective use of information technology in augmenting human intelligence. Similarly, Harvard Business Review describes intelligence amplification as the use of technology to augment human intelligence. And a paradigm shift is on the horizon, where new devices will offer less intrusive, more intuitive ways to amplify our intelligence.

In his latest book, Impromptu: Amplifying Our Humanity Through AI, co-authored with GPT-4, Greylock general partner Reid Hoffman discusses the subject of intelligence amplification and AI’s ability to amplify human ability. The topic was also explored in Hoffman’s interview with OpenAI CEO Sam Altman on Greylock’s podcast series AI Field Notes.

Large Language Model (LLM)

According to Wikipedia, a large language model (LLM) is a language model consisting of a neural network with many parameters (typically billions of weights or more), trained on large quantities of unlabelled text using self-supervised learning. Though the term large language model has no formal definition, it often refers to deep learning models having a parameter count on the order of billions or more.

Machine Learning (ML)

According to MIT, machine learning (ML) is a subfield of artificial intelligence, which is broadly defined as the capability of a machine to imitate intelligent human behavior. Artificial intelligence systems are used to perform complex tasks in a way that is similar to how humans solve problems. The function of a machine learning system can be descriptive, meaning that the system uses the data to explain what happened; predictive, meaning the system uses the data to predict what will happen; or prescriptive, meaning the system will use the data to make suggestions about what action to take.

Neural Network

According to MathWorks, a neural network (aka artificial neural network or ANN) is an adaptive system that learns by using interconnected nodes or neurons in a layered structure that resembles a human brain. A neural network can learn from data to be trained to recognize patterns, classify data, and forecast future events. Similarly, according to AWS, a neural network is a method in artificial intelligence that teaches computers to process data in a way that is inspired by the human brain. It is a type of machine learning process called deep learning that uses interconnected nodes or neurons in a layered structure that resembles the human brain. It creates an adaptive system that computers use to learn from their mistakes and improve continuously.

Types of Neural Networks

Deep neural networks (DNNs) extend conventional artificial neural networks (ANNs) with additional layers. While a conventional ANN has one or two hidden layers to process data, a DNN contains multiple hidden layers between the input and output layers. Convolutional neural networks (CNNs) are one kind of DNN. CNNs have a convolution layer, which uses filters to convolve an area of input data into a smaller area, detecting important or specific parts within the area. Recurrent neural networks (RNNs) can also be considered a type of DNN: they can have multiple layers and are used to process sequential data.
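To make the idea of a filter concrete, here is a minimal one-dimensional sketch in plain Python. The function name, the input signal, and the filter values are all made up for illustration. Like most deep learning libraries, it slides the filter across the input without flipping it, which is strictly a cross-correlation; a real CNN learns its filter values during training and typically works on two-dimensional image data.

```python
def convolve_1d(signal, kernel):
    """Slide `kernel` across `signal` (no padding, stride 1) and return each dot product."""
    width = len(kernel)
    return [
        sum(signal[i + j] * kernel[j] for j in range(width))
        for i in range(len(signal) - width + 1)
    ]


# A flat signal with a raised section in the middle
signal = [0, 0, 1, 1, 1, 0, 0]
# A two-value filter that responds to changes between neighbouring inputs
edge_filter = [-1, 1]

print(convolve_1d(signal, edge_filter))  # prints [0, 1, 0, 0, -1, 0]
```

The output is shorter than the input (six values from seven) and is non-zero only where the signal rises or falls, which is the sense in which a filter detects specific parts of its input.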

Relationship between types of neural networks (image by author)

OpenAI

OpenAI is a San Francisco-based AI research and deployment company whose mission is to “ensure that artificial general intelligence benefits all of humanity.” According to Wikipedia, OpenAI was founded in 2015 by current CEO Sam Altman, Greylock general partner Reid Hoffman, Y Combinator founding partner Jessica Livingston, Elon Musk, Ilya Sutskever, Peter Thiel, and others. OpenAI’s current products include GPT-4, DALL·E 2, Whisper, ChatGPT, and OpenAI Codex.

Relationship of OpenAI to adjacent concepts (image by author)

Prompt Engineering

According to Cohere, prompting (aka prompt engineering) is at the heart of working with LLMs. The prompt provides context for the text we want the model to generate. The prompts we create can be anything from simple instructions to more complex pieces of text, and they are used to encourage the model to produce a specific type of output. Cohere’s Generative AI with Cohere blog post series is an excellent resource on the topic of Prompting. Similarly, according to Dataconomy, using prompts to get the desired result from an AI tool is known as AI prompt engineering. A prompt can be a statement or a block of code, but it can also just be a string of words. Similar to how you may prompt a person as a starting point for writing an essay, you can use prompts to teach an AI model to produce the desired results when given a specific task.

Reinforcement Learning with Human Feedback (RLHF)

According to Scale AI in their blog, Why is ChatGPT so good?, instead of simply predicting the next word(s), large language models (LLMs) can now follow human instructions and provide useful responses. These advancements are made possible by fine-tuning them with specialized instruction datasets and a technique called reinforcement learning with human feedback (RLHF). Similarly, according to Hugging Face, RLHF (aka RL from human preferences) uses methods from reinforcement learning to directly optimize LLMs with human feedback. RLHF has enabled language models to begin to align a model trained on a general corpus of text data to that of complex human values.

Ready for More?

Mastered all the terminology, ready for more? Here are some additional generative AI terms for you to learn:

🔔 To keep up with future content, follow Gary Stafford on LinkedIn.


This blog represents my viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Ten Ways to Leverage Generative AI for Development on AWS

Explore ten ways you can use Generative AI coding tools to accelerate development and increase your productivity on AWS

Licensed image: PopTika/Shutterstock.com

Generative AI coding tools are a new class of software development tools that leverage machine learning algorithms to assist developers in writing code. These tools use AI models trained on vast amounts of code to offer suggestions for completing code snippets, writing functions, and even entire blocks of code.

Quote generated by OpenAI ChatGPT

Introduction

Combining the latest Generative AI coding tools with a feature-rich and extensible IDE and your coding skills will accelerate development and increase your productivity. In this post, we will look at ten examples of how you can use Generative AI coding tools on AWS:

  1. Application Development: Code, unit tests, and documentation
  2. Infrastructure as Code (IaC): AWS CloudFormation, AWS CDK, Terraform, and Ansible
  3. AWS Lambda: Serverless, event-driven functions
  4. IAM Policies: AWS IAM policies and Amazon S3 bucket policies
  5. Structured Query Language (SQL): Amazon RDS, Amazon Redshift, Amazon Athena, and Amazon EMR
  6. Big Data: Apache Spark and Flink on Amazon EMR, AWS Glue, and Kinesis Data Analytics
  7. Configuration and Properties files: Amazon MSK, Amazon EMR, and Amazon OpenSearch
  8. Apache Airflow DAGs: Amazon MWAA
  9. Containerization: Kubernetes resources, Helm Charts, Dockerfiles for Amazon EKS
  10. Utility Scripts: PowerShell, Bash, Shell, and Python

Choosing a Generative AI Coding Tool

In my recent post, Accelerating Development with Generative AI-Powered Coding Tools, I reviewed six popular tools: ChatGPT, Copilot, CodeWhisperer, Tabnine, Bing, and ChatSonic.

For this post, we will use GitHub Copilot, powered by OpenAI Codex, a new AI system created by OpenAI. Copilot suggests code and entire functions in real-time, right from your IDE. Copilot is trained on all languages that appear in GitHub’s public repositories. GitHub points out that the quality of suggestions you receive may depend on the volume and diversity of training data for that language. Similar tools in this category are limited in the number of languages they support compared to Copilot.

GitHub Copilot, your AI pair programmer

Copilot is currently available as an extension for Visual Studio Code, Visual Studio, Neovim, and the JetBrains suite of IDEs. The GitHub Copilot extension for Visual Studio Code (VS Code) already has 4.8 million downloads, and the GitHub Copilot Nightly extension, used for this post, has almost 280,000 downloads. I am also using the GitHub Copilot Labs extension in this post.

GitHub Copilot Nightly VS Code extensions used in this post

Ten Ways to Leverage Generative AI

Take a look at ten examples of how you can use Generative AI coding tools to increase your development productivity on AWS. All the code samples in this post can be found on GitHub.

1. Application Development

According to GitHub, trained on billions of lines of code, GitHub Copilot turns natural language prompts into coding suggestions across dozens of languages. These features make Copilot ideal for developing applications, writing unit tests, and authoring documentation. You can use GitHub Copilot to assist with writing software applications in nearly any popular language, including Go.

Code generation of a Go application using GitHub Copilot

The final application, which uses the AWS SDK for Go to create an Amazon DynamoDB table, shown below, was formatted using the Go extension by Google and optimized using the ‘Readable,’ ‘Make Robust,’ and ‘Fix Bug’ GitHub Code Brushes.

The final Go application ran in the VS Code terminal

Generating Unit Tests

For JavaScript and TypeScript, you can take advantage of TestPilot to generate unit tests based on your existing code and documentation. TestPilot, part of GitHub Copilot Labs, uses GitHub Copilot’s AI technology.

Generating unit tests with GitHub TestPilot, part of Copilot

2. Infrastructure as Code (IaC)

Widely used Infrastructure as Code (IaC) tools include Pulumi, AWS CloudFormation, Azure ARM Templates, Google Deployment Manager, AWS Cloud Development Kit (AWS CDK), Microsoft Bicep, and Ansible. Many IaC tools, with AWS CDK as a notable exception, use JSON- or YAML-based domain-specific languages (DSLs).

AWS CloudFormation
AWS CloudFormation is an Infrastructure as Code (IaC) service that allows you to easily model, provision, and manage AWS and third-party resources. The CloudFormation template is a JSON or YAML formatted text file. You can use GitHub Copilot to assist with writing IaC, including AWS CloudFormation in either JSON or YAML.
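For readers who have not seen one, the sketch below shows the general shape of a small JSON template, built as a Python dictionary and printed with the standard json module. It is not the template generated in this post. The logical name ExampleBucket and the description are hypothetical, and the template declares a single versioned Amazon S3 bucket with its name as an output.

```python
import json

# A minimal CloudFormation template expressed as a Python dictionary.
# "ExampleBucket" is a hypothetical logical resource name.
template = {
    "AWSTemplateFormatVersion": "2010-09-09",
    "Description": "Minimal sketch: a single versioned S3 bucket",
    "Resources": {
        "ExampleBucket": {
            "Type": "AWS::S3::Bucket",
            "Properties": {
                "VersioningConfiguration": {"Status": "Enabled"},
            },
        }
    },
    "Outputs": {
        # Ref on an S3 bucket resource resolves to the bucket name
        "BucketName": {"Value": {"Ref": "ExampleBucket"}},
    },
}

if __name__ == "__main__":
    print(json.dumps(template, indent=2))
```

The same structure can be written directly in YAML; only the serialization differs.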

Code generation of an AWS CloudFormation template using GitHub Copilot

You can use the YAML Language Support by Red Hat extension to write YAML in VS Code.

Final YAML-based AWS CloudFormation template using GitHub Copilot

VS Code has native JSON support with JSON Schema Store, which includes AWS CloudFormation. VS Code uses the CloudFormation schema for IntelliSense and to flag schema errors in templates.

Final JSON-based AWS CloudFormation template using GitHub Copilot

HashiCorp Terraform

In addition to AWS CloudFormation, HashiCorp Terraform is an extremely popular IaC tool. According to HashiCorp, Terraform lets you define resources and infrastructure in human-readable, declarative configuration files and manages your infrastructure’s lifecycle. Using Terraform has several advantages over manually managing your infrastructure.

Terraform plugins called providers let Terraform interact with cloud platforms and other services via their application programming interfaces (APIs). You can use the AWS Provider to interact with the many resources supported by AWS.

Code generation of a HashiCorp Terraform file using GitHub Copilot

3. AWS Lambda

Lambda, according to AWS, is a serverless, event-driven compute service that lets you run code for virtually any application or backend service without provisioning or managing servers. You can trigger Lambda from over 200 AWS services and software as a service (SaaS) applications and only pay for what you use. AWS Lambda natively supports Java, Go, PowerShell, Node.js, C#, Python, and Ruby. AWS Lambda also provides a Runtime API allowing you to use additional programming languages to author your functions.

You can use GitHub Copilot to assist with writing AWS Lambda functions in any of the natively supported languages. You can further optimize the resulting Lambda code with GitHub’s Code Brushes.
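For readers new to Lambda, the following is a minimal sketch of what a Python-based function looks like; it is not the function from the screenshots. The handler name lambda_handler is a common convention that you configure on the function, the "name" field in the event is hypothetical, and the statusCode/body response shape assumes the function sits behind an HTTP-style integration.

```python
import json


def lambda_handler(event, context):
    """Entry point that Lambda invokes with the event payload and a context object."""
    # "name" is a hypothetical field in the incoming event, used for illustration
    name = event.get("name", "world")
    return {
        "statusCode": 200,
        "body": json.dumps({"message": f"Hello, {name}!"}),
    }
```

Because the handler is a plain function, you can call it locally with a sample event, for example lambda_handler({"name": "AWS"}, None), before deploying it.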

Code generation of a Python-based AWS Lambda using GitHub Copilot

The final Python-based AWS Lambda, below, was formatted using the Black Formatter and Flake8 extensions and optimized using the ‘Readable,’ ‘Debug,’ ‘Make Robust,’ and ‘Fix Bug’ GitHub Code Brushes.

Final AWS Python-based Lambda using GitHub Copilot and optimized with Code Brushes

You can easily convert the Python-based AWS Lambda to Java using GitHub Copilot Labs’ ability to translate code between languages. Install the GitHub Copilot Labs extension for VS Code to try out language translation.

Final AWS Java-based Lambda using GitHub Copilot converted from Python by Copilot

4. IAM Policies

AWS Identity and Access Management (AWS IAM) is a web service that helps you securely control access to AWS resources. According to AWS, you manage access in AWS by creating policies and attaching them to IAM identities (users, groups of users, or roles) or AWS resources. A policy is an object in AWS that, when associated with an identity or resource, defines their permissions. IAM policies are stored on AWS as JSON documents. You can use GitHub Copilot to assist in writing IAM Policies.
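As a rough illustration of the JSON structure described above, this sketch assembles a small identity-based policy in Python and serializes it. The function name, the Sid, and the choice of read-only DynamoDB actions are assumptions for the example, and the ARN you pass in would be your own table’s ARN.

```python
import json


def read_only_table_policy(table_arn: str) -> str:
    """Return a JSON policy document allowing read-only access to one table."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Hypothetical statement ID, used only to label this statement
                "Sid": "ReadOnlyTableAccess",
                "Effect": "Allow",
                "Action": ["dynamodb:GetItem", "dynamodb:Query"],
                "Resource": table_arn,
            }
        ],
    }
    return json.dumps(policy, indent=2)
```

Each statement pairs an Effect with the Actions it covers and the Resource it applies to; keeping the action list short is one way to follow the principle of least privilege.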

Code generation of an AWS IAM Policy using GitHub Copilot

The final AWS IAM Policy, below, was formatted using VS Code’s built-in JSON support.

Final AWS IAM Policy using GitHub Copilot

5. Structured Query Language (SQL)

SQL has many use cases on AWS, including Amazon Relational Database Service (RDS) for MySQL, PostgreSQL, MariaDB, Oracle, and SQL Server databases. SQL is also used with Amazon Aurora, Amazon Redshift, Amazon Athena, Apache Presto, Trino (PrestoSQL), and Apache Hive on Amazon EMR.

You can use IDEs like VS Code with its SQL dialect-specific language support and formatter extensions. You can further optimize the resulting SQL statements with GitHub’s Code Brushes.

Code generation of a PostgreSQL script using GitHub Copilot

The final PostgreSQL script, below, was formatted using the Sql Formatter extension and optimized using the ‘Readable’ and ‘Fix Bug’ GitHub Code Brushes.

PostgreSQL script generated with GitHub Copilot and optimized with Code Brushes

6. Big Data

Big Data, according to AWS, can be described in terms of data management challenges that — due to increasing volume, velocity, and variety of data — cannot be solved with traditional databases. AWS offers managed versions of Apache Spark, Apache Flink, Apache Zeppelin, and Jupyter Notebooks on Amazon EMR, AWS Glue, and Amazon Kinesis Data Analytics (KDA).

Apache Spark
According to their website, Apache Spark is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters. Spark jobs can be written in various languages, including Python (PySpark), SQL, Scala, Java, and R. Apache Spark is available on a growing number of AWS services, including Amazon EMR and AWS Glue.

Code generation of a Python-based Apache Spark job using GitHub Copilot

The final Python-based Apache Spark job, below, was formatted using the Black Formatter extension and optimized using the ‘Readable,’ ‘Document,’ ‘Make Robust,’ and ‘Fix Bug’ GitHub Code Brushes.

Final Python-based Apache Spark job using GitHub Copilot

7. Configuration and Properties Files

According to TechTarget, a configuration file (aka config) defines the parameters, options, settings, and preferences applied to operating systems, infrastructure devices, and applications. There are many examples of configuration and properties files on AWS, including Amazon MSK Connect (Kafka Connect Source/Sink Connectors), Amazon OpenSearch (Filebeat, Logstash), and Amazon EMR (Apache Log4j, Hive, and Spark).

Kafka Connect
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. AWS offers a fully-managed version of Kafka Connect: Amazon MSK Connect. You can use GitHub Copilot to write Kafka Connect Source and Sink Connectors with Kafka Connect and Amazon MSK Connect.

Code generation of a Kafka Connect Source Connector using GitHub Copilot

The final Kafka Connect Source Connector, below, was formatted using VS Code’s built-in JSON support. It incorporates the Debezium connector for MySQL, Avro file format, schema registry, and message transformation. Debezium is a popular open source distributed platform for performing change data capture (CDC) with Kafka Connect.

Final Kafka Connect Source Connector using GitHub Copilot

8. Apache Airflow DAGs

Apache Airflow is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows. Airflow’s extensible Python framework enables you to build workflows connecting with virtually any technology. DAG (Directed Acyclic Graph) is the core concept of Airflow, collecting Tasks together, organized with dependencies and relationships to say how they should run.

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow. You can use GitHub Copilot to assist in writing DAGs for Apache Airflow, to be used with Amazon MWAA.
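The idea behind a DAG, tasks ordered by their dependencies, can be sketched without Airflow at all. The example below uses Python’s standard-library graphlib module (Python 3.9+) rather than the Airflow API, and the task names are hypothetical; it only shows how dependencies determine a valid run order.

```python
from graphlib import TopologicalSorter

# Hypothetical task names; each key maps to the set of tasks it depends on
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "notify": {"load"},
}


def execution_order(deps):
    """Return one valid run order in which every task follows its dependencies."""
    return list(TopologicalSorter(deps).static_order())
```

If the dependencies contained a cycle, static_order() would raise graphlib.CycleError, which reflects why workflow graphs must be acyclic.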

Code generation of an Airflow DAG using GitHub Copilot

The final Airflow DAG, below, was formatted using the Black Formatter extension. Unfortunately, based on my testing, code optimization with GitHub’s Code Brushes does not work with Airflow DAGs.

Final Airflow DAG using GitHub Copilot

9. Containerization

According to Check Point Software, Containerization is a type of virtualization in which all the components of an application are bundled into a single container image and can be run in isolated user space on the same shared operating system. Containers are lightweight, portable, and highly conducive to automation. AWS describes containerization as a software deployment process that bundles an application’s code with all the files and libraries it needs to run on any infrastructure.

AWS has several container services, including Amazon Elastic Container Service (Amazon ECS), Amazon Elastic Kubernetes Service (Amazon EKS), Amazon Elastic Container Registry (Amazon ECR), and AWS Fargate. Several code-based resources can benefit from a Generative AI coding tool like GitHub Copilot, including Dockerfiles, Kubernetes resources, Helm Charts, Weaveworks Flux, and ArgoCD configuration.

Kubernetes

Kubernetes objects are represented in the Kubernetes API and expressed in YAML format. Below is a Kubernetes Deployment resource file, which creates a ReplicaSet to bring up multiple replicas of nginx Pods.
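To show the structure of such a Deployment in text form, here is a minimal sketch that builds the object as a Python dictionary. The resource name, label values, image tag, and replica count are placeholders chosen for illustration; serializing the result with json.dumps gives a JSON manifest, which Kubernetes accepts alongside YAML.

```python
import json


def nginx_deployment(replicas: int = 3) -> dict:
    """Build a Deployment object that runs the given number of nginx Pods."""
    labels = {"app": "nginx"}  # the selector must match the Pod template labels
    return {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": "nginx-deployment", "labels": labels},
        "spec": {
            "replicas": replicas,
            "selector": {"matchLabels": labels},
            "template": {
                "metadata": {"labels": labels},
                "spec": {
                    "containers": [
                        {
                            "name": "nginx",
                            "image": "nginx:1.25",  # placeholder image tag
                            "ports": [{"containerPort": 80}],
                        }
                    ]
                },
            },
        },
    }
```

The Deployment’s selector tells the ReplicaSet which Pods it owns, so it has to match the labels in the Pod template.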

Code generation of Kubernetes resources using GitHub Copilot

The final Kubernetes resource file below contains Deployment and Service resources. In addition to GitHub Copilot, you can use Microsoft’s Kubernetes extension for VS Code to use IntelliSense and flag schema errors in the file.

Final Kubernetes resource file using GitHub Copilot

10. Utility Scripts

According to Bing AI — Search, utility scripts are small, simple snippets of code written as independent code files designed to perform a particular task. Utility scripts are commonly written in Bash, Shell, Python, Ruby, PowerShell, and PHP.

AWS utility scripts leverage the AWS Command Line Interface (AWS CLI) for Bash and Shell and AWS SDK for other programming languages. SDKs take the complexity out of coding by providing language-specific APIs for AWS services. For example, Boto3, AWS’s Python SDK, easily integrates your Python application, library, or script with AWS services, including Amazon S3, Amazon EC2, Amazon DynamoDB, and more.

Code generation of Python-based utility script using GitHub Copilot

An example of a Python script to calculate the total size of an Amazon S3 bucket, below, was inspired by 100daysofdevops/N-days-of-automation, a fantastic set of open source AWS-oriented automation scripts.
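A minimal sketch of that kind of script might look like the following; it is not the script from the screenshot. The function names are hypothetical, and boto3 is imported lazily so the summing logic can be tried without AWS credentials.

```python
def total_size_bytes(pages):
    """Sum the Size of every object across list_objects_v2 result pages."""
    return sum(
        obj["Size"]
        for page in pages
        for obj in page.get("Contents", [])  # pages without objects omit "Contents"
    )


def bucket_pages(bucket_name):
    """Yield list_objects_v2 pages for a bucket (needs boto3 and AWS credentials)."""
    import boto3  # imported here so total_size_bytes can be used without boto3

    paginator = boto3.client("s3").get_paginator("list_objects_v2")
    yield from paginator.paginate(Bucket=bucket_name)
```

With credentials configured, total_size_bytes(bucket_pages("my-example-bucket")) would return the bucket’s total size in bytes, where the bucket name is a placeholder. Using a paginator matters because a single list call returns at most 1,000 objects.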

Final Python-based utility script using GitHub Copilot

Conclusion

In this post, you learned ten ways to leverage Generative AI coding tools like GitHub Copilot for development on AWS. You saw how combining the latest generation of Generative AI coding tools, a mature and extensible IDE, and your coding experience will accelerate development, increase productivity, and reduce cost.

🔔 To keep up with future content, follow Gary Stafford on LinkedIn.


This blog represents my viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Accelerate Software Development with Six Popular Generative AI-Powered Coding Tools

Explore six popular generative AI-powered tools, including ChatGPT, Copilot, CodeWhisperer, Tabnine, Bing, and ChatSonic

Licensed image: Phonlamai Photo/Shutterstock.com

Introduction

Modern software systems continue to grow inherently more complex over time. We have evolved from bulky monoliths to loosely coupled, event-driven, fault-tolerant, stateless, serverless, cloud-native, real-time, microservices-based, API-first, continuously deployed distributed systems, festooned with CQRS, 2PC, DDD, EDA, DOMA, Sagas, BFFs, GraphQL, gRPC, micro-frontends, contract tests, and Hexagonal architectures. Generative AI-powered coding tools do not necessarily make a developer’s job easier — they assist developers in dealing with increasing system complexity.

This post examines six popular generative AI-powered coding tools, including chat-based OpenAI ChatGPT, Microsoft’s all-new Bing Chat, and ChatSonic, as well as IDE-based Tabnine, GitHub Copilot, and Amazon CodeWhisperer (Preview). In this post, each tool will assist with developing an identical program to complete a series of common tasks on AWS. We will then compare and contrast each tool’s ease of use and the resulting code accuracy and quality.

“Generative AI coding tools are a new class of software development tools that leverage machine learning algorithms to assist developers in writing code. These tools use AI models trained on vast amounts of code to offer suggestions for completing code snippets, writing functions, and even entire blocks of code.” (quote generated by ChatGPT)

The generative AI space is evolving at a breakneck pace. Tools continue to rapidly improve their AI models, add new features, and adjust pricing. In just the short time it took to research and write this article:

Generative AI

According to McKinsey & Company in their recent article, What is generative AI?, “Generative artificial intelligence (AI) describes algorithms (such as ChatGPT) that can be used to create new content, including audio, code, images, text, simulations, and videos. Recent new breakthroughs in the field have the potential to drastically change the way we approach content creation.”

Generative AI for Code Generation

According to Papers with Code, “Code Generation is an important field to predict explicit code or program structure from multimodal data sources such as incomplete code, programs in another programming language, natural language descriptions or execution examples. Code Generation tools can assist the development of automatic programming tools to improve programming productivity.” Similarly, according to MarkTechPost, “Generative AI technologies have led to a surge of interest and progress in code generation applications. These technologies use machine learning algorithms and natural language processing to assist developers in automating the time-consuming and laborious portions of coding.”

Common Features

Standard features of leading generative AI-powered coding tools include the following:

  • Whole-line, full-function, and block code completion
  • Natural language to code completion
  • Code suggestions based on the model’s pre-trained dataset
  • Context-aware recommendations based on your existing code
  • Context-aware recommendations based on your code comments
  • Native integration with popular IDEs
  • Multi-language coding support
  • Respond to follow-up instructions (resulting in refinement of code)

Benefits

The benefits of generative AI-powered code generation include the following:

  • Accelerate application development
  • Improve development productivity
  • Decrease development costs
  • Produce higher-quality, more consistent code
  • Enable better code documentation and test coverage
  • Learn new programming languages and coding techniques
  • Contribute to the democratization of programming

Methods of Code Generation

As you explore different generative AI tools for writing code, you will likely develop techniques for generating effective code based on the tool.

Long-form Narrative Approach

With chat-based tools like OpenAI ChatGPT, Microsoft Bing Chat, and ChatSonic, developers might use a longer narrative-type instruction (aka one-shot approach) rather than a series of more concise instructions to generate the code. For example:

  • Write a Python script to create a new DynamoDB table with a partition key of username, a sort key of last_name, and provisioned throughput of 5 RCUs and 5 WCUs. Pass the table name into the function as a variable. Wait until the table exists before continuing. Include try/except blocks and logging. If the table fails to be created, log a FATAL error and exit, and if the table already exists, log a WARNING. Add a main function and wrap everything in a Class called DynamoUtilities.
  • Add three new functions, including a function to insert a new item into a table, a function to retrieve an item from a table, and a function to delete a table.

Below is an example of OpenAI ChatGPT’s response using this approach. Of course, this method doesn’t limit you from following up with additional instructions to further enhance and refine the code.

Python script generated by OpenAI ChatGPT

Progressive Approach

Although the previous approach makes for an impressive demonstration, that method of writing code differs from how most developers work. Instead, a developer might use a progressive approach (aka chain of thought) to generate and refine the code with chat-based tools. In my tests with chat-based tools, I found a progressive approach of asking multiple, concise instructions to guide the model toward the desired behavior produced higher-quality code with fewer errors. For example:

  • Write a Python script to create a DynamoDB table with a partition key of username, a sort key of last_name, and provisioned throughput of 5 RCUs and 5 WCUs.
  • Pass the table name into the create table function as a variable.
  • Wait until the table exists before continuing.
  • Add logging with the default level of INFO.
  • Include try/except blocks, log a FATAL error and exit if the table fails to be created and a WARNING if the table already exists.
  • Add a function to delete the table and wait until the table is deleted before continuing.
  • Add a main function.
  • Add a function to insert a new item into a table and a function to get an item from a table.
  • Wrap the code in a Class called DynamoUtilities.
  • Convert the Python code to Java.

Below is an example of OpenAI ChatGPT’s response using a progressive code generation approach. The final code results from an initial prompt and a series of follow-up instructions to enhance and refine the code.

Python script generated by OpenAI ChatGPT Free Plan

Be aware that most chat-based tools have a session limit. For example, Bing has a limit of 15 chats per session, which will limit the number of follow-up instructions you can use to modify the code. Additionally, the slightest variation in how an instruction is phrased may significantly impact the code generated. There is a whole science and blossoming industry around prompt optimization!

IDE-based Code Generation

Tabnine, GitHub Copilot, and Amazon CodeWhisperer use generative AI technology to predict and suggest new lines or blocks of code based on their pre-trained models and the context and syntax of existing code and code comments within your IDE. These tools differ from tools like OpenAI ChatGPT, Microsoft Bing Chat, and ChatSonic, which generate code through a chat-based interaction with the user. IDE-based tools like Tabnine, GitHub Copilot, and Amazon CodeWhisperer are like having an AI-powered pair programming partner that enhances your development productivity. However, in my opinion, these tools also require a reasonable level of development experience.

Below, we see an example of Tabnine Pro’s ability to generate whole-line code completion (line 11) and full-function code completion (lines 12–41) based on the existing code context and syntax (lines 1–11).

Code generation in Visual Studio Code using Tabnine Pro

Although writing code comments may be perceived as easier than using generative code completion, many industry pundits discourage what they call “comment-driven development.” Instead, they believe generative AI-based code completion produces superior results.

Testing Generative AI Tools

With the assistance of each tool, I have written a Python script to perform the following tasks: 1) create an Amazon DynamoDB table, 2) insert an item into a table, 3) retrieve an item from a table, and finally, 4) delete a table. Although I tested the results in multiple IDEs, I only included the results from Visual Studio Code (VS Code). Tool interactions and code results were consistent across IDEs.

As a reference for “what good looks like” when evaluating code accuracy and quality, I referenced the examples provided with AWS’s Boto3 SDK and DynamoDB API documentation and the PEP 8 — Style Guide for Python Code.

AWS SDK for Python (Boto3) code examples for Amazon DynamoDB

An example of the final code, written with the assistance of GitHub Copilot, including unit tests, is available on GitHub.

"""
Purpose: Creates an Amazon DynamoDB table, adds an item to the table,
         gets that item from the table, and finally deletes the table
Author(s): Gary A. Stafford and GitHub Copilot
Created: 2023-03-26
Usage: python3 github_copilot_test.py table_name
       pytest github_copilot_test.py -v
"""

import boto3
from botocore.exceptions import ClientError
import logging
from logging import StreamHandler, Formatter
import sys
import unittest
from moto import mock_dynamodb


class DynamoUtilities:
    def __init__(self):
        self.dynamodb = boto3.resource("dynamodb")
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        handler = StreamHandler(stream=sys.stdout)
        handler.setFormatter(Formatter(fmt="[%(asctime)s: %(levelname)s] %(message)s"))
        self.logger.addHandler(handler)

    # function to create a new DynamoDB table with the name users, a partition key of username, a sort key of last_name, provisioned throughput of 5 RCUs and 5 WCUs, passes table name into function as a variable
    def create_table(self, table_name):
        """Create a table in the database.
        Args:
            table_name: the name of the table to create.
        Returns:
            True if the table was created, False if it already exists.
        """
        try:
            table = self.dynamodb.create_table(
                TableName=table_name,
                KeySchema=[
                    {"AttributeName": "username", "KeyType": "HASH"},
                    {"AttributeName": "last_name", "KeyType": "RANGE"},
                ],
                AttributeDefinitions=[
                    {"AttributeName": "username", "AttributeType": "S"},
                    {"AttributeName": "last_name", "AttributeType": "S"},
                ],
                ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
            )
            waiter = self.dynamodb.meta.client.get_waiter("table_exists")
            waiter.wait(TableName=table_name)
            self.logger.info(f"Table {table_name} created")
            return True
        except self.dynamodb.meta.client.exceptions.ResourceInUseException:
            self.logger.warning(f"Table {table_name} already exists")
            return False
        except ClientError as e:
            self.logger.error(e)
            return False

    # function to delete a DynamoDB table, passes table name into function as a variable
    def delete_table(self, table_name):
        """Deletes the table with the given name.
        Args:
            table_name: The name of the table to delete.
        Returns:
            True if the table was deleted, False if the table was not found.
        """
        try:
            table = self.dynamodb.Table(table_name)
            table.delete()
            waiter = self.dynamodb.meta.client.get_waiter("table_not_exists")
            waiter.wait(TableName=table_name)
            self.logger.info(f"Table {table_name} deleted")
            return True
        except self.dynamodb.meta.client.exceptions.ResourceNotFoundException:
            self.logger.warning(f"Table {table_name} not found")
            return False
        except ClientError as e:
            self.logger.error(e)
            return False

    # function to add a new item to a DynamoDB table, passes table name and item into function as variables
    def add_item(self, table_name, item):
        """Add an item to a table.
        Args:
            table_name: The name of the table.
            item: The item to add.
        Returns:
            The response from the database, None if the table was not found.
        """
        try:
            table = self.dynamodb.Table(table_name)
            response = table.put_item(Item=item)
            self.logger.info(f"Added item: {item} to Table {table_name}")
            return response
        except self.dynamodb.meta.client.exceptions.ResourceNotFoundException:
            self.logger.error(f"Table {table_name} not found")
            return None
        except ClientError as e:
            self.logger.error(e)
            return None

    # function to get an item from a DynamoDB table, passes table name and key into function as variables
    def get_item(self, table_name, key):
        """Return the value of the given key in a DynamoDB table.
        Args:
            table_name: The name of the DynamoDB table.
            key: The key to retrieve.
        Returns:
            The value associated with the given key, None if the table was not found.
        """
        try:
            table = self.dynamodb.Table(table_name)
            response = table.get_item(Key=key)
            self.logger.info(f"Got item: {response['Item']} from Table {table_name}")
            return response["Item"]
        except self.dynamodb.meta.client.exceptions.ResourceNotFoundException:
            self.logger.error(f"Table {table_name} not found")
            return None
        except ClientError as e:
            self.logger.error(e)
            return None


class DynamoUtilitiesTests(unittest.TestCase):
    def setUp(self):
        self.dynamo_utilities = DynamoUtilities()
        self.table_name = "users"

    @mock_dynamodb
    def test_create_table(self):
        self.dynamo_utilities.create_table(self.table_name)
        dynamodb = boto3.client("dynamodb")
        table_list = dynamodb.list_tables()["TableNames"]
        self.assertIn(self.table_name, table_list)

    @mock_dynamodb
    def test_add_item(self):
        self.dynamo_utilities.create_table(self.table_name)
        item = {
            "username": "test_user",
            "last_name": "test_last_name",
            "first_name": "test_first_name",
            "email": "test_email",
            "age": 20,
            "account_type": "standard_user",
            "last_login": "2020-01-01 00:00:00",
            "active": True,
        }
        self.dynamo_utilities.add_item(self.table_name, item)
        key = {"username": "test_user", "last_name": "test_last_name"}
        response = self.dynamo_utilities.get_item(self.table_name, key)
        self.assertNotEqual(response, None)

    @mock_dynamodb
    def test_delete_table(self):
        dynamodb = boto3.client("dynamodb")
        dynamodb.create_table(
            TableName=self.table_name,
            KeySchema=[
                {"AttributeName": "username", "KeyType": "HASH"},
                {"AttributeName": "last_name", "KeyType": "RANGE"},
            ],
            AttributeDefinitions=[
                {"AttributeName": "username", "AttributeType": "S"},
                {"AttributeName": "last_name", "AttributeType": "S"},
            ],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        )
        self.assertIn(self.table_name, dynamodb.list_tables()["TableNames"])
        self.dynamo_utilities.delete_table(self.table_name)
        self.assertNotIn(self.table_name, dynamodb.list_tables()["TableNames"])


def main():
    table_name = sys.argv[1] if len(sys.argv) > 1 else "users"
    logging.info(f"Table name: {table_name}")
    dynamo_utilities = DynamoUtilities()

    # create a new table
    dynamo_utilities.create_table(table_name)

    # add a new item to the table we created
    item = {
        "username": "janedoe",
        "first_name": "Jane",
        "last_name": "Doe",
        "email": "jdoe@example.com",
        "age": 20,
        "account_type": "standard_user",
        "last_login": "2020-01-01 00:00:00",
        "active": True,
    }
    dynamo_utilities.add_item(table_name, item)

    # get the item we just added to the table
    key = {"username": "janedoe", "last_name": "Doe"}
    dynamo_utilities.get_item(table_name, key)

    # delete the table we created
    dynamo_utilities.delete_table(table_name)


if __name__ == "__main__":
    main()

OpenAI ChatGPT

Introducing OpenAI ChatGPT

According to Wikipedia, ChatGPT (Generative Pre-trained Transformer) “is an artificial intelligence chatbot developed by OpenAI and launched in November 2022. It is built on top of OpenAI GPT-3 and GPT-4 [released March 14, 2023] families of large language models [LLMs] and has been fine-tuned (an approach to transfer learning) using both supervised and reinforcement learning techniques.”

According to the VC-backed, California-based startup OpenAI, “ChatGPT interacts in a conversational way. The dialogue format makes it possible for ChatGPT to answer follow-up questions, admit its mistakes, challenge incorrect premises, and reject inappropriate requests.” Further, “ChatGPT was optimized for dialogue using Reinforcement Learning with Human Feedback (RLHF), which uses human demonstrations and preference comparisons to guide the model toward desired behavior.”

Getting Started with ChatGPT

You can get started with ChatGPT for free using the Free Plan. However, if you want to use the latest generative AI models, get faster results, and ensure availability, you should consider ChatGPT Plus for $20/month. GPT-4 is now available through the ChatGPT Plus Plan.

OpenAI ChatGPT’s current plans and pricing

ChatGPT’s Free Plan is a great way to test the generative AI waters. However, due to potentially slow chat responses and unavailability during peak times, you will want to upgrade to ChatGPT Plus if your team plans to rely on the tool.

The downside of the Free Plan is unavailability during peak times

Using ChatGPT

OpenAI ChatGPT is available through a web-based chat interface or programmatically using OpenAI APIs. For API users, OpenAI provides online API references, code examples, and an interactive coding playground. According to OpenAI, “The OpenAI API can be applied to virtually any task that involves understanding or generating natural language, code, or images.”

OpenAI API documentation

Below is an example of output using ChatGPT’s UI. Using the UI, ChatGPT’s results are a mix of code blocks and explanatory text. ChatGPT has a straightforward, clean, and functional web-based user interface. There is a button to copy the resulting code to your clipboard, and previous chat sessions are saved and accessible.

Python script generated by OpenAI ChatGPT chat-based UI

ChatGPT Results

In my tests, using a progressive chat approach to building the code with concise instructional questions versus a longer-form narrative approach produced better-quality code. The code created using a progressive chat approach was accurate, error-free, and well-written.

A significant advantage of ChatGPT is its ability to remember what happened earlier in the conversation. According to OpenAI documentation, the model can reference up to approximately 3k words (or 4k tokens) from the current conversation; any information beyond that is not stored.
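The practical effect of that limit can be sketched with some simple arithmetic. The example below is only an illustration: the tokens-per-word ratio is derived from the "3k words (or 4k tokens)" figure above, real tokenizers count differently, and the trimming strategy and function names are assumptions rather than how ChatGPT actually manages its context.

```python
TOKENS_PER_WORD = 4000 / 3000  # rough ratio implied by "3k words (or 4k tokens)"


def estimate_tokens(text: str) -> int:
    """Approximate a token count from a whitespace word count."""
    return round(len(text.split()) * TOKENS_PER_WORD)


def trim_history(messages, max_tokens=4000):
    """Keep the most recent messages whose estimated tokens fit the budget."""
    kept, used = [], 0
    for message in reversed(messages):  # walk from newest to oldest
        cost = estimate_tokens(message)
        if used + cost > max_tokens:
            break  # everything older than this falls out of the window
        kept.append(message)
        used += cost
    return list(reversed(kept))  # restore chronological order
```

The takeaway for a long progressive session is that early instructions may eventually fall outside the window, so it can help to restate key requirements in later prompts.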

Final OpenAI ChatGPT Python script successfully ran in VS Code

Microsoft Bing Chat

Microsoft’s Bing Chat running on Microsoft’s Edge web browser

According to their blog post on February 7, Microsoft launched “an all new, AI-powered Bing search engine and Edge browser, available in preview now at Bing.com, to deliver better search, more complete answers, a new chat experience and the ability to generate content. We think of these tools as an AI copilot for the web.” The latest version of Bing “is running on a new, next-generation OpenAI large language model that is more powerful than ChatGPT and customized specifically for search. It takes key learnings and advancements from ChatGPT and GPT-3.5 — and it is even faster, more accurate and more capable.” On March 14, Microsoft confirmed the new Bing runs on OpenAI’s latest GPT-4.

Getting Started with Bing Chat

Bing’s Chat mode is only available when you access Bing using Microsoft Edge.

Microsoft Edge is available for most major platforms, including macOS, iOS, and Android.

Using Bing Chat

Bing Chat now supports up to 15 chats per session and 150 per day. Users can supply up to 15 prompts (questions or instructions) to create and improve the code results. When responding, Bing considers the context of previous prompts in the same chat session.

Python script being generated by Bing’s Chat feature

Bing Chat Results

Like OpenAI ChatGPT, Bing Chat delivered accurate, error-free, and well-written code. Although ChatGPT and Bing Chat are not IDE-based tools, the resulting code is easily copied into your project or saved to disk. Additionally, these tools provide an excellent opportunity to learn new languages and coding techniques without requiring an IDE.

Final Bing Chat Python script successfully ran in VS Code

ChatSonic

ChatSonic, “Your personalised AI-powered chatbot”

ChatSonic is a service of Writesonic. According to Y Combinator, startup Writesonic, founded by Samanyou Garg, is backed by leading venture capital firms, including Y Combinator, HOF Capital, Rebel Fund, Soma Capital, Broom Ventures, Amino Capital, and some of the best angels from different industries.

ChatSonic claims to improve upon the limitations of ChatGPT. According to Writesonic, ChatSonic is “a powerful chatbot, powered by Google Search, allowing it to provide up-to-date and accurate content and hence superior to ChatGPT, which can not generate content on topics after September 2021. Additionally, it can create digital images and respond to voice commands and many more additional features. So, ChatSonic has addressed all the limitations of ChatGPT.”

Using ChatSonic

You can get started with ChatSonic for free, with up to 10,000 words. However, like OpenAI ChatGPT, if you want to take advantage of GPT-4, you will want to upgrade to a paid plan, starting at $13/month with an annual commitment.

ChatSonic’s current plans and pricing

Using ChatSonic is nearly identical to using OpenAI ChatGPT.

Writing a Python script with ChatSonic, a service of Writesonic

During my testing for this article, I ran into several issues with ChatSonic’s Premium Plan’s Free Trial. For example, the code block constantly flashed and refreshed on the screen using Chrome for Mac; a minor issue but highly annoying. Also, when copying or downloading code results, you get both code and the informational text, which you must separate. Worse, the final code block did not always include all the code output; the code was intermixed with informational text. Lastly, when using follow-up instructions to modify the results, the regenerated code often missed previous code modifications. In my tests, after several follow-up instructions, the resulting code was missing up to one-third of the previous statements. Due to these issues, the resulting code block was malformed and unrunnable. The more I chatted with ChatSonic, the worse the code issues became, and the issues were repeatable.

The final Python script with ChatSonic lost several lines of code as the chat progressed

ChatSonic Results

Unlike ChatGPT and Bing Chat, ChatSonic’s Premium Plan’s Free Trial did not deliver accurate, error-free, or well-written code during my tests. Although later tests with fewer follow-up instructions provided better though incomplete results, my first impressions were unfavorable. The results might be improved using ChatSonic’s Superior or Ultimate Plans, which use the newest GPT-4 model. However, given the results of Premium, I was not keen to invest in the paid plan to find out.

Final ChatSonic incomplete Python script with fixes run in VS Code

Tabnine

Tabnine: the “AI assistant for software developers”

According to the VC-backed, Israel-based startup Tabnine, “Tabnine is to create and deliver a top-to-bottom AI-assisted development workflow that empowers all code creators, in all languages, from concept through to completion.” Tabnine Pro serves whole-line, full-function, and natural language to code completions. Regarding AI, in Tabnine’s opinion, “a multi-model approach far outperforms a monolithic approach. That’s why we’ve developed language-specific code native AI models, which are pre-trained on code and provide faster and more accurate code completions based on your tech stack.” To learn more about their models, read Tabnine’s blog post, Tabnine Enterprise vs. ChatGPT Plus.

Getting Started with Tabnine

You can get started with Tabnine for free with the Starter plan. However, the features of the Starter plan are limited compared to the Pro Plan, which starts at $12/month for a single user.

Tabnine’s current plans and pricing

You can try out the Pro plan’s features free for 14 days.

Tabnine Pro’s 14-day free trial

Tabnine supports a wide range of programming languages.

Tabnine supports a wide range of programming languages

To get started with Tabnine, choose your favorite IDE and install the required extensions or plugins.

Tabnine offers support for all major IDEs

Tabnine provides automated or manual IDE installation.

Tabnine provides automated or manual IDE installation

The Tabnine extension for VS Code already has a staggering 5.1 million downloads! A valuable tip learned from testing the IDE-based tools was to disable all non-essential extensions to give me a baseline of the tool’s capabilities. Initially, with over eighty VS Code extensions enabled, I found it hard to separate and understand each tool’s features and potential conflicts with other extensions I had loaded, especially other generative AI tools.

Installing the Tabnine extension for VS Code

Using Tabnine

As discussed earlier, Tabnine, like GitHub Copilot and Amazon CodeWhisperer, uses generative AI technology to predict and suggest subsequent lines of code based on context and syntax within your IDE. It is like having IntelliSense on steroids.

Like Copilot and CodeWhisperer, it takes some practice to get efficient with Tabnine. After starting with the initial import and logging statements, I used code comments to generate the functions, then returned to add exception-catching and logging.

Tabnine, like Copilot and CodeWhisperer, includes a collapsible navigation bar. However, unlike those tools, I found Tabnine’s navigation bar provided little value. It seemed more like marketing stats and a way to encourage more users to sign up than a source of useful development features like those in Copilot and CodeWhisperer.

Below, we see an example of Tabnine Pro’s ability to generate whole-line code completion (line 11) and full-function code completion (lines 12–41) based on the existing code context and syntax (lines 1–11).

Code generation in VS Code using Tabnine Pro

Tabnine Results

Given my development knowledge and experience with similar tools, I found the final code results with Tabnine Pro were accurate, error-free, and well-formed (Pythonic). However, I would have liked to see additional features to differentiate Tabnine from its competitors.

Final Tabnine Pro Python script successfully ran in VS Code

GitHub Copilot

GitHub Copilot: “Your AI pair programmer”

According to GitHub, “Copilot is an AI pair programmer that offers autocomplete-style suggestions as you code. You can receive suggestions from GitHub Copilot either by starting to write the code you want to use or by writing a natural language comment describing what you want the code to do. In addition, GitHub Copilot analyzes the context in the file you are editing and related files, and offers suggestions from within your text editor.” Copilot is powered by OpenAI Codex, a new AI system created by OpenAI.

Given the vast popularity and volume of code on GitHub, it is no wonder Copilot is trained in all languages that appear in GitHub’s public repositories. GitHub points out that the quality of suggestions you receive may depend on the volume and diversity of training data for that language.

GitHub Copilot X

GitHub is extending Copilot’s capabilities with the announcement of GitHub Copilot X on March 21, 2023. According to GitHub, “With chat and terminal interfaces, support for pull requests, and early adoption of OpenAI’s GPT-4, GitHub Copilot X is our vision for the future of AI-powered software development. Integrated into every part of your workflow.”

Sign up to join the waiting list for GitHub Copilot X

Getting Started with Copilot

Get started with GitHub Copilot for as little as $10/month. This Monthly Plan is a great way to test Copilot’s capabilities. In addition, GitHub is currently offering a 60-day free trial of Copilot.

GitHub Copilot pricing plans and sign-up

Regarding IDE support, Copilot is currently available as an extension in Visual Studio Code, Visual Studio, Neovim, and the JetBrains suite of IDEs. The GitHub Copilot extension already has 4.2 million downloads, and the GitHub Copilot Nightly extension, used for this post, has almost 250,000 downloads.

Installing the GitHub Copilot Nightly extension for VS Code

GitHub Copilot can also be used with Codespaces, GitHub’s fully configured development environments in the Cloud based on VS Code, similar to AWS Cloud9.

Using Copilot

As discussed earlier, GitHub Copilot, like Tabnine and Amazon CodeWhisperer, uses generative AI technology to predict and suggest new lines of code based on existing context and syntax. In addition, Copilot includes an inline pop-up toolbar, which makes it easier to scroll through and accept code choices.

Copilot’s inline pop-up toolbar to review and accept code changes

Copilot also has an extensive collapsible navigation bar, which can explain the code, offer language translations, provide access to Copilot’s Brushes, and even generate tests. My only disappointment with Copilot is that test generation is currently supported only for JavaScript and TypeScript, which is surprising given Python’s enormous popularity.

Using Copilot’s Document Brush to create automated documentation of Python functions

Below, we see an example of Copilot’s ability to generate whole-line code completion (line 14) and full-function code completion (lines 15–34) based on the existing code context and syntax (lines 1–14).

Code generation in VS Code using GitHub Copilot

Copilot Results

Given my development knowledge and experience with similar tools, the final code results with Copilot were accurate, error-free, and well-formed. I found Copilot’s additional features, especially those found on its collapsible navigation bar, extended its functionality well beyond most similar generative AI tools I tested. Lastly, I felt Copilot’s ability to learn from the existing code context and syntax was superior to other tools.

Final Copilot Python script successfully ran in VS Code

Amazon CodeWhisperer (Preview)

Amazon CodeWhisperer: your “ML-powered coding companion”

According to AWS, Amazon CodeWhisperer, announced in June 2022, “is a general purpose, machine learning-powered code generator that provides you with code recommendations, in real time. As you write code, CodeWhisperer automatically generates suggestions based on your existing code and comments. Your personalized recommendations can vary in size and scope, ranging from a single line comment to fully formed functions.” CodeWhisperer code generation is powered by ML models trained on various data sources, including Amazon and open-source code.

CodeWhisperer currently supports Java, JavaScript, Python, C#, and TypeScript. Although these are some of the most popular languages, CodeWhisperer’s language support is limited compared to Tabnine and Copilot.

Getting Started with CodeWhisperer

It is easy to get started with CodeWhisperer. Unfortunately, final pricing is unavailable since Amazon CodeWhisperer is still in Preview as of late March 2023.

Amazon CodeWhisperer’s Pricing and Availability FAQs

CodeWhisperer enhances your IDE using AWS Toolkit for JetBrains, AWS Toolkit for Visual Studio Code, AWS Lambda console, and AWS Cloud9. With Lambda and AWS Cloud9, the setup simply involves activating CodeWhisperer within the IDE.

Installing the AWS Toolkit extension for VS Code

Getting started with CodeWhisperer requires installing the AWS Toolkit if applicable, choosing your authentication method, and setting up your Builder ID, IAM Identity Center (AWS SSO), or IAM.

Optionally setting up CodeWhisperer from the AWS Management Console for IAM

AWS Builder ID

According to the documentation, “the AWS Builder ID is a new personal profile for everyone who builds on AWS. Your AWS Builder ID provides access to tools and builder services on AWS, including Amazon CodeCatalyst and Amazon CodeWhisperer. You can keep your AWS Builder ID as you move between jobs, schools, or other organizations. AWS Builder IDs are free. You only pay for the AWS resources you consume in your AWS accounts.”

Connecting to Amazon CodeWhisperer using AWS Toolkit for VS Code

Using CodeWhisperer

Nearly identical to Copilot and Tabnine, Amazon CodeWhisperer provides real-time code recommendations in your IDE. Unique features of CodeWhisperer include first-class support for AWS APIs, built-in security scans, a code reference tracker, and responsible AI/ML, which avoids bias by filtering out code recommendations that might be considered biased and unfair.

Below, we see an example of CodeWhisperer’s ability to generate whole-line code completion (line 14) and full-function code completion (lines 15–27) based on the existing code context and syntax (lines 1–14).

Code generation in VS Code using Amazon CodeWhisperer

CodeWhisperer Results

Like my feedback on Tabnine and Copilot, the final code results with CodeWhisperer were accurate, error-free, and well-formed. In addition, I appreciated the added benefits of CodeWhisperer’s Security Scan and Reference Tracker to identify significant code from other projects.

Final CodeWhisperer Python script successfully run in VS Code

Other Options

There are many other tools in the Generative AI coding category of comparable quality to the six featured in this post. New tools are being released on an almost daily basis.

Google Bard

On March 21, 2023, Google announced limited access to Bard, an early experiment that lets you collaborate with generative AI. They are beginning with the U.S. and the U.K. and will expand to more countries and languages over time. Bard is based on Google’s LaMDA (Language Model for Dialogue Applications), a transformer-based model.

Signing up on the wait list for Google Bard

Azure OpenAI Service with GPT-4

On March 21, 2023, Microsoft announced that GPT-4 was available in preview in Azure OpenAI Service. Unfortunately, Azure OpenAI Service requires registration and is currently only available to approved enterprise customers and partners. To get started, if you are qualified, you must trudge through the 25-question form.

25-question request form for qualified customers

Replit Ghostwriter

California-based VC-backed startup Replit has put the power of Replit’s AI into the world’s most popular online IDE. Replit “automates away the repetitive parts of coding, so you can stay focused on making your creative vision a reality.” Replit announced Ghostwriter Chat, which allows you to chat with a coding AI directly in your IDE. It now has a proactive debugger and awareness of your project’s code.

Getting started with Replit Ghostwriter

Conclusion

In this post, we examined six popular generative AI-powered coding tools, including chat-based OpenAI ChatGPT, Microsoft’s all-new Bing Chat, and ChatSonic, as well as IDE-based GitHub Copilot, Amazon CodeWhisperer (Preview), and Tabnine. Each tool assisted with developing an identical program to complete everyday database tasks on AWS. We then compared and contrasted the ease of use and the resulting code accuracy and quality of the tools. The generative AI space is moving at a breakneck pace. Tools continue to rapidly improve their AI models, add new features, and adjust pricing.


🔔 To keep up with future content, follow Gary Stafford on LinkedIn.


This blog represents my viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Brief Introduction to Observability on AWS

Explore the wide variety of Application Performance Monitoring (APM) and Observability options on AWS

Licensed image: ArtemisDiana/Shutterstock.com

APM and Observability

Observability is “the extent to which the internal states of a system can be inferred from externally available data” (Gartner). The three pillars of observability data are metrics, logs, and traces. Application Performance Monitoring (APM), a term commonly associated with observability, is “software that enables the observation and analysis of application health, performance, and user experience” (Gartner).

Additional features often associated with APM and observability products and services include the following (in alphabetical order):

  • Advanced Threat Protection (ATP)
  • Endpoint Detection and Response (EDR)
  • Incident Detection Response (IDR)
  • Infrastructure Performance Monitoring (IPM)
  • Network Device Monitoring (NDM)
  • Network Performance Monitoring (NPM)
  • OpenTelemetry (OTel)
  • Operational (or Operations) Intelligence Platform
  • Predictive Monitoring (predictive analytics / predictive modeling)
  • Real User Monitoring (RUM)
  • Security Information and Event Management (SIEM)
  • Security Orchestration, Automation, and Response (SOAR)
  • Synthetic Monitoring (directed monitoring / synthetic testing)
  • Threat Visibility and Risk Scoring
  • Unified Security and Observability Platform
  • User Behavior Analytics (UBA)

Not all features are offered by all vendors. Most vendors tend to specialize in one or more areas. Determining which features are essential to your organization before choosing a solution is vital.

AI/ML

Given the growing volume and real-time nature of observability telemetry, many vendors have started incorporating AI and ML into their products and services to improve correlation, anomaly detection, and mitigation capabilities. Understand how these features can reduce operational burden, enhance insights, and simplify complexity.

Decision Factors

APM and observability tooling choices often come down to a “Build vs. Buy” decision for organizations. In the Cloud, this usually means integrating several individual purpose-built products and services, self-managed open-source projects, or investing in an end-to-end APM or unified observability platform. Other decision factors include the need for solutions to support:

  • Hybrid cloud environments (on-premises/Cloud)
  • Multi-cloud environments (Public Cloud, SaaS, Supercloud)
  • Specialized workloads (e.g., Mainframes, HPC, VMware, SAP, SAS)
  • Compliant workloads (e.g., PCI DSS, PII, GDPR, FedRAMP)
  • Edge Computing and IoT/IIoT
  • AI, ML, and Data Analytics monitoring (AIOps, MLOps, DataOps)
  • SaaS observability (SaaS providers who offer monitoring to their end-users as part of their service offering)
  • Custom log formats and protocols

Finally, the 5 V’s of big data: Velocity, Volume, Value, Variety, and Veracity, also influence the choice of APM and observability tooling. The real-time nature of the observability data, the sheer volume of the data, the source and type of data, and the sensitivity of the data, will all guide tooling choices based on features and cost.

Organizations can choose fully-managed native AWS services, AWS Partner products and services, often SaaS, self-managed open-source observability tooling, or a combination of options. Many AWS and Partner products and services are commercial versions of popular open-source software (COSS).

AWS Options

Data Collection, Processing, and Forwarding

  • AWS Distro for OpenTelemetry (ADOT): Open-source APIs, libraries, and agents to collect distributed traces and metrics for application monitoring. ADOT is an open-source distribution of OpenTelemetry, a “high-quality, ubiquitous, and portable telemetry [solution] to enable effective observability.”
  • AWS for Fluent Bit: Fluent Bit image with plugins for both CloudWatch Logs and Kinesis Data Firehose. Fluent Bit is an open-source, “super fast, lightweight, and highly scalable logging and metrics processor and forwarder.”

Security-focused Monitoring

  • AWS CloudTrail: Helps enable operational and risk auditing, governance, and compliance in your AWS environment. CloudTrail records events, including actions taken in the AWS Management Console, AWS Command Line Interface (CLI), AWS SDKs, and APIs.

Partner Options

According to the 2022 Gartner® Magic Quadrant™ — APM & Observability Report, leading vendors commonly used by AWS customers include the following (in alphabetical order):

Open Source Options

There are countless open-source observability projects to choose from, including the following (in alphabetical order):

  • Elastic: Elastic Stack: Elasticsearch, Kibana, Beats, and Logstash
  • Fluentd: Data collector for unified logging layer
  • Grafana: Platform for monitoring and observability
  • Jaeger: End-to-end distributed tracing
  • Kiali: Configure, visualize, validate, and troubleshoot Istio Service Mesh
  • Loki: Like Prometheus, but for logs
  • OpenSearch: Scalable, flexible, and extensible software suite for search, analytics, and observability applications
  • OpenTelemetry (OTel): Collection of tools, APIs, and SDKs used to instrument, generate, collect, and export telemetry data
  • Prometheus: Monitoring system and time series database
  • Zabbix: Single pane of glass view of your whole IT infrastructure stack


Building Data Lakes on AWS with Kafka Connect, Debezium, Apicurio Registry, and Apache Hudi

Learn how to build a near real-time transactional data lake on AWS using a combination of Open Source Software (OSS) and AWS Services

Introduction

In the following post, we will explore one possible architecture for building a near real-time transactional data lake on AWS. The data lake will be built using a combination of open source software (OSS) and fully-managed AWS services. Red Hat’s Debezium, Apache Kafka, and Kafka Connect will be used for change data capture (CDC). In addition, Apache Spark, Apache Hudi, and Hudi’s DeltaStreamer will be used to manage the data lake. To complete our architecture, we will use several fully-managed AWS services, including Amazon RDS, Amazon MSK, Amazon EKS, AWS Glue, and Amazon EMR.

The data lake architecture used in this post’s demonstration

Source Code

The source code, configuration files, and a list of commands shown in this post are open-sourced and available on GitHub.

Kafka

According to the Apache Kafka documentation, “Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.” For this post, we will use Apache Kafka as the core of our change data capture (CDC) process. According to Wikipedia, “change data capture (CDC) is a set of software design patterns used to determine and track the data that has changed so that action can be taken using the changed data. CDC is an approach to data integration that is based on the identification, capture, and delivery of the changes made to enterprise data sources.” We will discuss CDC in greater detail later in the post.

There are several options for Apache Kafka on AWS. We will use AWS’s fully-managed Amazon Managed Streaming for Apache Kafka (Amazon MSK) service. Alternatively, you could choose industry-leading SaaS providers, such as Confluent, Aiven, Redpanda, or Instaclustr. Lastly, you could choose to self-manage Apache Kafka on Amazon Elastic Compute Cloud (Amazon EC2) or Amazon Elastic Kubernetes Service (Amazon EKS).

Kafka Connect

According to the Apache Kafka documentation, “Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency.”

There are multiple options for Kafka Connect on AWS. You can use AWS’s fully-managed, serverless Amazon MSK Connect. Alternatively, you could choose a SaaS provider or self-manage Kafka Connect on Amazon Elastic Compute Cloud (Amazon EC2) or Amazon Elastic Kubernetes Service (Amazon EKS). I am not a huge fan of Amazon MSK Connect during development. In my opinion, iterating on the configuration for a new source or sink connector, especially with transforms and external registry dependencies, can be painfully slow with MSK Connect. I find it much faster to develop and fine-tune my source and sink connectors using a self-managed version of Kafka Connect. For Production workloads, you can easily port the configuration from the self-managed Kafka Connect connector to MSK Connect. I am using a self-managed version of Kafka Connect for this post, running in Amazon EKS.

Example Production-ready architecture using Amazon services for CDC

Debezium

According to the Debezium documentation, “Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases.” Regarding Kafka Connect, according to the Debezium documentation, “Debezium is built on top of Apache Kafka and provides a set of Kafka Connect compatible connectors. Each of the connectors works with a specific database management system (DBMS). Connectors record the history of data changes in the DBMS by detecting changes as they occur and streaming a record of each change event to a Kafka topic. Consuming applications can then read the resulting event records from the Kafka topic.”

Source Connectors

We will use Kafka Connect along with three Debezium connectors, MySQL, PostgreSQL, and SQL Server, to connect to three corresponding Amazon Relational Database Service (RDS) databases and perform CDC. Changes from the three databases will be represented as messages in separate Kafka topics. In Kafka Connect terminology, these are referred to as Source Connectors. According to Confluent.io, a leader in the Kafka community, “source connectors ingest entire databases and stream table updates to Kafka topics.”
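To make the idea of a source connector more concrete, here is a minimal sketch of the kind of JSON payload you might submit to the Kafka Connect REST API (`POST /connectors`) to register a Debezium PostgreSQL connector. The connector name, hostname, secret name, and the `secretsManager` provider alias are all hypothetical, and the property names assume a Debezium 2.x release; this is not the exact configuration used later in the post.

```python
import json


def build_postgres_source_connector(name, host, database, secret_name):
    """Build a payload for POST /connectors on the Kafka Connect REST API.

    All argument values are illustrative; adjust them to your environment.
    """
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "plugin.name": "pgoutput",
            "database.hostname": host,
            "database.port": "5432",
            "database.dbname": database,
            # ${provider:path:key} placeholders are resolved by a config provider
            # at runtime; "secretsManager" is an assumed provider alias.
            "database.user": f"${{secretsManager:{secret_name}:username}}",
            "database.password": f"${{secretsManager:{secret_name}:password}}",
            # Assumption: Debezium 2.x naming; older releases use database.server.name.
            "topic.prefix": name,
        },
    }


payload = build_postgres_source_connector(
    name="tickit_postgres",
    host="tickit-postgres.example.internal",  # hypothetical endpoint
    database="tickit",
    secret_name="tickit/postgres",  # hypothetical secret name
)
print(json.dumps(payload, indent=2))
```

Building the payload as a plain dictionary keeps it easy to template one connector per database engine while leaving the credentials out of the file entirely.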

Sink Connector

We will stream the data from the Kafka topics into Amazon S3 using a sink connector. Again, according to Confluent.io, “sink connectors deliver data from Kafka topics to secondary indexes, such as Elasticsearch, or batch systems such as Hadoop for offline analysis.” We will use Confluent’s Amazon S3 Sink Connector for Confluent Platform. We can use Confluent’s sink connector without depending on the entire Confluent platform.

There is also an option to use the Hudi Sink Connector for Kafka, which promises to greatly simplify the processes described in this post. However, the RFC for this Hudi feature appears to be stalled. Last updated in August 2021, the RFC is still in the initial “Under Discussion” phase. Therefore, I would not recommend the connector’s use in Production until it is GA (General Availability) and gains broader community support.

Securing Database Credentials

Whether using Amazon MSK Connect or self-managed Kafka Connect, you should ensure your database, Kafka, and schema registry credentials, and other sensitive configuration values are secured. Both MSK Connect and self-managed Kafka Connect can integrate with configuration providers that implement the ConfigProvider class interface, such as AWS Secrets Manager, HashiCorp Vault, Microsoft Azure Key Vault, and Google Cloud Secret Manager.
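The indirection a configuration provider gives you can be illustrated with a small Python sketch. It mimics how `${provider:path:key}` placeholders in a connector configuration are swapped for secret values at runtime. This is only an illustration of the concept, not Kafka Connect’s actual implementation; the in-memory secret store, the `secretsManager` alias, and the secret values are made up.

```python
import re

# Matches placeholders of the form ${provider:path:key}
PLACEHOLDER = re.compile(r"\$\{(?P<provider>[^:}]+):(?P<path>[^:}]+):(?P<key>[^}]+)\}")


def resolve_placeholders(config, providers):
    """Return a copy of config with every placeholder replaced by its secret value.

    providers maps a provider alias to a callable that takes a path and
    returns a dict of key/value pairs stored at that path.
    """
    def lookup(match):
        secrets = providers[match["provider"]](match["path"])
        return secrets[match["key"]]

    return {name: PLACEHOLDER.sub(lookup, value) for name, value in config.items()}


# Hypothetical stand-in for a real secret store such as AWS Secrets Manager
fake_store = {"tickit/postgres": {"username": "cdc_user", "password": "not-a-real-password"}}
providers = {"secretsManager": lambda path: fake_store[path]}

connector_config = {
    "database.user": "${secretsManager:tickit/postgres:username}",
    "database.password": "${secretsManager:tickit/postgres:password}",
    "database.port": "5432",
}
resolved = resolve_placeholders(connector_config, providers)
```

The point of the pattern is that only the placeholders ever appear in connector configuration files or REST calls; the secret values live in the external store.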

Example of database credentials safely stored in AWS Secrets Manager

For self-managed Kafka Connect, I prefer Jeremy Custenborder’s kafka-config-provider-aws plugin. This plugin provides integration with AWS Secrets Manager. A complete list of Jeremy’s providers can be found on GitHub. Below is a snippet of the Secrets Manager configuration from the connect-distributed.properties file, which is read by Apache Kafka Connect at startup.

https://garystafford.medium.com/media/db6ff589c946fcd014546c56d0255fe5

Apache Avro

The messages in the Kafka topics and the corresponding objects in Amazon S3 will be stored in Apache Avro format by the CDC process. The Apache Avro documentation states, “Apache Avro is the leading serialization format for record data, and the first choice for streaming data pipelines.” Avro provides rich data structures and a compact, fast, binary data format.

Again, according to the Apache Avro documentation, “Avro relies on schemas. When Avro data is read, the schema used when writing it is always present. This permits each datum to be written with no per-value overheads, making serialization both fast and small. This also facilitates use with dynamic, scripting languages, since data, together with its schema, is fully self-describing.” When Avro data is stored in a file, its schema can be stored with it so any program may process files later.
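To give a feel for what “Avro relies on schemas” means in practice, below is a small sketch of an Avro record schema, expressed as JSON, together with a deliberately simplistic Python check that a record matches it. The schema name, namespace, and the subset of fields are illustrative only, and the checker handles just primitive types and unions; a real pipeline would rely on an Avro library and the serializers provided by the schema registry.

```python
import json

# Illustrative Avro schema; the name, namespace, and field subset are assumptions.
SALE_SCHEMA = json.loads("""
{
  "type": "record",
  "name": "Sale",
  "namespace": "example.tickit",
  "fields": [
    {"name": "salesid", "type": "int"},
    {"name": "qtysold", "type": "int"},
    {"name": "pricepaid", "type": "double"},
    {"name": "saletime", "type": ["null", "string"], "default": null}
  ]
}
""")

# Mapping of a few Avro primitive types to Python types
PRIMITIVES = {
    "null": type(None),
    "boolean": bool,
    "int": int,
    "long": int,
    "float": float,
    "double": float,
    "string": str,
}


def matches(value, avro_type):
    """Check a value against an Avro primitive type or a union of them."""
    if isinstance(avro_type, list):  # a union such as ["null", "string"]
        return any(matches(value, member) for member in avro_type)
    python_type = PRIMITIVES[avro_type]
    if python_type is int and isinstance(value, bool):
        return False  # bool is a subclass of int in Python
    return isinstance(value, python_type)


def conforms(record, schema):
    """Return True if every schema field is present with a matching type."""
    return all(
        field["name"] in record and matches(record[field["name"]], field["type"])
        for field in schema["fields"]
    )


good = {"salesid": 1, "qtysold": 2, "pricepaid": 76.0, "saletime": None}
bad = dict(good, salesid="1")  # wrong type for salesid
```

Because the schema travels with the data (or lives in a registry), a consumer can run this kind of check, or a full decode, without any out-of-band knowledge of the producer.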

Alternatively, the schema can be stored separately in a schema registry. According to Apicurio Registry, “in the messaging and event streaming world, data that are published to topics and queues often must be serialized or validated using a Schema (e.g. Apache Avro, JSON Schema, or Google protocol buffers). Schemas can be packaged in each application, but it is often a better architectural pattern to instead register them in an external system [schema registry] and then referenced from each application.”

Schema Registry

Several leading open-source and commercial schema registries exist, including Confluent Schema Registry, AWS Glue Schema Registry, and Red Hat’s open-source Apicurio Registry. In this post, we use a self-managed version of Apicurio Registry running on Amazon EKS. You can relatively easily substitute AWS Glue Schema Registry if you prefer a fully-managed AWS service.

Apicurio Registry

According to the documentation, Apicurio Registry supports adding, removing, and updating OpenAPI, AsyncAPI, GraphQL, Apache Avro, Google protocol buffers, JSON Schema, Kafka Connect schema, WSDL, and XML Schema (XSD) artifact types. Furthermore, content evolution can be controlled by enabling content rules, including validity and compatibility. Lastly, the registry can be configured to store data in various backend storage systems depending on the use case, including Kafka (e.g., Amazon MSK), PostgreSQL (e.g., Amazon RDS), and Infinispan (embedded).

Post’s architecture using Amazon MSK, self-managed Kafka Connect, and Apicurio Registry for CDC

Data Lake Table Formats

Three leading open-source, transactional data lake storage frameworks enable building data lake and data lakehouse architectures: Apache Iceberg, Linux Foundation Delta Lake, and Apache Hudi. They offer comparable features, such as ACID-compliant transactional guarantees, time travel, rollback, and schema evolution. Using any of these three data lake table formats will allow us to store and perform analytics on the latest version of the data in the data lake originating from the three Amazon RDS databases.

Apache Hudi

According to the Apache Hudi documentation, “Apache Hudi is a transactional data lake platform that brings database and data warehouse capabilities to the data lake.” The specifics of how the data is laid out as files in your data lake depend on the Hudi table type you choose, either Copy on Write (CoW) or Merge On Read (MoR).

Like Apache Iceberg and Delta Lake, Apache Hudi is partially supported by AWS’s Analytics services, including AWS Glue, Amazon EMR, Amazon Athena, Amazon Redshift Spectrum, and AWS Lake Formation. In general, CoW has broader support on AWS than MoR. It is important to understand the limitations of Apache Hudi with each AWS analytics service before choosing a table format.

If you are looking for a fully-managed Cloud data lake service built on Hudi, I recommend Onehouse. Born from the roots of Apache Hudi and founded by its original creator, Vinoth Chandar (PMC chair of the Apache Hudi project), the Onehouse product and its services leverage OSS Hudi to offer a data lake platform similar to what companies like Uber have built.

Hudi DeltaStreamer

Hudi also offers DeltaStreamer (aka HoodieDeltaStreamer) for streaming ingestion of CDC data. DeltaStreamer can be run once or continuously, using Apache Spark, similar to an Apache Spark Structured Streaming job, to capture changes to the raw CDC data and write that data to a different part of our data lake.

DeltaStreamer also works with Amazon S3 Event Notifications instead of running continuously. According to DeltaStreamer’s documentation, Amazon S3 object storage provides an event notification service to post notifications when certain events happen in your S3 bucket. AWS will put these events in Amazon Simple Queue Service (Amazon SQS). Apache Hudi provides an S3EventsSource that can read from Amazon SQS to trigger and process new or changed data as soon as it is available on Amazon S3.
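To illustrate what flows through the queue in this event-driven approach, here is a hedged Python sketch that extracts the bucket and object key from an S3 event notification as it might arrive in an SQS message body. It is not how Hudi’s S3EventsSource is implemented; the bucket name and object key are hypothetical, and the sample message is trimmed to the fields used here.

```python
import json
from urllib.parse import unquote_plus


def s3_objects_from_sqs_body(body):
    """Return (bucket, key) pairs for object-created events in an S3 notification."""
    event = json.loads(body)
    return [
        # Object keys are URL-encoded in S3 event notifications
        (record["s3"]["bucket"]["name"], unquote_plus(record["s3"]["object"]["key"]))
        for record in event.get("Records", [])
        if record.get("eventName", "").startswith("ObjectCreated:")
    ]


# Trimmed, hypothetical example of an S3 event notification message body
sample_body = json.dumps({
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "example-data-lake"},
                "object": {"key": "cdc/tickit/sale/year%3D2022/part-0001.avro"},
            },
        },
        {
            "eventName": "ObjectRemoved:Delete",
            "s3": {
                "bucket": {"name": "example-data-lake"},
                "object": {"key": "cdc/tickit/sale/old.avro"},
            },
        },
    ]
})

new_objects = s3_objects_from_sqs_body(sample_body)
```

Each message identifies exactly which objects changed, which is what lets an ingestion job process only new data rather than listing the whole bucket.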

Sample Data for the Data Lake

The data used in this post is from the TICKIT sample database. The TICKIT database represents the backend data store for a platform that brings buyers and sellers of tickets to entertainment events together. It was initially designed to demonstrate Amazon Redshift. The TICKIT database is a small database with approximately 425K rows of data across seven tables: Category, Event, Venue, User, Listing, Sale, and Date.

A data lake most often contains data from multiple sources, each with different storage formats, protocols, and connection methods. To simulate these data sources, I have separated the TICKIT database tables to represent three typical enterprise systems, including a commercial off-the-shelf (COTS) E-commerce platform, a custom Customer Relationship Management (CRM) platform, and a SaaS-based Event Management System (EMS). Each simulated system uses a different Amazon RDS database engine, including MySQL, PostgreSQL, and SQL Server.

Breakdown of TICKIT tables and database engines

Enabling CDC for Amazon RDS

To use Debezium for CDC with Amazon RDS databases, minor changes to the default database configuration for each database engine are required.

CDC for PostgreSQL

Debezium has detailed instructions regarding configuring CDC for Amazon RDS for PostgreSQL. Database parameters specify how the database is configured. According to the Debezium documentation, for PostgreSQL, set the instance parameter rds.logical_replication to 1 and verify that the wal_level parameter is set to logical; it is changed automatically when rds.logical_replication is set to 1. This parameter is adjusted using an Amazon RDS custom parameter group. According to the AWS documentation, “with Amazon RDS, you manage database configuration by associating your DB instances and Multi-AZ DB clusters with parameter groups. Amazon RDS defines parameter groups with default settings. You can also define your own parameter groups with customized settings.”

Example Amazon RDS Parameter groups with CDC configuration

CDC for MySQL

Similarly, Debezium has detailed instructions regarding configuring CDC for MySQL. Like PostgreSQL, MySQL requires a custom DB parameter group.

Example Amazon RDS Parameter groups with CDC configuration

CDC for SQL Server

Lastly, Debezium has detailed instructions for configuring CDC with Microsoft SQL Server. Enabling CDC requires enabling CDC on the SQL Server database and table(s) and creating a new filegroup and associated file. Debezium recommends locating change tables in a different filegroup than you use for source tables. CDC is only supported with Microsoft SQL Server Standard Edition and higher; Express and Web Editions are not supported.

-- enable the database for CDC
EXEC msdb.dbo.rds_cdc_enable_db 'tickit'
USE tickit
GO
SELECT * FROM sys.filegroups;
SELECT * FROM sys.database_files;
-- add new filegroup to database
ALTER DATABASE tickit ADD FILEGROUP CDC_FG1;
-- add new file to filegroup
ALTER DATABASE tickit
ADD FILE
(
    NAME = cdc_data1,
    FILENAME = 'D:\rdsdbdata\DATA\sqldba_data1.ndf',
    SIZE = 500 MB,
    FILEGROWTH = 50 MB
) TO FILEGROUP CDC_FG1;
-- enable CDC for a SQL Server table
EXEC sys.sp_cdc_enable_table
    @source_schema = N'crm',
    @source_name = N'user',
    @role_name = N'admin',
    @filegroup_name = N'CDC_FG1',
    @supports_net_changes = 0
GO

Kafka Connect Source Connectors

There is a Kafka Connect source connector for each of the three Amazon RDS databases, all of which use Debezium. CDC is performed, moving changes from the three databases into separate Kafka topics in Apache Avro format using the source connectors. The connector configuration is nearly identical whether you are using Amazon MSK Connect or a self-managed version of Kafka Connect.

UI for Apache Kafka showing the three default Kafka Connect topics

As shown above, I am using the UI for Apache Kafka by Provectus, self-managed on Amazon EKS, in this post.

PostgreSQL Source Connector

The source_connector_postgres_kafka_avro_tickit source connector captures all changes to the three tables in the Amazon RDS for PostgreSQL tickit database’s ems schema: category, event, and venue. These changes are written to three corresponding Kafka topics as Avro-format messages: tickit.ems.category, tickit.ems.event, and tickit.ems.venue. The messages are transformed by the connector using Debezium’s unwrap transform. Schemas for the messages are written to Apicurio Registry.

{
"name": "source_connector_postgres_kafka_avro_tickit",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.dbname": "tickit",
"database.hostname": "${secretManager:demo/postgres:host}",
"database.password": "${secretManager:demo/postgres:password}",
"database.port": 5432,
"database.server.name": "tickit",
"database.user": "${secretManager:demo/postgres:username}",
"decimal.handling.mode": "double",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"plugin.name": "pgoutput",
"schema.name.adjustment.mode": "avro",
"table.include.list": "ems.category,ems.event,ems.venue",
"tasks.max": 1,
"topic.prefix": "tickit",
"transforms": "unwrap",
"transforms.unwrap.add.fields": "name,op,db,table,schema,lsn,source.ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2"
}
}

MySQL Source Connector

The source_connector_mysql_kafka_avro_tickit source connector captures all changes to the three tables in the Amazon RDS for MySQL ecomm database: date, listing, and sale. These changes are written to three corresponding Kafka topics as Avro-format messages: tickit.ecomm.date, tickit.ecomm.listing, and tickit.ecomm.sale.

{
"name": "source_connector_mysql_kafka_avro_tickit",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "${secretManager:demo/mysql:host}",
"database.include.list": "ecomm",
"database.password": "${secretManager:demo/mysql:password}",
"database.port": 3306,
"database.server.id": 184054,
"database.user": "${secretManager:demo/mysql:username}",
"decimal.handling.mode": "double",
"include.schema.changes": "true",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"schema.name.adjustment.mode": "avro",
"schema.history.internal.kafka.bootstrap.servers": "${secretManager:demo/mysql:bootstrap.servers}",
"schema.history.internal.kafka.topic": "schemahistory.tickit",
"tasks.max": 1,
"topic.prefix": "tickit",
"transforms": "unwrap",
"transforms.unwrap.add.fields": "name,op,db,table,source.ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2"
}
}

SQL Server Source Connector

Lastly, the source_connector_mssql_kafka_avro_tickit source connector captures all changes to the single user table in the Amazon RDS for SQL Server tickit database’s crm schema. These changes are written to a corresponding Kafka topic as Avro-format messages: tickit.crm.user.

{
"name": "source_connector_mssql_kafka_avro_tickit",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.encrypt": "false",
"database.hostname": "${secretManager:demo/mssql:host}",
"database.names": "tickit",
"database.password": "${secretManager:demo/mssql:password}",
"database.port": 1433,
"database.user": "${secretManager:demo/mssql:username}",
"decimal.handling.mode": "double",
"include.schema.changes": "true",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"schema.name.adjustment.mode": "avro",
"schema.history.internal.kafka.bootstrap.servers": "${secretManager:demo/mssql:bootstrap.servers}",
"schema.history.internal.kafka.topic": "schemahistory.tickit",
"table.include.list": "crm.user",
"tasks.max": 1,
"topic.prefix": "tickit",
"transforms": "unwrap",
"transforms.unwrap.add.fields": "name,op,db,table,source.ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2"
}
}

If using a self-managed version of Kafka Connect, we can deploy, manage, and monitor the source and sink connectors using Kafka Connect’s RESTful API (Connect API).

Using Kafka Connect’s RESTful API to confirm all connectors are running
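As a rough sketch of that check, the Python below summarizes the JSON returned by Kafka Connect's GET /connectors/{name}/status endpoint. The base URL (http://localhost:8083) and the helper names are assumptions for illustration, not part of the original setup; the sample payload is hand-written in the shape the endpoint returns so the example runs without a live worker.

```python
import json
from urllib.request import urlopen

# Assumed address of a self-managed Kafka Connect worker (hypothetical).
CONNECT_URL = "http://localhost:8083"


def fetch_status(name: str, base_url: str = CONNECT_URL) -> dict:
    """Call GET /connectors/{name}/status on the Kafka Connect REST API."""
    with urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


def summarize_status(status: dict) -> dict:
    """Reduce a status payload to the connector state and its task states."""
    return {
        "name": status["name"],
        "connector": status["connector"]["state"],
        "tasks": [task["state"] for task in status.get("tasks", [])],
    }


def all_running(summary: dict) -> bool:
    """True only if the connector and every one of its tasks report RUNNING."""
    states = [summary["connector"], *summary["tasks"]]
    return all(state == "RUNNING" for state in states)


# Hand-written sample payload, used here instead of a live call.
sample_status = {
    "name": "source_connector_mysql_kafka_avro_tickit",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"}],
    "type": "source",
}

sample_summary = summarize_status(sample_status)
print(sample_summary, all_running(sample_summary))
```

Against a live worker, summarize_status(fetch_status(name)) could be run for each of the four connector names used in this post.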

Once the three source connectors are running, we should see seven new Kafka topics corresponding to the seven TICKIT database tables from the three Amazon RDS database sources. There will be approximately 425K messages consuming 147 MB of space in Avro’s binary format.

Seven Kafka topics representing each table within the three databases

Avro’s binary format and the use of a separate schema ensure the messages consume minimal Kafka storage space. For example, the average message Value (payload) size in the tickit.ecomm.sale topic is a minuscule 98 Bytes. Resource management is critical when you are dealing with hundreds of millions or often billions of daily Kafka messages as a result of the CDC process.

Analysis results of the tickit.ecomm.sale topic

From within the Apicurio Registry UI, we should now see Value schema artifacts for each of the seven Kafka topics.

Apicurio Registry UI showing the Kafka topic’s Avro message’s value schemas

Also from within the Apicurio Registry UI, we can examine details of individual Value schema artifacts.

Apicurio Registry UI showing the tickit.ecomm.sale topic’s value schemas

Kafka Connect Sink Connector

In addition to the three source connectors, there is a single Kafka Connect sink connector, sink_connector_kafka_s3_avro_tickit. The sink connector copies messages from the seven Kafka topics, all prefixed with tickit, to the Bronze area of our Amazon S3-based data lake.

Sink connector Kafka consumer, shown in the Kafka UI, consuming messages from seven topics

The connector uses Confluent’s S3 Sink Connector (S3SinkConnector). Like Kafka, the connector writes all changes to Amazon S3 in Apache Avro format, with the schemas already stored in Apicurio Registry.

{
"name": "sink_connector_kafka_s3_avro_tickit",
"config": {
"behavior.on.null.values": "ignore",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"flush.size": 10000,
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"locale": "en-US",
"partitioner.class": "io.confluent.connect.storage.partitioner.DailyPartitioner",
"rotate.schedule.interval.ms": 60000,
"s3.bucket.name": "<your_data_lake_s3_bucket>",
"s3.part.size": 5242880,
"s3.region": "us-east-1",
"schema.compatibility": "NONE",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"table.name.format": "${topic}",
"tasks.max": 1,
"timestamp.extractor": "Wallclock",
"timezone": "UTC",
"topics.dir": "cdc_hudi_data_lake/bronze",
"topics.regex": "tickit.(.*)",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2"
}
}

Once the sink connector and the three source connectors are running with Kafka Connect, we should see a series of seven subdirectories within the Bronze area of the data lake.

Raw CDC data in the Bronze area of the data lake

Confluent offers a choice of data partitioning strategies. The sink connector we have deployed uses the Daily Partitioner. According to the documentation, the io.confluent.connect.storage.partitioner.DailyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd and partition.duration.ms=86400000 (one day for one S3 object in each daily directory). Below is an example of Avro files (S3 objects) containing changes to the sale table. The objects are partitioned by the year, month, and day they were written to the S3 bucket.

Partitioned Avro-formatted data in the data lake
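To make the daily layout concrete, here is a minimal Python sketch that derives the S3 key prefix for a given write time, assuming the topics.dir value from the sink configuration above, a UTC timezone, and the year/month/day path format quoted from the documentation. The function name is hypothetical, and the object file name that follows the prefix is left out.

```python
from datetime import datetime, timezone


def daily_partition_prefix(topics_dir: str, topic: str, timestamp_ms: int) -> str:
    """Build the year=/month=/day= prefix for a write time in epoch milliseconds."""
    written_at = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return (
        f"{topics_dir}/{topic}/"
        f"year={written_at:%Y}/month={written_at:%m}/day={written_at:%d}/"
    )


# 1677467818000 ms is 2023-02-27 03:16:58 UTC.
prefix = daily_partition_prefix(
    "cdc_hudi_data_lake/bronze", "tickit.ecomm.sale", 1677467818000
)
print(prefix)
```

Because the connector is configured with the Wallclock timestamp extractor, the timestamp that matters is the time the sink task writes the record, not the time of the original database change.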

Message Transformation

Previously, while discussing the Kafka Connect source connectors, I mentioned that the connector transforms the messages using the unwrap transform. By default, the changes picked up by Debezium during the CDC process contain several additional Debezium-related fields of data. An example of an untransformed message generated by Debezium from the MySQL sale table is shown below.

When the Debezium MySQL connector first connects to a database, it starts by performing a consistent snapshot of the database. All existing records are captured. Unlike an UPDATE ("op" : "u") or a DELETE ("op" : "d"), these initial snapshot messages, like the one shown below, represent a READ operation ("op" : "r"). As a result, there is no before data, only after data.

{
"before" : null,
"after" : {
"full.ecomm.sale.Value" : {
"salesid" : 1,
"listid" : 1,
"sellerid" : 36861,
"buyerid" : 21191,
"eventid" : 7872,
"dateid" : 1875,
"qtysold" : 4,
"pricepaid" : 728.0,
"commission" : 109.2,
"saletime" : "2/18/2020 02:36:48"
}
},
"source" : {
"version" : "2.1.2.Final",
"connector" : "mysql",
"name" : "full",
"ts_ms" : 1677415227000,
"snapshot" : {
"string" : "first_in_data_collection"
},
"db" : "ecomm",
"sequence" : null,
"table" : {
"string" : "sale"
},
"server_id" : 0,
"gtid" : null,
"file" : "mysql-bin-changelog.074478",
"pos" : 154,
"row" : 0,
"thread" : null,
"query" : null
},
"op" : "r",
"ts_ms" : {
"long" : 1677415227078
},
"transaction" : null
}

According to Debezium’s documentation, “Debezium provides a single message transformation [SMT] that crosses the bridge between the complex and simple formats, the UnwrapFromEnvelope SMT.” Below, we see the results of Debezium’s unwrap transform of the same message. The unwrap transform flattens the nested JSON structure of the original message, adds the __deleted field, and includes fields based on the list included in the source connector’s configuration. These fields are prefixed with a double underscore (e.g., __table).

{
"salesid" : 1,
"listid" : 1,
"sellerid" : 36861,
"buyerid" : 21191,
"eventid" : 7872,
"dateid" : 1875,
"qtysold" : 4,
"pricepaid" : 728.0,
"commission" : 109.2,
"saletime" : "2/18/2020 02:36:48",
"__name" : {
"string" : "tickit"
},
"__op" : {
"string" : "r"
},
"__db" : {
"string" : "ecomm"
},
"__table" : {
"string" : "sale"
},
"__source_ts_ms" : {
"long" : 1677379962000
},
"__deleted" : {
"string" : "false"
}
}
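The flattening can be approximated in a few lines of Python. This is only a conceptual sketch, not Debezium's implementation: it works on plain dictionaries (ignoring the Avro union wrappers such as {"string": ...}), and the lookup order for added fields (envelope first, then the source block) is a simplifying assumption. The function name unwrap is hypothetical.

```python
def unwrap(envelope: dict, add_fields: list[str]) -> dict:
    """Flatten a Debezium-style change envelope into a single-level record."""
    is_delete = envelope["op"] == "d"
    # With delete handling set to "rewrite", a delete keeps the prior row values.
    row = envelope["before"] if is_delete else envelope["after"]
    flat = dict(row or {})

    source = envelope.get("source", {})
    for field in add_fields:
        if field.startswith("source."):
            value = source.get(field.split(".", 1)[1])
        else:
            # Assumption: look in the envelope first, then fall back to source.
            value = envelope.get(field, source.get(field))
        flat["__" + field.replace(".", "_")] = value

    flat["__deleted"] = "true" if is_delete else "false"
    return flat


# Simplified version of the snapshot message shown earlier.
snapshot_event = {
    "before": None,
    "after": {"salesid": 1, "listid": 1, "qtysold": 4, "pricepaid": 728.0},
    "source": {"name": "full", "db": "ecomm", "table": "sale", "ts_ms": 1677415227000},
    "op": "r",
}

flattened = unwrap(snapshot_event, ["name", "op", "db", "table", "source.ts_ms"])
print(flattened)
```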

In the next section, we examine how Apache Hudi will manage the data lake. An example of the same message, managed with Hudi, is shown below. Note the five additional Hudi data fields, prefixed with _hoodie_.

{
"_hoodie_commit_time": "20230226135257126",
"_hoodie_commit_seqno": "20230226135257126_0_69229",
"_hoodie_record_key": "1",
"_hoodie_partition_path": "__table=sale",
"_hoodie_file_name": "773bbbb7-b6ec-4ade-9f08-942726584e42-0_0-35-50_20230226135257126.parquet",
"salesid": 1,
"listid": 1,
"sellerid": 36861,
"buyerid": 21191,
"eventid": 7872,
"dateid": 1875,
"qtysold": 4,
"pricepaid": 728,
"commission": 109.2,
"saletime": "2/18/2020 02:36:48",
"__name": "tickit",
"__op": "r",
"__db": "ecomm",
"__table": "sale",
"__source_ts_ms": 1677418032000,
"__deleted": "false"
}

Apache Hudi

With database changes flowing into the Bronze area of our data lake, we are ready to use Apache Hudi to provide data lake capabilities such as ACID transactional guarantees, time travel, rollback, and schema evolution. Using Hudi allows us to store and perform analytics on a specific time-bound version of the data in the data lake. Without Hudi, a query for a single row of data could return multiple results, such as an initial CREATE record, multiple UPDATE records, and a DELETE record.
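The problem of multiple results per row can be illustrated with a small Python sketch that collapses a list of change records to the latest version per key. This is a conceptual illustration only, not how Hudi is implemented; it assumes salesid is the record key and __source_ts_ms is the ordering field, and the timestamps in the sample data are made up.

```python
def latest_per_key(records: list[dict], key_field: str, ordering_field: str) -> dict:
    """Keep only the record with the highest ordering value for each key."""
    latest: dict = {}
    for record in records:
        key = record[key_field]
        current = latest.get(key)
        if current is None or record[ordering_field] >= current[ordering_field]:
            latest[key] = record
    return latest


# Hypothetical raw change records for one row, as they might sit in the Bronze area.
raw_changes = [
    {"salesid": 200, "qtysold": 4, "__op": "r", "__source_ts_ms": 1000},
    {"salesid": 200, "qtysold": 3, "__op": "u", "__source_ts_ms": 2000},
    {"salesid": 201, "qtysold": 1, "__op": "r", "__source_ts_ms": 1000},
]

current_view = latest_per_key(raw_changes, "salesid", "__source_ts_ms")
print(current_view[200])
```

A plain scan of raw_changes returns two rows for salesid 200, while the collapsed view returns one.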

DeltaStreamer

Using Hudi’s DeltaStreamer, we will continuously stream changes from the Bronze area of the data lake to a Silver area, which Hudi manages. We will run DeltaStreamer continuously, similar to an Apache Spark Structured Streaming job, using Apache Spark on Amazon EMR. Running DeltaStreamer requires using a spark-submit command that references a series of configuration files. There is a common base set of configuration items and a group of configuration items specific to each database table. Below, we see the base configuration, base.properties.

hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.delete.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.datasource.hive_sync.mode=hms
hoodie.datasource.hive_sync.use_jdbc=false
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.database=tickit_cdc_hudi
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.write.hive_style_partitioning=true
# 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
hoodie.parquet.small.file.limit=134217728
hoodie.index.type=GLOBAL_BLOOM
hoodie.bloom.index.update.partition.path=true

The base configuration is referenced by each of the table-specific configuration files. For example, below, we see the tickit.ecomm.sale.properties configuration file for DeltaStreamer.

include=base.properties
hoodie.datasource.hive_sync.partition_fields=__table
hoodie.datasource.hive_sync.table=sale
hoodie.datasource.write.partitionpath.field=__table
hoodie.datasource.write.recordkey.field=salesid
hoodie.deltastreamer.schemaprovider.registry.url=http://<your_registry_host>:<port>/apis/ccompat/v6/subjects/tickit.ecomm.sale-value/versions/latest
hoodie.deltastreamer.source.dfs.root=s3://<your_data_lake_s3_bucket>/cdc_hudi_data_lake/bronze/tickit.ecomm.sale/

To run DeltaStreamer, you can submit an EMR Step or use EMR’s master node to run a spark-submit command on the cluster. Below, we see an example of the DeltaStreamer spark-submit command using the Merge on Read (MoR) Hudi table type.

DATA_LAKE_BUCKET="<your_data_lake_s3_bucket>"
TARGET_TABLE="tickit.ecomm.sale"
spark-submit \
    --name ${TARGET_TABLE} \
    --jars /usr/lib/spark/jars/spark-avro.jar,/usr/lib/hudi/hudi-utilities-bundle.jar \
    --conf spark.sql.catalogImplementation=hive \
    --conf spark.yarn.submit.waitAppCompletion=false \
    --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /usr/lib/hudi/hudi-utilities-bundle.jar` \
    --props file://${PWD}/${TARGET_TABLE}.properties \
    --table-type MERGE_ON_READ \
    --source-ordering-field __source_ts_ms \
    --source-class org.apache.hudi.utilities.sources.AvroDFSSource \
    --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
    --target-table ${TARGET_TABLE} \
    --target-base-path s3://${DATA_LAKE_BUCKET}/cdc_hudi_data_lake/silver/${TARGET_TABLE}/ \
    --enable-sync \
    --continuous \
    --op UPSERT \
    > ${TARGET_TABLE}.log 2>&1 &

Next, we see the same example of the DeltaStreamer spark-submit command using the Copy on Write (CoW) Hudi table type.

DATA_LAKE_BUCKET="<your_data_lake_s3_bucket>"
TARGET_TABLE="tickit.ecomm.sale"
spark-submit \
    --name ${TARGET_TABLE} \
    --jars /usr/lib/spark/jars/spark-avro.jar,/usr/lib/hudi/hudi-utilities-bundle.jar \
    --conf spark.sql.catalogImplementation=hive \
    --conf spark.yarn.submit.waitAppCompletion=false \
    --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /usr/lib/hudi/hudi-utilities-bundle.jar` \
    --props file://${PWD}/${TARGET_TABLE}.properties \
    --table-type COPY_ON_WRITE \
    --source-ordering-field __source_ts_ms \
    --source-class org.apache.hudi.utilities.sources.AvroDFSSource \
    --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
    --target-table ${TARGET_TABLE} \
    --target-base-path s3://${DATA_LAKE_BUCKET}/cdc_hudi_data_lake/silver/${TARGET_TABLE}/ \
    --enable-sync \
    --continuous \
    --op UPSERT \
    > ${TARGET_TABLE}.log 2>&1 &

For this post’s demonstration, we will run single-table DeltaStreamer Spark jobs, with one process for each table, using CoW. Alternatively, Hudi offers HoodieMultiTableDeltaStreamer, a wrapper on top of HoodieDeltaStreamer, which enables the ingestion of multiple tables at a single time into Hudi datasets. Currently, HoodieMultiTableDeltaStreamer only supports sequential processing of the tables to be ingested and the COPY_ON_WRITE storage type.

Below, we see an example of three DeltaStreamer Spark jobs running continuously for the date, listing, and sale tables.

Using the Spark UI (History Server) to view active DeltaStreamer [Spark] jobs

Once DeltaStreamer is up and running, we should see a series of subdirectories within the Silver area of the data lake, all managed by Apache Hudi.

Silver area of the data lake, managed by Apache Hudi

Within each subdirectory, partitioned by the table name, is a series of Apache Parquet files, along with other Apache Hudi-specific files. The specific folder structure and files depend on whether the table type is MoR or CoW.

Example of Apache Parquet files managed by Hudi in Amazon S3

AWS Glue Data Catalog

For this post, we are using an AWS Glue Data Catalog, an Apache Hive-compatible metastore, to persist technical metadata stored in the Silver area of the data lake, managed by Apache Hudi. The AWS Glue Data Catalog database, tickit_cdc_hudi, will be automatically created the first time DeltaStreamer runs.

Using DeltaStreamer with the table type of MERGE_ON_READ, there would be two tables in the AWS Glue Data Catalog database for each original table. According to Amazon’s EMR documentation, “Merge on Read (MoR) — Data is stored using a combination of columnar (Parquet) and row-based (Avro) formats. Updates are logged to row-based delta files and are compacted as needed to create new versions of the columnar files.” Hudi creates two tables in the Hive metastore for MoR, a table with the name that you specified, which is a read-optimized view (appended with _ro), and a table with the same name appended with _rt, which is a real-time view. You can query both tables.

AWS Glue Data Catalog showing the Apache Hudi Merge on Read (MoR) tables

According to Amazon’s EMR documentation, “Copy on Write (CoW) — Data is stored in a columnar format (Parquet), and each update creates a new version of files during a write. CoW is the default storage type.” Using COPY_ON_WRITE with DeltaStreamer, there is only a single Hudi table in the AWS Glue Data Catalog database for each corresponding database table.

AWS Glue Data Catalog showing the Apache Hudi Copy on Write (CoW) tables

Examining an individual table in the AWS Glue Data Catalog database, we can see details like the table schema, location of underlying data in S3, input/output formats, Serde serialization library, and table partitions.

AWS Glue Data Catalog database showing the details of the sale table

Database Changes and Data Lake

We are using Kafka Connect, Apache Hudi, and Hudi’s DeltaStreamer to maintain data parity between our databases and the data lake. Let’s look at an example of how a simple data change is propagated from a database to Kafka, then to the Bronze area of the data lake, and finally to the Hudi-managed Silver area of the data lake, written using the CoW table format.

First, we will make a simple update to a single record in the MySQL database’s sale table with the salesid = 200.

UPDATE ecomm.sale s
SET s.buyerid = 11694,
    s.qtysold = 3,
    s.pricepaid = 600.00,
    s.commission = 90.00
WHERE s.salesid = 200;
Database update made at 2023–02–27 03:16:59 UTC

Almost immediately, we can see the change picked up by the Kafka Connect Debezium MySQL source connector.

Database update logged by source connector at 2023–02–27 03:16:59 UTC

Almost immediately, in the Bronze area of the data lake, we can see we have a new Avro-formatted file (S3 object) containing the updated record in the partitioned sale subdirectory.

New S3 object containing updated record created at 2023–02–27 03:17:07 UTC

If we examine the new object in the Bronze area of the data lake, we will see a row representing the updated record with the salesid = 200. Note how the operation is now an UPDATE ("op" : "u") rather than a READ.

{
"salesid" : 200,
"listid" : 214,
"sellerid" : 31484,
"buyerid" : 11694,
"eventid" : 3272,
"dateid" : 1891,
"qtysold" : 4,
"pricepaid" : 600.0,
"commission" : 90.0,
"saletime" : "3/6/2020 02:18:11",
"__name" : {
"string" : "tickit"
},
"__op" : {
"string" : "u"
},
"__db" : {
"string" : "ecomm"
},
"__table" : {
"string" : "sale"
},
"__source_ts_ms" : {
"long" : 1677467818000
},
"__deleted" : {
"string" : "false"
}
}

Next, in the corresponding Silver area of the data lake, managed by Hudi, we should also see a new Parquet file that contains a record of the change. In this example, the file was written approximately 26 seconds after the original change to the database. This is the end-to-end time from database change to when the updated data is queryable in the data lake.

New Hudi-managed S3 object containing updated record created at 2023–02–27 03:17:25 UTC

Similarly, if we examine the new object in the Silver area of the data lake, we will see a row representing the updated record with the salesid = 200. The record was committed approximately 15 seconds after the original database change.

{
"_hoodie_commit_time": "20230227031713915",
"_hoodie_commit_seqno": "20230227031713915_0_2168",
"_hoodie_record_key": "200",
"_hoodie_partition_path": "__table=sale",
"_hoodie_file_name": "cce62f4e-a111-4aae-bfd1-2c5af1c6bdeb-0_0-82-84_20230227031713915.parquet",
"salesid": 200,
"listid": 214,
"sellerid": 31484,
"buyerid": 11694,
"eventid": 3272,
"dateid": 1891,
"qtysold": 4,
"pricepaid": 600,
"commission": 90,
"saletime": "3/6/2020 02:18:11",
"__name": "tickit",
"__op": "u",
"__db": "ecomm",
"__table": "sale",
"__source_ts_ms": 1677467818000,
"__deleted": "false"
}
New Hudi-managed S3 object containing updated record created at 2023–02–27 03:17:13.915 UTC
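The latency figure can be reproduced from the _hoodie_commit_time value, which encodes a timestamp as yyyyMMddHHmmssSSS. The short Python sketch below assumes the commit time is in UTC, as the timestamps in this post indicate, and uses the database update time from the earlier caption; the helper name is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def parse_commit_time(commit_time: str) -> datetime:
    """Parse a 17-digit Hudi commit time (yyyyMMddHHmmssSSS), assumed to be UTC."""
    seconds_part = datetime.strptime(commit_time[:14], "%Y%m%d%H%M%S")
    millis_part = timedelta(milliseconds=int(commit_time[14:17]))
    return (seconds_part + millis_part).replace(tzinfo=timezone.utc)


# Database update time taken from the caption above (2023-02-27 03:16:59 UTC).
db_update_time = datetime(2023, 2, 27, 3, 16, 59, tzinfo=timezone.utc)
commit_time = parse_commit_time("20230227031713915")

commit_latency = (commit_time - db_update_time).total_seconds()
print(f"Committed {commit_latency:.3f} seconds after the database change")
```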

Querying Hudi Tables with Amazon EMR

Using an EMR Notebook, we can query the updated database record stored in Amazon S3 using Hudi’s CoW table format. First, running a simple Spark SQL query for the record with salesid = 200 returns the latest record, reflecting the changes as indicated by the UPDATE operation value (u) and the _hoodie_commit_time = 2023-02-27 03:17:13.915 UTC.

Hudi Time Travel Query

We can also run a Hudi Time Travel Query, with an as.of.instant set to some arbitrary point in the future. Again, the latest record is returned, reflecting the changes as indicated by the UPDATE operation value (u) and the _hoodie_commit_time = 2023-02-27 03:17:13.915 UTC.

We can run the same Hudi Time Travel Query with an as.of.instant set to a time on or after the original records were written by DeltaStreamer (_hoodie_commit_time = 2023-02-27 03:13:50.642) but before the updated record was written (_hoodie_commit_time = 2023-02-27 03:17:13.915). This time, the original record is returned as indicated by the READ operation value (r) and the _hoodie_commit_time = 2023-02-27 03:13:50.642. This is one of the strengths of Apache Hudi: the ability to query data in the present or at any point in the past.

Hudi Point in Time Query

We can also run a Hudi Point in Time Query, with a time range set from the beginning of time until some arbitrary point in the future. Again, the latest record is returned, reflecting the changes as indicated by the UPDATE operation value (u) and the _hoodie_commit_time = 2023-02-27 03:17:13.915.

We can run the same Hudi Point in Time Query but change the time range to two arbitrary time values, both before the updated record was written (_hoodie_commit_time = 2023-02-26 22:39:07.203). This time, the original record is returned as indicated by the READ operation value (r) and the _hoodie_commit_time of 2023-02-27 03:13:50.642. Again, this is one of the strengths of Apache Hudi, the ability to query data in the present or at any point in the past.
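For reference, here is a minimal Python sketch of the Spark read options these two query styles rely on, based on the option names in Hudi's Spark datasource documentation (as.of.instant for time travel, and the incremental query type with begin and end instant times for point-in-time ranges). The helper functions and the chosen instant values are illustrative assumptions; the actual read is shown only as a comment because it needs a running Spark session with the Hudi bundle.

```python
def time_travel_options(as_of_instant: str) -> dict:
    """Options for reading a Hudi table as of a given instant."""
    return {"as.of.instant": as_of_instant}


def point_in_time_options(begin_instant: str, end_instant: str) -> dict:
    """Options for an incremental read bounded by begin and end instants."""
    return {
        "hoodie.datasource.query.type": "incremental",
        "hoodie.datasource.read.begin.instanttime": begin_instant,
        "hoodie.datasource.read.end.instanttime": end_instant,
    }


# Hypothetical instant between the original commit (03:13:50.642)
# and the update commit (03:17:13.915).
before_update = time_travel_options("20230227031500000")

# "000" is the conventional value for "from the earliest commit".
up_to_before_update = point_in_time_options("000", "20230227031500000")

print(before_update)
print(up_to_before_update)

# In an EMR Notebook, these would be applied roughly as:
#   df = spark.read.format("hudi").options(**before_update).load(base_path)
#   df.filter("salesid = 200").show()
# where base_path is the table's Silver-area S3 location.
```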

Querying Hudi Tables with Amazon Athena

We can also use Amazon Athena and run a SQL query against the AWS Glue Data Catalog database’s sale table to view the updated database record stored in Amazon S3 using Hudi’s CoW table format. The operation (op) column’s value now indicates an UPDATE (u).

Amazon Athena query showing the database update reflected in the data lake managed by Hudi

How are Deletes Handled?

In this post’s architecture, deleted database records are signified with an operation ("op") field value of "d" for a DELETE and a __deleted field value of true.


Returning to our previous Jupyter notebook example, rerunning the Spark SQL query returns the latest record, reflecting the changes as indicated by the DELETE operation value (d) and the _hoodie_commit_time = 2023-02-27 03:52:13.689.

Alternatively, we could use an additional Spark SQL filter statement to prevent deleted records from being returned (e.g., df.__op != "d").

Since we only did what is referred to as a soft delete in Hudi terminology, we can run a time travel query with an as.of.instant set to some arbitrary time before the deleted record was written (_hoodie_commit_time = 2023-02-27 03:52:13.689). This time, the original record is returned as indicated by the READ operation value (r) and the _hoodie_commit_time = 2023-02-27 03:13:50.642. We could also use a later as.of.instant to return the version of the record reflecting the UPDATE operation value (u). This also applies to other query types such as point-in-time queries.

Conclusion

In this post, we learned how to build a near real-time transactional data lake on AWS using one possible architecture. The data lake was built using a combination of open source software (OSS) and fully-managed AWS services. Red Hat’s Debezium, Apache Kafka, and Kafka Connect were used for change data capture (CDC). In addition, Apache Spark, Apache Hudi, and Hudi’s DeltaStreamer were used to manage the data lake. To complete our architecture, we used several fully-managed AWS services, including Amazon RDS, Amazon MSK, Amazon EKS, AWS Glue, and Amazon EMR.

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Earning the AWS Certified Machine Learning — Specialty (MLS-C01) Certification

Introduction

Recently, I earned the AWS Certified Machine Learning — Specialty (MLS-C01) Certification, my ninth AWS certification. Since a few colleagues asked me about my preparation, I thought I would share it with the community, without divulging any details of the exam, of course.

Prerequisite Experience

Several AWS certifications can be earned with minimal to no hands-on AWS experience, but excellent short-term memorization skills. Although you will have technically earned the certification, you will certainly not be competent to practice the particular discipline. Certification does not equal qualification.

In my opinion, the AWS Certified Machine Learning — Specialty certification exam is not one of those where simple memorization of study materials, alone, will guarantee a passing score. If you lack practical experience in data science, machine learning, basic statistics, or data analytics on AWS, you will be challenged to pass this exam, no matter how much you cram.

Consider Data Analytics Certification First

To prepare for the Machine Learning — Specialty exam, I would strongly suggest first earning the AWS Certified Data Analytics — Specialty certification. According to the Machine Learning — Specialty exam’s content outline, “Domain 1: Data Engineering”, accounts for 20% of the exam’s score. Understanding the AWS Analytics services and how they integrate to form the most efficient data pipelines to feed your Machine Learning model training is a requirement for this portion of the exam’s questions. Preparing for the Data Analytics — Specialty certification will provide this adjacent domain knowledge:

  • Amazon Athena
  • Amazon EMR (pka Amazon Elastic MapReduce)
  • AWS IAM
  • Amazon Kinesis Data Analytics, Data Firehose, Data Streams, Video Streams
  • Amazon Redshift
  • Amazon S3
  • Amazon VPC
  • AWS Data Pipeline
  • AWS Glue Crawlers, Jobs, Data Catalog
  • AWS Lambda
  • AWS Step Functions
My copious notes taken while preparing for the exam.
Take copious notes and review them right before taking the exam

Study Materials

In my case, certification success was a result of practical experience, coursework, completing and reviewing the results of several practice exams, and taking lots of notes. The following is a list of the study materials I found most impactful:

Documentation

I reviewed the Amazon SageMaker and other AWS fully-managed AI/ML service documentation for my preparation.

Carefully review the Choose an Algorithm section of the Amazon SageMaker Developer Guide. According to the exam’s content outline, “Domain 3: Modeling” accounts for 36% of the exam’s score. Understand 1) recommended use cases for each of SageMaker’s built-in algorithms, 2) the algorithm’s required hyperparameters, and 3) the prescribed model evaluation metrics and tuning techniques. The built-in SageMaker algorithms most commonly covered in training materials include:

  • Tabular
    • XGBoost (eXtreme Gradient Boosting)
    • Linear Learner
    • K-Nearest Neighbors (KNN)
    • Factorization Machines
    • Object2Vec
  • Vision
    • Image Classification
    • Object Detection
    • Semantic Segmentation
  • Clustering
    • K-Means
  • Time-Series Forecast
    • DeepAR
  • Text Classification & Embedding
    • BlazingText
  • Text Transformation
    • Sequence-to-Sequence (Seq2Seq)
  • Text Topic Modeling
    • Neural Topic Modeling (NTM)
    • Latent Dirichlet Allocation (LDA)
  • Dimensionality Reduction
    • Principal Component Analysis (PCA)
  • Anomaly Detection
    • Random Cut Forest (RCF)
    • IP Insights

AWS also uses Read the Docs. SageMaker’s Algorithm section is especially helpful with respect to preparing for the Machine Learning — Specialty exam: image processing, text processing, time-series processing, supervised learning, unsupervised learning, and feature engineering algorithms.

Along with algorithms, review SageMaker’s Deploy Models for Inference documentation. According to the exam’s content outline, “Domain 4: Machine Learning Implementation and Operations” accounts for 20% of the exam’s score. Understand SageMaker’s options for model serving, model versioning, deployment strategies, and endpoint monitoring.

Review the AWS fully managed AI/ML services Developer Guide documentation for the following services:

  • Amazon Augmented AI
  • Amazon CodeGuru
  • Amazon Comprehend
  • Amazon Forecast
  • Amazon Fraud Detector
  • Amazon Kendra
  • Amazon Lex
  • Amazon Personalize
  • Amazon Polly
  • Amazon Rekognition
  • Amazon Textract
  • Amazon Transcribe
  • Amazon Translate

Understand the use cases for each of these services and most critically, how these managed services can be combined to create more complex AI/ML solutions. For example, building a near-real-time speech-to-speech translator with Amazon Transcribe, Amazon Translate, and Amazon Polly.

Online Courses

For my preparation, I completed three Udemy courses. Most of these online courses regularly go on sale and can be purchased for $25 or less:

Udemy courses recommended in post.
Recommended Udemy online courses

Books

For my preparation, I read or re-read three books, two from Packt and one from O’Reilly:

  • AWS Certified Machine Learning Specialty: MLS-C01 Certification Guide, by Somanath Nanda and Weslley Moura (Packt Publishing). I recommend this one if you only have time to read a single book.
  • Practical Statistics for Data Scientists, 2nd Edition, by Peter Bruce, Andrew Bruce, Peter Gedeck (O’Reilly Media). According to the University of San Diego, “Statistics (or statistical analysis) is core to every machine learning algorithm.” This book covers many of the core statistical concepts behind Machine Learning, covered on the exam:
    • BLEU
    • Classification metrics: Precision-Recall Curve, ROC Curve, AUC
    • Confusion Matrix: TP, FP, TN, FN, Accuracy, Precision, Recall (Sensitivity), Specificity, F1
    • Correlated variables, Multicollinearity
    • Distributions: Normal (Gaussian or “bell curve”), Bernoulli, Binomial, Poisson
    • Elbow Method
    • Ensemble Learning: Bagging, Boosting
    • Euclidean Distance
    • K-Fold Cross-Validation
    • L1/L2 Regularization (lasso, alpha, ridge, lambda)
    • Overfitting, Underfitting, High Bias, High Variance, Bias-Variance Tradeoff
    • Plots: Histograms, Boxplots, Scatterplots
    • Regression metrics: MAE, MSE, RMSE, R-squared, Adjusted R-squared
    • Residuals
    • SMOTE
    • Standard Deviation, Three-Sigma/Empirical/68–95–99.7 Rule
    • Z-score
  • Python Machine Learning — Third Edition, by Sebastian Raschka and Vahid Mirjalili (Packt Publishing). Note that this book dives much deeper into the low-level statistical underpinnings of machine learning than is required for the exam, based on the exam outline. Again, don’t get caught up in the nitty-gritty details of Python; focus on the higher-level machine learning principles.
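
Several of the metrics listed above follow directly from the four confusion matrix counts. As a study aid, here is a minimal Python sketch of those formulas. The function name and the sample counts are my own illustration, not taken from any of the books, and the sketch assumes none of the denominators is zero.

```python
def classification_metrics(tp: int, fp: int, tn: int, fn: int) -> dict:
    """Derive common classification metrics from confusion matrix counts.

    Assumes every denominator is non-zero (illustrative sketch only).
    """
    precision = tp / (tp + fp)      # of predicted positives, how many were right
    recall = tp / (tp + fn)         # a.k.a. sensitivity or true positive rate
    specificity = tn / (tn + fp)    # true negative rate
    return {
        "accuracy": (tp + tn) / (tp + fp + tn + fn),
        "precision": precision,
        "recall": recall,
        "specificity": specificity,
        # F1 is the harmonic mean of precision and recall
        "f1": 2 * precision * recall / (precision + recall),
    }


# Hypothetical counts, chosen only to exercise the formulas
metrics = classification_metrics(tp=40, fp=10, tn=45, fn=5)
for name, value in metrics.items():
    print(f"{name}: {value:.3f}")
```

Working a few of these by hand, with different counts, is a quick way to check that you remember which counts belong in each denominator.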

Scheduling the Exam

One last tip regarding when to take your exam. I have taken 15 AWS exams across nine AWS certifications and several recertifications. Although the Certified Machine Learning — Specialty exam is difficult, I found that changing the time I sat the exam greatly reduced my stress level. In the past, I took time off on a workday to complete exams, either in person or at home using online proctoring. I was preparing for the exam while frequently being interrupted by work-related items. For this exam, I chose to use online proctoring and took my exam at 6:00 AM on a Sunday morning. Up early, fresh, and full of energy, with no work- or family-related interruptions, no lawnmowers, dogs barking, or garbage trucks rumbling by, and no Internet bandwidth issues. I was done by 9:00 AM and eating breakfast with the family.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Exploring Popular Open-source Stream Processing Technologies: Part 2 of 2

A brief demonstration of Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot with Apache Superset

Introduction

According to TechTarget, “Stream processing is a data management technique that involves ingesting a continuous data stream to quickly analyze, filter, transform or enhance the data in real-time. Once processed, the data is passed off to an application, data store, or another stream processing engine.” Confluent, a fully-managed Apache Kafka market leader, defines stream processing as “a software paradigm that ingests, processes, and manages continuous streams of data while they’re still in motion.”

This two-part post series and forthcoming video explore four popular open-source software (OSS) stream processing projects: Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot.

This post uses the open-source projects, making it easier to follow along with the demonstration and keeping costs to a minimum. However, you could easily substitute your preferred SaaS, CSP, or COSS service offerings for the open-source projects.

Part Two

We will continue our exploration in part two of this two-part post, covering Apache Flink and Apache Pinot. In addition, we will incorporate Apache Superset into the demonstration to visualize the real-time results of our stream processing pipelines as a dashboard.

Demonstration #3: Apache Flink

In the third demonstration of four, we will examine Apache Flink. For this part of the post, we will also use the third of the three GitHub repository projects, flink-kafka-demo. The project contains a Flink application written in Java, which performs stream processing, incremental aggregation, and multi-stream joins.

High-level workflow for Apache Flink demonstration

New Streaming Stack

To get started, we need to replace the first streaming Docker Swarm stack, deployed in part one, with the second streaming Docker Swarm stack. The second stack contains Apache Kafka, Apache Zookeeper, Apache Flink, Apache Pinot, Apache Superset, UI for Apache Kafka, and Project Jupyter (JupyterLab).

https://programmaticponderings.wordpress.com/media/601efca17604c3a467a4200e93d7d3ff

The stack will take a few minutes to deploy fully. When complete, there should be ten containers running in the stack.

Viewing the Docker streaming stack’s ten containers

Flink Application

The Flink application has two entry classes. The first class, RunningTotals, performs the same aggregation as the previous KStreams demo.

public static void flinkKafkaPipeline(Properties prop) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // assumes PLAINTEXT authentication
    KafkaSource<Purchase> source = KafkaSource.<Purchase>builder()
            .setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
            .setTopics(prop.getProperty("PURCHASES_TOPIC"))
            .setGroupId("flink_reduce_demo")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new PurchaseDeserializationSchema())
            .build();

    DataStream<Purchase> purchases = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

    DataStream<RunningTotal> runningTotals = purchases
            .flatMap((FlatMapFunction<Purchase, RunningTotal>) (purchase, out) -> out.collect(
                    new RunningTotal(
                            purchase.getTransactionTime(),
                            purchase.getProductId(),
                            1,
                            purchase.getQuantity(),
                            purchase.getTotalPurchase()
                    ))
            ).returns(RunningTotal.class)
            .keyBy(RunningTotal::getProductId)
            .reduce((runningTotal1, runningTotal2) -> {
                runningTotal2.setTransactions(runningTotal1.getTransactions() + runningTotal2.getTransactions());
                runningTotal2.setQuantities(runningTotal1.getQuantities() + runningTotal2.getQuantities());
                runningTotal2.setSales(runningTotal1.getSales().add(runningTotal2.getSales()));
                return runningTotal2;
            });

    KafkaSink<RunningTotal> sink = KafkaSink.<RunningTotal>builder()
            .setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(prop.getProperty("RUNNING_TOTALS_TOPIC"))
                    .setValueSerializationSchema(new RunningTotalSerializationSchema())
                    .build()
            ).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();

    runningTotals.sinkTo(sink);

    env.execute("Flink Running Totals Demo");
}
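
To make the keyBy/reduce step above easier to reason about, here is a plain-Python sketch of the same idea. It is not Flink code: the dictionary stands in for Flink's per-key state, the RunningTotal dataclass is a simplified stand-in for the Java class (the transaction time is omitted), and the sample purchases are made up for illustration.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass
class RunningTotal:
    product_id: str
    transactions: int
    quantities: int
    sales: Decimal


def reduce_totals(previous: RunningTotal, incoming: RunningTotal) -> RunningTotal:
    """Combine the accumulated total with the newest record for the same key."""
    return RunningTotal(
        product_id=incoming.product_id,
        transactions=previous.transactions + incoming.transactions,
        quantities=previous.quantities + incoming.quantities,
        sales=previous.sales + incoming.sales,
    )


def running_totals(purchases):
    """Yield an updated total after every purchase, keyed by product ID."""
    state = {}  # stand-in for the engine's keyed state
    for product_id, quantity, total_purchase in purchases:
        # Each purchase starts as a total of exactly one transaction
        incoming = RunningTotal(product_id, 1, quantity, Decimal(total_purchase))
        previous = state.get(product_id)
        # The first record for a key passes through unchanged
        state[product_id] = incoming if previous is None else reduce_totals(previous, incoming)
        yield state[product_id]


# Hypothetical purchases: (product_id, quantity, total_purchase)
purchases = [("CS08", 2, "9.98"), ("SF05", 1, "5.99"), ("CS08", 1, "4.99")]
results = list(running_totals(purchases))
for total in results:
    print(total)
```

Note that one updated total is emitted per incoming purchase, so a downstream consumer sees a stream of progressively larger totals for each product rather than a single final value.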

The second class, JoinStreams, joins the stream of data from the demo.purchases topic and the demo.products topic, processing and combining them, in real-time, into an enriched transaction and publishing the results to a new topic, demo.purchases.enriched.

public static void flinkKafkaPipeline(Properties prop) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // assumes PLAINTEXT authentication
    KafkaSource<Product> productSource = KafkaSource.<Product>builder()
            .setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
            .setTopics(prop.getProperty("PRODUCTS_TOPIC"))
            .setGroupId("flink_join_demo")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new ProductDeserializationSchema())
            .build();

    DataStream<Product> productsStream = env.fromSource(
            productSource, WatermarkStrategy.noWatermarks(), "Kafka Products Source");

    tableEnv.createTemporaryView("products", productsStream);

    KafkaSource<Purchase> purchasesSource = KafkaSource.<Purchase>builder()
            .setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
            .setTopics(prop.getProperty("PURCHASES_TOPIC"))
            .setGroupId("flink_join_demo")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new PurchaseDeserializationSchema())
            .build();

    DataStream<Purchase> purchasesStream = env.fromSource(
            purchasesSource, WatermarkStrategy.noWatermarks(), "Kafka Purchases Source");

    tableEnv.createTemporaryView("purchases", purchasesStream);

    Table result =
            tableEnv.sqlQuery(
                    "SELECT " +
                            "purchases.transactionTime, " +
                            "TO_TIMESTAMP(purchases.transactionTime), " +
                            "purchases.transactionId, " +
                            "purchases.productId, " +
                            "products.category, " +
                            "products.item, " +
                            "products.size, " +
                            "products.cogs, " +
                            "products.price, " +
                            "products.containsFruit, " +
                            "products.containsVeggies, " +
                            "products.containsNuts, " +
                            "products.containsCaffeine, " +
                            "purchases.price, " +
                            "purchases.quantity, " +
                            "purchases.isMember, " +
                            "purchases.memberDiscount, " +
                            "purchases.addSupplements, " +
                            "purchases.supplementPrice, " +
                            "purchases.totalPurchase " +
                            "FROM " +
                            "products " +
                            "JOIN purchases " +
                            "ON products.productId = purchases.productId"
            );

    DataStream<PurchaseEnriched> purchasesEnrichedTable = tableEnv.toDataStream(result,
            PurchaseEnriched.class);

    KafkaSink<PurchaseEnriched> sink = KafkaSink.<PurchaseEnriched>builder()
            .setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(prop.getProperty("PURCHASES_ENRICHED_TOPIC"))
                    .setValueSerializationSchema(new PurchaseEnrichedSerializationSchema())
                    .build()
            ).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();

    purchasesEnrichedTable.sinkTo(sink);

    env.execute("Flink Streaming Join Demo");
}

The resulting enriched purchases messages look similar to the following:

{
    "transaction_time": "2022-09-25 01:58:11.714838",
    "transaction_id": "4068565608708439642",
    "product_id": "CS08",
    "product_category": "Classic Smoothies",
    "product_name": "Rockin’ Raspberry",
    "product_size": "24 oz.",
    "product_cogs": 1.5,
    "product_price": 4.99,
    "contains_fruit": true,
    "contains_veggies": false,
    "contains_nuts": false,
    "contains_caffeine": false,
    "purchase_price": 4.99,
    "purchase_quantity": 2,
    "is_member": false,
    "member_discount": 0,
    "add_supplements": false,
    "supplement_price": 0,
    "total_purchase": 9.98
}
Sample enriched purchase message
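
Conceptually, an inner join over two unbounded streams has to remember records from both sides, because a match can arrive on either side at any time. The toy Python sketch below illustrates that idea with a symmetric, in-memory join. It is only an illustration under my own assumptions: the class name, the dictionary field names, and the sample records are hypothetical, only a handful of the enriched fields are carried over, and this is not how the Flink job above is implemented.

```python
from collections import defaultdict


class StreamingInnerJoin:
    """Toy symmetric join: both inputs are buffered by product_id."""

    def __init__(self):
        self.products = defaultdict(list)
        self.purchases = defaultdict(list)

    def on_product(self, product: dict) -> list:
        """Buffer a product and emit a row for every purchase already seen."""
        key = product["product_id"]
        self.products[key].append(product)
        return [self._merge(product, purchase) for purchase in self.purchases[key]]

    def on_purchase(self, purchase: dict) -> list:
        """Buffer a purchase and emit a row for every product already seen."""
        key = purchase["product_id"]
        self.purchases[key].append(purchase)
        return [self._merge(product, purchase) for product in self.products[key]]

    @staticmethod
    def _merge(product: dict, purchase: dict) -> dict:
        # Only a subset of the enriched fields, to keep the sketch short
        return {
            "transaction_id": purchase["transaction_id"],
            "product_id": purchase["product_id"],
            "product_category": product["category"],
            "product_name": product["item"],
            "purchase_quantity": purchase["quantity"],
            "total_purchase": purchase["total_purchase"],
        }


join = StreamingInnerJoin()

# A purchase that arrives before its product produces no output yet
early = join.on_purchase(
    {"transaction_id": "t-1", "product_id": "CS08", "quantity": 2, "total_purchase": 9.98})

# When the product arrives, the buffered purchase is matched and emitted
matched = join.on_product(
    {"product_id": "CS08", "category": "Classic Smoothies", "item": "Rockin' Raspberry"})

# Later purchases for a known product are enriched immediately
late = join.on_purchase(
    {"transaction_id": "t-2", "product_id": "CS08", "quantity": 1, "total_purchase": 4.99})

print(early, matched, late, sep="\n")
```

The buffers in this sketch grow without bound, which is the same trade-off a stream processor faces when joining two unbounded inputs without a time window or state expiry.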

Running the Flink Job

To run the Flink application, we must first compile it into an uber JAR.

We can copy the JAR into the Flink container or upload it through the Apache Flink Dashboard, a browser-based UI. For this demonstration, we will upload it through the Apache Flink Dashboard, accessible on port 8081.

The project’s build.gradle file has preset the Main class (Flink’s Entry class) to org.example.JoinStreams. Optionally, to run the Running Totals demo, we could change the build.gradle file and recompile, or simply change Flink’s Entry class to org.example.RunningTotals.

Uploading the JAR to Apache Flink

Before running the Flink job, restart the sales generator in the background (nohup python3 ./producer.py &) to generate a new stream of data. Then start the Flink job.

Apache Flink job running successfully

To confirm the Flink application is running, we can check the contents of the new demo.purchases.enriched topic using the Kafka CLI.

The new demo.purchases.enriched topic populated with messages from Apache Flink

Alternatively, you can use the UI for Apache Kafka, accessible on port 9080.

Viewing messages in the UI for Apache Kafka

Demonstration #4: Apache Pinot

In the fourth and final demonstration, we will explore Apache Pinot. First, we will query the unbounded data streams from Apache Kafka, generated by both the sales generator and the Apache Flink application, using SQL. Then, we will build a real-time dashboard in Apache Superset, with Apache Pinot as our data source.

Creating Tables

According to the Apache Pinot documentation, “a table is a logical abstraction that represents a collection of related data. It is composed of columns and rows (known as documents in Pinot).” There are three types of Pinot tables: Offline, Realtime, and Hybrid. For this demonstration, we will create three Realtime tables. Realtime tables ingest data from streams (in our case, Kafka) and build segments from the consumed data. Further, according to the documentation, “each table in Pinot is associated with a Schema. A schema defines what fields are present in the table along with the data types. The schema is stored in Zookeeper, along with the table configuration.”

Below, we see the schema and config for one of the three Realtime tables, purchasesEnriched. Note how the columns are divided into three categories: Dimension, Metric, and DateTime.

{
    "schemaName": "purchasesEnriched",
    "dimensionFieldSpecs": [
        { "name": "transaction_id", "dataType": "STRING" },
        { "name": "product_id", "dataType": "STRING" },
        { "name": "product_category", "dataType": "STRING" },
        { "name": "product_name", "dataType": "STRING" },
        { "name": "product_size", "dataType": "STRING" },
        { "name": "product_cogs", "dataType": "FLOAT" },
        { "name": "product_price", "dataType": "FLOAT" },
        { "name": "contains_fruit", "dataType": "BOOLEAN" },
        { "name": "contains_veggies", "dataType": "BOOLEAN" },
        { "name": "contains_nuts", "dataType": "BOOLEAN" },
        { "name": "contains_caffeine", "dataType": "BOOLEAN" },
        { "name": "purchase_price", "dataType": "FLOAT" },
        { "name": "is_member", "dataType": "BOOLEAN" },
        { "name": "member_discount", "dataType": "FLOAT" },
        { "name": "add_supplements", "dataType": "BOOLEAN" },
        { "name": "supplement_price", "dataType": "FLOAT" }
    ],
    "metricFieldSpecs": [
        { "name": "purchase_quantity", "dataType": "INT" },
        { "name": "total_purchase", "dataType": "FLOAT" }
    ],
    "dateTimeFieldSpecs": [
        {
            "name": "transaction_time",
            "dataType": "TIMESTAMP",
            "format": "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSSSSS",
            "granularity": "1:MILLISECONDS"
        }
    ]
}
Schema file for purchasesEnriched Realtime table

{
    "tableName": "purchasesEnriched",
    "tableType": "REALTIME",
    "segmentsConfig": {
        "timeColumnName": "transaction_time",
        "timeType": "MILLISECONDS",
        "schemaName": "purchasesEnriched",
        "replicasPerPartition": "1"
    },
    "tenants": {},
    "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.consumer.type": "lowlevel",
            "stream.kafka.topic.name": "demo.purchases.enriched",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.broker.list": "kafka:29092",
            "realtime.segment.flush.threshold.time": "3600000",
            "realtime.segment.flush.threshold.rows": "50000",
            "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
        }
    },
    "metadata": {}
}
Config file for purchasesEnriched Realtime table
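
The transaction_time field in the schema above is declared with a date pattern that includes fractional seconds, while the table's time type is milliseconds. As a quick sanity check outside of Pinot, the following Python sketch parses the timestamp from the sample enriched message and converts it to epoch milliseconds. The function name is my own, and the sketch assumes the timestamps are in UTC, which the demonstration does not state explicitly.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(value: str) -> int:
    """Parse 'yyyy-MM-dd HH:mm:ss.SSSSSS' (assumed UTC) into epoch milliseconds."""
    parsed = datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=timezone.utc)
    # Integer division by a timedelta avoids floating-point rounding
    return (parsed - EPOCH) // timedelta(milliseconds=1)


# Timestamp taken from the sample enriched purchase message
sample_millis = to_epoch_millis("2022-09-25 01:58:11.714838")
print(sample_millis)
```

Microsecond precision in the source string is truncated to milliseconds by the integer division, so two events within the same millisecond map to the same value.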

To begin, copy the three Pinot Realtime table schemas and configurations from the streaming-sales-generator GitHub project into the Apache Pinot Controller container. Next, use a docker exec command to call the Pinot Command Line Interface’s (CLI) AddTable command to create the three tables: products, purchases, and purchasesEnriched.

# copy pinot table schema and config files to pinot controller
CONTROLLER_CONTAINER=$(docker container ls --filter name=streaming-stack_pinot-controller.1 --format "{{.ID}}")

cd ~/streaming-sales-generator/apache_pinot_examples
docker cp configs_schemas/ ${CONTROLLER_CONTAINER}:/tmp/

# create three tables
docker exec -it ${CONTROLLER_CONTAINER} \
    bin/pinot-admin.sh AddTable \
    -tableConfigFile /tmp/configs_schemas/purchases-config.json \
    -schemaFile /tmp/configs_schemas/purchases-schema.json -exec

docker exec -it ${CONTROLLER_CONTAINER} \
    bin/pinot-admin.sh AddTable \
    -tableConfigFile /tmp/configs_schemas/products-config.json \
    -schemaFile /tmp/configs_schemas/products-schema.json -exec

docker exec -it ${CONTROLLER_CONTAINER} \
    bin/pinot-admin.sh AddTable \
    -tableConfigFile /tmp/configs_schemas/purchases-enriched-config.json \
    -schemaFile /tmp/configs_schemas/purchases-enriched-schema.json -exec

To confirm the three tables were created correctly, use the Apache Pinot Data Explorer accessible on port 9000. Use the Tables tab in the Cluster Manager.

Cluster Manager’s Tables tab shows the three Realtime tables and corresponding schemas

We can further inspect and edit the table’s config and schema from the Tables tab in the Cluster Manager.

Realtime table’s editable config and schema

The three tables are configured to read the unbounded stream of data from the corresponding Kafka topics: demo.products, demo.purchases, and demo.purchases.enriched.

Querying with Pinot

We can use Pinot’s Query Console to query the Realtime tables using SQL. According to the documentation, “Pinot provides a SQL interface for querying. It uses the [Apache] Calcite SQL parser to parse queries and uses MYSQL_ANSI dialect.”

Schema and query results for the purchases table

With the generator still running, re-query the purchases table in the Query Console (select count(*) from purchases). You should notice the document count increasing each time you re-run the query since new messages are published to the demo.purchases topic by the sales generator.

If you do not observe the count increasing, ensure the sales generator and Flink enrichment job are running.

Query Console showing the purchases table’s document count continuing to increase
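
The same count query can also be issued programmatically against the Pinot broker's SQL endpoint instead of the Query Console. Below is a minimal Python sketch using only the standard library. It assumes the broker's port 8099 is reachable on localhost, which depends on how the stack publishes ports; the helper names and the canned response values are hypothetical and shown only to illustrate the shape of the result.

```python
import json
from urllib import request


def build_query_request(sql: str, broker_url: str = "http://localhost:8099") -> request.Request:
    """Build a POST request for the broker's /query/sql endpoint."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return request.Request(
        f"{broker_url}/query/sql",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def rows_as_dicts(response: dict) -> list:
    """Pair each result row with the column names from the data schema."""
    table = response["resultTable"]
    columns = table["dataSchema"]["columnNames"]
    return [dict(zip(columns, row)) for row in table["rows"]]


def run_query(sql: str, broker_url: str = "http://localhost:8099") -> list:
    """Send the query to a running broker and return rows as dictionaries."""
    with request.urlopen(build_query_request(sql, broker_url)) as resp:
        return rows_as_dicts(json.load(resp))


# Canned response with made-up values, so the parsing can be tried offline
sample_response = {
    "resultTable": {
        "dataSchema": {"columnNames": ["count(*)"], "columnDataTypes": ["LONG"]},
        "rows": [[1234]],
    }
}
print(rows_as_dicts(sample_response))
```

With the stack and the sales generator running, calling run_query("select count(*) from purchases") a few times in a row should show the count increasing, just as in the Query Console.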

Table Joins?

It might seem logical to want to replicate the same multi-stream join we performed with Apache Flink in the third demonstration on the demo.products and demo.purchases topics. Further, we might presume we could join the products and purchases Realtime tables by writing a SQL statement in Pinot’s Query Console. However, according to the documentation at the time of this post, version 0.11.0 of Pinot did not support joins or nested subqueries.

This current join limitation is why we created the Realtime table, purchasesEnriched, allowing us to query Flink’s real-time results in the demo.purchases.enriched topic. We will use both Flink and Pinot as part of our stream processing pipeline, taking advantage of each tool’s individual strengths and capabilities.

Note, according to the documentation for the latest release of Pinot on the main branch, “the latest Pinot multi-stage supports inner join, left-outer, semi-join, and nested queries out of the box. It is optimized for in-memory process and latency.” For more information on joins as part of Pinot’s new multi-stage query execution engine, read the documentation, Multi-Stage Query Engine.

Query showing results from the demo.purchases.enriched topic in real-time

Aggregations

We can perform real-time aggregations using Pinot’s rich SQL query interface. For example, like previously with Spark and Flink, we can calculate running totals for the number of items sold and the total sales for each product in real time.

Aggregating running totals for each product

We can do the same with the purchasesEnriched table, which will use the continuous stream of enriched transaction data from our Apache Flink application. With the purchasesEnriched table, we can add the product name and product category for richer results. Each time we run the query, we get real-time results based on the running sales generator and Flink enrichment job.

Aggregating running totals for each product

Query Options and Indexing

Note the reference to the Star-Tree index at the start of the SQL query shown above. Pinot provides several query options, including useStarTree (true by default).

Multiple indexing techniques are available in Pinot, including Forward Index, Inverted Index, Star-tree Index, Bloom Filter, and Range Index, among others. Each has advantages in different query scenarios. According to the documentation, by default, Pinot creates a dictionary-encoded forward index for each column.

SQL Examples

Here are a few examples of SQL queries you can try in Pinot’s Query Console:

-- products
SELECT
    COUNT(product_id) AS product_count,
    AVG(price) AS avg_price,
    AVG(cogs) AS avg_cogs,
    AVG(price) - AVG(cogs) AS avg_gross_profit
FROM
    products;

-- purchases
SELECT
    product_id,
    SUMPRECISION(quantity, 10, 0) AS quantity,
    SUMPRECISION(total_purchase, 10, 2) AS sales
FROM
    purchases
GROUP BY
    product_id
ORDER BY
    sales DESC;

-- purchasesEnriched
SELECT
    product_id,
    product_name,
    product_category,
    SUMPRECISION(purchase_quantity, 10, 0) AS quantity,
    SUMPRECISION(total_purchase, 10, 2) AS sales
FROM
    purchasesEnriched
GROUP BY
    product_id,
    product_name,
    product_category
ORDER BY
    sales DESC;

Troubleshooting Pinot

If you have issues with creating the tables or querying the real-time data, you can start by reviewing the Apache Pinot logs:

CONTROLLER_CONTAINER=$(docker container ls --filter name=streaming-stack_pinot-controller.1 --format "{{.ID}}")
docker exec -it ${CONTROLLER_CONTAINER} cat logs/pinot-all.log

Real-time Dashboards with Apache Superset

To display the real-time stream of data produced by our Apache Flink stream processing job and made queryable by Apache Pinot, we can use Apache Superset. Superset positions itself as “a modern data exploration and visualization platform.” Superset allows users “to explore and visualize their data, from simple line charts to highly detailed geospatial charts.”

According to the documentation, “Superset requires a Python DB-API database driver and a SQLAlchemy dialect to be installed for each datastore you want to connect to.” In the case of Apache Pinot, we can use pinotdb as the Python DB-API and SQLAlchemy dialect for Pinot. Since the existing Superset Docker container does not have pinotdb installed, I have built and published a Docker Image with the driver and deployed it as part of the second streaming stack of containers.

# Custom Superset build to add apache pinot driver
# Gary A. Stafford (2022-09-25)
# Updated: 2022-12-18
FROM apache/superset:66138b0ca0b82a94404e058f0cc55517b2240069
# Switching to root to install the required packages
USER root
# Find which driver you need based on the analytics database:
# https://superset.apache.org/docs/databases/installing-database-drivers
RUN pip install mysqlclient psycopg2-binary pinotdb
# Switching back to using the `superset` user
USER superset

First, we must configure the Superset container instance. These instructions are documented as part of the Superset Docker Image repository.

# establish an interactive session with the superset container
SUPERSET_CONTAINER=$(docker container ls --filter name=streaming-stack_superset.1 --format "{{.ID}}")

# initialize superset (see superset documentation)
docker exec -it ${SUPERSET_CONTAINER} \
    superset fab create-admin \
    --username admin \
    --firstname Superset \
    --lastname Admin \
    --email admin@superset.com \
    --password sUp3rS3cREtPa55w0rD1

docker exec -it ${SUPERSET_CONTAINER} superset db upgrade
docker exec -it ${SUPERSET_CONTAINER} superset init

Once the configuration is complete, we can log into the Superset web-browser-based UI accessible on port 8088.

Home page of the Superset web-browser-based UI

Pinot Database Connection and Dataset

Next, to connect to Pinot from Superset, we need to create a Database Connection and a Dataset.

Creating a new database connection to Pinot

The SQLAlchemy URI is shown below. Input the URI, test your connection (‘Test Connection’), make sure it succeeds, then hit ‘Connect’.

pinot+http://pinot-broker:8099/query?controller=http://pinot-controller:9000
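
The URI above packs two endpoints into one string: the broker that serves queries and the controller passed as a query parameter. If the connection test fails, it can help to pull the URI apart and check each piece. The Python sketch below does this with the standard library; the function name and the keys of the returned dictionary are my own labels, not Superset or pinotdb terminology.

```python
from urllib.parse import parse_qs, urlsplit


def describe_pinot_uri(uri: str) -> dict:
    """Split a pinot+http SQLAlchemy URI into its broker and controller parts."""
    parts = urlsplit(uri)
    # The scheme has the form '<dialect>+<transport>'
    dialect, _, transport = parts.scheme.partition("+")
    controller = parse_qs(parts.query).get("controller", [None])[0]
    return {
        "dialect": dialect,
        "transport": transport,
        "broker_host": parts.hostname,
        "broker_port": parts.port,
        "query_path": parts.path,
        "controller": controller,
    }


uri = "pinot+http://pinot-broker:8099/query?controller=http://pinot-controller:9000"
details = describe_pinot_uri(uri)
for key, value in details.items():
    print(f"{key}: {value}")
```

The host names pinot-broker and pinot-controller resolve only inside the Docker network, which is why the URI works from the Superset container but not necessarily from your workstation.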

Next, create a Dataset that references the purchasesEnriched Pinot table.

Creating a new dataset allowing us access to the purchasesEnriched Pinot table

Modify the dataset’s transaction_time column. Check the is_temporal and Default datetime options. Lastly, define the DateTime format as epoch_ms.

Modifying the dataset’s transaction_time column

Building a Real-time Dashboard

Using the new dataset, which connects Superset to the purchasesEnriched Pinot table, we can construct individual charts to be placed on a dashboard. Build a few charts to include on your dashboard.

Example of a chart whose data source is the new dataset
List of charts included on the dashboard

Create a new Superset dashboard and add the charts and other elements, such as headlines, dividers, and tabs.

Apache Superset dashboard displaying data from Apache Pinot Realtime table

We can apply a refresh interval to the dashboard to continuously query Pinot and visualize the results in near real-time.

Configuring a refresh interval for the dashboard

Conclusion

In this two-part post series, we were introduced to stream processing. We explored four popular open-source stream processing projects: Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot. Next, we learned how we could solve similar stream processing and streaming analytics challenges using different streaming technologies. Lastly, we saw how these technologies, such as Kafka, Flink, Pinot, and Superset, could be integrated to create effective stream processing pipelines.


This blog represents my viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are the property of the author unless otherwise noted.
