Commit f741a067 authored by Jim Fulton

Merged Dieter's branch that provides object-size-based object cache control.
parent 1cceda96
......@@ -8,6 +8,17 @@ Change History
New Features
------------
- The connection now estimates an object's size from its pickle size
and informs the cache about size changes.
The database gained two configuration options (`cache-size-bytes`
and `historical-cache-size-bytes`) that limit the cache size based
on the estimated total size of cached objects.
Both default to 0, which means "do not limit based on the total
estimated size".
There are corresponding methods to read and set the new configuration
parameters; a short usage sketch follows this file's changes.
- Cleaned up the storage iteration API and provided an iterator implementation
for ZEO.
......
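A minimal usage sketch of the new options (assuming a mapping-storage-backed
database, as in the tests further below):

>>> from ZODB.config import databaseFromString
>>> db = databaseFromString("<zodb>\n"
...                         "  cache-size-bytes 10000\n"
...                         "  <mappingstorage/>\n"
...                         "</zodb>")
>>> db.getCacheSizeBytes()
10000
>>> db.setCacheSizeBytes(20000)
>>> db.getCacheSizeBytes()
20000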
......@@ -79,7 +79,7 @@ class Connection(ExportImport, object):
##########################################################################
# Connection methods, ZODB.IConnection
def __init__(self, db, cache_size=400, before=None):
def __init__(self, db, cache_size=400, before=None, cache_size_bytes=0):
"""Create a new Connection."""
self._log = logging.getLogger('ZODB.Connection')
......@@ -109,7 +109,7 @@ class Connection(ExportImport, object):
# Cache which can ghostify (forget the state of) objects not
# recently used. Its API is roughly that of a dict, with
# additional gc-related and invalidation-related methods.
self._cache = PickleCache(self, cache_size)
self._cache = PickleCache(self, cache_size, cache_size_bytes)
# The pre-cache is used by get to avoid infinite loops when
# objects immediately load their state when they get their
......@@ -629,6 +629,10 @@ class Connection(ExportImport, object):
obj._p_invalidate()
else:
s = self._storage.store(oid, serial, p, '', transaction)
self._cache.update_object_size_estimation(oid, len(p))
obj._p_estimated_size = len(p)
self._store_count += 1
# Put the object in the cache before handling the
# response, just in case the response contains the
......@@ -870,6 +874,10 @@ class Connection(ExportImport, object):
self._reader.setGhostState(obj, p)
obj._p_serial = serial
self._cache.update_object_size_estimation(obj._p_oid, len(p))
obj._p_estimated_size = len(p)
# Blob support
if isinstance(obj, Blob):
......@@ -1034,7 +1042,8 @@ class Connection(ExportImport, object):
self._invalidated.clear()
self._invalidatedCache = False
cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size)
cache_size_bytes = self._cache.cache_size_bytes
self._cache = cache = PickleCache(self, cache_size, cache_size_bytes)
##########################################################################
# Python protocol
......@@ -1117,6 +1126,12 @@ class Connection(ExportImport, object):
for oid in oids:
data, serial = src.load(oid, src)
obj = self._cache.get(oid, None)
if obj is not None:
self._cache.update_object_size_estimation(obj._p_oid, len(data))
obj._p_estimated_size = len(data)
if isinstance(self._reader.getGhost(data), Blob):
blobfilename = src.loadBlob(oid, serial)
s = self._storage.storeBlob(oid, serial, data, blobfilename,
......
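A sketch of the size-estimation flow these Connection changes implement, in
doctest form (`SomePersistent` is a hypothetical Persistent subclass; `db` as
in the sketch above):

>>> import transaction
>>> conn = db.open()
>>> conn.root()['obj'] = obj = SomePersistent()  # hypothetical class
>>> transaction.commit()          # store() records the pickle size
>>> obj._p_estimated_size > 0
True
>>> conn._cache.total_estimated_size >= obj._p_estimated_size
True
>>> obj._p_deactivate()           # ghostify: the estimate leaves the total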
......@@ -398,8 +398,10 @@ class DB(object):
def __init__(self, storage,
pool_size=7,
cache_size=400,
cache_size_bytes=0,
historical_pool_size=3,
historical_cache_size=1000,
historical_cache_size_bytes=0,
historical_timeout=300,
database_name='unnamed',
databases=None,
......@@ -410,10 +412,15 @@ class DB(object):
- `storage`: the storage used by the database, e.g. FileStorage
- `pool_size`: expected maximum number of open connections
- `cache_size`: target size of Connection object cache
- `cache_size_bytes`: target total estimated size, in bytes, of objects
in the Connection object cache. 0 means no size-based limit.
- `historical_pool_size`: expected maximum number of total
historical connections
- `historical_cache_size`: target size of Connection object cache for
historical (`at` or `before`) connections
- `historical_cache_size_bytes`: like `cache_size_bytes`, but for
historical connections.
- `historical_timeout`: minimum number of seconds that
an unused historical connection will be kept, or None.
"""
......@@ -427,7 +434,9 @@ class DB(object):
self.historical_pool = KeyedConnectionPool(historical_pool_size,
historical_timeout)
self._cache_size = cache_size
self._cache_size_bytes = cache_size_bytes
self._historical_cache_size = historical_cache_size
self._historical_cache_size_bytes = historical_cache_size_bytes
# Setup storage
self._storage=storage
......@@ -642,6 +651,9 @@ class DB(object):
def getCacheSize(self):
return self._cache_size
def getCacheSizeBytes(self):
return self._cache_size_bytes
def lastTransaction(self):
return self._storage.lastTransaction()
......@@ -657,6 +669,9 @@ class DB(object):
def getHistoricalCacheSize(self):
return self._historical_cache_size
def getHistoricalCacheSizeBytes(self):
return self._historical_cache_size_bytes
def getHistoricalPoolSize(self):
return self.historical_pool.size
......@@ -720,13 +735,21 @@ class DB(object):
if before is not None:
result = self.historical_pool.pop(before)
if result is None:
c = self.klass(self, self._historical_cache_size, before)
c = self.klass(self, self._historical_cache_size, before, self._historical_cache_size_bytes)
self.historical_pool.push(c, before)
result = self.historical_pool.pop(before)
else:
result = self.pool.pop()
if result is None:
c = self.klass(self, self._cache_size)
c = self.klass(self, self._cache_size, None, self._cache_size_bytes)
self.pool.push(c)
result = self.pool.pop()
assert result is not None
......@@ -813,6 +836,16 @@ class DB(object):
finally:
self._r()
def setCacheSizeBytes(self, size):
self._a()
try:
self._cache_size_bytes = size
def setsize(c):
c._cache.cache_size_bytes = size
self.pool.map(setsize)
finally:
self._r()
def setHistoricalCacheSize(self, size):
self._a()
try:
......@@ -823,6 +856,16 @@ class DB(object):
finally:
self._r()
def setHistoricalCacheSizeBytes(self, size):
self._a()
try:
self._historical_cache_size_bytes = size
def setsize(c):
c._cache.cache_size_bytes = size
self.historical_pool.map(setsize)
finally:
self._r()
def setPoolSize(self, size):
self._a()
try:
......
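As with the existing `setCacheSize`, the new setter pushes the limit into
pooled connections via the pool's `map`; a sketch (continuing the `db` from
above, and assuming the map reaches open connections as well):

>>> conn = db.open()
>>> db.setCacheSizeBytes(12345)
>>> conn._cache.cache_size_bytes
12345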
......@@ -181,6 +181,13 @@
Target size, in number of objects, of each connection's
object cache.
</description>
<key name="cache-size-bytes" datatype="byte-size" default="0">
<description>
Target total estimated size, in bytes, of objects in each connection's
object cache.
"0" means no limit.
</description>
</key>
<key name="pool-size" datatype="integer" default="7"/>
<description>
The expected maximum number of simultaneously open connections.
......@@ -200,6 +207,12 @@
Target size, in number of objects, of each historical connection's
object cache.
</description>
<key name="historical-cache-size-bytes" datatype="byte-size" default="0">
<description>
Target total estimated size, in bytes, of objects in each historical
connection's object cache.
"0" means no limit.
</description>
</key>
<key name="historical-timeout" datatype="time-interval"
default="5m"/>
<description>
......
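The `byte-size` datatype accepts suffixed values such as `8GB` (exercised by
`test_configuration` below); a sketch:

>>> db = databaseFromString("<zodb>\n"
...                         "  cache-size-bytes 8GB\n"
...                         "  historical-cache-size-bytes 4GB\n"
...                         "  <mappingstorage/>\n"
...                         "</zodb>")
>>> db.getCacheSizeBytes() == 1 << 33     # 8 * 2**30
True
>>> db.getHistoricalCacheSizeBytes() == 1 << 32
True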
......@@ -95,14 +95,18 @@ class ZODBDatabase(BaseConfig):
section = self.config
storage = section.storage.open()
try:
return ZODB.DB(storage,
return ZODB.DB(
storage,
pool_size=section.pool_size,
cache_size=section.cache_size,
cache_size_bytes=section.cache_size_bytes,
historical_pool_size=section.historical_pool_size,
historical_cache_size=section.historical_cache_size,
historical_cache_size_bytes=section.historical_cache_size_bytes,
historical_timeout=section.historical_timeout,
database_name=section.database_name,
databases=databases)
databases=databases,
)
except:
storage.close()
raise
......
......@@ -512,6 +512,114 @@ def test_invalidateCache():
>>> db.close()
"""
class _PlayPersistent(Persistent):
def setValueWithSize(self, size=0): self.value = size*' '
__init__ = setValueWithSize
class EstimatedSizeTests(unittest.TestCase):
"""check that size estimations are handled correctly."""
def setUp(self):
self.db = db = databaseFromString("<zodb>\n<mappingstorage/>\n</zodb>")
self.conn = c = db.open()
self.obj = obj = _PlayPersistent()
c.root()['obj'] = obj
transaction.commit()
def test_size_set_on_write_commit(self):
obj, cache = self.obj, self.conn._cache
# we have just written "obj". Its size should not be zero
size, cache_size = obj._p_estimated_size, cache.total_estimated_size
self.assert_(size > 0)
self.assert_(cache_size > size)
# increase the size, write again and check that the size changed
obj.setValueWithSize(1000)
transaction.commit()
new_size = obj._p_estimated_size
self.assert_(new_size > size)
self.assertEqual(cache.total_estimated_size, cache_size + new_size - size)
def test_size_set_on_write_savepoint(self):
obj, cache = self.obj, self.conn._cache
# we have just written "obj". Its size should not be zero
size, cache_size = obj._p_estimated_size, cache.total_estimated_size
# increase the size, write again and check that the size changed
obj.setValueWithSize(1000)
transaction.savepoint()
new_size = obj._p_estimated_size
self.assert_(new_size > size)
self.assertEqual(cache.total_estimated_size, cache_size + new_size - size)
def test_size_set_on_load(self):
c = self.db.open() # new connection
obj = c.root()['obj']
# the object is still a ghost, so '_p_estimated_size' is not yet set;
# access an attribute to unghostify it
cache = c._cache
cache_size = cache.total_estimated_size
obj.value
size = obj._p_estimated_size
self.assert_(size > 0)
self.assertEqual(cache.total_estimated_size, cache_size + size)
# also check that deactivation reduces the cache size
obj._p_deactivate()
self.assertEqual(cache.total_estimated_size, cache_size)
def test_configuration(self):
# verify defaults ....
expected = 0
# ... on db
db = self.db
self.assertEqual(db.getCacheSizeBytes(), expected)
self.assertEqual(db.getHistoricalCacheSizeBytes(), expected)
# ... on connection
conn = self.conn
self.assertEqual(conn._cache.cache_size_bytes, expected)
# verify explicit setting ...
expected = 10000
# ... on db
db = databaseFromString("<zodb>\n"
" cache-size-bytes %d\n"
" historical-cache-size-bytes %d\n"
" <mappingstorage />\n"
"</zodb>"
% (expected, expected+1)
)
self.assertEqual(db.getCacheSizeBytes(), expected)
self.assertEqual(db.getHistoricalCacheSizeBytes(), expected+1)
# ... on connection
conn = db.open()
self.assertEqual(conn._cache.cache_size_bytes, expected)
# test a huge size limit (too large for 4-byte integers)
db = databaseFromString("<zodb>\n"
" cache-size-bytes 8GB\n"
" <mappingstorage />\n"
"</zodb>"
)
self.assertEqual(db.getCacheSizeBytes(), 0x1L << 33)
def test_cache_garbage_collection(self):
db = self.db
# activate size based cache garbage collection
db.setCacheSizeBytes(1)
conn = self.conn
cache = conn._cache
# verify the change worked as expected
self.assertEqual(cache.cache_size_bytes, 1)
# verify our entry assumption is fulfilled
self.assert_(cache.total_estimated_size > 1)
conn.cacheGC()
self.assert_(cache.total_estimated_size <= 1)
# sanity check
self.assert_(cache.total_estimated_size >= 0)
# ---- stubs
class StubObject(Persistent):
......@@ -649,4 +757,5 @@ def test_suite():
s = unittest.makeSuite(ConnectionDotAdd, 'check')
s.addTest(doctest.DocTestSuite())
s.addTest(unittest.makeSuite(TestConnectionInterface))
s.addTest(unittest.makeSuite(EstimatedSizeTests))
return s
......@@ -89,6 +89,7 @@ unghostify(cPersistentObject *self)
if (self->cache) {
/* Create a node in the ring for this unghostified object. */
self->cache->non_ghost_count++;
self->cache->total_estimated_size += self->estimated_size;
ring_add(&self->cache->ring_home, &self->ring);
Py_INCREF(self);
}
......@@ -144,6 +145,7 @@ unlink_from_ring(cPersistentObject *self)
/* if we're ghostifying an object, we better have some non-ghosts */
assert(self->cache->non_ghost_count > 0);
self->cache->non_ghost_count--;
self->cache->total_estimated_size -= self->estimated_size;
ring_del(&self->ring);
}
......@@ -174,6 +176,7 @@ ghostify(cPersistentObject *self)
/* If we're ghostifying an object, we better have some non-ghosts. */
assert(self->cache->non_ghost_count > 0);
self->cache->non_ghost_count--;
self->cache->total_estimated_size -= self->estimated_size;
ring_del(&self->ring);
self->state = cPersistent_GHOST_STATE;
dictptr = _PyObject_GetDictPtr((PyObject *)self);
......@@ -1011,6 +1014,34 @@ Per_get_state(cPersistentObject *self)
return PyInt_FromLong(self->state);
}
static PyObject *
Per_get_estimated_size(cPersistentObject *self)
{
return PyInt_FromLong(self->estimated_size);
}
static int
Per_set_estimated_size(cPersistentObject *self, PyObject *v)
{
if (v) {
if (PyInt_Check(v)) {
if (PyInt_AS_LONG(v) < 0) {
PyErr_SetString(PyExc_ValueError,
"_p_estimated_size must not be negative");
return -1;
}
self->estimated_size = PyInt_AS_LONG(v);
}
else {
PyErr_SetString(PyExc_ValueError,
"_p_estimated_size must be an integer");
return -1;
}
} else
self->estimated_size = 0;
return 0;
}
static PyGetSetDef Per_getsets[] = {
{"_p_changed", (getter)Per_get_changed, (setter)Per_set_changed},
{"_p_jar", (getter)Per_get_jar, (setter)Per_set_jar},
......@@ -1018,6 +1049,9 @@ static PyGetSetDef Per_getsets[] = {
{"_p_oid", (getter)Per_get_oid, (setter)Per_set_oid},
{"_p_serial", (getter)Per_get_serial, (setter)Per_set_serial},
{"_p_state", (getter)Per_get_state},
{"_p_estimated_size",
(getter)Per_get_estimated_size, (setter)Per_set_estimated_size
},
{NULL}
};
......
......@@ -23,7 +23,8 @@
#define CACHE_HEAD \
PyObject_HEAD \
CPersistentRing ring_home; \
int non_ghost_count;
int non_ghost_count; \
PY_LONG_LONG total_estimated_size; /* total estimated size of items in cache */
struct ccobject_head_struct;
......@@ -38,13 +39,14 @@ typedef struct ccobject_head_struct PerCache;
8 ring struct
8 serialno
4 state + extra
4 size info
(52) so far
(56) so far
4 dict ptr
4 weaklist ptr
-------------------------
64 only need 62, but obmalloc rounds up to multiple of eight
68 only need 62, but obmalloc rounds up to multiple of eight
Even a ghost requires 64 bytes. It's possible to make a persistent
instance with slots and no dict, which changes the storage needed.
......@@ -59,7 +61,8 @@ typedef struct ccobject_head_struct PerCache;
CPersistentRing ring; \
char serial[8]; \
signed char state; \
unsigned char reserved[3];
unsigned char reserved[3]; \
unsigned long estimated_size;
#define cPersistent_GHOST_STATE -1
#define cPersistent_UPTODATE_STATE 0
......
......@@ -116,6 +116,7 @@ typedef struct {
PyObject *data; /* oid -> object dict */
PyObject *jar; /* Connection object */
int cache_size; /* target number of items in cache */
PY_LONG_LONG cache_size_bytes; /* target total estimated size of items in cache */
/* Most of the time the ring contains only:
* many nodes corresponding to persistent objects
......@@ -167,7 +168,7 @@ unlink_from_ring(CPersistentRing *self)
}
static int
scan_gc_items(ccobject *self, int target)
scan_gc_items(ccobject *self, int target, PY_LONG_LONG target_bytes)
{
/* This function must only be called with the ring lock held,
because it places non-object placeholders in the ring.
......@@ -189,7 +190,11 @@ scan_gc_items(ccobject *self, int target)
*/
insert_after(&before_original_home, self->ring_home.r_prev);
here = self->ring_home.r_next; /* least recently used object */
while (here != &before_original_home && self->non_ghost_count > target) {
while (here != &before_original_home &&
(self->non_ghost_count > target
|| (target_bytes && self->total_estimated_size > target_bytes)
)
) {
assert(self->ring_lock);
assert(here != &self->ring_home);
......@@ -244,7 +249,7 @@ scan_gc_items(ccobject *self, int target)
}
static PyObject *
lockgc(ccobject *self, int target_size)
lockgc(ccobject *self, int target_size, PY_LONG_LONG target_size_bytes)
{
/* This is thread-safe because of the GIL, and there's nothing
* in between checking the ring_lock and acquiring it that calls back
......@@ -256,7 +261,7 @@ lockgc(ccobject *self, int target_size)
}
self->ring_lock = 1;
if (scan_gc_items(self, target_size) < 0) {
if (scan_gc_items(self, target_size, target_size_bytes) < 0) {
self->ring_lock = 0;
return NULL;
}
......@@ -272,6 +277,7 @@ cc_incrgc(ccobject *self, PyObject *args)
int obsolete_arg = -999;
int starting_size = self->non_ghost_count;
int target_size = self->cache_size;
PY_LONG_LONG target_size_bytes = self->cache_size_bytes;
if (self->cache_drain_resistance >= 1) {
/* This cache will gradually drain down to a small size. Check
......@@ -294,7 +300,7 @@ cc_incrgc(ccobject *self, PyObject *args)
< 0))
return NULL;
return lockgc(self, target_size);
return lockgc(self, target_size, target_size_bytes);
}
static PyObject *
......@@ -307,7 +313,7 @@ cc_full_sweep(ccobject *self, PyObject *args)
if (!PyArg_ParseTuple(args, "|i:full_sweep", &dt))
return NULL;
if (dt == -999)
return lockgc(self, 0);
return lockgc(self, 0, 0);
else
return cc_incrgc(self, args);
}
......@@ -327,7 +333,7 @@ cc_minimize(ccobject *self, PyObject *args)
< 0))
return NULL;
return lockgc(self, 0);
return lockgc(self, 0, 0);
}
static int
......@@ -629,6 +635,32 @@ cc_ringlen(ccobject *self)
return PyInt_FromLong(c);
}
static PyObject *
cc_update_object_size_estimation(ccobject *self, PyObject *args)
{
PyObject *oid;
cPersistentObject *v;
unsigned int new_size;
if (!PyArg_ParseTuple(args, "OI:updateObjectSizeEstimation", &oid, &new_size))
return NULL;
/* Note: reference borrowed */
v = (cPersistentObject *)PyDict_GetItem(self->data, oid);
if (v) {
/* we know this object -- update our "total_estimated_size";
we must only update when the object is in the ring
*/
if (v->ring.r_next) {
self->total_estimated_size += new_size - v->estimated_size;
/* we do this in "Connection" as we need it even when the
object is not in the cache (or not the ring)
*/
/* v->estimated_size = new_size; */
}
}
Py_RETURN_NONE;
}
static struct PyMethodDef cc_methods[] = {
{"items", (PyCFunction)cc_items, METH_NOARGS,
"Return list of oid, object pairs for all items in cache."},
......@@ -655,6 +687,10 @@ static struct PyMethodDef cc_methods[] = {
"ringlen() -- Returns number of non-ghost items in cache."},
{"debug_info", (PyCFunction)cc_debug_info, METH_NOARGS,
"debug_info() -- Returns debugging data about objects in the cache."},
{"update_object_size_estimation",
(PyCFunction)cc_update_object_size_estimation,
METH_VARARGS,
"update_object_size_estimation(oid, new_size) -- update the caches size estimation for *oid* (if this is known to the cache)."},
{NULL, NULL} /* sentinel */
};
......@@ -662,9 +698,10 @@ static int
cc_init(ccobject *self, PyObject *args, PyObject *kwds)
{
int cache_size = 100;
PY_LONG_LONG cache_size_bytes = 0;
PyObject *jar;
if (!PyArg_ParseTuple(args, "O|i", &jar, &cache_size))
if (!PyArg_ParseTuple(args, "O|iL", &jar, &cache_size, &cache_size_bytes))
return -1;
self->jar = NULL;
......@@ -687,7 +724,9 @@ cc_init(ccobject *self, PyObject *args, PyObject *kwds)
self->jar = jar;
Py_INCREF(jar);
self->cache_size = cache_size;
self->cache_size_bytes = cache_size_bytes;
self->non_ghost_count = 0;
self->total_estimated_size = 0;
self->klass_count = 0;
self->cache_drain_resistance = 0;
self->ring_lock = 0;
......@@ -1018,6 +1057,8 @@ static PyGetSetDef cc_getsets[] = {
static PyMemberDef cc_members[] = {
{"cache_size", T_INT, offsetof(ccobject, cache_size)},
{"cache_size_bytes", T_LONG, offsetof(ccobject, cache_size_bytes)},
{"total_estimated_size", T_LONG, offsetof(ccobject, total_estimated_size), RO},
{"cache_drain_resistance", T_INT,
offsetof(ccobject, cache_drain_resistance)},
{"cache_non_ghost_count", T_INT, offsetof(ccobject, non_ghost_count), RO},
......
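The byte-based target is observable from Python through `Connection.cacheGC`,
which drives this incremental collector; a sketch mirroring
`test_cache_garbage_collection` above (`db` and `conn` as in the earlier
sketches, with some non-ghost objects loaded):

>>> db.setCacheSizeBytes(1)    # tiny byte target forces eviction
>>> conn.cacheGC()
>>> conn._cache.total_estimated_size <= 1
True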
......@@ -85,6 +85,24 @@ Try all sorts of different ways to change the object's state.
>>> p.x
2
We can store a size estimation in ``_p_estimated_size``. Its default is 0.
The size estimation can be used by a cache associated with the data manager
to help in the implementation of its replacement strategy or its size bounds.
Of course, the estimated size must not be negative.
>>> p._p_estimated_size
0
>>> p._p_estimated_size = 1000
>>> p._p_estimated_size
1000
>>> p._p_estimated_size = -1
Traceback (most recent call last):
...
ValueError: _p_estimated_size must not be negative
Test Persistent with Data Manager
---------------------------------
......