Commit 3012dfe5 authored by Christian Theune's avatar Christian Theune

- Implemented ZEO support.

parent b44fbe43
......@@ -27,6 +27,7 @@ import time
import types
import logging
from zope.interface import implements
from ZEO import ServerStub
from ZEO.cache import ClientCache
from ZEO.TransactionBuffer import TransactionBuffer
......@@ -34,8 +35,11 @@ from ZEO.Exceptions import ClientStorageError, ClientDisconnected, AuthError
from ZEO.auth import get_module
from ZEO.zrpc.client import ConnectionManager
from ZODB.Blobs.BlobStorage import BLOB_SUFFIX, BLOB_DIRTY
from ZODB import POSException
from ZODB import utils
from ZODB.loglevels import BLATHER
from ZODB.Blobs.interfaces import IBlobStorage
from persistent.TimeStamp import TimeStamp
logger = logging.getLogger('ZEO.ClientStorage')
......@@ -93,6 +97,7 @@ class ClientStorage(object):
tpc_begin().
"""
implements(IBlobStorage)
# Classes we instantiate. A subclass might override.
TransactionBufferClass = TransactionBuffer
......@@ -106,7 +111,7 @@ class ClientStorage(object):
wait_for_server_on_startup=None, # deprecated alias for wait
wait=None, wait_timeout=None,
read_only=0, read_only_fallback=0,
username='', password='', realm=None):
username='', password='', realm=None, blob_dir="/tmp"):
"""ClientStorage constructor.
This is typically invoked from a custom_zodb.py file.
......@@ -303,6 +308,8 @@ class ClientStorage(object):
# is executing.
self._lock = threading.Lock()
self.blob_dir = blob_dir
# Decide whether to use non-temporary files
if client is not None:
dir = var or os.getcwd()
......@@ -885,6 +892,58 @@ class ClientStorage(object):
self._tbuf.store(oid, version, data)
return self._check_serials()
def storeBlob(self, oid, serial, data, blobfilename, version, txn):
    """Storage API: store an object record together with its blob data.

    The pickle `data` goes through the regular store() path; the blob
    file is then streamed to the server in 4k chunks, terminated by a
    storeBlobEnd() call so the server can close its temporary file.

    Returns whatever store() returned (the serials to check).
    """
    serials = self.store(oid, serial, data, version, txn)
    blobfile = open(blobfilename, "rb")
    try:
        while True:
            chunk = blobfile.read(4096)
            if not chunk:
                # Empty read == EOF: tell the server the upload is done.
                self._server.storeBlobEnd(oid, serial, data, version,
                                          id(txn))
                break
            self._server.storeBlob(oid, serial, chunk, version, id(txn))
    finally:
        # Always release the file handle, even if a server call raises
        # (the original implementation leaked it in that case).
        blobfile.close()
    return serials
def _getDirtyFilename(self, oid, serial):
    """Return the temporary ("dirty") blob filename used while a
    two-phase commit is still in flight for oid/serial."""
    clean = self._getCleanFilename(oid, serial)
    return "%s.%s" % (clean, BLOB_DIRTY)
def _getBlobPath(self, oid):
    # All blobs of this client share one flat cache directory; the oid
    # parameter is accepted for interface symmetry with BlobStorage but
    # is currently unused here.
    return self.blob_dir
def _getCleanFilename(self, oid, tid):
    """Return the local cache filename for the committed blob of oid@tid."""
    basename = "%s-%s%s" % (utils.oid_repr(oid),
                            utils.tid_repr(tid),
                            BLOB_SUFFIX)
    return os.path.join(self._getBlobPath(oid), basename)
def loadBlob(self, oid, serial, version):
    """Return a filename containing the blob data for oid/serial.

    If the blob file is already in the local cache, return it
    immediately.  Otherwise download it from the server chunk by chunk
    into a "dirty" temporary file and rename it into place.

    Raises ClientDisconnected when no server connection is available.
    """
    blob_filename = self._getCleanFilename(oid, serial)
    if os.path.exists(blob_filename): # XXX see race condition below
        return blob_filename

    self._load_lock.acquire()
    try:
        if self._server is None:
            raise ClientDisconnected()

        tempfilename = self._getDirtyFilename(oid, serial)
        tempfile = open(tempfilename, "wb")
        try:
            offset = 0
            while True:
                chunk = self._server.loadBlob(oid, serial, version, offset)
                if not chunk:
                    break
                offset += len(chunk)
                tempfile.write(chunk)
        finally:
            # Close the handle even when the download fails so the
            # dirty file is not leaked as an open descriptor.
            tempfile.close()
        utils.best_rename(tempfilename, blob_filename)
        return blob_filename
    finally:
        self._load_lock.release()
def tpc_vote(self, txn):
"""Storage API: vote on a transaction."""
if txn is not self._transaction:
......
......@@ -211,6 +211,12 @@ class StorageServer:
def storea(self, oid, serial, data, version, id):
    """Queue an asynchronous 'storea' call on the server."""
    args = (oid, serial, data, version, id)
    self.rpc.callAsync('storea', *args)
def storeBlobEnd(self, oid, serial, data, version, id):
    """Queue an asynchronous 'storeBlobEnd' call, finishing a blob upload."""
    args = (oid, serial, data, version, id)
    self.rpc.callAsync('storeBlobEnd', *args)
def storeBlob(self, oid, serial, chunk, version, id):
    """Queue an asynchronous 'storeBlob' call carrying one data chunk."""
    args = (oid, serial, chunk, version, id)
    self.rpc.callAsync('storeBlob', *args)
##
# Start two-phase commit for a transaction
# @param id id used by client to identify current transaction. The
......@@ -250,6 +256,9 @@ class StorageServer:
def load(self, oid, version):
    """Synchronously fetch the object record for oid in version."""
    result = self.rpc.call('load', oid, version)
    return result
def loadBlob(self, oid, serial, version, offset):
    """Synchronously fetch one chunk of blob data starting at offset."""
    args = (oid, serial, version, offset)
    return self.rpc.call('loadBlob', *args)
def getSerial(self, oid):
    """Synchronously fetch the current serial for oid."""
    response = self.rpc.call('getSerial', oid)
    return response
......
......@@ -42,7 +42,7 @@ from ZODB.ConflictResolution import ResolvedSerial
from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf
from ZODB.utils import u64, oid_repr
from ZODB.utils import u64, oid_repr, mktemp
from ZODB.loglevels import BLATHER
logger = logging.getLogger('ZEO.StorageServer')
......@@ -93,6 +93,9 @@ class ZEOStorage:
self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
self.blob_transfer = {}
self.blob_log = []
self.blob_loads = {}
# The authentication protocol may define extra methods.
self._extensions = {}
for func in self.extensions:
......@@ -454,6 +457,49 @@ class ZEOStorage:
self.stats.stores += 1
self.txnlog.store(oid, serial, data, version)
def storeBlobEnd(self, oid, serial, data, version, id):
    """Finish a chunked blob upload for (oid, transaction id).

    Closes the temporary file the chunks were written to and queues
    the finished blob on self.blob_log for the commit phase.

    Raises Exception if no storeBlob() chunk was ever received for
    this oid/transaction.
    """
    key = (oid, id)
    try:
        # EAFP pop replaces the membership test + pop of the original.
        tempname, tempfile = self.blob_transfer.pop(key)
    except KeyError:
        # `raise Exception("...")` instead of the Python-2-only
        # `raise Exception, "..."` statement syntax.
        raise Exception("Can't finish a non-started Blob")
    tempfile.close()
    self.blob_log.append((oid, serial, data, tempname, version))
def storeBlob(self, oid, serial, chunk, version, id):
    """Receive one chunk of blob data for (oid, transaction id).

    The first chunk for a key creates a server-side temporary file;
    subsequent chunks are appended to it.
    """
    # XXX check that underlying storage supports blobs
    key = (oid, id)
    try:
        tempname, tempfile = self.blob_transfer[key]
    except KeyError:
        tempname = mktemp()
        tempfile = open(tempname, "wb")
        # XXX Force close and remove them when Storage closes
        self.blob_transfer[key] = (tempname, tempfile)
    tempfile.write(chunk)
def loadBlob(self, oid, serial, version, offset):
    """Return up to 4k of blob data for oid/serial starting at offset.

    The open file is cached in self.blob_loads across calls so the
    client can pull the blob chunk by chunk.  An empty chunk signals
    EOF; the cached file is then closed and evicted.
    """
    key = (oid, serial)
    if key not in self.blob_loads:
        # Open in binary mode: blob payloads are raw bytes and the
        # original text-mode open would corrupt them on some platforms.
        self.blob_loads[key] = \
            open(self.storage.loadBlob(oid, serial, version), "rb")
    blobdata = self.blob_loads[key]
    blobdata.seek(offset)
    chunk = blobdata.read(4096)
    if not chunk:
        # EOF: close the handle (the original leaked it) before
        # dropping it from the cache.
        blobdata.close()
        del self.blob_loads[key]
    return chunk
# The following four methods return values, so they must acquire
# the storage lock and begin the transaction before returning.
......@@ -596,6 +642,13 @@ class ZEOStorage:
# load oid, serial, data, version
if not self._store(*loader.load()):
break
# Blob support
while self.blob_log:
oid, oldserial, data, blobfilename, version = self.blob_log.pop()
self.storage.storeBlob(oid, oldserial, data, blobfilename,
version, self.transaction,)
resp = self._thunk()
if delay is not None:
delay.reply(resp)
......
......@@ -13,12 +13,18 @@
##############################################################################
import os
import shutil
import base64
from zope.interface import implements
from zope.proxy import ProxyBase, getProxiedObject
from ZODB import utils
from ZODB.Blobs.interfaces import IBlobStorage, IBlob
from ZODB.POSException import POSKeyError
BLOB_SUFFIX = ".blob"
BLOB_DIRTY = "store"
class BlobStorage(ProxyBase):
"""A storage to support blobs."""
......@@ -35,7 +41,7 @@ class BlobStorage(ProxyBase):
ProxyBase.__init__(self, storage)
self.base_directory = base_directory
self.dirty_oids = []
def storeBlob(self, oid, oldserial, data, blobfilename, version, transaction):
"""Stores data that has a BLOB attached."""
serial = self.store(oid, oldserial, data, version, transaction)
......@@ -44,6 +50,10 @@ class BlobStorage(ProxyBase):
self._lock_acquire()
try:
targetpath = self._getBlobPath(oid)
if not os.path.exists(targetpath):
os.makedirs(targetpath, 0700)
targetname = self._getCleanFilename(oid, serial)
try:
os.rename(blobfilename, targetname)
......@@ -64,21 +74,24 @@ class BlobStorage(ProxyBase):
def _getDirtyFilename(self, oid):
    """Generate an intermediate filename for two-phase commit.

    XXX Not used right now due to conceptual flux. Please keep it around
    anyway.
    """
    # The rendered source carried both the old literal-"store" return
    # and this one; only the BLOB_DIRTY-based return is live.
    return self._getCleanFilename(oid, BLOB_DIRTY)
def _getBlobPath(self, oid):
    """Return the per-oid directory holding this object's blob files."""
    oid_dir = utils.oid_repr(oid)
    return os.path.join(self.base_directory, oid_dir)
def _getCleanFilename(self, oid, tid):
    """Return the canonical filename for the committed blob of oid@tid.

    Layout: <base_directory>/<oid_repr>/<tid_repr><BLOB_SUFFIX>
    """
    # The rendered source contained two conflicting return statements
    # (an old flat "<oid>-<tid>.blob" layout and this per-oid layout).
    # Keep the per-oid layout, which getSize/pack/_splitBlobFilename
    # in this file all assume.
    return os.path.join(self._getBlobPath(oid),
                        "%s%s" % (utils.tid_repr(tid),
                                  BLOB_SUFFIX,)
                        )
def _finish(self, tid, u, d, e):
    # Delegate the real two-phase-commit finish to the proxied storage.
    ProxyBase._finish(self, tid, u, d, e)
    # Move dirty blobs if they are "really" dirty
    # NOTE(review): this resets `dirty_blobs`, but __init__ and undo()
    # track `dirty_oids` -- the two attribute names look inconsistent;
    # confirm which one is canonical.
    self.dirty_blobs = []
def _abort(self):
......@@ -87,10 +100,118 @@ class BlobStorage(ProxyBase):
# Throw away the stuff we'd had committed
while self.dirty_blobs:
oid, serial = self.dirty_blobs.pop()
os.unlink(self._getCleanFilename(oid))
clean = self._getCleanFilename(oid, serial)
dirty = self._getDirtyFilename(oid, serial)
for filename in [clean, dirty]:
if os.exists(filename):
os.unlink(filename)
def loadBlob(self, oid, serial, version):
    """Return the filename where the blob file can be found.

    No existence check is made here; callers get the canonical path
    for oid/serial whether or not the file is present.
    """
    filename = self._getCleanFilename(oid, serial)
    return filename
def _getNewestBlobSerial(self, oid):
blob_path = self._getBlobPath(oid)
serials = os.listdir(blob_path)
serials = [ os.path.join(blob_path, serial) for serial in serials ]
serials.sort(lambda x,y: cmp(os.stat(x).st_mtime,
os.stat(y).st_mtime)
)
return self._splitBlobFilename(serials[-1])[1]
def pack(self, packtime, referencesf):
    """Remove all unused oid/tid combinations.

    Delegates record-level packing to the proxied storage, then prunes
    blob files under base_directory: blobs of vanished oids are removed
    entirely, and for surviving oids every blob file older than
    packtime except the newest one is unlinked.
    """
    getProxiedObject(self).pack(packtime, referencesf)
    self._lock_acquire()
    try:
        # Walk over all existing files and check if they are still needed
        # (each entry of base_directory is assumed to be a per-oid
        # directory whose name is an oid repr -- TODO confirm nothing
        # else is ever created there).
        for filename in os.listdir(self.base_directory):
            oid = utils.repr_to_oid(filename)
            serial = self._getNewestBlobSerial(oid)
            file_path = os.path.join(self.base_directory, filename)
            try:
                self.loadSerial(oid, serial) # XXX Is that expensive?
            except POSKeyError:
                # The object doesn't exist anymore at all. We can remove
                # everything belonging to that oid
                shutil.rmtree(file_path)
            else:
                # The object still exists. We can remove everything but the
                # last recent object before pack time.
                serials = os.listdir(file_path)
                recent_candidate = \
                    os.path.split(self._getCleanFilename(oid, serial))[1]
                serials.remove(recent_candidate)
                for serial_candidate in serials:
                    cfname = os.path.join(file_path, serial_candidate)
                    # NOTE(review): pruning compares file mtime against
                    # packtime -- this depends on filesystem clocks
                    # matching transaction times; verify acceptable.
                    mtime = os.stat(cfname).st_mtime
                    if mtime < packtime:
                        os.unlink(cfname)
    finally:
        self._lock_release()
def getSize(self):
    """Return the size of the database in bytes."""
    total = getProxiedObject(self).getSize()
    # Add the on-disk size of every blob file under base_directory.
    for oid_dir in os.listdir(self.base_directory):
        oid_path = os.path.join(self.base_directory, oid_dir)
        for candidate in os.listdir(oid_path):
            if candidate.endswith(BLOB_SUFFIX):
                blob_file = os.path.join(oid_path, candidate)
                total += os.stat(blob_file).st_size
    return total
def _splitBlobFilename(self, filename):
    """Returns OID, TID for a given blob filename.

    If it's not a blob filename, (None, None) is returned.  A dirty
    (uncommitted) blob file yields (oid, None).
    """
    if not filename.endswith(BLOB_SUFFIX):
        return None, None
    dirname, basename = os.path.split(filename)
    oid_part = os.path.split(dirname)[1]
    serial_part = basename[:-len(BLOB_SUFFIX)]
    oid = utils.repr_to_oid(oid_part)
    if serial_part == BLOB_DIRTY:
        serial = None
    else:
        serial = utils.repr_to_oid(serial_part)
    return oid, serial
def undo(self, serial_id, transaction):
    """Undo transaction serial_id and carry blob files forward.

    Delegates the record-level undo to the proxied storage, then for
    every oid that had a blob in the undone transaction copies the
    pre-undo blob contents into a new blob file under the undo
    transaction's serial.
    """
    serial, keys = getProxiedObject(self).undo(serial_id, transaction)
    self._lock_acquire()
    try:
        # The old serial_id is given in base64 encoding ...
        serial_id = base64.decodestring(serial_id+ '\n')
        for oid in self._getOIDsForSerial(serial_id):
            data, serial_before, serial_after = \
                self.loadBefore(oid, serial_id)
            # NOTE(review): `file()` is the Python 2 builtin and the
            # files are opened in text mode -- binary mode ("rb"/"wb")
            # would be safer for blob payloads; confirm.
            orig = file(self._getCleanFilename(oid, serial_before), "r")
            new = file(self._getCleanFilename(oid, serial), "w")
            utils.cp(orig, new)
            orig.close()
            new.close()
            self.dirty_oids.append((oid, serial))
    finally:
        self._lock_release()
    return serial, keys
def _getOIDsForSerial(self, search_serial):
oids = []
for oidpath in os.listdir(self.base_directory):
for filename in os.listdir(os.path.join(self.base_directory,
oidpath)):
blob_path = os.path.join(self.base_directory, oidpath,
filename)
oid, serial = self._splitBlobFilename(blob_path)
if search_serial == serial:
oids.append(oid)
return oids
- Blob instances should clean up temporary files after committing
- Support database import/export
- Support ZEO
- Support selection of text/binary mode for blobs
- implement loadBlob and storeBlob
- loadBlob needs to handle the BLOB_CACHE_DIRECTORY
- storeBlob needs to hand the actual file data off to the server
......@@ -350,6 +350,7 @@ class Connection(ExportImport, object):
s = self._storage.storeBlob(oid, serial, p,
obj._p_blob_uncommitted,
self._version, transaction)
obj._p_invalidate()
else:
s = self._storage.store(oid, serial, p, self._version,
transaction)
......@@ -371,12 +372,6 @@ class Connection(ExportImport, object):
self._handle_serial(s, oid)
if IBlob.providedBy(obj):
# We need to update internals of the blobs here
obj._p_blob_uncommitted = None
obj._p_blob_data = \
self._storage.loadBlob(oid, obj._p_serial,
self._version )
def commit_sub(self, t):
"""Commit all changes made in subtransactions and begin 2-phase commit
......@@ -687,6 +682,7 @@ class Connection(ExportImport, object):
# Blob support
if IBlob.providedBy(obj):
obj._p_blob_uncommitted = None
obj._p_blob_data = \
self._storage.loadBlob(oid, serial, self._version)
......
......@@ -64,6 +64,12 @@
<sectiontype name="zeoclient" datatype=".ZEOClient"
implements="ZODB.storage">
<key name="blob-dir" required="no" default="/tmp">
<description>
Path name to the blob storage directory.
</description>
</key>
<multikey name="server" datatype="socket-address" required="yes"/>
<key name="storage" default="1">
<description>
......
......@@ -150,6 +150,7 @@ class ZEOClient(BaseConfig):
L = [server.address for server in self.config.server]
return ClientStorage(
L,
blob_dir=self.config.blob_dir,
storage=self.config.storage,
cache_size=self.config.cache_size,
name=self.config.name,
......
......@@ -386,7 +386,7 @@ class IStorage(Interface):
"""XXX"""
def getSize():
    """Return the size of the database in bytes."""
def history(oid, version, length=1, filter=None):
"""XXX"""
......
......@@ -16,7 +16,7 @@ import sys
import time
import struct
from struct import pack, unpack
from binascii import hexlify
from binascii import hexlify, unhexlify
import cPickle as pickle
from cStringIO import StringIO
import weakref
......@@ -88,7 +88,7 @@ def u64(v):
U64 = u64
def cp(f1, f2, length):
def cp(f1, f2, length=None):
"""Copy all data from one file to another.
It copies the data from the current position of the input file (f1)
......@@ -101,6 +101,12 @@ def cp(f1, f2, length):
write = f2.write
n = 8192
if length is None:
old_pos = f1.tell()
f1.seek(0,2)
length = f1.tell()
f1.seek(old_pos)
while length > 0:
if n > length:
n = length
......@@ -133,6 +139,13 @@ def oid_repr(oid):
else:
return repr(oid)
def repr_to_oid(repr):
    """Inverse of oid_repr: decode a hex oid representation ("0x1a" or
    plain hex digits) back to the raw 8-byte oid string."""
    digits = repr
    if digits.startswith("0x"):
        digits = digits[2:]
    as_bin = unhexlify(digits)
    # Left-pad to the full 8-byte oid width.  The b"" literal is
    # identical to "" on Python 2 but keeps str+bytes from mixing on
    # Python 3 (the original "\x00" literal broke there).
    as_bin = b"\x00" * (8 - len(as_bin)) + as_bin
    return as_bin
# Serials and transaction ids share the oid hex representation, so the
# same formatting function serves all three.
serial_repr = oid_repr
tid_repr = serial_repr
......@@ -314,3 +327,20 @@ def mktemp():
handle, filename = mkstemp()
os.close(handle)
return filename
def best_rename(sourcename, targetname):
    """Move sourcename to targetname, falling back to copy-and-delete.

    os.rename() raises OSError when source and target are on different
    filesystems; in that case the data is copied in 4k chunks and the
    source file is removed afterwards.
    """
    try:
        os.rename(sourcename, targetname)
    except OSError:
        # XXX This creates a race condition for un-locked return above
        source = open(sourcename, "rb")
        try:
            target = open(targetname, "wb")
            try:
                while True:
                    chunk = source.read(4096)
                    if not chunk:
                        break
                    target.write(chunk)
            finally:
                # Close both handles even when the copy fails (the
                # original leaked them on error).
                target.close()
        finally:
            source.close()
        os.unlink(sourcename)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment