Commit 2e9168ed authored by Quentin Smith

storage/app: parse incoming benchmark records

This change parses incoming benchmark records in parallel to writing
them to Google Cloud Storage. It does not yet attempt to insert the
parsed records into Cloud SQL.

Change-Id: I250b334569b8d59f2366025db5c56add51b96bd6
Reviewed-on: https://go-review.googlesource.com/34628
Reviewed-by: Russ Cox <rsc@golang.org>
parent 242cc6d2
......@@ -6,6 +6,7 @@ package app
import (
"encoding/json"
"errors"
"fmt"
"io"
"mime/multipart"
......@@ -67,8 +68,7 @@ type uploadStatus struct {
// writes them to the filesystem, and indexes their content.
func (a *App) processUpload(ctx context.Context, mr *multipart.Reader) (*uploadStatus, error) {
var uploadid string
var status uploadStatus
var fileids []string
for i := 0; ; i++ {
p, err := mr.NextPart()
......@@ -87,22 +87,43 @@ func (a *App) processUpload(ctx context.Context, mr *multipart.Reader) (*uploadS
if err != nil {
return nil, err
}
status.UploadID = uploadid
}
// The incoming file needs to be stored in Cloud
// Storage and it also needs to be indexed. If the file
// is invalid (contains no valid records) it needs to
// be rejected and the Cloud Storage upload aborted.
// TODO(quentin): We might as well do these in parallel.
meta := fileMetadata(ctx, uploadid, i)
fw, err := a.FS.NewWriter(ctx, fmt.Sprintf("uploads/%s.txt", meta["fileid"]), meta)
if err != nil {
// We need to do two things with the incoming data:
// - Write it to permanent storage via a.FS
// - Write index records to a.DB
// AND if anything fails, attempt to clean up both the
// FS and the index records.
if err := a.indexFile(ctx, p, meta); err != nil {
return nil, err
}
fileids = append(fileids, meta["fileid"])
}
return &uploadStatus{uploadid, fileids}, nil
}
func (a *App) indexFile(ctx context.Context, p io.Reader, meta map[string]string) (err error) {
fw, err := a.FS.NewWriter(ctx, fmt.Sprintf("uploads/%s.txt", meta["fileid"]), meta)
if err != nil {
return err
}
defer func() {
if err != nil {
fw.CloseWithError(err)
} else {
err = fw.Close()
}
}()
var keys []string
for k := range meta {
keys = append(keys, k)
......@@ -110,25 +131,30 @@ func (a *App) processUpload(ctx context.Context, mr *multipart.Reader) (*uploadS
sort.Strings(keys)
for _, k := range keys {
if _, err := fmt.Fprintf(fw, "%s: %s\n", k, meta[k]); err != nil {
fw.CloseWithError(err)
return nil, err
return err
}
}
if _, err := io.Copy(fw, p); err != nil {
fw.CloseWithError(err)
return nil, err
// TODO(quentin): Add a separate goroutine and buffer for writes to fw?
tr := io.TeeReader(p, fw)
br := NewBenchmarkReader(tr)
br.AddLabels(meta)
i := 0
for {
result, err := br.Next()
if err != nil {
if err != io.EOF {
return err
}
// TODO(quentin): Write records to database
if err := fw.Close(); err != nil {
return nil, err
if i == 0 {
return errors.New("no valid benchmark lines found")
}
status.FileIDs = append(status.FileIDs, meta["fileid"])
return nil
}
i++
// TODO(quentin): Write records to database
_ = result
}
return &status, nil
}
// fileMetadata returns the extra metadata fields associated with an
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment