Commit 8d8f99fc authored by Nick Thomas's avatar Nick Thomas

Merge branch 'grpc-go-1.9.1' into 'master'

Use grpc-go 1.9.1

See merge request gitlab-org/gitlab-workhorse!224
parents cdabb988 6afbe6cb
......@@ -7,7 +7,7 @@ If you are new to github, please start by reading [Pull Request howto](https://h
## Legal requirements
In order to protect both you and ourselves, you will need to sign the
[Contributor License Agreement](
[Contributor License Agreement](
## Guidelines for Pull Requests
How to get your contributions merged smoothly and quickly.
......@@ -23,10 +23,10 @@ proto:
go generate
test: testdeps
go test -cpu 1,4
go test -cpu 1,4 -timeout 5m
testrace: testdeps
go test -race -cpu 1,4
go test -race -cpu 1,4 -timeout 7m
go clean -i
# gRPC-Go
[![Build Status](]( [![GoDoc](](
[![Build Status](]( [![GoDoc](]( [![GoReportCard](](
The Go implementation of [gRPC]( A high performance, open source, general RPC framework that puts mobile and HTTP/2 first. For more information see the [gRPC Quick Start: Go]( guide.
......@@ -16,7 +16,8 @@ $ go get -u
This requires Go 1.7 or later.
This requires Go 1.6 or later. Go 1.7 will be required as of the next gRPC-Go
release (1.8).
......@@ -25,14 +25,12 @@ import (
// DefaultBackoffConfig uses values specified for backoff in
var (
DefaultBackoffConfig = BackoffConfig{
MaxDelay: 120 * time.Second,
baseDelay: 1.0 * time.Second,
factor: 1.6,
jitter: 0.2,
var DefaultBackoffConfig = BackoffConfig{
MaxDelay: 120 * time.Second,
baseDelay: 1.0 * time.Second,
factor: 1.6,
jitter: 0.2,
// backoffStrategy defines the methodology for backing off after a grpc
// connection failure.
......@@ -28,6 +28,7 @@ import (
// Address represents a server the client connects to.
......@@ -310,7 +311,7 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
if !opts.BlockingWait {
if len(rr.addrs) == 0 {
err = Errorf(codes.Unavailable, "there is no address available")
err = status.Errorf(codes.Unavailable, "there is no address available")
// Returns the next addr on rr.addrs for failfast RPCs.
......@@ -23,6 +23,7 @@ package balancer
import (
......@@ -33,24 +34,23 @@ import (
var (
// m is a map from name to balancer builder.
m = make(map[string]Builder)
// defaultBuilder is the default balancer to use.
defaultBuilder Builder // TODO(bar) install pickfirst as default.
// Register registers the balancer builder to the balancer map.
// b.Name will be used as the name registered with this builder.
// b.Name (lowercased) will be used as the name registered with
// this builder.
func Register(b Builder) {
m[b.Name()] = b
m[strings.ToLower(b.Name())] = b
// Get returns the resolver builder registered with the given name.
// If no builder is register with the name, the default pickfirst will
// be used.
// Note that the compare is done in a case-insenstive fashion.
// If no builder is register with the name, nil will be returned.
func Get(name string) Builder {
if b, ok := m[name]; ok {
if b, ok := m[strings.ToLower(name)]; ok {
return b
return defaultBuilder
return nil
// SubConn represents a gRPC sub connection.
......@@ -66,6 +66,11 @@ func Get(name string) Builder {
// When the connection encounters an error, it will reconnect immediately.
// When the connection becomes IDLE, it will not reconnect unless Connect is
// called.
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type SubConn interface {
// UpdateAddresses updates the addresses used in this SubConn.
// gRPC checks if currently-connected address is still in the new list.
......@@ -83,6 +88,11 @@ type SubConn interface {
type NewSubConnOptions struct{}
// ClientConn represents a gRPC ClientConn.
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
// NewSubConn is called by balancer to create a new SubConn.
// It doesn't block and wait for the connections to be established.
......@@ -99,6 +109,9 @@ type ClientConn interface {
// on the new picker to pick new SubConn.
UpdateBalancerState(s connectivity.State, p Picker)
// ResolveNow is called by balancer to notify gRPC to do a name resolving.
// Target returns the dial target for this ClientConn.
Target() string
......@@ -131,6 +144,10 @@ type PickOptions struct{}
type DoneInfo struct {
// Err is the rpc error the RPC finished with. It could be nil.
Err error
// BytesSent indicates if any bytes have been sent to the server.
BytesSent bool
// BytesReceived indicates if any byte has been received from the server.
BytesReceived bool
var (
......@@ -161,7 +178,7 @@ type Picker interface {
// If a SubConn is returned:
// - If it is READY, gRPC will send the RPC on it;
// - If it is not ready, or becomes not ready after it's returned, gRPC will block
// this call until a new picker is updated and will call pick on the new picker.
// until UpdateBalancerState() is called and will call pick on the new picker.
// If the returned error is not nil:
// - If the error is ErrNoSubConnAvailable, gRPC will block until UpdateBalancerState()
* Copyright 2017 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package base
import (
type baseBuilder struct {
name string
pickerBuilder PickerBuilder
func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
return &baseBalancer{
cc: cc,
pickerBuilder: bb.pickerBuilder,
subConns: make(map[resolver.Address]balancer.SubConn),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &connectivityStateEvaluator{},
// Initialize picker to a picker that always return
// ErrNoSubConnAvailable, because when state of a SubConn changes, we
// may call UpdateBalancerState with this picker.
picker: NewErrPicker(balancer.ErrNoSubConnAvailable),
func (bb *baseBuilder) Name() string {
type baseBalancer struct {
cc balancer.ClientConn
pickerBuilder PickerBuilder
csEvltr *connectivityStateEvaluator
state connectivity.State
subConns map[resolver.Address]balancer.SubConn
scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker
func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if err != nil {
grpclog.Infof("base.baseBalancer: HandleResolvedAddrs called with error %v", err)
grpclog.Infoln("base.baseBalancer: got new resolved addresses: ", addrs)
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
for _, a := range addrs {
addrsSet[a] = struct{}{}
if _, ok := b.subConns[a]; !ok {
// a is a new address (not existing in b.subConns).
sc, err :=[]resolver.Address{a}, balancer.NewSubConnOptions{})
if err != nil {
grpclog.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
b.subConns[a] = sc
b.scStates[sc] = connectivity.Idle
for a, sc := range b.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
delete(b.subConns, a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
// regeneratePicker takes a snapshot of the balancer, and generates a picker
// from it. The picker is
// - errPicker with ErrTransientFailure if the balancer is in TransientFailure,
// - built by the pickerBuilder with all READY SubConns otherwise.
func (b *baseBalancer) regeneratePicker() {
if b.state == connectivity.TransientFailure {
b.picker = NewErrPicker(balancer.ErrTransientFailure)
readySCs := make(map[resolver.Address]balancer.SubConn)
// Filter out all ready SCs from full subConn map.
for addr, sc := range b.subConns {
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
readySCs[addr] = sc
b.picker = b.pickerBuilder.Build(readySCs)
func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
oldS, ok := b.scStates[sc]
if !ok {
grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
b.scStates[sc] = s
switch s {
case connectivity.Idle:
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
oldAggrState := b.state
b.state = b.csEvltr.recordTransition(oldS, s)
// Regenerate picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
(b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
}, b.picker)
// Close is a nop because base balancer doesn't have internal state to clean up,
// and it doesn't need to call RemoveSubConn for the SubConns.
func (b *baseBalancer) Close() {
// NewErrPicker returns a picker that always returns err on Pick().
func NewErrPicker(err error) balancer.Picker {
return &errPicker{err: err}
type errPicker struct {
err error // Pick() always returns this err.
func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, p.err
// connectivityStateEvaluator gets updated by addrConns when their
// states transition, based on which it evaluates the state of
// ClientConn.
type connectivityStateEvaluator struct {
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numTransientFailure uint64 // Number of addrConns in transientFailure.
// recordTransition records state change happening in every subConn and based on
// that it evaluates what aggregated state should be.
// It can only transition between Ready, Connecting and TransientFailure. Other states,
// Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection
// before any subConn is created ClientConn is in idle state. In the end when ClientConn
// closes it is in Shutdown state.
// recordTransition should only be called synchronously from the same goroutine.
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
// Update counters.
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
switch state {
case connectivity.Ready:
cse.numReady += updateVal
case connectivity.Connecting:
cse.numConnecting += updateVal
case connectivity.TransientFailure:
cse.numTransientFailure += updateVal
// Evaluate.
if cse.numReady > 0 {
return connectivity.Ready
if cse.numConnecting > 0 {
return connectivity.Connecting
return connectivity.TransientFailure
* Copyright 2017 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
// Package base defines a balancer base that can be used to build balancers with
// different picking algorithms.
// The base balancer creates a new SubConn for each resolved address. The
// provided picker will only be notified about READY SubConns.
// This package is the base of round_robin balancer, its purpose is to be used
// to build round_robin like balancers with complex picking algorithms.
// Balancers with more complicated logic should try to implement a balancer
// builder from scratch.
// All APIs in this package are experimental.
package base
import (
// PickerBuilder creates balancer.Picker.
type PickerBuilder interface {
// Build takes a slice of ready SubConns, and returns a picker that will be
// used by gRPC to pick a SubConn.
Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker
// NewBalancerBuilder returns a balancer builder. The balancers
// built by this builder will use the picker builder to build pickers.
func NewBalancerBuilder(name string, pb PickerBuilder) balancer.Builder {
return &baseBuilder{
name: name,
pickerBuilder: pb,
* Copyright 2017 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
// Package roundrobin defines a roundrobin balancer. Roundrobin balancer is
// installed as one of the default balancers in gRPC, users don't need to
// explicitly install this balancer.
package roundrobin
import (
// Name is the name of round_robin balancer.
const Name = "round_robin"
// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &rrPickerBuilder{})
func init() {
type rrPickerBuilder struct{}
func (*rrPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
grpclog.Infof("roundrobinPicker: newPicker called with readySCs: %v", readySCs)
var scs []balancer.SubConn
for _, sc := range readySCs {
scs = append(scs, sc)
return &rrPicker{
subConns: scs,
type rrPicker struct {
// subConns is the snapshot of the roundrobin balancer when this picker was
// created. The slice is immutable. Each Get() will do a round robin
// selection from it and return the selected SubConn.
subConns []balancer.SubConn
mu sync.Mutex
next int
func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
if len(p.subConns) <= 0 {
return nil, nil, balancer.ErrNoSubConnAvailable
sc := p.subConns[] = ( + 1) % len(p.subConns)
return sc, nil, nil
......@@ -19,6 +19,7 @@
package grpc
import (
......@@ -73,7 +74,7 @@ func (b *scStateUpdateBuffer) load() {
// get returns the channel that receives a recvMsg in the buffer.
// get returns the channel that the scStateUpdate will be sent to.
// Upon receiving, the caller should call load to send another
// scStateChangeTuple onto the channel if there is any.
......@@ -96,6 +97,9 @@ type ccBalancerWrapper struct {
stateChangeQueue *scStateUpdateBuffer
resolverUpdateCh chan *resolverUpdate
done chan struct{}
mu sync.Mutex
subConns map[*acBalancerWrapper]struct{}
func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
......@@ -104,6 +108,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
stateChangeQueue: newSCStateUpdateBuffer(),
resolverUpdateCh: make(chan *resolverUpdate, 1),
done: make(chan struct{}),
subConns: make(map[*acBalancerWrapper]struct{}),
go ccb.watcher()
ccb.balancer = b.Build(ccb, bopts)
......@@ -117,8 +122,20 @@ func (ccb *ccBalancerWrapper) watcher() {
select {
case t := <-ccb.stateChangeQueue.get():
select {
case <-ccb.done:
ccb.balancer.HandleSubConnStateChange(, t.state)
case t := <-ccb.resolverUpdateCh:
select {
case <-ccb.done:
ccb.balancer.HandleResolvedAddrs(t.addrs, t.err)
case <-ccb.done:
......@@ -126,6 +143,13 @@ func (ccb *ccBalancerWrapper) watcher() {
select {
case <-ccb.done:
scs := ccb.subConns
ccb.subConns = nil
for acbw := range scs {, errConnDrain)
......@@ -165,31 +189,54 @@ func (ccb *ccBalancerWrapper) handleResolvedAddrs(addrs []resolver.Address, err
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
grpclog.Infof("ccBalancerWrapper: new subconn: %v", addrs)
if len(addrs) <= 0 {
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
if ccb.subConns == nil {
return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
ac, err :=
if err != nil {
return nil, err
acbw := &acBalancerWrapper{ac: ac}
ac.acbw = acbw
ccb.subConns[acbw] = struct{}{}
return acbw, nil
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
grpclog.Infof("ccBalancerWrapper: removing subconn")
acbw, ok := sc.(*acBalancerWrapper)
if !ok {
if ccb.subConns == nil {
delete(ccb.subConns, acbw), errConnDrain)
func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
grpclog.Infof("ccBalancerWrapper: updating state and picker called by balancer: %v, %p", s, p)
if ccb.subConns == nil {
func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOption) {
func (ccb *ccBalancerWrapper) Target() string {
......@@ -202,9 +249,12 @@ type acBalancerWrapper struct {
func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
grpclog.Infof("acBalancerWrapper: UpdateAddresses called with %v", addrs)
if len(addrs) <= 0 {
if ! {
cc :=
......@@ -228,9 +278,11 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
} = ac
ac.acbw = acbw
if acState != connectivity.Idle {
......@@ -238,7 +290,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
func (acbw *acBalancerWrapper) Connect() {
func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
......@@ -19,6 +19,7 @@
package grpc
import (
......@@ -27,6 +28,7 @@ import (
type balancerWrapperBuilder struct {
......@@ -34,20 +36,27 @@ type balancerWrapperBuilder struct {
func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
bwb.b.Start(cc.Target(), BalancerConfig{
targetAddr := cc.Target()
targetSplitted := strings.Split(targetAddr, ":///")
if len(targetSplitted) >= 2 {
targetAddr = targetSplitted[1]
bwb.b.Start(targetAddr, BalancerConfig{
DialCreds: opts.DialCreds,
Dialer: opts.Dialer,
_, pickfirst := bwb.b.(*pickFirst)
bw := &balancerWrapper{
balancer: bwb.b,
pickfirst: pickfirst,
cc: cc,
startCh: make(chan struct{}),
conns: make(map[resolver.Address]balancer.SubConn),
connSt: make(map[balancer.SubConn]*scState),
csEvltr: &connectivityStateEvaluator{},
state: connectivity.Idle,
balancer: bwb.b,
pickfirst: pickfirst,
cc: cc,
targetAddr: targetAddr,
startCh: make(chan struct{}),
conns: make(map[resolver.Address]balancer.SubConn),
connSt: make(map[balancer.SubConn]*scState),
csEvltr: &connectivityStateEvaluator{},
state: connectivity.Idle,
cc.UpdateBalancerState(connectivity.Idle, bw)
go bw.lbWatcher()
......@@ -68,7 +77,8 @@ type balancerWrapper struct {
balancer Balancer // The v1 balancer.
pickfirst bool
cc balancer.ClientConn
cc balancer.ClientConn
targetAddr string // Target without the scheme.
// To aggregate the connectivity state.
csEvltr *connectivityStateEvaluator
......@@ -88,12 +98,11 @@ type balancerWrapper struct {
// connections accordingly.
func (bw *balancerWrapper) lbWatcher() {
grpclog.Infof("balancerWrapper: is pickfirst: %v\n", bw.pickfirst)
notifyCh := bw.balancer.Notify()
if notifyCh == nil {
// There's no resolver in the balancer. Connect directly.
a := resolver.Address{
Addr: bw.targetAddr,
Type: resolver.Backend,
sc, err :=[]resolver.Address{a}, balancer.NewSubConnOptions{})
......@@ -103,7 +112,7 @@ func (bw *balancerWrapper) lbWatcher() {
bw.conns[a] = sc
bw.connSt[sc] = &scState{
addr: Address{Addr:},
addr: Address{Addr: bw.targetAddr},
s: connectivity.Idle,
......@@ -165,10 +174,10 @@ func (bw *balancerWrapper) lbWatcher() {
} else {
bw.connSt[oldSC].addr = addrs[0]
} else {
var (
......@@ -221,7 +230,6 @@ func (bw *balancerWrapper) lbWatcher() {
func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
grpclog.Infof("balancerWrapper: handle subconn state change: %p, %v", sc, s)
scSt, ok := bw.connSt[sc]
......@@ -310,12 +318,12 @@ func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions)
Metadata: a.Metadata,
if !ok && failfast {
return nil, nil, Errorf(codes.Unavailable, "there is no connection available")
return nil, nil, status.Errorf(codes.Unavailable, "there is no connection available")
if s, ok := bw.connSt[sc]; failfast && (!ok || s.s != connectivity.Ready) {
// If the returned sc is not ready and RPC is failfast,
// return error, and this RPC will fail.
return nil, nil, Errorf(codes.Unavailable, "there is no connection available")
return nil, nil, status.Errorf(codes.Unavailable, "there is no connection available")
This diff is collapsed.
This diff is collapsed.
......@@ -69,6 +69,11 @@ func (p protoCodec) marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error
func (p protoCodec) Marshal(v interface{}) ([]byte, error) {
if pm, ok := v.(proto.Marshaler); ok {
// object can marshal itself, no need for buffer
return pm.Marshal()
cb := protoBufferPool.Get().(*cachedProtoBuffer)
out, err := p.marshal(v, cb)
......@@ -79,10 +84,17 @@ func (p protoCodec) Marshal(v interface{}) ([]byte, error) {
func (p protoCodec) Unmarshal(data []byte, v interface{}) error {
protoMsg := v.(proto.Message)
if pu, ok := protoMsg.(proto.Unmarshaler); ok {
// object can unmarshal itself, no need for buffer
return pu.Unmarshal(data)
cb := protoBufferPool.Get().(*cachedProtoBuffer)
err := cb.Unmarshal(v.(proto.Message))
err := cb.Unmarshal(protoMsg)
return err
......@@ -92,13 +104,11 @@ func (protoCodec) String() string {
return "proto"
var (
protoBufferPool = &sync.Pool{
New: func() interface{} {
return &cachedProtoBuffer{
Buffer: proto.Buffer{},
lastMarshaledSize: 16,
var protoBufferPool = &sync.Pool{
New: func() interface{} {
return &cachedProtoBuffer{
Buffer: proto.Buffer{},
lastMarshaledSize: 16,
// Code generated by "stringer -type=Code"; DO NOT EDIT.
* Copyright 2017 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package codes
import "fmt"
import "strconv"
const _Code_name = "OKCanceledUnknownInvalidArgumentDeadlineExceededNotFoundAlreadyExistsPermissionDeniedResourceExhaustedFailedPreconditionAbortedOutOfRangeUnimplementedInternalUnavailableDataLossUnauthenticated"
var _Code_index = [...]uint8{0, 2, 10, 17, 32, 48, 56, 69, 85, 102, 120, 127, 137, 150, 158, 169, 177, 192}
func (i Code) String() string {
if i >= Code(len(_Code_index)-1) {
return fmt.Sprintf("Code(%d)", i)
func (c Code) String() string {
switch c {
case OK:
return "OK"
case Canceled:
return "Canceled"
case Unknown:
return "Unknown"
case InvalidArgument:
return "InvalidArgument"
case DeadlineExceeded:
return "DeadlineExceeded"
case NotFound:
return "NotFound"
case AlreadyExists:
return "AlreadyExists"
case PermissionDenied:
return "PermissionDenied"
case ResourceExhausted:
return "ResourceExhausted"
case FailedPrecondition:
return "FailedPrecondition"
case Aborted:
return "Aborted"
case OutOfRange:
return "OutOfRange"
case Unimplemented:
return "Unimplemented"
case Internal:
return "Internal"
case Unavailable:
return "Unavailable"
case DataLoss:
return "DataLoss"
case Unauthenticated:
return "Unauthenticated"
return "Code(" + strconv.FormatInt(int64(c), 10) + ")"
return _Code_name[_Code_index[i]:_Code_index[i+1]]
......@@ -19,12 +19,13 @@
// Package codes defines the canonical error codes used by gRPC. It is
// consistent across various languages.
package codes // import ""
import (
// A Code is an unsigned 32-bit error code as defined in the gRPC spec.
type Code uint32
//go:generate stringer -type=Code
const (
// OK is returned on success.
OK Code = 0
......@@ -142,3 +143,41 @@ const (
// DataLoss indicates unrecoverable data loss or corruption.
DataLoss Code = 15
var strToCode = map[string]Code{
`"OK"`: OK,
`"CANCELLED"`:/* [sic] */ Canceled,
`"UNKNOWN"`: Unknown,
`"INVALID_ARGUMENT"`: InvalidArgument,
`"DEADLINE_EXCEEDED"`: DeadlineExceeded,
`"NOT_FOUND"`: NotFound,
`"ALREADY_EXISTS"`: AlreadyExists,
`"PERMISSION_DENIED"`: PermissionDenied,
`"RESOURCE_EXHAUSTED"`: ResourceExhausted,
`"FAILED_PRECONDITION"`: FailedPrecondition,
`"ABORTED"`: Aborted,
`"OUT_OF_RANGE"`: OutOfRange,
`"UNIMPLEMENTED"`: Unimplemented,
`"INTERNAL"`: Internal,
`"UNAVAILABLE"`: Unavailable,
`"DATA_LOSS"`: DataLoss,
`"UNAUTHENTICATED"`: Unauthenticated,
// UnmarshalJSON unmarshals b into the Code.
func (c *Code) UnmarshalJSON(b []byte) error {
// From json.Unmarshaler: By convention, to approximate the behavior of
// Unmarshal itself, Unmarshalers implement UnmarshalJSON([]byte("null")) as
// a no-op.
if string(b) == "null" {
return nil
if c == nil {
return fmt.Errorf("nil receiver passed to UnmarshalJSON")
if jc, ok := strToCode[string(b)]; ok {
*c = jc
return nil
return fmt.Errorf("invalid code: %q", string(b))
......@@ -34,10 +34,8 @@ import (
var (
// alpnProtoStr are the specified application level protocols for gRPC.
alpnProtoStr = []string{"h2"}
// alpnProtoStr are the specified application level protocols for gRPC.
var alpnProtoStr = []string{"h2"}
// PerRPCCredentials defines the common interface for the credentials which need to
// attach security information to every RPC (e.g., oauth2).
......@@ -74,11 +72,9 @@ type AuthInfo interface {
AuthType() string
var (
// ErrConnDispatched indicates that rawConn has been dispatched out of gRPC
// and the caller should not close rawConn.
ErrConnDispatched = errors.New("credentials: rawConn is dispatched out of gRPC")
// ErrConnDispatched indicates that rawConn has been dispatched out of gRPC
// and the caller should not close rawConn.
var ErrConnDispatched = errors.New("credentials: rawConn is dispatched out of gRPC")
// TransportCredentials defines the common interface for all the live gRPC wire
// protocols and supported transport security protocols (e.g., TLS, SSL).
......@@ -91,10 +87,14 @@ type TransportCredentials interface {
// (io.EOF, context.DeadlineExceeded or err.Temporary() == true).
// If the returned error is a wrapper error, implementations should make sure that
// the error implements Temporary() to have the correct retry behaviors.
// If the returned net.Conn is closed, it MUST close the net.Conn provided.
ClientHandshake(context.Context, string, net.Conn) (net.Conn, AuthInfo, error)
// ServerHandshake does the authentication handshake for servers. It returns
// the authenticated connection and the corresponding auth information about
// the connection.
// If the returned net.Conn is closed, it MUST close the net.Conn provided.
ServerHandshake(net.Conn) (net.Conn, AuthInfo, error)
// Info provides the ProtocolInfo of this TransportCredentials.
Info() ProtocolInfo
......@@ -131,15 +131,15 @@ func (c tlsCreds) Info() ProtocolInfo {
func (c *tlsCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (_ net.Conn, _ AuthInfo, err error) {
func (c *tlsCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (_ net.Conn, _ AuthInfo, err error) {
// use local cfg to avoid clobbering ServerName if using multiple endpoints
cfg := cloneTLSConfig(c.config)
if cfg.ServerName == "" {
colonPos := strings.LastIndex(addr, ":")
colonPos := strings.LastIndex(authority, ":")
if colonPos == -1 {
colonPos = len(addr)
colonPos = len(authority)
cfg.ServerName = addr[:colonPos]
cfg.ServerName = authority[:colonPos]
conn := tls.Client(rawConn, cfg)
errChannel := make(chan error, 1)
* Copyright 2017 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
// Package encoding defines the interface for the compressor and the functions
// to register and get the compossor.
// This package is EXPERIMENTAL.
package encoding
import (
var registerCompressor = make(map[string]Compressor)
// Compressor is used for compressing and decompressing when sending or receiving messages.
type Compressor interface {
// Compress writes the data written to wc to w after compressing it. If an error
// occurs while initializing the compressor, that error is returned instead.
Compress(w io.Writer) (io.WriteCloser, error)
// Decompress reads data from r, decompresses it, and provides the uncompressed data
// via the returned io.Reader. If an error occurs while initializing the decompressor, that error
// is returned instead.
Decompress(r io.Reader) (io.Reader, error)
// Name is the name of the compression codec and is used to set the content coding header.
Name() string
// RegisterCompressor registers the compressor with gRPC by its name. It can be activated when
// sending an RPC via grpc.UseCompressor(). It will be automatically accessed when receiving a
// message based on the content coding header. Servers also use it to send a response with the
// same encoding as the request.
// NOTE: this function must only be called during initialization time (i.e. in an init() function). If
// multiple Compressors are registered with the same name, the one registered last will take effect.
func RegisterCompressor(c Compressor) {
registerCompressor[c.Name()] = c
// GetCompressor returns Compressor for the given compressor name.
func GetCompressor(name string) Compressor {
return registerCompressor[name]
// Identity specifies the optional encoding for uncompressed streams.
// It is intended for grpc internal use only.
const Identity = "identity"
// +build go1.6,!go1.7
* Copyright 2016 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package grpc
import (
// dialContext connects to the address on the named network.
func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
return (&net.Dialer{Cancel: ctx.Done()}).Dial(network, address)
func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) error {
req.Cancel = ctx.Done()
if err := req.Write(conn); err != nil {
return fmt.Errorf("failed to write the HTTP request: %v", err)
return nil
// toRPCErr converts an error into an error from the status package.
func toRPCErr(err error) error {
if _, ok := status.FromError(err); ok {
return err
switch e := err.(type) {
case transport.StreamError:
return status.Error(e.Code, e.Desc)
case transport.ConnectionError:
return status.Error(codes.Unavailable, e.Desc)
switch err {
case context.DeadlineExceeded:
return status.Error(codes.DeadlineExceeded, err.Error())
case context.Canceled:
return status.Error(codes.Canceled, err.Error())
case ErrClientConnClosing:
return status.Error(codes.FailedPrecondition, err.Error())
return status.Error(codes.Unknown, err.Error())
// convertCode converts a standard Go error into its canonical code. Note that
// this is only used to translate the error returned by the server applications.
func convertCode(err error) codes.Code {
switch err {
case nil:
return codes.OK
case io.EOF:
return codes.OutOfRange
case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF:
return codes.FailedPrecondition
case os.ErrInvalid:
return codes.InvalidArgument
case context.Canceled:
return codes.Canceled
case context.DeadlineExceeded:
return codes.DeadlineExceeded
switch {
case os.IsExist(err):
return codes.AlreadyExists
case os.IsNotExist(err):
return codes.NotFound
case os.IsPermission(err):
return codes.PermissionDenied
return codes.Unknown
// +build go1.7
* Copyright 2016 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package grpc
import (
netctx ""
// dialContext connects to the address on the named network.
func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, address)
func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) error {
req = req.WithContext(ctx)
if err := req.Write(conn); err != nil {
return fmt.Errorf("failed to write the HTTP request: %v", err)
return nil
// toRPCErr converts an error into an error from the status package.
func toRPCErr(err error) error {
if _, ok := status.FromError(err); ok {
return err
switch e := err.(type) {
case transport.StreamError:
return status.Error(e.Code, e.Desc)
case transport.ConnectionError:
return status.Error(codes.Unavailable, e.Desc)
switch err {
case context.DeadlineExceeded, netctx.DeadlineExceeded:
return status.Error(codes.DeadlineExceeded, err.Error())
case context.Canceled, netctx.Canceled:
return status.Error(codes.Canceled, err.Error())
case ErrClientConnClosing:
return status.Error(codes.FailedPrecondition, err.Error())
return status.Error(codes.Unknown, err.Error())
// convertCode converts a standard Go error into its canonical code. Note that
// this is only used to translate the error returned by the server applications.
func convertCode(err error) codes.Code {
switch err {
case nil:
return codes.OK
case io.EOF:
return codes.OutOfRange
case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF:
return codes.FailedPrecondition
case os.ErrInvalid:
return codes.InvalidArgument
case context.Canceled, netctx.Canceled:
return codes.Canceled
case context.DeadlineExceeded, netctx.DeadlineExceeded:
return codes.DeadlineExceeded
switch {
case os.IsExist(err):
return codes.AlreadyExists
case os.IsNotExist(err):
return codes.NotFound
case os.IsPermission(err):
return codes.PermissionDenied
return codes.Unknown
This diff is collapsed.
......@@ -15,7 +15,7 @@
syntax = "proto3";
option go_package = "messages";
option go_package = "";
message Duration {
// Signed seconds of the span of time. Must be from -315,576,000,000
* Copyright 2017 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package grpc
import (
lbpb ""
type rpcStats struct {
NumCallsStarted int64
NumCallsFinished int64
NumCallsFinishedWithDropForRateLimiting int64
NumCallsFinishedWithDropForLoadBalancing int64
NumCallsFinishedWithClientFailedToSend int64
NumCallsFinishedKnownReceived int64
// toClientStats converts rpcStats to lbpb.ClientStats, and clears rpcStats.
func (s *rpcStats) toClientStats() *lbpb.ClientStats {
stats := &lbpb.ClientStats{
NumCallsStarted: atomic.SwapInt64(&s.NumCallsStarted, 0),
NumCallsFinished: atomic.SwapInt64(&s.NumCallsFinished, 0),
NumCallsFinishedWithDropForRateLimiting: atomic.SwapInt64(&s.NumCallsFinishedWithDropForRateLimiting, 0),
NumCallsFinishedWithDropForLoadBalancing: atomic.SwapInt64(&s.NumCallsFinishedWithDropForLoadBalancing, 0),
NumCallsFinishedWithClientFailedToSend: atomic.SwapInt64(&s.NumCallsFinishedWithClientFailedToSend, 0),
NumCallsFinishedKnownReceived: atomic.SwapInt64(&s.NumCallsFinishedKnownReceived, 0),
return stats
func (s *rpcStats) dropForRateLimiting() {
atomic.AddInt64(&s.NumCallsStarted, 1)
atomic.AddInt64(&s.NumCallsFinishedWithDropForRateLimiting, 1)
atomic.AddInt64(&s.NumCallsFinished, 1)
func (s *rpcStats) dropForLoadBalancing() {
atomic.AddInt64(&s.NumCallsStarted, 1)
atomic.AddInt64(&s.NumCallsFinishedWithDropForLoadBalancing, 1)
atomic.AddInt64(&s.NumCallsFinished, 1)
func (s *rpcStats) failedToSend() {
atomic.AddInt64(&s.NumCallsStarted, 1)
atomic.AddInt64(&s.NumCallsFinishedWithClientFailedToSend, 1)
atomic.AddInt64(&s.NumCallsFinished, 1)
func (s *rpcStats) knownReceived() {
atomic.AddInt64(&s.NumCallsStarted, 1)
atomic.AddInt64(&s.NumCallsFinishedKnownReceived, 1)
atomic.AddInt64(&s.NumCallsFinished, 1)
type errPicker struct {
// Pick always returns this err.
err error
func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, p.err
// rrPicker does roundrobin on subConns. It's typically used when there's no
// response from remote balancer, and grpclb falls back to the resolved
// backends.
// It guaranteed that len(subConns) > 0.
type rrPicker struct {
mu sync.Mutex
subConns []balancer.SubConn // The subConns that were READY when taking the snapshot.
subConnsNext int
func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
sc := p.subConns[p.subConnsNext]
p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
return sc, nil, nil
// lbPicker does two layers of picks:
// First layer: roundrobin on all servers in serverList, including drops and backends.
// - If it picks a drop, the RPC will fail as being dropped.
// - If it picks a backend, do a second layer pick to pick the real backend.
// Second layer: roundrobin on all READY backends.
// It's guaranteed that len(serverList) > 0.
type lbPicker struct {
mu sync.Mutex
serverList []*lbpb.Server
serverListNext int
subConns []balancer.SubConn // The subConns that were READY when taking the snapshot.
subConnsNext int
stats *rpcStats
func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
// Layer one roundrobin on serverList.
s := p.serverList[p.serverListNext]
p.serverListNext = (p.serverListNext + 1) % len(p.serverList)
// If it's a drop, return an error and fail the RPC.
if s.DropForRateLimiting {
return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb")
if s.DropForLoadBalancing {
return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb")
// If not a drop but there's no ready subConns.
if len(p.subConns) <= 0 {
return nil, nil, balancer.ErrNoSubConnAvailable
// Return the next ready subConn in the list, also collect rpc stats.
sc := p.subConns[p.subConnsNext]
p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
done := func(info balancer.DoneInfo) {
if !info.BytesSent {
} else if info.BytesReceived {
return sc, done, nil
* Copyright 2017 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package grpc
import (
lbpb ""
// processServerList updates balaner's internal state, create/remove SubConns
// and regenerates picker using the received serverList.
func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
grpclog.Infof("lbBalancer: processing server list: %+v", l)
// Set serverListReceived to true so fallback will not take effect if it has
// not hit timeout.
lb.serverListReceived = true
// If the new server list == old server list, do nothing.
if reflect.DeepEqual(lb.fullServerList, l.Servers) {
grpclog.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
lb.fullServerList = l.Servers
var backendAddrs []resolver.Address
for _, s := range l.Servers {
if s.DropForLoadBalancing || s.DropForRateLimiting {
md := metadata.Pairs(lbTokeyKey, s.LoadBalanceToken)
ip := net.IP(s.IpAddress)
ipStr := ip.String()
if ip.To4() == nil {
// Add square brackets to ipv6 addresses, otherwise net.Dial() and
// net.SplitHostPort() will return too many colons error.
ipStr = fmt.Sprintf("[%s]", ipStr)
addr := resolver.Address{
Addr: fmt.Sprintf("%s:%d", ipStr, s.Port),
Metadata: &md,
backendAddrs = append(backendAddrs, addr)
// Call refreshSubConns to create/remove SubConns.
backendsUpdated := lb.refreshSubConns(backendAddrs)
// If no backend was updated, no SubConn will be newed/removed. But since
// the full serverList was different, there might be updates in drops or
// pick weights(different number of duplicates). We need to update picker
// with the fulllist.
if !backendsUpdated {
lb.regeneratePicker(), lb.picker)
// refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool
// indicating whether the backendAddrs are different from the cached
// backendAddrs (whether any SubConn was newed/removed).
// Caller must hold
func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address) bool {
lb.backendAddrs = nil
var backendsUpdated bool
// addrsSet is the set converted from backendAddrs, it's used to quick
// lookup for an address.
addrsSet := make(map[resolver.Address]struct{})
// Create new SubConns.
for _, addr := range backendAddrs {
addrWithoutMD := addr
addrWithoutMD.Metadata = nil
addrsSet[addrWithoutMD] = struct{}{}
lb.backendAddrs = append(lb.backendAddrs, addrWithoutMD)
if _, ok := lb.subConns[addrWithoutMD]; !ok {
backendsUpdated = true
// Use addrWithMD to create the SubConn.
sc, err :=[]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
grpclog.Warningf("roundrobinBalancer: failed to create new SubConn: %v", err)
lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map.
lb.scStates[sc] = connectivity.Idle
for a, sc := range lb.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
backendsUpdated = true
delete(lb.subConns, a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
return backendsUpdated
func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) error {
for {
reply, err := s.Recv()
if err != nil {
return fmt.Errorf("grpclb: failed to recv server list: %v", err)
if serverList := reply.GetServerList(); serverList != nil {
func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
case <-s.Context().Done():
stats := lb.clientStats.toClientStats()
t := time.Now()
stats.Timestamp = &lbpb.Timestamp{
Seconds: t.Unix(),
Nanos: int32(t.Nanosecond()),
if err := s.Send(&lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
ClientStats: stats,
}); err != nil {
func (lb *lbBalancer) callRemoteBalancer() error {
lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := lbClient.BalanceLoad(ctx, FailFast(false))
if err != nil {
return fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
// grpclb handshake on the stream.
initReq := &lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
InitialRequest: &lbpb.InitialLoadBalanceRequest{
if err := stream.Send(initReq); err != nil {
return fmt.Errorf("grpclb: failed to send init request: %v", err)
reply, err := stream.Recv()
if err != nil {
return fmt.Errorf("grpclb: failed to recv init response: %v", err)
initResp := reply.GetInitialResponse()
if initResp == nil {
return fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
if initResp.LoadBalancerDelegate != "" {
return fmt.Errorf("grpclb: Delegation is not supported")
go func() {
if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
lb.sendLoadReport(stream, d)
return lb.readServerList(stream)
func (lb *lbBalancer) watchRemoteBalancer() {
for {
err := lb.callRemoteBalancer()
select {
case <-lb.doneCh:
if err != nil {
func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
var dopts []DialOption
if creds := lb.opt.DialCreds; creds != nil {
if err := creds.OverrideServerName(remoteLBName); err == nil {
dopts = append(dopts, WithTransportCredentials(creds))
} else {
grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v, using Insecure", err)
dopts = append(dopts, WithInsecure())
} else {
dopts = append(dopts, WithInsecure())
if lb.opt.Dialer != nil {
// WithDialer takes a different type of function, so we instead use a
// special DialOption here.
dopts = append(dopts, withContextDialer(lb.opt.Dialer))
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, WithBalancerName(PickFirstBalancerName))
dopts = append(dopts, withResolverBuilder(lb.manualResolver))
// Dial using manualResolver.Scheme, which is a random scheme generated
// when init grpclb. The target name is not important.
cc, err := Dial("grpclb:///grpclb.server", dopts...)
if err != nil {
grpclog.Fatalf("failed to dial: %v", err)
lb.ccRemoteLB = cc
go lb.watchRemoteBalancer()
* Copyright 2016 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package grpc
import (
// The parent ClientConn should re-resolve when grpclb loses connection to the
// remote balancer. When the ClientConn inside grpclb gets a TransientFailure,
// it calls lbManualResolver.ResolveNow(), which calls parent ClientConn's
// ResolveNow, and eventually results in re-resolve happening in parent
// ClientConn's resolver (DNS for example).
// parent
// ClientConn
// +-----------------------------------------------------------------+
// | parent +---------------------------------+ |
// | DNS ClientConn | grpclb | |
// | resolver balancerWrapper | | |
// | + + | grpclb grpclb | |
// | | | | ManualResolver ClientConn | |
// | | | | + + | |
// | | | | | | Transient | |
// | | | | | | Failure | |
// | | | | | <--------- | | |
// | | | <--------------- | ResolveNow | | |
// | | <--------- | ResolveNow | | | | |
// | | ResolveNow | | | | | |
// | | | | | | | |
// | + + | + + | |
// | +---------------------------------+ |
// +-----------------------------------------------------------------+
// lbManualResolver is used by the ClientConn inside grpclb. It's a manual
// resolver with a special ResolveNow() function.
// When ResolveNow() is called, it calls ResolveNow() on the parent ClientConn,
// so when grpclb client lose contact with remote balancers, the parent
// ClientConn's resolver will re-resolve.
type lbManualResolver struct {
scheme string
ccr resolver.ClientConn
ccb balancer.ClientConn
func (r *lbManualResolver) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) {
r.ccr = cc
return r, nil
func (r *lbManualResolver) Scheme() string {
return r.scheme
// ResolveNow calls resolveNow on the parent ClientConn.
func (r *lbManualResolver) ResolveNow(o resolver.ResolveNowOption) {
// Close is a noop for Resolver.
func (*lbManualResolver) Close() {}
// NewAddress calls cc.NewAddress.
func (r *lbManualResolver) NewAddress(addrs []resolver.Address) {
// NewServiceConfig calls cc.NewServiceConfig.
func (r *lbManualResolver) NewServiceConfig(sc string) {
......@@ -19,13 +19,6 @@
// the godoc of the top-level grpc package.
package internal
// TestingCloseConns closes all existing transports but keeps
// grpcServer.lis accepting new connections.
// The provided grpcServer must be of type *grpc.Server. It is untyped
// for circular dependency reasons.
var TestingCloseConns func(grpcServer interface{})
// TestingUseHandlerImpl enables the http.Handler-based server implementation.
// It must be called before Serve and requires TLS credentials.
// +build go1.7, !go1.8
// +build go1.6,!go1.8
......@@ -97,7 +97,7 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.
p = bp.picker
subConn, put, err := p.Pick(ctx, opts)
subConn, done, err := p.Pick(ctx, opts)
if err != nil {
switch err {
......@@ -120,7 +120,7 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.
if t, ok := acw.getAddrConn().getReadyTransport(); ok {
return t, put, nil
return t, done, nil
grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
// If ok == false, ac.state is not READY.
......@@ -26,6 +26,9 @@ import (
// PickFirstBalancerName is the name of the pick_first balancer.
const PickFirstBalancerName = "pick_first"
func newPickfirstBuilder() balancer.Builder {
return &pickfirstBuilder{}
......@@ -37,7 +40,7 @@ func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions
func (*pickfirstBuilder) Name() string {
return "pickfirst"
return PickFirstBalancerName
type pickfirstBalancer struct {
......@@ -57,14 +60,20 @@ func (b *pickfirstBalancer) HandleResolvedAddrs(addrs []resolver.Address, err er
}, &picker{sc:})
} else {
func (b *pickfirstBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
grpclog.Infof("pickfirstBalancer: HandleSubConnStateChange: %p, %v", sc, s)
if != sc || s == connectivity.Shutdown {
if != sc {
grpclog.Infof("pickfirstBalancer: ignored state change because sc is not recognized")
if s == connectivity.Shutdown { = nil
......@@ -93,3 +102,7 @@ func (p *picker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.
return, nil, nil
func init() {
......@@ -82,8 +82,7 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, addr string) (_
Header: map[string][]string{"User-Agent": {grpcUA}},
req = req.WithContext(ctx)
if err := req.Write(conn); err != nil {
if err := sendHTTPRequest(ctx, req, conn); err != nil {
return nil, fmt.Errorf("failed to write the HTTP request: %v", err)
This diff is collapsed.
// +build go1.6, !go1.8
* Copyright 2017 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package dns
import (
var (
lookupHost = func(ctx context.Context, host string) ([]string, error) { return net.LookupHost(host) }
lookupSRV = func(ctx context.Context, service, proto, name string) (string, []*net.SRV, error) {
return net.LookupSRV(service, proto, name)
lookupTXT = func(ctx context.Context, name string) ([]string, error) { return net.LookupTXT(name) }
// +build go1.8
* Copyright 2017 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package dns
import "net"
var (
lookupHost = net.DefaultResolver.LookupHost
lookupSRV = net.DefaultResolver.LookupSRV
lookupTXT = net.DefaultResolver.LookupTXT
* Copyright 2017 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
// Package passthrough implements a pass-through resolver. It sends the target
// name without scheme back to gRPC as resolved address.
package passthrough
import ""
const scheme = "passthrough"
type passthroughBuilder struct{}
func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
r := &passthroughResolver{
target: target,
cc: cc,
return r, nil
func (*passthroughBuilder) Scheme() string {
return scheme
type passthroughResolver struct {
target resolver.Target
cc resolver.ClientConn
func (r *passthroughResolver) start() {[]resolver.Address{{Addr:}})
func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOption) {}
func (*passthroughResolver) Close() {}
func init() {
......@@ -24,7 +24,7 @@ var (
// m is a map from scheme to resolver builder.
m = make(map[string]Builder)
// defaultScheme is the default scheme to use.
defaultScheme string
defaultScheme = "passthrough"
// TODO(bar) install dns resolver in init(){}.
......@@ -38,7 +38,7 @@ func Register(b Builder) {
// Get returns the resolver builder registered with the given scheme.
// If no builder is register with the scheme, the default scheme will
// be used.
// If the default scheme is not modified, "dns" will be the default
// If the default scheme is not modified, "passthrough" will be the default
// scheme, and the preinstalled dns resolver will be used.
// If the default scheme is modified, and a resolver is registered with
// the scheme, that resolver will be returned.
......@@ -55,7 +55,7 @@ func Get(scheme string) Builder {
// SetDefaultScheme sets the default scheme that will be used.
// The default default scheme is "dns".
// The default default scheme is "passthrough".
func SetDefaultScheme(scheme string) {
defaultScheme = scheme
......@@ -78,7 +78,9 @@ type Address struct {
// Type is the type of this address.
Type AddressType
// ServerName is the name of this address.
// It's the name of the grpc load balancer, which will be used for authentication.
// e.g. if Type is GRPCLB, ServerName should be the name of the remote load
// balancer, not the name of the backend.
ServerName string
// Metadata is the information associated with Addr, which may be used
// to make load balancing decision.
......@@ -88,10 +90,18 @@ type Address struct {
// BuildOption includes additional information for the builder to create
// the resolver.
type BuildOption struct {
// UserOptions can be used to pass configuration between DialOptions and the
// resolver.
UserOptions interface{}
// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
// NewAddress is called by resolver to notify ClientConn a new list
// of resolved addresses.
......@@ -128,8 +138,10 @@ type ResolveNowOption struct{}
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
// ResolveNow will be called by gRPC to try to resolve the target name again.
// It's just a hint, resolver can ignore this if it's not necessary.
// ResolveNow will be called by gRPC to try to resolve the target name
// again. It's just a hint, resolver can ignore this if it's not necessary.
// It could be called multiple times concurrently.
// Close closes the resolver.
......@@ -19,6 +19,7 @@
package grpc
import (
......@@ -36,20 +37,24 @@ type ccResolverWrapper struct {
// split2 returns the values from strings.SplitN(s, sep, 2).
// If sep is not found, it returns "", s instead.
func split2(s, sep string) (string, string) {
// If sep is not found, it returns ("", s, false) instead.
func split2(s, sep string) (string, string, bool) {
spl := strings.SplitN(s, sep, 2)
if len(spl) < 2 {
return "", s
return "", "", false
return spl[0], spl[1]
return spl[0], spl[1], true
// parseTarget splits target into a struct containing scheme, authority and
// endpoint.
func parseTarget(target string) (ret resolver.Target) {
ret.Scheme, ret.Endpoint = split2(target, "://")
ret.Authority, ret.Endpoint = split2(ret.Endpoint, "/")
var ok bool
ret.Scheme, ret.Endpoint, ok = split2(target, "://")
if !ok {
return resolver.Target{Endpoint: target}
ret.Authority, ret.Endpoint, _ = split2(ret.Endpoint, "/")
return ret
......@@ -57,18 +62,17 @@ func parseTarget(target string) (ret resolver.Target) {
// builder for this scheme. It then builds the resolver and starts the
// monitoring goroutine for it.
// This function could return nil, nil, in tests for old behaviors.
// TODO(bar) never return nil, nil when DNS becomes the default resolver.
// If withResolverBuilder dial option is set, the specified resolver will be
// used instead.
func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
target := parseTarget(
grpclog.Infof("dialing to target with scheme: %q", target.Scheme)
grpclog.Infof("dialing to target with scheme: %q", cc.parsedTarget.Scheme)
rb := resolver.Get(target.Scheme)
rb := cc.dopts.resolverBuilder
if rb == nil {
// TODO(bar) return error when DNS becomes the default (implemented and
// registered by DNS package).
grpclog.Infof("could not get resolver for scheme: %q", target.Scheme)
return nil, nil
rb = resolver.Get(cc.parsedTarget.Scheme)
if rb == nil {
return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme)
ccr := &ccResolverWrapper{
......@@ -79,14 +83,19 @@ func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
var err error
ccr.resolver, err = rb.Build(target, ccr, resolver.BuildOption{})
ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{
UserOptions: cc.dopts.resolverBuildUserOptions,
if err != nil {
return nil, err
go ccr.watcher()
return ccr, nil
func (ccr *ccResolverWrapper) start() {
go ccr.watcher()
// watcher processes address updates and service config updates sequencially.
// Otherwise, we need to resolve possible races between address and service
// config (e.g. they specify different balancer types).
......@@ -100,20 +109,31 @@ func (ccr *ccResolverWrapper) watcher() {
select {
case addrs := <-ccr.addrCh:
grpclog.Infof("ccResolverWrapper: sending new addresses to balancer wrapper: %v", addrs)
// TODO(bar switching) this should never be nil. Pickfirst should be default.
if != nil {
// TODO(bar switching) create balancer if it's nil?, nil)
select {
case <-ccr.done:
grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs), nil)
case sc := <-ccr.scCh:
select {
case <-ccr.done:
grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
case <-ccr.done:
func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) {
func (ccr *ccResolverWrapper) close() {
This diff is collapsed.
This diff is collapsed.
* Copyright 2017 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package grpc
import (
const maxInt = int(^uint(0) >> 1)
// MethodConfig defines the configuration recommended by the service providers for a
// particular method.
// DEPRECATED: Users should not use this struct. Service config should be received
// through name resolver, as specified here
type MethodConfig struct {
// WaitForReady indicates whether RPCs sent to this method should wait until
// the connection is ready by default (!failfast). The value specified via the
// gRPC client API will override the value set here.
WaitForReady *bool
// Timeout is the default timeout for RPCs sent to this method. The actual
// deadline used will be the minimum of the value specified here and the value
// set by the application via the gRPC client API. If either one is not set,
// then the other will be used. If neither is set, then the RPC has no deadline.
Timeout *time.Duration
// MaxReqSize is the maximum allowed payload size for an individual request in a
// stream (client->server) in bytes. The size which is measured is the serialized
// payload after per-message compression (but before stream compression) in bytes.
// The actual value used is the minimum of the value specified here and the value set
// by the application via the gRPC client API. If either one is not set, then the other
// will be used. If neither is set, then the built-in default is used.
MaxReqSize *int
// MaxRespSize is the maximum allowed payload size for an individual response in a
// stream (server->client) in bytes.
MaxRespSize *int
// ServiceConfig is provided by the service provider and contains parameters for how
// clients that connect to the service should behave.
// DEPRECATED: Users should not use this struct. Service config should be received
// through name resolver, as specified here
type ServiceConfig struct {
// LB is the load balancer the service providers recommends. The balancer specified
// via grpc.WithBalancer will override this.
LB *string
// Methods contains a map for the methods in this service.
// If there is an exact match for a method (i.e. /service/method) in the map, use the corresponding MethodConfig.
// If there's no exact match, look for the default config for the service (/service/) and use the corresponding MethodConfig if it exists.
// Otherwise, the method has no MethodConfig to use.
Methods map[string]MethodConfig
func parseDuration(s *string) (*time.Duration, error) {
if s == nil {
return nil, nil
if !strings.HasSuffix(*s, "s") {
return nil, fmt.Errorf("malformed duration %q", *s)
ss := strings.SplitN((*s)[:len(*s)-1], ".", 3)
if len(ss) > 2 {
return nil, fmt.Errorf("malformed duration %q", *s)
// hasDigits is set if either the whole or fractional part of the number is
// present, since both are optional but one is required.
hasDigits := false
var d time.Duration
if len(ss[0]) > 0 {
i, err := strconv.ParseInt(ss[0], 10, 32)
if err != nil {
return nil, fmt.Errorf("malformed duration %q: %v", *s, err)
d = time.Duration(i) * time.Second
hasDigits = true
if len(ss) == 2 && len(ss[1]) > 0 {
if len(ss[1]) > 9 {
return nil, fmt.Errorf("malformed duration %q", *s)
f, err := strconv.ParseInt(ss[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("malformed duration %q: %v", *s, err)
for i := 9; i > len(ss[1]); i-- {
f *= 10
d += time.Duration(f)
hasDigits = true
if !hasDigits {
return nil, fmt.Errorf("malformed duration %q", *s)
return &d, nil
type jsonName struct {
Service *string
Method *string
func (j jsonName) generatePath() (string, bool) {
if j.Service == nil {
return "", false
res := "/" + *j.Service + "/"
if j.Method != nil {
res += *j.Method
return res, true
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonMC struct {
Name *[]jsonName
WaitForReady *bool
Timeout *string
MaxRequestMessageBytes *int64
MaxResponseMessageBytes *int64
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonSC struct {
LoadBalancingPolicy *string
MethodConfig *[]jsonMC
func parseServiceConfig(js string) (ServiceConfig, error) {
var rsc jsonSC
err := json.Unmarshal([]byte(js), &rsc)
if err != nil {
grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
return ServiceConfig{}, err
sc := ServiceConfig{
LB: rsc.LoadBalancingPolicy,
Methods: make(map[string]MethodConfig),
if rsc.MethodConfig == nil {
return sc, nil
for _, m := range *rsc.MethodConfig {
if m.Name == nil {
d, err := parseDuration(m.Timeout)
if err != nil {
grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
return ServiceConfig{}, err
mc := MethodConfig{
WaitForReady: m.WaitForReady,
Timeout: d,
if m.MaxRequestMessageBytes != nil {
if *m.MaxRequestMessageBytes > int64(maxInt) {
mc.MaxReqSize = newInt(maxInt)
} else {
mc.MaxReqSize = newInt(int(*m.MaxRequestMessageBytes))
if m.MaxResponseMessageBytes != nil {
if *m.MaxResponseMessageBytes > int64(maxInt) {
mc.MaxRespSize = newInt(maxInt)
} else {
mc.MaxRespSize = newInt(int(*m.MaxResponseMessageBytes))
for _, n := range *m.Name {
if path, valid := n.generatePath(); valid {
sc.Methods[path] = mc
return sc, nil
func min(a, b *int) *int {
if *a < *b {
return a
return b
func getMaxSize(mcMax, doptMax *int, defaultVal int) *int {
if mcMax == nil && doptMax == nil {
return &defaultVal
if mcMax != nil && doptMax != nil {
return min(mcMax, doptMax)
if mcMax != nil {
return mcMax
return doptMax
func newInt(b int) *int {
return &b
......@@ -125,8 +125,8 @@ func FromError(err error) (s *Status, ok bool) {
if err == nil {
return &Status{s: &spb.Status{Code: int32(codes.OK)}}, true
if s, ok := err.(*statusError); ok {
return s.status(), true
if se, ok := err.(*statusError); ok {
return se.status(), true
return nil, false
......@@ -166,3 +166,16 @@ func (s *Status) Details() []interface{} {
return details
// Code returns the Code of the error if it is a Status error, codes.OK if err
// is nil, or codes.Unknown otherwise.
func Code(err error) codes.Code {
// Don't use FromError to avoid allocation of OK status.
if err == nil {
return codes.OK
if se, ok := err.(*statusError); ok {
return se.status().Code()
return codes.Unknown
This diff is collapsed.
......@@ -41,12 +41,9 @@ const (
gamma = 2
var (
// Adding arbitrary data to ping so that its ack can be
// identified.
// Easter-egg: what does the ping message say?
bdpPing = &ping{data: [8]byte{2, 4, 16, 16, 9, 14, 7, 7}}
// Adding arbitrary data to ping so that its ack can be identified.
// Easter-egg: what does the ping message say?
var bdpPing = &ping{data: [8]byte{2, 4, 16, 16, 9, 14, 7, 7}}
type bdpEstimator struct {
// sentAt is the time when the ping was sent.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment