Commit 7f63e821 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 18f5f402
...@@ -120,17 +120,45 @@ type LiveCache struct { ...@@ -120,17 +120,45 @@ type LiveCache struct {
} }
// LiveCacheControl is the interface that allows applications to influence // LiveCacheControl is the interface that allows applications to influence
// Connection's decisions with respect to Connection's live cache. // Connection's decisions with respect to Connection's LiveCache.
//
// See Connection.Cache and LiveCache.SetControl for how to install
// LiveCacheControl on a connection's live cache.
type LiveCacheControl interface { type LiveCacheControl interface {
// WantEvict is called when object is going to be evicted from live // PCacheClassify is called to classify an object and returns live
// cache on deactivation and made ghost. // cache policy that should be used for this object.
PCacheClassify(obj IPersistent) PCachePolicy
}
// PCachePolicy describes live caching policy for a persistent object.
//
// It is | combination of PCache* flags with 0 meaning "use default policy".
//
// See LiveCacheControl for how to apply a caching policy.
type PCachePolicy int
const (
// keep object pinned into cache, even if in ghost state.
// //
// If !ok the object will remain live. // This allows to rely on object being never evicted from live cache.
// //
// NOTE on invalidation invalidated objects are evicted from live cache // Note: object's state can still be evicted and the object can go into
// ghost state. Use PCacheKeepState to prevent such automatic eviction
// until it is really needed.
PCachePinObject PCachePolicy = 1 << iota
// don't discard object state.
//
// Note: on invalidation, state of invalidated objects is discarded
// unconditionally. // unconditionally.
WantEvict(obj IPersistent) (ok bool) PCacheKeepState // XXX PCachePolicy explicitly?
}
// data access is non-temporal.
//
// Object state is used once and then won't be used for a long time.
// There is no reason to preserve object state in cache.
PCacheNonTemporal // XXX PCachePolicy ?
)
// ---------------------------------------- // ----------------------------------------
......
...@@ -262,11 +262,14 @@ func (obj *Persistent) PDeactivate() { ...@@ -262,11 +262,14 @@ func (obj *Persistent) PDeactivate() {
// no constant load/unload on object access. XXX -> MRU cache? // no constant load/unload on object access. XXX -> MRU cache?
// NOTE wcfs manages its objects explicitly and does not need this. // NOTE wcfs manages its objects explicitly and does not need this.
// XXX reenable
/*
if cc := obj.jar.cache.control; cc != nil { if cc := obj.jar.cache.control; cc != nil {
if !cc.WantEvict(obj.instance) { if !cc.WantEvict(obj.instance) {
return return
} }
} }
*/
// already ghost // already ghost
if obj.state == GHOST { if obj.state == GHOST {
......
...@@ -25,7 +25,7 @@ import ( ...@@ -25,7 +25,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"reflect" "reflect"
// "runtime" "runtime"
"testing" "testing"
"lab.nexedi.com/kirr/neo/go/transaction" "lab.nexedi.com/kirr/neo/go/transaction"
...@@ -39,7 +39,8 @@ import ( ...@@ -39,7 +39,8 @@ import (
type MyObject struct { type MyObject struct {
Persistent Persistent
value string value string // persistent state
_v_value string // volatile in-RAM only state; not managed by persistency layer
} }
func NewMyObject(jar *Connection) *MyObject { func NewMyObject(jar *Connection) *MyObject {
...@@ -194,33 +195,138 @@ func TestPersistentBasic(t *testing.T) { ...@@ -194,33 +195,138 @@ func TestPersistentBasic(t *testing.T) {
// ---- TestPersistentDB ---- // ---- TestPersistentDB ----
// zcacheControl is simple live cache control that prevents specified objects // zcacheControl is simple live cache control that is organized as `{} oid ->
// to be evicted from live cache. // PCachePolicy` table.
type zcacheControl struct { type zcacheControl struct {
keep []Oid // objects that must not be evicted pCachePolicy map[Oid]PCachePolicy
} }
func (cc *zcacheControl) WantEvict(obj IPersistent) bool { func (cc *zcacheControl) PCacheClassify(obj IPersistent) PCachePolicy {
for _, oid := range cc.keep { return cc.pCachePolicy[obj.POid()] // default -> 0
if obj.POid() == oid {
return false
}
}
return true
} }
// tPersistentDB represents one testing environment inside TestPersistentDB. // tPersistentDB represents testing database. XXX -> tDB ?
type tPersistentDB struct { type tPersistentDB struct {
*testing.T *testing.T
work string // working directory
zurl string // zurl for database under work
// zodb/go stor/db handle for the database
stor IStorage
db *DB
head Tid // last committed transaction
commitq []toCommit // queue to be committed
}
// XXX place? name?
type toCommit struct {
oid Oid
value string
}
// tPersistentConn represents testing Connection. XXX -> tConn ?
type tPersistentConn struct {
*testing.T
// a transaction and DB connection opened under it // a transaction and DB connection opened under it
txn transaction.Transaction txn transaction.Transaction
ctx context.Context ctx context.Context
conn *Connection conn *Connection
} }
// testdb creates and initializes new test database.
func testdb(t0 *testing.T) *tPersistentDB {
t0.Helper()
t := &tPersistentDB{T: t0}
X := t.fatalif
work, err := ioutil.TempDir("", "t-persistent"); X(err)
t.work = work
t.zurl = work + "1/.fs"
finishok := false
defer func() {
if !finishok {
err := os.RemoveAll(t.work); X(err)
}
}()
// create test db via py with 2 objects
t.Add(11, "init")
t.Add(12, "db")
t.Commit()
// open the db via zodb/go
stor, err := Open(context.Background(), t.zurl, &OpenOptions{ReadOnly: true, NoCache: !rawcache}); X(err)
db := NewDB(stor)
t.stor = stor
t.db = db
finishok = true
return t
}
// Close release resources associated with test database.
func (t *tPersistentDB) Close() {
t.Helper()
X := t.fatalif
err := t.db.Close(); X(err)
err = t.stor.Close(); X(err)
err = os.RemoveAll(t.work); X(err)
}
// Add marks object with oid as modified and queues it to be committed as
// MyObject(value).
//
// The commit is performed by Commit.
func (t *tPersistentDB) Add(oid Oid, value string) {
t.commitq = append(t.commitq, toCommit{oid, value})
}
// Commit commits objects queued by Add.
func (t *tPersistentDB) Commit() {
t.Helper()
var objv []IPersistent
for _, tc := range t.commitq {
obj := NewMyObject(nil) // XXX hack - goes without jar
obj.oid = tc.oid
obj.value = tc.value
objv = append(objv, obj)
}
head, err := ZPyCommit(t.zurl, t.head, objv...)
if err != nil {
t.Fatal(err)
}
t.head = head
t.commitq = nil
}
// Open opens new test transaction/connection.
func (t *tPersistentDB) Open(opt *ConnOptions) *tPersistentConn {
t.Helper()
X := t.fatalif
txn, ctx := transaction.New(context.Background())
conn, err := t.db.Open(ctx, opt); X(err)
assert.Same(t, conn.db, t.db)
assert.Same(t, conn.txn, txn)
return &tPersistentConn{
T: t.T,
txn: txn,
ctx: ctx,
conn: conn,
}
}
// Get gets oid from t.conn and asserts its type. // Get gets oid from t.conn and asserts its type.
func (t *tPersistentDB) Get(oid Oid) *MyObject { func (t *tPersistentConn) Get(oid Oid) *MyObject {
t.Helper() t.Helper()
xobj, err := t.conn.Get(t.ctx, oid) xobj, err := t.conn.Get(t.ctx, oid)
if err != nil { if err != nil {
...@@ -237,7 +343,7 @@ func (t *tPersistentDB) Get(oid Oid) *MyObject { ...@@ -237,7 +343,7 @@ func (t *tPersistentDB) Get(oid Oid) *MyObject {
} }
// PActivate activates obj in t environment. // PActivate activates obj in t environment.
func (t *tPersistentDB) PActivate(obj IPersistent) { func (t *tPersistentConn) PActivate(obj IPersistent) {
t.Helper() t.Helper()
err := obj.PActivate(t.ctx) err := obj.PActivate(t.ctx)
if err != nil { if err != nil {
...@@ -248,7 +354,7 @@ func (t *tPersistentDB) PActivate(obj IPersistent) { ...@@ -248,7 +354,7 @@ func (t *tPersistentDB) PActivate(obj IPersistent) {
// checkObj checks state of obj and that obj ∈ t.conn. // checkObj checks state of obj and that obj ∈ t.conn.
// //
// if object is !GHOST - it also verifies its value. // if object is !GHOST - it also verifies its value.
func (t *tPersistentDB) checkObj(obj *MyObject, oid Oid, serial Tid, state ObjectState, refcnt int32, valueOk ...string) { func (t *tPersistentConn) checkObj(obj *MyObject, oid Oid, serial Tid, state ObjectState, refcnt int32, valueOk ...string) {
t.Helper() t.Helper()
// any object with live pointer to it must be also in conn's cache. // any object with live pointer to it must be also in conn's cache.
...@@ -288,7 +394,7 @@ func (t *tPersistentDB) checkObj(obj *MyObject, oid Oid, serial Tid, state Objec ...@@ -288,7 +394,7 @@ func (t *tPersistentDB) checkObj(obj *MyObject, oid Oid, serial Tid, state Objec
} }
// Resync resyncs t to new transaction @at. // Resync resyncs t to new transaction @at.
func (t *tPersistentDB) Resync(at Tid) { func (t *tPersistentConn) Resync(at Tid) {
t.Helper() t.Helper()
db := t.conn.db db := t.conn.db
...@@ -307,13 +413,24 @@ func (t *tPersistentDB) Resync(at Tid) { ...@@ -307,13 +413,24 @@ func (t *tPersistentDB) Resync(at Tid) {
} }
// Abort aborts t's connection and verifies it becomes !live. // Abort aborts t's connection and verifies it becomes !live.
func (t *tPersistentDB) Abort() { func (t *tPersistentConn) Abort() {
t.Helper() t.Helper()
assert.Same(t, t.conn.txn, t.txn) assert.Same(t, t.conn.txn, t.txn)
t.txn.Abort() t.txn.Abort()
assert.Equal(t, t.conn.txn, nil) assert.Equal(t, t.conn.txn, nil)
} }
func (t *tPersistentDB) fatalif(err error) {
if err != nil {
t.Fatal(err)
}
}
func (t *tPersistentConn) fatalif(err error) {
if err != nil {
t.Fatal(err)
}
}
// Persistent tests with storage. // Persistent tests with storage.
// //
...@@ -329,13 +446,16 @@ func testPersistentDB(t0 *testing.T, rawcache bool) { ...@@ -329,13 +446,16 @@ func testPersistentDB(t0 *testing.T, rawcache bool) {
X := exc.Raiseif X := exc.Raiseif
assert := assert.New(t0) assert := assert.New(t0)
work, err := ioutil.TempDir("", "t-persistent"); X(err) tdb := testdb(t0)
defer func() { defer tdb.Close()
err := os.RemoveAll(work); X(err) at0 := tdb.head
}()
zurl := work + "/1.fs" tdb.Add(101, "hello")
tdb.Add(102, "world")
tdb.Commit()
at1 := tdb.head
/*
// create test db via py with 2 objects // create test db via py with 2 objects
// XXX hack as _objX go without jar. // XXX hack as _objX go without jar.
_obj1 := NewMyObject(nil); _obj1.oid = 101; _obj1.value = "init" _obj1 := NewMyObject(nil); _obj1.oid = 101; _obj1.value = "init"
...@@ -371,8 +491,12 @@ func testPersistentDB(t0 *testing.T, rawcache bool) { ...@@ -371,8 +491,12 @@ func testPersistentDB(t0 *testing.T, rawcache bool) {
conn: conn, conn: conn,
} }
} }
*/
db := tdb.db
t1 := testopen(&ConnOptions{})
t1 := tdb.Open(&ConnOptions{})
t := t1 t := t1
assert.Equal(t.conn.At(), at1) assert.Equal(t.conn.At(), at1)
assert.Equal(db.pool, []*Connection(nil)) assert.Equal(db.pool, []*Connection(nil))
...@@ -382,9 +506,13 @@ func testPersistentDB(t0 *testing.T, rawcache bool) { ...@@ -382,9 +506,13 @@ func testPersistentDB(t0 *testing.T, rawcache bool) {
assert.Equal(db.δtail.Head(), at1) assert.Equal(db.δtail.Head(), at1)
// do not evict obj2 from live cache. obj1 is ok to be evicted. // do not evict obj2 from live cache. obj1 is ok to be evicted.
zcc := &zcacheControl{map[Oid]PCachePolicy{
102: PCachePinObject | PCacheKeepState,
}}
zcache1 := t.conn.Cache() zcache1 := t.conn.Cache()
zcache1.Lock() zcache1.Lock()
zcache1.SetControl(&zcacheControl{[]Oid{_obj2.oid}}) zcache1.SetControl(zcc)
zcache1.Unlock() zcache1.Unlock()
// get objects // get objects
...@@ -424,11 +552,12 @@ func testPersistentDB(t0 *testing.T, rawcache bool) { ...@@ -424,11 +552,12 @@ func testPersistentDB(t0 *testing.T, rawcache bool) {
t.checkObj(obj2, 102, InvalidTid, GHOST, 0) t.checkObj(obj2, 102, InvalidTid, GHOST, 0)
// commit change to obj2 from external process // commit change to obj2 from external process
_obj2.value = "kitty" tdb.Add(102, "kitty")
at2, err := ZPyCommit(zurl, at1, _obj2); X(err) tdb.Commit()
at2 := tdb.head
// new db connection should see the change // new db connection should see the change
t2 := testopen(&ConnOptions{}) t2 := tdb.Open(&ConnOptions{})
assert.Equal(t2.conn.At(), at2) assert.Equal(t2.conn.At(), at2)
assert.Equal(db.pool, []*Connection(nil)) assert.Equal(db.pool, []*Connection(nil))
...@@ -469,7 +598,7 @@ func testPersistentDB(t0 *testing.T, rawcache bool) { ...@@ -469,7 +598,7 @@ func testPersistentDB(t0 *testing.T, rawcache bool) {
// open new connection - it should be conn1 but at updated database view // open new connection - it should be conn1 but at updated database view
t3 := testopen(&ConnOptions{}) t3 := tdb.Open(&ConnOptions{})
assert.Same(t3.conn, t1.conn) assert.Same(t3.conn, t1.conn)
t = t3 t = t3
assert.Equal(t.conn.At(), at2) assert.Equal(t.conn.At(), at2)
...@@ -490,9 +619,9 @@ func testPersistentDB(t0 *testing.T, rawcache bool) { ...@@ -490,9 +619,9 @@ func testPersistentDB(t0 *testing.T, rawcache bool) {
t.checkObj(obj1, 101, InvalidTid, GHOST, 0) t.checkObj(obj1, 101, InvalidTid, GHOST, 0)
t.checkObj(obj2, 102, at2, UPTODATE, 0, "kitty") t.checkObj(obj2, 102, at2, UPTODATE, 0, "kitty")
// FIXME test that live cache keeps pinned object live even if we drop // live cache keeps pinned object live even if we drop
// all regular pointers to it and do GC. // all regular pointers to it and do GC.
/* // XXX also PCachePinObject without PCacheKeepState
obj1 = nil obj1 = nil
obj2 = nil obj2 = nil
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
...@@ -505,7 +634,6 @@ func testPersistentDB(t0 *testing.T, rawcache bool) { ...@@ -505,7 +634,6 @@ func testPersistentDB(t0 *testing.T, rawcache bool) {
assert.NotEqual(xobj2, nil) assert.NotEqual(xobj2, nil)
obj2 = xobj2.(*MyObject) obj2 = xobj2.(*MyObject)
t.checkObj(obj2, 102, at2, UPTODATE, 0, "kitty") t.checkObj(obj2, 102, at2, UPTODATE, 0, "kitty")
*/
// finish tnx3 and txn2 - conn1 and conn2 go back to db pool // finish tnx3 and txn2 - conn1 and conn2 go back to db pool
...@@ -513,8 +641,11 @@ func testPersistentDB(t0 *testing.T, rawcache bool) { ...@@ -513,8 +641,11 @@ func testPersistentDB(t0 *testing.T, rawcache bool) {
t2.Abort() t2.Abort()
assert.Equal(db.pool, []*Connection{t1.conn, t2.conn}) assert.Equal(db.pool, []*Connection{t1.conn, t2.conn})
// ---- Resync ----
// open new connection in nopool mode to verify resync // open new connection in nopool mode to verify resync
t4 := testopen(&ConnOptions{NoPool: true}) t4 := tdb.Open(&ConnOptions{NoPool: true})
t = t4 t = t4
assert.Equal(t.conn.At(), at2) assert.Equal(t.conn.At(), at2)
assert.Equal(db.pool, []*Connection{t1.conn, t2.conn}) assert.Equal(db.pool, []*Connection{t1.conn, t2.conn})
...@@ -522,7 +653,7 @@ func testPersistentDB(t0 *testing.T, rawcache bool) { ...@@ -522,7 +653,7 @@ func testPersistentDB(t0 *testing.T, rawcache bool) {
// pin obj2 into live cache, similarly to conn1 // pin obj2 into live cache, similarly to conn1
rzcache := t.conn.Cache() rzcache := t.conn.Cache()
rzcache.Lock() rzcache.Lock()
rzcache.SetControl(&zcacheControl{[]Oid{_obj2.oid}}) rzcache.SetControl(zcc)
rzcache.Unlock() rzcache.Unlock()
// it should see latest data // it should see latest data
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment