How to Efficiently Monitor OPC UA Server Variable Nodes with Python
Introduction
You can subscribe to data changes on variable nodes in OPC UA servers using opcua-asyncio. With this feature, you can avoid OPC UA clients from continuously polling for new values, enhancing the efficiency of your implementation.
The example code can be cloned from my GitHub repository.
Requirements
Install opcua-asyncio with the following command:
pip install asyncua
Starting the OPC UA Server
Create a file named server.py
with the code below. This script starts an OPC UA testing server that writes random integer values to a variable node every second. The server endpoint is opc.tcp://localhost:4840
, and the variable node name is MyObject/MyVariable
.
import asyncio
import random
from asyncua import Server
ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'
async def main() -> None:
# Start a server.
server = Server()
await server.init()
server.set_endpoint(ENDPOINT)
idx = await server.register_namespace(NAMESPACE)
await server.start()
print(f'Server started: {server}')
# Create a node.
myobj = await server.get_objects_node().add_object(idx, 'MyObject')
myvar = await myobj.add_variable(idx, 'MyVariable', 1)
await myvar.set_writable()
# Write a new value every second.
while True:
await myvar.write_value(random.randint(1, 100))
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())
Run the server with:
$ python server.py
Server started: OPC UA Server(opc.tcp://localhost:4840)
Running the OPC UA Client for Subscription
Create another file named client.py
with the following code. This client script subscribes to data changes from the OPC UA server and processes them asynchronously.
import asyncio
from asyncua import Client, Node
from asyncua.common.subscription import DataChangeNotif, SubHandler
ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'
class MyHandler(SubHandler):
def __init__(self):
self._queue = asyncio.Queue()
def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
self._queue.put_nowait([node, value, data])
print(f'Data change notification was received and queued.')
async def process(self) -> None:
try:
while True:
# Get data from the queue.
[node, value, data] = self._queue.get_nowait()
path = await node.get_path(as_string=True)
# *** Write your processing code ***
print(f'New value {value} of "{path}" was processed.')
except asyncio.QueueEmpty:
pass
async def main() -> None:
async with Client(url=ENDPOINT) as client:
# Get a variable node.
idx = await client.get_namespace_index(NAMESPACE)
node = await client.get_objects_node().get_child([f'{idx}:MyObject', f'{idx}:MyVariable'])
# Subscribe to data change.
handler = MyHandler()
subscription = await client.create_subscription(period=0, handler=handler)
await subscription.subscribe_data_change(node)
# Process data changes every 100ms.
while True:
await handler.process()
await asyncio.sleep(0.1)
if __name__ == '__main__':
asyncio.run(main())
Run the client with:
$ python client.py
Data change notification was received and queued.
New value 4 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 79 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 75 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
...
Conclusion
By following the implementation detailed in this post, you can efficiently monitor OPC UA server variable nodes using a publish/subscribe architecture. This approach eliminates the need for continuous polling, optimizing the performance of OPC UA clients.
Happy Coding! 🚀