Commit c2a8e340 authored by Jim Fulton's avatar Jim Fulton

Now require Blob files to be stored even for unopened blobs.

parent 2794dae8
...@@ -899,8 +899,7 @@ class ClientStorage(object): ...@@ -899,8 +899,7 @@ class ClientStorage(object):
else: else:
self._server.storeBlob( self._server.storeBlob(
oid, serial, data, blobfilename, version, txn) oid, serial, data, blobfilename, version, txn)
if blobfilename is not None: self._tbuf.storeBlob(oid, blobfilename)
self._tbuf.storeBlob(oid, blobfilename)
return serials return serials
def _storeBlob_shared(self, oid, serial, data, filename, version, txn): def _storeBlob_shared(self, oid, serial, data, filename, version, txn):
...@@ -965,6 +964,7 @@ class ClientStorage(object): ...@@ -965,6 +964,7 @@ class ClientStorage(object):
# Load a blob. If it isn't present and we have a shared blob # Load a blob. If it isn't present and we have a shared blob
# directory, then assume that it doesn't exist on the server # directory, then assume that it doesn't exist on the server
# and return None. # and return None.
if self.fshelper is None: if self.fshelper is None:
raise POSException.Unsupported("No blob cache directory is " raise POSException.Unsupported("No blob cache directory is "
"configured.") "configured.")
...@@ -976,8 +976,8 @@ class ClientStorage(object): ...@@ -976,8 +976,8 @@ class ClientStorage(object):
if self.blob_cache_writable: if self.blob_cache_writable:
# We're using a server shared cache. If the file isn't # We're using a server shared cache. If the file isn't
# here, it's not anywahere. # here, it's not anywhere.
return None raise POSKeyError("No blob file", oid, serial)
# First, we'll create the directory for this oid, if it doesn't exist. # First, we'll create the directory for this oid, if it doesn't exist.
targetpath = self.fshelper.getPathForOID(oid) targetpath = self.fshelper.getPathForOID(oid)
...@@ -1040,7 +1040,7 @@ class ClientStorage(object): ...@@ -1040,7 +1040,7 @@ class ClientStorage(object):
if self._have_blob(blob_filename, oid, serial): if self._have_blob(blob_filename, oid, serial):
return blob_filename return blob_filename
return None raise POSKeyError("No blob file", oid, serial)
finally: finally:
lock.close() lock.close()
......
...@@ -226,11 +226,6 @@ class StorageServer: ...@@ -226,11 +226,6 @@ class StorageServer:
# the data into memory, so we use a message iterator. This # the data into memory, so we use a message iterator. This
# allows us to read the blob data as needed. # allows us to read the blob data as needed.
if blobfilename is None:
self.rpc.callAsync('storeEmptyBlob',
oid, serial, data, version, id(txn))
return
def store(): def store():
yield ('storeBlobStart', ()) yield ('storeBlobStart', ())
f = open(blobfilename, 'rb') f = open(blobfilename, 'rb')
......
...@@ -539,9 +539,6 @@ class ZEOStorage: ...@@ -539,9 +539,6 @@ class ZEOStorage:
os.close(fd) os.close(fd)
self.blob_log.append((oid, serial, data, tempname, version)) self.blob_log.append((oid, serial, data, tempname, version))
def storeEmptyBlob(self, oid, serial, data, version, id):
self.blob_log.append((oid, serial, data, None, version))
def storeBlobShared(self, oid, serial, data, filename, version, id): def storeBlobShared(self, oid, serial, data, filename, version, id):
# Reconstruct the full path from the filename in the OID directory # Reconstruct the full path from the filename in the OID directory
filename = os.path.join(self.storage.fshelper.getPathForOID(oid), filename = os.path.join(self.storage.fshelper.getPathForOID(oid),
......
...@@ -305,7 +305,6 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -305,7 +305,6 @@ class Connection(smac.SizedMessageAsyncConnection, object):
# Z308 -- named after the ZODB release 3.8 # Z308 -- named after the ZODB release 3.8
# Added blob-support server methods: # Added blob-support server methods:
# sendBlob # sendBlob
# storeEmptyBlob
# storeBlobStart # storeBlobStart
# storeBlobChunk # storeBlobChunk
# storeBlobEnd # storeBlobEnd
......
...@@ -1072,7 +1072,6 @@ class Connection(ExportImport, object): ...@@ -1072,7 +1072,6 @@ class Connection(ExportImport, object):
def savepoint(self): def savepoint(self):
if self._savepoint_storage is None: if self._savepoint_storage is None:
# XXX what to do about IBlobStorages?
tmpstore = TmpStore(self._version, self._normal_storage) tmpstore = TmpStore(self._version, self._normal_storage)
self._savepoint_storage = tmpstore self._savepoint_storage = tmpstore
self._storage = self._savepoint_storage self._storage = self._savepoint_storage
...@@ -1115,12 +1114,8 @@ class Connection(ExportImport, object): ...@@ -1115,12 +1114,8 @@ class Connection(ExportImport, object):
for oid in oids: for oid in oids:
data, serial = src.load(oid, src) data, serial = src.load(oid, src)
try: if isinstance(self._reader.getGhost(data), Blob):
blobfilename = src.loadBlob(oid, serial) blobfilename = src.loadBlob(oid, serial)
except (POSKeyError, Unsupported):
s = self._storage.store(oid, serial, data,
self._version, transaction)
else:
s = self._storage.storeBlob(oid, serial, data, blobfilename, s = self._storage.storeBlob(oid, serial, data, blobfilename,
self._version, transaction) self._version, transaction)
# we invalidate the object here in order to ensure # we invalidate the object here in order to ensure
...@@ -1128,6 +1123,10 @@ class Connection(ExportImport, object): ...@@ -1128,6 +1123,10 @@ class Connection(ExportImport, object):
# unghostify it, which will cause its blob data # unghostify it, which will cause its blob data
# to be reattached "cleanly" # to be reattached "cleanly"
self.invalidate(s, {oid:True}) self.invalidate(s, {oid:True})
else:
s = self._storage.store(oid, serial, data,
self._version, transaction)
self._handle_serial(s, oid, change=False) self._handle_serial(s, oid, change=False)
src.close() src.close()
...@@ -1240,8 +1239,6 @@ class TmpStore: ...@@ -1240,8 +1239,6 @@ class TmpStore:
def storeBlob(self, oid, serial, data, blobfilename, version, def storeBlob(self, oid, serial, data, blobfilename, version,
transaction): transaction):
serial = self.store(oid, serial, data, version, transaction) serial = self.store(oid, serial, data, version, transaction)
assert isinstance(serial, str) # XXX in theory serials could be
# something else
targetpath = self._getBlobPath(oid) targetpath = self._getBlobPath(oid)
if not os.path.exists(targetpath): if not os.path.exists(targetpath):
...@@ -1259,7 +1256,7 @@ class TmpStore: ...@@ -1259,7 +1256,7 @@ class TmpStore:
self._storage) self._storage)
filename = self._getCleanFilename(oid, serial) filename = self._getCleanFilename(oid, serial)
if not os.path.exists(filename): if not os.path.exists(filename):
raise POSKeyError, "Not an existing blob." raise POSKeyError("No blob file", oid, serial)
return filename return filename
def _getBlobPath(self, oid): def _getBlobPath(self, oid):
......
...@@ -20,10 +20,11 @@ from cPickle import Pickler, Unpickler ...@@ -20,10 +20,11 @@ from cPickle import Pickler, Unpickler
from tempfile import TemporaryFile from tempfile import TemporaryFile
import logging import logging
from ZODB.POSException import ExportError, POSKeyError from ZODB.blob import Blob
from ZODB.utils import p64, u64, cp, mktemp
from ZODB.interfaces import IBlobStorage from ZODB.interfaces import IBlobStorage
from ZODB.POSException import ExportError, POSKeyError
from ZODB.serialize import referencesf from ZODB.serialize import referencesf
from ZODB.utils import p64, u64, cp, mktemp
logger = logging.getLogger('ZODB.ExportImport') logger = logging.getLogger('ZODB.ExportImport')
...@@ -55,14 +56,10 @@ class ExportImport: ...@@ -55,14 +56,10 @@ class ExportImport:
f.writelines([oid, p64(len(p)), p]) f.writelines([oid, p64(len(p)), p])
if supports_blobs: if supports_blobs:
if 'Blob' not in p: if not isinstance(self._reader.getGhost(p), Blob):
continue # filter out most non-blobs continue # not a blob
blobfilename = self._storage.loadBlob(oid, serial) blobfilename = self._storage.loadBlob(oid, serial)
if blobfilename is None:
# This could be a non-blob or a blob with unsaved data.
continue
f.write(blob_begin_marker) f.write(blob_begin_marker)
f.write(p64(os.stat(blobfilename).st_size)) f.write(p64(os.stat(blobfilename).st_size))
blobdata = open(blobfilename, "rb") blobdata = open(blobfilename, "rb")
......
...@@ -245,6 +245,8 @@ class Blob(persistent.Persistent): ...@@ -245,6 +245,8 @@ class Blob(persistent.Persistent):
# hand uncommitted data to connection, relinquishing responsibility # hand uncommitted data to connection, relinquishing responsibility
# for it. # for it.
filename = self._p_blob_uncommitted filename = self._p_blob_uncommitted
if filename is None and self._p_blob_committed is None:
filename = self._create_uncommitted_file()
self._p_blob_uncommitted = self._p_blob_ref = None self._p_blob_uncommitted = self._p_blob_ref = None
return filename return filename
...@@ -413,22 +415,21 @@ class BlobStorage(SpecificationDecoratorBase): ...@@ -413,22 +415,21 @@ class BlobStorage(SpecificationDecoratorBase):
# the user may not have called "open" on the blob object, # the user may not have called "open" on the blob object,
# in which case, the blob will not have a filename. # in which case, the blob will not have a filename.
if blobfilename is not None: self._lock_acquire()
self._lock_acquire() try:
try: targetpath = self.fshelper.getPathForOID(oid)
targetpath = self.fshelper.getPathForOID(oid) if not os.path.exists(targetpath):
if not os.path.exists(targetpath): os.makedirs(targetpath, 0700)
os.makedirs(targetpath, 0700)
targetname = self.fshelper.getBlobFilename(oid, serial) targetname = self.fshelper.getBlobFilename(oid, serial)
utils.rename_or_copy(blobfilename, targetname) utils.rename_or_copy(blobfilename, targetname)
# XXX if oid already in there, something is really hosed. # XXX if oid already in there, something is really hosed.
# The underlying storage should have complained anyway # The underlying storage should have complained anyway
self.dirty_oids.append((oid, serial)) self.dirty_oids.append((oid, serial))
finally: finally:
self._lock_release() self._lock_release()
return self._tid return self._tid
@non_overridable @non_overridable
def tpc_finish(self, *arg, **kw): def tpc_finish(self, *arg, **kw):
...@@ -456,7 +457,7 @@ class BlobStorage(SpecificationDecoratorBase): ...@@ -456,7 +457,7 @@ class BlobStorage(SpecificationDecoratorBase):
""" """
filename = self.fshelper.getBlobFilename(oid, serial) filename = self.fshelper.getBlobFilename(oid, serial)
if not os.path.exists(filename): if not os.path.exists(filename):
return None raise POSKeyError("No blob file", oid, serial)
return filename return filename
@non_overridable @non_overridable
......
...@@ -946,7 +946,7 @@ class IBlobStorage(Interface): ...@@ -946,7 +946,7 @@ class IBlobStorage(Interface):
def loadBlob(oid, serial): def loadBlob(oid, serial):
"""Return the filename of the Blob data for this OID and serial. """Return the filename of the Blob data for this OID and serial.
Returns a filename or None if no Blob data is connected with this OID. Returns a filename.
Raises POSKeyError if the blobfile cannot be found. Raises POSKeyError if the blobfile cannot be found.
""" """
......
...@@ -44,7 +44,7 @@ Putting a Blob into a Connection works like every other object: ...@@ -44,7 +44,7 @@ Putting a Blob into a Connection works like every other object:
>>> transaction.commit() >>> transaction.commit()
We can also commit a transaction that seats a blob into place without We can also commit a transaction that seats a blob into place without
calling the blob's open method (this currently fails): calling the blob's open method:
>>> nothing = transaction.begin() >>> nothing = transaction.begin()
>>> anotherblob = Blob() >>> anotherblob = Blob()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment