Commit 0dd63f64 authored by zerox1212's avatar zerox1212

Fix Tests, PEP8 Fixes

Must  be reviewed, do not merge
parent 3b0f90ad
......@@ -69,6 +69,8 @@ if __name__ == "__main__":
myevgen2.trigger(message="This is MySecondEvent " + str(count))
serverevgen.trigger(message="Server Event Message")
res = server_node.read_event_history(None, None, 0)
finally:
# close connection, remove subscriptions, etc
server.stop()
......@@ -382,7 +382,7 @@ class Node(object):
result = self.server.history_read(params)[0]
return result
def read_event_history(self, starttime=None, endtime=None, numvalues=0, evtype=ua.ObjectIds.BaseEventType):
def read_event_history(self, starttime=None, endtime=None, numvalues=0, evtypes=ua.ObjectIds.BaseEventType):
"""
Read event history of a source node
result code from server is checked and an exception is raised in case of error
......@@ -390,10 +390,6 @@ class Node(object):
then result will be truncated
"""
# FIXME event filter must be supplied externally, the problem is the node class doesn't have a way to get
# FIXME another node from the address space as these methods are at the server level, therefore there is
# FIXME no way to build an event filter here (although it could be nicer for a user who doesn't want a filter)
details = ua.ReadEventDetails()
if starttime:
details.StartTime = starttime
......@@ -405,7 +401,18 @@ class Node(object):
details.EndTime = ua.DateTimeMinValue
details.NumValuesPerNode = numvalues
evfilter = events.get_filter_from_event_type(Node(self.server, evtype))
if not type(evtypes) in (list, tuple):
evtypes = [evtypes]
# FIXME not a very nice way to make sure events.get_filter gets a list of nodes...
evtype_nodes = []
for evtype in evtypes:
if not isinstance(evtype, Node):
evtype_nodes.append(Node(self.server, ua.NodeId(evtype))) # make sure we have a list of Node objects
else:
evtype_nodes.append(evtype)
evfilter = events.get_filter_from_event_type(Node(self.server, evtype_nodes))
details.Filter = evfilter
result = self.history_read_events(details)
......
......@@ -183,12 +183,24 @@ class Subscription(object):
sourcenode = Node(self.server, sourcenode)
if evfilter is None:
# FIXME Review this, the commented out way doesn't support evtypes being passed a Node object
# if not type(evtypes) in (list, tuple):
# evtypes = [evtypes]
#
# evtypes = [Node(self.server, i) for i in evtypes] # make sure we have a list of Node objects
if not type(evtypes) in (list, tuple):
evtypes = [evtypes]
evtypes = [Node(self.server, i) for i in evtypes] # make sure we have a list of Node objects
# FIXME not a very nice way to make sure events.get_filter gets a list of nodes...
evtype_nodes = []
for evtype in evtypes:
if not isinstance(evtype, Node):
evtype_nodes.append(Node(self.server, ua.NodeId(evtype))) # make sure we have a list of Node objects
else:
evtype_nodes.append(evtype)
evfilter = events.get_filter_from_event_type(evtypes)
evfilter = events.get_filter_from_event_type(evtype_nodes)
return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)
def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
......
......@@ -383,23 +383,24 @@ def check_custom_event_type(test, ev):
test.assertTrue(ev.get_child("2:PropertyString") in properties)
test.assertEqual(ev.get_child("2:PropertyString").get_data_value().Value.VariantType, ua.VariantType.String)
class TestServerCaching(unittest.TestCase):
def runTest(self):
tmpfile = NamedTemporaryFile()
path = tmpfile.name
tmpfile.close()
#create cache file
server = Server(cacheFile = path)
# create cache file
server = Server(cacheFile=path)
#modify cache content
# modify cache content
id = ua.NodeId(ua.ObjectIds.Server_ServerStatus_SecondsTillShutdown)
s = shelve.open(path, "w", writeback = True)
s = shelve.open(path, "w", writeback=True)
s[id.to_string()].attributes[ua.AttributeIds.Value].value = ua.DataValue(123)
s.close()
#ensure that we are actually loading from the cache
server = Server(cacheFile = path)
# ensure that we are actually loading from the cache
server = Server(cacheFile=path)
self.assertEqual(server.get_node(id).get_value(), 123)
os.remove(path)
......@@ -9,9 +9,9 @@ from opcua import ua
class SubHandler():
'''
Dummy subscription client
'''
"""
Dummy subscription client
"""
def datachange_notification(self, node, val, data):
pass
......@@ -22,9 +22,9 @@ class SubHandler():
class MySubHandler():
'''
"""
More advanced subscription client using Future, so we can wait for events in tests
'''
"""
def __init__(self):
self.future = Future()
......@@ -168,12 +168,12 @@ class SubscriptionTests(object):
sub.delete()
def test_subscription_data_change(self):
'''
"""
test subscriptions. This is far too complicated for
a unittest but, setting up subscriptions requires a lot
of code, so when we first set it up, it is best
to test as many things as possible
'''
"""
myhandler = MySubHandler()
o = self.opc.get_objects_node()
......@@ -207,14 +207,13 @@ class SubscriptionTests(object):
with self.assertRaises(ua.UaStatusCodeError):
sub.unsubscribe(handle1) # sub does not exist anymore
def test_subscription_data_change_bool(self):
'''
"""
test subscriptions. This is far too complicated for
a unittest but, setting up subscriptions requires a lot
of code, so when we first set it up, it is best
to test as many things as possible
'''
"""
myhandler = MySubHandler()
o = self.opc.get_objects_node()
......@@ -241,12 +240,12 @@ class SubscriptionTests(object):
sub.delete() # should delete our monitoreditem too
def test_subscription_data_change_many(self):
'''
"""
test subscriptions. This is far too complicated for
a unittest but, setting up subscriptions requires a lot
of code, so when we first set it up, it is best
to test as many things as possible
'''
"""
myhandler = MySubHandler2()
o = self.opc.get_objects_node()
......@@ -420,7 +419,7 @@ class SubscriptionTests(object):
myhandler = MySubHandler()
sub = self.opc.create_subscription(100, myhandler)
handle = sub.subscribe_events(evtype=etype)
handle = sub.subscribe_events(evtypes=etype)
propertynum = 2
propertystring = "This is my test"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment