Commit 20b99177 authored by Olivier R-D's avatar Olivier R-D

wrap socket in wrapper to have same interface from

all implementations (asyncio, socket, StringIO,..)
parent 8c3efefe
......@@ -76,20 +76,20 @@ class BinaryClient(object):
while not self._do_stop:
try:
self._receive()
except ua.SocketClosedException:
except ua.utils.SocketClosedException:
self.logger.info("Socket has closed connection")
break
self.logger.info("Thread ended")
def _receive_header(self):
self.logger.debug("Waiting for header")
header = ua.Header.from_stream(self._socket)
header = ua.Header.from_string(self._socket)
self.logger.info("received header: %s", header)
return header
def _receive_body(self, size):
self.logger.debug("reading body of message (%s bytes)", size)
data = utils.recv_all(self._socket, size)
data = self._socket.read(size)
if size != len(data):
raise Exception("Error, did not received expected number of bytes, got {}, asked for {}".format(len(data), size))
return utils.Buffer(data)
......@@ -138,7 +138,7 @@ class BinaryClient(object):
alle.append(data)
alle.insert(0, hdr.to_binary())
alle = b"".join(alle)
self._socket.sendall(alle)
self._socket.write(alle)
def _create_request_header(self, timeout=1000):
hdr = ua.RequestHeader()
......@@ -166,14 +166,15 @@ class BinaryClient(object):
connect to server socket and start receiving thread
"""
self.logger.info("opening connection")
self._socket = socket.create_connection((host, port))
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # nodelay ncessary to avoid packing in one frame, some servers do not like it
sock = socket.create_connection((host, port))
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # nodelay ncessary to avoid packing in one frame, some servers do not like it
self._socket = utils.SocketWrapper(sock)
self.start()
def disconnect_socket(self):
self.logger.info("stop request")
self._do_stop = True
self._socket.shutdown(socket.SHUT_WR)
self._socket.socket.shutdown(socket.SHUT_WR)
def send_hello(self, url):
hello = ua.Hello()
......
......@@ -60,16 +60,17 @@ class UAHandler(socketserver.BaseRequestHandler):
"""
def handle(self):
processor = UAProcessor(self.server.internal_server, self.request, self.client_address)
sock = ua.utils.SocketWrapper(self.request)
processor = UAProcessor(self.server.internal_server, sock, self.client_address)
try:
while True:
hdr = ua.Header.from_stream(self.request)
body = ua.utils.recv_all(self.request, hdr.body_size)
hdr = ua.Header.from_string(sock)
body = sock.read(hdr.body_size)
ret = processor.process(hdr, ua.utils.Buffer(body))
if not ret:
break
except ua.SocketClosedException:
logger.warn("Server has closed connection")
except ua.utils.SocketClosedException:
logger.warning("Server has closed connection")
except Exception:
logger.exception("Exception raised while parsing message from client, closing")
......
......@@ -11,6 +11,7 @@ except ImportError:
from opcua import ua
from opcua.binary_server import BinaryServer
#from opcua.binary_server_asyncio import BinaryServer
from opcua.internal_server import InternalServer
from opcua import Node, Subscription, ObjectIds, Event
......
......@@ -50,7 +50,7 @@ class UAProcessor(object):
self.logger.info("writting %s bytes to socket, with header %s ", len(alle), hdr)
#self.logger.info("writting data %s", hdr, [i for i in args])
#self.logger.debug("data: %s", alle)
self.socket.sendall(alle)
self.socket.write(alle)
def open_secure_channel(self, body):
algohdr = ua.AsymmetricAlgorithmHeader.from_binary(body)
......
......@@ -10,18 +10,6 @@ from opcua.attribute_ids import AttributeIds
logger = logging.getLogger('opcua.uaprotocol')
class SocketClosedException(Exception):
pass
def get_bytes_from_sock(sock, size):
data = utils.recv_all(sock, size)
if len(data) < size: # socket has closed!
raise SocketClosedException("Server socket has closed")
return io.BytesIO(data)
class Hello(uatypes.FrozenClass):
def __init__(self):
......@@ -96,19 +84,6 @@ class Header(uatypes.FrozenClass):
b.append(struct.pack("<I", self.ChannelId))
return b"".join(b)
@staticmethod
def from_stream(sock):
data = get_bytes_from_sock(sock, 8)
hdr = Header()
hdr.MessageType = struct.unpack("<3s", data.read(3))[0]
hdr.ChunkType = struct.unpack("<c", data.read(1))[0]
hdr.body_size = struct.unpack("<I", data.read(4))[0] - 8
if hdr.MessageType in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage):
hdr.body_size -= 4
data = get_bytes_from_sock(sock, 4)
hdr.ChannelId = struct.unpack("<I", data.read(4))[0]
return hdr
@staticmethod
def from_string(data):
hdr = Header()
......
......@@ -49,19 +49,33 @@ class Buffer(object):
raise Exception("No enough data left in buffer, request for {}, we have {}".format(size, self))
return self.data[:size]
class SocketClosedException(Exception):
pass
def recv_all(socket, size):
class SocketWrapper(object):
"""
Receive up to size bytes from socket
wrapper to make it possible to have same api for
normal sockets, socket from asyncio, StringIO, etc....
"""
data = b''
while size > 0:
chunk = socket.recv(size)
if not chunk:
break
data += chunk
size -= len(chunk)
return data
def __init__(self, sock):
self.socket = sock
def read(self, size):
"""
Receive up to size bytes from socket
"""
data = b''
while size > 0:
chunk = self.socket.recv(size)
if not chunk:
raise SocketClosedException("Server socket has closed")
data += chunk
size -= len(chunk)
return data
def write(self, data):
self.socket.sendall(data)
def create_nonce():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment