Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Z
Zope
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
Zope
Commits
6049e415
Commit
6049e415
authored
Dec 13, 2001
by
Brian Lloyd
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Merged asyncore / asynchat fixes from 2.5 branch.
parent
78d70b09
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
1292 additions
and
174 deletions
+1292
-174
ZServer/__init__.py
ZServer/__init__.py
+15
-2
ZServer/medusa/asynchat.py
ZServer/medusa/asynchat.py
+86
-85
ZServer/medusa/asyncore.py
ZServer/medusa/asyncore.py
+545
-0
lib/python/ZServer/__init__.py
lib/python/ZServer/__init__.py
+15
-2
lib/python/ZServer/medusa/asynchat.py
lib/python/ZServer/medusa/asynchat.py
+86
-85
lib/python/ZServer/medusa/asyncore.py
lib/python/ZServer/medusa/asyncore.py
+545
-0
No files found.
ZServer/__init__.py
View file @
6049e415
...
...
@@ -11,10 +11,23 @@
#
##############################################################################
import
sys
,
os
from
medusa.test
import
max_sockets
import
sys
,
os
# HACKERY to get around asyncore issues. This ought to go away! We're
# currently using the Python 2.2 asyncore bundled with Zope to override
# brokenness in the Python 2.1 version. We need to do some funny business
# to make this work, as a 2.2-ism crept into the asyncore code.
import
fcntl
,
FCNTL
if
not
hasattr
(
fcntl
,
'F_GETFL'
):
fcntl
.
F_GETFL
=
FCNTL
.
F_GETFL
fcntl
.
F_SETFL
=
FCNTL
.
F_SETFL
from
medusa
import
asyncore
sys
.
modules
[
'asyncore'
]
=
asyncore
from
medusa.test
import
max_sockets
CONNECTION_LIMIT
=
max_sockets
.
max_select_sockets
()
ZSERVER_VERSION
=
'1.1b1'
...
...
ZServer/medusa/asynchat.py
View file @
6049e415
# -*- Mode: Python; tab-width: 4 -*-
#
$Id: asynchat.py,v 1.17 2001/05/01 11:44:48 andreas Exp $
#
Author: Sam Rushing <rushing@nightmare.com>
#
Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
#
Author: Sam Rushing <rushing@nightmare.com>
# ======================================================================
# Copyright 1996 by Sam Rushing
#
#
# All Rights Reserved
#
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
...
...
@@ -15,7 +15,7 @@
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
...
...
@@ -25,7 +25,7 @@
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================
"""A class supporting chat-style (command/response) protocols.
r
"""A class supporting chat-style (command/response) protocols.
This class adds support for 'chat' style protocols - where one side
sends a 'command', and the other sends a response (examples would be
...
...
@@ -48,59 +48,58 @@ you - by calling your self.found_terminator() method.
import
socket
import
asyncore
import
string
class
async_chat
(
asyncore
.
dispatcher
):
"""This is an abstract class. You must derive from this class, and add
the two methods collect_incoming_data() and found_terminator()"""
# these are overridable defaults
ac_in_buffer_size
=
4096
ac_out_buffer_size
=
4096
ac_in_buffer_size
=
4096
ac_out_buffer_size
=
4096
def
__init__
(
self
,
conn
=
None
):
self
.
ac_in_buffer
=
''
self
.
ac_out_buffer
=
''
self
.
producer_fifo
=
fifo
()
asyncore
.
dispatcher
.
__init__
(
self
,
conn
)
def
set_terminator
(
self
,
term
):
"Set the input delimiter. Can be a fixed string of any length, an integer, or None"
self
.
terminator
=
term
def
get_terminator
(
self
):
return
self
.
terminator
# grab some more data from the socket,
# throw it to the collector method,
# check for the terminator,
# if found, transition to the next state.
# grab some more data from the socket,
# throw it to the collector method,
# check for the terminator,
# if found, transition to the next state.
def
handle_read
(
self
):
try
:
data
=
self
.
recv
(
self
.
ac_in_buffer_size
)
except
socket
.
error
,
why
:
self
.
handle_error
()
return
self
.
ac_in_buffer
=
self
.
ac_in_buffer
+
data
# Continue to search for self.terminator in self.ac_in_buffer,
# while calling self.collect_incoming_data. The while loop
# is necessary because we might read several data+terminator
# combos with a single recv(1024).
while
self
.
ac_in_buffer
:
lb
=
len
(
self
.
ac_in_buffer
)
terminator
=
self
.
get_terminator
()
if
terminator
is
None
:
# no terminator, collect it all
# no terminator, collect it all
self
.
collect_incoming_data
(
self
.
ac_in_buffer
)
self
.
ac_in_buffer
=
''
elif
type
(
terminator
)
==
type
(
0
):
# numeric terminator
# numeric terminator
n
=
terminator
if
lb
<
n
:
self
.
collect_incoming_data
(
self
.
ac_in_buffer
)
...
...
@@ -112,71 +111,71 @@ class async_chat (asyncore.dispatcher):
self
.
terminator
=
0
self
.
found_terminator
()
else
:
# 3 cases:
# 1) end of buffer matches terminator exactly:
# collect data, transition
# 2) end of buffer matches some prefix:
# collect data to the prefix
# 3) end of buffer does not match any prefix:
# collect data
# 3 cases:
# 1) end of buffer matches terminator exactly:
# collect data, transition
# 2) end of buffer matches some prefix:
# collect data to the prefix
# 3) end of buffer does not match any prefix:
# collect data
terminator_len
=
len
(
terminator
)
index
=
s
tring
.
find
(
self
.
ac_in_buffer
,
terminator
)
index
=
s
elf
.
ac_in_buffer
.
find
(
terminator
)
if
index
!=
-
1
:
# we found the terminator
# we found the terminator
if
index
>
0
:
# don't bother reporting the empty string (source of subtle bugs)
# don't bother reporting the empty string (source of subtle bugs)
self
.
collect_incoming_data
(
self
.
ac_in_buffer
[:
index
])
self
.
ac_in_buffer
=
self
.
ac_in_buffer
[
index
+
terminator_len
:]
# This does the Right Thing if the terminator is changed here.
self
.
found_terminator
()
else
:
# check for a prefix of the terminator
# check for a prefix of the terminator
index
=
find_prefix_at_end
(
self
.
ac_in_buffer
,
terminator
)
if
index
:
if
index
!=
lb
:
# we found a prefix, collect up to the prefix
# we found a prefix, collect up to the prefix
self
.
collect_incoming_data
(
self
.
ac_in_buffer
[:
-
index
])
self
.
ac_in_buffer
=
self
.
ac_in_buffer
[
-
index
:]
break
else
:
# no prefix, collect it all
# no prefix, collect it all
self
.
collect_incoming_data
(
self
.
ac_in_buffer
)
self
.
ac_in_buffer
=
''
def
handle_write
(
self
):
self
.
initiate_send
()
def
handle_close
(
self
):
self
.
close
()
def
push
(
self
,
data
):
self
.
producer_fifo
.
push
(
simple_producer
(
data
))
self
.
initiate_send
()
def
push_with_producer
(
self
,
producer
):
self
.
producer_fifo
.
push
(
producer
)
self
.
initiate_send
()
def
readable
(
self
):
"predicate for inclusion in the readable for select()"
return
(
len
(
self
.
ac_in_buffer
)
<=
self
.
ac_in_buffer_size
)
def
writable
(
self
):
"predicate for inclusion in the writable for select()"
# return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)
# this is about twice as fast, though not as clear.
return
not
(
(
self
.
ac_out_buffer
is
''
)
and
(
self
.
ac_out_buffer
==
''
)
and
self
.
producer_fifo
.
is_empty
()
and
self
.
connected
)
def
close_when_done
(
self
):
"automatically close this channel once the outgoing queue is empty"
self
.
producer_fifo
.
push
(
None
)
# refill the outgoing buffer by calling the more() method
# of the first producer in the queue
# refill the outgoing buffer by calling the more() method
# of the first producer in the queue
def
refill_buffer
(
self
):
_string_type
=
type
(
''
)
while
1
:
...
...
@@ -201,38 +200,38 @@ class async_chat (asyncore.dispatcher):
self
.
producer_fifo
.
pop
()
else
:
return
def
initiate_send
(
self
):
obs
=
self
.
ac_out_buffer_size
# try to refill the buffer
if
(
len
(
self
.
ac_out_buffer
)
<
obs
):
self
.
refill_buffer
()
if
self
.
ac_out_buffer
and
self
.
connected
:
# try to send the buffer
# try to send the buffer
try
:
num_sent
=
self
.
send
(
self
.
ac_out_buffer
[:
obs
])
if
num_sent
:
self
.
ac_out_buffer
=
self
.
ac_out_buffer
[
num_sent
:]
except
socket
.
error
,
why
:
self
.
handle_error
()
return
def
discard_buffers
(
self
):
# Emergencies only!
# Emergencies only!
self
.
ac_in_buffer
=
''
self
.
ac_out_buffer
=
''
while
self
.
producer_fifo
:
self
.
producer_fifo
.
pop
()
class
simple_producer
:
def
__init__
(
self
,
data
,
buffer_size
=
512
):
self
.
data
=
data
self
.
buffer_size
=
buffer_size
def
more
(
self
):
if
len
(
self
.
data
)
>
self
.
buffer_size
:
result
=
self
.
data
[:
self
.
buffer_size
]
...
...
@@ -242,26 +241,26 @@ class simple_producer:
result
=
self
.
data
self
.
data
=
''
return
result
class
fifo
:
def
__init__
(
self
,
list
=
None
):
if
not
list
:
self
.
list
=
[]
else
:
self
.
list
=
list
def
__len__
(
self
):
return
len
(
self
.
list
)
def
is_empty
(
self
):
return
self
.
list
==
[]
def
first
(
self
):
return
self
.
list
[
0
]
def
push
(
self
,
data
):
self
.
list
.
append
(
data
)
def
pop
(
self
):
if
self
.
list
:
result
=
self
.
list
[
0
]
...
...
@@ -269,24 +268,26 @@ class fifo:
return
(
1
,
result
)
else
:
return
(
0
,
None
)
# Given 'haystack', see if any prefix of 'needle' is at its end. This
# assumes an exact match has already been checked. Return the number of
# characters matched.
# for example:
# f_p_a_e ("qwerty\r", "\r\n") => 1
# f_p_a_e ("qwertydkjf", "\r\n") => 0
# f_p_a_e ("qwerty\r\n", "\r\n") => <undefined>
# this could maybe be made faster with a computed regex?
# [answer: no; circa Python-2.0, Jan 2001]
# new python: 28961/s
# old python: 18307/s
# re: 12820/s
# regex: 14035/s
# Given 'haystack', see if any prefix of 'needle' is at its end. This
# assumes an exact match has already been checked. Return the number of
# characters matched.
# for example:
# f_p_a_e ("qwerty\r", "\r\n") => 1
# f_p_a_e ("qwerty\r\n", "\r\n") => 2
# f_p_a_e ("qwertydkjf", "\r\n") => 0
# this could maybe be made faster with a computed regex?
# [answer: no; circa Python-2.0, Jan 2001]
# python: 18307/s
# re: 12820/s
# regex: 14035/s
def
find_prefix_at_end
(
haystack
,
needle
):
l
=
len
(
needle
)
-
1
while
l
and
not
haystack
.
endswith
(
needle
[:
l
]):
l
-=
1
return
l
nl
=
len
(
needle
)
result
=
0
for
i
in
range
(
1
,
nl
):
if
haystack
[
-
(
nl
-
i
):]
==
needle
[:(
nl
-
i
)]:
result
=
nl
-
i
break
return
result
ZServer/medusa/asyncore.py
0 → 100644
View file @
6049e415
# -*- Mode: Python -*-
# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
# Author: Sam Rushing <rushing@nightmare.com>
# ======================================================================
# Copyright 1996 by Sam Rushing
#
# All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================
"""Basic infrastructure for asynchronous socket service clients and servers.
There are only two ways to have a program on a single processor do "more
than one thing at a time". Multi-threaded programming is the simplest and
most popular way to do it, but there is another very different technique,
that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads. it's really only practical if your program
is largely I/O bound. If your program is CPU bound, then pre-emptive
scheduled threads are probably what you really need. Network servers are
rarely CPU-bound, however.
If your operating system supports the select() system call in its I/O
library (and nearly all do), then you can use it to juggle multiple
communication channels at once; doing other work while your I/O is taking
place in the "background." Although this strategy can seem strange and
complex, especially at first, it is in many ways easier to understand and
control than multi-threaded programming. The module documented here solves
many of the difficult problems for you, making the task of building
sophisticated high-performance network servers and clients a snap.
"""
import
exceptions
import
select
import
socket
import
sys
import
os
from
errno
import
EALREADY
,
EINPROGRESS
,
EWOULDBLOCK
,
ECONNRESET
,
\
ENOTCONN
,
ESHUTDOWN
,
EINTR
,
EISCONN
try
:
socket_map
except
NameError
:
socket_map
=
{}
class
ExitNow
(
exceptions
.
Exception
):
pass
DEBUG
=
0
def
poll
(
timeout
=
0.0
,
map
=
None
):
if
map
is
None
:
map
=
socket_map
if
map
:
r
=
[];
w
=
[];
e
=
[]
for
fd
,
obj
in
map
.
items
():
if
obj
.
readable
():
r
.
append
(
fd
)
if
obj
.
writable
():
w
.
append
(
fd
)
try
:
r
,
w
,
e
=
select
.
select
(
r
,
w
,
e
,
timeout
)
except
select
.
error
,
err
:
if
err
[
0
]
!=
EINTR
:
raise
if
DEBUG
:
print
r
,
w
,
e
for
fd
in
r
:
try
:
obj
=
map
[
fd
]
except
KeyError
:
continue
try
:
obj
.
handle_read_event
()
except
ExitNow
:
raise
ExitNow
except
:
obj
.
handle_error
()
for
fd
in
w
:
try
:
obj
=
map
[
fd
]
except
KeyError
:
continue
try
:
obj
.
handle_write_event
()
except
ExitNow
:
raise
ExitNow
except
:
obj
.
handle_error
()
def
poll2
(
timeout
=
0.0
,
map
=
None
):
import
poll
if
map
is
None
:
map
=
socket_map
if
timeout
is
not
None
:
# timeout is in milliseconds
timeout
=
int
(
timeout
*
1000
)
if
map
:
l
=
[]
for
fd
,
obj
in
map
.
items
():
flags
=
0
if
obj
.
readable
():
flags
=
poll
.
POLLIN
if
obj
.
writable
():
flags
=
flags
|
poll
.
POLLOUT
if
flags
:
l
.
append
((
fd
,
flags
))
r
=
poll
.
poll
(
l
,
timeout
)
for
fd
,
flags
in
r
:
try
:
obj
=
map
[
fd
]
except
KeyError
:
continue
try
:
if
(
flags
&
poll
.
POLLIN
):
obj
.
handle_read_event
()
if
(
flags
&
poll
.
POLLOUT
):
obj
.
handle_write_event
()
except
ExitNow
:
raise
ExitNow
except
:
obj
.
handle_error
()
def
poll3
(
timeout
=
0.0
,
map
=
None
):
# Use the poll() support added to the select module in Python 2.0
if
map
is
None
:
map
=
socket_map
if
timeout
is
not
None
:
# timeout is in milliseconds
timeout
=
int
(
timeout
*
1000
)
pollster
=
select
.
poll
()
if
map
:
for
fd
,
obj
in
map
.
items
():
flags
=
0
if
obj
.
readable
():
flags
=
select
.
POLLIN
if
obj
.
writable
():
flags
=
flags
|
select
.
POLLOUT
if
flags
:
pollster
.
register
(
fd
,
flags
)
try
:
r
=
pollster
.
poll
(
timeout
)
except
select
.
error
,
err
:
if
err
[
0
]
!=
EINTR
:
raise
r
=
[]
for
fd
,
flags
in
r
:
try
:
obj
=
map
[
fd
]
except
KeyError
:
continue
try
:
if
(
flags
&
select
.
POLLIN
):
obj
.
handle_read_event
()
if
(
flags
&
select
.
POLLOUT
):
obj
.
handle_write_event
()
except
ExitNow
:
raise
ExitNow
except
:
obj
.
handle_error
()
def
loop
(
timeout
=
30.0
,
use_poll
=
0
,
map
=
None
):
if
map
is
None
:
map
=
socket_map
if
use_poll
:
if
hasattr
(
select
,
'poll'
):
poll_fun
=
poll3
else
:
poll_fun
=
poll2
else
:
poll_fun
=
poll
while
map
:
poll_fun
(
timeout
,
map
)
class
dispatcher
:
debug
=
0
connected
=
0
accepting
=
0
closing
=
0
addr
=
None
def
__init__
(
self
,
sock
=
None
,
map
=
None
):
if
sock
:
self
.
set_socket
(
sock
,
map
)
# I think it should inherit this anyway
self
.
socket
.
setblocking
(
0
)
self
.
connected
=
1
self
.
addr
=
sock
.
getpeername
()
else
:
self
.
socket
=
None
def
__repr__
(
self
):
status
=
[
self
.
__class__
.
__module__
+
"."
+
self
.
__class__
.
__name__
]
if
self
.
accepting
and
self
.
addr
:
status
.
append
(
'listening'
)
elif
self
.
connected
:
status
.
append
(
'connected'
)
if
self
.
addr
is
not
None
:
try
:
status
.
append
(
'%s:%d'
%
self
.
addr
)
except
TypeError
:
status
.
append
(
repr
(
self
.
addr
))
return
'<%s at %#x>'
%
(
' '
.
join
(
status
),
id
(
self
))
def
add_channel
(
self
,
map
=
None
):
#self.log_info ('adding channel %s' % self)
if
map
is
None
:
map
=
socket_map
map
[
self
.
_fileno
]
=
self
def
del_channel
(
self
,
map
=
None
):
fd
=
self
.
_fileno
if
map
is
None
:
map
=
socket_map
if
map
.
has_key
(
fd
):
#self.log_info ('closing channel %d:%s' % (fd, self))
del
map
[
fd
]
def
create_socket
(
self
,
family
,
type
):
self
.
family_and_type
=
family
,
type
self
.
socket
=
socket
.
socket
(
family
,
type
)
self
.
socket
.
setblocking
(
0
)
self
.
_fileno
=
self
.
socket
.
fileno
()
self
.
add_channel
()
def
set_socket
(
self
,
sock
,
map
=
None
):
self
.
socket
=
sock
## self.__dict__['socket'] = sock
self
.
_fileno
=
sock
.
fileno
()
self
.
add_channel
(
map
)
def
set_reuse_addr
(
self
):
# try to re-use a server port if possible
try
:
self
.
socket
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
,
self
.
socket
.
getsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
)
|
1
)
except
socket
.
error
:
pass
# ==================================================
# predicates for select()
# these are used as filters for the lists of sockets
# to pass to select().
# ==================================================
def
readable
(
self
):
return
1
if
os
.
name
==
'mac'
:
# The macintosh will select a listening socket for
# write if you let it. What might this mean?
def
writable
(
self
):
return
not
self
.
accepting
else
:
def
writable
(
self
):
return
1
# ==================================================
# socket object methods.
# ==================================================
def
listen
(
self
,
num
):
self
.
accepting
=
1
if
os
.
name
==
'nt'
and
num
>
5
:
num
=
1
return
self
.
socket
.
listen
(
num
)
def
bind
(
self
,
addr
):
self
.
addr
=
addr
return
self
.
socket
.
bind
(
addr
)
def
connect
(
self
,
address
):
self
.
connected
=
0
err
=
self
.
socket
.
connect_ex
(
address
)
if
err
in
(
EINPROGRESS
,
EALREADY
,
EWOULDBLOCK
):
return
if
err
in
(
0
,
EISCONN
):
self
.
addr
=
address
self
.
connected
=
1
self
.
handle_connect
()
else
:
raise
socket
.
error
,
err
def
accept
(
self
):
try
:
conn
,
addr
=
self
.
socket
.
accept
()
return
conn
,
addr
except
socket
.
error
,
why
:
if
why
[
0
]
==
EWOULDBLOCK
:
pass
else
:
raise
socket
.
error
,
why
def
send
(
self
,
data
):
try
:
result
=
self
.
socket
.
send
(
data
)
return
result
except
socket
.
error
,
why
:
if
why
[
0
]
==
EWOULDBLOCK
:
return
0
else
:
raise
socket
.
error
,
why
return
0
def
recv
(
self
,
buffer_size
):
try
:
data
=
self
.
socket
.
recv
(
buffer_size
)
if
not
data
:
# a closed connection is indicated by signaling
# a read condition, and having recv() return 0.
self
.
handle_close
()
return
''
else
:
return
data
except
socket
.
error
,
why
:
# winsock sometimes throws ENOTCONN
if
why
[
0
]
in
[
ECONNRESET
,
ENOTCONN
,
ESHUTDOWN
]:
self
.
handle_close
()
return
''
else
:
raise
socket
.
error
,
why
def
close
(
self
):
self
.
del_channel
()
self
.
socket
.
close
()
# cheap inheritance, used to pass all other attribute
# references to the underlying socket object.
def
__getattr__
(
self
,
attr
):
return
getattr
(
self
.
socket
,
attr
)
# log and log_info maybe overriden to provide more sophisitcated
# logging and warning methods. In general, log is for 'hit' logging
# and 'log_info' is for informational, warning and error logging.
def
log
(
self
,
message
):
sys
.
stderr
.
write
(
'log: %s
\
n
'
%
str
(
message
))
def
log_info
(
self
,
message
,
type
=
'info'
):
if
__debug__
or
type
!=
'info'
:
print
'%s: %s'
%
(
type
,
message
)
def
handle_read_event
(
self
):
if
self
.
accepting
:
# for an accepting socket, getting a read implies
# that we are connected
if
not
self
.
connected
:
self
.
connected
=
1
self
.
handle_accept
()
elif
not
self
.
connected
:
self
.
handle_connect
()
self
.
connected
=
1
self
.
handle_read
()
else
:
self
.
handle_read
()
def
handle_write_event
(
self
):
# getting a write implies that we are connected
if
not
self
.
connected
:
self
.
handle_connect
()
self
.
connected
=
1
self
.
handle_write
()
def
handle_expt_event
(
self
):
self
.
handle_expt
()
def
handle_error
(
self
):
nil
,
t
,
v
,
tbinfo
=
compact_traceback
()
# sometimes a user repr method will crash.
try
:
self_repr
=
repr
(
self
)
except
:
self_repr
=
'<__repr__ (self) failed for object at %0x>'
%
id
(
self
)
self
.
log_info
(
'uncaptured python exception, closing channel %s (%s:%s %s)'
%
(
self_repr
,
t
,
v
,
tbinfo
),
'error'
)
self
.
close
()
def
handle_expt
(
self
):
self
.
log_info
(
'unhandled exception'
,
'warning'
)
def
handle_read
(
self
):
self
.
log_info
(
'unhandled read event'
,
'warning'
)
def
handle_write
(
self
):
self
.
log_info
(
'unhandled write event'
,
'warning'
)
def
handle_connect
(
self
):
self
.
log_info
(
'unhandled connect event'
,
'warning'
)
def
handle_accept
(
self
):
self
.
log_info
(
'unhandled accept event'
,
'warning'
)
def
handle_close
(
self
):
self
.
log_info
(
'unhandled close event'
,
'warning'
)
self
.
close
()
# ---------------------------------------------------------------------------
# adds simple buffered output capability, useful for simple clients.
# [for more sophisticated usage use asynchat.async_chat]
# ---------------------------------------------------------------------------
class
dispatcher_with_send
(
dispatcher
):
def
__init__
(
self
,
sock
=
None
):
dispatcher
.
__init__
(
self
,
sock
)
self
.
out_buffer
=
''
def
initiate_send
(
self
):
num_sent
=
0
num_sent
=
dispatcher
.
send
(
self
,
self
.
out_buffer
[:
512
])
self
.
out_buffer
=
self
.
out_buffer
[
num_sent
:]
def
handle_write
(
self
):
self
.
initiate_send
()
def
writable
(
self
):
return
(
not
self
.
connected
)
or
len
(
self
.
out_buffer
)
def
send
(
self
,
data
):
if
self
.
debug
:
self
.
log_info
(
'sending %s'
%
repr
(
data
))
self
.
out_buffer
=
self
.
out_buffer
+
data
self
.
initiate_send
()
# ---------------------------------------------------------------------------
# used for debugging.
# ---------------------------------------------------------------------------
def
compact_traceback
():
t
,
v
,
tb
=
sys
.
exc_info
()
tbinfo
=
[]
while
1
:
tbinfo
.
append
((
tb
.
tb_frame
.
f_code
.
co_filename
,
tb
.
tb_frame
.
f_code
.
co_name
,
str
(
tb
.
tb_lineno
)
))
tb
=
tb
.
tb_next
if
not
tb
:
break
# just to be safe
del
tb
file
,
function
,
line
=
tbinfo
[
-
1
]
info
=
'['
+
'] ['
.
join
(
map
(
lambda
x
:
'|'
.
join
(
x
),
tbinfo
))
+
']'
return
(
file
,
function
,
line
),
t
,
v
,
info
def
close_all
(
map
=
None
):
if
map
is
None
:
map
=
socket_map
for
x
in
map
.
values
():
x
.
socket
.
close
()
map
.
clear
()
# Asynchronous File I/O:
#
# After a little research (reading man pages on various unixen, and
# digging through the linux kernel), I've determined that select()
# isn't meant for doing doing asynchronous file i/o.
# Heartening, though - reading linux/mm/filemap.c shows that linux
# supports asynchronous read-ahead. So _MOST_ of the time, the data
# will be sitting in memory for us already when we go to read it.
#
# What other OS's (besides NT) support async file i/o? [VMS?]
#
# Regardless, this is useful for pipes, and stdin/stdout...
import
os
if
os
.
name
==
'posix'
:
import
fcntl
class
file_wrapper
:
# here we override just enough to make a file
# look like a socket for the purposes of asyncore.
def
__init__
(
self
,
fd
):
self
.
fd
=
fd
def
recv
(
self
,
*
args
):
return
apply
(
os
.
read
,
(
self
.
fd
,)
+
args
)
def
send
(
self
,
*
args
):
return
apply
(
os
.
write
,
(
self
.
fd
,)
+
args
)
read
=
recv
write
=
send
def
close
(
self
):
return
os
.
close
(
self
.
fd
)
def
fileno
(
self
):
return
self
.
fd
class
file_dispatcher
(
dispatcher
):
def
__init__
(
self
,
fd
):
dispatcher
.
__init__
(
self
)
self
.
connected
=
1
# set it to non-blocking mode
flags
=
fcntl
.
fcntl
(
fd
,
fcntl
.
F_GETFL
,
0
)
flags
=
flags
|
os
.
O_NONBLOCK
fcntl
.
fcntl
(
fd
,
fcntl
.
F_SETFL
,
flags
)
self
.
set_file
(
fd
)
def
set_file
(
self
,
fd
):
self
.
_fileno
=
fd
self
.
socket
=
file_wrapper
(
fd
)
self
.
add_channel
()
lib/python/ZServer/__init__.py
View file @
6049e415
...
...
@@ -11,10 +11,23 @@
#
##############################################################################
import
sys
,
os
from
medusa.test
import
max_sockets
import
sys
,
os
# HACKERY to get around asyncore issues. This ought to go away! We're
# currently using the Python 2.2 asyncore bundled with Zope to override
# brokenness in the Python 2.1 version. We need to do some funny business
# to make this work, as a 2.2-ism crept into the asyncore code.
import
fcntl
,
FCNTL
if
not
hasattr
(
fcntl
,
'F_GETFL'
):
fcntl
.
F_GETFL
=
FCNTL
.
F_GETFL
fcntl
.
F_SETFL
=
FCNTL
.
F_SETFL
from
medusa
import
asyncore
sys
.
modules
[
'asyncore'
]
=
asyncore
from
medusa.test
import
max_sockets
CONNECTION_LIMIT
=
max_sockets
.
max_select_sockets
()
ZSERVER_VERSION
=
'1.1b1'
...
...
lib/python/ZServer/medusa/asynchat.py
View file @
6049e415
# -*- Mode: Python; tab-width: 4 -*-
#
$Id: asynchat.py,v 1.17 2001/05/01 11:44:48 andreas Exp $
#
Author: Sam Rushing <rushing@nightmare.com>
#
Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
#
Author: Sam Rushing <rushing@nightmare.com>
# ======================================================================
# Copyright 1996 by Sam Rushing
#
#
# All Rights Reserved
#
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
...
...
@@ -15,7 +15,7 @@
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
...
...
@@ -25,7 +25,7 @@
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================
"""A class supporting chat-style (command/response) protocols.
r
"""A class supporting chat-style (command/response) protocols.
This class adds support for 'chat' style protocols - where one side
sends a 'command', and the other sends a response (examples would be
...
...
@@ -48,59 +48,58 @@ you - by calling your self.found_terminator() method.
import
socket
import
asyncore
import
string
class
async_chat
(
asyncore
.
dispatcher
):
"""This is an abstract class. You must derive from this class, and add
the two methods collect_incoming_data() and found_terminator()"""
# these are overridable defaults
ac_in_buffer_size
=
4096
ac_out_buffer_size
=
4096
ac_in_buffer_size
=
4096
ac_out_buffer_size
=
4096
def
__init__
(
self
,
conn
=
None
):
self
.
ac_in_buffer
=
''
self
.
ac_out_buffer
=
''
self
.
producer_fifo
=
fifo
()
asyncore
.
dispatcher
.
__init__
(
self
,
conn
)
def
set_terminator
(
self
,
term
):
"Set the input delimiter. Can be a fixed string of any length, an integer, or None"
self
.
terminator
=
term
def
get_terminator
(
self
):
return
self
.
terminator
# grab some more data from the socket,
# throw it to the collector method,
# check for the terminator,
# if found, transition to the next state.
# grab some more data from the socket,
# throw it to the collector method,
# check for the terminator,
# if found, transition to the next state.
def
handle_read
(
self
):
try
:
data
=
self
.
recv
(
self
.
ac_in_buffer_size
)
except
socket
.
error
,
why
:
self
.
handle_error
()
return
self
.
ac_in_buffer
=
self
.
ac_in_buffer
+
data
# Continue to search for self.terminator in self.ac_in_buffer,
# while calling self.collect_incoming_data. The while loop
# is necessary because we might read several data+terminator
# combos with a single recv(1024).
while
self
.
ac_in_buffer
:
lb
=
len
(
self
.
ac_in_buffer
)
terminator
=
self
.
get_terminator
()
if
terminator
is
None
:
# no terminator, collect it all
# no terminator, collect it all
self
.
collect_incoming_data
(
self
.
ac_in_buffer
)
self
.
ac_in_buffer
=
''
elif
type
(
terminator
)
==
type
(
0
):
# numeric terminator
# numeric terminator
n
=
terminator
if
lb
<
n
:
self
.
collect_incoming_data
(
self
.
ac_in_buffer
)
...
...
@@ -112,71 +111,71 @@ class async_chat (asyncore.dispatcher):
self
.
terminator
=
0
self
.
found_terminator
()
else
:
# 3 cases:
# 1) end of buffer matches terminator exactly:
# collect data, transition
# 2) end of buffer matches some prefix:
# collect data to the prefix
# 3) end of buffer does not match any prefix:
# collect data
# 3 cases:
# 1) end of buffer matches terminator exactly:
# collect data, transition
# 2) end of buffer matches some prefix:
# collect data to the prefix
# 3) end of buffer does not match any prefix:
# collect data
terminator_len
=
len
(
terminator
)
index
=
s
tring
.
find
(
self
.
ac_in_buffer
,
terminator
)
index
=
s
elf
.
ac_in_buffer
.
find
(
terminator
)
if
index
!=
-
1
:
# we found the terminator
# we found the terminator
if
index
>
0
:
# don't bother reporting the empty string (source of subtle bugs)
# don't bother reporting the empty string (source of subtle bugs)
self
.
collect_incoming_data
(
self
.
ac_in_buffer
[:
index
])
self
.
ac_in_buffer
=
self
.
ac_in_buffer
[
index
+
terminator_len
:]
# This does the Right Thing if the terminator is changed here.
self
.
found_terminator
()
else
:
# check for a prefix of the terminator
# check for a prefix of the terminator
index
=
find_prefix_at_end
(
self
.
ac_in_buffer
,
terminator
)
if
index
:
if
index
!=
lb
:
# we found a prefix, collect up to the prefix
# we found a prefix, collect up to the prefix
self
.
collect_incoming_data
(
self
.
ac_in_buffer
[:
-
index
])
self
.
ac_in_buffer
=
self
.
ac_in_buffer
[
-
index
:]
break
else
:
# no prefix, collect it all
# no prefix, collect it all
self
.
collect_incoming_data
(
self
.
ac_in_buffer
)
self
.
ac_in_buffer
=
''
def
handle_write
(
self
):
self
.
initiate_send
()
def
handle_close
(
self
):
self
.
close
()
def
push
(
self
,
data
):
self
.
producer_fifo
.
push
(
simple_producer
(
data
))
self
.
initiate_send
()
def
push_with_producer
(
self
,
producer
):
self
.
producer_fifo
.
push
(
producer
)
self
.
initiate_send
()
def
readable
(
self
):
"predicate for inclusion in the readable for select()"
return
(
len
(
self
.
ac_in_buffer
)
<=
self
.
ac_in_buffer_size
)
def
writable
(
self
):
"predicate for inclusion in the writable for select()"
# return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)
# this is about twice as fast, though not as clear.
return
not
(
(
self
.
ac_out_buffer
is
''
)
and
(
self
.
ac_out_buffer
==
''
)
and
self
.
producer_fifo
.
is_empty
()
and
self
.
connected
)
def
close_when_done
(
self
):
"automatically close this channel once the outgoing queue is empty"
self
.
producer_fifo
.
push
(
None
)
# refill the outgoing buffer by calling the more() method
# of the first producer in the queue
# refill the outgoing buffer by calling the more() method
# of the first producer in the queue
def
refill_buffer
(
self
):
_string_type
=
type
(
''
)
while
1
:
...
...
@@ -201,38 +200,38 @@ class async_chat (asyncore.dispatcher):
self
.
producer_fifo
.
pop
()
else
:
return
def
initiate_send
(
self
):
obs
=
self
.
ac_out_buffer_size
# try to refill the buffer
if
(
len
(
self
.
ac_out_buffer
)
<
obs
):
self
.
refill_buffer
()
if
self
.
ac_out_buffer
and
self
.
connected
:
# try to send the buffer
# try to send the buffer
try
:
num_sent
=
self
.
send
(
self
.
ac_out_buffer
[:
obs
])
if
num_sent
:
self
.
ac_out_buffer
=
self
.
ac_out_buffer
[
num_sent
:]
except
socket
.
error
,
why
:
self
.
handle_error
()
return
def
discard_buffers
(
self
):
# Emergencies only!
# Emergencies only!
self
.
ac_in_buffer
=
''
self
.
ac_out_buffer
=
''
while
self
.
producer_fifo
:
self
.
producer_fifo
.
pop
()
class
simple_producer
:
def
__init__
(
self
,
data
,
buffer_size
=
512
):
self
.
data
=
data
self
.
buffer_size
=
buffer_size
def
more
(
self
):
if
len
(
self
.
data
)
>
self
.
buffer_size
:
result
=
self
.
data
[:
self
.
buffer_size
]
...
...
@@ -242,26 +241,26 @@ class simple_producer:
result
=
self
.
data
self
.
data
=
''
return
result
class
fifo
:
def
__init__
(
self
,
list
=
None
):
if
not
list
:
self
.
list
=
[]
else
:
self
.
list
=
list
def
__len__
(
self
):
return
len
(
self
.
list
)
def
is_empty
(
self
):
return
self
.
list
==
[]
def
first
(
self
):
return
self
.
list
[
0
]
def
push
(
self
,
data
):
self
.
list
.
append
(
data
)
def
pop
(
self
):
if
self
.
list
:
result
=
self
.
list
[
0
]
...
...
@@ -269,24 +268,26 @@ class fifo:
return
(
1
,
result
)
else
:
return
(
0
,
None
)
# Given 'haystack', see if any prefix of 'needle' is at its end. This
# assumes an exact match has already been checked. Return the number of
# characters matched.
# for example:
# f_p_a_e ("qwerty\r", "\r\n") => 1
# f_p_a_e ("qwertydkjf", "\r\n") => 0
# f_p_a_e ("qwerty\r\n", "\r\n") => <undefined>
# this could maybe be made faster with a computed regex?
# [answer: no; circa Python-2.0, Jan 2001]
# new python: 28961/s
# old python: 18307/s
# re: 12820/s
# regex: 14035/s
# Given 'haystack', see if any prefix of 'needle' is at its end. This
# assumes an exact match has already been checked. Return the number of
# characters matched.
# for example:
# f_p_a_e ("qwerty\r", "\r\n") => 1
# f_p_a_e ("qwerty\r\n", "\r\n") => 2
# f_p_a_e ("qwertydkjf", "\r\n") => 0
# this could maybe be made faster with a computed regex?
# [answer: no; circa Python-2.0, Jan 2001]
# python: 18307/s
# re: 12820/s
# regex: 14035/s
def
find_prefix_at_end
(
haystack
,
needle
):
l
=
len
(
needle
)
-
1
while
l
and
not
haystack
.
endswith
(
needle
[:
l
]):
l
-=
1
return
l
nl
=
len
(
needle
)
result
=
0
for
i
in
range
(
1
,
nl
):
if
haystack
[
-
(
nl
-
i
):]
==
needle
[:(
nl
-
i
)]:
result
=
nl
-
i
break
return
result
lib/python/ZServer/medusa/asyncore.py
0 → 100644
View file @
6049e415
# -*- Mode: Python -*-
# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
# Author: Sam Rushing <rushing@nightmare.com>
# ======================================================================
# Copyright 1996 by Sam Rushing
#
# All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================
"""Basic infrastructure for asynchronous socket service clients and servers.
There are only two ways to have a program on a single processor do "more
than one thing at a time". Multi-threaded programming is the simplest and
most popular way to do it, but there is another very different technique,
that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads. it's really only practical if your program
is largely I/O bound. If your program is CPU bound, then pre-emptive
scheduled threads are probably what you really need. Network servers are
rarely CPU-bound, however.
If your operating system supports the select() system call in its I/O
library (and nearly all do), then you can use it to juggle multiple
communication channels at once; doing other work while your I/O is taking
place in the "background." Although this strategy can seem strange and
complex, especially at first, it is in many ways easier to understand and
control than multi-threaded programming. The module documented here solves
many of the difficult problems for you, making the task of building
sophisticated high-performance network servers and clients a snap.
"""
import
exceptions
import
select
import
socket
import
sys
import
os
from
errno
import
EALREADY
,
EINPROGRESS
,
EWOULDBLOCK
,
ECONNRESET
,
\
ENOTCONN
,
ESHUTDOWN
,
EINTR
,
EISCONN
try
:
socket_map
except
NameError
:
socket_map
=
{}
class
ExitNow
(
exceptions
.
Exception
):
pass
DEBUG
=
0
def
poll
(
timeout
=
0.0
,
map
=
None
):
if
map
is
None
:
map
=
socket_map
if
map
:
r
=
[];
w
=
[];
e
=
[]
for
fd
,
obj
in
map
.
items
():
if
obj
.
readable
():
r
.
append
(
fd
)
if
obj
.
writable
():
w
.
append
(
fd
)
try
:
r
,
w
,
e
=
select
.
select
(
r
,
w
,
e
,
timeout
)
except
select
.
error
,
err
:
if
err
[
0
]
!=
EINTR
:
raise
if
DEBUG
:
print
r
,
w
,
e
for
fd
in
r
:
try
:
obj
=
map
[
fd
]
except
KeyError
:
continue
try
:
obj
.
handle_read_event
()
except
ExitNow
:
raise
ExitNow
except
:
obj
.
handle_error
()
for
fd
in
w
:
try
:
obj
=
map
[
fd
]
except
KeyError
:
continue
try
:
obj
.
handle_write_event
()
except
ExitNow
:
raise
ExitNow
except
:
obj
.
handle_error
()
def
poll2
(
timeout
=
0.0
,
map
=
None
):
import
poll
if
map
is
None
:
map
=
socket_map
if
timeout
is
not
None
:
# timeout is in milliseconds
timeout
=
int
(
timeout
*
1000
)
if
map
:
l
=
[]
for
fd
,
obj
in
map
.
items
():
flags
=
0
if
obj
.
readable
():
flags
=
poll
.
POLLIN
if
obj
.
writable
():
flags
=
flags
|
poll
.
POLLOUT
if
flags
:
l
.
append
((
fd
,
flags
))
r
=
poll
.
poll
(
l
,
timeout
)
for
fd
,
flags
in
r
:
try
:
obj
=
map
[
fd
]
except
KeyError
:
continue
try
:
if
(
flags
&
poll
.
POLLIN
):
obj
.
handle_read_event
()
if
(
flags
&
poll
.
POLLOUT
):
obj
.
handle_write_event
()
except
ExitNow
:
raise
ExitNow
except
:
obj
.
handle_error
()
def
poll3
(
timeout
=
0.0
,
map
=
None
):
# Use the poll() support added to the select module in Python 2.0
if
map
is
None
:
map
=
socket_map
if
timeout
is
not
None
:
# timeout is in milliseconds
timeout
=
int
(
timeout
*
1000
)
pollster
=
select
.
poll
()
if
map
:
for
fd
,
obj
in
map
.
items
():
flags
=
0
if
obj
.
readable
():
flags
=
select
.
POLLIN
if
obj
.
writable
():
flags
=
flags
|
select
.
POLLOUT
if
flags
:
pollster
.
register
(
fd
,
flags
)
try
:
r
=
pollster
.
poll
(
timeout
)
except
select
.
error
,
err
:
if
err
[
0
]
!=
EINTR
:
raise
r
=
[]
for
fd
,
flags
in
r
:
try
:
obj
=
map
[
fd
]
except
KeyError
:
continue
try
:
if
(
flags
&
select
.
POLLIN
):
obj
.
handle_read_event
()
if
(
flags
&
select
.
POLLOUT
):
obj
.
handle_write_event
()
except
ExitNow
:
raise
ExitNow
except
:
obj
.
handle_error
()
def
loop
(
timeout
=
30.0
,
use_poll
=
0
,
map
=
None
):
if
map
is
None
:
map
=
socket_map
if
use_poll
:
if
hasattr
(
select
,
'poll'
):
poll_fun
=
poll3
else
:
poll_fun
=
poll2
else
:
poll_fun
=
poll
while
map
:
poll_fun
(
timeout
,
map
)
class
dispatcher
:
debug
=
0
connected
=
0
accepting
=
0
closing
=
0
addr
=
None
def
__init__
(
self
,
sock
=
None
,
map
=
None
):
if
sock
:
self
.
set_socket
(
sock
,
map
)
# I think it should inherit this anyway
self
.
socket
.
setblocking
(
0
)
self
.
connected
=
1
self
.
addr
=
sock
.
getpeername
()
else
:
self
.
socket
=
None
def
__repr__
(
self
):
status
=
[
self
.
__class__
.
__module__
+
"."
+
self
.
__class__
.
__name__
]
if
self
.
accepting
and
self
.
addr
:
status
.
append
(
'listening'
)
elif
self
.
connected
:
status
.
append
(
'connected'
)
if
self
.
addr
is
not
None
:
try
:
status
.
append
(
'%s:%d'
%
self
.
addr
)
except
TypeError
:
status
.
append
(
repr
(
self
.
addr
))
return
'<%s at %#x>'
%
(
' '
.
join
(
status
),
id
(
self
))
def
add_channel
(
self
,
map
=
None
):
#self.log_info ('adding channel %s' % self)
if
map
is
None
:
map
=
socket_map
map
[
self
.
_fileno
]
=
self
def
del_channel
(
self
,
map
=
None
):
fd
=
self
.
_fileno
if
map
is
None
:
map
=
socket_map
if
map
.
has_key
(
fd
):
#self.log_info ('closing channel %d:%s' % (fd, self))
del
map
[
fd
]
def
create_socket
(
self
,
family
,
type
):
self
.
family_and_type
=
family
,
type
self
.
socket
=
socket
.
socket
(
family
,
type
)
self
.
socket
.
setblocking
(
0
)
self
.
_fileno
=
self
.
socket
.
fileno
()
self
.
add_channel
()
def
set_socket
(
self
,
sock
,
map
=
None
):
self
.
socket
=
sock
## self.__dict__['socket'] = sock
self
.
_fileno
=
sock
.
fileno
()
self
.
add_channel
(
map
)
def
set_reuse_addr
(
self
):
# try to re-use a server port if possible
try
:
self
.
socket
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
,
self
.
socket
.
getsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
)
|
1
)
except
socket
.
error
:
pass
# ==================================================
# predicates for select()
# these are used as filters for the lists of sockets
# to pass to select().
# ==================================================
def
readable
(
self
):
return
1
if
os
.
name
==
'mac'
:
# The macintosh will select a listening socket for
# write if you let it. What might this mean?
def
writable
(
self
):
return
not
self
.
accepting
else
:
def
writable
(
self
):
return
1
# ==================================================
# socket object methods.
# ==================================================
def
listen
(
self
,
num
):
self
.
accepting
=
1
if
os
.
name
==
'nt'
and
num
>
5
:
num
=
1
return
self
.
socket
.
listen
(
num
)
def
bind
(
self
,
addr
):
self
.
addr
=
addr
return
self
.
socket
.
bind
(
addr
)
def
connect
(
self
,
address
):
self
.
connected
=
0
err
=
self
.
socket
.
connect_ex
(
address
)
if
err
in
(
EINPROGRESS
,
EALREADY
,
EWOULDBLOCK
):
return
if
err
in
(
0
,
EISCONN
):
self
.
addr
=
address
self
.
connected
=
1
self
.
handle_connect
()
else
:
raise
socket
.
error
,
err
def
accept
(
self
):
try
:
conn
,
addr
=
self
.
socket
.
accept
()
return
conn
,
addr
except
socket
.
error
,
why
:
if
why
[
0
]
==
EWOULDBLOCK
:
pass
else
:
raise
socket
.
error
,
why
def
send
(
self
,
data
):
try
:
result
=
self
.
socket
.
send
(
data
)
return
result
except
socket
.
error
,
why
:
if
why
[
0
]
==
EWOULDBLOCK
:
return
0
else
:
raise
socket
.
error
,
why
return
0
def
recv
(
self
,
buffer_size
):
try
:
data
=
self
.
socket
.
recv
(
buffer_size
)
if
not
data
:
# a closed connection is indicated by signaling
# a read condition, and having recv() return 0.
self
.
handle_close
()
return
''
else
:
return
data
except
socket
.
error
,
why
:
# winsock sometimes throws ENOTCONN
if
why
[
0
]
in
[
ECONNRESET
,
ENOTCONN
,
ESHUTDOWN
]:
self
.
handle_close
()
return
''
else
:
raise
socket
.
error
,
why
def
close
(
self
):
self
.
del_channel
()
self
.
socket
.
close
()
# cheap inheritance, used to pass all other attribute
# references to the underlying socket object.
def
__getattr__
(
self
,
attr
):
return
getattr
(
self
.
socket
,
attr
)
# log and log_info maybe overriden to provide more sophisitcated
# logging and warning methods. In general, log is for 'hit' logging
# and 'log_info' is for informational, warning and error logging.
def
log
(
self
,
message
):
sys
.
stderr
.
write
(
'log: %s
\
n
'
%
str
(
message
))
def
log_info
(
self
,
message
,
type
=
'info'
):
if
__debug__
or
type
!=
'info'
:
print
'%s: %s'
%
(
type
,
message
)
def
handle_read_event
(
self
):
if
self
.
accepting
:
# for an accepting socket, getting a read implies
# that we are connected
if
not
self
.
connected
:
self
.
connected
=
1
self
.
handle_accept
()
elif
not
self
.
connected
:
self
.
handle_connect
()
self
.
connected
=
1
self
.
handle_read
()
else
:
self
.
handle_read
()
def
handle_write_event
(
self
):
# getting a write implies that we are connected
if
not
self
.
connected
:
self
.
handle_connect
()
self
.
connected
=
1
self
.
handle_write
()
def
handle_expt_event
(
self
):
self
.
handle_expt
()
def
handle_error
(
self
):
nil
,
t
,
v
,
tbinfo
=
compact_traceback
()
# sometimes a user repr method will crash.
try
:
self_repr
=
repr
(
self
)
except
:
self_repr
=
'<__repr__ (self) failed for object at %0x>'
%
id
(
self
)
self
.
log_info
(
'uncaptured python exception, closing channel %s (%s:%s %s)'
%
(
self_repr
,
t
,
v
,
tbinfo
),
'error'
)
self
.
close
()
def
handle_expt
(
self
):
self
.
log_info
(
'unhandled exception'
,
'warning'
)
def
handle_read
(
self
):
self
.
log_info
(
'unhandled read event'
,
'warning'
)
def
handle_write
(
self
):
self
.
log_info
(
'unhandled write event'
,
'warning'
)
def
handle_connect
(
self
):
self
.
log_info
(
'unhandled connect event'
,
'warning'
)
def
handle_accept
(
self
):
self
.
log_info
(
'unhandled accept event'
,
'warning'
)
def
handle_close
(
self
):
self
.
log_info
(
'unhandled close event'
,
'warning'
)
self
.
close
()
# ---------------------------------------------------------------------------
# adds simple buffered output capability, useful for simple clients.
# [for more sophisticated usage use asynchat.async_chat]
# ---------------------------------------------------------------------------
class
dispatcher_with_send
(
dispatcher
):
def
__init__
(
self
,
sock
=
None
):
dispatcher
.
__init__
(
self
,
sock
)
self
.
out_buffer
=
''
def
initiate_send
(
self
):
num_sent
=
0
num_sent
=
dispatcher
.
send
(
self
,
self
.
out_buffer
[:
512
])
self
.
out_buffer
=
self
.
out_buffer
[
num_sent
:]
def
handle_write
(
self
):
self
.
initiate_send
()
def
writable
(
self
):
return
(
not
self
.
connected
)
or
len
(
self
.
out_buffer
)
def
send
(
self
,
data
):
if
self
.
debug
:
self
.
log_info
(
'sending %s'
%
repr
(
data
))
self
.
out_buffer
=
self
.
out_buffer
+
data
self
.
initiate_send
()
# ---------------------------------------------------------------------------
# used for debugging.
# ---------------------------------------------------------------------------
def
compact_traceback
():
t
,
v
,
tb
=
sys
.
exc_info
()
tbinfo
=
[]
while
1
:
tbinfo
.
append
((
tb
.
tb_frame
.
f_code
.
co_filename
,
tb
.
tb_frame
.
f_code
.
co_name
,
str
(
tb
.
tb_lineno
)
))
tb
=
tb
.
tb_next
if
not
tb
:
break
# just to be safe
del
tb
file
,
function
,
line
=
tbinfo
[
-
1
]
info
=
'['
+
'] ['
.
join
(
map
(
lambda
x
:
'|'
.
join
(
x
),
tbinfo
))
+
']'
return
(
file
,
function
,
line
),
t
,
v
,
info
def
close_all
(
map
=
None
):
if
map
is
None
:
map
=
socket_map
for
x
in
map
.
values
():
x
.
socket
.
close
()
map
.
clear
()
# Asynchronous File I/O:
#
# After a little research (reading man pages on various unixen, and
# digging through the linux kernel), I've determined that select()
# isn't meant for doing doing asynchronous file i/o.
# Heartening, though - reading linux/mm/filemap.c shows that linux
# supports asynchronous read-ahead. So _MOST_ of the time, the data
# will be sitting in memory for us already when we go to read it.
#
# What other OS's (besides NT) support async file i/o? [VMS?]
#
# Regardless, this is useful for pipes, and stdin/stdout...
import
os
if
os
.
name
==
'posix'
:
import
fcntl
class
file_wrapper
:
# here we override just enough to make a file
# look like a socket for the purposes of asyncore.
def
__init__
(
self
,
fd
):
self
.
fd
=
fd
def
recv
(
self
,
*
args
):
return
apply
(
os
.
read
,
(
self
.
fd
,)
+
args
)
def
send
(
self
,
*
args
):
return
apply
(
os
.
write
,
(
self
.
fd
,)
+
args
)
read
=
recv
write
=
send
def
close
(
self
):
return
os
.
close
(
self
.
fd
)
def
fileno
(
self
):
return
self
.
fd
class
file_dispatcher
(
dispatcher
):
def
__init__
(
self
,
fd
):
dispatcher
.
__init__
(
self
)
self
.
connected
=
1
# set it to non-blocking mode
flags
=
fcntl
.
fcntl
(
fd
,
fcntl
.
F_GETFL
,
0
)
flags
=
flags
|
os
.
O_NONBLOCK
fcntl
.
fcntl
(
fd
,
fcntl
.
F_SETFL
,
flags
)
self
.
set_file
(
fd
)
def
set_file
(
self
,
fd
):
self
.
_fileno
=
fd
self
.
socket
=
file_wrapper
(
fd
)
self
.
add_channel
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment