Commit 688c2f87 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 63f11c8c
...@@ -277,6 +277,7 @@ type hwaiter struct { ...@@ -277,6 +277,7 @@ type hwaiter struct {
// headWait waits till db.Head becomes ≥ at. // headWait waits till db.Head becomes ≥ at.
// //
// must be called with db.mu locked and can release/relock it in the process. // must be called with db.mu locked and can release/relock it in the process.
// on error db.mu remains unlocked.
func (db *DB) headWait(ctx context.Context, at Tid) (err error) { func (db *DB) headWait(ctx context.Context, at Tid) (err error) {
if at <= db.δtail.Head() { if at <= db.δtail.Head() {
return nil // we already have the coverage return nil // we already have the coverage
...@@ -357,17 +358,13 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -357,17 +358,13 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
// proceed to open(at) // proceed to open(at)
db.mu.Lock() // unlocked in *DBUnlock db.mu.Lock() // unlocked in *DBUnlock
/* // wait for δtail.head ≥ at
err := db.needHeadOrDBUnlock(ctx, at) XXX wait for δtail.head >= at err := db.waitHeadOrDBUnlock(ctx, at)
if err != nil { if err != nil {
return nil, err return nil, err
} }
*/
conn, err := db.openOrDBUnlock(ctx, at, opt.NoPool) conn := db.open(at, opt.NoPool)
if err != nil {
return nil, err
}
err = conn.resyncAndDBUnlock(ctx, at) err = conn.resyncAndDBUnlock(ctx, at)
if err != nil { if err != nil {
// XXX release conn? // XXX release conn?
...@@ -376,29 +373,25 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -376,29 +373,25 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
return conn, nil return conn, nil
} }
// openOrDBUnlock is internal worker for Open. // open is internal worker for Open.
// //
// it returns either new connection, or connection from the pool. // it returns either new connection, or connection from the pool.
// returned connection does not necessarily have .at=at, and have to go through .resync(). // returned connection does not necessarily have .at=at, and have to go through .resync().
// //
// must be called with db.mu locked. // must be called with db.mu locked.
// db.mu is unlocked on error. func (db *DB) open(at Tid, noPool bool) *Connection {
func (db *DB) openOrDBUnlock(ctx context.Context, at Tid, noPool bool) (*Connection, error) { fmt.Printf("db.open @%s nopool=%v\t; δtail (%s, %s]\n", at, noPool, db.δtail.Tail(), db.δtail.Head())
fmt.Printf("db.openx %s nopool=%v\t; δtail (%s, %s]\n", at, noPool, db.δtail.Tail(), db.δtail.Head())
// NoPool connection - create one anew // NoPool connection - create one anew
if noPool { if noPool {
// XXX wait for at to be covered
conn := newConnection(db, at) conn := newConnection(db, at)
conn.noPool = true conn.noPool = true
return conn, nil return conn
} }
retry:
for {
// check if we already have an exact match // check if we already have an exact match
conn := db.get(at, at) conn := db.get(at, at)
if conn != nil { if conn != nil {
return conn, nil return conn
} }
// no exact match - let's try to find nearest // no exact match - let's try to find nearest
...@@ -407,30 +400,12 @@ retry: ...@@ -407,30 +400,12 @@ retry:
// too far in the past, and we know there is no exact match // too far in the past, and we know there is no exact match
// -> new historic connection. // -> new historic connection.
if at <= δtail.Tail() { if at <= δtail.Tail() {
return newConnection(db, at), nil return newConnection(db, at)
} }
// we have some δtail coverage, but at is ahead of that. // at should be ≤ head (we waited for it before calling here)
if at > δtail.Head() { if head := δtail.Head(); at > head {
// wait till δtail.head is up to date covering ≥ at, panic("open: head (%s) < at (%s)", head, at)
// and retry the loop (δtail.tail might go over at while we are waiting)
δready := make(chan struct{})
db.hwait[hwaiter{at, δready}] = struct{}{}
db.mu.Unlock()
select {
case <-δready:
// ok - δtail.head went over at; relock db and retry
db.mu.Lock()
continue retry
// on error leave db.mu unlocked
case <-ctx.Done():
return nil, ctx.Err()
case <-db.down:
return nil, db.downErr
}
} }
// at ∈ (δtail, δhead] ; try to get nearby idle connection or make a new one // at ∈ (δtail, δhead] ; try to get nearby idle connection or make a new one
...@@ -438,8 +413,7 @@ retry: ...@@ -438,8 +413,7 @@ retry:
if conn == nil { if conn == nil {
conn = newConnection(db, at) conn = newConnection(db, at)
} }
return conn, nil return conn
}
} }
// Resync resyncs the connection onto different database view and transaction. // Resync resyncs the connection onto different database view and transaction.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment