Commit 656fa2db authored by Nick Thomas

Merge branch 'sh-add-azure-blob-store' into 'master'

Add Azure Blob Storage support

See merge request gitlab-org/gitlab-workhorse!555
parents 144c928b aa5192b4
---
title: Add Azure blob store support
merge_request: 555
author:
type: added
@@ -2,9 +2,12 @@
 URL = "unix:/home/git/gitlab/redis/redis.socket"

 [object_storage]
-enabled = false
-provider = "AWS"
+provider = "AWS" # Allowed options: AWS, AzureRM

 [object_storage.s3]
 aws_access_key_id = "YOUR AWS ACCESS KEY"
 aws_secret_access_key = "YOUR AWS SECRET ACCESS KEY"
+
+[object_storage.azurerm]
+azure_storage_account_name = "YOUR ACCOUNT NAME"
+azure_storage_access_key = "YOUR ACCOUNT KEY"
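For context, this section decodes through the ObjectStorageCredentials/AzureCredentials structs the MR adds to internal/config below. A minimal, self-contained sketch (illustrative only, not part of this MR; sample values taken from the MR's own tests):

    package main

    import (
        "fmt"

        "github.com/BurntSushi/toml"
    )

    // Mirrors the struct tags this MR adds to internal/config/config.go.
    type AzureCredentials struct {
        AccountName string `toml:"azure_storage_account_name"`
        AccountKey  string `toml:"azure_storage_access_key"`
    }

    type ObjectStorageCredentials struct {
        Provider         string
        AzureCredentials AzureCredentials `toml:"azurerm"`
    }

    type Config struct {
        ObjectStorageCredentials *ObjectStorageCredentials `toml:"object_storage"`
    }

    func main() {
        data := `
    [object_storage]
    provider = "AzureRM"

    [object_storage.azurerm]
    azure_storage_account_name = "azuretester"
    azure_storage_access_key = "deadbeef"
    `
        var cfg Config
        if _, err := toml.Decode(data, &cfg); err != nil {
            panic(err)
        }

        // Prints: AzureRM azuretester
        fmt.Println(cfg.ObjectStorageCredentials.Provider, cfg.ObjectStorageCredentials.AzureCredentials.AccountName)
    }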
@@ -3,14 +3,15 @@ module gitlab.com/gitlab-org/gitlab-workhorse

 go 1.13

 require (
+	github.com/Azure/azure-storage-blob-go v0.10.0
 	github.com/BurntSushi/toml v0.3.1
 	github.com/FZambia/sentinel v1.0.0
 	github.com/alecthomas/chroma v0.7.3
-	github.com/aws/aws-sdk-go v1.31.7
+	github.com/aws/aws-sdk-go v1.31.13
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/getsentry/raven-go v0.1.2
 	github.com/golang/gddo v0.0.0-20190419222130-af0f2af80721
-	github.com/golang/protobuf v1.3.2
+	github.com/golang/protobuf v1.4.2
 	github.com/gomodule/redigo v2.0.0+incompatible
 	github.com/gorilla/websocket v1.4.0
 	github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
@@ -27,10 +28,11 @@ require (
 	github.com/stretchr/testify v1.5.1
 	gitlab.com/gitlab-org/gitaly v1.74.0
 	gitlab.com/gitlab-org/labkit v0.0.0-20200520155818-96e583c57891
-	golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f
-	golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
-	golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375
-	google.golang.org/grpc v1.24.0
+	gocloud.dev v0.20.0
+	golang.org/x/lint v0.0.0-20200302205851-738671d3881b
+	golang.org/x/net v0.0.0-20200602114024-627f9648deb9
+	golang.org/x/tools v0.0.0-20200608174601-1b747fd94509
+	google.golang.org/grpc v1.29.1
 	gopkg.in/yaml.v2 v2.2.8 // indirect
 	honnef.co/go/tools v0.0.1-2020.1.5
 )
@@ -76,8 +76,9 @@ type MultipartUploadParams struct {
 }

 type ObjectStorageParams struct {
 	Provider string
 	S3Config config.S3Config
+	GoCloudConfig config.GoCloudConfig
 }

 type RemoteObject struct {
...
@@ -2,9 +2,14 @@ package config

 import (
 	"net/url"
+	"strings"
 	"time"

+	"github.com/Azure/azure-storage-blob-go/azblob"
 	"github.com/BurntSushi/toml"
+	"gitlab.com/gitlab-org/labkit/log"
+	"gocloud.dev/blob"
+	"gocloud.dev/blob/azureblob"
 )

 type TomlURL struct {
@@ -30,7 +35,12 @@ func (d *TomlDuration) UnmarshalTest(text []byte) error {
 type ObjectStorageCredentials struct {
 	Provider string

 	S3Credentials S3Credentials `toml:"s3"`
+	AzureCredentials AzureCredentials `toml:"azurerm"`
+}
+
+type ObjectStorageConfig struct {
+	URLMux *blob.URLMux `toml:"-"`
 }

 type S3Credentials struct {
@@ -48,6 +58,15 @@ type S3Config struct {
 	SSEKMSKeyID string `toml:"-"` // Server-side encryption key-management service key ID (e.g. arn:aws:xxx)
 }

+type GoCloudConfig struct {
+	URL string `toml:"-"`
+}
+
+type AzureCredentials struct {
+	AccountName string `toml:"azure_storage_account_name"`
+	AccountKey  string `toml:"azure_storage_access_key"`
+}
+
 type RedisConfig struct {
 	URL TomlURL
 	Sentinel []TomlURL
@@ -75,6 +94,7 @@ type Config struct {
 	APIQueueLimit uint `toml:"-"`
 	APIQueueTimeout time.Duration `toml:"-"`
 	APICILongPollingDuration time.Duration `toml:"-"`
+	ObjectStorageConfig ObjectStorageConfig `toml:"-"`
 	ObjectStorageCredentials *ObjectStorageCredentials `toml:"object_storage"`
 	PropagateCorrelationID bool `toml:"-"`
 }
@@ -88,3 +108,31 @@ func LoadConfig(filename string) (*Config, error) {
 	return cfg, nil
 }
+
+func (c *Config) RegisterGoCloudURLOpeners() error {
+	c.ObjectStorageConfig.URLMux = new(blob.URLMux)
+
+	creds := c.ObjectStorageCredentials
+	if strings.EqualFold(creds.Provider, "AzureRM") && creds.AzureCredentials.AccountName != "" && creds.AzureCredentials.AccountKey != "" {
+		accountName := azureblob.AccountName(creds.AzureCredentials.AccountName)
+		accountKey := azureblob.AccountKey(creds.AzureCredentials.AccountKey)
+
+		credential, err := azureblob.NewCredential(accountName, accountKey)
+		if err != nil {
+			log.WithError(err).Error("error creating Azure credentials")
+			return err
+		}
+
+		pipeline := azureblob.NewPipeline(credential, azblob.PipelineOptions{})
+
+		azureURLOpener := &azureblob.URLOpener{
+			AccountName: accountName,
+			Pipeline:    pipeline,
+			Options:     azureblob.Options{Credential: credential},
+		}
+
+		c.ObjectStorageConfig.URLMux.RegisterBucket(azureblob.Scheme, azureURLOpener)
+	}
+
+	return nil
+}
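Once RegisterGoCloudURLOpeners has run, any azblob:// URL handed to the mux resolves through the pipeline built above. A hedged sketch of a caller (the container and object names are made up; Workhorse itself goes through objectstore.GoCloudObject, shown later in this diff):

    package main

    import (
        "context"

        "gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
    )

    func writeProbe(ctx context.Context, cfg *config.Config) error {
        // OpenBucket dispatches on the URL scheme to the opener registered
        // in RegisterGoCloudURLOpeners.
        bucket, err := cfg.ObjectStorageConfig.URLMux.OpenBucket(ctx, "azblob://test-container")
        if err != nil {
            return err
        }
        defer bucket.Close()

        // WriteAll is the one-shot form of the NewWriter/io.Copy flow that
        // GoCloudObject.Upload uses further down in this MR.
        return bucket.WriteAll(ctx, "tmp/uploads/probe", []byte("hello"), nil)
    }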
@@ -11,24 +11,16 @@ import (

 func TestLoadObjectStorageConfig(t *testing.T) {
 	config := `
 [object_storage]
-enabled = true
 provider = "AWS"

 [object_storage.s3]
 aws_access_key_id = "minio"
 aws_secret_access_key = "gdk-minio"
 `

-	tmpFile, err := ioutil.TempFile(os.TempDir(), "test-")
-	require.NoError(t, err)
-
+	tmpFile, cfg := loadTempConfig(t, config)
 	defer os.Remove(tmpFile.Name())

-	_, err = tmpFile.Write([]byte(config))
-	require.NoError(t, err)
-
-	cfg, err := LoadConfig(tmpFile.Name())
-	require.NoError(t, err)
-
 	require.NotNil(t, cfg.ObjectStorageCredentials, "Expected object storage credentials")

 	expected := ObjectStorageCredentials{
@@ -41,3 +33,49 @@ aws_secret_access_key = "gdk-minio"
 	require.Equal(t, expected, *cfg.ObjectStorageCredentials)
 }
+
+func TestRegisterGoCloudURLOpeners(t *testing.T) {
+	config := `
+[object_storage]
+provider = "AzureRM"
+
+[object_storage.azurerm]
+azure_storage_account_name = "azuretester"
+azure_storage_access_key = "deadbeef"
+`
+
+	tmpFile, cfg := loadTempConfig(t, config)
+	defer os.Remove(tmpFile.Name())
+
+	require.NotNil(t, cfg.ObjectStorageCredentials, "Expected object storage credentials")
+
+	expected := ObjectStorageCredentials{
+		Provider: "AzureRM",
+		AzureCredentials: AzureCredentials{
+			AccountName: "azuretester",
+			AccountKey:  "deadbeef",
+		},
+	}
+
+	require.Equal(t, expected, *cfg.ObjectStorageCredentials)
+	require.Nil(t, cfg.ObjectStorageConfig.URLMux)
+
+	err := cfg.RegisterGoCloudURLOpeners()
+	require.NoError(t, err)
+	require.NotNil(t, cfg.ObjectStorageConfig.URLMux)
+
+	require.True(t, cfg.ObjectStorageConfig.URLMux.ValidBucketScheme("azblob"))
+	require.Equal(t, []string{"azblob"}, cfg.ObjectStorageConfig.URLMux.BucketSchemes())
+}
+
+func loadTempConfig(t *testing.T, config string) (f *os.File, cfg *Config) {
+	tmpFile, err := ioutil.TempFile(os.TempDir(), "test-")
+	require.NoError(t, err)

+	_, err = tmpFile.Write([]byte(config))
+	require.NoError(t, err)
+
+	cfg, err = LoadConfig(tmpFile.Name())
+	require.NoError(t, err)
+
+	return tmpFile, cfg
+}
@@ -117,32 +117,41 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
 		}
 	}()

-	useS3Client := opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsAWS() && opts.ObjectStorageConfig.IsValid()
-
-	if useS3Client {
-		remoteWriter, err = objectstore.NewS3Object(ctx, opts.RemoteTempObjectID, opts.ObjectStorageConfig.S3Credentials, opts.ObjectStorageConfig.S3Config, opts.Deadline)
-		if err != nil {
-			return nil, err
-		}
-
-		writers = append(writers, remoteWriter)
-	} else if opts.IsMultipart() {
-		remoteWriter, err = objectstore.NewMultipart(ctx, opts.PresignedParts, opts.PresignedCompleteMultipart, opts.PresignedAbortMultipart, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, opts.PartSize)
-		if err != nil {
-			return nil, err
-		}
-
-		writers = append(writers, remoteWriter)
-	} else if opts.IsRemote() {
-		remoteWriter, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, size)
+	var clientMode string
+
+	if opts.IsRemote() {
+		if opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsGoCloud() {
+			clientMode = fmt.Sprintf("go_cloud:%s", opts.ObjectStorageConfig.Provider)
+			p := &objectstore.GoCloudObjectParams{
+				Ctx:        ctx,
+				Mux:        opts.ObjectStorageConfig.URLMux,
+				BucketURL:  opts.ObjectStorageConfig.GoCloudConfig.URL,
+				ObjectName: opts.RemoteTempObjectID,
+				Deadline:   opts.Deadline,
+			}
+			remoteWriter, err = objectstore.NewGoCloudObject(p)
+		} else if opts.UseWorkhorseClientEnabled() && opts.ObjectStorageConfig.IsAWS() && opts.ObjectStorageConfig.IsValid() {
+			clientMode = "s3"
+			remoteWriter, err = objectstore.NewS3Object(ctx, opts.RemoteTempObjectID, opts.ObjectStorageConfig.S3Credentials, opts.ObjectStorageConfig.S3Config, opts.Deadline)
+		} else if opts.IsMultipart() {
+			clientMode = "multipart"
+			remoteWriter, err = objectstore.NewMultipart(ctx, opts.PresignedParts, opts.PresignedCompleteMultipart, opts.PresignedAbortMultipart, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, opts.PartSize)
+		} else {
+			clientMode = "http"
+			remoteWriter, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, size)
+		}
+
 		if err != nil {
 			return nil, err
 		}

 		writers = append(writers, remoteWriter)
 	}

 	if opts.IsLocal() {
+		if clientMode == "" {
+			clientMode = "local"
+		} else {
+			clientMode += "+local"
+		}
+
 		fileWriter, err := fh.uploadLocalFile(ctx, opts)
 		if err != nil {
 			return nil, err
@@ -172,12 +181,14 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
 		"is_remote":        opts.IsRemote(),
 		"remote_id":        opts.RemoteID,
 		"temp_file_prefix": opts.TempFilePrefix,
-		"use_s3_client":    useS3Client,
+		"client_mode":      clientMode,
 	})

 	if opts.IsLocal() {
 		logger = logger.WithField("local_temp_path", opts.LocalTempPath)
-	} else if useS3Client {
+	}
+
+	if opts.IsRemote() {
 		logger = logger.WithField("remote_temp_object", opts.RemoteTempObjectID)
 	}
...
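The log field changes from the boolean use_s3_client to a descriptive client_mode string. For reference, a sketch (not in the MR) enumerating the values the branches above can produce:

    // Derived from the SaveFileFromReader branches above.
    var clientModes = []string{
        "go_cloud:AzureRM", // Workhorse GoCloud client (provider name varies)
        "s3",               // Workhorse S3 client
        "multipart",        // presigned S3 multipart upload
        "http",             // single presigned PUT
        "local",            // local temp file only
        "http+local",       // any remote mode also writing a local temp file
    }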
@@ -14,7 +14,9 @@ import (
 	"github.com/dgrijalva/jwt-go"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"gocloud.dev/blob"

+	"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
 	"gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
 	"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
 	"gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper"
@@ -184,9 +186,11 @@ func TestSaveFile(t *testing.T) {
 	for _, spec := range tests {
 		t.Run(spec.name, func(t *testing.T) {
 			assert := assert.New(t)
+			logHook := testhelper.SetupLogger()

 			var opts filestore.SaveFileOpts
 			var expectedDeletes, expectedPuts int
+			var expectedClientMode string

 			osStub, ts := test.StartObjectStore()
 			defer ts.Close()
@@ -203,6 +207,7 @@ func TestSaveFile(t *testing.T) {

 				expectedDeletes = 1
 				expectedPuts = 1
+				expectedClientMode = "http"

 			case remoteMultipart:
 				objectURL := ts.URL + test.ObjectPath
@@ -217,11 +222,18 @@ func TestSaveFile(t *testing.T) {
 				osStub.InitiateMultipartUpload(test.ObjectPath)

 				expectedDeletes = 1
 				expectedPuts = 2
+				expectedClientMode = "multipart"
 			}

 			if spec.local {
 				opts.LocalTempPath = tmpFolder
 				opts.TempFilePrefix = "test-file"
+
+				if expectedClientMode != "" {
+					expectedClientMode += "+local"
+				} else {
+					expectedClientMode = "local"
+				}
 			}

 			ctx, cancel := context.WithCancel(context.Background())
@@ -271,11 +283,20 @@ func TestSaveFile(t *testing.T) {

 			uploadFields := token.Claims.(*testhelper.UploadClaims).Upload
 			checkFileHandlerWithFields(t, fh, uploadFields, "", spec.remote == notRemote)
+
+			require.True(t, testhelper.WaitForLogEvent(logHook))
+			entries := logHook.AllEntries()
+			require.Equal(t, 1, len(entries))
+			msg := entries[0].Message
+			require.Contains(t, msg, "saved file")
+			require.Contains(t, msg, fmt.Sprintf("client_mode=%s", expectedClientMode))
 		})
 	}
 }

-func TestSaveFileWithWorkhorseClient(t *testing.T) {
+func TestSaveFileWithS3WorkhorseClient(t *testing.T) {
+	logHook := testhelper.SetupLogger()
+
 	s3Creds, s3Config, sess, ts := test.SetupS3(t, "")
 	defer ts.Close()
@@ -299,6 +320,71 @@ func TestSaveFileWithS3WorkhorseClient(t *testing.T) {
 	require.NoError(t, err)

 	test.S3ObjectExists(t, sess, s3Config, remoteObject, test.ObjectContent)
+
+	require.True(t, testhelper.WaitForLogEvent(logHook))
+	entries := logHook.AllEntries()
+	require.Equal(t, 1, len(entries))
+	msg := entries[0].Message
+	require.Contains(t, msg, "saved file")
+	require.Contains(t, msg, "client_mode=s3")
+}
+
+func TestSaveFileWithAzureWorkhorseClient(t *testing.T) {
+	logHook := testhelper.SetupLogger()
+
+	mux, bucketDir, cleanup := test.SetupGoCloudFileBucket(t, "azblob")
+	defer cleanup()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	remoteObject := "tmp/test-file/1"
+
+	opts := filestore.SaveFileOpts{
+		RemoteID:           "test-file",
+		Deadline:           testDeadline(),
+		UseWorkhorseClient: true,
+		RemoteTempObjectID: remoteObject,
+		ObjectStorageConfig: filestore.ObjectStorageConfig{
+			Provider:      "AzureRM",
+			URLMux:        mux,
+			GoCloudConfig: config.GoCloudConfig{URL: "azblob://test-container"},
+		},
+	}
+
+	_, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
+	require.NoError(t, err)
+
+	test.GoCloudObjectExists(t, bucketDir, remoteObject)
+
+	require.True(t, testhelper.WaitForLogEvent(logHook))
+	entries := logHook.AllEntries()
+	require.Equal(t, 1, len(entries))
+	msg := entries[0].Message
+	require.Contains(t, msg, "saved file")
+	require.Contains(t, msg, "client_mode=\"go_cloud:AzureRM\"")
+}
+
+func TestSaveFileWithUnknownGoCloudScheme(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	mux := new(blob.URLMux)
+
+	remoteObject := "tmp/test-file/1"
+
+	opts := filestore.SaveFileOpts{
+		RemoteID:           "test-file",
+		Deadline:           testDeadline(),
+		UseWorkhorseClient: true,
+		RemoteTempObjectID: remoteObject,
+		ObjectStorageConfig: filestore.ObjectStorageConfig{
+			Provider:      "SomeCloud",
+			URLMux:        mux,
+			GoCloudConfig: config.GoCloudConfig{URL: "foo://test-container"},
+		},
+	}
+
+	_, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
+	require.Error(t, err)
 }

 func TestSaveMultipartInBodyFailure(t *testing.T) {
...
@@ -4,6 +4,8 @@ import (
 	"strings"
 	"time"

+	"gocloud.dev/blob"
+
 	"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
 	"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
 )
@@ -16,6 +18,12 @@ type ObjectStorageConfig struct {
 	S3Credentials config.S3Credentials
 	S3Config config.S3Config
+
+	// GoCloud mux that maps azureblob:// and future URLs (e.g. s3://, gcs://, etc.) to a handler
+	URLMux *blob.URLMux
+
+	// Azure credentials are registered at startup in the GoCloud URLMux, so only the container name is needed
+	GoCloudConfig config.GoCloudConfig
 }

 // SaveFileOpts represents all the options available for saving a file to object store
@@ -66,7 +74,7 @@ func (s *SaveFileOpts) IsLocal() bool {

 // IsRemote checks if the options requires a remote upload
 func (s *SaveFileOpts) IsRemote() bool {
-	return s.PresignedPut != "" || s.IsMultipart()
+	return s.PresignedPut != "" || s.IsMultipart() || s.UseWorkhorseClient
 }

 // IsMultipart checks if the options requires a Multipart upload
@@ -97,6 +105,7 @@ func GetOpts(apiResponse *api.Response) *SaveFileOpts {
 	if opts.UseWorkhorseClient && objectStorageParams != nil {
 		opts.ObjectStorageConfig.Provider = objectStorageParams.Provider
 		opts.ObjectStorageConfig.S3Config = objectStorageParams.S3Config
+		opts.ObjectStorageConfig.GoCloudConfig = objectStorageParams.GoCloudConfig
 	}

 	// Backwards compatibility to ensure API servers that do not include the
@@ -120,11 +129,28 @@ func (c *ObjectStorageConfig) IsAWS() bool {
 	return strings.EqualFold(c.Provider, "AWS") || strings.EqualFold(c.Provider, "S3")
 }

+func (c *ObjectStorageConfig) IsAzure() bool {
+	return strings.EqualFold(c.Provider, "AzureRM")
+}
+
+func (c *ObjectStorageConfig) IsGoCloud() bool {
+	return c.GoCloudConfig.URL != ""
+}
+
 func (c *ObjectStorageConfig) IsValid() bool {
-	return c.S3Config.Bucket != "" && c.S3Config.Region != "" && c.credentialsValid()
+	if c.IsAWS() {
+		return c.S3Config.Bucket != "" && c.S3Config.Region != "" && c.s3CredentialsValid()
+	} else if c.IsGoCloud() {
+		// We could parse and validate the URL, but GoCloud providers
+		// such as AzureRM don't have a fallback to normal HTTP, so we
+		// always want to try the GoCloud path if there is a URL.
+		return true
+	}
+
+	return false
 }

-func (c *ObjectStorageConfig) credentialsValid() bool {
+func (c *ObjectStorageConfig) s3CredentialsValid() bool {
 	// We need to be able to distinguish between two cases of AWS access:
 	// 1. AWS access via key and secret, but credentials not configured in Workhorse
 	// 2. IAM instance profiles used
...
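A small sketch (not in the MR) of how the reworked validation behaves for the Azure path; values are illustrative, and note that IsValid deliberately trusts any non-empty GoCloud URL:

    package main

    import (
        "fmt"

        "gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
        "gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
    )

    func main() {
        c := filestore.ObjectStorageConfig{
            Provider:      "AzureRM",
            GoCloudConfig: config.GoCloudConfig{URL: "azblob://test-container"},
        }

        // Prints: true true true. No credential check happens here, because
        // the Azure credential was attached to the URLMux at startup.
        fmt.Println(c.IsAzure(), c.IsGoCloud(), c.IsValid())
    }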
@@ -10,6 +10,7 @@ import (
 	"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
 	"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
 	"gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
+	"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
 )

 func TestSaveFileOptsLocalAndRemote(t *testing.T) {
@@ -269,6 +270,66 @@ func TestUseWorkhorseClientEnabled(t *testing.T) {
 			require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID)
 			require.Equal(t, apiResponse.RemoteObject.UseWorkhorseClient, opts.UseWorkhorseClient)
 			require.Equal(t, test.expected, opts.UseWorkhorseClientEnabled())
+			require.Equal(t, test.UseWorkhorseClient, opts.IsRemote())
 		})
 	}
 }
+
+func TestGoCloudConfig(t *testing.T) {
+	mux, _, cleanup := test.SetupGoCloudFileBucket(t, "azblob")
+	defer cleanup()
+
+	tests := []struct {
+		name     string
+		provider string
+		url      string
+		valid    bool
+	}{
+		{
+			name:     "valid AzureRM config",
+			provider: "AzureRM",
+			url:      "azblob://test-container",
+			valid:    true,
+		},
+		{
+			name:     "invalid GoCloud scheme",
+			provider: "AzureRM",
+			url:      "unknown://test-container",
+			valid:    true,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			apiResponse := &api.Response{
+				TempPath: "/tmp",
+				RemoteObject: api.RemoteObject{
+					Timeout:            10,
+					ID:                 "id",
+					UseWorkhorseClient: true,
+					RemoteTempObjectID: "test-object",
+					ObjectStorage: &api.ObjectStorageParams{
+						Provider: test.provider,
+						GoCloudConfig: config.GoCloudConfig{
+							URL: test.url,
+						},
+					},
+				},
+			}
+			deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
+			opts := filestore.GetOpts(apiResponse)
+			opts.ObjectStorageConfig.URLMux = mux
+
+			require.Equal(t, apiResponse.TempPath, opts.LocalTempPath)
+			require.Equal(t, apiResponse.RemoteObject.RemoteTempObjectID, opts.RemoteTempObjectID)
+			require.WithinDuration(t, deadline, opts.Deadline, time.Second)
+			require.Equal(t, apiResponse.RemoteObject.ID, opts.RemoteID)
+			require.Equal(t, apiResponse.RemoteObject.UseWorkhorseClient, opts.UseWorkhorseClient)
+
+			require.Equal(t, test.provider, opts.ObjectStorageConfig.Provider)
+			require.Equal(t, apiResponse.RemoteObject.ObjectStorage.GoCloudConfig, opts.ObjectStorageConfig.GoCloudConfig)
+			require.True(t, opts.UseWorkhorseClientEnabled())
+			require.Equal(t, test.valid, opts.ObjectStorageConfig.IsValid())
+			require.True(t, opts.IsRemote())
+		})
+	}
+}
package objectstore

import (
	"context"
	"io"
	"time"

	"gitlab.com/gitlab-org/labkit/log"
	"gocloud.dev/blob"
)

type GoCloudObject struct {
	bucket     *blob.Bucket
	mux        *blob.URLMux
	bucketURL  string
	objectName string
	uploader
}

type GoCloudObjectParams struct {
	Ctx        context.Context
	Mux        *blob.URLMux
	BucketURL  string
	ObjectName string
	Deadline   time.Time
}

func NewGoCloudObject(p *GoCloudObjectParams) (*GoCloudObject, error) {
	bucket, err := p.Mux.OpenBucket(p.Ctx, p.BucketURL)
	if err != nil {
		return nil, err
	}

	o := &GoCloudObject{
		bucket:     bucket,
		mux:        p.Mux,
		bucketURL:  p.BucketURL,
		objectName: p.ObjectName,
	}

	o.uploader = newUploader(o)
	o.Execute(p.Ctx, p.Deadline)

	return o, nil
}

func (o *GoCloudObject) Upload(ctx context.Context, r io.Reader) error {
	defer o.bucket.Close()

	writer, err := o.bucket.NewWriter(ctx, o.objectName, nil)
	if err != nil {
		log.ContextLogger(ctx).WithError(err).Error("error creating GoCloud bucket")
		return err
	}

	if _, err = io.Copy(writer, r); err != nil {
		log.ContextLogger(ctx).WithError(err).Error("error writing to GoCloud bucket")
		writer.Close()
		return err
	}

	if err := writer.Close(); err != nil {
		log.ContextLogger(ctx).WithError(err).Error("error closing GoCloud bucket")
		return err
	}

	return nil
}

func (o *GoCloudObject) ETag() string {
	return ""
}

func (o *GoCloudObject) Abort() {
	o.Delete()
}

// Delete will always attempt to delete the temporary file.
// According to https://github.com/google/go-cloud/blob/7818961b5c9a112f7e092d3a2d8479cbca80d187/blob/azureblob/azureblob.go#L881-L883,
// if the writer is closed before any Write is called, Close will create an empty file.
func (o *GoCloudObject) Delete() {
	if o.bucketURL == "" || o.objectName == "" {
		return
	}

	// Note we can't use the request context because in a successful
	// case, the original request has already completed.
	deleteCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // lint:allow context.Background
	defer cancel()

	bucket, err := o.mux.OpenBucket(deleteCtx, o.bucketURL)
	if err != nil {
		log.WithError(err).Error("error opening bucket for delete")
		return
	}

	if err := bucket.Delete(deleteCtx, o.objectName); err != nil {
		log.WithError(err).Error("error deleting object")
	}
}
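A hedged usage sketch of the type above (container and object names are illustrative; the real call sites are SaveFileFromReader and the test that follows). A GoCloudObject behaves as an io.WriteCloser; on failure or context cancellation the uploader calls Abort/Delete, which re-opens the bucket on a fresh 60-second background context because the request context may already be done:

    package main

    import (
        "context"
        "io"
        "time"

        "gocloud.dev/blob"

        "gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
    )

    func uploadSketch(ctx context.Context, mux *blob.URLMux, r io.Reader) error {
        object, err := objectstore.NewGoCloudObject(&objectstore.GoCloudObjectParams{
            Ctx:        ctx,
            Mux:        mux,
            BucketURL:  "azblob://test-container",
            ObjectName: "tmp/uploads/object-1",
            Deadline:   time.Now().Add(time.Minute),
        })
        if err != nil {
            return err
        }

        // Stream the body into the bucket; Close flushes and finalizes.
        if _, err := io.Copy(object, r); err != nil {
            return err
        }

        return object.Close()
    }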
package objectstore_test

import (
	"context"
	"fmt"
	"io"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore"
	"gitlab.com/gitlab-org/gitlab-workhorse/internal/objectstore/test"
)

func TestGoCloudObjectUpload(t *testing.T) {
	mux, _, cleanup := test.SetupGoCloudFileBucket(t, "azuretest")
	defer cleanup()

	ctx, cancel := context.WithCancel(context.Background())
	deadline := time.Now().Add(testTimeout)

	objectName := "test.png"
	testURL := "azuretest://azure.example.com/test-container"
	p := &objectstore.GoCloudObjectParams{Ctx: ctx, Mux: mux, BucketURL: testURL, ObjectName: objectName, Deadline: deadline}
	object, err := objectstore.NewGoCloudObject(p)
	require.NotNil(t, object)
	require.NoError(t, err)

	// copy data
	n, err := io.Copy(object, strings.NewReader(test.ObjectContent))
	require.NoError(t, err)
	require.Equal(t, test.ObjectSize, n, "Uploaded file mismatch")

	// close HTTP stream
	err = object.Close()
	require.NoError(t, err)

	bucket, err := mux.OpenBucket(ctx, testURL)
	require.NoError(t, err)

	// Verify the data was copied correctly.
	received, err := bucket.ReadAll(ctx, objectName)
	require.NoError(t, err)
	require.Equal(t, []byte(test.ObjectContent), received)

	cancel()

	deleted := false
	retry(3, time.Second, func() error {
		exists, err := bucket.Exists(ctx, objectName)
		require.NoError(t, err)

		if exists {
			return fmt.Errorf("file %s is still present, retrying", objectName)
		} else {
			deleted = true
			return nil
		}
	})
	require.True(t, deleted)
}
package test

import (
	"context"
	"io/ioutil"
	"net/url"
	"os"
	"testing"

	"github.com/stretchr/testify/require"
	"gocloud.dev/blob"
	"gocloud.dev/blob/fileblob"
)

type dirOpener struct {
	u      *url.URL // last url passed to OpenBucketURL
	tmpDir string
}

func (o *dirOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*blob.Bucket, error) {
	o.u = u
	return fileblob.OpenBucket(o.tmpDir, nil)
}

func SetupGoCloudFileBucket(t *testing.T, scheme string) (m *blob.URLMux, bucketDir string, cleanup func()) {
	tmpDir, err := ioutil.TempDir("", "")
	require.NoError(t, err)

	mux := new(blob.URLMux)
	fake := &dirOpener{tmpDir: tmpDir}
	mux.RegisterBucket(scheme, fake)
	cleanup = func() {
		os.RemoveAll(tmpDir)
	}

	return mux, tmpDir, cleanup
}

func GoCloudObjectExists(t *testing.T, bucketDir string, objectName string) {
	bucket, err := fileblob.OpenBucket(bucketDir, nil)
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background()) // lint:allow context.Background
	defer cancel()

	exists, err := bucket.Exists(ctx, objectName)
	require.NoError(t, err)
	require.True(t, exists)
}
@@ -9,6 +9,8 @@ import (
 	"io"
 	"strings"
 	"time"
+
+	"gitlab.com/gitlab-org/labkit/log"
 )

 // Upload represents an upload to an ObjectStorage provider
@@ -123,6 +125,8 @@ func (u *uploader) Execute(ctx context.Context, deadline time.Time) {
 	if u.md5 != nil {
 		err := compareMD5(u.md5Sum(), u.etag)
 		if err != nil {
+			log.ContextLogger(ctx).WithError(err).Error("error comparing MD5 checksum")
+
 			u.uploadError = err
 			if u.metrics {
 				objectStorageUploadRequestsRequestFailed.Inc()
...
@@ -15,8 +15,11 @@ import (
 	"runtime"
 	"strings"
 	"testing"
+	"time"

 	"github.com/dgrijalva/jwt-go"
+	"github.com/sirupsen/logrus"
+	"github.com/sirupsen/logrus/hooks/test"
 	"github.com/stretchr/testify/require"
 	"gitlab.com/gitlab-org/labkit/log"
@@ -207,3 +210,25 @@ type UploadClaims struct {
 	Upload map[string]string `json:"upload"`
 	jwt.StandardClaims
 }
+
+func SetupLogger() *test.Hook {
+	logger, hook := test.NewNullLogger()
+	logrus.SetOutput(logger.Writer())
+
+	return hook
+}
+
+// logrus fires a Goroutine to write the output log, but there's no way to
+// flush all outstanding hooks to fire. We just wait up to a second
+// for an event to appear.
+func WaitForLogEvent(hook *test.Hook) bool {
+	for i := 0; i < 10; i++ {
+		if entry := hook.LastEntry(); entry != nil {
+			return true
+		}
+
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	return false
+}
@@ -7,6 +7,7 @@ import (
 )

 type ObjectStoragePreparer struct {
+	config      config.ObjectStorageConfig
 	credentials config.ObjectStorageCredentials
 }
@@ -17,11 +18,12 @@ func NewObjectStoragePreparer(c config.Config) Preparer {
 		creds = &config.ObjectStorageCredentials{}
 	}

-	return &ObjectStoragePreparer{credentials: *creds}
+	return &ObjectStoragePreparer{credentials: *creds, config: c.ObjectStorageConfig}
 }

 func (p *ObjectStoragePreparer) Prepare(a *api.Response) (*filestore.SaveFileOpts, Verifier, error) {
 	opts := filestore.GetOpts(a)
+	opts.ObjectStorageConfig.URLMux = p.config.URLMux
 	opts.ObjectStorageConfig.S3Credentials = p.credentials.S3Credentials

 	return opts, nil, nil
...
@@ -3,6 +3,8 @@ package upload_test
 import (
 	"testing"

+	"gocloud.dev/blob"
+
 	"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
 	"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
 	"gitlab.com/gitlab-org/gitlab-workhorse/internal/upload"
@@ -21,6 +23,9 @@ func TestPrepareWithS3Config(t *testing.T) {
 			Provider:      "AWS",
 			S3Credentials: creds,
 		},
+		ObjectStorageConfig: config.ObjectStorageConfig{
+			URLMux: new(blob.URLMux),
+		},
 	}

 	r := &api.Response{
@@ -39,6 +44,7 @@ func TestPrepareWithS3Config(t *testing.T) {
 	require.True(t, opts.ObjectStorageConfig.IsAWS())
 	require.True(t, opts.UseWorkhorseClient)
 	require.Equal(t, creds, opts.ObjectStorageConfig.S3Credentials)
+	require.NotNil(t, opts.ObjectStorageConfig.URLMux)
 	require.Equal(t, nil, v)
 }
@@ -50,5 +56,6 @@ func TestPrepareWithNoConfig(t *testing.T) {
 	require.NoError(t, err)
 	require.False(t, opts.UseWorkhorseClient)
-	require.Equal(t, nil, v)
+	require.Nil(t, v)
+	require.Nil(t, opts.ObjectStorageConfig.URLMux)
 }
@@ -172,6 +172,11 @@ func main() {
 		redis.Configure(cfg.Redis, redis.DefaultDialFunc)
 		go redis.Process()
 	}

+	err = cfg.RegisterGoCloudURLOpeners()
+	if err != nil {
+		log.WithError(err).Fatal("could not load cloud credentials")
+	}
+
 	accessLogger, accessCloser, err := getAccessLogger(logConfig)
...