Commit 5e529dcf authored by Amos Latteier's avatar Amos Latteier

Synched with latest medusa sources from CVS. The main changes are the...

Synched with latest medusa sources from CVS. The main changes are the structure of asyncore.socket_map which now maps fds to channels which should provide a slight performance improvement.
parent 09bb46b6
# -*- Mode: Python; tab-width: 4 -*- # -*- Mode: Python; tab-width: 4 -*-
# $Id: asynchat.py,v 1.9 1999/07/19 17:43:47 amos Exp $ # $Id: asynchat.py,v 1.10 2000/01/14 02:35:56 amos Exp $
# Author: Sam Rushing <rushing@nightmare.com> # Author: Sam Rushing <rushing@nightmare.com>
# ====================================================================== # ======================================================================
...@@ -219,7 +219,7 @@ class async_chat (asyncore.dispatcher): ...@@ -219,7 +219,7 @@ class async_chat (asyncore.dispatcher):
def discard_buffers (self): def discard_buffers (self):
# Emergencies only! # Emergencies only!
self.ac_in_buffer = '' self.ac_in_buffer = ''
self.ac_out_buffer == '' self.ac_out_buffer = ''
while self.producer_fifo: while self.producer_fifo:
self.producer_fifo.pop() self.producer_fifo.pop()
......
# -*- Mode: Python; tab-width: 4 -*- # -*- Mode: Python; tab-width: 4 -*-
# $Id: asyncore.py,v 1.6 1999/07/26 07:06:36 amos Exp $ # $Id: asyncore.py,v 1.7 2000/01/14 02:35:56 amos Exp $
# Author: Sam Rushing <rushing@nightmare.com> # Author: Sam Rushing <rushing@nightmare.com>
# ====================================================================== # ======================================================================
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ====================================================================== # ======================================================================
import exceptions
import select import select
import socket import socket
import string import string
...@@ -41,60 +42,83 @@ if os.name == 'nt': ...@@ -41,60 +42,83 @@ if os.name == 'nt':
else: else:
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, ENOTCONN, ESHUTDOWN from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, ENOTCONN, ESHUTDOWN
socket_map = {} try:
socket_map
except NameError:
socket_map = {}
class ExitNow (exceptions.Exception):
pass
DEBUG = 0
def poll (timeout=0.0): def poll (timeout=0.0):
global DEBUG
if socket_map: if socket_map:
r = []; w = []; e = [] r = []; w = []; e = []
for s in socket_map.keys(): for fd, obj in socket_map.items():
if s.readable(): if obj.readable():
r.append (s) r.append (fd)
if s.writable(): if obj.writable():
w.append (s) w.append (fd)
r,w,e = select.select (r,w,e, timeout)
(r,w,e) = select.select (r,w,e, timeout) if DEBUG:
print r,w,e
for x in r: for fd in r:
try: try:
x.handle_read_event() obj = socket_map[fd]
except: try:
x.handle_error() obj.handle_read_event()
for x in w: except ExitNow:
raise ExitNow
except:
obj.handle_error()
except KeyError:
pass
for fd in w:
try: try:
x.handle_write_event() obj = socket_map[fd]
except: try:
x.handle_error() obj.handle_write_event()
except ExitNow:
raise ExitNow
except:
obj.handle_error()
except KeyError:
pass
def poll2 (timeout=0.0): def poll2 (timeout=0.0):
import poll import poll
# timeout is in milliseconds # timeout is in milliseconds
timeout = int(timeout*1000) timeout = int(timeout*1000)
if socket_map: if socket_map:
fd_map = {}
for s in socket_map.keys():
fd_map[s.fileno()] = s
l = [] l = []
for fd, s in fd_map.items(): for fd, obj in socket_map.items():
flags = 0 flags = 0
if s.readable(): if obj.readable():
flags = poll.POLLIN flags = poll.POLLIN
if s.writable(): if obj.writable():
flags = flags | poll.POLLOUT flags = flags | poll.POLLOUT
if flags: if flags:
l.append (fd, flags) l.append (fd, flags)
r = poll.poll (l, timeout) r = poll.poll (l, timeout)
for fd, flags in r: for fd, flags in r:
s = fd_map[fd]
try: try:
if (flags & poll.POLLIN): obj = socket_map[fd]
s.handle_read_event() try:
if (flags & poll.POLLOUT): if (flags & poll.POLLIN):
s.handle_write_event() obj.handle_read_event()
if (flags & poll.POLLERR): if (flags & poll.POLLOUT):
s.handle_expt_event() obj.handle_write_event()
except: except ExitNow:
s.handle_error() raise ExitNow
except:
obj.handle_error()
except KeyError:
pass
def loop (timeout=30.0, use_poll=0): def loop (timeout=30.0, use_poll=0):
...@@ -143,23 +167,25 @@ class dispatcher: ...@@ -143,23 +167,25 @@ class dispatcher:
return '<__repr__ (self) failed for object at %x (addr=%s)>' % (id(self),ar) return '<__repr__ (self) failed for object at %x (addr=%s)>' % (id(self),ar)
def add_channel (self): def add_channel (self):
self.log_info ('adding channel %s' % self) #self.log_info ('adding channel %s' % self)
socket_map [self] = 1 socket_map [self._fileno] = self
def del_channel (self): def del_channel (self):
if socket_map.has_key (self): fd = self._fileno
self.log_info ('closing channel %d:%s' % (self.fileno(), self)) if socket_map.has_key (fd):
del socket_map [self] #self.log_info ('closing channel %d:%s' % (fd, self))
del socket_map [fd]
def create_socket (self, family, type): def create_socket (self, family, type):
self.family_and_type = family, type self.family_and_type = family, type
self.socket = socket.socket (family, type) self.socket = socket.socket (family, type)
self.socket.setblocking(0) self.socket.setblocking(0)
self._fileno = self.socket.fileno()
self.add_channel() self.add_channel()
def set_socket (self, socket): def set_socket (self, sock):
# This is done so we can be called safely from __init__ self.__dict__['socket'] = sock
self.__dict__['socket'] = socket self._fileno = sock.fileno()
self.add_channel() self.add_channel()
def set_reuse_addr (self): def set_reuse_addr (self):
...@@ -261,20 +287,19 @@ class dispatcher: ...@@ -261,20 +287,19 @@ class dispatcher:
# cheap inheritance, used to pass all other attribute # cheap inheritance, used to pass all other attribute
# references to the underlying socket object. # references to the underlying socket object.
# NOTE: this may be removed soon for performance reasons.
def __getattr__ (self, attr): def __getattr__ (self, attr):
return getattr (self.socket, attr) return getattr (self.socket, attr)
# log and log_info maybe overriden to provide more sophisitcated # log and log_info maybe overriden to provide more sophisitcated
# logging and warning methods. In general, log is for 'hit' logging # logging and warning methods. In general, log is for 'hit' logging
# and 'log_info' is for informational, warning and error logging. # and 'log_info' is for informational, warning and error logging.
def log (self, message): def log (self, message):
print 'log:', message sys.stderr.write ('log: %s\n' % str(message))
def log_info (self, message, type='info'): def log_info (self, message, type='info'):
if __debug__ or type != 'info': if __debug__ or type != 'info':
print '%s: %s' %(type, message) print '%s: %s' % (type, message)
def handle_read_event (self): def handle_read_event (self):
if self.accepting: if self.accepting:
...@@ -398,7 +423,7 @@ def compact_traceback (): ...@@ -398,7 +423,7 @@ def compact_traceback ():
def close_all (): def close_all ():
global socket_map global socket_map
for x in socket_map.keys(): for x in socket_map.values():
x.socket.close() x.socket.close()
socket_map.clear() socket_map.clear()
...@@ -449,6 +474,7 @@ if os.name == 'posix': ...@@ -449,6 +474,7 @@ if os.name == 'posix':
self.set_file (fd) self.set_file (fd)
def set_file (self, fd): def set_file (self, fd):
self._fileno = fd
self.socket = file_wrapper (fd) self.socket = file_wrapper (fd)
self.add_channel() self.add_channel()
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
# If you are interested in using this software in a commercial context, # If you are interested in using this software in a commercial context,
# or in purchasing support, please contact the author. # or in purchasing support, please contact the author.
RCS_ID = '$Id: ftp_server.py,v 1.8 1999/11/15 21:53:56 amos Exp $' RCS_ID = '$Id: ftp_server.py,v 1.9 2000/01/14 02:35:56 amos Exp $'
# An extensible, configurable, asynchronous FTP server. # An extensible, configurable, asynchronous FTP server.
# #
...@@ -336,6 +336,7 @@ class ftp_channel (asynchat.async_chat): ...@@ -336,6 +336,7 @@ class ftp_channel (asynchat.async_chat):
self.current_mode = t self.current_mode = t
self.respond ('200 Type set to %s.' % self.type_map[t]) self.respond ('200 Type set to %s.' % self.type_map[t])
def cmd_quit (self, line): def cmd_quit (self, line):
'terminate session' 'terminate session'
self.respond ('221 Goodbye.') self.respond ('221 Goodbye.')
...@@ -847,13 +848,16 @@ class xmit_channel (asynchat.async_chat): ...@@ -847,13 +848,16 @@ class xmit_channel (asynchat.async_chat):
self.channel = channel self.channel = channel
self.client_addr = client_addr self.client_addr = client_addr
asynchat.async_chat.__init__ (self) asynchat.async_chat.__init__ (self)
# def __del__ (self):
# print 'xmit_channel.__del__()'
def readable (self): def log (*args):
return 0
def log(self, *args):
pass pass
def readable (self):
return not self.connected
def writable (self): def writable (self):
return 1 return 1
...@@ -1054,13 +1058,7 @@ if os.name == 'posix': ...@@ -1054,13 +1058,7 @@ if os.name == 'posix':
self.log_info('FTP server shutting down. (received SIGINT)', 'warning') self.log_info('FTP server shutting down. (received SIGINT)', 'warning')
# close everything down on SIGINT. # close everything down on SIGINT.
# of course this should be a cleaner shutdown. # of course this should be a cleaner shutdown.
sm = socket.socket_map asyncore.close_all()
socket.socket_map = {}
for sock in sm.values():
try:
sock.close()
except:
pass
if __name__ == '__main__': if __name__ == '__main__':
test (sys.argv[1]) test (sys.argv[1])
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
# interested in using this software in a commercial context, or in # interested in using this software in a commercial context, or in
# purchasing support, please contact the author. # purchasing support, please contact the author.
RCS_ID = '$Id: http_server.py,v 1.13 1999/11/15 21:53:56 amos Exp $' RCS_ID = '$Id: http_server.py,v 1.14 2000/01/14 02:35:56 amos Exp $'
# python modules # python modules
import os import os
...@@ -269,7 +269,7 @@ class http_request: ...@@ -269,7 +269,7 @@ class http_request:
user_agent=self.get_header('user-agent') user_agent=self.get_header('user-agent')
if not user_agent: user_agent='' if not user_agent: user_agent=''
referer=self.get_header('referer') referer=self.get_header('referer')
if not referer: referer='' if not referer: referer=''
self.channel.server.logger.log ( self.channel.server.logger.log (
self.channel.addr[0], self.channel.addr[0],
' - - [%s] "%s" %d %d "%s" "%s"\n' % ( ' - - [%s] "%s" %d %d "%s" "%s"\n' % (
...@@ -385,7 +385,7 @@ class http_channel (asynchat.async_chat): ...@@ -385,7 +385,7 @@ class http_channel (asynchat.async_chat):
def kill_zombies (self): def kill_zombies (self):
now = int (time.time()) now = int (time.time())
for channel in asyncore.socket_map.keys(): for channel in asyncore.socket_map.values():
if channel.__class__ == self.__class__: if channel.__class__ == self.__class__:
if (now - channel.creation_time) > channel.zombie_timeout: if (now - channel.creation_time) > channel.zombie_timeout:
channel.close() channel.close()
...@@ -448,7 +448,7 @@ class http_channel (asynchat.async_chat): ...@@ -448,7 +448,7 @@ class http_channel (asynchat.async_chat):
# -------------------------------------------------- # --------------------------------------------------
# crack the request header # crack the request header
# -------------------------------------------------- # --------------------------------------------------
while lines and not lines[0]: while lines and not lines[0]:
# as per the suggestion of http-1.1 section 4.1, (and # as per the suggestion of http-1.1 section 4.1, (and
# Eric Parker <eparker@zyvex.com>), ignore a leading # Eric Parker <eparker@zyvex.com>), ignore a leading
...@@ -461,27 +461,18 @@ class http_channel (asynchat.async_chat): ...@@ -461,27 +461,18 @@ class http_channel (asynchat.async_chat):
return return
request = lines[0] request = lines[0]
try: command, uri, version = crack_request (request)
command, uri, version = crack_request (request)
except:
# deal with broken HTTP requests
try:
# maybe there were spaces in the URL
parts=string.split(request)
command, uri, version = crack_request(
'%s %s %s' % (parts[0], parts[1], parts[-1]))
except:
self.log_info('Bad HTTP request: %s' % request, 'error')
r = http_request (self, request,
None, None, None, join_headers(lines[1:]))
r.error(400)
return
header = join_headers (lines[1:]) header = join_headers (lines[1:])
r = http_request (self, request, command, uri, version, header) r = http_request (self, request, command, uri, version, header)
self.request_counter.increment() self.request_counter.increment()
self.server.total_requests.increment() self.server.total_requests.increment()
if command is None:
self.log_info('Bad HTTP request: %s' % request, 'error')
r.error(400)
return
# -------------------------------------------------- # --------------------------------------------------
# handler selection and dispatch # handler selection and dispatch
# -------------------------------------------------- # --------------------------------------------------
...@@ -612,7 +603,14 @@ class http_server (asyncore.dispatcher): ...@@ -612,7 +603,14 @@ class http_server (asyncore.dispatcher):
# accept. socketmodule.c:makesockaddr complains that the # accept. socketmodule.c:makesockaddr complains that the
# address family is unknown. We don't want the whole server # address family is unknown. We don't want the whole server
# to shut down because of this. # to shut down because of this.
self.log_info('Server accept() threw an exception', 'warning') self.log_info ('warning: server accept() threw an exception', 'warning')
return
except TypeError:
# unpack non-sequence. this can happen when a read event
# fires on a listening socket, but when we call accept()
# we get EWOULDBLOCK, so dispatcher.accept() returns None.
# Seen on FreeBSD3.
self.log_info ('warning: server accept() threw EWOULDBLOCK', 'warning')
return return
self.channel_class (self, conn, addr) self.channel_class (self, conn, addr)
...@@ -689,6 +687,8 @@ def crack_request (r): ...@@ -689,6 +687,8 @@ def crack_request (r):
else: else:
version = None version = None
return string.lower (REQUEST.group (1)), REQUEST.group(2), version return string.lower (REQUEST.group (1)), REQUEST.group(2), version
else:
return None, None, None
class fifo: class fifo:
def __init__ (self, list=None): def __init__ (self, list=None):
......
# -*- Mode: Python; tab-width: 4 -*- # -*- Mode: Python; tab-width: 4 -*-
# Author: Sam Rushing <rushing@nightmare.com> # Author: Sam Rushing <rushing@nightmare.com>
# #
# python REPL channel. # python REPL channel.
# #
RCS_ID = '$Id: monitor.py,v 1.5 1999/07/21 17:15:53 klm Exp $' RCS_ID = '$Id: monitor.py,v 1.6 2000/01/14 02:35:56 amos Exp $'
import md5 import md5
import socket import socket
import string import string
import sys import sys
import time import time
import traceback
VERSION = string.split(RCS_ID)[2] VERSION = string.split(RCS_ID)[2]
...@@ -23,330 +22,325 @@ from counter import counter ...@@ -23,330 +22,325 @@ from counter import counter
import producers import producers
class monitor_channel (asynchat.async_chat): class monitor_channel (asynchat.async_chat):
try_linemode = 1 try_linemode = 1
def __init__ (self, server, sock, addr): def __init__ (self, server, sock, addr):
asynchat.async_chat.__init__ (self, sock) asynchat.async_chat.__init__ (self, sock)
self.server = server self.server = server
self.addr = addr self.addr = addr
self.set_terminator ('\r\n') self.set_terminator ('\r\n')
self.data = '' self.data = ''
# local bindings specific to this channel # local bindings specific to this channel
self.local_env = {} self.local_env = sys.modules['__main__'].__dict__.copy()
self.push ('Python ' + sys.version + '\r\n') self.push ('Python ' + sys.version + '\r\n')
self.push (sys.copyright+'\r\n') self.push (sys.copyright+'\r\n')
self.push ('Welcome to %s\r\n' % self) self.push ('Welcome to %s\r\n' % self)
self.push ("[Hint: try 'from __main__ import *']\r\n") self.push ("[Hint: try 'from __main__ import *']\r\n")
self.prompt() self.prompt()
self.number = server.total_sessions.as_long() self.number = server.total_sessions.as_long()
self.line_counter = counter() self.line_counter = counter()
self.multi_line = [] self.multi_line = []
def handle_connect (self): def handle_connect (self):
# send IAC DO LINEMODE # send IAC DO LINEMODE
self.push ('\377\375\"') self.push ('\377\375\"')
def close (self): def close (self):
self.server.closed_sessions.increment() self.server.closed_sessions.increment()
asynchat.async_chat.close(self) asynchat.async_chat.close(self)
def prompt (self): def prompt (self):
self.push ('>>> ') self.push ('>>> ')
def collect_incoming_data (self, data): def collect_incoming_data (self, data):
self.data = self.data + data self.data = self.data + data
if len(self.data) > 1024: if len(self.data) > 1024:
# denial of service. # denial of service.
self.push ('BCNU\r\n') self.push ('BCNU\r\n')
self.close_when_done() self.close_when_done()
def found_terminator (self): def found_terminator (self):
line = self.clean_line (self.data) line = self.clean_line (self.data)
self.data = '' self.data = ''
self.line_counter.increment() self.line_counter.increment()
# check for special case inputs... # check for special case inputs...
if not line and not self.multi_line: if not line and not self.multi_line:
self.prompt() self.prompt()
return return
if line in ['\004', 'exit']: if line in ['\004', 'exit']:
self.push ('BCNU\r\n') self.push ('BCNU\r\n')
self.close_when_done() self.close_when_done()
return return
oldout = sys.stdout oldout = sys.stdout
olderr = sys.stderr olderr = sys.stderr
try: try:
p = output_producer(self, olderr) p = output_producer(self, olderr)
sys.stdout = p sys.stdout = p
sys.stderr = p sys.stderr = p
try: try:
# this is, of course, a blocking operation. # this is, of course, a blocking operation.
# if you wanted to thread this, you would have # if you wanted to thread this, you would have
# to synchronize, etc... and treat the output # to synchronize, etc... and treat the output
# like a pipe. Not Fun. # like a pipe. Not Fun.
# #
# try eval first. If that fails, try exec. If that fails, # try eval first. If that fails, try exec. If that fails,
# hurl. # hurl.
try: try:
if self.multi_line: if self.multi_line:
# oh, this is horrible... # oh, this is horrible...
raise SyntaxError raise SyntaxError
co = compile (line, repr(self), 'eval') co = compile (line, repr(self), 'eval')
result = eval (co, self.local_env) result = eval (co, self.local_env)
method = 'eval' method = 'eval'
if result is not None: if result is not None:
self.log_info(repr(result)) print repr(result)
print repr(result) self.local_env['_'] = result
self.local_env['_'] = result except SyntaxError:
except SyntaxError: try:
try: if self.multi_line:
if self.multi_line: if line and line[0] in [' ','\t']:
if line and line[0] in [' ','\t']: self.multi_line.append (line)
self.multi_line.append (line) self.push ('... ')
self.push ('... ') return
return else:
else: self.multi_line.append (line)
self.multi_line.append (line) line = string.join (self.multi_line, '\n')
line = string.join (self.multi_line, '\n') co = compile (line, repr(self), 'exec')
co = compile (line, repr(self), 'exec') self.multi_line = []
self.multi_line = [] else:
else: co = compile (line, repr(self), 'exec')
co = compile (line, repr(self), 'exec') except SyntaxError, why:
except SyntaxError, why: if why[0] == 'unexpected EOF while parsing':
if why[0] == 'unexpected EOF while parsing': self.push ('... ')
self.push ('... ') self.multi_line.append (line)
self.multi_line.append (line) return
return exec co in self.local_env
exec co in self.local_env method = 'exec'
method = 'exec' except:
except: method = 'exception'
method = 'exception' self.multi_line = []
self.multi_line = [] (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
t, v, tb = sys.exc_info() self.log_info('%s %s %s' %(t, v, tbinfo), 'warning')
(file, fun, line), t, v, tbinfo = asyncore.compact_traceback() finally:
self.log_info('%s %s %s' %(t, v, tbinfo), 'warning') sys.stdout = oldout
traceback.print_exception(t, v, tb) sys.stderr = olderr
del tb self.log_info('%s:%s (%s)> %s' % (
finally: self.number,
sys.stdout = oldout self.line_counter,
sys.stderr = olderr method,
self.log_info('%s:%s (%s)> %s' % ( repr(line))
self.number, )
self.line_counter, self.push_with_producer (p)
method, self.prompt()
repr(line))
) # for now, we ignore any telnet option stuff sent to
self.push_with_producer (p) # us, and we process the backspace key ourselves.
self.prompt() # gee, it would be fun to write a full-blown line-editing
# environment, etc...
# for now, we ignore any telnet option stuff sent to def clean_line (self, line):
# us, and we process the backspace key ourselves. chars = []
# gee, it would be fun to write a full-blown line-editing for ch in line:
# environment, etc... oc = ord(ch)
def clean_line (self, line): if oc < 127:
chars = [] if oc in [8,177]:
for ch in line: # backspace
oc = ord(ch) chars = chars[:-1]
if oc < 127: else:
if oc in [8,177]: chars.append (ch)
# backspace return string.join (chars, '')
chars = chars[:-1]
else:
chars.append (ch)
return string.join (chars, '')
class monitor_server (asyncore.dispatcher): class monitor_server (asyncore.dispatcher):
SERVER_IDENT = 'Monitor Server (V%s)' % VERSION SERVER_IDENT = 'Monitor Server (V%s)' % VERSION
channel_class = monitor_channel channel_class = monitor_channel
def __init__ (self, hostname='127.0.0.1', port=8023): def __init__ (self, hostname='127.0.0.1', port=8023):
self.hostname = hostname self.hostname = hostname
self.port = port self.port = port
self.create_socket (socket.AF_INET, socket.SOCK_STREAM) self.create_socket (socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr() self.set_reuse_addr()
self.bind ((hostname, port)) self.bind ((hostname, port))
self.log_info('%s started on port %d' % (self.SERVER_IDENT, port)) self.log_info('%s started on port %d' % (self.SERVER_IDENT, port))
self.listen (5) self.listen (5)
self.closed = 0 self.closed = 0
self.failed_auths = 0 self.failed_auths = 0
self.total_sessions = counter() self.total_sessions = counter()
self.closed_sessions = counter() self.closed_sessions = counter()
def writable (self): def writable (self):
return 0 return 0
def handle_accept (self): def handle_accept (self):
conn, addr = self.accept() conn, addr = self.accept()
self.log_info('Incoming monitor connection from %s:%d' % addr) self.log_info('Incoming monitor connection from %s:%d' % addr)
self.channel_class (self, conn, addr) self.channel_class (self, conn, addr)
self.total_sessions.increment() self.total_sessions.increment()
def status (self): def status (self):
return producers.simple_producer ( return producers.simple_producer (
'<h2>%s</h2>' % self.SERVER_IDENT '<h2>%s</h2>' % self.SERVER_IDENT
+ '<br><b>Total Sessions:</b> %s' % self.total_sessions + '<br><b>Total Sessions:</b> %s' % self.total_sessions
+ '<br><b>Current Sessions:</b> %d' % ( + '<br><b>Current Sessions:</b> %d' % (
self.total_sessions.as_long()-self.closed_sessions.as_long() self.total_sessions.as_long()-self.closed_sessions.as_long()
) )
) )
def hex_digest (s): def hex_digest (s):
m = md5.md5() m = md5.md5()
m.update (s) m.update (s)
return string.joinfields ( return string.joinfields (
map (lambda x: hex (ord (x))[2:], map (None, m.digest())), map (lambda x: hex (ord (x))[2:], map (None, m.digest())),
'', '',
) )
class secure_monitor_channel (monitor_channel): class secure_monitor_channel (monitor_channel):
authorized = 0 authorized = 0
def __init__ (self, server, sock, addr): def __init__ (self, server, sock, addr):
asynchat.async_chat.__init__ (self, sock) asynchat.async_chat.__init__ (self, sock)
self.server = server self.server = server
self.addr = addr self.addr = addr
self.set_terminator ('\r\n') self.set_terminator ('\r\n')
self.data = '' self.data = ''
# local bindings specific to this channel # local bindings specific to this channel
self.local_env = {} self.local_env = {}
# send timestamp string # send timestamp string
self.timestamp = str(time.time()) self.timestamp = str(time.time())
self.count = 0 self.count = 0
self.line_counter = counter() self.line_counter = counter()
self.number = int(server.total_sessions.as_long()) self.number = int(server.total_sessions.as_long())
self.multi_line = [] self.multi_line = []
self.push (self.timestamp + '\r\n') self.push (self.timestamp + '\r\n')
def found_terminator (self): def found_terminator (self):
if not self.authorized: if not self.authorized:
if hex_digest ('%s%s' % (self.timestamp, self.server.password)) != self.data: if hex_digest ('%s%s' % (self.timestamp, self.server.password)) != self.data:
self.log_info ('%s: failed authorization' % self, 'warning') self.log_info ('%s: failed authorization' % self, 'warning')
self.server.failed_auths = self.server.failed_auths + 1 self.server.failed_auths = self.server.failed_auths + 1
self.close() self.close()
else: else:
self.authorized = 1 self.authorized = 1
self.push ('Python ' + sys.version + '\r\n') self.push ('Python ' + sys.version + '\r\n')
self.push (sys.copyright+'\r\n') self.push (sys.copyright+'\r\n')
self.push ('Welcome to %s\r\n' % self) self.push ('Welcome to %s\r\n' % self)
self.push ("[Hint: try 'from __main__ import *']\r\n") self.prompt()
self.prompt() self.data = ''
self.data = '' else:
else: monitor_channel.found_terminator (self)
monitor_channel.found_terminator (self)
class secure_encrypted_monitor_channel (secure_monitor_channel): class secure_encrypted_monitor_channel (secure_monitor_channel):
"Wrap send() and recv() with a stream cipher" "Wrap send() and recv() with a stream cipher"
def __init__ (self, server, conn, addr): def __init__ (self, server, conn, addr):
key = server.password key = server.password
self.outgoing = server.cipher.new (key) self.outgoing = server.cipher.new (key)
self.incoming = server.cipher.new (key) self.incoming = server.cipher.new (key)
secure_monitor_channel.__init__ (self, server, conn, addr) secure_monitor_channel.__init__ (self, server, conn, addr)
def send (self, data): def send (self, data):
# send the encrypted data instead # send the encrypted data instead
ed = self.outgoing.encrypt (data) ed = self.outgoing.encrypt (data)
return secure_monitor_channel.send (self, ed) return secure_monitor_channel.send (self, ed)
def recv (self, block_size): def recv (self, block_size):
data = secure_monitor_channel.recv (self, block_size) data = secure_monitor_channel.recv (self, block_size)
if data: if data:
dd = self.incoming.decrypt (data) dd = self.incoming.decrypt (data)
return dd return dd
else: else:
return data return data
class secure_monitor_server (monitor_server): class secure_monitor_server (monitor_server):
channel_class = secure_monitor_channel channel_class = secure_monitor_channel
def __init__ (self, password, hostname='', port=8023): def __init__ (self, password, hostname='', port=8023):
monitor_server.__init__ (self, hostname, port) monitor_server.__init__ (self, hostname, port)
self.password = password self.password = password
def status (self): def status (self):
p = monitor_server.status (self) p = monitor_server.status (self)
# kludge # kludge
p.data = p.data + ('<br><b>Failed Authorizations:</b> %d' % self.failed_auths) p.data = p.data + ('<br><b>Failed Authorizations:</b> %d' % self.failed_auths)
return p return p
# don't try to print from within any of the methods # don't try to print from within any of the methods
# of this object. 8^) # of this object. 8^)
class output_producer: class output_producer:
def __init__ (self, channel, real_stderr): def __init__ (self, channel, real_stderr):
self.channel = channel self.channel = channel
self.data = '' self.data = ''
# use _this_ for debug output # use _this_ for debug output
self.stderr = real_stderr self.stderr = real_stderr
def check_data (self): def check_data (self):
if len(self.data) > 1<<16: if len(self.data) > 1<<16:
# runaway output, close it. # runaway output, close it.
self.channel.close() self.channel.close()
def write (self, data): def write (self, data):
lines = string.splitfields (data, '\n') lines = string.splitfields (data, '\n')
data = string.join (lines, '\r\n') data = string.join (lines, '\r\n')
self.data = self.data + data self.data = self.data + data
self.check_data() self.check_data()
def writeline (self, line): def writeline (self, line):
self.data = self.data + line + '\r\n' self.data = self.data + line + '\r\n'
self.check_data() self.check_data()
def writelines (self, lines): def writelines (self, lines):
self.data = self.data + string.joinfields ( self.data = self.data + string.joinfields (
lines, lines,
'\r\n' '\r\n'
) + '\r\n' ) + '\r\n'
self.check_data() self.check_data()
def ready (self): def ready (self):
return (len (self.data) > 0) return (len (self.data) > 0)
def flush (self): def flush (self):
pass pass
def softspace (self, *args): def softspace (self, *args):
pass pass
def more (self): def more (self):
if self.data: if self.data:
result = self.data[:512] result = self.data[:512]
self.data = self.data[512:] self.data = self.data[512:]
return result return result
else: else:
return '' return ''
if __name__ == '__main__': if __name__ == '__main__':
import string import string
import sys import sys
if '-s' in sys.argv: if '-s' in sys.argv:
sys.argv.remove ('-s') sys.argv.remove ('-s')
print 'Enter password: ', print 'Enter password: ',
password = raw_input() password = raw_input()
else: else:
password = None password = None
if '-e' in sys.argv: if '-e' in sys.argv:
sys.argv.remove ('-e') sys.argv.remove ('-e')
encrypt = 1 encrypt = 1
else: else:
encrypt = 0 encrypt = 0
print sys.argv print sys.argv
if len(sys.argv) > 1: if len(sys.argv) > 1:
port = string.atoi (sys.argv[1]) port = string.atoi (sys.argv[1])
else: else:
port = 8023 port = 8023
if password is not None: if password is not None:
s = secure_monitor_server (password, '', port) s = secure_monitor_server (password, '', port)
if encrypt: if encrypt:
s.channel_class = secure_encrypted_monitor_channel s.channel_class = secure_encrypted_monitor_channel
import sapphire import sapphire
s.cipher = sapphire s.cipher = sapphire
else: else:
s = monitor_server ('', port) s = monitor_server ('', port)
asyncore.loop() asyncore.loop()
...@@ -60,7 +60,7 @@ class monitor_client (asynchat.async_chat): ...@@ -60,7 +60,7 @@ class monitor_client (asynchat.async_chat):
def handle_close (self): def handle_close (self):
# close all the channels, which will make the standard main # close all the channels, which will make the standard main
# loop exit. # loop exit.
map (lambda x: x.close(), asyncore.socket_map.keys()) map (lambda x: x.close(), asyncore.socket_map.values())
def log (self, *ignore): def log (self, *ignore):
pass pass
......
# -*- Mode: Python; tab-width: 4 -*- # -*- Mode: Python; tab-width: 4 -*-
VERSION_STRING = "$Id: select_trigger.py,v 1.8 1999/10/29 15:11:58 brian Exp $" VERSION_STRING = "$Id: select_trigger.py,v 1.9 2000/01/14 02:35:56 amos Exp $"
import asyncore import asyncore
import asynchat import asynchat
...@@ -9,275 +9,259 @@ import os ...@@ -9,275 +9,259 @@ import os
import socket import socket
import string import string
import thread import thread
if os.name == 'posix': if os.name == 'posix':
class trigger (asyncore.file_dispatcher): class trigger (asyncore.file_dispatcher):
"Wake up a call to select() running in the main thread" "Wake up a call to select() running in the main thread"
# This is useful in a context where you are using Medusa's I/O # This is useful in a context where you are using Medusa's I/O
# subsystem to deliver data, but the data is generated by another # subsystem to deliver data, but the data is generated by another
# thread. Normally, if Medusa is in the middle of a call to # thread. Normally, if Medusa is in the middle of a call to
# select(), new output data generated by another thread will have # select(), new output data generated by another thread will have
# to sit until the call to select() either times out or returns. # to sit until the call to select() either times out or returns.
# If the trigger is 'pulled' by another thread, it should immediately # If the trigger is 'pulled' by another thread, it should immediately
# generate a READ event on the trigger object, which will force the # generate a READ event on the trigger object, which will force the
# select() invocation to return. # select() invocation to return.
# A common use for this facility: letting Medusa manage I/O for a # A common use for this facility: letting Medusa manage I/O for a
# large number of connections; but routing each request through a # large number of connections; but routing each request through a
# thread chosen from a fixed-size thread pool. When a thread is # thread chosen from a fixed-size thread pool. When a thread is
# acquired, a transaction is performed, but output data is # acquired, a transaction is performed, but output data is
# accumulated into buffers that will be emptied more efficiently # accumulated into buffers that will be emptied more efficiently
# by Medusa. [picture a server that can process database queries # by Medusa. [picture a server that can process database queries
# rapidly, but doesn't want to tie up threads waiting to send data # rapidly, but doesn't want to tie up threads waiting to send data
# to low-bandwidth connections] # to low-bandwidth connections]
# The other major feature provided by this class is the ability to # The other major feature provided by this class is the ability to
# move work back into the main thread: if you call pull_trigger() # move work back into the main thread: if you call pull_trigger()
# with a thunk argument, when select() wakes up and receives the # with a thunk argument, when select() wakes up and receives the
# event it will call your thunk from within that thread. The main # event it will call your thunk from within that thread. The main
# purpose of this is to remove the need to wrap thread locks around # purpose of this is to remove the need to wrap thread locks around
# Medusa's data structures, which normally do not need them. [To see # Medusa's data structures, which normally do not need them. [To see
# why this is true, imagine this scenario: A thread tries to push some # why this is true, imagine this scenario: A thread tries to push some
# new data onto a channel's outgoing data queue at the same time that # new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some] # the main thread is trying to remove some]
def __init__ (self): def __init__ (self):
r, w = os.pipe() r, w = os.pipe()
self.trigger = w self.trigger = w
asyncore.file_dispatcher.__init__ (self, r) asyncore.file_dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock() self.lock = thread.allocate_lock()
self.thunks = [] self.thunks = []
def __repr__ (self): def __repr__ (self):
return '<select-trigger (pipe) at %x>' % id(self) return '<select-trigger (pipe) at %x>' % id(self)
def readable (self): def readable (self):
return 1 return 1
def writable (self): def writable (self):
return 0 return 0
def handle_connect (self): def handle_connect (self):
pass pass
def pull_trigger (self, thunk=None): def pull_trigger (self, thunk=None):
# print 'PULL_TRIGGER: ', len(self.thunks) # print 'PULL_TRIGGER: ', len(self.thunks)
if thunk: if thunk:
try: try:
self.lock.acquire() self.lock.acquire()
self.thunks.append (thunk) self.thunks.append (thunk)
finally: finally:
self.lock.release() self.lock.release()
os.write (self.trigger, 'x') os.write (self.trigger, 'x')
def handle_read (self): def handle_read (self):
self.recv (8192) self.recv (8192)
try: try:
self.lock.acquire() self.lock.acquire()
for thunk in self.thunks: for thunk in self.thunks:
try: try:
thunk() thunk()
except: except:
(file, fun, line), t, v, tbinfo = asyncore.compact_traceback() (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
self.log_info( print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo), self.thunks = []
'error') finally:
self.thunks = [] self.lock.release()
finally:
self.lock.release()
else: else:
class trigger (asyncore.dispatcher): # win32-safe version
address = ('127.9.9.9', 19999) class trigger (asyncore.dispatcher):
def __init__ (self): address = ('127.9.9.9', 19999)
a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
w = socket.socket (socket.AF_INET, socket.SOCK_STREAM) def __init__ (self):
a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
# set TCP_NODELAY to true to avoid buffering w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
w.setsockopt(socket.IPPROTO_TCP, 1, 1)
# tricky: get a pair of connected sockets
# tricky: get a pair of connected sockets a.bind (self.address)
host='127.9.9.9' a.listen (1)
port=19999 w.setblocking (0)
while 1: try:
try: w.connect (self.address)
self.address=(host, port) except:
a.bind(self.address) pass
break r, addr = a.accept()
except: a.close()
if port <= 19950: w.setblocking (1)
raise 'Bind Error', 'Cannot bind trigger!' self.trigger = w
port=port - 1
asyncore.dispatcher.__init__ (self, r)
a.listen (1) self.lock = thread.allocate_lock()
w.setblocking (0) self.thunks = []
try: self._trigger_connected = 0
w.connect (self.address)
except: def __repr__ (self):
pass return '<select-trigger (loopback) at %x>' % id(self)
r, addr = a.accept()
a.close() def readable (self):
w.setblocking (1) return 1
self.trigger = w
def writable (self):
asyncore.dispatcher.__init__ (self, r) return 0
self.lock = thread.allocate_lock()
self.thunks = [] def handle_connect (self):
self._trigger_connected = 0 pass
def __repr__ (self): def pull_trigger (self, thunk=None):
return '<select-trigger (loopback) at %x>' % id(self) if thunk:
try:
def readable (self): self.lock.acquire()
return 1 self.thunks.append (thunk)
finally:
def writable (self): self.lock.release()
return 0 self.trigger.send ('x')
def handle_connect (self): def handle_read (self):
pass self.recv (8192)
try:
def pull_trigger (self, thunk=None): self.lock.acquire()
if thunk: for thunk in self.thunks:
try: try:
self.lock.acquire() thunk()
self.thunks.append (thunk) except:
finally: (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
self.lock.release() print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
self.trigger.send ('x') self.thunks = []
finally:
def handle_read (self): self.lock.release()
self.recv (8192)
try:
self.lock.acquire()
for thunk in self.thunks:
try:
thunk()
except:
(file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
self.log_info(
'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo),
'error')
self.thunks = []
finally:
self.lock.release()
the_trigger = None the_trigger = None
class trigger_file: class trigger_file:
"A 'triggered' file object" "A 'triggered' file object"
buffer_size = 4096 buffer_size = 4096
def __init__ (self, parent): def __init__ (self, parent):
global the_trigger global the_trigger
if the_trigger is None: if the_trigger is None:
the_trigger = trigger() the_trigger = trigger()
self.parent = parent self.parent = parent
self.buffer = '' self.buffer = ''
def write (self, data): def write (self, data):
self.buffer = self.buffer + data self.buffer = self.buffer + data
if len(self.buffer) > self.buffer_size: if len(self.buffer) > self.buffer_size:
d, self.buffer = self.buffer, '' d, self.buffer = self.buffer, ''
the_trigger.pull_trigger ( the_trigger.pull_trigger (
lambda d=d,p=self.parent: p.push (d) lambda d=d,p=self.parent: p.push (d)
) )
def writeline (self, line): def writeline (self, line):
self.write (line+'\r\n') self.write (line+'\r\n')
def writelines (self, lines): def writelines (self, lines):
self.write ( self.write (
string.joinfields ( string.joinfields (
lines, lines,
'\r\n' '\r\n'
) + '\r\n' ) + '\r\n'
) )
def flush (self): def flush (self):
if self.buffer: if self.buffer:
d, self.buffer = self.buffer, '' d, self.buffer = self.buffer, ''
the_trigger.pull_trigger ( the_trigger.pull_trigger (
lambda p=self.parent,d=d: p.push (d) lambda p=self.parent,d=d: p.push (d)
) )
def softspace (self, *args): def softspace (self, *args):
pass pass
def close (self): def close (self):
# in a derived class, you may want to call trigger_close() instead. # in a derived class, you may want to call trigger_close() instead.
self.flush() self.flush()
self.parent = None self.parent = None
def trigger_close (self): def trigger_close (self):
d, self.buffer = self.buffer, '' d, self.buffer = self.buffer, ''
p, self.parent = self.parent, None p, self.parent = self.parent, None
the_trigger.pull_trigger ( the_trigger.pull_trigger (
lambda p=p,d=d: (p.push(d), p.close_when_done()) lambda p=p,d=d: (p.push(d), p.close_when_done())
) )
if __name__ == '__main__': if __name__ == '__main__':
import time import time
def thread_function (output_file, i, n): def thread_function (output_file, i, n):
print 'entering thread_function' print 'entering thread_function'
while n: while n:
time.sleep (5) time.sleep (5)
output_file.write ('%2d.%2d %s\r\n' % (i, n, output_file)) output_file.write ('%2d.%2d %s\r\n' % (i, n, output_file))
output_file.flush() output_file.flush()
n = n - 1 n = n - 1
output_file.close() output_file.close()
print 'exiting thread_function' print 'exiting thread_function'
class thread_parent (asynchat.async_chat): class thread_parent (asynchat.async_chat):
def __init__ (self, conn, addr): def __init__ (self, conn, addr):
self.addr = addr self.addr = addr
asynchat.async_chat.__init__ (self, conn) asynchat.async_chat.__init__ (self, conn)
self.set_terminator ('\r\n') self.set_terminator ('\r\n')
self.buffer = '' self.buffer = ''
self.count = 0 self.count = 0
def collect_incoming_data (self, data): def collect_incoming_data (self, data):
self.buffer = self.buffer + data self.buffer = self.buffer + data
def found_terminator (self): def found_terminator (self):
data, self.buffer = self.buffer, '' data, self.buffer = self.buffer, ''
if not data: if not data:
asyncore.close_all() asyncore.close_all()
print "done" print "done"
return return
n = string.atoi (string.split (data)[0]) n = string.atoi (string.split (data)[0])
tf = trigger_file (self) tf = trigger_file (self)
self.count = self.count + 1 self.count = self.count + 1
thread.start_new_thread (thread_function, (tf, self.count, n)) thread.start_new_thread (thread_function, (tf, self.count, n))
class thread_server (asyncore.dispatcher): class thread_server (asyncore.dispatcher):
def __init__ (self, family=socket.AF_INET, address=('', 9003)): def __init__ (self, family=socket.AF_INET, address=('', 9003)):
asyncore.dispatcher.__init__ (self) asyncore.dispatcher.__init__ (self)
self.create_socket (family, socket.SOCK_STREAM) self.create_socket (family, socket.SOCK_STREAM)
self.set_reuse_addr() self.set_reuse_addr()
self.bind (address) self.bind (address)
self.listen (5) self.listen (5)
def handle_accept (self): def handle_accept (self):
conn, addr = self.accept() conn, addr = self.accept()
tp = thread_parent (conn, addr) tp = thread_parent (conn, addr)
thread_server() thread_server()
#asyncore.loop(1.0, use_poll=1) #asyncore.loop(1.0, use_poll=1)
try: try:
asyncore.loop () asyncore.loop ()
except: except:
asyncore.close_all() asyncore.close_all()
# -*- Mode: Python; tab-width: 4 -*- # -*- Mode: Python; tab-width: 4 -*-
# $Id: asynchat.py,v 1.9 1999/07/19 17:43:47 amos Exp $ # $Id: asynchat.py,v 1.10 2000/01/14 02:35:56 amos Exp $
# Author: Sam Rushing <rushing@nightmare.com> # Author: Sam Rushing <rushing@nightmare.com>
# ====================================================================== # ======================================================================
...@@ -219,7 +219,7 @@ class async_chat (asyncore.dispatcher): ...@@ -219,7 +219,7 @@ class async_chat (asyncore.dispatcher):
def discard_buffers (self): def discard_buffers (self):
# Emergencies only! # Emergencies only!
self.ac_in_buffer = '' self.ac_in_buffer = ''
self.ac_out_buffer == '' self.ac_out_buffer = ''
while self.producer_fifo: while self.producer_fifo:
self.producer_fifo.pop() self.producer_fifo.pop()
......
# -*- Mode: Python; tab-width: 4 -*- # -*- Mode: Python; tab-width: 4 -*-
# $Id: asyncore.py,v 1.6 1999/07/26 07:06:36 amos Exp $ # $Id: asyncore.py,v 1.7 2000/01/14 02:35:56 amos Exp $
# Author: Sam Rushing <rushing@nightmare.com> # Author: Sam Rushing <rushing@nightmare.com>
# ====================================================================== # ======================================================================
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ====================================================================== # ======================================================================
import exceptions
import select import select
import socket import socket
import string import string
...@@ -41,60 +42,83 @@ if os.name == 'nt': ...@@ -41,60 +42,83 @@ if os.name == 'nt':
else: else:
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, ENOTCONN, ESHUTDOWN from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, ENOTCONN, ESHUTDOWN
socket_map = {} try:
socket_map
except NameError:
socket_map = {}
class ExitNow (exceptions.Exception):
pass
DEBUG = 0
def poll (timeout=0.0): def poll (timeout=0.0):
global DEBUG
if socket_map: if socket_map:
r = []; w = []; e = [] r = []; w = []; e = []
for s in socket_map.keys(): for fd, obj in socket_map.items():
if s.readable(): if obj.readable():
r.append (s) r.append (fd)
if s.writable(): if obj.writable():
w.append (s) w.append (fd)
r,w,e = select.select (r,w,e, timeout)
(r,w,e) = select.select (r,w,e, timeout) if DEBUG:
print r,w,e
for x in r: for fd in r:
try: try:
x.handle_read_event() obj = socket_map[fd]
except: try:
x.handle_error() obj.handle_read_event()
for x in w: except ExitNow:
raise ExitNow
except:
obj.handle_error()
except KeyError:
pass
for fd in w:
try: try:
x.handle_write_event() obj = socket_map[fd]
except: try:
x.handle_error() obj.handle_write_event()
except ExitNow:
raise ExitNow
except:
obj.handle_error()
except KeyError:
pass
def poll2 (timeout=0.0): def poll2 (timeout=0.0):
import poll import poll
# timeout is in milliseconds # timeout is in milliseconds
timeout = int(timeout*1000) timeout = int(timeout*1000)
if socket_map: if socket_map:
fd_map = {}
for s in socket_map.keys():
fd_map[s.fileno()] = s
l = [] l = []
for fd, s in fd_map.items(): for fd, obj in socket_map.items():
flags = 0 flags = 0
if s.readable(): if obj.readable():
flags = poll.POLLIN flags = poll.POLLIN
if s.writable(): if obj.writable():
flags = flags | poll.POLLOUT flags = flags | poll.POLLOUT
if flags: if flags:
l.append (fd, flags) l.append (fd, flags)
r = poll.poll (l, timeout) r = poll.poll (l, timeout)
for fd, flags in r: for fd, flags in r:
s = fd_map[fd]
try: try:
if (flags & poll.POLLIN): obj = socket_map[fd]
s.handle_read_event() try:
if (flags & poll.POLLOUT): if (flags & poll.POLLIN):
s.handle_write_event() obj.handle_read_event()
if (flags & poll.POLLERR): if (flags & poll.POLLOUT):
s.handle_expt_event() obj.handle_write_event()
except: except ExitNow:
s.handle_error() raise ExitNow
except:
obj.handle_error()
except KeyError:
pass
def loop (timeout=30.0, use_poll=0): def loop (timeout=30.0, use_poll=0):
...@@ -143,23 +167,25 @@ class dispatcher: ...@@ -143,23 +167,25 @@ class dispatcher:
return '<__repr__ (self) failed for object at %x (addr=%s)>' % (id(self),ar) return '<__repr__ (self) failed for object at %x (addr=%s)>' % (id(self),ar)
def add_channel (self): def add_channel (self):
self.log_info ('adding channel %s' % self) #self.log_info ('adding channel %s' % self)
socket_map [self] = 1 socket_map [self._fileno] = self
def del_channel (self): def del_channel (self):
if socket_map.has_key (self): fd = self._fileno
self.log_info ('closing channel %d:%s' % (self.fileno(), self)) if socket_map.has_key (fd):
del socket_map [self] #self.log_info ('closing channel %d:%s' % (fd, self))
del socket_map [fd]
def create_socket (self, family, type): def create_socket (self, family, type):
self.family_and_type = family, type self.family_and_type = family, type
self.socket = socket.socket (family, type) self.socket = socket.socket (family, type)
self.socket.setblocking(0) self.socket.setblocking(0)
self._fileno = self.socket.fileno()
self.add_channel() self.add_channel()
def set_socket (self, socket): def set_socket (self, sock):
# This is done so we can be called safely from __init__ self.__dict__['socket'] = sock
self.__dict__['socket'] = socket self._fileno = sock.fileno()
self.add_channel() self.add_channel()
def set_reuse_addr (self): def set_reuse_addr (self):
...@@ -261,20 +287,19 @@ class dispatcher: ...@@ -261,20 +287,19 @@ class dispatcher:
# cheap inheritance, used to pass all other attribute # cheap inheritance, used to pass all other attribute
# references to the underlying socket object. # references to the underlying socket object.
# NOTE: this may be removed soon for performance reasons.
def __getattr__ (self, attr): def __getattr__ (self, attr):
return getattr (self.socket, attr) return getattr (self.socket, attr)
# log and log_info maybe overriden to provide more sophisitcated # log and log_info maybe overriden to provide more sophisitcated
# logging and warning methods. In general, log is for 'hit' logging # logging and warning methods. In general, log is for 'hit' logging
# and 'log_info' is for informational, warning and error logging. # and 'log_info' is for informational, warning and error logging.
def log (self, message): def log (self, message):
print 'log:', message sys.stderr.write ('log: %s\n' % str(message))
def log_info (self, message, type='info'): def log_info (self, message, type='info'):
if __debug__ or type != 'info': if __debug__ or type != 'info':
print '%s: %s' %(type, message) print '%s: %s' % (type, message)
def handle_read_event (self): def handle_read_event (self):
if self.accepting: if self.accepting:
...@@ -398,7 +423,7 @@ def compact_traceback (): ...@@ -398,7 +423,7 @@ def compact_traceback ():
def close_all (): def close_all ():
global socket_map global socket_map
for x in socket_map.keys(): for x in socket_map.values():
x.socket.close() x.socket.close()
socket_map.clear() socket_map.clear()
...@@ -449,6 +474,7 @@ if os.name == 'posix': ...@@ -449,6 +474,7 @@ if os.name == 'posix':
self.set_file (fd) self.set_file (fd)
def set_file (self, fd): def set_file (self, fd):
self._fileno = fd
self.socket = file_wrapper (fd) self.socket = file_wrapper (fd)
self.add_channel() self.add_channel()
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
# If you are interested in using this software in a commercial context, # If you are interested in using this software in a commercial context,
# or in purchasing support, please contact the author. # or in purchasing support, please contact the author.
RCS_ID = '$Id: ftp_server.py,v 1.8 1999/11/15 21:53:56 amos Exp $' RCS_ID = '$Id: ftp_server.py,v 1.9 2000/01/14 02:35:56 amos Exp $'
# An extensible, configurable, asynchronous FTP server. # An extensible, configurable, asynchronous FTP server.
# #
...@@ -336,6 +336,7 @@ class ftp_channel (asynchat.async_chat): ...@@ -336,6 +336,7 @@ class ftp_channel (asynchat.async_chat):
self.current_mode = t self.current_mode = t
self.respond ('200 Type set to %s.' % self.type_map[t]) self.respond ('200 Type set to %s.' % self.type_map[t])
def cmd_quit (self, line): def cmd_quit (self, line):
'terminate session' 'terminate session'
self.respond ('221 Goodbye.') self.respond ('221 Goodbye.')
...@@ -847,13 +848,16 @@ class xmit_channel (asynchat.async_chat): ...@@ -847,13 +848,16 @@ class xmit_channel (asynchat.async_chat):
self.channel = channel self.channel = channel
self.client_addr = client_addr self.client_addr = client_addr
asynchat.async_chat.__init__ (self) asynchat.async_chat.__init__ (self)
# def __del__ (self):
# print 'xmit_channel.__del__()'
def readable (self): def log (*args):
return 0
def log(self, *args):
pass pass
def readable (self):
return not self.connected
def writable (self): def writable (self):
return 1 return 1
...@@ -1054,13 +1058,7 @@ if os.name == 'posix': ...@@ -1054,13 +1058,7 @@ if os.name == 'posix':
self.log_info('FTP server shutting down. (received SIGINT)', 'warning') self.log_info('FTP server shutting down. (received SIGINT)', 'warning')
# close everything down on SIGINT. # close everything down on SIGINT.
# of course this should be a cleaner shutdown. # of course this should be a cleaner shutdown.
sm = socket.socket_map asyncore.close_all()
socket.socket_map = {}
for sock in sm.values():
try:
sock.close()
except:
pass
if __name__ == '__main__': if __name__ == '__main__':
test (sys.argv[1]) test (sys.argv[1])
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
# interested in using this software in a commercial context, or in # interested in using this software in a commercial context, or in
# purchasing support, please contact the author. # purchasing support, please contact the author.
RCS_ID = '$Id: http_server.py,v 1.13 1999/11/15 21:53:56 amos Exp $' RCS_ID = '$Id: http_server.py,v 1.14 2000/01/14 02:35:56 amos Exp $'
# python modules # python modules
import os import os
...@@ -269,7 +269,7 @@ class http_request: ...@@ -269,7 +269,7 @@ class http_request:
user_agent=self.get_header('user-agent') user_agent=self.get_header('user-agent')
if not user_agent: user_agent='' if not user_agent: user_agent=''
referer=self.get_header('referer') referer=self.get_header('referer')
if not referer: referer='' if not referer: referer=''
self.channel.server.logger.log ( self.channel.server.logger.log (
self.channel.addr[0], self.channel.addr[0],
' - - [%s] "%s" %d %d "%s" "%s"\n' % ( ' - - [%s] "%s" %d %d "%s" "%s"\n' % (
...@@ -385,7 +385,7 @@ class http_channel (asynchat.async_chat): ...@@ -385,7 +385,7 @@ class http_channel (asynchat.async_chat):
def kill_zombies (self): def kill_zombies (self):
now = int (time.time()) now = int (time.time())
for channel in asyncore.socket_map.keys(): for channel in asyncore.socket_map.values():
if channel.__class__ == self.__class__: if channel.__class__ == self.__class__:
if (now - channel.creation_time) > channel.zombie_timeout: if (now - channel.creation_time) > channel.zombie_timeout:
channel.close() channel.close()
...@@ -448,7 +448,7 @@ class http_channel (asynchat.async_chat): ...@@ -448,7 +448,7 @@ class http_channel (asynchat.async_chat):
# -------------------------------------------------- # --------------------------------------------------
# crack the request header # crack the request header
# -------------------------------------------------- # --------------------------------------------------
while lines and not lines[0]: while lines and not lines[0]:
# as per the suggestion of http-1.1 section 4.1, (and # as per the suggestion of http-1.1 section 4.1, (and
# Eric Parker <eparker@zyvex.com>), ignore a leading # Eric Parker <eparker@zyvex.com>), ignore a leading
...@@ -461,27 +461,18 @@ class http_channel (asynchat.async_chat): ...@@ -461,27 +461,18 @@ class http_channel (asynchat.async_chat):
return return
request = lines[0] request = lines[0]
try: command, uri, version = crack_request (request)
command, uri, version = crack_request (request)
except:
# deal with broken HTTP requests
try:
# maybe there were spaces in the URL
parts=string.split(request)
command, uri, version = crack_request(
'%s %s %s' % (parts[0], parts[1], parts[-1]))
except:
self.log_info('Bad HTTP request: %s' % request, 'error')
r = http_request (self, request,
None, None, None, join_headers(lines[1:]))
r.error(400)
return
header = join_headers (lines[1:]) header = join_headers (lines[1:])
r = http_request (self, request, command, uri, version, header) r = http_request (self, request, command, uri, version, header)
self.request_counter.increment() self.request_counter.increment()
self.server.total_requests.increment() self.server.total_requests.increment()
if command is None:
self.log_info('Bad HTTP request: %s' % request, 'error')
r.error(400)
return
# -------------------------------------------------- # --------------------------------------------------
# handler selection and dispatch # handler selection and dispatch
# -------------------------------------------------- # --------------------------------------------------
...@@ -612,7 +603,14 @@ class http_server (asyncore.dispatcher): ...@@ -612,7 +603,14 @@ class http_server (asyncore.dispatcher):
# accept. socketmodule.c:makesockaddr complains that the # accept. socketmodule.c:makesockaddr complains that the
# address family is unknown. We don't want the whole server # address family is unknown. We don't want the whole server
# to shut down because of this. # to shut down because of this.
self.log_info('Server accept() threw an exception', 'warning') self.log_info ('warning: server accept() threw an exception', 'warning')
return
except TypeError:
# unpack non-sequence. this can happen when a read event
# fires on a listening socket, but when we call accept()
# we get EWOULDBLOCK, so dispatcher.accept() returns None.
# Seen on FreeBSD3.
self.log_info ('warning: server accept() threw EWOULDBLOCK', 'warning')
return return
self.channel_class (self, conn, addr) self.channel_class (self, conn, addr)
...@@ -689,6 +687,8 @@ def crack_request (r): ...@@ -689,6 +687,8 @@ def crack_request (r):
else: else:
version = None version = None
return string.lower (REQUEST.group (1)), REQUEST.group(2), version return string.lower (REQUEST.group (1)), REQUEST.group(2), version
else:
return None, None, None
class fifo: class fifo:
def __init__ (self, list=None): def __init__ (self, list=None):
......
# -*- Mode: Python; tab-width: 4 -*- # -*- Mode: Python; tab-width: 4 -*-
# Author: Sam Rushing <rushing@nightmare.com> # Author: Sam Rushing <rushing@nightmare.com>
# #
# python REPL channel. # python REPL channel.
# #
RCS_ID = '$Id: monitor.py,v 1.5 1999/07/21 17:15:53 klm Exp $' RCS_ID = '$Id: monitor.py,v 1.6 2000/01/14 02:35:56 amos Exp $'
import md5 import md5
import socket import socket
import string import string
import sys import sys
import time import time
import traceback
VERSION = string.split(RCS_ID)[2] VERSION = string.split(RCS_ID)[2]
...@@ -23,330 +22,325 @@ from counter import counter ...@@ -23,330 +22,325 @@ from counter import counter
import producers import producers
class monitor_channel (asynchat.async_chat): class monitor_channel (asynchat.async_chat):
try_linemode = 1 try_linemode = 1
def __init__ (self, server, sock, addr): def __init__ (self, server, sock, addr):
asynchat.async_chat.__init__ (self, sock) asynchat.async_chat.__init__ (self, sock)
self.server = server self.server = server
self.addr = addr self.addr = addr
self.set_terminator ('\r\n') self.set_terminator ('\r\n')
self.data = '' self.data = ''
# local bindings specific to this channel # local bindings specific to this channel
self.local_env = {} self.local_env = sys.modules['__main__'].__dict__.copy()
self.push ('Python ' + sys.version + '\r\n') self.push ('Python ' + sys.version + '\r\n')
self.push (sys.copyright+'\r\n') self.push (sys.copyright+'\r\n')
self.push ('Welcome to %s\r\n' % self) self.push ('Welcome to %s\r\n' % self)
self.push ("[Hint: try 'from __main__ import *']\r\n") self.push ("[Hint: try 'from __main__ import *']\r\n")
self.prompt() self.prompt()
self.number = server.total_sessions.as_long() self.number = server.total_sessions.as_long()
self.line_counter = counter() self.line_counter = counter()
self.multi_line = [] self.multi_line = []
def handle_connect (self): def handle_connect (self):
# send IAC DO LINEMODE # send IAC DO LINEMODE
self.push ('\377\375\"') self.push ('\377\375\"')
def close (self): def close (self):
self.server.closed_sessions.increment() self.server.closed_sessions.increment()
asynchat.async_chat.close(self) asynchat.async_chat.close(self)
def prompt (self): def prompt (self):
self.push ('>>> ') self.push ('>>> ')
def collect_incoming_data (self, data): def collect_incoming_data (self, data):
self.data = self.data + data self.data = self.data + data
if len(self.data) > 1024: if len(self.data) > 1024:
# denial of service. # denial of service.
self.push ('BCNU\r\n') self.push ('BCNU\r\n')
self.close_when_done() self.close_when_done()
def found_terminator (self): def found_terminator (self):
line = self.clean_line (self.data) line = self.clean_line (self.data)
self.data = '' self.data = ''
self.line_counter.increment() self.line_counter.increment()
# check for special case inputs... # check for special case inputs...
if not line and not self.multi_line: if not line and not self.multi_line:
self.prompt() self.prompt()
return return
if line in ['\004', 'exit']: if line in ['\004', 'exit']:
self.push ('BCNU\r\n') self.push ('BCNU\r\n')
self.close_when_done() self.close_when_done()
return return
oldout = sys.stdout oldout = sys.stdout
olderr = sys.stderr olderr = sys.stderr
try: try:
p = output_producer(self, olderr) p = output_producer(self, olderr)
sys.stdout = p sys.stdout = p
sys.stderr = p sys.stderr = p
try: try:
# this is, of course, a blocking operation. # this is, of course, a blocking operation.
# if you wanted to thread this, you would have # if you wanted to thread this, you would have
# to synchronize, etc... and treat the output # to synchronize, etc... and treat the output
# like a pipe. Not Fun. # like a pipe. Not Fun.
# #
# try eval first. If that fails, try exec. If that fails, # try eval first. If that fails, try exec. If that fails,
# hurl. # hurl.
try: try:
if self.multi_line: if self.multi_line:
# oh, this is horrible... # oh, this is horrible...
raise SyntaxError raise SyntaxError
co = compile (line, repr(self), 'eval') co = compile (line, repr(self), 'eval')
result = eval (co, self.local_env) result = eval (co, self.local_env)
method = 'eval' method = 'eval'
if result is not None: if result is not None:
self.log_info(repr(result)) print repr(result)
print repr(result) self.local_env['_'] = result
self.local_env['_'] = result except SyntaxError:
except SyntaxError: try:
try: if self.multi_line:
if self.multi_line: if line and line[0] in [' ','\t']:
if line and line[0] in [' ','\t']: self.multi_line.append (line)
self.multi_line.append (line) self.push ('... ')
self.push ('... ') return
return else:
else: self.multi_line.append (line)
self.multi_line.append (line) line = string.join (self.multi_line, '\n')
line = string.join (self.multi_line, '\n') co = compile (line, repr(self), 'exec')
co = compile (line, repr(self), 'exec') self.multi_line = []
self.multi_line = [] else:
else: co = compile (line, repr(self), 'exec')
co = compile (line, repr(self), 'exec') except SyntaxError, why:
except SyntaxError, why: if why[0] == 'unexpected EOF while parsing':
if why[0] == 'unexpected EOF while parsing': self.push ('... ')
self.push ('... ') self.multi_line.append (line)
self.multi_line.append (line) return
return exec co in self.local_env
exec co in self.local_env method = 'exec'
method = 'exec' except:
except: method = 'exception'
method = 'exception' self.multi_line = []
self.multi_line = [] (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
t, v, tb = sys.exc_info() self.log_info('%s %s %s' %(t, v, tbinfo), 'warning')
(file, fun, line), t, v, tbinfo = asyncore.compact_traceback() finally:
self.log_info('%s %s %s' %(t, v, tbinfo), 'warning') sys.stdout = oldout
traceback.print_exception(t, v, tb) sys.stderr = olderr
del tb self.log_info('%s:%s (%s)> %s' % (
finally: self.number,
sys.stdout = oldout self.line_counter,
sys.stderr = olderr method,
self.log_info('%s:%s (%s)> %s' % ( repr(line))
self.number, )
self.line_counter, self.push_with_producer (p)
method, self.prompt()
repr(line))
) # for now, we ignore any telnet option stuff sent to
self.push_with_producer (p) # us, and we process the backspace key ourselves.
self.prompt() # gee, it would be fun to write a full-blown line-editing
# environment, etc...
# for now, we ignore any telnet option stuff sent to def clean_line (self, line):
# us, and we process the backspace key ourselves. chars = []
# gee, it would be fun to write a full-blown line-editing for ch in line:
# environment, etc... oc = ord(ch)
def clean_line (self, line): if oc < 127:
chars = [] if oc in [8,177]:
for ch in line: # backspace
oc = ord(ch) chars = chars[:-1]
if oc < 127: else:
if oc in [8,177]: chars.append (ch)
# backspace return string.join (chars, '')
chars = chars[:-1]
else:
chars.append (ch)
return string.join (chars, '')
class monitor_server (asyncore.dispatcher): class monitor_server (asyncore.dispatcher):
SERVER_IDENT = 'Monitor Server (V%s)' % VERSION SERVER_IDENT = 'Monitor Server (V%s)' % VERSION
channel_class = monitor_channel channel_class = monitor_channel
def __init__ (self, hostname='127.0.0.1', port=8023): def __init__ (self, hostname='127.0.0.1', port=8023):
self.hostname = hostname self.hostname = hostname
self.port = port self.port = port
self.create_socket (socket.AF_INET, socket.SOCK_STREAM) self.create_socket (socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr() self.set_reuse_addr()
self.bind ((hostname, port)) self.bind ((hostname, port))
self.log_info('%s started on port %d' % (self.SERVER_IDENT, port)) self.log_info('%s started on port %d' % (self.SERVER_IDENT, port))
self.listen (5) self.listen (5)
self.closed = 0 self.closed = 0
self.failed_auths = 0 self.failed_auths = 0
self.total_sessions = counter() self.total_sessions = counter()
self.closed_sessions = counter() self.closed_sessions = counter()
def writable (self): def writable (self):
return 0 return 0
def handle_accept (self): def handle_accept (self):
conn, addr = self.accept() conn, addr = self.accept()
self.log_info('Incoming monitor connection from %s:%d' % addr) self.log_info('Incoming monitor connection from %s:%d' % addr)
self.channel_class (self, conn, addr) self.channel_class (self, conn, addr)
self.total_sessions.increment() self.total_sessions.increment()
def status (self): def status (self):
return producers.simple_producer ( return producers.simple_producer (
'<h2>%s</h2>' % self.SERVER_IDENT '<h2>%s</h2>' % self.SERVER_IDENT
+ '<br><b>Total Sessions:</b> %s' % self.total_sessions + '<br><b>Total Sessions:</b> %s' % self.total_sessions
+ '<br><b>Current Sessions:</b> %d' % ( + '<br><b>Current Sessions:</b> %d' % (
self.total_sessions.as_long()-self.closed_sessions.as_long() self.total_sessions.as_long()-self.closed_sessions.as_long()
) )
) )
def hex_digest (s): def hex_digest (s):
m = md5.md5() m = md5.md5()
m.update (s) m.update (s)
return string.joinfields ( return string.joinfields (
map (lambda x: hex (ord (x))[2:], map (None, m.digest())), map (lambda x: hex (ord (x))[2:], map (None, m.digest())),
'', '',
) )
class secure_monitor_channel (monitor_channel): class secure_monitor_channel (monitor_channel):
authorized = 0 authorized = 0
def __init__ (self, server, sock, addr): def __init__ (self, server, sock, addr):
asynchat.async_chat.__init__ (self, sock) asynchat.async_chat.__init__ (self, sock)
self.server = server self.server = server
self.addr = addr self.addr = addr
self.set_terminator ('\r\n') self.set_terminator ('\r\n')
self.data = '' self.data = ''
# local bindings specific to this channel # local bindings specific to this channel
self.local_env = {} self.local_env = {}
# send timestamp string # send timestamp string
self.timestamp = str(time.time()) self.timestamp = str(time.time())
self.count = 0 self.count = 0
self.line_counter = counter() self.line_counter = counter()
self.number = int(server.total_sessions.as_long()) self.number = int(server.total_sessions.as_long())
self.multi_line = [] self.multi_line = []
self.push (self.timestamp + '\r\n') self.push (self.timestamp + '\r\n')
def found_terminator (self): def found_terminator (self):
if not self.authorized: if not self.authorized:
if hex_digest ('%s%s' % (self.timestamp, self.server.password)) != self.data: if hex_digest ('%s%s' % (self.timestamp, self.server.password)) != self.data:
self.log_info ('%s: failed authorization' % self, 'warning') self.log_info ('%s: failed authorization' % self, 'warning')
self.server.failed_auths = self.server.failed_auths + 1 self.server.failed_auths = self.server.failed_auths + 1
self.close() self.close()
else: else:
self.authorized = 1 self.authorized = 1
self.push ('Python ' + sys.version + '\r\n') self.push ('Python ' + sys.version + '\r\n')
self.push (sys.copyright+'\r\n') self.push (sys.copyright+'\r\n')
self.push ('Welcome to %s\r\n' % self) self.push ('Welcome to %s\r\n' % self)
self.push ("[Hint: try 'from __main__ import *']\r\n") self.prompt()
self.prompt() self.data = ''
self.data = '' else:
else: monitor_channel.found_terminator (self)
monitor_channel.found_terminator (self)
class secure_encrypted_monitor_channel (secure_monitor_channel): class secure_encrypted_monitor_channel (secure_monitor_channel):
"Wrap send() and recv() with a stream cipher" "Wrap send() and recv() with a stream cipher"
def __init__ (self, server, conn, addr): def __init__ (self, server, conn, addr):
key = server.password key = server.password
self.outgoing = server.cipher.new (key) self.outgoing = server.cipher.new (key)
self.incoming = server.cipher.new (key) self.incoming = server.cipher.new (key)
secure_monitor_channel.__init__ (self, server, conn, addr) secure_monitor_channel.__init__ (self, server, conn, addr)
def send (self, data): def send (self, data):
# send the encrypted data instead # send the encrypted data instead
ed = self.outgoing.encrypt (data) ed = self.outgoing.encrypt (data)
return secure_monitor_channel.send (self, ed) return secure_monitor_channel.send (self, ed)
def recv (self, block_size): def recv (self, block_size):
data = secure_monitor_channel.recv (self, block_size) data = secure_monitor_channel.recv (self, block_size)
if data: if data:
dd = self.incoming.decrypt (data) dd = self.incoming.decrypt (data)
return dd return dd
else: else:
return data return data
class secure_monitor_server (monitor_server): class secure_monitor_server (monitor_server):
channel_class = secure_monitor_channel channel_class = secure_monitor_channel
def __init__ (self, password, hostname='', port=8023): def __init__ (self, password, hostname='', port=8023):
monitor_server.__init__ (self, hostname, port) monitor_server.__init__ (self, hostname, port)
self.password = password self.password = password
def status (self): def status (self):
p = monitor_server.status (self) p = monitor_server.status (self)
# kludge # kludge
p.data = p.data + ('<br><b>Failed Authorizations:</b> %d' % self.failed_auths) p.data = p.data + ('<br><b>Failed Authorizations:</b> %d' % self.failed_auths)
return p return p
# don't try to print from within any of the methods # don't try to print from within any of the methods
# of this object. 8^) # of this object. 8^)
class output_producer: class output_producer:
def __init__ (self, channel, real_stderr): def __init__ (self, channel, real_stderr):
self.channel = channel self.channel = channel
self.data = '' self.data = ''
# use _this_ for debug output # use _this_ for debug output
self.stderr = real_stderr self.stderr = real_stderr
def check_data (self): def check_data (self):
if len(self.data) > 1<<16: if len(self.data) > 1<<16:
# runaway output, close it. # runaway output, close it.
self.channel.close() self.channel.close()
def write (self, data): def write (self, data):
lines = string.splitfields (data, '\n') lines = string.splitfields (data, '\n')
data = string.join (lines, '\r\n') data = string.join (lines, '\r\n')
self.data = self.data + data self.data = self.data + data
self.check_data() self.check_data()
def writeline (self, line): def writeline (self, line):
self.data = self.data + line + '\r\n' self.data = self.data + line + '\r\n'
self.check_data() self.check_data()
def writelines (self, lines): def writelines (self, lines):
self.data = self.data + string.joinfields ( self.data = self.data + string.joinfields (
lines, lines,
'\r\n' '\r\n'
) + '\r\n' ) + '\r\n'
self.check_data() self.check_data()
def ready (self): def ready (self):
return (len (self.data) > 0) return (len (self.data) > 0)
def flush (self): def flush (self):
pass pass
def softspace (self, *args): def softspace (self, *args):
pass pass
def more (self): def more (self):
if self.data: if self.data:
result = self.data[:512] result = self.data[:512]
self.data = self.data[512:] self.data = self.data[512:]
return result return result
else: else:
return '' return ''
if __name__ == '__main__': if __name__ == '__main__':
import string import string
import sys import sys
if '-s' in sys.argv: if '-s' in sys.argv:
sys.argv.remove ('-s') sys.argv.remove ('-s')
print 'Enter password: ', print 'Enter password: ',
password = raw_input() password = raw_input()
else: else:
password = None password = None
if '-e' in sys.argv: if '-e' in sys.argv:
sys.argv.remove ('-e') sys.argv.remove ('-e')
encrypt = 1 encrypt = 1
else: else:
encrypt = 0 encrypt = 0
print sys.argv print sys.argv
if len(sys.argv) > 1: if len(sys.argv) > 1:
port = string.atoi (sys.argv[1]) port = string.atoi (sys.argv[1])
else: else:
port = 8023 port = 8023
if password is not None: if password is not None:
s = secure_monitor_server (password, '', port) s = secure_monitor_server (password, '', port)
if encrypt: if encrypt:
s.channel_class = secure_encrypted_monitor_channel s.channel_class = secure_encrypted_monitor_channel
import sapphire import sapphire
s.cipher = sapphire s.cipher = sapphire
else: else:
s = monitor_server ('', port) s = monitor_server ('', port)
asyncore.loop() asyncore.loop()
...@@ -60,7 +60,7 @@ class monitor_client (asynchat.async_chat): ...@@ -60,7 +60,7 @@ class monitor_client (asynchat.async_chat):
def handle_close (self): def handle_close (self):
# close all the channels, which will make the standard main # close all the channels, which will make the standard main
# loop exit. # loop exit.
map (lambda x: x.close(), asyncore.socket_map.keys()) map (lambda x: x.close(), asyncore.socket_map.values())
def log (self, *ignore): def log (self, *ignore):
pass pass
......
# -*- Mode: Python; tab-width: 4 -*- # -*- Mode: Python; tab-width: 4 -*-
VERSION_STRING = "$Id: select_trigger.py,v 1.8 1999/10/29 15:11:58 brian Exp $" VERSION_STRING = "$Id: select_trigger.py,v 1.9 2000/01/14 02:35:56 amos Exp $"
import asyncore import asyncore
import asynchat import asynchat
...@@ -9,275 +9,259 @@ import os ...@@ -9,275 +9,259 @@ import os
import socket import socket
import string import string
import thread import thread
if os.name == 'posix': if os.name == 'posix':
class trigger (asyncore.file_dispatcher): class trigger (asyncore.file_dispatcher):
"Wake up a call to select() running in the main thread" "Wake up a call to select() running in the main thread"
# This is useful in a context where you are using Medusa's I/O # This is useful in a context where you are using Medusa's I/O
# subsystem to deliver data, but the data is generated by another # subsystem to deliver data, but the data is generated by another
# thread. Normally, if Medusa is in the middle of a call to # thread. Normally, if Medusa is in the middle of a call to
# select(), new output data generated by another thread will have # select(), new output data generated by another thread will have
# to sit until the call to select() either times out or returns. # to sit until the call to select() either times out or returns.
# If the trigger is 'pulled' by another thread, it should immediately # If the trigger is 'pulled' by another thread, it should immediately
# generate a READ event on the trigger object, which will force the # generate a READ event on the trigger object, which will force the
# select() invocation to return. # select() invocation to return.
# A common use for this facility: letting Medusa manage I/O for a # A common use for this facility: letting Medusa manage I/O for a
# large number of connections; but routing each request through a # large number of connections; but routing each request through a
# thread chosen from a fixed-size thread pool. When a thread is # thread chosen from a fixed-size thread pool. When a thread is
# acquired, a transaction is performed, but output data is # acquired, a transaction is performed, but output data is
# accumulated into buffers that will be emptied more efficiently # accumulated into buffers that will be emptied more efficiently
# by Medusa. [picture a server that can process database queries # by Medusa. [picture a server that can process database queries
# rapidly, but doesn't want to tie up threads waiting to send data # rapidly, but doesn't want to tie up threads waiting to send data
# to low-bandwidth connections] # to low-bandwidth connections]
# The other major feature provided by this class is the ability to # The other major feature provided by this class is the ability to
# move work back into the main thread: if you call pull_trigger() # move work back into the main thread: if you call pull_trigger()
# with a thunk argument, when select() wakes up and receives the # with a thunk argument, when select() wakes up and receives the
# event it will call your thunk from within that thread. The main # event it will call your thunk from within that thread. The main
# purpose of this is to remove the need to wrap thread locks around # purpose of this is to remove the need to wrap thread locks around
# Medusa's data structures, which normally do not need them. [To see # Medusa's data structures, which normally do not need them. [To see
# why this is true, imagine this scenario: A thread tries to push some # why this is true, imagine this scenario: A thread tries to push some
# new data onto a channel's outgoing data queue at the same time that # new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some] # the main thread is trying to remove some]
def __init__ (self): def __init__ (self):
r, w = os.pipe() r, w = os.pipe()
self.trigger = w self.trigger = w
asyncore.file_dispatcher.__init__ (self, r) asyncore.file_dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock() self.lock = thread.allocate_lock()
self.thunks = [] self.thunks = []
def __repr__ (self): def __repr__ (self):
return '<select-trigger (pipe) at %x>' % id(self) return '<select-trigger (pipe) at %x>' % id(self)
def readable (self): def readable (self):
return 1 return 1
def writable (self): def writable (self):
return 0 return 0
def handle_connect (self): def handle_connect (self):
pass pass
def pull_trigger (self, thunk=None): def pull_trigger (self, thunk=None):
# print 'PULL_TRIGGER: ', len(self.thunks) # print 'PULL_TRIGGER: ', len(self.thunks)
if thunk: if thunk:
try: try:
self.lock.acquire() self.lock.acquire()
self.thunks.append (thunk) self.thunks.append (thunk)
finally: finally:
self.lock.release() self.lock.release()
os.write (self.trigger, 'x') os.write (self.trigger, 'x')
def handle_read (self): def handle_read (self):
self.recv (8192) self.recv (8192)
try: try:
self.lock.acquire() self.lock.acquire()
for thunk in self.thunks: for thunk in self.thunks:
try: try:
thunk() thunk()
except: except:
(file, fun, line), t, v, tbinfo = asyncore.compact_traceback() (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
self.log_info( print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo), self.thunks = []
'error') finally:
self.thunks = [] self.lock.release()
finally:
self.lock.release()
else: else:
class trigger (asyncore.dispatcher): # win32-safe version
address = ('127.9.9.9', 19999) class trigger (asyncore.dispatcher):
def __init__ (self): address = ('127.9.9.9', 19999)
a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
w = socket.socket (socket.AF_INET, socket.SOCK_STREAM) def __init__ (self):
a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
# set TCP_NODELAY to true to avoid buffering w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
w.setsockopt(socket.IPPROTO_TCP, 1, 1)
# tricky: get a pair of connected sockets
# tricky: get a pair of connected sockets a.bind (self.address)
host='127.9.9.9' a.listen (1)
port=19999 w.setblocking (0)
while 1: try:
try: w.connect (self.address)
self.address=(host, port) except:
a.bind(self.address) pass
break r, addr = a.accept()
except: a.close()
if port <= 19950: w.setblocking (1)
raise 'Bind Error', 'Cannot bind trigger!' self.trigger = w
port=port - 1
asyncore.dispatcher.__init__ (self, r)
a.listen (1) self.lock = thread.allocate_lock()
w.setblocking (0) self.thunks = []
try: self._trigger_connected = 0
w.connect (self.address)
except: def __repr__ (self):
pass return '<select-trigger (loopback) at %x>' % id(self)
r, addr = a.accept()
a.close() def readable (self):
w.setblocking (1) return 1
self.trigger = w
def writable (self):
asyncore.dispatcher.__init__ (self, r) return 0
self.lock = thread.allocate_lock()
self.thunks = [] def handle_connect (self):
self._trigger_connected = 0 pass
def __repr__ (self): def pull_trigger (self, thunk=None):
return '<select-trigger (loopback) at %x>' % id(self) if thunk:
try:
def readable (self): self.lock.acquire()
return 1 self.thunks.append (thunk)
finally:
def writable (self): self.lock.release()
return 0 self.trigger.send ('x')
def handle_connect (self): def handle_read (self):
pass self.recv (8192)
try:
def pull_trigger (self, thunk=None): self.lock.acquire()
if thunk: for thunk in self.thunks:
try: try:
self.lock.acquire() thunk()
self.thunks.append (thunk) except:
finally: (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
self.lock.release() print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
self.trigger.send ('x') self.thunks = []
finally:
def handle_read (self): self.lock.release()
self.recv (8192)
try:
self.lock.acquire()
for thunk in self.thunks:
try:
thunk()
except:
(file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
self.log_info(
'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo),
'error')
self.thunks = []
finally:
self.lock.release()
the_trigger = None the_trigger = None
class trigger_file: class trigger_file:
"A 'triggered' file object" "A 'triggered' file object"
buffer_size = 4096 buffer_size = 4096
def __init__ (self, parent): def __init__ (self, parent):
global the_trigger global the_trigger
if the_trigger is None: if the_trigger is None:
the_trigger = trigger() the_trigger = trigger()
self.parent = parent self.parent = parent
self.buffer = '' self.buffer = ''
def write (self, data): def write (self, data):
self.buffer = self.buffer + data self.buffer = self.buffer + data
if len(self.buffer) > self.buffer_size: if len(self.buffer) > self.buffer_size:
d, self.buffer = self.buffer, '' d, self.buffer = self.buffer, ''
the_trigger.pull_trigger ( the_trigger.pull_trigger (
lambda d=d,p=self.parent: p.push (d) lambda d=d,p=self.parent: p.push (d)
) )
def writeline (self, line): def writeline (self, line):
self.write (line+'\r\n') self.write (line+'\r\n')
def writelines (self, lines): def writelines (self, lines):
self.write ( self.write (
string.joinfields ( string.joinfields (
lines, lines,
'\r\n' '\r\n'
) + '\r\n' ) + '\r\n'
) )
def flush (self): def flush (self):
if self.buffer: if self.buffer:
d, self.buffer = self.buffer, '' d, self.buffer = self.buffer, ''
the_trigger.pull_trigger ( the_trigger.pull_trigger (
lambda p=self.parent,d=d: p.push (d) lambda p=self.parent,d=d: p.push (d)
) )
def softspace (self, *args): def softspace (self, *args):
pass pass
def close (self): def close (self):
# in a derived class, you may want to call trigger_close() instead. # in a derived class, you may want to call trigger_close() instead.
self.flush() self.flush()
self.parent = None self.parent = None
def trigger_close (self): def trigger_close (self):
d, self.buffer = self.buffer, '' d, self.buffer = self.buffer, ''
p, self.parent = self.parent, None p, self.parent = self.parent, None
the_trigger.pull_trigger ( the_trigger.pull_trigger (
lambda p=p,d=d: (p.push(d), p.close_when_done()) lambda p=p,d=d: (p.push(d), p.close_when_done())
) )
if __name__ == '__main__': if __name__ == '__main__':
import time import time
def thread_function (output_file, i, n): def thread_function (output_file, i, n):
print 'entering thread_function' print 'entering thread_function'
while n: while n:
time.sleep (5) time.sleep (5)
output_file.write ('%2d.%2d %s\r\n' % (i, n, output_file)) output_file.write ('%2d.%2d %s\r\n' % (i, n, output_file))
output_file.flush() output_file.flush()
n = n - 1 n = n - 1
output_file.close() output_file.close()
print 'exiting thread_function' print 'exiting thread_function'
class thread_parent (asynchat.async_chat): class thread_parent (asynchat.async_chat):
def __init__ (self, conn, addr): def __init__ (self, conn, addr):
self.addr = addr self.addr = addr
asynchat.async_chat.__init__ (self, conn) asynchat.async_chat.__init__ (self, conn)
self.set_terminator ('\r\n') self.set_terminator ('\r\n')
self.buffer = '' self.buffer = ''
self.count = 0 self.count = 0
def collect_incoming_data (self, data): def collect_incoming_data (self, data):
self.buffer = self.buffer + data self.buffer = self.buffer + data
def found_terminator (self): def found_terminator (self):
data, self.buffer = self.buffer, '' data, self.buffer = self.buffer, ''
if not data: if not data:
asyncore.close_all() asyncore.close_all()
print "done" print "done"
return return
n = string.atoi (string.split (data)[0]) n = string.atoi (string.split (data)[0])
tf = trigger_file (self) tf = trigger_file (self)
self.count = self.count + 1 self.count = self.count + 1
thread.start_new_thread (thread_function, (tf, self.count, n)) thread.start_new_thread (thread_function, (tf, self.count, n))
class thread_server (asyncore.dispatcher): class thread_server (asyncore.dispatcher):
def __init__ (self, family=socket.AF_INET, address=('', 9003)): def __init__ (self, family=socket.AF_INET, address=('', 9003)):
asyncore.dispatcher.__init__ (self) asyncore.dispatcher.__init__ (self)
self.create_socket (family, socket.SOCK_STREAM) self.create_socket (family, socket.SOCK_STREAM)
self.set_reuse_addr() self.set_reuse_addr()
self.bind (address) self.bind (address)
self.listen (5) self.listen (5)
def handle_accept (self): def handle_accept (self):
conn, addr = self.accept() conn, addr = self.accept()
tp = thread_parent (conn, addr) tp = thread_parent (conn, addr)
thread_server() thread_server()
#asyncore.loop(1.0, use_poll=1) #asyncore.loop(1.0, use_poll=1)
try: try:
asyncore.loop () asyncore.loop ()
except: except:
asyncore.close_all() asyncore.close_all()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment