Commit 7775207d authored by Chris McDonough's avatar Chris McDonough

Add (badly implemented) methods to TmpStore to allow blob import to work...

Add (badly implemented) methods to TmpStore to allow blob import to work again.  This will need to be redone, but it gets us to the point where all the tests pass.

Also provide a __repr__ to the BlobStorage proxy which lets us know that we're using a proxied storage.
parent 142e6931
...@@ -42,6 +42,11 @@ class BlobStorage(ProxyBase): ...@@ -42,6 +42,11 @@ class BlobStorage(ProxyBase):
ProxyBase.__init__(self, storage) ProxyBase.__init__(self, storage)
self.base_directory = base_directory self.base_directory = base_directory
self.dirty_oids = [] self.dirty_oids = []
def __repr__(self):
normal_storage = getProxiedObject(self)
return '<BlobStorage proxy for %r at %s>' % (normal_storage,
hex(id(self)))
def storeBlob(self, oid, oldserial, data, blobfilename, version, def storeBlob(self, oid, oldserial, data, blobfilename, version,
transaction): transaction):
......
...@@ -143,8 +143,8 @@ connections should result in a write conflict error. ...@@ -143,8 +143,8 @@ connections should result in a write conflict error.
>>> tm1 = transaction.TransactionManager() >>> tm1 = transaction.TransactionManager()
>>> tm2 = transaction.TransactionManager() >>> tm2 = transaction.TransactionManager()
>>> root3 = database.open(txn_mgr=tm1).root() >>> root3 = database.open(transaction_manager=tm1).root()
>>> root4 = database.open(txn_mgr=tm2).root() >>> root4 = database.open(transaction_manager=tm2).root()
>>> blob1c3 = root3['blob1'] >>> blob1c3 = root3['blob1']
>>> blob1c4 = root4['blob1'] >>> blob1c4 = root4['blob1']
>>> blob1c3fh1 = blob1c3.open('a') >>> blob1c3fh1 = blob1c3.open('a')
......
...@@ -20,6 +20,7 @@ import sys ...@@ -20,6 +20,7 @@ import sys
import tempfile import tempfile
import threading import threading
import warnings import warnings
import os
from time import time from time import time
from persistent import PickleCache from persistent import PickleCache
...@@ -42,9 +43,11 @@ from ZODB import POSException ...@@ -42,9 +43,11 @@ from ZODB import POSException
from ZODB.POSException import InvalidObjectReference, ConnectionStateError from ZODB.POSException import InvalidObjectReference, ConnectionStateError
from ZODB.POSException import ConflictError, ReadConflictError from ZODB.POSException import ConflictError, ReadConflictError
from ZODB.POSException import Unsupported from ZODB.POSException import Unsupported
from ZODB.POSException import POSKeyError
from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36 from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
from ZODB.utils import p64, u64, z64, oid_repr, positive_id from ZODB.utils import p64, u64, z64, oid_repr, positive_id
from ZODB import utils
global_reset_counter = 0 global_reset_counter = 0
...@@ -1120,10 +1123,15 @@ class Connection(ExportImport, object): ...@@ -1120,10 +1123,15 @@ class Connection(ExportImport, object):
for oid in oids: for oid in oids:
data, serial = src.load(oid, src) data, serial = src.load(oid, src)
s = self._storage.store(oid, serial, data, try:
self._version, transaction) blobfilename = src.loadBlob(oid, serial, self._version)
except POSKeyError:
s = self._storage.store(oid, serial, data,
self._version, transaction)
else:
s = self._storage.storeBlob(oid, serial, data, blobfilename,
self._version, transaction)
self._handle_serial(s, oid, change=False) self._handle_serial(s, oid, change=False)
src.close() src.close()
def _abort_savepoint(self): def _abort_savepoint(self):
...@@ -1166,6 +1174,9 @@ class Savepoint: ...@@ -1166,6 +1174,9 @@ class Savepoint:
def rollback(self): def rollback(self):
self.datamanager._rollback(self.state) self.datamanager._rollback(self.state)
BLOB_SUFFIX = ".blob"
BLOB_DIRTY = "store"
class TmpStore: class TmpStore:
"""A storage-like thing to support savepoints.""" """A storage-like thing to support savepoints."""
...@@ -1179,6 +1190,7 @@ class TmpStore: ...@@ -1179,6 +1190,7 @@ class TmpStore:
self._base_version = base_version self._base_version = base_version
self._file = tempfile.TemporaryFile() self._file = tempfile.TemporaryFile()
self._blobdir = tempfile.mkdtemp()
# position: current file position # position: current file position
# _tpos: file position at last commit point # _tpos: file position at last commit point
self.position = 0L self.position = 0L
...@@ -1222,6 +1234,38 @@ class TmpStore: ...@@ -1222,6 +1234,38 @@ class TmpStore:
self.position += l + len(header) self.position += l + len(header)
return serial return serial
def storeBlob(self, oid, serial, data, blobfilename, version,
transaction):
# XXX we need to clean up after ourselves!
serial = self.store(oid, serial, data, version, transaction)
assert isinstance(serial, str) # XXX in theory serials could be
# something else
targetpath = self._getBlobPath(oid)
if not os.path.exists(targetpath):
os.makedirs(targetpath, 0700)
targetname = self._getCleanFilename(oid, serial)
utils.best_rename(blobfilename, targetname)
def loadBlob(self, oid, serial, version):
"""Return the filename where the blob file can be found.
"""
filename = self._getCleanFilename(oid, serial)
if not os.path.exists(filename):
raise POSKeyError, "Not an existing blob."
return filename
def _getBlobPath(self, oid):
return os.path.join(self._blobdir,
utils.oid_repr(oid)
)
def _getCleanFilename(self, oid, tid):
return os.path.join(self._getBlobPath(oid),
"%s%s" % (utils.tid_repr(tid),
BLOB_SUFFIX,)
)
def reset(self, position, index): def reset(self, position, index):
self._file.truncate(position) self._file.truncate(position)
self.position = position self.position = position
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment