Commit aa17a79e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 7a98ef72
...@@ -140,7 +140,6 @@ func (m *Master) Run(ctx context.Context) (err error) { ...@@ -140,7 +140,6 @@ func (m *Master) Run(ctx context.Context) (err error) {
naddr, err := proto.Addr(l.Addr()) naddr, err := proto.Addr(l.Addr())
if err != nil { if err != nil {
// must be ok since l.Addr() is valid since it is listening // must be ok since l.Addr() is valid since it is listening
// XXX panic -> errors.Wrap?
panic(err) panic(err)
} }
......
...@@ -21,9 +21,12 @@ package sqlite ...@@ -21,9 +21,12 @@ package sqlite
import ( import (
"context" "context"
"fmt"
"net/url" "net/url"
"reflect"
"lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/storage" "lab.nexedi.com/kirr/neo/go/neo/storage"
...@@ -140,10 +143,14 @@ func (b *Backend) query1(ctx context.Context, query string, argv ...interface{}) ...@@ -140,10 +143,14 @@ func (b *Backend) query1(ctx context.Context, query string, argv ...interface{})
func (b *Backend) LastTid(ctx context.Context) (zodb.Tid, error) { func (b *Backend) LastTid(ctx context.Context) (zodb.Tid, error) {
var lastTid zodb.Tid var lastTid zodb.Tid
// FIXME nodeID <- my node UUID
myID := proto.UUID(proto.STORAGE, 1)
err := b.query1(ctx, err := b.query1(ctx,
"SELECT MAX(tid) FROM pt, trans" + "SELECT MAX(tid) FROM pt, trans" +
" WHERE nid=? AND rid=partition" /* XXX AND tid<=? (max_tid) */, " WHERE nid=? AND rid=partition" /* XXX AND tid<=? (max_tid) */,
b.nodeID()).Scan(&lastTid) myID).Scan(&lastTid)
if err != nil { if err != nil {
// no transaction have been committed // no transaction have been committed
...@@ -172,7 +179,8 @@ func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject ...@@ -172,7 +179,8 @@ func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject
obj := &proto.AnswerObject{Oid: xid.Oid} obj := &proto.AnswerObject{Oid: xid.Oid}
var data sql.RawBytes var data sql.RawBytes
// XXX pid = getReadablePartition (= oid % Np; error if pid not readable) // FIXME pid = getReadablePartition (= oid % Np; error if pid not readable)
pid := 0
err = b.query1(ctx, err = b.query1(ctx,
"SELECT tid, compression, data.hash, value, value_tid" + "SELECT tid, compression, data.hash, value, value_tid" +
...@@ -231,44 +239,71 @@ func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject ...@@ -231,44 +239,71 @@ func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject
} }
/* func (b *Backend) config(ctx context.Context, key string, pvalue interface{}) error {
func (b *Backend) config(key string) (..., error) { return b.query1(ctx, "SELECT value FROM config WHERE name=?", key).Scan(pvalue)
// XXX cache
var value string
err := b.query1("SELECT value FROM config WHERE name=?", key).Scan(&value)
if err != nil {
if err = sql.ErrNoRows {
// XXX
}
// XXX
}
return value, nil
} }
*/
// ---- open by URL ---- // ---- open by URL ----
func openURL(ctx context.Context, u *url.URL) (storage.Backend, error) { func openURL(ctx context.Context, u *url.URL) (_ storage.Backend, err error) {
// TODO handle query // TODO handle query
// XXX u.Path is not always raw path - recheck and fix // XXX u.Path is not always raw path - recheck and fix
path := u.Host + u.Path path := u.Host + u.Path
db, err := sql.Open("sqlite3", path) // XXX +context db, err := sql.Open("sqlite3", path)
if err != nil { if err != nil {
return nil, err return nil, err
} }
b := &Backend{db: db, url: u.String()}
defer func() {
if err != nil {
db.Close()
}
}()
// check we can actually access db // check we can actually access db
err = db.PingContext(ctx) err = db.PingContext(ctx)
if err != nil { if err != nil {
// XXX db.Close()
return nil, err // XXX err ctx return nil, err // XXX err ctx
} }
// XXX check config("version") vs version // check schema and that our limited version can work with the db
errv := xerr.Errorv{}
checkConfig := func(name string, expect interface{}) {
pvalue := reflect.New(reflect.TypeOf(expect)).Interface()
err := b.config(ctx, name, pvalue)
// XXX prefix "b.path: config: %s:"
switch err {
case sql.ErrNoRows:
err = fmt.Errorf("not found")
case nil:
value := reflect.ValueOf(pvalue).Elem().Interface()
if value != expect {
err = fmt.Errorf("got %s; want %s", value, expect)
}
}
if err != nil {
errv.Appendf("%s: config: %s: %s", b.url, name, err)
}
}
checkConfig("version", version)
checkConfig("nid", proto.UUID(proto.STORAGE, 1))
checkConfig("partitions", 1)
checkConfig("replicas", 1)
err = errv.Err()
if err != nil {
return nil, fmt.Errorf("NEO/go POC: not ready to handle: %s", err)
}
// config("version") vs version
// config("nid") // config("nid")
// config("partitions") // config("partitions")
// config("replicas") // config("replicas")
...@@ -278,7 +313,7 @@ func openURL(ctx context.Context, u *url.URL) (storage.Backend, error) { ...@@ -278,7 +313,7 @@ func openURL(ctx context.Context, u *url.URL) (storage.Backend, error) {
// config("truncate_tid") // config("truncate_tid")
// config("_pack_tid") // config("_pack_tid")
return &Backend{db: db, url: u.String()}, nil return b, nil
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment