Commit ecde60ad authored by Quentin Smith's avatar Quentin Smith

storage: reuse transaction for uploads

Reuse a single transaction for all INSERT statements for a single
upload. This cuts about 50% of the runtime off an invocation of
the tests with -cloud, and about 25% of the runtime off an /upload run
on App Engine.

Change-Id: I90068f60420cfb67ae693f18ed83b341c2e483bd
Reviewed-on: https://go-review.googlesource.com/35069Reviewed-by: default avatarRuss Cox <rsc@golang.org>
parent 72f847b9
...@@ -99,6 +99,11 @@ func (a *App) processUpload(ctx context.Context, user string, mr *multipart.Read ...@@ -99,6 +99,11 @@ func (a *App) processUpload(ctx context.Context, user string, mr *multipart.Read
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer func() {
if upload != nil {
upload.Abort()
}
}()
} }
// The incoming file needs to be stored in Cloud // The incoming file needs to be stored in Cloud
...@@ -121,11 +126,20 @@ func (a *App) processUpload(ctx context.Context, user string, mr *multipart.Read ...@@ -121,11 +126,20 @@ func (a *App) processUpload(ctx context.Context, user string, mr *multipart.Read
fileids = append(fileids, meta["fileid"]) fileids = append(fileids, meta["fileid"])
} }
if upload == nil {
return nil, errors.New("no files processed")
}
if err := upload.Commit(); err != nil {
return nil, err
}
status := &uploadStatus{UploadID: upload.ID, FileIDs: fileids} status := &uploadStatus{UploadID: upload.ID, FileIDs: fileids}
if a.ViewURLBase != "" { if a.ViewURLBase != "" {
status.ViewURL = a.ViewURLBase + url.QueryEscape(upload.ID) status.ViewURL = a.ViewURLBase + url.QueryEscape(upload.ID)
} }
upload = nil
return status, nil return status, nil
} }
......
...@@ -145,12 +145,14 @@ type Upload struct { ...@@ -145,12 +145,14 @@ type Upload struct {
recordid int64 recordid int64
// db is the underlying database that this upload is going to. // db is the underlying database that this upload is going to.
db *DB db *DB
// tx is the transaction used by the upload.
tx *sql.Tx
} }
// NewUpload returns an upload for storing new files. // NewUpload returns an upload for storing new files.
// All records written to the Upload will have the same upload ID. // All records written to the Upload will have the same upload ID.
func (db *DB) NewUpload(ctx context.Context) (*Upload, error) { func (db *DB) NewUpload(ctx context.Context) (*Upload, error) {
// TODO(quentin): Use a transaction? // TODO(quentin): Use the same transaction as the rest of the upload?
res, err := db.insertUpload.Exec() res, err := db.insertUpload.Exec()
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -160,33 +162,27 @@ func (db *DB) NewUpload(ctx context.Context) (*Upload, error) { ...@@ -160,33 +162,27 @@ func (db *DB) NewUpload(ctx context.Context) (*Upload, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
tx, err := db.sql.Begin()
if err != nil {
return nil, err
}
return &Upload{ return &Upload{
ID: fmt.Sprint(i), ID: fmt.Sprint(i),
id: i, id: i,
db: db, db: db,
tx: tx,
}, nil }, nil
} }
// InsertRecord inserts a single record in an existing upload. // InsertRecord inserts a single record in an existing upload.
func (u *Upload) InsertRecord(r *benchfmt.Result) (err error) { // If InsertRecord returns a non-nil error, the Upload has failed and u.Abort() must be called.
// TODO(quentin): Use a single transaction for the whole upload? func (u *Upload) InsertRecord(r *benchfmt.Result) error {
tx, err := u.db.sql.Begin()
if err != nil {
return err
}
defer func() {
if err != nil {
tx.Rollback()
} else {
err = tx.Commit()
}
}()
// TODO(quentin): Support multiple lines (slice of results?) // TODO(quentin): Support multiple lines (slice of results?)
var buf bytes.Buffer var buf bytes.Buffer
if err := benchfmt.NewPrinter(&buf).Print(r); err != nil { if err := benchfmt.NewPrinter(&buf).Print(r); err != nil {
return err return err
} }
if _, err = tx.Stmt(u.db.insertRecord).Exec(u.id, u.recordid, buf.Bytes()); err != nil { if _, err := u.tx.Stmt(u.db.insertRecord).Exec(u.id, u.recordid, buf.Bytes()); err != nil {
return err return err
} }
var args []interface{} var args []interface{}
...@@ -199,7 +195,7 @@ func (u *Upload) InsertRecord(r *benchfmt.Result) (err error) { ...@@ -199,7 +195,7 @@ func (u *Upload) InsertRecord(r *benchfmt.Result) (err error) {
if len(args) > 0 { if len(args) > 0 {
query := "INSERT INTO RecordLabels VALUES " + strings.Repeat("(?, ?, ?, ?), ", len(args)/4) query := "INSERT INTO RecordLabels VALUES " + strings.Repeat("(?, ?, ?, ?), ", len(args)/4)
query = strings.TrimSuffix(query, ", ") query = strings.TrimSuffix(query, ", ")
if _, err := tx.Exec(query, args...); err != nil { if _, err := u.tx.Exec(query, args...); err != nil {
return err return err
} }
} }
...@@ -207,6 +203,17 @@ func (u *Upload) InsertRecord(r *benchfmt.Result) (err error) { ...@@ -207,6 +203,17 @@ func (u *Upload) InsertRecord(r *benchfmt.Result) (err error) {
return nil return nil
} }
// Commit finishes processing the upload.
func (u *Upload) Commit() error {
return u.tx.Commit()
}
// Abort cleans up resources associated with the upload.
// It does not attempt to clean up partial database state.
func (u *Upload) Abort() error {
return u.tx.Rollback()
}
// Query searches for results matching the given query string. // Query searches for results matching the given query string.
// //
// The query string is first parsed into quoted words (as in the shell) // The query string is first parsed into quoted words (as in the shell)
......
...@@ -60,6 +60,10 @@ BenchmarkName 1 ns/op ...@@ -60,6 +60,10 @@ BenchmarkName 1 ns/op
t.Fatalf("InsertRecord: %v", err) t.Fatalf("InsertRecord: %v", err)
} }
if err := u.Commit(); err != nil {
t.Fatalf("Commit: %v", err)
}
rows, err := DBSQL(db).Query("SELECT UploadId, RecordId, Name, Value FROM RecordLabels") rows, err := DBSQL(db).Query("SELECT UploadId, RecordId, Name, Value FROM RecordLabels")
if err != nil { if err != nil {
t.Fatalf("sql.Query: %v", err) t.Fatalf("sql.Query: %v", err)
...@@ -119,6 +123,9 @@ func TestQuery(t *testing.T) { ...@@ -119,6 +123,9 @@ func TestQuery(t *testing.T) {
t.Fatalf("InsertRecord: %v", err) t.Fatalf("InsertRecord: %v", err)
} }
} }
if err := u.Commit(); err != nil {
t.Fatalf("Commit: %v", err)
}
tests := []struct { tests := []struct {
q string q string
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment