Commit 889b8d42 authored by Kirill Smelkov's avatar Kirill Smelkov

X move network bits into neo/neonet/

parent 47f069a5
...@@ -36,6 +36,7 @@ import ( ...@@ -36,6 +36,7 @@ import (
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo" "lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/internal/common" "lab.nexedi.com/kirr/neo/go/neo/internal/common"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
...@@ -53,7 +54,7 @@ type Client struct { ...@@ -53,7 +54,7 @@ type Client struct {
// link to master - established and maintained by talkMaster. // link to master - established and maintained by talkMaster.
// users retrieve it via masterLink. // users retrieve it via masterLink.
mlinkMu sync.Mutex mlinkMu sync.Mutex
mlink *neo.NodeLink mlink *neonet.NodeLink
mlinkReady chan struct{} // reinitialized at each new talk cycle mlinkReady chan struct{} // reinitialized at each new talk cycle
// operational state in node is maintained by recvMaster. // operational state in node is maintained by recvMaster.
...@@ -108,7 +109,7 @@ func (c *Client) Close() error { ...@@ -108,7 +109,7 @@ func (c *Client) Close() error {
// NOTE that even if masterLink returns != nil, the master link can become // NOTE that even if masterLink returns != nil, the master link can become
// non-operational at any later time. (such cases will be reported as // non-operational at any later time. (such cases will be reported as
// ErrLinkDown returned by all mlink operations) // ErrLinkDown returned by all mlink operations)
func (c *Client) masterLink(ctx context.Context) (*neo.NodeLink, error) { func (c *Client) masterLink(ctx context.Context) (*neonet.NodeLink, error) {
for { for {
c.mlinkMu.Lock() c.mlinkMu.Lock()
mlink := c.mlink mlink := c.mlink
...@@ -274,7 +275,7 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) { ...@@ -274,7 +275,7 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
} }
// recvMaster receives and handles notifications from master // recvMaster receives and handles notifications from master
func (c *Client) recvMaster(ctx context.Context, mlink *neo.NodeLink) (err error) { func (c *Client) recvMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) {
defer task.Running(&ctx, "rx")(&err) defer task.Running(&ctx, "rx")(&err)
// XXX .nodeTab.Reset() // XXX .nodeTab.Reset()
...@@ -294,7 +295,7 @@ func (c *Client) recvMaster(ctx context.Context, mlink *neo.NodeLink) (err error ...@@ -294,7 +295,7 @@ func (c *Client) recvMaster(ctx context.Context, mlink *neo.NodeLink) (err error
} }
// recvMaster1 handles 1 message from master // recvMaster1 handles 1 message from master
func (c *Client) recvMaster1(ctx context.Context, req neo.Request) error { func (c *Client) recvMaster1(ctx context.Context, req neonet.Request) error {
c.node.StateMu.Lock() c.node.StateMu.Lock()
switch msg := req.Msg.(type) { switch msg := req.Msg.(type) {
...@@ -325,7 +326,7 @@ func (c *Client) recvMaster1(ctx context.Context, req neo.Request) error { ...@@ -325,7 +326,7 @@ func (c *Client) recvMaster1(ctx context.Context, req neo.Request) error {
return nil return nil
} }
func (c *Client) initFromMaster(ctx context.Context, mlink *neo.NodeLink) (err error) { func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) {
defer task.Running(&ctx, "init")(&err) defer task.Running(&ctx, "init")(&err)
// ask M for PT // ask M for PT
......
...@@ -39,6 +39,7 @@ import ( ...@@ -39,6 +39,7 @@ import (
"lab.nexedi.com/kirr/neo/go/xcommon/task" "lab.nexedi.com/kirr/neo/go/xcommon/task"
//"lab.nexedi.com/kirr/neo/go/xcommon/xio" //"lab.nexedi.com/kirr/neo/go/xcommon/xio"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
) )
...@@ -92,10 +93,10 @@ func NewNodeApp(net xnet.Networker, typ proto.NodeType, clusterName, masterAddr, ...@@ -92,10 +93,10 @@ func NewNodeApp(net xnet.Networker, typ proto.NodeType, clusterName, masterAddr,
// //
// Dial does not update .NodeTab or its node entries in any way. // Dial does not update .NodeTab or its node entries in any way.
// For establishing links to peers present in .NodeTab use Node.Dial. // For establishing links to peers present in .NodeTab use Node.Dial.
func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr string) (_ *NodeLink, _ *proto.AcceptIdentification, err error) { func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr string) (_ *neonet.NodeLink, _ *proto.AcceptIdentification, err error) {
defer task.Runningf(&ctx, "dial %v (%v)", addr, peerType)(&err) defer task.Runningf(&ctx, "dial %v (%v)", addr, peerType)(&err)
link, err := DialLink(ctx, app.Net, addr) link, err := neonet.DialLink(ctx, app.Net, addr)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
...@@ -159,7 +160,7 @@ func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr stri ...@@ -159,7 +160,7 @@ func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr stri
// The node information about where it listens at is appropriately updated. // The node information about where it listens at is appropriately updated.
func (app *NodeApp) Listen() (Listener, error) { func (app *NodeApp) Listen() (Listener, error) {
// start listening // start listening
ll, err := ListenLink(app.Net, app.MyInfo.Addr.String()) ll, err := neonet.ListenLink(app.Net, app.MyInfo.Addr.String())
if err != nil { if err != nil {
return nil, err // XXX err ctx return nil, err // XXX err ctx
} }
...@@ -205,17 +206,17 @@ type Listener interface { ...@@ -205,17 +206,17 @@ type Listener interface {
// After successful accept it is the caller responsibility to reply the request. // After successful accept it is the caller responsibility to reply the request.
// //
// NOTE established link is Request.Link(). // NOTE established link is Request.Link().
Accept(ctx context.Context) (*Request, *proto.RequestIdentification, error) Accept(ctx context.Context) (*neonet.Request, *proto.RequestIdentification, error)
} }
type listener struct { type listener struct {
l LinkListener l neonet.LinkListener
acceptq chan accepted acceptq chan accepted
closed chan struct {} closed chan struct {}
} }
type accepted struct { type accepted struct {
req *Request req *neonet.Request
idReq *proto.RequestIdentification idReq *proto.RequestIdentification
err error err error
} }
...@@ -243,7 +244,7 @@ func (l *listener) run() { ...@@ -243,7 +244,7 @@ func (l *listener) run() {
} }
} }
func (l *listener) accept(link *NodeLink, err error) { func (l *listener) accept(link *neonet.NodeLink, err error) {
res := make(chan accepted, 1) res := make(chan accepted, 1)
go func() { go func() {
req, idReq, err := l.accept1(context.Background(), link, err) // XXX ctx cancel on l close? req, idReq, err := l.accept1(context.Background(), link, err) // XXX ctx cancel on l close?
...@@ -273,7 +274,7 @@ func (l *listener) accept(link *NodeLink, err error) { ...@@ -273,7 +274,7 @@ func (l *listener) accept(link *NodeLink, err error) {
} }
} }
func (l *listener) accept1(ctx context.Context, link *NodeLink, err0 error) (_ *Request, _ *proto.RequestIdentification, err error) { func (l *listener) accept1(ctx context.Context, link *neonet.NodeLink, err0 error) (_ *neonet.Request, _ *proto.RequestIdentification, err error) {
if err0 != nil { if err0 != nil {
return nil, nil, err0 return nil, nil, err0
} }
...@@ -297,7 +298,7 @@ func (l *listener) accept1(ctx context.Context, link *NodeLink, err0 error) (_ * ...@@ -297,7 +298,7 @@ func (l *listener) accept1(ctx context.Context, link *NodeLink, err0 error) (_ *
return nil, nil, emsg return nil, nil, emsg
} }
func (l *listener) Accept(ctx context.Context) (*Request, *proto.RequestIdentification, error) { func (l *listener) Accept(ctx context.Context) (*neonet.Request, *proto.RequestIdentification, error) {
select{ select{
case <-l.closed: case <-l.closed:
// we know raw listener is already closed - return proper error about it // we know raw listener is already closed - return proper error about it
......
// Copyright (C) 2016-2017 Nexedi SA and Contributors. // Copyright (C) 2016-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -17,8 +17,13 @@ ...@@ -17,8 +17,13 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package neo // Package neonet provides service to establish links and exchange messages in
// Connection management // a NEO network.
//
// XXX text (Dial, Listen, ...)
package neonet
//go:generate gotrace gen .
import ( import (
"context" "context"
......
// Copyright (C) 2016-2017 Nexedi SA and Contributors. // Copyright (C) 2016-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -17,8 +17,7 @@ ...@@ -17,8 +17,7 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package neo package neonet
// Connection management. Tests
import ( import (
"bytes" "bytes"
......
// Copyright (C) 2016-2017 Nexedi SA and Contributors. // Copyright (C) 2016-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package neo package neonet
// packets and packet buffers management // packets and packet buffers management
import ( import (
......
// Code generated by lab.nexedi.com/kirr/go123/tracing/cmd/gotrace; DO NOT EDIT.
package neonet
// code generated for tracepoints
import (
"lab.nexedi.com/kirr/go123/tracing"
"unsafe"
"lab.nexedi.com/kirr/neo/go/neo/proto"
)
// traceevent: traceMsgRecv(c *Conn, msg proto.Msg)
type _t_traceMsgRecv struct {
tracing.Probe
probefunc func(c *Conn, msg proto.Msg)
}
var _traceMsgRecv *_t_traceMsgRecv
func traceMsgRecv(c *Conn, msg proto.Msg) {
if _traceMsgRecv != nil {
_traceMsgRecv_run(c, msg)
}
}
func _traceMsgRecv_run(c *Conn, msg proto.Msg) {
for p := _traceMsgRecv; p != nil; p = (*_t_traceMsgRecv)(unsafe.Pointer(p.Next())) {
p.probefunc(c, msg)
}
}
func traceMsgRecv_Attach(pg *tracing.ProbeGroup, probe func(c *Conn, msg proto.Msg)) *tracing.Probe {
p := _t_traceMsgRecv{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceMsgRecv)), &p.Probe)
return &p.Probe
}
// traceevent: traceMsgSendPre(l *NodeLink, connId uint32, msg proto.Msg)
type _t_traceMsgSendPre struct {
tracing.Probe
probefunc func(l *NodeLink, connId uint32, msg proto.Msg)
}
var _traceMsgSendPre *_t_traceMsgSendPre
func traceMsgSendPre(l *NodeLink, connId uint32, msg proto.Msg) {
if _traceMsgSendPre != nil {
_traceMsgSendPre_run(l, connId, msg)
}
}
func _traceMsgSendPre_run(l *NodeLink, connId uint32, msg proto.Msg) {
for p := _traceMsgSendPre; p != nil; p = (*_t_traceMsgSendPre)(unsafe.Pointer(p.Next())) {
p.probefunc(l, connId, msg)
}
}
func traceMsgSendPre_Attach(pg *tracing.ProbeGroup, probe func(l *NodeLink, connId uint32, msg proto.Msg)) *tracing.Probe {
p := _t_traceMsgSendPre{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceMsgSendPre)), &p.Probe)
return &p.Probe
}
// trace export signature
func _trace_exporthash_c54acca8f21ba38c3ba9672c3d38021c3c8b9484() {}
...@@ -27,6 +27,7 @@ import ( ...@@ -27,6 +27,7 @@ import (
"sync" "sync"
"time" "time"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/xcommon/log" "lab.nexedi.com/kirr/neo/go/xcommon/log"
...@@ -83,7 +84,7 @@ type Node struct { ...@@ -83,7 +84,7 @@ type Node struct {
proto.NodeInfo // .type, .addr, .uuid, ... XXX also protect by mu? proto.NodeInfo // .type, .addr, .uuid, ... XXX also protect by mu?
linkMu sync.Mutex linkMu sync.Mutex
link *NodeLink // link to this peer; nil if not connected link *neonet.NodeLink // link to this peer; nil if not connected
dialT time.Time // last dial finished at this time dialT time.Time // last dial finished at this time
// dialer notifies waiters via this; reinitialized at each redial; nil while not dialing // dialer notifies waiters via this; reinitialized at each redial; nil while not dialing
...@@ -284,7 +285,7 @@ func (nt *NodeTable) SubscribeBuffered() (ch chan []proto.NodeInfo, unsubscribe ...@@ -284,7 +285,7 @@ func (nt *NodeTable) SubscribeBuffered() (ch chan []proto.NodeInfo, unsubscribe
// XXX // XXX
// //
// See also: Link, CloseLink, Dial. // See also: Link, CloseLink, Dial.
func (p *Node) SetLink(link *NodeLink) { func (p *Node) SetLink(link *neonet.NodeLink) {
// XXX see Link about locking - whether it is needed here or not // XXX see Link about locking - whether it is needed here or not
p.linkMu.Lock() p.linkMu.Lock()
p.link = link p.link = link
...@@ -296,7 +297,7 @@ func (p *Node) SetLink(link *NodeLink) { ...@@ -296,7 +297,7 @@ func (p *Node) SetLink(link *NodeLink) {
// If the link is not yet established - Link returns nil. // If the link is not yet established - Link returns nil.
// //
// See also: Dial. // See also: Dial.
func (p *Node) Link() *NodeLink { func (p *Node) Link() *neonet.NodeLink {
// XXX do we need lock here? // XXX do we need lock here?
// XXX usages where Link is used (contrary to Dial) there is no need for lock // XXX usages where Link is used (contrary to Dial) there is no need for lock
p.linkMu.Lock() p.linkMu.Lock()
...@@ -326,7 +327,7 @@ func (p *Node) CloseLink(ctx context.Context) { ...@@ -326,7 +327,7 @@ func (p *Node) CloseLink(ctx context.Context) {
// dial does low-level work to dial peer // dial does low-level work to dial peer
// XXX p.* reading without lock - ok? // XXX p.* reading without lock - ok?
// XXX app.MyInfo without lock - ok? // XXX app.MyInfo without lock - ok?
func (p *Node) dial(ctx context.Context) (_ *NodeLink, err error) { func (p *Node) dial(ctx context.Context) (_ *neonet.NodeLink, err error) {
defer task.Runningf(&ctx, "connect %s", p.UUID)(&err) // XXX "connect" good word here? defer task.Runningf(&ctx, "connect %s", p.UUID)(&err) // XXX "connect" good word here?
app := p.nodeTab.nodeApp app := p.nodeTab.nodeApp
...@@ -363,7 +364,7 @@ const δtRedial = 3 * time.Second ...@@ -363,7 +364,7 @@ const δtRedial = 3 * time.Second
// dialed is result of dialing a peer. // dialed is result of dialing a peer.
type dialed struct { type dialed struct {
link *NodeLink link *neonet.NodeLink
err error err error
ready chan struct{} ready chan struct{}
} }
...@@ -379,7 +380,7 @@ type dialed struct { ...@@ -379,7 +380,7 @@ type dialed struct {
// //
// In case Dial returns an error - future Dial will attempt to reconnect with // In case Dial returns an error - future Dial will attempt to reconnect with
// "don't reconnect too fast" throttling. // "don't reconnect too fast" throttling.
func (p *Node) Dial(ctx context.Context) (*NodeLink, error) { func (p *Node) Dial(ctx context.Context) (*neonet.NodeLink, error) {
p.linkMu.Lock() p.linkMu.Lock()
// ok if already connected // ok if already connected
...@@ -412,7 +413,7 @@ func (p *Node) Dial(ctx context.Context) (*NodeLink, error) { ...@@ -412,7 +413,7 @@ func (p *Node) Dial(ctx context.Context) (*NodeLink, error) {
p.linkMu.Unlock() p.linkMu.Unlock()
go func() { go func() {
link, err := func() (*NodeLink, error) { link, err := func() (*neonet.NodeLink, error) {
// throttle redialing if too fast // throttle redialing if too fast
δt := time.Now().Sub(dialT) δt := time.Now().Sub(dialT)
if δt < δtRedial && !dialT.IsZero() { if δt < δtRedial && !dialT.IsZero() {
...@@ -451,7 +452,7 @@ func (p *Node) Dial(ctx context.Context) (*NodeLink, error) { ...@@ -451,7 +452,7 @@ func (p *Node) Dial(ctx context.Context) (*NodeLink, error) {
// //
// For established link Conn either creates new connection over the link, // For established link Conn either creates new connection over the link,
// XXX (currently inactive) or gets one from the pool of unused connections (see PutConn). // XXX (currently inactive) or gets one from the pool of unused connections (see PutConn).
func (p *Node) Conn(ctx context.Context) (*Conn, error) { func (p *Node) Conn(ctx context.Context) (*neonet.Conn, error) {
var err error var err error
/* /*
......
...@@ -31,8 +31,8 @@ ...@@ -31,8 +31,8 @@
// A message type can be looked up by message code with MsgType. // A message type can be looked up by message code with MsgType.
// //
// The proto packages provides only message definitions and low-level // The proto packages provides only message definitions and low-level
// primitives for their marshalling. Package lab.nexedi.com/kirr/neo/go/neo/net // primitives for their marshalling. Package lab.nexedi.com/kirr/neo/go/neo/neonet
// (XXX) provides actual service for message exchange over network. // provides actual service for message exchange over network.
package proto package proto
// This file defines everything that relates to messages on the wire. // This file defines everything that relates to messages on the wire.
......
...@@ -38,6 +38,7 @@ import ( ...@@ -38,6 +38,7 @@ import (
//"github.com/kylelemons/godebug/pretty" //"github.com/kylelemons/godebug/pretty"
"lab.nexedi.com/kirr/neo/go/neo" "lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/client" "lab.nexedi.com/kirr/neo/go/neo/client"
//"lab.nexedi.com/kirr/neo/go/neo/internal/common" //"lab.nexedi.com/kirr/neo/go/neo/internal/common"
...@@ -309,13 +310,14 @@ func NewTraceCollector(dispatch *tsync.EventDispatcher) *TraceCollector { ...@@ -309,13 +310,14 @@ func NewTraceCollector(dispatch *tsync.EventDispatcher) *TraceCollector {
} }
//trace:import "lab.nexedi.com/kirr/neo/go/neo" //trace:import "lab.nexedi.com/kirr/neo/go/neo"
//trace:import "lab.nexedi.com/kirr/neo/go/neo/neonet"
//trace:import "lab.nexedi.com/kirr/neo/go/neo/proto" //trace:import "lab.nexedi.com/kirr/neo/go/neo/proto"
// Attach attaches the tracer to appropriate trace points. // Attach attaches the tracer to appropriate trace points.
func (t *TraceCollector) Attach() { func (t *TraceCollector) Attach() {
tracing.Lock() tracing.Lock()
//neo_traceMsgRecv_Attach(t.pg, t.traceNeoMsgRecv) //neo_traceMsgRecv_Attach(t.pg, t.traceNeoMsgRecv)
neo_traceMsgSendPre_Attach(t.pg, t.traceNeoMsgSendPre) neonet_traceMsgSendPre_Attach(t.pg, t.traceNeoMsgSendPre)
proto_traceClusterStateChanged_Attach(t.pg, t.traceClusterState) proto_traceClusterStateChanged_Attach(t.pg, t.traceClusterState)
neo_traceNodeChanged_Attach(t.pg, t.traceNode) neo_traceNodeChanged_Attach(t.pg, t.traceNode)
traceMasterStartReady_Attach(t.pg, t.traceMasterStartReady) traceMasterStartReady_Attach(t.pg, t.traceMasterStartReady)
...@@ -356,7 +358,7 @@ func (t *TraceCollector) TraceNetListen(ev *xnet.TraceListen) { ...@@ -356,7 +358,7 @@ func (t *TraceCollector) TraceNetListen(ev *xnet.TraceListen) {
func (t *TraceCollector) TraceNetTx(ev *xnet.TraceTx) {} // we use traceNeoMsgSend instead func (t *TraceCollector) TraceNetTx(ev *xnet.TraceTx) {} // we use traceNeoMsgSend instead
func (t *TraceCollector) traceNeoMsgSendPre(l *neo.NodeLink, connID uint32, msg proto.Msg) { func (t *TraceCollector) traceNeoMsgSendPre(l *neonet.NodeLink, connID uint32, msg proto.Msg) {
t.d.Dispatch(&eventNeoSend{l.LocalAddr().String(), l.RemoteAddr().String(), connID, msg}) t.d.Dispatch(&eventNeoSend{l.LocalAddr().String(), l.RemoteAddr().String(), connID, msg})
} }
......
...@@ -34,6 +34,7 @@ import ( ...@@ -34,6 +34,7 @@ import (
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo" "lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/log" "lab.nexedi.com/kirr/neo/go/xcommon/log"
...@@ -948,7 +949,7 @@ func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Ms ...@@ -948,7 +949,7 @@ func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Ms
// ---------------------------------------- // ----------------------------------------
// keepPeerUpdated sends cluster state updates to peer on the link // keepPeerUpdated sends cluster state updates to peer on the link
func (m *Master) keepPeerUpdated(ctx context.Context, link *neo.NodeLink) (err error) { func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (err error) {
// link should be already in parent ctx (XXX and closed on cancel ?) // link should be already in parent ctx (XXX and closed on cancel ?)
defer task.Runningf(&ctx, "keep updated")(&err) defer task.Runningf(&ctx, "keep updated")(&err)
......
...@@ -28,7 +28,7 @@ import ( ...@@ -28,7 +28,7 @@ import (
// "net" // "net"
"sync" "sync"
"lab.nexedi.com/kirr/neo/go/neo" "lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/xcommon/log" "lab.nexedi.com/kirr/neo/go/xcommon/log"
...@@ -40,7 +40,7 @@ import ( ...@@ -40,7 +40,7 @@ import (
type Server interface { type Server interface {
// ServeLink serves already established nodelink (connection) in a blocking way. // ServeLink serves already established nodelink (connection) in a blocking way.
// ServeLink is usually run in separate goroutine // ServeLink is usually run in separate goroutine
ServeLink(ctx context.Context, link *neo.NodeLink) ServeLink(ctx context.Context, link *neonet.NodeLink)
} }
// Serve runs service on a listener // Serve runs service on a listener
...@@ -78,7 +78,7 @@ func Serve(ctx context.Context, l *neo.Listener, srv Server) error { ...@@ -78,7 +78,7 @@ func Serve(ctx context.Context, l *neo.Listener, srv Server) error {
// IdentifyPeer identifies peer on the link // IdentifyPeer identifies peer on the link
// it expects peer to send RequestIdentification packet and replies with AcceptIdentification if identification passes. // it expects peer to send RequestIdentification packet and replies with AcceptIdentification if identification passes.
// returns information about identified node or error. // returns information about identified node or error.
func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType proto.NodeType) (nodeInfo proto.RequestIdentification, err error) { func IdentifyPeer(ctx context.Context, link *neonet.NodeLink, myNodeType proto.NodeType) (nodeInfo proto.RequestIdentification, err error) {
defer xerr.Contextf(&err, "%s: identify", link) defer xerr.Contextf(&err, "%s: identify", link)
// the first conn must come with RequestIdentification packet // the first conn must come with RequestIdentification packet
...@@ -124,7 +124,7 @@ func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType proto.Node ...@@ -124,7 +124,7 @@ func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType proto.Node
// event: node connects // event: node connects
type nodeCome struct { type nodeCome struct {
req *neo.Request req *neonet.Request
idReq *proto.RequestIdentification // we received this identification request idReq *proto.RequestIdentification // we received this identification request
} }
...@@ -138,7 +138,7 @@ type nodeLeave struct { ...@@ -138,7 +138,7 @@ type nodeLeave struct {
// reject sends rejective identification response and closes associated link // reject sends rejective identification response and closes associated link
func reject(ctx context.Context, req *neo.Request, resp proto.Msg) { func reject(ctx context.Context, req *neonet.Request, resp proto.Msg) {
// XXX cancel on ctx? // XXX cancel on ctx?
// log.Info(ctx, "identification rejected") ? // log.Info(ctx, "identification rejected") ?
err1 := req.Reply(resp) err1 := req.Reply(resp)
...@@ -150,7 +150,7 @@ func reject(ctx context.Context, req *neo.Request, resp proto.Msg) { ...@@ -150,7 +150,7 @@ func reject(ctx context.Context, req *neo.Request, resp proto.Msg) {
} }
// goreject spawns reject in separate goroutine properly added/done on wg // goreject spawns reject in separate goroutine properly added/done on wg
func goreject(ctx context.Context, wg *sync.WaitGroup, req *neo.Request, resp proto.Msg) { func goreject(ctx context.Context, wg *sync.WaitGroup, req *neonet.Request, resp proto.Msg) {
wg.Add(1) wg.Add(1)
defer wg.Done() defer wg.Done()
go reject(ctx, req, resp) go reject(ctx, req, resp)
...@@ -158,7 +158,7 @@ func goreject(ctx context.Context, wg *sync.WaitGroup, req *neo.Request, resp pr ...@@ -158,7 +158,7 @@ func goreject(ctx context.Context, wg *sync.WaitGroup, req *neo.Request, resp pr
// accept replies with acceptive identification response // accept replies with acceptive identification response
// XXX spawn ping goroutine from here? // XXX spawn ping goroutine from here?
func accept(ctx context.Context, req *neo.Request, resp proto.Msg) error { func accept(ctx context.Context, req *neonet.Request, resp proto.Msg) error {
// XXX cancel on ctx // XXX cancel on ctx
err1 := req.Reply(resp) err1 := req.Reply(resp)
return err1 // XXX while trying to work on single conn return err1 // XXX while trying to work on single conn
......
...@@ -31,6 +31,7 @@ import ( ...@@ -31,6 +31,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"lab.nexedi.com/kirr/neo/go/neo" "lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/internal/common" "lab.nexedi.com/kirr/neo/go/neo/internal/common"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
...@@ -244,7 +245,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -244,7 +245,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// return error indicates: // return error indicates:
// - nil: initialization was ok and a command came from master to start operation. // - nil: initialization was ok and a command came from master to start operation.
// - !nil: initialization was cancelled or failed somehow. // - !nil: initialization was cancelled or failed somehow.
func (stor *Storage) m1initialize(ctx context.Context, mlink *neo.NodeLink) (reqStart *neo.Request, err error) { func (stor *Storage) m1initialize(ctx context.Context, mlink *neonet.NodeLink) (reqStart *neonet.Request, err error) {
defer task.Runningf(&ctx, "init %v", mlink)(&err) defer task.Runningf(&ctx, "init %v", mlink)(&err)
for { for {
...@@ -267,7 +268,7 @@ func (stor *Storage) m1initialize(ctx context.Context, mlink *neo.NodeLink) (req ...@@ -267,7 +268,7 @@ func (stor *Storage) m1initialize(ctx context.Context, mlink *neo.NodeLink) (req
var cmdStart = errors.New("start requested") var cmdStart = errors.New("start requested")
// m1initialize1 handles one message from master from under m1initialize // m1initialize1 handles one message from master from under m1initialize
func (stor *Storage) m1initialize1(ctx context.Context, req neo.Request) error { func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) error {
// XXX vvv move Send out of reply preparing logic // XXX vvv move Send out of reply preparing logic
var err error var err error
...@@ -334,7 +335,7 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neo.Request) error { ...@@ -334,7 +335,7 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neo.Request) error {
// it always returns with an error describing why serve has to be stopped - // it always returns with an error describing why serve has to be stopped -
// either due to master commanding us to stop, or context cancel or some other // either due to master commanding us to stop, or context cancel or some other
// error. // error.
func (stor *Storage) m1serve(ctx context.Context, reqStart *neo.Request) (err error) { func (stor *Storage) m1serve(ctx context.Context, reqStart *neonet.Request) (err error) {
mlink := reqStart.Link() mlink := reqStart.Link()
defer task.Runningf(&ctx, "serve %v", mlink)(&err) defer task.Runningf(&ctx, "serve %v", mlink)(&err)
...@@ -369,7 +370,7 @@ func (stor *Storage) m1serve(ctx context.Context, reqStart *neo.Request) (err er ...@@ -369,7 +370,7 @@ func (stor *Storage) m1serve(ctx context.Context, reqStart *neo.Request) (err er
} }
// m1serve1 handles one message from master under m1serve // m1serve1 handles one message from master under m1serve
func (stor *Storage) m1serve1(ctx context.Context, req neo.Request) error { func (stor *Storage) m1serve1(ctx context.Context, req neonet.Request) error {
switch msg := req.Msg.(type) { switch msg := req.Msg.(type) {
default: default:
return fmt.Errorf("unexpected message: %T", msg) return fmt.Errorf("unexpected message: %T", msg)
...@@ -436,7 +437,7 @@ func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context, ...@@ -436,7 +437,7 @@ func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context,
// serveLink serves incoming node-node link connection // serveLink serves incoming node-node link connection
func (stor *Storage) serveLink(ctx context.Context, req *neo.Request, idReq *proto.RequestIdentification) (err error) { func (stor *Storage) serveLink(ctx context.Context, req *neonet.Request, idReq *proto.RequestIdentification) (err error) {
link := req.Link() link := req.Link()
defer task.Runningf(&ctx, "serve %s", link)(&err) defer task.Runningf(&ctx, "serve %s", link)(&err)
defer xio.CloseWhenDone(ctx, link)() defer xio.CloseWhenDone(ctx, link)()
...@@ -467,7 +468,7 @@ func (stor *Storage) serveLink(ctx context.Context, req *neo.Request, idReq *pro ...@@ -467,7 +468,7 @@ func (stor *Storage) serveLink(ctx context.Context, req *neo.Request, idReq *pro
switch errors.Cause(err) { switch errors.Cause(err) {
// XXX closed by main or peer down // XXX closed by main or peer down
// XXX review // XXX review
case neo.ErrLinkDown, neo.ErrLinkClosed: case neonet.ErrLinkDown, neonet.ErrLinkClosed:
log.Info(ctx, err) log.Info(ctx, err)
// ok // ok
...@@ -494,7 +495,7 @@ func (stor *Storage) serveLink(ctx context.Context, req *neo.Request, idReq *pro ...@@ -494,7 +495,7 @@ func (stor *Storage) serveLink(ctx context.Context, req *neo.Request, idReq *pro
// //
// XXX version that reuses goroutine to serve next client requests // XXX version that reuses goroutine to serve next client requests
// XXX for py compatibility (py has no way to tell us Conn is closed) // XXX for py compatibility (py has no way to tell us Conn is closed)
func (stor *Storage) serveClient(ctx context.Context, req neo.Request) { func (stor *Storage) serveClient(ctx context.Context, req neonet.Request) {
link := req.Link() link := req.Link()
for { for {
...@@ -519,7 +520,7 @@ func (stor *Storage) serveClient(ctx context.Context, req neo.Request) { ...@@ -519,7 +520,7 @@ func (stor *Storage) serveClient(ctx context.Context, req neo.Request) {
switch errors.Cause(err) { switch errors.Cause(err) {
// XXX closed by main or peer down - all logged by main called // XXX closed by main or peer down - all logged by main called
// XXX review // XXX review
case neo.ErrLinkDown, neo.ErrLinkClosed: case neonet.ErrLinkDown, neonet.ErrLinkClosed:
// ok // ok
default: default:
......
...@@ -8,26 +8,35 @@ import ( ...@@ -8,26 +8,35 @@ import (
_ "unsafe" _ "unsafe"
"lab.nexedi.com/kirr/neo/go/neo" "lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
) )
// traceimport: "lab.nexedi.com/kirr/neo/go/neo" // traceimport: "lab.nexedi.com/kirr/neo/go/neo"
// rerun "gotrace gen" if you see link failure ↓↓↓ // rerun "gotrace gen" if you see link failure ↓↓↓
//go:linkname neo_trace_exporthash lab.nexedi.com/kirr/neo/go/neo._trace_exporthash_470beceafeb4cecc8dee4072ee06329e20eef0f1 //go:linkname neo_trace_exporthash lab.nexedi.com/kirr/neo/go/neo._trace_exporthash_3520b2da37a17b902760c32971b0fd9ccb6d2ddb
func neo_trace_exporthash() func neo_trace_exporthash()
func init() { neo_trace_exporthash() } func init() { neo_trace_exporthash() }
//go:linkname neo_traceMsgRecv_Attach lab.nexedi.com/kirr/neo/go/neo.traceMsgRecv_Attach
func neo_traceMsgRecv_Attach(*tracing.ProbeGroup, func(c *neo.Conn, msg proto.Msg)) *tracing.Probe
//go:linkname neo_traceMsgSendPre_Attach lab.nexedi.com/kirr/neo/go/neo.traceMsgSendPre_Attach
func neo_traceMsgSendPre_Attach(*tracing.ProbeGroup, func(l *neo.NodeLink, connId uint32, msg proto.Msg)) *tracing.Probe
//go:linkname neo_traceNodeChanged_Attach lab.nexedi.com/kirr/neo/go/neo.traceNodeChanged_Attach //go:linkname neo_traceNodeChanged_Attach lab.nexedi.com/kirr/neo/go/neo.traceNodeChanged_Attach
func neo_traceNodeChanged_Attach(*tracing.ProbeGroup, func(nt *neo.NodeTable, n *neo.Node)) *tracing.Probe func neo_traceNodeChanged_Attach(*tracing.ProbeGroup, func(nt *neo.NodeTable, n *neo.Node)) *tracing.Probe
// traceimport: "lab.nexedi.com/kirr/neo/go/neo/neonet"
// rerun "gotrace gen" if you see link failure ↓↓↓
//go:linkname neonet_trace_exporthash lab.nexedi.com/kirr/neo/go/neo/neonet._trace_exporthash_c54acca8f21ba38c3ba9672c3d38021c3c8b9484
func neonet_trace_exporthash()
func init() { neonet_trace_exporthash() }
//go:linkname neonet_traceMsgRecv_Attach lab.nexedi.com/kirr/neo/go/neo/neonet.traceMsgRecv_Attach
func neonet_traceMsgRecv_Attach(*tracing.ProbeGroup, func(c *neonet.Conn, msg proto.Msg)) *tracing.Probe
//go:linkname neonet_traceMsgSendPre_Attach lab.nexedi.com/kirr/neo/go/neo/neonet.traceMsgSendPre_Attach
func neonet_traceMsgSendPre_Attach(*tracing.ProbeGroup, func(l *neonet.NodeLink, connId uint32, msg proto.Msg)) *tracing.Probe
// traceimport: "lab.nexedi.com/kirr/neo/go/neo/proto" // traceimport: "lab.nexedi.com/kirr/neo/go/neo/proto"
// rerun "gotrace gen" if you see link failure ↓↓↓ // rerun "gotrace gen" if you see link failure ↓↓↓
......
...@@ -6,64 +6,8 @@ package neo ...@@ -6,64 +6,8 @@ package neo
import ( import (
"lab.nexedi.com/kirr/go123/tracing" "lab.nexedi.com/kirr/go123/tracing"
"unsafe" "unsafe"
"lab.nexedi.com/kirr/neo/go/neo/proto"
) )
// traceevent: traceMsgRecv(c *Conn, msg proto.Msg)
type _t_traceMsgRecv struct {
tracing.Probe
probefunc func(c *Conn, msg proto.Msg)
}
var _traceMsgRecv *_t_traceMsgRecv
func traceMsgRecv(c *Conn, msg proto.Msg) {
if _traceMsgRecv != nil {
_traceMsgRecv_run(c, msg)
}
}
func _traceMsgRecv_run(c *Conn, msg proto.Msg) {
for p := _traceMsgRecv; p != nil; p = (*_t_traceMsgRecv)(unsafe.Pointer(p.Next())) {
p.probefunc(c, msg)
}
}
func traceMsgRecv_Attach(pg *tracing.ProbeGroup, probe func(c *Conn, msg proto.Msg)) *tracing.Probe {
p := _t_traceMsgRecv{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceMsgRecv)), &p.Probe)
return &p.Probe
}
// traceevent: traceMsgSendPre(l *NodeLink, connId uint32, msg proto.Msg)
type _t_traceMsgSendPre struct {
tracing.Probe
probefunc func(l *NodeLink, connId uint32, msg proto.Msg)
}
var _traceMsgSendPre *_t_traceMsgSendPre
func traceMsgSendPre(l *NodeLink, connId uint32, msg proto.Msg) {
if _traceMsgSendPre != nil {
_traceMsgSendPre_run(l, connId, msg)
}
}
func _traceMsgSendPre_run(l *NodeLink, connId uint32, msg proto.Msg) {
for p := _traceMsgSendPre; p != nil; p = (*_t_traceMsgSendPre)(unsafe.Pointer(p.Next())) {
p.probefunc(l, connId, msg)
}
}
func traceMsgSendPre_Attach(pg *tracing.ProbeGroup, probe func(l *NodeLink, connId uint32, msg proto.Msg)) *tracing.Probe {
p := _t_traceMsgSendPre{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceMsgSendPre)), &p.Probe)
return &p.Probe
}
// traceevent: traceNodeChanged(nt *NodeTable, n *Node) // traceevent: traceNodeChanged(nt *NodeTable, n *Node)
type _t_traceNodeChanged struct { type _t_traceNodeChanged struct {
...@@ -92,4 +36,4 @@ func traceNodeChanged_Attach(pg *tracing.ProbeGroup, probe func(nt *NodeTable, n ...@@ -92,4 +36,4 @@ func traceNodeChanged_Attach(pg *tracing.ProbeGroup, probe func(nt *NodeTable, n
} }
// trace export signature // trace export signature
func _trace_exporthash_470beceafeb4cecc8dee4072ee06329e20eef0f1() {} func _trace_exporthash_3520b2da37a17b902760c32971b0fd9ccb6d2ddb() {}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment