Commit b03da68b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c68f5386
...@@ -27,6 +27,8 @@ import ( ...@@ -27,6 +27,8 @@ import (
"time" "time"
"lab.nexedi.com/kirr/neo/go/transaction" "lab.nexedi.com/kirr/neo/go/transaction"
"fmt"
) )
// DB represents a handle to database at application level and contains pool // DB represents a handle to database at application level and contains pool
...@@ -74,10 +76,12 @@ func NewDB(stor IStorage) *DB { ...@@ -74,10 +76,12 @@ func NewDB(stor IStorage) *DB {
δtail: NewΔTail(), δtail: NewΔTail(),
δwait: make(map[δwaiter]struct{}), δwait: make(map[δwaiter]struct{}),
} }
watchq := make(chan CommitEvent) watchq := make(chan CommitEvent)
stor.AddWatch(watchq) stor.AddWatch(watchq)
// XXX DelWatch? in db.Close() ? // XXX DelWatch? in db.Close() ?
go db.watcher(watchq) go db.watcher(watchq)
return db return db
} }
...@@ -115,10 +119,13 @@ func (db *DB) watcher(watchq <-chan CommitEvent) { // XXX err ? ...@@ -115,10 +119,13 @@ func (db *DB) watcher(watchq <-chan CommitEvent) { // XXX err ?
for { for {
event, ok := <-watchq event, ok := <-watchq
if !ok { if !ok {
fmt.Printf("db: watcher: close")
// XXX wake up all waiters? // XXX wake up all waiters?
return // closed return // closed
} }
fmt.Printf("db: watcher <- %v", event)
var readyv []chan struct{} // waiters that become ready var readyv []chan struct{} // waiters that become ready
db.mu.Lock() db.mu.Lock()
...@@ -133,6 +140,7 @@ func (db *DB) watcher(watchq <-chan CommitEvent) { // XXX err ? ...@@ -133,6 +140,7 @@ func (db *DB) watcher(watchq <-chan CommitEvent) { // XXX err ?
// wakeup waiters outside of db.mu // wakeup waiters outside of db.mu
for _, ready := range readyv { for _, ready := range readyv {
fmt.Printf("db: watcher: wakeup %v", ready)
close(ready) close(ready)
} }
} }
......
...@@ -23,6 +23,7 @@ package zodb ...@@ -23,6 +23,7 @@ package zodb
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"net/url" "net/url"
"strings" "strings"
...@@ -109,6 +110,11 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto ...@@ -109,6 +110,11 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
// small cache so that prefetch can work for loading // small cache so that prefetch can work for loading
// XXX 512K hardcoded (= ~ 128 · 4K-entries) // XXX 512K hardcoded (= ~ 128 · 4K-entries)
cache = NewCache(storDriver, 128 * 4*1024) cache = NewCache(storDriver, 128 * 4*1024)
// FIXME teach cache for watching and remove vvv
log.Printf("zodb: FIXME: open %s: cache is not ready for invalidations" +
" -> NoCache forced", storageURL)
cache = nil
} }
stor := &storage{ stor := &storage{
......
...@@ -191,6 +191,7 @@ func (cc *zcacheControl) WantEvict(obj IPersistent) bool { ...@@ -191,6 +191,7 @@ func (cc *zcacheControl) WantEvict(obj IPersistent) bool {
// XXX test for cache=y/n (raw data cache) // XXX test for cache=y/n (raw data cache)
// XXX test both txn.Abort() and conn.Resync() // XXX test both txn.Abort() and conn.Resync()
func TestPersistentDB(t *testing.T) { func TestPersistentDB(t *testing.T) {
println("111")
X := exc.Raiseif X := exc.Raiseif
assert := require.New(t) assert := require.New(t)
checkObj := tCheckObj(t) checkObj := tCheckObj(t)
...@@ -207,14 +208,19 @@ func TestPersistentDB(t *testing.T) { ...@@ -207,14 +208,19 @@ func TestPersistentDB(t *testing.T) {
_obj1 := NewMyObject(nil); _obj1.oid = 101; _obj1.value = "hello" _obj1 := NewMyObject(nil); _obj1.oid = 101; _obj1.value = "hello"
_obj2 := NewMyObject(nil); _obj2.oid = 102; _obj2.value = "world" _obj2 := NewMyObject(nil); _obj2.oid = 102; _obj2.value = "world"
at1, err := ZPyCommit(zurl, 0, _obj1, _obj2); X(err) at1, err := ZPyCommit(zurl, 0, _obj1, _obj2); X(err)
println("222")
// open connection to it via zodb/go // open connection to it via zodb/go
ctx := context.Background() ctx := context.Background()
stor, err := OpenStorage(ctx, zurl, &OpenOptions{ReadOnly: true}); X(err) stor, err := OpenStorage(ctx, zurl, &OpenOptions{ReadOnly: true}); X(err)
println("333")
db := NewDB(stor) db := NewDB(stor)
println("444")
txn1, ctx1 := transaction.New(ctx) txn1, ctx1 := transaction.New(ctx)
println("444b")
conn1, err := db.Open(ctx1, &ConnOptions{}); X(err) conn1, err := db.Open(ctx1, &ConnOptions{}); X(err)
println("555")
assert.Equal(conn1.At(), at1) assert.Equal(conn1.At(), at1)
assert.Equal(db.connv, []*Connection(nil)) assert.Equal(db.connv, []*Connection(nil))
assert.Equal(conn1.db, db) assert.Equal(conn1.db, db)
...@@ -232,6 +238,7 @@ func TestPersistentDB(t *testing.T) { ...@@ -232,6 +238,7 @@ func TestPersistentDB(t *testing.T) {
assert.Equal(ClassOf(xobj2), "t.zodb.MyObject") assert.Equal(ClassOf(xobj2), "t.zodb.MyObject")
// XXX objX -> c1objX // XXX objX -> c1objX
println("ZZZ")
obj1 := xobj1.(*MyObject) obj1 := xobj1.(*MyObject)
obj2 := xobj2.(*MyObject) obj2 := xobj2.(*MyObject)
...@@ -275,6 +282,8 @@ func TestPersistentDB(t *testing.T) { ...@@ -275,6 +282,8 @@ func TestPersistentDB(t *testing.T) {
_obj2.value = "kitty" _obj2.value = "kitty"
at2, err := ZPyCommit(zurl, at1, _obj2); X(err) at2, err := ZPyCommit(zurl, at1, _obj2); X(err)
println("000")
// new db connection should see the change // new db connection should see the change
txn2, ctx2 := transaction.New(ctx) txn2, ctx2 := transaction.New(ctx)
conn2, err := db.Open(ctx2, &ConnOptions{}); X(err) conn2, err := db.Open(ctx2, &ConnOptions{}); X(err)
...@@ -316,12 +325,16 @@ func TestPersistentDB(t *testing.T) { ...@@ -316,12 +325,16 @@ func TestPersistentDB(t *testing.T) {
checkObj(obj1, conn1, 101, InvalidTid, GHOST, 0, nil) checkObj(obj1, conn1, 101, InvalidTid, GHOST, 0, nil)
checkObj(obj2, conn1, 102, at1, UPTODATE, 0, nil) checkObj(obj2, conn1, 102, at1, UPTODATE, 0, nil)
println("AAA")
// txn1 completes - conn1 goes back to db pool // txn1 completes - conn1 goes back to db pool
assert.Equal(conn1.txn, txn1) assert.Equal(conn1.txn, txn1)
txn1.Abort() txn1.Abort()
assert.Equal(conn1.txn, nil) assert.Equal(conn1.txn, nil)
assert.Equal(db.connv, []*Connection{conn1}) assert.Equal(db.connv, []*Connection{conn1})
println("BBB")
// open new connection - it should be conn1 but at updated database view // open new connection - it should be conn1 but at updated database view
txn3, ctx3 := transaction.New(ctx) txn3, ctx3 := transaction.New(ctx)
assert.NotEqual(txn3, txn1) assert.NotEqual(txn3, txn1)
...@@ -333,6 +346,8 @@ func TestPersistentDB(t *testing.T) { ...@@ -333,6 +346,8 @@ func TestPersistentDB(t *testing.T) {
assert.Equal(db.connv, []*Connection{}) assert.Equal(db.connv, []*Connection{})
// XXX ctx1 = ctx3 (not to use 3 below) ? // XXX ctx1 = ctx3 (not to use 3 below) ?
println("CCC")
// obj2 should be invalidated // obj2 should be invalidated
assert.Equal(conn1.Cache().Get(101), obj1) // XXX is assert.Equal(conn1.Cache().Get(101), obj1) // XXX is
assert.Equal(conn1.Cache().Get(101), obj2) // XXX is assert.Equal(conn1.Cache().Get(101), obj2) // XXX is
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment