Commit ef38617b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 0a6ac0f4
...@@ -26,6 +26,7 @@ import ( ...@@ -26,6 +26,7 @@ import (
"log" "log"
"net/url" "net/url"
"strings" "strings"
"sync"
"lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/mem"
) )
...@@ -147,7 +148,7 @@ type storage struct { ...@@ -147,7 +148,7 @@ type storage struct {
driver IStorageDriver driver IStorageDriver
l1cache *Cache // can be =nil, if opened with NoCache l1cache *Cache // can be =nil, if opened with NoCache
down chan struct // ready when no longer operational down chan struct{} // ready when no longer operational
downErr error // reason for shutdown downErr error // reason for shutdown
// watcher // watcher
...@@ -158,8 +159,8 @@ type storage struct { ...@@ -158,8 +159,8 @@ type storage struct {
// when watcher is closed (.down is ready) {Add,Del}Watch operate directly // when watcher is closed (.down is ready) {Add,Del}Watch operate directly
// on .watchTab and interact with each other directly. In that mode: // on .watchTab and interact with each other directly. In that mode:
watchMu sync.Mutex // for watchTab and * below watchMu sync.Mutex // for watchTab and * below
watchCancel map[chan<- Event]struct{} // DelWatch can cancel AddWatch via here watchCancel map[chan<- Event]chan struct{} // DelWatch can cancel AddWatch via here
} }
// loading goes through cache - this way prefetching can work // loading goes through cache - this way prefetching can work
...@@ -235,6 +236,15 @@ func (s *storage) watcher() { ...@@ -235,6 +236,15 @@ func (s *storage) watcher() {
serveReq := func(req watchRequest) { serveReq := func(req watchRequest) {
switch req.op { switch req.op {
case addWatch: case addWatch:
_, already := s.watchTab[req.watchq]
if !already {
_, already = addq[req.watchq]
}
if already {
req.ack <- InvalidTid
return
}
addq[req.watchq] = struct{}{} addq[req.watchq] = struct{}{}
case delWatch: case delWatch:
...@@ -300,7 +310,7 @@ func (s *storage) watcher() { ...@@ -300,7 +310,7 @@ func (s *storage) watcher() {
// deliver event to all watchers. // deliver event to all watchers.
// handle add/del watchq in the process. // handle add/del watchq in the process.
deliver: next:
for watchq := range s.watchTab { for watchq := range s.watchTab {
for { for {
select { select {
...@@ -310,12 +320,12 @@ func (s *storage) watcher() { ...@@ -310,12 +320,12 @@ func (s *storage) watcher() {
// else try sending to current watchq once again. // else try sending to current watchq once again.
_, present := s.watchTab[watchq] _, present := s.watchTab[watchq]
if !present { if !present {
continue deliver continue next
} }
case watchq <- event: case watchq <- event:
// ok // ok
continue deliver continue next
} }
} }
} }
...@@ -330,11 +340,13 @@ func (s *storage) AddWatch(watchq chan<- Event) (at0 Tid) { ...@@ -330,11 +340,13 @@ func (s *storage) AddWatch(watchq chan<- Event) (at0 Tid) {
// no longer operational: behave if watchq was registered before that // no longer operational: behave if watchq was registered before that
// and then seen down/close events. Interact with DelWatch directly. // and then seen down/close events. Interact with DelWatch directly.
case <-s.down: case <-s.down:
at0 = s.drvHead
s.watchMu.Lock() s.watchMu.Lock()
_, already := s.watchTab[watchq] _, already := s.watchTab[watchq]
if already { if already {
s.watchMu.Unlock() s.watchMu.Unlock()
return // multiple AddWatch panic("multiple AddWatch with the same channel")
} }
s.watchTab[watchq] = struct{}{} s.watchTab[watchq] = struct{}{}
cancel := make(chan struct{}) cancel := make(chan struct{})
...@@ -353,11 +365,16 @@ func (s *storage) AddWatch(watchq chan<- Event) (at0 Tid) { ...@@ -353,11 +365,16 @@ func (s *storage) AddWatch(watchq chan<- Event) (at0 Tid) {
} }
close(watchq) close(watchq)
}() }()
return s.drvHead
return at0
// operational - interact with watcher // operational - interact with watcher
case s.watchReq <- watchRequest{addWatch, ack, watchq}: case s.watchReq <- watchRequest{addWatch, ack, watchq}:
return <-ack at0 = <-ack
if at0 == InvalidTid {
panic("multiple AddWatch with the same channel")
}
return at0
} }
} }
......
...@@ -472,7 +472,9 @@ type Watcher interface { ...@@ -472,7 +472,9 @@ type Watcher interface {
// //
// Registered watchq are closed when the database storage is closed. // Registered watchq are closed when the database storage is closed.
// //
// Multiple AddWatch calls with the same watchq register watchq only once. // It is safe to add watch to a closed database storage.
//
// AddWatch must be used only once for a particular watchq channel.
AddWatch(watchq chan<- Event) (at0 Tid) AddWatch(watchq chan<- Event) (at0 Tid)
// DelWatch unregisters watchq from being notified of database changes. // DelWatch unregisters watchq from being notified of database changes.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment