Commit 54ab2674 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5114dc42
......@@ -934,4 +934,23 @@ type NotifyTransactionFinished struct {
// replication
// TODO
// Notify a storage node to replicate partitions up to given 'tid'
// and from given sources.
// M -> S
//
// - upstream_name: replicate from an upstream cluster
// - address: address of the source storage node, or None if there's no new
// data up to 'tid' for the given partition
type Replicate struct {
Tid zodb.Tid
UpstreamName string
SourceDict map[uint32/*PNumber*/]string // partition -> address
}
// Notify the master node that a partition has been successfully replicated
// from a storage to another.
// S -> M
type ReplicationDone struct {
Offset uint32 // PNumber
Tid zodb.Tid
}
......@@ -52,6 +52,7 @@ noask('CheckSerialRange')
nonotify('PartitionCorrupted')
noask('LastTransaction')
noask('CheckCurrentSerial')
nonotify('ReplicationDone')
_ = renames
_['AskPrimary'] = 'PrimaryMaster'
......
......@@ -3447,6 +3447,124 @@ overflow:
return 0, ErrDecodeOverflow
}
// 90. Replicate
func (*Replicate) neoMsgCode() uint16 {
return 90
}
func (p *Replicate) neoMsgEncodedLen() int {
var size int
for key := range p.SourceDict {
size += len(p.SourceDict[key])
}
return 16 + len(p.UpstreamName) + len(p.SourceDict)*8 + size
}
func (p *Replicate) neoMsgEncode(data []byte) {
binary.BigEndian.PutUint64(data[0:], uint64(p.Tid))
{
l := uint32(len(p.UpstreamName))
binary.BigEndian.PutUint32(data[8:], l)
data = data[12:]
copy(data, p.UpstreamName)
data = data[l:]
}
{
l := uint32(len(p.SourceDict))
binary.BigEndian.PutUint32(data[0:], l)
data = data[4:]
keyv := make([]uint32, 0, l)
for key := range p.SourceDict {
keyv = append(keyv, key)
}
sort.Slice(keyv, func(i, j int) bool { return keyv[i] < keyv[j] })
for _, key := range keyv {
binary.BigEndian.PutUint32(data[0:], key)
{
l := uint32(len(p.SourceDict[key]))
binary.BigEndian.PutUint32(data[4:], l)
data = data[8:]
copy(data, p.SourceDict[key])
data = data[l:]
}
data = data[0:]
}
}
}
func (p *Replicate) neoMsgDecode(data []byte) (int, error) {
var nread uint32
if uint32(len(data)) < 12 {
goto overflow
}
p.Tid = zodb.Tid(binary.BigEndian.Uint64(data[0:]))
{
l := binary.BigEndian.Uint32(data[8:])
data = data[12:]
if uint32(len(data)) < 4+l {
goto overflow
}
nread += 4 + l
p.UpstreamName = string(data[:l])
data = data[l:]
}
{
l := binary.BigEndian.Uint32(data[0:])
data = data[4:]
p.SourceDict = make(map[uint32]string, l)
m := p.SourceDict
for i := 0; uint32(i) < l; i++ {
if uint32(len(data)) < 8 {
goto overflow
}
key := binary.BigEndian.Uint32(data[0:])
{
l := binary.BigEndian.Uint32(data[4:])
data = data[8:]
if uint32(len(data)) < l {
goto overflow
}
nread += l
m[key] = string(data[:l])
data = data[l:]
}
}
nread += l * 8
}
return 12 + int(nread), nil
overflow:
return 0, ErrDecodeOverflow
}
// 91. ReplicationDone
func (*ReplicationDone) neoMsgCode() uint16 {
return 91
}
func (p *ReplicationDone) neoMsgEncodedLen() int {
return 12
}
func (p *ReplicationDone) neoMsgEncode(data []byte) {
binary.BigEndian.PutUint32(data[0:], p.Offset)
binary.BigEndian.PutUint64(data[4:], uint64(p.Tid))
}
func (p *ReplicationDone) neoMsgDecode(data []byte) (int, error) {
if uint32(len(data)) < 12 {
goto overflow
}
p.Offset = binary.BigEndian.Uint32(data[0:])
p.Tid = zodb.Tid(binary.BigEndian.Uint64(data[4:]))
return 12, nil
overflow:
return 0, ErrDecodeOverflow
}
// registry of message types
var msgTypeRegistry = map[uint16]reflect.Type{
0 | answerBit: reflect.TypeOf(Error{}),
......@@ -3539,4 +3657,6 @@ var msgTypeRegistry = map[uint16]reflect.Type{
87: reflect.TypeOf(CheckCurrentSerial{}),
88 | answerBit: reflect.TypeOf(AnswerCheckCurrentSerial{}),
89: reflect.TypeOf(NotifyTransactionFinished{}),
90: reflect.TypeOf(Replicate{}),
91: reflect.TypeOf(ReplicationDone{}),
}
......@@ -60,7 +60,7 @@ var pyMsgRegistry = map[uint16]string{
87: "CheckCurrentSerial",
89: "NotifyTransactionFinished",
90: "Replicate",
91: "NotifyReplicationDone",
91: "ReplicationDone",
92: "AskFetchTransactions",
94: "AskFetchObjects",
96: "AddTransaction",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment