Commit b34dc27c authored by ORD's avatar ORD

Merge pull request #83 from FreeOpcUa/dev2

tests for command lines
parents 5d9c3895 b36d347a
import sys
sys.path.insert(0, "..")
import time
import logging
from opcua import Client
......@@ -57,6 +58,7 @@ if __name__ == "__main__":
print("Mehtod result is: ", result)
#embed()
time.sleep(3)
sub.unsubscribe(handle)
sub.delete()
#client.close_session()
......
......@@ -75,7 +75,6 @@ class MonitoredItemService(object):
result = ua.MonitoredItemCreateResult()
result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
result.RevisedQueueSize = params.RequestedParameters.QueueSize
result.FilterResult = params.RequestedParameters.Filter
self._monitored_item_counter += 1
result.MonitoredItemId = self._monitored_item_counter
self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)
......@@ -84,15 +83,21 @@ class MonitoredItemService(object):
mdata.parameters = result
mdata.mode = params.MonitoringMode
mdata.client_handle = params.RequestedParameters.ClientHandle
mdata.mfilter = params.RequestedParameters.Filter
mdata.monitored_item_id = result.MonitoredItemId
self._monitored_items[result.MonitoredItemId] = mdata
if params.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
result.FilterResult = ua.EventFilterResult()
for _ in params.RequestedParameters.Filter.SelectClauses:
result.FilterResult.SelectClauseResults.append(ua.StatusCode())
# FIXME: where clause result
self._monitored_events[params.ItemToMonitor.NodeId] = result.MonitoredItemId
else:
self.logger.info("request to subscribe to datachange for node %s and attribute %s", params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
result.FilterResult = params.RequestedParameters.Filter
result.StatusCode, handle = self.aspace.add_datachange_callback(params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)
self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
mdata.callback_handle = handle
......@@ -153,7 +158,7 @@ class MonitoredItemService(object):
mdata = self._monitored_items[mid]
fieldlist = ua.EventFieldList()
fieldlist.ClientHandle = mdata.client_handle
fieldlist.EventFields = self._get_event_fields(mdata.parameters.FilterResult, event)
fieldlist.EventFields = self._get_event_fields(mdata.mfilter, event)
self.isub.enqueue_event(mid, fieldlist, mdata.parameters.RevisedQueueSize)
return True
......
......@@ -220,7 +220,8 @@ class Subscription(object):
data.node = Node(self.server, mi.ItemToMonitor.NodeId)
data.attribute = mi.ItemToMonitor.AttributeId
data.server_handle = result.MonitoredItemId
data.mfilter = result.FilterResult
#data.mfilter = result.FilterResult
data.mfilter = mi.RequestedParameters.Filter
self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
mids.append(result.MonitoredItemId)
......
......@@ -3,6 +3,7 @@
import sys
sys.path.insert(0, "..")
sys.path.insert(0, ".")
import subprocess
import time
import logging
import math
......@@ -922,6 +923,51 @@ class AdminTestClient(unittest.TestCase, CommonTests):
self.assertEqual(v_ro.get_value(), 2)
class TestCmdLines(unittest.TestCase):
'''
Test command lines
'''
@classmethod
def setUpClass(self):
self.srv = Server()
self.srv_url = 'opc.tcp://localhost:%d' % port_num2
self.srv.set_endpoint(self.srv_url)
objects = self.srv.get_objects_node()
obj = objects.add_object(4, "directory")
var = obj.add_variable(4, "variable", 1.999)
var2 = obj.add_variable(4, "variable2", 1.777)
var2.set_writable()
self.srv.start()
def test_uals(self):
s = subprocess.check_output(["python3", "tools/uals", "--url", self.srv_url])
self.assertIn(b"i=85", s)
self.assertNotIn(b"i=89", s)
self.assertNotIn(b"1.999", s)
s = subprocess.check_output(["python3", "tools/uals", "--url", self.srv_url, "-d", "3"])
self.assertIn(b"1.999", s)
def test_uaread(self):
s = subprocess.check_output(["python3", "tools/uaread", "--url", self.srv_url, "--path", "0:Objects,4:directory,4:variable"])
self.assertIn(b"1.999", s)
def test_uawrite(self):
s = subprocess.check_output(["python3", "tools/uawrite", "--url", self.srv_url, "--path", "0:Objects,4:directory,4:variable2", "1.789"])
s = subprocess.check_output(["python3", "tools/uaread", "--url", self.srv_url, "--path", "0:Objects,4:directory,4:variable2"])
self.assertIn(b"1.789", s)
self.assertNotIn(b"1.999", s)
def test_uadiscover(self):
s = subprocess.check_output(["python3", "tools/uadiscover", "--url", self.srv_url])
self.assertIn(b"opc.tcp://localhost", s)
self.assertIn(b"FreeOpcUa", s)
self.assertIn(b"urn:freeopcua:python:server", s)
@classmethod
def tearDownClass(self):
self.srv.stop()
class TestServer(unittest.TestCase, CommonTests):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment