Commit 91e1dc08 authored by Olivier R-D's avatar Olivier R-D

fix potential threading issues

parent 8fa9ffa8
......@@ -119,9 +119,10 @@ class BinaryClient(object):
self._call_callback(seqhdr.RequestId, body)
def _call_callback(self, request_id, body):
future = self._callbackmap.pop(request_id, None)
if future is None:
raise Exception("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
with self._lock:
future = self._callbackmap.pop(request_id, None)
if future is None:
raise Exception("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
future.set_result(body)
def _write_socket(self, hdr, *args):
......@@ -177,9 +178,10 @@ class BinaryClient(object):
hello = ua.Hello()
hello.EndpointUrl = url
header = ua.Header(ua.MessageType.Hello, ua.ChunkType.Single)
self._write_socket(header, hello)
future = Future()
self._callbackmap[0] = future
with self._lock:
self._callbackmap[0] = future
self._write_socket(header, hello)
return ua.Acknowledge.from_binary(future.result())
def open_secure_channel(self, params):
......@@ -191,10 +193,11 @@ class BinaryClient(object):
hdr = ua.Header(ua.MessageType.SecureOpen, ua.ChunkType.Single, self._security_token.ChannelId)
asymhdr = ua.AsymmetricAlgorithmHeader()
seqhdr = self._create_sequence_header()
self._write_socket(hdr, asymhdr, seqhdr, request)
future = Future()
self._callbackmap[seqhdr.RequestId] = future
with self._lock:
self._callbackmap[seqhdr.RequestId] = future
self._write_socket(hdr, asymhdr, seqhdr, request)
response = ua.OpenSecureChannelResponse.from_binary(future.result())
response.ResponseHeader.ServiceResult.check()
......
......@@ -2,6 +2,7 @@
high level interface to subscriptions
"""
import io
import time
import logging
from threading import RLock
......@@ -24,6 +25,7 @@ class Subscription(object):
self.parameters = params #move to data class
self._monitoreditems_map = {}
self._lock = RLock()
self.subscription_id = None
response = self.server.create_subscription(params, self.publish_callback)
self.subscription_id = response.SubscriptionId #move to data class
self.server.publish()
......@@ -35,6 +37,9 @@ class Subscription(object):
def publish_callback(self, publishresult):
self.logger.info("Publish callback called with result: %s", publishresult)
while self.subscription_id is None:
time.sleep(0.01)
for notif in publishresult.NotificationMessage.NotificationData:
if notif.TypeId == ua.FourByteNodeId(ua.ObjectIds.DataChangeNotification_Encoding_DefaultBinary):
datachange = ua.DataChangeNotification.from_binary(io.BytesIO(notif.to_binary()))
......
......@@ -175,6 +175,7 @@ class InternalSubscription(object):
def _subscription_loop(self):
self.logger.debug("%s loop running", self)
while True:
yield from asyncio.sleep(self.data.RevisedPublishingInterval/1000)
#test disabled we do not check that one since we do not care about not received results
#if self._keep_alive_count > self.data.RevisedLifetimeCount:
#self.logger.warn("Subscription %s has expired, keep alive count(%s) > lifetime count (%s)", self.data.SubscriptionId, self._keep_alive_count, self.data.RevisedLifetimeCount)
......@@ -183,7 +184,6 @@ class InternalSubscription(object):
self.publish_results()
except Exception as ex: #we catch everythin since it seems exceptions are lost in loop
self.logger.exception("Exception in %s loop", self)
yield from asyncio.sleep(self.data.RevisedPublishingInterval/1000)
def has_published_results(self):
with self._lock:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment