Commit b0b0183c authored by indexzero's avatar indexzero

[api] Update WebSocket support to use http.Agent APIs

parent 5681fc1a
......@@ -136,7 +136,7 @@ exports.createServer = function () {
server.on('upgrade', function(req, socket, head) {
// Tunnel websocket requests too
proxy.proxyWebSocketRequest(port, host);
proxy.proxyWebSocketRequest(req, socket, head, port, host);
});
}
......@@ -444,22 +444,11 @@ HttpProxy.prototype._forwardRequest = function (req) {
});
};
HttpProxy.prototype.proxyWebSocketRequest = function (port, server, host, data) {
var self = this, req = self.req, socket = self.sock, head = self.head,
headers = new _headers(req.headers), CRLF = '\r\n';
// Will generate clone of headers
// To not change original
function _headers(headers) {
var h = {};
for (var i in headers) {
h[i] = headers[i];
}
return h;
}
HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, port, host, buffer) {
var self = this, CRLF = '\r\n';
// WebSocket requests has method = GET
if (req.method !== 'GET' || headers.upgrade.toLowerCase() !== 'websocket') {
if (req.method !== 'GET' || req.headers.upgrade.toLowerCase() !== 'websocket') {
// This request is not WebSocket request
return;
}
......@@ -467,68 +456,99 @@ HttpProxy.prototype.proxyWebSocketRequest = function (port, server, host, data)
// Turn of all bufferings
// For server set KeepAlive
// For client set encoding
function _socket(socket, server) {
function _socket(socket, keepAlive) {
socket.setTimeout(0);
socket.setNoDelay(true);
if (server) {
if (keepAlive) {
socket.setKeepAlive(true, 0);
}
else {
socket.setEncoding('utf8');
}
}
function onUpgrade(reverseProxy) {
var listeners = {};
// We're now connected to the server, so lets change server socket
reverseProxy.on('data', listeners._r_data = function(data) {
// Pass data to client
if (socket.writable) {
try {
socket.write(data);
}
catch (e) {
socket.end();
reverseProxy.end();
}
}
});
socket.on('data', listeners._data = function(data) {
// Pass data from client to server
try {
reverseProxy.write(data);
}
catch (e) {
reverseProxy.end();
socket.end();
}
});
// Detach event listeners from reverseProxy
function detach() {
reverseProxy.removeListener('close', listeners._r_close);
reverseProxy.removeListener('data', listeners._r_data);
socket.removeListener('data', listeners._data);
socket.removeListener('close', listeners._close);
}
// Hook disconnections
reverseProxy.on('end', listeners._r_close = function() {
socket.end();
detach();
});
socket.on('end', listeners._close = function() {
reverseProxy.end();
detach();
});
};
// Client socket
_socket(socket);
// If host is undefined
// Get it from headers
if (!host) {
host = headers.Host;
}
// Remote host address
var remote_host = server + (port - 80 === 0 ? '' : ':' + port);
var remoteHost = host + (port - 80 === 0 ? '' : ':' + port),
agent = _getAgent(host, port);
// Change headers
headers.Host = remote_host;
headers.Origin = 'http://' + remote_host;
// Open request
var p = manager.getPool(port, server);
p.getClient(function(client) {
// Based on 'pool/main.js'
var request = client.request('GET', req.url, headers);
var errorListener = function (error) {
client.removeListener('error', errorListener);
// Remove the client from the pool's available clients since it has errored
p.clients.splice(p.clients.indexOf(client), 1);
socket.end();
}
// Not disconnect on update
client.on('upgrade', function(request, remote_socket, head) {
// Prepare socket
_socket(remote_socket, true);
req.headers.host = remoteHost;
req.headers.origin = 'http://' + host;
var opts = {
host: host,
port: port,
agent: agent,
method: 'GET',
path: req.url,
headers: req.headers
}
// Emit event
onUpgrade(remote_socket);
});
// Make the outgoing WebSocket request
var request = http.request(opts, function () { });
// Not disconnect on update
agent.on('upgrade', function(request, remoteSocket, head) {
// Prepare socket
_socket(remoteSocket, true);
client.on('error', errorListener);
request.on('response', function (response) {
response.on('end', function () {
client.removeListener('error', errorListener);
client.busy = false;
p.onFree(client);
})
})
client.busy = true;
var handshake;
// Emit event
onUpgrade(remoteSocket);
});
var handshake;
if (typeof request.socket !== 'undefined') {
request.socket.on('data', handshake = function(data) {
// Handshaking
......@@ -547,8 +567,8 @@ HttpProxy.prototype.proxyWebSocketRequest = function (port, server, host, data)
data = data.slice(Buffer.byteLength(sdata), data.length);
// Replace host and origin
sdata = sdata.replace(remote_host, host)
.replace(remote_host, host);
sdata = sdata.replace(remoteHost, host)
.replace(remoteHost, host);
try {
// Write printable
......@@ -570,65 +590,19 @@ HttpProxy.prototype.proxyWebSocketRequest = function (port, server, host, data)
// Remove data listener now that the 'handshake' is complete
request.socket.removeListener('data', handshake);
});
}
// Write upgrade-head
try {
request.write(head);
}
catch(e) {
request.end();
socket.end();
}
self.unwatch(socket);
});
// Request
function onUpgrade(reverse_proxy) {
var listeners = {};
// We're now connected to the server, so lets change server socket
reverse_proxy.on('data', listeners._r_data = function(data) {
// Pass data to client
if (socket.writable) {
try {
socket.write(data);
}
catch (e) {
socket.end();
reverse_proxy.end();
}
}
});
socket.on('data', listeners._data = function(data) {
// Pass data from client to server
try {
reverse_proxy.write(data);
}
catch (e) {
reverse_proxy.end();
socket.end();
}
});
// Detach event listeners from reverse_proxy
function detach() {
reverse_proxy.removeListener('close', listeners._r_close);
reverse_proxy.removeListener('data', listeners._r_data);
socket.removeListener('data', listeners._data);
socket.removeListener('close', listeners._close);
}
// Hook disconnections
reverse_proxy.on('end', listeners._r_close = function() {
socket.end();
detach();
});
socket.on('end', listeners._close = function() {
reverse_proxy.end();
detach();
});
};
// Write upgrade-head
try {
request.write(head);
}
catch (ex) {
request.end();
socket.end();
}
// If we have been passed buffered data, resume it.
if (buffer && !errState) {
buffer.resume();
}
};
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment