Commit 841d1a73 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e44a6719
...@@ -111,13 +111,6 @@ type DB struct { ...@@ -111,13 +111,6 @@ type DB struct {
// XXX δtail/δwait -> Storage. XXX or -> Cache? (so it is not duplicated many times for many DB case) // XXX δtail/δwait -> Storage. XXX or -> Cache? (so it is not duplicated many times for many DB case)
} }
// δwaiter represents someone waiting for δtail.Head to become ≥ at.
// XXX place
type δwaiter struct {
at Tid
ready chan struct{}
}
// NewDB creates new database handle. // NewDB creates new database handle.
func NewDB(stor IStorage) *DB { func NewDB(stor IStorage) *DB {
...@@ -173,6 +166,12 @@ func (opt *ConnOptions) String() string { ...@@ -173,6 +166,12 @@ func (opt *ConnOptions) String() string {
return s return s
} }
// δwaiter represents someone waiting for δtail.Head to become ≥ at.
type δwaiter struct {
at Tid
ready chan struct{}
}
// watcher receives events about new committed transactions and updates δtail. // watcher receives events about new committed transactions and updates δtail.
// //
// it also wakes up δtail waiters. // it also wakes up δtail waiters.
...@@ -197,6 +196,12 @@ func (db *DB) watcher(watchq <-chan CommitEvent) { // XXX err ? ...@@ -197,6 +196,12 @@ func (db *DB) watcher(watchq <-chan CommitEvent) { // XXX err ?
delete(db.δwait, w) delete(db.δwait, w)
} }
} }
// forget older δtail entries
tcut := db.δtail.Head().Time().Add(-db.tδkeep)
δcut := TidFromTime(tcut)
db.δtail.ForgetBefore(δcut)
db.mu.Unlock() db.mu.Unlock()
// wakeup waiters outside of db.mu // wakeup waiters outside of db.mu
...@@ -261,7 +266,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -261,7 +266,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
} }
db.mu.Lock() db.mu.Lock()
// unlock is either in db.open -> err, or in resyncAndDBUnlock // unlock is either in resyncAndDBUnlock, or in db.openOrDBUnlock -> err
conn, err := db.openOrDBUnlock(ctx, at, opt.NoPool) conn, err := db.openOrDBUnlock(ctx, at, opt.NoPool)
if err != nil { if err != nil {
...@@ -274,7 +279,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -274,7 +279,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
// openOrDBUnlock is internal worker for Open. // openOrDBUnlock is internal worker for Open.
// //
// it returns either new connection, or connection from the pool. // it returns either new connection, or connection from the pool.
// returned connection does not neccessarily have .at=at, and have to go through .resync(). // returned connection does not necessarily have .at=at, and have to go through .resync().
// //
// must be called with db.mu locked. // must be called with db.mu locked.
// db.mu is unlocked on error. // db.mu is unlocked on error.
...@@ -288,7 +293,7 @@ func (db *DB) openOrDBUnlock(ctx context.Context, at Tid, noPool bool) (*Connect ...@@ -288,7 +293,7 @@ func (db *DB) openOrDBUnlock(ctx context.Context, at Tid, noPool bool) (*Connect
retry: retry:
for { for {
// check if we already have the exact match // check if we already have an exact match
conn := db.get(at, at) conn := db.get(at, at)
if conn != nil { if conn != nil {
return conn, nil return conn, nil
...@@ -450,16 +455,11 @@ func (conn *Connection) resyncAndDBUnlock(txn transaction.Transaction, at Tid) { ...@@ -450,16 +455,11 @@ func (conn *Connection) resyncAndDBUnlock(txn transaction.Transaction, at Tid) {
return return
} }
// get returns connection from db pool most close to at. // get returns connection from db pool most close to at with conn.at ∈ (atMin, at].
// //
// it creates new one if there is no close-enough connection in the pool. XXX -> no // if there is no such connection in the pool - nil is returned.
// XXX -> must be run with db.mu locked. // must be called with db.mu locked.
func (db *DB) get(at Tid, FIXME ...Tid) *Connection { // XXX FIXME added only to make it temp. compile func (db *DB) get(atMin, at Tid) *Connection {
db.mu.Lock()
defer db.mu.Unlock()
// XXX at < δtail.Tail -> getHistoric; else -> here
l := len(db.pool) l := len(db.pool)
// find pool index corresponding to at: // find pool index corresponding to at:
...@@ -470,10 +470,11 @@ func (db *DB) get(at Tid, FIXME ...Tid) *Connection { // XXX FIXME added only to ...@@ -470,10 +470,11 @@ func (db *DB) get(at Tid, FIXME ...Tid) *Connection { // XXX FIXME added only to
// search through window of X previous connections and find out the one // search through window of X previous connections and find out the one
// with minimal distance to get to state @at. If all connections are too // with minimal distance to get to state @at. If all connections are too
// distant - create connection anew. // distant - create connection anew. // XXX no -> nil
// //
// XXX search not only previous, but future too? (we can get back to // XXX search not only previous, but future too? (we can get back to
// past by invalidating what was later changed) // past by invalidating what was later changed) (but likely it will
// hurt by destroying cache of more recent connection)
const X = 10 // XXX hardcoded const X = 10 // XXX hardcoded
jδmin := -1 jδmin := -1
for j := i - X; j < i; j++ { for j := i - X; j < i; j++ {
...@@ -488,7 +489,7 @@ func (db *DB) get(at Tid, FIXME ...Tid) *Connection { // XXX FIXME added only to ...@@ -488,7 +489,7 @@ func (db *DB) get(at Tid, FIXME ...Tid) *Connection { // XXX FIXME added only to
// nothing found or too distant // nothing found or too distant
const Tnear = 10*time.Minute // XXX hardcoded const Tnear = 10*time.Minute // XXX hardcoded
if jδmin < 0 || tabs(δtid(at, db.pool[jδmin].at)) > Tnear { if jδmin < 0 || tabs(δtid(at, db.pool[jδmin].at)) > Tnear {
return newConnection(db, at) return newConnection(db, at) // XXX no -> nil
} }
// reuse the connection // reuse the connection
...@@ -531,7 +532,7 @@ func (db *DB) put(conn *Connection) { ...@@ -531,7 +532,7 @@ func (db *DB) put(conn *Connection) {
copy(db.pool[i+1:], db.pool[i:]) copy(db.pool[i+1:], db.pool[i:])
db.pool[i] = conn db.pool[i] = conn
// XXX GC too idle connections here? // XXX GC too idle connections here? XXX
} }
// ---- txn sync ---- // ---- txn sync ----
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment