Commit d94bff42 authored by Kirill Smelkov

.

parent 412c35a2
@@ -28,26 +28,27 @@ import (
)

// Cache adds RAM caching layer over a storage
// XXX -> zodb ?
// XXX -> RAMCache ?
type Cache struct {
	mu sync.RWMutex

	// cache is fully synchronized with storage for transactions with tid < before.
	// XXX clarify ^^^ (it means if revCacheEntry.before=∞ it is Cache.before)
	before zodb.Tid

	entryMap map[zodb.Oid]*oidCacheEntry // oid -> cache entries for this oid

	// garbage collection:
	gcMu sync.Mutex
	lru  listHead // revCacheEntries in LRU order
	size int64    // cached data size in bytes
}
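// Illustrative sketch, not part of this change: how the pieces connect.
//
//	Cache.entryMap[oid] -> *oidCacheEntry
//	oidCacheEntry.revv  -> []*revCacheEntry, ordered by .before
//	Cache.lru           -> links every revCacheEntry via .inLRU, for eviction
//
// Load walks entryMap -> revv to find or create an entry; the gc side only
// touches .lru and .size under gcMu.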
// oidCacheEntry maintains cached revisions for 1 oid
type oidCacheEntry struct {
	sync.Mutex

	// cached revisions in ascending order
	// [i].serial < [i].before <= [i+1].serial < [i+1].before
	//
	// XXX or?
	// cached revisions in descending order
@@ -57,17 +58,17 @@ type oidCacheEntry struct {

// revCacheEntry is information about 1 cached oid revision
type revCacheEntry struct {
	inLRU  listHead       // in Cache.lru
	parent *oidCacheEntry // oidCacheEntry holding us
	// sync.Mutex XXX not needed?

	// oid revision
	// 0 if not known yet - loadBefore(.before) is in progress and the actual
	// serial not yet obtained from the database.
	serial zodb.Tid

	// we know that loadBefore(oid, .before) will give this .serial:oid.
	//
	// this is only what we currently know - not necessarily covering
	// whole correct range - e.g. if oid revisions in db are 1 and 5 if we
@@ -75,7 +76,7 @@ type revCacheEntry struct {
	// remember .before as 3. But for loadBefore(4) we have to redo
	// database query again.
	//
	// if .before=∞ here, that actually means before is cache.before
	// ( this way we do not need to bump before to next tid in many
	//   unchanged cache entries when a transaction invalidation comes )
	//
@@ -83,43 +84,21 @@ type revCacheEntry struct {
	// case when loadBefore with tid > cache.before was called.
	before zodb.Tid

	// loading result: object data or error
	data  []byte
	err   error
	ready chan struct{} // closed when loading finished
}
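// Illustrative sketch, not part of this change: the invariant above can be
// restated as a small predicate (rceCovers is a made-up name, shown only for
// explanation). A loaded entry is known to answer loadBefore(tid) for every
// tid in (.serial, .before] - the db has no revision of this oid in that range.
func rceCovers(rce *revCacheEntry, tid zodb.Tid) bool {
	return rce.loaded() && rce.serial != 0 &&
		rce.serial < tid && tid <= rce.before
}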
// loaded reports whether rce was already loaded
func (rce *revCacheEntry) loaded() bool {
	select {
	case <-rce.ready:
		return true
	default:
		return false
	}
}
// XXX doc
@@ -128,47 +107,40 @@ func (oce *oidCacheEntry) newRevEntry(before zodb.Tid) *revCacheEntry {
		parent: oce,
		serial: 0,
		before: before,
		ready:  make(chan struct{}),
	}
	rce.inLRU.Init()
	return rce
}
// find finds rce under oce and returns its index in oce.revv.
// not found -> -1.
func (oce *oidCacheEntry) find(rce *revCacheEntry) int {
	for i, r := range oce.revv {
		if r == rce {
			return i
		}
	}
	return -1
}

// XXX doc; must be called with oce lock held
func (oce *oidCacheEntry) del(rce *revCacheEntry) {
	i := oce.find(rce)
	if i == -1 {
		panic("rce not found")
	}

	oce.revv = append(oce.revv[:i], oce.revv[i+1:]...)
}
// lock order: Cache > oidCacheEntry > (?) revCacheEntry

// XXX maintain nhit / nmiss?
func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
	// oid -> oce (oidCacheEntry) ; creating new empty if not yet there
	// exit with oce locked and cache.before read consistently
	c.mu.RLock()

	oce := c.entryMap[xid.Oid]
@@ -186,15 +158,15 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
			oce = &oidCacheEntry{}
			c.entryMap[xid.Oid] = oce
		}
		cacheBefore = c.before // reload c.before because we relocked the cache
		oce.Lock()
		c.mu.Unlock()
	}

	// oce, before -> rce (revCacheEntry)
	var rce *revCacheEntry
	var rceNew bool // whether rce was created anew

	if xid.TidBefore {
		l := len(oce.revv)
		i := sort.Search(l, func(i int) bool {
@@ -206,29 +178,35 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
		})

		switch {
		// not found - tid > max(revv.before) - insert new max entry
		case i == l:
			rce = oce.newRevEntry(xid.Tid)
			if rce.before == cacheBefore {
				// FIXME better do this when the entry becomes loaded ?
				// XXX vs concurrent invalidations?
				rce.before = zodb.TidMax
			}
			rceNew = true
			oce.revv = append(oce.revv, rce) // XXX -> newRevEntry ?

		// found:
		// tid <= revv[i].before
		// tid > revv[i-1].before

		// exact match - we already have entry for this before
		case xid.Tid == oce.revv[i].before:
			rce = oce.revv[i]

		// non-exact match - same entry if inside (serial, before]
		// XXX do we need `oce.revv[i].serial != 0` check vvv ?
		case oce.revv[i].loaded() && oce.revv[i].serial != 0 && oce.revv[i].serial < xid.Tid:
			rce = oce.revv[i]

		// otherwise - insert new entry
		default:
			rce = oce.newRevEntry(xid.Tid)
			rceNew = true
			// XXX -> newRevEntry ?
			oce.revv = append(oce.revv[:i], append([]*revCacheEntry{rce}, oce.revv[i:]...)...)
		}

	// XXX serial -> revCacheEntry
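	// Illustrative sketch, not part of this change: the sort.Search predicate
	// body is collapsed in this hunk, so its exact condition is an assumption.
	// Given the revv ordering invariant and the case analysis above, it
	// presumably selects the first entry whose .before can still cover the
	// query, along these lines:
	//
	//	i := sort.Search(l, func(i int) bool {
	//		return xid.Tid <= oce.revv[i].before
	//	})
	//
	// e.g. with revv .before = [3, 7, ∞]: tid=5 -> i=1 (usable if serial < 5),
	// tid=7 -> i=1 (exact match), tid=9 -> i=2.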
@@ -241,27 +219,85 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {

	// entry was already in cache - use it
	if !rceNew {
		<-rce.ready
		c.gcMu.Lock()
		rce.inLRU.MoveBefore(&c.lru)
		c.gcMu.Unlock()
		return rce.data, rce.serial, rce.err
	}

	// entry was not in cache - this goroutine becomes responsible for loading it
	data, serial, err := c.stor.Load(xid)

	// verify db gives serial < before
	if serial >= rce.before {
		// XXX err != nil - also check vvv?
		// XXX loadSerial?
		xxx.Errorf("E: cache: database inconsistency: oid: %v: load(<%v) -> %v", xid.Oid, rce.before, serial)
	}

	rce.serial = serial
	rce.data = data
	rce.err = err
	close(rce.ready)
	δsize := int64(len(data))

	// merge rce with adjacent entries in parent
	// ( e.g. loadBefore(3) and loadBefore(4) result in the same data loaded if
	//   there are only revisions with serials 1 and 5 )
	oce.Lock()
	i := oce.find(rce)
	if i == -1 {
		// rce was already dropped / evicted
		oce.Unlock()
		return rce.data, rce.serial, rce.err
	}

	// if rce & rceNext cover the same range -> drop rce
	if i+1 < len(oce.revv) {
		rceNext := oce.revv[i+1]
		if rceNext.loaded() && rceNext.serial < rce.before { // XXX rceNext.serial=0 ?
			// verify rce.serial == rceNext.serial
			if rce.serial != rceNext.serial {
				// XXX -> where to put? rce.err?
				xxx.Errorf("E: cache: database inconsistency: oid: %v: load(<%v) -> %v; load(<%v) -> %v", xid.Oid, rce.before, rce.serial, rceNext.before, rceNext.serial)
			}

			// drop rce
			oce.del(rce)
			δsize -= int64(len(rce.data))
			rce = rceNext
		}
	}

	// if rcePrev & rce cover the same range -> drop rcePrev
	if i > 0 {
		rcePrev := oce.revv[i-1]
		if rce.serial < rcePrev.before {
			// verify rce.serial == rcePrev.serial (if that is ready)
			if rcePrev.loaded() && rcePrev.serial != rce.serial { // XXX rcePrev.serial=0 ?
				// XXX dup wrt ^^^ -> func
				xxx.Errorf("E: cache: database inconsistency: oid: %v: load(<%v) -> %v; load(<%v) -> %v", xid.Oid, rcePrev.before, rcePrev.serial, rce.before, rce.serial)
			}

			// drop rcePrev
			oce.del(rcePrev)
			δsize -= int64(len(rcePrev.data))
		}
	}

	oce.Unlock()

	// update lru & cache size
	c.gcMu.Lock()
	rce.inLRU.MoveBefore(&c.lru)
	c.size += δsize
	if c.size > c.sizeTarget {
		// -> run gc
	}
	c.gcMu.Unlock()

	return rce.data, rce.serial, rce.err
}
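// Illustrative sketch, not part of this change: a worked example of the merge
// step above. Suppose the database holds only revisions 1 and 5 of an oid.
// loadBefore(<3) and loadBefore(<4) both load serial 1, so after the second
// load oce.revv momentarily holds two entries answering the same queries:
//
//	before merge: {serial:1, before:3} {serial:1, before:4}
//	after merge:  {serial:1, before:4}
//
// One of them is dropped via oce.del, and δsize is decreased by the dropped
// entry's data length so that Cache.size stays the total of live cached data.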
@@ -281,3 +317,25 @@ func (c *cache) gc(...) {
	}
}
// cleaner is the process that cleans cache by evicting less-needed entries.
func (c *Cache) cleaner() {
	for {
		// cleaner is the only mutator/user of Cache.lru and revCacheEntry.inLRU
		select {
		case rce := <-c.used:
			rce.inLRU.MoveBefore(&c.lru)

		default:
			for h := c.lru.next; h != &c.lru; h = h.next {
			}
		}
	}
}
// rceFromInLRU returns the revCacheEntry that embeds h as its .inLRU.
func (h *listHead) rceFromInLRU() (rce *revCacheEntry) {
	return (*revCacheEntry)(unsafe.Pointer(uintptr(unsafe.Pointer(h)) - unsafe.Offsetof(rce.inLRU)))
}
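// Illustrative sketch, not part of this change: rceFromInLRU is what lets GC
// code walk Cache.lru, which links listHead fields, and get back to the
// entries embedding them. Assuming entries are moved to just before the head
// on every use, c.lru.next is the least recently used entry:
//
//	c.gcMu.Lock()
//	if h := c.lru.next; h != &c.lru {
//		rce := h.rceFromInLRU() // the revCacheEntry whose .inLRU is h
//		_ = rce                 // <- candidate for eviction
//	}
//	c.gcMu.Unlock()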
@@ -29,4 +29,9 @@ func TestCache(t *testing.T) {
	// q<85 -> a) inside 90.serial..90 b) outside
	//
	// XXX cases when .serial=0 (not yet determined - 1st loadBefore is in progress)
	// XXX for every serial check before = (s-1, s, s+1)

	// merge: rce + rceNext
	//        rcePrev + rce
	//        rcePrev + (rce + rceNext)
}
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package client
// base for intrusive list
// listHead is a list head entry for an element in an intrusive doubly-linked list.
//
// XXX doc how to get to container of this list head via unsafe.Offsetof
//
// always call Init() to initialize a head before using it.
type listHead struct {
	// XXX needs to be created with .next = .prev = self
	next, prev *listHead
}
// Init initializes a head making it point to itself via .next and .prev
func (h *listHead) Init() {
	h.next = h
	h.prev = h
}
// Delete deletes h from its list and reinitializes it to point to itself.
func (h *listHead) Delete() {
	h.next.prev = h.prev
	h.prev.next = h.next
	h.Init()
}
// MoveBefore moves a to be before b.
// XXX ok to move if a was not previously on the list?
func (a *listHead) MoveBefore(b *listHead) {
	a.Delete()

	a.prev = b.prev
	a.prev.next = a
	a.next = b
	b.prev = a
}
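// Illustrative sketch, not part of this change: minimal standalone usage of the
// intrusive list above. The item type and its value field are made up for the
// example; real users embed listHead the way revCacheEntry embeds inLRU.
type item struct {
	value int
	node  listHead // links this item into some list
}

func exampleList() int {
	var head listHead
	head.Init() // empty ring: head.next == head.prev == &head

	a := &item{value: 1}
	b := &item{value: 2}
	a.node.Init()
	b.node.Init()

	a.node.MoveBefore(&head) // ring: head <-> a
	b.node.MoveBefore(&head) // ring: head <-> a <-> b
	a.node.MoveBefore(&head) // "use" a again: ring becomes head <-> b <-> a

	// walk from oldest (head.next) to newest (head.prev); going from a
	// *listHead back to its *item would use unsafe.Offsetof, as
	// rceFromInLRU does for revCacheEntry.
	n := 0
	for h := head.next; h != &head; h = h.next {
		n++
	}
	return n // 2
}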