Commit 9a97a01d authored by Michel's avatar Michel Committed by ORD

Issue #226 (#232)

* Issue #226

Add server Event for client Item subscription

* Add exemple for Callback
parent 64122868
import sys
sys.path.insert(0, "..")
import logging
from datetime import datetime
try:
from IPython import embed
except ImportError:
import code
def embed():
vars = globals()
vars.update(locals())
shell = code.InteractiveConsole(vars)
shell.interact()
from opcua import ua, uamethod, Server
from opcua.common.callback import CallbackType
def create_monitored_items(event, dispatcher):
print("Monitored Item")
for idx in range(len(event.response_params)) :
if (event.response_params[idx].StatusCode.is_good()) :
nodeId = event.request_params.ItemsToCreate[idx].ItemToMonitor.NodeId
print("Node {} was created".format(nodeId))
def modify_monitored_items(event, dispatcher):
print('modify_monitored_items')
def delete_monitored_items(event, dispatcher):
print('delete_monitored_items')
if __name__ == "__main__":
# optional: setup logging
logging.basicConfig(level=logging.WARN)
#logger = logging.getLogger("opcua.address_space")
# logger.setLevel(logging.DEBUG)
#logger = logging.getLogger("opcua.internal_server")
# logger.setLevel(logging.DEBUG)
#logger = logging.getLogger("opcua.binary_server_asyncio")
# logger.setLevel(logging.DEBUG)
#logger = logging.getLogger("opcua.uaprocessor")
# logger.setLevel(logging.DEBUG)
logger = logging.getLogger("opcua.subscription_service")
logger.setLevel(logging.DEBUG)
# now setup our server
server = Server()
#server.disable_clock()
#server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
server.set_server_name("FreeOpcUa Example Server")
# setup our own namespace
uri = "http://examples.freeopcua.github.io"
idx = server.register_namespace(uri)
# get Objects node, this is where we should put our custom stuff
objects = server.get_objects_node()
# populating our address space
myfolder = objects.add_folder(idx, "myEmptyFolder")
myobj = objects.add_object(idx, "MyObject")
myvar = myobj.add_variable(idx, "MyVariable", 6.7)
myvar.set_writable() # Set MyVariable to be writable by clients
# starting!
server.start()
# Create Callback for item event
server.subscribe_server_callback(CallbackType.ItemSubscriptionCreated, create_monitored_items)
server.subscribe_server_callback(CallbackType.ItemSubscriptionModified, modify_monitored_items)
server.subscribe_server_callback(CallbackType.ItemSubscriptionDeleted, delete_monitored_items)
print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
try:
# enable following if you want to subscribe to nodes on server side
embed()
finally:
server.stop()
"""
server side implementation of callback event
"""
from collections import OrderedDict
from enum import Enum
class CallbackType(Enum):
'''
The possible types of a Callback type.
:ivar Null:
:ivar MonitoredItem:
'''
Null = 0
ItemSubscriptionCreated = 1
ItemSubscriptionModified = 2
ItemSubscriptionDeleted= 3
class Callback(object):
def __init__(self):
self.__name = None
def setName(self, name):
self.__name = name
def getName(self):
return self.__name
class ServerItemCallback(Callback):
def __init__(self, request_params, response_params):
self.request_params = request_params
self.response_params = response_params
class CallbackSubscriberInterface(object):
def getSubscribedEvents(self):
raise NotImplementedError()
class CallbackDispatcher(object):
def __init__(self):
self._listeners = {}
def dispatch(self, eventName, event=None):
if event is None:
event = Callback()
elif not isinstance(event, Callback):
raise ValueError('Unexpected event type given')
event.setName(eventName)
if eventName not in self._listeners:
return event
for listener in self._listeners[eventName].values():
listener(event, self)
return event
def addListener(self, eventName, listener, priority=0):
if eventName not in self._listeners:
self._listeners[eventName] = {}
self._listeners[eventName][priority] = listener
self._listeners[eventName] = OrderedDict(sorted(self._listeners[eventName].items(), key=lambda item: item[0]))
def removeListener(self, eventName, listener=None):
if eventName not in self._listeners:
return
if not listener:
del self._listeners[eventName]
else:
for p, l in self._listeners[eventName].items():
if l is listener:
self._listeners[eventName].pop(p)
return
def addSubscriber(self, subscriber):
if not isinstance(subscriber, CallbackSubscriberInterface):
raise ValueError('Unexpected subscriber type given')
for eventName, params in subscriber.getSubscribedEvents().items():
if isinstance(params, str):
self.addListener(eventName, getattr(subscriber, params))
elif isinstance(params, list):
if not params:
raise ValueError('Invalid params "%r" for event "%s"' % (params, eventName))
if len(params) <= 2 and isinstance(params[0], str):
priority = params[1] if len(params) > 1 else 0
self.addListener(eventName, getattr(subscriber, params[0]), priority)
else:
for listener in params:
priority = listener[1] if len(listener) > 1 else 0
self.addListener(eventName, getattr(subscriber, listener[0]), priority)
else:
raise ValueError('Invalid params for event "%s"' % eventName)
"""
Usefull methods and classes not depending on opcua library
"""
import logging import logging
import os import os
from concurrent.futures import Future from concurrent.futures import Future
...@@ -204,3 +199,6 @@ class ThreadLoop(threading.Thread): ...@@ -204,3 +199,6 @@ class ThreadLoop(threading.Thread):
p = functools.partial(self._run_until_complete, future, coro) p = functools.partial(self._run_until_complete, future, coro)
self.loop.call_soon_threadsafe(p) self.loop.call_soon_threadsafe(p)
return future.result() return future.result()
...@@ -17,6 +17,7 @@ except ImportError: ...@@ -17,6 +17,7 @@ except ImportError:
from opcua import ua from opcua import ua
from opcua.common import utils from opcua.common import utils
from opcua.common.callback import CallbackType, ServerItemCallback, CallbackDispatcher
from opcua.common.node import Node from opcua.common.node import Node
from opcua.server.history import HistoryManager from opcua.server.history import HistoryManager
from opcua.server.address_space import AddressSpace from opcua.server.address_space import AddressSpace
...@@ -46,6 +47,9 @@ class InternalServer(object): ...@@ -46,6 +47,9 @@ class InternalServer(object):
def __init__(self, cacheFile = None): def __init__(self, cacheFile = None):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.server_callback_dispatcher = CallbackDispatcher()
self.endpoints = [] self.endpoints = []
self._channel_id_counter = 5 self._channel_id_counter = 5
self.allow_remote_admin = True self.allow_remote_admin = True
...@@ -83,6 +87,8 @@ class InternalServer(object): ...@@ -83,6 +87,8 @@ class InternalServer(object):
uries = ["http://opcfoundation.org/UA/"] uries = ["http://opcfoundation.org/UA/"]
ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray)) ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
ns_node.set_value(uries) ns_node.set_value(uries)
def load_address_space(self, path): def load_address_space(self, path):
self.aspace.load(path) self.aspace.load(path)
...@@ -195,6 +201,18 @@ class InternalServer(object): ...@@ -195,6 +201,18 @@ class InternalServer(object):
""" """
source.unset_attr_bit(ua.AttributeIds.EventNotifier, ua.EventNotifier.HistoryRead) source.unset_attr_bit(ua.AttributeIds.EventNotifier, ua.EventNotifier.HistoryRead)
self.history_manager.dehistorize(source) self.history_manager.dehistorize(source)
def subscribe_server_callback(self, event, handle):
"""
Create a subscription from event to handle
"""
self.server_callback_dispatcher.addListener(event, handle)
def unsubscribe_server_callback(self, event, handle):
"""
Remove a subscription from event to handle
"""
self.server_callback_dispatcher.removeListener(event, handle)
class InternalSession(object): class InternalSession(object):
...@@ -307,12 +325,16 @@ class InternalSession(object): ...@@ -307,12 +325,16 @@ class InternalSession(object):
self.subscriptions.append(result.SubscriptionId) self.subscriptions.append(result.SubscriptionId)
return result return result
def create_monitored_items(self, params): def create_monitored_items(self, params):
return self.subscription_service.create_monitored_items(params) subscription_result = self.subscription_service.create_monitored_items(params)
self.iserver.server_callback_dispatcher.dispatch(CallbackType.ItemSubscriptionCreated, ServerItemCallback(params, subscription_result))
return subscription_result
def modify_monitored_items(self, params): def modify_monitored_items(self, params):
return self.subscription_service.modify_monitored_items(params) subscription_result = self.subscription_service.modify_monitored_items(params)
self.iserver.server_callback_dispatcher.dispatch(CallbackType.ItemSubscriptionModified, ServerItemCallback(params, subscription_result))
return subscription_result
def republish(self, params): def republish(self, params):
return self.subscription_service.republish(params) return self.subscription_service.republish(params)
...@@ -324,7 +346,9 @@ class InternalSession(object): ...@@ -324,7 +346,9 @@ class InternalSession(object):
return self.subscription_service.delete_subscriptions(ids) return self.subscription_service.delete_subscriptions(ids)
def delete_monitored_items(self, params): def delete_monitored_items(self, params):
return self.subscription_service.delete_monitored_items(params) subscription_result = self.subscription_service.delete_monitored_items(params)
self.iserver.server_callback_dispatcher.dispatch(CallbackType.ItemSubscriptionDeleted, ServerItemCallback(params, subscription_result))
return subscription_result
def publish(self, acks=None): def publish(self, acks=None):
if acks is None: if acks is None:
......
...@@ -377,3 +377,10 @@ class Server(object): ...@@ -377,3 +377,10 @@ class Server(object):
def dehistorize_node(self, node): def dehistorize_node(self, node):
self.iserver.disable_history(node) self.iserver.disable_history(node)
def subscribe_server_callback(self, event, handle):
self.iserver.subscribe_server_callback(event, handle)
def unsubscribe_server_callback(self, event, handle):
self.iserver.unsubscribe_server_callback(event, handle)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment