Commit 0701fead authored by Alessio Caiazza's avatar Alessio Caiazza Committed by Jacob Vosmaer (GitLab)

Object Store direct upload

parent 22343155
......@@ -68,7 +68,12 @@ func NewAPI(myURL *url.URL, version string, roundTripper *badgateway.RoundTrippe
type HandleFunc func(http.ResponseWriter, *http.Request, *Response)
type RemoteObjectStore struct {
// StoreURL is the temporary URL to which upload the first found file
// GetURL is not used in gitlab-workhorse. We pass it back to gitlab-rails
// later for symmetry with the 'store upload in tempfile' approach.
GetURL string
// DeleteURL is a presigned S3 RemoveObject URL
DeleteURL string
// StoreURL is the temporary presigned S3 PutObject URL to which upload the first found file
StoreURL string
// ObjectID is a unique identifier of object storage upload
ObjectID string
......@@ -90,8 +95,8 @@ type Response struct {
// RepoPath is the full path on disk to the Git repository the request is
// about
RepoPath string
// StoreLFSPath is provided by the GitLab Rails application
// to mark where the tmp file should be placed
// StoreLFSPath is provided by the GitLab Rails application to mark where the tmp file should be placed.
// This field is deprecated. GitLab will use TempPath instead
StoreLFSPath string
// LFS object id
LfsOid string
......
......@@ -4,126 +4,28 @@ import (
"context"
"fmt"
"mime/multipart"
"net/http"
"os"
"time"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
)
var (
DefaultObjectStoreTimeoutSeconds = 360
)
var (
objectStorageUploadRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gitlab_workhorse_object_storage_upload_requests",
Help: "How many object storage requests have been processed",
},
[]string{"status"},
)
objectStorageUploadsOpen = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "gitlab_workhorse_object_storage_upload_open",
Help: "Describes many object storage requests are open now",
},
)
objectStorageUploadBytes = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "gitlab_workhorse_object_storage_upload_bytes",
Help: "How many bytes were sent to object storage",
},
)
objectStorageUploadTime = prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "gitlab_workhorse_object_storage_upload_time",
Help: "How long it took to upload objects",
Buckets: objectStorageUploadTimeBuckets,
})
objectStorageUploadRequestsFileFailed = objectStorageUploadRequests.WithLabelValues("file-failed")
objectStorageUploadRequestsRequestFailed = objectStorageUploadRequests.WithLabelValues("request-failed")
objectStorageUploadRequestsInvalidStatus = objectStorageUploadRequests.WithLabelValues("invalid-status")
objectStorageUploadRequestsSucceeded = objectStorageUploadRequests.WithLabelValues("succeeded")
objectStorageUploadRequestsMultipleUploads = objectStorageUploadRequests.WithLabelValues("multiple-uploads")
objectStorageUploadTimeBuckets = []float64{.1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100}
)
func init() {
prometheus.MustRegister(
objectStorageUploadRequests,
objectStorageUploadsOpen,
objectStorageUploadBytes)
}
func (a *artifactsUploadProcessor) storeFile(ctx context.Context, formName, fileName string, writer *multipart.Writer) error {
if a.ObjectStore.StoreURL == "" {
if !a.opts.IsRemote() {
return nil
}
if a.stored {
objectStorageUploadRequestsMultipleUploads.Inc()
return nil
}
started := time.Now()
defer func() {
objectStorageUploadTime.Observe(time.Since(started).Seconds())
}()
file, err := os.Open(fileName)
if err != nil {
objectStorageUploadRequestsFileFailed.Inc()
return err
}
defer file.Close()
fi, err := file.Stat()
fh, err := filestore.SaveFileFromDisk(ctx, fileName, a.opts)
if err != nil {
objectStorageUploadRequestsFileFailed.Inc()
return err
return fmt.Errorf("Uploading to object store failed. %s", err)
}
req, err := http.NewRequest("PUT", a.ObjectStore.StoreURL, file)
if err != nil {
objectStorageUploadRequestsRequestFailed.Inc()
return fmt.Errorf("PUT %q: %v", a.ObjectStore.StoreURL, err)
}
req.Header.Set("Content-Type", "application/octet-stream")
req.ContentLength = fi.Size()
objectStorageUploadsOpen.Inc()
defer objectStorageUploadsOpen.Dec()
timeout := DefaultObjectStoreTimeoutSeconds
if a.ObjectStore.Timeout != 0 {
timeout = a.ObjectStore.Timeout
}
ctx2, cancelFn := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
defer cancelFn()
req = req.WithContext(ctx2)
resp, err := http.DefaultClient.Do(req)
if err != nil {
objectStorageUploadRequestsRequestFailed.Inc()
return fmt.Errorf("PUT request %q: %v", a.ObjectStore.StoreURL, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
objectStorageUploadRequestsInvalidStatus.Inc()
return fmt.Errorf("PUT request %v returned: %d %s", a.ObjectStore.StoreURL, resp.StatusCode, resp.Status)
for field, value := range fh.GitLabFinalizeFields(formName) {
writer.WriteField(field, value)
}
writer.WriteField(formName+".store_url", a.ObjectStore.StoreURL)
writer.WriteField(formName+".object_id", a.ObjectStore.ObjectID)
objectStorageUploadRequestsSucceeded.Inc()
objectStorageUploadBytes.Add(float64(fi.Size()))
// Allow to upload only once using given credentials
a.stored = true
return nil
......
......@@ -3,6 +3,8 @@ package artifacts
import (
"archive/zip"
"bytes"
"crypto/md5"
"encoding/hex"
"fmt"
"io/ioutil"
"mime/multipart"
......@@ -19,14 +21,21 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper"
)
func createTestZipArchive(t *testing.T) []byte {
func createTestZipArchive(t *testing.T) (data []byte, md5Hash string) {
var buffer bytes.Buffer
archive := zip.NewWriter(&buffer)
fileInArchive, err := archive.Create("test.file")
require.NoError(t, err)
fmt.Fprint(fileInArchive, "test")
archive.Close()
return buffer.Bytes()
data = buffer.Bytes()
hasher := md5.New()
hasher.Write(data)
hexHash := hasher.Sum(nil)
md5Hash = hex.EncodeToString(hexHash)
return data, md5Hash
}
func createTestMultipartForm(t *testing.T, data []byte) (bytes.Buffer, string) {
......@@ -46,7 +55,7 @@ func TestUploadHandlerSendingToExternalStorage(t *testing.T) {
}
defer os.RemoveAll(tempPath)
archiveData := createTestZipArchive(t)
archiveData, md5 := createTestZipArchive(t)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
storeServerCalled := 0
......@@ -59,6 +68,7 @@ func TestUploadHandlerSendingToExternalStorage(t *testing.T) {
require.Equal(t, archiveData, receivedData)
storeServerCalled++
w.Header().Set("ETag", md5)
w.WriteHeader(200)
})
......@@ -78,6 +88,7 @@ func TestUploadHandlerSendingToExternalStorage(t *testing.T) {
ObjectStore: api.RemoteObjectStore{
StoreURL: storeServer.URL + "/url/put",
ObjectID: "store-id",
GetURL: storeServer.URL + "/store-id",
},
}
......@@ -112,7 +123,7 @@ func TestUploadHandlerSendingToExternalStorageAndStorageServerUnreachable(t *tes
ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close()
archiveData := createTestZipArchive(t)
archiveData, _ := createTestZipArchive(t)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
......@@ -141,7 +152,7 @@ func TestUploadHandlerSendingToExternalStorageAndInvalidURLIsUsed(t *testing.T)
ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close()
archiveData := createTestZipArchive(t)
archiveData, _ := createTestZipArchive(t)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
......@@ -182,7 +193,7 @@ func TestUploadHandlerSendingToExternalStorageAndItReturnsAnError(t *testing.T)
ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close()
archiveData := createTestZipArchive(t)
archiveData, _ := createTestZipArchive(t)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
......@@ -226,7 +237,7 @@ func TestUploadHandlerSendingToExternalStorageAndSupportRequestTimeout(t *testin
ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close()
archiveData := createTestZipArchive(t)
archiveData, _ := createTestZipArchive(t)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
......
......@@ -12,14 +12,14 @@ import (
"syscall"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/upload"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/zipartifacts"
)
type artifactsUploadProcessor struct {
TempPath string
ObjectStore api.RemoteObjectStore
opts *filestore.SaveFileOpts
metadataFile string
stored bool
}
......@@ -56,7 +56,7 @@ func (a *artifactsUploadProcessor) ProcessFile(ctx context.Context, formName, fi
}
// Create temporary file for metadata and store it's path
tempFile, err := ioutil.TempFile(a.TempPath, "metadata_")
tempFile, err := ioutil.TempFile(a.opts.LocalTempPath, "metadata_")
if err != nil {
return err
}
......@@ -106,10 +106,7 @@ func UploadArtifacts(myAPI *api.API, h http.Handler) http.Handler {
return
}
mg := &artifactsUploadProcessor{
TempPath: a.TempPath,
ObjectStore: a.ObjectStore,
}
mg := &artifactsUploadProcessor{opts: filestore.GetOpts(a)}
defer mg.Cleanup()
upload.HandleFileUploads(w, r, h, a.TempPath, mg)
......
......@@ -8,15 +8,24 @@ import (
"io/ioutil"
"os"
"strconv"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
)
type MD5Error error
type SizeError error
// FileHandler represent a file that has been processed for upload
// it may be either uploaded to an ObjectStore and/or saved on local path.
// Remote upload is not yet implemented
type FileHandler struct {
// LocalPath is the path on the disk where file has been stored
LocalPath string
// RemoteID is the objectID provided by GitLab Rails
RemoteID string
// RemoteURL is ObjectStore URL provided by GitLab Rails
RemoteURL string
// Size is the persisted file size
Size int64
......@@ -55,6 +64,12 @@ func (fh *FileHandler) GitLabFinalizeFields(prefix string) map[string]string {
if fh.LocalPath != "" {
data[key("path")] = fh.LocalPath
}
if fh.RemoteURL != "" {
data[key("store_url")] = fh.RemoteURL
}
if fh.RemoteID != "" {
data[key("object_id")] = fh.RemoteID
}
data[key("size")] = strconv.FormatInt(fh.Size, 10)
for hashName, hash := range fh.hashes {
data[key(hashName)] = hash
......@@ -66,7 +81,12 @@ func (fh *FileHandler) GitLabFinalizeFields(prefix string) map[string]string {
// SaveFileFromReader persists the provided reader content to all the location specified in opts. A cleanup will be performed once ctx is Done
// Make sure the provided context will not expire before finalizing upload with GitLab Rails.
func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts *SaveFileOpts) (fh *FileHandler, err error) {
fh = &FileHandler{Name: opts.TempFilePrefix}
var object *objectstore.Object
fh = &FileHandler{
Name: opts.TempFilePrefix,
RemoteID: opts.RemoteID,
RemoteURL: opts.RemoteURL,
}
hashes := newMultiHash()
writers := []io.Writer{hashes.Writer}
defer func() {
......@@ -77,6 +97,21 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
}
}()
if opts.IsRemote() {
// Unknown ContentLength must be implemented in order to achieve Artifact Uploading
if size == -1 && !opts.isGoogleCloudStorage() {
// TODO add support for artifact upload to S3-compatible object storage
return nil, errors.New("Not implemented")
}
object, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.Timeout, size)
if err != nil {
return nil, err
}
writers = append(writers, object)
}
if opts.IsLocal() {
fileWriter, err := fh.uploadLocalFile(ctx, opts)
if err != nil {
......@@ -97,11 +132,23 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
}
if size != -1 && size != fh.Size {
return nil, fmt.Errorf("Expected %d bytes but got only %d", size, fh.Size)
return nil, SizeError(fmt.Errorf("Expected %d bytes but got only %d", size, fh.Size))
}
fh.hashes = hashes.finish()
if opts.IsRemote() {
// we need to close the writer in order to get ETag header
err = object.Close()
if err != nil {
return nil, err
}
if fh.MD5() != object.MD5() {
return nil, MD5Error(fmt.Errorf("expected md5 %s, got %s", fh.MD5(), object.MD5()))
}
}
return fh, err
}
......@@ -125,3 +172,19 @@ func (fh *FileHandler) uploadLocalFile(ctx context.Context, opts *SaveFileOpts)
fh.LocalPath = file.Name()
return file, nil
}
// SaveFileFromDisk open the local file fileName and calls SaveFileFromReader
func SaveFileFromDisk(ctx context.Context, fileName string, opts *SaveFileOpts) (fh *FileHandler, err error) {
file, err := os.Open(fileName)
if err != nil {
return nil, err
}
defer file.Close()
fi, err := file.Stat()
if err != nil {
return nil, err
}
return SaveFileFromReader(ctx, file, fi.Size(), opts)
}
......@@ -5,6 +5,8 @@ import (
"fmt"
"io/ioutil"
"os"
"path"
"strconv"
"strings"
"testing"
"time"
......@@ -13,64 +15,211 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
)
// Some usefull const for testing purpose
const (
// testContent an example textual content
testContent = "TEST OBJECT CONTENT"
// testSize is the testContent size
testSize = int64(len(testContent))
// testMD5 is testContent MD5 hash
testMD5 = "42d000eea026ee0760677e506189cb33"
// testSHA256 is testContent SHA256 hash
testSHA256 = "b0257e9e657ef19b15eed4fbba975bd5238d651977564035ef91cb45693647aa"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
)
func TestSaveFileFromReader(t *testing.T) {
func assertFileGetsRemovedAsync(t *testing.T, filePath string) {
var err error
// Poll because the file removal is async
for i := 0; i < 100; i++ {
_, err = os.Stat(filePath)
if err != nil {
break
}
time.Sleep(100 * time.Millisecond)
}
assert.True(t, os.IsNotExist(err), "File hasn't been deleted during cleanup")
}
func assertObjectStoreDeletedAsync(t *testing.T, expectedDeletes int, osStub *test.ObjectstoreStub) {
// Poll because the object removal is async
for i := 0; i < 100; i++ {
if osStub.DeletesCnt() == expectedDeletes {
break
}
time.Sleep(10 * time.Millisecond)
}
assert.Equal(t, expectedDeletes, osStub.DeletesCnt(), "Object not deleted")
}
func TestSaveFileWrongSize(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
require.NoError(err)
defer os.RemoveAll(tmpFolder)
opts := &filestore.SaveFileOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file"}
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize+1, opts)
assert.Error(err)
_, isSizeError := err.(filestore.SizeError)
assert.True(isSizeError, "Should fail with SizeError")
assert.Nil(fh)
}
func TestSaveFromDiskNotExistingFile(t *testing.T) {
assert := assert.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fh, err := filestore.SaveFileFromDisk(ctx, "/I/do/not/exist", &filestore.SaveFileOpts{})
assert.Error(err, "SaveFileFromDisk should fail")
assert.True(os.IsNotExist(err), "Provided file should not exists")
assert.Nil(fh, "On error FileHandler should be nil")
}
opts := &filestore.SaveFileOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file"}
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(testContent), testSize, opts)
assert.NoError(err)
require.NotNil(fh)
func TestSaveFileWrongMD5(t *testing.T) {
assert := assert.New(t)
assert.NotEmpty(fh.LocalPath, "File hasn't been persisted on disk")
_, err = os.Stat(fh.LocalPath)
assert.NoError(err)
osStub, ts := test.StartObjectStoreWithCustomMD5(map[string]string{test.ObjectPath: "brokenMD5"})
defer ts.Close()
assert.Equal(testMD5, fh.MD5())
assert.Equal(testSHA256, fh.SHA256())
objectURL := ts.URL + test.ObjectPath
cancel()
time.Sleep(100 * time.Millisecond)
_, err = os.Stat(fh.LocalPath)
opts := &filestore.SaveFileOpts{
RemoteID: "test-file",
RemoteURL: objectURL,
PresignedPut: objectURL + "?Signature=ASignature",
PresignedDelete: objectURL + "?Signature=AnotherSignature",
}
ctx, cancel := context.WithCancel(context.Background())
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts)
assert.Nil(fh)
assert.Error(err)
assert.True(os.IsNotExist(err), "File hasn't been deleted during cleanup")
_, isMD5Error := err.(filestore.MD5Error)
assert.True(isMD5Error, "Should fail with MD5Error")
assert.Equal(1, osStub.PutsCnt(), "File not uploaded")
cancel() // this will trigger an async cleanup
assertObjectStoreDeletedAsync(t, 1, osStub)
}
func TestSaveFileWrongSize(t *testing.T) {
func TestSaveFileFromDiskToLocalPath(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
f, err := ioutil.TempFile("", "workhorse-test")
require.NoError(err)
defer os.Remove(f.Name())
_, err = fmt.Fprint(f, test.ObjectContent)
require.NoError(err)
tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
require.NoError(err)
defer os.RemoveAll(tmpFolder)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opts := &filestore.SaveFileOpts{LocalTempPath: tmpFolder}
fh, err := filestore.SaveFileFromDisk(ctx, f.Name(), opts)
assert.NoError(err)
require.NotNil(fh)
assert.NotEmpty(fh.LocalPath, "File not persisted on disk")
_, err = os.Stat(fh.LocalPath)
assert.NoError(err)
}
func TestSaveFile(t *testing.T) {
tmpFolder, err := ioutil.TempDir("", "workhorse-test-tmp")
require.NoError(err)
require.NoError(t, err)
defer os.RemoveAll(tmpFolder)
opts := &filestore.SaveFileOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file"}
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(testContent), testSize+1, opts)
assert.Error(err)
assert.EqualError(err, fmt.Sprintf("Expected %d bytes but got only %d", testSize+1, testSize))
assert.Nil(fh)
tests := []struct {
name string
local bool
remote bool
}{
{name: "Local only", local: true},
{name: "Remote only", remote: true},
{name: "Both", local: true, remote: true},
}
for _, spec := range tests {
t.Run(spec.name, func(t *testing.T) {
assert := assert.New(t)
var opts filestore.SaveFileOpts
var expectedDeletes, expectedPuts int
osStub, ts := test.StartObjectStore()
defer ts.Close()
if spec.remote {
objectURL := ts.URL + test.ObjectPath
opts.RemoteID = "test-file"
opts.RemoteURL = objectURL
opts.PresignedPut = objectURL + "?Signature=ASignature"
opts.PresignedDelete = objectURL + "?Signature=AnotherSignature"
expectedDeletes = 1
expectedPuts = 1
}
if spec.local {
opts.LocalTempPath = tmpFolder
opts.TempFilePrefix = "test-file"
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
assert.NoError(err)
require.NotNil(t, fh)
assert.Equal(opts.RemoteID, fh.RemoteID)
assert.Equal(opts.RemoteURL, fh.RemoteURL)
if spec.local {
assert.NotEmpty(fh.LocalPath, "File not persisted on disk")
_, err := os.Stat(fh.LocalPath)
assert.NoError(err)
dir := path.Dir(fh.LocalPath)
assert.Equal(opts.LocalTempPath, dir)
filename := path.Base(fh.LocalPath)
beginsWithPrefix := strings.HasPrefix(filename, opts.TempFilePrefix)
assert.True(beginsWithPrefix, fmt.Sprintf("LocalPath filename %q do not begin with TempFilePrefix %q", filename, opts.TempFilePrefix))
} else {
assert.Empty(fh.LocalPath, "LocalPath must be empty for non local uploads")
}
assert.Equal(test.ObjectSize, fh.Size)
assert.Equal(test.ObjectMD5, fh.MD5())
assert.Equal(test.ObjectSHA256, fh.SHA256())
assert.Equal(expectedPuts, osStub.PutsCnt(), "ObjectStore PutObject count mismatch")
assert.Equal(0, osStub.DeletesCnt(), "File deleted too early")
cancel() // this will trigger an async cleanup
assertObjectStoreDeletedAsync(t, expectedDeletes, osStub)
assertFileGetsRemovedAsync(t, fh.LocalPath)
// checking generated fields
fields := fh.GitLabFinalizeFields("file")
assert.Equal(fh.Name, fields["file.name"])
assert.Equal(fh.LocalPath, fields["file.path"])
assert.Equal(fh.RemoteURL, fields["file.store_url"])
assert.Equal(fh.RemoteID, fields["file.object_id"])
assert.Equal(strconv.FormatInt(test.ObjectSize, 10), fields["file.size"])
assert.Equal(test.ObjectMD5, fields["file.md5"])
assert.Equal(test.ObjectSHA1, fields["file.sha1"])
assert.Equal(test.ObjectSHA256, fields["file.sha256"])
assert.Equal(test.ObjectSHA512, fields["file.sha512"])
})
}
}
package filestore
import (
"net/url"
"time"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
)
// SaveFileOpts represents all the options available for saving a file to object store
type SaveFileOpts struct {
// TempFilePrefix is the prefix used to create temporary local file
TempFilePrefix string
// LocalTempPath is the directory where to write a local copy of the file
LocalTempPath string
// RemoteID is the remote ObjectID provided by GitLab
RemoteID string
// RemoteURL is the final URL of the file
RemoteURL string
// PresignedPut is a presigned S3 PutObject compatible URL
PresignedPut string
// PresignedDelete is a presigned S3 DeleteObject compatible URL.
PresignedDelete string
// Timeout it the S3 operation timeout. If 0, objectstore.DefaultObjectStoreTimeout will be used
Timeout time.Duration
}
// IsLocal checks if the options require the writing of the file on disk
func (s *SaveFileOpts) IsLocal() bool {
return s.LocalTempPath != ""
}
// IsRemote checks if the options requires a remote upload
func (s *SaveFileOpts) IsRemote() bool {
return s.PresignedPut != ""
}
func (s *SaveFileOpts) isGoogleCloudStorage() bool {
if !s.IsRemote() {
return false
}
getURL, err := url.Parse(s.RemoteURL)
if err != nil {
return false
}
return objectstore.IsGoogleCloudStorage(getURL)
}
// GetOpts converts GitLab api.Response to a proper SaveFileOpts
func GetOpts(apiResponse *api.Response) *SaveFileOpts {
timeout := time.Duration(apiResponse.ObjectStore.Timeout) * time.Second
if timeout == 0 {
timeout = objectstore.DefaultObjectStoreTimeout
}
return &SaveFileOpts{
LocalTempPath: apiResponse.TempPath,
RemoteID: apiResponse.ObjectStore.ObjectID,
RemoteURL: apiResponse.ObjectStore.GetURL,
PresignedPut: apiResponse.ObjectStore.StoreURL,
PresignedDelete: apiResponse.ObjectStore.DeleteURL,
Timeout: timeout,
}
}
package filestore_test
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
)
func TestSaveFileOptsLocalAndRemote(t *testing.T) {
tests := []struct {
name string
localTempPath string
presignedPut string
isLocal bool
isRemote bool
}{
{
name: "Only LocalTempPath",
localTempPath: "/tmp",
isLocal: true,
},
{
name: "Both paths",
localTempPath: "/tmp",
presignedPut: "http://example.com",
isLocal: true,
isRemote: true,
},
{
name: "No paths",
},
{
name: "Only remoteUrl",
presignedPut: "http://example.com",
isRemote: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert := assert.New(t)
opts := filestore.SaveFileOpts{
LocalTempPath: test.localTempPath,
PresignedPut: test.presignedPut,
}
assert.Equal(test.isLocal, opts.IsLocal(), "IsLocal() mismatch")
assert.Equal(test.isRemote, opts.IsRemote(), "IsRemote() mismatch")
})
}
}
func TestGetOpts(t *testing.T) {
assert := assert.New(t)
apiResponse := &api.Response{
TempPath: "/tmp",
ObjectStore: api.RemoteObjectStore{
Timeout: 10,
ObjectID: "id",
GetURL: "http://get",
StoreURL: "http://store",
DeleteURL: "http://delete",
},
}
opts := filestore.GetOpts(apiResponse)
assert.Equal(apiResponse.TempPath, opts.LocalTempPath)
assert.Equal(time.Duration(apiResponse.ObjectStore.Timeout)*time.Second, opts.Timeout)
assert.Equal(apiResponse.ObjectStore.ObjectID, opts.RemoteID)
assert.Equal(apiResponse.ObjectStore.GetURL, opts.RemoteURL)
assert.Equal(apiResponse.ObjectStore.StoreURL, opts.PresignedPut)
assert.Equal(apiResponse.ObjectStore.DeleteURL, opts.PresignedDelete)
}
func TestGetOptsDefaultTimeout(t *testing.T) {
assert := assert.New(t)
opts := filestore.GetOpts(&api.Response{})
assert.Equal(objectstore.DefaultObjectStoreTimeout, opts.Timeout)
}
......@@ -18,7 +18,7 @@ import (
const NginxResponseBufferHeader = "X-Accel-Buffering"
var scrubRegexp = regexp.MustCompile(`(?i)([\?&]((?:private|authenticity|rss)[\-_]token)|X-AMZ-Signature)=[^&]*`)
var scrubRegexp = regexp.MustCompile(`(?i)([\?&]((?:private|authenticity|rss)[\-_]token)|(?:X-AMZ-)?Signature)=[^&]*`)
func Fail500(w http.ResponseWriter, r *http.Request, err error) {
http.Error(w, "Internal server error", 500)
......
......@@ -130,6 +130,8 @@ func TestScrubURLParams(t *testing.T) {
"?X-AMZ-Signature=foo": "?X-AMZ-Signature=[FILTERED]",
"&X-AMZ-Signature=foo": "&X-AMZ-Signature=[FILTERED]",
"?x-amz-signature=foo": "?x-amz-signature=[FILTERED]",
"&Signature=foo": "&Signature=[FILTERED]",
"?Signature=foo": "?Signature=[FILTERED]",
} {
after := ScrubURLParams(before)
assert.Equal(t, expected, after, "Scrubbing %q", before)
......
......@@ -23,10 +23,14 @@ func PutStore(a *api.API, h http.Handler) http.Handler {
func handleStoreLFSObject(myAPI *api.API, h http.Handler) http.Handler {
return myAPI.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) {
opts := &filestore.SaveFileOpts{
LocalTempPath: a.StoreLFSPath,
TempFilePrefix: a.LfsOid,
opts := filestore.GetOpts(a)
opts.TempFilePrefix = a.LfsOid
// backward compatible api check - to be removed on next release
if a.StoreLFSPath != "" {
opts.LocalTempPath = a.StoreLFSPath
}
// end of backward compatible api check
fh, err := filestore.SaveFileFromReader(r.Context(), r.Body, r.ContentLength, opts)
if err != nil {
......@@ -54,7 +58,11 @@ func handleStoreLFSObject(myAPI *api.API, h http.Handler) http.Handler {
r.Body = ioutil.NopCloser(strings.NewReader(body))
r.ContentLength = int64(len(body))
r.Header.Set("Content-Type", "application/x-www-form-urlencoded")
// backward compatible API header - to be removed on next release
if opts.IsLocal() {
r.Header.Set("X-GitLab-Lfs-Tmp", filepath.Base(fh.LocalPath))
}
// end of backward compatible API header
// And proxy the request
h.ServeHTTP(w, r)
......
package objectstore
import (
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
"time"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
// DefaultObjectStoreTimeout is the timeout for ObjectStore PutObject api calls
const DefaultObjectStoreTimeout = 360 * time.Second
// httpTransport defines a http.Transport with values
// that are more restrictive than for http.DefaultTransport,
// they define shorter TLS Handshake, and more agressive connection closing
// to prevent the connection hanging and reduce FD usage
var httpTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 10 * time.Second,
}).DialContext,
MaxIdleConns: 2,
IdleConnTimeout: 30 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
}
var httpClient = &http.Client{
Transport: httpTransport,
}
// IsGoogleCloudStorage checks if the provided URL is from Google Cloud Storage service
func IsGoogleCloudStorage(u *url.URL) bool {
return strings.ToLower(u.Host) == "storage.googleapis.com"
}
type MissingContentLengthError error
type StatusCodeError error
// Object represents an object on a S3 compatible Object Store service.
// It can be used as io.WriteCloser for uploading an object
type Object struct {
// PutURL is a presigned URL for PutObject
PutURL string
// DeleteURL is a presigned URL for RemoveObject
DeleteURL string
// md5 is the checksum provided by the Object Store
md5 string
// writeCloser is the writer bound to the PutObject body
writeCloser io.WriteCloser
// uploadError is the last error occourred during upload
uploadError error
// ctx is the internal context bound to the upload request
ctx context.Context
}
// NewObject opens an HTTP connection to Object Store and returns an Object pointer that can be used for uploading.
func NewObject(ctx context.Context, putURL, deleteURL string, timeout time.Duration, size int64) (*Object, error) {
started := time.Now()
o := &Object{
PutURL: putURL,
DeleteURL: deleteURL,
}
pr, pw := io.Pipe()
o.writeCloser = pw
req, err := http.NewRequest(http.MethodPut, o.PutURL, pr)
if err != nil {
objectStorageUploadRequestsRequestFailed.Inc()
return nil, fmt.Errorf("PUT %q: %v", helper.ScrubURLParams(o.PutURL), err)
}
if size == -1 {
if !IsGoogleCloudStorage(req.URL) {
objectStorageUploadRequestsRequestFailed.Inc()
return nil, MissingContentLengthError(fmt.Errorf("Unknown Content-Length not allowed on %s", req.URL.Host))
}
} else {
req.ContentLength = size
}
req.Header.Set("Content-Type", "application/octet-stream")
if timeout == 0 {
timeout = DefaultObjectStoreTimeout
}
uploadCtx, cancelFn := context.WithTimeout(ctx, timeout)
o.ctx = uploadCtx
objectStorageUploadsOpen.Inc()
go func() {
// wait for the upload to finish
<-o.ctx.Done()
objectStorageUploadTime.Observe(time.Since(started).Seconds())
// wait for provided context to finish before performing cleanup
<-ctx.Done()
o.delete()
}()
go func() {
defer cancelFn()
defer objectStorageUploadsOpen.Dec()
defer pr.Close()
req = req.WithContext(o.ctx)
resp, err := httpClient.Do(req)
if err != nil {
objectStorageUploadRequestsRequestFailed.Inc()
o.uploadError = fmt.Errorf("PUT request %q: %v", helper.ScrubURLParams(o.PutURL), err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
objectStorageUploadRequestsInvalidStatus.Inc()
o.uploadError = StatusCodeError(fmt.Errorf("PUT request %v returned: %s", helper.ScrubURLParams(o.PutURL), resp.Status))
return
}
o.extractMD5(resp.Header)
}()
return o, nil
}
// Write implements the standard io.Writer interface: it writes data to the PutObject body.
func (o *Object) Write(p []byte) (int, error) {
return o.writeCloser.Write(p)
}
// Close implements the standard io.Closer interface: it closes the http client request.
// This method will also wait for the connection to terminate and return any error occurred during the upload
func (o *Object) Close() error {
if err := o.writeCloser.Close(); err != nil {
return err
}
<-o.ctx.Done()
return o.uploadError
}
// MD5 returns the md5sum of the uploaded returned by the Object Store provider via ETag Header.
// This method will wait until upload context is done before returning.
func (o *Object) MD5() string {
<-o.ctx.Done()
return o.md5
}
func (o *Object) extractMD5(h http.Header) {
etag := h.Get("ETag")
if etag != "" && etag[0] == '"' {
etag = etag[1 : len(etag)-1]
}
o.md5 = etag
}
func (o *Object) delete() {
if o.DeleteURL == "" {
return
}
<-o.ctx.Done()
req, err := http.NewRequest(http.MethodDelete, o.DeleteURL, nil)
if err != nil {
objectStorageUploadRequestsRequestFailed.Inc()
return
}
resp, err := httpClient.Do(req)
if err != nil {
objectStorageUploadRequestsRequestFailed.Inc()
return
}
resp.Body.Close()
}
package objectstore_test
import (
"context"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
)
const testTimeout = 10 * time.Second
func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) {
assert := assert.New(t)
osStub, ts := test.StartObjectStore()
defer ts.Close()
objectURL := ts.URL + test.ObjectPath
var deleteURL string
if useDeleteURL {
deleteURL = objectURL
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
object, err := objectstore.NewObject(ctx, objectURL, deleteURL, testTimeout, test.ObjectSize)
require.NoError(t, err)
// copy data
n, err := io.Copy(object, strings.NewReader(test.ObjectContent))
assert.NoError(err)
assert.Equal(test.ObjectSize, n, "Uploaded file mismatch")
// close HTTP stream
err = object.Close()
assert.NoError(err)
// Checking MD5 extraction
assert.Equal(osStub.GetObjectMD5(test.ObjectPath), object.MD5())
// Checking cleanup
cancel()
assert.Equal(1, osStub.PutsCnt(), "Object hasn't been uploaded")
var expectedDeleteCnt int
if useDeleteURL {
expectedDeleteCnt = 1
}
// Poll because the object removal is async
for i := 0; i < 100; i++ {
if osStub.DeletesCnt() == expectedDeleteCnt {
break
}
time.Sleep(10 * time.Millisecond)
}
if useDeleteURL {
assert.Equal(1, osStub.DeletesCnt(), "Object hasn't been deleted")
} else {
assert.Equal(0, osStub.DeletesCnt(), "Object has been deleted")
}
}
func TestObjectUpload(t *testing.T) {
t.Run("with delete URL", func(t *testing.T) { testObjectUploadNoErrors(t, true) })
t.Run("without delete URL", func(t *testing.T) { testObjectUploadNoErrors(t, false) })
}
func TestObjectUpload404(t *testing.T) {
assert := assert.New(t)
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
objectURL := ts.URL + test.ObjectPath
object, err := objectstore.NewObject(ctx, objectURL, "", testTimeout, test.ObjectSize)
require.NoError(t, err)
_, err = io.Copy(object, strings.NewReader(test.ObjectContent))
assert.NoError(err)
err = object.Close()
assert.Error(err)
_, isStatusCodeError := err.(objectstore.StatusCodeError)
assert.True(isStatusCodeError, "Should fail with StatusCodeError")
assert.Contains(err.Error(), "404")
}
func TestUnknownSizeUpload(t *testing.T) {
assert := assert.New(t)
object, err := objectstore.NewObject(context.Background(), "http://example.com/bucket/object", "", 0, -1)
assert.Error(err)
_, isMissingContentLengthError := err.(objectstore.MissingContentLengthError)
assert.True(isMissingContentLengthError, "Should fail with MissingContentLengthError")
assert.Nil(object)
}
package objectstore
import "github.com/prometheus/client_golang/prometheus"
var (
objectStorageUploadRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gitlab_workhorse_object_storage_upload_requests",
Help: "How many object storage requests have been processed",
},
[]string{"status"},
)
objectStorageUploadsOpen = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "gitlab_workhorse_object_storage_upload_open",
Help: "Describes many object storage requests are open now",
},
)
objectStorageUploadBytes = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "gitlab_workhorse_object_storage_upload_bytes",
Help: "How many bytes were sent to object storage",
},
)
objectStorageUploadTime = prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "gitlab_workhorse_object_storage_upload_time",
Help: "How long it took to upload objects",
Buckets: objectStorageUploadTimeBuckets,
})
objectStorageUploadRequestsFileFailed = objectStorageUploadRequests.WithLabelValues("file-failed")
objectStorageUploadRequestsRequestFailed = objectStorageUploadRequests.WithLabelValues("request-failed")
objectStorageUploadRequestsInvalidStatus = objectStorageUploadRequests.WithLabelValues("invalid-status")
objectStorageUploadRequestsSucceeded = objectStorageUploadRequests.WithLabelValues("succeeded")
objectStorageUploadRequestsMultipleUploads = objectStorageUploadRequests.WithLabelValues("multiple-uploads")
objectStorageUploadTimeBuckets = []float64{.1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100}
)
func init() {
prometheus.MustRegister(
objectStorageUploadRequests,
objectStorageUploadsOpen,
objectStorageUploadBytes)
}
package test
// Some usefull const for testing purpose
const (
// ObjectContent an example textual content
ObjectContent = "TEST OBJECT CONTENT"
// ObjectSize is the ObjectContent size
ObjectSize = int64(len(ObjectContent))
// Objectpath is an example remote object path (including bucket name)
ObjectPath = "/bucket/object"
// ObjectMD5 is ObjectContent MD5 hash
ObjectMD5 = "42d000eea026ee0760677e506189cb33"
// ObjectSHA1 is ObjectContent SHA1 hash
ObjectSHA1 = "173cfd58c6b60cb910f68a26cbb77e3fc5017a6d"
// ObjectSHA256 is ObjectContent SHA256 hash
ObjectSHA256 = "b0257e9e657ef19b15eed4fbba975bd5238d651977564035ef91cb45693647aa"
// ObjectSHA512 is ObjectContent SHA512 hash
ObjectSHA512 = "51af8197db2047f7894652daa7437927bf831d5aa63f1b0b7277c4800b06f5e3057251f0e4c2d344ca8c2daf1ffc08a28dd3b2f5fe0e316d3fd6c3af58c34b97"
)
package test
import (
"crypto/md5"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/http/httptest"
"sync"
)
// ObjectstoreStub is a testing implementation of ObjectStore.
// Instead of storing objects it will just save md5sum.
type ObjectstoreStub struct {
// bucket contains md5sum of uploaded objects
bucket map[string]string
// overwriteMD5 contains overwrites for md5sum that should be return instead of the regular hash
overwriteMD5 map[string]string
puts int
deletes int
m sync.Mutex
}
// StartObjectStore will start an ObjectStore stub
func StartObjectStore() (*ObjectstoreStub, *httptest.Server) {
return StartObjectStoreWithCustomMD5(make(map[string]string))
}
// StartObjectStoreWithCustomMD5 will start an ObjectStore stub: md5Hashes contains overwrites for md5sum that should be return on PutObject
func StartObjectStoreWithCustomMD5(md5Hashes map[string]string) (*ObjectstoreStub, *httptest.Server) {
os := &ObjectstoreStub{
bucket: make(map[string]string),
overwriteMD5: make(map[string]string),
}
for k, v := range md5Hashes {
os.overwriteMD5[k] = v
}
return os, httptest.NewServer(os)
}
// PutsCnt counts PutObject invocations
func (o *ObjectstoreStub) PutsCnt() int {
o.m.Lock()
defer o.m.Unlock()
return o.puts
}
// DeletesCnt counts DeleteObject invocation of a valid object
func (o *ObjectstoreStub) DeletesCnt() int {
o.m.Lock()
defer o.m.Unlock()
return o.deletes
}
// GetObjectMD5 return the calculated MD5 of the object uploaded to path
// it will return an empty string if no object has been uploaded on such path
func (o *ObjectstoreStub) GetObjectMD5(path string) string {
o.m.Lock()
defer o.m.Unlock()
return o.bucket[path]
}
func (o *ObjectstoreStub) removeObject(w http.ResponseWriter, r *http.Request) {
o.m.Lock()
defer o.m.Unlock()
objectPath := r.URL.Path
if _, ok := o.bucket[objectPath]; ok {
o.deletes++
delete(o.bucket, objectPath)
w.WriteHeader(200)
} else {
w.WriteHeader(404)
}
}
func (o *ObjectstoreStub) putObject(w http.ResponseWriter, r *http.Request) {
o.m.Lock()
defer o.m.Unlock()
objectPath := r.URL.Path
etag, overwritten := o.overwriteMD5[objectPath]
if !overwritten {
hasher := md5.New()
io.Copy(hasher, r.Body)
checksum := hasher.Sum(nil)
etag = hex.EncodeToString(checksum)
}
o.puts++
o.bucket[objectPath] = etag
w.Header().Set("ETag", etag)
w.WriteHeader(200)
}
func (o *ObjectstoreStub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Body != nil {
defer r.Body.Close()
}
fmt.Println("ObjectStore Stub:", r.Method, r.URL.Path)
switch r.Method {
case "DELETE":
o.removeObject(w, r)
case "PUT":
o.putObject(w, r)
default:
w.WriteHeader(404)
}
}
package test
import (
"net/http"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestObjectStoreStub(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
stub, ts := StartObjectStore()
defer ts.Close()
assert.Equal(0, stub.PutsCnt())
assert.Equal(0, stub.DeletesCnt())
objectURL := ts.URL + ObjectPath
req, err := http.NewRequest(http.MethodPut, objectURL, strings.NewReader(ObjectContent))
require.NoError(err)
_, err = http.DefaultClient.Do(req)
require.NoError(err)
assert.Equal(1, stub.PutsCnt())
assert.Equal(0, stub.DeletesCnt())
assert.Equal(ObjectMD5, stub.GetObjectMD5(ObjectPath))
req, err = http.NewRequest(http.MethodDelete, objectURL, nil)
require.NoError(err)
_, err = http.DefaultClient.Do(req)
require.NoError(err)
assert.Equal(1, stub.PutsCnt())
assert.Equal(1, stub.DeletesCnt())
}
func TestObjectStoreStubDelete404(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
stub, ts := StartObjectStore()
defer ts.Close()
objectURL := ts.URL + ObjectPath
req, err := http.NewRequest(http.MethodDelete, objectURL, nil)
require.NoError(err)
resp, err := http.DefaultClient.Do(req)
require.NoError(err)
assert.Equal(404, resp.StatusCode)
assert.Equal(0, stub.DeletesCnt())
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment