Commit 6ceaac6d authored by Quentin Smith

storage: initial perfdata AE app

This creates the skeleton of an AppEngine app for perfdata.golang.org
and adds an initial implementation of /upload that saves the provided
file into Cloud Storage.

Change-Id: I1fe19b27841ab62aad146d1d1019996634012d35
Reviewed-on: https://go-review.googlesource.com/34620
Reviewed-by: Russ Cox <rsc@golang.org>
parent ebfa8dc3
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package app implements the performance data storage server. Combine
// an App with a database and filesystem to get an HTTP server.
package app
import (
"net/http"
"golang.org/x/perf/storage/db"
"golang.org/x/perf/storage/fs"
)
// App manages the storage server logic. Construct an App instance
// using a literal with DB and FS objects and call RegisterOnMux to
// connect it with an HTTP server.
type App struct {
// DB is the database handle; the upload handler uses it to reserve
// upload IDs.
DB *db.DB
// FS is the filesystem that uploaded files are written to.
FS fs.FS
}
// RegisterOnMux installs the app's HTTP handlers on mux. The only
// route registered is /upload.
func (a *App) RegisterOnMux(mux *http.ServeMux) {
	// TODO(quentin): Should we just make the App itself be an http.Handler?
	uploadHandler := http.HandlerFunc(a.upload)
	mux.Handle("/upload", uploadHandler)
}
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build appengine
package app
import (
"net/http"
"golang.org/x/net/context"
"google.golang.org/appengine"
"google.golang.org/appengine/log"
)
// requestContext derives the AppEngine Context for the incoming
// request r.
func requestContext(r *http.Request) context.Context {
	ctx := appengine.NewContext(r)
	return ctx
}
// errorf logs a formatted error message through the AppEngine log
// package. It is called as errorf(ctx, format, args...).
var errorf = log.Errorf
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !appengine
package app
import (
"log"
"net/http"
"golang.org/x/net/context"
)
// requestContext returns the Context carried by the request r. Outside
// AppEngine this is simply the request's own context.
func requestContext(r *http.Request) context.Context {
	ctx := r.Context()
	return ctx
}
// errorf logs a formatted error message using the standard log
// package. The Context argument is ignored; callers pass it so that
// the call has the same shape as in the appengine build.
func errorf(_ context.Context, format string, args ...interface{}) {
log.Printf(format, args...)
}
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package app
import (
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"golang.org/x/net/context"
)
// upload is the handler for the /upload endpoint. It serves a form on
// GET requests and processes files in a multipart/form-data POST
// request. On success the response body is a JSON-encoded
// uploadStatus.
//
// Any other method is rejected with 405 Method Not Allowed. A request
// body that cannot be read as multipart data is rejected with 400 Bad
// Request; failures while storing the upload are reported as 500
// Internal Server Error.
func (a *App) upload(w http.ResponseWriter, r *http.Request) {
	ctx := requestContext(r)

	// TODO(quentin): Authentication

	switch r.Method {
	case http.MethodGet:
		http.ServeFile(w, r, "static/upload.html")
		return
	case http.MethodPost:
		// Handled below.
	default:
		// A 405 response should tell the client which methods are valid.
		w.Header().Set("Allow", "GET, POST")
		http.Error(w, "/upload must be called as a POST request", http.StatusMethodNotAllowed)
		return
	}

	// We use r.MultipartReader instead of r.ParseForm to avoid
	// storing uploaded data in memory.
	mr, err := r.MultipartReader()
	if err != nil {
		// The request was not multipart or had no boundary: that is
		// the client's mistake, not a server failure.
		errorf(ctx, "%v", err)
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	result, err := a.processUpload(ctx, mr)
	if err != nil {
		errorf(ctx, "%v", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Declare the body type explicitly instead of relying on content
	// sniffing. http.Error overrides this header if encoding fails.
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(result); err != nil {
		errorf(ctx, "%v", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
}
// uploadStatus is the response to an /upload POST served as JSON.
// processUpload fills it in as each file is stored.
type uploadStatus struct {
// UploadID is the upload ID assigned to the upload. It is encoded
// under the JSON key "uploadid".
UploadID string `json:"uploadid"`
// FileIDs is the list of file IDs assigned to the files in the upload,
// in the order the files were stored. It is encoded under the JSON
// key "fileids".
FileIDs []string `json:"fileids"`
}
// processUpload takes one or more files from a multipart.Reader,
// writes them to the filesystem, and indexes their content.
//
// Every part must be a form field named "file"; any other field name
// aborts the upload with an error. An upload ID is reserved from the
// database when the first part is seen, and each file is stored as
// uploads/<fileid>.txt. On success the returned uploadStatus holds the
// upload ID and the IDs of the stored files. On any error the result
// is nil; files stored before the failure are not removed.
func (a *App) processUpload(ctx context.Context, mr *multipart.Reader) (*uploadStatus, error) {
	var uploadid string
	var status uploadStatus

	for i := 0; ; i++ {
		p, err := mr.NextPart()
		if err == io.EOF {
			break
		}
		if err != nil {
			// A malformed or truncated body yields a nil part; bail
			// out before dereferencing it.
			return nil, err
		}

		name := p.FormName()
		if name != "file" {
			return nil, fmt.Errorf("unexpected field %q", name)
		}

		// Reserve the upload ID lazily so that a request with no
		// parts does not consume one.
		if uploadid == "" {
			uploadid, err = a.DB.ReserveUploadID(ctx)
			if err != nil {
				return nil, err
			}
			status.UploadID = uploadid
		}

		// The incoming file needs to be stored in Cloud
		// Storage and it also needs to be indexed. If the file
		// is invalid (contains no valid records) it needs to
		// be rejected and the Cloud Storage upload aborted.

		// TODO(quentin): We might as well do these in parallel.

		meta := fileMetadata(ctx, uploadid, i)

		fw, err := a.FS.NewWriter(ctx, fmt.Sprintf("uploads/%s.txt", meta["fileid"]), meta)
		if err != nil {
			return nil, err
		}

		// TODO(quentin): Write metadata at top of file

		if _, err := io.Copy(fw, p); err != nil {
			// Abandon the partially written file; the copy error is
			// the one worth reporting.
			fw.CloseWithError(err)
			return nil, err
		}

		// TODO(quentin): Write records to database

		if err := fw.Close(); err != nil {
			return nil, err
		}

		status.FileIDs = append(status.FileIDs, meta["fileid"])
	}

	return &status, nil
}
// fileMetadata builds the metadata map stored alongside an uploaded
// file. The map has two entries: "uploadid", the upload the file
// belongs to, and "fileid", formed as "<uploadid>/<filenum>". The
// Context argument is currently unused.
func fileMetadata(_ context.Context, uploadid string, filenum int) map[string]string {
	// TODO(quentin): Add the name of the uploader.
	// TODO(quentin): Add the upload time.
	// TODO(quentin): Add other fields?
	meta := make(map[string]string)
	meta["uploadid"] = uploadid
	meta["fileid"] = fmt.Sprintf("%s/%d", uploadid, filenum)
	return meta
}
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package app
import (
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
"net/http/httptest"
"testing"
_ "github.com/mattn/go-sqlite3"
"golang.org/x/perf/storage/db"
"golang.org/x/perf/storage/fs"
)
// TestUpload posts a single-file multipart form to the upload handler,
// backed by an in-memory SQLite database and a MemFS, and checks that
// the request succeeds and that exactly one file is stored.
func TestUpload(t *testing.T) {
	db, err := db.OpenSQL("sqlite3", ":memory:")
	if err != nil {
		t.Fatalf("open database: %v", err)
	}
	defer db.Close()

	fs := fs.NewMemFS()

	app := &App{DB: db, FS: fs}

	srv := httptest.NewServer(http.HandlerFunc(app.upload))
	defer srv.Close()

	// Stream the multipart body through a pipe so the request is
	// produced while the client is sending it.
	pr, pw := io.Pipe()
	mpw := multipart.NewWriter(pw)

	go func() {
		// Deferred calls run in reverse order: finish the multipart
		// body first, then close the pipe so the client sees EOF.
		defer pw.Close()
		defer mpw.Close()
		// Write the parts here
		w, err := mpw.CreateFormFile("file", "1.txt")
		if err != nil {
			t.Errorf("CreateFormFile: %v", err)
			// w is unusable after a failure; do not write to it.
			return
		}
		if _, err := fmt.Fprintf(w, "key: value\nBenchmarkOne 5 ns/op\n"); err != nil {
			t.Errorf("writing form file: %v", err)
		}
	}()

	resp, err := http.Post(srv.URL, mpw.FormDataContentType(), pr)
	if err != nil {
		t.Fatalf("post /upload: %v", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != 200 {
		t.Errorf("post /upload: %v", resp.Status)
	}
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		t.Errorf("reading /upload response: %v", err)
	}
	t.Logf("/upload response:\n%s", body)

	if len(fs.Files()) != 1 {
		t.Errorf("/upload wrote %d files, want 1", len(fs.Files()))
	}
}
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package appengine contains an AppEngine app for perfdata.golang.org
package appengine
import (
"fmt"
"log"
"net/http"
"os"
_ "github.com/go-sql-driver/mysql"
"golang.org/x/perf/storage/app"
"golang.org/x/perf/storage/db"
"golang.org/x/perf/storage/fs/gcs"
"google.golang.org/appengine"
aelog "google.golang.org/appengine/log"
)
// connectDB opens the Cloud SQL database described by the environment
// variables set in app.yaml. CLOUDSQL_CONNECTION_NAME, CLOUDSQL_USER,
// and CLOUDSQL_DATABASE must be set to point to the Cloud SQL
// instance; a missing one causes a panic via mustGetenv.
// CLOUDSQL_PASSWORD can be set if needed.
func connectDB() (*db.DB, error) {
	connectionName := mustGetenv("CLOUDSQL_CONNECTION_NAME")
	user := mustGetenv("CLOUDSQL_USER")
	password := os.Getenv("CLOUDSQL_PASSWORD") // NOTE: password may be empty
	dbName := mustGetenv("CLOUDSQL_DATABASE")

	dsn := fmt.Sprintf("%s:%s@cloudsql(%s)/%s", user, password, connectionName, dbName)
	return db.OpenSQL("mysql", dsn)
}
// mustGetenv returns the value of the environment variable k. If the
// variable is unset or empty it logs a message and panics.
func mustGetenv(k string) string {
	if v := os.Getenv(k); v != "" {
		return v
	}
	log.Panicf("%s environment variable not set.", k)
	return "" // not reached: log.Panicf panics
}
// appHandler is the default handler, registered to serve "/". For each
// request it opens the database, builds a GCS-backed filesystem from
// the request's AppEngine Context, assembles an App from the two, and
// dispatches the request to it. The environment variable GCS_BUCKET
// must be set in app.yaml with the name of the bucket to write to.
func appHandler(w http.ResponseWriter, r *http.Request) {
	ctx := appengine.NewContext(r)

	// GCS clients need to be constructed with an AppEngine
	// context, so we can't actually make the App until the
	// request comes in.
	// TODO(quentin): Figure out if there's a way to construct the
	// app and clients once, in init(), instead of on every request.
	database, err := connectDB()
	if err != nil {
		aelog.Errorf(ctx, "connectDB: %v", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer database.Close()

	bucket := mustGetenv("GCS_BUCKET")
	store, err := gcs.NewFS(ctx, bucket)
	if err != nil {
		aelog.Errorf(ctx, "gcs.NewFS: %v", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Route through a fresh mux so the App's own URL table decides
	// which handler runs.
	mux := http.NewServeMux()
	storage := &app.App{DB: database, FS: store}
	storage.RegisterOnMux(mux)
	mux.ServeHTTP(w, r)
}
// init registers appHandler as the handler for every path on the
// default HTTP mux.
func init() {
	http.Handle("/", http.HandlerFunc(appHandler))
}
# Update with
# google_appengine/appcfg.py [-V dev-test] update .
#
# Using -V dev-test will run as dev-test.perfdata.golang.org.
application: golang-org
module: perfdata
version: main
runtime: go
api_version: go1
handlers:
- url: /_ah/remote_api
  script: _go_app
- url: /upload
  script: _go_app
  secure: always
env_variables:
  CLOUDSQL_CONNECTION_NAME: golang-org:us-central1:golang-org
  CLOUDSQL_USER: root
  CLOUDSQL_PASSWORD: ''
  CLOUDSQL_DATABASE: perfdata
  GCS_BUCKET: golang-perfdata
<!DOCTYPE html>
<html>
<head>
<title>Upload Performance Results</title>
</head>
<body>
<p>Upload one or more <a href="https://github.com/golang/proposal/blob/master/design/14313-benchmark-format.md">benchmark files</a>.</p>
<form method="post" enctype="multipart/form-data">
<label>File: <input type="file" name="file" multiple></label><br>
<input type="submit" value="Upload">
</form>
</body>
</html>
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package db provides the high-level database interface for the
// storage app.
package db
import (
"database/sql"
"fmt"
"strings"
"golang.org/x/net/context"
)
// DB is a high-level interface to a database for the storage
// app. It's safe for concurrent use by multiple goroutines.
type DB struct {
// sql is the underlying database handle opened by OpenSQL.
sql *sql.DB
// insertUpload is the prepared statement that inserts a new row into
// the Uploads table; ReserveUploadID executes it.
insertUpload *sql.Stmt
}
// OpenSQL creates a DB backed by a SQL database. The parameters are
// the same as the parameters for sql.Open. Only mysql and sqlite3 are
// explicitly supported; other database engines will receive MySQL
// query syntax which may or may not be compatible.
//
// OpenSQL creates any missing tables and prepares the statements the
// DB needs. If either step fails, the underlying database handle is
// closed before the error is returned.
func OpenSQL(driverName, dataSourceName string) (*DB, error) {
	db, err := sql.Open(driverName, dataSourceName)
	if err != nil {
		return nil, err
	}
	d := &DB{sql: db}
	if err := d.createTables(driverName); err != nil {
		// Release the handle; the caller never receives a DB to close.
		db.Close()
		return nil, err
	}
	if err := d.prepareStatements(driverName); err != nil {
		db.Close()
		return nil, err
	}
	return d, nil
}
// createTables creates any tables that do not yet exist on the
// connection in db.sql. driverName is the driver name that was passed
// to sql.Open; "sqlite3" selects SQLite syntax and every other value
// selects MySQL syntax. The schema is split on ";" and each non-blank
// statement is executed in turn, stopping at the first failure.
func (db *DB) createTables(driverName string) error {
	// MySQL syntax is the default.
	schema := `
CREATE TABLE IF NOT EXISTS Uploads (
UploadId SERIAL PRIMARY KEY AUTO_INCREMENT
);`
	if driverName == "sqlite3" {
		schema = `
CREATE TABLE IF NOT EXISTS Uploads (
UploadId INTEGER PRIMARY KEY AUTOINCREMENT
);
`
	}

	for _, stmt := range strings.Split(schema, ";") {
		if strings.TrimSpace(stmt) != "" {
			if _, err := db.sql.Exec(stmt); err != nil {
				return fmt.Errorf("create table: %v", err)
			}
		}
	}
	return nil
}
// prepareStatements prepares the reusable SQL statements on db.sql
// and stores them on db. driverName selects the INSERT syntax:
// "sqlite3" uses SQLite's DEFAULT VALUES form, anything else the
// MySQL form.
func (db *DB) prepareStatements(driverName string) error {
	query := "INSERT INTO Uploads() VALUES ()"
	if driverName == "sqlite3" {
		query = "INSERT INTO Uploads DEFAULT VALUES"
	}
	stmt, err := db.sql.Prepare(query)
	db.insertUpload = stmt
	return err
}
// ReserveUploadID inserts a new row into the Uploads table and returns
// the row's ID, formatted as a decimal string, for use as the ID of a
// new upload. The Context argument is currently unused.
func (db *DB) ReserveUploadID(ctx context.Context) (string, error) {
	// TODO(quentin): Use a transaction?
	result, err := db.insertUpload.Exec()
	if err != nil {
		return "", err
	}
	// TODO(quentin): Use a date-based upload ID (YYYYMMDDnnn)
	var id int64
	if id, err = result.LastInsertId(); err != nil {
		return "", err
	}
	return fmt.Sprintf("%d", id), nil
}
// TODO(quentin): Implement
// func (db *DB) InsertRecord(uploadid string, fields map[string]string, lines map[int]string) error
// Close closes the underlying database handle, releasing any open
// resources, and returns the error from doing so, if any.
func (db *DB) Close() error {
	err := db.sql.Close()
	return err
}
-- The intended production Cloud SQL schema. Committed here only as a
-- form of notes (see the actual current schema in
-- db.go:createTables).
CREATE TABLE Uploads (
UploadId SERIAL PRIMARY KEY AUTO_INCREMENT
);
CREATE TABLE Records (
UploadId BIGINT UNSIGNED,
RecordId BIGINT UNSIGNED,
Contents BLOB,
PRIMARY KEY (UploadId, RecordId),
FOREIGN KEY (UploadId) REFERENCES Uploads(UploadId)
);
CREATE TABLE RecordLabels (
UploadId BIGINT UNSIGNED,
RecordId BIGINT UNSIGNED,
Name VARCHAR(255),
Value VARCHAR(8192),
INDEX (Name(100), Value(100)),
FOREIGN KEY (UploadId, RecordId) REFERENCES Records(UploadId, RecordId)
);
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package fs provides a backend-agnostic filesystem layer for storing
// performance results.
package fs
import (
"errors"
"io"
"sort"
"sync"
"golang.org/x/net/context"
)
// An FS stores uploaded benchmark data files.
type FS interface {
// NewWriter returns a Writer for a given file name.
// When the Writer is closed, the file will be stored with the
// given metadata and the data written to the writer.
// Closing the Writer with CloseWithError instead cancels the write.
NewWriter(ctx context.Context, name string, metadata map[string]string) (Writer, error)
}
// A Writer is an io.Writer that can also be closed with an error.
// Callers finish a file with either Close, to store it, or
// CloseWithError, to abandon it.
type Writer interface {
io.WriteCloser
// CloseWithError cancels the writing of the file, removing
// any partially written data.
CloseWithError(error) error
}
// MemFS is an in-memory filesystem implementing the FS interface.
type MemFS struct {
// mu guards content.
mu sync.Mutex
// content maps a file name to the file stored under it. An entry is
// added when that file's writer is closed.
content map[string]*memFile
}
// NewMemFS returns a MemFS that holds no files.
func NewMemFS() *MemFS {
	m := new(MemFS)
	m.content = make(map[string]*memFile)
	return m
}
// NewWriter returns a Writer for the file called name. The writer
// keeps its own copy of metadata, so later changes to the caller's map
// do not affect the file. The Context argument is unused and the
// returned error is always nil.
func (fs *MemFS) NewWriter(_ context.Context, name string, metadata map[string]string) (Writer, error) {
	f := &memFile{
		fs:       fs,
		name:     name,
		metadata: make(map[string]string),
	}
	for key, value := range metadata {
		f.metadata[key] = value
	}
	return f, nil
}
// Files returns the names of the files written to fs, sorted in
// increasing order. The result is nil when no files have been written.
func (fs *MemFS) Files() []string {
	var names []string

	fs.mu.Lock()
	for name := range fs.content {
		names = append(names, name)
	}
	fs.mu.Unlock()

	sort.Strings(names)
	return names
}
// memFile represents a file in a MemFS. While the file is being
// written, fs points to the filesystem. Close writes the file's
// content to fs and sets fs to nil.
type memFile struct {
// fs is the owning filesystem while the file is open, and nil once
// Close or CloseWithError has been called.
fs *MemFS
// name is the key under which Close stores the file in fs.content.
name string
// metadata is a private copy of the metadata passed to NewWriter.
metadata map[string]string
// content accumulates the bytes passed to Write.
content []byte
}
// Write appends p to the file's in-memory content. It always reports
// len(p) bytes written and a nil error.
func (f *memFile) Write(p []byte) (int, error) {
	n := len(p)
	f.content = append(f.content, p...)
	return n, nil
}
// Close stores the file in its filesystem under the file's name and
// detaches the file from the filesystem. It returns an error if the
// file has already been closed, whether by Close or CloseWithError.
func (f *memFile) Close() error {
	owner := f.fs
	if owner == nil {
		return errors.New("already closed")
	}

	owner.mu.Lock()
	defer owner.mu.Unlock()

	owner.content[f.name] = f
	f.fs = nil
	return nil
}
// CloseWithError abandons the file: it detaches the file from its
// filesystem without storing it. The error argument is ignored and the
// result is always nil.
func (f *memFile) CloseWithError(_ error) error {
	f.fs = nil
	return nil
}
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package gcs implements the fs.FS interface using Google Cloud Storage.
package gcs
import (
"cloud.google.com/go/storage"
"golang.org/x/net/context"
"golang.org/x/perf/storage/fs"
)
// impl is an fs.FS backed by Google Cloud Storage.
type impl struct {
// bucket is the handle of the bucket that files are written to.
bucket *storage.BucketHandle
}
// NewFS returns an FS whose files are written to the Cloud Storage
// bucket named bucketName, using a storage client created from ctx.
// On AppEngine, ctx must be a request-derived Context.
func NewFS(ctx context.Context, bucketName string) (fs.FS, error) {
	client, err := storage.NewClient(ctx)
	if err != nil {
		return nil, err
	}
	bucket := client.Bucket(bucketName)
	return &impl{bucket: bucket}, nil
}
// NewWriter returns a writer for the object called name in the bucket
// and attaches metadata to it. The returned error is always nil.
func (fs *impl) NewWriter(ctx context.Context, name string, metadata map[string]string) (fs.Writer, error) {
	object := fs.bucket.Object(name)
	w := object.NewWriter(ctx)
	// TODO(quentin): Do these need "x-goog-meta-" prefixes?
	w.Metadata = metadata
	return w, nil
}
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"flag"
"log"
"net/http"
_ "github.com/mattn/go-sqlite3"
"golang.org/x/perf/storage/app"
"golang.org/x/perf/storage/db"
"golang.org/x/perf/storage/fs"
)
// host is the address the server listens on. It is set with the -port
// flag and defaults to ":8080".
var host = flag.String("port", ":8080", "(host and) port to bind on")
// main runs a local storage server backed by an in-memory SQLite
// database and an in-memory filesystem, listening on the address given
// by the -port flag. It exits if the database cannot be opened or the
// listener fails.
func main() {
	flag.Parse()

	database, err := db.OpenSQL("sqlite3", ":memory:")
	if err != nil {
		log.Fatalf("open database: %v", err)
	}

	server := &app.App{DB: database, FS: fs.NewMemFS()}
	server.RegisterOnMux(http.DefaultServeMux)

	addr := *host
	log.Printf("Listening on %s", addr)
	log.Fatal(http.ListenAndServe(addr, nil))
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment