Commit 5f19a6de authored by ratara's avatar ratara Committed by oroulet

introduced type hinting into address_space.py for online discussion

parent a17138c9
from __future__ import annotations
import asyncio
import collections
import logging
import pickle
import shelve
import logging
import collections
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import partial
from typing import TYPE_CHECKING
from asyncua.ua.attribute_ids import AttributeIds
from asyncua.ua.uaprotocol_auto import RelativePathElement
if TYPE_CHECKING:
from typing import Callable, Dict, List, Union
from asyncua.ua.uaprotocol_auto import AddNodesItem
from asyncua.ua.uatypes import ExtensionObject, NodeId, DataValue
from asyncua.ua.uaprotocol_auto import BrowsePath
from asyncua import ua
from .users import User, UserRole
_logger = logging.getLogger(__name__)
class AttributeValue(object):
def __init__(self, value):
# FIXME: What is the purpose of this class?
def __init__(self, value: DataValue):
self.value = value
self.value_callback = None
self.value_callback: Union[Callable[[], None], None] = None
self.datachange_callbacks = {}
def __str__(self):
......@@ -26,10 +40,11 @@ class AttributeValue(object):
class NodeData:
def __init__(self, nodeid):
# FIXME: What is the purpose of this class?
def __init__(self, nodeid: NodeId):
self.nodeid = nodeid
self.attributes = {}
self.references = []
self.attributes: Dict[AttributeIds, AttributeValue] = {}
self.references: List[ua.ReferenceDescription] = []
self.call = None
def __str__(self):
......@@ -39,18 +54,19 @@ class NodeData:
class AttributeService:
def __init__(self, aspace: "AddressSpace"):
# FIXME: What is the purpose of this class?
def __init__(self, aspace: AddressSpace):
self.logger = logging.getLogger(__name__)
self._aspace: "AddressSpace" = aspace
self._aspace: AddressSpace = aspace
def read(self, params):
def read(self, params) -> List[DataValue]:
# self.logger.debug("read %s", params)
res = []
res: List[DataValue] = []
for readvalue in params.NodesToRead:
res.append(self._aspace.read_attribute_value(readvalue.NodeId, readvalue.AttributeId))
return res
async def write(self, params, user=User(role=UserRole.Admin)):
async def write(self, params, user: User = User(role=UserRole.Admin)):
# self.logger.debug("write %s as user %s", params, user)
res = []
for writevalue in params.NodesToWrite:
......@@ -74,11 +90,12 @@ class AttributeService:
class ViewService(object):
def __init__(self, aspace: "AddressSpace"):
# FIXME: What is the purpose of this class?
def __init__(self, aspace: AddressSpace):
self.logger = logging.getLogger(__name__)
self._aspace: "AddressSpace" = aspace
self._aspace: AddressSpace = aspace
def browse(self, params):
def browse(self, params): # fixme: What type is 'params'?
# self.logger.debug("browse %s", params)
res = []
for desc in params.NodesToBrowse:
......@@ -111,7 +128,6 @@ class ViewService(object):
return True
def _suitable_reftype(self, ref1, ref2, subtypes):
""""""
if ref1 == ua.NodeId(ua.ObjectIds.Null):
# If ReferenceTypeId is not specified in the BrowseDescription,
# all References are returned and includeSubtypes is ignored.
......@@ -144,14 +160,14 @@ class ViewService(object):
return True
return False
def translate_browsepaths_to_nodeids(self, browsepaths):
def translate_browsepaths_to_nodeids(self, browsepaths: List[BrowsePath]):
# self.logger.debug("translate browsepath: %s", browsepaths)
results = []
for path in browsepaths:
results.append(self._translate_browsepath_to_nodeid(path))
return results
def _translate_browsepath_to_nodeid(self, path):
def _translate_browsepath_to_nodeid(self, path: BrowsePath):
# self.logger.debug("looking at path: %s", path)
res = ua.BrowsePathResult()
if not path.RelativePath.Elements[-1].TargetName:
......@@ -174,7 +190,7 @@ class ViewService(object):
# FIXME: might need to order these one way or another
return res
def _navigate(self, start_nodeid, elements):
def _navigate(self, start_nodeid: NodeId, elements: List[RelativePathElement]) -> List[NodeId]:
current_nodeids = [start_nodeid]
for el in elements:
new_currents = []
......@@ -187,8 +203,8 @@ class ViewService(object):
current_nodeids = new_currents
return current_nodeids
def _find_elements_in_node(self, el, nodeid):
nodedata = self._aspace[nodeid]
def _find_elements_in_node(self, el: RelativePathElement, nodeid: NodeId):
nodedata: NodeData = self._aspace[nodeid]
nodeids = []
for ref in nodedata.references:
if ref.BrowseName != el.TargetName:
......@@ -205,23 +221,24 @@ class ViewService(object):
class NodeManagementService:
def __init__(self, aspace: "AddressSpace"):
# FIXME: What is the purpose of this class?
def __init__(self, aspace: AddressSpace):
self.logger = logging.getLogger(__name__)
self._aspace: "AddressSpace" = aspace
self._aspace: AddressSpace = aspace
def add_nodes(self, addnodeitems, user=User(role=UserRole.Admin)):
def add_nodes(self, addnodeitems: List[ua.AddNodesItem], user: User = User(role=UserRole.Admin)):
results = []
for item in addnodeitems:
results.append(self._add_node(item, user))
return results
def try_add_nodes(self, addnodeitems, user=User(role=UserRole.Admin), check=True):
def try_add_nodes(self, addnodeitems: List[ua.AddNodesItem], user: User = User(role=UserRole.Admin), check: bool = True):
for item in addnodeitems:
ret = self._add_node(item, user, check=check)
if not ret.StatusCode.is_good():
yield item
def _add_node(self, item, user, check=True):
def _add_node(self, item: ua.AddNodesItem, user: User, check: bool = True):
# self.logger.debug("Adding node %s %s", item.RequestedNewNodeId, item.BrowseName)
result = ua.AddNodesResult()
......@@ -292,7 +309,7 @@ class NodeManagementService:
return result
def _add_node_attributes(self, nodedata, item, add_timestamps):
def _add_node_attributes(self, nodedata: NodeData, item: AddNodesItem, add_timestamps: bool):
# add common attrs
nodedata.attributes[ua.AttributeIds.NodeId] = AttributeValue(
ua.DataValue(ua.Variant(nodedata.nodeid, ua.VariantType.NodeId))
......@@ -306,7 +323,7 @@ class NodeManagementService:
# add requested attrs
self._add_nodeattributes(item.NodeAttributes, nodedata, add_timestamps)
def _add_unique_reference(self, nodedata, desc):
def _add_unique_reference(self, nodedata: NodeData, desc: ua.ReferenceDescription):
for r in nodedata.references:
if r.ReferenceTypeId == desc.ReferenceTypeId and r.NodeId == desc.NodeId:
if r.IsForward != desc.IsForward:
......@@ -337,7 +354,7 @@ class NodeManagementService:
addref.IsForward = False
self._add_reference_no_check(nodedata, addref)
def _add_type_definition(self, nodedata, item):
def _add_type_definition(self, nodedata: NodeData, item: ua.AddNodesItem):
addref = ua.AddReferencesItem()
addref.SourceNodeId = nodedata.nodeid
addref.IsForward = True
......@@ -404,7 +421,7 @@ class NodeManagementService:
return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
return self._add_reference_no_check(sourcedata, addref)
def _add_reference_no_check(self, sourcedata, addref):
def _add_reference_no_check(self, sourcedata: NodeData, addref: ua.AddReferencesItem):
rdesc = ua.ReferenceDescription()
rdesc.ReferenceTypeId = addref.ReferenceTypeId
rdesc.IsForward = addref.IsForward
......@@ -463,31 +480,31 @@ class NodeManagementService:
)
nodedata.attributes[getattr(ua.AttributeIds, name)] = AttributeValue(dv)
def _add_nodeattributes(self, item, nodedata, add_timestamps):
self._add_node_attr(item, nodedata, "AccessLevel", ua.VariantType.Byte)
self._add_node_attr(item, nodedata, "ArrayDimensions", ua.VariantType.UInt32, is_array=True)
self._add_node_attr(item, nodedata, "BrowseName", ua.VariantType.QualifiedName)
self._add_node_attr(item, nodedata, "ContainsNoLoops", ua.VariantType.Boolean)
self._add_node_attr(item, nodedata, "DataType", ua.VariantType.NodeId)
self._add_node_attr(item, nodedata, "Description", ua.VariantType.LocalizedText)
self._add_node_attr(item, nodedata, "DisplayName", ua.VariantType.LocalizedText)
self._add_node_attr(item, nodedata, "EventNotifier", ua.VariantType.Byte)
self._add_node_attr(item, nodedata, "Executable", ua.VariantType.Boolean)
self._add_node_attr(item, nodedata, "Historizing", ua.VariantType.Boolean)
self._add_node_attr(item, nodedata, "InverseName", ua.VariantType.LocalizedText)
self._add_node_attr(item, nodedata, "IsAbstract", ua.VariantType.Boolean)
self._add_node_attr(item, nodedata, "MinimumSamplingInterval", ua.VariantType.Double)
self._add_node_attr(item, nodedata, "NodeClass", ua.VariantType.Int32)
self._add_node_attr(item, nodedata, "NodeId", ua.VariantType.NodeId)
self._add_node_attr(item, nodedata, "Symmetric", ua.VariantType.Boolean)
self._add_node_attr(item, nodedata, "UserAccessLevel", ua.VariantType.Byte)
self._add_node_attr(item, nodedata, "UserExecutable", ua.VariantType.Boolean)
self._add_node_attr(item, nodedata, "UserWriteMask", ua.VariantType.Byte)
self._add_node_attr(item, nodedata, "ValueRank", ua.VariantType.Int32)
self._add_node_attr(item, nodedata, "WriteMask", ua.VariantType.UInt32)
self._add_node_attr(item, nodedata, "UserWriteMask", ua.VariantType.UInt32)
self._add_node_attr(item, nodedata, "DataTypeDefinition", ua.VariantType.ExtensionObject)
self._add_node_attr(item, nodedata, "Value", add_timestamps=add_timestamps)
def _add_nodeattributes(self, node_attributes: ExtensionObject, nodedata: NodeData, add_timestamps: bool):
self._add_node_attr(node_attributes, nodedata, "AccessLevel", ua.VariantType.Byte)
self._add_node_attr(node_attributes, nodedata, "ArrayDimensions", ua.VariantType.UInt32, is_array=True)
self._add_node_attr(node_attributes, nodedata, "BrowseName", ua.VariantType.QualifiedName)
self._add_node_attr(node_attributes, nodedata, "ContainsNoLoops", ua.VariantType.Boolean)
self._add_node_attr(node_attributes, nodedata, "DataType", ua.VariantType.NodeId)
self._add_node_attr(node_attributes, nodedata, "Description", ua.VariantType.LocalizedText)
self._add_node_attr(node_attributes, nodedata, "DisplayName", ua.VariantType.LocalizedText)
self._add_node_attr(node_attributes, nodedata, "EventNotifier", ua.VariantType.Byte)
self._add_node_attr(node_attributes, nodedata, "Executable", ua.VariantType.Boolean)
self._add_node_attr(node_attributes, nodedata, "Historizing", ua.VariantType.Boolean)
self._add_node_attr(node_attributes, nodedata, "InverseName", ua.VariantType.LocalizedText)
self._add_node_attr(node_attributes, nodedata, "IsAbstract", ua.VariantType.Boolean)
self._add_node_attr(node_attributes, nodedata, "MinimumSamplingInterval", ua.VariantType.Double)
self._add_node_attr(node_attributes, nodedata, "NodeClass", ua.VariantType.Int32)
self._add_node_attr(node_attributes, nodedata, "NodeId", ua.VariantType.NodeId)
self._add_node_attr(node_attributes, nodedata, "Symmetric", ua.VariantType.Boolean)
self._add_node_attr(node_attributes, nodedata, "UserAccessLevel", ua.VariantType.Byte)
self._add_node_attr(node_attributes, nodedata, "UserExecutable", ua.VariantType.Boolean)
self._add_node_attr(node_attributes, nodedata, "UserWriteMask", ua.VariantType.Byte)
self._add_node_attr(node_attributes, nodedata, "ValueRank", ua.VariantType.Int32)
self._add_node_attr(node_attributes, nodedata, "WriteMask", ua.VariantType.UInt32)
self._add_node_attr(node_attributes, nodedata, "UserWriteMask", ua.VariantType.UInt32)
self._add_node_attr(node_attributes, nodedata, "DataTypeDefinition", ua.VariantType.ExtensionObject)
self._add_node_attr(node_attributes, nodedata, "Value", add_timestamps=add_timestamps)
class MethodService:
......@@ -554,7 +571,7 @@ class AddressSpace:
self._default_idx = 2
self._nodeid_counter = {0: 20000, 1: 2000}
def __getitem__(self, nodeid):
def __getitem__(self, nodeid): # question: Why do we use this 'fancy' python stuff? - It looks pretty complicated and is confusing for me.
return self._nodes.__getitem__(nodeid)
def get(self, nodeid):
......@@ -598,7 +615,7 @@ class AddressSpace:
def keys(self):
return self._nodes.keys()
def empty(self):
def empty(self): # FIXME: missleading name empty, better clear or reset, but could be braking change
"""Delete all nodes in address space"""
self._nodes = {}
......@@ -607,15 +624,21 @@ class AddressSpace:
Dump address space as binary to file; note that server must be stopped for this method to work
DO NOT DUMP AN ADDRESS SPACE WHICH IS USING A SHELF (load_aspace_shelf), ONLY CACHED NODES WILL GET DUMPED!
"""
# prepare nodes in address space for being serialized
self.__prepare_nodes_for_dump()
with open(path, 'wb') as f:
pickle.dump(self._nodes, f, pickle.HIGHEST_PROTOCOL)
def __prepare_nodes_for_dump(self):
"""
Removes unserialisable data from nodes.
* function calls
"""
for nodeid, ndata in self._nodes.items():
# if the node has a reference to a method call, remove it so the object can be serialized
if ndata.call is not None:
self._nodes[nodeid].call = None
with open(path, 'wb') as f:
pickle.dump(self._nodes, f, pickle.HIGHEST_PROTOCOL)
def load(self, path):
"""
Load address space from a binary file, overwriting everything in the current address space
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment