Commit c0c8d166 authored by Ivan Tyagov's avatar Ivan Tyagov

Better introspection.

parent 4ed51799
...@@ -5,18 +5,18 @@ OPCUA to ERP5 OPCUA Document setter / getter glue script. ...@@ -5,18 +5,18 @@ OPCUA to ERP5 OPCUA Document setter / getter glue script.
import json import json
from erp5.component.module.Log import log from erp5.component.module.Log import log
def convertNodeValue(node_id, node_value): def convertNodeValue(node_id, node_value, type_data):
""" """
Based on node_id convert to proper Python type. Based on this format: Based on type_data convert to proper Python type.
ns=<namespaceindex>;<type>=<value>
XXX: handle rest of datatypes!
""" """
node_type = node_id.split(";")[1] if "VariantType=<VariantType.String:" in type_data:
if node_type.startswith("s"): # string
node_value = str(node_value) node_value = str(node_value)
if node_type.startswith("i"): elif "VariantType=<VariantType.Float:" in type_data:
node_value = float(node_value)
elif "VariantType=<VariantType.Int64:" in type_data:
node_value = int(node_value) node_value = int(node_value)
return node_value return node_value
# XXX: what should be the OPCUA container? # XXX: what should be the OPCUA container?
...@@ -49,11 +49,14 @@ elif http_method == "POST": ...@@ -49,11 +49,14 @@ elif http_method == "POST":
# log only set requests for now # log only set requests for now
node_id = data['node'] node_id = data['node']
node_value = data['val'] node_value = data['val']
# introspect type of the value
type_data = data["data"]
# set to "ERP5 OPCUA Document" but only if it's not a None value # set to "ERP5 OPCUA Document" but only if it's not a None value
if node_value is not None and node_value != "None": if node_value is not None and node_value != "None":
log("Set '%s' = '%s' to %s as %s" %(node_id, node_value, default_opcua_document, node_id)) log("Set '%s' = '%s' to %s as %s" %(node_id, node_value, default_opcua_document, node_id))
node_dict = default_opcua_document.getNodeDict() node_dict = default_opcua_document.getNodeDict()
node_dict[node_id] = convertNodeValue(node_id, node_value) node_dict[node_id] = convertNodeValue(node_id, node_value, type_data)
default_opcua_document.setNodeDict(node_dict) default_opcua_document.setNodeDict(node_dict)
log("Changed = %s" %default_opcua_document.getNodeDict()) log("Changed = %s" %default_opcua_document.getNodeDict())
......
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
import json import json
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
class TestDataInestOPCUADocumentgestion(ERP5TypeTestCase): class TestOPCUADocumentDataInestgestion(ERP5TypeTestCase):
""" """
Test Class for OPC UA Document implementation. Test Class for OPC UA Document implementation.
...@@ -60,8 +60,10 @@ class TestDataInestOPCUADocumentgestion(ERP5TypeTestCase): ...@@ -60,8 +60,10 @@ class TestDataInestOPCUADocumentgestion(ERP5TypeTestCase):
# test setting over a client # test setting over a client
node_id = "n1=1;i=810" node_id = "n1=1;i=810"
node_val = 10 node_val = 10
type_data = "DataChangeNotification(<asyncua.common.subscription.SubscriptionItemData object at 0x7f1761723a60>, MonitoredItemNotification(ClientHandle=202, Value=DataValue(Value=Variant(Value=1112, VariantType=<VariantType.Int64: 8>, Dimensions=None, is_array=False), StatusCode_=StatusCode(value=0), SourceTimestamp=None, ServerTimestamp=datetime.datetime(2023, 9, 5, 13, 23, 39, 285445), SourcePicoseconds=None, ServerPicoseconds=None)))"
opcua_structure = {"node": node_id, opcua_structure = {"node": node_id,
"val":node_val} "val":node_val,
"data": type_data}
result_json = self.portal.ERP5Site_handleOPCUARequest(http_method="POST", result_json = self.portal.ERP5Site_handleOPCUARequest(http_method="POST",
default_opcua_document=default_opcua_document, default_opcua_document=default_opcua_document,
data = json.dumps(opcua_structure)) data = json.dumps(opcua_structure))
...@@ -78,7 +80,8 @@ class TestDataInestOPCUADocumentgestion(ERP5TypeTestCase): ...@@ -78,7 +80,8 @@ class TestDataInestOPCUADocumentgestion(ERP5TypeTestCase):
# test UPDATE # test UPDATE
node_val = 100 node_val = 100
opcua_structure = {"node": node_id, opcua_structure = {"node": node_id,
"val":node_val} "val":node_val,
"data": type_data}
result_json = self.portal.ERP5Site_handleOPCUARequest(http_method="POST", result_json = self.portal.ERP5Site_handleOPCUARequest(http_method="POST",
default_opcua_document=default_opcua_document, default_opcua_document=default_opcua_document,
data = json.dumps(opcua_structure)) data = json.dumps(opcua_structure))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment