Commit ada0f636 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 4c999c89
...@@ -276,15 +276,16 @@ type hwaiter struct { ...@@ -276,15 +276,16 @@ type hwaiter struct {
// headWait waits till db.Head becomes ≥ at. // headWait waits till db.Head becomes ≥ at.
// //
// Must be called with db.mu locked and can release/relock it in the process. // Must be called db.mu released.
// On error db.mu is unlocked.
// //
// XXX -> waitHead? // XXX -> waitHead?
// XXX -> waitHeadOrDBUnlock?
func (db *DB) headWait(ctx context.Context, at Tid) (err error) { func (db *DB) headWait(ctx context.Context, at Tid) (err error) {
db.mu.Lock()
// XXX check if db is already down -> error even if at is under coverage? // XXX check if db is already down -> error even if at is under coverage?
if at <= db.δtail.Head() { if at <= db.δtail.Head() {
db.mu.Unlock()
return nil // we already have the coverage return nil // we already have the coverage
} }
...@@ -298,11 +299,9 @@ func (db *DB) headWait(ctx context.Context, at Tid) (err error) { ...@@ -298,11 +299,9 @@ func (db *DB) headWait(ctx context.Context, at Tid) (err error) {
select { select {
case <-δready: case <-δready:
// ok - δtail.head went over at; relock db XXX ok to relock? // ok - δtail.head went over at
db.mu.Lock()
return nil return nil
// on error leave db.mu unlocked XXX ok?
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
...@@ -364,16 +363,15 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -364,16 +363,15 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
} }
// proceed to open(at) // proceed to open(at)
db.mu.Lock() // unlocked in *DBUnlock
// wait for db.Head ≥ at // wait for db.Head ≥ at
err = db.headWait(ctx, at) // XXX -> waitHeadOrDBUnlock ? err = db.headWait(ctx, at)
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn := db.open(at, opt.NoPool) conn := db.open(at, opt.NoPool)
conn.resyncAndDBUnlock(ctx, at) conn.resync(ctx, at)
return conn, nil return conn, nil
} }
...@@ -383,9 +381,12 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -383,9 +381,12 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
// Returned connection does not generally have .at=at, and have to go through .resync(). // Returned connection does not generally have .at=at, and have to go through .resync().
// //
// Must be called with at ≤ db.Head . // Must be called with at ≤ db.Head .
// Must be called with db.mu locked. // Must be called with db.mu released.
func (db *DB) open(at Tid, noPool bool) *Connection { func (db *DB) open(at Tid, noPool bool) *Connection {
δtail := db.δtail. db.mu.Lock()
defer db.mu.Unlock()
δtail := db.δtail
fmt.Printf("db.open @%s nopool=%v\t; δtail (%s, %s]\n", at, noPool, δtail.Tail(), δtail.Head()) fmt.Printf("db.open @%s nopool=%v\t; δtail (%s, %s]\n", at, noPool, δtail.Tail(), δtail.Head())
...@@ -458,31 +459,29 @@ func (conn *Connection) Resync(ctx context.Context, at Tid) error { ...@@ -458,31 +459,29 @@ func (conn *Connection) Resync(ctx context.Context, at Tid) error {
// XXX check db is already down/closed // XXX check db is already down/closed
// XXX or -> headWait? // XXX or -> headWait?
db.mu.Lock()
// wait for db.Head ≥ at // wait for db.Head ≥ at
err := db.headWait(ctx, at) err := db.headWait(ctx, at)
if err != nil { if err != nil {
return err return err
} }
conn.resyncAndDBUnlock(ctx, at) conn.resync(ctx, at)
return nil return nil
} }
// resyncAndDBUnlock serves Connection.Resync and DB.Open . // resync serves Connection.Resync and DB.Open .
// //
// Must be called with at ≤ conn.db.Head . // Must be called with at ≤ conn.db.Head .
// Must be called with conn.db locked and unlocks it at the end. // Must be called with conn.db released.
func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) { func (conn *Connection) resync(ctx context.Context, at Tid) {
db := conn.db txn := transaction.Current(ctx)
txn := transaction.Current(ctx) // XXX no unlock here if !txn
if conn.txn != nil { if conn.txn != nil {
db.mu.Unlock()
panic("Conn.resync: previous transaction is not yet complete") panic("Conn.resync: previous transaction is not yet complete")
} }
db := conn.db
db.mu.Lock()
// at should be ≤ head (caller waited for it before invoking us) // at should be ≤ head (caller waited for it before invoking us)
if head := db.δtail.Head(); at > head { if head := db.δtail.Head(); at > head {
db.mu.Unlock() db.mu.Unlock()
...@@ -490,6 +489,7 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) { ...@@ -490,6 +489,7 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) {
} }
// upon exit, with all locks released, register conn to txn. // upon exit, with all locks released, register conn to txn.
// XXX -> outer func (more clear control flow) ?
defer func() { defer func() {
conn.at = at conn.at = at
conn.txn = txn conn.txn = txn
...@@ -502,8 +502,6 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) { ...@@ -502,8 +502,6 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) {
return return
} }
// XXX -> DB.δobj(at1, at2) ?
// conn.at != at - have to invalidate objects in live cache. // conn.at != at - have to invalidate objects in live cache.
δtail := db.δtail δtail := db.δtail
δobj := make(map[Oid]struct{}) // set(oid) - what to invalidate δobj := make(map[Oid]struct{}) // set(oid) - what to invalidate
...@@ -535,7 +533,6 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) { ...@@ -535,7 +533,6 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) {
// unlock db before locking cache and txn // unlock db before locking cache and txn
db.mu.Unlock() db.mu.Unlock()
// XXX -> separate func? (then we can drop "AndDBUnlock")
conn.cache.Lock() conn.cache.Lock()
defer conn.cache.Unlock() defer conn.cache.Unlock()
...@@ -564,7 +561,7 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) { ...@@ -564,7 +561,7 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) {
// get returns connection from db pool most close to at with conn.at ∈ [atMin, at]. // get returns connection from db pool most close to at with conn.at ∈ [atMin, at].
// //
// XXX recheck [atMin or (atMin -- see "= δtail.Tail" in resyncAndDBUnlock. // XXX recheck [atMin or (atMin -- see "= δtail.Tail" in resync.
// //
// if there is no such connection in the pool - nil is returned. // if there is no such connection in the pool - nil is returned.
// must be called with db.mu locked. // must be called with db.mu locked.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment