Commit 5c9c78aa authored by Amit Dev's avatar Amit Dev

Unit tests and minor fixes

parent 50a6acf8
# -*- Mode: Python -*-
#
# This module provides asynchronous, non blocking disk io support in Linux via libaio.
# For most of the cases one can live with blocking io on Linux since it is buffered and
# this is useful only if you need direct control of disk io and have your own cache etc.
#
# Asynchronous disk io support in Linux is not as good as in FreeBSD. There are bunch of
# options (posix aio, libeio) which uses userland threads to mimic this. This module uses
# libaio which does not need such dependancies and ties the event handling with epoll with
# an eventfd.
#
# libaio asynchronous APIs need the offset and size of reads and writes to be
# block-aligned (512 bytes) on 2.6+ kernels. Consequently this works best when
# the caller takes this into consideration. Currently aio_read() supports any
# offset/size, but aio_write() needs the offset to be aligned.
#
# Note: this file is included by <coro.pyx> if libaio is available.
#
from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_Check, PyBytes_AS_STRING
cdef extern from "stdlib.h":
......@@ -43,6 +62,17 @@ cdef io_context_t aio_ioctx
cdef dict aio_event_map
cdef iocb aio_iocb[MAX_PENDING_REQS]
cdef _spawn_first(fun):
# Spawn this function before other pending coros
cdef coro co
id = get_coro_id()
co = coro (fun, [], {}, id)
_all_threads[id] = co
co.scheduled = 1
the_scheduler.pending.insert(0, (co, None))
return co
cdef aio_setup():
cdef int res
global aio_eventfd, aio_poller, aio_event_map
......@@ -54,7 +84,8 @@ cdef aio_setup():
if aio_eventfd == -1:
raise_oserror()
aio_event_map = {}
aio_poller = spawn(_aio_poll)
# _aio_poll needs to run first to listen for events
aio_poller = _spawn_first(_aio_poll)
cdef aio_teardown():
cdef int res
......@@ -83,15 +114,15 @@ def _aio_poll():
for i from 0 <= i < r:
fd = aio_io_events[i].obj.aio_fildes
res = aio_io_events[i].res
#print 'POLL: fd=%r, res=%r' % (fd, res)
co = aio_event_map.pop(fd)
co._schedule(res)
except Shutdown:
break
cdef _aligned_size(size):
if size % BLOCK_SIZE:
return (size/BLOCK_SIZE + 1) * BLOCK_SIZE
cdef _align(size, forward=True):
extra = size % BLOCK_SIZE
if extra:
return size - extra + BLOCK_SIZE if forward else size - extra
else:
return size
......@@ -135,16 +166,16 @@ def aio_read (int fd, int nbytes, uint64_t offset):
:Exceptions:
- `OSError`: OS-level error.
"""
# XXX: Remove limitation that offset needs to be a multiple of BLOCK_SIZE
global _aio_pending, _aio_rb, _aio_rnb, _aio_rp, aio_iocb
cdef object buf, res
cdef int aligned_size
cdef int aligned_size, aligned_offset
cdef iocb *piocb
cdef iocb *iocbs[1]
cdef char *strbuf
aligned_size = _aligned_size(nbytes)
aligned_offset = _align(offset, forward=False)
aligned_size = _align(offset+nbytes-aligned_offset)
res = posix_memalign(<void**>&strbuf, BLOCK_SIZE, aligned_size)
if res:
raise_oserror_with_errno(res)
......@@ -152,7 +183,7 @@ def aio_read (int fd, int nbytes, uint64_t offset):
piocb = &aio_iocb[_aio_pending]
_aio_pending += 1
_aio_rp += 1
io_prep_pread(piocb, fd, strbuf, aligned_size, offset)
io_prep_pread(piocb, fd, strbuf, aligned_size, aligned_offset)
io_set_eventfd(piocb, aio_eventfd)
iocbs[0] = piocb
res = io_submit(aio_ioctx, 1, iocbs)
......@@ -165,9 +196,9 @@ def aio_read (int fd, int nbytes, uint64_t offset):
assert res >= nbytes
_aio_pending -= 1
_aio_rp -= 1
buf = PyBytes_FromStringAndSize (strbuf, nbytes)
buf = PyBytes_FromStringAndSize (strbuf, aligned_size)
free(strbuf)
return buf
return buf[offset-aligned_offset:offset-aligned_offset+nbytes]
def aio_write (int fd, object buf, uint64_t offset):
"""Asynchronously write data to a file. fd should be opened in
......@@ -176,7 +207,7 @@ def aio_write (int fd, object buf, uint64_t offset):
:Parameters:
- `fd`: The file descriptor to write to.
- `buf`: String data to write.
- `offset`: The offset to write the data.
- `offset`: The offset to write data. Must be multiple of BLOCK_SIZE.
:Return:
Returns the number of bytes written.
......@@ -185,7 +216,6 @@ def aio_write (int fd, object buf, uint64_t offset):
- `OSError`: OS-level error.
"""
# XXX: Remove limitation that offset needs to be a multiple of BLOCK_SIZE
global _aio_pending, _aio_wb, _aio_wnb, _aio_wp, aio_iocb
cdef object res
......@@ -194,8 +224,10 @@ def aio_write (int fd, object buf, uint64_t offset):
cdef iocb *iocbs[1]
cdef void *strbuf
assert not offset % BLOCK_SIZE
size = PyBytes_Size(buf)
aligned_size = _aligned_size(size)
aligned_size = _align(size)
res = posix_memalign(&strbuf, BLOCK_SIZE, aligned_size)
if res:
......
......@@ -30,8 +30,14 @@ import unittest
import random
import resource
UNAME = os.uname()[0]
class Test(unittest.TestCase):
FLAG = os.O_RDWR | os.O_CREAT | os.O_TRUNC
if UNAME == 'Linux':
FLAG |= os.O_DIRECT
def tearDown(self):
if os.path.exists('test_aio_file'):
os.unlink('test_aio_file')
......@@ -45,7 +51,7 @@ class Test(unittest.TestCase):
def test_read_write(self):
"""Test read/write."""
self.fd = os.open('test_lio_file', os.O_RDWR|os.O_CREAT|os.O_TRUNC)
self.fd = os.open('test_lio_file', Test.FLAG)
# Simple 1-byte test.
self._read_write('a')
......@@ -55,12 +61,12 @@ class Test(unittest.TestCase):
self._read_write(data)
# Test offset read/write.
filesize = 512 * 1024
orig_data = os.urandom(filesize)
orig_data = os.urandom(512 * 1024)
filesize = len(orig_data)
coro.aio_write(self.fd, orig_data, 0)
for x in xrange(100):
size = random.randint(1, filesize)
offset = random.randint(0, filesize)
offset = random.randint(0, filesize-size)
data = coro.aio_read(self.fd, size, offset)
self.assertEqual(data, orig_data[offset:offset+size])
......@@ -69,7 +75,7 @@ class Test(unittest.TestCase):
def test_leak(self):
"""Test map leak."""
# There was a bug where we were leaking events in the event map.
self.fd = os.open('test_lio_file', os.O_RDWR|os.O_CREAT|os.O_TRUNC)
self.fd = os.open('test_lio_file', Test.FLAG)
event_size = len(coro.event_map)
......@@ -78,7 +84,7 @@ class Test(unittest.TestCase):
coro.aio_write(self.fd, orig_data, 0)
for x in xrange(100):
size = random.randint(1, filesize)
offset = random.randint(0, filesize)
offset = random.randint(0, filesize-size)
data = coro.aio_read(self.fd, size, offset)
self.assertEqual(data, orig_data[offset:offset+size])
......@@ -89,14 +95,14 @@ class Test(unittest.TestCase):
for x in xrange(100):
size = random.randint(1, filesize)
offset = random.randint(0, filesize)
offset = random.randint(0, filesize-size)
self.assertRaises(OSError, coro.aio_read, self.fd, size, offset)
self.assertEqual(event_size, len(coro.event_map))
def test_error(self):
"""Test error return."""
fd = os.open('test_aio_file', os.O_RDWR|os.O_CREAT|os.O_TRUNC)
fd = os.open('test_aio_file', Test.FLAG)
data = os.urandom(1024 * 1024)
r = coro.aio_write(fd, data, 0)
self.assertEqual(r, len(data))
......@@ -105,14 +111,15 @@ class Test(unittest.TestCase):
# Rip away the file descriptor.
os.close(fd)
# Verify it fails.
self.assertRaises(oserrors.EBADF, coro.aio_read, fd, len(data), 0)
self.assertRaises(OSError, coro.aio_read, fd, len(data), 0)
# Try a test that will fail from aio_return.
# (NOTE: On FreeBSD before 7, this would actually show up as an
# error immediately from aio_error, but in FreeBSD 7 it now appears to
# go through the kqueue code path.)
soft, hard = resource.getrlimit(resource.RLIMIT_FSIZE)
fd = os.open('test_aio_file', os.O_RDWR|os.O_CREAT|os.O_TRUNC)
if soft >= 0:
fd = os.open('test_aio_file', Test.FLAG)
self.assertRaises(oserrors.EFBIG, coro.aio_write, fd, data, soft)
os.close(fd)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment