How to Efficiently Monitor OPC UA Server Variable Nodes with Python

Takahiro Iwasa
OPC-UA Python

Introduction

With opcua-asyncio, you can subscribe to data changes on variable nodes in an OPC UA server. Because the server notifies the client when a value changes, the client does not need to poll continuously for new values, which makes the implementation more efficient.

The example code can be cloned from my GitHub repository.

Requirements

Install opcua-asyncio with the following command:

pip install asyncua

Starting the OPC UA Server

Create a file named server.py with the code below. This script starts an OPC UA testing server that writes random integer values to a variable node every second. The server endpoint is opc.tcp://localhost:4840, and the variable node name is MyObject/MyVariable.

import asyncio
import random

from asyncua import Server

ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'


async def main() -> None:
    # Start a server.
    server = Server()
    await server.init()
    server.set_endpoint(ENDPOINT)
    idx = await server.register_namespace(NAMESPACE)
    await server.start()
    print(f'Server started: {server}')

    # Create a node.
    myobj = await server.get_objects_node().add_object(idx, 'MyObject')
    myvar = await myobj.add_variable(idx, 'MyVariable', 1)
    await myvar.set_writable()

    # Write a new value every second.
    while True:
        await myvar.write_value(random.randint(1, 100))
        await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(main())

Run the server with:

$ python server.py
Server started: OPC UA Server(opc.tcp://localhost:4840)

Running the OPC UA Client for Subscription

Create another file named client.py with the following code. This client script subscribes to data changes from the OPC UA server and processes them asynchronously.

import asyncio

from asyncua import Client, Node
from asyncua.common.subscription import DataChangeNotif, SubHandler

ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'


class MyHandler(SubHandler):
    def __init__(self):
        self._queue = asyncio.Queue()

    def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
        self._queue.put_nowait([node, value, data])
        print('Data change notification was received and queued.')

    async def process(self) -> None:
        try:
            while True:
                # Get data from the queue.
                [node, value, data] = self._queue.get_nowait()
                path = await node.get_path(as_string=True)

                # *** Write your processing code ***

                print(f'New value {value} of "{path}" was processed.')

        except asyncio.QueueEmpty:
            pass


async def main() -> None:
    async with Client(url=ENDPOINT) as client:
        # Get a variable node.
        idx = await client.get_namespace_index(NAMESPACE)
        node = await client.get_objects_node().get_child([f'{idx}:MyObject', f'{idx}:MyVariable'])

        # Subscribe to data change.
        handler = MyHandler()
        subscription = await client.create_subscription(period=0, handler=handler)
        await subscription.subscribe_data_change(node)

        # Process data changes every 100ms.
        while True:
            await handler.process()
            await asyncio.sleep(0.1)


if __name__ == '__main__':
    asyncio.run(main())
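
One detail of `process` may be worth a second look: it awaits `node.get_path(as_string=True)` for every notification, and in opcua-asyncio that lookup browses the server, so it costs network round trips each time. Since a node's path rarely changes, a small cache can avoid repeating the lookup. The following is a minimal sketch under stated assumptions, not part of the original example: `PathCache` and `fake_resolve` are hypothetical names, and `fake_resolve` stands in for the real `get_path` call so that the snippet runs without a server.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Hashable, List


class PathCache:
    """Remember the path of each key after the first lookup (hypothetical helper)."""

    def __init__(self, resolve: Callable[[Hashable], Awaitable[List[str]]]) -> None:
        self._resolve = resolve
        self._paths: Dict[Hashable, List[str]] = {}

    async def get(self, key: Hashable) -> List[str]:
        # Only call the (expensive) resolver the first time a key is seen.
        if key not in self._paths:
            self._paths[key] = await self._resolve(key)
        return self._paths[key]


async def demo() -> int:
    calls = 0

    async def fake_resolve(key: Hashable) -> List[str]:
        # Stand-in for `await node.get_path(as_string=True)`.
        nonlocal calls
        calls += 1
        return ['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']

    cache = PathCache(fake_resolve)
    for _ in range(3):
        await cache.get('MyVariable')
    return calls  # number of times the resolver actually ran


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

In the client, the key would presumably be the node's `nodeid`, and the resolver a small coroutine that calls `get_path` on the node. Treat that wiring as an assumption to verify against your asyncua version.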

Run the client with:

$ python client.py
Data change notification was received and queued.
New value 4 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 79 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 75 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
...
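
The client above wakes up every 100 ms and drains the queue with `get_nowait`. An alternative is to let the consumer wait on `await queue.get()`, so it only runs when a notification has actually been queued. The sketch below illustrates that pattern with the standard library only. `QueueingHandler`, `process_forever`, and the simulated values are hypothetical stand-ins for `MyHandler` and real server notifications, not part of the original example.

```python
import asyncio
from typing import Any, List


class QueueingHandler:
    """Stand-in for MyHandler that does not depend on asyncua."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def datachange_notification(self, node: Any, value: Any, data: Any) -> None:
        # Same idea as the original handler: enqueue and return immediately.
        self._queue.put_nowait((node, value, data))

    async def process_forever(self, results: List[Any]) -> None:
        while True:
            # Suspends until a notification is available; no fixed wake-up interval.
            node, value, data = await self._queue.get()
            results.append(value)  # *** your processing code would go here ***
            self._queue.task_done()

    async def join(self) -> None:
        # Wait until every queued notification has been processed.
        await self._queue.join()


async def demo() -> List[Any]:
    handler = QueueingHandler()
    results: List[Any] = []
    consumer = asyncio.create_task(handler.process_forever(results))

    # Simulate three notifications instead of connecting to a server.
    for value in (4, 79, 75):
        handler.datachange_notification('MyVariable', value, None)

    await handler.join()
    consumer.cancel()
    try:
        await consumer
    except asyncio.CancelledError:
        pass
    return results


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

In a real client, `main` could start `process_forever` as a task inside the `async with Client(...)` block and await it to keep the connection open. The trade-off is that the polling loop in the original is simpler to reason about, while the awaiting consumer reacts as soon as a value is queued.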

Conclusion

By following the implementation in this post, you can efficiently monitor OPC UA server variable nodes using data change subscriptions. Because the server pushes notifications to the client, the client no longer needs to poll continuously, which reduces the work it has to do.

Happy Coding! 🚀

Takahiro Iwasa

Software Developer at KAKEHASHI Inc.
Involved in the requirements definition, design, and development of cloud-native applications using AWS. Now, building a new prescription data collection platform at KAKEHASHI Inc. Japan AWS Top Engineers 2020-2023.