Commit d1a27501 authored by oroulet's avatar oroulet

fix big bug in handling of receiving buffer, many more tests working

parent c5ae7561
import sys
sys.path.insert(0, "..")
import logging
import asyncio
from opcua import ua, Server
from opcua.common.methods import uamethod
......@@ -11,7 +12,7 @@ def func(parent, value):
return value * 2
async def task(loop):
async def main():
# setup our server
server = Server()
await server.init()
......@@ -36,18 +37,18 @@ async def task(loop):
async with server:
count = 0
while True:
await asyncio.sleep(1000)
print("UPDATE")
await asyncio.sleep(1)
count += 0.1
await myvar.set_value(count)
def main():
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(task(loop))
loop.run_until_complete(main())
loop.close()
if __name__ == '__main__':
main()
......@@ -23,11 +23,12 @@ class OPCUAProtocol(asyncio.Protocol):
self.peer_name = None
self.transport = None
self.processor = None
self.receive_buffer = b''
self._buffer = b''
self.iserver = iserver
self.policies = policies
self.clients = clients
self.messages = asyncio.Queue()
self._task = None
def __str__(self):
return 'OPCUAProtocol({}, {})'.format(self.peer_name, self.processor.session)
......@@ -42,7 +43,7 @@ class OPCUAProtocol(asyncio.Protocol):
self.processor.set_policies(self.policies)
self.iserver.asyncio_transports.append(transport)
self.clients.append(self)
self.loop.create_task(self._process_received_message())
self._task = self.loop.create_task(self._process_received_message_loop())
def connection_lost(self, ex):
logger.info('Lost connection from %s, %s', self.peer_name, ex)
......@@ -52,34 +53,30 @@ class OPCUAProtocol(asyncio.Protocol):
if self in self.clients:
self.clients.remove(self)
self.messages.put_nowait((None, None))
self._task.cancel()
def data_received(self, data):
if self.receive_buffer:
data = self.receive_buffer + data
self.receive_buffer = b''
self._process_received_data(data)
def _process_received_data(self, data: Union[bytes, Buffer]):
buf = Buffer(data) if type(data) is bytes else data
try:
# try to parse the incoming data
self._buffer += data
# try to parse the incoming data
while len(self._buffer) > 0:
try:
header = header_from_binary(buf)
except NotEnoughData:
logger.debug('Not enough data while parsing header from client, waiting for more')
self.receive_buffer = data + self.receive_buffer
buf = Buffer(self._buffer)
try:
header = header_from_binary(buf)
except NotEnoughData:
logger.debug('Not enough data while parsing header from client, waiting for more')
return
if len(buf) < header.body_size:
logger.debug('We did not receive enough data from client. Need %s got %s', header.body_size, len(buf))
return
# we have a complete message
self.messages.put_nowait((header, buf))
self._buffer = self._buffer[(header.header_size + header.body_size):]
except Exception:
logger.exception('Exception raised while parsing message from client')
return
if len(buf) < header.body_size:
logger.debug('We did not receive enough data from client. Need %s got %s', header.body_size, len(buf))
self.receive_buffer = data + self.receive_buffer
return
# a message has been received
self.messages.put_nowait((header, buf))
except Exception:
logger.exception('Exception raised while parsing message from client')
return
async def _process_received_message(self):
async def _process_received_message_loop(self):
"""
Take message from the queue and try to process it.
"""
......@@ -88,15 +85,18 @@ class OPCUAProtocol(asyncio.Protocol):
if header is None and buf is None:
# Connection was closed, end task
break
logger.debug('_process_received_message %s %s', header.body_size, len(buf))
ret = await self.processor.process(header, buf)
if not ret:
logger.info('processor returned False, we close connection from %s', self.peer_name)
self.transport.close()
return
if len(buf) != 0:
# There is data left in the buffer - process it
self._process_received_data(buf)
try:
await self._process_one_msg(header, buf)
except:
logger.exception()
async def _process_one_msg(self, header, buf):
logger.debug('_process_received_message %s %s', header.body_size, len(buf))
ret = await self.processor.process(header, buf)
if not ret:
logger.info('processor returned False, we close connection from %s', self.peer_name)
self.transport.close()
return
class BinaryServer:
......
......@@ -419,10 +419,12 @@ class InternalSession:
return self.subscription_service.republish(params)
async def delete_subscriptions(self, ids):
# This is an async method, dues to symetry with client code
return await self.subscription_service.delete_subscriptions(ids)
async def delete_monitored_items(self, params):
subscription_result = await self.subscription_service.delete_monitored_items(params)
# This is an async method, dues to symetry with client code
subscription_result = self.subscription_service.delete_monitored_items(params)
self.iserver.server_callback_dispatcher.dispatch(
CallbackType.ItemSubscriptionDeleted, ServerItemCallback(params, subscription_result))
return subscription_result
......
......@@ -2,6 +2,7 @@
server side implementation of subscription service
"""
import asyncio
import logging
from opcua import ua
......@@ -39,13 +40,16 @@ class SubscriptionService:
async def delete_subscriptions(self, ids):
self.logger.info("delete subscriptions: %s", ids)
res = []
existing_subs = []
for i in ids:
if i not in self.subscriptions:
sub = self.subscriptions.pop(i, None)
if sub is None:
res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
else:
sub = self.subscriptions.pop(i)
await sub.stop()
#await sub.stop()
existing_subs.append(sub)
res.append(ua.StatusCode())
await asyncio.gather(*[sub.stop() for sub in existing_subs])
return res
def publish(self, acks):
......
......@@ -510,6 +510,7 @@ def header_from_binary(data):
if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
hdr.body_size -= 4
hdr.ChannelId = Primitives.UInt32.unpack(data)
hdr.header_size = 12
return hdr
......
......@@ -48,6 +48,7 @@ class Header(uatypes.FrozenClass):
self.ChannelId = channelid
self.body_size = 0
self.packet_size = 0
self.header_size = 8
self._freeze = True
def add_size(self, size):
......
......@@ -26,6 +26,7 @@ def pytest_generate_tests(metafunc):
def event_loop(request):
"""Create an instance of the default event loop for each test case."""
loop = asyncio.get_event_loop_policy().new_event_loop()
loop.set_debug(True)
yield loop
loop.close()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment