Commit 0a6ac0f4 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent cb68775a
...@@ -123,8 +123,8 @@ func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage, ...@@ -123,8 +123,8 @@ func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage,
// XXX stor.δtail - init with (at0, at] // XXX stor.δtail - init with (at0, at]
stor := &storage{ stor := &storage{
IStorageDriver: storDriver, driver: storDriver,
l1cache: cache, l1cache: cache,
drvWatchq: drvWatchq, drvWatchq: drvWatchq,
drvHead: at0, drvHead: at0,
...@@ -144,19 +144,37 @@ func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage, ...@@ -144,19 +144,37 @@ func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage,
// it provides a small cache on top of raw storage driver to implement prefetch // it provides a small cache on top of raw storage driver to implement prefetch
// and other storage-independed higher-level functionality. // and other storage-independed higher-level functionality.
type storage struct { type storage struct {
IStorageDriver driver IStorageDriver
l1cache *Cache // can be =nil, if opened with NoCache l1cache *Cache // can be =nil, if opened with NoCache
down chan struct // ready when no longer operational
downErr error // reason for shutdown
// watcher // watcher
drvWatchq chan Event // watchq passed to driver drvWatchq chan Event // watchq passed to driver
drvHead Tid // last tid received from drvWatchq drvHead Tid // last tid received from drvWatchq
watchReq chan watchRequest // {Add,Del}Watch requests go here watchReq chan watchRequest // {Add,Del}Watch requests go here
watchTab map[chan<- Event]struct{} // registered watchers watchTab map[chan<- Event]struct{} // registered watchers
// when watcher is closed (.down is ready) {Add,Del}Watch operate directly
// on .watchTab and interact with each other directly. In that mode:
watchMu sync.Mutex // for watchTab and * below
watchCancel map[chan<- Event]struct{} // DelWatch can cancel AddWatch via here
} }
// loading goes through cache - this way prefetching can work // loading goes through cache - this way prefetching can work
// XXX Close - stop watching? (driver will close watchq in its own Close) // this go directly to driver
func (s *storage) URL() string { return s.driver.URL() }
func (s *storage) Iterate(ctx context.Context, tidMin, tidMax Tid) ITxnIterator {
return s.driver.Iterate(ctx, tidMin, tidMax)
}
func (s *storage) Close() error {
// XXX Close - stop watching? (driver will close watchq in its own Close)
return s.driver.Shutdown(fmt.Errorf("closed"))
}
// XXX LastTid - report only LastTid for which cache is ready? // XXX LastTid - report only LastTid for which cache is ready?
// or driver.LastTid(), then wait cache is ready? // or driver.LastTid(), then wait cache is ready?
...@@ -164,10 +182,11 @@ type storage struct { ...@@ -164,10 +182,11 @@ type storage struct {
func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) { func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) {
// XXX here: offload xid validation from cache and driver ? // XXX here: offload xid validation from cache and driver ?
// XXX here: offload wrapping err -> OpError{"load", err} ? // XXX here: offload wrapping err -> OpError{"load", err} ?
// XXX wait xid.At <= .Head ?
if s.l1cache != nil { if s.l1cache != nil {
return s.l1cache.Load(ctx, xid) return s.l1cache.Load(ctx, xid)
} else { } else {
return s.IStorageDriver.Load(ctx, xid) return s.driver.Load(ctx, xid)
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment