Commit 807e2cca authored by Jim Fulton

Merged Dieter's branch that provides object-size-based object cache control.
parent a8ab748f
@@ -8,6 +8,17 @@ Change History

New Features
------------
- The connection now estimates the object size based on its pickle size
  and informs the cache about size changes.
  The database got additional configuration options (`cache-size-bytes`
  and `historical-cache-size-bytes`) to limit the cache size based on
  the estimated total size of cached objects.  The default value for
  both is 0, which means "do not limit based on the total estimated
  size".  There are corresponding methods to read and set the new
  configuration parameters.
- Cleaned-up the storage iteration API and provided an iterator implementation
  for ZEO.
......
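As a quick orientation before the diff: the new limits can also be passed straight to the `DB` constructor. A minimal sketch (the keyword names come from the `DB.__init__` signature changed below; the storage choice is just an example):

```python
import ZODB
from ZODB.MappingStorage import MappingStorage

# 0 (the default) disables size-based limiting; a positive value is a
# target for the total estimated size of cached objects, in bytes.
db = ZODB.DB(MappingStorage(),
             cache_size=400,                     # object-count target (existing)
             cache_size_bytes=64 * 1024 * 1024,  # new: ~64 MB size target
             historical_cache_size_bytes=0)      # new: unlimited for historical
```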
@@ -79,7 +79,7 @@ class Connection(ExportImport, object):

    ##########################################################################
    # Connection methods, ZODB.IConnection

    def __init__(self, db, cache_size=400, before=None, cache_size_bytes=0):
        """Create a new Connection."""

        self._log = logging.getLogger('ZODB.Connection')
@@ -109,7 +109,7 @@ class Connection(ExportImport, object):

        # Cache which can ghostify (forget the state of) objects not
        # recently used. Its API is roughly that of a dict, with
        # additional gc-related and invalidation-related methods.
        self._cache = PickleCache(self, cache_size, cache_size_bytes)

        # The pre-cache is used by get to avoid infinite loops when
        # objects immediately load their state when they get their
@@ -629,6 +629,10 @@ class Connection(ExportImport, object):

                obj._p_invalidate()
            else:
                s = self._storage.store(oid, serial, p, '', transaction)
                self._cache.update_object_size_estimation(oid, len(p))
                obj._p_estimated_size = len(p)
            self._store_count += 1
            # Put the object in the cache before handling the
            # response, just in case the response contains the
@@ -870,6 +874,10 @@ class Connection(ExportImport, object):

            self._reader.setGhostState(obj, p)
            obj._p_serial = serial
            self._cache.update_object_size_estimation(obj._p_oid, len(p))
            obj._p_estimated_size = len(p)

            # Blob support
            if isinstance(obj, Blob):
@@ -1034,7 +1042,8 @@ class Connection(ExportImport, object):

        self._invalidated.clear()
        self._invalidatedCache = False
        cache_size = self._cache.cache_size
        cache_size_bytes = self._cache.cache_size_bytes
        self._cache = cache = PickleCache(self, cache_size, cache_size_bytes)
    ##########################################################################
    # Python protocol
@@ -1117,6 +1126,12 @@ class Connection(ExportImport, object):

        for oid in oids:
            data, serial = src.load(oid, src)
            obj = self._cache.get(oid, None)
            if obj is not None:
                self._cache.update_object_size_estimation(obj._p_oid,
                                                          len(data))
                obj._p_estimated_size = len(data)
            if isinstance(self._reader.getGhost(data), Blob):
                blobfilename = src.loadBlob(oid, serial)
                s = self._storage.storeBlob(oid, serial, data, blobfilename,
......
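The effect of the `Connection` changes above is observable through `_p_estimated_size` and the cache's running total. A sketch mirroring the tests further down in this diff (`Note` is a hypothetical example class, not part of the commit):

```python
import transaction
import ZODB
from ZODB.MappingStorage import MappingStorage
from persistent import Persistent

class Note(Persistent):
    """Hypothetical example class; any persistent object will do."""

db = ZODB.DB(MappingStorage())
conn = db.open()
conn.root()['note'] = note = Note()
transaction.commit()   # stores the pickle and records len(pickle)

assert note._p_estimated_size > 0                 # set from the pickle size
assert conn._cache.total_estimated_size >= note._p_estimated_size
```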
@@ -398,8 +398,10 @@ class DB(object):

    def __init__(self, storage,
                 pool_size=7,
                 cache_size=400,
                 cache_size_bytes=0,
                 historical_pool_size=3,
                 historical_cache_size=1000,
                 historical_cache_size_bytes=0,
                 historical_timeout=300,
                 database_name='unnamed',
                 databases=None,
@@ -410,10 +412,15 @@ class DB(object):

        - `storage`: the storage used by the database, e.g. FileStorage
        - `pool_size`: expected maximum number of open connections
        - `cache_size`: target size of Connection object cache
        - `cache_size_bytes`: target size of the Connection object cache,
          measured as the total estimated size of the cached objects;
          0 means unlimited.
        - `historical_pool_size`: expected maximum number of total
          historical connections
        - `historical_cache_size`: target size of Connection object cache for
          historical (`at` or `before`) connections
        - `historical_cache_size_bytes`: like `cache_size_bytes`, but for
          historical connections.
        - `historical_timeout`: minimum number of seconds that
          an unused historical connection will be kept, or None.
        """
@@ -427,7 +434,9 @@ class DB(object):

        self.historical_pool = KeyedConnectionPool(historical_pool_size,
                                                   historical_timeout)
        self._cache_size = cache_size
        self._cache_size_bytes = cache_size_bytes
        self._historical_cache_size = historical_cache_size
        self._historical_cache_size_bytes = historical_cache_size_bytes

        # Setup storage
        self._storage=storage
@@ -642,6 +651,9 @@ class DB(object):

    def getCacheSize(self):
        return self._cache_size

    def getCacheSizeBytes(self):
        return self._cache_size_bytes

    def lastTransaction(self):
        return self._storage.lastTransaction()
@@ -657,6 +669,9 @@ class DB(object):

    def getHistoricalCacheSize(self):
        return self._historical_cache_size

    def getHistoricalCacheSizeBytes(self):
        return self._historical_cache_size_bytes

    def getHistoricalPoolSize(self):
        return self.historical_pool.size
@@ -720,13 +735,21 @@ class DB(object):

        if before is not None:
            result = self.historical_pool.pop(before)
            if result is None:
                c = self.klass(self,
                               self._historical_cache_size,
                               before,
                               self._historical_cache_size_bytes,
                               )
                self.historical_pool.push(c, before)
                result = self.historical_pool.pop(before)
        else:
            result = self.pool.pop()
            if result is None:
                c = self.klass(self,
                               self._cache_size,
                               None,
                               self._cache_size_bytes,
                               )
                self.pool.push(c)
                result = self.pool.pop()
        assert result is not None
@@ -813,6 +836,16 @@ class DB(object):

        finally:
            self._r()

    def setCacheSizeBytes(self, size):
        self._a()
        try:
            self._cache_size_bytes = size
            def setsize(c):
                c._cache.cache_size_bytes = size
            self.pool.map(setsize)
        finally:
            self._r()
    def setHistoricalCacheSize(self, size):
        self._a()
        try:

@@ -823,6 +856,16 @@ class DB(object):

        finally:
            self._r()
    def setHistoricalCacheSizeBytes(self, size):
        self._a()
        try:
            self._historical_cache_size_bytes = size
            def setsize(c):
                c._cache.cache_size_bytes = size
            self.historical_pool.map(setsize)
        finally:
            self._r()
    def setPoolSize(self, size):
        self._a()
        try:
......
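The setters above propagate a new limit to every pooled connection via `pool.map()`. A self-contained sketch with hypothetical values:

```python
import ZODB
from ZODB.MappingStorage import MappingStorage

db = ZODB.DB(MappingStorage())
# Tighten the byte targets at runtime; pooled connections pick the new
# cache_size_bytes up via pool.map(setsize) in the setters above.
db.setCacheSizeBytes(16 * 1024 * 1024)
db.setHistoricalCacheSizeBytes(4 * 1024 * 1024)
assert db.getCacheSizeBytes() == 16 * 1024 * 1024
assert db.getHistoricalCacheSizeBytes() == 4 * 1024 * 1024
```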
@@ -181,6 +181,13 @@

      Target size, in number of objects, of each connection's
      object cache.
    </description>
    <key name="cache-size-bytes" datatype="byte-size" default="0">
      <description>
        Target size, in total estimated size of objects, of each
        connection's object cache.  "0" means no limit.
      </description>
    </key>
    <key name="pool-size" datatype="integer" default="7"/>
    <description>
      The expected maximum number of simultaneously open connections.
@@ -200,6 +207,12 @@

      Target size, in number of objects, of each historical connection's
      object cache.
    </description>
    <key name="historical-cache-size-bytes" datatype="byte-size" default="0">
      <description>
        Target size, in total estimated size of objects, of each
        historical connection's object cache.
      </description>
    </key>
    <key name="historical-timeout" datatype="time-interval"
         default="5m"/>
    <description>
......
@@ -95,14 +95,18 @@ class ZODBDatabase(BaseConfig):

        section = self.config
        storage = section.storage.open()
        try:
            return ZODB.DB(
                storage,
                pool_size=section.pool_size,
                cache_size=section.cache_size,
                cache_size_bytes=section.cache_size_bytes,
                historical_pool_size=section.historical_pool_size,
                historical_cache_size=section.historical_cache_size,
                historical_cache_size_bytes=section.historical_cache_size_bytes,
                historical_timeout=section.historical_timeout,
                database_name=section.database_name,
                databases=databases,
                )
        except:
            storage.close()
            raise
......
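With the `component.xml` and `config.py` wiring above, the options are also reachable from a ZConfig database definition; the `byte-size` datatype accepts suffixes such as KB, MB, and GB (the 8GB case is exercised in the tests below). A sketch:

```python
from ZODB.config import databaseFromString

# 64MB parses as 64 * 1024 * 1024 bytes.
db = databaseFromString("""\
<zodb>
  cache-size-bytes 64MB
  historical-cache-size-bytes 8MB
  <mappingstorage/>
</zodb>
""")
assert db.getCacheSizeBytes() == 64 * 1024 * 1024
```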
@@ -512,6 +512,114 @@ def test_invalidateCache():

    >>> db.close()
    """
class _PlayPersistent(Persistent):
    def setValueWithSize(self, size=0): self.value = size*' '
    __init__ = setValueWithSize


class EstimatedSizeTests(unittest.TestCase):
    """check that size estimations are handled correctly."""

    def setUp(self):
        self.db = db = databaseFromString("<zodb>\n<mappingstorage/>\n</zodb>")
        self.conn = c = db.open()
        self.obj = obj = _PlayPersistent()
        c.root()['obj'] = obj
        transaction.commit()
    def test_size_set_on_write_commit(self):
        obj, cache = self.obj, self.conn._cache
        # we have just written "obj"; its size should not be zero
        size, cache_size = obj._p_estimated_size, cache.total_estimated_size
        self.assert_(size > 0)
        self.assert_(cache_size > size)
        # increase the size, write again, and check that the size changed
        obj.setValueWithSize(1000)
        transaction.commit()
        new_size = obj._p_estimated_size
        self.assert_(new_size > size)
        self.assertEqual(cache.total_estimated_size,
                         cache_size + new_size - size)
    def test_size_set_on_write_savepoint(self):
        obj, cache = self.obj, self.conn._cache
        # we have just written "obj"; its size should not be zero
        size, cache_size = obj._p_estimated_size, cache.total_estimated_size
        # increase the size, write again, and check that the size changed
        obj.setValueWithSize(1000)
        transaction.savepoint()
        new_size = obj._p_estimated_size
        self.assert_(new_size > size)
        self.assertEqual(cache.total_estimated_size,
                         cache_size + new_size - size)
    def test_size_set_on_load(self):
        c = self.db.open()  # new connection
        obj = c.root()['obj']
        # the object is still a ghost and '_p_estimated_size' is not yet
        # set; access it to unghostify it
        cache = c._cache
        cache_size = cache.total_estimated_size
        obj.value
        size = obj._p_estimated_size
        self.assert_(size > 0)
        self.assertEqual(cache.total_estimated_size, cache_size + size)
        # we also check that deactivation reduces the cache size again
        obj._p_deactivate()
        self.assertEqual(cache.total_estimated_size, cache_size)
    def test_configuration(self):
        # verify defaults ...
        expected = 0
        # ... on db
        db = self.db
        self.assertEqual(db.getCacheSizeBytes(), expected)
        self.assertEqual(db.getHistoricalCacheSizeBytes(), expected)
        # ... on connection
        conn = self.conn
        self.assertEqual(conn._cache.cache_size_bytes, expected)
        # verify explicit setting ...
        expected = 10000
        # ... on db
        db = databaseFromString("<zodb>\n"
                                " cache-size-bytes %d\n"
                                " historical-cache-size-bytes %d\n"
                                " <mappingstorage />\n"
                                "</zodb>"
                                % (expected, expected + 1)
                                )
        self.assertEqual(db.getCacheSizeBytes(), expected)
        self.assertEqual(db.getHistoricalCacheSizeBytes(), expected + 1)
        # ... on connection
        conn = db.open()
        self.assertEqual(conn._cache.cache_size_bytes, expected)
        # test a huge (larger than 32-bit) size limit
        db = databaseFromString("<zodb>\n"
                                " cache-size-bytes 8GB\n"
                                " <mappingstorage />\n"
                                "</zodb>"
                                )
        self.assertEqual(db.getCacheSizeBytes(), 0x1L << 33)
    def test_cache_garbage_collection(self):
        db = self.db
        # activate size-based cache garbage collection
        db.setCacheSizeBytes(1)
        conn = self.conn
        cache = conn._cache
        # verify the change worked as expected
        self.assertEqual(cache.cache_size_bytes, 1)
        # verify our entrance assumption is fulfilled
        self.assert_(cache.total_estimated_size > 1)
        conn.cacheGC()
        self.assert_(cache.total_estimated_size <= 1)
        # sanity check
        self.assert_(cache.total_estimated_size >= 0)
# ---- stubs

class StubObject(Persistent):
@@ -649,4 +757,5 @@ def test_suite():

    s = unittest.makeSuite(ConnectionDotAdd, 'check')
    s.addTest(doctest.DocTestSuite())
    s.addTest(unittest.makeSuite(TestConnectionInterface))
    s.addTest(unittest.makeSuite(EstimatedSizeTests))
    return s
@@ -89,6 +89,7 @@ unghostify(cPersistentObject *self)

    if (self->cache) {
        /* Create a node in the ring for this unghostified object. */
        self->cache->non_ghost_count++;
        self->cache->total_estimated_size += self->estimated_size;
        ring_add(&self->cache->ring_home, &self->ring);
        Py_INCREF(self);
    }
@@ -144,6 +145,7 @@ unlink_from_ring(cPersistentObject *self)

    /* if we're ghostifying an object, we better have some non-ghosts */
    assert(self->cache->non_ghost_count > 0);
    self->cache->non_ghost_count--;
    self->cache->total_estimated_size -= self->estimated_size;
    ring_del(&self->ring);
}
@@ -174,6 +176,7 @@ ghostify(cPersistentObject *self)

    /* If we're ghostifying an object, we better have some non-ghosts. */
    assert(self->cache->non_ghost_count > 0);
    self->cache->non_ghost_count--;
    self->cache->total_estimated_size -= self->estimated_size;
    ring_del(&self->ring);
    self->state = cPersistent_GHOST_STATE;
    dictptr = _PyObject_GetDictPtr((PyObject *)self);
@@ -1011,6 +1014,34 @@ Per_get_state(cPersistentObject *self)

    return PyInt_FromLong(self->state);
}
static PyObject *
Per_get_estimated_size(cPersistentObject *self)
{
    return PyInt_FromLong(self->estimated_size);
}

static int
Per_set_estimated_size(cPersistentObject *self, PyObject *v)
{
    if (v) {
        if (PyInt_Check(v)) {
            if (PyInt_AS_LONG(v) < 0) {
                PyErr_SetString(PyExc_ValueError,
                                "_p_estimated_size must not be negative");
                return -1;
            }
            self->estimated_size = PyInt_AS_LONG(v);
        }
        else {
            PyErr_SetString(PyExc_ValueError,
                            "_p_estimated_size must be an integer");
            return -1;
        }
    }
    else
        self->estimated_size = 0;
    return 0;
}
static PyGetSetDef Per_getsets[] = {
    {"_p_changed", (getter)Per_get_changed, (setter)Per_set_changed},
    {"_p_jar", (getter)Per_get_jar, (setter)Per_set_jar},
@@ -1018,6 +1049,9 @@ static PyGetSetDef Per_getsets[] = {
    {"_p_oid", (getter)Per_get_oid, (setter)Per_set_oid},
    {"_p_serial", (getter)Per_get_serial, (setter)Per_set_serial},
    {"_p_state", (getter)Per_get_state},
    {"_p_estimated_size",
     (getter)Per_get_estimated_size, (setter)Per_set_estimated_size
    },
    {NULL}
};
......
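In Python terms, the getter/setter pair above behaves as sketched here: non-negative integers are accepted, negative values raise `ValueError`, and deleting the attribute (which invokes the setter without a value) resets the estimate to 0:

```python
from persistent import Persistent

p = Persistent()
assert p._p_estimated_size == 0         # default
p._p_estimated_size = 1000
assert p._p_estimated_size == 1000
try:
    p._p_estimated_size = -1            # rejected by Per_set_estimated_size
except ValueError:
    pass
del p._p_estimated_size                 # setter called with NULL -> reset
assert p._p_estimated_size == 0
```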
@@ -23,11 +23,12 @@

#define CACHE_HEAD \
    PyObject_HEAD \
    CPersistentRing ring_home; \
    int non_ghost_count; \
    PY_LONG_LONG total_estimated_size; /* total estimated size of items in cache */

struct ccobject_head_struct;

typedef struct ccobject_head_struct PerCache;
/* How big is a persistent object?
@@ -38,13 +39,14 @@ typedef struct ccobject_head_struct PerCache;

  8  ring struct
  8  serialno
  4  state + extra
  4  size info
(56) so far
  4  dict ptr
  4  weaklist ptr
 -------------------------
 68  only need 62, but obmalloc rounds up to multiple of eight

 Even a ghost requires 64 bytes.  It's possible to make a persistent
 instance with slots and no dict, which changes the storage needed.
@@ -59,7 +61,8 @@ typedef struct ccobject_head_struct PerCache;

    CPersistentRing ring; \
    char serial[8]; \
    signed char state; \
    unsigned char reserved[3]; \
    unsigned long estimated_size;

#define cPersistent_GHOST_STATE -1
#define cPersistent_UPTODATE_STATE 0
......
@@ -116,6 +116,7 @@ typedef struct {

    PyObject *data;               /* oid -> object dict */
    PyObject *jar;                /* Connection object */
    int cache_size;               /* target number of items in cache */
    PY_LONG_LONG cache_size_bytes; /* target total estimated size of items in cache */

    /* Most of the time the ring contains only:
     * many nodes corresponding to persistent objects
@@ -167,7 +168,7 @@ unlink_from_ring(CPersistentRing *self)

}

static int
scan_gc_items(ccobject *self, int target, PY_LONG_LONG target_bytes)
{
    /* This function must only be called with the ring lock held,
       because it places non-object placeholders in the ring.
@@ -189,7 +190,11 @@ scan_gc_items(ccobject *self, int target)

    */
    insert_after(&before_original_home, self->ring_home.r_prev);
    here = self->ring_home.r_next;   /* least recently used object */
    while (here != &before_original_home &&
           (self->non_ghost_count > target
            || (target_bytes && self->total_estimated_size > target_bytes)
           )
          ) {
        assert(self->ring_lock);
        assert(here != &self->ring_home);
@@ -244,7 +249,7 @@ scan_gc_items(ccobject *self, int target)

}

static PyObject *
lockgc(ccobject *self, int target_size, PY_LONG_LONG target_size_bytes)
{
    /* This is thread-safe because of the GIL, and there's nothing
     * in between checking the ring_lock and acquiring it that calls back

@@ -256,7 +261,7 @@ lockgc(ccobject *self, int target_size)

    }
    self->ring_lock = 1;
    if (scan_gc_items(self, target_size, target_size_bytes) < 0) {
        self->ring_lock = 0;
        return NULL;
    }
@@ -272,6 +277,7 @@ cc_incrgc(ccobject *self, PyObject *args)

    int obsolete_arg = -999;
    int starting_size = self->non_ghost_count;
    int target_size = self->cache_size;
    PY_LONG_LONG target_size_bytes = self->cache_size_bytes;

    if (self->cache_drain_resistance >= 1) {
        /* This cache will gradually drain down to a small size. Check

@@ -294,7 +300,7 @@ cc_incrgc(ccobject *self, PyObject *args)

        < 0))
        return NULL;

    return lockgc(self, target_size, target_size_bytes);
}
static PyObject *
@@ -307,7 +313,7 @@ cc_full_sweep(ccobject *self, PyObject *args)

    if (!PyArg_ParseTuple(args, "|i:full_sweep", &dt))
        return NULL;
    if (dt == -999)
        return lockgc(self, 0, 0);
    else
        return cc_incrgc(self, args);
}

@@ -327,7 +333,7 @@ cc_minimize(ccobject *self, PyObject *args)

        < 0))
        return NULL;

    return lockgc(self, 0, 0);
}
static int
@@ -629,6 +635,32 @@ cc_ringlen(ccobject *self)

    return PyInt_FromLong(c);
}
static PyObject *
cc_update_object_size_estimation(ccobject *self, PyObject *args)
{
    PyObject *oid;
    cPersistentObject *v;
    unsigned int new_size;

    if (!PyArg_ParseTuple(args, "OI:updateObjectSizeEstimation",
                          &oid, &new_size))
        return NULL;
    /* Note: reference borrowed */
    v = (cPersistentObject *)PyDict_GetItem(self->data, oid);
    if (v) {
        /* we know this object -- update our "total_estimated_size",
           but only when the object is in the ring
        */
        if (v->ring.r_next) {
            self->total_estimated_size += new_size - v->estimated_size;
            /* the object's own estimate is updated in "Connection", as
               it is needed even when the object is not in the cache
               (or not in the ring)
            */
            /* v->estimated_size = new_size; */
        }
    }
    Py_RETURN_NONE;
}
static struct PyMethodDef cc_methods[] = {
    {"items", (PyCFunction)cc_items, METH_NOARGS,
     "Return list of oid, object pairs for all items in cache."},
@@ -655,6 +687,10 @@ static struct PyMethodDef cc_methods[] = {
     "ringlen() -- Returns number of non-ghost items in cache."},
    {"debug_info", (PyCFunction)cc_debug_info, METH_NOARGS,
     "debug_info() -- Returns debugging data about objects in the cache."},
    {"update_object_size_estimation",
     (PyCFunction)cc_update_object_size_estimation, METH_VARARGS,
     "update_object_size_estimation(oid, new_size) -- update the cache's "
     "size estimation for *oid* (if it is known to the cache)."},
    {NULL, NULL}    /* sentinel */
};
@@ -662,9 +698,10 @@ static int
cc_init(ccobject *self, PyObject *args, PyObject *kwds)
{
    int cache_size = 100;
    PY_LONG_LONG cache_size_bytes = 0;
    PyObject *jar;

    if (!PyArg_ParseTuple(args, "O|iL", &jar, &cache_size, &cache_size_bytes))
        return -1;

    self->jar = NULL;
@@ -687,7 +724,9 @@ cc_init(ccobject *self, PyObject *args, PyObject *kwds)

    self->jar = jar;
    Py_INCREF(jar);
    self->cache_size = cache_size;
    self->cache_size_bytes = cache_size_bytes;
    self->non_ghost_count = 0;
    self->total_estimated_size = 0;
    self->klass_count = 0;
    self->cache_drain_resistance = 0;
    self->ring_lock = 0;
@@ -1018,6 +1057,8 @@ static PyGetSetDef cc_getsets[] = {

static PyMemberDef cc_members[] = {
    {"cache_size", T_INT, offsetof(ccobject, cache_size)},
    {"cache_size_bytes", T_LONG, offsetof(ccobject, cache_size_bytes)},
    {"total_estimated_size", T_LONG, offsetof(ccobject, total_estimated_size), RO},
    {"cache_drain_resistance", T_INT,
     offsetof(ccobject, cache_drain_resistance)},
    {"cache_non_ghost_count", T_INT, offsetof(ccobject, non_ghost_count), RO},
......
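The heart of the change in `scan_gc_items` above is the extended loop condition: eviction continues while the count target is exceeded, or, when a byte target is set, while the estimated total exceeds that target. A pure-Python model of just that predicate (illustrative only, not part of the commit):

```python
def over_target(non_ghost_count, total_estimated_size,
                target, target_bytes):
    # Mirrors the while-condition added to scan_gc_items(): a
    # target_bytes of 0 disables the size-based check entirely.
    return (non_ghost_count > target
            or (target_bytes and total_estimated_size > target_bytes))

assert over_target(10, 5000, 5, 0)       # over the count target
assert over_target(3, 5000, 5, 4096)     # over the byte target only
assert not over_target(3, 5000, 5, 0)    # 0 turns the byte check off
```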
@@ -85,6 +85,24 @@ Try all sorts of different ways to change the object's state.

    >>> p.x
    2
We can store a size estimation in ``_p_estimated_size``.  Its default is 0.
The size estimation can be used by a cache associated with the data manager
to help in the implementation of its replacement strategy or its size bounds.
Of course, the estimated size must not be negative.

    >>> p._p_estimated_size
    0
    >>> p._p_estimated_size = 1000
    >>> p._p_estimated_size
    1000
    >>> p._p_estimated_size = -1
    Traceback (most recent call last):
    ...
    ValueError: _p_estimated_size must not be negative
Test Persistent with Data Manager
---------------------------------
......