diff --git a/bitcorenode/index.js b/bitcorenode/index.js index 11dead7..be77775 100644 --- a/bitcorenode/index.js +++ b/bitcorenode/index.js @@ -13,7 +13,6 @@ var Locker = require('locker-server'); var BlockchainMonitor = require('../lib/blockchainmonitor'); var EmailService = require('../lib/emailservice'); var ExpressApp = require('../lib/expressapp'); -var WsApp = require('../lib/wsapp'); var child_process = require('child_process'); var spawn = child_process.spawn; var EventEmitter = require('events').EventEmitter; @@ -110,7 +109,6 @@ Service.prototype._getConfiguration = function() { Service.prototype._startWalletService = function(config, next) { var self = this; var expressApp = new ExpressApp(); - var wsApp = new WsApp(); if (self.https) { var serverOpts = self._readHttpsOptions(); @@ -119,15 +117,7 @@ Service.prototype._startWalletService = function(config, next) { self.server = http.Server(expressApp.app); } - async.parallel([ - - function(done) { - expressApp.start(config, done); - }, - function(done) { - wsApp.start(self.server, config, done); - }, - ], function(err) { + expressApp.start(config, function(err){ if (err) { return next(err); } diff --git a/bws.js b/bws.js index fe0be8f..ed541d1 100755 --- a/bws.js +++ b/bws.js @@ -4,9 +4,7 @@ var async = require('async'); var fs = require('fs'); var ExpressApp = require('./lib/expressapp'); -var WsApp = require('./lib/wsapp'); var config = require('./config'); -var sticky = require('sticky-session'); var log = require('npmlog'); log.debug = log.verbose; log.disableColor(); @@ -41,60 +39,71 @@ if (config.https) { }; } -var start = function(cb) { - var expressApp = new ExpressApp(); - var wsApp = new WsApp(); - - function doStart(cb) { - var server = config.https ? serverModule.createServer(serverOpts, expressApp.app) : serverModule.Server(expressApp.app); - - server.on('connection', function(socket) { - socket.setTimeout(300 * 1000); - }) - - async.parallel([ - - function(done) { - expressApp.start(config, done); - }, - function(done) { - wsApp.start(server, config, done); - }, - ], function(err) { - if (err) { - log.error('Could not start BWS instance', err); - } - if (cb) return cb(err); - }); - - return server; - }; - - if (config.cluster) { - var server = sticky(clusterInstances, function() { - return doStart(); - }); - return cb(null, server); - } else { - var server = doStart(function(err) { - return cb(err, server); - }); - } -}; - if (config.cluster && !config.lockOpts.lockerServer) throw 'When running in cluster mode, locker server need to be configured'; if (config.cluster && !config.messageBrokerOpts.messageBrokerServer) throw 'When running in cluster mode, message broker server need to be configured'; -start(function(err, server) { - if (err) { - console.log('Could not start BWS:', err); - process.exit(0); - } - server.listen(port, function(err) { - if (err) console.log('ERROR: ', err); - log.info('Bitcore Wallet Service running on port ' + port); +var expressApp = new ExpressApp(); + +function startInstance(cb) { + var server = config.https ? serverModule.createServer(serverOpts, expressApp.app) : serverModule.Server(expressApp.app); + + server.on('connection', function(socket) { + socket.setTimeout(300 * 1000); + }) + + expressApp.start(config, function(err) { + if (err) { + log.error('Could not start BWS instance', err); + return cb(err); + } + + server.listen(port); + return cb(); }); -}); +}; + + +var logStart = function(err) { + if (err) { + log.error('Error:' + err); + return; + } + + if (cluster.worker) + log.info('BWS Instance ' + cluster.worker.id + ' running'); + else + log.info('BWS running'); +}; + + +if (config.cluster) { + + if (cluster.isMaster) { + + // Count the machine's CPUs + var instances = config.clusterInstances || require('os').cpus().length; + + log.info('Starting ' + instances + ' instances on port:' + port); + + // Create a worker for each CPU + for (var i = 0; i < instances; i += 1) { + cluster.fork(); + + // Listen for dying workers + cluster.on('exit', function(worker) { + // Replace the dead worker, + log.error('Worker ' + worker.id + ' died :('); + cluster.fork(); + }); + } + // Code to run if we're in a worker process + } else { + startInstance(logStart); + } +} else { + log.info('Starting on port: ' + port); + startInstance(logStart); +}; diff --git a/config.js b/config.js index 3e6a902..373fcf8 100644 --- a/config.js +++ b/config.js @@ -3,7 +3,7 @@ var config = { disableLogs: false, port: 3232, // Uncomment to make BWS a forking server - // cluster: true, + cluster: true, // Uncomment to use the nr of availalbe CPUs // clusterInstances: 4, diff --git a/lib/expressapp.js b/lib/expressapp.js index 1c50e53..6b1e51a 100644 --- a/lib/expressapp.js +++ b/lib/expressapp.js @@ -56,7 +56,15 @@ ExpressApp.prototype.start = function(opts, cb) { var POST_LIMIT = 1024 * 100 /* Max POST 100 kb */ ; this.app.use(bodyParser.json({ - limit: POST_LIMIT + limit: POST_LIMIT, + verify: function(req, res, buf, encoding) { + + // get rawBody + req.rawBody = buf.toString(); + console.log("rawBody", req.rawBody); + console.log("headers", req.headers); + + } })); if (opts.disableLogs) { diff --git a/lib/storage.js b/lib/storage.js index f63f968..ca0ee2b 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -49,7 +49,8 @@ Storage.prototype._createIndexes = function() { }); this.db.collection(collections.TXS).createIndex({ walletId: 1, - createdOn: -1, + isPending: 1, + createdOn: 1 }); this.db.collection(collections.NOTIFICATIONS).createIndex({ walletId: 1, @@ -74,6 +75,7 @@ Storage.prototype._createIndexes = function() { walletId: 1, txid: 1, }); + }; Storage.prototype.connect = function(opts, cb) { diff --git a/lib/wsapp.js b/lib/wsapp.js deleted file mode 100644 index 2fd2193..0000000 --- a/lib/wsapp.js +++ /dev/null @@ -1,65 +0,0 @@ -'use strict'; - -var $ = require('preconditions').singleton(); -var _ = require('lodash'); -var async = require('async'); -var log = require('npmlog'); -log.debug = log.verbose; -var Uuid = require('uuid'); - -var WalletService = require('./server'); -var MessageBroker = require('./messagebroker'); - -log.level = 'debug'; - -var WsApp = function() {}; - -WsApp.prototype._unauthorized = function(socket) { - socket.emit('unauthorized'); - socket.disconnect(); -}; - -WsApp.prototype._handleNotification = function(notification) { - var room = notification.walletId ? this.io.to(notification.walletId) : this.io; - room.emit('notification', notification); -}; - -WsApp.prototype.start = function(server, opts, cb) { - opts = opts || {}; - $.checkState(opts.messageBrokerOpts); - - var self = this; - - this.io = require('socket.io')(server); - - async.series([ - - function(done) { - self.messageBroker = new MessageBroker(opts.messageBrokerOpts); - self.messageBroker.onMessage(_.bind(self._handleNotification, self)); - done(); - }, - function(done) { - self.io.on('connection', function(socket) { - socket.nonce = Uuid.v4(); - socket.on('authorize', function(data) { - if (data.message != socket.nonce) return self._unauthorized(socket); - - WalletService.getInstanceWithAuth(data, function(err, service) { - if (err) return self._unauthorized(socket); - - socket.join(service.walletId); - socket.emit('authorized'); - }); - }); - - socket.emit('challenge', socket.nonce); - }); - done(); - }, - ], function(err) { - if (cb) return cb(err); - }); -}; - -module.exports = WsApp; diff --git a/test/bitcorenode.js b/test/bitcorenode.js index f1e9806..18f1c02 100644 --- a/test/bitcorenode.js +++ b/test/bitcorenode.js @@ -138,45 +138,6 @@ describe('Bitcore Node Service', function() { }); }); describe('#_startWalletService', function() { - it('will start express and web socket servers', function(done) { - function TestExpressApp() {} - TestExpressApp.prototype.start = sinon.stub().callsArg(1); - function TestWSApp() {} - TestWSApp.prototype.start = sinon.stub().callsArg(2); - var listen = sinon.stub().callsArg(1); - var TestService = proxyquire('../bitcorenode', { - '../lib/expressapp': TestExpressApp, - '../lib/wsapp': TestWSApp, - 'http': { - Server: sinon.stub().returns({ - listen: listen - }) - } - }); - var options = { - node: { - bwsPort: 3232 - } - }; - var service = new TestService(options); - var config = {}; - service._startWalletService(config, function(err) { - if (err) { - throw err; - } - TestExpressApp.prototype.start.callCount.should.equal(1); - TestExpressApp.prototype.start.args[0][0].should.equal(config); - TestExpressApp.prototype.start.args[0][1].should.be.a('function'); - TestWSApp.prototype.start.callCount.should.equal(1); - TestWSApp.prototype.start.args[0][0].should.equal(service.server); - TestWSApp.prototype.start.args[0][1].should.equal(config); - TestWSApp.prototype.start.args[0][2].should.be.a('function'); - listen.callCount.should.equal(1); - listen.args[0][0].should.equal(3232); - listen.args[0][1].should.be.a('function'); - done(); - }); - }); it('error from express', function(done) { function TestExpressApp() {} TestExpressApp.prototype.start = sinon.stub().callsArgWith(1, new Error('test')); @@ -204,33 +165,6 @@ describe('Bitcore Node Service', function() { done(); }); }); - it('error from web socket', function(done) { - function TestExpressApp() {} - TestExpressApp.prototype.start = sinon.stub().callsArg(1); - function TestWSApp() {} - TestWSApp.prototype.start = sinon.stub().callsArgWith(2, new Error('test')); - var listen = sinon.stub().callsArg(1); - var TestService = proxyquire('../bitcorenode', { - '../lib/expressapp': TestExpressApp, - '../lib/wsapp': TestWSApp, - 'http': { - Server: sinon.stub().returns({ - listen: listen - }) - } - }); - var options = { - node: { - bwsPort: 3232 - } - }; - var service = new TestService(options); - var config = {}; - service._startWalletService(config, function(err) { - err.message.should.equal('test'); - done(); - }); - }); it('error from server.listen', function(done) { var app = {}; function TestExpressApp() {