Commit eb0ddc4d authored by Sam Rushing's avatar Sam Rushing

Merge branch 'new-httpd'

Conflicts:
	setup.py
parents 3b44ab48 d4379310
# -*- Mode: Python -*-
from server import server, tlslite_server
import handlers
import coro
from coro import read_stream
import http_date
import session_handler
# -*- Mode: Python -*-
import coro
import coro.read_stream
from protocol import http_file, header_set, latch
W = coro.write_stderr
class HTTP_Protocol_Error (Exception):
    "Raised when the remote peer violates the HTTP protocol."
    pass
# viewed at its core, HTTP is a two-way exchange of messages,
# some of which may have content associated with them.
# two different usage patterns for pipelined requests:
# 1) requests are made by different threads
# 2) requests are made by a single thread
#
# we accommodate both patterns here.
# for #2, use the lower-level send_request() method, for #1,
# use GET, PUT, etc...
class request:
    """One outstanding HTTP request/response exchange.

    force=True means the reply body is slurped into self.content before
    the waiter is woken; force=False leaves self.rfile for the caller to
    stream, and wake() then blocks until the body has been drained.
    """
    def __init__ (self, force=True):
        self.latch = latch()
        self.force = force
        self.content = None
        self.response = None
        self.rheader = None
        self.rfile = None

    def wake (self):
        "the reply has arrived: deliver (and maybe pre-read) the body."
        has_body = self.rfile is not None
        if has_body and self.force:
            self.content = self.rfile.read()
        self.latch.wake_all()
        if has_body and not self.force:
            # hold the read thread until the caller consumes the body
            self.rfile.wait()

    def wait (self):
        "block until the response has arrived."
        return self.latch.wait()
# HTTP/1.1 client with pipelining.  A background thread reads replies off
# the shared connection and wakes the matching request objects in FIFO
# order (replies come back in request order over one connection).
class client:
def __init__ (self, host, port=80, conn=None, inflight=100):
self.host = host
# bounds the number of pipelined requests outstanding at once
self.inflight = coro.semaphore (inflight)
if conn is None:
self.conn = coro.tcp_sock()
self.conn.connect ((host, port))
else:
# caller-supplied, already-connected socket (e.g. wrapped in TLS)
self.conn = conn
self.stream = coro.read_stream.sock_stream (self.conn)
self.pending = coro.fifo()
coro.spawn (self.read_thread)
def read_thread (self):
# pair each reply on the wire with the oldest pending request
while 1:
req = self.pending.pop()
self._read_message (req)
if not req.response:
break
else:
# hand the reply to the waiting caller
req.wake()
def _read_message (self, req):
# read status line + headers; attach a body file if one is coming
req.response = self.stream.read_line()[:-2]
lines = []
while 1:
line = self.stream.read_line()
if not line:
raise HTTP_Protocol_Error ('unexpected close')
elif line == '\r\n':
break
else:
lines.append (line[:-2])
req.rheader = h = header_set (lines)
if h['content-length'] or h['transfer-encoding']:
req.rfile = http_file (h, self.stream)
def send_request (self, method, uri, headers, content=None, force=False):
# lower-level entry point: returns the request object immediately;
# call req.wait() (or use GET/PUT/POST) to block for the reply.
try:
self.inflight.acquire (1)
req = request (force)
self._send_request (method, uri, headers, content)
self.pending.push (req)
return req
finally:
self.inflight.release (1)
def _send_request (self, method, uri, headers, content):
if not headers.has_key ('host'):
headers['host'] = self.host
if content:
if type(content) is str:
headers['content-length'] = len(content)
elif not headers.has_key ('content-length'):
# iterable body of unknown length: use chunked transfer-encoding
headers['transfer-encoding'] = 'chunked'
req = (
'%s %s HTTP/1.1\r\n'
'%s\r\n' % (method, uri, headers)
)
self.conn.send (req)
# XXX 100 continue
if content:
if type(content) is str:
self.conn.send (content)
elif headers.has_key ('content-length'):
# iterable body with a declared length: verify it matches
clen = int (headers.get_one ('content-length'))
slen = 0
for block in content:
self.conn.send (block)
slen += len(block)
if slen > clen:
raise HTTP_Protocol_Error ("content larger than declared length", clen, slen)
else:
if slen != clen:
raise HTTP_Protocol_Error ("content smaller than declared length", clen, slen)
else:
# chunked encoding
for block in content:
if block:
self.conn.writev (['%x\r\n' % (len (block),), block])
# NOTE(review): chunked bodies normally terminate with '0\r\n\r\n';
# confirm the servers in use accept the missing trailing CRLF.
self.conn.send ('0\r\n')
def GET (self, uri, **headers):
# convenience wrapper: blocks until the reply (with body) is in
def GET (self, uri, **headers):
headers = header_set().from_keywords (headers)
req = self.send_request ('GET', uri, headers, force=True)
req.wait()
return req
def GET_file (self, uri, **headers):
# like GET, but leaves req.rfile for the caller to stream
headers = header_set().from_keywords (headers)
req = self.send_request ('GET', uri, headers, force=False)
req.wait()
return req
def PUT (self, uri, content, **headers):
headers = header_set().from_keywords (headers)
req = self.send_request ('PUT', uri, headers, content, force=True)
req.wait()
return req
def POST (self, uri, content, **headers):
headers = header_set().from_keywords (headers)
req = self.send_request ('POST', uri, headers, content, force=True)
req.wait()
return req
# -*- Mode: Python -*-
import coro
import coro.http
import backdoor
# toy: move an X through a grid.
# tests: POST data, compression, persistent connections, shared state
import sys
W = sys.stderr.write
# toy POST demo: move an 'X' around a w x h grid, leaving a '*' trail.
class grid_handler:
def __init__ (self, w, h):
self.w = w
self.h = h
self.grid = [['.' for x in range (w)] for y in range (h)]
# start in the middle (Python 2 integer division)
self.pos = [w/2, h/2]
self.grid[self.pos[1]][self.pos[0]] = 'X'
def match (self, request):
return request.path.startswith ('/grid')
def handle_request (self, request):
if request.path == '/grid/source':
# serve this handler's own source (assumes CWD holds grid.py)
request['content-type'] = 'text/plain'
request.set_deflate()
request.push (open ('grid.py', 'rb').read())
request.done()
return
request['content-type'] = 'text/html'
request.set_deflate()
if request.file:
# form POST body: parse 'dir=<left|right|up|down>' pairs
data = request.file.read()
pairs = [ x.split('=') for x in data.split ('&') ]
for k, v in pairs:
if k == 'dir':
x0, y0 = self.pos
x1, y1 = self.pos
if v == 'left':
x1 = max (x0-1, 0)
elif v == 'right':
x1 = min (x0+1, self.w-1)
elif v == 'up':
y1 = max (y0-1, 0)
elif v == 'down':
y1 = min (y0+1, self.h-1)
else:
pass
# leave a trail behind, then move the X
self.grid[y0][x0] = '*'
self.grid[y1][x1] = 'X'
self.pos = [x1, y1]
else:
pass
l = []
for y in self.grid:
l.append (''.join (y))
request.push ('<pre>')
request.push ('\n'.join (l))
request.push ('\n</pre>\n')
request.push (
'<form name="input" action="grid" method="post">'
'<input type="submit" name="dir" value="left" />'
'<input type="submit" name="dir" value="right" />'
'<input type="submit" name="dir" value="up" />'
'<input type="submit" name="dir" value="down" />'
'</form>'
'<a href="/grid/source">source for this handler</a>'
)
request.done()
# wiring: grid demo + status page + favicon on port 9001, plus a
# python REPL ("backdoor") on a unix socket for live debugging.
server = coro.http.server()
server.push_handler (grid_handler (50, 30))
server.push_handler (coro.http.handlers.coro_status_handler())
server.push_handler (coro.http.handlers.favicon_handler())
coro.spawn (server.start, ('0.0.0.0', 9001))
coro.spawn (backdoor.serve, unix_path='/tmp/httpd.bd')
coro.event_loop (30.0)
# -*- Mode: Python -*-
import coro
import coro.http
import backdoor
# demonstrate the session handler
import sys
W = sys.stderr.write
# one coroutine per browser session: counts hits, ends the session after
# ten hits, or gives up when no request arrives within half an hour.
def session (sid, fifo):
i = 0
while 1:
try:
# wait a half hour for a new hit
request = coro.with_timeout (1800, fifo.pop)
except coro.TimeoutError:
break
else:
request['content-type'] = 'text/html'
if i == 10:
request.push (
'<html><h1>Session Over! Bye!</h1>'
'<a href="session">start over</a>'
'</html>'
)
request.done()
break
else:
request.push (
'<html><h1>Session Demo</h1><br><h2>Hit=%d</h2>'
'<a href="session">hit me!</a>'
'</html>' % (i,)
)
request.done()
i += 1
# wiring: status page + session demo + favicon on port 9001,
# with a backdoor REPL on a unix socket.
server = coro.http.server()
server.push_handler (coro.http.handlers.coro_status_handler())
server.push_handler (coro.http.session_handler.session_handler ('session', session))
server.push_handler (coro.http.handlers.favicon_handler())
coro.spawn (server.start, ('0.0.0.0', 9001))
coro.spawn (backdoor.serve, unix_path='/tmp/httpd.bd')
coro.event_loop (30.0)
# -*- Mode: Python -*-
import coro
W = coro.write_stderr
from coro.httpd.client import client as http_client
# fire ten pipelined GETs on one connection, then collect replies in order.
def t0():
c = http_client ('127.0.0.1', 80)
l = [ c.send_request ('GET', '/postgresql/html/', {}, content=None, force=True) for x in range (10) ]
for req in l:
req.wait()
W ('%s\n' % (req.response,))
# same ten GETs, but issued from parallel threads via coro.in_parallel.
def t1():
c = http_client ('127.0.0.1', 80)
rl = coro.in_parallel ([(c.GET, ('/postgresql/html/',))] * 10)
for x in rl:
W ('%s\n' % (x.response,))
return rl
# run the pipelined test plus a backdoor REPL under the coro event loop
if __name__ == '__main__':
import coro.backdoor
coro.spawn (t0)
coro.spawn (coro.backdoor.serve, unix_path='/tmp/xx.bd')
coro.event_loop()
# -*- Mode: Python -*-
# demo an https server using the TLSLite package.
import coro
import coro.http
import coro.backdoor
# -----------------------------------------------------------------------
# --- change the location of the chain and key files on the next line ---
# -----------------------------------------------------------------------
# https demo: status + favicon handlers over TLS (tlslite) on port 9443
server = coro.http.tlslite_server (
'cert/server.crt',
'cert/server.key',
)
server.push_handler (coro.http.handlers.coro_status_handler())
server.push_handler (coro.http.handlers.favicon_handler())
coro.spawn (server.start, ('0.0.0.0', 9443))
coro.spawn (coro.backdoor.serve, unix_path='/tmp/httpsd.bd')
coro.event_loop (30.0)
# -*- Mode: Python -*-
import coro
import os
import re
import sys
import time
import zlib
from coro.http.http_date import build_http_date
W = sys.stderr.write
# these two aren't real handlers, they're more like templates
# to give you an idea how to write one.
class post_handler:
    "Template handler: accept any POST and log its body."
    def match (self, request):
        # override to do a better job of matching
        # BUG FIX: was 'request._method'; http_request stores the
        # lower-cased method in the public attribute 'method'
        # (cf. put_handler and file_handler below).
        return request.method == 'post'
    def handle_request (self, request):
        # slurp the whole request body, log it, finish the reply
        data = request.file.read()
        W ('post handler, data=%r\n' % (data,))
        request.done()
class put_handler:
    "Template handler: accept a PUT and log its body line by line."
    def match (self, request):
        # override to do a better job of matching
        return request.method == 'put'
    def handle_request (self, request):
        fp = request.file
        line = fp.readline()
        while line:
            W ('line: %r\n' % (line,))
            line = fp.readline()
        W ('line: DONE!\n')
        request.done()
# /status page: dump an HTML stack trace of every live coro thread.
class coro_status_handler:
def match (self, request):
return request.path.split ('/')[1] == 'status'
def clean (self, s):
# minimal HTML escaping for repr() output
s = s.replace ('<','&lt;')
s = s.replace ('>','&gt;')
return s
def handle_request (self, request):
request['content-type'] = 'text/html; charset=utf-8'
request.set_deflate()
request.push (
'<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" '
'"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">'
'<html xmlns="http://www.w3.org/1999/xhtml">\r\n'
)
request.push ('<head><title>status</title></head><body>\r\n')
request.push ('<p>Listening on\r\n')
request.push (repr (request.server.addr))
request.push ('</p>\r\n')
request.push ('<table border="1">\r\n')
all_threads = ( (x, coro.where(x)) for x in coro.all_threads.values() )
for thread, traceback in all_threads:
request.push ('<tr><td>%s\r\n' % self.clean (repr(thread)))
request.push ('<pre>\r\n')
# traceback format seems to have changed
# assumes the shape '[file fun|line] [file fun|line] ...' -- TODO confirm
for level in traceback[1:-1].split ('] ['):
[file, fun] = level.split (' ')
fun, line = fun.split ('|')
request.push ('<b>%20s</b>:%3d %s\r\n' % (self.clean (fun), int(line), self.clean (file)))
request.push ('</pre></td></tr>')
request.push ('</table>\r\n')
request.push ('<p><a href="status">Update</a></p>')
request.push ('</body></html>')
request.done()
class file_handler:
    "Serve files (GET/HEAD only) from a document root, honoring if-modified-since."
    block_size = 16000

    def __init__ (self, doc_root):
        self.doc_root = doc_root

    def match (self, request):
        "match any request whose path names something under the document root."
        path = request.path
        filename = os.path.join (self.doc_root, path[1:])
        return os.path.exists (filename)

    # '<http-date>[; length=<n>]'
    crack_if_modified_since = re.compile ('([^;]+)(; length=([0-9]+))?$', re.IGNORECASE)

    def handle_request (self, request):
        # NOTE(review): this module never imported mimetypes or the
        # http_date module that this method needs; import them locally
        # so the handler works without touching the module header.
        import mimetypes
        from coro.http import http_date
        path = request.path
        filename = os.path.join (self.doc_root, path[1:])
        if request.method not in ('get', 'head'):
            request.error (405)
            return
        if os.path.isdir (filename):
            filename = os.path.join (filename, 'index.html')
        if not os.path.isfile (filename):
            request.error (404)
        else:
            stat_info = os.stat (filename)
            # BUG FIX: the 'stat' module was never imported; use the
            # attribute form of os.stat's result instead of ST_* indexing.
            mtime = stat_info.st_mtime
            file_length = stat_info.st_size
            ims = request['if-modified-since']
            if ims:
                length_match = 1
                m = self.crack_if_modified_since.match (ims)
                if m:
                    length = m.group (3)
                    if length:
                        if int(length) != file_length:
                            length_match = 0
                    ims_date = http_date.parse_http_date (m.group(1))
                    if length_match and ims_date:
                        if mtime <= ims_date:
                            request.error (304)
                            return
            ftype, fencoding = mimetypes.guess_type (filename)
            request['Content-Type'] = ftype or 'text/plain'
            request['Last-Modified'] = build_http_date (mtime)
            # Note: these are blocking file operations.
            if request.method == 'get':
                f = open (filename, 'rb')
                try:
                    block = f.read (self.block_size)
                    if not block:
                        request.error (204) # no content
                    else:
                        while 1:
                            request.push (block)
                            block = f.read (self.block_size)
                            if not block:
                                break
                        # BUG FIX: finish the reply; connection.run waits
                        # on request.wait_until_done().
                        request.done()
                finally:
                    # BUG FIX: the file handle was never closed
                    f.close()
            elif request.method == 'head':
                request.done()
            else:
                # should be impossible
                request.error (405)
# a tiny built-in .ico, base64-encoded (str.decode('base64') is Python 2 only)
sample = (
'AAABAAEAICAQAAEABADoAgAAFgAAACgAAAAgAAAAQAAAAAEABAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
'AAAAAAAAAAAAAAD/+K///8AH//+iI///QAH//r4g//x3AH//Z6J//UABP/ovgD/458Ef+u+wv/Tn'
'0R/+79if9OXZH/6gCJ/2BwAf/u/8n/h33R/7Z7kf/ReQH/+qu7//BUW//7vrv//RR3//7r///80d'
'///pq///8EP//+rH///d9///6j///9Af/w=='
).decode ('base64')
# pre-deflated copy; favicon_handler serves it with content-encoding: deflate
zsample = zlib.compress (sample, 9)
# last-modified stamp for the favicon: the server's start time
last_modified = build_http_date (time.time())
class favicon_handler:
    "Serve /favicon.ico; the payload is deflate-compressed icon data."
    def __init__ (self, data=None):
        # default to the built-in sample icon (already deflated)
        self.data = zsample if data is None else data
    def match (self, request):
        return request.path == '/favicon.ico'
    def handle_request (self, request):
        if request['if-modified-since']:
            # if we cared, we could check that timestamp.
            request.error (304)
            return
        request['content-type'] = 'image/x-icon'
        request['last-modified'] = last_modified
        # are there browsers that don't accept deflate?
        request['content-encoding'] = 'deflate'
        request.push (self.data)
        request.done()
# -*- Mode: Python -*-
import coro
from coro import read_stream
W = coro.write_stderr
# candidate for sync.pyx?
class latch:
    "Like a CV, except without the race - if the event has already fired then wait() will return immediately."
    def __init__ (self):
        self.cv = coro.condition_variable()
        self.done = False
    def wake_all (self, args=()):
        # record the result first, so late arrivals in wait() can see it
        self.done = True
        self.args = args
        return self.cv.wake_all (args)
    def wait (self):
        # already fired?  return the recorded args without blocking
        if self.done:
            return self.args
        return self.cv.wait()
class http_file:
"HTTP message content, as a file-like object."
buffer_size = 8000
def __init__ (self, headers, stream):
# streami: the raw input stream; streamo: the decoded-content stream
self.streami = stream
self.done_cv = latch()
if headers.get_one ('transfer-encoding') == 'chunked':
self.streamo = read_stream.buffered_stream (self._gen_read_chunked().next)
else:
content_length = headers.get_one ('content-length')
if content_length:
self.content_length = int (content_length)
self.streamo = read_stream.buffered_stream (self._gen_read_fixed().next)
else:
raise HTTP_Protocol_Error ("no way to determine length of HTTP data")
def _gen_read_chunked (self):
"generator: decodes chunked transfer-encoding."
s = self.streami
while 1:
# chunk header is '<hex-size>\r\n'; size 0 terminates the body
chunk_size = int (s.read_line()[:-2], 16)
if chunk_size == 0:
self.done_cv.wake_all()
return
else:
remain = chunk_size
while remain:
ask = min (remain, self.buffer_size)
block = s.read_exact (ask)
# NOTE(review): this consumes a CRLF after *every* sub-block,
# but the CRLF only follows the whole chunk -- verify against
# chunks larger than buffer_size.
assert (s.read_exact (2) == '\r\n')
remain -= ask
yield block
def _gen_read_fixed (self):
"generate fixed-size blocks of content."
s = self.streami
remain = self.content_length
while remain:
ask = min (remain, self.buffer_size)
block = s.read_exact (ask)
remain -= ask
yield block
self.done_cv.wake_all()
return
# XXX implement <size> argument
def read (self, join=True):
"read the entire contents. join=False returns a generator, join=True returns a string."
r = (x for x in self.streamo.read_all())
if join:
return ''.join (r)
else:
return r
def readline (self):
"read a newline-delimited line."
# once the body is exhausted, act like a file at EOF
if self.done_cv.done:
return ''
else:
return self.streamo.read_until ('\n')
def wait (self):
"wait until all the content has been read."
self.done_cv.wait()
class header_set:
    "A multimap of HTTP headers: names are lower-cased, values keep arrival order."
    def __init__ (self, headers=()):
        self.headers = {}
        for line in headers:
            self.crack (line)
    def from_keywords (self, kwds):
        "Populate this header set from a dictionary of keyword arguments (e.g., 'content_length' becomes 'content-length')"
        for name, value in kwds.items():
            self[name.replace ('_', '-')] = value
        return self
    def crack (self, h):
        "Crack one header line."
        # deliberately ignoring 822 crap like continuation lines.
        try:
            i = h.index (': ')
        except ValueError:
            coro.write_stderr ('dropping bogus header %r\n' % (h,))
        else:
            self[h[:i]] = h[i+2:]
    def get_one (self, key):
        "Get the value of a header expected to have at most one value. If not present, return None. If more than one, raise ValueError."
        vals = self.headers.get (key, None)
        if vals is None:
            return None
        if isinstance (vals, list) and len (vals) > 1:
            raise ValueError ("expected only one %s header, got %r" % (key, vals))
        return vals[0]
    def has_key (self, key):
        "Is this header present?"
        return key.lower() in self.headers
    def __getitem__ (self, key):
        "Returns the list of values for this header, or None."
        return self.headers.get (key, None)
    def __setitem__ (self, name, value):
        "Add a value to the header <name>."
        self.headers.setdefault (name.lower(), []).append (value)
    def __str__ (self):
        "Render the set of headers."
        return ''.join (
            ['%s: %s\r\n' % (name, v)
             for name, vals in self.headers.items()
             for v in vals]
        )
# -*- Mode: Python -*-
# history: this code traces all the way back to medusa, through egroups, then ironport, and into shrapnel.
# Very Rewritten in Feb 2012.
import coro
import errno
import http_date
import mimetypes
import os
import re
from coro import read_stream
import socket
import stat
import sys
import time
import zlib
from protocol import latch, http_file, header_set
W = sys.stderr.write
__version__ = '0.1'
# pulls HTTP requests off one client connection, in arrival order
class request_stream:
def __init__ (self, conn, stream):
self.timeout = conn.server.client_timeout
self.conn = conn
self.stream = stream
def get_request (self):
# read one request line + header block; StopIteration ends the connection
request_line = self.stream.read_line()
if not request_line:
raise StopIteration
else:
# read header
lines = []
while 1:
line = self.stream.read_line()
# XXX handle continuation lines
if line == '':
raise StopIteration
elif line == '\r\n':
break
else:
lines.append (line[:-2])
return http_request (self.conn, request_line[:-2], header_set (lines))
def gen_requests (self):
# read HTTP requests on this stream
while 1:
try:
request = coro.with_timeout (self.timeout, self.get_request)
except coro.TimeoutError:
return
else:
yield request
# can't read another request until we finish reading this one
# [it might have a body]
request.wait_until_read()
# one instance per accepted socket; run() executes in its own coroutine
class connection:
def __init__ (self, server):
self.server = server
self.stream = None
def run (self, conn, peer):
self.conn = conn
self.peer = peer
self.stream = read_stream.sock_stream (self.conn)
try:
try:
for request in request_stream (self, self.stream).gen_requests():
if request.bad:
# bad request
request.error (400)
else:
try:
handler = self.pick_handler (request)
if handler:
# XXX with_timeout() ?
handler.handle_request (request)
else:
request.error (404)
request.wait_until_done()
except (coro.TimeoutError, coro.Interrupted):
raise
except:
# handler crashed: 500 to the client, log the traceback
tb = coro.compact_traceback()
request.error (500, tb)
self.server.log ('error: %r request=%r tb=%r' % (self.peer, request, tb))
except (OSError, coro.TimeoutError, coro.ClosedError):
pass
finally:
# always release the socket, even on protocol errors
self.conn.close()
def pick_handler (self, request):
# first matching handler wins (server.handlers push order)
for handler in self.server.handlers:
if handler.match (request):
return handler
return None
def send (self, data):
return self.conn.send (data)
def close (self):
self.conn.close()
# one HTTP request/response pair; owns reply buffering, chunking, deflate
class http_request:
request_count = 0
# <path>;<params>?<query>#<fragment>
path_re = re.compile ('(/[^;?#]*)(;[^?#]*)?(\?[^#]*)?(#.*)?')
# <method> <uri> HTTP/<version>
request_re = re.compile ('([^ ]+) ([^ ]+) *(HTTP/([0-9.]+))?')
# shadowed instance variables
chunking = False
close = False
is_done = False
sent_headers = False
bad = False
body_done = False
file = None
def __init__ (self, client, request, headers):
# <client> is the connection object, <request> the raw request line
self.reply_headers = header_set()
self.reply_code = 200
http_request.request_count = http_request.request_count + 1
self.request_number = http_request.request_count
self.request = request
self.request_headers = headers
#W ('headers=%s\n' % (headers,))
self.client = client
self.server = client.server
self.tstart = time.time()
self.peer = client.peer
self.output = buffered_output (self.client.conn)
self.done_cv = latch()
self.deflate = None
# crack '<method> <uri> HTTP/<version>', then the uri into components;
# anything unparseable marks the request bad (connection sends a 400)
m = http_request.request_re.match (request)
if m:
(self.method, self.uri, ver, self.version) = m.groups()
self.method = self.method.lower()
if not self.version:
self.version = "0.9"
m = http_request.path_re.match (self.uri)
if m:
(self.path, self.params, self.query, self.frag) = m.groups()
else:
self.bad = True
else:
self.version = "1.0"
self.bad = True
if self.has_body():
self.file = http_file (headers, client.stream)
def wait_until_read (self):
"wait until this entire request body has been read"
if self.file:
self.file.done_cv.wait()
def wait_until_done (self):
"wait until this request is done (i.e, the response has been sent)"
if not self.is_done:
self.done_cv.wait()
def has_body (self):
# transfer-encoding or a positive content-length implies a body
if self.request_headers.has_key ('transfer-encoding'):
# 4.4 ignore any content-length
return True
else:
probe = self.request_headers.get_one ('content-length')
if probe:
try:
size = int (probe)
if size == 0:
return False
elif size > 0:
return True
else:
return False
except ValueError:
return False
def can_deflate (self):
# did the client advertise 'deflate' in accept-encoding?
acc_enc = self.request_headers.get_one ('accept-encoding')
if acc_enc:
for kind in acc_enc.split (','):
if kind.strip().lower() == 'deflate':
return True
return False
def set_deflate (self):
"set this request for on-the-fly compression (via zlib DEFLATE)"
if self.can_deflate():
self.deflate = zlib.compressobj()
self['content-encoding'] = 'deflate'
# http://zoompf.com/blog/2012/02/lose-the-wait-http-compression
# Note: chrome,firefox,safari,opera all handle the header. Not MSIE, sigh. Discard it.
assert (self.deflate.compress ('') == '\x78\x9c')
return self.deflate
def push (self, data, flush=False):
"push output data for this request. buffered, maybe chunked, maybe compressed"
if not self.sent_headers:
# first push emits the status line + reply headers, and decides chunking
self.sent_headers = 1
self.output.write (self.get_headers())
if self.chunking:
self.output.set_chunk()
if self.deflate:
if data:
data = self.deflate.compress (data)
if flush:
data += self.deflate.flush()
if data:
self.output.write (data)
def done (self):
# finish the reply: flush compression/chunking, log, wake waiters
if self.is_done:
W ('done called twice?\n')
return
if not self.sent_headers:
self.push ('')
if self.deflate:
self.push ('', flush=True)
self.output.flush()
if self.close:
self.client.close()
self.is_done = True
self.client.server.log (self.log_line())
self.done_cv.wake_all()
# note: the difference of meaning between getitem/setitem
def __getitem__ (self, key):
# fetch a request header
# use this only when you expect at most one of this header.
return self.request_headers.get_one (key)
def __setitem__ (self, key, val):
# set a reply header
self.reply_headers[key] = val
def get_headers (self):
chunked = False
# here is were we decide things like keep-alive, 1.0 vs 1.1, chunking, etc.
hi = self.request_headers
ho = self.reply_headers
connection = hi.get_one('connection')
if connection:
connection_tokens = [ x.strip() for x in connection.split(',') ]
else:
connection_tokens = ()
close_it = False
if self.version == '1.1':
# 1.1 defaults to keep-alive; chunk when no length is declared
if 'close' in connection_tokens:
close_it = True
elif not ho.get_one ('content-length'):
ho['transfer-encoding'] = 'chunked'
chunked = True
elif self.version == '1.0':
# 1.0 keep-alive only works with an explicit content-length
if 'keep-alive' in connection_tokens:
if not ho.get_one ('content-length'):
close_it = True
else:
ho['connection'] = 'keep-alive'
else:
close_it = True
elif self.version == '0.9':
close_it = True
if close_it:
ho['connection'] = 'close'
self.chunking = chunked
ho['server'] = 'shrapnel httpd/%s' % __version__
ho['date'] = http_date.build_http_date (coro.now_usec / coro.microseconds)
return self.response (self.reply_code) + '\r\n' + str (self.reply_headers) + '\r\n'
def response (self, code=200):
# render the status line, remembering the code for the log
message = self.responses[code]
self.reply_code = code
return 'HTTP/%s %d %s' % (self.version, code, message)
def error (self, code, reason=None):
# send a complete error reply (headers + canned HTML body)
self.reply_code = code
message = self.responses[code]
s = self.DEFAULT_ERROR_MESSAGE % {
'code': code, 'message': message, 'reason': reason
}
self['Content-Length'] = len(s)
self['Content-Type'] = 'text/html'
self.push (s)
self.done()
def log_line (self):
now = time.time()
# somewhere between common log format and squid, avoid the
# expense of formatting time
return '%.03f %s "%s" %d %d %0.2f' % (
now,
'%s:%d' % self.peer,
self.request,
self.reply_code,
self.output.sent,
now - self.tstart,
)
responses = {
100: "Continue",
101: "Switching Protocols",
200: "OK",
201: "Created",
202: "Accepted",
203: "Non-Authoritative Information",
204: "No Content",
205: "Reset Content",
206: "Partial Content",
300: "Multiple Choices",
301: "Moved Permanently",
302: "Moved Temporarily",
303: "See Other",
304: "Not Modified",
305: "Use Proxy",
400: "Bad Request",
401: "Unauthorized",
402: "Payment Required",
403: "Forbidden",
404: "Not Found",
405: "Method Not Allowed",
406: "Not Acceptable",
407: "Proxy Authentication Required",
408: "Request Time-out",
409: "Conflict",
410: "Gone",
411: "Length Required",
412: "Precondition Failed",
413: "Request Entity Too Large",
414: "Request-URI Too Large",
415: "Unsupported Media Type",
500: "Internal Server Error",
501: "Not Implemented",
502: "Bad Gateway",
503: "Service Unavailable",
504: "Gateway Time-out",
505: "HTTP Version not supported"
}
# Default error message
DEFAULT_ERROR_MESSAGE = '\r\n'.join ([
'<html>',
'<head>',
'<title>Error response</title>',
'</head>',
'<body>',
'<h1>Error response</h1>',
'<p>Error code %(code)d.',
'<p>Message: %(message)s.',
'<p>Reason: %(reason)s.',
'</body>',
'</html>',
''
])
class HTTP_Protocol_Error (Exception):
    "Raised when the peer violates the HTTP protocol."
    pass
# chunking works thus:
# <data>
# becomes:
# <hex-length><CRLF>
# <data><CRLF>
# when done, signal with
# 0<CRLF><CRLF>
class buffered_output:
    "Buffer HTTP output data; handle the 'chunked' transfer-encoding"
    def __init__ (self, conn, size=8000):
        self.conn = conn
        self.size = size
        self.buffer = []
        self.len = 0
        self.sent = 0
        self.chunk_index = -1
        self.chunk_len = 0
    # at this point *exactly*, we want to start chunking the output.
    # this is called immediately after the headers are pushed.
    def set_chunk (self):
        "start chunking here, exactly."
        self.chunk_index = len (self.buffer)
    def get_data (self):
        "get data to send. may chunk."
        pending = self.buffer
        self.buffer = []
        self.len = 0
        if self.chunk_index < 0:
            return pending
        # wrap the post-header portion in a chunk header/trailer
        pending.insert (self.chunk_index, '%x\r\n' % (self.chunk_len,))
        pending.append ('\r\n')
        self.chunk_len = 0
        self.chunk_index = 0
        return pending
    def write (self, data):
        "Push data to the buffer. If the accumulated data goes over the buffer size, send it."
        self.buffer.append (data)
        self.len += len (data)
        if self.chunk_index >= 0:
            self.chunk_len += len (data)
        if self.len >= self.size:
            self.send (self.get_data())
    def flush (self):
        "Flush the data from this buffer."
        pending = self.get_data()
        if self.chunk_index >= 0:
            # last-chunk marker
            pending.append ('0\r\n\r\n')
        self.send (pending)
    def send (self, data):
        "write a list of strings to the connection, preferring writev."
        try:
            self.sent += self.conn.writev (data)
        except AttributeError:
            # underlying socket may not support writev (e.g., tlslite)
            self.sent += self.conn.send (''.join (data))
class server:
    "Plain HTTP server: binds, listens, and spawns one coroutine per connection."
    client_timeout = 30

    def __init__ (self):
        self.handlers = []
        self.shutdown_flag = 0
        self.thread_id = None
        self.addr = ()
        self.sock = None

    def log (self, line):
        sys.stderr.write ('http %s:%d: %s\n' % (self.addr[0], self.addr[1], line))

    def push_handler (self, handler):
        "append a request handler; handlers are tried in push order."
        self.handlers.append (handler)

    def start (self, addr, retries=5):
        """Start the web server listening on addr in a new coroutine.
        Try up to <retries> time to bind to that address.
        Raises an exception if the bind fails."""
        self.sock = coro.tcp_sock()
        self.sock.set_reuse_addr()
        done = 0
        save_errno = 0
        self.addr = addr
        while not done:
            for x in xrange (retries):
                try:
                    self.sock.bind (addr)
                except OSError as why:
                    if why.errno not in (errno.EADDRNOTAVAIL, errno.EADDRINUSE):
                        raise
                    else:
                        # BUG FIX: remember the errno for the log line below
                        # (it was previously left at zero).
                        save_errno = why.errno
                else:
                    done = 1
                    break
                coro.sleep_relative (1)
            else:
                # FIX: report the configured retry count, not a hard-coded 5
                self.log ('cannot bind to %s:%d after %d attempts, errno = %d' % (addr[0], addr[1], retries, save_errno))
                coro.sleep_relative (15)
        self.sock.listen (1024)
        c = coro.spawn (self.run)
        c.set_name ('http_server (%s:%d)' % addr)

    def run (self):
        "accept loop; runs as its own coroutine until shutdown."
        self.thread_id = coro.current().thread_id()
        while not self.shutdown_flag:
            try:
                conn, addr = self.accept()
                client = connection (self)
                c = coro.spawn (client.run, conn, addr)
                c.set_name ('http connection on %r' % (addr,))
            except coro.Shutdown:
                break
            except:
                # deliberate catch-all: keep the accept loop alive, but
                # log and back off briefly so we can't spin on errors
                self.log ('error: %r\n' % (coro.compact_traceback(),))
                coro.sleep_relative (0.25)
                continue
        self.sock.close()

    def accept (self):
        return self.sock.accept()

    def shutdown (self):
        self.shutdown_flag = 1
        try:
            # XXX SMR is this really necessary?
            thread = coro.get_thread_by_id (self.thread_id)
            thread.shutdown()
        except KeyError:
            return # already exited
class tlslite_server (server):
"https server using the tlslite package"
def __init__ (self, cert_path, key_path):
server.__init__ (self)
self.cert_path = cert_path
self.key_path = key_path
self.read_chain()
self.read_private()
def accept (self):
import tlslite
conn0, addr = server.accept (self)
conn = tlslite.TLSConnection (conn0)
# NOTE(review): the handshake runs here in the accept path, so a slow
# client may delay new connections until it completes -- confirm.
conn.handshakeServer (certChain=self.chain, privateKey=self.private)
return conn, addr
def read_chain (self):
"cert chain is all in one file, in LEAF -> ROOT order"
import tlslite
delim = '-----END CERTIFICATE-----\n'
data = open (self.cert_path).read()
certs = data.split (delim)
chain = []
for cert in certs:
if cert:
x = tlslite.X509()
x.parse (cert + delim)
chain.append (x)
self.chain = tlslite.X509CertChain (chain)
def read_private (self):
"load the PEM private key file"
import tlslite
self.private = tlslite.parsePEMKey (
open (self.key_path).read(),
private=True
)
# -*- Mode: Python -*-
import coro
import time
import uuid
import sys
W = sys.stderr.write
# See: http://en.wikipedia.org/wiki/HTTP_cookie#Session_cookie
def extract_session (cookie):
    "Pull the value of the 'session' cookie out of a Cookie header, or None."
    for chunk in cookie.split (';'):
        kv = [piece.strip() for piece in chunk.split ('=')]
        if len (kv) == 2 and kv[0] == 'session':
            return kv[1]
    return None
class session_handler:
    "Route requests under /<name> to per-session coroutines keyed by a session cookie."
    def __init__ (self, name, function):
        self.name = name
        self.function = function
        self.sessions = {}
    def match (self, request):
        "match /<name> and anything beneath it"
        parts = request.path.split ('/')
        return 1 if len (parts) > 1 and parts[1] == self.name else 0
    def find_session (self, request):
        # XXX does http allow more than one cookie header?
        cookie = request['cookie']
        if not cookie:
            return None, None
        sid = extract_session (cookie)
        return sid, self.sessions.get (sid, None)
    def gen_session_id (self):
        "a fresh, unguessable session id"
        return str (uuid.uuid4())
    def handle_request (self, request):
        sid, fifo = self.find_session (request)
        if fifo is not None:
            # existing session: hand the request to its coroutine
            fifo.push (request)
        else:
            # login: new fifo, new id, new session coroutine
            fifo = coro.fifo()
            fifo.push (request)
            sid = self.gen_session_id()
            request['set-cookie'] = 'session=%s' % (sid,)
            self.sessions[sid] = fifo
            coro.spawn (self.wrap, sid, fifo)
    def wrap (self, sid, fifo):
        # run the user function, then reap the session however it exits
        try:
            self.function (sid, fifo)
        finally:
            del self.sessions[sid]
# -*- Mode: Python -*-
class socket_producer:
    "Adapt a socket to the producer protocol: each next() returns one recv ('' at EOF)."
    def __init__ (self, conn, buffer_size=8000):
        self.conn = conn
        self.buffer_size = buffer_size
    def next (self):
        "fetch the next block from the socket"
        return self.conn.recv (self.buffer_size)
def sock_stream (sock):
    "wrap a socket in a buffered_stream"
    producer = socket_producer (sock)
    return buffered_stream (producer.next)
class buffered_stream:
    """Buffered scanning reads over a producer function.

    <producer> is a callable taking no arguments; each call returns the
    next block of input, with '' signalling end-of-stream.
    """

    def __init__ (self, producer):
        self.producer = producer
        self.buffer = ''

    def gen_read_until (self, delim):
        "generate pieces of input up to and including <delim>, then StopIteration"
        ld = len (delim)
        m = 0   # number of characters of <delim> matched so far
        while 1:
            if not self.buffer:
                self.buffer = self.producer()
                if not self.buffer:
                    # eof
                    yield ''
                    return
            i = 0
            while i < len (self.buffer):
                if self.buffer[i] == delim[m]:
                    m += 1
                    if m == ld:
                        result, self.buffer = self.buffer[:i+1], self.buffer[i+1:]
                        yield result
                        return
                else:
                    # BUG FIX: a failed partial match used to skip past
                    # characters that could begin a new match (e.g. the
                    # second '\r' in 'a\r\r\n' was never re-examined, so
                    # the '\r\n' delimiter was missed entirely).  Rewind
                    # to just after where the failed match began; if that
                    # start lies in a block we already yielded, the best
                    # we can do is rescan from the top of this buffer.
                    i = max (i - m, -1)
                    m = 0
                i += 1
            block, self.buffer = self.buffer, ''
            yield block

    def gen_read_until_dfa (self, dfa):
        "generate pieces of input up to and including a match on <dfa>, then StopIteration"
        # the dfa consumes characters statefully, so no rescan is possible here
        m = 0
        while 1:
            if not self.buffer:
                self.buffer = self.producer()
                if not self.buffer:
                    # eof
                    yield ''
                    return
            i = 0
            while i < len (self.buffer):
                if dfa.consume (self.buffer[i]):
                    result, self.buffer = self.buffer[:i+1], self.buffer[i+1:]
                    yield result
                    return
                i += 1
            block, self.buffer = self.buffer, ''
            yield block

    def gen_read_exact (self, size):
        "generate pieces of input up to <size> bytes, then StopIteration"
        remain = size
        while remain:
            if len (self.buffer) >= remain:
                result, self.buffer = self.buffer[:remain], self.buffer[remain:]
                yield result
                return
            else:
                piece, self.buffer = self.buffer, self.producer()
                remain -= len (piece)
                yield piece
                if not self.buffer:
                    # eof: give back less than <size>
                    yield ''
                    return

    def read_until (self, delim, join=True):
        "read until <delim>. return a generator of parts unless <join> is True"
        result = ( x for x in self.gen_read_until (delim) )
        if join:
            return ''.join (result)
        else:
            return result

    def read_exact (self, size, join=True):
        "read exactly <size> bytes. return a generator of parts unless <join> is True"
        result = ( x for x in self.gen_read_exact (size) )
        if join:
            return ''.join (result)
        else:
            return result

    def flush (self):
        "flush this stream's buffer"
        result, self.buffer = self.buffer, ''
        return result

    def read_line (self, delim='\r\n'):
        "read a CRLF-delimited line from this stream"
        return self.read_until (delim)

    def read_all (self):
        "read from self.producer until the stream terminates"
        if self.buffer:
            yield self.flush()
        while 1:
            block = self.producer()
            if not block:
                return
            else:
                yield block
......@@ -340,11 +340,11 @@ cdef public class sock [ object sock_object, type sock_type ]:
return self.fd
cdef _set_reuse_addr (self):
cdef int old
cdef int old = 0
cdef socklen_t optlen
optlen = sizeof (old);
optlen = sizeof (old)
getsockopt (self.fd, SOL_SOCKET, SO_REUSEADDR, <void*> &old, &optlen)
old = old | 1;
old = old | 1
setsockopt (self.fd, SOL_SOCKET, SO_REUSEADDR, <void*> &old, optlen)
def set_reuse_addr (self):
......
# -*- Mode: Python; tab-width: 4 -*-
# Copyright 1999 by eGroups, Inc.
#
# All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of
# eGroups not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# EGROUPS DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL EGROUPS BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
VERSION_STRING = '$Id$'
__split_vers__ = VERSION_STRING.split()
__version__ = (len(__split_vers__) > 2) and __split_vers__[2] or '1.0'
#
# coro_ehttpd
# This is an infrastructure for having a http server using coroutines.
# There are three major classes defined here:
# http_client
# This is a descendent of coro.Thread. It handles the connection
# to the client, spawned by http_server. Its run method goes through
# the stages of reading the request, filling out a http_request and
# finding the right handler, etc.
# http_request
# This object collects all of the data for a request. It is initialized
# from the http_client thread with the http request data, and is then
# passed to the handler to receive data. It attempts to enforce a valid
# http protocol on the response
# http_server
# This is a thread which just sits accepting on a socket, and spawning
# http_clients to handle incoming requests
#
# Additionally, the server expects http handler classes which respond
# to match and handle_request. There is an example class,
# http_file_handler, which is a basic handler to respond to GET requests
# to a document root. It'll return any file which exists.
#
# To use, implement your own handler class which responds to match and
# handle_request. Then, create a server, add handlers to the server,
# and start it. You then need to call the event_loop yourself.
# Something like:
#
# server = http_server(args = (('0.0.0.0', 7001),))
# file_handler = http_file_handler ('/home/htdocs/')
# server.push_handler (file_handler)
# server.start()
# coro.event_loop(30.0)
#
# we would like to handle persistent connections correctly.
# Here's the control flow:
# <server> : <-- connection
# create <client>
# while get_request(): <-- request
# create <request>
# get_handler().process_request()
#
# The difficulty (compared to medusa) is that the handler
# actually sends all the data, including the header. So
# the request object doesn't necessarily know what's going
# on.
# 1) we should force handlers to use an interface for setting headers. [i.e., CGI
# won't call "send('Content-Type: text/html')", it will use a method on the
# request object].
# 2) <request> should monitor changes to the headers.
# 3) a few standardized headers should have specific methods to improve performance.
#
# This still doesn't necessarily solve the difficult issue of persistence.
# 1) Will we pipeline? Only if we buffer on read. [I think we do].
# 2) what about chunking? We could just always use chunking. [no, that won't work
# because then browsers won't know how big files are]
#
# Suggestion: how about a dynamic-inheritance feature for setting headers?
# For example, use __getattr__ to catch anything beginning with {set,get}_header_xxx.
# This would allow subclasses to override specific methods. [could/should we use an
# inheritance to handle http/1.0 vs http/1.1?]
#
# [...]
# Ok, most of that is now done, although a bit of a mess. Who is responsible for
# closing, in the event of http/1.0 and conn:close??
#
# some nastiness: important to understand exactly which coroutine each piece of code
# needs to run in, and whether it runs parallel or as a subroutine of some other coroutine.
#
# coroutines:
# 1) server (accept loop)
# 2) client (request loop)
# 3) session (request loop)
#
# For example, i'm pretty sure that the 'session' coroutine needs to execute as
# 'subroutine' of the 'client' coroutine; i.e. when the session yields(), it does
# so back to the client coroutine, not to main.
import coro
import coro_ssl
import errno
import http_date
import mime_type_table
import os
import re
import read_stream
import socket
import sslip
import stat
import sys
import time
try:
    import qlog
except ImportError:
    # qlog (the logging daemon interface) is optional.  Fall back to a
    # null object so calls like qlog.write(...) are silent no-ops.
    # [narrowed from a bare 'except:' so that errors *inside* the qlog
    #  module are no longer silently swallowed]
    class Qlog:
        "null logger: any attribute access yields a callable that returns None"
        def __call__(self, *args, **kw):
            return None
        def __getattr__(self, name):
            self.name = name
            return self
        def __repr__(self):
            return 'null log daemon'
    qlog = Qlog()
# process-wide SSL context, created lazily by init_ssl()
ssl_ctx = None
def init_ssl(protocol=sslip.SSLV23_SERVER_METHOD):
    "create the global SSL context on first call; later calls are no-ops"
    global ssl_ctx
    if ssl_ctx:
        return
    ssl_ctx = coro_ssl.ssl_ctx(protocol)
    ssl_ctx.set_ciphers ('RC4-SHA:RC4-MD5:ALL')
def update_cert_key(cert, key, passwd='', chain=()):
    """Install PEM <cert> and <key> (optionally passworded, with an
    optional chain) into the global SSL context.  No-op unless both
    cert and key are provided."""
    global ssl_ctx
    if not (cert and key):
        return
    ssl_ctx.use_cert(sslip.read_pem_cert(cert), chain)
    ssl_ctx.use_key(sslip.read_pem_key(key, passwd))
class http_client:
    """One coroutine per accepted connection: reads requests in a loop
    (supporting keep-alive/pipelining), dispatches each to the first
    matching handler, and closes the connection on error or shutdown."""
    def __init__ (self, group=None, target=None, name=None, logfp=sys.stderr, args=(), kwargs={}):
        # group/target/name/args/kwargs are accepted for Thread-style
        # signature compatibility but unused here.
        self.stream = None
        self.buffer = ''
        self._bytes = 0
        self.logfp = logfp
    def run (self, conn, peer, server_obj, handlers):
        """Request loop: read request line + headers + body, dispatch to a
        handler, repeat until close/shutdown/error.  Always closes conn."""
        self.conn = conn
        self.server = server_obj
        self.peer = peer
        # Note that peer could be a fake address, and server_obj can be None.
        # These indicate a "backdoor" request from the gui.
        try:
            try:
                count = 0
                qlog.write('WEBUI.CONN_INIT', 'http', id(self), peer[0], peer[1])
                while 1:
                    if self.server and self.server.shutdown_flag:
                        break
                    try:
                        # We use self.stream to read the header line-by-line
                        # and then switch to reading directly from the socket
                        # for the body (if needed). Reuse the previous
                        # instance if it exists, to support HTTP pipelining.
                        if not self.stream:
                            self.stream = read_stream.stream_reader(self.conn.recv)
                        request_line = self.read_line()
                        if not request_line:
                            break
                    except socket.error:
                        qlog.write('WEBUI.CONN_ERROR', 'http', id(self), 'socket error')
                        break
                    count = count + 1
                    headers = self.read_header()
                    #print '\n'.join (headers) + '\n\n'
                    request = http_request (self, request_line, headers)
                    # drain the body now so keep-alive connections stay in sync
                    request.read_body()
                    if request._error:
                        # Bad Request
                        request.error (400)
                        return
                    else:
                        try:
                            try:
                                handler = self.pick_handler (handlers, request)
                                if handler:
                                    handler.handle_request (request)
                                else:
                                    self.not_found (request)
                                if not request._done:
                                    request.done()
                            except OSError, err:
                                if err[0] == errno.EPIPE:
                                    pass # ignore broken pipe error
                                else:
                                    raise # process exception in outer try
                        # These exceptions are used inside the coro
                        # stuff and shouldn't be thrown away
                        except (coro.TimeoutError, coro.Interrupted):
                            raise
                        except:
                            # any handler failure becomes a 500 with a traceback
                            tb = coro.compact_traceback()
                            ## sys.stderr.write (repr(tb))
                            request.error (500, tb)
                            qlog.write('COMMON.APP_FAILURE',
                                       tb + ' request: ' + `request`)
                            tb = None
                    if request._close:
                        # ok, this gets interesting. the connection needs to close
                        # here. the finally clause below isn't getting hit because
                        # the session and client are running in the same coroutine.
                        # that's bad, I think.
                        conn.close()
                        break
                    # this should be a policy decision of the owner of logfp
                    # self.logfp.flush()
            except read_stream.BufferOverflow:
                # Indicates a request header that exceeded the line
                # buffer, which may indicate an attack on the server.
                # We just close the connection without a response.
                # TODO:lrosenstein - log this since it may be an attack?
                qlog.write('WEBUI.CONN_ERROR', 'http',
                           id(self), 'line buffer limit exceeded')
                pass
            except sslip.Error, why:
                # Most likely a problem with SSL negotiation
                qlog.write('WEBUI.CONN_ERROR',
                           'https',
                           id(self),
                           why[1])
                pass
            except OSError, err:
                # We got some kind of I/O error that wasn't handled
                # elsewhere. Since this seem to happen because the
                # client closed the connection, it is safe to ignore
                # the exception.
                qlog.write('WEBUI.CONN_ERROR',
                           'http', id(self), 'OS error %s' % str(err[1]))
                pass
            except coro.TimeoutError:
                # Either a timeout from coro_ssl or a timeout
                # on a backdoor GUI request (see gui.py)
                pass
        finally:
            conn.close()
    def not_found (self, request):
        # default response when no handler matched
        request.error (404)
    def pick_handler (self, handlers, request):
        # first handler whose match() accepts the request wins
        for handler in handlers:
            if handler.match (request):
                return handler
        return None
    # This is for handlers that process PUT/POST themselves. This whole
    # thing needs to be redone with a file-like interface to 'stdin' for
    # requests, and we need to think about HTTP/1.1 and pipelining,
    # etc...
    def read (self, size):
        """Read exactly <size> bytes of body data (short on connection
        close).  Drains the line-buffered stream first, then reads the
        socket directly."""
        if self.stream:
            self.buffer = self.stream.drain_buffer()
            self.stream = None
        while len(self.buffer) < size:
            result = self.conn.recv(size-len(self.buffer))
            if result:
                self.buffer = self.buffer + result
            else:
                break # connection closed
        result = self.buffer[:size]
        self.buffer = self.buffer[size:]
        return result
    def read_line (self):
        # 5-minute overall timeout so idle clients can't pin the coroutine
        try:
            return coro.with_timeout(300, self._read_line)
        except coro.TimeoutError:
            return '' # EOF
    def _read_line (self):
        """Read a line of input. Return '' on EOF or error.
        TODO:lrosenstein - we should probably distinguish EOF/error
        from blank lines. This would affect read_header(), which
        could return an incomplete set of headers if the connection
        closed prematurely."""
        while 1:
            try:
                (ln, eof) = self.stream.read_line()
                if eof:
                    return '' # throw away incomplete lines
                else:
                    return ln
            except coro.TimeoutError: # ssl sockets timeout
                # Ignored to fix bug 3185. The problem was that httpd
                # was closing the connection after 30 sec, but IE/Win
                # would try to use the connection and fail with a weird
                # error. Now, we have a 5 min timeout in read_line
                # above, which applies to SSL and non-SSL connections
                # to prevent clients from tying up server resources
                # indefinitely.
                continue
            except OSError, why:
                if why[0] == errno.ECONNRESET:
                    return '' # signal an eof to the caller
                else:
                    raise
    def read_header (self):
        # collect header lines up to (not including) the blank separator line
        header = []
        while 1:
            l = self.read_line()
            if not l:
                break
            else:
                header.append (l)
        return header
    def send (self, data):
        return self.conn.send (data)
    def close (self):
        self.conn.close()
class http_request:
    """All state for one HTTP request/response exchange.
    Parses the request line and path on construction, collects reply
    headers set via __setitem__, and decides keep-alive vs close vs
    chunked encoding when the headers are flushed (get_headers)."""
    # number of requests parsed so far, process-wide
    request_count = 0
    # <path>;<params>?<query>#<fragment>
    path_re = re.compile ('(/[^;?#]*)(;[^?#]*)?(\?[^#]*)?(#.*)?')
    # <method> <uri> HTTP/<version>
    request_re = re.compile ('([^ ]+) ([^ ]+) *(HTTP/([0-9.]+))?')
    # shadowed instance variable
    _chunking = 0
    _close = 0
    _done = 0
    _sent_headers = 0
    _error = 0
    _user_id = '-'
    _session_id = '-'
    def __init__ (self, client, request, headers):
        # <client>: the owning http_client; <request>: raw request line;
        # <headers>: list of raw "Name: value" strings.
        self._reply_headers = {}
        self._reply_cookies = ()
        self._reply_code = 200
        http_request.request_count = http_request.request_count + 1
        self._request_number = http_request.request_count
        self._request = request
        self._request_headers = headers
        self._client = client
        self._server = client.server
        self._tstart = time.time()
        self._sent_bytes = 0
        self._whom = client.conn.getpeername()
        m = http_request.request_re.match (request)
        if m:
            (self._method, self._uri, ver, self._version) = m.groups()
            self._method = self._method.lower()
            if not self._version:
                # no HTTP/x.y token at all means an HTTP/0.9 simple request
                self._version = "0.9"
            m = http_request.path_re.match (self._uri)
            if m:
                (self._path, self._params, self._query, self._frag) = m.groups()
                if self._query and self._query[0] == '?':
                    self._query = self._query[1:]
            else:
                self._error = 1
        else:
            self._version = "1.0"
            self._error = 1
    def read_body(self):
        """Read the message body, if any, so that it's cleared from
        the input stream. This avoids problems with keep-alives if
        the request handler doesn't read the body itself.
        This used to be done in the __init__method, but that can
        lead to a fatal error in the Python interpreter (see bug 3367).
        The ultimate solution is to fix the way connections are handled
        to ensure that we don't reuse the connection if the body wasn't
        fully read by the request handler."""
        self._body = ''
        clen = self.get_request_header('Content-Length')
        if clen:
            try:
                clen = int(clen)
                self._body = coro.with_timeout(
                    60,
                    self._client.read,
                    clen)
                if len(self._body) < clen:
                    qlog.write('WEBUI.CONN_ERROR',
                               'http', id(self),
                               'Truncated body (%d<%d) (req:%s)' % \
                               (len(self._body), clen, self._request))
                    self._error = 1 # didn't get the body we were promised
            except coro.TimeoutError:
                qlog.write('WEBUI.CONN_ERROR',
                           'http', id(self),
                           'Body read timeout (req:%s)' % self._request)
                self._error = 1
            except ValueError:
                qlog.write('WEBUI.CONN_ERROR',
                           'http', id(self),
                           'Invalid Content-Length (%s) (req:%s)' % \
                           (clen, self._request)
                           )
                self._error = 1
    def is_secure(self):
        # true when served by an https_server (server may be None for backdoor)
        return self._client.server and self._client.server.is_secure()
    # --------------------------------------------------
    # request header management
    # --------------------------------------------------
    _header_re = re.compile (r'([^: ]+): (.*)')
    def get_request_header (self, header):
        "return the value of request header <header> (case-insensitive), or ''"
        header = header.lower()
        for h in self._request_headers:
            m = self._header_re.match (h)
            if m:
                name, value = m.groups()
                if name.lower() == header:
                    return value
        return ''
    # --------------------------------------------------
    # reply header management
    # --------------------------------------------------
    # header names are case-insensitive, and we need to be
    # able to reliably query a request for certain headers,
    # thus the sprinkling of key.lower() calls.
    def __setitem__ (self, key, value):
        self._reply_headers[key.lower()] = value
    def __getitem__ (self, key):
        return self._reply_headers[key.lower()]
    def has_key (self, key):
        return self._reply_headers.has_key (key.lower())
    # TODO:lrosenstein - it's legal and necessary to have multiple
    # TODO:lrosenstein - Set-Cookie headers. Handle these as a special case.
    def set_reply_cookies(self, cookies):
        """Set sequence of cookies to be used in the response."""
        self._reply_cookies = cookies
    # --------------------------------------------------
    # reading request
    # --------------------------------------------------
    def read (self, size):
        # consume up to <size> bytes of the already-buffered body
        data = self._body[:size]
        self._body = self._body[size:]
        return data
    # --------------------------------------------------
    # sending response
    # --------------------------------------------------
    def send (self, data):
        self._sent_bytes = self._sent_bytes + len(data)
        return self._client.send (data)
    # chunking works thus:
    # <data>
    # becomes:
    # <hex-length><CRLF>
    # <data><CRLF>
    # when done, signal with
    # 0<CRLF><CRLF>
    # ok, I admit this is now something of a mess.
    # this could maybe be better if we could:
    # 1) distinguish replies that have content
    # [we could even detect it automatically?]
    # 2) be more explicit about buffering the header
    def push (self, data):
        """Send <data> to the client, emitting the response headers first
        (exactly once) and applying chunked framing when enabled."""
        if not self._sent_headers:
            self._sent_headers = 1
            headers = self.get_headers()
        else:
            headers = ''
        if data:
            if self._chunking:
                self.send (headers + '%x\r\n%s\r\n' % (len(data), data))
            else:
                self.send (headers + data)
        else:
            self.send (headers)
    def done (self, with_body=1):
        """Finish the response: flush headers, terminate chunking, close
        the connection when required, and log the request."""
        if not self._sent_headers:
            self.send_headers()
        if with_body and self._chunking:
            # note: there's an invisible 'footer' between the pair of CRLF's.
            # it can be used to send certain additional types of headers.
            self.send ('0\r\n\r\n')
        if self._close:
            self._client.close ()
        self._done = 1
        qlog.write('WEBUI.HTTP_REQUEST',
                   self._client.peer[0],
                   self._user_id,
                   self._session_id,
                   self._reply_code,
                   self._request,
                   '' # XXX: User agent
                   )
        #self._client.logfp.write (self.log())
    def get_headers (self):
        """Render the status line + reply headers, deciding keep-alive /
        close / chunked transfer based on HTTP version and the headers
        set so far.  Side effects: sets _close and _chunking."""
        chunking = 0
        # here is were we decide things like keep-alive, 1.0 vs 1.1, chunking, etc.
        connection = self.get_request_header('connection').lower()
        connection_tokens = [ x.strip() for x in connection.split(',')]
        close_it = 0
        if self._version == '1.0':
            if 'keep-alive' in connection_tokens:
                # keep-alive under 1.0 requires a known content length
                if not self.has_key ('content-length'):
                    close_it = 1
                else:
                    self['Connection'] = 'Keep-Alive'
            else:
                close_it = 1
        elif self._version == '1.1':
            if 'close' in connection_tokens:
                close_it = 1
            elif not self.has_key ('content-length'):
                if self.has_key ('transfer-encoding'):
                    if self['Transfer-Encoding'] == 'chunked':
                        chunking = 1
                    else:
                        close_it = 1
                else:
                    # 1.1 default: unknown length => use chunked encoding
                    self['Transfer-Encoding'] = 'chunked'
                    chunking = 1
        elif self._version == '0.9':
            close_it = 1
        if close_it:
            self['Connection'] = 'close'
            self._close = 1
        self._chunking = chunking
        self['Server'] = 'IronPort httpd/%s' % __version__
        self['Date'] = http_date.build_http_date (coro.now_usec / coro.microseconds)
        headers = [self.response (self._reply_code)] + [
            ('%s: %s' % x) for x in self._reply_headers.items()
        ] + [
            (x.output()) for x in self._reply_cookies
        ] + ['\r\n']
        #print '\n'.join (headers) + '\n\n'
        return '\r\n'.join (headers)
    def send_headers (self):
        # this will force the headers to be sent...
        self.push ('')
    def response (self, code=200):
        "build the status line for <code> and remember it as the reply code"
        message = self.responses[code]
        self._reply_code = code
        return 'HTTP/%s %d %s' % (self._version, code, message)
    def error (self, code, reason=None, with_body=1):
        "send a canned HTML error page (or a bare status) and finish the request"
        self._reply_code = code
        if with_body:
            message = self.responses[code]
            s = self.DEFAULT_ERROR_MESSAGE % {
                'code': code, 'message': message, 'reason': reason
            }
            self['Content-Length'] = len(s)
            self['Content-Type'] = 'text/html'
            self.push (s)
            self.done (with_body)
        else:
            self.done (with_body)
        self._error = 1
    def set_user_id(self, user_id, session_id=None):
        # identifiers used only for qlog request logging ('-' = unknown)
        self._user_id = user_id or '-'
        self._session_id = session_id or '-'
    def log_date_string (self, when):
        return time.strftime (
            '%d/%b/%Y:%H:%M:%S ',
            time.gmtime(when)
        ) + tz_for_log
    def log (self):
        # one line in (approximately) Apache common log format
        tend = time.time()
        whom = '%s:%d ' % self._whom
        return '%s - - [%s] "%s" %d %d %0.2f\n' % (
            whom,
            self.log_date_string (tend),
            self._request,
            self._reply_code,
            self._sent_bytes,
            tend - self._tstart
        )
    # status code -> reason phrase
    responses = {
        100: "Continue",
        101: "Switching Protocols",
        200: "OK",
        201: "Created",
        202: "Accepted",
        203: "Non-Authoritative Information",
        204: "No Content",
        205: "Reset Content",
        206: "Partial Content",
        300: "Multiple Choices",
        301: "Moved Permanently",
        302: "Moved Temporarily",
        303: "See Other",
        304: "Not Modified",
        305: "Use Proxy",
        400: "Bad Request",
        401: "Unauthorized",
        402: "Payment Required",
        403: "Forbidden",
        404: "Not Found",
        405: "Method Not Allowed",
        406: "Not Acceptable",
        407: "Proxy Authentication Required",
        408: "Request Time-out",
        409: "Conflict",
        410: "Gone",
        411: "Length Required",
        412: "Precondition Failed",
        413: "Request Entity Too Large",
        414: "Request-URI Too Large",
        415: "Unsupported Media Type",
        500: "Internal Server Error",
        501: "Not Implemented",
        502: "Bad Gateway",
        503: "Service Unavailable",
        504: "Gateway Time-out",
        505: "HTTP Version not supported"
    }
    # Default error message
    DEFAULT_ERROR_MESSAGE = '\r\n'.join ([
        '<html>',
        '<head>',
        '<title>Error response</title>',
        '</head>',
        '<body>',
        '<h1>Error response</h1>',
        '<p>Error code %(code)d.',
        '<p>Message: %(message)s.',
        '<p>Reason: %(reason)s.',
        '</body>',
        '</html>',
        ''
    ])
import pprint
class coro_status_handler:
    "debug handler: renders an HTML page listing every live coro thread"
    def match (self, request):
        # first path component must be 'status'
        return request._path.split ('/')[1] == 'status'
    def clean (self, s):
        # escape angle brackets so reprs render literally in HTML
        return s.replace ('<', '&lt;').replace ('>', '&gt;')
    def handle_request (self, request):
        request['Content-Type'] = 'text/html'
        request.push ('<p>Listening on\r\n')
        request.push (repr (request._server.addr))
        request.push ('</p>\r\n')
        request.push ('<p>Request dump</p><PRE>\r\n')
        request.push (pprint.pformat (request))
        request.push ('</PRE>\r\n')
        request.push ('<ul>\r\n')
        # snapshot every thread's traceback up front, then render
        snapshot = [(t, coro.where (t)) for _key, t in coro.all_threads.items()]
        for thread, traceback in snapshot:
            request.push ('<li>%s\r\n' % self.clean (repr (thread)))
            request.push ('<pre>\r\n')
            for level in traceback[1:-1].split ('|'):
                [file, fun, line] = level.split (':')
                request.push ('<b>%20s</b>:%03d %s\r\n' % (fun, int (line), file))
            request.push ('</pre>')
        request.push ('</ul>\r\n')
        request.done()
class http_file_handler:
    """Serve static files (GET/HEAD only) from a document root.
    Directories are served as their index.html; If-Modified-Since is
    honored (including the nonstandard '; length=' form)."""
    def __init__ (self, doc_root):
        self.doc_root = doc_root
        self.logfp = sys.stderr
    def match (self, request):
        # accept any request whose translated path exists on disk
        path = request._path
        filename = os.path.join (self.doc_root, path[1:])
        if os.path.exists (filename):
            return 1
        return 0
    # matches "<date>" or "<date>; length=<n>" as sent by some browsers
    crack_if_modified_since = re.compile ('([^;]+)(; length=([0-9]+))?$', re.IGNORECASE)
    def handle_request (self, request):
        path = request._path
        filename = os.path.join (self.doc_root, path[1:])
        if request._method not in ('get', 'head'):
            request.error (405)
            return
        if os.path.isdir (filename):
            filename = os.path.join (filename, 'index.html')
        if not os.path.isfile (filename):
            request.error (404)
        else:
            stat_info = os.stat (filename)
            mtime = stat_info[stat.ST_MTIME]
            file_length = stat_info[stat.ST_SIZE]
            ims = request.get_request_header ('if-modified-since')
            if ims:
                length_match = 1
                m = self.crack_if_modified_since.match (ims)
                if m:
                    length = m.group (3)
                    if length:
                        # the '; length=' hint must agree with the file size
                        if int(length) != file_length:
                            length_match = 0
                    ims_date = http_date.parse_http_date (m.group(1))
                    if length_match and ims_date:
                        if mtime <= ims_date:
                            # client's copy is current: no body needed
                            request.error (304, with_body=0)
                            return
            base, ext = os.path.splitext (filename)
            ext = ext[1:].lower()
            request['Content-Type'] = mime_type_table.content_type_map.get (ext, 'text/plain')
            request['Last-Modified'] = http_date.build_http_date (mtime)
            if request._method == 'get':
                # stream the file in 32KB blocks
                f = open (filename, 'rb')
                block = f.read (32768)
                if not block:
                    request.error (204) # no content
                else:
                    while 1:
                        request.push (block)
                        block = f.read (32768)
                        if not block:
                            break
            elif request._method == 'head':
                pass
            else:
                # should be impossible
                request.error (405)
class http_server:
    """Accept-loop coroutine: binds a listening socket, then spawns one
    http_client coroutine per accepted connection, dispatching to the
    handlers registered via push_handler()."""
    def __init__ (self):
        self._handlers = []
        # set to 1 by shutdown(); checked by the accept loop and clients
        self.shutdown_flag = 0
        self.thread_id = None
        self.addr = ()
        self.config = {}
        self._qlog_code = 'http'
    def set_config (self, name, value):
        self.config[name] = value
    def get_config (self, name):
        return self.config.get(name, None)
    def is_secure(self):
        # https_server instances carry a 'cert' attribute
        return hasattr(self, 'cert')
    def push_handler (self, handler):
        # handlers are tried in registration order (first match wins)
        self._handlers.append (handler)
    def _make_socket(self):
        # overridden by https_server to build an SSL socket
        server_s = coro.make_socket (socket.AF_INET, socket.SOCK_STREAM)
        server_s.set_reuse_addr()
        return server_s
    def start (self, addr, retries=5):
        """Start the web server listening on addr in a new coroutine.
        Try up to retries time to bind to that address.
        Raises an exception if the bind fails."""
        server_s = self._make_socket()
        done = 0
        save_errno = 0
        self.addr = addr
        while not done:
            # inner loop: up to <retries> quick attempts; the for/else
            # arm logs and waits 15s before the outer loop tries again
            for x in xrange (retries):
                try:
                    was_eaddrinuse = 0
                    server_s.bind (addr)
                except OSError, why:
                    if why.errno not in (errno.EADDRNOTAVAIL, errno.EADDRINUSE):
                        raise
                    else:
                        save_errno = 0
                        if why.errno == errno.EADDRINUSE:
                            was_eaddrinuse = 1
                else:
                    done = 1
                    break
                coro.sleep_relative(1) # ... and retry
            else:
                coro.print_stderr ("cannot bind to %s:%d after 5 attempts, errno = %d\n" % (addr[0], addr[1], save_errno))
                if was_eaddrinuse:
                    qlog.write('WEBUI.PORT_IN_USE',
                               addr[0], str(addr[1]))
                coro.sleep_relative(15)
        server_s.listen (1024)
        c = coro.spawn(self._run, server_s)
        c.set_name('http_server (%s:%d)' % addr)
        return 1 # in case the caller is expecting TRUE on success
    def _run (self, server_s):
        """Accept loop: runs until shutdown_flag is set or coro.Shutdown
        is raised; any other accept error is logged and retried."""
        secure = self.is_secure()
        self.thread_id = coro.current().thread_id()
        while not self.shutdown_flag:
            try:
                conn, addr = server_s.accept()
                client = http_client()
                coro.spawn (client.run, conn, addr, self, self._handlers)
            except coro.Shutdown:
                # server-shutdown
                break
            except:
                qlog.write('COMMON.APP_FAILURE',
                           ('%s accept handler error %s' %
                            (self.__class__.__name__, coro.compact_traceback())))
                coro.sleep_relative(0.25)
                continue
        server_s.close()
        return None
    def shutdown(self):
        self.shutdown_flag = 1
        try:
            thread = coro.get_thread_by_id(self.thread_id)
            thread.shutdown()
        except KeyError:
            return # already exited
