Commit 6c7ce736 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c818ce8a
...@@ -106,6 +106,13 @@ type NEODecoder interface { ...@@ -106,6 +106,13 @@ type NEODecoder interface {
NEODecode(data []byte) (nread int, err error) NEODecode(data []byte) (nread int, err error)
} }
// NEOCodec is interface combining NEOEncoder & NEODecoder
// in particular it covers all NEO packets
type NEOCodec interface {
NEOEncoder
NEODecoder
}
type Address struct { type Address struct {
Host string Host string
Port uint16 Port uint16
......
...@@ -64,12 +64,6 @@ func TestPktHeader(t *testing.T) { ...@@ -64,12 +64,6 @@ func TestPktHeader(t *testing.T) {
} }
} }
// XXX move me out of here?
type NEOCodec interface {
NEOEncoder
NEODecoder
}
// test marshalling for one packet type // test marshalling for one packet type
func testPktMarshal(t *testing.T, pkt NEOCodec, encoded string) { func testPktMarshal(t *testing.T, pkt NEOCodec, encoded string) {
typ := reflect.TypeOf(pkt).Elem() // type of *pkt typ := reflect.TypeOf(pkt).Elem() // type of *pkt
......
...@@ -21,6 +21,7 @@ package neo ...@@ -21,6 +21,7 @@ package neo
import ( import (
"context" "context"
"fmt" "fmt"
"reflect"
) )
// Server is an interface that represents networked server // Server is an interface that represents networked server
...@@ -137,15 +138,30 @@ func Identify(link *NodeLink) (nodeInfo RequestIdentification /*TODO -> NodeInfo ...@@ -137,15 +138,30 @@ func Identify(link *NodeLink) (nodeInfo RequestIdentification /*TODO -> NodeInfo
// XXX place = ok ? not ok -> move out of here // XXX place = ok ? not ok -> move out of here
// XXX naming for RecvAndDecode and EncodeAndSend // XXX naming for RecvAndDecode and EncodeAndSend
// RecvAndDecode receivs packet from conn and decodes it // RecvAndDecode receives packet from conn and decodes it
func RecvAndDecode(conn *Conn) (interface{}, error) { // XXX interface{} -> NEOEncoder ? func RecvAndDecode(conn *Conn) (NEOEncoder, error) { // XXX NEOEncoder -> interface{}
pkt, err := conn.Recv() pkt, err := conn.Recv()
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO decode pkt // decode packet
return pkt, nil // XXX maybe better generate switch on msgCode instead of reflect
pkth := pkt.Header()
msgCode := ntoh16(pkth.MsgCode)
msgType := pktTypeRegistry[msgCode]
if msgType == nil {
return nil, fmt.Errorf("invalid msgCode (%d)", msgCode) // XXX err context
}
// TODO use free-list for decoded packets + when possible decode in-place
pktObj := reflect.New(msgType).Interface().(NEOCodec)
_, err = pktObj.NEODecode(pkt.Payload())
if err != nil {
return nil, err // XXX err ctx ?
}
return pktObj, nil
} }
// EncodeAndSend encodes pkt and send it to conn // EncodeAndSend encodes pkt and send it to conn
......
...@@ -58,6 +58,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -58,6 +58,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
// XXX tell peers we are shutting down? // XXX tell peers we are shutting down?
case <-retch: case <-retch:
} }
fmt.Printf("stor: closing link to %s\n", link.peerLink.RemoteAddr())
link.Close() // XXX err link.Close() // XXX err
}() }()
...@@ -92,9 +93,15 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -92,9 +93,15 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
} }
// connAddr returns string describing conn XXX text, naming
func connAddr(conn *Conn) string {
return fmt.Sprintf("%s .%d", conn.nodeLink.peerLink.RemoteAddr(), conn.connId)
}
// ServeClient serves incoming connection on which peer identified itself as client // ServeClient serves incoming connection on which peer identified itself as client
func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) { func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
fmt.Printf("stor: serving new client conn %s\n", connAddr(conn)
// close connection when either cancelling or returning (e.g. due to an error) // close connection when either cancelling or returning (e.g. due to an error)
// ( when cancelling - conn.Close will signal to current IO to // ( when cancelling - conn.Close will signal to current IO to
// terminate with an error ) // terminate with an error )
...@@ -107,6 +114,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) { ...@@ -107,6 +114,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
// XXX tell client we are shutting down? // XXX tell client we are shutting down?
case <-retch: case <-retch:
} }
fmt.Printf("stor: closing client conn %s\n", connAddr(conn))
conn.Close() // XXX err conn.Close() // XXX err
}() }()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment