Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Z
ZODB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Kirill Smelkov
ZODB
Commits
f3bffbef
Commit
f3bffbef
authored
Nov 02, 2001
by
Jeremy Hylton
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Merge changes from zeo-1_0-branch to the trunk.
The trunk now has the same code ZEO 1.0b5 plus a few minor changes.
parent
965b0139
Changes
11
Hide whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
273 additions
and
57 deletions
+273
-57
trunk/src/ZEO/ClientCache.py
trunk/src/ZEO/ClientCache.py
+4
-2
trunk/src/ZEO/ClientStorage.py
trunk/src/ZEO/ClientStorage.py
+33
-19
trunk/src/ZEO/Invalidator.py
trunk/src/ZEO/Invalidator.py
+4
-0
trunk/src/ZEO/StorageServer.py
trunk/src/ZEO/StorageServer.py
+35
-12
trunk/src/ZEO/start.py
trunk/src/ZEO/start.py
+2
-2
trunk/src/ZEO/tests/Cache.py
trunk/src/ZEO/tests/Cache.py
+10
-0
trunk/src/ZEO/tests/forker.py
trunk/src/ZEO/tests/forker.py
+12
-6
trunk/src/ZEO/tests/stress.py
trunk/src/ZEO/tests/stress.py
+122
-0
trunk/src/ZEO/tests/testZEO.py
trunk/src/ZEO/tests/testZEO.py
+20
-5
trunk/src/ZEO/trigger.py
trunk/src/ZEO/trigger.py
+5
-1
trunk/src/ZEO/zrpc.py
trunk/src/ZEO/zrpc.py
+26
-10
No files found.
trunk/src/ZEO/ClientCache.py
View file @
f3bffbef
...
...
@@ -144,7 +144,7 @@ file 0 and file 1.
"""
__version__
=
"$Revision: 1.1
7
$"
[
11
:
-
2
]
__version__
=
"$Revision: 1.1
8
$"
[
11
:
-
2
]
import
os
,
tempfile
from
struct
import
pack
,
unpack
...
...
@@ -217,7 +217,7 @@ class ClientCache:
def
close
(
self
):
try
:
self
.
_f
[
self
.
_current
].
close
()
except
OSError
:
except
(
os
.
error
,
ValueError
)
:
pass
def
open
(
self
):
...
...
@@ -373,6 +373,8 @@ class ClientCache:
self
.
_f
[
current
]
=
open
(
self
.
_p
[
current
],
'w+b'
)
else
:
# Temporary cache file:
if
self
.
_f
[
current
]
is
not
None
:
self
.
_f
[
current
].
close
()
self
.
_f
[
current
]
=
tempfile
.
TemporaryFile
(
suffix
=
'.zec'
)
self
.
_f
[
current
].
write
(
magic
)
self
.
_pos
=
pos
=
4
...
...
trunk/src/ZEO/ClientStorage.py
View file @
f3bffbef
...
...
@@ -84,7 +84,8 @@
##############################################################################
"""Network ZODB storage client
"""
__version__
=
'$Revision: 1.34 $'
[
11
:
-
2
]
__version__
=
'$Revision: 1.35 $'
[
11
:
-
2
]
import
struct
,
time
,
os
,
socket
,
string
,
Sync
,
zrpc
,
ClientCache
import
tempfile
,
Invalidator
,
ExtensionClass
,
thread
...
...
@@ -168,17 +169,16 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
# Among other things, we know that our data methods won't get
# called until after this call.
invalidator
=
Invalidator
.
Invalidator
(
db
.
invalidate
,
self
.
_cache
.
invalidate
)
self
.
invalidator
=
Invalidator
.
Invalidator
(
db
.
invalidate
,
self
.
_cache
.
invalidate
)
def
out_of_band_hook
(
code
,
args
,
get_hook
=
{
'b'
:
(
invalidator
.
begin
,
0
),
'i'
:
(
invalidator
.
invalidate
,
1
),
'e'
:
(
invalidator
.
end
,
0
),
'I'
:
(
invalidator
.
Invalidate
,
1
),
'b'
:
(
self
.
invalidator
.
begin
,
0
),
'i'
:
(
self
.
invalidator
.
invalidate
,
1
),
'e'
:
(
self
.
invalidator
.
end
,
0
),
'I'
:
(
self
.
invalidator
.
Invalidate
,
1
),
'U'
:
(
self
.
_commit_lock_release
,
0
),
's'
:
(
self
.
_serials
.
append
,
1
),
'S'
:
(
self
.
_info
.
update
,
1
),
...
...
@@ -307,8 +307,18 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
try
:
LOG
(
"ClientStorage"
,
INFO
,
"close"
)
self
.
_call
.
closeIntensionally
()
try
:
self
.
_tfile
.
close
()
except
os
.
error
:
# On Windows, this can fail if it is called more than
# once, because it tries to delete the file each
# time.
pass
self
.
_cache
.
close
()
self
.
closed
=
1
if
self
.
invalidator
is
not
None
:
self
.
invalidator
.
close
()
self
.
invalidator
=
None
self
.
closed
=
1
finally
:
self
.
_lock_release
()
def
commitVersion
(
self
,
src
,
dest
,
transaction
):
...
...
@@ -317,7 +327,6 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
self
.
_lock_acquire
()
try
:
oids
=
self
.
_call
(
'commitVersion'
,
src
,
dest
,
self
.
_serial
)
invalidate
=
self
.
_cache
.
invalidate
if
dest
:
vlen
=
pack
(
">H"
,
len
(
src
))
# just invalidate our version data
...
...
@@ -436,12 +445,17 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
finally
:
self
.
_lock_release
()
def
supportsUndo
(
self
):
return
self
.
_info
[
'supportsUndo'
]
def
supportsVersions
(
self
):
return
self
.
_info
[
'supportsVersions'
]
def
supportsUndo
(
self
):
return
self
.
_info
[
'supportsUndo'
]
def
supportsVersions
(
self
):
return
self
.
_info
[
'supportsVersions'
]
def
supportsTransactionalUndo
(
self
):
return
self
.
_info
[
'supportsTransactionalUndo'
]
try
:
return
self
.
_info
[
'supportsTransactionalUndo'
]
except
KeyError
:
return
0
def
tpc_abort
(
self
,
transaction
):
self
.
_lock_acquire
()
...
...
@@ -522,7 +536,6 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
seek
=
tfile
.
seek
read
=
tfile
.
read
cache
=
self
.
_cache
update
=
cache
.
update
size
=
tfile
.
tell
()
cache
.
checkSize
(
size
)
seek
(
0
)
...
...
@@ -543,9 +556,9 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
"temporary file."
)
if
s
==
ResolvedSerial
:
cache
.
invalidate
(
oid
,
v
)
self
.
_
cache
.
invalidate
(
oid
,
v
)
else
:
update
(
oid
,
s
,
v
,
p
)
self
.
_cache
.
update
(
oid
,
s
,
v
,
p
)
i
=
i
+
15
+
vlen
+
dlen
elif
opcode
==
"i"
:
oid
=
read
(
8
)
...
...
@@ -578,7 +591,8 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
try
:
oids
=
self
.
_call
(
'undo'
,
transaction_id
)
cinvalidate
=
self
.
_cache
.
invalidate
for
oid
in
oids
:
cinvalidate
(
oid
,
''
)
for
oid
in
oids
:
cinvalidate
(
oid
,
''
)
return
oids
finally
:
self
.
_lock_release
()
...
...
trunk/src/ZEO/Invalidator.py
View file @
f3bffbef
...
...
@@ -98,6 +98,10 @@ class Invalidator:
self
.
dinvalidate
=
dinvalidate
self
.
cinvalidate
=
cinvalidate
def
close
(
self
):
self
.
dinvalidate
=
None
self
.
cinvalidate
=
None
def
begin
(
self
):
self
.
_tfile
=
tempfile
.
TemporaryFile
()
pickler
=
cPickle
.
Pickler
(
self
.
_tfile
,
1
)
...
...
trunk/src/ZEO/StorageServer.py
View file @
f3bffbef
...
...
@@ -83,7 +83,7 @@
#
##############################################################################
__version__
=
"$Revision: 1.
29
$"
[
11
:
-
2
]
__version__
=
"$Revision: 1.
30
$"
[
11
:
-
2
]
import
asyncore
,
socket
,
string
,
sys
,
os
from
smac
import
SizedMessageAsyncConnection
...
...
@@ -104,8 +104,17 @@ class StorageServerError(POSException.StorageError): pass
max_blather
=
120
def
blather
(
*
args
):
m
=
string
.
join
(
map
(
str
,
args
))
if
len
(
m
)
>
max_blather
:
m
=
m
[:
max_blather
]
+
' ...'
accum
=
[]
total_len
=
0
for
arg
in
args
:
if
not
isinstance
(
arg
,
StringType
):
arg
=
str
(
arg
)
accum
.
append
(
arg
)
total_len
=
total_len
+
len
(
arg
)
if
total_len
>=
max_blather
:
break
m
=
string
.
join
(
accum
)
if
len
(
m
)
>
max_blather
:
m
=
m
[:
max_blather
]
+
' ...'
LOG
(
'ZEO Server'
,
TRACE
,
m
)
...
...
@@ -121,11 +130,13 @@ class StorageServer(asyncore.dispatcher):
def
__init__
(
self
,
connection
,
storages
):
self
.
__storages
=
storages
for
n
,
s
in
storages
.
items
():
init_storage
(
s
)
for
n
,
s
in
storages
.
items
():
init_storage
(
s
)
self
.
__connections
=
{}
self
.
__get_connections
=
self
.
__connections
.
get
self
.
_pack_trigger
=
trigger
.
trigger
()
asyncore
.
dispatcher
.
__init__
(
self
)
if
type
(
connection
)
is
type
(
''
):
...
...
@@ -258,7 +269,12 @@ class ZEOConnection(SizedMessageAsyncConnection):
def
message_input
(
self
,
message
,
dump
=
dump
,
Unpickler
=
Unpickler
,
StringIO
=
StringIO
,
None
=
None
):
if
__debug__
:
blather
(
'message_input'
,
id
(
self
),
`message`
)
if
__debug__
:
if
len
(
message
)
>
max_blather
:
tmp
=
`message[:max_blather]`
else
:
tmp
=
`message`
blather
(
'message_input'
,
id
(
self
),
tmp
)
if
self
.
__storage
is
None
:
# This is the first communication from the client
...
...
@@ -276,7 +292,9 @@ class ZEOConnection(SizedMessageAsyncConnection):
args
=
unpickler
.
load
()
name
,
args
=
args
[
0
],
args
[
1
:]
if
__debug__
:
blather
(
'call %s: %s%s'
%
(
id
(
self
),
name
,
`args`
))
if
__debug__
:
apply
(
blather
,
(
"call"
,
id
(
self
),
":"
,
name
,)
+
args
)
if
not
storage_method
(
name
):
raise
'Invalid Method Name'
,
name
...
...
@@ -294,7 +312,8 @@ class ZEOConnection(SizedMessageAsyncConnection):
self
.
return_error
(
sys
.
exc_info
()[
0
],
sys
.
exc_info
()[
1
])
return
if
__debug__
:
blather
(
"%s R: %s"
%
(
id
(
self
),
`r`
))
if
__debug__
:
blather
(
"%s R: %s"
%
(
id
(
self
),
`r`
))
r
=
dump
(
r
,
1
)
self
.
message_output
(
'R'
+
r
)
...
...
@@ -303,7 +322,8 @@ class ZEOConnection(SizedMessageAsyncConnection):
if
type
(
err_value
)
is
not
type
(
self
):
err_value
=
err_type
,
err_value
if
__debug__
:
blather
(
"%s E: %s"
%
(
id
(
self
),
`err_value`
))
if
__debug__
:
blather
(
"%s E: %s"
%
(
id
(
self
),
`err_value`
))
try
:
r
=
dump
(
err_value
,
1
)
except
:
...
...
@@ -396,11 +416,12 @@ class ZEOConnection(SizedMessageAsyncConnection):
error
=
sys
.
exc_info
())
if
wait
:
self
.
return_error
(
sys
.
exc_info
()[
0
],
sys
.
exc_info
()[
1
])
self
.
_pack_trigger
.
pull_trigger
()
self
.
_
_server
.
_
pack_trigger
.
pull_trigger
()
else
:
if
wait
:
self
.
message_output
(
'RN.'
)
self
.
_pack_trigger
.
pull_trigger
()
self
.
__server
.
_pack_trigger
.
pull_trigger
()
else
:
# Broadcast new size statistics
self
.
__server
.
invalidate
(
0
,
self
.
__storage_id
,
(),
...
...
@@ -582,6 +603,8 @@ if __name__=='__main__':
port
=
''
,
int
(
port
)
except
:
pass
StorageServer
(
port
,
ZODB
.
FileStorage
.
FileStorage
(
name
))
d
=
{
'1'
:
ZODB
.
FileStorage
.
FileStorage
(
name
)}
StorageServer
(
port
,
d
)
asyncwrap
.
loop
()
trunk/src/ZEO/start.py
View file @
f3bffbef
...
...
@@ -86,7 +86,7 @@
"""Start the server storage.
"""
__version__
=
"$Revision: 1.2
5
$"
[
11
:
-
2
]
__version__
=
"$Revision: 1.2
6
$"
[
11
:
-
2
]
import
sys
,
os
,
getopt
,
string
...
...
@@ -359,7 +359,7 @@ def shutdown(storages, die=1):
for
storage
in
storages
.
values
():
try
:
storage
.
close
()
finally
:
pass
except
:
pass
try
:
from
zLOG
import
LOG
,
INFO
...
...
trunk/src/ZEO/tests/Cache.py
View file @
f3bffbef
...
...
@@ -13,7 +13,17 @@ class TransUndoStorageWithCache:
revid
=
self
.
_dostore
(
oid
,
revid
=
revid
,
data
=
MinPO
(
25
))
info
=
self
.
_storage
.
undoInfo
()
if
not
info
:
# XXX perhaps we have an old storage implementation that
# does do the negative nonsense
info
=
self
.
_storage
.
undoInfo
(
0
,
20
)
tid
=
info
[
0
][
'id'
]
# We may need to bail at this point if the storage doesn't
# support transactional undo
if
not
self
.
_storage
.
supportsTransactionalUndo
():
return
# Now start an undo transaction
self
.
_transaction
.
note
(
'undo1'
)
self
.
_storage
.
tpc_begin
(
self
.
_transaction
)
...
...
trunk/src/ZEO/tests/forker.py
View file @
f3bffbef
...
...
@@ -23,10 +23,13 @@ def get_port():
port
=
random
.
randrange
(
20000
,
30000
)
s
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
try
:
s
.
connect
((
'localhost'
,
port
))
except
socket
.
error
:
# XXX check value of error?
return
port
try
:
s
.
connect
((
'localhost'
,
port
))
except
socket
.
error
:
# XXX check value of error?
return
port
finally
:
s
.
close
()
raise
RuntimeError
,
"Can't find port"
if
os
.
name
==
"nt"
:
...
...
@@ -37,14 +40,15 @@ if os.name == "nt":
Returns the ZEO port, the test server port, and the pid.
"""
import
ZEO.tests.winserver
port
=
get_port
()
if
port
is
None
:
port
=
get_port
()
script
=
ZEO
.
tests
.
winserver
.
__file__
if
script
.
endswith
(
'.pyc'
):
script
=
script
[:
-
1
]
args
=
(
sys
.
executable
,
script
,
str
(
port
),
storage_name
)
+
args
d
=
os
.
environ
.
copy
()
d
[
'PYTHONPATH'
]
=
os
.
pathsep
.
join
(
sys
.
path
)
pid
=
os
.
spawnve
(
os
.
P_NOWAIT
,
sys
.
executable
,
args
,
d
)
pid
=
os
.
spawnve
(
os
.
P_NOWAIT
,
sys
.
executable
,
args
,
os
.
environ
)
return
(
'localhost'
,
port
),
(
'localhost'
,
port
+
1
),
pid
else
:
...
...
@@ -74,6 +78,7 @@ else:
def
close
(
self
):
os
.
write
(
self
.
pipe
,
"done"
)
os
.
close
(
self
.
pipe
)
def
start_zeo_server
(
storage
,
addr
):
rd
,
wr
=
os
.
pipe
()
...
...
@@ -97,6 +102,7 @@ else:
ZEOServerExit
(
rd
)
serv
=
ZEO
.
StorageServer
.
StorageServer
(
addr
,
{
'1'
:
storage
})
asyncore
.
loop
()
os
.
close
(
rd
)
storage
.
close
()
if
isinstance
(
addr
,
types
.
StringType
):
os
.
unlink
(
addr
)
...
...
trunk/src/ZEO/tests/stress.py
0 → 100644
View file @
f3bffbef
"""A ZEO client-server stress test to look for leaks.
The stress test should run in an infinite loop and should involve
multiple connections.
"""
from
__future__
import
nested_scopes
import
ZODB
from
ZEO.ClientStorage
import
ClientStorage
from
ZODB.MappingStorage
import
MappingStorage
from
ZEO.tests
import
forker
from
ZODB.tests
import
MinPO
import
zLOG
import
os
import
random
import
sys
import
types
NUM_TRANSACTIONS_PER_CONN
=
10
NUM_CONNECTIONS
=
10
NUM_ROOTS
=
20
MAX_DEPTH
=
20
MIN_OBJSIZE
=
128
MAX_OBJSIZE
=
2048
def
an_object
():
"""Return an object suitable for a PersistentMapping key"""
size
=
random
.
randrange
(
MIN_OBJSIZE
,
MAX_OBJSIZE
)
if
os
.
path
.
exists
(
"/dev/urandom"
):
f
=
open
(
"/dev/urandom"
)
buf
=
f
.
read
(
size
)
f
.
close
()
return
buf
else
:
f
=
open
(
MinPO
.
__file__
)
l
=
list
(
f
.
read
(
size
))
f
.
close
()
random
.
shuffle
(
l
)
return
""
.
join
(
l
)
def
setup
(
cn
):
"""Initialize the database with some objects"""
root
=
cn
.
root
()
for
i
in
range
(
NUM_ROOTS
):
prev
=
an_object
()
for
j
in
range
(
random
.
randrange
(
1
,
MAX_DEPTH
)):
o
=
MinPO
.
MinPO
(
prev
)
prev
=
o
root
[
an_object
()]
=
o
get_transaction
().
commit
()
cn
.
close
()
def
work
(
cn
):
"""Do some work with a transaction"""
cn
.
sync
()
root
=
cn
.
root
()
obj
=
random
.
choice
(
root
.
values
())
# walk down to the bottom
while
not
isinstance
(
obj
.
value
,
types
.
StringType
):
obj
=
obj
.
value
obj
.
value
=
an_object
()
get_transaction
().
commit
()
def
main
():
# Yuck! Need to cleanup forker so that the API is consistent
# across Unix and Windows, at least if that's possible.
if
os
.
name
==
"nt"
:
zaddr
,
tport
,
pid
=
forker
.
start_zeo_server
(
'MappingStorage'
,
())
def
exitserver
():
import
socket
s
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
s
.
connect
(
tport
)
s
.
close
()
else
:
zaddr
=
''
,
random
.
randrange
(
20000
,
30000
)
pid
,
exitobj
=
forker
.
start_zeo_server
(
MappingStorage
(),
zaddr
)
def
exitserver
():
exitobj
.
close
()
while
1
:
pid
=
start_child
(
zaddr
)
print
"started"
,
pid
os
.
waitpid
(
pid
,
0
)
exitserver
()
def
start_child
(
zaddr
):
pid
=
os
.
fork
()
if
pid
!=
0
:
return
pid
storage
=
ClientStorage
(
zaddr
,
debug
=
1
,
min_disconnect_poll
=
0.5
)
db
=
ZODB
.
DB
(
storage
,
pool_size
=
NUM_CONNECTIONS
)
setup
(
db
.
open
())
conns
=
[]
conn_count
=
0
for
i
in
range
(
NUM_CONNECTIONS
):
c
=
db
.
open
()
c
.
__count
=
0
conns
.
append
(
c
)
conn_count
+=
1
while
conn_count
<
25
:
c
=
random
.
choice
(
conns
)
if
c
.
__count
>
NUM_TRANSACTIONS_PER_CONN
:
conns
.
remove
(
c
)
c
.
close
()
conn_count
+=
1
c
=
db
.
open
()
c
.
__count
=
0
conns
.
append
(
c
)
else
:
c
.
__count
+=
1
work
(
c
)
os
.
_exit
(
0
)
if
__name__
==
"__main__"
:
main
()
trunk/src/ZEO/tests/testZEO.py
View file @
f3bffbef
...
...
@@ -98,8 +98,26 @@ class ZEOTestBase(StorageTestBase.StorageTestBase):
raise
serial
d
[
oid
]
=
serial
return
d
# Some of the ZEO tests depend on the version of FileStorage available
# for the tests. If we run these tests using Zope 2.3, FileStorage
# doesn't support TransactionalUndo.
if
hasattr
(
FileStorage
,
'supportsTransactionalUndo'
):
# XXX Assume that a FileStorage that supports transactional undo
# also supports conflict resolution.
class
VersionDependentTests
(
TransactionalUndoStorage
.
TransactionalUndoStorage
,
TransactionalUndoVersionStorage
.
TransactionalUndoVersionStorage
,
ConflictResolution
.
ConflictResolvingStorage
,
ConflictResolution
.
ConflictResolvingTransUndoStorage
):
pass
else
:
class
VersionDependentTests
:
pass
class
GenericTests
(
ZEOTestBase
,
VersionDependentTests
,
Cache
.
StorageWithCache
,
Cache
.
TransUndoStorageWithCache
,
BasicStorage
.
BasicStorage
,
...
...
@@ -107,10 +125,6 @@ class GenericTests(ZEOTestBase,
RevisionStorage
.
RevisionStorage
,
PackableStorage
.
PackableStorage
,
Synchronization
.
SynchronizedStorage
,
ConflictResolution
.
ConflictResolvingStorage
,
ConflictResolution
.
ConflictResolvingTransUndoStorage
,
TransactionalUndoStorage
.
TransactionalUndoStorage
,
TransactionalUndoVersionStorage
.
TransactionalUndoVersionStorage
,
):
"""An abstract base class for ZEO tests
...
...
@@ -187,7 +201,7 @@ class WindowsGenericTests(GenericTests):
zeo_addr
,
self
.
test_addr
,
self
.
test_pid
=
\
forker
.
start_zeo_server
(
name
,
args
)
storage
=
ZEO
.
ClientStorage
.
ClientStorage
(
zeo_addr
,
debug
=
1
,
min_disconnect_poll
=
0.
5
)
min_disconnect_poll
=
0.
1
)
self
.
_storage
=
PackWaitWrapper
(
storage
)
storage
.
registerDB
(
DummyDB
(),
None
)
...
...
@@ -195,6 +209,7 @@ class WindowsGenericTests(GenericTests):
self
.
_storage
.
close
()
s
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
s
.
connect
(
self
.
test_addr
)
s
.
close
()
# the connection should cause the storage server to die
## os.waitpid(self.test_pid, 0)
time
.
sleep
(
0.5
)
...
...
trunk/src/ZEO/trigger.py
View file @
f3bffbef
...
...
@@ -130,12 +130,16 @@ if os.name == 'posix':
# the main thread is trying to remove some]
def
__init__
(
self
):
r
,
w
=
os
.
pipe
()
r
,
w
=
self
.
_fds
=
os
.
pipe
()
self
.
trigger
=
w
asyncore
.
file_dispatcher
.
__init__
(
self
,
r
)
self
.
lock
=
thread
.
allocate_lock
()
self
.
thunks
=
[]
def
__del__
(
self
):
os
.
close
(
self
.
_fds
[
0
])
os
.
close
(
self
.
_fds
[
1
])
def
__repr__
(
self
):
return
'<select-trigger (pipe) at %x>'
%
id
(
self
)
...
...
trunk/src/ZEO/zrpc.py
View file @
f3bffbef
...
...
@@ -85,7 +85,7 @@
"""Simple rpc mechanisms
"""
__version__
=
"$Revision: 1.
19
$"
[
11
:
-
2
]
__version__
=
"$Revision: 1.
20
$"
[
11
:
-
2
]
from
cPickle
import
loads
import
cPickle
...
...
@@ -166,8 +166,10 @@ class asyncRPC(SizedMessageAsyncConnection):
return
1
def
finishConnect
(
self
,
s
):
if
self
.
__haveMainLoop
:
map
=
None
# use the main loop map
else
:
map
=
{}
# provide a dummy map
if
self
.
__haveMainLoop
:
map
=
None
# use the main loop map
else
:
map
=
{}
# provide a dummy map
SizedMessageAsyncConnection
.
__init__
(
self
,
s
,
''
,
map
)
# we are our own socket map!
...
...
@@ -221,12 +223,21 @@ class asyncRPC(SizedMessageAsyncConnection):
if
c
==
'R'
:
if
r
==
'RN.'
:
return
None
# Common case!
return
loads
(
r
[
1
:])
# If c == 'E', an error occured on the server. In
# this case, the return value is a pickled exception.
# Unpickle it and raise it on the client side. The
# traceback for this exception ends at this method,
# but the real error occurred somewhere in the server
# code. To diagnose the error, look for the real
# traceback in the server's zLOG output.
if
c
==
'E'
:
try
:
r
=
loads
(
r
[
1
:])
except
:
raise
UnUnPickleableError
(
r
[
1
:])
if
type
(
r
)
is
TupleType
:
raise
r
[
0
],
r
[
1
]
raise
r
if
type
(
r
)
is
TupleType
:
raise
r
[
0
],
r
[
1
]
# see server log for real traceback
raise
r
oob
=
self
.
_outOfBand
if
oob
is
not
None
:
r
=
r
[
1
:]
...
...
@@ -260,8 +271,10 @@ class asyncRPC(SizedMessageAsyncConnection):
def
message_input
(
self
,
m
):
if
self
.
_debug
:
md
=
`m`
if
len
(
m
)
>
60
:
md
=
md
[:
60
]
+
' ...'
if
len
(
m
)
>
60
:
md
=
repr
(
m
[:
60
])
+
' ...'
else
:
md
=
repr
(
m
)
LOG
(
self
.
_debug
,
TRACE
,
'message_input %s'
%
md
)
c
=
m
[:
1
]
...
...
@@ -292,6 +305,7 @@ class asyncRPC(SizedMessageAsyncConnection):
self
.
__Wakeup
(
lambda
self
=
self
:
self
.
close
())
else
:
self
.
close
()
self
.
_outOfBand
=
None
self
.
__closed
=
1
def
close
(
self
):
...
...
@@ -299,6 +313,8 @@ class asyncRPC(SizedMessageAsyncConnection):
self
.
aq_parent
.
notifyDisconnected
(
self
)
# causes read call to raise last exception, which should be
# the socket error that caused asyncore to close the socket.
self
.
__r
=
'E'
+
dump
(
sys
.
exc_info
()[:
2
],
1
)
try
:
self
.
__lr
()
except
:
pass
self
.
__r
=
'E'
+
dump
(
sys
.
exc_info
()[:
2
],
1
)
try
:
self
.
__lr
()
except
:
pass
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment