Commit e4f4f6b6 authored by Jim Fulton

Reimplemented the ZEO Blob protocol:

- Avoid more than one round-trip call when loading blobs via copy from
  the server (see the first sketch after this list).

- Avoid loading large amounts of blob data into memory.  The old
  storeBlob implementation was likely to queue blob data faster than
  it could be sent, leading to a large memory footprint for the
  queue. Now, iterators are used to read data from files only when the
  network layer is ready to send it (see the second sketch after this list).

- Fixed storeBlob to move the input file to the blob cache (when not
  sharing the blob directory with the server).

- Extended the loadBlob locking model to work with multiple processes
  by using file locks rather than threading locks.  A common
  configuration is to use a client process per core, so a machine is
  likely to have many client processes, and it should be possible for
  those processes to share a common blob cache (see the third sketch
  after this list).
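
The following is a minimal sketch (not the committed code) of the new
single-round-trip load path.  The client issues one synchronous
sendBlob() call; while that call is outstanding, the asyncore thread
handles the recieveBlobStart/recieveBlobChunk/recieveBlobStop callbacks
shown in the diff and renames the temporary '.dl' file into place.  The
server and fshelper arguments are placeholders for the objects used in
ClientStorage.

    import os

    def fetch_blob(server, fshelper, oid, serial):
        # One synchronous call replaces the old
        # loadBlob(oid, serial, version, offset) polling loop; the blob
        # data itself arrives via the asynchronous recieveBlob* callbacks.
        blob_filename = fshelper.getBlobFilename(oid, serial)
        server.sendBlob(oid, serial)
        if os.path.exists(blob_filename):
            return blob_filename
        return None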
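
A sketch of the iterator-based store path, with chunk size and message
names taken from the stub changes below; the real storeBlobEnd call also
carries the pickled data, version, and transaction id, which are omitted
here for brevity.

    def store_messages(oid, serial, blobfilename, chunk_size=59000):
        # Generator consumed by rpc.callAsyncIterator(): the file is read
        # one chunk at a time, only as the network layer is ready to send.
        yield ('storeBlobStart', ())
        f = open(blobfilename, 'rb')
        try:
            while 1:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                yield ('storeBlobChunk', (chunk,))
        finally:
            f.close()
        yield ('storeBlobEnd', (oid, serial))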
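
A compressed sketch of the file-lock coordination used by loadBlob so
that separate client processes sharing one blob cache download a given
blob only once.  It assumes ZODB.lock_file.LockFile and LockError behave
as used in the diff below; the fetch callable is a placeholder for the
server request.

    import os
    import time
    import ZODB.lock_file

    def lock_and_fetch(blob_filename, fetch):
        lockfilename = blob_filename + '.lock'
        try:
            lock = ZODB.lock_file.LockFile(lockfilename)
        except ZODB.lock_file.LockError:
            # Another process is downloading this blob; wait for it.
            while 1:
                time.sleep(0.1)
                try:
                    lock = ZODB.lock_file.LockFile(lockfilename)
                except ZODB.lock_file.LockError:
                    pass
                else:
                    lock.close()
                    break
            if os.path.exists(blob_filename):
                return blob_filename
            return None
        try:
            # We hold the lock: it is our job to fetch the blob.
            if not os.path.exists(blob_filename):
                fetch()
            return blob_filename
        finally:
            lock.close()
            try:
                os.remove(lockfilename)
            except OSError:
                pass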
parent 61aeabc6
......@@ -21,6 +21,7 @@ ClientStorage -- the main class, implementing the Storage API
import cPickle
import os
import socket
import sys
import tempfile
import threading
import time
......@@ -35,6 +36,7 @@ from ZEO.Exceptions import ClientStorageError, ClientDisconnected, AuthError
from ZEO.auth import get_module
from ZEO.zrpc.client import ConnectionManager
import ZODB.lock_file
from ZODB import POSException
from ZODB import utils
from ZODB.loglevels import BLATHER
......@@ -329,10 +331,6 @@ class ClientStorage(object):
else:
self.fshelper = None
# Initialize locks
self.blob_status_lock = threading.Lock()
self.blob_status = {}
# Decide whether to use non-temporary files
if client is not None:
dir = var or os.getcwd()
......@@ -896,9 +894,13 @@ class ClientStorage(object):
"""Storage API: store a blob object."""
serials = self.store(oid, serial, data, version, txn)
if self.blob_cache_writable:
self._storeBlob_shared(oid, serial, data, blobfilename, version, txn)
self._storeBlob_shared(
oid, serial, data, blobfilename, version, txn)
else:
self._storeBlob_copy(oid, serial, data, blobfilename, version, txn)
self._server.storeBlob(
oid, serial, data, blobfilename, version, txn)
if blobfilename is not None:
self._tbuf.storeBlob(oid, blobfilename)
return serials
def _storeBlob_shared(self, oid, serial, data, filename, version, txn):
......@@ -908,120 +910,144 @@ class ClientStorage(object):
os.mkdir(dir)
fd, target = self.fshelper.blob_mkstemp(oid, serial)
os.close(fd)
os.rename(filename, target)
if sys.platform == 'win32':
# On Windows, we can't rename to an existing file. That's
# OK. We don't care what file we get as long as it is
# unique. We'll just keep trying until the rename succeeds.
os.remove(target)
i = 0
while 1:
try:
os.rename(filename, target + str(i))
except OSError:
i += 1
else:
break
target += str(i)
else:
os.rename(filename, target)
# Now tell the server where we put it
self._server.storeBlobShared(oid, serial, data,
os.path.basename(target), version, id(txn))
def _storeBlob_copy(self, oid, serial, data, blobfilename, version, txn):
"""Version of storeBlob() that copies the data over the ZEO protocol."""
blobfile = open(blobfilename, "rb")
while True:
chunk = blobfile.read(1<<16)
# even if the blobfile is completely empty, we need to call
# storeBlob at least once in order to be able to call
# storeBlobEnd successfully.
self._server.storeBlob(oid, serial, chunk, version, id(txn))
if not chunk:
self._server.storeBlobEnd(oid, serial, data, version, id(txn))
break
blobfile.close()
os.unlink(blobfilename)
self._server.storeBlobShared(
oid, serial, data,
os.path.basename(target), version, id(txn))
def _do_load_blob(self, oid, serial, version):
"""Do the actual loading from the RPC server."""
blob_filename = self.fshelper.getBlobFilename(oid, serial)
if self._server is None:
raise ClientDisconnected()
def _have_blob(self, blob_filename, oid, serial):
if os.path.exists(blob_filename):
log2("Found blob %s/%s in cache." % (utils.oid_repr(oid),
utils.tid_repr(serial)), level=BLATHER)
return True
return False
targetpath = self.fshelper.getPathForOID(oid)
if not os.path.exists(targetpath):
os.makedirs(targetpath, 0700)
def recieveBlobStart(self, oid, serial):
blob_filename = self.fshelper.getBlobFilename(oid, serial)
assert not os.path.exists(blob_filename)
assert os.path.exists(blob_filename+'.lock')
blob_filename += '.dl'
assert not os.path.exists(blob_filename)
f = open(blob_filename, 'wb')
f.close()
# We write to a temporary file first, so we do not accidentally
# allow half-baked copies of this blob to be loaded
tempfd, tempfilename = self.fshelper.blob_mkstemp(oid, serial)
tempfile = os.fdopen(tempfd, 'wb')
def recieveBlobChunk(self, oid, serial, chunk):
blob_filename = self.fshelper.getBlobFilename(oid, serial)+'.dl'
assert os.path.exists(blob_filename)
f = open(blob_filename, 'ab')
f.write(chunk)
f.close()
def recieveBlobStop(self, oid, serial):
blob_filename = self.fshelper.getBlobFilename(oid, serial)
os.rename(blob_filename+'.dl', blob_filename)
def loadBlob(self, oid, serial):
offset = 0
while True:
chunk = self._server.loadBlob(oid, serial, version, offset)
if not chunk:
break
offset += len(chunk)
tempfile.write(chunk)
tempfile.close()
# XXX will fail on Windows if file is open
os.rename(tempfilename, blob_filename)
return blob_filename
def loadBlob(self, oid, serial, version):
"""Loading a blob has to know about loading the same blob
from another thread as the same time.
1. Check if the blob is downloaded already
2. Check whether it is currently beeing downloaded
2a. Wait for other download to finish, return
3. If not beeing downloaded, start download
"""
# Load a blob. If it isn't present and we have a shared blob
# directory, then assume that it doesn't exist on the server
# and return None.
if self.fshelper is None:
raise POSException.Unsupported("No blob cache directory is "
"configured.")
blob_filename = self.fshelper.getBlobFilename(oid, serial)
# Case 1: Blob is available already, just use it
if os.path.exists(blob_filename):
log2("Found blob %s/%s in cache." % (utils.oid_repr(oid),
utils.tid_repr(serial)), level=BLATHER)
if self._have_blob(blob_filename, oid, serial):
return blob_filename
# Case 2,3: Blob might still be downloading or not there yet
if self.blob_cache_writable:
# We're using a server shared cache. If the file isn't
# here, it's not anywhere.
return None
# Try to get or create a lock for the downloading of this blob,
# identified by its oid and serial
lock_key = (oid, serial)
# We need to make the check for an existing lock and the possible
# creation of a new one atomic, so there is another lock:
self.blob_status_lock.acquire()
# First, we'll create the directory for this oid, if it doesn't exist.
targetpath = self.fshelper.getPathForOID(oid)
if not os.path.exists(targetpath):
try:
os.makedirs(targetpath, 0700)
except OSError:
# We might have lost a race. If so, the directory
# must exist now
assert os.path.exists(targetpath)
# OK, it's not here and we (or someone) needs to get it. We
# want to avoid getting it multiple times. We want to avoid
# getting it multiple times even across separate client
# processes on the same machine. We'll use file locking.
lockfilename = blob_filename+'.lock'
try:
if not self.blob_status.has_key(oid):
self.blob_status[lock_key] = self.getBlobLock()
lock = self.blob_status[lock_key]
finally:
self.blob_status_lock.release()
lock = ZODB.lock_file.LockFile(lockfilename)
except ZODB.lock_file.LockError:
# Someone is already downloading the Blob. Wait for the
# lock to be freed. How long should we be willing to wait?
# TODO: maybe find some way to assess download progress.
while 1:
time.sleep(0.1)
try:
lock = ZODB.lock_file.LockFile(lockfilename)
except ZODB.lock_file.LockError:
pass
else:
# We have the lock. We should be able to get the file now.
lock.close()
try:
os.remove(lockfilename)
except OSError:
pass
break
if self._have_blob(blob_filename, oid, serial):
return blob_filename
return None
# We acquire the lock to either start downloading, or wait
# for another download to finish
lock.acquire()
try:
# If there was another download that is finished by now,
# we just take the result.
if os.path.exists(blob_filename):
log2("Found blob %s/%s in cache after it was downloaded "
"from another thread." % (utils.oid_repr(oid),
utils.tid_repr(serial)), level=BLATHER)
# We got the lock, so it's our job to download it. First,
# we'll double check that someone didn't download it while we
# were getting the lock:
if self._have_blob(blob_filename, oid, serial):
return blob_filename
# Otherwise we download and use that
return self._do_load_blob(oid, serial, version)
finally:
# When done we remove the download lock ...
lock.release()
# Ask the server to send it to us. When this function
# returns, it will have been sent. (The receiving will
# have been handled by the asyncore thread.)
self._server.sendBlob(oid, serial)
# The status information isn't needed any more either,
# but we have to use the second lock here as well, to avoid
# making the removal of this status entry non-atomic (see above)
self.blob_status_lock.acquire()
try:
del self.blob_status[lock_key]
finally:
self.blob_status_lock.release()
if self._have_blob(blob_filename, oid, serial):
return blob_filename
def getBlobLock(self):
# indirection to support unit testing
return threading.Lock()
return None
finally:
lock.close()
try:
os.remove(lockfilename)
except OSError:
pass
def tpc_vote(self, txn):
"""Storage API: vote on a transaction."""
......@@ -1144,6 +1170,20 @@ class ClientStorage(object):
if s != ResolvedSerial:
assert s == tid, (s, tid)
self._cache.store(oid, version, s, None, data)
if self.fshelper is not None:
blobs = self._tbuf.blobs
while blobs:
oid, blobfilename = blobs.pop()
targetpath = self.fshelper.getPathForOID(oid)
if not os.path.exists(targetpath):
os.makedirs(targetpath, 0700)
os.rename(blobfilename,
self.fshelper.getBlobFilename(oid, tid),
)
self._tbuf.clear()
def undo(self, trans_id, txn):
......
......@@ -60,3 +60,18 @@ class ClientStorage:
def info(self, arg):
self.rpc.callAsync('info', arg)
def storeBlob(self, oid, serial, blobfilename):
def store():
yield ('recieveBlobStart', (oid, serial))
f = open(blobfilename, 'rb')
while 1:
chunk = f.read(59000)
if not chunk:
break
yield ('recieveBlobChunk', (oid, serial, chunk, ))
f.close()
yield ('recieveBlobStop', (oid, serial))
self.rpc.callAsyncIterator(store())
......@@ -13,6 +13,7 @@
##############################################################################
"""RPC stubs for interface exported by StorageServer."""
import os
import time
##
......@@ -219,11 +220,29 @@ class StorageServer:
def storea(self, oid, serial, data, version, id):
self.rpc.callAsync('storea', oid, serial, data, version, id)
def storeBlobEnd(self, oid, serial, data, version, id):
self.rpc.callAsync('storeBlobEnd', oid, serial, data, version, id)
def storeBlob(self, oid, serial, data, blobfilename, version, txn):
def storeBlob(self, oid, serial, chunk, version, id):
self.rpc.callAsync('storeBlob', oid, serial, chunk, version, id)
# Store a blob to the server. We don't want to read all of
# the data into memory, so we use a message iterator. This
# allows us to read the blob data as needed.
if blobfilename is None:
self.rpc.callAsync('storeEmptyBlob',
oid, serial, data, version, id(txn))
return
def store():
yield ('storeBlobStart', ())
f = open(blobfilename, 'rb')
while 1:
chunk = f.read(59000)
if not chunk:
break
yield ('storeBlobChunk', (chunk, ))
f.close()
yield ('storeBlobEnd', (oid, serial, data, version, id(txn)))
self.rpc.callAsyncIterator(store())
def storeBlobShared(self, oid, serial, data, filename, version, id):
self.rpc.callAsync('storeBlobShared', oid, serial, data, filename,
......@@ -271,8 +290,8 @@ class StorageServer:
def load(self, oid, version):
return self.rpc.call('load', oid, version)
def loadBlob(self, oid, serial, version, offset):
return self.rpc.call('loadBlob', oid, serial, version, offset)
def sendBlob(self, oid, serial):
return self.rpc.call('sendBlob', oid, serial)
def getTid(self, oid):
return self.rpc.call('getTid', oid)
......
......@@ -25,6 +25,7 @@ import cPickle
import logging
import os
import sys
import tempfile
import threading
import time
import warnings
......@@ -103,9 +104,8 @@ class ZEOStorage:
self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
self.blob_transfer = {}
self.blob_tempfile = None
self.blob_log = []
self.blob_loads = {}
# The authentication protocol may define extra methods.
self._extensions = {}
for func in self.extensions:
......@@ -525,25 +525,22 @@ class ZEOStorage:
self.stats.stores += 1
self.txnlog.store(oid, serial, data, version)
def storeBlobStart(self):
assert self.blob_tempfile is None
self.blob_tempfile = tempfile.mkstemp(
dir=self.storage.temporaryDirectory())
def storeBlobChunk(self, chunk):
os.write(self.blob_tempfile[0], chunk)
def storeBlobEnd(self, oid, serial, data, version, id):
key = (oid, id)
if key not in self.blob_transfer:
raise Exception, "Can't finish a non-started Blob"
tempname, tempfile = self.blob_transfer.pop(key)
tempfile.close()
fd, tempname = self.blob_tempfile
self.blob_tempfile = None
os.close(fd)
self.blob_log.append((oid, serial, data, tempname, version))
def storeBlob(self, oid, serial, chunk, version, id):
# XXX check that underlying storage supports blobs
key = (oid, id)
if key not in self.blob_transfer:
tempname = mktemp()
tempfile = open(tempname, "wb")
# XXX Force close and remove them when Storage closes
self.blob_transfer[key] = (tempname, tempfile)
else:
tempname, tempfile = self.blob_transfer[key]
tempfile.write(chunk)
def storeEmptyBlob(self, oid, serial, data, version, id):
self.blob_log.append((oid, serial, data, None, version))
def storeBlobShared(self, oid, serial, data, filename, version, id):
# Reconstruct the full path from the filename in the OID directory
......@@ -551,17 +548,8 @@ class ZEOStorage:
filename)
self.blob_log.append((oid, serial, data, filename, version))
def loadBlob(self, oid, serial, version, offset):
key = (oid, serial)
if not key in self.blob_loads:
self.blob_loads[key] = \
open(self.storage.loadBlob(oid, serial, version))
blobdata = self.blob_loads[key]
blobdata.seek(offset)
chunk = blobdata.read(4096)
if not chunk:
del self.blob_loads[key]
return chunk
def sendBlob(self, oid, serial):
self.client.storeBlob(oid, serial, self.storage.loadBlob(oid, serial))
# The following four methods return values, so they must acquire
# the storage lock and begin the transaction before returning.
......
......@@ -59,12 +59,14 @@ class TransactionBuffer:
self.closed = 0
self.count = 0
self.size = 0
self.blobs = []
# It's safe to use a fast pickler because the only objects
# stored are builtin types -- strings or None.
self.pickler = cPickle.Pickler(self.file, 1)
self.pickler.fast = 1
def close(self):
self.clear()
self.lock.acquire()
try:
self.closed = 1
......@@ -82,6 +84,9 @@ class TransactionBuffer:
finally:
self.lock.release()
def storeBlob(self, oid, blobfilename):
self.blobs.append((oid, blobfilename))
def _store(self, oid, version, data):
"""Store oid, version, data for later retrieval"""
if self.closed:
......@@ -113,6 +118,10 @@ class TransactionBuffer:
self.file.seek(0)
self.count = 0
self.size = 0
while self.blobs:
oid, serial, blobfilename = self.blobs.pop()
if os.path.exists(blobfilename):
os.remove(blobfilename)
finally:
self.lock.release()
......
......@@ -22,6 +22,7 @@ import random
import signal
import socket
import tempfile
import threading
import time
import unittest
import shutil
......@@ -520,92 +521,100 @@ class CommonBlobTests:
self._storage.tpc_abort(t)
raise
filename = self._storage.loadBlob(oid, serial, version)
filename = self._storage.loadBlob(oid, serial)
self.assertEquals(somedata, open(filename, 'rb').read())
class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests):
"""ZEO backed by a BlobStorage-adapted FileStorage."""
def setUp(self):
self.blobdir = tempfile.mkdtemp() # This is the blob directory on the ZEO server
self.blobdir = tempfile.mkdtemp() # blob directory on ZEO server
self.filestorage = tempfile.mktemp()
super(BlobAdaptedFileStorageTests, self).setUp()
def checkLoadBlobLocks(self):
def checkStoreAndLoadBlob(self):
from ZODB.utils import oid_repr, tid_repr
from ZODB.Blobs.Blob import Blob
from ZODB.Blobs.BlobStorage import BLOB_SUFFIX
from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
handle_serials
import transaction
version = ''
somedata = 'a' * 10
somedata_path = os.path.join(self.blob_cache_dir, 'somedata')
somedata = open(somedata_path, 'w+b')
for i in range(1000000):
somedata.write("%s\n" % i)
somedata.seek(0)
blob = Blob()
bd_fh = blob.open('w')
bd_fh.write(somedata)
ZODB.utils.cp(somedata, bd_fh)
bd_fh.close()
tfname = bd_fh.name
oid = self._storage.new_oid()
data = zodb_pickle(blob)
self.assert_(os.path.exists(tfname))
t = transaction.Transaction()
try:
self._storage.tpc_begin(t)
r1 = self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
r2 = self._storage.tpc_vote(t)
serial = handle_serials(oid, r1, r2)
revid = handle_serials(oid, r1, r2)
self._storage.tpc_finish(t)
except:
self._storage.tpc_abort(t)
raise
# The uncommitted data file should have been removed
self.assert_(not os.path.exists(tfname))
class Dummy:
def __init__(self):
self.acquired = 0
self.released = 0
def acquire(self):
self.acquired += 1
def release(self):
self.released += 1
class statusdict(dict):
def __init__(self):
self.added = []
self.removed = []
def check_data(path):
self.assert_(os.path.exists(path))
f = open(path, 'rb')
somedata.seek(0)
d1 = d2 = 1
while d1 or d2:
d1 = f.read(8096)
d2 = somedata.read(8096)
self.assertEqual(d1, d2)
def __setitem__(self, k, v):
self.added.append(k)
super(statusdict, self).__setitem__(k, v)
def __delitem__(self, k):
self.removed.append(k)
super(statusdict, self).__delitem__(k)
# ensure that we do locking properly
filename = self._storage.fshelper.getBlobFilename(oid, serial)
thestatuslock = self._storage.blob_status_lock = Dummy()
thebloblock = Dummy()
def getBlobLock():
return thebloblock
# override getBlobLock to test that locking is performed
self._storage.getBlobLock = getBlobLock
thestatusdict = self._storage.blob_status = statusdict()
filename = self._storage.loadBlob(oid, serial, version)
self.assertEqual(thestatuslock.acquired, 2)
self.assertEqual(thestatuslock.released, 2)
self.assertEqual(thebloblock.acquired, 1)
self.assertEqual(thebloblock.released, 1)
self.assertEqual(thestatusdict.added, [(oid, serial)])
self.assertEqual(thestatusdict.removed, [(oid, serial)])
# The file should have been copied to the server:
filename = os.path.join(self.blobdir, oid_repr(oid),
tid_repr(revid) + BLOB_SUFFIX)
check_data(filename)
# It should also be in the cache:
filename = os.path.join(self.blob_cache_dir, oid_repr(oid),
tid_repr(revid) + BLOB_SUFFIX)
check_data(filename)
# If we remove it from the cache and call loadBlob, it should
# come back. We can do this in many threads. We'll instrument
# the method that is used to request data from the server to
# verify that it is only called once.
sendBlob_org = ZEO.ServerStub.StorageServer.sendBlob
calls = []
def sendBlob(self, oid, serial):
calls.append((oid, serial))
sendBlob_org(self, oid, serial)
os.remove(filename)
returns = []
threads = [
threading.Thread(
target=lambda :
returns.append(self._storage.loadBlob(oid, revid))
)
for i in range(10)
]
[thread.start() for thread in threads]
[thread.join() for thread in threads]
[self.assertEqual(r, filename) for r in returns]
check_data(filename)
class BlobWritableCacheTests(GenericTests, CommonBlobTests):
......