Commit 8dfe0341 authored by Kirill Smelkov's avatar Kirill Smelkov

X neonet: unexport PktBuf

not used outside neonet.
parent 889b8d42
...@@ -120,7 +120,7 @@ type Conn struct { ...@@ -120,7 +120,7 @@ type Conn struct {
link *NodeLink link *NodeLink
connId uint32 connId uint32
rxq chan *PktBuf // received packets for this Conn go here rxq chan *pktBuf // received packets for this Conn go here
rxqWrite atomic32 // 1 while serveRecv is doing `rxq <- ...` rxqWrite atomic32 // 1 while serveRecv is doing `rxq <- ...`
rxqRead atomic32 // +1 while Conn.Recv is doing `... <- rxq` rxqRead atomic32 // +1 while Conn.Recv is doing `... <- rxq`
rxdownFlag atomic32 // 1 when RX is marked no longer operational rxdownFlag atomic32 // 1 when RX is marked no longer operational
...@@ -234,7 +234,7 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink { ...@@ -234,7 +234,7 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
// XXX make it per-link? // XXX make it per-link?
var connPool = sync.Pool{New: func() interface{} { var connPool = sync.Pool{New: func() interface{} {
return &Conn{ return &Conn{
rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf ? rxq: make(chan *pktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf ?
txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
txdown: make(chan struct{}), txdown: make(chan struct{}),
// rxdown: make(chan struct{}), // rxdown: make(chan struct{}),
...@@ -681,7 +681,7 @@ func (c *Conn) errRecvShutdown() error { ...@@ -681,7 +681,7 @@ func (c *Conn) errRecvShutdown() error {
} }
// recvPkt receives raw packet from connection // recvPkt receives raw packet from connection
func (c *Conn) recvPkt() (*PktBuf, error) { func (c *Conn) recvPkt() (*pktBuf, error) {
// semantically equivalent to the following: // semantically equivalent to the following:
// (this is hot path and select is not used for performance reason) // (this is hot path and select is not used for performance reason)
// //
...@@ -693,7 +693,7 @@ func (c *Conn) recvPkt() (*PktBuf, error) { ...@@ -693,7 +693,7 @@ func (c *Conn) recvPkt() (*PktBuf, error) {
// return pkt, nil // return pkt, nil
// } // }
var pkt *PktBuf var pkt *pktBuf
var err error var err error
c.rxqRead.Add(1) c.rxqRead.Add(1)
...@@ -943,7 +943,7 @@ func (link *NodeLink) replyNoConn(connId uint32, errMsg proto.Msg) { ...@@ -943,7 +943,7 @@ func (link *NodeLink) replyNoConn(connId uint32, errMsg proto.Msg) {
// txReq is request to transmit a packet. Result error goes back to errch // txReq is request to transmit a packet. Result error goes back to errch
type txReq struct { type txReq struct {
pkt *PktBuf pkt *pktBuf
errch chan error errch chan error
} }
...@@ -968,12 +968,12 @@ func (c *Conn) errSendShutdown() error { ...@@ -968,12 +968,12 @@ func (c *Conn) errSendShutdown() error {
// sendPkt sends raw packet via connection. // sendPkt sends raw packet via connection.
// //
// on success pkt is freed. // on success pkt is freed.
func (c *Conn) sendPkt(pkt *PktBuf) error { func (c *Conn) sendPkt(pkt *pktBuf) error {
err := c.sendPkt2(pkt) err := c.sendPkt2(pkt)
return c.err("send", err) return c.err("send", err)
} }
func (c *Conn) sendPkt2(pkt *PktBuf) error { func (c *Conn) sendPkt2(pkt *pktBuf) error {
// connId must be set to one associated with this connection // connId must be set to one associated with this connection
if pkt.Header().ConnId != packed.Hton32(c.connId) { if pkt.Header().ConnId != packed.Hton32(c.connId) {
panic("Conn.sendPkt: connId wrong") panic("Conn.sendPkt: connId wrong")
...@@ -1058,7 +1058,7 @@ func (nl *NodeLink) serveSend() { ...@@ -1058,7 +1058,7 @@ func (nl *NodeLink) serveSend() {
// however this adds overhead and is not needed in light mode. // however this adds overhead and is not needed in light mode.
// sendPktDirect sends raw packet with appropriate connection ID directly via link. // sendPktDirect sends raw packet with appropriate connection ID directly via link.
func (c *Conn) sendPktDirect(pkt *PktBuf) error { func (c *Conn) sendPktDirect(pkt *pktBuf) error {
// set pkt connId associated with this connection // set pkt connId associated with this connection
pkt.Header().ConnId = packed.Hton32(c.connId) pkt.Header().ConnId = packed.Hton32(c.connId)
...@@ -1088,7 +1088,7 @@ const dumpio = false ...@@ -1088,7 +1088,7 @@ const dumpio = false
// tx error, if any, is returned as is and is analyzed in serveSend. // tx error, if any, is returned as is and is analyzed in serveSend.
// //
// XXX pkt should be freed always or only on error? // XXX pkt should be freed always or only on error?
func (nl *NodeLink) sendPkt(pkt *PktBuf) error { func (nl *NodeLink) sendPkt(pkt *pktBuf) error {
if dumpio { if dumpio {
// XXX -> log // XXX -> log
fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt) fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
...@@ -1106,7 +1106,7 @@ var ErrPktTooBig = errors.New("packet too big") ...@@ -1106,7 +1106,7 @@ var ErrPktTooBig = errors.New("packet too big")
// recvPkt receives raw packet from peer. // recvPkt receives raw packet from peer.
// //
// rx error, if any, is returned as is and is analyzed in serveRecv // rx error, if any, is returned as is and is analyzed in serveRecv
func (nl *NodeLink) recvPkt() (*PktBuf, error) { func (nl *NodeLink) recvPkt() (*pktBuf, error) {
pkt := pktAlloc(4096) pkt := pktAlloc(4096)
// len=4K but cap can be more since pkt is from pool - use all space to buffer reads // len=4K but cap can be more since pkt is from pool - use all space to buffer reads
// XXX vvv -> pktAlloc() ? // XXX vvv -> pktAlloc() ?
...@@ -1462,8 +1462,8 @@ func (c *Conn) err(op string, e error) error { ...@@ -1462,8 +1462,8 @@ func (c *Conn) err(op string, e error) error {
//trace:event traceMsgSendPre(l *NodeLink, connId uint32, msg proto.Msg) //trace:event traceMsgSendPre(l *NodeLink, connId uint32, msg proto.Msg)
// XXX do we also need traceConnSend? // XXX do we also need traceConnSend?
// msgPack allocates PktBuf and encodes msg into it. // msgPack allocates pktBuf and encodes msg into it.
func msgPack(connId uint32, msg proto.Msg) *PktBuf { func msgPack(connId uint32, msg proto.Msg) *pktBuf {
l := msg.NEOMsgEncodedLen() l := msg.NEOMsgEncodedLen()
buf := pktAlloc(proto.PktHeaderLen+l) buf := pktAlloc(proto.PktHeaderLen+l)
...@@ -1491,7 +1491,7 @@ func (c *Conn) Recv() (proto.Msg, error) { ...@@ -1491,7 +1491,7 @@ func (c *Conn) Recv() (proto.Msg, error) {
return msg, err return msg, err
} }
func (c *Conn) _Recv(pkt *PktBuf) (proto.Msg, error) { func (c *Conn) _Recv(pkt *pktBuf) (proto.Msg, error) {
// decode packet // decode packet
pkth := pkt.Header() pkth := pkt.Header()
msgCode := packed.Ntoh16(pkth.MsgCode) msgCode := packed.Ntoh16(pkth.MsgCode)
...@@ -1562,7 +1562,7 @@ func (c *Conn) Expect(msgv ...proto.Msg) (which int, err error) { ...@@ -1562,7 +1562,7 @@ func (c *Conn) Expect(msgv ...proto.Msg) (which int, err error) {
return which, err return which, err
} }
func (c *Conn) _Expect(pkt *PktBuf, msgv ...proto.Msg) (int, error) { func (c *Conn) _Expect(pkt *pktBuf, msgv ...proto.Msg) (int, error) {
pkth := pkt.Header() pkth := pkt.Header()
msgCode := packed.Ntoh16(pkth.MsgCode) msgCode := packed.Ntoh16(pkth.MsgCode)
......
...@@ -60,12 +60,12 @@ func xaccept(nl *NodeLink) *Conn { ...@@ -60,12 +60,12 @@ func xaccept(nl *NodeLink) *Conn {
return c return c
} }
func xsendPkt(c interface { sendPkt(*PktBuf) error }, pkt *PktBuf) { func xsendPkt(c interface { sendPkt(*pktBuf) error }, pkt *pktBuf) {
err := c.sendPkt(pkt) err := c.sendPkt(pkt)
exc.Raiseif(err) exc.Raiseif(err)
} }
func xrecvPkt(c interface { recvPkt() (*PktBuf, error) }) *PktBuf { func xrecvPkt(c interface { recvPkt() (*pktBuf, error) }) *pktBuf {
pkt, err := c.recvPkt() pkt, err := c.recvPkt()
exc.Raiseif(err) exc.Raiseif(err)
return pkt return pkt
...@@ -103,9 +103,9 @@ func xconnError(err error) error { ...@@ -103,9 +103,9 @@ func xconnError(err error) error {
return ce.Err return ce.Err
} }
// Prepare PktBuf with content // Prepare pktBuf with content
func _mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf { func _mkpkt(connid uint32, msgcode uint16, payload []byte) *pktBuf {
pkt := &PktBuf{make([]byte, proto.PktHeaderLen + len(payload))} pkt := &pktBuf{make([]byte, proto.PktHeaderLen + len(payload))}
h := pkt.Header() h := pkt.Header()
h.ConnId = packed.Hton32(connid) h.ConnId = packed.Hton32(connid)
h.MsgCode = packed.Hton16(msgcode) h.MsgCode = packed.Hton16(msgcode)
...@@ -114,13 +114,13 @@ func _mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf { ...@@ -114,13 +114,13 @@ func _mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf {
return pkt return pkt
} }
func (c *Conn) mkpkt(msgcode uint16, payload []byte) *PktBuf { func (c *Conn) mkpkt(msgcode uint16, payload []byte) *pktBuf {
// in Conn exchange connid is automatically set by Conn.sendPkt // in Conn exchange connid is automatically set by Conn.sendPkt
return _mkpkt(c.connId, msgcode, payload) return _mkpkt(c.connId, msgcode, payload)
} }
// Verify PktBuf is as expected // Verify pktBuf is as expected
func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) { func xverifyPkt(pkt *pktBuf, connid uint32, msgcode uint16, payload []byte) {
errv := xerr.Errorv{} errv := xerr.Errorv{}
h := pkt.Header() h := pkt.Header()
// TODO include caller location // TODO include caller location
...@@ -141,8 +141,8 @@ func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) { ...@@ -141,8 +141,8 @@ func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) {
exc.Raiseif( errv.Err() ) exc.Raiseif( errv.Err() )
} }
// Verify PktBuf to match expected message // Verify pktBuf to match expected message
func xverifyPktMsg(pkt *PktBuf, connid uint32, msg proto.Msg) { func xverifyPktMsg(pkt *pktBuf, connid uint32, msg proto.Msg) {
data := make([]byte, msg.NEOMsgEncodedLen()) data := make([]byte, msg.NEOMsgEncodedLen())
msg.NEOMsgEncode(data) msg.NEOMsgEncode(data)
xverifyPkt(pkt, connid, msg.NEOMsgCode(), data) xverifyPkt(pkt, connid, msg.NEOMsgCode(), data)
...@@ -201,7 +201,7 @@ func TestNodeLink(t *testing.T) { ...@@ -201,7 +201,7 @@ func TestNodeLink(t *testing.T) {
tdelay() tdelay()
xclose(nl1) xclose(nl1)
}) })
pkt = &PktBuf{[]byte("data")} pkt = &pktBuf{[]byte("data")}
err = nl1.sendPkt(pkt) err = nl1.sendPkt(pkt)
if err != io.ErrClosedPipe { if err != io.ErrClosedPipe {
t.Fatalf("NodeLink.sendPkt() after close: err = %v", err) t.Fatalf("NodeLink.sendPkt() after close: err = %v", err)
...@@ -259,7 +259,7 @@ func TestNodeLink(t *testing.T) { ...@@ -259,7 +259,7 @@ func TestNodeLink(t *testing.T) {
tdelay() tdelay()
xclose(nl2) xclose(nl2)
}) })
pkt = &PktBuf{[]byte("data")} pkt = &pktBuf{[]byte("data")}
err = nl1.sendPkt(pkt) err = nl1.sendPkt(pkt)
if err != io.ErrClosedPipe { // NOTE io.ErrClosedPipe on Write per io.Pipe if err != io.ErrClosedPipe { // NOTE io.ErrClosedPipe on Write per io.Pipe
t.Fatalf("NodeLink.sendPkt() after peer shutdown: pkt = %v err = %v", pkt, err) t.Fatalf("NodeLink.sendPkt() after peer shutdown: pkt = %v err = %v", pkt, err)
......
...@@ -32,50 +32,48 @@ import ( ...@@ -32,50 +32,48 @@ import (
"lab.nexedi.com/kirr/neo/go/xcommon/packed" "lab.nexedi.com/kirr/neo/go/xcommon/packed"
) )
// PktBuf is a buffer with full raw packet (header + data). // pktBuf is a buffer with full raw packet (header + data).
// //
// variables of type PktBuf are usually named "pkb" (packet buffer), similar to "skb" in Linux. // Allocate pktBuf via pktAlloc() and free via pktBuf.Free().
// type pktBuf struct {
// Allocate PktBuf via pktAlloc() and free via PktBuf.Free().
type PktBuf struct {
Data []byte // whole packet data including all headers Data []byte // whole packet data including all headers
} }
// Header returns pointer to packet header. // Header returns pointer to packet header.
func (pkt *PktBuf) Header() *proto.PktHeader { func (pkt *pktBuf) Header() *proto.PktHeader {
// XXX check len(Data) < PktHeader ? -> no, Data has to be allocated with cap >= PktHeaderLen // XXX check len(Data) < PktHeader ? -> no, Data has to be allocated with cap >= PktHeaderLen
return (*proto.PktHeader)(unsafe.Pointer(&pkt.Data[0])) return (*proto.PktHeader)(unsafe.Pointer(&pkt.Data[0]))
} }
// Payload returns []byte representing packet payload. // Payload returns []byte representing packet payload.
func (pkt *PktBuf) Payload() []byte { func (pkt *pktBuf) Payload() []byte {
return pkt.Data[proto.PktHeaderLen:] return pkt.Data[proto.PktHeaderLen:]
} }
// ---- PktBuf freelist ---- // ---- pktBuf freelist ----
// pktBufPool is sync.Pool<pktBuf> // pktBufPool is sync.Pool<pktBuf>
var pktBufPool = sync.Pool{New: func() interface{} { var pktBufPool = sync.Pool{New: func() interface{} {
return &PktBuf{Data: make([]byte, 0, 4096)} return &pktBuf{Data: make([]byte, 0, 4096)}
}} }}
// pktAlloc allocates PktBuf with len=n // pktAlloc allocates pktBuf with len=n
func pktAlloc(n int) *PktBuf { func pktAlloc(n int) *pktBuf {
pkt := pktBufPool.Get().(*PktBuf) pkt := pktBufPool.Get().(*pktBuf)
pkt.Data = xbytes.Realloc(pkt.Data, n) pkt.Data = xbytes.Realloc(pkt.Data, n)
return pkt return pkt
} }
// Free marks pkt as no longer needed. // Free marks pkt as no longer needed.
func (pkt *PktBuf) Free() { func (pkt *pktBuf) Free() {
pktBufPool.Put(pkt) pktBufPool.Put(pkt)
} }
// ---- PktBuf dump ---- // ---- pktBuf dump ----
// Strings dumps a packet in human-readable form // Strings dumps a packet in human-readable form
func (pkt *PktBuf) String() string { func (pkt *pktBuf) String() string {
if len(pkt.Data) < proto.PktHeaderLen { if len(pkt.Data) < proto.PktHeaderLen {
return fmt.Sprintf("(! < PktHeaderLen) % x", pkt.Data) return fmt.Sprintf("(! < PktHeaderLen) % x", pkt.Data)
} }
...@@ -110,7 +108,7 @@ func (pkt *PktBuf) String() string { ...@@ -110,7 +108,7 @@ func (pkt *PktBuf) String() string {
} }
// Dump dumps a packet in raw form // Dump dumps a packet in raw form
func (pkt *PktBuf) Dump() string { func (pkt *pktBuf) Dump() string {
if len(pkt.Data) < proto.PktHeaderLen { if len(pkt.Data) < proto.PktHeaderLen {
return fmt.Sprintf("(! < pktHeaderLen) % x", pkt.Data) return fmt.Sprintf("(! < pktHeaderLen) % x", pkt.Data)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment