Commit 4a94d985 authored by olivier R-D's avatar olivier R-D

add timeout option, remove old scripts

parent 47192be7
......@@ -18,6 +18,11 @@ def add_minimum_args(parser):
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
default='WARNING',
help="Set log level")
parser.add_argument("--timeout",
dest="timeout",
type=int,
defualt=1,
help="Set socket timeout (NOT the diverse UA timeouts)")
def add_common_args(parser):
......@@ -71,7 +76,7 @@ def uaread():
sys.exit(1)
logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
client = Client(args.url)
client = Client(args.url, timeout=args.timeout)
client.connect()
try:
node = client.get_node(args.nodeid)
......@@ -216,7 +221,7 @@ def uawrite():
sys.exit(1)
logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
client = Client(args.url)
client = Client(args.url, timeout=args.timeout)
client.connect()
try:
node = client.get_node(args.nodeid)
......@@ -246,7 +251,7 @@ def uals():
args = parser.parse_args()
logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
client = Client(args.url)
client = Client(args.url, timeout=args.timeout)
client.connect()
try:
node = client.get_node(args.nodeid)
......@@ -296,7 +301,7 @@ def uasubscribe():
sys.exit(1)
logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
client = Client(args.url)
client = Client(args.url, timeout=args.timeout)
client.connect()
try:
node = client.get_node(args.nodeid)
......@@ -377,14 +382,14 @@ def uadiscover():
args = parser.parse_args()
logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
client = Client(args.url)
client = Client(args.url, timeout=args.timeout)
for i, server in enumerate(client.find_all_servers(), start=1):
print('Server {}:'.format(i))
for (n, v) in application_to_strings(server):
print(' {}: {}'.format(n, v))
print('')
client = Client(args.url)
client = Client(args.url, timeout=args.timeout)
for i, ep in enumerate(client.get_server_endpoints()):
print('Endpoint {}:'.format(i))
for (n, v) in endpoint_to_strings(ep):
......
#!/usr/bin/env python
# Prints OPC UA servers and endpoints for URL
import os, sys, argparse, logging
from enum import Enum
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + "/..")
from opcua import ua, Client
# converts numeric value to its enum name.
def enum_to_string(klass, value):
if isinstance(value, Enum):
return value.name
# if value is not a subtype of Enum, try to find a constant
# with this value in this class
for k,v in vars(klass).items():
if not k.startswith('__') and v == value:
return k
return 'Unknown {} ({})'.format(klass.__name__, value)
def application_to_strings(app):
result = []
result.append(('Application URI', app.ApplicationUri))
optionals = [
('Product URI', app.ProductUri),
('Application Name', app.ApplicationName.to_string()),
('Application Type', enum_to_string(ua.ApplicationType, app.ApplicationType)),
('Gateway Server URI', app.GatewayServerUri),
('Discovery Profile URI', app.DiscoveryProfileUri),
]
for (n, v) in optionals:
if v:
result.append((n, v))
for url in app.DiscoveryUrls:
result.append(('Discovery URL', url))
return result # ['{}: {}'.format(n, v) for (n, v) in result]
def endpoint_to_strings(ep):
result = [('Endpoint URL', ep.EndpointUrl)]
result += application_to_strings(ep.Server)
result += [
('Server Certificate', len(ep.ServerCertificate)),
('Security Mode', enum_to_string(ua.MessageSecurityMode, ep.SecurityMode)),
('Security Policy URI', ep.SecurityPolicyUri) ]
for tok in ep.UserIdentityTokens:
result += [
('User policy', tok.PolicyId),
(' Token type', enum_to_string(ua.UserTokenType, tok.TokenType)) ]
if tok.IssuedTokenType or tok.IssuerEndpointUrl:
result += [
(' Issued Token type', tok.IssuedTokenType)
(' Issuer Endpoint URL', tok.IssuerEndpointUrl) ]
if tok.SecurityPolicyUri:
result.append((' Security Policy URI', tok.SecurityPolicyUri))
result += [
('Transport Profile URI', ep.TransportProfileUri),
('Security Level', ep.SecurityLevel)]
return result
def main(args, loglevel):
logging.basicConfig(format="%(levelname)s: %(message)s", level=loglevel)
client = Client(args.address, timeout=10)
for i, server in enumerate(client.find_all_servers(), start=1):
print('Server {}:'.format(i))
for (n, v) in application_to_strings(server):
print(' {}: {}'.format(n, v))
print('')
client = Client(args.address, timeout=10)
for i, ep in enumerate(client.get_server_endpoints()):
print('Endpoint {}:'.format(i))
for (n, v) in endpoint_to_strings(ep):
print(' {}: {}'.format(n, v))
print('')
exit(0)
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description = "Performs OPC UA discovery and prints information on servers and endpoints.")
parser.add_argument("address",
help = "URL of OPC UA server (for example, opc.tcp://example.org:4840)",
metavar = "URL")
parser.add_argument(
"-v",
"--verbose",
dest="logLevel",
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
default='ERROR',
help="Set the logging level")
args = parser.parse_args()
main(args, getattr(logging, args.logLevel))
#!/usr/bin/env python
# lists references of OPC node
import os, sys, argparse, logging
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + "/..")
from opcua import ua, Client
FORMATS = ['display-name', 'browse-name', 'node-id']
def main(args, loglevel):
logging.basicConfig(format="%(levelname)s: %(message)s", level=loglevel)
i = -1
if args.format in FORMATS:
i = FORMATS.index(args.format)
client = Client(args.address, timeout=10)
try:
client.connect()
if args.node_id:
root = client.get_node(ua.NodeId.from_string(args.node_id))
else:
root = client.get_root_node()
for x in root.get_children_descriptions():
data = [x.DisplayName.to_string(), x.BrowseName.to_string(), x.NodeId.to_string()]
if i >= 0:
print(data[i])
else:
print("\t".join(data))
finally:
client.disconnect()
exit(0)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description = "Performs OPC UA browse and prints result.")
parser.add_argument("address",
help = "URL of OPC UA server (for example, opc.tcp://example.org:4840)",
metavar = "URL")
parser.add_argument("node_id",
help = "Fully-qualified node ID (for example, i=85). Default: root node",
default='',
nargs='?',
metavar = "NODE")
parser.add_argument(
"-v",
"--verbose",
dest="logLevel",
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
default='ERROR',
help="Set the logging level")
parser.add_argument(
"-f",
"--format",
dest="format",
choices=['full'] + FORMATS,
default='full',
help="Set the output format. Default: full")
args = parser.parse_args()
main(args, getattr(logging, args.logLevel))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment