Commit b5a8ccb9 authored by Nick Thomas's avatar Nick Thomas

Merge branch 'sh-refactor-uploaders' into 'master'

Refactor uploaders to use different upload strategies

See merge request gitlab-org/gitlab-workhorse!553
parents 2278438e cb807306
title: Refactor uploaders to use different upload strategies
merge_request: 553
type: other
......@@ -22,12 +22,16 @@ var ErrNotEnoughParts = errors.New("not enough Parts")
// Multipart represents a MultipartUpload on a S3 compatible Object Store service.
// It can be used as io.WriteCloser for uploading an object
type Multipart struct {
PartURLs []string
// CompleteURL is a presigned URL for CompleteMultipartUpload
CompleteURL string
// AbortURL is a presigned URL for AbortMultipartUpload
AbortURL string
// DeleteURL is a presigned URL for RemoveObject
DeleteURL string
PutHeaders map[string]string
partSize int64
etag string
......@@ -36,35 +40,28 @@ type Multipart struct {
// then uploaded with S3 Upload Part. Once Multipart is Closed a final call to CompleteMultipartUpload will be sent.
// In case of any error a call to AbortMultipartUpload will be made to cleanup all the resources
func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL, deleteURL string, putHeaders map[string]string, deadline time.Time, partSize int64) (*Multipart, error) {
pr, pw := io.Pipe()
uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)
m := &Multipart{
PartURLs: partURLs,
CompleteURL: completeURL,
AbortURL: abortURL,
DeleteURL: deleteURL,
uploader: newUploader(uploadCtx, pw),
PutHeaders: putHeaders,
partSize: partSize,
go m.trackUploadTime()
go m.cleanup(ctx)
m.uploader = newUploader(m)
m.Execute(ctx, deadline)
go func() {
defer cancelFn()
defer objectStorageUploadsOpen.Dec()
defer func() {
// This will be returned as error to the next write operation on the pipe
return m, nil
func (m *Multipart) Upload(ctx context.Context, r io.Reader) error {
cmu := &CompleteMultipartUpload{}
for i, partURL := range partURLs {
src := io.LimitReader(pr, partSize)
part, err := m.readAndUploadOnePart(partURL, putHeaders, src, i+1)
for i, partURL := range m.PartURLs {
src := io.LimitReader(r, m.partSize)
part, err := m.readAndUploadOnePart(ctx, partURL, m.PutHeaders, src, i+1)
if err != nil {
m.uploadError = err
return err
if part == nil {
......@@ -73,93 +70,33 @@ func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL,
n, err := io.Copy(ioutil.Discard, pr)
n, err := io.Copy(ioutil.Discard, r)
if err != nil {
m.uploadError = fmt.Errorf("drain pipe: %v", err)
return fmt.Errorf("drain pipe: %v", err)
if n > 0 {
m.uploadError = ErrNotEnoughParts
return ErrNotEnoughParts
if err := m.complete(cmu); err != nil {
m.uploadError = err
if err := m.complete(ctx, cmu); err != nil {
return err
return m, nil
return nil
func (m *Multipart) trackUploadTime() {
started := time.Now()
func (m *Multipart) ETag() string {
return m.etag
func (m *Multipart) cleanup(ctx context.Context) {
// wait for the upload to finish
if m.uploadError != nil {
// We have now successfully uploaded the file to object storage. Another
// goroutine will hand off the object to gitlab-rails.
// gitlab-rails is now done with the object so it's time to delete it.
func (m *Multipart) Abort() {
func (m *Multipart) complete(cmu *CompleteMultipartUpload) error {
body, err := xml.Marshal(cmu)
if err != nil {
return fmt.Errorf("marshal CompleteMultipartUpload request: %v", err)
req, err := http.NewRequest("POST", m.CompleteURL, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("create CompleteMultipartUpload request: %v", err)
req.ContentLength = int64(len(body))
req.Header.Set("Content-Type", "application/xml")
req = req.WithContext(m.ctx)
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("CompleteMultipartUpload request %q: %v", mask.URL(m.CompleteURL), err)
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("CompleteMultipartUpload request %v returned: %s", mask.URL(m.CompleteURL), resp.Status)
result := &compoundCompleteMultipartUploadResult{}
decoder := xml.NewDecoder(resp.Body)
if err := decoder.Decode(&result); err != nil {
return fmt.Errorf("decode CompleteMultipartUpload answer: %v", err)
if result.isError() {
return result
if result.CompleteMultipartUploadResult == nil {
return fmt.Errorf("empty CompleteMultipartUploadResult")
return nil
func (m *Multipart) Delete() {
func (m *Multipart) readAndUploadOnePart(partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) {
func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) {
file, err := ioutil.TempFile("", "part-buffer")
if err != nil {
return nil, fmt.Errorf("create temporary buffer file: %v", err)
......@@ -182,20 +119,20 @@ func (m *Multipart) readAndUploadOnePart(partURL string, putHeaders map[string]s
return nil, fmt.Errorf("rewind part %d temporary dump : %v", partNumber, err)
etag, err := m.uploadPart(partURL, putHeaders, file, n)
etag, err := m.uploadPart(ctx, partURL, putHeaders, file, n)
if err != nil {
return nil, fmt.Errorf("upload part %d: %v", partNumber, err)
return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil
func (m *Multipart) uploadPart(url string, headers map[string]string, body io.Reader, size int64) (string, error) {
deadline, ok := m.ctx.Deadline()
func (m *Multipart) uploadPart(ctx context.Context, url string, headers map[string]string, body io.Reader, size int64) (string, error) {
deadline, ok := ctx.Deadline()
if !ok {
return "", fmt.Errorf("missing deadline")
part, err := newObject(m.ctx, url, "", headers, deadline, size, false)
part, err := newObject(ctx, url, "", headers, deadline, size, false)
if err != nil {
return "", err
......@@ -213,10 +150,45 @@ func (m *Multipart) uploadPart(url string, headers map[string]string, body io.Re
return part.ETag(), nil
func (m *Multipart) delete() {
func (m *Multipart) complete(ctx context.Context, cmu *CompleteMultipartUpload) error {
body, err := xml.Marshal(cmu)
if err != nil {
return fmt.Errorf("marshal CompleteMultipartUpload request: %v", err)
req, err := http.NewRequest("POST", m.CompleteURL, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("create CompleteMultipartUpload request: %v", err)
req.ContentLength = int64(len(body))
req.Header.Set("Content-Type", "application/xml")
req = req.WithContext(ctx)
func (m *Multipart) abort() {
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("CompleteMultipartUpload request %q: %v", mask.URL(m.CompleteURL), err)
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("CompleteMultipartUpload request %v returned: %s", mask.URL(m.CompleteURL), resp.Status)
result := &compoundCompleteMultipartUploadResult{}
decoder := xml.NewDecoder(resp.Body)
if err := decoder.Decode(&result); err != nil {
return fmt.Errorf("decode CompleteMultipartUpload answer: %v", err)
if result.isError() {
return result
if result.CompleteMultipartUploadResult == nil {
return fmt.Errorf("empty CompleteMultipartUploadResult")
m.etag = extractETag(result.ETag)
return nil
......@@ -7,7 +7,6 @@ import (
......@@ -36,109 +35,82 @@ var httpClient = &http.Client{
Transport: httpTransport,
type StatusCodeError error
// Object represents an object on a S3 compatible Object Store service.
// It can be used as io.WriteCloser for uploading an object
type Object struct {
// PutURL is a presigned URL for PutObject
PutURL string
// DeleteURL is a presigned URL for RemoveObject
DeleteURL string
// putURL is a presigned URL for PutObject
putURL string
// deleteURL is a presigned URL for RemoveObject
deleteURL string
putHeaders map[string]string
size int64
etag string
metrics bool
type StatusCodeError error
// NewObject opens an HTTP connection to Object Store and returns an Object pointer that can be used for uploading.
func NewObject(ctx context.Context, putURL, deleteURL string, putHeaders map[string]string, deadline time.Time, size int64) (*Object, error) {
return newObject(ctx, putURL, deleteURL, putHeaders, deadline, size, true)
func newObject(ctx context.Context, putURL, deleteURL string, putHeaders map[string]string, deadline time.Time, size int64, metrics bool) (*Object, error) {
started := time.Now()
pr, pw := io.Pipe()
// we should prevent pr.Close() otherwise it may shadow error set with pr.CloseWithError(err)
req, err := http.NewRequest(http.MethodPut, putURL, ioutil.NopCloser(pr))
if err != nil {
if metrics {
return nil, fmt.Errorf("PUT %q: %v", mask.URL(putURL), err)
req.ContentLength = size
for k, v := range putHeaders {
req.Header.Set(k, v)
uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)
o := &Object{
PutURL: putURL,
DeleteURL: deleteURL,
uploader: newMD5Uploader(uploadCtx, pw),
putURL: putURL,
deleteURL: deleteURL,
putHeaders: putHeaders,
size: size,
metrics: metrics,
if metrics {
o.uploader = newMD5Uploader(o, metrics)
o.Execute(ctx, deadline)
go func() {
// wait for the upload to finish
if metrics {
return o, nil
// wait for provided context to finish before performing cleanup
func (o *Object) Upload(ctx context.Context, r io.Reader) error {
// we should prevent pr.Close() otherwise it may shadow error set with pr.CloseWithError(err)
req, err := http.NewRequest(http.MethodPut, o.putURL, ioutil.NopCloser(r))
go func() {
defer cancelFn()
if metrics {
defer objectStorageUploadsOpen.Dec()
if err != nil {
return fmt.Errorf("PUT %q: %v", mask.URL(o.putURL), err)
defer func() {
// This will be returned as error to the next write operation on the pipe
req.ContentLength = o.size
req = req.WithContext(o.ctx)
for k, v := range o.putHeaders {
req.Header.Set(k, v)
resp, err := httpClient.Do(req)
if err != nil {
if metrics {
o.uploadError = fmt.Errorf("PUT request %q: %v", mask.URL(o.PutURL), err)
return fmt.Errorf("PUT request %q: %v", mask.URL(o.putURL), err)
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
if metrics {
if o.metrics {
o.uploadError = StatusCodeError(fmt.Errorf("PUT request %v returned: %s", mask.URL(o.PutURL), resp.Status))
return StatusCodeError(fmt.Errorf("PUT request %v returned: %s", mask.URL(o.putURL), resp.Status))
o.uploadError = compareMD5(o.md5Sum(), o.etag)
o.etag = extractETag(resp.Header.Get("ETag"))
return o, nil
return nil
func (o *Object) delete() {
func (o *Object) ETag() string {
return o.etag
func compareMD5(local, remote string) error {
if !strings.EqualFold(local, remote) {
return fmt.Errorf("ETag mismatch. expected %q got %q", local, remote)
func (o *Object) Abort() {
return nil
func (o *Object) Delete() {
......@@ -5,23 +5,36 @@ import (
type S3Object struct {
credentials config.S3Credentials
config config.S3Config
objectName string
uploaded bool
func NewS3Object(ctx context.Context, objectName string, s3Credentials config.S3Credentials, s3Config config.S3Config, deadline time.Time) (*S3Object, error) {
o := &S3Object{
credentials: s3Credentials,
config: s3Config,
objectName: objectName,
o.uploader = newUploader(o)
o.Execute(ctx, deadline)
return o, nil
func setEncryptionOptions(input *s3manager.UploadInput, s3Config config.S3Config) {
if s3Config.ServerSideEncryption != "" {
input.ServerSideEncryption = aws.String(s3Config.ServerSideEncryption)
......@@ -32,88 +45,48 @@ func setEncryptionOptions(input *s3manager.UploadInput, s3Config config.S3Config
func NewS3Object(ctx context.Context, objectName string, s3Credentials config.S3Credentials, s3Config config.S3Config, deadline time.Time) (*S3Object, error) {
pr, pw := io.Pipe()
uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)
o := &S3Object{
uploader: newUploader(uploadCtx, pw),
credentials: s3Credentials,
config: s3Config,
go o.trackUploadTime()
go o.cleanup(ctx)
go func() {
defer cancelFn()
defer objectStorageUploadsOpen.Dec()
defer func() {
// This will be returned as error to the next write operation on the pipe
sess, err := setupS3Session(s3Credentials, s3Config)
func (s *S3Object) Upload(ctx context.Context, r io.Reader) error {
sess, err := setupS3Session(s.credentials, s.config)
if err != nil {
o.uploadError = err
log.WithError(err).Error("error creating S3 session")
return err
o.objectName = objectName
uploader := s3manager.NewUploader(sess)
input := &s3manager.UploadInput{
Bucket: aws.String(s3Config.Bucket),
Key: aws.String(objectName),
Body: pr,
Bucket: aws.String(s.config.Bucket),
Key: aws.String(s.objectName),
Body: r,
setEncryptionOptions(input, s3Config)
setEncryptionOptions(input, s.config)
_, err = uploader.UploadWithContext(uploadCtx, input)
_, err = uploader.UploadWithContext(ctx, input)
if err != nil {
o.uploadError = err
log.WithError(err).Error("error uploading S3 session")
return err
return o, nil
s.uploaded = true
func (o *S3Object) trackUploadTime() {
started := time.Now()
return nil
func (o *S3Object) cleanup(ctx context.Context) {
// wait for the upload to finish
if o.uploadError != nil {
// We have now successfully uploaded the file to object storage. Another
// goroutine will hand off the object to gitlab-rails.
func (s *S3Object) ETag() string {
return ""
// gitlab-rails is now done with the object so it's time to delete it.
func (s *S3Object) Abort() {
func (o *S3Object) delete() {
if o.objectName == "" {
func (s *S3Object) Delete() {
if !s.uploaded {
session, err := setupS3Session(o.credentials, o.config)
session, err := setupS3Session(s.credentials, s.config)
if err != nil {
log.WithError(err).Error("error setting up S3 session in delete")
......@@ -121,8 +94,8 @@ func (o *S3Object) delete() {
svc := s3.New(session)
input := &s3.DeleteObjectInput{
Bucket: aws.String(o.config.Bucket),
Key: aws.String(o.objectName),
Bucket: aws.String(s.config.Bucket),
Key: aws.String(s.objectName),
// Note we can't use the request context because in a successful
package objectstore
import (
type uploadStrategy interface {
Upload(ctx context.Context, r io.Reader) error
ETag() string
func deleteURL(url string) {
if url == "" {
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed")
// TODO: consider adding the context to the outgoing request for better instrumentation
// here we are not using u.ctx because we must perform cleanup regardless of parent context
resp, err := httpClient.Do(req)
if err != nil {
log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed")
func extractETag(rawETag string) string {
if rawETag != "" && rawETag[0] == '"' {
rawETag = rawETag[1 : len(rawETag)-1]
return rawETag
......@@ -4,12 +4,11 @@ import (
// Upload represents an upload to an ObjectStorage provider
......@@ -33,16 +32,23 @@ type uploader struct {
uploadError error
// ctx is the internal context bound to the upload request
ctx context.Context
pr *io.PipeReader
pw *io.PipeWriter
strategy uploadStrategy
metrics bool
func newUploader(ctx context.Context, w io.WriteCloser) uploader {
return uploader{w: w, c: w, ctx: ctx}
func newUploader(strategy uploadStrategy) uploader {
pr, pw := io.Pipe()
return uploader{w: pw, c: pw, pr: pr, pw: pw, strategy: strategy, metrics: true}
func newMD5Uploader(ctx context.Context, w io.WriteCloser) uploader {
func newMD5Uploader(strategy uploadStrategy, metrics bool) uploader {
pr, pw := io.Pipe()
hasher := md5.New()
mw := io.MultiWriter(w, hasher)
return uploader{w: mw, c: w, md5: hasher, ctx: ctx}
mw := io.MultiWriter(pw, hasher)
return uploader{w: mw, c: pw, pr: pr, pw: pw, md5: hasher, strategy: strategy, metrics: metrics}
// Close implements the standard io.Closer interface: it closes the http client request.
......@@ -65,50 +71,100 @@ func (u *uploader) Write(p []byte) (int, error) {
return u.w.Write(p)
// syncAndDelete wait for Context to be Done and then performs the requested HTTP call
func (u *uploader) syncAndDelete(url string) {
if url == "" {
func (u *uploader) md5Sum() string {
if u.md5 == nil {
return ""
checksum := u.md5.Sum(nil)
return hex.EncodeToString(checksum)
// ETag returns the checksum of the uploaded object returned by the ObjectStorage provider via ETag Header.
// This method will wait until upload context is done before returning.
func (u *uploader) ETag() string {
req, err := http.NewRequest("DELETE", url, nil)
return u.etag
func (u *uploader) Execute(ctx context.Context, deadline time.Time) {
if u.metrics {
uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)
u.ctx = uploadCtx
if u.metrics {
go u.trackUploadTime()
go u.cleanup(ctx)
go func() {
defer cancelFn()
if u.metrics {
defer objectStorageUploadsOpen.Dec()
defer func() {
// This will be returned as error to the next write operation on the pipe
err := u.strategy.Upload(uploadCtx,
if err != nil {
log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed")
u.uploadError = err
if u.metrics {
// TODO: consider adding the context to the outgoing request for better instrumentation
// here we are not using u.ctx because we must perform cleanup regardless of parent context
resp, err := httpClient.Do(req)
u.etag = u.strategy.ETag()
if u.md5 != nil {
err := compareMD5(u.md5Sum(), u.etag)
if err != nil {
log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed")
u.uploadError = err
if u.metrics {
func (u *uploader) extractETag(rawETag string) {
if rawETag != "" && rawETag[0] == '"' {
rawETag = rawETag[1 : len(rawETag)-1]
func (u *uploader) trackUploadTime() {
started := time.Now()
if u.metrics {
u.etag = rawETag
func (u *uploader) md5Sum() string {
if u.md5 == nil {
return ""
func (u *uploader) cleanup(ctx context.Context) {
// wait for the upload to finish
if u.uploadError != nil {
if u.metrics {
checksum := u.md5.Sum(nil)
return hex.EncodeToString(checksum)
// We have now successfully uploaded the file to object storage. Another
// goroutine will hand off the object to gitlab-rails.
// gitlab-rails is now done with the object so it's time to delete it.
// ETag returns the checksum of the uploaded object returned by the ObjectStorage provider via ETag Header.
// This method will wait until upload context is done before returning.
func (u *uploader) ETag() string {
func compareMD5(local, remote string) error {
if !strings.EqualFold(local, remote) {
return fmt.Errorf("ETag mismatch. expected %q got %q", local, remote)
return u.etag
return nil
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment