Python で OPC UA サーバーの変数ノードを効率的に監視する方法

Python で OPC UA サーバーの変数ノードを効率的に監視する方法

岩佐 孝浩
岩佐 孝浩
4 min read
OPC-UA Python

はじめに

opcua-asyncio を使用すると、OPC UA サーバーの変数ノードのデータ変更をサブスクライブできます。この機能により、OPC UA クライアントが新しい値を連続的にポーリングする必要がなくなり、実装の効率性が向上します。

サンプルコードは、GitHub リポジトリからクローンできます。

必要条件

以下のコマンドで opcua-asyncio をインストールしてください。

pip install asyncua

OPC UA サーバーを起動する

以下のコードで server.py というファイルを作成します。このスクリプトは OPC UA テストサーバーを起動し、1 秒ごとに変数ノードにランダムな整数値を書き込みます。サーバーのエンドポイントは opc.tcp://localhost:4840 で、変数ノード名は MyObject/MyVariable です。

import asyncio
import random

from asyncua import Server

ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'


async def main() -> None:
    # サーバーを開始
    server = Server()
    await server.init()
    server.set_endpoint(ENDPOINT)
    idx = await server.register_namespace(NAMESPACE)
    await server.start()
    print(f'Server started: {server}')

    # ノードを作成
    myobj = await server.get_objects_node().add_object(idx, 'MyObject')
    myvar = await myobj.add_variable(idx, 'MyVariable', 1)
    await myvar.set_writable()

    # 毎秒新しい値を書き込み
    while True:
        await myvar.write_value(random.randint(1, 100))
        await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(main())

以下のコマンドでサーバーを起動します。

$ python server.py
Server started: OPC UA Server(opc.tcp://localhost:4840)

OPC UA クライアントでサブスクライブを実行する

別のファイル client.py を作成し、以下のコードを記述します。このクライアントスクリプトは、OPC UA サーバーのデータ変更をサブスクライブし、非同期で処理を行います。

import asyncio

from asyncua import Client, Node
from asyncua.common.subscription import DataChangeNotif, SubHandler

ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'


class MyHandler(SubHandler):
    def __init__(self):
        self._queue = asyncio.Queue()

    def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
        self._queue.put_nowait([node, value, data])
        print(f'Data change notification was received and queued.')

    async def process(self) -> None:
        try:
            while True:
                # キューからデータを取得
                [node, value, data] = self._queue.get_nowait()
                path = await node.get_path(as_string=True)

                # *** 処理コードを記述 ***

                print(f'New value {value} of "{path}" was processed.')

        except asyncio.QueueEmpty:
            pass


async def main() -> None:
    async with Client(url=ENDPOINT) as client:
        # 変数ノードを取得
        idx = await client.get_namespace_index(NAMESPACE)
        node = await client.get_objects_node().get_child([f'{idx}:MyObject', f'{idx}:MyVariable'])

        # データ変更をサブスクライブ
        handler = MyHandler()
        subscription = await client.create_subscription(period=0, handler=handler)
        await subscription.subscribe_data_change(node)

        # 100ms ごとにデータ変更を処理
        while True:
            await handler.process()
            await asyncio.sleep(0.1)


if __name__ == '__main__':
    asyncio.run(main())

以下のコマンドでクライアントを実行します。

$ python client.py
Data change notification was received and queued.
New value 4 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 79 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 75 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
...

まとめ

この記事で紹介した実装により、publish/subscribe アーキテクチャを用いて OPC UA サーバーの変数ノードを効率的に監視できます。この方法では連続的なポーリングが不要となり、OPC UA クライアントのパフォーマンスが最適化されます。

Happy Coding! 🚀

岩佐 孝浩

岩佐 孝浩

Software Developer at KAKEHASHI Inc.
AWS を活用したクラウドネイティブ・アプリケーションの要件定義・設計・開発に従事。 株式会社カケハシで、処方箋データ収集の新たな基盤の構築に携わっています。 Japan AWS Top Engineers 2020-2023