Commit aaccc985 authored by oroulet's avatar oroulet

port and check client_to_prosys.py example

parent d0e49b1e
import sys
import asyncio
sys.path.insert(0, "..")
import time
import logging
from opcua import Client
......@@ -10,7 +10,7 @@ from opcua import ua
class SubHandler(object):
"""
Client to subscription. It will receive events from server
Subscription Handler. To receive events from server for a subscription
"""
def datachange_notification(self, node, val, data):
......@@ -20,20 +20,15 @@ class SubHandler(object):
print("Python: New event", event)
if __name__ == "__main__":
#from IPython import embed
logging.basicConfig(level=logging.DEBUG)
client = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer/")
#client = Client("opc.tcp://olivier:olivierpass@localhost:53530/OPCUA/SimulationServer/")
#client.set_security_string("Basic256Sha256,SignAndEncrypt,certificate-example.der,private-key-example.pem")
try:
client.connect()
root = client.get_root_node()
print("Root is", root)
print("childs of root are: ", root.get_children())
print("name of root is", root.get_browse_name())
objects = client.get_objects_node()
print("childs og objects are: ", objects.get_children())
async def main():
url = "opc.tcp://localhost:53530/OPCUA/SimulationServer/"
# url = "opc.tcp://olivier:olivierpass@localhost:53530/OPCUA/SimulationServer/"
async with Client(url=url) as client:
print("Root children are", await client.nodes.root.get_children())
myfloat = client.get_node("ns=4;s=Float")
mydouble = client.get_node("ns=4;s=Double")
myint64 = client.get_node("ns=4;s=Int64")
......@@ -43,25 +38,38 @@ if __name__ == "__main__":
var = client.get_node(ua.NodeId("Random1", 5))
print("var is: ", var)
print("value of var is: ", var.get_value())
var.set_value(ua.Variant([23], ua.VariantType.Double))
print("value of var is: ", await var.get_value())
await var.set_value(ua.Variant([23], ua.VariantType.Double))
print("setting float value")
myfloat.set_value(ua.Variant(1.234, ua.VariantType.Float))
print("reading float value: ", myfloat.get_value())
await myfloat.set_value(ua.Variant(1.234, ua.VariantType.Float))
print("reading float value: ", await myfloat.get_value())
handler = SubHandler()
sub = client.create_subscription(500, handler)
handle = sub.subscribe_data_change(var)
device = objects.get_child(["2:MyObjects", "2:MyDevice"])
method = device.get_child("2:MyMethod")
result = device.call_method(method, ua.Variant("sin"), ua.Variant(180, ua.VariantType.Double))
device = await client.nodes.objects.get_child(["2:MyObjects", "2:MyDevice"])
method = await device.get_child("2:MyMethod")
result = await device.call_method(method, ua.Variant("sin"), ua.Variant(180, ua.VariantType.Double))
print("Mehtod result is: ", result)
#embed()
time.sleep(3)
sub.unsubscribe(handle)
sub.delete()
#client.close_session()
finally:
client.disconnect()
handler = SubHandler()
sub = await client.create_subscription(500, handler)
handle = await sub.subscribe_data_change(var)
handle2 = await sub.subscribe_events(evtypes=2788)
cond = await client.nodes.root.get_child(["0:Types", "0:EventTypes", "0:BaseEventType", "0:ConditionType"])
for _ in range(5):
# refresh server condition to force generation of events
await cond.call_method("0:ConditionRefresh", ua.Variant(sub.subscription_id, ua.VariantType.UInt32))
await asyncio.sleep(1)
await sub.unsubscribe(handle)
await sub.unsubscribe(handle2)
await sub.delete()
if __name__ == "__main__":
logging.basicConfig(level=logging.WARN)
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
loop.close()
import sys
sys.path.insert(0, "..")
import logging
from opcua import Client
from opcua import ua
from IPython import embed
class SubHandler(object):
"""
Subscription Handler. To receive events from server for a subscription
"""
def datachange_notification(self, node, val, data):
print("Python: New data change event", node, val)
def event_notification(self, event):
print("Python: New event", event.EventType)
if __name__ == "__main__":
#from IPython import embed
logging.basicConfig(level=logging.WARN)
client = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer/")
#client = Client("opc.tcp://olivier:olivierpass@localhost:53530/OPCUA/SimulationServer/")
try:
client.connect()
root = client.get_root_node()
print("Root is", root)
handler = SubHandler()
sub = client.create_subscription(500, handler)
handle = sub.subscribe_events(evtype=2788)
# refresh server condition to force generation of events
cond = root.get_child(["0:Types", "0:EventTypes", "0:BaseEventType", "0:ConditionType"])
cond.call_method("0:ConditionRefresh", ua.Variant(sub.subscription_id, ua.VariantType.UInt32))
embed()
finally:
client.disconnect()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment