Commit 60c4ddbe authored by Kirill Smelkov's avatar Kirill Smelkov

X go/zodb/zeo: Add support for protocols preceding ZEO5

In particular try to support ZEO4:

- during handshake we now first wait for remote server to announce its
  preferred protocol, and only then send the version we select to use.
  This is the procedure original ZEO server-client do.

- teach rpc.call to decode exceptions not only for how ZEO5 encodes them
  (marking via 2 flag in "async" field), but also on how ZEO4 and
  earlier encode them: via replying with (exc_type, exc_inst) and
  expecting client to dynamically check exc_type is a subtype of
  Exception.

- handle other protocol differences - e.g. ZEO5 returns last_tid on
  register(), while earlier versions return nothing there.
parent 0edd5129
...@@ -25,6 +25,7 @@ import ( ...@@ -25,6 +25,7 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"net/url" "net/url"
"strings"
"sync" "sync"
pickle "github.com/kisielk/og-rek" pickle "github.com/kisielk/og-rek"
...@@ -38,7 +39,7 @@ import ( ...@@ -38,7 +39,7 @@ import (
type zeo struct { type zeo struct {
srv *zLink srv *zLink
// state we get from server by way of server notificatons. // state we get from server by way of server notifications.
mu sync.Mutex mu sync.Mutex
lastTid zodb.Tid lastTid zodb.Tid
...@@ -136,43 +137,140 @@ func (r rpc) call(ctx context.Context, argv ...interface{}) (interface{}, error) ...@@ -136,43 +137,140 @@ func (r rpc) call(ctx context.Context, argv ...interface{}) (interface{}, error)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if reply.flags & msgExcept == 0 {
return reply.arg, nil
}
// exception - let's decode it if r.zl.ver >= "5" {
// ('type', (arg1, arg2, arg3, ...)) // in ZEO5 exceptions are marked via flag
texc, ok := reply.arg.(pickle.Tuple) if (reply.flags & msgExcept != 0) {
if !ok || len(texc) != 2 { return nil, r.zeo5Error(reply.arg)
return nil, r.ereplyf("except: got %#v; expect 2-tuple", reply.arg) }
} else {
// in ZEO < 5 exceptions are represented by returning
// (exc_class, exc_inst) - check it
err = r.zeo4Error(reply.arg)
if err != nil {
return nil, err
}
} }
exc, ok1 := texc[0].(string) // it is not an exception
argv, ok2 := texc[1].(pickle.Tuple) return reply.arg, nil
if !(ok1 && ok2) { }
return nil, r.ereplyf("except: got (%T, %T); expect (str, tuple)", texc...)
}
// excError returns error corresponding to an exception.
//
// well-known exceptions are mapped to corresponding well-known errors - e.g.
// POSKeyError -> zodb.NoObjectError, and rest are returned wrapper into rpcExcept.
func (r rpc) excError(exc string, argv []interface{}) error {
// translate well-known exceptions // translate well-known exceptions
switch exc { switch exc {
case "ZODB.POSException.POSKeyError": case "ZODB.POSException.POSKeyError":
// POSKeyError(oid) // POSKeyError(oid)
if len(argv) != 1 { if len(argv) != 1 {
return nil, r.ereplyf("poskeyerror: got %#v; expect 1-tuple", argv...) return r.ereplyf("poskeyerror: got %#v; expect 1-tuple", argv...)
} }
oid, ok := oidUnpack(argv[0]) oid, ok := oidUnpack(argv[0])
if !ok { if !ok {
return nil, r.ereplyf("poskeyerror: got (%v); expect (oid)", argv[0]) return r.ereplyf("poskeyerror: got (%v); expect (oid)", argv[0])
} }
// XXX POSKeyError does not allow to distinguish whether it is // XXX POSKeyError does not allow to distinguish whether it is
// no object at all or object exists and its data was not found // no object at all or object exists and its data was not found
// for tid_before. IOW we cannot translate to zodb.NoDataError // for tid_before. IOW we cannot translate to zodb.NoDataError
return nil, &zodb.NoObjectError{Oid: oid} return &zodb.NoObjectError{Oid: oid}
} }
return nil, &rpcExcept{exc, argv} return &rpcExcept{exc, argv}
}
// zeo5Error decodes arg of reply with msgExcept flag set and returns
// corresponding error.
func (r rpc) zeo5Error(arg interface{}) error {
// ('type', (arg1, arg2, arg3, ...))
texc, ok := arg.(pickle.Tuple)
if !ok || len(texc) != 2 {
return r.ereplyf("except5: got %#v; expect 2-tuple", arg)
}
exc, ok1 := texc[0].(string)
argv, ok2 := texc[1].(pickle.Tuple)
if !(ok1 && ok2) {
return r.ereplyf("except5: got (%T, %T); expect (str, tuple)", texc...)
}
return r.excError(exc, argv)
}
// zeo4Error checks whether arg corresponds to exceptional reply, and if
// yes, decodes it into corresponding error.
//
// nil is returned if arg does not represent an exception.
func (r rpc) zeo4Error(arg interface{}) error {
// (exc_class, exc_inst), e.g.
// ogórek.Tuple{
// ogórek.Class{Module:"ZODB.POSException", Name:"POSKeyError"},
// ogórek.Call{
// Callable: ogórek.Class{Module:"ZODB.POSException", Name:"_recon"},
// Args: ogórek.Tuple{
// ogórek.Class{Module:"ZODB.POSException", Name:"POSKeyError"},
// map[interface {}]interface {}{
// "args":ogórek.Tuple{"\x00\x00\x00\x00\x00\x00\bP"}
// }
// }
// }
// }
targ, ok := arg.(pickle.Tuple)
if !ok || len(targ) != 2 {
return nil
}
klass, ok := targ[0].(pickle.Class)
if !ok || !isPyExceptClass(klass) {
return nil
}
exc := klass.Module + "." + klass.Name
// it is exception
call, ok := targ[1].(pickle.Call)
if !ok {
// not a call - the best we can do is to guess
return r.ereplyf("excep4: %s: inst %#v; expect call", exc, targ[1:])
}
exc = call.Callable.Module + "." + call.Callable.Name
argv := call.Args
if exc == "ZODB.POSException._recon" {
// args: (class, state)
if len(argv) != 2 {
return r.ereplyf("except4: %s: got %#v; expect 2-tuple", exc, argv)
}
klass, ok1 := argv[0].(pickle.Class)
state, ok2 := argv[1].(map[interface{}]interface{})
if !(ok1 && ok2) {
return r.ereplyf("except4: %s: got (%T, %T); expect (class, dict)", exc, argv[0], argv[1])
}
args, ok := state["args"].(pickle.Tuple)
if !ok {
return r.ereplyf("except4: %s: state.args = %#v; expect tuple", exc, state["args"])
}
exc = klass.Module + "." + klass.Name
argv = args
}
return r.excError(exc, argv)
}
// isPyExceptClass returns whether klass represents python exception
func isPyExceptClass(klass pickle.Class) bool {
// XXX this is approximation
if strings.HasSuffix(klass.Name, "Error") {
return true
}
return false
} }
func (r rpc) ereplyf(format string, argv ...interface{}) *errorUnexpectedReply { func (r rpc) ereplyf(format string, argv ...interface{}) *errorUnexpectedReply {
...@@ -229,6 +327,16 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (_ zodb.I ...@@ -229,6 +327,16 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (_ zodb.I
return nil, err return nil, err
} }
// register returns last_tid in ZEO5 but nothing earlier.
// if so we have to retrieve last_tid in another RPC.
if z.srv.ver < "5" {
rpc = z.rpc("lastTransaction")
xlastTid, err = rpc.call(ctx)
if err != nil {
return nil, err
}
}
lastTid, ok := tidUnpack(xlastTid) // XXX -> xlastTid -> scan lastTid, ok := tidUnpack(xlastTid) // XXX -> xlastTid -> scan
if !ok { if !ok {
return nil, rpc.ereplyf("got %v; expect tid", xlastTid) return nil, rpc.ereplyf("got %v; expect tid", xlastTid)
......
...@@ -42,10 +42,14 @@ import ( ...@@ -42,10 +42,14 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb/internal/pickletools" "lab.nexedi.com/kirr/neo/go/zodb/internal/pickletools"
) )
const ( const pktHeaderLen = 4
protocolVersion = "Z5"
pktHeaderLen = 4 // we can speak this protocol versions
) var protoVersions = []string{
"3101", // last in ZEO3 series
"4", // no longer call load.
"5", // current in ZEO5 series.
}
// zLink is ZEO connection between client (local end) and server (remote end). // zLink is ZEO connection between client (local end) and server (remote end).
...@@ -66,6 +70,8 @@ type zLink struct { ...@@ -66,6 +70,8 @@ type zLink struct {
serveWg sync.WaitGroup // for serveRecv serveWg sync.WaitGroup // for serveRecv
down1 sync.Once down1 sync.Once
errClose error // error got from .link.Close() errClose error // error got from .link.Close()
ver string // protocol verision in use (without "Z" or "M" prefix)
} }
// (called after handshake) // (called after handshake)
...@@ -77,7 +83,7 @@ func (zl *zLink) start() { ...@@ -77,7 +83,7 @@ func (zl *zLink) start() {
var errLinkClosed = errors.New("zlink is closed") var errLinkClosed = errors.New("zlink is closed")
// shutdown shuts zlink down and sets errror (XXX) which // shutdown shuts zlink down and sets error (XXX) which
func (zl *zLink) shutdown(err error) { func (zl *zLink) shutdown(err error) {
zl.down1.Do(func() { zl.down1.Do(func() {
// XXX what with err? // XXX what with err?
...@@ -164,7 +170,7 @@ type msg struct { ...@@ -164,7 +170,7 @@ type msg struct {
type msgFlags int64 type msgFlags int64
const ( const (
msgAsync msgFlags = 1 // message does not need a reply msgAsync msgFlags = 1 // message does not need a reply
msgExcept = 2 // exception was raised on remote side msgExcept = 2 // exception was raised on remote side (ZEO5)
) )
func derrf(format string, argv ...interface{}) error { func derrf(format string, argv ...interface{}) error {
...@@ -265,7 +271,7 @@ func (zl *zLink) _call(ctx context.Context, method string, argv ...interface{}) ...@@ -265,7 +271,7 @@ func (zl *zLink) _call(ctx context.Context, method string, argv ...interface{})
// ---- raw IO ---- // ---- raw IO ----
// pktBuf is buffer for preparing outgoind packet. // pktBuf is buffer for preparing outgoing packet.
// //
// alloc via allocPkb and free via pkb.Free. // alloc via allocPkb and free via pkb.Free.
// similar to skb in Linux. // similar to skb in Linux.
...@@ -273,7 +279,7 @@ type pktBuf struct { ...@@ -273,7 +279,7 @@ type pktBuf struct {
data []byte data []byte
} }
// Fixup fixes packet length in header acccording to current packet data. // Fixup fixes packet length in header according to current packet data.
func (pkb *pktBuf) Fixup() { func (pkb *pktBuf) Fixup() {
binary.BigEndian.PutUint32(pkb.data, uint32(len(pkb.data) - pktHeaderLen)) binary.BigEndian.PutUint32(pkb.data, uint32(len(pkb.data) - pktHeaderLen))
} }
...@@ -424,25 +430,54 @@ func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) { ...@@ -424,25 +430,54 @@ func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) {
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
// tx/rx handshake packet // rx/tx handshake packet
wg.Go(func() error { wg.Go(func() error {
pkb := allocPkb() // server first announces its preferred protocol
pkb.WriteString(protocolVersion) // it is e.g. "M5", "Z5", "Z4", "Z3101", ...
err = zl.sendPkt(pkb) pkb, err := zl.recvPkt()
if err != nil { if err != nil {
return err return fmt.Errorf("rx: %s", err)
} }
pkb, err = zl.recvPkt() proto := string(pkb.Payload())
if err != nil {
return err
}
rxver := string(pkb.Payload())
pkb.Free() pkb.Free()
if rxver != protocolVersion { if !(len(proto) >= 2 && (proto[0] == 'Z' || proto[0] == 'M')) {
return fmt.Errorf("version mismatch: remote=%q, my=%q", rxver, protocolVersion) return fmt.Errorf("rx: invalid peer handshake: %q", proto)
}
// even if server announced it prefers 'M' (msgpack) it will
// accept 'Z' (pickles) as encoding. We always use 'Z'.
//
// extract peer version from protocol string and choose actual
// version to use as min(peer, mybest)
ver := proto[1:]
myBest := protoVersions[len(protoVersions)-1]
if ver > myBest {
ver = myBest
}
// verify ver is among protocol versions that we support.
there := false
for _, weSupport := range protoVersions {
if ver == weSupport {
there = true
break
}
}
if !there {
return fmt.Errorf("rx: unsupported peer version: %q", proto)
}
// version selected - now send it back to server as
// corresponding handshake reply.
pkb = allocPkb()
pkb.WriteString("Z" + ver)
err = zl.sendPkt(pkb)
if err != nil {
return fmt.Errorf("tx: %s", err)
} }
zl.ver = ver
close(hok) close(hok)
return nil return nil
}) })
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment