Commit 3438ab6a authored by Kirill Smelkov's avatar Kirill Smelkov

X neo: Fixes for "Close vs watchq"

parent f8fc7d5c
...@@ -68,6 +68,9 @@ type Client struct { ...@@ -68,6 +68,9 @@ type Client struct {
at0Initialized bool // true after .at0 is initialized at0Initialized bool // true after .at0 is initialized
at0Ready chan(struct{}) // ready after .at0 is initialized at0Ready chan(struct{}) // ready after .at0 is initialized
closeOnce sync.Once
closed chan struct{} // ready when Closed
ownNet bool // true if Client "owns" networker and should release it on Close ownNet bool // true if Client "owns" networker and should release it on Close
} }
...@@ -79,8 +82,9 @@ var _ zodb.IStorageDriver = (*Client)(nil) ...@@ -79,8 +82,9 @@ var _ zodb.IStorageDriver = (*Client)(nil)
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client { func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
c := &Client{ c := &Client{
node: newMasteredNode(proto.CLIENT, clusterName, net, masterAddr), node: newMasteredNode(proto.CLIENT, clusterName, net, masterAddr),
at0Ready: make(chan struct{}), at0Ready: make(chan struct{}),
closed: make(chan struct{}),
} }
var runCtx context.Context var runCtx context.Context
...@@ -91,6 +95,9 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client { ...@@ -91,6 +95,9 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
// Close implements zodb.IStorageDriver. // Close implements zodb.IStorageDriver.
func (c *Client) Close() (err error) { func (c *Client) Close() (err error) {
c.closeOnce.Do(func() {
close(c.closed)
})
c.runCancel() c.runCancel()
err = c.runWG.Wait() err = c.runWG.Wait()
if errors.Is(err, context.Canceled) { if errors.Is(err, context.Canceled) {
...@@ -190,7 +197,7 @@ func (c *Client) invalidateObjects(msg *proto.InvalidateObjects) error { ...@@ -190,7 +197,7 @@ func (c *Client) invalidateObjects(msg *proto.InvalidateObjects) error {
defer c.at0Mu.Unlock() defer c.at0Mu.Unlock()
// queue initial events until .at0 is initialized after register // queue initial events until .at0 is initialized after register
// queued events will be sent to watchq by zeo ctor after initializing .at0 // queued events will be sent to watchq by syncMaster after initializing .at0
if !c.at0Initialized { if !c.at0Initialized {
c.eventq0 = append(c.eventq0, event) c.eventq0 = append(c.eventq0, event)
return nil return nil
...@@ -198,7 +205,13 @@ func (c *Client) invalidateObjects(msg *proto.InvalidateObjects) error { ...@@ -198,7 +205,13 @@ func (c *Client) invalidateObjects(msg *proto.InvalidateObjects) error {
// at0 is initialized - ok to send current event if it goes > at0 // at0 is initialized - ok to send current event if it goes > at0
if tid > c.at0 { if tid > c.at0 {
c.watchq <- event select {
case <-c.closed:
// closed - client does not read watchq anymore
case c.watchq <- event:
// ok
}
} }
return nil return nil
} }
...@@ -250,7 +263,13 @@ func (c *Client) flushEventq0() { ...@@ -250,7 +263,13 @@ func (c *Client) flushEventq0() {
if c.watchq != nil { if c.watchq != nil {
for _, e := range c.eventq0 { for _, e := range c.eventq0 {
if e.Tid > c.at0 { if e.Tid > c.at0 {
c.watchq <- e select {
case <-c.closed:
// closed - client does not read watchq anymore
case c.watchq <- e:
// ok
}
} }
} }
} }
...@@ -487,14 +506,18 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) ( ...@@ -487,14 +506,18 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (
// close .watchq after serve is over // close .watchq after serve is over
c.at0Mu.Lock() c.at0Mu.Lock()
defer c.at0Mu.Unlock() defer c.at0Mu.Unlock()
if c.at0Initialized {
c.flushEventq0()
}
if c.watchq != nil { if c.watchq != nil {
if err != nil { if err != nil && /* already flushed .eventq0 */c.at0Initialized {
c.watchq <- &zodb.EventError{Err: err} select {
case <-c.closed:
// closed - client does not read watchq anymore
case c.watchq <- &zodb.EventError{Err: err}:
// ok
}
} }
close(c.watchq) close(c.watchq)
c.watchq = nil // prevent flushEventq0 to send to closed chan
} }
errq <- err errq <- err
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment