Commit 11bcca6a authored by Yuta Okamoto's avatar Yuta Okamoto Committed by oroulet

add from_string() and to_string() to RelativePath

parent d14c44e6
from enum import IntEnum
import re
from typing import List, Optional, Tuple
from asyncua.ua.uatypes import NodeId, NodeIdType, RelativePath, RelativePathElement, QualifiedName
from asyncua.ua.object_ids import ObjectIds, ObjectIdNames
_NS_IDX_PATTERN = re.compile(r"([0-9]*):")
_REFERENCE_TYPE_PREFIX_CHARS = "/.<"
_REFERENCE_TYPE_SUFFIX_CHAR = ">"
_INVALID_NAME_CHARS = "!:<>/."
_RESERVED_CHARS = "/.<>:#!&"
class RelativePathElementType(IntEnum):
AnyHierarchical = 0
AnyComponent = 1
ForwardReference = 2
InverseReference = 3
class RelativePathElementFormatter:
_element_type: RelativePathElementType = RelativePathElementType.AnyHierarchical
_include_subtypes: bool = True
_target_name: Optional[QualifiedName] = None
_reference_type_name: Optional[QualifiedName] = None
def __init__(self, element: Optional[RelativePathElement] = None):
if element is not None:
self._include_subtypes = element.IncludeSubtypes
self._target_name = element.TargetName
if not element.IsInverse and element.IncludeSubtypes:
if element.ReferenceTypeId.Identifier == ObjectIds.HierarchicalReferences:
self._element_type = RelativePathElementType.AnyHierarchical
elif element.ReferenceTypeId.Identifier == ObjectIds.Aggregates:
self._element_type = RelativePathElementType.AnyComponent
else:
self._element_type = RelativePathElementType.ForwardReference
self._reference_type_name = _find_reference_type_name(element.ReferenceTypeId)
else:
if not element.IsInverse:
self._element_type = RelativePathElementType.ForwardReference
else:
self._element_type = RelativePathElementType.InverseReference
self._reference_type_name = _find_reference_type_name(element.ReferenceTypeId)
assert self._element_type is not None
@staticmethod
def parse(string: str) -> Tuple['RelativePathElementFormatter', str]:
el = RelativePathElementFormatter()
rest = string
head = _peek(rest)
if head == "/":
el._element_type = RelativePathElementType.AnyHierarchical
rest = rest[1:]
elif head == ".":
el._element_type = RelativePathElementType.AnyComponent
rest = rest[1:]
elif head == "<":
el._element_type = RelativePathElementType.ForwardReference
rest = rest[1:]
if _peek(rest) == "#":
el._include_subtypes = False
rest = rest[1:]
if _peek(rest) == "!":
el._element_type = RelativePathElementType.InverseReference
rest = rest[1:]
el._reference_type_name, rest = RelativePathElementFormatter._parse_name(rest, True)
else:
el._element_type = RelativePathElementType.AnyHierarchical
el._target_name, rest = RelativePathElementFormatter._parse_name(rest, False)
return el, rest
@staticmethod
def _parse_name(string: str, is_reference: bool) -> Tuple[Optional[QualifiedName], str]:
rest = string
# Extract namespace index if present.
idx = 0
m = _NS_IDX_PATTERN.match(rest)
if m:
idx_str = m.group(1)
if idx_str:
idx = int(idx_str)
rest = rest[m.end():]
# Extract rest of name.
name = []
head: str = ""
while len(rest) > 0:
head = _peek(rest)
if is_reference:
if head == _REFERENCE_TYPE_SUFFIX_CHAR:
rest = rest[1:]
break
elif head in _REFERENCE_TYPE_PREFIX_CHARS:
break
if head in _INVALID_NAME_CHARS:
raise ValueError(f"Unexpected character '{head}' in browse path.")
if head == "&":
if len(rest) > 1:
name.append(rest[1])
rest = rest[2:]
continue
else:
raise ValueError("Missing escaped character following '&' in browse path.")
else:
name.append(head)
rest = rest[1:]
if is_reference and head != ">":
raise ValueError("Missing '>' for reference type name in browse path.")
if len(name) == 0:
if is_reference:
raise ValueError("Reference type name is null in browse path.")
if idx == 0:
return None, rest
return QualifiedName("".join(name), idx), rest
def build(self) -> RelativePathElement:
reference_type_id: Optional[NodeId] = None
is_inverse = False
include_subtypes = self._include_subtypes
target_name = self._target_name
if self._element_type == RelativePathElementType.AnyHierarchical:
reference_type_id = NodeId(ObjectIds.HierarchicalReferences)
elif self._element_type == RelativePathElementType.AnyComponent:
reference_type_id = NodeId(ObjectIds.Aggregates)
elif self._element_type == RelativePathElementType.ForwardReference:
reference_type_id = _find_reference_type(self._reference_type_name)
elif self._element_type == RelativePathElementType.InverseReference:
reference_type_id = _find_reference_type(self._reference_type_name)
is_inverse = True
if reference_type_id is None:
raise ValueError(f"Could not convert BrowseName to a ReferenceTypeId: {self._reference_type_name}")
return RelativePathElement(ReferenceTypeId=reference_type_id, IsInverse=is_inverse, IncludeSubtypes=include_subtypes, TargetName=target_name)
def to_string(self) -> str:
path = []
# Append the reference type component.
if self._element_type == RelativePathElementType.AnyHierarchical:
path.append("/")
elif self._element_type == RelativePathElementType.AnyComponent:
path.append(".")
elif (
self._element_type == RelativePathElementType.ForwardReference or self._element_type == RelativePathElementType.InverseReference
):
if self._reference_type_name and self._reference_type_name.Name:
path.append("<")
if not self._include_subtypes:
path.append("#")
if self._element_type == RelativePathElementType.InverseReference:
path.append("!")
if self._reference_type_name.NamespaceIndex != 0:
path.append(f"{self._reference_type_name.NamespaceIndex}:")
path.append(_encode_name(self._reference_type_name.Name))
path.append(">")
# Append the target browse name component.
if self._target_name and self._target_name.Name:
if self._target_name.NamespaceIndex != 0:
path.append(f"{self._target_name.NamespaceIndex}:")
path.append(_encode_name(self._target_name.Name))
return "".join(path)
class RelativePathFormatter:
"""
Implementation of OPC-UA Specification Part 4: Services - A.2 BNF of RelativePath.
https://reference.opcfoundation.org/Core/Part4/v105/docs/A.2
"""
_elements: List[RelativePathElementFormatter]
def __init__(self, relative_path: Optional[RelativePath] = None):
self._elements = []
if relative_path:
self._elements = [RelativePathElementFormatter(el) for el in relative_path.Elements]
@staticmethod
def parse(string: str):
formatter = RelativePathFormatter()
if string:
rest = string
try:
while len(rest) > 0:
el, rest = RelativePathElementFormatter.parse(rest)
formatter._elements.append(el)
except Exception as e:
raise ValueError(f"Cannot parse relative path: {string}") from e
return formatter
def build(self) -> RelativePath:
return RelativePath(Elements=[el.build() for el in self._elements])
def to_string(self) -> str:
return "".join([el.to_string() for el in self._elements])
def _peek(string: str) -> Optional[str]:
return string[0] if len(string) > 0 else None
def _encode_name(name: str) -> str:
return "".join([ch if ch not in _RESERVED_CHARS else f"&{ch}" for ch in name])
def _find_reference_type(reference_type_name: QualifiedName) -> NodeId:
type_id = getattr(ObjectIds, reference_type_name.Name, None)
if type_id is not None:
return NodeId(Identifier=type_id, NamespaceIndex=0)
else:
raise ValueError('Non-standard ReferenceTypes are not supported.')
def _find_reference_type_name(reference_type_id: NodeId) -> QualifiedName:
if reference_type_id.Identifier in ObjectIdNames.keys():
id_type = reference_type_id.NodeIdType
if id_type == NodeIdType.TwoByte or id_type == NodeIdType.FourByte or id_type == NodeIdType.Numeric:
type_id: int = reference_type_id.Identifier
return QualifiedName.from_string(ObjectIdNames[type_id])
raise ValueError('Non-integer NodeIds are not supported.')
else:
raise ValueError('Non-standard ReferenceTypes are not supported.')
......@@ -13,6 +13,7 @@ from asyncua.ua.uatypes import SByte, Byte, Bytes, ByteString, Int16, Int32, Int
from asyncua.ua.uatypes import UInt64, Boolean, Float, Double, Null, String, CharArray, DateTime, Guid
from asyncua.ua.uatypes import AccessLevel, EventNotifier
from asyncua.ua.uatypes import LocalizedText, Variant, QualifiedName, StatusCode, DataValue
from asyncua.ua.uatypes import RelativePath, RelativePathElement
from asyncua.ua.uatypes import NodeId, FourByteNodeId, ExpandedNodeId, ExtensionObject, DiagnosticInfo
from asyncua.ua.uatypes import extension_object_typeids, extension_objects_by_typeid
from asyncua.ua.object_ids import ObjectIds
......@@ -6099,43 +6100,6 @@ class BrowseNextResponse:
self.ResponseHeader_ = val
@dataclass(frozen=FROZEN)
class RelativePathElement:
"""
https://reference.opcfoundation.org/v105/Core/docs/Part4/7.31
:ivar ReferenceTypeId:
:vartype ReferenceTypeId: NodeId
:ivar IsInverse:
:vartype IsInverse: Boolean
:ivar IncludeSubtypes:
:vartype IncludeSubtypes: Boolean
:ivar TargetName:
:vartype TargetName: QualifiedName
"""
data_type = NodeId(ObjectIds.RelativePathElement)
ReferenceTypeId: NodeId = field(default_factory=NodeId)
IsInverse: Boolean = True
IncludeSubtypes: Boolean = True
TargetName: QualifiedName = field(default_factory=QualifiedName)
@dataclass(frozen=FROZEN)
class RelativePath:
"""
https://reference.opcfoundation.org/v105/Core/docs/Part4/7.31
:ivar Elements:
:vartype Elements: RelativePathElement
"""
data_type = NodeId(ObjectIds.RelativePath)
Elements: List[RelativePathElement] = field(default_factory=list)
@dataclass(frozen=FROZEN)
class BrowsePath:
"""
......
......@@ -56,6 +56,7 @@ def type_is_union(uatype):
def type_is_list(uatype):
return get_origin(uatype) == list
def type_allow_subclass(uatype):
return get_origin(uatype) not in [Union, list, None]
......@@ -696,6 +697,55 @@ class QualifiedName:
return QualifiedName(Name=name, NamespaceIndex=idx)
@dataclass(frozen=False)
class RelativePathElement:
"""
https://reference.opcfoundation.org/v105/Core/docs/Part4/7.31
:ivar ReferenceTypeId:
:vartype ReferenceTypeId: NodeId
:ivar IsInverse:
:vartype IsInverse: Boolean
:ivar IncludeSubtypes:
:vartype IncludeSubtypes: Boolean
:ivar TargetName:
:vartype TargetName: QualifiedName
"""
data_type = NodeId(537)
ReferenceTypeId: NodeId = field(default_factory=NodeId)
IsInverse: Boolean = True
IncludeSubtypes: Boolean = True
TargetName: QualifiedName = field(default_factory=QualifiedName)
@dataclass(frozen=False)
class RelativePath:
"""
https://reference.opcfoundation.org/v105/Core/docs/Part4/7.31
:ivar Elements:
:vartype Elements: RelativePathElement
"""
data_type = NodeId(540)
Elements: List[RelativePathElement] = field(default_factory=list)
@staticmethod
def from_string(string: str):
from asyncua.ua.relative_path import RelativePathFormatter
return RelativePathFormatter.parse(string).build()
def to_string(self) -> str:
from asyncua.ua.relative_path import RelativePathFormatter
# Note: RelativePathFormatter will raise ValueError if ReferenceType is non-standard.
return RelativePathFormatter(self).to_string()
@dataclass(frozen=True, init=False)
class LocalizedText:
"""
......@@ -872,7 +922,7 @@ class Variant:
def __post_init__(self):
if self.is_array is None:
if isinstance(self.Value, (list, tuple)) or self.Dimensions :
if isinstance(self.Value, (list, tuple)) or self.Dimensions:
object.__setattr__(self, "is_array", True)
else:
object.__setattr__(self, "is_array", False)
......@@ -1027,7 +1077,7 @@ class DataValue:
Encoding: Byte = field(default=0, repr=False, init=False, compare=False)
Value: Optional[Variant] = None
StatusCode_: Optional[StatusCode] = field(default_factory=StatusCode)
SourceTimestamp: Optional[DateTime] = None # FIXME type DateType raises type hinting errors because datetime is assigned
SourceTimestamp: Optional[DateTime] = None # FIXME type DateType raises type hinting errors because datetime is assigned
ServerTimestamp: Optional[DateTime] = None
SourcePicoseconds: Optional[UInt16] = None
ServerPicoseconds: Optional[UInt16] = None
......
......@@ -70,6 +70,7 @@ class CodeGenerator:
self.write('from asyncua.ua.uatypes import UInt64, Boolean, Float, Double, Null, String, CharArray, DateTime, Guid')
self.write('from asyncua.ua.uatypes import AccessLevel, EventNotifier ')
self.write('from asyncua.ua.uatypes import LocalizedText, Variant, QualifiedName, StatusCode, DataValue')
self.write('from asyncua.ua.uatypes import RelativePath, RelativePathElement')
self.write('from asyncua.ua.uatypes import NodeId, FourByteNodeId, ExpandedNodeId, ExtensionObject, DiagnosticInfo')
self.write('from asyncua.ua.uatypes import extension_object_typeids, extension_objects_by_typeid')
self.write('from asyncua.ua.object_ids import ObjectIds')
......
import pytest
from asyncua.ua.object_ids import ObjectIds
from asyncua.ua.uatypes import RelativePath
def test_relative_path():
"""
The following examples from 1 to 7 are taken from OPC-UA Specification Part 4 - Services, A.2 BNF of RelativePath.
https://reference.opcfoundation.org/Core/Part4/v105/docs/A.2
"""
path1 = RelativePath.from_string("/2:Block&.Output")
assert 1 == len(path1.Elements)
assert path1.Elements[0].ReferenceTypeId.NamespaceIndex == 0
assert path1.Elements[0].ReferenceTypeId.Identifier == ObjectIds.HierarchicalReferences
assert path1.Elements[0].IncludeSubtypes is True
assert path1.Elements[0].IsInverse is False
assert path1.Elements[0].TargetName.NamespaceIndex == 2
assert path1.Elements[0].TargetName.Name == "Block.Output"
assert path1.to_string() == "/2:Block&.Output"
path1_1 = RelativePath.from_string(".2:Block&.Output")
assert 1 == len(path1.Elements)
assert path1_1.Elements[0].ReferenceTypeId.NamespaceIndex == 0
assert path1_1.Elements[0].ReferenceTypeId.Identifier == ObjectIds.Aggregates
assert path1_1.Elements[0].IncludeSubtypes is True
assert path1_1.Elements[0].IsInverse is False
assert path1_1.Elements[0].TargetName.NamespaceIndex == 2
assert path1_1.Elements[0].TargetName.Name == "Block.Output"
assert path1_1.to_string() == ".2:Block&.Output"
path2 = RelativePath.from_string("/3:Truck.0:NodeVersion")
assert 2 == len(path2.Elements)
assert path2.Elements[0].ReferenceTypeId.NamespaceIndex == 0
assert path2.Elements[0].ReferenceTypeId.Identifier == ObjectIds.HierarchicalReferences
assert path2.Elements[0].IncludeSubtypes is True
assert path2.Elements[0].IsInverse is False
assert path2.Elements[0].TargetName.NamespaceIndex == 3
assert path2.Elements[0].TargetName.Name == "Truck"
assert path2.Elements[1].ReferenceTypeId.NamespaceIndex == 0
assert path2.Elements[1].ReferenceTypeId.Identifier == ObjectIds.Aggregates
assert path2.Elements[1].IncludeSubtypes is True
assert path2.Elements[1].IsInverse is False
assert path2.Elements[1].TargetName.NamespaceIndex == 0
assert path2.Elements[1].TargetName.Name == "NodeVersion"
assert path2.to_string() == "/3:Truck.NodeVersion"
# TODO: Fix to use <1:ConnectedTo> when the non-standard reference types are supported.
path3 = RelativePath.from_string("<0:HasChild>1:Boiler/1:HeatSensor")
assert 2 == len(path3.Elements)
assert path3.Elements[0].ReferenceTypeId.NamespaceIndex == 0
assert path3.Elements[0].ReferenceTypeId.Identifier == ObjectIds.HasChild
assert path3.Elements[0].IncludeSubtypes is True
assert path3.Elements[0].IsInverse is False
assert path3.Elements[0].TargetName.NamespaceIndex == 1
assert path3.Elements[0].TargetName.Name == "Boiler"
assert path3.Elements[1].ReferenceTypeId.NamespaceIndex == 0
assert path3.Elements[1].ReferenceTypeId.Identifier == ObjectIds.HierarchicalReferences
assert path3.Elements[1].IncludeSubtypes is True
assert path3.Elements[1].IsInverse is False
assert path3.Elements[1].TargetName.NamespaceIndex == 1
assert path3.Elements[1].TargetName.Name == "HeatSensor"
assert path3.to_string() == "<HasChild>1:Boiler/1:HeatSensor"
# TODO: Fix to use <1:ConnectedTo> when the non-standard reference types are supported.
path4 = RelativePath.from_string("<0:HasChild>1:Boiler/")
assert 2 == len(path4.Elements)
assert path4.Elements[0].ReferenceTypeId.NamespaceIndex == 0
assert path4.Elements[0].ReferenceTypeId.Identifier == ObjectIds.HasChild
assert path4.Elements[0].IncludeSubtypes is True
assert path4.Elements[0].IsInverse is False
assert path4.Elements[0].TargetName.NamespaceIndex == 1
assert path4.Elements[0].TargetName.Name == "Boiler"
assert path4.Elements[1].ReferenceTypeId.NamespaceIndex == 0
assert path4.Elements[1].ReferenceTypeId.Identifier == ObjectIds.HierarchicalReferences
assert path4.Elements[1].IncludeSubtypes is True
assert path4.Elements[1].IsInverse is False
assert path4.Elements[1].TargetName is None
assert path4.to_string() == "<HasChild>1:Boiler/"
path5 = RelativePath.from_string("<0:HasChild>2:Wheel")
assert 1 == len(path5.Elements)
assert path5.Elements[0].ReferenceTypeId.NamespaceIndex == 0
assert path5.Elements[0].ReferenceTypeId.Identifier == ObjectIds.HasChild
assert path5.Elements[0].IncludeSubtypes is True
assert path5.Elements[0].IsInverse is False
assert path5.Elements[0].TargetName.NamespaceIndex == 2
assert path5.Elements[0].TargetName.Name == "Wheel"
assert path5.to_string() == "<HasChild>2:Wheel"
path6 = RelativePath.from_string("<!HasChild>Truck")
assert 1 == len(path6.Elements)
assert path6.Elements[0].ReferenceTypeId.NamespaceIndex == 0
assert path6.Elements[0].ReferenceTypeId.Identifier == ObjectIds.HasChild
assert path6.Elements[0].IncludeSubtypes is True
assert path6.Elements[0].IsInverse is True
assert path6.Elements[0].TargetName.NamespaceIndex == 0
assert path6.Elements[0].TargetName.Name == "Truck"
assert path6.to_string() == "<!HasChild>Truck"
path7 = RelativePath.from_string("<0:HasChild>")
assert 1 == len(path7.Elements)
assert path7.Elements[0].ReferenceTypeId.NamespaceIndex == 0
assert path7.Elements[0].ReferenceTypeId.Identifier == ObjectIds.HasChild
assert path7.Elements[0].IncludeSubtypes is True
assert path7.Elements[0].IsInverse is False
assert path7.Elements[0].TargetName is None
assert path7.to_string() == "<HasChild>"
path8 = RelativePath.from_string("<#0:HasChild>Truck")
assert 1 == len(path8.Elements)
assert path8.Elements[0].ReferenceTypeId.NamespaceIndex == 0
assert path8.Elements[0].ReferenceTypeId.Identifier == ObjectIds.HasChild
assert path8.Elements[0].IncludeSubtypes is False
assert path8.Elements[0].IsInverse is False
assert path8.Elements[0].TargetName.NamespaceIndex == 0
assert path8.Elements[0].TargetName.Name == "Truck"
assert path8.to_string() == "<#HasChild>Truck"
path9 = RelativePath.from_string("<#!0:HasChild>Truck")
assert 1 == len(path9.Elements)
assert path9.Elements[0].ReferenceTypeId.NamespaceIndex == 0
assert path9.Elements[0].ReferenceTypeId.Identifier == ObjectIds.HasChild
assert path9.Elements[0].IncludeSubtypes is False
assert path9.Elements[0].IsInverse is True
assert path9.Elements[0].TargetName.NamespaceIndex == 0
assert path9.Elements[0].TargetName.Name == "Truck"
assert path9.to_string() == "<#!HasChild>Truck"
def test_relative_path_with_non_standard_reference_type():
# TODO: Remove after the non-standard reference types are supported.
with pytest.raises(ValueError):
RelativePath.from_string("<1:ConnectedTo>1:Boiler/1:HeatSensor")
with pytest.raises(ValueError):
RelativePath.from_string("<1:ConnectedTo>1:Boiler/")
def test_relative_path_with_invalid_format():
with pytest.raises(ValueError):
RelativePath.from_string("/1:<Boiler") # Non-escaped '<' is invalid.
with pytest.raises(ValueError):
RelativePath.from_string("/1:Boiler&") # '&' is appeared without a follwing character.
with pytest.raises(ValueError):
RelativePath.from_string("<0:HasChild") # Closing delimiter '>' is missing.
with pytest.raises(ValueError):
RelativePath.from_string("<0:>1:Boiler") # Empty reference type name
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment