Commit 4b028980 authored by Quentin Smith's avatar Quentin Smith

storage: use date-based upload ID

Instead of simply sequentially-increasing upload IDs, we now generate
one with a date prefix.

Change-Id: Id54ab88e6d76932cfc121183ea2da5d145599d16
Reviewed-on: https://go-review.googlesource.com/35256Reviewed-by: default avatarRuss Cox <rsc@golang.org>
parent 62a90583
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"net/http/httptest" "net/http/httptest"
"reflect" "reflect"
"testing" "testing"
"time"
"golang.org/x/perf/storage/db" "golang.org/x/perf/storage/db"
"golang.org/x/perf/storage/db/dbtest" "golang.org/x/perf/storage/db/dbtest"
...@@ -96,6 +97,8 @@ func TestUpload(t *testing.T) { ...@@ -96,6 +97,8 @@ func TestUpload(t *testing.T) {
app := createTestApp(t) app := createTestApp(t)
defer app.Close() defer app.Close()
wantID := time.Now().UTC().Format("20060102.") + "1"
status := app.uploadFiles(t, func(mpw *multipart.Writer) { status := app.uploadFiles(t, func(mpw *multipart.Writer) {
w, err := mpw.CreateFormFile("file", "1.txt") w, err := mpw.CreateFormFile("file", "1.txt")
if err != nil { if err != nil {
...@@ -104,14 +107,14 @@ func TestUpload(t *testing.T) { ...@@ -104,14 +107,14 @@ func TestUpload(t *testing.T) {
fmt.Fprintf(w, "key: value\nBenchmarkOne 5 ns/op\nkey:value2\nBenchmarkTwo 10 ns/op\n") fmt.Fprintf(w, "key: value\nBenchmarkOne 5 ns/op\nkey:value2\nBenchmarkTwo 10 ns/op\n")
}) })
if status.UploadID != "1" { if status.UploadID != wantID {
t.Errorf("uploadid = %q, want %q", status.UploadID, "1") t.Errorf("uploadid = %q, want %q", status.UploadID, wantID)
} }
if have, want := status.FileIDs, []string{"1/0"}; !reflect.DeepEqual(have, want) { if have, want := status.FileIDs, []string{wantID + "/0"}; !reflect.DeepEqual(have, want) {
t.Errorf("fileids = %v, want %v", have, want) t.Errorf("fileids = %v, want %v", have, want)
} }
if status.ViewURL != "view:1" { if want := "view:" + wantID; status.ViewURL != want {
t.Errorf("viewurl = %q, want %q", status.ViewURL, "view:1") t.Errorf("viewurl = %q, want %q", status.ViewURL, want)
} }
if len(app.fs.Files()) != 1 { if len(app.fs.Files()) != 1 {
......
...@@ -12,8 +12,10 @@ import ( ...@@ -12,8 +12,10 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"strconv"
"strings" "strings"
"text/template" "text/template"
"time"
"unicode" "unicode"
"golang.org/x/net/context" "golang.org/x/net/context"
...@@ -27,6 +29,7 @@ import ( ...@@ -27,6 +29,7 @@ import (
type DB struct { type DB struct {
sql *sql.DB // underlying database connection sql *sql.DB // underlying database connection
// prepared statements // prepared statements
lastUpload *sql.Stmt
insertUpload *sql.Stmt insertUpload *sql.Stmt
insertRecord *sql.Stmt insertRecord *sql.Stmt
} }
...@@ -69,17 +72,25 @@ func RegisterOpenHook(driverName string, hook func(*sql.DB) error) { ...@@ -69,17 +72,25 @@ func RegisterOpenHook(driverName string, hook func(*sql.DB) error) {
// entry whose key is the driver name. // entry whose key is the driver name.
var createTmpl = template.Must(template.New("create").Parse(` var createTmpl = template.Must(template.New("create").Parse(`
CREATE TABLE IF NOT EXISTS Uploads ( CREATE TABLE IF NOT EXISTS Uploads (
UploadID {{if .sqlite3}}INTEGER PRIMARY KEY AUTOINCREMENT{{else}}SERIAL PRIMARY KEY AUTO_INCREMENT{{end}} UploadID VARCHAR(20) PRIMARY KEY,
Day VARCHAR(8),
Seq BIGINT UNSIGNED
{{if not .sqlite3}}
, Index (Day, Seq)
{{end}}
); );
{{if .sqlite3}}
CREATE INDEX IF NOT EXISTS UploadDaySeq ON Uploads(Day, Seq);
{{end}}
CREATE TABLE IF NOT EXISTS Records ( CREATE TABLE IF NOT EXISTS Records (
UploadID BIGINT UNSIGNED, UploadID VARCHAR(20),
RecordID BIGINT UNSIGNED, RecordID BIGINT UNSIGNED,
Content BLOB, Content BLOB,
PRIMARY KEY (UploadID, RecordID), PRIMARY KEY (UploadID, RecordID),
FOREIGN KEY (UploadID) REFERENCES Uploads(UploadID) ON UPDATE CASCADE ON DELETE CASCADE FOREIGN KEY (UploadID) REFERENCES Uploads(UploadID) ON UPDATE CASCADE ON DELETE CASCADE
); );
CREATE TABLE IF NOT EXISTS RecordLabels ( CREATE TABLE IF NOT EXISTS RecordLabels (
UploadID BIGINT UNSIGNED, UploadID VARCHAR(20),
RecordID BIGINT UNSIGNED, RecordID BIGINT UNSIGNED,
Name VARCHAR(255), Name VARCHAR(255),
Value VARCHAR(8192), Value VARCHAR(8192),
...@@ -115,11 +126,15 @@ func (db *DB) createTables(driverName string) error { ...@@ -115,11 +126,15 @@ func (db *DB) createTables(driverName string) error {
// prepareStatements calls db.sql.Prepare on reusable SQL statements. // prepareStatements calls db.sql.Prepare on reusable SQL statements.
func (db *DB) prepareStatements(driverName string) error { func (db *DB) prepareStatements(driverName string) error {
var err error var err error
q := "INSERT INTO Uploads() VALUES ()" query := "SELECT UploadID FROM Uploads ORDER BY Day DESC, Seq DESC LIMIT 1"
if driverName == "sqlite3" { if driverName != "sqlite3" {
q = "INSERT INTO Uploads DEFAULT VALUES" query += " FOR UPDATE"
}
db.lastUpload, err = db.sql.Prepare(query)
if err != nil {
return err
} }
db.insertUpload, err = db.sql.Prepare(q) db.insertUpload, err = db.sql.Prepare("INSERT INTO Uploads(UploadID, Day, Seq) VALUES (?, ?, ?)")
if err != nil { if err != nil {
return err return err
} }
...@@ -136,11 +151,6 @@ type Upload struct { ...@@ -136,11 +151,6 @@ type Upload struct {
// associated with every record in this upload. // associated with every record in this upload.
ID string ID string
// id is the numeric value used as the primary key. ID is a
// string for the public API; the underlying table actually
// uses an integer key. To avoid repeated calls to
// strconv.Atoi, the int64 is cached here.
id int64
// recordid is the index of the next record to insert. // recordid is the index of the next record to insert.
recordid int64 recordid int64
// db is the underlying database that this upload is going to. // db is the underlying database that this upload is going to.
...@@ -149,28 +159,61 @@ type Upload struct { ...@@ -149,28 +159,61 @@ type Upload struct {
tx *sql.Tx tx *sql.Tx
} }
// now is a hook for testing
var now = time.Now
// NewUpload returns an upload for storing new files. // NewUpload returns an upload for storing new files.
// All records written to the Upload will have the same upload ID. // All records written to the Upload will have the same upload ID.
func (db *DB) NewUpload(ctx context.Context) (*Upload, error) { func (db *DB) NewUpload(ctx context.Context) (*Upload, error) {
// TODO(quentin): Use the same transaction as the rest of the upload? day := now().UTC().Format("20060102")
res, err := db.insertUpload.Exec()
num := 0
tx, err := db.sql.Begin()
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO(quentin): Use a date-based upload ID (YYYYMMDDnnn) defer func() {
i, err := res.LastInsertId() if tx != nil {
tx.Rollback()
}
}()
var lastID string
err = tx.Stmt(db.lastUpload).QueryRow().Scan(&lastID)
switch err {
case sql.ErrNoRows:
case nil:
if strings.HasPrefix(lastID, day) {
num, err = strconv.Atoi(lastID[len(day)+1:])
if err != nil { if err != nil {
return nil, err return nil, err
} }
tx, err := db.sql.Begin() }
default:
return nil, err
}
num++
id := fmt.Sprintf("%s.%d", day, num)
_, err = tx.Stmt(db.insertUpload).Exec(id, day, num)
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
tx = nil
utx, err := db.sql.Begin()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Upload{ return &Upload{
ID: fmt.Sprint(i), ID: id,
id: i,
db: db, db: db,
tx: tx, tx: utx,
}, nil }, nil
} }
...@@ -182,15 +225,15 @@ func (u *Upload) InsertRecord(r *benchfmt.Result) error { ...@@ -182,15 +225,15 @@ func (u *Upload) InsertRecord(r *benchfmt.Result) error {
if err := benchfmt.NewPrinter(&buf).Print(r); err != nil { if err := benchfmt.NewPrinter(&buf).Print(r); err != nil {
return err return err
} }
if _, err := u.tx.Stmt(u.db.insertRecord).Exec(u.id, u.recordid, buf.Bytes()); err != nil { if _, err := u.tx.Stmt(u.db.insertRecord).Exec(u.ID, u.recordid, buf.Bytes()); err != nil {
return err return err
} }
var args []interface{} var args []interface{}
for _, k := range r.Labels.Keys() { for _, k := range r.Labels.Keys() {
args = append(args, u.id, u.recordid, k, r.Labels[k]) args = append(args, u.ID, u.recordid, k, r.Labels[k])
} }
for _, k := range r.NameLabels.Keys() { for _, k := range r.NameLabels.Keys() {
args = append(args, u.id, u.recordid, k, r.NameLabels[k]) args = append(args, u.ID, u.recordid, k, r.NameLabels[k])
} }
if len(args) > 0 { if len(args) > 0 {
query := "INSERT INTO RecordLabels VALUES " + strings.Repeat("(?, ?, ?, ?), ", len(args)/4) query := "INSERT INTO RecordLabels VALUES " + strings.Repeat("(?, ?, ?, ?), ", len(args)/4)
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"reflect" "reflect"
"strings" "strings"
"testing" "testing"
"time"
"golang.org/x/perf/storage/benchfmt" "golang.org/x/perf/storage/benchfmt"
. "golang.org/x/perf/storage/db" . "golang.org/x/perf/storage/db"
...@@ -36,8 +37,52 @@ func TestSplitQueryWords(t *testing.T) { ...@@ -36,8 +37,52 @@ func TestSplitQueryWords(t *testing.T) {
} }
} }
// TestUploadIDs verifies that NewUpload generates the correct sequence of upload IDs.
func TestUploadIDs(t *testing.T) {
ctx := context.Background()
db, cleanup := dbtest.NewDB(t)
defer cleanup()
defer SetNow(time.Time{})
tests := []struct {
sec int64
id string
}{
{0, "19700101.1"},
{0, "19700101.2"},
{86400, "19700102.1"},
{86400, "19700102.2"},
{86400, "19700102.3"},
{86400, "19700102.4"},
{86400, "19700102.5"},
{86400, "19700102.6"},
{86400, "19700102.7"},
{86400, "19700102.8"},
{86400, "19700102.9"},
{86400, "19700102.10"},
{86400, "19700102.11"},
}
for _, test := range tests {
SetNow(time.Unix(test.sec, 0))
u, err := db.NewUpload(ctx)
if err != nil {
t.Fatalf("NewUpload: %v", err)
}
if err := u.Commit(); err != nil {
t.Fatalf("Commit: %v", err)
}
if u.ID != test.id {
t.Fatalf("u.ID = %q, want %q", u.ID, test.id)
}
}
}
// TestNewUpload verifies that NewUpload and InsertRecord wrote the correct rows to the database. // TestNewUpload verifies that NewUpload and InsertRecord wrote the correct rows to the database.
func TestNewUpload(t *testing.T) { func TestNewUpload(t *testing.T) {
SetNow(time.Unix(0, 0))
defer SetNow(time.Time{})
db, cleanup := dbtest.NewDB(t) db, cleanup := dbtest.NewDB(t)
defer cleanup() defer cleanup()
...@@ -78,14 +123,15 @@ BenchmarkName 1 ns/op ...@@ -78,14 +123,15 @@ BenchmarkName 1 ns/op
i := 0 i := 0
for rows.Next() { for rows.Next() {
var uploadid, recordid int64 var uploadid string
var recordid int64
var name, value string var name, value string
if err := rows.Scan(&uploadid, &recordid, &name, &value); err != nil { if err := rows.Scan(&uploadid, &recordid, &name, &value); err != nil {
t.Fatalf("rows.Scan: %v") t.Fatalf("rows.Scan: %v", err)
} }
if uploadid != 1 { if uploadid != "19700101.1" {
t.Errorf("uploadid = %d, want 1", uploadid) t.Errorf("uploadid = %q, want %q", uploadid, "19700101.1")
} }
if recordid != 0 { if recordid != 0 {
t.Errorf("recordid = %d, want 0", recordid) t.Errorf("recordid = %d, want 0", recordid)
......
...@@ -4,10 +4,21 @@ ...@@ -4,10 +4,21 @@
package db package db
import "database/sql" import (
"database/sql"
"time"
)
var SplitQueryWords = splitQueryWords var SplitQueryWords = splitQueryWords
func DBSQL(db *DB) *sql.DB { func DBSQL(db *DB) *sql.DB {
return db.sql return db.sql
} }
func SetNow(t time.Time) {
if t.IsZero() {
now = time.Now
return
}
now = func() time.Time { return t }
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment