Commit f3f03271 authored by Jacob Vosmaer (GitLab)'s avatar Jacob Vosmaer (GitLab)

Merge branch 'queue-requests' into 'master'

Allow to queue API requests and limit given capacity

This MR implements an API queueing on Workhorse side.
It's meant to better control given capacity for different resources.

This is meant to solve: https://gitlab.com/gitlab-com/infrastructure/issues/320.

And make a large number of requests easier to handle: https://gitlab.com/gitlab-org/gitlab-ce/issues/21698

It fulfils these requirements:
- allow to limit capacity given to API, specifically to allow to process up to N-number of requests at single time,
- allow to queue API requests and timeout them, specifically it allows to slow down processing of API calls if the Unicorn can process the current API requests in reasonable time

The implementation is made as constant cost and it's dead simple.
It should not inflate the memory / CPU usage of Workhorse.

It works like this:
- we hook into processing of requests,
- we try to acquire slot for our request by pushing to buffered channel. The buffered channel actually limits number of processed requests at single time,
- if we can't push to channel it means that all concurrent slots are in use and we have to wait,
- we block on buffered channel for the free a slot, secondly we wait on timer to timeout on channel,
- we generate 502 if timeout occurs,
- we process request if we manage to push to channel,
- we pop from channel when we finish processing of requests, allowing other requests to fire,
- if there's already too many request (over `apiQueueLimit`) we return 429,

This introduces 3 extra parameters (off by default):
- `apiLimit` - limit number of concurrent API requests,
- `apiQueueLimit` - limit the backlog for queueing,
- `apiQueueTimeout` - duration after we timeout requests if they sit too long in queue.

This allows:
- limit used capacity to any number of available workers, ex. allowing for API to use at most 25% of capacity,
- slowly process requests in case of slowness,
- better manage the API calls then rate limiting requests,
- by slowing down we are automatically backing off all services using API,


See merge request !65
parents 0b970386 06400541
...@@ -13,6 +13,12 @@ gitlab-workhorse'][brief-history-blog]. ...@@ -13,6 +13,12 @@ gitlab-workhorse'][brief-history-blog].
gitlab-workhorse [OPTIONS] gitlab-workhorse [OPTIONS]
Options: Options:
-apiLimit uint
Number of API requests allowed at single time
-apiQueueDuration duration
Maximum queueing duration of requests (default 30s)
-apiQueueLimit uint
Number of API requests allowed to be queued
-authBackend string -authBackend string
Authentication/authorization backend (default "http://localhost:8080") Authentication/authorization backend (default "http://localhost:8080")
-authSocket string -authSocket string
......
...@@ -21,6 +21,18 @@ func LogError(r *http.Request, err error) { ...@@ -21,6 +21,18 @@ func LogError(r *http.Request, err error) {
printError(r, err) printError(r, err)
} }
func ServiceUnavailable(w http.ResponseWriter, r *http.Request, err error) {
http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
captureRavenError(r, err)
printError(r, err)
}
func TooManyRequests(w http.ResponseWriter, r *http.Request, err error) {
http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
captureRavenError(r, err)
printError(r, err)
}
func printError(r *http.Request, err error) { func printError(r *http.Request, err error) {
if r != nil { if r != nil {
log.Printf("error: %s %q: %v", r.Method, r.RequestURI, err) log.Printf("error: %s %q: %v", r.Method, r.RequestURI, err)
......
package queueing
import (
"errors"
"time"
)
type errTooManyRequests struct{ error }
type errQueueingTimedout struct{ error }
var ErrTooManyRequests = &errTooManyRequests{errors.New("too many requests queued")}
var ErrQueueingTimedout = &errQueueingTimedout{errors.New("queueing timedout")}
type Queue struct {
busyCh chan struct{}
waitingCh chan struct{}
}
// NewQueue creates a new queue
// limit specifies number of requests run concurrently
// queueLimit specifies maximum number of requests that can be queued
// if the number of requests is above the limit
func NewQueue(limit, queueLimit uint) *Queue {
return &Queue{
busyCh: make(chan struct{}, limit),
waitingCh: make(chan struct{}, limit+queueLimit),
}
}
// Acquire takes one slot from the Queue
// and returns when a request should be processed
// it allows up to (limit) of requests running at a time
// it allows to queue up to (queue-limit) requests
func (s *Queue) Acquire(timeout time.Duration) (err error) {
// push item to a queue to claim your own slot (non-blocking)
select {
case s.waitingCh <- struct{}{}:
break
default:
return ErrTooManyRequests
}
defer func() {
if err != nil {
<-s.waitingCh
}
}()
// fast path: push item to current processed items (non-blocking)
select {
case s.busyCh <- struct{}{}:
return nil
default:
break
}
timer := time.NewTimer(timeout)
defer timer.Stop()
// push item to current processed items (blocking)
select {
case s.busyCh <- struct{}{}:
return nil
case <-timer.C:
return ErrQueueingTimedout
}
}
// Release marks the finish of processing of requests
// It triggers next request to be processed if it's in queue
func (s *Queue) Release() {
// dequeue from queue to allow next request to be processed
<-s.waitingCh
<-s.busyCh
}
package queueing
import (
"testing"
"time"
)
func TestNormalQueueing(t *testing.T) {
q := NewQueue(2, 1)
err1 := q.Acquire(time.Microsecond)
if err1 != nil {
t.Fatal("we should acquire a new slot")
}
err2 := q.Acquire(time.Microsecond)
if err2 != nil {
t.Fatal("we should acquire a new slot")
}
err3 := q.Acquire(time.Microsecond)
if err3 != ErrQueueingTimedout {
t.Fatal("we should timeout")
}
q.Release()
err4 := q.Acquire(time.Microsecond)
if err4 != nil {
t.Fatal("we should acquire a new slot")
}
}
func TestQueueLimit(t *testing.T) {
q := NewQueue(1, 0)
err1 := q.Acquire(time.Microsecond)
if err1 != nil {
t.Fatal("we should acquire a new slot")
}
err2 := q.Acquire(time.Microsecond)
if err2 != ErrTooManyRequests {
t.Fatal("we should fail because of not enough slots in queue")
}
}
func TestQueueProcessing(t *testing.T) {
q := NewQueue(1, 1)
err1 := q.Acquire(time.Microsecond)
if err1 != nil {
t.Fatal("we should acquire a new slot")
}
go func() {
time.Sleep(50 * time.Microsecond)
q.Release()
}()
err2 := q.Acquire(time.Second)
if err2 != nil {
t.Fatal("we should acquire slot after the previous one finished")
}
}
package queueing
import (
"net/http"
"time"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
const DefaultTimeout = 30 * time.Second
func QueueRequests(h http.Handler, limit, queueLimit uint, queueTimeout time.Duration) http.Handler {
if limit == 0 {
return h
}
if queueTimeout == 0 {
queueTimeout = DefaultTimeout
}
queue := NewQueue(limit, queueLimit)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
err := queue.Acquire(queueTimeout)
switch err {
case nil:
defer queue.Release()
h.ServeHTTP(w, r)
case ErrTooManyRequests:
helper.TooManyRequests(w, r, err)
case ErrQueueingTimedout:
helper.ServiceUnavailable(w, r, err)
default:
helper.Fail500(w, r, err)
}
})
}
package queueing
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
)
var httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "OK")
})
func pausedHttpHandler(pauseCh chan struct{}) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-pauseCh
fmt.Fprintln(w, "OK")
})
}
func TestNormalRequestProcessing(t *testing.T) {
w := httptest.NewRecorder()
h := QueueRequests(httpHandler, 1, 1, time.Second)
h.ServeHTTP(w, nil)
if w.Code != 200 {
t.Fatal("QueueRequests should process request")
}
}
// testSlowRequestProcessing creates a new queue,
// then it runs a number of requests that are going through queue,
// we return the response of first finished request,
// where status of request can be 200, 429 or 503
func testSlowRequestProcessing(count int, limit, queueLimit uint, queueTimeout time.Duration) *httptest.ResponseRecorder {
pauseCh := make(chan struct{})
defer close(pauseCh)
handler := QueueRequests(pausedHttpHandler(pauseCh), limit, queueLimit, queueTimeout)
respCh := make(chan *httptest.ResponseRecorder, count)
// queue requests to use up the queue
for i := 0; i < count; i++ {
go func() {
w := httptest.NewRecorder()
handler.ServeHTTP(w, nil)
respCh <- w
}()
}
// dequeue first request
return <-respCh
}
// TestQueueingTimeout performs 2 requests
// the queue limit and length is 1,
// the second request gets timed-out
func TestQueueingTimeout(t *testing.T) {
w := testSlowRequestProcessing(2, 1, 1, time.Microsecond)
if w.Code != 503 {
t.Fatal("QueueRequests should timeout queued request")
}
}
// TestQueueingTooManyRequests performs 3 requests
// the queue limit and length is 1,
// so the third request has to be rejected with 429
func TestQueueingTooManyRequests(t *testing.T) {
w := testSlowRequestProcessing(3, 1, 1, time.Minute)
if w.Code != 429 {
t.Fatal("QueueRequests should return immediately and return too many requests")
}
}
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/git" "gitlab.com/gitlab-org/gitlab-workhorse/internal/git"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/lfs" "gitlab.com/gitlab-org/gitlab-workhorse/internal/lfs"
proxypkg "gitlab.com/gitlab-org/gitlab-workhorse/internal/proxy" proxypkg "gitlab.com/gitlab-org/gitlab-workhorse/internal/proxy"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/queueing"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/senddata" "gitlab.com/gitlab-org/gitlab-workhorse/internal/senddata"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/sendfile" "gitlab.com/gitlab-org/gitlab-workhorse/internal/sendfile"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/staticpages" "gitlab.com/gitlab-org/gitlab-workhorse/internal/staticpages"
...@@ -55,6 +56,7 @@ func (u *Upstream) configureRoutes() { ...@@ -55,6 +56,7 @@ func (u *Upstream) configureRoutes() {
git.SendPatch, git.SendPatch,
artifacts.SendEntry, artifacts.SendEntry,
) )
apiProxyQueue := queueing.QueueRequests(proxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
u.Routes = []route{ u.Routes = []route{
// Git Clone // Git Clone
...@@ -67,8 +69,8 @@ func (u *Upstream) configureRoutes() { ...@@ -67,8 +69,8 @@ func (u *Upstream) configureRoutes() {
route{"POST", regexp.MustCompile(ciAPIPattern + `v1/builds/[0-9]+/artifacts\z`), contentEncodingHandler(artifacts.UploadArtifacts(api, proxy))}, route{"POST", regexp.MustCompile(ciAPIPattern + `v1/builds/[0-9]+/artifacts\z`), contentEncodingHandler(artifacts.UploadArtifacts(api, proxy))},
// Explicitly proxy API requests // Explicitly proxy API requests
route{"", regexp.MustCompile(apiPattern), proxy}, route{"", regexp.MustCompile(apiPattern), apiProxyQueue},
route{"", regexp.MustCompile(ciAPIPattern), proxy}, route{"", regexp.MustCompile(ciAPIPattern), apiProxyQueue},
// Serve assets // Serve assets
route{"", regexp.MustCompile(`^/assets/`), route{"", regexp.MustCompile(`^/assets/`),
......
...@@ -20,30 +20,34 @@ import ( ...@@ -20,30 +20,34 @@ import (
var DefaultBackend = helper.URLMustParse("http://localhost:8080") var DefaultBackend = helper.URLMustParse("http://localhost:8080")
type Upstream struct { type Config struct {
Backend *url.URL Backend *url.URL
Version string Version string
SecretPath string SecretPath string
DocumentRoot string DocumentRoot string
DevelopmentMode bool DevelopmentMode bool
Socket string
ProxyHeadersTimeout time.Duration
APILimit uint
APIQueueLimit uint
APIQueueTimeout time.Duration
}
type Upstream struct {
Config
URLPrefix urlprefix.Prefix URLPrefix urlprefix.Prefix
Routes []route Routes []route
RoundTripper *badgateway.RoundTripper RoundTripper *badgateway.RoundTripper
} }
func NewUpstream(backend *url.URL, socket, version, secretFile, documentRoot string, developmentMode bool, proxyHeadersTimeout time.Duration) *Upstream { func NewUpstream(config Config) *Upstream {
up := Upstream{ up := Upstream{
Backend: backend, Config: config,
Version: version,
SecretPath: secretFile,
DocumentRoot: documentRoot,
DevelopmentMode: developmentMode,
} }
if backend == nil { if up.Backend == nil {
up.Backend = DefaultBackend up.Backend = DefaultBackend
} }
up.RoundTripper = badgateway.NewRoundTripper(up.Backend, socket, proxyHeadersTimeout) up.RoundTripper = badgateway.NewRoundTripper(up.Backend, up.Socket, up.ProxyHeadersTimeout)
up.configureURLPrefix() up.configureURLPrefix()
up.configureRoutes() up.configureRoutes()
return &up return &up
......
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
"syscall" "syscall"
"time" "time"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/queueing"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/upstream" "gitlab.com/gitlab-org/gitlab-workhorse/internal/upstream"
) )
...@@ -41,6 +42,9 @@ var documentRoot = flag.String("documentRoot", "public", "Path to static files c ...@@ -41,6 +42,9 @@ var documentRoot = flag.String("documentRoot", "public", "Path to static files c
var proxyHeadersTimeout = flag.Duration("proxyHeadersTimeout", 5*time.Minute, "How long to wait for response headers when proxying the request") var proxyHeadersTimeout = flag.Duration("proxyHeadersTimeout", 5*time.Minute, "How long to wait for response headers when proxying the request")
var developmentMode = flag.Bool("developmentMode", false, "Allow to serve assets from Rails app") var developmentMode = flag.Bool("developmentMode", false, "Allow to serve assets from Rails app")
var secretPath = flag.String("secretPath", "./.gitlab_workhorse_secret", "File with secret key to authenticate with authBackend") var secretPath = flag.String("secretPath", "./.gitlab_workhorse_secret", "File with secret key to authenticate with authBackend")
var apiLimit = flag.Uint("apiLimit", 0, "Number of API requests allowed at single time")
var apiQueueLimit = flag.Uint("apiQueueLimit", 0, "Number of API requests allowed to be queued")
var apiQueueTimeout = flag.Duration("apiQueueDuration", queueing.DefaultTimeout, "Maximum queueing duration of requests")
func main() { func main() {
flag.Usage = func() { flag.Usage = func() {
...@@ -89,16 +93,20 @@ func main() { ...@@ -89,16 +93,20 @@ func main() {
}() }()
} }
up := wrapRaven( upConfig := upstream.Config{
upstream.NewUpstream( Backend: backendURL,
backendURL, Socket: *authSocket,
*authSocket, Version: Version,
Version, SecretPath: *secretPath,
*secretPath, DocumentRoot: *documentRoot,
*documentRoot, DevelopmentMode: *developmentMode,
*developmentMode, ProxyHeadersTimeout: *proxyHeadersTimeout,
*proxyHeadersTimeout, APILimit: *apiLimit,
)) APIQueueLimit: *apiQueueLimit,
APIQueueTimeout: *apiQueueTimeout,
}
up := wrapRaven(upstream.NewUpstream(upConfig))
log.Fatal(http.Serve(listener, up)) log.Fatal(http.Serve(listener, up))
} }
...@@ -868,15 +868,14 @@ func archiveOKServer(t *testing.T, archiveName string) *httptest.Server { ...@@ -868,15 +868,14 @@ func archiveOKServer(t *testing.T, archiveName string) *httptest.Server {
} }
func startWorkhorseServer(authBackend string) *httptest.Server { func startWorkhorseServer(authBackend string) *httptest.Server {
u := upstream.NewUpstream( config := upstream.Config{
helper.URLMustParse(authBackend), Backend: helper.URLMustParse(authBackend),
"", Version: "123",
"123", SecretPath: testhelper.SecretPath(),
testhelper.SecretPath(), DocumentRoot: testDocumentRoot,
testDocumentRoot, }
false,
0, u := upstream.NewUpstream(config)
)
return httptest.NewServer(u) return httptest.NewServer(u)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment