Commit 39fcf48d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 7cb2f43d
......@@ -117,7 +117,9 @@ const (
Committing // transaction commit started
Committed // transaction commit finished successfully
// XXX CommitFailed // transaction commit resulted in error
// XXX Aborted // transaction was aborted by user
Aborting // transaction abort started
Aborted // transaction was aborted by user
// XXX AbortFailed ? // transaction abort resulted in error
// XXX Doomed // transaction was doomed
)
......
......@@ -16,6 +16,8 @@ package transaction
import (
"context"
"sync"
"lab.nexedi.com/kirr/go123/xerr"
)
// transaction implements Transaction.
......@@ -78,13 +80,88 @@ func (txn *transaction) Commit(ctx context.Context) error {
// Abort implements Transaction.
func (txn *transaction) Abort() {
panic("TODO")
ctx := context.Background() // FIXME stub
var datav []DataManager
var syncv []Synchronizer
// under lock: change state to aborting; extract datav/syncv
func() {
txn.mu.Lock()
defer txn.mu.Unlock()
txn.checkNotYetCompleting("abort")
txn.status = Aborting
datav = txn.datav; txn.datav = nil
syncv = txn.syncv; txn.syncv = nil
}()
// lock released
// sync.BeforeCompletion -> errBeforeCompletion
n := len(syncv)
wg := sync.WaitGroup{}
wg.Add(n)
errv := make([]error, n)
for i := 0; i < n; i++ {
i := i
go func() {
defer wg.Done()
errv[i] = syncv[i].BeforeCompletion(ctx, txn)
}()
}
wg.Wait()
ev := xerr.Errorv{}
for _, err := range errv {
ev.Appendif(err)
}
errBeforeCompletion := ev.Err()
xerr.Context(&errBeforeCompletion, "transaction: abort:")
// XXX if before completion = err -> skip data.Abort()? state -> AbortFailed?
// data.Abort
n = len(datav)
wg = sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
i := i
go func() {
defer wg.Done()
datav[i].Abort(txn) // XXX err?
}()
}
wg.Wait()
// XXX set txn status
// sync.AfterCompletion
n = len(syncv)
wg = sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
i := i
go func() {
defer wg.Done()
syncv[i].AfterCompletion(txn)
}()
}
// XXX return error?
}
// checkNotYetCompleting asserts that transaction completion has not yet began.
//
// and panics if the assert fails.
// must be called with .mu held.
//
// XXX place
func (txn *transaction) checkNotYetCompleting(who string) {
switch txn.status {
case Active: // XXX + Doomed ?
......@@ -94,7 +171,6 @@ func (txn *transaction) checkNotYetCompleting(who string) {
}
}
// Join implements Transaction.
func (txn *transaction) Join(dm DataManager) {
txn.mu.Lock()
......
......@@ -69,6 +69,10 @@ type dmAbortOnly struct {
nabort int32
}
func (d *dmAbortOnly) Modify() {
d.txn.Join(d)
}
func (d *dmAbortOnly) Abort(txn Transaction) {
if txn != d.txn {
d.t.Fatalf("abort: txn is different")
......@@ -81,14 +85,19 @@ func (d *dmAbortOnly) TPCBegin(_ Transaction) { d.bug(); panic(0) }
func (d *dmAbortOnly) Commit(_ context.Context, _ Transaction) error { d.bug(); panic(0) }
func (d *dmAbortOnly) TPCVote(_ context.Context, _ Transaction) error { d.bug(); panic(0) }
func (d *dmAbortOnly) TPCFinish(_ context.Context, _ Transaction) error { d.bug(); panic(0) }
func (d *dmAbortOnly) TPCAbort(_ context.Context, _ Transaction) error { d.bug(); panic(0) }
func (d *dmAbortOnly) TPCAbort(_ context.Context, _ Transaction) { d.bug(); panic(0) }
func TestAbort(t *testing.T) {
txn, _ := New(context.Background())
dm := &dmAbortOnly{t: t, txn: txn}
txn, ctx := New(context.Background())
dm := &dmAbortOnly{t: t, txn: Current(ctx)}
dm.Modify()
// XXX +sync
txn.Abort()
if dm.nabort != 1 {
t.Fatalf("abort: nabort=%d; want=1", dm.nabort)
}
// txn.Abort() -> panic XXX
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment