Streaming OPC UA Data to Kinesis Data Streams via SiteWise Edge Gateway

Takahiro Iwasa
6 min read
Greengrass IoT Kinesis OPC-UA SiteWise

Introduction

This guide explains how to stream OPC UA data to Amazon Kinesis Data Streams through an AWS IoT SiteWise Edge gateway. An EC2 instance is configured as a dummy OPC UA server to demonstrate the process.

Overview Diagram

Setting Up AWS Resources

Setting Up an OPC UA Server

To create a dummy OPC UA server, use the opcua-asyncio Python library. Follow these steps:

  1. Install the required package:
pip install asyncua
  2. Download the example script (server-minimal.py) to your EC2 instance:
curl -OL https://raw.githubusercontent.com/FreeOpcUa/opcua-asyncio/master/examples/server-minimal.py
  3. Run the script:
python server-minimal.py

Setting Up a Greengrass V2 Core Device

Set up a Greengrass V2 Core Device following the official documentation. After installation, perform the following:

  • Deploy the aws.greengrass.StreamManager component.
  • Add the IAM policy below to the Token Exchange Role to allow data transmission to Kinesis Data Streams:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:*:$YOUR_AWS_ACCOUNT_ID:stream/$KINESIS_DATA_STREAM_NAME"
    }
  ]
}
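If you prefer to generate the policy document programmatically, the sketch below builds the same JSON with standard-library tools. The account ID and stream name are hypothetical example values, not ones from this guide:

```python
import json

# Hypothetical example values; replace with your own account ID and stream name.
account_id = "123456789012"
kinesis_stream = "opcua-stream"

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
            # Scope access to the single destination stream.
            "Resource": f"arn:aws:kinesis:*:{account_id}:stream/{kinesis_stream}",
        }
    ],
}

print(json.dumps(policy, indent=2))
```

Scoping the Resource to one stream ARN, rather than `*`, keeps the Token Exchange Role's permissions minimal.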

Setting Up a SiteWise Edge Gateway

Set up a SiteWise Edge Gateway. Key steps include:

  1. Gateway Configuration: Choose Advanced setup.

Step 1

  2. Data Processing Pack: Not required.

Step 2

  3. Publisher Configuration: Skip this step.

Step 3

  4. Add Data Source: Specify the local endpoint of your OPC UA server and set the Greengrass stream name (e.g., SiteWise_Stream_Kinesis).

Step 4

  5. Review: Review the configuration and press the Create button.

Step 5

Creating a Kinesis Data Stream

Create a Kinesis Data Stream as the destination for your OPC UA data.

Kinesis Data Stream Setup

Configuring Kinesis Data Firehose

To persist the streamed data, configure a Kinesis Data Firehose with the following:

  • Source: The Kinesis Data Stream created above.
  • Destination: An S3 bucket. Use the Dynamic Partitioning feature for outputting NDJSON format.

For detailed instructions on handling NDJSON, refer to this blog post.
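NDJSON is simply one compact JSON document per line. As a rough sketch of what the partitioned S3 output looks like (the records below are made-up examples, not actual gateway output):

```python
import json

# Made-up example records; real records follow the SiteWise property-value shape.
records = [
    {"propertyAlias": "/MyObject/MyVariable", "value": 7.7},
    {"propertyAlias": "/MyObject/MyVariable", "value": 7.8},
]

# NDJSON: each record serialized compactly on its own line.
ndjson = "\n".join(json.dumps(r, separators=(",", ":")) for r in records)
print(ndjson)
```

Because each line is an independent JSON document, downstream tools such as Athena can scan the files without any custom parsing.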

Kinesis Firehose Setup

Creating a Greengrass Component

To stream data from the Greengrass Stream to Kinesis Data Streams, you need to create a custom Greengrass component. For detailed information about developing Greengrass components, refer to the official documentation.

Directory Structure

Organize the component’s files in the following structure:

/
├── kinesis_data_stream.py
├── recipe.yaml
├── requirements.txt
└── stream_manager_sdk.zip

requirements.txt

Create a requirements.txt file with the following content. The stream-manager library is essential for interacting with Greengrass Stream Manager.

cbor2~=5.4.2
stream-manager==1.1.1

Install the dependencies with:

pip install -r requirements.txt

Python Script

Create kinesis_data_stream.py with the following code. This script leverages the Greengrass Stream Manager SDK to stream data into a Kinesis Data Stream.

"""
Script to use Greengrass Stream Manager to stream data to a Kinesis Data Stream
See also https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_kinesis.py
"""

import argparse
import asyncio
import logging
import time

from stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ReadMessagesOptions,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()


def main(
        stream_name: str,
        kinesis_stream_name: str,
        batch_size: int = None
):
    client = None
    try:
        # Create a client for the StreamManager
        client = StreamManagerClient()

        # Try deleting the stream (if it exists) so that we have a fresh start
        try:
            client.delete_message_stream(stream_name=stream_name)
        except ResourceNotFoundException:
            pass

        exports = ExportDefinition(
            kinesis=[KinesisConfig(
                identifier="KinesisExport" + stream_name,
                kinesis_stream_name=kinesis_stream_name,
                batch_size=batch_size,
            )]
        )
        client.create_message_stream(
            MessageStreamDefinition(
                name=stream_name,
                strategy_on_full=StrategyOnFull.OverwriteOldestData,
                export_definition=exports
            )
        )

        # Keep the process alive; Stream Manager exports the Greengrass
        # stream to Kinesis in the background.
        while True:
            time.sleep(1)

    except asyncio.TimeoutError:
        logger.exception("Timed out while executing")
    except Exception:
        logger.exception("Exception while running")
    finally:
        # Always close the client to avoid resource leaks
        if client:
            client.close()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--greengrass-stream', required=False, default='SiteWise_Stream_Kinesis')
    parser.add_argument('--kinesis-stream', required=True)
    parser.add_argument('--batch-size', required=False, type=int, default=500)
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    logger.info(f'args: {args.__dict__}')
    main(args.greengrass_stream, args.kinesis_stream, args.batch_size)

Notes

  • The stream_manager_sdk.zip file should contain the necessary SDK for your Greengrass component.
  • Update the recipe.yaml file to include the component’s configuration, as shown in the next section.

Refer to the Greengrass SDK documentation for additional details and sample code.

Component Recipe (recipe.yaml)

Create a recipe.yaml file with the following content. In this example, the component name is jp.co.xyz.StreamManagerKinesis.

# Replace $ArtifactsS3Bucket with your value to complete component registration.

RecipeFormatVersion: 2020-01-25

ComponentName: jp.co.xyz.StreamManagerKinesis
ComponentVersion: 1.0.0
ComponentDescription: Streams data in Greengrass stream to a Kinesis Data Stream.
ComponentPublisher: self
ComponentDependencies:
  aws.greengrass.StreamManager:
    VersionRequirement: '^2.0.0'
ComponentConfiguration:
  DefaultConfiguration:
    GreengrassStream: SiteWise_Stream_Kinesis
    KinesisStream: ''
    BatchSize: 100 # minimum 1, maximum 500

Manifests:
  - Platform:
      os: linux
    Lifecycle:
      Install: pip3 install --user -r {artifacts:decompressedPath}/component/requirements.txt
      Run: |
        export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
        python3 {artifacts:decompressedPath}/component/kinesis_data_stream.py \
          --greengrass-stream {configuration:/GreengrassStream} \
          --kinesis-stream {configuration:/KinesisStream} \
          --batch-size {configuration:/BatchSize}
    Artifacts:
      - URI: s3://$ArtifactsS3Bucket/artifacts/jp.co.xyz.StreamManagerKinesis/1.0.0/component.zip
        Unarchive: ZIP

This recipe includes the following configurable parameters in ComponentConfiguration:

Name              Description
GreengrassStream  Name of the Greengrass stream
KinesisStream     Kinesis Data Stream name to receive OPC UA data
BatchSize         Batch size for data transfer (minimum: 1, maximum: 500)

Uploading Component Artifact

Archive the component files and upload them to your S3 bucket using the following commands:

S3_BUCKET=<YOUR_BUCKET_NAME>
VERSION=1.0.0

zip component.zip kinesis_data_stream.py requirements.txt
aws s3 cp component.zip s3://$S3_BUCKET/artifacts/jp.co.xyz.StreamManagerKinesis/$VERSION/
rm component.zip

Component Registration

To register the component:

  1. Copy the contents of the recipe.yaml file created earlier.
  2. Replace $ArtifactsS3Bucket with the actual S3 bucket name where the component artifact resides.

Component Registration

Component Deployment

Follow these steps to deploy the component:

  1. Click Deploy.

Deploy

  2. Click Configure component.

Configure Component

  3. Update the component’s configuration with the required details. Set the KinesisStream parameter to the Kinesis Data Stream name you created earlier.

Update Configuration

  4. Review the configuration and deploy the component.

Deploy Component

Once the deployment is complete, the component will stream data from the Greengrass Stream to the specified Kinesis Data Stream.

Testing

To verify the setup, check the objects in your S3 bucket using the following command:

aws s3 cp s3://<YOUR_BUCKET_NAME>/... ./

The retrieved data should resemble the following example:

{
  "propertyAlias": "/MyObject/MyVariable",
  "propertyValues": [
    {
      "value": {
        "doubleValue": 7.699999999999997
      },
      "timestamp": {
        "timeInSeconds": 1661581962,
        "offsetInNanos": 9000000
      },
      "quality": "GOOD"
    }
  ]
}
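A record in this shape can be decoded with the standard library alone. The minimal sketch below recovers the measured value and combines the split timestamp (whole seconds plus a nanosecond offset) into a UTC datetime:

```python
import json
from datetime import datetime, timezone

# The sample record from the S3 output above, embedded as a string.
record_json = '''
{
  "propertyAlias": "/MyObject/MyVariable",
  "propertyValues": [
    {
      "value": {"doubleValue": 7.699999999999997},
      "timestamp": {"timeInSeconds": 1661581962, "offsetInNanos": 9000000},
      "quality": "GOOD"
    }
  ]
}
'''

record = json.loads(record_json)
pv = record["propertyValues"][0]
value = pv["value"]["doubleValue"]

# Combine the whole-second part and the nanosecond offset into a UTC datetime.
ts = pv["timestamp"]
when = datetime.fromtimestamp(
    ts["timeInSeconds"] + ts["offsetInNanos"] / 1e9, tz=timezone.utc
)

print(record["propertyAlias"], value, when.isoformat())
```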

If the data appears as expected, the system is successfully streaming OPC UA data to the Kinesis Data Stream and persisting it in S3.

Conclusion

This guide walked you through the process of streaming OPC UA data to Kinesis Data Streams using an EC2-based OPC UA server and AWS SiteWise Edge Gateway. By leveraging AWS IoT services such as Greengrass V2, Kinesis, and SiteWise, you can efficiently integrate and persist industrial data into scalable cloud infrastructure.

Happy Coding! 🚀

Takahiro Iwasa

Software Developer at KAKEHASHI Inc.
Involved in the requirements definition, design, and development of cloud-native applications using AWS. Now, building a new prescription data collection platform at KAKEHASHI Inc. Japan AWS Top Engineers 2020-2023.