Simplified NDJSON Handling with Kinesis Data Firehose Dynamic Partitioning

Simplified NDJSON Handling with Kinesis Data Firehose Dynamic Partitioning

Takahiro Iwasa
Takahiro Iwasa
3 min read
Firehose Kinesis

Introduction

Kinesis Data Firehose recently introduced support for dynamic partitioning. With this, developers no longer require Lambda functions just to convert to NDJSON (Newline Delimited JSON).

Creating AWS Resources

To get started, create a CloudFormation template with the following content. Key points include the DynamicPartitioningConfiguration (lines 13-14) and ProcessingConfiguration (lines 17-29) sections.

AWSTemplateFormatVersion: 2010-09-09
Description: Kinesis Data Firehose streaming NDJSON sample with dynamic partitioning
Resources:
  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: ndjson-firehose
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt S3Bucket.Arn
        BufferingHints:
          IntervalInSeconds: 60
        DynamicPartitioningConfiguration:
          Enabled: true
        Prefix: "success/user_id=!{partitionKeyFromQuery:user_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
        ErrorOutputPrefix: "error/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/type=!{firehose:error-output-type}/"
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: AppendDelimiterToRecord
              Parameters:
                - ParameterName: Delimiter
                  ParameterValue: '\\n'
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{user_id: .user_id}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
        RoleARN: !GetAtt IAMRoleKinesisFirehose.Arn

  S3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub ${AWS::Region}-${AWS::AccountId}-ndjson-s3
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

  IAMRoleKinesisFirehose:
    Type: AWS::IAM::Role
    Properties:
      RoleName: ndjson-firehose-role
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      MaxSessionDuration: 3600
      Policies:
        - PolicyName: policy1
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetTableVersion
                  - glue:GetTableVersions
                Resource:
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt S3Bucket.Arn
                  - !Sub ${S3Bucket.Arn}/*
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - kms:GenerateDataKey
                  - kms:Decrypt
                Resource:
                  - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
                Condition:
                  StringEquals:
                    kms:ViaService: !Sub s3.${AWS::Region}.amazonaws.com
                  StringLike:
                    kms:EncryptionContext:aws:s3:arn:
                      - arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/*
                      - arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%:log-stream:*

Replace <YOUR_S3_BUCKET> with your bucket’s name and deploy the stack using the following command:

aws cloudformation deploy \
  --template-file stack.yaml \
  --stack-name firehose-ndjson-sample \
  --s3-bucket <YOUR_S3_BUCKET> \
  --s3-prefix firehose-ndjson-sample/$(date +%Y/%m/%d/%H) \
  --capabilities CAPABILITY_NAMED_IAM

Testing the Setup

Test the setup by ingesting two JSON records into the delivery stream:

Base64 Encoding

Data blobs must be Base64 encoded before being sent described in the official documentation.

The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, before base64-encoding, is 1,000 KiB.

Encode your sample JSON as follows:

$ echo -n '{"user_id": 1, "message": "Hello"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ==

$ echo -n '{"user_id": 1, "message": "World"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ==

Sending Data

Use the AWS CLI to send data to the Firehose stream:

$ echo '{
    "DeliveryStreamName": "ndjson-firehose",
    "Records": [
        {"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ=="},
        {"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ=="}
    ]
}' > input.json

$ aws firehose put-record-batch --cli-input-json file://~/input.json
{
    "FailedPutCount": 0,
    "Encrypted": false,
    ...
}

Verifying in S3

Check the objects in your S3 bucket:

$ aws s3 ls --recursive s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/
2022-07-26 23:52:47         69 success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751

$ aws s3 cp s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 ./
download: s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 to ./ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751

$ cat -n ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
     1  {"user_id": 1, "message": "Hello"}
     2  {"user_id": 1, "message": "World"}

Cleaning Up Resources

To avoid incurring costs, clean up the resources:

aws s3 rm --recursive s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/
aws cloudformation delete-stack --stack-name firehose-ndjson-sample

Conclusion

Dynamic partitioning in Kinesis Data Firehose simplifies the handling of NDJSON by removing the need for Lambda functions in many cases. This feature enhances efficiency and reduces operational overhead for AWS users.

Happy Coding! 🚀

Takahiro Iwasa

Takahiro Iwasa

Software Developer at KAKEHASHI Inc.
Involved in the requirements definition, design, and development of cloud-native applications using AWS. Now, building a new prescription data collection platform at KAKEHASHI Inc. Japan AWS Top Engineers 2020-2023.