Commit e7163da9 authored by oroulet's avatar oroulet Committed by oroulet

expose limits, add tests and fix all the bugs they exposed

parent 2de4455d
...@@ -50,13 +50,17 @@ class TransportLimits: ...@@ -50,13 +50,17 @@ class TransportLimits:
logger.error("Number of message chunks: %s is > configured max chunk count: %s", sz, self.max_chunk_count) logger.error("Number of message chunks: %s is > configured max chunk count: %s", sz, self.max_chunk_count)
return within_limit return within_limit
def create_acknowledge_limits(self, msg: ua.Hello) -> ua.Acknowledge: def create_acknowledge_and_set_limits(self, msg: ua.Hello) -> ua.Acknowledge:
ack = ua.Acknowledge() ack = ua.Acknowledge()
ack.ReceiveBufferSize = min(msg.ReceiveBufferSize, self.max_recv_buffer) ack.ReceiveBufferSize = min(msg.ReceiveBufferSize, self.max_send_buffer)
ack.SendBufferSize = min(msg.SendBufferSize, self.max_send_buffer) ack.SendBufferSize = min(msg.SendBufferSize, self.max_recv_buffer)
ack.MaxChunkCount = self._select_limit(msg.MaxChunkCount, self.max_chunk_count) ack.MaxChunkCount = self._select_limit(msg.MaxChunkCount, self.max_chunk_count)
ack.MaxMessageSize = self._select_limit(msg.MaxMessageSize, self.max_message_size) ack.MaxMessageSize = self._select_limit(msg.MaxMessageSize, self.max_message_size)
self.update_limits(ack) self.max_chunk_count = ack.MaxChunkCount
self.max_recv_buffer = ack.SendBufferSize
self.max_send_buffer = ack.ReceiveBufferSize
self.max_message_size = ack.MaxMessageSize
logger.warning("updating server limits to: %s", self)
return ack return ack
def create_hello_limits(self, msg: ua.Hello) -> ua.Hello: def create_hello_limits(self, msg: ua.Hello) -> ua.Hello:
...@@ -65,12 +69,12 @@ class TransportLimits: ...@@ -65,12 +69,12 @@ class TransportLimits:
msg.MaxChunkCount = self.max_chunk_count msg.MaxChunkCount = self.max_chunk_count
msg.MaxMessageSize = self.max_chunk_count msg.MaxMessageSize = self.max_chunk_count
def update_limits(self, msg: ua.Acknowledge) -> None: def update_client_limits(self, msg: ua.Acknowledge) -> None:
self.max_chunk_count = msg.MaxChunkCount self.max_chunk_count = msg.MaxChunkCount
self.max_recv_buffer = msg.ReceiveBufferSize self.max_recv_buffer = msg.ReceiveBufferSize
self.max_send_buffer = msg.SendBufferSize self.max_send_buffer = msg.SendBufferSize
self.max_message_size = msg.MaxMessageSize self.max_message_size = msg.MaxMessageSize
logger.warning("updating limits to: %s", self) logger.warning("updating client limits to: %s", self)
class MessageChunk: class MessageChunk:
...@@ -415,7 +419,7 @@ class SecureConnection: ...@@ -415,7 +419,7 @@ class SecureConnection:
return msg return msg
if header.MessageType == ua.MessageType.Acknowledge: if header.MessageType == ua.MessageType.Acknowledge:
msg = struct_from_binary(ua.Acknowledge, body) msg = struct_from_binary(ua.Acknowledge, body)
self._limits.update_limits(msg) self._limits.update_client_limits(msg)
return msg return msg
if header.MessageType == ua.MessageType.Error: if header.MessageType == ua.MessageType.Error:
msg = struct_from_binary(ua.ErrorMessage, body) msg = struct_from_binary(ua.ErrorMessage, body)
......
...@@ -3,7 +3,6 @@ Socket server forwarding request to internal server ...@@ -3,7 +3,6 @@ Socket server forwarding request to internal server
""" """
import logging import logging
import asyncio import asyncio
import math
from typing import Optional from typing import Optional
from ..common.connection import TransportLimits from ..common.connection import TransportLimits
...@@ -68,9 +67,8 @@ class OPCUAProtocol(asyncio.Protocol): ...@@ -68,9 +67,8 @@ class OPCUAProtocol(asyncio.Protocol):
try: try:
header = header_from_binary(buf) header = header_from_binary(buf)
except NotEnoughData: except NotEnoughData:
# a packet should at least contain a header otherwise it is malformed (8 or 12 bytes) # we jsut wait for more data, that happens.
logger.debug('Not enough data while parsing header from client, empty the buffer') # worst case recv will go in timeout or it hangs and it should be fine too
self.transport.close()
return return
if header.header_size + header.body_size <= header.header_size: if header.header_size + header.body_size <= header.header_size:
# malformed header prevent invalid access of your buffer # malformed header prevent invalid access of your buffer
...@@ -78,8 +76,7 @@ class OPCUAProtocol(asyncio.Protocol): ...@@ -78,8 +76,7 @@ class OPCUAProtocol(asyncio.Protocol):
self.transport.close() self.transport.close()
else: else:
if len(buf) < header.body_size: if len(buf) < header.body_size:
logger.debug('We did not receive enough data from client. Need %s got %s', header.body_size, logger.debug('We did not receive enough data from client. Need %s got %s', header.body_size, len(buf))
len(buf))
return return
# we have a complete message # we have a complete message
self.messages.put_nowait((header, buf)) self.messages.put_nowait((header, buf))
...@@ -112,7 +109,7 @@ class OPCUAProtocol(asyncio.Protocol): ...@@ -112,7 +109,7 @@ class OPCUAProtocol(asyncio.Protocol):
class BinaryServer: class BinaryServer:
def __init__(self, internal_server: InternalServer, hostname, port): def __init__(self, internal_server: InternalServer, hostname, port, limits: TransportLimits):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.hostname = hostname self.hostname = hostname
self.port = port self.port = port
...@@ -122,15 +119,7 @@ class BinaryServer: ...@@ -122,15 +119,7 @@ class BinaryServer:
self.clients = [] self.clients = []
self.closing_tasks = [] self.closing_tasks = []
self.cleanup_task = None self.cleanup_task = None
# Use accectable limits self.limits = limits
buffer_sz = 65535
max_msg_sz = 16 * 1024 * 1024 # 16mb simular to the opc ua c stack so this is a good default
self.limits = TransportLimits(
max_recv_buffer=buffer_sz,
max_send_buffer=buffer_sz,
max_chunk_count=math.ceil(buffer_sz / max_msg_sz), # Round up to allow max msg size
max_message_size=max_msg_sz
)
def set_policies(self, policies): def set_policies(self, policies):
self._policies = policies self._policies = policies
......
...@@ -4,6 +4,7 @@ High level interface to pure python OPC-UA server ...@@ -4,6 +4,7 @@ High level interface to pure python OPC-UA server
import asyncio import asyncio
import logging import logging
import math
from datetime import timedelta, datetime from datetime import timedelta, datetime
from urllib.parse import urlparse from urllib.parse import urlparse
from typing import Coroutine, Optional, Tuple from typing import Coroutine, Optional, Tuple
...@@ -23,6 +24,7 @@ from ..common.shortcuts import Shortcuts ...@@ -23,6 +24,7 @@ from ..common.shortcuts import Shortcuts
from ..common.structures import load_type_definitions, load_enums from ..common.structures import load_type_definitions, load_enums
from ..common.structures104 import load_data_type_definitions from ..common.structures104 import load_data_type_definitions
from ..common.ua_utils import get_nodes_of_namespace from ..common.ua_utils import get_nodes_of_namespace
from ..common.connection import TransportLimits
from ..crypto import security_policies, uacrypto from ..crypto import security_policies, uacrypto
...@@ -104,6 +106,15 @@ class Server: ...@@ -104,6 +106,15 @@ class Server:
self._permission_ruleset = None self._permission_ruleset = None
self._policyIDs = ["Anonymous", "Basic256Sha256", "Username", "Aes128Sha256RsaOaep"] self._policyIDs = ["Anonymous", "Basic256Sha256", "Username", "Aes128Sha256RsaOaep"]
self.certificate = None self.certificate = None
# Use accectable limits
buffer_sz = 65535
max_msg_sz = 100 * 1024 * 1024 # 100mb
self.limits = TransportLimits(
max_recv_buffer=buffer_sz,
max_send_buffer=buffer_sz,
max_chunk_count=math.ceil(max_msg_sz / buffer_sz), # Round up to allow max msg size
max_message_size=max_msg_sz
)
async def init(self, shelf_file=None): async def init(self, shelf_file=None):
await self.iserver.init(shelf_file) await self.iserver.init(shelf_file)
...@@ -425,7 +436,7 @@ class Server: ...@@ -425,7 +436,7 @@ class Server:
await self.iserver.start() await self.iserver.start()
try: try:
ipaddress, port = self._get_bind_socket_info() ipaddress, port = self._get_bind_socket_info()
self.bserver = BinaryServer(self.iserver, ipaddress, port) self.bserver = BinaryServer(self.iserver, ipaddress, port, self.limits)
self.bserver.set_policies(self._policies) self.bserver.set_policies(self._policies)
await self.bserver.start() await self.bserver.start()
except Exception as exp: except Exception as exp:
......
...@@ -109,7 +109,7 @@ class UaProcessor: ...@@ -109,7 +109,7 @@ class UaProcessor:
elif header.MessageType == ua.MessageType.SecureMessage: elif header.MessageType == ua.MessageType.SecureMessage:
return await self.process_message(msg.SequenceHeader(), msg.body()) return await self.process_message(msg.SequenceHeader(), msg.body())
elif isinstance(msg, ua.Hello): elif isinstance(msg, ua.Hello):
ack = self._limits.create_acknowledge_limits(msg) ack = self._limits.create_acknowledge_and_set_limits(msg)
data = uatcp_to_binary(ua.MessageType.Acknowledge, ack) data = uatcp_to_binary(ua.MessageType.Acknowledge, ack)
self._transport.write(data) self._transport.write(data)
elif isinstance(msg, ua.ErrorMessage): elif isinstance(msg, ua.ErrorMessage):
......
import asyncio
import logging
from asyncua import Client
url = "opc.tcp://localhost:4840/freeopcua/server/"
namespace = "http://examples.freeopcua.github.io"
async def main():
print(f"Connecting to {url} ...")
async with Client(url=url, watchdog_intervall=1000) as client:
# Find the namespace index
nsidx = await client.get_namespace_index(namespace)
print(f"Namespace Index for '{namespace}': {nsidx}")
# Get the variable node for read / write
var = await client.nodes.root.get_child(
["0:Objects", f"{nsidx}:MyObject", f"{nsidx}:MyVariable"]
)
print("READ!!!!!!!!!!!!!!!!!")
value = await var.read_value()
print("Received value of length !!!!!!!!!!!!!!!!!!!!!", len(value))
print("writting back value of MyVariable ")
await var.write_value(value)
if __name__ == "__main__":
logging.basicConfig(level=logging.WARNING)
asyncio.run(main())
import asyncio
import logging
from asyncua import Server, ua
from asyncua.common.methods import uamethod
@uamethod
def func(parent, value):
return value * 2
async def main():
_logger = logging.getLogger("asyncua")
# setup our server
server = Server()
# set some hard connection limits
#server.limits.max_recv_buffer = 1024
#server.limits.max_send_buffer = 1024
#server.limits.max_send_buffer = 102400000000
server.limits.max_chunk_count = 10
print(server.limits)
await server.init()
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
# setup our own namespace, not really necessary but should as spec
uri = "http://examples.freeopcua.github.io"
idx = await server.register_namespace(uri)
# populating our address space
# setup a variable far too big for our limits
test_string = b'a' * (100 * 1024 * 1024)
test_string = b'a' * 100 * 1024
print("LENGTH VAR", len(test_string))
myobj = await server.nodes.objects.add_object(idx, "MyObject")
myvar = await myobj.add_variable(idx, "MyVariable", test_string)
# Set MyVariable to be writable by clients
await myvar.set_writable()
_logger.info("Starting server!")
async with server:
while True:
await asyncio.sleep(1)
if __name__ == "__main__":
logging.basicConfig(level=logging.WARNING)
asyncio.run(main(), debug=False)
...@@ -677,24 +677,43 @@ def restore_transport_limits_server(server: Server): ...@@ -677,24 +677,43 @@ def restore_transport_limits_server(server: Server):
server.bserver.limits.max_chunk_count = max_chunk_count server.bserver.limits.max_chunk_count = max_chunk_count
async def test_message_limits(restore_transport_limits_server: Server): async def test_message_limits_fail_write(restore_transport_limits_server: Server):
server = restore_transport_limits_server server = restore_transport_limits_server
server.bserver.limits.max_recv_buffer = 1024 server.bserver.limits.max_recv_buffer = 1024
server.bserver.limits.max_send_buffer = 10240000
server.bserver.limits.max_chunk_count = 10 server.bserver.limits.max_chunk_count = 10
n = await server.nodes.objects.add_variable(1, "MyLimitVariable", "t") test_string = b'a' * 100 * 1024
n = await server.nodes.objects.add_variable(1, "MyLimitVariable", test_string)
await n.set_writable(True) await n.set_writable(True)
client = Client(server.endpoint.geturl()) client = Client(server.endpoint.geturl())
# This should trigger a timeout error because the message is to large # This should trigger a timeout error because the message is to large
async with client:
n = client.get_node(n.nodeid)
await n.read_value()
with pytest.raises(ConnectionError): with pytest.raises(ConnectionError):
await n.write_value(test_string, ua.VariantType.ByteString)
async def test_message_limits_fail_read(restore_transport_limits_server: Server):
server = restore_transport_limits_server
server.bserver.limits.max_recv_buffer = 10240000
server.bserver.limits.max_send_buffer = 1024
server.bserver.limits.max_chunk_count = 10
test_string = b'a' * 100 * 1024
n = await server.nodes.objects.add_variable(1, "MyLimitVariable", test_string)
await n.set_writable(True)
client = Client(server.endpoint.geturl())
# This should trigger a connection error because the message is to large
async with client: async with client:
test_string = 'a' * (1024 * 1024 * 1024)
n = client.get_node(n.nodeid) n = client.get_node(n.nodeid)
await n.write_value(test_string, ua.VariantType.String) await n.write_value(test_string, ua.VariantType.ByteString)
with pytest.raises(ConnectionError):
await n.read_value()
async def test_message_limits_works(restore_transport_limits_server: Server): async def test_message_limits_works(restore_transport_limits_server: Server):
server = restore_transport_limits_server server = restore_transport_limits_server
server.bserver.limits.max_recv_buffer = 1024 # server.bserver.limits.max_recv_buffer = 1024
server.bserver.limits.max_send_buffer = 1024 server.bserver.limits.max_send_buffer = 1024
server.bserver.limits.max_chunk_count = 10 server.bserver.limits.max_chunk_count = 10
n = await server.nodes.objects.add_variable(1, "MyLimitVariable2", "t") n = await server.nodes.objects.add_variable(1, "MyLimitVariable2", "t")
...@@ -705,6 +724,7 @@ async def test_message_limits_works(restore_transport_limits_server: Server): ...@@ -705,6 +724,7 @@ async def test_message_limits_works(restore_transport_limits_server: Server):
n = client.get_node(n.nodeid) n = client.get_node(n.nodeid)
test_string = 'a' * (1024 * 5) test_string = 'a' * (1024 * 5)
await n.write_value(test_string, ua.VariantType.String) await n.write_value(test_string, ua.VariantType.String)
await n.read_value()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment