Commit a02e8554 authored by Quentin Smith's avatar Quentin Smith

storage/db: add ReplaceUpload for use in reindexing

ReplaceUpload removes any records for an upload so that those records
can be reinserted in the database.

Change-Id: I8fa0701b72c3ace380d3c7922df0c17b81a0d426
Reviewed-on: https://go-review.googlesource.com/35257Reviewed-by: default avatarRuss Cox <rsc@golang.org>
parent 4b028980
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"regexp"
"strconv" "strconv"
"strings" "strings"
"text/template" "text/template"
...@@ -32,6 +33,8 @@ type DB struct { ...@@ -32,6 +33,8 @@ type DB struct {
lastUpload *sql.Stmt lastUpload *sql.Stmt
insertUpload *sql.Stmt insertUpload *sql.Stmt
insertRecord *sql.Stmt insertRecord *sql.Stmt
checkUpload *sql.Stmt
deleteRecords *sql.Stmt
} }
// OpenSQL creates a DB backed by a SQL database. The parameters are // OpenSQL creates a DB backed by a SQL database. The parameters are
...@@ -142,6 +145,14 @@ func (db *DB) prepareStatements(driverName string) error { ...@@ -142,6 +145,14 @@ func (db *DB) prepareStatements(driverName string) error {
if err != nil { if err != nil {
return err return err
} }
db.checkUpload, err = db.sql.Prepare("SELECT 1 FROM Uploads WHERE UploadID = ?")
if err != nil {
return err
}
db.deleteRecords, err = db.sql.Prepare("DELETE FROM Records WHERE UploadID = ?")
if err != nil {
return err
}
return nil return nil
} }
...@@ -162,6 +173,42 @@ type Upload struct { ...@@ -162,6 +173,42 @@ type Upload struct {
// now is a hook for testing // now is a hook for testing
var now = time.Now var now = time.Now
// ReplaceUpload removes the records associated with id if any and
// allows insertion of new records.
func (db *DB) ReplaceUpload(id string) (*Upload, error) {
if _, err := db.deleteRecords.Exec(id); err != nil {
return nil, err
}
var found bool
err := db.checkUpload.QueryRow(id).Scan(&found)
switch err {
case sql.ErrNoRows:
var day sql.NullString
var num sql.NullInt64
if m := regexp.MustCompile(`^(\d+)\.(\d+)$`).FindStringSubmatch(id); m != nil {
day.Valid, num.Valid = true, true
day.String = m[1]
num.Int64, _ = strconv.ParseInt(m[2], 10, 64)
}
if _, err := db.insertUpload.Exec(id, day, num); err != nil {
return nil, err
}
case nil:
default:
return nil, err
}
tx, err := db.sql.Begin()
if err != nil {
return nil, err
}
u := &Upload{
ID: id,
db: db,
tx: tx,
}
return u, nil
}
// NewUpload returns an upload for storing new files. // NewUpload returns an upload for storing new files.
// All records written to the Upload will have the same upload ID. // All records written to the Upload will have the same upload ID.
func (db *DB) NewUpload(ctx context.Context) (*Upload, error) { func (db *DB) NewUpload(ctx context.Context) (*Upload, error) {
...@@ -210,11 +257,12 @@ func (db *DB) NewUpload(ctx context.Context) (*Upload, error) { ...@@ -210,11 +257,12 @@ func (db *DB) NewUpload(ctx context.Context) (*Upload, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Upload{ u := &Upload{
ID: id, ID: id,
db: db, db: db,
tx: utx, tx: utx,
}, nil }
return u, nil
} }
// InsertRecord inserts a single record in an existing upload. // InsertRecord inserts a single record in an existing upload.
...@@ -352,7 +400,7 @@ func splitQueryWords(q string) []string { ...@@ -352,7 +400,7 @@ func splitQueryWords(q string) []string {
// Query is the result of a query. // Query is the result of a query.
// Use Next to advance through the rows, making sure to call Close when done: // Use Next to advance through the rows, making sure to call Close when done:
// //
// q, err := db.Query("key:value") // q := db.Query("key:value")
// defer q.Close() // defer q.Close()
// for q.Next() { // for q.Next() {
// res := q.Result() // res := q.Result()
......
...@@ -5,8 +5,12 @@ ...@@ -5,8 +5,12 @@
package db_test package db_test
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io/ioutil"
"os"
"os/exec"
"reflect" "reflect"
"strings" "strings"
"testing" "testing"
...@@ -79,6 +83,95 @@ func TestUploadIDs(t *testing.T) { ...@@ -79,6 +83,95 @@ func TestUploadIDs(t *testing.T) {
} }
} }
// checkQueryResults performs a query on db and verifies that the
// results as printed by BenchmarkPrinter are equal to results.
func checkQueryResults(t *testing.T, db *DB, query, results string) {
q := db.Query(query)
defer q.Close()
var buf bytes.Buffer
bp := benchfmt.NewPrinter(&buf)
for q.Next() {
if err := bp.Print(q.Result()); err != nil {
t.Fatalf("Print: %v", err)
}
}
if err := q.Err(); err != nil {
t.Fatalf("Err: %v", err)
}
if diff := diff(buf.String(), results); diff != "" {
t.Errorf("wrong results: (- have/+ want)\n%s", diff)
}
}
// TestReplaceUpload verifies that the expected number of rows exist after replacing an upload.
func TestReplaceUpload(t *testing.T) {
SetNow(time.Unix(0, 0))
defer SetNow(time.Time{})
db, cleanup := dbtest.NewDB(t)
defer cleanup()
ctx := context.Background()
r := &benchfmt.Result{
benchfmt.Labels{"key": "value"},
nil,
1,
"BenchmarkName 1 ns/op",
}
u, err := db.NewUpload(ctx)
if err != nil {
t.Fatalf("NewUpload: %v", err)
}
r.Labels["uploadid"] = u.ID
for _, num := range []string{"1", "2"} {
r.Labels["num"] = num
if err := u.InsertRecord(r); err != nil {
t.Fatalf("InsertRecord: %v", err)
}
}
if err := u.Commit(); err != nil {
t.Fatalf("Commit: %v", err)
}
checkQueryResults(t, db, "key:value",
`key: value
num: 1
uploadid: 19700101.1
BenchmarkName 1 ns/op
num: 2
BenchmarkName 1 ns/op
`)
r.Labels["num"] = "3"
for _, uploadid := range []string{u.ID, "new"} {
u, err := db.ReplaceUpload(uploadid)
if err != nil {
t.Fatalf("ReplaceUpload: %v", err)
}
r.Labels["uploadid"] = u.ID
if err := u.InsertRecord(r); err != nil {
t.Fatalf("InsertRecord: %v", err)
}
if err := u.Commit(); err != nil {
t.Fatalf("Commit: %v", err)
}
}
checkQueryResults(t, db, "key:value",
`key: value
num: 3
uploadid: 19700101.1
BenchmarkName 1 ns/op
uploadid: new
BenchmarkName 1 ns/op
`)
}
// TestNewUpload verifies that NewUpload and InsertRecord wrote the correct rows to the database. // TestNewUpload verifies that NewUpload and InsertRecord wrote the correct rows to the database.
func TestNewUpload(t *testing.T) { func TestNewUpload(t *testing.T) {
SetNow(time.Unix(0, 0)) SetNow(time.Unix(0, 0))
...@@ -218,3 +311,36 @@ func TestQuery(t *testing.T) { ...@@ -218,3 +311,36 @@ func TestQuery(t *testing.T) {
}) })
} }
} }
// diff returns the output of unified diff on s1 and s2. If the result
// is non-empty, the strings differ or the diff command failed.
func diff(s1, s2 string) string {
f1, err := ioutil.TempFile("", "benchfmt_test")
if err != nil {
return err.Error()
}
defer os.Remove(f1.Name())
defer f1.Close()
f2, err := ioutil.TempFile("", "benchfmt_test")
if err != nil {
return err.Error()
}
defer os.Remove(f2.Name())
defer f2.Close()
f1.Write([]byte(s1))
f2.Write([]byte(s2))
data, err := exec.Command("diff", "-u", f1.Name(), f2.Name()).CombinedOutput()
if len(data) > 0 {
// diff exits with a non-zero status when the files don't match.
// Ignore that failure as long as we get output.
err = nil
}
if err != nil {
data = append(data, []byte(err.Error())...)
}
return string(data)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment