Commit bbc76236 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c70467a6
...@@ -36,10 +36,10 @@ import ( ...@@ -36,10 +36,10 @@ import (
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
) )
const version = 2
// ---- schema ---- // ---- schema ----
const schemaVersion = 2
// table "config" stores configuration parameters which affect the persistent data. // table "config" stores configuration parameters which affect the persistent data.
// //
// XXX // XXX
...@@ -79,7 +79,8 @@ const obj = ` ...@@ -79,7 +79,8 @@ const obj = `
tid INTEGER NOT NULL, tid INTEGER NOT NULL,
data_id INTEGER, -- -> data.id data_id INTEGER, -- -> data.id
value_tid INTEGER, -- data_tid for zodb value_tid INTEGER, -- data_tid for zodb
-- XXX ^^^ can be NOT NULL with 0 serving instead
PRIMARY KEY (partition, oid, tid) PRIMARY KEY (partition, oid, tid)
` `
// `(partition, tid, oid)` // `(partition, tid, oid)`
...@@ -172,7 +173,7 @@ func (b *Backend) LastOid(ctx context.Context) (zodb.Oid, error) { ...@@ -172,7 +173,7 @@ func (b *Backend) LastOid(ctx context.Context) (zodb.Oid, error) {
err := b.query1(ctx, err := b.query1(ctx,
"SELECT MAX(oid) FROM pt, obj WHERE nid=? AND rid=partition", "SELECT MAX(oid) FROM pt, obj WHERE nid=? AND rid=partition",
myID).Scan(lastOid) myID).Scan(&lastOid)
if err != nil { if err != nil {
// no objects // no objects
...@@ -193,19 +194,31 @@ func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject ...@@ -193,19 +194,31 @@ func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject
} }
}() }()
obj := &proto.AnswerObject{Oid: xid.Oid} obj := &proto.AnswerObject{Oid: xid.Oid, DataSerial: 0}
var data sql.RawBytes // TODO reenable, but XXX we have to use Query, not QueryRow for RawBytes support
//var data sql.RawBytes
var data []byte
// hash is variable-length BLOB - Scan refuses to put it into [20]byte
//var hash sql.RawBytes
var hash []byte
// obj.value_tid can be null
var valueTid sql.NullInt64 // XXX ok not to uint64 - max tid is max signed int64
// FIXME pid = getReadablePartition (= oid % Np; error if pid not readable) // FIXME pid = getReadablePartition (= oid % Np; error if pid not readable)
pid := 0 pid := 0
// XXX somehow detect errors in sql misuse and log them as 500 without reporting to client?
// XXX such errors start with "unsupported Scan, "
err = b.query1(ctx, err = b.query1(ctx,
"SELECT tid, compression, data.hash, value, value_tid" + "SELECT tid, compression, data.hash, value, value_tid" +
" FROM obj LEFT JOIN data ON obj.data_id = data.id" + " FROM obj LEFT JOIN data ON obj.data_id = data.id" +
" WHERE partition=? AND oid=? AND tid<=?" + " WHERE partition=? AND oid=? AND tid<=?" +
" ORDER BY tid DESC LIMIT 1", " ORDER BY tid DESC LIMIT 1",
pid, xid.Oid, xid.At). pid, xid.Oid, xid.At).
Scan(&obj.Serial, &obj.Compression, &obj.Checksum, &data, &obj.DataSerial) Scan(&obj.Serial, &obj.Compression, &hash, &data, &valueTid)
if err != nil { if err != nil {
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
...@@ -231,6 +244,18 @@ func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject ...@@ -231,6 +244,18 @@ func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject
return nil, err return nil, err
} }
// hash -> obj.Checksum
if len(hash) != len(obj.Checksum) {
return nil, fmt.Errorf("data corrupt: len(hash) = %d", len(hash))
}
copy(obj.Checksum[:], hash)
// valueTid -> obj.DataSerial
if valueTid.Valid {
obj.DataSerial = zodb.Tid(valueTid.Int64)
}
// data -> obj.Data // data -> obj.Data
obj.Data = mem.BufAlloc(len(data)) obj.Data = mem.BufAlloc(len(data))
copy(obj.Data.Data, data) copy(obj.Data.Data, data)
...@@ -309,7 +334,7 @@ func openURL(ctx context.Context, u *url.URL) (_ storage.Backend, err error) { ...@@ -309,7 +334,7 @@ func openURL(ctx context.Context, u *url.URL) (_ storage.Backend, err error) {
} }
} }
checkConfig("version", version) checkConfig("version", schemaVersion)
checkConfig("nid", proto.UUID(proto.STORAGE, 1)) checkConfig("nid", proto.UUID(proto.STORAGE, 1))
checkConfig("partitions", 1) checkConfig("partitions", 1)
checkConfig("replicas", 1) checkConfig("replicas", 1)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment