Commit b2302f93 authored by Fabian Beitler's avatar Fabian Beitler Committed by oroulet

Replace .format strings with f-strings

Remove unused import statements
Make code more PEP8 conform
parent 1907fe28
......@@ -86,9 +86,10 @@ class Client:
"""
_logger.info("find_endpoint %r %r %r", endpoints, security_mode, policy_uri)
for ep in endpoints:
if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and ep.SecurityMode == security_mode and ep.SecurityPolicyUri == policy_uri):
if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and ep.SecurityMode == security_mode and
ep.SecurityPolicyUri == policy_uri):
return ep
raise ua.UaError("No matching endpoints: {0}, {1}".format(security_mode, policy_uri))
raise ua.UaError(f"No matching endpoints: {security_mode}, {policy_uri}")
def set_user(self, username: str):
"""
......@@ -126,14 +127,14 @@ class Client:
return
parts = string.split(",")
if len(parts) < 4:
raise ua.UaError("Wrong format: `{}`, expected at least 4 comma-separated values".format(string))
raise ua.UaError(f"Wrong format: `{string}`, expected at least 4 comma-separated values")
if '::' in parts[3]: # if the filename contains a colon, assume it's a conjunction and parse it
parts[3], client_key_password = parts[3].split('::')
else:
client_key_password = None
policy_class = getattr(security_policies, "SecurityPolicy{}".format(parts[0]))
policy_class = getattr(security_policies, f"SecurityPolicy{parts[0]}")
mode = getattr(ua.MessageSecurityMode, parts[1])
return await self.set_security(policy_class, parts[2], parts[3], client_key_password,
parts[4] if len(parts) >= 5 else None, mode)
......@@ -276,7 +277,8 @@ class Client:
params.ClientNonce = create_nonce(self.security_policy.symmetric_key_size)
result = await self.uaclient.open_secure_channel(params)
if self.secure_channel_timeout != result.SecurityToken.RevisedLifetime:
_logger.info("Requested secure channel timeout to be %dms, got %dms instead", self.secure_channel_timeout, result.SecurityToken.RevisedLifetime)
_logger.info("Requested secure channel timeout to be %dms, got %dms instead",
self.secure_channel_timeout, result.SecurityToken.RevisedLifetime)
self.secure_channel_timeout = result.SecurityToken.RevisedLifetime
async def close_secure_channel(self):
......@@ -363,7 +365,8 @@ class Client:
self._policy_ids = ep.UserIdentityTokens
# Actual maximum number of milliseconds that a Session shall remain open without activity
if self.session_timeout != response.RevisedSessionTimeout:
_logger.warning("Requested session timeout to be %dms, got %dms instead", self.secure_channel_timeout, response.RevisedSessionTimeout)
_logger.warning("Requested session timeout to be %dms, got %dms instead",
self.secure_channel_timeout, response.RevisedSessionTimeout)
self.session_timeout = response.RevisedSessionTimeout
self._renew_channel_task = self.loop.create_task(self._renew_channel_loop())
return response
......
......@@ -83,7 +83,7 @@ class CallbackDispatcher(object):
self.addListener(eventName, getattr(subscriber, params))
elif isinstance(params, list):
if not params:
raise ValueError('Invalid params "{0!r}" for event "{1!s}"'.format(params, eventName))
raise ValueError(f'Invalid params "{repr(params)}" for event "{str(eventName)}"')
if len(params) <= 2 and isinstance(params[0], str):
priority = params[1] if len(params) > 1 else 0
self.addListener(eventName, getattr(subscriber, params[0]), priority)
......@@ -92,4 +92,4 @@ class CallbackDispatcher(object):
priority = listener[1] if len(listener) > 1 else 0
self.addListener(eventName, getattr(subscriber, listener[0]), priority)
else:
raise ValueError('Invalid params for event "{0!s}"'.format(eventName))
raise ValueError(f'Invalid params for event "{str(eventName)}"')
......@@ -13,14 +13,15 @@ class MessageChunk(ua.FrozenClass):
"""
Message Chunk, as described in OPC UA specs Part 6, 6.7.2.
"""
def __init__(self, security_policy, body=b'', msg_type=ua.MessageType.SecureMessage, chunk_type=ua.ChunkType.Single):
def __init__(self, security_policy, body=b'', msg_type=ua.MessageType.SecureMessage,
chunk_type=ua.ChunkType.Single):
self.MessageHeader = ua.Header(msg_type, chunk_type)
if msg_type in (ua.MessageType.SecureMessage, ua.MessageType.SecureClose):
self.SecurityHeader = ua.SymmetricAlgorithmHeader()
elif msg_type == ua.MessageType.SecureOpen:
self.SecurityHeader = ua.AsymmetricAlgorithmHeader()
else:
raise ua.UaError("Unsupported message type: {0}".format(msg_type))
raise ua.UaError(f"Unsupported message type: {msg_type}")
self.SequenceHeader = ua.SequenceHeader()
self.Body = body
self.security_policy = security_policy
......@@ -43,7 +44,7 @@ class MessageChunk(ua.FrozenClass):
security_header = struct_from_binary(ua.AsymmetricAlgorithmHeader, data)
crypto = security_policy.asymmetric_cryptography
else:
raise ua.UaError("Unsupported message type: {0}".format(header.MessageType))
raise ua.UaError(f"Unsupported message type: {header.MessageType}")
obj = MessageChunk(crypto)
obj.MessageHeader = header
obj.SecurityHeader = security_header
......@@ -52,7 +53,8 @@ class MessageChunk(ua.FrozenClass):
if signature_size > 0:
signature = decrypted[-signature_size:]
decrypted = decrypted[:-signature_size]
crypto.verify(header_to_binary(obj.MessageHeader) + struct_to_binary(obj.SecurityHeader) + decrypted, signature)
crypto.verify(header_to_binary(obj.MessageHeader) + struct_to_binary(obj.SecurityHeader) + decrypted,
signature)
data = ua.utils.Buffer(crypto.remove_padding(decrypted))
obj.SequenceHeader = struct_from_binary(ua.SequenceHeader, data)
obj.Body = data.read(len(data))
......@@ -81,7 +83,8 @@ class MessageChunk(ua.FrozenClass):
return max_plain_size - ua.SequenceHeader.max_size() - crypto.signature_size() - crypto.min_padding_size()
@staticmethod
def message_to_chunks(security_policy, body, max_chunk_size, message_type=ua.MessageType.SecureMessage, channel_id=1, request_id=1, token_id=1):
def message_to_chunks(security_policy, body, max_chunk_size, message_type=ua.MessageType.SecureMessage,
channel_id=1, request_id=1, token_id=1):
"""
Pack message body (as binary string) into one or more chunks.
Size of each chunk will not exceed max_chunk_size.
......@@ -119,7 +122,8 @@ class MessageChunk(ua.FrozenClass):
return chunks
def __str__(self):
return "{0}({1}, {2}, {3}, {4} bytes)".format(self.__class__.__name__, self.MessageHeader, self.SequenceHeader, self.SecurityHeader, len(self.Body))
return f"{self.__class__.__name__}({self.MessageHeader}, {self.SequenceHeader}," \
f" {self.SecurityHeader}, {len(self.Body)} bytes)"
__repr__ = __str__
......@@ -215,7 +219,7 @@ class SecureConnection:
self.security_policy = policy.create(peer_certificate)
return
if self.security_policy.URI != uri or (mode is not None and self.security_policy.Mode != mode):
raise ua.UaError("No matching policy: {0}, {1}".format(uri, mode))
raise ua.UaError(f"No matching policy: {uri}, {mode}")
def revolve_tokens(self):
"""
......@@ -235,7 +239,9 @@ class SecureConnection:
The only supported types are SecureOpen, SecureMessage, SecureClose.
If message_type is SecureMessage, the AlgorithmHeader should be passed as arg.
"""
chunks = MessageChunk.message_to_chunks(self.security_policy, message, self._max_chunk_size, message_type=message_type, channel_id=self.security_token.ChannelId, request_id=request_id, token_id=self.security_token.TokenId)
chunks = MessageChunk.message_to_chunks(self.security_policy, message, self._max_chunk_size,
message_type=message_type, channel_id=self.security_token.ChannelId,
request_id=request_id, token_id=self.security_token.TokenId)
for chunk in chunks:
self._sequence_number += 1
if self._sequence_number >= (1 << 32):
......@@ -249,7 +255,7 @@ class SecureConnection:
Validates the symmetric header of the message chunk and revolves the
security token if needed.
"""
assert isinstance(security_hdr, ua.SymmetricAlgorithmHeader), "Expected SymAlgHeader, got: {0}".format(security_hdr)
assert isinstance(security_hdr, ua.SymmetricAlgorithmHeader), f"Expected SymAlgHeader, got: {security_hdr}"
if security_hdr.TokenId == self.security_token.TokenId:
return
......@@ -263,25 +269,29 @@ class SecureConnection:
# expired SecurityToken for up to 25 % of the token lifetime. This should ensure that
# Messages sent by the Server before the token expired are not rejected because of
# network delays.
timeout = self.prev_security_token.CreatedAt + timedelta(milliseconds=self.prev_security_token.RevisedLifetime * 1.25)
timeout = self.prev_security_token.CreatedAt + \
timedelta(milliseconds=self.prev_security_token.RevisedLifetime * 1.25)
if timeout < datetime.utcnow():
raise ua.UaError("Security token id {} has timed out ({} < {})".format(security_hdr.TokenId, timeout, datetime.utcnow()))
raise ua.UaError(f"Security token id {security_hdr.TokenId} has timed out "
f"({timeout} < {datetime.utcnow()})")
return
expected_tokens = [self.security_token.TokenId, self.next_security_token.TokenId]
if self._allow_prev_token:
expected_tokens.insert(0, self.prev_security_token.TokenId)
raise ua.UaError("Invalid security token id {}, expected one of: {}".format(security_hdr.TokenId, expected_tokens))
raise ua.UaError(f"Invalid security token id {security_hdr.TokenId}, expected one of: {expected_tokens}")
def _check_incoming_chunk(self, chunk):
if not isinstance(chunk, MessageChunk):
raise ValueError(f'Expected chunk, got: {chunk}')
if chunk.MessageHeader.MessageType != ua.MessageType.SecureOpen:
if chunk.MessageHeader.ChannelId != self.security_token.ChannelId:
raise ua.UaError('Wrong channel id {0}, expected {1}'.format(chunk.MessageHeader.ChannelId, self.security_token.ChannelId))
raise ua.UaError(f'Wrong channel id {chunk.MessageHeader.ChannelId},'
f' expected {self.security_token.ChannelId}')
if self._incoming_parts:
if self._incoming_parts[0].SequenceHeader.RequestId != chunk.SequenceHeader.RequestId:
raise ua.UaError('Wrong request id {0}, expected {1}'.format(chunk.SequenceHeader.RequestId, self._incoming_parts[0].SequenceHeader.RequestId))
raise ua.UaError(f'Wrong request id {chunk.SequenceHeader.RequestId},'
f' expected {self._incoming_parts[0].SequenceHeader.RequestId}')
# The sequence number must monotonically increase (but it can wrap around)
seq_num = chunk.SequenceHeader.SequenceNumber
if self._peer_sequence_number is not None:
......@@ -292,7 +302,9 @@ class SecureConnection:
logger.debug('Sequence number wrapped: %d -> %d', self._peer_sequence_number, seq_num)
else:
# Condition for monotonically increase is not met
raise ua.UaError("Received chunk: {0} with wrong sequence expecting: {1}, received: {2}, spec says to close connection".format(chunk, self._peer_sequence_number, seq_num))
raise ua.UaError(f"Received chunk: {chunk} with wrong sequence expecting:"
f" {self._peer_sequence_number}, received: {seq_num},"
f" spec says to close connection")
self._peer_sequence_number = seq_num
def receive_from_header_and_body(self, header, body):
......@@ -323,9 +335,9 @@ class SecureConnection:
return msg
if header.MessageType == ua.MessageType.Error:
msg = struct_from_binary(ua.ErrorMessage, body)
logger.warning("Received an error: %s", msg)
logger.warning(f"Received an error: {msg}")
return msg
raise ua.UaError("Unsupported message type {0}".format(header.MessageType))
raise ua.UaError(f"Unsupported message type {header.MessageType}")
def _receive(self, msg):
self._check_incoming_chunk(msg)
......@@ -334,7 +346,7 @@ class SecureConnection:
return None
if msg.MessageHeader.ChunkType == ua.ChunkType.Abort:
err = struct_from_binary(ua.ErrorMessage, ua.utils.Buffer(msg.Body))
logger.warning("Message %s aborted: %s", msg, err)
logger.warning(f"Message {msg} aborted: {err}")
# specs Part 6, 6.7.3 say that aborted message shall be ignored
# and SecureChannel should not be closed
self._incoming_parts = []
......@@ -343,4 +355,4 @@ class SecureConnection:
message = ua.Message(self._incoming_parts)
self._incoming_parts = []
return message
raise ua.UaError("Unsupported chunk type: {0}".format(msg))
raise ua.UaError(f"Unsupported chunk type: {msg}")
......@@ -34,7 +34,8 @@ async def _copy_node(server, parent_nodeid, rdesc, nodeid, recursive):
if recursive:
descs = await node_to_copy.get_children_descriptions()
for desc in descs:
nodes = await _copy_node(server, res.AddedNodeId, desc, nodeid=ua.NodeId(namespaceidx=desc.NodeId.NamespaceIndex), recursive=True)
nodes = await _copy_node(server, res.AddedNodeId, desc,
nodeid=ua.NodeId(namespaceidx=desc.NodeId.NamespaceIndex), recursive=True)
added_nodes.extend(nodes)
return added_nodes
......@@ -73,5 +74,6 @@ async def _read_and_copy_attrs(node_type, struct, addnode):
else:
setattr(struct, name, results[idx].Value.Value)
else:
logger.warning("Instantiate: while copying attributes from node type {0!s}, attribute {1!s}, statuscode is {2!s}".format(node_type, name, results[idx].StatusCode))
logger.warning(f"Instantiate: while copying attributes from node type {str(node_type)},"
f" attribute {str(name)}, statuscode is {str(results[idx].StatusCode)}")
addnode.NodeAttributes = struct
......@@ -202,7 +202,9 @@ async def get_event_obj_from_type_node(node):
name = (await prop.read_browse_name()).Name
val = await prop.read_data_value()
self.add_property(name, val.Value.Value, val.Value.VariantType)
parents = await curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True)
parents = await curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype,
direction=ua.BrowseDirection.Inverse,
includesubtypes=True)
if len(parents) != 1: # Something went wrong
raise UaError("Parent of event type could notbe found")
......@@ -218,11 +220,13 @@ async def get_event_obj_from_type_node(node):
async def _find_parent_eventtype(node):
"""
"""
parents = await node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True)
parents = await node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype,
direction=ua.BrowseDirection.Inverse, includesubtypes=True)
if len(parents) != 1: # Something went wrong
raise UaError("Parent of event type could notbe found")
if parents[0].nodeid.Identifier in asyncua.common.event_objects.IMPLEMENTED_EVENTS.keys():
return parents[0].nodeid.Identifier, asyncua.common.event_objects.IMPLEMENTED_EVENTS[parents[0].nodeid.Identifier]
return parents[0].nodeid.Identifier,\
asyncua.common.event_objects.IMPLEMENTED_EVENTS[parents[0].nodeid.Identifier]
else:
return await _find_parent_eventtype(parents[0])
......@@ -95,12 +95,12 @@ async def _instantiate_node(server,
if not refs:
# spec says to ignore nodes without modelling rules
logger.info("Instantiate: Skip node without modelling rule %s as part of %s",
c_rdesc.BrowseName, addnode.BrowseName)
c_rdesc.BrowseName, addnode.BrowseName)
continue
# exclude nodes with optional ModellingRule if requested
if not instantiate_optional and refs[0].nodeid == ua.NodeId(ua.ObjectIds.ModellingRule_Optional):
logger.info("Instantiate: Skip optional node %s as part of %s", c_rdesc.BrowseName,
addnode.BrowseName)
addnode.BrowseName)
continue
# if root node being instantiated has a String NodeId, create the children with a String NodeId
if res.AddedNodeId.NodeIdType is ua.NodeIdType.String:
......
......@@ -32,8 +32,8 @@ def _parse_nodeid_qname(*args):
raise
except Exception as ex:
raise TypeError(
"This method takes either a namespace index and a string as argument or a nodeid and a qualifiedname. Received arguments {0} and got exception {1}".format(
args, ex)
f"This method takes either a namespace index and a string as argument or a nodeid and a qualifiedname."
f" Received arguments {args} and got exception {ex}"
)
......@@ -118,7 +118,7 @@ async def create_variable_type(parent, nodeid, bname, datatype):
datatype = ua.NodeId(datatype, 0)
if datatype and not isinstance(datatype, ua.NodeId):
raise RuntimeError(
"Data type argument must be a nodeid or an int refering to a nodeid, received: {}".format(datatype))
f"Data type argument must be a nodeid or an int refering to a nodeid, received: {datatype}")
return make_node(
parent.server,
await _create_variable_type(parent.server, parent.nodeid, nodeid, qname, datatype)
......@@ -155,7 +155,8 @@ async def create_method(parent, *args):
args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
or idx, name, method_to_be_called, [input argument types], [output argument types]
if argument types is specified, child nodes advertising what arguments the method uses and returns will be created
a callback is a method accepting the nodeid of the parent as first argument and variants after. returns a list of variants
a callback is a method accepting the nodeid of the parent as first argument and variants after.
returns a list of variants
"""
_logger.info('create_method %r', parent)
nodeid, qname = _parse_nodeid_qname(*args[:2])
......
......@@ -85,27 +85,27 @@ def uamethod(func):
def _format_call_inputs(parent, *args):
if isinstance(parent, ua.NodeId):
return (parent, *[arg.Value for arg in args])
else:
self = parent
parent = args[0]
args = args[1:]
return (self, parent, *[arg.Value for arg in args])
if isinstance(parent, ua.NodeId):
return parent, *[arg.Value for arg in args]
else:
self = parent
parent = args[0]
args = args[1:]
return self, parent, *[arg.Value for arg in args]
def _format_call_outputs(result):
if result is None:
return []
elif isinstance(result, ua.CallMethodResult):
result.OutputArguments = to_variant(*result.OutputArguments)
return result
elif isinstance(result, ua.StatusCode):
return result
elif isinstance(result, tuple):
return to_variant(*result)
else:
return to_variant(result)
if result is None:
return []
elif isinstance(result, ua.CallMethodResult):
result.OutputArguments = to_variant(*result.OutputArguments)
return result
elif isinstance(result, ua.StatusCode):
return result
elif isinstance(result, tuple):
return to_variant(*result)
else:
return to_variant(result)
def to_variant(*args):
......
......@@ -34,7 +34,7 @@ def _to_nodeid(nodeid):
elif type(nodeid) in (str, bytes):
return ua.NodeId.from_string(nodeid)
else:
raise ua.UaError("Could not resolve '{0}' to a type id".format(nodeid))
raise ua.UaError(f"Could not resolve '{nodeid}' to a type id")
class Node:
......@@ -58,7 +58,8 @@ class Node:
elif isinstance(nodeid, int):
self.nodeid = ua.NodeId(nodeid, 0)
else:
raise ua.UaError("argument to node must be a NodeId object or a string defining a nodeid found {0} of type {1}".format(nodeid, type(nodeid)))
raise ua.UaError(f"argument to node must be a NodeId object or a string"
f" defining a nodeid found {nodeid} of type {type(nodeid)}")
self.basenodeid = None
def __eq__(self, other):
......@@ -73,7 +74,7 @@ class Node:
return self.nodeid.to_string()
def __repr__(self):
return "Node({0})".format(self.nodeid)
return f"Node({self.nodeid})"
def __hash__(self):
return self.nodeid.__hash__()
......@@ -160,8 +161,8 @@ class Node:
"""
Get value of a node as a python type. Only variables ( and properties) have values.
An exception will be generated for other node types.
WARNING: on server side, this function returns a ref to object in ua database. Do not modify it if it is a mutable
object unless you know what you are doing
WARNING: on server side, this function returns a ref to object in ua database.
Do not modify it if it is a mutable object unless you know what you are doing
"""
result = await self.read_data_value()
return result.Value.Value
......@@ -410,7 +411,7 @@ class Node:
returns type definition of the node.
"""
references = await self.get_references(refs=ua.ObjectIds.HasTypeDefinition,
direction=ua.BrowseDirection.Forward)
direction=ua.BrowseDirection.Forward)
if len(references) == 0:
return None
return references[0].NodeId
......
......@@ -60,7 +60,7 @@ class {0}(IntEnum):
for EnumeratedValue in self.fields:
name = EnumeratedValue.Name
value = EnumeratedValue.Value
code += " {} = {}\n".format(name, value)
code += f" {name} = {value}\n"
return code
......@@ -81,7 +81,7 @@ class Struct(object):
self.typeid = None
def __str__(self):
return "Struct(name={}, fields={}".format(self.name, self.fields)
return f"Struct(name={self.name}, fields={self.fields}"
__repr__ = __str__
def get_code(self):
......@@ -126,7 +126,7 @@ class Field(object):
self.array = False
def __str__(self):
return "Field(name={}, uatype={}".format(self.name, self.uatype)
return f"Field(name={self.name}, uatype={self.uatype}"
__repr__ = __str__
......@@ -188,7 +188,8 @@ class StructGenerator(object):
def _make_registration(self):
code = "\n\n"
for struct in self.model:
code += f"ua.register_extension_object('{struct.name}', ua.NodeId.from_string('{struct.typeid}'), {struct.name})\n"
code += f"ua.register_extension_object('{struct.name}'," \
f" ua.NodeId.from_string('{struct.typeid}'), {struct.name})\n"
return code
def get_python_classes(self, env=None):
......@@ -213,7 +214,6 @@ from asyncua import ua
return
async def load_type_definitions(server, nodes=None):
"""
Download xml from given variable node defining custom structures.
......@@ -250,10 +250,10 @@ async def load_type_definitions(server, nodes=None):
for ndesc in await node.get_children_descriptions():
ndesc_node = server.get_node(ndesc.NodeId)
ref_desc_list = await ndesc_node.get_references(refs=ua.ObjectIds.HasDescription,
direction=ua.BrowseDirection.Inverse)
direction=ua.BrowseDirection.Inverse)
if ref_desc_list: # some server put extra things here
name = _clean_name(ndesc.BrowseName.Name)
if not name in structs_dict:
if name not in structs_dict:
_logger.warning("%s is found as child of binary definition node but is not found in xml", name)
continue
nodeid = ref_desc_list[0].NodeId
......@@ -320,7 +320,8 @@ async def load_enums(server, env=None):
try:
c = await _get_enum_values(name, node)
except ua.UaError as ex:
_logger.warning("Node %s, %s under DataTypes/Enumeration, does not seem to have a child called EnumString or EumValue: %s", name, node, ex)
_logger.warning(f"Node {name}, {node} under DataTypes/Enumeration,"
f" does not seem to have a child called EnumString or EumValue: {ex}")
continue
if not hasattr(ua, c.name):
_logger.warning("Adding enum %s to ua namespace", c)
......
......@@ -54,7 +54,7 @@ class OPCTypeDictionaryBuilder:
if data_type in self._build_in_list:
data_type = 'opc:' + data_type
else:
#data_type = 'tns:' + _to_camel_case(data_type)
# data_type = 'tns:' + _to_camel_case(data_type)
data_type = 'tns:' + data_type
return data_type
......@@ -86,7 +86,7 @@ class OPCTypeDictionaryBuilder:
def append_struct(self, name):
appended_struct = Et.SubElement(self.etree.getroot(), 'opc:StructuredType')
appended_struct.attrib['BaseType'] = 'ua:ExtensionObject'
#appended_struct.attrib['Name'] = _to_camel_case(name)
# appended_struct.attrib['Name'] = _to_camel_case(name)
appended_struct.attrib['Name'] = name
self._structs_dict[name] = appended_struct
return appended_struct
......@@ -160,7 +160,7 @@ class DataTypeDictionaryBuilder:
res = await self._session_server.add_nodes([node])
return res[0].AddedNodeId
logger.warning("Making %s object for node %s which already exist, its data will be overriden", self, node)
#FIXME: we have an issue
# FIXME: we have an issue
return node.nodeid
async def _link_nodes(self, linked_obj_node_id, data_type_node_id, description_node_id):
......@@ -193,7 +193,7 @@ class DataTypeDictionaryBuilder:
await self._session_server.add_references(refs)
async def _create_data_type(self, type_name, nodeid=None, init=True):
#name = _to_camel_case(type_name)
# name = _to_camel_case(type_name)
name = type_name
if nodeid is None:
......
......@@ -17,7 +17,6 @@ def value_to_datavalue(val, varianttype=None):
"""
convert anyting to a DataValue using varianttype
"""
datavalue = None
if isinstance(val, ua.DataValue):
datavalue = val
elif isinstance(val, ua.Variant):
......@@ -256,7 +255,7 @@ async def get_base_data_type(datatype):
if base.nodeid.NamespaceIndex == 0 and isinstance(base.nodeid.Identifier, int) and base.nodeid.Identifier <= 30:
return base
base = await get_node_supertype(base)
raise ua.UaError("Datatype must be a subtype of builtin types {0!s}".format(datatype))
raise ua.UaError(f"Datatype must be a subtype of builtin types {str(datatype)}")
async def get_nodes_of_namespace(server, namespaces=None):
......
......@@ -422,10 +422,11 @@ class XmlExporter:
return val.ua_types.keys()
else:
member_keys = [name for name in XmlExporter.extobj_ordered_elements[dtype] if
name in val.ua_types.keys() and getattr(val, name) is not None]
name in val.ua_types.keys() and getattr(val, name) is not None]
return member_keys
def indent(elem, level=0):
"""
copy and paste from http://effbot.org/zone/element-lib.htm#prettyprint
......
......@@ -135,7 +135,8 @@ class XmlImporter:
node = ua.AddNodesItem()
node.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
node.BrowseName = self._migrate_ns(obj.browsename)
self.logger.info("Importing xml node (%s, %s) as (%s %s)", obj.browsename, obj.nodeid, node.BrowseName, node.RequestedNewNodeId)
self.logger.info("Importing xml node (%s, %s) as (%s %s)", obj.browsename,
obj.nodeid, node.BrowseName, node.RequestedNewNodeId)
node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
if obj.parent and obj.parentlink:
node.ParentNodeId = self._migrate_ns(obj.parent)
......@@ -240,7 +241,7 @@ class XmlImporter:
for name, uatype in obj.ua_types:
if name == attname:
return uatype
raise UaError("Attribute '{}' defined in xml is not found in object '{}'".format(attname, obj))
raise UaError(f"Attribute '{attname}' defined in xml is not found in object '{obj}'")
def _set_attr(self, obj, attname: str, val):
# tow possible values:
......@@ -292,7 +293,7 @@ class XmlImporter:
if hasattr(ua.ua_binary.Primitives, vtype):
return ua.Variant(obj.value, getattr(ua.VariantType, vtype))
elif vtype == "LocalizedText":
return ua.Variant([getattr(ua, vtype)(text=item["Text"],locale=item["Locale"]) for item in obj.value])
return ua.Variant([getattr(ua, vtype)(text=item["Text"], locale=item["Locale"]) for item in obj.value])
else:
return ua.Variant([getattr(ua, vtype)(v) for v in obj.value])
elif obj.valuetype == 'ExtensionObject':
......
......@@ -298,9 +298,9 @@ class XMLParser:
for localized_text in el:
mylist = self._parse_body(localized_text)
# each localized text is in a dictionary with "Locale" and "Text" keys
item = {"Text":None,"Locale":None}
item = {"Text": None, "Locale": None}
for name, val in mylist:
item.update({str(name):val})
item.update({str(name): val})
# value is an array of dictionaries with localized texts
value.append(item)
return value
......
......@@ -2,7 +2,7 @@ import logging
import struct
from abc import ABCMeta, abstractmethod
from ..ua import CryptographyNone, SecurityPolicy, MessageSecurityMode, UaError, uaerrors
from ..ua import CryptographyNone, SecurityPolicy, MessageSecurityMode, UaError
try:
from ..crypto import uacrypto
......@@ -20,7 +20,7 @@ def require_cryptography(obj):
Call this function in constructors.
"""
if not CRYPTOGRAPHY_AVAILABLE:
raise UaError("Can't use {0}, cryptography module is not installed".format(obj.__class__.__name__))
raise UaError(f"Can't use {obj.__class__.__name__}, cryptography module is not installed")
class Signer(object):
......@@ -618,4 +618,4 @@ def encrypt_asymmetric(pubkey, data, policy_uri):
cls.AsymmetricEncryptionURI)
if not policy_uri or policy_uri == POLICY_NONE_URI:
return data, ''
raise UaError("Unsupported security policy `{0}`".format(policy_uri))
raise UaError(f"Unsupported security policy `{policy_uri}`")
......@@ -51,6 +51,7 @@ def sign_sha1(private_key, data):
hashes.SHA1()
)
def sign_sha256(private_key, data):
return private_key.sign(
data,
......@@ -58,6 +59,7 @@ def sign_sha256(private_key, data):
hashes.SHA256()
)
def verify_sha1(certificate, data, signature):
certificate.public_key().verify(
signature,
......@@ -74,6 +76,7 @@ def verify_sha256(certificate, data, signature):
padding.PKCS1v15(),
hashes.SHA256())
def encrypt_basic256(public_key, data):
ciphertext = public_key.encrypt(
data,
......@@ -142,17 +145,21 @@ def hmac_sha1(key, message):
hasher.update(message)
return hasher.finalize()
def hmac_sha256(key, message):
hasher = hmac.HMAC(key, hashes.SHA256(), backend=default_backend())
hasher.update(message)
return hasher.finalize()
def sha1_size():
return hashes.SHA1.digest_size
def sha256_size():
return hashes.SHA256.digest_size
def p_sha1(secret, seed, sizes=()):
"""
Derive one or more keys from secret and seed.
......@@ -175,6 +182,7 @@ def p_sha1(secret, seed, sizes=()):
result = result[size:]
return tuple(parts)
def p_sha256(secret, seed, sizes=()):
"""
Derive one or more keys from secret and seed.
......@@ -197,8 +205,9 @@ def p_sha256(secret, seed, sizes=()):
result = result[size:]
return tuple(parts)
def x509_name_to_string(name):
parts = ["{0}={1}".format(attr.oid._name, attr.value) for attr in name]
parts = [f"{attr.oid._name}={attr.value}" for attr in name]
return ', '.join(parts)
......@@ -209,6 +218,6 @@ def x509_to_string(cert):
if cert.subject == cert.issuer:
issuer = ' (self-signed)'
else:
issuer = ', issuer: {0}'.format(x509_name_to_string(cert.issuer))
issuer = f', issuer: {x509_name_to_string(cert.issuer)}'
# TODO: show more information
return "{0}{1}, {2} - {3}".format(x509_name_to_string(cert.subject), issuer, cert.not_valid_before, cert.not_valid_after)
return f"{x509_name_to_string(cert.subject)}{issuer}, {cert.not_valid_before} - {cert.not_valid_after}"
......@@ -47,14 +47,14 @@ class AttributeService:
self._aspace: "AddressSpace" = aspace
def read(self, params):
#self.logger.debug("read %s", params)
# self.logger.debug("read %s", params)
res = []
for readvalue in params.NodesToRead:
res.append(self._aspace.read_attribute_value(readvalue.NodeId, readvalue.AttributeId))
return res
async def write(self, params, user=User(role=UserRole.Admin)):
#self.logger.debug("write %s as user %s", params, user)
# self.logger.debug("write %s as user %s", params, user)
res = []
for writevalue in params.NodesToWrite:
if user.role != UserRole.Admin:
......@@ -64,11 +64,12 @@ class AttributeService:
al = self._aspace.read_attribute_value(writevalue.NodeId, ua.AttributeIds.AccessLevel)
ual = self._aspace.read_attribute_value(writevalue.NodeId, ua.AttributeIds.UserAccessLevel)
if not al.StatusCode.is_good() or not ua.ua_binary.test_bit(
al.Value.Value, ua.AccessLevel.CurrentWrite) or not ua.ua_binary.test_bit(
ual.Value.Value, ua.AccessLevel.CurrentWrite):
al.Value.Value, ua.AccessLevel.CurrentWrite) or not \
ua.ua_binary.test_bit(ual.Value.Value, ua.AccessLevel.CurrentWrite):
res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
continue
res.append(await self._aspace.write_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
res.append(
await self._aspace.write_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
return res
......@@ -79,7 +80,7 @@ class ViewService(object):
self._aspace: "AddressSpace" = aspace
def browse(self, params):
#self.logger.debug("browse %s", params)
# self.logger.debug("browse %s", params)
res = []
for desc in params.NodesToBrowse:
res.append(self._browse(desc))
......@@ -99,15 +100,15 @@ class ViewService(object):
def _is_suitable_ref(self, desc, ref):
if not self._suitable_direction(desc.BrowseDirection, ref.IsForward):
#self.logger.debug("%s is not suitable due to direction", ref)
# self.logger.debug("%s is not suitable due to direction", ref)
return False
if not self._suitable_reftype(desc.ReferenceTypeId, ref.ReferenceTypeId, desc.IncludeSubtypes):
#self.logger.debug("%s is not suitable due to type", ref)
# self.logger.debug("%s is not suitable due to type", ref)
return False
if desc.NodeClassMask and ((desc.NodeClassMask & ref.NodeClass) == 0):
#self.logger.debug("%s is not suitable due to class", ref)
# self.logger.debug("%s is not suitable due to class", ref)
return False
#self.logger.debug("%s is a suitable ref for desc %s", ref, desc)
# self.logger.debug("%s is a suitable ref for desc %s", ref, desc)
return True
def _suitable_reftype(self, ref1, ref2, subtypes):
......@@ -146,14 +147,14 @@ class ViewService(object):
return False
def translate_browsepaths_to_nodeids(self, browsepaths):
#self.logger.debug("translate browsepath: %s", browsepaths)
# self.logger.debug("translate browsepath: %s", browsepaths)
results = []
for path in browsepaths:
results.append(self._translate_browsepath_to_nodeid(path))
return results
def _translate_browsepath_to_nodeid(self, path):
#self.logger.debug("looking at path: %s", path)
# self.logger.debug("looking at path: %s", path)
res = ua.BrowsePathResult()
if not path.RelativePath.Elements[-1].TargetName:
# OPC UA Part 4: Services, 5.8.4 TranslateBrowsePathsToNodeIds
......@@ -212,7 +213,7 @@ class NodeManagementService:
yield item
def _add_node(self, item, user, check=True):
#self.logger.debug("Adding node %s %s", item.RequestedNewNodeId, item.BrowseName)
# self.logger.debug("Adding node %s %s", item.RequestedNewNodeId, item.BrowseName)
result = ua.AddNodesResult()
if not user.role == UserRole.Admin:
......@@ -223,7 +224,7 @@ class NodeManagementService:
# If Identifier of requested NodeId is null we generate a new NodeId using
# the namespace of the nodeid, this is an extention of the spec to allow
# to requests the server to generate a new nodeid in a specified namespace
#self.logger.debug("RequestedNewNodeId has null identifier, generating Identifier")
# self.logger.debug("RequestedNewNodeId has null identifier, generating Identifier")
item.RequestedNewNodeId = self._aspace.generate_nodeid(item.RequestedNewNodeId.NamespaceIndex)
else:
if item.RequestedNewNodeId in self._aspace:
......@@ -241,7 +242,7 @@ class NodeManagementService:
parentdata = self._aspace.get(item.ParentNodeId)
if parentdata is None and not item.ParentNodeId.is_null():
self.logger.info("add_node: while adding node %s, requested parent node %s does not exists",
item.RequestedNewNodeId, item.ParentNodeId)
item.RequestedNewNodeId, item.ParentNodeId)
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
return result
......@@ -353,7 +354,7 @@ class NodeManagementService:
self._aspace.delete_datachange_callback(handle)
except Exception as ex:
self.logger.exception("Error calling delete node callback callback %s, %s, %s", nodedata,
ua.AttributeIds.Value, ex)
ua.AttributeIds.Value, ex)
def add_references(self, refs, user=User(role=UserRole.Admin)):
result = []
......
......@@ -31,7 +31,7 @@ class OPCUAProtocol(asyncio.Protocol):
self._task = None
def __str__(self):
return 'OPCUAProtocol({}, {})'.format(self.peer_name, self.processor.session)
return f'OPCUAProtocol({self.peer_name}, {self.processor.session})'
__repr__ = __str__
......
......@@ -70,8 +70,8 @@ class EventGenerator:
self.emitting_node = emitting_node
def __str__(self):
return "EventGenerator(Type:{0}, Emitting Node:{1}, Time:{2}, Message: {3})".format(
self.event.EventType, self.emitting_node, self.event.Time, self.event.Message)
return f"EventGenerator(Type:{self.event.EventType}, Emitting Node:{self.emitting_node}, " \
f"Time:{self.event.Time}, Message: {self.event.Message})"
__repr__ = __str__
......
......@@ -234,7 +234,7 @@ class HistoryManager:
if not self._sub:
self._sub = await self._create_subscription(SubHandler(self.storage, self.iserver.loop))
if node in self._handlers:
raise ua.UaError("Node {0} is already historized".format(node))
raise ua.UaError(f"Node {node} is already historized")
await self.storage.new_historized_node(node.nodeid, period, count)
handler = await self._sub.subscribe_data_change(node)
self._handlers[node] = handler
......@@ -255,7 +255,7 @@ class HistoryManager:
if not self._sub:
self._sub = await self._create_subscription(SubHandler(self.storage, self.iserver.loop))
if source in self._handlers:
raise ua.UaError("Events from {0} are already historized".format(source))
raise ua.UaError(f"Events from {source} are already historized")
# get list of all event types that the source node generates; change this to only historize specific events
event_types = await source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)
......
......@@ -155,7 +155,7 @@ class HistorySQLite(HistoryStorageInterface):
await self._db.execute(f'DELETE FROM "{table}" WHERE Time < ?', (date_limit.isoformat(' '),))
await self._db.commit()
except aiosqlite.Error as e:
self.logger.error("Historizing SQL Delete Old Data Error for events from %s: %s", event.emitting_node, e)
self.logger.error(f"Historizing SQL Delete Old Data Error for events from {event.emitting_node}: {e}")
async def read_event_history(self, source_id, start, end, nb_values, evfilter):
table = self._get_table_name(source_id)
......
......@@ -162,7 +162,8 @@ class InternalServer:
self.logger.info('starting internal server')
for edp in self.endpoints:
self._known_servers[edp.Server.ApplicationUri] = ServerDesc(edp.Server)
await Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)).write_value(ua.ServerState.Running, ua.VariantType.Int32)
await Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)).write_value(ua.ServerState.Running,
ua.VariantType.Int32)
await Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime)).write_value(datetime.utcnow())
if not self.disabled_clock:
self._set_current_time()
......
......@@ -45,7 +45,8 @@ class InternalSession:
self.logger.info('Created internal session %s', self.name)
def __str__(self):
return f'InternalSession(name:{self.name}, user:{self.user}, id:{self.session_id}, auth_token:{self.auth_token})'
return f'InternalSession(name:{self.name},' \
f' user:{self.user}, id:{self.session_id}, auth_token:{self.auth_token})'
async def get_endpoints(self, params=None, sockname=None):
return await self.iserver.get_endpoints(params, sockname)
......@@ -171,7 +172,7 @@ class InternalSession:
# This is an async method, dues to symmetry with client code
subscription_result = self.subscription_service.delete_monitored_items(params)
self.iserver.server_callback_dispatcher.dispatch(CallbackType.ItemSubscriptionDeleted,
ServerItemCallback(params, subscription_result))
ServerItemCallback(params, subscription_result))
return subscription_result
def publish(self, acks: Optional[Iterable[ua.SubscriptionAcknowledgement]] = None):
......
......@@ -86,7 +86,7 @@ class InternalSubscription:
return True
if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
self.logger.debug("keep alive count %s is > than max keep alive count %s, sending publish event",
self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
return True
self._keep_alive_count += 1
return False
......@@ -97,7 +97,7 @@ class InternalSubscription:
"""
if self._publish_cycles_count > self.data.RevisedLifetimeCount:
self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)", self,
self._publish_cycles_count, self.data.RevisedLifetimeCount)
self._publish_cycles_count, self.data.RevisedLifetimeCount)
# FIXME this will never be send since we do not have publish request anyway
await self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
result = None
......@@ -106,7 +106,7 @@ class InternalSubscription:
self._publish_cycles_count += 1
result = self._pop_publish_result()
if result is not None:
#self.logger.info('publish_results for %s', self.data.SubscriptionId)
# self.logger.info('publish_results for %s', self.data.SubscriptionId)
# The callback can be:
# Subscription.publish_callback -> server internal subscription
# UaProcessor.forward_publish_response -> client subscription
......@@ -139,7 +139,7 @@ class InternalSubscription:
notif = ua.DataChangeNotification()
notif.MonitoredItems = [item for sublist in self._triggered_datachanges.values() for item in sublist]
self._triggered_datachanges = {}
#self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
# self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
result.NotificationMessage.NotificationData.append(notif)
def _pop_triggered_events(self, result: ua.PublishResult):
......@@ -149,7 +149,7 @@ class InternalSubscription:
notif.Events = [item for sublist in self._triggered_events.values() for item in sublist]
self._triggered_events = {}
result.NotificationMessage.NotificationData.append(notif)
#self.logger.debug("sending event notification with %s events", len(notif.Events))
# self.logger.debug("sending event notification with %s events", len(notif.Events))
def _pop_triggered_statuschanges(self, result: ua.PublishResult):
"""Append all enqueued status changes to the given `PublishResult` and clear the queue."""
......@@ -157,20 +157,20 @@ class InternalSubscription:
notif = ua.StatusChangeNotification()
notif.Status = self._triggered_statuschanges.pop(0)
result.NotificationMessage.NotificationData.append(notif)
#self.logger.debug("sending event notification %s", notif.Status)
# self.logger.debug("sending event notification %s", notif.Status)
def publish(self, acks: Iterable[int]):
"""
Reset publish cycle count, acknowledge PublishResults.
:param acks: Sequence number of the PublishResults to acknowledge
"""
#self.logger.info("publish request with acks %s", acks)
# self.logger.info("publish request with acks %s", acks)
self._publish_cycles_count = 0
for nb in acks:
self._not_acknowledged_results.pop(nb, None)
def republish(self, nb):
#self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
# self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
result = self._not_acknowledged_results.pop(nb, None)
if result:
self.logger.info("re-publishing ack %s in subscription %s", nb, self)
......@@ -204,8 +204,8 @@ class InternalSubscription:
self._triggered_statuschanges.append(code)
await self._trigger_publish()
async def _enqueue_event(self, mid: int, eventdata: Union[ua.MonitoredItemNotification, ua.EventFieldList], size: int,
queue: dict):
async def _enqueue_event(self, mid: int,
eventdata: Union[ua.MonitoredItemNotification, ua.EventFieldList], size: int, queue: dict):
if mid not in queue:
# New Monitored Item Id
queue[mid] = [eventdata]
......
......@@ -116,7 +116,7 @@ class MonitoredItemService:
result, mdata = self._make_monitored_item_common(params)
ev_notify_byte = self.aspace.read_attribute_value(params.ItemToMonitor.NodeId,
ua.AttributeIds.EventNotifier).Value.Value
ua.AttributeIds.EventNotifier).Value.Value
if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
......@@ -178,7 +178,8 @@ class MonitoredItemService:
handle, error)
await self.trigger_statuschange(error)
else:
#self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'", self, handle, value.Value)
# self.logger.info(f"subscription {self}: datachange callback called "
# f"with handle '{handle}' and value '{value.Value}'")
event = ua.MonitoredItemNotification()
mid = self._monitored_datachange[handle]
mdata = self._monitored_items[mid]
......
......@@ -107,7 +107,8 @@ class Server:
await self.set_build_info(self.product_uri, self.manufacturer_name, self.name, "1.0pre", "0", datetime.now())
async def set_build_info(self, product_uri, manufacturer_name, product_name, software_version, build_number, build_date):
async def set_build_info(self, product_uri, manufacturer_name, product_name, software_version,
build_number, build_date):
status_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus))
build_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo))
......@@ -452,7 +453,8 @@ class Server:
await ev_gen.init(etype, emitting_node=emitting_node)
return ev_gen
async def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None, description=None) -> Coroutine:
async def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType,
properties=None, description=None) -> Coroutine:
if properties is None:
properties = []
base_t = _get_node(self.iserver.isession, basetype)
......@@ -462,10 +464,12 @@ class Server:
datatype = None
if len(prop) > 2:
datatype = prop[2]
await custom_t.add_property(idx, prop[0], ua.get_default_value(prop[1]), varianttype=prop[1], datatype=datatype)
await custom_t.add_property(idx, prop[0], ua.get_default_value(prop[1]),
varianttype=prop[1], datatype=datatype)
return custom_t
async def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None) -> Coroutine:
async def create_custom_event_type(self, idx, name,
basetype=ua.ObjectIds.BaseEventType, properties=None) -> Coroutine:
if properties is None:
properties = []
return await self._create_custom_type(idx, name, basetype, properties, [], [])
......@@ -542,7 +546,8 @@ class Server:
Export nodes of one or more namespaces to an XML file.
Namespaces used by nodes are always exported for consistency.
:param path: name of the xml file to write
:param namespaces: list of string uris or int indexes of the namespace to export, if not provide all ns are used except 0
:param namespaces: list of string uris or int indexes of the namespace to export,
if not provide all ns are used except 0
"""
if namespaces is None:
namespaces = []
......
This diff is collapsed.
......@@ -60,7 +60,7 @@ class ThreadLoop(Thread):
def syncmethod(func):
def wrapper(self, *args, **kwargs):
args = list(args) #FIXME: might be very inefficient...
args = list(args) # FIXME: might be very inefficient...
for idx, arg in enumerate(args):
if isinstance(arg, Node):
args[idx] = arg.aio_obj
......@@ -385,6 +385,7 @@ class Node:
def read_attributes(self):
pass
class Subscription:
def __init__(self, tloop, sub):
self.tloop = tloop
......
......@@ -83,16 +83,16 @@ def _require_nodeid(parser, args):
# check that a nodeid has been given explicitly, a bit hackish...
if args.nodeid == "i=84" and args.path == "":
parser.print_usage()
print("{0}: error: A NodeId or BrowsePath is required".format(parser.prog))
print(f"{parser.prog}: error: A NodeId or BrowsePath is required")
sys.exit(1)
def parse_args(parser, requirenodeid=False):
args = parser.parse_args()
#logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
# logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
logging.basicConfig(level=getattr(logging, args.loglevel))
if args.url and '://' not in args.url:
logging.info("Adding default scheme %s to URL %s", ua.OPC_TCP_SCHEME, args.url)
logging.info(f"Adding default scheme {ua.OPC_TCP_SCHEME} to URL {args.url}")
args.url = ua.OPC_TCP_SCHEME + '://' + args.url
if requirenodeid:
_require_nodeid(parser, args)
......@@ -195,16 +195,16 @@ def _val_to_variant(val, args):
return _arg_to_variant(val, array, int, ua.VariantType.SByte)
elif args.datatype == "byte":
return _arg_to_variant(val, array, int, ua.VariantType.Byte)
#elif args.datatype == "uint8":
#return _arg_to_variant(val, array, int, ua.VariantType.Byte)
# elif args.datatype == "uint8":
# return _arg_to_variant(val, array, int, ua.VariantType.Byte)
elif args.datatype == "uint16":
return _arg_to_variant(val, array, int, ua.VariantType.UInt16)
elif args.datatype == "uint32":
return _arg_to_variant(val, array, int, ua.VariantType.UInt32)
elif args.datatype == "uint64":
return _arg_to_variant(val, array, int, ua.VariantType.UInt64)
#elif args.datatype == "int8":
#return ua.Variant(int(val), ua.VariantType.Int8)
# elif args.datatype == "int8":
# return ua.Variant(int(val), ua.VariantType.Int8)
elif args.datatype == "int16":
return _arg_to_variant(val, array, int, ua.VariantType.Int16)
elif args.datatype == "int32":
......@@ -315,7 +315,7 @@ async def _uals():
await client.connect()
try:
node = await get_node(client, args)
print("Browsing node {0} at {1}\n".format(node, args.url))
print(f"Browsing node {node} at {args.url}\n")
if args.long_format == 0:
await _lsprint_0(node, args.depth - 1)
elif args.long_format == 1:
......@@ -426,8 +426,7 @@ async def _uasubscribe():
def application_to_strings(app):
result = []
result.append(('Application URI', app.ApplicationUri))
result = [('Application URI', app.ApplicationUri)]
optionals = [
('Product URI', app.ProductUri),
('Application Name', app.ApplicationName.to_string()),
......@@ -449,7 +448,7 @@ def cert_to_string(der):
try:
from .crypto import uacrypto
except ImportError:
return "{0} bytes".format(len(der))
return f"{len(der)} bytes"
cert = uacrypto.x509_from_der(der)
return uacrypto.x509_to_string(cert)
......@@ -602,37 +601,37 @@ async def _uadiscover():
"--network",
action="store_true",
help="Also send a FindServersOnNetwork request to server")
#parser.add_argument("-s",
#"--servers",
#action="store_false",
#help="send a FindServers request to server")
#parser.add_argument("-e",
#"--endpoints",
#action="store_false",
#help="send a GetEndpoints request to server")
# parser.add_argument("-s",
# "--servers",
# action="store_false",
# help="send a FindServers request to server")
# parser.add_argument("-e",
# "--endpoints",
# action="store_false",
# help="send a GetEndpoints request to server")
args = parse_args(parser)
client = Client(args.url, timeout=args.timeout)
if args.network:
print("Performing discovery at {0}\n".format(args.url))
print(f"Performing discovery at {args.url}\n")
for i, server in enumerate(await client.connect_and_find_servers_on_network(), start=1):
print('Server {0}:'.format(i))
#for (n, v) in application_to_strings(server):
#print(' {}: {}'.format(n, v))
print(f'Server {i}:')
# for (n, v) in application_to_strings(server):
# print(' {}: {}'.format(n, v))
print('')
print("Performing discovery at {0}\n".format(args.url))
print(f"Performing discovery at {args.url}\n")
for i, server in enumerate(await client.connect_and_find_servers(), start=1):
print('Server {0}:'.format(i))
print(f'Server {i}:')
for (n, v) in application_to_strings(server):
print(' {0}: {1}'.format(n, v))
print(f' {n}: {v}')
print('')
for i, ep in enumerate(await client.connect_and_get_server_endpoints(), start=1):
print('Endpoint {0}:'.format(i))
print(f'Endpoint {i}:')
for (n, v) in endpoint_to_strings(ep):
print(' {0}: {1}'.format(n, v))
print(f' {n}: {v}')
print('')
sys.exit(0)
......@@ -689,7 +688,7 @@ async def _uahistoryread():
node = await get_node(client, args)
starttime = str_to_datetime(args.starttime, datetime.utcnow() - timedelta(days=1))
endtime = str_to_datetime(args.endtime, datetime.utcnow())
print("Reading raw history of node {0} at {1}; start at {2}, end at {3}\n".format(node, args.url, starttime, endtime))
print(f"Reading raw history of node {node} at {args.url}; start at {starttime}, end at {endtime}\n")
if args.events:
evs = await node.read_event_history(starttime, endtime, numvalues=args.limit)
for ev in evs:
......
......@@ -5,4 +5,4 @@ from .object_ids import ObjectIdNames
from .status_codes import StatusCodes
from .uaprotocol_auto import *
from .uaprotocol_hand import *
from .uatypes import * #TODO: This should be renamed to uatypes_hand
from .uatypes import * # TODO: This should be renamed to uatypes_hand
......@@ -2,7 +2,6 @@
Binary protocol specific functions and constants
"""
import sys
import struct
import logging
import uuid
......@@ -130,7 +129,7 @@ class _Primitive1(object):
return Primitives.Int32.pack(-1)
if not isinstance(data, list):
logger.warning(f'ua_binary.py > _Primitive1 > pack_array > data: {data} is not a instance of "list"!')
return Primitives.Int32.pack(-1) #to prevent crashing while runtime
return Primitives.Int32.pack(-1) # to prevent crashing while runtime
size_data = Primitives.Int32.pack(len(data))
return size_data + struct.pack(self._fmt.format(len(data)), *data)
......@@ -255,7 +254,7 @@ def to_binary(uatype, val):
Pack a python object to binary given a string defining its type
"""
if uatype.startswith('ListOf'):
#if isinstance(val, (list, tuple)):
# if isinstance(val, (list, tuple)):
return list_to_binary(uatype[6:], val)
elif type(uatype) is str and hasattr(ua.VariantType, uatype):
vtype = getattr(ua.VariantType, uatype)
......
......@@ -59,7 +59,8 @@ class Header(uatypes.FrozenClass):
return struct.calcsize("<3scII")
def __str__(self):
return f'Header(type:{self.MessageType}, chunk_type:{self.ChunkType}, body_size:{self.body_size}, channel:{self.ChannelId})'
return f'Header(type:{self.MessageType}, chunk_type:{self.ChunkType}, body_size:{self.body_size},' \
f' channel:{self.ChannelId})'
__repr__ = __str__
......@@ -112,7 +113,8 @@ class AsymmetricAlgorithmHeader(uatypes.FrozenClass):
def __str__(self):
size1 = len(self.SenderCertificate) if self.SenderCertificate is not None else None
size2 = len(self.ReceiverCertificateThumbPrint) if self.ReceiverCertificateThumbPrint is not None else None
return f'{self.__class__.__name__}(SecurityPolicy:{self.SecurityPolicyURI}, certificatesize:{size2}, receiverCertificatesize:{size2} )'
return f'{self.__class__.__name__}(SecurityPolicy:{self.SecurityPolicyURI},' \
f' certificatesize:{size2}, receiverCertificatesize:{size2} )'
__repr__ = __str__
......@@ -344,11 +346,11 @@ class Argument(auto.Argument):
class XmlElement(FrozenClass):
'''
"""
An XML element encoded as a UTF-8 string.
:ivar Value:
:vartype Value: String
'''
"""
ua_types = [
('Value', 'String'),
......
......@@ -68,7 +68,8 @@ class _FrozenClass(object):
def __setattr__(self, key, value):
if self._freeze and not hasattr(self, key):
raise TypeError(
f"Error adding member '{key}' to class '{self.__class__.__name__}', class is frozen, members are {self.__dict__.keys()}"
f"Error adding member '{key}' to class '{self.__class__.__name__}',"
f" class is frozen, members are {self.__dict__.keys()}"
)
object.__setattr__(self, key, value)
......@@ -270,7 +271,8 @@ class NodeId(object):
Args:
identifier: The identifier might be an int, a string, bytes or a Guid
namespaceidx(int): The index of the namespace
nodeidtype(NodeIdType): The type of the nodeid if it cannor be guess or you want something special like twobyte nodeid or fourbytenodeid
nodeidtype(NodeIdType): The type of the nodeid if it cannor be guess or you want something
special like twobyte nodeid or fourbytenodeid
:ivar Identifier:
......@@ -387,7 +389,7 @@ class NodeId(object):
def to_string(self):
string = []
if self.NamespaceIndex != 0:
string.append("ns={0}".format(self.NamespaceIndex))
string.append(f"ns={self.NamespaceIndex}")
ntype = None
if self.NodeIdType == NodeIdType.Numeric:
ntype = "i"
......@@ -401,11 +403,11 @@ class NodeId(object):
ntype = "g"
elif self.NodeIdType == NodeIdType.ByteString:
ntype = "b"
string.append("{0}={1}".format(ntype, self.Identifier))
string.append(f"{ntype}={self.Identifier}")
if self.ServerIndex:
string.append("srv={}".format(self.ServerIndex))
string.append(f"srv={self.ServerIndex}")
if self.NamespaceUri:
string.append("nsu={0}".format(self.NamespaceUri))
string.append(f"nsu={self.NamespaceUri}")
return ";".join(string)
def __str__(self):
......@@ -469,7 +471,7 @@ class QualifiedName(FrozenClass):
self._freeze = True
def to_string(self):
return "{0}:{1}".format(self.NamespaceIndex, self.Name)
return f"{self.NamespaceIndex}:{self.Name}"
@staticmethod
def from_string(string):
......@@ -542,7 +544,7 @@ class LocalizedText(FrozenClass):
@Text.setter
def Text(self, text):
if not isinstance(text, str):
raise ValueError("A LocalizedText object takes a string as argument \"text\", not a {}, {}".format(type(text), text))
raise ValueError(f"A LocalizedText object takes a string as argument \"text\", not a {type(text)}, {text}")
self._text = text
if self._text:
self.Encoding |= (1 << 1)
......@@ -550,10 +552,11 @@ class LocalizedText(FrozenClass):
@Locale.setter
def Locale(self, locale):
if not isinstance(locale, str):
raise ValueError("A LocalizedText object takes a string as argument \"locale\", not a {}, {}".format(type(locale), locale))
raise ValueError(f"A LocalizedText object takes a string as argument \"locale\","
f" not a {type(locale)}, {locale}")
self._locale = locale
if self._locale:
self.Encoding |= (1)
self.Encoding |= 1
def to_string(self):
if self.Text is None:
......@@ -566,9 +569,9 @@ class LocalizedText(FrozenClass):
def from_string(string):
m = re.match("^LocalizedText\(Encoding:(.*), Locale:(.*), Text:(.*)\)$", string)
if m:
text = m.group(3) if m.group(3)!=str(None) else None
locale = m.group(2) if m.group(2)!=str(None) else None
return LocalizedText(text=text,locale=locale)
text = m.group(3) if m.group(3) != str(None) else None
locale = m.group(2) if m.group(2) != str(None) else None
return LocalizedText(text=text, locale=locale)
else:
return LocalizedText(string)
......@@ -801,8 +804,7 @@ def _split_list(l, n):
def flatten_and_get_shape(mylist):
dims = []
dims.append(len(mylist))
dims = [len(mylist)]
while isinstance(mylist[0], (list, tuple)):
dims.append(len(mylist[0]))
mylist = [item for sublist in mylist for item in sublist]
......@@ -942,7 +944,7 @@ def get_default_value(vtype):
elif vtype == VariantType.Guid:
return uuid.uuid4()
elif vtype == VariantType.XmlElement:
return None #Not sure this is correct
return None # Not sure this is correct
elif vtype == VariantType.NodeId:
return NodeId()
elif vtype == VariantType.ExpandedNodeId:
......
......@@ -35,9 +35,9 @@ if __name__ == "__main__":
tag1 = client.get_node("ns=2;s=Channel1.Device1.Tag1")
print("tag1 is: {0} with value {1} ".format(tag1, tag1.read_value()))
print(f"tag1 is: {tag1} with value {tag1.read_value()} ")
tag2 = client.get_node("ns=2;s=Channel1.Device1.Tag2")
print("tag2 is: {0} with value {1} ".format(tag2, tag2.read_value()))
print(f"tag2 is: {tag2} with value {tag2.read_value()} ")
handler = SubHandler()
sub = client.create_subscription(500, handler)
......
......@@ -26,7 +26,7 @@ def create_monitored_items(event, dispatcher):
for idx in range(len(event.response_params)) :
if (event.response_params[idx].StatusCode.is_good()) :
nodeId = event.request_params.ItemsToCreate[idx].ItemToMonitor.NodeId
print("Node {0} was created".format(nodeId))
print(f"Node {nodeId} was created")
def modify_monitored_items(event, dispatcher):
......
......@@ -64,8 +64,8 @@ if __name__ == "__main__":
idx = client.get_namespace_index(uri)
# Now getting a variable node using its browse path
myvar = client.nodes.root.get_child(["0:Objects", "{}:MyObject".format(idx), "{}:MyVariable".format(idx)])
obj = client.nodes.root.get_child(["0:Objects", "{}:MyObject".format(idx)])
myvar = client.nodes.root.get_child(["0:Objects", f"{idx}:MyObject", f"{idx}:MyVariable"])
obj = client.nodes.root.get_child(["0:Objects", f"{idx}:MyObject"])
print("myvar is: ", myvar)
# subscribing to a variable node
......@@ -80,7 +80,7 @@ if __name__ == "__main__":
# sub.delete()
# calling a method on server
res = obj.call_method("{}:multiply".format(idx), 3, "klk")
res = obj.call_method(f"{idx}:multiply", 3, "klk")
print("method result is: ", res)
embed()
......@@ -9,7 +9,7 @@ def bump_version():
v1, v2, v3 = m.groups()
oldv = "{0}.{1}.{2}".format(v1, v2, v3)
newv = "{0}.{1}.{2}".format(v1, v2, str(int(v3) + 1))
print("Current version is: {0}, write new version, ctrl-c to exit".format(oldv))
print(f"Current version is: {oldv}, write new version, ctrl-c to exit")
ans = input(newv)
if ans:
newv = ans
......@@ -25,7 +25,7 @@ def release():
if ans in ("", "y", "yes"):
os.system("git add setup.py")
os.system("git commit -m 'new release'")
os.system("git tag {0}".format(v))
os.system(f"git tag {v}")
ans = input("change committed, push to server?(Y/n)")
if ans in ("", "y", "yes"):
os.system("git push")
......
......@@ -25,7 +25,7 @@ def extensionobject_from_binary(data):
return e
klass = ExtensionClasses[TypeId.Identifier]
if body is None:
raise UaError("parsing ExtensionObject {0} without data".format(klass.__name__))
raise UaError(f"parsing ExtensionObject {klass.__name__} without data")
return klass.from_binary(body)
......@@ -41,7 +41,7 @@ def extensionobject_to_binary(obj):
Encoding = 0
Body = None
if obj is not None:
TypeId = FourByteNodeId(getattr(ObjectIds, "{0}_Encoding_DefaultBinary".format(obj.__class__.__name__)))
TypeId = FourByteNodeId(getattr(ObjectIds, f"{obj.__class__.__name__}_Encoding_DefaultBinary"))
Encoding |= (1 << 0)
Body = obj.to_binary()
packet = []
......
......@@ -291,7 +291,7 @@ async def test_subscription_data_change_many(opc):
elif node == v2:
assert val == startv2
else:
raise RuntimeError("Error node {0} is neither {1} nor {2}".format(node, v1, v2))
raise RuntimeError(f"Error node {node} is neither {v1} nor {v2}")
await sub.delete()
......
......@@ -233,7 +233,7 @@ def test_string_to_variant_localized_text():
def test_string_to_variant_localized_text_with_locale():
locale = "cs-CZ"
string = "Moje jméno"
string_repr = "LocalizedText(Encoding:3, Locale:{}, Text:{})".format(locale, string)
string_repr = f"LocalizedText(Encoding:3, Locale:{locale}, Text:{string})"
obj = ua.LocalizedText(string, locale)
assert obj == string_to_val(string_repr, ua.VariantType.LocalizedText)
assert string_repr == val_to_string(obj)
......@@ -242,7 +242,7 @@ def test_string_to_variant_localized_text_with_locale():
def test_string_to_variant_localized_text_with_none1():
locale = "en-US"
string = ""
string_repr = "LocalizedText(Encoding:1, Locale:{}, Text:{})".format(locale, string)
string_repr = f"LocalizedText(Encoding:1, Locale:{locale}, Text:{string})"
obj = ua.LocalizedText(string, locale)
obj2 = ua.LocalizedText(string)
assert obj == string_to_val(string_repr, ua.VariantType.LocalizedText)
......@@ -253,7 +253,7 @@ def test_string_to_variant_localized_text_with_none1():
def test_string_to_variant_localized_text_with_none2():
locale = None
string = "my name is ..."
string_repr = "LocalizedText(Encoding:2, Locale:{}, Text:{})".format(locale, string)
string_repr = f"LocalizedText(Encoding:2, Locale:{locale}, Text:{string})"
obj = ua.LocalizedText(string, locale)
assert obj == string_to_val(string_repr, ua.VariantType.LocalizedText)
assert obj == string_to_val(string, ua.VariantType.LocalizedText)
......
......@@ -136,7 +136,7 @@ async def test_xml_ns(opc, tmpdir):
vnew = await onew.add_variable(new_ns, "xmlns_new_var", 9.99)
o_no_export = await opc.opc.nodes.objects.add_object(ref_ns, "xmlns_parent")
v_no_parent = await o_no_export.add_variable(new_ns, "xmlns_new_var_no_parent", 9.99)
o_bname = await onew.add_object("ns={0};i=4000".format(new_ns), "{0}:BNAME".format(bname_ns))
o_bname = await onew.add_object(f"ns={new_ns};i=4000", f"{bname_ns}:BNAME")
nodes = [o, o50, o200, onew, vnew, v_no_parent, o_bname]
tmp_path = tmpdir.join("tmp_test_export-ns.xml").strpath
await opc.opc.export_xml(nodes, tmp_path)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment