register.go 5.44 KB
Newer Older
1 2 3
package builds

import (
4 5
	"encoding/json"
	"errors"
6 7 8
	"net/http"
	"time"

9
	"github.com/prometheus/client_golang/prometheus"
10

11 12 13
	"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
	"gitlab.com/gitlab-org/gitlab-workhorse/internal/redis"
)
14

15
const (
16 17 18 19
	maxRegisterBodySize         = 32 * 1024
	runnerBuildQueue            = "runner:build_queue:"
	runnerBuildQueueHeaderKey   = "Gitlab-Ci-Builds-Polling"
	runnerBuildQueueHeaderValue = "yes"
20
)
21

22
var (
23
	registerHandlerRequests = prometheus.NewCounterVec(
24
		prometheus.CounterOpts{
25
			Name: "gitlab_workhorse_builds_register_handler_requests",
Kamil Trzcinski's avatar
Kamil Trzcinski committed
26
			Help: "Describes how many requests in different states hit a register handler",
27 28 29
		},
		[]string{"status"},
	)
Kamil Trzcinski's avatar
Kamil Trzcinski committed
30 31 32 33 34 35 36
	registerHandlerOpen = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "gitlab_workhorse_builds_register_handler_open",
			Help: "Describes how many requests is currently open in given state",
		},
		[]string{"state"},
	)
37

38 39 40 41
	registerHandlerOpenAtReading  = registerHandlerOpen.WithLabelValues("reading")
	registerHandlerOpenAtProxying = registerHandlerOpen.WithLabelValues("proxying")
	registerHandlerOpenAtWatching = registerHandlerOpen.WithLabelValues("watching")

42 43 44 45 46 47 48 49
	registerHandlerBodyReadErrors         = registerHandlerRequests.WithLabelValues("body-read-error")
	registerHandlerBodyParseErrors        = registerHandlerRequests.WithLabelValues("body-parse-error")
	registerHandlerMissingValues          = registerHandlerRequests.WithLabelValues("missing-values")
	registerHandlerWatchErrors            = registerHandlerRequests.WithLabelValues("watch-error")
	registerHandlerAlreadyChangedRequests = registerHandlerRequests.WithLabelValues("already-changed")
	registerHandlerSeenChangeRequests     = registerHandlerRequests.WithLabelValues("seen-change")
	registerHandlerTimeoutRequests        = registerHandlerRequests.WithLabelValues("timeout")
	registerHandlerNoChangeRequests       = registerHandlerRequests.WithLabelValues("no-change")
50 51
)

52
type largeBodyError struct{ error }
53

Kamil Trzcinski's avatar
Kamil Trzcinski committed
54 55
type WatchKeyHandler func(key, value string, timeout time.Duration) (redis.WatchKeyStatus, error)

56 57
func init() {
	prometheus.MustRegister(
58
		registerHandlerRequests,
Kamil Trzcinski's avatar
Kamil Trzcinski committed
59
		registerHandlerOpen,
60 61
	)
}
62

63 64 65
type runnerRequest struct {
	Token      string `json:"token,omitempty"`
	LastUpdate string `json:"last_update,omitempty"`
66 67
}

Kamil Trzcinski's avatar
Kamil Trzcinski committed
68
func readRunnerBody(w http.ResponseWriter, r *http.Request) ([]byte, error) {
69 70
	registerHandlerOpenAtReading.Inc()
	defer registerHandlerOpenAtReading.Dec()
Kamil Trzcinski's avatar
Kamil Trzcinski committed
71 72 73 74

	return helper.ReadRequestBody(w, r, maxRegisterBodySize)
}

Kamil Trzcinski's avatar
Kamil Trzcinski committed
75
func readRunnerRequest(r *http.Request, body []byte) (*runnerRequest, error) {
76
	if !helper.IsApplicationJson(r) {
Kamil Trzcinski's avatar
Kamil Trzcinski committed
77
		return nil, errors.New("invalid content-type received")
78 79
	}

Kamil Trzcinski's avatar
Kamil Trzcinski committed
80
	var runnerRequest runnerRequest
81 82
	err := json.Unmarshal(body, &runnerRequest)
	if err != nil {
Kamil Trzcinski's avatar
Kamil Trzcinski committed
83
		return nil, err
84 85
	}

Kamil Trzcinski's avatar
Kamil Trzcinski committed
86
	return &runnerRequest, nil
87 88
}

Kamil Trzcinski's avatar
Kamil Trzcinski committed
89
func proxyRegisterRequest(h http.Handler, w http.ResponseWriter, r *http.Request) {
90 91
	registerHandlerOpenAtProxying.Inc()
	defer registerHandlerOpenAtProxying.Dec()
Kamil Trzcinski's avatar
Kamil Trzcinski committed
92 93 94 95

	h.ServeHTTP(w, r)
}

Kamil Trzcinski's avatar
Kamil Trzcinski committed
96
func watchForRunnerChange(watchHandler WatchKeyHandler, token, lastUpdate string, duration time.Duration) (redis.WatchKeyStatus, error) {
97 98
	registerHandlerOpenAtWatching.Inc()
	defer registerHandlerOpenAtWatching.Dec()
Kamil Trzcinski's avatar
Kamil Trzcinski committed
99

Kamil Trzcinski's avatar
Kamil Trzcinski committed
100
	return watchHandler(runnerBuildQueue+token, lastUpdate, duration)
Kamil Trzcinski's avatar
Kamil Trzcinski committed
101 102
}

Kamil Trzcinski's avatar
Kamil Trzcinski committed
103
func RegisterHandler(h http.Handler, watchHandler WatchKeyHandler, pollingDuration time.Duration) http.Handler {
104 105 106 107 108
	if pollingDuration == 0 {
		return h
	}

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
109 110
		w.Header().Set(runnerBuildQueueHeaderKey, runnerBuildQueueHeaderValue)

Kamil Trzcinski's avatar
Kamil Trzcinski committed
111
		requestBody, err := readRunnerBody(w, r)
112
		if err != nil {
113
			registerHandlerBodyReadErrors.Inc()
114
			helper.RequestEntityTooLarge(w, r, &largeBodyError{err})
115 116 117
			return
		}

Kamil Trzcinski's avatar
Kamil Trzcinski committed
118 119
		newRequest := helper.CloneRequestWithNewBody(r, requestBody)

120 121
		runnerRequest, err := readRunnerRequest(r, requestBody)
		if err != nil {
122
			registerHandlerBodyParseErrors.Inc()
Kamil Trzcinski's avatar
Kamil Trzcinski committed
123
			proxyRegisterRequest(h, w, newRequest)
124 125 126
			return
		}

127
		if runnerRequest.Token == "" || runnerRequest.LastUpdate == "" {
128
			registerHandlerMissingValues.Inc()
Kamil Trzcinski's avatar
Kamil Trzcinski committed
129
			proxyRegisterRequest(h, w, newRequest)
130 131 132
			return
		}

Kamil Trzcinski's avatar
Kamil Trzcinski committed
133 134
		result, err := watchForRunnerChange(watchHandler, runnerRequest.Token,
			runnerRequest.LastUpdate, pollingDuration)
135
		if err != nil {
136
			registerHandlerWatchErrors.Inc()
137
			proxyRegisterRequest(h, w, newRequest)
138 139 140 141 142
			return
		}

		switch result {
		// It means that we detected a change before starting watching on change,
Kamil Trzcinski's avatar
Kamil Trzcinski committed
143
		// We proxy request to Rails, to see whether we have a build to receive
144
		case redis.WatchKeyStatusAlreadyChanged:
145
			registerHandlerAlreadyChangedRequests.Inc()
Kamil Trzcinski's avatar
Kamil Trzcinski committed
146
			proxyRegisterRequest(h, w, newRequest)
147 148 149 150 151

		// It means that we detected a change after watching.
		// We could potentially proxy request to Rails, but...
		// We can end-up with unreliable responses,
		// as don't really know whether ResponseWriter is still in a sane state,
Kamil Trzcinski's avatar
Kamil Trzcinski committed
152
		// for example the connection is dead
153
		case redis.WatchKeyStatusSeenChange:
154
			registerHandlerSeenChangeRequests.Inc()
155
			w.WriteHeader(http.StatusNoContent)
156 157 158 159

		// When we receive one of these statuses, it means that we detected no change,
		// so we return to runner 204, which means nothing got changed,
		// and there's no new builds to process
160
		case redis.WatchKeyStatusTimeout:
161
			registerHandlerTimeoutRequests.Inc()
162
			w.WriteHeader(http.StatusNoContent)
163 164

		case redis.WatchKeyStatusNoChange:
165
			registerHandlerNoChangeRequests.Inc()
166
			w.WriteHeader(http.StatusNoContent)
167 168 169
		}
	})
}