Commit 5a5e423e authored by ORD's avatar ORD Committed by GitHub

Add missing Enums, add move some code to ua_utils.py (#217)

* small cleanup, add 2 methods in ua_utils.py

* improve and add more tests to string convertions

* update README

* add definition for EvenNotifier and WriteMask, ObjectIdNames and start using them

* update dependency to datetuil

* more use og new Enums, fixes

* DataType cannot always be converted to VariantType
parent 3edb2f39
......@@ -5,6 +5,7 @@ python:
- "pypy"
# command to install dependencies
install:
- pip install python-dateutil
- if [[ $TRAVIS_PYTHON_VERSION == '3.4' ]]; then pip install cryptography ; fi
- if [[ $TRAVIS_PYTHON_VERSION == '2.7' ]]; then pip install futures ; fi
- if [[ $TRAVIS_PYTHON_VERSION == '2.7' ]]; then pip install cryptography ; fi
......
Pure Python OPC-UA / IEC 62541 Client and Server Python 2, 3 and pypy .
Pure Python OPC UA / IEC 62541 Client and Server Python 2, 3 and pypy .
http://freeopcua.github.io/, https://github.com/FreeOpcUa/python-opcua
[![Build Status](https://travis-ci.org/FreeOpcUa/python-opcua.svg?branch=master)](https://travis-ci.org/FreeOpcUa/python-opcua)
......@@ -9,11 +9,11 @@ http://freeopcua.github.io/, https://github.com/FreeOpcUa/python-opcua
[![PyPI Oackage](https://badge.fury.io/py/freeopcua.svg)](https://badge.fury.io/py/freeopcua)
OPC-UA binary protocol implementation is quasi complete and has been tested against many different OPC-UA stacks. API offers both a low level interface to send and receive all UA defined structures and high level classes allowing to write a server or a client in a few lines. It is easy to mix high level objects and low level UA calls in one application.
OPC UA binary protocol implementation is quasi complete and has been tested against many different OPC UA stacks. API offers both a low level interface to send and receive all UA defined structures and high level classes allowing to write a server or a client in a few lines. It is easy to mix high level objects and low level UA calls in one application.
Most code is autogenerated from xml specification using same code as the one that is used for freeopcua C++ client and server, thus adding missing functionnality to client and server shoud be trivial.
Most low level code is autogenerated from xml specification, thus adding missing functionnality to client or server is often trivial.
Using Python > 3.4 the only dependency is cryptography. If using python 2.7 or pypy < 3 you need to install enum34, trollius(asyncio), and futures(concurrent.futures), with pip for example.
Using Python > 3.4 the dependencies are cryptography and dateutil. If using python 2.7 or pypy < 3 you also need to install enum34, trollius(asyncio), and futures(concurrent.futures), with pip for example.
coveryage.py reports a test coverage of over 90% of code, most of non-tested code is autogenerate code that is not used yet.
......@@ -34,7 +34,7 @@ A set of command line tools also available: https://github.com/FreeOpcUa/python-
* uawrite (write attribute of a node)
* uasubscribe (subscribe to a node and print datachange events)
* uaclient (connect to server and start python shell)
* uaserver (starts a demo OPC-UA server)
* uaserver (starts a demo OPC UA server)
tools/uaserver --populate --certificate cert.pem --private_key pk.pem
......@@ -89,20 +89,25 @@ Server: what is not implemented
Server: Running on a Raspberry Pi
Setting up the standard address-space from scratch is the most time-consuming step of the startup process which may lead to long startup times on less powerful devices like a Raspberry Pi. By passing a path to a cache-file to the server constructor, a shelve holding the address space will be created during the first startup. All following startups will make use of the cache-file which leads to significantly better startup performance (~3.5 vs 125 seconds on a Raspbery Pi Model B).
Setting up the standard address-space from XML is the most time-consuming step of the startup process which may lead to long startup times on less powerful devices like a Raspberry Pi. By passing a path to a cache-file to the server constructor, a shelve holding the address space will be created during the first startup. All following startups will make use of the cache-file which leads to significantly better startup performance (~3.5 vs 125 seconds on a Raspbery Pi Model B).
# Development
Code follows PEP8 apart for line lengths which should be max 120 characters and OPC UA enums that keep camel case from XML definition.
All code is under opcua directory
- ua contains all UA structures from specification
- common contains high level objects and methods used both in server and client
- client contains client specific code
- server contains server specific code
- utils contains some utilities not really related to OPC-UA
- tools contains command lines tools
Code follows PEP8 apart for line lengths which should be max 120 characters and OPC UA structures that keep camel case from XML definition.
All protocol code is under opcua directory
- opcua/ua contains all UA structures from specification, most are autogenerated
- opcua/common contains high level objects and methods used both in server and client
- opcua/client contains client specific code
- opcua/server contains server specific code
- opcua/utils contains some utilities function and classes
- opcua/tools contains code for command lines tools
- schemas contains the XML and text files from specification and the python scripts used to autogenerate code
- tests contains tests
- docs contains files to auto generate documentation from doc strings
- examples contains many example files
- tools contains python scripts that can be used to run command line tools from repository without installing
## Running tests:
......
import sys
sys.path.insert(0, "..")
import logging
from datetime import datetime
try:
from IPython import embed
......@@ -81,6 +82,10 @@ if __name__ == "__main__":
myobj = objects.add_object(idx, "MyObject")
myvar = myobj.add_variable(idx, "MyVariable", 6.7)
myvar.set_writable() # Set MyVariable to be writable by clients
mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string")
mystringvar.set_writable() # Set MyVariable to be writable by clients
mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.now())
mydtvar.set_writable() # Set MyVariable to be writable by clients
myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
myarrayvar = myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))
myprop = myobj.add_property(idx, "myproperty", "I am a property")
......
......@@ -63,11 +63,51 @@ class Node(object):
def get_data_type(self):
"""
get data type of node
get data type of node as NodeId
"""
result = self.get_attribute(ua.AttributeIds.DataType)
return result.Value.Value
def get_data_type_as_variant_type(self):
"""
get data type of node as VariantType
This only works if node is a variable, otherwise type
may not be convertible to VariantType
"""
result = self.get_attribute(ua.AttributeIds.DataType)
return ua.DataType_to_VariantType(result.Value.Value)
def get_access_level(self):
"""
get access level of node as a list of AccessLevel Enum
"""
result = self.get_attribute(ua.AttributeIds.AccessLevel)
return ua.int_to_AccessLevel(result.Value.Value)
def get_user_access_level(self):
"""
get user access level of node as a list of AccessLevel Enum
"""
result = self.get_attribute(ua.AttributeIds.UserAccessLevel)
return ua.int_to_AccessLevel(result.Value.Value)
def get_event_notifier(self):
"""
get EventNotifier attribute value as a list of EventNotifier Enum
"""
result = self.get_attribute(ua.AttributeIds.EventNotifier)
return ua.int_to_EventNotifier(result.Value.Value)
def set_event_notifier(self, enum_list):
"""
set event notifier attribute,
arg is a list of EventNotifier Enum
"""
res = 1
for en in enum_list:
ua.set_bit(res, en.value)
self.set_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(ua.Variant(res, ua.VariantType.Byte)))
def get_node_class(self):
"""
get node class attribute of node
......@@ -401,7 +441,7 @@ class Node(object):
details.EndTime = ua.DateTimeMinValue
details.NumValuesPerNode = numvalues
if not type(evtypes) in (list, tuple):
if not isinstance(evtypes, (list, tuple)):
evtypes = [evtypes]
evtypes = [Node(self.server, evtype) for evtype in evtypes]
......
"""
Usefull method and classes not belonging anywhere and depending on opcua library
"""
from dateutil import parser
from datetime import datetime
from enum import Enum, IntEnum
from opcua import ua
def val_to_string(val):
"""
convert a python object or python-opcua object to a string
which should be easy to understand for human
easy to modify, and not too hard to parse back ....not easy
meant for UI or command lines
"""
if isinstance(val, (list, tuple)):
res = []
for v in val:
res.append(val_to_string(v))
return "[" + ", ".join(res) + "]"
if hasattr(val, "to_string"):
val = val.to_string()
elif isinstance(val, ua.StatusCode):
val = val.name
elif isinstance(val, (Enum, IntEnum)):
val = val.name
elif isinstance(val, ua.DataValue):
val = variant_to_string(val.Value)
elif isinstance(val, str):
pass
elif isinstance(val, bytes):
val = str(val)
elif isinstance(val, datetime):
val = val.isoformat()
elif isinstance(val, (int, float)):
val = str(val)
else:
# FIXME: Some types are probably missing!
val = str(val)
return val
def variant_to_string(var):
"""
convert a variant to a string which should be easy to understand for human
easy to modify, and not too hard to parse back ....not easy
meant for UI or command lines
"""
return val_to_string(var.Value)
def string_to_val(string, vtype):
"""
Convert back a string to a python or python-opcua object
"""
string = string.strip()
if string.startswith("["):
string = string[1:-1]
var = []
for s in string.split(","):
s = s.strip()
val = string_to_val(s, vtype)
var.append(val)
return var
if vtype == ua.VariantType.Null:
val = None
elif vtype == ua.VariantType.Boolean:
val = bool(string)
elif 4 <= vtype.value < 9:
val = int(string)
elif vtype in (ua.VariantType.Float, ua.VariantType.Double):
val = float(string)
elif vtype in (ua.VariantType.String, ua.VariantType.XmlElement):
val = string
elif vtype in (ua.VariantType.SByte, ua.VariantType.Guid, ua.VariantType.ByteString):
val = bytes(string)
elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
val = ua.NodeId.from_string(string)
elif vtype == ua.VariantType.QualifiedName:
val = ua.QualifiedName.from_string(string)
elif vtype == ua.VariantType.DateTime:
val = parser.parse(string)
elif vtype == ua.VariantType.LocalizedText:
val = ua.LocalizedText(string)
elif vtype == ua.VariantType.StatusCode:
val = ua.StatusCode(string)
else:
# FIXME: Some types are probably missing!
raise NotImplementedError
return val
def string_to_variant(string, vtype):
"""
convert back a string to an ua.Variant
"""
return ua.Variant(string_to_val(string, vtype), vtype)
"""
Usefull methods and classes not depending on opcua library
"""
import logging
import os
from concurrent.futures import Future
......
......@@ -61,7 +61,7 @@ class EventGenerator(object):
self.event.SourceNode = source.nodeid
self.event.SourceName = source.get_browse_name().Name
source.set_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(ua.Variant(1, ua.VariantType.Byte)))
source.set_event_notifier([ua.EventNotifier.SubscribeToEvents, ua.EventNotifier.HistoryRead])
refs = []
ref = ua.AddReferencesItem()
ref.IsForward = True
......
......@@ -181,19 +181,19 @@ class InternalServer(object):
"""
Set attribute History Read of object events to True and start storing data for history
"""
# to historize events of an object, first check if object supports events
source_event_notifier = source.get_attribute(ua.AttributeIds.EventNotifier)
if source_event_notifier.Value.Value & 1 == 1: # check bit 0
# if it supports events, turn on bit 2 (enables history read)
source.set_attr_bit(ua.AttributeIds.EventNotifier, 2)
# send the object to history manager
self.history_manager.historize_event(source, period, count)
event_notifier = source.get_event_notifier()
if ua.EventNotifier.SubscribeToEvents not in event_notifier:
raise ua.UaError("Node does not generate events", event_notifier)
if ua.EventNotifier.SubscribeToEvents not in event_notifier:
event_notifier.append(ua.EventNotifier.HistoryRead)
source.set_event_notifier(event_notifier)
self.history_manager.historize_event(source, period, count)
def disable_history_event(self, source):
"""
Set attribute History Read of node to False and stop storing data for history
"""
source.unset_attr_bit(ua.AttributeIds.EventNotifier, 2)
source.unset_attr_bit(ua.AttributeIds.EventNotifier, ua.EventNotifier.HistoryRead)
self.history_manager.dehistorize(source)
......
......@@ -125,7 +125,7 @@ class MonitoredItemService(object):
result, mdata = self._make_monitored_item_common(params)
ev_notify_byte = self.aspace.get_attribute_value(params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value
if ev_notify_byte is None or ev_notify_byte & 1 == 0:
if ev_notify_byte is None or not ua.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
return result
# result.FilterResult = ua.EventFilterResult() # spec says we can ignore if not error
......
# the order is important, some classes are overriden
from opcua.ua.attribute_ids import AttributeIds
from opcua.ua.object_ids import ObjectIds
from opcua.ua.object_ids import ObjectIdNames
from opcua.ua.status_codes import StatusCodes
from opcua.ua.uaprotocol_auto import *
from opcua.ua.uaprotocol_hand import *
......
This diff is collapsed.
......@@ -324,6 +324,43 @@ class AccessLevelMask(IntEnum):
SemanticChange = 1 << AccessLevel.SemanticChange
class WriteMask(IntEnum):
"""
Mask to indicate which attribute of a node is writable
Rmq: This is not a mask but bit index....
"""
AccessLevel = 0
ArrayDimensions = 1
BrowseName = 2
ContainsNoLoops = 3
DataType = 4
Description = 5
DisplayName = 6
EventNotifier = 7
Executable = 8
Historizing = 9
InverseName = 10
IsAbstract = 11
MinimumSamplingInterval = 12
NodeClass = 13
NodeId = 14
Symmetric = 15
UserAccessLevel = 16
UserExecutable = 17
UserWriteMask = 18
ValueRank = 19
WriteMask = 20
ValueForVariableType = 21
class EventNotifier(IntEnum):
"""
"""
SubscribeToEvents = 0
HistoryRead = 2
HistoryWrite = 3
class Guid(FrozenClass):
def __init__(self):
......@@ -358,8 +395,12 @@ class StatusCode(FrozenClass):
"""
def __init__(self, value=0):
self.value = value
self.name, self.doc = status_codes.get_name_and_doc(value)
if isinstance(value, str):
self.name = value
self.value = getattr(status_codes.StatusCodes, value)
else:
self.value = value
self.name, self.doc = status_codes.get_name_and_doc(value)
self._freeze = True
def to_binary(self):
......@@ -736,7 +777,9 @@ class LocalizedText(FrozenClass):
def to_string(self):
# FIXME: use local
return self.Text.decode()
if self.Text is None:
return ""
return self.Text.decode('utf-8')
def __str__(self):
return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
......@@ -875,7 +918,7 @@ class VariantTypeCustom(object):
self.name = "Custom"
self.value = val
if self.value > 0b00111111:
raise UaError("Cannot create VariantType. VariantType must be %s > x > %s", 0b111111, 25)
raise UaError("Cannot create VariantType. VariantType must be {} > x > {}, received {}".format(0b111111, 25, val))
def __str__(self):
return "VariantType.Custom:{}".format(self.value)
......@@ -978,10 +1021,7 @@ class Variant(FrozenClass):
dimensions = None
encoding = ord(data.read(1))
int_type = encoding & 0b00111111
if int_type > 25:
vtype = VariantTypeCustom(int_type)
else:
vtype = VariantType(int_type)
vtype = DataType_to_VariantType(int_type)
if vtype == VariantType.Null:
return Variant(None, vtype, encoding)
if test_bit(encoding, 7):
......@@ -1179,4 +1219,48 @@ class DataValue(FrozenClass):
__repr__ = __str__
def DataType_to_VariantType(int_type):
"""
Takes a NodeId or int and return a VariantType
This is only supported if int_type < 63 due to VariantType encoding
"""
if isinstance(int_type, NodeId):
int_type = int_type.Identifier
if int_type <= 25:
return VariantType(int_type)
else:
return VariantTypeCustom(int_type)
def int_to_AccessLevel(level):
"""
take an int and return a list of AccessLevel Enum
"""
res = []
for val in AccessLevel:
test_bit(level, val.value)
res.append(val)
return res
def int_to_WriteMask(level):
"""
take an int and return a list of WriteMask Enum
"""
res = []
for val in WriteMask:
test_bit(level, val.value)
res.append(val)
return res
def int_to_EventNotifier(level):
"""
take an int and return a list of EventNotifier Enum
"""
res = []
for val in EventNotifier:
test_bit(level, val.value)
res.append(val)
return res
......@@ -5,10 +5,20 @@ if __name__ == "__main__":
outputfile.write("\n")
outputfile.write("from enum import IntEnum\n")
outputfile.write("\n")
# Making ObjectIds inherit IntEnum has a huge performance impact!!!!!
# so we use a normal class and a reverse one for the few places we need it
outputfile.write("class ObjectIds(object):\n")
outputfile.write(" Null = 0\n")
for line in inputfile:
name, nb, datatype = line.split(",")
outputfile.write(" {} = {}\n".format(name, nb))
inputfile.close()
inputfile = open("NodeIds.csv")
outputfile.write("\n\nObjectIdNames = {}\n")
outputfile.write("ObjectIdNames[0] = 'Null'\n".format(nb, name))
for line in inputfile:
name, nb, datatype = line.split(",")
outputfile.write("ObjectIdNames[{}] = '{}'\n".format(nb, name))
inputfile = open("AttributeIds.csv")
outputfile = open("../opcua/ua/attribute_ids.py", "w")
......
......@@ -3,9 +3,9 @@ from setuptools import setup, find_packages
import sys
if sys.version_info[0] < 3:
install_requires = ["enum34", "trollius", "futures"]
install_requires = ["python-dateutil", "enum34", "trollius", "futures"]
else:
install_requires = []
install_requires = ["python-dateutil"]
setup(name="freeopcua",
version="0.10.12",
......
......@@ -289,11 +289,11 @@ class CommonTests(object):
def test_variable_data_type(self):
objects = self.opc.get_objects_node()
var = objects.add_variable(3, 'stringfordatatype', "a string")
val = var.get_data_type()
self.assertEqual(val, ua.NodeId(ua.ObjectIds.String))
val = var.get_data_type_as_variant_type()
self.assertEqual(val, ua.VariantType.String)
var = objects.add_variable(3, 'stringarrayfordatatype', ["a", "b"])
val = var.get_data_type()
self.assertEqual(val, ua.NodeId(ua.ObjectIds.String))
val = var.get_data_type_as_variant_type()
self.assertEqual(val, ua.VariantType.String)
def test_add_string_array_variable(self):
objects = self.opc.get_objects_node()
......@@ -414,8 +414,8 @@ class CommonTests(object):
def test_bool_variable(self):
o = self.opc.get_objects_node()
v = o.add_variable(3, 'BoolVariable', True)
dt = v.get_data_type()
self.assertEqual(dt, ua.TwoByteNodeId(ua.ObjectIds.Boolean))
dt = v.get_data_type_as_variant_type()
self.assertEqual(dt, ua.VariantType.Boolean)
val = v.get_value()
self.assertEqual(True, val)
v.set_value(False)
......
......@@ -11,6 +11,7 @@ from opcua.ua import extensionobject_to_binary
from opcua.ua.uatypes import flatten, get_shape, reshape
from opcua.server.internal_subscription import WhereClauseEvaluator
from opcua.common.event_objects import BaseEvent
from opcua.common.ua_utils import string_to_variant, variant_to_string, string_to_val, val_to_string
......@@ -20,6 +21,57 @@ class TestUnit(unittest.TestCase):
Simple unit test that do not need to setup a server or a client
'''
def test_string_to_variant_int(self):
s_arr_uint = "[1, 2, 3, 4]"
arr_uint = [1, 2, 3, 4]
s_uint = "1"
self.assertEqual(string_to_val(s_arr_uint, ua.VariantType.UInt32), arr_uint)
self.assertEqual(string_to_val(s_arr_uint, ua.VariantType.UInt16), arr_uint)
self.assertEqual(val_to_string(arr_uint), s_arr_uint)
def test_string_to_variant_float(self):
s_arr_float = "[1.1, 2.1, 3, 4.0]"
arr_float = [1.1, 2.1, 3, 4.0]
s_float = "1.9"
self.assertEqual(string_to_val(s_float, ua.VariantType.Float), 1.9)
self.assertEqual(val_to_string(arr_float), s_arr_float)
def test_string_to_variant_datetime_string(self):
s_arr_datetime = "[2014-05-6, 2016-10-3]"
arr_string = ['2014-05-6', '2016-10-3']
arr_datetime = [datetime(2014, 5, 6), datetime(2016, 10, 3)]
s_datetime = "2014-05-3"
self.assertEqual(val_to_string(arr_string), s_arr_datetime)
self.assertEqual(string_to_val(s_arr_datetime, ua.VariantType.String), arr_string)
self.assertEqual(string_to_val(s_arr_datetime, ua.VariantType.DateTime), arr_datetime )
def test_string_to_variant_nodeid(self):
s_arr_nodeid = "[ns=2;i=56, i=45]"
arr_nodeid = [ua.NodeId.from_string("ns=2;i=56"), ua.NodeId.from_string("i=45")]
s_nodeid = "i=45"
self.assertEqual(string_to_val(s_arr_nodeid, ua.VariantType.NodeId), arr_nodeid)
def test_string_to_variant_status_code(self):
s_statuscode = "Good"
statuscode = ua.StatusCode(ua.StatusCodes.Good)
s_statuscode2 = "Uncertain"
statuscode2 = ua.StatusCode(ua.StatusCodes.Uncertain)
self.assertEqual(string_to_val(s_statuscode, ua.VariantType.StatusCode), statuscode)
self.assertEqual(string_to_val(s_statuscode2, ua.VariantType.StatusCode), statuscode2)
def test_string_to_variant_qname(self):
string = "2:name"
obj = ua.QualifiedName("name", 2)
self.assertEqual(string_to_val(string, ua.VariantType.QualifiedName), obj)
self.assertEqual(val_to_string(obj), string)
def test_string_to_variant_localized_text(self):
string = "_This is my string"
#string = "_This is my nøåæ"FIXME: does not work with python2 ?!?!
obj = ua.LocalizedText(string)
self.assertEqual(string_to_val(string, ua.VariantType.LocalizedText), obj)
self.assertEqual(val_to_string(obj), string)
def test_variant_dimensions(self):
l = [[[1.0, 1.0, 1.0, 1.0], [2.0, 2.0, 2.0, 2.0], [3.0, 3.0, 3.0, 3.0]],[[5.0, 5.0, 5.0, 5.0], [7.0, 8.0, 9.0, 01.0], [1.0, 1.0, 1.0, 1.0]]]
v = ua.Variant(l)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment