Commit 20ac60b7 authored by Sam Rushing's avatar Sam Rushing

Merge branch 'master' of github.com:ironport/shrapnel into stub-resolver

Conflicts:
	setup.py
parents d9cf6f14 eb0ddc4d
......@@ -797,7 +797,7 @@ cdef public void _wrap1 "_wrap1" (coro co):
# event queue
# ================================================================================
include "event_queue.pyx"
from event_queue import event_queue, __event_queue_version__
cdef class event:
cdef public uint64_t t
......@@ -907,9 +907,9 @@ cdef public class sched [ object sched_object, type sched_type ]:
cdef int stack_size
cdef public object _current, pending, staging
cdef coro _last
cdef readonly event_queue events
cdef int profiling
cdef uint64_t latency_threshold
cdef object events
def __init__ (self, stack_size=4*1024*1024):
self.stack_size = stack_size
......@@ -1108,7 +1108,7 @@ cdef public class sched [ object sched_object, type sched_type ]:
tb = timebomb (self._current, delta)
e = event (tb.when, tb)
self.events.c_insert (e.t, e)
self.events.insert (e.t, e)
try:
try:
return PyObject_Call (function, args, kwargs)
......@@ -1121,7 +1121,7 @@ cdef public class sched [ object sched_object, type sched_type ]:
raise
finally:
if not e.expired:
self.events.c_delete (e.t, e)
self.events.remove (e.t, e)
e.expire()
cdef sleep (self, uint64_t when):
......@@ -1134,12 +1134,12 @@ cdef public class sched [ object sched_object, type sched_type ]:
IF CORO_DEBUG:
assert self._current is not None
e = event (when, self._current)
self.events.c_insert (when, e)
self.events.insert (when, e)
try:
(<coro>self._current).__yield ()
finally:
if not e.expired:
self.events.c_delete (e.t, e)
self.events.remove (e.t, e)
e.expire()
def sleep_relative (self, delta):
......@@ -1170,10 +1170,10 @@ cdef public class sched [ object sched_object, type sched_type ]:
cdef coro c
retry = _fifo()
while self.events.len():
e = self.events.c_top(NULL)
while len(self.events):
e = self.events.top()
if e.t <= now:
self.events.c_pop(NULL)
self.events.pop()
# two kinds of event values:
# 1) a coro (for sleep_{relative,absolute}())
# 2) a timebomb (for with_timeout())
......@@ -1198,19 +1198,19 @@ cdef public class sched [ object sched_object, type sched_type ]:
# retry all the timebombs that failed due to ScheduleError
while retry.size:
e = retry._pop()
self.events.c_insert (e.t, e)
self.events.insert (e.t, e)
cdef get_timeout_to_next_event (self, int default_timeout):
cdef uint64_t delta, now
cdef event e
# default_timeout is in seconds
now = c_rdtsc()
if self.events.len():
if len(self.events):
# 1) get time to next event
while 1:
e = self.events.c_top(NULL)
e = self.events.top()
if e.expired:
self.events.c_pop(NULL)
self.events.pop()
else:
break
if e.t < now:
......
/*
Copyright (c) 2002-2011 IronPort Systems and Cisco Systems
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "Python.h"
#include <map>
typedef std::multimap <unsigned long long, PyObject *> event_queue;
typedef event_queue::iterator event_queue_iter;
/* Needed because Pyrex is compiled as a C program and is expecting C-like
symbols.
*/
extern "C" {
/*
Create a new event queue.
*/
event_queue *
event_queue_new()
{
return new event_queue;
}
/*
Delete the event queue and free all data.
*/
void
event_queue_dealloc(event_queue * q)
{
event_queue_iter iter;
for (iter = q->begin(); iter != q->end(); iter++) {
Py_DECREF(iter->second);
}
delete q;
}
/*
Returns the length of the queue.
*/
int
event_queue_len(event_queue * q)
{
return q->size();
}
/*
Returns a new iterator.
*/
event_queue_iter
event_queue_new_iter(event_queue * q)
{
return q->begin();
}
/*
Return the current value of the iterator and move the position forward.
The timestamp of the first element is stored in time (if not NULL). The value is returned.
Returns NULL if the queue is empty with StopIteration set.
*/
PyObject *
event_queue_iter_next(event_queue * q, event_queue_iter * iter, uint64_t * time)
{
PyObject * value;
if (*iter == q->end()) {
PyErr_SetObject(PyExc_StopIteration, NULL);
return NULL;
} else {
if (time) {
*time = (*iter)->first;
}
value = (*iter)->second;
Py_INCREF(value);
(*iter)++;
return value;
}
}
/*
Peek at the top of the queue.
The timestamp of the first element is stored in time (if not NULL). The value is returned.
Returns NULL if the queue is empty with IndexError set.
*/
PyObject *
event_queue_top(event_queue * q, uint64_t * time)
{
PyObject * value;
if (q->size()) {
event_queue_iter iter = q->begin();
if (time) {
*time = iter->first;
}
value = iter->second;
Py_INCREF(value);
return value;
} else {
PyErr_SetString(PyExc_IndexError, "top of empty queue");
return NULL;
}
}
/*
Pop the first element off the queue.
The timestamp of the first element is stored in time (if not NULL). The value is returned.
Returns NULL if the queue is empty with IndexError set.
*/
PyObject *
event_queue_pop(event_queue * q, uint64_t * time)
{
PyObject * value;
if (q->size()) {
event_queue_iter iter = q->begin();
if (time) {
*time = iter->first;
}
value = iter->second;
q->erase (iter);
return value;
} else {
PyErr_SetString(PyExc_IndexError, "pop from empty queue");
return NULL;
}
}
/*
Insert a new entry into the queue.
Returns 0 on succes, -1 on failure.
(Currently never fails.)
*/
int
event_queue_insert(event_queue * q, uint64_t time, PyObject * value)
{
q->insert (std::pair <uint64_t, PyObject *> (time, value));
Py_INCREF(value);
return 0;
}
/*
Delete an entry from the queue.
Returns 0 on success, -1 on failure with IndexError set.
*/
int
event_queue_delete(event_queue * q, uint64_t time, PyObject * value)
{
event_queue_iter iter = q->find(time);
// Iterate since we support duplicate keys.
while (iter != q->end()) {
if (iter->first != time) {
break;
}
if (iter->second == value) {
Py_DECREF(iter->second);
q->erase(iter);
return 0;
} else {
iter++;
}
}
PyErr_SetString(PyExc_IndexError, "event not found");
return -1;
}
} /* extern "C" */
/*
Copyright (c) 2002-2011 IronPort Systems and Cisco Systems
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#ifndef _EVENT_QUEUE_H_
#define _EVENT_QUEUE_H_
#include "Python.h"
/* Types are defined as void * to fake out Pyrex which can't handle C++ types. */
//typedef std::multimap <long long, PyObject *> event_queue;
typedef void * event_queue;
event_queue * event_queue_new(void);
void event_queue_dealloc(event_queue * q);
PyObject * event_queue_top(event_queue * q, uint64_t * time);
PyObject * event_queue_pop(event_queue * q, uint64_t * time);
int event_queue_insert(event_queue * q, uint64_t time, PyObject * value);
int event_queue_delete(event_queue * q, uint64_t time, PyObject * value);
int event_queue_len(event_queue * q);
//typedef event_queue::iterator event_queue_iter;
typedef void * event_queue_iter;
event_queue_iter event_queue_new_iter(event_queue * q);
PyObject * event_queue_iter_next(event_queue * q, event_queue_iter * iter, uint64_t * time);
#endif /* _EVENT_QUEUE_H_ */
......@@ -22,117 +22,111 @@
__event_queue_version__ = "$Id: event_queue.pyx,v 1.1 2007/01/03 00:19:50 ehuss Exp $"
cdef extern from "event_queue.h":
ctypedef void * cpp_event_queue "event_queue"
cpp_event_queue * event_queue_new()
void event_queue_dealloc(cpp_event_queue * q)
object event_queue_top(cpp_event_queue * q, uint64_t * time)
object event_queue_pop(cpp_event_queue * q, uint64_t * time)
int event_queue_insert(cpp_event_queue * q, uint64_t time, object) except -1
int event_queue_delete(cpp_event_queue * q, uint64_t time, object) except -1
int event_queue_len(cpp_event_queue * q)
ctypedef void * cpp_event_queue_iter "event_queue_iter"
cpp_event_queue_iter event_queue_new_iter(cpp_event_queue * q)
object event_queue_iter_next(cpp_event_queue * q, cpp_event_queue_iter * iter, uint64_t * time)
cdef class event_queue_iter
include "python.pxi"
from cython.operator cimport dereference as deref, preincrement as inc
from libcpp.utility cimport pair
from libc cimport uint64_t
cdef extern from "<map>" namespace "std":
cdef cppclass multimap[T, U]:
cppclass iterator:
pair[T,U]& operator*()
iterator operator++()
iterator operator--()
bint operator==(iterator)
bint operator!=(iterator)
map()
U& operator[](T&)
U& at(T&)
iterator begin()
size_t count(T&)
bint empty()
iterator end()
void erase(iterator)
void erase(iterator, iterator)
size_t erase(T&)
iterator find(T&)
pair[iterator, bint] insert(pair[T,U])
size_t size()
cdef class event_queue:
cdef cpp_event_queue * q
cdef multimap[uint64_t, PyObject*] *q
def __cinit__(self):
self.q = event_queue_new()
self.q = new multimap[uint64_t, PyObject*]()
def __dealloc__(self):
event_queue_dealloc(self.q)
cdef multimap[uint64_t, PyObject*].iterator it = self.q.begin()
while it != self.q.end():
Py_DECREF(<object> deref(it).second)
inc(it)
del self.q
def __len__(self):
return event_queue_len(self.q)
cpdef insert(self, uint64_t time, value):
"""Insert a new value into the queue.
cdef int len(self):
return event_queue_len(self.q)
:Parameters:
- `time`: The uint64 time.
- `value`: The value to insert.
"""
cdef pair[uint64_t, PyObject*] p
p.first, p.second = time, <PyObject *> value
self.q.insert(p)
Py_INCREF(value)
cdef c_top(self, uint64_t * time):
return event_queue_top(self.q, time)
def __len__(self):
return self.q.size()
def top(self):
cpdef top(self):
"""Peek at the top value of the queue.
:Return:
Returns a ``(time, value)`` tuple from the top of the queue.
Returns value from the top of the queue.
:Exceptions:
- `IndexError`: The queue is empty.
"""
cdef uint64_t time
if not self.q.size():
raise IndexError('Top of empty queue')
cdef multimap[uint64_t, PyObject*].iterator it = self.q.begin()
return <object> deref(it).second
value = event_queue_top(self.q, &time)
return (time, value)
cdef c_pop(self, uint64_t * time):
return event_queue_pop(self.q, time)
def pop(self):
cpdef pop(self):
"""Grab the top value of the queue and remove it.
:Return:
Returns a ``(time, value)`` tuple from the top of the queue.
Returns value from the top of the queue.
:Exceptions:
- `IndexError`: The queue is empty.
"""
cdef uint64_t time
value = event_queue_pop(self.q, &time)
return (time, value)
cdef c_insert(self, uint64_t time, value):
event_queue_insert(self.q, time, value)
def insert(self, uint64_t time, value):
"""Insert a new value into the queue.
:Parameters:
- `time`: The uint64 time.
- `value`: The value to insert.
"""
event_queue_insert(self.q, time, value)
cdef c_delete(self, uint64_t time, value):
event_queue_delete(self.q, time, value)
def delete(self, uint64_t time, value):
if not self.q.size():
raise IndexError('Top of empty queue')
cdef multimap[uint64_t, PyObject*].iterator it = self.q.begin()
value = <object> deref(it).second
self.q.erase(it)
Py_DECREF(value)
return value
cpdef remove(self, uint64_t time, value):
"""Delete a value from the queue.
:Parameters:
- `time`: The uint64 time.
- `value`: The value to delete.
"""
event_queue_delete(self.q, time, value)
def __iter__(self):
cdef event_queue_iter i
i = event_queue_iter()
i.q = self.q
i.iter = event_queue_new_iter(self.q)
return i
cdef class event_queue_iter:
cdef cpp_event_queue * q
cdef cpp_event_queue_iter iter
def __iter__(self):
return self
def __next__(self):
cdef uint64_t time
value = event_queue_iter_next(self.q, &self.iter, &time)
return (time, value)
cdef PyObject *val
cdef multimap[uint64_t, PyObject*].iterator it = self.q.find(time)
cdef PyObject *v = <PyObject *> value
while it != self.q.end():
if deref(it).first != time:
break
val = <PyObject *> deref(it).second
if v == val:
self.q.erase(it)
Py_DECREF(<object>val)
return 0
else:
inc(it)
raise IndexError('Event not found')
......@@ -2,6 +2,7 @@
from server import server, tlslite_server
import handlers
import read_stream
import coro
from coro import read_stream
import http_date
import session_handler
# -*- Mode: Python -*-
import coro
import coro.read_stream
from protocol import http_file, header_set, latch
W = coro.write_stderr
class HTTP_Protocol_Error (Exception):
pass
# viewed at its core, HTTP is a two-way exchange of messages,
# some of which may have content associated with them.
# two different usage patterns for pipelined requests:
# 1) requests are made by different threads
# 2) requests are made by a single thread
#
# we accommodate both patterns here.
# for #2, use the lower-level send_request() method, for #1,
# use GET, PUT, etc...
class request:
def __init__ (self, force=True):
self.latch = latch()
self.force = force
self.content = None
self.response = None
self.rheader = None
self.rfile = None
def wake (self):
if self.rfile and self.force:
self.content = self.rfile.read()
self.latch.wake_all()
if self.rfile and not self.force:
self.rfile.wait()
def wait (self):
return self.latch.wait()
class client:
def __init__ (self, host, port=80, conn=None, inflight=100):
self.host = host
self.inflight = coro.semaphore (inflight)
if conn is None:
self.conn = coro.tcp_sock()
self.conn.connect ((host, port))
else:
self.conn = conn
self.stream = coro.read_stream.sock_stream (self.conn)
self.pending = coro.fifo()
coro.spawn (self.read_thread)
def read_thread (self):
while 1:
req = self.pending.pop()
self._read_message (req)
if not req.response:
break
else:
req.wake()
def _read_message (self, req):
req.response = self.stream.read_line()[:-2]
lines = []
while 1:
line = self.stream.read_line()
if not line:
raise HTTP_Protocol_Error ('unexpected close')
elif line == '\r\n':
break
else:
lines.append (line[:-2])
req.rheader = h = header_set (lines)
if h['content-length'] or h['transfer-encoding']:
req.rfile = http_file (h, self.stream)
def send_request (self, method, uri, headers, content=None, force=False):
try:
self.inflight.acquire (1)
req = request (force)
self._send_request (method, uri, headers, content)
self.pending.push (req)
return req
finally:
self.inflight.release (1)
def _send_request (self, method, uri, headers, content):
if not headers.has_key ('host'):
headers['host'] = self.host
if content:
if type(content) is str:
headers['content-length'] = len(content)
elif not headers.has_key ('content-length'):
headers['transfer-encoding'] = 'chunked'
req = (
'%s %s HTTP/1.1\r\n'
'%s\r\n' % (method, uri, headers)
)
self.conn.send (req)
# XXX 100 continue
if content:
if type(content) is str:
self.conn.send (content)
elif headers.has_key ('content-length'):
clen = int (headers.get_one ('content-length'))
slen = 0
for block in content:
self.conn.send (block)
slen += len(block)
if slen > clen:
raise HTTP_Protocol_Error ("content larger than declared length", clen, slen)
else:
if slen != clen:
raise HTTP_Protocol_Error ("content smaller than declared length", clen, slen)
else:
# chunked encoding
for block in content:
if block:
self.conn.writev (['%x\r\n' % (len (block),), block])
self.conn.send ('0\r\n')
def GET (self, uri, **headers):
headers = header_set().from_keywords (headers)
req = self.send_request ('GET', uri, headers, force=True)
req.wait()
return req
def GET_file (self, uri, **headers):
headers = header_set().from_keywords (headers)
req = self.send_request ('GET', uri, headers, force=False)
req.wait()
return req
def PUT (self, uri, content, **headers):
headers = header_set().from_keywords (headers)
req = self.send_request ('PUT', uri, headers, content, force=True)
req.wait()
return req
def POST (self, uri, content, **headers):
headers = header_set().from_keywords (headers)
req = self.send_request ('POST', uri, headers, content, force=True)
req.wait()
return req
# -*- Mode: Python -*-
import coro
import coro.httpd
import coro.http
import backdoor
# toy: move an X through a grid.
......@@ -66,14 +66,14 @@ class grid_handler:
'<input type="submit" name="dir" value="up" />'
'<input type="submit" name="dir" value="down" />'
'</form>'
'<a href="grid/source">source for this handler</a>'
'<a href="/grid/source">source for this handler</a>'
)
request.done()
server = coro.httpd.server()
server = coro.http.server()
server.push_handler (grid_handler (50, 30))
server.push_handler (coro.httpd.handlers.coro_status_handler())
server.push_handler (coro.httpd.handlers.favicon_handler())
server.push_handler (coro.http.handlers.coro_status_handler())
server.push_handler (coro.http.handlers.favicon_handler())
coro.spawn (server.start, ('0.0.0.0', 9001))
coro.spawn (backdoor.serve, unix_path='/tmp/httpd.bd')
coro.event_loop (30.0)
# -*- Mode: Python -*-
import coro
import coro.httpd
import coro.http
import backdoor
# demonstrate the session handler
......@@ -36,10 +36,10 @@ def session (sid, fifo):
request.done()
i += 1
server = coro.httpd.server()
server.push_handler (coro.httpd.handlers.coro_status_handler())
server.push_handler (coro.httpd.session_handler.session_handler ('session', session))
server.push_handler (coro.httpd.handlers.favicon_handler())
server = coro.http.server()
server.push_handler (coro.http.handlers.coro_status_handler())
server.push_handler (coro.http.session_handler.session_handler ('session', session))
server.push_handler (coro.http.handlers.favicon_handler())
coro.spawn (server.start, ('0.0.0.0', 9001))
coro.spawn (backdoor.serve, unix_path='/tmp/httpd.bd')
coro.event_loop (30.0)
# -*- Mode: Python -*-
import coro
W = coro.write_stderr
from coro.httpd.client import client as http_client
def t0():
c = http_client ('127.0.0.1', 80)
l = [ c.send_request ('GET', '/postgresql/html/', {}, content=None, force=True) for x in range (10) ]
for req in l:
req.wait()
W ('%s\n' % (req.response,))
def t1():
c = http_client ('127.0.0.1', 80)
rl = coro.in_parallel ([(c.GET, ('/postgresql/html/',))] * 10)
for x in rl:
W ('%s\n' % (x.response,))
return rl
if __name__ == '__main__':
import coro.backdoor
coro.spawn (t0)
coro.spawn (coro.backdoor.serve, unix_path='/tmp/xx.bd')
coro.event_loop()
......@@ -3,15 +3,18 @@
# demo an https server using the TLSLite package.
import coro
import coro.httpd
import coro.http
import coro.backdoor
# -----------------------------------------------------------------------
# --- change the location of the chain and key files on the next line ---
# -----------------------------------------------------------------------
server = coro.httpd.tlslite_server ('cert/server.crt', 'cert/server.key')
server.push_handler (coro.httpd.handlers.coro_status_handler())
server.push_handler (coro.httpd.handlers.favicon_handler())
server = coro.http.tlslite_server (
'cert/server.crt',
'cert/server.key',
)
server.push_handler (coro.http.handlers.coro_status_handler())
server.push_handler (coro.http.handlers.favicon_handler())
coro.spawn (server.start, ('0.0.0.0', 9443))
coro.spawn (coro.backdoor.serve, unix_path='/tmp/httpsd.bd')
coro.event_loop (30.0)
......@@ -7,7 +7,7 @@ import sys
import time
import zlib
from coro.httpd.http_date import build_http_date
from coro.http.http_date import build_http_date
W = sys.stderr.write
......
# -*- Mode: Python -*-
import coro
from coro import read_stream
W = coro.write_stderr
# candidate for sync.pyx?
class latch:
"Like a CV, except without the race - if the event has already fired then wait() will return immediately."
def __init__ (self):
self.cv = coro.condition_variable()
self.done = False
def wake_all (self, args=()):
self.done = True
self.args = args
return self.cv.wake_all (args)
def wait (self):
if not self.done:
return self.cv.wait()
else:
return self.args
class http_file:
"HTTP message content, as a file-like object."
buffer_size = 8000
def __init__ (self, headers, stream):
self.streami = stream
self.done_cv = latch()
if headers.get_one ('transfer-encoding') == 'chunked':
self.streamo = read_stream.buffered_stream (self._gen_read_chunked().next)
else:
content_length = headers.get_one ('content-length')
if content_length:
self.content_length = int (content_length)
self.streamo = read_stream.buffered_stream (self._gen_read_fixed().next)
else:
raise HTTP_Protocol_Error ("no way to determine length of HTTP data")
def _gen_read_chunked (self):
"generator: decodes chunked transfer-encoding."
s = self.streami
while 1:
chunk_size = int (s.read_line()[:-2], 16)
if chunk_size == 0:
self.done_cv.wake_all()
return
else:
remain = chunk_size
while remain:
ask = min (remain, self.buffer_size)
block = s.read_exact (ask)
assert (s.read_exact (2) == '\r\n')
remain -= ask
yield block
def _gen_read_fixed (self):
"generate fixed-size blocks of content."
s = self.streami
remain = self.content_length
while remain:
ask = min (remain, self.buffer_size)
block = s.read_exact (ask)
remain -= ask
yield block
self.done_cv.wake_all()
return
# XXX implement <size> argument
def read (self, join=True):
"read the entire contents. join=False returns a generator, join=True returns a string."
r = (x for x in self.streamo.read_all())
if join:
return ''.join (r)
else:
return r
def readline (self):
"read a newline-delimited line."
if self.done_cv.done:
return ''
else:
return self.streamo.read_until ('\n')
def wait (self):
"wait until all the content has been read."
self.done_cv.wait()
class header_set:
def __init__ (self, headers=()):
self.headers = {}
for h in headers:
self.crack (h)
def from_keywords (self, kwds):
"Populate this header set from a dictionary of keyword arguments (e.g., 'content_length' becomes 'content-length')"
r = []
for k, v in kwds.items():
k = k.replace ('_', '-')
self[k] = v
return self
def crack (self, h):
"Crack one header line."
# deliberately ignoring 822 crap like continuation lines.
try:
i = h.index (': ')
name, value = h[:i], h[i+2:]
self[name] = value
except ValueError:
coro.write_stderr ('dropping bogus header %r\n' % (h,))
pass
def get_one (self, key):
"Get the value of a header expected to have at most one value. If not present, return None. If more than one, raise ValueError."
r = self.headers.get (key, None)
if r is None:
return r
elif isinstance (r, list) and len (r) > 1:
raise ValueError ("expected only one %s header, got %r" % (key, r))
else:
return r[0]
def has_key (self, key):
"Is this header present?"
return self.headers.has_key (key.lower())
def __getitem__ (self, key):
"Returns the list of values for this header, or None."
return self.headers.get (key, None)
def __setitem__ (self, name, value):
"Add a value to the header <name>."
name = name.lower()
probe = self.headers.get (name)
if probe is None:
self.headers[name] = [value]
else:
probe.append (value)
def __str__ (self):
"Render the set of headers."
r = []
for k, vl in self.headers.iteritems():
for v in vl:
r.append ('%s: %s\r\n' % (k, v))
return ''.join (r)
......@@ -9,13 +9,15 @@ import http_date
import mimetypes
import os
import re
import read_stream
from coro import read_stream
import socket
import stat
import sys
import time
import zlib
from protocol import latch, http_file, header_set
W = sys.stderr.write
__version__ = '0.1'
......@@ -36,6 +38,7 @@ class request_stream:
lines = []
while 1:
line = self.stream.read_line()
# XXX handle continuation lines
if line == '':
raise StopIteration
elif line == '\r\n':
......@@ -57,20 +60,6 @@ class request_stream:
# [it might have a body]
request.wait_until_read()
class latch:
def __init__ (self):
self.cv = coro.condition_variable()
self.done = False
def wake_all (self):
self.done = True
self.cv.wake_all()
def wait (self):
if not self.done:
self.cv.wait()
class connection:
def __init__ (self, server):
......@@ -119,44 +108,6 @@ class connection:
def close (self):
self.conn.close()
class header_set:
def __init__ (self, headers=()):
self.headers = {}
for h in headers:
self.crack (h)
def crack (self, h):
i = h.index (': ')
name, value = h[:i], h[i+2:]
self[name] = value
def get_one (self, key):
r = self.headers.get (key, None)
if r is None:
return r
elif isinstance (r, list) and len (r) > 1:
raise ValueError ("expected only one %s header, got %r" % (key, r))
else:
return r[0]
def __getitem__ (self, key):
return self.headers.get (key, None)
def __setitem__ (self, name, value):
name = name.lower()
probe = self.headers.get (name)
if probe is None:
self.headers[name] = [value]
else:
probe.append (value)
def __str__ (self):
r = []
for k, vl in self.headers.iteritems():
for v in vl:
r.append ('%s: %s\r\n' % (k, v))
return ''.join (r)
class http_request:
request_count = 0
......@@ -203,7 +154,7 @@ class http_request:
self.version = "1.0"
self.bad = True
if self.has_body():
self.file = http_file (self)
self.file = http_file (headers, client.stream)
def wait_until_read (self):
"wait until this entire request body has been read"
......@@ -216,7 +167,22 @@ class http_request:
self.done_cv.wait()
def has_body (self):
return self['content-length'] or self['transfer-encoding']
if self.request_headers.has_key ('transfer-encoding'):
# 4.4 ignore any content-length
return True
else:
probe = self.request_headers.get_one ('content-length')
if probe:
try:
size = int (probe)
if size == 0:
return False
elif size > 0:
return True
else:
return False
except ValueError:
return False
def can_deflate (self):
acc_enc = self.request_headers.get_one ('accept-encoding')
......@@ -448,85 +414,22 @@ class buffered_output:
if self.chunk_index >= 0:
self.chunk_len += len (data)
if self.len >= self.size:
try:
self.sent += self.conn.writev (self.get_data())
except AttributeError:
# underlying socket may not support writev (e.g., tlslite)
self.sent += self.conn.send (''.join (data))
self.send (self.get_data())
def flush (self):
"Flush the data from this buffer."
data = self.get_data()
if self.chunk_index >= 0:
data.append ('0\r\n\r\n')
self.send (data)
def send (self, data):
try:
self.sent += self.conn.writev (data)
except AttributeError:
# underlying socket may not support writev (e.g., tlslite)
self.sent += self.conn.send (''.join (data))
class http_file:
buffer_size = 8000
def __init__ (self, request):
self.request = request
self.streami = request.client.stream
self.done_cv = latch()
if self.request['transfer-encoding'] == 'chunked':
self.streamo = read_stream.buffered_stream (self._read_chunked().next)
else:
content_length = self.request['content-length']
if content_length:
self.content_length = int (content_length)
self.streamo = read_stream.buffered_stream (self._read_fixed().next)
else:
raise HTTP_Protocol_Error ("no way to determine length of request data")
def _read_chunked (self):
s = self.streami
while 1:
chunk_size = int (s.read_line()[:-2], 16)
if chunk_size == 0:
W ('chunked wake\n')
self.done_cv.wake_all()
return
else:
remain = chunk_size
while remain:
ask = min (remain, self.buffer_size)
block = s.read_exact (ask)
assert (s.read_exact (2) == '\r\n')
remain -= ask
yield block
def _read_fixed (self):
s = self.streami
remain = self.content_length
while remain:
ask = min (remain, self.buffer_size)
block = s.read_exact (ask)
remain -= ask
yield block
self.done_cv.wake_all()
return
# XXX implement <size> argument
def read (self, join=True):
r = []
for x in self.streamo.read_all():
r.append (x)
if join:
return ''.join (r)
else:
return r
def readline (self):
if self.request.body_done:
return ''
else:
return self.streamo.read_until ('\n')
class server:
client_timeout = 30
......@@ -611,36 +514,36 @@ class tlslite_server (server):
"https server using the tlslite package"
def __init__ (self, cert_path, key_path):
server.__init__ (self)
server.__init__ (self)
self.cert_path = cert_path
self.key_path = key_path
self.key_path = key_path
self.read_chain()
self.read_private()
self.read_private()
def accept (self):
import tlslite
conn0, addr = server.accept (self)
import tlslite
conn0, addr = server.accept (self)
conn = tlslite.TLSConnection (conn0)
conn.handshakeServer (certChain=self.chain, privateKey=self.private)
return conn, addr
return conn, addr
def read_chain (self):
"cert chain is all in one file, in LEAF -> ROOT order"
import tlslite
delim = '-----END CERTIFICATE-----\n'
data = open (self.cert_path).read()
"cert chain is all in one file, in LEAF -> ROOT order"
import tlslite
delim = '-----END CERTIFICATE-----\n'
data = open (self.cert_path).read()
certs = data.split (delim)
chain = []
chain = []
for cert in certs:
if cert:
x = tlslite.X509()
x.parse (cert + delim)
if cert:
x = tlslite.X509()
x.parse (cert + delim)
chain.append (x)
self.chain = tlslite.X509CertChain (chain)
self.chain = tlslite.X509CertChain (chain)
def read_private (self):
import tlslite
self.private = tlslite.parsePEMKey (
self.private = tlslite.parsePEMKey (
open (self.key_path).read(),
private=True
)
# -*- Mode: Python -*-
import sys
W = sys.stderr.write
class socket_producer:
def __init__ (self, conn, buffer_size=8000):
self.conn = conn
......
This diff is collapsed.
# -*- Mode: Python; tab-width: 4 -*-
import re
import string
import time
def concat (*args):
return ''.join (args)
def join (seq, field=' '):
return field.join (seq)
def group (s):
return '(' + s + ')'
short_days = ['sun','mon','tue','wed','thu','fri','sat']
long_days = ['sunday','monday','tuesday','wednesday','thursday','friday','saturday']
short_day_reg = group (join (short_days, '|'))
long_day_reg = group (join (long_days, '|'))
daymap = {}
for i in range(7):
daymap[short_days[i]] = i
daymap[long_days[i]] = i
hms_reg = join (3 * [group('[0-9][0-9]')], ':')
months = ['jan','feb','mar','apr','may','jun','jul','aug','sep','oct','nov','dec']
monmap = {}
for i in range(12):
monmap[months[i]] = i+1
months_reg = group (join (months, '|'))
# From draft-ietf-http-v11-spec-07.txt/3.3.1
# Sun, 06 Nov 1994 08:49:37 GMT ; RFC 822, updated by RFC 1123
# Sunday, 06-Nov-94 08:49:37 GMT ; RFC 850, obsoleted by RFC 1036
# Sun Nov 6 08:49:37 1994 ; ANSI C's asctime() format
# rfc822 format
rfc822_date = join (
[concat (short_day_reg,','), # day
group('[0-9][0-9]?'), # date
months_reg, # month
group('[0-9]+'), # year
hms_reg, # hour minute second
'gmt'
],
' '
)
rfc822_reg = re.compile (rfc822_date)
def unpack_rfc822 (m):
g = m.group
a = string.atoi
return (
a(g(4)), # year
monmap[g(3)], # month
a(g(2)), # day
a(g(5)), # hour
a(g(6)), # minute
a(g(7)), # second
0,
0,
0
)
# rfc850 format
rfc850_date = join (
[concat (long_day_reg,','),
join (
[group ('[0-9][0-9]?'),
months_reg,
group ('[0-9]+')
],
'-'
),
hms_reg,
'gmt'
],
' '
)
rfc850_reg = re.compile (rfc850_date)
# they actually unpack the same way
def unpack_rfc850 (m):
g = m.group
a = string.atoi
return (
a(g(4)), # year
monmap[g(3)], # month
a(g(2)), # day
a(g(5)), # hour
a(g(6)), # minute
a(g(7)), # second
0,
0,
0
)
# parsdate.parsedate - ~700/sec.
# parse_http_date - ~1333/sec.
weekdayname = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
monthname = [None, 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
def build_http_date(when):
year, month, day, hh, mm, ss, wd, y, z = time.gmtime(when)
return "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
weekdayname[wd],
day, monthname[month], year,
hh, mm, ss)
def parse_http_date (d):
d = string.lower (d)
tz = time.timezone
m = rfc850_reg.match (d)
try:
if m and m.end() == len(d):
retval = int (time.mktime (unpack_rfc850(m)) - tz)
else:
m = rfc822_reg.match (d)
if m and m.end() == len(d):
retval = int (time.mktime (unpack_rfc822(m)) - tz)
else:
return 0
return retval
except:
# problem in unpack or mktime failed
return 0
# -*- Python -*-
# Converted by ./convert_mime_type_table.py from:
# /usr/src2/apache_1.2b6/conf/mime.types
#
content_type_map = \
{
'ai': 'application/postscript',
'aif': 'audio/x-aiff',
'aifc': 'audio/x-aiff',
'aiff': 'audio/x-aiff',
'au': 'audio/basic',
'avi': 'video/x-msvideo',
'bcpio': 'application/x-bcpio',
'bin': 'application/octet-stream',
'cdf': 'application/x-netcdf',
'class': 'application/octet-stream',
'cpio': 'application/x-cpio',
'cpt': 'application/mac-compactpro',
'csh': 'application/x-csh',
'dcr': 'application/x-director',
'dir': 'application/x-director',
'dms': 'application/octet-stream',
'doc': 'application/msword',
'dvi': 'application/x-dvi',
'dxr': 'application/x-director',
'eps': 'application/postscript',
'etx': 'text/x-setext',
'exe': 'application/octet-stream',
'gif': 'image/gif',
'gtar': 'application/x-gtar',
'gz': 'application/x-gzip',
'hdf': 'application/x-hdf',
'hqx': 'application/mac-binhex40',
'htm': 'text/html',
'html': 'text/html',
'ice': 'x-conference/x-cooltalk',
'ief': 'image/ief',
'jpe': 'image/jpeg',
'jpeg': 'image/jpeg',
'jpg': 'image/jpeg',
'kar': 'audio/midi',
'latex': 'application/x-latex',
'lha': 'application/octet-stream',
'lzh': 'application/octet-stream',
'man': 'application/x-troff-man',
'me': 'application/x-troff-me',
'mid': 'audio/midi',
'midi': 'audio/midi',
'mif': 'application/x-mif',
'mov': 'video/quicktime',
'movie': 'video/x-sgi-movie',
'mp2': 'audio/mpeg',
'mpe': 'video/mpeg',
'mpeg': 'video/mpeg',
'mpg': 'video/mpeg',
'mpga': 'audio/mpeg',
'mp3': 'audio/mpeg',
'ms': 'application/x-troff-ms',
'nc': 'application/x-netcdf',
'oda': 'application/oda',
'pbm': 'image/x-portable-bitmap',
'pdb': 'chemical/x-pdb',
'pdf': 'application/pdf',
'pgm': 'image/x-portable-graymap',
'png': 'image/png',
'pnm': 'image/x-portable-anymap',
'ppm': 'image/x-portable-pixmap',
'ppt': 'application/powerpoint',
'ps': 'application/postscript',
'qt': 'video/quicktime',
'ra': 'audio/x-realaudio',
'ram': 'audio/x-pn-realaudio',
'ras': 'image/x-cmu-raster',
'rgb': 'image/x-rgb',
'roff': 'application/x-troff',
'rpm': 'audio/x-pn-realaudio-plugin',
'rtf': 'application/rtf',
'rtx': 'text/richtext',
'sgm': 'text/x-sgml',
'sgml': 'text/x-sgml',
'sh': 'application/x-sh',
'shar': 'application/x-shar',
'sit': 'application/x-stuffit',
'skd': 'application/x-koan',
'skm': 'application/x-koan',
'skp': 'application/x-koan',
'skt': 'application/x-koan',
'snd': 'audio/basic',
'src': 'application/x-wais-source',
'sv4cpio': 'application/x-sv4cpio',
'sv4crc': 'application/x-sv4crc',
't': 'application/x-troff',
'tar': 'application/x-tar',
'tcl': 'application/x-tcl',
'tex': 'application/x-tex',
'texi': 'application/x-texinfo',
'texinfo': 'application/x-texinfo',
'tif': 'image/tiff',
'tiff': 'image/tiff',
'tr': 'application/x-troff',
'tsv': 'text/tab-separated-values',
'txt': 'text/plain',
'ustar': 'application/x-ustar',
'vcd': 'application/x-cdlink',
'vrml': 'x-world/x-vrml',
'wav': 'audio/x-wav',
'wrl': 'x-world/x-vrml',
'xbm': 'image/x-xbitmap',
'xpm': 'image/x-xpixmap',
'xwd': 'image/x-xwindowdump',
'xyz': 'chemical/x-pdb',
'zip': 'application/zip',
}
This diff is collapsed.
# -*- Mode: Python; tab-width: 4 -*-
import coro
import string
import re
import time
h_re = re.compile (r'([^: ]+): (.*)')
def get_header (header, headers):
for h in headers:
m = h_re.match (h)
if m:
name, value = m.groups()
if string.lower (name) == header:
return value
return None
def extract_session (cookie):
parts = string.split (cookie, ';')
for part in parts:
pair = string.split (part, '=')
if len(pair) == 2:
if pair[0] == 'session':
return pair[1]
return None
class session_handler:
def __init__ (self, name, function):
self.name = name
self.function = function
self.sessions = {}
def match (self, request):
path = string.split (request._path, '/')
if (len(path) > 1) and (path[1] == self.name):
return 1
else:
return 0
def get_next_request (self):
return coro._yield()
def find_session (self, request):
cookie = get_header ('cookie', request._request_headers)
if cookie:
sid = extract_session (cookie)
return sid, self.sessions.get (sid, None)
else:
return None, None
def gen_session_id (self):
import random
import sys
sid = None
while self.sessions.has_key (sid):
n = random.randint (0,sys.maxint-1)
sid = hex(n)[2:]
return sid
expires_delta = 100 * 86400
def handle_request (self, request):
sid, c = self.find_session (request)
# The sid=='None' test is temporary hack, can probably remove it
if (not sid) or (sid=='None'):
sid = self.gen_session_id()
if c and c.isAlive():
# is c already running?
# hack, must grok this
coro.schedule (c, request)
request._done = 1
else:
# login
c = coro.new (self.function, self, request, sid)
# Wdy, DD-Mon-YYYY HH:MM:SS GMT
expires = time.strftime ('%a, %d-%b-%Y 00:00:00 GMT', time.gmtime (int (time.time()) + self.expires_delta))
request['Set-Cookie'] = 'session=%s; path=/; expires=%s' % (sid, expires)
# hack, must grok this
request._done = 1
c.start()
This diff is collapsed.
......@@ -3,8 +3,7 @@ Shrapnel/Coro
=============
:Date: $Date: 2008/05/06 $
:Revision: $Revision: #1 $
:Author: Sam Rushing <rushing@ironport.com>
:Author: Sam Rushing
.. contents::
:depth: 2
......@@ -13,6 +12,15 @@ Shrapnel/Coro
Shrapnel/Coro is a cooperative thread facility built on top of Python.
.. note::
This document was originally written for internal use at
IronPort. It refers to several facilities that unfortunately have not
(yet) been open-sourced, (e.g., the dns resolver and sntp client).
It also references and describes things that are specific to the
IronPort mail appliance. Much of the advice in here is good, though,
and I hope to revisit it soon.
Threads
=======
......
# $Header: //prod/main/ap/shrapnel/setup.py#17 $
#!/usr/bin/env python
from distutils.core import setup
from Cython.Distutils import build_ext
from Cython.Distutils.extension import Extension
#from Cython.Distutils import build_ext
#from Cython.Distutils.extension import Extension
import sys
import glob
import os
from distribute_setup import use_setuptools
use_setuptools()
from setuptools import setup
try:
from Cython.Distutils import build_ext
from Cython.Distutils.extension import Extension
except ImportError:
sys.stderr.write (
'\nThe Cython compiler is required to build Shrapnel.\n'
' Try "pip install cython"\n'
' *or* "easy_install cython"\n'
)
sys.exit (-1)
include_dir = os.getcwd()
def newer(x, y):
......@@ -39,16 +49,25 @@ def check_lio():
setup (
name='coro',
version='1.0.0-000',
version='1.0.2-000',
description='IronPort Coroutine/Threading Library',
author='Sam Rushing, Eric Huss, IronPort Engineering',
author_email='sam-coro@rushing.nightmare.com',
license = "MIT",
url = "http://github.com/ironport/shrapnel",
ext_modules = [
Extension(
'coro.event_queue',
['coro/event_queue.pyx'],
language='c++',
depends=[os.path.join(include_dir, 'pyrex', 'python.pxi'),],
pyrex_include_dirs=[
os.path.join(include_dir, '.'),
os.path.join(include_dir, 'pyrex'),
],),
Extension (
'coro._coro',
['coro/_coro.pyx', 'coro/swap.c', 'coro/event_queue.cc'],
['coro/_coro.pyx', 'coro/swap.c'],
extra_compile_args = ['-Wno-unused-function'],
depends=(glob.glob('coro/*.pyx') +
glob.glob('coro/*.pxi') +
......@@ -90,14 +109,15 @@ setup (
],
),
],
packages=['coro', 'coro.clocks', 'coro.dns'],
packages=['coro', 'coro.clocks', 'coro.http', 'coro.dns'],
package_dir = {
'': 'coroutine',
'coro': 'coro',
'coro.clocks': 'coro/clocks',
'coro.dns': 'coro/dns',
},
py_modules = ['backdoor', 'coro_process', 'coro_unittest'],
install_requires = ['cython>=0.12.1', 'pyrex>=0.9.8.6'],
py_modules = ['backdoor', 'coro.read_stream', 'coro_process', 'coro_unittest',],
download_url = 'http://github.com/ironport/shrapnel/tarball/master#egg=coro-1.0.2',
install_requires = ['Cython>=0.12.1', 'distribute>=0.6.16'],
cmdclass={'build_ext': build_ext},
)
# Copyright (c) 2002-2011 IronPort Systems and Cisco Systems
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Unittests for event queue wrapper."""
__version__ = '$Revision: #1 $'
import unittest
import coro
import coro_unittest
class Test(unittest.TestCase):
def setUp(self):
self.q = coro.event_queue()
def test_insert(self):
data = [(3, "3"), (2, "21"), (1, "1"), (2, "22")]
res = ["1", "21", "22", "3"]
for i in data:
self.q.insert(*i)
self.assertEquals(len(data), len(self.q))
for j in res:
self.assertEquals(self.q.top(), j)
self.assertEquals(self.q.pop(), j)
def test_remove(self):
data = [(3, "3"), (2, "21"), (1, "1"), (2, "22")]
for i in data:
self.q.insert(*i)
self.assertRaises(IndexError, self.q.remove, 1, "2")
self.assertRaises(IndexError, self.q.remove, 10, "2")
for i in data:
self.q.remove(*i)
self.assertEquals(0, len(self.q))
def test_empty(self):
self.assertRaises(IndexError, self.q.top)
self.assertRaises(IndexError, self.q.pop)
self.assertRaises(IndexError, self.q.remove, 1, "2")
if __name__ == '__main__':
coro_unittest.run_tests()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment