AWS IoT トピックペイロードを利用した Kinesis シャード分離

AWS IoT トピックペイロードを利用した Kinesis シャード分離

岩佐 孝浩
岩佐 孝浩
3 min read
IoT Kinesis

はじめに

AWS IoT Core から Kinesis Data Streams にデータをストリーミングする際、トピックペイロードの値を活用してシャード分離を最適化することで、パフォーマンスとスケーラビリティを向上させることができます。

前提条件

以下がインストールされていることを確認してください。

プロジェクトのセットアップ

ディレクトリ構成

以下のようなプロジェクト構成になります。

/
|-- src/
|   |-- __init__.py
|   |-- lambda_function.py
|   `-- requirements.txt
`-- template.yaml

AWS SAM テンプレート

以下の AWS SAM テンプレート を使用して、アプリケーションとリソースを定義します。

AWS IoT トピックルールでは、PartitionKey: ${customer_id} が指定されています(37 行目)。

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Resources:
  Lambda:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./src/
      Events:
        Kinesis:
          Type: Kinesis
          Properties:
            BatchSize: 100
            BisectBatchOnFunctionError: true
            Enabled: true
            StartingPosition: LATEST
            Stream: !GetAtt Kinesis.Arn
      FunctionName: aws_iot_kinesis_partition_lambda
      Handler: lambda_function.lambda_handler
      Role: !GetAtt IamRole.Arn
      Runtime: python3.8

  Kinesis:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: aws_iot_kinesis_partition_stream
      RetentionPeriodHours: 24
      ShardCount: 1

  TopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: aws_iot_kinesis_partition_topic_rule
      TopicRulePayload:
        Actions:
          - Kinesis:
              PartitionKey: ${customer_id}
              RoleArn: !GetAtt IamRole.Arn
              StreamName: aws_iot_kinesis_partition_stream
        AwsIotSqlVersion: 2016-03-23
        RuleDisabled: false
        Sql: SELECT * FROM 'aws-iot-kinesis-partition-topic'

  IamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
          - Effect: Allow
            Principal:
              Service: iot.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:DescribeStream
                  - kinesis:PutRecord
                Resource:
                  - !GetAtt Kinesis.Arn
          PolicyName: policy

  LogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/lambda/aws_iot_kinesis_partition_lambda
      RetentionInDays: 1

Lambda 関数の記述

requirements.txt

requirements.txt ファイルは空のままにしてください。AWS Lambda 環境には boto3 がデフォルトで含まれています。

lambda_function.py

以下の Python スクリプトを実装して、Kinesis からストリームされたデータを処理します。

import json


def lambda_handler(event, context):
    for record in event['Records']:
        print(json.dumps(record['kinesis']))

アプリケーションのデプロイ

AWS SAM CLI を使用して、アプリケーションをビルドおよびデプロイします。

sam build
sam deploy --stack-name aws-iot-kinesis-partition-lambda --capabilities CAPABILITY_NAMED_IAM

セットアップのテスト

テストデータの送信

次のコマンドを実行し、AWS IoT トピックにテストペイロードを送信します。

aws iot-data publish \
  --topic aws-iot-kinesis-partition-topic \
  --payload '{"customer_id": 1, "message": "Hello from AWS IoT"}' \
  --cli-binary-format raw-in-base64-out

Lambda ログの確認

Lambda ログを確認して、partitionKey がペイロード内の customer_id に一致していることを確認してください。

Lambda ログ出力の例

クリーンアップ

以下を使用して、プロビジョニングされたリソースを削除します。

sam delete --stack-name aws-iot-kinesis-partition-lambda

まとめ

AWS IoT トピックペイロードに基づく Kinesis シャード分離の実装により、効率的なデータストリーミングと処理を実現できます。このアプローチによって、シャード使用が最適化され、全体的なスケーラビリティが向上します。

Happy Coding! 🚀

岩佐 孝浩

岩佐 孝浩

Software Developer at KAKEHASHI Inc.
AWS を活用したクラウドネイティブ・アプリケーションの要件定義・設計・開発に従事。 株式会社カケハシで、処方箋データ収集の新たな基盤の構築に携わっています。 Japan AWS Top Engineers 2020-2023