Commit e921e61e authored by Sam Rushing

Merge branch 'master' into zstack

Conflicts:
	coro/_coro.pyx
parents 7ba1e191 5187b9ce
...@@ -5,5 +5,7 @@ build/
coro/_coro.[ch]
coro/oserrors.[ch]
coro/clocks/tsc_time.c
coro/dns/packet.c
coro/event_queue.cpp
*.pyc
*.so
...@@ -8,6 +8,12 @@ around FreeBSD's kqueue() system call. It's designed for
single-process servers that can handle 10,000+ simultaneous network
connections.
API Documentation
=================
See http://ironport.github.com/shrapnel/
Short History Of Python Coroutine Implementations
=================================================
...
...@@ -20,113 +20,7 @@
# $Header: //prod/main/ap/shrapnel/coro/__init__.py#31 $
"""Coroutine threading library.
Introduction
============
Shrapnel is a cooperative threading library.
Getting Started
===============
When your process starts up, you must spawn a thread to do some work, and then
start the event loop. The event loop runs forever processing events until the
process exits. An example::
import coro
def main():
print 'Hello world!'
# This will cause the process to exit.
coro.set_exit(0)
coro.spawn(main)
coro.event_loop()
Coroutines
==========
Every coroutine thread is created with either the `new` function (which does
NOT automatically start the thread) or the `spawn` function (which DOES
automatically start it).
Every thread has a unique numeric ID. You may also set the name of the thread
when you create it.
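A short sketch of the two creation styles (the worker function and thread names
here are only illustrative)::

    import coro

    def task (msg):
        coro.write_stderr ('%s\n' % (msg,))

    coro.spawn (task, 'runs once the event loop schedules it', thread_name='spawned')
    t = coro.new (task, 'waits until start() is called', thread_name='deferred')
    t.start()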
Timeouts
========
The shrapnel timeout facility allows you to execute a function which will be
interrupted if it does not finish within a specified period of time. The
`coro.TimeoutError` exception will be raised if the timeout expires. See the
`with_timeout` docstring for more detail.
If the event loop is not running (such as in a non-coro process), a custom
version of `with_timeout` is installed that will operate using SIGALRM so that
you may use `with_timeout` in code that needs to run in non-coro processes
(though this is not recommended and should be avoided if possible).
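A minimal sketch, assuming a hypothetical blocking read on a connected socket
(the address and sizes are placeholders)::

    import coro

    def main():
        s = coro.tcp_sock()
        s.connect (('127.0.0.1', 8080))
        try:
            data = coro.with_timeout (5, s.recv, 1024)
        except coro.TimeoutError:
            data = None
        coro.set_exit (0)

    coro.spawn (main)
    coro.event_loop()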
Thread Local Storage
====================
There is a thread-local storage interface available for storing global data that
is thread-specific. You instantiate a `ThreadLocal` instance and you can
assign attributes to it that will be specific to that thread. See the
`ThreadLocal` docs for more detail.
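For example, assuming the class is exposed as `coro.ThreadLocal`::

    import coro

    local = coro.ThreadLocal()

    def worker (n):
        # each thread sees only the value it assigned
        local.name = 'worker-%d' % (n,)
        coro.write_stderr ('%s\n' % (local.name,))

    for i in range (3):
        coro.spawn (worker, i)
    coro.event_loop()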
Signal Handlers
===============
By default when you start the event loop, two signal handlers are installed
(for SIGTERM and SIGINT). The default signal handler will exit the event loop.
You can change this behavior by setting `install_signal_handlers` to False
before starting the event loop.
See `coro.signal_handler` for more detail on setting coro signal handlers.
Selfishness
===========
Certain socket operations are allowed to try to execute without blocking if
they are able to (such as sending or receiving data on a local socket or on a
high-speed network). However, there is a limit to the number of times a thread
is allowed to do this. The default is 4. The default may be changed
(`set_selfishness`) and the value on a per-thread basis may be changed
(`coro.coro.set_max_selfish_acts`).
Time
====
Shrapnel uses the `tsc_time` module for handling time. It uses the TSC
value for a stable and high-resolution unit of time. See that module's
documentation for more detail.
A thread is always created when you start the event loop that will
resynchronize the TSC relationship to accommodate any clock drift (see
`tick_updater` and `tsc_time.update_time_relation`).
Exception Notifier
==================
When a thread exits due to an exception, by default a stack trace is printed to
stderr. You may install your own callback to handle this situation. See the
`set_exception_notifier` function for more detail.
Debug Output
============
The shrapnel library provides a mechanism for printing debug information to
stderr. The `print_stderr` function will print a string with a timestamp
and the thread number. The `write_stderr` function writes the string verbatim.
Shrapnel keeps a reference to the "real" stderr (in `saved_stderr`) and the
`print_stderr` and `write_stderr` functions always use the real stderr value. A
particular reason for doing this is that the backdoor module replaces sys.stderr and
sys.stdout, but we do not want debug output to go to the interactive session.
Profiling
=========
Shrapnel has its own profiler that is coro-aware. See `coro.profiler` for
details on how to run the profiler.
:Variables:
- `all_threads`: A dictionary of all live coroutine objects. The key is
the coroutine ID, and the value is the coroutine object.
- `saved_stderr`: The actual stderr object for the process. This normally
should not be used. An example of why this exists is because the
backdoor replaces sys.stderr while executing user code.
"""
from coro._coro import *
from coro._coro import _yield
...@@ -166,13 +60,12 @@ set_exception_notifier (default_exception_notifier)
class InParallelError (Exception):
"""An error occurred in the :func:`in_parallel` function.
:ivar result_list: A list of ``(status, result)`` tuples. ``status`` is
either :data:`SUCCESS` or :data:`FAILURE`. For success, the result is the return
value of the function. For failure, it is the output from
``sys.exc_info``.
"""
def __init__(self, result_list):
...@@ -195,17 +88,14 @@ def in_parallel (fun_arg_list):
This will block until all functions have returned or raised an exception.
If one or more functions raises an exception, then the :exc:`InParallelError`
exception will be raised.
:param fun_arg_list: A list of ``(fun, args)`` tuples.
:returns: A list of return values from the functions.
:raises InParallelError: One or more of the functions raised an exception.
"""
# InParallelError, [(SUCCESS, result0), (FAILURE, exc_info1), ...]
...@@ -257,14 +147,11 @@ def tick_updater():
def waitpid (pid):
"""Wait for a process to exit.
:param pid: The process ID to wait for.
:returns: A tuple ``(pid, status)`` of the process.
:raises SimultaneousError: Something is already waiting for this process
ID.
"""
if UNAME == "Linux":
...@@ -290,14 +177,11 @@ def waitpid (pid):
def get_thread_by_id (thread_id):
"""Get a coro thread by ID.
:param thread_id: The thread ID.
:returns: The coroutine object.
:raises KeyError: The coroutine does not exist.
"""
return all_threads[thread_id]
...@@ -305,11 +189,9 @@ def where (co):
"""Return a string indicating where the given coroutine thread is currently
running.
:param co: The coroutine object.
:returns: A string displaying where the coro thread is currently
executing.
"""
f = co.get_frame()
...@@ -318,8 +200,7 @@ def where (co):
def where_all():
"""Get a dictionary of where all coroutines are currently executing.
:returns: A dictionary mapping the coroutine ID to a tuple of ``(name,
coro, where)`` where ``where`` is a string representing where the
coroutine is currently running.
"""
...@@ -339,13 +220,11 @@ def spawn (fun, *args, **kwargs):
Additional arguments and keyword arguments will be passed to the given function.
:param fun: The function to call when the coroutine starts.
:param thread_name: The name of the thread. Defaults to the name of the
function.
:returns: The new coroutine object.
"""
if kwargs.has_key('thread_name'):
thread_name = kwargs['thread_name']
...@@ -364,13 +243,11 @@ def new (fun, *args, **kwargs):
This will not start the coroutine. Call the ``start`` method on the
coroutine to schedule it to run.
:param fun: The function to call when the coroutine starts.
:param thread_name: The name of the thread. Defaults to the name of the
function.
:returns: The new coroutine object.
"""
if kwargs.has_key('thread_name'):
thread_name = kwargs['thread_name']
...@@ -457,8 +334,7 @@ event_loop_is_running = False
def coro_is_running():
"""Determine if the coro event loop is running.
:returns: True if the event loop is running, otherwise False.
"""
return event_loop_is_running
...@@ -468,8 +344,7 @@ def sigterm_handler (*_unused_args):
def event_loop (timeout=30):
"""Start the event loop.
:param timeout: The amount of time to wait for kevent to return
events. You should probably *not* set this value.
"""
global event_loop_is_running, with_timeout, sleep_relative
...
# -*- Mode: Python -*-
import coro
import coro.dns
import coro.dns.packet as packet
import random
class QueryFailed (Exception):
pass
class stub_resolver:
def __init__ (self, nameservers, inflight=200):
self.nameservers = nameservers
self.inflight = coro.semaphore (inflight)
self.inflight_ids = set()
def lookup (self, qname, qtype, timeout=10, retries=3):
m = packet.Packer()
h = packet.Header()
while 1:
qid = random.randrange (65536)
# avoid collisions
if qid not in self.inflight_ids:
break
h.id = qid
h.opcode = packet.OPCODE.QUERY
h.rd = 1
h.qdcount = 1
m.addHeader (h)
m.addQuestion (qname, qtype, packet.CLASS.IN)
p = m.getbuf()
for addr in self.nameservers:
for i in range (retries):
self.inflight.acquire (1)
self.inflight_ids.add (qid)
try:
s = coro.udp_sock()
s.connect ((addr, 53))
s.send (p)
try:
reply = coro.with_timeout (timeout, s.recv, 1000)
u = packet.Unpacker (reply)
result = u.unpack()
rh = result[0]
if rh.id != qid:
raise QueryFailed ("bad id in reply")
else:
return result
except coro.TimeoutError:
pass
finally:
self.inflight.release (1)
self.inflight_ids.remove (qid)
raise QueryFailed ("no reply from nameservers")
def gethostbyname (self, name, qtype):
header, qdl, anl, nsl, arl = self.lookup (name, qtype)
for answer in anl:
name, rtype, _, ttl, addr = answer
if getattr (packet.TYPE, rtype) == qtype:
return addr
else:
raise QueryFailed ("no answer in nameserver reply")
def resolve_ipv4 (self, name):
return self.gethostbyname (name, packet.TYPE.A)
def resolve_ipv6 (self, name):
return self.gethostbyname (name, packet.TYPE.AAAA)
def install (nameserver_ips):
"install a stub resolver into the coro socket layer"
coro.set_resolver (
stub_resolver (nameserver_ips)
)
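A usage sketch for the resolver above; the nameserver address and hostname are
placeholders borrowed from the test code that follows:

import coro
import coro.dns.stub_resolver

def main():
    r = coro.dns.stub_resolver.stub_resolver (['192.168.200.1'])
    coro.set_resolver (r)
    # equivalently: coro.dns.stub_resolver.install (['192.168.200.1'])
    coro.write_stderr ('%s\n' % (r.resolve_ipv4 ('www.nightmare.com'),))
    coro.set_exit (0)

coro.spawn (main)
coro.event_loop()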
# -*- Mode: Python -*-
import coro
import coro.dns.packet as dns
def testpacker():
# See section 4.1.4 of RFC 1035
p = dns.Packer()
p.addbytes('*' * 20)
p.addname('f.ISI.ARPA')
p.addbytes('*' * 8)
p.addname('Foo.F.isi.arpa')
p.addbytes('*' * 18)
p.addname('arpa')
p.addbytes('*' * 26)
p.addname('')
packet = p.getbuf()
assert packet == (
'********************\x01f\x03ISI\x04ARPA\x00'
'********\x03Foo\xc0\x14******************\xc0\x1a'
'**************************\x00'
)
u = dns.Unpacker (packet)
res = (
u.getbytes(20),
u.getname(),
u.getbytes(8),
u.getname(),
u.getbytes(18),
u.getname(),
u.getbytes(26),
u.getname(),
)
assert res == (
'********************',
'f.isi.arpa',
'********',
'foo.f.isi.arpa',
'******************',
'arpa',
'**************************',
''
)
def test_packer_2 ():
p = dns.Packer()
h = dns.Header()
h.id = 3141
h.opcode = dns.OPCODE.QUERY
h.rd = 0
h.ancount = 1
h.arcount = 1
h.qdcount = 1
p.addHeader (h)
p.addQuestion ('glerg.org', dns.TYPE.CNAME, dns.CLASS.IN)
p.addCNAME ('glerg.org', dns.CLASS.IN, 3000, 'blerb.com')
p.addHINFO ('brodig.com', dns.CLASS.IN, 5000, 'vax', 'vms')
data = p.getbuf()
u = dns.Unpacker (data)
h, qdl, anl, nsl, arl = u.unpack()
assert qdl == [('glerg.org', 5, 1)]
assert anl == [('glerg.org', 'CNAME', 'IN', 3000, 'blerb.com')]
assert arl == [('brodig.com', 'HINFO', 'IN', 5000, ('vax', 'vms'))]
assert nsl == []
assert h.id == 3141
assert h.opcode == dns.OPCODE.QUERY
assert h.ancount == 1
assert h.qdcount == 1
def t0 (qname='www.nightmare.com', qtype=dns.TYPE.A):
m = dns.Packer()
h = dns.Header()
h.id = 3141
h.opcode = dns.OPCODE.QUERY
h.rd = 1
h.qdcount = 1
m.addHeader (h)
m.addQuestion (qname, qtype, dns.CLASS.IN)
p = m.getbuf()
return p
def t1 (qname, qtype):
p = t0 (qname, getattr (dns.TYPE, qtype))
s = coro.udp_sock()
s.connect (('192.168.200.1', 53))
s.send (p)
r = s.recv (8192)
coro.write_stderr ('reply=%r\n' % (r,))
u = dns.Unpacker (r)
return u.unpack()
def t2():
import coro.dns.stub_resolver
r = coro.dns.stub_resolver.stub_resolver (['192.168.200.1'])
coro.set_resolver (r)
# XXX make this into a real unit test.
if __name__ == '__main__':
#import coro.backdoor
#coro.spawn (coro.backdoor.serve, unix_path='/tmp/xx.bd')
#coro.event_loop()
test_packer_2()
/*
Copyright (c) 2002-2011 IronPort Systems and Cisco Systems
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "Python.h"
#include <map>
typedef std::multimap <unsigned long long, PyObject *> event_queue;
typedef event_queue::iterator event_queue_iter;
/* Needed because Pyrex is compiled as a C program and is expecting C-like
symbols.
*/
extern "C" {
/*
Create a new event queue.
*/
event_queue *
event_queue_new()
{
return new event_queue;
}
/*
Delete the event queue and free all data.
*/
void
event_queue_dealloc(event_queue * q)
{
event_queue_iter iter;
for (iter = q->begin(); iter != q->end(); iter++) {
Py_DECREF(iter->second);
}
delete q;
}
/*
Returns the length of the queue.
*/
int
event_queue_len(event_queue * q)
{
return q->size();
}
/*
Returns a new iterator.
*/
event_queue_iter
event_queue_new_iter(event_queue * q)
{
return q->begin();
}
/*
Return the current value of the iterator and move the position forward.
The timestamp of that element is stored in time (if not NULL). The value is returned.
Returns NULL if the queue is empty with StopIteration set.
*/
PyObject *
event_queue_iter_next(event_queue * q, event_queue_iter * iter, uint64_t * time)
{
PyObject * value;
if (*iter == q->end()) {
PyErr_SetObject(PyExc_StopIteration, NULL);
return NULL;
} else {
if (time) {
*time = (*iter)->first;
}
value = (*iter)->second;
Py_INCREF(value);
(*iter)++;
return value;
}
}
/*
Peek at the top of the queue.
The timestamp of the first element is stored in time (if not NULL). The value is returned.
Returns NULL if the queue is empty with IndexError set.
*/
PyObject *
event_queue_top(event_queue * q, uint64_t * time)
{
PyObject * value;
if (q->size()) {
event_queue_iter iter = q->begin();
if (time) {
*time = iter->first;
}
value = iter->second;
Py_INCREF(value);
return value;
} else {
PyErr_SetString(PyExc_IndexError, "top of empty queue");
return NULL;
}
}
/*
Pop the first element off the queue.
The timestamp of the first element is stored in time (if not NULL). The value is returned.
Returns NULL if the queue is empty with IndexError set.
*/
PyObject *
event_queue_pop(event_queue * q, uint64_t * time)
{
PyObject * value;
if (q->size()) {
event_queue_iter iter = q->begin();
if (time) {
*time = iter->first;
}
value = iter->second;
q->erase (iter);
return value;
} else {
PyErr_SetString(PyExc_IndexError, "pop from empty queue");
return NULL;
}
}
/*
Insert a new entry into the queue.
Returns 0 on success, -1 on failure.
(Currently never fails.)
*/
int
event_queue_insert(event_queue * q, uint64_t time, PyObject * value)
{
q->insert (std::pair <uint64_t, PyObject *> (time, value));
Py_INCREF(value);
return 0;
}
/*
Delete an entry from the queue.
Returns 0 on success, -1 on failure with IndexError set.
*/
int
event_queue_delete(event_queue * q, uint64_t time, PyObject * value)
{
event_queue_iter iter = q->find(time);
// Iterate since we support duplicate keys.
while (iter != q->end()) {
if (iter->first != time) {
break;
}
if (iter->second == value) {
Py_DECREF(iter->second);
q->erase(iter);
return 0;
} else {
iter++;
}
}
PyErr_SetString(PyExc_IndexError, "event not found");
return -1;
}
} /* extern "C" */
/*
Copyright (c) 2002-2011 IronPort Systems and Cisco Systems
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#ifndef _EVENT_QUEUE_H_
#define _EVENT_QUEUE_H_
#include "Python.h"
/* Types are defined as void * to fake out Pyrex which can't handle C++ types. */
//typedef std::multimap <long long, PyObject *> event_queue;
typedef void * event_queue;
event_queue * event_queue_new(void);
void event_queue_dealloc(event_queue * q);
PyObject * event_queue_top(event_queue * q, uint64_t * time);
PyObject * event_queue_pop(event_queue * q, uint64_t * time);
int event_queue_insert(event_queue * q, uint64_t time, PyObject * value);
int event_queue_delete(event_queue * q, uint64_t time, PyObject * value);
int event_queue_len(event_queue * q);
//typedef event_queue::iterator event_queue_iter;
typedef void * event_queue_iter;
event_queue_iter event_queue_new_iter(event_queue * q);
PyObject * event_queue_iter_next(event_queue * q, event_queue_iter * iter, uint64_t * time);
#endif /* _EVENT_QUEUE_H_ */
...@@ -22,117 +22,111 @@
__event_queue_version__ = "$Id: event_queue.pyx,v 1.1 2007/01/03 00:19:50 ehuss Exp $"

include "python.pxi"
from cython.operator cimport dereference as deref, preincrement as inc
from libcpp.utility cimport pair
from libc cimport uint64_t

cdef extern from "<map>" namespace "std":
    cdef cppclass multimap[T, U]:
        cppclass iterator:
            pair[T,U]& operator*()
            iterator operator++()
            iterator operator--()
            bint operator==(iterator)
            bint operator!=(iterator)
        map()
        U& operator[](T&)
        U& at(T&)
        iterator begin()
        size_t count(T&)
        bint empty()
        iterator end()
        void erase(iterator)
        void erase(iterator, iterator)
        size_t erase(T&)
        iterator find(T&)
        pair[iterator, bint] insert(pair[T,U])
        size_t size()

cdef class event_queue:

    cdef multimap[uint64_t, PyObject*] *q

    def __cinit__(self):
        self.q = new multimap[uint64_t, PyObject*]()

    def __dealloc__(self):
        cdef multimap[uint64_t, PyObject*].iterator it = self.q.begin()
        while it != self.q.end():
            Py_DECREF(<object> deref(it).second)
            inc(it)
        del self.q

    cpdef insert(self, uint64_t time, value):
        """Insert a new value into the queue.

        :Parameters:
            - `time`: The uint64 time.
            - `value`: The value to insert.
        """
        cdef pair[uint64_t, PyObject*] p
        p.first, p.second = time, <PyObject *> value
        self.q.insert(p)
        Py_INCREF(value)

    def __len__(self):
        return self.q.size()

    cpdef top(self):
        """Peek at the top value of the queue.

        :Return:
            Returns value from the top of the queue.

        :Exceptions:
            - `IndexError`: The queue is empty.
        """
        if not self.q.size():
            raise IndexError('Top of empty queue')
        cdef multimap[uint64_t, PyObject*].iterator it = self.q.begin()
        return <object> deref(it).second

    cpdef pop(self):
        """Grab the top value of the queue and remove it.

        :Return:
            Returns value from the top of the queue.

        :Exceptions:
            - `IndexError`: The queue is empty.
        """
        if not self.q.size():
            raise IndexError('Top of empty queue')
        cdef multimap[uint64_t, PyObject*].iterator it = self.q.begin()
        value = <object> deref(it).second
        self.q.erase(it)
        Py_DECREF(value)
        return value

    cpdef remove(self, uint64_t time, value):
        """Delete a value from the queue.

        :Parameters:
            - `time`: The uint64 time.
            - `value`: The value to delete.
        """
        cdef PyObject *val
        cdef multimap[uint64_t, PyObject*].iterator it = self.q.find(time)
        cdef PyObject *v = <PyObject *> value
        while it != self.q.end():
            if deref(it).first != time:
                break
            val = <PyObject *> deref(it).second
            if v == val:
                self.q.erase(it)
                Py_DECREF(<object>val)
                return 0
            else:
                inc(it)
        raise IndexError('Event not found')
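A small Python-level sketch of the class above, assuming the extension ends up
importable as coro.event_queue (the module path is a guess; it may instead be
folded into coro._coro):

from coro.event_queue import event_queue   # module path is an assumption

q = event_queue()
q.insert (10, 'a')
q.insert (10, 'b')       # duplicate timestamps are allowed (multimap)
q.insert (5, 'c')
assert len (q) == 3
assert q.top() == 'c'    # smallest timestamp first
assert q.pop() == 'c'
q.remove (10, 'b')       # drop one specific (time, value) pair
assert q.pop() == 'a'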
# -*- Mode: Python -*-
from server import server, tlslite_server
import handlers
import coro
from coro import read_stream
import http_date
import session_handler
# -*- Mode: Python -*-
import coro
import coro.read_stream
from protocol import http_file, header_set, latch
W = coro.write_stderr
class HTTP_Protocol_Error (Exception):
pass
# viewed at its core, HTTP is a two-way exchange of messages,
# some of which may have content associated with them.
# two different usage patterns for pipelined requests:
# 1) requests are made by different threads
# 2) requests are made by a single thread
#
# we accommodate both patterns here.
# for #2, use the lower-level send_request() method, for #1,
# use GET, PUT, etc... (see the usage sketch after the client class below)
class request:
def __init__ (self, force=True):
self.latch = latch()
self.force = force
self.content = None
self.response = None
self.rheader = None
self.rfile = None
def wake (self):
if self.rfile and self.force:
self.content = self.rfile.read()
self.latch.wake_all()
if self.rfile and not self.force:
self.rfile.wait()
def wait (self):
return self.latch.wait()
class client:
def __init__ (self, host, port=80, conn=None, inflight=100):
self.host = host
self.inflight = coro.semaphore (inflight)
if conn is None:
self.conn = coro.tcp_sock()
self.conn.connect ((host, port))
else:
self.conn = conn
self.stream = coro.read_stream.sock_stream (self.conn)
self.pending = coro.fifo()
coro.spawn (self.read_thread)
def read_thread (self):
while 1:
req = self.pending.pop()
self._read_message (req)
if not req.response:
break
else:
req.wake()
def _read_message (self, req):
req.response = self.stream.read_line()[:-2]
lines = []
while 1:
line = self.stream.read_line()
if not line:
raise HTTP_Protocol_Error ('unexpected close')
elif line == '\r\n':
break
else:
lines.append (line[:-2])
req.rheader = h = header_set (lines)
if h['content-length'] or h['transfer-encoding']:
req.rfile = http_file (h, self.stream)
def send_request (self, method, uri, headers, content=None, force=False):
try:
self.inflight.acquire (1)
req = request (force)
self._send_request (method, uri, headers, content)
self.pending.push (req)
return req
finally:
self.inflight.release (1)
def _send_request (self, method, uri, headers, content):
if not headers.has_key ('host'):
headers['host'] = self.host
if content:
if type(content) is str:
headers['content-length'] = len(content)
elif not headers.has_key ('content-length'):
headers['transfer-encoding'] = 'chunked'
req = (
'%s %s HTTP/1.1\r\n'
'%s\r\n' % (method, uri, headers)
)
self.conn.send (req)
# XXX 100 continue
if content:
if type(content) is str:
self.conn.send (content)
elif headers.has_key ('content-length'):
clen = int (headers.get_one ('content-length'))
slen = 0
for block in content:
self.conn.send (block)
slen += len(block)
if slen > clen:
raise HTTP_Protocol_Error ("content larger than declared length", clen, slen)
else:
if slen != clen:
raise HTTP_Protocol_Error ("content smaller than declared length", clen, slen)
else:
# chunked encoding
for block in content:
if block:
self.conn.writev (['%x\r\n' % (len (block),), block])
self.conn.send ('0\r\n')
def GET (self, uri, **headers):
headers = header_set().from_keywords (headers)
req = self.send_request ('GET', uri, headers, force=True)
req.wait()
return req
def GET_file (self, uri, **headers):
headers = header_set().from_keywords (headers)
req = self.send_request ('GET', uri, headers, force=False)
req.wait()
return req
def PUT (self, uri, content, **headers):
headers = header_set().from_keywords (headers)
req = self.send_request ('PUT', uri, headers, content, force=True)
req.wait()
return req
def POST (self, uri, content, **headers):
headers = header_set().from_keywords (headers)
req = self.send_request ('POST', uri, headers, content, force=True)
req.wait()
return req
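A sketch of the two pipelining patterns described in the comments above; the
host, port and paths are placeholders, and the import path follows the test
code later in this commit:

import coro
from coro.httpd.client import client

def worker (c, path):
    # pattern 1: each thread issues its own high-level GET
    req = c.GET (path)
    coro.write_stderr ('%s\n' % (req.response,))

def demo():
    c = client ('127.0.0.1', 80)
    for p in ('/a', '/b'):
        coro.spawn (worker, c, p)
    # pattern 2: a single thread pipelines requests via send_request, then waits
    reqs = [c.send_request ('GET', p, {}, content=None, force=True) for p in ('/c', '/d')]
    for r in reqs:
        r.wait()
        coro.write_stderr ('%s\n' % (r.response,))

coro.spawn (demo)
coro.event_loop()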
# -*- Mode: Python -*-
import coro
import coro.http
import backdoor
# toy: move an X through a grid.
# tests: POST data, compression, persistent connections, shared state
import sys
W = sys.stderr.write
class grid_handler:
def __init__ (self, w, h):
self.w = w
self.h = h
self.grid = [['.' for x in range (w)] for y in range (h)]
self.pos = [w/2, h/2]
self.grid[self.pos[1]][self.pos[0]] = 'X'
def match (self, request):
return request.path.startswith ('/grid')
def handle_request (self, request):
if request.path == '/grid/source':
request['content-type'] = 'text/plain'
request.set_deflate()
request.push (open ('grid.py', 'rb').read())
request.done()
return
request['content-type'] = 'text/html'
request.set_deflate()
if request.file:
data = request.file.read()
pairs = [ x.split('=') for x in data.split ('&') ]
for k, v in pairs:
if k == 'dir':
x0, y0 = self.pos
x1, y1 = self.pos
if v == 'left':
x1 = max (x0-1, 0)
elif v == 'right':
x1 = min (x0+1, self.w-1)
elif v == 'up':
y1 = max (y0-1, 0)
elif v == 'down':
y1 = min (y0+1, self.h-1)
else:
pass
self.grid[y0][x0] = '*'
self.grid[y1][x1] = 'X'
self.pos = [x1, y1]
else:
pass
l = []
for y in self.grid:
l.append (''.join (y))
request.push ('<pre>')
request.push ('\n'.join (l))
request.push ('\n</pre>\n')
request.push (
'<form name="input" action="grid" method="post">'
'<input type="submit" name="dir" value="left" />'
'<input type="submit" name="dir" value="right" />'
'<input type="submit" name="dir" value="up" />'
'<input type="submit" name="dir" value="down" />'
'</form>'
'<a href="/grid/source">source for this handler</a>'
)
request.done()
server = coro.http.server()
server.push_handler (grid_handler (50, 30))
server.push_handler (coro.http.handlers.coro_status_handler())
server.push_handler (coro.http.handlers.favicon_handler())
coro.spawn (server.start, ('0.0.0.0', 9001))
coro.spawn (backdoor.serve, unix_path='/tmp/httpd.bd')
coro.event_loop (30.0)
# -*- Mode: Python -*-
import coro
import coro.http
import backdoor
# demonstrate the session handler
import sys
W = sys.stderr.write
def session (sid, fifo):
i = 0
while 1:
try:
# wait a half hour for a new hit
request = coro.with_timeout (1800, fifo.pop)
except coro.TimeoutError:
break
else:
request['content-type'] = 'text/html'
if i == 10:
request.push (
'<html><h1>Session Over! Bye!</h1>'
'<a href="session">start over</a>'
'</html>'
)
request.done()
break
else:
request.push (
'<html><h1>Session Demo</h1><br><h2>Hit=%d</h2>'
'<a href="session">hit me!</a>'
'</html>' % (i,)
)
request.done()
i += 1
server = coro.http.server()
server.push_handler (coro.http.handlers.coro_status_handler())
server.push_handler (coro.http.session_handler.session_handler ('session', session))
server.push_handler (coro.http.handlers.favicon_handler())
coro.spawn (server.start, ('0.0.0.0', 9001))
coro.spawn (backdoor.serve, unix_path='/tmp/httpd.bd')
coro.event_loop (30.0)
# -*- Mode: Python -*-
import coro
W = coro.write_stderr
from coro.httpd.client import client as http_client
def t0():
c = http_client ('127.0.0.1', 80)
l = [ c.send_request ('GET', '/postgresql/html/', {}, content=None, force=True) for x in range (10) ]
for req in l:
req.wait()
W ('%s\n' % (req.response,))
def t1():
c = http_client ('127.0.0.1', 80)
rl = coro.in_parallel ([(c.GET, ('/postgresql/html/',))] * 10)
for x in rl:
W ('%s\n' % (x.response,))
return rl
if __name__ == '__main__':
import coro.backdoor
coro.spawn (t0)
coro.spawn (coro.backdoor.serve, unix_path='/tmp/xx.bd')
coro.event_loop()
# -*- Mode: Python -*-
# demo an https server using the TLSLite package.
import coro
import coro.http
import coro.backdoor
# -----------------------------------------------------------------------
# --- change the location of the chain and key files on the next line ---
# -----------------------------------------------------------------------
server = coro.http.tlslite_server (
'cert/server.crt',
'cert/server.key',
)
server.push_handler (coro.http.handlers.coro_status_handler())
server.push_handler (coro.http.handlers.favicon_handler())
coro.spawn (server.start, ('0.0.0.0', 9443))
coro.spawn (coro.backdoor.serve, unix_path='/tmp/httpsd.bd')
coro.event_loop (30.0)
# -*- Mode: Python -*-
import coro
import mimetypes
import os
import re
import stat
import sys
import time
import zlib
from coro.http import http_date
from coro.http.http_date import build_http_date
W = sys.stderr.write
# these two aren't real handlers, they're more like templates
# to give you an idea how to write one.
class post_handler:
def match (self, request):
# override to do a better job of matching
return request._method == 'post'
def handle_request (self, request):
data = request.file.read()
W ('post handler, data=%r\n' % (data,))
request.done()
class put_handler:
def match (self, request):
# override to do a better job of matching
return request.method == 'put'
def handle_request (self, request):
fp = request.file
while 1:
line = fp.readline()
if not line:
W ('line: DONE!\n')
break
else:
W ('line: %r\n' % (line,))
request.done()
class coro_status_handler:
def match (self, request):
return request.path.split ('/')[1] == 'status'
def clean (self, s):
s = s.replace ('<','&lt;')
s = s.replace ('>','&gt;')
return s
def handle_request (self, request):
request['content-type'] = 'text/html; charset=utf-8'
request.set_deflate()
request.push (
'<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" '
'"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">'
'<html xmlns="http://www.w3.org/1999/xhtml">\r\n'
)
request.push ('<head><title>status</title></head><body>\r\n')
request.push ('<p>Listening on\r\n')
request.push (repr (request.server.addr))
request.push ('</p>\r\n')
request.push ('<table border="1">\r\n')
all_threads = ( (x, coro.where(x)) for x in coro.all_threads.values() )
for thread, traceback in all_threads:
request.push ('<tr><td>%s\r\n' % self.clean (repr(thread)))
request.push ('<pre>\r\n')
# traceback format seems to have changed
for level in traceback[1:-1].split ('] ['):
[file, fun] = level.split (' ')
fun, line = fun.split ('|')
request.push ('<b>%20s</b>:%3d %s\r\n' % (self.clean (fun), int(line), self.clean (file)))
request.push ('</pre></td></tr>')
request.push ('</table>\r\n')
request.push ('<p><a href="status">Update</a></p>')
request.push ('</body></html>')
request.done()
class file_handler:
block_size = 16000
def __init__ (self, doc_root):
self.doc_root = doc_root
def match (self, request):
path = request.path
filename = os.path.join (self.doc_root, path[1:])
return os.path.exists (filename)
crack_if_modified_since = re.compile ('([^;]+)(; length=([0-9]+))?$', re.IGNORECASE)
def handle_request (self, request):
path = request.path
filename = os.path.join (self.doc_root, path[1:])
if request.method not in ('get', 'head'):
request.error (405)
return
if os.path.isdir (filename):
filename = os.path.join (filename, 'index.html')
if not os.path.isfile (filename):
request.error (404)
else:
stat_info = os.stat (filename)
mtime = stat_info[stat.ST_MTIME]
file_length = stat_info[stat.ST_SIZE]
ims = request['if-modified-since']
if ims:
length_match = 1
m = self.crack_if_modified_since.match (ims)
if m:
length = m.group (3)
if length:
if int(length) != file_length:
length_match = 0
ims_date = http_date.parse_http_date (m.group(1))
if length_match and ims_date:
if mtime <= ims_date:
request.error (304)
return
ftype, fencoding = mimetypes.guess_type (filename)
request['Content-Type'] = ftype or 'text/plain'
request['Last-Modified'] = build_http_date (mtime)
# Note: these are blocking file operations.
if request.method == 'get':
f = open (filename, 'rb')
block = f.read (self.block_size)
if not block:
request.error (204) # no content
else:
while 1:
request.push (block)
block = f.read (self.block_size)
if not block:
break
elif request.method == 'head':
pass
else:
# should be impossible
request.error (405)
sample = (
'AAABAAEAICAQAAEABADoAgAAFgAAACgAAAAgAAAAQAAAAAEABAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAD/+K///8AH//+iI///QAH//r4g//x3AH//Z6J//UABP/ovgD/458Ef+u+wv/Tn'
'0R/+79if9OXZH/6gCJ/2BwAf/u/8n/h33R/7Z7kf/ReQH/+qu7//BUW//7vrv//RR3//7r///80d'
'///pq///8EP//+rH///d9///6j///9Af/w=='
).decode ('base64')
zsample = zlib.compress (sample, 9)
last_modified = build_http_date (time.time())
class favicon_handler:
def __init__ (self, data=None):
if data is None:
self.data = zsample
else:
self.data = data
def match (self, request):
return request.path == '/favicon.ico'
def handle_request (self, request):
if request['if-modified-since']:
# if we cared, we could check that timestamp.
request.error (304)
else:
request['content-type'] = 'image/x-icon'
request['last-modified'] = last_modified
# are there browsers that don't accept deflate?
request['content-encoding'] = 'deflate'
request.push (self.data)
request.done()
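A minimal sketch of a custom handler following the same match/handle_request
protocol used above (the handler name and port are made up):

import coro
import coro.http

class hello_handler:
    def match (self, request):
        return request.path == '/hello'
    def handle_request (self, request):
        request['content-type'] = 'text/plain'
        request.push ('hello from shrapnel\n')
        request.done()

server = coro.http.server()
server.push_handler (hello_handler())
coro.spawn (server.start, ('0.0.0.0', 9001))
coro.event_loop (30.0)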
# -*- Mode: Python -*-
import coro
from coro import read_stream
W = coro.write_stderr
# candidate for sync.pyx?
class latch:
"Like a CV, except without the race - if the event has already fired then wait() will return immediately."
def __init__ (self):
self.cv = coro.condition_variable()
self.done = False
def wake_all (self, args=()):
self.done = True
self.args = args
return self.cv.wake_all (args)
def wait (self):
if not self.done:
return self.cv.wait()
else:
return self.args
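A small sketch of the latch semantics: a waiter that arrives after wake_all()
returns immediately with the stored value (this assumes condition_variable.wait()
returns the arguments passed to wake_all):

import coro

l = latch()

def early():
    coro.write_stderr ('early waiter got %r\n' % (l.wait(),))

def fire():
    l.wake_all ('done')
    # a late wait() does not block; it returns the saved args
    coro.write_stderr ('late waiter got %r\n' % (l.wait(),))
    coro.set_exit (0)

coro.spawn (early)
coro.spawn (fire)
coro.event_loop()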
class http_file:
"HTTP message content, as a file-like object."
buffer_size = 8000
def __init__ (self, headers, stream):
self.streami = stream
self.done_cv = latch()
if headers.get_one ('transfer-encoding') == 'chunked':
self.streamo = read_stream.buffered_stream (self._gen_read_chunked().next)
else:
content_length = headers.get_one ('content-length')
if content_length:
self.content_length = int (content_length)
self.streamo = read_stream.buffered_stream (self._gen_read_fixed().next)
else:
raise HTTP_Protocol_Error ("no way to determine length of HTTP data")
def _gen_read_chunked (self):
"generator: decodes chunked transfer-encoding."
s = self.streami
while 1:
chunk_size = int (s.read_line()[:-2], 16)
if chunk_size == 0:
self.done_cv.wake_all()
return
else:
remain = chunk_size
while remain:
ask = min (remain, self.buffer_size)
block = s.read_exact (ask)
assert (s.read_exact (2) == '\r\n')
remain -= ask
yield block
def _gen_read_fixed (self):
"generate fixed-size blocks of content."
s = self.streami
remain = self.content_length
while remain:
ask = min (remain, self.buffer_size)
block = s.read_exact (ask)
remain -= ask
yield block
self.done_cv.wake_all()
return
# XXX implement <size> argument
def read (self, join=True):
"read the entire contents. join=False returns a generator, join=True returns a string."
r = (x for x in self.streamo.read_all())
if join:
return ''.join (r)
else:
return r
def readline (self):
"read a newline-delimited line."
if self.done_cv.done:
return ''
else:
return self.streamo.read_until ('\n')
def wait (self):
"wait until all the content has been read."
self.done_cv.wait()
class header_set:
def __init__ (self, headers=()):
self.headers = {}
for h in headers:
self.crack (h)
def from_keywords (self, kwds):
"Populate this header set from a dictionary of keyword arguments (e.g., 'content_length' becomes 'content-length')"
r = []
for k, v in kwds.items():
k = k.replace ('_', '-')
self[k] = v
return self
def crack (self, h):
"Crack one header line."
# deliberately ignoring 822 crap like continuation lines.
try:
i = h.index (': ')
name, value = h[:i], h[i+2:]
self[name] = value
except ValueError:
coro.write_stderr ('dropping bogus header %r\n' % (h,))
pass
def get_one (self, key):
"Get the value of a header expected to have at most one value. If not present, return None. If more than one, raise ValueError."
r = self.headers.get (key, None)
if r is None:
return r
elif isinstance (r, list) and len (r) > 1:
raise ValueError ("expected only one %s header, got %r" % (key, r))
else:
return r[0]
def has_key (self, key):
"Is this header present?"
return self.headers.has_key (key.lower())
def __getitem__ (self, key):
"Returns the list of values for this header, or None."
return self.headers.get (key, None)
def __setitem__ (self, name, value):
"Add a value to the header <name>."
name = name.lower()
probe = self.headers.get (name)
if probe is None:
self.headers[name] = [value]
else:
probe.append (value)
def __str__ (self):
"Render the set of headers."
r = []
for k, vl in self.headers.iteritems():
for v in vl:
r.append ('%s: %s\r\n' % (k, v))
return ''.join (r)
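A quick sketch of header_set usage, placed here so the class above is in scope;
the header values are made up:

h = header_set (['Host: example.com', 'X-Thing: a', 'X-Thing: b'])
h['connection'] = 'close'
print h.get_one ('host')      # 'example.com'
print h['x-thing']            # ['a', 'b']
print str (h)                 # rendered as 'name: value\r\n' lines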
# -*- Mode: Python -*-
import coro
import time
import uuid
import sys
W = sys.stderr.write
# See: http://en.wikipedia.org/wiki/HTTP_cookie#Session_cookie
def extract_session (cookie):
parts = cookie.split (';')
for part in parts:
pair = [x.strip() for x in part.split ('=')]
if len (pair) == 2:
if pair[0] == 'session':
return pair[1]
return None
class session_handler:
def __init__ (self, name, function):
self.name = name
self.function = function
self.sessions = {}
def match (self, request):
path = request.path.split ('/')
if (len(path) > 1) and (path[1] == self.name):
return 1
else:
return 0
def find_session (self, request):
# XXX does http allow more than one cookie header?
cookie = request['cookie']
if cookie:
sid = extract_session (cookie)
return sid, self.sessions.get (sid, None)
else:
return None, None
def gen_session_id (self):
return str (uuid.uuid4())
def handle_request (self, request):
sid, fifo = self.find_session (request)
if fifo is None:
# login
fifo = coro.fifo()
fifo.push (request)
sid = self.gen_session_id()
request['set-cookie'] = 'session=%s' % (sid,)
self.sessions[sid] = fifo
coro.spawn (self.wrap, sid, fifo)
else:
fifo.push (request)
def wrap (self, sid, fifo):
try:
self.function (sid, fifo)
finally:
del self.sessions[sid]
...@@ -25,17 +25,18 @@
Introduction
============
This profiler is coro-aware. It produces output to a binary file on disk. You
then use the :mod:`coro.print_profile` module to convert it to an HTML file.
Using The Profiler
==================
There are two ways to run the profiler. One is to use the
:func:`coro.profiler.go` function where you give it a python function to run.
Profiling will start and call the function, and then the profiler will
automatically stop when the function exits.
The other method is to call :func:`coro.profiler.start` to start the profiler
and :func:`coro.profiler.stop` when you want to stop profiling. This can be
conveniently done from the backdoor.
Rendering Output
================
...@@ -49,12 +50,13 @@ Then view the profile output in your web browser.
Profiler Types
==============
The profiler supports different ways of gathering statistics. This is done by
specifying the "bench" object to use (see :func:`go` and :func:`start`). They
default to the "rusage" method of gathering statistics about every function
call (see the getrusage man page for more detail). If you want a higher
performance profile, you can use the :class:`coro.bench` object instead which
simply records TSC values for every function call. If you want to define your
own method of gathering statistics, subclass :class:`coro.bench` and implement
your own techniques.
"""
...@@ -98,14 +100,12 @@ def go (fun, *args, **kwargs):
This will display the results to stdout after the function is finished.
:param fun: The function to call.
:keyword profile_filename: The name of the file to save the profile data.
Defaults to '/tmp/coro_profile.bin'.
:keyword profile_bench: The bench object type to use. Defaults to
:class:`coro.rusage_bench`.
"""
if kwargs.has_key('profile_filename'):
profile_filename = kwargs['profile_filename']
...
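A sketch of both ways to run the profiler described above (the workload
function is invented):

import coro
import coro.profiler

def work():
    sum (xrange (1000000))
    coro.set_exit (0)

# 1) profile a single function call:
coro.spawn (coro.profiler.go, work, profile_filename='/tmp/coro_profile.bin')
coro.event_loop()

# 2) or bracket a region by hand, e.g. from the backdoor:
#    coro.profiler.start() ... coro.profiler.stop()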
# -*- Mode: Python -*-
class socket_producer:
def __init__ (self, conn, buffer_size=8000):
self.conn = conn
self.buffer_size = buffer_size
def next (self):
return self.conn.recv (self.buffer_size)
def sock_stream (sock):
return buffered_stream (socket_producer (sock).next)
class buffered_stream:
def __init__ (self, producer):
self.producer = producer
self.buffer = ''
def gen_read_until (self, delim):
"generate pieces of input up to and including <delim>, then StopIteration"
ld = len(delim)
m = 0
while 1:
if not self.buffer:
self.buffer = self.producer()
if not self.buffer:
# eof
yield ''
return
i = 0
while i < len (self.buffer):
if self.buffer[i] == delim[m]:
m += 1
if m == ld:
result, self.buffer = self.buffer[:i+1], self.buffer[i+1:]
yield result
return
else:
m = 0
i += 1
block, self.buffer = self.buffer, ''
yield block
def gen_read_until_dfa (self, dfa):
"generate pieces of input up to and including a match on <dfa>, then StopIteration"
m = 0
while 1:
if not self.buffer:
self.buffer = self.producer()
if not self.buffer:
# eof
yield ''
return
i = 0
while i < len (self.buffer):
if dfa.consume (self.buffer[i]):
result, self.buffer = self.buffer[:i+1], self.buffer[i+1:]
yield result
return
i += 1
block, self.buffer = self.buffer, ''
yield block
def gen_read_exact (self, size):
"generate pieces of input up to <size> bytes, then StopIteration"
remain = size
while remain:
if len (self.buffer) >= remain:
result, self.buffer = self.buffer[:remain], self.buffer[remain:]
yield result
return
else:
piece, self.buffer = self.buffer, self.producer()
remain -= len (piece)
yield piece
if not self.buffer:
# eof
yield ''
return
def read_until (self, delim, join=True):
"read until <delim>. return a list of parts unless <join> is True"
result = ( x for x in self.gen_read_until (delim) )
if join:
return ''.join (result)
else:
return result
def read_exact (self, size, join=True):
"read exactly <size> bytes. return a list of parts unless <join> is True"
result = ( x for x in self.gen_read_exact (size) )
if join:
return ''.join (result)
else:
return result
def flush (self):
"flush this stream's buffer"
result, self.buffer = self.buffer, ''
return result
def read_line (self, delim='\r\n'):
"read a CRLF-delimited line from this stream"
return self.read_until (delim)
def read_all (self):
"read from self.producer until the stream terminates"
if self.buffer:
yield self.flush()
while 1:
block = self.producer()
if not block:
return
else:
yield block
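A standalone sketch that drives buffered_stream from an in-memory producer
instead of a socket; the producer here is invented, and the import path follows
the `from coro import read_stream` usage elsewhere in this commit:

from coro.read_stream import buffered_stream

def make_producer (data, chunk=4):
    state = {'pos': 0}
    def produce():
        i = state['pos']
        state['pos'] = i + chunk
        return data[i:i+chunk]   # returns '' at EOF, like sock.recv
    return produce

s = buffered_stream (make_producer ('GET / HTTP/1.1\r\nHost: x\r\n\r\n'))
print repr (s.read_line())         # 'GET / HTTP/1.1\r\n'
print repr (s.read_until (': '))   # 'Host: '
print repr (s.read_exact (1))      # 'x'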
...@@ -24,6 +24,9 @@ cdef class zstack:
if not self.buffer:
raise MemoryError
self.buffer_size = size
def __dealloc__ (self):
if self.buffer:
PyMem_Free (self.buffer)
cdef size_t deflate (self, void * base, size_t size):
return lz4.LZ4_compress (<char*>base, self.buffer, size)
cdef size_t inflate (self, void * dst, size_t dsize, void * src, size_t ssize):
...
...@@ -3,8 +3,7 @@ Shrapnel/Coro
=============
:Date: $Date: 2008/05/06 $
:Author: Sam Rushing
.. contents::
:depth: 2
...@@ -13,6 +12,15 @@ Shrapnel/Coro
Shrapnel/Coro is a cooperative thread facility built on top of Python.
.. note::
This document was originally written for internal use at
IronPort. It refers to several facilities that unfortunately have not
(yet) been open-sourced, (e.g., the dns resolver and sntp client).
It also references and describes things that are specific to the
IronPort mail appliance. Much of the advice in here is good, though,
and I hope to revisit it soon.
Threads
=======
...
.. Shrapnel documentation master file, created by
sphinx-quickstart on Fri Apr 13 18:44:49 2012.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to Shrapnel's documentation!
====================================
Contents:
* :doc:`Installation <installation>`
* :doc:`Tutorial <tutorial>`
* :doc:`Reference Manual <ref/index>`
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
import coro
import coro.backdoor
coro.spawn (coro.backdoor.serve, unix_path='/tmp/xx.bd')
coro.event_loop()