Commit 69fd5651 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent eae3b427
......@@ -31,6 +31,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/internal/xzlib"
......@@ -77,6 +78,7 @@ type Client struct {
at0 zodb.Tid // at0 obtained when initially connecting to server
eventq0 []*zodb.EventCommit // buffer for initial messages, until .at0 is initialized
at0Initialized bool // true after .at0 is initialized
at0Ready chan(struct{}) // ready after .at0 is initialized
}
var _ zodb.IStorageDriver = (*Client)(nil)
......@@ -108,9 +110,6 @@ func (c *Client) Close() error {
c.talkMasterCancel()
// XXX wait talkMaster finishes -> XXX return err from that?
// XXX what else?
if c.watchq != nil {
close(c.watchq)
}
return nil
}
......@@ -222,6 +221,8 @@ func (c *Client) talkMaster(ctx context.Context) (err error) {
// XXX dup wrt Storage.talkMaster
for {
// XXX .nodeTab.Reset() ?
err := c.talkMaster1(ctx)
log.Warning(ctx, err) // XXX Warning ok? -> Error?
// XXX if err == "reject identification / protocol error" -> shutdown client
......@@ -289,12 +290,68 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
return wg.Wait()
}
// initFromMaster asks M for partTab and DB head right after identification.
func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) {
defer task.Running(&ctx, "init")(&err)
// query partTab
rpt := proto.AnswerPartitionTable{}
err = mlink.Ask1(&proto.AskPartitionTable{}, &rpt)
if err != nil {
return err
}
pt := PartTabFromDump(rpt.PTid, rpt.RowList)
log.Infof(ctx, "master initialized us with next parttab:\n%s", pt)
c.node.StateMu.Lock()
c.node.PartTab = pt
opready := c.updateOperational()
c.node.StateMu.Unlock()
opready()
// query last_tid
lastTxn := proto.AnswerLastTransaction{}
err = mlink.Ask1(&proto.LastTransaction{}, &lastTxn)
if err != nil {
return err
}
if c.at0Initialized {
// XXX c.head locking?
if lastTxn.Tid != c.head {
return fmt.Errorf("New transactions were committed while we were disconnected from master (%s -> %s)", c.head, lastTxn.Tid)
}
} // XXX -> else?
// since we read lastTid, in separate protocol exchange there is a
// chance, that by the time when lastTid was read, some new transactions
// were committed. This way lastTid will be > than some first
// transactions received by watcher via "invalidateObjects" server
// notification.
//
// filter-out first < at0 messages for this reason.
//
// TODO change NEO protocol so that when C connects to M, M sends it
// current head and guarantees to send only followup invalidation
// updates.
c.at0Mu.Lock()
c.at0 = lastTxn.Tid
c.at0Initialized = true
// c.flushEventq0() XXX reenable
c.at0Mu.Unlock()
// XXX what next?
return nil
// TODO transaction control? -> better in original goroutines doing the txn (just share mlink)
}
// recvMaster receives and handles notifications from master
func (c *Client) recvMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) {
defer task.Running(&ctx, "rx")(&err)
// XXX .nodeTab.Reset()
for {
req, err := mlink.Recv1()
if err != nil {
......@@ -345,61 +402,6 @@ func (c *Client) recvMaster1(ctx context.Context, req neonet.Request) error {
return nil
}
func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) {
defer task.Running(&ctx, "init")(&err)
// ask M for PT
rpt := proto.AnswerPartitionTable{}
err = mlink.Ask1(&proto.AskPartitionTable{}, &rpt)
if err != nil {
return err
}
pt := PartTabFromDump(rpt.PTid, rpt.RowList)
log.Infof(ctx, "master initialized us with next parttab:\n%s", pt)
c.node.StateMu.Lock()
c.node.PartTab = pt
opready := c.updateOperational()
c.node.StateMu.Unlock()
opready()
// ask M about last_tid
lastTxn := proto.AnswerLastTransaction{}
err = mlink.Ask1(&proto.LastTransaction{}, &lastTxn)
if err != nil {
return err
}
if c.at0Initialized {
// XXX c.head locking?
if lastTxn.Tid != c.head {
return fmt.Errorf("New transactions were committed while we were disconnected from master (%s -> %s)", c.head, lastTxn.Tid)
}
} // XXX -> else?
// since we read lastTid, in separate protocol exchange there is a
// chance, that by the time when lastTid was read, some new transactions
// were committed. This way lastTid will be > than some first
// transactions received by watcher via "invalidateObjects" server
// notification.
//
// filter-out first < at0 messages for this reason.
//
// TODO change NEO protocol so that when C connects to M, M sends it
// current head and guarantees to send only followup invalidation
// updates.
c.at0Mu.Lock()
c.at0 = lastTxn.Tid
c.at0Initialized = true
// c.flushEventq0() XXX reenable
c.at0Mu.Unlock()
// XXX what next?
return nil
// TODO transaction control? -> better in original goroutines doing the txn (just share mlink)
}
// --- user API calls ---
func (c *Client) Sync(ctx context.Context) (_ zodb.Tid, err error) {
......@@ -530,67 +532,58 @@ func (c *Client) Watch(ctx context.Context) (zodb.Tid, []zodb.Oid, error) {
// ---- ZODB open/url support ----
func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (zodb.IStorageDriver, zodb.Tid, error) {
func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb.IStorageDriver, _ zodb.Tid, err error) {
// neo://name@master1,master2,...,masterN?options
defer xerr.Contextf(&err, "neo: open %s", u)
if u.User == nil {
return nil, zodb.InvalidTid, fmt.Errorf("neo: open %q: cluster name not specified", u)
return nil, zodb.InvalidTid, fmt.Errorf("cluster name not specified")
}
// XXX readonly stub
// XXX place = ?
if !opt.ReadOnly {
return nil, zodb.InvalidTid, fmt.Errorf("neo: %s: TODO write mode not implemented", u)
}
// FIXME handle opt.Watchq
// for now we pretend as if the database is not changing.
// TODO watcher(when implementing): filter-out first < at0 messages.
if opt.Watchq != nil {
log.Error(ctx, "neo: FIXME: watchq support not implemented - there" +
"won't be notifications about database changes")
return nil, zodb.InvalidTid, fmt.Errorf("TODO write mode not implemented")
}
// XXX check/use other url fields
net := xnet.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
// XXX we are not passing ctx to NewClient - right?
// as ctx for open can be done after open finishes - not covering
// whole storage working lifetime.
c := NewClient(u.User.Username(), u.Host, net)
c.watchq = opt.Watchq
defer func() {
if err != nil {
c.Close()
}
}()
// XXX go c.Run()
// XXX wait c.ready
// start client node serve loop
errq := make(chan error, 1)
go func() {
err := c.Run(context.Background()) // NOTE not ctx
/*
lastTid, err := c.Sync(ctx)
if err != nil {
c.Close() // XXX lclose
return nil, zodb.InvalidTid, fmt.Errorf("neo: open %q: %s", u, err)
}
// close .watchq after serve is over
c.at0Mu.Lock()
defer c.at0Mu.Unlock()
if c.at0Initialized {
// c.flushEventq0()
}
if c.watchq != nil {
if err != nil {
c.watchq <- &zodb.EventError{Err: err}
}
close(c.watchq)
}
// since we read lastTid, in separate protocol exchange there is a
// chance, that by the time when lastTid was read, some new transactions
// were committed. This way lastTid will be > than some first
// transactions received by watcher via "invalidateObjects" server
// notification.
//
// filter-out first < at0 messages for this reason.
//
// TODO change NEO protocol so that when C connects to M, M sends it
// current head and guarantees to send only followup invalidation
// updates.
//
// XXX -> move talkMaster1 / Client.Run
c.at0Mu.Lock()
c.at0 = lastTid
c.at0Initialized = true
c.flushEventq0()
c.at0Mu.Unlock()
*/
errq <- err
}()
return c, c.at0, nil
select {
case <-ctx.Done():
return nil, zodb.InvalidTid, ctx.Err()
case <-errq:
return nil, zodb.InvalidTid, err
case <-c.at0Ready:
return c, c.at0, nil
}
}
func (c *Client) URL() string {
......
......@@ -390,7 +390,7 @@ func (r rpc) ereplyf(format string, argv ...interface{}) *errorUnexpectedReply {
func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb.IStorageDriver, at0 zodb.Tid, err error) {
url := u.String()
defer xerr.Contextf(&err, "open %s", url)
defer xerr.Contextf(&err, "zeo: open %s", url)
// zeo://host:port/path?storage=...&...
var net xnet.Networker
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment