AWS IoT Core トピックルールで Kinesis Firehose のセパレータを設定する方法

AWS IoT Core トピックルールで Kinesis Firehose のセパレータを設定する方法

岩佐 孝浩
岩佐 孝浩
5 min read
CloudFormation Firehose IoT Kinesis

はじめに

Kinesis FirehoseAWS IoT Core トピックルール と統合する際、レコードセパレーター (record separator) を正しく設定することが重要です。この記事では、CloudFormation テンプレートを使用したセパレーター設定の手順、例、テスト、およびよくあるエラーについて解説します。

AWS リソースのセットアップ

以下は必要な AWS リソースを作成するための CloudFormation テンプレートです。13 行目から 14 行目Separator: |+ <NEW_LINE> 設定に注目してください。

AWSTemplateFormatVersion: "2010-09-09"

Resources:
  TopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: topic_rule_firehose_separator_test
      TopicRulePayload:
        Actions:
          - Firehose:
              DeliveryStreamName: !Ref Firehose
              RoleArn: !GetAtt IamTopicRule.Arn
              Separator: |+

        AwsIotSqlVersion: 2016-03-23
        RuleDisabled: false
        Sql: !Sub
          SELECT * FROM 'topic_rule_firehose_separator_test'

  Firehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: topic-rule-firehose-separator-test
      DeliveryStreamType: DirectPut
      S3DestinationConfiguration:
        BucketARN: !GetAtt S3.Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 5
        CompressionFormat: GZIP
        ErrorOutputPrefix: "error/!{firehose:error-output-type}/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
        Prefix: "success/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
        RoleARN: !GetAtt IamFirehose.Arn

  S3:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: topic-rule-firehose-separator-test
      PublicAccessBlockConfiguration:
        BlockPublicAcls: TRUE
        BlockPublicPolicy: TRUE
        IgnorePublicAcls: TRUE
        RestrictPublicBuckets: TRUE

  IamTopicRule:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: iot.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: firehose:PutRecord
                Resource:
                  - !GetAtt Firehose.Arn
          PolicyName: policy
      RoleName: iam-topic-rule

  IamFirehose:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
            Condition:
              StringEquals:
                sts:ExternalId: !Ref AWS::AccountId
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetTableVersion
                  - glue:GetTableVersions
                Resource: "*"
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt S3.Arn
                  - Fn::Sub:
                      - ${arn}/*
                      - {arn: !GetAtt S3.Arn}
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_DEFAULT_FUNCTION%:%FIREHOSE_DEFAULT_VERSION%
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/topic-rule-firehose-separator-test
              - Effect: Allow
                Action:
                  - kinesis:DescribeStream
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                Resource: !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
              - Effect: Allow
                Action:
                  - kms:Decrypt
                Resource:
                  - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%SSE_KEY_ID%
                Condition:
                  StringEquals:
                    kms:ViaService: kinesis.%REGION_NAME%.amazonaws.com
                  StringLike:
                    kms:EncryptionContext:aws:kinesis:arn: !Sub arn:aws:kinesis:%REGION_NAME%:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
          PolicyName: policy
      RoleName: iam-firehose

以下のコマンドを使用して CloudFormation スタックをデプロイします。

aws cloudformation deploy --template template.yaml --stack-name topic-rule-firehose-separator-test --capabilities CAPABILITY_NAMED_IAM

設定のテスト

以下のコマンドを実行して、トピックルールの設定を確認します。

aws iot get-topic-rule --rule-name topic_rule_firehose_separator_test

出力の 12 行目separator 設定が表示されます。

{
  "ruleArn": "arn:aws:iot:<YOUR_REGION>:<YOUR_ACCOUNT_ID>:rule/topic_rule_firehose_separator_test",
  "rule": {
    "ruleName": "topic_rule_firehose_separator_test",
    "sql": "SELECT * FROM 'topic_rule_firehose_separator_test'",
    "createdAt": "2020-05-13T10:29:18+09:00",
    "actions": [
      {
        "firehose": {
          "roleArn": "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/iam-topic-rule",
          "deliveryStreamName": "topic-rule-firehose-separator-test",
          "separator": "\n"
        }
      }
    ],
    "ruleDisabled": false,
    "awsIotSqlVersion": "2016-03-23"
  }
}

テストメッセージの発行

トピック topic_rule_firehose_separator_test にテストメッセージを発行します。

aws iot-data publish \
  --topic topic_rule_firehose_separator_test \
  --payload '{"id": 1, "message": "Hello from AWS IoT"}' \
  --cli-binary-format raw-in-base64-out

aws iot-data publish \
  --topic topic_rule_firehose_separator_test \
  --payload '{"id": 2, "message": "Hello from AWS IoT"}' \
  --cli-binary-format raw-in-base64-out

S3 出力の検証

S3 バケットからオブジェクトを取得して確認します。

# オブジェクトを確認
$ aws s3 ls topic-rule-firehose-separator-test --recursive
2020-05-14 11:30:59         68 success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz

# オブジェクトをダウンロード
$ aws s3 cp s3://topic-rule-firehose-separator-test/success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz ./result.gz

出力には、設定されたセパレーターで区切られた 2 つのレコードが含まれるはずです。

# JSON を確認
$ gunzip -c result.gz > result.json
$ cat result.json
{"id": 1, "message": "Hello from AWS IoT"}
{"id": 2, "message": "Hello from AWS IoT"}

# 結果を削除
$ rm result.gz result.json
$ aws s3 rm s3://topic-rule-firehose-separator-test/success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz

よくある問題

例えば、Separator: \n のような誤ったセパレーターを使用すると、検証エラーが発生します。

1 validation error detected: Value '\n' at 'topicRulePayload.actions.1.member.firehose.separator' failed to satisfy constraint: Member must satisfy regular expression pattern: ([\n\t])|(\r\n)|(,)

この問題を避けるために、マルチライン文字列のための |+ を使用してください。

クリーンアップ手順

作成したリソースを削除するには、以下のコマンドを実行します。

aws cloudformation delete-stack --stack-name topic-rule-firehose-separator-test

まとめ

Kinesis Firehose アクション における AWS IoT Core トピックルール のセパレーター設定により、S3 にデータを希望のフォーマットでスムーズにストリーミングすることができます。ここで説明した手順を参考に設定と検証を実施してください。

Happy Coding! 🚀

岩佐 孝浩

岩佐 孝浩

Software Developer at KAKEHASHI Inc.
AWS を活用したクラウドネイティブ・アプリケーションの要件定義・設計・開発に従事。 株式会社カケハシで、処方箋データ収集の新たな基盤の構築に携わっています。 Japan AWS Top Engineers 2020-2023