Commit 1f3f884f authored by Kirill Smelkov's avatar Kirill Smelkov

X Took decision that driver should give at₀ and guarantee that watchq will...

X Took decision that driver should give at₀ and guarantee that watchq will receive only and all events in (at₀, +∞] range
parent 59649ba3
......@@ -500,21 +500,22 @@ func (c *Client) Watch(ctx context.Context) (zodb.Tid, []zodb.Oid, error) {
// ---- ZODB open/url support ----
func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (zodb.IStorageDriver, error) {
func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (zodb.IStorageDriver, zodb.Tid, error) {
// neo://name@master1,master2,...,masterN?options
if u.User == nil {
return nil, fmt.Errorf("neo: open %q: cluster name not specified", u)
return nil, zodb.InvalidTid, fmt.Errorf("neo: open %q: cluster name not specified", u)
}
// XXX readonly stub
// XXX place = ?
if !opt.ReadOnly {
return nil, fmt.Errorf("neo: %s: TODO write mode not implemented", u)
return nil, zodb.InvalidTid, fmt.Errorf("neo: %s: TODO write mode not implemented", u)
}
// FIXME handle opt.Watchq
// for now we pretend as if the database is not changing.
// TODO watcher(when implementing): filter-out first < at0 messages.
if opt.Watchq != nil {
log.Error(ctx, "neo: FIXME: watchq support not implemented - there" +
"won't be notifications about database changes")
......@@ -528,7 +529,23 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (
// whole storage working lifetime.
c := NewClient(u.User.Username(), u.Host, net)
c.watchq = opt.Watchq
return c, nil
// XXX since we read lastTid, in separate protocol exchange there is a
// chance, that by the time when lastTid was read some new transactions
// were committed. This way lastTid will be > than some first
// transactions received by watcher via "invalidateObjects" server
// notification.
//
// TODO change NEO protocol so that when C connects to M, M sends it
// current head and guarantees to send only followup invalidation
// updates.
at0, err := c.LastTid(ctx)
if err != nil {
c.Close() // XXX lclose
return nil, zodb.InvalidTid, fmt.Errorf("neo: open %q: %s", u, err)
}
return c, at0, nil
}
func (c *Client) URL() string {
......
......@@ -49,7 +49,7 @@ type Backend struct {
var _ storage.Backend = (*Backend)(nil)
func Open(ctx context.Context, path string) (*Backend, error) {
zstor, err := fs1.Open(ctx, path, &zodb.DriverOptions{ReadOnly: true}) // XXX RO? +Watchq?
zstor, _, err := fs1.Open(ctx, path, &zodb.DriverOptions{ReadOnly: true}) // XXX RO? +Watchq?
if err != nil {
return nil, err
}
......
......@@ -41,7 +41,7 @@ func gox(wg interface { Go(func() error) }, xf func()) {
}
func xfs1stor(path string) *zfs1.FileStorage {
stor, err := zfs1.Open(bg, path, &zodb.DriverOptions{ReadOnly: true}) // XXX opts = ?
stor, _, err := zfs1.Open(bg, path, &zodb.DriverOptions{ReadOnly: true}) // XXX opts = ?
exc.Raiseif(err)
return stor
}
......
......@@ -102,7 +102,7 @@ type DB struct {
// and keep δtail coverage for Tδkeep time
//
// timelen(δtail) = Tδkeep
δtail *ΔTail // [](rev↑, []oid)
δtail *ΔTail // [](rev↑, []oid)
tδkeep time.Duration
// openers waiting for δtail.Head to become covering their at.
......@@ -174,21 +174,22 @@ type δwaiter struct {
// watcher receives events about new committed transactions and updates δtail.
//
// it also wakes up δtail waiters.
// it also notifies δtail waiters.
func (db *DB) watcher(watchq <-chan CommitEvent) { // XXX err ?
for {
event, ok := <-watchq
if !ok {
fmt.Printf("db: watcher: close")
fmt.Printf("db: watcher: close\n")
// XXX wake up all waiters?
return // closed
}
fmt.Printf("db: watcher <- %v", event)
fmt.Printf("db: watcher <- %v\n", event)
var readyv []chan struct{} // waiters that become ready
db.mu.Lock()
db.δtail.Append(event.Tid, event.Changev)
for w := range db.δwait {
if w.at <= event.Tid {
......@@ -206,7 +207,7 @@ func (db *DB) watcher(watchq <-chan CommitEvent) { // XXX err ?
// wakeup waiters outside of db.mu
for _, ready := range readyv {
fmt.Printf("db: watcher: wakeup %v", ready)
fmt.Printf("db: watcher: wakeup %v\n", ready)
close(ready)
}
}
......@@ -238,6 +239,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
txn := transaction.Current(ctx)
// find out db state we should open at
at := opt.At
if at == 0 {
head := Tid(0)
......@@ -265,8 +267,15 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
at = head
}
db.mu.Lock()
// unlock is either in resyncAndDBUnlock, or in db.openOrDBUnlock -> err
// proceed to open(at)
db.mu.Lock() // unlocked in *DBUnlock
/*
err := db.needHeadOrDBUnlock(ctx, at) // XXX what if δtail !init yet?
if err != nil {
return nil, err
}
*/
conn, err := db.openOrDBUnlock(ctx, at, opt.NoPool)
if err != nil {
......@@ -287,6 +296,7 @@ func (db *DB) openOrDBUnlock(ctx context.Context, at Tid, noPool bool) (*Connect
fmt.Printf("db.openx %s %v\t; δtail (%s, %s]\n", at, noPool, db.δtail.Tail(), db.δtail.Head())
// NoPool connection - create one anew
if noPool {
// XXX wait for at to be covered
conn := newConnection(db, at)
conn.noPool = true
return conn, nil
......@@ -354,7 +364,7 @@ retry:
// cache, even if maybe in ghost state (e.g. if they have to be invalidated due
// to database changes).
//
// Resync can be used several times.
// Resync can be used many times.
//
// Resync must be used only under the following conditions:
//
......@@ -406,7 +416,7 @@ func (conn *Connection) resyncAndDBUnlock(txn transaction.Transaction, at Tid) {
δall := false // if we have to invalidate all objects
// both conn.at and at are covered by δtail - we can invalidate selectively
if (δtail.Tail() < conn.at && conn.at <= δtail.Head()) &&
if (δtail.Tail() < conn.at && conn.at <= δtail.Head()) && // XXX conn.at can = δtail.Tail
(δtail.Tail() < at && at <= δtail.Head()) {
var δv []δRevEntry
if conn.at <= at {
......@@ -431,13 +441,14 @@ func (conn *Connection) resyncAndDBUnlock(txn transaction.Transaction, at Tid) {
// unlock db before locking cache and txn
db.mu.Unlock()
// XXX -> separate func? (then we can drop "AndDBUnlock")
conn.cache.Lock()
defer conn.cache.Unlock()
if δall {
// XXX keep synced with LiveCache details
// XXX -> conn.cache.forEach?
// XXX should we wait for db.stor.head to cover at?
// XXX should we wait for db.stor.head to cover at? FIXME openOrDBUnlock does this
// or leave this wait till .Load() time?
for _, wobj := range conn.cache.objtab {
obj, _ := wobj.Get().(IPersistent)
......@@ -460,7 +471,7 @@ func (conn *Connection) resyncAndDBUnlock(txn transaction.Transaction, at Tid) {
// get returns connection from db pool most close to at with conn.at ∈ [atMin, at].
//
// XXX recheck [atMin or (atMin
// XXX recheck [atMin or (atMin -- see "= δtail.Tail" in resyncAndDBUnlock.
//
// if there is no such connection in the pool - nil is returned.
// must be called with db.mu locked.
......
......@@ -48,12 +48,18 @@ type DriverOptions struct {
//
// The storage driver closes !nil Watchq when the driver is closed.
//
// The storage driver will send only and all events in (at₀, +∞] range,
// where at₀ is at returned by driver open.
//
// TODO extend watchq to also receive errors from watcher.
Watchq chan<- CommitEvent
}
// DriverOpener is a function to open a storage driver.
type DriverOpener func (ctx context.Context, u *url.URL, opt *DriverOptions) (IStorageDriver, error)
//
// at₀ gives database state at open time. The driver will send to Watchq (see
// DriverOptions) only and all events in (at₀, +∞] range.
type DriverOpener func (ctx context.Context, u *url.URL, opt *DriverOptions) (_ IStorageDriver, at0 Tid, _ error)
// {} scheme -> DriverOpener
var driverRegistry = map[string]DriverOpener{}
......@@ -100,7 +106,7 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
Watchq: drvWatchq,
}
storDriver, err := opener(ctx, u, drvOpt)
storDriver, at0, err := opener(ctx, u, drvOpt)
if err != nil {
return nil, err
}
......@@ -117,6 +123,8 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
cache = nil
}
_ = at0
// XXX stor.δtail - init with (at0, at]
stor := &storage{
IStorageDriver: storDriver,
l1cache: cache,
......
......@@ -287,6 +287,7 @@ func TestPersistentDB(t *testing.T) {
println("000")
// new db connection should see the change
// XXX currently there is a race because db.Open does not do proper Sync
txn2, ctx2 := transaction.New(ctx)
conn2, err := db.Open(ctx2, &ConnOptions{}); X(err)
assert.Equal(conn2.At(), at2)
......
......@@ -730,10 +730,10 @@ func (fs *FileStorage) Close() error {
}
// Open opens FileStorage @path.
func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileStorage, err error) {
func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileStorage, at0 zodb.Tid, err error) {
// TODO read-write support
if !opt.ReadOnly {
return nil, fmt.Errorf("fs1: %s: TODO write mode not implemented", path)
return nil, zodb.InvalidTid, fmt.Errorf("fs1: %s: TODO write mode not implemented", path)
}
fs := &FileStorage{
......@@ -743,7 +743,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
f, err := os.Open(path)
if err != nil {
return nil, err
return nil, zodb.InvalidTid, err
}
fs.file = f
defer func() {
......@@ -758,7 +758,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
fh := FileHeader{}
err = fh.Load(f)
if err != nil {
return nil, err
return nil, zodb.InvalidTid, err
}
// load index
......@@ -791,7 +791,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
checkTailGarbage = true
}
if err != nil {
return nil, err
return nil, zodb.InvalidTid, err
}
fs.index = index
......@@ -802,7 +802,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
err = fs.txnhMin.Load(f, txnValidFrom, LoadAll)
err = noEOF(err)
if err != nil {
return nil, err
return nil, zodb.InvalidTid, err
}
_ = fs.txnhMax.Load(f, index.TopPos, LoadAll)
......@@ -811,19 +811,21 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
// that we read .LenPrev ok.
switch fs.txnhMax.LenPrev {
case -1:
return nil, fmt.Errorf("%s: could not read LenPrev @%d (last transaction)", f.Name(), fs.txnhMax.Pos)
return nil, zodb.InvalidTid, fmt.Errorf("%s: could not read LenPrev @%d (last transaction)", f.Name(), fs.txnhMax.Pos)
case 0:
return nil, fmt.Errorf("%s: could not read LenPrev @%d (last transaction): unexpected EOF backward", f.Name(), fs.txnhMax.Pos)
return nil, zodb.InvalidTid, fmt.Errorf("%s: could not read LenPrev @%d (last transaction): unexpected EOF backward", f.Name(), fs.txnhMax.Pos)
default:
// .LenPrev is ok - read last previous record
err = fs.txnhMax.LoadPrev(f, LoadAll)
if err != nil {
return nil, err
return nil, zodb.InvalidTid, err
}
}
}
at0 = fs.txnhMax.Tid
// there might be simultaneous updates to the data file from outside.
// launch the watcher who will observe them.
//
......@@ -831,12 +833,12 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
// race of missing early file writes.
w, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
return nil, zodb.InvalidTid, err
}
err = w.Add(f.Name())
if err != nil {
w.Close() // XXX lclose
return nil, err
return nil, zodb.InvalidTid, err
}
var errFirstRead chan error
......@@ -851,14 +853,14 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
if checkTailGarbage {
select {
case <-ctx.Done():
return nil, ctx.Err()
return nil, zodb.InvalidTid, ctx.Err()
case err = <-errFirstRead:
if err != nil {
return nil, err // it was garbage
return nil, zodb.InvalidTid, err // it was garbage
}
}
}
return fs, nil
return fs, at0, nil
}
......@@ -110,22 +110,22 @@ func checkLoad(t *testing.T, fs *FileStorage, xid zodb.Xid, expect objState) {
}
}
func xfsopen(t testing.TB, path string) *FileStorage {
func xfsopen(t testing.TB, path string) (*FileStorage, zodb.Tid) {
t.Helper()
return xfsopenopt(t, path, &zodb.DriverOptions{ReadOnly: true})
}
func xfsopenopt(t testing.TB, path string, opt *zodb.DriverOptions) *FileStorage {
func xfsopenopt(t testing.TB, path string, opt *zodb.DriverOptions) (*FileStorage, zodb.Tid) {
t.Helper()
fs, err := Open(context.Background(), path, opt)
fs, at0, err := Open(context.Background(), path, opt)
if err != nil {
t.Fatal(err)
}
return fs
return fs, at0
}
func TestLoad(t *testing.T) {
fs := xfsopen(t, "testdata/1.fs")
fs, _ := xfsopen(t, "testdata/1.fs")
defer exc.XRun(fs.Close)
// current knowledge of what was "before" for an oid as we scan over
......@@ -155,7 +155,7 @@ func TestLoad(t *testing.T) {
}
// load at ∞ with TidMax
// XXX should we get "no such transaction" with at > head?
// XXX should we get "no such transaction" with at > head? - yes
for oid, expect := range before {
xid := zodb.Xid{zodb.TidMax, oid}
checkLoad(t, fs, xid, expect)
......@@ -274,7 +274,7 @@ func testIterate(t *testing.T, fs *FileStorage, tidMin, tidMax zodb.Tid, expectv
}
func TestIterate(t *testing.T) {
fs := xfsopen(t, "testdata/1.fs")
fs, _ := xfsopen(t, "testdata/1.fs")
defer exc.XRun(fs.Close)
// all []tids in test database
......@@ -310,7 +310,7 @@ func TestIterate(t *testing.T) {
}
func BenchmarkIterate(b *testing.B) {
fs := xfsopen(b, "testdata/1.fs")
fs, _ := xfsopen(b, "testdata/1.fs")
defer exc.XRun(fs.Close)
ctx := context.Background()
......@@ -378,7 +378,10 @@ func TestWatch(t *testing.T) {
at := xcommit(0, xtesting.ZRawObject{0, b("data0")})
watchq := make(chan zodb.CommitEvent)
fs := xfsopenopt(t, tfs, &zodb.DriverOptions{ReadOnly: true, Watchq: watchq})
fs, at0 := xfsopenopt(t, tfs, &zodb.DriverOptions{ReadOnly: true, Watchq: watchq})
if at0 != at {
t.Fatalf("opened @ %s ; want %s", at0, at)
}
ctx := context.Background()
checkLastTid := func(lastOk zodb.Tid) {
......@@ -476,10 +479,13 @@ func TestOpenRecovery(t *testing.T) {
}
for _, l := range lok {
checkL(t, l, func(t *testing.T, tfs string) {
fs := xfsopen(t, tfs)
fs, at0 := xfsopen(t, tfs)
defer func() {
err = fs.Close(); X(err)
}()
if at0 != lastTidOk {
t.Fatalf("at0: %s ; expected: %s", at0, lastTidOk)
}
head, err := fs.LastTid(ctx); X(err)
if head != lastTidOk {
t.Fatalf("last_tid: %s ; expected: %s", head, lastTidOk)
......@@ -491,7 +497,7 @@ func TestOpenRecovery(t *testing.T) {
// XXX better check 0..sizeof(txnh)-1 but in this range each check is slow.
for _, l := range []int{TxnHeaderFixSize-1,1} {
checkL(t, l, func(t *testing.T, tfs string) {
_, err := Open(ctx, tfs, &zodb.DriverOptions{ReadOnly: true})
_, _, err := Open(ctx, tfs, &zodb.DriverOptions{ReadOnly: true})
estr := ""
if err != nil {
estr = err.Error()
......
......@@ -27,13 +27,12 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb"
)
func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (zodb.IStorageDriver, error) {
func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (zodb.IStorageDriver, zodb.Tid, error) {
// TODO handle query
// XXX u.Path is not always raw path - recheck and fix
path := u.Host + u.Path
fs, err := Open(ctx, path, opt)
return fs, err
return Open(ctx, path, opt)
}
func init() {
......
......@@ -283,7 +283,7 @@ func (r rpc) ereplyf(format string, argv ...interface{}) *errorUnexpectedReply {
// ---- open ----
func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb.IStorageDriver, err error) {
func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb.IStorageDriver, at0 zodb.Tid, err error) {
url := u.String()
defer xerr.Contextf(&err, "open %s:", url)
......@@ -307,11 +307,12 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
}
if !opt.ReadOnly {
return nil, fmt.Errorf("TODO write mode not implemented")
return nil, zodb.InvalidTid, fmt.Errorf("TODO write mode not implemented")
}
// FIXME handle opt.Watchq
// for now we pretend as if the database is not changing.
// TODO watcher(when implementing): filter-out first < at0 messages.
if opt.Watchq != nil {
log.Print("zeo: FIXME: watchq support not implemented - there" +
"won't be notifications about database changes")
......@@ -319,7 +320,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
zl, err := dialZLink(ctx, net, addr) // XXX + methodTable {invalidateTransaction tid, oidv} -> ...
if err != nil {
return nil, err
return nil, zodb.InvalidTid, err
}
defer func() {
......@@ -334,7 +335,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
rpc := z.rpc("register")
xlastTid, err := rpc.call(ctx, storageID, opt.ReadOnly)
if err != nil {
return nil, err
return nil, zodb.InvalidTid, err
}
// register returns last_tid in ZEO5 but nothing earlier.
......@@ -343,17 +344,24 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
rpc = z.rpc("lastTransaction")
xlastTid, err = rpc.call(ctx)
if err != nil {
return nil, err
return nil, zodb.InvalidTid, err
}
}
lastTid, ok := tidUnpack(xlastTid) // XXX -> xlastTid -> scan
if !ok {
return nil, rpc.ereplyf("got %v; expect tid", xlastTid)
return nil, zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xlastTid)
}
z.lastTid = lastTid
// XXX since we read lastTid, at least with ZEO < 5, in separate RPC
// call, there is a chance, that by the time when lastTid was read some
// new transactions were committed. This way lastTid will be > than
// some first transactions received by watcher via
// "invalidateTransaction" server notification.
at0 = lastTid
//call('get_info') -> {}str->str, ex // XXX can be omitted
/*
......@@ -371,7 +379,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
'supports_record_iternext': True})
*/
return z, nil
return z, at0, nil
}
func (z *zeo) Close() error {
......
......@@ -522,7 +522,6 @@ class Application(BaseApplication):
client_node.send(Packets.AnswerTransactionFinished(ttid, tid),
msg_id=txn.getMessageId())
else:
# NOTE notifies all clients irregardless of whether who was subscribed
client_node.send(invalidate_objects)
# Unlock Information to relevant storage nodes.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment