Commit 8e84ac9e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 250c5485
...@@ -98,6 +98,15 @@ func (n *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_ * ...@@ -98,6 +98,15 @@ func (n *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_ *
accept := &AcceptIdentification{} accept := &AcceptIdentification{}
// FIXME error if peer sends us something with another connID // FIXME error if peer sends us something with another connID
// (currently we ignore and serveRecv will deadlock) // (currently we ignore and serveRecv will deadlock)
//
// XXX solution could be:
// link.CloseAccept()
// link.Ask1(req, accept)
// link.Listen()
// XXX but there is a race window in between recv in ask and listen
// start, and if peer sends new connection in that window it will be rejected.
//
// TODO thinking.
err = link.Ask1(req, accept) err = link.Ask1(req, accept)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
...@@ -163,11 +172,13 @@ type Listener interface { ...@@ -163,11 +172,13 @@ type Listener interface {
// packet which we did not yet answer. // packet which we did not yet answer.
// //
// On success returned are: // On success returned are:
// - primary link connection which carried identification // - original peer request that carried identification
// - requested identification packet // - requested identification packet
// //
// XXX Conn, RequestIdentification -> Request // After successful accept it is the caller responsibility to reply the request.
Accept(ctx context.Context) (*Conn, *RequestIdentification, error) //
// NOTE established link is Request.Link().
Accept(ctx context.Context) (*Request, *RequestIdentification, error)
} }
type listener struct { type listener struct {
...@@ -177,7 +188,7 @@ type listener struct { ...@@ -177,7 +188,7 @@ type listener struct {
} }
type accepted struct { type accepted struct {
conn *Conn req *Request
idReq *RequestIdentification idReq *RequestIdentification
err error err error
} }
...@@ -206,8 +217,8 @@ func (l *listener) run() { ...@@ -206,8 +217,8 @@ func (l *listener) run() {
func (l *listener) accept(link *NodeLink, err error) { func (l *listener) accept(link *NodeLink, err error) {
res := make(chan accepted, 1) res := make(chan accepted, 1)
go func() { go func() {
conn, idReq, err := l.accept1(context.Background(), link, err) // XXX ctx cancel on l close? req, idReq, err := l.accept1(context.Background(), link, err) // XXX ctx cancel on l close?
res <- accepted{conn, idReq, err} res <- accepted{req, idReq, err}
}() }()
// wait for accept1 result & resend it to .acceptq // wait for accept1 result & resend it to .acceptq
...@@ -233,7 +244,7 @@ func (l *listener) accept(link *NodeLink, err error) { ...@@ -233,7 +244,7 @@ func (l *listener) accept(link *NodeLink, err error) {
} }
} }
func (l *listener) accept1(ctx context.Context, link *NodeLink, err0 error) (_ *Conn, _ *RequestIdentification, err error) { func (l *listener) accept1(ctx context.Context, link *NodeLink, err0 error) (_ *Request, _ *RequestIdentification, err error) {
if err0 != nil { if err0 != nil {
return nil, nil, err0 return nil, nil, err0
} }
...@@ -242,28 +253,22 @@ func (l *listener) accept1(ctx context.Context, link *NodeLink, err0 error) (_ * ...@@ -242,28 +253,22 @@ func (l *listener) accept1(ctx context.Context, link *NodeLink, err0 error) (_ *
// identify peer // identify peer
// the first conn must come with RequestIdentification packet // the first conn must come with RequestIdentification packet
conn, err := link.Accept(/*ctx*/) req, err := link.Recv1(/*ctx*/)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
// NOTE NodeLink currently guarantees that after link.Accept() there is switch msg := req.Msg.(type) {
// at least 1 packet in accepted conn. This way the following won't case *RequestIdentification:
// block/deadlock if packets with other ConnID comes. return &req, msg, nil
// Still it is a bit fragile.
idReq := &RequestIdentification{}
_, err = conn.Expect(idReq)
if err != nil {
// XXX ok to let peer know error as is? e.g. even IO error on Recv?
err2 := conn.Send(&Error{PROTOCOL_ERROR, err.Error()})
err = xerr.Merge(err, err2)
return nil, nil, err
} }
return conn, idReq, nil emsg := &Error{PROTOCOL_ERROR, fmt.Sprintf("unexpected message %T", req.Msg)}
req.Reply(emsg) // XXX err
return nil, nil, emsg
} }
func (l *listener) Accept(ctx context.Context) (*Conn, *RequestIdentification, error) { func (l *listener) Accept(ctx context.Context) (*Request, *RequestIdentification, error) {
select{ select{
case <-l.closed: case <-l.closed:
// we know raw listener is already closed - return proper error about it // we know raw listener is already closed - return proper error about it
...@@ -274,7 +279,7 @@ func (l *listener) Accept(ctx context.Context) (*Conn, *RequestIdentification, e ...@@ -274,7 +279,7 @@ func (l *listener) Accept(ctx context.Context) (*Conn, *RequestIdentification, e
return nil, nil, ctx.Err() return nil, nil, ctx.Err()
case a := <-l.acceptq: case a := <-l.acceptq:
return a.conn, a.idReq, a.err return a.req, a.idReq, a.err
} }
} }
......
...@@ -71,7 +71,7 @@ type Master struct { ...@@ -71,7 +71,7 @@ type Master struct {
// event: node connects // event: node connects
type nodeCome struct { type nodeCome struct {
conn *neo.Conn req *neo.Request
idReq *neo.RequestIdentification // we received this identification request idReq *neo.RequestIdentification // we received this identification request
} }
...@@ -188,7 +188,7 @@ func (m *Master) Run(ctx context.Context) (err error) { ...@@ -188,7 +188,7 @@ func (m *Master) Run(ctx context.Context) (err error) {
// XXX dup in storage // XXX dup in storage
for serveCtx.Err() == nil { for serveCtx.Err() == nil {
conn, idReq, err := l.Accept(serveCtx) req, idReq, err := l.Accept(serveCtx)
if err != nil { if err != nil {
// TODO log / throttle // TODO log / throttle
continue continue
...@@ -204,17 +204,17 @@ func (m *Master) Run(ctx context.Context) (err error) { ...@@ -204,17 +204,17 @@ func (m *Master) Run(ctx context.Context) (err error) {
case neo.STORAGE: case neo.STORAGE:
fallthrough fallthrough
default: default:
conn.Link().CloseAccept() req.Link().CloseAccept()
} }
// handover to main driver // handover to main driver
select { select {
case m.nodeCome <- nodeCome{conn, idReq}: case m.nodeCome <- nodeCome{req, idReq}:
// ok // ok
case <-serveCtx.Done(): case <-serveCtx.Done():
// shutdown // shutdown
lclose(serveCtx, conn.Link()) lclose(serveCtx, req.Link())
return return
} }
} }
...@@ -336,7 +336,7 @@ loop: ...@@ -336,7 +336,7 @@ loop:
node, resp := m.identify(ctx, n, /* XXX only accept storages -> PENDING */) node, resp := m.identify(ctx, n, /* XXX only accept storages -> PENDING */)
if node == nil { if node == nil {
goreject(ctx, wg, n.conn, resp) goreject(ctx, wg, n.req, resp)
break break
} }
...@@ -346,7 +346,7 @@ loop: ...@@ -346,7 +346,7 @@ loop:
go func() { go func() {
defer wg.Done() defer wg.Done()
err := m.accept(ctx, n.conn, resp) err := m.accept(ctx, n.req, resp)
if err != nil { if err != nil {
recovery <- storRecovery{stor: node, err: err} recovery <- storRecovery{stor: node, err: err}
return return
...@@ -583,7 +583,7 @@ loop: ...@@ -583,7 +583,7 @@ loop:
node, resp := m.identify(ctx, n, /* XXX only accept storages -> known ? RUNNING : PENDING */) node, resp := m.identify(ctx, n, /* XXX only accept storages -> known ? RUNNING : PENDING */)
if node == nil { if node == nil {
goreject(ctx, wg, n.conn, resp) goreject(ctx, wg, n.req, resp)
break break
} }
...@@ -593,7 +593,7 @@ loop: ...@@ -593,7 +593,7 @@ loop:
go func() { go func() {
defer wg.Done() defer wg.Done()
err := m.accept(ctx, n.conn, resp) err := m.accept(ctx, n.req, resp)
if err != nil { if err != nil {
verify <- storVerify{stor: node, err: err} verify <- storVerify{stor: node, err: err}
return return
...@@ -791,7 +791,7 @@ loop: ...@@ -791,7 +791,7 @@ loop:
node, resp := m.identify(ctx, n, /* XXX accept everyone */) node, resp := m.identify(ctx, n, /* XXX accept everyone */)
if node == nil { if node == nil {
goreject(ctx, wg, n.conn, resp) goreject(ctx, wg, n.req, resp)
break break
} }
...@@ -799,7 +799,7 @@ loop: ...@@ -799,7 +799,7 @@ loop:
go func() { go func() {
defer wg.Done() defer wg.Done()
err = m.accept(ctx, n.conn, resp) err = m.accept(ctx, n.req, resp)
if err != nil { if err != nil {
serviced <- serviceDone{node: node, err: err} serviced <- serviceDone{node: node, err: err}
return return
...@@ -1082,7 +1082,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp ...@@ -1082,7 +1082,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp
return nil return nil
}() }()
subj := fmt.Sprintf("identify: %s (%s)", n.conn.Link().RemoteAddr(), n.idReq.UUID) subj := fmt.Sprintf("identify: %s (%s)", n.req.Link().RemoteAddr(), n.idReq.UUID)
if err != nil { if err != nil {
log.Infof(ctx, "%s: rejecting: %s", subj, err) log.Infof(ctx, "%s: rejecting: %s", subj, err)
return nil, err return nil, err
...@@ -1118,36 +1118,35 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp ...@@ -1118,36 +1118,35 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp
} }
node = m.node.NodeTab.Update(nodeInfo) // NOTE this notifies all nodeTab subscribers node = m.node.NodeTab.Update(nodeInfo) // NOTE this notifies all nodeTab subscribers
node.SetLink(n.conn.Link()) node.SetLink(n.req.Link())
return node, accept return node, accept
} }
// reject sends rejective identification response and closes associated link // reject sends rejective identification response and closes associated link
func reject(ctx context.Context, conn *neo.Conn, resp neo.Msg) { func reject(ctx context.Context, req *neo.Request, resp neo.Msg) {
// XXX cancel on ctx? // XXX cancel on ctx?
// XXX log? // XXX log?
err1 := conn.Send(resp) err1 := req.Reply(resp)
err2 := conn.Close() err2 := req.Link().Close()
err3 := conn.Link().Close() err := xerr.Merge(err1, err2)
err := xerr.Merge(err1, err2, err3)
if err != nil { if err != nil {
log.Error(ctx, "reject:", err) log.Error(ctx, "reject:", err)
} }
} }
// goreject spawns reject in separate goroutine properly added/done on wg // goreject spawns reject in separate goroutine properly added/done on wg
func goreject(ctx context.Context, wg *sync.WaitGroup, conn *neo.Conn, resp neo.Msg) { func goreject(ctx context.Context, wg *sync.WaitGroup, req *neo.Request, resp neo.Msg) {
wg.Add(1) wg.Add(1)
defer wg.Done() defer wg.Done()
go reject(ctx, conn, resp) go reject(ctx, req, resp)
} }
// accept sends acceptive identification response and closes conn // accept replies with acceptive identification response
// XXX if problem -> .nodeLeave // XXX if problem -> .nodeLeave
// XXX spawn ping goroutine from here? // XXX spawn ping goroutine from here?
func (m *Master) accept(ctx context.Context, conn *neo.Conn, resp neo.Msg) error { func (m *Master) accept(ctx context.Context, req *neo.Request, resp neo.Msg) error {
// XXX cancel on ctx // XXX cancel on ctx
err1 := conn.Send(resp) err1 := req.Reply(resp)
return err1 // XXX while trying to work on single conn return err1 // XXX while trying to work on single conn
//err2 := conn.Close() //err2 := conn.Close()
//return xerr.First(err1, err2) //return xerr.First(err1, err2)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment