Commit 028e4185 authored by Chris McDonough's avatar Chris McDonough

Factor out blob cache storage into a helper class for use by both ClientStorage and BlobStorage.

parent 6a3a3773
......@@ -35,11 +35,11 @@ from ZEO.Exceptions import ClientStorageError, ClientDisconnected, AuthError
from ZEO.auth import get_module
from ZEO.zrpc.client import ConnectionManager
from ZODB.Blobs.BlobStorage import BLOB_SUFFIX
from ZODB import POSException
from ZODB import utils
from ZODB.loglevels import BLATHER
from ZODB.Blobs.interfaces import IBlobStorage
from ZODB.Blobs.Blob import FilesystemHelper
from persistent.TimeStamp import TimeStamp
logger = logging.getLogger('ZEO.ClientStorage')
......@@ -316,17 +316,13 @@ class ClientStorage(object):
# XXX need to check for POSIX-ness here
if blob_dir is not None:
if not os.path.exists(blob_dir):
os.makedirs(blob_dir, 0700)
log2("Blob cache directory '%s' does not exist. "
"Created new directory." % self.base_directory,
level=logging.INFO)
if (os.stat(blob_dir).st_mode & 077) != 0:
self.fshelper = FilesystemHelper(blob_dir)
self.fshelper.create()
if not self.fshelper.isSecure(blob_dir):
log2('Blob dir %s has insecure mode setting' % blob_dir,
level=logging.WARNING)
self.blob_dir = blob_dir
else:
self.fshelper = None
# Initialize locks
self.blob_status_lock = threading.Lock()
self.blob_status = {}
......@@ -929,37 +925,21 @@ class ClientStorage(object):
os.unlink(blobfilename)
return serials
def _getBlobPath(self, oid):
return os.path.join(self.blob_dir,
utils.oid_repr(oid)
)
def _getLoadingFilename(self, oid, serial):
"""Generate an intermediate filename for two-phase commit.
"""
return self._getCleanFilename(oid, serial) + ".loading"
def _getCleanFilename(self, oid, tid):
return os.path.join(self._getBlobPath(oid),
"%s%s" % (utils.tid_repr(tid),
BLOB_SUFFIX,)
)
def _do_load_blob(self, oid, serial, version):
"""Do the actual loading from the RPC server."""
blob_filename = self._getCleanFilename(oid, serial)
blob_filename = self.fshelper.getBlobFilename(oid, serial)
if self._server is None:
raise ClientDisconnected()
targetpath = self._getBlobPath(oid)
targetpath = self.fshelper.getPathForOID(oid)
if not os.path.exists(targetpath):
os.makedirs(targetpath, 0700)
# We write to a temporary file first, so we do not accidentally
# allow half-baked copies of this blob be loaded
tempfilename = self._getLoadingFilename(oid, serial)
tempfile = open(tempfilename, "wb")
tempfd, tempfilename = self.fshelper.blob_mkstemp(oid, serial)
tempfile = fdopen(tempfd, 'wb')
offset = 0
while True:
chunk = self._server.loadBlob(oid, serial, version, offset)
......@@ -982,11 +962,11 @@ class ClientStorage(object):
2a. Wait for other download to finish, return
3. If not being downloaded, start download
"""
if self.blob_dir is None:
if self.fshelper is None:
raise POSException.Unsupported("No blob cache directory is "
"configured.")
blob_filename = self._getCleanFilename(oid, serial)
blob_filename = self.fshelper.getBlobFilename(oid, serial)
# Case 1: Blob is available already, just use it
if os.path.exists(blob_filename):
return blob_filename
......
......@@ -308,7 +308,7 @@ class BlobAdaptedFileStorageTests(GenericTests):
super(statusdict, self).__delitem__(k)
# ensure that we do locking properly
filename = self._storage._getCleanFilename(oid, serial)
filename = self._storage.fshelper.getBlobFilename(oid, serial)
thestatuslock = self._storage.blob_status_lock = Dummy()
thebloblock = Dummy()
......
......@@ -2,6 +2,7 @@
import os
import time
import tempfile
import logging
from zope.interface import implements
......@@ -12,6 +13,8 @@ import transaction
from transaction.interfaces import IDataManager
from persistent import Persistent
BLOB_SUFFIX = ".blob"
class Blob(Persistent):
implements(IBlob)
......@@ -265,3 +268,87 @@ class BlobFile(file):
# we'll assume they will be for now in the name of not
# muddying the code needlessly.
self.close()
logger = logging.getLogger('ZODB.Blobs')

# Tag every message with the current process id so interleaved logs from
# several ZODB processes can be told apart.
_pid = str(os.getpid())

def log(msg, level=logging.INFO, subsys=_pid, exc_info=False):
    """Log *msg* on the ZODB.Blobs logger, prefixed with a subsystem tag.

    The tag defaults to this process's pid; pass *subsys* to override it.
    """
    logger.log(level, "(%s) %s" % (subsys, msg), exc_info=exc_info)
class FilesystemHelper:
# Storages that implement IBlobStorage can choose to use this
# helper class to generate and parse blob filenames. This is not
# a set-in-stone interface for all filesystem operations dealing
# with blobs and storages needn't indirect through this if they
# want to perform blob storage differently.
def __init__(self, base_dir):
self.base_dir = base_dir
def create(self):
if not os.path.exists(self.base_dir):
os.makedirs(self.base_dir, 0700)
log("Blob cache directory '%s' does not exist. "
"Created new directory." % self.base_dir,
level=logging.INFO)
def isSecure(self, path):
""" Ensure that (POSIX) path mode bits are 0700 """
return (os.stat(path).st_mode & 077) != 0
def getPathForOID(self, oid):
""" Given an OID, return the path on the filesystem where
the blob data relating to that OID is stored """
return os.path.join(self.base_dir, utils.oid_repr(oid))
def getBlobFilename(self, oid, tid):
""" Given an oid and a tid, return the full filename of the
'committed' blob file related to that oid and tid. """
oid_path = self.getPathForOID(oid)
filename = "%s%s" % (utils.tid_repr(tid), BLOB_SUFFIX)
return os.path.join(oid_path, filename)
def blob_mkstemp(self, oid, tid):
""" Given an oid and a tid, return a temporary file descriptor
and a related filename. The file is guaranteed to exist on
the same partition as committed data, which is important for
being able to rename the file without a copy operation. The
directory in which the file will be placed, which is the
return value of self.getPathForOID(oid), must exist before
this method may be called successfully."""
oidpath = self.getPathForOID(oid)
fd, name = tempfile.mkstemp(suffix='.tmp', prefix=utils.tid_repr(tid),
dir=oidpath)
return fd, name
def splitBlobFilename(self, filename):
"""Returns the oid and tid for a given blob filename.
If the filename cannot be recognized as a blob filename, (None, None)
is returned.
"""
if not filename.endswith(BLOB_SUFFIX):
return None, None
path, filename = os.path.split(filename)
oid = os.path.split(path)[1]
serial = filename[:-len(BLOB_SUFFIX)]
oid = utils.repr_to_oid(oid)
serial = utils.repr_to_oid(serial)
return oid, serial
def getOIDsForSerial(self, search_serial):
""" Return all oids related to a particular tid that exist in
blob data """
oids = []
base_dir = self.base_dir
for oidpath in os.listdir(base_dir):
for filename in os.listdir(os.path.join(base_dir, oidpath)):
blob_path = os.path.join(base_dir, oidpath, filename)
oid, serial = self.splitBlobFilename(blob_path)
if search_serial == serial:
oids.append(oid)
return oids
......@@ -23,8 +23,8 @@ from zope.proxy import ProxyBase, getProxiedObject
from ZODB import utils
from ZODB.Blobs.interfaces import IBlobStorage, IBlob
from ZODB.POSException import POSKeyError
BLOB_SUFFIX = ".blob"
from ZODB.Blobs.Blob import BLOB_SUFFIX
from ZODB.Blobs.Blob import FilesystemHelper
logger = logging.getLogger('ZODB.BlobStorage')
......@@ -33,7 +33,7 @@ class BlobStorage(ProxyBase):
implements(IBlobStorage)
__slots__ = ('base_directory', 'dirty_oids')
__slots__ = ('fshelper', 'dirty_oids')
# Proxies can't have a __dict__ so specifying __slots__ here allows
# us to have instance attributes explicitly on the proxy.
......@@ -43,7 +43,7 @@ class BlobStorage(ProxyBase):
def __init__(self, base_directory, storage):
# TODO Complain if storage is ClientStorage
ProxyBase.__init__(self, storage)
self.base_directory = base_directory
self.fshelper = FilesystemHelper(base_directory)
if not os.path.exists(self.base_directory):
os.makedirs(self.base_directory, 0700)
logger.info("Blob directory '%s' does not exist. "
......@@ -64,11 +64,11 @@ class BlobStorage(ProxyBase):
self._lock_acquire()
try:
targetpath = self._getBlobPath(oid)
targetpath = self.fshelper.getPathForOID(oid)
if not os.path.exists(targetpath):
os.makedirs(targetpath, 0700)
targetname = self._getCleanFilename(oid, serial)
targetname = self.fshelper.getBlobFilename(oid, serial)
os.rename(blobfilename, targetname)
# XXX if oid already in there, something is really hosed.
......@@ -78,17 +78,6 @@ class BlobStorage(ProxyBase):
self._lock_release()
return self._tid
def _getBlobPath(self, oid):
return os.path.join(self.base_directory,
utils.oid_repr(oid)
)
def _getCleanFilename(self, oid, tid):
return os.path.join(self._getBlobPath(oid),
"%s%s" % (utils.tid_repr(tid),
BLOB_SUFFIX,)
)
def tpc_finish(self, *arg, **kw):
""" We need to override the base storage's tpc_finish instead of
providing a _finish method because methods found on the proxied object
......@@ -103,14 +92,14 @@ class BlobStorage(ProxyBase):
getProxiedObject(self).tpc_abort(*arg, **kw)
while self.dirty_oids:
oid, serial = self.dirty_oids.pop()
clean = self._getCleanFilename(oid, serial)
clean = self.fshelper.getBlobFilename(oid, serial)
if os.exists(clean):
os.unlink(clean)
def loadBlob(self, oid, serial, version):
"""Return the filename where the blob file can be found.
"""
filename = self._getCleanFilename(oid, serial)
filename = self.fshelper.getBlobFilename(oid, serial)
if not os.path.exists(filename):
raise POSKeyError, "Not an existing blob."
return filename
......@@ -125,17 +114,18 @@ class BlobStorage(ProxyBase):
# XXX we should be tolerant of "garbage" directories/files in
# the base_directory here.
for oid_repr in os.listdir(self.base_directory):
base_dir = self.fshelper.base_dir
for oid_repr in os.listdir(base_dir):
oid = utils.repr_to_oid(oid_repr)
oid_path = os.path.join(self.base_directory, oid_repr)
oid_path = os.path.join(base_dir, oid_repr)
files = os.listdir(oid_path)
files.sort()
for filename in files:
filepath = os.path.join(oid_path, filename)
whatever, serial = self._splitBlobFilename(filepath)
whatever, serial = self.fshelper.splitBlobFilename(filepath)
try:
fn = self._getCleanFilename(oid, serial)
fn = self.fshelper.getBlobFilename(oid, serial)
self.loadSerial(oid, serial)
except POSKeyError:
os.unlink(filepath)
......@@ -144,9 +134,10 @@ class BlobStorage(ProxyBase):
shutil.rmtree(oid_path)
def _packNonUndoing(self, packtime, referencesf):
for oid_repr in os.listdir(self.base_directory):
base_dir = self.fshelper.base_dir
for oid_repr in os.listdir(base_dir):
oid = utils.repr_to_oid(oid_repr)
oid_path = os.path.join(self.base_directory, oid_repr)
oid_path = os.path.join(base_dir, oid_repr)
exists = True
try:
......@@ -193,41 +184,29 @@ class BlobStorage(ProxyBase):
orig_size = getProxiedObject(self).getSize()
blob_size = 0
for oid in os.listdir(self.base_directory):
for serial in os.listdir(os.path.join(self.base_directory, oid)):
base_dir = self.fshelper.base_dir
for oid in os.listdir(base_dir):
for serial in os.listdir(os.path.join(base_dir, oid)):
if not serial.endswith(BLOB_SUFFIX):
continue
file_path = os.path.join(self.base_directory, oid, serial)
file_path = os.path.join(base_dir, oid, serial)
blob_size += os.stat(file_path).st_size
return orig_size + blob_size
def _splitBlobFilename(self, filename):
"""Returns OID, TID for a given blob filename.
If it's not a blob filename, (None, None) is returned.
"""
if not filename.endswith(BLOB_SUFFIX):
return None, None
path, filename = os.path.split(filename)
oid = os.path.split(path)[1]
serial = filename[:-len(BLOB_SUFFIX)]
oid = utils.repr_to_oid(oid)
serial = utils.repr_to_oid(serial)
return oid, serial
def undo(self, serial_id, transaction):
serial, keys = getProxiedObject(self).undo(serial_id, transaction)
self._lock_acquire()
try:
# The old serial_id is given in base64 encoding ...
serial_id = base64.decodestring(serial_id+ '\n')
for oid in self._getOIDsForSerial(serial_id):
data, serial_before, serial_after = \
self.loadBefore(oid, serial_id)
orig = file(self._getCleanFilename(oid, serial_before), "r")
new = file(self._getCleanFilename(oid, serial), "w")
for oid in self.fshelper.getOIDsForSerial(serial_id):
data, serial_before, serial_after = self.loadBefore(oid,
serial_id)
orig_fn = self.fshelper.getBlobFilename(oid, serial_before)
orig = open(orig_fn, "r")
new_fn = self.fshelper.getBlobFilename(oid, serial)
new = open(new_fn, "wb")
utils.cp(orig, new)
orig.close()
new.close()
......@@ -236,14 +215,3 @@ class BlobStorage(ProxyBase):
self._lock_release()
return serial, keys
def _getOIDsForSerial(self, search_serial):
oids = []
for oidpath in os.listdir(self.base_directory):
for filename in os.listdir(os.path.join(self.base_directory,
oidpath)):
blob_path = os.path.join(self.base_directory, oidpath,
filename)
oid, serial = self._splitBlobFilename(blob_path)
if search_serial == serial:
oids.append(oid)
return oids
......@@ -83,13 +83,13 @@ Put some revisions of a blob object in our database and on the filesystem:
>>> tids.append(blob_storage._tid)
>>> oid = root['blob']._p_oid
>>> fns = [ blob_storage._getCleanFilename(oid, x) for x in tids ]
>>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
>>> [ os.path.exists(x) for x in fns ]
[True, True, True, True, True]
Get our blob filenames for this oid.
>>> fns = [ blob_storage._getCleanFilename(oid, x) for x in tids ]
>>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
Do a pack to the slightly before the first revision was written:
......@@ -203,13 +203,13 @@ Put some revisions of a blob object in our database and on the filesystem:
>>> tids.append(blob_storage._tid)
>>> oid = root['blob']._p_oid
>>> fns = [ blob_storage._getCleanFilename(oid, x) for x in tids ]
>>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
>>> [ os.path.exists(x) for x in fns ]
[True, True, True, True, True]
Get our blob filenames for this oid.
>>> fns = [ blob_storage._getCleanFilename(oid, x) for x in tids ]
>>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
Do a pack to the slightly before the first revision was written:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment