Commit 69e5d62a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent bad73a41
...@@ -45,7 +45,6 @@ import ( ...@@ -45,7 +45,6 @@ import (
// //
// Use DB.Open to open a connection. // Use DB.Open to open a connection.
type Connection struct { type Connection struct {
stor IStorage // underlying storage
db *DB // Connection is part of this DB db *DB // Connection is part of this DB
txn transaction.Transaction // opened under this txn; nil if idle in DB pool. txn transaction.Transaction // opened under this txn; nil if idle in DB pool.
at Tid // current view of database; stable inside a transaction. at Tid // current view of database; stable inside a transaction.
...@@ -137,7 +136,6 @@ type LiveCacheControl interface { ...@@ -137,7 +136,6 @@ type LiveCacheControl interface {
// newConnection creates new Connection associated with db. // newConnection creates new Connection associated with db.
func newConnection(db *DB, at Tid) *Connection { func newConnection(db *DB, at Tid) *Connection {
return &Connection{ return &Connection{
stor: db.stor,
db: db, db: db,
at: at, at: at,
cache: LiveCache{ cache: LiveCache{
...@@ -262,7 +260,7 @@ func (conn *Connection) Get(ctx context.Context, oid Oid) (_ IPersistent, err er ...@@ -262,7 +260,7 @@ func (conn *Connection) Get(ctx context.Context, oid Oid) (_ IPersistent, err er
// load loads object specified by oid. // load loads object specified by oid.
func (conn *Connection) load(ctx context.Context, oid Oid) (_ *mem.Buf, serial Tid, _ error) { func (conn *Connection) load(ctx context.Context, oid Oid) (_ *mem.Buf, serial Tid, _ error) {
conn.checkTxnCtx(ctx, "load") conn.checkTxnCtx(ctx, "load")
return conn.stor.Load(ctx, Xid{Oid: oid, At: conn.at}) return conn.db.stor.Load(ctx, Xid{Oid: oid, At: conn.at})
} }
// ---------------------------------------- // ----------------------------------------
......
...@@ -55,7 +55,7 @@ type DB struct { ...@@ -55,7 +55,7 @@ type DB struct {
// .at and invalidating live objects based on δtail info. // .at and invalidating live objects based on δtail info.
// //
// not all connections here have δtail coverage. // not all connections here have δtail coverage.
connv []*Connection // order by ↑= .at pool []*Connection // order by ↑= .at
// δtail of database changes. // δtail of database changes.
// //
...@@ -79,35 +79,31 @@ type DB struct { ...@@ -79,35 +79,31 @@ type DB struct {
// ΔTnext δhead(when_open) - δtail.Head when connection was opened // ΔTnext δhead(when_open) - δtail.Head when connection was opened
// Twork(conn) - time the connection is used // Twork(conn) - time the connection is used
// Twork(conn) // Twork(conn)
// work = ─────────── // lwork = ───────────
// ΔTnext // ΔTnext
// //
// if heady >> 1 - it is case "1" and δtail coverage is not needed. // if heady >> 1 - it is case "1" and δtail coverage is not needed.
// if heady ~ 1 - it is case "2" and δtail coverage might be needed depending on work. // if heady ~ 1 - it is case "2" and δtail coverage might be needed depending on lwork.
// if work >> 1 - the number of objects that will need to be invalidated // if lwork >> 1 - the number of objects that will need to be invalidated
// when updating conn to current head grows to ~ 100% of // when updating conn to current head grows to ~ 100% of
// connection's live cache. It thus does not make // connection's live cache. It thus does not make
// sense to keep δtail past some reasonable time. // sense to keep δtail past some reasonable time.
// //
// A good system would monitor both ΔTnext, and work for connections // A good system would monitor both ΔTnext, and lwork for connections
// with small heady, and adjust δtail cut time as e.g. // with small heady, and adjust δtail cut time as e.g.
// //
// timelen(δtail) = 3·work·ΔTnext // timelen(δtail) = 3·lwork·ΔTnext
// //
// //
// FIXME for now we just fix // FIXME for now we just fix
// //
// Theady = 1min
// Tδkeep = 10min // Tδkeep = 10min
// //
// and assume that a connection is heady if
//
// |at - δhead(when_open)| ≤ Theady
//
// and keep δtail coverage for Tδkeep time // and keep δtail coverage for Tδkeep time
// //
// timelen(δtail) = Tδkeep // timelen(δtail) = Tδkeep
δtail *ΔTail // [](rev↑, []oid) δtail *ΔTail // [](rev↑, []oid)
tδkeep time.Duration
// openers waiting for δtail.Head to become covering their at. // openers waiting for δtail.Head to become covering their at.
δwait map[δwaiter]struct{} // set{(at, ready)} δwait map[δwaiter]struct{} // set{(at, ready)}
...@@ -130,6 +126,8 @@ func NewDB(stor IStorage) *DB { ...@@ -130,6 +126,8 @@ func NewDB(stor IStorage) *DB {
stor: stor, stor: stor,
δtail: NewΔTail(), δtail: NewΔTail(),
δwait: make(map[δwaiter]struct{}), δwait: make(map[δwaiter]struct{}),
tδkeep: 10*time.Minute, // see δtail discussion
} }
watchq := make(chan CommitEvent) watchq := make(chan CommitEvent)
...@@ -144,6 +142,14 @@ func NewDB(stor IStorage) *DB { ...@@ -144,6 +142,14 @@ func NewDB(stor IStorage) *DB {
type ConnOptions struct { type ConnOptions struct {
At Tid // if !0, open Connection bound to `at` view of database; not latest. At Tid // if !0, open Connection bound to `at` view of database; not latest.
NoSync bool // don't sync with storage to get its last tid. NoSync bool // don't sync with storage to get its last tid.
// don't put connection back into DB pool after transaction ends.
//
// This is low-level option that allows to inspect/use connection's
// LiveCache after transaction finishes, and to manually resync the
// connection onto another database view. See Connection.Resync for
// details.
NoPool bool
} }
// String represents connection options in human-readable form. // String represents connection options in human-readable form.
...@@ -208,7 +214,9 @@ func (db *DB) watcher(watchq <-chan CommitEvent) { // XXX err ? ...@@ -208,7 +214,9 @@ func (db *DB) watcher(watchq <-chan CommitEvent) { // XXX err ?
// //
// Open must be called under transaction. // Open must be called under transaction.
// Opened connection must be used only under the same transaction and only // Opened connection must be used only under the same transaction and only
// until that transaction is complete. // until that transaction is complete(*).
//
// (*) XXX unless NoPool option is used.
func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err error) { func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err error) {
defer func() { defer func() {
if err == nil { if err == nil {
...@@ -287,6 +295,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -287,6 +295,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
conn = newConnection(db, at) conn = newConnection(db, at)
} else { } else {
// invalidate changed live objects // invalidate changed live objects
// XXX -> Connection.resync(at)
for _, δ := range δtail.SliceByRev(conn.at, at) { for _, δ := range δtail.SliceByRev(conn.at, at) {
for _, oid := range δ.changev { for _, oid := range δ.changev {
obj := conn.cache.Get(oid) obj := conn.cache.Get(oid)
...@@ -333,6 +342,8 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -333,6 +342,8 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
return conn, nil return conn, nil
} }
//func (conn *Connection) resync(at Tid
// get returns connection from db pool most close to at. // get returns connection from db pool most close to at.
// //
// it creates new one if there is no close-enough connection in the pool. XXX -> no // it creates new one if there is no close-enough connection in the pool. XXX -> no
...@@ -343,12 +354,12 @@ func (db *DB) get(at Tid, FIXME ...Tid) *Connection { // XXX FIXME added only to ...@@ -343,12 +354,12 @@ func (db *DB) get(at Tid, FIXME ...Tid) *Connection { // XXX FIXME added only to
// XXX at < δtail.Tail -> getHistoric; else -> here // XXX at < δtail.Tail -> getHistoric; else -> here
l := len(db.connv) l := len(db.pool)
// find connv index corresponding to at: // find pool index corresponding to at:
// [i-1].at ≤ at < [i].at // [i-1].at ≤ at < [i].at
i := sort.Search(l, func(i int) bool { i := sort.Search(l, func(i int) bool {
return at < db.connv[i].at return at < db.pool[i].at
}) })
// search through window of X previous connections and find out the one // search through window of X previous connections and find out the one
...@@ -370,15 +381,15 @@ func (db *DB) get(at Tid, FIXME ...Tid) *Connection { // XXX FIXME added only to ...@@ -370,15 +381,15 @@ func (db *DB) get(at Tid, FIXME ...Tid) *Connection { // XXX FIXME added only to
// nothing found or too distant // nothing found or too distant
const Tnear = 10*time.Minute // XXX hardcoded const Tnear = 10*time.Minute // XXX hardcoded
if jδmin < 0 || tabs(δtid(at, db.connv[jδmin].at)) > Tnear { if jδmin < 0 || tabs(δtid(at, db.pool[jδmin].at)) > Tnear {
return newConnection(db, at) return newConnection(db, at)
} }
// reuse the connection // reuse the connection
conn := db.connv[jδmin] conn := db.pool[jδmin]
copy(db.connv[jδmin:], db.connv[jδmin+1:]) copy(db.pool[jδmin:], db.pool[jδmin+1:])
db.connv[l-1] = nil db.pool[l-1] = nil
db.connv = db.connv[:l-1] db.pool = db.pool[:l-1]
if conn.db != db { if conn.db != db {
panic("DB.get: foreign connection in the pool") panic("DB.get: foreign connection in the pool")
...@@ -400,21 +411,19 @@ func (db *DB) put(conn *Connection) { ...@@ -400,21 +411,19 @@ func (db *DB) put(conn *Connection) {
panic("DB.put: conn.db != db") panic("DB.put: conn.db != db")
} }
conn.txn = nil
db.mu.Lock() db.mu.Lock()
defer db.mu.Unlock() defer db.mu.Unlock()
// XXX check if len(connv) > X, and drop conn if yes // XXX check if len(pool) > X, and drop conn if yes
// [i-1].at ≤ at < [i].at // [i-1].at ≤ at < [i].at
i := sort.Search(len(db.connv), func(i int) bool { i := sort.Search(len(db.pool), func(i int) bool {
return conn.at < db.connv[i].at return conn.at < db.pool[i].at
}) })
//db.connv = append(db.connv[:i], conn, db.connv[i:]...) //db.pool = append(db.pool[:i], conn, db.pool[i:]...)
db.connv = append(db.connv, nil) db.pool = append(db.pool, nil)
copy(db.connv[i+1:], db.connv[i:]) copy(db.pool[i+1:], db.pool[i:])
db.connv[i] = conn db.pool[i] = conn
// XXX GC too idle connections here? // XXX GC too idle connections here?
} }
...@@ -434,7 +443,10 @@ func (csync *connTxnSync) AfterCompletion(txn transaction.Transaction) { ...@@ -434,7 +443,10 @@ func (csync *connTxnSync) AfterCompletion(txn transaction.Transaction) {
conn := (*Connection)(csync) conn := (*Connection)(csync)
conn.checkTxn(txn, "AfterCompletion") conn.checkTxn(txn, "AfterCompletion")
// XXX check that conn was explicitly closed by user? // mark the connection as no longer being live
conn.txn = nil
if !conn.flags & noPool {
conn.db.put(conn) conn.db.put(conn)
}
} }
...@@ -222,7 +222,7 @@ func TestPersistentDB(t *testing.T) { ...@@ -222,7 +222,7 @@ func TestPersistentDB(t *testing.T) {
conn1, err := db.Open(ctx1, &ConnOptions{}); X(err) conn1, err := db.Open(ctx1, &ConnOptions{}); X(err)
println("555") println("555")
assert.Equal(conn1.At(), at1) assert.Equal(conn1.At(), at1)
assert.Equal(db.connv, []*Connection(nil)) assert.Equal(db.pool, []*Connection(nil))
assert.Equal(conn1.db, db) assert.Equal(conn1.db, db)
assert.Equal(conn1.txn, txn1) assert.Equal(conn1.txn, txn1)
...@@ -290,7 +290,7 @@ func TestPersistentDB(t *testing.T) { ...@@ -290,7 +290,7 @@ func TestPersistentDB(t *testing.T) {
txn2, ctx2 := transaction.New(ctx) txn2, ctx2 := transaction.New(ctx)
conn2, err := db.Open(ctx2, &ConnOptions{}); X(err) conn2, err := db.Open(ctx2, &ConnOptions{}); X(err)
assert.Equal(conn2.At(), at2) assert.Equal(conn2.At(), at2)
assert.Equal(db.connv, []*Connection(nil)) assert.Equal(db.pool, []*Connection(nil))
assert.Equal(conn2.db, db) assert.Equal(conn2.db, db)
assert.Equal(conn2.txn, txn2) assert.Equal(conn2.txn, txn2)
...@@ -333,7 +333,7 @@ func TestPersistentDB(t *testing.T) { ...@@ -333,7 +333,7 @@ func TestPersistentDB(t *testing.T) {
assert.Equal(conn1.txn, txn1) assert.Equal(conn1.txn, txn1)
txn1.Abort() txn1.Abort()
assert.Equal(conn1.txn, nil) assert.Equal(conn1.txn, nil)
assert.Equal(db.connv, []*Connection{conn1}) assert.Equal(db.pool, []*Connection{conn1})
println("BBB") println("BBB")
...@@ -345,7 +345,7 @@ func TestPersistentDB(t *testing.T) { ...@@ -345,7 +345,7 @@ func TestPersistentDB(t *testing.T) {
assert.Equal(conn1.At(), at2) assert.Equal(conn1.At(), at2)
assert.Equal(conn1.db, db) assert.Equal(conn1.db, db)
assert.Equal(conn1.txn, txn3) assert.Equal(conn1.txn, txn3)
assert.Equal(db.connv, []*Connection{}) assert.Equal(db.pool, []*Connection{})
// XXX ctx1 = ctx3 (not to use 3 below) ? // XXX ctx1 = ctx3 (not to use 3 below) ?
println("CCC") println("CCC")
......
...@@ -86,7 +86,7 @@ func pyGetState(obj PyStateful, objClass string) *mem.Buf { ...@@ -86,7 +86,7 @@ func pyGetState(obj PyStateful, objClass string) *mem.Buf {
// It only returns decoded database data. // It only returns decoded database data.
func (conn *Connection) loadpy(ctx context.Context, oid Oid) (class string, pystate interface{}, serial Tid, _ error) { func (conn *Connection) loadpy(ctx context.Context, oid Oid) (class string, pystate interface{}, serial Tid, _ error) {
xid := Xid{Oid: oid, At: conn.at} xid := Xid{Oid: oid, At: conn.at}
buf, serial, err := conn.stor.Load(ctx, xid) buf, serial, err := conn.db.stor.Load(ctx, xid)
if err != nil { if err != nil {
return "", nil, 0, err return "", nil, 0, err
} }
...@@ -96,7 +96,7 @@ func (conn *Connection) loadpy(ctx context.Context, oid Oid) (class string, pyst ...@@ -96,7 +96,7 @@ func (conn *Connection) loadpy(ctx context.Context, oid Oid) (class string, pyst
pyclass, pystate, err := PyData(buf.Data).decode(conn) pyclass, pystate, err := PyData(buf.Data).decode(conn)
if err != nil { if err != nil {
err = &OpError{ err = &OpError{
URL: conn.stor.URL(), URL: conn.db.stor.URL(),
Op: "loadpy", Op: "loadpy",
Args: xid, Args: xid,
Err: err, Err: err,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment