Commit 505cf34b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3689c816
...@@ -15,8 +15,8 @@ ...@@ -15,8 +15,8 @@
// //
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
package neo // Package client provides ZODB interface for accessing NEO cluster
// client node package client
import ( import (
"context" "context"
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package neo package server
// master node // master node
import ( import (
...@@ -530,7 +530,7 @@ func (m *Master) service(ctx context.Context) (err error) { ...@@ -530,7 +530,7 @@ func (m *Master) service(ctx context.Context) (err error) {
fmt.Println("master: service") fmt.Println("master: service")
defer xerr.Context(&err, "master: service") defer xerr.Context(&err, "master: service")
m.setClusterState(ClusterRunning) m.setClusterState(neo.ClusterRunning)
loop: loop:
for { for {
...@@ -580,7 +580,7 @@ func (m *Master) accept(n nodeCome) (node *neo.Node, ok bool) { ...@@ -580,7 +580,7 @@ func (m *Master) accept(n nodeCome) (node *neo.Node, ok bool) {
// - IdTimestamp ? // - IdTimestamp ?
if n.idReq.ClusterName != m.clusterName { if n.idReq.ClusterName != m.clusterName {
n.idResp <- &Error{PROTOCOL_ERROR, "cluster name mismatch"} // XXX n.idResp <- &neo.Error{neo.PROTOCOL_ERROR, "cluster name mismatch"} // XXX
return nil, false return nil, false
} }
...@@ -596,20 +596,20 @@ func (m *Master) accept(n nodeCome) (node *neo.Node, ok bool) { ...@@ -596,20 +596,20 @@ func (m *Master) accept(n nodeCome) (node *neo.Node, ok bool) {
if node != nil { if node != nil {
// reject - uuid is already occupied by someone else // reject - uuid is already occupied by someone else
// XXX check also for down state - it could be the same node reconnecting // XXX check also for down state - it could be the same node reconnecting
n.idResp <- &Error{PROTOCOL_ERROR, "uuid %v already used by another node"} // XXX n.idResp <- &neo.Error{neo.PROTOCOL_ERROR, "uuid %v already used by another node"} // XXX
return nil, false return nil, false
} }
// XXX accept only certain kind of nodes depending on .clusterState, e.g. // XXX accept only certain kind of nodes depending on .clusterState, e.g.
switch nodeType { switch nodeType {
case CLIENT: case neo.CLIENT:
n.idResp <- &Error{NOT_READY, "cluster not operational"} n.idResp <- &neo.Error{neo.NOT_READY, "cluster not operational"}
// XXX ... // XXX ...
} }
n.idResp <- &AcceptIdentification{ n.idResp <- &neo.AcceptIdentification{
NodeType: neo.MASTER, NodeType: neo.MASTER,
MyNodeUUID: m.nodeUUID, MyNodeUUID: m.nodeUUID,
NumPartitions: 1, // FIXME hardcoded NumPartitions: 1, // FIXME hardcoded
...@@ -618,17 +618,17 @@ func (m *Master) accept(n nodeCome) (node *neo.Node, ok bool) { ...@@ -618,17 +618,17 @@ func (m *Master) accept(n nodeCome) (node *neo.Node, ok bool) {
} }
// update nodeTab // update nodeTab
var nodeState NodeState var nodeState neo.NodeState
switch nodeType { switch nodeType {
case STORAGE: case neo.STORAGE:
// FIXME py sets to RUNNING/PENDING depending on cluster state // FIXME py sets to RUNNING/PENDING depending on cluster state
nodeState = PENDING nodeState = neo.PENDING
default: default:
nodeState = RUNNING nodeState = neo.RUNNING
} }
nodeInfo := NodeInfo{ nodeInfo := neo.NodeInfo{
NodeType: nodeType, NodeType: nodeType,
Address: n.idReq.Address, Address: n.idReq.Address,
NodeUUID: uuid, NodeUUID: uuid,
...@@ -649,7 +649,7 @@ func (m *Master) allocUUID(nodeType neo.NodeType) neo.NodeUUID { ...@@ -649,7 +649,7 @@ func (m *Master) allocUUID(nodeType neo.NodeType) neo.NodeUUID {
// XXX but since whole uuid assign idea is not good - let's keep it dirty here // XXX but since whole uuid assign idea is not good - let's keep it dirty here
typ := int(nodeType & 7) << (24 + 4) // note temp=0 typ := int(nodeType & 7) << (24 + 4) // note temp=0
for num := 1; num < 1<<24; num++ { for num := 1; num < 1<<24; num++ {
uuid := NodeUUID(typ | num) uuid := neo.NodeUUID(typ | num)
if m.nodeTab.Get(uuid) == nil { if m.nodeTab.Get(uuid) == nil {
return uuid return uuid
} }
...@@ -692,12 +692,12 @@ func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) { ...@@ -692,12 +692,12 @@ func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) {
return return
} }
idReq := RequestIdentification{} idReq := neo.RequestIdentification{}
err = Expect(conn, &idReq) err = neo.Expect(conn, &idReq)
if err != nil { if err != nil {
logf("identify: %v", err) logf("identify: %v", err)
// XXX ok to let peer know error as is? e.g. even IO error on Recv? // XXX ok to let peer know error as is? e.g. even IO error on Recv?
err = EncodeAndSend(conn, &Error{PROTOCOL_ERROR, err.Error()}) err = neo.EncodeAndSend(conn, &neo.Error{neo.PROTOCOL_ERROR, err.Error()})
if err != nil { if err != nil {
logf("failed to send error: %v", err) logf("failed to send error: %v", err)
} }
...@@ -718,7 +718,7 @@ func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) { ...@@ -718,7 +718,7 @@ func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) {
} }
// let the peer know identification result // let the peer know identification result
err = EncodeAndSend(conn, idResp) err = neo.EncodeAndSend(conn, idResp)
if err != nil { if err != nil {
return return
} }
...@@ -771,7 +771,7 @@ func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) { ...@@ -771,7 +771,7 @@ func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) {
return return
case nodeUpdateV := <-nodeCh: case nodeUpdateV := <-nodeCh:
pkt = &NotifyNodeInformation{ pkt = &neo.NotifyNodeInformation{
IdTimestamp: math.NaN(), // XXX IdTimestamp: math.NaN(), // XXX
NodeList: nodeUpdateV, NodeList: nodeUpdateV,
} }
...@@ -780,7 +780,7 @@ func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) { ...@@ -780,7 +780,7 @@ func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) {
// changed = true // changed = true
} }
err = EncodeAndSend(connNotify, pkt) err = neo.EncodeAndSend(connNotify, pkt)
if err != nil { if err != nil {
// XXX err // XXX err
} }
...@@ -863,7 +863,7 @@ func (m *Master) DriveStorage(ctx context.Context, link *neo.NodeLink) { ...@@ -863,7 +863,7 @@ func (m *Master) DriveStorage(ctx context.Context, link *neo.NodeLink) {
// // XXX timeout ? // // XXX timeout ?
// // XXX -> chat2 ? // // XXX -> chat2 ?
// err = EncodeAndSend(conn, &StartOperation{Backup: false /* XXX hardcoded */}) // err = neo.EncodeAndSend(conn, &StartOperation{Backup: false /* XXX hardcoded */})
// if err != nil { // if err != nil {
// // XXX err // // XXX err
// } // }
...@@ -1004,7 +1004,7 @@ func masterMain(argv []string) { ...@@ -1004,7 +1004,7 @@ func masterMain(argv []string) {
}() }()
*/ */
net := NetPlain("tcp") // TODO + TLS; not only "tcp" ? net := neo.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
err := ListenAndServe(ctx, net, bind, masterSrv) err := ListenAndServe(ctx, net, bind, masterSrv)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
......
...@@ -15,14 +15,13 @@ ...@@ -15,14 +15,13 @@
// //
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
package neo package server
// common parts for organizing network servers // common parts for organizing network servers
import ( import (
"context" "context"
"fmt" "fmt"
"net" "net"
"reflect"
"../../neo" "../../neo"
...@@ -69,7 +68,7 @@ func Serve(ctx context.Context, l net.Listener, srv Server) error { ...@@ -69,7 +68,7 @@ func Serve(ctx context.Context, l net.Listener, srv Server) error {
} }
go func() { go func() {
link, err := Handshake(ctx, peerConn, LinkServer) link, err := neo.Handshake(ctx, peerConn, neo.LinkServer)
if err != nil { if err != nil {
fmt.Printf("xxx: %s\n", err) fmt.Printf("xxx: %s\n", err)
return return
...@@ -113,8 +112,8 @@ func IdentifyPeer(link *neo.NodeLink, myNodeType neo.NodeType) (nodeInfo neo.Req ...@@ -113,8 +112,8 @@ func IdentifyPeer(link *neo.NodeLink, myNodeType neo.NodeType) (nodeInfo neo.Req
} }
}() }()
req := RequestIdentification{} req := neo.RequestIdentification{}
err = Expect(conn, &req) err = neo.Expect(conn, &req)
if err != nil { if err != nil {
return nodeInfo, err return nodeInfo, err
} }
...@@ -123,7 +122,7 @@ func IdentifyPeer(link *neo.NodeLink, myNodeType neo.NodeType) (nodeInfo neo.Req ...@@ -123,7 +122,7 @@ func IdentifyPeer(link *neo.NodeLink, myNodeType neo.NodeType) (nodeInfo neo.Req
// TODO hook here in logic to check identification request, assign nodeID etc // TODO hook here in logic to check identification request, assign nodeID etc
err = EncodeAndSend(conn, &AcceptIdentification{ err = neo.EncodeAndSend(conn, &neo.AcceptIdentification{
NodeType: myNodeType, NodeType: myNodeType,
MyNodeUUID: 0, // XXX MyNodeUUID: 0, // XXX
NumPartitions: 1, // XXX NumPartitions: 1, // XXX
...@@ -156,7 +155,7 @@ func IdentifyWith(expectPeerType neo.NodeType, link *neo.NodeLink, myInfo neo.No ...@@ -156,7 +155,7 @@ func IdentifyWith(expectPeerType neo.NodeType, link *neo.NodeLink, myInfo neo.No
}() }()
accept = &neo.AcceptIdentification{} accept = &neo.AcceptIdentification{}
err = Ask(conn, &RequestIdentification{ err = neo.Ask(conn, &neo.RequestIdentification{
NodeType: myInfo.NodeType, NodeType: myInfo.NodeType,
NodeUUID: myInfo.NodeUUID, NodeUUID: myInfo.NodeUUID,
Address: myInfo.Address, Address: myInfo.Address,
...@@ -174,121 +173,3 @@ func IdentifyWith(expectPeerType neo.NodeType, link *neo.NodeLink, myInfo neo.No ...@@ -174,121 +173,3 @@ func IdentifyWith(expectPeerType neo.NodeType, link *neo.NodeLink, myInfo neo.No
return accept, nil return accept, nil
} }
// ----------------------------------------
// XXX place = ok ? not ok -> move out of here
// XXX naming for RecvAndDecode and EncodeAndSend
// RecvAndDecode receives packet from conn and decodes it
func RecvAndDecode(conn *neo.Conn) (neo.Pkt, error) {
pkt, err := conn.Recv()
if err != nil {
return nil, err
}
// decode packet
pkth := pkt.Header()
msgCode := ntoh16(pkth.MsgCode)
msgType := pktTypeRegistry[msgCode]
if msgType == nil {
err = fmt.Errorf("invalid msgCode (%d)", msgCode)
// XXX -> ProtoError ?
return nil, &ConnError{Conn: conn, Op: "decode", Err: err}
}
// TODO use free-list for decoded packets + when possible decode in-place
pktObj := reflect.New(msgType).Interface().(neo.Pkt)
_, err = pktObj.NEOPktDecode(pkt.Payload())
if err != nil {
// XXX -> ProtoError ?
return nil, &ConnError{Conn: conn, Op: "decode", Err: err}
}
return pktObj, nil
}
// EncodeAndSend encodes pkt and sends it to conn
func EncodeAndSend(conn *neo.Conn, pkt neo.Pkt) error {
l := pkt.NEOPktEncodedLen()
buf := PktBuf{make([]byte, PktHeadLen + l)} // XXX -> freelist
h := buf.Header()
// h.ConnId will be set by conn.Send
h.MsgCode = hton16(pkt.NEOPktMsgCode())
h.MsgLen = hton32(uint32(l)) // XXX casting: think again
pkt.NEOPktEncode(buf.Payload())
return conn.Send(&buf) // XXX why pointer?
}
// Ask does simple request/response protocol exchange
// It expects the answer to be exactly of resp type and errors otherwise
func Ask(conn *neo.Conn, req neo.Pkt, resp neo.Pkt) error {
err := EncodeAndSend(conn, req)
if err != nil {
return err
}
err = Expect(conn, resp)
return err
}
// ProtoError is returned when there waa a protocol error, like receiving
// unexpected packet or packet with wrong header
// XXX -> ConnError{Op: "decode"} ?
type ProtoError struct {
Conn *neo.Conn
Err error
}
func (e *ProtoError) Error() string {
return fmt.Sprintf("%v: %v", e.Conn, e.Err)
}
// Expect receives 1 packet and expects it to be exactly of msg type
// XXX naming (-> Recv1 ?)
func Expect(conn *neo.Conn, msg neo.Pkt) (err error) {
pkt, err := conn.Recv()
if err != nil {
return err
}
// received ok. Now it is all decoding
// XXX dup wrt RecvAndDecode
pkth := pkt.Header()
msgCode := ntoh16(pkth.MsgCode)
if msgCode != msg.NEOPktMsgCode() {
// unexpected Error response
if msgCode == (&Error{}).NEOPktMsgCode() {
errResp := Error{}
_, err = errResp.NEOPktDecode(pkt.Payload())
if err != nil {
return &ProtoError{conn, err}
}
// FIXME clarify error decoding logic:
// - in some cases Error is one of "expected" answers (e.g. Ask(GetObject))
// - in other cases Error is completely not expected
// (e.g. getting 1st packet on connection)
return errDecode(&errResp) // XXX err ctx vs ^^^ errcontextf ?
}
msgType := pktTypeRegistry[msgCode]
if msgType == nil {
return &ProtoError{conn, fmt.Errorf("invalid msgCode (%d)", msgCode)}
}
return &ProtoError{conn, fmt.Errorf("unexpected packet: %v", msgType)}
}
_, err = msg.NEOPktDecode(pkt.Payload())
if err != nil {
return &ProtoError{conn, err}
}
return nil
}
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
// //
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
package neo package server
// storage node // storage node
import ( import (
...@@ -54,13 +54,13 @@ type Storage struct { ...@@ -54,13 +54,13 @@ type Storage struct {
// To actually start running the node - call Run. XXX text // To actually start running the node - call Run. XXX text
func NewStorage(cluster string, masterAddr string, serveAddr string, net neo.Network, zstor zodb.IStorage) *Storage { func NewStorage(cluster string, masterAddr string, serveAddr string, net neo.Network, zstor zodb.IStorage) *Storage {
// convert serveAddr into neo format // convert serveAddr into neo format
addr, err := ParseAddress(serveAddr) addr, err := neo.ParseAddress(serveAddr)
if err != nil { if err != nil {
panic(err) // XXX panic(err) // XXX
} }
stor := &Storage{ stor := &Storage{
myInfo: NodeInfo{NodeType: STORAGE, Address: addr}, myInfo: neo.NodeInfo{NodeType: neo.STORAGE, Address: addr},
clusterName: cluster, clusterName: cluster,
net: net, net: net,
masterAddr: masterAddr, masterAddr: masterAddr,
...@@ -87,7 +87,7 @@ func (stor *Storage) Run(ctx context.Context) error { ...@@ -87,7 +87,7 @@ func (stor *Storage) Run(ctx context.Context) error {
// NOTE listen("tcp", ":1234") gives l.Addr 0.0.0.0:1234 and // NOTE listen("tcp", ":1234") gives l.Addr 0.0.0.0:1234 and
// listen("tcp6", ":1234") gives l.Addr [::]:1234 // listen("tcp6", ":1234") gives l.Addr [::]:1234
// -> host is never empty // -> host is never empty
addr, err := ParseAddress(l.Addr().String()) addr, err := neo.ParseAddress(l.Addr().String())
if err != nil { if err != nil {
// XXX -> panic here ? // XXX -> panic here ?
return err // XXX err ctx return err // XXX err ctx
...@@ -130,7 +130,7 @@ func (stor *Storage) talkMaster(ctx context.Context) { ...@@ -130,7 +130,7 @@ func (stor *Storage) talkMaster(ctx context.Context) {
// it returns error describing why such cycle had to finish // it returns error describing why such cycle had to finish
// XXX distinguish between temporary problems and non-temporary ones? // XXX distinguish between temporary problems and non-temporary ones?
func (stor *Storage) talkMaster1(ctx context.Context) error { func (stor *Storage) talkMaster1(ctx context.Context) error {
Mlink, err := Dial(ctx, stor.net, stor.masterAddr) Mlink, err := neo.Dial(ctx, stor.net, stor.masterAddr)
if err != nil { if err != nil {
return err return err
} }
...@@ -138,7 +138,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) error { ...@@ -138,7 +138,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) error {
// TODO Mlink.Close() on return / cancel // TODO Mlink.Close() on return / cancel
// request identification this way registering our node to master // request identification this way registering our node to master
accept, err := IdentifyWith(MASTER, Mlink, stor.myInfo, stor.clusterName) accept, err := IdentifyWith(neo.MASTER, Mlink, stor.myInfo, stor.clusterName)
if err != nil { if err != nil {
return err return err
} }
...@@ -161,7 +161,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) error { ...@@ -161,7 +161,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) error {
if err != nil { panic(err) } // XXX if err != nil { panic(err) } // XXX
for { for {
notify, err := RecvAndDecode(conn) notify, err := neo.RecvAndDecode(conn)
if err != nil { if err != nil {
// XXX TODO // XXX TODO
} }
...@@ -198,7 +198,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) { ...@@ -198,7 +198,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
}() }()
// XXX recheck identification logic here // XXX recheck identification logic here
nodeInfo, err := IdentifyPeer(link, STORAGE) nodeInfo, err := IdentifyPeer(link, neo.STORAGE)
if err != nil { if err != nil {
fmt.Printf("stor: %v\n", err) fmt.Printf("stor: %v\n", err)
return return
...@@ -206,7 +206,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) { ...@@ -206,7 +206,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
var serveConn func(context.Context, *neo.Conn) var serveConn func(context.Context, *neo.Conn)
switch nodeInfo.NodeType { switch nodeInfo.NodeType {
case CLIENT: case neo.CLIENT:
serveConn = stor.ServeClient serveConn = stor.ServeClient
default: default:
...@@ -231,36 +231,6 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) { ...@@ -231,36 +231,6 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
} }
// XXX move err{Encode,Decode} out of here
// errEncode translates an error into Error packet
func errEncode(err error) *neo.Error {
switch err := err.(type) {
case *Error:
return err
case *zodb.ErrXidMissing:
// XXX abusing message for xid
return &neo.Error{Code: OID_NOT_FOUND, Message: err.Xid.String()}
default:
return &neo.Error{Code: NOT_READY /* XXX how to report 503? was BROKEN_NODE */, Message: err.Error()}
}
}
// errDecode decodes error from Error packet
func errDecode(e *neo.Error) error {
switch e.Code {
case OID_NOT_FOUND:
xid, err := zodb.ParseXid(e.Message) // XXX abusing message for xid
if err == nil {
return &zodb.ErrXidMissing{xid}
}
}
return e
}
func (stor *Storage) ServeMaster(ctx context.Context, conn *neo.Conn) { func (stor *Storage) ServeMaster(ctx context.Context, conn *neo.Conn) {
// state changes: // state changes:
...@@ -297,15 +267,15 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *neo.Conn) { ...@@ -297,15 +267,15 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *neo.Conn) {
}() }()
for { for {
req, err := RecvAndDecode(conn) req, err := neo.RecvAndDecode(conn)
if err != nil { if err != nil {
return // XXX log / err / send error before closing return // XXX log / err / send error before closing
} }
switch req := req.(type) { switch req := req.(type) {
case *GetObject: case *neo.GetObject:
xid := zodb.Xid{Oid: req.Oid} xid := zodb.Xid{Oid: req.Oid}
if req.Serial != INVALID_TID { if req.Serial != neo.INVALID_TID {
xid.Tid = req.Serial xid.Tid = req.Serial
xid.TidBefore = false xid.TidBefore = false
} else { } else {
...@@ -317,9 +287,9 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *neo.Conn) { ...@@ -317,9 +287,9 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *neo.Conn) {
data, tid, err := stor.zstor.Load(xid) data, tid, err := stor.zstor.Load(xid)
if err != nil { if err != nil {
// TODO translate err to NEO protocol error codes // TODO translate err to NEO protocol error codes
reply = errEncode(err) reply = neo.ErrEncode(err)
} else { } else {
reply = &AnswerGetObject{ reply = &neo.AnswerGetObject{
Oid: xid.Oid, Oid: xid.Oid,
Serial: tid, Serial: tid,
...@@ -332,19 +302,19 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *neo.Conn) { ...@@ -332,19 +302,19 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *neo.Conn) {
} }
} }
EncodeAndSend(conn, reply) // XXX err neo.EncodeAndSend(conn, reply) // XXX err
case *LastTransaction: case *neo.LastTransaction:
var reply neo.Pkt var reply neo.Pkt
lastTid, err := stor.zstor.LastTid() lastTid, err := stor.zstor.LastTid()
if err != nil { if err != nil {
reply = errEncode(err) reply = neo.ErrEncode(err)
} else { } else {
reply = &AnswerLastTransaction{lastTid} reply = &neo.AnswerLastTransaction{lastTid}
} }
EncodeAndSend(conn, reply) // XXX err neo.EncodeAndSend(conn, reply) // XXX err
//case *ObjectHistory: //case *ObjectHistory:
//case *StoreObject: //case *StoreObject:
...@@ -411,7 +381,7 @@ func storageMain(argv []string) { ...@@ -411,7 +381,7 @@ func storageMain(argv []string) {
log.Fatal(err) log.Fatal(err)
} }
net := NetPlain("tcp") // TODO + TLS; not only "tcp" ? net := neo.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
storSrv := NewStorage(*cluster, master, *bind, net, zstor) storSrv := NewStorage(*cluster, master, *bind, net, zstor)
......
...@@ -15,8 +15,9 @@ ...@@ -15,8 +15,9 @@
// //
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
package neo package server
// time related utilities // time related utilities
// XXX -> neo ?
import ( import (
"time" "time"
......
package neo
// TODO move me properly -> connection.go
import (
"fmt"
"reflect"
"../zodb"
)
// RecvAndDecode receives packet from conn and decodes it
func RecvAndDecode(conn *Conn) (Pkt, error) {
pkt, err := conn.Recv()
if err != nil {
return nil, err
}
// decode packet
pkth := pkt.Header()
msgCode := ntoh16(pkth.MsgCode)
msgType := pktTypeRegistry[msgCode]
if msgType == nil {
err = fmt.Errorf("invalid msgCode (%d)", msgCode)
// XXX -> ProtoError ?
return nil, &ConnError{Conn: conn, Op: "decode", Err: err}
}
// TODO use free-list for decoded packets + when possible decode in-place
pktObj := reflect.New(msgType).Interface().(Pkt)
_, err = pktObj.NEOPktDecode(pkt.Payload())
if err != nil {
// XXX -> ProtoError ?
return nil, &ConnError{Conn: conn, Op: "decode", Err: err}
}
return pktObj, nil
}
// EncodeAndSend encodes pkt and sends it to conn
func EncodeAndSend(conn *Conn, pkt Pkt) error {
l := pkt.NEOPktEncodedLen()
buf := PktBuf{make([]byte, PktHeadLen + l)} // XXX -> freelist
h := buf.Header()
// h.ConnId will be set by conn.Send
h.MsgCode = hton16(pkt.NEOPktMsgCode())
h.MsgLen = hton32(uint32(l)) // XXX casting: think again
pkt.NEOPktEncode(buf.Payload())
return conn.Send(&buf) // XXX why pointer?
}
// Ask does simple request/response protocol exchange
// It expects the answer to be exactly of resp type and errors otherwise
func Ask(conn *Conn, req Pkt, resp Pkt) error {
err := EncodeAndSend(conn, req)
if err != nil {
return err
}
err = Expect(conn, resp)
return err
}
// ProtoError is returned when there waa a protocol error, like receiving
// unexpected packet or packet with wrong header
// XXX -> ConnError{Op: "decode"} ?
type ProtoError struct {
Conn *Conn
Err error
}
func (e *ProtoError) Error() string {
return fmt.Sprintf("%v: %v", e.Conn, e.Err)
}
// Expect receives 1 packet and expects it to be exactly of msg type
// XXX naming (-> Recv1 ?)
func Expect(conn *Conn, msg Pkt) (err error) {
pkt, err := conn.Recv()
if err != nil {
return err
}
// received ok. Now it is all decoding
// XXX dup wrt RecvAndDecode
pkth := pkt.Header()
msgCode := ntoh16(pkth.MsgCode)
if msgCode != msg.NEOPktMsgCode() {
// unexpected Error response
if msgCode == (&Error{}).NEOPktMsgCode() {
errResp := Error{}
_, err = errResp.NEOPktDecode(pkt.Payload())
if err != nil {
return &ProtoError{conn, err}
}
// FIXME clarify error decoding logic:
// - in some cases Error is one of "expected" answers (e.g. Ask(GetObject))
// - in other cases Error is completely not expected
// (e.g. getting 1st packet on connection)
return ErrDecode(&errResp) // XXX err ctx vs ^^^ errcontextf ?
}
msgType := pktTypeRegistry[msgCode]
if msgType == nil {
return &ProtoError{conn, fmt.Errorf("invalid msgCode (%d)", msgCode)}
}
return &ProtoError{conn, fmt.Errorf("unexpected packet: %v", msgType)}
}
_, err = msg.NEOPktDecode(pkt.Payload())
if err != nil {
return &ProtoError{conn, err}
}
return nil
}
// ------------------------------------------
// XXX vvv place = ok ?
// errEncode translates an error into Error packet
// XXX more text describing relation with zodb errors
func ErrEncode(err error) *Error {
switch err := err.(type) {
case *Error:
return err
case *zodb.ErrXidMissing:
// XXX abusing message for xid
return &Error{Code: OID_NOT_FOUND, Message: err.Xid.String()}
default:
return &Error{Code: NOT_READY /* XXX how to report 503? was BROKEN_NODE */, Message: err.Error()}
}
}
// errDecode decodes error from Error packet
// XXX more text describing relation with zodb errors
func ErrDecode(e *Error) error {
switch e.Code {
case OID_NOT_FOUND:
xid, err := zodb.ParseXid(e.Message) // XXX abusing message for xid
if err == nil {
return &zodb.ErrXidMissing{xid}
}
}
return e
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment