Commit ad2c966f authored by 's avatar

Update websocket-server to use chrome.sockets API.

The chrome.socket API is deprecated since Chrome 33.
parent ccbc1ab8
...@@ -6,13 +6,98 @@ ...@@ -6,13 +6,98 @@
var http = function() { var http = function() {
var socket = (chrome.experimental && chrome.experimental.socket) || if (!chrome.sockets || !chrome.sockets.tcpServer)
chrome.socket;
// If this does not have chrome.socket, then return an empty http namespace.
if (!socket)
return {}; return {};
// Wrap chrome.sockets.tcp socketId with a Promise API.
var PSocket = (function() {
// chrome.sockets.tcp uses a global listener for incoming data so
// use a map to dispatch to the proper instance.
var socketMap = {};
chrome.sockets.tcp.onReceive.addListener(function(info) {
var pSocket = socketMap[info.socketId];
if (pSocket) {
if (pSocket.handlers) {
// Fulfil the pending read.
pSocket.handlers.resolve(info.data);
delete pSocket.handlers;
}
else {
// No pending read so put data on the queue.
pSocket.readQueue.push(info);
}
}
});
// Read errors also use a global listener.
chrome.sockets.tcp.onReceiveError.addListener(function(info) {
var pSocket = socketMap[info.socketId];
if (pSocket) {
if (pSocket.handlers) {
// Reject the pending read.
pSocket.handlers.reject(new Error('chrome.sockets.tcp error ' + info.resultCode));
delete pSocket.handlers;
}
else {
// No pending read so put data on the queue.
pSocket.readQueue.push(info);
}
}
});
// PSocket constructor.
return function(socketId) {
this.socketId = socketId;
this.readQueue = [];
// Register this instance for incoming data processing.
socketMap[socketId] = this;
chrome.sockets.tcp.setPaused(socketId, false);
};
})();
// Returns a Promise<ArrayBuffer> with read data.
PSocket.prototype.read = function() {
var that = this;
if (this.readQueue.length) {
// Return data from the queue.
var info = this.readQueue.shift();
if (!info.resultCode)
return Promise.resolve(info.data);
else
return Promise.reject(new Error('chrome.sockets.tcp error ' + info.resultCode));
}
else {
// The queue is empty so install handlers.
return new Promise(function(resolve, reject) {
that.handlers = { resolve: resolve, reject: reject };
});
}
};
// Returns a Promise<integer> with the number of bytes written.
PSocket.prototype.write = function(data) {
var that = this;
return new Promise(function(resolve, reject) {
chrome.sockets.tcp.send(that.socketId, data, function(info) {
if (info && info.resultCode >= 0)
resolve(info.bytesSent);
else
reject(new Error('chrome sockets.tcp error ' + (info && info.resultCode)));
});
});
};
// Returns a Promise.
PSocket.prototype.close = function() {
var that = this;
return new Promise(function(resolve, reject) {
chrome.sockets.tcp.disconnect(that.socketId, function() {
chrome.sockets.tcp.close(that.socketId, resolve);
});
});
};
// Http response code strings. // Http response code strings.
var responseMap = { var responseMap = {
200: 'OK', 200: 'OK',
...@@ -252,46 +337,42 @@ HttpServer.prototype = { ...@@ -252,46 +337,42 @@ HttpServer.prototype = {
*/ */
listen: function(port, opt_host) { listen: function(port, opt_host) {
var t = this; var t = this;
socket.create('tcp', {}, function(socketInfo) { chrome.sockets.tcpServer.create(function(socketInfo) {
t.socketInfo_ = socketInfo; chrome.sockets.tcpServer.onAccept.addListener(function(acceptInfo) {
socket.listen(t.socketInfo_.socketId, opt_host || '0.0.0.0', port, 50, if (acceptInfo.socketId === socketInfo.socketId)
t.readRequestFromSocket_(new PSocket(acceptInfo.clientSocketId));
});
chrome.sockets.tcpServer.listen(
socketInfo.socketId,
opt_host || '0.0.0.0',
port,
50,
function(result) { function(result) {
if (!result) {
t.readyState_ = 1; t.readyState_ = 1;
t.acceptConnection_(t.socketInfo_.socketId); }
}); else {
console.log(
'listen error ' +
chrome.runtime.lastError.message +
' (normal if another instance is already serving requests)');
}
}); });
},
acceptConnection_: function(socketId) {
var t = this;
socket.accept(this.socketInfo_.socketId, function(acceptInfo) {
t.onConnection_(acceptInfo);
t.acceptConnection_(socketId);
}); });
}, },
onConnection_: function(acceptInfo) { readRequestFromSocket_: function(pSocket) {
this.readRequestFromSocket_(acceptInfo.socketId);
},
readRequestFromSocket_: function(socketId) {
var t = this; var t = this;
var requestData = ''; var requestData = '';
var endIndex = 0; var endIndex = 0;
var onDataRead = function(readInfo) { var onDataRead = function(data) {
// Check if connection closed. requestData += arrayBufferToString(data).replace(/\r\n/g, '\n');
if (readInfo.resultCode <= 0) {
socket.disconnect(socketId);
socket.destroy(socketId);
return;
}
requestData += arrayBufferToString(readInfo.data).replace(/\r\n/g, '\n');
// Check for end of request. // Check for end of request.
endIndex = requestData.indexOf('\n\n', endIndex); endIndex = requestData.indexOf('\n\n', endIndex);
if (endIndex == -1) { if (endIndex == -1) {
endIndex = requestData.length - 1; endIndex = requestData.length - 1;
socket.read(socketId, onDataRead); return pSocket.read().then(onDataRead);
return;
} }
var headers = requestData.substring(0, endIndex).split('\n'); var headers = requestData.substring(0, endIndex).split('\n');
...@@ -306,10 +387,13 @@ HttpServer.prototype = { ...@@ -306,10 +387,13 @@ HttpServer.prototype = {
if (requestLine.length == 2) if (requestLine.length == 2)
headerMap[requestLine[0]] = requestLine[1].trim(); headerMap[requestLine[0]] = requestLine[1].trim();
} }
var request = new HttpRequest(headerMap, socketId); var request = new HttpRequest(headerMap, pSocket);
t.onRequest_(request); t.onRequest_(request);
} };
socket.read(socketId, onDataRead);
pSocket.read().then(onDataRead).catch(function(e) {
pSocket.close();
});
}, },
onRequest_: function(request) { onRequest_: function(request) {
...@@ -318,7 +402,7 @@ HttpServer.prototype = { ...@@ -318,7 +402,7 @@ HttpServer.prototype = {
if (!this.dispatchEvent(type, request)) if (!this.dispatchEvent(type, request))
request.close(); request.close();
else if (keepAlive) else if (keepAlive)
this.readRequestFromSocket_(request.socketId_); this.readRequestFromSocket_(request.pSocket_);
}, },
}; };
...@@ -338,15 +422,15 @@ var extensionTypes = { ...@@ -338,15 +422,15 @@ var extensionTypes = {
* Constructs an HttpRequest object which tracks all of the request headers and * Constructs an HttpRequest object which tracks all of the request headers and
* socket for an active Http request. * socket for an active Http request.
* @param {Object} headers The HTTP request headers. * @param {Object} headers The HTTP request headers.
* @param {number} socketId The socket Id to use for the response. * @param {Object} pSocket The socket to use for the response.
* @constructor * @constructor
*/ */
function HttpRequest(headers, socketId) { function HttpRequest(headers, pSocket) {
this.version = 'HTTP/1.1'; this.version = 'HTTP/1.1';
this.headers = headers; this.headers = headers;
this.responseHeaders_ = {}; this.responseHeaders_ = {};
this.headersSent = false; this.headersSent = false;
this.socketId_ = socketId; this.pSocket_ = pSocket;
this.writes_ = 0; this.writes_ = 0;
this.bytesRemaining = 0; this.bytesRemaining = 0;
this.finished_ = false; this.finished_ = false;
...@@ -362,11 +446,10 @@ HttpRequest.prototype = { ...@@ -362,11 +446,10 @@ HttpRequest.prototype = {
close: function() { close: function() {
// The socket for keep alive connections will be re-used by the server. // The socket for keep alive connections will be re-used by the server.
// Just stop referencing or using the socket in this HttpRequest. // Just stop referencing or using the socket in this HttpRequest.
if (this.headers['Connection'] != 'keep-alive') { if (this.headers['Connection'] != 'keep-alive')
socket.disconnect(this.socketId_); pSocket.close();
socket.destroy(this.socketId_);
} this.pSocket_ = null;
this.socketId_ = 0;
this.readyState = 3; this.readyState = 3;
}, },
...@@ -467,13 +550,12 @@ HttpRequest.prototype = { ...@@ -467,13 +550,12 @@ HttpRequest.prototype = {
write_: function(array) { write_: function(array) {
var t = this; var t = this;
this.bytesRemaining += array.byteLength; this.bytesRemaining += array.byteLength;
socket.write(this.socketId_, array, function(writeInfo) { this.pSocket_.write(array).then(function(bytesWritten) {
if (writeInfo.bytesWritten < 0) { t.bytesRemaining -= bytesWritten;
console.error('Error writing to socket, code '+writeInfo.bytesWritten);
return;
}
t.bytesRemaining -= writeInfo.bytesWritten;
t.checkFinished_(); t.checkFinished_();
}).catch(function(e) {
console.error(e.message);
return;
}); });
}, },
...@@ -505,7 +587,7 @@ WebSocketServer.prototype = { ...@@ -505,7 +587,7 @@ WebSocketServer.prototype = {
} }
if (this.dispatchEvent('request', new WebSocketRequest(request))) { if (this.dispatchEvent('request', new WebSocketRequest(request))) {
if (request.socketId_) if (request.pSocket_)
request.reject(); request.reject();
return true; return true;
} }
...@@ -522,8 +604,8 @@ WebSocketServer.prototype = { ...@@ -522,8 +604,8 @@ WebSocketServer.prototype = {
*/ */
function WebSocketRequest(httpRequest) { function WebSocketRequest(httpRequest) {
// We'll assume control of the socket for this request. // We'll assume control of the socket for this request.
HttpRequest.apply(this, [httpRequest.headers, httpRequest.socketId_]); HttpRequest.apply(this, [httpRequest.headers, httpRequest.pSocket_]);
httpRequest.socketId_ = 0; httpRequest.pSocket_ = null;
} }
WebSocketRequest.prototype = { WebSocketRequest.prototype = {
...@@ -568,9 +650,9 @@ WebSocketRequest.prototype = { ...@@ -568,9 +650,9 @@ WebSocketRequest.prototype = {
if (this.headers['Sec-WebSocket-Protocol']) if (this.headers['Sec-WebSocket-Protocol'])
responseHeader['Sec-WebSocket-Protocol'] = this.headers['Sec-WebSocket-Protocol']; responseHeader['Sec-WebSocket-Protocol'] = this.headers['Sec-WebSocket-Protocol'];
this.writeHead(101, responseHeader); this.writeHead(101, responseHeader);
var socket = new WebSocketServerSocket(this.socketId_); var socket = new WebSocketServerSocket(this.pSocket_);
// Detach the socket so that we don't use it anymore. // Detach the socket so that we don't use it anymore.
this.socketId_ = 0; this.pSocket_ = 0;
return socket; return socket;
}, },
...@@ -587,8 +669,9 @@ WebSocketRequest.prototype = { ...@@ -587,8 +669,9 @@ WebSocketRequest.prototype = {
* a socket which has already been upgraded from an Http request. * a socket which has already been upgraded from an Http request.
* @param {number} socketId The socket id with an active websocket connection. * @param {number} socketId The socket id with an active websocket connection.
*/ */
function WebSocketServerSocket(socketId) { function WebSocketServerSocket(pSocket) {
this.socketId_ = socketId; this.pSocket_ = pSocket;
this.readyState = 1;
EventSource.apply(this); EventSource.apply(this);
this.readFromSocket_(); this.readFromSocket_();
} }
...@@ -616,8 +699,10 @@ WebSocketServerSocket.prototype = { ...@@ -616,8 +699,10 @@ WebSocketServerSocket.prototype = {
* process. * process.
*/ */
close: function() { close: function() {
if (this.readyState === 1) {
this.sendFrame_(8); this.sendFrame_(8);
this.readyState = 2; this.readyState = 2;
}
}, },
readFromSocket_: function() { readFromSocket_: function() {
...@@ -627,17 +712,8 @@ WebSocketServerSocket.prototype = { ...@@ -627,17 +712,8 @@ WebSocketServerSocket.prototype = {
var fragmentedOp = 0; var fragmentedOp = 0;
var fragmentedMessages = []; var fragmentedMessages = [];
var onDataRead = function(readInfo) { var onDataRead = function(dataBuffer) {
if (readInfo.resultCode <= 0) { var a = new Uint8Array(dataBuffer);
t.close_();
return;
}
if (!readInfo.data.byteLength) {
socket.read(t.socketId_, onDataRead);
return;
}
var a = new Uint8Array(readInfo.data);
for (var i = 0; i < a.length; i++) for (var i = 0; i < a.length; i++)
data.push(a[i]); data.push(a[i]);
...@@ -708,9 +784,15 @@ WebSocketServerSocket.prototype = { ...@@ -708,9 +784,15 @@ WebSocketServerSocket.prototype = {
break; // Insufficient data, wait for more. break; // Insufficient data, wait for more.
} }
} }
socket.read(t.socketId_, onDataRead);
return t.pSocket_.read().then(onDataRead);
}; };
socket.read(this.socketId_, onDataRead);
this.pSocket_.read().then(function(data) {
return onDataRead(data);
}).catch(function(e) {
t.close_();
});
}, },
onFrame_: function(op, data) { onFrame_: function(op, data) {
...@@ -727,8 +809,9 @@ WebSocketServerSocket.prototype = { ...@@ -727,8 +809,9 @@ WebSocketServerSocket.prototype = {
this.dispatchEvent('message', {'data': data}); this.dispatchEvent('message', {'data': data});
} else if (op == 8) { } else if (op == 8) {
// A close message must be confirmed before the websocket is closed. // A close message must be confirmed before the websocket is closed.
if (this.readyState == 1) { if (this.readyState === 1) {
this.sendFrame_(8); this.sendFrame_(8);
this.readyState = 2;
} else { } else {
this.close_(); this.close_();
return false; return false;
...@@ -780,20 +863,21 @@ WebSocketServerSocket.prototype = { ...@@ -780,20 +863,21 @@ WebSocketServerSocket.prototype = {
return buffer; return buffer;
} }
var array = WebsocketFrameData(op, data || ''); var array = WebsocketFrameData(op, data || '');
socket.write(this.socketId_, array, function(writeInfo) { this.pSocket_.write(array).then(function(bytesWritten) {
if (writeInfo.resultCode < 0 || if (bytesWritten !== array.byteLength)
writeInfo.bytesWritten !== array.byteLength) { throw new Error('insufficient write');
}).catch(function(e) {
t.close_(); t.close_();
}
}); });
}, },
close_: function() { close_: function() {
chrome.socket.disconnect(this.socketId_); if (this.readyState !== 3) {
chrome.socket.destroy(this.socketId_); this.pSocket_.close();
this.readyState = 3; this.readyState = 3;
this.dispatchEvent('close'); this.dispatchEvent('close');
} }
}
}; };
return { return {
......
...@@ -64,12 +64,12 @@ if (http.Server && http.WebSocketServer) { ...@@ -64,12 +64,12 @@ if (http.Server && http.WebSocketServer) {
wsServer.addEventListener('request', function(req) { wsServer.addEventListener('request', function(req) {
var socket = req.accept(); var socket = req.accept();
socket_dict[socket.socketId_] = { socket_dict[socket.pSocket_.socketId] = {
socket: socket socket: socket
}; };
chrome.socket.getInfo(socket.socketId_, function (result) { chrome.sockets.tcp.getInfo(socket.pSocket_.socketId, function (result) {
if ((result !== undefined) && (result.connected)) { if ((result !== undefined) && (result.connected)) {
socket_dict[socket.socketId_].peer = result.peerAddress + ':' + result.peerPort; socket_dict[socket.pSocket_.socketId].peer = result.peerAddress + ':' + result.peerPort;
updateStatus(socket_dict); updateStatus(socket_dict);
} }
}); });
...@@ -87,7 +87,7 @@ if (http.Server && http.WebSocketServer) { ...@@ -87,7 +87,7 @@ if (http.Server && http.WebSocketServer) {
// When a socket is closed, remove it from the list of connected sockets. // When a socket is closed, remove it from the list of connected sockets.
socket.addEventListener('close', function() { socket.addEventListener('close', function() {
delete socket_dict[socket.socketId_]; delete socket_dict[socket.pSocket_.socketId];
updateStatus(socket_dict); updateStatus(socket_dict);
}); });
return true; return true;
......
...@@ -6,10 +6,17 @@ ...@@ -6,10 +6,17 @@
"minimum_chrome_version": "44", "minimum_chrome_version": "44",
"offline_enabled": true, "offline_enabled": true,
"manifest_version": 2, "manifest_version": 2,
"permissions": ["system.network", { "permissions": ["system.network"],
"socket": [
"tcp-connect", "sockets": {
"tcp-listen"]}], "tcp": {
"connect": "*"
},
"tcpServer": {
"listen": "*"
}
},
"app": { "app": {
"background": { "background": {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment