Commit b50f771e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ef72a476
......@@ -240,7 +240,7 @@ type RequestIdentification struct {
ProtocolVersion uint32 // TODO py.PProtocol upon decoding checks for != PROTOCOL_VERSION
NodeType NodeType // XXX name
UUID UUID
Address // where requesting node is also accepting connections
Address Address // where requesting node is also accepting connections
Name string
IdTimestamp float64
}
......
// TODO copyright / license
// Copyright (C) 2016-2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Open Source Initiative approved licenses and Convey
// the resulting work. Corresponding source of such a combination shall include
// the source code for all other software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
package neo
// common parts for organizing network servers
import (
"context"
......@@ -14,13 +30,13 @@ type Server interface {
ServeLink(ctx context.Context, link *NodeLink)
}
// Run service on a listener
// Serve runs service on a listener
// - accept incoming connection on the listener
// - for every accepted connection spawn srv.ServeLink() in separate goroutine.
//
// the listener is closed when Serve returns.
func Serve(ctx context.Context, l *Listener, srv Server) error {
fmt.Printf("stor: serving on %s ...\n", l.Addr()) // XXX 'stor' -> generic
fmt.Printf("xxx: serving on %s ...\n", l.Addr()) // XXX 'xxx' -> ?
// close listener when either cancelling or returning (e.g. due to an error)
// ( when cancelling - listener close will signal to all accepts to
......@@ -48,8 +64,7 @@ func Serve(ctx context.Context, l *Listener, srv Server) error {
}
}
// TODO text
// XXX move -> generic place ?
// ListenAndServe listens on network address and then calls Serve to handle incoming connections
// XXX split -> separate Listen() & Serve()
func ListenAndServe(ctx context.Context, net_, laddr string, srv Server) error {
l, err := Listen(net_, laddr)
......@@ -60,3 +75,90 @@ func ListenAndServe(ctx context.Context, net_, laddr string, srv Server) error {
// TODO if TLS config -> tls.NewListener()
return Serve(ctx, l, srv)
}
// ----------------------------------------
// Identify identifies peer on the link
// it expects peer to send RequestIdentification packet and TODO
func Identify(link *NodeLink) (nodeInfo RequestIdentification /*TODO -> NodeInfo*/, err error) {
nodeInfo := RequestIdentification{}
// the first conn must come with RequestIdentification packet
conn, err := link.Accept()
if err != nil {
return nodeInfo, err // XXX err ctx
}
defer func() {
err2 := conn.Close()
if err == nil {
err = err2
// XXX also clear nodeInfo ?
}
}()
pkt, err := RecvAndDecode(conn)
if err != nil {
return nodeInfo, err // XXX err ctx
}
switch pkt := pkt.(type) {
default:
return nodeInfo, fmt.Errorf("expected RequestIdentification ; got %T", pkt)
case *RequestIdentification:
if pkt.ProtocolVersion != PROTOCOL_VERSION {
// TODO also tell peer with Error
return nodeInfo, fmt.Errorf("protocol version mismatch: peer = %d ; our side = %d", pkt.ProtocolVersion, PROTOCOL_VERSION)
}
err = EncodeAndSend(conn, &AcceptIdentification{
NodeType: pkt.NodeType,
MyUUID: 0, // XXX
NumPartitions: 0, // XXX
NumReplicas: 0, // XXX
YourUUID: pkt.UUID,
Primary: Address{}, // XXX
//KnownMasterList: // XXX
})
if err != nil {
return nodeInfo, err
}
nodeInfo = *pkt
}
return nodeInfo, nil
}
// ----------------------------------------
// XXX place = ok ? not ok -> move out of here
// XXX naming for RecvAndDecode and EncodeAndSend
// RecvAndDecode receivs packet from conn and decodes it
func RecvAndDecode(conn *Conn) (interface{}, error) { // XXX interface{} -> NEOEncoder ?
pkt, err := conn.Recv()
if err != nil {
return nil, err
}
// TODO decode pkt
return pkt, nil
}
// EncodeAndSend encodes pkt and send it to conn
func EncodeAndSend(conn *Conn, pkt NEOEncoder) error {
msgCode, l := pkt.NEOEncodedInfo()
l += PktHeadLen
buf := PktBuf{make([]byte, l)} // XXX -> freelist
h := buf.Header()
// h.ConnId will be set by conn.Send
h.MsgCode = hton16(msgCode)
h.Len = hton32(uint32(l)) // XXX casting: think again
pkt.NEOEncode(buf.Payload())
return conn.Send(&buf) // XXX why pointer?
}
......@@ -26,13 +26,13 @@ import (
"log"
"os"
//"time"
"../zodb"
"../zodb/storage/fs1"
)
// NEO Storage application
// XXX naming
// Storage is NEO storage server application
type Storage struct {
zstor zodb.IStorage // underlying ZODB storage XXX temp ?
}
......@@ -42,16 +42,31 @@ func NewStorage(zstor zodb.IStorage) *Storage {
}
/*
// XXX change to bytes.Buffer if we need to access it as I/O
type Buffer struct {
buf []byte
}
*/
// ServeLink serves incoming node-node link connection
func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
fmt.Printf("stor: serving new node %s <-> %s\n", link.peerLink.LocalAddr(), link.peerLink.RemoteAddr())
// close link when either cancelling or returning (e.g. due to an error)
// ( when cancelling - link.Close will signal to all current IO to
// terminate with an error )
// XXX dup -> utility
retch := make(chan struct{})
defer func() { close(retch) }()
go func() {
select {
case <-ctx.Done():
// XXX tell peers we are shutting down?
case <-retch:
}
link.Close() // XXX err
}()
nodeInfo, err := Identify(link)
if err != nil {
fmt.Printf("stor: peer identification failed: %v\n", err)
return
}
/*
pktri, err := expect(RequestIdentification)
if err != nil {
......@@ -70,79 +85,17 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
// TODO mark link as identified
pkt, err := recv()
if err != nil {
err
return
}
switch pkt.MsgCode {
case GetObject:
req := GetObject{}
err = req.NEODecode(pkt.Payload())
if err != nil {
sendErr("malformed GetObject packet:", err)
}
-> DM.getObject(req.Oid, req.Serial, req.Tid)
case StoreObject:
case StoreTransaction:
}
*/
//fmt.Fprintf(conn, "Hello up there, you address is %s\n", conn.RemoteAddr()) // XXX err
//conn.Close() // XXX err
/*
// TODO: use bytes.Buffer{}
// .Bytes() -> buf -> can grow up again up to buf[:cap(buf)]
// NewBuffer(buf) -> can use same buffer for later reading via bytes.Buffer
// TODO read PktHeader (fixed length) (-> length, PktType (by .code))
//rxbuf := bytes.Buffer{}
rxbuf := bytes.NewBuffer(make([]byte, 4096))
n, err := conn.Read(rxbuf.Bytes())
*/
//recvPkt()
}
// XXX naming for RecvAndDecode and EncodeAndSend
// XXX stub
// XXX move me out of here
func RecvAndDecode(conn *Conn) (interface{}, error) { // XXX interface{} -> NEODecoder ?
pkt, err := conn.Recv()
if err != nil {
return nil, err
}
// TODO decode pkt
return pkt, nil
}
func EncodeAndSend(conn *Conn, pkt NEOEncoder) error {
// TODO encode pkt
msgCode, l := pkt.NEOEncodedInfo()
l += PktHeadLen
buf := PktBuf{make([]byte, l)} // XXX -> freelist
h := buf.Header()
h.MsgCode = hton16(msgCode)
h.Len = hton32(uint32(l)) // XXX casting: think again
pkt.NEOEncode(buf.Payload())
return conn.Send(&buf) // XXX why pointer?
}
// ServeClient serves incoming connection on which peer identified itself as client
func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
// close connection when either cancelling or returning (e.g. due to an error)
// ( when cancelling - conn.Close will signal to current IO to
// terminate with an error )
// XXX dup -> utility
retch := make(chan struct{})
defer func() { close(retch) }()
go func() {
......@@ -247,24 +200,25 @@ func storageMain(argv []string) {
os.Exit(2)
}
// XXX hack
// XXX hack to use existing zodb storage for data
zstor, err := fs1.Open(argv[0])
if err != nil {
log.Fatal(err)
}
storsrv := NewStorage(zstor)
storSrv := NewStorage(zstor)
ctx := context.Background()
/*
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(5 * time.Second)
time.Sleep(3 * time.Second)
cancel()
}()
*/
ctx := context.Background()
err = ListenAndServe(ctx, "tcp", bind, storsrv) // XXX hardcoded
// TODO + TLS
err = ListenAndServe(ctx, "tcp", bind, storSrv) // XXX "tcp" hardcoded
if err != nil {
log.Fatal(err)
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment