Commit 45f14a0c authored by Kirill Smelkov's avatar Kirill Smelkov


parent 51cdfc0a
......@@ -446,7 +446,8 @@ func TestNID(t *testing.T) {
func TestNIDDecode(t *testing.T) {
var testv = []struct{nid uint32; str string}{
{0, "?(0)0"},
// {0, "?(0)0"},
{0, "S?"}, // XXX X0 used as X? until neo.NewNode uses temporary bit
{0x00000001, "S1"},
{0xf0000002, "M2"},
{0xe0000003, "C3"},
......@@ -42,7 +42,6 @@ import (
// ""
// Storage is NEO node that keeps data and provides read/write access to it via network.
......@@ -51,14 +50,6 @@ import (
type Storage struct {
node *_MasteredNode
// context for providing operational service
// it is renewed every time master tells us StartOpertion, so users
// must read it initially only once under opMu via withWhileOperational.
opMu sync.Mutex
opCtx context.Context
lli xneo.Listener
back storage.Backend
......@@ -71,19 +62,10 @@ type Storage struct {
// The storage uses back as underlying backend for storing data.
// Use Run to actually start running the node.
func NewStorage(clusterName, masterAddr string, net xnet.Networker, back storage.Backend) *Storage {
stor := &Storage{
return &Storage{
node: newMasteredNode(proto.STORAGE, clusterName, net, masterAddr),
back: back,
// operational context is initially done (no service should be provided)
noOpCtx, cancel := context.WithCancel(context.Background())
stor.opCtx = noOpCtx
return stor
......@@ -92,11 +74,9 @@ func NewStorage(clusterName, masterAddr string, net xnet.Networker, back storage
// The storage will be serving incoming connections on l.
func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
stor.runCtx = ctx
addr := l.Addr()
// defer task.Runningf(&ctx, "storage(%s)", addr)(&err) // XXX kill
log.Infof(ctx, "%s: listening on %s ...", stor.node.MyInfo.NID, addr)
stor.runCtx = ctx
// update our serving address in node
naddr, err := proto.Addr(addr)
......@@ -108,62 +88,23 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
// wrap listener with link / identificaton hello checker
stor.lli = xneo.NewListener(neonet.NewLinkListener(l))
// wg := xsync.NewWorkGroup(ctx) // XXX derive from orig ctx
// connect to master and let it drive us via commands and updates
// wg.Go(func(ctx context.Context) error {
err = stor.node.TalkMaster(ctx, func(ctx context.Context, mlink *_MasterLink) error {
// XXX move -> SetNumReplicas handler
// // NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1`
// if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) {
// return fmt.Errorf("TODO for 1-storage POC: Npt: %v Nreplica: %v", accept.NumPartitions, accept.NumReplicas)
// }
// let master initialize us. If successful this ends with StartOperation command.
reqStart, err := stor.m1initialize(ctx, mlink)
if err != nil {
return err
// we got StartOperation command. Let master drive us during service phase.
return stor.m1serve(ctx, mlink, reqStart)
// })
// serve incoming connections while connected to M
wg.Go(func(ctx context.Context) (err error) {
defer task.Running(&ctx, "accept")(&err)
serveWG := sync.WaitGroup{}
defer serveWG.Wait()
// XXX dup from master -> Node.Listen() -> Accept() ?
// XXX ? -> Node.Accept(lli) (it will verify IdTime against Node.nodeTab[nid])
// XXX ? -> Node.Serve(lli -> func(idReq))
for {
if ctx.Err() != nil {
return ctx.Err()
req, idReq, err := lli.Accept(ctx)
if err != nil {
if !xxcontext.Canceled(err) {
log.Error(ctx, err) // XXX throttle?
go func() {
defer serveWG.Done()
stor.serveLink(ctx, req, idReq) // XXX ignore err? -> logged
err = stor.node.TalkMaster(ctx, func(ctx context.Context, mlink *_MasterLink) error {
// XXX move -> SetNumReplicas handler
// // NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1`
// if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) {
// return fmt.Errorf("TODO for 1-storage POC: Npt: %v Nreplica: %v", accept.NumPartitions, accept.NumReplicas)
// }
// let master initialize us. If successful this ends with StartOperation command.
reqStart, err := stor.m1initialize(ctx, mlink)
if err != nil {
return err
err = wg.Wait()
// we got StartOperation command. Let master drive us during service phase.
return stor.m1serve(ctx, mlink, reqStart)
// XXX should Storage do it, or should it leave back non-closed?
// TODO -> Storage should not close backend.
......@@ -270,15 +211,6 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro
func (stor *Storage) m1serve(ctx context.Context, mlink *_MasterLink, reqStart *neonet.Request) (err error) {
defer task.Runningf(&ctx, "mserve")(&err)
// refresh stor.opCtx and cancel it when we finish so that client
// handlers know they need to stop operating as master told us to do so.
opCtx, opCancel := context.WithCancel(ctx)
stor.opCtx = opCtx
defer opCancel()
// serve clients while operational
serveCtx := taskctx.Runningf(stor.runCtx, "%s", stor.node.MyInfo.NID)
serveCtx, serveCancel := xcontext.Merge/*Cancel*/(serveCtx, ctx)
......@@ -386,17 +318,6 @@ func (stor *Storage) identify_(idReq *proto.RequestIdentification) (proto.Msg, *
return nil, &proto.Error{proto.PROTOCOL_ERROR, "cluster name mismatch"}
// check operational
operational := (stor.opCtx.Err() == nil)
if !operational {
return nil, &proto.Error{proto.NOT_READY, "cluster not operational"}
return &proto.AcceptIdentification{
NodeType: stor.node.MyInfo.Type,
MyNID: stor.node.MyInfo.NID, // XXX lock wrt update
......@@ -405,20 +326,6 @@ func (stor *Storage) identify_(idReq *proto.RequestIdentification) (proto.Msg, *
// withWhileOperational derives new context from ctx which will be cancelled, when either
// - ctx is cancelled, or
// - master tells us to stop operational service
func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context, context.CancelFunc) {
opCtx := stor.opCtx
return xcontext.MergeCancel(ctx, opCtx)
// serveLink serves incoming node-node link connection.
func (stor *Storage) serveLink(ctx context.Context, req *neonet.Request, idReq *proto.RequestIdentification) (err error) {
link := req.Link()
......@@ -438,13 +345,6 @@ func (stor *Storage) serveLink(ctx context.Context, req *neonet.Request, idReq *
// client passed identification, now serve other requests
// rederive ctx to be also cancelled if M tells us StopOperation
ctx, cancel := stor.withWhileOperational(ctx)
defer cancel()
wg := sync.WaitGroup{} // XXX -> errgroup?
for {
req, err := link.Recv1()
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment