Commit d252963a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 18b22971
...@@ -35,11 +35,13 @@ import ( ...@@ -35,11 +35,13 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb/internal/pickletools" "lab.nexedi.com/kirr/neo/go/zodb/internal/pickletools"
) )
type encoding byte // Z - pickles, M - msgpack
// ---- message encode/decode ---- // ---- message encode/decode ----
// pktEncode encodes message into raw packet. // pktEncode encodes message into raw packet.
func (zl *zLink) pktEncode(m msg) *pktBuf { func (e encoding) pktEncode(m msg) *pktBuf {
switch zl.encoding { switch e {
case 'Z': return pktEncodeZ(m) case 'Z': return pktEncodeZ(m)
case 'M': return pktEncodeM(m) case 'M': return pktEncodeM(m)
default: panic("bug") default: panic("bug")
...@@ -47,8 +49,8 @@ func (zl *zLink) pktEncode(m msg) *pktBuf { ...@@ -47,8 +49,8 @@ func (zl *zLink) pktEncode(m msg) *pktBuf {
} }
// pktDecode decodes raw packet into message. // pktDecode decodes raw packet into message.
func (zl *zLink) pktDecode(pkb *pktBuf) (msg, error) { func (e encoding) pktDecode(pkb *pktBuf) (msg, error) {
switch zl.encoding { switch e {
case 'Z': return pktDecodeZ(pkb) case 'Z': return pktDecodeZ(pkb)
case 'M': return pktDecodeM(pkb) case 'M': return pktDecodeM(pkb)
default: panic("bug") default: panic("bug")
...@@ -240,8 +242,8 @@ func derrf(format string, argv ...interface{}) error { ...@@ -240,8 +242,8 @@ func derrf(format string, argv ...interface{}) error {
// ---- encode/decode for data types ---- // ---- encode/decode for data types ----
// xuint64Unpack tries to decode packed 8-byte string as bigendian uint64 // xuint64Unpack tries to decode packed 8-byte string as bigendian uint64
func (zl *zLink) xuint64Unpack(xv interface{}) (uint64, bool) { func (e encoding) xuint64Unpack(xv interface{}) (uint64, bool) {
switch zl.encoding { switch e {
default: default:
panic("bug") panic("bug")
...@@ -270,11 +272,11 @@ func (zl *zLink) xuint64Unpack(xv interface{}) (uint64, bool) { ...@@ -270,11 +272,11 @@ func (zl *zLink) xuint64Unpack(xv interface{}) (uint64, bool) {
} }
// xuint64Pack packs v into big-endian 8-byte string // xuint64Pack packs v into big-endian 8-byte string
func (zl *zLink) xuint64Pack(v uint64) interface{} { func (e encoding) xuint64Pack(v uint64) interface{} {
var b [8]byte var b [8]byte
binary.BigEndian.PutUint64(b[:], v) binary.BigEndian.PutUint64(b[:], v)
switch zl.encoding { switch e {
default: default:
panic("bug") panic("bug")
...@@ -288,28 +290,28 @@ func (zl *zLink) xuint64Pack(v uint64) interface{} { ...@@ -288,28 +290,28 @@ func (zl *zLink) xuint64Pack(v uint64) interface{} {
} }
} }
func (zl *zLink) tidPack(tid zodb.Tid) interface{} { func (e encoding) tidPack(tid zodb.Tid) interface{} {
return zl.xuint64Pack(uint64(tid)) return e.xuint64Pack(uint64(tid))
} }
func (zl *zLink) oidPack(oid zodb.Oid) interface{} { func (e encoding) oidPack(oid zodb.Oid) interface{} {
return zl.xuint64Pack(uint64(oid)) return e.xuint64Pack(uint64(oid))
} }
func (zl *zLink) tidUnpack(xv interface{}) (zodb.Tid, bool) { func (e encoding) asTid(xv interface{}) (zodb.Tid, bool) {
v, ok := zl.xuint64Unpack(xv) v, ok := e.xuint64Unpack(xv)
return zodb.Tid(v), ok return zodb.Tid(v), ok
} }
func (zl *zLink) oidUnpack(xv interface{}) (zodb.Oid, bool) { func (e encoding) asOid(xv interface{}) (zodb.Oid, bool) {
v, ok := zl.xuint64Unpack(xv) v, ok := e.xuint64Unpack(xv)
return zodb.Oid(v), ok return zodb.Oid(v), ok
} }
// asTuple tries to decode object as tuple. XXX // asTuple tries to decode object as tuple. XXX
func (zl *zLink) asTuple(xt interface{}) (tuple, bool) { func (e encoding) asTuple(xt interface{}) (tuple, bool) {
switch zl.encoding { switch e {
default: default:
panic("bug") panic("bug")
...@@ -326,8 +328,8 @@ func (zl *zLink) asTuple(xt interface{}) (tuple, bool) { ...@@ -326,8 +328,8 @@ func (zl *zLink) asTuple(xt interface{}) (tuple, bool) {
} }
// asBytes tries to decode object as raw bytes. // asBytes tries to decode object as raw bytes.
func (zl *zLink) asBytes(xb interface{}) ([]byte, bool) { func (e encoding) asBytes(xb interface{}) ([]byte, bool) {
switch zl.encoding{ switch e {
default: default:
panic("bug") panic("bug")
...@@ -347,8 +349,8 @@ func (zl *zLink) asBytes(xb interface{}) ([]byte, bool) { ...@@ -347,8 +349,8 @@ func (zl *zLink) asBytes(xb interface{}) ([]byte, bool) {
} }
// asString tries to decode object as string. // asString tries to decode object as string.
func (zl *zLink) asString(xs interface{}) (string, bool) { func (e encoding) asString(xs interface{}) (string, bool) {
switch zl.encoding{ switch e {
default: default:
panic("bug") panic("bug")
......
...@@ -36,7 +36,7 @@ import ( ...@@ -36,7 +36,7 @@ import (
) )
type zeo struct { type zeo struct {
srv *zLink // XXX rename -> link? link *zLink
// driver client <- watcher: database commits | errors. // driver client <- watcher: database commits | errors.
watchq chan<- zodb.Event watchq chan<- zodb.Event
...@@ -63,7 +63,7 @@ func (z *zeo) Sync(ctx context.Context) (head zodb.Tid, err error) { ...@@ -63,7 +63,7 @@ func (z *zeo) Sync(ctx context.Context) (head zodb.Tid, err error) {
return zodb.InvalidTid, err return zodb.InvalidTid, err
} }
head, ok := z.srv.tidUnpack(xhead) head, ok := z.link.enc.asTid(xhead)
if !ok { if !ok {
return zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xhead) return zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xhead)
} }
...@@ -83,19 +83,20 @@ func (z *zeo) Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, error ...@@ -83,19 +83,20 @@ func (z *zeo) Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, error
func (z *zeo) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, error) { func (z *zeo) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, error) {
rpc := z.rpc("loadBefore") rpc := z.rpc("loadBefore")
xres, err := rpc.call(ctx, z.srv.oidPack(xid.Oid), z.srv.tidPack(xid.At+1)) // XXX at2Before enc := z.link.enc
xres, err := rpc.call(ctx, enc.oidPack(xid.Oid), enc.tidPack(xid.At+1)) // XXX at2Before
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
// (data, serial, next_serial | None) // (data, serial, next_serial | None)
res, ok := z.srv.asTuple(xres) res, ok := enc.asTuple(xres)
if !ok || len(res) != 3 { if !ok || len(res) != 3 {
return nil, 0, rpc.ereplyf("got %#v; expect 3-tuple", xres) return nil, 0, rpc.ereplyf("got %#v; expect 3-tuple", xres)
} }
data, ok1 := z.srv.asBytes(res[0]) data, ok1 := enc.asBytes(res[0])
serial, ok2 := z.srv.tidUnpack(res[1]) serial, ok2 := enc.asTid(res[1])
// next_serial (res[2]) - just ignore // next_serial (res[2]) - just ignore
if !(ok1 && ok2) { if !(ok1 && ok2) {
...@@ -115,20 +116,21 @@ func (z *zeo) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIte ...@@ -115,20 +116,21 @@ func (z *zeo) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIte
func (z *zeo) invalidateTransaction(arg interface{}) (err error) { func (z *zeo) invalidateTransaction(arg interface{}) (err error) {
defer xerr.Context(&err, "invalidateTransaction") defer xerr.Context(&err, "invalidateTransaction")
t, ok := z.srv.asTuple(arg) enc := z.link.enc
t, ok := enc.asTuple(arg)
if !ok || len(t) != 2 { if !ok || len(t) != 2 {
return fmt.Errorf("got %#v; expect 2-tuple", arg) return fmt.Errorf("got %#v; expect 2-tuple", arg)
} }
// (tid, oidv) // (tid, oidv)
tid, ok1 := z.srv.tidUnpack(t[0]) tid, ok1 := enc.asTid(t[0])
xoidt, ok2 := z.srv.asTuple(t[1]) xoidt, ok2 := enc.asTuple(t[1])
if !(ok1 && ok2) { if !(ok1 && ok2) {
return fmt.Errorf("got (%T, %T); expect (tid, []oid)", t...) return fmt.Errorf("got (%T, %T); expect (tid, []oid)", t...)
} }
oidv := []zodb.Oid{} oidv := []zodb.Oid{}
for _, xoid := range xoidt { for _, xoid := range xoidt {
oid, ok := z.srv.oidUnpack(xoid) oid, ok := enc.asOid(xoid)
if !ok { if !ok {
return fmt.Errorf("non-oid %#v in oidv", xoid) return fmt.Errorf("non-oid %#v in oidv", xoid)
} }
...@@ -187,7 +189,7 @@ func ereplyf(addr, method, format string, argv ...interface{}) *errorUnexpectedR ...@@ -187,7 +189,7 @@ func ereplyf(addr, method, format string, argv ...interface{}) *errorUnexpectedR
// rpc returns rpc object handy to make calls/create errors // rpc returns rpc object handy to make calls/create errors
func (z *zeo) rpc(method string) rpc { func (z *zeo) rpc(method string) rpc {
return rpc{zl: z.srv, method: method} return rpc{zl: z.link, method: method}
} }
type rpc struct { type rpc struct {
...@@ -242,7 +244,7 @@ func (r rpc) excError(exc string, argv tuple) error { ...@@ -242,7 +244,7 @@ func (r rpc) excError(exc string, argv tuple) error {
return r.ereplyf("poskeyerror: got %#v; expect 1-tuple", argv...) return r.ereplyf("poskeyerror: got %#v; expect 1-tuple", argv...)
} }
oid, ok := r.zl.oidUnpack(argv[0]) oid, ok := r.zl.enc.asOid(argv[0])
if !ok { if !ok {
return r.ereplyf("poskeyerror: got (%v); expect (oid)", argv[0]) return r.ereplyf("poskeyerror: got (%v); expect (oid)", argv[0])
} }
...@@ -259,14 +261,15 @@ func (r rpc) excError(exc string, argv tuple) error { ...@@ -259,14 +261,15 @@ func (r rpc) excError(exc string, argv tuple) error {
// zeo5Error decodes arg of reply with msgExcept flag set and returns // zeo5Error decodes arg of reply with msgExcept flag set and returns
// corresponding error. // corresponding error.
func (r rpc) zeo5Error(arg interface{}) error { func (r rpc) zeo5Error(arg interface{}) error {
enc := r.zl.enc
// ('type', (arg1, arg2, arg3, ...)) // ('type', (arg1, arg2, arg3, ...))
texc, ok := r.zl.asTuple(arg) texc, ok := enc.asTuple(arg)
if !ok || len(texc) != 2 { if !ok || len(texc) != 2 {
return r.ereplyf("except5: got %#v; expect 2-tuple", arg) return r.ereplyf("except5: got %#v; expect 2-tuple", arg)
} }
exc, ok1 := r.zl.asString(texc[0]) exc, ok1 := enc.asString(texc[0])
argv, ok2 := r.zl.asTuple(texc[1]) argv, ok2 := enc.asTuple(texc[1])
if !(ok1 && ok2) { if !(ok1 && ok2) {
return r.ereplyf("except5: got (%T, %T); expect (str, tuple)", texc...) return r.ereplyf("except5: got (%T, %T); expect (str, tuple)", texc...)
} }
...@@ -382,7 +385,19 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -382,7 +385,19 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
return nil, zodb.InvalidTid, fmt.Errorf("TODO write mode not implemented") return nil, zodb.InvalidTid, fmt.Errorf("TODO write mode not implemented")
} }
zl, err := dialZLink(ctx, net, addr) // XXX + methodTable {invalidateTransaction tid, oidv} -> ... z := &zeo{watchq: opt.Watchq, url: url}
//zl, err := dialZLink(ctx, net, addr) // XXX + methodTable {invalidateTransaction tid, oidv} -> ...
zl, err := dialZLink(ctx, net, addr,
/*
// notifyTab
map[string]func(interface{})error {
"invalidateTransaction": z.invalidateTransaction,
},
// serveTab
nil,
*/
)
if err != nil { if err != nil {
return nil, zodb.InvalidTid, err return nil, zodb.InvalidTid, err
} }
...@@ -394,7 +409,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -394,7 +409,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
}() }()
z := &zeo{srv: zl, watchq: opt.Watchq, url: url} z.link = zl
rpc := z.rpc("register") rpc := z.rpc("register")
xlastTid, err := rpc.call(ctx, storageID, opt.ReadOnly) xlastTid, err := rpc.call(ctx, storageID, opt.ReadOnly)
...@@ -404,7 +419,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -404,7 +419,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
// register returns last_tid in ZEO5 but nothing earlier. // register returns last_tid in ZEO5 but nothing earlier.
// if so we have to retrieve last_tid in another RPC. // if so we have to retrieve last_tid in another RPC.
if z.srv.ver < "5" { if z.link.ver < "5" {
rpc = z.rpc("lastTransaction") rpc = z.rpc("lastTransaction")
xlastTid, err = rpc.call(ctx) xlastTid, err = rpc.call(ctx)
if err != nil { if err != nil {
...@@ -412,7 +427,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -412,7 +427,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
} }
} }
lastTid, ok := zl.tidUnpack(xlastTid) lastTid, ok := zl.enc.asTid(xlastTid)
if !ok { if !ok {
return nil, zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xlastTid) return nil, zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xlastTid)
} }
...@@ -458,7 +473,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -458,7 +473,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
} }
func (z *zeo) Close() error { func (z *zeo) Close() error {
err := z.srv.Close() err := z.link.Close()
if z.watchq != nil { if z.watchq != nil {
close(z.watchq) close(z.watchq)
} }
......
...@@ -43,7 +43,7 @@ type ZEOSrv interface { ...@@ -43,7 +43,7 @@ type ZEOSrv interface {
Addr() string // unix-socket address of the server Addr() string // unix-socket address of the server
Close() error Close() error
Encoding() byte // encoding used on the wire - 'M' or 'Z' Encoding() encoding // encoding used on the wire - 'M' or 'Z'
} }
// ZEOPySrv represents running ZEO/py server. // ZEOPySrv represents running ZEO/py server.
...@@ -134,10 +134,10 @@ func (z *ZEOPySrv) Close() (err error) { ...@@ -134,10 +134,10 @@ func (z *ZEOPySrv) Close() (err error) {
return err return err
} }
func (z *ZEOPySrv) Encoding() byte { func (z *ZEOPySrv) Encoding() encoding {
encoding := byte('Z') enc := encoding('Z')
if z.opt.msgpack { encoding = byte('M') } if z.opt.msgpack { enc = encoding('M') }
return encoding return enc
} }
...@@ -227,8 +227,8 @@ func TestHandshake(t *testing.T) { ...@@ -227,8 +227,8 @@ func TestHandshake(t *testing.T) {
}() }()
ewant := zsrv.Encoding() ewant := zsrv.Encoding()
if zlink.encoding != ewant { if zlink.enc != ewant {
t.Fatalf("handshake: encoding=%c ; want %c", zlink.encoding, ewant) t.Fatalf("handshake: encoding=%c ; want %c", zlink.enc, ewant)
} }
}) })
} }
......
...@@ -73,8 +73,8 @@ type zLink struct { ...@@ -73,8 +73,8 @@ type zLink struct {
down1 sync.Once down1 sync.Once
errClose error // error got from .link.Close() errClose error // error got from .link.Close()
ver string // protocol version in use (without "Z" or "M" prefix) ver string // protocol version in use (without "Z" or "M" prefix)
encoding byte // protocol encoding in use ('Z' or 'M') enc encoding // protocol encoding in use ('Z' or 'M')
} }
// (called after handshake) // (called after handshake)
...@@ -120,7 +120,7 @@ func (zl *zLink) Close() error { ...@@ -120,7 +120,7 @@ func (zl *zLink) Close() error {
// serveRecv handles receives from underlying link and dispatches them to calls // serveRecv handles receives from underlying link and dispatches them to calls
// waiting results. // waiting for results, to notify and serve handlers .
func (zl *zLink) serveRecv() { func (zl *zLink) serveRecv() {
defer zl.serveWg.Done() defer zl.serveWg.Done()
for { for {
...@@ -143,7 +143,7 @@ func (zl *zLink) serveRecv() { ...@@ -143,7 +143,7 @@ func (zl *zLink) serveRecv() {
// serveRecv1 handles 1 incoming packet. // serveRecv1 handles 1 incoming packet.
func (zl *zLink) serveRecv1(pkb *pktBuf) error { func (zl *zLink) serveRecv1(pkb *pktBuf) error {
// decode packet // decode packet
m, err := zl.pktDecode(pkb) m, err := zl.enc.pktDecode(pkb)
if err != nil { if err != nil {
return err return err
} }
...@@ -245,7 +245,7 @@ func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) ( ...@@ -245,7 +245,7 @@ func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) (
zl.callMu.Unlock() zl.callMu.Unlock()
// (msgid, async, method, argv) // (msgid, async, method, argv)
pkb := zl.pktEncode(msg{ pkb := zl.enc.pktEncode(msg{
msgid: callID, msgid: callID,
flags: 0, flags: 0,
method: method, method: method,
...@@ -279,7 +279,7 @@ func (zl *zLink) reply(msgid int64, res interface{}) (err error) { ...@@ -279,7 +279,7 @@ func (zl *zLink) reply(msgid int64, res interface{}) (err error) {
} }
}() }()
pkb := zl.pktEncode(msg{ pkb := zl.enc.pktEncode(msg{
msgid: msgid, msgid: msgid,
flags: msgAsync, flags: msgAsync,
method: ".reply", method: ".reply",
...@@ -480,7 +480,7 @@ func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) { ...@@ -480,7 +480,7 @@ func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) {
} }
// use wire encoding preferred by server // use wire encoding preferred by server
encoding := proto[0] enc := encoding(proto[0])
// extract peer version from protocol string and choose actual // extract peer version from protocol string and choose actual
// version to use as min(peer, mybest) // version to use as min(peer, mybest)
...@@ -505,14 +505,14 @@ func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) { ...@@ -505,14 +505,14 @@ func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) {
// version selected - now send it back to server as // version selected - now send it back to server as
// corresponding handshake reply. // corresponding handshake reply.
pkb = allocPkb() pkb = allocPkb()
pkb.WriteString(fmt.Sprintf("%c%s", encoding, ver)) pkb.WriteString(fmt.Sprintf("%c%s", enc, ver))
err = zl.sendPkt(pkb) err = zl.sendPkt(pkb)
if err != nil { if err != nil {
return fmt.Errorf("tx: %s", err) return fmt.Errorf("tx: %s", err)
} }
zl.ver = ver zl.ver = ver
zl.encoding = encoding zl.enc = enc
close(hok) close(hok)
return nil return nil
}) })
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment