Commit 50a6acf8 authored by Amit Dev's avatar Amit Dev

Made linux_poller unaware of aio implementation details

parent 6b3cae38
......@@ -38,13 +38,14 @@ cdef enum:
BLOCK_SIZE = 512
cdef int aio_eventfd
cdef coro aio_poller
cdef io_context_t aio_ioctx
cdef dict aio_event_map
cdef iocb aio_iocb[MAX_PENDING_REQS]
cdef aio_setup():
cdef int res
global aio_eventfd, aio_event_map
global aio_eventfd, aio_poller, aio_event_map
res = io_setup(MAX_PENDING_REQS, &aio_ioctx)
if res:
......@@ -52,33 +53,41 @@ cdef aio_setup():
aio_eventfd = eventfd(0, EFD_NONBLOCK)
if aio_eventfd == -1:
raise_oserror()
the_poller._register_fd(aio_eventfd)
aio_event_map = {}
aio_poller = spawn(_aio_poll)
cdef aio_teardown():
cdef int res
aio_poller.shutdown()
close(aio_eventfd)
res = io_destroy(aio_ioctx)
if res:
raise_oserror_with_errno(res)
close(aio_eventfd)
cdef aio_poll():
def _aio_poll():
cdef int r, fd, res
cdef long n
cdef coro co
cdef io_event aio_io_events[MAX_PENDING_REQS]
read(aio_eventfd, <char*>&n, 8)
r = io_getevents(aio_ioctx, 1, n, aio_io_events, NULL)
if r < 0:
raise_oserror()
for i from 0 <= i < r:
fd = aio_io_events[i].obj.aio_fildes
res = aio_io_events[i].res
#print 'POLL: fd=%r, res=%r' % (fd, res)
co = aio_event_map.pop(fd)
co._schedule(res)
while 1:
try:
the_poller._wait_for_read(aio_eventfd)
read(aio_eventfd, <char*>&n, 8)
if n < 1 or n > MAX_PENDING_REQS:
raise_oserror()
r = io_getevents(aio_ioctx, 1, n, aio_io_events, NULL)
if r < 0:
raise_oserror()
for i from 0 <= i < r:
fd = aio_io_events[i].obj.aio_fildes
res = aio_io_events[i].res
#print 'POLL: fd=%r, res=%r' % (fd, res)
co = aio_event_map.pop(fd)
co._schedule(res)
except Shutdown:
break
cdef _aligned_size(size):
if size % BLOCK_SIZE:
......
......@@ -218,12 +218,7 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
me = the_scheduler._current
target = me
add_to_map = True
IF COMPILE_LINUX_AIO:
if ek.fd == aio_eventfd:
add_to_map = False
if add_to_map:
self.event_map[ek] = target
self.event_map[ek] = target
self._register_event(ek, flags)
return target
......@@ -295,13 +290,10 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
return self._wait_for_with_eof(fd, EPOLLOUT)
cdef py_event _wait_for (self, int fd, int events):
self._register_fd(fd, events)
return _YIELD()
cdef _register_fd (self, int fd, events=EPOLLIN):
cdef event_key ek
ek = event_key (events, fd)
self.set_wait_for (ek)
return _YIELD()
def wait_for (self, int fd, int events):
"""Wait for an event.
......@@ -364,11 +356,6 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
ek = event_key (new_e.events, new_e.data.fd)
IF COMPILE_LINUX_AIO:
if ek.fd == aio_eventfd:
aio_poll()
continue
try:
co = self.event_map[ek]
except KeyError:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment