SiteWise Edge Gateway を介して OPC UA データを Kinesis Data Streams にストリーム配信する方法

SiteWise Edge Gateway を介して OPC UA データを Kinesis Data Streams にストリーム配信する方法

岩佐 孝浩
岩佐 孝浩
9 min read
Greengrass IoT Kinesis OPC-UA SiteWise

はじめに

この投稿では、AWS SiteWise Edge Gateway を介して OPC UA データを Kinesis Data Streams にストリーム配信する方法を解説します。デモでは、EC2 インスタンスをダミーの OPC UA サーバーとして構成します。

Overview Diagram

AWS リソースのセットアップ

OPC UA サーバーのセットアップ

ダミーの OPC UA サーバーを作成するには、opcua-asyncio Python ライブラリを使用します。以下の手順を実行してください。

  1. 必要なパッケージをインストールします。
pip install asyncua
  1. サンプルスクリプト (server-minimal.py) を EC2 インスタンスにダウンロードします。
curl -OL https://raw.githubusercontent.com/FreeOpcUa/opcua-asyncio/master/examples/server-minimal.py
  1. スクリプトを実行します。
python server-minimal.py

Greengrass V2 Core デバイスのセットアップ

公式ドキュメント に従って Greengrass V2 Core デバイス をセットアップします。インストール後、以下を実行してください。

  • aws.greengrass.StreamManager コンポーネントをデプロイします。
  • トークン交換ロールに以下の IAM ポリシーを追加し、Kinesis Data Streams へのデータ送信を許可します。
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:*:$YOUR_AWS_ACCOUNT_ID:stream/$KINESIS_DATA_STREAM_NAME"
    }
  ]
}

SiteWise Edge Gateway のセットアップ

SiteWise Edge Gateway のセットアップを行います。主な手順は次のとおりです。

  1. ゲートウェイ構成: Advanced setup を選択します。

ステップ 1

  1. データ処理パック: 必要ありません。

ステップ 2

  1. パブリッシャー構成: スキップします。

ステップ 3

  1. データソースの追加: OPC UA サーバーの ローカルエンドポイント を指定し、Greengrass ストリーム名 (例: SiteWise_Stream_Kinesis) を設定します。

ステップ 4

ステップ 4

ステップ 4

  1. レビュー: 設定を確認し、作成 ボタンを押します。

ステップ 5

Kinesis Data Stream の作成

Kinesis Data Stream を作成し、OPC UA データの宛先として使用します。

Kinesis Data Stream 設定

Kinesis Data Firehose の設定

ストリーム配信データを永続化するため、以下を使用して Kinesis Data Firehose を設定します。

  • ソース: 上記で作成した Kinesis Data Stream
  • 宛先: S3 バケット。NDJSON 形式での出力に 動的パーティショニング 機能を使用します。

NDJSON の取り扱いについて詳しくは このブログ記事 をご参照ください。

Kinesis Firehose 設定

Greengrass コンポーネントの作成

Greengrass Stream から Kinesis Data Streams にデータをストリーム配信するには、カスタム Greengrass コンポーネント を作成する必要があります。公式ドキュメント をご参照ください。

ディレクトリ構造

コンポーネントのファイルを次の構造で整理します。

/
├── kinesis_data_stream.py
├── recipe.yaml
├── requirements.txt
└── stream_manager_sdk.zip

requirements.txt

requirements.txt ファイルを作成し、以下のコマンドを使用して必要な依存関係をインストールします。

pip install -r requirements.txt

stream-manager ライブラリは、Greengrass Stream Manager と連携するために不可欠です。

依存関係:

cbor2~=5.4.2
stream-manager==1.1.1

Python スクリプト

以下のコードで kinesis_data_stream.py を作成してください。このスクリプトは、Greengrass Stream Manager SDK を活用して、データを Kinesis Data Stream にストリーム配信します。

"""
Script to use Greengrass Stream Manager to stream data to a Kinesis Data Stream
See also https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_kinesis.py
"""

import argparse
import asyncio
import logging
import time

from stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ReadMessagesOptions,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()


def main(
        stream_name: str,
        kinesis_stream_name: str,
        batch_size: int = None
):
    try:
        # Create a client for the StreamManager
        client = StreamManagerClient()

        # Try deleting the stream (if it exists) so that we have a fresh start
        try:
            client.delete_message_stream(stream_name=stream_name)
        except ResourceNotFoundException:
            pass

        exports = ExportDefinition(
            kinesis=[KinesisConfig(
                identifier="KinesisExport" + stream_name,
                kinesis_stream_name=kinesis_stream_name,
                batch_size=batch_size,
            )]
        )
        client.create_message_stream(
            MessageStreamDefinition(
                name=stream_name,
                strategy_on_full=StrategyOnFull.OverwriteOldestData,
                export_definition=exports
            )
        )

        while True:
            time.sleep(1)

    except asyncio.TimeoutError:
        logger.exception("Timed out while executing")
    except Exception:
        logger.exception("Exception while running")
    finally:
        # Always close the client to avoid resource leaks
        if client:
            client.close()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--greengrass-stream', required=True, default='SiteWise_Stream_Kinesis')
    parser.add_argument('--kinesis-stream', required=True)
    parser.add_argument('--batch-size', required=False, type=int, default=500)
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    logger.info(f'args: {args.__dict__}')
    main(args.greengrass_stream, args.kinesis_stream, args.batch_size)

注意点

  • stream_manager_sdk.zip ファイルには、Greengrass コンポーネントに必要な SDK を含める必要があります。
  • recipe.yaml ファイルを更新し、コンポーネントの設定を含めてください(この投稿には含まれていませんが、デプロイに必須です)。

詳細およびサンプルコードについては、Greengrass SDK ドキュメント をご参照ください。

コンポーネントのレシピ (recipe.yaml)

以下の内容で recipe.yaml ファイルを作成します。この例では、コンポーネント名を jp.co.xyz.StreamManagerKinesis としています。

# Replace $ArtifactsS3Bucket with your value to complete component registration.

RecipeFormatVersion: 2020-01-25

ComponentName: jp.co.xyz.StreamManagerKinesis
ComponentVersion: 1.0.0
ComponentDescription: Streams data in Greengrass stream to a Kinesis Data Stream.
ComponentPublisher: self
ComponentDependencies:
  aws.greengrass.StreamManager:
    VersionRequirement: '^2.0.0'
ComponentConfiguration:
  DefaultConfiguration:
    GreengrassStream: SiteWise_Stream_Kinesis
    KinesisStream: ''
    BatchSize: 100 # minimum 1, maximum 500

Manifests:
  - Platform:
      os: linux
    Lifecycle:
      Install: pip3 install --user -r {artifacts:decompressedPath}/component/requirements.txt
      Run: |
        export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
        python3 {artifacts:decompressedPath}/component/kinesis_data_stream.py \
          --greengrass-stream {configuration:/GreengrassStream} \
          --kinesis-stream {configuration:/KinesisStream} \
          --batch-size {configuration:/BatchSize}
    Artifacts:
      - URI: s3://$ArtifactsS3Bucket/artifacts/jp.co.xyz.StreamManagerKinesis/1.0.0/component.zip
        Unarchive: ZIP

このレシピの ComponentConfiguration には、以下の設定可能なパラメータが含まれています。

NameDescription
GreengrassStreamGreengrass ストリームの名前
KinesisStreamOPC UA データを受信する Kinesis Data Stream の名前
BatchSizeデータ転送のバッチサイズ (最小値: 1, 最大値: 500)

コンポーネントアーティファクトのアップロード

コンポーネントファイルをアーカイブし、以下のコマンドを使用して S3 バケットにアップロードします。

S3_BUCKET=<YOUR_BUCKET_NAME>
VERSION=1.0.0

zip component.zip kinesis_data_stream.py requirements.txt
aws s3 cp component.zip s3://$S3_BUCKET/artifacts/jp.co.xyz.StreamManagerKinesis/$VERSION/
rm component.zip

コンポーネント登録

コンポーネントを登録するには、以下の手順を実行してください。

  1. 作成した recipe.yaml ファイルの内容をコピーします。
  2. $ArtifactsS3Bucket をコンポーネントアーティファクトが存在する S3 バケット名に置き換えます。

コンポーネント登録

コンポーネントのデプロイ

以下の手順に従ってコンポーネントをデプロイします。

  1. デプロイ をクリックします。

デプロイ

  1. コンポーネントの構成 をクリックします。

コンポーネントの構成

  1. 必要な詳細を入力してコンポーネントの構成を更新します。KinesisStream パラメータには、先ほど作成した Kinesis Data Stream の名前を設定します。

構成の更新

  1. 構成を確認し、コンポーネントをデプロイします。

コンポーネントのデプロイ

デプロイが完了すると、コンポーネントは Greengrass Stream から指定された Kinesis Data Stream へデータをストリーム配信します。

テスト

セットアップを検証するには、以下のコマンドを使用して S3 バケット内のオブジェクトを確認します。

aws s3 cp s3://<YOUR_BUCKET_NAME>/... ./

取得したデータは、以下の例のような形式になっているはずです。

{
  "propertyAlias": "/MyObject/MyVariable",
  "propertyValues": [
    {
      "value": {
        "doubleValue": 7.699999999999997
      },
      "timestamp": {
        "timeInSeconds": 1661581962,
        "offsetInNanos": 9000000
      },
      "quality": "GOOD"
    }
  ]
}

このようなデータが表示されている場合、システムは OPC UA データを Kinesis Data Stream に正常にストリーム配信し、S3 に永続化できています。

まとめ

この投稿では、EC2 ベースの OPC UA サーバーと AWS SiteWise Edge Gateway を使用して、OPC UA データを Kinesis Data Streams にストリーム配信する手順を説明しました。Greengrass V2KinesisSiteWise などの AWS サービスを活用することで、産業データをスケーラブルなクラウドインフラへ効率的に統合し、永続化することが可能です。

Happy Coding! 🚀

岩佐 孝浩

岩佐 孝浩

Software Developer at KAKEHASHI Inc.
AWS を活用したクラウドネイティブ・アプリケーションの要件定義・設計・開発に従事。 株式会社カケハシで、処方箋データ収集の新たな基盤の構築に携わっています。 Japan AWS Top Engineers 2020-2023