Commit 7906e1ef authored by Christian Bergmiller's avatar Christian Bergmiller

[ADD] wip

parent a8d37321
import asyncio
import logging
from opcua.client.async_client import AsyncClient
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('opcua')
async def browse_nodes(node, level=0):
node_class = node.get_node_class()
return {
'id': node.nodeid.to_string(),
'name': node.get_display_name().Text.decode('utf8'),
'cls': node_class.value,
'children': [
browse_nodes(child, level=level + 1) for child in node.get_children(nodeclassmask=objects_and_variables)
],
'type': node.get_data_type_as_variant_type().value if node_class == ua.NodeClass.Variable else None,
}
async def task(loop):
try:
client = AsyncClient(url='opc.tcp://commsvr.com:51234/UA/CAS_UA_Server')
await client.connect()
obj_node = client.get_objects_node()
_logger.info('Objects Node: %r', obj_node)
tree = await browse_nodes(obj_node)
_logger.info('Tree: %r', tree)
except Exception:
_logger.exception('Task error')
loop.stop()
def main():
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.create_task(task(loop))
try:
loop.run_forever()
except Exception:
_logger.exception('Event loop error')
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
if __name__ == '__main__':
main()
import asyncio
import logging
from urllib.parse import urlparse
......@@ -13,6 +13,7 @@ from opcua.common import utils
from opcua.crypto import security_policies
from opcua.common.shortcuts import Shortcuts
from opcua.common.structures import load_type_definitions
use_crypto = True
try:
from opcua.crypto import uacrypto
......@@ -21,11 +22,11 @@ except ImportError:
use_crypto = False
class KeepAlive(Thread):
class KeepAlive:
"""
Used by Client to keep the session open.
OPCUA defines timeout both for sessions and secure channel
ToDo: remove
"""
def __init__(self, client, timeout):
......@@ -33,25 +34,24 @@ class KeepAlive(Thread):
:param session_timeout: Timeout to re-new the session
in milliseconds.
"""
Thread.__init__(self)
self.logger = logging.getLogger(__name__)
self.loop = asyncio.get_event_loop()
self.client = client
self._dostop = False
self._do_stop = False
self._cond = Condition()
self.timeout = timeout
# some server support no timeout, but we do not trust them
if self.timeout == 0:
self.timeout = 3600000 # 1 hour
self.timeout = 3600000 # 1 hour
def run(self):
self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
while not self._dostop:
while not self._do_stop:
with self._cond:
self._cond.wait(self.timeout / 1000)
if self._dostop:
if self._do_stop:
break
self.logger.debug("renewing channel")
self.client.open_secure_channel(renew=True)
......@@ -61,19 +61,18 @@ class KeepAlive(Thread):
def stop(self):
self.logger.debug("stoping keepalive thread")
self._dostop = True
self._do_stop = True
with self._cond:
self._cond.notify_all()
class Client(object):
class AsyncClient(object):
"""
High level client to connect to an OPC-UA server.
This class makes it easy to connect and browse address space.
It attemps to expose as much functionality as possible
but if you want more flexibility it is possible and adviced to
It attempts to expose as much functionality as possible
but if you want more flexibility it is possible and advised to
use UaClient object, available as self.uaclient
which offers the raw OPC-UA services interface.
"""
......@@ -91,17 +90,17 @@ class Client(object):
"""
self.logger = logging.getLogger(__name__)
self.server_url = urlparse(url)
#take initial username and password from the url
# take initial username and password from the url
self._username = self.server_url.username
self._password = self.server_url.password
self.name = "Pure Python Client"
self.name = "Pure Python Async. Client"
self.description = self.name
self.application_uri = "urn:freeopcua:client"
self.product_uri = "urn:freeopcua.github.no:client"
self.security_policy = ua.SecurityPolicy()
self.secure_channel_id = None
self.secure_channel_timeout = 3600000 # 1 hour
self.session_timeout = 3600000 # 1 hour
self.secure_channel_timeout = 3600000 # 1 hour
self.session_timeout = 3600000 # 1 hour
self._policy_ids = []
self.uaclient = UaClient(timeout)
self.user_certificate = None
......@@ -110,14 +109,14 @@ class Client(object):
self._session_counter = 1
self.keepalive = None
self.nodes = Shortcuts(self.uaclient)
self.max_messagesize = 0 # No limits
self.max_chunkcount = 0 # No limits
self.max_messagesize = 0 # No limits
self.max_chunkcount = 0 # No limits
def __enter__(self):
self.connect()
async def __aenter__(self):
await self.connect()
return self
def __exit__(self, exc_type, exc_value, traceback):
async def __aexit__(self, exc_type, exc_value, traceback):
self.disconnect()
@staticmethod
......@@ -163,20 +162,20 @@ class Client(object):
raise ua.UaError('Wrong format: `{0}`, expected at least 4 comma-separated values'.format(string))
policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
mode = getattr(ua.MessageSecurityMode, parts[1])
return self.set_security(policy_class, parts[2], parts[3],
parts[4] if len(parts) >= 5 else None, mode)
return self.set_security(
policy_class, parts[2], parts[3], parts[4] if len(parts) >= 5 else None, mode
)
def set_security(self, policy, certificate_path, private_key_path,
server_certificate_path=None,
mode=ua.MessageSecurityMode.SignAndEncrypt):
async def set_security(self, policy, certificate_path, private_key_path,
server_certificate_path=None, mode=ua.MessageSecurityMode.SignAndEncrypt):
"""
Set SecureConnection mode.
Call this before connect()
"""
if server_certificate_path is None:
# load certificate from server's list of endpoints
endpoints = self.connect_and_get_server_endpoints()
endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
endpoints = await self.connect_and_get_server_endpoints()
endpoint = AsyncClient.find_endpoint(endpoints, mode, policy.URI)
server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
else:
server_cert = uacrypto.load_certificate(server_certificate_path)
......@@ -197,81 +196,81 @@ class Client(object):
"""
self.user_private_key = uacrypto.load_private_key(path)
def connect_and_get_server_endpoints(self):
async def connect_and_get_server_endpoints(self):
"""
Connect, ask server for endpoints, and disconnect
"""
self.connect_socket()
self.send_hello()
self.open_secure_channel()
endpoints = self.get_endpoints()
self.close_secure_channel()
await self.connect_socket()
await self.send_hello()
await self.open_secure_channel()
endpoints = await self.get_endpoints()
await self.close_secure_channel()
self.disconnect_socket()
return endpoints
def connect_and_find_servers(self):
async def connect_and_find_servers(self):
"""
Connect, ask server for a list of known servers, and disconnect
"""
self.connect_socket()
self.send_hello()
self.open_secure_channel() # spec says it should not be necessary to open channel
servers = self.find_servers()
self.close_secure_channel()
await self.connect_socket()
await self.send_hello()
await self.open_secure_channel() # spec says it should not be necessary to open channel
servers = await self.find_servers()
await self.close_secure_channel()
self.disconnect_socket()
return servers
def connect_and_find_servers_on_network(self):
async def connect_and_find_servers_on_network(self):
"""
Connect, ask server for a list of known servers on network, and disconnect
"""
self.connect_socket()
self.send_hello()
self.open_secure_channel()
servers = self.find_servers_on_network()
self.close_secure_channel()
await self.connect_socket()
await self.send_hello()
await self.open_secure_channel()
servers = await self.find_servers_on_network()
await self.close_secure_channel()
self.disconnect_socket()
return servers
def connect(self):
async def connect(self):
"""
High level method
Connect, create and activate session
"""
self.connect_socket()
self.send_hello()
self.open_secure_channel()
self.create_session()
self.activate_session(username=self._username, password=self._password, certificate=self.user_certificate)
await self.connect_socket()
await self.send_hello()
await self.open_secure_channel()
await self.create_session()
await self.activate_session(username=self._username, password=self._password, certificate=self.user_certificate)
def disconnect(self):
async def disconnect(self):
"""
High level method
Close session, secure channel and socket
"""
try:
self.close_session()
self.close_secure_channel()
await self.close_session()
await self.close_secure_channel()
finally:
self.disconnect_socket()
def connect_socket(self):
async def connect_socket(self):
"""
connect to socket defined in url
"""
self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)
await self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)
def disconnect_socket(self):
self.uaclient.disconnect_socket()
def send_hello(self):
async def send_hello(self):
"""
Send OPC-UA hello to server
"""
ack = self.uaclient.send_hello(self.server_url.geturl(), self.max_messagesize, self.max_chunkcount)
ack = await self.uaclient.send_hello(self.server_url.geturl(), self.max_messagesize, self.max_chunkcount)
# FIXME check ack
def open_secure_channel(self, renew=False):
async def open_secure_channel(self, renew=False):
"""
Open secure channel, if renew is True, renew channel
"""
......@@ -282,21 +281,22 @@ class Client(object):
params.RequestType = ua.SecurityTokenRequestType.Renew
params.SecurityMode = self.security_policy.Mode
params.RequestedLifetime = self.secure_channel_timeout
nonce = utils.create_nonce(self.security_policy.symmetric_key_size) # length should be equal to the length of key of symmetric encryption
params.ClientNonce = nonce # this nonce is used to create a symmetric key
result = self.uaclient.open_secure_channel(params)
# length should be equal to the length of key of symmetric encryption
nonce = utils.create_nonce(self.security_policy.symmetric_key_size)
params.ClientNonce = nonce # this nonce is used to create a symmetric key
result = await self.uaclient.open_secure_channel(params)
self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
self.secure_channel_timeout = result.SecurityToken.RevisedLifetime
def close_secure_channel(self):
return self.uaclient.close_secure_channel()
async def close_secure_channel(self):
return await self.uaclient.close_secure_channel()
async def get_endpoints(self):
params = ua.GetEndpointsParameters()
params.EndpointUrl = self.server_url.geturl()
return await self.uaclient.get_endpoints(params)
def register_server(self, server, discovery_configuration=None):
async def register_server(self, server, discovery_configuration=None):
"""
register a server to discovery server
if discovery_configuration is provided, the newer register_server2 service call is used
......@@ -312,11 +312,11 @@ class Client(object):
params = ua.RegisterServer2Parameters()
params.Server = serv
params.DiscoveryConfiguration = discovery_configuration
return self.uaclient.register_server2(params)
return await self.uaclient.register_server2(params)
else:
return self.uaclient.register_server(serv)
return await self.uaclient.register_server(serv)
def find_servers(self, uris=None):
async def find_servers(self, uris=None):
"""
send a FindServer request to the server. The answer should be a list of
servers the server knows about
......@@ -327,13 +327,13 @@ class Client(object):
params = ua.FindServersParameters()
params.EndpointUrl = self.server_url.geturl()
params.ServerUris = uris
return self.uaclient.find_servers(params)
return await self.uaclient.find_servers(params)
def find_servers_on_network(self):
async def find_servers_on_network(self):
params = ua.FindServersOnNetworkParameters()
return self.uaclient.find_servers_on_network(params)
return await self.uaclient.find_servers_on_network(params)
def create_session(self):
async def create_session(self):
"""
send a CreateSessionRequest to server with reasonable parameters.
If you want o modify settings look at code of this methods
......@@ -346,7 +346,8 @@ class Client(object):
desc.ApplicationType = ua.ApplicationType.Client
params = ua.CreateSessionParameters()
nonce = utils.create_nonce(32) # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
# at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
nonce = utils.create_nonce(32)
params.ClientNonce = nonce
params.ClientCertificate = self.security_policy.client_certificate
params.ClientDescription = desc
......@@ -354,7 +355,7 @@ class Client(object):
params.SessionName = self.description + " Session" + str(self._session_counter)
params.RequestedSessionTimeout = 3600000
params.MaxResponseMessageSize = 0 # means no max size
response = self.uaclient.create_session(params)
response = await self.uaclient.create_session(params)
if self.security_policy.client_certificate is None:
data = nonce
else:
......@@ -366,11 +367,13 @@ class Client(object):
elif self.security_policy.server_certificate != response.ServerCertificate:
raise ua.UaError("Server certificate mismatch")
# remember PolicyId's: we will use them in activate_session()
ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
ep = AsyncClient.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
self._policy_ids = ep.UserIdentityTokens
self.session_timeout = response.RevisedSessionTimeout
self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7) # 0.7 is from spec
self.keepalive.start()
# 0.7 is from spec
# ToDo: refactor with callback_later
# self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)
# self.keepalive.start()
return response
def server_policy_id(self, token_type, default):
......@@ -393,11 +396,11 @@ class Client(object):
if policy.TokenType == token_type:
if policy.SecurityPolicyUri:
return policy.SecurityPolicyUri
else: # empty URI means "use this endpoint's policy URI"
else: # empty URI means "use this endpoint's policy URI"
return self.security_policy.URI
return self.security_policy.URI
def activate_session(self, username=None, password=None, certificate=None):
async def activate_session(self, username=None, password=None, certificate=None):
"""
Activate session using either username and password or private_key
"""
......@@ -416,7 +419,7 @@ class Client(object):
self._add_certificate_auth(params, certificate, challenge)
else:
self._add_user_auth(params, username, password)
return self.uaclient.activate_session(params)
return await self.uaclient.activate_session(params)
def _add_anonymous_auth(self, params):
params.UserIdentityToken = ua.AnonymousIdentityToken()
......@@ -462,13 +465,13 @@ class Client(object):
data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
return data, uri
def close_session(self):
async def close_session(self):
"""
Close session
"""
if self.keepalive:
self.keepalive.stop()
return self.uaclient.close_session(True)
return await self.uaclient.close_session(True)
def get_root_node(self):
return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
......@@ -553,5 +556,3 @@ class Client(object):
def load_type_definitions(self, nodes=None):
return load_type_definitions(self, nodes)
......@@ -47,22 +47,25 @@ class UASocketProtocol(asyncio.Protocol):
async def read(self, size):
"""Receive up to size bytes from socket."""
data = b''
self.logger.debug('read %s bytes from socket', size)
while size > 0:
self.logger.debug('data is now %s, waiting for %s bytes', len(data), size)
# ToDo: abort on timeout, socket close
# raise SocketClosedException("Server socket has closed")
if self._leftover_chunk:
self.logger.debug('leftover bytes %s', len(self._leftover_chunk))
# use leftover chunk first
chunk = self._leftover_chunk
self._leftover_chunk = None
else:
chunk = await self.receive_buffer.get()
needed_length = size - len(data)
if len(chunk) <= needed_length:
self.logger.debug('got chunk %s needed_length is %s', len(chunk), size)
if len(chunk) <= size:
_chunk = chunk
else:
# chunk is too big
_chunk = chunk[:needed_length]
self._leftover_chunk = chunk[needed_length:]
_chunk = chunk[:size]
self._leftover_chunk = chunk[size:]
data += _chunk
size -= len(_chunk)
return data
......@@ -99,7 +102,8 @@ class UASocketProtocol(asyncio.Protocol):
"""
future = self._send_request(request, callback, timeout, message_type)
if not callback:
data = await asyncio.wait_for(future.result(), self.timeout)
await asyncio.wait_for(future, self.timeout)
data = future.result()
self.check_answer(data, " in response to " + request.__class__.__name__)
return data
......@@ -246,7 +250,7 @@ class UaClient:
self.logger.info("create_session")
request = ua.CreateSessionRequest()
request.Parameters = parameters
data = self.protocol.send_request(request)
data = await self.protocol.send_request(request)
response = struct_from_binary(ua.CreateSessionResponse, data)
self.logger.debug(response)
response.ResponseHeader.ServiceResult.check()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment