Commit 7b39a569 authored by Nick Thomas

Merge branch 'sh-workhorse-direct-s3' into 'master'

Support Workhorse directly uploading files to S3 [3/3]

See merge request gitlab-org/gitlab-workhorse!466
parents e1e6ed3d 0671c96d
#!/bin/sh
git grep 'context.\(Background\|TODO\)' | \
grep -v -e '^[^:]*_test\.go:' -e '^vendor/' -e '^_support/' -e '^cmd/[^:]*/main.go' | \
grep -v -e '^[^:]*_test\.go:' -v -e "lint:allow context.Background" -e '^vendor/' -e '^_support/' -e '^cmd/[^:]*/main.go' | \
grep -e '^[^:]*\.go' | \
awk '{
print "Found disallowed use of context.Background or TODO"
......
---
title: Support Workhorse directly uploading files to S3
merge_request: 466
author:
type: added
[redis]
URL = "unix:/home/git/gitlab/redis/redis.socket"
[object_storage]
enabled = false
provider = "AWS"
[object_storage.s3]
aws_access_key_id = "YOUR AWS ACCESS KEY"
aws_secret_access_key = "YOUR AWS SECRET ACCESS KEY"
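The [object_storage] block above is decoded into Go structs in the internal/config package. A minimal sketch of the shape those structs would need in order to satisfy the config test further down; the field names are taken from that test, while the exact TOML tags are an assumption:

// Sketch only: field names come from TestLoadObjectStorageConfig below;
// the TOML tags are assumed to mirror the keys in config.toml.
type Config struct {
	// ... existing fields such as the [redis] section ...
	ObjectStorageCredentials *ObjectStorageCredentials `toml:"object_storage"`
}

type ObjectStorageCredentials struct {
	Provider      string
	S3Credentials S3Credentials `toml:"s3"`
}

type S3Credentials struct {
	AwsAccessKeyID     string `toml:"aws_access_key_id"`
	AwsSecretAccessKey string `toml:"aws_secret_access_key"`
}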
......@@ -6,6 +6,7 @@ require (
github.com/BurntSushi/toml v0.3.1
github.com/FZambia/sentinel v1.0.0
github.com/alecthomas/chroma v0.7.3
github.com/aws/aws-sdk-go v1.31.7
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/getsentry/raven-go v0.1.2
github.com/golang/gddo v0.0.0-20190419222130-af0f2af80721
......@@ -15,17 +16,19 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/jfbus/httprs v0.0.0-20190827093123-b0af8319bb15
github.com/johannesboyne/gofakes3 v0.0.0-20200510090907-02d71f533bec
github.com/jpillora/backoff v0.0.0-20170918002102-8eab2debe79d
github.com/prometheus/client_golang v1.0.0
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1
github.com/sebest/xff v0.0.0-20160910043805-6c115e0ffa35
github.com/shabbyrobe/gocovmerge v0.0.0-20190829150210-3e036491d500 // indirect
github.com/sirupsen/logrus v1.3.0
github.com/stretchr/testify v1.5.1
gitlab.com/gitlab-org/gitaly v1.74.0
gitlab.com/gitlab-org/labkit v0.0.0-20200520155818-96e583c57891
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa
golang.org/x/tools v0.0.0-20200117161641-43d50277825c
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375
google.golang.org/grpc v1.24.0
gopkg.in/yaml.v2 v2.2.8 // indirect
honnef.co/go/tools v0.0.1-2019.2.3
......
package config
import (
"io/ioutil"
"os"
"testing"
"github.com/stretchr/testify/require"
)
func TestLoadObjectStorageConfig(t *testing.T) {
config := `
[object_storage]
enabled = true
provider = "AWS"
[object_storage.s3]
aws_access_key_id = "minio"
aws_secret_access_key = "gdk-minio"
`
tmpFile, err := ioutil.TempFile(os.TempDir(), "test-")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
_, err = tmpFile.Write([]byte(config))
require.NoError(t, err)
cfg, err := LoadConfig(tmpFile.Name())
require.NoError(t, err)
require.NotNil(t, cfg.ObjectStorageCredentials, "Expected object storage credentials")
expected := ObjectStorageCredentials{
Provider: "AWS",
S3Credentials: S3Credentials{
AwsAccessKeyID: "minio",
AwsSecretAccessKey: "gdk-minio",
},
}
require.Equal(t, expected, *cfg.ObjectStorageCredentials)
}
......@@ -11,6 +11,8 @@ import (
"github.com/dgrijalva/jwt-go"
"gitlab.com/gitlab-org/labkit/log"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/secret"
)
......@@ -115,7 +117,16 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
}
}()
if opts.IsMultipart() {
useS3Client := opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsAWS() && opts.ObjectStorageConfig.IsValid()
if useS3Client {
remoteWriter, err = objectstore.NewS3Object(ctx, opts.RemoteTempObjectID, opts.ObjectStorageConfig.S3Credentials, opts.ObjectStorageConfig.S3Config, opts.Deadline)
if err != nil {
return nil, err
}
writers = append(writers, remoteWriter)
} else if opts.IsMultipart() {
remoteWriter, err = objectstore.NewMultipart(ctx, opts.PresignedParts, opts.PresignedCompleteMultipart, opts.PresignedAbortMultipart, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, opts.PartSize)
if err != nil {
return nil, err
......@@ -154,6 +165,24 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
return nil, SizeError(fmt.Errorf("expected %d bytes but got only %d", size, fh.Size))
}
logger := log.WithContextFields(ctx, log.Fields{
"copied_bytes": fh.Size,
"is_local": opts.IsLocal(),
"is_multipart": opts.IsMultipart(),
"is_remote": opts.IsRemote(),
"remote_id": opts.RemoteID,
"temp_file_prefix": opts.TempFilePrefix,
"use_s3_client": useS3Client,
})
if opts.IsLocal() {
logger = logger.WithField("local_temp_path", opts.LocalTempPath)
} else if useS3Client {
logger = logger.WithField("remote_temp_object", opts.RemoteTempObjectID)
}
logger.Info("saved file")
fh.hashes = hashes.finish()
if opts.IsRemote() {
......
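The useS3Client guard above relies on three small helpers. A minimal sketch of what they plausibly check, based only on the option and config fields exercised elsewhere in this merge request; the actual implementations in internal/filestore may differ:

// Sketch only: the real filestore helpers may differ. Uses the "strings" import.
func (c *ObjectStorageConfig) IsAWS() bool {
	return strings.EqualFold(c.Provider, "AWS")
}

func (c *ObjectStorageConfig) IsValid() bool {
	// At minimum the target bucket must be known; credentials may be empty
	// when IAM instance profiles are used (see s3_session.go).
	return c.S3Config.Bucket != ""
}

func (s *SaveFileOpts) UseWorkhorseClientEnabled() bool {
	return s.UseWorkhorseClient && s.RemoteTempObjectID != ""
}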
......@@ -275,6 +275,32 @@ func TestSaveFile(t *testing.T) {
}
}
func TestSaveFileWithWorkhorseClient(t *testing.T) {
s3Creds, s3Config, sess, ts := test.SetupS3(t)
defer ts.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
remoteObject := "tmp/test-file/1"
opts := filestore.SaveFileOpts{
RemoteID: "test-file",
Deadline: testDeadline(),
UseWorkhorseClient: true,
RemoteTempObjectID: remoteObject,
ObjectStorageConfig: filestore.ObjectStorageConfig{
Provider: "AWS",
S3Credentials: s3Creds,
S3Config: s3Config,
},
}
_, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
require.NoError(t, err)
test.S3ObjectExists(t, sess, s3Config, remoteObject, test.ObjectContent)
}
func TestSaveMultipartInBodyFailure(t *testing.T) {
assert := assert.New(t)
......
package objectstore
import (
"context"
"io"
"time"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"gitlab.com/gitlab-org/labkit/log"
)
type S3Object struct {
credentials config.S3Credentials
config config.S3Config
objectName string
uploader
}
func NewS3Object(ctx context.Context, objectName string, s3Credentials config.S3Credentials, s3Config config.S3Config, deadline time.Time) (*S3Object, error) {
pr, pw := io.Pipe()
objectStorageUploadsOpen.Inc()
uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)
o := &S3Object{
uploader: newUploader(uploadCtx, pw),
credentials: s3Credentials,
config: s3Config,
}
go o.trackUploadTime()
go o.cleanup(ctx)
go func() {
defer cancelFn()
defer objectStorageUploadsOpen.Dec()
defer func() {
// This will be returned as error to the next write operation on the pipe
pr.CloseWithError(o.uploadError)
}()
sess, err := setupS3Session(s3Credentials, s3Config)
if err != nil {
o.uploadError = err
log.WithError(err).Error("error creating S3 session")
return
}
o.objectName = objectName
uploader := s3manager.NewUploader(sess)
_, err = uploader.UploadWithContext(uploadCtx, &s3manager.UploadInput{
Bucket: aws.String(s3Config.Bucket),
Key: aws.String(objectName),
Body: pr,
})
if err != nil {
o.uploadError = err
objectStorageUploadRequestsRequestFailed.Inc()
log.WithError(err).Error("error uploading S3 session")
return
}
}()
return o, nil
}
func (o *S3Object) trackUploadTime() {
started := time.Now()
<-o.ctx.Done()
objectStorageUploadTime.Observe(time.Since(started).Seconds())
}
func (o *S3Object) cleanup(ctx context.Context) {
// wait for the upload to finish
<-o.ctx.Done()
if o.uploadError != nil {
objectStorageUploadRequestsRequestFailed.Inc()
o.delete()
return
}
// We have now successfully uploaded the file to object storage. Another
// goroutine will hand off the object to gitlab-rails.
<-ctx.Done()
// gitlab-rails is now done with the object so it's time to delete it.
o.delete()
}
func (o *S3Object) delete() {
if o.objectName == "" {
return
}
session, err := setupS3Session(o.credentials, o.config)
if err != nil {
log.WithError(err).Error("error setting up S3 session in delete")
return
}
svc := s3.New(session)
input := &s3.DeleteObjectInput{
Bucket: aws.String(o.config.Bucket),
Key: aws.String(o.objectName),
}
// Note we can't use the request context because in a successful
// case, the original request has already completed.
deleteCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // lint:allow context.Background
defer cancel()
_, err = svc.DeleteObjectWithContext(deleteCtx, input)
if err != nil {
log.WithError(err).Error("error deleting S3 object", err)
}
}
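Worth noting about NewS3Object above: the returned object never buffers the whole file. Writes from the caller go into an io.Pipe whose read end becomes the Body passed to s3manager's UploadWithContext, and CloseWithError propagates an upload failure back to the next write on the pipe. A standalone sketch of that pipe hand-off using only the standard library (names here are illustrative, not part of the workhorse code):

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

func main() {
	pr, pw := io.Pipe()

	done := make(chan error, 1)
	go func() {
		// Stand-in for s3manager.Uploader.UploadWithContext reading the body.
		body, err := ioutil.ReadAll(pr)
		fmt.Printf("consumer read %d bytes\n", len(body))
		// On a real upload failure, closing the reader with that error makes
		// the producer's next Write fail with it.
		pr.CloseWithError(err)
		done <- err
	}()

	// Stand-in for the HTTP handler streaming the request body into the pipe.
	_, err := io.Copy(pw, strings.NewReader("hello object storage"))
	pw.CloseWithError(err) // a nil err closes the pipe cleanly (EOF for the reader)
	fmt.Println("consumer finished, err:", <-done)
}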
package objectstore_test
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
)
func TestS3ObjectUpload(t *testing.T) {
creds, config, sess, ts := test.SetupS3(t)
defer ts.Close()
deadline := time.Now().Add(testTimeout)
tmpDir, err := ioutil.TempDir("", "workhorse-test-")
require.NoError(t, err)
defer os.Remove(tmpDir)
objectName := filepath.Join(tmpDir, "s3-test-data")
ctx, cancel := context.WithCancel(context.Background())
object, err := objectstore.NewS3Object(ctx, objectName, creds, config, deadline)
require.NoError(t, err)
// copy data
n, err := io.Copy(object, strings.NewReader(test.ObjectContent))
require.NoError(t, err)
require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch")
// close HTTP stream
err = object.Close()
require.NoError(t, err)
test.S3ObjectExists(t, sess, config, objectName, test.ObjectContent)
cancel()
deleted := false
retry(3, time.Second, func() error {
if test.S3ObjectDoesNotExist(t, sess, config, objectName) {
deleted = true
return nil
} else {
return fmt.Errorf("file is still present, retrying")
}
})
require.True(t, deleted)
}
func TestConcurrentS3ObjectUpload(t *testing.T) {
creds, uploadsConfig, uploadsSession, uploadServer := test.SetupS3WithBucket(t, "uploads")
defer uploadServer.Close()
// This will return a separate S3 endpoint
_, artifactsConfig, artifactsSession, artifactsServer := test.SetupS3WithBucket(t, "artifacts")
defer artifactsServer.Close()
deadline := time.Now().Add(testTimeout)
tmpDir, err := ioutil.TempDir("", "workhorse-test-")
require.NoError(t, err)
defer os.Remove(tmpDir)
var wg sync.WaitGroup
for i := 0; i < 4; i++ {
wg.Add(1)
go func(index int) {
var sess *session.Session
var config config.S3Config
if index%2 == 0 {
sess = uploadsSession
config = uploadsConfig
} else {
sess = artifactsSession
config = artifactsConfig
}
name := fmt.Sprintf("s3-test-data-%d", index)
objectName := filepath.Join(tmpDir, name)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
object, err := objectstore.NewS3Object(ctx, objectName, creds, config, deadline)
require.NoError(t, err)
// copy data
n, err := io.Copy(object, strings.NewReader(test.ObjectContent))
require.NoError(t, err)
require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch")
// close HTTP stream
require.NoError(t, object.Close())
test.S3ObjectExists(t, sess, config, objectName, test.ObjectContent)
wg.Done()
}(i)
}
wg.Wait()
}
func TestS3ObjectUploadCancel(t *testing.T) {
creds, config, _, ts := test.SetupS3(t)
defer ts.Close()
ctx, cancel := context.WithCancel(context.Background())
deadline := time.Now().Add(testTimeout)
tmpDir, err := ioutil.TempDir("", "workhorse-test-")
require.NoError(t, err)
defer os.Remove(tmpDir)
objectName := filepath.Join(tmpDir, "s3-test-data")
object, err := objectstore.NewS3Object(ctx, objectName, creds, config, deadline)
require.NoError(t, err)
// Cancel the transfer before the data has been copied to ensure
// we handle this gracefully.
cancel()
_, err = io.Copy(object, strings.NewReader(test.ObjectContent))
require.Error(t, err)
}
func retry(attempts int, sleep time.Duration, fn func() error) error {
if err := fn(); err != nil {
if s, ok := err.(stop); ok {
// Return the original error for later checking
return s.error
}
if attempts--; attempts > 0 {
time.Sleep(sleep)
return retry(attempts, 2*sleep, fn)
}
return err
}
return nil
}
type stop struct {
error
}
package objectstore
import (
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
)
type s3Session struct {
session *session.Session
expiry time.Time
}
type s3SessionCache struct {
// An S3 session is cached by its input configuration (e.g. region,
// endpoint, path style, etc.), but the bucket is actually
// determined by the type of object to be uploaded (e.g. CI
// artifact, LFS, etc.) during runtime. In practice, we should only
// need one session per Workhorse process if we only allow one
// configuration for many different buckets. However, using a map
// indexed by the config avoids potential pitfalls in case the
// bucket configuration is supplied at startup or we need to support
// multiple S3 endpoints.
sessions map[config.S3Config]*s3Session
sync.Mutex
}
func (s *s3Session) isExpired() bool {
return time.Now().After(s.expiry)
}
func newS3SessionCache() *s3SessionCache {
cache := &s3SessionCache{sessions: make(map[config.S3Config]*s3Session)}
return cache
}
var (
// By default, it looks like IAM instance profiles may last 6 hours
// (via curl http://169.254.169.254/latest/meta-data/iam/security-credentials/<role_name>),
// but this may be configurable anywhere from 15 minutes to 12
// hours. To be safe, refresh AWS sessions every 10 minutes.
sessionExpiration = time.Duration(10 * time.Minute)
sessionCache = newS3SessionCache()
)
// setupS3Session initializes a new AWS S3 session and refreshes one if
// necessary. As recommended in https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html,
// sessions should be cached when possible. Sessions are safe to use
// concurrently as long as the session isn't modified.
func setupS3Session(s3Credentials config.S3Credentials, s3Config config.S3Config) (*session.Session, error) {
sessionCache.Lock()
defer sessionCache.Unlock()
s, ok := sessionCache.sessions[s3Config]
if !ok {
s = &s3Session{}
sessionCache.sessions[s3Config] = s
} else if s.session != nil && !s.isExpired() {
return s.session.Copy(), nil
}
cfg := &aws.Config{
Region: aws.String(s3Config.Region),
S3ForcePathStyle: aws.Bool(s3Config.PathStyle),
}
// In case IAM profiles aren't being used, use the static credentials
if s3Credentials.AwsAccessKeyID != "" && s3Credentials.AwsSecretAccessKey != "" {
cfg.Credentials = credentials.NewStaticCredentials(s3Credentials.AwsAccessKeyID, s3Credentials.AwsSecretAccessKey, "")
}
if s3Config.Endpoint != "" {
cfg.Endpoint = aws.String(s3Config.Endpoint)
}
sess, err := session.NewSession(cfg)
if err != nil {
return nil, err
}
s.expiry = time.Now().Add(sessionExpiration)
s.session = sess
return sess.Copy(), nil
}
func ResetS3Session(s3Config config.S3Config) {
sessionCache.Lock()
defer sessionCache.Unlock()
s, ok := sessionCache.sessions[s3Config]
if ok {
s.session = nil
}
}
package objectstore
import (
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
)
func TestS3SessionSetup(t *testing.T) {
credentials := config.S3Credentials{}
cfg := config.S3Config{Region: "us-west-1", PathStyle: true}
sess, err := setupS3Session(credentials, cfg)
require.NoError(t, err)
require.Equal(t, aws.StringValue(sess.Config.Region), "us-west-1")
require.True(t, aws.BoolValue(sess.Config.S3ForcePathStyle))
require.Equal(t, len(sessionCache.sessions), 1)
anotherConfig := cfg
_, err = setupS3Session(credentials, anotherConfig)
require.NoError(t, err)
require.Equal(t, len(sessionCache.sessions), 1)
ResetS3Session(cfg)
}
func TestS3SessionExpiry(t *testing.T) {
credentials := config.S3Credentials{}
cfg := config.S3Config{Region: "us-west-1", PathStyle: true}
sess, err := setupS3Session(credentials, cfg)
require.NoError(t, err)
require.Equal(t, aws.StringValue(sess.Config.Region), "us-west-1")
require.True(t, aws.BoolValue(sess.Config.S3ForcePathStyle))
firstSession, ok := sessionCache.sessions[cfg]
require.True(t, ok)
require.False(t, firstSession.isExpired())
firstSession.expiry = time.Now().Add(-1 * time.Second)
require.True(t, firstSession.isExpired())
_, err = setupS3Session(credentials, cfg)
require.NoError(t, err)
nextSession, ok := sessionCache.sessions[cfg]
require.True(t, ok)
require.False(t, nextSession.isExpired())
ResetS3Session(cfg)
}
package test
import (
"io/ioutil"
"net/http/httptest"
"os"
"strings"
"testing"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/johannesboyne/gofakes3"
"github.com/johannesboyne/gofakes3/backend/s3mem"
)
func SetupS3(t *testing.T) (config.S3Credentials, config.S3Config, *session.Session, *httptest.Server) {
return SetupS3WithBucket(t, "test-bucket")
}
func SetupS3WithBucket(t *testing.T, bucket string) (config.S3Credentials, config.S3Config, *session.Session, *httptest.Server) {
backend := s3mem.New()
faker := gofakes3.New(backend)
ts := httptest.NewServer(faker.Server())
creds := config.S3Credentials{
AwsAccessKeyID: "YOUR-ACCESSKEYID",
AwsSecretAccessKey: "YOUR-SECRETACCESSKEY",
}
config := config.S3Config{
Bucket: bucket,
Endpoint: ts.URL,
Region: "eu-central-1",
PathStyle: true,
}
sess, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(creds.AwsAccessKeyID, creds.AwsSecretAccessKey, ""),
Endpoint: aws.String(ts.URL),
Region: aws.String(config.Region),
DisableSSL: aws.Bool(true),
S3ForcePathStyle: aws.Bool(true),
})
require.NoError(t, err)
// Create S3 service client
svc := s3.New(sess)
_, err = svc.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(bucket),
})
require.NoError(t, err)
return creds, config, sess, ts
}
// S3ObjectExists will fail the test if the file does not exist.
func S3ObjectExists(t *testing.T, sess *session.Session, config config.S3Config, objectName string, expectedBytes string) {
downloadObject(t, sess, config, objectName, func(tmpfile *os.File, numBytes int64, err error) {
require.NoError(t, err)
require.Equal(t, int64(len(expectedBytes)), numBytes)
output, err := ioutil.ReadFile(tmpfile.Name())
require.NoError(t, err)
require.Equal(t, []byte(expectedBytes), output)
})
}
// S3ObjectDoesNotExist returns true if the object has been deleted,
// false otherwise. The return signature is different from
// S3ObjectExists because deletion may need to be retried since deferred
// clean up calls may cause the actual deletion to happen after the
// initial check.
func S3ObjectDoesNotExist(t *testing.T, sess *session.Session, config config.S3Config, objectName string) bool {
deleted := false
downloadObject(t, sess, config, objectName, func(tmpfile *os.File, numBytes int64, err error) {
if err != nil && strings.Contains(err.Error(), "NoSuchKey") {
deleted = true
}
})
return deleted
}
func downloadObject(t *testing.T, sess *session.Session, config config.S3Config, objectName string, handler func(tmpfile *os.File, numBytes int64, err error)) {
tmpDir, err := ioutil.TempDir("", "workhorse-test-")
require.NoError(t, err)
defer os.Remove(tmpDir)
tmpfile, err := ioutil.TempFile(tmpDir, "s3-output")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
downloadSvc := s3manager.NewDownloader(sess)
numBytes, err := downloadSvc.Download(tmpfile, &s3.GetObjectInput{
Bucket: aws.String(config.Bucket),
Key: aws.String(objectName),
})
handler(tmpfile, numBytes, err)
}