Commit c9a3e4ad authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d5a0a0ea
......@@ -368,17 +368,18 @@ Tables
------
- config
.name str
.value str
.name text !null PK
.value text
(name, nid, partitions, ptid, replicas, version, zodb=pickle...)
# partition table
- pt
.rid int // = row id = part of oid space
.nid int
.rid int !null // = row id = part of oid space
.nid int !null
.state tinyint // = cell state
pkey (rid, nid)
PK (rid, nid)
# committed txns
- trans
......
......@@ -34,6 +34,7 @@ import (
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo/internal/xsha1"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/zodb"
......@@ -453,8 +454,8 @@ func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, e
buf := resp.Data
if !xsha1skip {
checksum := sha1Sum(buf.Data)
if !xsha1.Skip {
checksum := xsha1.Sum(buf.Data)
if checksum != resp.Checksum {
return nil, 0, fmt.Errorf("data corrupt: checksum mismatch")
}
......
......@@ -976,10 +976,11 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
xid1 := zodb.Xid{Oid: 1, At: zodb.TidMax}
buf1, serial1, _, err := zback.Load(ctx, xid1)
obj1, err := zback.Load(ctx, xid1)
if err != nil {
b.Fatal(err)
}
buf1, serial1 := obj1.Data, obj1.Serial
// C.Load(xid1)
xcload1 := func() {
......
......@@ -35,7 +35,7 @@ import (
"lab.nexedi.com/kirr/neo/go/neo/storage"
_ "lab.nexedi.com/kirr/neo/go/neo/storage/fs1"
_ "lab.nexedi.com/kirr/neo/go/neo/storage/sql"
_ "lab.nexedi.com/kirr/neo/go/neo/storage/sqlite"
)
const storageSummary = "run storage node"
......
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
// Copyright (C) 2017-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
......@@ -17,68 +17,29 @@
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package sql provides NEO storage backend that uses SQL database for persistence.
package sql
// TODO also support mysql
// Package xsha1, similarly to crypto/sha1, provides SHA1 computation, but
// makes it a noop if requested from environment.
package xsha1
import (
"context"
"net/url"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/neo/go/neo/storage"
"lab.nexedi.com/kirr/neo/go/zodb"
"database/sql"
_ "github.com/mattn/go-sqlite3"
_ "github.com/go-sql-driver/mysql"
"crypto/sha1"
"fmt"
"os"
)
type SQLBackend struct {
db *sql.DB
}
var _ storage.Backend = (*SQLBackend)(nil)
func (b *SQLBackend) LastTid(ctx context.Context) (zodb.Tid, error) {
panic("TODO")
}
func (b *SQLBackend) LastOid(ctx context.Context) (zodb.Oid, error) {
panic("TODO")
}
func (b *SQLBackend) Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, zodb.Tid, error) {
panic("TODO")
}
// ---- open by URL ----
func openURL(ctx context.Context, u *url.URL) (storage.Backend, error) {
// TODO handle query
// XXX u.Path is not always raw path - recheck and fix
path := u.Host + u.Path
db, err := sql.Open("sqlite3", path) // XXX +context
if err != nil {
return nil, err
}
// check we can actually access db
err = db.PingContext(ctx)
if err != nil {
// XXX db.Close()
return nil, err // XXX err ctx
// XXX for benchmarking: how much sha1 computation takes time from latency
var Skip bool
func init() {
if os.Getenv("X_NEOGO_SHA1_SKIP") == "y" {
fmt.Fprintf(os.Stderr, "# NEO/go (%s): skipping SHA1 computations\n", os.Args[0])
Skip = true
}
return nil, nil
}
func Sum(b []byte) [sha1.Size]byte {
if !Skip {
return sha1.Sum(b)
}
func init() {
storage.RegisterBackend("sqlite", openURL)
return [sha1.Size]byte{} // all 0
}
......@@ -44,7 +44,7 @@ func xfs1stor(path string) *zfs1.FileStorage {
return stor
}
func xfs1back(path string) *bfs1.FS1Backend {
func xfs1back(path string) *bfs1.Backend {
back, err := bfs1.Open(bg, path)
exc.Raiseif(err)
return back
......
......@@ -526,8 +526,7 @@ func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp prot
xid.At = before2At(req.Tid)
}
// FIXME kill nextSerial support after neo/py cache does not depend on next_serial
buf, serial, nextSerial, err := stor.back.Load(ctx, xid)
resp, err := stor.back.Load(ctx, xid)
if err != nil {
// translate err to NEO protocol error codes
e := err.(*zodb.OpError) // XXX move this to ErrEncode?
......@@ -537,7 +536,7 @@ func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp prot
// compatibility with py side:
// for loadSerial - check we have exact hit - else "nodata"
if req.Serial != proto.INVALID_TID {
if serial != req.Serial {
if resp.Serial != req.Serial {
return &proto.Error{
Code: proto.OID_NOT_FOUND,
Message: fmt.Sprintf("%s: no data with serial %s", xid.Oid, req.Serial),
......@@ -545,23 +544,7 @@ func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp prot
}
}
// no next serial -> None
if nextSerial == zodb.TidMax {
nextSerial = proto.INVALID_TID
}
return &proto.AnswerObject{
Oid: xid.Oid,
Serial: serial,
NextSerial: nextSerial,
Compression: false,
Data: buf,
Checksum: sha1Sum(buf.Data), // XXX computing every time
// XXX .NextSerial
// XXX .DataSerial
}
return resp
case *proto.LastTransaction:
lastTid, err := stor.back.LastTid(ctx)
......
......@@ -24,14 +24,14 @@ import (
"context"
"net/url"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/neo/go/neo/internal/xsha1"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/storage"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
)
type FS1Backend struct {
type Backend struct {
// TODO storage layout:
// meta/
// data/
......@@ -49,28 +49,49 @@ type FS1Backend struct {
zstor *fs1.FileStorage // underlying ZODB storage
}
var _ storage.Backend = (*FS1Backend)(nil)
var _ storage.Backend = (*Backend)(nil)
func Open(ctx context.Context, path string) (*FS1Backend, error) {
func Open(ctx context.Context, path string) (*Backend, error) {
zstor, err := fs1.Open(ctx, path)
if err != nil {
return nil, err
}
return &FS1Backend{zstor: zstor}, nil
return &Backend{zstor: zstor}, nil
}
func (f *FS1Backend) LastTid(ctx context.Context) (zodb.Tid, error) {
func (f *Backend) LastTid(ctx context.Context) (zodb.Tid, error) {
return f.zstor.LastTid(ctx)
}
func (f *FS1Backend) LastOid(ctx context.Context) (zodb.Oid, error) {
func (f *Backend) LastOid(ctx context.Context) (zodb.Oid, error) {
return f.zstor.LastOid(ctx)
}
func (f *FS1Backend) Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, zodb.Tid, error) {
func (f *Backend) Load(ctx context.Context, xid zodb.Xid) (*proto.AnswerObject, error) {
// FIXME kill nextSerial support after neo/py cache does not depend on next_serial
return f.zstor.Load_XXXWithNextSerialXXX(ctx, xid)
buf, serial, nextSerial, err := f.zstor.Load_XXXWithNextSerialXXX(ctx, xid)
if err != nil {
return nil, err
}
// no next serial -> None
if nextSerial == zodb.TidMax {
nextSerial = proto.INVALID_TID
}
return &proto.AnswerObject{
Oid: xid.Oid,
Serial: serial,
NextSerial: nextSerial,
Compression: false,
Data: buf,
Checksum: xsha1.Sum(buf.Data), // XXX computing every time
// XXX .DataSerial
}, nil
}
......
// Copyright (C) 2018 Nexedi SA and Contributors.
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package sqlite provides NEO storage backend that uses SQLite database for persistence.
package sqlite
import (
"context"
"net/url"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/neo/go/neo/storage"
"lab.nexedi.com/kirr/neo/go/zodb"
"database/sql"
_ "github.com/mattn/go-sqlite3"
)
const version = 2
// ---- schema ----
// table "config" stores configuration parameters which affect the persistent data.
//
// XXX
// (name, nid, partitions, ptid, replicas, version, zodb=pickle...)
const config = `
name TEXT NOT NULL PRIMARY KEY,
value TEXT
`
// table "pt" stores a partition table.
const pt = `
rid INTEGER NOT NULL, -- row id
nid INTEGER NOT NULL, -- node id
state INTEGER NOT NULL, -- cell state
PRIMARY KEY (rid, nid)
`
// table "trans" stores information on committed transactions.
const trans = `
partition INTEGER NOT NULL,
tid INTEGER NOT NULL,
packed BOOLEAN NOT NULL,
oids BLOB NOT NULL, -- []oid
user BLOB NOT NULL,
description BLOB NOT NULL,
ext BLOB NOT NULL,
ttid INTEGER NOT NULL,
PRIMARY KEY (partition, tid)
`
// table "obj" stores committed object metadata.
const obj = `
partition INTEGER NOT NULL,
oid INTEGER NOT NULL,
tid INTEGER NOT NULL,
data_id INTEGER, -- -> data.id
value_tid INTEGER, -- data_tid for zodb
PRIMARY KEY (partition, oid, tid)
`
// `(partition, tid, oid)`
// `(data_id)`
// XXX reenable for ^^^
//index_dict['obj'] = (
// "CREATE INDEX %s ON %s(partition, tid, oid)",
// "CREATE INDEX %s ON %s(data_id)")
// table "data" stores object data.
const data = `
id INTEGER PRIMARY KEY,
hash BLOB NOT NULL,
compression INTEGER NOT NULL,
value BLOB NOT NULL
`
// XXX reenable for ^^^
//if dedup:
// index_dict['data'] = (
// "CREATE UNIQUE INDEX %s ON %s(hash, compression)",)
// table "ttrans" stores information on uncommitted transactions.
const ttrans = `
partition INTEGER NOT NULL,
tid INTEGER,
packed BOOLEAN NOT NULL,
oids BLOB NOT NULL,
user BLOB NOT NULL,
description BLOB NOT NULL,
ext BLOB NOT NULL,
ttid INTEGER NOT NULL
`
// table "tobj" stores uncommitted object metadata.
const tobj = `
partition INTEGER NOT NULL,
oid INTEGER NOT NULL,
tid INTEGER NOT NULL,
data_id INTEGER,
value_tid INTEGER,
PRIMARY KEY (tid, oid)
`
type Backend struct {
db *sql.DB
url string
}
var _ storage.Backend = (*Backend)(nil)
func (b *Backend) query1(ctx context.Context, query string, argv ...interface{}) *sql.Row {
return b.db.QueryRowContext(ctx, query, argv...)
}
func (b *Backend) LastTid(ctx context.Context) (zodb.Tid, error) {
var lastTid zodb.Tid
err := b.query1(ctx,
"SELECT MAX(tid) FROM pt, trans" +
" WHERE nid=? AND rid=partition" /* XXX AND tid<=? (max_tid) */,
b.nodeID()).Scan(&lastTid)
if err != nil {
// no transaction have been committed
if err == sql.ErrNoRows {
return 0, nil
}
// XXX ok to reuse zodb.OpError here? or better it should be storage.OpError ?
return 0, &zodb.OpError{URL: b.url, Op: "last_tid", Err: err}
}
return lastTid, nil
}
func (b *Backend) LastOid(ctx context.Context) (zodb.Oid, error) {
panic("TODO")
}
func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, zodb.Tid, error) {
// XXX err ctx zodb.OpError{URL: b.url, Op: "load", Err: ...}
// XXX pid = getReadablePartition (= oid % Np, raise if pid not readable)
err := b.query1(ctx,
"SELECT tid, compression, data.hash, value, value_tid" +
" FROM obj LEFT JOIN data ON obj.data_id = data.id" +
" WHERE partition=? AND oid=? AND tid<=?" +
" ORDER BY tid DESC LIMIT 1",
pid, xid.Oid, xid.At)
.Scan(&serial, &compression, &hash, &data, &data_tid)
if err != nil {
if err == sql.ErrNoRows {
// XXX see if object exists at all
err = zodb.ErrNoData | zodb.ErrNoObject
}
return err
}
buf =
// find out nextSerial
// XXX kill nextSerial support after neo/py cache does not need it
err = b.query1(ctx,
"SELECT tid from obj" +
" WHERE partition=? AND oid=? AND tid>?" +
" ORDER BY tid LIMIT 1",
pid, xid.Oid, xid.At)
.Scan(&nextSerial)
if err != nil {
if err == sql.ErrNoObject {
nextSerial = proto.INVALID_TID
} else {
return err
}
}
return &proto.AnswerObject{
Oid: xid.Oid,
Serial: serial,
NextSerial: nextSerial,
Compression: compression,
Checksum: hash,
Data: buf,
DataSerial: data_tid,
}
}
func (b *Backend) config(key string) (..., error) {
// XXX cache
var value string
err := b.query1("SELECT value FROM config WHERE name=?", key).Scan(&value)
if err != nil {
if err = sql.ErrNoRows {
// XXX
}
// XXX
}
return value, nil
}
// ---- open by URL ----
func openURL(ctx context.Context, u *url.URL) (storage.Backend, error) {
// TODO handle query
// XXX u.Path is not always raw path - recheck and fix
path := u.Host + u.Path
db, err := sql.Open("sqlite3", path) // XXX +context
if err != nil {
return nil, err
}
// check we can actually access db
err = db.PingContext(ctx)
if err != nil {
// XXX db.Close()
return nil, err // XXX err ctx
}
// XXX check config("version") vs version
// config("nid")
// config("partitions")
// config("replicas")
// config("name")
// config("ptid")
// config("backup_tid")
// config("truncate_tid")
// config("_pack_tid")
return &Backend{db: db, url: u.String()}, nil
}
func init() {
storage.RegisterBackend("sqlite", openURL)
}
......@@ -27,7 +27,7 @@ import (
"sort"
"sync"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/zodb"
)
......@@ -43,9 +43,7 @@ type Backend interface {
LastOid(ctx context.Context) (zodb.Oid, error)
// Load, similarly to zodb.IStorageDriver.Load should load object data addressed by xid.
// FIXME kill nextSerial support after neo/py cache does not depend on next_serial
// XXX +viewAt ?
Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial, nextSerial zodb.Tid, err error)
Load(ctx context.Context, xid zodb.Xid) (*proto.AnswerObject, error)
}
// BackendOpener is a function to open a NEO storage backend
......
......@@ -23,10 +23,7 @@ import (
"bytes"
"compress/zlib"
"context"
"crypto/sha1"
"fmt"
"io"
"os"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
......@@ -70,23 +67,6 @@ func decompress(in []byte, out []byte) (data []byte, err error) {
return bout.Bytes(), nil
}
// XXX for benchmarking: how much sha1 computation takes time from latency
var xsha1skip bool
func init() {
if os.Getenv("X_NEOGO_SHA1_SKIP") == "y" {
fmt.Fprintf(os.Stderr, "# NEO/go (%s): skipping SHA1 computations\n", os.Args[0])
xsha1skip = true
}
}
func sha1Sum(b []byte) [sha1.Size]byte {
if !xsha1skip {
return sha1.Sum(b)
}
return [sha1.Size]byte{} // all 0
}
// at2Before converts at to before for ZODB load semantics taking edge cases into account.
//
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment