Commit c661e9ce authored by oroulet's avatar oroulet

always pass explicitly ThreadLoop in sync API, this is much much safer

parent 69d1c6e3
......@@ -21,8 +21,8 @@ class ThreadLoop(Thread):
self._cond = Condition()
def start(self):
Thread.start(self)
with self._cond:
Thread.start(self)
self._cond.wait()
def run(self):
......@@ -40,51 +40,12 @@ class ThreadLoop(Thread):
def __enter__(self):
self.start()
global _tloop
_tloop = self
return self
def __exit__(self, exc_t, exc_v, trace):
self.stop()
self.join()
#@ipcmethod
_ref_count = 0
_tloop = None
def start_thread_loop():
global _tloop
_tloop = ThreadLoop()
_tloop.start()
return _tloop
def stop_thread_loop():
global _tloop
_tloop.stop()
_tloop.join()
def get_thread_loop():
global _tloop
if _tloop is None:
start_thread_loop()
global _ref_count
_ref_count += 1
return _tloop
def release_thread_loop():
global _tloop
if _tloop is None:
return
global _ref_count
if _ref_count == 0:
_ref_count -= 1
stop_thread_loop()
def syncmethod(func):
def wrapper(self, *args, **kwargs):
......@@ -96,37 +57,42 @@ def syncmethod(func):
if isinstance(v, Node):
kwargs[k] = v.aio_obj
aio_func = getattr(self.aio_obj, func.__name__)
global _tloop
result = _tloop.post(aio_func(*args, **kwargs))
result = self.tloop.post(aio_func(*args, **kwargs))
if isinstance(result, node.Node):
return Node(result)
return Node(self.tloop, result)
if isinstance(result, list) and len(result) > 0 and isinstance(result[0], node.Node):
return [Node(i) for i in result]
return [Node(self.tloop, i) for i in result]
if isinstance(result, server.event_generator.EventGenerator):
return EventGenerator(result)
if isinstance(result, subscription.Subscription):
return Subscription(result)
return Subscription(self.tloop, result)
return result
return wrapper
class _SubHandler:
def __init__(self, sync_handler):
def __init__(self, tloop, sync_handler):
self.tloop = tloop
self.sync_handler = sync_handler
def datachange_notification(self, node, val, data):
self.sync_handler.datachange_notification(Node(node), val, data)
self.sync_handler.datachange_notification(Node(self.tloop, node), val, data)
def event_notification(self, event):
self.sync_handler.event_notification(event)
class Client:
def __init__(self, url: str, timeout: int = 4):
global _tloop
self.aio_obj = client.Client(url, timeout, loop=_tloop.loop)
self.nodes = Shortcuts(self.aio_obj.uaclient)
def __init__(self, url: str, timeout: int = 4, tloop=None):
self.tloop = tloop
self.close_tloop = False
if not self.tloop:
self.tloop = ThreadLoop()
self.tloop.start()
self.close_tloop = True
self.aio_obj = client.Client(url, timeout, loop=self.tloop.loop)
self.nodes = Shortcuts(self.tloop, self.aio_obj.uaclient)
def __str__(self):
return "Sync" + self.aio_obj.__str__()
......@@ -136,25 +102,30 @@ class Client:
def connect(self):
pass
@syncmethod
def disconnect(self):
pass
self.tloop.post(self.aio_obj.disconnect())
if self.close_tloop:
self.tloop.stop()
@syncmethod
def load_type_definitions(self, nodes=None):
pass
@syncmethod
def load_enums(self):
pass
def create_subscription(self, period, handler):
coro = self.aio_obj.create_subscription(period, _SubHandler(handler))
aio_sub = _tloop.post(coro)
return Subscription(aio_sub)
coro = self.aio_obj.create_subscription(period, _SubHandler(self.tloop, handler))
aio_sub = self.tloop.post(coro)
return Subscription(self.tloop, aio_sub)
@syncmethod
def get_namespace_index(self, url):
pass
def get_node(self, nodeid):
return Node(self.aio_obj.get_node(nodeid))
return Node(self.tloop, self.aio_obj.get_node(nodeid))
def __enter__(self):
self.connect()
......@@ -165,18 +136,24 @@ class Client:
class Shortcuts:
def __init__(self, aio_server):
def __init__(self, tloop, aio_server):
self.tloop = tloop
self.aio_obj = shortcuts.Shortcuts(aio_server)
for k, v in self.aio_obj.__dict__.items():
setattr(self, k, Node(v))
setattr(self, k, Node(self.tloop, v))
class Server:
def __init__(self, shelf_file=None):
global _tloop
self.aio_obj = server.Server(loop=_tloop.loop)
_tloop.post(self.aio_obj.init(shelf_file))
self.nodes = Shortcuts(self.aio_obj.iserver.isession)
def __init__(self, shelf_file=None, tloop=None):
self.tloop = tloop
self.close_tloop = False
if not self.tloop:
self.tloop = ThreadLoop()
self.tloop.start()
self.close_tloop = True
self.aio_obj = server.Server(loop=self.tloop.loop)
self.tloop.post(self.aio_obj.init(shelf_file))
self.nodes = Shortcuts(self.tloop, self.aio_obj.iserver.isession)
def __str__(self):
return "Sync" + self.aio_obj.__str__()
......@@ -209,9 +186,10 @@ class Server:
def start(self):
pass
@syncmethod
def stop(self):
pass
self.tloop.post(self.aio_obj.stop())
if self.close_tloop:
self.tloop.stop()
def link_method(self, node, callback):
return self.aio_obj.link_method(node, callback)
......@@ -221,7 +199,7 @@ class Server:
pass
def get_node(self, nodeid):
return Node(server.Server.get_node(self, nodeid))
return Node(self.tloop, server.Server.get_node(self, nodeid))
@syncmethod
def import_xml(self, path=None, xmlstring=None):
......@@ -256,9 +234,9 @@ class EventGenerator:
class Node:
def __init__(self, aio_node):
def __init__(self, tloop, aio_node):
self.aio_obj = aio_node
global _tloop
self.tloop = tloop
def __hash__(self):
return self.aio_obj.__hash__()
......@@ -339,7 +317,8 @@ class Node:
class Subscription:
def __init__(self, sub):
def __init__(self, tloop, sub):
self.tloop = tloop
self.aio_obj = sub
@syncmethod
......
......@@ -15,7 +15,7 @@ except ImportError:
shell.interact()
from asyncua.sync import Client, start_thread_loop, stop_thread_loop
from asyncua.sync import Client, ThreadLoop
class SubHandler(object):
......@@ -23,7 +23,7 @@ class SubHandler(object):
"""
Subscription Handler. To receive events from server for a subscription
data_change and event methods are called directly from receiving thread.
Do not do expensive, slow or network operation there. Create another
Do not do expensive, slow or network operation there. Create another
thread if you need to do such a thing
"""
......@@ -38,54 +38,49 @@ if __name__ == "__main__":
logging.basicConfig(level=logging.WARN)
#logger = logging.getLogger("KeepAlive")
#logger.setLevel(logging.DEBUG)
start_thread_loop()
client = Client("opc.tcp://localhost:4840/freeopcua/server/")
# client = Client("opc.tcp://admin@localhost:4840/freeopcua/server/") #connect using a user
try:
client.connect()
client.load_type_definitions() # load definition of server specific structures/extension objects
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
print("Objects node is: ", client.nodes.objects)
# Node objects have methods to read and write node attributes as well as browse or populate address space
print("Children of root are: ", client.nodes.root.get_children())
# get a specific node knowing its node id
#var = client.get_node(ua.NodeId(1002, 2))
#var = client.get_node("ns=3;i=2002")
#print(var)
#var.get_data_value() # get value of node as a DataValue object
#var.get_value() # get value of node as a python builtin
#var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
#var.set_value(3.9) # set node value using implicit data type
# gettting our namespace idx
uri = "http://examples.freeopcua.github.io"
idx = client.get_namespace_index(uri)
# Now getting a variable node using its browse path
myvar = client.nodes.root.get_child(["0:Objects", "{}:MyObject".format(idx), "{}:MyVariable".format(idx)])
obj = client.nodes.root.get_child(["0:Objects", "{}:MyObject".format(idx)])
print("myvar is: ", myvar)
# subscribing to a variable node
handler = SubHandler()
sub = client.create_subscription(500, handler)
handle = sub.subscribe_data_change(myvar)
time.sleep(0.1)
# we can also subscribe to events from server
sub.subscribe_events()
# sub.unsubscribe(handle)
# sub.delete()
# calling a method on server
res = obj.call_method("{}:multiply".format(idx), 3, "klk")
print("method result is: ", res)
embed()
finally:
client.disconnect()
stop_thread_loop()
with ThreadLoop() as tloop:
with Client("opc.tcp://localhost:4840/freeopcua/server/", tloop=tloop) as client:
# client = Client("opc.tcp://admin@localhost:4840/freeopcua/server/") #connect using a user
client.load_type_definitions() # load definition of server specific structures/extension objects
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
print("Objects node is: ", client.nodes.objects)
# Node objects have methods to read and write node attributes as well as browse or populate address space
print("Children of root are: ", client.nodes.root.get_children())
# get a specific node knowing its node id
#var = client.get_node(ua.NodeId(1002, 2))
#var = client.get_node("ns=3;i=2002")
#print(var)
#var.get_data_value() # get value of node as a DataValue object
#var.get_value() # get value of node as a python builtin
#var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
#var.set_value(3.9) # set node value using implicit data type
# gettting our namespace idx
uri = "http://examples.freeopcua.github.io"
idx = client.get_namespace_index(uri)
# Now getting a variable node using its browse path
myvar = client.nodes.root.get_child(["0:Objects", "{}:MyObject".format(idx), "{}:MyVariable".format(idx)])
obj = client.nodes.root.get_child(["0:Objects", "{}:MyObject".format(idx)])
print("myvar is: ", myvar)
# subscribing to a variable node
handler = SubHandler()
sub = client.create_subscription(500, handler)
handle = sub.subscribe_data_change(myvar)
time.sleep(0.1)
# we can also subscribe to events from server
sub.subscribe_events()
# sub.unsubscribe(handle)
# sub.delete()
# calling a method on server
res = obj.call_method("{}:multiply".format(idx), 3, "klk")
print("method result is: ", res)
embed()
......@@ -2,33 +2,31 @@ import sys
sys.path.insert(0, "../..")
from asyncua.sync import Client, ThreadLoop
from asyncua.sync import Client
if __name__ == "__main__":
with ThreadLoop():
with Client("opc.tcp://localhost:4840/freeopcua/server/") as client:
# client = Client("opc.tcp://admin@localhost:4840/freeopcua/server/") #connect using a user
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
# Node objects have methods to read and write node attributes as well as browse or populate address space
print("Children of root are: ", client.nodes.root.get_children())
# get a specific node knowing its node id
#var = client.get_node(ua.NodeId(1002, 2))
#var = client.get_node("ns=3;i=2002")
#print(var)
#var.get_data_value() # get value of node as a DataValue object
#var.get_value() # get value of node as a python builtin
#var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
#var.set_value(3.9) # set node value using implicit data type
# Now getting a variable node using its browse path
myvar = client.nodes.root.get_child(["0:Objects", "2:MyObject", "2:MyVariable"])
obj = client.nodes.root.get_child(["0:Objects", "2:MyObject"])
print("myvar is: ", myvar)
print("myobj is: ", obj)
# Stacked myvar access
# print("myvar is: ", root.get_children()[0].get_children()[1].get_variables()[0].get_value())
with Client("opc.tcp://localhost:4840/freeopcua/server/") as client:
# client = Client("opc.tcp://admin@localhost:4840/freeopcua/server/") #connect using a user
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
# Node objects have methods to read and write node attributes as well as browse or populate address space
print("Children of root are: ", client.nodes.root.get_children())
# get a specific node knowing its node id
#var = client.get_node(ua.NodeId(1002, 2))
#var = client.get_node("ns=3;i=2002")
#print(var)
#var.get_data_value() # get value of node as a DataValue object
#var.get_value() # get value of node as a python builtin
#var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
#var.set_value(3.9) # set node value using implicit data type
# Now getting a variable node using its browse path
myvar = client.nodes.root.get_child(["0:Objects", "2:MyObject", "2:MyVariable"])
obj = client.nodes.root.get_child(["0:Objects", "2:MyObject"])
print("myvar is: ", myvar)
print("myobj is: ", obj)
# Stacked myvar access
# print("myvar is: ", root.get_children()[0].get_children()[1].get_variables()[0].get_value())
......@@ -82,9 +82,9 @@ if __name__ == "__main__":
# logger.setLevel(logging.DEBUG)
#logger = logging.getLogger("opcua.uaprocessor")
# logger.setLevel(logging.DEBUG)
with ThreadLoop():
with ThreadLoop() as tloop:
# now setup our server
server = Server()
server = Server(tloop=tloop)
#server.disable_clock()
#server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
......
......@@ -3,12 +3,10 @@ sys.path.insert(0, "../..")
import time
from asyncua.sync import Server, start_thread_loop, stop_thread_loop
from asyncua.sync import Server
if __name__ == "__main__":
start_thread_loop()
# setup our server
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
......@@ -24,7 +22,7 @@ if __name__ == "__main__":
# starting!
server.start()
try:
count = 0
while True:
......@@ -34,4 +32,3 @@ if __name__ == "__main__":
finally:
#close connection, remove subcsriptions, etc
server.stop()
stop_thread_loop()
......@@ -13,8 +13,15 @@ def divide(parent, x, y):
@pytest.fixture
def server():
s = Server()
def tloop():
with ThreadLoop() as tl:
yield tl
@pytest.fixture
def server(tloop):
s = Server(tloop=tloop)
s.disable_clock(True)
s.set_endpoint('opc.tcp://0.0.0.0:8840/freeopcua/server/')
uri = "http://examples.freeopcua.github.io"
idx = s.register_namespace(uri)
......@@ -27,15 +34,15 @@ def server():
@pytest.fixture
def tloop():
with ThreadLoop() as t_loop:
yield t_loop
def client(tloop, server):
c = Client("opc.tcp://admin@localhost:8840/freeopcua/server", tloop=tloop)
with c:
yield c
@pytest.fixture
def client(tloop, server):
c = Client("opc.tcp://admin@localhost:8840/freeopcua/server")
with c:
def client_no_tloop(server):
with Client("opc.tcp://admin@localhost:8840/freeopcua/server") as c:
yield c
......@@ -76,6 +83,10 @@ class MySubHandler():
self.future.set_result(event)
def test_sync_tloop_sub(client_no_tloop):
test_sync_sub(client_no_tloop)
def test_sync_sub(client):
myhandler = MySubHandler()
sub = client.create_subscription(1, myhandler)
......@@ -98,3 +109,8 @@ def test_sync_meth(client, idx):
res = client.nodes.objects.call_method(f"{idx}:Divide", 4, 0)
def test_sync_client_no_tl(client_no_tloop, idx):
test_sync_meth(client_no_tloop, idx)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment