Commit ada17a05 authored by olivier R-D's avatar olivier R-D

more work on uals

parent 78103830
import logging
import sys
import argparse
from opcua import ua, Client
from datetime import datetime
import code
from enum import Enum
from datetime import datetime
from opcua import ua
from opcua import Client
from opcua import Node
def add_minimum_args(parser):
......@@ -240,9 +243,9 @@ def uals():
parser = argparse.ArgumentParser(description="Browse OPC-UA node and print result")
add_common_args(parser)
parser.add_argument("-l",
dest="print_format",
dest="long_format",
#action="store_true",
default=1,
const=3,
nargs="?",
type=int,
help="use a long listing format")
......@@ -253,6 +256,8 @@ def uals():
help="Browse depth")
args = parser.parse_args()
if args.long_format is None:
args.long_format = 1
logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
client = Client(args.url, timeout=args.timeout)
......@@ -262,52 +267,63 @@ def uals():
if args.path:
node = node.get_child(args.path.split(","))
print("Browsing node {} at {}\n".format(node, args.url))
if args.print_format == 0:
print("{:30} {:25}".format("DisplayName", "NodeId"))
print("")
elif args.print_format == 1:
print("{:30} {:25} {:25} {:25}".format("DisplayName", "NodeId", "BrowseName", "Value"))
print("")
_lsprint(client, node.nodeid, args.print_format, args.depth - 1)
if args.long_format == 0:
_lsprint_0(node, args.depth - 1)
elif args.long_format == 1:
_lsprint_1(node, args.depth - 1)
else:
_lsprint_long(node, args.depth - 1)
finally:
client.disconnect()
sys.exit(0)
print(args)
def _lsprint(client, nodeid, print_format, depth, indent=""):
indent += " "
pnode = client.get_node(nodeid)
if print_format in (0, 1):
for desc in pnode.get_children_descriptions():
if print_format == 0:
print("{:30} {:25}".format(indent + desc.DisplayName.to_string(), indent + desc.NodeId.to_string()))
else:
if desc.NodeClass == ua.NodeClass.Variable:
val = client.get_node(desc.NodeId).get_value()
print("{:30}, {!s:25}, {!s:25}, {!s:3}".format(indent + desc.DisplayName.to_string(), indent + desc.NodeId.to_string(), indent + desc.BrowseName.to_string(), indent + str(val)))
else:
print("{:30}, {!s:25}, {!s:25}".format(indent + desc.DisplayName.to_string(), indent + desc.NodeId.to_string(), indent + desc.BrowseName.to_string()))
if depth:
_lsprint(client, desc.NodeId, print_format, depth - 1, indent)
else:
for node in pnode.get_children():
attrs = node.get_attributes([ua.AttributeIds.DisplayName,
ua.AttributeIds.BrowseName,
ua.AttributeIds.NodeClass,
ua.AttributeIds.WriteMask,
ua.AttributeIds.UserWriteMask,
ua.AttributeIds.DataType,
ua.AttributeIds.Value])
name, bname, nclass, mask, umask, dtype, val = [attr.Value.Value for attr in attrs]
update = attrs[-1].ServerTimestamp
if nclass == ua.NodeClass.Variable:
print("{} {}, {}, {}, {}, {update}, {value}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string(), dtype.to_string(), update=update, value=val))
else:
print("{} {}, {}, {}".format(indent, name.to_string(), bname.to_string(), node.nodeid.to_string()))
if depth:
_lsprint(client, node.nodeid, print_format, depth - 1, indent)
def _lsprint_0(node, depth, indent=""):
if not indent:
print("{:30} {:25}".format("DisplayName", "NodeId"))
print("")
for desc in node.get_children_descriptions():
print("{}{:30} {:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string()))
if depth:
_lsprint_0(Node(node.server, desc.NodeId), depth - 1, indent + " ")
def _lsprint_1(node, depth, indent=""):
if not indent:
print("{:30} {:25} {:25} {:25}".format("DisplayName", "NodeId", "BrowseName", "Value"))
print("")
for desc in node.get_children_descriptions():
if desc.NodeClass == ua.NodeClass.Variable:
val = Node(node.server, desc.NodeId).get_value()
print("{}{:30} {!s:25} {!s:25}, {!s:3}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string(), desc.BrowseName.to_string(), val))
else:
print("{}{:30} {!s:25} {!s:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string(), desc.BrowseName.to_string()))
if depth:
_lsprint_1(Node(node.server, desc.NodeId), depth - 1, indent + " ")
def _lsprint_long(pnode, depth, indent=""):
if not indent:
print("{:30} {:25} {:25} {:10} {:30} {:25}".format("DisplayName", "NodeId", "BrowseName", "DataType", "Timestamp", "Value"))
print("")
for node in pnode.get_children():
attrs = node.get_attributes([ua.AttributeIds.DisplayName,
ua.AttributeIds.BrowseName,
ua.AttributeIds.NodeClass,
ua.AttributeIds.WriteMask,
ua.AttributeIds.UserWriteMask,
ua.AttributeIds.DataType,
ua.AttributeIds.Value])
name, bname, nclass, mask, umask, dtype, val = [attr.Value.Value for attr in attrs]
update = attrs[-1].ServerTimestamp
if nclass == ua.NodeClass.Variable:
print("{}{:30} {:25} {:25} {:10} {!s:30} {!s:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string(), dtype.to_string(), update, val))
else:
print("{}{:30} {:25} {:25}".format(indent, name.to_string(), bname.to_string(), node.nodeid.to_string()))
if depth:
_lsprint_long(node, depth - 1, indent + " ")
class SubHandler(object):
......@@ -434,6 +450,7 @@ def uadiscover():
sys.exit(0)
def print_history(res):
if res.TypeId.Identifier == ua.ObjectIds.HistoryData_Encoding_DefaultBinary:
buf = ua.utils.Buffer(res.Body)
......@@ -441,6 +458,7 @@ def print_history(res):
for d in ua.HistoryData.from_binary(buf).DataValues:
print("{:30} {:10} {}".format(str(d.SourceTimestamp), d.StatusCode.name, d.Value))
def str_to_datetime(s):
if not s:
return datetime.utcnow()
......@@ -451,6 +469,7 @@ def str_to_datetime(s):
except ValueError:
pass
def uahistoryread():
parser = argparse.ArgumentParser(description="Read history of a node")
add_common_args(parser)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment