Commit cb8a5a88 authored by Julien Muchembled's avatar Julien Muchembled

client: replace Event by a pipe as a way to stop the poll loop

This is a prerequisite for tickless poll loops.
parent 4a328ade
...@@ -15,18 +15,19 @@ ...@@ -15,18 +15,19 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from logging import DEBUG, ERROR from logging import DEBUG, ERROR
from threading import Thread, Event, enumerate as thread_enum from threading import Thread, enumerate as thread_enum
from neo.lib import logging from neo.lib import logging
from neo.lib.locking import Lock from neo.lib.locking import Lock
class _ThreadedPoll(Thread): class _ThreadedPoll(Thread):
"""Polling thread.""" """Polling thread."""
stopping = False
def __init__(self, em, **kw): def __init__(self, em, **kw):
Thread.__init__(self, **kw) Thread.__init__(self, **kw)
self.em = em self.em = em
self.daemon = True self.daemon = True
self._stop = Event()
def run(self): def run(self):
_log = logging.log _log = logging.log
...@@ -34,25 +35,24 @@ class _ThreadedPoll(Thread): ...@@ -34,25 +35,24 @@ class _ThreadedPoll(Thread):
# Ignore errors due to garbage collection on exit # Ignore errors due to garbage collection on exit
try: try:
_log(*args, **kw) _log(*args, **kw)
except: except Exception:
if not self.stopping(): if not self.stopping:
raise raise
log(DEBUG, 'Started %s', self) log(DEBUG, 'Started %s', self)
while not self.stopping():
try: try:
# XXX: Delay cannot be infinite here, because we need while 1:
# to check connection timeout and thread shutdown. try:
# XXX: Delay can't be infinite here, because we need
# to check connection timeouts.
self.em.poll(1) self.em.poll(1)
except: except Exception:
log(ERROR, 'poll raised, retrying', exc_info=1) log(ERROR, 'poll raised, retrying', exc_info=1)
finally:
log(DEBUG, 'Threaded poll stopped') log(DEBUG, 'Threaded poll stopped')
self._stop.clear()
def stop(self): def stop(self):
self._stop.set() self.stopping = True
self.em.wakeup(True)
def stopping(self):
return self._stop.isSet()
class ThreadedPoll(object): class ThreadedPoll(object):
""" """
...@@ -81,7 +81,7 @@ class ThreadedPoll(object): ...@@ -81,7 +81,7 @@ class ThreadedPoll(object):
self._status_lock_acquire() self._status_lock_acquire()
try: try:
thread = self._thread thread = self._thread
if thread.stopping(): if thread.stopping:
# XXX: ideally, we should wake thread up here, to be sure not # XXX: ideally, we should wake thread up here, to be sure not
# to wait forever. # to wait forever.
thread.join() thread.join()
......
...@@ -14,22 +14,30 @@ ...@@ -14,22 +14,30 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import os, thread
from time import time from time import time
from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP
from errno import EAGAIN, EINTR, ENOENT from errno import EAGAIN, EEXIST, EINTR, ENOENT
from . import logging from . import logging
from .locking import Lock
class EpollEventManager(object): class EpollEventManager(object):
"""This class manages connections and events based on epoll(5).""" """This class manages connections and events based on epoll(5)."""
_trigger_exit = False
def __init__(self): def __init__(self):
self.connection_dict = {} self.connection_dict = {}
self.reader_set = set() self.reader_set = set()
self.writer_set = set() self.writer_set = set()
self.epoll = epoll() self.epoll = epoll()
self._pending_processing = [] self._pending_processing = []
self._trigger_fd, w = os.pipe()
os.close(w)
self._trigger_lock = Lock()
def close(self): def close(self):
os.close(self._trigger_fd)
for c in self.connection_dict.values(): for c in self.connection_dict.values():
c.close() c.close()
del self.__dict__ del self.__dict__
...@@ -150,6 +158,12 @@ class EpollEventManager(object): ...@@ -150,6 +158,12 @@ class EpollEventManager(object):
try: try:
conn = self.connection_dict[fd] conn = self.connection_dict[fd]
except KeyError: except KeyError:
if fd == self._trigger_fd:
with self._trigger_lock:
self.epoll.unregister(fd)
if self._trigger_exit:
del self._trigger_exit
thread.exit()
continue continue
if conn.readable(): if conn.readable():
self._addPendingConnection(conn) self._addPendingConnection(conn)
...@@ -158,6 +172,16 @@ class EpollEventManager(object): ...@@ -158,6 +172,16 @@ class EpollEventManager(object):
for conn in self.connection_dict.values(): for conn in self.connection_dict.values():
conn.checkTimeout(t) conn.checkTimeout(t)
def wakeup(self, exit=False):
with self._trigger_lock:
self._trigger_exit |= exit
try:
self.epoll.register(self._trigger_fd)
except IOError, e:
# Ignore if 'wakeup' is called several times in a row.
if e.errno != EEXIST:
raise
def addReader(self, conn): def addReader(self, conn):
connector = conn.getConnector() connector = conn.getConnector()
assert connector is not None, conn.whoSetConnector() assert connector is not None, conn.whoSetConnector()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment