Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Z
Zope
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
Zope
Commits
435912cb
Commit
435912cb
authored
May 22, 2000
by
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
un-freaking-screwed indenting on select_trigger. I promise I'll never
touch it again.
parent
44190dce
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
506 additions
and
506 deletions
+506
-506
ZServer/medusa/select_trigger.py
ZServer/medusa/select_trigger.py
+253
-253
lib/python/ZServer/medusa/select_trigger.py
lib/python/ZServer/medusa/select_trigger.py
+253
-253
No files found.
ZServer/medusa/select_trigger.py
View file @
435912cb
# -*- Mode: Python; tab-width: 4 -*-
VERSION_STRING
=
"$Id: select_trigger.py,v 1.1
1 2000/05/22 16:28:42
brian Exp $"
VERSION_STRING
=
"$Id: select_trigger.py,v 1.1
2 2000/05/22 18:28:55
brian Exp $"
import
asyncore
import
asynchat
...
...
@@ -9,273 +9,273 @@ import os
import
socket
import
string
import
thread
if
os
.
name
==
'posix'
:
class
trigger
(
asyncore
.
file_dispatcher
):
"Wake up a call to select() running in the main thread"
# This is useful in a context where you are using Medusa's I/O
# subsystem to deliver data, but the data is generated by another
# thread. Normally, if Medusa is in the middle of a call to
# select(), new output data generated by another thread will have
# to sit until the call to select() either times out or returns.
# If the trigger is 'pulled' by another thread, it should immediately
# generate a READ event on the trigger object, which will force the
# select() invocation to return.
# A common use for this facility: letting Medusa manage I/O for a
# large number of connections; but routing each request through a
# thread chosen from a fixed-size thread pool. When a thread is
# acquired, a transaction is performed, but output data is
# accumulated into buffers that will be emptied more efficiently
# by Medusa. [picture a server that can process database queries
# rapidly, but doesn't want to tie up threads waiting to send data
# to low-bandwidth connections]
# The other major feature provided by this class is the ability to
# move work back into the main thread: if you call pull_trigger()
# with a thunk argument, when select() wakes up and receives the
# event it will call your thunk from within that thread. The main
# purpose of this is to remove the need to wrap thread locks around
# Medusa's data structures, which normally do not need them. [To see
# why this is true, imagine this scenario: A thread tries to push some
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
def
__init__
(
self
):
r
,
w
=
os
.
pipe
()
self
.
trigger
=
w
asyncore
.
file_dispatcher
.
__init__
(
self
,
r
)
self
.
lock
=
thread
.
allocate_lock
()
self
.
thunks
=
[]
def
__repr__
(
self
):
return
'<select-trigger (pipe) at %x>'
%
id
(
self
)
def
readable
(
self
):
return
1
def
writable
(
self
):
return
0
def
handle_connect
(
self
):
pass
def
pull_trigger
(
self
,
thunk
=
None
):
# print 'PULL_TRIGGER: ', len(self.thunks)
if
thunk
:
try
:
self
.
lock
.
acquire
()
self
.
thunks
.
append
(
thunk
)
finally
:
self
.
lock
.
release
()
os
.
write
(
self
.
trigger
,
'x'
)
def
handle_read
(
self
):
self
.
recv
(
8192
)
try
:
self
.
lock
.
acquire
()
for
thunk
in
self
.
thunks
:
try
:
thunk
()
except
:
(
file
,
fun
,
line
),
t
,
v
,
tbinfo
=
asyncore
.
compact_traceback
()
print
'exception in trigger thunk: (%s:%s %s)'
%
(
t
,
v
,
tbinfo
)
self
.
thunks
=
[]
finally
:
self
.
lock
.
release
()
class
trigger
(
asyncore
.
file_dispatcher
):
"Wake up a call to select() running in the main thread"
# This is useful in a context where you are using Medusa's I/O
# subsystem to deliver data, but the data is generated by another
# thread. Normally, if Medusa is in the middle of a call to
# select(), new output data generated by another thread will have
# to sit until the call to select() either times out or returns.
# If the trigger is 'pulled' by another thread, it should immediately
# generate a READ event on the trigger object, which will force the
# select() invocation to return.
# A common use for this facility: letting Medusa manage I/O for a
# large number of connections; but routing each request through a
# thread chosen from a fixed-size thread pool. When a thread is
# acquired, a transaction is performed, but output data is
# accumulated into buffers that will be emptied more efficiently
# by Medusa. [picture a server that can process database queries
# rapidly, but doesn't want to tie up threads waiting to send data
# to low-bandwidth connections]
# The other major feature provided by this class is the ability to
# move work back into the main thread: if you call pull_trigger()
# with a thunk argument, when select() wakes up and receives the
# event it will call your thunk from within that thread. The main
# purpose of this is to remove the need to wrap thread locks around
# Medusa's data structures, which normally do not need them. [To see
# why this is true, imagine this scenario: A thread tries to push some
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
def
__init__
(
self
):
r
,
w
=
os
.
pipe
()
self
.
trigger
=
w
asyncore
.
file_dispatcher
.
__init__
(
self
,
r
)
self
.
lock
=
thread
.
allocate_lock
()
self
.
thunks
=
[]
def
__repr__
(
self
):
return
'<select-trigger (pipe) at %x>'
%
id
(
self
)
def
readable
(
self
):
return
1
def
writable
(
self
):
return
0
def
handle_connect
(
self
):
pass
def
pull_trigger
(
self
,
thunk
=
None
):
# print 'PULL_TRIGGER: ', len(self.thunks)
if
thunk
:
try
:
self
.
lock
.
acquire
()
self
.
thunks
.
append
(
thunk
)
finally
:
self
.
lock
.
release
()
os
.
write
(
self
.
trigger
,
'x'
)
def
handle_read
(
self
):
self
.
recv
(
8192
)
try
:
self
.
lock
.
acquire
()
for
thunk
in
self
.
thunks
:
try
:
thunk
()
except
:
(
file
,
fun
,
line
),
t
,
v
,
tbinfo
=
asyncore
.
compact_traceback
()
print
'exception in trigger thunk: (%s:%s %s)'
%
(
t
,
v
,
tbinfo
)
self
.
thunks
=
[]
finally
:
self
.
lock
.
release
()
else
:
# win32-safe version
# win32-safe version
class
trigger
(
asyncore
.
dispatcher
):
class
trigger
(
asyncore
.
dispatcher
):
address
=
(
'127.9.9.9'
,
19999
)
address
=
(
'127.9.9.9'
,
19999
)
def
__init__
(
self
):
def
__init__
(
self
):
a
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
w
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
# set TCP_NODELAY to true to avoid buffering
w
.
setsockopt
(
socket
.
IPPROTO_TCP
,
1
,
1
)
# tricky: get a pair of connected sockets
host
=
'127.0.0.1'
port
=
19999
while
1
:
try
:
self
.
address
=
(
host
,
port
)
a
.
bind
(
self
.
address
)
break
except
:
if
port
<=
19950
:
raise
'Bind Error'
,
'Cannot bind trigger!'
port
=
port
-
1
a
.
listen
(
1
)
w
.
setblocking
(
0
)
try
:
w
.
connect
(
self
.
address
)
except
:
pass
r
,
addr
=
a
.
accept
()
a
.
close
()
w
.
setblocking
(
1
)
self
.
trigger
=
w
asyncore
.
dispatcher
.
__init__
(
self
,
r
)
self
.
lock
=
thread
.
allocate_lock
()
self
.
thunks
=
[]
self
.
_trigger_connected
=
0
def
__repr__
(
self
):
return
'<select-trigger (loopback) at %x>'
%
id
(
self
)
def
readable
(
self
):
return
1
def
writable
(
self
):
return
0
def
handle_connect
(
self
):
pass
def
pull_trigger
(
self
,
thunk
=
None
):
if
thunk
:
w
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
# set TCP_NODELAY to true to avoid buffering
w
.
setsockopt
(
socket
.
IPPROTO_TCP
,
1
,
1
)
# tricky: get a pair of connected sockets
host
=
'127.0.0.1'
port
=
19999
while
1
:
try
:
self
.
lock
.
acquire
()
self
.
thunks
.
append
(
thunk
)
finally
:
self
.
lock
.
release
()
self
.
trigger
.
send
(
'x'
)
def
handle_read
(
self
):
self
.
recv
(
8192
)
try
:
self
.
lock
.
acquire
()
for
thunk
in
self
.
thunks
:
try
:
thunk
()
except
:
(
file
,
fun
,
line
),
t
,
v
,
tbinfo
=
asyncore
.
compact_traceback
()
print
'exception in trigger thunk: (%s:%s %s)'
%
(
t
,
v
,
tbinfo
)
self
.
thunks
=
[]
finally
:
self
.
lock
.
release
()
self
.
address
=
(
host
,
port
)
a
.
bind
(
self
.
address
)
break
except
:
if
port
<=
19950
:
raise
'Bind Error'
,
'Cannot bind trigger!'
port
=
port
-
1
a
.
listen
(
1
)
w
.
setblocking
(
0
)
try
:
w
.
connect
(
self
.
address
)
except
:
pass
r
,
addr
=
a
.
accept
()
a
.
close
()
w
.
setblocking
(
1
)
self
.
trigger
=
w
asyncore
.
dispatcher
.
__init__
(
self
,
r
)
self
.
lock
=
thread
.
allocate_lock
()
self
.
thunks
=
[]
self
.
_trigger_connected
=
0
def
__repr__
(
self
):
return
'<select-trigger (loopback) at %x>'
%
id
(
self
)
def
readable
(
self
):
return
1
def
writable
(
self
):
return
0
def
handle_connect
(
self
):
pass
def
pull_trigger
(
self
,
thunk
=
None
):
if
thunk
:
try
:
self
.
lock
.
acquire
()
self
.
thunks
.
append
(
thunk
)
finally
:
self
.
lock
.
release
()
self
.
trigger
.
send
(
'x'
)
def
handle_read
(
self
):
self
.
recv
(
8192
)
try
:
self
.
lock
.
acquire
()
for
thunk
in
self
.
thunks
:
try
:
thunk
()
except
:
(
file
,
fun
,
line
),
t
,
v
,
tbinfo
=
asyncore
.
compact_traceback
()
print
'exception in trigger thunk: (%s:%s %s)'
%
(
t
,
v
,
tbinfo
)
self
.
thunks
=
[]
finally
:
self
.
lock
.
release
()
the_trigger
=
None
class
trigger_file
:
"A 'triggered' file object"
buffer_size
=
4096
def
__init__
(
self
,
parent
):
global
the_trigger
if
the_trigger
is
None
:
the_trigger
=
trigger
()
self
.
parent
=
parent
self
.
buffer
=
''
def
write
(
self
,
data
):
self
.
buffer
=
self
.
buffer
+
data
if
len
(
self
.
buffer
)
>
self
.
buffer_size
:
d
,
self
.
buffer
=
self
.
buffer
,
''
the_trigger
.
pull_trigger
(
lambda
d
=
d
,
p
=
self
.
parent
:
p
.
push
(
d
)
)
def
writeline
(
self
,
line
):
self
.
write
(
line
+
'
\
r
\
n
'
)
def
writelines
(
self
,
lines
):
self
.
write
(
string
.
joinfields
(
lines
,
'
\
r
\
n
'
)
+
'
\
r
\
n
'
)
def
flush
(
self
):
if
self
.
buffer
:
d
,
self
.
buffer
=
self
.
buffer
,
''
the_trigger
.
pull_trigger
(
lambda
p
=
self
.
parent
,
d
=
d
:
p
.
push
(
d
)
)
def
softspace
(
self
,
*
args
):
pass
def
close
(
self
):
# in a derived class, you may want to call trigger_close() instead.
self
.
flush
()
self
.
parent
=
None
def
trigger_close
(
self
):
d
,
self
.
buffer
=
self
.
buffer
,
''
p
,
self
.
parent
=
self
.
parent
,
None
the_trigger
.
pull_trigger
(
lambda
p
=
p
,
d
=
d
:
(
p
.
push
(
d
),
p
.
close_when_done
())
)
"A 'triggered' file object"
buffer_size
=
4096
def
__init__
(
self
,
parent
):
global
the_trigger
if
the_trigger
is
None
:
the_trigger
=
trigger
()
self
.
parent
=
parent
self
.
buffer
=
''
def
write
(
self
,
data
):
self
.
buffer
=
self
.
buffer
+
data
if
len
(
self
.
buffer
)
>
self
.
buffer_size
:
d
,
self
.
buffer
=
self
.
buffer
,
''
the_trigger
.
pull_trigger
(
lambda
d
=
d
,
p
=
self
.
parent
:
p
.
push
(
d
)
)
def
writeline
(
self
,
line
):
self
.
write
(
line
+
'
\
r
\
n
'
)
def
writelines
(
self
,
lines
):
self
.
write
(
string
.
joinfields
(
lines
,
'
\
r
\
n
'
)
+
'
\
r
\
n
'
)
def
flush
(
self
):
if
self
.
buffer
:
d
,
self
.
buffer
=
self
.
buffer
,
''
the_trigger
.
pull_trigger
(
lambda
p
=
self
.
parent
,
d
=
d
:
p
.
push
(
d
)
)
def
softspace
(
self
,
*
args
):
pass
def
close
(
self
):
# in a derived class, you may want to call trigger_close() instead.
self
.
flush
()
self
.
parent
=
None
def
trigger_close
(
self
):
d
,
self
.
buffer
=
self
.
buffer
,
''
p
,
self
.
parent
=
self
.
parent
,
None
the_trigger
.
pull_trigger
(
lambda
p
=
p
,
d
=
d
:
(
p
.
push
(
d
),
p
.
close_when_done
())
)
if
__name__
==
'__main__'
:
import
time
def
thread_function
(
output_file
,
i
,
n
):
print
'entering thread_function'
while
n
:
time
.
sleep
(
5
)
output_file
.
write
(
'%2d.%2d %s
\
r
\
n
'
%
(
i
,
n
,
output_file
))
output_file
.
flush
()
n
=
n
-
1
output_file
.
close
()
print
'exiting thread_function'
class
thread_parent
(
asynchat
.
async_chat
):
def
__init__
(
self
,
conn
,
addr
):
self
.
addr
=
addr
asynchat
.
async_chat
.
__init__
(
self
,
conn
)
self
.
set_terminator
(
'
\
r
\
n
'
)
self
.
buffer
=
''
self
.
count
=
0
def
collect_incoming_data
(
self
,
data
):
self
.
buffer
=
self
.
buffer
+
data
def
found_terminator
(
self
):
data
,
self
.
buffer
=
self
.
buffer
,
''
if
not
data
:
asyncore
.
close_all
()
print
"done"
return
n
=
string
.
atoi
(
string
.
split
(
data
)[
0
])
tf
=
trigger_file
(
self
)
self
.
count
=
self
.
count
+
1
thread
.
start_new_thread
(
thread_function
,
(
tf
,
self
.
count
,
n
))
class
thread_server
(
asyncore
.
dispatcher
):
def
__init__
(
self
,
family
=
socket
.
AF_INET
,
address
=
(
''
,
9003
)):
asyncore
.
dispatcher
.
__init__
(
self
)
self
.
create_socket
(
family
,
socket
.
SOCK_STREAM
)
self
.
set_reuse_addr
()
self
.
bind
(
address
)
self
.
listen
(
5
)
def
handle_accept
(
self
):
conn
,
addr
=
self
.
accept
()
tp
=
thread_parent
(
conn
,
addr
)
thread_server
()
#asyncore.loop(1.0, use_poll=1)
try
:
asyncore
.
loop
()
except
:
asyncore
.
close_all
()
import
time
def
thread_function
(
output_file
,
i
,
n
):
print
'entering thread_function'
while
n
:
time
.
sleep
(
5
)
output_file
.
write
(
'%2d.%2d %s
\
r
\
n
'
%
(
i
,
n
,
output_file
))
output_file
.
flush
()
n
=
n
-
1
output_file
.
close
()
print
'exiting thread_function'
class
thread_parent
(
asynchat
.
async_chat
):
def
__init__
(
self
,
conn
,
addr
):
self
.
addr
=
addr
asynchat
.
async_chat
.
__init__
(
self
,
conn
)
self
.
set_terminator
(
'
\
r
\
n
'
)
self
.
buffer
=
''
self
.
count
=
0
def
collect_incoming_data
(
self
,
data
):
self
.
buffer
=
self
.
buffer
+
data
def
found_terminator
(
self
):
data
,
self
.
buffer
=
self
.
buffer
,
''
if
not
data
:
asyncore
.
close_all
()
print
"done"
return
n
=
string
.
atoi
(
string
.
split
(
data
)[
0
])
tf
=
trigger_file
(
self
)
self
.
count
=
self
.
count
+
1
thread
.
start_new_thread
(
thread_function
,
(
tf
,
self
.
count
,
n
))
class
thread_server
(
asyncore
.
dispatcher
):
def
__init__
(
self
,
family
=
socket
.
AF_INET
,
address
=
(
''
,
9003
)):
asyncore
.
dispatcher
.
__init__
(
self
)
self
.
create_socket
(
family
,
socket
.
SOCK_STREAM
)
self
.
set_reuse_addr
()
self
.
bind
(
address
)
self
.
listen
(
5
)
def
handle_accept
(
self
):
conn
,
addr
=
self
.
accept
()
tp
=
thread_parent
(
conn
,
addr
)
thread_server
()
#asyncore.loop(1.0, use_poll=1)
try
:
asyncore
.
loop
()
except
:
asyncore
.
close_all
()
lib/python/ZServer/medusa/select_trigger.py
View file @
435912cb
# -*- Mode: Python; tab-width: 4 -*-
VERSION_STRING
=
"$Id: select_trigger.py,v 1.1
1 2000/05/22 16:28:42
brian Exp $"
VERSION_STRING
=
"$Id: select_trigger.py,v 1.1
2 2000/05/22 18:28:55
brian Exp $"
import
asyncore
import
asynchat
...
...
@@ -9,273 +9,273 @@ import os
import
socket
import
string
import
thread
if
os
.
name
==
'posix'
:
class
trigger
(
asyncore
.
file_dispatcher
):
"Wake up a call to select() running in the main thread"
# This is useful in a context where you are using Medusa's I/O
# subsystem to deliver data, but the data is generated by another
# thread. Normally, if Medusa is in the middle of a call to
# select(), new output data generated by another thread will have
# to sit until the call to select() either times out or returns.
# If the trigger is 'pulled' by another thread, it should immediately
# generate a READ event on the trigger object, which will force the
# select() invocation to return.
# A common use for this facility: letting Medusa manage I/O for a
# large number of connections; but routing each request through a
# thread chosen from a fixed-size thread pool. When a thread is
# acquired, a transaction is performed, but output data is
# accumulated into buffers that will be emptied more efficiently
# by Medusa. [picture a server that can process database queries
# rapidly, but doesn't want to tie up threads waiting to send data
# to low-bandwidth connections]
# The other major feature provided by this class is the ability to
# move work back into the main thread: if you call pull_trigger()
# with a thunk argument, when select() wakes up and receives the
# event it will call your thunk from within that thread. The main
# purpose of this is to remove the need to wrap thread locks around
# Medusa's data structures, which normally do not need them. [To see
# why this is true, imagine this scenario: A thread tries to push some
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
def
__init__
(
self
):
r
,
w
=
os
.
pipe
()
self
.
trigger
=
w
asyncore
.
file_dispatcher
.
__init__
(
self
,
r
)
self
.
lock
=
thread
.
allocate_lock
()
self
.
thunks
=
[]
def
__repr__
(
self
):
return
'<select-trigger (pipe) at %x>'
%
id
(
self
)
def
readable
(
self
):
return
1
def
writable
(
self
):
return
0
def
handle_connect
(
self
):
pass
def
pull_trigger
(
self
,
thunk
=
None
):
# print 'PULL_TRIGGER: ', len(self.thunks)
if
thunk
:
try
:
self
.
lock
.
acquire
()
self
.
thunks
.
append
(
thunk
)
finally
:
self
.
lock
.
release
()
os
.
write
(
self
.
trigger
,
'x'
)
def
handle_read
(
self
):
self
.
recv
(
8192
)
try
:
self
.
lock
.
acquire
()
for
thunk
in
self
.
thunks
:
try
:
thunk
()
except
:
(
file
,
fun
,
line
),
t
,
v
,
tbinfo
=
asyncore
.
compact_traceback
()
print
'exception in trigger thunk: (%s:%s %s)'
%
(
t
,
v
,
tbinfo
)
self
.
thunks
=
[]
finally
:
self
.
lock
.
release
()
class
trigger
(
asyncore
.
file_dispatcher
):
"Wake up a call to select() running in the main thread"
# This is useful in a context where you are using Medusa's I/O
# subsystem to deliver data, but the data is generated by another
# thread. Normally, if Medusa is in the middle of a call to
# select(), new output data generated by another thread will have
# to sit until the call to select() either times out or returns.
# If the trigger is 'pulled' by another thread, it should immediately
# generate a READ event on the trigger object, which will force the
# select() invocation to return.
# A common use for this facility: letting Medusa manage I/O for a
# large number of connections; but routing each request through a
# thread chosen from a fixed-size thread pool. When a thread is
# acquired, a transaction is performed, but output data is
# accumulated into buffers that will be emptied more efficiently
# by Medusa. [picture a server that can process database queries
# rapidly, but doesn't want to tie up threads waiting to send data
# to low-bandwidth connections]
# The other major feature provided by this class is the ability to
# move work back into the main thread: if you call pull_trigger()
# with a thunk argument, when select() wakes up and receives the
# event it will call your thunk from within that thread. The main
# purpose of this is to remove the need to wrap thread locks around
# Medusa's data structures, which normally do not need them. [To see
# why this is true, imagine this scenario: A thread tries to push some
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
def
__init__
(
self
):
r
,
w
=
os
.
pipe
()
self
.
trigger
=
w
asyncore
.
file_dispatcher
.
__init__
(
self
,
r
)
self
.
lock
=
thread
.
allocate_lock
()
self
.
thunks
=
[]
def
__repr__
(
self
):
return
'<select-trigger (pipe) at %x>'
%
id
(
self
)
def
readable
(
self
):
return
1
def
writable
(
self
):
return
0
def
handle_connect
(
self
):
pass
def
pull_trigger
(
self
,
thunk
=
None
):
# print 'PULL_TRIGGER: ', len(self.thunks)
if
thunk
:
try
:
self
.
lock
.
acquire
()
self
.
thunks
.
append
(
thunk
)
finally
:
self
.
lock
.
release
()
os
.
write
(
self
.
trigger
,
'x'
)
def
handle_read
(
self
):
self
.
recv
(
8192
)
try
:
self
.
lock
.
acquire
()
for
thunk
in
self
.
thunks
:
try
:
thunk
()
except
:
(
file
,
fun
,
line
),
t
,
v
,
tbinfo
=
asyncore
.
compact_traceback
()
print
'exception in trigger thunk: (%s:%s %s)'
%
(
t
,
v
,
tbinfo
)
self
.
thunks
=
[]
finally
:
self
.
lock
.
release
()
else
:
# win32-safe version
# win32-safe version
class
trigger
(
asyncore
.
dispatcher
):
class
trigger
(
asyncore
.
dispatcher
):
address
=
(
'127.9.9.9'
,
19999
)
address
=
(
'127.9.9.9'
,
19999
)
def
__init__
(
self
):
def
__init__
(
self
):
a
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
w
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
# set TCP_NODELAY to true to avoid buffering
w
.
setsockopt
(
socket
.
IPPROTO_TCP
,
1
,
1
)
# tricky: get a pair of connected sockets
host
=
'127.0.0.1'
port
=
19999
while
1
:
try
:
self
.
address
=
(
host
,
port
)
a
.
bind
(
self
.
address
)
break
except
:
if
port
<=
19950
:
raise
'Bind Error'
,
'Cannot bind trigger!'
port
=
port
-
1
a
.
listen
(
1
)
w
.
setblocking
(
0
)
try
:
w
.
connect
(
self
.
address
)
except
:
pass
r
,
addr
=
a
.
accept
()
a
.
close
()
w
.
setblocking
(
1
)
self
.
trigger
=
w
asyncore
.
dispatcher
.
__init__
(
self
,
r
)
self
.
lock
=
thread
.
allocate_lock
()
self
.
thunks
=
[]
self
.
_trigger_connected
=
0
def
__repr__
(
self
):
return
'<select-trigger (loopback) at %x>'
%
id
(
self
)
def
readable
(
self
):
return
1
def
writable
(
self
):
return
0
def
handle_connect
(
self
):
pass
def
pull_trigger
(
self
,
thunk
=
None
):
if
thunk
:
w
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
# set TCP_NODELAY to true to avoid buffering
w
.
setsockopt
(
socket
.
IPPROTO_TCP
,
1
,
1
)
# tricky: get a pair of connected sockets
host
=
'127.0.0.1'
port
=
19999
while
1
:
try
:
self
.
lock
.
acquire
()
self
.
thunks
.
append
(
thunk
)
finally
:
self
.
lock
.
release
()
self
.
trigger
.
send
(
'x'
)
def
handle_read
(
self
):
self
.
recv
(
8192
)
try
:
self
.
lock
.
acquire
()
for
thunk
in
self
.
thunks
:
try
:
thunk
()
except
:
(
file
,
fun
,
line
),
t
,
v
,
tbinfo
=
asyncore
.
compact_traceback
()
print
'exception in trigger thunk: (%s:%s %s)'
%
(
t
,
v
,
tbinfo
)
self
.
thunks
=
[]
finally
:
self
.
lock
.
release
()
self
.
address
=
(
host
,
port
)
a
.
bind
(
self
.
address
)
break
except
:
if
port
<=
19950
:
raise
'Bind Error'
,
'Cannot bind trigger!'
port
=
port
-
1
a
.
listen
(
1
)
w
.
setblocking
(
0
)
try
:
w
.
connect
(
self
.
address
)
except
:
pass
r
,
addr
=
a
.
accept
()
a
.
close
()
w
.
setblocking
(
1
)
self
.
trigger
=
w
asyncore
.
dispatcher
.
__init__
(
self
,
r
)
self
.
lock
=
thread
.
allocate_lock
()
self
.
thunks
=
[]
self
.
_trigger_connected
=
0
def
__repr__
(
self
):
return
'<select-trigger (loopback) at %x>'
%
id
(
self
)
def
readable
(
self
):
return
1
def
writable
(
self
):
return
0
def
handle_connect
(
self
):
pass
def
pull_trigger
(
self
,
thunk
=
None
):
if
thunk
:
try
:
self
.
lock
.
acquire
()
self
.
thunks
.
append
(
thunk
)
finally
:
self
.
lock
.
release
()
self
.
trigger
.
send
(
'x'
)
def
handle_read
(
self
):
self
.
recv
(
8192
)
try
:
self
.
lock
.
acquire
()
for
thunk
in
self
.
thunks
:
try
:
thunk
()
except
:
(
file
,
fun
,
line
),
t
,
v
,
tbinfo
=
asyncore
.
compact_traceback
()
print
'exception in trigger thunk: (%s:%s %s)'
%
(
t
,
v
,
tbinfo
)
self
.
thunks
=
[]
finally
:
self
.
lock
.
release
()
the_trigger
=
None
class
trigger_file
:
"A 'triggered' file object"
buffer_size
=
4096
def
__init__
(
self
,
parent
):
global
the_trigger
if
the_trigger
is
None
:
the_trigger
=
trigger
()
self
.
parent
=
parent
self
.
buffer
=
''
def
write
(
self
,
data
):
self
.
buffer
=
self
.
buffer
+
data
if
len
(
self
.
buffer
)
>
self
.
buffer_size
:
d
,
self
.
buffer
=
self
.
buffer
,
''
the_trigger
.
pull_trigger
(
lambda
d
=
d
,
p
=
self
.
parent
:
p
.
push
(
d
)
)
def
writeline
(
self
,
line
):
self
.
write
(
line
+
'
\
r
\
n
'
)
def
writelines
(
self
,
lines
):
self
.
write
(
string
.
joinfields
(
lines
,
'
\
r
\
n
'
)
+
'
\
r
\
n
'
)
def
flush
(
self
):
if
self
.
buffer
:
d
,
self
.
buffer
=
self
.
buffer
,
''
the_trigger
.
pull_trigger
(
lambda
p
=
self
.
parent
,
d
=
d
:
p
.
push
(
d
)
)
def
softspace
(
self
,
*
args
):
pass
def
close
(
self
):
# in a derived class, you may want to call trigger_close() instead.
self
.
flush
()
self
.
parent
=
None
def
trigger_close
(
self
):
d
,
self
.
buffer
=
self
.
buffer
,
''
p
,
self
.
parent
=
self
.
parent
,
None
the_trigger
.
pull_trigger
(
lambda
p
=
p
,
d
=
d
:
(
p
.
push
(
d
),
p
.
close_when_done
())
)
"A 'triggered' file object"
buffer_size
=
4096
def
__init__
(
self
,
parent
):
global
the_trigger
if
the_trigger
is
None
:
the_trigger
=
trigger
()
self
.
parent
=
parent
self
.
buffer
=
''
def
write
(
self
,
data
):
self
.
buffer
=
self
.
buffer
+
data
if
len
(
self
.
buffer
)
>
self
.
buffer_size
:
d
,
self
.
buffer
=
self
.
buffer
,
''
the_trigger
.
pull_trigger
(
lambda
d
=
d
,
p
=
self
.
parent
:
p
.
push
(
d
)
)
def
writeline
(
self
,
line
):
self
.
write
(
line
+
'
\
r
\
n
'
)
def
writelines
(
self
,
lines
):
self
.
write
(
string
.
joinfields
(
lines
,
'
\
r
\
n
'
)
+
'
\
r
\
n
'
)
def
flush
(
self
):
if
self
.
buffer
:
d
,
self
.
buffer
=
self
.
buffer
,
''
the_trigger
.
pull_trigger
(
lambda
p
=
self
.
parent
,
d
=
d
:
p
.
push
(
d
)
)
def
softspace
(
self
,
*
args
):
pass
def
close
(
self
):
# in a derived class, you may want to call trigger_close() instead.
self
.
flush
()
self
.
parent
=
None
def
trigger_close
(
self
):
d
,
self
.
buffer
=
self
.
buffer
,
''
p
,
self
.
parent
=
self
.
parent
,
None
the_trigger
.
pull_trigger
(
lambda
p
=
p
,
d
=
d
:
(
p
.
push
(
d
),
p
.
close_when_done
())
)
if
__name__
==
'__main__'
:
import
time
def
thread_function
(
output_file
,
i
,
n
):
print
'entering thread_function'
while
n
:
time
.
sleep
(
5
)
output_file
.
write
(
'%2d.%2d %s
\
r
\
n
'
%
(
i
,
n
,
output_file
))
output_file
.
flush
()
n
=
n
-
1
output_file
.
close
()
print
'exiting thread_function'
class
thread_parent
(
asynchat
.
async_chat
):
def
__init__
(
self
,
conn
,
addr
):
self
.
addr
=
addr
asynchat
.
async_chat
.
__init__
(
self
,
conn
)
self
.
set_terminator
(
'
\
r
\
n
'
)
self
.
buffer
=
''
self
.
count
=
0
def
collect_incoming_data
(
self
,
data
):
self
.
buffer
=
self
.
buffer
+
data
def
found_terminator
(
self
):
data
,
self
.
buffer
=
self
.
buffer
,
''
if
not
data
:
asyncore
.
close_all
()
print
"done"
return
n
=
string
.
atoi
(
string
.
split
(
data
)[
0
])
tf
=
trigger_file
(
self
)
self
.
count
=
self
.
count
+
1
thread
.
start_new_thread
(
thread_function
,
(
tf
,
self
.
count
,
n
))
class
thread_server
(
asyncore
.
dispatcher
):
def
__init__
(
self
,
family
=
socket
.
AF_INET
,
address
=
(
''
,
9003
)):
asyncore
.
dispatcher
.
__init__
(
self
)
self
.
create_socket
(
family
,
socket
.
SOCK_STREAM
)
self
.
set_reuse_addr
()
self
.
bind
(
address
)
self
.
listen
(
5
)
def
handle_accept
(
self
):
conn
,
addr
=
self
.
accept
()
tp
=
thread_parent
(
conn
,
addr
)
thread_server
()
#asyncore.loop(1.0, use_poll=1)
try
:
asyncore
.
loop
()
except
:
asyncore
.
close_all
()
import
time
def
thread_function
(
output_file
,
i
,
n
):
print
'entering thread_function'
while
n
:
time
.
sleep
(
5
)
output_file
.
write
(
'%2d.%2d %s
\
r
\
n
'
%
(
i
,
n
,
output_file
))
output_file
.
flush
()
n
=
n
-
1
output_file
.
close
()
print
'exiting thread_function'
class
thread_parent
(
asynchat
.
async_chat
):
def
__init__
(
self
,
conn
,
addr
):
self
.
addr
=
addr
asynchat
.
async_chat
.
__init__
(
self
,
conn
)
self
.
set_terminator
(
'
\
r
\
n
'
)
self
.
buffer
=
''
self
.
count
=
0
def
collect_incoming_data
(
self
,
data
):
self
.
buffer
=
self
.
buffer
+
data
def
found_terminator
(
self
):
data
,
self
.
buffer
=
self
.
buffer
,
''
if
not
data
:
asyncore
.
close_all
()
print
"done"
return
n
=
string
.
atoi
(
string
.
split
(
data
)[
0
])
tf
=
trigger_file
(
self
)
self
.
count
=
self
.
count
+
1
thread
.
start_new_thread
(
thread_function
,
(
tf
,
self
.
count
,
n
))
class
thread_server
(
asyncore
.
dispatcher
):
def
__init__
(
self
,
family
=
socket
.
AF_INET
,
address
=
(
''
,
9003
)):
asyncore
.
dispatcher
.
__init__
(
self
)
self
.
create_socket
(
family
,
socket
.
SOCK_STREAM
)
self
.
set_reuse_addr
()
self
.
bind
(
address
)
self
.
listen
(
5
)
def
handle_accept
(
self
):
conn
,
addr
=
self
.
accept
()
tp
=
thread_parent
(
conn
,
addr
)
thread_server
()
#asyncore.loop(1.0, use_poll=1)
try
:
asyncore
.
loop
()
except
:
asyncore
.
close_all
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment