Commit 1472f046 authored by Robert Speicher's avatar Robert Speicher

Merge branch 'go-1.5-compat' into 'master'

Downgrade grpc to get Go 1.5 compatibility

Closes #85

See merge request !133
parents e1e34707 334345d3
......@@ -44,12 +44,29 @@ rspec:ruby2.1:
except:
- tags
go:
# Image taken from gitlab-ce@59f81b4ff8
image: "dev.gitlab.org:5005/gitlab/gitlab-build-images:ruby-2.3.3-golang-1.8-git-2.7-phantomjs-2.1-node-7.1"
.go: &go_definition
before_script:
- apt-get update -qq && apt-get install -y ruby
- ruby -v
script:
- go version
- which go
- bin/compile
- support/go-test
- support/go-format check
go:1.8:
<<: *go_definition
image: golang:1.8
go:1.7:
<<: *go_definition
image: golang:1.7
go:1.6:
<<: *go_definition
image: golang:1.6
go:1.5:
<<: *go_definition
image: golang:1.5
# gRPC-Go
#gRPC-Go
[![Build Status](https://travis-ci.org/grpc/grpc-go.svg)](https://travis-ci.org/grpc/grpc-go) [![GoDoc](https://godoc.org/google.golang.org/grpc?status.svg)](https://godoc.org/google.golang.org/grpc)
......@@ -16,7 +16,23 @@ $ go get google.golang.org/grpc
Prerequisites
-------------
This requires Go 1.6 or later.
This requires Go 1.5 or later.
A note on the version used: significant performance improvements in benchmarks
of grpc-go have been seen by upgrading the go version from 1.5 to the latest
1.7.1.
From https://golang.org/doc/install, one way to install the latest version of go is:
```
$ GO_VERSION=1.7.1
$ OS=linux
$ ARCH=amd64
$ curl -O https://storage.googleapis.com/golang/go${GO_VERSION}.${OS}-${ARCH}.tar.gz
$ sudo tar -C /usr/local -xzf go$GO_VERSION.$OS-$ARCH.tar.gz
$ # Put go on the PATH, keep the usual installation dir
$ sudo ln -s /usr/local/go/bin/go /usr/bin/go
$ rm go$GO_VERSION.$OS-$ARCH.tar.gz
```
Constraints
-----------
......
......@@ -36,14 +36,13 @@ package grpc
import (
"bytes"
"io"
"math"
"time"
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)
......@@ -73,22 +72,19 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
}
}
for {
if err = recv(p, dopts.codec, stream, dopts.dc, reply, dopts.maxMsgSize, inPayload); err != nil {
if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inPayload); err != nil {
if err == io.EOF {
break
}
return
}
}
if inPayload != nil && err == io.EOF && stream.Status().Code() == codes.OK {
if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK {
// TODO in the current implementation, inTrailer may be handled before inPayload in some cases.
// Fix the order if necessary.
dopts.copts.StatsHandler.HandleRPC(ctx, inPayload)
}
c.trailerMD = stream.Trailer()
if peer, ok := peer.FromContext(stream.Context()); ok {
c.peer = peer
}
return nil
}
......@@ -231,7 +227,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if _, ok := status.FromError(err); ok {
if _, ok := err.(*rpcError); ok {
return err
}
if err == errConnClosing || err == errConnUnavailable {
......@@ -285,6 +281,6 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
put()
put = nil
}
return stream.Status().Err()
return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
}
}
......@@ -36,8 +36,8 @@ package grpc
import (
"errors"
"fmt"
"math"
"net"
"strings"
"sync"
"time"
......@@ -45,7 +45,6 @@ import (
"golang.org/x/net/trace"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
......@@ -79,6 +78,7 @@ var (
errConnClosing = errors.New("grpc: the connection is closing")
// errConnUnavailable indicates that the connection is unavailable.
errConnUnavailable = errors.New("grpc: the connection is unavailable")
errNoAddr = errors.New("grpc: there is no address available to dial")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)
......@@ -98,21 +98,11 @@ type dialOptions struct {
timeout time.Duration
scChan <-chan ServiceConfig
copts transport.ConnectOptions
maxMsgSize int
}
const defaultClientMaxMsgSize = math.MaxInt32
// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)
// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive.
func WithMaxMsgSize(s int) DialOption {
return func(o *dialOptions) {
o.maxMsgSize = s
}
}
// WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
func WithCodec(c Codec) DialOption {
return func(o *dialOptions) {
......@@ -259,13 +249,6 @@ func WithUserAgent(s string) DialOption {
}
}
// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
return func(o *dialOptions) {
o.copts.KeepaliveParams = kp
}
}
// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return func(o *dialOptions) {
......@@ -280,15 +263,6 @@ func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
}
}
// WithAuthority returns a DialOption that specifies the value to be used as
// the :authority pseudo-header. This value only works with WithInsecure and
// has no effect if TransportCredentials are present.
func WithAuthority(a string) DialOption {
return func(o *dialOptions) {
o.copts.Authority = a
}
}
// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
......@@ -305,19 +279,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
conns: make(map[Address]*addrConn),
}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
cc.dopts.maxMsgSize = defaultClientMaxMsgSize
for _, opt := range opts {
opt(&cc.dopts)
}
cc.mkp = cc.dopts.copts.KeepaliveParams
grpcUA := "grpc-go/" + Version
if cc.dopts.copts.UserAgent != "" {
cc.dopts.copts.UserAgent += " " + grpcUA
} else {
cc.dopts.copts.UserAgent = grpcUA
}
if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
......@@ -357,18 +321,24 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
creds := cc.dopts.copts.TransportCredentials
if creds != nil && creds.Info().ServerName != "" {
cc.authority = creds.Info().ServerName
} else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
cc.authority = cc.dopts.copts.Authority
} else {
cc.authority = target
colonPos := strings.LastIndex(target, ":")
if colonPos == -1 {
colonPos = len(target)
}
cc.authority = target[:colonPos]
}
var ok bool
waitC := make(chan error, 1)
go func() {
defer close(waitC)
var addrs []Address
if cc.dopts.balancer == nil && cc.sc.LB != nil {
cc.dopts.balancer = cc.sc.LB
}
if cc.dopts.balancer != nil {
if cc.dopts.balancer == nil {
// Connect to target directly if balancer is nil.
addrs = append(addrs, Address{Addr: target})
} else {
var credsClone credentials.TransportCredentials
if creds != nil {
credsClone = creds.Clone()
......@@ -381,22 +351,24 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
return
}
ch := cc.dopts.balancer.Notify()
if ch != nil {
if cc.dopts.block {
doneChan := make(chan struct{})
go cc.lbWatcher(doneChan)
<-doneChan
if ch == nil {
// There is no name resolver installed.
addrs = append(addrs, Address{Addr: target})
} else {
go cc.lbWatcher(nil)
}
addrs, ok = <-ch
if !ok || len(addrs) == 0 {
waitC <- errNoAddr
return
}
}
// No balancer, or no resolver within the balancer. Connect directly.
if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil {
}
for _, a := range addrs {
if err := cc.resetAddrConn(a, false, nil); err != nil {
waitC <- err
return
}
}
close(waitC)
}()
select {
case <-ctx.Done():
......@@ -407,10 +379,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}
// If balancer is nil or balancer.Notify() is nil, ok will be false here.
// The lbWatcher goroutine will not be created.
if ok {
go cc.lbWatcher()
}
if cc.dopts.scChan != nil {
go cc.scWatcher()
}
return cc, nil
}
......@@ -459,14 +436,9 @@ type ClientConn struct {
mu sync.RWMutex
sc ServiceConfig
conns map[Address]*addrConn
// Keepalive parameter can be udated if a GoAway is received.
mkp keepalive.ClientParameters
}
// lbWatcher watches the Notify channel of the balancer in cc and manages
// connections accordingly. If doneChan is not nil, it is closed after the
// first successfull connection is made.
func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
func (cc *ClientConn) lbWatcher() {
for addrs := range cc.dopts.balancer.Notify() {
var (
add []Address // Addresses need to setup connections.
......@@ -493,15 +465,7 @@ func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
}
cc.mu.Unlock()
for _, a := range add {
if doneChan != nil {
err := cc.resetAddrConn(a, true, nil)
if err == nil {
close(doneChan)
doneChan = nil
}
} else {
cc.resetAddrConn(a, false, nil)
}
cc.resetAddrConn(a, true, nil)
}
for _, c := range del {
c.tearDown(errConnDrain)
......@@ -530,15 +494,12 @@ func (cc *ClientConn) scWatcher() {
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
ac := &addrConn{
cc: cc,
addr: addr,
dopts: cc.dopts,
}
cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = cc.mkp
cc.mu.RUnlock()
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
ac.stateCV = sync.NewCond(&ac.mu)
if EnableTracing {
......@@ -583,7 +544,8 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
stale.tearDown(tearDownErr)
}
}
if block {
// skipWait may overwrite the decision in ac.dopts.block.
if ac.dopts.block && !skipWait {
if err := ac.resetTransport(false); err != nil {
if err != errConnClosing {
// Tear down ac and delete it from cc.conns.
......@@ -720,20 +682,6 @@ type addrConn struct {
tearDownErr error
}
// adjustParams updates parameters used to create transports upon
// receiving a GoAway.
func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
switch r {
case transport.TooManyPings:
v := 2 * ac.dopts.copts.KeepaliveParams.Time
ac.cc.mu.Lock()
if v > ac.cc.mkp.Time {
ac.cc.mkp.Time = v
}
ac.cc.mu.Unlock()
}
}
// printf records an event in ac's event log, unless ac has been closed.
// REQUIRES ac.mu is held.
func (ac *addrConn) printf(format string, a ...interface{}) {
......@@ -818,8 +766,6 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
Metadata: ac.addr.Metadata,
}
newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
// Don't call cancel in success path due to a race in Go 1.6:
// https://github.com/golang/go/issues/15078.
if err != nil {
cancel()
......@@ -890,7 +836,6 @@ func (ac *addrConn) transportMonitor() {
}
return
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
// If GoAway happens without any network I/O error, ac is closed without shutting down the
// underlying transport (the transport will be closed when all the pending RPCs finished or
// failed.).
......@@ -899,9 +844,9 @@ func (ac *addrConn) transportMonitor() {
// In both cases, a new ac is created.
select {
case <-t.Error():
ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
default:
ac.cc.resetAddrConn(ac.addr, false, errConnDrain)
ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
}
return
case <-t.Error():
......@@ -910,8 +855,7 @@ func (ac *addrConn) transportMonitor() {
t.Close()
return
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
return
default:
}
......
......@@ -40,7 +40,7 @@ import (
// UnaryInvoker is called by UnaryClientInterceptor to complete RPCs.
type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error
// UnaryClientInterceptor intercepts the execution of a unary RPC on the client. invoker is the handler to complete the RPC
// UnaryClientInterceptor intercepts the execution of a unary RPC on the client. inovker is the handler to complete the RPC
// and it is the responsibility of the interceptor to call it.
// This is the EXPERIMENTAL API.
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
......
......@@ -37,6 +37,7 @@ import (
"bytes"
"compress/gzip"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"math"
......@@ -47,9 +48,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)
......@@ -141,7 +140,6 @@ type callInfo struct {
failFast bool
headerMD metadata.MD
trailerMD metadata.MD
peer *peer.Peer
traceInfo traceInfo // in trace.go
}
......@@ -185,22 +183,12 @@ func Trailer(md *metadata.MD) CallOption {
})
}
// Peer returns a CallOption that retrieves peer information for a
// unary RPC.
func Peer(peer *peer.Peer) CallOption {
return afterCall(func(c *callInfo) {
if c.peer != nil {
*peer = *c.peer
}
})
}
// FailFast configures the action to take when an RPC is attempted on broken
// connections or unreachable servers. If failfast is true, the RPC will fail
// immediately. Otherwise, the RPC client will block the call until a
// connection is available (or the call is canceled or times out) and will retry
// the call if it fails due to a transient error. Please refer to
// https://github.com/grpc/grpc/blob/master/doc/fail_fast.md. Note: failFast is default to true.
// https://github.com/grpc/grpc/blob/master/doc/fail_fast.md
func FailFast(failFast bool) CallOption {
return beforeCall(func(c *callInfo) error {
c.failFast = failFast
......@@ -372,57 +360,88 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
return nil
}
// rpcError defines the status from an RPC.
type rpcError struct {
code codes.Code
desc string
}
func (e *rpcError) Error() string {
return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc)
}
// Code returns the error code for err if it was produced by the rpc system.
// Otherwise, it returns codes.Unknown.
//
// Deprecated; use status.FromError and Code method instead.
func Code(err error) codes.Code {
if s, ok := status.FromError(err); ok {
return s.Code()
if err == nil {
return codes.OK
}
if e, ok := err.(*rpcError); ok {
return e.code
}
return codes.Unknown
}
// ErrorDesc returns the error description of err if it was produced by the rpc system.
// Otherwise, it returns err.Error() or empty string when err is nil.
//
// Deprecated; use status.FromError and Message method instead.
func ErrorDesc(err error) string {
if s, ok := status.FromError(err); ok {
return s.Message()
if err == nil {
return ""
}
if e, ok := err.(*rpcError); ok {
return e.desc
}
return err.Error()
}
// Errorf returns an error containing an error code and a description;
// Errorf returns nil if c is OK.
//
// Deprecated; use status.Errorf instead.
func Errorf(c codes.Code, format string, a ...interface{}) error {
return status.Errorf(c, format, a...)
if c == codes.OK {
return nil
}
return &rpcError{
code: c,
desc: fmt.Sprintf(format, a...),
}
}
// toRPCErr converts an error into an error from the status package.
// toRPCErr converts an error into a rpcError.
func toRPCErr(err error) error {
if _, ok := status.FromError(err); ok {
return err
}
switch e := err.(type) {
case *rpcError:
return err
case transport.StreamError:
return status.Error(e.Code, e.Desc)
return &rpcError{
code: e.Code,
desc: e.Desc,
}
case transport.ConnectionError:
return status.Error(codes.Internal, e.Desc)
return &rpcError{
code: codes.Internal,
desc: e.Desc,
}
default:
switch err {
case context.DeadlineExceeded:
return status.Error(codes.DeadlineExceeded, err.Error())
return &rpcError{
code: codes.DeadlineExceeded,
desc: err.Error(),
}
case context.Canceled:
return status.Error(codes.Canceled, err.Error())
return &rpcError{
code: codes.Canceled,
desc: err.Error(),
}
case ErrClientConnClosing:
return status.Error(codes.FailedPrecondition, err.Error())
return &rpcError{
code: codes.FailedPrecondition,
desc: err.Error(),
}
}
return status.Error(codes.Unknown, err.Error())
}
return Errorf(codes.Unknown, "%v", err)
}
// convertCode converts a standard Go error into its canonical code. Note that
......@@ -467,17 +486,17 @@ type MethodConfig struct {
// then the other will be used. If neither is set, then the RPC has no deadline.
Timeout time.Duration
// MaxReqSize is the maximum allowed payload size for an individual request in a
// stream (client->server) in bytes. The size which is measured is the serialized
// payload after per-message compression (but before stream compression) in bytes.
// The actual value used is the minumum of the value specified here and the value set
// by the application via the gRPC client API. If either one is not set, then the other
// will be used. If neither is set, then the built-in default is used.
// stream (client->server) in bytes. The size which is measured is the serialized,
// uncompressed payload in bytes. The actual value used is the minumum of the value
// specified here and the value set by the application via the gRPC client API. If
// either one is not set, then the other will be used. If neither is set, then the
// built-in default is used.
// TODO: support this.
MaxReqSize uint32
MaxReqSize uint64
// MaxRespSize is the maximum allowed payload size for an individual response in a
// stream (server->client) in bytes.
// TODO: support this.
MaxRespSize uint32
MaxRespSize uint64
}
// ServiceConfig is provided by the service provider and contains parameters for how
......@@ -498,6 +517,3 @@ type ServiceConfig struct {
// requires a synchronised update of grpc-go and protoc-gen-go. This constant
// should not be referenced from any other code.
const SupportPackageIsVersion4 = true
// Version is the current grpc version.
const Version = "1.3.0-dev"
This diff is collapsed.
......@@ -37,6 +37,7 @@ import (
"bytes"
"errors"
"io"
"math"
"sync"
"time"
......@@ -45,7 +46,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)
......@@ -178,7 +178,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if _, ok := status.FromError(err); ok {
if _, ok := err.(*rpcError); ok {
return nil, err
}
if err == errConnClosing || err == errConnUnavailable {
......@@ -214,7 +214,6 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
codec: cc.dopts.codec,
cp: cc.dopts.cp,
dc: cc.dopts.dc,
maxMsgSize: cc.dopts.maxMsgSize,
cancel: cancel,
put: put,
......@@ -240,7 +239,11 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
case <-s.Done():
// TODO: The trace of the RPC is terminated here when there is no pending
// I/O, which is probably not the optimal solution.
cs.finish(s.Status().Err())
if s.StatusCode() == codes.OK {
cs.finish(nil)
} else {
cs.finish(Errorf(s.StatusCode(), "%s", s.StatusDesc()))
}
cs.closeTransportStream(nil)
case <-s.GoAway():
cs.finish(errConnDrain)
......@@ -266,7 +269,6 @@ type clientStream struct {
cp Compressor
cbuf *bytes.Buffer
dc Decompressor
maxMsgSize int
cancel context.CancelFunc
tracing bool // set to EnableTracing when the clientStream is created.
......@@ -380,7 +382,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
Client: true,
}
}
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxMsgSize, inPayload)
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, inPayload)
defer func() {
// err != nil indicates the termination of the stream.
if err != nil {
......@@ -403,30 +405,30 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
}
// Special handling for client streaming rpc.
// This recv expects EOF or errors, so we don't collect inPayload.
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxMsgSize, nil)
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, nil)
cs.closeTransportStream(err)
if err == nil {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}
if err == io.EOF {
if se := cs.s.Status().Err(); se != nil {
return se
}
if cs.s.StatusCode() == codes.OK {
cs.finish(err)
return nil
}
return Errorf(cs.s.StatusCode(), "%s", cs.s.StatusDesc())
}
return toRPCErr(err)
}
if _, ok := err.(transport.ConnectionError); !ok {
cs.closeTransportStream(err)
}
if err == io.EOF {
if statusErr := cs.s.Status().Err(); statusErr != nil {
return statusErr
}
if cs.s.StatusCode() == codes.OK {
// Returns io.EOF to indicate the end of the stream.
return
}
return Errorf(cs.s.StatusCode(), "%s", cs.s.StatusDesc())
}
return toRPCErr(err)
}
......@@ -517,6 +519,8 @@ type serverStream struct {
dc Decompressor
cbuf *bytes.Buffer
maxMsgSize int
statusCode codes.Code
statusDesc string
trInfo *traceInfo
statsHandler stats.Handler
......
......@@ -35,9 +35,7 @@ package transport
import (
"fmt"
"math"
"sync"
"time"
"golang.org/x/net/http2"
)
......@@ -48,16 +46,6 @@ const (
// The initial window size for flow control.
initialWindowSize = defaultWindowSize // for an RPC
initialConnWindowSize = defaultWindowSize * 16 // for a connection
infinity = time.Duration(math.MaxInt64)
defaultClientKeepaliveTime = infinity
defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
defaultMaxStreamsClient = 100
defaultMaxConnectionIdle = infinity
defaultMaxConnectionAge = infinity
defaultMaxConnectionAgeGrace = infinity
defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
)
// The following defines various control items which could flow through
......@@ -85,8 +73,6 @@ type resetStream struct {
func (*resetStream) item() {}
type goAway struct {
code http2.ErrCode
debugData []byte
}
func (*goAway) item() {}
......
......@@ -53,7 +53,6 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
// NewServerHandlerTransport returns a ServerTransport handling gRPC
......@@ -183,7 +182,7 @@ func (ht *serverHandlerTransport) do(fn func()) error {
}
}
func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
func (ht *serverHandlerTransport) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error {
err := ht.do(func() {
ht.writeCommonHeaders(s)
......@@ -193,13 +192,10 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
ht.rw.(http.Flusher).Flush()
h := ht.rw.Header()
h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
if m := st.Message(); m != "" {
h.Set("Grpc-Message", encodeGrpcMessage(m))
h.Set("Grpc-Status", fmt.Sprintf("%d", statusCode))
if statusDesc != "" {
h.Set("Grpc-Message", encodeGrpcMessage(statusDesc))
}
// TODO: Support Grpc-Status-Details-Bin
if md := s.Trailer(); len(md) > 0 {
for k, vv := range md {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
......@@ -238,7 +234,6 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
// and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
h.Add("Trailer", "Grpc-Status")
h.Add("Trailer", "Grpc-Message")
// TODO: Support Grpc-Status-Details-Bin
if s.sendCompress != "" {
h.Set("Grpc-Encoding", s.sendCompress)
......@@ -319,7 +314,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
if req.TLS != nil {
pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
}
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
ctx = metadata.NewContext(ctx, ht.headerMD)
ctx = peer.NewContext(ctx, pr)
s.ctx = newContextWithStream(ctx, s)
s.dec = &recvBufferReader{ctx: s.ctx, recv: s.buf}
......
......@@ -44,17 +44,16 @@ import (
"sync/atomic"
"time"
"github.com/golang/protobuf/proto"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
const (
// The primary user agent
primaryUA = "grpc-go/1.0"
// http2MaxFrameLen specifies the max length of a HTTP2 frame.
http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues
......@@ -93,15 +92,13 @@ var (
// Records the states during HPACK decoding. Must be reset once the
// decoding of the entire headers are finished.
type decodeState struct {
err error // first error encountered decoding
encoding string
// statusGen caches the stream status received from the trailer the server
// sent. Client side only. Do not access directly. After all trailers are
// parsed, use the status method to retrieve the status.
statusGen *status.Status
// rawStatusCode and rawStatusMsg are set from the raw trailer fields and are not
// intended for direct access outside of parsing.
rawStatusCode int32
rawStatusMsg string
// statusCode caches the stream status received from the trailer
// the server sent. Client side only.
statusCode codes.Code
statusDesc string
// Server side only fields.
timeoutSet bool
timeout time.Duration
......@@ -124,7 +121,6 @@ func isReservedHeader(hdr string) bool {
"grpc-message",
"grpc-status",
"grpc-timeout",
"grpc-status-details-bin",
"te":
return true
default:
......@@ -143,6 +139,12 @@ func isWhitelistedPseudoHeader(hdr string) bool {
}
}
func (d *decodeState) setErr(err error) {
if d.err == nil {
d.err = err
}
}
func validContentType(t string) bool {
e := "application/grpc"
if !strings.HasPrefix(t, e) {
......@@ -156,62 +158,56 @@ func validContentType(t string) bool {
return true
}
func (d *decodeState) status() *status.Status {
if d.statusGen == nil {
// No status-details were provided; generate status using code/msg.
d.statusGen = status.New(codes.Code(d.rawStatusCode), d.rawStatusMsg)
}
return d.statusGen
}
func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
func (d *decodeState) processHeaderField(f hpack.HeaderField) {
switch f.Name {
case "content-type":
if !validContentType(f.Value) {
return streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value)
d.setErr(streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value))
return
}
case "grpc-encoding":
d.encoding = f.Value
case "grpc-status":
code, err := strconv.Atoi(f.Value)
if err != nil {
return streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err)
d.setErr(streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err))
return
}
d.rawStatusCode = int32(code)
d.statusCode = codes.Code(code)
case "grpc-message":
d.rawStatusMsg = decodeGrpcMessage(f.Value)
case "grpc-status-details-bin":
_, v, err := metadata.DecodeKeyValue("grpc-status-details-bin", f.Value)
if err != nil {
return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
}
s := &spb.Status{}
if err := proto.Unmarshal([]byte(v), s); err != nil {
return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
}
d.statusGen = status.FromProto(s)
d.statusDesc = decodeGrpcMessage(f.Value)
case "grpc-timeout":
d.timeoutSet = true
var err error
if d.timeout, err = decodeTimeout(f.Value); err != nil {
return streamErrorf(codes.Internal, "transport: malformed time-out: %v", err)
d.timeout, err = decodeTimeout(f.Value)
if err != nil {
d.setErr(streamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
return
}
case ":path":
d.method = f.Value
default:
if !isReservedHeader(f.Name) || isWhitelistedPseudoHeader(f.Name) {
if f.Name == "user-agent" {
i := strings.LastIndex(f.Value, " ")
if i == -1 {
// There is no application user agent string being set.
return
}
// Extract the application user agent string.
f.Value = f.Value[:i]
}
if d.mdata == nil {
d.mdata = make(map[string][]string)
}
k, v, err := metadata.DecodeKeyValue(f.Name, f.Value)
if err != nil {
grpclog.Printf("Failed to decode (%q, %q): %v", f.Name, f.Value, err)
return nil
return
}
d.mdata[k] = append(d.mdata[k], v)
}
}
return nil
}
type timeoutUnit uint8
......@@ -383,9 +379,6 @@ func newFramer(conn net.Conn) *framer {
writer: bufio.NewWriterSize(conn, http2IOBufSize),
}
f.fr = http2.NewFramer(f.writer, f.reader)
// Opt-in to Frame reuse API on framer to reduce garbage.
// Frames aren't safe to read from after a subsequent call to ReadFrame.
f.fr.SetReuseFrames()
f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
return f
}
......
// +build !go1.6
/*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package transport
import (
"net"
"time"
"golang.org/x/net/context"
)
// dialContext connects to the address on the named network.
func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
var dialer net.Dialer
if deadline, ok := ctx.Deadline(); ok {
dialer.Timeout = deadline.Sub(time.Now())
}
return dialer.Dial(network, address)
}
......@@ -45,13 +45,10 @@ import (
"sync"
"golang.org/x/net/context"
"golang.org/x/net/http2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
)
......@@ -213,13 +210,9 @@ type Stream struct {
// true iff headerChan is closed. Used to avoid closing headerChan
// multiple times.
headerDone bool
// the status error received from the server.
status *status.Status
// rstStream indicates whether a RST_STREAM frame needs to be sent
// to the server to signify that this stream is closing.
rstStream bool
// rstError is the error that needs to be sent along with the RST_STREAM frame.
rstError http2.ErrCode
// the status received from the server.
statusCode codes.Code
statusDesc string
}
// RecvCompress returns the compression algorithm applied to the inbound
......@@ -284,9 +277,14 @@ func (s *Stream) Method() string {
return s.method
}
// Status returns the status received from the server.
func (s *Stream) Status() *status.Status {
return s.status
// StatusCode returns statusCode received from the server.
func (s *Stream) StatusCode() codes.Code {
return s.statusCode
}
// StatusDesc returns statusDesc received from the server.
func (s *Stream) StatusDesc() string {
return s.statusDesc
}
// SetHeader sets the header metadata. This can be called multiple times.
......@@ -333,20 +331,6 @@ func (s *Stream) Read(p []byte) (n int, err error) {
return
}
// finish sets the stream's state and status, and closes the done channel.
// s.mu must be held by the caller. st must always be non-nil.
func (s *Stream) finish(st *status.Status) {
s.status = st
s.state = streamDone
close(s.done)
}
// GoString is implemented by Stream so context.String() won't
// race when printing %#v.
func (s *Stream) GoString() string {
return fmt.Sprintf("<stream: %p, %v>", s, s.method)
}
// The key to save transport.Stream in the context.
type streamKey struct{}
......@@ -378,8 +362,6 @@ type ServerConfig struct {
AuthInfo credentials.AuthInfo
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
KeepaliveParams keepalive.ServerParameters
KeepalivePolicy keepalive.EnforcementPolicy
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
......@@ -392,9 +374,6 @@ func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (S
type ConnectOptions struct {
// UserAgent is the application user agent.
UserAgent string
// Authority is the :authority pseudo-header to use. This field has no effect if
// TransportCredentials is set.
Authority string
// Dialer specifies how to dial a network address.
Dialer func(context.Context, string) (net.Conn, error)
// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
......@@ -403,8 +382,6 @@ type ConnectOptions struct {
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials
// KeepaliveParams stores the keepalive parameters.
KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
}
......@@ -493,9 +470,6 @@ type ClientTransport interface {
// receives the draining signal from the server (e.g., GOAWAY frame in
// HTTP/2).
GoAway() <-chan struct{}
// GetGoAwayReason returns the reason why GoAway frame was received.
GetGoAwayReason() GoAwayReason
}
// ServerTransport is the common interface for all gRPC server-side transport
......@@ -515,9 +489,10 @@ type ServerTransport interface {
// Write may not be called on all streams.
Write(s *Stream, data []byte, opts *Options) error
// WriteStatus sends the status of a stream to the client. WriteStatus is
// the final call made on a stream and always occurs.
WriteStatus(s *Stream, st *status.Status) error
// WriteStatus sends the status of a stream to the client.
// WriteStatus is the final call made on a stream and always
// occurs.
WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error
// Close tears down the transport. Once it is called, the transport
// should not be accessed any more. All the pending streams and their
......@@ -583,8 +558,6 @@ var (
ErrStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
)
// TODO: See if we can replace StreamError with status package errors.
// StreamError is an error that only affects one stream within a connection.
type StreamError struct {
Code codes.Code
......@@ -592,7 +565,7 @@ type StreamError struct {
}
func (e StreamError) Error() string {
return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
return fmt.Sprintf("stream error: code = %d desc = %q", e.Code, e.Desc)
}
// ContextErr converts the error from context package into a StreamError.
......@@ -633,16 +606,3 @@ func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-
return i, nil
}
}
// GoAwayReason contains the reason for the GoAway frame received.
type GoAwayReason uint8
const (
// Invalid indicates that no GoAway frame is received.
Invalid GoAwayReason = 0
// NoReason is the default value when GoAway frame is received.
NoReason GoAwayReason = 1
// TooManyPings indicates that a GoAway frame with ErrCodeEnhanceYourCalm
// was recieved and that the debug data said "too_many_pings".
TooManyPings GoAwayReason = 2
)
......@@ -132,10 +132,10 @@
"revisionTime": "2017-04-04T13:20:09Z"
},
{
"checksumSHA1": "tidJMmntKTZuU196aiLojkULL+g=",
"checksumSHA1": "epHwh7hDQSYzDowPIbw8vnLzPS0=",
"path": "google.golang.org/grpc",
"revision": "6d158dbf32084eac5fc0b9ea6f1feed214290ec6",
"revisionTime": "2017-04-12T06:39:30Z"
"revision": "50955793b0183f9de69bd78e2ec251cf20aab121",
"revisionTime": "2017-01-11T19:10:52Z"
},
{
"checksumSHA1": "08icuA15HRkdYCt6H+Cs90RPQsY=",
......@@ -204,10 +204,10 @@
"revisionTime": "2017-04-12T06:39:30Z"
},
{
"checksumSHA1": "WMlN+OrgFM70j2/AoMh6DM6NtK8=",
"checksumSHA1": "yHpUeGwKoqqwd3cbEp3lkcnvft0=",
"path": "google.golang.org/grpc/transport",
"revision": "6d158dbf32084eac5fc0b9ea6f1feed214290ec6",
"revisionTime": "2017-04-12T06:39:30Z"
"revision": "50955793b0183f9de69bd78e2ec251cf20aab121",
"revisionTime": "2017-01-11T19:10:52Z"
},
{
"checksumSHA1": "fALlQNY1fM99NesfLJ50KguWsio=",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment