Commit 213b9f14 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3d0666ab
...@@ -87,7 +87,10 @@ func openClientByURL(u *url.URL) (zodb.IStorage, error) { ...@@ -87,7 +87,10 @@ func openClientByURL(u *url.URL) (zodb.IStorage, error) {
return nil, err return nil, err
} }
conn := storLink.NewConn() conn, err := storLink.NewConn()
if err != nil {
return nil, err // XXX err ctx ?
}
// TODO identify ourselves via conn // TODO identify ourselves via conn
......
...@@ -173,16 +173,18 @@ func (nl *NodeLink) newConn(connId uint32) *Conn { ...@@ -173,16 +173,18 @@ func (nl *NodeLink) newConn(connId uint32) *Conn {
} }
// NewConn creates new connection on top of node-node link // NewConn creates new connection on top of node-node link
func (nl *NodeLink) NewConn() *Conn { func (nl *NodeLink) NewConn() (*Conn, error) {
nl.connMu.Lock() nl.connMu.Lock()
defer nl.connMu.Unlock() defer nl.connMu.Unlock()
if nl.connTab == nil { if nl.connTab == nil {
// XXX -> error (because NodeLink can become "closed" due to IO errors ? if atomic.LoadUint32(&nl.closeCalled) != 0 {
panic("NewConn() on closed node-link") return nil, ErrLinkClosed
}
return nil, ErrLinkStopped
} }
c := nl.newConn(nl.nextConnId) c := nl.newConn(nl.nextConnId)
nl.nextConnId += 2 nl.nextConnId += 2
return c return c, nil
} }
// close is worker for Close & friends. // close is worker for Close & friends.
......
...@@ -60,6 +60,12 @@ func xclose(c io.Closer) { ...@@ -60,6 +60,12 @@ func xclose(c io.Closer) {
exc.Raiseif(err) exc.Raiseif(err)
} }
func xnewconn(nl *NodeLink) *Conn {
c, err := nl.NewConn()
exc.Raiseif(err)
return c
}
func xaccept(nl *NodeLink) *Conn { func xaccept(nl *NodeLink) *Conn {
c, err := nl.Accept() c, err := nl.Accept()
exc.Raiseif(err) exc.Raiseif(err)
...@@ -270,7 +276,7 @@ func TestNodeLink(t *testing.T) { ...@@ -270,7 +276,7 @@ func TestNodeLink(t *testing.T) {
// Close vs Recv // Close vs Recv
nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
c = nl1.NewConn() c = xnewconn(nl1)
wg = WorkGroup() wg = WorkGroup()
wg.Gox(func() { wg.Gox(func() {
tdelay() tdelay()
...@@ -286,7 +292,7 @@ func TestNodeLink(t *testing.T) { ...@@ -286,7 +292,7 @@ func TestNodeLink(t *testing.T) {
// Close vs Send // Close vs Send
nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
c = nl1.NewConn() c = xnewconn(nl1)
wg = WorkGroup() wg = WorkGroup()
wg.Gox(func() { wg.Gox(func() {
tdelay() tdelay()
...@@ -300,8 +306,8 @@ func TestNodeLink(t *testing.T) { ...@@ -300,8 +306,8 @@ func TestNodeLink(t *testing.T) {
xwait(wg) xwait(wg)
// NodeLink.Close vs Conn.Send/Recv // NodeLink.Close vs Conn.Send/Recv
c11 := nl1.NewConn() c11 := xnewconn(nl1)
c12 := nl1.NewConn() c12 := xnewconn(nl1)
wg = WorkGroup() wg = WorkGroup()
wg.Gox(func() { wg.Gox(func() {
pkt, err := c11.Recv() pkt, err := c11.Recv()
...@@ -325,9 +331,9 @@ func TestNodeLink(t *testing.T) { ...@@ -325,9 +331,9 @@ func TestNodeLink(t *testing.T) {
// NodeLink.Close vs Conn.Send/Recv on another side // NodeLink.Close vs Conn.Send/Recv on another side
nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
c11 = nl1.NewConn() c11 = xnewconn(nl1)
c12 = nl1.NewConn() c12 = xnewconn(nl1)
c13 := nl1.NewConn() c13 := xnewconn(nl1)
wg = WorkGroup() wg = WorkGroup()
var errRecv error var errRecv error
wg.Gox(func() { wg.Gox(func() {
...@@ -355,6 +361,12 @@ func TestNodeLink(t *testing.T) { ...@@ -355,6 +361,12 @@ func TestNodeLink(t *testing.T) {
// XXX denoise vvv // XXX denoise vvv
// NewConn after NodeLink stop
c, err = nl1.NewConn()
if err != ErrLinkStopped {
t.Fatalf("NewConn after NodeLink stop: %v", err)
}
// Recv/Send on another Conn // Recv/Send on another Conn
pkt, err = c13.Recv() pkt, err = c13.Recv()
if !(pkt == nil && err == errRecv) { if !(pkt == nil && err == errRecv) {
...@@ -397,9 +409,15 @@ func TestNodeLink(t *testing.T) { ...@@ -397,9 +409,15 @@ func TestNodeLink(t *testing.T) {
t.Fatalf("Conn.Send after NodeLink stop: %v", err) t.Fatalf("Conn.Send after NodeLink stop: %v", err)
} }
// NewConn after NodeLink close
c, err = nl1.NewConn()
if err != ErrLinkClosed {
t.Fatalf("NewConn after NodeLink close: %v", err)
}
xclose(c11) xclose(c11)
xclose(c12) xclose(c12)
// check Recv/Send error after Close & NodeLink shutdown // Recv/Send error after Close & NodeLink shutdown
pkt, err = c11.Recv() pkt, err = c11.Recv()
if !(pkt == nil && err == ErrClosedConn) { if !(pkt == nil && err == ErrClosedConn) {
t.Fatalf("Conn.Recv after close and NodeLink close: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.Recv after close and NodeLink close: pkt = %v err = %v", pkt, err)
...@@ -429,7 +447,7 @@ func TestNodeLink(t *testing.T) { ...@@ -429,7 +447,7 @@ func TestNodeLink(t *testing.T) {
xclose(c) xclose(c)
}) })
c = nl1.NewConn() c = xnewconn(nl1)
xsend(c, mkpkt(33, []byte("ping"))) xsend(c, mkpkt(33, []byte("ping")))
pkt = xrecv(c) pkt = xrecv(c)
xverifyPkt(pkt, c.connId, 34, []byte("pong")) xverifyPkt(pkt, c.connId, 34, []byte("pong"))
...@@ -477,8 +495,8 @@ func TestNodeLink(t *testing.T) { ...@@ -477,8 +495,8 @@ func TestNodeLink(t *testing.T) {
} }
}) })
c1 := nl1.NewConn() c1 := xnewconn(nl1)
c2 := nl1.NewConn() c2 := xnewconn(nl1)
xsend(c1, mkpkt(1, []byte(""))) xsend(c1, mkpkt(1, []byte("")))
xsend(c2, mkpkt(2, []byte(""))) xsend(c2, mkpkt(2, []byte("")))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment