Commit bc2d628c authored by Olivier R-D's avatar Olivier R-D

move ThreadLoop in utils, prepare use of asyncio

parent 20b99177
......@@ -5,16 +5,7 @@ Internal server implementing opcu-ua interface. can be used on server side or to
from datetime import datetime
import logging
from threading import Lock
from threading import Condition
from threading import Thread
from enum import Enum
import functools
try:
# we prefer to use bundles asyncio version, otherwise fallback to trollius
import asyncio
except ImportError:
import trollius as asyncio
from trollius import From
from opcua import ua
......@@ -28,43 +19,6 @@ from opcua.address_space import MethodService
from opcua.subscription_service import SubscriptionService
from opcua import standard_address_space
class ThreadLoop(Thread):
def __init__(self):
Thread.__init__(self)
self.logger = logging.getLogger(__name__)
self.loop = None
self._cond = Condition()
def start(self):
with self._cond:
Thread.start(self)
self._cond.wait()
def run(self):
self.logger.debug("Starting subscription thread")
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
with self._cond:
self._cond.notify_all()
self.loop.run_forever()
self.logger.debug("subscription thread ended")
def stop(self):
"""
stop subscription loop, thus the subscription thread
"""
self.loop.call_soon_threadsafe(self.loop.stop)
def call_soon(self, callback):
self.loop.call_soon_threadsafe(callback)
def call_later(self, delay, callback):
p = functools.partial(self.loop.call_later, delay, callback)
self.loop.call_soon_threadsafe(p)
class SessionState(Enum):
Created = 0
Activated = 1
......@@ -84,9 +38,9 @@ class InternalServer(object):
self.method_service = MethodService(self.aspace)
self.node_mgt_service = NodeManagementService(self.aspace)
standard_address_space.fill_address_space(self.node_mgt_service)
# standard_address_space.fill_address_space_from_disk(self.aspace)
#standard_address_space.fill_address_space_from_disk(self.aspace)
self.loop = ThreadLoop()
self.loop = utils.ThreadLoop()
self.subcsription_service = SubscriptionService(self.loop, self.aspace)
# create a session to use on server side
......
......@@ -76,8 +76,8 @@ class Server(object):
self.bserver.start()
def stop(self):
self.iserver.stop()
self.bserver.stop()
self.iserver.stop()
def get_root_node(self):
return self.get_node(ua.TwoByteNodeId(ObjectIds.RootFolder))
......
......@@ -116,6 +116,8 @@ class UAProcessor(object):
response = ua.CreateSessionResponse()
response.Parameters = sessiondata
self.logger.info("sending create sesssion response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
......@@ -125,6 +127,7 @@ class UAProcessor(object):
self.session.close_session(deletesubs)
response = ua.CloseSessionResponse()
self.logger.info("sending close sesssion response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
......@@ -136,13 +139,17 @@ class UAProcessor(object):
# result.Results.append(ua.StatusCode(ua.StatusCodes.BadSessionIdInvalid))
response = ua.ServiceFault()
response.ResponseHeader.ServiceResult = ua.StatusCode(ua.StatusCodes.BadSessionIdInvalid)
self.logger.info("request to activate none existing session, sending service fault response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
else:
result = self.session.activate_session(params)
return True
response = ua.ActivateSessionResponse()
response.Parameters = result
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
result = self.session.activate_session(params)
response = ua.ActivateSessionResponse()
response.Parameters = result
self.logger.info("sending read response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
self.logger.info("Read request")
......@@ -152,6 +159,8 @@ class UAProcessor(object):
response = ua.ReadResponse()
response.Results = results
self.logger.info("sending read response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
......@@ -162,6 +171,8 @@ class UAProcessor(object):
response = ua.WriteResponse()
response.Results = results
self.logger.info("sending write response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
......@@ -172,6 +183,8 @@ class UAProcessor(object):
response = ua.BrowseResponse()
response.Results = results
self.logger.info("sending browse response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
......@@ -183,6 +196,7 @@ class UAProcessor(object):
response = ua.GetEndpointsResponse()
response.Endpoints = endpoints
self.logger.info("sending get endpoints response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
......@@ -194,6 +208,7 @@ class UAProcessor(object):
response = ua.TranslateBrowsePathsToNodeIdsResponse()
response.Results = paths
self.logger.info("sending translate browsepaths to nodeids response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
......@@ -205,6 +220,7 @@ class UAProcessor(object):
response = ua.AddNodesResponse()
response.Results = results
self.logger.info("sending add node response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
......@@ -216,6 +232,7 @@ class UAProcessor(object):
response = ua.CreateSubscriptionResponse()
response.Parameters = result
self.logger.info("sending create subscription response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
......@@ -227,6 +244,7 @@ class UAProcessor(object):
response = ua.DeleteSubscriptionsResponse()
response.Results = results
self.logger.info("sending delte subscription response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
......@@ -237,6 +255,7 @@ class UAProcessor(object):
response = ua.CreateMonitoredItemsResponse()
response.Results = results
self.logger.info("sending create monitored items response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
......@@ -247,6 +266,7 @@ class UAProcessor(object):
response = ua.ModifyMonitoredItemsResponse()
response.Results = results
self.logger.info("sending modify monitored items response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
......@@ -258,6 +278,7 @@ class UAProcessor(object):
response = ua.DeleteMonitoredItemsResponse()
response.Results = results
self.logger.info("sending delete monitored items response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
......@@ -275,6 +296,7 @@ class UAProcessor(object):
with self._datalock:
self._publishdata_queue.append(data) # will be used to send publish answers from server
self.session.publish(params.SubscriptionAcknowledgements)
self.logger.info("publish forward to server")
elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
self.logger.info("re-publish request")
......
......@@ -67,6 +67,7 @@ class Header(uatypes.FrozenClass):
self.ChunkType = chunkType
self.ChannelId = channelid
self.body_size = 0
self.packet_size = 0
self._freeze()
def add_size(self, size):
......@@ -89,7 +90,8 @@ class Header(uatypes.FrozenClass):
hdr = Header()
hdr.MessageType = struct.unpack("<3s", data.read(3))[0]
hdr.ChunkType = struct.unpack("<c", data.read(1))[0]
hdr.body_size = struct.unpack("<I", data.read(4))[0] - 8
hdr.packet_size = struct.unpack("<I", data.read(4))[0]
hdr.body_size = hdr.packet_size - 8
if hdr.MessageType in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage):
hdr.body_size -= 4
hdr.ChannelId = struct.unpack("<I", data.read(4))[0]
......
import logging
import uuid
from concurrent.futures import Future
import functools
import threading
try:
# we prefer to use bundles asyncio version, otherwise fallback to trollius
import asyncio
except ImportError:
import trollius as asyncio
from trollius import From
class NotEnoughData(Exception):
pass
class Buffer(object):
......@@ -80,3 +91,91 @@ class SocketWrapper(object):
def create_nonce():
return uuid.uuid4().bytes + uuid.uuid4().bytes # seems we need at least 32 bytes not 16 as python gives us...
class ThreadLoop(threading.Thread):
"""
run an asyncio loop in a thread
"""
def __init__(self):
threading.Thread.__init__(self)
self.logger = logging.getLogger(__name__)
self.loop = None
self._cond = threading.Condition()
def start(self):
with self._cond:
threading.Thread.start(self)
self._cond.wait()
def run(self):
self.logger.debug("Starting subscription thread")
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
with self._cond:
self._cond.notify_all()
self.loop.run_forever()
self.logger.debug("subscription thread ended")
def create_server(self, proto, hostname, port):
return self.loop.create_server(proto, hostname, port)
def stop(self):
"""
stop subscription loop, thus the subscription thread
"""
self.loop.call_soon_threadsafe(self.loop.stop)
def call_soon(self, callback):
self.loop.call_soon_threadsafe(callback)
def call_later(self, delay, callback):
"""
threadsafe call_later from asyncio
"""
p = functools.partial(self.loop.call_later, delay, callback)
self.loop.call_soon_threadsafe(p)
def _create_task(self, future, coro, cb=None):
task = self.loop.create_task(coro)
if cb:
task.add_done_callback(cb)
future.set_result(task)
def create_task(self, coro, cb=None):
"""
threadsafe create_task from asyncio
"""
future = Future()
p = functools.partial(self._create_task, future, coro, cb)
self.loop.call_soon_threadsafe(p)
return future.result()
def run_coro_and_wait(self, coro):
cond = threading.Condition()
def cb(_):
with cond:
cond.notify_all()
with cond:
task = self.create_task(coro, cb)
cond.wait()
return task.result()
def _run_until_complete(self, future, coro):
task = self.loop.run_until_complete(coro)
future.set_result(task)
def run_until_complete(self, coro):
"""
threadsafe run_until_completed from asyncio
"""
future = Future()
p = functools.partial(self._run_until_complete, future, coro)
self.loop.call_soon_threadsafe(p)
return future.result()
#! /usr/bin/env python
import time
import logging
import math
import io
import sys
from datetime import datetime, timedelta
import unittest
import threading
try:
from queue import Queue
except ImportError:
from Queue import Queue
import time
from concurrent.futures import Future
from opcua import ua
......@@ -586,7 +580,6 @@ def add_server_methods(srv):
o = srv.get_objects_node()
v = o.add_method(ua.NodeId("ServerMethodArray2", 2), ua.QualifiedName('ServerMethodArray2', 2), func3, [ua.VariantType.Int64], [ua.VariantType.Int64])
class TestClient(unittest.TestCase, CommonTests):
'''
......@@ -663,6 +656,7 @@ class TestServer(unittest.TestCase, CommonTests):
self.assertEqual(result, 4.2)
if __name__ == '__main__':
logging.basicConfig(level=logging.WARN)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment