動的パーティショニングを活用した Kinesis Data Firehose による NDJSON 処理
はじめに
Kinesis Data Firehose は動的パーティショニングに対応しました。この機能により、開発者は NDJSON (Newline Delimited JSON) への変換のためだけに Lambda 関数を使用する必要がなくなりました。
AWS リソースの作成
まず、以下の内容で CloudFormation テンプレート を作成します。重要なポイントは、DynamicPartitioningConfiguration
(13-14 行目) および ProcessingConfiguration
(17-29 行目) のセクションです。
AWSTemplateFormatVersion: 2010-09-09
Description: Kinesis Data Firehose streaming NDJSON sample with dynamic partitioning
Resources:
KinesisFirehoseDeliveryStream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
DeliveryStreamName: ndjson-firehose
DeliveryStreamType: DirectPut
ExtendedS3DestinationConfiguration:
BucketARN: !GetAtt S3Bucket.Arn
BufferingHints:
IntervalInSeconds: 60
DynamicPartitioningConfiguration:
Enabled: true
Prefix: "success/user_id=!{partitionKeyFromQuery:user_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
ErrorOutputPrefix: "error/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/type=!{firehose:error-output-type}/"
ProcessingConfiguration:
Enabled: true
Processors:
- Type: AppendDelimiterToRecord
Parameters:
- ParameterName: Delimiter
ParameterValue: '\\n'
- Type: MetadataExtraction
Parameters:
- ParameterName: MetadataExtractionQuery
ParameterValue: '{user_id: .user_id}'
- ParameterName: JsonParsingEngine
ParameterValue: JQ-1.6
RoleARN: !GetAtt IAMRoleKinesisFirehose.Arn
S3Bucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub ${AWS::Region}-${AWS::AccountId}-ndjson-s3
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
IAMRoleKinesisFirehose:
Type: AWS::IAM::Role
Properties:
RoleName: ndjson-firehose-role
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service: firehose.amazonaws.com
Action: sts:AssumeRole
MaxSessionDuration: 3600
Policies:
- PolicyName: policy1
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- glue:GetTable
- glue:GetTableVersion
- glue:GetTableVersions
Resource:
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
- Effect: Allow
Action:
- s3:AbortMultipartUpload
- s3:GetBucketLocation
- s3:GetObject
- s3:ListBucket
- s3:ListBucketMultipartUploads
- s3:PutObject
Resource:
- !GetAtt S3Bucket.Arn
- !Sub ${S3Bucket.Arn}/*
- Effect: Allow
Action:
- lambda:InvokeFunction
- lambda:GetFunctionConfiguration
Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
- Effect: Allow
Action:
- kms:GenerateDataKey
- kms:Decrypt
Resource:
- !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
Condition:
StringEquals:
kms:ViaService: !Sub s3.${AWS::Region}.amazonaws.com
StringLike:
kms:EncryptionContext:aws:s3:arn:
- arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/*
- arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
- Effect: Allow
Action:
- logs:PutLogEvents
Resource:
- !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%:log-stream:*
<YOUR_S3_BUCKET>
をバケット名に置き換え、以下のコマンドでスタックをデプロイします。
aws cloudformation deploy \
--template-file stack.yaml \
--stack-name firehose-ndjson-sample \
--s3-bucket <YOUR_S3_BUCKET> \
--s3-prefix firehose-ndjson-sample/$(date +%Y/%m/%d/%H) \
--capabilities CAPABILITY_NAMED_IAM
セットアップのテスト
作成した Delivery Stream に JSON レコードを送信してテストします。
Base64 エンコーディング
データ Blob は、公式ドキュメントに記載されているように送信前に Base64 エンコードする必要があります。
The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, before base64-encoding, is 1,000 KiB.
$ echo -n '{"user_id": 1, "message": "Hello"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ==
$ echo -n '{"user_id": 1, "message": "World"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ==
データの送信
以下のコマンドを使ってデータを送信します。
$ echo '{
"DeliveryStreamName": "ndjson-firehose",
"Records": [
{"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ=="},
{"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ=="}
]
}' > input.json
$ aws firehose put-record-batch --cli-input-json file://~/input.json
{
"FailedPutCount": 0,
"Encrypted": false,
...
}
S3 に保存されたデータの確認
S3 バケット内のデータを確認します。
$ aws s3 ls --recursive s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/
2022-07-26 23:52:47 69 success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
$ aws s3 cp s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 ./
download: s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 to ./ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
$ cat -n ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
1 {"user_id": 1, "message": "Hello"}
2 {"user_id": 1, "message": "World"}
リソースの削除
コストが発生しないよう、以下のコマンドでリソースを削除してください。
aws s3 rm --recursive s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/
aws cloudformation delete-stack --stack-name firehose-ndjson-sample
まとめ
Kinesis Data Firehose の動的パーティショニング機能は、NDJSON の処理を簡素化し、多くのケースで Lambda 関数を不要にします。この機能により、AWS ユーザーの効率性が向上し、運用負荷が軽減されます。
Happy Coding! 🚀