Commit 66dd4e4c authored by Amit Dev's avatar Amit Dev

Use c++ multimap directly from cython.

parent 57861e6f
......@@ -797,7 +797,7 @@ cdef public void _wrap1 "_wrap1" (coro co):
# event queue
# ================================================================================
include "event_queue.pyx"
from event_queue import event_queue, __event_queue_version__
cdef class event:
cdef public uint64_t t
......@@ -907,9 +907,9 @@ cdef public class sched [ object sched_object, type sched_type ]:
cdef int stack_size
cdef public object _current, pending, staging
cdef coro _last
cdef readonly event_queue events
cdef int profiling
cdef uint64_t latency_threshold
cdef object events
def __init__ (self, stack_size=4*1024*1024):
self.stack_size = stack_size
......@@ -1108,7 +1108,7 @@ cdef public class sched [ object sched_object, type sched_type ]:
tb = timebomb (self._current, delta)
e = event (tb.when, tb)
self.events.c_insert (e.t, e)
self.events.insert (e.t, e)
try:
try:
return PyObject_Call (function, args, kwargs)
......@@ -1121,7 +1121,7 @@ cdef public class sched [ object sched_object, type sched_type ]:
raise
finally:
if not e.expired:
self.events.c_delete (e.t, e)
self.events.remove (e.t, e)
e.expire()
cdef sleep (self, uint64_t when):
......@@ -1134,12 +1134,12 @@ cdef public class sched [ object sched_object, type sched_type ]:
IF CORO_DEBUG:
assert self._current is not None
e = event (when, self._current)
self.events.c_insert (when, e)
self.events.insert (when, e)
try:
(<coro>self._current).__yield ()
finally:
if not e.expired:
self.events.c_delete (e.t, e)
self.events.remove (e.t, e)
e.expire()
def sleep_relative (self, delta):
......@@ -1170,10 +1170,10 @@ cdef public class sched [ object sched_object, type sched_type ]:
cdef coro c
retry = _fifo()
while self.events.len():
e = self.events.c_top(NULL)
while len(self.events):
e = self.events.top()
if e.t <= now:
self.events.c_pop(NULL)
self.events.pop()
# two kinds of event values:
# 1) a coro (for sleep_{relative,absolute}())
# 2) a timebomb (for with_timeout())
......@@ -1198,19 +1198,19 @@ cdef public class sched [ object sched_object, type sched_type ]:
# retry all the timebombs that failed due to ScheduleError
while retry.size:
e = retry._pop()
self.events.c_insert (e.t, e)
self.events.insert (e.t, e)
cdef get_timeout_to_next_event (self, int default_timeout):
cdef uint64_t delta, now
cdef event e
# default_timeout is in seconds
now = c_rdtsc()
if self.events.len():
if len(self.events):
# 1) get time to next event
while 1:
e = self.events.c_top(NULL)
e = self.events.top()
if e.expired:
self.events.c_pop(NULL)
self.events.pop()
else:
break
if e.t < now:
......
/*
Copyright (c) 2002-2011 IronPort Systems and Cisco Systems
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "Python.h"
#include <map>
typedef std::multimap <unsigned long long, PyObject *> event_queue;
typedef event_queue::iterator event_queue_iter;
/* Needed because Pyrex is compiled as a C program and is expecting C-like
symbols.
*/
extern "C" {
/*
Create a new event queue.
*/
event_queue *
event_queue_new()
{
return new event_queue;
}
/*
Delete the event queue and free all data.
*/
void
event_queue_dealloc(event_queue * q)
{
event_queue_iter iter;
for (iter = q->begin(); iter != q->end(); iter++) {
Py_DECREF(iter->second);
}
delete q;
}
/*
Returns the length of the queue.
*/
int
event_queue_len(event_queue * q)
{
return q->size();
}
/*
Returns a new iterator.
*/
event_queue_iter
event_queue_new_iter(event_queue * q)
{
return q->begin();
}
/*
Return the current value of the iterator and move the position forward.
The timestamp of the first element is stored in time (if not NULL). The value is returned.
Returns NULL if the queue is empty with StopIteration set.
*/
PyObject *
event_queue_iter_next(event_queue * q, event_queue_iter * iter, uint64_t * time)
{
PyObject * value;
if (*iter == q->end()) {
PyErr_SetObject(PyExc_StopIteration, NULL);
return NULL;
} else {
if (time) {
*time = (*iter)->first;
}
value = (*iter)->second;
Py_INCREF(value);
(*iter)++;
return value;
}
}
/*
Peek at the top of the queue.
The timestamp of the first element is stored in time (if not NULL). The value is returned.
Returns NULL if the queue is empty with IndexError set.
*/
PyObject *
event_queue_top(event_queue * q, uint64_t * time)
{
PyObject * value;
if (q->size()) {
event_queue_iter iter = q->begin();
if (time) {
*time = iter->first;
}
value = iter->second;
Py_INCREF(value);
return value;
} else {
PyErr_SetString(PyExc_IndexError, "top of empty queue");
return NULL;
}
}
/*
Pop the first element off the queue.
The timestamp of the first element is stored in time (if not NULL). The value is returned.
Returns NULL if the queue is empty with IndexError set.
*/
PyObject *
event_queue_pop(event_queue * q, uint64_t * time)
{
PyObject * value;
if (q->size()) {
event_queue_iter iter = q->begin();
if (time) {
*time = iter->first;
}
value = iter->second;
q->erase (iter);
return value;
} else {
PyErr_SetString(PyExc_IndexError, "pop from empty queue");
return NULL;
}
}
/*
Insert a new entry into the queue.
Returns 0 on succes, -1 on failure.
(Currently never fails.)
*/
int
event_queue_insert(event_queue * q, uint64_t time, PyObject * value)
{
q->insert (std::pair <uint64_t, PyObject *> (time, value));
Py_INCREF(value);
return 0;
}
/*
Delete an entry from the queue.
Returns 0 on success, -1 on failure with IndexError set.
*/
int
event_queue_delete(event_queue * q, uint64_t time, PyObject * value)
{
event_queue_iter iter = q->find(time);
// Iterate since we support duplicate keys.
while (iter != q->end()) {
if (iter->first != time) {
break;
}
if (iter->second == value) {
Py_DECREF(iter->second);
q->erase(iter);
return 0;
} else {
iter++;
}
}
PyErr_SetString(PyExc_IndexError, "event not found");
return -1;
}
} /* extern "C" */
/*
Copyright (c) 2002-2011 IronPort Systems and Cisco Systems
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#ifndef _EVENT_QUEUE_H_
#define _EVENT_QUEUE_H_
#include "Python.h"
/* Types are defined as void * to fake out Pyrex which can't handle C++ types. */
//typedef std::multimap <long long, PyObject *> event_queue;
typedef void * event_queue;
event_queue * event_queue_new(void);
void event_queue_dealloc(event_queue * q);
PyObject * event_queue_top(event_queue * q, uint64_t * time);
PyObject * event_queue_pop(event_queue * q, uint64_t * time);
int event_queue_insert(event_queue * q, uint64_t time, PyObject * value);
int event_queue_delete(event_queue * q, uint64_t time, PyObject * value);
int event_queue_len(event_queue * q);
//typedef event_queue::iterator event_queue_iter;
typedef void * event_queue_iter;
event_queue_iter event_queue_new_iter(event_queue * q);
PyObject * event_queue_iter_next(event_queue * q, event_queue_iter * iter, uint64_t * time);
#endif /* _EVENT_QUEUE_H_ */
......@@ -22,117 +22,111 @@
__event_queue_version__ = "$Id: event_queue.pyx,v 1.1 2007/01/03 00:19:50 ehuss Exp $"
cdef extern from "event_queue.h":
ctypedef void * cpp_event_queue "event_queue"
cpp_event_queue * event_queue_new()
void event_queue_dealloc(cpp_event_queue * q)
object event_queue_top(cpp_event_queue * q, uint64_t * time)
object event_queue_pop(cpp_event_queue * q, uint64_t * time)
int event_queue_insert(cpp_event_queue * q, uint64_t time, object) except -1
int event_queue_delete(cpp_event_queue * q, uint64_t time, object) except -1
int event_queue_len(cpp_event_queue * q)
ctypedef void * cpp_event_queue_iter "event_queue_iter"
cpp_event_queue_iter event_queue_new_iter(cpp_event_queue * q)
object event_queue_iter_next(cpp_event_queue * q, cpp_event_queue_iter * iter, uint64_t * time)
cdef class event_queue_iter
include "python.pxi"
from cython.operator cimport dereference as deref, preincrement as inc
from libcpp.utility cimport pair
from libc cimport uint64_t
cdef extern from "<map>" namespace "std":
cdef cppclass multimap[T, U]:
cppclass iterator:
pair[T,U]& operator*()
iterator operator++()
iterator operator--()
bint operator==(iterator)
bint operator!=(iterator)
map()
U& operator[](T&)
U& at(T&)
iterator begin()
size_t count(T&)
bint empty()
iterator end()
void erase(iterator)
void erase(iterator, iterator)
size_t erase(T&)
iterator find(T&)
pair[iterator, bint] insert(pair[T,U])
size_t size()
cdef class event_queue:
cdef cpp_event_queue * q
cdef multimap[uint64_t, PyObject*] *q
def __cinit__(self):
self.q = event_queue_new()
self.q = new multimap[uint64_t, PyObject*]()
def __dealloc__(self):
event_queue_dealloc(self.q)
cdef multimap[uint64_t, PyObject*].iterator it = self.q.begin()
while it != self.q.end():
Py_DECREF(<object> deref(it).second)
inc(it)
del self.q
def __len__(self):
return event_queue_len(self.q)
cpdef insert(self, uint64_t time, value):
"""Insert a new value into the queue.
cdef int len(self):
return event_queue_len(self.q)
:Parameters:
- `time`: The uint64 time.
- `value`: The value to insert.
"""
cdef pair[uint64_t, PyObject*] p
p.first, p.second = time, <PyObject *> value
self.q.insert(p)
Py_INCREF(value)
cdef c_top(self, uint64_t * time):
return event_queue_top(self.q, time)
def __len__(self):
return self.q.size()
def top(self):
cpdef top(self):
"""Peek at the top value of the queue.
:Return:
Returns a ``(time, value)`` tuple from the top of the queue.
Returns value from the top of the queue.
:Exceptions:
- `IndexError`: The queue is empty.
"""
cdef uint64_t time
if not self.q.size():
raise IndexError('Top of empty queue')
cdef multimap[uint64_t, PyObject*].iterator it = self.q.begin()
return <object> deref(it).second
value = event_queue_top(self.q, &time)
return (time, value)
cdef c_pop(self, uint64_t * time):
return event_queue_pop(self.q, time)
def pop(self):
cpdef pop(self):
"""Grab the top value of the queue and remove it.
:Return:
Returns a ``(time, value)`` tuple from the top of the queue.
Returns value from the top of the queue.
:Exceptions:
- `IndexError`: The queue is empty.
"""
cdef uint64_t time
value = event_queue_pop(self.q, &time)
return (time, value)
cdef c_insert(self, uint64_t time, value):
event_queue_insert(self.q, time, value)
def insert(self, uint64_t time, value):
"""Insert a new value into the queue.
:Parameters:
- `time`: The uint64 time.
- `value`: The value to insert.
"""
event_queue_insert(self.q, time, value)
cdef c_delete(self, uint64_t time, value):
event_queue_delete(self.q, time, value)
def delete(self, uint64_t time, value):
if not self.q.size():
raise IndexError('Top of empty queue')
cdef multimap[uint64_t, PyObject*].iterator it = self.q.begin()
value = <object> deref(it).second
self.q.erase(it)
Py_DECREF(value)
return value
cpdef remove(self, uint64_t time, value):
"""Delete a value from the queue.
:Parameters:
- `time`: The uint64 time.
- `value`: The value to delete.
"""
event_queue_delete(self.q, time, value)
def __iter__(self):
cdef event_queue_iter i
i = event_queue_iter()
i.q = self.q
i.iter = event_queue_new_iter(self.q)
return i
cdef class event_queue_iter:
cdef cpp_event_queue * q
cdef cpp_event_queue_iter iter
def __iter__(self):
return self
def __next__(self):
cdef uint64_t time
value = event_queue_iter_next(self.q, &self.iter, &time)
return (time, value)
cdef PyObject *val
cdef multimap[uint64_t, PyObject*].iterator it = self.q.find(time)
cdef PyObject *v = <PyObject *> value
while it != self.q.end():
if deref(it).first != time:
break
val = <PyObject *> deref(it).second
if v == val:
self.q.erase(it)
Py_DECREF(<object>val)
return 0
else:
inc(it)
raise IndexError('Event not found')
......@@ -46,9 +46,18 @@ setup (
license = "MIT",
url = "http://github.com/ironport/shrapnel",
ext_modules = [
Extension(
'coro.event_queue',
['coro/event_queue.pyx'],
language='c++',
depends=[os.path.join(include_dir, 'pyrex', 'python.pxi'),],
pyrex_include_dirs=[
os.path.join(include_dir, '.'),
os.path.join(include_dir, 'pyrex'),
],),
Extension (
'coro._coro',
['coro/_coro.pyx', 'coro/swap.c', 'coro/event_queue.cc'],
['coro/_coro.pyx', 'coro/swap.c'],
extra_compile_args = ['-Wno-unused-function'],
depends=(glob.glob('coro/*.pyx') +
glob.glob('coro/*.pxi') +
......
# Copyright (c) 2002-2011 IronPort Systems and Cisco Systems
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Unittests for event queue wrapper."""
__version__ = '$Revision: #1 $'
import unittest
import coro
import coro_unittest
class Test(unittest.TestCase):
def setUp(self):
self.q = coro.event_queue()
def test_insert(self):
data = [(3, "3"), (2, "21"), (1, "1"), (2, "22")]
res = ["1", "21", "22", "3"]
for i in data:
self.q.insert(*i)
self.assertEquals(len(data), len(self.q))
for j in res:
self.assertEquals(self.q.top(), j)
self.assertEquals(self.q.pop(), j)
def test_remove(self):
data = [(3, "3"), (2, "21"), (1, "1"), (2, "22")]
for i in data:
self.q.insert(*i)
self.assertRaises(IndexError, self.q.remove, 1, "2")
self.assertRaises(IndexError, self.q.remove, 10, "2")
for i in data:
self.q.remove(*i)
self.assertEquals(0, len(self.q))
def test_empty(self):
self.assertRaises(IndexError, self.q.top)
self.assertRaises(IndexError, self.q.pop)
self.assertRaises(IndexError, self.q.remove, 1, "2")
if __name__ == '__main__':
coro_unittest.run_tests()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment