Commit bf240897 authored by Kirill Smelkov's avatar Kirill Smelkov

go/neo/*: Sync with NEO/py on changes to make the number of replicas modifiable at runtime

This corresponds to NEO/py commit ef5fc508 (Make the number of replicas
modifiable when the cluster is running).

One important change in the protocol is that Client no longer queries
Master for partition table - instead M pushed partTab to C right after
identification (after pushing nodeTab).

See also: https://neo.nexedi.com/P-NEO-Protocol.Specification.2019?portal_skin=CI_slideshow#/9/5
parent f609d6df
......@@ -258,6 +258,37 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
c.node.MyInfo.UUID = accept.YourUUID
}
wg, ctx := errgroup.WithContext(ctx) // XXX -> xsync.WorkGroup
defer xio.CloseWhenDone(ctx, mlink)()
// master pushes nodeTab and partTab to us right after identification
// XXX merge into -> node.DialMaster?
// nodeTab
mnt := proto.NotifyNodeInformation{}
_, err = mlink.Expect1(&mnt)
if err != nil {
return fmt.Errorf("after identification: %w", err)
}
// partTab
mpt := proto.SendPartitionTable{}
_, err = mlink.Expect1(&mpt)
if err != nil {
return fmt.Errorf("after identification: %w", err)
}
pt := PartTabFromDump(mpt.PTid, mpt.RowList) // TODO handle mpt.NumReplicas
log.Infof(ctx, "master initialized us with next parttab:\n%s", pt)
c.node.StateMu.Lock()
c.node.UpdateNodeTab(ctx, &mnt)
c.node.PartTab = pt
opready := c.updateOperational()
c.node.StateMu.Unlock()
opready()
// set c.mlink and notify waiters
c.mlinkMu.Lock()
c.mlink = mlink
......@@ -266,10 +297,6 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
c.mlinkMu.Unlock()
close(ready)
wg, ctx := errgroup.WithContext(ctx) // XXX -> xsync.WorkGroup
defer xio.CloseWhenDone(ctx, mlink)()
// when we are done - reset .mlink
defer func() {
c.mlinkMu.Lock()
......@@ -298,21 +325,6 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) {
defer task.Running(&ctx, "init")(&err)
// query partTab
rpt := proto.AnswerPartitionTable{}
err = mlink.Ask1(&proto.AskPartitionTable{}, &rpt)
if err != nil {
return err
}
pt := PartTabFromDump(rpt.PTid, rpt.RowList)
log.Infof(ctx, "master initialized us with next parttab:\n%s", pt)
c.node.StateMu.Lock()
c.node.PartTab = pt
opready := c.updateOperational()
c.node.StateMu.Unlock()
opready()
// query last_tid
lastTxn := proto.AnswerLastTransaction{}
err = mlink.Ask1(&proto.LastTransaction{}, &lastTxn)
......
// Copyright (C) 2017-2018 Nexedi SA and Contributors.
// Copyright (C) 2017-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -505,7 +505,7 @@ func storCtlRecovery(ctx context.Context, stor *Node, res chan storRecovery) {
}
// reconstruct partition table from response
pt := PartTabFromDump(resp.PTid, resp.RowList)
pt := PartTabFromDump(resp.PTid, resp.RowList) // TODO handle resp.NumReplicas
res <- storRecovery{stor: stor, partTab: pt}
}
......@@ -695,8 +695,9 @@ func storCtlVerify(ctx context.Context, stor *Node, pt *PartitionTable, res chan
// send just recovered parttab so storage saves it
err = slink.Send1(&proto.SendPartitionTable{
PTid: pt.PTid,
RowList: pt.Dump(),
PTid: pt.PTid,
NumReplicas: 0, // FIXME hardcoded
RowList: pt.Dump(),
})
if err != nil {
return
......@@ -898,6 +899,9 @@ func (m *Master) serveClient(ctx context.Context, cli *Node) (err error) {
wg, ctx := errgroup.WithContext(ctx)
defer xio.CloseWhenDone(ctx, clink)() // XXX -> cli.CloseLink?
// FIXME send initial nodeTab and partTab before starting serveClient1
// (move those initial sends from keepPeerUpdated to here)
// M -> C notifications about cluster state
wg.Go(func() error {
return m.keepPeerUpdated(ctx, clink)
......@@ -926,15 +930,6 @@ func (m *Master) serveClient(ctx context.Context, cli *Node) (err error) {
// serveClient1 prepares response for 1 request from client
func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Msg) {
switch req := req.(type) {
case *proto.AskPartitionTable:
m.node.StateMu.RLock()
rpt := &proto.AnswerPartitionTable{
PTid: m.node.PartTab.PTid,
RowList: m.node.PartTab.Dump(),
}
m.node.StateMu.RUnlock()
return rpt
case *proto.LastTransaction:
// XXX lock
return &proto.AnswerLastTransaction{m.lastTid}
......@@ -965,6 +960,10 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (er
nodeiv[i] = node.NodeInfo
}
ptid := m.node.PartTab.PTid
ptnr := uint32(0) // FIXME hardcoded NumReplicas; NEO/py keeps this as n(replica)-1
ptv := m.node.PartTab.Dump()
// XXX RLock is not enough for subscribe - right?
nodech, nodeUnsubscribe := m.node.NodeTab.SubscribeBuffered()
......@@ -990,6 +989,16 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (er
return err
}
err = link.Send1(&proto.SendPartitionTable{
PTid: ptid,
NumReplicas: ptnr,
RowList: ptv,
})
if err != nil {
return err
}
// now proxy the updates until we are done
for {
var msg proto.Msg
......@@ -1081,8 +1090,6 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *Node, resp pro
accept := &proto.AcceptIdentification{
NodeType: proto.MASTER,
MyUUID: m.node.MyInfo.UUID,
NumPartitions: 1, // FIXME hardcoded
NumReplicas: 0, // FIXME hardcoded (neo/py meaning for n(replica) is `n(real-replica) - 1`)
YourUUID: uuid,
}
......
......@@ -324,7 +324,7 @@ func (app *NodeApp) UpdateNodeTab(ctx context.Context, msg *proto.NotifyNodeInfo
// UpdatePartTab applies updates to .PartTab from message and logs changes appropriately.
func (app *NodeApp) UpdatePartTab(ctx context.Context, msg *proto.SendPartitionTable) {
pt := PartTabFromDump(msg.PTid, msg.RowList)
pt := PartTabFromDump(msg.PTid, msg.RowList) // FIXME handle msg.NumReplicas
// XXX logging under lock
log.Infof(ctx, "parttab update: %v", pt)
app.PartTab = pt
......
......@@ -118,8 +118,6 @@ func TestMasterStorage(t0 *testing.T) {
tSM.Expect(conntx("m:2", "s:2", 1, &proto.AcceptIdentification{
NodeType: proto.MASTER,
MyUUID: proto.UUID(proto.MASTER, 1),
NumPartitions: 1,
NumReplicas: 0,
YourUUID: proto.UUID(proto.STORAGE, 1),
}))
......@@ -137,6 +135,7 @@ func TestMasterStorage(t0 *testing.T) {
tMS.Expect(conntx("m:2", "s:2", 2, &proto.AskPartitionTable{}))
tMS.Expect(conntx("s:2", "m:2", 2, &proto.AnswerPartitionTable{
PTid: 0,
NumReplicas: 0,
RowList: []proto.RowInfo{},
}))
......@@ -163,8 +162,9 @@ func TestMasterStorage(t0 *testing.T) {
tMS.Expect(conntx("m:2", "s:2", 4, &proto.SendPartitionTable{
PTid: 1,
NumReplicas: 0,
RowList: []proto.RowInfo{
{0, []proto.CellInfo{{proto.UUID(proto.STORAGE, 1), proto.UP_TO_DATE}}},
{[]proto.CellInfo{{proto.UUID(proto.STORAGE, 1), proto.UP_TO_DATE}}},
},
}))
......@@ -220,25 +220,10 @@ func TestMasterStorage(t0 *testing.T) {
tCM.Expect(conntx("m:3", "c:1", 1, &proto.AcceptIdentification{
NodeType: proto.MASTER,
MyUUID: proto.UUID(proto.MASTER, 1),
NumPartitions: 1,
NumReplicas: 0,
YourUUID: proto.UUID(proto.CLIENT, 1),
}))
// C asks M about PT and last_tid
// NOTE this might come in parallel with vvv "C <- M NotifyNodeInformation C1,M1,S1"
tCM.Expect(conntx("c:1", "m:3", 3, &proto.AskPartitionTable{}))
tCM.Expect(conntx("m:3", "c:1", 3, &proto.AnswerPartitionTable{
PTid: 1,
RowList: []proto.RowInfo{
{0, []proto.CellInfo{{proto.UUID(proto.STORAGE, 1), proto.UP_TO_DATE}}},
},
}))
tCM.Expect(conntx("c:1", "m:3", 5, &proto.LastTransaction{}))
tCM.Expect(conntx("m:3", "c:1", 5, &proto.AnswerLastTransaction{lastTid}))
// C <- M NotifyNodeInformation C1,M1,S1
// NOTE this might come in parallel with ^^^ "C asks M about PT"
tMC.Expect(conntx("m:3", "c:1", 0, &proto.NotifyNodeInformation{
IdTime: proto.IdTimeNone, // XXX ?
NodeList: []proto.NodeInfo{
......@@ -248,10 +233,22 @@ func TestMasterStorage(t0 *testing.T) {
},
}))
tMC.Expect(conntx("m:3", "c:1", 2, &proto.SendPartitionTable{
PTid: 1,
NumReplicas: 0,
RowList: []proto.RowInfo{
{[]proto.CellInfo{{proto.UUID(proto.STORAGE, 1), proto.UP_TO_DATE}}},
},
}))
tC.Expect(δnode("c", "m:1", proto.MASTER, 1, proto.RUNNING, proto.IdTimeNone))
tC.Expect(δnode("c", "s:1", proto.STORAGE, 1, proto.RUNNING, 0.01))
tC.Expect(δnode("c", "", proto.CLIENT, 1, proto.RUNNING, 0.02))
// C asks M last_tid
tCM.Expect(conntx("c:1", "m:3", 3, &proto.LastTransaction{}))
tCM.Expect(conntx("m:3", "c:1", 3, &proto.AnswerLastTransaction{lastTid}))
// ----------------------------------------
// C asks M about last tid XXX better master sends it itself on new client connected
......@@ -264,8 +261,8 @@ func TestMasterStorage(t0 *testing.T) {
}
})
tCM.Expect(conntx("c:1", "m:3", 7, &proto.LastTransaction{}))
tCM.Expect(conntx("m:3", "c:1", 7, &proto.AnswerLastTransaction{
tCM.Expect(conntx("c:1", "m:3", 5, &proto.LastTransaction{}))
tCM.Expect(conntx("m:3", "c:1", 5, &proto.AnswerLastTransaction{
Tid: lastTid,
}))
......@@ -305,8 +302,6 @@ func TestMasterStorage(t0 *testing.T) {
tCS.Expect(conntx("s:3", "c:2", 1, &proto.AcceptIdentification{
NodeType: proto.STORAGE,
MyUUID: proto.UUID(proto.STORAGE, 1),
NumPartitions: 1,
NumReplicas: 0,
YourUUID: proto.UUID(proto.CLIENT, 1),
}))
......
......@@ -58,7 +58,7 @@
// object. The Request contains the message received and internally the
// connection. A response can be sent back via Request.Reply. Then once
// Request.Close is called the connection object that was accepted is
// immediately put back into pool for later reuse.
// immediately put back into pool for later reuse. XXX Expect1
package neonet
// XXX neonet compatibility with NEO/py depends on the following small NEO/py patch:
......@@ -1552,6 +1552,24 @@ func (link *NodeLink) Send1(msg proto.Msg) error {
return err
}
// Expect1 receives notification in 1-1 model.
//
// See Conn.Expect for semantic details.
//
// See "Lightweight mode" in top-level package doc for overview.
func (link *NodeLink) Expect1(msgv ...proto.Msg) (which int, err error) {
// XXX a bit dup wrt Recv1
conn, err := link.Accept()
if err != nil {
return -1, err
}
// NOTE serveRecv guaranty that when a conn is accepted, there is 1 message in conn.rxq
which, err = conn.Expect(msgv...)
conn.lightClose()
return which, err
}
// Ask1 sends request and receives response in 1-1 model.
//
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
// Copyright (C) 2017-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
......@@ -254,7 +254,7 @@ func (pt *PartitionTable) Dump() []proto.RowInfo { // XXX also include .ptid? ->
cellv[j] = cell.CellInfo
}
rowv[i] = proto.RowInfo{Offset: uint32(i), CellList: cellv} // XXX cast?
rowv[i] = proto.RowInfo{CellList: cellv}
}
return rowv
}
......@@ -264,9 +264,8 @@ func PartTabFromDump(ptid proto.PTid, rowv []proto.RowInfo) *PartitionTable {
pt := &PartitionTable{}
pt.PTid = ptid
for _, row := range rowv {
i := row.Offset
for i >= uint32(len(pt.tab)) {
for i, row := range rowv {
for i >= len(pt.tab) {
pt.tab = append(pt.tab, []Cell{})
}
......
......@@ -346,7 +346,6 @@ type CellInfo struct {
//neo:proto typeonly
type RowInfo struct {
Offset uint32 // PNumber XXX -> Pid
CellList []CellInfo
}
......@@ -382,8 +381,6 @@ type RequestIdentification struct {
type AcceptIdentification struct {
NodeType NodeType // XXX name
MyUUID NodeUUID
NumPartitions uint32 // PNumber
NumReplicas uint32 // PNumber
YourUUID NodeUUID
}
......@@ -457,23 +454,24 @@ type AnswerLastIDs struct {
}
// Ask storage node the remaining data needed by master to recover.
// This is also how the clients get the full partition table on connection.
//
//neo:nodes M -> S; C -> M
//neo:nodes M -> S
type AskPartitionTable struct {
}
type AnswerPartitionTable struct {
PTid
RowList []RowInfo
NumReplicas uint32 // PNumber
RowList []RowInfo
}
// Send the full partition table to admin/storage nodes on connection.
// Send the full partition table to admin/client/storage nodes on connection.
//
//neo:nodes M -> A, S
//neo:nodes M -> A, C, S
type SendPartitionTable struct {
PTid
RowList []RowInfo
NumReplicas uint32 // PNumber
RowList []RowInfo
}
// Notify about changes in the partition table.
......@@ -481,7 +479,8 @@ type SendPartitionTable struct {
//neo:nodes M -> *
type NotifyPartitionChanges struct {
PTid
CellList []struct {
NumReplicas uint32 // PNumber
CellList []struct {
Offset uint32 // PNumber XXX -> Pid
CellInfo CellInfo
}
......@@ -850,6 +849,13 @@ type TweakPartitionTable struct {
// answer = Error
}
// Set the number of replicas.
//
//neo:nodes ctl -> A -> M
type SetNumReplicas struct {
NumReplicas uint32 // PNumber
}
// Set the cluster state.
//
//neo:nodes ctl -> A -> M
......
......@@ -189,19 +189,21 @@ func TestMsgMarshal(t *testing.T) {
// PTid, [] (of [] of {UUID, CellState})
{&AnswerPartitionTable{
PTid: 0x0102030405060708,
PTid: 0x0102030405060708,
NumReplicas: 34,
RowList: []RowInfo{
{1, []CellInfo{{11, UP_TO_DATE}, {17, OUT_OF_DATE}}},
{2, []CellInfo{{11, FEEDING}}},
{7, []CellInfo{{11, CORRUPTED}, {15, DISCARDED}, {23, UP_TO_DATE}}},
{[]CellInfo{{11, UP_TO_DATE}, {17, OUT_OF_DATE}}},
{[]CellInfo{{11, FEEDING}}},
{[]CellInfo{{11, CORRUPTED}, {15, DISCARDED}, {23, UP_TO_DATE}}},
},
},
hex("0102030405060708") +
hex("00000022") +
hex("00000003") +
hex("00000001000000020000000b010000001100") +
hex("00000002000000010000000b02") +
hex("00000007000000030000000b030000000f040000001701"),
hex("000000020000000b010000001100") +
hex("000000010000000b02") +
hex("000000030000000b030000000f040000001701"),
},
// map[Oid]struct {Tid,Tid,bool}
......
This diff is collapsed.
......@@ -42,31 +42,32 @@ var pyMsgRegistry = map[uint16]string{
38: "SetNodeState",
39: "AddPendingNodes",
40: "TweakPartitionTable",
41: "SetClusterState",
42: "Repair",
43: "RepairOne",
44: "NotifyClusterState",
45: "AskClusterState",
46: "ObjectUndoSerial",
47: "AskTIDsFrom",
48: "Pack",
49: "CheckReplicas",
50: "CheckPartition",
51: "CheckTIDRange",
52: "CheckSerialRange",
53: "PartitionCorrupted",
54: "NotifyReady",
55: "LastTransaction",
56: "CheckCurrentSerial",
57: "NotifyTransactionFinished",
58: "Replicate",
59: "ReplicationDone",
60: "FetchTransactions",
61: "FetchObjects",
62: "AddTransaction",
63: "AddObject",
64: "Truncate",
65: "FlushLog",
41: "SetNumReplicas",
42: "SetClusterState",
43: "Repair",
44: "RepairOne",
45: "NotifyClusterState",
46: "AskClusterState",
47: "ObjectUndoSerial",
48: "AskTIDsFrom",
49: "Pack",
50: "CheckReplicas",
51: "CheckPartition",
52: "CheckTIDRange",
53: "CheckSerialRange",
54: "PartitionCorrupted",
55: "NotifyReady",
56: "LastTransaction",
57: "CheckCurrentSerial",
58: "NotifyTransactionFinished",
59: "Replicate",
60: "ReplicationDone",
61: "FetchTransactions",
62: "FetchObjects",
63: "AddTransaction",
64: "AddObject",
65: "Truncate",
66: "FlushLog",
32768: "Error",
32769: "AcceptIdentification",
32770: "Pong",
......@@ -92,14 +93,14 @@ var pyMsgRegistry = map[uint16]string{
32803: "AnswerObjectHistory",
32804: "AnswerPartitionList",
32805: "AnswerNodeList",
32813: "AnswerClusterState",
32814: "AnswerObjectUndoSerial",
32815: "AnswerTIDsFrom",
32816: "AnswerPack",
32819: "AnswerCheckTIDRange",
32820: "AnswerCheckSerialRange",
32823: "AnswerLastTransaction",
32824: "AnswerCheckCurrentSerial",
32828: "AnswerFetchTransactions",
32829: "AnswerFetchObjects",
32814: "AnswerClusterState",
32815: "AnswerObjectUndoSerial",
32816: "AnswerTIDsFrom",
32817: "AnswerPack",
32820: "AnswerCheckTIDRange",
32821: "AnswerCheckSerialRange",
32824: "AnswerLastTransaction",
32825: "AnswerCheckCurrentSerial",
32829: "AnswerFetchTransactions",
32830: "AnswerFetchObjects",
}
......@@ -203,10 +203,13 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// XXX add master UUID -> nodeTab ? or master will notify us with it himself ?
// XXX move -> SetNumReplicas handler
/*
// NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1`
if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) {
return fmt.Errorf("TODO for 1-storage POC: Npt: %v Nreplica: %v", accept.NumPartitions, accept.NumReplicas)
}
*/
// XXX -> node.Dial ?
if accept.YourUUID != stor.node.MyInfo.UUID {
......@@ -284,8 +287,9 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro
case *proto.AskPartitionTable:
// TODO initially read PT from disk
err = req.Reply(&proto.AnswerPartitionTable{
PTid: stor.node.PartTab.PTid,
RowList: stor.node.PartTab.Dump()})
PTid: stor.node.PartTab.PTid,
NumReplicas: 0, // FIXME hardcoded; NEO/py uses this as n(replica)-1
RowList: stor.node.PartTab.Dump()})
case *proto.LockedTransactions:
// XXX r/o stub
......@@ -304,7 +308,7 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro
case *proto.SendPartitionTable:
// TODO M sends us whole PT -> save locally
stor.node.UpdatePartTab(ctx, msg) // XXX lock?
stor.node.UpdatePartTab(ctx, msg) // XXX lock? XXX handle msg.NumReplicas
case *proto.NotifyPartitionChanges:
// TODO M sends us δPT -> save locally?
......@@ -412,8 +416,6 @@ func (stor *Storage) identify(idReq *proto.RequestIdentification) (proto.Msg, bo
return &proto.AcceptIdentification{
NodeType: stor.node.MyInfo.Type,
MyUUID: stor.node.MyInfo.UUID, // XXX lock wrt update
NumPartitions: 1, // XXX
NumReplicas: 0, // XXX
YourUUID: idReq.UUID,
}, true
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment