How to Configure Separators for Kinesis Firehose in AWS IoT Core Topic Rules


Takahiro Iwasa
CloudFormation Firehose IoT Kinesis

Introduction

When an AWS IoT Core topic rule forwards messages to Kinesis Firehose, Firehose writes the records to S3 back to back with no delimiter unless the rule's Firehose action sets a record separator. This post shows how to configure the separator in a CloudFormation template, test it end to end, and avoid a common validation error.

Setting Up AWS Resources

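The CloudFormation template below creates the topic rule, the Firehose delivery stream, the S3 bucket it delivers to, and IAM roles for AWS IoT Core and Firehose. Pay close attention to the Separator: |+ line in the Firehose action and the empty line right after it. The |+ indicator starts a YAML literal block scalar that keeps trailing line breaks, so the separator value is exactly one newline character.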

AWSTemplateFormatVersion: "2010-09-09"

Resources:
  TopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: topic_rule_firehose_separator_test
      TopicRulePayload:
        Actions:
          - Firehose:
              DeliveryStreamName: !Ref Firehose
              RoleArn: !GetAtt IamTopicRule.Arn
              Separator: |+

        AwsIotSqlVersion: "2016-03-23"
        RuleDisabled: false
        Sql: !Sub
          SELECT * FROM 'topic_rule_firehose_separator_test'

  Firehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: topic-rule-firehose-separator-test
      DeliveryStreamType: DirectPut
      S3DestinationConfiguration:
        BucketARN: !GetAtt S3.Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 5
        CompressionFormat: GZIP
        ErrorOutputPrefix: "error/!{firehose:error-output-type}/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
        Prefix: "success/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
        RoleARN: !GetAtt IamFirehose.Arn

  S3:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: topic-rule-firehose-separator-test
      PublicAccessBlockConfiguration:
        BlockPublicAcls: TRUE
        BlockPublicPolicy: TRUE
        IgnorePublicAcls: TRUE
        RestrictPublicBuckets: TRUE

  IamTopicRule:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: iot.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: firehose:PutRecord
                Resource:
                  - !GetAtt Firehose.Arn
          PolicyName: policy
      RoleName: iam-topic-rule

  IamFirehose:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
            Condition:
              StringEquals:
                sts:ExternalId: !Ref AWS::AccountId
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetTableVersion
                  - glue:GetTableVersions
                Resource: "*"
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt S3.Arn
                  - Fn::Sub:
                      - ${arn}/*
                      - {arn: !GetAtt S3.Arn}
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_DEFAULT_FUNCTION%:%FIREHOSE_DEFAULT_VERSION%
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/topic-rule-firehose-separator-test
              - Effect: Allow
                Action:
                  - kinesis:DescribeStream
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                Resource: !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
              - Effect: Allow
                Action:
                  - kms:Decrypt
                Resource:
                  - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%SSE_KEY_ID%
                Condition:
                  StringEquals:
                    kms:ViaService: kinesis.%REGION_NAME%.amazonaws.com
                  StringLike:
                    kms:EncryptionContext:aws:kinesis:arn: !Sub arn:aws:kinesis:%REGION_NAME%:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
          PolicyName: policy
      RoleName: iam-firehose

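Save the template as template.yaml and deploy the stack with the following command. CAPABILITY_NAMED_IAM is required because the template creates IAM roles with explicit names (RoleName).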

aws cloudformation deploy --template-file template.yaml --stack-name topic-rule-firehose-separator-test --capabilities CAPABILITY_NAMED_IAM

Testing the Configuration

Run the following command to verify the topic rule configuration:

aws iot get-topic-rule --rule-name topic_rule_firehose_separator_test

In the output, the Firehose action should show "separator": "\n", which is a single newline character.

{
  "ruleArn": "arn:aws:iot:<YOUR_REGION>:<YOUR_ACCOUNT_ID>:rule/topic_rule_firehose_separator_test",
  "rule": {
    "ruleName": "topic_rule_firehose_separator_test",
    "sql": "SELECT * FROM 'topic_rule_firehose_separator_test'",
    "createdAt": "2020-05-13T10:29:18+09:00",
    "actions": [
      {
        "firehose": {
          "roleArn": "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/iam-topic-rule",
          "deliveryStreamName": "topic-rule-firehose-separator-test",
          "separator": "\n"
        }
      }
    ],
    "ruleDisabled": false,
    "awsIotSqlVersion": "2016-03-23"
  }
}
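To check only the separator, for example in a script, the AWS CLI's --query option can extract it from the same response. The following is a minimal sketch. get_rule_separator is a hypothetical helper name, and the JMESPath expression assumes that the Firehose action is the rule's first and only action, as in the template above. Passing --output json makes a newline appear as "\n" instead of an invisible line break.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the separator of the rule's first action as JSON,
# so a newline is displayed as "\n" rather than as a blank line.
# Assumes the Firehose action is actions[0], as in the template above.
get_rule_separator() {
  local rule_name="${1:-topic_rule_firehose_separator_test}"
  aws iot get-topic-rule \
    --rule-name "$rule_name" \
    --query 'rule.actions[0].firehose.separator' \
    --output json
}
```

Against the deployed stack, get_rule_separator should print "\n".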

Publishing Test Messages

Publish test messages to the topic topic_rule_firehose_separator_test:

aws iot-data publish \
  --topic topic_rule_firehose_separator_test \
  --payload '{"id": 1, "message": "Hello from AWS IoT"}' \
  --cli-binary-format raw-in-base64-out

aws iot-data publish \
  --topic topic_rule_firehose_separator_test \
  --payload '{"id": 2, "message": "Hello from AWS IoT"}' \
  --cli-binary-format raw-in-base64-out
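To publish more than two messages, you can wrap the same command in a loop. The following is a minimal sketch. publish_test_messages is a hypothetical helper, and it assumes AWS CLI v2, which is the version that provides --cli-binary-format raw-in-base64-out.

```shell
#!/usr/bin/env bash
# Hypothetical helper: publish COUNT JSON messages with ids 1..COUNT to TOPIC,
# using the same payload shape as the two commands above.
publish_test_messages() {
  local topic="${1:-topic_rule_firehose_separator_test}"
  local count="${2:-2}"
  local i
  for ((i = 1; i <= count; i++)); do
    aws iot-data publish \
      --topic "$topic" \
      --payload "{\"id\": $i, \"message\": \"Hello from AWS IoT\"}" \
      --cli-binary-format raw-in-base64-out
  done
}
```

For example, publish_test_messages topic_rule_firehose_separator_test 5 sends five messages with ids 1 through 5.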

Validating S3 Output

Retrieve and inspect the object from the S3 bucket:

# Check an object.
$ aws s3 ls topic-rule-firehose-separator-test --recursive
2020-05-14 11:30:59         68 success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz

# Download the object.
$ aws s3 cp s3://topic-rule-firehose-separator-test/success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz ./result.gz

After decompression, the file should contain the two records, each on its own line because of the newline separator.

# Check the JSON.
$ gunzip -c result.gz > result.json
$ cat result.json
{"id": 1, "message": "Hello from AWS IoT"}
{"id": 2, "message": "Hello from AWS IoT"}

# Delete the results.
$ rm result.gz result.json
$ aws s3 rm s3://topic-rule-firehose-separator-test/success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz
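You can also script the visual check above. Without a separator, Firehose would write the records back to back, for example {"id": 1, ...}{"id": 2, ...} on a single line. The hypothetical helper below counts newline-separated records in a downloaded object and fails if any line contains }{. It assumes that each record is one JSON object whose string values never contain }{. Run it before deleting result.gz.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the number of newline-separated records in a
# gzipped Firehose object, or fail if records appear to be glued together.
# Assumes one JSON object per record and no "}{" inside string values.
check_firehose_object() {
  local file="$1" records glued
  # awk counts the last line even when it lacks a trailing newline.
  records=$(gunzip -c "$file" | awk 'END { print NR }')
  # grep -c prints 0 (and exits 1) when nothing matches; "|| true" keeps that harmless.
  glued=$(gunzip -c "$file" | grep -cF '}{' || true)
  if [ "$glued" -gt 0 ]; then
    echo "$file: $glued line(s) contain more than one record" >&2
    return 1
  fi
  printf '%s\n' "$records"
}
```

For the two messages published above, check_firehose_object result.gz should print 2.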

Common Issues

Writing the separator as an unquoted YAML scalar, Separator: \n, fails validation when the stack is deployed:

1 validation error detected: Value '\n' at 'topicRulePayload.actions.1.member.firehose.separator' failed to satisfy constraint: Member must satisfy regular expression pattern: ([\n\t])|(\r\n)|(,)

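In YAML, an unquoted or single-quoted \n is two characters, a backslash and the letter n, rather than a line break. It therefore matches none of the values the pattern allows: a newline, a tab, CRLF, or a comma. Use the |+ literal block scalar shown in the template, which keeps the line break, or a double-quoted "\n", which YAML treats as an escape sequence for a newline.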
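To catch this mistake before deploying, you can compare a candidate value against the pattern in the error message. The following is a minimal sketch as a hypothetical bash helper. It is not an AWS API, and it simply mirrors the four values that the regular expression above accepts.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed only if $1 is one of the separators accepted by
# the pattern ([\n\t])|(\r\n)|(,) from the validation error above.
# $'...' is bash ANSI-C quoting, so $'\n' is a real newline character.
is_valid_separator() {
  case "$1" in
    $'\n' | $'\t' | $'\r\n' | ,) return 0 ;;
    *) return 1 ;;
  esac
}
```

is_valid_separator '\n' fails because single quotes keep the backslash and the letter n. That is the same two-character value the unquoted YAML produced. is_valid_separator $'\n' succeeds.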

Cleaning Up Resources

To delete the provisioned resources, run the following command:

aws cloudformation delete-stack --stack-name topic-rule-firehose-separator-test
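CloudFormation cannot delete an S3 bucket that still contains objects. The stack deletion therefore fails if Firehose delivered anything beyond the object removed above, for example records under the error/ prefix. The following is a minimal cleanup sketch. It assumes the bucket is not versioned (the template does not enable versioning), and cleanup_stack is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Hypothetical helper: empty the bucket, delete the stack, and block until
# CloudFormation reports that the deletion has finished.
# Assumes an unversioned bucket; versioned buckets also need old versions removed.
cleanup_stack() {
  local stack="${1:-topic-rule-firehose-separator-test}"
  local bucket="${2:-topic-rule-firehose-separator-test}"
  aws s3 rm "s3://$bucket" --recursive || return 1
  aws cloudformation delete-stack --stack-name "$stack" || return 1
  aws cloudformation wait stack-delete-complete --stack-name "$stack"
}
```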

Conclusion

Setting the separator on the Firehose action of an AWS IoT Core topic rule delimits the records in the objects Firehose writes to S3, so they are easy to split and parse. In a CloudFormation template, the value must be the actual character (newline, tab, CRLF, or comma), so use |+ or a double-quoted string rather than an unquoted \n.

Happy Coding! 🚀

Takahiro Iwasa


Software Developer at KAKEHASHI Inc.
Involved in the requirements definition, design, and development of cloud-native applications using AWS. Now, building a new prescription data collection platform at KAKEHASHI Inc. Japan AWS Top Engineers 2020-2023.