Commit 16822c52 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e4023e90
...@@ -5,23 +5,47 @@ ...@@ -5,23 +5,47 @@
package main package main
import ( import (
"context"
"flag"
"fmt"
"log"
"os"
//_ "../../storage" // XXX rel ok? //_ "../../storage" // XXX rel ok?
neo "../.." neo "../.."
"fmt"
"context" zodb "../../zodb"
_ "time" _ "../../zodb/wks"
) )
// TODO options: // TODO options:
// cluster, masterv, bind ... // cluster, masterv, bind ...
func usage() {
fmt.Fprintf(os.Stderr,
`neostorage runs one NEO storage server.
Usage: neostorage [options] zstor XXX
`)
}
func main() { func main() {
// var t neo.Tid = neo.MAX_TID flag.Usage = usage
// fmt.Printf("%T %x\n", t, t) flag.Parse()
// println("TODO") argv := flag.Args()
if len(argv) == 0 {
usage()
os.Exit(2)
}
zstor, err := zodb.OpenStorageURL(argv[0])
if err != nil {
log.Fatal(err)
}
storsrv := &neo.StorageApplication{} storsrv := neo.NewStorage(zstor)
/* /*
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
...@@ -32,6 +56,8 @@ func main() { ...@@ -32,6 +56,8 @@ func main() {
*/ */
ctx := context.Background() ctx := context.Background()
err := neo.ListenAndServe(ctx, "tcp", "localhost:1234", storsrv) err = neo.ListenAndServe(ctx, "tcp", "localhost:1234", storsrv)
fmt.Println(err) if err != nil {
log.Fatal(err)
}
} }
...@@ -275,6 +275,7 @@ func (nl *NodeLink) serveRecv() { ...@@ -275,6 +275,7 @@ func (nl *NodeLink) serveRecv() {
if handleNewConn != nil { if handleNewConn != nil {
// TODO avoid spawning goroutine for each new Ask request - // TODO avoid spawning goroutine for each new Ask request -
// - by keeping pool of read inactive goroutine / conn pool ? // - by keeping pool of read inactive goroutine / conn pool ?
// XXX rework interface for this to be Accept-like ?
go func() { go func() {
nl.handleWg.Add(1) nl.handleWg.Add(1)
defer nl.handleWg.Done() defer nl.handleWg.Done()
...@@ -397,7 +398,7 @@ func (c *Conn) close() { ...@@ -397,7 +398,7 @@ func (c *Conn) close() {
}) })
} }
// Close connection // Close closes connection
// Any blocked Send() or Recv() will be unblocked and return error // Any blocked Send() or Recv() will be unblocked and return error
// //
// NOTE for Send() - once transmission was started - it will complete in the // NOTE for Send() - once transmission was started - it will complete in the
......
...@@ -29,6 +29,10 @@ type Storage struct { ...@@ -29,6 +29,10 @@ type Storage struct {
zstor zodb.IStorage // underlying ZODB storage XXX temp ? zstor zodb.IStorage // underlying ZODB storage XXX temp ?
} }
func NewStorage(zstor zodb.IStorage) *Storage {
return &Storage{zstor}
}
/* /*
// XXX change to bytes.Buffer if we need to access it as I/O // XXX change to bytes.Buffer if we need to access it as I/O
...@@ -98,13 +102,11 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -98,13 +102,11 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
} }
type StorageClientHandler struct { // XXX naming for RecvAndDecode and EncodeAndSend
stor *Storage
}
// XXX stub // XXX stub
// XXX move me out of here // XXX move me out of here
func RecvAndDecode(conn *Conn) (interface{}, error) { func RecvAndDecode(conn *Conn) (interface{}, error) { // XXX interface{} -> NEODecoder ?
pkt, err := conn.Recv() pkt, err := conn.Recv()
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -123,34 +125,46 @@ func EncodeAndSend(conn *Conn, pkt NEOEncoder) error { ...@@ -123,34 +125,46 @@ func EncodeAndSend(conn *Conn, pkt NEOEncoder) error {
return conn.Send(&buf) // XXX why pointer? return conn.Send(&buf) // XXX why pointer?
} }
// XXX naming for RecvAndDecode and EncodeAndSend // ServeClient serves incoming connection on which peer identified itself as client
func (ch *StorageClientHandler) ServeConn(ctx context.Context, conn *Conn) { func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
// TODO ctx.Done -> close conn // close connection when either cancelling or returning (e.g. due to an error)
defer conn.Close() // XXX err // ( when cancelling - conn.Close will signal to current IO to
// terminate with an error )
retch := make(chan struct{})
defer func() { close(retch) }()
go func() {
select {
case <-ctx.Done():
// XXX tell client we are shutting down?
case <-retch:
}
conn.Close() // XXX err
}()
pkt, err := RecvAndDecode(conn) for {
req, err := RecvAndDecode(conn)
if err != nil { if err != nil {
return // XXX log / err / send error before closing return // XXX log / err / send error before closing
} }
switch pkt := pkt.(type) { switch req := req.(type) {
case *GetObject: case *GetObject:
xid := zodb.Xid{Oid: pkt.Oid} xid := zodb.Xid{Oid: req.Oid}
if pkt.Serial != INVALID_TID { if req.Serial != INVALID_TID {
xid.Tid = pkt.Serial xid.Tid = req.Serial
xid.TidBefore = false xid.TidBefore = false
} else { } else {
xid.Tid = pkt.Tid xid.Tid = req.Tid
xid.TidBefore = true xid.TidBefore = true
} }
data, tid, err := ch.stor.zstor.Load(xid) var reply NEOEncoder
data, tid, err := stor.zstor.Load(xid)
if err != nil { if err != nil {
// TODO translate err to NEO protocol error codes // TODO translate err to NEO protocol error codes
errPkt := Error{Code: 0, Message: err.Error()} reply = &Error{Code: 0, Message: err.Error()}
EncodeAndSend(conn, &errPkt) // XXX err
} else { } else {
answer := AnswerGetObject{ reply = &AnswerGetObject{
Oid: xid.Oid, Oid: xid.Oid,
Serial: tid, Serial: tid,
...@@ -161,16 +175,18 @@ func (ch *StorageClientHandler) ServeConn(ctx context.Context, conn *Conn) { ...@@ -161,16 +175,18 @@ func (ch *StorageClientHandler) ServeConn(ctx context.Context, conn *Conn) {
// XXX .NextSerial // XXX .NextSerial
// XXX .DataSerial // XXX .DataSerial
} }
EncodeAndSend(conn, &answer) // XXX err
} }
case *LastTransaction: EncodeAndSend(conn, reply) // XXX err
// ----//---- for zstor.LastTid()
case *ObjectHistory: case *LastTransaction:
lastTid := stor.zstor.LastTid()
EncodeAndSend(conn, &AnswerLastTransaction{lastTid}) // XXX err
//case *ObjectHistory:
//case *StoreObject: //case *StoreObject:
} }
//pkt.Put(...) //req.Put(...)
}
} }
...@@ -122,7 +122,7 @@ type IStorage interface { ...@@ -122,7 +122,7 @@ type IStorage interface {
// LastTid returns the id of the last committed transaction. // LastTid returns the id of the last committed transaction.
// if not transactions have been committed yet, LastTid returns Tid zero value // if not transactions have been committed yet, LastTid returns Tid zero value
// XXX ^^^ ok ? // XXX ^^^ ok ?
LastTid() Tid // XXX -> Tid, ok ? LastTid() Tid // XXX -> Tid, ok ? ,err ?
// LoadSerial and LoadBefore generalized into 1 Load (see Xid for details) // LoadSerial and LoadBefore generalized into 1 Load (see Xid for details)
// TODO data []byte -> something allocated from slab ? // TODO data []byte -> something allocated from slab ?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment