Commit 8a70e761 authored by ORD's avatar ORD

Merge pull request #56 from alkor/tools

Tools
parents 4a03cf9c 73588ee7
......@@ -345,6 +345,15 @@ class BinaryClient(object):
response.ResponseHeader.ServiceResult.check()
return response.Endpoints
def find_servers(self, params):
self.logger.info("find_servers")
request = ua.FindServersRequest()
request.Parameters = params
data = self._uasocket.send_request(request)
response = ua.FindServersResponse.from_binary(data)
response.ResponseHeader.ServiceResult.check()
return response.Servers
def translate_browsepaths_to_nodeids(self, browsepaths):
self.logger.info("translate_browsepath_to_nodeid")
request = ua.TranslateBrowsePathsToNodeIdsRequest()
......
......@@ -99,8 +99,21 @@ class Client(object):
self.open_secure_channel()
endpoints = self.get_endpoints()
self.close_secure_channel()
self.disconnect_socket()
return endpoints
def find_all_servers(self):
"""
Connect, ask server for a list of known servers, and disconnect
"""
self.connect_socket()
self.send_hello()
self.open_secure_channel()
servers = self.find_servers()
self.close_secure_channel()
self.disconnect_socket()
return servers
def connect(self):
"""
High level method
......@@ -158,10 +171,12 @@ class Client(object):
def get_endpoints(self):
params = ua.GetEndpointsParameters()
params.EndpointUrl = self.server_url.geturl()
params.ProfileUris = ["http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary"]
params.LocaleIds = ["http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary"]
return self.bclient.get_endpoints(params)
def find_servers(self):
params = ua.FindServersParameters()
return self.bclient.find_servers(params)
def create_session(self):
desc = ua.ApplicationDescription()
desc.ApplicationUri = self.application_uri
......
#!/usr/bin/env python
# Prints OPC UA servers and endpoints for URL
import os, sys, argparse, logging
from enum import Enum
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + "/..")
from opcua import ua, Client
# converts numeric value to its enum name.
def enum_to_string(klass, value):
if isinstance(value, Enum):
return value.name
# if value is not a subtype of Enum, try to find a constant
# with this value in this class
for k,v in vars(klass).items():
if not k.startswith('__') and v == value:
return k
return 'Unknown {} ({})'.format(klass.__name__, value)
def application_to_strings(app):
result = []
result.append(('Application URI', app.ApplicationUri))
optionals = [
('Product URI', app.ProductUri),
('Application Name', app.ApplicationName.to_string()),
('Application Type', enum_to_string(ua.ApplicationType, app.ApplicationType)),
('Gateway Server URI', app.GatewayServerUri),
('Discovery Profile URI', app.DiscoveryProfileUri),
]
for (n, v) in optionals:
if v:
result.append((n, v))
for url in app.DiscoveryUrls:
result.append(('Discovery URL', url))
return result # ['{}: {}'.format(n, v) for (n, v) in result]
def endpoint_to_strings(ep):
result = [('Endpoint URL', ep.EndpointUrl)]
result += application_to_strings(ep.Server)
result += [
('Server Certificate', len(ep.ServerCertificate)),
('Security Mode', enum_to_string(ua.MessageSecurityMode, ep.SecurityMode)),
('Security Policy URI', ep.SecurityPolicyUri) ]
for tok in ep.UserIdentityTokens:
result += [
('User policy', tok.PolicyId),
(' Token type', enum_to_string(ua.UserTokenType, tok.TokenType)) ]
if tok.IssuedTokenType or tok.IssuerEndpointUrl:
result += [
(' Issued Token type', tok.IssuedTokenType)
(' Issuer Endpoint URL', tok.IssuerEndpointUrl) ]
if tok.SecurityPolicyUri:
result.append((' Security Policy URI', tok.SecurityPolicyUri))
result += [
('Transport Profile URI', ep.TransportProfileUri),
('Security Level', ep.SecurityLevel)]
return result
def main(args, loglevel):
logging.basicConfig(format="%(levelname)s: %(message)s", level=loglevel)
client = Client(args.address, timeout=10)
for i, server in enumerate(client.find_all_servers(), start=1):
print('Server {}:'.format(i))
for (n, v) in application_to_strings(server):
print(' {}: {}'.format(n, v))
print('')
client = Client(args.address, timeout=10)
for i, ep in enumerate(client.get_server_endpoints()):
print('Endpoint {}:'.format(i))
for (n, v) in endpoint_to_strings(ep):
print(' {}: {}'.format(n, v))
print('')
exit(0)
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description = "Performs OPC UA discovery and prints information on servers and endpoints.")
parser.add_argument("address",
help = "URL of OPC UA server (for example, opc.tcp://example.org:4840)",
metavar = "URL")
parser.add_argument(
"-v",
"--verbose",
dest="logLevel",
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
default='ERROR',
help="Set the logging level")
args = parser.parse_args()
main(args, getattr(logging, args.logLevel))
#!/usr/bin/env python
# lists references of OPC node
import os, sys, argparse, logging
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + "/..")
from opcua import ua, Client
FORMATS = ['display-name', 'browse-name', 'node-id']
def main(args, loglevel):
logging.basicConfig(format="%(levelname)s: %(message)s", level=loglevel)
i = -1
if args.format in FORMATS:
i = FORMATS.index(args.format)
client = Client(args.address, timeout=10)
try:
client.connect()
if args.node_id:
root = client.get_node(ua.NodeId.from_string(args.node_id))
else:
root = client.get_root_node()
for x in root.get_children_descriptions():
data = [x.DisplayName.to_string(), x.BrowseName.to_string(), x.NodeId.to_string()]
if i >= 0:
print(data[i])
else:
print("\t".join(data))
finally:
client.disconnect()
exit(0)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description = "Performs OPC UA browse and prints result.")
parser.add_argument("address",
help = "URL of OPC UA server (for example, opc.tcp://example.org:4840)",
metavar = "URL")
parser.add_argument("node_id",
help = "Fully-qualified node ID (for example, i=85). Default: root node",
default='',
nargs='?',
metavar = "NODE")
parser.add_argument(
"-v",
"--verbose",
dest="logLevel",
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
default='ERROR',
help="Set the logging level")
parser.add_argument(
"-f",
"--format",
dest="format",
choices=['full'] + FORMATS,
default='full',
help="Set the output format. Default: full")
args = parser.parse_args()
main(args, getattr(logging, args.logLevel))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment