Commit e9746a45 authored by Olivier R-D's avatar Olivier R-D

cleanup, renaming and split some files

parent dcf7c808
build*
MANIFEST
.idea*
......@@ -23,9 +23,7 @@ if __name__ == "__main__":
logging.basicConfig(level=logging.WARN)
logger = logging.getLogger("opcua.address_space")
#logger = logging.getLogger("opcua.internal_server")
logger = logging.getLogger("SubscriptionManager")
logger.setLevel(logging.DEBUG)
logger = logging.getLogger("Subscription")
logger = logging.getLogger("opcua.subscription_server")
logger.setLevel(logging.DEBUG)
# now setup our server and start it
......
......@@ -25,7 +25,7 @@ class BinaryClient(object):
uaprotocol_auto.py and uaprotocol_hand.py
"""
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
self.logger = logging.getLogger(__name__)
self._socket = None
self._do_stop = False
self._security_token = ua.ChannelSecurityToken()
......
......@@ -9,7 +9,7 @@ except ImportError:
from threading import Thread, Lock
from opcua import ua
from opcua import utils
from opcua.uaprocessor import UAProcessor
logger = logging.getLogger(__name__)
......@@ -58,197 +58,3 @@ class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
class UAProcessor(object):
def __init__(self, internal_server, socket):
self.logger = logging.getLogger(__name__)
self.iserver = internal_server
self.socket = socket
self.channel = None
self._lock = Lock()
self.session = None
def loop(self):
#first we want a hello message
header = ua.Header.from_stream(self.socket)
body = self.receive_body(header.body_size)
if header.MessageType != ua.MessageType.Hello:
self.logger.warning("received a message which is not a hello, sending back an error message %s", header)
hdr = ua.Header(ua.MessageType.Error, ua.ChunkType.Single)
self.write_socket(hdr)
return
hello = ua.Hello.from_binary(body)
hdr = ua.Header(ua.MessageType.Acknowledge, ua.ChunkType.Single)
ack = ua.Acknowledge()
ack.ReceivebufferSize = hello.ReceiveBufferSize
ack.SendbufferSize = hello.SendBufferSize
self.write_socket(hdr, ack)
while True:
header = ua.Header.from_stream(self.socket)
if header is None:
return
if header.MessageType == ua.MessageType.Error:
self.logger.warning("Received an error message type")
return
body = self.receive_body(header.body_size)
if not self.process_body(header, body):
break
def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
with self._lock:
response.ResponseHeader.RequestHandle = requesthandle
seqhdr.SequenceNumber += 1
hdr = ua.Header(msgtype, ua.ChunkType.Single, self.channel.SecurityToken.ChannelId)
self.write_socket(hdr, algohdr, seqhdr, response)
def write_socket(self, hdr, *args):
alle = []
for arg in args:
data = arg.to_binary()
hdr.add_size(len(data))
alle.append(data)
alle.insert(0, hdr.to_binary())
alle = b"".join(alle)
self.logger.info("writting %s bytes to socket, with header %s ", len(alle), hdr)
#self.logger.info("writting data %s", hdr, [i for i in args])
#self.logger.debug("data: %s", alle)
self.socket.send(alle)
def receive_body(self, size):
self.logger.debug("reading body of message (%s bytes)", size)
data = self.socket.recv(size)
if size != len(data):
raise Exception("Error, did not received expected number of bytes, got {}, asked for {}".format(len(data), size))
return utils.Buffer(data)
def open_secure_channel(self, body):
algohdr = ua.AsymmetricAlgorithmHeader.from_binary(body)
seqhdr = ua.SequenceHeader.from_binary(body)
request = ua.OpenSecureChannelRequest.from_binary(body)
self.channel = self.iserver.open_secure_channel(request.Parameters, self.channel)
#send response
response = ua.OpenSecureChannelResponse()
response.Parameters = self.channel
self.send_response(request.RequestHeader.RequestHandle, algohdr, seqhdr, response, ua.MessageType.SecureOpen)
def process_body(self, header, body):
if header.MessageType == ua.MessageType.SecureOpen:
self.open_secure_channel(body)
elif header.MessageType == ua.MessageType.SecureClose:
if not self.channel or header.ChannelId != self.channel.SecurityToken.ChannelId:
self.logger.warning("Request to close channel %s which was not issued, current channel is %s", header.ChannelId, self.channel)
return False
elif header.MessageType == ua.MessageType.SecureMessage:
algohdr = ua.SymmetricAlgorithmHeader.from_binary(body)
seqhdr = ua.SequenceHeader.from_binary(body)
self.process_message(algohdr, seqhdr, body)
else:
self.logger.warning("Unsupported message type: %s", header.MessageType)
return True
def process_message(self, algohdr, seqhdr, body):
typeid = ua.NodeId.from_binary(body)
requesthdr = ua.RequestHeader.from_binary(body)
if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
self.logger.info("Create session request")
params = ua.CreateSessionParameters.from_binary(body)
self.session = self.iserver.create_session(params)
response = ua.CreateSessionResponse()
response.Parameters = self.session
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
self.logger.info("Close session request")
deletesubs = ua.unpack_uatype('Boolean', body)
self.iserver.close_session(self.session, deletesubs)
response = ua.CloseSessionResponse()
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
self.logger.info("Activate session request")
params = ua.ActivateSessionParameters.from_binary(body)
result = self.iserver.activate_session(self.session, params)
response = ua.ActivateSessionResponse()
response.Parameters = result
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
self.logger.info("Read request")
params = ua.ReadParameters.from_binary(body)
results = self.iserver.read(params)
response = ua.ReadResponse()
response.Results = results
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
self.logger.info("Write request")
params = ua.WriteParameters.from_binary(body)
results = self.iserver.write(params)
response = ua.WriteResponse()
response.Results = results
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
self.logger.info("Browse request")
params = ua.BrowseParameters.from_binary(body)
results = self.iserver.browse(params)
response = ua.BrowseResponse()
response.Results = results
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
self.logger.info("get endpoints request")
params = ua.GetEndpointsParameters.from_binary(body)
endpoints = self.iserver.get_endpoints(params)
response = ua.GetEndpointsResponse()
response.Endpoints = endpoints
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
self.logger.info("translate browsepaths to nodeids request")
params = ua.TranslateBrowsePathsToNodeIdsParameters.from_binary(body)
paths = self.iserver.translate_browsepaths_to_nodeids(params.BrowsePaths)
response = ua.TranslateBrowsePathsToNodeIdsResponse()
response.Results = paths
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
self.logger.info("add nodes request")
params = ua.AddNodesParameters.from_binary(body)
results = self.iserver.add_nodes(params.NodesToAdd)
response = ua.AddNodesResponse()
response.Results = results
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
else:
self.logger.warning("Uknown message received %s", typeid)
sf = ua.ServiceFault()
sf.ResponseHeader.ServiceResult = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, sf)
......@@ -18,7 +18,7 @@ class KeepAlive(Thread):
"""
def __init__(self, client, timeout):
Thread.__init__(self)
self.logger = logging.getLogger(self.__class__.__name__)
self.logger = logging.getLogger(__name__)
if timeout == 0: # means no timeout bu we do not trust such servers
timeout = 360000
self.timeout = timeout
......@@ -63,7 +63,7 @@ class Client(object):
if you are unsure of url, write at least hostname and port
and call get_endpoints
"""
self.logger = logging.getLogger(self.__class__.__name__)
self.logger = logging.getLogger(__name__)
self.server_url = urlparse(url)
self.name = "Pure Python Client"
self.description = self.name
......
......@@ -38,7 +38,7 @@ class Session(object):
class InternalServer(object):
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
self.logger = logging.getLogger(__name__)
self.endpoints = []
self.sessions = {}
self._channel_id_counter = 5
......
......@@ -57,7 +57,6 @@ class Server(object):
self.name = name
def start(self):
print("START SERVER")
self.iserver.start()
self._set_endpoints()
self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
......
......@@ -16,7 +16,7 @@ class SubscriptionItemData():
class Subscription(object):
def __init__(self, server, params, handler):
self.logger = logging.getLogger(self.__class__.__name__)
self.logger = logging.getLogger(__name__)
self.server = server
self._client_handle = 200
self._handler = handler
......
......@@ -12,7 +12,7 @@ from opcua import ua
class SubscriptionManager(Thread):
def __init__(self, aspace):
Thread.__init__(self)
self.logger = logging.getLogger(self.__class__.__name__)
self.logger = logging.getLogger(__name__)
self.loop = None
self.aspace = aspace
self.subscriptions = {}
......@@ -20,20 +20,18 @@ class SubscriptionManager(Thread):
self._cond = Condition()
def start(self):
print("start internal")
Thread.start(self)
with self._cond:
self._cond.wait()
print("start internal finished")
def run(self):
self.logger.warn("Starting subscription thread")
self.logger.debug("Starting subscription thread")
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
with self._cond:
self._cond.notify_all()
self.loop.run_forever()
print("LOOP", self.loop)
self.logger.debug("subsription thread ended")
def add_task(self, coro):
"""
......@@ -58,7 +56,7 @@ class SubscriptionManager(Thread):
result.RevisedLifetimeCount = params.RequestedLifetimeCount
result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
sub = Subscription(self, result, self.aspace, callback)
sub = InternalSubscription(self, result, self.aspace, callback)
sub.start()
self.subscriptions[result.SubscriptionId] = sub
......@@ -74,7 +72,7 @@ class SubscriptionManager(Thread):
return res
def publish(self, acks):
self.logger.warn("publish request with acks %s", acks)
self.logger.info("publish request with acks %s", acks)
def create_monitored_items(self, params):
self.logger.info("create monitored items")
......@@ -97,9 +95,9 @@ class MonitoredItemData(object):
self.mode = None
class Subscription(object):
class InternalSubscription(object):
def __init__(self, manager, data, addressspace, callback):
self.logger = logging.getLogger(self.__class__.__name__)
self.logger = logging.getLogger(__name__)
self.aspace = addressspace
self.manager = manager
self.data = data
......@@ -123,7 +121,7 @@ class Subscription(object):
yield from asyncio.sleep(1)
def publish_results(self):
print("looking for results and publishing")
self.logger.debug("looking for results and publishing")
def create_monitored_items(self, params):
results = []
......@@ -158,7 +156,7 @@ class Subscription(object):
return result
def datachange_callback(self, handle, value):
self.logger.warn("subscription %s: datachange callback called with %s, %s", self, handle, value)
self.logger.info("subscription %s: datachange callback called with %s, %s", self, handle, value)
......
import logging
from threading import Lock
from opcua import ua
from opcua import utils
class UAProcessor(object):
def __init__(self, internal_server, socket):
self.logger = logging.getLogger(__name__)
self.iserver = internal_server
self.socket = socket
self.channel = None
self._lock = Lock()
self.session = None
def loop(self):
#first we want a hello message
header = ua.Header.from_stream(self.socket)
body = self.receive_body(header.body_size)
if header.MessageType != ua.MessageType.Hello:
self.logger.warning("received a message which is not a hello, sending back an error message %s", header)
hdr = ua.Header(ua.MessageType.Error, ua.ChunkType.Single)
self.write_socket(hdr)
return
hello = ua.Hello.from_binary(body)
hdr = ua.Header(ua.MessageType.Acknowledge, ua.ChunkType.Single)
ack = ua.Acknowledge()
ack.ReceivebufferSize = hello.ReceiveBufferSize
ack.SendbufferSize = hello.SendBufferSize
self.write_socket(hdr, ack)
while True:
header = ua.Header.from_stream(self.socket)
if header is None:
return
if header.MessageType == ua.MessageType.Error:
self.logger.warning("Received an error message type")
return
body = self.receive_body(header.body_size)
if not self.process_body(header, body):
break
def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
with self._lock:
response.ResponseHeader.RequestHandle = requesthandle
seqhdr.SequenceNumber += 1
hdr = ua.Header(msgtype, ua.ChunkType.Single, self.channel.SecurityToken.ChannelId)
self.write_socket(hdr, algohdr, seqhdr, response)
def write_socket(self, hdr, *args):
alle = []
for arg in args:
data = arg.to_binary()
hdr.add_size(len(data))
alle.append(data)
alle.insert(0, hdr.to_binary())
alle = b"".join(alle)
self.logger.info("writting %s bytes to socket, with header %s ", len(alle), hdr)
#self.logger.info("writting data %s", hdr, [i for i in args])
#self.logger.debug("data: %s", alle)
self.socket.send(alle)
def receive_body(self, size):
self.logger.debug("reading body of message (%s bytes)", size)
data = self.socket.recv(size)
if size != len(data):
raise Exception("Error, did not received expected number of bytes, got {}, asked for {}".format(len(data), size))
return utils.Buffer(data)
def open_secure_channel(self, body):
algohdr = ua.AsymmetricAlgorithmHeader.from_binary(body)
seqhdr = ua.SequenceHeader.from_binary(body)
request = ua.OpenSecureChannelRequest.from_binary(body)
self.channel = self.iserver.open_secure_channel(request.Parameters, self.channel)
#send response
response = ua.OpenSecureChannelResponse()
response.Parameters = self.channel
self.send_response(request.RequestHeader.RequestHandle, algohdr, seqhdr, response, ua.MessageType.SecureOpen)
def process_body(self, header, body):
if header.MessageType == ua.MessageType.SecureOpen:
self.open_secure_channel(body)
elif header.MessageType == ua.MessageType.SecureClose:
if not self.channel or header.ChannelId != self.channel.SecurityToken.ChannelId:
self.logger.warning("Request to close channel %s which was not issued, current channel is %s", header.ChannelId, self.channel)
return False
elif header.MessageType == ua.MessageType.SecureMessage:
algohdr = ua.SymmetricAlgorithmHeader.from_binary(body)
seqhdr = ua.SequenceHeader.from_binary(body)
self.process_message(algohdr, seqhdr, body)
else:
self.logger.warning("Unsupported message type: %s", header.MessageType)
return True
def process_message(self, algohdr, seqhdr, body):
typeid = ua.NodeId.from_binary(body)
requesthdr = ua.RequestHeader.from_binary(body)
if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
self.logger.info("Create session request")
params = ua.CreateSessionParameters.from_binary(body)
self.session = self.iserver.create_session(params)
response = ua.CreateSessionResponse()
response.Parameters = self.session
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
self.logger.info("Close session request")
deletesubs = ua.unpack_uatype('Boolean', body)
self.iserver.close_session(self.session, deletesubs)
response = ua.CloseSessionResponse()
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
self.logger.info("Activate session request")
params = ua.ActivateSessionParameters.from_binary(body)
result = self.iserver.activate_session(self.session, params)
response = ua.ActivateSessionResponse()
response.Parameters = result
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
self.logger.info("Read request")
params = ua.ReadParameters.from_binary(body)
results = self.iserver.read(params)
response = ua.ReadResponse()
response.Results = results
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
self.logger.info("Write request")
params = ua.WriteParameters.from_binary(body)
results = self.iserver.write(params)
response = ua.WriteResponse()
response.Results = results
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
self.logger.info("Browse request")
params = ua.BrowseParameters.from_binary(body)
results = self.iserver.browse(params)
response = ua.BrowseResponse()
response.Results = results
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
self.logger.info("get endpoints request")
params = ua.GetEndpointsParameters.from_binary(body)
endpoints = self.iserver.get_endpoints(params)
response = ua.GetEndpointsResponse()
response.Endpoints = endpoints
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
self.logger.info("translate browsepaths to nodeids request")
params = ua.TranslateBrowsePathsToNodeIdsParameters.from_binary(body)
paths = self.iserver.translate_browsepaths_to_nodeids(params.BrowsePaths)
response = ua.TranslateBrowsePathsToNodeIdsResponse()
response.Results = paths
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
self.logger.info("add nodes request")
params = ua.AddNodesParameters.from_binary(body)
results = self.iserver.add_nodes(params.NodesToAdd)
response = ua.AddNodesResponse()
response.Results = results
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
else:
self.logger.warning("Uknown message received %s", typeid)
sf = ua.ServiceFault()
sf.ResponseHeader.ServiceResult = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, sf)
......@@ -7,10 +7,7 @@ import opcua.uatypes as uatypes
import opcua.utils as utils
from opcua.object_ids import ObjectIds
logger = logging.getLogger(__name__)
logger = logging.getLogger('opcua.uaprotocol')
class SocketClosedException(Exception):
......
"""
implement ua datatypes
"""
import logging
from enum import Enum
from datetime import datetime, timedelta, tzinfo
from calendar import timegm
......@@ -10,6 +11,8 @@ import struct
import opcua.status_code as status_code
logger = logging.getLogger('opcua.uaprotocol')
#types that will packed and unpacked directly using struct (string, bytes and datetime are handles as special cases
UaTypes = ("Boolean", "SByte", "Byte", "Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64", "Float", "Double")
......
......@@ -8,7 +8,7 @@ class Buffer(object):
and added a few conveniance methods
"""
def __init__(self, data):
self.logger = logging.getLogger(self.__class__.__name__)
self.logger = logging.getLogger(__name__)
self.data = data
def __str__(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment