Commit db81e0de authored by Levin Zimmermann's avatar Levin Zimmermann Committed by Kirill Smelkov

X: Teach NEO/go to handle multiple master nodes

See kirr/neo!2 for discussion,
context and details.

/reviewed-by @kirr

* t-with-multiple-master-nodes:
  fixup! client_test: Add nmaster={1,2} to test matrix
  fixup! client_test: Support test cluster /w >1 master
  fixup! TalkMaster: Switch master if dialed M is secondary
  fixup! Node: Add support for NEO cluster with > 1 master
  fixup! Dial: Catch NotPrimaryMaster & return custom error
  fixup! proto: Implement Error for NotPrimaryMaster
  fixup! proto.NotPrimaryMaster: Fix .Primary data type (2)
  fixup! proto.NotPrimaryMaster: Fix .Primary data type (1)
  client_test: Add nmaster={1,2} to test matrix
  client_test: Support test cluster /w >1 master
  proto.NotPrimaryMaster: Fix .Primary data type
  TalkMaster: Switch master if dialed M is secondary
  Dial: Catch NotPrimaryMaster & return custom error
  proto: Implement Error for NotPrimaryMaster
  openClientByURL: Fix for >1 master (split URL host)
  Client.URL: Fix incomplete URL if > 1 master nodes
  Node: Add support for NEO cluster with > 1 master
parents 7a0674c2 4605cba1
// Copyright (C) 2017-2021 Nexedi SA and Contributors. // Copyright (C) 2017-2023 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -78,11 +78,11 @@ var _ zodb.IStorageDriver = (*Client)(nil) ...@@ -78,11 +78,11 @@ var _ zodb.IStorageDriver = (*Client)(nil)
// NewClient creates new client node. // NewClient creates new client node.
// //
// It will connect to master @masterAddr and identify with specified cluster name. // It will connect to a master with address from masterAddrSlice and identify with specified cluster name.
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client { func NewClient(clusterName string, masterAddrSlice []string, net xnet.Networker) *Client {
c := &Client{ c := &Client{
node: newMasteredNode(proto.CLIENT, clusterName, net, masterAddr), node: newMasteredNode(proto.CLIENT, clusterName, net, masterAddrSlice),
at0Ready: make(chan struct{}), at0Ready: make(chan struct{}),
closed: make(chan struct{}), closed: make(chan struct{}),
} }
...@@ -483,7 +483,7 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) ( ...@@ -483,7 +483,7 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (
return nil, zodb.InvalidTid, err return nil, zodb.InvalidTid, err
} }
c := NewClient(name, u.Host, net) c := NewClient(name, strings.Split(u.Host, ","), net)
c.ownNet = true c.ownNet = true
c.watchq = opt.Watchq c.watchq = opt.Watchq
defer func() { defer func() {
...@@ -542,7 +542,8 @@ func (c *Client) URL() string { ...@@ -542,7 +542,8 @@ func (c *Client) URL() string {
if strings.Contains(c.node.Net.Network(), "+tls") { if strings.Contains(c.node.Net.Network(), "+tls") {
zurl += "s" zurl += "s"
} }
zurl += fmt.Sprintf("://%s/%s", c.node.MasterAddr, c.node.ClusterName) zurl += fmt.Sprintf("://%s/%s", strings.Join(c.node.MasterAddrSlice, ","), c.node.ClusterName,
)
return zurl return zurl
} }
......
...@@ -63,7 +63,7 @@ type NEOSrv interface { ...@@ -63,7 +63,7 @@ type NEOSrv interface {
type NEOSrvOptions struct { type NEOSrvOptions struct {
workdir string // location for database and log files workdir string // location for database and log files
name string // name of the cluster name string // name of the cluster
// nmaster nmaster int // how many masters our cluster has
// npartition // npartition
// nreplica // nreplica
...@@ -82,7 +82,7 @@ type NEOPySrv struct { ...@@ -82,7 +82,7 @@ type NEOPySrv struct {
done chan struct{} // ready after Wait completes done chan struct{} // ready after Wait completes
errExit error // error from Wait errExit error // error from Wait
masterAddr string // address of master in spawned cluster masterAddrSlice []string // addresses of master in spawned cluster
} }
func (_ *NEOPySrv) Bugs() []string { func (_ *NEOPySrv) Bugs() []string {
...@@ -96,11 +96,11 @@ func (_ *NEOPySrv) BugEncFixed() (bool, proto.Encoding) { ...@@ -96,11 +96,11 @@ func (_ *NEOPySrv) BugEncFixed() (bool, proto.Encoding) {
// StartNEOPySrv starts NEO/py server specified by options. // StartNEOPySrv starts NEO/py server specified by options.
func StartNEOPySrv(opt NEOSrvOptions) (_ *NEOPySrv, err error) { func StartNEOPySrv(opt NEOSrvOptions) (_ *NEOPySrv, err error) {
workdir := opt.workdir defer xerr.Contextf(&err, "start neo/py %s", opt)
defer xerr.Contextf(&err, "start neo/py %s/%s", workdir, opt.name)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
workdir := opt.workdir
readyf := workdir + "/ready" readyf := workdir + "/ready"
err = os.Remove(readyf) err = os.Remove(readyf)
if os.IsNotExist(err) { if os.IsNotExist(err) {
...@@ -113,11 +113,11 @@ func StartNEOPySrv(opt NEOSrvOptions) (_ *NEOPySrv, err error) { ...@@ -113,11 +113,11 @@ func StartNEOPySrv(opt NEOSrvOptions) (_ *NEOPySrv, err error) {
n := &NEOPySrv{opt: opt, cancel: cancel, done: make(chan struct{})} n := &NEOPySrv{opt: opt, cancel: cancel, done: make(chan struct{})}
// TODO set $PYTHONPATH to top, so that `import neo` works without `pip install -e .` // TODO set $PYTHONPATH to top, so that `import neo` works without `pip install -e .`
n.pysrv = xexec.Command("./py/runneo.py", workdir, opt.name) n.pysrv = xexec.Command("./py/runneo.py", workdir, opt.name, "master_count="+fmt.Sprintf("%d", opt.nmaster))
if opt.SSL { if opt.SSL {
n.pysrv.Args = append(n.pysrv.Args, "ca=" +opt.CA()) n.pysrv.Args = append(n.pysrv.Args, fmt.Sprintf("ca=%q", opt.CA()))
n.pysrv.Args = append(n.pysrv.Args, "cert="+opt.Cert()) n.pysrv.Args = append(n.pysrv.Args, fmt.Sprintf("cert=%q", opt.Cert()))
n.pysrv.Args = append(n.pysrv.Args, "key=" +opt.Key()) n.pysrv.Args = append(n.pysrv.Args, fmt.Sprintf("key=%q", opt.Key()))
} }
// $TEMP -> workdir (else NEO/py creates another one for e.g. coverage) // $TEMP -> workdir (else NEO/py creates another one for e.g. coverage)
n.pysrv.Env = append(os.Environ(), "TEMP="+workdir) n.pysrv.Env = append(os.Environ(), "TEMP="+workdir)
...@@ -160,12 +160,12 @@ func StartNEOPySrv(opt NEOSrvOptions) (_ *NEOPySrv, err error) { ...@@ -160,12 +160,12 @@ func StartNEOPySrv(opt NEOSrvOptions) (_ *NEOPySrv, err error) {
time.Sleep(10*time.Millisecond) time.Sleep(10*time.Millisecond)
} }
// retrieve master address // retrieve masters addresses
masterAddr, err := ioutil.ReadFile(readyf) maddrv, err := ioutil.ReadFile(readyf)
if err != nil { if err != nil {
return nil, err return nil, err
} }
n.masterAddr = string(masterAddr) n.masterAddrSlice = strings.Split(string(maddrv), " ")
return n, nil return n, nil
} }
...@@ -175,7 +175,7 @@ func (n *NEOPySrv) clusterName() string { ...@@ -175,7 +175,7 @@ func (n *NEOPySrv) clusterName() string {
} }
func (n *NEOPySrv) URL() string { func (n *NEOPySrv) URL() string {
return fmt.Sprintf("%s%s/%s", n.opt.URLPrefix(), n.masterAddr, n.clusterName()) return fmt.Sprintf("%s%s/%s", n.opt.URLPrefix(), strings.Join(n.masterAddrSlice, ","), n.clusterName())
} }
func (n *NEOPySrv) LogTail() (string, error) { func (n *NEOPySrv) LogTail() (string, error) {
...@@ -253,7 +253,10 @@ func (_ *NEOGoSrv) BugEncFixed() (bool, proto.Encoding) { ...@@ -253,7 +253,10 @@ func (_ *NEOGoSrv) BugEncFixed() (bool, proto.Encoding) {
// StartNEOGoSrv starts NEO/go server specified by options. // StartNEOGoSrv starts NEO/go server specified by options.
func StartNEOGoSrv(opt NEOSrvOptions) (_ *NEOGoSrv, err error) { func StartNEOGoSrv(opt NEOSrvOptions) (_ *NEOGoSrv, err error) {
defer xerr.Contextf(&err, "start neo/go %s/%s", opt.workdir, opt.name) defer xerr.Contextf(&err, "start neo/go %s", opt)
if opt.nmaster > 1 {
return nil, fmt.Errorf("TODO: nmaster > 1 not implemented")
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
serveWG := xsync.NewWorkGroup(ctx) serveWG := xsync.NewWorkGroup(ctx)
...@@ -290,7 +293,7 @@ func StartNEOGoSrv(opt NEOSrvOptions) (_ *NEOGoSrv, err error) { ...@@ -290,7 +293,7 @@ func StartNEOGoSrv(opt NEOSrvOptions) (_ *NEOGoSrv, err error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
n.S = NewStorage(opt.name, n.masterAddr(), net, n.Sback) n.S = NewStorage(opt.name, n.masterAddrSlice(), net, n.Sback)
serveWG.Go(func(ctx context.Context) error { serveWG.Go(func(ctx context.Context) error {
return n.S.Run(ctx, n.Sl) return n.S.Run(ctx, n.Sl)
}) })
...@@ -353,12 +356,12 @@ func (n *NEOGoSrv) Close() (err error) { ...@@ -353,12 +356,12 @@ func (n *NEOGoSrv) Close() (err error) {
return err return err
} }
func (n *NEOGoSrv) masterAddr() string { func (n *NEOGoSrv) masterAddrSlice() []string {
return n.Ml.Addr().String() return []string {n.Ml.Addr().String()}
} }
func (n *NEOGoSrv) URL() string { func (n *NEOGoSrv) URL() string {
return fmt.Sprintf("%s%s/%s", n.opt.URLPrefix(), n.masterAddr(), n.opt.name) return fmt.Sprintf("%s%s/%s", n.opt.URLPrefix(), strings.Join(n.masterAddrSlice(), ","), n.opt.name)
} }
...@@ -366,6 +369,13 @@ func (n *NEOGoSrv) URL() string { ...@@ -366,6 +369,13 @@ func (n *NEOGoSrv) URL() string {
const npytests = "../../neo/tests/" const npytests = "../../neo/tests/"
// String returns human-readble representation of server options.
func (opt NEOSrvOptions) String() string {
s := fmt.Sprintf("%s/%s M%d ", opt.workdir, opt.name, opt.nmaster)
if opt.SSL { s += "ssl" } else { s += "!ssl" }
return s
}
// CA/Cert/Key files to use if opt.SSL=y; empty if SSL=n. // CA/Cert/Key files to use if opt.SSL=y; empty if SSL=n.
func (opt NEOSrvOptions) CA() string { func (opt NEOSrvOptions) CA() string {
if !opt.SSL { return "" } if !opt.SSL { return "" }
...@@ -435,101 +445,109 @@ func withNEOSrv(t *testing.T, f func(t *testing.T, nsrv NEOSrv), optv ...tOption ...@@ -435,101 +445,109 @@ func withNEOSrv(t *testing.T, f func(t *testing.T, nsrv NEOSrv), optv ...tOption
f(work) f(work)
} }
// TODO + all variants with nreplica=X, npartition=Y, nmaster=Z, ... ? // TODO + all variants with nreplica=X, npartition=Y, ... ?
for _, ssl := range []bool{false, true} { for _, nmaster := range []int{1, 2} {
kind := "" for _, ssl := range []bool{false, true} {
if ssl { kind = "ssl" } else { kind = "!ssl" } kind := fmt.Sprintf("M%d/", nmaster)
if ssl { kind += "ssl" } else { kind += "!ssl" }
neoOpt := NEOSrvOptions{ neoOpt := NEOSrvOptions{
name: "1", name: "1",
SSL: ssl, SSL: ssl,
} nmaster: nmaster,
}
// startNEOpy starts NEO/py server with database in workdir/ // startNEOpy starts NEO/py server with database in workdir/
// and preloads it with data according to opt.Preload. // and preloads it with data according to opt.Preload.
startNEOpy := func(t *testing.T, workdir string, opt tOptions) *NEOPySrv { startNEOpy := func(t *testing.T, workdir string, opt tOptions) *NEOPySrv {
X := xtesting.FatalIf(t) X := xtesting.FatalIf(t)
neoOpt := neoOpt neoOpt := neoOpt
neoOpt.workdir = workdir neoOpt.workdir = workdir
npy, err := StartNEOPySrv(neoOpt); X(err) npy, err := StartNEOPySrv(neoOpt); X(err)
if opt.Preload != "" { if opt.Preload != "" {
cmd := exec.Command("python", "-c", cmd := exec.Command("python", "-c",
"from neo.scripts.neomigrate import main; main()", "from neo.scripts.neomigrate import main; main()",
"-q", "-q",
"-c", npy.clusterName(), "-c", npy.clusterName(),
) )
if ssl { if ssl {
cmd.Args = append(cmd.Args, "--ca", neoOpt.CA()) cmd.Args = append(cmd.Args, "--ca", neoOpt.CA())
cmd.Args = append(cmd.Args, "--cert", neoOpt.Cert()) cmd.Args = append(cmd.Args, "--cert", neoOpt.Cert())
cmd.Args = append(cmd.Args, "--key", neoOpt.Key()) cmd.Args = append(cmd.Args, "--key", neoOpt.Key())
}
cmd.Args = append(cmd.Args,
opt.Preload,
// py internal representation of master_nodes is a
// string where each addr is separated by 1 space
strings.Join(npy.masterAddrSlice, " "),
)
cmd.Stdin = nil
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run(); X(err)
} }
cmd.Args = append(cmd.Args, return npy
opt.Preload,
npy.masterAddr,
)
cmd.Stdin = nil
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run(); X(err)
} }
return npy
}
// NEO/py // NEO/py
t.Run("py/"+kind, func(t *testing.T) { t.Run("py/"+kind, func(t *testing.T) {
t.Helper() t.Helper()
// TODO t.NeedPy(...) // TODO t.NeedPy(...)
inWorkDir(t, func(workdir string) { inWorkDir(t, func(workdir string) {
X := xtesting.FatalIf(t) X := xtesting.FatalIf(t)
npy := startNEOpy(t, workdir, opt) npy := startNEOpy(t, workdir, opt)
defer func() { defer func() {
err := npy.Close(); X(err) err := npy.Close(); X(err)
}() }()
f(t, npy) f(t, npy)
})
}) })
})
// NEO/go // NEO/go
t.Run("go/"+kind, func(t *testing.T) { t.Run("go/"+kind, func(t *testing.T) {
t.Helper() t.Helper()
inWorkDir(t, func(workdir string) { if neoOpt.nmaster > 1 {
X := xtesting.FatalIf(t) t.Skip("nmaster > 1 is TODO for NEO/go server")
}
inWorkDir(t, func(workdir string) {
X := xtesting.FatalIf(t)
neoOpt := neoOpt neoOpt := neoOpt
neoOpt.workdir = workdir neoOpt.workdir = workdir
// start NEO/py first. We need it to create the // start NEO/py first. We need it to create the
// database and to preload it, because NEO/go // database and to preload it, because NEO/go
// does not currently support commit. // does not currently support commit.
npy := startNEOpy(t, workdir, opt) npy := startNEOpy(t, workdir, opt)
err := npy.Close(); X(err) err := npy.Close(); X(err)
// start NEO/py again to flush ttrans/tobj -> trans/obj tables // start NEO/py again to flush ttrans/tobj -> trans/obj tables
// if preload was requested. We need this because currently // if preload was requested. We need this because currently
// NEO/go does not implement recovery (which is write operation), // NEO/go does not implement recovery (which is write operation),
// and NEO/py leaves data after ZODB commit in ttrans/tobj (to // and NEO/py leaves data after ZODB commit in ttrans/tobj (to
// be moved to trans/obj on next ZODB commit). See // be moved to trans/obj on next ZODB commit). See
// lab.nexedi.com/nexedi/neoppod/commit/7eb7cf1b // lab.nexedi.com/nexedi/neoppod/commit/7eb7cf1b
if opt.Preload != "" { if opt.Preload != "" {
optRestart := opt optRestart := opt
optRestart.Preload = "" optRestart.Preload = ""
npy = startNEOpy(t, workdir, optRestart) npy = startNEOpy(t, workdir, optRestart)
err = npy.Close(); X(err) err = npy.Close(); X(err)
} }
// start NEO/go, as the database is created and preloaded // start NEO/go, as the database is created and preloaded
ngo, err := StartNEOGoSrv(neoOpt); X(err) ngo, err := StartNEOGoSrv(neoOpt); X(err)
defer func() { defer func() {
err := ngo.Close(); X(err) err := ngo.Close(); X(err)
}() }()
f(t, ngo) f(t, ngo)
})
}) })
}) }
} }
} }
......
...@@ -160,7 +160,7 @@ func (_ *_ΔStateCode) δClusterState() {} ...@@ -160,7 +160,7 @@ func (_ *_ΔStateCode) δClusterState() {}
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewMaster(clusterName string, net xnet.Networker) *Master { func NewMaster(clusterName string, net xnet.Networker) *Master {
return &Master{ return &Master{
node: xneo.NewNode(proto.MASTER, clusterName, net, ""), node: xneo.NewNode(proto.MASTER, clusterName, net, []string{}),
ctlStart: make(chan chan error), ctlStart: make(chan chan error),
ctlStop: make(chan chan struct{}), ctlStop: make(chan chan struct{}),
...@@ -219,7 +219,8 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -219,7 +219,8 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
if err != nil { if err != nil {
return err return err
} }
m.node.MasterAddr = addr.String() // XXX How can this master node know the address of the other master nodes?
m.node.MasterAddrSlice = []string{addr.String()}
m.node.MyInfo = proto.NodeInfo{ m.node.MyInfo = proto.NodeInfo{
Type: proto.MASTER, Type: proto.MASTER,
Addr: naddr, Addr: naddr,
......
// Copyright (C) 2017-2021 Nexedi SA and Contributors. // Copyright (C) 2017-2023 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -84,10 +84,12 @@ const ( ...@@ -84,10 +84,12 @@ const (
δPartTabPassThrough _MasteredNodeFlags = 1 << iota δPartTabPassThrough _MasteredNodeFlags = 1 << iota
) )
// newMasteredNode creates new _MasteredNode that connects to masterAddr/cluster via net. // newMasteredNode creates new _MasteredNode that connects to masters/cluster via net.
func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterAddr string) *_MasteredNode { //
// Addresses of known masters are specified by masterAddrSlice.
func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterAddrSlice []string) *_MasteredNode {
node := &_MasteredNode{ node := &_MasteredNode{
Node: xneo.NewNode(typ, clusterName, net, masterAddr), Node: xneo.NewNode(typ, clusterName, net, masterAddrSlice),
opReady: make(chan struct{}), opReady: make(chan struct{}),
} }
...@@ -108,19 +110,41 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker, ...@@ -108,19 +110,41 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Context, *_MasterLink) error) (err error) { func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Context, *_MasterLink) error) (err error) {
// start logging with initial NID (that might be temporary, and which master can tell us to change) // start logging with initial NID (that might be temporary, and which master can tell us to change)
ctx0 := ctx ctx0 := ctx
defer task.Runningf(&ctx, "%s: talk master(%s)", node.MyInfo.NID, node.MasterAddr)(&err)
// When a node is created with "NewNode", we don't know yet, which of the
// the provided master nodes is the primary master. We'll figure this out here
// and simply start with the first node.
maddr := node.MasterAddrSlice[0]
defer task.Runningf(&ctx, "%s: talk master(%s)", node.MyInfo.NID, maddr)(&err)
for { for {
node.updateOperational(func() { node.updateOperational(func() {
node.mlink = nil node.mlink = nil
}) })
err := node.talkMaster1(ctx, ctx0, f) // TODO log "trying maddr as PM"
err := node.talkMaster1(ctx, ctx0, maddr, f)
log.Warning(ctx, err) // XXX Warning -> Error? log.Warning(ctx, err) // XXX Warning -> Error?
if errors.Is(err, cmdShutdown) { if errors.Is(err, cmdShutdown) {
return err // M commands to shutdown return err // M commands to shutdown
} }
// NotPrimaryMaster -> jump to trying what it says is primary
var notPrimaryMaster *proto.NotPrimaryMaster
if errors.As(err, &notPrimaryMaster) {
p := int(notPrimaryMaster.Primary)
if !(0 <= p && p < len(notPrimaryMaster.KnownMasterList)) {
log.Warning(ctx, "malformed NotPrimaryMaster reply - ignoring")
} else {
// TODO update masterRegistry from received KnownMasterList
primary := notPrimaryMaster.KnownMasterList[p]
maddr = primary.Address.String()
log.Info(ctx, "switching to try %s as primary master", maddr)
}
}
// TODO if err == "reject identification / protocol error" -> shutdown client? // TODO if err == "reject identification / protocol error" -> shutdown client?
// TODO if err == "not a primary" -> try redirected address // TODO if err == "cannot connect" -> start trying other master nodes from MasterAddrSlice
// exit on cancel / throttle reconnecting // exit on cancel / throttle reconnecting
select { select {
...@@ -134,7 +158,7 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex ...@@ -134,7 +158,7 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex
} }
} }
func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(context.Context, *_MasterLink) error) error { func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, maddr string, f func(context.Context, *_MasterLink) error) error {
reqID := &proto.RequestIdentification{ reqID := &proto.RequestIdentification{
NodeType: node.MyInfo.Type, NodeType: node.MyInfo.Type,
NID: node.MyInfo.NID, NID: node.MyInfo.NID,
...@@ -144,7 +168,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func( ...@@ -144,7 +168,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
DevPath: nil, // XXX stub DevPath: nil, // XXX stub
NewNID: nil, // XXX stub NewNID: nil, // XXX stub
} }
mlink, accept, err := xneo.Dial(ctx, proto.MASTER, node.Net, node.MasterAddr, reqID) mlink, accept, err := xneo.Dial(ctx, proto.MASTER, node.Net, maddr, reqID)
if err != nil { if err != nil {
return err return err
} }
......
// Copyright (C) 2017-2021 Nexedi SA and Contributors. // Copyright (C) 2017-2023 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -56,6 +56,19 @@ func (e *Error) Error() string { ...@@ -56,6 +56,19 @@ func (e *Error) Error() string {
} }
func (n *NotPrimaryMaster) Error() string {
s := "not primary master; primary: "
p := int(n.Primary)
if 0 <= p && p < len(n.KnownMasterList) {
s += n.KnownMasterList[p].String()
} else {
s += "?"
}
s += fmt.Sprintf(" ; known masters: %v", n.KnownMasterList)
return s
}
// Set sets cluster state value to v. // Set sets cluster state value to v.
// //
// Use Set instead of direct assignment for ClusterState tracing to work. // Use Set instead of direct assignment for ClusterState tracing to work.
......
// Copyright (C) 2023 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package proto
import (
"testing"
)
func TestNotPrimaryMasterError(t *testing.T) {
var testv = []struct {
msg *NotPrimaryMaster
estrok string
}{
{&NotPrimaryMaster{},
"not primary master; primary: ? ; known masters: []",
},
{&NotPrimaryMaster{1, nil},
"not primary master; primary: ? ; known masters: []",
},
{&NotPrimaryMaster{1, []struct{Address}{{Address{"α",111}}, {Address{"β",222}}, {Address{"γ",333}}}},
"not primary master; primary: β:222 ; known masters: [α:111 β:222 γ:333]",
},
}
for _, tt := range testv {
estr := tt.msg.Error()
if estr != tt.estrok {
t.Errorf("%v: invalid error:\nhave: %q\nwant: %q", tt.msg, estr, tt.estrok)
}
}
}
// Copyright (C) 2006-2021 Nexedi SA and Contributors. // Copyright (C) 2006-2023 Nexedi SA and Contributors.
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your // it under the terms of the GNU General Public License version 3, or (at your
...@@ -457,7 +457,7 @@ type AnswerPrimary struct { ...@@ -457,7 +457,7 @@ type AnswerPrimary struct {
// //
//neo:nodes SM -> * //neo:nodes SM -> *
type NotPrimaryMaster struct { type NotPrimaryMaster struct {
Primary NodeID // XXX PSignedNull in py Primary int32 // index of PM in KnownMasterList
KnownMasterList []struct { KnownMasterList []struct {
Address Address
} }
......
// Copyright (C) 2016-2021 Nexedi SA and Contributors. // Copyright (C) 2016-2023 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -356,6 +356,18 @@ func TestMsgMarshal(t *testing.T) { ...@@ -356,6 +356,18 @@ func TestMsgMarshal(t *testing.T) {
hex("90"), hex("90"),
}, },
// NotPrimaryMaster (.Primary used to have wrong type)
{&NotPrimaryMaster{0x01020304, []struct{Address}{{Address{"m111", 111}}, {Address{"m222", 222}}}},
// N
u32(0x01020304) + u32(2) + u32(4)+"m111"+u16(111) + u32(4)+"m222"+u16(222),
// M
hex("92") +
hex("d2" + "01020304") +
hex("92") +
hex("9192") + hex("c4")+u8(4)+"m111" + u8(111) +
hex("9192") + hex("c4")+u8(4)+"m222" + hex("ccde"),
},
// TODO we need tests for: // TODO we need tests for:
// []varsize + trailing // []varsize + trailing
// map[]varsize + trailing // map[]varsize + trailing
......
...@@ -787,7 +787,7 @@ func (p *NotPrimaryMaster) neoMsgEncodedLenN() int { ...@@ -787,7 +787,7 @@ func (p *NotPrimaryMaster) neoMsgEncodedLenN() int {
} }
func (p *NotPrimaryMaster) neoMsgEncodeN(data []byte) { func (p *NotPrimaryMaster) neoMsgEncodeN(data []byte) {
binary.BigEndian.PutUint32(data[0:], uint32(int32(p.Primary))) binary.BigEndian.PutUint32(data[0:], uint32(p.Primary))
{ {
l := len(p.KnownMasterList) l := len(p.KnownMasterList)
binary.BigEndian.PutUint32(data[4:], uint32(l)) binary.BigEndian.PutUint32(data[4:], uint32(l))
...@@ -807,7 +807,7 @@ func (p *NotPrimaryMaster) neoMsgDecodeN(data []byte) (int, error) { ...@@ -807,7 +807,7 @@ func (p *NotPrimaryMaster) neoMsgDecodeN(data []byte) (int, error) {
if len(data) < 8 { if len(data) < 8 {
goto overflow goto overflow
} }
p.Primary = NodeID(int32(binary.BigEndian.Uint32(data[0 : 0+4]))) p.Primary = int32(binary.BigEndian.Uint32(data[0 : 0+4]))
{ {
l := binary.BigEndian.Uint32(data[4 : 4+4]) l := binary.BigEndian.Uint32(data[4 : 4+4])
data = data[8:] data = data[8:]
...@@ -836,13 +836,13 @@ func (p *NotPrimaryMaster) neoMsgEncodedLenM() int { ...@@ -836,13 +836,13 @@ func (p *NotPrimaryMaster) neoMsgEncodedLenM() int {
a := &p.KnownMasterList[i] a := &p.KnownMasterList[i]
size += msgpack.BinHeadSize(len((*a).Address.Host)) + len((*a).Address.Host) + msgpack.Uint16Size((*a).Address.Port) size += msgpack.BinHeadSize(len((*a).Address.Host)) + len((*a).Address.Host) + msgpack.Uint16Size((*a).Address.Port)
} }
return 1 + msgpack.Int32Size(int32(p.Primary)) + msgpack.ArrayHeadSize(len(p.KnownMasterList)) + len(p.KnownMasterList)*2 + size return 1 + msgpack.Int32Size(p.Primary) + msgpack.ArrayHeadSize(len(p.KnownMasterList)) + len(p.KnownMasterList)*2 + size
} }
func (p *NotPrimaryMaster) neoMsgEncodeM(data []byte) { func (p *NotPrimaryMaster) neoMsgEncodeM(data []byte) {
data[0] = byte(msgpack.FixArray_4 | 2) data[0] = byte(msgpack.FixArray_4 | 2)
{ {
n := msgpack.PutInt32(data[1:], int32(p.Primary)) n := msgpack.PutInt32(data[1:], p.Primary)
data = data[1+n:] data = data[1+n:]
} }
{ {
...@@ -882,7 +882,7 @@ func (p *NotPrimaryMaster) neoMsgDecodeM(data []byte) (int, error) { ...@@ -882,7 +882,7 @@ func (p *NotPrimaryMaster) neoMsgDecodeM(data []byte) (int, error) {
if err != nil { if err != nil {
return 0, mdecodeErr("NotPrimaryMaster.Primary", err) return 0, mdecodeErr("NotPrimaryMaster.Primary", err)
} }
p.Primary = NodeID(v) p.Primary = v
nread += uint64(len(data) - len(tail)) nread += uint64(len(data) - len(tail))
data = tail data = tail
} }
......
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2020-2021 Nexedi SA and Contributors. # Copyright (C) 2020-2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
...@@ -24,14 +24,14 @@ Usage: runneo.py <workdir> <cluster-name> [k1=v1] [k2=v2] ... ...@@ -24,14 +24,14 @@ Usage: runneo.py <workdir> <cluster-name> [k1=v1] [k2=v2] ...
{k->v} dict is optional arguments for NEOCluster. {k->v} dict is optional arguments for NEOCluster.
<workdir>/ready is created with address of master after spawned cluster becomes <workdir>/ready is created with addresses of masters after spawned cluster becomes
operational. operational.
""" """
from neo.tests.functional import NEOCluster from neo.tests.functional import NEOCluster
from golang import func, defer from golang import func, defer
import sys, os import sys, os, json
from time import sleep from time import sleep
from signal import signal, SIGTERM from signal import signal, SIGTERM
...@@ -45,6 +45,7 @@ def main(): ...@@ -45,6 +45,7 @@ def main():
kw = {'clear_databases': False} # switch default not to clear data on startup kw = {'clear_databases': False} # switch default not to clear data on startup
for arg in sys.argv[3:]: for arg in sys.argv[3:]:
k, v = arg.split('=') k, v = arg.split('=')
v = json.loads(v)
kw[k] = v kw[k] = v
...@@ -79,7 +80,7 @@ def main(): ...@@ -79,7 +80,7 @@ def main():
# dump information about ready cluster into readyf # dump information about ready cluster into readyf
with open("%s.tmp" % readyf, "w") as f: with open("%s.tmp" % readyf, "w") as f:
f.write(cluster.master_nodes) # XXX ' ' separated if multiple masters f.write(cluster.master_nodes) # NOTE ' ' separated if multiple masters
os.rename("%s.tmp" % readyf, readyf) # atomic os.rename("%s.tmp" % readyf, readyf) # atomic
def _(): def _():
......
...@@ -61,9 +61,9 @@ type Storage struct { ...@@ -61,9 +61,9 @@ type Storage struct {
// //
// The storage uses back as underlying backend for storing data. // The storage uses back as underlying backend for storing data.
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewStorage(clusterName, masterAddr string, net xnet.Networker, back storage.Backend) *Storage { func NewStorage(clusterName string, masterAddrSlice []string, net xnet.Networker, back storage.Backend) *Storage {
return &Storage{ return &Storage{
node: newMasteredNode(proto.STORAGE, clusterName, net, masterAddr), node: newMasteredNode(proto.STORAGE, clusterName, net, masterAddrSlice),
back: back, back: back,
} }
} }
......
...@@ -319,7 +319,7 @@ func (t *tCluster) Storage(name string) ITestStorage { ...@@ -319,7 +319,7 @@ func (t *tCluster) Storage(name string) ITestStorage {
// {New,}Client are similar to {New,}Master but for client nodes. // {New,}Client are similar to {New,}Master but for client nodes.
func (t *tCluster) NewClient(name, masterAddr string) ITestClient { func (t *tCluster) NewClient(name, masterAddr string) ITestClient {
tnode := t.registerNewNode(name) tnode := t.registerNewNode(name)
c := NewClient(t.name, masterAddr, tnode.net) c := NewClient(t.name, []string{masterAddr}, tnode.net)
t.gotracer.RegisterNode(c.node.Node, name) t.gotracer.RegisterNode(c.node.Node, name)
t.runWG.Go(func(ctx context.Context) error { t.runWG.Go(func(ctx context.Context) error {
return c.Run(ctx) return c.Run(ctx)
...@@ -346,7 +346,7 @@ type tStorage struct { ...@@ -346,7 +346,7 @@ type tStorage struct {
func tNewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, back storage.Backend) *tStorage { func tNewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, back storage.Backend) *tStorage {
return &tStorage{ return &tStorage{
Storage: NewStorage(clusterName, masterAddr, net, back), Storage: NewStorage(clusterName, []string{masterAddr}, net, back),
serveAddr: serveAddr, serveAddr: serveAddr,
} }
} }
......
// Copyright (C) 2016-2021 Nexedi SA and Contributors. // Copyright (C) 2016-2023 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -135,17 +135,43 @@ func Dial(ctx context.Context, typ proto.NodeType, net xnet.Networker, addr stri ...@@ -135,17 +135,43 @@ func Dial(ctx context.Context, typ proto.NodeType, net xnet.Networker, addr stri
// start, and if peer sends new connection in that window it will be rejected. // start, and if peer sends new connection in that window it will be rejected.
// //
// TODO thinking. // TODO thinking.
err = link.Ask1(reqID, accept)
conn, err := link.NewConn()
if err != nil {
return err
}
defer conn.Close()
err = conn.Send(reqID)
if err != nil { if err != nil {
return err return err
} }
if accept.NodeType != typ { // besides AcceptIdentification and Error
// TODO send Error to peer? // also expect NotPrimaryMaster if we are connecting to a master
return fmt.Errorf("accepted, but peer is not %s (identifies as %s)", typ, accept.NodeType) nerr := &proto.Error{}
notPrimary := &proto.NotPrimaryMaster{}
respv := []proto.Msg{accept, nerr}
if typ == proto.MASTER {
respv = append(respv, notPrimary)
}
which, err := conn.Expect(respv...)
switch which {
case 0:
if accept.NodeType != typ {
// TODO send Error to peer?
return fmt.Errorf("accepted, but peer is not %s (identifies as %s)", typ, accept.NodeType)
}
return nil
case 1:
return nerr
case 2:
return notPrimary
} }
return err
return nil
}) })
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
......
...@@ -76,10 +76,10 @@ func (cs *ClusterState) Snapshot() *ClusterStateSnapshot { ...@@ -76,10 +76,10 @@ func (cs *ClusterState) Snapshot() *ClusterStateSnapshot {
// - current partition table (how data is split around storage nodes), // - current partition table (how data is split around storage nodes),
// - current cluster state. // - current cluster state.
type Node struct { type Node struct {
MyInfo proto.NodeInfo // type, laddr, nid, state, idtime MyInfo proto.NodeInfo // type, laddr, nid, state, idtime
ClusterName string ClusterName string
Net xnet.Networker // network AP we are sending/receiving on Net xnet.Networker // network AP we are sending/receiving on
MasterAddr string // address of current master TODO -> masterRegistry MasterAddrSlice []string // address of all known masters TODO -> masterRegistry
// XXX reconsider not using State and have just .NodeTab, .PartTab, .ClusterState // XXX reconsider not using State and have just .NodeTab, .PartTab, .ClusterState
// StateMu sync.RWMutex // <- XXX unexport ? XXX not used -> move to MasteredNode ? // StateMu sync.RWMutex // <- XXX unexport ? XXX not used -> move to MasteredNode ?
...@@ -90,7 +90,7 @@ type Node struct { ...@@ -90,7 +90,7 @@ type Node struct {
} }
// NewNode creates new node. // NewNode creates new node.
func NewNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterAddr string) *Node { func NewNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterAddrSlice []string) *Node {
node := &Node{ node := &Node{
MyInfo: proto.NodeInfo{ MyInfo: proto.NodeInfo{
Type: typ, Type: typ,
...@@ -100,8 +100,8 @@ func NewNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterA ...@@ -100,8 +100,8 @@ func NewNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterA
}, },
ClusterName: clusterName, ClusterName: clusterName,
Net: net, Net: net,
MasterAddr: masterAddr, MasterAddrSlice: masterAddrSlice,
State: ClusterState{ State: ClusterState{
NodeTab: &NodeTable{}, NodeTab: &NodeTable{},
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment