Commit 08b011f5 authored by Kirill Smelkov's avatar Kirill Smelkov Committed by Levin Zimmermann

wcfs: Rework WatchLink.serve exit codepath for better clarity

Bring in more structure:

- final watchlink cleanup is done in its own block
- cancelling spawned handlers is done in another block
- add more comments explaining things

/reviewed-by @levin.zimmermann
/reviewed-on nexedi/wendelin.core!18
parent c7c3b82a
...@@ -1872,11 +1872,6 @@ func (wlink *WatchLink) serve() { ...@@ -1872,11 +1872,6 @@ func (wlink *WatchLink) serve() {
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} }
head := wlink.head
head.wlinkMu.Lock()
delete(head.wlinkTab, wlink)
head.wlinkMu.Unlock()
} }
func (wlink *WatchLink) _serve() (err error) { func (wlink *WatchLink) _serve() (err error) {
...@@ -1885,20 +1880,9 @@ func (wlink *WatchLink) _serve() (err error) { ...@@ -1885,20 +1880,9 @@ func (wlink *WatchLink) _serve() (err error) {
ctx0 := context.TODO() // TODO ctx = merge(ctx of wcfs running, ctx of wlink timeout) ctx0 := context.TODO() // TODO ctx = merge(ctx of wcfs running, ctx of wlink timeout)
ctx, cancel := context.WithCancel(ctx0) ctx, cancel := context.WithCancel(ctx0)
wg := xsync.NewWorkGroup(ctx)
// final watchlink cleanup is done on serve exit
defer func() { defer func() {
// cancel all handlers on both error and ok return.
// ( ok return is e.g. when we received "bye", so if client
// sends "bye" and some pin handlers are in progress - they
// anyway don't need to wait for client replies anymore )
cancel()
err2 := wg.Wait()
if err == nil {
err = err2
}
// unregister all watches created on this wlink // unregister all watches created on this wlink
wlink.byfileMu.Lock() wlink.byfileMu.Lock()
for _, w := range wlink.byfile { for _, w := range wlink.byfile {
...@@ -1909,6 +1893,12 @@ func (wlink *WatchLink) _serve() (err error) { ...@@ -1909,6 +1893,12 @@ func (wlink *WatchLink) _serve() (err error) {
wlink.byfile = nil wlink.byfile = nil
wlink.byfileMu.Unlock() wlink.byfileMu.Unlock()
// unregister wlink itself
head := wlink.head
head.wlinkMu.Lock()
delete(head.wlinkTab, wlink)
head.wlinkMu.Unlock()
// write to peer if it was logical error on client side // write to peer if it was logical error on client side
if err != nil { if err != nil {
_ = wlink.send(ctx0, 0, fmt.Sprintf("error: %s", err)) _ = wlink.send(ctx0, 0, fmt.Sprintf("error: %s", err))
...@@ -1916,12 +1906,33 @@ func (wlink *WatchLink) _serve() (err error) { ...@@ -1916,12 +1906,33 @@ func (wlink *WatchLink) _serve() (err error) {
// close .sk // close .sk
// closing .sk.tx wakes up rx on client side. // closing .sk.tx wakes up rx on client side.
err2 = wlink.sk.Close() err2 := wlink.sk.Close()
if err == nil { if err == nil {
err = err2 err = err2
} }
}() }()
// watch handlers are spawned in dedicated workgroup
//
// Pin handlers are run either inside - for pins run from setupWatch, or,
// for pins run from readPinWatchers, outside.
// Upon serve exit we cancel watch and pin handlers ran inside and wait for their completion.
wg := xsync.NewWorkGroup(ctx)
defer func() {
// cancel all handlers on both error and ok return.
// ( ok return is e.g. when we received "bye", so if client
// sends "bye" and some pin handlers are in progress - they
// anyway don't need to wait for client replies anymore )
cancel()
// wait for setupWatch and pin handlers spawned from it to complete
err2 := wg.Wait()
if err == nil {
err = err2
}
}()
// cancel main thread on any watch handler error // cancel main thread on any watch handler error
ctx, mainCancel := context.WithCancel(ctx) ctx, mainCancel := context.WithCancel(ctx)
defer mainCancel() defer mainCancel()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment