Commit a66a7172 authored by Kirill Smelkov

X neo/storage: Also provide NextSerial in AnswerObject

Do this by directly using FileStorage and temporarily extending it with a
load function that returns both serial and next_serial on load.

We currently need next_serial to be present in AnswerObject for the neo/py
client cache, as its code currently depends on next_serial (= next_tid)
for its computations.

This, however, can be reworked so that next_serial is not used there at all.
See also 5886aad3 (X zodb: Clarify Load interface).
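
In short, the change wires nextSerial from FileStorage through to the
AnswerObject reply. A minimal sketch of the flow, distilled from the diff
below (error handling elided; `reply` is just an illustrative variable name):

    // storage node answering a GetObject request
    buf, serial, nextSerial, err := zstor.Load_XXXWithNextSerialXXX(ctx, xid)
    if err != nil {
        // translate err to NEO protocol error codes (see serveClient1)
    }
    // fs1 reports "no next revision" as zodb.TidMax;
    // NEO encodes it as INVALID_TID (= None in neo/py).
    if nextSerial == zodb.TidMax {
        nextSerial = neo.INVALID_TID
    }
    reply := &neo.AnswerObject{Oid: xid.Oid, Serial: serial, NextSerial: nextSerial, Data: buf}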
parent 7b9e2c5d
@@ -469,7 +469,7 @@ func TestMasterStorage(t *testing.T) {
 	tc.Expect(conntx("s:3", "c:2", 3, &neo.AnswerObject{
 		Oid:         xid1.Oid,
 		Serial:      serial1,
-		NextSerial:  0, // XXX
+		NextSerial:  neo.INVALID_TID,
 		Compression: false,
 		Data:        buf1,
 		DataSerial:  0, // XXX
@@ -478,6 +478,41 @@ func TestMasterStorage(t *testing.T) {
 	xwait(wg)

+	// verify NextSerial is properly returned in AnswerObject via trace-loading prev. revision of obj1
+	// (XXX we currently need NextSerial for neo/py client cache)
+	wg = &errgroup.Group{}
+	xid1prev := zodb.Xid{Oid: 1, At: serial1 - 1}
+	buf1prev, serial1prev, err := zstor.Load(bg, xid1prev)
+	exc.Raiseif(err)
+
+	gox(wg, func() {
+		buf, serial, err := C.Load(bg, xid1prev)
+		exc.Raiseif(err)
+
+		if !(bytes.Equal(buf.Data, buf1prev.Data) && serial == serial1prev) {
+			exc.Raisef("C.Load(%v) ->\ndata:\n%s\nserial:\n%s\n", xid1prev,
+				pretty.Compare(buf1prev.Data, buf.Data), pretty.Compare(serial1prev, serial))
+		}
+	})
+
+	// ... -> GetObject(xid1prev)
+	tc.Expect(conntx("c:2", "s:3", 5, &neo.GetObject{
+		Oid:    xid1prev.Oid,
+		Tid:    serial1,
+		Serial: neo.INVALID_TID,
+	}))
+
+	tc.Expect(conntx("s:3", "c:2", 5, &neo.AnswerObject{
+		Oid:         xid1prev.Oid,
+		Serial:      serial1prev,
+		NextSerial:  serial1,
+		Compression: false,
+		Data:        buf1prev,
+		DataSerial:  0, // XXX
+		Checksum:    sha1.Sum(buf1prev.Data),
+	}))
+
+	xwait(wg)
+
 	// C loads every other {<,=}serial:oid - established link is reused
 	ziter := zstor.Iterate(0, zodb.TidMax)
...
@@ -33,6 +33,7 @@ import (
 	"lab.nexedi.com/kirr/neo/go/neo"
 	"lab.nexedi.com/kirr/neo/go/neo/internal/common"
 	"lab.nexedi.com/kirr/neo/go/zodb"
+	"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
 	"lab.nexedi.com/kirr/neo/go/xcommon/log"
 	"lab.nexedi.com/kirr/neo/go/xcommon/task"
 	"lab.nexedi.com/kirr/neo/go/xcommon/xcontext"
@@ -58,7 +59,15 @@ type Storage struct {
 	// 1 inbox/  (commit queues)
 	// 2 ?       (data.fs)
 	// 3 packed/ (deltified objects)
-	zstor zodb.IStorageDriver // underlying ZODB storage	XXX -> directly work with fs1 & friends
+	//
+	// XXX we currently depend on extra functionality FS provides over
+	// plain zodb.IStorage (e.g. loading with nextSerial) and even if
+	// nextSerial will be gone in the future, we will probably depend on
+	// particular layout more and more -> directly work with fs1 & friends.
+	//
+	// TODO -> abstract into backend interfaces so various backends are
+	// possible (e.g. +SQL)
+	zstor *fs1.FileStorage // underlying ZODB storage

 	//nodeCome chan nodeCome // node connected
 }
@@ -67,7 +76,7 @@ type Storage struct {
 //
 // The storage uses zstor as underlying backend for storing data.
 // Use Run to actually start running the node.
-func NewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, zstor zodb.IStorageDriver) *Storage {
+func NewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, zstor *fs1.FileStorage) *Storage {
 	stor := &Storage{
 		node:  neo.NewNodeApp(net, neo.STORAGE, clusterName, masterAddr, serveAddr),
 		zstor: zstor,
@@ -548,7 +557,8 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms
 		xid.At = common.Before2At(req.Tid)
 	}

-	buf, serial, err := stor.zstor.Load(ctx, xid)
+	// FIXME kill nextSerial support after neo/py cache does not depend on next_serial
+	buf, serial, nextSerial, err := stor.zstor.Load_XXXWithNextSerialXXX(ctx, xid)
 	if err != nil {
 		// translate err to NEO protocol error codes
 		e := err.(*zodb.OpError) // XXX move this to ErrEncode?
@@ -566,10 +576,15 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms
 		}
 	}

+	// no next serial -> None
+	if nextSerial == zodb.TidMax {
+		nextSerial = neo.INVALID_TID
+	}
+
 	return &neo.AnswerObject{
 		Oid:         xid.Oid,
 		Serial:      serial,
+		NextSerial:  nextSerial,
 		Compression: false,
 		Data:        buf,
...
@@ -126,21 +126,34 @@ func (dh *DataHeader) Free() {
 	dhPool.Put(dh)
 }

 func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (buf *mem.Buf, serial zodb.Tid, err error) {
 	// FIXME zodb.TidMax is only 7fff... tid from outside can be ffff...
 	// -> TODO reject tid out of range
-	buf, serial, err = fs.load(xid)
+
+	// FIXME kill Load_XXXWithNextSerial after neo/py cache does not depend on next_serial
+	buf, serial, _, err = fs.Load_XXXWithNextSerialXXX(nil, xid)
+	return buf, serial, err
+}
+
+// XXX temporary function - will go away:
+//
+// FIXME kill Load_XXXWithNextSerialXXX after neo/py cache does not depend on next_serial
+func (fs *FileStorage) Load_XXXWithNextSerialXXX(_ context.Context, xid zodb.Xid) (buf *mem.Buf, serial, nextSerial zodb.Tid, err error) {
+	buf, serial, nextSerial, err = fs.load(xid)
 	if err != nil {
 		err = &zodb.OpError{URL: fs.URL(), Op: "load", Args: xid, Err: err}
 	}
-	return buf, serial, err
+	return buf, serial, nextSerial, err
 }

-func (fs *FileStorage) load(xid zodb.Xid) (buf *mem.Buf, serial zodb.Tid, err error) {
+// FIXME kill nextSerial support after neo/py cache does not depend on next_serial
+func (fs *FileStorage) load(xid zodb.Xid) (buf *mem.Buf, serial, nextSerial zodb.Tid, err error) {
 	// lookup in index position of oid data record within latest transaction which changed this oid
 	dataPos, ok := fs.index.Get(xid.Oid)
 	if !ok {
-		return nil, 0, &zodb.NoObjectError{Oid: xid.Oid}
+		return nil, 0, 0, &zodb.NoObjectError{Oid: xid.Oid}
 	}

 	// XXX go compiler cannot deduce dh should be on stack here
@@ -150,14 +163,17 @@ func (fs *FileStorage) load(xid zodb.Xid) (buf *mem.Buf, serial zodb.Tid, err er
 	dh.Tid = zodb.TidMax
 	dh.PrevRevPos = dataPos
 	//defer dh.Free()
-	buf, serial, err = fs._load(dh, xid)
+	buf, serial, nextSerial, err = fs._load(dh, xid)
 	dh.Free()
-	return buf, serial, err
+	return buf, serial, nextSerial, err
 }

-func (fs *FileStorage) _load(dh *DataHeader, xid zodb.Xid) (*mem.Buf, zodb.Tid, error) {
+// FIXME kill nextSerial support after neo/py cache does not depend on next_serial
+func (fs *FileStorage) _load(dh *DataHeader, xid zodb.Xid) (*mem.Buf, zodb.Tid, zodb.Tid, error) {
 	// search backwards for when we first have data record with tid satisfying xid.At
+	var nextSerial zodb.Tid
 	for {
+		nextSerial = dh.Tid
 		err := dh.LoadPrevRev(fs.file)
 		if err != nil {
 			if err == io.EOF {
@@ -165,7 +181,7 @@ func (fs *FileStorage) _load(dh *DataHeader, xid zodb.Xid) (*mem.Buf, zodb.Tid,
 				err = &zodb.NoDataError{Oid: xid.Oid, DeletedAt: 0}
 			}

-			return nil, 0, err
+			return nil, 0, 0, err
 		}

 		if dh.Tid <= xid.At {
@@ -179,14 +195,14 @@ func (fs *FileStorage) _load(dh *DataHeader, xid zodb.Xid) (*mem.Buf, zodb.Tid,
 	buf, err := dh.LoadData(fs.file)
 	if err != nil {
-		return nil, 0, err
+		return nil, 0, 0, err
 	}
 	if buf.Data == nil {
 		// object was deleted
-		return nil, 0, &zodb.NoDataError{Oid: xid.Oid, DeletedAt: serial}
+		return nil, 0, 0, &zodb.NoDataError{Oid: xid.Oid, DeletedAt: serial}
 	}

-	return buf, serial, nil
+	return buf, serial, nextSerial, nil
 }

 // --- ZODB-level iteration ---
...