Archive for category Enterprise Software Development

Monolith to Microservices: Refactoring Relational Databases

Exploring common patterns for refactoring relational database models as part of a microservices architecture

Introduction

There is no shortage of books, articles, tutorials, and presentations on migrating existing monolithic applications to microservices, or on designing new applications using a microservices architecture. It has been one of the most popular IT topics for the last several years. Unfortunately, monolithic architectures often have equally monolithic database models. As organizations evolve from monolithic to microservices architectures, refactoring the application’s database model is often overlooked or deprioritized. Similarly, as organizations develop new microservices-based applications, they frequently neglect to apply a similar strategy to their databases.

The following post will examine several basic patterns for refactoring relational databases for microservices-based applications.

Terminology

Monolithic Architecture

A monolithic architecture is “the traditional unified model for the design of a software program. Monolithic, in this context, means composed all in one piece.” (TechTarget). A monolithic application “has all or most of its functionality within a single process or container, and it’s componentized in internal layers or libraries” (Microsoft). A monolith is usually built, deployed, and upgraded as a single unit of code.

Microservices Architecture

A microservices architecture (aka microservices) refers to “an architectural style for developing applications. Microservices allow a large application to be separated into smaller independent parts, with each part having its own realm of responsibility” (Google Cloud).

According to microservices.io, the advantages of microservices include:

  • Highly maintainable and testable
  • Loosely coupled
  • Independently deployable
  • Organized around business capabilities
  • Owned by a small team
  • Enables rapid, frequent, and reliable delivery
  • Allows an organization to [more easily] evolve its technology stack

Database

A database is “an organized collection of structured information, or data, typically stored electronically in a computer system” (Oracle). There are many types of databases. The most common database engines include relational, NoSQL, key-value, document, in-memory, graph, time series, wide column, and ledger.

PostgreSQL

In this post, we will use PostgreSQL (aka Postgres), a popular open-source object-relational database. A relational database is “a collection of data items with pre-defined relationships between them. These items are organized as a set of tables with columns and rows. Tables are used to hold information about the objects to be represented in the database” (AWS).

Amazon RDS for PostgreSQL

We will use the fully managed Amazon RDS for PostgreSQL in this post. Amazon RDS makes it easy to set up, operate, and scale PostgreSQL deployments in the cloud. With Amazon RDS, you can deploy scalable PostgreSQL databases in minutes with cost-efficient and resizable hardware capacity. In addition, Amazon RDS offers multiple versions of PostgreSQL, including version 14.2, the latest version at the time of this post.

The patterns discussed here are not specific to Amazon RDS for PostgreSQL. There are many options for running PostgreSQL on the public cloud or within your private data center. Alternately, you could choose Amazon Aurora PostgreSQL-Compatible Edition, Google Cloud’s Cloud SQL for PostgreSQL, Microsoft’s Azure Database for PostgreSQL, ElephantSQL, or your own self-managed PostgreSQL deployed to bare metal servers, virtual machines (VMs), or containers.

Database Refactoring Patterns

There are many ways in which a relational database, such as PostgreSQL, can be refactored to optimize efficiency in microservices-based application architectures. As stated earlier, a database is an organized collection of structured data. Therefore, most refactoring patterns reorganize the data to optimize for an organization’s requirements, such as database access efficiency, performance, resilience, security, compliance, and manageability.

The basic building block of Amazon RDS is the DB instance, where you create your databases. You choose the engine-specific characteristics of the DB instance when you create it, such as storage capacity, CPU, memory, and the EC2 instance type on which the database server runs. A single Amazon RDS database instance can contain multiple databases. Those databases contain numerous object types, including tables, views, functions, procedures, and types. Tables and other object types are organized into schemas. These hierarchical constructs — instances, databases, schemas, and tables — can be arranged in different ways depending on the requirements of the database data producers and consumers.
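
For example, a quick way to visualize this hierarchy is to query the standard information_schema views. Below is a minimal sketch that lists the database, schema, and table hierarchy for all user-defined tables in the current database.

-- list the database, schema, and table hierarchy for user-defined tables
SELECT table_catalog AS "database",
       table_schema  AS "schema",
       table_name    AS "table"
FROM information_schema.tables
WHERE table_type = 'BASE TABLE'
  AND table_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY 1, 2, 3;
Querying the database, schema, and table hierarchy using information_schema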

Basic relational database refactoring patterns

Sample Database

To demonstrate different patterns, we need data. Specifically, we need a database with data. Conveniently, due to the popularity of PostgreSQL, there are many available sample databases, including the Pagila database. I have used it in many previous articles and demonstrations. The Pagila database is available for download from several sources.

Database diagram showing the relations between Pagila’s tables

The Pagila database represents a DVD rental business. The database is well-built, small, and adheres to third normal form (3NF) schema design. The Pagila database has many objects, including 1 schema, 15 tables, 1 trigger, 7 views, 8 functions, 1 domain, 1 type, 1 aggregate, and 13 sequences. Pagila’s tables contain between 2 and 16K rows.

Pattern 1: Single Schema

Pattern 1: Single Schema is one of the most basic database patterns. There is one database instance containing a single database. That database has a single schema containing all tables and other database objects.

Pattern 1: Single Schema

As organizations begin to move from monolithic to microservices architectures, they often retain their monolithic database architecture for some time.

Beginning to decompose the monolith application

Frequently, the monolithic database’s data model is equally monolithic, lacking proper separation of concerns using simple database constructs such as schemas. The Pagila database is an example of this first pattern. The Pagila database has a single schema containing all database object types, including tables, functions, views, procedures, sequences, and triggers.

To create a copy of the Pagila database, we can use pg_restore to restore any of several publicly available custom-format database archive files. If you already have the Pagila database running, simply create a copy with pg_dump.

# set postgres environment variables
# ** CHANGE ME **
export PGHOST="postgres1.abcxyzdef.us-east-1.rds.amazonaws.com"
export PGPORT=5432
export PGDATABASE="postgres"
export PGUSER="admin"
export PGPASSWORD="change_me!"
# create new v1 of pagila database
export PGDATABASE="postgres"
psql -c "CREATE DATABASE pagila_v1;"
# restore original version of pagila database
pg_restore -d pagila_v1 pagila.dump
# confirm pagila tables in public schema
export PGDATABASE="pagila_v1"
psql -c "\dt"
Create a new version of the Pagila database for Pattern 1

Below we see the table layout of the Pagila database, which contains the single, default public schema.

-----------+----------+--------+------------
Instance | Database | Schema | Table
-----------+----------+--------+------------
postgres1 | pagila | public | actor
postgres1 | pagila | public | address
postgres1 | pagila | public | category
postgres1 | pagila | public | city
postgres1 | pagila | public | country
postgres1 | pagila | public | customer
postgres1 | pagila | public | film
postgres1 | pagila | public | film_actor
postgres1 | pagila | public | film_category
postgres1 | pagila | public | inventory
postgres1 | pagila | public | language
postgres1 | pagila | public | payment
postgres1 | pagila | public | rental
postgres1 | pagila | public | staff
postgres1 | pagila | public | store

Using a single schema to house all tables, especially the public schema, is generally considered poor database design. As a database grows in complexity, creating, organizing, managing, and securing dozens, hundreds, or thousands of database objects, including tables, within a single schema becomes unmanageable. For example, given a single schema, the only way to organize large numbers of database objects is by using lengthy and cryptic naming conventions.

Public Schema

According to the PostgreSQL docs, if tables or other object types are created without specifying a schema name, they are automatically assigned to the default public schema. Every new database contains a public schema. By default, users cannot access any objects in schemas they do not own. To allow that, the schema owner must grant the USAGE privilege on the schema. However, by default, everyone has CREATE and USAGE privileges on the public schema. These default privileges enable all users who can connect to a given database to create objects in its public schema. Some usage patterns call for revoking that privilege, which is a compelling reason not to use the public schema as part of your database design.
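
For example, below is a minimal sketch of how those default privileges might be revoked, assuming a database named pagila; on PostgreSQL 14 and earlier, including the version used in this post, PUBLIC implicitly holds these privileges.

-- revoke the implicit privilege that lets any user create objects
-- in the public schema
REVOKE CREATE ON SCHEMA public FROM PUBLIC;
-- optionally, also revoke the implicit CONNECT privilege on the database
REVOKE CONNECT ON DATABASE pagila FROM PUBLIC;
Revoking the default privileges on the public schema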

Pattern 2: Multiple Schemas

Separating tables and other database objects into multiple schemas is an excellent first step in refactoring a database to support microservices. As application complexity grows and databases naturally expand over time, using schemas to separate functionality by business subdomain or team will pay significant dividends.

According to the PostgreSQL docs, there are several reasons why one might want to use schemas:

  • To allow many users to use one database without interfering with each other.
  • To organize database objects into logical groups to make them more manageable.
  • Third-party applications can be put into separate schemas, so they do not collide with the names of other objects.

Schemas are analogous to directories at the operating system level, except schemas cannot be nested.
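
To illustrate, the statements below are a small sketch, assuming the customers and common schemas created later in this post, showing the two ways a table name can be resolved.

-- reference a table by its schema-qualified name
SELECT first_name, last_name FROM customers.customer LIMIT 5;
-- or set the search path so unqualified names resolve
-- against the listed schemas, in order
SET search_path TO customers, common, public;
SELECT first_name, last_name FROM customer LIMIT 5;
Schema-qualified table names versus the search_path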

Pattern 2: Multiple Schemas

With Pattern 2, as an organization continues to decompose its monolithic application architecture into a microservices-based application, it could transition to a schema-per-microservice or similar level of organizational granularity.

Continuing to decompose the monolith into microservices

Applying Domain-driven Design Principles

Domain-driven design (DDD) is “a software design approach focusing on modeling software to match a domain according to input from that domain’s experts” (Wikipedia). Architects often apply DDD principles to decompose a monolithic application into microservices. For example, a microservice or set of related microservices might represent a Bounded Context. In DDD, a Bounded Context is “a description of a boundary, typically a subsystem or the work of a particular team, within which a particular model is defined and applicable.” (hackernoon.com). Examples of Bounded Context might include Sales, Shipping, and Support.

One technique for applying schemas when refactoring a database is to mirror the Bounded Contexts, which reflect the microservices. For each microservice or set of closely related microservices, there is a schema. Unfortunately, there is no absolute way to define the Bounded Contexts of a Domain and, by extension, the schemas of a database. It depends on many factors, including your application architecture, features, security requirements, and often an organization’s functional team structure.

Reviewing the purpose of each table in the Pagila database and their relationships to each other, we could infer Bounded Contexts, such as Films, Stores, Customers, and Sales. We can represent these Bounded Contexts as schemas within the database as a way to organize the data. The individual tables in a schema mirror DDD concepts, such as aggregates, entities, or value objects.

# dump v1 of pagila database
pg_dump -Fc -d pagila_v1 -f pagila_v1.dump
# create new v2 of pagila database
psql -c "CREATE DATABASE pagila_v2;"
# restore v1 of pagila database
pg_restore -d pagila_v2 pagila_v1.dump
# connect to new pagila database
export PGDATABASE="pagila_v2"
psql
Create a new version of the Pagila database for Pattern 2
-- wrap in transaction
BEGIN;

-- optional, should be set to public by default
SET search_path TO public;

-- create new schemas
CREATE SCHEMA common;
CREATE SCHEMA customers;
CREATE SCHEMA films;
CREATE SCHEMA sales;
CREATE SCHEMA staff;
CREATE SCHEMA stores;

-- common
ALTER TABLE address SET SCHEMA common;
ALTER TABLE city SET SCHEMA common;
ALTER TABLE country SET SCHEMA common;

-- customers
ALTER TABLE customer SET SCHEMA customers;

-- films
ALTER TABLE actor SET SCHEMA films;
ALTER TABLE category SET SCHEMA films;
ALTER TABLE film SET SCHEMA films;
ALTER TABLE language SET SCHEMA films;
ALTER TABLE film_actor SET SCHEMA films;
ALTER TABLE film_category SET SCHEMA films;

-- sales
ALTER TABLE payment SET SCHEMA sales;
ALTER TABLE rental SET SCHEMA sales;

-- staff
ALTER TABLE staff SET SCHEMA staff;

-- stores
ALTER TABLE store SET SCHEMA stores;
ALTER TABLE inventory SET SCHEMA stores;

COMMIT;

-- confirm all tables are removed from public schema
\dt
Add the new schemas and move tables and objects accordingly

As shown below, the tables of the Pagila database have been relocated into six new schemas: common, customers, films, sales, staff, and stores. The common schema contains tables holding address data that is referenced by tables in several other schemas. There are now no tables left in the public schema. We will assume other database objects (e.g., functions, views, and triggers) have also been moved and modified, if necessary, to reflect new table locations.

-----------+----------+-----------+---------------
Instance | Database | Schema | Table
-----------+----------+-----------+---------------
postgres1 | pagila | common | address
postgres1 | pagila | common | city
postgres1 | pagila | common | country
-----------+----------+-----------+---------------
postgres1 | pagila | customers | customer
-----------+----------+-----------+---------------
postgres1 | pagila | films | actor
postgres1 | pagila | films | category
postgres1 | pagila | films | film
postgres1 | pagila | films | film_actor
postgres1 | pagila | films | film_category
postgres1 | pagila | films | language
-----------+----------+-----------+---------------
postgres1 | pagila | sales | payment
postgres1 | pagila | sales | rental
-----------+----------+-----------+---------------
postgres1 | pagila | staff | staff
-----------+----------+-----------+---------------
postgres1 | pagila | stores | inventory
postgres1 | pagila | stores | store

By applying schemas, we align tables and other database objects to the individual microservices, or to the functional teams that own the microservices and the associated data. Schemas also allow us to apply fine-grained access control over objects and data within the database more effectively.

Refactoring other Database Objects

In PostgreSQL, when moving tables across schemas using an ALTER TABLE...SET SCHEMA... SQL statement, dependent objects, such as database views, are automatically updated to reference the table’s new location. For example, take Pagila’s sales_by_store view. Note the schema references have been automatically updated for multiple tables from their original location in the public schema. The view was also moved to the sales schema.

CREATE OR REPLACE VIEW sales.sales_by_store AS
SELECT (c.city || ','::text) || cy.country AS store,
       (m.first_name || ' '::text) || m.last_name AS manager,
       sum(p.amount) AS total_sales
FROM sales.payment p
     JOIN sales.rental r ON p.rental_id = r.rental_id
     JOIN stores.inventory i ON r.inventory_id = i.inventory_id
     JOIN stores.store s ON i.store_id = s.store_id
     JOIN common.address a ON s.address_id = a.address_id
     JOIN common.city c ON a.city_id = c.city_id
     JOIN common.country cy ON c.country_id = cy.country_id
     JOIN staff.staff m ON s.manager_staff_id = m.staff_id
GROUP BY cy.country, c.city, s.store_id,
         m.first_name, m.last_name
ORDER BY cy.country, c.city;
Pagila’s sales_by_store database view with new schema pattern

Splitting Table Data Across Multiple Schemas

When refactoring a database, you may have to split data by replicating table definitions across multiple schemas. Take, for example, Pagila’s address table, which contains the addresses of customers, staff, and stores. The customers.customer, staff.staff, and stores.store tables all have foreign key relationships with the common.address table. The address table, in turn, has a foreign key relationship with the city table, which references the country table. Thus, for convenience, the address, city, and country tables were all placed into the common schema in the example above.

Although, at first, storing all the addresses in a single table might appear to be sound database normalization, consider the risks of having the address table’s data exposed. The store addresses are not considered sensitive data. However, the home addresses of customers and staff are likely considered sensitive personally identifiable information (PII). Also, consider that as an application evolves, you may have fields unique to one type of address that do not apply to other categories of addresses. The table definition for a store’s address may differ from that of a customer’s address. For example, we might choose to add a county column to the customers.address table for e-commerce tax purposes, or an on_site_parking boolean column to the stores.address table.
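
For example, once the schema-specific address tables exist (they are created below), the two definitions might diverge as follows; this is a hypothetical sketch using the columns mentioned above.

-- hypothetical, schema-specific columns added after the split
ALTER TABLE customers.address
    ADD COLUMN IF NOT EXISTS county text;
ALTER TABLE stores.address
    ADD COLUMN IF NOT EXISTS on_site_parking boolean NOT NULL DEFAULT false;
Hypothetical divergence of the customer and store address definitions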

In the example below, a new staff schema was added. The address table definition was replicated in the customers, staff, and stores schemas. The assumption is that the mixed address data in the original table was distributed to the appropriate address tables. Note how schemas help us avoid table name collisions.

-----------+----------+-----------+---------------
Instance | Database | Schema | Table
-----------+----------+-----------+---------------
postgres1 | pagila | common | city
postgres1 | pagila | common | country
-----------+----------+-----------+---------------
postgres1 | pagila | customers | address
postgres1 | pagila | customers | customer
-----------+----------+-----------+---------------
postgres1 | pagila | films | actor
postgres1 | pagila | films | category
postgres1 | pagila | films | film
postgres1 | pagila | films | film_actor
postgres1 | pagila | films | film_category
postgres1 | pagila | films | language
-----------+----------+-----------+---------------
postgres1 | pagila | sales | payment
postgres1 | pagila | sales | rental
-----------+----------+-----------+---------------
postgres1 | pagila | staff | address
postgres1 | pagila | staff | staff
-----------+----------+-----------+---------------
postgres1 | pagila | stores | address
postgres1 | pagila | stores | inventory
postgres1 | pagila | stores | store

To create the new customers.address table, we could use the following SQL statements. The statements to create the other two address tables are nearly identical.

-- wrap in transaction
BEGIN;

-- create new customers.address table
CREATE SEQUENCE IF NOT EXISTS customers.address_address_id_seq
    INCREMENT 1
    START 1
    MINVALUE 1
    MAXVALUE 9223372036854775807
    CACHE 1;

ALTER SEQUENCE customers.address_address_id_seq
    OWNER TO pagila_admin;

CREATE TABLE IF NOT EXISTS customers.address (
    address_id integer DEFAULT nextval('customers.address_address_id_seq'::regclass) NOT NULL PRIMARY KEY,
    address text NOT NULL,
    address2 text,
    district text NOT NULL,
    city_id smallint NOT NULL REFERENCES common.city ON UPDATE CASCADE ON DELETE RESTRICT,
    postal_code text,
    phone text NOT NULL,
    last_update timestamp with time zone DEFAULT now() NOT NULL
);

ALTER TABLE customers.address
    OWNER TO pagila_admin;

CREATE INDEX IF NOT EXISTS idx_fk_city_id ON customers.address (city_id);

CREATE TRIGGER last_updated
    BEFORE UPDATE ON customers.address FOR EACH ROW
EXECUTE PROCEDURE last_updated();

COMMIT;
Creating new customers.address table and associated objects

Although we now have two additional tables with identical table definitions, we do not duplicate any data. We could use the following SQL statements to migrate unique address data into the appropriate tables and confirm the results.

-- wrap in transaction
BEGIN;

-- copy only customer addresses to new customers.address table
INSERT INTO customers.address
SELECT *
FROM common.address
WHERE common.address.address_id IN (
    SELECT DISTINCT address_id
    FROM customers.customer
);

-- copy only staff addresses to new staff.address table
INSERT INTO staff.address
SELECT *
FROM common.address
WHERE common.address.address_id IN (
    SELECT DISTINCT address_id
    FROM staff.staff
);

-- copy only store addresses to new stores.address table
INSERT INTO stores.address
SELECT *
FROM common.address
WHERE common.address.address_id IN (
    SELECT DISTINCT address_id
    FROM stores.store
);

-- check for extraneous data in common.address before deleting
SELECT *
FROM common.address
WHERE common.address.address_id NOT IN
      (SELECT DISTINCT address_id FROM customers.customer)
  AND common.address.address_id NOT IN
      (SELECT DISTINCT address_id FROM staff.staff)
  AND common.address.address_id NOT IN
      (SELECT DISTINCT address_id FROM stores.store);

COMMIT;
Migrating unique address data into the appropriate tables

Lastly, alter the existing foreign key constraints to point to the new address tables. The SQL statements for the other two address tables are nearly identical.

-- wrap in transaction
BEGIN;

-- customers.customer
ALTER TABLE IF EXISTS customers.customer
    DROP CONSTRAINT IF EXISTS customer_address_id_fkey;

ALTER TABLE IF EXISTS customers.customer
    ADD CONSTRAINT customer_address_id_fkey FOREIGN KEY (address_id)
        REFERENCES customers.address (address_id) MATCH SIMPLE
        ON UPDATE CASCADE
        ON DELETE RESTRICT;

COMMIT;
Updating the existing foreign key constraints

There is now a reduced risk of exposing sensitive customer or staff data when querying store addresses, and the three address entities can evolve independently. Individual functional teams, separately responsible for customers, staff, and stores, can own and manage just the data within their domain.
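
For example, a team responsible for the customers domain might be granted rights over only its own schema, plus read access to the shared common schema. The customers_team role below is a hypothetical sketch, not part of the original Pagila database.

-- hypothetical role owned by the team responsible for the customers domain
CREATE ROLE customers_team;
GRANT USAGE ON SCHEMA customers, common TO customers_team;
GRANT SELECT, INSERT, UPDATE, DELETE
    ON ALL TABLES IN SCHEMA customers TO customers_team;
-- read-only access to the shared reference data
GRANT SELECT ON ALL TABLES IN SCHEMA common TO customers_team;
Example of schema-level grants for a team-specific role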

Before dropping the common.address table, you would still need to modify the remaining database objects that depend on it, such as views and functions. For example, take Pagila’s sales_by_store view we saw previously. Note that on line 9, below, the schema of the address table has been updated from common.address to stores.address. The stores.address table only contains the addresses of stores, not customers or staff.

CREATE OR REPLACE VIEW sales.sales_by_store AS
SELECT (c.city || ','::text) || cy.country AS store,
       (m.first_name || ' '::text) || m.last_name AS manager,
       sum(p.amount) AS total_sales
FROM sales.payment p
     JOIN sales.rental r ON p.rental_id = r.rental_id
     JOIN stores.inventory i ON r.inventory_id = i.inventory_id
     JOIN stores.store s ON i.store_id = s.store_id
     JOIN stores.address a ON s.address_id = a.address_id
     JOIN common.city c ON a.city_id = c.city_id
     JOIN common.country cy ON c.country_id = cy.country_id
     JOIN staff.staff m ON s.manager_staff_id = m.staff_id
GROUP BY cy.country, c.city, s.store_id,
         m.first_name, m.last_name
ORDER BY cy.country, c.city;
Pagila’s sales_by_store database view with the new schema pattern

Below, we see the final table structure for the Pagila database after refactoring. Tables have been loosely grouped together by schema in the diagram.

Database diagram showing new table relationships

Pattern 3: Multiple Databases

Similar to how individual schemas allow us to organize tables and other database objects and provide better separation of concerns, we can use databases the same way. For example, we could choose to spread the Pagila data across more than one database within a single RDS database instance. Again, using DDD concepts, while schemas might represent Bounded Contexts, databases most closely align to Domains, which are “spheres of knowledge and activity where the application logic revolves” (hackernoon.com).

Pattern 3: Multiple Databases

With Pattern 3, as an organization continues to refine its microservices-based application architecture, it might find that multiple databases within the same database instance are advantageous to further separate and organize application data.

Moving from a single- to multi-database architecture

Let’s assume that the data in the films schema is owned and managed by a completely separate team, who should never have access to sensitive data stored in the customers, stores, and sales schemas. According to the PostgreSQL docs, database access permissions are managed using the concept of roles. Depending on how the role is set up, a role can be thought of as either a database user or a group of users.

To provide greater separation of concerns than schemas alone, we can create a second, completely separate database within the same RDS database instance for data related to films. With two separate databases, it is easier to create and manage distinct roles and ensure access to customers, stores, or sales data is granted only to the teams that need it.

# dump v2 of pagila database
pg_dump -Fc -d pagila_v2 -f pagila_v2.dump
# create 2 new v3 databases
export PGDATABASE="postgres"
psql << EOF
\x
CREATE DATABASE pagila_v3;
CREATE DATABASE products_v3;
EOF
# restore v2 of pagila database
pg_restore -d pagila_v3 pagila_v2.dump
pg_restore -d products_v3 -n films pagila_v2.dump
# connect to new pagila database
export PGDATABASE="pagila_v3"
psql
Create a new version of the Pagila and Products database for Pattern 3
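
Building on the roles concept described above, below is a minimal sketch of how the films team might be limited to the new products database. The films_team role is hypothetical; note that PUBLIC's implicit CONNECT privilege is revoked first, and the schema-level grants must be run while connected to products_v3.

-- make database access explicit by revoking the implicit CONNECT privilege
REVOKE CONNECT ON DATABASE pagila_v3 FROM PUBLIC;
REVOKE CONNECT ON DATABASE products_v3 FROM PUBLIC;
-- hypothetical role for the films team, limited to the products database
CREATE ROLE films_team WITH LOGIN ENCRYPTED PASSWORD 'change_me!';
GRANT CONNECT ON DATABASE products_v3 TO films_team;
-- run the following while connected to the products_v3 database
GRANT USAGE ON SCHEMA films TO films_team;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA films TO films_team;
Example of database-level role separation for Pattern 3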

Below, we see the new layout of tables now spread across two databases within the same RDS database instance. Two new tables, pagila.films.film and products.films.outbox, are explained below.

-----------+----------+-----------+---------------
Instance | Database | Schema | Table
-----------+----------+-----------+---------------
postgres1 | pagila | common | city
postgres1 | pagila | common | country
-----------+----------+-----------+---------------
postgres1 | pagila | customers | address
postgres1 | pagila | customers | customer
-----------+----------+-----------+---------------
postgres1 | pagila | films | film
-----------+----------+-----------+---------------
postgres1 | pagila | sales | payment
postgres1 | pagila | sales | rental
-----------+----------+-----------+---------------
postgres1 | pagila | staff | address
postgres1 | pagila | staff | staff
-----------+----------+-----------+---------------
postgres1 | pagila | stores | address
postgres1 | pagila | stores | inventory
postgres1 | pagila | stores | store
-----------+----------+-----------+---------------
postgres1 | products | films | actor
postgres1 | products | films | category
postgres1 | products | films | film
postgres1 | products | films | film_actor
postgres1 | products | films | film_category
postgres1 | products | films | language
postgres1 | products | films | outbox

Change Data Capture and the Outbox Pattern

Inserts, updates, and deletes of film data can be replicated between the two databases using several methods, including Change Data Capture (CDC) with the Outbox Pattern. CDC is “a pattern that enables database changes to be monitored and propagated to downstream systems” (Red Hat). The Outbox Pattern uses the PostgreSQL database’s ability to commit to two tables atomically within a transaction. A transaction bundles multiple steps into a single, all-or-nothing operation.

In this example, data is written to existing tables in the products.films schema (the updated aggregate’s state) as well as to a new products.films.outbox table (new domain events), wrapped in a single transaction. Using CDC, the domain events from the products.films.outbox table are replicated to the pagila.films.film table. Because the CDC-based replication between the two databases is asynchronous, the data is only eventually consistent.
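
Below is a minimal sketch of the dual-write, assuming the insert_into_outbox procedure defined later in this post and an arbitrary film_id of 100. The update of the aggregate's state and the insert of the domain event succeed or fail together.

BEGIN;
-- update the aggregate's state
UPDATE films.film
SET rental_rate = 3.99
WHERE film_id = 100;
-- write the corresponding domain event to the outbox table
CALL films.insert_into_outbox(100);
COMMIT;
Updating the aggregate and the outbox atomically within a single transaction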

Change Data Capture (CDC) with the Outbox Pattern

In this example, films in the pagila.films.film and products.films.outbox tables are represented in a denormalized, aggregated view of a film instead of the original, normalized, multi-table relational structure. The table definition of the new pagila.films.film table is very different from that of the original products.films.film table. A concept such as a film, represented as an aggregate or entity, can be common to multiple Bounded Contexts, yet have a different definition in each.

CREATE TABLE IF NOT EXISTS films.outbox
(
    film_id integer NOT NULL,
    title character varying(50) NOT NULL,
    release_year smallint NOT NULL,
    film_language character varying(20) NOT NULL,
    rating character varying(5) NOT NULL,
    categories character varying(100) NOT NULL,
    actors character varying NOT NULL,
    rental_duration smallint NOT NULL,
    length_minutes smallint NOT NULL,
    replacement_cost numeric(5,2) NOT NULL,
    rental_rate numeric(4,2) NOT NULL,
    last_update timestamp with time zone NOT NULL DEFAULT now(),
    CONSTRAINT outbox_pkey PRIMARY KEY (film_id)
)
TABLESPACE pg_default;

ALTER TABLE IF EXISTS films.outbox
    OWNER TO products_admin;
Example products.films.outbox table definition (similar for pagila.films.film)

Note the Confluent JDBC Source Connector (io.confluent.connect.jdbc.JdbcSourceConnector) used here will not work with PostgreSQL arrays, which would be ideal for the one-to-many categories and actors columns. Arrays can be converted to text using ::text or by building value-delimited strings with the string_agg aggregate function.
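
For example, below is a sketch of the string_agg alternative, which flattens the one-to-many relationships into comma-delimited text the connector can handle.

-- build value-delimited strings instead of PostgreSQL arrays
SELECT f.film_id,
       string_agg(DISTINCT c.name, ',') AS categories,
       string_agg(DISTINCT initcap(concat(a.first_name, ' ', a.last_name)), ',') AS actors
FROM films.film f
     JOIN films.film_category fc ON f.film_id = fc.film_id
     JOIN films.category c ON fc.category_id = c.category_id
     JOIN films.film_actor fa ON f.film_id = fa.film_id
     JOIN films.actor a ON fa.actor_id = a.actor_id
GROUP BY f.film_id;
Using string_agg to flatten one-to-many relationships into delimited text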

-- PROCEDURE: films.insert_into_outbox(integer)
-- DROP PROCEDURE IF EXISTS films.insert_into_outbox(integer);
-- EXAMPLE: "CALL films.insert_into_outbox(100);"
CREATE OR REPLACE PROCEDURE films.insert_into_outbox(IN filmid integer)
    LANGUAGE 'sql'
BEGIN ATOMIC
    -- delete existing record
    DELETE
    FROM films.outbox
    WHERE (outbox.film_id = insert_into_outbox.filmid);

    -- insert new record
    INSERT INTO films.outbox (film_id, title, release_year,
                              film_language, rating, categories,
                              actors, rental_duration, length_minutes,
                              replacement_cost, rental_rate)
    SELECT f.film_id,
           initcap(f.title) AS title,
           f.release_year,
           trim(BOTH FROM l.name) AS film_language,
           f.rating,
           (SELECT array(SELECT c.name
                         FROM films.film_category AS fc
                              JOIN films.category AS c ON fc.category_id = c.category_id
                         WHERE film_id = f.film_id)::text AS categories),
           (SELECT array(SELECT initcap(concat(a.first_name, ' ', a.last_name)) AS actors
                         FROM films.film_actor AS fa
                              JOIN films.actor AS a ON fa.actor_id = a.actor_id
                         WHERE film_id = f.film_id)::text AS actor_array),
           f.rental_duration,
           f.length AS length_minutes,
           f.replacement_cost,
           f.rental_rate
    FROM films.film f
         JOIN films.language l ON f.language_id = l.language_id
    WHERE (f.film_id = insert_into_outbox.filmid)
    GROUP BY f.film_id, (trim(BOTH FROM l.name));
END;

ALTER PROCEDURE films.insert_into_outbox (integer)
    OWNER TO products_admin;
An example query to insert data into the products.films.outbox table

Given this table definition, the resulting data would look as follows.

film_id title release_year film_language rating categories actor_array rental_duration length_minutes replacement_cost rental_rate
389 Gunfighter Mussolini 2006 English PG-13 {Sports} {"Audrey Olivier","Judy Dean","Scarlett Damon","Russell Close"} 3 127 9.99 2.99
581 Minority Kiss 2006 English G {Music} {"Vivien Basinger"} 4 59 16.99 0.99
598 Mosquito Armageddon 2006 English G {Sports} {"Goldie Brody","Kirk Jovovich","Nick Stallone","Reese West"} 6 57 22.99 0.99
943 Villain Desperate 2006 English PG-13 {Documentary} {"Dustin Tautou","Cary Mcconaughey"} 4 76 27.99 4.99
490 Jumanji Blade 2006 English G {New} {"Jennifer Davis","Bob Fawcett","Nick Stallone","Gary Phoenix","Mena Temple","Jim Mostel"} 4 121 13.99 2.99
243 Doors President 2006 English NC-17 {Animation} {"Karl Berry","Lucille Tracy","Natalie Hopkins","Christian Akroyd","Sylvester Dern","Gene Hopkins","Ed Mansfield","Kim Allen","Reese West"} 3 49 22.99 4.99
40 Army Flintstones 2006 English R {Documentary} {"Ed Chase","Cary Mcconaughey","Mae Hoffman","Gene Willis","Penelope Cronyn","Matthew Carrey","Russell Close"} 4 148 22.99 0.99
317 Fireball Philadelphia 2006 English PG {Comedy} {"Val Bolger","Jude Cruise","Adam Grant","James Pitt","Frances Tomei"} 4 148 25.99 0.99
17 Alone Trip 2006 English R {Music} {"Ed Chase","Karl Berry","Uma Wood","Woody Jolie","Spencer Depp","Chris Depp","Laurence Bullock","Renee Ball"} 3 82 14.99 0.99
195 Crowds Telemark 2006 English R {Sci-Fi} {"Matthew Johansson","Anne Cronyn","Jeff Silverstone","Matthew Carrey"} 3 112 16.99 4.99
Example of data in the pagila.films.film and products.films.outbox tables

The existing pagila.stores.inventory table has a foreign key constraint on the pagila.films.film table. However, the films schema and associated tables have been migrated to the products database’s films schema. To overcome this challenge, we can:

  1. Create a new pagila.films.film table
  2. Continuously replicate data from the products database to the pagila.films.film table using CDC (see below)
  3. Modify the pagila.stores.inventory table to take a dependency on the new film table (see the sketch after this list)
  4. Drop the duplicate tables and other objects from the pagila.films schema
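
For step 3, the foreign key change might look like the following sketch, which mirrors the constraint changes made earlier; the original constraint name is assumed.

-- wrap in transaction
BEGIN;
-- re-point inventory at the new, replicated pagila.films.film table
ALTER TABLE stores.inventory
    DROP CONSTRAINT IF EXISTS inventory_film_id_fkey;
ALTER TABLE stores.inventory
    ADD CONSTRAINT inventory_film_id_fkey FOREIGN KEY (film_id)
        REFERENCES films.film (film_id) MATCH SIMPLE
        ON UPDATE CASCADE
        ON DELETE RESTRICT;
COMMIT;
Re-pointing the inventory table's foreign key (constraint name assumed)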

Debezium and Confluent for CDC

There are several technology choices for performing CDC. For this post, I have used Red Hat’s Debezium connector for PostgreSQL and Debezium Outbox Event Router, and Confluent’s JDBC Sink Connector. Below, we see a typical example of a Kafka Connect Source Connector using the Debezium connector for PostgreSQL and a Sink Connector using the Confluent JDBC Sink Connector. The Source Connector streams changes from the products database, using PostgreSQL’s Write-Ahead Logging (WAL) feature, to an Apache Kafka topic. A corresponding Sink Connector streams the changes from the Kafka topic to the pagila database.

{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.hostname": "postgres1.abcxyzdef.us-east-1.rds.amazonaws.com",
  "database.port": "5432",
  "database.user": "cdc_source_user",
  "database.password": "change_me!",
  "database.dbname": "products",
  "database.server.name": "products",
  "table.include.list": "films.outbox",
  "plugin.name": "pgoutput",
  "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
  "key.converter.apicurio.registry.auto-register": "true",
  "key.converter.apicurio.registry.find-latest": "true",
  "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
  "value.converter.apicurio.registry.auto-register": "true",
  "value.converter.apicurio.registry.find-latest": "true",
  "slot.name": "debezium_source_connector"
}
Debezium connector for PostgreSQL example
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": "1",
  "topics": "products.films.outbox",
  "connection.url": "jdbc:postgresql://postgres1.abcxyzdef.us-east-1.rds.amazonaws.com:5432/pagila?stringtype=unspecified",
  "connection.user": "cdc_sink_user",
  "connection.password": "change_me!",
  "dialect.name": "PostgreSqlDatabaseDialect",
  "table.name.format": "films.film",
  "auto.evolve": "true",
  "auto.create": "true",
  "insert.mode": "upsert",
  "pk.fields": "film_id",
  "pk.mode": "record_key",
  "delete.enabled": "true",
  "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
  "key.converter.apicurio.registry.auto-register": "true",
  "key.converter.apicurio.registry.find-latest": "true",
  "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
  "value.converter.apicurio.registry.auto-register": "true",
  "value.converter.apicurio.registry.find-latest": "true",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "rewrite"
}
Confluent JDBC Sink Connector example

Pattern 4: Multiple Database Instances

At some point in the evolution of a microservices-based application, it might become advantageous to separate the data into multiple database instances using the same database engine. Although managing numerous database instances may require more resources, there are also advantages. Each database instance will have independent connection configurations, roles, and administrators. Each database instance could run different versions of the database engine, and each could be upgraded and maintained independently.

Pattern 4: Multiple Database Instances

With Pattern 4, as an organization continues to refine its application architecture, it might find that multiple database instances are beneficial to further separate and organize application data.

Moving from multiple databases to multiple DB instances

Below is one possible refactoring of the Pagila database, splitting the data between two database instances. The first database instance, postgres1, contains the pagila database. The second database instance, postgres2, contains the products database.

-----------+----------+-----------+---------------
Instance | Database | Schema | Table
-----------+----------+-----------+---------------
postgres1 | pagila | common | city
postgres1 | pagila | common | country
-----------+----------+-----------+---------------
postgres1 | pagila | customers | address
postgres1 | pagila | customers | customer
-----------+----------+-----------+---------------
postgres1 | pagila | films | actor
postgres1 | pagila | films | category
postgres1 | pagila | films | film
postgres1 | pagila | films | film_actor
postgres1 | pagila | films | film_category
postgres1 | pagila | films | language
-----------+----------+-----------+---------------
postgres1 | pagila | staff | address
postgres1 | pagila | staff | staff
-----------+----------+-----------+---------------
postgres1 | pagila | stores | address
postgres1 | pagila | stores | inventory
postgres1 | pagila | stores | store
-----------+----------+-----------+---------------
postgres1 | pagila | sales | payment
postgres1 | pagila | sales | rental
-----------+----------+-----------+---------------
postgres2 | products | films | actor
postgres2 | products | films | category
postgres2 | products | films | film
postgres2 | products | films | film_actor
postgres2 | products | films | film_category
postgres2 | products | films | language

Data Replication with CDC

Note the films schema is duplicated between the two databases, shown above. Using CDC, we can keep the six postgres1.pagila.films tables in sync with the six postgres2.products.films tables. In this example, we are not using the Outbox Pattern, as we did in Pattern 3. Instead, we are replicating any changes to the tables in the postgres2.products.films schema to the corresponding tables in the postgres1.pagila.films schema.

Multi-table data replication between database instances using Change Data Capture (CDC)

To ensure the tables stay in sync, the tables and other objects in the postgres1.pagila.films schema should be limited to read-only access (SELECT) for all users. The postgres2.products.films tables represent the authoritative source of data, the System of Record (SoR). Any inserts, updates, or deletes must be made to these tables and replicated using CDC.

CREATE USER read_only_user WITH ENCRYPTED PASSWORD 'change_me!';
GRANT CONNECT ON DATABASE pagila TO read_only_user;
GRANT USAGE ON SCHEMA films TO read_only_user;
GRANT SELECT ON ALL TABLES IN SCHEMA films TO read_only_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA films
GRANT SELECT ON TABLES TO read_only_user;
Example of a user with read-only rights (SELECT) to films schema

Pattern 5: Multiple Database Engines

AWS commonly uses the term ‘purpose-built databases.’ AWS offers over fifteen purpose-built database engines to support diverse data models, including relational, key-value, document, in-memory, graph, time series, wide column, and ledger. There may be instances where using multiple, purpose-built databases makes sense. Using different database engines allows architects to take advantage of the unique characteristics of each engine type to support diverse application requirements.

With Pattern 5, as an organization continues to refine its application architecture, it might choose to leverage multiple, different database engines.

Moving from multiple databases to multiple database engines

Take, for example, an application that uses a combination of relational, NoSQL, and in-memory databases to persist data. In addition to PostgreSQL, the application benefits from moving a certain subset of its relational data to a non-relational, high-performance key-value store, such as Amazon DynamoDB. Furthermore, the application implements a database cache using an ultra-fast in-memory database, such as Amazon ElastiCache for Redis.

Pattern 5: Multiple Database Engines

Below is one possible refactoring of the Pagila database, splitting the data between two different database engines, PostgreSQL and Amazon DynamoDB.

-----------+----------+-----------+-----------
Instance | Database | Schema | Table
-----------+----------+-----------+-----------
postgres1 | pagila | common | city
postgres1 | pagila | common | country
-----------+----------+-----------+-----------
postgres1 | pagila | customers | address
postgres1 | pagila | customers | customer
-----------+----------+-----------+-----------
postgres1 | pagila | films | film
-----------+----------+-----------+-----------
postgres1 | sales | sales | payment
postgres1 | sales | sales | rental
-----------+----------+-----------+-----------
postgres1 | pagila | staff | address
postgres1 | pagila | staff | staff
-----------+----------+-----------+-----------
postgres1 | pagila | stores | address
postgres1 | pagila | stores | film
postgres1 | pagila | stores | inventory
postgres1 | pagila | stores | store
-----------+----------+-----------+-----------
DynamoDB | - | - | Films

The assumption is that, based on the application’s access patterns for film data, the application could benefit from the addition of a non-relational, high-performance key-value store. Further, the film-related data entities, such as film, category, and actor, could be modeled using DynamoDB’s single-table data model architecture. In this model, multiple entity types can be stored in the same table. If necessary, to replicate data back to the PostgreSQL instance from the DynamoDB table, we can perform CDC with DynamoDB Streams.

Creating a new Films data model for DynamoDB using NoSQL Workbench
Aggregate view of the DynamoDB single-table Films data model

CQRS

Command Query Responsibility Segregation (CQRS), a popular software architectural pattern, is another use case for multiple database engines. The CQRS pattern is, as the name implies, “a software design pattern that separates command activities from query activities. In CQRS parlance, a command writes data to a data source. A query reads data from a data source. CQRS addresses the problem of data access performance degradation when applications running at web-scale have too much burden placed on the physical database and the network on which it resides” (Red Hat). CQRS commonly uses one database engine optimized for writes and a separate database optimized for reads.

CQRS architectural pattern using two different database engines

Conclusion

Embracing a microservices-based application architecture may have many business advantages for an organization. However, ignoring the application’s existing databases can negate many of the benefits of microservices. This post examined several common patterns for refactoring relational databases to match a modern microservices-based application architecture.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are property of the author.


Considerations for Architecting Resilient Multi-Region Workloads

What to consider when evaluating a ‘multi-region’ strategy as part of business continuity and disaster recovery planning

Introduction

Increasingly, I hear the term ‘multi-region’ used within the IT community and in conversations with peers and customers, most often within the context of disaster recovery. In my experience, ‘multi-region’ is a cloud provider-agnostic phrase that can mean different things to different organizations. A few examples:

  • Multiple, independent, regionally-deployed application instances that better serve a geographically-diverse customer base, support regulated ‘locality-restricted’ workloads, ensure data sovereignty, distribute system load, or minimize the blast radius of a regional disaster event. Although a disaster recovery plan may be required, the primary driver of this architecture is often not disaster recovery.
  • An active-passive failover strategy in which a second DR Region hosts a mixture of cold, warm, and hot copies of workloads and serves as a failover in response to a disaster event in the Primary Region. In my experience, this is probably the most common use case when someone refers to ‘multi-region.’
  • An active-active architecture in which data is continually replicated and traffic can be seamlessly routed based on geolocation between all services within two or more geo-redundant regions, making it resilient to the impact of a regional disaster event. Some might describe this architecture as having both inner-regional and inter-regional high availability.

Terminology

The following terminology is commonly used when discussing Business Continuity and Disaster Recovery Planning. Teams should be familiar with these concepts before undertaking planning activities:

  • Fault Tolerance (FT), High Availability (HA), Disaster Recovery (DR), and Business Continuity (BC), and the distinct differences between the four concepts
  • Business Continuity Plan/Planning (BCP) and Disaster Recovery Plan/Planning (DRP), and the differences between the two types of plans (source)
  • Business Continuity and Disaster Recovery (BCDR or BC/DR) (source)
  • Business Impact Analysis (BIA) and Risk Assessment (source)
  • Categories of Disaster: Natural Disasters, Technical Failures, and Human Actions, both intentional and unintentional (source)
  • Resiliency, which includes both Disaster Recovery (service restoration) and Availability (preventing loss of service) (source)
  • Crisis Management: Critical vs. Non-Critical Systems and Mission Critical vs. Business Critical Systems (source)
  • Regions vs. Availability Zones (aka Zones), common constructs to all major Cloud Service Providers (CSP): AWS, Google Cloud, Microsoft Azure, IBM Cloud, and Oracle Cloud
  • Primary (aka Active) Region vs. DR (aka Passive or Standby) Region (source)
  • Active-Active vs. Active-Passive DR Strategies (source)
  • SHARE’s 7 Tiers of Disaster Recovery (source)
  • Disaster Recovery Site Types: Cold, Warm, and Hot (source)
  • AWS Multi-Region Disaster Recovery Strategies: 1) Backup and restore, 2) active-passive Pilot light, 3) active-passive Warm standby, or 4) Multi-region (multi-site) active-active (source)
  • Recovery Time Objective (RTO) and Recovery Point Objective (RPO), and the methods and costs to achieve varying levels of each SLA (source)
  • Failover and Failback Operations (source)
  • Partial vs. Complete Regional Outage, and the implications to Disaster Recovery Planning (source)
  • Single Points of Failure (SPOF) (source)

BCDR Planning Considerations

When developing BCDR Plans that include multi-region, there are several technical aspects of your workloads that need to be considered. The following list is not designed to be exhaustive, nor is it intended to suggest that multi-region DR is an unattainable task. On the contrary, this list is meant to encourage thorough planning and suggest ways to continually improve an organization’s plan.

  • Configuration Data Management
  • Secret Management
  • Cryptographic Key Management
  • Hardware Security Module (HSM)
  • Credential Management
  • SSL/TLS Certificate Management
  • Authentication (AuthN) and Authorization (AuthZ)
  • Domain Name System (DNS), DNS Failover and Failback, Global Traffic Management (GTM)
  • Content delivery network (CDN)
  • Specialized Workloads, such as SAP, VMware, SharePoint, Citrix, Oracle, SQL Server, SAP HANA, and IBM Db2
  • On-premises workload dependencies, and wide-area network (WAN) connectivity between on-premises data centers and the Cloud
  • Remote access from on-premises and remote employees to cloud-based backend and enterprise systems
  • Edge compute, such as connected devices, IoT, storage gateways, and remotely-managed local cloud infrastructure (e.g., AWS Outposts)
  • DevOps, CI/CD, Release Management
  • Infrastructure as Code (IaC)
  • Public and private artifact repositories, including Docker and Virtual Machine (VM) Image repositories
  • Source code in Version Control Systems (VCS), also known as Source Control Management (SCM)
  • Software licensing for the self-managed and hosted services
  • Observability, monitoring, logging, alerting, and notification
  • Regional differences of a Cloud Provider’s service offerings, cost, performance, and support
  • Latency, including latency between the Primary and DR Regions, and between end-users or partners and the DR Region
  • Data residency and data sovereignty requirements, which will impact choice of DR Region
  • Automated, event-driven Failover process vs. manual processes
  • Failback process
  • Playbooks, documentation, training, and regular testing
  • Support, help desk, and call center coordination (and potential impact of disaster event on Cloud-based call center technologies)

Disaster Recovery Planning Process

In my opinion, many of the disaster planning discussions I’m involved in begin by focusing on the wrong things. Logically, engineering teams often jump right to questions about specific service capabilities, such as “is my database capable of cross-region replication?” or “how do I support multi-region cryptographic keys for data encryption and decryption?” Yet, higher-level business continuity planning or workload assessments haven’t yet been conducted. Based on my experience, I suggest the following approach to get started with disaster recovery planning (again, not an exhaustive list):

  1. Workload Portfolio: Identify the organization’s complete workload portfolio, including all distinct applications and their associated infrastructure, datastores, and other dependencies.
  2. DR Workloads: From the portfolio, identify which workloads are considered business-critical or mission-critical systems and must be part of the disaster recovery planning.
  3. Classification: Classify each DR workload based on Business Impact Analysis, Risk Assessment, and SLAs such as availability, RTO, and RPO. Do the requirements demand an active-passive or active-active DR strategy? In AWS terms, do the requirements dictate Backup and Restore, Pilot Light, Warm Standby, or Multi-Site Active-Active?
  4. Documentation: Obtain current documentation and architectural and process-flow diagrams showing all components and dependencies, including cross-workload and third-party dependencies such as SaaS vendors. Review and verify accuracy of documentation and diagrams.
  5. Current Regions: Identify the Regions into which the existing workload is deployed.
  6. Service-level Review: Review each workload’s individual components to ensure they can meet the DR requirements, such as compute, storage, databases, security, networking, edge, CDN, mobile, frontend web, and end-user compute (e.g., “is the workload’s specific NoSQL database capable of cross-region replication and automatic failover?”).
  7. Third-party Dependencies: Identify and review each workload’s third-party dependencies, such as SaaS partners. Is their service essential to a critical workload’s functionality? What is your partner’s Disaster Recovery Plan?
  8. DR-capable Workload: Determine how much re-engineering is required to deploy and operate the workload to the DR Region.
  9. Data Residency and Data Sovereignty: Review data residency and sovereignty requirements for the workload, which could impact the choice of DR Regions.
  10. Choose DR Region: Not all of a Cloud Provider’s Regions offer the same services. Therefore, choose a DR Region(s) that can support all services utilized by the workload.
  11. Disaster Planning Considerations: Review all items shown in the previous ‘Disaster Recovery Planning Considerations’ section for each workload.
  12. Prepare for Partial Failures: Decide how you will handle partial versus complete regional outages. Regional disruptions of specific services are the most common type of Cloud outage, often resulting in partial impairment of a workload.
  13. Cost: Calculate the cost of the workload based on the required DR Service Level and DR Region. Investigate Cloud-provider’s volume pricing agreements to reduce costs.
  14. Budget: Adjust DR Service Level requirements to meet budgetary constraints if necessary.
  15. Re-engineer Workloads: Construct timelines and budgets to re-engineer workloads for DR if required.
  16. DR Proof of Concept: Build out a Proof of Concept (POC) DR Region to validate the plan’s major assumptions and adjust if necessary; include failover and failback operations.
  17. DR Buildout: Construct timelines and budgets to build out the DR environment.
  18. Workload Deployment: Construct timelines and budgets to provision, deploy, configure, test, and monitor workloads in the DR Region.
  19. Documentation, Training, and Testing: Ensure all playbooks, documentation, training, and testing procedures are completed and regularly reviewed, updated, and tested, including failover and failback operations.

Before Considering Multi-Region

Workloads built to be resilient, fault-tolerant, highly available, easily deployable and configurable, backed-up, and monitored will help an organization withstand the most common disruptions in the Cloud. Before considering a multi-region disaster recovery strategy, I strongly recommend ensuring the following aspects of your workloads are adequately addressed:

  • Fault Tolerance: Workloads are architected to be fault-tolerant such that they can withstand the failure of individual components and operate in a degraded state. Eliminate any single point of failure (SPOF).
  • High Availability: Workloads are designed to be highly available, which with most cloud providers means resources are spread across multiple, discrete, regionally dispersed data centers or Availability Zones (AZ) and can tolerate the loss of a data center or AZ.
  • Backup: All workload components, source code, data, and configuration are regularly backed up using automated processes. All backups are verified. Backups are periodically restored to test restore procedures. As the most basic form of disaster recovery, developing and testing a backup and restore strategy will help teams to think more deeply about disaster planning.
  • Observability: Workloads have adequate observability, monitoring, logging, alerting, and notification processes in place.
  • Automation: Workloads and all required infrastructure and configuration are codified, documented, and can be efficiently and consistently deployed and configured without requiring manual intervention, using mature DevOps and CI/CD practices. Ensuring workloads can be consistently deployed and re-deployed will help ensure they could be built out in a second region if multi-region is a potential goal.
  • Environment-agnostic: Workloads are environment-agnostic, with no hard-coded application or infrastructure dependencies or configurations. Confirming workloads are environment-agnostic will help to ensure they are portable across regions if multi-region is a potential goal.
  • Multi-environment: Workloads are deployed to one or more SDLC environments prior to Production, such as Development, Test, Staging, or UAT. The environment should be a different Cloud account than Production. A second environment will help to ensure workloads are portable across regions if multi-region is a potential goal.
  • Chaos Engineering: Workloads are regularly tested to ensure that they can withstand unexpected disruptions.

Conclusion

In this post, we explored some of the potential meanings of the term ‘multi-region’. We then reviewed Business Continuity and Disaster Recovery Planning terminology, considerations, and a recommended approach to get started. Lastly, we covered some best practices to adopt before considering a multi-region disaster recovery strategy. What does ‘multi-region’ mean to your organization? Do you have comprehensive Business Continuity and Disaster Recovery Plans for your Cloud-based workloads? I would value your feedback and thoughts.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.



Evolving Models for ISV Software Delivery, Management, and Support

Understanding evolving models used by Independent Software Vendors for cloud-based software delivery, management, and support

Copyright: melpomen (123rf.com)

Introduction

As a Consultant, Enterprise Architect, Partner Solutions Architect, and Senior Solutions Architect, I have had the chance to work with many successful Independent Software Vendors (ISVs), from early-stage startups to large established enterprises. Based on my experience, I wrote two AWS Partner Network (APN) Blog posts: Architecting Successful SaaS: Understanding Cloud-Based Software-as-a-Service Models and Architecting Successful SaaS: Interacting with Your SaaS Customer’s Cloud Accounts. Continuing with that series, this post explores several existing and evolving models used by ISVs to deliver, manage, and support their software products for cloud-based customers.

Independent Software Vendors

An ISV, also known as a software publisher, specializes in making and selling software designed for mass or niche markets. This is in contrast to in-house software, which the organization develops for its internal use, or custom software designed for a single, specific third party. Although end-users consume ISV-provided software, it remains the property of the vendor (source: Wikipedia).

The ISV industry, especially SaaS-based products, has seen huge year-over-year (YOY) growth. VC firms continued to fuel industry growth (and valuations) with an unprecedented level of capital investment throughout 2021. According to SaaS Industry, the total investment for Q1-2021 stood at $9.9B. B2B data industry resource Datamation examines prominent ISVs in their article, Top 75 SaaS Companies of 2022. SaaS management company Cledara produced a similar piece, The Top SaaS Companies in 2021.

Online Marketplaces

Cloud-based ISV software products are purchased directly from the vendor, or more recently, through marketplaces hosted by major cloud providers. In their Predicts 2022: SaaS Dominates Software Contracting by 2026 — and So Do Risks, Gartner observes, “Online marketplaces have become more prevalent (e.g., Amazon Web Services [AWS], Google, etc.). With easy access to these marketplaces, customers can and are purchasing marketplace products without the need to engage the software vendor directly or interact with sourcing or procurement within their organizations.” Examples of marketplaces include AWS Marketplace, Azure Marketplace, Google Cloud Marketplace, Salesforce AppExchange, and Oracle Cloud Marketplace.

Major Cloud Providers’ approximate market share, according to Statista and Canalys

AWS Marketplace, for example, describes itself as “a curated digital catalog that makes it easy for organizations to discover, procure, entitle, provision, and govern third-party software.” The company tackle.io, whose platform facilitates the process of listing, selling, and managing products on cloud marketplaces for ISVs, produced a report, State of Cloud Marketplaces 2021, detailing the leading cloud software sales and delivery platforms.

Purpose-built Products

Based on my observations, most ISV products can be classified as either purpose-built or general-purpose. Purpose-built ISV products are designed to address a specific customer need. Many are considered enterprise software, also known as Enterprise Application Software (EAS). Enterprise software includes Customer Relationship Management (CRM), Management Information Systems (MIS), Enterprise Resource Planning (ERP), Human Resource Management (HRM or HRIS), Content Management Systems (CMS), Learning Management Systems (LMS), Field Service Management (FSM), Knowledge Management Systems (KMS), Talent Management Systems (TMS), and Applicant Tracking Systems (ATS).

General-purpose Products

General-purpose ISV products often focus on a particular technology, such as security, identity management, databases, analytics, storage, AI/ML, and virtual desktops. These products are frequently used by customers as one part of a larger solution. Many of these products are hosted ‘as-a-Service,’ such as Database as a Service (DBaaS), Data Warehousing as a Service (DWaaS), Monitoring as a Service (MaaS), Analytics as a Service (AaaS), Machine Learning as a Service (MLaaS), Identity as a Service (IDaaS), Desktop as a Service (DaaS), and Storage as a Service (STaaS).

Examining the current 19,919 listings in the AWS Marketplace, by general category, we can see a mix of purpose-built (e.g., Business Applications, Industries) and general-purpose ISV products (e.g., DevOps, ML, IoT, Data, Infrastructure).

AWS Marketplace product by category (January 2022)

Below are all the categories of ISV products and services found on the AWS Marketplace.

AWS Marketplace product categories (January 2022)

Similarly, looking at the current 5,008 listings in the Google Cloud Marketplace by category, we can see both purpose-built and general-purpose ISV products.

Google Cloud Marketplace products by category (January 2022)

SaaS-as-a-Service

There is even an established market for SaaS-as-a-Service (SaaSaaS): products and platforms designed to enable ISVs and SaaS providers. These products and platforms help overcome the inherent engineering complexities required to prepare, deliver, manage, bill, and support ISV products. Examples include services such as the AWS SaaS Factory Program, AWS SaaS Boost, and the Azure SaaS Development Kit (ASDK), as well as vendors like tackle.io and AppDirect.

Current ISV Models

As organizations continue to move their IT infrastructure and workloads to cloud providers such as Amazon Web Services (AWS), Google Cloud, and Microsoft Azure, ISVs have had to evolve how they distribute, manage, and support their software products. Today, most ISVs use a variation of one of three models: Customer-deployed (aka self-hosted), Software as a Service (SaaS), and SaaS with Remote Agents.

These methods are evident from looking at the current listings in the AWS Marketplace by delivery method. Of the 14,444 products, 11.3% are categorized as SaaS. Many of the remaining delivery methods could be classified as Customer-deployed products. The largest percentage of products are delivered as Amazon Machine Images (AMI). Custom-built VM images were traditionally the most common delivery form. However, newer technologies, such as Container Images, Helm Charts, Data Exchange (Datasets), and SageMaker (ML) Algorithms and Models, are quickly growing in popularity. Data Exchange products, for example, have doubled in the last 18 months.

AWS Marketplace products by delivery method (January 2022)

Customer-deployed Model

In a Customer-deployed ISV product model, the customer deploys the ISV’s software product into their own Cloud environment. The ISV’s product is packaged as virtual machine images, such as Amazon Machine Images (AMIs), as Docker container images and Helm Charts, as licensed datasets, as machine learning models, or as infrastructure as code (IaC) files, such as Amazon CloudFormation templates.

Customer-deployed (aka self-hosted) model

With Customer-deployed products, it is not required, but also not uncommon, for the ISV to have some connection to the customer’s cloud environment for break-the-glass (BTG) support, remote monitoring, or license management purposes.

Software as a Service (SaaS)

According to Wikipedia, SaaS is a software licensing and delivery model in which software is licensed on a subscription basis and is centrally hosted within the ISV’s cloud environment. SaaS is one of the three best-known cloud computing models, along with Platform as a Service (PaaS) and Infrastructure as a Service (IaaS).

Software as a Service (SaaS) model

With SaaS, the customer’s data can remain in the customer’s cloud environment. A secure connection, such as an Open Database Connectivity (ODBC) or Java Database Connectivity (JDBC) connection, can be made to the customer’s datasources. Alternately, the customer’s data is securely copied, either in advance or just-in-time (JIT), to dedicated storage within the ISV’s cloud environment. Using caching technologies, such as RubiX, Databricks Delta caching, and Apache Spark caching, data can be cached as needed. Some caching technologies, such as Alluxio, even offer tiered caching based on how frequently the data is accessed: hot, warm, or cold.

SaaS with Remote Agents Model

The SaaS with Remote Agents model is a variation of the pure SaaS model. In this scenario, the customer deploys ISV-supplied software agents within their cloud, on-premises, and edge (IoT) environments. Software agents can be language-specific libraries or modules added to an application, sidecar containers, serverless functions, or stand-alone VMs. These agents collect data, pre-optimize payloads, and push the data back to the ISV’s cloud environment. The prototypical examples of this model are monitoring/observability and Application Performance Monitoring (APM) vendors. They often use agents to collect and aggregate a customer’s telemetry (logs, metrics, events, traces) into the ISV’s external cloud environment. The ISV’s cloud environment acts as a centralized, single pane of glass for the customer to view their aggregated telemetry.

SaaS with Remote Agents model

Some cloud providers offer products designed specifically to make a customer’s integration with SaaS products easier. With Amazon EventBridge, for example, you can “easily connect to and stream data from your SaaS applications without having to write any code.” Amazon EventBridge has established integrations with dozens of SaaS partners, including Auth0, DataDog, MongoDB, New Relic, Opsgenie, PagerDuty, Shopify, and Zendesk.
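
As an example of the customer side of such an integration, once a SaaS partner shares an event source with your AWS account, you can discover it and attach an event bus to it using the AWS CLI. This is only a sketch; the partner event source name below is hypothetical.

# list partner event sources shared with your account
aws events list-event-sources
# create a partner event bus matched to the source to start receiving events
aws events create-event-bus \
  --name aws.partner/examplesaas.com/12345/your-app \
  --event-source-name aws.partner/examplesaas.com/12345/your-app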

Evolving ISV Models

Remotely-managed Model

In addition to the Customer-deployed and SaaS models, some ISVs have developed new models for offering their software products. One such model is what I refer to as the Remotely-managed model. This hybrid model combines the best aspects of both the Customer-deployed and SaaS models. It is designed to address common customer concerns, such as security, speed, ease of use, and cost.

Remotely-managed model

With the Remotely-managed model, the ISV’s product is administered by the customer through a user interface (UI) hosted in the ISV’s cloud environment. The administrative actions of the customer are translated into commands, which are executed in the customer’s cloud environment. These remote commands are communicated using API calls or bi-directional message queues, such as EventBridge. Often, the customer grants the ISV programmatic access to their environment. The ISV’s access is limited to a fine-grained set of permissions, based on the principle of least privilege (PoLP), to deploy and manage their product, usually isolated within a separate customer account or Virtual Private Cloud (VPC).

Deploying the ISV’s product to the customer’s environment adjacent to the data maximizes security by eliminating data movement external to the customer’s cloud environment. Instead, computations are done adjacent to data within the customer’s environment.
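
On AWS, this kind of limited programmatic access is often granted through a cross-account IAM role that the ISV assumes. Below is a minimal sketch; the role name, ISV account ID, and external ID are hypothetical, and the permissions policy you attach to the role should be scoped to only the actions the ISV’s product requires.

# hypothetical cross-account role trusted by the ISV's account, protected with an external ID
aws iam create-role \
  --role-name isv-product-manager \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Principal": {"AWS": "arn:aws:iam::999888777666:root"},
      "Action": "sts:AssumeRole",
      "Condition": {"StringEquals": {"sts:ExternalId": "example-external-id"}}
    }]
  }'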

SaaS Façade Model

Recently, I have been developing some architectural thinking around a newer model that I call the SaaS Façade model. A façade or facade is generally the front part or exterior of a building. In software design, a facade is an object that serves as a front-facing interface masking more complex underlying or structural code (source: Wikipedia).

SaaS Façade model

The SaaS Façade model is a variation of the Remotely-managed model. Although architecturally more complex than the Remotely-managed model, the SaaS Façade model is simpler from a customer perspective. Both the customer’s administrators and end-users access the software product through the ISV’s cloud environment, but there is little to no data movement from the customer’s environment.

Separating Front-end from Back-end

The ISV’s product architecture is the most significant difference between the SaaS Façade model and the Remotely-managed model. Most modern software products are composed of multiple, decoupled components or tiers, including front-end/UI/presentation layer, back-end/services, and data. In the SaaS Façade model, the customer’s end-users access the ISV’s product through the ISV’s cloud environment, similar to SaaS. The ISV’s front-end is deployed to the ISV’s cloud environment. The ISV’s product’s back-end is deployed to the customer’s cloud environment, adjacent to the customer’s data. The ISV product’s data tier is deployed to either the ISV’s or customer’s cloud environment, depending on the product’s exact architectural requirements. This model requires a highly decoupled architecture and tolerance for moderate latency.

Decoupled User Management

A frequent request from customers of ISV software concerns user management. Customers want to allow approved external users to access read-only data, such as a sales report, without adding them to the customer’s cloud environment’s Identity and Access Management (IAM) system. Additionally, end-users do not need to access the software by first logging in through the customer’s cloud provider’s console and having an established IAM identity. The SaaS Façade model enables this capability.

Multi-Cloud

Another potential use case for the SaaS Façade model is implementing a multi-cloud customer architecture. Imagine an ISV’s cloud environment hosted on a single public cloud provider’s platform, while the customer has workloads and data housed on multiple cloud providers’ platforms. The ISV’s product’s back-end would be deployed to multiple cloud providers’ platforms using a common compute construct such as a Linux-based VM (e.g., Amazon EC2, Azure VM, or Google Cloud Compute Engine) or Kubernetes (e.g., AWS’s EKS, Google Cloud’s GKE, or Azure’s AKS). The ISV product’s data tier would also be built on a database engine common to most major cloud providers, such as MySQL or PostgreSQL. Similar to the SaaS with Remote Agents model, the ISV’s environment acts as a single portal to the customer’s multiple environments and decentralized data sources.

SaaS Façade model with a multi-cloud configuration

In this scenario, the ISV product’s front-end and back-end are common and independent of the cloud provider’s platform. The customer-managed administration interface is also common. Potentially, only the ISV’s deployment, configuration, and monitoring elements may need to have aspects specific to each cloud provider’s platform. For example, Kubernetes is common to AWS, Google Cloud, and Azure. However, the authentication methods, IaC, and API commands to provision a Kubernetes cluster or deploy a containerized application differ between EKS, GKE, and AKS.

Conclusion

In this post, we briefly explored several models used by ISVs to deliver, manage, and support their software products for cloud-native customers. As cloud adoption continues to grow and the complexity of cloud-based application platforms continues to evolve, ISVs will continue to develop new models for distributing their software products in the cloud.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. Introduction image – Copyright: melpomen (123rf.com).



Hydrating a Data Lake using Query-based CDC with Apache Kafka Connect and Kubernetes on AWS

Import data from an Amazon RDS database into an Amazon S3-based data lake using Amazon EKS, Amazon MSK, and Apache Kafka Connect

Introduction

A data lake, according to AWS, is a centralized repository that allows you to store all your structured and unstructured data at any scale. Data is collected from multiple sources and moved into the data lake. Once in the data lake, data is organized, cataloged, transformed, enriched, and converted to common file formats, optimized for analytics and machine learning.

One of an organization’s first challenges when building a data lake is how to continually import data from different data sources, such as relational and non-relational database engines, enterprise ERP, SCM, CRM, and SIEM software, flat-files, messaging platforms, IoT devices, and logging and metrics collection systems. Each data source will have its own unique method of connectivity, security, data storage format, and data export capabilities. There are many closed- and open-source tools available to help extract data from different data sources.

A popular open-source tool is Kafka Connect, part of the Apache Kafka ecosystem. Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. Kafka Connect makes it simple to quickly define connectors that move large collections of data into and out of Kafka.

In the following post, we will learn how to use Kafka Connect to export data from our data source, an Amazon RDS for PostgreSQL relational database, into Kafka. We will then export that data from Kafka into our data sink — a data lake built on Amazon Simple Storage Service (Amazon S3). The data imported into S3 will be converted to Apache Parquet columnar storage file format, compressed, and partitioned for optimal analytics performance, all using Kafka Connect.

Best of all, to maintain data freshness of the data lake, as data is added or updated in PostgreSQL, Kafka Connect will automatically detect those changes and stream those changes into the data lake. This process is commonly referred to as Change Data Capture (CDC).

High-level architecture for this post’s demonstration

Change Data Capture

According to Gunnar Morling, Principal Software Engineer at Red Hat, who works on the Debezium and Hibernate projects and is a well-known industry speaker, there are two types of Change Data Capture: query-based and log-based CDC. Gunnar detailed the differences between the two types of CDC in his talk at the Joker International Java Conference in February 2021, Change data capture pipelines with Debezium and Kafka Streams.

Joker 2021: Change data capture pipelines with Debezium and Kafka Streams (image: YouTube)

You can find another good explanation of CDC in the recent post by Lewis Gavin of Rockset, Change Data Capture: What It Is and How to Use It.

Query-based vs. Log-based CDC

To effectively demonstrate the difference between query-based and log-based CDC, examine the results of a SQL UPDATE statement, captured with both methods.

UPDATE public.address
SET address2 = 'Apartment #1234'
WHERE address_id = 105;

Here is how the change is represented as a JSON message payload using the query-based CDC method described in this post.

{
  "address_id": 105,
  "address": "733 Mandaluyong Place",
  "address2": "Apartment #1234",
  "district": "Asir",
  "city_id": 2,
  "postal_code": "77459",
  "phone": "196568435814",
  "last_update": "2021-08-13T00:43:38.508Z"
}

Here is how the same change is represented as a JSON message payload using log-based CDC with Debezium. Note the metadata-rich structure of the log-based CDC message as compared to the query-based message.

{
  "after": {
    "address": "733 Mandaluyong Place",
    "address2": "Apartment #1234",
    "phone": "196568435814",
    "district": "Asir",
    "last_update": "2021-08-13T00:43:38.508453Z",
    "address_id": 105,
    "postal_code": "77459",
    "city_id": 2
  },
  "source": {
    "schema": "public",
    "sequence": "[\"1090317720392\",\"1090317720392\"]",
    "xmin": null,
    "connector": "postgresql",
    "lsn": 1090317720624,
    "name": "pagila",
    "txId": 16973,
    "version": "1.6.1.Final",
    "ts_ms": 1628815418508,
    "snapshot": "false",
    "db": "pagila",
    "table": "address"
  },
  "op": "u",
  "ts_ms": 1628815418815
}

In an upcoming post, we will explore Debezium along with Apache Avro and a schema registry to build a log-based CDC solution using PostgreSQL’s write-ahead log (WAL). In this post, we will examine query-based CDC using the ‘update timestamp’ technique.

Kafka Connect Connectors

In this post, we will use source and sink connectors from Confluent. Confluent is the undisputed leader in providing enterprise-grade managed Kafka through their Confluent Cloud and Confluent Platform products. Confluent offers dozens of source and sink connectors that cover the most popular data sources and sinks. Connectors used in this post will include:

  • Confluent’s Kafka Connect JDBC Source connector imports data from any relational database with a JDBC driver into an Apache Kafka topic. The Kafka Connect JDBC Sink connector exports data from Kafka topics to any relational database with a JDBC driver.
  • Confluent’s Kafka Connect Amazon S3 Sink connector exports data from Apache Kafka topics to S3 objects in either Avro, Parquet, JSON, or Raw Bytes.

Prerequisites

This post will focus on data movement with Kafka Connect, not how to deploy the required AWS resources. To follow along with the post, you will need the following resources already deployed and configured on AWS:

  1. Amazon RDS for PostgreSQL instance (data source);
  2. Amazon S3 bucket (data sink);
  3. Amazon MSK cluster;
  4. Amazon EKS cluster;
  5. Connectivity between the Amazon RDS instance and Amazon MSK cluster;
  6. Connectivity between the Amazon EKS cluster and Amazon MSK cluster;
  7. Ensure the Amazon MSK Configuration has auto.create.topics.enable=true. This setting is false by default (see the sketch after this list);
  8. IAM Role associated with Kubernetes service account (known as IRSA) that will allow access from EKS to MSK and S3 (see details below);
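
Regarding item 7, here is a minimal sketch of creating and applying a custom MSK configuration with the AWS CLI; the configuration name, properties file, Kafka version, ARNs, and cluster version string are all assumptions for illustration.

# server.properties contains a single line: auto.create.topics.enable=true
aws kafka create-configuration \
  --name "auto-create-topics-enabled" \
  --kafka-versions "2.8.1" \
  --server-properties fileb://server.properties
# apply the new configuration to your cluster (ARNs and version are placeholders)
aws kafka update-cluster-configuration \
  --cluster-arn your-msk-cluster-arn \
  --configuration-info '{"Arn": "your-configuration-arn", "Revision": 1}' \
  --current-version your-cluster-current-version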

As shown in the architectural diagram above, I am using three separate VPCs within the same AWS account and AWS Region, us-east-1, for Amazon RDS, Amazon EKS, and Amazon MSK. The three VPCs are connected using VPC Peering. Ensure you expose the correct ingress ports and the corresponding CIDR ranges in your Amazon RDS, Amazon EKS, and Amazon MSK Security Groups. For additional security and cost savings, use a VPC endpoint to ensure private communications between Amazon EKS and Amazon S3.

Source Code

All source code for this post, including the Kafka Connect configuration files and the Helm chart, is open-sourced and located on GitHub.

Authentication and Authorization

Amazon MSK provides multiple authentication and authorization methods to interact with the Apache Kafka APIs. For example, you can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients and Apache Kafka ACLs to allow or deny actions. In my last post, I demonstrated the use of SASL/SCRAM and Kafka ACLs with Amazon MSK, Securely Decoupling Applications on Amazon EKS using Kafka with SASL/SCRAM.

Any MSK authentication and authorization should work with Kafka Connect, assuming you correctly configure Amazon MSK, Amazon EKS, and Kafka Connect. For this post, we are using IAM Access Control. An IAM Role associated with a Kubernetes service account (IRSA) allows EKS to access MSK and S3 using IAM (see more details below).

Sample PostgreSQL Database

There are many sample PostgreSQL databases we could use to explore Kafka Connect. One of my favorites, albeit a bit dated, is PostgreSQL’s Pagila database. The database contains simulated movie rental data. The dataset is fairly small, making it less ideal for ‘big data’ use cases but small enough to install quickly and minimize data storage and analytics costs.

Pagila database schema diagram

Before continuing, create a new database on the Amazon RDS PostgreSQL instance and populate it with the Pagila sample data. A few people have posted updated versions of this database with easy-to-install SQL scripts. Check out the Pagila scripts provided by Devrim Gündüz on GitHub and also by Robert Treat on GitHub.
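
A minimal sketch of that process, assuming you have downloaded the schema and data scripts from one of the repositories above (host, user, and file names below are placeholders):

# standard libpq environment variables; you will be prompted for the password
export PGHOST=your-pagila-database-url.us-east-1.rds.amazonaws.com
export PGUSER=your-username
# create the database, then load the schema and data
createdb pagila
psql -d pagila -f pagila-schema.sql
psql -d pagila -f pagila-data.sql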

Last Updated Trigger

Each table in the Pagila database has a last_update field. A convenient way to detect changes in the Pagila database, and ensure those changes make it from RDS to S3, is to have Kafka Connect use the last_update field. This is a common technique to determine if and when changes were made to data using query-based CDC.

As changes are made to records in these tables, an existing database function and a trigger added to each table ensure the last_update field is automatically updated to the current date and time. You can find further information on how the database function and triggers work with Kafka Connect in the post, kafka connect in action, part 3, by Dominick Lombardo.

CREATE OR REPLACE FUNCTION update_last_update_column()
    RETURNS TRIGGER AS
$$
BEGIN
    NEW.last_update = now();
    RETURN NEW;
END;
$$ language 'plpgsql';

CREATE TRIGGER update_last_update_column_address
    BEFORE UPDATE
    ON address
    FOR EACH ROW
EXECUTE PROCEDURE update_last_update_column();
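
To confirm the trigger behaves as expected, you can update a record and check that its last_update value changes. A quick sketch using psql; the connection details are placeholders:

# update a row, then verify the trigger refreshed its last_update timestamp
psql -h your-pagila-database-url.us-east-1.rds.amazonaws.com \
  -U your-username -d pagila <<'SQL'
UPDATE public.address SET address2 = 'Trigger test' WHERE address_id = 1;
SELECT address_id, last_update FROM public.address WHERE address_id = 1;
SQL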

Kubernetes-based Kafka Connect

There are several options for deploying and managing Kafka Connect and other required Kafka management tools to Kubernetes on Amazon EKS. Popular solutions include Strimzi and Confluent for Kubernetes (CFK) or building your own Docker Image using the official Apache Kafka binaries. For this post, I chose to build my own Kafka Connect Docker Image using the latest Kafka binaries. I then installed Confluent’s connectors and their dependencies into the Kafka installation. Although not as efficient as using an off-the-shelf OSS container, building your own image can really teach you how Kafka and Kafka Connect work, in my opinion.
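
If you build your own image, the essential step is placing the connector plugins where Kafka Connect can find them. A hedged sketch, assuming you have downloaded the two Confluent connector zip archives from Confluent Hub (exact file names will differ by version):

# unpack connector plugins into the directory referenced by plugin.path
export PLUGIN_PATH=/usr/local/share/kafka/plugins
mkdir -p $PLUGIN_PATH
unzip confluentinc-kafka-connect-jdbc-*.zip -d $PLUGIN_PATH
unzip confluentinc-kafka-connect-s3-*.zip -d $PLUGIN_PATH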

If you choose to use the same Kafka Connect image used in this post, a Helm chart is included in the post’s GitHub repository. The Helm chart will deploy a single Kubernetes pod to the kafka Namespace on Amazon EKS.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect-msk
  labels:
    app: kafka-connect-msk
    component: service
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: kafka-connect-msk
      component: service
  template:
    metadata:
      labels:
        app: kafka-connect-msk
        component: service
    spec:
      serviceAccountName: kafka-connect-msk-iam-serviceaccount
      containers:
        - image: garystafford/kafka-connect-msk:1.0.0
          name: kafka-connect-msk
          imagePullPolicy: IfNotPresent

Before deploying the chart, update the values.yaml file with the name of your Kubernetes Service Account associated with the Kafka Connect pod (serviceAccountName). The IAM Policy attached to the IAM Role associated with the pod’s Service Account should provide sufficient access to Kafka running on the Amazon MSK cluster from EKS. The policy should also provide access to your S3 bucket, as detailed here by Confluent. Below is an example of an (overly broad) IAM Policy that would allow full access to any Kafka clusters running on MSK and to S3 from Kafka Connect running on EKS.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "kafka-cluster:*",
      "Resource": [
        "arn:aws:kafka:us-east-1:111222333444:cluster/*/*",
        "arn:aws:kafka:us-east-1:111222333444:group/*/*/*",
        "arn:aws:kafka:us-east-1:111222333444:transactional-id/*/*/*",
        "arn:aws:kafka:us-east-1:111222333444:topic/*/*/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListAllMyBuckets"
      ],
      "Resource": "arn:aws:s3:::*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": "arn:aws:s3:::<your-bucket-name>"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts",
        "s3:ListBucketMultipartUploads"
      ],
      "Resource": "arn:aws:s3:::<your-bucket-name>/*"
    }
  ]
}

Once the Service Account variable is updated, use the following command to deploy the Helm chart:

# the chart deploys to the namespace set below
export NAMESPACE="kafka"
helm install kafka-connect-msk ./kafka-connect-msk \
  --namespace $NAMESPACE --create-namespace

To get a shell to the running Kafka Connect container, use the following kubectl exec command:

export KAFKA_CONTAINER=$(
kubectl get pods -n kafka -l app=kafka-connect-msk | \
awk 'FNR == 2 {print $1}')
kubectl exec -it $KAFKA_CONTAINER -n kafka -- bash
Interacting with Kafka Connect container running on EKS

Configure Bootstrap Brokers

Before starting Kafka Connect, you will need to modify Kafka Connect’s configuration file. Kafka Connect is capable of running workers in standalone and distributed modes. Since we will use Kafka Connect’s distributed mode, modify the config/connect-distributed.properties file. A complete sample of the configuration file I used in this post is shown below.

Kafka Connect will run within the pod’s container, while Kafka and Apache ZooKeeper run on Amazon MSK. Update the bootstrap.servers property to reflect your own comma-delimited list of Amazon MSK Kafka Bootstrap Brokers. To get the list of the Bootstrap Brokers for your Amazon MSK cluster, use the AWS Management Console, or the following AWS CLI commands:

# get the msk cluster's arn
aws kafka list-clusters --query 'ClusterInfoList[*].ClusterArn'
# use msk arn to get the brokers
aws kafka get-bootstrap-brokers --cluster-arn your-msk-cluster-arn
# alternately, if you only have one cluster, then
aws kafka get-bootstrap-brokers --cluster-arn $(
aws kafka list-clusters | jq -r '.ClusterInfoList[0].ClusterArn')

Update the config/connect-distributed.properties file.

# ***** CHANGE ME! *****
bootstrap.servers=b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098
group.id=connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=2
status.storage.topic=connect-status
status.storage.replication.factor=2
#status.storage.partitions=5
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/kafka/plugins
# kafka connect auth using iam
ssl.truststore.location=/tmp/kafka.client.truststore.jks
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect producer auth using iam
producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=AWS_MSK_IAM
producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect consumer auth using iam
consumer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=AWS_MSK_IAM
consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

For convenience when executing Kafka commands, set the BBROKERS environment variable to the same comma-delimited list of Kafka Bootstrap Brokers, for example:

export BBROKERS="b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098"

Confirm Access to Amazon MSK from Kafka Connect

To confirm you have access to Kafka running on Amazon MSK from the Kafka Connect container running on Amazon EKS, try listing the existing Kafka topics:

bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties

You can also try listing the existing Kafka consumer groups:

bin/kafka-consumer-groups.sh --list \
  --bootstrap-server $BBROKERS \
  --command-config config/client-iam.properties

If either of these fails, you will likely have networking or security issues blocking access from Amazon EKS to Amazon MSK. Check your VPC Peering, Route Tables, IAM/IRSA, and Security Group ingress settings. Any one of these items can cause communications issues between the container and Kafka running on Amazon MSK.
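
One more connectivity check you can try from the container is Kafka’s broker API versions tool, which simply connects to each broker and reports the APIs it supports:

# confirms the container can authenticate to and reach each broker
bin/kafka-broker-api-versions.sh \
  --bootstrap-server $BBROKERS \
  --command-config config/client-iam.properties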

Kafka Connect

I recommend starting Kafka Connect as a background process using either method shown below.

bin/connect-distributed.sh \
config/connect-distributed.properties > /dev/null 2>&1 &
# alternately use nohup
nohup bin/connect-distributed.sh \
config/connect-distributed.properties &

To confirm Kafka Connect started properly, immediately tail the connect.log file. The log will capture any startup errors for troubleshooting.

tail -f logs/connect.log
Kafka Connect log showing Kafka Connect starting as a background process

You can also examine the background process with the ps command to confirm Kafka Connect is running. Note the process with PID 4915, below. Use the kill command along with the PID to stop Kafka Connect if necessary.

Kafka Connect running as a background process
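
For example, something like the following would locate and, if necessary, stop the Kafka Connect process; the PID shown in the screenshot above will differ in your container:

# find the Kafka Connect JVM process (main class is ConnectDistributed)
ps -ef | grep ConnectDistributed
# stop it if needed, substituting the PID from the ps output
kill 4915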

If configured properly, Kafka Connect will create three new topics, referred to as Kafka Connect internal topics, the first time it starts up, as defined in the config/connect-distributed.properties file: connect-configs, connect-offsets, and connect-status. According to Confluent, Connect stores connector and task configurations, offsets, and status in these topics. The internal topics must have a high replication factor, a compaction cleanup policy, and an appropriate number of partitions. These new topics can be confirmed using the following command.

bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep connect-

Kafka Connect Connectors

This post demonstrates three progressively more complex Kafka Connect source and sink connectors. Each will demonstrate different connector capabilities to import/export and transform data between Amazon RDS for PostgreSQL and Amazon S3.

Connector Source #1

Create a new file (or modify the existing file if using my Kafka Connect container) named config/jdbc_source_connector_postgresql_00.json. Modify lines 3–5, as shown below, to reflect your RDS instance’s JDBC connection details.

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://your-pagila-database-url.us-east-1.rds.amazonaws.com:5432/pagila",
  "connection.user": "your-username",
  "connection.password": "your-password",
  "topic.prefix": "pagila.public.",
  "poll.interval.ms": 5000,
  "mode": "timestamp",
  "catalog.pattern": "public",
  "table.whitelist": "address, city, country",
  "timestamp.column.name": "last_update"
}

This first Kafka Connect source connector uses Confluent’s Kafka Connect JDBC Source connector (io.confluent.connect.jdbc.JdbcSourceConnector) to export data from RDS with a JDBC driver and import that data into a series of Kafka topics. We will be exporting data from three tables in Pagila’s public schema: address, city, and country. We will write that data to a series of topics, arbitrarily prefixed with database name and schema, pagila.public.. The source connector will create the three new topics automatically: pagila.public.address, pagila.public.city, and pagila.public.country.

Note the connector’s mode property value is set to timestamp, and the last_update field is referenced in the timestamp.column.name property. Recall we added the database function and triggers to these three tables earlier in the post, which will update the last_update field whenever a record is created or updated in the Pagila database. In addition to an initial export of the entire table, the source connector will poll the database every 5 seconds (the poll.interval.ms property), looking for changes that are newer than the most recently exported last_update value. This is accomplished by the source connector using a parameterized query, such as:

SELECT *
FROM "public"."address"
WHERE "public"."address"."last_update" > ?
AND "public"."address"."last_update" < ?
ORDER BY "public"."address"."last_update" ASC

Connector Sink #1

Next, create and configure the first Kafka Connect sink connector. Create a new file or modify config/s3_sink_connector_00.json. Modify line 7, as shown below, to reflect your Amazon S3 bucket name.

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "tasks.max": 1,
  "topics.regex": "pagila.public.(.*)",
  "table.name.format": "${topic}",
  "s3.region": "us-east-1",
  "s3.bucket.name": "your-s3-bucket",
  "s3.part.size": 5242880,
  "flush.size": 100,
  "rotate.schedule.interval.ms": 60000,
  "timezone": "UTC",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
  "schema.compatibility": "NONE"
}

This first Kafka Connect sink connector uses Confluent’s Kafka Connect Amazon S3 Sink connector (io.confluent.connect.s3.S3SinkConnector) to export data from Kafka topics to Amazon S3 objects in JSON format.

Deploy Connectors #1

Deploy the source and sink connectors using the Kafka Connect REST Interface. Many tutorials demonstrate a POST method against the /connectors endpoint. However, this then requires a DELETE and an additional POST to update the connector. Using a PUT against the /config endpoint, you can update the connector without first issuing a DELETE.

curl -s -d @"config/jdbc_source_connector_postgresql_00.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_00/config | jq
curl -s -d @"config/s3_sink_connector_00.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_00/config | jq

You can confirm the source and sink connectors are deployed and running using the following commands:

curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_00/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_00/status | jq
Kafka Connect source connector running successfully

Errors preventing the connector from starting correctly will be displayed using the /status endpoint, as shown in the example below. In this case, the Kubernetes Service Account associated with the pod lacked the proper IAM permissions to the Amazon S3 target bucket.

Kafka Connect sink connector failed to run with errors

Confirming Success of Connectors #1

The entire contents of the three tables will be exported from RDS to Kafka by the source connector, then exported from Kafka to S3 by the sink connector. To confirm the source connector worked, verify the existence of three new Kafka topics that should have been created: pagila.public.address, pagila.public.city, and pagila.public.country.

bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep pagila.public.

To confirm the sink connector worked, verify the new S3 objects have been created in the data lake’s S3 bucket. Using the AWS CLI v2’s s3api command, we can view the contents of our target S3 bucket:

aws s3api list-objects \
--bucket your-s3-bucket \
--query 'Contents[].{Key: Key}' \
--output text

You should see approximately 15 new S3 objects (JSON files) in the S3 bucket, whose keys are organized by their topic names. The sink connector flushes new data to S3 every 100 records, or 60 seconds.

topics/pagila.public.address/partition=0/pagila.public.address+0+0000000000.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000100.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000200.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000300.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000400.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000500.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000600.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000000.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000100.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000200.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000300.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000400.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000500.json
topics/pagila.public.country/partition=0/pagila.public.country+0+0000000000.json
topics/pagila.public.country/partition=0/pagila.public.country+0+0000000100.json
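
To quickly count the new objects rather than scan the full listing, you can pipe a recursive listing through wc; the bucket name is a placeholder:

# count the objects written by the sink connector
aws s3 ls s3://your-s3-bucket/topics/ --recursive | wc -l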

You could also use the AWS Management Console to view the S3 bucket’s contents.

Amazon S3 bucket showing results of Kafka Connect S3 sink connector, organized by topic names

Use the Amazon S3 console’s ‘Query with S3 Select’ to view the data contained in the JSON-format files. Alternately, you can use the s3 API:

export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.public.address/partition=0/pagila.public.address+0+0000000100.json"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"JSON": {"Type": "DOCUMENT"}, "CompressionType": "NONE"}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json

For example, the address table’s data will look similar to the following using the ‘Query with S3 Select’ feature via the console or API:

{
  "address_id": 100,
  "address": "1308 Arecibo Way",
  "address2": "",
  "district": "Georgia",
  "city_id": 41,
  "postal_code": "30695",
  "phone": "6171054059",
  "last_update": 1487151930000
}
{
  "address_id": 101,
  "address": "1599 Plock Drive",
  "address2": "",
  "district": "Tete",
  "city_id": 534,
  "postal_code": "71986",
  "phone": "817248913162",
  "last_update": 1487151930000
}
{
  "address_id": 102,
  "address": "669 Firozabad Loop",
  "address2": "",
  "district": "Abu Dhabi",
  "city_id": 12,
  "postal_code": "92265",
  "phone": "412903167998",
  "last_update": 1487151930000
}

Congratulations, you have successfully imported data from a relational database into your data lake using Kafka Connect!

Connector Source #2

Create a new file or modify config/jdbc_source_connector_postgresql_01.json. Modify lines 3–5, as shown below, to reflect your RDS instance connection details.

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://your-pagila-database-url.us-east-1.rds.amazonaws.com:5432/pagila",
  "connection.user": "your-username",
  "connection.password": "your-password",
  "topic.prefix": "pagila.public.alt.",
  "poll.interval.ms": 5000,
  "mode": "timestamp",
  "timestamp.column.name": "last_update",
  "catalog.pattern": "public",
  "table.whitelist": "address",
  "numeric.mapping": "best_fit",
  "transforms": "createKey,extractInt,InsertTopic,InsertSourceDetails",
  "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.createKey.fields": "address_id",
  "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractInt.field": "address_id",
  "validate.non.null": "false",
  "transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertTopic.topic.field": "message_topic",
  "transforms.InsertSourceDetails.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertSourceDetails.static.field": "message_source",
  "transforms.InsertSourceDetails.static.value": "pagila"
}

This second Kafka Connect source connector also uses Confluent’s Kafka Connect JDBC Source connector to export data from just the address table with a JDBC driver and import that data into a new Kafka topic, pagila.public.alt.address. The difference with this source connector is its use of transforms, known as Single Message Transformations (SMTs). SMTs are applied to messages as they flow through Kafka Connect from RDS to Kafka.

In this connector, there are four transforms, which perform the following common functions:

  1. Extract the address_id integer field as the Kafka message key (the createKey and extractInt transforms), as detailed in this blog post by Confluent (see ‘Setting the Kafka message key’);
  2. Append the Kafka topic name to the message as a new field;
  3. Append the database name to the message as a new static field.

Connector Sink #2

Create a new file or modify config/s3_sink_connector_01.json. Modify line 6, as shown below, to reflect your Amazon S3 bucket name.

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "tasks.max": 1,
  "topics": "pagila.public.alt.address",
  "s3.region": "us-east-1",
  "s3.bucket.name": "your-s3-bucket",
  "s3.part.size": 5242880,
  "flush.size": 100,
  "rotate.schedule.interval.ms": 60000,
  "timezone": "UTC",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
  "schema.compatibility": "NONE"
}

This second sink connector is nearly identical to the first sink connector, except it only exports data from a single Kafka topic, pagila.public.alt.address, into S3.

Deploy Connectors #2

Deploy the second set of source and sink connectors using the Kafka Connect REST Interface, exactly like the first pair.

curl -s -d @"config/jdbc_source_connector_postgresql_01.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_01/config | jq
curl -s -d @"config/s3_sink_connector_01.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_01/config | jq

Confirming Success of Connectors #2

Use the same commands as before to confirm the new set of connectors are deployed and running, alongside the first set, for a total of four connectors.

curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_01/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_01/status | jq
Kafka Connect source and sink connectors running successfully

To view the results of the first transform, extracting the address_id integer field as the Kafka message key, we can use a Kafka command-line consumer:

bin/kafka-console-consumer.sh \
--topic pagila.public.alt.address \
--offset 102 --partition 0 --max-messages 5 \
--property print.key=true --property print.value=true \
--property print.offset=true --property print.partition=true \
--property print.headers=false --property print.timestamp=false \
--bootstrap-server $BBROKERS \
--consumer.config config/client-iam.properties

In the output below, note the beginning of each message, which displays the Kafka message key, identical to the address_id, for example, {"schema":{"type":"int32","optional":false},"payload":100}.

Output showing messages in the Kafka pagila.public.alt.address topic

Examining the Amazon S3 bucket using the AWS Management Console or the CLI, you should note the fourth set of S3 objects within the /topics/pagila.public.alt.address/ object key prefix.

Amazon S3 bucket showing JSON-format files containing address data

Use the Amazon S3 console’s ‘Query with S3 Select’ to view the data contained in the JSON-format files. Alternately, you can use the s3 API:

export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.public.alt.address/partition=0/pagila.public.alt.address+0+0000000100.json"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"JSON": {"Type": "DOCUMENT"}, "CompressionType": "NONE"}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json

In the sample data below, note the two new fields that have been appended into each record, a result of the Kafka Connector transforms:

{
  "address_id": 100,
  "address": "1308 Arecibo Way",
  "address2": "",
  "district": "Georgia",
  "city_id": 41,
  "postal_code": "30695",
  "phone": "6171054059",
  "last_update": 1487151930000,
  "message_topic": "pagila.public.alt.address",
  "message_source": "pagila"
}
{
  "address_id": 101,
  "address": "1599 Plock Drive",
  "address2": "",
  "district": "Tete",
  "city_id": 534,
  "postal_code": "71986",
  "phone": "817248913162",
  "last_update": 1487151930000,
  "message_topic": "pagila.public.alt.address",
  "message_source": "pagila"
}
{
  "address_id": 102,
  "address": "669 Firozabad Loop",
  "address2": "",
  "district": "Abu Dhabi",
  "city_id": 12,
  "postal_code": "92265",
  "phone": "412903167998",
  "last_update": 1487151930000,
  "message_topic": "pagila.public.alt.address",
  "message_source": "pagila"
}

Congratulations, you have successfully imported more data from a relational database into your data lake, including performing a simple series of transforms using Kafka Connect!

Connector Source #3

Create or modify config/jdbc_source_connector_postgresql_02.json. Modify lines 3–5, as shown below, to reflect your RDS instance connection details.

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://your-pagila-database-url.us-east-1.rds.amazonaws.com:5432/pagila",
  "connection.user": "your-username",
  "connection.password": "your-password",
  "topic.prefix": "pagila.query",
  "poll.interval.ms": 5000,
  "mode": "timestamp",
  "timestamp.column.name": "last_update",
  "query": "SELECT * FROM (SELECT a.address_id, a.address, a.address2, city.city, a.district, a.postal_code, country.country, a.phone, a.last_update FROM address AS a INNER JOIN city ON a.city_id = city.city_id INNER JOIN country ON country.country_id = city.country_id ORDER BY address_id) AS subquery",
  "incrementing.column.name": "address_id",
  "transforms": "createKey,extractInt,InsertTopic",
  "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.createKey.fields": "address_id",
  "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractInt.field": "address_id",
  "validate.non.null": "false",
  "transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertTopic.topic.field": "message_topic"
}

Unlike the first two source connectors that export data from tables, this connector uses a SELECT query to export data from the Pagila database’s address, city, and country tables and import the results of that SQL query into a new Kafka topic, pagila.query. The SQL query in the source connector’s configuration is as follows:

SELECT *
FROM (SELECT a.address_id,
             a.address,
             a.address2,
             city.city,
             a.district,
             a.postal_code,
             country.country,
             a.phone,
             a.last_update
      FROM address AS a
               INNER JOIN city ON a.city_id = city.city_id
               INNER JOIN country ON country.country_id = city.country_id
      ORDER BY address_id) AS subquery

The final parameterized query, executed by the source connector, which allows it to detect changes based on the last_update field is as follows:

SELECT *
FROM (SELECT a.address_id,
             a.address,
             a.address2,
             city.city,
             a.district,
             a.postal_code,
             country.country,
             a.phone,
             a.last_update
      FROM address AS a
               INNER JOIN city ON a.city_id = city.city_id
               INNER JOIN country ON country.country_id = city.country_id
      ORDER BY address_id) AS subquery
WHERE "last_update" > ?
  AND "last_update" < ?
ORDER BY "last_update" ASC

Connector Sink #3

Create or modify config/s3_sink_connector_02.json. Modify line 6, as shown below, to reflect your Amazon S3 bucket name.

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "tasks.max": 1,
  "topics": "pagila.query",
  "s3.region": "us-east-1",
  "s3.bucket.name": "your-s3-bucket",
  "s3.part.size": 5242880,
  "flush.size": 100,
  "rotate.schedule.interval.ms": 60000,
  "timezone": "UTC",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "parquet.codec": "gzip",
  "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
  "partition.field.name": "country",
  "schema.compatibility": "NONE",
  "transforms": "RenameField,insertStaticField1,insertStaticField2,insertStaticField3",
  "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.RenameField.renames": "district:state_province",
  "transforms.insertStaticField1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertStaticField1.static.field": "message_source",
  "transforms.insertStaticField1.static.value": "pagila",
  "transforms.insertStaticField2.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertStaticField2.static.field": "message_source_engine",
  "transforms.insertStaticField2.static.value": "postgresql",
  "transforms.insertStaticField3.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertStaticField3.static.field": "environment",
  "transforms.insertStaticField3.static.value": "development"
}

This sink connector is significantly different than the previous two sink connectors. In addition to leveraging SMTs in the corresponding source connector, we are also using them in this sink connector. The sink connector uses the InsertField transform to append three arbitrary static fields to each record as it is written to Amazon S3: message_source, message_source_engine, and environment. The sink connector also renames the district field to state_province using the ReplaceField transform.

The first two sink connectors wrote uncompressed JSON-format files to Amazon S3. This third sink connector optimizes the data imported into S3 for downstream data analytics. The sink connector writes GZIP-compressed Apache Parquet files to Amazon S3. In addition, the compressed Parquet files are partitioned by the country field. Using a columnar file format, compression, and partitioning, queries against the data should be faster and more efficient.

Deploy Connectors #3

Deploy the final source and sink connectors using the Kafka Connect REST Interface, exactly like the first two pairs.

curl -s -d @"config/jdbc_source_connector_postgresql_02.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_02/config | jq
curl -s -d @"config/s3_sink_connector_02.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_02/config | jq

Confirming Success of Connectors #3

Use the same commands as before to confirm the new set of connectors are deployed and running, alongside the first two sets, for a total of six connectors.

curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_02/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_02/status | jq
Kafka Connect source and sink connectors running successfully

Reviewing the messages within the new pagila.query topic, note the message_topic field has been appended to the message by the source connector, but not the message_source, message_source_engine, and environment fields. The sink connector appends these fields as it writes the messages to S3. Also, note the district field has yet to be renamed by the sink connector to state_province.

Output showing messages in the Kafka pagila.query topic

Examining the Amazon S3 bucket again, you should note the fifth set of S3 objects within the /topics/pagila.query/ object key prefix. The Parquet-format files within are partitioned by country.

Amazon S3 bucket showing data partitioned by Country

Within each country partition, there are Parquet files whose records contain addresses within those countries.

Amazon S3 bucket showing GZIP-compressed Apache Parquet-format files

Use the Amazon S3 console’s ‘Query with S3 Select’ again to view the data contained in the Parquet-format files. Alternately, you can use the s3 API:

export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.query/country=United States/pagila.query+0+0000000003.gz.parquet"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"Parquet": {}}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json

In the sample data below, note the four new fields that have been appended to each record as a result of the source and sink connector SMTs. Also, note the renamed district field:

{
  "address_id": 599,
  "address": "1895 Zhezqazghan Drive",
  "address2": "",
  "city": "Garden Grove",
  "state_province": "California",
  "postal_code": "36693",
  "country": "United States",
  "phone": "137809746111",
  "last_update": "2017-02-15T09:45:30.000Z",
  "message_topic": "pagila.query",
  "message_source": "pagila",
  "message_source_engine": "postgresql",
  "environment": "development"
}
{
  "address_id": 6,
  "address": "1121 Loja Avenue",
  "address2": "",
  "city": "San Bernardino",
  "state_province": "California",
  "postal_code": "17886",
  "country": "United States",
  "phone": "838635286649",
  "last_update": "2017-02-15T09:45:30.000Z",
  "message_topic": "pagila.query",
  "message_source": "pagila",
  "message_source_engine": "postgresql",
  "environment": "development"
}
{
  "address_id": 18,
  "address": "770 Bydgoszcz Avenue",
  "address2": "",
  "city": "Citrus Heights",
  "state_province": "California",
  "postal_code": "16266",
  "country": "United States",
  "phone": "517338314235",
  "last_update": "2017-02-15T09:45:30.000Z",
  "message_topic": "pagila.query",
  "message_source": "pagila",
  "message_source_engine": "postgresql",
  "environment": "development"
}

Record Updates and Query-based CDC

What happens when we change data within the tables that Kafka Connect is polling every 5 seconds? To answer this question, let’s make a few DML changes:

-- update address field
UPDATE public.address
SET address = '123 CDC Test Lane'
WHERE address_id = 100;
-- update address2 field
UPDATE public.address
SET address2 = 'Apartment #2201'
WHERE address_id = 101;
-- second update to same record
UPDATE public.address
SET address2 = 'Apartment #2202'
WHERE address_id = 101;

-- insert new country
INSERT INTO public.country (country)
values ('Wakanda');
-- should be 110
SELECT country_id FROM country WHERE country='Wakanda';
-- insert new city
INSERT INTO public.city (city, country_id)
VALUES ('Birnin Zana', 110);
-- should be 601
SELECT city_id FROM public.city WHERE country_id=110;
-- update address record with new city_id
UPDATE public.address
SET city_id = 601
WHERE address_id = 102;
-- second update to same record
UPDATE public.address
SET district = 'Lake Turkana'
WHERE address_id = 102;
-- delete an address record
UPDATE public.customer
SET address_id = 200
WHERE customer_id IN (
SELECT customer_id FROM customer WHERE address_id = 104);
DELETE
FROM public.address
WHERE address_id = 104;

To see how these changes propagate, first examine the Kafka Connect logs. Below, we see example log events corresponding to some of the database changes shown above. The three Kafka Connect source connectors detect the changes, which are exported from PostgreSQL to Kafka. The three sink connectors then write these changes as new JSON and Parquet files to the target S3 bucket.
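To follow this activity yourself, you can tail the Kafka Connect container's logs. The sketch below assumes Kafka Connect runs as a Kubernetes deployment named kafka-connect in the kafka namespace; both names are hypothetical:

kubectl logs --follow deployment/kafka-connect --namespace kafka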

Kafka Connect log showing changes to Pagila database being exported/imported

Viewing Data in the Data Lake

A convenient way to examine both the existing data and ongoing data changes in our data lake is to crawl and catalog the S3 bucket’s contents with AWS Glue, then query the results with Amazon Athena. AWS Glue’s Data Catalog is an Apache Hive-compatible, fully-managed, persistent metadata store. AWS Glue can store the schema, metadata, and location of our data in S3. Amazon Athena is a serverless Presto-based (PrestoDB) ad-hoc analytics engine, which can query AWS Glue Data Catalog tables and the underlying S3-based data.
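If you do not already have a crawler configured, one can be created and started with the glue API. In the sketch below, the crawler name and IAM role are hypothetical, while the database name matches the Glue database queried later in this post:

aws glue create-crawler \
  --name pagila-kafka-connect-crawler \
  --role service-role/AWSGlueServiceRole-pagila \
  --database-name pagila_kafka_connect \
  --targets '{"S3Targets": [{"Path": "s3://your-s3-bucket/topics/"}]}'

aws glue start-crawler --name pagila-kafka-connect-crawler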

AWS Glue Data Catalog showing five new tables, the result of the AWS Glue Crawler

When writing Parquet files into partitions, one shortcoming of the Kafka Connect S3 sink connector is duplicate column names in AWS Glue: any column used as a partition is duplicated in the Glue Data Catalog's database table schema. The issue will result in an error similar to HIVE_INVALID_METADATA: Hive metadata for table pagila_query is invalid: Table descriptor contains duplicate columns when performing queries. To remedy this, predefine the table and the table's schema. Alternately, edit the Glue Data Catalog table's schema after crawling and remove the duplicate, non-partition column(s). Below, that would mean removing the duplicate country column (column 7).

AWS Glue Data Catalog table schema showing duplicate column

Performing a typical SQL SELECT query in Athena will return all of the original records as well as the changes we made earlier as duplicate records (same address_id primary key).

Amazon Athena showing the SQL query and the result set
SELECT address_id, address, address2, city, state_province,
postal_code, country, last_update
FROM "pagila_kafka_connect"."pagila_query"
WHERE address_id BETWEEN 100 AND 105
ORDER BY address_id;

Note the original records for address_id 100–103 as well as each change we made earlier. The last_update field reflects the date and time the record was created or updated. Also, note the record with address_id 104 in the query results. This is the record we deleted from the Pagila database.

address_id address address2 city state_province postal_code country last_update
100 1308 Arecibo Way Augusta-Richmond County Georgia 30695 United States 2017-02-15 09:45:30.000
100 123 CDC Test Lane Augusta-Richmond County Georgia 30695 United States 2021-08-09 14:10:29.126
101 1599 Plock Drive Tete Tete 71986 Mozambique 2017-02-15 09:45:30.000
101 1599 Plock Drive Apartment #2201 Tete Tete 71986 Mozambique 2021-08-09 14:10:29.467
101 1599 Plock Drive Apartment #2202 Tete Tete 71986 Mozambique 2021-08-09 14:19:03.761
102 669 Firozabad Loop al-Ayn Abu Dhabi 92265 United Arab Emirates 2017-02-15 09:45:30.000
102 669 Firozabad Loop Birnin Zana Abu Dhabi 92265 Wakanda 2021-08-09 14:10:29.789
102 669 Firozabad Loop Birnin Zana Lake Turkana 92265 Wakanda 2021-08-09 15:56:53.323
103 588 Vila Velha Manor Kimchon Kyongsangbuk 51540 South Korea 2017-02-15 09:45:30.000
104 1913 Kamakura Place Jelets Lipetsk 97287 Russian Federation 2017-02-15 09:45:30.000
105 733 Mandaluyong Place Abha Asir 77459 Saudi Arabia 2017-02-15 09:45:30.000

To view only the most current data, we can use Athena’s ROW_NUMBER() function:

SELECT address_id, address, address2, city, state_province,
postal_code, country, last_update
FROM (SELECT *, ROW_NUMBER() OVER (
PARTITION BY address_id
ORDER BY last_update DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_query") AS x
WHERE x.row_num = 1
AND address_id BETWEEN 100 AND 105
ORDER BY address_id;

Now, we only see the latest records. Unfortunately, the record we deleted with address_id 104 is still present in the query results.

address_id address address2 city state_province postal_code country last_update
100 123 CDC Test Lane Augusta-Richmond County Georgia 30695 United States 2021-08-09 14:10:29.126
101 1599 Plock Drive Apartment #2202 Tete Tete 71986 Mozambique 2021-08-09 14:19:03.761
102 669 Firozabad Loop Birnin Zana Lake Turkana 92265 Wakanda 2021-08-09 15:56:53.323
103 588 Vila Velha Manor Kimchon Kyongsangbuk 51540 South Korea 2017-02-15 09:45:30.000
104 1913 Kamakura Place Jelets Lipetsk 97287 Russian Federation 2017-02-15 09:45:30.000
105 733 Mandaluyong Place Abha Asir 77459 Saudi Arabia 2017-02-15 09:45:30.000

Had we used log-based CDC with Debezium, as opposed to query-based CDC, we would have received a record in S3 indicating the delete. Debezium writes a delete event, shown below, followed by a message with a null value, referred to as a tombstone message in Kafka. Note the 'before' block in the delete event, as opposed to the 'after' block we observed earlier with the update records.

{
  "before": {
    "address": "",
    "address2": null,
    "phone": "",
    "district": "",
    "last_update": "1970-01-01T00:00:00Z",
    "address_id": 104,
    "postal_code": null,
    "city_id": 0
  },
  "source": {
    "schema": "public",
    "sequence": "[\"1101256482032\",\"1101256482032\"]",
    "xmin": null,
    "connector": "postgresql",
    "lsn": 1101256483936,
    "name": "pagila",
    "txId": 17137,
    "version": "1.6.1.Final",
    "ts_ms": 1628864251512,
    "snapshot": "false",
    "db": "pagila",
    "table": "address"
  },
  "op": "d",
  "ts_ms": 1628864251671
}

An inefficient solution to duplicates and deletes with query-based CDC would be to bulk ingest the entire query result set from the Pagila database each time, instead of only the changes based on the last_update field. Performing an unbounded query repeatedly on a huge dataset would negatively impact database performance. Even then, you would still end up with duplicates in the data lake unless you first purged the data in S3 before re-importing the new query results.

Data Movement

Using Amazon Athena, we can easily write the results of our ROW_NUMBER() query back to the data lake for further enrichment or analysis. Athena's CREATE TABLE AS SELECT (CTAS) SQL statement creates a new table in Athena (an external table in AWS Glue Data Catalog) from the results of a SELECT statement in the subquery. Athena stores data files created by the CTAS statement in a specified location in Amazon S3 and creates a new AWS Glue Data Catalog table to store the result set's schema and metadata information. CTAS supports several file formats and storage options.

High-level architecture for this post’s demonstration

Wrapping the last query in Athena’s CTAS statement, as shown below, we can write the query results as SNAPPY-compressed Parquet-format files, partitioned by country, to a new location in the Amazon S3 bucket. Using common data lake terminology, I will refer to the resulting filtered and cleaned dataset as refined or silver instead of the raw ingestion or bronze data originating from our data source, PostgreSQL, via Kafka.

CREATE TABLE pagila_kafka_connect.pagila_query_processed
WITH (
format='PARQUET',
parquet_compression='SNAPPY',
partitioned_by=ARRAY['country'],
external_location='s3://your-s3-bucket/processed/pagila_query'
) AS
SELECT address_id, last_update, address, address2, city,
state_province, postal_code, country
FROM (SELECT *, ROW_NUMBER() OVER (
PARTITION BY address_id
ORDER BY last_update DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_query") AS x
WHERE x.row_num = 1 AND address_id BETWEEN 0 AND 100
ORDER BY address_id;

Examining the Amazon S3 bucket one last time, you should see a new set of S3 objects within the /processed/pagila_query/ key path. The Parquet-format files, partitioned by country, are the result of the CTAS query.

Amazon S3 bucket showing SNAPPY-compressed Parquet-format files containing CTAS query results

We should now see a new table in the same AWS Glue Data Catalog containing metadata, location, and schema information about the data we wrote to S3 using the CTAS query. We can perform additional queries on the processed data.
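These queries can also be run programmatically with the athena API. A sketch follows, assuming an S3 output location for the query results; the output path is hypothetical. The returned QueryExecutionId can then be passed to the get-query-results command.

aws athena start-query-execution \
  --query-string "SELECT country, COUNT(*) AS address_count \
    FROM pagila_kafka_connect.pagila_query_processed \
    GROUP BY country ORDER BY address_count DESC;" \
  --result-configuration "OutputLocation=s3://your-s3-bucket/athena-results/"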

Amazon Athena showing query results from the processed data table in AWS Glue Data Catalog

ACID Transactions with a Data Lake

To fully take advantage of CDC and maximize the freshness of data in the data lake, we would also need to adopt modern data lake file formats like Apache Hudi, Apache Iceberg, or Delta Lake, along with analytics engines such as Apache Spark with Spark Structured Streaming to process the data changes. Using these technologies, it is possible to perform record-level updates and deletes of data in an object store like Amazon S3. Hudi, Iceberg, and Delta Lake offer features including ACID transactions, schema evolution, upserts, deletes, time travel, and incremental data consumption in a data lake. ELT engines like Spark can read streaming Debezium-generated CDC messages from Kafka and process those changes using Hudi, Iceberg, or Delta Lake.

Conclusion

This post explored how CDC could help us hydrate data from an Amazon RDS database into an Amazon S3-based data lake. We leveraged the capabilities of Amazon EKS, Amazon MSK, and Apache Kafka Connect. We learned about query-based CDC for capturing ongoing changes to the source data. In a subsequent post, we will explore log-based CDC using Debezium and see how data lake file formats like Apache Avro, Apache Hudi, Apache Iceberg, and Delta Lake can help us manage the data in our data lake.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Employing Amazon Macie to Discover and Protect Sensitive Data in your Amazon S3-based Data Lake

Introduction

Working with Analytics customers, it’s not uncommon to see data lakes with a dozen or more discrete data sources. Data typically originates from sources both internal and external to the customer. Internal data may come from multiple teams, departments, divisions, and enterprise systems. External data comes from vendors, partners, public sources, and subscriptions to licensed data sources. The volume, velocity, variety, veracity, and method of delivery vary across the data sources. All this data is being fed into data lakes for purposes such as analytics, business intelligence, and machine learning.

Given the growing volumes of incoming data and variations amongst data sources, it is increasingly complex, expensive, and time-consuming for organizations to ensure compliance with relevant laws, policies, and regulations. Regulations that impact how data is handled in a data lake include the Health Insurance Portability and Accountability Act (HIPAA), the General Data Protection Regulation (GDPR), the Payment Card Industry Data Security Standard (PCI DSS), the California Consumer Privacy Act (CCPA), and the Federal Information Security Management Act (FISMA).

Data Lake

AWS defines a data lake as a centralized repository that allows you to store all your structured and unstructured data at any scale. Once in the data lake, you run different types of analytics — from dashboards and visualizations to big data processing, real-time analytics, and machine learning to guide better decisions.

Data in a data lake is regularly organized or separated by its stage in the analytics process. Incoming data is often referred to as raw data. Data is then processed — cleansed, filtered, enriched, and tokenized if necessary. Lastly, the data is analyzed and aggregated, and the results are written back to the data lake. The analyzed and aggregated data is used to build business intelligence dashboards and reports, machine learning models, and is delivered to downstream or external systems. The different categories of data — raw, processed, and aggregated, are frequently referred to as bronze, silver, and gold, a reference to their overall data quality or value.

Protecting the Data Lake

Imagine you’ve received a large volume of data from an external data source. The incoming data is cleansed, filtered, and enriched. The data is re-formatted, partitioned, compressed for analytical efficiency, and written back to the data lake. Your analytics pipelines run complex and time-consuming queries against the data. Unfortunately, while building reports for a set of stakeholders, you realize that the original data accidentally included credit card information and other sensitive information about your customers. In addition to being out of compliance, you have the wasted time and expense of the initial data processing, as well as the extra time and expense to replace and re-process the data. The solution — Amazon Macie.

Amazon Macie

According to AWS, Amazon Macie is a fully managed data security and data privacy service that uses machine learning and pattern matching to discover and protect your sensitive data stored in Amazon Simple Storage Service (Amazon S3). Macie’s alerts, or findings, can be searched, filtered, and sent to Amazon EventBridge, formerly called Amazon CloudWatch Events, for easy integration with existing workflow or event management systems, or to be used in combination with AWS services, such as AWS Step Functions or Amazon Managed Workflows for Apache Airflow (MWAA) to take automated remediation actions.

Amazon Macie’s Summary view

Data Discovery and Protection

In this post, we will deploy an automated data inspection workflow to examine sample data in an S3-based data lake. Amazon Macie will examine data files uploaded to an encrypted S3 bucket. If sensitive data is discovered within the files, the files will be moved to an encrypted isolation bucket for further investigation. Email and SMS text alerts will be sent. This workflow will leverage Amazon EventBridge, Amazon Simple Notification Service (Amazon SNS), AWS Lambda, and AWS Systems Manager Parameter Store.

Macie data inspection workflow architecture

Source Code

Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/macie-demo.git

AWS resources for this post can be deployed using AWS CloudFormation. To follow along, you will need recent versions of Python 3, Boto3, and the AWS CLI version 2 installed.
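You can quickly verify the prerequisites from your terminal:

python3 --version
aws --version
python3 -c "import boto3; print(boto3.__version__)"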

Sample Data

We will use synthetic patient data, freely available from the MITRE Corporation. The data was generated by Synthea, MITRE’s open-source, synthetic patient generator that models the medical history of synthetic patients. Synthea data is exported in a variety of data standards, including HL7 FHIR, C-CDA, and CSV. We will use CSV-format data files for this post. Download and unzip the CSV files from the Synthea website.

REMOTE_FILE="synthea_sample_data_csv_apr2020.zip"
wget "https://storage.googleapis.com/synthea-public/${REMOTE_FILE}"
unzip -j "${REMOTE_FILE}" -d synthea_data/

The sixteen CSV data files contain a total of 471,852 rows of data, including column headers.

> wc -l *.csv

      598 allergies.csv
    3,484 careplans.csv
    8,377 conditions.csv
       79 devices.csv
   53,347 encounters.csv
      856 imaging_studies.csv
   15,479 immunizations.csv
   42,990 medications.csv
  299,698 observations.csv
    1,120 organizations.csv
    1,172 patients.csv
    3,802 payer_transitions.csv
       11 payers.csv
   34,982 procedures.csv
    5,856 providers.csv
        1 supplies.csv
  ------------------------------
  471,852 total

Amazon Macie Custom Data Identifier

To demonstrate some of the advanced features of Amazon Macie, we will use three Custom Data Identifiers. According to Macie’s documentation, a custom data identifier is a set of criteria that you define that reflects your organization’s particular proprietary data — for example, employee IDs, customer account numbers, or internal data classifications. We will create three custom data identifiers to detect the specific Synthea-format Patient ID, US driver’s license number, and US passport number columns.

Post’s three custom data identifiers

The custom data identifiers in this post use a combination of regular expressions (regex) and keywords. The identifiers are designed to work with structured data, such as CSV files. Macie reports text that matches the regex pattern if any of these keywords are in the name of the column or field that stores the text, or if the text is within the maximum match distance of one of these words in a field value. Macie supports a subset of the regex pattern syntax provided by the Perl Compatible Regular Expressions (PCRE) library.

Patient ID custom data identifier console
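Custom data identifiers can also be created with the macie2 API. Below is a sketch for the Patient ID identifier; the regex (a simple UUID pattern, since Synthea patient IDs are UUID-based) and the keyword list are illustrative assumptions, not this post's exact values:

aws macie2 create-custom-data-identifier \
  --name "Patient ID" \
  --description "Synthea-format patient ID" \
  --regex "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}" \
  --keywords "Id" "PATIENT" \
  --maximum-match-distance 25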

Enable Macie

Before creating a CloudFormation stack with this demonstration’s resources, you will need to enable Amazon Macie from the AWS Management Console, or use the macie2 API and the AWS CLI with the enable-macie command.

aws macie2 enable-macie

Macie can also be enabled for your multi-account AWS Organization. The enable-organization-admin-account command designates an account as the delegated Amazon Macie administrator account for an AWS organization. For more information, see Managing multiple accounts in Amazon Macie.

AWS_ACCOUNT=111222333444
aws macie2 enable-organization-admin-account \
--admin-account-id ${AWS_ACCOUNT}

CloudFormation Stack

To create the CloudFormation stack with the supplied template, cloudformation/macie_demo.yml, run the following AWS CLI command. You will need to include an email address and phone number as input parameters. These parameter values will be used to send email and text alerts when Macie produces a sensitive data finding.

Please make sure you understand all the potential cost and security implications of creating the CloudFormation stack before continuing.

SNS_PHONE="+12223334444"
SNS_EMAIL="your-email-address@email.com"

aws cloudformation create-stack \
--stack-name macie-demo \
--template-body file://cloudformation/macie_demo.yml \
--parameters ParameterKey=SNSTopicEndpointSms,ParameterValue=${SNS_PHONE} \
ParameterKey=SNSTopicEndpointEmail,ParameterValue=${SNS_EMAIL} \
--capabilities CAPABILITY_NAMED_IAM

As shown in the AWS CloudFormation console, the new macie-demo stack will contain twenty-one AWS resources.

CloudFormation stack successfully created

Upload Data

Next, with the stack deployed, upload the CSV-format data files to the encrypted S3 bucket, representing your data lake. The target S3 bucket uses the following naming convention: synthea-data-<aws_account_id>-<region>. The two new bucket names, written to AWS Systems Manager Parameter Store by CloudFormation, can be retrieved using the ssm API.

aws ssm get-parameters-by-path \
--path /macie_demo/ \
--query 'Parameters[*].Value'

Use the following ssm and s3 API commands to upload the data files.

DATA_BUCKET=$(aws ssm get-parameter \
--name /macie_demo/patient_data_bucket \
--query 'Parameter.Value')
aws s3 cp synthea_data/ \
    "s3://$(eval echo ${DATA_BUCKET})/patient_data/" --recursive

You should end up with sixteen CSV files in the S3 bucket, totaling approximately 82.3 MB.

Synthea patient data files uploaded to S3

Sensitive Data Discovery Jobs

With the CloudFormation stack created and the patient data files uploaded, we will create two sensitive data discovery jobs. These jobs will scan the contents of the encrypted S3 bucket for sensitive data and report the findings. According to the documentation, you can configure a sensitive data discovery job to run only once for on-demand analysis and assessment, or on a recurring basis for periodic analysis, assessment, and monitoring. For this demonstration, we will create a one-time sensitive data discovery job using the AWS CLI. We will also create a recurring sensitive data discovery job using the AWS SDK for Python (Boto3). Both jobs can also be created from within Macie’s Jobs console.

Creating a new job in Macie’s Jobs console

For both sensitive data discovery jobs, we will include the three custom data identifiers. Each of the custom data identifiers has a unique ID. We will need all three IDs to create the two sensitive data discovery jobs. You can use the AWS CLI and the macie2 API to retrieve the values.

aws macie2 list-custom-data-identifiers --query 'items[*].id'

Next, modify the job_specs/macie_job_specs_1x.json file, adding the three custom data identifier IDs. Also, update your AWS account ID and S3 bucket name (lines 3–5, 12, and 14). Note that since all the patient data files are in CSV format, we will limit our inspection to only files with a csv file extension (lines 18–33).

{
  "customDataIdentifierIds": [
    "custom-data-identifier-id-1",
    "custom-data-identifier-id-2",
    "custom-data-identifier-id-3"
  ],
  "description": "Review Synthea patient data (1x)",
  "jobType": "ONE_TIME",
  "s3JobDefinition": {
    "bucketDefinitions": [
      {
        "accountId": "111222333444",
        "buckets": [
          "synthea-data-111222333444-us-east-1"
        ]
      }
    ],
    "scoping": {
      "includes": {
        "and": [
          {
            "simpleScopeTerm": {
              "comparator": "EQ",
              "key": "OBJECT_EXTENSION",
              "values": [
                "csv"
              ]
            }
          }
        ]
      }
    }
  },
  "tags": {
    "Project": "Amazon Macie Demo"
  }
}

The above JSON template was generated using the standard AWS CLI generate-cli-skeleton command.

aws macie2 create-classification-job --generate-cli-skeleton

To create a one-time sensitive data discovery job using the above JSON template, run the following AWS CLI command. The unique job name will be dynamically generated based on the current time.

aws macie2 create-classification-job \
--name $(echo "SyntheaPatientData_${EPOCHSECONDS}") \
--cli-input-json file://job_specs/macie_job_specs_1x.json

In the Amazon Macie Jobs console, we can see the one-time sensitive data discovery job running. With a samplingPercentage of 100, the job will take several minutes to run. The samplingPercentage job property can be adjusted to scan any percentage of the data. If this value is less than 100, Macie selects the objects to analyze at random, up to the specified percentage, and analyzes all the data in those objects.
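The job's progress can also be polled from the command line; the job ID below is a placeholder for the value returned by the create-classification-job command:

aws macie2 describe-classification-job \
  --job-id "your-job-id" \
  --query '{name: name, jobStatus: jobStatus}'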

One-time sensitive data discovery job running

Once the job is completed, the findings will be available in Macie’s Findings console. Using the three custom data identifiers in addition to Macie’s managed data identifiers, there should be a total of fifteen findings from the Synthea patient data files in S3. There should be six High severity findings and nine Medium severity findings. Of those, three are of a Personal finding type, seven of a Custom Identifier finding type, and five of a Multiple finding type, having both Personal and Custom Identifier finding types.

Macie’s Findings console displaying the results of the one-time job
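The findings are also accessible with the macie2 API; a sketch that lists the finding IDs, any one of which can then be passed to the get-findings command:

aws macie2 list-findings --query 'findingIds'

aws macie2 get-findings --finding-ids "finding-id-from-previous-command"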

Isolating High Severity Findings

The data inspection workflow we have deployed uses an AWS Lambda function, macie-object-mover, to isolate all data files with High severity findings to a second S3 bucket. The offending files are copied to the isolation bucket and deleted from the source bucket.

#!/usr/bin/env python3
# Purpose: Lambda function that moves S3 objects flagged by Macie
# Author: Gary A. Stafford (March 2021)

import json
import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3')


def lambda_handler(event, context):
    logging.info(f'event: {json.dumps(event)}')

    # the destination (isolation) bucket is hardcoded for this demonstration
    destination_bucket_name = 'macie-isolation-111222333444-us-east-1'

    # the source bucket and object key are read from the Macie finding event
    source_bucket_name = event['detail']['resourcesAffected']['s3Bucket']['name']
    file_key_name = event['detail']['resourcesAffected']['s3Object']['key']
    copy_source_object = {'Bucket': source_bucket_name, 'Key': file_key_name}

    logging.debug(f'destination_bucket_name: {destination_bucket_name}')
    logging.debug(f'source_bucket_name: {source_bucket_name}')
    logging.debug(f'file_key_name: {file_key_name}')

    # copy the offending object to the isolation bucket
    try:
        response = s3_client.copy_object(
            CopySource=copy_source_object,
            Bucket=destination_bucket_name,
            Key=file_key_name
        )
        logger.info(response)
    except ClientError as ex:
        logger.error(ex)
        exit(1)

    # then delete the object from the source bucket
    try:
        response = s3_client.delete_object(
            Bucket=source_bucket_name,
            Key=file_key_name
        )
        logger.info(response)
    except ClientError as ex:
        logger.error(ex)
        exit(1)

    return {
        'statusCode': 200,
        'body': json.dumps(copy_source_object)
    }

Amazon EventBridge

According to Macie’s documentation, to support integration with other applications, services, and systems, such as monitoring or event management systems, Amazon Macie automatically publishes findings to Amazon EventBridge as finding events. Amazon EventBridge is a serverless event bus that makes it easier to build event-driven applications at scale using events generated from your applications, integrated Software-as-a-Service (SaaS) applications, and AWS services.

Each EventBridge rule contains an event pattern, which is used to filter the incoming stream of events for particular patterns. The EventBridge rule that is triggered when a Macie finding is based on any of the custom data identifiers, macie-rule-custom, uses the event pattern shown below. This pattern examines the finding event for the name of one of the three custom data identifiers that triggered it.

Post’s event rules, shown in the Amazon EventBridge console


{
  "source": [
    "aws.macie"
  ],
  "detail-type": [
    "Macie Finding"
  ],
  "detail": {
    "classificationDetails": {
      "result": {
        "customDataIdentifiers": {
          "detections": {
            "name": [
              "Patient ID",
              "US Passport",
              "US Driver License"
            ]
          }
        }
      }
    }
  }
}

Six data files, containing High severity findings, will be moved to the isolation bucket by the Lambda, triggered by EventBridge.

Isolation bucket containing data files with High severity findings
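You can confirm the move from the command line. The isolation bucket, like the data bucket, follows the naming convention macie-isolation-<aws_account_id>-<region>:

ISOLATION_BUCKET="macie-isolation-111222333444-us-east-1"
aws s3 ls "s3://${ISOLATION_BUCKET}/patient_data/" --recursive --human-readable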

Scheduled Sensitive Data Discovery Jobs

Data sources commonly deliver data on a repeated basis, such as nightly data feeds. For these types of data sources, we can schedule sensitive data discovery jobs to run on a scheduled basis. For this demonstration, we will create a scheduled job using the AWS SDK for Python (Boto3). Unlike the AWS CLI-based one-time job, you don’t need to modify the project’s script, scripts/create_macie_job_daily.py. The Python script will retrieve your AWS account ID and three custom data identifier IDs. The Python script then runs the create_classification_job command.

#!/usr/bin/env python3
# Purpose: Create Daily Macie classification job - Synthea patient data
# Author: Gary A. Stafford (March 2021)

import logging
import sys

import boto3
from botocore.exceptions import ClientError

logging.basicConfig(format='[%(asctime)s] %(levelname)s - %(message)s', level=logging.INFO)

ssm_client = boto3.client('ssm')
sts_client = boto3.client('sts')
macie_client = boto3.client('macie2')


def main():
    params = get_parameters()
    account_id = sts_client.get_caller_identity()['Account']
    custom_data_identifiers = list_custom_data_identifiers()
    create_classification_job(params['patient_data_bucket'], account_id, custom_data_identifiers)


def list_custom_data_identifiers():
    """Returns a list of all custom data identifier ids"""
    custom_data_identifiers = []
    try:
        response = macie_client.list_custom_data_identifiers()
        for item in response['items']:
            custom_data_identifiers.append(item['id'])
        return custom_data_identifiers
    except ClientError as e:
        logging.error(e)
        sys.exit(e)


def create_classification_job(patient_data_bucket, account_id, custom_data_identifiers):
    """Create Daily Macie classification job"""
    try:
        response = macie_client.create_classification_job(
            customDataIdentifierIds=custom_data_identifiers,
            description='Review Synthea patient data (Daily)',
            jobType='SCHEDULED',
            initialRun=True,
            name='SyntheaPatientData_Daily',
            s3JobDefinition={
                'bucketDefinitions': [
                    {
                        'accountId': account_id,
                        'buckets': [
                            patient_data_bucket
                        ]
                    }
                ],
                'scoping': {
                    'includes': {
                        'and': [
                            {
                                'simpleScopeTerm': {
                                    'comparator': 'EQ',
                                    'key': 'OBJECT_EXTENSION',
                                    'values': [
                                        'csv',
                                    ]
                                }
                            },
                        ]
                    }
                }
            },
            samplingPercentage=100,
            scheduleFrequency={
                'dailySchedule': {}
            },
            tags={
                'Project': 'Amazon Macie Demo'
            }
        )
        logging.debug(f'Response: {response}')
    except ClientError as e:
        logging.error(e)
        sys.exit(e)


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
    params = {
        'patient_data_bucket': ssm_client.get_parameter(Name='/macie_demo/patient_data_bucket')['Parameter']['Value']
    }
    return params


if __name__ == '__main__':
    main()

To create the scheduled sensitive data discovery job, run the following command.

python3 ./scripts/create_macie_job_daily.py

The scheduleFrequency parameter is set to { 'dailySchedule': {} }. This value specifies a daily recurrence pattern for running the job. The initialRun parameter of the create_classification_job command is set to True. This will cause the new job to analyze all eligible objects immediately after the job is created, in addition to running on its daily schedule thereafter.

Scheduled sensitive data discovery job in an active/idle state
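Both the one-time and scheduled jobs can be reviewed from the command line as well:

aws macie2 list-classification-jobs \
  --query 'items[*].{name: name, jobType: jobType, jobStatus: jobStatus}'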

Conclusion

In this post, we learned how we can use Amazon Macie to discover and protect sensitive data in Amazon S3. We learned how to use automation to trigger alerts based on Macie’s findings and to isolate data files based on the types of findings. The post’s data inspection workflow can easily be incorporated into existing data lake ingestion pipelines to ensure the integrity of incoming data.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Amazon QuickSight Identity Federation with Auth0: Managing QuickSight users with a third-party enterprise identity provider (IdP)

Introduction

As a Solutions Architect working with Analytics customers, I am often asked about integrating Amazon QuickSight with Active Directory or single sign-on with third-party identity providers for user management.

Amazon QuickSight

Amazon QuickSight, according to AWS, is a scalable, serverless, embeddable, machine learning-powered business intelligence (BI) service built for the cloud. QuickSight lets you easily create and publish interactive BI dashboards that include machine learning-powered insights. QuickSight dashboards can be accessed from any device and seamlessly embedded into your applications, portals, and websites.

Auth0

Auth0 is an easy-to-implement, adaptable authentication and authorization platform. Auth0’s identity and access management platform, according to Auth0, provides greater control, superior security, and ease of use. Single Sign-On (SSO), whether through enterprise federation, social log-in, or username and password authentication, according to Auth0, allows users to simply log in once and use all applications they have been granted access to.

Identity Federation

According to AWS, Amazon QuickSight supports identity federation in both Standard and Enterprise editions. With federated identities, you manage users with your enterprise identity provider (IdP) and use AWS Identity and Access Management (IAM) to authenticate users when they sign in to Amazon QuickSight. You can use a third-party identity provider that supports Security Assertion Markup Language 2.0 (SAML 2.0) to provide a simple onboarding flow for your QuickSight users. Such identity providers include Microsoft Active Directory Federation Services (AD FS), Okta, Ping Identity, Duo, Azure AD, and Auth0.

With identity federation, your users get one-click access to their Amazon QuickSight applications using their existing identity credentials. You also have the security benefit of identity authentication by your identity provider. You can control which users have access to Amazon QuickSight using your existing identity provider. Authenticated users can log directly into QuickSight, bypassing the AWS Management Console.

Initiating Sign-On from Amazon QuickSight

In this post’s scenario, a user initiates the sign-on process from the Amazon QuickSight application portal without being signed on to Auth0, the identity provider. The user has an existing federated account managed by Auth0. The user may or may not already have an account on QuickSight. QuickSight sends an authentication request to the IdP, Auth0. After the user is successfully authenticated, QuickSight opens.

For this post, we will assume that you have signed up for the Enterprise Edition of Amazon QuickSight and chosen Use Role Based Federation (SSO) as opposed to Use Active Directory. The Use Role Based Federation (SSO) option will allow us to configure a third-party IdP for identity authentication.

Auth0 Users and Roles

In Auth0’s User Management interface, create three users and their associated roles representing three QuickSight personas: Admin, Author, and Reader. For demonstration purposes, I chose to name the three users based on their QuickSight personas: QuickSightAdmin1, QuickSightAuthor1, and QuickSightReader1.

Next, create three roles: QuickSight-Admin-Role, QuickSight-Author-Role, and QuickSight-Reader-Role. The role names are arbitrary, as long as they match the names of the equivalent IAM roles in AWS (created later in the post).

Associate each user with their corresponding role, one user per role. For example, associate the QuickSightAdmin1 user with the QuickSight-Admin-Role role.

Auth0 Application

Next, in Auth0’s Application interface, create a new Regular Web Application. Name the new application, Amazon QuickSight.

On the new application’s Addons tab, enable the SAML2 Web App option.

On the SAML2 Web App Addon’s Settings tab, set the Application Callback URL value to https://signin.aws.amazon.com/saml. Change the JSON blob value for Settings to the following:

{
  "audience": "urn:amazon:webservices"
}

The final configuration should match the example shown below.

Switch to the Usage tab. Download the Identity Provider Metadata XML file and note the Identity Provider Login URL value. You will need the metadata file and the URL to configure QuickSight later in the post.

Next, on the Connections tab of the new Amazon QuickSight application, ensure only Username-Password-Authentication is enabled.

Lastly, on the Settings tab, in the Application URIs configuration section, ensure the Allowed Callback URLs value is also set to https://signin.aws.amazon.com/saml.

Auth Pipeline Rule

Next, in Auth0’s Auth Pipeline Rules interface, create a new Empty rule.

Name the new rule: Change QuickSight SAML configuration.

For the Script field value, use the following JavaScript code snippet.

function changeSamlConfiguration(user, context, callback) {
  if (context.clientID !== '<your_web_client_id>')
    return callback(null, user, context);

  const assignedRoles = (context.authorization || {}).roles;
  const accountId = '<your_aws_account_id>';
  const provider = 'saml-provider/Auth0';

  user.awsRole = 'arn:aws:iam::' + accountId + ':role/' + assignedRoles[0] +
    ',arn:aws:iam::' + accountId + ':' + provider;
  user.quickSightUser = user.name.replace(/@.*/, '');

  context.samlConfiguration.mappings = {
    'https://aws.amazon.com/SAML/Attributes/Role': 'awsRole',
    'https://aws.amazon.com/SAML/Attributes/RoleSessionName': 'quickSightUser',
  };

  callback(null, user, context);
}

Replace the <your_web_client_id> placeholder with the Client ID of the Amazon QuickSight regular web application you created previously. The Client ID is listed in the Application interface, alongside the application’s name. Also, replace the <your_aws_account_id> placeholder with your twelve-digit AWS account ID.

This rule is used to modify the SAML assertion returned by Auth0 as part of the authentication process, as shown in the SAML assertion snippet example below. The rule injects the Amazon Resource Name (ARN) of the IAM role with which the Auth0 user should be associated: QuickSight-Admin-Role, QuickSight-Author-Role, or QuickSight-Reader-Role. Note that the rule assumes one role per user. Additional logic would be required if a user were assigned to multiple roles.

AWS IAM Identity Provider

Back in the AWS Management Console, add an AWS IAM Identity Provider for Auth0. From the IAM console’s Identity providers interface, click Add provider. For Provider type choose SAML. Name the provider, Auth0. Click Choose file in the Metadata document section. Select the metadata document you downloaded earlier from Auth0. Click Add provider to finish.

The resulting Auth0 IAM Identity provider should be similar to the below example.

AWS IAM Policies and Roles

Next, create three AWS IAM roles that correspond to the three QuickSight personas of Administrator, Author, and Reader. These three roles also correspond to the three Auth0 roles we created previously. The Auth0 user will pass the Auth0 role name in the SAML document. The Auth0 role name corresponds to the IAM role, which defines the permissions the Auth0 user will have in QuickSight. We can associate many Auth0 users with a single Auth0 role and, correspondingly, with a single IAM role.

First, create three IAM policies: QuickSight-Admin-Policy, QuickSight-Author-Policy, and QuickSight-Reader-Policy. These policies will each be associated with a corresponding IAM role. Create the first policy, QuickSight-Admin-Policy. Replace the <your_aws_account_id> placeholder with your twelve-digit AWS account ID.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "quicksight:CreateAdmin",
      "Resource": "arn:aws:quicksight::<your_aws_account_id>:user/${aws:userid}"
    }
  ]
}

Then, QuickSight-Author-Policy.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "quicksight:CreateUser",
      "Resource": "arn:aws:quicksight::<your_aws_account_id>:user/${aws:userid}"
    }
  ]
}

Finally, QuickSight-Reader-Policy.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "quicksight:CreateReader",
      "Resource": "arn:aws:quicksight::<your_aws_account_id>:user/${aws:userid}"
    }
  ]
}

Next, create the three corresponding IAM roles and associate the corresponding IAM policy: QuickSight-Admin-Role (QuickSight-Admin-Policy), QuickSight-Author-Role (QuickSight-Author-Policy), and QuickSight-Reader-Role (QuickSight-Reader-Policy).

The role’s trust relationship establishes trust between the role and the Auth0 IAM identity provider, as shown below.

For each of the three roles, click on Edit trust relationship and modify the access control policy document as shown below. Replace the <your_aws_account_id> placeholder with your twelve-digit AWS account ID.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "arn:aws:iam::<your_aws_account_id>:saml-provider/Auth0"
      },
      "Action": "sts:AssumeRoleWithSAML",
      "Condition": {
        "StringEquals": {
          "saml:aud": "https://signin.aws.amazon.com/saml"
        }
      }
    }
  ]
}

The role’s access control policy document also includes a Condition policy element. According to AWS IAM documentation, for security reasons, AWS should be included as an audience in the SAML assertion your IdP sends to AWS. For the value of the Audience element, specify either https://signin.aws.amazon.com/saml or urn:amazon:webservices.
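With the trust policy above and the three permissions policies saved locally as JSON files, the policies and roles can also be created with the iam API rather than the console. Below is a sketch for the Admin role; the local file names are hypothetical:

aws iam create-policy \
  --policy-name QuickSight-Admin-Policy \
  --policy-document file://quicksight-admin-policy.json

aws iam create-role \
  --role-name QuickSight-Admin-Role \
  --assume-role-policy-document file://auth0-trust-policy.json

aws iam attach-role-policy \
  --role-name QuickSight-Admin-Role \
  --policy-arn "arn:aws:iam::<your_aws_account_id>:policy/QuickSight-Admin-Policy"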

We can inspect the SAML assertion returned from Auth0 by decoding the form data in the response payload from Base64 format to XML. Below, we see the SAML assertion using Chrome’s Developer Tools to inspect network activity.

Note that the AWS IAM documentation states that the SAML AudienceRestriction value in the SAML assertion from the IdP does not map to the saml:aud context key that you can test in an IAM policy. Instead, the saml:aud context key comes from the SAML recipient attribute because it is the SAML equivalent to the OIDC audience field, for example, accounts.google.com:aud. These are shown in the SAML assertion XML snippet below.

QuickSight IdP Configuration

The last step in integrating QuickSight and Auth0 is to configure QuickSight with Auth0’s specific IdP information. From the QuickSight Management interface, select Single sign-on (SSO). Switch the Status to ON.

Add the IdP URL value. As a reminder, this value came from the Auth0 Amazon QuickSight application’s SAML2 Web App Addon’s Usage tab, the Identity Provider Login URL value (see below). Make sure to copy the actual link associated with the URL shown.

Lastly, for the IdP redirect URL parameter value, use RelayState. Select Save to save the IdP configuration.

Testing Identity Federation

The easiest way to test the integration is to open a new Incognito window and point your browser to https://quicksight.aws.amazon.com. You should be prompted for your QuickSight account name. You created your account name when you signed up for QuickSight (e.g., acme-corp-sales).

Once you have entered your QuickSight account name, you should be redirected to Auth0 and presented with the Amazon QuickSight application’s log-in screen. Enter any of the three Auth0 user’s email addresses and passwords.

If the Auth0 log-in is successful, you will be redirected back and into the QuickSight application portal. If this is the first time the user has logged into QuickSight, you will be prompted for the user’s email address. Use the same email address the user is associated with in Auth0. In this scenario, the user will be self-registered with QuickSight and associated with the default namespace.

Your QuickSight experience and available features will vary, depending on the IAM role associated with the user, either Reader, Author, or Admin.

Close the Incognito browser window to end the current user session. Open a new Incognito browser window and repeat the process with the two remaining users, ensuring each can log in successfully. Also, ensure each user’s experience in QuickSight matches the associated role: Reader, Author, or Admin.

User Management

As a QuickSight Admin, log back into QuickSight and open the Manage users interface. All three Auth0 users should be registered with QuickSight, as shown in the example below. The users should be associated with the correct IAM role (column 1, below, left of the forward slash): QuickSight-Admin-Role, QuickSight-Author-Role, and QuickSight-Reader-Role. Users should also be associated with the correct QuickSight role (column 3, below): Reader, Author, or Admin.

Users can have custom permissions applied using the Manage permissions option.

Conclusion

In this post, we learned about how Amazon QuickSight supports identity federation. We learned how to manage users with a third-party enterprise identity provider (IdP), Auth0, and use AWS Identity and Access Management (IAM) to authenticate users when they sign in to Amazon QuickSight.



This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.


Istio Observability with Go, gRPC, and Protocol Buffers-based Microservices on Google Kubernetes Engine (GKE)

In the last two posts, Kubernetes-based Microservice Observability with Istio Service Mesh and Azure Kubernetes Service (AKS) Observability with Istio Service Mesh, we explored the observability tools which are included with Istio Service Mesh. These tools currently include Prometheus and Grafana for metric collection, monitoring, and alerting, Jaeger for distributed tracing, and Kiali for Istio service-mesh-based microservice visualization and monitoring. Combined with cloud platform-native monitoring and logging services, such as Stackdriver on GCP, CloudWatch on AWS, and Azure Monitor logs on Azure, we have a complete observability solution for modern, distributed, cloud-based applications.

In this post, we will examine the use of Istio’s observability tools to monitor Go-based microservices that use Protocol Buffers (aka Protobuf) over gRPC (gRPC Remote Procedure Calls) and HTTP/2 for client-server communications, as opposed to the more traditional, REST-based JSON (JavaScript Object Notation) over HTTP (Hypertext Transfer Protocol). We will see how Kubernetes, Istio, Envoy, and the observability tools work seamlessly with gRPC, just as they do with JSON over HTTP, on Google Kubernetes Engine (GKE).


Technologies

gRPC

According to the gRPC project, gRPC, a CNCF incubating project, is a modern, high-performance, open-source and universal remote procedure call (RPC) framework that can run anywhere. It enables client and server applications to communicate transparently and makes it easier to build connected systems. Google, the original developer of gRPC, has used the underlying technologies and concepts in gRPC for years. The current implementation is used in several Google cloud products and Google externally facing APIs. It is also being used by Square, Netflix, CoreOS, Docker, CockroachDB, Cisco, Juniper Networks and many other organizations.

Protocol Buffers

By default, gRPC uses Protocol Buffers. According to Google, Protocol Buffers (aka Protobuf) are a language- and platform-neutral, efficient, extensible, automated mechanism for serializing structured data for use in communications protocols, data storage, and more. Protocol Buffers are 3 to 10 times smaller and 20 to 100 times faster than XML. Once you have defined your messages, you run the protocol buffer compiler for your application’s language on your .proto file to generate data access classes.


Protocol buffers currently support generated code in Java, Python, Objective-C, C++, Dart, Go, Ruby, and C#. For this post, we have compiled for Go. You can read more about the binary wire format of Protobuf on Google’s Developers Portal.

Envoy Proxy

According to the Istio project, Istio uses an extended version of the Envoy proxy. Envoy is deployed as a sidecar to a relevant service in the same Kubernetes pod. Envoy, created by Lyft, is a high-performance proxy developed in C++ to mediate all inbound and outbound traffic for all services in the service mesh. Istio leverages Envoy’s many built-in features, including dynamic service discovery, load balancing, TLS termination, HTTP/2 and gRPC proxies, circuit-breakers, health checks, staged rollouts, fault injection, and rich metrics.

According to the post by Harvey Tuch of Google, Evolving a Protocol Buffer canonical API, Envoy proxy adopted Protocol Buffers, specifically proto3, as the canonical specification for version 2 of Lyft’s gRPC-first API.

Reference Microservices Platform

In the last two posts, we explored Istio’s observability tools, using a RESTful microservices-based API platform written in Go and using JSON over HTTP for service-to-service communications. The API platform was comprised of eight Go-based microservices and one sample Angular 7, TypeScript-based front-end web client. The various services are dependent on MongoDB and on RabbitMQ for event queue-based communications. Below is the JSON over HTTP-based platform architecture.

Golang Service Diagram with Proxy v2

Below is the current Angular 7-based web client interface.


Converting to gRPC and Protocol Buffers

For this post, I have modified the eight Go microservices to use gRPC and Protocol Buffers, Google’s data interchange format. Specifically, the services use version 3 release (aka proto3) of Protocol Buffers. With gRPC, a gRPC client calls a gRPC server. Some of the platform’s services are gRPC servers, others are gRPC clients, while some act as both client and server, such as Service A, B, and E. The revised architecture is shown below.

Golang-Service-Diagram-with-gRPC

gRPC Gateway

Assuming for the sake of this demonstration, that most consumers of the API would still expect to communicate using a RESTful JSON over HTTP API, I have added a gRPC Gateway reverse proxy to the platform. The gRPC Gateway is a gRPC to JSON reverse proxy, a common architectural pattern, which proxies communications between the JSON over HTTP-based clients and the gRPC-based microservices. A diagram from the grpc-gateway GitHub project site effectively demonstrates how the reverse proxy works.


Image courtesy: https://github.com/grpc-ecosystem/grpc-gateway

In the revised platform architecture diagram above, note the addition of the reverse proxy, which replaces Service A at the edge of the API. The proxy sits between the Angular-based Web UI and Service A. Also, note the communication method between services is now Protobuf over gRPC instead of JSON over HTTP. The use of Envoy Proxy (via Istio) is unchanged, as are the MongoDB Atlas-based databases and the CloudAMQP RabbitMQ-based queue, which are still external to the Kubernetes cluster.
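Once the platform is deployed, the reverse proxy’s RESTful endpoint, defined by the google.api.http option in the greeting.proto file shown later in this post, can be exercised directly. A sketch follows, assuming the proxy’s Kubernetes service is named service-rev-proxy in the dev namespace and listens on port 80; all three values are assumptions:

kubectl port-forward service/service-rev-proxy 8080:80 --namespace dev

curl -s http://localhost:8080/api/v1/greeting | jq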

Alternatives to gRPC Gateway

As an alternative to the gRPC Gateway reverse proxy, we could convert the TypeScript-based Angular UI client to gRPC and Protocol Buffers, and continue to communicate directly with Service A as the edge service. However, this would limit other consumers of the API to rely on gRPC as opposed to JSON over HTTP, unless we also chose to expose two different endpoints, gRPC, and JSON over HTTP, another common pattern.

Demonstration

In this post’s demonstration, we will repeat the exact same installation process, outlined in the previous post, Kubernetes-based Microservice Observability with Istio Service Mesh. We will deploy the revised gRPC-based platform to GKE on GCP. You could just as easily follow Azure Kubernetes Service (AKS) Observability with Istio Service Mesh, and deploy the platform to AKS.

Source Code

All source code for this post is available on GitHub, contained in three projects. The Go-based microservices source code, all Kubernetes resources, and all deployment scripts are located in the k8s-istio-observe-backend project repository, in the new grpc branch.

git clone \
  --branch grpc --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/k8s-istio-observe-backend.git

The Angular-based web client source code is located in the k8s-istio-observe-frontend repository on the new grpc branch. The source protocol buffers .proto file and the generated code, using the protocol buffers compiler, are located in the new pb-greeting project repository. You do not need to clone either of these projects for this post’s demonstration.

All Docker images for the services, UI, and the reverse proxy are located on Docker Hub.

Code Changes

This post is not specifically about writing Go for gRPC and Protobuf. However, to better understand the observability requirements and capabilities of these technologies, compared to JSON over HTTP, it is helpful to review some of the source code.

Service A

First, compare the source code for Service A, shown below, to the original code in the previous post. The service’s code is almost completely re-written. I relied on several references for writing the code, including, Tracing gRPC with Istio, written by Neeraj Poddar of Aspen Mesh and Distributed Tracing Infrastructure with Jaeger on Kubernetes, by Masroor Hasan.

Specifically, note the following code changes to Service A:

  • Import of the pb-greeting protobuf package;
  • Local Greeting struct replaced with pb.Greeting struct;
  • All services are now hosted on port 50051;
  • The HTTP server and all API resource handler functions are removed;
  • Headers, used for distributed tracing with Jaeger, have moved from HTTP request object to metadata passed in the gRPC context object;
  • Service A is coded as a gRPC server, which is called by the gRPC Gateway reverse proxy (gRPC client) via the Greeting function;
  • The primary PingHandler function, which returns the service’s Greeting, is replaced by the pb-greeting protobuf package’s Greeting function;
  • Service A is coded as a gRPC client, calling both Service B and Service C using the CallGrpcService function;
  • CORS handling is offloaded to Istio;
  • Logging methods are unchanged;

Source code for revised gRPC-based Service A (gist):


// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
// purpose: Service A - gRPC/Protobuf

package main

import (
    "context"
    "net"
    "os"
    "time"

    "github.com/banzaicloud/logrus-runtime-formatter"
    "github.com/google/uuid"
    "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
    ot "github.com/opentracing/opentracing-go"
    log "github.com/sirupsen/logrus"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"

    pb "github.com/garystafford/pb-greeting"
)

const (
    port = ":50051"
)

type greetingServiceServer struct {
}

var (
    greetings []*pb.Greeting
)

func (s *greetingServiceServer) Greeting(ctx context.Context, req *pb.GreetingRequest) (*pb.GreetingResponse, error) {
    greetings = nil

    tmpGreeting := pb.Greeting{
        Id:      uuid.New().String(),
        Service: "Service-A",
        Message: "Hello, from Service-A!",
        Created: time.Now().Local().String(),
    }
    greetings = append(greetings, &tmpGreeting)

    // call the two downstream services, which append their own greetings
    CallGrpcService(ctx, "service-b:50051")
    CallGrpcService(ctx, "service-c:50051")

    return &pb.GreetingResponse{
        Greeting: greetings,
    }, nil
}

func CallGrpcService(ctx context.Context, address string) {
    conn, err := createGRPCConn(ctx, address)
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()

    // propagate the incoming headers, used for distributed tracing with Jaeger,
    // to the outgoing gRPC context
    headersIn, _ := metadata.FromIncomingContext(ctx)
    log.Infof("headersIn: %s", headersIn)

    client := pb.NewGreetingServiceClient(conn)
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    ctx = metadata.NewOutgoingContext(ctx, headersIn)
    defer cancel()

    req := pb.GreetingRequest{}
    greeting, err := client.Greeting(ctx, &req)
    if err != nil {
        log.Fatalf("call to Greeting failed: %v", err)
    }
    log.Info(greeting.GetGreeting())

    for _, greeting := range greeting.GetGreeting() {
        greetings = append(greetings, greeting)
    }
}

func createGRPCConn(ctx context.Context, addr string) (*grpc.ClientConn, error) {
    // https://aspenmesh.io/2018/04/tracing-grpc-with-istio/
    var opts []grpc.DialOption
    opts = append(opts, grpc.WithStreamInterceptor(
        grpc_opentracing.StreamClientInterceptor(
            grpc_opentracing.WithTracer(ot.GlobalTracer()))))
    opts = append(opts, grpc.WithUnaryInterceptor(
        grpc_opentracing.UnaryClientInterceptor(
            grpc_opentracing.WithTracer(ot.GlobalTracer()))))
    opts = append(opts, grpc.WithInsecure())
    conn, err := grpc.DialContext(ctx, addr, opts...)
    if err != nil {
        log.Fatalf("failed to connect to %s: %v", addr, err)
        return nil, err
    }
    return conn, nil
}

func getEnv(key, fallback string) string {
    if value, ok := os.LookupEnv(key); ok {
        return value
    }
    return fallback
}

func init() {
    formatter := runtime.Formatter{ChildFormatter: &log.JSONFormatter{}}
    formatter.Line = true
    log.SetFormatter(&formatter)
    log.SetOutput(os.Stdout)
    level, err := log.ParseLevel(getEnv("LOG_LEVEL", "info"))
    if err != nil {
        log.Error(err)
    }
    log.SetLevel(level)
}

func main() {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterGreetingServiceServer(s, &greetingServiceServer{})
    log.Fatal(s.Serve(lis))
}


Greeting Protocol Buffers

Shown below is the greeting source protocol buffers .proto file. The greeting response struct, originally defined in the services, remains largely unchanged (gist). The UI client responses will look identical.


syntax = "proto3";

package greeting;

import "google/api/annotations.proto";

message Greeting {
  string id = 1;
  string service = 2;
  string message = 3;
  string created = 4;
}

message GreetingRequest {
}

message GreetingResponse {
  repeated Greeting greeting = 1;
}

service GreetingService {
  rpc Greeting (GreetingRequest) returns (GreetingResponse) {
    option (google.api.http) = {
      get: "/api/v1/greeting"
    };
  }
}


When compiled with protoc and the Go protocol buffers compiler plugin, the original 27 lines of source code swell to almost 270 lines of generated data access classes that are easier to use programmatically.

# Generate gRPC stub (.pb.go)
protoc -I /usr/local/include -I. \
  -I ${GOPATH}/src \
  -I ${GOPATH}/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \
  --go_out=plugins=grpc:. \
  greeting.proto

# Generate reverse-proxy (.pb.gw.go)
protoc -I /usr/local/include -I. \
  -I ${GOPATH}/src \
  -I ${GOPATH}/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \
  --grpc-gateway_out=logtostderr=true:. \
  greeting.proto

# Generate swagger definitions (.swagger.json)
protoc -I /usr/local/include -I. \
  -I ${GOPATH}/src \
  -I ${GOPATH}/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \
  --swagger_out=logtostderr=true:. \
  greeting.proto

Below is a small snippet of the generated code, for reference. The generated code is included in the pb-greeting project on GitHub and is imported into each microservice and the reverse proxy (gist). We also generate a separate version (.pb.gw.go) for the gRPC Gateway reverse proxy to implement.


// Code generated by protoc-gen-go. DO NOT EDIT.
// source: greeting.proto

package greeting

import (
	context "context"
	fmt "fmt"
	proto "github.com/golang/protobuf/proto"
	_ "google.golang.org/genproto/googleapis/api/annotations"
	grpc "google.golang.org/grpc"
	codes "google.golang.org/grpc/codes"
	status "google.golang.org/grpc/status"
	math "math"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package

type Greeting struct {
	Id                   string   `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	Service              string   `protobuf:"bytes,2,opt,name=service,proto3" json:"service,omitempty"`
	Message              string   `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"`
	Created              string   `protobuf:"bytes,4,opt,name=created,proto3" json:"created,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *Greeting) Reset()         { *m = Greeting{} }
func (m *Greeting) String() string { return proto.CompactTextString(m) }
func (*Greeting) ProtoMessage()    {}
func (*Greeting) Descriptor() ([]byte, []int) {
	return fileDescriptor_6acac03ccd168a87, []int{0}
}
func (m *Greeting) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_Greeting.Unmarshal(m, b)
}
func (m *Greeting) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_Greeting.Marshal(b, m, deterministic)
}


Using Swagger, we can view the greeting protocol buffers’ single RESTful API resource, exposed with an HTTP GET method. I use the Docker-based version of Swagger UI for viewing protoc-generated swagger definitions.

docker run -p 8080:8080 -d --name swagger-ui \
  -e SWAGGER_JSON=/tmp/greeting.swagger.json \
  -v ${GOPATH}/src/pb-greeting:/tmp swaggerapi/swagger-ui

The Angular UI makes an HTTP GET request to the /api/v1/greeting resource, which is transformed to gRPC and proxied to Service A, where it is handled by the Greeting function.

screen_shot_2019-04-15_at_9_05_23_pm
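
You can exercise the same resource without the Angular UI, using curl. Below is a minimal sketch; the hostname is an assumption based on the VirtualService host shown later in this post, so substitute the DNS entry pointing at your own Istio Ingress Gateway.

# Call the gRPC Gateway's RESTful endpoint directly
# (hostname is illustrative; substitute your own)
curl -i http://api.dev.example-api.com/api/v1/greeting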

gRPC Gateway Reverse Proxy

As explained earlier, the gRPC Gateway reverse proxy service is completely new. Specifically, note the following code features in the gist below:

  • Import of the pb-greeting protobuf package;
  • The proxy is hosted on port 80;
  • Request headers, used for distributed tracing with Jaeger, are collected from the incoming HTTP request and passed to Service A in the gRPC context;
  • The proxy is coded as a gRPC client, which calls Service A;
  • Logging is largely unchanged;

The source code for the Reverse Proxy (gist):


// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
// purpose: gRPC Gateway / Reverse Proxy
// reference: https://github.com/grpc-ecosystem/grpc-gateway

package main

import (
	"context"
	"flag"
	"net/http"
	"os"

	lrf "github.com/banzaicloud/logrus-runtime-formatter"
	gw "github.com/garystafford/pb-greeting"
	"github.com/grpc-ecosystem/grpc-gateway/runtime"
	log "github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

func injectHeadersIntoMetadata(ctx context.Context, req *http.Request) metadata.MD {
	// https://aspenmesh.io/2018/04/tracing-grpc-with-istio/
	otHeaders := []string{
		"x-request-id",
		"x-b3-traceid",
		"x-b3-spanid",
		"x-b3-parentspanid",
		"x-b3-sampled",
		"x-b3-flags",
		"x-ot-span-context",
	}
	var pairs []string
	for _, h := range otHeaders {
		if v := req.Header.Get(h); len(v) > 0 {
			pairs = append(pairs, h, v)
		}
	}
	return metadata.Pairs(pairs...)
}

type annotator func(context.Context, *http.Request) metadata.MD

func chainGrpcAnnotators(annotators ...annotator) annotator {
	return func(c context.Context, r *http.Request) metadata.MD {
		var mds []metadata.MD
		for _, a := range annotators {
			mds = append(mds, a(c, r))
		}
		return metadata.Join(mds...)
	}
}

func run() error {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	annotators := []annotator{injectHeadersIntoMetadata}
	mux := runtime.NewServeMux(
		runtime.WithMetadata(chainGrpcAnnotators(annotators...)),
	)
	opts := []grpc.DialOption{grpc.WithInsecure()}
	err := gw.RegisterGreetingServiceHandlerFromEndpoint(ctx, mux, "service-a:50051", opts)
	if err != nil {
		return err
	}
	return http.ListenAndServe(":80", mux)
}

func getEnv(key, fallback string) string {
	if value, ok := os.LookupEnv(key); ok {
		return value
	}
	return fallback
}

func init() {
	formatter := lrf.Formatter{ChildFormatter: &log.JSONFormatter{}}
	formatter.Line = true
	log.SetFormatter(&formatter)
	log.SetOutput(os.Stdout)
	level, err := log.ParseLevel(getEnv("LOG_LEVEL", "info"))
	if err != nil {
		log.Error(err)
	}
	log.SetLevel(level)
}

func main() {
	flag.Parse()
	if err := run(); err != nil {
		log.Fatal(err)
	}
}


Below, in the Stackdriver logs, we see an example of a set of HTTP request headers in the JSON payload, which are propagated upstream to gRPC-based Go services from the gRPC Gateway’s reverse proxy. Header propagation ensures the request produces a complete distributed trace across the entire service call chain.

screen_shot_2019-04-15_at_11_10_50_pm

Istio VirtualService and CORS

According to feedback in the project’s GitHub Issues, the gRPC Gateway does not directly support Cross-Origin Resource Sharing (CORS) policy. In my own experience, the gRPC Gateway cannot handle OPTIONS HTTP method requests, which must be issued by the Angular 7 web UI. Therefore, I have offloaded CORS responsibility to Istio, using the VirtualService resource’s CorsPolicy configuration. This makes CORS much easier to manage than coding CORS configuration into service code (gist):


apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: service-rev-proxy
spec:
  hosts:
    - api.dev.example-api.com
  gateways:
    - demo-gateway
  http:
    - match:
        - uri:
            prefix: /
      route:
        - destination:
            port:
              number: 80
            host: service-rev-proxy.dev.svc.cluster.local
          weight: 100
      corsPolicy:
        allowOrigin:
          - "*"
        allowMethods:
          - OPTIONS
          - GET
        allowCredentials: true
        allowHeaders:
          - "*"

Set-up and Installation

To deploy the microservices platform to GKE, follow the detailed instructions in part one of the post, Kubernetes-based Microservice Observability with Istio Service Mesh: Part 1, or Azure Kubernetes Service (AKS) Observability with Istio Service Mesh for AKS.

  1. Create the external MongoDB Atlas database and CloudAMQP RabbitMQ clusters;
  2. Modify the Kubernetes resource files and bash scripts for your own environments;
  3. Create the managed GKE or AKS cluster on GCP or Azure;
  4. Configure and deploy Istio to the managed Kubernetes cluster, using Helm;
  5. Create DNS records for the platform’s exposed resources;
  6. Deploy the Go-based microservices, gRPC Gateway reverse proxy, Angular UI, and associated resources to the Kubernetes cluster;
  7. Test and troubleshoot the platform deployment;
  8. Observe the results;

The Three Pillars

As introduced in the first post, logs, metrics, and traces are often known as the three pillars of observability. These are the external outputs of the system, which we may observe. As modern distributed systems grow ever more complex, the ability to observe those systems demands equally modern tooling that was designed with this level of complexity in mind. Traditional logging and monitoring systems often struggle with today’s hybrid and multi-cloud, polyglot language-based, event-driven, container-based and serverless, infinitely-scalable, ephemeral-compute platforms.

Tools like Istio Service Mesh attempt to solve the observability challenge by offering native integrations with several best-of-breed, open-source telemetry tools. Istio’s integrations include Jaeger for distributed tracing, Kiali for Istio service mesh-based microservice visualization and monitoring, and Prometheus and Grafana for metric collection, monitoring, and alerting. Combined with cloud platform-native monitoring and logging services, such as Stackdriver for GKE, CloudWatch for Amazon’s EKS, or Azure Monitor logs for AKS, we have a complete observability solution for modern, distributed, cloud-based applications.

Pillar 1: Logging

Moving from JSON over HTTP to gRPC does not require any changes to the logging configuration of the Go-based service code or Kubernetes resources.

Stackdriver with Logrus

As detailed in part two of the last post, Kubernetes-based Microservice Observability with Istio Service Mesh, our logging strategy for the eight Go-based microservices and the reverse proxy continues to be the use of Logrus, the popular structured logger for Go, and Banzai Cloud’s logrus-runtime-formatter.

If you recall, the Banzai formatter automatically tags log messages with runtime/stack information, including the function name and line number, which is extremely helpful when troubleshooting. We are also using Logrus’ JSON formatter. Below, in the Stackdriver console, note how each log entry contains a JSON payload with the log level, the function name, the line on which the log entry originated, and the message itself.

screen_shot_2019-04-15_at_11_10_36_pm

Below, we see the details of a specific log entry’s JSON payload. In this case, we can see the request headers propagated from the downstream service.

screen_shot_2019-04-15_at_11_10_50_pm
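
Since each service reads its log level from the LOG_LEVEL environment variable (see the getEnv function in Service A, above), log verbosity can be raised per service without a code change. Below is a quick sketch; the Deployment name and Namespace are assumptions based on this post’s platform.

# Bump Service A's Logrus level to debug (Deployment name and Namespace assumed)
kubectl -n dev set env deployment/service-a LOG_LEVEL=debug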

Pillar 2: Metrics

Moving from JSON over HTTP to gRPC does not require any changes to the metrics configuration of the Go-based service code or Kubernetes resources.

Prometheus

Prometheus is a completely open-source and community-driven systems monitoring and alerting toolkit, originally built at SoundCloud circa 2012. Interestingly, Prometheus joined the Cloud Native Computing Foundation (CNCF) in 2016 as the second hosted project, after Kubernetes.

screen_shot_2019-04-15_at_11_04_54_pm

Grafana

Grafana describes itself as the leading open source software for time series analytics. According to Grafana Labs, Grafana allows you to query, visualize, alert on, and understand your metrics no matter where they are stored. You can easily create, explore, and share visually-rich, data-driven dashboards. Grafana also allows users to visually define alert rules for their most important metrics. Grafana will continuously evaluate the rules and can send notifications.

According to Istio, the Grafana add-on is a pre-configured instance of Grafana. The Grafana Docker base image has been modified to start with both a Prometheus data source and the Istio Dashboard installed. Below, we see two of the pre-configured dashboards, the Istio Mesh Dashboard and the Istio Performance Dashboard.

screen_shot_2019-04-15_at_10_45_38_pm

screen_shot_2019-04-15_at_10_46_03_pm

Pillar 3: Traces

Moving from JSON over HTTP to gRPC did require a complete re-write of the tracing logic in the service code. In fact, I spent the majority of my time ensuring the correct headers were propagated from the Istio Ingress Gateway to the gRPC Gateway reverse proxy, to Service A in the gRPC context, and upstream to all the dependent, gRPC-based services. I am sure there are a number of possible optimizations in my current code regarding the correct handling of traces and how this information is propagated across the service call stack.

Jaeger

According to their website, Jaeger, inspired by Dapper and OpenZipkin, is a distributed tracing system released as open source by Uber Technologies. It is used for monitoring and troubleshooting microservices-based distributed systems, including distributed context propagation, distributed transaction monitoring, root cause analysis, service dependency analysis, and performance and latency optimization. The Jaeger website contains an excellent overview of Jaeger’s architecture and general tracing-related terminology.

Below we see the Jaeger UI Traces View. In it, we see a series of traces generated by hey, a modern load generator and benchmarking tool, and a worthy replacement for Apache Bench (ab). Unlike ab, hey supports HTTP/2. The use of hey was detailed in the previous post.
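
For reference, a representative hey command might resemble the following; the request count, concurrency, and URL are purely illustrative.

# 1,000 requests from 25 concurrent workers
# -h2 enables HTTP/2 where the endpoint supports it
hey -n 1000 -c 25 -h2 http://api.dev.example-api.com/api/v1/greeting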

screen_shot_2019-04-18_at_6_08_21_pm

A trace, as you might recall, is an execution path through the system and can be thought of as a directed acyclic graph (DAG) of spans. If you have worked with systems like Apache Spark, you are probably already familiar with DAGs.

screen_shot_2019-04-15_at_11_06_13_pm

Below we see the Jaeger UI Trace Detail View. The example trace contains 16 spans, which encompass nine components – seven of the eight Go-based services, the reverse proxy, and the Istio Ingress Gateway. The trace and each of the spans have timings. The root span in the trace is the Istio Ingress Gateway. In this demo, traces do not span the RabbitMQ message queues. This means you would not see a trace that includes the decoupled, message-based communications between Service D and Service F, via RabbitMQ.

screen_shot_2019-04-15_at_11_08_07_pm

Within the Jaeger UI Trace Detail View, you also have the ability to drill into a single span, which contains additional metadata. Metadata includes the URL being called, HTTP method, response status, and several other headers.

screen_shot_2019-04-15_at_11_08_22_pm

Microservice Observability

Moving from JSON over HTTP to gRPC does not require any Kiali-related changes to the Go-based service code or Kubernetes resources.

Kiali

According to their website, Kiali provides answers to the questions: What are the microservices in my Istio service mesh, and how are they connected? Kiali works with Istio, in OpenShift or Kubernetes, to visualize the service mesh topology, to provide visibility into features like circuit breakers, request rates and more. It offers insights about the mesh components at different levels, from abstract Applications to Services and Workloads.
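
If you have not exposed Kiali publicly, port-forwarding is one way to reach its UI. Below is a minimal sketch, assuming a default Istio add-on installation, in which the kiali Service listens on port 20001 in the istio-system Namespace.

# Forward the Kiali UI to localhost:20001 (Service name and port assume a default install)
kubectl -n istio-system port-forward svc/kiali 20001:20001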

The Graph View in the Kiali UI is a visual representation of the components running in the Istio service mesh. Below, filtering on the cluster’s dev Namespace, we should observe that Kiali has mapped all components in the platform, along with rich metadata, such as their version and communication protocols.

screen_shot_2019-04-18_at_6_03_38_pm

Using Kiali, we can confirm our service-to-service IPC protocol is now gRPC instead of the previous HTTP.

screen_shot_2019-04-14_at_11_15_49_am

Conclusion

Although converting from JSON over HTTP to protocol buffers with gRPC required major code changes to the services, it did not impact the high-level observability we have of those services using the tools provided by Istio, including Prometheus, Grafana, Jaeger, and Kiali.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.


Automating Multi-Environment Kubernetes Virtual Clusters with Google Cloud DNS, Auth0, and Istio 1.0

Kubernetes supports multiple virtual clusters within the same physical cluster. These virtual clusters are called Namespaces. Namespaces are a way to divide cluster resources between multiple users. Many enterprises use Namespaces to divide the same physical Kubernetes cluster into different virtual software development environments as part of their overall Software Development Lifecycle (SDLC). This practice is commonly used in ‘lower environments’ or ‘non-prod’ (not Production) environments. These environments commonly include Continuous Integration and Delivery (CI/CD), Development, Integration, Testing/Quality Assurance (QA), User Acceptance Testing (UAT), Staging, Demo, and Hotfix. Namespaces provide a basic form of what is referred to as soft multi-tenancy.
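
As a minimal illustration of the pattern, the commands below create one Namespace per environment; this post’s deployment scripts apply a namespaces.yaml manifest instead, but the effect is the same.

# Create one virtual cluster (Namespace) per non-prod environment
kubectl create namespace dev
kubectl create namespace test
kubectl create namespace uat

# Scope subsequent commands to a single environment
kubectl get pods --namespace test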

Generally, the security boundaries and performance requirements between non-prod environments, within the same enterprise, are less restrictive than for Production or Disaster Recovery (DR) environments. This allows for multi-tenant non-prod environments, while Production and DR are normally single-tenant environments. In order to approximate the performance characteristics of Production, the Performance Testing environment is also often isolated to a single tenant. A typical enterprise would minimally have a non-prod, performance, production, and DR environment.

Using Namespaces to create virtual separation on the same physical Kubernetes cluster provides enterprises with more efficient use of virtual compute resources, reduces Cloud costs, eases the management burden, and often expedites and simplifies the release process.

Demonstration

In this post, we will re-examine the topic of virtual clusters, similar to the recent post, Managing Applications Across Multiple Kubernetes Environments with Istio: Part 1 and Part 2. We will focus specifically on automating the creation of the virtual clusters on GKE with Istio 1.0, managing the Google Cloud DNS records associated with the cluster’s environments, and enabling both HTTPS and token-based OAuth access to each environment. For our demonstration, we will use the Storefront API featured in the previous three posts, including Building a Microservices Platform with Confluent Cloud, MongoDB Atlas, Istio, and Google Kubernetes Engine.

gke-routing.png

Source Code

The source code for this post may be found on the gke branch of the storefront-kafka-docker GitHub repository.

git clone --branch gke --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/storefront-kafka-docker.git

Source code samples in this post are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers, such as LinkedIn.

This project contains all the code to deploy and configure the GKE cluster and Kubernetes resources.

Screen Shot 2019-01-19 at 11.49.31 AM.png

To follow along, you will need to register your own domain, arrange for an Auth0, or alternative, authentication and authorization service, and obtain an SSL/TLS certificate.

SSL/TLS Wildcard Certificate

In the recent post, Securing Your Istio Ingress Gateway with HTTPS, we examined how to create and apply an SSL/TLS certificate to our GKE cluster to secure communications. Although we are only creating a non-prod cluster, it is increasingly common to use SSL/TLS everywhere, especially in the Cloud. For this post, I have registered a single wildcard certificate, *.api.storefront-demo.com. This certificate covers the three subdomains associated with the virtual clusters: dev.api.storefront-demo.com, test.api.storefront-demo.com, and uat.api.storefront-demo.com. Setting the environment name, such as dev, at the subdomain level directly below api.storefront-demo.com makes the use of a single wildcard certificate much easier.

screen_shot_2019-01-13_at_10.04.23_pm

As shown below, my wildcard certificate contains the Subject Name and Subject Alternative Name (SAN) of *.api.storefront-demo.com. For Production, api.storefront-demo.com, I prefer to use a separate certificate.

screen_shot_2019-01-13_at_10.36.33_pm_detail
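
A quick way to verify a certificate’s Subject and SAN entries is with openssl; the file name below matches the certificate.crt consumed by the deployment script later in this post.

# Print the certificate's subject and Subject Alternative Name entries
openssl x509 -in certificate.crt -noout -subject
openssl x509 -in certificate.crt -noout -text | grep -A 1 'Subject Alternative Name'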

Create GKE Cluster

With your certificate in hand, create the non-prod Kubernetes cluster. Below, the script creates a minimally-sized, three-node, multi-zone GKE cluster, running on GCP, with Kubernetes Engine cluster version 1.11.5-gke.5 and Istio on GKE version 1.0.3-gke.0. I have enabled the master authorized networks option to secure my GKE cluster master endpoint. For the demo, you can add your own IP address CIDR (e.g., 1.2.3.4/32) as the MASTER_AUTH_NETS value, or remove the two master authorized networks flags to drop the restriction (gist).

  • Lines 16–39: Create a 3-node, multi-zone GKE cluster with Istio;
  • Line 48: Creates three non-prod Namespaces: dev, test, and uat;
  • Lines 51–53: Enable Istio automatic sidecar injection within each Namespace;


#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Create non-prod Kubernetes cluster on GKE

# Constants - CHANGE ME!
readonly PROJECT='gke-confluent-atlas'
readonly CLUSTER='storefront-api-non-prod'
readonly REGION='us-central1'
readonly MASTER_AUTH_NETS='<your_ip_cidr>'
readonly NAMESPACES=( 'dev' 'test' 'uat' )

# Build a 3-node, single-region, multi-zone GKE cluster
time gcloud beta container \
  --project $PROJECT clusters create $CLUSTER \
  --region $REGION \
  --no-enable-basic-auth \
  --no-issue-client-certificate \
  --cluster-version "1.11.5-gke.5" \
  --machine-type "n1-standard-2" \
  --image-type "COS" \
  --disk-type "pd-standard" \
  --disk-size "100" \
  --scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" \
  --num-nodes "1" \
  --enable-stackdriver-kubernetes \
  --enable-ip-alias \
  --enable-master-authorized-networks \
  --master-authorized-networks $MASTER_AUTH_NETS \
  --network "projects/${PROJECT}/global/networks/default" \
  --subnetwork "projects/${PROJECT}/regions/${REGION}/subnetworks/default" \
  --default-max-pods-per-node "110" \
  --addons HorizontalPodAutoscaling,HttpLoadBalancing,Istio \
  --istio-config auth=MTLS_STRICT \
  --metadata disable-legacy-endpoints=true \
  --enable-autoupgrade \
  --enable-autorepair

# Get cluster creds
gcloud container clusters get-credentials $CLUSTER \
  --region $REGION --project $PROJECT
kubectl config current-context

# Create Namespaces
kubectl apply -f ./resources/other/namespaces.yaml

# Enable automatic Istio sidecar injection
for namespace in ${NAMESPACES[@]}; do
  kubectl label namespace $namespace istio-injection=enabled
done

If successful, the results should look similar to the output below.

screen_shot_2019-01-15_at_11.51.08_pm

The cluster will contain a pool of three minimally-sized VMs, the Kubernetes nodes.

screen_shot_2019-01-16_at_12.06.03_am

Deploying Resources

The Istio Gateway and three VirtualService resources are the primary resources responsible for routing the traffic from the ingress router to the Services within the multiple Namespaces. Both of these resource types are new to Istio 1.0 (gist).

  • Lines 9–16: Port config that only accepts HTTPS traffic on port 443 using TLS;
  • Lines 18–20: The three subdomains (hosts) accepted by the Gateway;
  • Lines 28, 63, 98: The same three subdomains, each handled by its own VirtualService;
  • Lines 39, 47, 65, 74, 82, 90, 109, 117, 125: Routing to FQDN of Storefront API Services within the three Namespaces;


apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: storefront-gateway
spec:
  selector:
    istio: ingressgateway
  servers:
    - port:
        number: 443
        name: https
        protocol: HTTPS
      tls:
        mode: SIMPLE
        serverCertificate: /etc/istio/ingressgateway-certs/tls.crt
        privateKey: /etc/istio/ingressgateway-certs/tls.key
      hosts:
        - dev.api.storefront-demo.com
        - test.api.storefront-demo.com
        - uat.api.storefront-demo.com
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-dev
spec:
  hosts:
    - dev.api.storefront-demo.com
  gateways:
    - storefront-gateway
  http:
    - match:
        - uri:
            prefix: /accounts
      route:
        - destination:
            port:
              number: 8080
            host: accounts.dev.svc.cluster.local
    - match:
        - uri:
            prefix: /fulfillment
      route:
        - destination:
            port:
              number: 8080
            host: fulfillment.dev.svc.cluster.local
    - match:
        - uri:
            prefix: /orders
      route:
        - destination:
            port:
              number: 8080
            host: orders.dev.svc.cluster.local
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-test
spec:
  hosts:
    - test.api.storefront-demo.com
  gateways:
    - storefront-gateway
  http:
    - match:
        - uri:
            prefix: /accounts
      route:
        - destination:
            port:
              number: 8080
            host: accounts.test.svc.cluster.local
    - match:
        - uri:
            prefix: /fulfillment
      route:
        - destination:
            port:
              number: 8080
            host: fulfillment.test.svc.cluster.local
    - match:
        - uri:
            prefix: /orders
      route:
        - destination:
            port:
              number: 8080
            host: orders.test.svc.cluster.local
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-uat
spec:
  hosts:
    - uat.api.storefront-demo.com
  gateways:
    - storefront-gateway
  http:
    - match:
        - uri:
            prefix: /accounts
      route:
        - destination:
            port:
              number: 8080
            host: accounts.uat.svc.cluster.local
    - match:
        - uri:
            prefix: /fulfillment
      route:
        - destination:
            port:
              number: 8080
            host: fulfillment.uat.svc.cluster.local
    - match:
        - uri:
            prefix: /orders
      route:
        - destination:
            port:
              number: 8080
            host: orders.uat.svc.cluster.local

Next, deploy the Istio and Kubernetes resources to the new GKE cluster. For the sake of brevity, we will deploy the same number of instances and the same version of each of the three Storefront API services (Accounts, Orders, Fulfillment) to each of the three non-prod environments (dev, test, uat). In reality, you would have varying numbers of instances of each service, and each environment would contain progressive versions of each service, as part of the SDLC of each microservice (gist).

  • Lines 13–14: Deploy the SSL/TLS certificate and the private key;
  • Line 17: Deploy the Istio Gateway and three VirtualService resources;
  • Lines 20–22: Deploy the Istio Authentication Policy resources to each Namespace;
  • Lines 26–37: Deploy the same set of resources to the dev, test, and uat Namespaces;


#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Deploy Kubernetes/Istio resources

# Constants - CHANGE ME!
readonly CERT_PATH=~/Documents/Articles/gke-kafka/sslforfree_non_prod
readonly NAMESPACES=( 'dev' 'test' 'uat' )

# Kubernetes Secret to hold the server's certificate and private key
kubectl create -n istio-system secret tls istio-ingressgateway-certs \
  --key $CERT_PATH/private.key --cert $CERT_PATH/certificate.crt

# Istio Gateway and three VirtualService resources
kubectl apply -f ./resources/other/istio-gateway.yaml

# End-user auth applied per environment
kubectl apply -f ./resources/other/auth-policy-dev.yaml
kubectl apply -f ./resources/other/auth-policy-test.yaml
kubectl apply -f ./resources/other/auth-policy-uat.yaml

# Loop through each non-prod Namespace (environment)
# Re-use the same resources (incl. credentials) for all environments, just for the demo
for namespace in ${NAMESPACES[@]}; do
  kubectl apply -n $namespace -f ./resources/config/confluent-cloud-kafka-configmap.yaml
  kubectl apply -n $namespace -f ./resources/config/mongodb-atlas-secret.yaml
  kubectl apply -n $namespace -f ./resources/config/confluent-cloud-kafka-secret.yaml
  kubectl apply -n $namespace -f ./resources/other/mongodb-atlas-external-mesh.yaml
  kubectl apply -n $namespace -f ./resources/other/confluent-cloud-external-mesh.yaml
  kubectl apply -n $namespace -f ./resources/services/accounts.yaml
  kubectl apply -n $namespace -f ./resources/services/fulfillment.yaml
  kubectl apply -n $namespace -f ./resources/services/orders.yaml
done

The deployed Storefront API Services should look as follows.

screen_shot_2019-01-13_at_7.16.03_pm

Google Cloud DNS

Next, we need to enable DNS access to the GKE cluster using Google Cloud DNS. According to Google, Cloud DNS is a scalable, reliable and managed authoritative Domain Name System (DNS) service running on the same infrastructure as Google. It has low latency, high availability, and is a cost-effective way to make your applications and services available to your users.

Whenever a new GKE cluster is created, a new Network Load Balancer is also created. By default, the load balancer’s front-end is an external IP address.

screen_shot_2019-01-15_at_11.56.01_pm.png

Using a forwarding rule, traffic directed at the external IP address is redirected to the load balancer’s back-end. The load balancer’s back-end is comprised of three VM instances, which are the three Kubernetes nodes in the GKE cluster.

screen_shot_2019-01-15_at_11.56.19_pm

If you are following along with this post’s demonstration, we will assume you have a domain registered and configured with Google Cloud DNS. I am using the storefront-demo.com domain, which I have used in the last three posts to demonstrate Istio and GKE.

Google Cloud DNS has a fully functional web console, part of the Google Cloud Console. However, using the Cloud DNS web console is impractical in a DevOps CI/CD workflow, where Kubernetes clusters, Namespaces, and Workloads are ephemeral. Therefore, we will use the following script. Within the script, we reset the IP address associated with the A records for each of the non-prod subdomains of the storefront-demo.com domain (gist).

  • Lines 23–25: Find the previous load balancer’s front-end IP address;
  • Lines 27–29: Find the new load balancer’s front-end IP address;
  • Line 35: Start the Cloud DNS transaction;
  • Lines 37–47: Add the DNS record changes to the transaction;
  • Line 49: Execute the Cloud DNS transaction;


#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Update Cloud DNS A Records

# Constants - CHANGE ME!
readonly PROJECT='gke-confluent-atlas'
readonly DOMAIN='storefront-demo.com'
readonly ZONE='storefront-demo-com-zone'
readonly REGION='us-central1'
readonly TTL=300
readonly RECORDS=('dev' 'test' 'uat')

# Make sure any old load balancers were removed
if [ $(gcloud compute forwarding-rules list --filter "region:($REGION)" | wc -l | awk '{$1=$1};1') -gt 2 ]; then
  echo "More than one load balancer detected, exiting script."
  exit 1
fi

# Get load balancer IP address from first record
readonly OLD_IP=$(gcloud dns record-sets list \
  --filter "name=${RECORDS[0]}.api.${DOMAIN}." --zone $ZONE \
  | awk 'NR==2 {print $4}')

readonly NEW_IP=$(gcloud compute forwarding-rules list \
  --filter "region:($REGION)" \
  | awk 'NR==2 {print $3}')

echo "Old LB IP Address: ${OLD_IP}"
echo "New LB IP Address: ${NEW_IP}"

# Update DNS records
gcloud dns record-sets transaction start --zone $ZONE

for record in ${RECORDS[@]}; do
  echo "${record}.api.${DOMAIN}."
  gcloud dns record-sets transaction remove \
    --name "${record}.api.${DOMAIN}." --ttl $TTL \
    --type A --zone $ZONE "${OLD_IP}"
  gcloud dns record-sets transaction add \
    --name "${record}.api.${DOMAIN}." --ttl $TTL \
    --type A --zone $ZONE "${NEW_IP}"
done

gcloud dns record-sets transaction execute --zone $ZONE

The outcome of the script is shown below. Note how the changes are executed as part of a transaction, which automatically creates a transaction.yaml file. The file contains the six DNS changes: three additions and three deletions. The execute command applies the transaction and then deletes the transaction.yaml file.

> sh ./part3_set_cloud_dns.sh
Old LB IP Address: 35.193.208.115
New LB IP Address: 35.238.196.231

Transaction started [transaction.yaml].

dev.api.storefront-demo.com.
Record removal appended to transaction at [transaction.yaml].
Record addition appended to transaction at [transaction.yaml].

test.api.storefront-demo.com.
Record removal appended to transaction at [transaction.yaml].
Record addition appended to transaction at [transaction.yaml].

uat.api.storefront-demo.com.
Record removal appended to transaction at [transaction.yaml].
Record addition appended to transaction at [transaction.yaml].

Executed transaction [transaction.yaml] for managed-zone [storefront-demo-com-zone].
Created [https://www.googleapis.com/dns/v1/projects/gke-confluent-atlas/managedZones/storefront-demo-com-zone/changes/53].

ID  START_TIME                STATUS
55  2019-01-16T04:54:14.984Z  pending

Based on my own domain and cluster details, the transaction.yaml file looks as follows. Again, note the six A record changes, three additions followed by three deletions, as well as Cloud DNS’s automatic increment of the zone’s SOA serial number (gist).


additions:
- kind: dns#resourceRecordSet
  name: storefront-demo.com.
  rrdatas:
  - ns-cloud-a1.googledomains.com. cloud-dns-hostmaster.google.com. 25 21600 3600 259200 300
  ttl: 21600
  type: SOA
- kind: dns#resourceRecordSet
  name: dev.api.storefront-demo.com.
  rrdatas:
  - 35.238.196.231
  ttl: 300
  type: A
- kind: dns#resourceRecordSet
  name: test.api.storefront-demo.com.
  rrdatas:
  - 35.238.196.231
  ttl: 300
  type: A
- kind: dns#resourceRecordSet
  name: uat.api.storefront-demo.com.
  rrdatas:
  - 35.238.196.231
  ttl: 300
  type: A
deletions:
- kind: dns#resourceRecordSet
  name: storefront-demo.com.
  rrdatas:
  - ns-cloud-a1.googledomains.com. cloud-dns-hostmaster.google.com. 24 21600 3600 259200 300
  ttl: 21600
  type: SOA
- kind: dns#resourceRecordSet
  name: dev.api.storefront-demo.com.
  rrdatas:
  - 35.193.208.115
  ttl: 300
  type: A
- kind: dns#resourceRecordSet
  name: test.api.storefront-demo.com.
  rrdatas:
  - 35.193.208.115
  ttl: 300
  type: A
- kind: dns#resourceRecordSet
  name: uat.api.storefront-demo.com.
  rrdatas:
  - 35.193.208.115
  ttl: 300
  type: A

Confirm DNS Changes

Use the dig command to confirm the DNS records are now correct and that DNS propagation has occurred. The IP address returned by dig should be the external IP address assigned to the front-end of the Google Cloud Load Balancer.

> dig dev.api.storefront-demo.com +short
35.238.196.231

Or, query all three records at once.

printf \
  "dev.api.storefront-demo.com\ntest.api.storefront-demo.com\nuat.api.storefront-demo.com\n" \
  > records.txt

dig -f records.txt +short

35.238.196.231
35.238.196.231
35.238.196.231

Optionally, get more verbose output by removing the +short option.

> dig +nocmd dev.api.storefront-demo.com

;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 30763
;; flags: qr rd ra; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 512
;; QUESTION SECTION:
;dev.api.storefront-demo.com.   IN  A

;; ANSWER SECTION:
dev.api.storefront-demo.com. 299 IN A   35.238.196.231

;; Query time: 27 msec
;; SERVER: 8.8.8.8#53(8.8.8.8)
;; WHEN: Wed Jan 16 18:00:49 EST 2019
;; MSG SIZE  rcvd: 72

The resulting records in the Google Cloud DNS management console should look as follows.

screen_shot_2019-01-15_at_11.57.12_pm

JWT-based Authentication

As discussed in the previous post, Istio End-User Authentication for Kubernetes using JSON Web Tokens (JWT) and Auth0, it is typical to restrict access to the Kubernetes cluster, Namespaces within the cluster, or Services running within Namespaces, to end-users, whether they are humans or other applications. In that previous post, we saw an example of applying a machine-to-machine (M2M) Istio Authentication Policy to only the uat Namespace. This scenario is common when you want to control access to resources in non-production environments, such as UAT, to outside test teams accessing the uat Namespace through an external application. To simulate this scenario, we will apply the following Istio Authentication Policy to the uat Namespace (gist).


apiVersion: authentication.istio.io/v1alpha1
kind: Policy
metadata:
  name: default
  namespace: uat
spec:
  peers:
    - mtls: {}
  origins:
    - jwt:
        audiences:
          - "storefront-api-uat"
        issuer: "https://storefront-demo.auth0.com/"
        jwksUri: "https://storefront-demo.auth0.com/.well-known/jwks.json"
  principalBinding: USE_ORIGIN

For the dev and test Namespaces, we will apply an additional, different Istio Authentication Policy. This policy will protect against the possibility of dev and test M2M API consumers interfering with uat M2M API consumers and vice-versa. Below is the dev and test version of the Policy (gist).


apiVersion: authentication.istio.io/v1alpha1
kind: Policy
metadata:
  name: default
  namespace: dev
spec:
  peers:
    - mtls: {}
  origins:
    - jwt:
        audiences:
          - "storefront-api-dev-test"
        issuer: "https://storefront-demo.auth0.com/"
        jwksUri: "https://storefront-demo.auth0.com/.well-known/jwks.json"
  principalBinding: USE_ORIGIN

Testing Authentication

Using Postman with the ‘Bearer Token’ type authentication method, as detailed in the previous post, a call to a Storefront API resource in the uat Namespace should succeed. This also confirms DNS and HTTPS are working properly.

screen_shot_2019-01-15_at_11.58.41_pm
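
The equivalent request is easy to reproduce with curl. Below is a sketch, assuming the JWT from Auth0 is stored in the ACCESS_TOKEN environment variable; the resource path is illustrative only.

# Call the UAT environment with the Bearer token (resource path is illustrative)
curl -i https://uat.api.storefront-demo.com/accounts/actuator/health \
  -H "Authorization: Bearer ${ACCESS_TOKEN}"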

The dev and test Namespaces require different authentication. Trying to use no authentication, or authenticating as a UAT API consumer, will result in a 401 Unauthorized HTTP status, along with the error message, Origin authentication failed.

screen_shot_2019-01-16_at_12.00.55_am

Conclusion

In this brief post, we demonstrated how to create a GKE cluster with Istio 1.0.x, containing three virtual clusters, or Namespaces. Each Namespace represents an environment, which is part of an application’s SDLC. We enforced HTTP over TLS (HTTPS) using a wildcard SSL/TLS certificate. We also enforced end-user authentication using JWT-based OAuth 2.0 with Auth0. Lastly, we provided user-friendly DNS routing to each environment, using Google Cloud DNS. Short of a fully managed API Gateway, like Apigee, and automating the execution of the scripts with Jenkins or Spinnaker, this cluster is ready to provide a functional path to Production for developing our Storefront API.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.


Istio End-User Authentication for Kubernetes using JSON Web Tokens (JWT) and Auth0

In the recent post, Building a Microservices Platform with Confluent Cloud, MongoDB Atlas, Istio, and Google Kubernetes Engine, we built and deployed a microservice-based, cloud-native API to Google Kubernetes Engine, with Istio 1.0.x, on Google Cloud Platform. For brevity, we intentionally omitted a few key features required to operationalize and secure the API. These missing features included HTTPS, user authentication, request quotas, request throttling, and the integration of a full lifecycle API management tool, like Google Apigee.

In a follow-up post, Securing Your Istio Ingress Gateway with HTTPS, we disabled HTTP access to the API running on the GKE cluster. We then enabled bidirectional encryption of communications between a client and GKE cluster with HTTPS.

In this post, we will further enhance the security of the Storefront Demo API by enabling Istio end-user authentication using JSON Web Token-based credentials. Using JSON Web Tokens (JWT), pronounced ‘jot’, will allow Istio to authenticate end-users calling the Storefront Demo API. We will use Auth0, an Authentication-as-a-Service provider, to generate JWT tokens for registered Storefront Demo API consumers, and to validate JWT tokens from Istio, as part of an OAuth 2.0 token-based authorization flow.

istio-gke-auth

JSON Web Tokens

Token-based authentication, according to Auth0, works by ensuring that each request to a server is accompanied by a signed token which the server verifies for authenticity and only then responds to the request. JWT, according to JWT.io, is an open standard (RFC 7519) that defines a compact and self-contained way for securely transmitting information between parties as a JSON object. This information can be verified and trusted because it is digitally signed. Other common token types include Simple Web Tokens (SWT) and Security Assertion Markup Language Tokens (SAML).

JWTs can be signed using a secret with the Hash-based Message Authentication Code (HMAC) algorithm, or a public/private key pair using Rivest–Shamir–Adleman (RSA) or Elliptic Curve Digital Signature Algorithm (ECDSA). Authorization is the most common scenario for using JWT. Within the token payload, you can easily specify user roles and permissions as well as resources that the user can access.

A registered API consumer makes an initial request to the Authorization server, in which they exchange some form of credentials for a token. The JWT is associated with a set of specific user roles and permissions. Each subsequent request will include the token, allowing the user to access authorized routes, services, and resources that are permitted with that token.
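
Because a JWT is simply three base64url-encoded segments separated by periods, you can inspect one locally. Below is a rough sketch, assuming the token is stored in the ACCESS_TOKEN environment variable and jq is installed.

# Decode the JWT's payload (second segment) for inspection
payload=$(echo "${ACCESS_TOKEN}" | cut -d '.' -f 2 | tr '_-' '/+')
# base64 requires the input padded to a multiple of four characters
while [ $(( ${#payload} % 4 )) -ne 0 ]; do payload="${payload}="; done
echo "${payload}" | base64 --decode | jq .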

Auth0

To use JWTs for end-user authentication with Istio, we need a way to authenticate credentials associated with specific users and exchange those credentials for a JWT. Further, we need a way to validate the JWTs from Istio. To meet these requirements, we will use Auth0. Auth0 provides a universal authentication and authorization platform for web, mobile, and legacy applications. According to G2 Crowd, competitors to Auth0 in the Customer Identity and Access Management (CIAM) Software category include Okta, Microsoft Azure Active Directory (AD) and AD B2C, Salesforce Platform: Identity, OneLogin, Idaptive, IBM Cloud Identity Service, and Bitium.

screen_shot_2019-01-09_at_10.18.16_am.png

Auth0 currently offers four pricing plans: Free, Developer, Developer Pro, and Enterprise. Subscriptions to plans are on a monthly or discounted yearly basis. For this demo’s limited requirements, you need only use Auth0’s Free Plan.

screen_shot_2019-01-06_at_6.11.45_pm

Client Credentials Grant

The OAuth 2.0 protocol defines four flows, or grant types, to get an Access Token, depending on the application architecture and the type of end-user. We will be simulating a third-party, external application that needs to consume the Storefront API, using the Client Credentials grant type. According to Auth0, the Client Credentials Grant, defined in The OAuth 2.0 Authorization Framework RFC 6749, section 4.4, allows an application to request an Access Token using its Client Id and Client Secret. It is used for non-interactive applications, such as a CLI, a daemon, or a Service running on your backend, where the token is issued to the application itself, instead of an end user.
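
The token exchange itself is a single POST to Auth0’s /oauth/token endpoint. Below is a sketch using curl; the Client ID, Client Secret, and audience values are placeholders you would replace with your own.

# Exchange Client Credentials for a JWT access token
curl -X POST https://storefront-demo.auth0.com/oauth/token \
  -H 'Content-Type: application/json' \
  -d '{
        "client_id": "<your_client_id>",
        "client_secret": "<your_client_secret>",
        "audience": "<your_api_identifier>",
        "grant_type": "client_credentials"
      }'

# A successful response resembles (values abbreviated):
# {"access_token": "eyJ...", "scope": "...", "expires_in": 86400, "token_type": "Bearer"}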

jwt-istio-authorize-flow

With Auth0, we need to create two types of entities, an Auth0 API and an Auth0 Application. First, we define an Auth0 API, which represents the Storefront API we are securing. Second, we define an Auth0 Application, a consumer of our API. The Application is associated with the API. This association allows the Application (consumer of the API) to authenticate with Auth0 and receive a JWT. Note there is no direct integration between Auth0 and Istio or the Storefront API. We are facilitating a decoupled, mutual trust relationship between Auth0, Istio, and the registered end-user application consuming the API.

Start by creating a new Auth0 API, the ‘Storefront Demo API’. For this demo, I used my domain’s URL as the Identifier. For use with Istio, choose RS256 (RSA Signature with SHA-256), an asymmetric algorithm that uses a public/private key pair, as opposed to the HS256 symmetric algorithm. With RS256, Auth0 uses its private key to create the signature, while consumers, such as Istio, validate the signature using the corresponding public key, published at the JWKS endpoint. Auth0 has published a good post on the use of RS256 vs. HS256 algorithms.

screen_shot_2019-01-05_at_9.39.01_am

screen_shot_2019-01-05_at_1.49.06_pm

Scopes

Auth0 allows granular access control to your API through the use of Scopes. According to Auth0, the permissions represented by the Access Token in OAuth 2.0 terms are known as scopes. The scope parameter allows the application to express the desired scope of the access request. The scope parameter can also be used by the authorization server, in the response, to indicate which scopes were actually granted.

Although it is necessary to define and assign at least one scope to our Auth0 Application, we will not actually be using those scopes to control fine-grain authorization to resources within the Storefront API. In this demo, if an end-user is authenticated, they will be authorized to access all Storefront API resources.

screen_shot_2019-01-05_at_9.45.22_am

Machine to Machine Applications

Next, define a new Auth0 Machine to Machine (M2M) Application, ‘Storefront Demo API Consumer 1’.

screen_shot_2019-01-06_at_7.05.21_pm.png

Next, authorize the new M2M Application to request access to the new Storefront Demo API. Again, we are not using scopes, but at least one scope is required, or you will not be able to authenticate later.

screen_shot_2019-01-06_at_7.23.40_pm.png

Each M2M Application has a unique Client ID and Client Secret, which are used to authenticate with the Auth0 server and retrieve a JWT.

screen_shot_2019-01-05_at_1.50.32_pm

Multiple M2M Applications may be authorized to request access to APIs.

screen_shot_2019-01-05_at_1.50.17_pm

In the Endpoints tab of the Advanced Application Settings, there are a series of OAuth URLs. To authorize our new M2M Application to consume the Storefront Demo API, we need the ‘OAuth Authorization URL’.

screen_shot_2019-01-06_at_7.32.54_pm.png

Testing Auth0

To test the Auth0 JWT-based authentication and authorization workflow, I prefer to use Postman. Conveniently, Auth0 provides a Postman Collection with all the HTTP requests you will need, already built. Use the Client Credentials POST request. The grant_type request value will always be client_credentials. You will need to supply the Auth0 Application’s Client ID and Client Secret as the client_id and client_secret values. The audience value will be the API Identifier you used to create the Auth0 API earlier.

screen_shot_2019-01-06_at_5.25.50_pm

If the HTTP request is successful, you should receive a JWT access_token in response, which will allow us to authenticate with the Storefront API, later. Note the scopes you defined with Auth0 are also part of the response, along with the token’s TTL.

jwt.io Debugger

For now, test the JWT using the jwt.io Debugger page. If everything is working correctly, the JWT should be successfully validated.