Commit 5187b9ce authored by Sam Rushing's avatar Sam Rushing

Merge branch 'master' of github.com:ironport/shrapnel

parents 720d3c22 44a4699b
......@@ -5,5 +5,7 @@ build/
coro/_coro.[ch]
coro/oserrors.[ch]
coro/clocks/tsc_time.c
coro/dns/packet.c
coro/event_queue.cpp
*.pyc
*.so
......@@ -11,7 +11,7 @@ connections.
API Documentation
=================
See http://ironport.github.com/api/shrapnel/frames.html
See http://ironport.github.com/shrapnel/
Short History Of Python Coroutine Implementations
......
......@@ -20,113 +20,7 @@
# $Header: //prod/main/ap/shrapnel/coro/__init__.py#31 $
"""Coroutine threading library.
Introduction
============
Shrapnel is a cooperative threading library.
Getting Started
===============
When your process starts up, you must spawn a thread to do some work, and then
start the event loop. The event loop runs forever processing events until the
process exits. An example::
import coro
def main():
print 'Hello world!'
# This will cause the process to exit.
coro.set_exit(0)
coro.spawn(main)
coro.event_loop()
Coroutines
==========
Every coroutine thread is created with either the `new` function (which does
NOT automatically start the thread) or the `spawn` function (which DOES
automatically start it).
Every thread has a unique numeric ID. You may also set the name of the thread
when you create it.
Timeouts
========
The shrapnel timeout facility allows you to execute a function which will be
interrupted if it does not finish within a specified period of time. The
`coro.TimeoutError` exception will be raised if the timeout expires. See the
`with_timeout` docstring for more detail.
If the event loop is not running (such as in a non-coro process), a custom
version of `with_timeout` is installed that will operate using SIGALRM so that
you may use `with_timeout` in code that needs to run in non-coro processes
(though this is not recommended and should be avoided if possible).
Thread Local Storage
====================
There is a thread-local storage interface available for storing global data that
is thread-specific. You instantiate a `ThreadLocal` instance and you can
assign attributes to it that will be specific to that thread. See the
`ThreadLocal` docs for more detail.
Signal Handlers
===============
By default when you start the event loop, two signal handlers are installed
(for SIGTERM and SIGINT). The default signal handler will exit the event loop.
You can change this behavior by setting `install_signal_handlers` to False
before starting the event loop.
See `coro.signal_handler` for more detail on setting coro signal handlers.
Selfishness
===========
Certain socket operations are allowed to try to execute without blocking if
they are able to (such as send/receiving data on a local socket or on a
high-speed network). However, there is a limit to the number of times a thread
is allowed to do this. The default is 4. The default may be changed
(`set_selfishness`) and the value on a per-thread may be changed
(`coro.coro.set_max_selfish_acts`).
Time
====
Shrapnel uses the `tsc_time` module for handling time. It uses the TSC
value for a stable and high-resolution unit of time. See that module's
documentation for more detail.
A thread is always created when you start the event loop that will
resynchronize the TSC relationship to accomodate any clock drift (see
`tick_updater` and `tsc_time.update_time_relation`).
Exception Notifier
==================
When a thread exits due to an exception, by default a stack trace is printed to
stderr. You may install your own callback to handle this situation. See the
`set_exception_notifier` function for more detail.
Debug Output
============
The shrapnel library provides a mechanism for printing debug information to
stderr. The `print_stderr` function will print a string with a timestamp
and the thread number. The `write_stderr` function writes the string verbatim.
Shrapnel keeps a reference to the "real" stderr (in `saved_stderr`) and the
`print_stderr` and `write_stderr` functions always use the real stderr value. A
particular reason for doing this is the backdoor module replaces sys.stderr and
sys.stdout, but we do not want debug output to go to the interactive session.
Profiling
=========
Shrapnel has its own profiler that is coro-aware. See `coro.profiler` for
details on how to run the profiler.
:Variables:
- `all_threads`: A dictionary of all live coroutine objects. The key is
the coroutine ID, and the value is the coroutine object.
- `saved_stderr`: The actual stderr object for the process. This normally
should not be used. An example of why this exists is because the
backdoor replaces sys.stderr while executing user code.
"""
"""Coroutine threading library."""
from coro._coro import *
from coro._coro import _yield
......@@ -166,13 +60,12 @@ set_exception_notifier (default_exception_notifier)
class InParallelError (Exception):
"""An error occurred in the `in_parallel` function.
"""An error occurred in the :func:`in_parallel` function.
:IVariables:
- `result_list`: A list of ``(status, result)`` tuples. ``status`` is
either `SUCCESS` or `FAILURE`. For success, the result is the return
:ivar result_list: A list of ``(status, result)`` tuples. ``status`` is
either :data:`SUCCESS` or :data:`FAILURE`. For success, the result is the return
value of the function. For failure, it is the output from
`sys.exc_info`.
``sys.exc_info``.
"""
def __init__(self, result_list):
......@@ -195,17 +88,14 @@ def in_parallel (fun_arg_list):
This will block until all functions have returned or raised an exception.
If one or more functions raises an exception, then the `InParallelError`
If one or more functions raises an exception, then the :exc:`InParallelError`
exception will be raised.
:Parameters:
- `fun_arg_list`: A list of ``(fun, args)`` tuples.
:param fun_arg_list: A list of ``(fun, args)`` tuples.
:Return:
Returns a list of return values from the functions.
:returns: A list of return values from the functions.
:Exceptions:
- `InParallelError`: One or more of the functions raised an exception.
:raises InParallelError: One or more of the functions raised an exception.
"""
# InParallelError, [(SUCCESS, result0), (FAILURE, exc_info1), ...]
......@@ -257,14 +147,11 @@ def tick_updater():
def waitpid (pid):
"""Wait for a process to exit.
:Parameters:
- `pid`: The process ID to wait for.
:param pid: The process ID to wait for.
:Return:
Returns a tuple ``(pid, status)`` of the process.
:returns: A tuple ``(pid, status)`` of the process.
:Exceptions:
- `SimultaneousError`: Something is already waiting for this process
:raises SimultaneousError: Something is already waiting for this process
ID.
"""
if UNAME == "Linux":
......@@ -290,14 +177,11 @@ def waitpid (pid):
def get_thread_by_id (thread_id):
"""Get a coro thread by ID.
:Parameters:
- `thread_id`: The thread ID.
:param thread_id: The thread ID.
:Return:
Returns the coroutine object.
:returns: The coroutine object.
:Exceptions:
- `KeyError`: The coroutine does not exist.
:raises KeyError: The coroutine does not exist.
"""
return all_threads[thread_id]
......@@ -305,11 +189,9 @@ def where (co):
"""Return a string indicating where the given coroutine thread is currently
running.
:Parameters:
- `co`: The coroutine object.
:param co: The coroutine object.
:Return:
Returns a string displaying where the coro thread is currently
:returns: A string displaying where the coro thread is currently
executing.
"""
f = co.get_frame()
......@@ -318,8 +200,7 @@ def where (co):
def where_all():
"""Get a dictionary of where all coroutines are currently executing.
:Return:
Returns a dictionary mapping the coroutine ID to a tuple of ``(name,
:returns: A dictionary mapping the coroutine ID to a tuple of ``(name,
coro, where)`` where ``where`` is a string representing where the
coroutine is currently running.
"""
......@@ -339,13 +220,11 @@ def spawn (fun, *args, **kwargs):
Additional arguments and keyword arguments will be passed to the given function.
:Parameters:
- `fun`: The function to call when the coroutine starts.
- `thread_name`: The name of the thread. Defaults to the name of the
:param fun: The function to call when the coroutine starts.
:param thread_name: The name of the thread. Defaults to the name of the
function.
:Return:
Returns the new coroutine object.
:returns: The new coroutine object.
"""
if kwargs.has_key('thread_name'):
thread_name = kwargs['thread_name']
......@@ -364,13 +243,11 @@ def new (fun, *args, **kwargs):
This will not start the coroutine. Call the ``start`` method on the
coroutine to schedule it to run.
:Parameters:
- `fun`: The function to call when the coroutine starts.
- `thread_name`: The name of the thread. Defaults to the name of the
:param fun: The function to call when the coroutine starts.
:param thread_name: The name of the thread. Defaults to the name of the
function.
:Return:
Returns the new coroutine object.
:returns: The new coroutine object.
"""
if kwargs.has_key('thread_name'):
thread_name = kwargs['thread_name']
......@@ -457,8 +334,7 @@ event_loop_is_running = False
def coro_is_running():
"""Determine if the coro event loop is running.
:Return:
Returns True if the event loop is running, otherwise False.
:returns: True if the event loop is running, otherwise False.
"""
return event_loop_is_running
......@@ -468,8 +344,7 @@ def sigterm_handler (*_unused_args):
def event_loop (timeout=30):
"""Start the event loop.
:Parameters:
- `timeout`: The amount of time to wait for kevent to return
:param timeout: The amount of time to wait for kevent to return
events. You should probably *not* set this value.
"""
global event_loop_is_running, with_timeout, sleep_relative
......
# Copyright (c) 2002-2011 IronPort Systems and Cisco Systems
#
# Copyright (c) 2002-2011 IronPort Systems and Cisco Systems
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
......@@ -19,14 +19,13 @@
# SOFTWARE.
# -*- Mode: Pyrex -*-
#cython: embedsignature=True
"""Pyrex module for coroutine implementation.
Module variables defined below are available only from Pyrex. Python-accessible
variables are documented in the top level of the coro package ``__init__.py``.
:Variables:
- `_ticks_per_sec`: Number of CPU ticks per second (uint64_t).
"""
__coro_version__ = "$Id: //prod/main/ap/shrapnel/coro/_coro.pyx#114 $"
......@@ -82,6 +81,7 @@ cdef extern from "Python.h":
# ================================================================================
# global variables
# ================================================================================
# Number of CPU ticks per second (uint64_t).
cdef uint64_t _ticks_per_sec
cdef object ticks_per_sec
_ticks_per_sec = tsc_time_module.ticks_per_sec
......@@ -155,10 +155,6 @@ cdef enum:
import sys
class Exit (Exception):
"exception used to exit the event loop"
pass
class ScheduleError (Exception):
"attempt to schedule an already-scheduled coroutine"
pass
......@@ -206,6 +202,12 @@ cdef int default_selfishness
default_selfishness = 4
def set_selfishness(n):
"""Set the global default selfishness limit.
This sets the default for every new coroutine.
:param n: The new limit.
"""
global default_selfishness
default_selfishness = n
......@@ -214,11 +216,10 @@ live_coros = 0
cdef public class coro [ object _coro_object, type _coro_type ]:
"""XXX
"""The coroutine object.
:IVariables:
- `top`: A `call_stack` object used by the profiler. NULL if the
profiler is not enabled or if this is the first call of the coroutine.
Do not create this object directly. Use either :func:`new` or
:func:`spawn` to create one.
"""
cdef machine_state state
......@@ -232,7 +233,8 @@ cdef public class coro [ object _coro_object, type _coro_type ]:
cdef size_t stack_size
cdef PyFrameObject * frame
cdef void * saved_exception_data[6]
# used only by the profiler
# used only by the profiler, a call_stack object. NULL if the profiler is
# not enabled or if this is the first call of the coroutine.
cdef call_stack * top
cdef int saved_recursion_depth
cdef int selfish_acts, max_selfish_acts
......@@ -488,45 +490,39 @@ cdef public class coro [ object _coro_object, type _coro_type ]:
cdef _schedule (self, value):
"""Schedule this coroutine to run.
:Parameters:
- `value`: The value to resume the coroutine with. Note that
"interrupting" the coroutine resumes it with a special
`exception` value which is checked when this coro is resumed.
:param value: The value to resume the coroutine with. Note that
"interrupting" the coroutine resumes it with a special
``exception`` value which is checked when this coro is resumed.
:Exceptions:
- `DeadCoroutine`: The coroutine is dead (it has already exited).
- `ScheduleError`: The coroutine is already scheduled to run.
- `ScheduleError`: Attempted to schedule the currently running coro.
:raises DeadCoroutine: The coroutine is dead (it has already exited).
:raises ScheduleError: The coroutine is already scheduled to run.
:raises ScheduleError: Attempted to schedule the currently running coro.
"""
the_scheduler._schedule (self, value)
cdef _unschedule (self):
"""Unschedule this coroutine.
:Return:
Returns True if it was successfully unscheduled, False if not.
:returns: True if it was successfully unscheduled, False if not.
"""
return the_scheduler._unschedule (self)
def schedule (self, value=None):
"""Schedule this coroutine to run.
:Parameters:
- `value`: The value to resume the coroutine with. Defaults to
None.
:param value: The value to resume the coroutine with. Defaults to
None.
:Exceptions:
- `DeadCoroutine`: The coroutine is dead (it has already exited).
- `ScheduleError`: The coroutine is already scheduled to run.
- `ScheduleError`: Attempted to schedule the currently running coro.
:raises DeadCoroutine: The coroutine is dead (it has already exited).
:raises ScheduleError: The coroutine is already scheduled to run.
:raises ScheduleError: Attempted to schedule the currently running coro.
"""
return self._schedule (value)
def start (self):
"""Start the coroutine for the first time.
:Exceptions:
- `ScheduleError`: The coro is already started.
:raises ScheduleError: The coro is already started.
"""
if self.started:
raise ScheduleError(self)
......@@ -549,20 +545,18 @@ cdef public class coro [ object _coro_object, type _coro_type ]:
cdef __interrupt (self, the_exception):
"""Schedule the coro to resume with an exception.
:Parameters:
- `the_exception`: The exception to raise (may be class or instance).
:param the_exception: The exception to raise (may be class or instance).
:Exceptions:
- `DeadCoroutine`: The coroutine is dead (it has already exited).
- `ScheduleError`: The coroutine is already scheduled to run.
- `ScheduleError`: Attempted to interrupt the currently running coro.
:raises DeadCoroutine: The coroutine is dead (it has already exited).
:raises ScheduleError: The coroutine is already scheduled to run.
:raises ScheduleError: Attempted to interrupt the currently running coro.
"""
self._schedule (exception (the_exception))
def shutdown (self):
"""Shut down this coroutine.
This will raise the `Shutdown` exception on this thread.
This will raise the :exc:`Shutdown` exception on this thread.
This method will not fail. If the thread is already dead, then it is
ignored. If the thread hasn't started, then it is canceled.
......@@ -573,22 +567,19 @@ cdef public class coro [ object _coro_object, type _coro_type ]:
def raise_exception (self, the_exception, force=True, cancel_start=False):
"""Schedule this coroutine to resume with an exception.
:Parameters:
- `the_exception`: The exception to raise. May be an Exception
class or instance.
- `force`: If True, will force the exception to be raised, even if
:param the_exception: The exception to raise. May be an Exception class or instance.
:param force: If True, will force the exception to be raised, even if
the coroutine is already scheduled. Defaults to True.
- `cancel_start`: If True, will cancel the coroutine if it has not
:param cancel_start: If True, will cancel the coroutine if it has not
started, yet. If False, and the couroutine has not started, then
it will rise `NotStartedError`. Defaults to False.
it will rise :exc:`NotStartedError`. Defaults to False.
:Exceptions:
- `DeadCoroutine`: The coroutine is dead (it has already exited).
- `ScheduleError`: The coroutine is already scheduled to run (and
`force` was set to False).
- `ScheduleError`: Attempted to raise an exception on the currently
:raises DeadCoroutine: The coroutine is dead (it has already exited).
:raises ScheduleError: The coroutine is already scheduled to run (and
``force`` was set to False).
:raises ScheduleError: Attempted to raise an exception on the currently
running coro.
- `NotStartedError`: The coroutine has not started, yet.
:raises NotStartedError: The coroutine has not started, yet.
"""
IF CORO_DEBUG:
# Minstack coro used to take an "exception value" as the second
......@@ -645,8 +636,7 @@ cdef public class coro [ object _coro_object, type _coro_type ]:
def set_name (self, name):
"""Set the name of this coroutine thread.
:Parameters:
- `name`: The name of the thread.
:param name: The name of the thread.
"""
self.name = name
return self
......@@ -660,8 +650,7 @@ cdef public class coro [ object _coro_object, type _coro_type ]:
If no name has been specified, then a name is generated.
:Return:
Returns the coroutine name.
:returns: The coroutine name.
"""
return self.name
......@@ -679,8 +668,7 @@ cdef public class coro [ object _coro_object, type _coro_type ]:
When a coroutine is created, it defaults to 4.
:Parameters:
- `maximum`: The maximum number of selfish acts.
:param maximum: The maximum number of selfish acts.
"""
if maximum > 32768:
raise ValueError('Value too large.')
......@@ -719,8 +707,7 @@ def get_live_coros():
Note that this includes coroutines that have not started or have exited,
but not deallocated, yet.
:Return:
Returns the number of live coroutine objects.
:returns: The number of live coroutine objects.
"""
global live_coros
return live_coros
......@@ -757,11 +744,9 @@ def set_exception_notifier (new_func):
due to an exception. The default exception notifier simply prints the name
of the coroutine and a traceback of where the exception was raised.
:Parameters:
- `new_func`: The exception notifier to call. It takes no arguments.
:param new_func: The exception notifier to call. It takes no arguments.
:Return:
Returns the old exception notifier.
:returns: The old exception notifier.
"""
global exception_notifier
old_func = exception_notifier
......@@ -867,12 +852,11 @@ class SimultaneousError (Exception):
"""Two threads attempted a conflicting blocking operation (e.g., read() on
the same descriptor).
:IVariables:
- `co`: The coroutine that is trying to block on an event.
- `other`: The coroutine or function that is already waiting on the
:ivar co: The coroutine that is trying to block on an event.
:ivar other: The coroutine or function that is already waiting on the
event.
- `event`: The event that it is trying to block on. For kqueue, this
is normally a `kevent_key` object.
:ivar event: The event that it is trying to block on. For kqueue, this
is normally a ``kevent_key`` object.
"""
def __init__(self, co, other, object event):
......@@ -1000,11 +984,9 @@ cdef public class sched [ object sched_object, type sched_type ]:
cdef _unschedule (self, coro co):
"""Unschedule this coroutine.
:Parameters:
- `co`: The coroutine to unschedule.
:param co: The coroutine to unschedule.
:Return:
Returns True if it was successfully unscheduled, False if not.
:returns: True if it was successfully unscheduled, False if not.
"""
cdef int i
for i from 0 <= i < len(self.pending):
......@@ -1038,15 +1020,12 @@ cdef public class sched [ object sched_object, type sched_type ]:
The default latency warning threshold is 0.2 seconds. This will allow
you to change the threshold by multiplying the 0.2 value.
:Parameters:
- `factor`: The latency threshold multiplier. May be a number from
:param factor: The latency threshold multiplier. May be a number from
0 to 300. A value of 0 disables latency warnings.
:Return:
Returns the old multipler factor.
:returns: The old multipler factor.
:Exceptions:
- `ValueError`: The factor is too small or too large.
:raises ValueError: The factor is too small or too large.
"""
if factor < 0 or factor > 300:
raise ValueError('Latency factor must be a number from 0 to 300.')
......@@ -1075,25 +1054,22 @@ cdef public class sched [ object sched_object, type sched_type ]:
Nested timeouts will be handled correctly. If an outer timeout fires
first, then only the outer ``except TimeoutError`` exception handler
will catch it. An exception handlers on the inside will be skipped
becaue the actual exception is the `Interrupted` exception until it
becaue the actual exception is the :exc:`Interrupted` exception until it
gets to the original ``with_timeout`` frame.
Nested timeouts that are set to fire at the exact same time are not
defined which one will fire first.
Care must be taken to *never* catch the `Interrupted` exception within
Care must be taken to *never* catch the :exc:`Interrupted` exception within
code that is wrapped with a timeout.
:Parameters:
- `delta`: The number of seconds to wait before raising a timeout.
:param delta: The number of seconds to wait before raising a timeout.
Should be >= 0. Negative value will be treated as 0.
- `function`: The function to call.
:param function: The function to call.
:Return:
Returns the return value of the function.
:returns: The return value of the function.
:Exceptions:
- `TimeoutError`: The function did not return within the specified
:raises TimeoutError: The function did not return within the specified
timeout.
"""
cdef timebomb tb
......@@ -1127,8 +1103,7 @@ cdef public class sched [ object sched_object, type sched_type ]:
cdef sleep (self, uint64_t when):
"""Sleep until a specific point in time.
:Parameters:
- `when`: The TSC value when you want the coroutine to wake up.
:param when: The TSC value when you want the coroutine to wake up.
"""
cdef event e
IF CORO_DEBUG:
......@@ -1150,8 +1125,7 @@ cdef public class sched [ object sched_object, type sched_type ]:
Your thread may continue running (with the interrupt rescheduled to try
again later), or it may be interrupted.
:Parameters:
- `delta`: The number of seconds to sleep.
:param delta: The number of seconds to sleep.
"""
cdef uint64_t when
# Two lines to avoid Pyrex Python conversion.
......@@ -1160,7 +1134,10 @@ cdef public class sched [ object sched_object, type sched_type ]:
self.sleep (when)
def sleep_absolute (self, uint64_t when):
"""This is an alias for the `sleep` method."""
"""Sleep until a specific point in time.
:param when: The TSC value when you want the coroutine to wake up.
"""
self.sleep (when)
cdef schedule_ready_events (self, uint64_t now):
......@@ -1231,8 +1208,7 @@ cdef public class sched [ object sched_object, type sched_type ]:
def event_loop (self, timeout=30):
"""Start the event loop.
:Parameters:
- `timeout`: The amount of time to wait for kevent to return
:param timeout: The amount of time to wait for kevent to return
events. You should probably *not* set this value. Defaults to 30
seconds.
"""
......@@ -1301,11 +1277,10 @@ def yield_slice():
def schedule (coro co, value=None):
"""Schedule a coroutine to run.
See `coro.schedule` for more detail.
See :meth:`coro.schedule` for more detail.
:Parameters:
- `co`: The coroutine to schedule.
- `value`: The value to resume the coroutine with. Defaults to None.
:param co: The coroutine to schedule.
:param value: The value to resume the coroutine with. Defaults to None.
"""
return co._schedule (value)
......@@ -1355,8 +1330,7 @@ def print_stderr (s):
This will print the thread id, followed by a timestamp, followed by the
string. If the string does not end with a newline, one will be added.
:Parameters:
- `s`: A string to print.
:param s: A string to print.
"""
try:
current_thread = current()
......@@ -1394,11 +1368,9 @@ def spawn (fun, *args, **kwargs):
Additional arguments and keyword arguments will be passed to the given function.
:Parameters:
- `fun`: The function to call when the coroutine starts.
:param fun: The function to call when the coroutine starts.
:Return:
Returns the new coroutine object.
:returns: The new coroutine object.
"""
return _spawn (fun, args, kwargs)
......@@ -1410,11 +1382,9 @@ def new (fun, *args, **kwargs):
This will not start the coroutine. Call the ``start`` method on the
coroutine to schedule it to run.
:Parameters:
- `fun`: The function to call when the coroutine starts.
:param fun: The function to call when the coroutine starts.
:Return:
Returns the new coroutine object.
:returns: The new coroutine object.
"""
id = get_coro_id()
co = coro (fun, args, kwargs, id)
......@@ -1427,8 +1397,7 @@ def set_exit(exit_code=0):
Note that if any other coroutines are scheduled to run, they will be given
a chance to run before exiting.
:Parameters:
- `exit_code`: The exit code of the process. Defaults to 0.
:param exit_code: The exit code of the process. Defaults to 0.
"""
global _exit
global _exit_code
......@@ -1441,8 +1410,7 @@ def set_print_exit_string(val):
By default, the string will be printed.
:Parameters:
- `val`: Whether or not "Exiting" should be printed when the event loop
:param val: Whether or not "Exiting" should be printed when the event loop
exits.
"""
global _print_exit_string
......
......@@ -67,7 +67,7 @@ cdef extern from "sys/epoll.h":
int EPOLLHUP, EPOLLPRI, EPOLLERR, EPOLLET, EPOLLONESHOT, EPOLLRDHUP
int EPOLLIN, EPOLLOUT
cdef struct fake_epoll_event:
cdef struct shrapnel_epoll_event:
uint32_t events
epoll_data_t data
int op
......@@ -75,6 +75,7 @@ cdef struct fake_epoll_event:
int err
cdef int SECS_TO_MILLISECS = 1000
cdef double NSECS_TO_MILLISECS = 1000000.0
class EV:
......@@ -105,31 +106,6 @@ cdef enum:
EVENT_STATUS_FIRED
EVENT_STATUS_ABORTED
# - ONESHOT: Used to indicate that this target should be removed from the
# event map after it fires.
# - CLOSED: For file-descriptor events, this indicates that the file
# descriptor has been closed, and that the event key has been
# removed from the event map.
cdef enum:
TARGET_FLAG_ONESHOT = 1
TARGET_CLOSED = 2
cdef class event_target:
cdef public int status
cdef public int index
cdef public object target
cdef public int flags
def __cinit__ (self, target, int index):
self.status = EVENT_STATUS_NEW
self.index = index
self.target = target
self.flags = 0
def __repr__ (self):
return '<event_target status=%r index=%r target=%r flags=%r>' % (self.status,
self.index, self.target, self.flags)
cdef class py_event:
"""Representation of a epoll event.
......@@ -146,7 +122,7 @@ cdef class py_event:
# cinit cannot take a C struct.
# It would be nice to support fast-constructor semantics in Cython.
cdef __fast_init__ (self, fake_epoll_event *e):
cdef __fast_init__ (self, shrapnel_epoll_event *e):
self.events = e.events
self.data.fd = e.data.fd
self.flags = e.flags
......@@ -196,20 +172,10 @@ cdef class event_key:
cdef public class queue_poller [ object queue_poller_object, type queue_poller_type ]:
cdef fake_epoll_event * change_list
cdef int change_list_index
cdef int ep_fd
cdef object event_map
def __cinit__ (self):
# XXX EVENT_SCALE should be a parameter.
self.change_list = <fake_epoll_event *>PyMem_Malloc (sizeof (fake_epoll_event) * EVENT_SCALE)
def __dealloc__ (self):
PyMem_Free (self.change_list)
def __init__ (self):
self.change_list_index = 0
self.ep_fd = -1
self.event_map = {}
......@@ -224,28 +190,60 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
self.ep_fd = -1
cdef object set_wait_for (self, event_key ek):
cdef fake_epoll_event *e
cdef coro me
cdef event_target et
if self.change_list_index < EVENT_SCALE:
if PyDict_Contains (self.event_map, ek):
# Should be impossible to have KeyError due to previous line.
et = self.event_map[ek]
raise SimultaneousError (the_scheduler._current, et.target, ek)
else:
me = the_scheduler._current
target = event_target (me, self.change_list_index)
target.flags = TARGET_FLAG_ONESHOT
self.event_map[ek] = target
e = &(self.change_list[self.change_list_index])
e.data.fd = ek.fd
e.op = EPOLL_CTL_MOD
e.events = ek.events
e.flags = EPOLLONESHOT
self.change_list_index = self.change_list_index + 1
return target
cdef unsigned flag = 0
if PyDict_Contains (self.event_map, ek):
# Should be impossible to have KeyError due to previous line.
et = self.event_map[ek]
raise SimultaneousError (the_scheduler._current, et.target, ek)
else:
raise SystemError, "too many events in change_list"
ek1 = event_key (EPOLLOUT, ek.fd)
ek2 = event_key (EPOLLIN, ek.fd)
if ( (PyDict_Contains (self.event_map, ek2) and ek.events == EPOLLOUT) or
(PyDict_Contains (self.event_map, ek1) and ek.events == EPOLLIN)):
flags = EPOLLOUT | EPOLLIN | EPOLLET
else:
flags = EPOLLET
me = the_scheduler._current
target = me
self.event_map[ek] = target
self._register_event(ek, flags)
return target
def set_handler (self, object event, object handler, int flags=0, unsigned int fflags=0):
return
cdef notify_of_close (self, int fd):
return
cdef _register_event(self, event_key ek, unsigned int flags):
cdef int r
cdef epoll_event org_e
org_e.data.fd = ek.fd
org_e.events = ek.events | flags
r = epoll_ctl (
self.ep_fd,
EPOLL_CTL_MOD,
org_e.data.fd,
&org_e
)
# if fd doesn't exist in epoll, add it
if r == -1 and (libc.errno == libc.ENOENT):
r = epoll_ctl (
self.ep_fd,
EPOLL_CTL_ADD,
org_e.data.fd,
&org_e
)
if r == -1 and (libc.errno != libc.EEXIST):
raise_oserror()
cdef _wait_for_with_eof (self, int fd, int events):
cdef py_event event
......@@ -261,44 +259,11 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
cdef _wait_for_write (self, int fd):
return self._wait_for_with_eof(fd, EPOLLOUT)
def wait_for_read (self, int fd):
return self._wait_for_with_eof(fd, EPOLLIN)
def wait_for_write (self, int fd):
return self._wait_for_with_eof(fd, EPOLLOUT)
cdef py_event _wait_for (self, int fd, int events):
cdef event_target et
cdef fake_epoll_event *e
cdef event_key ek
ek = event_key (events, fd)
et = self.set_wait_for (ek)
try:
return _YIELD()
finally:
if et.status == EVENT_STATUS_NEW:
# still in the change list
e = &self.change_list[et.index]
# event() will ignore this entry
e.events = 0
e.data.fd = 0
et.status = EVENT_STATUS_ABORTED
if not et.flags & TARGET_CLOSED:
# remove from event map
del self.event_map[ek]
#W ('wait_for() cleanup: (%d, %d) STATUS_NEW\n' % (ident, filter))
elif et.status == EVENT_STATUS_SUBMITTED:
# event submitted, delete it.
et.status = EVENT_STATUS_ABORTED
if not et.flags & TARGET_CLOSED:
self.delete_event (fd, events)
# remove from event map
del self.event_map[ek]
#W ('wait_for() cleanup: (%d, %d) STATUS_SUBMITTED\n' % (ident, filter))
elif et.status == EVENT_STATUS_FIRED:
# event already fired! do nothing.
#W ('wait_for() cleanup: (%d, %d) STATUS_FIRED\n' % (ident, filter))
pass
return _YIELD()
def wait_for (self, int fd, int events):
"""Wait for an event.
......@@ -325,184 +290,53 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
#if r < 0:
# raise_oserror()
def set_handler (self, object event, object handler, int flags=EPOLLONESHOT, int op=EPOLL_CTL_MOD):
"""Add a event handler.
This is a low-level interface to register a event handler.
:Parameters:
- `event`: A tuple of ``(ident, filter)`` of the event to handle.
- `handler`: The handler to use, a callable object which will be
called with one argument, a `py_event` object.
- `flags`: Kevent flags to use. Defaults to ``EV_ADD|EV_ONESHOT``.
:Exceptions:
- `SimultaneousError`: There is already a handler set for this
event.
"""
cdef int events
cdef int fd
cdef fake_epoll_event * e
cdef event_key ek
assert callable(handler)
fd = PySequence_GetItem(event, 0)
events = PySequence_GetItem(event, 1)
#events |= flags
ek = event_key (events, fd)
if PyDict_Contains (self.event_map, ek):
# Should be impossible to have KeyError due to previous line.
et = self.event_map[ek]
raise SimultaneousError (the_scheduler._current, et.target, ek)
else:
if self.change_list_index < EVENT_SCALE:
e = &(self.change_list[self.change_list_index])
e.data.fd = fd
e.events = events
e.op = op
e.flags = flags
self.change_list_index = self.change_list_index + 1
et = event_target (handler, self.change_list_index)
self.event_map[ek] = et
else:
raise SystemError, "too many events in change_list"
cdef set_event_target (self, object event, event_target et):
cdef int filter
cdef int fd
cdef event_key ek
fd = PySequence_GetItem(event, 0)
filter = PySequence_GetItem(event, 1)
ek = event_key (filter, fd)
self.event_map[ek] = et
cdef notify_of_close (self, int fd):
cdef event_target et
cdef coro co
cdef event_key ek
cdef epoll_event e
ek = event_key (EPOLLIN, fd)
if PyDict_Contains(self.event_map, ek):
et = self.event_map[ek]
et.flags = et.flags | TARGET_CLOSED
del self.event_map[ek]
W ('(notify_of_close (%d) [read])\n' % (fd,))
co = et.target
try:
co.__interrupt (ClosedError(the_scheduler._current))
except ScheduleError:
W ('notify_of_close (%d) [read]: unable to interrupt thread: %r\n' % (fd, co))
ek = event_key (EPOLLOUT, fd)
if PyDict_Contains(self.event_map, ek):
et = self.event_map[ek]
et.flags = et.flags | TARGET_CLOSED
del self.event_map[ek]
W ('(notify_of_close (%d) [write])\n' % (fd,))
co = et.target
try:
co.__interrupt (ClosedError(the_scheduler._current))
except ScheduleError:
W ('notify_of_close (%d) [write]: unable to interrupt thread: %r\n' % (fd, co))
def poll (self, timeout=(30,0), int nevents=2000):
cdef timespec ts
cdef int r, i
cdef epoll_event * events, ee
cdef fake_epoll_event *e, new_e
cdef epoll_event org_e
cdef epoll_event * events
cdef shrapnel_epoll_event new_e
cdef coro co
cdef event_target et
cdef event_key ek
cdef py_event _py_event
ts.tv_sec, ts.tv_nsec = timeout
# poll() is only called from <main>, so alloca() is OK.
events = <epoll_event *> libc.alloca (sizeof (epoll_event) * nevents)
for i from 0 <= i < self.change_list_index:
e = &(self.change_list[i])
org_e.events = e.events | e.flags
org_e.data.fd = e.data.fd
if e.events != 0:
r = epoll_ctl (
self.ep_fd,
e.op,
org_e.data.fd,
&org_e
)
if r == -1 and (libc.errno == libc.ENOENT):
r = epoll_ctl (
self.ep_fd,
EPOLL_CTL_ADD,
org_e.data.fd,
&org_e
)
#print 'epoll_ctl event >>>>>> %s for %s' % (org_e.events, org_e.data.fd)
if r == -1 and (libc.errno != libc.EEXIST):
raise_oserror()
r = epoll_wait (self.ep_fd, events, nevents, timeout[0] * SECS_TO_MILLISECS)
if the_scheduler.profiling:
the_profiler.charge_wait()
if r == -1:
raise_oserror()
else:
for i from 0 <= i < self.change_list_index:
e = &(self.change_list[i])
# We mark events with a filter of 0 when we want them ignored
# (see EVENT_STATUS_NEW in _wait_for).
if e.events != 0:
ek = event_key (e.events, e.data.fd)
try:
et = self.event_map[ek]
except KeyError:
# This should never happen.
P('Missing event from dictionary for events=%r fd=%r (this should never happen).' % (
e.events, e.data.fd))
else:
et.status = EVENT_STATUS_SUBMITTED
r = epoll_wait (self.ep_fd, events, nevents, timeout[0] * SECS_TO_MILLISECS + (timeout[1] / NSECS_TO_MILLISECS))
#W ('{%d}' % (r,))
#P('mapsize = %i' % len(self.event_map))
for i from 0 <= i < r:
new_e.data.fd = events[i].data.fd
new_e.events = events[i].events
new_e.err = 0
#print 'epoll_wait event >>>>>> %s for %s' % (new_e.events, new_e.data.fd)
tmp = (EPOLLIN, EPOLLOUT)
if events[i].events & tmp[0] and events[i].events & tmp[1]:
pass
else:
tmp = [0]
for j from 0 <= j < len(tmp):
if len(tmp) == 2:
if events[i].events & tmp[j]:
new_e.events = events[i].events & ~(tmp[j])
self.change_list_index = 0
#W ('{%d}' % (r,))
#P('mapsize = %i' % len(self.event_map))
for i from 0 <= i < r:
new_e.data.fd = events[i].data.fd
new_e.events = events[i].events
new_e.err = 0
#print 'epoll_wait event >>>>>> %s for %s' % (new_e.events, new_e.data.fd)
if new_e.events & EPOLLERR or new_e.events & EPOLLHUP:
#print 'epoll_wait event >>>>>> %s for %s' % (new_e.events, new_e.data.fd)
new_e.events = new_e.events & ~(EPOLLHUP)
new_e.events = new_e.events & ~(EPOLLERR)
new_e.err = 104
# epoll doesn't specify the last event we had registered so make a guess
if new_e.events == 0:
new_e.events = EPOLLIN
try:
et = self.event_map[event_key(EPOLLIN, new_e.data.fd)]
except KeyError:
new_e.events = EPOLLOUT
_py_event = py_event()
_py_event.__fast_init__(&new_e)
ek = event_key (new_e.events, new_e.data.fd)
try:
et = self.event_map[ek]
co = self.event_map[ek]
except KeyError:
W ('un-handled event: fd=%s events=%s\n' % (new_e.data.fd, new_e.events))
pass
#W ('un-handled event: fd=%s events=%s\n' % (new_e.data.fd, new_e.events))
else:
assert et.status != EVENT_STATUS_ABORTED
try:
et.status = EVENT_STATUS_FIRED
if isinstance (et.target, coro):
co = et.target
co._schedule (_py_event)
else:
# assumes kt.target is a callable object
_spawn(et.target, (_py_event,), {})
finally:
if et.flags & TARGET_FLAG_ONESHOT:
del self.event_map[ek]
if isinstance (co, coro):
co._schedule (_py_event)
else:
# assumes kt.target is a callable object
_spawn(co, (_py_event,), {})
del self.event_map[ek]
......@@ -25,17 +25,18 @@
Introduction
============
This profiler is coro-aware. It produces output to a binary file on disk. You
then use the `coro.print_profile` module to convert it to an HTML file.
then use the :mod:`coro.print_profile` module to convert it to an HTML file.
Using The Profiler
==================
There are two ways to run the profiler. One is to use the `go` function where
you give it a python function to run. Profiling will start and call the
function, and then the profiler will automatically stop when the function
exits.
There are two ways to run the profiler. One is to use the
:func:`coro.profiler.go` function where you give it a python function to run.
Profiling will start and call the function, and then the profiler will
automatically stop when the function exits.
The other method is to call `start` to start the profiler and `stop` when you
want to stop profiling. This can be conveniently done from the backdoor.
The other method is to call :func:`coro.profiler.start` to start the profiler
and :func:`coro.profiler.stop` when you want to stop profiling. This can be
conveniently done from the backdoor.
Rendering Output
================
......@@ -49,12 +50,13 @@ Then view the profile output in your web browser.
Profiler Types
==============
The profiler supports different ways of gathering statistics. This is done by
specifying the "bench" object to use (see `go` and `start`). They default to
the "rusage" method of gathering statistics about every function call (see the
getrusage man page for more detail). If you want a higher performance profile,
you can use the `coro.bench` object instead which simply records TSC values for
every function call. If you want to define your own method of gathering
statistics, subclass `coro.bench` and implement your own techniques.
specifying the "bench" object to use (see :func:`go` and :func:`start`). They
default to the "rusage" method of gathering statistics about every function
call (see the getrusage man page for more detail). If you want a higher
performance profile, you can use the :class:`coro.bench` object instead which
simply records TSC values for every function call. If you want to define your
own method of gathering statistics, subclass :class:`coro.bench` and implement
your own techniques.
"""
......@@ -98,14 +100,12 @@ def go (fun, *args, **kwargs):
This will display the results to stdout after the function is finished.
:Parameters:
- `fun`: The function to call.
:param fun: The function to call.
:Keywords:
- `profile_filename`: The name of the file to save the profile data.
:keyword profile_filename: The name of the file to save the profile data.
Defaults to '/tmp/coro_profile.bin'.
- `profile_bench`: The bench object type to use. Defaults to
`coro.rusage_bench`.
:keyword profile_bench: The bench object type to use. Defaults to
:class:`coro.rusage_bench`.
"""
if kwargs.has_key('profile_filename'):
profile_filename = kwargs['profile_filename']
......
......@@ -207,20 +207,19 @@ cdef _readv_compute(size_list, buffer_tuple, int n, int received, iovec * iov,
into consideration what has been received so far will create an iovec array
for the readv function.
:Parameters:
- `size_list`: A Python object that should be a sequence of integers
:param size_list: A Python object that should be a sequence of integers
that indicate which blocks are being requested.
- `buffer_tuple`: A tuple of Python strings (should be already
:param buffer_tuple: A tuple of Python strings (should be already
allocated and should be the same length as size_list).
- `n`: The length of size_list and buffer_tuple.
- `received`: The number of bytes received so far.
- `iov`: The ``iovec`` array. This should have `n` elements.
- `left`: OUTPUT: The number of bytes left to read.
- `iov_pos`: OUTPUT: The number of elements added to ``iov``.
- `complete_index`: The index of the last element in the buffer tuple
:param n: The length of size_list and buffer_tuple.
:param received: The number of bytes received so far.
:param iov: The ``iovec`` array. This should have ``n`` elements.
:param left: OUTPUT: The number of bytes left to read.
:param iov_pos: OUTPUT: The number of elements added to ``iov``.
:param complete_index: The index of the last element in the buffer tuple
that has been *completely* received. -1 if nothing has been
completely received.
- `partial_index`: The index of the element in the buffer tuple that
:param partial_index: The index of the element in the buffer tuple that
has partially received some data. -1 if none of the elements have
partial data.
"""
......@@ -254,7 +253,8 @@ cdef _readv_compute(size_list, buffer_tuple, int n, int received, iovec * iov,
cdef public class sock [ object sock_object, type sock_type ]:
"""Coro socket object.
"""
Coro socket object.
This is typically used for network sockets, but can also be used for
coro-safe IO on any file descriptor that supports kqueue non-blocking
......@@ -262,20 +262,19 @@ cdef public class sock [ object sock_object, type sock_type ]:
The constructor takes the following parameters:
- `domain`: The socket domain family, defaults to AF_INET (see `AF`).
- `stype`: The socket type, defaults to SOCK_STREAM (see `SOCK`).
- `protocol`: The socket protocol (normally not used, defaults to 0).
- `fd`: The file descriptor to use. Creates a new socket file
:param domain: The socket domain family, defaults to AF_INET (see :class:`AF`).
:param stype: The socket type, defaults to SOCK_STREAM (see :class:`SOCK`).
:param protocol: The socket protocol (normally not used, defaults to 0).
:param fd: The file descriptor to use. Creates a new socket file
descriptor if not specified.
:IVariables:
- `fd`: The file descriptor number. Set to -1 when the socket is
:ivar fd: The file descriptor number. Set to -1 when the socket is
closed.
- `orig_fd`: The original file descriptor number. This is left for
:ivar orig_fd: The original file descriptor number. This is left for
debugging purposes to determine which file descriptor was in use
before the socket was closed.
- `domain`: The socket domain (AF_INET, AF_UNIX, AF_UNSPEC).
- `stype`: The socket type (SOCK_STREAM, SOCK_DGRAM)
:ivar domain: The socket domain (AF_INET, AF_UNIX, AF_UNSPEC).
:ivar stype: The socket type (SOCK_STREAM, SOCK_DGRAM)
"""
cdef public int fd, orig_fd, domain, stype
......@@ -323,8 +322,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
def get_fileno (self):
"""Get the current file descriptor.
:Return:
Returns the current file descriptor number. Returns -1 if the
:returns: The current file descriptor number. Returns -1 if the
socket is closed.
"""
warnings.warn('socket.get_fileno() is deprecated, use fileno() instead.', DeprecationWarning)
......@@ -333,8 +331,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
def fileno (self):
"""Get the current file descriptor.
:Return:
Returns the current file descriptor number. Returns -1 if the
:returns: The current file descriptor number. Returns -1 if the
socket is closed.
"""
return self.fd
......@@ -362,19 +359,16 @@ cdef public class sock [ object sock_object, type sock_type ]:
def getsockopt (self, int level, int optname, socklen_t buflen=0):
"""Get a socket option.
:Parameters:
- `level`: The socket level to get (see `SOL`).
- `optname`: The socket option to get (see `SO`).
- `buflen`: The size of the buffer needed to retrieve the value. If
:param level: The socket level to get (see :class:`SOL`).
:param optname: The socket option to get (see :class:`SO`).
:param buflen: The size of the buffer needed to retrieve the value. If
not specified, it assumes the result is an integer and will
return an integer. Otherwise, it will create a new string with
the result, and you may use the struct module to decode it.
:Return:
Returns an integer if `buflen` is zero, otherwise returns a string.
:returns: An integer if ``buflen`` is zero, otherwise returns a string.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef int flag, r
cdef socklen_t flagsize
......@@ -397,13 +391,11 @@ cdef public class sock [ object sock_object, type sock_type ]:
def setsockopt (self, int level, int optname, value):
"""Set a socket option.
:Parameters:
- `level`: The socket level to set (see `SOL`).
- `optname`: The socket option to set (see `SO`).
- `value`: The value to set. May be an integer, or a struct-packed string.
:param level: The socket level to set (see :class:`SOL`).
:param optname: The socket option to set (see :class:`SO`).
:param value: The value to set. May be an integer, or a struct-packed string.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef int flag, r
cdef socklen_t optlen
......@@ -421,8 +413,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
It is safe to call this if the socket is already closed.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef int r
if self.fd != -1:
......@@ -442,15 +433,12 @@ cdef public class sock [ object sock_object, type sock_type ]:
This will repeatedly call write to ensure all data has been sent. This
will raise OSError if it is unable to send all data.
:Parameters:
- `data`: The data to send.
:param data: The data to send.
:Return:
Returns the number of bytes sent, which should always be the length
of `data`.
:returns: The number of bytes sent, which should always be the length
of ``data``.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef char * buffer
cdef int r, left, sent
......@@ -482,35 +470,32 @@ cdef public class sock [ object sock_object, type sock_type ]:
def sendall(self, data):
"""Send all data.
This is an alias for the `send` method.
This is an alias for the :meth:`send` method.
"""
return self.send(data)
def write (self, data):
"""Write data.
This is an alias for the `send` method.
This is an alias for the :meth:`send` method.
"""
return self.send(data)
def sendto (self, data, address, int flags=0):
"""Send data to a specific address.
:Parameters:
- `data`: The data to send.
- `address`: The address to send to. For unix-domain sockets, this
:param data: The data to send.
:param address: The address to send to. For unix-domain sockets, this
is a string. For IP sockets, this is a tuple ``(IP, port)``
where IP is a string.
Port is always an integer.
- `flags`: sendto flags to use (defaults to 0) (see sendto(2)
:param flags: sendto flags to use (defaults to 0) (see sendto(2)
manpage).
:Return:
Returns the number of bytes sent which may be less than the send
:returns: The number of bytes sent which may be less than the send
requested.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef char * buffer
cdef sockaddr_storage sa
......@@ -536,23 +521,19 @@ cdef public class sock [ object sock_object, type sock_type ]:
else:
return r
# - `flags`: recv flags to use (defaults to 0) (see recv(2) manpage).
def recv (self, int buffer_size):
"""Receive data.
This may return less data than you request if the socket buffer is not
large enough. Use `recv_exact` to ensure you receive exactly the
large enough. Use :meth:`recv_exact` to ensure you receive exactly the
amount requested.
:Parameters:
- `buffer_size`: The number of bytes to receive.
:param buffer_size: The number of bytes to receive.
:Return:
Returns a string of data. Returns the empty string when the end of
:returns: A string of data. Returns the empty string when the end of
the stream is reached.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef buffer
cdef int r, new_buffer_size
......@@ -584,7 +565,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
def read (self, buffer_size):
"""Read data.
This is an alias for the `recv` method.
This is an alias for the :meth:`recv` method.
"""
return self.recv(buffer_size)
......@@ -594,20 +575,17 @@ cdef public class sock [ object sock_object, type sock_type ]:
This may return less data than you request if the socket buffer is not
large enough.
:Parameters:
- `buffer_size`: The number of bytes to receive.
- `flags`: Socket flags to set (defaults to 0) (see recvfrom(2)
:param buffer_size: The number of bytes to receive.
:param flags: Socket flags to set (defaults to 0) (see recvfrom(2)
manpage).
:Return:
Returns a tuple ``(data, address)`` where data is a string and
:returns: A tuple ``(data, address)`` where data is a string and
address is the address of the remote side (string for unix-domain,
tuple of ``(IP, port)`` for IP where IP is a string and port is an
integer). Data is the empty string when the end of the stream is
reached.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef buffer
cdef sockaddr_storage sa
......@@ -654,15 +632,12 @@ cdef public class sock [ object sock_object, type sock_type ]:
This will repeatedly call read until all data is received.
:Parameters:
- `bytes`: The number of bytes to receive.
:param bytes: The number of bytes to receive.
:Return:
Returns the data as a string.
:returns: The data as a string.
:Exceptions:
- `OSError`: OS-level error.
- `EOFError`: Not all data could be read. The first argument
:raises OSError: OS-level error.
:raises EOFError: Not all data could be read. The first argument
includes any partial data read as a string.
"""
cdef char * p, * p0
......@@ -696,16 +671,13 @@ cdef public class sock [ object sock_object, type sock_type ]:
of the stream is reached before all data is received, then the result
tuple will only contain the elements competely or partially received.
:Parameters:
- `size_list`: A sequence of integers that indicates the buffer
:param size_list: A sequence of integers that indicates the buffer
sizes to read.
:Return:
Returns a tuple of strings corresponding to the sizes requested in
`size_list`.
:returns: A tuple of strings corresponding to the sizes requested in
``size_list``.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef int n, i
cdef int iov_pos
......@@ -791,15 +763,12 @@ cdef public class sock [ object sock_object, type sock_type ]:
This will repeatedly call writev until all data is sent. If it is
unable to send all data, it will raise an OSError exception.
:Parameters:
- `data`: A sequence of strings to write.
:param data: A sequence of strings to write.
:Return:
Returns the number of bytes sent which should always be the sum of
:returns: The number of bytes sent which should always be the sum of
the lengths of all the strings in the data sequence.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef char * buffer
cdef int r, left, size, sent
......@@ -867,20 +836,17 @@ cdef public class sock [ object sock_object, type sock_type ]:
This is for the Python buffer interface. If you don't know what that
is, move along. This method is for Python socket compatibility.
:Parameters:
- `buffer`: A writeable Python buffer object. Must be a contiguous
:param buffer: A writeable Python buffer object. Must be a contiguous
segment.
- `nbytes`: Number of bytes to read. Must be less than or equal to
:param nbytes: Number of bytes to read. Must be less than or equal to
the size of the buffer. Defaults to 0 which means the size of
`buffer`.
- `flags`: Flags for the recv system call (see recv(2) manpage).
``buffer``.
:param flags: Flags for the recv system call (see recv(2) manpage).
Defaults to 0.
:Return:
Returns the number of bytes read.
:returns: The number of bytes read.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef void *cbuf
cdef Py_ssize_t cbuflen
......@@ -917,22 +883,19 @@ cdef public class sock [ object sock_object, type sock_type ]:
This is for the Python buffer interface. If you don't know what that
is, move along. This method is for Python socket compatibility.
:Parameters:
- `buffer`: A writeable Python buffer object. Must be a contiguous
:param buffer: A writeable Python buffer object. Must be a contiguous
segment.
- `nbytes`: Number of bytes to read. Must be less than or equal to
:param nbytes: Number of bytes to read. Must be less than or equal to
the size of the buffer. Defaults to 0 which means the size of
`buffer`.
- `flags`: Flags for the recv system call (see recvfrom(2) manpage).
``buffer``.
:param flags: Flags for the recv system call (see recvfrom(2) manpage).
Defaults to 0.
:Return:
Returns a tuple ``(nbytes, address)`` where ``bytes`` is the number
:returns: A tuple ``(nbytes, address)`` where ``bytes`` is the number
of bytes read and ``address`` then it is the address of the remote
side.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef sockaddr_storage sa
cdef void *cbuf
......@@ -972,18 +935,16 @@ cdef public class sock [ object sock_object, type sock_type ]:
cdef parse_address (self, object address, sockaddr_storage * sa, socklen_t * addr_len, bint resolve=False):
"""Parse a Python socket address and set the C structure values.
:Parameters:
- `address`: The Python address to parse. For IP, it should be a
:param address: The Python address to parse. For IP, it should be a
``(IP, port)`` tuple where the IP is a string. Use the empty
string to indicate INADDR_ANY.
The port should always be a host-byte-order integer.
For Unix-domain sockets, the address should be a string.
- `sa`: OUTPUT: The sockaddr_storage C-structure to store the
:param sa: OUTPUT: The sockaddr_storage C-structure to store the
result in.
- `addr_len`: OUTPUT: The size of the structure placed into `sa`.
:param addr_len: OUTPUT: The size of the structure placed into ``sa``.
:Exceptions:
- `ValueError`: The value could not be parsed.
:raises ValueError: The value could not be parsed.
"""
cdef sockaddr_in * sin
cdef sockaddr_in6 *sin6
......@@ -1053,12 +1014,10 @@ cdef public class sock [ object sock_object, type sock_type ]:
cdef object unparse_address (self, sockaddr_storage *sa, socklen_t addr_len):
"""Unpack a C-socket address structure and generate a Python address object.
:Parameters:
- `sa`: The sockaddr_storage structure to unpack.
- `addr_len`: The length of the `sa` structure.
:param sa: The sockaddr_storage structure to unpack.
:param addr_len: The length of the ``sa`` structure.
:Return:
Returns a ``(IP, port)`` tuple for IP addresses where IP is a
:returns: A ``(IP, port)`` tuple for IP addresses where IP is a
string in canonical format for the given address family . Returns a
string for UNIX-domain sockets. Returns None for unknown socket
domains.
......@@ -1087,13 +1046,11 @@ cdef public class sock [ object sock_object, type sock_type ]:
This will block until there is data available to be read.
:Return:
Returns the amount "readable". For different sockets, this may be
:returns: The amount "readable". For different sockets, this may be
different values, see the EVFILT_READ section of the kevent manpage
for details.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
return self._wait_for_read()
......@@ -1102,11 +1059,9 @@ cdef public class sock [ object sock_object, type sock_type ]:
This will block until it is possible to write to the socket.
:Return:
Returns the number of bytes writeable on the socket.
:returns: The number of bytes writeable on the socket.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
return self._wait_for_write()
......@@ -1119,14 +1074,12 @@ cdef public class sock [ object sock_object, type sock_type ]:
cpdef connect_addr (self, address, bint resolve=False):
"""Connect the socket.
:Parameters:
- `address`: The address to connect to. For IP, it should be a
:param address: The address to connect to. For IP, it should be a
``(IP, port)`` tuple where the IP is a string.
The port should always be a host-byte-order integer. For
Unix-domain sockets, the address should be a string.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef sockaddr_storage sa
cdef socklen_t addr_len
......@@ -1152,15 +1105,13 @@ cdef public class sock [ object sock_object, type sock_type ]:
def bind (self, address):
"""Bind the socket.
:Parameters:
- `address`: The address to bind to. For IP, it should be a
:param address: The address to bind to. For IP, it should be a
``(IP, port)`` tuple where the IP is a string. Use the empty
string to indicate INADDR_ANY.
The port should always be a host-byte-order integer.
For Unix-domain sockets, the address should be a string.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef sockaddr_storage sa
cdef socklen_t addr_len
......@@ -1176,11 +1127,9 @@ cdef public class sock [ object sock_object, type sock_type ]:
def listen (self, backlog):
"""Set the socket to listen for connections.
:Parameters:
- `backlog`: The maximum size of the queue for pending connections.
:param backlog: The maximum size of the queue for pending connections.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef int r
r = listen (self.fd, backlog)
......@@ -1190,14 +1139,12 @@ cdef public class sock [ object sock_object, type sock_type ]:
def accept (self):
"""Accept a connection.
:Return:
Returns a tuple ``(socket, address)`` where ``socket`` is a socket
:returns: A tuple ``(socket, address)`` where ``socket`` is a socket
object and ``address`` is an ``(IP, port)`` tuple for IP
addresses or a string for UNIX-domain sockets. IP addresses are
returned as strings.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef sockaddr_storage sa
cdef socklen_t addr_len
......@@ -1227,20 +1174,17 @@ cdef public class sock [ object sock_object, type sock_type ]:
def accept_many (self, int max=0):
"""Accept multiple connections.
This will accept up to `max` connections for any connections available
This will accept up to ``max`` connections for any connections available
on the listen queue. This will block if there are no connections
waiting.
:Parameters:
- `max`: The maximum number of connections to accept. If not
:param max: The maximum number of connections to accept. If not
specified, defaults to infinity (accept all pending connections).
:Return:
Returns a list of ``(socket, address)`` tuples (see `accept` method
:returns: A list of ``(socket, address)`` tuples (see :meth:`accept` method
for information on return format).
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef sockaddr_storage sa
cdef socklen_t addr_len
......@@ -1280,11 +1224,9 @@ cdef public class sock [ object sock_object, type sock_type ]:
def shutdown (self, int how):
"""Shutdown the socket.
:Parameters:
- `how`: How to shut down the socket (see the shutdown(2) manpage).
:param how: How to shut down the socket (see the shutdown(2) manpage).
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef int r
r = shutdown (self.fd, how)
......@@ -1296,12 +1238,10 @@ cdef public class sock [ object sock_object, type sock_type ]:
def getpeername (self):
"""Get the remote-side address.
:Return:
Returns a ``(IP, port)`` tuple for IP addresses where IP is a
:returns: A ``(IP, port)`` tuple for IP addresses where IP is a
string. Returns a string for UNIX-domain sockets.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef sockaddr_storage sa
cdef socklen_t addr_len
......@@ -1318,8 +1258,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
def getsockname (self):
"""Get the local address of the socket.
:Return:
Returns a ``(IP, port)`` tuple for IP addresses where IP is a
:returns: A ``(IP, port)`` tuple for IP addresses where IP is a
string or an empty string for INADDR_ANY. Returns a
string for UNIX-domain sockets (empty string if not bound).
"""
......@@ -1340,17 +1279,15 @@ cdef public class sock [ object sock_object, type sock_type ]:
The mode and bufsize arguments are as for the built-in open() function.
The underlying socket is duplicated via `sock.dup` to emulate Python's
The underlying socket is duplicated via ``sock.dup`` to emulate Python's
reference counting behavior.
:Parameters:
- `mode`: The mode of the file, defaults to 'r'.
- `bufsize`: The buffer size (0 is no buffering, 1 is line
:param mode: The mode of the file, defaults to 'r'.
:param bufsize: The buffer size (0 is no buffering, 1 is line
buffering, greater than 1 is the explicit buffer size).
Defaults to -1 (does not change the default buffering).
:Return:
Returns a file-like object that wraps the socket.
:returns: A file-like object that wraps the socket.
"""
# Probably unwise to access an underscore private value from the
# socket module, but it should work OK for the foreseeable future.
......@@ -1366,8 +1303,7 @@ cdef public class sock [ object sock_object, type sock_type ]:
def dup(self):
"""Duplicate the socket object using the OS dup() call.
:Return:
Returns a new sock instance that holds the new file descriptor.
:returns: A new sock instance that holds the new file descriptor.
"""
cdef sock new_sock
cdef int new_fd
......@@ -1410,8 +1346,7 @@ def get_live_sockets():
"""Get the number of live socket objects. This includes socket objects
that are closed.
:Return:
Returns the number of socket objects.
:returns: The number of socket objects.
"""
global live_sockets
return live_sockets
......@@ -1476,7 +1411,7 @@ cdef class file_sock(sock):
The constructor takes one argument:
- ``fileobj``: A Python-like file object. Currently only needs to
:param fileobj: A Python-like file object. Currently only needs to
implement the ``fileno`` method.
When the object is deallocated, the file descriptor is closed.
......@@ -1499,7 +1434,7 @@ cdef class fd_sock(sock):
The constructor takes one argument:
- ``fd``: A file descriptor.
:param fd: A file descriptor.
When the object is deallocated, the file descriptor is closed.
"""
......@@ -1511,55 +1446,45 @@ cdef class fd_sock(sock):
def tcp_sock():
"""Create a streaming IPv4 socket.
:Return:
Returns a socket object.
:returns: A socket object.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
return sock (AF_INET, SOCK_STREAM)
def udp_sock():
"""Create a datagram IPv4 socket.
:Return:
Returns a socket object.
:returns: A socket object.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
return sock (AF_INET, SOCK_DGRAM)
def tcp6_sock():
"""Create a streaming IPv6 socket.
:Return:
Returns a socket object.
:returns: A socket object.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
return sock (AF_INET6, SOCK_STREAM)
def udp6_sock():
"""Create a datagram IPv6 socket.
:Return:
Returns a socket object.
:returns: A socket object.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
return sock (AF_INET6, SOCK_DGRAM)
def unix_sock():
"""Create a streaming unix-domain socket.
:Return:
Returns a socket object.
:returns: A socket object.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
return sock (AF_UNIX, SOCK_STREAM)
......@@ -1569,26 +1494,19 @@ def make_socket (int domain, int stype):
This is a backwards-compatibility wrapper around the sock object
constructor.
:Parameters:
- `domain`: The socket domain family (see `AF`).
- `stype`: The socket type (see `SOCK`).
:param domain: The socket domain family (see :class:`AF`).
:param stype: The socket type (see :class:`SOCK`).
:Return:
Returns a socket object.
:returns: A socket object.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
return sock (domain, stype)
def has_ipv6():
"""Whether or not this system can create an IPv6 socket.
:Return:
Returns True if this system can create an IPv6 socket, False otherwise
:Exceptions:
- None
:returns: True if this system can create an IPv6 socket, False otherwise
"""
cdef int s
......@@ -1603,16 +1521,13 @@ def has_ipv6():
def socketpair(int domain=AF_UNIX, int stype=SOCK_STREAM, int protocol=0):
"""Create an unnamed pair of connected sockets.
:Parameters:
- `domain`: The socket domain family (defaults to AF_UNIX).
- `stype`: The socket type (defaults to SOCK_STREAM).
- `protocol`: The socket protocol (normally not used, defaults to 0).
:param domain: The socket domain family (defaults to AF_UNIX).
:param stype: The socket type (defaults to SOCK_STREAM).
:param protocol: The socket protocol (normally not used, defaults to 0).
:Return:
Returns a tuple of 2 connected sockets.
:returns: A tuple of 2 connected sockets.
:Exceptions:
- `OSError`: OS-level error.
:raises OSError: OS-level error.
"""
cdef int sv[2]
cdef int rc
......
......@@ -49,22 +49,19 @@ class LockError (Exception):
# Semaphore
# ===========================================================================
"""A semaphore is a locking primitive that corresponds with a set of
resources. A semphore is essentially a counter. Whenever a resource is
aquired, the count is lowered. If the count goes below 0, then it blocks
until it goes above zero. Once you are done with a resource, you raise
the counter."""
cdef class semaphore:
"""Semaphore lock object.
"""
A semaphore is a locking primitive that corresponds with a set of
resources. A semphore is essentially a counter. Whenever a resource is
aquired, the count is lowered. If the count goes below 0, then it blocks
until it goes above zero. Once you are done with a resource, you raise
the counter.
The constructor takes one parameter, the value to start the semaphore with.
:param value: The value to start the semaphore with (an integer).
:IVariables:
- `avail`: The current value of the semaphore. Also available via
__int__.
- `_waiting`: A fifo of ``(value, co)`` tuples of coroutines waiting
:ivar avail: The current value of the semaphore. Also available via __int__.
:ivar _waiting: A fifo of ``(value, co)`` tuples of coroutines waiting
for the semaphore. ``value`` is the value being requested, and ``co``
is the coroutine object. (C only.)
"""
......@@ -87,8 +84,7 @@ cdef class semaphore:
if the requested number of resource elements are not available (if the
value would go negative).
:Parameters:
- `value`: The number of resource elements.
:param value: The number of resource elements.
"""
cdef coro me
me = the_scheduler._current
......@@ -116,8 +112,7 @@ cdef class semaphore:
def release(self, int value):
"""Release a number of resource elements.
:Parameters:
- `value`: The number of resource elements to release (add to the
:param value: The number of resource elements to release (add to the
sempahore).
"""
cdef coro ci
......@@ -174,21 +169,18 @@ cdef class semaphore:
cdef class inverted_semaphore:
"""Inverted semaphore.
"""
An inverted semaphore works very much like a regular semaphore, except
threads block _until_ the value reaches zero. For example, if you want a
thread to wait for 1 or more events to finish, you can have each event
raise the value (always nonblocking) and have your waiter thread call
block_till_zero.
The constructor takes one optional parameter, the value to start the
semaphore with. It defaults to 0.
:param value: The value to start the semaphore with. It defaults to 0.
:IVariables:
- `value`: The value of the inverted semaphore. Also available via
:ivar value: The value of the inverted semaphore. Also available via
__int__.
- `_waiting`: A fifo of coroutine objects waiting for the semaphore to
:ivar _waiting: A fifo of coroutine objects waiting for the semaphore to
reach zero. (C only).
"""
......@@ -207,8 +199,7 @@ cdef class inverted_semaphore:
This never blocks.
:Parameters:
- `value`: The number of resource elements to acquire (add to the
:param value: The number of resource elements to acquire (add to the
semaphore). Defaults to 1.
"""
self.value = self.value + value
......@@ -218,8 +209,7 @@ cdef class inverted_semaphore:
This never blocks. This may wake up waiting threads.
:Parameters:
- `value`: The number of resource elements to release (subtract
:param value: The number of resource elements to release (subtract
from the semaphore). Defaults to 1.
"""
cdef coro co
......@@ -258,16 +248,16 @@ cdef class inverted_semaphore:
cdef class mutex:
"""Mutual Exclusion lock object.
"""
Mutual Exclusion lock object.
A single thread may acquire the mutex multiple times, but it must release
the lock an equal number of times.
:IVariables:
- `_locked`: Count of how many locks on the mutex are currently held.
- `_owner`: The coroutine object that owns the lock (None if no owner).
:ivar _locked: Count of how many locks on the mutex are currently held.
:ivar _owner: The coroutine object that owns the lock (None if no owner).
(C only.)
- `_waiting`: A fifo of coroutine objects waiting for the lock.
:ivar _waiting: A fifo of coroutine objects waiting for the lock.
"""
cdef public int _locked
......@@ -290,8 +280,7 @@ cdef class mutex:
A coro thread may lock the mutex multiple times. It must call unlock
the same number of times to release it.
:Return:
Returns True if it blocked, False if the mutex was acquired
:returns: True if it blocked, False if the mutex was acquired
immediately.
"""
cdef coro me
......@@ -317,8 +306,7 @@ cdef class mutex:
def trylock(self):
"""Try to lock the mutex.
:Return:
Returns True if it is already locked by another coroutine thread.
:returns: True if it is already locked by another coroutine thread.
Returns False if the lock was successfully acquired.
"""
cdef coro me
......@@ -333,20 +321,17 @@ cdef class mutex:
def locked (self):
"""Determine if the mutex is currently locked.
:Return:
Returns True if the mutex is locked, otherwise False.
:returns: True if the mutex is locked, otherwise False.
"""
return (self._locked > 0)
def has_lock (self, thread=None):
"""Determine if a particular coroutine has the lock.
:Parameters:
- `thread`: The coroutine object to check if it owns the lock. If
:param thread: The coroutine object to check if it owns the lock. If
not specified, defaults to the current thread.
:Return:
Returns True if the specified thread has the lock, otherwise
:returns: True if the specified thread has the lock, otherwise
returns False.
"""
if thread is None:
......@@ -358,8 +343,7 @@ cdef class mutex:
The thread unlocking must be the thread that initially locked it.
:Return:
Returns True if another thread was waiting for the lock, otherwise
:returns: True if another thread was waiting for the lock, otherwise
it returns False.
"""
cdef coro me, co
......@@ -397,7 +381,8 @@ cdef class mutex:
cdef class rw_lock:
"""A many-reader single-writer lock.
"""
A many-reader single-writer lock.
This lock allows multiple "readers" to own the lock simultaneously. A
"writer" can only acquire a lock if there are no other "readers" or
......@@ -411,14 +396,13 @@ cdef class rw_lock:
way around (holding a read lock and trying to acquire a write lock will
cause a deadlock).
:IVariables:
- `_writer`: Count of the number of write locks. (C only.)
- `_writer_id`: Thread ID of the current write lock owner (0 if there
:ivar _writer: Count of the number of write locks. (C only.)
:ivar _writer_id: Thread ID of the current write lock owner (0 if there
is no owner). (C only.)
- `_reader`: Count of the number of read locks. (C only.)
- `_waiting_writers`: A fifo of coroutine objects waiting for a write
:ivar _reader: Count of the number of read locks. (C only.)
:ivar _waiting_writers: A fifo of coroutine objects waiting for a write
lock. (C only.)
- `_waiting_readers`: A fifo of coroutine objects waiting for a read
:ivar _waiting_readers: A fifo of coroutine objects waiting for a read
lock. (C only.)
"""
......@@ -443,7 +427,7 @@ cdef class rw_lock:
thread.
A coro thread may acquire multiple read locks, but it must call
`read_unlock` an equal number of times.
:meth:`read_unlock` an equal number of times.
"""
cdef coro me
me = the_scheduler._current
......@@ -463,12 +447,11 @@ cdef class rw_lock:
def try_read_lock(self):
"""Attempt to acquire a read lock.
This is the same as `read_lock` except it does not block if it cannot
This is the same as :meth:`read_lock` except it does not block if it cannot
acquire the lock.
:Return:
Returns True if it cannot acquire the lock.
Returns False if it successfully acquired the lock.
:returns: True if it cannot acquire the lock.
False if it successfully acquired the lock.
"""
cdef coro me
me = the_scheduler._current
......@@ -484,7 +467,7 @@ cdef class rw_lock:
This blocks if there are any other readers or writers holding the lock.
A coro thread may acquire multiple write locks, but it must call
`write_unlock` an equal number of times.
:meth:`write_unlock` an equal number of times.
Attempting to acquire a read lock while holding a write lock will cause
a deadlock.
......@@ -518,12 +501,11 @@ cdef class rw_lock:
def try_write_lock(self):
"""Attempt to acquire a write lock.
This is the same as `write_lock` except it does not block if it cannot
This is the same as :meth:`write_lock` except it does not block if it cannot
acquire the lock.
:Return:
Returns True if it cannot acquire the lock.
Returns False if it successfully acquired the lock.
:returns: True if it cannot acquire the lock.
False if it successfully acquired the lock.
"""
cdef coro me
me = the_scheduler._current
......@@ -602,13 +584,11 @@ cdef class rw_lock:
cdef class condition_variable:
"""Condition variable.
"""
This locking primitive provides a method to "trigger" an event for other
threads.
:IVariables:
- `_waiting`: A fifo of coroutine objects waiting for the lock. (C only.)
:ivar _waiting: A fifo of coroutine objects waiting for the lock. (C only.)
"""
cdef readonly _fifo _waiting
......@@ -634,8 +614,7 @@ cdef class condition_variable:
def wait (self):
"""Wait for the condition variable to be triggered.
:Return:
Returns the arguments given to the wake call (defaults to the empty
:returns: The arguments given to the wake call (defaults to the empty
tuple).
"""
return self._wait()
......@@ -663,20 +642,17 @@ cdef class condition_variable:
If there are no threads waiting, this does nothing.
:Parameters:
- `args`: The arguments to wake the thread with. Defaults to the
:param args: The arguments to wake the thread with. Defaults to the
empty tuple.
:Return:
Returns True if a thread was awoken, False if not.
:returns: True if a thread was awoken, False if not.
"""
return self._wake_one (args)
def wake_all (self, args=()):
"""Wake all waiting threads.
:Parameters:
- `args`: The arguments to wake the thread with. Defaults to the
:param args: The arguments to wake the thread with. Defaults to the
empty tuple.
"""
cdef coro co
......@@ -690,13 +666,11 @@ cdef class condition_variable:
def wake_n (self, int count, args=()):
"""Wake a specific number of threads.
:Parameters:
- `count`: The number of threads to wake up.
- `args`: The arguments to wake the thread with. Defaults to the
:param count: The number of threads to wake up.
:param args: The arguments to wake the thread with. Defaults to the
empty tuple.
:Return:
Returns the total number of threads actually awoken.
:returns: The total number of threads actually awoken.
"""
cdef coro co
cdef int total
......@@ -715,8 +689,7 @@ cdef class condition_variable:
def raise_all (self, the_exception):
"""Raise an exception on all waiting threads.
:Parameters:
- `the_exception`: The exception to raise on all waiting threads.
:param the_exception: The exception to raise on all waiting threads.
"""
cdef coro co
while self._waiting.size:
......@@ -732,13 +705,13 @@ cdef class condition_variable:
cdef class fifo:
"""First-in First-Out container.
"""
First-in First-Out container.
This uses a linked list.
:IVariables:
- `fifo`: The fifo object. (C only.)
- `cv`: A condition variable. (C only.)
:ivar fifo: The fifo object. (C only.)
:ivar cv: A condition variable. (C only.)
"""
cdef _fifo fifo
......@@ -754,8 +727,7 @@ cdef class fifo:
def push (self, thing):
"""Push an object to the end of the FIFO.
:Parameters:
- `thing`: The thing to add to the FIFO.
:param thing: The thing to add to the FIFO.
"""
self.fifo._push (thing)
self.cv.wake_one()
......@@ -765,8 +737,7 @@ cdef class fifo:
This blocks if the FIFO is empty.
:Return:
Returns the next object from the FIFO.
:returns: The next object from the FIFO.
"""
while self.fifo.size == 0:
self.cv._wait()
......@@ -778,8 +749,7 @@ cdef class fifo:
This will block if the fifo is empty and wait until there is an element
to pop.
:Return:
Returns a list of objects. Returns an empty list if the FIFO is
:returns: A list of objects. Returns an empty list if the FIFO is
empty.
"""
cdef int i
......
# Makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build
# Internal variables.
PAPEROPT_a4 = -D latex_paper_size=a4
PAPEROPT_letter = -D latex_paper_size=letter
ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
# the i18n builder cannot share the environment and doctrees with the others
I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest gettext
help:
@echo "Please use \`make <target>' where <target> is one of"
@echo " html to make standalone HTML files"
@echo " dirhtml to make HTML files named index.html in directories"
@echo " singlehtml to make a single large HTML file"
@echo " pickle to make pickle files"
@echo " json to make JSON files"
@echo " htmlhelp to make HTML files and a HTML help project"
@echo " qthelp to make HTML files and a qthelp project"
@echo " devhelp to make HTML files and a Devhelp project"
@echo " epub to make an epub"
@echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
@echo " latexpdf to make LaTeX files and run them through pdflatex"
@echo " text to make text files"
@echo " man to make manual pages"
@echo " texinfo to make Texinfo files"
@echo " info to make Texinfo files and run them through makeinfo"
@echo " gettext to make PO message catalogs"
@echo " changes to make an overview of all changed/added/deprecated items"
@echo " linkcheck to check all external links for integrity"
@echo " doctest to run all doctests embedded in the documentation (if enabled)"
clean:
-rm -rf $(BUILDDIR)/*
html:
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
dirhtml:
$(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
singlehtml:
$(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml
@echo
@echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml."
pickle:
$(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
@echo
@echo "Build finished; now you can process the pickle files."
json:
$(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
@echo
@echo "Build finished; now you can process the JSON files."
htmlhelp:
$(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
@echo
@echo "Build finished; now you can run HTML Help Workshop with the" \
".hhp project file in $(BUILDDIR)/htmlhelp."
qthelp:
$(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
@echo
@echo "Build finished; now you can run "qcollectiongenerator" with the" \
".qhcp project file in $(BUILDDIR)/qthelp, like this:"
@echo "# qcollectiongenerator $(BUILDDIR)/qthelp/Shrapnel.qhcp"
@echo "To view the help file:"
@echo "# assistant -collectionFile $(BUILDDIR)/qthelp/Shrapnel.qhc"
devhelp:
$(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp
@echo
@echo "Build finished."
@echo "To view the help file:"
@echo "# mkdir -p $$HOME/.local/share/devhelp/Shrapnel"
@echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/Shrapnel"
@echo "# devhelp"
epub:
$(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub
@echo
@echo "Build finished. The epub file is in $(BUILDDIR)/epub."
latex:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo
@echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
@echo "Run \`make' in that directory to run these through (pdf)latex" \
"(use \`make latexpdf' here to do that automatically)."
latexpdf:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo "Running LaTeX files through pdflatex..."
$(MAKE) -C $(BUILDDIR)/latex all-pdf
@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
text:
$(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text
@echo
@echo "Build finished. The text files are in $(BUILDDIR)/text."
man:
$(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man
@echo
@echo "Build finished. The manual pages are in $(BUILDDIR)/man."
texinfo:
$(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
@echo
@echo "Build finished. The Texinfo files are in $(BUILDDIR)/texinfo."
@echo "Run \`make' in that directory to run these through makeinfo" \
"(use \`make info' here to do that automatically)."
info:
$(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
@echo "Running Texinfo files through makeinfo..."
make -C $(BUILDDIR)/texinfo info
@echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo."
gettext:
$(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale
@echo
@echo "Build finished. The message catalogs are in $(BUILDDIR)/locale."
changes:
$(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
@echo
@echo "The overview file is in $(BUILDDIR)/changes."
linkcheck:
$(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
@echo
@echo "Link check complete; look for any errors in the above output " \
"or in $(BUILDDIR)/linkcheck/output.txt."
doctest:
$(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
@echo "Testing of doctests in the sources finished, look at the " \
"results in $(BUILDDIR)/doctest/output.txt."
# -*- coding: utf-8 -*-
#
# Shrapnel documentation build configuration file, created by
# sphinx-quickstart on Fri Apr 13 18:44:49 2012.
#
# This file is execfile()d with the current directory set to its containing dir.
#
# Note that not all possible configuration values are present in this
# autogenerated file.
#
# All configuration values have a default; values that are commented out
# serve to show the default.
import sys, os
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#sys.path.insert(0, os.path.abspath('.'))
# -- General configuration -----------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.todo']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix of source filenames.
source_suffix = '.rst'
# The encoding of source files.
#source_encoding = 'utf-8-sig'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = u'Shrapnel'
copyright = u'2012, Sam Rushing'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = '1.0.2'
# The full version, including alpha/beta/rc tags.
release = '1.0.2'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#today = ''
# Else, today_fmt is used as the format for a strftime call.
#today_fmt = '%B %d, %Y'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
exclude_patterns = ['_build']
# The reST default role (used for this markup: `text`) to use for all documents.
#default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
#add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
#show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# A list of ignored prefixes for module index sorting.
#modindex_common_prefix = []
# -- Options for HTML output ---------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
html_theme = 'default'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
html_theme_options = {'collapsiblesidebar': True}
# Add any paths that contain custom themes here, relative to this directory.
#html_theme_path = []
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
#html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
#html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#html_logo = None
# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
#html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
#html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
#html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
#html_additional_pages = {}
# If false, no module index is generated.
#html_domain_indices = True
# If false, no index is generated.
#html_use_index = True
# If true, the index is split into individual pages for each letter.
#html_split_index = False
# If true, links to the reST sources are added to the pages.
#html_show_sourcelink = True
# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
#html_show_sphinx = True
# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
#html_show_copyright = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
#html_use_opensearch = ''
# This is the file name suffix for HTML files (e.g. ".xhtml").
#html_file_suffix = None
# Output file base name for HTML help builder.
htmlhelp_basename = 'Shrapneldoc'
# -- Options for LaTeX output --------------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#'preamble': '',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass [howto/manual]).
latex_documents = [
('index', 'Shrapnel.tex', u'Shrapnel Documentation',
u'Sam Rushing', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
#latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
#latex_use_parts = False
# If true, show page references after internal links.
#latex_show_pagerefs = False
# If true, show URL addresses after external links.
#latex_show_urls = False
# Documents to append as an appendix to all manuals.
#latex_appendices = []
# If false, no module index is generated.
#latex_domain_indices = True
# -- Options for manual page output --------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
('index', 'shrapnel', u'Shrapnel Documentation',
[u'Sam Rushing'], 1)
]
# If true, show URL addresses after external links.
#man_show_urls = False
# -- Options for Texinfo output ------------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
('index', 'Shrapnel', u'Shrapnel Documentation',
u'Sam Rushing', 'Shrapnel', 'One line description of project.',
'Miscellaneous'),
]
# Documents to append as an appendix to all manuals.
#texinfo_appendices = []
# If false, no module index is generated.
#texinfo_domain_indices = True
# How to display URL addresses: 'footnote', 'no', or 'inline'.
#texinfo_show_urls = 'footnote'
autodoc_default_flags = ['members', 'show-inheritance']
.. Shrapnel documentation master file, created by
sphinx-quickstart on Fri Apr 13 18:44:49 2012.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to Shrapnel's documentation!
====================================
Contents:
* :doc:`Installation <installation>`
* :doc:`Tutorial <tutorial>`
* :doc:`Reference Manual <ref/index>`
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
============
Installation
============
Supported Platforms
===================
Shrapnel currently works on FreeBSD, Linux, and Mac OS X with x86 32- or 64-bit platforms.
It supports Python 2.7 (TODO: and 2.6?).
Prerequisites
=============
pip
---
To make installation easy, you can use `pip <http://www.pip-installer.org/>`_.
This is a tool which will fetch Python packages from `PyPi
<http://pypi.python.org/>`_ and install them.
Visit http://www.pip-installer.org/en/latest/installing.html for information
on how to install pip if you don't already have it installed.
Cython
------
You need version 0.12.1 or newer of `Cython <http://cython.org/>`_. If you
already have Cython installed, you can check your current version by running
``cython -V``.
To install Cython, run:
pip install cython
Distribute
----------
You need version 0.6.16 or newer of `distribute <http://pypi.python.org/pypi/distribute>`_.
Distribute is a build and packaging tool for Python (a replacement for setuptools).
To install distribute, run:
pip install distribute
Shrapnel
--------
Finally, you can install Shrapnel, run:
pip install shrapnel
Alternatively you can download it from https://github.com/ironport/shrapnel
and do the usual ``python setup.py install`` procedure.
======
Clocks
======
Shrapnel needs to keep track of time to manage scheduling of sleeps and
timeouts. Because Shrapnel is intended to support thousands of coroutines,
and each coroutine may be making many timeout calls per second, Shrapnel needs
to use a timing facility that is relatively high performance. It also needs
one that is monotonic, so it does not need to deal with system clock changes.
The ``clocks`` subpackage is intended to provide a variety of different time
facilities. Currently it only supports using the x86 TSC timer. This is a
timer built in to the CPU, and thus is very fast.
TSC Time
========
Support for TSC time is implemented in the ``coro.clocks.tsc_time`` module.
.. automodule:: coro.clocks.tsc_time
==========
Coroutines
==========
The central concept of Shrapnel is the coroutine. You can think of a coroutine
like it is a thread. When it runs out of work to do, it yields and allows other
coroutines to run. Scheduling of coroutines is handled by the scheduler which
runs an "event loop".
Event Loop
==========
The event loop is a loop that runs forever until the program ends. Every
Shrapnel program needs to start the event loop as one of the first things it
does. A typical example would be::
import coro
def main():
print 'Hello world!'
# This will cause the process to exit.
coro.set_exit(0)
if __name__ == '__main__':
coro.spawn(main)
coro.event_loop()
Coroutines
==========
Every coroutine thread is created with either the :func:`new` function (which
does NOT automatically start the thread) or the :func:`spawn` function (which
DOES automatically start it).
Every thread has a unique numeric ID. You may also set the name of the thread
when you create it.
.. autoclass:: coro.coro
Timeouts
========
The shrapnel timeout facility allows you to execute a function which will be
interrupted if it does not finish within a specified period of time. The
:class:`TimeoutError` exception will be raised if the timeout expires. See the
:func:`with_timeout` docstring for more detail.
If the event loop is not running (such as in a non-coro process), a custom
version of `with_timeout` is installed that will operate using SIGALRM so that
you may use `with_timeout` in code that needs to run in non-coro processes
(though this is not recommended and should be avoided if possible).
.. autofunction:: coro.with_timeout
Parallel Execution
==================
XXX
.. autofunction:: coro.in_parallel
.. autoexception:: coro.InParallelError
Thread Local Storage
====================
There is a thread-local storage interface available for storing global data that
is thread-specific. You instantiate a :class:`ThreadLocal` instance and you can
assign attributes to it that will be specific to that thread. From a design
perspective, it is generally discouraged to use thread-local storage. But
nonetheless, it can be useful at times.
.. autoclass:: coro.ThreadLocal
Functions
=========
The coro module defines the following functions:
.. autofunction:: coro.get_thread_by_id
.. autofunction:: coro.coro_is_running
.. autofunction:: coro.event_loop
.. autofunction:: coro.new
.. autofunction:: coro.spawn
.. autofunction:: coro.waitpid
.. autofunction:: coro.yield_slice
.. autofunction:: coro.schedule
.. autofunction:: coro.current
.. autofunction:: coro.set_exit
.. autofunction:: coro.set_print_exit_string
.. autofunction:: coro.sleep_absolute
.. autofunction:: coro.sleep_relative
Variables
=========
.. py:data:: coro.all_threads
A dictionary of all live coroutine objects. The key is the coroutine ID,
and the value is the coroutine object.
Exceptions
==========
The coro module defines the following exceptions:
.. autoexception:: coro.ScheduleError
.. autoexception:: coro.DeadCoroutine
.. autoexception:: coro.ClosedError
.. autoexception:: coro.NotStartedError
.. autoexception:: coro.TimeoutError
.. autoexception:: coro.SimultaneousError
.. autoexception:: coro.Shutdown
.. autoexception:: coro.WakeUp
=========
Debugging
=========
There are a variety of features available to help with debugging in Shrapnel.
Backdoor
========
A very powerful feature of Shrapnel is the ability to access a running process
via a backdoor. You can telnet to a socket (typically a unix-domain socket)
and get a Python prompt. At this point, you can interact with anything in
your Shrapnel process.
As an example of something you can do in the backdoor is call
:func:`coro.where_all`. This will return a dictionary of every coroutine that
is running with a string describing the call stack of where that coroutine is
currently blocked.
To enable the backdoor, you typically start a backdoor coroutine before starting
the event loop with the following code:
.. sourcecode:: python
import coro.backdoor
coro.spawn(coro.backdoor.serve)
By default this will listen on all IP's on the lowest port available from 8023
to 8033. This isn't a very safe or secure thing to do. It's best to specify a
unix-domain socket with the ``unix_path`` parameter. See
:func:`coro.backdoor.serve` for details.
By default, the globals available in a backdoor session is a copy of the
globals from your applications ``__main__`` module.
.. autofunction:: coro.backdoor.serve
Stderr Output
=============
Shrapnel provides some functions for printing debug information to stderr. The
:func:`coro.print_stderr` function will print a string with a timestamp and
the thread number. The :func:`coro.write_stderr` function writes the string
verbatim with no newline.
Shrapnel keeps a reference to the "real" stderr (in ``saved_stderr``) and the
``print_stderr`` and ``write_stderr`` functions always use the real stderr
value. A particular reason for doing this is the backdoor module replaces
sys.stderr and sys.stdout, but we do not want debug output to go to the
interactive session.
.. autofunction:: coro.write_stderr
.. autofunction:: coro.print_stderr
Exceptions
==========
Tracebacks
----------
As a convenience, Shrapnel has a module for printing stack traces in a
condensed format. The ``coro.tb`` module has the :func:`coro.tb.stack_string`
function for printing the current stack, and :func:`coro.tb.traceback_string`
for getting a traceback in an exception handler.
.. autofunction:: coro.tb.stack_string
.. autofunction:: coro.tb.traceback_string
Exception Notifications
-----------------------
If an exception is raised in a coroutine and is never caught, then Shrapnel
will by default display the exception to stderr. If you want to change this
behavior, use :func:`coro.set_exception_notifier`.
.. autofunction:: coro.set_exception_notifier
Latency
=======
Shrapnel will keep track of how long a coroutine runs before it yields.
This is helpful to track down coroutines which are running for too long, or are
potentially calling blocking calls. Here is an example of the output that would
be sent to stderr when this happens::
Sat Apr 14 20:55:39 2012 High Latency: (3.884s)
for <coro #1 name='<function my_func at 0x800fd32a8>'
dead=0 started=1 scheduled=0 at 0x801424720>
You can change the threshold that will trigger this warning with the
:func:`coro.set_latency_warning` function. However, doing this to silence
warnings isn't a good idea. It is best to fix whatever code is causing the
warnings. You can either call :func:`coro.yield_slice` periodically to let
other coroutines run, or make sure you are not calling any blocking
operations.
.. autofunction:: coro.set_latency_warning
Functions
=========
The ``coro`` module defines the following functions:
.. autofunction:: coro.where
.. autofunction:: coro.where_all
.. autofunction:: coro.get_live_coros
===
DNS
===
TODO
.. autofunction:: coro.set_resolver
=========
Emulation
=========
Because Shrapnel is essentially its own threading system, code written with
the intention of using Python's standard threads will not work. Things like
Python's socket class will block and hang the entire program. To solve this
problem, Shrapnel includes some code that will monkeypatch some of Python's
standard classes to work with Shrapnel. You must manually enable this
behavior by calling :func:`coro.install_thread_emulation`.
.. autofunction:: coro.install_thread_emulation
*************************
Shrapnel Reference Manaul
*************************
This reference manual describes all of the basic concepts of Shrapnel along with all of the APIs.
.. toctree::
:maxdepth: 1
:numbered:
coroutines.rst
sockets.rst
synchronization.rst
clocks.rst
dns.rst
emulation.rst
debugging.rst
signals.rst
selfishness.rst
profiling.rst
oserrors.rst
========
OSErrors
========
As a convenience, Shrapnel wraps all OSError exceptions that it raises with a
subclass that is specific to the errno code. For example, an OSError with an
errno of ENOENT will be raised as the ENOENT exception. All exceptions derive
from OSError, so it is compatible with regular OSError handling.
All of the exceptions are defined in the ``coro.oserrors`` module.
For example, instead of doing this:
.. sourcecode:: python
try:
data = sock.recv(1024)
except OSError, e:
if e.errno == errno.ECONNRESET:
# Handle connection reset.
else:
# Handle other unknown error.
You can do this:
.. sourcecode:: python
try:
data = sock.recv(1024):
except ECONNRESET:
# Handle connection reset.
Profiling
=========
.. automodule:: coro.profiler
===========
Selfishness
===========
Shrapnel maintains a concept called "selfishness". This mechanism is used to
prevent a coroutine from yielding too often (or from running for too long).
This is currently only relevant to socket objects and socket I/O.
Each coroutine is given a set number of "free passes" each time it tries to do
I/O on a socket. If there is data immediately available on the socket, then
the coroutine may immediately receive that data. If Shrapnel did not
implement any "selfishness" limits, and that coroutine is in a loop repeatedly
calling ``read`` and there is always data available to the socket, then that
coroutine would run continuously without letting its fellow coroutines a
chance to run.
By default, every coroutine has a selfishness limit of 4. That means it is
allowed to do 4 I/O operations before it is forced to yield. Of course, if it
attempts to do an I/O operation that would block (such as if there is no data
available on a socket), then it will yield immediately.
You can set the default selfishness limit for all new coroutines with the
:func:`coro.set_selfishness` function. You can also change a coroutine's
limit with the :meth:`coro.coro.set_max_selfish_acts` method.
Functions
=========
The following functions are available in the ``coro`` module:
.. autofunction:: coro.set_selfishness
=======
Signals
=======
Shrapnel provides a way to handle signals. Youn can register a function to
receive signals with :func:`coro.signal_handler.register`.
By default when you start the event loop, two signal handlers are installed
(for SIGTERM and SIGINT). The default signal handler will exit the event loop.
You can change this behavior by setting ``coro.install_signal_handlers`` to False
before starting the event loop.
Additionally, there is a signal handler installed for SIGINFO. It prints the
name of the coroutine that is currently running. On a typical terminal, you
can trigger this with CTRL-T.
.. autofunction:: coro.signal_handler.register
=======
Sockets
=======
Most Shrapnel programs make heavy use of sockets. The ``coro`` package
implements its own socket class, which is nearly identical to the socket class
in Python. Indeed, if you use :func:`coro.install_thread_emulation` then the
socket class will be monkey-patched into Python's socket module.
Creating Sockets
================
Though you are free to directly instantiate the :class:`coro.sock` object, there are
a variety of functions to assist in creating socket objects with a little more clarity.
.. autofunction:: coro.tcp6_sock
.. autofunction:: coro.tcp_sock
.. autofunction:: coro.udp6_sock
.. autofunction:: coro.udp_sock
.. autofunction:: coro.unix_sock
.. autofunction:: coro.socketpair
.. autofunction:: coro.has_ipv6
Socket Classes
==============
.. autoclass:: coro.sock
.. autoclass:: coro.file_sock
.. autoclass:: coro.fd_sock
Socket Functions
================
The coro module offers the following functions related to sockets.
.. autofunction:: coro.get_live_sockets
Socket Constants
================
The following classes provide a variety of constants often used with sockets.
.. autoclass:: coro.AF
.. autoclass:: coro.PF
.. autoclass:: coro.SHUT
.. autoclass:: coro.SO
.. autoclass:: coro.SOCK
.. autoclass:: coro.SOL
===============
Synchronization
===============
You typically do not need to use synchronization primitives with Shrapnel
because coroutines are cooperative. However, there are situations where they
can be useful. For example, if you manipulate multiple shared data structures
that need to remain consistent, and you have potentially context-switch calls
interspersed (such as socket I/O).
Synchronization Classes
=======================
.. autoclass:: coro.condition_variable
.. autoclass:: coro.fifo
.. autoclass:: coro.inverted_semaphore
.. autoclass:: coro.LockError
.. autoclass:: coro.mutex
.. autoclass:: coro.semaphore
......@@ -111,7 +111,7 @@ setup (
],
packages=['coro', 'coro.clocks', 'coro.http', 'coro.dns'],
package_dir = {
'': 'coroutine',
# '': 'coroutine',
'coro': 'coro',
'coro.clocks': 'coro/clocks',
'coro.dns': 'coro/dns',
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment