Commit b65f6d0f authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb: Teach ZODB/go to access ZEO (draft)

For the reference on deco (performance, frequency not fixed):

	name                           time/object
	deco/fs1/zhash.py              15.8µs ± 2%
	deco/fs1/zhash.py-P16           116µs ±12%
	deco/fs1/zhash.go              2.60µs ± 0%
	deco/fs1/zhash.go+prefetch128  3.70µs ±11%
	deco/fs1/zhash.go-P16          13.4µs ±43%
	deco/zeo/zhash.py               316µs ± 7%
	deco/zeo/zhash.py-P16          2.68ms ± 7%
	deco/zeo/zhash.go               111µs ± 2%
	deco/zeo/zhash.go+prefetch128  57.7µs ± 2%
	deco/zeo/zhash.go-P16          1.23ms ± 5%

and in particular it shows that with the same ZEO/py server, the latency
to load an object via py client is ~ 3x worse compared to the latency to
load the same object via hereby Go client.

The performance was obtained via forthcoming neotest, and in particular
ZEO/go client will be also used in forthcoming zwrk (no analog on python side).

See http://navytux.spb.ru/~kirr/neo.html#performance-tests for details.

Tests: pending.
parent 2ee495ce
...@@ -1165,6 +1165,8 @@ var ErrPktTooBig = errors.New("packet too big") ...@@ -1165,6 +1165,8 @@ var ErrPktTooBig = errors.New("packet too big")
// recvPkt receives raw packet from peer. // recvPkt receives raw packet from peer.
// //
// rx error, if any, is returned as is and is analyzed in serveRecv // rx error, if any, is returned as is and is analyzed in serveRecv
//
// XXX dup in ZEO.
func (nl *NodeLink) recvPkt() (*pktBuf, error) { func (nl *NodeLink) recvPkt() (*pktBuf, error) {
// FIXME if rxbuf is non-empty - first look there for header and then if // FIXME if rxbuf is non-empty - first look there for header and then if
// we know size -> allocate pkt with that size. // we know size -> allocate pkt with that size.
......
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package zeo provides simple ZEO client
package zeo
import (
"context"
"encoding/binary"
"fmt"
"net/url"
"sync"
pickle "github.com/kisielk/og-rek"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/zodb"
)
type zeo struct {
srv *zLink
// state we get from server by way of server notifications.
mu sync.Mutex
lastTid zodb.Tid
url string // we were opened via this
}
func (z *zeo) LastTid(ctx context.Context) (zodb.Tid, error) {
z.mu.Lock()
defer z.mu.Unlock()
return z.lastTid, nil
}
func (z *zeo) Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, error) {
// defer func() ...
buf, serial, err := z._Load(ctx, xid)
if err != nil {
err = &zodb.OpError{URL: z.URL(), Op: "load", Args: xid, Err: err}
}
return buf, serial, err
}
func (z *zeo) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, error) {
rpc := z.rpc("loadBefore")
xres, err := rpc.call(ctx, oidPack(xid.Oid), tidPack(xid.At+1)) // XXX at2Before
if err != nil {
return nil, 0, err
}
// (data, serial, next_serial | None)
res, ok := xres.(pickle.Tuple)
if !ok || len(res) != 3 {
return nil, 0, rpc.ereplyf("got %#v; expect 3-tuple", res)
}
data, ok1 := res[0].(string)
serial, ok2 := tidUnpack(res[1])
// next_serial (res[2]) - just ignore
if !(ok1 && ok2) {
return nil, 0, rpc.ereplyf("got (%T, %v, %T); expect (str, tid, .)", res...)
}
return &mem.Buf{Data: mem.Bytes(data)}, serial, nil
}
func (z *zeo) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIterator {
panic("TODO")
}
// errorUnexpectedReply is returned by zLink.Call callers when reply was
// received successfully, but is not what the caller expected.
type errorUnexpectedReply struct {
Addr string
Method string
Err error
}
func (e *errorUnexpectedReply) Error() string {
return fmt.Sprintf("%s: call %s: unexpected reply: %s", e.Addr, e.Method, e.Err)
}
func ereplyf(addr, method, format string, argv ...interface{}) *errorUnexpectedReply {
return &errorUnexpectedReply{
Addr: addr,
Method: method,
Err: fmt.Errorf(format, argv...),
}
}
// rpc returns rpc object handy to make calls/create errors
func (z *zeo) rpc(method string) rpc {
return rpc{zl: z.srv, method: method}
}
type rpc struct {
zl *zLink
method string
}
// rpcExcept represents generic exception
type rpcExcept struct {
exc string
argv []interface{}
}
func (r *rpcExcept) Error() string {
return fmt.Sprintf("exception: %s %q", r.exc, r.argv)
}
func (r rpc) call(ctx context.Context, argv ...interface{}) (interface{}, error) {
reply, err := r.zl.Call(ctx, r.method, argv...)
if err != nil {
return nil, err
}
if reply.flags & msgExcept == 0 {
return reply.arg, nil
}
// exception - let's decode it
// ('type', (arg1, arg2, arg3, ...))
texc, ok := reply.arg.(pickle.Tuple)
if !ok || len(texc) != 2 {
return nil, r.ereplyf("except: got %#v; expect 2-tuple", reply.arg)
}
exc, ok1 := texc[0].(string)
argv, ok2 := texc[1].(pickle.Tuple)
if !(ok1 && ok2) {
return nil, r.ereplyf("except: got (%T, %T); expect (str, tuple)", texc...)
}
// translate well-known exceptions
switch exc {
case "ZODB.POSException.POSKeyError":
// POSKeyError(oid)
if len(argv) != 1 {
return nil, r.ereplyf("poskeyerror: got %#v; expect 1-tuple", argv...)
}
oid, ok := oidUnpack(argv[0])
if !ok {
return nil, r.ereplyf("poskeyerror: got (%v); expect (oid)", argv[0])
}
// XXX POSKeyError does not allow to distinguish whether it is
// no object at all or object exists and its data was not found
// for tid_before. IOW we cannot translate to zodb.NoDataError
return nil, &zodb.NoObjectError{Oid: oid}
}
return nil, &rpcExcept{exc, argv}
}
func (r rpc) ereplyf(format string, argv ...interface{}) *errorUnexpectedReply {
return ereplyf(r.zl.link.RemoteAddr().String(), r.method, format, argv...)
}
// ---- open ----
func openByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (_ zodb.IStorageDriver, err error) {
url := u.String()
defer xerr.Contextf(&err, "open %s:", url)
// zeo://host:port/path?storage=...&...
var net xnet.Networker
var addr string
if u.Host != "" {
net = xnet.NetPlain("tcp")
addr = u.Host
} else {
net = xnet.NetPlain("unix")
addr = u.Path
}
storageID := "1"
q := u.Query()
if s := q.Get("storage"); s != "" {
storageID = s
}
if !opt.ReadOnly {
return nil, fmt.Errorf("TODO write mode not implemented")
}
zl, err := dialZLink(ctx, net, addr)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
zl.Close()
}
}()
z := &zeo{srv: zl, url: url}
rpc := z.rpc("register")
xlastTid, err := rpc.call(ctx, storageID, opt.ReadOnly)
if err != nil {
return nil, err
}
lastTid, ok := tidUnpack(xlastTid) // XXX -> xlastTid -> scan
if !ok {
return nil, rpc.ereplyf("got %v; expect tid", xlastTid)
}
z.lastTid = lastTid
//call('get_info') -> {}str->str, ex // XXX can be omitted
/*
{'interfaces': (('ZODB.interfaces', 'IStorageRestoreable'),
('ZODB.interfaces', 'IStorageIteration'),
('ZODB.interfaces', 'IStorageUndoable'),
('ZODB.interfaces', 'IStorageCurrentRecordIteration'),
('ZODB.interfaces', 'IExternalGC'),
('ZODB.interfaces', 'IStorage'),
('zope.interface', 'Interface')),
'length': 2128,
'name': '/home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/neo/t/var/wczblk1-8/fs1/data.fs',
'size': 8630075,
'supportsUndo': True,
'supports_record_iternext': True})
*/
return z, nil
}
func (z *zeo) Close() error {
return z.srv.Close()
}
func (z *zeo) URL() string {
return z.url
}
func init() {
zodb.RegisterDriver("zeo", openByURL)
}
// ---- oid/tid packing ----
// xuint64Unpack tries to decode packed 8-byte string as bigendian uint64
func xuint64Unpack(xv interface{}) (uint64, bool) {
s, ok := xv.(string)
if !ok || len(s) != 8 {
return 0, false
}
return binary.BigEndian.Uint64(mem.Bytes(s)), true
}
// xuint64Pack packs v into big-endian 8-byte string
func xuint64Pack(v uint64) string {
var b [8]byte
binary.BigEndian.PutUint64(b[:], v)
return mem.String(b[:])
}
func tidPack(tid zodb.Tid) string {
return xuint64Pack(uint64(tid))
}
func oidPack(oid zodb.Oid) string {
return xuint64Pack(uint64(oid))
}
func tidUnpack(xv interface{}) (zodb.Tid, bool) {
v, ok := xuint64Unpack(xv)
return zodb.Tid(v), ok
}
func oidUnpack(xv interface{}) (zodb.Oid, bool) {
v, ok := xuint64Unpack(xv)
return zodb.Oid(v), ok
}
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package zeo
// RPC calls client<->server
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"log"
"net"
"sync"
"golang.org/x/sync/errgroup"
pickle "github.com/kisielk/og-rek"
"github.com/someonegg/gocontainer/rbuf"
"lab.nexedi.com/kirr/go123/xbytes"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/zodb/internal/pickletools"
)
const (
protocolVersion = "Z5"
pktHeaderLen = 4
)
// zLink is ZEO connection between client (local end) and server (remote end).
//
// zLink provides service to make RPC requests.
// XXX and receive notification from server (server sends invalidations)
//
// create zLink via dialZLink or handshake.
type zLink struct {
link net.Conn // underlying network
rxbuf rbuf.RingBuf // buffer for reading from link
// calls in-flight
callMu sync.Mutex
callTab map[int64]chan msg // msgid -> rxc for that call; nil when closed
callID int64 // ID for next call; incremented at every call
serveWg sync.WaitGroup // for serveRecv
down1 sync.Once
errClose error // error got from .link.Close()
}
// (called after handshake)
func (zl *zLink) start() {
zl.callTab = make(map[int64]chan msg)
zl.serveWg.Add(1)
go zl.serveRecv()
}
var errLinkClosed = errors.New("zlink is closed")
// shutdown shuts zlink down and sets error (XXX) which
func (zl *zLink) shutdown(err error) {
zl.down1.Do(func() {
// XXX what with err?
zl.errClose = zl.link.Close()
// notify call waiters
zl.callMu.Lock()
callTab := zl.callTab
zl.callTab = nil
zl.callMu.Unlock()
for _, rxc := range callTab {
rxc <- msg{arg: nil} // notify link was closed XXX ok? or err explicitly?
}
})
}
func (zl *zLink) Close() error {
zl.shutdown(nil)
zl.serveWg.Wait() // wait in case shutdown was called from serveRecv
return zl.errClose
}
// serveRecv handles receives from underlying link and dispatches them to calls
// waiting results.
func (zl *zLink) serveRecv() {
defer zl.serveWg.Done()
for {
// receive 1 packet
pkb, err := zl.recvPkt()
if err != nil {
zl.shutdown(err)
return
}
err = zl.serveRecv1(pkb)
pkb.Free()
// XXX ratelimit / only incstat?
if err != nil {
log.Printf("%s: rx: %s", zl.link.RemoteAddr(), err)
}
}
}
// serveRecv1 handles 1 incoming packet.
func (zl *zLink) serveRecv1(pkb *pktBuf) error {
// decode packet
m, err := pktDecode(pkb)
if err != nil {
return err
}
if m.method != ".reply" {
// TODO add notification channel (server calls client by itself")
return fmt.Errorf(".%d: method=%q; expected \".reply\"", m.msgid, m.method)
}
// lookup call by msgid and dispatch result to waiter
zl.callMu.Lock()
rxc := zl.callTab[m.msgid]
if rxc != nil {
delete(zl.callTab, m.msgid)
}
zl.callMu.Unlock()
if rxc == nil {
return fmt.Errorf(".%d: unexpected reply", m.msgid)
}
rxc <- m
return nil
}
// msg represents 1 message.
type msg struct {
msgid int64
flags msgFlags
method string
arg interface{} // can be e.g. (arg1, arg2, ...)
}
type msgFlags int64
const (
msgAsync msgFlags = 1 // message does not need a reply
msgExcept = 2 // exception was raised on remote side
)
func derrf(format string, argv ...interface{}) error {
return fmt.Errorf("decode: "+format, argv...)
}
// pktDecode decodes raw packet into message
func pktDecode(pkb *pktBuf) (msg, error) {
var m msg
// must be (msgid, False|0, ".reply", res)
d := pickle.NewDecoder(bytes.NewReader(pkb.Payload()))
xpkt, err := d.Decode()
if err != nil {
return m, err
}
tpkt, ok := xpkt.(pickle.Tuple) // XXX also list?
if !ok {
return m, derrf("got %T; expected tuple", xpkt)
}
if len(tpkt) != 4 {
return m, derrf("len(msg-tuple)=%d; expected 4", len(tpkt))
}
m.msgid, ok = pickletools.Xint64(tpkt[0])
if !ok {
return m, derrf("msgid: got %T; expected int", tpkt[0])
}
flags, ok := pickletools.Xint64(tpkt[1])
if !ok {
return m, derrf("flags: got %T; expected int", tpkt[1])
}
// XXX check flags are in range?
m.flags = msgFlags(flags)
m.method, ok = tpkt[2].(string)
if !ok {
return m, derrf(".%d: method: got %T; expected str", m.msgid, tpkt[2])
}
m.arg = tpkt[3]
return m, nil
}
// call makes 1 RPC call to server, waits for reply and returns it.
func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) (reply msg, _ error) {
// defer func() ...
reply, err := zl._call(ctx, method, argv...)
if err != nil {
err = fmt.Errorf("%s: call %s: %s", zl.link.RemoteAddr(), method, err)
}
return reply, err
}
func (zl *zLink) _call(ctx context.Context, method string, argv ...interface{}) (reply msg, _ error) {
rxc := make(chan msg, 1) // reply will go here
// register our call
zl.callMu.Lock()
if zl.callTab == nil {
zl.callMu.Unlock()
return msg{}, errLinkClosed
}
callID := zl.callID
zl.callID++
zl.callTab[callID] = rxc
zl.callMu.Unlock()
// (msgid, async, method, argv)
pkb := allocPkb()
p := pickle.NewEncoder(pkb)
err := p.Encode(pickle.Tuple{callID, false, method, pickle.Tuple(argv)})
if err != nil {
panic(err) // all our types are expected to be supported by pickle
}
// ok, pkt is ready to go
err = zl.sendPkt(pkb) // XXX ctx cancel
if err != nil {
return msg{}, err
}
select {
case <-ctx.Done():
return msg{}, ctx.Err()
case reply = <-rxc:
if reply.arg == nil {
// we were woken up because of shutdown
return msg{}, errLinkClosed
}
}
return reply, nil
}
// ---- raw IO ----
// pktBuf is buffer with packet data.
//
// alloc via allocPkb and free via pkb.Free.
// similar to skb in Linux.
type pktBuf struct {
data []byte
}
// Fixup fixes packet length in header according to current packet data.
func (pkb *pktBuf) Fixup() {
binary.BigEndian.PutUint32(pkb.data, uint32(len(pkb.data) - pktHeaderLen))
}
// Bytes returns whole buffer data including header and payload.
func (pkb *pktBuf) Bytes() []byte {
return pkb.data
}
// Payload returns payload part of buffer data.
func (pkb *pktBuf) Payload() []byte {
return pkb.data[pktHeaderLen:]
}
var pkbPool = sync.Pool{New: func() interface{} {
return &pktBuf{make([]byte, 0, 4096)}
}}
func allocPkb() *pktBuf {
pkb := pkbPool.Get().(*pktBuf)
pkb.data = pkb.data[:0]
pkb.Write([]byte("\x00\x00\x00\x00")) // room for header (= pktHeaderLen)
return pkb
}
func (pkb *pktBuf) Free() {
pkbPool.Put(pkb)
}
func (pkb *pktBuf) Write(p []byte) (int, error) {
pkb.data = append(pkb.data, p...)
return len(p), nil
}
func (pkb *pktBuf) WriteString(s string) (int, error) {
pkb.data = append(pkb.data, s...)
return len(s), nil
}
const dumpio = false
// sendPkt sends 1 raw ZEO packet.
//
// pkb is freed upon return.
func (zl *zLink) sendPkt(pkb *pktBuf) error {
pkb.Fixup()
_, err := zl.link.Write(pkb.Bytes())
if dumpio {
fmt.Printf("%v > %v: %q\n", zl.link.LocalAddr(), zl.link.RemoteAddr(), pkb.Bytes())
}
pkb.Free()
return err
}
// recvPkt receives 1 raw ZEO packet.
//
// the packet returned contains both header and payload.
// XXX almost dump from NEO.
func (zl *zLink) recvPkt() (*pktBuf, error) {
pkb := allocPkb()
data := pkb.data[:cap(pkb.data)]
n := 0
// next packet could be already prefetched in part by previous read
if zl.rxbuf.Len() > 0 {
δn, _ := zl.rxbuf.Read(data[:pktHeaderLen])
n += δn
}
// first read to read pkt header and hopefully rest of packet in 1 syscall
if n < pktHeaderLen {
δn, err := io.ReadAtLeast(zl.link, data[n:], pktHeaderLen - n)
if err != nil {
return nil, err
}
n += δn
}
payloadLen := binary.BigEndian.Uint32(data)
// XXX check payloadLen for max size
pktLen := int(pktHeaderLen + payloadLen)
// resize data if we don't have enough room in it
data = xbytes.Resize(data, pktLen)
data = data[:cap(data)]
// we might have more data already prefetched in rxbuf
if zl.rxbuf.Len() > 0 {
δn, _ := zl.rxbuf.Read(data[n:pktLen])
n += δn
}
// read rest of pkt data, if we need to
if n < pktLen {
δn, err := io.ReadAtLeast(zl.link, data[n:], pktLen - n)
if err != nil {
return nil, err
}
n += δn
}
// put overread data into rxbuf for next reader
if n > pktLen {
zl.rxbuf.Write(data[pktLen:n])
}
// fixup data/pkt
data = data[:n]
pkb.data = data
if dumpio {
fmt.Printf("%v < %v: %q\n", zl.link.LocalAddr(), zl.link.RemoteAddr(), pkb.data)
}
return pkb, nil
}
// ---- dial + handshake ----
// dialZLink connects to address on given network, performs ZEO protocol
// handshake and wraps the connection as zLink.
func dialZLink(ctx context.Context, net xnet.Networker, addr string) (*zLink, error) {
conn, err := net.Dial(ctx, addr)
if err != nil {
return nil, err
}
return handshake(ctx, conn)
}
// handshake performs ZEO protocol handshake just after raw connection has been
// established in between client and server.
//
// On success raw connection is returned wrapped into zLink.
// On error raw connection is closed.
func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) {
defer xerr.Contextf(&err, "%s: handshake", conn.RemoteAddr())
// create raw zlink since we need to do the handshake as ZEO message exchange,
// but don't start serve goroutines yet.
zl := &zLink{link: conn}
// ready when/if handshake tx/rx exchange succeeds
hok := make(chan struct{})
wg, ctx := errgroup.WithContext(ctx)
// tx/rx handshake packet
wg.Go(func() error {
pkb := allocPkb()
pkb.WriteString(protocolVersion)
err = zl.sendPkt(pkb)
if err != nil {
return err
}
pkb, err = zl.recvPkt()
if err != nil {
return err
}
rxver := string(pkb.Payload())
pkb.Free()
if rxver != protocolVersion {
return fmt.Errorf("version mismatch: remote=%q, my=%q", rxver, protocolVersion)
}
close(hok)
return nil
})
wg.Go(func() error {
select {
case <-ctx.Done():
// either ctx canceled from outside, or it is tx/rx problem.
// Close connection in any case. If it was not tx/rx
// problem - we interrupt IO there.
conn.Close()
return ctx.Err()
case <-hok:
return nil
}
})
err = wg.Wait()
if err != nil {
return nil, err
}
// handshaked ok
zl.start()
return zl, nil
}
...@@ -23,10 +23,11 @@ ...@@ -23,10 +23,11 @@
// //
// import _ "lab.nexedi.com/kirr/neo/go/zodb/wks" // import _ "lab.nexedi.com/kirr/neo/go/zodb/wks"
// //
// and this way automatically link in support for file:// ... and other // and this way automatically link in support for file:// zeo:// ... and
// common storages. // other common storages.
package wks package wks
import ( import (
_ "lab.nexedi.com/kirr/neo/go/zodb/storage/fs1" _ "lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
_ "lab.nexedi.com/kirr/neo/go/zodb/storage/zeo"
) )
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment