Commit 6ca6a575 authored by Kirill Smelkov's avatar Kirill Smelkov

X zeo: Fixes for "Close vs watchq"

parent 1a42eccf
......@@ -52,6 +52,9 @@ type zeo struct {
// becomes ready when serve loop finishes
serveWG sync.WaitGroup
closeOnce sync.Once
closed chan struct{} // ready when driver is Closed
url string // we were opened via this
}
......@@ -174,7 +177,13 @@ func (z *zeo) invalidateTransaction(arg interface{}) (err error) {
// at0 is initialized - ok to send current event if it goes > at0
if tid > z.at0 {
z.watchq <- event
select {
case <-z.closed:
// closed - client does not read watchq anymore
case z.watchq <- event:
// ok
}
}
return nil
}
......@@ -188,7 +197,13 @@ func (z *zeo) flushEventq0() {
if z.watchq != nil {
for _, e := range z.eventq0 {
if e.Tid > z.at0 {
z.watchq <- e
select {
case <-z.closed:
// closed - client does not read watchq anymore
case z.watchq <- e:
// ok
}
}
}
}
......@@ -264,7 +279,7 @@ func (r rpc) call(ctx context.Context, argv ...interface{}) (interface{}, error)
// excError returns error corresponding to an exception.
//
// well-known exceptions are mapped to corresponding well-known errors - e.g.
// POSKeyError -> zodb.NoObjectError, and rest are returned wrapper into rpcExcept.
// POSKeyError -> zodb.NoObjectError, and rest are returned wrapped into rpcExcept.
func (r rpc) excError(exc string, argv tuple) error {
// translate well-known exceptions
switch exc {
......@@ -438,7 +453,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
}()
z := &zeo{link: zlink, watchq: opt.Watchq, url: url}
z := &zeo{link: zlink, watchq: opt.Watchq, closed: make(chan struct{}), url: url}
// start serve loop on the link
z.serveWG.Add(1)
......@@ -456,14 +471,18 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
// close .watchq after serve is over
z.at0Mu.Lock()
defer z.at0Mu.Unlock()
if z.at0Initialized {
z.flushEventq0()
}
if z.watchq != nil {
if err != nil {
z.watchq <- &zodb.EventError{Err: err}
if err != nil && /* already flushed .eventq0 */z.at0Initialized {
select {
case <-z.closed:
// closed - client does not read watchq anymore
case z.watchq <- &zodb.EventError{Err: err}:
// ok
}
}
close(z.watchq)
z.watchq = nil // prevent flushEventq0 to send to closed chan
}
}()
......@@ -496,13 +515,16 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
// "invalidateTransaction" server notification.
//
// filter-out first < at0 messages for this reason.
//
// do this in separate task not to deadlock in watchq<- : we did not
// yet returned z to caller and so noone might be yet reading from watchq.
go func() {
z.at0Mu.Lock()
z.at0 = lastTid
z.at0Initialized = true
z.flushEventq0()
z.at0Mu.Unlock()
}()
//call('get_info') -> {}str->str, ex // XXX can be omitted
/*
......@@ -520,11 +542,15 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
'supports_record_iternext': True})
*/
return z, z.at0, nil
return z, lastTid, nil
}
func (z *zeo) Close() error {
err := z.link.Close()
var err error
z.closeOnce.Do(func() {
close(z.closed)
err = z.link.Close()
})
z.serveWG.Wait()
return err
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment