Commit 63f11c8c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 36ade85d
...@@ -110,11 +110,10 @@ type DB struct { ...@@ -110,11 +110,10 @@ type DB struct {
δtail *ΔTail // [](rev↑, []oid) δtail *ΔTail // [](rev↑, []oid)
tδkeep time.Duration tδkeep time.Duration
// openers waiting for δtail.Head to become covering their at. // waiters for δtail.Head to become ≥ their at.
// XXX -> headWait? hwait map[hwaiter]struct{} // set{(at, ready)}
δwait map[δwaiter]struct{} // set{(at, ready)}
// XXX δtail/δwait -> Storage. XXX or -> Cache? (so it is not duplicated many times for many DB case) // XXX δtail/hwait -> Storage. XXX or -> Cache? (so it is not duplicated many times for many DB case)
} }
...@@ -127,7 +126,7 @@ func NewDB(stor IStorage) *DB { ...@@ -127,7 +126,7 @@ func NewDB(stor IStorage) *DB {
stor: stor, stor: stor,
watchq: make(chan Event), watchq: make(chan Event),
down: make(chan struct{}), down: make(chan struct{}),
δwait: make(map[δwaiter]struct{}), hwait: make(map[hwaiter]struct{}),
tδkeep: 10*time.Minute, // see δtail discussion tδkeep: 10*time.Minute, // see δtail discussion
} }
...@@ -197,12 +196,6 @@ func (opt *ConnOptions) String() string { ...@@ -197,12 +196,6 @@ func (opt *ConnOptions) String() string {
return s return s
} }
// δwaiter represents someone waiting for δtail.Head to become ≥ at.
type δwaiter struct {
at Tid
ready chan struct{}
}
// watcher receives events about new committed transactions and updates δtail. // watcher receives events about new committed transactions and updates δtail.
// //
// It also notifies δtail waiters. // It also notifies δtail waiters.
...@@ -250,10 +243,10 @@ func (db *DB) watcher() (err error) { ...@@ -250,10 +243,10 @@ func (db *DB) watcher() (err error) {
db.mu.Lock() db.mu.Lock()
db.δtail.Append(δ.Tid, δ.Changev) db.δtail.Append(δ.Tid, δ.Changev)
for w := range db.δwait { for w := range db.hwait {
if w.at <= δ.Tid { if w.at <= δ.Tid {
readyv = append(readyv, w.ready) readyv = append(readyv, w.ready)
delete(db.δwait, w) delete(db.hwait, w)
} }
} }
...@@ -275,6 +268,45 @@ func (db *DB) watcher() (err error) { ...@@ -275,6 +268,45 @@ func (db *DB) watcher() (err error) {
} }
} }
// hwaiter represents someone waiting for δtail.Head to become ≥ at.
type hwaiter struct {
at Tid
ready chan struct{}
}
// headWait waits till db.Head becomes ≥ at.
//
// must be called with db.mu locked and can release/relock it in the process.
func (db *DB) headWait(ctx context.Context, at Tid) (err error) {
if at <= db.δtail.Head() {
return nil // we already have the coverage
}
// we have some δtail coverage, but at is ahead of that.
// wait till δtail.head is up to date covering ≥ at,
// and retry the loop (δtail.tail might go over at while we are waiting)
δready := make(chan struct{})
db.hwait[hwaiter{at, δready}] = struct{}{}
db.mu.Unlock()
defer xerr.Contextf(&err, "wait head ≥ %s", at)
select {
case <-δready:
// ok - δtail.head went over at; relock db XXX ok to relock?
db.mu.Lock()
return nil
// on error leave db.mu unlocked XXX ok?
case <-ctx.Done():
return ctx.Err()
case <-db.down:
return db.downErr
}
}
// Open opens new connection to the database. // Open opens new connection to the database.
// //
// By default the connection is opened to current latest database state; opt.At // By default the connection is opened to current latest database state; opt.At
...@@ -383,7 +415,7 @@ retry: ...@@ -383,7 +415,7 @@ retry:
// wait till δtail.head is up to date covering ≥ at, // wait till δtail.head is up to date covering ≥ at,
// and retry the loop (δtail.tail might go over at while we are waiting) // and retry the loop (δtail.tail might go over at while we are waiting)
δready := make(chan struct{}) δready := make(chan struct{})
db.δwait[δwaiter{at, δready}] = struct{}{} db.hwait[hwaiter{at, δready}] = struct{}{}
db.mu.Unlock() db.mu.Unlock()
select { select {
...@@ -445,7 +477,7 @@ func (conn *Connection) Resync(ctx context.Context, at Tid) error { ...@@ -445,7 +477,7 @@ func (conn *Connection) Resync(ctx context.Context, at Tid) error {
// resyncAndDBUnlock serves Connection.Resync and DB.Open . // resyncAndDBUnlock serves Connection.Resync and DB.Open .
// //
// must be called with conn.db locked and unlocks it at the end. // must be called with conn.db locked and unlocks it at the end.
func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) error { func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) (err error) {
db := conn.db db := conn.db
txn := transaction.Current(ctx) txn := transaction.Current(ctx)
...@@ -454,8 +486,11 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) error { ...@@ -454,8 +486,11 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) error {
panic("Conn.resync: previous transaction is not yet complete") panic("Conn.resync: previous transaction is not yet complete")
} }
// XXX err ctx
// upon exit, with all locks released, register conn to txn. // upon exit, with all locks released, register conn to txn.
defer func() { defer func() {
if err != nil { return }
conn.at = at conn.at = at
conn.txn = txn conn.txn = txn
txn.RegisterSync((*connTxnSync)(conn)) txn.RegisterSync((*connTxnSync)(conn))
...@@ -467,10 +502,14 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) error { ...@@ -467,10 +502,14 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) error {
return nil return nil
} }
// XXX -> DB.δobj(at1, at2) // first wait for db.Head to cover at (else if at is slightly
// not covered yet -> we'll hit δall case).
err = db.headWait(ctx, at)
if err != nil {
return err
}
// XXX first wait for db.stor.head to cover at (else if at is slightly // XXX -> DB.δobj(at1, at2)
// not covered yet -> we'll hit δall case()
// conn.at != at - have to invalidate objects in live cache. // conn.at != at - have to invalidate objects in live cache.
δtail := db.δtail δtail := db.δtail
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment