Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Z
ZODB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Kirill Smelkov
ZODB
Commits
0431ca7b
Commit
0431ca7b
authored
Jan 11, 2002
by
Jeremy Hylton
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Undo merge of ZEO-ZRPC-Dev branch into the trunk (for now).
parent
1dbb6eed
Changes
13
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
13 changed files
with
1235 additions
and
2098 deletions
+1235
-2098
trunk/src/ZEO/ClientCache.py
trunk/src/ZEO/ClientCache.py
+55
-56
trunk/src/ZEO/ClientStorage.py
trunk/src/ZEO/ClientStorage.py
+452
-423
trunk/src/ZEO/ClientStub.py
trunk/src/ZEO/ClientStub.py
+0
-25
trunk/src/ZEO/Exceptions.py
trunk/src/ZEO/Exceptions.py
+0
-5
trunk/src/ZEO/ServerStub.py
trunk/src/ZEO/ServerStub.py
+0
-108
trunk/src/ZEO/StorageServer.py
trunk/src/ZEO/StorageServer.py
+443
-310
trunk/src/ZEO/TransactionBuffer.py
trunk/src/ZEO/TransactionBuffer.py
+0
-68
trunk/src/ZEO/smac.py
trunk/src/ZEO/smac.py
+89
-103
trunk/src/ZEO/start.py
trunk/src/ZEO/start.py
+94
-82
trunk/src/ZEO/tests/forker.py
trunk/src/ZEO/tests/forker.py
+13
-35
trunk/src/ZEO/tests/testTransactionBuffer.py
trunk/src/ZEO/tests/testTransactionBuffer.py
+0
-64
trunk/src/ZEO/zrpc/smac.py
trunk/src/ZEO/zrpc/smac.py
+89
-103
trunk/src/ZEO/zrpc2.py
trunk/src/ZEO/zrpc2.py
+0
-716
No files found.
trunk/src/ZEO/ClientCache.py
View file @
0431ca7b
...
...
@@ -144,20 +144,18 @@ file 0 and file 1.
"""
__version__
=
"$Revision: 1.
19
$"
[
11
:
-
2
]
__version__
=
"$Revision: 1.
20
$"
[
11
:
-
2
]
import
os
,
tempfile
from
struct
import
pack
,
unpack
from
thread
import
allocate_lock
import
sys
import
zLOG
def
log
(
msg
,
level
=
zLOG
.
INFO
):
zLOG
.
LOG
(
"ZEC"
,
level
,
msg
)
magic
=
'ZEC0'
def
LOG
(
msg
,
level
=
zLOG
.
BLATHER
):
zLOG
.
LOG
(
"ZEC"
,
level
,
msg
)
class
ClientCache
:
def
__init__
(
self
,
storage
=
''
,
size
=
20000000
,
client
=
None
,
var
=
None
):
...
...
@@ -213,14 +211,16 @@ class ClientCache:
f
[
0
].
write
(
magic
)
current
=
0
log
(
"cache opened. current = %s"
%
current
)
self
.
_limit
=
size
/
2
self
.
_current
=
current
def
close
(
self
):
try
:
self
.
_f
[
self
.
_current
].
close
()
except
(
os
.
error
,
ValueError
):
pass
def
open
(
self
):
# XXX open is overloaded to perform two tasks for
# optimization reasons
self
.
_acquire
()
try
:
self
.
_index
=
index
=
{}
...
...
@@ -235,19 +235,6 @@ class ClientCache:
return
serial
.
items
()
finally
:
self
.
_release
()
def
close
(
self
):
for
f
in
self
.
_f
:
if
f
is
not
None
:
f
.
close
()
def
verify
(
self
,
verifyFunc
):
"""Call the verifyFunc on every object in the cache.
verifyFunc(oid, serialno, version)
"""
for
oid
,
(
s
,
vs
)
in
self
.
open
():
verifyFunc
(
oid
,
s
,
vs
)
def
invalidate
(
self
,
oid
,
version
):
self
.
_acquire
()
try
:
...
...
@@ -386,6 +373,8 @@ class ClientCache:
self
.
_f
[
current
]
=
open
(
self
.
_p
[
current
],
'w+b'
)
else
:
# Temporary cache file:
if
self
.
_f
[
current
]
is
not
None
:
self
.
_f
[
current
].
close
()
self
.
_f
[
current
]
=
tempfile
.
TemporaryFile
(
suffix
=
'.zec'
)
self
.
_f
[
current
].
write
(
magic
)
self
.
_pos
=
pos
=
4
...
...
@@ -394,57 +383,55 @@ class ClientCache:
def
store
(
self
,
oid
,
p
,
s
,
version
,
pv
,
sv
):
self
.
_acquire
()
try
:
self
.
_store
(
oid
,
p
,
s
,
version
,
pv
,
sv
)
finally
:
self
.
_release
()
try
:
self
.
_store
(
oid
,
p
,
s
,
version
,
pv
,
sv
)
finally
:
self
.
_release
()
def
_store
(
self
,
oid
,
p
,
s
,
version
,
pv
,
sv
):
if
not
s
:
p
=
''
s
=
'
\
0
\
0
\
0
\
0
\
0
\
0
\
0
\
0
'
tlen
=
31
+
len
(
p
)
p
=
''
s
=
'
\
0
\
0
\
0
\
0
\
0
\
0
\
0
\
0
'
tlen
=
31
+
len
(
p
)
if
version
:
tlen
=
tlen
+
len
(
version
)
+
12
+
len
(
pv
)
vlen
=
len
(
version
)
tlen
=
tlen
+
len
(
version
)
+
12
+
len
(
pv
)
vlen
=
len
(
version
)
else
:
vlen
=
0
vlen
=
0
stlen
=
pack
(
">I"
,
tlen
)
# accumulate various data to write into a list
l
=
[
oid
,
'v'
,
stlen
,
pack
(
">HI"
,
vlen
,
len
(
p
)),
s
]
if
p
:
l
.
append
(
p
)
pos
=
self
.
_pos
current
=
self
.
_current
f
=
self
.
_f
[
current
]
f
.
seek
(
pos
)
stlen
=
pack
(
">I"
,
tlen
)
write
=
f
.
write
write
(
oid
+
'v'
+
stlen
+
pack
(
">HI"
,
vlen
,
len
(
p
))
+
s
)
if
p
:
write
(
p
)
if
version
:
l
.
extend
([
version
,
pack
(
">I"
,
len
(
pv
)),
pv
,
sv
])
l
.
append
(
stlen
)
f
=
self
.
_f
[
self
.
_current
]
f
.
seek
(
self
.
_pos
)
f
.
write
(
""
.
join
(
l
))
if
self
.
_current
:
self
.
_index
[
oid
]
=
-
self
.
_pos
else
:
self
.
_index
[
oid
]
=
self
.
_pos
write
(
version
)
write
(
pack
(
">I"
,
len
(
pv
)))
write
(
pv
)
write
(
sv
)
write
(
stlen
)
if
current
:
self
.
_index
[
oid
]
=-
pos
else
:
self
.
_index
[
oid
]
=
pos
self
.
_pos
+=
tlen
self
.
_pos
=
pos
+
tlen
def
read_index
(
index
,
serial
,
f
,
current
):
LOG
(
"read_index(%s)"
%
f
.
name
)
seek
=
f
.
seek
read
=
f
.
read
pos
=
4
seek
(
0
,
2
)
size
=
f
.
tell
()
while
1
:
f
.
seek
(
pos
)
seek
(
pos
)
h
=
read
(
27
)
if
len
(
h
)
==
27
and
h
[
8
]
in
'vni'
:
tlen
,
vlen
,
dlen
=
unpack
(
">iHi"
,
h
[
9
:
19
])
else
:
tlen
=-
1
else
:
break
if
tlen
<=
0
or
vlen
<
0
or
dlen
<
0
or
vlen
+
dlen
>
tlen
:
break
...
...
@@ -479,3 +466,15 @@ def read_index(index, serial, f, current):
except
:
pass
return
pos
def
main
(
files
):
for
file
in
files
:
print
file
index
=
{}
serial
=
{}
read_index
(
index
,
serial
,
open
(
file
),
0
)
print
index
.
keys
()
if
__name__
==
"__main__"
:
import
sys
main
(
sys
.
argv
[
1
:])
trunk/src/ZEO/ClientStorage.py
View file @
0431ca7b
This diff is collapsed.
Click to expand it.
trunk/src/ZEO/ClientStub.py
deleted
100644 → 0
View file @
1dbb6eed
"""Stub for interface exported by ClientStorage"""
class
ClientStorage
:
def
__init__
(
self
,
rpc
):
self
.
rpc
=
rpc
def
beginVerify
(
self
):
self
.
rpc
.
callAsync
(
'begin'
)
# XXX what's the difference between these two?
def
invalidate
(
self
,
args
):
self
.
rpc
.
callAsync
(
'invalidate'
,
args
)
def
Invalidate
(
self
,
args
):
self
.
rpc
.
callAsync
(
'Invalidate'
,
args
)
def
endVerify
(
self
):
self
.
rpc
.
callAsync
(
'end'
)
def
serialno
(
self
,
arg
):
self
.
rpc
.
callAsync
(
'serialno'
,
arg
)
def
info
(
self
,
arg
):
self
.
rpc
.
callAsync
(
'info'
,
arg
)
trunk/src/ZEO/Exceptions.py
deleted
100644 → 0
View file @
1dbb6eed
"""Exceptions for ZEO."""
class
Disconnected
(
Exception
):
"""Exception raised when a ZEO client is disconnected from the
ZEO server."""
trunk/src/ZEO/ServerStub.py
deleted
100644 → 0
View file @
1dbb6eed
"""Stub for interface exposed by StorageServer"""
class
StorageServer
:
def
__init__
(
self
,
rpc
):
self
.
rpc
=
rpc
def
register
(
self
,
storage_name
,
read_only
):
self
.
rpc
.
call
(
'register'
,
storage_name
,
read_only
)
def
get_info
(
self
):
return
self
.
rpc
.
call
(
'get_info'
)
def
get_size_info
(
self
):
return
self
.
rpc
.
call
(
'get_size_info'
)
def
beginZeoVerify
(
self
):
self
.
rpc
.
callAsync
(
'beginZeoVerify'
)
def
zeoVerify
(
self
,
oid
,
s
,
sv
):
self
.
rpc
.
callAsync
(
'zeoVerify'
,
oid
,
s
,
sv
)
def
endZeoVerify
(
self
):
self
.
rpc
.
callAsync
(
'endZeoVerify'
)
def
new_oids
(
self
,
n
=
None
):
if
n
is
None
:
return
self
.
rpc
.
call
(
'new_oids'
)
else
:
return
self
.
rpc
.
call
(
'new_oids'
,
n
)
def
pack
(
self
,
t
,
wait
=
None
):
if
wait
is
None
:
self
.
rpc
.
call
(
'pack'
,
t
)
else
:
self
.
rpc
.
call
(
'pack'
,
t
,
wait
)
def
zeoLoad
(
self
,
oid
):
return
self
.
rpc
.
call
(
'zeoLoad'
,
oid
)
def
storea
(
self
,
oid
,
serial
,
data
,
version
,
id
):
self
.
rpc
.
callAsync
(
'storea'
,
oid
,
serial
,
data
,
version
,
id
)
def
tpc_begin
(
self
,
id
,
user
,
descr
,
ext
):
return
self
.
rpc
.
call
(
'tpc_begin'
,
id
,
user
,
descr
,
ext
)
def
vote
(
self
,
trans_id
):
return
self
.
rpc
.
call
(
'vote'
,
trans_id
)
def
tpc_finish
(
self
,
id
):
return
self
.
rpc
.
call
(
'tpc_finish'
,
id
)
def
tpc_abort
(
self
,
id
):
self
.
rpc
.
callAsync
(
'tpc_abort'
,
id
)
def
abortVersion
(
self
,
src
,
id
):
return
self
.
rpc
.
call
(
'abortVersion'
,
src
,
id
)
def
commitVersion
(
self
,
src
,
dest
,
id
):
return
self
.
rpc
.
call
(
'commitVersion'
,
src
,
dest
,
id
)
def
history
(
self
,
oid
,
version
,
length
=
None
):
if
length
is
not
None
:
return
self
.
rpc
.
call
(
'history'
,
oid
,
version
)
else
:
return
self
.
rpc
.
call
(
'history'
,
oid
,
version
,
length
)
def
load
(
self
,
oid
,
version
):
return
self
.
rpc
.
call
(
'load'
,
oid
,
version
)
def
loadSerial
(
self
,
oid
,
serial
):
return
self
.
rpc
.
call
(
'loadSerial'
,
oid
,
serial
)
def
modifiedInVersion
(
self
,
oid
):
return
self
.
rpc
.
call
(
'modifiedInVersion'
,
oid
)
def
new_oid
(
self
,
last
=
None
):
if
last
is
None
:
return
self
.
rpc
.
call
(
'new_oid'
)
else
:
return
self
.
rpc
.
call
(
'new_oid'
,
last
)
def
store
(
self
,
oid
,
serial
,
data
,
version
,
trans
):
return
self
.
rpc
.
call
(
'store'
,
oid
,
serial
,
data
,
version
,
trans
)
def
transactionalUndo
(
self
,
trans_id
,
trans
):
return
self
.
rpc
.
call
(
'transactionalUndo'
,
trans_id
,
trans
)
def
undo
(
self
,
trans_id
):
return
self
.
rpc
.
call
(
'undo'
,
trans_id
)
def
undoLog
(
self
,
first
,
last
):
# XXX filter not allowed across RPC
return
self
.
rpc
.
call
(
'undoLog'
,
first
,
last
)
def
undoInfo
(
self
,
first
,
last
,
spec
):
return
self
.
rpc
.
call
(
'undoInfo'
,
first
,
last
,
spec
)
def
versionEmpty
(
self
,
vers
):
return
self
.
rpc
.
call
(
'versionEmpty'
,
vers
)
def
versions
(
self
,
max
=
None
):
if
max
is
None
:
return
self
.
rpc
.
call
(
'versions'
)
else
:
return
self
.
rpc
.
call
(
'versions'
,
max
)
trunk/src/ZEO/StorageServer.py
View file @
0431ca7b
This diff is collapsed.
Click to expand it.
trunk/src/ZEO/TransactionBuffer.py
deleted
100644 → 0
View file @
1dbb6eed
"""A TransactionBuffer store transaction updates until commit or abort.
A transaction may generate enough data that it is not practical to
always hold pending updates in memory. Instead, a TransactionBuffer
is used to store the data until a commit or abort.
"""
# XXX Figure out what a sensible storage format is
# XXX A faster implementation might store trans data in memory until
# it reaches a certain size.
import
tempfile
import
cPickle
class
TransactionBuffer
:
def
__init__
(
self
):
self
.
file
=
tempfile
.
TemporaryFile
()
self
.
count
=
0
self
.
size
=
0
# It's safe to use a fast pickler because the only objects
# stored are builtin types -- strings or None.
self
.
pickler
=
cPickle
.
Pickler
(
self
.
file
,
1
)
self
.
pickler
.
fast
=
1
def
store
(
self
,
oid
,
version
,
data
):
"""Store oid, version, data for later retrieval"""
self
.
pickler
.
dump
((
oid
,
version
,
data
))
self
.
count
+=
1
# Estimate per-record cache size
self
.
size
=
self
.
size
+
len
(
data
)
+
(
27
+
12
)
if
version
:
self
.
size
=
self
.
size
+
len
(
version
)
+
4
def
invalidate
(
self
,
oid
,
version
):
self
.
pickler
.
dump
((
oid
,
version
,
None
))
self
.
count
+=
1
def
clear
(
self
):
"""Mark the buffer as empty"""
self
.
file
.
seek
(
0
)
self
.
count
=
0
self
.
size
=
0
# XXX unchecked constraints:
# 1. can't call store() after begin_iterate()
# 2. must call clear() after iteration finishes
def
begin_iterate
(
self
):
"""Move the file pointer in advance of iteration"""
self
.
file
.
flush
()
self
.
file
.
seek
(
0
)
self
.
unpickler
=
cPickle
.
Unpickler
(
self
.
file
)
def
next
(
self
):
"""Return next tuple of data or None if EOF"""
if
self
.
count
==
0
:
del
self
.
unpickler
return
None
oid_ver_data
=
self
.
unpickler
.
load
()
self
.
count
-=
1
return
oid_ver_data
def
get_size
(
self
):
"""Return size of data stored in buffer (just a hint)."""
return
self
.
size
trunk/src/ZEO/smac.py
View file @
0431ca7b
...
...
@@ -85,14 +85,11 @@
"""Sized message async connections
"""
__version__
=
"$Revision: 1.12 $"
[
11
:
-
2
]
import
asyncore
,
struct
from
Exceptions
import
Disconnected
from
zLOG
import
LOG
,
TRACE
,
ERROR
,
INFO
,
BLATHER
from
types
import
StringType
__version__
=
"$Revision: 1.13 $"
[
11
:
-
2
]
import
asyncore
,
string
,
struct
,
zLOG
,
sys
,
Acquisition
import
socket
,
errno
from
zLOG
import
LOG
,
TRACE
,
ERROR
,
INFO
# Use the dictionary to make sure we get the minimum number of errno
# entries. We expect that EWOULDBLOCK == EAGAIN on most systems --
...
...
@@ -112,101 +109,81 @@ tmp_dict = {errno.EAGAIN: 0,
expected_socket_write_errors
=
tuple
(
tmp_dict
.
keys
())
del
tmp_dict
class
SizedMessageAsyncConnection
(
asyncore
.
dispatcher
):
__super_init
=
asyncore
.
dispatcher
.
__init__
__super_close
=
asyncore
.
dispatcher
.
close
__closed
=
1
# Marker indicating that we're closed
class
SizedMessageAsyncConnection
(
Acquisition
.
Explicit
,
asyncore
.
dispatcher
):
socket
=
None
# to outwit Sam's getattr
__append
=
None
# Marker indicating that we're closed
READ_SIZE
=
8096
socket
=
None
# to outwit Sam's getattr
def
__init__
(
self
,
sock
,
addr
,
map
=
None
,
debug
=
None
):
self
.
__super_init
(
sock
,
map
)
self
.
addr
=
addr
SizedMessageAsyncConnection
.
inheritedAttribute
(
'__init__'
)(
self
,
sock
,
map
)
self
.
addr
=
addr
if
debug
is
not
None
:
self
.
_debug
=
debug
self
.
_debug
=
debug
elif
not
hasattr
(
self
,
'_debug'
):
self
.
_debug
=
__debug__
and
'smac'
self
.
__state
=
None
self
.
__inp
=
None
# None, a single String, or a list
self
.
__input_len
=
0
self
.
__msg_size
=
4
self
.
__output
=
[]
self
.
__closed
=
None
# XXX avoid expensive getattr calls?
def
__nonzero__
(
self
):
return
1
def
handle_read
(
self
):
# Use a single __inp buffer and integer indexes to make this
# fast.
self
.
_debug
=
__debug__
and
'smac'
self
.
__state
=
None
self
.
__inp
=
None
self
.
__inpl
=
0
self
.
__l
=
4
self
.
__output
=
output
=
[]
self
.
__append
=
output
.
append
self
.
__pop
=
output
.
pop
def
handle_read
(
self
,
join
=
string
.
join
,
StringType
=
type
(
''
),
_type
=
type
,
_None
=
None
):
try
:
d
=
self
.
recv
(
8096
)
except
socket
.
error
,
err
:
if
err
[
0
]
in
expected_socket_read_errors
:
return
raise
if
not
d
:
return
input_len
=
self
.
__input_len
+
len
(
d
)
msg_size
=
self
.
__msg_size
state
=
self
.
__state
inp
=
self
.
__inp
if
msg_size
>
input_len
:
if
inp
is
None
:
self
.
__inp
=
d
elif
type
(
self
.
__inp
)
is
StringType
:
self
.
__inp
=
[
self
.
__inp
,
d
]
else
:
self
.
__inp
.
append
(
d
)
self
.
__input_len
=
input_len
return
# keep waiting for more input
# load all previous input and d into single string inp
if
isinstance
(
inp
,
StringType
):
inp
=
inp
+
d
elif
inp
is
None
:
inp
=
d
if
not
d
:
return
inp
=
self
.
__inp
if
inp
is
_None
:
inp
=
d
elif
_type
(
inp
)
is
StringType
:
inp
=
[
inp
,
d
]
else
:
inp
.
append
(
d
)
inp
=
""
.
join
(
inp
)
offset
=
0
while
(
offset
+
msg_size
)
<=
input_len
:
msg
=
inp
[
offset
:
offset
+
msg_size
]
offset
=
offset
+
msg_size
if
state
is
None
:
# waiting for message
msg_size
=
struct
.
unpack
(
">i"
,
msg
)[
0
]
state
=
1
inpl
=
self
.
__inpl
+
len
(
d
)
l
=
self
.
__l
while
1
:
if
l
<=
inpl
:
# Woo hoo, we have enough data
if
_type
(
inp
)
is
not
StringType
:
inp
=
join
(
inp
,
''
)
d
=
inp
[:
l
]
inp
=
inp
[
l
:]
inpl
=
inpl
-
l
if
self
.
__state
is
_None
:
# waiting for message
l
=
struct
.
unpack
(
">i"
,
d
)[
0
]
self
.
__state
=
1
else
:
l
=
4
self
.
__state
=
_None
self
.
message_input
(
d
)
else
:
msg_size
=
4
state
=
None
self
.
message_input
(
msg
)
break
# not enough data
self
.
__l
=
l
self
.
__inp
=
inp
self
.
__inpl
=
inpl
self
.
__state
=
state
self
.
__msg_size
=
msg_size
self
.
__inp
=
inp
[
offset
:]
self
.
__input_len
=
input_len
-
offset
def
readable
(
self
):
return
1
def
writable
(
self
):
if
len
(
self
.
__output
)
==
0
:
return
0
else
:
return
1
def
readable
(
self
):
return
1
def
writable
(
self
):
return
not
not
self
.
__output
def
handle_write
(
self
):
output
=
self
.
__output
output
=
self
.
__output
while
output
:
v
=
output
[
0
]
v
=
output
[
0
]
try
:
n
=
self
.
send
(
v
)
except
socket
.
error
,
err
:
...
...
@@ -214,33 +191,42 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
break
# we couldn't write anything
raise
if
n
<
len
(
v
):
output
[
0
]
=
v
[
n
:]
output
[
0
]
=
v
[
n
:]
break
# we can't write any more
else
:
del
output
[
0
]
#break # waaa
def
handle_close
(
self
):
self
.
close
()
def
message_output
(
self
,
message
):
if
__debug__
:
if
self
.
_debug
:
if
len
(
message
)
>
40
:
m
=
message
[:
40
]
+
' ...'
else
:
m
=
message
LOG
(
self
.
_debug
,
TRACE
,
'message_output %s'
%
`m`
)
if
self
.
__closed
is
not
None
:
raise
Disconnected
,
(
"This action is temporarily unavailable."
"<p>"
)
# do two separate appends to avoid copying the message string
self
.
__output
.
append
(
struct
.
pack
(
">i"
,
len
(
message
)))
self
.
__output
.
append
(
message
)
def
message_output
(
self
,
message
,
pack
=
struct
.
pack
,
len
=
len
):
if
self
.
_debug
:
if
len
(
message
)
>
40
:
m
=
message
[:
40
]
+
' ...'
else
:
m
=
message
LOG
(
self
.
_debug
,
TRACE
,
'message_output %s'
%
`m`
)
append
=
self
.
__append
if
append
is
None
:
raise
Disconnected
(
"This action is temporarily unavailable.<p>"
)
append
(
pack
(
">i"
,
len
(
message
))
+
message
)
def
log_info
(
self
,
message
,
type
=
'info'
):
if
type
==
'error'
:
type
=
ERROR
else
:
type
=
INFO
LOG
(
'ZEO'
,
type
,
message
)
log
=
log_info
def
close
(
self
):
if
self
.
__closed
is
None
:
self
.
__closed
=
1
self
.
__super_close
()
if
self
.
__append
is
not
None
:
self
.
__append
=
None
SizedMessageAsyncConnection
.
inheritedAttribute
(
'close'
)(
self
)
class
Disconnected
(
Exception
):
"""The client has become disconnected from the server
"""
trunk/src/ZEO/start.py
View file @
0431ca7b
...
...
@@ -86,13 +86,10 @@
"""Start the server storage.
"""
__version__
=
"$Revision: 1.2
7
$"
[
11
:
-
2
]
__version__
=
"$Revision: 1.2
8
$"
[
11
:
-
2
]
import
sys
,
os
,
getopt
,
string
import
StorageServer
import
asyncore
def
directory
(
p
,
n
=
1
):
d
=
p
while
n
:
...
...
@@ -118,11 +115,9 @@ def get_storage(m, n, cache={}):
def
main
(
argv
):
me
=
argv
[
0
]
sys
.
path
[:]
==
filter
(
None
,
sys
.
path
)
sys
.
path
.
insert
(
0
,
directory
(
me
,
2
))
# XXX hack for profiling support
global
unix
,
storages
,
zeo_pid
,
asyncore
args
=
[]
last
=
''
for
a
in
argv
[
1
:]:
...
...
@@ -135,13 +130,25 @@ def main(argv):
args
.
append
(
a
)
last
=
a
INSTANCE_HOME
=
os
.
environ
.
get
(
'INSTANCE_HOME'
,
directory
(
me
,
4
))
if
os
.
environ
.
has_key
(
'INSTANCE_HOME'
):
INSTANCE_HOME
=
os
.
environ
[
'INSTANCE_HOME'
]
elif
os
.
path
.
isdir
(
os
.
path
.
join
(
directory
(
me
,
4
),
'var'
)):
INSTANCE_HOME
=
directory
(
me
,
4
)
else
:
INSTANCE_HOME
=
os
.
getcwd
()
if
os
.
path
.
isdir
(
os
.
path
.
join
(
INSTANCE_HOME
,
'var'
)):
var
=
os
.
path
.
join
(
INSTANCE_HOME
,
'var'
)
else
:
var
=
INSTANCE_HOME
zeo_pid
=
os
.
environ
.
get
(
'ZEO_SERVER_PID'
,
os
.
path
.
join
(
INSTANCE_HOME
,
'var'
,
'ZEO_SERVER.pid'
)
os
.
path
.
join
(
var
,
'ZEO_SERVER.pid'
)
)
fs
=
os
.
path
.
join
(
INSTANCE_HOME
,
'var'
,
'Data.fs'
)
opts
,
args
=
getopt
.
getopt
(
args
,
'p:Ddh:U:sS:u:'
)
fs
=
os
.
path
.
join
(
var
,
'Data.fs'
)
usage
=
"""%s [options] [filename]
...
...
@@ -149,14 +156,17 @@ def main(argv):
-D -- Run in debug mode
-d -- Generate detailed debug logging without running
in the foreground.
-U -- Unix-domain socket file to listen on
-u username or uid number
The username to run the ZEO server as. You may want to run
the ZEO server as 'nobody' or some other user with limited
resouces. The only works under Unix, and if
ZServer is
started by root.
resouces. The only works under Unix, and if
the storage
s
erver is s
tarted by root.
-p port -- port to listen on
...
...
@@ -179,42 +189,23 @@ def main(argv):
attr_name -- This is the name to which the storage object
is assigned in the module.
-P file -- Run under profile and dump output to file. Implies the
-s flag.
if no file name is specified, then %s is used.
"""
%
(
me
,
fs
)
try
:
opts
,
args
=
getopt
.
getopt
(
args
,
'p:Dh:U:sS:u:P:'
)
except
getopt
.
error
,
msg
:
print
usage
print
msg
sys
.
exit
(
1
)
port
=
None
debug
=
0
debug
=
detailed
=
0
host
=
''
unix
=
None
Z
=
1
UID
=
'nobody'
prof
=
None
for
o
,
v
in
opts
:
if
o
==
'-p'
:
port
=
string
.
atoi
(
v
)
elif
o
==
'-h'
:
host
=
v
elif
o
==
'-U'
:
unix
=
v
elif
o
==
'-u'
:
UID
=
v
elif
o
==
'-D'
:
debug
=
1
elif
o
==
'-d'
:
detailed
=
1
elif
o
==
'-s'
:
Z
=
0
elif
o
==
'-P'
:
prof
=
v
if
prof
:
Z
=
0
try
:
from
ZServer.medusa
import
asyncore
sys
.
modules
[
'asyncore'
]
=
asyncore
except
:
pass
if
port
is
None
and
unix
is
None
:
print
usage
...
...
@@ -228,9 +219,10 @@ def main(argv):
sys
.
exit
(
1
)
fs
=
args
[
0
]
__builtins__
.
__debug__
=
debug
if
debug
:
os
.
environ
[
'Z_DEBUG_MODE'
]
=
'1'
if
detailed
:
os
.
environ
[
'STUPID_LOG_SEVERITY'
]
=
'-99999'
from
zLOG
import
LOG
,
INFO
,
ERROR
# Try to set uid to "-u" -provided uid.
...
...
@@ -271,54 +263,71 @@ def main(argv):
import
zdaemon
zdaemon
.
run
(
sys
.
argv
,
''
)
storages
=
{}
for
o
,
v
in
opts
:
if
o
==
'-S'
:
n
,
m
=
string
.
split
(
v
,
'='
)
if
string
.
find
(
m
,
':'
):
# we got an attribute name
m
,
a
=
string
.
split
(
m
,
':'
)
else
:
# attribute name must be same as storage name
a
=
n
storages
[
n
]
=
get_storage
(
m
,
a
)
try
:
if
not
storages
:
import
ZODB.FileStorage
storages
[
'1'
]
=
ZODB
.
FileStorage
.
FileStorage
(
fs
)
import
ZEO.StorageServer
,
asyncore
storages
=
{}
for
o
,
v
in
opts
:
if
o
==
'-S'
:
n
,
m
=
string
.
split
(
v
,
'='
)
if
string
.
find
(
m
,
':'
):
# we got an attribute name
m
,
a
=
string
.
split
(
m
,
':'
)
else
:
# attribute name must be same as storage name
a
=
n
storages
[
n
]
=
get_storage
(
m
,
a
)
if
not
storages
:
import
ZODB.FileStorage
storages
[
'1'
]
=
ZODB
.
FileStorage
.
FileStorage
(
fs
)
# Try to set up a signal handler
try
:
import
signal
signal
.
signal
(
signal
.
SIGTERM
,
lambda
sig
,
frame
,
s
=
storages
:
shutdown
(
s
)
)
signal
.
signal
(
signal
.
SIGINT
,
lambda
sig
,
frame
,
s
=
storages
:
shutdown
(
s
,
0
)
)
try
:
signal
.
signal
(
signal
.
SIGHUP
,
rotate_logs_handler
)
except
:
pass
except
:
pass
items
=
storages
.
items
()
items
.
sort
()
for
kv
in
items
:
LOG
(
'ZEO Server'
,
INFO
,
'Serving %s:
\
t
%s'
%
kv
)
if
not
unix
:
unix
=
host
,
port
ZEO
.
StorageServer
.
StorageServer
(
unix
,
storages
)
try
:
ppid
,
pid
=
os
.
getppid
(),
os
.
getpid
()
except
:
pass
# getpid not supported
else
:
open
(
zeo_pid
,
'w'
).
write
(
"%s %s"
%
(
ppid
,
pid
))
except
:
# Log startup exception and tell zdaemon not to restart us.
info
=
sys
.
exc_info
()
try
:
import
zLOG
zLOG
.
LOG
(
"z2"
,
zLOG
.
PANIC
,
"Startup exception"
,
error
=
info
)
except
:
pass
import
traceback
apply
(
traceback
.
print_exception
,
info
)
sys
.
exit
(
0
)
asyncore
.
loop
()
# Try to set up a signal handler
try
:
import
signal
signal
.
signal
(
signal
.
SIGTERM
,
lambda
sig
,
frame
,
s
=
storages
:
shutdown
(
s
)
)
signal
.
signal
(
signal
.
SIGINT
,
lambda
sig
,
frame
,
s
=
storages
:
shutdown
(
s
,
0
)
)
signal
.
signal
(
signal
.
SIGHUP
,
rotate_logs_handler
)
finally
:
pass
items
=
storages
.
items
()
items
.
sort
()
for
kv
in
items
:
LOG
(
'ZEO Server'
,
INFO
,
'Serving %s:
\
t
%s'
%
kv
)
if
not
unix
:
unix
=
host
,
port
if
prof
:
cmds
=
\
"StorageServer.StorageServer(unix, storages);"
\
'open(zeo_pid,"w").write("%s %s" % (os.getppid(), os.getpid()));'
\
"asyncore.loop()"
import
profile
profile
.
run
(
cmds
,
prof
)
else
:
StorageServer
.
StorageServer
(
unix
,
storages
)
open
(
zeo_pid
,
'w'
).
write
(
"%s %s"
%
(
os
.
getppid
(),
os
.
getpid
()))
asyncore
.
loop
()
def
rotate_logs
():
import
zLOG
...
...
@@ -326,7 +335,10 @@ def rotate_logs():
zLOG
.
log_write
.
reinitialize
()
else
:
# Hm, lets at least try to take care of the stupid logger:
zLOG
.
_stupid_dest
=
None
if
hasattr
(
zLOG
,
'_set_stupid_dest'
):
zLOG
.
_set_stupid_dest
(
None
)
else
:
zLOG
.
_stupid_dest
=
None
def
rotate_logs_handler
(
signum
,
frame
):
rotate_logs
()
...
...
@@ -347,7 +359,7 @@ def shutdown(storages, die=1):
for
storage
in
storages
.
values
():
try
:
storage
.
close
()
finally
:
pass
except
:
pass
try
:
from
zLOG
import
LOG
,
INFO
...
...
trunk/src/ZEO/tests/forker.py
View file @
0431ca7b
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL). A copy of the ZPL should accompany this
# distribution. THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL
# EXPRESS OR IMPLIED WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST
# INFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE.
"""Library for forking storage server and connecting client storage"""
import
asyncore
import
os
import
profile
import
random
import
socket
import
sys
import
traceback
import
types
import
ZEO.ClientStorage
,
ZEO
.
StorageServer
# Change value of PROFILE to enable server-side profiling
PROFILE
=
0
if
PROFILE
:
import
hotshot
def
get_port
():
"""Return a port that is not in use.
...
...
@@ -78,11 +66,9 @@ else:
buf
=
self
.
recv
(
4
)
if
buf
:
assert
buf
==
"done"
server
.
close_server
()
asyncore
.
socket_map
.
clear
()
def
handle_close
(
self
):
server
.
close_server
()
asyncore
.
socket_map
.
clear
()
class
ZEOClientExit
:
...
...
@@ -91,27 +77,20 @@ else:
self
.
pipe
=
pipe
def
close
(
self
):
try
:
os
.
write
(
self
.
pipe
,
"done"
)
os
.
close
(
self
.
pipe
)
except
os
.
error
:
pass
os
.
write
(
self
.
pipe
,
"done"
)
os
.
close
(
self
.
pipe
)
def
start_zeo_server
(
storage
,
addr
):
rd
,
wr
=
os
.
pipe
()
pid
=
os
.
fork
()
if
pid
==
0
:
try
:
if
PROFILE
:
p
=
hotshot
.
Profile
(
"stats.s.%d"
%
os
.
getpid
())
p
.
runctx
(
"run_server(storage, addr, rd, wr)"
,
globals
(),
locals
())
p
.
close
()
else
:
run_server
(
storage
,
addr
,
rd
,
wr
)
except
:
print
"Exception in ZEO server process"
traceback
.
print_exc
()
if
PROFILE
:
p
=
profile
.
Profile
()
p
.
runctx
(
"run_server(storage, addr, rd, wr)"
,
globals
(),
locals
())
p
.
dump_stats
(
"stats.s.%d"
%
os
.
getpid
())
else
:
run_server
(
storage
,
addr
,
rd
,
wr
)
os
.
_exit
(
0
)
else
:
os
.
close
(
rd
)
...
...
@@ -119,11 +98,11 @@ else:
def
run_server
(
storage
,
addr
,
rd
,
wr
):
# in the child, run the storage server
global
server
os
.
close
(
wr
)
ZEOServerExit
(
rd
)
serv
er
=
ZEO
.
StorageServer
.
StorageServer
(
addr
,
{
'1'
:
storage
})
serv
=
ZEO
.
StorageServer
.
StorageServer
(
addr
,
{
'1'
:
storage
})
asyncore
.
loop
()
os
.
close
(
rd
)
storage
.
close
()
if
isinstance
(
addr
,
types
.
StringType
):
os
.
unlink
(
addr
)
...
...
@@ -149,7 +128,6 @@ else:
s
=
ZEO
.
ClientStorage
.
ClientStorage
(
addr
,
storage_id
,
debug
=
1
,
client
=
cache
,
cache_size
=
cache_size
,
min_disconnect_poll
=
0.5
,
wait_for_server_on_startup
=
1
)
min_disconnect_poll
=
0.5
)
return
s
,
exit
,
pid
trunk/src/ZEO/tests/testTransactionBuffer.py
deleted
100644 → 0
View file @
1dbb6eed
import
random
import
unittest
from
ZEO.TransactionBuffer
import
TransactionBuffer
def
random_string
(
size
):
"""Return a random string of size size."""
l
=
[
chr
(
random
.
randrange
(
256
))
for
i
in
range
(
size
)]
return
""
.
join
(
l
)
def
new_store_data
():
"""Return arbitrary data to use as argument to store() method."""
return
random_string
(
8
),
''
,
random_string
(
random
.
randrange
(
1000
))
def
new_invalidate_data
():
"""Return arbitrary data to use as argument to invalidate() method."""
return
random_string
(
8
),
''
class
TransBufTests
(
unittest
.
TestCase
):
def
checkTypicalUsage
(
self
):
tbuf
=
TransactionBuffer
()
tbuf
.
store
(
*
new_store_data
())
tbuf
.
invalidate
(
*
new_invalidate_data
())
tbuf
.
begin_iterate
()
while
1
:
o
=
tbuf
.
next
()
if
o
is
None
:
break
tbuf
.
clear
()
def
doUpdates
(
self
,
tbuf
):
data
=
[]
for
i
in
range
(
10
):
d
=
new_store_data
()
tbuf
.
store
(
*
d
)
data
.
append
(
d
)
d
=
new_invalidate_data
()
tbuf
.
invalidate
(
*
d
)
data
.
append
(
d
)
tbuf
.
begin_iterate
()
for
i
in
range
(
len
(
data
)):
x
=
tbuf
.
next
()
if
x
[
2
]
is
None
:
# the tbuf add a dummy None to invalidates
x
=
x
[:
2
]
self
.
assertEqual
(
x
,
data
[
i
])
def
checkOrderPreserved
(
self
):
tbuf
=
TransactionBuffer
()
self
.
doUpdates
(
tbuf
)
def
checkReusable
(
self
):
tbuf
=
TransactionBuffer
()
self
.
doUpdates
(
tbuf
)
tbuf
.
clear
()
self
.
doUpdates
(
tbuf
)
tbuf
.
clear
()
self
.
doUpdates
(
tbuf
)
def
test_suite
():
return
unittest
.
makeSuite
(
TransBufTests
,
'check'
)
trunk/src/ZEO/zrpc/smac.py
View file @
0431ca7b
...
...
@@ -85,14 +85,11 @@
"""Sized message async connections
"""
__version__
=
"$Revision: 1.12 $"
[
11
:
-
2
]
import
asyncore
,
struct
from
Exceptions
import
Disconnected
from
zLOG
import
LOG
,
TRACE
,
ERROR
,
INFO
,
BLATHER
from
types
import
StringType
__version__
=
"$Revision: 1.13 $"
[
11
:
-
2
]
import
asyncore
,
string
,
struct
,
zLOG
,
sys
,
Acquisition
import
socket
,
errno
from
zLOG
import
LOG
,
TRACE
,
ERROR
,
INFO
# Use the dictionary to make sure we get the minimum number of errno
# entries. We expect that EWOULDBLOCK == EAGAIN on most systems --
...
...
@@ -112,101 +109,81 @@ tmp_dict = {errno.EAGAIN: 0,
expected_socket_write_errors
=
tuple
(
tmp_dict
.
keys
())
del
tmp_dict
class
SizedMessageAsyncConnection
(
asyncore
.
dispatcher
):
__super_init
=
asyncore
.
dispatcher
.
__init__
__super_close
=
asyncore
.
dispatcher
.
close
__closed
=
1
# Marker indicating that we're closed
class
SizedMessageAsyncConnection
(
Acquisition
.
Explicit
,
asyncore
.
dispatcher
):
socket
=
None
# to outwit Sam's getattr
__append
=
None
# Marker indicating that we're closed
READ_SIZE
=
8096
socket
=
None
# to outwit Sam's getattr
def
__init__
(
self
,
sock
,
addr
,
map
=
None
,
debug
=
None
):
self
.
__super_init
(
sock
,
map
)
self
.
addr
=
addr
SizedMessageAsyncConnection
.
inheritedAttribute
(
'__init__'
)(
self
,
sock
,
map
)
self
.
addr
=
addr
if
debug
is
not
None
:
self
.
_debug
=
debug
self
.
_debug
=
debug
elif
not
hasattr
(
self
,
'_debug'
):
self
.
_debug
=
__debug__
and
'smac'
self
.
__state
=
None
self
.
__inp
=
None
# None, a single String, or a list
self
.
__input_len
=
0
self
.
__msg_size
=
4
self
.
__output
=
[]
self
.
__closed
=
None
# XXX avoid expensive getattr calls?
def
__nonzero__
(
self
):
return
1
def
handle_read
(
self
):
# Use a single __inp buffer and integer indexes to make this
# fast.
self
.
_debug
=
__debug__
and
'smac'
self
.
__state
=
None
self
.
__inp
=
None
self
.
__inpl
=
0
self
.
__l
=
4
self
.
__output
=
output
=
[]
self
.
__append
=
output
.
append
self
.
__pop
=
output
.
pop
def
handle_read
(
self
,
join
=
string
.
join
,
StringType
=
type
(
''
),
_type
=
type
,
_None
=
None
):
try
:
d
=
self
.
recv
(
8096
)
except
socket
.
error
,
err
:
if
err
[
0
]
in
expected_socket_read_errors
:
return
raise
if
not
d
:
return
input_len
=
self
.
__input_len
+
len
(
d
)
msg_size
=
self
.
__msg_size
state
=
self
.
__state
inp
=
self
.
__inp
if
msg_size
>
input_len
:
if
inp
is
None
:
self
.
__inp
=
d
elif
type
(
self
.
__inp
)
is
StringType
:
self
.
__inp
=
[
self
.
__inp
,
d
]
else
:
self
.
__inp
.
append
(
d
)
self
.
__input_len
=
input_len
return
# keep waiting for more input
# load all previous input and d into single string inp
if
isinstance
(
inp
,
StringType
):
inp
=
inp
+
d
elif
inp
is
None
:
inp
=
d
if
not
d
:
return
inp
=
self
.
__inp
if
inp
is
_None
:
inp
=
d
elif
_type
(
inp
)
is
StringType
:
inp
=
[
inp
,
d
]
else
:
inp
.
append
(
d
)
inp
=
""
.
join
(
inp
)
offset
=
0
while
(
offset
+
msg_size
)
<=
input_len
:
msg
=
inp
[
offset
:
offset
+
msg_size
]
offset
=
offset
+
msg_size
if
state
is
None
:
# waiting for message
msg_size
=
struct
.
unpack
(
">i"
,
msg
)[
0
]
state
=
1
inpl
=
self
.
__inpl
+
len
(
d
)
l
=
self
.
__l
while
1
:
if
l
<=
inpl
:
# Woo hoo, we have enough data
if
_type
(
inp
)
is
not
StringType
:
inp
=
join
(
inp
,
''
)
d
=
inp
[:
l
]
inp
=
inp
[
l
:]
inpl
=
inpl
-
l
if
self
.
__state
is
_None
:
# waiting for message
l
=
struct
.
unpack
(
">i"
,
d
)[
0
]
self
.
__state
=
1
else
:
l
=
4
self
.
__state
=
_None
self
.
message_input
(
d
)
else
:
msg_size
=
4
state
=
None
self
.
message_input
(
msg
)
break
# not enough data
self
.
__l
=
l
self
.
__inp
=
inp
self
.
__inpl
=
inpl
self
.
__state
=
state
self
.
__msg_size
=
msg_size
self
.
__inp
=
inp
[
offset
:]
self
.
__input_len
=
input_len
-
offset
def
readable
(
self
):
return
1
def
writable
(
self
):
if
len
(
self
.
__output
)
==
0
:
return
0
else
:
return
1
def
readable
(
self
):
return
1
def
writable
(
self
):
return
not
not
self
.
__output
def
handle_write
(
self
):
output
=
self
.
__output
output
=
self
.
__output
while
output
:
v
=
output
[
0
]
v
=
output
[
0
]
try
:
n
=
self
.
send
(
v
)
except
socket
.
error
,
err
:
...
...
@@ -214,33 +191,42 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
break
# we couldn't write anything
raise
if
n
<
len
(
v
):
output
[
0
]
=
v
[
n
:]
output
[
0
]
=
v
[
n
:]
break
# we can't write any more
else
:
del
output
[
0
]
#break # waaa
def
handle_close
(
self
):
self
.
close
()
def
message_output
(
self
,
message
):
if
__debug__
:
if
self
.
_debug
:
if
len
(
message
)
>
40
:
m
=
message
[:
40
]
+
' ...'
else
:
m
=
message
LOG
(
self
.
_debug
,
TRACE
,
'message_output %s'
%
`m`
)
if
self
.
__closed
is
not
None
:
raise
Disconnected
,
(
"This action is temporarily unavailable."
"<p>"
)
# do two separate appends to avoid copying the message string
self
.
__output
.
append
(
struct
.
pack
(
">i"
,
len
(
message
)))
self
.
__output
.
append
(
message
)
def
message_output
(
self
,
message
,
pack
=
struct
.
pack
,
len
=
len
):
if
self
.
_debug
:
if
len
(
message
)
>
40
:
m
=
message
[:
40
]
+
' ...'
else
:
m
=
message
LOG
(
self
.
_debug
,
TRACE
,
'message_output %s'
%
`m`
)
append
=
self
.
__append
if
append
is
None
:
raise
Disconnected
(
"This action is temporarily unavailable.<p>"
)
append
(
pack
(
">i"
,
len
(
message
))
+
message
)
def
log_info
(
self
,
message
,
type
=
'info'
):
if
type
==
'error'
:
type
=
ERROR
else
:
type
=
INFO
LOG
(
'ZEO'
,
type
,
message
)
log
=
log_info
def
close
(
self
):
if
self
.
__closed
is
None
:
self
.
__closed
=
1
self
.
__super_close
()
if
self
.
__append
is
not
None
:
self
.
__append
=
None
SizedMessageAsyncConnection
.
inheritedAttribute
(
'close'
)(
self
)
class
Disconnected
(
Exception
):
"""The client has become disconnected from the server
"""
trunk/src/ZEO/zrpc2.py
deleted
100644 → 0
View file @
1dbb6eed
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment