Commit d8b9dda5 authored by Jim Fulton's avatar Jim Fulton

Merged from Trunk:

  ------------------------------------------------------------------------
  r105024 | jim | 2009-10-12 09:01:47 -0400 (Mon, 12 Oct 2009) | 2 lines
  Changed paths:
     M /ZODB/trunk/src/CHANGES.txt

Let's hope we get this out today.

------------------------------------------------------------------------
  r105023 | jim | 2009-10-12 09:01:20 -0400 (Mon, 12 Oct 2009) | 1 line
  Changed paths:
     M /ZODB/trunk/src/CHANGES.txt

*** empty log message ***
  ------------------------------------------------------------------------
  r105015 | jim | 2009-10-11 13:42:41 -0400 (Sun, 11 Oct 2009) | 3 lines
  Changed paths:
     M /ZODB/trunk/src/ZEO/ServerStub.py

Time out waiting for protocol handshake.  Otherwise, we sometmes see
  hangs under extreme conditions.

------------------------------------------------------------------------
  r104977 | jim | 2009-10-09 15:41:15 -0400 (Fri, 09 Oct 2009) | 3 lines
  Changed paths:
     M /ZODB/trunk/src/CHANGES.txt
     M /ZODB/trunk/src/ZODB/FileStorage/FileStorage.py

- File-storage pack clean-up tasks that can take a long time
    unnecessarily blocked other activity.

------------------------------------------------------------------------
  r104954 | jim | 2009-10-08 17:09:19 -0400 (Thu, 08 Oct 2009) | 2 lines
  Changed paths:
     M /ZODB/trunk/src/ZODB/tests/testFileStorage.py

Cleaned up trailing whitespace and long lines.

------------------------------------------------------------------------
  r104953 | jim | 2009-10-08 17:04:31 -0400 (Thu, 08 Oct 2009) | 2 lines
  Changed paths:
     M /ZODB/trunk/src/ZODB/tests/PackableStorage.py

Defining a multable attr in a class is insane.

------------------------------------------------------------------------
  r104951 | jim | 2009-10-08 16:19:18 -0400 (Thu, 08 Oct 2009) | 2 lines
  Changed paths:
     M /ZODB/trunk/src/ZODB/tests/PackableStorage.py

Removed trailing whitespace.

------------------------------------------------------------------------
  r104950 | jim | 2009-10-08 14:30:11 -0400 (Thu, 08 Oct 2009) | 2 lines
  Changed paths:
     M /ZODB/trunk/src/ZEO/zrpc/connection.py

Fixed atexit handler to deal with the possibility that the ex

------------------------------------------------------------------------
  r104949 | jim | 2009-10-08 13:50:10 -0400 (Thu, 08 Oct 2009) | 6 lines
  Changed paths:
     M /ZODB/trunk/src/CHANGES.txt
     M /ZODB/trunk/src/ZEO/tests/testZEO.py
     M /ZODB/trunk/src/ZEO/zrpc/client.py
     M /ZODB/trunk/src/ZEO/zrpc/connection.py

Bug Fixed:
    ZEO manages a separate thread for client network IO.  It created
    this thread on import, which caused problems for applications that
    implemented daemon behavior by forking.  Now, the client thread
    isn't created until needed.
parent ddf46573
...@@ -2,6 +2,23 @@ ...@@ -2,6 +2,23 @@
Change History Change History
================ ================
3.9.2 (2009-10-13)
==================
Bugs Fixed
----------
- ZEO manages a separate thread for client network IO. It created
this thread on import, which caused problems for applications that
implemented daemon behavior by forking. Now, the client thread
isn't created until needed.
- File-storage pack clean-up tasks that can take a long time
unnecessarily blocked other activity.
- In certain rare situations, ZEO client connections would hand during
the initial connection setup.
3.9.1 (2009-10-01) 3.9.1 (2009-10-01)
================== ==================
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
############################################################################## ##############################################################################
"""RPC stubs for interface exported by StorageServer.""" """RPC stubs for interface exported by StorageServer."""
import os
import time import time
## ##
...@@ -368,11 +369,12 @@ class StorageServer308(StorageServer): ...@@ -368,11 +369,12 @@ class StorageServer308(StorageServer):
def iterator_gc(self, iids): def iterator_gc(self, iids):
raise NotImplementedError raise NotImplementedError
def stub(client, connection): def stub(client, connection):
start = time.time()
# Wait until we know what version the other side is using. # Wait until we know what version the other side is using.
while connection.peer_protocol_version is None: while connection.peer_protocol_version is None:
if time.time()-start > 10:
raise ValueError("Timeout waiting for protocol handshake")
time.sleep(0.1) time.sleep(0.1)
if connection.peer_protocol_version < 'Z309': if connection.peer_protocol_version < 'Z309':
......
...@@ -52,6 +52,8 @@ import zope.testing.setupstack ...@@ -52,6 +52,8 @@ import zope.testing.setupstack
logger = logging.getLogger('ZEO.tests.testZEO') logger = logging.getLogger('ZEO.tests.testZEO')
ZEO.zrpc.connection.start_client_thread()
class DummyDB: class DummyDB:
def invalidate(self, *args): def invalidate(self, *args):
pass pass
......
...@@ -24,12 +24,13 @@ from ZODB.loglevels import BLATHER ...@@ -24,12 +24,13 @@ from ZODB.loglevels import BLATHER
from ZEO.zrpc.log import log from ZEO.zrpc.log import log
import ZEO.zrpc.trigger import ZEO.zrpc.trigger
from ZEO.zrpc.connection import ManagedClientConnection from ZEO.zrpc.connection import ManagedClientConnection, start_client_thread
class ConnectionManager(object): class ConnectionManager(object):
"""Keeps a connection up over time""" """Keeps a connection up over time"""
def __init__(self, addrs, client, tmin=1, tmax=180): def __init__(self, addrs, client, tmin=1, tmax=180):
start_client_thread()
self.addrlist = self._parse_addrs(addrs) self.addrlist = self._parse_addrs(addrs)
self.client = client self.client = client
self.tmin = min(tmin, tmax) self.tmin = min(tmin, tmax)
......
...@@ -42,9 +42,10 @@ client_map = {} ...@@ -42,9 +42,10 @@ client_map = {}
client_trigger = trigger(client_map) client_trigger = trigger(client_map)
client_logger = logging.getLogger('ZEO.zrpc.client_loop') client_logger = logging.getLogger('ZEO.zrpc.client_loop')
client_exit_event = threading.Event() client_exit_event = threading.Event()
client_running = True client_running = False
def client_exit(): def client_exit():
global client_running global client_running
if client_running:
client_running = False client_running = False
client_trigger.pull_trigger() client_trigger.pull_trigger()
client_exit_event.wait(99) client_exit_event.wait(99)
...@@ -52,15 +53,15 @@ def client_exit(): ...@@ -52,15 +53,15 @@ def client_exit():
atexit.register(client_exit) atexit.register(client_exit)
def client_loop(): def client_loop():
map = client_map global client_running
client_running = True
client_exit_event.clear()
map = client_map
read = asyncore.read read = asyncore.read
write = asyncore.write write = asyncore.write
_exception = asyncore._exception _exception = asyncore._exception
loop_failures = 0 loop_failures = 0
client_exit_event.clear()
global client_running
client_running = True
while client_running and map: while client_running and map:
try: try:
...@@ -153,9 +154,19 @@ def client_loop(): ...@@ -153,9 +154,19 @@ def client_loop():
client_exit_event.set() client_exit_event.set()
client_thread = threading.Thread(target=client_loop, name=__name__) client_thread_lock = threading.Lock()
client_thread.setDaemon(True) client_thread = None
client_thread.start() def start_client_thread():
client_thread_lock.acquire()
try:
global client_thread
if client_thread is None:
client_thread = threading.Thread(target=client_loop, name=__name__)
client_thread.setDaemon(True)
client_thread.start()
finally:
client_thread_lock.release()
# #
############################################################################## ##############################################################################
......
...@@ -1117,6 +1117,8 @@ class FileStorage( ...@@ -1117,6 +1117,8 @@ class FileStorage(
if self.blob_dir and os.path.exists(self.blob_dir + ".old"): if self.blob_dir and os.path.exists(self.blob_dir + ".old"):
ZODB.blob.remove_committed_dir(self.blob_dir + ".old") ZODB.blob.remove_committed_dir(self.blob_dir + ".old")
cleanup = []
have_commit_lock = False have_commit_lock = False
try: try:
pack_result = None pack_result = None
...@@ -1139,19 +1141,20 @@ class FileStorage( ...@@ -1139,19 +1141,20 @@ class FileStorage(
# OK, we're beyond the point of no return # OK, we're beyond the point of no return
os.rename(self._file_name + '.pack', self._file_name) os.rename(self._file_name + '.pack', self._file_name)
if not self.pack_keep_old:
os.remove(oldpath)
self._file = open(self._file_name, 'r+b') self._file = open(self._file_name, 'r+b')
self._initIndex(index, self._tindex) self._initIndex(index, self._tindex)
self._pos = opos self._pos = opos
self._save_index() finally:
self._lock_release()
# We're basically done. Now we need to deal with removed
# blobs and removing the .old file (see further down).
if self.blob_dir: if self.blob_dir:
self._commit_lock_release() self._commit_lock_release()
have_commit_lock = False have_commit_lock = False
self._remove_blob_files_tagged_for_removal_during_pack() self._remove_blob_files_tagged_for_removal_during_pack()
finally:
self._lock_release()
finally: finally:
if have_commit_lock: if have_commit_lock:
self._commit_lock_release() self._commit_lock_release()
...@@ -1159,6 +1162,14 @@ class FileStorage( ...@@ -1159,6 +1162,14 @@ class FileStorage(
self._pack_is_in_progress = False self._pack_is_in_progress = False
self._lock_release() self._lock_release()
if not self.pack_keep_old:
os.remove(oldpath)
self._lock_acquire()
try:
self._save_index()
finally:
self._lock_release()
def _remove_blob_files_tagged_for_removal_during_pack(self): def _remove_blob_files_tagged_for_removal_during_pack(self):
lblob_dir = len(self.blob_dir) lblob_dir = len(self.blob_dir)
...@@ -1167,13 +1178,38 @@ class FileStorage( ...@@ -1167,13 +1178,38 @@ class FileStorage(
link_or_copy = ZODB.blob.link_or_copy link_or_copy = ZODB.blob.link_or_copy
# Helper to clean up dirs left empty after moving things to old # Helper to clean up dirs left empty after moving things to old
def maybe_remove_empty_dir_containing(path): def maybe_remove_empty_dir_containing(path, level=0):
path = os.path.dirname(path) path = os.path.dirname(path)
if len(path) <= lblob_dir: if len(path) <= lblob_dir or os.listdir(path):
return return
# Path points to an empty dir. There may be a race. We
# might have just removed the dir for an oid (or a parent
# dir) and while we're cleaning up it's parent, another
# thread is adding a new entry to it.
# We don't have to worry about level 0, as this is just a
# directory containing an object's revisions. If it is
# enmpty, the object must have been garbage.
# If the level is 1 or higher, we need to be more
# careful. We'll get the storage lock and double check
# that the dir is still empty before removing it.
removed = False
if level:
self._lock_acquire()
try:
if not os.listdir(path): if not os.listdir(path):
os.rmdir(path) os.rmdir(path)
maybe_remove_empty_dir_containing(path) removed = True
finally:
if level:
self._lock_release()
if removed:
maybe_remove_empty_dir_containing(path, level+1)
if self.pack_keep_old: if self.pack_keep_old:
# Helpers that move oid dir or revision file to the old dir. # Helpers that move oid dir or revision file to the old dir.
...@@ -1203,7 +1239,7 @@ class FileStorage( ...@@ -1203,7 +1239,7 @@ class FileStorage(
# Hm, already gone. Odd. # Hm, already gone. Odd.
continue continue
handle_dir(path) handle_dir(path)
maybe_remove_empty_dir_containing(path) maybe_remove_empty_dir_containing(path, 1)
continue continue
if len(line) != 16: if len(line) != 16:
......
...@@ -92,7 +92,14 @@ def pdumps(obj): ...@@ -92,7 +92,14 @@ def pdumps(obj):
class PackableStorageBase: class PackableStorageBase:
# We keep a cache of object ids to instances so that the unpickler can # We keep a cache of object ids to instances so that the unpickler can
# easily return any persistent object. # easily return any persistent object.
_cache = {}
@property
def _cache(self):
try:
return self.__cache
except AttributeError:
self.__cache = {}
return self.__cache
def _newobj(self): def _newobj(self):
# This is a convenience method to create a new persistent Object # This is a convenience method to create a new persistent Object
......
...@@ -365,13 +365,15 @@ class AnalyzeDotPyTest(StorageTestBase.StorageTestBase): ...@@ -365,13 +365,15 @@ class AnalyzeDotPyTest(StorageTestBase.StorageTestBase):
# sometimes data is in this format # sometimes data is in this format
j = 0 j = 0
oid, revid = oids[j] oid, revid = oids[j]
serial = self._storage.store(oid, revid, pickle.dumps(OOBTree, 1), "", t) serial = self._storage.store(
oid, revid, pickle.dumps(OOBTree, 1), "", t)
oids[j][1] = serial oids[j][1] = serial
# and it could be from a broken module # and it could be from a broken module
j = 1 j = 1
oid, revid = oids[j] oid, revid = oids[j]
serial = self._storage.store(oid, revid, pickle.dumps(Broken, 1), "", t) serial = self._storage.store(
oid, revid, pickle.dumps(Broken, 1), "", t)
oids[j][1] = serial oids[j][1] = serial
# but mostly it looks like this # but mostly it looks like this
...@@ -401,7 +403,8 @@ class AnalyzeDotPyTest(StorageTestBase.StorageTestBase): ...@@ -401,7 +403,8 @@ class AnalyzeDotPyTest(StorageTestBase.StorageTestBase):
pct = rep.TYPESIZE[t] * 100.0 / rep.DBYTES pct = rep.TYPESIZE[t] * 100.0 / rep.DBYTES
cumpct += pct cumpct += pct
self.assertAlmostEqual(cumpct, 100.0, 0, "Failed to analyze some records") self.assertAlmostEqual(cumpct, 100.0, 0,
"Failed to analyze some records")
# Raise an exception if the tids in FileStorage fs aren't # Raise an exception if the tids in FileStorage fs aren't
# strictly increasing. # strictly increasing.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment