Commit 2dad1860 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f515f4b0
......@@ -196,27 +196,56 @@ const (
// watcher dispatches events from driver to subscribers and serves
// {Add,Del}Watch requests.
func (s *storage) watcher() {
for {
select {
case req := <-s.watchReq:
// staging place for AddWatch requests.
//
// during event delivery to registered watchqs, add/del requests are
// also served - not to get stuck and support clients who do DelWatch
// and no longer receive from their watchq. However we cannot register
// added watchq immediately, because it is undefined whether or not
// we'll see it while iterating watchTab. So we queue what was added
// and flush it on the beginning of each cycle.
var addq map[chan<- Event]struct{}
addqFlush := func() {
for watchq := range addq {
s.watchTab[watchq] = struct{}{}
}
addq = make(map[chan<- Event]struct{})
}
handleReq := func(req watchRequest) {
switch req.op {
case addWatch:
s.watchTab[req.watchq] = struct{}{}
addq[req.watchq] = struct{}{}
case delWatch:
delete(s.watchTab, req.watchq)
delete(addq, req.watchq)
default:
panic("bad watch request op")
}
req.ack <- s.drvHead
}
// close all subscribers's watchq on close
// XXX AddWatch/DelWatch after watcher exits?
defer func() {
addqFlush()
for watchq := range s.watchTab {
close(watchq)
}
}()
for {
addqFlush() // register staged AddWatch(s)
select {
case req := <-s.watchReq:
handleReq(req)
case event, ok := <-s.drvWatchq:
if !ok {
// storage closed
// XXX close all subscribers' watchq?
// XXX AddWatch/DelWatch after watcher exits?
return
}
......@@ -235,8 +264,13 @@ func (s *storage) watcher() {
// deliver event to all watchers
for watchq := range s.watchTab {
// XXX + select and handle DelWatch
watchq <- event
select {
case req := <-s.watchReq:
handleReq(req)
case watchq <- event:
// ok
}
}
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment