Commit 47f069a5 authored by Kirill Smelkov's avatar Kirill Smelkov

X move protocol bits into neo/proto/ package

parent 7b4ae7bd
...@@ -36,6 +36,7 @@ import ( ...@@ -36,6 +36,7 @@ import (
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo" "lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/internal/common" "lab.nexedi.com/kirr/neo/go/neo/internal/common"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/log" "lab.nexedi.com/kirr/neo/go/xcommon/log"
...@@ -77,7 +78,7 @@ var _ zodb.IStorageDriver = (*Client)(nil) ...@@ -77,7 +78,7 @@ var _ zodb.IStorageDriver = (*Client)(nil)
// It will connect to master @masterAddr and identify with specified cluster name. // It will connect to master @masterAddr and identify with specified cluster name.
func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client { func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
cli := &Client{ cli := &Client{
node: neo.NewNodeApp(net, neo.CLIENT, clusterName, masterAddr, ""), node: neo.NewNodeApp(net, proto.CLIENT, clusterName, masterAddr, ""),
mlinkReady: make(chan struct{}), mlinkReady: make(chan struct{}),
operational: false, operational: false,
opReady: make(chan struct{}), opReady: make(chan struct{}),
...@@ -225,7 +226,7 @@ func (c *Client) talkMaster(ctx context.Context) (err error) { ...@@ -225,7 +226,7 @@ func (c *Client) talkMaster(ctx context.Context) (err error) {
} }
func (c *Client) talkMaster1(ctx context.Context) (err error) { func (c *Client) talkMaster1(ctx context.Context) (err error) {
mlink, accept, err := c.node.Dial(ctx, neo.MASTER, c.node.MasterAddr) mlink, accept, err := c.node.Dial(ctx, proto.MASTER, c.node.MasterAddr)
if err != nil { if err != nil {
// FIXME it is not only identification - e.g. ECONNREFUSED // FIXME it is not only identification - e.g. ECONNREFUSED
return err return err
...@@ -302,17 +303,17 @@ func (c *Client) recvMaster1(ctx context.Context, req neo.Request) error { ...@@ -302,17 +303,17 @@ func (c *Client) recvMaster1(ctx context.Context, req neo.Request) error {
return fmt.Errorf("unexpected message: %T", msg) return fmt.Errorf("unexpected message: %T", msg)
// M sends whole PT // M sends whole PT
case *neo.SendPartitionTable: case *proto.SendPartitionTable:
c.node.UpdatePartTab(ctx, msg) c.node.UpdatePartTab(ctx, msg)
// M sends δPT // M sends δPT
//case *neo.NotifyPartitionChanges: //case *proto.NotifyPartitionChanges:
// TODO // TODO
case *neo.NotifyNodeInformation: case *proto.NotifyNodeInformation:
c.node.UpdateNodeTab(ctx, msg) c.node.UpdateNodeTab(ctx, msg)
case *neo.NotifyClusterState: case *proto.NotifyClusterState:
c.node.UpdateClusterState(ctx, msg) c.node.UpdateClusterState(ctx, msg)
} }
...@@ -328,8 +329,8 @@ func (c *Client) initFromMaster(ctx context.Context, mlink *neo.NodeLink) (err e ...@@ -328,8 +329,8 @@ func (c *Client) initFromMaster(ctx context.Context, mlink *neo.NodeLink) (err e
defer task.Running(&ctx, "init")(&err) defer task.Running(&ctx, "init")(&err)
// ask M for PT // ask M for PT
rpt := neo.AnswerPartitionTable{} rpt := proto.AnswerPartitionTable{}
err = mlink.Ask1(&neo.AskPartitionTable{}, &rpt) err = mlink.Ask1(&proto.AskPartitionTable{}, &rpt)
if err != nil { if err != nil {
return err return err
} }
...@@ -379,8 +380,8 @@ func (c *Client) LastTid(ctx context.Context) (_ zodb.Tid, err error) { ...@@ -379,8 +380,8 @@ func (c *Client) LastTid(ctx context.Context) (_ zodb.Tid, err error) {
// XXX mlink can become down while we are making the call. // XXX mlink can become down while we are making the call.
// XXX do we want to return error or retry? // XXX do we want to return error or retry?
reply := neo.AnswerLastTransaction{} reply := proto.AnswerLastTransaction{}
err = mlink.Ask1(&neo.LastTransaction{}, &reply) // XXX Ask += ctx err = mlink.Ask1(&proto.LastTransaction{}, &reply) // XXX Ask += ctx
if err != nil { if err != nil {
return 0, err // XXX err ctx return 0, err // XXX err ctx
} }
...@@ -418,7 +419,7 @@ func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, e ...@@ -418,7 +419,7 @@ func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, e
if cell.Readable() { if cell.Readable() {
stor := c.node.NodeTab.Get(cell.UUID) stor := c.node.NodeTab.Get(cell.UUID)
// this storage might not yet come up // this storage might not yet come up
if stor != nil && stor.State == neo.RUNNING { if stor != nil && stor.State == proto.RUNNING {
storv = append(storv, stor) storv = append(storv, stor)
} }
} }
...@@ -443,13 +444,13 @@ func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, e ...@@ -443,13 +444,13 @@ func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, e
// S decides to send us something) // S decides to send us something)
// on the wire it comes as "before", not "at" // on the wire it comes as "before", not "at"
req := neo.GetObject{ req := proto.GetObject{
Oid: xid.Oid, Oid: xid.Oid,
Tid: common.At2Before(xid.At), Tid: common.At2Before(xid.At),
Serial: neo.INVALID_TID, Serial: proto.INVALID_TID,
} }
resp := neo.AnswerObject{} resp := proto.AnswerObject{}
err = slink.Ask1(&req, &resp) err = slink.Ask1(&req, &resp)
if err != nil { if err != nil {
return nil, 0, err // XXX err context return nil, 0, err // XXX err context
......
This diff is collapsed.
...@@ -35,7 +35,10 @@ import ( ...@@ -35,7 +35,10 @@ import (
"lab.nexedi.com/kirr/go123/exc" "lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/xcommon/packed"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"github.com/kylelemons/godebug/pretty" "github.com/kylelemons/godebug/pretty"
"github.com/pkg/errors" "github.com/pkg/errors"
...@@ -103,11 +106,11 @@ func xconnError(err error) error { ...@@ -103,11 +106,11 @@ func xconnError(err error) error {
// Prepare PktBuf with content // Prepare PktBuf with content
func _mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf { func _mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf {
pkt := &PktBuf{make([]byte, pktHeaderLen + len(payload))} pkt := &PktBuf{make([]byte, proto.PktHeaderLen + len(payload))}
h := pkt.Header() h := pkt.Header()
h.ConnId = hton32(connid) h.ConnId = packed.Hton32(connid)
h.MsgCode = hton16(msgcode) h.MsgCode = packed.Hton16(msgcode)
h.MsgLen = hton32(uint32(len(payload))) h.MsgLen = packed.Hton32(uint32(len(payload)))
copy(pkt.Payload(), payload) copy(pkt.Payload(), payload)
return pkt return pkt
} }
...@@ -122,14 +125,14 @@ func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) { ...@@ -122,14 +125,14 @@ func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) {
errv := xerr.Errorv{} errv := xerr.Errorv{}
h := pkt.Header() h := pkt.Header()
// TODO include caller location // TODO include caller location
if ntoh32(h.ConnId) != connid { if packed.Ntoh32(h.ConnId) != connid {
errv.Appendf("header: unexpected connid %v (want %v)", ntoh32(h.ConnId), connid) errv.Appendf("header: unexpected connid %v (want %v)", packed.Ntoh32(h.ConnId), connid)
} }
if ntoh16(h.MsgCode) != msgcode { if packed.Ntoh16(h.MsgCode) != msgcode {
errv.Appendf("header: unexpected msgcode %v (want %v)", ntoh16(h.MsgCode), msgcode) errv.Appendf("header: unexpected msgcode %v (want %v)", packed.Ntoh16(h.MsgCode), msgcode)
} }
if ntoh32(h.MsgLen) != uint32(len(payload)) { if packed.Ntoh32(h.MsgLen) != uint32(len(payload)) {
errv.Appendf("header: unexpected msglen %v (want %v)", ntoh32(h.MsgLen), len(payload)) errv.Appendf("header: unexpected msglen %v (want %v)", packed.Ntoh32(h.MsgLen), len(payload))
} }
if !bytes.Equal(pkt.Payload(), payload) { if !bytes.Equal(pkt.Payload(), payload) {
errv.Appendf("payload differ:\n%s", errv.Appendf("payload differ:\n%s",
...@@ -140,10 +143,10 @@ func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) { ...@@ -140,10 +143,10 @@ func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) {
} }
// Verify PktBuf to match expected message // Verify PktBuf to match expected message
func xverifyPktMsg(pkt *PktBuf, connid uint32, msg Msg) { func xverifyPktMsg(pkt *PktBuf, connid uint32, msg proto.Msg) {
data := make([]byte, msg.neoMsgEncodedLen()) data := make([]byte, msg.NEOMsgEncodedLen())
msg.neoMsgEncode(data) msg.NEOMsgEncode(data)
xverifyPkt(pkt, connid, msg.neoMsgCode(), data) xverifyPkt(pkt, connid, msg.NEOMsgCode(), data)
} }
// delay a bit // delay a bit
...@@ -625,7 +628,7 @@ func TestNodeLink(t *testing.T) { ...@@ -625,7 +628,7 @@ func TestNodeLink(t *testing.T) {
gox(wg, func() { gox(wg, func() {
pkt := xrecvPkt(c) pkt := xrecvPkt(c)
n := ntoh16(pkt.Header().MsgCode) n := packed.Ntoh16(pkt.Header().MsgCode)
x := replyOrder[n] x := replyOrder[n]
// wait before it is our turn & echo pkt back // wait before it is our turn & echo pkt back
...@@ -743,12 +746,12 @@ func TestHandshake(t *testing.T) { ...@@ -743,12 +746,12 @@ func TestHandshake(t *testing.T) {
// ---- recv1 mode ---- // ---- recv1 mode ----
func xSend(c *Conn, msg Msg) { func xSend(c *Conn, msg proto.Msg) {
err := c.Send(msg) err := c.Send(msg)
exc.Raiseif(err) exc.Raiseif(err)
} }
func xRecv(c *Conn) Msg { func xRecv(c *Conn) proto.Msg {
msg, err := c.Recv() msg, err := c.Recv()
exc.Raiseif(err) exc.Raiseif(err)
return msg return msg
...@@ -760,12 +763,12 @@ func xRecv1(l *NodeLink) Request { ...@@ -760,12 +763,12 @@ func xRecv1(l *NodeLink) Request {
return req return req
} }
func xSend1(l *NodeLink, msg Msg) { func xSend1(l *NodeLink, msg proto.Msg) {
err := l.Send1(msg) err := l.Send1(msg)
exc.Raiseif(err) exc.Raiseif(err)
} }
func xverifyMsg(msg1, msg2 Msg) { func xverifyMsg(msg1, msg2 proto.Msg) {
if !reflect.DeepEqual(msg1, msg2) { if !reflect.DeepEqual(msg1, msg2) {
exc.Raisef("messages differ:\n%s", pretty.Compare(msg1, msg2)) exc.Raisef("messages differ:\n%s", pretty.Compare(msg1, msg2))
} }
...@@ -789,8 +792,8 @@ func TestRecv1Mode(t *testing.T) { ...@@ -789,8 +792,8 @@ func TestRecv1Mode(t *testing.T) {
//println("X aaa + 1") //println("X aaa + 1")
msg := xRecv(c) msg := xRecv(c)
//println("X aaa + 2") //println("X aaa + 2")
xverifyMsg(msg, &Ping{}) xverifyMsg(msg, &proto.Ping{})
xSend(c, &Pong{}) xSend(c, &proto.Pong{})
//println("X aaa + 3") //println("X aaa + 3")
msg = xRecv(c) msg = xRecv(c)
//println("X aaa + 4") //println("X aaa + 4")
...@@ -803,8 +806,8 @@ func TestRecv1Mode(t *testing.T) { ...@@ -803,8 +806,8 @@ func TestRecv1Mode(t *testing.T) {
c = xaccept(nl2) c = xaccept(nl2)
msg = xRecv(c) msg = xRecv(c)
//fmt.Println("X zzz + 1", c, msg) //fmt.Println("X zzz + 1", c, msg)
xverifyMsg(msg, &Error{ACK, "hello up there"}) xverifyMsg(msg, &proto.Error{proto.ACK, "hello up there"})
xSend(c, &Error{ACK, "hello to you too"}) xSend(c, &proto.Error{proto.ACK, "hello to you too"})
msg = xRecv(c) msg = xRecv(c)
//println("X zzz + 2") //println("X zzz + 2")
xverifyMsg(msg, errConnClosed) xverifyMsg(msg, errConnClosed)
...@@ -813,14 +816,14 @@ func TestRecv1Mode(t *testing.T) { ...@@ -813,14 +816,14 @@ func TestRecv1Mode(t *testing.T) {
}) })
//println("aaa") //println("aaa")
xSend1(nl1, &Ping{}) xSend1(nl1, &proto.Ping{})
// before next Send1 wait till peer receives errConnClosed from us // before next Send1 wait till peer receives errConnClosed from us
// otherwise peer could be already in accept while our errConnClosed is received // otherwise peer could be already in accept while our errConnClosed is received
// and there is only one receiving thread there ^^^ // and there is only one receiving thread there ^^^
<-sync <-sync
//println("bbb") //println("bbb")
xSend1(nl1, &Error{ACK, "hello up there"}) xSend1(nl1, &proto.Error{proto.ACK, "hello up there"})
//println("ccc") //println("ccc")
xwait(wg) xwait(wg)
...@@ -832,9 +835,9 @@ func TestRecv1Mode(t *testing.T) { ...@@ -832,9 +835,9 @@ func TestRecv1Mode(t *testing.T) {
c := xnewconn(nl2) c := xnewconn(nl2)
//println("aaa") //println("aaa")
xSend(c, &Ping{}) xSend(c, &proto.Ping{})
//println("bbb") //println("bbb")
xSend(c, &Ping{}) xSend(c, &proto.Ping{})
//println("ccc") //println("ccc")
msg := xRecv(c) msg := xRecv(c)
//println("ddd") //println("ddd")
...@@ -1158,8 +1161,8 @@ func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) { ...@@ -1158,8 +1161,8 @@ func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) {
default: default:
b.Fatalf("read -> unexpected message %T", msg) b.Fatalf("read -> unexpected message %T", msg)
case *GetObject: case *proto.GetObject:
err = req.Reply(&AnswerObject{ err = req.Reply(&proto.AnswerObject{
Oid: msg.Oid, Oid: msg.Oid,
Serial: msg.Serial, Serial: msg.Serial,
DataSerial: msg.Tid, DataSerial: msg.Tid,
...@@ -1175,8 +1178,8 @@ func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) { ...@@ -1175,8 +1178,8 @@ func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
// NOTE keeping inside loop to simulate what happens in real Load // NOTE keeping inside loop to simulate what happens in real Load
get := &GetObject{} get := &proto.GetObject{}
obj := &AnswerObject{} obj := &proto.AnswerObject{}
get.Oid = zodb.Oid(i) get.Oid = zodb.Oid(i)
get.Serial = zodb.Tid(i+1) get.Serial = zodb.Tid(i+1)
......
...@@ -38,26 +38,15 @@ import ( ...@@ -38,26 +38,15 @@ import (
"lab.nexedi.com/kirr/neo/go/xcommon/log" "lab.nexedi.com/kirr/neo/go/xcommon/log"
"lab.nexedi.com/kirr/neo/go/xcommon/task" "lab.nexedi.com/kirr/neo/go/xcommon/task"
//"lab.nexedi.com/kirr/neo/go/xcommon/xio" //"lab.nexedi.com/kirr/neo/go/xcommon/xio"
"lab.nexedi.com/kirr/neo/go/zodb"
)
const (
//INVALID_UUID UUID = 0
// XXX -> zodb? "lab.nexedi.com/kirr/neo/go/neo/proto"
INVALID_TID zodb.Tid = 1<<64 - 1 // 0xffffffffffffffff
INVALID_OID zodb.Oid = 1<<64 - 1
// OID_LEN = 8
// TID_LEN = 8
) )
// NodeApp is base for implementing NEO node applications. // NodeApp is base for implementing NEO node applications.
// //
// XXX -> internal? // XXX -> internal?
type NodeApp struct { type NodeApp struct {
MyInfo NodeInfo MyInfo proto.NodeInfo
ClusterName string ClusterName string
Net xnet.Networker // network AP we are sending/receiving on Net xnet.Networker // network AP we are sending/receiving on
...@@ -66,22 +55,22 @@ type NodeApp struct { ...@@ -66,22 +55,22 @@ type NodeApp struct {
StateMu sync.RWMutex // <- XXX just embed? StateMu sync.RWMutex // <- XXX just embed?
NodeTab *NodeTable // information about nodes in the cluster NodeTab *NodeTable // information about nodes in the cluster
PartTab *PartitionTable // information about data distribution in the cluster PartTab *PartitionTable // information about data distribution in the cluster
ClusterState ClusterState // master idea about cluster state ClusterState proto.ClusterState // master idea about cluster state
// should be set by user so NodeApp can notify when master tells this node to shutdown // should be set by user so NodeApp can notify when master tells this node to shutdown
OnShutdown func() OnShutdown func()
} }
// NewNodeApp creates new node application // NewNodeApp creates new node application
func NewNodeApp(net xnet.Networker, typ NodeType, clusterName, masterAddr, serveAddr string) *NodeApp { func NewNodeApp(net xnet.Networker, typ proto.NodeType, clusterName, masterAddr, serveAddr string) *NodeApp {
// convert serveAddr into neo format // convert serveAddr into neo format
addr, err := AddrString(net.Network(), serveAddr) addr, err := proto.AddrString(net.Network(), serveAddr)
if err != nil { if err != nil {
panic(err) // XXX panic(err) // XXX
} }
app := &NodeApp{ app := &NodeApp{
MyInfo: NodeInfo{Type: typ, Addr: addr, IdTime: IdTimeNone}, MyInfo: proto.NodeInfo{Type: typ, Addr: addr, IdTime: proto.IdTimeNone},
ClusterName: clusterName, ClusterName: clusterName,
Net: net, Net: net,
MasterAddr: masterAddr, MasterAddr: masterAddr,
...@@ -103,7 +92,7 @@ func NewNodeApp(net xnet.Networker, typ NodeType, clusterName, masterAddr, serve ...@@ -103,7 +92,7 @@ func NewNodeApp(net xnet.Networker, typ NodeType, clusterName, masterAddr, serve
// //
// Dial does not update .NodeTab or its node entries in any way. // Dial does not update .NodeTab or its node entries in any way.
// For establishing links to peers present in .NodeTab use Node.Dial. // For establishing links to peers present in .NodeTab use Node.Dial.
func (app *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_ *NodeLink, _ *AcceptIdentification, err error) { func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr string) (_ *NodeLink, _ *proto.AcceptIdentification, err error) {
defer task.Runningf(&ctx, "dial %v (%v)", addr, peerType)(&err) defer task.Runningf(&ctx, "dial %v (%v)", addr, peerType)(&err)
link, err := DialLink(ctx, app.Net, addr) link, err := DialLink(ctx, app.Net, addr)
...@@ -125,14 +114,14 @@ func (app *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_ ...@@ -125,14 +114,14 @@ func (app *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_
} }
}() }()
req := &RequestIdentification{ req := &proto.RequestIdentification{
NodeType: app.MyInfo.Type, NodeType: app.MyInfo.Type,
UUID: app.MyInfo.UUID, UUID: app.MyInfo.UUID,
Address: app.MyInfo.Addr, Address: app.MyInfo.Addr,
ClusterName: app.ClusterName, ClusterName: app.ClusterName,
IdTime: app.MyInfo.IdTime, // XXX ok? IdTime: app.MyInfo.IdTime, // XXX ok?
} }
accept := &AcceptIdentification{} accept := &proto.AcceptIdentification{}
// FIXME error if peer sends us something with another connID // FIXME error if peer sends us something with another connID
// (currently we ignore and serveRecv will deadlock) // (currently we ignore and serveRecv will deadlock)
// //
...@@ -179,7 +168,7 @@ func (app *NodeApp) Listen() (Listener, error) { ...@@ -179,7 +168,7 @@ func (app *NodeApp) Listen() (Listener, error) {
// NOTE listen("tcp", ":1234") gives l.Addr 0.0.0.0:1234 and // NOTE listen("tcp", ":1234") gives l.Addr 0.0.0.0:1234 and
// listen("tcp6", ":1234") gives l.Addr [::]:1234 // listen("tcp6", ":1234") gives l.Addr [::]:1234
// -> host is never empty // -> host is never empty
addr, err := Addr(ll.Addr()) addr, err := proto.Addr(ll.Addr())
if err != nil { if err != nil {
// XXX -> panic here ? // XXX -> panic here ?
ll.Close() ll.Close()
...@@ -216,7 +205,7 @@ type Listener interface { ...@@ -216,7 +205,7 @@ type Listener interface {
// After successful accept it is the caller responsibility to reply the request. // After successful accept it is the caller responsibility to reply the request.
// //
// NOTE established link is Request.Link(). // NOTE established link is Request.Link().
Accept(ctx context.Context) (*Request, *RequestIdentification, error) Accept(ctx context.Context) (*Request, *proto.RequestIdentification, error)
} }
type listener struct { type listener struct {
...@@ -227,7 +216,7 @@ type listener struct { ...@@ -227,7 +216,7 @@ type listener struct {
type accepted struct { type accepted struct {
req *Request req *Request
idReq *RequestIdentification idReq *proto.RequestIdentification
err error err error
} }
...@@ -284,7 +273,7 @@ func (l *listener) accept(link *NodeLink, err error) { ...@@ -284,7 +273,7 @@ func (l *listener) accept(link *NodeLink, err error) {
} }
} }
func (l *listener) accept1(ctx context.Context, link *NodeLink, err0 error) (_ *Request, _ *RequestIdentification, err error) { func (l *listener) accept1(ctx context.Context, link *NodeLink, err0 error) (_ *Request, _ *proto.RequestIdentification, err error) {
if err0 != nil { if err0 != nil {
return nil, nil, err0 return nil, nil, err0
} }
...@@ -299,16 +288,16 @@ func (l *listener) accept1(ctx context.Context, link *NodeLink, err0 error) (_ * ...@@ -299,16 +288,16 @@ func (l *listener) accept1(ctx context.Context, link *NodeLink, err0 error) (_ *
} }
switch msg := req.Msg.(type) { switch msg := req.Msg.(type) {
case *RequestIdentification: case *proto.RequestIdentification:
return &req, msg, nil return &req, msg, nil
} }
emsg := &Error{PROTOCOL_ERROR, fmt.Sprintf("unexpected message %T", req.Msg)} emsg := &proto.Error{proto.PROTOCOL_ERROR, fmt.Sprintf("unexpected message %T", req.Msg)}
req.Reply(emsg) // XXX err req.Reply(emsg) // XXX err
return nil, nil, emsg return nil, nil, emsg
} }
func (l *listener) Accept(ctx context.Context) (*Request, *RequestIdentification, error) { func (l *listener) Accept(ctx context.Context) (*Request, *proto.RequestIdentification, error) {
select{ select{
case <-l.closed: case <-l.closed:
// we know raw listener is already closed - return proper error about it // we know raw listener is already closed - return proper error about it
...@@ -330,7 +319,7 @@ func (l *listener) Addr() net.Addr { ...@@ -330,7 +319,7 @@ func (l *listener) Addr() net.Addr {
// ---------------------------------------- // ----------------------------------------
// UpdateNodeTab applies updates to .NodeTab from message and logs changes appropriately. // UpdateNodeTab applies updates to .NodeTab from message and logs changes appropriately.
func (app *NodeApp) UpdateNodeTab(ctx context.Context, msg *NotifyNodeInformation) { func (app *NodeApp) UpdateNodeTab(ctx context.Context, msg *proto.NotifyNodeInformation) {
// XXX msg.IdTime ? // XXX msg.IdTime ?
for _, nodeInfo := range msg.NodeList { for _, nodeInfo := range msg.NodeList {
log.Infof(ctx, "node update: %v", nodeInfo) log.Infof(ctx, "node update: %v", nodeInfo)
...@@ -344,7 +333,7 @@ func (app *NodeApp) UpdateNodeTab(ctx context.Context, msg *NotifyNodeInformatio ...@@ -344,7 +333,7 @@ func (app *NodeApp) UpdateNodeTab(ctx context.Context, msg *NotifyNodeInformatio
app.MyInfo.IdTime = nodeInfo.IdTime app.MyInfo.IdTime = nodeInfo.IdTime
// FIXME hack - better it be separate command and handled cleanly // FIXME hack - better it be separate command and handled cleanly
if nodeInfo.State == DOWN { if nodeInfo.State == proto.DOWN {
log.Info(ctx, "master told us to shutdown") log.Info(ctx, "master told us to shutdown")
log.Flush() log.Flush()
app.OnShutdown() app.OnShutdown()
...@@ -359,7 +348,7 @@ func (app *NodeApp) UpdateNodeTab(ctx context.Context, msg *NotifyNodeInformatio ...@@ -359,7 +348,7 @@ func (app *NodeApp) UpdateNodeTab(ctx context.Context, msg *NotifyNodeInformatio
} }
// UpdatePartTab applies updates to .PartTab from message and logs changes appropriately. // UpdatePartTab applies updates to .PartTab from message and logs changes appropriately.
func (app *NodeApp) UpdatePartTab(ctx context.Context, msg *SendPartitionTable) { func (app *NodeApp) UpdatePartTab(ctx context.Context, msg *proto.SendPartitionTable) {
pt := PartTabFromDump(msg.PTid, msg.RowList) pt := PartTabFromDump(msg.PTid, msg.RowList)
// XXX logging under lock // XXX logging under lock
log.Infof(ctx, "parttab update: %v", pt) log.Infof(ctx, "parttab update: %v", pt)
...@@ -367,7 +356,7 @@ func (app *NodeApp) UpdatePartTab(ctx context.Context, msg *SendPartitionTable) ...@@ -367,7 +356,7 @@ func (app *NodeApp) UpdatePartTab(ctx context.Context, msg *SendPartitionTable)
} }
// UpdateClusterState applies update to .ClusterState from message and logs change appropriately. // UpdateClusterState applies update to .ClusterState from message and logs change appropriately.
func (app *NodeApp) UpdateClusterState(ctx context.Context, msg *NotifyClusterState) { func (app *NodeApp) UpdateClusterState(ctx context.Context, msg *proto.NotifyClusterState) {
// XXX loging under lock // XXX loging under lock
log.Infof(ctx, "state update: %v", msg.State) log.Infof(ctx, "state update: %v", msg.State)
app.ClusterState.Set(msg.State) app.ClusterState.Set(msg.State)
......
...@@ -27,6 +27,8 @@ import ( ...@@ -27,6 +27,8 @@ import (
"sync" "sync"
"time" "time"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/xcommon/log" "lab.nexedi.com/kirr/neo/go/xcommon/log"
"lab.nexedi.com/kirr/neo/go/xcommon/task" "lab.nexedi.com/kirr/neo/go/xcommon/task"
) )
...@@ -67,7 +69,7 @@ type NodeTable struct { ...@@ -67,7 +69,7 @@ type NodeTable struct {
//storv []*Node // storages //storv []*Node // storages
nodev []*Node // all other nodes nodev []*Node // all other nodes
notifyv []chan NodeInfo // subscribers notifyv []chan proto.NodeInfo // subscribers
} }
//trace:event traceNodeChanged(nt *NodeTable, n *Node) //trace:event traceNodeChanged(nt *NodeTable, n *Node)
...@@ -78,7 +80,7 @@ type NodeTable struct { ...@@ -78,7 +80,7 @@ type NodeTable struct {
type Node struct { type Node struct {
nodeTab *NodeTable // this node is part of nodeTab *NodeTable // this node is part of
NodeInfo // .type, .addr, .uuid, ... XXX also protect by mu? proto.NodeInfo // .type, .addr, .uuid, ... XXX also protect by mu?
linkMu sync.Mutex linkMu sync.Mutex
link *NodeLink // link to this peer; nil if not connected link *NodeLink // link to this peer; nil if not connected
...@@ -115,7 +117,7 @@ func (nt *NodeTable) All() []*Node { ...@@ -115,7 +117,7 @@ func (nt *NodeTable) All() []*Node {
} }
// Get finds node by uuid. // Get finds node by uuid.
func (nt *NodeTable) Get(uuid NodeUUID) *Node { func (nt *NodeTable) Get(uuid proto.NodeUUID) *Node {
// FIXME linear scan // FIXME linear scan
for _, node := range nt.nodev { for _, node := range nt.nodev {
if node.UUID == uuid { if node.UUID == uuid {
...@@ -130,7 +132,7 @@ func (nt *NodeTable) Get(uuid NodeUUID) *Node { ...@@ -130,7 +132,7 @@ func (nt *NodeTable) Get(uuid NodeUUID) *Node {
// Update updates information about a node. // Update updates information about a node.
// //
// it returns corresponding node entry for convenience. // it returns corresponding node entry for convenience.
func (nt *NodeTable) Update(nodeInfo NodeInfo) *Node { func (nt *NodeTable) Update(nodeInfo proto.NodeInfo) *Node {
node := nt.Get(nodeInfo.UUID) node := nt.Get(nodeInfo.UUID)
if node == nil { if node == nil {
node = &Node{nodeTab: nt} node = &Node{nodeTab: nt}
...@@ -156,7 +158,7 @@ func (nt *NodeTable) StorageList() []*Node { ...@@ -156,7 +158,7 @@ func (nt *NodeTable) StorageList() []*Node {
// FIXME linear scan // FIXME linear scan
sl := []*Node{} sl := []*Node{}
for _, node := range nt.nodev { for _, node := range nt.nodev {
if node.Type == STORAGE { if node.Type == proto.STORAGE {
sl = append(sl, node) sl = append(sl, node)
} }
} }
...@@ -165,7 +167,7 @@ func (nt *NodeTable) StorageList() []*Node { ...@@ -165,7 +167,7 @@ func (nt *NodeTable) StorageList() []*Node {
// XXX doc // XXX doc
func (n *Node) SetState(state NodeState) { func (n *Node) SetState(state proto.NodeState) {
n.State = state n.State = state
traceNodeChanged(n.nodeTab, n) traceNodeChanged(n.nodeTab, n)
n.nodeTab.notify(n.NodeInfo) n.nodeTab.notify(n.NodeInfo)
...@@ -188,7 +190,7 @@ func (nt *NodeTable) String() string { ...@@ -188,7 +190,7 @@ func (nt *NodeTable) String() string {
// ---- subscription to nodetab updates ---- // ---- subscription to nodetab updates ----
// notify notifies NodeTable subscribers that nodeInfo was updated // notify notifies NodeTable subscribers that nodeInfo was updated
func (nt *NodeTable) notify(nodeInfo NodeInfo) { func (nt *NodeTable) notify(nodeInfo proto.NodeInfo) {
// XXX rlock for .notifyv ? // XXX rlock for .notifyv ?
for _, notify := range nt.notifyv { for _, notify := range nt.notifyv {
notify <- nodeInfo notify <- nodeInfo
...@@ -200,8 +202,8 @@ func (nt *NodeTable) notify(nodeInfo NodeInfo) { ...@@ -200,8 +202,8 @@ func (nt *NodeTable) notify(nodeInfo NodeInfo) {
// It returns a channel via which updates will be delivered and function to unsubscribe. // It returns a channel via which updates will be delivered and function to unsubscribe.
// //
// XXX locking: client for subscribe/unsubscribe XXX ok? // XXX locking: client for subscribe/unsubscribe XXX ok?
func (nt *NodeTable) Subscribe() (ch chan NodeInfo, unsubscribe func()) { func (nt *NodeTable) Subscribe() (ch chan proto.NodeInfo, unsubscribe func()) {
ch = make(chan NodeInfo) // XXX how to specify ch buf size if needed ? ch = make(chan proto.NodeInfo) // XXX how to specify ch buf size if needed ?
nt.notifyv = append(nt.notifyv, ch) nt.notifyv = append(nt.notifyv, ch)
unsubscribe = func() { unsubscribe = func() {
...@@ -227,12 +229,12 @@ func (nt *NodeTable) Subscribe() (ch chan NodeInfo, unsubscribe func()) { ...@@ -227,12 +229,12 @@ func (nt *NodeTable) Subscribe() (ch chan NodeInfo, unsubscribe func()) {
// to infinity - via e.g. detecting stuck connections and unsubscribing on shutdown. // to infinity - via e.g. detecting stuck connections and unsubscribing on shutdown.
// //
// XXX locking: client for subscribe/unsubscribe XXX ok? // XXX locking: client for subscribe/unsubscribe XXX ok?
func (nt *NodeTable) SubscribeBuffered() (ch chan []NodeInfo, unsubscribe func()) { func (nt *NodeTable) SubscribeBuffered() (ch chan []proto.NodeInfo, unsubscribe func()) {
in, unsubscribe := nt.Subscribe() in, unsubscribe := nt.Subscribe()
ch = make(chan []NodeInfo) ch = make(chan []proto.NodeInfo)
go func() { go func() {
var updatev []NodeInfo var updatev []proto.NodeInfo
shutdown := false shutdown := false
for { for {
......
...@@ -25,6 +25,7 @@ import ( ...@@ -25,6 +25,7 @@ import (
"fmt" "fmt"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/neo/proto"
) )
// PartitionTable represents object space partitioning in a cluster // PartitionTable represents object space partitioning in a cluster
...@@ -118,12 +119,12 @@ type PartitionTable struct { ...@@ -118,12 +119,12 @@ type PartitionTable struct {
tab [][]Cell // [#Np] pid -> []Cell tab [][]Cell // [#Np] pid -> []Cell
PTid PTid // ↑ for versioning XXX -> ver ? XXX move out of here? PTid proto.PTid // ↑ for versioning XXX -> ver ? XXX move out of here?
} }
// Cell describes one storage in a pid entry in partition table // Cell describes one storage in a pid entry in partition table
type Cell struct { type Cell struct {
CellInfo // .uuid + .state proto.CellInfo // .uuid + .state
// XXX ? + .haveUpToTid associated node has data up to such tid // XXX ? + .haveUpToTid associated node has data up to such tid
// = uptodate if haveUpToTid == lastTid // = uptodate if haveUpToTid == lastTid
...@@ -149,7 +150,7 @@ func (pt *PartitionTable) Get(oid zodb.Oid) []Cell { ...@@ -149,7 +150,7 @@ func (pt *PartitionTable) Get(oid zodb.Oid) []Cell {
// Readable reports whether it is ok to read data from a cell // Readable reports whether it is ok to read data from a cell
func (c *Cell) Readable() bool { func (c *Cell) Readable() bool {
switch c.State { switch c.State {
case UP_TO_DATE, FEEDING: case proto.UP_TO_DATE, proto.FEEDING:
return true return true
} }
return false return false
...@@ -165,7 +166,7 @@ func MakePartTab(np int, nodev []*Node) *PartitionTable { ...@@ -165,7 +166,7 @@ func MakePartTab(np int, nodev []*Node) *PartitionTable {
node := nodev[j] node := nodev[j]
// XXX assert node.State > DOWN // XXX assert node.State > DOWN
//fmt.Printf("tab[%d] <- %v\n", i, node.UUID) //fmt.Printf("tab[%d] <- %v\n", i, node.UUID)
tab[i] = []Cell{{CellInfo: CellInfo{node.UUID, UP_TO_DATE /*XXX ok?*/}}} tab[i] = []Cell{{CellInfo: proto.CellInfo{node.UUID, proto.UP_TO_DATE /*XXX ok?*/}}}
} }
return &PartitionTable{tab: tab} return &PartitionTable{tab: tab}
...@@ -203,7 +204,7 @@ func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool { ...@@ -203,7 +204,7 @@ func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool {
// //
// We leave it as is for now. // We leave it as is for now.
node := nt.Get(cell.UUID) node := nt.Get(cell.UUID)
if node == nil || node.State != RUNNING { // XXX PENDING is also ok ? if node == nil || node.State != proto.RUNNING { // XXX PENDING is also ok ?
continue continue
} }
...@@ -245,20 +246,20 @@ func (pt *PartitionTable) String() string { ...@@ -245,20 +246,20 @@ func (pt *PartitionTable) String() string {
} }
// XXX -> RowList() ? // XXX -> RowList() ?
func (pt *PartitionTable) Dump() []RowInfo { // XXX also include .ptid? -> struct ? func (pt *PartitionTable) Dump() []proto.RowInfo { // XXX also include .ptid? -> struct ?
rowv := make([]RowInfo, len(pt.tab)) rowv := make([]proto.RowInfo, len(pt.tab))
for i, row := range pt.tab { for i, row := range pt.tab {
cellv := make([]CellInfo, len(row)) cellv := make([]proto.CellInfo, len(row))
for j, cell := range row { for j, cell := range row {
cellv[j] = cell.CellInfo cellv[j] = cell.CellInfo
} }
rowv[i] = RowInfo{Offset: uint32(i), CellList: cellv} // XXX cast? rowv[i] = proto.RowInfo{Offset: uint32(i), CellList: cellv} // XXX cast?
} }
return rowv return rowv
} }
func PartTabFromDump(ptid PTid, rowv []RowInfo) *PartitionTable { func PartTabFromDump(ptid proto.PTid, rowv []proto.RowInfo) *PartitionTable {
// reconstruct partition table from response // reconstruct partition table from response
pt := &PartitionTable{} pt := &PartitionTable{}
pt.PTid = ptid pt.PTid = ptid
......
...@@ -21,19 +21,21 @@ package neo ...@@ -21,19 +21,21 @@ package neo
import ( import (
"testing" "testing"
"lab.nexedi.com/kirr/neo/go/neo/proto"
) )
func TestPartTabOperational(t *testing.T) { func TestPartTabOperational(t *testing.T) {
s1 := UUID(STORAGE, 1) s1 := proto.UUID(proto.STORAGE, 1)
s2 := UUID(STORAGE, 2) s2 := proto.UUID(proto.STORAGE, 2)
// create nodeinfo for uuid/state // create nodeinfo for uuid/state
n := func(uuid NodeUUID, state NodeState) NodeInfo { n := func(uuid proto.NodeUUID, state proto.NodeState) proto.NodeInfo {
return NodeInfo{UUID: uuid, State: state} // XXX .Type? return proto.NodeInfo{UUID: uuid, State: state} // XXX .Type?
} }
// create nodetab with [](uuid, state) // create nodetab with [](uuid, state)
N := func(nodeiv ...NodeInfo) *NodeTable { N := func(nodeiv ...proto.NodeInfo) *NodeTable {
nt := &NodeTable{} nt := &NodeTable{}
for _, nodei := range nodeiv { for _, nodei := range nodeiv {
nt.Update(nodei) nt.Update(nodei)
...@@ -42,8 +44,8 @@ func TestPartTabOperational(t *testing.T) { ...@@ -42,8 +44,8 @@ func TestPartTabOperational(t *testing.T) {
} }
// create cell with uuid/state // create cell with uuid/state
C := func(uuid NodeUUID, state CellState) Cell { C := func(uuid proto.NodeUUID, state proto.CellState) Cell {
return Cell{CellInfo{UUID: uuid, State: state}} return Cell{proto.CellInfo{UUID: uuid, State: state}}
} }
// shortcut to create []Cell // shortcut to create []Cell
...@@ -57,14 +59,14 @@ func TestPartTabOperational(t *testing.T) { ...@@ -57,14 +59,14 @@ func TestPartTabOperational(t *testing.T) {
var testv = []struct{pt *PartitionTable; nt *NodeTable; operational bool}{ var testv = []struct{pt *PartitionTable; nt *NodeTable; operational bool}{
// empty parttab is non-operational // empty parttab is non-operational
{P(), N(), false}, {P(), N(), false},
{P(), N(n(s1, RUNNING)), false}, {P(), N(n(s1, proto.RUNNING)), false},
// parttab with 1 storage // parttab with 1 storage
{P(v(C(s1, UP_TO_DATE))), N(), false}, {P(v(C(s1, proto.UP_TO_DATE))), N(), false},
{P(v(C(s1, UP_TO_DATE))), N(n(s2, RUNNING)), false}, {P(v(C(s1, proto.UP_TO_DATE))), N(n(s2, proto.RUNNING)), false},
{P(v(C(s1, OUT_OF_DATE))), N(n(s1, RUNNING)), false}, {P(v(C(s1, proto.OUT_OF_DATE))),N(n(s1, proto.RUNNING)), false},
{P(v(C(s1, UP_TO_DATE))), N(n(s1, RUNNING)), true}, {P(v(C(s1, proto.UP_TO_DATE))), N(n(s1, proto.RUNNING)), true},
{P(v(C(s1, FEEDING))), N(n(s1, RUNNING)), true}, {P(v(C(s1, proto.FEEDING))), N(n(s1, proto.RUNNING)), true},
// TODO more tests // TODO more tests
} }
......
...@@ -27,26 +27,29 @@ import ( ...@@ -27,26 +27,29 @@ import (
"unsafe" "unsafe"
"lab.nexedi.com/kirr/go123/xbytes" "lab.nexedi.com/kirr/go123/xbytes"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/xcommon/packed"
) )
// PktBuf is a buffer with full raw packet (header + data). // PktBuf is a buffer with full raw packet (header + data).
// //
// variables of type PktBuf are usually named "pkb" (packet buffer), similar to "skb" in Linux. // variables of type PktBuf are usually named "pkb" (packet buffer), similar to "skb" in Linux.
// //
// Allocate PktBuf via allocPkt() and free via PktBuf.Free(). // Allocate PktBuf via pktAlloc() and free via PktBuf.Free().
type PktBuf struct { type PktBuf struct {
Data []byte // whole packet data including all headers Data []byte // whole packet data including all headers
} }
// Header returns pointer to packet header. // Header returns pointer to packet header.
func (pkt *PktBuf) Header() *PktHeader { func (pkt *PktBuf) Header() *proto.PktHeader {
// XXX check len(Data) < PktHeader ? -> no, Data has to be allocated with cap >= pktHeaderLen // XXX check len(Data) < PktHeader ? -> no, Data has to be allocated with cap >= PktHeaderLen
return (*PktHeader)(unsafe.Pointer(&pkt.Data[0])) return (*proto.PktHeader)(unsafe.Pointer(&pkt.Data[0]))
} }
// Payload returns []byte representing packet payload. // Payload returns []byte representing packet payload.
func (pkt *PktBuf) Payload() []byte { func (pkt *PktBuf) Payload() []byte {
return pkt.Data[pktHeaderLen:] return pkt.Data[proto.PktHeaderLen:]
} }
// ---- PktBuf freelist ---- // ---- PktBuf freelist ----
...@@ -73,25 +76,25 @@ func (pkt *PktBuf) Free() { ...@@ -73,25 +76,25 @@ func (pkt *PktBuf) Free() {
// Strings dumps a packet in human-readable form // Strings dumps a packet in human-readable form
func (pkt *PktBuf) String() string { func (pkt *PktBuf) String() string {
if len(pkt.Data) < pktHeaderLen { if len(pkt.Data) < proto.PktHeaderLen {
return fmt.Sprintf("(! < pktHeaderLen) % x", pkt.Data) return fmt.Sprintf("(! < PktHeaderLen) % x", pkt.Data)
} }
h := pkt.Header() h := pkt.Header()
s := fmt.Sprintf(".%d", ntoh32(h.ConnId)) s := fmt.Sprintf(".%d", packed.Ntoh32(h.ConnId))
msgCode := ntoh16(h.MsgCode) msgCode := packed.Ntoh16(h.MsgCode)
msgLen := ntoh32(h.MsgLen) msgLen := packed.Ntoh32(h.MsgLen)
data := pkt.Payload() data := pkt.Payload()
msgType := msgTypeRegistry[msgCode] msgType := proto.MsgType(msgCode)
if msgType == nil { if msgType == nil {
s += fmt.Sprintf(" ? (%d) #%d [%d]: % x", msgCode, msgLen, len(data), data) s += fmt.Sprintf(" ? (%d) #%d [%d]: % x", msgCode, msgLen, len(data), data)
return s return s
} }
// XXX dup wrt Conn.Recv // XXX dup wrt Conn.Recv
msg := reflect.New(msgType).Interface().(Msg) msg := reflect.New(msgType).Interface().(proto.Msg)
n, err := msg.neoMsgDecode(data) n, err := msg.NEOMsgDecode(data)
if err != nil { if err != nil {
s += fmt.Sprintf(" (%s) %v; #%d [%d]: % x", msgType, err, msgLen, len(data), data) s += fmt.Sprintf(" (%s) %v; #%d [%d]: % x", msgType, err, msgLen, len(data), data)
} }
...@@ -108,12 +111,12 @@ func (pkt *PktBuf) String() string { ...@@ -108,12 +111,12 @@ func (pkt *PktBuf) String() string {
// Dump dumps a packet in raw form // Dump dumps a packet in raw form
func (pkt *PktBuf) Dump() string { func (pkt *PktBuf) Dump() string {
if len(pkt.Data) < pktHeaderLen { if len(pkt.Data) < proto.PktHeaderLen {
return fmt.Sprintf("(! < pktHeaderLen) % x", pkt.Data) return fmt.Sprintf("(! < pktHeaderLen) % x", pkt.Data)
} }
h := pkt.Header() h := pkt.Header()
data := pkt.Payload() data := pkt.Payload()
return fmt.Sprintf(".%d (%d) #%d [%d]: % x", return fmt.Sprintf(".%d (%d) #%d [%d]: % x",
ntoh32(h.ConnId), ntoh16(h.MsgCode), ntoh32(h.MsgLen), len(data), data) packed.Ntoh32(h.ConnId), packed.Ntoh16(h.MsgCode), packed.Ntoh32(h.MsgLen), len(data), data)
} }
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package neo package proto
// error related utilities // error related utilities
import ( import (
...@@ -27,7 +27,8 @@ import ( ...@@ -27,7 +27,8 @@ import (
) )
// XXX place=? -> methods of Error // XXX name -> zodbErrEncode, zodbErrDecode ?
// XXX should be not in proto/ ?
// ErrEncode translates an error into Error packet. // ErrEncode translates an error into Error packet.
// XXX more text describing relation with zodb errors // XXX more text describing relation with zodb errors
......
...@@ -17,20 +17,29 @@ ...@@ -17,20 +17,29 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
//go:generate stringer -output zproto-str.go -type ErrorCode,ClusterState,NodeType,NodeState,CellState proto.go packed.go //go:generate stringer -output zproto-str.go -type ErrorCode,ClusterState,NodeType,NodeState,CellState proto.go
package neo package proto
// supporting code for types defined in proto.go // supporting code for types defined in proto.go
import ( import (
"fmt" "fmt"
"math" "math"
"net" "net"
"reflect"
"strconv" "strconv"
"strings" "strings"
"time" "time"
) )
// MsgType looks up message type by message code.
//
// Nil is returned if message code is not valid.
func MsgType(msgCode uint16) reflect.Type {
return msgTypeRegistry[msgCode]
}
// XXX or better translate to some other errors ? // XXX or better translate to some other errors ?
// XXX here - not in proto.go - because else stringer will be confused // XXX here - not in proto.go - because else stringer will be confused
func (e *Error) Error() string { func (e *Error) Error() string {
...@@ -44,6 +53,8 @@ func (e *Error) Error() string { ...@@ -44,6 +53,8 @@ func (e *Error) Error() string {
// Set sets cluster state value to v. // Set sets cluster state value to v.
// Use Set instead of direct assignment for ClusterState tracing to work. // Use Set instead of direct assignment for ClusterState tracing to work.
//
// XXX move this to neo.clusterState wrapping proto.ClusterState?
func (cs *ClusterState) Set(v ClusterState) { func (cs *ClusterState) Set(v ClusterState) {
*cs = v *cs = v
traceClusterStateChanged(cs) traceClusterStateChanged(cs)
......
// Copyright (C) 2006-2017 Nexedi SA and Contributors. // Copyright (C) 2006-2018 Nexedi SA and Contributors.
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your // it under the terms of the GNU General Public License version 3, or (at your
...@@ -16,8 +16,24 @@ ...@@ -16,8 +16,24 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package neo // Package proto provides definition of NEO messages and their marshalling
// protocol definition // to/from wire format.
//
// Two NEO nodes can exchange messages over underlying network link after
// performing NEO-specific handshake. A message is sent as a packet specifying
// ID of subconnection multiplexed on top of the underlying link, carried
// message code and message data.
//
// PktHeader describes packet header structure.
//
// Messages are represented by corresponding types that all implement Msg interface.
//
// A message type can be looked up by message code with MsgType.
//
// The proto packages provides only message definitions and low-level
// primitives for their marshalling. Package lab.nexedi.com/kirr/neo/go/neo/net
// (XXX) provides actual service for message exchange over network.
package proto
// This file defines everything that relates to messages on the wire. // This file defines everything that relates to messages on the wire.
// In particular every type that is included in a message is defined here as well. // In particular every type that is included in a message is defined here as well.
...@@ -39,6 +55,7 @@ package neo ...@@ -39,6 +55,7 @@ package neo
// The code to marshal/unmarshal messages is generated by protogen.go . // The code to marshal/unmarshal messages is generated by protogen.go .
//go:generate sh -c "go run protogen.go >zproto-marshal.go" //go:generate sh -c "go run protogen.go >zproto-marshal.go"
//go:generate gotrace gen .
// TODO regroup messages definitions to stay more close to 1 communication topic // TODO regroup messages definitions to stay more close to 1 communication topic
...@@ -53,6 +70,8 @@ import ( ...@@ -53,6 +70,8 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/neo/go/xcommon/packed"
"encoding/binary" "encoding/binary"
"errors" "errors"
"math" "math"
...@@ -62,17 +81,24 @@ const ( ...@@ -62,17 +81,24 @@ const (
// The protocol version must be increased whenever upgrading a node may require // The protocol version must be increased whenever upgrading a node may require
// to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and // to upgrade other nodes. It is encoded as a 4-bytes big-endian integer and
// the high order byte 0 is different from TLS Handshake (0x16). // the high order byte 0 is different from TLS Handshake (0x16).
ProtocolVersion = 1 Version = 1
// length of packet header // length of packet header
pktHeaderLen = 10 // = unsafe.Sizeof(PktHeader{}), but latter gives typed constant (uintptr) PktHeaderLen = 10 // = unsafe.Sizeof(PktHeader{}), but latter gives typed constant (uintptr)
// we are not accepting packets larger than pktMaxSize. // packets larger than PktMaxSize are not allowed.
// in particular this avoids out-of-memory error on packets with corrupt message len. // this helps to avoid out-of-memory error on packets with corrupt message len.
pktMaxSize = 0x4000000 PktMaxSize = 0x4000000
// answerBit is set in message code in answer messages for compatibility with neo/py // answerBit is set in message code in answer messages for compatibility with neo/py
answerBit = 0x8000 answerBit = 0x8000
//INVALID_UUID UUID = 0
// XXX -> zodb?
INVALID_TID zodb.Tid = 1<<64 - 1 // 0xffffffffffffffff
INVALID_OID zodb.Oid = 1<<64 - 1
) )
// PktHeader represents header of a raw packet. // PktHeader represents header of a raw packet.
...@@ -81,29 +107,29 @@ const ( ...@@ -81,29 +107,29 @@ const (
// //
//neo:proto typeonly //neo:proto typeonly
type PktHeader struct { type PktHeader struct {
ConnId be32 // NOTE is .msgid in py ConnId packed.BE32 // NOTE is .msgid in py
MsgCode be16 // payload message code MsgCode packed.BE16 // payload message code
MsgLen be32 // payload message length (excluding packet header) MsgLen packed.BE32 // payload message length (excluding packet header)
} }
// Msg is the interface implemented by all NEO messages. // Msg is the interface implemented by all NEO messages.
type Msg interface { type Msg interface {
// marshal/unmarshal into/from wire format: // marshal/unmarshal into/from wire format:
// neoMsgCode returns message code needed to be used for particular message type // NEOMsgCode returns message code needed to be used for particular message type
// on the wire. // on the wire.
neoMsgCode() uint16 NEOMsgCode() uint16
// neoMsgEncodedLen returns how much space is needed to encode current message payload. // NEOMsgEncodedLen returns how much space is needed to encode current message payload.
neoMsgEncodedLen() int NEOMsgEncodedLen() int
// neoMsgEncode encodes current message state into buf. // NEOMsgEncode encodes current message state into buf.
// //
// len(buf) must be >= neoMsgEncodedLen(). // len(buf) must be >= neoMsgEncodedLen().
neoMsgEncode(buf []byte) NEOMsgEncode(buf []byte)
// neoMsgDecode decodes data into message in-place. // NEOMsgDecode decodes data into message in-place.
neoMsgDecode(data []byte) (nread int, err error) NEOMsgDecode(data []byte) (nread int, err error)
} }
// ErrDecodeOverflow is the error returned by neoMsgDecode when decoding hits buffer overflow // ErrDecodeOverflow is the error returned by neoMsgDecode when decoding hits buffer overflow
...@@ -127,6 +153,7 @@ const ( ...@@ -127,6 +153,7 @@ const (
INCOMPLETE_TRANSACTION INCOMPLETE_TRANSACTION
) )
// XXX move this to neo.clusterState wrapping proto.ClusterState?
//trace:event traceClusterStateChanged(cs *ClusterState) //trace:event traceClusterStateChanged(cs *ClusterState)
type ClusterState int32 type ClusterState int32
...@@ -584,7 +611,7 @@ type AnswerRebaseObject struct { ...@@ -584,7 +611,7 @@ type AnswerRebaseObject struct {
// FIXME POption('data') // FIXME POption('data')
Compression bool Compression bool
Checksum Checksum Checksum Checksum
Data []byte // XXX was string Data *mem.Buf
} }
...@@ -596,7 +623,7 @@ type StoreObject struct { ...@@ -596,7 +623,7 @@ type StoreObject struct {
Serial zodb.Tid Serial zodb.Tid
Compression bool Compression bool
Checksum Checksum Checksum Checksum
Data []byte // TODO encode -> separately (for writev) Data []byte // TODO -> msg.Buf, separately (for writev)
DataSerial zodb.Tid DataSerial zodb.Tid
Tid zodb.Tid Tid zodb.Tid
} }
...@@ -1036,7 +1063,7 @@ type AddObject struct { ...@@ -1036,7 +1063,7 @@ type AddObject struct {
Serial zodb.Tid Serial zodb.Tid
Compression bool Compression bool
Checksum Checksum Checksum Checksum
Data []byte // TODO from pool, separately (?) Data *mem.Buf
DataSerial zodb.Tid DataSerial zodb.Tid
} }
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package neo package proto
// protocol tests // NEO. protocol encoding tests
import ( import (
hexpkg "encoding/hex" hexpkg "encoding/hex"
...@@ -64,12 +64,12 @@ func u64(v uint64) string { ...@@ -64,12 +64,12 @@ func u64(v uint64) string {
} }
func TestPktHeader(t *testing.T) { func TestPktHeader(t *testing.T) {
// make sure PktHeader is really packed and its size matches pktHeaderLen // make sure PktHeader is really packed and its size matches PktHeaderLen
if unsafe.Sizeof(PktHeader{}) != 10 { if unsafe.Sizeof(PktHeader{}) != 10 {
t.Fatalf("sizeof(PktHeader) = %v ; want 10", unsafe.Sizeof(PktHeader{})) t.Fatalf("sizeof(PktHeader) = %v ; want 10", unsafe.Sizeof(PktHeader{}))
} }
if unsafe.Sizeof(PktHeader{}) != pktHeaderLen { if unsafe.Sizeof(PktHeader{}) != PktHeaderLen {
t.Fatalf("sizeof(PktHeader) = %v ; want %v", unsafe.Sizeof(PktHeader{}), pktHeaderLen) t.Fatalf("sizeof(PktHeader) = %v ; want %v", unsafe.Sizeof(PktHeader{}), PktHeaderLen)
} }
} }
...@@ -85,9 +85,9 @@ func testMsgMarshal(t *testing.T, msg Msg, encoded string) { ...@@ -85,9 +85,9 @@ func testMsgMarshal(t *testing.T, msg Msg, encoded string) {
}() }()
// msg.encode() == expected // msg.encode() == expected
msgCode := msg.neoMsgCode() msgCode := msg.NEOMsgCode()
n := msg.neoMsgEncodedLen() n := msg.NEOMsgEncodedLen()
msgType := msgTypeRegistry[msgCode] msgType := MsgType(msgCode)
if msgType != typ { if msgType != typ {
t.Errorf("%v: msgCode = %v which corresponds to %v", typ, msgCode, msgType) t.Errorf("%v: msgCode = %v which corresponds to %v", typ, msgCode, msgType)
} }
...@@ -96,7 +96,7 @@ func testMsgMarshal(t *testing.T, msg Msg, encoded string) { ...@@ -96,7 +96,7 @@ func testMsgMarshal(t *testing.T, msg Msg, encoded string) {
} }
buf := make([]byte, n) buf := make([]byte, n)
msg.neoMsgEncode(buf) msg.NEOMsgEncode(buf)
if string(buf) != encoded { if string(buf) != encoded {
t.Errorf("%v: encode result unexpected:", typ) t.Errorf("%v: encode result unexpected:", typ)
t.Errorf("\thave: %s", hexpkg.EncodeToString(buf)) t.Errorf("\thave: %s", hexpkg.EncodeToString(buf))
...@@ -126,13 +126,13 @@ func testMsgMarshal(t *testing.T, msg Msg, encoded string) { ...@@ -126,13 +126,13 @@ func testMsgMarshal(t *testing.T, msg Msg, encoded string) {
} }
}() }()
msg.neoMsgEncode(buf[:l]) msg.NEOMsgEncode(buf[:l])
}() }()
} }
// msg.decode() == expected // msg.decode() == expected
data := []byte(encoded + "noise") data := []byte(encoded + "noise")
n, err := msg2.neoMsgDecode(data) n, err := msg2.NEOMsgDecode(data)
if err != nil { if err != nil {
t.Errorf("%v: decode error %v", typ, err) t.Errorf("%v: decode error %v", typ, err)
} }
...@@ -146,7 +146,7 @@ func testMsgMarshal(t *testing.T, msg Msg, encoded string) { ...@@ -146,7 +146,7 @@ func testMsgMarshal(t *testing.T, msg Msg, encoded string) {
// decode must detect buffer overflow // decode must detect buffer overflow
for l := len(encoded)-1; l >= 0; l-- { for l := len(encoded)-1; l >= 0; l-- {
n, err = msg2.neoMsgDecode(data[:l]) n, err = msg2.NEOMsgDecode(data[:l])
if !(n==0 && err==ErrDecodeOverflow) { if !(n==0 && err==ErrDecodeOverflow) {
t.Errorf("%v: decode overflow not detected on [:%v]", typ, l) t.Errorf("%v: decode overflow not detected on [:%v]", typ, l)
} }
...@@ -281,11 +281,11 @@ func TestMsgMarshalAllOverflowLightly(t *testing.T) { ...@@ -281,11 +281,11 @@ func TestMsgMarshalAllOverflowLightly(t *testing.T) {
for _, typ := range msgTypeRegistry { for _, typ := range msgTypeRegistry {
// zero-value for a type // zero-value for a type
msg := reflect.New(typ).Interface().(Msg) msg := reflect.New(typ).Interface().(Msg)
l := msg.neoMsgEncodedLen() l := msg.NEOMsgEncodedLen()
zerol := make([]byte, l) zerol := make([]byte, l)
// decoding will turn nil slice & map into empty allocated ones. // decoding will turn nil slice & map into empty allocated ones.
// we need it so that reflect.DeepEqual works for msg encode/decode comparison // we need it so that reflect.DeepEqual works for msg encode/decode comparison
n, err := msg.neoMsgDecode(zerol) n, err := msg.NEOMsgDecode(zerol)
if !(n == l && err == nil) { if !(n == l && err == nil) {
t.Errorf("%v: zero-decode unexpected: %v, %v ; want %v, nil", typ, n, err, l) t.Errorf("%v: zero-decode unexpected: %v, %v ; want %v, nil", typ, n, err, l)
} }
...@@ -316,7 +316,7 @@ func TestMsgDecodeLenOverflow(t *testing.T) { ...@@ -316,7 +316,7 @@ func TestMsgDecodeLenOverflow(t *testing.T) {
} }
}() }()
n, err := tt.msg.neoMsgDecode(data) n, err := tt.msg.NEOMsgDecode(data)
if !(n == 0 && err == ErrDecodeOverflow) { if !(n == 0 && err == ErrDecodeOverflow) {
t.Errorf("%T: decode %x\nhave: %d, %v\nwant: %d, %v", tt.msg, data, t.Errorf("%T: decode %x\nhave: %d, %v\nwant: %d, %v", tt.msg, data,
n, err, 0, ErrDecodeOverflow) n, err, 0, ErrDecodeOverflow)
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package neo package proto
// test wire protocol compatibility with python // NEO. test wire protocol compatibility with python
//go:generate ./py/pyneo-gen-testdata //go:generate ./py/pyneo-gen-testdata
......
// Copyright (C) 2016-2017 Nexedi SA and Contributors. // Copyright (C) 2016-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -25,10 +25,10 @@ NEO. Protocol module. Code generator ...@@ -25,10 +25,10 @@ NEO. Protocol module. Code generator
This program generates marshalling code for message types defined in proto.go . This program generates marshalling code for message types defined in proto.go .
For every type 4 methods are generated in accordance with neo.Msg interface: For every type 4 methods are generated in accordance with neo.Msg interface:
neoMsgCode() uint16 NEOMsgCode() uint16
neoMsgEncodedLen() int NEOMsgEncodedLen() int
neoMsgEncode(buf []byte) NEOMsgEncode(buf []byte)
neoMsgDecode(data []byte) (nread int, err error) NEOMsgDecode(data []byte) (nread int, err error)
List of message types is obtained via searching through proto.go AST - looking List of message types is obtained via searching through proto.go AST - looking
for appropriate struct declarations there. for appropriate struct declarations there.
...@@ -95,11 +95,11 @@ func pos(x interface { Pos() token.Pos }) token.Position { ...@@ -95,11 +95,11 @@ func pos(x interface { Pos() token.Pos }) token.Position {
// get type name in context of neo package // get type name in context of neo package
var zodbPkg *types.Package var zodbPkg *types.Package
var neoPkg *types.Package var protoPkg *types.Package
func typeName(typ types.Type) string { func typeName(typ types.Type) string {
qf := func(pkg *types.Package) string { qf := func(pkg *types.Package) string {
switch pkg { switch pkg {
case neoPkg: case protoPkg:
// same package - unqualified // same package - unqualified
return "" return ""
...@@ -247,11 +247,11 @@ func main() { ...@@ -247,11 +247,11 @@ func main() {
log.SetFlags(0) log.SetFlags(0)
// go through proto.go and AST'ify & typecheck it // go through proto.go and AST'ify & typecheck it
zodbPkg = loadPkg("lab.nexedi.com/kirr/neo/go/zodb", "../zodb/zodb.go") zodbPkg = loadPkg("lab.nexedi.com/kirr/neo/go/zodb", "../../zodb/zodb.go")
neoPkg = loadPkg("lab.nexedi.com/kirr/neo/go/neo", "proto.go", "packed.go") protoPkg = loadPkg("lab.nexedi.com/kirr/neo/go/neo/proto", "proto.go")
// extract neo.customCodec // extract neo.customCodec
cc := neoPkg.Scope().Lookup("customCodec") cc := protoPkg.Scope().Lookup("customCodec")
if cc == nil { if cc == nil {
log.Fatal("cannot find `customCodec`") log.Fatal("cannot find `customCodec`")
} }
...@@ -285,8 +285,8 @@ func main() { ...@@ -285,8 +285,8 @@ func main() {
buf := Buffer{} buf := Buffer{}
buf.emit(`// Code generated by protogen.go; DO NOT EDIT. buf.emit(`// Code generated by protogen.go; DO NOT EDIT.
package neo package proto
// protocol messages to/from wire marshalling. // NEO. protocol messages to/from wire marshalling.
import ( import (
"encoding/binary" "encoding/binary"
...@@ -350,7 +350,7 @@ import ( ...@@ -350,7 +350,7 @@ import (
fmt.Fprintf(&buf, "// %s. %s\n\n", msgCode, typename) fmt.Fprintf(&buf, "// %s. %s\n\n", msgCode, typename)
buf.emit("func (*%s) neoMsgCode() uint16 {", typename) buf.emit("func (*%s) NEOMsgCode() uint16 {", typename)
buf.emit("return %s", msgCode) buf.emit("return %s", msgCode)
buf.emit("}\n") buf.emit("}\n")
...@@ -629,7 +629,7 @@ type sizer struct { ...@@ -629,7 +629,7 @@ type sizer struct {
// //
// when type is recursively walked, for every case code to update `data[n:]` is generated. // when type is recursively walked, for every case code to update `data[n:]` is generated.
// no overflow checks are generated as by neo.Msg interface provided data // no overflow checks are generated as by neo.Msg interface provided data
// buffer should have at least payloadLen length returned by neoMsgEncodedInfo() // buffer should have at least payloadLen length returned by NEOMsgEncodedLen()
// (the size computed by sizer). // (the size computed by sizer).
// //
// the code emitted looks like: // the code emitted looks like:
...@@ -638,7 +638,7 @@ type sizer struct { ...@@ -638,7 +638,7 @@ type sizer struct {
// encode<typ2>(data[n2:], path2) // encode<typ2>(data[n2:], path2)
// ... // ...
// //
// TODO encode have to care in neoMsgEncode to emit preamble such that bound // TODO encode have to care in NEOMsgEncode to emit preamble such that bound
// checking is performed only once (currently compiler emits many of them) // checking is performed only once (currently compiler emits many of them)
type encoder struct { type encoder struct {
commonCodeGen commonCodeGen
...@@ -686,7 +686,7 @@ var _ CodeGenerator = (*decoder)(nil) ...@@ -686,7 +686,7 @@ var _ CodeGenerator = (*decoder)(nil)
func (s *sizer) generatedCode() string { func (s *sizer) generatedCode() string {
code := Buffer{} code := Buffer{}
// prologue // prologue
code.emit("func (%s *%s) neoMsgEncodedLen() int {", s.recvName, s.typeName) code.emit("func (%s *%s) NEOMsgEncodedLen() int {", s.recvName, s.typeName)
if s.varUsed["size"] { if s.varUsed["size"] {
code.emit("var %s int", s.var_("size")) code.emit("var %s int", s.var_("size"))
} }
...@@ -707,7 +707,7 @@ func (s *sizer) generatedCode() string { ...@@ -707,7 +707,7 @@ func (s *sizer) generatedCode() string {
func (e *encoder) generatedCode() string { func (e *encoder) generatedCode() string {
code := Buffer{} code := Buffer{}
// prologue // prologue
code.emit("func (%s *%s) neoMsgEncode(data []byte) {", e.recvName, e.typeName) code.emit("func (%s *%s) NEOMsgEncode(data []byte) {", e.recvName, e.typeName)
code.Write(e.buf.Bytes()) code.Write(e.buf.Bytes())
...@@ -819,7 +819,7 @@ func (d *decoder) generatedCode() string { ...@@ -819,7 +819,7 @@ func (d *decoder) generatedCode() string {
code := Buffer{} code := Buffer{}
// prologue // prologue
code.emit("func (%s *%s) neoMsgDecode(data []byte) (int, error) {", d.recvName, d.typeName) code.emit("func (%s *%s) NEOMsgDecode(data []byte) (int, error) {", d.recvName, d.typeName)
if d.varUsed["nread"] { if d.varUsed["nread"] {
code.emit("var %v uint64", d.var_("nread")) code.emit("var %v uint64", d.var_("nread"))
} }
......
...@@ -65,7 +65,7 @@ _['NotifyClusterInformation'] = 'NotifyClusterState' ...@@ -65,7 +65,7 @@ _['NotifyClusterInformation'] = 'NotifyClusterState'
def main(): def main():
pyprotog = {} pyprotog = {}
execfile('../../neo/lib/protocol.py', pyprotog) execfile('../../../neo/lib/protocol.py', pyprotog)
pypacket = pyprotog['Packet'] pypacket = pyprotog['Packet']
pypackets = pyprotog['Packets'] pypackets = pyprotog['Packets']
...@@ -74,7 +74,7 @@ def main(): ...@@ -74,7 +74,7 @@ def main():
def emit(v): def emit(v):
print >>f, v print >>f, v
emit("// Code generated by %s; DO NOT EDIT." % __file__) emit("// Code generated by %s; DO NOT EDIT." % __file__)
emit("package neo") emit("package proto")
emit("\nvar pyMsgRegistry = map[uint16]string{") emit("\nvar pyMsgRegistry = map[uint16]string{")
......
// Code generated by "stringer -output zproto-str.go -type ErrorCode,ClusterState,NodeType,NodeState,CellState proto.go packed.go"; DO NOT EDIT. // Code generated by "stringer -output zproto-str.go -type ErrorCode,ClusterState,NodeType,NodeState,CellState proto.go"; DO NOT EDIT.
package neo package proto
import "fmt" import "fmt"
......
// Code generated by ./py/pyneo-gen-testdata; DO NOT EDIT. // Code generated by ./py/pyneo-gen-testdata; DO NOT EDIT.
package neo package proto
var pyMsgRegistry = map[uint16]string{ var pyMsgRegistry = map[uint16]string{
1: "RequestIdentification", 1: "RequestIdentification",
......
// Code generated by lab.nexedi.com/kirr/go123/tracing/cmd/gotrace; DO NOT EDIT.
package proto
// code generated for tracepoints
import (
"lab.nexedi.com/kirr/go123/tracing"
"unsafe"
)
// traceevent: traceClusterStateChanged(cs *ClusterState)
type _t_traceClusterStateChanged struct {
tracing.Probe
probefunc func(cs *ClusterState)
}
var _traceClusterStateChanged *_t_traceClusterStateChanged
func traceClusterStateChanged(cs *ClusterState) {
if _traceClusterStateChanged != nil {
_traceClusterStateChanged_run(cs)
}
}
func _traceClusterStateChanged_run(cs *ClusterState) {
for p := _traceClusterStateChanged; p != nil; p = (*_t_traceClusterStateChanged)(unsafe.Pointer(p.Next())) {
p.probefunc(cs)
}
}
func traceClusterStateChanged_Attach(pg *tracing.ProbeGroup, probe func(cs *ClusterState)) *tracing.Probe {
p := _t_traceClusterStateChanged{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceClusterStateChanged)), &p.Probe)
return &p.Probe
}
// trace export signature
func _trace_exporthash_20c3e52fbfabe08e304139ab4a6bbf7c569f0994() {}
This diff is collapsed.
This diff is collapsed.
...@@ -29,6 +29,7 @@ import ( ...@@ -29,6 +29,7 @@ import (
"sync" "sync"
"lab.nexedi.com/kirr/neo/go/neo" "lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/xcommon/log" "lab.nexedi.com/kirr/neo/go/xcommon/log"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
...@@ -77,7 +78,7 @@ func Serve(ctx context.Context, l *neo.Listener, srv Server) error { ...@@ -77,7 +78,7 @@ func Serve(ctx context.Context, l *neo.Listener, srv Server) error {
// IdentifyPeer identifies peer on the link // IdentifyPeer identifies peer on the link
// it expects peer to send RequestIdentification packet and replies with AcceptIdentification if identification passes. // it expects peer to send RequestIdentification packet and replies with AcceptIdentification if identification passes.
// returns information about identified node or error. // returns information about identified node or error.
func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType neo.NodeType) (nodeInfo neo.RequestIdentification, err error) { func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType proto.NodeType) (nodeInfo proto.RequestIdentification, err error) {
defer xerr.Contextf(&err, "%s: identify", link) defer xerr.Contextf(&err, "%s: identify", link)
// the first conn must come with RequestIdentification packet // the first conn must come with RequestIdentification packet
...@@ -93,7 +94,7 @@ func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType neo.NodeTy ...@@ -93,7 +94,7 @@ func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType neo.NodeTy
} }
}() }()
req := neo.RequestIdentification{} req := proto.RequestIdentification{}
_, err = conn.Expect(&req) _, err = conn.Expect(&req)
if err != nil { if err != nil {
return nodeInfo, err return nodeInfo, err
...@@ -103,7 +104,7 @@ func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType neo.NodeTy ...@@ -103,7 +104,7 @@ func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType neo.NodeTy
// TODO hook here in logic to check identification request, assign nodeID etc // TODO hook here in logic to check identification request, assign nodeID etc
err = conn.Send(&neo.AcceptIdentification{ err = conn.Send(&proto.AcceptIdentification{
NodeType: myNodeType, NodeType: myNodeType,
MyUUID: 0, // XXX MyUUID: 0, // XXX
NumPartitions: 1, // XXX NumPartitions: 1, // XXX
...@@ -124,7 +125,7 @@ func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType neo.NodeTy ...@@ -124,7 +125,7 @@ func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType neo.NodeTy
// event: node connects // event: node connects
type nodeCome struct { type nodeCome struct {
req *neo.Request req *neo.Request
idReq *neo.RequestIdentification // we received this identification request idReq *proto.RequestIdentification // we received this identification request
} }
/* /*
...@@ -137,7 +138,7 @@ type nodeLeave struct { ...@@ -137,7 +138,7 @@ type nodeLeave struct {
// reject sends rejective identification response and closes associated link // reject sends rejective identification response and closes associated link
func reject(ctx context.Context, req *neo.Request, resp neo.Msg) { func reject(ctx context.Context, req *neo.Request, resp proto.Msg) {
// XXX cancel on ctx? // XXX cancel on ctx?
// log.Info(ctx, "identification rejected") ? // log.Info(ctx, "identification rejected") ?
err1 := req.Reply(resp) err1 := req.Reply(resp)
...@@ -149,7 +150,7 @@ func reject(ctx context.Context, req *neo.Request, resp neo.Msg) { ...@@ -149,7 +150,7 @@ func reject(ctx context.Context, req *neo.Request, resp neo.Msg) {
} }
// goreject spawns reject in separate goroutine properly added/done on wg // goreject spawns reject in separate goroutine properly added/done on wg
func goreject(ctx context.Context, wg *sync.WaitGroup, req *neo.Request, resp neo.Msg) { func goreject(ctx context.Context, wg *sync.WaitGroup, req *neo.Request, resp proto.Msg) {
wg.Add(1) wg.Add(1)
defer wg.Done() defer wg.Done()
go reject(ctx, req, resp) go reject(ctx, req, resp)
...@@ -157,7 +158,7 @@ func goreject(ctx context.Context, wg *sync.WaitGroup, req *neo.Request, resp ne ...@@ -157,7 +158,7 @@ func goreject(ctx context.Context, wg *sync.WaitGroup, req *neo.Request, resp ne
// accept replies with acceptive identification response // accept replies with acceptive identification response
// XXX spawn ping goroutine from here? // XXX spawn ping goroutine from here?
func accept(ctx context.Context, req *neo.Request, resp neo.Msg) error { func accept(ctx context.Context, req *neo.Request, resp proto.Msg) error {
// XXX cancel on ctx // XXX cancel on ctx
err1 := req.Reply(resp) err1 := req.Reply(resp)
return err1 // XXX while trying to work on single conn return err1 // XXX while trying to work on single conn
......
...@@ -31,6 +31,7 @@ import ( ...@@ -31,6 +31,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"lab.nexedi.com/kirr/neo/go/neo" "lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/internal/common" "lab.nexedi.com/kirr/neo/go/neo/internal/common"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1" "lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
...@@ -78,7 +79,7 @@ type Storage struct { ...@@ -78,7 +79,7 @@ type Storage struct {
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, zstor *fs1.FileStorage) *Storage { func NewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, zstor *fs1.FileStorage) *Storage {
stor := &Storage{ stor := &Storage{
node: neo.NewNodeApp(net, neo.STORAGE, clusterName, masterAddr, serveAddr), node: neo.NewNodeApp(net, proto.STORAGE, clusterName, masterAddr, serveAddr),
zstor: zstor, zstor: zstor,
} }
...@@ -198,7 +199,7 @@ func (stor *Storage) talkMaster(ctx context.Context) (err error) { ...@@ -198,7 +199,7 @@ func (stor *Storage) talkMaster(ctx context.Context) (err error) {
// XXX distinguish between temporary problems and non-temporary ones? // XXX distinguish between temporary problems and non-temporary ones?
func (stor *Storage) talkMaster1(ctx context.Context) (err error) { func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// XXX dup in Client.talkMaster1 ? // XXX dup in Client.talkMaster1 ?
mlink, accept, err := stor.node.Dial(ctx, neo.MASTER, stor.node.MasterAddr) mlink, accept, err := stor.node.Dial(ctx, proto.MASTER, stor.node.MasterAddr)
if err != nil { if err != nil {
return err return err
} }
...@@ -274,49 +275,49 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neo.Request) error { ...@@ -274,49 +275,49 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neo.Request) error {
default: default:
return fmt.Errorf("unexpected message: %T", msg) return fmt.Errorf("unexpected message: %T", msg)
case *neo.StartOperation: case *proto.StartOperation:
// ok, transition to serve // ok, transition to serve
return cmdStart return cmdStart
case *neo.Recovery: case *proto.Recovery:
err = req.Reply(&neo.AnswerRecovery{ err = req.Reply(&proto.AnswerRecovery{
PTid: stor.node.PartTab.PTid, PTid: stor.node.PartTab.PTid,
BackupTid: neo.INVALID_TID, BackupTid: proto.INVALID_TID,
TruncateTid: neo.INVALID_TID}) TruncateTid: proto.INVALID_TID})
case *neo.AskPartitionTable: case *proto.AskPartitionTable:
// TODO initially read PT from disk // TODO initially read PT from disk
err = req.Reply(&neo.AnswerPartitionTable{ err = req.Reply(&proto.AnswerPartitionTable{
PTid: stor.node.PartTab.PTid, PTid: stor.node.PartTab.PTid,
RowList: stor.node.PartTab.Dump()}) RowList: stor.node.PartTab.Dump()})
case *neo.LockedTransactions: case *proto.LockedTransactions:
// XXX r/o stub // XXX r/o stub
err = req.Reply(&neo.AnswerLockedTransactions{}) err = req.Reply(&proto.AnswerLockedTransactions{})
// TODO AskUnfinishedTransactions // TODO AskUnfinishedTransactions
case *neo.LastIDs: case *proto.LastIDs:
lastTid, zerr1 := stor.zstor.LastTid(ctx) lastTid, zerr1 := stor.zstor.LastTid(ctx)
lastOid, zerr2 := stor.zstor.LastOid(ctx) lastOid, zerr2 := stor.zstor.LastOid(ctx)
if zerr := xerr.First(zerr1, zerr2); zerr != nil { if zerr := xerr.First(zerr1, zerr2); zerr != nil {
return zerr // XXX send the error to M return zerr // XXX send the error to M
} }
err = req.Reply(&neo.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid}) err = req.Reply(&proto.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid})
case *neo.SendPartitionTable: case *proto.SendPartitionTable:
// TODO M sends us whole PT -> save locally // TODO M sends us whole PT -> save locally
stor.node.UpdatePartTab(ctx, msg) // XXX lock? stor.node.UpdatePartTab(ctx, msg) // XXX lock?
case *neo.NotifyPartitionChanges: case *proto.NotifyPartitionChanges:
// TODO M sends us δPT -> save locally? // TODO M sends us δPT -> save locally?
case *neo.NotifyNodeInformation: case *proto.NotifyNodeInformation:
// XXX check for myUUID and consider it a command (like neo/py) does? // XXX check for myUUID and consider it a command (like neo/py) does?
stor.node.UpdateNodeTab(ctx, msg) // XXX lock? stor.node.UpdateNodeTab(ctx, msg) // XXX lock?
case *neo.NotifyClusterState: case *proto.NotifyClusterState:
stor.node.UpdateClusterState(ctx, msg) // XXX lock? what to do with it? stor.node.UpdateClusterState(ctx, msg) // XXX lock? what to do with it?
} }
...@@ -347,7 +348,7 @@ func (stor *Storage) m1serve(ctx context.Context, reqStart *neo.Request) (err er ...@@ -347,7 +348,7 @@ func (stor *Storage) m1serve(ctx context.Context, reqStart *neo.Request) (err er
// reply M we are ready // reply M we are ready
// XXX according to current neo/py this is separate send - not reply - and so we do here // XXX according to current neo/py this is separate send - not reply - and so we do here
err = reqStart.Reply(&neo.NotifyReady{}) err = reqStart.Reply(&proto.NotifyReady{})
reqStart.Close() reqStart.Close()
if err != nil { if err != nil {
return err return err
...@@ -373,16 +374,16 @@ func (stor *Storage) m1serve1(ctx context.Context, req neo.Request) error { ...@@ -373,16 +374,16 @@ func (stor *Storage) m1serve1(ctx context.Context, req neo.Request) error {
default: default:
return fmt.Errorf("unexpected message: %T", msg) return fmt.Errorf("unexpected message: %T", msg)
case *neo.StopOperation: case *proto.StopOperation:
return fmt.Errorf("stop requested") return fmt.Errorf("stop requested")
// XXX SendPartitionTable? // XXX SendPartitionTable?
// XXX NotifyPartitionChanges? // XXX NotifyPartitionChanges?
case *neo.NotifyNodeInformation: case *proto.NotifyNodeInformation:
stor.node.UpdateNodeTab(ctx, msg) // XXX lock? stor.node.UpdateNodeTab(ctx, msg) // XXX lock?
case *neo.NotifyClusterState: case *proto.NotifyClusterState:
stor.node.UpdateClusterState(ctx, msg) // XXX lock? what to do with it? stor.node.UpdateClusterState(ctx, msg) // XXX lock? what to do with it?
// TODO commit related messages // TODO commit related messages
...@@ -394,13 +395,13 @@ func (stor *Storage) m1serve1(ctx context.Context, req neo.Request) error { ...@@ -394,13 +395,13 @@ func (stor *Storage) m1serve1(ctx context.Context, req neo.Request) error {
// --- serve incoming connections from other nodes --- // --- serve incoming connections from other nodes ---
// identify processes identification request from connected peer. // identify processes identification request from connected peer.
func (stor *Storage) identify(idReq *neo.RequestIdentification) (neo.Msg, bool) { func (stor *Storage) identify(idReq *proto.RequestIdentification) (proto.Msg, bool) {
// XXX stub: we accept clients and don't care about their UUID // XXX stub: we accept clients and don't care about their UUID
if idReq.NodeType != neo.CLIENT { if idReq.NodeType != proto.CLIENT {
return &neo.Error{neo.PROTOCOL_ERROR, "only clients are accepted"}, false return &proto.Error{proto.PROTOCOL_ERROR, "only clients are accepted"}, false
} }
if idReq.ClusterName != stor.node.ClusterName { if idReq.ClusterName != stor.node.ClusterName {
return &neo.Error{neo.PROTOCOL_ERROR, "cluster name mismatch"}, false return &proto.Error{proto.PROTOCOL_ERROR, "cluster name mismatch"}, false
} }
// check operational // check operational
...@@ -409,10 +410,10 @@ func (stor *Storage) identify(idReq *neo.RequestIdentification) (neo.Msg, bool) ...@@ -409,10 +410,10 @@ func (stor *Storage) identify(idReq *neo.RequestIdentification) (neo.Msg, bool)
stor.opMu.Unlock() stor.opMu.Unlock()
if !operational { if !operational {
return &neo.Error{neo.NOT_READY, "cluster not operational"}, false return &proto.Error{proto.NOT_READY, "cluster not operational"}, false
} }
return &neo.AcceptIdentification{ return &proto.AcceptIdentification{
NodeType: stor.node.MyInfo.Type, NodeType: stor.node.MyInfo.Type,
MyUUID: stor.node.MyInfo.UUID, // XXX lock wrt update MyUUID: stor.node.MyInfo.UUID, // XXX lock wrt update
NumPartitions: 1, // XXX NumPartitions: 1, // XXX
...@@ -435,7 +436,7 @@ func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context, ...@@ -435,7 +436,7 @@ func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context,
// serveLink serves incoming node-node link connection // serveLink serves incoming node-node link connection
func (stor *Storage) serveLink(ctx context.Context, req *neo.Request, idReq *neo.RequestIdentification) (err error) { func (stor *Storage) serveLink(ctx context.Context, req *neo.Request, idReq *proto.RequestIdentification) (err error) {
link := req.Link() link := req.Link()
defer task.Runningf(&ctx, "serve %s", link)(&err) defer task.Runningf(&ctx, "serve %s", link)(&err)
defer xio.CloseWhenDone(ctx, link)() defer xio.CloseWhenDone(ctx, link)()
...@@ -507,7 +508,7 @@ func (stor *Storage) serveClient(ctx context.Context, req neo.Request) { ...@@ -507,7 +508,7 @@ func (stor *Storage) serveClient(ctx context.Context, req neo.Request) {
// XXX hack -> resp.Release() // XXX hack -> resp.Release()
// XXX req.Msg release too? // XXX req.Msg release too?
if resp, ok := resp.(*neo.AnswerObject); ok { if resp, ok := resp.(*proto.AnswerObject); ok {
resp.Data.Release() resp.Data.Release()
} }
...@@ -547,11 +548,11 @@ func sha1Sum(b []byte) [sha1.Size]byte { ...@@ -547,11 +548,11 @@ func sha1Sum(b []byte) [sha1.Size]byte {
} }
// serveClient1 prepares response for 1 request from client // serveClient1 prepares response for 1 request from client
func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Msg) { func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Msg) {
switch req := req.(type) { switch req := req.(type) {
case *neo.GetObject: case *proto.GetObject:
xid := zodb.Xid{Oid: req.Oid} xid := zodb.Xid{Oid: req.Oid}
if req.Serial != neo.INVALID_TID { if req.Serial != proto.INVALID_TID {
xid.At = req.Serial xid.At = req.Serial
} else { } else {
xid.At = common.Before2At(req.Tid) xid.At = common.Before2At(req.Tid)
...@@ -562,15 +563,15 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms ...@@ -562,15 +563,15 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms
if err != nil { if err != nil {
// translate err to NEO protocol error codes // translate err to NEO protocol error codes
e := err.(*zodb.OpError) // XXX move this to ErrEncode? e := err.(*zodb.OpError) // XXX move this to ErrEncode?
return neo.ErrEncode(e.Err) return proto.ErrEncode(e.Err)
} }
// compatibility with py side: // compatibility with py side:
// for loadSerial - check we have exact hit - else "nodata" // for loadSerial - check we have exact hit - else "nodata"
if req.Serial != neo.INVALID_TID { if req.Serial != proto.INVALID_TID {
if serial != req.Serial { if serial != req.Serial {
return &neo.Error{ return &proto.Error{
Code: neo.OID_NOT_FOUND, Code: proto.OID_NOT_FOUND,
Message: fmt.Sprintf("%s: no data with serial %s", xid.Oid, req.Serial), Message: fmt.Sprintf("%s: no data with serial %s", xid.Oid, req.Serial),
} }
} }
...@@ -578,10 +579,10 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms ...@@ -578,10 +579,10 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms
// no next serial -> None // no next serial -> None
if nextSerial == zodb.TidMax { if nextSerial == zodb.TidMax {
nextSerial = neo.INVALID_TID nextSerial = proto.INVALID_TID
} }
return &neo.AnswerObject{ return &proto.AnswerObject{
Oid: xid.Oid, Oid: xid.Oid,
Serial: serial, Serial: serial,
NextSerial: nextSerial, NextSerial: nextSerial,
...@@ -594,19 +595,19 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms ...@@ -594,19 +595,19 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms
// XXX .DataSerial // XXX .DataSerial
} }
case *neo.LastTransaction: case *proto.LastTransaction:
lastTid, err := stor.zstor.LastTid(ctx) lastTid, err := stor.zstor.LastTid(ctx)
if err != nil { if err != nil {
return neo.ErrEncode(err) return proto.ErrEncode(err)
} }
return &neo.AnswerLastTransaction{lastTid} return &proto.AnswerLastTransaction{lastTid}
//case *ObjectHistory: //case *ObjectHistory:
//case *StoreObject: //case *StoreObject:
default: default:
return &neo.Error{neo.PROTOCOL_ERROR, fmt.Sprintf("unexpected message %T", req)} return &proto.Error{proto.PROTOCOL_ERROR, fmt.Sprintf("unexpected message %T", req)}
} }
//req.Put(...) //req.Put(...)
......
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment