Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Z
ZODB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Nicolas Wavrant
ZODB
Commits
32bc881f
Commit
32bc881f
authored
Jan 09, 2003
by
Jeremy Hylton
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Swap order of StorageServer and ZEOStorage to avoid patchup at the
end.
parent
330d1ca7
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
202 additions
and
205 deletions
+202
-205
src/ZEO/StorageServer.py
src/ZEO/StorageServer.py
+202
-205
No files found.
src/ZEO/StorageServer.py
View file @
32bc881f
...
...
@@ -56,208 +56,6 @@ def log(message, level=zLOG.INFO, label=None, error=None):
class
StorageServerError
(
StorageError
):
"""Error reported when an unpickleable exception is raised."""
class
StorageServer
:
"""The server side implementation of ZEO.
The StorageServer is the 'manager' for incoming connections. Each
connection is associated with its own ZEOStorage instance (defined
below). The StorageServer may handle multiple storages; each
ZEOStorage instance only handles a single storage.
"""
# Classes we instantiate. A subclass might override.
DispatcherClass
=
Dispatcher
ZEOStorageClass
=
None
# patched up later
ManagedServerConnectionClass
=
ManagedServerConnection
def
__init__
(
self
,
addr
,
storages
,
read_only
=
0
,
invalidation_queue_size
=
100
,
transaction_timeout
=
None
):
"""StorageServer constructor.
This is typically invoked from the start.py script.
Arguments (the first two are required and positional):
addr -- the address at which the server should listen. This
can be a tuple (host, port) to signify a TCP/IP connection
or a pathname string to signify a Unix domain socket
connection. A hostname may be a DNS name or a dotted IP
address.
storages -- a dictionary giving the storage(s) to handle. The
keys are the storage names, the values are the storage
instances, typically FileStorage or Berkeley storage
instances. By convention, storage names are typically
strings representing small integers starting at '1'.
read_only -- an optional flag saying whether the server should
operate in read-only mode. Defaults to false. Note that
even if the server is operating in writable mode,
individual storages may still be read-only. But if the
server is in read-only mode, no write operations are
allowed, even if the storages are writable. Note that
pack() is considered a read-only operation.
invalidation_queue_size -- The storage server keeps a queue
of the objects modified by the last N transactions, where
N == invalidation_queue_size. This queue is used to
speed client cache verification when a client disconnects
for a short period of time.
transaction_timout -- The maximum amount of time to wait for
a transaction to commit after acquiring the storage lock.
If the transaction takes too long, the client connection
will be closed and the transaction aborted.
"""
self
.
addr
=
addr
self
.
storages
=
storages
set_label
()
msg
=
", "
.
join
(
[
"%s:%s:%s"
%
(
name
,
storage
.
isReadOnly
()
and
"RO"
or
"RW"
,
storage
.
getName
())
for
name
,
storage
in
storages
.
items
()])
log
(
"%s created %s with storages: %s"
%
(
self
.
__class__
.
__name__
,
read_only
and
"RO"
or
"RW"
,
msg
))
for
s
in
storages
.
values
():
s
.
_waiting
=
[]
self
.
read_only
=
read_only
# A list of at most invalidation_queue_size invalidations
self
.
invq
=
[]
self
.
invq_bound
=
invalidation_queue_size
self
.
connections
=
{}
self
.
dispatcher
=
self
.
DispatcherClass
(
addr
,
factory
=
self
.
new_connection
,
reuse_addr
=
1
)
self
.
timeouts
=
{}
for
name
in
self
.
storages
.
keys
():
if
transaction_timeout
is
None
:
# An object with no-op methods
timeout
=
StubTimeoutThread
()
else
:
timeout
=
TimeoutThread
(
transaction_timeout
)
timeout
.
start
()
self
.
timeouts
[
name
]
=
timeout
def
new_connection
(
self
,
sock
,
addr
):
"""Internal: factory to create a new connection.
This is called by the Dispatcher class in ZEO.zrpc.server
whenever accept() returns a socket for a new incoming
connection.
"""
z
=
self
.
ZEOStorageClass
(
self
,
self
.
read_only
)
c
=
self
.
ManagedServerConnectionClass
(
sock
,
addr
,
z
,
self
)
log
(
"new connection %s: %s"
%
(
addr
,
`c`
))
return
c
def
register_connection
(
self
,
storage_id
,
conn
):
"""Internal: register a connection with a particular storage.
This is called by ZEOStorage.register().
The dictionary self.connections maps each storage name to a
list of current connections for that storage; this information
is needed to handle invalidation. This function updates this
dictionary.
Returns the timeout object for the appropriate storage.
"""
l
=
self
.
connections
.
get
(
storage_id
)
if
l
is
None
:
l
=
self
.
connections
[
storage_id
]
=
[]
l
.
append
(
conn
)
return
self
.
timeouts
[
storage_id
]
def
invalidate
(
self
,
conn
,
storage_id
,
tid
,
invalidated
=
(),
info
=
None
):
"""Internal: broadcast info and invalidations to clients.
This is called from several ZEOStorage methods.
This can do three different things:
- If the invalidated argument is non-empty, it broadcasts
invalidateTransaction() messages to all clients of the given
storage except the current client (the conn argument).
- If the invalidated argument is empty and the info argument
is a non-empty dictionary, it broadcasts info() messages to
all clients of the given storage, including the current
client.
- If both the invalidated argument and the info argument are
non-empty, it broadcasts invalidateTransaction() messages to all
clients except the current, and sends an info() message to
the current client.
"""
if
invalidated
:
if
len
(
self
.
invq
)
>=
self
.
invq_bound
:
del
self
.
invq
[
0
]
self
.
invq
.
append
((
tid
,
invalidated
))
for
p
in
self
.
connections
.
get
(
storage_id
,
()):
if
invalidated
and
p
is
not
conn
:
p
.
client
.
invalidateTransaction
(
tid
,
invalidated
)
elif
info
is
not
None
:
p
.
client
.
info
(
info
)
def
get_invalidations
(
self
,
tid
):
"""Return a tid and list of all objects invalidation since tid.
The tid is the most recent transaction id committed by the server.
Returns None if it is unable to provide a complete list
of invalidations for tid. In this case, client should
do full cache verification.
"""
if
not
self
.
invq
:
log
(
"invq empty"
)
return
None
,
[]
earliest_tid
=
self
.
invq
[
0
][
0
]
if
earliest_tid
>
tid
:
log
(
"tid to old for invq %s < %s"
%
(
u64
(
tid
),
u64
(
earliest_tid
)))
return
None
,
[]
oids
=
{}
for
tid
,
L
in
self
.
invq
:
for
key
in
L
:
oids
[
key
]
=
1
latest_tid
=
self
.
invq
[
-
1
][
0
]
return
latest_tid
,
oids
.
keys
()
def
close_server
(
self
):
"""Close the dispatcher so that there are no new connections.
This is only called from the test suite, AFAICT.
"""
for
timeout
in
self
.
timeouts
.
values
():
timeout
.
stop
()
self
.
dispatcher
.
close
()
for
storage
in
self
.
storages
.
values
():
storage
.
close
()
# Force the asyncore mainloop to exit by hackery, i.e. close
# every socket in the map. loop() will return when the map is
# empty.
for
s
in
asyncore
.
socket_map
.
values
():
try
:
s
.
close
()
except
:
pass
def
close_conn
(
self
,
conn
):
"""Internal: remove the given connection from self.connections.
This is the inverse of register_connection().
"""
for
cl
in
self
.
connections
.
values
():
if
conn
.
obj
in
cl
:
cl
.
remove
(
conn
.
obj
)
class
ZEOStorage
:
"""Proxy to underlying storage for a single remote client."""
...
...
@@ -727,6 +525,208 @@ class ZEOStorage:
else
:
return
1
class
StorageServer
:
"""The server side implementation of ZEO.
The StorageServer is the 'manager' for incoming connections. Each
connection is associated with its own ZEOStorage instance (defined
below). The StorageServer may handle multiple storages; each
ZEOStorage instance only handles a single storage.
"""
# Classes we instantiate. A subclass might override.
DispatcherClass
=
Dispatcher
ZEOStorageClass
=
ZEOStorage
ManagedServerConnectionClass
=
ManagedServerConnection
def
__init__
(
self
,
addr
,
storages
,
read_only
=
0
,
invalidation_queue_size
=
100
,
transaction_timeout
=
None
):
"""StorageServer constructor.
This is typically invoked from the start.py script.
Arguments (the first two are required and positional):
addr -- the address at which the server should listen. This
can be a tuple (host, port) to signify a TCP/IP connection
or a pathname string to signify a Unix domain socket
connection. A hostname may be a DNS name or a dotted IP
address.
storages -- a dictionary giving the storage(s) to handle. The
keys are the storage names, the values are the storage
instances, typically FileStorage or Berkeley storage
instances. By convention, storage names are typically
strings representing small integers starting at '1'.
read_only -- an optional flag saying whether the server should
operate in read-only mode. Defaults to false. Note that
even if the server is operating in writable mode,
individual storages may still be read-only. But if the
server is in read-only mode, no write operations are
allowed, even if the storages are writable. Note that
pack() is considered a read-only operation.
invalidation_queue_size -- The storage server keeps a queue
of the objects modified by the last N transactions, where
N == invalidation_queue_size. This queue is used to
speed client cache verification when a client disconnects
for a short period of time.
transaction_timout -- The maximum amount of time to wait for
a transaction to commit after acquiring the storage lock.
If the transaction takes too long, the client connection
will be closed and the transaction aborted.
"""
self
.
addr
=
addr
self
.
storages
=
storages
set_label
()
msg
=
", "
.
join
(
[
"%s:%s:%s"
%
(
name
,
storage
.
isReadOnly
()
and
"RO"
or
"RW"
,
storage
.
getName
())
for
name
,
storage
in
storages
.
items
()])
log
(
"%s created %s with storages: %s"
%
(
self
.
__class__
.
__name__
,
read_only
and
"RO"
or
"RW"
,
msg
))
for
s
in
storages
.
values
():
s
.
_waiting
=
[]
self
.
read_only
=
read_only
# A list of at most invalidation_queue_size invalidations
self
.
invq
=
[]
self
.
invq_bound
=
invalidation_queue_size
self
.
connections
=
{}
self
.
dispatcher
=
self
.
DispatcherClass
(
addr
,
factory
=
self
.
new_connection
,
reuse_addr
=
1
)
self
.
timeouts
=
{}
for
name
in
self
.
storages
.
keys
():
if
transaction_timeout
is
None
:
# An object with no-op methods
timeout
=
StubTimeoutThread
()
else
:
timeout
=
TimeoutThread
(
transaction_timeout
)
timeout
.
start
()
self
.
timeouts
[
name
]
=
timeout
def
new_connection
(
self
,
sock
,
addr
):
"""Internal: factory to create a new connection.
This is called by the Dispatcher class in ZEO.zrpc.server
whenever accept() returns a socket for a new incoming
connection.
"""
z
=
self
.
ZEOStorageClass
(
self
,
self
.
read_only
)
c
=
self
.
ManagedServerConnectionClass
(
sock
,
addr
,
z
,
self
)
log
(
"new connection %s: %s"
%
(
addr
,
`c`
))
return
c
def
register_connection
(
self
,
storage_id
,
conn
):
"""Internal: register a connection with a particular storage.
This is called by ZEOStorage.register().
The dictionary self.connections maps each storage name to a
list of current connections for that storage; this information
is needed to handle invalidation. This function updates this
dictionary.
Returns the timeout object for the appropriate storage.
"""
l
=
self
.
connections
.
get
(
storage_id
)
if
l
is
None
:
l
=
self
.
connections
[
storage_id
]
=
[]
l
.
append
(
conn
)
return
self
.
timeouts
[
storage_id
]
def
invalidate
(
self
,
conn
,
storage_id
,
tid
,
invalidated
=
(),
info
=
None
):
"""Internal: broadcast info and invalidations to clients.
This is called from several ZEOStorage methods.
This can do three different things:
- If the invalidated argument is non-empty, it broadcasts
invalidateTransaction() messages to all clients of the given
storage except the current client (the conn argument).
- If the invalidated argument is empty and the info argument
is a non-empty dictionary, it broadcasts info() messages to
all clients of the given storage, including the current
client.
- If both the invalidated argument and the info argument are
non-empty, it broadcasts invalidateTransaction() messages to all
clients except the current, and sends an info() message to
the current client.
"""
if
invalidated
:
if
len
(
self
.
invq
)
>=
self
.
invq_bound
:
del
self
.
invq
[
0
]
self
.
invq
.
append
((
tid
,
invalidated
))
for
p
in
self
.
connections
.
get
(
storage_id
,
()):
if
invalidated
and
p
is
not
conn
:
p
.
client
.
invalidateTransaction
(
tid
,
invalidated
)
elif
info
is
not
None
:
p
.
client
.
info
(
info
)
def
get_invalidations
(
self
,
tid
):
"""Return a tid and list of all objects invalidation since tid.
The tid is the most recent transaction id committed by the server.
Returns None if it is unable to provide a complete list
of invalidations for tid. In this case, client should
do full cache verification.
"""
if
not
self
.
invq
:
log
(
"invq empty"
)
return
None
,
[]
earliest_tid
=
self
.
invq
[
0
][
0
]
if
earliest_tid
>
tid
:
log
(
"tid to old for invq %s < %s"
%
(
u64
(
tid
),
u64
(
earliest_tid
)))
return
None
,
[]
oids
=
{}
for
tid
,
L
in
self
.
invq
:
for
key
in
L
:
oids
[
key
]
=
1
latest_tid
=
self
.
invq
[
-
1
][
0
]
return
latest_tid
,
oids
.
keys
()
def
close_server
(
self
):
"""Close the dispatcher so that there are no new connections.
This is only called from the test suite, AFAICT.
"""
for
timeout
in
self
.
timeouts
.
values
():
timeout
.
stop
()
self
.
dispatcher
.
close
()
for
storage
in
self
.
storages
.
values
():
storage
.
close
()
# Force the asyncore mainloop to exit by hackery, i.e. close
# every socket in the map. loop() will return when the map is
# empty.
for
s
in
asyncore
.
socket_map
.
values
():
try
:
s
.
close
()
except
:
pass
def
close_conn
(
self
,
conn
):
"""Internal: remove the given connection from self.connections.
This is the inverse of register_connection().
"""
for
cl
in
self
.
connections
.
values
():
if
conn
.
obj
in
cl
:
cl
.
remove
(
conn
.
obj
)
class
StubTimeoutThread
:
def
begin
(
self
,
client
):
...
...
@@ -844,6 +844,3 @@ class SlowMethodThread(threading.Thread):
self
.
delay
.
error
(
sys
.
exc_info
())
else
:
self
.
delay
.
reply
(
result
)
# Patch up class references
StorageServer
.
ZEOStorageClass
=
ZEOStorage
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment