Commit c17c904b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 54137dd2
...@@ -325,7 +325,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z ...@@ -325,7 +325,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
if len(storv) == 0 { if len(storv) == 0 {
// XXX recheck it adds traceback to log -> XXX it does not -> add our Bugf which always forces +v on such error print // XXX recheck it adds traceback to log -> XXX it does not -> add our Bugf which always forces +v on such error print
return nil, 0, errors.Errorf("internal inconsistency: cluster is operational, but no storages alive for oid %v", xid.Oid) return nil, 0, errors.Errorf("internal inconsistency: cluster is operational, but no storages alive for oid %s", xid.Oid)
} }
// XXX vvv temp stub -> TODO pick up 3 random storages and send load // XXX vvv temp stub -> TODO pick up 3 random storages and send load
...@@ -371,7 +371,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z ...@@ -371,7 +371,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
udata, err := xzlib.Decompress(buf.Data) udata, err := xzlib.Decompress(buf.Data)
buf.Release() buf.Release()
if err != nil { if err != nil {
return nil, 0, fmt.Errorf("data corrupt: %v", err) return nil, 0, fmt.Errorf("data corrupt: %s", err)
} }
buf2.Data = udata buf2.Data = udata
buf = buf2 buf = buf2
......
...@@ -1047,10 +1047,9 @@ func (m *Master) updateNodeState(ctx context.Context, peer *xneo.PeerNode, state ...@@ -1047,10 +1047,9 @@ func (m *Master) updateNodeState(ctx context.Context, peer *xneo.PeerNode, state
m.updateNodeTab(ctx, nodei) m.updateNodeTab(ctx, nodei)
} }
// notifyAll notifies all peers about event. // notifyAll notifies all peers about δstate event.
// XXX place // XXX move close to _MasteredPeer.notify
func (m *Master) notifyAll(ctx context.Context, event _ΔClusterState) { func (m *Master) notifyAll(ctx context.Context, event _ΔClusterState) {
// XXX locking
for nid, peer := range m.peerTab { for nid, peer := range m.peerTab {
// TODO change limiting by buffer size to limiting by time - // TODO change limiting by buffer size to limiting by time -
// - i.e. detach peer if event queue grows more than 30s of time. // - i.e. detach peer if event queue grows more than 30s of time.
...@@ -1077,9 +1076,7 @@ func (m *Master) notifyAll(ctx context.Context, event _ΔClusterState) { ...@@ -1077,9 +1076,7 @@ func (m *Master) notifyAll(ctx context.Context, event _ΔClusterState) {
// //
// If node identification is accepted .nodeTab and .peerTab are updated and // If node identification is accepted .nodeTab and .peerTab are updated and
// corresponding peer entry is returned. New task is spawned to reply with // corresponding peer entry is returned. New task is spawned to reply with
// either accept or reject. // either reject or accept + notify.
//
// XXX If the peer is accepted (run something after initial accept completes)
func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer, ok bool) { func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer, ok bool) {
// XXX also verify ? : // XXX also verify ? :
// - NodeType valid // - NodeType valid
...@@ -1096,18 +1093,18 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer, ...@@ -1096,18 +1093,18 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
if nid == 0 || nid == proto.NID(n.idReq.NodeType, 0) /* XXX <- stub for "temp" check */ { if nid == 0 || nid == proto.NID(n.idReq.NodeType, 0) /* XXX <- stub for "temp" check */ {
nid = m.allocNID(nodeType) nid = m.allocNID(nodeType)
} }
// XXX nid < 0 (temporary) -> reallocate if conflict ? // TODO nid < 0 (temporary) -> reallocate if conflict ?
// XXX check nid matches NodeType // TODO check nid matches NodeType
node := m.node.State.NodeTab.Get(nid) node := m.node.State.NodeTab.Get(nid)
if node != nil { if node != nil {
// reject - nid is already occupied by someone else // reject - nid is already occupied by someone else
// XXX check also for down state - it could be the same node reconnecting // TODO check also for down state - it could be the same node reconnecting
return &proto.Error{proto.PROTOCOL_ERROR, fmt.Sprintf("nid %v already used by another node", nid)} return &proto.Error{proto.PROTOCOL_ERROR, fmt.Sprintf("nid %s already used by another node", nid)}
} }
// accept only certain kind of nodes depending on .clusterState, e.g. // accept only certain kind of nodes depending on .clusterState, e.g.
// XXX ok to have this logic inside identify? (better provide from outside ?) // TODO caller should pass this logic to identify
switch nodeType { switch nodeType {
case proto.CLIENT: case proto.CLIENT:
if m.node.State.Code != proto.ClusterRunning { if m.node.State.Code != proto.ClusterRunning {
...@@ -1119,7 +1116,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer, ...@@ -1119,7 +1116,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
// TODO +master, admin // TODO +master, admin
default: default:
return &proto.Error{proto.PROTOCOL_ERROR, fmt.Sprintf("not accepting node type %v", nodeType)} return &proto.Error{proto.PROTOCOL_ERROR, fmt.Sprintf("not accepting node type %s", nodeType)}
} }
return nil return nil
......
...@@ -93,7 +93,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -93,7 +93,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
// XXX move -> SetNumReplicas handler // XXX move -> SetNumReplicas handler
// // NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1` // // NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1`
// if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) { // if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) {
// return fmt.Errorf("TODO for 1-storage POC: Npt: %v Nreplica: %v", accept.NumPartitions, accept.NumReplicas) // return fmt.Errorf("TODO for 1-storage POC: Npt: %d Nreplica: %d", accept.NumPartitions, accept.NumReplicas)
// } // }
// let master initialize us. If successful this ends with StartOperation command. // let master initialize us. If successful this ends with StartOperation command.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment