Commit 1160f6b3 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 4ebf39a5
...@@ -106,6 +106,9 @@ func TestBTree(t *testing.T) { ...@@ -106,6 +106,9 @@ func TestBTree(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
db := zodb.NewDB(stor) db := zodb.NewDB(stor)
defer func() {
err := db.Close(); X(err)
}()
txn, ctx := transaction.New(ctx) txn, ctx := transaction.New(ctx)
defer txn.Abort() defer txn.Abort()
......
...@@ -45,7 +45,8 @@ import ( ...@@ -45,7 +45,8 @@ import (
// //
// DB is safe to access from multiple goroutines simultaneously. // DB is safe to access from multiple goroutines simultaneously.
type DB struct { type DB struct {
stor IStorage stor IStorage
watchq chan Event // we are watching .stor via here
mu sync.Mutex mu sync.Mutex
...@@ -114,25 +115,42 @@ type DB struct { ...@@ -114,25 +115,42 @@ type DB struct {
// NewDB creates new database handle. // NewDB creates new database handle.
//
// Created database handle must be closed when no longer needed.
func NewDB(stor IStorage) *DB { func NewDB(stor IStorage) *DB {
// XXX db options? // XXX db options?
db := &DB{ db := &DB{
stor: stor, stor: stor,
δwait: make(map[δwaiter]struct{}), watchq: make(chan Event),
δwait: make(map[δwaiter]struct{}),
tδkeep: 10*time.Minute, // see δtail discussion tδkeep: 10*time.Minute, // see δtail discussion
} }
watchq := make(chan Event) at0 := stor.AddWatch(db.watchq)
at0 := stor.AddWatch(watchq)
db.δtail = NewΔTail(at0) // init to (at0, at0] db.δtail = NewΔTail(at0) // init to (at0, at0]
go db.watcher(watchq) go db.watcher()
// XXX DelWatch? in db.Close() ?
return db return db
} }
// XXX DB.shutdown(reason error) ?
// Close closes database handle.
//
// After Close DB.Open calls will return error. However it is ok to continue
// working with connections opened prior Close.
func (db *DB) Close() error {
db.mu.Lock()
defer db.mu.Unlock()
stor.DelWatch(db.watchq)
// XXX stub
return nil
}
// ConnOptions describes options to DB.Open . // ConnOptions describes options to DB.Open .
type ConnOptions struct { type ConnOptions struct {
At Tid // if !0, open Connection bound to `at` view of database; not latest. At Tid // if !0, open Connection bound to `at` view of database; not latest.
...@@ -177,9 +195,10 @@ type δwaiter struct { ...@@ -177,9 +195,10 @@ type δwaiter struct {
// watcher receives events about new committed transactions and updates δtail. // watcher receives events about new committed transactions and updates δtail.
// //
// it also notifies δtail waiters. // it also notifies δtail waiters.
func (db *DB) watcher(watchq <-chan Event) { // XXX err ? func (db *DB) watcher() { // XXX err ?
for { for {
event, ok := <-watchq // XXX check for db.down
event, ok := <-db.watchq
if !ok { if !ok {
fmt.Printf("db: watcher: close\n") fmt.Printf("db: watcher: close\n")
// XXX wake up all waiters? // XXX wake up all waiters?
...@@ -255,6 +274,8 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -255,6 +274,8 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
} }
}() }()
// XXX check db is aready down/closed
txn := transaction.Current(ctx) txn := transaction.Current(ctx)
// find out db state we should open at // find out db state we should open at
......
...@@ -347,6 +347,9 @@ func testPersistentDB(t0 *testing.T, rawcache bool) { ...@@ -347,6 +347,9 @@ func testPersistentDB(t0 *testing.T, rawcache bool) {
ctx := context.Background() ctx := context.Background()
stor, err := OpenStorage(ctx, zurl, &OpenOptions{ReadOnly: true, NoCache: !rawcache}); X(err) stor, err := OpenStorage(ctx, zurl, &OpenOptions{ReadOnly: true, NoCache: !rawcache}); X(err)
db := NewDB(stor) db := NewDB(stor)
defer func() {
err := db.Close(); X(err)
}()
// testopen opens new db transaction/connection and wraps it with tPersistentDB. // testopen opens new db transaction/connection and wraps it with tPersistentDB.
testopen := func(opt *ConnOptions) *tPersistentDB { testopen := func(opt *ConnOptions) *tPersistentDB {
......
...@@ -359,6 +359,8 @@ type IStorageDriver interface { ...@@ -359,6 +359,8 @@ type IStorageDriver interface {
// LastTid returns the id of the last committed transaction. // LastTid returns the id of the last committed transaction.
// //
// If no transactions have been committed yet, LastTid returns 0. // If no transactions have been committed yet, LastTid returns 0.
//
// XXX clarify semantic XXX -> Sync + Head ?
LastTid(ctx context.Context) (Tid, error) LastTid(ctx context.Context) (Tid, error)
Loader Loader
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment