Commit af22b35a authored by Mitchell Hashimoto's avatar Mitchell Hashimoto

packer/rpc: MuxConn writes don't block the whole loop

parent 68e51de0
...@@ -175,6 +175,7 @@ func (m *MuxConn) openStream(id uint32) (*Stream, error) { ...@@ -175,6 +175,7 @@ func (m *MuxConn) openStream(id uint32) (*Stream, error) {
// Create the stream object and channel where data will be sent to // Create the stream object and channel where data will be sent to
dataR, dataW := io.Pipe() dataR, dataW := io.Pipe()
writeCh := make(chan []byte, 10)
// Set the data channel so we can write to it. // Set the data channel so we can write to it.
stream := &Stream{ stream := &Stream{
...@@ -182,9 +183,21 @@ func (m *MuxConn) openStream(id uint32) (*Stream, error) { ...@@ -182,9 +183,21 @@ func (m *MuxConn) openStream(id uint32) (*Stream, error) {
mux: m, mux: m,
reader: dataR, reader: dataR,
writer: dataW, writer: dataW,
writeCh: writeCh,
} }
stream.setState(streamStateClosed) stream.setState(streamStateClosed)
// Start the goroutine that will read from the queue and write
// data out.
go func() {
for {
data := <-writeCh
if _, err := dataW.Write(data); err != nil {
return
}
}
}()
m.streams[id] = stream m.streams[id] = stream
return m.streams[id], nil return m.streams[id], nil
} }
...@@ -256,7 +269,11 @@ func (m *MuxConn) loop() { ...@@ -256,7 +269,11 @@ func (m *MuxConn) loop() {
case muxPacketData: case muxPacketData:
stream.mu.Lock() stream.mu.Lock()
if stream.state == streamStateEstablished { if stream.state == streamStateEstablished {
stream.writer.Write(data) select {
case stream.writeCh <- data:
default:
log.Printf("[ERR] Failed to write data, buffer full: %d", id)
}
} else { } else {
log.Printf("[ERR] Data received for stream in state: %d", stream.state) log.Printf("[ERR] Data received for stream in state: %d", stream.state)
} }
...@@ -293,6 +310,7 @@ type Stream struct { ...@@ -293,6 +310,7 @@ type Stream struct {
state streamState state streamState
stateUpdated time.Time stateUpdated time.Time
mu sync.Mutex mu sync.Mutex
writeCh chan<- []byte
} }
type streamState byte type streamState byte
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment