Commit 6ee90273 authored by ORD's avatar ORD

Merge pull request #115 from alkor/extension-objects

Support custom ExtensionObjects
parents 6c7b37f3 cd4d8265
...@@ -14344,7 +14344,11 @@ def extensionobject_from_binary(data): ...@@ -14344,7 +14344,11 @@ def extensionobject_from_binary(data):
if TypeId.Identifier == 0: if TypeId.Identifier == 0:
return None return None
elif TypeId.Identifier not in ExtensionClasses: elif TypeId.Identifier not in ExtensionClasses:
raise UAError("unknown ExtensionObject Type: {}".format(TypeId)) e = ExtensionObject()
e.TypeId = TypeId
e.Encoding = Encoding
e.Body = body.read(len(body))
return e
klass = ExtensionClasses[TypeId.Identifier] klass = ExtensionClasses[TypeId.Identifier]
if body is None: if body is None:
raise UAError("parsing ExtensionObject {} without data".format(klass.__name__)) raise UAError("parsing ExtensionObject {} without data".format(klass.__name__))
...@@ -14357,6 +14361,8 @@ def extensionobject_to_binary(obj): ...@@ -14357,6 +14361,8 @@ def extensionobject_to_binary(obj):
If obj is None, convert to empty ExtensionObject (TypeId = 0, no Body). If obj is None, convert to empty ExtensionObject (TypeId = 0, no Body).
Returns a binary string Returns a binary string
""" """
if isinstance(obj, ExtensionObject):
return obj.to_binary()
TypeId = NodeId() TypeId = NodeId()
Encoding = 0 Encoding = 0
Body = None Body = None
......
...@@ -647,6 +647,60 @@ class LocalizedText(FrozenClass): ...@@ -647,6 +647,60 @@ class LocalizedText(FrozenClass):
return False return False
class ExtensionObject(FrozenClass):
'''
Any UA object packed as an ExtensionObject
:ivar TypeId:
:vartype TypeId: NodeId
:ivar Body:
:vartype Body: bytes
'''
def __init__(self):
self.TypeId = NodeId()
self.Encoding = 0
self.Body = b''
self._freeze = True
def to_binary(self):
packet = []
if self.Body:
self.Encoding |= (1 << 0)
packet.append(self.TypeId.to_binary())
packet.append(pack_uatype('UInt8', self.Encoding))
if self.Body:
packet.append(pack_uatype('ByteString', self.Body))
return b''.join(packet)
@staticmethod
def from_binary(data):
obj = ExtensionObject()
obj.TypeId = NodeId.from_binary(data)
obj.Encoding = unpack_uatype('UInt8', data)
if obj.Encoding & (1 << 0):
obj.Body = unpack_uatype('ByteString', data)
return obj
@staticmethod
def from_object(obj):
ext = ExtensionObject()
oid = getattr(ObjectIds, "{}_Encoding_DefaultBinary".format(obj.__class__.__name__))
ext.TypeId = FourByteNodeId(oid)
ext.Body = obj.to_binary()
return ext
def __str__(self):
return 'ExtensionObject(' + 'TypeId:' + str(self.TypeId) + ', ' + \
'Encoding:' + str(self.Encoding) + ', ' + str(len(self.Body)) + ' bytes)'
__repr__ = __str__
class VariantType(Enum): class VariantType(Enum):
''' '''
......
...@@ -17,7 +17,11 @@ def extensionobject_from_binary(data): ...@@ -17,7 +17,11 @@ def extensionobject_from_binary(data):
if TypeId.Identifier == 0: if TypeId.Identifier == 0:
return None return None
elif TypeId.Identifier not in ExtensionClasses: elif TypeId.Identifier not in ExtensionClasses:
raise UAError("unknown ExtensionObject Type: {}".format(TypeId)) e = ExtensionObject()
e.TypeId = TypeId
e.Encoding = Encoding
e.Body = body.read(len(body))
return e
klass = ExtensionClasses[TypeId.Identifier] klass = ExtensionClasses[TypeId.Identifier]
if body is None: if body is None:
raise UAError("parsing ExtensionObject {} without data".format(klass.__name__)) raise UAError("parsing ExtensionObject {} without data".format(klass.__name__))
...@@ -30,6 +34,8 @@ def extensionobject_to_binary(obj): ...@@ -30,6 +34,8 @@ def extensionobject_to_binary(obj):
If obj is None, convert to empty ExtensionObject (TypeId = 0, no Body). If obj is None, convert to empty ExtensionObject (TypeId = 0, no Body).
Returns a binary string Returns a binary string
""" """
if isinstance(obj, ExtensionObject):
return obj.to_binary()
TypeId = NodeId() TypeId = NodeId()
Encoding = 0 Encoding = 0
Body = None Body = None
......
...@@ -205,6 +205,16 @@ class Unit(unittest.TestCase): ...@@ -205,6 +205,16 @@ class Unit(unittest.TestCase):
self.assertEqual(type(v1), type(v2)) self.assertEqual(type(v1), type(v2))
self.assertEqual(v1.VariantType, v2.VariantType) self.assertEqual(v1.VariantType, v2.VariantType)
def test_unknown_extension_object(self):
obj = ua.ExtensionObject()
obj.Body = b'example of data in custom format'
obj.TypeId = ua.NodeId.from_string('ns=3;i=42')
data = ua.utils.Buffer(extensionobject_to_binary(obj))
obj2 = ua.extensionobject_from_binary(data)
self.assertEqual(type(obj2), ua.ExtensionObject)
self.assertEqual(obj2.TypeId, obj.TypeId)
self.assertEqual(obj2.Body, b'example of data in custom format')
def test_datetime(self): def test_datetime(self):
now = datetime.utcnow() now = datetime.utcnow()
epch = ua.datetime_to_win_epoch(now) epch = ua.datetime_to_win_epoch(now)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment