Commit 2d12146a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a910d553
...@@ -117,14 +117,15 @@ func NewDB(stor IStorage) *DB { ...@@ -117,14 +117,15 @@ func NewDB(stor IStorage) *DB {
// XXX db options? // XXX db options?
db := &DB{ db := &DB{
stor: stor, stor: stor,
δtail: NewΔTail(),
δwait: make(map[δwaiter]struct{}), δwait: make(map[δwaiter]struct{}),
tδkeep: 10*time.Minute, // see δtail discussion tδkeep: 10*time.Minute, // see δtail discussion
} }
watchq := make(chan CommitEvent) watchq := make(chan CommitEvent)
stor.AddWatch(watchq) at0 := stor.AddWatch(watchq)
db.δtail = NewΔTail(at0) // init δtail to (at0, at0]
go db.watcher(watchq) go db.watcher(watchq)
// XXX DelWatch? in db.Close() ? // XXX DelWatch? in db.Close() ?
......
...@@ -80,13 +80,13 @@ func RegisterDriver(scheme string, opener DriverOpener) { ...@@ -80,13 +80,13 @@ func RegisterDriver(scheme string, opener DriverOpener) {
// get support for well-known storages. // get support for well-known storages.
// //
// Storage authors should register their storages with RegisterStorage. // Storage authors should register their storages with RegisterStorage.
func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (IStorage, error) { func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage, error) {
// no scheme -> file:// // no scheme -> file://
if !strings.Contains(storageURL, "://") { if !strings.Contains(zurl, "://") {
storageURL = "file://" + storageURL zurl = "file://" + zurl
} }
u, err := url.Parse(storageURL) u, err := url.Parse(zurl)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -119,17 +119,17 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto ...@@ -119,17 +119,17 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
// FIXME teach cache for watching and remove vvv // FIXME teach cache for watching and remove vvv
log.Printf("zodb: FIXME: open %s: cache is not ready for invalidations" + log.Printf("zodb: FIXME: open %s: cache is not ready for invalidations" +
" -> NoCache forced", storageURL) " -> NoCache forced", zurl)
cache = nil cache = nil
} }
_ = at0
// XXX stor.δtail - init with (at0, at] // XXX stor.δtail - init with (at0, at]
stor := &storage{ stor := &storage{
IStorageDriver: storDriver, IStorageDriver: storDriver,
l1cache: cache, l1cache: cache,
drvWatchq: drvWatchq, drvWatchq: drvWatchq,
drvHead: at0,
watchReq: make(chan watchRequest), watchReq: make(chan watchRequest),
watchTab: make(map[chan<- CommitEvent]struct{}), watchTab: make(map[chan<- CommitEvent]struct{}),
...@@ -151,6 +151,7 @@ type storage struct { ...@@ -151,6 +151,7 @@ type storage struct {
// watcher // watcher
drvWatchq chan CommitEvent // watchq passed to driver drvWatchq chan CommitEvent // watchq passed to driver
drvHead Tid // last tid received from drvWatchq
watchReq chan watchRequest // {Add,Del}Watch requests go here watchReq chan watchRequest // {Add,Del}Watch requests go here
watchTab map[chan<- CommitEvent]struct{} // registered watchers watchTab map[chan<- CommitEvent]struct{} // registered watchers
} }
...@@ -183,7 +184,7 @@ func (s *storage) Prefetch(ctx context.Context, xid Xid) { ...@@ -183,7 +184,7 @@ func (s *storage) Prefetch(ctx context.Context, xid Xid) {
// watchRequest represents request to add/del a watch. // watchRequest represents request to add/del a watch.
type watchRequest struct { type watchRequest struct {
op watchOp // add or del op watchOp // add or del
ack chan struct{} // when request processed ack chan Tid // when request processed: at0 for add, ø for del.
watchq chan<- CommitEvent // {Add,Del}Watch argument watchq chan<- CommitEvent // {Add,Del}Watch argument
} }
...@@ -209,7 +210,7 @@ func (s *storage) watcher() { ...@@ -209,7 +210,7 @@ func (s *storage) watcher() {
panic("bad watch request op") panic("bad watch request op")
} }
close(req.ack) req.ack <- s.drvHead
case event, ok := <-s.drvWatchq: case event, ok := <-s.drvWatchq:
if !ok { if !ok {
...@@ -221,6 +222,7 @@ func (s *storage) watcher() { ...@@ -221,6 +222,7 @@ func (s *storage) watcher() {
// XXX verify event.Tid ↑ (else e.g. δtail.Append will panic) // XXX verify event.Tid ↑ (else e.g. δtail.Append will panic)
// if !↑ - stop the storage with error. // if !↑ - stop the storage with error.
s.drvHead = event.Tid
// deliver event to all watchers // deliver event to all watchers
for watchq := range s.watchTab { for watchq := range s.watchTab {
...@@ -231,17 +233,17 @@ func (s *storage) watcher() { ...@@ -231,17 +233,17 @@ func (s *storage) watcher() {
} }
// AddWatch implements Watcher. // AddWatch implements Watcher.
func (s *storage) AddWatch(watchq chan<- CommitEvent) { func (s *storage) AddWatch(watchq chan<- CommitEvent) (at0 Tid) {
// XXX when already Closed? // XXX when already Closed?
ack := make(chan struct{}) ack := make(chan Tid)
s.watchReq <- watchRequest{addWatch, ack, watchq} s.watchReq <- watchRequest{addWatch, ack, watchq}
<-ack return <-ack
} }
// DelWatch implements Watcher. // DelWatch implements Watcher.
func (s *storage) DelWatch(watchq chan<- CommitEvent) { func (s *storage) DelWatch(watchq chan<- CommitEvent) {
// XXX when already Closed? // XXX when already Closed?
ack := make(chan struct{}) ack := make(chan Tid)
s.watchReq <- watchRequest{delWatch, ack, watchq} s.watchReq <- watchRequest{delWatch, ack, watchq}
<-ack <-ack
} }
...@@ -444,18 +444,23 @@ type Watcher interface { ...@@ -444,18 +444,23 @@ type Watcher interface {
// Whenever a new transaction is committed into the database, // Whenever a new transaction is committed into the database,
// corresponding event will be sent to watchq. // corresponding event will be sent to watchq.
// //
// It will be only and all events in (at₀, +∞] range, that will be
// sent, where at₀ is database head that was current when AddWatch call was made. XXX
//
// Once registered, watchq must be read. Not doing so will stuck whole storage. // Once registered, watchq must be read. Not doing so will stuck whole storage.
// //
// Multiple AddWatch calls with the same watchq register watchq only once. // Multiple AddWatch calls with the same watchq register watchq only once. XXX
// //
// XXX watchq closed when stor.watchq closed? // XXX watchq closed when stor.watchq closed?
AddWatch(watchq chan<- CommitEvent) AddWatch(watchq chan<- CommitEvent) (at0 Tid)
// DelWatch unregisters watchq from being notified of database changes. // DelWatch unregisters watchq from being notified of database changes.
// //
// After DelWatch call completes, no new events will be sent to watchq. // After DelWatch call completes, no new events will be sent to watchq.
// //
// DelWatch is noop if watchq was not registered. // DelWatch is noop if watchq was not registered.
//
// XXX also return curent head?
DelWatch(watchq chan<- CommitEvent) DelWatch(watchq chan<- CommitEvent)
} }
......
...@@ -73,8 +73,13 @@ type δRevEntry struct { ...@@ -73,8 +73,13 @@ type δRevEntry struct {
} }
// NewΔTail creates new ΔTail object. // NewΔTail creates new ΔTail object.
func NewΔTail() *ΔTail { //
return &ΔTail{lastRevOf: make(map[Oid]Tid)} // Initial coverage of created ΔTail is (at₀, at₀].
func NewΔTail(at0 Tid) *ΔTail {
return &ΔTail{
head: at0,
lastRevOf: make(map[Oid]Tid),
}
} }
// Len returns number of revisions. // Len returns number of revisions.
...@@ -84,16 +89,14 @@ func (δtail *ΔTail) Len() int { ...@@ -84,16 +89,14 @@ func (δtail *ΔTail) Len() int {
// Head returns newest database state for which δtail has history coverage. // Head returns newest database state for which δtail has history coverage.
// //
// For newly created ΔTail Head returns 0. // Head is ↑ on Append, in particular it does not ↓ on Forget even if δtail becomes empty.
// Head is ↑, in particular it does not go back to 0 when δtail becomes empty.
func (δtail *ΔTail) Head() Tid { func (δtail *ΔTail) Head() Tid {
return δtail.head return δtail.head
} }
// Tail returns oldest database state for which δtail has history coverage. // Tail returns oldest database state for which δtail has history coverage.
// //
// For newly created ΔTail Tail returns 0. // Tail is ↑= on Forget, even if δtail becomes empty.
// Tail is ↑, in particular it does not go back to 0 when δtail becomes empty.
func (δtail *ΔTail) Tail() Tid { func (δtail *ΔTail) Tail() Tid {
if len(δtail.tailv) == 0 { if len(δtail.tailv) == 0 {
return δtail.head return δtail.head
......
...@@ -26,7 +26,7 @@ import ( ...@@ -26,7 +26,7 @@ import (
) )
func TestΔTail(t *testing.T) { func TestΔTail(t *testing.T) {
δtail := NewΔTail() δtail := NewΔTail(1)
// R is syntactic sugar to create 1 δRevEntry // R is syntactic sugar to create 1 δRevEntry
R := func(rev Tid, changev ...Oid) δRevEntry { R := func(rev Tid, changev ...Oid) δRevEntry {
...@@ -171,7 +171,7 @@ func TestΔTail(t *testing.T) { ...@@ -171,7 +171,7 @@ func TestΔTail(t *testing.T) {
} }
δCheck(0) δCheck(1)
δCheckLastUP(4, 12, 12) // δtail = ø δCheckLastUP(4, 12, 12) // δtail = ø
δAppend(R(10, 3,5)) δAppend(R(10, 3,5))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment