Commit ba775f62 authored by Amit Dev's avatar Amit Dev

Basic disk aio support on linux via libaio

parent 6025e3a9
from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_Check, PyBytes_AS_STRING
cdef extern from "stdlib.h":
int posix_memalign(void **memptr, size_t alignment, size_t size)
void free(void *ptr)
cdef extern from "libaio.h":
ctypedef struct io_context_t:
int a
cdef struct iocb:
void *data
unsigned key
short aio_lio_opcode
short aio_reqprio
int aio_fildes
cdef struct io_event:
void *data
iocb *obj
long long res
int io_setup(unsigned nr_events, io_context_t *ctxp)
int io_destroy(io_context_t ctx)
void io_prep_pread(iocb *iocb, int fd, void *buf, size_t count, long long offset)
void io_prep_pwrite(iocb *iocb, int fd, void *buf, size_t count, long long offset)
int io_submit(io_context_t ctx, long nr, iocb *iocbs[])
int io_getevents(io_context_t ctx, long min_nr, long nr, io_event *events, timespec *timeout)
void io_set_eventfd(iocb *iocb, int eventfd)
cdef extern from "sys/eventfd.h":
int eventfd(unsigned int initval, int flags)
int EFD_NONBLOCK
cdef enum:
MAX_PENDING_REQS = 1024
BLOCK_SIZE = 512
cdef int aio_eventfd
cdef io_context_t aio_ioctx
cdef dict aio_event_map
cdef iocb aio_iocb[MAX_PENDING_REQS]
cdef aio_setup():
cdef int res
global aio_eventfd, aio_event_map
res = io_setup(MAX_PENDING_REQS, &aio_ioctx)
if res:
raise_oserror_with_errno(res)
aio_eventfd = eventfd(0, EFD_NONBLOCK)
if aio_eventfd == -1:
raise_oserror()
the_poller._register_fd(aio_eventfd)
aio_event_map = {}
cdef aio_teardown():
cdef int res
res = io_destroy(aio_ioctx)
if res:
raise_oserror_with_errno(res)
close(aio_eventfd)
cdef aio_poll():
cdef int r, fd, res
cdef long n
cdef coro co
cdef io_event aio_io_events[MAX_PENDING_REQS]
read(aio_eventfd, <char*>&n, 8)
r = io_getevents(aio_ioctx, 1, n, aio_io_events, NULL)
if r < 0:
raise_oserror()
for i from 0 <= i < r:
fd = aio_io_events[i].obj.aio_fildes
res = aio_io_events[i].res
#print 'POLL: fd=%r, res=%r' % (fd, res)
co = aio_event_map.pop(fd)
co._schedule(res)
cdef _aligned_size(size):
if size % BLOCK_SIZE:
return (size/BLOCK_SIZE + 1) * BLOCK_SIZE
else:
return size
cdef long _aio_wb, _aio_rb, _aio_rnb, _aio_wnb, _aio_rp, _aio_wp, _aio_pending
_aio_wb = 0
_aio_rb = 0
_aio_rnb = 0
_aio_wnb = 0
_aio_rp = 0
_aio_wp = 0
_aio_pending = 0
def aio_stats():
"""Return AIO statistics.
:Return:
Returns a tuple of ``(rnb, rb, wnb, wb, rp, wp)``:
- ``rnb``: Non-blocking reads.
- ``rb``: Blocking-reads.
- ``wnb``: Non-blocking writes.
- ``wb``: Blocking-writes.
- ``rp``: Pending reads.
- ``wp``: Pending writes.
"""
return _aio_rnb, _aio_rb, _aio_wnb, _aio_wb, _aio_rp, _aio_wp
def aio_read (int fd, int nbytes, uint64_t offset):
"""Asynchronously read data from a file. fd should be opened in
O_DIRECT mode by the caller to make this non blocking.
:Parameters:
- `fd`: The file descriptor to read from.
- `nbytes`: The number of bytes to read.
- `offset`: The offset to read from.
:Return:
Returns a string of data read from the file.
:Exceptions:
- `OSError`: OS-level error.
"""
global _aio_pending, _aio_rb, _aio_rnb, _aio_rp, aio_iocb
cdef object buf, res
cdef int aligned_size
cdef iocb *piocb
cdef iocb *iocbs[1]
cdef char *strbuf
aligned_size = _aligned_size(nbytes)
posix_memalign(<void**>&strbuf, BLOCK_SIZE, aligned_size)
me = the_scheduler._current
piocb = &aio_iocb[_aio_pending]
_aio_pending += 1
_aio_rp += 1
io_prep_pread(piocb, fd, strbuf, aligned_size, offset)
io_set_eventfd(piocb, aio_eventfd)
iocbs[0] = piocb
io_submit(aio_ioctx, 1, iocbs)
aio_event_map[fd] = me
res = _YIELD()
_aio_pending -= 1
_aio_rp -= 1
buf = PyBytes_FromStringAndSize (strbuf, nbytes)
free(strbuf)
return buf
def aio_write (int fd, object buf, uint64_t offset):
"""Asynchronously write data to a file. fd should be opened in
O_DIRECT mode by the caller to make this non blocking.
:Parameters:
- `fd`: The file descriptor to write to.
- `buf`: String data to write.
- `offset`: The offset to write the data.
:Return:
Returns the number of bytes written.
:Exceptions:
- `OSError`: OS-level error.
"""
global _aio_pending, _aio_wb, _aio_wnb, _aio_wp, aio_iocb
cdef object res
cdef int size, aligned_size
cdef iocb *piocb
cdef iocb *iocbs[1]
cdef void *strbuf
size = PyBytes_Size(buf)
aligned_size = _aligned_size(size)
posix_memalign(&strbuf, BLOCK_SIZE, aligned_size)
memcpy(strbuf, PyBytes_AS_STRING(buf), size)
me = the_scheduler._current
piocb = &aio_iocb[_aio_pending]
_aio_pending += 1
_aio_wp += 1
io_prep_pwrite(piocb, fd, strbuf, aligned_size, offset)
io_set_eventfd(piocb, aio_eventfd)
iocbs[0] = piocb
io_submit(aio_ioctx, 1, iocbs)
aio_event_map[fd] = me
res = _YIELD()
_aio_pending -= 1
_aio_wp -= 1
free(strbuf)
return res
...@@ -33,6 +33,9 @@ from posix cimport unistd ...@@ -33,6 +33,9 @@ from posix cimport unistd
from libc cimport errno from libc cimport errno
from xlibc.stdlib cimport alloca from xlibc.stdlib cimport alloca
IF COMPILE_LINUX_AIO:
include "linux_aio.pyx"
cdef extern from "sys/time.h": cdef extern from "sys/time.h":
cdef struct timespec: cdef struct timespec:
unsigned int tv_sec unsigned int tv_sec
...@@ -185,11 +188,15 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t ...@@ -185,11 +188,15 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
self.ep_fd = epoll_create(1000) self.ep_fd = epoll_create(1000)
if self.ep_fd == -1: if self.ep_fd == -1:
raise SystemError, "epoll_create() failed" raise SystemError, "epoll_create() failed"
IF COMPILE_LINUX_AIO:
aio_setup()
cdef tear_down(self): cdef tear_down(self):
if self.ep_fd != -1: if self.ep_fd != -1:
unistd.close (self.ep_fd) unistd.close (self.ep_fd)
self.ep_fd = -1 self.ep_fd = -1
IF COMPILE_LINUX_AIO:
aio_teardown()
cdef object set_wait_for (self, event_key ek): cdef object set_wait_for (self, event_key ek):
cdef coro me cdef coro me
...@@ -211,6 +218,11 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t ...@@ -211,6 +218,11 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
me = the_scheduler._current me = the_scheduler._current
target = me target = me
add_to_map = True
IF COMPILE_LINUX_AIO:
if ek.fd == aio_eventfd:
add_to_map = False
if add_to_map:
self.event_map[ek] = target self.event_map[ek] = target
self._register_event(ek, flags) self._register_event(ek, flags)
...@@ -283,10 +295,13 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t ...@@ -283,10 +295,13 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
return self._wait_for_with_eof(fd, EPOLLOUT) return self._wait_for_with_eof(fd, EPOLLOUT)
cdef py_event _wait_for (self, int fd, int events): cdef py_event _wait_for (self, int fd, int events):
self._register_fd(fd, events)
return _YIELD()
cdef _register_fd (self, int fd, events=EPOLLIN):
cdef event_key ek cdef event_key ek
ek = event_key (events, fd) ek = event_key (events, fd)
et = self.set_wait_for (ek) self.set_wait_for (ek)
return _YIELD()
def wait_for (self, int fd, int events): def wait_for (self, int fd, int events):
"""Wait for an event. """Wait for an event.
...@@ -347,16 +362,21 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t ...@@ -347,16 +362,21 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
new_e.events = new_e.events & ~(EPOLLHUP) new_e.events = new_e.events & ~(EPOLLHUP)
new_e.events = new_e.events & ~(EPOLLERR) new_e.events = new_e.events & ~(EPOLLERR)
_py_event = py_event()
_py_event.__fast_init__(&new_e)
ek = event_key (new_e.events, new_e.data.fd) ek = event_key (new_e.events, new_e.data.fd)
IF COMPILE_LINUX_AIO:
if ek.fd == aio_eventfd:
aio_poll()
continue
try: try:
co = self.event_map[ek] co = self.event_map[ek]
except KeyError: except KeyError:
pass pass
#W ('un-handled event: fd=%s events=%s\n' % (new_e.data.fd, new_e.events)) #W ('un-handled event: fd=%s events=%s\n' % (new_e.data.fd, new_e.events))
else: else:
_py_event = py_event()
_py_event.__fast_init__(&new_e)
if isinstance (co, coro): if isinstance (co, coro):
co._schedule (_py_event) co._schedule (_py_event)
else: else:
......
...@@ -40,14 +40,20 @@ def check_lio(): ...@@ -40,14 +40,20 @@ def check_lio():
# arbitrary options to distutils. # arbitrary options to distutils.
return True return True
if newer('test/build/test_lio.c', 'test/build/test_lio'): if newer('test/build/test_lio.c', 'test/build/test_lio'):
status = os.system('gcc -o test/build/test_lio test/build/test_lio.c') status = os.system('gcc -o test/build/test_lio test/build/test_lio.c > /dev/null 2>&1')
if not exit_ok(status): if not exit_ok(status):
return False return False
status = os.system('test/build/test_lio') status = os.system('test/build/test_lio')
return exit_ok(status) return exit_ok(status)
def check_linux_aio():
return os.uname()[0] == 'Linux' and exit_ok(os.system('ldconfig -p | grep libaio > /dev/null'))
USE_LINUX_AIO = check_linux_aio()
compile_time_env = { compile_time_env = {
'COMPILE_LIO': check_lio(), 'COMPILE_LIO': check_lio(),
'COMPILE_LINUX_AIO': USE_LINUX_AIO,
'COMPILE_NETDEV' : False, 'COMPILE_NETDEV' : False,
'COMPILE_LZO' : False, 'COMPILE_LZO' : False,
'COMPILE_LZ4' : False, 'COMPILE_LZ4' : False,
...@@ -57,8 +63,6 @@ compile_time_env = { ...@@ -57,8 +63,6 @@ compile_time_env = {
#-------------------------------------------------------------------------------- #--------------------------------------------------------------------------------
# OpenSSL support # OpenSSL support
#-------------------------------------------------------------------------------- #--------------------------------------------------------------------------------
import os
import sys
# If you need NPN support (for SPDY), you most likely will have to link against # If you need NPN support (for SPDY), you most likely will have to link against
# newer openssl than the one that came with your OS. (this is circa 2012). # newer openssl than the one that came with your OS. (this is circa 2012).
...@@ -145,7 +149,7 @@ setup ( ...@@ -145,7 +149,7 @@ setup (
# and uncomment one of the following: # and uncomment one of the following:
#libraries=['lzo2', 'z'] #libraries=['lzo2', 'z']
#libraries=['lz4', 'z'], #libraries=['lz4', 'z'],
libraries=['z'] libraries=['z'] + (['aio'] if USE_LINUX_AIO else [])
), ),
Extension ('coro.oserrors', ['coro/oserrors.pyx', ],), Extension ('coro.oserrors', ['coro/oserrors.pyx', ],),
Extension ('coro.dns.packet', ['coro/dns/packet.pyx', ],), Extension ('coro.dns.packet', ['coro/dns/packet.pyx', ],),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment