Commit d9d3aba7 authored by Jim Fulton's avatar Jim Fulton

Rearranged, reformatted and added comments to make more

understandable.

Sped up startup a tiny bit by not doing synchronous get_info call on
startup. Instead, server will send info asynchrously on startup.
parent 78136615
...@@ -84,7 +84,7 @@ ...@@ -84,7 +84,7 @@
############################################################################## ##############################################################################
"""Network ZODB storage client """Network ZODB storage client
""" """
__version__='$Revision: 1.17 $'[11:-2] __version__='$Revision: 1.18 $'[11:-2]
import struct, time, os, socket, string, Sync, zrpc, ClientCache import struct, time, os, socket, string, Sync, zrpc, ClientCache
import tempfile, Invalidator, ExtensionClass, thread import tempfile, Invalidator, ExtensionClass, thread
...@@ -151,32 +151,90 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -151,32 +151,90 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
ThreadedAsync.register_loop_callback(self.becomeAsync) ThreadedAsync.register_loop_callback(self.becomeAsync)
# IMPORTANT: Note that we aren't fully "there" yet.
# In particular, we don't actually connect to the server
# until we have a controlling database set with registerDB
# below.
def registerDB(self, db, limit):
"""Register that the storage is controlled by the given DB.
"""
# Among other things, we know that our data methods won't get
# called until after this call.
invalidator=Invalidator.Invalidator(
db.invalidate,
self._cache.invalidate)
def out_of_band_hook(
code, args,
get_hook={
'b': (invalidator.begin, 0),
'i': (invalidator.invalidate, 1),
'e': (invalidator.end, 0),
'I': (invalidator.Invalidate, 1),
'U': (self._commit_lock_release, 0),
's': (self._serials.append, 1),
'S': (self._info.update, 1),
}.get):
hook = get_hook(code, None)
if hook is None: return
hook, flag = hook
if flag: hook(args)
else: hook()
self._call.setOutOfBand(out_of_band_hook)
# Now that we have our callback system in place, we can
# try to connect
self._startup()
def _startup(self): def _startup(self):
if not self._call.connect(): if not self._call.connect():
# If we can't connect right away, go ahead and open the cache # If we can't connect right away, go ahead and open the cache
# and start a separate thread to try and reconnect. # and start a separate thread to try and reconnect.
LOG("ClientStorage", PROBLEM, "Failed to connect to storage") LOG("ClientStorage", PROBLEM, "Failed to connect to storage")
self._cache.open() self._cache.open()
thread.start_new_thread(self._call.connect,(0,)) thread.start_new_thread(self._call.connect,(0,))
# If the connect succeeds then this work will be done by
# notifyConnected
def notifyConnected(self, s): def notifyConnected(self, s):
LOG("ClientStorage", INFO, "Connected to storage") LOG("ClientStorage", INFO, "Connected to storage")
self._lock_acquire() self._lock_acquire()
try: try:
# We let the connection keep coming up now that # We let the connection keep coming up now that
# we have the storage lock. This way, we know no calls # we have the storage lock. This way, we know no calls
# will be made while in the process of coming up. # will be made while in the process of coming up.
self._call.finishConnect(s) self._call.finishConnect(s)
self._connected=1 self._connected=1
self._oids=[] self._oids=[]
# we do synchronous commits until we are sure that
# we have and are ready for a main loop.
# Hm. This is a little silly. If self._async, then
# we will really never do a synchronous commit.
# See below.
self.__begin='tpc_begin_sync' self.__begin='tpc_begin_sync'
self._call.message_output(str(self._storage)) self._call.message_output(str(self._storage))
self._info.update(self._call('get_info'))
### This seems silly. We should get the info asynchronously.
# self._info.update(self._call('get_info'))
cached=self._cache.open() cached=self._cache.open()
### This is a little expensive for large caches
if cached: if cached:
self._call.sendMessage('beginZeoVerify') self._call.sendMessage('beginZeoVerify')
for oid, (s, vs) in cached: for oid, (s, vs) in cached:
...@@ -189,6 +247,19 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -189,6 +247,19 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
import ZServer.medusa.asyncore import ZServer.medusa.asyncore
self.becomeAsync(ZServer.medusa.asyncore.socket_map) self.becomeAsync(ZServer.medusa.asyncore.socket_map)
### Is there a race condition between notifyConnected and
### notifyDisconnected? In Particular, what if we get
### notifyDisconnected in the middle of notifyConnected?
### The danger is that we'll proceed as if we were connected
### without worrying if we were, but this would happen any way if
### notifyDisconnected had to get the instance lock. There's
### nothing to gain by getting the instance lock.
### Note that we *don't* have to worry about getting connected
### in the middle of notifyDisconnected, because *it's*
### responsible for starting the thread that makes the connection.
def notifyDisconnected(self, ignored): def notifyDisconnected(self, ignored):
LOG("ClientStorage", PROBLEM, "Disconnected from storage") LOG("ClientStorage", PROBLEM, "Disconnected from storage")
self._connected=0 self._connected=0
...@@ -209,34 +280,6 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -209,34 +280,6 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
self.__begin='tpc_begin' self.__begin='tpc_begin'
finally: self._lock_release() finally: self._lock_release()
def registerDB(self, db, limit):
invalidator=Invalidator.Invalidator(
db.invalidate,
self._cache.invalidate)
def out_of_band_hook(
code, args,
get_hook={
'b': (invalidator.begin, 0),
'i': (invalidator.invalidate, 1),
'e': (invalidator.end, 0),
'I': (invalidator.Invalidate, 1),
'U': (self._commit_lock_release, 0),
's': (self._serials.append, 1),
'S': (self._info.update, 1),
}.get):
hook = get_hook(code, None)
if hook is None: return
hook, flag = hook
if flag: hook(args)
else: hook()
self._call.setOutOfBand(out_of_band_hook)
self._startup()
def __len__(self): return self._info['length'] def __len__(self): return self._info['length']
def abortVersion(self, src, transaction): def abortVersion(self, src, transaction):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment