Commit 39fb497b authored by Jim Fulton's avatar Jim Fulton

Fixed a bug in file pool: it didn't properly handle multiple write

locks.

In fixing, also made it work with with.
parent 37f26139
......@@ -14,6 +14,8 @@
"""Storage implementation using a log written to a single file.
"""
from __future__ import with_statement
from cPickle import Pickler, Unpickler, loads
from persistent.TimeStamp import TimeStamp
from struct import pack, unpack
......@@ -32,6 +34,7 @@ from ZODB.utils import p64, u64, z64
import base64
import BTrees.OOBTree
import contextlib
import errno
import logging
import os
......@@ -409,8 +412,7 @@ class FileStorage(
"""Return pickle data and serial number."""
assert not version
_file = self._files.get()
try:
with self._files.get() as _file:
pos = self._lookup_pos(oid)
h = self._read_data_header(pos, oid, _file)
if h.plen:
......@@ -423,8 +425,6 @@ class FileStorage(
return data, h.tid
else:
raise POSKeyError(oid)
finally:
self._files.put(_file)
def loadSerial(self, oid, serial):
self._lock_acquire()
......@@ -445,8 +445,7 @@ class FileStorage(
self._lock_release()
def loadBefore(self, oid, tid):
_file = self._files.get()
try:
with self._files.get() as _file:
pos = self._lookup_pos(oid)
end_tid = None
while True:
......@@ -464,8 +463,6 @@ class FileStorage(
return data, h.tid, end_tid
else:
return _file.read(h.plen), h.tid, end_tid
finally:
self._files.put(_file)
def store(self, oid, oldserial, data, version, transaction):
if self._is_read_only:
......@@ -718,12 +715,8 @@ class FileStorage(
self._lock_release()
def tpc_finish(self, transaction, f=None):
# Get write lock
self._files.write_lock()
try:
self._lock_acquire()
try:
with self._files.write_lock():
with self._lock:
if transaction is not self._transaction:
raise POSException.StorageTransactionError(
"tpc_finish called with wrong transaction")
......@@ -737,11 +730,6 @@ class FileStorage(
self._ude = None
self._transaction = None
self._commit_lock_release()
finally:
self._lock_release()
finally:
self._files.write_unlock()
def _finish(self, tid, u, d, e):
# If self._nextpos is 0, then the transaction didn't write any
......@@ -1139,25 +1127,21 @@ class FileStorage(
return
have_commit_lock = True
opos, index = pack_result
self._files.write_lock()
self._lock_acquire()
try:
self._files.empty()
self._file.close()
try:
os.rename(self._file_name, oldpath)
except Exception:
self._file = open(self._file_name, 'r+b')
raise
with self._files.write_lock():
with self._lock:
self._files.empty()
self._file.close()
try:
os.rename(self._file_name, oldpath)
except Exception:
self._file = open(self._file_name, 'r+b')
raise
# OK, we're beyond the point of no return
os.rename(self._file_name + '.pack', self._file_name)
self._file = open(self._file_name, 'r+b')
self._initIndex(index, self._tindex)
self._pos = opos
finally:
self._files.write_unlock()
self._lock_release()
# OK, we're beyond the point of no return
os.rename(self._file_name + '.pack', self._file_name)
self._file = open(self._file_name, 'r+b')
self._initIndex(index, self._tindex)
self._pos = opos
# We're basically done. Now we need to deal with removed
# blobs and removing the .old file (see further down).
......@@ -2053,6 +2037,7 @@ class FilePool:
closed = False
writing = False
writers = 0
def __init__(self, file_name):
self.name = file_name
......@@ -2060,26 +2045,31 @@ class FilePool:
self._out = []
self._cond = threading.Condition()
@contextlib.contextmanager
def write_lock(self):
self._cond.acquire()
try:
self.writing = True
while self._out:
with self._cond:
self.writers += 1
while self.writing or self._out:
self._cond.wait()
finally:
self._cond.release()
if self.closed:
raise ValueError('closed')
self.writing = True
def write_unlock(self):
self._cond.acquire()
self.writing = False
self._cond.notifyAll()
self._cond.release()
try:
yield None
finally:
with self._cond:
self.writing = False
if self.writers > 0:
self.writers -= 1
self._cond.notifyAll()
@contextlib.contextmanager
def get(self):
self._cond.acquire()
try:
while self.writing:
with self._cond:
while self.writers:
self._cond.wait()
assert not self.writing
if self.closed:
raise ValueError('closed')
......@@ -2088,32 +2078,25 @@ class FilePool:
except IndexError:
f = open(self.name, 'rb')
self._out.append(f)
return f
finally:
self._cond.release()
def put(self, f):
self._out.remove(f)
self._files.append(f)
if not self._out:
self._cond.acquire()
try:
if self.writing and not self._out:
self._cond.notifyAll()
finally:
self._cond.release()
try:
yield f
finally:
self._out.remove(f)
self._files.append(f)
if not self._out:
with self._cond:
if self.writers and not self._out:
self._cond.notifyAll()
def empty(self):
while self._files:
self._files.pop().close()
def close(self):
self._cond.acquire()
self.closed = True
self._cond.release()
self.write_lock()
try:
with self._cond:
self.closed = True
while self._out:
self._out.pop().close()
self.empty()
finally:
self.write_unlock()
self.writing = self.writers = 0
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment