Commit 52e8bf8a authored by Olivier R-D's avatar Olivier R-D

python2 support: do not use coroutines

they have a different syntax between pyton 2 and 3
parent f7fada32
"""
server side implementation of subscriptions
"""
import time
import sys
from threading import RLock, Thread, Condition
from concurrent.futures import Future
import logging
import asyncio
try:
import asyncio
except ImportError:
import trollius as asyncio
from trollius import From
import functools
from opcua import ua
......@@ -34,30 +40,13 @@ class SubscriptionManager(Thread):
self.loop.run_forever()
self.logger.debug("subscription thread ended")
def _add_task(self, future, coro):
task = self.loop.create_task(coro)
future.set_result(task)
def add_task(self, coro):
"""
execute a coroutine in subscription loop
threadsafe method
"""
future = Future() #from concurrent, NOT asyncio
p = functools.partial(self._add_task, future, coro)
self.loop.call_soon_threadsafe(p)
return future.result() #wait until result is available
def cancel_task(self, task):
"""
threadsafe stop task
"""
self.loop.call_soon_threadsafe(task.cancel)
def stop(self):
"""
stop subscription loop, thus the subscription thread
"""
# no need to stop subscriptions, they will stop with loop
#for sub in self.subscriptions.values():
#sub.stop()
self.loop.call_soon_threadsafe(self.loop.stop)
def create_subscription(self, params, callback):
......@@ -174,35 +163,32 @@ class InternalSubscription(object):
self._startup = True
self._keep_alive_count = 0
self._publish_cycles_count = 0
self._stopev = False
def __str__(self):
return "Subscription(id:{})".format(self.data.SubscriptionId)
def start(self):
self.logger.debug("starting subscription %s", self.data.SubscriptionId)
self.task = self.manager.add_task(self._subscription_loop())
self._subscription_loop()
def stop(self):
self.logger.debug("stopping subscription %s", self.data.SubscriptionId)
self.manager.cancel_task(self.task)
self._stopev = True
self.delete_all_monitored_items()
def delete_all_monitored_items(self):
self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])
@asyncio.coroutine
def _subscription_loop(self):
self.logger.debug("%s loop running", self)
while True:
yield from asyncio.sleep(self.data.RevisedPublishingInterval/1000)
try:
expired = self.publish_results()
if expired:
self.stop()
except Exception as ex: #we catch everythin since it seems exceptions are lost in loop
self.logger.exception("Exception in %s loop", self)
#self.logger.debug("%s loop", self)
if not self._stopev:
p = functools.partial(self.manager.loop.call_later, self.data.RevisedPublishingInterval/1000.0, self._sub_loop)
self.manager.loop.call_soon_threadsafe(p)
def _sub_loop(self):
self.publish_results()
self._subscription_loop()
def has_published_results(self):
with self._lock:
......@@ -215,18 +201,16 @@ class InternalSubscription(object):
return False
def publish_results(self):
expired = False
if self._publish_cycles_count > self.data.RevisedLifetimeCount:
self.logger.warn("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)", self, self._publish_cycles_count, self.data.RevisedLifetimeCount)
#FIXME this will never be send since we do not have publish request anyway
self.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
expired = True
self._stopev = True
with self._lock:
if self.has_published_results(): #FIXME: should we pop a publish request here? or we do not care?
self._publish_cycles_count += 1
result = self._pop_publish_result()
self.callback(result)
return expired
def _pop_publish_result(self):
result = ua.PublishResult()
......@@ -234,13 +218,13 @@ class InternalSubscription(object):
if self._triggered_datachanges:
notif = ua.DataChangeNotification()
notif.MonitoredItems = self._triggered_datachanges[:]
self._triggered_datachanges.clear()
self._triggered_datachanges = []
self.logger.debug("sending datachanges nontification with %s events", len(notif.MonitoredItems))
result.NotificationMessage.NotificationData.append(notif)
if self._triggered_events:
notif = ua.EventNotificationList()
notif.Events = self._triggered_events[:]
self._triggered_events.clear()
self._triggered_events = []
result.NotificationMessage.NotificationData.append(notif)
if self._triggered_statuschanges:
notif = ua.StatusChangeNotification()
......@@ -274,7 +258,6 @@ class InternalSubscription(object):
self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
return ua.NotificationMessage()
def create_monitored_items(self, params):
results = []
for item in params.ItemsToCreate:
......@@ -337,7 +320,6 @@ class InternalSubscription(object):
#force data change event generation
self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
return result
def delete_monitored_items(self, ids):
......
......@@ -155,7 +155,7 @@ def unpack_bytes(data):
def unpack_string(data):
b = unpack_bytes(data)
#return str(b)
#return str(b, "utf-8")
return b.decode("utf-8")
def test_bit(data, offset):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment