Commit a58e4c16 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 340387f5
...@@ -239,7 +239,6 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -239,7 +239,6 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
if opt.NoSync { if opt.NoSync {
db.mu.Lock() db.mu.Lock()
// XXX prevent retrieved head to be removed from δtail ?
head = db.δtail.Head() // = 0 if δtail was not yet initialized with first event head = db.δtail.Head() // = 0 if δtail was not yet initialized with first event
db.mu.Unlock() db.mu.Unlock()
} }
...@@ -261,92 +260,80 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -261,92 +260,80 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
at = head at = head
} }
// XXX conn, err := db.open(ctx, at, opt.NoPool)
var conn *Connection if err != nil {
if opt.NoPool { return nil, err
conn = newConnection(db, at) }
conn.resync(txn, at)
return conn, nil
}
// open is internal worker for Open.
//
// it returns either new connection, or connection from the pool.
// returned connection does not neccessarily have .at=at, and have to go through .resync().
func (db *DB) open(ctx context.Context, at Tid, noPool bool) (*Connection, error) {
// NoPool connection - create anew
if noPool {
conn := newConnection(db, at)
conn.noPool = true conn.noPool = true
return conn, nil
} }
db.mu.Lock() db.mu.Lock()
defer db.mu.Unlock()
// check if we already have the exact match retry: // each loop iteration starts with db.mu locked
conn = db.get(at, at) for {
// check if we already have the exact match
conn := db.get(at, at)
if conn != nil {
return conn, nil
}
if conn == nil { // no exact match - let's try to find nearest
δtail := db.δtail // XXX δtail := db.δtail
δhead := db.δtail.Head() // XXX δhead := db.δtail.Head()
switch { // too far in the past, and we know there is no exact match
// too far in the past -> historic connection // -> new historic connection.
case at < db.δtail.Tail(): if at <= db.δtail.Tail() {
//conn = db.get(at, at) return newConnection(db, at), nil
conn = newConnection(db, at) }
// δtail !initialized yet // δtail !initialized yet
case db.δtail.Head() == 0: if db.δtail.Head() == 0 {
// XXX δtail could be not yet initialized, but e.g. last_tid changed // XXX δtail could be not yet initialized, but e.g. last_tid changed
// -> we have to wait for δtail not to loose just-released live cache // -> we have to wait for δtail not to loose just-released live cache
conn = newConnection(db, at) return newConnection(db, at), nil
// we already have some history coverage
default:
if at > δhead {
// XXX wait
// XXX -> retry loop (δtail.tail might go over at)
}
// at ∈ [δtail, δhead]
conn = db.get(δtail.Tail(), at)
if conn == nil {
conn = newConnection(db, at)
} else {
// invalidate changed live objects
// XXX -> Connection.resync(at)
for _, δ := range δtail.SliceByRev(conn.at, at) {
for _, oid := range δ.changev {
obj := conn.cache.Get(oid)
if obj != nil {
obj.PInvalidate()
}
}
}
conn.at = at
}
} }
db.mu.Unlock() // we have some δtail coverage, but at is ahead of that.
if at > δhead {
// wait till .δtail.head is up to date covering ≥ at // wait till .δtail.head is up to date covering ≥ at
var δready chan struct{} // and retry the loop (δtail.tail might go over at while we are waiting)
db.mu.Lock() δready := make(chan struct{})
δhead = db.δtail.Head()
// XXX prevent head from going away?
if δhead < at {
δready = make(chan struct{})
db.δwait[δwaiter{at, δready}] = struct{}{} db.δwait[δwaiter{at, δready}] = struct{}{}
} db.mu.Unlock()
db.mu.Unlock()
if δready != nil {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// XXX db.mu unlocked twice
return nil, ctx.Err() return nil, ctx.Err()
case <-δready: case <-δready:
// ok // ok - δtail.head went over at
continue retry
} }
} }
}
// now we have both at and invalidation data covering it -> proceed to // at ∈ (δtail, δhead] ; try to get nearby idle connection or make a new one
// get connection from the pool. conn = db.get(δtail.Tail(), at)
conn = db.get(at) if conn == nil {
conn.txn = txn conn = newConnection(db, at)
txn.RegisterSync((*connTxnSync)(conn)) }
return conn, nil
return conn, nil }
} }
// Resync resyncs the connection onto different database view and transaction. // Resync resyncs the connection onto different database view and transaction.
...@@ -359,9 +346,9 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -359,9 +346,9 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
// //
// Resync must be used only under the following conditions: // Resync must be used only under the following conditions:
// //
// - the connection was initially opened with NoPool flag. // - the connection was initially opened with NoPool flag;
// - previous transaction, under which connection was previously // - previous transaction, under which connection was previously
// opened/resynced, must be already complete. // opened/resynced, must be already complete;
// - contrary to DB.Open, at cannot be 0. // - contrary to DB.Open, at cannot be 0.
// //
// Note: new at can be both higher and lower than previous connection at. // Note: new at can be both higher and lower than previous connection at.
...@@ -375,14 +362,25 @@ func (conn *Connection) Resync(txn transaction.Transaction, at Tid) { ...@@ -375,14 +362,25 @@ func (conn *Connection) Resync(txn transaction.Transaction, at Tid) {
if at == 0 { if at == 0 {
panic("Conn.Resync: resync to at=0 (auto-mode is valid only for DB.Open)") panic("Conn.Resync: resync to at=0 (auto-mode is valid only for DB.Open)")
} }
conn.resync(txn, at)
}
// resync serves Connection.resync and DB.Open .
func (conn *Connection) resync(txn transaction.Transaction, at Tid) {
// XXX conn.cache.Lock ? - yes (e.g. if user also checks it from outside, right?) // XXX conn.cache.Lock ? - yes (e.g. if user also checks it from outside, right?)
// XXX assert conn.txn == nil
// XXX lock in caller?
db := conn.db db := conn.db
db.mu.Lock() db.mu.Lock()
defer db.mu.Unlock() defer db.mu.Unlock()
δtail := db.δtail δtail := db.δtail
// XXX conn.at == at - do nothing (even if out of δtail coverave)
// both conn.at and at are covered by δtail - we can invalidate selectively // both conn.at and at are covered by δtail - we can invalidate selectively
if (δtail.Tail() < conn.at && conn.at <= δtail.Head()) && if (δtail.Tail() < conn.at && conn.at <= δtail.Head()) &&
(δtail.Tail() < at && at <= δtail.Head()) { (δtail.Tail() < at && at <= δtail.Head()) {
...@@ -418,6 +416,8 @@ func (conn *Connection) Resync(txn transaction.Transaction, at Tid) { ...@@ -418,6 +416,8 @@ func (conn *Connection) Resync(txn transaction.Transaction, at Tid) {
} }
} }
// XXX unlock db before txn.RegisterSync (it locks txn.mu)
conn.at = at conn.at = at
conn.txn = txn conn.txn = txn
txn.RegisterSync((*connTxnSync)(conn)) txn.RegisterSync((*connTxnSync)(conn))
......
...@@ -367,10 +367,15 @@ func TestPersistentDB(t *testing.T) { ...@@ -367,10 +367,15 @@ func TestPersistentDB(t *testing.T) {
assert.Equal(obj2.value, "kitty") assert.Equal(obj2.value, "kitty")
// XXX also Resync
// XXX // XXX
txn3.Abort() txn3.Abort()
txn2.Abort() txn2.Abort()
// XXX DB.Open with at on and +-1 δtail edges
// TODO Resync ↑ (with δtail coverage)
// TODO Resync ↓ (with δtail coverage)
// TODO Resync (without δtail coverage)
// XXX cache dropping entries after GC // XXX cache dropping entries after GC
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment