Commit b48b7e3d authored by oroulet's avatar oroulet Committed by oroulet

always pass explicitly ThreadLoop in sync API, this is much much safer

parent f6a9b763
......@@ -13,5 +13,4 @@ install:
- pip install pytest-asyncio
- pip install cryptography
# command to run tests
#script: pytest -v
script: pytest -v
script: pytest -v -s
......@@ -39,7 +39,7 @@ class Client:
Each request sent to the server expects an answer within this
time. The timeout is specified in seconds.
"""
self.logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)
self.loop = loop or asyncio.get_event_loop()
self.server_url = urlparse(url)
# take initial username and password from the url
......@@ -71,6 +71,10 @@ class Client:
async def __aexit__(self, exc_type, exc_value, traceback):
await self.disconnect()
def __str__(self):
return f"Client({self.server_url.geturl()})"
__repr__ = __str__
@staticmethod
def find_endpoint(endpoints, security_mode, policy_uri):
"""
......@@ -353,10 +357,10 @@ class Client:
while True:
# 0.7 is from spec. 0.001 is because asyncio.sleep expects time in seconds
await asyncio.sleep(duration)
self.logger.debug("renewing channel")
_logger.debug("renewing channel")
await self.open_secure_channel(renew=True)
val = await self.nodes.server_state.get_value()
self.logger.debug("server state is: %s ", val)
_logger.debug("server state is: %s ", val)
except asyncio.CancelledError:
pass
......@@ -432,7 +436,7 @@ class Client:
# then the password only contains UTF-8 encoded password
# and EncryptionAlgorithm is null
if self._password:
self.logger.warning("Sending plain-text password")
_logger.warning("Sending plain-text password")
params.UserIdentityToken.Password = password.encode("utf8")
params.UserIdentityToken.EncryptionAlgorithm = None
elif self._password:
......@@ -464,7 +468,7 @@ class Client:
return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
def get_objects_node(self):
self.logger.info("get_objects_node")
_logger.info("get_objects_node")
return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
def get_server_node(self):
......
......@@ -493,8 +493,10 @@ class MethodService:
async def _run_method(self, func, parent, *args):
if asyncio.iscoroutinefunction(func):
self.logger.warning("func %s is a coroutine, awaiting with args: %s", func, args)
return await func(parent, *args)
p = partial(func, parent, *args)
self.logger.warning("func %s is a sync function, awaiting in executor %s with args: %s", func, self._pool, args)
res = await asyncio.get_event_loop().run_in_executor(self._pool, p)
return res
......
......@@ -74,7 +74,7 @@ class Server:
def __init__(self, iserver: InternalServer = None, loop: asyncio.AbstractEventLoop = None):
self.loop: asyncio.AbstractEventLoop = loop or asyncio.get_event_loop()
self.logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)
self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
self._application_uri = "urn:freeopcua:python:server"
self.product_uri = "urn:freeopcua.github.io:python:server"
......@@ -260,11 +260,11 @@ class Server:
if self._security_policy != [ua.SecurityPolicyType.NoSecurity]:
if not (self.certificate and self.iserver.private_key):
self.logger.warning("Endpoints other than open requested but private key and certificate are not set.")
_logger.warning("Endpoints other than open requested but private key and certificate are not set.")
return
if ua.SecurityPolicyType.NoSecurity in self._security_policy:
self.logger.warning(
_logger.warning(
"Creating an open endpoint to the server, although encrypted endpoints are enabled.")
if ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt in self._security_policy:
......@@ -333,8 +333,11 @@ class Server:
self.bserver.set_policies(self._policies)
await self.bserver.start()
except Exception as exp:
_logger.exception("%s error starting server", self)
await self.iserver.stop()
raise exp
else:
_logger.debug("%s server started", self)
async def stop(self):
"""
......@@ -344,6 +347,7 @@ class Server:
await asyncio.wait([client.disconnect() for client in self._discovery_clients.values()])
await self.bserver.stop()
await self.iserver.stop()
_logger.debug("%s Internal server stopped, everything closed", self)
def get_root_node(self):
"""
......
......@@ -14,6 +14,10 @@ from asyncua.common import subscription, shortcuts
logger = logging.getLogger(__name__)
class ThreadLoopNotRunning(Exception):
pass
class ThreadLoop(Thread):
def __init__(self):
Thread.__init__(self)
......@@ -27,54 +31,30 @@ class ThreadLoop(Thread):
def run(self):
self.loop = asyncio.new_event_loop()
logger.debug("Threadloop: %s", self.loop)
self.loop.call_soon_threadsafe(self._notify_start)
self.loop.run_forever()
def _notify_start(self):
with self._cond:
self._cond.notify_all()
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def post(self, coro):
if not self.loop or not self.loop.is_running():
raise ThreadLoopNotRunning(f"could not post {coro}")
futur = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
return futur.result()
def __enter__(self):
self.start()
return self
#@ipcmethod
_ref_count = 0
_tloop = None
def start_thread_loop():
global _tloop
_tloop = ThreadLoop()
_tloop.start()
return _tloop
def stop_thread_loop():
global _tloop
_tloop.stop()
_tloop.join()
def get_thread_loop():
global _tloop
if _tloop is None:
start_thread_loop()
global _ref_count
_ref_count += 1
return _tloop
def release_thread_loop():
global _tloop
if _tloop is None:
return
global _ref_count
if _ref_count == 0:
_ref_count -= 1
stop_thread_loop()
def __exit__(self, exc_t, exc_v, trace):
self.stop()
self.join()
def syncmethod(func):
......@@ -83,65 +63,118 @@ def syncmethod(func):
for idx, arg in enumerate(args):
if isinstance(arg, Node):
args[idx] = arg.aio_obj
for k, v in kwargs.items():
if isinstance(v, Node):
kwargs[k] = v.aio_obj
aio_func = getattr(self.aio_obj, func.__name__)
global _tloop
result = _tloop.post(aio_func(*args, **kwargs))
result = self.tloop.post(aio_func(*args, **kwargs))
if isinstance(result, node.Node):
return Node(result)
return Node(self.tloop, result)
if isinstance(result, list) and len(result) > 0 and isinstance(result[0], node.Node):
return [Node(i) for i in result]
return [Node(self.tloop, i) for i in result]
if isinstance(result, server.event_generator.EventGenerator):
return EventGenerator(result)
if isinstance(result, subscription.Subscription):
return Subscription(result)
return Subscription(self.tloop, result)
return result
return wrapper
class _SubHandler:
def __init__(self, tloop, sync_handler):
self.tloop = tloop
self.sync_handler = sync_handler
def datachange_notification(self, node, val, data):
self.sync_handler.datachange_notification(Node(self.tloop, node), val, data)
def event_notification(self, event):
self.sync_handler.event_notification(event)
class Client:
def __init__(self, url: str, timeout: int = 4):
global _tloop
self.aio_obj = client.Client(url, timeout, loop=_tloop.loop)
self.nodes = Shortcuts(self.aio_obj.uaclient)
def __init__(self, url: str, timeout: int = 4, tloop=None):
self.tloop = tloop
self.close_tloop = False
if not self.tloop:
self.tloop = ThreadLoop()
self.tloop.start()
self.close_tloop = True
self.aio_obj = client.Client(url, timeout, loop=self.tloop.loop)
self.nodes = Shortcuts(self.tloop, self.aio_obj.uaclient)
def __str__(self):
return "Sync" + self.aio_obj.__str__()
__repr__ = __str__
@syncmethod
def connect(self):
pass
@syncmethod
def disconnect(self):
pass
self.tloop.post(self.aio_obj.disconnect())
if self.close_tloop:
self.tloop.stop()
@syncmethod
def load_type_definitions(self, nodes=None):
pass
@syncmethod
async def create_subscription(self, period, handler):
def load_enums(self):
pass
def create_subscription(self, period, handler):
coro = self.aio_obj.create_subscription(period, _SubHandler(self.tloop, handler))
aio_sub = self.tloop.post(coro)
return Subscription(self.tloop, aio_sub)
@syncmethod
def get_namespace_index(self, url):
pass
def get_node(self, nodeid):
return Node(self.aio_obj.get_node(nodeid))
return Node(self.tloop, self.aio_obj.get_node(nodeid))
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.disconnect()
class Shortcuts:
def __init__(self, aio_server):
def __init__(self, tloop, aio_server):
self.tloop = tloop
self.aio_obj = shortcuts.Shortcuts(aio_server)
for k, v in self.aio_obj.__dict__.items():
setattr(self, k, Node(v))
setattr(self, k, Node(self.tloop, v))
class Server:
def __init__(self, shelf_file=None):
global _tloop
self.aio_obj = server.Server(loop=_tloop.loop)
_tloop.post(self.aio_obj.init(shelf_file))
self.nodes = Shortcuts(self.aio_obj.iserver.isession)
def __init__(self, shelf_file=None, tloop=None):
self.tloop = tloop
self.close_tloop = False
if not self.tloop:
self.tloop = ThreadLoop()
self.tloop.start()
self.close_tloop = True
self.aio_obj = server.Server(loop=self.tloop.loop)
self.tloop.post(self.aio_obj.init(shelf_file))
self.nodes = Shortcuts(self.tloop, self.aio_obj.iserver.isession)
def __str__(self):
return "Sync" + self.aio_obj.__str__()
__repr__ = __str__
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.stop()
def set_endpoint(self, url):
return self.aio_obj.set_endpoint(url)
......@@ -163,21 +196,37 @@ class Server:
def start(self):
pass
@syncmethod
def stop(self):
pass
self.tloop.post(self.aio_obj.stop())
if self.close_tloop:
self.tloop.stop()
def link_method(self, node, callback):
return self.aio_obj.link_method(node, callback)
@syncmethod
async def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
pass
def get_node(self, nodeid):
return Node(server.Server.get_node(self, nodeid))
return Node(self.tloop, server.Server.get_node(self, nodeid))
@syncmethod
def import_xml(self, path=None, xmlstring=None):
pass
@syncmethod
def get_namespace_index(self, url):
pass
@syncmethod
def load_enums(self):
pass
@syncmethod
def load_type_definitions(self):
pass
def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
return self.aio_obj.set_attribute_value(nodeid, datavalue, attr)
......@@ -195,9 +244,16 @@ class EventGenerator:
class Node:
def __init__(self, aio_node):
def __init__(self, tloop, aio_node):
self.aio_obj = aio_node
global _tloop
self.tloop = tloop
def __hash__(self):
return self.aio_obj.__hash__()
def __str__(self):
return "Sync" + self.aio_obj.__str__()
__repr__ = __str__
@property
def nodeid(self):
......@@ -271,7 +327,8 @@ class Node:
class Subscription:
def __init__(self, sub):
def __init__(self, tloop, sub):
self.tloop = tloop
self.aio_obj = sub
@syncmethod
......
......@@ -15,7 +15,7 @@ except ImportError:
shell.interact()
from asyncua.sync import Client, start_thread_loop, stop_thread_loop
from asyncua.sync import Client, ThreadLoop
class SubHandler(object):
......@@ -23,7 +23,7 @@ class SubHandler(object):
"""
Subscription Handler. To receive events from server for a subscription
data_change and event methods are called directly from receiving thread.
Do not do expensive, slow or network operation there. Create another
Do not do expensive, slow or network operation there. Create another
thread if you need to do such a thing
"""
......@@ -38,54 +38,49 @@ if __name__ == "__main__":
logging.basicConfig(level=logging.WARN)
#logger = logging.getLogger("KeepAlive")
#logger.setLevel(logging.DEBUG)
start_thread_loop()
client = Client("opc.tcp://localhost:4840/freeopcua/server/")
# client = Client("opc.tcp://admin@localhost:4840/freeopcua/server/") #connect using a user
try:
client.connect()
client.load_type_definitions() # load definition of server specific structures/extension objects
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
print("Objects node is: ", client.nodes.objects)
# Node objects have methods to read and write node attributes as well as browse or populate address space
print("Children of root are: ", client.nodes.root.get_children())
# get a specific node knowing its node id
#var = client.get_node(ua.NodeId(1002, 2))
#var = client.get_node("ns=3;i=2002")
#print(var)
#var.get_data_value() # get value of node as a DataValue object
#var.get_value() # get value of node as a python builtin
#var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
#var.set_value(3.9) # set node value using implicit data type
# gettting our namespace idx
uri = "http://examples.freeopcua.github.io"
idx = client.get_namespace_index(uri)
# Now getting a variable node using its browse path
myvar = client.nodes.root.get_child(["0:Objects", "{}:MyObject".format(idx), "{}:MyVariable".format(idx)])
obj = client.nodes.root.get_child(["0:Objects", "{}:MyObject".format(idx)])
print("myvar is: ", myvar)
# subscribing to a variable node
handler = SubHandler()
sub = client.create_subscription(500, handler)
handle = sub.subscribe_data_change(myvar)
time.sleep(0.1)
# we can also subscribe to events from server
sub.subscribe_events()
# sub.unsubscribe(handle)
# sub.delete()
# calling a method on server
res = obj.call_method("{}:multiply".format(idx), 3, "klk")
print("method result is: ", res)
embed()
finally:
client.disconnect()
stop_thread_loop()
with ThreadLoop() as tloop:
with Client("opc.tcp://localhost:4840/freeopcua/server/", tloop=tloop) as client:
# client = Client("opc.tcp://admin@localhost:4840/freeopcua/server/") #connect using a user
client.load_type_definitions() # load definition of server specific structures/extension objects
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
print("Objects node is: ", client.nodes.objects)
# Node objects have methods to read and write node attributes as well as browse or populate address space
print("Children of root are: ", client.nodes.root.get_children())
# get a specific node knowing its node id
#var = client.get_node(ua.NodeId(1002, 2))
#var = client.get_node("ns=3;i=2002")
#print(var)
#var.get_data_value() # get value of node as a DataValue object
#var.get_value() # get value of node as a python builtin
#var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
#var.set_value(3.9) # set node value using implicit data type
# gettting our namespace idx
uri = "http://examples.freeopcua.github.io"
idx = client.get_namespace_index(uri)
# Now getting a variable node using its browse path
myvar = client.nodes.root.get_child(["0:Objects", "{}:MyObject".format(idx), "{}:MyVariable".format(idx)])
obj = client.nodes.root.get_child(["0:Objects", "{}:MyObject".format(idx)])
print("myvar is: ", myvar)
# subscribing to a variable node
handler = SubHandler()
sub = client.create_subscription(500, handler)
handle = sub.subscribe_data_change(myvar)
time.sleep(0.1)
# we can also subscribe to events from server
sub.subscribe_events()
# sub.unsubscribe(handle)
# sub.delete()
# calling a method on server
res = obj.call_method("{}:multiply".format(idx), 3, "klk")
print("method result is: ", res)
embed()
......@@ -2,17 +2,12 @@ import sys
sys.path.insert(0, "../..")
from asyncua.sync import Client, start_thread_loop, stop_thread_loop
from asyncua.sync import Client
if __name__ == "__main__":
start_thread_loop()
client = Client("opc.tcp://localhost:4840/freeopcua/server/")
with Client("opc.tcp://localhost:4840/freeopcua/server/") as client:
# client = Client("opc.tcp://admin@localhost:4840/freeopcua/server/") #connect using a user
try:
client.connect()
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
# Node objects have methods to read and write node attributes as well as browse or populate address space
......@@ -35,7 +30,3 @@ if __name__ == "__main__":
# Stacked myvar access
# print("myvar is: ", root.get_children()[0].get_children()[1].get_variables()[0].get_value())
finally:
client.disconnect()
stop_thread_loop()
......@@ -20,7 +20,7 @@ except ImportError:
from asyncua import ua, uamethod
from asyncua.sync import Server, start_thread_loop, stop_thread_loop
from asyncua.sync import Server, ThreadLoop
class SubHandler(object):
......@@ -82,84 +82,81 @@ if __name__ == "__main__":
# logger.setLevel(logging.DEBUG)
#logger = logging.getLogger("opcua.uaprocessor")
# logger.setLevel(logging.DEBUG)
start_thread_loop()
# now setup our server
server = Server()
#server.disable_clock()
#server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
server.set_server_name("FreeOpcUa Example Server")
# set all possible endpoint policies for clients to connect through
server.set_security_policy([
ua.SecurityPolicyType.NoSecurity,
ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
ua.SecurityPolicyType.Basic256Sha256_Sign])
# setup our own namespace
uri = "http://examples.freeopcua.github.io"
idx = server.register_namespace(uri)
# create a new node type we can instantiate in our address space
dev = server.nodes.base_object_type.add_object_type(idx, "MyDevice")
dev.add_variable(idx, "sensor1", 1.0).set_modelling_rule(True)
dev.add_property(idx, "device_id", "0340").set_modelling_rule(True)
ctrl = dev.add_object(idx, "controller")
ctrl.set_modelling_rule(True)
ctrl.add_property(idx, "state", "Idle").set_modelling_rule(True)
# populating our address space
# First a folder to organise our nodes
myfolder = server.nodes.objects.add_folder(idx, "myEmptyFolder")
# instanciate one instance of our device
mydevice = server.nodes.objects.add_object(idx, "Device0001", dev)
mydevice_var = mydevice.get_child([f"{idx}:controller", "{idx}:state"]) # get proxy to our device state variable
# create directly some objects and variables
myobj = server.nodes.objects.add_object(idx, "MyObject")
myvar = myobj.add_variable(idx, "MyVariable", 6.7)
mysin = myobj.add_variable(idx, "MySin", 0, ua.VariantType.Float)
myvar.set_writable() # Set MyVariable to be writable by clients
mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string")
mystringvar.set_writable() # Set MyVariable to be writable by clients
mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.utcnow())
mydtvar.set_writable() # Set MyVariable to be writable by clients
myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
myarrayvar = myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))
myprop = myobj.add_property(idx, "myproperty", "I am a property")
mymethod = myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])
multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])
# import some nodes from xml
server.import_xml("custom_nodes.xml")
# creating a default event object
# The event object automatically will have members for all events properties
# you probably want to create a custom event type, see other examples
myevgen = server.get_event_generator()
myevgen.event.Severity = 300
# starting!
server.start()
print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
vup = VarUpdater(mysin) # just a stupide class update a variable
vup.start()
try:
# enable following if you want to subscribe to nodes on server side
#handler = SubHandler()
#sub = server.create_subscription(500, handler)
#handle = sub.subscribe_data_change(myvar)
# trigger event, all subscribed clients wil receive it
var = myarrayvar.get_value() # return a ref to value in db server side! not a copy!
var = copy.copy(var) # WARNING: we need to copy before writting again otherwise no data change event will be generated
var.append(9.3)
myarrayvar.set_value(var)
mydevice_var.set_value("Running")
myevgen.trigger(message="This is BaseEvent")
server.set_attribute_value(myvar.nodeid, ua.DataValue(9.9)) # Server side write method which is a but faster than using set_value
embed()
finally:
vup.stop()
server.stop()
stop_thread_loop()
with ThreadLoop() as tloop:
# now setup our server
server = Server(tloop=tloop)
#server.disable_clock()
#server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
server.set_server_name("FreeOpcUa Example Server")
# set all possible endpoint policies for clients to connect through
server.set_security_policy([
ua.SecurityPolicyType.NoSecurity,
ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
ua.SecurityPolicyType.Basic256Sha256_Sign])
# setup our own namespace
uri = "http://examples.freeopcua.github.io"
idx = server.register_namespace(uri)
print("IDX", idx)
# create a new node type we can instantiate in our address space
dev = server.nodes.base_object_type.add_object_type(idx, "MyDevice")
dev.add_variable(idx, "sensor1", 1.0).set_modelling_rule(True)
dev.add_property(idx, "device_id", "0340").set_modelling_rule(True)
ctrl = dev.add_object(idx, "controller")
ctrl.set_modelling_rule(True)
ctrl.add_property(idx, "state", "Idle").set_modelling_rule(True)
# populating our address space
# First a folder to organise our nodes
myfolder = server.nodes.objects.add_folder(idx, "myEmptyFolder")
# instanciate one instance of our device
mydevice = server.nodes.objects.add_object(idx, "Device0001", dev)
mydevice_var = mydevice.get_child([f"{idx}:controller", f"{idx}:state"]) # get proxy to our device state variable
# create directly some objects and variables
myobj = server.nodes.objects.add_object(idx, "MyObject")
myvar = myobj.add_variable(idx, "MyVariable", 6.7)
mysin = myobj.add_variable(idx, "MySin", 0, ua.VariantType.Float)
myvar.set_writable() # Set MyVariable to be writable by clients
mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string")
mystringvar.set_writable() # Set MyVariable to be writable by clients
mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.utcnow())
mydtvar.set_writable() # Set MyVariable to be writable by clients
myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
myarrayvar = myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))
myprop = myobj.add_property(idx, "myproperty", "I am a property")
mymethod = myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])
multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])
# import some nodes from xml
server.import_xml("custom_nodes.xml")
# creating a default event object
# The event object automatically will have members for all events properties
# you probably want to create a custom event type, see other examples
myevgen = server.get_event_generator()
myevgen.event.Severity = 300
# starting!
with server:
print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
vup = VarUpdater(mysin) # just a stupide class update a variable
vup.start()
# enable following if you want to subscribe to nodes on server side
#handler = SubHandler()
#sub = server.create_subscription(500, handler)
#handle = sub.subscribe_data_change(myvar)
# trigger event, all subscribed clients wil receive it
var = myarrayvar.get_value() # return a ref to value in db server side! not a copy!
var = copy.copy(var) # WARNING: we need to copy before writting again otherwise no data change event will be generated
var.append(9.3)
myarrayvar.set_value(var)
mydevice_var.set_value("Running")
myevgen.trigger(message="This is BaseEvent")
server.set_attribute_value(myvar.nodeid, ua.DataValue(9.9)) # Server side write method which is a but faster than using set_value
embed()
vup.stop()
......@@ -3,12 +3,10 @@ sys.path.insert(0, "../..")
import time
from asyncua.sync import Server, start_thread_loop, stop_thread_loop
from asyncua.sync import Server
if __name__ == "__main__":
start_thread_loop()
# setup our server
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
......@@ -24,7 +22,7 @@ if __name__ == "__main__":
# starting!
server.start()
try:
count = 0
while True:
......@@ -34,4 +32,3 @@ if __name__ == "__main__":
finally:
#close connection, remove subcsriptions, etc
server.stop()
stop_thread_loop()
import time
from concurrent.futures import Future
import pytest
from asyncua.sync import Client, start_thread_loop, stop_thread_loop, Server
from asyncua import ua
from asyncua.sync import Client, Server, ThreadLoop, Node
from asyncua import ua, uamethod
@uamethod
def divide(parent, x, y):
return x / y
@pytest.fixture
def tloop():
with ThreadLoop() as tl:
tl.loop.set_debug(True)
yield tl
@pytest.fixture
def server():
s = Server()
def server(tloop):
s = Server(tloop=tloop)
s.disable_clock(True)
s.set_endpoint('opc.tcp://0.0.0.0:8840/freeopcua/server/')
uri = "http://examples.freeopcua.github.io"
idx = s.register_namespace(uri)
myobj = s.nodes.objects.add_object(idx, "MyObject")
myvar = myobj.add_variable(idx, "MyVariable", 6.7)
mysin = myobj.add_variable(idx, "MySin", 0, ua.VariantType.Float)
s.start()
yield s
s.stop()
meth = s.nodes.objects.add_method(idx, "Divide", divide, [ua.VariantType.Float, ua.VariantType.Float], [ua.VariantType.Float])
with s:
yield s
@pytest.fixture
def tloop():
t_loop = start_thread_loop()
yield t_loop
stop_thread_loop()
def client(tloop, server):
c = Client("opc.tcp://admin@localhost:8840/freeopcua/server", tloop=tloop)
with c:
yield c
@pytest.fixture
def client(tloop, server):
c = Client("opc.tcp://localhost:8840/freeopcua/server")
c.connect()
yield c
c.disconnect()
def client_no_tloop(server):
with Client("opc.tcp://admin@localhost:8840/freeopcua/server") as c:
yield c
def test_sync_client(client):
client.load_type_definitions()
@pytest.fixture
def idx(client):
uri = "http://examples.freeopcua.github.io"
idx = client.get_namespace_index(uri)
i = client.get_namespace_index(uri)
return i
def test_sync_client(client, idx):
client.load_type_definitions()
myvar = client.nodes.root.get_child(["0:Objects", f"{idx}:MyObject", f"{idx}:MyVariable"])
assert myvar.get_value() == 6.7
......@@ -47,6 +64,54 @@ def test_sync_get_node(client):
node = client.get_node(85)
assert node == client.nodes.objects
nodes = node.get_children()
assert len(nodes) == 2
assert len(nodes) > 2
assert nodes[0] == client.nodes.server
assert isinstance(nodes[0], Node)
class MySubHandler():
def __init__(self):
self.future = Future()
def reset(self):
self.future = Future()
def datachange_notification(self, node, val, data):
self.future.set_result((node, val))
def event_notification(self, event):
self.future.set_result(event)
def test_sync_tloop_sub(client_no_tloop):
test_sync_sub(client_no_tloop)
def test_sync_sub(client):
myhandler = MySubHandler()
sub = client.create_subscription(1, myhandler)
var = client.nodes.objects.add_variable(3, 'SubVar', 0.1)
sub.subscribe_data_change(var)
n, v = myhandler.future.result()
assert v == 0.1
assert n == var
myhandler.reset()
var.set_value(0.123)
n, v = myhandler.future.result()
assert v == 0.123
sub.delete()
def test_sync_meth(client, idx):
res = client.nodes.objects.call_method(f"{idx}:Divide", 4, 2)
assert res == 2
with pytest.raises(ua.UaError):
res = client.nodes.objects.call_method(f"{idx}:Divide", 4, 0)
def test_sync_client_no_tl(client_no_tloop, idx):
test_sync_meth(client_no_tloop, idx)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment