Commit 25d866fa authored by Jacob Vosmaer (GitLab)'s avatar Jacob Vosmaer (GitLab)

Merge branch 'bjk/grpc_vendor' into 'master'

More vendoring updates

See merge request gitlab-org/gitlab-workhorse!280
parents 2fd82d0f 273a9016
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [1.2.0](https://github.com/grpc-ecosystem/go-grpc-prometheus/releases/tag/v1.2.0) - 2018-06-04
### Added
* Provide metrics object as `prometheus.Collector`, for conventional metric registration.
* Support non-default/global Prometheus registry.
* Allow configuring counters with `prometheus.CounterOpts`.
### Changed
* Remove usage of deprecated `grpc.Code()`.
* Remove usage of deprecated `grpc.Errorf` and replace with `status.Errorf`.
---
This changelog was started with version `v1.2.0`, for earlier versions refer to the respective [GitHub releases](https://github.com/grpc-ecosystem/go-grpc-prometheus/releases).
...@@ -49,8 +49,8 @@ import "github.com/grpc-ecosystem/go-grpc-prometheus" ...@@ -49,8 +49,8 @@ import "github.com/grpc-ecosystem/go-grpc-prometheus"
... ...
clientConn, err = grpc.Dial( clientConn, err = grpc.Dial(
address, address,
grpc.WithUnaryInterceptor(UnaryClientInterceptor), grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithStreamInterceptor(StreamClientInterceptor) grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor)
) )
client = pb_testproto.NewTestServiceClient(clientConn) client = pb_testproto.NewTestServiceClient(clientConn)
resp, err := client.PingEmpty(s.ctx, &myservice.Request{Msg: "hello"}) resp, err := client.PingEmpty(s.ctx, &myservice.Request{Msg: "hello"})
...@@ -118,7 +118,7 @@ each of the 20 messages sent back, a counter will be incremented: ...@@ -118,7 +118,7 @@ each of the 20 messages sent back, a counter will be incremented:
grpc_server_msg_sent_total{grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream"} 20 grpc_server_msg_sent_total{grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream"} 20
``` ```
After the call completes, it's status (`OK` or other [gRPC status code](https://github.com/grpc/grpc-go/blob/master/codes/codes.go)) After the call completes, its status (`OK` or other [gRPC status code](https://github.com/grpc/grpc-go/blob/master/codes/codes.go))
and the relevant call labels increment the `grpc_server_handled_total` counter. and the relevant call labels increment the `grpc_server_handled_total` counter.
```jsoniq ```jsoniq
...@@ -128,8 +128,8 @@ grpc_server_handled_total{grpc_code="OK",grpc_method="PingList",grpc_service="mw ...@@ -128,8 +128,8 @@ grpc_server_handled_total{grpc_code="OK",grpc_method="PingList",grpc_service="mw
## Histograms ## Histograms
[Prometheus histograms](https://prometheus.io/docs/concepts/metric_types/#histogram) are a great way [Prometheus histograms](https://prometheus.io/docs/concepts/metric_types/#histogram) are a great way
to measure latency distributions of your RPCs. However since it is bad practice to have metrics to measure latency distributions of your RPCs. However, since it is bad practice to have metrics
of [high cardinality](https://prometheus.io/docs/practices/instrumentation/#do-not-overuse-labels)) of [high cardinality](https://prometheus.io/docs/practices/instrumentation/#do-not-overuse-labels)
the latency monitoring metrics are disabled by default. To enable them please call the following the latency monitoring metrics are disabled by default. To enable them please call the following
in your server initialization code: in your server initialization code:
...@@ -137,8 +137,8 @@ in your server initialization code: ...@@ -137,8 +137,8 @@ in your server initialization code:
grpc_prometheus.EnableHandlingTimeHistogram() grpc_prometheus.EnableHandlingTimeHistogram()
``` ```
After the call completes, it's handling time will be recorded in a [Prometheus histogram](https://prometheus.io/docs/concepts/metric_types/#histogram) After the call completes, its handling time will be recorded in a [Prometheus histogram](https://prometheus.io/docs/concepts/metric_types/#histogram)
variable `grpc_server_handling_seconds`. It contains three sub-metrics: variable `grpc_server_handling_seconds`. The histogram variable contains three sub-metrics:
* `grpc_server_handling_seconds_count` - the count of all completed RPCs by status and method * `grpc_server_handling_seconds_count` - the count of all completed RPCs by status and method
* `grpc_server_handling_seconds_sum` - cumulative time of RPCs by status and method, useful for * `grpc_server_handling_seconds_sum` - cumulative time of RPCs by status and method, useful for
...@@ -168,7 +168,7 @@ grpc_server_handling_seconds_count{grpc_code="OK",grpc_method="PingList",grpc_se ...@@ -168,7 +168,7 @@ grpc_server_handling_seconds_count{grpc_code="OK",grpc_method="PingList",grpc_se
## Useful query examples ## Useful query examples
Prometheus philosophy is to provide the most detailed metrics possible to the monitoring system, and Prometheus philosophy is to provide raw metrics to the monitoring system, and
let the aggregations be handled there. The verbosity of above metrics make it possible to have that let the aggregations be handled there. The verbosity of above metrics make it possible to have that
flexibility. Here's a couple of useful monitoring queries: flexibility. Here's a couple of useful monitoring queries:
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
) )
// ClientMetrics represents a collection of metrics to be registered on a // ClientMetrics represents a collection of metrics to be registered on a
...@@ -25,31 +26,32 @@ type ClientMetrics struct { ...@@ -25,31 +26,32 @@ type ClientMetrics struct {
// ClientMetrics when not using the default Prometheus metrics registry, for // ClientMetrics when not using the default Prometheus metrics registry, for
// example when wanting to control which metrics are added to a registry as // example when wanting to control which metrics are added to a registry as
// opposed to automatically adding metrics via init functions. // opposed to automatically adding metrics via init functions.
func NewClientMetrics() *ClientMetrics { func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics {
opts := counterOptions(counterOpts)
return &ClientMetrics{ return &ClientMetrics{
clientStartedCounter: prom.NewCounterVec( clientStartedCounter: prom.NewCounterVec(
prom.CounterOpts{ opts.apply(prom.CounterOpts{
Name: "grpc_client_started_total", Name: "grpc_client_started_total",
Help: "Total number of RPCs started on the client.", Help: "Total number of RPCs started on the client.",
}, []string{"grpc_type", "grpc_service", "grpc_method"}), }), []string{"grpc_type", "grpc_service", "grpc_method"}),
clientHandledCounter: prom.NewCounterVec( clientHandledCounter: prom.NewCounterVec(
prom.CounterOpts{ opts.apply(prom.CounterOpts{
Name: "grpc_client_handled_total", Name: "grpc_client_handled_total",
Help: "Total number of RPCs completed by the client, regardless of success or failure.", Help: "Total number of RPCs completed by the client, regardless of success or failure.",
}, []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}), }), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
clientStreamMsgReceived: prom.NewCounterVec( clientStreamMsgReceived: prom.NewCounterVec(
prom.CounterOpts{ opts.apply(prom.CounterOpts{
Name: "grpc_client_msg_received_total", Name: "grpc_client_msg_received_total",
Help: "Total number of RPC stream messages received by the client.", Help: "Total number of RPC stream messages received by the client.",
}, []string{"grpc_type", "grpc_service", "grpc_method"}), }), []string{"grpc_type", "grpc_service", "grpc_method"}),
clientStreamMsgSent: prom.NewCounterVec( clientStreamMsgSent: prom.NewCounterVec(
prom.CounterOpts{ opts.apply(prom.CounterOpts{
Name: "grpc_client_msg_sent_total", Name: "grpc_client_msg_sent_total",
Help: "Total number of gRPC stream messages sent by the client.", Help: "Total number of gRPC stream messages sent by the client.",
}, []string{"grpc_type", "grpc_service", "grpc_method"}), }), []string{"grpc_type", "grpc_service", "grpc_method"}),
clientHandledHistogramEnabled: false, clientHandledHistogramEnabled: false,
clientHandledHistogramOpts: prom.HistogramOpts{ clientHandledHistogramOpts: prom.HistogramOpts{
...@@ -111,18 +113,20 @@ func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, metho ...@@ -111,18 +113,20 @@ func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, metho
if err != nil { if err != nil {
monitor.ReceivedMessage() monitor.ReceivedMessage()
} }
monitor.Handled(grpc.Code(err)) st, _ := status.FromError(err)
monitor.Handled(st.Code())
return err return err
} }
} }
// StreamServerInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs. // StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
monitor := newClientReporter(m, clientStreamType(desc), method) monitor := newClientReporter(m, clientStreamType(desc), method)
clientStream, err := streamer(ctx, desc, cc, method, opts...) clientStream, err := streamer(ctx, desc, cc, method, opts...)
if err != nil { if err != nil {
monitor.Handled(grpc.Code(err)) st, _ := status.FromError(err)
monitor.Handled(st.Code())
return nil, err return nil, err
} }
return &monitoredClientStream{clientStream, monitor}, nil return &monitoredClientStream{clientStream, monitor}, nil
...@@ -159,7 +163,8 @@ func (s *monitoredClientStream) RecvMsg(m interface{}) error { ...@@ -159,7 +163,8 @@ func (s *monitoredClientStream) RecvMsg(m interface{}) error {
} else if err == io.EOF { } else if err == io.EOF {
s.monitor.Handled(codes.OK) s.monitor.Handled(codes.OK)
} else { } else {
s.monitor.Handled(grpc.Code(err)) st, _ := status.FromError(err)
s.monitor.Handled(st.Code())
} }
return err return err
} }
SHELL="/bin/bash"
GOFILES_NOVENDOR = $(shell go list ./... | grep -v /vendor/)
all: vet fmt test
fmt:
go fmt $(GOFILES_NOVENDOR)
vet:
go vet $(GOFILES_NOVENDOR)
test: vet
./scripts/test_all.sh
.PHONY: all vet test
package grpc_prometheus
import (
prom "github.com/prometheus/client_golang/prometheus"
)
// A CounterOption lets you add options to Counter metrics using With* funcs.
type CounterOption func(*prom.CounterOpts)
type counterOptions []CounterOption
func (co counterOptions) apply(o prom.CounterOpts) prom.CounterOpts {
for _, f := range co {
f(&o)
}
return o
}
// WithConstLabels allows you to add ConstLabels to Counter metrics.
func WithConstLabels(labels prom.Labels) CounterOption {
return func(o *prom.CounterOpts) {
o.ConstLabels = labels
}
}
// A HistogramOption lets you add options to Histogram metrics using With*
// funcs.
type HistogramOption func(*prom.HistogramOpts)
// WithHistogramBuckets allows you to specify custom bucket ranges for histograms if EnableHandlingTimeHistogram is on.
func WithHistogramBuckets(buckets []float64) HistogramOption {
return func(o *prom.HistogramOpts) { o.Buckets = buckets }
}
// WithHistogramConstLabels allows you to add custom ConstLabels to
// histograms metrics.
func WithHistogramConstLabels(labels prom.Labels) HistogramOption {
return func(o *prom.HistogramOpts) {
o.ConstLabels = labels
}
}
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
prom "github.com/prometheus/client_golang/prometheus" prom "github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/status"
) )
// ServerMetrics represents a collection of metrics to be registered on a // ServerMetrics represents a collection of metrics to be registered on a
...@@ -22,28 +23,29 @@ type ServerMetrics struct { ...@@ -22,28 +23,29 @@ type ServerMetrics struct {
// ServerMetrics when not using the default Prometheus metrics registry, for // ServerMetrics when not using the default Prometheus metrics registry, for
// example when wanting to control which metrics are added to a registry as // example when wanting to control which metrics are added to a registry as
// opposed to automatically adding metrics via init functions. // opposed to automatically adding metrics via init functions.
func NewServerMetrics() *ServerMetrics { func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics {
opts := counterOptions(counterOpts)
return &ServerMetrics{ return &ServerMetrics{
serverStartedCounter: prom.NewCounterVec( serverStartedCounter: prom.NewCounterVec(
prom.CounterOpts{ opts.apply(prom.CounterOpts{
Name: "grpc_server_started_total", Name: "grpc_server_started_total",
Help: "Total number of RPCs started on the server.", Help: "Total number of RPCs started on the server.",
}, []string{"grpc_type", "grpc_service", "grpc_method"}), }), []string{"grpc_type", "grpc_service", "grpc_method"}),
serverHandledCounter: prom.NewCounterVec( serverHandledCounter: prom.NewCounterVec(
prom.CounterOpts{ opts.apply(prom.CounterOpts{
Name: "grpc_server_handled_total", Name: "grpc_server_handled_total",
Help: "Total number of RPCs completed on the server, regardless of success or failure.", Help: "Total number of RPCs completed on the server, regardless of success or failure.",
}, []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}), }), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
serverStreamMsgReceived: prom.NewCounterVec( serverStreamMsgReceived: prom.NewCounterVec(
prom.CounterOpts{ opts.apply(prom.CounterOpts{
Name: "grpc_server_msg_received_total", Name: "grpc_server_msg_received_total",
Help: "Total number of RPC stream messages received on the server.", Help: "Total number of RPC stream messages received on the server.",
}, []string{"grpc_type", "grpc_service", "grpc_method"}), }), []string{"grpc_type", "grpc_service", "grpc_method"}),
serverStreamMsgSent: prom.NewCounterVec( serverStreamMsgSent: prom.NewCounterVec(
prom.CounterOpts{ opts.apply(prom.CounterOpts{
Name: "grpc_server_msg_sent_total", Name: "grpc_server_msg_sent_total",
Help: "Total number of gRPC stream messages sent by the server.", Help: "Total number of gRPC stream messages sent by the server.",
}, []string{"grpc_type", "grpc_service", "grpc_method"}), }), []string{"grpc_type", "grpc_service", "grpc_method"}),
serverHandledHistogramEnabled: false, serverHandledHistogramEnabled: false,
serverHandledHistogramOpts: prom.HistogramOpts{ serverHandledHistogramOpts: prom.HistogramOpts{
Name: "grpc_server_handling_seconds", Name: "grpc_server_handling_seconds",
...@@ -54,13 +56,6 @@ func NewServerMetrics() *ServerMetrics { ...@@ -54,13 +56,6 @@ func NewServerMetrics() *ServerMetrics {
} }
} }
type HistogramOption func(*prom.HistogramOpts)
// WithHistogramBuckets allows you to specify custom bucket ranges for histograms if EnableHandlingTimeHistogram is on.
func WithHistogramBuckets(buckets []float64) HistogramOption {
return func(o *prom.HistogramOpts) { o.Buckets = buckets }
}
// EnableHandlingTimeHistogram enables histograms being registered when // EnableHandlingTimeHistogram enables histograms being registered when
// registering the ServerMetrics on a Prometheus registry. Histograms can be // registering the ServerMetrics on a Prometheus registry. Histograms can be
// expensive on Prometheus servers. It takes options to configure histogram // expensive on Prometheus servers. It takes options to configure histogram
...@@ -110,7 +105,8 @@ func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req i ...@@ -110,7 +105,8 @@ func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req i
monitor := newServerReporter(m, Unary, info.FullMethod) monitor := newServerReporter(m, Unary, info.FullMethod)
monitor.ReceivedMessage() monitor.ReceivedMessage()
resp, err := handler(ctx, req) resp, err := handler(ctx, req)
monitor.Handled(grpc.Code(err)) st, _ := status.FromError(err)
monitor.Handled(st.Code())
if err == nil { if err == nil {
monitor.SentMessage() monitor.SentMessage()
} }
...@@ -121,9 +117,10 @@ func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req i ...@@ -121,9 +117,10 @@ func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req i
// StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs. // StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
monitor := newServerReporter(m, streamRpcType(info), info.FullMethod) monitor := newServerReporter(m, streamRPCType(info), info.FullMethod)
err := handler(srv, &monitoredServerStream{ss, monitor}) err := handler(srv, &monitoredServerStream{ss, monitor})
monitor.Handled(grpc.Code(err)) st, _ := status.FromError(err)
monitor.Handled(st.Code())
return err return err
} }
} }
...@@ -140,27 +137,7 @@ func (m *ServerMetrics) InitializeMetrics(server *grpc.Server) { ...@@ -140,27 +137,7 @@ func (m *ServerMetrics) InitializeMetrics(server *grpc.Server) {
} }
} }
// Register registers all server metrics in a given metrics registry. Depending func streamRPCType(info *grpc.StreamServerInfo) grpcType {
// on histogram options and whether they are enabled, histogram metrics are
// also registered.
//
// Deprecated: ServerMetrics implements Prometheus Collector interface. You can
// register an instance of ServerMetrics directly by using
// prometheus.Register(m).
func (m *ServerMetrics) Register(r prom.Registerer) error {
return r.Register(m)
}
// MustRegister tries to register all server metrics and panics on an error.
//
// Deprecated: ServerMetrics implements Prometheus Collector interface. You can
// register an instance of ServerMetrics directly by using
// prometheus.MustRegister(m).
func (m *ServerMetrics) MustRegister(r prom.Registerer) {
r.MustRegister(m)
}
func streamRpcType(info *grpc.StreamServerInfo) grpcType {
if info.IsClientStream && !info.IsServerStream { if info.IsClientStream && !info.IsServerStream {
return ClientStream return ClientStream
} else if !info.IsClientStream && info.IsServerStream { } else if !info.IsClientStream && info.IsServerStream {
......
#!/usr/bin/env bash
set -e
echo "" > coverage.txt
for d in $(go list ./... | grep -v vendor); do
echo -e "TESTS FOR: for \033[0;35m${d}\033[0m"
go test -race -v -coverprofile=profile.coverage.out -covermode=atomic $d
if [ -f profile.coverage.out ]; then
cat profile.coverage.out >> coverage.txt
rm profile.coverage.out
fi
echo ""
done
...@@ -37,13 +37,13 @@ func splitMethodName(fullMethodName string) (string, string) { ...@@ -37,13 +37,13 @@ func splitMethodName(fullMethodName string) (string, string) {
} }
func typeFromMethodInfo(mInfo *grpc.MethodInfo) grpcType { func typeFromMethodInfo(mInfo *grpc.MethodInfo) grpcType {
if mInfo.IsClientStream == false && mInfo.IsServerStream == false { if !mInfo.IsClientStream && !mInfo.IsServerStream {
return Unary return Unary
} }
if mInfo.IsClientStream == true && mInfo.IsServerStream == false { if mInfo.IsClientStream && !mInfo.IsServerStream {
return ClientStream return ClientStream
} }
if mInfo.IsClientStream == false && mInfo.IsServerStream == true { if !mInfo.IsClientStream && mInfo.IsServerStream {
return ServerStream return ServerStream
} }
return BidiStream return BidiStream
......
# 1.0.5
* Fix hooks race (#707)
* Fix panic deadlock (#695)
# 1.0.4 # 1.0.4
* Fix race when adding hooks (#612) * Fix race when adding hooks (#612)
......
...@@ -220,7 +220,7 @@ Logrus comes with [built-in hooks](hooks/). Add those, or your custom hook, in ...@@ -220,7 +220,7 @@ Logrus comes with [built-in hooks](hooks/). Add those, or your custom hook, in
```go ```go
import ( import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"gopkg.in/gemnasium/logrus-airbrake-hook.v2" // the package is named "aibrake" "gopkg.in/gemnasium/logrus-airbrake-hook.v2" // the package is named "airbrake"
logrus_syslog "github.com/sirupsen/logrus/hooks/syslog" logrus_syslog "github.com/sirupsen/logrus/hooks/syslog"
"log/syslog" "log/syslog"
) )
...@@ -241,58 +241,8 @@ func init() { ...@@ -241,58 +241,8 @@ func init() {
``` ```
Note: Syslog hook also support connecting to local syslog (Ex. "/dev/log" or "/var/run/syslog" or "/var/run/log"). For the detail, please check the [syslog hook README](hooks/syslog/README.md). Note: Syslog hook also support connecting to local syslog (Ex. "/dev/log" or "/var/run/syslog" or "/var/run/log"). For the detail, please check the [syslog hook README](hooks/syslog/README.md).
| Hook | Description | A list of currently known of service hook can be found in this wiki [page](https://github.com/sirupsen/logrus/wiki/Hooks)
| ----- | ----------- |
| [Airbrake "legacy"](https://github.com/gemnasium/logrus-airbrake-legacy-hook) | Send errors to an exception tracking service compatible with the Airbrake API V2. Uses [`airbrake-go`](https://github.com/tobi/airbrake-go) behind the scenes. |
| [Airbrake](https://github.com/gemnasium/logrus-airbrake-hook) | Send errors to the Airbrake API V3. Uses the official [`gobrake`](https://github.com/airbrake/gobrake) behind the scenes. |
| [Amazon Kinesis](https://github.com/evalphobia/logrus_kinesis) | Hook for logging to [Amazon Kinesis](https://aws.amazon.com/kinesis/) |
| [Amqp-Hook](https://github.com/vladoatanasov/logrus_amqp) | Hook for logging to Amqp broker (Like RabbitMQ) |
| [AzureTableHook](https://github.com/kpfaulkner/azuretablehook/) | Hook for logging to Azure Table Storage|
| [Bugsnag](https://github.com/Shopify/logrus-bugsnag/blob/master/bugsnag.go) | Send errors to the Bugsnag exception tracking service. |
| [DeferPanic](https://github.com/deferpanic/dp-logrus) | Hook for logging to DeferPanic |
| [Discordrus](https://github.com/kz/discordrus) | Hook for logging to [Discord](https://discordapp.com/) |
| [ElasticSearch](https://github.com/sohlich/elogrus) | Hook for logging to ElasticSearch|
| [Firehose](https://github.com/beaubrewer/logrus_firehose) | Hook for logging to [Amazon Firehose](https://aws.amazon.com/kinesis/firehose/)
| [Fluentd](https://github.com/evalphobia/logrus_fluent) | Hook for logging to fluentd |
| [Go-Slack](https://github.com/multiplay/go-slack) | Hook for logging to [Slack](https://slack.com) |
| [Graylog](https://github.com/gemnasium/logrus-graylog-hook) | Hook for logging to [Graylog](http://graylog2.org/) |
| [Hiprus](https://github.com/nubo/hiprus) | Send errors to a channel in hipchat. |
| [Honeybadger](https://github.com/agonzalezro/logrus_honeybadger) | Hook for sending exceptions to Honeybadger |
| [InfluxDB](https://github.com/Abramovic/logrus_influxdb) | Hook for logging to influxdb |
| [Influxus](http://github.com/vlad-doru/influxus) | Hook for concurrently logging to [InfluxDB](http://influxdata.com/) |
| [Journalhook](https://github.com/wercker/journalhook) | Hook for logging to `systemd-journald` |
| [KafkaLogrus](https://github.com/tracer0tong/kafkalogrus) | Hook for logging to Kafka |
| [LFShook](https://github.com/rifflock/lfshook) | Hook for logging to the local filesystem |
| [Logbeat](https://github.com/macandmia/logbeat) | Hook for logging to [Opbeat](https://opbeat.com/) |
| [Logentries](https://github.com/jcftang/logentriesrus) | Hook for logging to [Logentries](https://logentries.com/) |
| [Logentrus](https://github.com/puddingfactory/logentrus) | Hook for logging to [Logentries](https://logentries.com/) |
| [Logmatic.io](https://github.com/logmatic/logmatic-go) | Hook for logging to [Logmatic.io](http://logmatic.io/) |
| [Logrusly](https://github.com/sebest/logrusly) | Send logs to [Loggly](https://www.loggly.com/) |
| [Logstash](https://github.com/bshuster-repo/logrus-logstash-hook) | Hook for logging to [Logstash](https://www.elastic.co/products/logstash) |
| [Mail](https://github.com/zbindenren/logrus_mail) | Hook for sending exceptions via mail |
| [Mattermost](https://github.com/shuLhan/mattermost-integration/tree/master/hooks/logrus) | Hook for logging to [Mattermost](https://mattermost.com/) |
| [Mongodb](https://github.com/weekface/mgorus) | Hook for logging to mongodb |
| [NATS-Hook](https://github.com/rybit/nats_logrus_hook) | Hook for logging to [NATS](https://nats.io) |
| [Octokit](https://github.com/dorajistyle/logrus-octokit-hook) | Hook for logging to github via octokit |
| [Papertrail](https://github.com/polds/logrus-papertrail-hook) | Send errors to the [Papertrail](https://papertrailapp.com) hosted logging service via UDP. |
| [PostgreSQL](https://github.com/gemnasium/logrus-postgresql-hook) | Send logs to [PostgreSQL](http://postgresql.org) |
| [Promrus](https://github.com/weaveworks/promrus) | Expose number of log messages as [Prometheus](https://prometheus.io/) metrics |
| [Pushover](https://github.com/toorop/logrus_pushover) | Send error via [Pushover](https://pushover.net) |
| [Raygun](https://github.com/squirkle/logrus-raygun-hook) | Hook for logging to [Raygun.io](http://raygun.io/) |
| [Redis-Hook](https://github.com/rogierlommers/logrus-redis-hook) | Hook for logging to a ELK stack (through Redis) |
| [Rollrus](https://github.com/heroku/rollrus) | Hook for sending errors to rollbar |
| [Scribe](https://github.com/sagar8192/logrus-scribe-hook) | Hook for logging to [Scribe](https://github.com/facebookarchive/scribe)|
| [Sentry](https://github.com/evalphobia/logrus_sentry) | Send errors to the Sentry error logging and aggregation service. |
| [Slackrus](https://github.com/johntdyer/slackrus) | Hook for Slack chat. |
| [Stackdriver](https://github.com/knq/sdhook) | Hook for logging to [Google Stackdriver](https://cloud.google.com/logging/) |
| [Sumorus](https://github.com/doublefree/sumorus) | Hook for logging to [SumoLogic](https://www.sumologic.com/)|
| [Syslog](https://github.com/sirupsen/logrus/blob/master/hooks/syslog/syslog.go) | Send errors to remote syslog server. Uses standard library `log/syslog` behind the scenes. |
| [Syslog TLS](https://github.com/shinji62/logrus-syslog-ng) | Send errors to remote syslog server with TLS support. |
| [Telegram](https://github.com/rossmcdonald/telegram_hook) | Hook for logging errors to [Telegram](https://telegram.org/) |
| [TraceView](https://github.com/evalphobia/logrus_appneta) | Hook for logging to [AppNeta TraceView](https://www.appneta.com/products/traceview/) |
| [Typetalk](https://github.com/dragon3/logrus-typetalk-hook) | Hook for logging to [Typetalk](https://www.typetalk.in/) |
| [logz.io](https://github.com/ripcurld00d/logrus-logzio-hook) | Hook for logging to [logz.io](https://logz.io), a Log as a Service using Logstash |
| [SQS-Hook](https://github.com/tsarpaul/logrus_sqs) | Hook for logging to [Amazon Simple Queue Service (SQS)](https://aws.amazon.com/sqs/) |
#### Level logging #### Level logging
...@@ -370,6 +320,8 @@ The built-in logging formatters are: ...@@ -370,6 +320,8 @@ The built-in logging formatters are:
field to `true`. To force no colored output even if there is a TTY set the field to `true`. To force no colored output even if there is a TTY set the
`DisableColors` field to `true`. For Windows, see `DisableColors` field to `true`. For Windows, see
[github.com/mattn/go-colorable](https://github.com/mattn/go-colorable). [github.com/mattn/go-colorable](https://github.com/mattn/go-colorable).
* When colors are enabled, levels are truncated to 4 characters by default. To disable
truncation set the `DisableLevelTruncation` field to `true`.
* All options are listed in the [generated docs](https://godoc.org/github.com/sirupsen/logrus#TextFormatter). * All options are listed in the [generated docs](https://godoc.org/github.com/sirupsen/logrus#TextFormatter).
* `logrus.JSONFormatter`. Logs fields as JSON. * `logrus.JSONFormatter`. Logs fields as JSON.
* All options are listed in the [generated docs](https://godoc.org/github.com/sirupsen/logrus#JSONFormatter). * All options are listed in the [generated docs](https://godoc.org/github.com/sirupsen/logrus#JSONFormatter).
...@@ -493,7 +445,7 @@ logrus.RegisterExitHandler(handler) ...@@ -493,7 +445,7 @@ logrus.RegisterExitHandler(handler)
#### Thread safety #### Thread safety
By default Logger is protected by mutex for concurrent writes, this mutex is invoked when calling hooks and writing logs. By default, Logger is protected by a mutex for concurrent writes. The mutex is held when calling hooks and writing logs.
If you are sure such locking is not needed, you can call logger.SetNoLock() to disable the locking. If you are sure such locking is not needed, you can call logger.SetNoLock() to disable the locking.
Situation when locking is not needed includes: Situation when locking is not needed includes:
......
...@@ -48,7 +48,7 @@ type Entry struct { ...@@ -48,7 +48,7 @@ type Entry struct {
func NewEntry(logger *Logger) *Entry { func NewEntry(logger *Logger) *Entry {
return &Entry{ return &Entry{
Logger: logger, Logger: logger,
// Default is three fields, give a little extra room // Default is five fields, give a little extra room
Data: make(Fields, 5), Data: make(Fields, 5),
} }
} }
...@@ -83,49 +83,70 @@ func (entry *Entry) WithFields(fields Fields) *Entry { ...@@ -83,49 +83,70 @@ func (entry *Entry) WithFields(fields Fields) *Entry {
for k, v := range fields { for k, v := range fields {
data[k] = v data[k] = v
} }
return &Entry{Logger: entry.Logger, Data: data} return &Entry{Logger: entry.Logger, Data: data, Time: entry.Time}
}
// Overrides the time of the Entry.
func (entry *Entry) WithTime(t time.Time) *Entry {
return &Entry{Logger: entry.Logger, Data: entry.Data, Time: t}
} }
// This function is not declared with a pointer value because otherwise // This function is not declared with a pointer value because otherwise
// race conditions will occur when using multiple goroutines // race conditions will occur when using multiple goroutines
func (entry Entry) log(level Level, msg string) { func (entry Entry) log(level Level, msg string) {
var buffer *bytes.Buffer var buffer *bytes.Buffer
// Default to now, but allow users to override if they want.
//
// We don't have to worry about polluting future calls to Entry#log()
// with this assignment because this function is declared with a
// non-pointer receiver.
if entry.Time.IsZero() {
entry.Time = time.Now() entry.Time = time.Now()
}
entry.Level = level entry.Level = level
entry.Message = msg entry.Message = msg
entry.Logger.mu.Lock() entry.fireHooks()
err := entry.Logger.Hooks.Fire(level, &entry)
entry.Logger.mu.Unlock()
if err != nil {
entry.Logger.mu.Lock()
fmt.Fprintf(os.Stderr, "Failed to fire hook: %v\n", err)
entry.Logger.mu.Unlock()
}
buffer = bufferPool.Get().(*bytes.Buffer) buffer = bufferPool.Get().(*bytes.Buffer)
buffer.Reset() buffer.Reset()
defer bufferPool.Put(buffer) defer bufferPool.Put(buffer)
entry.Buffer = buffer entry.Buffer = buffer
serialized, err := entry.Logger.Formatter.Format(&entry)
entry.write()
entry.Buffer = nil entry.Buffer = nil
// To avoid Entry#log() returning a value that only would make sense for
// panic() to use in Entry#Panic(), we avoid the allocation by checking
// directly here.
if level <= PanicLevel {
panic(&entry)
}
}
func (entry *Entry) fireHooks() {
entry.Logger.mu.Lock()
defer entry.Logger.mu.Unlock()
err := entry.Logger.Hooks.Fire(entry.Level, entry)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "Failed to fire hook: %v\n", err)
}
}
func (entry *Entry) write() {
serialized, err := entry.Logger.Formatter.Format(entry)
entry.Logger.mu.Lock() entry.Logger.mu.Lock()
defer entry.Logger.mu.Unlock()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to obtain reader, %v\n", err) fmt.Fprintf(os.Stderr, "Failed to obtain reader, %v\n", err)
entry.Logger.mu.Unlock()
} else { } else {
entry.Logger.mu.Lock()
_, err = entry.Logger.Out.Write(serialized) _, err = entry.Logger.Out.Write(serialized)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "Failed to write to log, %v\n", err) fmt.Fprintf(os.Stderr, "Failed to write to log, %v\n", err)
} }
entry.Logger.mu.Unlock()
}
// To avoid Entry#log() returning a value that only would make sense for
// panic() to use in Entry#Panic(), we avoid the allocation by checking
// directly here.
if level <= PanicLevel {
panic(&entry)
} }
} }
......
...@@ -2,6 +2,7 @@ package logrus ...@@ -2,6 +2,7 @@ package logrus
import ( import (
"io" "io"
"time"
) )
var ( var (
...@@ -15,9 +16,7 @@ func StandardLogger() *Logger { ...@@ -15,9 +16,7 @@ func StandardLogger() *Logger {
// SetOutput sets the standard logger output. // SetOutput sets the standard logger output.
func SetOutput(out io.Writer) { func SetOutput(out io.Writer) {
std.mu.Lock() std.SetOutput(out)
defer std.mu.Unlock()
std.Out = out
} }
// SetFormatter sets the standard logger formatter. // SetFormatter sets the standard logger formatter.
...@@ -72,6 +71,15 @@ func WithFields(fields Fields) *Entry { ...@@ -72,6 +71,15 @@ func WithFields(fields Fields) *Entry {
return std.WithFields(fields) return std.WithFields(fields)
} }
// WithTime creats an entry from the standard logger and overrides the time of
// logs generated with it.
//
// Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal
// or Panic on the Entry it returns.
func WithTime(t time.Time) *Entry {
return std.WithTime(t)
}
// Debug logs a message at level Debug on the standard logger. // Debug logs a message at level Debug on the standard logger.
func Debug(args ...interface{}) { func Debug(args ...interface{}) {
std.Debug(args...) std.Debug(args...)
...@@ -107,7 +115,7 @@ func Panic(args ...interface{}) { ...@@ -107,7 +115,7 @@ func Panic(args ...interface{}) {
std.Panic(args...) std.Panic(args...)
} }
// Fatal logs a message at level Fatal on the standard logger. // Fatal logs a message at level Fatal on the standard logger then the process will exit with status set to 1.
func Fatal(args ...interface{}) { func Fatal(args ...interface{}) {
std.Fatal(args...) std.Fatal(args...)
} }
...@@ -147,7 +155,7 @@ func Panicf(format string, args ...interface{}) { ...@@ -147,7 +155,7 @@ func Panicf(format string, args ...interface{}) {
std.Panicf(format, args...) std.Panicf(format, args...)
} }
// Fatalf logs a message at level Fatal on the standard logger. // Fatalf logs a message at level Fatal on the standard logger then the process will exit with status set to 1.
func Fatalf(format string, args ...interface{}) { func Fatalf(format string, args ...interface{}) {
std.Fatalf(format, args...) std.Fatalf(format, args...)
} }
...@@ -187,7 +195,7 @@ func Panicln(args ...interface{}) { ...@@ -187,7 +195,7 @@ func Panicln(args ...interface{}) {
std.Panicln(args...) std.Panicln(args...)
} }
// Fatalln logs a message at level Fatal on the standard logger. // Fatalln logs a message at level Fatal on the standard logger then the process will exit with status set to 1.
func Fatalln(args ...interface{}) { func Fatalln(args ...interface{}) {
std.Fatalln(args...) std.Fatalln(args...)
} }
...@@ -30,16 +30,22 @@ type Formatter interface { ...@@ -30,16 +30,22 @@ type Formatter interface {
// //
// It's not exported because it's still using Data in an opinionated way. It's to // It's not exported because it's still using Data in an opinionated way. It's to
// avoid code duplication between the two default formatters. // avoid code duplication between the two default formatters.
func prefixFieldClashes(data Fields) { func prefixFieldClashes(data Fields, fieldMap FieldMap) {
if t, ok := data["time"]; ok { timeKey := fieldMap.resolve(FieldKeyTime)
data["fields.time"] = t if t, ok := data[timeKey]; ok {
data["fields."+timeKey] = t
delete(data, timeKey)
} }
if m, ok := data["msg"]; ok { msgKey := fieldMap.resolve(FieldKeyMsg)
data["fields.msg"] = m if m, ok := data[msgKey]; ok {
data["fields."+msgKey] = m
delete(data, msgKey)
} }
if l, ok := data["level"]; ok { levelKey := fieldMap.resolve(FieldKeyLevel)
data["fields.level"] = l if l, ok := data[levelKey]; ok {
data["fields."+levelKey] = l
delete(data, levelKey)
} }
} }
...@@ -33,6 +33,9 @@ type JSONFormatter struct { ...@@ -33,6 +33,9 @@ type JSONFormatter struct {
// DisableTimestamp allows disabling automatic timestamps in output // DisableTimestamp allows disabling automatic timestamps in output
DisableTimestamp bool DisableTimestamp bool
// DataKey allows users to put all the log entry parameters into a nested dictionary at a given key.
DataKey string
// FieldMap allows users to customize the names of keys for default fields. // FieldMap allows users to customize the names of keys for default fields.
// As an example: // As an example:
// formatter := &JSONFormatter{ // formatter := &JSONFormatter{
...@@ -58,7 +61,14 @@ func (f *JSONFormatter) Format(entry *Entry) ([]byte, error) { ...@@ -58,7 +61,14 @@ func (f *JSONFormatter) Format(entry *Entry) ([]byte, error) {
data[k] = v data[k] = v
} }
} }
prefixFieldClashes(data)
if f.DataKey != "" {
newData := make(Fields, 4)
newData[f.DataKey] = data
data = newData
}
prefixFieldClashes(data, f.FieldMap)
timestampFormat := f.TimestampFormat timestampFormat := f.TimestampFormat
if timestampFormat == "" { if timestampFormat == "" {
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"os" "os"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
) )
type Logger struct { type Logger struct {
...@@ -88,7 +89,7 @@ func (logger *Logger) releaseEntry(entry *Entry) { ...@@ -88,7 +89,7 @@ func (logger *Logger) releaseEntry(entry *Entry) {
} }
// Adds a field to the log entry, note that it doesn't log until you call // Adds a field to the log entry, note that it doesn't log until you call
// Debug, Print, Info, Warn, Fatal or Panic. It only creates a log entry. // Debug, Print, Info, Warn, Error, Fatal or Panic. It only creates a log entry.
// If you want multiple fields, use `WithFields`. // If you want multiple fields, use `WithFields`.
func (logger *Logger) WithField(key string, value interface{}) *Entry { func (logger *Logger) WithField(key string, value interface{}) *Entry {
entry := logger.newEntry() entry := logger.newEntry()
...@@ -112,6 +113,13 @@ func (logger *Logger) WithError(err error) *Entry { ...@@ -112,6 +113,13 @@ func (logger *Logger) WithError(err error) *Entry {
return entry.WithError(err) return entry.WithError(err)
} }
// Overrides the time of the log entry.
func (logger *Logger) WithTime(t time.Time) *Entry {
entry := logger.newEntry()
defer logger.releaseEntry(entry)
return entry.WithTime(t)
}
func (logger *Logger) Debugf(format string, args ...interface{}) { func (logger *Logger) Debugf(format string, args ...interface{}) {
if logger.level() >= DebugLevel { if logger.level() >= DebugLevel {
entry := logger.newEntry() entry := logger.newEntry()
...@@ -316,6 +324,12 @@ func (logger *Logger) SetLevel(level Level) { ...@@ -316,6 +324,12 @@ func (logger *Logger) SetLevel(level Level) {
atomic.StoreUint32((*uint32)(&logger.Level), uint32(level)) atomic.StoreUint32((*uint32)(&logger.Level), uint32(level))
} }
func (logger *Logger) SetOutput(out io.Writer) {
logger.mu.Lock()
defer logger.mu.Unlock()
logger.Out = out
}
func (logger *Logger) AddHook(hook Hook) { func (logger *Logger) AddHook(hook Hook) {
logger.mu.Lock() logger.mu.Lock()
defer logger.mu.Unlock() defer logger.mu.Unlock()
......
// +build darwin freebsd openbsd netbsd dragonfly // +build darwin freebsd openbsd netbsd dragonfly
// +build !appengine // +build !appengine,!gopherjs
package logrus package logrus
......
// +build appengine // +build appengine gopherjs
package logrus package logrus
......
// +build !appengine // +build !appengine,!gopherjs
package logrus package logrus
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
// Use of this source code is governed by a BSD-style // Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
// +build !appengine // +build !appengine,!gopherjs
package logrus package logrus
......
...@@ -20,6 +20,7 @@ const ( ...@@ -20,6 +20,7 @@ const (
var ( var (
baseTimestamp time.Time baseTimestamp time.Time
emptyFieldMap FieldMap
) )
func init() { func init() {
...@@ -50,12 +51,24 @@ type TextFormatter struct { ...@@ -50,12 +51,24 @@ type TextFormatter struct {
// be desired. // be desired.
DisableSorting bool DisableSorting bool
// Disables the truncation of the level text to 4 characters.
DisableLevelTruncation bool
// QuoteEmptyFields will wrap empty fields in quotes if true // QuoteEmptyFields will wrap empty fields in quotes if true
QuoteEmptyFields bool QuoteEmptyFields bool
// Whether the logger's out is to a terminal // Whether the logger's out is to a terminal
isTerminal bool isTerminal bool
// FieldMap allows users to customize the names of keys for default fields.
// As an example:
// formatter := &TextFormatter{
// FieldMap: FieldMap{
// FieldKeyTime: "@timestamp",
// FieldKeyLevel: "@level",
// FieldKeyMsg: "@message"}}
FieldMap FieldMap
sync.Once sync.Once
} }
...@@ -67,7 +80,8 @@ func (f *TextFormatter) init(entry *Entry) { ...@@ -67,7 +80,8 @@ func (f *TextFormatter) init(entry *Entry) {
// Format renders a single log entry // Format renders a single log entry
func (f *TextFormatter) Format(entry *Entry) ([]byte, error) { func (f *TextFormatter) Format(entry *Entry) ([]byte, error) {
var b *bytes.Buffer prefixFieldClashes(entry.Data, f.FieldMap)
keys := make([]string, 0, len(entry.Data)) keys := make([]string, 0, len(entry.Data))
for k := range entry.Data { for k := range entry.Data {
keys = append(keys, k) keys = append(keys, k)
...@@ -76,14 +90,14 @@ func (f *TextFormatter) Format(entry *Entry) ([]byte, error) { ...@@ -76,14 +90,14 @@ func (f *TextFormatter) Format(entry *Entry) ([]byte, error) {
if !f.DisableSorting { if !f.DisableSorting {
sort.Strings(keys) sort.Strings(keys)
} }
var b *bytes.Buffer
if entry.Buffer != nil { if entry.Buffer != nil {
b = entry.Buffer b = entry.Buffer
} else { } else {
b = &bytes.Buffer{} b = &bytes.Buffer{}
} }
prefixFieldClashes(entry.Data)
f.Do(func() { f.init(entry) }) f.Do(func() { f.init(entry) })
isColored := (f.ForceColors || f.isTerminal) && !f.DisableColors isColored := (f.ForceColors || f.isTerminal) && !f.DisableColors
...@@ -96,11 +110,11 @@ func (f *TextFormatter) Format(entry *Entry) ([]byte, error) { ...@@ -96,11 +110,11 @@ func (f *TextFormatter) Format(entry *Entry) ([]byte, error) {
f.printColored(b, entry, keys, timestampFormat) f.printColored(b, entry, keys, timestampFormat)
} else { } else {
if !f.DisableTimestamp { if !f.DisableTimestamp {
f.appendKeyValue(b, "time", entry.Time.Format(timestampFormat)) f.appendKeyValue(b, f.FieldMap.resolve(FieldKeyTime), entry.Time.Format(timestampFormat))
} }
f.appendKeyValue(b, "level", entry.Level.String()) f.appendKeyValue(b, f.FieldMap.resolve(FieldKeyLevel), entry.Level.String())
if entry.Message != "" { if entry.Message != "" {
f.appendKeyValue(b, "msg", entry.Message) f.appendKeyValue(b, f.FieldMap.resolve(FieldKeyMsg), entry.Message)
} }
for _, key := range keys { for _, key := range keys {
f.appendKeyValue(b, key, entry.Data[key]) f.appendKeyValue(b, key, entry.Data[key])
...@@ -124,7 +138,10 @@ func (f *TextFormatter) printColored(b *bytes.Buffer, entry *Entry, keys []strin ...@@ -124,7 +138,10 @@ func (f *TextFormatter) printColored(b *bytes.Buffer, entry *Entry, keys []strin
levelColor = blue levelColor = blue
} }
levelText := strings.ToUpper(entry.Level.String())[0:4] levelText := strings.ToUpper(entry.Level.String())
if !f.DisableLevelTruncation {
levelText = levelText[0:4]
}
if f.DisableTimestamp { if f.DisableTimestamp {
fmt.Fprintf(b, "\x1b[%dm%s\x1b[0m %-44s ", levelColor, levelText, entry.Message) fmt.Fprintf(b, "\x1b[%dm%s\x1b[0m %-44s ", levelColor, levelText, entry.Message)
......
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.7
// Package ctxhttp provides helper functions for performing context-aware HTTP requests.
package ctxhttp // import "golang.org/x/net/context/ctxhttp"
import (
"io"
"net/http"
"net/url"
"strings"
"golang.org/x/net/context"
)
// Do sends an HTTP request with the provided http.Client and returns
// an HTTP response.
//
// If the client is nil, http.DefaultClient is used.
//
// The provided ctx must be non-nil. If it is canceled or times out,
// ctx.Err() will be returned.
func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
if client == nil {
client = http.DefaultClient
}
resp, err := client.Do(req.WithContext(ctx))
// If we got an error, and the context has been canceled,
// the context's error is probably more useful.
if err != nil {
select {
case <-ctx.Done():
err = ctx.Err()
default:
}
}
return resp, err
}
// Get issues a GET request via the Do function.
func Get(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Head issues a HEAD request via the Do function.
func Head(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Post issues a POST request via the Do function.
func Post(ctx context.Context, client *http.Client, url string, bodyType string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest("POST", url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", bodyType)
return Do(ctx, client, req)
}
// PostForm issues a POST request via the Do function.
func PostForm(ctx context.Context, client *http.Client, url string, data url.Values) (*http.Response, error) {
return Post(ctx, client, url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
}
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.7
package ctxhttp // import "golang.org/x/net/context/ctxhttp"
import (
"io"
"net/http"
"net/url"
"strings"
"golang.org/x/net/context"
)
func nop() {}
var (
testHookContextDoneBeforeHeaders = nop
testHookDoReturned = nop
testHookDidBodyClose = nop
)
// Do sends an HTTP request with the provided http.Client and returns an HTTP response.
// If the client is nil, http.DefaultClient is used.
// If the context is canceled or times out, ctx.Err() will be returned.
func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
if client == nil {
client = http.DefaultClient
}
// TODO(djd): Respect any existing value of req.Cancel.
cancel := make(chan struct{})
req.Cancel = cancel
type responseAndError struct {
resp *http.Response
err error
}
result := make(chan responseAndError, 1)
// Make local copies of test hooks closed over by goroutines below.
// Prevents data races in tests.
testHookDoReturned := testHookDoReturned
testHookDidBodyClose := testHookDidBodyClose
go func() {
resp, err := client.Do(req)
testHookDoReturned()
result <- responseAndError{resp, err}
}()
var resp *http.Response
select {
case <-ctx.Done():
testHookContextDoneBeforeHeaders()
close(cancel)
// Clean up after the goroutine calling client.Do:
go func() {
if r := <-result; r.resp != nil {
testHookDidBodyClose()
r.resp.Body.Close()
}
}()
return nil, ctx.Err()
case r := <-result:
var err error
resp, err = r.resp, r.err
if err != nil {
return resp, err
}
}
c := make(chan struct{})
go func() {
select {
case <-ctx.Done():
close(cancel)
case <-c:
// The response's Body is closed.
}
}()
resp.Body = &notifyingReader{resp.Body, c}
return resp, nil
}
// Get issues a GET request via the Do function.
func Get(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Head issues a HEAD request via the Do function.
func Head(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Post issues a POST request via the Do function.
func Post(ctx context.Context, client *http.Client, url string, bodyType string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest("POST", url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", bodyType)
return Do(ctx, client, req)
}
// PostForm issues a POST request via the Do function.
func PostForm(ctx context.Context, client *http.Client, url string, data url.Values) (*http.Response, error) {
return Post(ctx, client, url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
}
// notifyingReader is an io.ReadCloser that closes the notify channel after
// Close is called or a Read fails on the underlying ReadCloser.
type notifyingReader struct {
io.ReadCloser
notify chan<- struct{}
}
func (r *notifyingReader) Read(p []byte) (int, error) {
n, err := r.ReadCloser.Read(p)
if err != nil && r.notify != nil {
close(r.notify)
r.notify = nil
}
return n, err
}
func (r *notifyingReader) Close() error {
err := r.ReadCloser.Close()
if r.notify != nil {
close(r.notify)
r.notify = nil
}
return err
}
...@@ -27,6 +27,10 @@ How to get your contributions merged smoothly and quickly. ...@@ -27,6 +27,10 @@ How to get your contributions merged smoothly and quickly.
- Keep your PR up to date with upstream/master (if there are merge conflicts, we can't really merge your change). - Keep your PR up to date with upstream/master (if there are merge conflicts, we can't really merge your change).
- **All tests need to be passing** before your change can be merged. We recommend you **run tests locally** before creating your PR to catch breakages early on. - **All tests need to be passing** before your change can be merged. We recommend you **run tests locally** before creating your PR to catch breakages early on.
- `make all` to test everything, OR
- `make vet` to catch vet errors
- `make test` to run the tests
- `make testrace` to run tests in race mode
- Exceptions to the rules can be made if there's a compelling reason for doing so. - Exceptions to the rules can be made if there's a compelling reason for doing so.
all: test testrace all: vet test testrace
deps: deps:
go get -d -v google.golang.org/grpc/... go get -d -v google.golang.org/grpc/...
...@@ -22,6 +22,9 @@ proto: ...@@ -22,6 +22,9 @@ proto:
fi fi
go generate google.golang.org/grpc/... go generate google.golang.org/grpc/...
vet:
./vet.sh
test: testdeps test: testdeps
go test -cpu 1,4 -timeout 5m google.golang.org/grpc/... go test -cpu 1,4 -timeout 5m google.golang.org/grpc/...
...@@ -39,7 +42,7 @@ clean: ...@@ -39,7 +42,7 @@ clean:
updatetestdeps \ updatetestdeps \
build \ build \
proto \ proto \
vet \
test \ test \
testrace \ testrace \
clean \ clean
coverage
...@@ -16,8 +16,7 @@ $ go get -u google.golang.org/grpc ...@@ -16,8 +16,7 @@ $ go get -u google.golang.org/grpc
Prerequisites Prerequisites
------------- -------------
This requires Go 1.6 or later. Go 1.7 will be required as of the next gRPC-Go This requires Go 1.6 or later. Go 1.7 will be required soon.
release (1.8).
Constraints Constraints
----------- -----------
......
...@@ -16,10 +16,12 @@ ...@@ -16,10 +16,12 @@
* *
*/ */
// See internal/backoff package for the backoff implementation. This file is
// kept for the exported types and API backward compatility.
package grpc package grpc
import ( import (
"math/rand"
"time" "time"
) )
...@@ -27,70 +29,10 @@ import ( ...@@ -27,70 +29,10 @@ import (
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md. // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
var DefaultBackoffConfig = BackoffConfig{ var DefaultBackoffConfig = BackoffConfig{
MaxDelay: 120 * time.Second, MaxDelay: 120 * time.Second,
baseDelay: 1.0 * time.Second,
factor: 1.6,
jitter: 0.2,
}
// backoffStrategy defines the methodology for backing off after a grpc
// connection failure.
//
// This is unexported until the gRPC project decides whether or not to allow
// alternative backoff strategies. Once a decision is made, this type and its
// method may be exported.
type backoffStrategy interface {
// backoff returns the amount of time to wait before the next retry given
// the number of consecutive failures.
backoff(retries int) time.Duration
} }
// BackoffConfig defines the parameters for the default gRPC backoff strategy. // BackoffConfig defines the parameters for the default gRPC backoff strategy.
type BackoffConfig struct { type BackoffConfig struct {
// MaxDelay is the upper bound of backoff delay. // MaxDelay is the upper bound of backoff delay.
MaxDelay time.Duration MaxDelay time.Duration
// TODO(stevvooe): The following fields are not exported, as allowing
// changes would violate the current gRPC specification for backoff. If
// gRPC decides to allow more interesting backoff strategies, these fields
// may be opened up in the future.
// baseDelay is the amount of time to wait before retrying after the first
// failure.
baseDelay time.Duration
// factor is applied to the backoff after each retry.
factor float64
// jitter provides a range to randomize backoff delays.
jitter float64
}
func setDefaults(bc *BackoffConfig) {
md := bc.MaxDelay
*bc = DefaultBackoffConfig
if md > 0 {
bc.MaxDelay = md
}
}
func (bc BackoffConfig) backoff(retries int) time.Duration {
if retries == 0 {
return bc.baseDelay
}
backoff, max := float64(bc.baseDelay), float64(bc.MaxDelay)
for backoff < max && retries > 0 {
backoff *= bc.factor
retries--
}
if backoff > max {
backoff = max
}
// Randomize backoff delays so that if a cluster of requests start at
// the same time, they won't operate in lockstep.
backoff *= 1 + bc.jitter*(rand.Float64()*2-1)
if backoff < 0 {
return 0
}
return time.Duration(backoff)
} }
...@@ -32,7 +32,8 @@ import ( ...@@ -32,7 +32,8 @@ import (
) )
// Address represents a server the client connects to. // Address represents a server the client connects to.
// This is the EXPERIMENTAL API and may be changed or extended in the future. //
// Deprecated: please use package balancer.
type Address struct { type Address struct {
// Addr is the server address on which a connection will be established. // Addr is the server address on which a connection will be established.
Addr string Addr string
...@@ -42,6 +43,8 @@ type Address struct { ...@@ -42,6 +43,8 @@ type Address struct {
} }
// BalancerConfig specifies the configurations for Balancer. // BalancerConfig specifies the configurations for Balancer.
//
// Deprecated: please use package balancer.
type BalancerConfig struct { type BalancerConfig struct {
// DialCreds is the transport credential the Balancer implementation can // DialCreds is the transport credential the Balancer implementation can
// use to dial to a remote load balancer server. The Balancer implementations // use to dial to a remote load balancer server. The Balancer implementations
...@@ -54,7 +57,8 @@ type BalancerConfig struct { ...@@ -54,7 +57,8 @@ type BalancerConfig struct {
} }
// BalancerGetOptions configures a Get call. // BalancerGetOptions configures a Get call.
// This is the EXPERIMENTAL API and may be changed or extended in the future. //
// Deprecated: please use package balancer.
type BalancerGetOptions struct { type BalancerGetOptions struct {
// BlockingWait specifies whether Get should block when there is no // BlockingWait specifies whether Get should block when there is no
// connected address. // connected address.
...@@ -62,7 +66,8 @@ type BalancerGetOptions struct { ...@@ -62,7 +66,8 @@ type BalancerGetOptions struct {
} }
// Balancer chooses network addresses for RPCs. // Balancer chooses network addresses for RPCs.
// This is the EXPERIMENTAL API and may be changed or extended in the future. //
// Deprecated: please use package balancer.
type Balancer interface { type Balancer interface {
// Start does the initialization work to bootstrap a Balancer. For example, // Start does the initialization work to bootstrap a Balancer. For example,
// this function may start the name resolution and watch the updates. It will // this function may start the name resolution and watch the updates. It will
...@@ -135,6 +140,8 @@ func downErrorf(timeout, temporary bool, format string, a ...interface{}) downEr ...@@ -135,6 +140,8 @@ func downErrorf(timeout, temporary bool, format string, a ...interface{}) downEr
// RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch // RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch
// the name resolution updates and updates the addresses available correspondingly. // the name resolution updates and updates the addresses available correspondingly.
//
// Deprecated: please use package balancer/roundrobin.
func RoundRobin(r naming.Resolver) Balancer { func RoundRobin(r naming.Resolver) Balancer {
return &roundRobin{r: r} return &roundRobin{r: r}
} }
......
...@@ -36,9 +36,12 @@ var ( ...@@ -36,9 +36,12 @@ var (
m = make(map[string]Builder) m = make(map[string]Builder)
) )
// Register registers the balancer builder to the balancer map. // Register registers the balancer builder to the balancer map. b.Name
// b.Name (lowercased) will be used as the name registered with // (lowercased) will be used as the name registered with this builder.
// this builder. //
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple Balancers are
// registered with the same name, the one registered last will take effect.
func Register(b Builder) { func Register(b Builder) {
m[strings.ToLower(b.Name())] = b m[strings.ToLower(b.Name())] = b
} }
...@@ -126,6 +129,8 @@ type BuildOptions struct { ...@@ -126,6 +129,8 @@ type BuildOptions struct {
// to a remote load balancer server. The Balancer implementations // to a remote load balancer server. The Balancer implementations
// can ignore this if it doesn't need to talk to remote balancer. // can ignore this if it doesn't need to talk to remote balancer.
Dialer func(context.Context, string) (net.Conn, error) Dialer func(context.Context, string) (net.Conn, error)
// ChannelzParentID is the entity parent's channelz unique identification number.
ChannelzParentID int64
} }
// Builder creates a balancer. // Builder creates a balancer.
...@@ -160,7 +165,7 @@ var ( ...@@ -160,7 +165,7 @@ var (
) )
// Picker is used by gRPC to pick a SubConn to send an RPC. // Picker is used by gRPC to pick a SubConn to send an RPC.
// Balancer is expected to generate a new picker from its snapshot everytime its // Balancer is expected to generate a new picker from its snapshot every time its
// internal state has changed. // internal state has changed.
// //
// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState(). // The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState().
...@@ -221,3 +226,45 @@ type Balancer interface { ...@@ -221,3 +226,45 @@ type Balancer interface {
// ClientConn.RemoveSubConn for its existing SubConns. // ClientConn.RemoveSubConn for its existing SubConns.
Close() Close()
} }
// ConnectivityStateEvaluator takes the connectivity states of multiple SubConns
// and returns one aggregated connectivity state.
//
// It's not thread safe.
type ConnectivityStateEvaluator struct {
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numTransientFailure uint64 // Number of addrConns in transientFailure.
}
// RecordTransition records state change happening in subConn and based on that
// it evaluates what aggregated state should be.
//
// - If at least one SubConn in Ready, the aggregated state is Ready;
// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
// - Else the aggregated state is TransientFailure.
//
// Idle and Shutdown are not considered.
func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState connectivity.State) connectivity.State {
// Update counters.
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
switch state {
case connectivity.Ready:
cse.numReady += updateVal
case connectivity.Connecting:
cse.numConnecting += updateVal
case connectivity.TransientFailure:
cse.numTransientFailure += updateVal
}
}
// Evaluate.
if cse.numReady > 0 {
return connectivity.Ready
}
if cse.numConnecting > 0 {
return connectivity.Connecting
}
return connectivity.TransientFailure
}
...@@ -146,7 +146,6 @@ func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectiv ...@@ -146,7 +146,6 @@ func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectiv
} }
b.cc.UpdateBalancerState(b.state, b.picker) b.cc.UpdateBalancerState(b.state, b.picker)
return
} }
// Close is a nop because base balancer doesn't have internal state to clean up, // Close is a nop because base balancer doesn't have internal state to clean up,
......
...@@ -115,7 +115,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui ...@@ -115,7 +115,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
return ccb return ccb
} }
// watcher balancer functions sequencially, so the balancer can be implemeneted // watcher balancer functions sequentially, so the balancer can be implemented
// lock-free. // lock-free.
func (ccb *ccBalancerWrapper) watcher() { func (ccb *ccBalancerWrapper) watcher() {
for { for {
......
...@@ -55,7 +55,7 @@ func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.B ...@@ -55,7 +55,7 @@ func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.B
startCh: make(chan struct{}), startCh: make(chan struct{}),
conns: make(map[resolver.Address]balancer.SubConn), conns: make(map[resolver.Address]balancer.SubConn),
connSt: make(map[balancer.SubConn]*scState), connSt: make(map[balancer.SubConn]*scState),
csEvltr: &connectivityStateEvaluator{}, csEvltr: &balancer.ConnectivityStateEvaluator{},
state: connectivity.Idle, state: connectivity.Idle,
} }
cc.UpdateBalancerState(connectivity.Idle, bw) cc.UpdateBalancerState(connectivity.Idle, bw)
...@@ -80,10 +80,6 @@ type balancerWrapper struct { ...@@ -80,10 +80,6 @@ type balancerWrapper struct {
cc balancer.ClientConn cc balancer.ClientConn
targetAddr string // Target without the scheme. targetAddr string // Target without the scheme.
// To aggregate the connectivity state.
csEvltr *connectivityStateEvaluator
state connectivity.State
mu sync.Mutex mu sync.Mutex
conns map[resolver.Address]balancer.SubConn conns map[resolver.Address]balancer.SubConn
connSt map[balancer.SubConn]*scState connSt map[balancer.SubConn]*scState
...@@ -92,6 +88,10 @@ type balancerWrapper struct { ...@@ -92,6 +88,10 @@ type balancerWrapper struct {
// - NewSubConn is created, cc wants to notify balancer of state changes; // - NewSubConn is created, cc wants to notify balancer of state changes;
// - Build hasn't return, cc doesn't have access to balancer. // - Build hasn't return, cc doesn't have access to balancer.
startCh chan struct{} startCh chan struct{}
// To aggregate the connectivity state.
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State
} }
// lbWatcher watches the Notify channel of the balancer and manages // lbWatcher watches the Notify channel of the balancer and manages
...@@ -248,7 +248,7 @@ func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s conne ...@@ -248,7 +248,7 @@ func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s conne
scSt.down(errConnClosing) scSt.down(errConnClosing)
} }
} }
sa := bw.csEvltr.recordTransition(oldS, s) sa := bw.csEvltr.RecordTransition(oldS, s)
if bw.state != sa { if bw.state != sa {
bw.state = sa bw.state = sa
} }
...@@ -257,7 +257,6 @@ func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s conne ...@@ -257,7 +257,6 @@ func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s conne
// Remove state for this sc. // Remove state for this sc.
delete(bw.connSt, sc) delete(bw.connSt, sc)
} }
return
} }
func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) { func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
...@@ -270,7 +269,6 @@ func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) { ...@@ -270,7 +269,6 @@ func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
} }
// There should be a resolver inside the balancer. // There should be a resolver inside the balancer.
// All updates here, if any, are ignored. // All updates here, if any, are ignored.
return
} }
func (bw *balancerWrapper) Close() { func (bw *balancerWrapper) Close() {
...@@ -282,7 +280,6 @@ func (bw *balancerWrapper) Close() { ...@@ -282,7 +280,6 @@ func (bw *balancerWrapper) Close() {
close(bw.startCh) close(bw.startCh)
} }
bw.balancer.Close() bw.balancer.Close()
return
} }
// The picker is the balancerWrapper itself. // The picker is the balancerWrapper itself.
...@@ -329,47 +326,3 @@ func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) ...@@ -329,47 +326,3 @@ func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions)
return sc, done, nil return sc, done, nil
} }
// connectivityStateEvaluator gets updated by addrConns when their
// states transition, based on which it evaluates the state of
// ClientConn.
type connectivityStateEvaluator struct {
mu sync.Mutex
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numTransientFailure uint64 // Number of addrConns in transientFailure.
}
// recordTransition records state change happening in every subConn and based on
// that it evaluates what aggregated state should be.
// It can only transition between Ready, Connecting and TransientFailure. Other states,
// Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection
// before any subConn is created ClientConn is in idle state. In the end when ClientConn
// closes it is in Shutdown state.
// TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state.
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
cse.mu.Lock()
defer cse.mu.Unlock()
// Update counters.
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
switch state {
case connectivity.Ready:
cse.numReady += updateVal
case connectivity.Connecting:
cse.numConnecting += updateVal
case connectivity.TransientFailure:
cse.numTransientFailure += updateVal
}
}
// Evaluate.
if cse.numReady > 0 {
return connectivity.Ready
}
if cse.numConnecting > 0 {
return connectivity.Connecting
}
return connectivity.TransientFailure
}
This diff is collapsed.
This diff is collapsed.
...@@ -19,96 +19,32 @@ ...@@ -19,96 +19,32 @@
package grpc package grpc
import ( import (
"math" "google.golang.org/grpc/encoding"
"sync" _ "google.golang.org/grpc/encoding/proto" // to register the Codec for "proto"
"github.com/golang/protobuf/proto"
) )
// baseCodec contains the functionality of both Codec and encoding.Codec, but
// omits the name/string, which vary between the two and are not needed for
// anything besides the registry in the encoding package.
type baseCodec interface {
Marshal(v interface{}) ([]byte, error)
Unmarshal(data []byte, v interface{}) error
}
var _ baseCodec = Codec(nil)
var _ baseCodec = encoding.Codec(nil)
// Codec defines the interface gRPC uses to encode and decode messages. // Codec defines the interface gRPC uses to encode and decode messages.
// Note that implementations of this interface must be thread safe; // Note that implementations of this interface must be thread safe;
// a Codec's methods can be called from concurrent goroutines. // a Codec's methods can be called from concurrent goroutines.
//
// Deprecated: use encoding.Codec instead.
type Codec interface { type Codec interface {
// Marshal returns the wire format of v. // Marshal returns the wire format of v.
Marshal(v interface{}) ([]byte, error) Marshal(v interface{}) ([]byte, error)
// Unmarshal parses the wire format into v. // Unmarshal parses the wire format into v.
Unmarshal(data []byte, v interface{}) error Unmarshal(data []byte, v interface{}) error
// String returns the name of the Codec implementation. The returned // String returns the name of the Codec implementation. This is unused by
// string will be used as part of content type in transmission. // gRPC.
String() string String() string
} }
// protoCodec is a Codec implementation with protobuf. It is the default codec for gRPC.
type protoCodec struct {
}
type cachedProtoBuffer struct {
lastMarshaledSize uint32
proto.Buffer
}
func capToMaxInt32(val int) uint32 {
if val > math.MaxInt32 {
return uint32(math.MaxInt32)
}
return uint32(val)
}
func (p protoCodec) marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) {
protoMsg := v.(proto.Message)
newSlice := make([]byte, 0, cb.lastMarshaledSize)
cb.SetBuf(newSlice)
cb.Reset()
if err := cb.Marshal(protoMsg); err != nil {
return nil, err
}
out := cb.Bytes()
cb.lastMarshaledSize = capToMaxInt32(len(out))
return out, nil
}
func (p protoCodec) Marshal(v interface{}) ([]byte, error) {
if pm, ok := v.(proto.Marshaler); ok {
// object can marshal itself, no need for buffer
return pm.Marshal()
}
cb := protoBufferPool.Get().(*cachedProtoBuffer)
out, err := p.marshal(v, cb)
// put back buffer and lose the ref to the slice
cb.SetBuf(nil)
protoBufferPool.Put(cb)
return out, err
}
func (p protoCodec) Unmarshal(data []byte, v interface{}) error {
protoMsg := v.(proto.Message)
protoMsg.Reset()
if pu, ok := protoMsg.(proto.Unmarshaler); ok {
// object can unmarshal itself, no need for buffer
return pu.Unmarshal(data)
}
cb := protoBufferPool.Get().(*cachedProtoBuffer)
cb.SetBuf(data)
err := cb.Unmarshal(protoMsg)
cb.SetBuf(nil)
protoBufferPool.Put(cb)
return err
}
func (protoCodec) String() string {
return "proto"
}
var protoBufferPool = &sync.Pool{
New: func() interface{} {
return &cachedProtoBuffer{
Buffer: proto.Buffer{},
lastMarshaledSize: 16,
}
},
}
...@@ -19,8 +19,10 @@ ...@@ -19,8 +19,10 @@
// Package codes defines the canonical error codes used by gRPC. It is // Package codes defines the canonical error codes used by gRPC. It is
// consistent across various languages. // consistent across various languages.
package codes // import "google.golang.org/grpc/codes" package codes // import "google.golang.org/grpc/codes"
import ( import (
"fmt" "fmt"
"strconv"
) )
// A Code is an unsigned 32-bit error code as defined in the gRPC spec. // A Code is an unsigned 32-bit error code as defined in the gRPC spec.
...@@ -69,10 +71,6 @@ const ( ...@@ -69,10 +71,6 @@ const (
// instead for those errors). // instead for those errors).
PermissionDenied Code = 7 PermissionDenied Code = 7
// Unauthenticated indicates the request does not have valid
// authentication credentials for the operation.
Unauthenticated Code = 16
// ResourceExhausted indicates some resource has been exhausted, perhaps // ResourceExhausted indicates some resource has been exhausted, perhaps
// a per-user quota, or perhaps the entire file system is out of space. // a per-user quota, or perhaps the entire file system is out of space.
ResourceExhausted Code = 8 ResourceExhausted Code = 8
...@@ -142,6 +140,12 @@ const ( ...@@ -142,6 +140,12 @@ const (
// DataLoss indicates unrecoverable data loss or corruption. // DataLoss indicates unrecoverable data loss or corruption.
DataLoss Code = 15 DataLoss Code = 15
// Unauthenticated indicates the request does not have valid
// authentication credentials for the operation.
Unauthenticated Code = 16
_maxCode = 17
) )
var strToCode = map[string]Code{ var strToCode = map[string]Code{
...@@ -175,6 +179,16 @@ func (c *Code) UnmarshalJSON(b []byte) error { ...@@ -175,6 +179,16 @@ func (c *Code) UnmarshalJSON(b []byte) error {
if c == nil { if c == nil {
return fmt.Errorf("nil receiver passed to UnmarshalJSON") return fmt.Errorf("nil receiver passed to UnmarshalJSON")
} }
if ci, err := strconv.ParseUint(string(b), 10, 32); err == nil {
if ci >= _maxCode {
return fmt.Errorf("invalid code: %q", ci)
}
*c = Code(ci)
return nil
}
if jc, ok := strToCode[string(b)]; ok { if jc, ok := strToCode[string(b)]; ok {
*c = jc *c = jc
return nil return nil
......
...@@ -43,8 +43,9 @@ type PerRPCCredentials interface { ...@@ -43,8 +43,9 @@ type PerRPCCredentials interface {
// GetRequestMetadata gets the current request metadata, refreshing // GetRequestMetadata gets the current request metadata, refreshing
// tokens if required. This should be called by the transport layer on // tokens if required. This should be called by the transport layer on
// each request, and the data should be populated in headers or other // each request, and the data should be populated in headers or other
// context. uri is the URI of the entry point for the request. When // context. If a status code is returned, it will be used as the status
// supported by the underlying implementation, ctx can be used for // for the RPC. uri is the URI of the entry point for the request.
// When supported by the underlying implementation, ctx can be used for
// timeout and cancellation. // timeout and cancellation.
// TODO(zhaoq): Define the set of the qualified keys instead of leaving // TODO(zhaoq): Define the set of the qualified keys instead of leaving
// it as an arbitrary string. // it as an arbitrary string.
......
...@@ -16,46 +16,103 @@ ...@@ -16,46 +16,103 @@
* *
*/ */
// Package encoding defines the interface for the compressor and the functions // Package encoding defines the interface for the compressor and codec, and
// to register and get the compossor. // functions to register and retrieve compressors and codecs.
//
// This package is EXPERIMENTAL. // This package is EXPERIMENTAL.
package encoding package encoding
import ( import (
"io" "io"
"strings"
) )
var registerCompressor = make(map[string]Compressor) // Identity specifies the optional encoding for uncompressed streams.
// It is intended for grpc internal use only.
const Identity = "identity"
// Compressor is used for compressing and decompressing when sending or receiving messages. // Compressor is used for compressing and decompressing when sending or
// receiving messages.
type Compressor interface { type Compressor interface {
// Compress writes the data written to wc to w after compressing it. If an error // Compress writes the data written to wc to w after compressing it. If an
// occurs while initializing the compressor, that error is returned instead. // error occurs while initializing the compressor, that error is returned
// instead.
Compress(w io.Writer) (io.WriteCloser, error) Compress(w io.Writer) (io.WriteCloser, error)
// Decompress reads data from r, decompresses it, and provides the uncompressed data // Decompress reads data from r, decompresses it, and provides the
// via the returned io.Reader. If an error occurs while initializing the decompressor, that error // uncompressed data via the returned io.Reader. If an error occurs while
// is returned instead. // initializing the decompressor, that error is returned instead.
Decompress(r io.Reader) (io.Reader, error) Decompress(r io.Reader) (io.Reader, error)
// Name is the name of the compression codec and is used to set the content coding header. // Name is the name of the compression codec and is used to set the content
// coding header. The result must be static; the result cannot change
// between calls.
Name() string Name() string
} }
// RegisterCompressor registers the compressor with gRPC by its name. It can be activated when var registeredCompressor = make(map[string]Compressor)
// sending an RPC via grpc.UseCompressor(). It will be automatically accessed when receiving a
// message based on the content coding header. Servers also use it to send a response with the // RegisterCompressor registers the compressor with gRPC by its name. It can
// same encoding as the request. // be activated when sending an RPC via grpc.UseCompressor(). It will be
// automatically accessed when receiving a message based on the content coding
// header. Servers also use it to send a response with the same encoding as
// the request.
// //
// NOTE: this function must only be called during initialization time (i.e. in an init() function). If // NOTE: this function must only be called during initialization time (i.e. in
// multiple Compressors are registered with the same name, the one registered last will take effect. // an init() function), and is not thread-safe. If multiple Compressors are
// registered with the same name, the one registered last will take effect.
func RegisterCompressor(c Compressor) { func RegisterCompressor(c Compressor) {
registerCompressor[c.Name()] = c registeredCompressor[c.Name()] = c
} }
// GetCompressor returns Compressor for the given compressor name. // GetCompressor returns Compressor for the given compressor name.
func GetCompressor(name string) Compressor { func GetCompressor(name string) Compressor {
return registerCompressor[name] return registeredCompressor[name]
} }
// Identity specifies the optional encoding for uncompressed streams. // Codec defines the interface gRPC uses to encode and decode messages. Note
// It is intended for grpc internal use only. // that implementations of this interface must be thread safe; a Codec's
const Identity = "identity" // methods can be called from concurrent goroutines.
type Codec interface {
// Marshal returns the wire format of v.
Marshal(v interface{}) ([]byte, error)
// Unmarshal parses the wire format into v.
Unmarshal(data []byte, v interface{}) error
// Name returns the name of the Codec implementation. The returned string
// will be used as part of content type in transmission. The result must be
// static; the result cannot change between calls.
Name() string
}
var registeredCodecs = make(map[string]Codec)
// RegisterCodec registers the provided Codec for use with all gRPC clients and
// servers.
//
// The Codec will be stored and looked up by result of its Name() method, which
// should match the content-subtype of the encoding handled by the Codec. This
// is case-insensitive, and is stored and looked up as lowercase. If the
// result of calling Name() is an empty string, RegisterCodec will panic. See
// Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple Compressors are
// registered with the same name, the one registered last will take effect.
func RegisterCodec(codec Codec) {
if codec == nil {
panic("cannot register a nil Codec")
}
contentSubtype := strings.ToLower(codec.Name())
if contentSubtype == "" {
panic("cannot register Codec with empty string result for String()")
}
registeredCodecs[contentSubtype] = codec
}
// GetCodec gets a registered Codec by content-subtype, or nil if no Codec is
// registered for the content-subtype.
//
// The content-subtype is expected to be lowercase.
func GetCodec(contentSubtype string) Codec {
return registeredCodecs[contentSubtype]
}
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package proto defines the protobuf codec. Importing this package will
// register the codec.
package proto
import (
"math"
"sync"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/encoding"
)
// Name is the name registered for the proto compressor.
const Name = "proto"
func init() {
encoding.RegisterCodec(codec{})
}
// codec is a Codec implementation with protobuf. It is the default codec for gRPC.
type codec struct{}
type cachedProtoBuffer struct {
lastMarshaledSize uint32
proto.Buffer
}
func capToMaxInt32(val int) uint32 {
if val > math.MaxInt32 {
return uint32(math.MaxInt32)
}
return uint32(val)
}
func marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) {
protoMsg := v.(proto.Message)
newSlice := make([]byte, 0, cb.lastMarshaledSize)
cb.SetBuf(newSlice)
cb.Reset()
if err := cb.Marshal(protoMsg); err != nil {
return nil, err
}
out := cb.Bytes()
cb.lastMarshaledSize = capToMaxInt32(len(out))
return out, nil
}
func (codec) Marshal(v interface{}) ([]byte, error) {
if pm, ok := v.(proto.Marshaler); ok {
// object can marshal itself, no need for buffer
return pm.Marshal()
}
cb := protoBufferPool.Get().(*cachedProtoBuffer)
out, err := marshal(v, cb)
// put back buffer and lose the ref to the slice
cb.SetBuf(nil)
protoBufferPool.Put(cb)
return out, err
}
func (codec) Unmarshal(data []byte, v interface{}) error {
protoMsg := v.(proto.Message)
protoMsg.Reset()
if pu, ok := protoMsg.(proto.Unmarshaler); ok {
// object can unmarshal itself, no need for buffer
return pu.Unmarshal(data)
}
cb := protoBufferPool.Get().(*cachedProtoBuffer)
cb.SetBuf(data)
err := cb.Unmarshal(protoMsg)
cb.SetBuf(nil)
protoBufferPool.Put(cb)
return err
}
func (codec) Name() string {
return Name
}
var protoBufferPool = &sync.Pool{
New: func() interface{} {
return &cachedProtoBuffer{
Buffer: proto.Buffer{},
lastMarshaledSize: 16,
}
},
}
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"os"
"strings"
)
const (
envConfigPrefix = "GRPC_GO_"
envConfigStickinessStr = envConfigPrefix + "STICKINESS"
)
var (
envConfigStickinessOn bool
)
func init() {
envConfigStickinessOn = strings.EqualFold(os.Getenv(envConfigStickinessStr), "on")
}
...@@ -25,7 +25,6 @@ import ( ...@@ -25,7 +25,6 @@ import (
"io" "io"
"net" "net"
"net/http" "net/http"
"os"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
...@@ -48,6 +47,9 @@ func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) erro ...@@ -48,6 +47,9 @@ func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) erro
// toRPCErr converts an error into an error from the status package. // toRPCErr converts an error into an error from the status package.
func toRPCErr(err error) error { func toRPCErr(err error) error {
if err == nil || err == io.EOF {
return err
}
if _, ok := status.FromError(err); ok { if _, ok := status.FromError(err); ok {
return err return err
} }
...@@ -62,37 +64,7 @@ func toRPCErr(err error) error { ...@@ -62,37 +64,7 @@ func toRPCErr(err error) error {
return status.Error(codes.DeadlineExceeded, err.Error()) return status.Error(codes.DeadlineExceeded, err.Error())
case context.Canceled: case context.Canceled:
return status.Error(codes.Canceled, err.Error()) return status.Error(codes.Canceled, err.Error())
case ErrClientConnClosing:
return status.Error(codes.FailedPrecondition, err.Error())
} }
} }
return status.Error(codes.Unknown, err.Error()) return status.Error(codes.Unknown, err.Error())
} }
// convertCode converts a standard Go error into its canonical code. Note that
// this is only used to translate the error returned by the server applications.
func convertCode(err error) codes.Code {
switch err {
case nil:
return codes.OK
case io.EOF:
return codes.OutOfRange
case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF:
return codes.FailedPrecondition
case os.ErrInvalid:
return codes.InvalidArgument
case context.Canceled:
return codes.Canceled
case context.DeadlineExceeded:
return codes.DeadlineExceeded
}
switch {
case os.IsExist(err):
return codes.AlreadyExists
case os.IsNotExist(err):
return codes.NotFound
case os.IsPermission(err):
return codes.PermissionDenied
}
return codes.Unknown
}
...@@ -26,7 +26,6 @@ import ( ...@@ -26,7 +26,6 @@ import (
"io" "io"
"net" "net"
"net/http" "net/http"
"os"
netctx "golang.org/x/net/context" netctx "golang.org/x/net/context"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
...@@ -49,6 +48,9 @@ func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) erro ...@@ -49,6 +48,9 @@ func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) erro
// toRPCErr converts an error into an error from the status package. // toRPCErr converts an error into an error from the status package.
func toRPCErr(err error) error { func toRPCErr(err error) error {
if err == nil || err == io.EOF {
return err
}
if _, ok := status.FromError(err); ok { if _, ok := status.FromError(err); ok {
return err return err
} }
...@@ -63,37 +65,7 @@ func toRPCErr(err error) error { ...@@ -63,37 +65,7 @@ func toRPCErr(err error) error {
return status.Error(codes.DeadlineExceeded, err.Error()) return status.Error(codes.DeadlineExceeded, err.Error())
case context.Canceled, netctx.Canceled: case context.Canceled, netctx.Canceled:
return status.Error(codes.Canceled, err.Error()) return status.Error(codes.Canceled, err.Error())
case ErrClientConnClosing:
return status.Error(codes.FailedPrecondition, err.Error())
} }
} }
return status.Error(codes.Unknown, err.Error()) return status.Error(codes.Unknown, err.Error())
} }
// convertCode converts a standard Go error into its canonical code. Note that
// this is only used to translate the error returned by the server applications.
func convertCode(err error) codes.Code {
switch err {
case nil:
return codes.OK
case io.EOF:
return codes.OutOfRange
case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF:
return codes.FailedPrecondition
case os.ErrInvalid:
return codes.InvalidArgument
case context.Canceled, netctx.Canceled:
return codes.Canceled
case context.DeadlineExceeded, netctx.DeadlineExceeded:
return codes.DeadlineExceeded
}
switch {
case os.IsExist(err):
return codes.AlreadyExists
case os.IsNotExist(err):
return codes.NotFound
case os.IsPermission(err):
return codes.PermissionDenied
}
return codes.Unknown
}
This diff is collapsed.
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package grpc.lb.v1;
option go_package = "google.golang.org/grpc/grpclb/grpc_lb_v1/messages";
message Duration {
// Signed seconds of the span of time. Must be from -315,576,000,000
// to +315,576,000,000 inclusive.
int64 seconds = 1;
// Signed fractions of a second at nanosecond resolution of the span
// of time. Durations less than one second are represented with a 0
// `seconds` field and a positive or negative `nanos` field. For durations
// of one second or more, a non-zero value for the `nanos` field must be
// of the same sign as the `seconds` field. Must be from -999,999,999
// to +999,999,999 inclusive.
int32 nanos = 2;
}
message Timestamp {
// Represents seconds of UTC time since Unix epoch
// 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to
// 9999-12-31T23:59:59Z inclusive.
int64 seconds = 1;
// Non-negative fractions of a second at nanosecond resolution. Negative
// second values with fractions must still have non-negative nanos values
// that count forward in time. Must be from 0 to 999,999,999
// inclusive.
int32 nanos = 2;
}
message LoadBalanceRequest {
oneof load_balance_request_type {
// This message should be sent on the first request to the load balancer.
InitialLoadBalanceRequest initial_request = 1;
// The client stats should be periodically reported to the load balancer
// based on the duration defined in the InitialLoadBalanceResponse.
ClientStats client_stats = 2;
}
}
message InitialLoadBalanceRequest {
// Name of load balanced service (IE, balancer.service.com)
// length should be less than 256 bytes.
string name = 1;
}
// Contains client level statistics that are useful to load balancing. Each
// count except the timestamp should be reset to zero after reporting the stats.
message ClientStats {
// The timestamp of generating the report.
Timestamp timestamp = 1;
// The total number of RPCs that started.
int64 num_calls_started = 2;
// The total number of RPCs that finished.
int64 num_calls_finished = 3;
// The total number of RPCs that were dropped by the client because of rate
// limiting.
int64 num_calls_finished_with_drop_for_rate_limiting = 4;
// The total number of RPCs that were dropped by the client because of load
// balancing.
int64 num_calls_finished_with_drop_for_load_balancing = 5;
// The total number of RPCs that failed to reach a server except dropped RPCs.
int64 num_calls_finished_with_client_failed_to_send = 6;
// The total number of RPCs that finished and are known to have been received
// by a server.
int64 num_calls_finished_known_received = 7;
}
message LoadBalanceResponse {
oneof load_balance_response_type {
// This message should be sent on the first response to the client.
InitialLoadBalanceResponse initial_response = 1;
// Contains the list of servers selected by the load balancer. The client
// should send requests to these servers in the specified order.
ServerList server_list = 2;
}
}
message InitialLoadBalanceResponse {
// This is an application layer redirect that indicates the client should use
// the specified server for load balancing. When this field is non-empty in
// the response, the client should open a separate connection to the
// load_balancer_delegate and call the BalanceLoad method. Its length should
// be less than 64 bytes.
string load_balancer_delegate = 1;
// This interval defines how often the client should send the client stats
// to the load balancer. Stats should only be reported when the duration is
// positive.
Duration client_stats_report_interval = 2;
}
message ServerList {
// Contains a list of servers selected by the load balancer. The list will
// be updated when server resolutions change or as needed to balance load
// across more servers. The client should consume the server list in order
// unless instructed otherwise via the client_config.
repeated Server servers = 1;
// Was google.protobuf.Duration expiration_interval.
reserved 3;
}
// Contains server information. When none of the [drop_for_*] fields are true,
// use the other fields. When drop_for_rate_limiting is true, ignore all other
// fields. Use drop_for_load_balancing only when it is true and
// drop_for_rate_limiting is false.
message Server {
// A resolved address for the server, serialized in network-byte-order. It may
// either be an IPv4 or IPv6 address.
bytes ip_address = 1;
// A resolved port number for the server.
int32 port = 2;
// An opaque but printable token given to the frontend for each pick. All
// frontend requests for that pick must include the token in its initial
// metadata. The token is used by the backend to verify the request and to
// allow the backend to report load to the gRPC LB system.
//
// Its length is variable but less than 50 bytes.
string load_balance_token = 3;
// Indicates whether this particular request should be dropped by the client
// for rate limiting.
bool drop_for_rate_limiting = 4;
// Indicates whether this particular request should be dropped by the client
// for load balancing.
bool drop_for_load_balancing = 5;
}
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"sync"
"sync/atomic"
"golang.org/x/net/context"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
"google.golang.org/grpc/status"
)
type rpcStats struct {
NumCallsStarted int64
NumCallsFinished int64
NumCallsFinishedWithDropForRateLimiting int64
NumCallsFinishedWithDropForLoadBalancing int64
NumCallsFinishedWithClientFailedToSend int64
NumCallsFinishedKnownReceived int64
}
// toClientStats converts rpcStats to lbpb.ClientStats, and clears rpcStats.
func (s *rpcStats) toClientStats() *lbpb.ClientStats {
stats := &lbpb.ClientStats{
NumCallsStarted: atomic.SwapInt64(&s.NumCallsStarted, 0),
NumCallsFinished: atomic.SwapInt64(&s.NumCallsFinished, 0),
NumCallsFinishedWithDropForRateLimiting: atomic.SwapInt64(&s.NumCallsFinishedWithDropForRateLimiting, 0),
NumCallsFinishedWithDropForLoadBalancing: atomic.SwapInt64(&s.NumCallsFinishedWithDropForLoadBalancing, 0),
NumCallsFinishedWithClientFailedToSend: atomic.SwapInt64(&s.NumCallsFinishedWithClientFailedToSend, 0),
NumCallsFinishedKnownReceived: atomic.SwapInt64(&s.NumCallsFinishedKnownReceived, 0),
}
return stats
}
func (s *rpcStats) dropForRateLimiting() {
atomic.AddInt64(&s.NumCallsStarted, 1)
atomic.AddInt64(&s.NumCallsFinishedWithDropForRateLimiting, 1)
atomic.AddInt64(&s.NumCallsFinished, 1)
}
func (s *rpcStats) dropForLoadBalancing() {
atomic.AddInt64(&s.NumCallsStarted, 1)
atomic.AddInt64(&s.NumCallsFinishedWithDropForLoadBalancing, 1)
atomic.AddInt64(&s.NumCallsFinished, 1)
}
func (s *rpcStats) failedToSend() {
atomic.AddInt64(&s.NumCallsStarted, 1)
atomic.AddInt64(&s.NumCallsFinishedWithClientFailedToSend, 1)
atomic.AddInt64(&s.NumCallsFinished, 1)
}
func (s *rpcStats) knownReceived() {
atomic.AddInt64(&s.NumCallsStarted, 1)
atomic.AddInt64(&s.NumCallsFinishedKnownReceived, 1)
atomic.AddInt64(&s.NumCallsFinished, 1)
}
type errPicker struct {
// Pick always returns this err.
err error
}
func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, p.err
}
// rrPicker does roundrobin on subConns. It's typically used when there's no
// response from remote balancer, and grpclb falls back to the resolved
// backends.
//
// It guaranteed that len(subConns) > 0.
type rrPicker struct {
mu sync.Mutex
subConns []balancer.SubConn // The subConns that were READY when taking the snapshot.
subConnsNext int
}
func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
p.mu.Lock()
defer p.mu.Unlock()
sc := p.subConns[p.subConnsNext]
p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
return sc, nil, nil
}
// lbPicker does two layers of picks:
//
// First layer: roundrobin on all servers in serverList, including drops and backends.
// - If it picks a drop, the RPC will fail as being dropped.
// - If it picks a backend, do a second layer pick to pick the real backend.
//
// Second layer: roundrobin on all READY backends.
//
// It's guaranteed that len(serverList) > 0.
type lbPicker struct {
mu sync.Mutex
serverList []*lbpb.Server
serverListNext int
subConns []balancer.SubConn // The subConns that were READY when taking the snapshot.
subConnsNext int
stats *rpcStats
}
func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
p.mu.Lock()
defer p.mu.Unlock()
// Layer one roundrobin on serverList.
s := p.serverList[p.serverListNext]
p.serverListNext = (p.serverListNext + 1) % len(p.serverList)
// If it's a drop, return an error and fail the RPC.
if s.DropForRateLimiting {
p.stats.dropForRateLimiting()
return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb")
}
if s.DropForLoadBalancing {
p.stats.dropForLoadBalancing()
return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb")
}
// If not a drop but there's no ready subConns.
if len(p.subConns) <= 0 {
return nil, nil, balancer.ErrNoSubConnAvailable
}
// Return the next ready subConn in the list, also collect rpc stats.
sc := p.subConns[p.subConnsNext]
p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
done := func(info balancer.DoneInfo) {
if !info.BytesSent {
p.stats.failedToSend()
} else if info.BytesReceived {
p.stats.knownReceived()
}
}
return sc, done, nil
}
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"fmt"
"net"
"reflect"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
)
// processServerList updates balaner's internal state, create/remove SubConns
// and regenerates picker using the received serverList.
func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
grpclog.Infof("lbBalancer: processing server list: %+v", l)
lb.mu.Lock()
defer lb.mu.Unlock()
// Set serverListReceived to true so fallback will not take effect if it has
// not hit timeout.
lb.serverListReceived = true
// If the new server list == old server list, do nothing.
if reflect.DeepEqual(lb.fullServerList, l.Servers) {
grpclog.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
return
}
lb.fullServerList = l.Servers
var backendAddrs []resolver.Address
for _, s := range l.Servers {
if s.DropForLoadBalancing || s.DropForRateLimiting {
continue
}
md := metadata.Pairs(lbTokeyKey, s.LoadBalanceToken)
ip := net.IP(s.IpAddress)
ipStr := ip.String()
if ip.To4() == nil {
// Add square brackets to ipv6 addresses, otherwise net.Dial() and
// net.SplitHostPort() will return too many colons error.
ipStr = fmt.Sprintf("[%s]", ipStr)
}
addr := resolver.Address{
Addr: fmt.Sprintf("%s:%d", ipStr, s.Port),
Metadata: &md,
}
backendAddrs = append(backendAddrs, addr)
}
// Call refreshSubConns to create/remove SubConns.
backendsUpdated := lb.refreshSubConns(backendAddrs)
// If no backend was updated, no SubConn will be newed/removed. But since
// the full serverList was different, there might be updates in drops or
// pick weights(different number of duplicates). We need to update picker
// with the fulllist.
if !backendsUpdated {
lb.regeneratePicker()
lb.cc.UpdateBalancerState(lb.state, lb.picker)
}
}
// refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool
// indicating whether the backendAddrs are different from the cached
// backendAddrs (whether any SubConn was newed/removed).
// Caller must hold lb.mu.
func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address) bool {
lb.backendAddrs = nil
var backendsUpdated bool
// addrsSet is the set converted from backendAddrs, it's used to quick
// lookup for an address.
addrsSet := make(map[resolver.Address]struct{})
// Create new SubConns.
for _, addr := range backendAddrs {
addrWithoutMD := addr
addrWithoutMD.Metadata = nil
addrsSet[addrWithoutMD] = struct{}{}
lb.backendAddrs = append(lb.backendAddrs, addrWithoutMD)
if _, ok := lb.subConns[addrWithoutMD]; !ok {
backendsUpdated = true
// Use addrWithMD to create the SubConn.
sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
grpclog.Warningf("roundrobinBalancer: failed to create new SubConn: %v", err)
continue
}
lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map.
lb.scStates[sc] = connectivity.Idle
sc.Connect()
}
}
for a, sc := range lb.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
backendsUpdated = true
lb.cc.RemoveSubConn(sc)
delete(lb.subConns, a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
}
}
return backendsUpdated
}
func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) error {
for {
reply, err := s.Recv()
if err != nil {
return fmt.Errorf("grpclb: failed to recv server list: %v", err)
}
if serverList := reply.GetServerList(); serverList != nil {
lb.processServerList(serverList)
}
}
}
func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
case <-s.Context().Done():
return
}
stats := lb.clientStats.toClientStats()
t := time.Now()
stats.Timestamp = &lbpb.Timestamp{
Seconds: t.Unix(),
Nanos: int32(t.Nanosecond()),
}
if err := s.Send(&lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
ClientStats: stats,
},
}); err != nil {
return
}
}
}
func (lb *lbBalancer) callRemoteBalancer() error {
lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := lbClient.BalanceLoad(ctx, FailFast(false))
if err != nil {
return fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
}
// grpclb handshake on the stream.
initReq := &lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
InitialRequest: &lbpb.InitialLoadBalanceRequest{
Name: lb.target,
},
},
}
if err := stream.Send(initReq); err != nil {
return fmt.Errorf("grpclb: failed to send init request: %v", err)
}
reply, err := stream.Recv()
if err != nil {
return fmt.Errorf("grpclb: failed to recv init response: %v", err)
}
initResp := reply.GetInitialResponse()
if initResp == nil {
return fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
}
if initResp.LoadBalancerDelegate != "" {
return fmt.Errorf("grpclb: Delegation is not supported")
}
go func() {
if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
lb.sendLoadReport(stream, d)
}
}()
return lb.readServerList(stream)
}
func (lb *lbBalancer) watchRemoteBalancer() {
for {
err := lb.callRemoteBalancer()
select {
case <-lb.doneCh:
return
default:
if err != nil {
grpclog.Error(err)
}
}
}
}
func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
var dopts []DialOption
if creds := lb.opt.DialCreds; creds != nil {
if err := creds.OverrideServerName(remoteLBName); err == nil {
dopts = append(dopts, WithTransportCredentials(creds))
} else {
grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v, using Insecure", err)
dopts = append(dopts, WithInsecure())
}
} else {
dopts = append(dopts, WithInsecure())
}
if lb.opt.Dialer != nil {
// WithDialer takes a different type of function, so we instead use a
// special DialOption here.
dopts = append(dopts, withContextDialer(lb.opt.Dialer))
}
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, WithBalancerName(PickFirstBalancerName))
dopts = append(dopts, withResolverBuilder(lb.manualResolver))
// Dial using manualResolver.Scheme, which is a random scheme generated
// when init grpclb. The target name is not important.
cc, err := Dial("grpclb:///grpclb.server", dopts...)
if err != nil {
grpclog.Fatalf("failed to dial: %v", err)
}
lb.ccRemoteLB = cc
go lb.watchRemoteBalancer()
}
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
// The parent ClientConn should re-resolve when grpclb loses connection to the
// remote balancer. When the ClientConn inside grpclb gets a TransientFailure,
// it calls lbManualResolver.ResolveNow(), which calls parent ClientConn's
// ResolveNow, and eventually results in re-resolve happening in parent
// ClientConn's resolver (DNS for example).
//
// parent
// ClientConn
// +-----------------------------------------------------------------+
// | parent +---------------------------------+ |
// | DNS ClientConn | grpclb | |
// | resolver balancerWrapper | | |
// | + + | grpclb grpclb | |
// | | | | ManualResolver ClientConn | |
// | | | | + + | |
// | | | | | | Transient | |
// | | | | | | Failure | |
// | | | | | <--------- | | |
// | | | <--------------- | ResolveNow | | |
// | | <--------- | ResolveNow | | | | |
// | | ResolveNow | | | | | |
// | | | | | | | |
// | + + | + + | |
// | +---------------------------------+ |
// +-----------------------------------------------------------------+
// lbManualResolver is used by the ClientConn inside grpclb. It's a manual
// resolver with a special ResolveNow() function.
//
// When ResolveNow() is called, it calls ResolveNow() on the parent ClientConn,
// so when grpclb client lose contact with remote balancers, the parent
// ClientConn's resolver will re-resolve.
type lbManualResolver struct {
scheme string
ccr resolver.ClientConn
ccb balancer.ClientConn
}
func (r *lbManualResolver) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) {
r.ccr = cc
return r, nil
}
func (r *lbManualResolver) Scheme() string {
return r.scheme
}
// ResolveNow calls resolveNow on the parent ClientConn.
func (r *lbManualResolver) ResolveNow(o resolver.ResolveNowOption) {
r.ccb.ResolveNow(o)
}
// Close is a noop for Resolver.
func (*lbManualResolver) Close() {}
// NewAddress calls cc.NewAddress.
func (r *lbManualResolver) NewAddress(addrs []resolver.Address) {
r.ccr.NewAddress(addrs)
}
// NewServiceConfig calls cc.NewServiceConfig.
func (r *lbManualResolver) NewServiceConfig(sc string) {
r.ccr.NewServiceConfig(sc)
}
...@@ -105,18 +105,21 @@ func Fatalln(args ...interface{}) { ...@@ -105,18 +105,21 @@ func Fatalln(args ...interface{}) {
} }
// Print prints to the logger. Arguments are handled in the manner of fmt.Print. // Print prints to the logger. Arguments are handled in the manner of fmt.Print.
//
// Deprecated: use Info. // Deprecated: use Info.
func Print(args ...interface{}) { func Print(args ...interface{}) {
logger.Info(args...) logger.Info(args...)
} }
// Printf prints to the logger. Arguments are handled in the manner of fmt.Printf. // Printf prints to the logger. Arguments are handled in the manner of fmt.Printf.
//
// Deprecated: use Infof. // Deprecated: use Infof.
func Printf(format string, args ...interface{}) { func Printf(format string, args ...interface{}) {
logger.Infof(format, args...) logger.Infof(format, args...)
} }
// Println prints to the logger. Arguments are handled in the manner of fmt.Println. // Println prints to the logger. Arguments are handled in the manner of fmt.Println.
//
// Deprecated: use Infoln. // Deprecated: use Infoln.
func Println(args ...interface{}) { func Println(args ...interface{}) {
logger.Infoln(args...) logger.Infoln(args...)
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package grpclog package grpclog
// Logger mimics golang's standard Logger as an interface. // Logger mimics golang's standard Logger as an interface.
//
// Deprecated: use LoggerV2. // Deprecated: use LoggerV2.
type Logger interface { type Logger interface {
Fatal(args ...interface{}) Fatal(args ...interface{})
...@@ -31,6 +32,7 @@ type Logger interface { ...@@ -31,6 +32,7 @@ type Logger interface {
// SetLogger sets the logger that is used in grpc. Call only from // SetLogger sets the logger that is used in grpc. Call only from
// init() functions. // init() functions.
//
// Deprecated: use SetLoggerV2. // Deprecated: use SetLoggerV2.
func SetLogger(l Logger) { func SetLogger(l Logger) {
logger = &loggerWrapper{Logger: l} logger = &loggerWrapper{Logger: l}
......
...@@ -48,7 +48,9 @@ type UnaryServerInfo struct { ...@@ -48,7 +48,9 @@ type UnaryServerInfo struct {
} }
// UnaryHandler defines the handler invoked by UnaryServerInterceptor to complete the normal // UnaryHandler defines the handler invoked by UnaryServerInterceptor to complete the normal
// execution of a unary RPC. // execution of a unary RPC. If a UnaryHandler returns an error, it should be produced by the
// status package, or else gRPC will use codes.Unknown as the status code and err.Error() as
// the status message of the RPC.
type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error) type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error)
// UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info // UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info
......
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package backoff implement the backoff strategy for gRPC.
//
// This is kept in internal until the gRPC project decides whether or not to
// allow alternative backoff strategies.
package backoff
import (
"time"
"google.golang.org/grpc/internal/grpcrand"
)
// Strategy defines the methodology for backing off after a grpc connection
// failure.
//
type Strategy interface {
// Backoff returns the amount of time to wait before the next retry given
// the number of consecutive failures.
Backoff(retries int) time.Duration
}
const (
// baseDelay is the amount of time to wait before retrying after the first
// failure.
baseDelay = 1.0 * time.Second
// factor is applied to the backoff after each retry.
factor = 1.6
// jitter provides a range to randomize backoff delays.
jitter = 0.2
)
// Exponential implements exponential backoff algorithm as defined in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
type Exponential struct {
// MaxDelay is the upper bound of backoff delay.
MaxDelay time.Duration
}
// Backoff returns the amount of time to wait before the next retry given the
// number of retries.
func (bc Exponential) Backoff(retries int) time.Duration {
if retries == 0 {
return baseDelay
}
backoff, max := float64(baseDelay), float64(bc.MaxDelay)
for backoff < max && retries > 0 {
backoff *= factor
retries--
}
if backoff > max {
backoff = max
}
// Randomize backoff delays so that if a cluster of requests start at
// the same time, they won't operate in lockstep.
backoff *= 1 + jitter*(grpcrand.Float64()*2-1)
if backoff < 0 {
return 0
}
return time.Duration(backoff)
}
This diff is collapsed.
This diff is collapsed.
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package grpcrand implements math/rand functions in a concurrent-safe way
// with a global random source, independent of math/rand's global source.
package grpcrand
import (
"math/rand"
"sync"
"time"
)
var (
r = rand.New(rand.NewSource(time.Now().UnixNano()))
mu sync.Mutex
)
// Int63n implements rand.Int63n on the grpcrand global source.
func Int63n(n int64) int64 {
mu.Lock()
res := r.Int63n(n)
mu.Unlock()
return res
}
// Intn implements rand.Intn on the grpcrand global source.
func Intn(n int) int {
mu.Lock()
res := r.Intn(n)
mu.Unlock()
return res
}
// Float64 implements rand.Float64 on the grpcrand global source.
func Float64() float64 {
mu.Lock()
res := r.Float64()
mu.Unlock()
return res
}
...@@ -15,13 +15,22 @@ ...@@ -15,13 +15,22 @@
* *
*/ */
// Package internal contains gRPC-internal code for testing, to avoid polluting // Package internal contains gRPC-internal code, to avoid polluting
// the godoc of the top-level grpc package. // the godoc of the top-level grpc package. It must not import any grpc
// symbols to avoid circular dependencies.
package internal package internal
// TestingUseHandlerImpl enables the http.Handler-based server implementation. var (
// It must be called before Serve and requires TLS credentials.
// // TestingUseHandlerImpl enables the http.Handler-based server implementation.
// The provided grpcServer must be of type *grpc.Server. It is untyped // It must be called before Serve and requires TLS credentials.
// for circular dependency reasons. //
var TestingUseHandlerImpl func(grpcServer interface{}) // The provided grpcServer must be of type *grpc.Server. It is untyped
// for circular dependency reasons.
TestingUseHandlerImpl func(grpcServer interface{})
// WithContextDialer is exported by clientconn.go
WithContextDialer interface{} // func(context.Context, string) (net.Conn, error) grpc.DialOption
// WithResolverBuilder is exported by clientconn.go
WithResolverBuilder interface{} // func (resolver.Builder) grpc.DialOption
)
...@@ -17,7 +17,8 @@ ...@@ -17,7 +17,8 @@
*/ */
// Package metadata define the structure of the metadata supported by gRPC library. // Package metadata define the structure of the metadata supported by gRPC library.
// Please refer to https://grpc.io/docs/guides/wire.html for more information about custom-metadata. // Please refer to https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
// for more information about custom-metadata.
package metadata // import "google.golang.org/grpc/metadata" package metadata // import "google.golang.org/grpc/metadata"
import ( import (
...@@ -27,7 +28,9 @@ import ( ...@@ -27,7 +28,9 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
) )
// DecodeKeyValue returns k, v, nil. It is deprecated and should not be used. // DecodeKeyValue returns k, v, nil.
//
// Deprecated: use k and v directly instead.
func DecodeKeyValue(k, v string) (string, string, error) { func DecodeKeyValue(k, v string) (string, string, error) {
return k, v, nil return k, v, nil
} }
...@@ -94,6 +97,30 @@ func (md MD) Copy() MD { ...@@ -94,6 +97,30 @@ func (md MD) Copy() MD {
return Join(md) return Join(md)
} }
// Get obtains the values for a given key.
func (md MD) Get(k string) []string {
k = strings.ToLower(k)
return md[k]
}
// Set sets the value of a given key with a slice of values.
func (md MD) Set(k string, vals ...string) {
if len(vals) == 0 {
return
}
k = strings.ToLower(k)
md[k] = vals
}
// Append adds the values to key k, not overwriting what was already stored at that key.
func (md MD) Append(k string, vals ...string) {
if len(vals) == 0 {
return
}
k = strings.ToLower(k)
md[k] = append(md[k], vals...)
}
// Join joins any number of mds into a single MD. // Join joins any number of mds into a single MD.
// The order of values for each key is determined by the order in which // The order of values for each key is determined by the order in which
// the mds containing those values are presented to Join. // the mds containing those values are presented to Join.
...@@ -115,9 +142,26 @@ func NewIncomingContext(ctx context.Context, md MD) context.Context { ...@@ -115,9 +142,26 @@ func NewIncomingContext(ctx context.Context, md MD) context.Context {
return context.WithValue(ctx, mdIncomingKey{}, md) return context.WithValue(ctx, mdIncomingKey{}, md)
} }
// NewOutgoingContext creates a new context with outgoing md attached. // NewOutgoingContext creates a new context with outgoing md attached. If used
// in conjunction with AppendToOutgoingContext, NewOutgoingContext will
// overwrite any previously-appended metadata.
func NewOutgoingContext(ctx context.Context, md MD) context.Context { func NewOutgoingContext(ctx context.Context, md MD) context.Context {
return context.WithValue(ctx, mdOutgoingKey{}, md) return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md})
}
// AppendToOutgoingContext returns a new context with the provided kv merged
// with any existing metadata in the context. Please refer to the
// documentation of Pairs for a description of kv.
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context {
if len(kv)%2 == 1 {
panic(fmt.Sprintf("metadata: AppendToOutgoingContext got an odd number of input pairs for metadata: %d", len(kv)))
}
md, _ := ctx.Value(mdOutgoingKey{}).(rawMD)
added := make([][]string, len(md.added)+1)
copy(added, md.added)
added[len(added)-1] = make([]string, len(kv))
copy(added[len(added)-1], kv)
return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md.md, added: added})
} }
// FromIncomingContext returns the incoming metadata in ctx if it exists. The // FromIncomingContext returns the incoming metadata in ctx if it exists. The
...@@ -128,10 +172,39 @@ func FromIncomingContext(ctx context.Context) (md MD, ok bool) { ...@@ -128,10 +172,39 @@ func FromIncomingContext(ctx context.Context) (md MD, ok bool) {
return return
} }
// FromOutgoingContextRaw returns the un-merged, intermediary contents
// of rawMD. Remember to perform strings.ToLower on the keys. The returned
// MD should not be modified. Writing to it may cause races. Modification
// should be made to copies of the returned MD.
//
// This is intended for gRPC-internal use ONLY.
func FromOutgoingContextRaw(ctx context.Context) (MD, [][]string, bool) {
raw, ok := ctx.Value(mdOutgoingKey{}).(rawMD)
if !ok {
return nil, nil, false
}
return raw.md, raw.added, true
}
// FromOutgoingContext returns the outgoing metadata in ctx if it exists. The // FromOutgoingContext returns the outgoing metadata in ctx if it exists. The
// returned MD should not be modified. Writing to it may cause races. // returned MD should not be modified. Writing to it may cause races.
// Modification should be made to the copies of the returned MD. // Modification should be made to copies of the returned MD.
func FromOutgoingContext(ctx context.Context) (md MD, ok bool) { func FromOutgoingContext(ctx context.Context) (MD, bool) {
md, ok = ctx.Value(mdOutgoingKey{}).(MD) raw, ok := ctx.Value(mdOutgoingKey{}).(rawMD)
return if !ok {
return nil, false
}
mds := make([]MD, 0, len(raw.added)+1)
mds = append(mds, raw.md)
for _, vv := range raw.added {
mds = append(mds, Pairs(vv...))
}
return Join(mds...), ok
}
type rawMD struct {
md MD
added [][]string
} }
...@@ -153,10 +153,10 @@ type ipWatcher struct { ...@@ -153,10 +153,10 @@ type ipWatcher struct {
updateChan chan *Update updateChan chan *Update
} }
// Next returns the adrress resolution Update for the target. For IP address, // Next returns the address resolution Update for the target. For IP address,
// the resolution is itself, thus polling name server is unncessary. Therefore, // the resolution is itself, thus polling name server is unnecessary. Therefore,
// Next() will return an Update the first time it is called, and will be blocked // Next() will return an Update the first time it is called, and will be blocked
// for all following calls as no Update exisits until watcher is closed. // for all following calls as no Update exists until watcher is closed.
func (i *ipWatcher) Next() ([]*Update, error) { func (i *ipWatcher) Next() ([]*Update, error) {
u, ok := <-i.updateChan u, ok := <-i.updateChan
if !ok { if !ok {
......
...@@ -18,20 +18,26 @@ ...@@ -18,20 +18,26 @@
// Package naming defines the naming API and related data structures for gRPC. // Package naming defines the naming API and related data structures for gRPC.
// The interface is EXPERIMENTAL and may be suject to change. // The interface is EXPERIMENTAL and may be suject to change.
//
// Deprecated: please use package resolver.
package naming package naming
// Operation defines the corresponding operations for a name resolution change. // Operation defines the corresponding operations for a name resolution change.
//
// Deprecated: please use package resolver.
type Operation uint8 type Operation uint8
const ( const (
// Add indicates a new address is added. // Add indicates a new address is added.
Add Operation = iota Add Operation = iota
// Delete indicates an exisiting address is deleted. // Delete indicates an existing address is deleted.
Delete Delete
) )
// Update defines a name resolution update. Notice that it is not valid having both // Update defines a name resolution update. Notice that it is not valid having both
// empty string Addr and nil Metadata in an Update. // empty string Addr and nil Metadata in an Update.
//
// Deprecated: please use package resolver.
type Update struct { type Update struct {
// Op indicates the operation of the update. // Op indicates the operation of the update.
Op Operation Op Operation
...@@ -43,12 +49,16 @@ type Update struct { ...@@ -43,12 +49,16 @@ type Update struct {
} }
// Resolver creates a Watcher for a target to track its resolution changes. // Resolver creates a Watcher for a target to track its resolution changes.
//
// Deprecated: please use package resolver.
type Resolver interface { type Resolver interface {
// Resolve creates a Watcher for target. // Resolve creates a Watcher for target.
Resolve(target string) (Watcher, error) Resolve(target string) (Watcher, error)
} }
// Watcher watches for the updates on the specified target. // Watcher watches for the updates on the specified target.
//
// Deprecated: please use package resolver.
type Watcher interface { type Watcher interface {
// Next blocks until an update or error happens. It may return one or more // Next blocks until an update or error happens. It may return one or more
// updates. The first call should get the full set of the results. It should // updates. The first call should get the full set of the results. It should
......
...@@ -24,7 +24,6 @@ import ( ...@@ -24,7 +24,6 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"math/rand"
"net" "net"
"os" "os"
"strconv" "strconv"
...@@ -34,6 +33,7 @@ import ( ...@@ -34,6 +33,7 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
) )
...@@ -50,7 +50,9 @@ const ( ...@@ -50,7 +50,9 @@ const (
txtAttribute = "grpc_config=" txtAttribute = "grpc_config="
) )
var errMissingAddr = errors.New("missing address") var (
errMissingAddr = errors.New("missing address")
)
// NewBuilder creates a dnsBuilder which is used to factory DNS resolvers. // NewBuilder creates a dnsBuilder which is used to factory DNS resolvers.
func NewBuilder() resolver.Builder { func NewBuilder() resolver.Builder {
...@@ -64,6 +66,9 @@ type dnsBuilder struct { ...@@ -64,6 +66,9 @@ type dnsBuilder struct {
// Build creates and starts a DNS resolver that watches the name resolution of the target. // Build creates and starts a DNS resolver that watches the name resolution of the target.
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) { func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
if target.Authority != "" {
return nil, fmt.Errorf("Default DNS resolver does not support custom DNS server")
}
host, port, err := parseTarget(target.Endpoint) host, port, err := parseTarget(target.Endpoint)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -95,6 +100,7 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts ...@@ -95,6 +100,7 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
cc: cc, cc: cc,
t: time.NewTimer(0), t: time.NewTimer(0),
rn: make(chan struct{}, 1), rn: make(chan struct{}, 1),
disableServiceConfig: opts.DisableServiceConfig,
} }
d.wg.Add(1) d.wg.Add(1)
...@@ -158,6 +164,7 @@ type dnsResolver struct { ...@@ -158,6 +164,7 @@ type dnsResolver struct {
// will warns lookup (READ the lookup function pointers) inside watcher() goroutine // will warns lookup (READ the lookup function pointers) inside watcher() goroutine
// has data race with replaceNetFunc (WRITE the lookup function pointers). // has data race with replaceNetFunc (WRITE the lookup function pointers).
wg sync.WaitGroup wg sync.WaitGroup
disableServiceConfig bool
} }
// ResolveNow invoke an immediate resolution of the target that this dnsResolver watches. // ResolveNow invoke an immediate resolution of the target that this dnsResolver watches.
...@@ -187,7 +194,7 @@ func (d *dnsResolver) watcher() { ...@@ -187,7 +194,7 @@ func (d *dnsResolver) watcher() {
result, sc := d.lookup() result, sc := d.lookup()
// Next lookup should happen after an interval defined by d.freq. // Next lookup should happen after an interval defined by d.freq.
d.t.Reset(d.freq) d.t.Reset(d.freq)
d.cc.NewServiceConfig(string(sc)) d.cc.NewServiceConfig(sc)
d.cc.NewAddress(result) d.cc.NewAddress(result)
} }
} }
...@@ -202,7 +209,7 @@ func (d *dnsResolver) lookupSRV() []resolver.Address { ...@@ -202,7 +209,7 @@ func (d *dnsResolver) lookupSRV() []resolver.Address {
for _, s := range srvs { for _, s := range srvs {
lbAddrs, err := lookupHost(d.ctx, s.Target) lbAddrs, err := lookupHost(d.ctx, s.Target)
if err != nil { if err != nil {
grpclog.Warningf("grpc: failed load banlacer address dns lookup due to %v.\n", err) grpclog.Infof("grpc: failed load balancer address dns lookup due to %v.\n", err)
continue continue
} }
for _, a := range lbAddrs { for _, a := range lbAddrs {
...@@ -221,7 +228,7 @@ func (d *dnsResolver) lookupSRV() []resolver.Address { ...@@ -221,7 +228,7 @@ func (d *dnsResolver) lookupSRV() []resolver.Address {
func (d *dnsResolver) lookupTXT() string { func (d *dnsResolver) lookupTXT() string {
ss, err := lookupTXT(d.ctx, d.host) ss, err := lookupTXT(d.ctx, d.host)
if err != nil { if err != nil {
grpclog.Warningf("grpc: failed dns TXT record lookup due to %v.\n", err) grpclog.Infof("grpc: failed dns TXT record lookup due to %v.\n", err)
return "" return ""
} }
var res string var res string
...@@ -257,10 +264,12 @@ func (d *dnsResolver) lookupHost() []resolver.Address { ...@@ -257,10 +264,12 @@ func (d *dnsResolver) lookupHost() []resolver.Address {
} }
func (d *dnsResolver) lookup() ([]resolver.Address, string) { func (d *dnsResolver) lookup() ([]resolver.Address, string) {
var newAddrs []resolver.Address newAddrs := d.lookupSRV()
newAddrs = d.lookupSRV()
// Support fallback to non-balancer address. // Support fallback to non-balancer address.
newAddrs = append(newAddrs, d.lookupHost()...) newAddrs = append(newAddrs, d.lookupHost()...)
if d.disableServiceConfig {
return newAddrs, ""
}
sc := d.lookupTXT() sc := d.lookupTXT()
return newAddrs, canaryingSC(sc) return newAddrs, canaryingSC(sc)
} }
...@@ -339,12 +348,7 @@ func chosenByPercentage(a *int) bool { ...@@ -339,12 +348,7 @@ func chosenByPercentage(a *int) bool {
if a == nil { if a == nil {
return true return true
} }
s := rand.NewSource(time.Now().UnixNano()) return grpcrand.Intn(100)+1 <= *a
r := rand.New(s)
if r.Intn(100)+1 > *a {
return false
}
return true
} }
func canaryingSC(js string) string { func canaryingSC(js string) string {
......
This diff is collapsed.
This diff is collapsed.
...@@ -169,6 +169,8 @@ func (s *OutTrailer) isRPCStats() {} ...@@ -169,6 +169,8 @@ func (s *OutTrailer) isRPCStats() {}
type End struct { type End struct {
// Client is true if this End is from client side. // Client is true if this End is from client side.
Client bool Client bool
// BeginTime is the time when the RPC began.
BeginTime time.Time
// EndTime is the time when the RPC ends. // EndTime is the time when the RPC ends.
EndTime time.Time EndTime time.Time
// Error is the error the RPC ended with. It is an error generated from // Error is the error the RPC ended with. It is an error generated from
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment