Commit 6ce1bbec authored by Quentin Smith

storage/db: batch INSERT statements across records

The MySQL protocol requires 1-3 synchronous round-trips for every
INSERT statement; to reduce the overhead, we now batch up 900 label
INSERT statements at a time. This makes a massive difference;
TestQuery previously ran in 108s; with this change, it now runs in 5s.

We were also affected by golang/go#15606; since we now generate a new INSERT
statement for every record, we are sidestepping that issue.

Change-Id: Id7a56c18c0978470542135894a2f2bcf6f7c9dd1
Reviewed-on: https://go-review.googlesource.com/35266
Reviewed-by: Russ Cox <rsc@golang.org>
parent 4515b322
......@@ -32,7 +32,6 @@ type DB struct {
// prepared statements
lastUpload *sql.Stmt
insertUpload *sql.Stmt
insertRecord *sql.Stmt
checkUpload *sql.Stmt
deleteRecords *sql.Stmt
}
......@@ -141,10 +140,6 @@ func (db *DB) prepareStatements(driverName string) error {
if err != nil {
return err
}
db.insertRecord, err = db.sql.Prepare("INSERT INTO Records(UploadID, RecordID, Content) VALUES (?, ?, ?)")
if err != nil {
return err
}
db.checkUpload, err = db.sql.Prepare("SELECT 1 FROM Uploads WHERE UploadID = ?")
if err != nil {
return err
......@@ -168,6 +163,10 @@ type Upload struct {
db *DB
// tx is the transaction used by the upload.
tx *sql.Tx
// pending arguments for flush
insertRecordArgs []interface{}
insertLabelArgs []interface{}
}
// now is a hook for testing
......@@ -273,29 +272,73 @@ func (u *Upload) InsertRecord(r *benchfmt.Result) error {
if err := benchfmt.NewPrinter(&buf).Print(r); err != nil {
return err
}
if _, err := u.tx.Stmt(u.db.insertRecord).Exec(u.ID, u.recordid, buf.Bytes()); err != nil {
return err
}
var args []interface{}
u.insertRecordArgs = append(u.insertRecordArgs, u.ID, u.recordid, buf.Bytes())
for _, k := range r.Labels.Keys() {
args = append(args, u.ID, u.recordid, k, r.Labels[k])
if err := u.insertLabel(k, r.Labels[k]); err != nil {
return err
}
}
for _, k := range r.NameLabels.Keys() {
args = append(args, u.ID, u.recordid, k, r.NameLabels[k])
}
if len(args) > 0 {
query := "INSERT INTO RecordLabels VALUES " + strings.Repeat("(?, ?, ?, ?), ", len(args)/4)
query = strings.TrimSuffix(query, ", ")
if _, err := u.tx.Exec(query, args...); err != nil {
if err := u.insertLabel(k, r.NameLabels[k]); err != nil {
return err
}
}
u.recordid++
return nil
}
// insertLabel appends one (UploadID, RecordID, key, value) row to the
// pending RecordLabels arguments. If the queue has already reached the
// flush threshold, everything pending is flushed before the new row is
// queued.
func (u *Upload) insertLabel(key, value string) error {
	// N.B. sqlite3 has a max of 999 arguments, so the queue is flushed
	// before it can grow that large.
	// https://www.sqlite.org/limits.html#max_variable_number
	const flushThreshold = 990
	if queued := len(u.insertLabelArgs); queued >= flushThreshold {
		err := u.flush()
		if err != nil {
			return err
		}
	}
	row := []interface{}{u.ID, u.recordid, key, value}
	u.insertLabelArgs = append(u.insertLabelArgs, row...)
	return nil
}
// repeatDelim joins n copies of s with delim between adjacent copies,
// e.g. repeatDelim("?", ", ", 3) == "?, ?, ?". It returns "" when n is 0
// and panics (via strings.Repeat) when n is negative.
func repeatDelim(s, delim string, n int) string {
	if n == 0 {
		return ""
	}
	// First copy stands alone; every later copy is preceded by delim.
	return s + strings.Repeat(delim+s, n-1)
}
// insertMultiple inserts several rows with one INSERT statement.
// sqlPrefix is the statement text up to and including "VALUES "; args holds
// the bound values for every row laid end to end, argsPerRow values per row.
// One "(?, ?, ...)" group is generated per row. Nothing is executed when
// args is empty.
func insertMultiple(tx *sql.Tx, sqlPrefix string, argsPerRow int, args []interface{}) error {
	if len(args) > 0 {
		// Placeholder group for a single row, e.g. "(?, ?, ?)".
		tuple := "(" + repeatDelim("?", ", ", argsPerRow) + ")"
		rows := len(args) / argsPerRow
		if _, err := tx.Exec(sqlPrefix+repeatDelim(tuple, ", ", rows), args...); err != nil {
			return err
		}
	}
	return nil
}
// flush sends INSERT statements for any pending data in u.insertRecordArgs
// and u.insertLabelArgs. Records are written before labels.
//
// Each queue is sent in row-aligned chunks of at most 999 bound parameters.
// The label queue is already kept below that size by insertLabel, but the
// record queue grows by three arguments per record and is only flushed when
// a label triggers it or on Commit, so uploads made of records without
// labels could otherwise produce a single statement with more parameters
// than sqlite3 accepts.
//
// On success both queues are reset to nil. On error the arguments that were
// not successfully inserted remain queued and the error is returned.
func (u *Upload) flush() error {
	// N.B. sqlite3 has a max of 999 arguments.
	// https://www.sqlite.org/limits.html#max_variable_number
	const maxArgs = 999

	// send inserts args in chunks and returns whatever was left unsent
	// (nil when everything was inserted).
	send := func(sqlPrefix string, argsPerRow int, args []interface{}) ([]interface{}, error) {
		// Largest whole number of rows that fits within maxArgs.
		chunk := maxArgs / argsPerRow * argsPerRow
		for len(args) > 0 {
			n := len(args)
			if n > chunk {
				n = chunk
			}
			if err := insertMultiple(u.tx, sqlPrefix, argsPerRow, args[:n]); err != nil {
				return args, err
			}
			args = args[n:]
		}
		return nil, nil
	}

	var err error
	u.insertRecordArgs, err = send("INSERT INTO Records(UploadID, RecordID, Content) VALUES ", 3, u.insertRecordArgs)
	if err != nil {
		return err
	}
	u.insertLabelArgs, err = send("INSERT INTO RecordLabels VALUES ", 4, u.insertLabelArgs)
	return err
}
// Commit finishes processing the upload: it flushes any queued inserts and
// then commits the upload's transaction. If the flush fails, its error is
// returned and the transaction is not committed.
func (u *Upload) Commit() error {
	err := u.flush()
	if err == nil {
		err = u.tx.Commit()
	}
	return err
}
......@@ -465,11 +508,10 @@ func (db *DB) CountUploads() (int, error) {
// Close closes the database connections, releasing any open resources.
func (db *DB) Close() error {
if err := db.insertUpload.Close(); err != nil {
return err
}
if err := db.insertRecord.Close(); err != nil {
return err
for _, stmt := range []*sql.Stmt{db.lastUpload, db.insertUpload, db.checkUpload, db.deleteRecords} {
if err := stmt.Close(); err != nil {
return err
}
}
return db.sql.Close()
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment