Commit 9e1c3858 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f456cfaa
...@@ -179,9 +179,11 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -179,9 +179,11 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
Mlink := Mconn.Link() Mlink := Mconn.Link()
// close Mlink on return / cancel // close Mlink on return / cancel
retch := make(chan struct{})
defer func() { defer func() {
err2 := Mlink.Close() err2 := Mlink.Close()
err = xerr.First(err, err2) err = xerr.First(err, err2)
close(retch)
}() }()
// XXX add master UUID -> nodeTab ? or master will notify us with it himself ? // XXX add master UUID -> nodeTab ? or master will notify us with it himself ?
...@@ -200,7 +202,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -200,7 +202,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// accept next connection from master. only 1 connection is served at any given time. // accept next connection from master. only 1 connection is served at any given time.
// every new connection from master means talk over previous connection is cancelled. // every new connection from master means talk over previous connection is cancelled.
// XXX recheck compatibility with py // XXX recheck compatibility with py
acceptq := make(chan *neo.Conn, 1) acceptq := make(chan *neo.Conn)
go func () { go func () {
for { for {
conn, err := Mlink.Accept() conn, err := Mlink.Accept()
...@@ -208,7 +210,11 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -208,7 +210,11 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
log.Error(ctx, err) log.Error(ctx, err)
return return
} }
acceptq <- conn
select {
case acceptq <- conn:
case <-retch:
}
} }
}() }()
...@@ -243,18 +249,18 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -243,18 +249,18 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
talkq <- talk() talkq <- talk()
}() }()
// talk finished / next connection / cancel // next connection / talk finished / cancel
select { select {
case conn := <-acceptq:
lclose(ctx, Mconn) // wakeup/cancel current talk
<-talkq // wait till it finish
Mconn = conn // proceed next cycle on accepted conn
case err = <-talkq: case err = <-talkq:
// XXX check for shutdown command // XXX check for shutdown command
lclose(ctx, Mconn) lclose(ctx, Mconn)
Mconn = nil // now wait for accept to get next Mconn Mconn = nil // now wait for accept to get next Mconn
case conn := <-acceptq:
lclose(ctx, Mconn) // wakeup/cancel current talk
<-talkq
Mconn = conn
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment