Python で OPC UA サーバーの変数ノードを効率的に監視する方法
はじめに
opcua-asyncio を使用すると、OPC UA サーバーの変数ノードのデータ変更をサブスクライブできます。この機能により、OPC UA クライアントが新しい値を連続的にポーリングする必要がなくなり、実装の効率性が向上します。
サンプルコードは、GitHub リポジトリからクローンできます。
必要条件
以下のコマンドで opcua-asyncio をインストールしてください。
pip install asyncua
OPC UA サーバーを起動する
以下のコードで server.py
というファイルを作成します。このスクリプトは OPC UA テストサーバーを起動し、1 秒ごとに変数ノードにランダムな整数値を書き込みます。サーバーのエンドポイントは opc.tcp://localhost:4840
で、変数ノード名は MyObject/MyVariable
です。
import asyncio
import random
from asyncua import Server
ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'
async def main() -> None:
# サーバーを開始
server = Server()
await server.init()
server.set_endpoint(ENDPOINT)
idx = await server.register_namespace(NAMESPACE)
await server.start()
print(f'Server started: {server}')
# ノードを作成
myobj = await server.get_objects_node().add_object(idx, 'MyObject')
myvar = await myobj.add_variable(idx, 'MyVariable', 1)
await myvar.set_writable()
# 毎秒新しい値を書き込み
while True:
await myvar.write_value(random.randint(1, 100))
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())
以下のコマンドでサーバーを起動します。
$ python server.py
Server started: OPC UA Server(opc.tcp://localhost:4840)
OPC UA クライアントでサブスクライブを実行する
別のファイル client.py
を作成し、以下のコードを記述します。このクライアントスクリプトは、OPC UA サーバーのデータ変更をサブスクライブし、非同期で処理を行います。
import asyncio
from asyncua import Client, Node
from asyncua.common.subscription import DataChangeNotif, SubHandler
ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'
class MyHandler(SubHandler):
def __init__(self):
self._queue = asyncio.Queue()
def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
self._queue.put_nowait([node, value, data])
print(f'Data change notification was received and queued.')
async def process(self) -> None:
try:
while True:
# キューからデータを取得
[node, value, data] = self._queue.get_nowait()
path = await node.get_path(as_string=True)
# *** 処理コードを記述 ***
print(f'New value {value} of "{path}" was processed.')
except asyncio.QueueEmpty:
pass
async def main() -> None:
async with Client(url=ENDPOINT) as client:
# 変数ノードを取得
idx = await client.get_namespace_index(NAMESPACE)
node = await client.get_objects_node().get_child([f'{idx}:MyObject', f'{idx}:MyVariable'])
# データ変更をサブスクライブ
handler = MyHandler()
subscription = await client.create_subscription(period=0, handler=handler)
await subscription.subscribe_data_change(node)
# 100ms ごとにデータ変更を処理
while True:
await handler.process()
await asyncio.sleep(0.1)
if __name__ == '__main__':
asyncio.run(main())
以下のコマンドでクライアントを実行します。
$ python client.py
Data change notification was received and queued.
New value 4 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 79 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 75 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
...
まとめ
この記事で紹介した実装により、publish/subscribe アーキテクチャを用いて OPC UA サーバーの変数ノードを効率的に監視できます。この方法では連続的なポーリングが不要となり、OPC UA クライアントのパフォーマンスが最適化されます。
Happy Coding! 🚀