AWS IoT トピックペイロードを利用した Kinesis シャード分離
はじめに
AWS IoT Core から Kinesis Data Streams にデータをストリーミングする際、トピックペイロードの値を活用してシャード分離を最適化することで、パフォーマンスとスケーラビリティを向上させることができます。
前提条件
以下がインストールされていることを確認してください。
- AWS SAM CLI
- Python 3.x
プロジェクトのセットアップ
ディレクトリ構成
以下のようなプロジェクト構成になります。
/
|-- src/
| |-- __init__.py
| |-- lambda_function.py
| `-- requirements.txt
`-- template.yaml
AWS SAM テンプレート
以下の AWS SAM テンプレート を使用して、アプリケーションとリソースを定義します。
AWS IoT トピックルールでは、PartitionKey: ${customer_id}
が指定されています(37 行目)。
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
Lambda:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./src/
Events:
Kinesis:
Type: Kinesis
Properties:
BatchSize: 100
BisectBatchOnFunctionError: true
Enabled: true
StartingPosition: LATEST
Stream: !GetAtt Kinesis.Arn
FunctionName: aws_iot_kinesis_partition_lambda
Handler: lambda_function.lambda_handler
Role: !GetAtt IamRole.Arn
Runtime: python3.8
Kinesis:
Type: AWS::Kinesis::Stream
Properties:
Name: aws_iot_kinesis_partition_stream
RetentionPeriodHours: 24
ShardCount: 1
TopicRule:
Type: AWS::IoT::TopicRule
Properties:
RuleName: aws_iot_kinesis_partition_topic_rule
TopicRulePayload:
Actions:
- Kinesis:
PartitionKey: ${customer_id}
RoleArn: !GetAtt IamRole.Arn
StreamName: aws_iot_kinesis_partition_stream
AwsIotSqlVersion: 2016-03-23
RuleDisabled: false
Sql: SELECT * FROM 'aws-iot-kinesis-partition-topic'
IamRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
- Effect: Allow
Principal:
Service: iot.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- kinesis:GetShardIterator
- kinesis:GetRecords
- kinesis:DescribeStream
- kinesis:PutRecord
Resource:
- !GetAtt Kinesis.Arn
PolicyName: policy
LogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: /aws/lambda/aws_iot_kinesis_partition_lambda
RetentionInDays: 1
Lambda 関数の記述
requirements.txt
requirements.txt
ファイルは空のままにしてください。AWS Lambda 環境には boto3
がデフォルトで含まれています。
lambda_function.py
以下の Python スクリプトを実装して、Kinesis からストリームされたデータを処理します。
import json
def lambda_handler(event, context):
for record in event['Records']:
print(json.dumps(record['kinesis']))
アプリケーションのデプロイ
AWS SAM CLI を使用して、アプリケーションをビルドおよびデプロイします。
sam build
sam deploy --stack-name aws-iot-kinesis-partition-lambda --capabilities CAPABILITY_NAMED_IAM
セットアップのテスト
テストデータの送信
次のコマンドを実行し、AWS IoT トピックにテストペイロードを送信します。
aws iot-data publish \
--topic aws-iot-kinesis-partition-topic \
--payload '{"customer_id": 1, "message": "Hello from AWS IoT"}' \
--cli-binary-format raw-in-base64-out
Lambda ログの確認
Lambda ログを確認して、partitionKey
がペイロード内の customer_id
に一致していることを確認してください。
クリーンアップ
以下を使用して、プロビジョニングされたリソースを削除します。
sam delete --stack-name aws-iot-kinesis-partition-lambda
まとめ
AWS IoT トピックペイロードに基づく Kinesis シャード分離の実装により、効率的なデータストリーミングと処理を実現できます。このアプローチによって、シャード使用が最適化され、全体的なスケーラビリティが向上します。
Happy Coding! 🚀