Commit c9c9a36a authored by Igor Drozdov's avatar Igor Drozdov

Implement Dependency proxy via Workhorse injectors

Changelog: changed
parent c77e8c13
...@@ -5,11 +5,15 @@ class Groups::DependencyProxyForContainersController < ::Groups::DependencyProxy ...@@ -5,11 +5,15 @@ class Groups::DependencyProxyForContainersController < ::Groups::DependencyProxy
include DependencyProxy::GroupAccess include DependencyProxy::GroupAccess
include SendFileUpload include SendFileUpload
include ::PackagesHelper # for event tracking include ::PackagesHelper # for event tracking
include WorkhorseRequest
before_action :ensure_group before_action :ensure_group
before_action :ensure_token_granted! before_action :ensure_token_granted!, only: [:blob, :manifest]
before_action :ensure_feature_enabled! before_action :ensure_feature_enabled!
before_action :verify_workhorse_api!, only: [:authorize_upload_blob, :upload_blob]
skip_before_action :verify_authenticity_token, only: [:authorize_upload_blob, :upload_blob]
attr_reader :token attr_reader :token
feature_category :dependency_proxy feature_category :dependency_proxy
...@@ -38,6 +42,8 @@ class Groups::DependencyProxyForContainersController < ::Groups::DependencyProxy ...@@ -38,6 +42,8 @@ class Groups::DependencyProxyForContainersController < ::Groups::DependencyProxy
end end
def blob def blob
return blob_via_workhorse if Feature.enabled?(:dependency_proxy_workhorse, group, default_enabled: :yaml)
result = DependencyProxy::FindOrCreateBlobService result = DependencyProxy::FindOrCreateBlobService
.new(group, image, token, params[:sha]).execute .new(group, image, token, params[:sha]).execute
...@@ -50,11 +56,47 @@ class Groups::DependencyProxyForContainersController < ::Groups::DependencyProxy ...@@ -50,11 +56,47 @@ class Groups::DependencyProxyForContainersController < ::Groups::DependencyProxy
end end
end end
def authorize_upload_blob
set_workhorse_internal_api_content_type
render json: DependencyProxy::FileUploader.workhorse_authorize(has_length: false)
end
def upload_blob
@group.dependency_proxy_blobs.create!(
file_name: blob_file_name,
file: params[:file],
size: params[:file].size
)
event_name = tracking_event_name(object_type: :blob, from_cache: false)
track_package_event(event_name, :dependency_proxy, namespace: group, user: auth_user)
head :ok
end
private private
def blob_via_workhorse
blob = @group.dependency_proxy_blobs.find_by_file_name(blob_file_name)
if blob.present?
event_name = tracking_event_name(object_type: :blob, from_cache: true)
track_package_event(event_name, :dependency_proxy, namespace: group, user: auth_user)
send_upload(blob.file)
else
send_dependency(token, DependencyProxy::Registry.blob_url(image, params[:sha]), blob_file_name)
end
end
def blob_file_name
@blob_file_name ||= params[:sha].sub('sha256:', '') + '.gz'
end
def group def group
strong_memoize(:group) do strong_memoize(:group) do
Group.find_by_full_path(params[:group_id], follow_redirects: request.get?) Group.find_by_full_path(params[:group_id], follow_redirects: true)
end end
end end
......
...@@ -41,6 +41,15 @@ module WorkhorseHelper ...@@ -41,6 +41,15 @@ module WorkhorseHelper
head :ok head :ok
end end
def send_dependency(token, url, filename)
headers.store(*Gitlab::Workhorse.send_dependency(token, url))
headers['Content-Disposition'] =
ActionDispatch::Http::ContentDisposition.format(disposition: 'attachment', filename: filename)
headers['Content-Type'] = 'application/gzip'
head :ok
end
def set_workhorse_internal_api_content_type def set_workhorse_internal_api_content_type
headers['Content-Type'] = Gitlab::Workhorse::INTERNAL_API_CONTENT_TYPE headers['Content-Type'] = Gitlab::Workhorse::INTERNAL_API_CONTENT_TYPE
end end
......
# frozen_string_literal: true # frozen_string_literal: true
class DependencyProxy::FileUploader < GitlabUploader class DependencyProxy::FileUploader < GitlabUploader
extend Workhorse::UploadPath
include ObjectStorage::Concern include ObjectStorage::Concern
before :cache, :set_content_type before :cache, :set_content_type
......
---
name: dependency_proxy_workhorse
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/68157
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/339639
milestone: '14.3'
type: development
group: group::source code
default_enabled: false
...@@ -146,5 +146,7 @@ scope format: false do ...@@ -146,5 +146,7 @@ scope format: false do
constraints image: Gitlab::PathRegex.container_image_regex, sha: Gitlab::PathRegex.container_image_blob_sha_regex do constraints image: Gitlab::PathRegex.container_image_regex, sha: Gitlab::PathRegex.container_image_blob_sha_regex do
get 'v2/*group_id/dependency_proxy/containers/*image/manifests/*tag' => 'groups/dependency_proxy_for_containers#manifest' # rubocop:todo Cop/PutGroupRoutesUnderScope get 'v2/*group_id/dependency_proxy/containers/*image/manifests/*tag' => 'groups/dependency_proxy_for_containers#manifest' # rubocop:todo Cop/PutGroupRoutesUnderScope
get 'v2/*group_id/dependency_proxy/containers/*image/blobs/:sha' => 'groups/dependency_proxy_for_containers#blob' # rubocop:todo Cop/PutGroupRoutesUnderScope get 'v2/*group_id/dependency_proxy/containers/*image/blobs/:sha' => 'groups/dependency_proxy_for_containers#blob' # rubocop:todo Cop/PutGroupRoutesUnderScope
post 'v2/*group_id/dependency_proxy/containers/*image/blobs/:sha/upload/authorize' => 'groups/dependency_proxy_for_containers#authorize_upload_blob' # rubocop:todo Cop/PutGroupRoutesUnderScope
post 'v2/*group_id/dependency_proxy/containers/*image/blobs/:sha/upload' => 'groups/dependency_proxy_for_containers#upload_blob' # rubocop:todo Cop/PutGroupRoutesUnderScope
end end
end end
...@@ -99,21 +99,14 @@ RSpec.describe Groups::DependencyProxyForContainersController do ...@@ -99,21 +99,14 @@ RSpec.describe Groups::DependencyProxyForContainersController do
end end
describe 'GET #blob' do describe 'GET #blob' do
let_it_be(:blob) { create(:dependency_proxy_blob) } let_it_be(:blob) { create(:dependency_proxy_blob, group: group) }
let(:blob_sha) { blob.file_name.sub('.gz', '') } let(:blob_sha) { blob.file_name.sub('.gz', '') }
let(:blob_response) { { status: :success, blob: blob, from_cache: false } }
subject(:get_blob) do subject(:get_blob) do
get :blob, params: { group_id: group.to_param, image: 'alpine', sha: blob_sha } get :blob, params: { group_id: group.to_param, image: 'alpine', sha: blob_sha }
end end
before do
allow_next_instance_of(DependencyProxy::FindOrCreateBlobService) do |instance|
allow(instance).to receive(:execute).and_return(blob_response)
end
end
it_behaves_like 'when sso is enabled for the group', 'a successful blob pull' it_behaves_like 'when sso is enabled for the group', 'a successful blob pull'
end end
end end
...@@ -158,6 +158,7 @@ module Gitlab ...@@ -158,6 +158,7 @@ module Gitlab
::Gitlab.config.uploads.storage_path, ::Gitlab.config.uploads.storage_path,
::JobArtifactUploader.workhorse_upload_path, ::JobArtifactUploader.workhorse_upload_path,
::LfsObjectUploader.workhorse_upload_path, ::LfsObjectUploader.workhorse_upload_path,
::DependencyProxy::FileUploader.workhorse_upload_path,
File.join(Rails.root, 'public/uploads/tmp') File.join(Rails.root, 'public/uploads/tmp')
] + package_allowed_paths ] + package_allowed_paths
end end
......
...@@ -169,6 +169,18 @@ module Gitlab ...@@ -169,6 +169,18 @@ module Gitlab
] ]
end end
def send_dependency(token, url)
params = {
'Header' => { Authorization: ["Bearer #{token}"] },
'Url' => url
}
[
SEND_DATA_HEADER,
"send-dependency:#{encode(params)}"
]
end
def channel_websocket(channel) def channel_websocket(channel)
details = { details = {
'Channel' => { 'Channel' => {
......
...@@ -5,6 +5,7 @@ require 'spec_helper' ...@@ -5,6 +5,7 @@ require 'spec_helper'
RSpec.describe Groups::DependencyProxyForContainersController do RSpec.describe Groups::DependencyProxyForContainersController do
include HttpBasicAuthHelpers include HttpBasicAuthHelpers
include DependencyProxyHelpers include DependencyProxyHelpers
include WorkhorseHelpers
let_it_be(:user) { create(:user) } let_it_be(:user) { create(:user) }
let_it_be_with_reload(:group) { create(:group, :private) } let_it_be_with_reload(:group) { create(:group, :private) }
...@@ -242,16 +243,9 @@ RSpec.describe Groups::DependencyProxyForContainersController do ...@@ -242,16 +243,9 @@ RSpec.describe Groups::DependencyProxyForContainersController do
end end
describe 'GET #blob' do describe 'GET #blob' do
let_it_be(:blob) { create(:dependency_proxy_blob) } let(:blob) { create(:dependency_proxy_blob, group: group) }
let(:blob_sha) { blob.file_name.sub('.gz', '') } let(:blob_sha) { blob.file_name.sub('.gz', '') }
let(:blob_response) { { status: :success, blob: blob, from_cache: false } }
before do
allow_next_instance_of(DependencyProxy::FindOrCreateBlobService) do |instance|
allow(instance).to receive(:execute).and_return(blob_response)
end
end
subject { get_blob } subject { get_blob }
...@@ -264,6 +258,64 @@ RSpec.describe Groups::DependencyProxyForContainersController do ...@@ -264,6 +258,64 @@ RSpec.describe Groups::DependencyProxyForContainersController do
it_behaves_like 'without permission' it_behaves_like 'without permission'
it_behaves_like 'feature flag disabled with private group' it_behaves_like 'feature flag disabled with private group'
context 'a valid user' do
before do
group.add_guest(user)
end
it_behaves_like 'a successful blob pull'
it_behaves_like 'a package tracking event', described_class.name, 'pull_blob_from_cache'
context 'when cache entry does not exist' do
let(:blob_sha) { 'a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4' }
it 'returns Workhorse send-dependency instructions' do
subject
send_data_type, send_data = workhorse_send_data
header, url = send_data.values_at('Header', 'Url')
expect(send_data_type).to eq('send-dependency')
expect(header).to eq("Authorization" => ["Bearer abcd1234"])
expect(url).to eq(DependencyProxy::Registry.blob_url('alpine', blob_sha))
expect(response.headers['Content-Type']).to eq('application/gzip')
expect(response.headers['Content-Disposition']).to eq(
ActionDispatch::Http::ContentDisposition.format(disposition: 'attachment', filename: blob.file_name)
)
end
end
end
context 'a valid deploy token' do
let_it_be(:user) { create(:deploy_token, :group, :dependency_proxy_scopes) }
let_it_be(:group_deploy_token) { create(:group_deploy_token, deploy_token: user, group: group) }
it_behaves_like 'a successful blob pull'
context 'pulling from a subgroup' do
let_it_be_with_reload(:parent_group) { create(:group) }
let_it_be_with_reload(:group) { create(:group, parent: parent_group) }
before do
parent_group.create_dependency_proxy_setting!(enabled: true)
group_deploy_token.update_column(:group_id, parent_group.id)
end
it_behaves_like 'a successful blob pull'
end
end
context 'when dependency_proxy_workhorse disabled' do
let(:blob_response) { { status: :success, blob: blob, from_cache: false } }
before do
stub_feature_flags(dependency_proxy_workhorse: false)
allow_next_instance_of(DependencyProxy::FindOrCreateBlobService) do |instance|
allow(instance).to receive(:execute).and_return(blob_response)
end
end
context 'remote blob request fails' do context 'remote blob request fails' do
let(:blob_response) do let(:blob_response) do
{ {
...@@ -320,6 +372,7 @@ RSpec.describe Groups::DependencyProxyForContainersController do ...@@ -320,6 +372,7 @@ RSpec.describe Groups::DependencyProxyForContainersController do
end end
end end
end end
end
it_behaves_like 'not found when disabled' it_behaves_like 'not found when disabled'
...@@ -328,6 +381,61 @@ RSpec.describe Groups::DependencyProxyForContainersController do ...@@ -328,6 +381,61 @@ RSpec.describe Groups::DependencyProxyForContainersController do
end end
end end
describe 'GET #authorize_upload_blob' do
let(:blob_sha) { 'a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4' }
subject(:authorize_upload_blob) do
request.headers.merge!(workhorse_internal_api_request_header)
get :authorize_upload_blob, params: { group_id: group.to_param, image: 'alpine', sha: blob_sha }
end
it_behaves_like 'without permission'
context 'with a valid user' do
before do
group.add_guest(user)
end
it 'sends Workhorse file upload instructions', :aggregate_failures do
authorize_upload_blob
expect(response.headers['Content-Type']).to eq(Gitlab::Workhorse::INTERNAL_API_CONTENT_TYPE)
expect(json_response['TempPath']).to eq(DependencyProxy::FileUploader.workhorse_local_upload_path)
end
end
end
describe 'GET #upload_blob' do
let(:blob_sha) { 'a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4' }
let(:file) { fixture_file_upload("spec/fixtures/dependency_proxy/#{blob_sha}.gz", 'application/gzip') }
subject do
request.headers.merge!(workhorse_internal_api_request_header)
get :upload_blob, params: {
group_id: group.to_param,
image: 'alpine',
sha: blob_sha,
file: file
}
end
it_behaves_like 'without permission'
context 'with a valid user' do
before do
group.add_guest(user)
expect_next_found_instance_of(Group) do |instance|
expect(instance).to receive_message_chain(:dependency_proxy_blobs, :create!)
end
end
it_behaves_like 'a package tracking event', described_class.name, 'pull_blob'
end
end
def enable_dependency_proxy def enable_dependency_proxy
group.create_dependency_proxy_setting!(enabled: true) group.create_dependency_proxy_setting!(enabled: true)
end end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe 'Group Dependency Proxy for containers', :js do
include DependencyProxyHelpers
include_context 'file upload requests helpers'
let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) }
let_it_be(:sha) { 'a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4' }
let_it_be(:content) { fixture_file_upload("spec/fixtures/dependency_proxy/#{sha}.gz").read }
let(:image) { 'alpine' }
let(:url) { capybara_url("/v2/#{group.full_path}/dependency_proxy/containers/#{image}/blobs/sha256:#{sha}") }
let(:token) { 'token' }
let(:headers) { { 'Authorization' => "Bearer #{build_jwt(user).encoded}" } }
subject do
HTTParty.get(url, headers: headers)
end
def run_server(handler)
default_server = Capybara.server
Capybara.server = Capybara.servers[:puma]
server = Capybara::Server.new(handler)
server.boot
server
ensure
Capybara.server = default_server
end
let_it_be(:external_server) do
handler = lambda do |env|
if env['REQUEST_PATH'] == '/token'
[200, {}, [{ token: 'token' }.to_json]]
else
[200, {}, [content]]
end
end
run_server(handler)
end
before do
stub_application_setting(allow_local_requests_from_web_hooks_and_services: true)
stub_config(dependency_proxy: { enabled: true })
group.add_developer(user)
stub_const("DependencyProxy::Registry::AUTH_URL", external_server.base_url)
stub_const("DependencyProxy::Registry::LIBRARY_URL", external_server.base_url)
end
shared_examples 'responds with the file' do
it 'sends file' do
expect(subject.code).to eq(200)
expect(subject.body).to eq(content)
expect(subject.headers.to_h).to include(
"content-type" => ["application/gzip"],
"content-disposition" => ["attachment; filename=\"#{sha}.gz\"; filename*=UTF-8''#{sha}.gz"],
"content-length" => ["32"]
)
end
end
shared_examples 'caches the file' do
it 'caches the file' do
expect { subject }.to change {
group.dependency_proxy_blobs.count
}.from(0).to(1)
expect(subject.code).to eq(200)
expect(group.dependency_proxy_blobs.first.file.read).to eq(content)
end
end
context 'fetching a blob' do
context 'when the blob is cached for the group' do
let!(:dependency_proxy_blob) { create(:dependency_proxy_blob, group: group) }
it_behaves_like 'responds with the file'
context 'dependency_proxy_workhorse feature flag disabled' do
before do
stub_feature_flags({ dependency_proxy_workhorse: false })
end
it_behaves_like 'responds with the file'
end
end
end
context 'when the blob must be downloaded' do
it_behaves_like 'responds with the file'
it_behaves_like 'caches the file'
context 'dependency_proxy_workhorse feature flag disabled' do
before do
stub_feature_flags({ dependency_proxy_workhorse: false })
end
it_behaves_like 'responds with the file'
it_behaves_like 'caches the file'
end
end
end
...@@ -16,6 +16,7 @@ RSpec.describe Gitlab::Middleware::Multipart::Handler do ...@@ -16,6 +16,7 @@ RSpec.describe Gitlab::Middleware::Multipart::Handler do
::Gitlab.config.uploads.storage_path, ::Gitlab.config.uploads.storage_path,
::JobArtifactUploader.workhorse_upload_path, ::JobArtifactUploader.workhorse_upload_path,
::LfsObjectUploader.workhorse_upload_path, ::LfsObjectUploader.workhorse_upload_path,
::DependencyProxy::FileUploader.workhorse_upload_path,
File.join(Rails.root, 'public/uploads/tmp') File.join(Rails.root, 'public/uploads/tmp')
] ]
end end
......
package dependencyproxy
import (
"context"
"fmt"
"io"
"net"
"net/http"
"time"
"gitlab.com/gitlab-org/labkit/correlation"
"gitlab.com/gitlab-org/labkit/log"
"gitlab.com/gitlab-org/labkit/tracing"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/senddata"
)
// httpTransport defines a http.Transport with values
// that are more restrictive than for http.DefaultTransport,
// they define shorter TLS Handshake, and more aggressive connection closing
// to prevent the connection hanging and reduce FD usage
var httpTransport = tracing.NewRoundTripper(correlation.NewInstrumentedRoundTripper(&http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 10 * time.Second,
}).DialContext,
MaxIdleConns: 2,
IdleConnTimeout: 30 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
}))
var httpClient = &http.Client{
Transport: httpTransport,
}
type Injector struct {
senddata.Prefix
uploadHandler http.Handler
}
type entryParams struct {
Url string
Header http.Header
}
type nullResponseWriter struct {
header http.Header
status int
}
func (nullResponseWriter) Write(p []byte) (int, error) {
return len(p), nil
}
func (w *nullResponseWriter) Header() http.Header {
return w.header
}
func (w *nullResponseWriter) WriteHeader(status int) {
if w.status == 0 {
w.status = status
}
}
func NewInjector() *Injector {
return &Injector{
Prefix: "send-dependency:",
}
}
func (p *Injector) SetUploadHandler(uploadHandler http.Handler) {
p.uploadHandler = uploadHandler
}
func (p *Injector) Inject(w http.ResponseWriter, r *http.Request, sendData string) {
dependencyResponse, err := p.fetchUrl(r.Context(), sendData)
if err != nil {
helper.Fail500(w, r, err)
return
}
defer dependencyResponse.Body.Close()
if dependencyResponse.StatusCode >= 400 {
w.WriteHeader(dependencyResponse.StatusCode)
io.Copy(w, dependencyResponse.Body)
return
}
teeReader := io.TeeReader(dependencyResponse.Body, w)
saveFileRequest, err := http.NewRequestWithContext(r.Context(), "POST", r.URL.String()+"/upload", teeReader)
if err != nil {
helper.Fail500(w, r, fmt.Errorf("dependency proxy: failed to create request: %w", err))
}
saveFileRequest.Header = helper.HeaderClone(r.Header)
saveFileRequest.ContentLength = dependencyResponse.ContentLength
w.Header().Del("Content-Length")
nrw := &nullResponseWriter{header: http.Header{}}
p.uploadHandler.ServeHTTP(nrw, saveFileRequest)
if nrw.status != http.StatusOK {
fields := log.Fields{"code": nrw.status}
helper.Fail500WithFields(nrw, r, fmt.Errorf("dependency proxy: failed to upload file"), fields)
}
}
func (p *Injector) fetchUrl(ctx context.Context, sendData string) (*http.Response, error) {
var params entryParams
if err := p.Unpack(&params, sendData); err != nil {
return nil, fmt.Errorf("dependency proxy: unpack sendData: %v", err)
}
r, err := http.NewRequestWithContext(ctx, "GET", params.Url, nil)
if err != nil {
return nil, fmt.Errorf("dependency proxy: failed to fetch dependency: %v", err)
}
r.Header = params.Header
return httpClient.Do(r)
}
package dependencyproxy
import (
"encoding/base64"
"io"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"github.com/stretchr/testify/require"
)
type fakeUploadHandler struct {
request *http.Request
body []byte
handler func(w http.ResponseWriter, r *http.Request)
}
func (f *fakeUploadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
f.request = r
f.body, _ = io.ReadAll(r.Body)
f.handler(w, r)
}
func TestSuccessfullRequest(t *testing.T) {
content := []byte("result")
originResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Length", strconv.Itoa(len(content)))
w.Write(content)
}))
uploadHandler := &fakeUploadHandler{
handler: func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
},
}
injector := NewInjector()
injector.SetUploadHandler(uploadHandler)
response := makeRequest(injector, `{"Token": "token", "Url": "`+originResourceServer.URL+`/url"}`)
require.Equal(t, "/target/upload", uploadHandler.request.URL.Path)
require.Equal(t, int64(6), uploadHandler.request.ContentLength)
require.Equal(t, content, uploadHandler.body)
require.Equal(t, 200, response.Code)
require.Equal(t, string(content), response.Body.String())
}
func TestIncorrectSendData(t *testing.T) {
response := makeRequest(NewInjector(), "")
require.Equal(t, 500, response.Code)
require.Equal(t, "Internal server error\n", response.Body.String())
}
func TestIncorrectSendDataUrl(t *testing.T) {
response := makeRequest(NewInjector(), `{"Token": "token", "Url": "url"}`)
require.Equal(t, 500, response.Code)
require.Equal(t, "Internal server error\n", response.Body.String())
}
func TestFailedOriginServer(t *testing.T) {
originResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(404)
w.Write([]byte("Not found"))
}))
uploadHandler := &fakeUploadHandler{
handler: func(w http.ResponseWriter, r *http.Request) {
require.FailNow(t, "the error response must not be uploaded")
},
}
injector := NewInjector()
injector.SetUploadHandler(uploadHandler)
response := makeRequest(injector, `{"Token": "token", "Url": "`+originResourceServer.URL+`/url"}`)
require.Equal(t, 404, response.Code)
require.Equal(t, "Not found", response.Body.String())
}
func makeRequest(injector *Injector, data string) *httptest.ResponseRecorder {
w := httptest.NewRecorder()
r := httptest.NewRequest("GET", "/target", nil)
sendData := base64.StdEncoding.EncodeToString([]byte(data))
injector.Inject(w, r, sendData)
return w
}
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/builds" "gitlab.com/gitlab-org/gitlab/workhorse/internal/builds"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/channel" "gitlab.com/gitlab-org/gitlab/workhorse/internal/channel"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/dependencyproxy"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/git" "gitlab.com/gitlab-org/gitlab/workhorse/internal/git"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/imageresizer" "gitlab.com/gitlab-org/gitlab/workhorse/internal/imageresizer"
...@@ -170,7 +171,7 @@ func (ro *routeEntry) isMatch(cleanedPath string, req *http.Request) bool { ...@@ -170,7 +171,7 @@ func (ro *routeEntry) isMatch(cleanedPath string, req *http.Request) bool {
return ok return ok
} }
func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg config.Config) http.Handler { func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg config.Config, dependencyProxyInjector *dependencyproxy.Injector) http.Handler {
proxier := proxypkg.NewProxy(backend, version, rt) proxier := proxypkg.NewProxy(backend, version, rt)
return senddata.SendData( return senddata.SendData(
...@@ -183,6 +184,7 @@ func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg conf ...@@ -183,6 +184,7 @@ func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg conf
artifacts.SendEntry, artifacts.SendEntry,
sendurl.SendURL, sendurl.SendURL,
imageresizer.NewResizer(cfg), imageresizer.NewResizer(cfg),
dependencyProxyInjector,
) )
} }
...@@ -193,7 +195,8 @@ func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg conf ...@@ -193,7 +195,8 @@ func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg conf
func configureRoutes(u *upstream) { func configureRoutes(u *upstream) {
api := u.APIClient api := u.APIClient
static := &staticpages.Static{DocumentRoot: u.DocumentRoot, Exclude: staticExclude} static := &staticpages.Static{DocumentRoot: u.DocumentRoot, Exclude: staticExclude}
proxy := buildProxy(u.Backend, u.Version, u.RoundTripper, u.Config) dependencyProxyInjector := dependencyproxy.NewInjector()
proxy := buildProxy(u.Backend, u.Version, u.RoundTripper, u.Config, dependencyProxyInjector)
cableProxy := proxypkg.NewProxy(u.CableBackend, u.Version, u.CableRoundTripper) cableProxy := proxypkg.NewProxy(u.CableBackend, u.Version, u.CableRoundTripper)
assetsNotFoundHandler := NotFoundUnless(u.DevelopmentMode, proxy) assetsNotFoundHandler := NotFoundUnless(u.DevelopmentMode, proxy)
...@@ -207,7 +210,7 @@ func configureRoutes(u *upstream) { ...@@ -207,7 +210,7 @@ func configureRoutes(u *upstream) {
} }
signingTripper := secret.NewRoundTripper(u.RoundTripper, u.Version) signingTripper := secret.NewRoundTripper(u.RoundTripper, u.Version)
signingProxy := buildProxy(u.Backend, u.Version, signingTripper, u.Config) signingProxy := buildProxy(u.Backend, u.Version, signingTripper, u.Config, dependencyProxyInjector)
preparers := createUploadPreparers(u.Config) preparers := createUploadPreparers(u.Config)
uploadPath := path.Join(u.DocumentRoot, "uploads/tmp") uploadPath := path.Join(u.DocumentRoot, "uploads/tmp")
...@@ -215,6 +218,8 @@ func configureRoutes(u *upstream) { ...@@ -215,6 +218,8 @@ func configureRoutes(u *upstream) {
ciAPIProxyQueue := queueing.QueueRequests("ci_api_job_requests", uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout) ciAPIProxyQueue := queueing.QueueRequests("ci_api_job_requests", uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, redis.WatchKey, u.APICILongPollingDuration) ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, redis.WatchKey, u.APICILongPollingDuration)
dependencyProxyInjector.SetUploadHandler(upload.BodyUploader(api, signingProxy, preparers.packages))
// Serve static files or forward the requests // Serve static files or forward the requests
defaultUpstream := static.ServeExisting( defaultUpstream := static.ServeExisting(
u.URLPrefix, u.URLPrefix,
......
...@@ -934,3 +934,101 @@ func TestHealthChecksUnreachable(t *testing.T) { ...@@ -934,3 +934,101 @@ func TestHealthChecksUnreachable(t *testing.T) {
}) })
} }
} }
func TestDependencyProxyInjector(t *testing.T) {
token := "token"
bodyLength := 4096 * 12
expectedBody := strings.Repeat("p", bodyLength)
testCases := []struct {
desc string
contentLength int
readSize int
finalizeHandler func(*testing.T, http.ResponseWriter)
}{
{
desc: "the uploading successfully finalized",
contentLength: bodyLength,
readSize: bodyLength,
finalizeHandler: func(t *testing.T, w http.ResponseWriter) {
w.WriteHeader(200)
},
}, {
desc: "the uploading failed",
contentLength: bodyLength,
readSize: bodyLength,
finalizeHandler: func(t *testing.T, w http.ResponseWriter) {
w.WriteHeader(500)
},
}, {
desc: "the origin resource server returns partial response",
contentLength: bodyLength + 1000,
readSize: bodyLength,
finalizeHandler: func(t *testing.T, _ http.ResponseWriter) {
t.Fatal("partial file must not be saved")
},
}, {
desc: "a user does not read the whole file",
contentLength: bodyLength,
readSize: bodyLength - 1000,
finalizeHandler: func(t *testing.T, _ http.ResponseWriter) {
t.Fatal("partial file must not be saved")
},
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
originResource := "/origin_resource"
originResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, originResource, r.URL.String())
w.Header().Set("Content-Length", strconv.Itoa(tc.contentLength))
_, err := io.WriteString(w, expectedBody)
require.NoError(t, err)
}))
defer originResourceServer.Close()
originResourceUrl := originResourceServer.URL + originResource
ts := testhelper.TestServerWithHandler(regexp.MustCompile(`.`), func(w http.ResponseWriter, r *http.Request) {
switch r.URL.String() {
case "/base":
params := `{"Url": "` + originResourceUrl + `", "Token": "` + token + `"}`
w.Header().Set("Gitlab-Workhorse-Send-Data", `send-dependency:`+base64.URLEncoding.EncodeToString([]byte(params)))
case "/base/upload/authorize":
w.Header().Set("Content-Type", api.ResponseContentType)
_, err := fmt.Fprintf(w, `{"TempPath":"%s"}`, scratchDir)
require.NoError(t, err)
case "/base/upload":
tc.finalizeHandler(t, w)
default:
t.Fatalf("unexpected request: %s", r.URL)
}
})
defer ts.Close()
ws := startWorkhorseServer(ts.URL)
defer ws.Close()
req, err := http.NewRequest("GET", ws.URL+"/base", nil)
require.NoError(t, err)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
body := make([]byte, tc.readSize)
_, err = io.ReadFull(resp.Body, body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close()) // Client closes connection
ws.Close() // Wait for server handler to return
require.Equal(t, 200, resp.StatusCode, "status code")
require.Equal(t, expectedBody[0:tc.readSize], string(body), "response body")
})
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment