Commit a2233b80 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 57a61770
...@@ -182,6 +182,8 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { ...@@ -182,6 +182,8 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
// rce is not in cache - this goroutine becomes responsible for loading it // rce is not in cache - this goroutine becomes responsible for loading it
} else { } else {
// XXX use connection poll
// XXX or it should be cared by loader?
c.loadRCE(rce, xid) c.loadRCE(rce, xid)
} }
...@@ -191,15 +193,20 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { ...@@ -191,15 +193,20 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
func (c *Cache) Prefetch(xid zodb.Xid) { func (c *Cache) Prefetch(xid zodb.Xid) {
rce, rceNew := c.lookupRCE(xid) rce, rceNew := c.lookupRCE(xid)
// XXX!rceNew -> adjust LRU?
// spawn prefetch in the background if rce was not yet loaded // spawn prefetch in the background if rce was not yet loaded
if rceNew { if rceNew {
// XXX use connection poll
go c.loadRCE(rce, xid) go c.loadRCE(rce, xid)
} }
} }
// lookupRCE returns revCacheEntry corresponding to xid. // lookupRCE returns revCacheEntry corresponding to xid.
// rceNew indicates whether RCE is new and loading on it has not been initiated. //
// rceNew indicates whether rce is new and loading on it has not been initiated.
// rce should be loaded with loadRCE.
func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) { func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) {
// oid -> oce (oidCacheEntry) ; create new empty oce if not yet there // oid -> oce (oidCacheEntry) ; create new empty oce if not yet there
// exit with oce locked and cache.before read consistently // exit with oce locked and cache.before read consistently
...@@ -275,7 +282,9 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) { ...@@ -275,7 +282,9 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) {
return rce, rceNew return rce, rceNew
} }
// loadRCE performs data loading from database to RCE // loadRCE performs data loading from database into rce.
//
// rce must be new just created by lookupRCE() with returned rceNew=true.
func (c *Cache) loadRCE(rce *revCacheEntry, xid zodb.Xid) { func (c *Cache) loadRCE(rce *revCacheEntry, xid zodb.Xid) {
oce := rce.parent oce := rce.parent
data, serial, err := c.loader.Load(xid) data, serial, err := c.loader.Load(xid)
......
...@@ -243,18 +243,34 @@ func TestCache(t *testing.T) { ...@@ -243,18 +243,34 @@ func TestCache(t *testing.T) {
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b9, rce1_b12) checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b9, rce1_b12)
// simulate case where <14 and <16 were loaded in parallel, both are ready // simulate case where <14 and <16 were loaded in parallel, both are ready
// but <14 takes oce lock first before <16 ans so <12 is not yet merged // but <14 takes oce lock first before <16 and so <12 is not yet merged
// with <16 -> <12 and <14 should be merged into <16. // with <16 -> <12 and <14 should be merged into <16.
// (manually add rce1_b16 so it is not merged with <12) // (manually add rce1_b16 so it is not merged with <12)
rce1_b16 := oce1.newRevEntry(len(oce1.rcev), 16) rce1_b16, new16 := c.lookupRCE(xidlt(1,16))
ok1(new16)
rce1_b16.serial = 9 rce1_b16.serial = 9
rce1_b16.data = world rce1_b16.data = world
close(rce1_b16.ready) // XXX
ok1(rce1_b16.loaded())
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b9, rce1_b12, rce1_b16) checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b9, rce1_b12, rce1_b16)
ok1(!rce1_b16.loaded())
// (lookup <14 while <16 is not yet loaded so <16 is not picked
// automatically at lookup phase)
rce1_b14, new14 := c.lookupRCE(xidlt(1,14))
ok1(new14)
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b9, rce1_b12, rce1_b14, rce1_b16)
// XXX launch load(<14) before <16.ready // (now <16 becomes ready but not yet takes oce lock)
close(rce1_b16.ready)
ok1(rce1_b16.loaded())
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b9, rce1_b12, rce1_b14, rce1_b16)
// (<14 also becomes ready and takes oce lock first, merging <12 and <14 into <16)
c.loadRCE(rce1_b14, xidlt(1,14))
checkRCE(rce1_b14, 14, 9, world, nil)
checkRCE(rce1_b16, 16, 9, world, nil)
checkRCE(rce1_b12, 12, 9, world, nil)
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b9, rce1_b16)
} }
type Checker struct { type Checker struct {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment