Commit 0f4e1289 authored by Olivier R-D's avatar Olivier R-D

DataChange subscription works with freeopcua server

parent f89718c7
......@@ -79,6 +79,7 @@ class BinaryClient(object):
symhdr = self._create_sym_algo_header()
seqhdr = self._create_sequence_header()
rcall = RequestCallback()
rcall.callback = callback
self._callbackmap[seqhdr.RequestId] = rcall
self._write_socket(hdr, symhdr, seqhdr, request)
if not callback:
......@@ -101,7 +102,7 @@ class BinaryClient(object):
def _receive_header(self):
self.logger.debug("Waiting for header")
header = ua.Header.from_stream(self._socket)
self.logger.debug("received header: %s", header)
self.logger.info("received header: %s", header)
return header
def _receive_body(self, size):
......@@ -338,14 +339,22 @@ class BinaryClient(object):
del(self._publishcallbacks[sid])
return response.Results
def publish(self, request):
def publish(self, acks=None):
self.logger.info("publish")
if acks is None:
acks = []
request = ua.PublishRequest()
request.SubscriptionAcknowledgements = acks
self._send_request(request, self._call_publish_callback)
def _call_publish_callback(self, rcall):
self.logger.debug("call_publish_callback")
self.logger.info("call_publish_callback")
response = ua.PublishResponse.from_binary(rcall.data)
self._publishcallbacks[response.SubscriptionId].callback(response.Results)
try:
self._publishcallbacks[response.Parameters.SubscriptionId](response.Parameters)
except Exception as ex: #we call client code, catch everything!
self.logger.warn("exception while calling user callback:", ex)
def create_monitored_items(self, params):
self.logger.info("subscribe_data_change")
......
"""
high level interface to subscriptions
"""
import io
import logging
import opcua.uaprotocol as ua
class SubscriptionItemData():
def __init__(self):
self.node = None
self.client_handle = None
self.server_handle = None
self.attribute = None
class Subscription(object):
def __init__(self, server, params, handler):
self.logger = logging.getLogger(self.__class__.__name__)
self.server = server
self._client_handle = 200
self._handler = handler
self.parameters = params #move to data class
response = self.server.create_subscription(params, handler)
self._monitoreditems_map = {}
response = self.server.create_subscription(params, self.publish_callback)
self.subscription_id = response.SubscriptionId #move to data class
self.server.publish(ua.PublishRequest())
self.server.publish(ua.PublishRequest())
self.server.publish()
self.server.publish()
def delete(self):
results = self.server.delete_subscriptions([self.subscription_id])
......@@ -22,6 +33,37 @@ class Subscription(object):
def publish_callback(self, publishresult):
self.logger.info("Publish callback called with result: %s", publishresult)
for notif in publishresult.NotificationMessage.NotificationData:
if notif.TypeId == ua.FourByteNodeId(ua.ObjectIds.DataChangeNotification_Encoding_DefaultBinary):
datachange = ua.DataChangeNotification.from_binary(io.BytesIO(notif.to_binary()))
self._call_datachange(datachange)
elif notif.TypeId == ua.FourByteNodeId(ua.ObjectIds.EventNotificationList_Encoding_DefaultBinary):
eventlist = ua.EventNotificationList.from_binary(io.BytesIO(notif.to_binary()))
self._call_event(eventlist)
elif notif.TypeId == ua.FourByteNodeId(ua.ObjectIds.StatusChangeNotification_Encoding_DefaultBinary):
statuschange = ua.StatusChangeNotification.from_binary(io.BytesIO(notif.to_binary()))
self._call_status(statuschange)
else:
self.logger.warn("Notification type not supported yet for notification %s", notif)
ack = ua.SubscriptionAcknowledgement()
ack.SubscriptionId = self.subscription_id
ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
self.server.publish([ack])
def _call_datachange(self, datachange):
for item in datachange.MonitoredItems:
data = self._monitoreditems_map[item.ClientHandle]
self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
def _call_event(self, eventlist):
print(eventlist)
self.logger.warn("Not implemented")
def _call_status(self, status):
print(status)
self.logger.warn("Not implemented")
def subscribe_data_change(self, node, attr=ua.AttributeIds.Value):
rv = ua.ReadValueId()
......@@ -44,6 +86,13 @@ class Subscription(object):
results = self.server.create_monitored_items(params)
result = results[0]
result.StatusCode.check()
data = SubscriptionItemData()
data.client_handle = mparams.ClientHandle
data.node = node
data.attribute = attr
data.server_handle = result.MonitoredItemId
self._monitoreditems_map[mparams.ClientHandle] = data
return result.MonitoredItemId
......
......@@ -8324,12 +8324,10 @@ class PublishRequest(object):
__repr__ = __str__
class PublishResponse(object):
class PublishResult(object):
'''
'''
def __init__(self):
self.TypeId = FourByteNodeId(ObjectIds.PublishResponse_Encoding_DefaultBinary)
self.ResponseHeader = ResponseHeader()
self.SubscriptionId = 0
self.AvailableSequenceNumbers = []
self.MoreNotifications = True
......@@ -8339,8 +8337,6 @@ class PublishResponse(object):
def to_binary(self):
packet = []
packet.append(self.TypeId.to_binary())
packet.append(self.ResponseHeader.to_binary())
packet.append(pack_uatype('UInt32', self.SubscriptionId))
packet.append(struct.pack('<i', len(self.AvailableSequenceNumbers)))
for fieldname in self.AvailableSequenceNumbers:
......@@ -8357,9 +8353,7 @@ class PublishResponse(object):
@staticmethod
def from_binary(data):
obj = PublishResponse()
obj.TypeId = NodeId.from_binary(data)
obj.ResponseHeader = ResponseHeader.from_binary(data)
obj = PublishResult()
obj.SubscriptionId = unpack_uatype('UInt32', data)
obj.AvailableSequenceNumbers = unpack_uatype_array('UInt32', data)
obj.MoreNotifications = unpack_uatype('Boolean', data)
......@@ -8375,9 +8369,7 @@ class PublishResponse(object):
return obj
def __str__(self):
return 'PublishResponse(' + 'TypeId:' + str(self.TypeId) + ', ' + \
'ResponseHeader:' + str(self.ResponseHeader) + ', ' + \
'SubscriptionId:' + str(self.SubscriptionId) + ', ' + \
return 'PublishResult(' + 'SubscriptionId:' + str(self.SubscriptionId) + ', ' + \
'AvailableSequenceNumbers:' + str(self.AvailableSequenceNumbers) + ', ' + \
'MoreNotifications:' + str(self.MoreNotifications) + ', ' + \
'NotificationMessage:' + str(self.NotificationMessage) + ', ' + \
......@@ -8386,6 +8378,36 @@ class PublishResponse(object):
__repr__ = __str__
class PublishResponse(object):
'''
'''
def __init__(self):
self.TypeId = FourByteNodeId(ObjectIds.PublishResponse_Encoding_DefaultBinary)
self.ResponseHeader = ResponseHeader()
self.Parameters = PublishResult()
def to_binary(self):
packet = []
packet.append(self.TypeId.to_binary())
packet.append(self.ResponseHeader.to_binary())
packet.append(self.Parameters.to_binary())
return b''.join(packet)
@staticmethod
def from_binary(data):
obj = PublishResponse()
obj.TypeId = NodeId.from_binary(data)
obj.ResponseHeader = ResponseHeader.from_binary(data)
obj.Parameters = PublishResult.from_binary(data)
return obj
def __str__(self):
return 'PublishResponse(' + 'TypeId:' + str(self.TypeId) + ', ' + \
'ResponseHeader:' + str(self.ResponseHeader) + ', ' + \
'Parameters:' + str(self.Parameters) + ')'
__repr__ = __str__
class RepublishParameters(object):
'''
'''
......
......@@ -188,6 +188,14 @@ class NodeId(object):
else:
raise Exception("NodeId: Could not guess type of NodeId, set NodeIdType")
def __eq__(self, node):
if self.Identifier == node.Identifier and self.NamespaceIndex == node.NamespaceIndex:
return True
return False
def __nq__(self, node):
return not self.__eq__(node)
@staticmethod
def from_string(string):
l = string.split(";")
......
......@@ -14,7 +14,7 @@ IgnoredEnums = []#["IdType", "NodeIdType"]
#we want to implement som struct by hand, to make better interface or simply because they are too complicated
IgnoredStructs = []#["NodeId", "ExpandedNodeId", "Variant", "QualifiedName", "DataValue", "LocalizedText"]#, "ExtensionObject"]
#by default we split requests and respons in header and parameters, but some are so simple we do not split them
NoSplitStruct = ["GetEndpointsResponse", "CloseSessionRequest", "AddNodesResponse", "BrowseResponse", "HistoryReadResponse", "HistoryUpdateResponse", "RegisterServerResponse", "CloseSecureChannelRequest", "CloseSecureChannelResponse", "CloseSessionRequest", "CloseSessionResponse", "UnregisterNodesResponse", "MonitoredItemModifyRequest", "MonitoredItemsCreateRequest", "ReadResponse", "WriteRequest", "WriteResponse", "TranslateBrowsePathsToNodeIdsRequest", "TranslateBrowsePathsToNodeIdsResponse", "DeleteSubscriptionsRequest", "DeleteSubscriptionsResponse", "PublishRequest", "PublishResponse", "CreateMonitoredItemsResponse"]
NoSplitStruct = ["GetEndpointsResponse", "CloseSessionRequest", "AddNodesResponse", "BrowseResponse", "HistoryReadResponse", "HistoryUpdateResponse", "RegisterServerResponse", "CloseSecureChannelRequest", "CloseSecureChannelResponse", "CloseSessionRequest", "CloseSessionResponse", "UnregisterNodesResponse", "MonitoredItemModifyRequest", "MonitoredItemsCreateRequest", "ReadResponse", "WriteRequest", "WriteResponse", "TranslateBrowsePathsToNodeIdsRequest", "TranslateBrowsePathsToNodeIdsResponse", "DeleteSubscriptionsRequest", "DeleteSubscriptionsResponse", "PublishRequest", "CreateMonitoredItemsResponse"]
OverrideTypes = {}#AttributeId": "AttributeID", "ResultMask": "BrowseResultMask", "NodeClassMask": "NodeClass", "AccessLevel": "VariableAccessLevel", "UserAccessLevel": "VariableAccessLevel", "NotificationData": "NotificationData"}
OverrideNames = {}#{"RequestHeader": "Header", "ResponseHeader": "Header", "StatusCode": "Status", "NodesToRead": "AttributesToRead"} # "MonitoringMode": "Mode",, "NotificationMessage": "Notification", "NodeIdType": "Type"}
......
......@@ -17,7 +17,7 @@ class SubHandler(object):
if __name__ == "__main__":
from IPython import embed
logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.WARN)
client = Client("opc.tcp://localhost:4841/freeopcua/server/")
try:
client.connect()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment