Commit 35a1a84a authored by Quentin Smith's avatar Quentin Smith

storage, cmd/benchsave: move upload logic into Client

Change-Id: Ie2cafb6ee0fe30860803cc332133fc1922857831
Reviewed-on: https://go-review.googlesource.com/38304
Run-TryBot: Quentin Smith <quentin@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: default avatarRuss Cox <rsc@golang.org>
parent e2bd3136
......@@ -18,20 +18,19 @@ package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"mime"
"mime/multipart"
"net/http"
"os"
"path/filepath"
"time"
"golang.org/x/oauth2"
"golang.org/x/perf/storage"
)
var (
......@@ -42,18 +41,9 @@ var (
const userAgent = "Benchsave/1.0"
type uploadStatus struct {
// UploadID is the upload ID assigned to the upload.
UploadID string `json:"uploadid"`
// FileIDs is the list of file IDs assigned to the files in the upload.
FileIDs []string `json:"fileids"`
// ViewURL is a server-supplied URL to view the results.
ViewURL string `json:"viewurl"`
}
// writeOneFile reads name and writes it to mpw.
func writeOneFile(mpw *multipart.Writer, name string, header []byte) error {
w, err := mpw.CreateFormFile("file", filepath.Base(name))
// writeOneFile reads name and writes it to u.
func writeOneFile(u *storage.Upload, name string, header []byte) error {
w, err := u.CreateFile(filepath.Base(name))
if err != nil {
return err
}
......@@ -108,49 +98,23 @@ func main() {
// Or they might need non-Google authentication.
hc := oauth2.NewClient(context.Background(), newTokenSource())
pr, pw := io.Pipe()
mpw := multipart.NewWriter(pw)
go func() {
defer pw.Close()
defer mpw.Close()
for _, name := range files {
if err := writeOneFile(mpw, name, headerData); err != nil {
log.Print(err)
mpw.WriteField("abort", "1")
// Writing the 'abort' field will cause the server to send back an error response,
// which will cause the main goroutine to below.
return
}
}
mpw.WriteField("commit", "1")
}()
client := &storage.Client{BaseURL: *server, HTTPClient: hc}
start := time.Now()
req, err := http.NewRequest("POST", *server+"/upload", pr)
if err != nil {
log.Fatalf("NewRequest failed: %v\n", err)
}
req.Header.Set("Content-Type", mpw.FormDataContentType())
req.Header.Set("User-Agent", userAgent)
resp, err := hc.Do(req)
if err != nil {
log.Fatalf("upload failed: %v\n", err)
}
defer resp.Body.Close()
u := client.NewUpload()
if resp.StatusCode != 200 {
log.Printf("upload failed: %v\n", resp.Status)
io.Copy(os.Stderr, resp.Body)
os.Exit(1)
for _, name := range files {
if err := writeOneFile(u, name, headerData); err != nil {
log.Print(err)
u.Abort()
return
}
}
status := &uploadStatus{}
if err := json.NewDecoder(resp.Body).Decode(status); err != nil {
log.Fatalf("cannot parse upload response: %v\n", err)
status, err := u.Commit()
if err != nil {
log.Fatalf("upload failed: %v\n", err)
}
if *verbose {
......
......@@ -10,6 +10,7 @@ import (
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
"net/url"
......@@ -47,7 +48,6 @@ func (c *Client) Query(q string) *Query {
if err != nil {
return &Query{err: err}
}
if resp.StatusCode != 200 {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
......@@ -210,4 +210,131 @@ func (ul *UploadList) Close() error {
return ul.Err()
}
// TODO(quentin): Move upload code here from cmd/benchsave?
// NewUpload starts a new upload to the storage server.
// The upload must have Abort or Commit called on it.
// If the server requires authentication for uploads, c.HTTPClient should be set to the result of oauth2.NewClient.
func (c *Client) NewUpload() *Upload {
hc := c.httpClient()
pr, pw := io.Pipe()
mpw := multipart.NewWriter(pw)
req, err := http.NewRequest("POST", c.BaseURL+"/upload", pr)
if err != nil {
return &Upload{err: err}
}
req.Header.Set("Content-Type", mpw.FormDataContentType())
req.Header.Set("User-Agent", "golang.org/x/perf/storage")
errCh := make(chan error)
u := &Upload{pw: pw, mpw: mpw, errCh: errCh}
go func() {
resp, err := hc.Do(req)
if err != nil {
errCh <- err
return
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := ioutil.ReadAll(resp.Body)
errCh <- fmt.Errorf("upload failed: %v\n%s", resp.Status, body)
return
}
status := &UploadStatus{}
if err := json.NewDecoder(resp.Body).Decode(status); err != nil {
errCh <- err
}
u.status = status
errCh <- nil
}()
return u
}
// UploadStatus contains information about a successful upload.
type UploadStatus struct {
// UploadID is the upload ID assigned to the upload.
UploadID string `json:"uploadid"`
// FileIDs is the list of file IDs assigned to the files in the upload.
FileIDs []string `json:"fileids"`
// ViewURL is a server-supplied URL to view the results.
ViewURL string `json:"viewurl"`
}
// An Upload is an in-progress upload.
// Use CreateFile to upload one or more files, then call Commit or Abort.
//
// u := client.NewUpload()
// w, err := u.CreateFile()
// if err != nil {
// u.Abort()
// return err
// }
// fmt.Fprintf(w, "BenchmarkResult 1 1 ns/op\n")
// if err := u.Commit(); err != nil {
// return err
// }
type Upload struct {
pw io.WriteCloser
mpw *multipart.Writer
status *UploadStatus
// errCh is used to report the success/failure of the HTTP request
errCh chan error
// err is the first observed error; it is only accessed from user-called methods for thread safety
err error
}
// CreateFile creates a new upload with the given name.
// The Writer may be used until CreateFile is called again.
// name may be the empty string if the file does not have a name.
func (u *Upload) CreateFile(name string) (io.Writer, error) {
if u.err != nil {
return nil, u.err
}
return u.mpw.CreateFormFile("file", name)
}
// Commit attempts to commit the upload.
func (u *Upload) Commit() (*UploadStatus, error) {
if u.err != nil {
return nil, u.err
}
if u.err = u.mpw.WriteField("commit", "1"); u.err != nil {
u.Abort()
return nil, u.err
}
if u.err = u.mpw.Close(); u.err != nil {
u.Abort()
return nil, u.err
}
u.mpw = nil
if u.err = u.pw.Close(); u.err != nil {
u.Abort()
return nil, u.err
}
u.pw = nil
u.err = <-u.errCh
u.errCh = nil
if u.err != nil {
return nil, u.err
}
return u.status, nil
}
// Abort attempts to cancel the in-progress upload.
func (u *Upload) Abort() error {
if u.mpw != nil {
u.mpw.WriteField("abort", "1")
// Writing the 'abort' field will cause the server to send back an error response.
u.mpw.Close()
u.mpw = nil
}
if u.pw != nil {
u.pw.Close()
u.pw = nil
}
err := <-u.errCh
u.errCh = nil
if u.err == nil {
u.err = err
}
return u.err
}
......@@ -7,6 +7,8 @@ package storage
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"reflect"
......@@ -90,3 +92,115 @@ func TestListUploads(t *testing.T) {
t.Fatalf("Err: %v", err)
}
}
func TestNewUpload(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if have, want := r.URL.RequestURI(), "/upload"; have != want {
t.Errorf("RequestURI = %q, want %q", have, want)
}
mr, err := r.MultipartReader()
if err != nil {
t.Error(err)
}
i := 0
for i = 0; ; i++ {
p, err := mr.NextPart()
if err == io.EOF {
break
}
name := p.FormName()
if name == "commit" {
continue
}
if name != "file" {
t.Errorf("unexpected field %q, want file", name)
}
if have, want := p.FileName(), fmt.Sprintf("want%d.txt", i); have != want {
t.Errorf("file name = %q, want %q", have, want)
}
content, _ := ioutil.ReadAll(p)
if have, want := string(content), "content"; have != want {
t.Errorf("unexpected content %q, want %q", have, want)
}
}
if i != 3 {
t.Errorf("number of files = %d, want %d", i, 3)
}
fmt.Fprintf(w, "%s\n", `{"uploadid": "id", "fileids": ["id/1", "id/2"]}`)
}))
defer ts.Close()
c := &Client{BaseURL: ts.URL}
u := c.NewUpload()
for i := 0; i < 2; i++ {
w, err := u.CreateFile(fmt.Sprintf("want%d.txt", i))
if err != nil {
t.Fatalf("CreateFile = %v", err)
}
if _, err := fmt.Fprintf(w, "content"); err != nil {
t.Fatalf("Write returned %v", err)
}
}
status, err := u.Commit()
if err != nil {
t.Errorf("Commit = %v", err)
}
if status.UploadID != "id" {
t.Errorf("status.UploadID = %q, want %q", status.UploadID, "id")
}
}
func TestNewUploadAbort(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if have, want := r.URL.RequestURI(), "/upload"; have != want {
t.Errorf("RequestURI = %q, want %q", have, want)
}
mr, err := r.MultipartReader()
if err != nil {
t.Error(err)
}
i := 0
for i = 0; ; i++ {
p, err := mr.NextPart()
if err == io.EOF {
break
}
name := p.FormName()
if name == "abort" {
continue
}
if name != "file" {
t.Errorf("unexpected field %q, want file or abort", name)
}
if have, want := p.FileName(), fmt.Sprintf("want%d.txt", i); have != want {
t.Errorf("file name = %q, want %q", have, want)
}
content, _ := ioutil.ReadAll(p)
if have, want := string(content), "content"; have != want {
t.Errorf("unexpected content %q, want %q", have, want)
}
}
if i != 3 {
t.Errorf("number of files = %d, want %d", i, 3)
}
fmt.Fprintf(w, "%s\n", `{"uploadid": "id", "fileids": ["id/1", "id/2"]}`)
}))
defer ts.Close()
c := &Client{BaseURL: ts.URL}
u := c.NewUpload()
for i := 0; i < 2; i++ {
w, err := u.CreateFile(fmt.Sprintf("want%d.txt", i))
if err != nil {
t.Fatalf("CreateFile = %v", err)
}
if _, err := fmt.Fprintf(w, "content"); err != nil {
t.Fatalf("Write returned %v", err)
}
}
if err := u.Abort(); err != nil {
t.Errorf("Abort = %v", err)
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment