Commit 5d6a5cf4 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2f8be58f
...@@ -276,8 +276,11 @@ type hwaiter struct { ...@@ -276,8 +276,11 @@ type hwaiter struct {
// headWait waits till db.Head becomes ≥ at. // headWait waits till db.Head becomes ≥ at.
// //
// must be called with db.mu locked and can release/relock it in the process. // Must be called with db.mu locked and can release/relock it in the process.
// on error db.mu remains unlocked. // On error db.mu is unlocked.
//
// XXX -> waitHead?
// XXX -> waitHeadOrDBUnlock?
func (db *DB) headWait(ctx context.Context, at Tid) (err error) { func (db *DB) headWait(ctx context.Context, at Tid) (err error) {
if at <= db.δtail.Head() { if at <= db.δtail.Head() {
return nil // we already have the coverage return nil // we already have the coverage
...@@ -285,7 +288,6 @@ func (db *DB) headWait(ctx context.Context, at Tid) (err error) { ...@@ -285,7 +288,6 @@ func (db *DB) headWait(ctx context.Context, at Tid) (err error) {
// we have some δtail coverage, but at is ahead of that. // we have some δtail coverage, but at is ahead of that.
// wait till δtail.head is up to date covering ≥ at, // wait till δtail.head is up to date covering ≥ at,
// and retry the loop (δtail.tail might go over at while we are waiting)
δready := make(chan struct{}) δready := make(chan struct{})
db.hwait[hwaiter{at, δready}] = struct{}{} db.hwait[hwaiter{at, δready}] = struct{}{}
db.mu.Unlock() db.mu.Unlock()
...@@ -358,7 +360,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -358,7 +360,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
// proceed to open(at) // proceed to open(at)
db.mu.Lock() // unlocked in *DBUnlock db.mu.Lock() // unlocked in *DBUnlock
// wait for δtail.head ≥ at // wait for db.Head ≥ at
err = db.headWait(ctx, at) // XXX -> waitHeadOrDBUnlock ? err = db.headWait(ctx, at) // XXX -> waitHeadOrDBUnlock ?
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -375,10 +377,11 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -375,10 +377,11 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
// open is internal worker for Open. // open is internal worker for Open.
// //
// it returns either new connection, or connection from the pool. // It returns either new connection, or connection from the pool.
// returned connection does not necessarily have .at=at, and have to go through .resync(). // Returned connection does not generally have .at=at, and have to go through .resync().
// //
// must be called with db.mu locked. // Must be called with at ≤ db.Head .
// Must be called with db.mu locked.
func (db *DB) open(at Tid, noPool bool) *Connection { func (db *DB) open(at Tid, noPool bool) *Connection {
fmt.Printf("db.open @%s nopool=%v\t; δtail (%s, %s]\n", at, noPool, db.δtail.Tail(), db.δtail.Head()) fmt.Printf("db.open @%s nopool=%v\t; δtail (%s, %s]\n", at, noPool, db.δtail.Tail(), db.δtail.Head())
// NoPool connection - create one anew // NoPool connection - create one anew
...@@ -403,7 +406,7 @@ func (db *DB) open(at Tid, noPool bool) *Connection { ...@@ -403,7 +406,7 @@ func (db *DB) open(at Tid, noPool bool) *Connection {
return newConnection(db, at) return newConnection(db, at)
} }
// at should be ≤ head (we waited for it before calling here) // at should be ≤ head (caller waited for it before invoking us)
if head := δtail.Head(); at > head { if head := δtail.Head(); at > head {
panic(fmt.Sprintf("open: at (%s) > head (%s)", at, head)) panic(fmt.Sprintf("open: at (%s) > head (%s)", at, head))
} }
...@@ -444,22 +447,37 @@ func (conn *Connection) Resync(ctx context.Context, at Tid) error { ...@@ -444,22 +447,37 @@ func (conn *Connection) Resync(ctx context.Context, at Tid) error {
panic("Conn.Resync: resync to at=0 (auto-mode is valid only for DB.Open)") panic("Conn.Resync: resync to at=0 (auto-mode is valid only for DB.Open)")
} }
conn.db.mu.Lock() db := conn.db
db.mu.Lock()
// wait for db.Head ≥ at
err = db.headWait(ctx, at)
if err != nil {
return err
}
return conn.resyncAndDBUnlock(ctx, at) return conn.resyncAndDBUnlock(ctx, at)
} }
// resyncAndDBUnlock serves Connection.Resync and DB.Open . // resyncAndDBUnlock serves Connection.Resync and DB.Open .
// //
// must be called with conn.db locked and unlocks it at the end. // Must be called with at ≤ conn.db.Head .
// Must be called with conn.db locked and unlocks it at the end.
func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) (err error) { func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) (err error) {
db := conn.db db := conn.db
txn := transaction.Current(ctx) txn := transaction.Current(ctx) // XXX no unlock here
if conn.txn != nil { if conn.txn != nil {
db.mu.Unlock() db.mu.Unlock()
panic("Conn.resync: previous transaction is not yet complete") panic("Conn.resync: previous transaction is not yet complete")
} }
// at should be ≤ head (caller waited for it before invoking us)
if head := db.δtail.Head(); at > head {
db.mu.Unlock()
panic(fmt.Sprintf("resync: at (%s) > head (%s)", at, head))
}
// XXX err ctx // XXX err ctx
// upon exit, with all locks released, register conn to txn. // upon exit, with all locks released, register conn to txn.
...@@ -476,13 +494,6 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) (err erro ...@@ -476,13 +494,6 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) (err erro
return nil return nil
} }
// first wait for db.Head to cover at (else if at is slightly
// not covered yet -> we'll hit δall case).
err = db.headWait(ctx, at)
if err != nil {
return err
}
// XXX -> DB.δobj(at1, at2) // XXX -> DB.δobj(at1, at2)
// conn.at != at - have to invalidate objects in live cache. // conn.at != at - have to invalidate objects in live cache.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment