動的パーティショニングを活用した Kinesis Data Firehose による NDJSON 処理

動的パーティショニングを活用した Kinesis Data Firehose による NDJSON 処理

岩佐 孝浩
岩佐 孝浩
4 min read
Firehose Kinesis

はじめに

Kinesis Data Firehose は動的パーティショニングに対応しました。この機能により、開発者は NDJSON (Newline Delimited JSON) への変換のためだけに Lambda 関数を使用する必要がなくなりました。

AWS リソースの作成

まず、以下の内容で CloudFormation テンプレート を作成します。重要なポイントは、DynamicPartitioningConfiguration (13-14 行目) および ProcessingConfiguration (17-29 行目) のセクションです。

AWSTemplateFormatVersion: 2010-09-09
Description: Kinesis Data Firehose streaming NDJSON sample with dynamic partitioning
Resources:
  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: ndjson-firehose
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt S3Bucket.Arn
        BufferingHints:
          IntervalInSeconds: 60
        DynamicPartitioningConfiguration:
          Enabled: true
        Prefix: "success/user_id=!{partitionKeyFromQuery:user_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
        ErrorOutputPrefix: "error/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/type=!{firehose:error-output-type}/"
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: AppendDelimiterToRecord
              Parameters:
                - ParameterName: Delimiter
                  ParameterValue: '\\n'
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{user_id: .user_id}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
        RoleARN: !GetAtt IAMRoleKinesisFirehose.Arn

  S3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub ${AWS::Region}-${AWS::AccountId}-ndjson-s3
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

  IAMRoleKinesisFirehose:
    Type: AWS::IAM::Role
    Properties:
      RoleName: ndjson-firehose-role
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      MaxSessionDuration: 3600
      Policies:
        - PolicyName: policy1
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetTableVersion
                  - glue:GetTableVersions
                Resource:
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt S3Bucket.Arn
                  - !Sub ${S3Bucket.Arn}/*
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - kms:GenerateDataKey
                  - kms:Decrypt
                Resource:
                  - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
                Condition:
                  StringEquals:
                    kms:ViaService: !Sub s3.${AWS::Region}.amazonaws.com
                  StringLike:
                    kms:EncryptionContext:aws:s3:arn:
                      - arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/*
                      - arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%:log-stream:*

<YOUR_S3_BUCKET> をバケット名に置き換え、以下のコマンドでスタックをデプロイします。

aws cloudformation deploy \
  --template-file stack.yaml \
  --stack-name firehose-ndjson-sample \
  --s3-bucket <YOUR_S3_BUCKET> \
  --s3-prefix firehose-ndjson-sample/$(date +%Y/%m/%d/%H) \
  --capabilities CAPABILITY_NAMED_IAM

セットアップのテスト

作成した Delivery Stream に JSON レコードを送信してテストします。

Base64 エンコーディング

データ Blob は、公式ドキュメントに記載されているように送信前に Base64 エンコードする必要があります。

The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, before base64-encoding, is 1,000 KiB.

$ echo -n '{"user_id": 1, "message": "Hello"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ==

$ echo -n '{"user_id": 1, "message": "World"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ==

データの送信

以下のコマンドを使ってデータを送信します。

$ echo '{
    "DeliveryStreamName": "ndjson-firehose",
    "Records": [
        {"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ=="},
        {"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ=="}
    ]
}' > input.json

$ aws firehose put-record-batch --cli-input-json file://~/input.json
{
    "FailedPutCount": 0,
    "Encrypted": false,
    ...
}

S3 に保存されたデータの確認

S3 バケット内のデータを確認します。

$ aws s3 ls --recursive s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/
2022-07-26 23:52:47         69 success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751

$ aws s3 cp s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 ./
download: s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 to ./ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751

$ cat -n ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
     1  {"user_id": 1, "message": "Hello"}
     2  {"user_id": 1, "message": "World"}

リソースの削除

コストが発生しないよう、以下のコマンドでリソースを削除してください。

aws s3 rm --recursive s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/
aws cloudformation delete-stack --stack-name firehose-ndjson-sample

まとめ

Kinesis Data Firehose の動的パーティショニング機能は、NDJSON の処理を簡素化し、多くのケースで Lambda 関数を不要にします。この機能により、AWS ユーザーの効率性が向上し、運用負荷が軽減されます。

Happy Coding! 🚀

岩佐 孝浩

岩佐 孝浩

Software Developer at KAKEHASHI Inc.
AWS を活用したクラウドネイティブ・アプリケーションの要件定義・設計・開発に従事。 株式会社カケハシで、処方箋データ収集の新たな基盤の構築に携わっています。 Japan AWS Top Engineers 2020-2023