Commit ad62c5c7 authored by Julien Muchembled's avatar Julien Muchembled

fixup! Fix use of several EpollEventManager within the same process

set_wakeup_fd only works in main thread.

See commit f47dd646.
parent ffecd4ae
...@@ -199,7 +199,8 @@ class Application(BaseApplication, Monitor): ...@@ -199,7 +199,8 @@ class Application(BaseApplication, Monitor):
def run(self): def run(self):
try: try:
self._run() with self.em.wakeup_fd():
self._run()
except Exception: except Exception:
logging.exception('Pre-mortem data:') logging.exception('Pre-mortem data:')
self.log() self.log()
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
import fcntl, os import fcntl, os
from collections import deque from collections import deque
from contextlib import contextmanager
from signal import set_wakeup_fd from signal import set_wakeup_fd
from time import time from time import time
from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP
...@@ -36,38 +37,6 @@ def nonblock(fd): ...@@ -36,38 +37,6 @@ def nonblock(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL) flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
# We use set_wakeup_fd to handle the case of a signal that happens between
# Python checks for signals and epoll_wait is called. Otherwise, the signal
# would not be processed as long as epoll_wait sleeps.
# If a process has several instances of EpollEventManager like in threaded
# tests, it does not matter which one is woke up by signals.
class WakeupFD(object):
_lock = Lock()
_fds = []
@classmethod
def add(cls, fd):
fds = cls._fds
with cls._lock:
if fds:
assert fd not in fds, (fd, fds)
else:
prev = set_wakeup_fd(fd)
assert prev == -1, (fd, prev)
fds.append(fd)
@classmethod
def remove(cls, fd):
fds = cls._fds
with cls._lock:
i = fds.index(fd)
del fds[i]
if not (i and fds):
prev = set_wakeup_fd(fds[0] if fds else -1)
assert prev == fd, (fd, prev)
class EpollEventManager(object): class EpollEventManager(object):
"""This class manages connections and events based on epoll(5).""" """This class manages connections and events based on epoll(5)."""
...@@ -86,7 +55,6 @@ class EpollEventManager(object): ...@@ -86,7 +55,6 @@ class EpollEventManager(object):
self._wakeup_wfd = w self._wakeup_wfd = w
nonblock(r) nonblock(r)
nonblock(w) nonblock(w)
WakeupFD.add(w)
self.epoll.register(r, EPOLLIN) self.epoll.register(r, EPOLLIN)
self._trigger_lock = Lock() self._trigger_lock = Lock()
self.lock = l = Lock() self.lock = l = Lock()
...@@ -105,7 +73,6 @@ class EpollEventManager(object): ...@@ -105,7 +73,6 @@ class EpollEventManager(object):
self._closeRelease = release self._closeRelease = release
def close(self): def close(self):
WakeupFD.remove(self._wakeup_wfd)
os.close(self._wakeup_wfd) os.close(self._wakeup_wfd)
os.close(self._wakeup_rfd) os.close(self._wakeup_rfd)
for c in self.connection_dict.values(): for c in self.connection_dict.values():
...@@ -113,6 +80,26 @@ class EpollEventManager(object): ...@@ -113,6 +80,26 @@ class EpollEventManager(object):
self.epoll.close() self.epoll.close()
del self.__dict__ del self.__dict__
@contextmanager
def wakeup_fd(self):
"""
We use set_wakeup_fd to handle the case of a signal that happens
between Python checks for signals and epoll_wait is called. Otherwise,
the signal would not be processed as long as epoll_wait sleeps.
"""
fd = self._wakeup_wfd
try:
prev = set_wakeup_fd(fd)
except ValueError: # not main thread
yield
else:
assert prev != fd
try:
yield
finally:
prev = set_wakeup_fd(prev)
assert prev == fd, prev
def getConnectionList(self): def getConnectionList(self):
# XXX: use index # XXX: use index
while 1: while 1:
......
...@@ -77,7 +77,8 @@ class ThreadedApplication(BaseApplication): ...@@ -77,7 +77,8 @@ class ThreadedApplication(BaseApplication):
def run(self): def run(self):
logging.debug("Started %s", self.poll_thread) logging.debug("Started %s", self.poll_thread)
try: try:
self._run() with self.em.wakeup_fd():
self._run()
finally: finally:
super(ThreadedApplication, self).close() super(ThreadedApplication, self).close()
logging.debug("Poll thread stopped") logging.debug("Poll thread stopped")
......
...@@ -169,7 +169,8 @@ class Application(BaseApplication): ...@@ -169,7 +169,8 @@ class Application(BaseApplication):
def run(self): def run(self):
try: try:
self._run() with self.em.wakeup_fd():
self._run()
except BaseException, e: except BaseException, e:
if not isinstance(e, SystemExit) or e.code: if not isinstance(e, SystemExit) or e.code:
logging.exception('Pre-mortem data:') logging.exception('Pre-mortem data:')
......
...@@ -65,16 +65,17 @@ class NeoCTL(BaseApplication): ...@@ -65,16 +65,17 @@ class NeoCTL(BaseApplication):
def __ask(self, packet): def __ask(self, packet):
# TODO: make thread-safe # TODO: make thread-safe
connection = self.__getConnection() with self.em.wakeup_fd():
connection.ask(packet) connection = self.__getConnection()
response_queue = self.response_queue connection.ask(packet)
assert len(response_queue) == 0 response_queue = self.response_queue
while self.connected: assert not response_queue
self.em.poll(1) while self.connected:
if response_queue: self.em.poll(1)
break if response_queue:
else: break
raise NotReadyException, 'Connection closed' else:
raise NotReadyException('Connection closed')
response = response_queue.pop() response = response_queue.pop()
if response[0] == Packets.Error and \ if response[0] == Packets.Error and \
response[1] == ErrorCodes.NOT_READY: response[1] == ErrorCodes.NOT_READY:
...@@ -212,10 +213,11 @@ class NeoCTL(BaseApplication): ...@@ -212,10 +213,11 @@ class NeoCTL(BaseApplication):
return response[2] return response[2]
def flushLog(self): def flushLog(self):
conn = self.__getConnection() with self.em.wakeup_fd():
conn.send(Packets.FlushLog()) conn = self.__getConnection()
while conn.pending(): conn.send(Packets.FlushLog())
self.em.poll(1) while conn.pending():
self.em.poll(1)
def getMonitorInformation(self): def getMonitorInformation(self):
response = self.__ask(Packets.AskMonitorInformation()) response = self.__ask(Packets.AskMonitorInformation())
......
...@@ -192,7 +192,7 @@ class Application(BaseApplication): ...@@ -192,7 +192,7 @@ class Application(BaseApplication):
def run(self): def run(self):
try: try:
with self.dm.lock: with self.em.wakeup_fd(), self.dm.lock:
self._run() self._run()
except Exception: except Exception:
logging.exception('Pre-mortem data:') logging.exception('Pre-mortem data:')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment