Simplified NDJSON Handling with Kinesis Data Firehose Dynamic Partitioning
Introduction
Kinesis Data Firehose recently introduced support for dynamic partitioning. With it, developers no longer need a Lambda transformation function just to convert incoming records to NDJSON (Newline Delimited JSON).
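Without a delimiter, Firehose concatenates buffered records back-to-back, so a consumer would receive something like this:

{"user_id": 1, "message": "Hello"}{"user_id": 1, "message": "World"}

With the AppendDelimiterToRecord processor used below, every record ends with a newline, yielding valid NDJSON:

{"user_id": 1, "message": "Hello"}
{"user_id": 1, "message": "World"}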
Creating AWS Resources
To get started, create a CloudFormation template with the following content. The key sections are DynamicPartitioningConfiguration, which enables the feature, and ProcessingConfiguration, which defines the record processors.
AWSTemplateFormatVersion: 2010-09-09
Description: Kinesis Data Firehose streaming NDJSON sample with dynamic partitioning
Resources:
  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: ndjson-firehose
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt S3Bucket.Arn
        BufferingHints:
          IntervalInSeconds: 60
        DynamicPartitioningConfiguration:
          Enabled: true
        Prefix: "success/user_id=!{partitionKeyFromQuery:user_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
        ErrorOutputPrefix: "error/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/type=!{firehose:error-output-type}/"
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: AppendDelimiterToRecord
              Parameters:
                - ParameterName: Delimiter
                  ParameterValue: "\\n"
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{user_id: .user_id}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
        RoleARN: !GetAtt IAMRoleKinesisFirehose.Arn
  S3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub ${AWS::Region}-${AWS::AccountId}-ndjson-s3
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
  IAMRoleKinesisFirehose:
    Type: AWS::IAM::Role
    Properties:
      RoleName: ndjson-firehose-role
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      MaxSessionDuration: 3600
      Policies:
        - PolicyName: policy1
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetTableVersion
                  - glue:GetTableVersions
                Resource:
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt S3Bucket.Arn
                  - !Sub ${S3Bucket.Arn}/*
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - kms:GenerateDataKey
                  - kms:Decrypt
                Resource:
                  - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
                Condition:
                  StringEquals:
                    kms:ViaService: !Sub s3.${AWS::Region}.amazonaws.com
                  StringLike:
                    kms:EncryptionContext:aws:s3:arn:
                      - arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/*
                      - arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%:log-stream:*
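The MetadataExtraction processor evaluates the JQ expression {user_id: .user_id} against every incoming record to produce the partitionKeyFromQuery:user_id value referenced in Prefix. If you have jq installed, you can preview the extraction locally (a quick sanity check, not something Firehose requires):

$ echo '{"user_id": 1, "message": "Hello"}' | jq '{user_id: .user_id}'
{
  "user_id": 1
}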
Save the template as stack.yaml, replace <YOUR_S3_BUCKET> with the name of an existing bucket the CLI can upload the template to (this is not the destination bucket; the stack creates that one itself), and deploy the stack using the following command:
aws cloudformation deploy \
--template-file stack.yaml \
--stack-name firehose-ndjson-sample \
--s3-bucket <YOUR_S3_BUCKET> \
--s3-prefix firehose-ndjson-sample/$(date +%Y/%m/%d/%H) \
--capabilities CAPABILITY_NAMED_IAM
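Once the command returns, you can optionally confirm that the stack reached CREATE_COMPLETE:

$ aws cloudformation describe-stacks \
    --stack-name firehose-ndjson-sample \
    --query 'Stacks[0].StackStatus' \
    --output text
CREATE_COMPLETE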
Testing the Setup
Test the setup by ingesting two JSON records into the delivery stream:
Base64 Encoding
Data blobs must be Base64 encoded before being sent, as described in the official documentation:
The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, before base64-encoding, is 1,000 KiB.
Encode your sample JSON as follows:
$ echo -n '{"user_id": 1, "message": "Hello"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ==
$ echo -n '{"user_id": 1, "message": "World"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ==
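If you have more than a couple of records, hand-encoding gets tedious. As a sketch, jq can build the entire request payload from one JSON object per line (-R reads raw lines, -n with inputs streams them in, and @base64 encodes each one), producing the same input.json used in the next step:

$ printf '%s\n' \
    '{"user_id": 1, "message": "Hello"}' \
    '{"user_id": 1, "message": "World"}' \
    | jq -Rn '{DeliveryStreamName: "ndjson-firehose", Records: [inputs | {Data: @base64}]}' \
    > input.json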
Sending Data
Use the AWS CLI to send data to the Firehose stream:
$ echo '{
"DeliveryStreamName": "ndjson-firehose",
"Records": [
{"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ=="},
{"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ=="}
]
}' > input.json
$ aws firehose put-record-batch --cli-input-json file://input.json
{
"FailedPutCount": 0,
"Encrypted": false,
...
}
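Because put-record-batch can partially fail, a FailedPutCount of 0 is what confirms that every record was accepted; anything greater means the failed entries need to be retried. For ad-hoc testing, a single record can also be sent without the batch wrapper:

$ aws firehose put-record \
    --delivery-stream-name ndjson-firehose \
    --record '{"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ=="}'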
Verifying in S3
Check the objects in your S3 bucket:
$ aws s3 ls --recursive s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/
2022-07-26 23:52:47 69 success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
$ aws s3 cp s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 ./
download: s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 to ./ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
$ cat -n ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
1 {"user_id": 1, "message": "Hello"}
2 {"user_id": 1, "message": "World"}
Cleaning Up Resources
To avoid incurring further costs, clean up the resources. Empty the bucket first, because CloudFormation cannot delete a non-empty bucket:
aws s3 rm --recursive s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/
aws cloudformation delete-stack --stack-name firehose-ndjson-sample
Conclusion
Dynamic partitioning in Kinesis Data Firehose simplifies the handling of NDJSON by removing the need for Lambda functions in many cases, leaving fewer resources to deploy, monitor, and pay for.
Happy Coding! 🚀