class https_server (http_server):
    "http_server variant whose listening socket speaks SSL"
    def __init__ (self):
        http_server.__init__(self)
        self._qlog_code = 'https'
    def _make_socket (self):
        # wrap the module-level SSL context (set up by init_ssl)
        global ssl_ctx
        sock = coro_ssl.ssl_sock(ssl_ctx)
        sock.create()
        return sock
# Copied from medusa/http_server.py
def compute_timezone_for_log ():
    "format the local UTC offset as [+-]HHMM (e.g. '-0800') for log timestamps"
    if time.daylight:
        seconds = time.altzone
    else:
        seconds = time.timezone
    # time.timezone/altzone are positive *west* of UTC, hence the inverted sign
    if seconds > 0:
        sign = '-'
    else:
        sign = '+'
        seconds = -seconds
    hours, leftover = divmod (seconds, 3600)
    minutes, leftover = divmod (leftover, 60)
    return '%s%02d%02d' % (sign, hours, minutes)
# if you run this program over a TZ change boundary, this will be invalid.
# (computed once at import time; a later DST transition is not picked up)
tz_for_log = compute_timezone_for_log()
if __name__ == '__main__':
    # demo entry point: serve <doc_root> (default '.') plus /status on :9001
    import backdoor, grp, os
    if len (sys.argv) > 1:
        doc_root = sys.argv[1]
    else:
        doc_root = '.'
    # set up the global SSL context once.
    # [merge-conflict cleanup: the previous version also imported
    #  coro_httpd and ran init_ssl/update_cert_key a second time
    #  through that self-import]
    init_ssl()
    update_cert_key(coro_ssl.CERT, coro_ssl.KEY)
    # server = https_server()
    server = http_server()
    file_handler = http_file_handler (doc_root)
    server.push_handler (coro_status_handler())
    server.push_handler (file_handler)
    coro.spawn(server.start, ('0.0.0.0', 9001))
    coro.spawn (backdoor.serve)
    coro.event_loop (30.0)
# -*- Python -*-
# Converted by ./convert_mime_type_table.py from:
# /usr/src2/apache_1.2b6/conf/mime.types
#
# maps a lowercase filename extension (no leading dot) to its MIME
# Content-Type; consumers fall back to 'text/plain' for unknown keys.
content_type_map = \
    {
    'ai': 'application/postscript',
    'aif': 'audio/x-aiff',
    'aifc': 'audio/x-aiff',
    'aiff': 'audio/x-aiff',
    'au': 'audio/basic',
    'avi': 'video/x-msvideo',
    'bcpio': 'application/x-bcpio',
    'bin': 'application/octet-stream',
    'cdf': 'application/x-netcdf',
    'class': 'application/octet-stream',
    'cpio': 'application/x-cpio',
    'cpt': 'application/mac-compactpro',
    'csh': 'application/x-csh',
    'dcr': 'application/x-director',
    'dir': 'application/x-director',
    'dms': 'application/octet-stream',
    'doc': 'application/msword',
    'dvi': 'application/x-dvi',
    'dxr': 'application/x-director',
    'eps': 'application/postscript',
    'etx': 'text/x-setext',
    'exe': 'application/octet-stream',
    'gif': 'image/gif',
    'gtar': 'application/x-gtar',
    'gz': 'application/x-gzip',
    'hdf': 'application/x-hdf',
    'hqx': 'application/mac-binhex40',
    'htm': 'text/html',
    'html': 'text/html',
    'ice': 'x-conference/x-cooltalk',
    'ief': 'image/ief',
    'jpe': 'image/jpeg',
    'jpeg': 'image/jpeg',
    'jpg': 'image/jpeg',
    'kar': 'audio/midi',
    'latex': 'application/x-latex',
    'lha': 'application/octet-stream',
    'lzh': 'application/octet-stream',
    'man': 'application/x-troff-man',
    'me': 'application/x-troff-me',
    'mid': 'audio/midi',
    'midi': 'audio/midi',
    'mif': 'application/x-mif',
    'mov': 'video/quicktime',
    'movie': 'video/x-sgi-movie',
    'mp2': 'audio/mpeg',
    'mpe': 'video/mpeg',
    'mpeg': 'video/mpeg',
    'mpg': 'video/mpeg',
    'mpga': 'audio/mpeg',
    'mp3': 'audio/mpeg',
    'ms': 'application/x-troff-ms',
    'nc': 'application/x-netcdf',
    'oda': 'application/oda',
    'pbm': 'image/x-portable-bitmap',
    'pdb': 'chemical/x-pdb',
    'pdf': 'application/pdf',
    'pgm': 'image/x-portable-graymap',
    'png': 'image/png',
    'pnm': 'image/x-portable-anymap',
    'ppm': 'image/x-portable-pixmap',
    'ppt': 'application/powerpoint',
    'ps': 'application/postscript',
    'qt': 'video/quicktime',
    'ra': 'audio/x-realaudio',
    'ram': 'audio/x-pn-realaudio',
    'ras': 'image/x-cmu-raster',
    'rgb': 'image/x-rgb',
    'roff': 'application/x-troff',
    'rpm': 'audio/x-pn-realaudio-plugin',
    'rtf': 'application/rtf',
    'rtx': 'text/richtext',
    'sgm': 'text/x-sgml',
    'sgml': 'text/x-sgml',
    'sh': 'application/x-sh',
    'shar': 'application/x-shar',
    'sit': 'application/x-stuffit',
    'skd': 'application/x-koan',
    'skm': 'application/x-koan',
    'skp': 'application/x-koan',
    'skt': 'application/x-koan',
    'snd': 'audio/basic',
    'src': 'application/x-wais-source',
    'sv4cpio': 'application/x-sv4cpio',
    'sv4crc': 'application/x-sv4crc',
    't': 'application/x-troff',
    'tar': 'application/x-tar',
    'tcl': 'application/x-tcl',
    'tex': 'application/x-tex',
    'texi': 'application/x-texinfo',
    'texinfo': 'application/x-texinfo',
    'tif': 'image/tiff',
    'tiff': 'image/tiff',
    'tr': 'application/x-troff',
    'tsv': 'text/tab-separated-values',
    'txt': 'text/plain',
    'ustar': 'application/x-ustar',
    'vcd': 'application/x-cdlink',
    'vrml': 'x-world/x-vrml',
    'wav': 'audio/x-wav',
    'wrl': 'x-world/x-vrml',
    'xbm': 'image/x-xbitmap',
    'xpm': 'image/x-xpixmap',
    'xwd': 'image/x-xwindowdump',
    'xyz': 'chemical/x-pdb',
    'zip': 'application/zip',
    }
This source diff could not be displayed because it is too large. You can view the blob instead.
# -*- Mode: Python; tab-width: 4 -*-
import coro
import string
import re
import time
# "Name: value" header line
h_re = re.compile (r'([^: ]+): (.*)')
def get_header (header, headers):
    """Return the value of <header> from the raw header-line list
    <headers>, or None.  <header> must be given in lower case.
    [idiom: str.lower() replaces the deprecated string.lower()]"""
    for h in headers:
        m = h_re.match (h)
        if m:
            name, value = m.groups()
            if name.lower() == header:
                return value
    return None
def extract_session (cookie):
    """Return the value of the 'session' key in a Cookie header, or None.
    [fix: parts are stripped, so 'a=1; session=x' -- with the
    conventional space after ';' -- now matches; previously only a
    leading 'session=' cookie was found]"""
    for part in cookie.split (';'):
        pair = part.strip().split ('=')
        if len(pair) == 2 and pair[0] == 'session':
            return pair[1]
    return None
class session_handler:
    """Route requests for /<name>/... into one coroutine per session,
    identified by a 'session' cookie.  [legacy coro API -- Python 2 only]"""
    def __init__ (self, name, function):
        self.name = name
        self.function = function
        # session id -> session coroutine
        self.sessions = {}
    def match (self, request):
        # handle any path whose first component is <name>
        path = string.split (request._path, '/')
        if (len(path) > 1) and (path[1] == self.name):
            return 1
        else:
            return 0
    def get_next_request (self):
        # block the session coroutine until the next request is scheduled to it
        return coro._yield()
    def find_session (self, request):
        # -> (session_id, session_coroutine); (None, None) when no cookie
        cookie = get_header ('cookie', request._request_headers)
        if cookie:
            sid = extract_session (cookie)
            return sid, self.sessions.get (sid, None)
        else:
            return None, None
    def gen_session_id (self):
        """Generate a random hex session id not currently in use.
        [fix: the old loop tested has_key(None) before generating
        anything and therefore always returned None immediately --
        the sid=='None' hack in handle_request was the symptom]"""
        import random
        import sys
        while 1:
            n = random.randint (0, sys.maxint-1)
            sid = hex(n)[2:]
            if not self.sessions.has_key (sid):
                return sid
    # cookie lifetime: 100 days
    expires_delta = 100 * 86400
    def handle_request (self, request):
        sid, c = self.find_session (request)
        # The sid=='None' test is temporary hack, can probably remove it
        if (not sid) or (sid=='None'):
            sid = self.gen_session_id()
        if c and c.isAlive():
            # is c already running?
            # hack, must grok this
            coro.schedule (c, request)
            request._done = 1
        else:
            # login
            c = coro.new (self.function, self, request, sid)
            # Wdy, DD-Mon-YYYY HH:MM:SS GMT
            expires = time.strftime ('%a, %d-%b-%Y 00:00:00 GMT', time.gmtime (int (time.time()) + self.expires_delta))
            request['Set-Cookie'] = 'session=%s; path=/; expires=%s' % (sid, expires)
            # hack, must grok this
            request._done = 1
            c.start()
......@@ -108,13 +108,13 @@ setup (
],
),
],
packages=['coro', 'coro.clocks'],
packages=['coro', 'coro.clocks', 'coro.http'],
package_dir = {
'': 'coroutine',
'coro': 'coro',
'coro.clocks': 'coro/clocks'
},
py_modules = ['backdoor', 'coro_process', 'coro_unittest'],
py_modules = ['backdoor', 'coro.read_stream', 'coro_process', 'coro_unittest',],
download_url = 'http://github.com/ironport/shrapnel/tarball/master#egg=coro-1.0.2',
install_requires = ['Cython>=0.12.1', 'distribute>=0.6.16'],
cmdclass={'build_ext': build_ext},
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment