Commit f7f8533a authored by Julien Muchembled's avatar Julien Muchembled

Small optimization in epoll loop

parent 90a5aa17
......@@ -482,6 +482,7 @@ class Connection(BaseConnection):
def readable(self):
"""Called when self is readable."""
# last known remote activity
empty_queue = not self._queue
try:
try:
if self.connector.receive(self.read_buf):
......@@ -499,7 +500,7 @@ class Connection(BaseConnection):
except PacketMalformedError, e:
logging.error('malformed packet from %r: %s', self, e)
self._closure()
return not not self._queue
return empty_queue and not not self._queue
def hasPendingMessages(self):
"""
......
......@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
from collections import deque
from time import time
from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP
from errno import EAGAIN, EEXIST, EINTR, ENOENT
......@@ -41,7 +42,7 @@ class EpollEventManager(object):
self.reader_set = set()
self.writer_set = set()
self.epoll = epoll()
self._pending_processing = []
self._pending_processing = deque()
self._trigger_list = []
self._trigger_fd, w = os.pipe()
os.close(w)
......@@ -112,12 +113,13 @@ class EpollEventManager(object):
self.addReader(conn)
def unregister(self, conn, close=False):
new_pending_processing = [x for x in self._pending_processing
if x is not conn]
# Check that we removed at most one entry from
# self._pending_processing .
assert len(new_pending_processing) > len(self._pending_processing) - 2
self._pending_processing = new_pending_processing
try:
self._pending_processing.remove(conn)
except ValueError:
pass
else:
# Check that there was no duplicate.
assert conn not in self._pending_processing
connector = conn.getConnector()
fd = connector.getDescriptor()
try:
......@@ -144,24 +146,18 @@ class EpollEventManager(object):
def isIdle(self):
return not (self._pending_processing or self.writer_set)
def _addPendingConnection(self, conn):
pending_processing = self._pending_processing
if conn not in pending_processing:
pending_processing.append(conn)
def poll(self, blocking=1):
if not self._pending_processing:
# Fetch messages from polled file descriptors
pending_processing = self._pending_processing
if not pending_processing:
self._poll(blocking)
if not self._pending_processing:
if not pending_processing:
return
to_process = self._pending_processing.pop(0)
to_process = pending_processing.popleft()
try:
to_process.process()
finally:
# ...and requeue if there are pending messages
if to_process.hasPendingMessages():
self._addPendingConnection(to_process)
pending_processing.append(to_process)
# Non-blocking call: as we handled a packet, we should just offer
# poll a chance to fetch & send already-available data, but it must
# not delay us.
......@@ -209,6 +205,7 @@ class EpollEventManager(object):
return
else:
if event_list:
pending_processing = self._pending_processing
wlist = []
elist = []
for fd, event in event_list:
......@@ -218,7 +215,7 @@ class EpollEventManager(object):
except KeyError:
continue
if conn.readable():
self._addPendingConnection(conn)
pending_processing.append(conn)
if event & EPOLLOUT:
wlist.append(fd)
if event & (EPOLLERR | EPOLLHUP):
......@@ -244,7 +241,7 @@ class EpollEventManager(object):
del action_list[:]
continue
if conn.readable():
self._addPendingConnection(conn)
pending_processing.append(conn)
return
finally:
self._closeRelease()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment