Commit ce83f6cf authored by Christian Bergmiller's avatar Christian Bergmiller

[ADD] replaced code (sync with async)

parent 7906e1ef
......@@ -2,28 +2,31 @@
import asyncio
import logging
from opcua.client.async_client import AsyncClient
from opcua.client.client import Client
from opcua import ua
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('opcua')
OBJECTS_AND_VARIABLES = ua.NodeClass.Object | ua.NodeClass.Variable
async def browse_nodes(node, level=0):
node_class = node.get_node_class()
node_class = await node.get_node_class()
children = []
for child in await node.get_children(nodeclassmask=OBJECTS_AND_VARIABLES):
children.append(await browse_nodes(child, level=level + 1))
return {
'id': node.nodeid.to_string(),
'name': node.get_display_name().Text.decode('utf8'),
'name': (await node.get_display_name()).Text,
'cls': node_class.value,
'children': [
browse_nodes(child, level=level + 1) for child in node.get_children(nodeclassmask=objects_and_variables)
],
'type': node.get_data_type_as_variant_type().value if node_class == ua.NodeClass.Variable else None,
'children': children,
'type': (await node.get_data_type_as_variant_type()).value if node_class == ua.NodeClass.Variable else None,
}
async def task(loop):
try:
client = AsyncClient(url='opc.tcp://commsvr.com:51234/UA/CAS_UA_Server')
client = Client(url='opc.tcp://commsvr.com:51234/UA/CAS_UA_Server')
await client.connect()
obj_node = client.get_objects_node()
_logger.info('Objects Node: %r', obj_node)
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
import asyncio
import logging
from opcua.common.connection import SecureConnection
from opcua.ua.async_ua_binary import header_from_binary
from opcua import ua
logger = logging.getLogger('opcua.uaprotocol')
class AsyncSecureConnection(SecureConnection):
"""
Async version of SecureConnection
"""
async def receive_from_socket(self, protocol):
"""
Convert binary stream to OPC UA TCP message (see OPC UA
specs Part 6, 7.1: Hello, Acknowledge or ErrorMessage), or a Message
object, or None (if intermediate chunk is received)
"""
logger.debug("Waiting for header")
header = await header_from_binary(protocol)
logger.info("received header: %s", header)
body = await protocol.read(header.body_size)
if len(body) != header.body_size:
# ToDo: should never happen since UASocketProtocol.read() waits until `size` bytes are received. Remove?
raise ua.UaError("{0} bytes expected, {1} available".format(header.body_size, len(body)))
return self.receive_from_header_and_body(header, ua.utils.Buffer(body))
......@@ -27,8 +27,8 @@ class MessageChunk(ua.FrozenClass):
self.security_policy = security_policy
@staticmethod
def from_binary(security_policy, data):
h = header_from_binary(data)
async def from_binary(security_policy, data):
h = await header_from_binary(data)
return MessageChunk.from_header_and_body(security_policy, h, data)
@staticmethod
......@@ -190,11 +190,9 @@ class SecureConnection(object):
if policy.matches(uri, mode):
self.security_policy = policy.create(peer_certificate)
return
if self.security_policy.URI != uri or (mode is not None and
self.security_policy.Mode != mode):
if self.security_policy.URI != uri or (mode is not None and self.security_policy.Mode != mode):
raise ua.UaError("No matching policy: {0}, {1}".format(uri, mode))
def message_to_binary(self, message, message_type=ua.MessageType.SecureMessage, request_id=0, algohdr=None):
"""
Convert OPC UA secure message to binary.
......@@ -219,7 +217,6 @@ class SecureConnection(object):
chunk.SequenceHeader.SequenceNumber = self._sequence_number
return b"".join([chunk.to_binary() for chunk in chunks])
def _check_incoming_chunk(self, chunk):
assert isinstance(chunk, MessageChunk), "Expected chunk, got: {0}".format(chunk)
if chunk.MessageHeader.MessageType != ua.MessageType.SecureOpen:
......@@ -229,13 +226,14 @@ class SecureConnection(object):
self.channel.SecurityToken.ChannelId))
if chunk.SecurityHeader.TokenId != self.channel.SecurityToken.TokenId:
if chunk.SecurityHeader.TokenId not in self._old_tokens:
logger.warning("Received a chunk with wrong token id %s, expected %s", chunk.SecurityHeader.TokenId, self.channel.SecurityToken.TokenId)
logger.warning(
"Received a chunk with wrong token id %s, expected %s",
chunk.SecurityHeader.TokenId, self.channel.SecurityToken.TokenId
)
#raise UaError("Wrong token id {}, expected {}, old tokens are {}".format(
#chunk.SecurityHeader.TokenId,
#self.channel.SecurityToken.TokenId,
#self._old_tokens))
else:
# Do some cleanup, spec says we can remove old tokens when new one are used
idx = self._old_tokens.index(chunk.SecurityHeader.TokenId)
......@@ -294,17 +292,18 @@ class SecureConnection(object):
else:
raise ua.UaError("Unsupported message type {0}".format(header.MessageType))
def receive_from_socket(self, socket):
async def receive_from_socket(self, protocol):
"""
Convert binary stream to OPC UA TCP message (see OPC UA
specs Part 6, 7.1: Hello, Acknowledge or ErrorMessage), or a Message
object, or None (if intermediate chunk is received)
"""
logger.debug("Waiting for header")
header = header_from_binary(socket)
logger.info("received header: %s", header)
body = socket.read(header.body_size)
header = await header_from_binary(protocol)
logger.debug("Received header: %s", header)
body = await protocol.read(header.body_size)
if len(body) != header.body_size:
# ToDo: should never happen since UASocketProtocol.read() waits until `size` bytes are received. Remove?
raise ua.UaError("{0} bytes expected, {1} available".format(header.body_size, len(body)))
return self.receive_from_header_and_body(header, ua.utils.Buffer(body))
......@@ -326,5 +325,3 @@ class SecureConnection(object):
return message
else:
raise ua.UaError("Unsupported chunk type: {0}".format(msg))
This diff is collapsed.
......@@ -117,28 +117,28 @@ def string_to_variant(string, vtype):
return ua.Variant(string_to_val(string, vtype), vtype)
def get_node_children(node, nodes=None):
async def get_node_children(node, nodes=None):
"""
Get recursively all children of a node
"""
if nodes is None:
nodes = [node]
for child in node.get_children():
for child in await node.get_children():
nodes.append(child)
get_node_children(child, nodes)
await get_node_children(child, nodes)
return nodes
def get_node_subtypes(node, nodes=None):
async def get_node_subtypes(node, nodes=None):
if nodes is None:
nodes = [node]
for child in node.get_children(refs=ua.ObjectIds.HasSubtype):
for child in await node.get_children(refs=ua.ObjectIds.HasSubtype):
nodes.append(child)
get_node_subtypes(child, nodes)
await get_node_subtypes(child, nodes)
return nodes
def get_node_supertypes(node, includeitself=False, skipbase=True):
async def get_node_supertypes(node, includeitself=False, skipbase=True):
"""
return get all subtype parents of node recursive
:param node: can be a ua.Node or ua.NodeId
......@@ -149,47 +149,46 @@ def get_node_supertypes(node, includeitself=False, skipbase=True):
parents = []
if includeitself:
parents.append(node)
parents.extend(_get_node_supertypes(node))
parents.extend(await _get_node_supertypes(node))
if skipbase and len(parents) > 1:
parents = parents[:-1]
return parents
def _get_node_supertypes(node):
async def _get_node_supertypes(node):
"""
recursive implementation of get_node_derived_from_types
"""
basetypes = []
parent = get_node_supertype(node)
parent = await get_node_supertype(node)
if parent:
basetypes.append(parent)
basetypes.extend(_get_node_supertypes(parent))
basetypes.extend(await _get_node_supertypes(parent))
return basetypes
def get_node_supertype(node):
async def get_node_supertype(node):
"""
return node supertype or None
"""
supertypes = node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype,
direction=ua.BrowseDirection.Inverse,
includesubtypes=True)
supertypes = await node.get_referenced_nodes(
refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True
)
if supertypes:
return supertypes[0]
else:
return None
def is_child_present(node, browsename):
async def is_child_present(node, browsename):
"""
return if a browsename is present a child from the provide node
:param node: node wherein to find the browsename
:param browsename: browsename to search
:returns returne True if the browsename is present else False
"""
child_descs = node.get_children_descriptions()
child_descs = await node.get_children_descriptions()
for child_desc in child_descs:
if child_desc.BrowseName == browsename:
return True
......@@ -197,20 +196,20 @@ def is_child_present(node, browsename):
return False
def data_type_to_variant_type(dtype_node):
async def data_type_to_variant_type(dtype_node):
"""
Given a Node datatype, find out the variant type to encode
data. This is not exactly straightforward...
"""
base = get_base_data_type(dtype_node)
base = await get_base_data_type(dtype_node)
if base.nodeid.Identifier != 29:
return ua.VariantType(base.nodeid.Identifier)
else:
# we have an enumeration, value is a Int32
return ua.VariantType.Int32
def get_base_data_type(datatype):
async def get_base_data_type(datatype):
"""
Looks up the base datatype of the provided datatype Node
The base datatype is either:
......@@ -225,7 +224,7 @@ def get_base_data_type(datatype):
while base:
if base.nodeid.NamespaceIndex == 0 and isinstance(base.nodeid.Identifier, int) and base.nodeid.Identifier <= 30:
return base
base = get_node_supertype(base)
base = await get_node_supertype(base)
raise ua.UaError("Datatype must be a subtype of builtin types {0!s}".format(datatype))
......@@ -251,8 +250,10 @@ def get_nodes_of_namespace(server, namespaces=None):
namespace_indexes = [n if isinstance(n, int) else ns_available.index(n) for n in namespaces]
# filter nodeis based on the provide namespaces and convert the nodeid to a node
nodes = [server.get_node(nodeid) for nodeid in server.iserver.aspace.keys()
if nodeid.NamespaceIndex != 0 and nodeid.NamespaceIndex in namespace_indexes]
nodes = [
server.get_node(nodeid) for nodeid in server.iserver.aspace.keys()
if nodeid.NamespaceIndex != 0 and nodeid.NamespaceIndex in namespace_indexes
]
return nodes
......@@ -263,5 +264,3 @@ def get_default_value(uatype):
return ua.get_default_value(getattr(ua.VariantType, uatype))
else:
return getattr(ua, uatype)()
......@@ -452,5 +452,5 @@ def encrypt_asymmetric(pubkey, data, policy_uri):
return (cls.encrypt_asymmetric(pubkey, data),
cls.AsymmetricEncryptionURI)
if not policy_uri or policy_uri == POLICY_NONE_URI:
return (data, '')
return data, ''
raise UaError("Unsupported security policy `{0}`".format(policy_uri))
import os
import aiofiles
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
......@@ -12,13 +13,13 @@ from cryptography.hazmat.primitives.ciphers import algorithms
from cryptography.hazmat.primitives.ciphers import modes
def load_certificate(path):
async def load_certificate(path):
_, ext = os.path.splitext(path)
with open(path, "rb") as f:
async with aiofiles.open(path, mode='rb') as f:
if ext == ".pem":
return x509.load_pem_x509_certificate(f.read(), default_backend())
return x509.load_pem_x509_certificate(await f.read(), default_backend())
else:
return x509.load_der_x509_certificate(f.read(), default_backend())
return x509.load_der_x509_certificate(await f.read(), default_backend())
def x509_from_der(data):
......@@ -27,13 +28,13 @@ def x509_from_der(data):
return x509.load_der_x509_certificate(data, default_backend())
def load_private_key(path):
async def load_private_key(path):
_, ext = os.path.splitext(path)
with open(path, "rb") as f:
async with aiofiles.open(path, mode='rb') as f:
if ext == ".pem":
return serialization.load_pem_private_key(f.read(), password=None, backend=default_backend())
return serialization.load_pem_private_key(await f.read(), password=None, backend=default_backend())
else:
return serialization.load_der_private_key(f.read(), password=None, backend=default_backend())
return serialization.load_der_private_key(await f.read(), password=None, backend=default_backend())
def der_from_x509(certificate):
......@@ -172,12 +173,3 @@ def x509_to_string(cert):
# TODO: show more information
return "{0}{1}, {2} - {3}".format(x509_name_to_string(cert.subject), issuer, cert.not_valid_before, cert.not_valid_after)
if __name__ == "__main__":
# Convert from PEM to DER
cert = load_certificate("../examples/server_cert.pem")
#rsa_pubkey = pubkey_from_dercert(der)
rsa_privkey = load_private_key("../examples/mykey.pem")
from IPython import embed
embed()
......@@ -107,14 +107,14 @@ class Server(object):
def __exit__(self, exc_type, exc_value, traceback):
self.stop()
def load_certificate(self, path):
async def load_certificate(self, path):
"""
load server certificate from file, either pem or der
"""
self.certificate = uacrypto.load_certificate(path)
self.certificate = await uacrypto.load_certificate(path)
def load_private_key(self, path):
self.private_key = uacrypto.load_private_key(path)
async def load_private_key(self, path):
self.private_key = await uacrypto.load_private_key(path)
def disable_clock(self, val=True):
"""
......
import struct
from opcua import ua
from opcua.ua.ua_binary import Primitives
async def header_from_binary(data):
hdr = ua.Header()
hdr.MessageType, hdr.ChunkType, hdr.packet_size = struct.unpack("<3scI", await data.read(8))
hdr.body_size = hdr.packet_size - 8
if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
hdr.body_size -= 4
hdr.ChannelId = Primitives.UInt32.unpack(ua.utils.Buffer(await data.read(4)))
return hdr
......@@ -478,7 +478,7 @@ def from_binary(uatype, data):
vtype = getattr(ua.VariantType, uatype)
return unpack_uatype(vtype, data)
elif isinstance(uatype, (str, unicode)) and hasattr(Primitives, uatype):
return getattr(Primitives, uatype).unpack(data)
return getattr(Primitives, uatype).unpack(data)
else:
return struct_from_binary(uatype, data)
......@@ -516,13 +516,13 @@ def header_to_binary(hdr):
return b"".join(b)
def header_from_binary(data):
async def header_from_binary(data):
hdr = ua.Header()
hdr.MessageType, hdr.ChunkType, hdr.packet_size = struct.unpack("<3scI", data.read(8))
hdr.MessageType, hdr.ChunkType, hdr.packet_size = struct.unpack("<3scI", await data.read(8))
hdr.body_size = hdr.packet_size - 8
if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
hdr.body_size -= 4
hdr.ChannelId = Primitives.UInt32.unpack(data)
hdr.ChannelId = Primitives.UInt32.unpack(ua.utils.Buffer(await data.read(4)))
return hdr
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment