# Streaming OPC UA Data to Kinesis Data Streams via SiteWise Edge Gateway

## Introduction

This guide explains how to stream OPC UA data to Kinesis Data Streams through AWS SiteWise Edge Gateway. An EC2 instance is configured as a dummy OPC UA server to demonstrate the process.

## Setting Up AWS Resources

### Setting Up an OPC UA Server
To create a dummy OPC UA server, use the [opcua-asyncio](https://github.com/FreeOpcUa/opcua-asyncio) Python library. Follow these steps:

- Install the required package:

  ```shell
  pip install asyncua
  ```

- Download the example script (`server-minimal.py`) to your EC2 instance:

  ```shell
  curl -OL https://raw.githubusercontent.com/FreeOpcUa/opcua-asyncio/master/examples/server-minimal.py
  ```

- Run the script:

  ```shell
  python server-minimal.py
  ```
### Setting Up a Greengrass V2 Core Device

Set up a Greengrass V2 core device following the official documentation. After installation, perform the following:

- Deploy the `aws.greengrass.StreamManager` component.
- Add the IAM policy below to the Token Exchange Role to allow data transmission to Kinesis Data Streams:

  ```json
  {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "kinesis:PutRecord",
                  "kinesis:PutRecords"
              ],
              "Resource": "arn:aws:kinesis:*:$YOUR_AWS_ACCOUNT_ID:stream/$KINESIS_DATA_STREAM_NAME"
          }
      ]
  }
  ```
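As a sketch, the two placeholders in the policy can be filled in programmatically before the document is attached to the role (the account ID and stream name below are hypothetical examples):

```python
import json


def kinesis_put_policy(account_id: str, stream_name: str) -> str:
    """Render the IAM policy document that lets the Token Exchange Role
    put records into a single Kinesis Data Stream."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
                "Resource": f"arn:aws:kinesis:*:{account_id}:stream/{stream_name}",
            }
        ],
    }
    return json.dumps(policy, indent=2)


# Example with placeholder values
print(kinesis_put_policy("123456789012", "my-opcua-stream"))
```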
### Setting Up a SiteWise Edge Gateway

Set up a SiteWise Edge gateway. Key steps include:

- **Gateway Configuration**: Choose **Advanced setup**.
- **Data Processing Pack**: Not required.
- **Publisher Configuration**: Skip this step.
- **Add Data Source**: Specify the local endpoint of your OPC UA server and set the Greengrass stream name (e.g., `SiteWise_Stream_Kinesis`).
- **Review**: Review the configuration and press the **Create** button.
### Creating a Kinesis Data Stream

Create a Kinesis Data Stream as the destination for your OPC UA data.
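For example, the stream can be created from the AWS CLI as follows (the stream name is a placeholder; the call is skipped if the CLI is not installed):

```shell
KINESIS_DATA_STREAM_NAME=my-opcua-stream   # placeholder name

if command -v aws >/dev/null 2>&1; then
  # A single provisioned shard is enough for this demo workload
  aws kinesis create-stream \
    --stream-name "$KINESIS_DATA_STREAM_NAME" \
    --shard-count 1
  # Block until the stream becomes ACTIVE
  aws kinesis wait stream-exists --stream-name "$KINESIS_DATA_STREAM_NAME"
fi
```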
### Configuring Kinesis Data Firehose

To persist the streamed data, configure a Kinesis Data Firehose delivery stream with the following:

- **Source**: The Kinesis Data Stream created above.
- **Destination**: An S3 bucket. Use the dynamic partitioning feature to output the records in NDJSON format.

For detailed instructions on handling NDJSON, refer to this blog post.
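With NDJSON output, each line of a delivered S3 object is one JSON document. A minimal parsing sketch (the two sample records below are hypothetical):

```python
import json


def parse_ndjson(text: str) -> list:
    """Parse newline-delimited JSON: one JSON object per non-empty line."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Hypothetical two-record Firehose output
sample = (
    '{"propertyAlias": "/MyObject/MyVariable", "propertyValues": []}\n'
    '{"propertyAlias": "/MyObject/MyVariable2", "propertyValues": []}\n'
)
records = parse_ndjson(sample)
print(len(records))  # → 2
```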
## Creating a Greengrass Component

To stream data from the Greengrass stream to Kinesis Data Streams, you need to create a custom Greengrass component. For detailed information about developing Greengrass components, refer to the official documentation.
### Directory Structure

Organize the component's files in the following structure:

```
/
├── kinesis_data_stream.py
├── recipe.yaml
├── requirements.txt
└── stream_manager_sdk.zip
```
### requirements.txt

Create a `requirements.txt` file and install the required dependencies using:

```shell
pip install -r requirements.txt
```

The `stream-manager` library is essential for interacting with Greengrass Stream Manager.

Dependencies:

```
cbor2~=5.4.2
stream-manager==1.1.1
```
### Python Script

Create `kinesis_data_stream.py` with the following code. This script leverages the Greengrass Stream Manager SDK to stream data into a Kinesis Data Stream.
"""
Script to use Greengrass Stream Manager to stream data to a Kinesis Data Stream
See also https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_kinesis.py
"""
import argparse
import asyncio
import logging
import time
from stream_manager import (
ExportDefinition,
KinesisConfig,
MessageStreamDefinition,
ReadMessagesOptions,
ResourceNotFoundException,
StrategyOnFull,
StreamManagerClient,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
def main(
stream_name: str,
kinesis_stream_name: str,
batch_size: int = None
):
try:
# Create a client for the StreamManager
client = StreamManagerClient()
# Try deleting the stream (if it exists) so that we have a fresh start
try:
client.delete_message_stream(stream_name=stream_name)
except ResourceNotFoundException:
pass
exports = ExportDefinition(
kinesis=[KinesisConfig(
identifier="KinesisExport" + stream_name,
kinesis_stream_name=kinesis_stream_name,
batch_size=batch_size,
)]
)
client.create_message_stream(
MessageStreamDefinition(
name=stream_name,
strategy_on_full=StrategyOnFull.OverwriteOldestData,
export_definition=exports
)
)
while True:
time.sleep(1)
except asyncio.TimeoutError:
logger.exception("Timed out while executing")
except Exception:
logger.exception("Exception while running")
finally:
# Always close the client to avoid resource leaks
if client:
client.close()
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument('--greengrass-stream', required=True, default='SiteWise_Stream_Kinesis')
parser.add_argument('--kinesis-stream', required=True)
parser.add_argument('--batch-size', required=False, type=int, default=500)
return parser.parse_args()
if __name__ == '__main__':
args = parse_args()
logger.info(f'args: {args.__dict__}')
main(args.greengrass_stream, args.kinesis_stream, args.batch_size)
### Notes

- The `stream_manager_sdk.zip` file should contain the Stream Manager SDK needed by your Greengrass component.
- The `recipe.yaml` file holds the component's configuration and is essential for deployment; its contents are shown in the next section.

Refer to the Greengrass SDK documentation for additional details and sample code.
### Component Recipe (recipe.yaml)

Create a `recipe.yaml` file with the following content. In this example, the component name is `jp.co.xyz.StreamManagerKinesis`.
```yaml
# Replace $ArtifactsS3Bucket with your value to complete component registration.
RecipeFormatVersion: 2020-01-25
ComponentName: jp.co.xyz.StreamManagerKinesis
ComponentVersion: 1.0.0
ComponentDescription: Streams data in Greengrass stream to a Kinesis Data Stream.
ComponentPublisher: self
ComponentDependencies:
  aws.greengrass.StreamManager:
    VersionRequirement: '^2.0.0'
ComponentConfiguration:
  DefaultConfiguration:
    GreengrassStream: SiteWise_Stream_Kinesis
    KinesisStream: ''
    BatchSize: 100  # minimum 1, maximum 500
Manifests:
  - Platform:
      os: linux
    Lifecycle:
      Install: pip3 install --user -r {artifacts:decompressedPath}/component/requirements.txt
      Run: |
        export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
        python3 {artifacts:decompressedPath}/component/kinesis_data_stream.py \
          --greengrass-stream {configuration:/GreengrassStream} \
          --kinesis-stream {configuration:/KinesisStream} \
          --batch-size {configuration:/BatchSize}
Artifacts:
  - URI: s3://$ArtifactsS3Bucket/artifacts/jp.co.xyz.StreamManagerKinesis/1.0.0/component.zip
    Unarchive: ZIP
```
This recipe includes the following configurable parameters in `ComponentConfiguration`:

| Name | Description |
|---|---|
| GreengrassStream | Name of the Greengrass stream |
| KinesisStream | Kinesis Data Stream name to receive OPC UA data |
| BatchSize | Batch size for data transfer (minimum: 1, maximum: 500) |
### Uploading Component Artifact

Archive the component files and upload them to your S3 bucket using the following commands:

```shell
S3_BUCKET=<YOUR_BUCKET_NAME>
VERSION=1.0.0
zip component.zip kinesis_data_stream.py requirements.txt
aws s3 cp component.zip s3://$S3_BUCKET/artifacts/jp.co.xyz.StreamManagerKinesis/$VERSION/
rm component.zip
```
### Component Registration

To register the component:

- Copy the contents of the `recipe.yaml` file created earlier.
- Replace `$ArtifactsS3Bucket` with the actual S3 bucket name where the component artifact resides.
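Registration can also be done from the AWS CLI; a sketch assuming the edited recipe file sits in the current directory (the call is skipped if the CLI is not installed):

```shell
RECIPE_FILE=recipe.yaml   # recipe with $ArtifactsS3Bucket already replaced

if command -v aws >/dev/null 2>&1; then
  # Registers jp.co.xyz.StreamManagerKinesis 1.0.0 from the local recipe
  aws greengrassv2 create-component-version \
    --inline-recipe "fileb://$RECIPE_FILE"
fi
```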
### Component Deployment

Follow these steps to deploy the component:

- Click **Deploy**.
- Click **Configure component**.
- Update the component's configuration with the required details. Set the `KinesisStream` parameter to the Kinesis Data Stream name you created earlier.
- Review the configuration and deploy the component.

Once the deployment is complete, the component will stream data from the Greengrass stream to the specified Kinesis Data Stream.
## Testing

To verify the setup, check the objects in your S3 bucket using the following command:

```shell
aws s3 cp s3://<YOUR_BUCKET_NAME>/... ./
```

The retrieved data should resemble the following example:
```json
{
    "propertyAlias": "/MyObject/MyVariable",
    "propertyValues": [
        {
            "value": {
                "doubleValue": 7.699999999999997
            },
            "timestamp": {
                "timeInSeconds": 1661581962,
                "offsetInNanos": 9000000
            },
            "quality": "GOOD"
        }
    ]
}
```
If the data appears as expected, the system is successfully streaming OPC UA data to the Kinesis Data Stream and persisting it in S3.
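Records in this shape can be unpacked with a few lines of Python; a sketch using the sample values above, combining `timeInSeconds` and `offsetInNanos` into a single UTC timestamp:

```python
import json
from datetime import datetime, timezone

# Sample record in the shape produced by the pipeline above
record = json.loads("""
{
  "propertyAlias": "/MyObject/MyVariable",
  "propertyValues": [
    {
      "value": {"doubleValue": 7.699999999999997},
      "timestamp": {"timeInSeconds": 1661581962, "offsetInNanos": 9000000},
      "quality": "GOOD"
    }
  ]
}
""")

for pv in record["propertyValues"]:
    ts = pv["timestamp"]
    # Seconds plus the nanosecond offset give the full UTC timestamp
    when = datetime.fromtimestamp(
        ts["timeInSeconds"] + ts["offsetInNanos"] / 1e9, tz=timezone.utc
    )
    print(record["propertyAlias"], pv["value"]["doubleValue"], when.isoformat())
```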
## Conclusion

This guide walked you through streaming OPC UA data to Kinesis Data Streams using an EC2-based OPC UA server and AWS SiteWise Edge Gateway. By combining AWS IoT services such as Greengrass V2, Kinesis, and SiteWise, you can efficiently integrate industrial data and persist it in scalable cloud infrastructure.

Happy Coding! 🚀