Kinesis Shard Separation Using AWS IoT Topic Payloads
Introduction
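When AWS IoT Core forwards messages to Kinesis Data Streams, the partition key of each record determines which shard it lands on. Taking the partition key from a value in the message payload, such as customer_id, keeps every record for the same customer on the same shard and in order, while different customers can be spread across shards as the stream scales.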
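To make the routing concrete, here is a minimal sketch of how a partition key maps to a shard. Kinesis hashes the partition key with MD5 into a 128-bit integer and assigns the record to the shard whose hash key range contains that value. The helper names hash_key and shard_index are hypothetical, and the sketch assumes the shards split the hash space into equal, contiguous ranges, which is not guaranteed after resharding.

```python
import hashlib

HASH_SPACE = 2**128  # Kinesis maps partition keys to 128-bit integers


def hash_key(partition_key: str) -> int:
    """Return the MD5 hash of the partition key as a 128-bit integer."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index(partition_key: str, shard_count: int) -> int:
    """Hypothetical helper: index of the shard whose range contains the key.

    Assumption: shard_count shards with equal, contiguous hash key ranges.
    """
    range_size = HASH_SPACE // shard_count
    # Clamp so keys in the remainder at the top of the space go to the last shard.
    return min(hash_key(partition_key) // range_size, shard_count - 1)
```

With a single shard every key maps to index 0. With more shards, calling shard_index(str(customer_id), shard_count) for a few customer IDs shows how they would be distributed under the equal-range assumption.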
Prerequisites
Ensure you have the following installed on your computer:
- AWS SAM CLI
- AWS CLI version 2 (used to publish test messages)
- Python 3.x
Setting Up the Project
Directory Structure
Organize your project as shown below:
/
|-- src/
|   |-- __init__.py
|   |-- lambda_function.py
|   `-- requirements.txt
`-- template.yaml
AWS SAM Template
Use the following AWS SAM template to define your application and resources.
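In the AWS IoT topic rule, the Kinesis action specifies PartitionKey: ${customer_id} (line 37), so the customer_id value in each message payload becomes the partition key of the resulting Kinesis record.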
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Resources:
  Lambda:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./src/
      Events:
        Kinesis:
          Type: Kinesis
          Properties:
            BatchSize: 100
            BisectBatchOnFunctionError: true
            Enabled: true
            StartingPosition: LATEST
            Stream: !GetAtt Kinesis.Arn
      FunctionName: aws_iot_kinesis_partition_lambda
      Handler: lambda_function.lambda_handler
      Role: !GetAtt IamRole.Arn
      Runtime: python3.12

  Kinesis:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: aws_iot_kinesis_partition_stream
      RetentionPeriodHours: 24
      ShardCount: 1

  TopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: aws_iot_kinesis_partition_topic_rule
      TopicRulePayload:
        Actions:
          - Kinesis:
              PartitionKey: ${customer_id}
              RoleArn: !GetAtt IamRole.Arn
              StreamName: aws_iot_kinesis_partition_stream
        AwsIotSqlVersion: '2016-03-23'
        RuleDisabled: false
        Sql: SELECT * FROM 'aws-iot-kinesis-partition-topic'

  IamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
          - Effect: Allow
            Principal:
              Service: iot.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:DescribeStream
                  - kinesis:PutRecord
                Resource:
                  - !GetAtt Kinesis.Arn
          PolicyName: policy

  LogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/lambda/aws_iot_kinesis_partition_lambda
      RetentionInDays: 1
Writing the Lambda Function
requirements.txt
Leave the requirements.txt file empty. The AWS Lambda Python runtime includes boto3 by default, and this function uses only the standard library.
lambda_function.py
Implement the following Python script to process data streamed by Kinesis:
import json

def lambda_handler(event, context):
    # Log each record's Kinesis metadata, including its partitionKey.
    for record in event['Records']:
        print(json.dumps(record['kinesis']))
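The handler above logs each record as delivered, which means the data field is still base64-encoded. If you want to inspect the original IoT payload, a sketch along these lines may help. The helper names decode_record and group_by_partition_key are hypothetical and not part of the deployed function; the sketch assumes every payload is JSON, as in the test message used later.

```python
import base64
import json
from collections import defaultdict


def decode_record(record):
    """Return (partition_key, payload) for one Kinesis record in a Lambda event."""
    kinesis = record["kinesis"]
    # Kinesis delivers the record body base64-encoded; assume it holds JSON.
    payload = json.loads(base64.b64decode(kinesis["data"]))
    return kinesis["partitionKey"], payload


def group_by_partition_key(event):
    """Group the decoded payloads of a Lambda event by partition key."""
    grouped = defaultdict(list)
    for record in event["Records"]:
        key, payload = decode_record(record)
        grouped[key].append(payload)
    return dict(grouped)
```

Calling group_by_partition_key(event) inside lambda_handler would give one list of payloads per customer_id seen in the batch.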
Deploying the Application
Use the AWS SAM CLI to build and deploy the application:
sam build
sam deploy --stack-name aws-iot-kinesis-partition-lambda --capabilities CAPABILITY_NAMED_IAM --resolve-s3
Testing the Setup
Sending Test Data
Send test payloads to the AWS IoT topic using the following command:
aws iot-data publish \
--topic aws-iot-kinesis-partition-topic \
--payload '{"customer_id": 1, "message": "Hello from AWS IoT"}' \
--cli-binary-format raw-in-base64-out
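To see records for several customers in the logs, you could publish a batch of messages from Python instead of repeating the CLI command. The following is a sketch, not part of the original setup: build_payload and publish_test_messages are hypothetical helpers, and running it for real assumes boto3 is installed locally and AWS credentials and a region are configured.

```python
import json

TOPIC = "aws-iot-kinesis-partition-topic"  # topic named in the rule's SQL statement


def build_payload(customer_id):
    """Build the same JSON payload as the CLI example, as UTF-8 bytes."""
    body = {"customer_id": customer_id, "message": "Hello from AWS IoT"}
    return json.dumps(body).encode("utf-8")


def publish_test_messages(customer_ids, client=None):
    """Publish one test message per customer ID to the AWS IoT topic."""
    if client is None:
        # Imported lazily so the helpers above work without boto3 installed.
        import boto3
        client = boto3.client("iot-data")
    for customer_id in customer_ids:
        client.publish(topic=TOPIC, qos=0, payload=build_payload(customer_id))
```

For example, publish_test_messages([1, 2, 3]) would send three messages, each of which should arrive in Kinesis with its own partition key.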
Verifying Lambda Logs
Check the Lambda logs in CloudWatch Logs (log group /aws/lambda/aws_iot_kinesis_partition_lambda) to confirm that the partitionKey of each record matches the customer_id in your payload.
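If you copy a logged line out of CloudWatch Logs, the comparison can also be scripted. This is a minimal sketch: partition_key_matches is a hypothetical helper, it expects exactly the JSON that the Lambda function prints for one record, and it assumes a numeric customer_id such as 1 is rendered as the string "1" in the partition key.

```python
import base64
import json


def partition_key_matches(log_line: str) -> bool:
    """Check one JSON log line printed by the Lambda function for a record."""
    kinesis = json.loads(log_line)
    # The data field is base64-encoded JSON holding the original IoT payload.
    payload = json.loads(base64.b64decode(kinesis["data"]))
    # The partition key is a string, so compare against str(customer_id).
    return kinesis["partitionKey"] == str(payload["customer_id"])
```

A return value of False for a line would indicate that the record was not keyed by the customer_id in its payload.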
Cleaning Up
Remove the provisioned resources using:
sam delete --stack-name aws-iot-kinesis-partition-lambda
Conclusion
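Deriving the Kinesis partition key from a field in the AWS IoT message payload routes all records that share that value to the same shard. The stream in this example has a single shard; with a higher ShardCount, records for different customer_id values are distributed across shards while the order of each customer's records is preserved.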
Happy Coding! 🚀