Read from the Stream

In this lab, a Lambda function will be used as the Kinesis consumer. AWS Lambda integrates natively with Amazon Kinesis to process data ingested through a data stream, and this native integration abstracts away the complexities of polling, checkpointing, and error handling.

If your architecture can support event-driven patterns, decoupled services, and stateless microservices, then using Lambda as the Kinesis consumer can be a quick, managed, and scalable fit.
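
To make the integration concrete, below is a minimal sketch of a Kinesis-triggered Lambda handler in Python. The handler body is illustrative only; Lambda delivers records in batches and base64-encodes each record's payload.

    import base64

    # Minimal sketch: Lambda invokes this handler with a batch of Kinesis records
    def lambda_handler(event, context):
        for record in event['Records']:
            # Each record's data arrives base64-encoded
            payload = base64.b64decode(record['kinesis']['data'])
            # process(payload)  # placeholder for application-specific handling
            print(f"received sequence number: {record['kinesis']['sequenceNumber']}")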


Create the Lambda function

A Lambda function will be used to read the incoming data from the Kinesis stream. For convenience, a CloudFormation template will be used to create the Lambda function and all necessary supporting infrastructure.

CloudFormation Template

A CloudFormation template will be used to provision the Lambda function and its supporting resources. Use the quick link below to launch the CloudFormation template in the AWS console. Ensure that the selected region is consistent throughout the entire lab.

The CloudFormation template creates the Lambda function, the IAM resources it requires, and a disabled Kinesis event source mapping (the trigger that will be enabled later in this section).

Launch the CloudFormation template in one of the following regions:

  • US East (Virginia): Launch Stack in us-east-1
  • US East (Ohio): Launch Stack in us-east-2
  • US West (Oregon): Launch Stack in us-west-2
  • Europe (Frankfurt): Launch Stack in eu-central-1

Or, download the template to your local workstation and create a CloudFormation stack by uploading it.

A Quick create stack page should appear as shown below.

The Stack name can be left at its default, stream-lambda.

Enter the KinesisStreamName and AuroraDatabaseName from your notes.

Now, for AuroraEndpoint, SubnetName, and SecurityGroupName, enter the values that were saved in the last step.

For the SubnetName field, ensure that all subnets are selected.

Finally, click on ‘I acknowledge that AWS CloudFormation might create IAM resources’ and then click on Create stack.

Before moving on, wait for the stack to complete; it should only take a minute or two. Click the page refresh button and wait for the Status to read CREATE_COMPLETE.
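
If you prefer to wait from a terminal instead of refreshing the console, a boto3 waiter can block until the stack finishes. This is a sketch that assumes the default stack name and the us-east-1 region; adjust both to match your lab.

    import boto3

    # Poll CloudFormation until the stack reaches CREATE_COMPLETE
    cfn = boto3.client('cloudformation', region_name='us-east-1')
    cfn.get_waiter('stack_create_complete').wait(StackName='stream-lambda')
    print('stream-lambda is CREATE_COMPLETE')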

Lambda Function

From the Services tab, start typing Lambda and click on the service when it appears.

On the left of the Lambda service page, click on Functions if not already selected.

Now click on the Function name that starts with the CloudFormation stack name used in this section.

There is an adjustable setting in the Lambda function that controls whether it replicates QLDB’s user view (the most current revision only) or the user view plus all prior revisions. Scroll down to the Environment variables block and look at the variable with the Key named HISTORY. For this example it is set to False, but if you wanted to record the prior revisions as well as the current user view, it can be changed to True. Leave it as False for this section; we will look at recording all revisions in the bonus lab.

Additionally, the variable with the Key named ORDER, when set to True, will add revisions in order. So if version 1, version 0, and version 2 all arrive on the Kinesis stream, they will be processed into MySQL as version 0, then version 1, then version 2. Note that setting ORDER to True increases the amount of application logic needed to process the stream and increases overall latency. Leave ORDER as False for this lab.
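
For reference, reading these two flags inside the function would look something like the sketch below. The variable names come from this lab; the parsing itself is illustrative and not necessarily the function's exact code.

    import os

    # Environment variables arrive as strings; treat anything but 'true' as False
    HISTORY = os.environ.get('HISTORY', 'False').lower() == 'true'
    ORDER = os.environ.get('ORDER', 'False').lower() == 'true'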

The last step in setting up the architecture is to enable the Kinesis stream as a trigger for the Lambda function. In the Designer block, click on the Kinesis trigger.

The Kinesis trigger will appear; click the check box to select it, then click Enable.
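
The same action can also be performed through the API. The sketch below uses a placeholder function name; it looks up the function's event source mappings and enables each one.

    import boto3

    lam = boto3.client('lambda')
    # Find the event source mappings created by the stack and enable them
    # ('your-function-name' is a placeholder for the stack-generated name)
    mappings = lam.list_event_source_mappings(FunctionName='your-function-name')
    for mapping in mappings['EventSourceMappings']:
        lam.update_event_source_mapping(UUID=mapping['UUID'], Enabled=True)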

The architecture is now complete. Let’s recap what was done:

  • Set up an Aurora Serverless MySQL database
  • Build a QLDB ledger
  • Start a continuous QLDB stream
  • Create the MySQL database and schema
  • Configure the Lambda function (stream consumer)
  • Start simulating load into QLDB
  • Execute SQL commands on Amazon Aurora
  • Clean up

Head over to the next step to start simulating load into QLDB.

Optional - Know your requirements

Leverage the target database’s features for uniqueness, error handling, and indexing. For example, if the use case is to replicate all QLDB documents and revisions into MySQL, then the logic below is all that is needed to stream into MySQL.

    # Upsert each revision; REPLACE INTO overwrites any existing row with the
    # same primary key instead of failing on a duplicate
    with conn.cursor() as cur:
        history_statement = f'replace into {table_name.lower()}bonus {col.lower()} values {val}'
        cur.execute(history_statement)
    conn.commit()

MySQL’s duplicate-key handling (error 1062) keeps the tables free of duplicates when a PRIMARY KEY on (documentId, version) is used, with REPLACE INTO overwriting rather than failing on the conflicting row. Additionally, there is no need to read before writing, so a serializable isolation level is no longer required.
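
As an illustration, a table definition along these lines gives REPLACE INTO the composite key it needs to deduplicate revisions. The table and column names here are hypothetical; match them to your own schema.

    # Hypothetical DDL sketch: the composite primary key on
    # (documentId, version) is what lets REPLACE INTO deduplicate
    ddl = '''
        CREATE TABLE IF NOT EXISTS personbonus (
            documentId VARCHAR(64) NOT NULL,
            version    INT NOT NULL,
            data       JSON,
            PRIMARY KEY (documentId, version)
        )
    '''
    with conn.cursor() as cur:
        cur.execute(ddl)
    conn.commit()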

Restricting the replication to just the QLDB user view, or maintaining order, will increase the logic needed. A clear, written understanding of the business and functional requirements can reduce development overhead while delivering the same functionality.