Commit ed46b665 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 46fb8ddd
...@@ -60,9 +60,16 @@ type zLink struct { ...@@ -60,9 +60,16 @@ type zLink struct {
callID int64 // ID for next call; incremented at every call callID int64 // ID for next call; incremented at every call
// methods peer can invoke // methods peer can invoke
methTab map[string]func(interface{}) // methods are served in parallel
serveTab map[string]func(context.Context, interface{})interface{}
// notifications peer can send
// notifications are invoked in order
notifyTab map[string]func(interface{})
serveWg sync.WaitGroup // for serveRecv and serveTab spawned from it
serveCtx context.Context // serveTab handlers are called with this ctx
serveCancel func() // to cancel serveCtx
serveWg sync.WaitGroup // for serveRecv
down1 sync.Once down1 sync.Once
errClose error // error got from .link.Close() errClose error // error got from .link.Close()
...@@ -82,7 +89,12 @@ var errLinkClosed = errors.New("zlink is closed") ...@@ -82,7 +89,12 @@ var errLinkClosed = errors.New("zlink is closed")
// shutdown shuts zlink down and sets error (XXX) which // shutdown shuts zlink down and sets error (XXX) which
func (zl *zLink) shutdown(err error) { func (zl *zLink) shutdown(err error) {
zl.down1.Do(func() { zl.down1.Do(func() {
// XXX what with err? if err != nil {
log.Printf("%s: %s", zl.link.RemoteAddr(), err)
// XXX what else to do with err?
}
zl.serveCancel()
zl.errClose = zl.link.Close() zl.errClose = zl.link.Close()
// notify call waiters // notify call waiters
...@@ -120,11 +132,7 @@ func (zl *zLink) serveRecv() { ...@@ -120,11 +132,7 @@ func (zl *zLink) serveRecv() {
err = zl.serveRecv1(pkb) err = zl.serveRecv1(pkb)
pkb.Free() pkb.Free()
// XXX ratelimit / only incstat?
// XXX -> shutdown zlink on error.
if err != nil { if err != nil {
log.Printf("%s: rx: %s", zl.link.RemoteAddr(), err)
zl.shutdown(err) zl.shutdown(err)
return return
} }
...@@ -139,8 +147,7 @@ func (zl *zLink) serveRecv1(pkb *pktBuf) error { ...@@ -139,8 +147,7 @@ func (zl *zLink) serveRecv1(pkb *pktBuf) error {
return err return err
} }
// "invalidateTransaction", tid, oidv // message is reply
if m.method == ".reply" { if m.method == ".reply" {
// lookup call by msgid and dispatch result to waiter // lookup call by msgid and dispatch result to waiter
zl.callMu.Lock() zl.callMu.Lock()
...@@ -158,17 +165,42 @@ func (zl *zLink) serveRecv1(pkb *pktBuf) error { ...@@ -158,17 +165,42 @@ func (zl *zLink) serveRecv1(pkb *pktBuf) error {
return nil return nil
} }
// message is notification
// XXX currently only async/ no other flags handled if m.flags & msgAsync != 0 {
f := zl.methTab[m.method] // notifications go in-order
f := zl.notifyTab[m.method]
if f == nil { if f == nil {
// XXX reply "unknown method" if reply is possible return fmt.Errorf(".%d: unknown notification %q", m.msgid, m.method)
// XXX return error if reply is not possible
// XXX (ZEO/py always disconnects on error)
return fmt.Errorf(".%d: unknown method=%q", m.msgid, m.method)
} }
f(m.arg) f(m.arg)
return nil
}
// message is call
// calls are served in parallel
f := zl.serveTab[m.method]
if f == nil {
// disconnect on call to unknown method
err = fmt.Errorf("unknown method %q", m.method)
// XXX error -> exception
zl.reply(m.msgid, err) // ignore error
return fmt.Errorf(".%d: %s", m.msgid, err)
}
zl.serveWg.Add(1)
go func() {
defer zl.serveWg.Done()
res := f(zl.serveCtx, m.arg)
// XXX error -> exception
// send result back
err := zl.reply(m.msgid, res)
if err != nil {
zl.shutdown(err)
}
}()
return nil return nil
} }
...@@ -191,16 +223,13 @@ const ( ...@@ -191,16 +223,13 @@ const (
// Call makes 1 RPC call to server, waits for reply and returns it. // Call makes 1 RPC call to server, waits for reply and returns it.
func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) (reply msg, _ error) { func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) (reply msg, err error) {
// defer func() ... defer func() {
reply, err := zl._call(ctx, method, argv...)
if err != nil { if err != nil {
err = fmt.Errorf("%s: call %s: %s", zl.link.RemoteAddr(), method, err) err = fmt.Errorf("%s: call %s: %s", zl.link.RemoteAddr(), method, err)
} }
return reply, err }()
}
func (zl *zLink) _call(ctx context.Context, method string, argv ...interface{}) (reply msg, _ error) {
rxc := make(chan msg, 1) // reply will go here rxc := make(chan msg, 1) // reply will go here
// register our call // register our call
...@@ -223,7 +252,7 @@ func (zl *zLink) _call(ctx context.Context, method string, argv ...interface{}) ...@@ -223,7 +252,7 @@ func (zl *zLink) _call(ctx context.Context, method string, argv ...interface{})
}) })
// ok, pkt is ready to go // ok, pkt is ready to go
err := zl.sendPkt(pkb) // XXX ctx cancel err = zl.sendPkt(pkb) // XXX ctx cancel
if err != nil { if err != nil {
return msg{}, err return msg{}, err
} }
...@@ -242,6 +271,23 @@ func (zl *zLink) _call(ctx context.Context, method string, argv ...interface{}) ...@@ -242,6 +271,23 @@ func (zl *zLink) _call(ctx context.Context, method string, argv ...interface{})
return reply, nil return reply, nil
} }
func (zl *zLink) reply(msgid int64, res interface{}) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("%s: .%d reply: %s", zl.link.RemoteAddr(), msgid, err)
}
}()
pkb := zl.pktEncode(msg{
msgid: msgid,
flags: msgAsync,
method: ".reply",
arg: res,
})
return zl.sendPkt(pkb)
}
// RegisterMethod registers f to be called when remote XXX // RegisterMethod registers f to be called when remote XXX
// FIXME -> provide methodTable to dial, so that it is available right from start without any race // FIXME -> provide methodTable to dial, so that it is available right from start without any race
func (zl *zLink) RegisterMethod(method string, f func(arg interface{})) { func (zl *zLink) RegisterMethod(method string, f func(arg interface{})) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment