Commit 4a23859e authored by Patrick Bajao

Merge branch 'cat-geo-pass-proxied-node-id' into 'master'

Send Geo proxied site data through Workhorse HTTP headers

See merge request gitlab-org/gitlab!82697
parents bd807f0d ed342b12
......@@ -41,7 +41,7 @@ module EE
return super unless ::Gitlab::Geo.secondary_with_primary?
if ::Gitlab::Geo.secondary_with_unified_url? || ::Feature.enabled?(:geo_secondary_proxy_separate_urls, default_enabled: :yaml)
{ geo_proxy_url: ::Gitlab::Geo.primary_node.internal_url }
{ geo_proxy_url: ::Gitlab::Geo.primary_node.internal_url, geo_proxy_extra_data: ::Gitlab::Geo.proxy_extra_data }
else
super
end
......
......@@ -12,6 +12,7 @@ module Gitlab
secondary_nodes
node_enabled
oauth_application
proxy_extra_data
).freeze
API_SCOPE = 'geo_api'
......@@ -32,6 +33,14 @@ module Gitlab
::Geo::JobArtifactReplicator
].freeze
# We "regenerate" a JWT that is valid for 1 hour every 30 minutes, so there
# is always a token valid for at least 30 more minutes at any given time,
# even if the internal API is unable to serve a new one for up to 30 minutes.
# The primary shouldn't hard fail if the token isn't valid; it just won't
# know which node is proxying.
PROXY_JWT_VALIDITY_PERIOD = 1.hour
PROXY_JWT_CACHE_EXPIRY = 30.minutes
# Returns the current Geo node, looked up through the Geo cache helper
# under the :current_node key and materialized as a GeoNode.
def self.current_node
  cache_value(:current_node, as: GeoNode) do
    GeoNode.current_node
  end
end
......@@ -44,6 +53,24 @@ module Gitlab
self.cache_value(:secondary_nodes, as: GeoNode) { GeoNode.secondary_nodes }
end
# Cached variant of .uncached_proxy_extra_data.
#
# The L2 cache entry is kept for PROXY_JWT_CACHE_EXPIRY instead of the
# default L2 expiry of cache_value.
def self.proxy_extra_data
  cache_value(:proxy_extra_data, l2_cache_expires_in: PROXY_JWT_CACHE_EXPIRY) do
    uncached_proxy_extra_data
  end
end
# Builds the signed extra data that can be sent along with all proxied
# requests.
#
# The payload itself is intentionally empty: we're currently only
# interested in the signing node, which can be figured out from the
# signing key.
#
# Returns the signed and encoded data, or nil when GeoNodeNotFoundError or
# OpenSSL::Cipher::CipherError is raised while producing it.
def self.uncached_proxy_extra_data
  signer = Gitlab::Geo::SignedData.new(
    geo_node: current_node,
    validity_period: PROXY_JWT_VALIDITY_PERIOD
  )

  signer.sign_and_encode_data({})
rescue GeoNodeNotFoundError, OpenSSL::Cipher::CipherError
  nil
end
def self.connected?
# GeoNode#connected? only attempts to use existing DB connections so it can't
# be relied upon in initializers, without this active DB connectivity check.
......@@ -122,10 +149,11 @@ module Gitlab
SafeRequestStore[:geo_l2_cache] ||= Gitlab::JsonCache.new(namespace: :geo, cache_key_strategy: :version)
end
def self.cache_value(raw_key, as: nil, &block)
# We need a short expire time as we can't manually expire on a secondary node
l1_cache.fetch(raw_key, as: as, expires_in: 1.minute) do
l2_cache.fetch(raw_key, as: as, expires_in: 2.minutes) { yield }
# Looks +raw_key+ up in the L1 cache. The L2 lookup is the L1 fetch's block,
# and the caller's block is the L2 fetch's block.
#
# Expiry times default to short values because entries can't be expired
# manually on a secondary node, so short-lived or frequently updated data
# must not persist for too long. Callers may override either expiry.
def self.cache_value(raw_key, as: nil, l1_cache_expires_in: 1.minute, l2_cache_expires_in: 2.minutes, &block)
  l1_options = { as: as, expires_in: l1_cache_expires_in }
  l2_options = { as: as, expires_in: l2_cache_expires_in }

  l1_cache.fetch(raw_key, **l1_options) do
    l2_cache.fetch(raw_key, **l2_options) { yield }
  end
end
......
......@@ -9,14 +9,14 @@ RSpec.describe Gitlab::Geo, :geo, :request_store do
let_it_be(:primary_node) { create(:geo_node, :primary) }
let_it_be(:secondary_node) { create(:geo_node) }
shared_examples 'a Geo cached value' do |method, key|
shared_examples 'a Geo cached value' do |method, key, expected_l1_expiry: 1.minute, expected_l2_expiry: 2.minutes|
it 'includes GitLab version and Rails.version in the cache key' do
expanded_key = "geo:#{key}:#{Gitlab::VERSION}:#{Rails.version}"
expect(Gitlab::ProcessMemoryCache.cache_backend).to receive(:write)
.with(expanded_key, an_instance_of(String), expires_in: 1.minute).and_call_original
.with(expanded_key, an_instance_of(String), expires_in: expected_l1_expiry).and_call_original
expect(Rails.cache).to receive(:write)
.with(expanded_key, an_instance_of(String), expires_in: 2.minutes)
.with(expanded_key, an_instance_of(String), expires_in: expected_l2_expiry)
described_class.public_send(method)
end
......@@ -48,6 +48,63 @@ RSpec.describe Gitlab::Geo, :geo, :request_store do
it_behaves_like 'a Geo cached value', :secondary_nodes, :secondary_nodes
end
describe '.proxy_extra_data' do
  # Stub the uncached computation so these examples only exercise the
  # caching layer, and verify that it is actually invoked.
  before do
    expect(described_class).to receive(:uncached_proxy_extra_data).and_return('proxy extra data')
  end

  it 'caches the result of .uncached_proxy_extra_data' do
    # Compare by value: `be(expected)` checks object identity, which is
    # fragile for a String that may have been round-tripped through a cache.
    expect(described_class.proxy_extra_data).to eq('proxy extra data')
  end

  # Only the L2 expiry is overridden; the L1 expiry keeps the shared default.
  it_behaves_like 'a Geo cached value',
    :proxy_extra_data,
    :proxy_extra_data,
    expected_l2_expiry: ::Gitlab::Geo::PROXY_JWT_CACHE_EXPIRY
end
# Specs for the value returned by .uncached_proxy_extra_data.
describe '.uncached_proxy_extra_data' do
subject(:extra_data) { described_class.uncached_proxy_extra_data }
# No current Geo node is stubbed in this context.
context 'without a geo node' do
it { is_expected.to be_nil }
end
context 'with an existing Geo node' do
# The returned value is split on ':' here: the first part is compared with
# the node's access key, the second part is decoded as a JWT using the
# node's secret access key.
let(:parsed_access_key) { extra_data.split(':').first }
let(:jwt) { JWT.decode(extra_data.split(':').second, secondary_node.secret_access_key) }
# The JWT payload carries the extra data as a JSON string under 'data'.
let(:decoded_extra_data) { Gitlab::Json.parse(jwt.first['data']) }
before do
stub_current_geo_node(secondary_node)
end
it 'generates a valid JWT' do
expect(parsed_access_key).to eq(secondary_node.access_key)
expect(decoded_extra_data).to eq({})
end
it 'sets the expected expiration time' do
# Time is frozen so the 'exp' claim can be compared exactly.
freeze_time do
expect(jwt.first['exp']).to eq((Time.zone.now + Gitlab::Geo::PROXY_JWT_VALIDITY_PERIOD).to_i)
end
end
end
context 'when signing the JWT token raises errors' do
# Each listed error class is expected to be rescued, yielding nil.
where(:error) { [Gitlab::Geo::GeoNodeNotFoundError, OpenSSL::Cipher::CipherError] }
with_them do
before do
expect_next_instance_of(Gitlab::Geo::SignedData) do |instance|
expect(instance).to receive(:sign_and_encode_data).and_raise(error)
end
end
it { is_expected.to be_nil }
end
end
end
describe '.primary?' do
context 'when current node is a primary node' do
before do
......
......@@ -679,11 +679,18 @@ RSpec.describe API::Geo do
end
context 'when a primary exists' do
it 'returns the primary internal URL' do
it 'returns the primary internal URL and extra proxy data' do
subject
expect(response).to have_gitlab_http_status(:ok)
expect(json_response['geo_proxy_url']).to match(primary_node.internal_url)
proxy_extra_data = json_response['geo_proxy_extra_data']
jwt = JWT.decode(proxy_extra_data.split(':').second, unified_url_secondary_node.secret_access_key)
extra_data = Gitlab::Json.parse(jwt.first['data'])
expect(proxy_extra_data.split(':').first).to match(unified_url_secondary_node.access_key)
expect(extra_data).to eq({})
end
end
......@@ -729,11 +736,18 @@ RSpec.describe API::Geo do
end
context 'when geo_secondary_proxy_separate_urls feature flag is enabled' do
it 'returns the primary internal URL' do
it 'returns the primary internal URL and extra proxy data' do
subject
expect(response).to have_gitlab_http_status(:ok)
expect(json_response['geo_proxy_url']).to match(primary_node.internal_url)
proxy_extra_data = json_response['geo_proxy_extra_data']
jwt = JWT.decode(proxy_extra_data.split(':').second, secondary_node.secret_access_key)
extra_data = Gitlab::Json.parse(jwt.first['data'])
expect(proxy_extra_data.split(':').first).to match(secondary_node.access_key)
expect(extra_data).to eq({})
end
end
end
......
......@@ -64,7 +64,13 @@ func NewAPI(myURL *url.URL, version string, roundTripper http.RoundTripper) *API
}
type GeoProxyEndpointResponse struct {
GeoProxyURL string `json:"geo_proxy_url"`
GeoProxyURL string `json:"geo_proxy_url"`
GeoProxyExtraData string `json:"geo_proxy_extra_data"`
}
// GeoProxyData is the parsed form of the Geo proxy endpoint response, as
// returned by GetGeoProxyData.
type GeoProxyData struct {
// GeoProxyURL is the URL parsed from the response's geo_proxy_url field.
GeoProxyURL *url.URL
// GeoProxyExtraData is the response's geo_proxy_extra_data value, copied as-is.
GeoProxyExtraData string
}
type HandleFunc func(http.ResponseWriter, *http.Request, *Response)
......@@ -394,7 +400,7 @@ func validResponseContentType(resp *http.Response) bool {
return helper.IsContentType(ResponseContentType, resp.Header.Get("Content-Type"))
}
func (api *API) GetGeoProxyURL() (*url.URL, error) {
func (api *API) GetGeoProxyData() (*GeoProxyData, error) {
geoProxyApiUrl := *api.URL
geoProxyApiUrl.Path, geoProxyApiUrl.RawPath = joinURLPath(api.URL, geoProxyEndpointPath)
geoProxyApiReq := &http.Request{
......@@ -405,23 +411,26 @@ func (api *API) GetGeoProxyURL() (*url.URL, error) {
httpResponse, err := api.doRequestWithoutRedirects(geoProxyApiReq)
if err != nil {
return nil, fmt.Errorf("GetGeoProxyURL: do request: %v", err)
return nil, fmt.Errorf("GetGeoProxyData: do request: %v", err)
}
defer httpResponse.Body.Close()
if httpResponse.StatusCode != http.StatusOK {
return nil, fmt.Errorf("GetGeoProxyURL: Received HTTP status code: %v", httpResponse.StatusCode)
return nil, fmt.Errorf("GetGeoProxyData: Received HTTP status code: %v", httpResponse.StatusCode)
}
response := &GeoProxyEndpointResponse{}
if err := json.NewDecoder(httpResponse.Body).Decode(response); err != nil {
return nil, fmt.Errorf("GetGeoProxyURL: decode response: %v", err)
return nil, fmt.Errorf("GetGeoProxyData: decode response: %v", err)
}
geoProxyURL, err := url.Parse(response.GeoProxyURL)
if err != nil {
return nil, fmt.Errorf("GetGeoProxyURL: Could not parse Geo proxy URL: %v, err: %v", response.GeoProxyURL, err)
return nil, fmt.Errorf("GetGeoProxyData: Could not parse Geo proxy URL: %v, err: %v", response.GeoProxyURL, err)
}
return geoProxyURL, nil
return &GeoProxyData{
GeoProxyURL: geoProxyURL,
GeoProxyExtraData: response.GeoProxyExtraData,
}, nil
}
......@@ -4,7 +4,6 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"regexp"
"testing"
......@@ -18,21 +17,37 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper"
)
func TestGetGeoProxyURLWhenGeoSecondary(t *testing.T) {
geoProxyURL, err := getGeoProxyURLGivenResponse(t, `{"geo_proxy_url":"http://primary"}`)
require.NoError(t, err)
require.Equal(t, "http://primary", geoProxyURL.String())
}
func TestGetGeoProxyURLWhenGeoPrimaryOrNonGeo(t *testing.T) {
geoProxyURL, err := getGeoProxyURLGivenResponse(t, "{}")
require.NoError(t, err)
require.Equal(t, "", geoProxyURL.String())
// TestGetGeoProxyDataForResponses checks how GetGeoProxyData interprets the
// different bodies the Geo proxy endpoint may return.
func TestGetGeoProxyDataForResponses(t *testing.T) {
	type scenario struct {
		desc              string
		json              string
		expectedError     bool
		expectedURL       string
		expectedExtraData string
	}

	scenarios := []scenario{
		{
			desc:              "when Geo secondary",
			json:              `{"geo_proxy_url":"http://primary","geo_proxy_extra_data":"geo-data"}`,
			expectedURL:       "http://primary",
			expectedExtraData: "geo-data",
		},
		{
			desc:        "when Geo secondary with explicit null data",
			json:        `{"geo_proxy_url":"http://primary","geo_proxy_extra_data":null}`,
			expectedURL: "http://primary",
		},
		{
			desc:        "when Geo secondary without extra data",
			json:        `{"geo_proxy_url":"http://primary"}`,
			expectedURL: "http://primary",
		},
		{
			desc: "when Geo primary or no node",
			json: `{}`,
		},
		{
			desc:          "for malformed request",
			json:          `non-json`,
			expectedError: true,
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.desc, func(t *testing.T) {
			data, err := getGeoProxyDataGivenResponse(t, sc.json)

			// Error scenarios have no data to inspect.
			if sc.expectedError {
				require.Error(t, err)
				return
			}

			require.NoError(t, err)
			require.Equal(t, sc.expectedURL, data.GeoProxyURL.String())
			require.Equal(t, sc.expectedExtraData, data.GeoProxyExtraData)
		})
	}
}
func getGeoProxyURLGivenResponse(t *testing.T, givenInternalApiResponse string) (*url.URL, error) {
func getGeoProxyDataGivenResponse(t *testing.T, givenInternalApiResponse string) (*GeoProxyData, error) {
t.Helper()
ts := testRailsServer(regexp.MustCompile(`/api/v4/geo/proxy`), 200, givenInternalApiResponse)
defer ts.Close()
......@@ -43,9 +58,9 @@ func getGeoProxyURLGivenResponse(t *testing.T, givenInternalApiResponse string)
apiClient := NewAPI(backend, version, rt)
geoProxyURL, err := apiClient.GetGeoProxyURL()
geoProxyData, err := apiClient.GetGeoProxyData()
return geoProxyURL, err
return geoProxyData, err
}
func testRailsServer(url *regexp.Regexp, code int, body string) *httptest.Server {
......
......@@ -37,7 +37,6 @@ var (
upload.RewrittenFieldsHeader,
}
geoProxyApiPollingInterval = 10 * time.Second
geoProxyWorkhorseHeaders = map[string]string{"Gitlab-Workhorse-Geo-Proxy": "1"}
)
type upstream struct {
......@@ -48,6 +47,7 @@ type upstream struct {
CableRoundTripper http.RoundTripper
APIClient *apipkg.API
geoProxyBackend *url.URL
geoProxyExtraData string
geoLocalRoutes []routeEntry
geoProxyCableRoute routeEntry
geoProxyRoute routeEntry
......@@ -215,28 +215,44 @@ func (u *upstream) pollGeoProxyAPI() {
// Calls /api/v4/geo/proxy and sets up routes
func (u *upstream) callGeoProxyAPI() {
geoProxyURL, err := u.APIClient.GetGeoProxyURL()
geoProxyData, err := u.APIClient.GetGeoProxyData()
if err != nil {
log.WithError(err).WithFields(log.Fields{"geoProxyBackend": u.geoProxyBackend}).Error("Geo Proxy: Unable to determine Geo Proxy URL. Fallback on cached value.")
return
}
if u.geoProxyBackend.String() != geoProxyURL.String() {
log.WithFields(log.Fields{"oldGeoProxyURL": u.geoProxyBackend, "newGeoProxyURL": geoProxyURL}).Info("Geo Proxy: URL changed")
u.updateGeoProxyFields(geoProxyURL)
hasProxyDataChanged := false
if u.geoProxyBackend.String() != geoProxyData.GeoProxyURL.String() {
log.WithFields(log.Fields{"oldGeoProxyURL": u.geoProxyBackend, "newGeoProxyURL": geoProxyData.GeoProxyURL}).Info("Geo Proxy: URL changed")
hasProxyDataChanged = true
}
if u.geoProxyExtraData != geoProxyData.GeoProxyExtraData {
// extra data is usually a JWT, thus not explicitly logging it
log.Info("Geo Proxy: signed data changed")
hasProxyDataChanged = true
}
if hasProxyDataChanged {
u.updateGeoProxyFieldsFromData(geoProxyData)
}
}
func (u *upstream) updateGeoProxyFields(geoProxyURL *url.URL) {
func (u *upstream) updateGeoProxyFieldsFromData(geoProxyData *apipkg.GeoProxyData) {
u.mu.Lock()
defer u.mu.Unlock()
u.geoProxyBackend = geoProxyURL
u.geoProxyBackend = geoProxyData.GeoProxyURL
u.geoProxyExtraData = geoProxyData.GeoProxyExtraData
if u.geoProxyBackend.String() == "" {
return
}
geoProxyWorkhorseHeaders := map[string]string{
"Gitlab-Workhorse-Geo-Proxy": "1",
"Gitlab-Workhorse-Geo-Proxy-Extra-Data": u.geoProxyExtraData,
}
geoProxyRoundTripper := roundtripper.NewBackendRoundTripper(u.geoProxyBackend, "", u.ProxyHeadersTimeout, u.DevelopmentMode)
geoProxyUpstream := proxypkg.NewProxy(
u.geoProxyBackend,
......
......@@ -209,21 +209,74 @@ func TestGeoProxyFeatureEnablingAndDisabling(t *testing.T) {
runTestCases(t, ws, testCasesProxied)
}
func TestGeoProxySetsCustomHeader(t *testing.T) {
func TestGeoProxyUpdatesExtraDataWhenChanged(t *testing.T) {
var expectedGeoProxyExtraData string
remoteServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "1", r.Header.Get("Gitlab-Workhorse-Geo-Proxy"), "custom proxy header")
require.Equal(t, expectedGeoProxyExtraData, r.Header.Get("Gitlab-Workhorse-Geo-Proxy-Extra-Data"), "custom extra data header")
w.WriteHeader(http.StatusOK)
}))
defer remoteServer.Close()
geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL)
geoProxyEndpointExtraData1 := fmt.Sprintf(`{"geo_proxy_url":"%v","geo_proxy_extra_data":"data1"}`, remoteServer.URL)
geoProxyEndpointExtraData2 := fmt.Sprintf(`{"geo_proxy_url":"%v","geo_proxy_extra_data":"data2"}`, remoteServer.URL)
geoProxyEndpointExtraData3 := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL)
geoProxyEndpointResponseBody := geoProxyEndpointExtraData1
expectedGeoProxyExtraData = "data1"
railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
defer deferredClose()
ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true)
ws, wsDeferredClose, waitForNextApiPoll := startWorkhorseServer(railsServer.URL, true)
defer wsDeferredClose()
http.Get(ws.URL)
// Verify that the expected header changes after next updated poll.
geoProxyEndpointResponseBody = geoProxyEndpointExtraData2
expectedGeoProxyExtraData = "data2"
waitForNextApiPoll()
http.Get(ws.URL)
// Validate that non-existing extra data results in empty header
geoProxyEndpointResponseBody = geoProxyEndpointExtraData3
expectedGeoProxyExtraData = ""
waitForNextApiPoll()
http.Get(ws.URL)
}
func TestGeoProxySetsCustomHeader(t *testing.T) {
testCases := []struct {
desc string
json string
extraData string
}{
{"no extra data", `{"geo_proxy_url":"%v"}`, ""},
{"with extra data", `{"geo_proxy_url":"%v","geo_proxy_extra_data":"extra-geo-data"}`, "extra-geo-data"},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
remoteServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "1", r.Header.Get("Gitlab-Workhorse-Geo-Proxy"), "custom proxy header")
require.Equal(t, tc.extraData, r.Header.Get("Gitlab-Workhorse-Geo-Proxy-Extra-Data"), "custom proxy extra data header")
w.WriteHeader(http.StatusOK)
}))
defer remoteServer.Close()
geoProxyEndpointResponseBody := fmt.Sprintf(tc.json, remoteServer.URL)
railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
defer deferredClose()
ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true)
defer wsDeferredClose()
http.Get(ws.URL)
})
}
}
func runTestCases(t *testing.T, ws *httptest.Server, testCases []testCase) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment