Commit f9f4b93d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a4be5d2f
...@@ -46,7 +46,7 @@ type DriverOptions struct { ...@@ -46,7 +46,7 @@ type DriverOptions struct {
// it can get out of sync with the on-disk database file. // it can get out of sync with the on-disk database file.
// //
// The storage driver closes !nil Watchq when the driver is closed. // The storage driver closes !nil Watchq when the driver is closed.
Watchq chan<- WatchEvent Watchq chan<- CommitEvent
} }
// DriverOpener is a function to open a storage driver. // DriverOpener is a function to open a storage driver.
...@@ -91,7 +91,7 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto ...@@ -91,7 +91,7 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
return nil, fmt.Errorf("zodb: URL scheme \"%s://\" not supported", u.Scheme) return nil, fmt.Errorf("zodb: URL scheme \"%s://\" not supported", u.Scheme)
} }
drvWatchq := make(chan WatchEvent) drvWatchq := make(chan CommitEvent)
drvOpt := &DriverOptions{ drvOpt := &DriverOptions{
ReadOnly: opt.ReadOnly, ReadOnly: opt.ReadOnly,
Watchq: drvWatchq, Watchq: drvWatchq,
...@@ -115,7 +115,7 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto ...@@ -115,7 +115,7 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
drvWatchq: drvWatchq, drvWatchq: drvWatchq,
watchReq: make(chan watchRequest), watchReq: make(chan watchRequest),
watchTab: make(map[chan WatchEvent]struct{}), watchTab: make(map[chan CommitEvent]struct{}),
}, nil }, nil
} }
...@@ -131,9 +131,9 @@ type storage struct { ...@@ -131,9 +131,9 @@ type storage struct {
l1cache *Cache // can be =nil, if opened with NoCache l1cache *Cache // can be =nil, if opened with NoCache
// watcher // watcher
drvWatchq chan WatchEvent // watchq passed to driver drvWatchq chan CommitEvent // watchq passed to driver
watchReq chan watchRequest // {Add,Del}Watch requests go here watchReq chan watchRequest // {Add,Del}Watch requests go here
watchTab map[chan WatchEvent]struct{} // registered watchers watchTab map[chan CommitEvent]struct{} // registered watchers
} }
// loading goes through cache - this way prefetching can work // loading goes through cache - this way prefetching can work
...@@ -164,7 +164,7 @@ func (s *storage) Prefetch(ctx context.Context, xid Xid) { ...@@ -164,7 +164,7 @@ func (s *storage) Prefetch(ctx context.Context, xid Xid) {
type watchRequest struct { type watchRequest struct {
op watchOp // add or del op watchOp // add or del
ack chan struct{} // when request processed ack chan struct{} // when request processed
watchq chan WatchEvent // {Add,Del}Watch argument watchq chan CommitEvent // {Add,Del}Watch argument
} }
type watchOp int type watchOp int
...@@ -204,14 +204,16 @@ func (s *storage) watcher() { ...@@ -204,14 +204,16 @@ func (s *storage) watcher() {
} }
} }
func (s *storage) AddWatch(watchq chan WatchEvent) { // AddWatch implements Watcher.
func (s *storage) AddWatch(watchq chan CommitEvent) {
// XXX when already Closed? // XXX when already Closed?
ack := make(chan struct{}) ack := make(chan struct{})
s.watchReq <- watchRequest{addWatch, ack, watchq} s.watchReq <- watchRequest{addWatch, ack, watchq}
<-ack <-ack
} }
func (s *storage) DelWatch(watchq chan WatchEvent) { // DelWatch implements Watcher.
func (s *storage) DelWatch(watchq chan CommitEvent) {
// XXX when already Closed? // XXX when already Closed?
ack := make(chan struct{}) ack := make(chan struct{})
s.watchReq <- watchRequest{delWatch, ack, watchq} s.watchReq <- watchRequest{delWatch, ack, watchq}
......
...@@ -99,7 +99,7 @@ type FileStorage struct { ...@@ -99,7 +99,7 @@ type FileStorage struct {
downErr error // !nil when the storage is no longer operational downErr error // !nil when the storage is no longer operational
// driver client <- watcher: database commits. // driver client <- watcher: database commits.
watchq chan<- zodb.WatchEvent watchq chan<- zodb.CommitEvent
down chan struct{} // ready when storage is no longer operational down chan struct{} // ready when storage is no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher downOnce sync.Once // shutdown may be due to both Close and IO error in watcher
...@@ -668,7 +668,7 @@ mainloop: ...@@ -668,7 +668,7 @@ mainloop:
case <-fs.down: case <-fs.down:
return nil return nil
case fs.watchq <- zodb.WatchEvent{it.Txnh.Tid, oidv}: case fs.watchq <- zodb.CommitEvent{it.Txnh.Tid, oidv}:
// ok // ok
} }
} }
......
...@@ -410,7 +410,7 @@ func TestWatch(t *testing.T) { ...@@ -410,7 +410,7 @@ func TestWatch(t *testing.T) {
// force tfs creation & open tfs at go side // force tfs creation & open tfs at go side
at := xcommit(0, Object{0, "data0"}) at := xcommit(0, Object{0, "data0"})
watchq := make(chan zodb.WatchEvent) watchq := make(chan zodb.CommitEvent)
fs := xfsopenopt(t, tfs, &zodb.DriverOptions{ReadOnly: true, Watchq: watchq}) fs := xfsopenopt(t, tfs, &zodb.DriverOptions{ReadOnly: true, Watchq: watchq})
ctx := context.Background() ctx := context.Background()
......
...@@ -429,8 +429,8 @@ type Committer interface { ...@@ -429,8 +429,8 @@ type Committer interface {
} }
// WatchEvent is one event describing observed database change. // CommitEvent is event describing one observed database commit.
type WatchEvent struct { type CommitEvent struct {
Tid Tid Tid Tid
Changev []Oid // XXX name Changev []Oid // XXX name
} }
...@@ -447,14 +447,14 @@ type Watcher interface { ...@@ -447,14 +447,14 @@ type Watcher interface {
// Once registered, watchq must be read. Not doing so will stuck whole storage. // Once registered, watchq must be read. Not doing so will stuck whole storage.
// //
// Multiple AddWatch calls with the same watchq register watchq only once. // Multiple AddWatch calls with the same watchq register watchq only once.
AddWatch(watchq chan WatchEvent) AddWatch(watchq chan CommitEvent)
// DelWatch unregisters watchq to be notified of database changes. // DelWatch unregisters watchq to be notified of database changes.
// //
// After DelWatch call completes, no new events will be sent to watchq. // After DelWatch call completes, no new events will be sent to watchq.
// //
// DelWatch is noop if watchq was not registered. // DelWatch is noop if watchq was not registered.
DelWatch(watchq chan WatchEvent) DelWatch(watchq chan CommitEvent)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment