Commit 2b9e09b0 authored by isaacs's avatar isaacs Committed by indexzero

Add flow control

Not in the "async/fibers/coro" sense of flow control, but in the TCP
backpressure sense.

Pause the stream when a write isn't flushed, and then resume it once the
writable stream drains.
parent 967884c5
...@@ -539,7 +539,7 @@ HttpProxy.prototype.proxyRequest = function (req, res, options) { ...@@ -539,7 +539,7 @@ HttpProxy.prototype.proxyRequest = function (req, res, options) {
response.on('data', function (chunk) { response.on('data', function (chunk) {
if (req.method !== 'HEAD' && res.writable) { if (req.method !== 'HEAD' && res.writable) {
try { try {
res.write(chunk); var flushed = res.write(chunk);
} catch (er) { } catch (er) {
console.error("res.write error: %s", er.message); console.error("res.write error: %s", er.message);
try { try {
...@@ -547,8 +547,15 @@ HttpProxy.prototype.proxyRequest = function (req, res, options) { ...@@ -547,8 +547,15 @@ HttpProxy.prototype.proxyRequest = function (req, res, options) {
} catch (er) { } catch (er) {
console.error("res.end error: %s", er.message); console.error("res.end error: %s", er.message);
} }
return;
} }
} }
if (!flushed) {
response.pause();
res.once('drain', function () {
response.resume();
});
}
}); });
// When the `reverseProxy` `response` ends, end the // When the `reverseProxy` `response` ends, end the
...@@ -578,7 +585,13 @@ HttpProxy.prototype.proxyRequest = function (req, res, options) { ...@@ -578,7 +585,13 @@ HttpProxy.prototype.proxyRequest = function (req, res, options) {
// `req` write it to the `reverseProxy` request. // `req` write it to the `reverseProxy` request.
req.on('data', function (chunk) { req.on('data', function (chunk) {
if (!errState) { if (!errState) {
reverseProxy.write(chunk); var flushed = reverseProxy.write(chunk);
if (!flushed) {
req.pause();
reverseProxy.once('drain', function () {
req.resume();
});
}
} }
}); });
...@@ -646,7 +659,13 @@ HttpProxy.prototype._forwardRequest = function (req) { ...@@ -646,7 +659,13 @@ HttpProxy.prototype._forwardRequest = function (req) {
// Chunk the client request body as chunks from the proxied request come in // Chunk the client request body as chunks from the proxied request come in
req.on('data', function (chunk) { req.on('data', function (chunk) {
forwardProxy.write(chunk); var flushed = forwardProxy.write(chunk);
if (!flushed) {
req.pause();
forwardProxy.once('drain', function () {
req.resume();
});
}
}) })
// At the end of the client request, we are going to stop the proxied request // At the end of the client request, we are going to stop the proxied request
...@@ -741,7 +760,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options ...@@ -741,7 +760,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options
if (reverseProxy.incoming.socket.writable) { if (reverseProxy.incoming.socket.writable) {
try { try {
self.emit('websocket:outgoing', req, socket, head, data); self.emit('websocket:outgoing', req, socket, head, data);
reverseProxy.incoming.socket.write(data); var flushed = reverseProxy.incoming.socket.write(data);
if (!flushed) {
proxySocket.pause();
reverseProxy.incoming.socket.once('drain', function () {
proxySocket.resume();
});
}
} }
catch (e) { catch (e) {
detach(); detach();
...@@ -758,7 +783,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options ...@@ -758,7 +783,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options
reverseProxy.incoming.socket.on('data', listeners.onOutgoing = function(data) { reverseProxy.incoming.socket.on('data', listeners.onOutgoing = function(data) {
try { try {
self.emit('websocket:incoming', reverseProxy, reverseProxy.incoming, head, data); self.emit('websocket:incoming', reverseProxy, reverseProxy.incoming, head, data);
proxySocket.write(data); var flushed = proxySocket.write(data);
if (!flushed) {
reverseProxy.incoming.socket.pause();
proxySocket.once('drain', function () {
reverseProxy.incoming.socket.resume();
});
}
} }
catch (e) { catch (e) {
detach(); detach();
...@@ -918,7 +949,14 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options ...@@ -918,7 +949,14 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options
// //
self.emit('websocket:handshake', req, socket, head, sdata, data); self.emit('websocket:handshake', req, socket, head, sdata, data);
socket.write(sdata); socket.write(sdata);
socket.write(data); var flushed = socket.write(data);
if (!flushed) {
reverseProxy.socket.pause();
socket.once('drain', function () {
reverseProxy.socket.resume();
});
}
} }
catch (ex) { catch (ex) {
proxyError(ex) proxyError(ex)
...@@ -935,9 +973,9 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options ...@@ -935,9 +973,9 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options
reverseProxy.on('error', proxyError); reverseProxy.on('error', proxyError);
try { try {
//
// Attempt to write the upgrade-head to the reverseProxy request. // Attempt to write the upgrade-head to the reverseProxy request.
// // This is small, and there's only ever one of it.
// No need for pause/resume.
reverseProxy.write(head); reverseProxy.write(head);
} }
catch (ex) { catch (ex) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment