Commit ed342b12 authored by Catalin Irimie

Send Geo proxied extra data through Workhorse HTTP headers

We need to find at runtime which node is proxying the current
request on the primary site, so we are also saving and sending
extra JWT-signed data through headers in Workhorse, similar to
the already existing "is proxied or not" header.
parent 25be580b
......@@ -41,7 +41,7 @@ module EE
return super unless ::Gitlab::Geo.secondary_with_primary?
if ::Gitlab::Geo.secondary_with_unified_url? || ::Feature.enabled?(:geo_secondary_proxy_separate_urls, default_enabled: :yaml)
{ geo_proxy_url: ::Gitlab::Geo.primary_node.internal_url }
{ geo_proxy_url: ::Gitlab::Geo.primary_node.internal_url, geo_proxy_extra_data: ::Gitlab::Geo.proxy_extra_data }
else
super
end
......
......@@ -12,6 +12,7 @@ module Gitlab
secondary_nodes
node_enabled
oauth_application
proxy_extra_data
).freeze
API_SCOPE = 'geo_api'
......@@ -32,6 +33,14 @@ module Gitlab
::Geo::JobArtifactReplicator
].freeze
# We "regenerate" a JWT that is valid for 1 hour every 30 minutes, so that
# at any given time the cached token remains valid for at least another
# 30 minutes, even if the internal API is unable to serve a new one for
# up to 30 minutes.
# The primary shouldn't hard fail if the token isn't valid; it just won't
# know which node is proxying.
PROXY_JWT_VALIDITY_PERIOD = 1.hour
PROXY_JWT_CACHE_EXPIRY = 30.minutes
# Returns the result of GeoNode.current_node, fetched through cache_value
# under the :current_node key and deserialized as a GeoNode.
def self.current_node
  cache_value(:current_node, as: GeoNode) do
    GeoNode.current_node
  end
end
......@@ -44,6 +53,24 @@ module Gitlab
self.cache_value(:secondary_nodes, as: GeoNode) { GeoNode.secondary_nodes }
end
# Cached wrapper around .uncached_proxy_extra_data.
#
# The value is stored under the :proxy_extra_data key; only the L2 cache
# expiry is overridden (with PROXY_JWT_CACHE_EXPIRY), the L1 expiry keeps
# the cache_value default.
def self.proxy_extra_data
  cache_value(:proxy_extra_data, l2_cache_expires_in: PROXY_JWT_CACHE_EXPIRY) do
    uncached_proxy_extra_data
  end
end
# Builds the extra data that can be sent along with every proxied request.
#
# For now only the signing node is of interest, and that can be derived
# from the signing key, so the signed payload itself is an empty hash.
#
# Returns whatever SignedData#sign_and_encode_data produces, or nil when
# GeoNodeNotFoundError or OpenSSL::Cipher::CipherError is raised.
def self.uncached_proxy_extra_data
  signer = Gitlab::Geo::SignedData.new(
    geo_node: current_node,
    validity_period: PROXY_JWT_VALIDITY_PERIOD
  )

  signer.sign_and_encode_data({})
rescue GeoNodeNotFoundError, OpenSSL::Cipher::CipherError
  nil
end
def self.connected?
# GeoNode#connected? only attempts to use existing DB connections so it can't
# be relied upon in initializers, without this active DB connectivity check.
......@@ -122,10 +149,11 @@ module Gitlab
SafeRequestStore[:geo_l2_cache] ||= Gitlab::JsonCache.new(namespace: :geo, cache_key_strategy: :version)
end
def self.cache_value(raw_key, as: nil, &block)
# We need a short expire time as we can't manually expire on a secondary node
l1_cache.fetch(raw_key, as: as, expires_in: 1.minute) do
l2_cache.fetch(raw_key, as: as, expires_in: 2.minutes) { yield }
# Default to a short expire time as we can't manually expire on a secondary node
# so short-lived or data that can get frequently updated doesn't persist too much
def self.cache_value(raw_key, as: nil, l1_cache_expires_in: 1.minute, l2_cache_expires_in: 2.minutes, &block)
l1_cache.fetch(raw_key, as: as, expires_in: l1_cache_expires_in) do
l2_cache.fetch(raw_key, as: as, expires_in: l2_cache_expires_in) { yield }
end
end
......
......@@ -9,14 +9,14 @@ RSpec.describe Gitlab::Geo, :geo, :request_store do
let_it_be(:primary_node) { create(:geo_node, :primary) }
let_it_be(:secondary_node) { create(:geo_node) }
shared_examples 'a Geo cached value' do |method, key|
shared_examples 'a Geo cached value' do |method, key, expected_l1_expiry: 1.minute, expected_l2_expiry: 2.minutes|
it 'includes GitLab version and Rails.version in the cache key' do
expanded_key = "geo:#{key}:#{Gitlab::VERSION}:#{Rails.version}"
expect(Gitlab::ProcessMemoryCache.cache_backend).to receive(:write)
.with(expanded_key, an_instance_of(String), expires_in: 1.minute).and_call_original
.with(expanded_key, an_instance_of(String), expires_in: expected_l1_expiry).and_call_original
expect(Rails.cache).to receive(:write)
.with(expanded_key, an_instance_of(String), expires_in: 2.minutes)
.with(expanded_key, an_instance_of(String), expires_in: expected_l2_expiry)
described_class.public_send(method)
end
......@@ -48,6 +48,63 @@ RSpec.describe Gitlab::Geo, :geo, :request_store do
it_behaves_like 'a Geo cached value', :secondary_nodes, :secondary_nodes
end
describe '.proxy_extra_data' do
  before do
    # Stub the uncached computation so only the caching wrapper is exercised.
    expect(described_class).to receive(:uncached_proxy_extra_data).and_return('proxy extra data')
  end

  it 'caches the result of .uncached_proxy_extra_data' do
    # Compare by value: `be` checks object identity (equal?), which for a
    # String only passes when the very same object comes back.
    expect(described_class.proxy_extra_data).to eq('proxy extra data')
  end

  # Only the L2 expiry differs from the shared example's defaults.
  it_behaves_like 'a Geo cached value',
    :proxy_extra_data,
    :proxy_extra_data,
    expected_l2_expiry: ::Gitlab::Geo::PROXY_JWT_CACHE_EXPIRY
end
# Specs for the uncached builder: the return value is expected to be an
# "<access key>:<JWT>" string, or nil when no node is available or signing fails.
describe '.uncached_proxy_extra_data' do
subject(:extra_data) { described_class.uncached_proxy_extra_data }
# No current Geo node is stubbed here, so no signed data is expected.
context 'without a geo node' do
it { is_expected.to be_nil }
end
context 'with an existing Geo node' do
# The value is split on ':' — first part is compared to the node's access key,
# second part is decoded as a JWT using the node's secret access key.
let(:parsed_access_key) { extra_data.split(':').first }
let(:jwt) { JWT.decode(extra_data.split(':').second, secondary_node.secret_access_key) }
# The JWT payload carries the signed data as a JSON string under 'data'.
let(:decoded_extra_data) { Gitlab::Json.parse(jwt.first['data']) }
before do
stub_current_geo_node(secondary_node)
end
it 'generates a valid JWT' do
expect(parsed_access_key).to eq(secondary_node.access_key)
expect(decoded_extra_data).to eq({})
end
it 'sets the expected expiration time' do
# `jwt` is a lazy `let`, so the token is first generated inside the frozen
# time block and its 'exp' can be compared against an exact timestamp.
freeze_time do
expect(jwt.first['exp']).to eq((Time.zone.now + Gitlab::Geo::PROXY_JWT_VALIDITY_PERIOD).to_i)
end
end
end
context 'when signing the JWT token raises errors' do
# Both error classes listed here are the ones the method rescues.
where(:error) { [Gitlab::Geo::GeoNodeNotFoundError, OpenSSL::Cipher::CipherError] }
with_them do
before do
expect_next_instance_of(Gitlab::Geo::SignedData) do |instance|
expect(instance).to receive(:sign_and_encode_data).and_raise(error)
end
end
it { is_expected.to be_nil }
end
end
end
describe '.primary?' do
context 'when current node is a primary node' do
before do
......
......@@ -679,11 +679,18 @@ RSpec.describe API::Geo do
end
context 'when a primary exists' do
it 'returns the primary internal URL' do
it 'returns the primary internal URL and extra proxy data' do
subject
expect(response).to have_gitlab_http_status(:ok)
expect(json_response['geo_proxy_url']).to match(primary_node.internal_url)
proxy_extra_data = json_response['geo_proxy_extra_data']
jwt = JWT.decode(proxy_extra_data.split(':').second, unified_url_secondary_node.secret_access_key)
extra_data = Gitlab::Json.parse(jwt.first['data'])
expect(proxy_extra_data.split(':').first).to match(unified_url_secondary_node.access_key)
expect(extra_data).to eq({})
end
end
......@@ -729,11 +736,18 @@ RSpec.describe API::Geo do
end
context 'when geo_secondary_proxy_separate_urls feature flag is enabled' do
it 'returns the primary internal URL' do
it 'returns the primary internal URL and extra proxy data' do
subject
expect(response).to have_gitlab_http_status(:ok)
expect(json_response['geo_proxy_url']).to match(primary_node.internal_url)
proxy_extra_data = json_response['geo_proxy_extra_data']
jwt = JWT.decode(proxy_extra_data.split(':').second, secondary_node.secret_access_key)
extra_data = Gitlab::Json.parse(jwt.first['data'])
expect(proxy_extra_data.split(':').first).to match(secondary_node.access_key)
expect(extra_data).to eq({})
end
end
end
......
......@@ -64,7 +64,13 @@ func NewAPI(myURL *url.URL, version string, roundTripper http.RoundTripper) *API
}
type GeoProxyEndpointResponse struct {
GeoProxyURL string `json:"geo_proxy_url"`
GeoProxyURL string `json:"geo_proxy_url"`
GeoProxyExtraData string `json:"geo_proxy_extra_data"`
}
// GeoProxyData is the parsed form of the Geo proxy endpoint response, as
// returned by GetGeoProxyData.
type GeoProxyData struct {
// GeoProxyURL is the result of url.Parse on the response's geo_proxy_url.
GeoProxyURL *url.URL
// GeoProxyExtraData is the response's geo_proxy_extra_data, copied as-is.
GeoProxyExtraData string
}
type HandleFunc func(http.ResponseWriter, *http.Request, *Response)
......@@ -394,7 +400,7 @@ func validResponseContentType(resp *http.Response) bool {
return helper.IsContentType(ResponseContentType, resp.Header.Get("Content-Type"))
}
func (api *API) GetGeoProxyURL() (*url.URL, error) {
func (api *API) GetGeoProxyData() (*GeoProxyData, error) {
geoProxyApiUrl := *api.URL
geoProxyApiUrl.Path, geoProxyApiUrl.RawPath = joinURLPath(api.URL, geoProxyEndpointPath)
geoProxyApiReq := &http.Request{
......@@ -405,23 +411,26 @@ func (api *API) GetGeoProxyURL() (*url.URL, error) {
httpResponse, err := api.doRequestWithoutRedirects(geoProxyApiReq)
if err != nil {
return nil, fmt.Errorf("GetGeoProxyURL: do request: %v", err)
return nil, fmt.Errorf("GetGeoProxyData: do request: %v", err)
}
defer httpResponse.Body.Close()
if httpResponse.StatusCode != http.StatusOK {
return nil, fmt.Errorf("GetGeoProxyURL: Received HTTP status code: %v", httpResponse.StatusCode)
return nil, fmt.Errorf("GetGeoProxyData: Received HTTP status code: %v", httpResponse.StatusCode)
}
response := &GeoProxyEndpointResponse{}
if err := json.NewDecoder(httpResponse.Body).Decode(response); err != nil {
return nil, fmt.Errorf("GetGeoProxyURL: decode response: %v", err)
return nil, fmt.Errorf("GetGeoProxyData: decode response: %v", err)
}
geoProxyURL, err := url.Parse(response.GeoProxyURL)
if err != nil {
return nil, fmt.Errorf("GetGeoProxyURL: Could not parse Geo proxy URL: %v, err: %v", response.GeoProxyURL, err)
return nil, fmt.Errorf("GetGeoProxyData: Could not parse Geo proxy URL: %v, err: %v", response.GeoProxyURL, err)
}
return geoProxyURL, nil
return &GeoProxyData{
GeoProxyURL: geoProxyURL,
GeoProxyExtraData: response.GeoProxyExtraData,
}, nil
}
......@@ -4,7 +4,6 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"regexp"
"testing"
......@@ -18,21 +17,37 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper"
)
func TestGetGeoProxyURLWhenGeoSecondary(t *testing.T) {
geoProxyURL, err := getGeoProxyURLGivenResponse(t, `{"geo_proxy_url":"http://primary"}`)
require.NoError(t, err)
require.Equal(t, "http://primary", geoProxyURL.String())
}
func TestGetGeoProxyURLWhenGeoPrimaryOrNonGeo(t *testing.T) {
geoProxyURL, err := getGeoProxyURLGivenResponse(t, "{}")
require.NoError(t, err)
require.Equal(t, "", geoProxyURL.String())
func TestGetGeoProxyDataForResponses(t *testing.T) {
testCases := []struct {
desc string
json string
expectedError bool
expectedURL string
expectedExtraData string
}{
{"when Geo secondary", `{"geo_proxy_url":"http://primary","geo_proxy_extra_data":"geo-data"}`, false, "http://primary", "geo-data"},
{"when Geo secondary with explicit null data", `{"geo_proxy_url":"http://primary","geo_proxy_extra_data":null}`, false, "http://primary", ""},
{"when Geo secondary without extra data", `{"geo_proxy_url":"http://primary"}`, false, "http://primary", ""},
{"when Geo primary or no node", `{}`, false, "", ""},
{"for malformed request", `non-json`, true, "", ""},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
geoProxyData, err := getGeoProxyDataGivenResponse(t, tc.json)
if tc.expectedError {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, tc.expectedURL, geoProxyData.GeoProxyURL.String())
require.Equal(t, tc.expectedExtraData, geoProxyData.GeoProxyExtraData)
}
})
}
}
func getGeoProxyURLGivenResponse(t *testing.T, givenInternalApiResponse string) (*url.URL, error) {
func getGeoProxyDataGivenResponse(t *testing.T, givenInternalApiResponse string) (*GeoProxyData, error) {
t.Helper()
ts := testRailsServer(regexp.MustCompile(`/api/v4/geo/proxy`), 200, givenInternalApiResponse)
defer ts.Close()
......@@ -43,9 +58,9 @@ func getGeoProxyURLGivenResponse(t *testing.T, givenInternalApiResponse string)
apiClient := NewAPI(backend, version, rt)
geoProxyURL, err := apiClient.GetGeoProxyURL()
geoProxyData, err := apiClient.GetGeoProxyData()
return geoProxyURL, err
return geoProxyData, err
}
func testRailsServer(url *regexp.Regexp, code int, body string) *httptest.Server {
......
......@@ -37,7 +37,6 @@ var (
upload.RewrittenFieldsHeader,
}
geoProxyApiPollingInterval = 10 * time.Second
geoProxyWorkhorseHeaders = map[string]string{"Gitlab-Workhorse-Geo-Proxy": "1"}
)
type upstream struct {
......@@ -48,6 +47,7 @@ type upstream struct {
CableRoundTripper http.RoundTripper
APIClient *apipkg.API
geoProxyBackend *url.URL
geoProxyExtraData string
geoLocalRoutes []routeEntry
geoProxyCableRoute routeEntry
geoProxyRoute routeEntry
......@@ -215,28 +215,44 @@ func (u *upstream) pollGeoProxyAPI() {
// Calls /api/v4/geo/proxy and sets up routes
func (u *upstream) callGeoProxyAPI() {
geoProxyURL, err := u.APIClient.GetGeoProxyURL()
geoProxyData, err := u.APIClient.GetGeoProxyData()
if err != nil {
log.WithError(err).WithFields(log.Fields{"geoProxyBackend": u.geoProxyBackend}).Error("Geo Proxy: Unable to determine Geo Proxy URL. Fallback on cached value.")
return
}
if u.geoProxyBackend.String() != geoProxyURL.String() {
log.WithFields(log.Fields{"oldGeoProxyURL": u.geoProxyBackend, "newGeoProxyURL": geoProxyURL}).Info("Geo Proxy: URL changed")
u.updateGeoProxyFields(geoProxyURL)
hasProxyDataChanged := false
if u.geoProxyBackend.String() != geoProxyData.GeoProxyURL.String() {
log.WithFields(log.Fields{"oldGeoProxyURL": u.geoProxyBackend, "newGeoProxyURL": geoProxyData.GeoProxyURL}).Info("Geo Proxy: URL changed")
hasProxyDataChanged = true
}
if u.geoProxyExtraData != geoProxyData.GeoProxyExtraData {
// extra data is usually a JWT, thus not explicitly logging it
log.Info("Geo Proxy: signed data changed")
hasProxyDataChanged = true
}
if hasProxyDataChanged {
u.updateGeoProxyFieldsFromData(geoProxyData)
}
}
func (u *upstream) updateGeoProxyFields(geoProxyURL *url.URL) {
func (u *upstream) updateGeoProxyFieldsFromData(geoProxyData *apipkg.GeoProxyData) {
u.mu.Lock()
defer u.mu.Unlock()
u.geoProxyBackend = geoProxyURL
u.geoProxyBackend = geoProxyData.GeoProxyURL
u.geoProxyExtraData = geoProxyData.GeoProxyExtraData
if u.geoProxyBackend.String() == "" {
return
}
geoProxyWorkhorseHeaders := map[string]string{
"Gitlab-Workhorse-Geo-Proxy": "1",
"Gitlab-Workhorse-Geo-Proxy-Extra-Data": u.geoProxyExtraData,
}
geoProxyRoundTripper := roundtripper.NewBackendRoundTripper(u.geoProxyBackend, "", u.ProxyHeadersTimeout, u.DevelopmentMode)
geoProxyUpstream := proxypkg.NewProxy(
u.geoProxyBackend,
......
......@@ -209,21 +209,74 @@ func TestGeoProxyFeatureEnablingAndDisabling(t *testing.T) {
runTestCases(t, ws, testCasesProxied)
}
func TestGeoProxySetsCustomHeader(t *testing.T) {
func TestGeoProxyUpdatesExtraDataWhenChanged(t *testing.T) {
var expectedGeoProxyExtraData string
remoteServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "1", r.Header.Get("Gitlab-Workhorse-Geo-Proxy"), "custom proxy header")
require.Equal(t, expectedGeoProxyExtraData, r.Header.Get("Gitlab-Workhorse-Geo-Proxy-Extra-Data"), "custom extra data header")
w.WriteHeader(http.StatusOK)
}))
defer remoteServer.Close()
geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL)
geoProxyEndpointExtraData1 := fmt.Sprintf(`{"geo_proxy_url":"%v","geo_proxy_extra_data":"data1"}`, remoteServer.URL)
geoProxyEndpointExtraData2 := fmt.Sprintf(`{"geo_proxy_url":"%v","geo_proxy_extra_data":"data2"}`, remoteServer.URL)
geoProxyEndpointExtraData3 := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL)
geoProxyEndpointResponseBody := geoProxyEndpointExtraData1
expectedGeoProxyExtraData = "data1"
railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
defer deferredClose()
ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true)
ws, wsDeferredClose, waitForNextApiPoll := startWorkhorseServer(railsServer.URL, true)
defer wsDeferredClose()
http.Get(ws.URL)
// Verify that the expected header changes after next updated poll.
geoProxyEndpointResponseBody = geoProxyEndpointExtraData2
expectedGeoProxyExtraData = "data2"
waitForNextApiPoll()
http.Get(ws.URL)
// Validate that non-existing extra data results in empty header
geoProxyEndpointResponseBody = geoProxyEndpointExtraData3
expectedGeoProxyExtraData = ""
waitForNextApiPoll()
http.Get(ws.URL)
}
func TestGeoProxySetsCustomHeader(t *testing.T) {
testCases := []struct {
desc string
json string
extraData string
}{
{"no extra data", `{"geo_proxy_url":"%v"}`, ""},
{"with extra data", `{"geo_proxy_url":"%v","geo_proxy_extra_data":"extra-geo-data"}`, "extra-geo-data"},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
remoteServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "1", r.Header.Get("Gitlab-Workhorse-Geo-Proxy"), "custom proxy header")
require.Equal(t, tc.extraData, r.Header.Get("Gitlab-Workhorse-Geo-Proxy-Extra-Data"), "custom proxy extra data header")
w.WriteHeader(http.StatusOK)
}))
defer remoteServer.Close()
geoProxyEndpointResponseBody := fmt.Sprintf(tc.json, remoteServer.URL)
railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
defer deferredClose()
ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true)
defer wsDeferredClose()
http.Get(ws.URL)
})
}
}
func runTestCases(t *testing.T, ws *httptest.Server, testCases []testCase) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment