Commit 81c84aac authored by Sam Rushing's avatar Sam Rushing

Merge pull request #29 from samrushing/modernize

Modernize pyrex -> cython
parents d466ebdb b9552455
# -*- Mode: Cython -*-
from cpython.ref cimport PyObject
from libc cimport intptr_t, memcpy, memset, off_t, time_t, timeval, uint64_t, uintptr_t
cdef extern from "Python.h":
ctypedef struct PyCodeObject:
int co_argcount
int co_nlocals
int co_stacksize
int co_flags
PyObject *co_code
PyObject *co_consts
PyObject *co_names
PyObject *co_varnames
PyObject *co_freevars
PyObject *co_cellvars
PyObject *co_filename
PyObject *co_name
int co_firstlineno
PyObject *co_lnotab
int PyCode_Addr2Line(PyCodeObject *, int)
cdef extern from "frameobject.h":
ctypedef struct PyFrameObject:
PyFrameObject *f_back
PyCodeObject *f_code
PyObject *f_builtins
PyObject *f_globals
PyObject *f_locals
PyObject *f_trace
PyObject *f_exc_type
PyObject *f_exc_value
PyObject *f_exc_traceback
int f_lasti
int f_lineno
int f_restricted
int f_iblock
int f_nlocals
int f_ncells
int f_nfreevars
int f_stacksize
cdef enum:
EVENT_SCALE = 16384
# used by the profiler
cdef struct call_stack:
# Next is NULL for the end of the linked list.
call_stack * next
void * func # Really a <function>
void * b # Really a <bench>
cdef struct machine_state:
void * stack_pointer
void * frame_pointer
void * insn_pointer
# other registers that need saving
# # x86 amd64
void * r1 # ebx rbx
void * r2 # esi r12
void * r3 # edi r13
void * r4 # r14
void * r5 # r15
# from swap.c
cdef extern int __swap (void * ts, void * fs)
cdef extern object void_as_object (void * p)
cdef extern int frame_getlineno (object frame)
cdef extern int coro_breakpoint()
cdef extern int SHRAP_STACK_PAD
cdef int default_selfishness
cdef int live_coros
cdef public class coro [ object _coro_object, type _coro_type ]:
"""The coroutine object.
Do not create this object directly. Use either :func:`new` or
:func:`spawn` to create one.
"""
cdef machine_state state
cdef object fun
cdef object args, kwargs
cdef readonly bytes name
cdef public int id
# XXX think about doing these as a bitfield/property
cdef public unsigned char dead, started, scheduled
cdef public object value
cdef void * stack_copy
cdef size_t stack_size
cdef PyFrameObject * frame
cdef void * saved_exception_data[6]
# used only by the profiler, a call_stack object. NULL if the profiler is
# not enabled or if this is the first call of the coroutine.
cdef call_stack * top
cdef int saved_recursion_depth
cdef int selfish_acts, max_selfish_acts
cdef bint compress, compressed
cdef object waiting_joiners
# Used for thread-local-storage.
cdef dict _tdict
cdef __create (self)
cdef __destroy (self)
cdef __yield (self)
cdef __resume (self, value)
cdef save_exception_data (self)
cdef restore_exception_data (self)
cdef _schedule (self, value)
cdef _unschedule (self)
cdef _die (self)
cdef __interrupt (self, the_exception)
cdef int try_selfish (self)
# choose a library for stack compression
IF COMPILE_LZ4:
include "zstack_lz4.pxd"
ELIF COMPILE_LZO:
include "zstack_lzo.pxd"
ELSE:
include "zstack_zlib.pxd"
cdef public class sched [ object sched_object, type sched_type ]:
cdef machine_state state
# this is the stack that all coroutines run on
cdef void * stack_base
cdef int stack_size
cdef public list pending, staging
cdef public object _current
cdef coro _last
cdef int profiling
cdef uint64_t latency_threshold
cdef zstack squish
cdef object events
cdef _preserve_last (self)
cdef _restore (self, coro co)
cdef _schedule (self, coro co, object value)
cdef _unschedule (self, coro co)
cdef print_latency_warning (self, coro co, uint64_t delta)
cdef sleep (self, uint64_t when)
cdef schedule_ready_events (self, uint64_t now)
cdef get_timeout_to_next_event (self, int default_timeout)
include "socket.pxd"
# XXX need pxd files for sync.pyx, poller.pyx, etc...
...@@ -30,39 +30,29 @@ variables are documented in the top level of the coro package ``__init__.py``. ...@@ -30,39 +30,29 @@ variables are documented in the top level of the coro package ``__init__.py``.
__coro_version__ = "$Id: //prod/main/ap/shrapnel/coro/_coro.pyx#114 $" __coro_version__ = "$Id: //prod/main/ap/shrapnel/coro/_coro.pyx#114 $"
DEF CORO_DEBUG = 0
DEF COMPILE_LIO = 0
DEF COMPILE_NETDEV = 0
DEF COMPILE_LZO = 0
DEF COMPILE_LZ4 = 0
import coro as coro_package import coro as coro_package
import warnings import warnings
# Only import things from libc that are very common and have unique names. # Only import things from libc that are very common and have unique names.
from libc cimport intptr_t,\ from libc cimport intptr_t, memcpy, memset, off_t, time_t, timeval, uint64_t, uintptr_t
memcpy,\
memset,\
off_t,\
time_t,\
timeval,\
uint64_t,\
uintptr_t
# ================================================================================ # ================================================================================
# a re-implementation of the IronPort coro-threading system, this time # a re-implementation of the IronPort coro-threading system, this time
# in Pyrex, and using stack copying and switching with a stripped-down # in Cython, and using stack copying and switching with a stripped-down
# version of the 'set/getcontext' API. # version of the 'set/getcontext' API.
# ================================================================================ # ================================================================================
# XXX: blame jj behrens for this. # XXX: blame jj behrens for this.
# XXX: instead of a two-stack solution, think about an n-stack solution. # XXX: instead of a two-stack solution, think about an n-stack solution.
# [the main drawback is that each coro is then tied to a particular stack] # [the main drawback is that each coro is then tied to a particular stack...
# this might be appropriate for a system that needs a small number of high-priority
# threads that never get swapped out, e.g, a TCP implementation]
# ================================================================================ # ================================================================================
# external declarations # external declarations
# ================================================================================ # ================================================================================
include "python.pxi" #include "python.pxi"
# Note that this cimports libc. # Note that this cimports libc.
include "pyrex_helpers.pyx" include "pyrex_helpers.pyx"
include "tsc_time_include.pyx" include "tsc_time_include.pyx"
...@@ -75,8 +65,17 @@ cdef extern from "stdlib.h": ...@@ -75,8 +65,17 @@ cdef extern from "stdlib.h":
ELSE: ELSE:
void srandomdev() void srandomdev()
from cpython.ref cimport Py_DECREF, Py_INCREF
from cpython.mem cimport PyMem_Free, PyMem_Malloc
from cpython.list cimport PyList_New
from cpython.bytes cimport PyBytes_FromStringAndSize
cdef extern from "Python.h": cdef extern from "Python.h":
# hack ctypedef struct PyThreadState:
PyFrameObject * frame
int recursion_depth
void * curexc_type, * curexc_value, * curexc_traceback
void * exc_type, * exc_value, * exc_traceback
PyThreadState * _PyThreadState_Current PyThreadState * _PyThreadState_Current
# ================================================================================ # ================================================================================
...@@ -91,7 +90,7 @@ _ticks_per_sec = tsc_time_module.ticks_per_sec ...@@ -91,7 +90,7 @@ _ticks_per_sec = tsc_time_module.ticks_per_sec
# to a Long. # to a Long.
ticks_per_sec = _ticks_per_sec ticks_per_sec = _ticks_per_sec
cdef object _all_threads cdef dict _all_threads
_all_threads = {} _all_threads = {}
all_threads = _all_threads all_threads = _all_threads
...@@ -151,9 +150,6 @@ include "fifo.pyx" ...@@ -151,9 +150,6 @@ include "fifo.pyx"
# coroutine/context object # coroutine/context object
# ================================================================================ # ================================================================================
cdef enum:
EVENT_SCALE = 16384
import sys import sys
class ScheduleError (Exception): class ScheduleError (Exception):
...@@ -194,7 +190,7 @@ cdef extern int coro_breakpoint() ...@@ -194,7 +190,7 @@ cdef extern int coro_breakpoint()
cdef extern int SHRAP_STACK_PAD cdef extern int SHRAP_STACK_PAD
# forward # forward
cdef public class sched [ object sched_object, type sched_type ] #cdef public class sched [ object sched_object, type sched_type ]
cdef public class queue_poller [ object queue_poller_object, type queue_poller_type ] cdef public class queue_poller [ object queue_poller_object, type queue_poller_type ]
cdef sched the_scheduler "the_scheduler" cdef sched the_scheduler "the_scheduler"
cdef queue_poller the_poller "the_poller" cdef queue_poller the_poller "the_poller"
...@@ -223,27 +219,6 @@ cdef public class coro [ object _coro_object, type _coro_type ]: ...@@ -223,27 +219,6 @@ cdef public class coro [ object _coro_object, type _coro_type ]:
:func:`spawn` to create one. :func:`spawn` to create one.
""" """
cdef machine_state state
cdef object fun
cdef object args, kwargs
cdef readonly object name
# XXX think about doing these as a bitfield/property
cdef public int dead, started, id, scheduled
cdef public object value
cdef void * stack_copy
cdef size_t stack_size
cdef PyFrameObject * frame
cdef void * saved_exception_data[6]
# used only by the profiler, a call_stack object. NULL if the profiler is
# not enabled or if this is the first call of the coroutine.
cdef call_stack * top
cdef int saved_recursion_depth
cdef int selfish_acts, max_selfish_acts
cdef bint compress, compressed
cdef object waiting_joiners
# Used for thread-local-storage.
cdef object _tdict
def __init__ (self, fun, args, kwargs, int id, name=None): def __init__ (self, fun, args, kwargs, int id, name=None):
global live_coros global live_coros
self.fun = fun self.fun = fun
...@@ -260,7 +235,7 @@ cdef public class coro [ object _coro_object, type _coro_type ]: ...@@ -260,7 +235,7 @@ cdef public class coro [ object _coro_object, type _coro_type ]:
self.selfish_acts = default_selfishness self.selfish_acts = default_selfishness
self.max_selfish_acts = default_selfishness self.max_selfish_acts = default_selfishness
if name is None: if name is None:
self.name = 'coro %d' % (self.id,) self.name = b'coro %d' % (self.id,)
live_coros = live_coros + 1 live_coros = live_coros + 1
def __dealloc__ (self): def __dealloc__ (self):
...@@ -731,7 +706,7 @@ cdef int get_coro_id() except -1: ...@@ -731,7 +706,7 @@ cdef int get_coro_id() except -1:
next_coro_id = next_coro_id + 1 next_coro_id = next_coro_id + 1
if next_coro_id == libc.INT_MAX: if next_coro_id == libc.INT_MAX:
next_coro_id = 1 next_coro_id = 1
if not PyDict_Contains(_all_threads, result): if not _all_threads.has_key (result):
return result return result
def default_exception_notifier(): def default_exception_notifier():
...@@ -901,16 +876,6 @@ ELSE: ...@@ -901,16 +876,6 @@ ELSE:
# ================================================================================ # ================================================================================
cdef public class sched [ object sched_object, type sched_type ]: cdef public class sched [ object sched_object, type sched_type ]:
cdef machine_state state
# this is the stack that all coroutines run on
cdef void * stack_base
cdef int stack_size
cdef public object _current, pending, staging
cdef coro _last
cdef int profiling
cdef uint64_t latency_threshold
cdef zstack squish
cdef object events
def __init__ (self, stack_size=4*1024*1024): def __init__ (self, stack_size=4*1024*1024):
self.stack_size = stack_size self.stack_size = stack_size
...@@ -1003,7 +968,7 @@ cdef public class sched [ object sched_object, type sched_type ]: ...@@ -1003,7 +968,7 @@ cdef public class sched [ object sched_object, type sched_type ]:
raise ScheduleError, self raise ScheduleError, self
else: else:
co.scheduled = 1 co.scheduled = 1
PyList_Append (self.pending, (co, value)) self.pending.append ((co, value))
cdef _unschedule (self, coro co): cdef _unschedule (self, coro co):
"""Unschedule this coroutine. """Unschedule this coroutine.
...@@ -1014,16 +979,16 @@ cdef public class sched [ object sched_object, type sched_type ]: ...@@ -1014,16 +979,16 @@ cdef public class sched [ object sched_object, type sched_type ]:
""" """
cdef int i cdef int i
for i from 0 <= i < len(self.pending): for i from 0 <= i < len(self.pending):
co2, v2 = PySequence_GetItem (self.pending, i) co2, v2 = self.pending[i]
if co is co2: if co is co2:
PySequence_DelItem (self.pending, i) del self.pending[i]
co.scheduled = 0 co.scheduled = 0
return True return True
else: else:
for i from 0 <= i < len(self.staging): for i from 0 <= i < len(self.staging):
co2, v2 = PySequence_GetItem (self.staging, i) co2, v2 = self.staging[i]
if co is co2: if co is co2:
PySequence_SetItem (self.staging, i, (None, None)) self.staging[i] = (None, None)
co.scheduled = 0 co.scheduled = 0
return True return True
else: else:
...@@ -1128,11 +1093,11 @@ cdef public class sched [ object sched_object, type sched_type ]: ...@@ -1128,11 +1093,11 @@ cdef public class sched [ object sched_object, type sched_type ]:
self.events.insert (e.t, e) self.events.insert (e.t, e)
try: try:
try: try:
return PyObject_Call (function, args, kwargs) return function (*args, **kwargs)
except Interrupted, value: except Interrupted, value:
# is this *my* timebomb? # is this *my* timebomb?
args = value.args args = value.args
if (PyTuple_Size(args) > 0) and (PySequence_GetItem (args, 0) is tb): if len(args) > 0 and args[0] is tb:
raise TimeoutError raise TimeoutError
else: else:
raise raise
...@@ -1258,6 +1223,7 @@ cdef public class sched [ object sched_object, type sched_type ]: ...@@ -1258,6 +1223,7 @@ cdef public class sched [ object sched_object, type sched_type ]:
cdef coro co cdef coro co
cdef uint64_t _now cdef uint64_t _now
cdef object _coro_package cdef object _coro_package
cdef tuple x
# Make a cdef reference to avoid __Pyx_GetName. # Make a cdef reference to avoid __Pyx_GetName.
_coro_package = coro_package _coro_package = coro_package
...@@ -1270,12 +1236,11 @@ cdef public class sched [ object sched_object, type sched_type ]: ...@@ -1270,12 +1236,11 @@ cdef public class sched [ object sched_object, type sched_type ]:
_coro_package.now_usec = c_ticks_to_usec(_now) _coro_package.now_usec = c_ticks_to_usec(_now)
self.schedule_ready_events (_now) self.schedule_ready_events (_now)
while 1: while 1:
if PyList_GET_SIZE (self.pending): if len(self.pending) > 0:
self.staging, self.pending = self.pending, self.staging self.staging, self.pending = self.pending, self.staging
for i from 0 <= i < PyList_GET_SIZE (self.staging): for i from 0 <= i < len (self.staging):
x = PyList_GET_ITEM_SAFE (self.staging, i) x = self.staging[i]
co = PyTuple_GET_ITEM_SAFE (x, 0) co, value = x
value = PyTuple_GET_ITEM_SAFE (x, 1)
# co may be None if it was unscheduled. # co may be None if it was unscheduled.
if co is not None: if co is not None:
#W ('resuming %d: #%d\n' % (i, co.id)) #W ('resuming %d: #%d\n' % (i, co.id))
...@@ -1465,19 +1430,21 @@ cdef void info(int sig): ...@@ -1465,19 +1430,21 @@ cdef void info(int sig):
co = the_scheduler._current co = the_scheduler._current
frame = _PyThreadState_Current.frame frame = _PyThreadState_Current.frame
if co: if co:
libc.fprintf(libc.stderr, 'coro %i "%s" at %s: %s %i\n', libc.fprintf (
libc.stderr, 'coro %i "%s" at %s: %s %i\n',
co.id, co.id,
PyString_AsString (co.name), co.name,
PyString_AsString (<object>frame.f_code.co_filename), <bytes>frame.f_code.co_filename,
PyString_AsString (<object>frame.f_code.co_name), <bytes>frame.f_code.co_name,
PyCode_Addr2Line (frame.f_code, frame.f_lasti) PyCode_Addr2Line (frame.f_code, frame.f_lasti)
) )
else: else:
libc.fprintf(libc.stderr, 'No current coro. %s: %s %i\n', libc.fprintf (
PyString_AsString (<object>frame.f_code.co_filename), libc.stderr, 'No current coro. %s: %s %i\n',
PyString_AsString (<object>frame.f_code.co_name), <bytes>frame.f_code.co_filename,
<bytes>frame.f_code.co_name,
PyCode_Addr2Line (frame.f_code, frame.f_lasti) PyCode_Addr2Line (frame.f_code, frame.f_lasti)
) )
event_loop = the_scheduler.event_loop event_loop = the_scheduler.event_loop
with_timeout = the_scheduler.with_timeout with_timeout = the_scheduler.with_timeout
......
...@@ -130,12 +130,12 @@ cdef class ThreadLocal: ...@@ -130,12 +130,12 @@ cdef class ThreadLocal:
cdef coro co cdef coro co
# _all_threads.itervalues() might be better here. # _all_threads.itervalues() might be better here.
for co in PyDict_Values(_all_threads): for co in _all_threads.values():
if co._tdict is not None: if co._tdict is not None:
# Avoiding exceptions for performance. # Avoiding exceptions for performance.
# Would be much better to just call PyDict_DelItem, but we've # Would be much better to just call PyDict_DelItem, but we've
# defined it with except -1. # defined it with except -1.
if PyDict_Contains(co._tdict, self.key): if co._tdict.has_key (self.key):
del co._tdict[self.key] del co._tdict[self.key]
property __dict__: property __dict__:
......
...@@ -269,7 +269,7 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t ...@@ -269,7 +269,7 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
cdef kevent * change_list cdef kevent * change_list
cdef int change_list_index cdef int change_list_index
cdef int kq_fd cdef int kq_fd
cdef object event_map cdef dict event_map
def __cinit__ (self): def __cinit__ (self):
# XXX EVENT_SCALE should be a parameter. # XXX EVENT_SCALE should be a parameter.
...@@ -298,7 +298,7 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t ...@@ -298,7 +298,7 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
cdef coro me cdef coro me
cdef kevent_target kt cdef kevent_target kt
if self.change_list_index < EVENT_SCALE: if self.change_list_index < EVENT_SCALE:
if PyDict_Contains (self.event_map, kk): if self.event_map.has_key (kk):
# Should be impossible to have KeyError due to previous line. # Should be impossible to have KeyError due to previous line.
kt = self.event_map[kk] kt = self.event_map[kk]
raise SimultaneousError (the_scheduler._current, kt.target, kk) raise SimultaneousError (the_scheduler._current, kt.target, kk)
...@@ -397,7 +397,7 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t ...@@ -397,7 +397,7 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
if r < 0: if r < 0:
raise_oserror() raise_oserror()
def set_handler (self, object event, object handler, int flags=(EV_ADD|EV_ONESHOT), unsigned int fflags=0): def set_handler (self, tuple event, object handler, int flags=(EV_ADD|EV_ONESHOT), unsigned int fflags=0):
"""Add a kevent handler. """Add a kevent handler.
This is a low-level interface to register a kevent handler. This is a low-level interface to register a kevent handler.
...@@ -419,11 +419,10 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t ...@@ -419,11 +419,10 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
cdef kevent_key kk cdef kevent_key kk
assert callable(handler) assert callable(handler)
ident = PySequence_GetItem(event, 0) ident, filter = event
filter = PySequence_GetItem(event, 1)
kk = kevent_key (filter, ident) kk = kevent_key (filter, ident)
# for kqueue, event == (ident, filter) # for kqueue, event == (ident, filter)
if PyDict_Contains (self.event_map, kk): if self.event_map.has_key (kk):
# Should be impossible to have KeyError due to previous line. # Should be impossible to have KeyError due to previous line.
kt = self.event_map[kk] kt = self.event_map[kk]
raise SimultaneousError (the_scheduler._current, kt.target, kk) raise SimultaneousError (the_scheduler._current, kt.target, kk)
...@@ -437,13 +436,12 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t ...@@ -437,13 +436,12 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
else: else:
raise SystemError, "too many kevents in change_list" raise SystemError, "too many kevents in change_list"
cdef set_event_target (self, object event, kevent_target kt): cdef set_event_target (self, tuple event, kevent_target kt):
cdef short filter cdef short filter
cdef uintptr_t ident cdef uintptr_t ident
cdef kevent_key kk cdef kevent_key kk
ident = PySequence_GetItem(event, 0) ident, filter = event
filter = PySequence_GetItem(event, 1)
kk = kevent_key (filter, ident) kk = kevent_key (filter, ident)
self.event_map[kk] = kt self.event_map[kk] = kt
...@@ -453,7 +451,7 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t ...@@ -453,7 +451,7 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
cdef kevent_key kk cdef kevent_key kk
kk = kevent_key (EVFILT_READ, fd) kk = kevent_key (EVFILT_READ, fd)
if PyDict_Contains(self.event_map, kk): if self.event_map.has_key (kk):
kt = self.event_map[kk] kt = self.event_map[kk]
kt.flags = kt.flags | KTARGET_CLOSED kt.flags = kt.flags | KTARGET_CLOSED
del self.event_map[kk] del self.event_map[kk]
...@@ -465,7 +463,7 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t ...@@ -465,7 +463,7 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
W ('notify_of_close (%d) [read]: unable to interrupt thread: %r\n' % (fd, co)) W ('notify_of_close (%d) [read]: unable to interrupt thread: %r\n' % (fd, co))
kk = kevent_key (EVFILT_WRITE, fd) kk = kevent_key (EVFILT_WRITE, fd)
if PyDict_Contains(self.event_map, kk): if self.event_map.has_key (kk):
kt = self.event_map[kk] kt = self.event_map[kk]
kt.flags = kt.flags | KTARGET_CLOSED kt.flags = kt.flags | KTARGET_CLOSED
del self.event_map[kk] del self.event_map[kk]
......
...@@ -133,7 +133,7 @@ cdef class internal_function(function): ...@@ -133,7 +133,7 @@ cdef class internal_function(function):
cdef object name cdef object name
def __hash__(self): def __hash__(self):
return PyObject_Hash(self.name) return hash (self.name)
def __cmp__(x, y): def __cmp__(x, y):
if not isinstance(x, function) or not isinstance(y, function): if not isinstance(x, function) or not isinstance(y, function):
...@@ -169,7 +169,7 @@ cdef class python_function(function): ...@@ -169,7 +169,7 @@ cdef class python_function(function):
cdef object code cdef object code
def __hash__(self): def __hash__(self):
return PyObject_Hash(self.code) return hash (self.code)
def __cmp__(x, y): def __cmp__(x, y):
if not isinstance(x, function) or not isinstance(y, function): if not isinstance(x, function) or not isinstance(y, function):
...@@ -216,7 +216,7 @@ cdef class c_function(function): ...@@ -216,7 +216,7 @@ cdef class c_function(function):
cdef uintptr_t func_addr cdef uintptr_t func_addr
def __hash__(self): def __hash__(self):
return PyObject_Hash(self.func_addr) return hash (self.func_addr)
def __cmp__(x, y): def __cmp__(x, y):
cdef uintptr_t a,b cdef uintptr_t a,b
...@@ -559,7 +559,8 @@ cdef class _profiler: ...@@ -559,7 +559,8 @@ cdef class _profiler:
""" """
cdef bench counter, main_bench, wait_bench cdef bench counter, main_bench, wait_bench
cdef public object charges, call_counts, bench_class, func_cache cdef public dict charges, call_counts, func_cache
cdef public object bench_class
cdef uint64_t start_ticks cdef uint64_t start_ticks
def __init__ (self, bench_class): def __init__ (self, bench_class):
...@@ -619,7 +620,7 @@ cdef class _profiler: ...@@ -619,7 +620,7 @@ cdef class _profiler:
self.counter.mark(<bench>top.b) self.counter.mark(<bench>top.b)
if PyDict_Contains (self.charges, <function>top.func): if self.charges.has_key (<function>top.func):
b = self.charges[<function>top.func] b = self.charges[<function>top.func]
else: else:
b = self.bench_class() b = self.bench_class()
...@@ -659,7 +660,7 @@ cdef class _profiler: ...@@ -659,7 +660,7 @@ cdef class _profiler:
:Return: :Return:
Returns a `call_counts_object`. Returns a `call_counts_object`.
""" """
if PyDict_Contains (self.call_counts, func) == 0: if not self.call_counts.has_key (func):
cc = call_counts_object() cc = call_counts_object()
self.call_counts[func] = cc self.call_counts[func] = cc
return cc return cc
...@@ -686,7 +687,7 @@ cdef class _profiler: ...@@ -686,7 +687,7 @@ cdef class _profiler:
Returns a `python_function` object. Returns a `python_function` object.
""" """
code_ptr = <intptr_t><void *> code code_ptr = <intptr_t><void *> code
if PyDict_Contains (self.func_cache, code_ptr): if self.func_cache.has_key (code_ptr):
return self.func_cache[code_ptr] return self.func_cache[code_ptr]
else: else:
func = new_python_function(code) func = new_python_function(code)
...@@ -705,7 +706,7 @@ cdef class _profiler: ...@@ -705,7 +706,7 @@ cdef class _profiler:
Returns a `c_function` object. Returns a `c_function` object.
""" """
c_func_ptr = <intptr_t>PyCFunction_GetFunction(c_func) c_func_ptr = <intptr_t>PyCFunction_GetFunction(c_func)
if PyDict_Contains (self.func_cache, c_func_ptr): if self.func_cache.has_key (c_func_ptr):
return self.func_cache[c_func_ptr] return self.func_cache[c_func_ptr]
else: else:
func = new_c_function(c_func) func = new_c_function(c_func)
......
# -*- Mode: Cython -*-
IF UNAME_SYSNAME == "Linux":
cdef extern from "stdint.h":
ctypedef unsigned char uint8_t
ctypedef unsigned short uint16_t
ctypedef unsigned int uint32_t
ELSE:
from libc cimport uint8_t, uint16_t, uint32_t
cdef extern from "netinet/in.h":
IF UNAME_SYSNAME == "Linux":
cdef struct in_addr:
uint32_t s_addr
cdef struct sockaddr_in:
short sin_family
unsigned short sin_port
in_addr sin_addr
char sin_zero[8]
ELSE:
pass
cdef extern from "sys/un.h":
IF UNAME_SYSNAME == "Linux":
cdef struct sockaddr_un:
short sun_family
char sun_path[104]
ELSE:
pass
cdef extern from "arpa/inet.h":
cdef enum:
INET_ADDRSTRLEN
INET6_ADDRSTRLEN
int htons (int)
int htonl (int)
int ntohl (int)
int ntohs (int)
cdef extern from "sys/socket.h":
int AF_UNSPEC, AF_INET, AF_INET6, AF_UNIX
int SOCK_STREAM, SOCK_DGRAM, SOL_SOCKET, INADDR_ANY
int SHUT_RD, SHUT_WR, SHUT_RDWR
int SO_DEBUG, SO_REUSEADDR, SO_KEEPALIVE, SO_DONTROUTE, SO_LINGER
int SO_BROADCAST, SO_OOBINLINE, SO_SNDBUF, SO_RCVBUF, SO_SNDLOWAT
int SO_RCVLOWAT, SO_SNDTIMEO, SO_RCVTIMEO, SO_TYPE, SO_ERROR
IF UNAME_SYSNAME == "FreeBSD":
int SO_REUSEPORT, SO_ACCEPTFILTER
int SO_DONTROUTE, SO_LINGER, SO_BROADCAST, SO_OOBINLINE, SO_SNDBUF
int SO_REUSEADDR, SO_DEBUG, SO_RCVBUF, SO_SNDLOWAT, SO_RCVLOWAT
int SO_SNDTIMEO, SO_RCVTIMEO, SO_KEEPALIVE, SO_TYPE, SO_ERROR
ctypedef unsigned int sa_family_t
ctypedef unsigned int in_port_t
ctypedef unsigned int in_addr_t
ctypedef unsigned int socklen_t
cdef struct in_addr:
in_addr_t s_addr
union ip__u6_addr:
uint8_t __u6_addr8[16]
uint16_t __u6_addr16[8]
uint32_t __u6_addr32[4]
struct in6_addr:
ip__u6_addr __u6_addr
IF UNAME_SYSNAME == "FreeBSD" or UNAME_SYSNAME == "Darwin":
cdef struct sockaddr:
unsigned char sa_len
sa_family_t sa_family
char sa_data[250]
cdef struct sockaddr_in:
unsigned char sin_len
sa_family_t sin_family
in_port_t sin_port
in_addr sin_addr
char sin_zero[8]
cdef struct sockaddr_in6:
unsigned char sin6_len
sa_family_t sin6_family
in_port_t sin6_port
unsigned int sin6_flowinfo
in6_addr sin6_addr
unsigned int sin6_scope_id
cdef struct sockaddr_un:
unsigned char sun_len
sa_family_t sun_family
char sun_path[104]
cdef struct sockaddr_storage:
unsigned char sa_len
sa_family_t sa_family
ELSE:
cdef struct sockaddr:
sa_family_t sa_family
char sa_data[250]
cdef struct sockaddr_in:
sa_family_t sin_family
unsigned short sin_port
in_addr sin_addr
char sa_data[250]
cdef struct sockaddr_in6:
sa_family_t sin6_family
unsigned short sin6_port
in6_addr sin6_addr
char sa_data[250]
cdef struct sockaddr_storage:
sa_family_t sa_family
char sa_data[250]
int socket (int domain, int type, int protocol)
int connect (int fd, sockaddr * addr, socklen_t addr_len)
int accept (int fd, sockaddr * addr, socklen_t * addr_len)
int bind (int fd, sockaddr * addr, socklen_t addr_len)
int listen (int fd, int backlog)
int shutdown (int fd, int how)
int close (int fd)
int getsockopt (int fd, int level, int optname, void * optval, socklen_t * optlen)
int setsockopt (int fd, int level, int optname, void * optval, socklen_t optlen)
int getpeername (int fd, sockaddr * name, socklen_t * namelen)
int getsockname (int fd, sockaddr * name, socklen_t * namelen)
int sendto (int fd, void * buf, size_t len, int flags, sockaddr * addr, socklen_t addr_len)
int send (int fd, void * buf, size_t len, int flags)
int recv (int fd, void * buf, size_t len, int flags)
int recvfrom (int fd, void * buf, size_t len, int flags, sockaddr * addr, socklen_t * addr_len)
int _c_socketpair "socketpair" (int d, int type, int protocol, int *sv)
int inet_pton (int af, char *src, void *dst)
char *inet_ntop (int af, void *src, char *dst, socklen_t size)
char * inet_ntoa (in_addr pin)
int inet_aton (char * cp, in_addr * pin)
cdef extern from "sys/uio.h":
cdef struct iovec:
void * iov_base
size_t iov_len
cdef extern from "unistd.h":
size_t write (int fd, char * buf, size_t nbytes)
size_t read (int fd, char * buf, size_t nbytes)
size_t writev(int d, iovec *iov, int iovcnt)
size_t readv (int d, iovec *iov, int iovcnt)
cdef extern from "fcntl.h":
int fcntl (int fd, int cmd, ...)
int F_GETFL, O_NONBLOCK, F_SETFL
cdef public class sock [ object sock_object, type sock_type ]:
cdef public int fd, orig_fd, domain, stype
#def __init__ (self, int domain=AF_INET, int stype=SOCK_STREAM, int protocol=0, int fd=-1)
cdef int _try_selfish(self) except -1
cdef _set_reuse_addr (self)
cdef set_nonblocking (self)
cdef parse_address (self, object address, sockaddr_storage * sa, socklen_t * addr_len, bint resolve=?)
cdef parse_address_inet (self, tuple address, sockaddr_storage * sa, socklen_t * addr_len, bint resolve)
cdef parse_address_inet6 (self, tuple address, sockaddr_storage * sa, socklen_t * addr_len, bint resolve)
cdef parse_address_unix (self, bytes address, sockaddr_storage * sa, socklen_t * addr_len, bint resolve)
cdef object unparse_address (self, sockaddr_storage *sa, socklen_t addr_len)
cdef _wait_for_read (self)
cdef _wait_for_write (self)
cpdef connect_addr (self, address, bint resolve=?)
cpdef connect (self, address)
cpdef recv (self, int buffer_size)
cpdef read (self, int buffer_size)
cpdef recvfrom (self, int buffer_size, int flags=?)
cpdef recv_exact (self, int bytes)
cpdef readv (self, list size_list)
cpdef writev (self, list data)
# XXX is there a cpython.type for buffer objects?
IF False:
cpdef recv_into (self, buffer, int nbytes=?, int flags=?)
cpdef recvfrom_into(self, buffer, int nbytes=?, int flags=?)
cpdef bind (self, address)
cpdef listen (self, int backlog)
cpdef accept (self)
cpdef accept_many (self, int max=?)
cpdef shutdown (self, int how)
cpdef getpeername (self)
cpdef getsockname (self)
cpdef dup(self)
cdef class file_sock (sock):
cdef object _fileobj
...@@ -31,147 +31,9 @@ __socket_version__ = "$Id: //prod/main/ap/shrapnel/coro/socket.pyx#57 $" ...@@ -31,147 +31,9 @@ __socket_version__ = "$Id: //prod/main/ap/shrapnel/coro/socket.pyx#57 $"
import socket as __socketmodule import socket as __socketmodule
IF UNAME_SYSNAME == "Linux": from cpython.int cimport PyInt_Check
cdef extern from "stdint.h": from cpython.bytes cimport PyBytes_Size
ctypedef unsigned char uint8_t from cpython.tuple cimport PyTuple_New, PyTuple_SET_ITEM, PyTuple_GET_ITEM
ctypedef unsigned short uint16_t
ctypedef unsigned int uint32_t
ELSE:
from libc cimport uint8_t, uint16_t, uint32_t
cdef extern from "netinet/in.h":
IF UNAME_SYSNAME == "Linux":
cdef struct in_addr:
uint32_t s_addr
cdef struct sockaddr_in:
short sin_family
unsigned short sin_port
in_addr sin_addr
char sin_zero[8]
ELSE:
pass
cdef extern from "sys/un.h":
IF UNAME_SYSNAME == "Linux":
cdef struct sockaddr_un:
short sun_family
char sun_path[104]
ELSE:
pass
cdef extern from "arpa/inet.h":
cdef enum:
INET_ADDRSTRLEN
INET6_ADDRSTRLEN
int htons (int)
int htonl (int)
int ntohl (int)
int ntohs (int)
cdef extern from "sys/socket.h":
int AF_UNSPEC, AF_INET, AF_INET6, AF_UNIX
int SOCK_STREAM, SOCK_DGRAM, SOL_SOCKET, INADDR_ANY
int SHUT_RD, SHUT_WR, SHUT_RDWR
int SO_DEBUG, SO_REUSEADDR, SO_KEEPALIVE, SO_DONTROUTE, SO_LINGER
int SO_BROADCAST, SO_OOBINLINE, SO_SNDBUF, SO_RCVBUF, SO_SNDLOWAT
int SO_RCVLOWAT, SO_SNDTIMEO, SO_RCVTIMEO, SO_TYPE, SO_ERROR
IF UNAME_SYSNAME == "FreeBSD":
int SO_REUSEPORT, SO_ACCEPTFILTER
int SO_DONTROUTE, SO_LINGER, SO_BROADCAST, SO_OOBINLINE, SO_SNDBUF
int SO_REUSEADDR, SO_DEBUG, SO_RCVBUF, SO_SNDLOWAT, SO_RCVLOWAT
int SO_SNDTIMEO, SO_RCVTIMEO, SO_KEEPALIVE, SO_TYPE, SO_ERROR
ctypedef unsigned int sa_family_t
ctypedef unsigned int in_port_t
ctypedef unsigned int in_addr_t
ctypedef unsigned int socklen_t
cdef struct in_addr:
in_addr_t s_addr
union ip__u6_addr:
uint8_t __u6_addr8[16]
uint16_t __u6_addr16[8]
uint32_t __u6_addr32[4]
struct in6_addr:
ip__u6_addr __u6_addr
IF UNAME_SYSNAME == "FreeBSD" or UNAME_SYSNAME == "Darwin":
cdef struct sockaddr:
unsigned char sa_len
sa_family_t sa_family
char sa_data[250]
cdef struct sockaddr_in:
unsigned char sin_len
sa_family_t sin_family
in_port_t sin_port
in_addr sin_addr
char sin_zero[8]
cdef struct sockaddr_in6:
unsigned char sin6_len
sa_family_t sin6_family
in_port_t sin6_port
unsigned int sin6_flowinfo
in6_addr sin6_addr
unsigned int sin6_scope_id
cdef struct sockaddr_un:
unsigned char sun_len
sa_family_t sun_family
char sun_path[104]
cdef struct sockaddr_storage:
unsigned char sa_len
sa_family_t sa_family
ELSE:
cdef struct sockaddr:
sa_family_t sa_family
char sa_data[250]
cdef struct sockaddr_in:
sa_family_t sin_family
unsigned short sin_port
in_addr sin_addr
char sa_data[250]
cdef struct sockaddr_in6:
sa_family_t sin6_family
unsigned short sin6_port
in6_addr sin6_addr
char sa_data[250]
cdef struct sockaddr_storage:
sa_family_t sa_family
char sa_data[250]
int socket (int domain, int type, int protocol)
int connect (int fd, sockaddr * addr, socklen_t addr_len)
int accept (int fd, sockaddr * addr, socklen_t * addr_len)
int bind (int fd, sockaddr * addr, socklen_t addr_len)
int listen (int fd, int backlog)
int shutdown (int fd, int how)
int close (int fd)
int getsockopt (int fd, int level, int optname, void * optval, socklen_t * optlen)
int setsockopt (int fd, int level, int optname, void * optval, socklen_t optlen)
int getpeername (int fd, sockaddr * name, socklen_t * namelen)
int getsockname (int fd, sockaddr * name, socklen_t * namelen)
int sendto (int fd, void * buf, size_t len, int flags, sockaddr * addr, socklen_t addr_len)
int send (int fd, void * buf, size_t len, int flags)
int recv (int fd, void * buf, size_t len, int flags)
int recvfrom (int fd, void * buf, size_t len, int flags, sockaddr * addr, socklen_t * addr_len)
int _c_socketpair "socketpair" (int d, int type, int protocol, int *sv)
int inet_pton (int af, char *src, void *dst)
char *inet_ntop (int af, void *src, char *dst, socklen_t size)
char * inet_ntoa (in_addr pin)
int inet_aton (char * cp, in_addr * pin)
cdef int min (int a, int b): cdef int min (int a, int b):
if a < b: if a < b:
...@@ -179,27 +41,12 @@ cdef int min (int a, int b): ...@@ -179,27 +41,12 @@ cdef int min (int a, int b):
else: else:
return b return b
cdef extern from "sys/uio.h":
cdef struct iovec:
void * iov_base
size_t iov_len
cdef extern from "unistd.h":
size_t write (int fd, char * buf, size_t nbytes)
size_t read (int fd, char * buf, size_t nbytes)
size_t writev(int d, iovec *iov, int iovcnt)
size_t readv (int d, iovec *iov, int iovcnt)
cdef extern from "fcntl.h":
int fcntl (int fd, int cmd, ...)
int F_GETFL, O_NONBLOCK, F_SETFL
# Number of socket objects. Note that this also includes closed socket objects. # Number of socket objects. Note that this also includes closed socket objects.
cdef int live_sockets cdef int live_sockets
live_sockets = 0 live_sockets = 0
cdef _readv_compute(size_list, buffer_tuple, int n, int received, iovec * iov, cdef _readv_compute(list size_list, tuple buffer_tuple, int n, int received, iovec * iov,
int * left, int * iov_pos, int * complete_index, int * partial_size): int * left, int * iov_pos, int * complete_index, int * partial_size):
"""Compute the IO Vector for the readv method. """Compute the IO Vector for the readv method.
...@@ -225,13 +72,14 @@ cdef _readv_compute(size_list, buffer_tuple, int n, int received, iovec * iov, ...@@ -225,13 +72,14 @@ cdef _readv_compute(size_list, buffer_tuple, int n, int received, iovec * iov,
""" """
cdef int i cdef int i
cdef int size cdef int size
cdef bytes buffer
iov_pos[0] = 0 iov_pos[0] = 0
left[0] = 0 left[0] = 0
complete_index[0] = -1 complete_index[0] = -1
partial_size[0] = -1 partial_size[0] = -1
for i from 0 <= i < n: for i from 0 <= i < n:
size = PySequence_GetItem(size_list, i) size = size_list[i]
buffer = PyTuple_GET_ITEM(buffer_tuple, i) buffer = buffer_tuple[i]
Py_INCREF(buffer) Py_INCREF(buffer)
if received >= left[0] + size: if received >= left[0] + size:
# This buffer has been completely received. # This buffer has been completely received.
...@@ -239,18 +87,17 @@ cdef _readv_compute(size_list, buffer_tuple, int n, int received, iovec * iov, ...@@ -239,18 +87,17 @@ cdef _readv_compute(size_list, buffer_tuple, int n, int received, iovec * iov,
elif received > left[0]: elif received > left[0]:
# This buffer has been partially received. # This buffer has been partially received.
partial_size[0] = (received - left[0]) partial_size[0] = (received - left[0])
iov[iov_pos[0]].iov_base = <void *> PyString_AS_STRING(buffer) + partial_size[0] iov[iov_pos[0]].iov_base = <void *> buffer + partial_size[0]
iov[iov_pos[0]].iov_len = size - partial_size[0] iov[iov_pos[0]].iov_len = size - partial_size[0]
iov_pos[0] = iov_pos[0] + 1 iov_pos[0] = iov_pos[0] + 1
else: else:
# This buffer still needs data. # This buffer still needs data.
iov[iov_pos[0]].iov_base = <void *> PyString_AS_STRING(buffer) iov[iov_pos[0]].iov_base = <void *> buffer
iov[iov_pos[0]].iov_len = size iov[iov_pos[0]].iov_len = size
iov_pos[0] = iov_pos[0] + 1 iov_pos[0] = iov_pos[0] + 1
left[0] = left[0] + size left[0] = left[0] + size
left[0] = left[0] - received left[0] = left[0] - received
cdef public class sock [ object sock_object, type sock_type ]: cdef public class sock [ object sock_object, type sock_type ]:
""" """
...@@ -277,7 +124,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -277,7 +124,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
:ivar stype: The socket type (SOCK_STREAM, SOCK_DGRAM) :ivar stype: The socket type (SOCK_STREAM, SOCK_DGRAM)
""" """
cdef public int fd, orig_fd, domain, stype #cdef public int fd, orig_fd, domain, stype
def __init__ (self, int domain=AF_INET, int stype=SOCK_STREAM, int protocol=0, int fd=-1): def __init__ (self, int domain=AF_INET, int stype=SOCK_STREAM, int protocol=0, int fd=-1):
global live_sockets global live_sockets
...@@ -372,6 +219,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -372,6 +219,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
""" """
cdef int flag, r cdef int flag, r
cdef socklen_t flagsize cdef socklen_t flagsize
cdef bytes s
if buflen == 0: if buflen == 0:
flag = 0 flag = 0
flagsize = sizeof (flag) flagsize = sizeof (flag)
...@@ -381,12 +229,12 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -381,12 +229,12 @@ cdef public class sock [ object sock_object, type sock_type ]:
else: else:
return flag return flag
else: else:
s = PyString_FromStringAndSize (NULL, buflen) s = PyBytes_FromStringAndSize (NULL, buflen)
r = getsockopt (self.fd, level, optname, <void*>PyString_AS_STRING (s), &buflen) r = getsockopt (self.fd, level, optname, <void*>s, &buflen)
if r == -1: if r == -1:
raise_oserror() raise_oserror()
else: else:
return PyString_FromStringAndSize (PyString_AS_STRING (s), buflen) return PyBytes_FromStringAndSize (s, buflen)
def setsockopt (self, int level, int optname, value): def setsockopt (self, int level, int optname, value):
"""Set a socket option. """Set a socket option.
...@@ -403,8 +251,8 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -403,8 +251,8 @@ cdef public class sock [ object sock_object, type sock_type ]:
flag = value flag = value
r = setsockopt (self.fd, level, optname, <void*>&flag, sizeof (flag)) r = setsockopt (self.fd, level, optname, <void*>&flag, sizeof (flag))
else: else:
optlen = PyString_Size (value) # does typecheck optlen = PyBytes_Size (value) # does typecheck
r = setsockopt (self.fd, level, optname, <void*>PyString_AS_STRING (value), optlen) r = setsockopt (self.fd, level, optname, <void*>value, optlen)
if r == -1: if r == -1:
raise_oserror() raise_oserror()
...@@ -427,7 +275,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -427,7 +275,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
else: else:
raise_oserror() raise_oserror()
def send (self, data): def send (self, bytes data):
"""Send data on the socket. """Send data on the socket.
This will repeatedly call write to ensure all data has been sent. This This will repeatedly call write to ensure all data has been sent. This
...@@ -444,8 +292,8 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -444,8 +292,8 @@ cdef public class sock [ object sock_object, type sock_type ]:
cdef int r, left, sent cdef int r, left, sent
sent = 0 sent = 0
left = PyString_Size (data) left = len(data)
buffer = PyString_AS_STRING (data) buffer = data
while left > 0: while left > 0:
if self._try_selfish() == 1: if self._try_selfish() == 1:
r = write (self.fd, buffer, left) r = write (self.fd, buffer, left)
...@@ -481,7 +329,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -481,7 +329,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
""" """
return self.send(data) return self.send(data)
def sendto (self, data, address, int flags=0): def sendto (self, bytes data, address, int flags=0):
"""Send data to a specific address. """Send data to a specific address.
:param data: The data to send. :param data: The data to send.
...@@ -507,7 +355,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -507,7 +355,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
buffer = data buffer = data
while 1: while 1:
if self._try_selfish() == 1: if self._try_selfish() == 1:
r = sendto (self.fd, buffer, PyString_Size (data), flags, <sockaddr*>&sa, addr_len) r = sendto (self.fd, buffer, len(data), flags, <sockaddr*>&sa, addr_len)
else: else:
r = -1 r = -1
libc.errno = libc.EWOULDBLOCK libc.errno = libc.EWOULDBLOCK
...@@ -521,7 +369,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -521,7 +369,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
else: else:
return r return r
def recv (self, int buffer_size): cpdef recv (self, int buffer_size):
"""Receive data. """Receive data.
This may return less data than you request if the socket buffer is not This may return less data than you request if the socket buffer is not
...@@ -535,14 +383,14 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -535,14 +383,14 @@ cdef public class sock [ object sock_object, type sock_type ]:
:raises OSError: OS-level error. :raises OSError: OS-level error.
""" """
cdef buffer cdef bytes buffer
cdef int r, new_buffer_size cdef int r, new_buffer_size
buffer = PyString_FromStringAndSize (NULL, buffer_size) buffer = PyBytes_FromStringAndSize (NULL, buffer_size)
while 1: while 1:
if self._try_selfish() == 1: if self._try_selfish() == 1:
#r = recv (self.fd, PyString_AS_STRING (buffer), buffer_size, 0) #r = recv (self.fd, buffer, buffer_size, 0)
r = read (self.fd, PyString_AS_STRING (buffer), buffer_size) r = read (self.fd, buffer, buffer_size)
else: else:
r = -1 r = -1
libc.errno = libc.EWOULDBLOCK libc.errno = libc.EWOULDBLOCK
...@@ -551,7 +399,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -551,7 +399,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
# kqueue will tell us exactly how many bytes are waiting for us. # kqueue will tell us exactly how many bytes are waiting for us.
new_buffer_size = min (self._wait_for_read(), buffer_size) new_buffer_size = min (self._wait_for_read(), buffer_size)
if new_buffer_size != buffer_size: if new_buffer_size != buffer_size:
buffer = PyString_FromStringAndSize (NULL, new_buffer_size) buffer = PyBytes_FromStringAndSize (NULL, new_buffer_size)
buffer_size = new_buffer_size buffer_size = new_buffer_size
else: else:
raise_oserror() raise_oserror()
...@@ -560,16 +408,16 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -560,16 +408,16 @@ cdef public class sock [ object sock_object, type sock_type ]:
elif r == buffer_size: elif r == buffer_size:
return buffer return buffer
else: else:
return PyString_FromStringAndSize (PyString_AS_STRING (buffer), r) return PyBytes_FromStringAndSize (buffer, r)
def read (self, buffer_size): cpdef read (self, int buffer_size):
"""Read data. """Read data.
This is an alias for the :meth:`recv` method. This is an alias for the :meth:`recv` method.
""" """
return self.recv(buffer_size) return self.recv (buffer_size)
def recvfrom (self, int buffer_size, int flags=0): cpdef recvfrom (self, int buffer_size, int flags=0):
"""Receive data. """Receive data.
This may return less data than you request if the socket buffer is not This may return less data than you request if the socket buffer is not
...@@ -587,24 +435,17 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -587,24 +435,17 @@ cdef public class sock [ object sock_object, type sock_type ]:
:raises OSError: OS-level error. :raises OSError: OS-level error.
""" """
cdef buffer cdef bytes buffer
cdef sockaddr_storage sa cdef sockaddr_storage sa
cdef int r, new_buffer_size cdef int r, new_buffer_size
cdef socklen_t addr_len cdef socklen_t addr_len
buffer = PyString_FromStringAndSize (NULL, buffer_size) buffer = PyBytes_FromStringAndSize (NULL, buffer_size)
while 1: while 1:
if self._try_selfish() == 1: if self._try_selfish() == 1:
addr_len = sizeof (sockaddr_storage) addr_len = sizeof (sockaddr_storage)
memset (&sa, 0, sizeof (sockaddr_storage)) memset (&sa, 0, sizeof (sockaddr_storage))
r = recvfrom ( r = recvfrom (self.fd, <void*>buffer, buffer_size, flags, <sockaddr*>&sa, &addr_len)
self.fd,
PyString_AS_STRING (buffer),
buffer_size,
flags,
<sockaddr*>&sa,
&addr_len
)
else: else:
r = -1 r = -1
libc.errno = libc.EWOULDBLOCK libc.errno = libc.EWOULDBLOCK
...@@ -613,7 +454,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -613,7 +454,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
# kqueue will tell us exactly how many bytes are waiting for us. # kqueue will tell us exactly how many bytes are waiting for us.
new_buffer_size = min (self._wait_for_read(), buffer_size) new_buffer_size = min (self._wait_for_read(), buffer_size)
if new_buffer_size != buffer_size: if new_buffer_size != buffer_size:
buffer = PyString_FromStringAndSize (NULL, new_buffer_size) buffer = PyBytes_FromStringAndSize (NULL, new_buffer_size)
buffer_size = new_buffer_size buffer_size = new_buffer_size
else: else:
raise_oserror() raise_oserror()
...@@ -624,10 +465,10 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -624,10 +465,10 @@ cdef public class sock [ object sock_object, type sock_type ]:
elif r == buffer_size: elif r == buffer_size:
result = buffer result = buffer
else: else:
result = PyString_FromStringAndSize (PyString_AS_STRING (buffer), r) result = PyBytes_FromStringAndSize (buffer, r)
return (result, address) return (result, address)
def recv_exact (self, int bytes): cpdef recv_exact (self, int nbytes):
"""Receive exactly the number of bytes requested. """Receive exactly the number of bytes requested.
This will repeatedly call read until all data is received. This will repeatedly call read until all data is received.
...@@ -642,13 +483,14 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -642,13 +483,14 @@ cdef public class sock [ object sock_object, type sock_type ]:
""" """
cdef char * p, * p0 cdef char * p, * p0
cdef int r cdef int r
cdef bytes buffer
buffer = PyString_FromStringAndSize (NULL, bytes) buffer = PyBytes_FromStringAndSize (NULL, nbytes)
p = PyString_AS_STRING (buffer) p = buffer
p0 = p p0 = p
while bytes: while nbytes:
if self._try_selfish() == 1: if self._try_selfish() == 1:
r = read (self.fd, p, bytes) r = read (self.fd, p, nbytes)
else: else:
r = -1 r = -1
libc.errno = libc.EWOULDBLOCK libc.errno = libc.EWOULDBLOCK
...@@ -658,13 +500,13 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -658,13 +500,13 @@ cdef public class sock [ object sock_object, type sock_type ]:
else: else:
raise_oserror() raise_oserror()
elif r == 0: elif r == 0:
raise EOFError, PyString_FromStringAndSize (buffer, p - p0) raise EOFError, PyBytes_FromStringAndSize (buffer, p - p0)
else: else:
bytes = bytes - r nbytes -= r
p = p + r p = p + r
return buffer return buffer
def readv (self, size_list): cpdef readv (self, list size_list):
"""Read a vector array of data. """Read a vector array of data.
This will repeatedly call readv until all data is received. If the end This will repeatedly call readv until all data is received. If the end
...@@ -688,17 +530,19 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -688,17 +530,19 @@ cdef public class sock [ object sock_object, type sock_type ]:
cdef int left cdef int left
cdef int complete_index cdef int complete_index
cdef int partial_size cdef int partial_size
cdef bytes buffer, new_buffer
cdef tuple buffer_tuple, new_buffer_tuple
received = 0 received = 0
n = PySequence_Size(size_list) n = len (size_list)
iov = <iovec *> PyMem_Malloc(sizeof(iovec) * n) iov = <iovec *> PyMem_Malloc(sizeof(iovec) * n)
# Prepare string buffers in which to read the result. # Prepare string buffers in which to read the result.
buffer_tuple = PyTuple_New(n) buffer_tuple = PyTuple_New(n)
for i from 0 <= i < n: for i from 0 <= i < n:
size = PySequence_GetItem(size_list, i) size = size_list[i]
buffer = PyString_FromStringAndSize(NULL, size) buffer = PyBytes_FromStringAndSize(NULL, size)
PyTuple_SET_ITEM(buffer_tuple, i, buffer) buffer_tuple[i] = buffer
Py_INCREF(buffer) Py_INCREF(buffer)
try: try:
...@@ -731,7 +575,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -731,7 +575,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
# Unfortunately can't call _PyString_Resize in Pyrex. :( # Unfortunately can't call _PyString_Resize in Pyrex. :(
if complete_index == -1: if complete_index == -1:
new_buffer_tuple = PyTuple_New(1) new_buffer_tuple = PyTuple_New(1)
buffer = PyTuple_GET_ITEM(buffer_tuple, 0) buffer = buffer_tuple[0]
Py_INCREF(buffer) Py_INCREF(buffer)
new_buffer = buffer[:partial_size] new_buffer = buffer[:partial_size]
PyTuple_SET_ITEM(new_buffer_tuple, 0, new_buffer) PyTuple_SET_ITEM(new_buffer_tuple, 0, new_buffer)
...@@ -739,11 +583,11 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -739,11 +583,11 @@ cdef public class sock [ object sock_object, type sock_type ]:
else: else:
new_buffer_tuple = PyTuple_New(complete_index + 2) new_buffer_tuple = PyTuple_New(complete_index + 2)
for i from 0 <= i <= complete_index: for i from 0 <= i <= complete_index:
buffer = PyTuple_GET_ITEM(buffer_tuple, i) buffer = buffer_tuple[i]
Py_INCREF(buffer) Py_INCREF(buffer)
PyTuple_SET_ITEM(new_buffer_tuple, i, buffer) PyTuple_SET_ITEM(new_buffer_tuple, i, buffer)
Py_INCREF(buffer) Py_INCREF(buffer)
buffer = PyTuple_GET_ITEM(buffer_tuple, complete_index+1) buffer = buffer_tuple[complete_index+1]
Py_INCREF(buffer) Py_INCREF(buffer)
new_buffer = buffer[:partial_size] new_buffer = buffer[:partial_size]
PyTuple_SET_ITEM(new_buffer_tuple, complete_index+1, new_buffer) PyTuple_SET_ITEM(new_buffer_tuple, complete_index+1, new_buffer)
...@@ -757,7 +601,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -757,7 +601,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
finally: finally:
PyMem_Free(iov) PyMem_Free(iov)
def writev (self, data): cpdef writev (self, list data):
"""Write a vector array of data. """Write a vector array of data.
This will repeatedly call writev until all data is sent. If it is This will repeatedly call writev until all data is sent. If it is
...@@ -774,9 +618,10 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -774,9 +618,10 @@ cdef public class sock [ object sock_object, type sock_type ]:
cdef int r, left, size, sent cdef int r, left, size, sent
cdef int n, i, j cdef int n, i, j
cdef iovec * iov cdef iovec * iov
cdef bytes elem
sent = 0 sent = 0
n = PySequence_Size (data) n = len (data)
iov = <iovec *> PyMem_Malloc (sizeof (iovec) * n) iov = <iovec *> PyMem_Malloc (sizeof (iovec) * n)
try: try:
...@@ -787,8 +632,8 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -787,8 +632,8 @@ cdef public class sock [ object sock_object, type sock_type ]:
i = 0 i = 0
j = 0 j = 0
while i < n: while i < n:
elem = PySequence_GetItem (data, i) elem = data[i]
size = PyString_Size (elem) size = len (elem)
# three cases: # three cases:
# [--------][XXXXXXXX][-----------] # [--------][XXXXXXXX][-----------]
# 3 | 2 | 1 # 3 | 2 | 1
...@@ -799,12 +644,12 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -799,12 +644,12 @@ cdef public class sock [ object sock_object, type sock_type ]:
pass pass
elif sent > left: elif sent > left:
# 2) this buffer contains <sent> # 2) this buffer contains <sent>
iov[j].iov_base = (PyString_AS_STRING (elem)) + (sent - left) iov[j].iov_base = <void*>(<char*>elem + (sent - left))
iov[j].iov_len = size - (sent - left) iov[j].iov_len = size - (sent - left)
j = j + 1 j = j + 1
else: else:
# 3) this buffer is after <sent> # 3) this buffer is after <sent>
iov[j].iov_base = PyString_AS_STRING (elem) iov[j].iov_base = <void*><char*>elem
iov[j].iov_len = size iov[j].iov_len = size
j = j + 1 j = j + 1
left = left + size left = left + size
...@@ -830,107 +675,170 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -830,107 +675,170 @@ cdef public class sock [ object sock_object, type sock_type ]:
finally: finally:
PyMem_Free (iov) PyMem_Free (iov)
def recv_into(self, buffer, int nbytes=0, int flags=0): # SMR 20120717 temporarily disabling this code, cython does not expose this part
"""Receive data into a Python buffer. # of the 'old' buffer interface. Looks like there's a new one, and the necessary
# compatibility will require updating this code.
IF False:
cpdef recv_into (self, buffer, int nbytes=0, int flags=0):
"""Receive data into a Python buffer.
This is for the Python buffer interface. If you don't know what that This is for the Python buffer interface. If you don't know what that
is, move along. This method is for Python socket compatibility. is, move along. This method is for Python socket compatibility.
:param buffer: A writeable Python buffer object. Must be a contiguous :param buffer: A writeable Python buffer object. Must be a contiguous
segment. segment.
:param nbytes: Number of bytes to read. Must be less than or equal to :param nbytes: Number of bytes to read. Must be less than or equal to
the size of the buffer. Defaults to 0 which means the size of the size of the buffer. Defaults to 0 which means the size of
``buffer``. ``buffer``.
:param flags: Flags for the recv system call (see recv(2) manpage). :param flags: Flags for the recv system call (see recv(2) manpage).
Defaults to 0. Defaults to 0.
:returns: The number of bytes read. :returns: The number of bytes read.
:raises OSError: OS-level error. :raises OSError: OS-level error.
""" """
cdef void *cbuf cdef void *cbuf
cdef Py_ssize_t cbuflen cdef Py_ssize_t cbuflen
cdef int r cdef int r
if nbytes < 0: if nbytes < 0:
raise ValueError('negative buffersize in recv_into') raise ValueError('negative buffersize in recv_into')
PyObject_AsWriteBuffer(buffer, &cbuf, &cbuflen) PyObject_AsWriteBuffer(buffer, &cbuf, &cbuflen)
if nbytes == 0: if nbytes == 0:
nbytes = cbuflen nbytes = cbuflen
if cbuflen < nbytes: if cbuflen < nbytes:
raise ValueError('buffer too small for requested bytes') raise ValueError('buffer too small for requested bytes')
while 1: while 1:
if self._try_selfish() == 1: if self._try_selfish() == 1:
r = recv(self.fd, cbuf, nbytes, flags) r = recv(self.fd, cbuf, nbytes, flags)
else:
r = -1
libc.errno = libc.EWOULDBLOCK
if r == -1:
if libc.errno == libc.EWOULDBLOCK:
self._wait_for_read()
else: else:
raise_oserror() r = -1
else: libc.errno = libc.EWOULDBLOCK
return r if r == -1:
if libc.errno == libc.EWOULDBLOCK:
self._wait_for_read()
else:
raise_oserror()
else:
return r
def recvfrom_into(self, buffer, int nbytes=0, int flags=0): cpdef recvfrom_into(self, buffer, int nbytes=0, int flags=0):
"""Receive data into a Python buffer. """Receive data into a Python buffer.
This is for the Python buffer interface. If you don't know what that This is for the Python buffer interface. If you don't know what that
is, move along. This method is for Python socket compatibility. is, move along. This method is for Python socket compatibility.
:param buffer: A writeable Python buffer object. Must be a contiguous :param buffer: A writeable Python buffer object. Must be a contiguous
segment. segment.
:param nbytes: Number of bytes to read. Must be less than or equal to :param nbytes: Number of bytes to read. Must be less than or equal to
the size of the buffer. Defaults to 0 which means the size of the size of the buffer. Defaults to 0 which means the size of
``buffer``. ``buffer``.
:param flags: Flags for the recv system call (see recvfrom(2) manpage). :param flags: Flags for the recv system call (see recvfrom(2) manpage).
Defaults to 0. Defaults to 0.
:returns: A tuple ``(nbytes, address)`` where ``bytes`` is the number :returns: A tuple ``(nbytes, address)`` where ``bytes`` is the number
of bytes read and ``address`` then it is the address of the remote of bytes read and ``address`` then it is the address of the remote
side. side.
:raises OSError: OS-level error. :raises OSError: OS-level error.
""" """
cdef sockaddr_storage sa cdef sockaddr_storage sa
cdef void *cbuf cdef void *cbuf
cdef Py_ssize_t cbuflen cdef Py_ssize_t cbuflen
cdef ssize_t nread cdef ssize_t nread
cdef socklen_t addr_len cdef socklen_t addr_len
cdef int r cdef int r
if nbytes < 0: if nbytes < 0:
raise ValueError('negative buffersize in recv_into') raise ValueError('negative buffersize in recv_into')
PyObject_AsWriteBuffer(buffer, &cbuf, &cbuflen) PyObject_AsWriteBuffer(buffer, &cbuf, &cbuflen)
if nbytes == 0: if nbytes == 0:
nbytes = cbuflen nbytes = cbuflen
if cbuflen < nbytes: if cbuflen < nbytes:
raise ValueError('buffer too small for requested bytes') raise ValueError('buffer too small for requested bytes')
while 1: while 1:
if self._try_selfish() == 1: if self._try_selfish() == 1:
addr_len = sizeof(sockaddr_storage) addr_len = sizeof(sockaddr_storage)
memset(&sa, 0, sizeof(sockaddr_storage)) memset(&sa, 0, sizeof(sockaddr_storage))
r = recvfrom(self.fd, cbuf, nbytes, flags, <sockaddr*>&sa, &addr_len) r = recvfrom(self.fd, cbuf, nbytes, flags, <sockaddr*>&sa, &addr_len)
else:
r = -1
libc.errno = libc.EWOULDBLOCK
if r == -1:
if libc.errno == libc.EWOULDBLOCK:
self._wait_for_read()
else: else:
raise_oserror() r = -1
libc.errno = libc.EWOULDBLOCK
if r == -1:
if libc.errno == libc.EWOULDBLOCK:
self._wait_for_read()
else:
raise_oserror()
else:
address = self.unparse_address(&sa, addr_len)
return r, address
cdef parse_address_inet (self, tuple address, sockaddr_storage * sa, socklen_t * addr_len, bint resolve):
cdef sockaddr_in * sin = <sockaddr_in *>sa
cdef bytes ip
cdef uint16_t port
ip, port = address
if not ip:
ip = b'0.0.0.0'
sin.sin_family = AF_INET
IF UNAME_SYSNAME == "FreeBSD":
sin.sin_len = sizeof (sockaddr_in)
addr_len[0] = sizeof (sockaddr_in)
sin.sin_port = htons(port)
r = inet_pton (AF_INET, ip, &sin.sin_addr)
if r != 1:
if resolve:
# recurse
self.parse_address_inet (
(the_resolver.resolve_ipv4 (ip), port), sa, addr_len, False
)
else:
raise ValueError ("not a valid IPv4 address")
cdef parse_address_inet6 (self, tuple address, sockaddr_storage * sa, socklen_t * addr_len, bint resolve):
cdef sockaddr_in6 * sin6 = <sockaddr_in6 *> sa
cdef bytes ip
cdef uint16_t port
ip, port = address
if not ip:
ip = b'::'
sin6.sin6_family = AF_INET6
IF UNAME_SYSNAME == "FreeBSD":
sin6.sin6_len = sizeof(sockaddr_in6)
addr_len[0] = sizeof(sockaddr_in6)
sin6.sin6_port = htons(port)
r = inet_pton(AF_INET6, ip, &sin6.sin6_addr)
if r != 1:
if resolve:
# recurse
self.parse_address_inet6 (
(the_resolver.resolve_ipv6 (ip), port), sa, addr_len, False
)
else: else:
address = self.unparse_address(&sa, addr_len) raise ValueError ("not a valid IPv6 address")
return r, address
cdef parse_address_unix (self, bytes address, sockaddr_storage * sa, socklen_t * addr_len, bint resolve):
cdef sockaddr_un * sun
# AF_UNIX
# +1 to grab the NUL char
l = len (address) + 1
sun = <sockaddr_un *>sa
sun.sun_family = AF_UNIX
IF UNAME_SYSNAME == "FreeBSD":
sun.sun_len = sizeof (sockaddr_un)
if (l < sizeof (sun.sun_path)):
memcpy (<void *>sun.sun_path, <void*><char*>address, l)
addr_len[0] = sizeof (sockaddr_un)
else:
raise ValueError, "name too long"
cdef parse_address (self, object address, sockaddr_storage * sa, socklen_t * addr_len, bint resolve=False): cdef parse_address (self, object address, sockaddr_storage * sa, socklen_t * addr_len, bint resolve=False):
"""Parse a Python socket address and set the C structure values. """Parse a Python socket address and set the C structure values.
...@@ -950,66 +858,14 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -950,66 +858,14 @@ cdef public class sock [ object sock_object, type sock_type ]:
cdef sockaddr_in6 *sin6 cdef sockaddr_in6 *sin6
cdef sockaddr_un * sun cdef sockaddr_un * sun
cdef int r, l cdef int r, l
if PyTuple_Check (address) and PyTuple_Size (address) == 2: if self.domain == AF_INET:
# AF_INET and AF_INET6 return self.parse_address_inet (address, sa, addr_len, resolve)
sin = <sockaddr_in *>sa elif self.domain == AF_INET6:
sin6 = <sockaddr_in6 *>sa return self.parse_address_inet6 (address, sa, addr_len, resolve)
ip = PySequence_GetItem(address, 0) elif self.domain == AF_UNIX:
port = PySequence_GetItem(address, 1) return self.parse_address_unix (address, sa, addr_len, resolve)
if not PyString_Check (ip):
raise ValueError, "IP address must be a string"
if PyString_Size (ip) == 0:
if self.domain == AF_INET:
ip = "0.0.0.0"
elif self.domain == AF_INET6:
ip = "::"
else:
raise ValueError, "Unsupported address family: %d" % self.domain
if self.domain == AF_INET:
sin.sin_family = AF_INET
IF UNAME_SYSNAME == "FreeBSD":
sin.sin_len = sizeof(sockaddr_in)
addr_len[0] = sizeof(sockaddr_in)
sin.sin_port = htons(port)
r = inet_pton(AF_INET, PyString_AsString(ip), &sin.sin_addr)
if r != 1:
if resolve:
return self.parse_address (
(the_resolver.resolve_ipv4 (ip), port), sa, addr_len, False
)
else:
raise ValueError ("not a valid IPv4 address")
elif self.domain == AF_INET6:
sin6.sin6_family = AF_INET6
IF UNAME_SYSNAME == "FreeBSD":
sin6.sin6_len = sizeof(sockaddr_in6)
addr_len[0] = sizeof(sockaddr_in6)
sin6.sin6_port = htons(port)
r = inet_pton(AF_INET6, PyString_AsString(ip), &sin6.sin6_addr)
if r != 1:
if resolve:
return self.parse_address (
(the_resolver.resolve_ipv6 (ip), port), sa, addr_len, False
)
else:
raise ValueError ("not a valid IPv6 address")
else:
raise ValueError, "Unsupported address family: %d" % self.domain
elif PyString_Check (address) and address[0] == '/':
# AF_UNIX
# +1 to grab the NUL char
l = PyString_Size (address) + 1
sun = <sockaddr_un *>sa
sun.sun_family = AF_UNIX
IF UNAME_SYSNAME == "FreeBSD":
sun.sun_len = sizeof (sockaddr_un)
if (l < sizeof (sun.sun_path)):
memcpy (<void *>sun.sun_path, PyString_AS_STRING (address), l)
addr_len[0] = sizeof (sockaddr_un)
else:
raise ValueError, "name too long"
else: else:
raise ValueError, "can't interpret argument as socket address" raise ValueError, "can't parse address for this socket domain"
cdef object unparse_address (self, sockaddr_storage *sa, socklen_t addr_len): cdef object unparse_address (self, sockaddr_storage *sa, socklen_t addr_len):
"""Unpack a C-socket address structure and generate a Python address object. """Unpack a C-socket address structure and generate a Python address object.
...@@ -1030,11 +886,11 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -1030,11 +886,11 @@ cdef public class sock [ object sock_object, type sock_type ]:
if (<sockaddr_in *>sa).sin_family == AF_INET: if (<sockaddr_in *>sa).sin_family == AF_INET:
sin = <sockaddr_in *> sa sin = <sockaddr_in *> sa
inet_ntop (AF_INET, &(sin.sin_addr), ascii_buf, INET_ADDRSTRLEN) inet_ntop (AF_INET, &(sin.sin_addr), ascii_buf, INET_ADDRSTRLEN)
return (PyString_FromString(ascii_buf), ntohs(sin.sin_port)) return (ascii_buf, ntohs(sin.sin_port))
elif (<sockaddr_in6 *>sa).sin6_family == AF_INET6: elif (<sockaddr_in6 *>sa).sin6_family == AF_INET6:
sin6 = <sockaddr_in6 *> sa sin6 = <sockaddr_in6 *> sa
inet_ntop (AF_INET6, &(sin6.sin6_addr), ascii_buf, INET6_ADDRSTRLEN) inet_ntop (AF_INET6, &(sin6.sin6_addr), ascii_buf, INET6_ADDRSTRLEN)
return (PyString_FromString(ascii_buf), ntohs(sin6.sin6_port)) return (ascii_buf, ntohs(sin6.sin6_port))
elif (<sockaddr_un *>sa).sun_family == AF_UNIX: elif (<sockaddr_un *>sa).sun_family == AF_UNIX:
sun = <sockaddr_un *>sa sun = <sockaddr_un *>sa
return sun.sun_path return sun.sun_path
...@@ -1102,7 +958,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -1102,7 +958,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
cpdef connect (self, address): cpdef connect (self, address):
return self.connect_addr (address, True) return self.connect_addr (address, True)
def bind (self, address): cpdef bind (self, address):
"""Bind the socket. """Bind the socket.
:param address: The address to bind to. For IP, it should be a :param address: The address to bind to. For IP, it should be a
...@@ -1124,7 +980,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -1124,7 +980,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
if r == -1: if r == -1:
raise_oserror() raise_oserror()
def listen (self, backlog): cpdef listen (self, int backlog):
"""Set the socket to listen for connections. """Set the socket to listen for connections.
:param backlog: The maximum size of the queue for pending connections. :param backlog: The maximum size of the queue for pending connections.
...@@ -1136,7 +992,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -1136,7 +992,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
if r == -1: if r == -1:
raise_oserror() raise_oserror()
def accept (self): cpdef accept (self):
"""Accept a connection. """Accept a connection.
:returns: A tuple ``(socket, address)`` where ``socket`` is a socket :returns: A tuple ``(socket, address)`` where ``socket`` is a socket
...@@ -1171,7 +1027,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -1171,7 +1027,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
self.unparse_address (&sa, addr_len) self.unparse_address (&sa, addr_len)
) )
def accept_many (self, int max=0): cpdef accept_many (self, int max=0):
"""Accept multiple connections. """Accept multiple connections.
This will accept up to ``max`` connections for any connections available This will accept up to ``max`` connections for any connections available
...@@ -1191,6 +1047,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -1191,6 +1047,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
cdef int r, count cdef int r, count
cdef int upper_limit cdef int upper_limit
cdef coro me cdef coro me
cdef list result
count = 0 count = 0
result = PyList_New(max) result = PyList_New(max)
if max == 0: if max == 0:
...@@ -1215,13 +1072,13 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -1215,13 +1072,13 @@ cdef public class sock [ object sock_object, type sock_type ]:
self.unparse_address (&sa, addr_len) self.unparse_address (&sa, addr_len)
) )
if max == 0: if max == 0:
PyList_Append (result, element) result.append (element)
else: else:
PyList_SetItem_SAFE (result, count, element) result[count] = element
count = count + 1 count = count + 1
return result return result
def shutdown (self, int how): cpdef shutdown (self, int how):
"""Shutdown the socket. """Shutdown the socket.
:param how: How to shut down the socket (see the shutdown(2) manpage). :param how: How to shut down the socket (see the shutdown(2) manpage).
...@@ -1235,7 +1092,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -1235,7 +1092,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
else: else:
return None return None
def getpeername (self): cpdef getpeername (self):
"""Get the remote-side address. """Get the remote-side address.
:returns: A ``(IP, port)`` tuple for IP addresses where IP is a :returns: A ``(IP, port)`` tuple for IP addresses where IP is a
...@@ -1255,7 +1112,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -1255,7 +1112,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
else: else:
return self.unparse_address (&sa, addr_len) return self.unparse_address (&sa, addr_len)
def getsockname (self): cpdef getsockname (self):
"""Get the local address of the socket. """Get the local address of the socket.
:returns: A ``(IP, port)`` tuple for IP addresses where IP is a :returns: A ``(IP, port)`` tuple for IP addresses where IP is a
...@@ -1300,7 +1157,7 @@ cdef public class sock [ object sock_object, type sock_type ]: ...@@ -1300,7 +1157,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
# platform issue). # platform issue).
return __socketmodule._fileobject(self.dup(), mode, bufsize, True) return __socketmodule._fileobject(self.dup(), mode, bufsize, True)
def dup(self): cpdef dup(self):
"""Duplicate the socket object using the OS dup() call. """Duplicate the socket object using the OS dup() call.
:returns: A new sock instance that holds the new file descriptor. :returns: A new sock instance that holds the new file descriptor.
...@@ -1421,7 +1278,7 @@ cdef class file_sock(sock): ...@@ -1421,7 +1278,7 @@ cdef class file_sock(sock):
When the object is deallocated, the file descriptor is closed. When the object is deallocated, the file descriptor is closed.
""" """
cdef object _fileobj #cdef object _fileobj
def __init__(self, fileobj): def __init__(self, fileobj):
# we need to keep the original file object around, because if its # we need to keep the original file object around, because if its
......
...@@ -25,6 +25,8 @@ __sync_version__ = "$Id: //prod/main/ap/shrapnel/coro/sync.pyx#29 $" ...@@ -25,6 +25,8 @@ __sync_version__ = "$Id: //prod/main/ap/shrapnel/coro/sync.pyx#29 $"
# Note: this file is included by <coro.pyx> # Note: this file is included by <coro.pyx>
from cpython.list cimport PyList_New
# ================================================================================ # ================================================================================
# synchronization primitives # synchronization primitives
# ================================================================================ # ================================================================================
...@@ -757,11 +759,12 @@ cdef class fifo: ...@@ -757,11 +759,12 @@ cdef class fifo:
empty. empty.
""" """
cdef int i cdef int i
cdef list result
while self.fifo.size == 0: while self.fifo.size == 0:
self.cv.wait() self.cv.wait()
result = [None] * self.fifo.size result = [None] * self.fifo.size
i = 0 i = 0
while self.fifo.size: while self.fifo.size:
PySequence_SetItem (result, i, self.fifo._pop()) result[i] = self.fifo._pop()
i = i + 1 i = i + 1
return result return result
# -*- Mode: Cython -*-
cimport zlib
cdef class zstack:
cdef zlib.z_stream squish, unsquish
# this buffer is only used for compression
cdef unsigned char * buffer
cdef int buffer_size
cdef size_t deflate (self, void * base, size_t size)
cdef size_t inflate (self, void * dst, size_t dsize, void * src, size_t ssize)
...@@ -6,10 +6,6 @@ class ZlibError (Exception): ...@@ -6,10 +6,6 @@ class ZlibError (Exception):
cimport zlib cimport zlib
cdef class zstack: cdef class zstack:
cdef zlib.z_stream squish, unsquish
# this buffer is only used for compression
cdef unsigned char * buffer
cdef int buffer_size
def __init__ (self, int size=4*1024*1024): def __init__ (self, int size=4*1024*1024):
cdef int r cdef int r
self.squish.zalloc = NULL self.squish.zalloc = NULL
......
...@@ -20,7 +20,6 @@ except ImportError: ...@@ -20,7 +20,6 @@ except ImportError:
) )
sys.exit (-1) sys.exit (-1)
include_dir = os.getcwd() include_dir = os.getcwd()
def newer(x, y): def newer(x, y):
...@@ -47,6 +46,14 @@ def check_lio(): ...@@ -47,6 +46,14 @@ def check_lio():
status = os.system('test/build/test_lio') status = os.system('test/build/test_lio')
return exit_ok(status) return exit_ok(status)
compile_time_env = {
'COMPILE_LIO': check_lio(),
'COMPILE_NETDEV' : False,
'COMPILE_LZO' : False,
'COMPILE_LZ4' : False,
'CORO_DEBUG': False,
}
setup ( setup (
name='coro', name='coro',
version='1.0.2-000', version='1.0.2-000',
...@@ -69,40 +76,34 @@ setup ( ...@@ -69,40 +76,34 @@ setup (
'coro._coro', 'coro._coro',
['coro/_coro.pyx', 'coro/swap.c'], ['coro/_coro.pyx', 'coro/swap.c'],
extra_compile_args = ['-Wno-unused-function'], extra_compile_args = ['-Wno-unused-function'],
depends=(glob.glob('coro/*.pyx') + depends = (
glob.glob('coro/*.pxi') + glob.glob('coro/*.pyx') +
glob.glob('coro/*.pxd') + glob.glob('coro/*.pxi') +
[os.path.join(include_dir, 'pyrex', 'python.pxi'), glob.glob('coro/*.pxd') + [
os.path.join(include_dir, 'pyrex', 'pyrex_helpers.pyx'), os.path.join(include_dir, 'pyrex', 'python.pxi'),
os.path.join(include_dir, 'include', 'pyrex_helpers.h'), os.path.join(include_dir, 'pyrex', 'pyrex_helpers.pyx'),
os.path.join(include_dir, 'pyrex', os.path.join(include_dir, 'include', 'pyrex_helpers.h'),
'tsc_time_include.pyx'), os.path.join(include_dir, 'pyrex', 'tsc_time_include.pyx'),
os.path.join(include_dir, 'include', 'tsc_time.h'), os.path.join(include_dir, 'include', 'tsc_time.h'),
os.path.join(include_dir, 'pyrex', 'libc.pxd'), os.path.join(include_dir, 'pyrex', 'libc.pxd'),
] ]
), ),
pyrex_include_dirs=[ pyrex_include_dirs=[
os.path.join(include_dir, '.'), os.path.join(include_dir, '.'),
os.path.join(include_dir, 'pyrex'), os.path.join(include_dir, 'pyrex'),
], ],
#include_dirs=[os.path.join(include_dir, 'pyrex')],
include_dirs=[ include_dirs=[
os.path.join(include_dir, '.'), os.path.join(include_dir, '.'),
os.path.join(include_dir, 'include'), os.path.join(include_dir, 'include'),
], ],
#pyrex_compile_time_env={'COMPILE_LIO': check_lio(), pyrex_compile_time_env = compile_time_env,
# 'CORO_DEBUG': True, # to enable LZO|LZ4 for stack compression, set COMPILE_LZO|COMPILE_LZ4 above
# },
# to enable LZO|LZ4 for stack compression, set COMPILE_LZO|COMPILE_LZ4 in coro/_coro.pyx
# and uncomment one of the following: # and uncomment one of the following:
#libraries=['lzo2', 'z'] #libraries=['lzo2', 'z']
#libraries=['lz4', 'z'], #libraries=['lz4', 'z'],
libraries=['z'] libraries=['z']
), ),
Extension( Extension ('coro.oserrors', ['coro/oserrors.pyx', ],),
'coro.oserrors',
['coro/oserrors.pyx', ],
),
Extension ('coro.dns.packet', ['coro/dns/packet.pyx', ],), Extension ('coro.dns.packet', ['coro/dns/packet.pyx', ],),
Extension ('coro.asn1.ber', ['coro/asn1/ber.pyx'],), Extension ('coro.asn1.ber', ['coro/asn1/ber.pyx'],),
Extension ('coro.ldap.query', ['coro/ldap/query.pyx'],), Extension ('coro.ldap.query', ['coro/ldap/query.pyx'],),
...@@ -118,7 +119,7 @@ setup ( ...@@ -118,7 +119,7 @@ setup (
], ],
packages=['coro', 'coro.clocks', 'coro.http', 'coro.dns', 'coro.emulation'], packages=['coro', 'coro.clocks', 'coro.http', 'coro.dns', 'coro.emulation'],
package_dir = { package_dir = {
# '': 'coroutine', # '': 'coroutine',
'coro': 'coro', 'coro': 'coro',
'coro.clocks': 'coro/clocks', 'coro.clocks': 'coro/clocks',
'coro.dns': 'coro/dns', 'coro.dns': 'coro/dns',
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment