Commit 1617be87 authored by olivier R-D's avatar olivier R-D

implement filtering in server side find_servers

parent dd5c347b
......@@ -89,9 +89,9 @@ class Client(object):
self.secure_channel_timeout = self.default_timeout
self.session_timeout = self.default_timeout
self._policy_ids = []
self.server_certificate = ""
self.client_certificate = ""
self.private_key = ""
self.server_certificate = b""
self.client_certificate = b""
self.private_key = b""
self.bclient = BinaryClient(timeout)
self._nonce = None
self._session_counter = 1
......
......@@ -81,6 +81,8 @@ class InternalServer(object):
def start(self):
self.logger.info("starting internal server")
for edp in self.endpoints:
self._known_servers[edp.Server.ApplicationUri] = ServerDesc(edp.Server)
self.loop.start()
Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)).set_value(0)
Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime)).set_value(datetime.now())
......@@ -117,11 +119,15 @@ class InternalServer(object):
return self.endpoints[:]
def find_servers(self, params):
#FIXME: implement filtering from parmams.uri
if not params.ServerUris:
return [desc.Server for desc in self._known_servers.values()]
servers = []
for edp in self.endpoints:
servers.append(edp.Server)
return servers + [desc.Server for desc in self._known_servers.values()]
for serv in self._known_servers.values():
for uri in params.ServerUris:
if serv.Server.ApplicationUri.startswith(uri):
servers.append(serv.Server)
break
return servers
def register_server(self, server, conf=None):
appdesc = ua.ApplicationDescription()
......@@ -131,6 +137,7 @@ class InternalServer(object):
appdesc.ApplicationType = server.ServerType
appdesc.GatewayServerUri = server.GatewayServerUri
appdesc.DiscoveryUrls = server.DiscoveryUrls # FIXME: select discovery uri using reachability from client network
print("Registring new server: ", server.ServerUri)
self._known_servers[server.ServerUri] = ServerDesc(appdesc, conf)
def register_server2(self, params):
......@@ -189,7 +196,7 @@ class InternalSession(object):
def activate_session(self, params):
self.logger.info("activate session")
result = ua.ActivateSessionResult()
if not self.state == SessionState.Created:
if self.state != SessionState.Created:
raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
result.ServerNonce = self.nonce
for _ in params.ClientSoftwareCertificates:
......
......@@ -56,7 +56,7 @@ class Server(object):
self.default_timeout = 3600000
self.iserver = InternalServer()
self.bserver = None
self._discovery_client = None
self._discovery_clients = {}
self._discovery_period = 60
# setup some expected values
......@@ -92,22 +92,33 @@ class Server(object):
params.ServerUris = uris
return self.iserver.find_servers(params)
def register_to_discovery(self, url, period=60):
def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
"""
Register to a OPC-UA Discovery server. Registering must be renewed at
Register to an OPC-UA Discovery server. Registering must be renewed at
least every 10 minutes, so this method will use our asyncio thread to
re-register every period seconds
"""
if url in self._discovery_clients:
self._discovery_clients[url].disconnect()
self._discovery_clients[url] = Client(url)
self._discovery_clients[url].connect()
self._discovery_clients[url].register_server(self)
self._discovery_period = period
self._discovery_client = Client(url)
self._discovery_client.connect()
self.iserver.loop.call_soon(self._renew_registration)
def _renew_registration(self):
if self._discovery_client:
self._discovery_client.register_server(self)
for client in self._discovery_clients.values():
client.register_server(self)
self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
"""
Create a client to discovery server and return it
"""
client = Client(url)
client.connect()
return client
def allow_remote_admin(self, allow):
"""
Enable or disable the builtin Admin user from network clients
......@@ -165,8 +176,8 @@ class Server(object):
"""
Start to listen on network
"""
self.iserver.start()
self._setup_server_nodes()
self.iserver.start()
self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
self.bserver.start()
......@@ -174,8 +185,8 @@ class Server(object):
"""
Stop server
"""
if self._discovery_client:
self._discovery_client.disconnect()
for client in self._discovery_clients.values():
client.disconnect()
self.bserver.stop()
self.iserver.stop()
......
......@@ -926,16 +926,50 @@ class TestServer(unittest.TestCase, CommonTests):
def test_discovery(self):
client = Client(self.discovery.endpoint.geturl())
client.connect()
servers = client.find_servers()
new_app_uri = "urn:freeopcua:python:server::test_discovery"
self.srv.application_uri = new_app_uri
self.srv.register_to_discovery(self.discovery.endpoint.geturl(), 1)
time.sleep(0.1) # let server register registration
new_servers = client.find_servers()
self.assertEqual(len(new_servers) - len(servers) , 1)
self.assertFalse(new_app_uri in [s.ApplicationUri for s in servers])
self.assertTrue(new_app_uri in [s.ApplicationUri for s in new_servers])
client.disconnect()
try:
servers = client.find_servers()
new_app_uri = "urn:freeopcua:python:server:test_discovery"
self.srv.application_uri = new_app_uri
self.srv.register_to_discovery(self.discovery.endpoint.geturl(), 1)
time.sleep(0.1) # let server register registration
new_servers = client.find_servers()
self.assertEqual(len(new_servers) - len(servers) , 1)
self.assertFalse(new_app_uri in [s.ApplicationUri for s in servers])
self.assertTrue(new_app_uri in [s.ApplicationUri for s in new_servers])
finally:
client.disconnect()
def test_find_servers2(self):
client = Client(self.discovery.endpoint.geturl())
client.connect()
try:
servers = client.find_servers()
new_app_uri1 = "urn:freeopcua:python:server:test_discovery1"
self.srv.application_uri = new_app_uri1
self.srv.register_to_discovery(self.discovery.endpoint.geturl())
new_app_uri2 = "urn:freeopcua:python:test_discovery2"
self.srv.application_uri = new_app_uri2
self.srv.register_to_discovery(self.discovery.endpoint.geturl())
time.sleep(0.1) # let server register registration
new_servers = client.find_servers()
self.assertEqual(len(new_servers) - len(servers) , 2)
self.assertFalse(new_app_uri1 in [s.ApplicationUri for s in servers])
self.assertFalse(new_app_uri2 in [s.ApplicationUri for s in servers])
self.assertTrue(new_app_uri1 in [s.ApplicationUri for s in new_servers])
self.assertTrue(new_app_uri2 in [s.ApplicationUri for s in new_servers])
# now do a query with filer
new_servers = client.find_servers(["urn:freeopcua:python:server"])
self.assertEqual(len(new_servers) - len(servers) , 0)
self.assertTrue(new_app_uri1 in [s.ApplicationUri for s in new_servers])
self.assertFalse(new_app_uri2 in [s.ApplicationUri for s in new_servers])
# now do a query with filer
new_servers = client.find_servers(["urn:freeopcua:python"])
self.assertEqual(len(new_servers) - len(servers) , 2)
self.assertTrue(new_app_uri1 in [s.ApplicationUri for s in new_servers])
self.assertTrue(new_app_uri2 in [s.ApplicationUri for s in new_servers])
finally:
client.disconnect()
"""
# not sure if this test is necessary, and there is a lot repetition with previous test
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment