rm websockets

This commit is contained in:
Matias Alejo Garcia 2016-11-23 12:17:28 -03:00
parent 06a3aa4e23
commit 4a2b92bcb0
No known key found for this signature in database
GPG Key ID: 02470DB551277AB3
7 changed files with 75 additions and 197 deletions

View File

@ -13,7 +13,6 @@ var Locker = require('locker-server');
var BlockchainMonitor = require('../lib/blockchainmonitor');
var EmailService = require('../lib/emailservice');
var ExpressApp = require('../lib/expressapp');
var WsApp = require('../lib/wsapp');
var child_process = require('child_process');
var spawn = child_process.spawn;
var EventEmitter = require('events').EventEmitter;
@ -110,7 +109,6 @@ Service.prototype._getConfiguration = function() {
Service.prototype._startWalletService = function(config, next) {
var self = this;
var expressApp = new ExpressApp();
var wsApp = new WsApp();
if (self.https) {
var serverOpts = self._readHttpsOptions();
@ -119,15 +117,7 @@ Service.prototype._startWalletService = function(config, next) {
self.server = http.Server(expressApp.app);
}
async.parallel([
function(done) {
expressApp.start(config, done);
},
function(done) {
wsApp.start(self.server, config, done);
},
], function(err) {
expressApp.start(config, function(err){
if (err) {
return next(err);
}

113
bws.js
View File

@ -4,9 +4,7 @@ var async = require('async');
var fs = require('fs');
var ExpressApp = require('./lib/expressapp');
var WsApp = require('./lib/wsapp');
var config = require('./config');
var sticky = require('sticky-session');
var log = require('npmlog');
log.debug = log.verbose;
log.disableColor();
@ -41,60 +39,71 @@ if (config.https) {
};
}
var start = function(cb) {
var expressApp = new ExpressApp();
var wsApp = new WsApp();
function doStart(cb) {
var server = config.https ? serverModule.createServer(serverOpts, expressApp.app) : serverModule.Server(expressApp.app);
server.on('connection', function(socket) {
socket.setTimeout(300 * 1000);
})
async.parallel([
function(done) {
expressApp.start(config, done);
},
function(done) {
wsApp.start(server, config, done);
},
], function(err) {
if (err) {
log.error('Could not start BWS instance', err);
}
if (cb) return cb(err);
});
return server;
};
if (config.cluster) {
var server = sticky(clusterInstances, function() {
return doStart();
});
return cb(null, server);
} else {
var server = doStart(function(err) {
return cb(err, server);
});
}
};
if (config.cluster && !config.lockOpts.lockerServer)
throw 'When running in cluster mode, locker server need to be configured';
if (config.cluster && !config.messageBrokerOpts.messageBrokerServer)
throw 'When running in cluster mode, message broker server need to be configured';
start(function(err, server) {
if (err) {
console.log('Could not start BWS:', err);
process.exit(0);
}
server.listen(port, function(err) {
if (err) console.log('ERROR: ', err);
log.info('Bitcore Wallet Service running on port ' + port);
var expressApp = new ExpressApp();
function startInstance(cb) {
var server = config.https ? serverModule.createServer(serverOpts, expressApp.app) : serverModule.Server(expressApp.app);
server.on('connection', function(socket) {
socket.setTimeout(300 * 1000);
})
expressApp.start(config, function(err) {
if (err) {
log.error('Could not start BWS instance', err);
return cb(err);
}
server.listen(port);
return cb();
});
});
};
var logStart = function(err) {
if (err) {
log.error('Error:' + err);
return;
}
if (cluster.worker)
log.info('BWS Instance ' + cluster.worker.id + ' running');
else
log.info('BWS running');
};
if (config.cluster) {
if (cluster.isMaster) {
// Count the machine's CPUs
var instances = config.clusterInstances || require('os').cpus().length;
log.info('Starting ' + instances + ' instances on port:' + port);
// Create a worker for each CPU
for (var i = 0; i < instances; i += 1) {
cluster.fork();
// Listen for dying workers
cluster.on('exit', function(worker) {
// Replace the dead worker,
log.error('Worker ' + worker.id + ' died :(');
cluster.fork();
});
}
// Code to run if we're in a worker process
} else {
startInstance(logStart);
}
} else {
log.info('Starting on port: ' + port);
startInstance(logStart);
};

View File

@ -3,7 +3,7 @@ var config = {
disableLogs: false,
port: 3232,
// Uncomment to make BWS a forking server
// cluster: true,
cluster: true,
// Uncomment to use the nr of availalbe CPUs
// clusterInstances: 4,

View File

@ -56,7 +56,15 @@ ExpressApp.prototype.start = function(opts, cb) {
var POST_LIMIT = 1024 * 100 /* Max POST 100 kb */ ;
this.app.use(bodyParser.json({
limit: POST_LIMIT
limit: POST_LIMIT,
verify: function(req, res, buf, encoding) {
// get rawBody
req.rawBody = buf.toString();
console.log("rawBody", req.rawBody);
console.log("headers", req.headers);
}
}));
if (opts.disableLogs) {

View File

@ -49,7 +49,8 @@ Storage.prototype._createIndexes = function() {
});
this.db.collection(collections.TXS).createIndex({
walletId: 1,
createdOn: -1,
isPending: 1,
createdOn: 1
});
this.db.collection(collections.NOTIFICATIONS).createIndex({
walletId: 1,
@ -74,6 +75,7 @@ Storage.prototype._createIndexes = function() {
walletId: 1,
txid: 1,
});
};
Storage.prototype.connect = function(opts, cb) {

View File

@ -1,65 +0,0 @@
'use strict';
var $ = require('preconditions').singleton();
var _ = require('lodash');
var async = require('async');
var log = require('npmlog');
log.debug = log.verbose;
var Uuid = require('uuid');
var WalletService = require('./server');
var MessageBroker = require('./messagebroker');
log.level = 'debug';
var WsApp = function() {};
WsApp.prototype._unauthorized = function(socket) {
socket.emit('unauthorized');
socket.disconnect();
};
WsApp.prototype._handleNotification = function(notification) {
var room = notification.walletId ? this.io.to(notification.walletId) : this.io;
room.emit('notification', notification);
};
WsApp.prototype.start = function(server, opts, cb) {
opts = opts || {};
$.checkState(opts.messageBrokerOpts);
var self = this;
this.io = require('socket.io')(server);
async.series([
function(done) {
self.messageBroker = new MessageBroker(opts.messageBrokerOpts);
self.messageBroker.onMessage(_.bind(self._handleNotification, self));
done();
},
function(done) {
self.io.on('connection', function(socket) {
socket.nonce = Uuid.v4();
socket.on('authorize', function(data) {
if (data.message != socket.nonce) return self._unauthorized(socket);
WalletService.getInstanceWithAuth(data, function(err, service) {
if (err) return self._unauthorized(socket);
socket.join(service.walletId);
socket.emit('authorized');
});
});
socket.emit('challenge', socket.nonce);
});
done();
},
], function(err) {
if (cb) return cb(err);
});
};
module.exports = WsApp;

View File

@ -138,45 +138,6 @@ describe('Bitcore Node Service', function() {
});
});
describe('#_startWalletService', function() {
it('will start express and web socket servers', function(done) {
function TestExpressApp() {}
TestExpressApp.prototype.start = sinon.stub().callsArg(1);
function TestWSApp() {}
TestWSApp.prototype.start = sinon.stub().callsArg(2);
var listen = sinon.stub().callsArg(1);
var TestService = proxyquire('../bitcorenode', {
'../lib/expressapp': TestExpressApp,
'../lib/wsapp': TestWSApp,
'http': {
Server: sinon.stub().returns({
listen: listen
})
}
});
var options = {
node: {
bwsPort: 3232
}
};
var service = new TestService(options);
var config = {};
service._startWalletService(config, function(err) {
if (err) {
throw err;
}
TestExpressApp.prototype.start.callCount.should.equal(1);
TestExpressApp.prototype.start.args[0][0].should.equal(config);
TestExpressApp.prototype.start.args[0][1].should.be.a('function');
TestWSApp.prototype.start.callCount.should.equal(1);
TestWSApp.prototype.start.args[0][0].should.equal(service.server);
TestWSApp.prototype.start.args[0][1].should.equal(config);
TestWSApp.prototype.start.args[0][2].should.be.a('function');
listen.callCount.should.equal(1);
listen.args[0][0].should.equal(3232);
listen.args[0][1].should.be.a('function');
done();
});
});
it('error from express', function(done) {
function TestExpressApp() {}
TestExpressApp.prototype.start = sinon.stub().callsArgWith(1, new Error('test'));
@ -204,33 +165,6 @@ describe('Bitcore Node Service', function() {
done();
});
});
it('error from web socket', function(done) {
function TestExpressApp() {}
TestExpressApp.prototype.start = sinon.stub().callsArg(1);
function TestWSApp() {}
TestWSApp.prototype.start = sinon.stub().callsArgWith(2, new Error('test'));
var listen = sinon.stub().callsArg(1);
var TestService = proxyquire('../bitcorenode', {
'../lib/expressapp': TestExpressApp,
'../lib/wsapp': TestWSApp,
'http': {
Server: sinon.stub().returns({
listen: listen
})
}
});
var options = {
node: {
bwsPort: 3232
}
};
var service = new TestService(options);
var config = {};
service._startWalletService(config, function(err) {
err.message.should.equal('test');
done();
});
});
it('error from server.listen', function(done) {
var app = {};
function TestExpressApp() {