How to Configure Separators for Kinesis Firehose in AWS IoT Core Topic Rules
Introduction
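When a Kinesis Firehose action in an AWS IoT Core topic rule delivers messages to S3, the record separator determines what is placed between records. If no separator is set, the records are written back to back with no delimiter, which makes the resulting objects hard to split into individual messages. This post shows how to configure the separator in a CloudFormation template, how to test it end to end, and how to fix a common validation error.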
Setting Up AWS Resources
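Below is the CloudFormation template that creates the required resources. Pay close attention to the `Separator: |+` line in the Firehose action and to the empty line that follows it. The `|+` indicator starts a literal block scalar that keeps trailing line breaks, so that empty line becomes the separator value: a single newline character.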
```yaml
AWSTemplateFormatVersion: "2010-09-09"
Resources:
  TopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: topic_rule_firehose_separator_test
      TopicRulePayload:
        Actions:
          - Firehose:
              DeliveryStreamName: !Ref Firehose
              RoleArn: !GetAtt IamTopicRule.Arn
              # "|+" keeps trailing line breaks, so the empty line below
              # makes the separator a single newline character ("\n").
              Separator: |+

        AwsIotSqlVersion: "2016-03-23"
        RuleDisabled: false
        Sql: !Sub
          SELECT * FROM 'topic_rule_firehose_separator_test'
  Firehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: topic-rule-firehose-separator-test
      DeliveryStreamType: DirectPut
      S3DestinationConfiguration:
        BucketARN: !GetAtt S3.Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 5
        CompressionFormat: GZIP
        ErrorOutputPrefix: "error/!{firehose:error-output-type}/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
        Prefix: "success/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
        RoleARN: !GetAtt IamFirehose.Arn
  S3:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: topic-rule-firehose-separator-test
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
  IamTopicRule:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: iot.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: firehose:PutRecord
                Resource:
                  - !GetAtt Firehose.Arn
          PolicyName: policy
      RoleName: iam-topic-rule
  IamFirehose:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
            Condition:
              StringEquals:
                sts:ExternalId: !Ref AWS::AccountId
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetTableVersion
                  - glue:GetTableVersions
                Resource: "*"
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt S3.Arn
                  - Fn::Sub:
                      - ${arn}/*
                      - {arn: !GetAtt S3.Arn}
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_DEFAULT_FUNCTION%:%FIREHOSE_DEFAULT_VERSION%
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/topic-rule-firehose-separator-test
              - Effect: Allow
                Action:
                  - kinesis:DescribeStream
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                Resource: !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
              - Effect: Allow
                Action:
                  - kms:Decrypt
                Resource:
                  - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%SSE_KEY_ID%
                Condition:
                  StringEquals:
                    kms:ViaService: kinesis.%REGION_NAME%.amazonaws.com
                  StringLike:
                    kms:EncryptionContext:aws:kinesis:arn: !Sub arn:aws:kinesis:%REGION_NAME%:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
          PolicyName: policy
      RoleName: iam-firehose
```
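The AWS IoT API itself takes the topic rule as JSON, and the `get-topic-rule` output in the next section uses the same field names. The Python snippet below is a minimal sketch that builds the JSON equivalent of the `TopicRulePayload` above using only the standard library. `build_topic_rule_payload` is a hypothetical helper, and the role ARN keeps the `<YOUR_ACCOUNT_ID>` placeholder. The snippet shows the point of the `|+` construct from the other direction: the separator must be one real newline character, which JSON writes as the escape `\n`.

```python
import json


# Hypothetical helper: the JSON form of the TopicRulePayload defined in the
# CloudFormation template (field names as they appear in get-topic-rule output).
def build_topic_rule_payload(stream_name: str, role_arn: str, separator: str = "\n") -> dict:
    return {
        "sql": "SELECT * FROM 'topic_rule_firehose_separator_test'",
        "awsIotSqlVersion": "2016-03-23",
        "ruleDisabled": False,
        "actions": [
            {
                "firehose": {
                    "deliveryStreamName": stream_name,
                    "roleArn": role_arn,
                    # One newline character, not a backslash followed by "n".
                    "separator": separator,
                }
            }
        ],
    }


payload = build_topic_rule_payload(
    "topic-rule-firehose-separator-test",
    "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/iam-topic-rule",  # placeholder ARN
)
# json.dumps encodes the newline character as the two-character escape \n.
print(json.dumps(payload, indent=2))
```

This post deploys the rule with CloudFormation and does not test the API path. If you want to try it, the printed JSON could be saved to a file and passed to `aws iot create-topic-rule --rule-name <RULE_NAME> --topic-rule-payload file://payload.json`. Use a rule name that does not clash with the stack's rule.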
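Deploy the CloudFormation stack with the following command. `CAPABILITY_NAMED_IAM` is required because the template creates IAM roles with explicit names:
```shell
aws cloudformation deploy --template-file template.yaml --stack-name topic-rule-firehose-separator-test --capabilities CAPABILITY_NAMED_IAM
```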
Testing the Configuration
Run the following command to verify the topic rule configuration:
```shell
aws iot get-topic-rule --rule-name topic_rule_firehose_separator_test
```
In the output, the Firehose action should contain `"separator": "\n"`, which is how JSON represents a single newline character.
```json
{
    "ruleArn": "arn:aws:iot:<YOUR_REGION>:<YOUR_ACCOUNT_ID>:rule/topic_rule_firehose_separator_test",
    "rule": {
        "ruleName": "topic_rule_firehose_separator_test",
        "sql": "SELECT * FROM 'topic_rule_firehose_separator_test'",
        "createdAt": "2020-05-13T10:29:18+09:00",
        "actions": [
            {
                "firehose": {
                    "roleArn": "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/iam-topic-rule",
                    "deliveryStreamName": "topic-rule-firehose-separator-test",
                    "separator": "\n"
                }
            }
        ],
        "ruleDisabled": false,
        "awsIotSqlVersion": "2016-03-23"
    }
}
```
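To check this in a script instead of by eye, save the output first, for example with `aws iot get-topic-rule --rule-name topic_rule_firehose_separator_test > rule.json`. Then read it with the minimal Python sketch below, which uses only the standard library; `firehose_separators` and the file name `rule.json` are hypothetical. The script prints each Firehose separator with `repr`, so a newline shows up as `'\n'` instead of an invisible line break.

```python
import json
import sys


def firehose_separators(rule_output: dict) -> list:
    # Collect the separator of every Firehose action in the JSON printed by
    # `aws iot get-topic-rule`; None means the action has no separator set.
    actions = rule_output["rule"]["actions"]
    return [action["firehose"].get("separator") for action in actions if "firehose" in action]


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python check_rule.py rule.json
    with open(sys.argv[1]) as f:
        for separator in firehose_separators(json.load(f)):
            # repr() makes whitespace visible, e.g. '\n' for a newline.
            print(repr(separator))
```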
Publishing Test Messages
Publish two test messages to the `topic_rule_firehose_separator_test` topic. The `--cli-binary-format raw-in-base64-out` option makes AWS CLI version 2 send the payload as raw text instead of expecting Base64:
```shell
aws iot-data publish \
  --topic topic_rule_firehose_separator_test \
  --payload '{"id": 1, "message": "Hello from AWS IoT"}' \
  --cli-binary-format raw-in-base64-out
aws iot-data publish \
  --topic topic_rule_firehose_separator_test \
  --payload '{"id": 2, "message": "Hello from AWS IoT"}' \
  --cli-binary-format raw-in-base64-out
```
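If you prefer to publish from code, here is a minimal Python sketch that sends the same two messages. `build_messages` and `publish_all` are hypothetical helpers. `publish_all` accepts any client with a boto3-style `publish(topic=..., qos=..., payload=...)` method, so you can try the logic without AWS access.

Running it for real assumes boto3 is installed and credentials are configured. Depending on your setup, you may also need to create the client with `endpoint_url` set to your account's data endpoint, which `aws iot describe-endpoint --endpoint-type iot:Data-ATS` returns.

```python
import json

TOPIC = "topic_rule_firehose_separator_test"


def build_messages(count: int) -> list:
    # Same shape as the CLI payloads: {"id": n, "message": "Hello from AWS IoT"}
    return [json.dumps({"id": i, "message": "Hello from AWS IoT"}) for i in range(1, count + 1)]


def publish_all(client, topic: str, messages: list) -> int:
    # Publish each message as raw UTF-8 bytes; the SDK needs no Base64 step here.
    for message in messages:
        client.publish(topic=topic, qos=0, payload=message.encode("utf-8"))
    return len(messages)
```

With boto3 available, a call such as `publish_all(boto3.client("iot-data"), TOPIC, build_messages(2))` would send the two messages.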
Validating S3 Output
List the objects in the bucket and download the delivered file. With the 60-second buffering interval set in the template, the object can take about a minute to appear. The object key below is from the original test run; yours will differ.
```shell
# List the delivered objects.
$ aws s3 ls s3://topic-rule-firehose-separator-test --recursive
2020-05-14 11:30:59 68 success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz
# Download the object.
$ aws s3 cp s3://topic-rule-firehose-separator-test/success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz ./result.gz
```
Decompress the object and check its contents. Because the separator is `\n`, the two records appear on separate lines. Without a separator, Firehose would write them back to back as `{...}{...}`.
```shell
# Decompress and check the JSON.
$ gunzip -c result.gz > result.json
$ cat result.json
{"id": 1, "message": "Hello from AWS IoT"}
{"id": 2, "message": "Hello from AWS IoT"}
# Delete the local files and the S3 object.
$ rm result.gz result.json
$ aws s3 rm s3://topic-rule-firehose-separator-test/success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz
```
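To check the delivered records programmatically, a small standard-library Python sketch can split the decompressed object on the configured separator and parse each record as JSON. Run it against `result.gz` before the `rm` step removes it. `parse_records` and `read_records` are hypothetical helpers. The naive split assumes the separator never appears inside a record. That holds for `\n` with single-line JSON payloads but not for a `,` separator.

```python
import gzip
import json
import sys


def parse_records(content: str, separator: str = "\n") -> list:
    # Naive split: safe only if the separator never occurs inside a record.
    # Empty chunks (for example after a trailing separator) are skipped.
    return [json.loads(chunk) for chunk in content.split(separator) if chunk.strip()]


def read_records(path: str, separator: str = "\n") -> list:
    # Read bytes and decode explicitly, so "\r\n" separators are not
    # rewritten by text-mode newline translation.
    with gzip.open(path, "rb") as f:
        return parse_records(f.read().decode("utf-8"), separator)


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python read_records.py result.gz
    for record in read_records(sys.argv[1]):
        print(record)
```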
Common Issues
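Writing the separator as a plain YAML scalar, such as `Separator: \n`, causes a validation error when CloudFormation creates the topic rule:
```text
1 validation error detected: Value '\n' at 'topicRulePayload.actions.1.member.firehose.separator' failed to satisfy constraint: Member must satisfy regular expression pattern: ([\n\t])|(\r\n)|(,)
```
In an unquoted YAML scalar, `\n` is not an escape sequence, so the value is the two characters `\` and `n`. The pattern only accepts a newline, a tab, a CRLF (`\r\n`), or a comma. Use the `|+` block scalar followed by an empty line, as in the template above, so that YAML produces an actual newline character. A double-quoted YAML string (`Separator: "\n"`) should also work, because YAML processes escape sequences inside double quotes.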
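To see which values pass, you can test candidates against the pattern from the error message with Python's `re` module. This is only a local approximation. It assumes the service applies the pattern to the whole value, it is not the service's actual validator, and `is_valid_separator` is a hypothetical name.

```python
import re

# Pattern copied from the validation error message.
SEPARATOR_PATTERN = re.compile(r"([\n\t])|(\r\n)|(,)")


def is_valid_separator(value: str) -> bool:
    # Assumption: the whole value must match, not just a prefix of it.
    return SEPARATOR_PATTERN.fullmatch(value) is not None


# "\\n" is what an unquoted YAML `\n` produces: a backslash followed by "n".
for candidate in ["\n", "\t", "\r\n", ",", "\\n"]:
    print(repr(candidate), is_valid_separator(candidate))
```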
Cleaning Up Resources
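To delete the provisioned resources, delete the stack. CloudFormation cannot delete an S3 bucket that still contains objects, so make sure the bucket is empty first, for example with `aws s3 rm s3://topic-rule-firehose-separator-test --recursive`:
```shell
aws cloudformation delete-stack --stack-name topic-rule-firehose-separator-test
```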
Conclusion
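Setting the separator on the Firehose action of an AWS IoT Core topic rule makes the records in each S3 object easy to split back into individual messages. In a CloudFormation YAML template, the key is to produce a real newline character, for example with `|+` followed by an empty line, rather than the literal text `\n`. Follow the steps above to deploy and validate the setup.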
Happy Coding! 🚀