Commit 2e79e3ba authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 8129fa8f
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package zodb package zodb
// open storages by URL // IStorage wrapper + open storage by URL
import ( import (
"context" "context"
...@@ -29,6 +29,7 @@ import ( ...@@ -29,6 +29,7 @@ import (
"sync" "sync"
"lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xcontext"
) )
// OpenOptions describes options for OpenStorage. // OpenOptions describes options for OpenStorage.
...@@ -149,6 +150,7 @@ type storage struct { ...@@ -149,6 +150,7 @@ type storage struct {
l1cache *Cache // can be =nil, if opened with NoCache l1cache *Cache // can be =nil, if opened with NoCache
down chan struct{} // ready when no longer operational down chan struct{} // ready when no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher
downErr error // reason for shutdown downErr error // reason for shutdown
// watcher // watcher
...@@ -163,31 +165,49 @@ type storage struct { ...@@ -163,31 +165,49 @@ type storage struct {
watchCancel map[chan<- Event]chan struct{} // DelWatch can cancel AddWatch via here watchCancel map[chan<- Event]chan struct{} // DelWatch can cancel AddWatch via here
} }
// loading goes through cache - this way prefetching can work
// this go directly to driver
func (s *storage) URL() string { return s.driver.URL() } func (s *storage) URL() string { return s.driver.URL() }
func (s *storage) shutdown(reason error) {
s.downOnce.Do(func() {
close(s.down)
s.downErr = fmt.Errorf("not operational due: %s", reason)
})
}
func (s *storage) Iterate(ctx context.Context, tidMin, tidMax Tid) ITxnIterator { func (s *storage) Iterate(ctx context.Context, tidMin, tidMax Tid) ITxnIterator {
// XXX downErr // XXX better -> xcontext.Merge(ctx, s.opCtx)
ctx, cancel := xcontext.MergeChan(ctx, s.down)
defer cancel()
return s.driver.Iterate(ctx, tidMin, tidMax) return s.driver.Iterate(ctx, tidMin, tidMax)
} }
func (s *storage) Close() error { func (s *storage) Close() error {
// XXX Close - stop watching? (driver will close watchq in its own Close) s.shutdown(fmt.Errorf("closed"))
//return s.driver.Shutdown(fmt.Errorf("closed")) return s.driver.Close() // this will close drvWatchq and cause watcher stop
// XXX downErr
return s.driver.Close()
} }
// loading goes through cache - this way prefetching can work
func (s *storage) LastTid(ctx context.Context) (Tid, error) { func (s *storage) LastTid(ctx context.Context) (Tid, error) {
// XXX LastTid - report only LastTid for which cache is ready? // XXX LastTid - report only LastTid for which cache is ready?
// or driver.LastTid(), then wait cache is ready? // or driver.LastTid(), then wait cache is ready?
// XXX downErr
// XXX better -> xcontext.Merge(ctx, s.opCtx) but currently it costs 1+ goroutine
if ready(s.down) {
return InvalidTid, s.zerr("last_tid", nil, s.downErr)
}
return s.driver.LastTid(ctx) return s.driver.LastTid(ctx)
} }
// Load implements Loader. // Load implements Loader.
func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) { func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) {
// XXX better -> xcontext.Merge(ctx, s.opCtx) but currently it costs 1+ goroutine
if ready(s.down) {
return nil, InvalidTid, s.zerr("load", xid, s.downErr)
}
// XXX here: offload xid validation from cache and driver ? // XXX here: offload xid validation from cache and driver ?
// XXX here: offload wrapping err -> OpError{"load", err} ? // XXX here: offload wrapping err -> OpError{"load", err} ?
// XXX wait xid.At <= .Head ? // XXX wait xid.At <= .Head ?
...@@ -205,7 +225,7 @@ func (s *storage) Prefetch(ctx context.Context, xid Xid) { ...@@ -205,7 +225,7 @@ func (s *storage) Prefetch(ctx context.Context, xid Xid) {
} }
} }
// watcher // ---- watcher ----
// FIXME tests // FIXME tests
// watchRequest represents request to add/del a watch. // watchRequest represents request to add/del a watch.
...@@ -225,6 +245,11 @@ const ( ...@@ -225,6 +245,11 @@ const (
// watcher dispatches events from driver to subscribers and serves // watcher dispatches events from driver to subscribers and serves
// {Add,Del}Watch requests. // {Add,Del}Watch requests.
func (s *storage) watcher() { func (s *storage) watcher() {
err := s._watcher()
s.shutdown(err)
}
func (s *storage) _watcher() error {
// staging place for AddWatch requests. // staging place for AddWatch requests.
// //
// during event delivery to registered watchqs, add/del requests are // during event delivery to registered watchqs, add/del requests are
...@@ -265,8 +290,7 @@ func (s *storage) watcher() { ...@@ -265,8 +290,7 @@ func (s *storage) watcher() {
req.ack <- s.drvHead req.ack <- s.drvHead
} }
// close all subscribers's watchq on close // close all subscribers's watchq on watcher shutdow
// XXX AddWatch/DelWatch after watcher exits?
defer func() { defer func() {
addqFlush() addqFlush()
for watchq := range s.watchTab { for watchq := range s.watchTab {
...@@ -277,9 +301,7 @@ func (s *storage) watcher() { ...@@ -277,9 +301,7 @@ func (s *storage) watcher() {
var errDown error var errDown error
for { for {
if errDown != nil { if errDown != nil {
// XXX stop storage with errDown return errDown
//return errDown
return
} }
addqFlush() // register staged AddWatch(s) addqFlush() // register staged AddWatch(s)
...@@ -291,7 +313,7 @@ func (s *storage) watcher() { ...@@ -291,7 +313,7 @@ func (s *storage) watcher() {
case event, ok := <-s.drvWatchq: case event, ok := <-s.drvWatchq:
if !ok { if !ok {
// storage closed // storage closed
return return nil
} }
switch e := event.(type) { switch e := event.(type) {
...@@ -405,3 +427,23 @@ func (s *storage) DelWatch(watchq chan<- Event) { ...@@ -405,3 +427,23 @@ func (s *storage) DelWatch(watchq chan<- Event) {
<-ack <-ack
} }
} }
// ---- misc ----
// zerr turns err into OpError about s.op(args)
func (s *storage) zerr(op string, args interface{}, err error) *OpError {
return &OpError{URL: s.URL(), Op: op, Args: args, Err: err}
}
// ready returns whether channel is ready.
//
// it should be used only on channels that are intended to be closed.
func ready(ch chan struct{}) bool {
select {
case <-ch:
return true
default:
return false
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment