Kinesis Shard Separation Using AWS IoT Topic Payloads

Takahiro Iwasa

Introduction

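When streaming data from AWS IoT Core to Kinesis Data Streams, you can control which shard each record lands on by deriving the partition key from a value in the topic payload. Kinesis hashes the partition key to select a shard, so records that share a payload value (for example, the same customer_id) stay together on one shard, while different values can be spread across shards as the stream scales.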
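To make the role of the partition key concrete, here is a minimal sketch of the mapping. Kinesis Data Streams maps a partition key to a 128-bit integer with MD5 and picks the shard whose hash key range contains that value. The helper names hash_key and shard_index are hypothetical, and the equal split of the hash space across shards is an assumption made for illustration; a real stream reports its actual ranges through ListShards.

```python
import hashlib

HASH_SPACE = 2 ** 128  # Kinesis hash keys are 128-bit integers


def hash_key(partition_key: str) -> int:
    """Return the MD5 hash of the partition key as a 128-bit integer."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index(partition_key: str, shard_count: int) -> int:
    """Pick a shard index, ASSUMING equal, contiguous hash key ranges."""
    range_size = HASH_SPACE // shard_count
    # Clamp so the remainder at the top of the hash space maps to the last shard.
    return min(hash_key(partition_key) // range_size, shard_count - 1)


# The same customer_id always maps to the same shard index.
for customer_id in ("1", "2", "3"):
    print(customer_id, shard_index(customer_id, shard_count=2))
```

With shard_count=1, every key maps to index 0, which is why a single-shard stream shows no separation regardless of the partition key.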

Prerequisites

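Ensure you have the tools used in this post installed on your computer: the AWS SAM CLI (for sam build and sam deploy), the AWS CLI (for aws iot-data publish), and Python 3.8 to match the Lambda runtime in the template.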

Setting Up the Project

Directory Structure

Organize your project as shown below:

/
|-- src/
|   |-- __init__.py
|   |-- lambda_function.py
|   `-- requirements.txt
`-- template.yaml

AWS SAM Template

Use the following AWS SAM template to define your application and resources.

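In the AWS IoT topic rule, PartitionKey: ${customer_id} is specified (line 37 of the template). This substitution template is resolved against each incoming message, so the customer_id value in the payload becomes the Kinesis partition key for that record.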

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Resources:
  Lambda:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./src/
      Events:
        Kinesis:
          Type: Kinesis
          Properties:
            BatchSize: 100
            BisectBatchOnFunctionError: true
            Enabled: true
            StartingPosition: LATEST
            Stream: !GetAtt Kinesis.Arn
      FunctionName: aws_iot_kinesis_partition_lambda
      Handler: lambda_function.lambda_handler
      Role: !GetAtt IamRole.Arn
      Runtime: python3.8

  Kinesis:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: aws_iot_kinesis_partition_stream
      RetentionPeriodHours: 24
      ShardCount: 1

  TopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: aws_iot_kinesis_partition_topic_rule
      TopicRulePayload:
        Actions:
          - Kinesis:
              PartitionKey: ${customer_id}
              RoleArn: !GetAtt IamRole.Arn
              StreamName: aws_iot_kinesis_partition_stream
        AwsIotSqlVersion: '2016-03-23'
        RuleDisabled: false
        Sql: SELECT * FROM 'aws-iot-kinesis-partition-topic'

  IamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
          - Effect: Allow
            Principal:
              Service: iot.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:DescribeStream
                  - kinesis:PutRecord
                Resource:
                  - !GetAtt Kinesis.Arn
          PolicyName: policy

  LogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/lambda/aws_iot_kinesis_partition_lambda
      RetentionInDays: 1
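The effect of PartitionKey: ${customer_id} in the topic rule can be sketched locally. The function below is a deliberately simplified simulation, not AWS IoT's implementation: it only handles plain ${attribute} placeholders that name a top-level payload field, and the name resolve_partition_key is hypothetical. Real substitution templates also accept SQL functions and expressions, which this sketch ignores.

```python
import re

# Matches ${name} where name is a simple top-level attribute.
PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def resolve_partition_key(template: str, payload: dict) -> str:
    """Replace each ${name} with the string form of payload[name].

    Simplified local simulation; raises KeyError if the attribute is missing.
    """
    def substitute(match):
        return str(payload[match.group(1)])

    return PLACEHOLDER.sub(substitute, template)


payload = {"customer_id": 1, "message": "Hello from AWS IoT"}
print(resolve_partition_key("${customer_id}", payload))
```

Note that the partition key is always a string, even when customer_id is a number in the payload.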

Writing the Lambda Function

requirements.txt

Leave the requirements.txt file empty. The function below uses only the Python standard library, and the AWS Lambda Python runtime includes boto3 by default should you need it later.

lambda_function.py

Implement the following Python script to process data streamed by Kinesis:

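import json


def lambda_handler(event, context):
    # Each record carries Kinesis metadata (partitionKey, sequenceNumber, ...)
    # and the message payload as a base64-encoded string in 'data'.
    for record in event['Records']:
        # Log the whole 'kinesis' object so it appears in CloudWatch Logs.
        print(json.dumps(record['kinesis']))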
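The handler above logs each record as received, with the payload still base64-encoded. As a possible extension, the sketch below decodes the data field and groups payloads by partitionKey. The function name group_by_partition_key and the sample event are hypothetical; the event is built by hand and contains only the two fields the function reads, whereas a real Kinesis event has more.

```python
import base64
import json
from collections import defaultdict


def group_by_partition_key(event: dict) -> dict:
    """Decode each record's payload and group payloads by partition key."""
    grouped = defaultdict(list)
    for record in event["Records"]:
        kinesis = record["kinesis"]
        # 'data' is base64-encoded; here it is assumed to hold JSON text.
        payload = json.loads(base64.b64decode(kinesis["data"]))
        grouped[kinesis["partitionKey"]].append(payload)
    return dict(grouped)


# Hand-built sample event for local experimentation (not captured output).
sample_payload = {"customer_id": 1, "message": "Hello from AWS IoT"}
sample_event = {
    "Records": [
        {
            "kinesis": {
                "partitionKey": "1",
                "data": base64.b64encode(
                    json.dumps(sample_payload).encode("utf-8")
                ).decode("ascii"),
            }
        }
    ]
}
print(group_by_partition_key(sample_event))
```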

Deploying the Application

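Use the AWS SAM CLI to build and deploy the application. If no deployment S3 bucket is configured yet, either run sam deploy --guided once or add --resolve-s3 to the deploy command: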

sam build
sam deploy --stack-name aws-iot-kinesis-partition-lambda --capabilities CAPABILITY_NAMED_IAM

Testing the Setup

Sending Test Data

Send test payloads to the AWS IoT topic using the following command:

aws iot-data publish \
  --topic aws-iot-kinesis-partition-topic \
  --payload '{"customer_id": 1, "message": "Hello from AWS IoT"}' \
  --cli-binary-format raw-in-base64-out
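To see more than one partition key in the stream, it helps to publish several customer_id values. The sketch below is one way to do that from Python. The function publish_test_messages and the RecordingClient stub are hypothetical helpers; the stub only records calls so the code can be tried without AWS credentials. The keyword arguments topic, qos, and payload follow the boto3 iot-data publish call.

```python
import json

TOPIC = "aws-iot-kinesis-partition-topic"  # topic used by the rule's SQL statement


def publish_test_messages(client, customer_ids, topic=TOPIC):
    """Publish one message per customer ID and return the payloads sent."""
    sent = []
    for customer_id in customer_ids:
        payload = {"customer_id": customer_id, "message": "Hello from AWS IoT"}
        client.publish(
            topic=topic,
            qos=0,
            payload=json.dumps(payload).encode("utf-8"),
        )
        sent.append(payload)
    return sent


class RecordingClient:
    """Hypothetical stand-in that records publish calls instead of sending them."""

    def __init__(self):
        self.calls = []

    def publish(self, **kwargs):
        self.calls.append(kwargs)


dry_run = RecordingClient()
publish_test_messages(dry_run, [1, 2, 3])
print(len(dry_run.calls), "messages prepared")
```

With boto3 installed and credentials configured, passing boto3.client("iot-data") instead of the stub should send the messages to AWS IoT Core.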

Verifying Lambda Logs

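Check the Lambda logs in CloudWatch Logs (log group /aws/lambda/aws_iot_kinesis_partition_lambda) to confirm that the partitionKey of each record matches the customer_id in your payload. The data field in the logged record is base64-encoded, so decode it to read the original message.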
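The check can also be done programmatically on a logged record. The sketch below assumes the log line is the JSON printed by the Lambda function and that the payload is JSON containing customer_id. The function name partition_key_matches is hypothetical, and the sample record is built by hand as a stand-in, not copied from real log output.

```python
import base64
import json


def partition_key_matches(kinesis_record: dict, field: str = "customer_id") -> bool:
    """Return True if partitionKey equals the given payload field as a string."""
    payload = json.loads(base64.b64decode(kinesis_record["data"]))
    return kinesis_record["partitionKey"] == str(payload[field])


# Hand-built stand-in for one logged record (only the fields used here).
message = {"customer_id": 1, "message": "Hello from AWS IoT"}
log_line = json.dumps({
    "partitionKey": "1",
    "data": base64.b64encode(json.dumps(message).encode("utf-8")).decode("ascii"),
})

print(partition_key_matches(json.loads(log_line)))
```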

Cleaning Up

Remove the provisioned resources using:

sam delete --stack-name aws-iot-kinesis-partition-lambda

Conclusion

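Using a payload attribute such as customer_id as the Kinesis partition key keeps each customer's records together on one shard and lets different customers spread across shards as the stream grows. The template in this post creates a single shard, so increase ShardCount if you want to observe records being distributed across shards.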

Happy Coding! 🚀

Takahiro Iwasa

Software Developer at KAKEHASHI Inc.
Involved in the requirements definition, design, and development of cloud-native applications using AWS. Now, building a new prescription data collection platform at KAKEHASHI Inc. Japan AWS Top Engineers 2020-2023.