Commit 68e5de9a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent cfc26e2b
...@@ -226,7 +226,7 @@ func (c *Client) talkMaster(ctx context.Context) (err error) { ...@@ -226,7 +226,7 @@ func (c *Client) talkMaster(ctx context.Context) (err error) {
err := c.talkMaster1(ctx) err := c.talkMaster1(ctx)
log.Warning(ctx, err) // XXX Warning ok? -> Error? log.Warning(ctx, err) // XXX Warning ok? -> Error?
// XXX if err == "reject identification / protocol error" -> shutdown client // TODO if err == "reject identification / protocol error" -> shutdown client
// TODO if err = shutdown -> return // TODO if err = shutdown -> return
...@@ -370,6 +370,14 @@ func (c *Client) recvMaster(ctx context.Context, mlink *neonet.NodeLink) (err er ...@@ -370,6 +370,14 @@ func (c *Client) recvMaster(ctx context.Context, mlink *neonet.NodeLink) (err er
// recvMaster1 handles 1 message from master // recvMaster1 handles 1 message from master
func (c *Client) recvMaster1(ctx context.Context, req neonet.Request) error { func (c *Client) recvMaster1(ctx context.Context, req neonet.Request) error {
switch msg := req.Msg.(type) {
// <- committed txn
case *proto.InvalidateObjects:
return c.invalidateObjects(msg)
}
// messages for state changes
c.node.StateMu.Lock() c.node.StateMu.Lock()
switch msg := req.Msg.(type) { switch msg := req.Msg.(type) {
...@@ -393,8 +401,6 @@ func (c *Client) recvMaster1(ctx context.Context, req neonet.Request) error { ...@@ -393,8 +401,6 @@ func (c *Client) recvMaster1(ctx context.Context, req neonet.Request) error {
c.node.UpdateClusterState(ctx, msg) c.node.UpdateClusterState(ctx, msg)
} }
// XXX
// *proto.InvalidateObjects -> watchq
// update .operational + notify those who was waiting for it // update .operational + notify those who was waiting for it
opready := c.updateOperational() opready := c.updateOperational()
...@@ -404,6 +410,42 @@ func (c *Client) recvMaster1(ctx context.Context, req neonet.Request) error { ...@@ -404,6 +410,42 @@ func (c *Client) recvMaster1(ctx context.Context, req neonet.Request) error {
return nil return nil
} }
// invalidateObjects is called by recvMaster1 on receiving invalidateObjects notification.
func (c *Client) invalidateObjects(msg *proto.InvalidateObjects) error {
tid := msg.Tid
// likely no need to verify for tid↑ because IStorage watcher does it.
// However until .at0 is initialized we do not send events to IStorage,
// so double check for monotonicity here as well.
if tid <= c.head {
return fmt.Errorf("bad invalidation from master: tid not ↑: %s -> %s", c.head, tid)
}
c.head = tid
if c.watchq == nil {
return nil
}
// invalidation event received and we have to send it to .watchq
event := &zodb.EventCommit{Tid: tid, Changev: msg.OidList}
c.at0Mu.Lock()
defer c.at0Mu.Unlock()
// queue initial events until .at0 is initialized after register
// queued events will be sent to watchq by zeo ctor after initializing .at0
if !c.at0Initialized {
c.eventq0 = append(c.eventq0, event)
return nil
}
// at0 is initialized - ok to send current event if it goes > at0
if tid > c.at0 {
c.watchq <- event
}
return nil
}
// flushEventq0 flushes events queued in c.eventq0. // flushEventq0 flushes events queued in c.eventq0.
// must be called under .at0Mu // must be called under .at0Mu
func (c *Client) flushEventq0() { func (c *Client) flushEventq0() {
......
...@@ -268,7 +268,6 @@ func TestLoad(t *testing.T) { ...@@ -268,7 +268,6 @@ func TestLoad(t *testing.T) {
} }
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
t.Skip("FIXME currently hangs")
withNEOSrv(t, func(t *testing.T, nsrv NEOSrv) { withNEOSrv(t, func(t *testing.T, nsrv NEOSrv) {
xtesting.DrvTestWatch(t, fmt.Sprintf("neo://%s@%s", nsrv.ClusterName(), nsrv.MasterAddr()), openClientByURL) xtesting.DrvTestWatch(t, fmt.Sprintf("neo://%s@%s", nsrv.ClusterName(), nsrv.MasterAddr()), openClientByURL)
}) })
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment