Commit b62d39d0 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c872ddc6
...@@ -35,9 +35,10 @@ import ( ...@@ -35,9 +35,10 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/log" "lab.nexedi.com/kirr/neo/go/xcommon/log"
"lab.nexedi.com/kirr/neo/go/xcommon/task" "lab.nexedi.com/kirr/neo/go/xcommon/task"
"lab.nexedi.com/kirr/neo/go/xcommon/xcontext" xxcontext "lab.nexedi.com/kirr/neo/go/xcommon/xcontext"
"lab.nexedi.com/kirr/neo/go/xcommon/xio" "lab.nexedi.com/kirr/neo/go/xcommon/xio"
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
) )
...@@ -121,7 +122,7 @@ func (stor *Storage) Run(ctx context.Context, l stdnet.Listener) (err error) { ...@@ -121,7 +122,7 @@ func (stor *Storage) Run(ctx context.Context, l stdnet.Listener) (err error) {
req, idReq, err := lli.Accept(ctx) req, idReq, err := lli.Accept(ctx)
if err != nil { if err != nil {
if !xcontext.Canceled(err) { if !xxcontext.Canceled(err) {
log.Error(ctx, err) // XXX throttle? log.Error(ctx, err) // XXX throttle?
} }
continue continue
......
...@@ -19,213 +19,15 @@ ...@@ -19,213 +19,15 @@
// Package xcontext provides addons to std package context. // Package xcontext provides addons to std package context.
// //
// Merging contexts
//
// Merge and MergeChan could be handy in situations where spawned job needs to
// be canceled whenever any of 2 contexts becomes done. This frequently arises
// with service methods that accept context as argument, and the service
// itself, on another control line, could be instructed to become
// non-operational. For example:
//
// func (srv *Service) DoSomething(ctx context.Context) error {
// // srv.down is chan struct{} that becomes ready when service is closed.
// ctxDown, cancel := xcontext.MergeChan(ctx, srv.down)
// defer cancel()
//
// err := doJob(ctxDown)
// if ctxDown.Err() != nil && ctx.Err() == nil {
// err = ErrDueToServiceDown
// }
//
// ...
// }
//
//
//
// XXX docs: // XXX docs:
// - Canceled // - Canceled
// - Merge
//
// - WhenDone // - WhenDone
package xcontext package xcontext
import ( import (
"context" "context"
"sync"
"time"
) )
// mergeCtx represents 2 context merged into 1.
type mergeCtx struct {
ctx1, ctx2 context.Context
done chan struct{}
doneErr error
cancelCh chan struct{}
cancelOnce sync.Once
}
// Merge merges 2 contexts into 1.
//
// The result context:
//
// - is done when ctx1 or ctx2 is done, or cancel called, whichever happens first,
// - has deadline = min(ctx1.Deadline, ctx2.Deadline),
// - has associated values merged from ctx1 and ctx2, with ctx1 taking precedence.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func Merge(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) {
mc := &mergeCtx{
ctx1: ctx1,
ctx2: ctx2,
done: make(chan struct{}),
cancelCh: make(chan struct{}),
}
// if src ctx is already cancelled - make mc cancelled right after creation
//
// this saves goroutine spawn and makes
//
// ctx = Merge(ctx1, ctx2); ctx.Err != nil
//
// check possible.
select {
case <-ctx1.Done():
close(mc.done)
mc.doneErr = ctx1.Err()
case <-ctx2.Done():
close(mc.done)
mc.doneErr = ctx2.Err()
default:
// src ctx not canceled - spawn ctx{1,2}.done merger.
go mc.wait()
}
return mc, mc.cancel
}
// wait waits when .ctx1 or .ctx2 is done and then mark mergeCtx as done
func (mc *mergeCtx) wait() {
select {
case <-mc.ctx1.Done():
mc.doneErr = mc.ctx1.Err()
case <-mc.ctx2.Done():
mc.doneErr = mc.ctx2.Err()
case <-mc.cancelCh:
mc.doneErr = context.Canceled
}
close(mc.done)
}
// cancel sends signal to wait to shutdown.
//
// cancel is the context.CancelFunc returned for mergeCtx by Merge.
func (mc *mergeCtx) cancel() {
mc.cancelOnce.Do(func() {
close(mc.cancelCh)
})
}
// Done implements context.Context .
func (mc *mergeCtx) Done() <-chan struct{} {
return mc.done
}
// Err implements context.Context .
func (mc *mergeCtx) Err() error {
// synchronize on .done to avoid .doneErr read races
select {
case <-mc.done:
default:
// done not yet closed
return nil
}
// .done closed; .doneErr was set before - no race
return mc.doneErr
}
// Deadline implements context.Context .
func (mc *mergeCtx) Deadline() (time.Time, bool) {
d1, ok1 := mc.ctx1.Deadline()
d2, ok2 := mc.ctx2.Deadline()
switch {
case !ok1:
return d2, ok2
case !ok2:
return d1, ok1
case d1.Before(d2):
return d1, true
default:
return d2, true
}
}
// Value implements context.Context .
func (mc *mergeCtx) Value(key interface{}) interface{} {
v := mc.ctx1.Value(key)
if v != nil {
return v
}
return mc.ctx2.Value(key)
}
// ----------------------------------------
// chanCtx wraps channel into context.Context interface.
type chanCtx struct {
done <-chan struct{}
}
// MergeChan merges context and channel into 1 context.
//
// MergeChan, similarly to Merge, provides resulting context which:
//
// - is done when ctx1 is done or done2 is closed, or cancel called, whichever happens first,
// - has the same deadline as ctx1,
// - has the same associated values as ctx1.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func MergeChan(ctx1 context.Context, done2 <-chan struct{}) (context.Context, context.CancelFunc) {
return Merge(ctx1, chanCtx{done2})
}
// Done implements context.Context .
func (c chanCtx) Done() <-chan struct{} {
return c.done
}
// Err implements context.Context .
func (c chanCtx) Err() error {
select {
case <-c.done:
return context.Canceled
default:
return nil
}
}
// Deadline implements context.Context .
func (c chanCtx) Deadline() (time.Time, bool) {
return time.Time{}, false
}
// Value implements context.Context .
func (c chanCtx) Value(key interface{}) interface{} {
return nil
}
// Cancelled reports whether an error is due to a canceled context. // Cancelled reports whether an error is due to a canceled context.
// //
// Since both cancellation ways - explicit and due to exceeding context // Since both cancellation ways - explicit and due to exceeding context
......
// Copyright (C) 2017-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package xcontext provides addons to std package context.
package xcontext
import (
"context"
"testing"
"time"
)
func TestMerge(t *testing.T) {
bg := context.Background()
ctx1, cancel1 := context.WithCancel(bg)
ctx2, cancel2 := context.WithCancel(bg)
ctx1 = context.WithValue(ctx1, 1, "hello")
ctx2 = context.WithValue(ctx2, 2, "world")
mc, __ := Merge(ctx1, ctx2); defer __()
assertEq := func(a, b interface{}) {
t.Helper()
if a != b {
t.Fatalf("%v != %v", a, b)
}
}
assertEq(mc.Value(1), "hello")
assertEq(mc.Value(2), "world")
assertEq(mc.Value(3), nil)
t0 := time.Time{}
d, ok := mc.Deadline()
if !(d == t0 && ok == false) {
t.Fatal("deadline must be unset")
}
assertEq(mc.Err(), nil)
select {
case <-mc.Done():
t.Fatal("done before any parent done")
default:
}
cancel2()
<-mc.Done()
assertEq(mc.Err(), context.Canceled)
////////
mc, __ = Merge(ctx1, bg); defer __()
assertEq(mc.Value(1), "hello")
assertEq(mc.Value(2), nil)
assertEq(mc.Value(3), nil)
d, ok = mc.Deadline()
if !(d == t0 && ok == false) {
t.Fatal("deadline must be unset")
}
assertEq(mc.Err(), nil)
select {
case <-mc.Done():
t.Fatal("done before any parent done")
default:
}
cancel1()
<-mc.Done()
assertEq(mc.Err(), context.Canceled)
////////
ctx1, cancel1 = context.WithCancel(bg)
ctx1 = context.WithValue(ctx1, 3, "zzz")
done2 := make(chan struct{})
mc, __ = MergeChan(ctx1, done2); defer __()
assertEq(mc.Value(1), nil)
assertEq(mc.Value(2), nil)
assertEq(mc.Value(3), "zzz")
d, ok = mc.Deadline()
if !(d == t0 && ok == false) {
t.Fatal("deadline must be unset")
}
assertEq(mc.Err(), nil)
select {
case <-mc.Done():
t.Fatal("done before any parent done")
default:
}
close(done2)
<-mc.Done()
assertEq(mc.Err(), context.Canceled)
done2 = make(chan struct{})
mc, __ = MergeChan(ctx1, done2); defer __()
select {
case <-mc.Done():
t.Fatal("done before any parent done")
default:
}
cancel1()
<-mc.Done()
assertEq(mc.Err(), context.Canceled)
////////
t1 := t0.AddDate(7777, 1, 1)
t2 := t0.AddDate(9999, 1, 1)
ctx1, __ = context.WithDeadline(bg, t1); defer __()
ctx2, __ = context.WithDeadline(bg, t2); defer __()
checkDeadline := func(a, b context.Context, tt time.Time) {
t.Helper()
m, __ := Merge(a, b); defer __()
d, ok := m.Deadline()
if !ok {
t.Fatal("no deadline returned")
}
if d != tt {
t.Fatalf("incorrect deadline: %v ; want %v", d, tt)
}
}
checkDeadline(ctx1, bg, t1)
checkDeadline(bg, ctx2, t2)
checkDeadline(ctx1, ctx2, t1)
checkDeadline(ctx2, ctx1, t1)
////////
mc, mcancel := Merge(bg, bg)
select {
case <-mc.Done():
t.Fatal("done before any parent done")
default:
}
mcancel()
mcancel()
<-mc.Done()
assertEq(mc.Err(), context.Canceled)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment