Commit 8e00bfdc authored by Levin Zimmermann's avatar Levin Zimmermann Committed by Kirill Smelkov

go/neo/proto: Update 'Compression' to int to support different compression algorithms

With nexedi/neoppod@fd80cc30 NEO/py added support to encode the compression
algorithm with the 'Compression' parameter. Before this, compression could
only be true (= with compression) or false (= without compression). Now
the absence of compression is encoded with 0. Any other number than 0
encodes a compression algorithm. The mapping is currently:

	1 = zlib

In the future, 2 could mean zstd [1].

[1] https://github.com/facebook/zstd/issues/1134

/reviewed-by @kirr
/reviewed-on !6
parent 20b37b60
......@@ -381,7 +381,10 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
}
}
if resp.Compression {
if resp.Compression != 0 {
if resp.Compression != 1 {
return nil, 0, fmt.Errorf("unsupported compression algorithm: %v", resp.Compression)
}
buf2 := &mem.Buf{Data: nil}
udata, err := xzlib.Decompress(buf.Data)
buf.Release()
......
......@@ -257,7 +257,7 @@ func _TestMasterStorage(t0 *tEnv) {
Oid: xid1.Oid,
Serial: serial1,
NextSerial: proto.INVALID_TID,
Compression: false,
Compression: 0,
Data: buf1,
DataSerial: 0, // XXX
Checksum: sha1.Sum(buf1.Data),
......@@ -292,7 +292,7 @@ func _TestMasterStorage(t0 *tEnv) {
Oid: xid1prev.Oid,
Serial: serial1prev,
NextSerial: serial1,
Compression: false,
Compression: 0,
Data: buf1prev,
DataSerial: 0, // XXX
Checksum: sha1.Sum(buf1prev.Data),
......
......@@ -336,6 +336,11 @@ func (a *Address) neoDecodeN(b []byte) (uint64, bool) {
// Checksum is a SHA1 hash.
type Checksum [20]byte
// Compression is an integer that tells the compression algorithm:
// 0 = no compression algorithm is used
// 1 = zlib is used
type Compression uint8
// PTid is Partition Table identifier.
//
// Zero value means "invalid id" (<-> None in py.PPTID, nil in msgpack)
......@@ -711,7 +716,7 @@ type AnswerRebaseObject struct {
Serial zodb.Tid
ConflictSerial zodb.Tid
// FIXME POption('data')
Compression bool
Compression Compression
Checksum Checksum
Data *mem.Buf
}
......@@ -725,7 +730,7 @@ type AnswerRebaseObject struct {
type StoreObject struct {
Oid zodb.Oid
Serial zodb.Tid
Compression bool
Compression Compression
Checksum Checksum
Data []byte // TODO -> msg.Buf, separately (for writev)
DataSerial zodb.Tid
......@@ -780,7 +785,7 @@ type AnswerObject struct {
Oid zodb.Oid
Serial zodb.Tid
NextSerial zodb.Tid
Compression bool
Compression Compression
Checksum Checksum
Data *mem.Buf // TODO encode -> separately (for writev)
DataSerial zodb.Tid
......@@ -1215,7 +1220,7 @@ type AddTransaction struct {
type AddObject struct {
Oid zodb.Oid
Serial zodb.Tid
Compression bool
Compression Compression
Checksum Checksum
Data *mem.Buf
DataSerial zodb.Tid
......
......@@ -182,7 +182,7 @@ func TestMsgMarshal(t *testing.T) {
{&StoreObject{
Oid: 0x0102030405060708,
Serial: 0x0a0b0c0d0e0f0102,
Compression: false,
Compression: 0,
Checksum: Checksum{1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20}, // XXX simpler?
Data: []byte("hello world"),
DataSerial: 0x0a0b0c0d0e0f0103,
......@@ -198,7 +198,7 @@ func TestMsgMarshal(t *testing.T) {
hex("97") +
hex("c408") + hex("0102030405060708") +
hex("c408") + hex("0a0b0c0d0e0f0102") +
hex("c2") +
hex("00") +
hex("c414") + hex("0102030405060708090a0b0c0d0e0f1011121314") +
hex("c40b") + "hello world" +
hex("c408") + hex("0a0b0c0d0e0f0103") +
......
......@@ -4064,7 +4064,7 @@ func (p *AnswerRebaseObject) neoMsgEncodedLenN() int {
func (p *AnswerRebaseObject) neoMsgEncodeN(data []byte) {
binary.BigEndian.PutUint64(data[0:], uint64(p.Serial))
binary.BigEndian.PutUint64(data[8:], uint64(p.ConflictSerial))
(data[16:])[0] = bool2byte(p.Compression)
(data[16:])[0] = uint8(p.Compression)
copy(data[17:], p.Checksum[:])
{
l := uint32(len(p.Data.XData()))
......@@ -4082,7 +4082,7 @@ func (p *AnswerRebaseObject) neoMsgDecodeN(data []byte) (int, error) {
}
p.Serial = zodb.Tid(binary.BigEndian.Uint64(data[0 : 0+8]))
p.ConflictSerial = zodb.Tid(binary.BigEndian.Uint64(data[8 : 8+8]))
p.Compression = byte2bool((data[16 : 16+1])[0])
p.Compression = Compression((data[16 : 16+1])[0])
copy(p.Checksum[:], data[17:37])
{
l := binary.BigEndian.Uint32(data[37 : 37+4])
......@@ -4102,7 +4102,7 @@ overflow:
}
func (p *AnswerRebaseObject) neoMsgEncodedLenM() int {
return 44 + msgpack.BinHeadSize(len(p.Data.XData())) + len(p.Data.XData())
return 43 + msgpack.Uint8Size(uint8(p.Compression)) + msgpack.BinHeadSize(len(p.Data.XData())) + len(p.Data.XData())
}
func (p *AnswerRebaseObject) neoMsgEncodeM(data []byte) {
......@@ -4113,14 +4113,17 @@ func (p *AnswerRebaseObject) neoMsgEncodeM(data []byte) {
data[11] = byte(msgpack.Bin8)
data[12] = 8
binary.BigEndian.PutUint64(data[13:], uint64(p.ConflictSerial))
data[21] = byte(msgpack.Bool(p.Compression))
data[22] = byte(msgpack.Bin8)
data[23] = 20
copy(data[24:], p.Checksum[:])
{
n := msgpack.PutUint8(data[21:], uint8(p.Compression))
data = data[21+n:]
}
data[0] = byte(msgpack.Bin8)
data[1] = 20
copy(data[2:], p.Checksum[:])
{
l := len(p.Data.XData())
n := msgpack.PutBinHead(data[44:], l)
data = data[44+n:]
n := msgpack.PutBinHead(data[22:], l)
data = data[22+n:]
copy(data, p.Data.XData())
data = data[l:]
}
......@@ -4128,7 +4131,7 @@ func (p *AnswerRebaseObject) neoMsgEncodeM(data []byte) {
func (p *AnswerRebaseObject) neoMsgDecodeM(data []byte) (int, error) {
var nread uint64
if len(data) < 44 {
if len(data) < 21 {
goto overflow
}
if op, opOk := msgpack.Op(data[0]), msgpack.FixArray_4|5; op != opOk {
......@@ -4148,22 +4151,27 @@ func (p *AnswerRebaseObject) neoMsgDecodeM(data []byte) (int, error) {
return 0, mdecodeLen8Err("AnswerRebaseObject.ConflictSerial", l, 8)
}
p.ConflictSerial = zodb.Tid(binary.BigEndian.Uint64(data[13:]))
switch op := msgpack.Op(data[21]); op {
default:
return 0, mdecodeOpErr("AnswerRebaseObject.Compression", op, msgpack.True, msgpack.False)
case msgpack.True:
p.Compression = true
case msgpack.False:
p.Compression = false
data = data[21:]
{
v, tail, err := msgp.ReadUint8Bytes(data)
if err != nil {
return 0, mdecodeErr("AnswerRebaseObject.Compression", err)
}
if op := msgpack.Op(data[22]); op != msgpack.Bin8 {
p.Compression = Compression(v)
nread += uint64(len(data) - len(tail))
data = tail
}
if len(data) < 22 {
goto overflow
}
if op := msgpack.Op(data[0]); op != msgpack.Bin8 {
return 0, mdecodeOpErr("AnswerRebaseObject.Checksum", op, msgpack.Bin8)
}
if l := data[23]; l != 20 {
if l := data[1]; l != 20 {
return 0, mdecodeLen8Err("AnswerRebaseObject.Checksum", l, 20)
}
copy(p.Checksum[:], data[24:44])
data = data[44:]
copy(p.Checksum[:], data[2:22])
data = data[22:]
{
b, tail, err := msgp.ReadBytesZC(data)
if err != nil {
......@@ -4174,7 +4182,7 @@ func (p *AnswerRebaseObject) neoMsgDecodeM(data []byte) (int, error) {
nread += uint64(len(data) - len(tail))
data = tail
}
return 44 + int(nread), nil
return 43 + int(nread), nil
overflow:
return 0, ErrDecodeOverflow
......@@ -4193,7 +4201,7 @@ func (p *StoreObject) neoMsgEncodedLenN() int {
func (p *StoreObject) neoMsgEncodeN(data []byte) {
binary.BigEndian.PutUint64(data[0:], uint64(p.Oid))
binary.BigEndian.PutUint64(data[8:], uint64(p.Serial))
(data[16:])[0] = bool2byte(p.Compression)
(data[16:])[0] = uint8(p.Compression)
copy(data[17:], p.Checksum[:])
{
l := uint32(len(p.Data))
......@@ -4213,7 +4221,7 @@ func (p *StoreObject) neoMsgDecodeN(data []byte) (int, error) {
}
p.Oid = zodb.Oid(binary.BigEndian.Uint64(data[0 : 0+8]))
p.Serial = zodb.Tid(binary.BigEndian.Uint64(data[8 : 8+8]))
p.Compression = byte2bool((data[16 : 16+1])[0])
p.Compression = Compression((data[16 : 16+1])[0])
copy(p.Checksum[:], data[17:37])
{
l := binary.BigEndian.Uint32(data[37 : 37+4])
......@@ -4235,7 +4243,7 @@ overflow:
}
func (p *StoreObject) neoMsgEncodedLenM() int {
return 64 + msgpack.BinHeadSize(len(p.Data)) + len(p.Data)
return 63 + msgpack.Uint8Size(uint8(p.Compression)) + msgpack.BinHeadSize(len(p.Data)) + len(p.Data)
}
func (p *StoreObject) neoMsgEncodeM(data []byte) {
......@@ -4246,14 +4254,17 @@ func (p *StoreObject) neoMsgEncodeM(data []byte) {
data[11] = byte(msgpack.Bin8)
data[12] = 8
binary.BigEndian.PutUint64(data[13:], uint64(p.Serial))
data[21] = byte(msgpack.Bool(p.Compression))
data[22] = byte(msgpack.Bin8)
data[23] = 20
copy(data[24:], p.Checksum[:])
{
n := msgpack.PutUint8(data[21:], uint8(p.Compression))
data = data[21+n:]
}
data[0] = byte(msgpack.Bin8)
data[1] = 20
copy(data[2:], p.Checksum[:])
{
l := len(p.Data)
n := msgpack.PutBinHead(data[44:], l)
data = data[44+n:]
n := msgpack.PutBinHead(data[22:], l)
data = data[22+n:]
copy(data, p.Data)
data = data[l:]
}
......@@ -4267,7 +4278,7 @@ func (p *StoreObject) neoMsgEncodeM(data []byte) {
func (p *StoreObject) neoMsgDecodeM(data []byte) (int, error) {
var nread uint64
if len(data) < 44 {
if len(data) < 21 {
goto overflow
}
if op, opOk := msgpack.Op(data[0]), msgpack.FixArray_4|7; op != opOk {
......@@ -4287,22 +4298,27 @@ func (p *StoreObject) neoMsgDecodeM(data []byte) (int, error) {
return 0, mdecodeLen8Err("StoreObject.Serial", l, 8)
}
p.Serial = zodb.Tid(binary.BigEndian.Uint64(data[13:]))
switch op := msgpack.Op(data[21]); op {
default:
return 0, mdecodeOpErr("StoreObject.Compression", op, msgpack.True, msgpack.False)
case msgpack.True:
p.Compression = true
case msgpack.False:
p.Compression = false
data = data[21:]
{
v, tail, err := msgp.ReadUint8Bytes(data)
if err != nil {
return 0, mdecodeErr("StoreObject.Compression", err)
}
if op := msgpack.Op(data[22]); op != msgpack.Bin8 {
p.Compression = Compression(v)
nread += uint64(len(data) - len(tail))
data = tail
}
if len(data) < 22 {
goto overflow
}
if op := msgpack.Op(data[0]); op != msgpack.Bin8 {
return 0, mdecodeOpErr("StoreObject.Checksum", op, msgpack.Bin8)
}
if l := data[23]; l != 20 {
if l := data[1]; l != 20 {
return 0, mdecodeLen8Err("StoreObject.Checksum", l, 20)
}
copy(p.Checksum[:], data[24:44])
data = data[44:]
copy(p.Checksum[:], data[2:22])
data = data[22:]
{
b, tail, err := msgp.ReadBytesZC(data)
if err != nil {
......@@ -4330,7 +4346,7 @@ func (p *StoreObject) neoMsgDecodeM(data []byte) (int, error) {
return 0, mdecodeLen8Err("StoreObject.Tid", l, 8)
}
p.Tid = zodb.Tid(binary.BigEndian.Uint64(data[12:]))
return 64 + int(nread), nil
return 63 + int(nread), nil
overflow:
return 0, ErrDecodeOverflow
......@@ -4957,7 +4973,7 @@ func (p *AnswerObject) neoMsgEncodeN(data []byte) {
binary.BigEndian.PutUint64(data[0:], uint64(p.Oid))
binary.BigEndian.PutUint64(data[8:], uint64(p.Serial))
binary.BigEndian.PutUint64(data[16:], uint64(p.NextSerial))
(data[24:])[0] = bool2byte(p.Compression)
(data[24:])[0] = uint8(p.Compression)
copy(data[25:], p.Checksum[:])
{
l := uint32(len(p.Data.XData()))
......@@ -4977,7 +4993,7 @@ func (p *AnswerObject) neoMsgDecodeN(data []byte) (int, error) {
p.Oid = zodb.Oid(binary.BigEndian.Uint64(data[0 : 0+8]))
p.Serial = zodb.Tid(binary.BigEndian.Uint64(data[8 : 8+8]))
p.NextSerial = zodb.Tid(binary.BigEndian.Uint64(data[16 : 16+8]))
p.Compression = byte2bool((data[24 : 24+1])[0])
p.Compression = Compression((data[24 : 24+1])[0])
copy(p.Checksum[:], data[25:45])
{
l := binary.BigEndian.Uint32(data[45 : 45+4])
......@@ -4998,7 +5014,7 @@ overflow:
}
func (p *AnswerObject) neoMsgEncodedLenM() int {
return 64 + msgpack.BinHeadSize(len(p.Data.XData())) + len(p.Data.XData())
return 63 + msgpack.Uint8Size(uint8(p.Compression)) + msgpack.BinHeadSize(len(p.Data.XData())) + len(p.Data.XData())
}
func (p *AnswerObject) neoMsgEncodeM(data []byte) {
......@@ -5012,14 +5028,17 @@ func (p *AnswerObject) neoMsgEncodeM(data []byte) {
data[21] = byte(msgpack.Bin8)
data[22] = 8
binary.BigEndian.PutUint64(data[23:], uint64(p.NextSerial))
data[31] = byte(msgpack.Bool(p.Compression))
data[32] = byte(msgpack.Bin8)
data[33] = 20
copy(data[34:], p.Checksum[:])
{
n := msgpack.PutUint8(data[31:], uint8(p.Compression))
data = data[31+n:]
}
data[0] = byte(msgpack.Bin8)
data[1] = 20
copy(data[2:], p.Checksum[:])
{
l := len(p.Data.XData())
n := msgpack.PutBinHead(data[54:], l)
data = data[54+n:]
n := msgpack.PutBinHead(data[22:], l)
data = data[22+n:]
copy(data, p.Data.XData())
data = data[l:]
}
......@@ -5030,7 +5049,7 @@ func (p *AnswerObject) neoMsgEncodeM(data []byte) {
func (p *AnswerObject) neoMsgDecodeM(data []byte) (int, error) {
var nread uint64
if len(data) < 54 {
if len(data) < 31 {
goto overflow
}
if op, opOk := msgpack.Op(data[0]), msgpack.FixArray_4|7; op != opOk {
......@@ -5057,22 +5076,27 @@ func (p *AnswerObject) neoMsgDecodeM(data []byte) (int, error) {
return 0, mdecodeLen8Err("AnswerObject.NextSerial", l, 8)
}
p.NextSerial = zodb.Tid(binary.BigEndian.Uint64(data[23:]))
switch op := msgpack.Op(data[31]); op {
default:
return 0, mdecodeOpErr("AnswerObject.Compression", op, msgpack.True, msgpack.False)
case msgpack.True:
p.Compression = true
case msgpack.False:
p.Compression = false
data = data[31:]
{
v, tail, err := msgp.ReadUint8Bytes(data)
if err != nil {
return 0, mdecodeErr("AnswerObject.Compression", err)
}
if op := msgpack.Op(data[32]); op != msgpack.Bin8 {
p.Compression = Compression(v)
nread += uint64(len(data) - len(tail))
data = tail
}
if len(data) < 22 {
goto overflow
}
if op := msgpack.Op(data[0]); op != msgpack.Bin8 {
return 0, mdecodeOpErr("AnswerObject.Checksum", op, msgpack.Bin8)
}
if l := data[33]; l != 20 {
if l := data[1]; l != 20 {
return 0, mdecodeLen8Err("AnswerObject.Checksum", l, 20)
}
copy(p.Checksum[:], data[34:54])
data = data[54:]
copy(p.Checksum[:], data[2:22])
data = data[22:]
{
b, tail, err := msgp.ReadBytesZC(data)
if err != nil {
......@@ -5093,7 +5117,7 @@ func (p *AnswerObject) neoMsgDecodeM(data []byte) (int, error) {
return 0, mdecodeLen8Err("AnswerObject.DataSerial", l, 8)
}
p.DataSerial = zodb.Tid(binary.BigEndian.Uint64(data[2:]))
return 64 + int(nread), nil
return 63 + int(nread), nil
overflow:
return 0, ErrDecodeOverflow
......@@ -10607,7 +10631,7 @@ func (p *AddObject) neoMsgEncodedLenN() int {
func (p *AddObject) neoMsgEncodeN(data []byte) {
binary.BigEndian.PutUint64(data[0:], uint64(p.Oid))
binary.BigEndian.PutUint64(data[8:], uint64(p.Serial))
(data[16:])[0] = bool2byte(p.Compression)
(data[16:])[0] = uint8(p.Compression)
copy(data[17:], p.Checksum[:])
{
l := uint32(len(p.Data.XData()))
......@@ -10626,7 +10650,7 @@ func (p *AddObject) neoMsgDecodeN(data []byte) (int, error) {
}
p.Oid = zodb.Oid(binary.BigEndian.Uint64(data[0 : 0+8]))
p.Serial = zodb.Tid(binary.BigEndian.Uint64(data[8 : 8+8]))
p.Compression = byte2bool((data[16 : 16+1])[0])
p.Compression = Compression((data[16 : 16+1])[0])
copy(p.Checksum[:], data[17:37])
{
l := binary.BigEndian.Uint32(data[37 : 37+4])
......@@ -10647,7 +10671,7 @@ overflow:
}
func (p *AddObject) neoMsgEncodedLenM() int {
return 54 + msgpack.BinHeadSize(len(p.Data.XData())) + len(p.Data.XData())
return 53 + msgpack.Uint8Size(uint8(p.Compression)) + msgpack.BinHeadSize(len(p.Data.XData())) + len(p.Data.XData())
}
func (p *AddObject) neoMsgEncodeM(data []byte) {
......@@ -10658,14 +10682,17 @@ func (p *AddObject) neoMsgEncodeM(data []byte) {
data[11] = byte(msgpack.Bin8)
data[12] = 8
binary.BigEndian.PutUint64(data[13:], uint64(p.Serial))
data[21] = byte(msgpack.Bool(p.Compression))
data[22] = byte(msgpack.Bin8)
data[23] = 20
copy(data[24:], p.Checksum[:])
{
n := msgpack.PutUint8(data[21:], uint8(p.Compression))
data = data[21+n:]
}
data[0] = byte(msgpack.Bin8)
data[1] = 20
copy(data[2:], p.Checksum[:])
{
l := len(p.Data.XData())
n := msgpack.PutBinHead(data[44:], l)
data = data[44+n:]
n := msgpack.PutBinHead(data[22:], l)
data = data[22+n:]
copy(data, p.Data.XData())
data = data[l:]
}
......@@ -10676,7 +10703,7 @@ func (p *AddObject) neoMsgEncodeM(data []byte) {
func (p *AddObject) neoMsgDecodeM(data []byte) (int, error) {
var nread uint64
if len(data) < 44 {
if len(data) < 21 {
goto overflow
}
if op, opOk := msgpack.Op(data[0]), msgpack.FixArray_4|6; op != opOk {
......@@ -10696,22 +10723,27 @@ func (p *AddObject) neoMsgDecodeM(data []byte) (int, error) {
return 0, mdecodeLen8Err("AddObject.Serial", l, 8)
}
p.Serial = zodb.Tid(binary.BigEndian.Uint64(data[13:]))
switch op := msgpack.Op(data[21]); op {
default:
return 0, mdecodeOpErr("AddObject.Compression", op, msgpack.True, msgpack.False)
case msgpack.True:
p.Compression = true
case msgpack.False:
p.Compression = false
data = data[21:]
{
v, tail, err := msgp.ReadUint8Bytes(data)
if err != nil {
return 0, mdecodeErr("AddObject.Compression", err)
}
if op := msgpack.Op(data[22]); op != msgpack.Bin8 {
p.Compression = Compression(v)
nread += uint64(len(data) - len(tail))
data = tail
}
if len(data) < 22 {
goto overflow
}
if op := msgpack.Op(data[0]); op != msgpack.Bin8 {
return 0, mdecodeOpErr("AddObject.Checksum", op, msgpack.Bin8)
}
if l := data[23]; l != 20 {
if l := data[1]; l != 20 {
return 0, mdecodeLen8Err("AddObject.Checksum", l, 20)
}
copy(p.Checksum[:], data[24:44])
data = data[44:]
copy(p.Checksum[:], data[2:22])
data = data[22:]
{
b, tail, err := msgp.ReadBytesZC(data)
if err != nil {
......@@ -10732,7 +10764,7 @@ func (p *AddObject) neoMsgDecodeM(data []byte) (int, error) {
return 0, mdecodeLen8Err("AddObject.DataSerial", l, 8)
}
p.DataSerial = zodb.Tid(binary.BigEndian.Uint64(data[2:]))
return 54 + int(nread), nil
return 53 + int(nread), nil
overflow:
return 0, ErrDecodeOverflow
......
......@@ -87,7 +87,7 @@ func (f *Backend) Load(ctx context.Context, xid zodb.Xid) (*proto.AnswerObject,
Serial: serial,
NextSerial: nextSerial,
Compression: false,
Compression: 0,
Data: buf,
Checksum: xsha1.NEOSum(buf.Data), // XXX computing every time
......
......@@ -68,7 +68,7 @@ loop:
buf := obj.Data
if obj.Compression {
if obj.Compression != 0 {
b.Fatal("compression was used")
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment