Commit 39bf7855 authored by Olivier R-D's avatar Olivier R-D

server: subscription implementation part1

parent e73b22d9
......@@ -5,6 +5,18 @@ from opcua.server import Server
from IPython import embed
class SubHandler(object):
"""
Client to subscription. It will receive events from server
"""
def data_change(self, handle, node, val, attr):
print("Python: New data change event", handle, node, val, attr)
def event(self, handle, event):
print("Python: New event", handle, event)
if __name__ == "__main__":
logging.basicConfig(level=logging.WARN)
......@@ -14,9 +26,6 @@ if __name__ == "__main__":
server = Server()
server.set_endpoint("opc.tcp://localhost:4841/freeopcua/server/")
server.set_server_name("FreeOpcUa Example Server")
server.start()
print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
try:
root = server.get_root_node()
objects = server.get_objects_node()
myfolder = objects.add_folder(2, "myfolder")
......@@ -24,6 +33,13 @@ if __name__ == "__main__":
myobj = myfolder.add_object(2, "myobj")
myarrayvar = myobj.add_variable(2, "myarrayvar", [6.7, 7.9])
myprop = myobj.add_property(2, "myproperty", "I am a property")
server.start()
print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
try:
handler = SubHandler()
sub = server.create_subscription(500, handler)
sub.subscribe_data_change(myvar)
embed()
finally:
server.stop()
......
......@@ -3,11 +3,16 @@ import logging
from opcua import ua
class DataChangeCallbackData(object):
def __init__(self):
self.callback = None
self.client_handle = None
class AttributeValue(object):
def __init__(self, value):
self.value = value
self.value_callback = None
self.datachange_callback = None
self.datachange_callbacks = {}
def __str__(self):
return "AttributeValue({})".format(self.value)
......@@ -28,6 +33,8 @@ class AddressSpace(object):
self.logger = logging.getLogger(__name__)
self._nodes = {}
self._lock = RLock() #FIXME: should use multiple reader, one writter pattern
self._datachange_callback_counter = 200
self._handle_to_attribute_map = {}
def add_nodes(self, addnodeitems):
......@@ -144,10 +151,39 @@ class AddressSpace(object):
return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
attval = node.attributes[attr]
attval.value = value
if attval.datachange_callback:
if attval.value_callback:
return attval.value_callback(nodeid, attr, value)
for k, v in attval.datachange_callbacks.items():
try:
v(k, value)
except Exception as ex:
self.logger.warn("Error calling datachange callback %s, %s, %s", k, v, ex)
return ua.StatusCode()
def add_datachange_callback(self, nodeid, attr, callback):
with self._lock:
self.logger.debug("set attr callback: %s %s %s", nodeid, attr, callback)
if not nodeid in self._nodes:
return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
node = self._nodes[nodeid]
if not attr in node.attributes:
return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
attval = node.attributes[attr]
self._datachange_callback_counter += 1
handle = self._datachange_callback_counter
#cb = DataChangeCallbackData()
#cb.callback = callback
#cb.client_handle = handle
attval.datachange_callbacks[handle] = callback
self._handle_to_attribute_map[handle] = (nodeid, attr)
return ua.StatusCode(), handle
def delete_datachange_callback(self, handle):
nodeid, attr = self._handle_to_attribute_map.pop(handle)
self._nodes[nodeid].attributes[attr].datachange_callbacks.pop(handle)
def _add_nodeattributes(self, item, nodedata):
item = ua.downcast_extobject(item)
if item.SpecifiedAttributes & ua.NodeAttributesMask.AccessLevel:
......
......@@ -245,9 +245,6 @@ class UAProcessor(object):
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
else:
self.logger.warning("Uknown message received %s", typeid)
sf = ua.ServiceFault()
......
"""
Internal server to be used on server side
"""
import asyncio
from datetime import datetime
import uuid
import logging
from threading import RLock, Timer
from threading import RLock, Timer, Thread, Condition
from opcua import ua
from opcua import utils
......@@ -32,6 +33,152 @@ class Session(object):
def __str__(self):
return "InternalSession(id:{}, auth_token:{})".format(self.session_id, self.authentication_token)
class SubscriptionManager(Thread):
def __init__(self, aspace):
Thread.__init__(self)
self.logger = logging.getLogger(__name__)
self.loop = None
self.aspace = aspace
self.subscriptions = {}
self._sub_id_counter = 77
self._cond = Condition()
def start(self):
print("start internal")
Thread.start(self)
with self._cond:
self._cond.wait()
print("start internal finished")
def run(self):
self.logger.warn("Starting subscription thread")
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
with self._cond:
self._cond.notify_all()
self.loop.run_forever()
print("LOOP", self.loop)
def add_task(self, coroutine):
return self.loop.create_task(coroutine)
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def create_subscription(self, params, callback):
result = ua.CreateSubscriptionResult()
self._sub_id_counter += 1
result.SubscriptionId = self._sub_id_counter
result.RevisedPublishingInterval = params.RequestedPublishingInterval
result.RevisedLifetimeCount = params.RequestedLifetimeCount
result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
sub = Subscription(self, result, self.aspace, callback)
sub.start()
self.subscriptions[result.SubscriptionId] = sub
return result
def delete_subscriptions(self, ids):
res = []
for i in ids:
sub = self.subscriptions.pop(i)
sub.stop()
res.append(ua.StatusCode())
return res
def publish(self, acks):
self.logger.warn("publish request with acks %s", acks)
def create_monitored_items(self, params):
if not params.SubscriptionId in self.subscriptions:
res = []
for _ in params.ItemsToCreate:
response = ua.MonitoredItemCreateResult()
response.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
res.append(response)
return res
return self.subscriptions[params.SubscriptionId].create_monitored_items(params)
class MonitoredItemData(object):
def __init__(self):
self.client_handle = None
self.callback_handle = None
self.monitored_item_id = None
self.parameters = None
self.mode = None
class Subscription(object):
def __init__(self, manager, data, addressspace, callback):
self.logger = logging.getLogger(__name__)
self.aspace = addressspace
self.manager = manager
self.data = data
self.callback = callback
self.task = None
self._monitored_item_counter = 111
self._monitored_events = {}
self._monitored_datachange = {}
def start(self):
self.task = self.manager.add_task(self.loop())
def stop(self):
self.task.cancel()
@asyncio.coroutine
def loop(self):
self.logger.debug("starting subscription %s", self.data.SubscriptionId)
while True:
self.publish_results()
yield from asyncio.sleep(1)
def publish_results(self):
print("looking for results and publishing")
def create_monitored_items(self, params):
results = []
for item in params.ItemsToCreate:
results.append(self._create_monitored_item(item))
return results
def _create_monitored_item(self, params):
result = ua.MonitoredItemCreateResult()
result.RevisedSamplingInterval = self.data.RevisedPublishingInterval
result.RevisedQueueSize = params.RequestedParameters.QueueSize #FIXME check and use value
result.FilterResult = params.RequestedParameters.Filter
self._monitored_item_counter += 1
result.MonitoredItemId = self._monitored_item_counter
if params.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
self.logger.info("request to subscribe to events")
self._monitored_events[params.ItemToMonitor.NodeId] = result.MonitoredItemId
else:
self.logger.info("request to subscribe to datachange")
result.StatusCode, handle = self.aspace.add_datachange_callback(params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)
mdata = MonitoredItemData()
mdata.parameters = result
mdata.Mode = params.MonitoringMode
mdata.client_handle = params.RequestedParameters.ClientHandle
mdata.callback_handle = handle
mdata.monitored_item_id = result.MonitoredItemId
self._monitored_datachange[result.MonitoredItemId] = mdata
#FIXME force event generation
return result
def datachange_callback(self, handle, value):
self.logger.warn("subscription %s: datachange callback called with %s, %s", self, handle, value)
class InternalServer(object):
def __init__(self):
......@@ -53,14 +200,18 @@ class InternalServer(object):
#set some node values expected by some clients
self.current_time_node = Node(self, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
self._stopev = False
self.submanager = SubscriptionManager(self.aspace)
self._timer = None
def start(self):
Node(self, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)).set_value(0)
Node(self, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime)).set_value(datetime.now())
# set time every seconds, maybe we should disable it for performance reason??
self._set_current_time()
self.submanager.start()
def stop(self):
self.submanager.stop()
self._stopev = True
def _set_current_time(self):
......@@ -149,4 +300,15 @@ class InternalServer(object):
def add_nodes(self, params):
return self.aspace.add_nodes(params)
def create_subscription(self, params, callback):
return self.submanager.create_subscription(params, callback)
def create_monitored_items(self, params):
return self.submanager.create_monitored_items(params)
def publish(self, acks=None):
if acks is None:
acks = []
return self.submanager.publish(acks)
......@@ -57,6 +57,7 @@ class Server(object):
self.name = name
def start(self):
print("START SERVER")
self.iserver.start()
self._set_endpoints()
self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
......
......@@ -89,6 +89,7 @@ class Subscription(object):
results = self.server.create_monitored_items(params)
result = results[0]
print(result)
result.StatusCode.check()
data = SubscriptionItemData()
......
......@@ -3,7 +3,7 @@ from distutils.command.install_data import install_data
setup (name = "freeopcua",
version = "0.6.1",
version = "0.6.2",
description = "Pure Python OPC-UA client and server library",
author = "Olivier Roulet-Dubonnet",
author_email = "olivier.roulet@gmail.com",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment