Commit 890384bb authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 86cf093c
...@@ -160,6 +160,7 @@ type storage struct { ...@@ -160,6 +160,7 @@ type storage struct {
// XXX LastTid - report only LastTid for which cache is ready? // XXX LastTid - report only LastTid for which cache is ready?
// or driver.LastTid(), then wait cache is ready? // or driver.LastTid(), then wait cache is ready?
// Load implements Loader.
func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) { func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) {
// XXX here: offload xid validation from cache and driver ? // XXX here: offload xid validation from cache and driver ?
// XXX here: offload wrapping err -> OpError{"load", err} ? // XXX here: offload wrapping err -> OpError{"load", err} ?
...@@ -170,6 +171,7 @@ func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) { ...@@ -170,6 +171,7 @@ func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) {
} }
} }
// Prefetch implements Prefetcher.
func (s *storage) Prefetch(ctx context.Context, xid Xid) { func (s *storage) Prefetch(ctx context.Context, xid Xid) {
if s.l1cache != nil { if s.l1cache != nil {
s.l1cache.Prefetch(ctx, xid) s.l1cache.Prefetch(ctx, xid)
...@@ -202,8 +204,8 @@ func (s *storage) watcher() { ...@@ -202,8 +204,8 @@ func (s *storage) watcher() {
// also served - not to get stuck and support clients who do DelWatch // also served - not to get stuck and support clients who do DelWatch
// and no longer receive from their watchq. However we cannot register // and no longer receive from their watchq. However we cannot register
// added watchq immediately, because it is undefined whether or not // added watchq immediately, because it is undefined whether or not
// we'll see it while iterating watchTab. So we queue what was added // we'll see it while iterating watchTab map. So we queue what was
// and flush it on the beginning of each cycle. // added and flush it to watchTab on the beginning of each cycle.
var addq map[chan<- Event]struct{} var addq map[chan<- Event]struct{}
addqFlush := func() { addqFlush := func() {
for watchq := range addq { for watchq := range addq {
...@@ -211,7 +213,7 @@ func (s *storage) watcher() { ...@@ -211,7 +213,7 @@ func (s *storage) watcher() {
} }
addq = make(map[chan<- Event]struct{}) addq = make(map[chan<- Event]struct{})
} }
handleReq := func(req watchRequest) { serveReq := func(req watchRequest) {
switch req.op { switch req.op {
case addWatch: case addWatch:
addq[req.watchq] = struct{}{} addq[req.watchq] = struct{}{}
...@@ -248,7 +250,7 @@ func (s *storage) watcher() { ...@@ -248,7 +250,7 @@ func (s *storage) watcher() {
select { select {
case req := <-s.watchReq: case req := <-s.watchReq:
handleReq(req) serveReq(req)
case event, ok := <-s.drvWatchq: case event, ok := <-s.drvWatchq:
if !ok { if !ok {
...@@ -280,7 +282,7 @@ func (s *storage) watcher() { ...@@ -280,7 +282,7 @@ func (s *storage) watcher() {
for watchq := range s.watchTab { for watchq := range s.watchTab {
select { select {
case req := <-s.watchReq: case req := <-s.watchReq:
handleReq(req) serveReq(req)
case watchq <- event: case watchq <- event:
// ok // ok
...@@ -292,7 +294,7 @@ func (s *storage) watcher() { ...@@ -292,7 +294,7 @@ func (s *storage) watcher() {
// AddWatch implements Watcher. // AddWatch implements Watcher.
func (s *storage) AddWatch(watchq chan<- Event) (at0 Tid) { func (s *storage) AddWatch(watchq chan<- Event) (at0 Tid) {
// XXX when already Closed? // XXX when already Closed? -> `go watchq <- .downErr + close(watchq)`
ack := make(chan Tid) ack := make(chan Tid)
s.watchReq <- watchRequest{addWatch, ack, watchq} s.watchReq <- watchRequest{addWatch, ack, watchq}
return <-ack return <-ack
...@@ -300,7 +302,7 @@ func (s *storage) AddWatch(watchq chan<- Event) (at0 Tid) { ...@@ -300,7 +302,7 @@ func (s *storage) AddWatch(watchq chan<- Event) (at0 Tid) {
// DelWatch implements Watcher. // DelWatch implements Watcher.
func (s *storage) DelWatch(watchq chan<- Event) { func (s *storage) DelWatch(watchq chan<- Event) {
// XXX when already Closed? // XXX when already Closed? -> noop
ack := make(chan Tid) ack := make(chan Tid)
s.watchReq <- watchRequest{delWatch, ack, watchq} s.watchReq <- watchRequest{delWatch, ack, watchq}
<-ack <-ack
......
...@@ -441,7 +441,7 @@ type Event interface { ...@@ -441,7 +441,7 @@ type Event interface {
func (_ *EventError) event() {} func (_ *EventError) event() {}
func (_ *EventCommit) event() {} func (_ *EventCommit) event() {}
// EventError is event descrbing an error observed by watcher. // EventError is event describing an error observed by watcher.
type EventError struct { type EventError struct {
Err error Err error
} }
...@@ -467,7 +467,7 @@ type Watcher interface { ...@@ -467,7 +467,7 @@ type Watcher interface {
// sent, where at₀ is database head that was current when AddWatch call // sent, where at₀ is database head that was current when AddWatch call
// was made. // was made.
// //
// Once registered, watchq must be read untill it is closed or until // Once registered, watchq must be read until it is closed or until
// DelWatch call. Not doing so will stuck whole storage. // DelWatch call. Not doing so will stuck whole storage.
// //
// Registered watchq are closed when the database storage is closed. // Registered watchq are closed when the database storage is closed.
...@@ -478,7 +478,7 @@ type Watcher interface { ...@@ -478,7 +478,7 @@ type Watcher interface {
// DelWatch unregisters watchq from being notified of database changes. // DelWatch unregisters watchq from being notified of database changes.
// //
// After DelWatch call completes, no new events will be sent to watchq. // After DelWatch call completes, no new events will be sent to watchq.
// It is safe to call DelWatch without sumultaneously reading watchq. // It is safe to call DelWatch without simultaneously reading watchq.
// In particular the following example is valid: // In particular the following example is valid:
// //
// at0 := stor.AddWatch(watchq) // at0 := stor.AddWatch(watchq)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment