Commit 8b9eb18f authored by Quentin Smith

storage: write index information to database

Change-Id: Icef3215b69d220620c9bdc44150699abbce1c57e
Reviewed-on: https://go-review.googlesource.com/34834
Reviewed-by: Russ Cox <rsc@golang.org>
parent 2e9168ed
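For context, the flow introduced by this change (db.NewUpload plus Upload.InsertRecord, fed by a benchfmt.Reader) looks roughly like the sketch below. The helper name storeBenchmarks, the example package, and the in-memory sqlite3 DSN are illustrative only, not part of the commit:

package example

import (
	"io"
	"strings"

	"golang.org/x/net/context"
	"golang.org/x/perf/storage/benchfmt"
	"golang.org/x/perf/storage/db"
	_ "golang.org/x/perf/storage/db/sqlite3" // registers the sqlite3 open hook
)

// storeBenchmarks is a hypothetical helper showing the new API end to end:
// parse a benchmark file, create an upload, and insert each result.
func storeBenchmarks(ctx context.Context, data string) error {
	d, err := db.OpenSQL("sqlite3", ":memory:")
	if err != nil {
		return err
	}
	defer d.Close()

	u, err := d.NewUpload(ctx)
	if err != nil {
		return err
	}

	br := benchfmt.NewReader(strings.NewReader(data))
	for {
		result, err := br.Next()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if err := u.InsertRecord(result); err != nil {
			return err
		}
	}
}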
@@ -14,6 +14,8 @@ import (
"sort"
"golang.org/x/net/context"
"golang.org/x/perf/storage/benchfmt"
"golang.org/x/perf/storage/db"
)
// upload is the handler for the /upload endpoint. It serves a form on
@@ -67,7 +69,7 @@ type uploadStatus struct {
// processUpload takes one or more files from a multipart.Reader,
// writes them to the filesystem, and indexes their content.
func (a *App) processUpload(ctx context.Context, mr *multipart.Reader) (*uploadStatus, error) {
var uploadid string
var upload *db.Upload
var fileids []string
for i := 0; ; i++ {
@@ -81,9 +83,9 @@ func (a *App) processUpload(ctx context.Context, mr *multipart.Reader) (*uploadS
return nil, fmt.Errorf("unexpected field %q", name)
}
if uploadid == "" {
if upload == nil {
var err error
uploadid, err = a.DB.ReserveUploadID(ctx)
upload, err = a.DB.NewUpload(ctx)
if err != nil {
return nil, err
}
@@ -94,7 +96,7 @@ func (a *App) processUpload(ctx context.Context, mr *multipart.Reader) (*uploadS
// is invalid (contains no valid records) it needs to
// be rejected and the Cloud Storage upload aborted.
meta := fileMetadata(ctx, uploadid, i)
meta := fileMetadata(ctx, upload.ID, i)
// We need to do two things with the incoming data:
// - Write it to permanent storage via a.FS
@@ -102,17 +104,17 @@ func (a *App) processUpload(ctx context.Context, mr *multipart.Reader) (*uploadS
// AND if anything fails, attempt to clean up both the
// FS and the index records.
if err := a.indexFile(ctx, p, meta); err != nil {
if err := a.indexFile(ctx, upload, p, meta); err != nil {
return nil, err
}
fileids = append(fileids, meta["fileid"])
}
return &uploadStatus{uploadid, fileids}, nil
return &uploadStatus{upload.ID, fileids}, nil
}
func (a *App) indexFile(ctx context.Context, p io.Reader, meta map[string]string) (err error) {
func (a *App) indexFile(ctx context.Context, upload *db.Upload, p io.Reader, meta map[string]string) (err error) {
fw, err := a.FS.NewWriter(ctx, fmt.Sprintf("uploads/%s.txt", meta["fileid"]), meta)
if err != nil {
return err
@@ -137,7 +139,7 @@ func (a *App) indexFile(ctx context.Context, p io.Reader, meta map[string]string
// TODO(quentin): Add a separate goroutine and buffer for writes to fw?
tr := io.TeeReader(p, fw)
br := NewBenchmarkReader(tr)
br := benchfmt.NewReader(tr)
br.AddLabels(meta)
i := 0
for {
@@ -152,8 +154,9 @@ func (a *App) indexFile(ctx context.Context, p io.Reader, meta map[string]string
return nil
}
i++
// TODO(quentin): Write records to database
_ = result
if err := upload.InsertRecord(result); err != nil {
return err
}
}
}
@@ -13,8 +13,8 @@ import (
"net/http/httptest"
"testing"
_ "github.com/mattn/go-sqlite3"
"golang.org/x/perf/storage/db"
_ "golang.org/x/perf/storage/db/sqlite3"
"golang.org/x/perf/storage/fs"
)
@@ -41,7 +41,7 @@ func TestUpload(t *testing.T) {
if err != nil {
t.Errorf("CreateFormFile: %v", err)
}
fmt.Fprintf(w, "key: value\nBenchmarkOne 5 ns/op\n")
fmt.Fprintf(w, "key: value\nBenchmarkOne 5 ns/op\nkey:value2\nBenchmarkTwo 10 ns/op\n")
}()
resp, err := http.Post(srv.URL, mpw.FormDataContentType(), pr)
if err != nil {
@@ -2,7 +2,10 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package app
// Package benchfmt provides readers and writers for the Go benchmark format.
//
// The format is documented at https://golang.org/design/14313-benchmark-format
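//
// A minimal input in this format (mirroring the tests in this package) is:
//
//	key: value
//	BenchmarkOne 1 ns/sec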
package benchfmt
import (
"bufio"
@@ -14,97 +17,111 @@ import (
"unicode"
)
// BenchmarkReader reads benchmark results from an io.Reader.
type BenchmarkReader struct {
// Reader reads benchmark results from an io.Reader.
type Reader struct {
s *bufio.Scanner
labels map[string]string
labels Labels
lineNum int
}
// NewBenchmarkReader creates a BenchmarkReader that reads from r.
func NewBenchmarkReader(r io.Reader) *BenchmarkReader {
return &BenchmarkReader{
// TODO(quentin): Make Reader have a Scanner-style interface instead, to match db.Query.
// NewReader creates a Reader that reads from r.
func NewReader(r io.Reader) *Reader {
return &Reader{
s: bufio.NewScanner(r),
labels: make(map[string]string),
labels: make(Labels),
}
}
// AddLabels adds additional labels as if they had been read from the file.
// It must be called before the first call to r.Next.
func (r *BenchmarkReader) AddLabels(labels map[string]string) {
func (r *Reader) AddLabels(labels Labels) {
for k, v := range labels {
r.labels[k] = v
}
}
// TODO: It would probably be helpful to add a named type for
// map[string]string with String(), Keys(), and Equal() methods.
// Result represents a single line from a benchmark file.
// All information about that line is self-contained in the Result.
type Result struct {
// Labels is the set of persistent labels that apply to the result.
// Labels must not be modified.
Labels map[string]string
Labels Labels
// NameLabels is the set of ephemeral labels that were parsed
// from the benchmark name/line.
// NameLabels must not be modified.
NameLabels map[string]string
NameLabels Labels
// LineNum is the line number on which the result was found
LineNum int
// Content is the verbatim input line of the benchmark file, beginning with the string "Benchmark".
Content string
}
// A BenchmarkPrinter prints a sequence of benchmark results.
type BenchmarkPrinter struct {
// Labels is a set of key-value strings.
type Labels map[string]string
// TODO(quentin): Add String and Equal methods to Labels?
// Keys returns a sorted list of the keys in l.
func (l Labels) Keys() []string {
var out []string
for k := range l {
out = append(out, k)
}
sort.Strings(out)
return out
}
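Keys gives callers a stable, sorted iteration order; InsertRecord below iterates labels via Keys when it batches label rows. A minimal sketch (the example package and printSorted helper are illustrative only):

package example

import (
	"fmt"

	"golang.org/x/perf/storage/benchfmt"
)

// printSorted prints labels in the sorted order returned by Keys.
func printSorted(l benchfmt.Labels) {
	for _, k := range l.Keys() {
		fmt.Printf("%s: %s\n", k, l[k])
	}
}

// printSorted(benchfmt.Labels{"name": "One", "key": "value"}) prints
//	key: value
//	name: One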
// A Printer prints a sequence of benchmark results.
type Printer struct {
w io.Writer
labels map[string]string
labels Labels
}
// NewBenchmarkPrinter constructs a BenchmarkPrinter writing to w.
func NewBenchmarkPrinter(w io.Writer) *BenchmarkPrinter {
return &BenchmarkPrinter{w: w}
// NewPrinter constructs a Printer writing to w.
func NewPrinter(w io.Writer) *Printer {
return &Printer{w: w}
}
// Print writes the lines necessary to recreate r.
func (bp *BenchmarkPrinter) Print(r *Result) error {
func (p *Printer) Print(r *Result) error {
var keys []string
// Print removed keys first.
for k := range bp.labels {
for k := range p.labels {
if r.Labels[k] == "" {
keys = append(keys, k)
}
}
sort.Strings(keys)
for _, k := range keys {
if _, err := fmt.Fprintf(bp.w, "%s:\n", k); err != nil {
if _, err := fmt.Fprintf(p.w, "%s:\n", k); err != nil {
return err
}
}
// Then print new or changed keys.
keys = keys[:0]
for k, v := range r.Labels {
if v != "" && bp.labels[k] != v {
if v != "" && p.labels[k] != v {
keys = append(keys, k)
}
}
sort.Strings(keys)
for _, k := range keys {
if _, err := fmt.Fprintf(bp.w, "%s: %s\n", k, r.Labels[k]); err != nil {
if _, err := fmt.Fprintf(p.w, "%s: %s\n", k, r.Labels[k]); err != nil {
return err
}
}
// Finally print the actual line itself.
if _, err := fmt.Fprintf(bp.w, "%s\n", r.Content); err != nil {
if _, err := fmt.Fprintf(p.w, "%s\n", r.Content); err != nil {
return err
}
bp.labels = r.Labels
p.labels = r.Labels
return nil
}
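To make the removed-key/changed-key logic above concrete, here is a small sketch of printing two consecutive results; the label values are hypothetical and error handling is elided:

package example

import (
	"bytes"
	"fmt"

	"golang.org/x/perf/storage/benchfmt"
)

func printerExample() {
	var buf bytes.Buffer
	p := benchfmt.NewPrinter(&buf)
	// Print errors are ignored for brevity in this sketch.
	p.Print(&benchfmt.Result{Labels: benchfmt.Labels{"key": "value"}, Content: "BenchmarkOne 1 ns/sec"})
	p.Print(&benchfmt.Result{Labels: benchfmt.Labels{"other": "x"}, Content: "BenchmarkTwo 2 ns/sec"})
	fmt.Print(buf.String())
	// buf now contains:
	//	key: value
	//	BenchmarkOne 1 ns/sec
	//	key:
	//	other: x
	//	BenchmarkTwo 2 ns/sec
}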
// parseNameLabels extracts extra labels from a benchmark name and sets them in labels.
func parseNameLabels(name string, labels map[string]string) {
func parseNameLabels(name string, labels Labels) {
dash := strings.LastIndex(name, "-")
if dash >= 0 {
// Accept -N as an alias for /GOMAXPROCS=N
@@ -129,10 +146,10 @@ func parseNameLabels(name string, labels map[string]string) {
}
// newResult parses a line and returns a Result object for the line.
func newResult(labels map[string]string, lineNum int, name, content string) *Result {
func newResult(labels Labels, lineNum int, name, content string) *Result {
r := &Result{
Labels: labels,
NameLabels: make(map[string]string),
NameLabels: make(Labels),
LineNum: lineNum,
Content: content,
}
@@ -140,11 +157,11 @@ func newResult(labels map[string]string, lineNum int, name, content string) *Res
return r
}
// copyLabels makes a new copy of the labels map, to protect against
// copy returns a new copy of the labels map, to protect against
// future modifications to labels.
func copyLabels(labels map[string]string) map[string]string {
new := make(map[string]string)
for k, v := range labels {
func (l Labels) copy() Labels {
new := make(Labels)
for k, v := range l {
new[k] = v
}
return new
@@ -154,7 +171,7 @@ func copyLabels(labels map[string]string) map[string]string {
// Next returns the next benchmark result from the file. If there are
// no further results, it returns nil, io.EOF.
func (r *BenchmarkReader) Next() (*Result, error) {
func (r *Reader) Next() (*Result, error) {
copied := false
for r.s.Scan() {
r.lineNum++
@@ -162,7 +179,7 @@ func (r *BenchmarkReader) Next() (*Result, error) {
if key, value, ok := parseKeyValueLine(line); ok {
if !copied {
copied = true
r.labels = copyLabels(r.labels)
r.labels = r.labels.copy()
}
// TODO(quentin): Spec says empty value is valid, but
// we need a way to cancel previous labels, so we'll
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package app
package benchfmt
import (
"bytes"
@@ -16,7 +16,7 @@ import (
"testing"
)
func readAllResults(t *testing.T, r *BenchmarkReader) []*Result {
func readAllResults(t *testing.T, r *Reader) []*Result {
var out []*Result
for {
result, err := r.Next()
@@ -32,7 +32,6 @@ func readAllResults(t *testing.T, r *BenchmarkReader) []*Result {
}
func TestBenchmarkReader(t *testing.T) {
type kv map[string]string
tests := []struct {
name, input string
want []*Result
@@ -43,8 +42,8 @@ func TestBenchmarkReader(t *testing.T) {
BenchmarkOne 1 ns/sec
`,
[]*Result{{
kv{"key": "value"},
kv{"name": "One"},
Labels{"key": "value"},
Labels{"name": "One"},
2,
"BenchmarkOne 1 ns/sec",
}},
@@ -57,14 +56,14 @@ BenchmarkTwo 2 ns/sec
`,
[]*Result{
{
kv{"key": "value"},
kv{"name": "One", "sub1": "foo", "bar": "1", "GOMAXPROCS": "2"},
Labels{"key": "value"},
Labels{"name": "One", "sub1": "foo", "bar": "1", "GOMAXPROCS": "2"},
2,
"BenchmarkOne/foo/bar=1-2 1 ns/sec",
},
{
kv{"key": "value"},
kv{"name": "Two"},
Labels{"key": "value"},
Labels{"name": "Two"},
3,
"BenchmarkTwo 2 ns/sec",
},
@@ -78,8 +77,8 @@ BenchmarkOne 1 ns/sec
`,
[]*Result{
{
kv{},
kv{"name": "One"},
Labels{},
Labels{"name": "One"},
3,
"BenchmarkOne 1 ns/sec",
},
@@ -88,7 +87,7 @@ BenchmarkOne 1 ns/sec
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
r := NewBenchmarkReader(strings.NewReader(test.input))
r := NewReader(strings.NewReader(test.input))
have := readAllResults(t, r)
want := test.want
diff := ""
@@ -157,10 +156,10 @@ BenchmarkOne 1 ns/sec
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
r := NewBenchmarkReader(strings.NewReader(test.input))
r := NewReader(strings.NewReader(test.input))
results := readAllResults(t, r)
var have bytes.Buffer
bp := NewBenchmarkPrinter(&have)
bp := NewPrinter(&have)
for _, result := range results {
if err := bp.Print(result); err != nil {
t.Errorf("Print returned %v", err)
@@ -7,18 +7,23 @@
package db
import (
"bytes"
"database/sql"
"fmt"
"strings"
"text/template"
"golang.org/x/net/context"
"golang.org/x/perf/storage/benchfmt"
)
// DB is a high-level interface to a database for the storage
// app. It's safe for concurrent use by multiple goroutines.
type DB struct {
sql *sql.DB
sql *sql.DB // underlying database connection
// prepared statements
insertUpload *sql.Stmt
insertRecord *sql.Stmt
}
// OpenSQL creates a DB backed by a SQL database. The parameters are
@@ -30,6 +35,11 @@ func OpenSQL(driverName, dataSourceName string) (*DB, error) {
if err != nil {
return nil, err
}
if hook := openHooks[driverName]; hook != nil {
if err := hook(db); err != nil {
return nil, err
}
}
d := &DB{sql: db}
if err := d.createTables(driverName); err != nil {
return nil, err
@@ -40,25 +50,53 @@ func OpenSQL(driverName, dataSourceName string) (*DB, error) {
return d, nil
}
var openHooks = make(map[string]func(*sql.DB) error)
// RegisterOpenHook registers a hook to be called after opening a connection to driverName.
// This is used by the sqlite3 package to register a ConnectHook.
// It must be called from an init function.
func RegisterOpenHook(driverName string, hook func(*sql.DB) error) {
openHooks[driverName] = hook
}
// createTmpl is the template used to prepare the CREATE statements
// for the database. It is evaluated with . as a map containing one
// entry whose key is the driver name.
var createTmpl = template.Must(template.New("create").Parse(`
CREATE TABLE IF NOT EXISTS Uploads (
UploadID {{if .sqlite3}}INTEGER PRIMARY KEY AUTOINCREMENT{{else}}SERIAL PRIMARY KEY AUTO_INCREMENT{{end}}
);
CREATE TABLE IF NOT EXISTS Records (
UploadID BIGINT UNSIGNED,
RecordID BIGINT UNSIGNED,
Content BLOB,
PRIMARY KEY (UploadID, RecordID),
FOREIGN KEY (UploadID) REFERENCES Uploads(UploadID) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS RecordLabels (
UploadID BIGINT UNSIGNED,
RecordID BIGINT UNSIGNED,
Name VARCHAR(255),
Value VARCHAR(8192),
{{if not .sqlite3}}
Index (Name(100), Value(100)),
{{end}}
FOREIGN KEY (UploadID, RecordID) REFERENCES Records(UploadID, RecordID) ON UPDATE CASCADE ON DELETE CASCADE
);
{{if .sqlite3}}
CREATE INDEX IF NOT EXISTS RecordLabelsNameValue ON RecordLabels(Name, Value);
{{end}}
`))
// createTables creates any missing tables on the connection in
// db.sql. driverName is the same driver name passed to sql.Open and
// is used to select the correct syntax.
func (db *DB) createTables(driverName string) error {
var schema string
switch driverName {
case "sqlite3":
schema = `
CREATE TABLE IF NOT EXISTS Uploads (
UploadId INTEGER PRIMARY KEY AUTOINCREMENT
);
`
default: // MySQL syntax
schema = `
CREATE TABLE IF NOT EXISTS Uploads (
UploadId SERIAL PRIMARY KEY AUTO_INCREMENT
);`
var buf bytes.Buffer
if err := createTmpl.Execute(&buf, map[string]bool{driverName: true}); err != nil {
return err
}
for _, q := range strings.Split(schema, ";") {
for _, q := range strings.Split(buf.String(), ";") {
if strings.TrimSpace(q) == "" {
continue
}
@@ -80,28 +118,97 @@ func (db *DB) prepareStatements(driverName string) error {
if err != nil {
return err
}
db.insertRecord, err = db.sql.Prepare("INSERT INTO Records(UploadID, RecordID, Content) VALUES (?, ?, ?)")
if err != nil {
return err
}
return nil
}
// ReserveUploadID returns an upload ID which can be used for storing new files.
func (db *DB) ReserveUploadID(ctx context.Context) (string, error) {
// An Upload is a collection of files that share an upload ID.
type Upload struct {
// ID is the value of the "uploadid" key that should be
// associated with every record in this upload.
ID string
// id is the numeric value used as the primary key. ID is a
// string for the public API; the underlying table actually
// uses an integer key. To avoid repeated calls to
// strconv.Atoi, the int64 is cached here.
id int64
// recordid is the index of the next record to insert.
recordid int64
// db is the underlying database that this upload is going to.
db *DB
}
// NewUpload returns an upload for storing new files.
// All records written to the Upload will have the same upload ID.
func (db *DB) NewUpload(ctx context.Context) (*Upload, error) {
// TODO(quentin): Use a transaction?
res, err := db.insertUpload.Exec()
if err != nil {
return "", err
return nil, err
}
// TODO(quentin): Use a date-based upload ID (YYYYMMDDnnn)
i, err := res.LastInsertId()
if err != nil {
return "", err
return nil, err
}
return fmt.Sprint(i), nil
return &Upload{
ID: fmt.Sprint(i),
id: i,
db: db,
}, nil
}
// TODO(quentin): Implement
// func (db *DB) InsertRecord(uploadid string, fields map[string]string, lines map[int]string) error
// InsertRecord inserts a single record in an existing upload.
func (u *Upload) InsertRecord(r *benchfmt.Result) (err error) {
// TODO(quentin): Use a single transaction for the whole upload?
tx, err := u.db.sql.Begin()
if err != nil {
return err
}
defer func() {
if err != nil {
tx.Rollback()
} else {
err = tx.Commit()
}
}()
// TODO(quentin): Support multiple lines (slice of results?)
var buf bytes.Buffer
if err := benchfmt.NewPrinter(&buf).Print(r); err != nil {
return err
}
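// tx.Stmt returns a transaction-scoped copy of the prepared insertRecord statement.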
if _, err = tx.Stmt(u.db.insertRecord).Exec(u.id, u.recordid, buf.Bytes()); err != nil {
return err
}
var args []interface{}
for _, k := range r.Labels.Keys() {
args = append(args, u.id, u.recordid, k, r.Labels[k])
}
for _, k := range r.NameLabels.Keys() {
args = append(args, u.id, u.recordid, k, r.NameLabels[k])
}
if len(args) > 0 {
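// One "(?, ?, ?, ?)" placeholder group is emitted per label; args holds
// (uploadid, recordid, name, value) for each group, so there are len(args)/4 groups.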
query := "INSERT INTO RecordLabels VALUES " + strings.Repeat("(?, ?, ?, ?), ", len(args)/4)
query = strings.TrimSuffix(query, ", ")
if _, err := tx.Exec(query, args...); err != nil {
return err
}
}
u.recordid++
return nil
}
// Close closes the database connections, releasing any open resources.
func (db *DB) Close() error {
if err := db.insertUpload.Close(); err != nil {
return err
}
if err := db.insertRecord.Close(); err != nil {
return err
}
return db.sql.Close()
}
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package db_test
import (
"context"
"strings"
"testing"
"golang.org/x/perf/storage/benchfmt"
. "golang.org/x/perf/storage/db"
_ "golang.org/x/perf/storage/db/sqlite3"
)
// Most of the db package is tested via the end-to-end-tests in perf/storage/app.
// TestNewUpload verifies that NewUpload and InsertRecord wrote the correct rows to the database.
func TestNewUpload(t *testing.T) {
db, err := OpenSQL("sqlite3", ":memory:")
if err != nil {
t.Fatalf("open database: %v", err)
}
defer db.Close()
u, err := db.NewUpload(context.Background())
if err != nil {
t.Fatalf("NewUpload: %v", err)
}
br := benchfmt.NewReader(strings.NewReader(`
key: value
BenchmarkName 1 ns/op
`))
r, err := br.Next()
if err != nil {
t.Fatalf("BenchmarkReader.Next: %v", err)
}
if err := u.InsertRecord(r); err != nil {
t.Fatalf("InsertRecord: %v", err)
}
rows, err := DBSQL(db).Query("SELECT UploadId, RecordId, Name, Value FROM RecordLabels")
if err != nil {
t.Fatalf("sql.Query: %v", err)
}
defer rows.Close()
want := map[string]string{
"key": "value",
"name": "Name",
}
i := 0
for rows.Next() {
var uploadid, recordid int64
var name, value string
if err := rows.Scan(&uploadid, &recordid, &name, &value); err != nil {
t.Fatalf("rows.Scan: %v")
}
if uploadid != 1 {
t.Errorf("uploadid = %d, want 1", uploadid)
}
if recordid != 0 {
t.Errorf("recordid = %d, want 0", recordid)
}
if want[name] != value {
t.Errorf("%s = %q, want %q", name, value, want[name])
}
i++
}
if i != len(want) {
t.Errorf("have %d labels, want %d", i, len(want))
}
if err := rows.Err(); err != nil {
t.Errorf("rows.Err: %v", err)
}
}
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package db
import "database/sql"
var SplitQueryWords = splitQueryWords
func DBSQL(db *DB) *sql.DB {
return db.sql
}
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package sqlite3 provides the sqlite3 driver for
// x/perf/storage/db. It must be imported instead of go-sqlite3 to
// ensure foreign keys are properly honored.
package sqlite3
import (
"database/sql"
sqlite3 "github.com/mattn/go-sqlite3"
"golang.org/x/perf/storage/db"
)
func init() {
db.RegisterOpenHook("sqlite3", func(db *sql.DB) error {
db.Driver().(*sqlite3.SQLiteDriver).ConnectHook = func(c *sqlite3.SQLiteConn) error {
_, err := c.Exec("PRAGMA foreign_keys = ON;", nil)
return err
}
return nil
})
}
@@ -9,9 +9,9 @@ import (
"log"
"net/http"
_ "github.com/mattn/go-sqlite3"
"golang.org/x/perf/storage/app"
"golang.org/x/perf/storage/db"
_ "golang.org/x/perf/storage/db/sqlite3"
"golang.org/x/perf/storage/fs"
)