Commit 439f5493 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 64d3132a
...@@ -55,7 +55,7 @@ type DB struct { ...@@ -55,7 +55,7 @@ type DB struct {
δtail ΔTail // [](rev↑, []oid) δtail ΔTail // [](rev↑, []oid)
// openers waiting for δtail.Head to become covering their at. // openers waiting for δtail.Head to become covering their at.
δwait map[δwaiter]struct{} // set{(at, ready)} δwait map[δwaiter]struct{} // set{(at, ready)} XXX -> set_δwaiter?
} }
// δwaiter represents someone waiting for δtail.Head to become ≥ at. // δwaiter represents someone waiting for δtail.Head to become ≥ at.
...@@ -160,7 +160,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -160,7 +160,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
at := opt.At at := opt.At
if at == 0 { if at == 0 {
head := zodb.Tid(0) head := Tid(0)
if opt.NoSync { if opt.NoSync {
// XXX locking // XXX locking
...@@ -188,18 +188,18 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -188,18 +188,18 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
// wait till .δtail.head is up to date covering ≥ at // wait till .δtail.head is up to date covering ≥ at
var δready chan struct{} var δready chan struct{}
db.mu.Lock() db.mu.Lock()
δhead := δtail.Head() δhead := db.δtail.Head()
// XXX prevent head from going away? // XXX prevent head from going away?
if δhead < at { if δhead < at {
δready = make(chan struct{}) δready = make(chan struct{})
db.δwait[δwaiter{at, δready}] = struct{} db.δwait[δwaiter{at, δready}] = struct{}{}
} }
db.mu.Unlock() db.mu.Unlock()
if δready != nil { if δready != nil {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return nil, ctx.Err()
case <-δready: case <-δready:
// ok // ok
......
...@@ -117,7 +117,7 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto ...@@ -117,7 +117,7 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
drvWatchq: drvWatchq, drvWatchq: drvWatchq,
watchReq: make(chan watchRequest), watchReq: make(chan watchRequest),
watchTab: make(map[chan CommitEvent]struct{}), watchTab: make(map[chan<- CommitEvent]struct{}),
} }
go stor.watcher() // XXX stop on close go stor.watcher() // XXX stop on close
...@@ -138,7 +138,7 @@ type storage struct { ...@@ -138,7 +138,7 @@ type storage struct {
// watcher // watcher
drvWatchq chan CommitEvent // watchq passed to driver drvWatchq chan CommitEvent // watchq passed to driver
watchReq chan watchRequest // {Add,Del}Watch requests go here watchReq chan watchRequest // {Add,Del}Watch requests go here
watchTab map[chan CommitEvent]struct{} // registered watchers watchTab map[chan<- CommitEvent]struct{} // registered watchers
} }
// loading goes through cache - this way prefetching can work // loading goes through cache - this way prefetching can work
...@@ -170,7 +170,7 @@ func (s *storage) Prefetch(ctx context.Context, xid Xid) { ...@@ -170,7 +170,7 @@ func (s *storage) Prefetch(ctx context.Context, xid Xid) {
type watchRequest struct { type watchRequest struct {
op watchOp // add or del op watchOp // add or del
ack chan struct{} // when request processed ack chan struct{} // when request processed
watchq chan CommitEvent // {Add,Del}Watch argument watchq chan<- CommitEvent // {Add,Del}Watch argument
} }
type watchOp int type watchOp int
...@@ -217,7 +217,7 @@ func (s *storage) watcher() { ...@@ -217,7 +217,7 @@ func (s *storage) watcher() {
} }
// AddWatch implements Watcher. // AddWatch implements Watcher.
func (s *storage) AddWatch(watchq chan CommitEvent) { func (s *storage) AddWatch(watchq chan<- CommitEvent) {
// XXX when already Closed? // XXX when already Closed?
ack := make(chan struct{}) ack := make(chan struct{})
s.watchReq <- watchRequest{addWatch, ack, watchq} s.watchReq <- watchRequest{addWatch, ack, watchq}
...@@ -225,7 +225,7 @@ func (s *storage) AddWatch(watchq chan CommitEvent) { ...@@ -225,7 +225,7 @@ func (s *storage) AddWatch(watchq chan CommitEvent) {
} }
// DelWatch implements Watcher. // DelWatch implements Watcher.
func (s *storage) DelWatch(watchq chan CommitEvent) { func (s *storage) DelWatch(watchq chan<- CommitEvent) {
// XXX when already Closed? // XXX when already Closed?
ack := make(chan struct{}) ack := make(chan struct{})
s.watchReq <- watchRequest{delWatch, ack, watchq} s.watchReq <- watchRequest{delWatch, ack, watchq}
......
// Copyright (C) 2018-2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package zodb
import (
"fmt"
)
// ΔTail represents tail of revisional changes.
//
// It semantically consists of
//
// [](rev↑, []id)
//
// and index
//
// {} id -> max(rev: rev changed id)
//
// where
//
// rev - is ZODB revision, and
// id - is an identifier of what has been changed(*)
//
// It provides operations to
//
// - XXX Head
// - append information to the tail about next revision,
// - forget information in the tail past specified revision, and
// - query the tail about what is last revision that changed an id.
//
// ΔTail is safe to access for multiple-readers / single writer.
//
// (*) examples of id:
//
// oid - ZODB object identifier, when ΔTail represents changes to ZODB objects,
// #blk - file block number, when ΔTail represents changes to a file.
type ΔTail struct {
tailv []δRevEntry
lastRevOf map[Oid]Tid // index for LastRevOf queries
// TODO also add either tailv idx <-> rev index, or lastRevOf -> tailv idx
// (if linear back-scan of δRevEntry starts to eat cpu).
}
// δRevEntry represents information of what have been changed in one revision.
type δRevEntry struct {
rev Tid
changev []Oid
}
// NewΔTail creates new ΔTail object.
func NewΔTail() *ΔTail {
return &ΔTail{lastRevOf: make(map[Oid]Tid)}
}
// XXX + .Head() -> max(rev) XXX or 0 if len(tailv) == 0?
func (δtail *ΔTail) Head() Tid {
panic("TODO")
}
// XXX add way to extend coverage without appending changed data? (i.e. if a
// txn did not change file at all) -> but then it is simply .Append(rev, nil)?
// Append appends to δtail information about what have been changed in next revision.
//
// rev must be ↑.
func (δtail *ΔTail) Append(rev Tid, changev []Oid) {
// check rev↑
// XXX better also check even when δtail is ø (after forget)
if l := len(δtail.tailv); l > 0 {
if revPrev := δtail.tailv[l-1].rev; revPrev >= rev {
panic(fmt.Sprintf("δtail.Append: rev not ↑: %s -> %s", revPrev, rev))
}
}
δtail.tailv = append(δtail.tailv, δRevEntry{rev, changev})
for _, id := range changev {
δtail.lastRevOf[id] = rev
}
}
// ForgetBefore discards all δtail entries with rev < revCut.
func (δtail *ΔTail) ForgetBefore(revCut Tid) {
icut := 0
for i, δ := range δtail.tailv {
rev := δ.rev
if rev >= revCut {
break
}
icut = i+1
// if forgotten revision was last for id, we have to update lastRevOf index
for _, id := range δ.changev {
if δtail.lastRevOf[id] == rev {
delete(δtail.lastRevOf, id)
}
}
}
// tailv = tailv[icut:] but without
// 1) growing underlying storage array indefinitely
// 2) keeping underlying storage after forget
l := len(δtail.tailv)-icut
tailv := make([]δRevEntry, l)
copy(tailv, δtail.tailv[icut:])
δtail.tailv = tailv
}
// LastRevOf tries to return what was the last revision that changed id as of at database state.
//
// Depending on current information in δtail it returns either exact result, or
// an upper-bound estimate for the last id revision, assuming id was changed ≤ at:
//
// 1) if δtail does not cover at, at is returned:
//
// # at ∉ [min(rev ∈ δtail), max(rev ∈ δtail)]
// LastRevOf(id, at) = at
//
// 2) if δtail has an entry corresponding to id change, it gives exactly the
// last revision that changed id:
//
// # at ∈ [min(rev ∈ δtail), max(rev ∈ δtail)]
// # ∃ rev ∈ δtail: rev changed id && rev ≤ at
// LastRevOf(id, at) = max(rev: rev changed id && rev ≤ at)
//
// 3) if δtail does not contain appropriate record with id - it returns δtail's
// lower bound as the estimate for the upper bound of the last id revision:
//
// # at ∈ [min(rev ∈ δtail), max(rev ∈ δtail)]
// # ∄ rev ∈ δtail: rev changed id && rev ≤ at
// LastRevOf(id, at) = min(rev ∈ δtail)
//
// On return exact indicates whether returned revision is exactly the last
// revision of id, or only an upper-bound estimate of it.
func (δtail *ΔTail) LastRevOf(id Oid, at Tid) (_ Tid, exact bool) {
// check if we have no coverage at all
l := len(δtail.tailv)
if l == 0 {
return at, false
}
revMin := δtail.tailv[0].rev
revMax := δtail.tailv[l-1].rev
if !(revMin <= at && at <= revMax) {
return at, false
}
// we have the coverage
rev, ok := δtail.lastRevOf[id]
if !ok {
return δtail.tailv[0].rev, false
}
if rev <= at {
return rev, true
}
// what's in index is after at - scan tailv back to find appropriate entry
// XXX linear scan
for i := l - 1; i >= 0; i-- {
δ := δtail.tailv[i]
if δ.rev > at {
continue
}
for _, δid := range δ.changev {
if id == δid {
return δ.rev, true
}
}
}
// nothing found
return δtail.tailv[0].rev, false
}
#!/bin/bash
# δtail.go.cat-generic - cat to stdout δtail.go variant suitable for templating.
# Copyright (C) 2018-2019 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
input=$(dirname $0)/δtail.go
TODO
# import "lab.nexedi.com/kirr/neo/go/zodb"
# package zodb -> package PACKAGE
# Tid -> zodb.Tid
# Oid -> ID
// Copyright (C) 2018-2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package zodb
import (
"fmt"
"reflect"
"testing"
)
// XXX test Head
func TestΔTail(t *testing.T) {
δtail := NewΔTail()
// R is syntactic sugar to create 1 δRevEntry
R := func(rev Tid, changev ...Oid) δRevEntry {
return δRevEntry{rev, changev}
}
// δAppend is syntactic sugar for δtail.Append
δAppend := func(δ δRevEntry) {
δtail.Append(δ.rev, δ.changev)
}
// δCheck verifies that δtail state corresponds to provided tailv
δCheck := func(tailv ...δRevEntry) {
t.Helper()
for i := 1; i < len(tailv); i++ {
if !(tailv[i-1].rev < tailv[i].rev) {
panic("test tailv: rev not ↑")
}
}
if !tailvEqual(δtail.tailv, tailv) {
t.Fatalf("tailv:\nhave: %v\nwant: %v", δtail.tailv, tailv)
}
// verify lastRevOf query / index
lastRevOf := make(map[Oid]Tid)
for _, δ := range tailv {
for _, id := range δ.changev {
idRev, exact := δtail.LastRevOf(id, δ.rev)
if !(idRev == δ.rev && exact) {
t.Fatalf("LastRevOf(%v, at=%s) -> %s, %v ; want %s, %v", id, δ.rev, idRev, exact, δ.rev, true)
}
lastRevOf[id] = δ.rev
}
}
if !reflect.DeepEqual(δtail.lastRevOf, lastRevOf) {
t.Fatalf("lastRevOf:\nhave: %v\nwant: %v", δtail.lastRevOf, lastRevOf)
}
}
// δCheckLastUP verifies that δtail.LastRevOf(id, at) gives lastOk and exact=false.
// (we don't need to check for exact=true as those cases are covered in δCheck)
δCheckLastUP := func(id Oid, at, lastOk Tid) {
t.Helper()
last, exact := δtail.LastRevOf(id, at)
if !(last == lastOk && exact == false) {
t.Fatalf("LastRevOf(%v, at=%s) -> %s, %v ; want %s, %v", id, at, last, exact, lastOk, false)
}
}
δCheck()
δCheckLastUP(4, 12, 12) // δtail = ø
δAppend(R(10, 3,5))
δCheck(R(10, 3,5))
δCheckLastUP(3, 9, 9) // at < δtail
δCheckLastUP(3, 12, 12) // at > δtail
δCheckLastUP(4, 10, 10) // id ∉ δtail
δAppend(R(11, 7))
δCheck(R(10, 3,5), R(11, 7))
δAppend(R(12, 7))
δCheck(R(10, 3,5), R(11, 7), R(12, 7))
δAppend(R(14, 3,8))
δCheck(R(10, 3,5), R(11, 7), R(12, 7), R(14, 3,8))
δCheckLastUP(8, 12, 10) // id ∈ δtail, but has no entry with rev ≤ at
δtail.ForgetBefore(10)
δCheck(R(10, 3,5), R(11, 7), R(12, 7), R(14, 3,8))
δtail.ForgetBefore(11)
δCheck(R(11, 7), R(12, 7), R(14, 3,8))
δtail.ForgetBefore(13)
δCheck(R(14, 3,8))
δtail.ForgetBefore(15)
δCheck()
// Append panics on non-↑ rev
δAppend(R(15, 1))
func() {
defer func() {
r := recover()
if r == nil {
t.Fatal("append non-↑: not panicked")
}
rev := Tid(15)
want := fmt.Sprintf("δtail.Append: rev not ↑: %s -> %s", rev, rev)
if r != want {
t.Fatalf("append non-↑:\nhave: %q\nwant: %q", r, want)
}
}()
δAppend(R(15, 1))
}()
// .tailv underlying storage is not kept after forget
δtail.ForgetBefore(16)
const N = 1E3
for rev, i := Tid(16), 0; i < N; i, rev = i+1, rev+1 {
δAppend(R(rev, 1))
}
capN := cap(δtail.tailv)
δtail.ForgetBefore(N)
if c := cap(δtail.tailv); !(c < capN/10) {
t.Fatalf("forget: tailv storage did not shrink: cap%v: %d -> cap: %d", N, capN, c)
}
// .tailv underlying storage does not grow indefinitely
// XXX cannot test as the growth here goes to left and we cannot get
// access to whole underlying array from a slice.
}
func tailvEqual(a, b []δRevEntry) bool {
// for empty one can be nil and another !nil [] = reflect.DeepEqual
// does not think those are equal.
return (len(a) == 0 && len(b) == 0) ||
reflect.DeepEqual(a, b)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment