diff --git a/bcmonitor/bcmonitor.js b/bcmonitor/bcmonitor.js index 3a62e69..3a6e6e7 100644 --- a/bcmonitor/bcmonitor.js +++ b/bcmonitor/bcmonitor.js @@ -14,7 +14,4 @@ bcm.start(config, function(err) { if (err) throw err; console.log('Blockchain monitor started'); - _.each(bcm.explorers, function(explorer) { - console.log('\t' + explorer.network.name + ': ' + explorer.url); - }); }); diff --git a/bws.js b/bws.js index 3080144..7812e29 100755 --- a/bws.js +++ b/bws.js @@ -37,8 +37,10 @@ var start = function(cb) { if (err) return cb(err); var server = config.https ? serverModule.createServer(serverOpts, app) : serverModule.Server(app); - new WsApp().start(server, config); - return server; + var wsApp = new WsApp(); + wsApp.start(server, config, function(err) { + return server; + }); }); }); return cb(null, server); @@ -47,8 +49,10 @@ var start = function(cb) { if (err) return cb(err); server = config.https ? serverModule.createServer(serverOpts, app) : serverModule.Server(app); - new WsApp().start(server, config); - return cb(null, server); + var wsApp = new WsApp(); + wsApp.start(server, config, function(err) { + return cb(err, server); + }); }); }; }; @@ -56,6 +60,9 @@ var start = function(cb) { if (config.cluster && !config.lockOpts.lockerServer) throw 'When running in cluster mode, locker server need to be configured'; +if (config.cluster && !config.messageBrokerOpts.messageBrokerServer) + throw 'When running in cluster mode, message broker server need to be configured'; + start(function(err, server) { if (err) { console.log('Could not start BWS:', err); diff --git a/lib/blockchainmonitor.js b/lib/blockchainmonitor.js index e1b782e..e2ff233 100644 --- a/lib/blockchainmonitor.js +++ b/lib/blockchainmonitor.js @@ -54,6 +54,14 @@ BlockchainMonitor.prototype._initExplorer = function(provider, network, url) { }); var socket = explorer.initSocket(); + + var connectionInfo = provider + ' (' + network + ') @ ' + url; + socket.on('connect', function() { + log.info('Connected to ' + connectionInfo); + }); + socket.on('connect_error', function() { + log.error('Error connecting to ' + connectionInfo); + }); socket.emit('subscribe', 'inv'); socket.on('tx', _.bind(self._handleIncommingTx, self)); @@ -67,7 +75,8 @@ BlockchainMonitor.prototype._handleIncommingTx = function(data) { var outs = _.compact(_.map(data.vout, function(v) { var addr = _.keys(v)[0]; - if (addr.indexOf('3') != 0 && addr.indexOf('2') != 0) return; + var startingChar = addr.charAt(0); + if (startingChar != '2' && startingChar != '3') return; return { address: addr, @@ -78,8 +87,11 @@ BlockchainMonitor.prototype._handleIncommingTx = function(data) { async.each(outs, function(out, next) { self.storage.fetchAddress(out.address, function(err, address) { - if (err || !address) return next(err); - if (address.isChange) return next(); + if (err) { + log.error('Could not fetch addresses from the db'); + return next(err); + } + if (!address || address.isChange) return next(); var walletId = address.walletId; log.info('Incoming tx for wallet ' + walletId + ' [' + out.amount + 'sat -> ' + out.address + ']'); diff --git a/lib/messagebroker.js b/lib/messagebroker.js index 028bdcb..e41da43 100644 --- a/lib/messagebroker.js +++ b/lib/messagebroker.js @@ -18,14 +18,14 @@ function MessageBroker(opts) { this.mq = require('socket.io-client').connect(url); this.mq.on('connect', function() {}); this.mq.on('connect_error', function() { - log.warn('Message queue server connection error'); + log.warn('Error connecting to message broker server @ ' + url); }); this.mq.on('msg', function(data) { self.emit('msg', data); }); - log.info('Using message queue server at ' + url); + log.info('Using message broker server at ' + url); } }; diff --git a/lib/server.js b/lib/server.js index 36c6130..bf71221 100644 --- a/lib/server.js +++ b/lib/server.js @@ -282,7 +282,7 @@ WalletService.prototype.replaceTemporaryRequestKey = function(opts, cb) { walletId: opts.walletId, copayerId: self.copayerId, copayerName: opts.name, - }, false, function() { + }, function() { return cb(null, { copayerId: self.copayerId, wallet: wallet @@ -308,11 +308,18 @@ WalletService.prototype._verifySignature = function(text, signature, pubKey) { * * @param {String} type * @param {Object} data - * @param {Boolean} isGlobal - If true, the notification is not issued on behalf of any particular copayer (defaults to false) + * @param {Object} opts + * @param {Boolean} opts.isGlobal - If true, the notification is not issued on behalf of any particular copayer (defaults to false) */ -WalletService.prototype._notify = function(type, data, isGlobal, cb) { +WalletService.prototype._notify = function(type, data, opts, cb) { var self = this; + if (_.isFunction(opts)) { + cb = opts; + opts = {}; + } + opts = opts || {}; + log.debug('Notification', type, data); var walletId = self.walletId || data.walletId; @@ -324,7 +331,7 @@ WalletService.prototype._notify = function(type, data, isGlobal, cb) { type: type, data: data, ticker: this.notifyTicker++, - creatorId: isGlobal ? null : copayerId, + creatorId: opts.isGlobal ? null : copayerId, walletId: walletId, }); this.storage.storeNotification(walletId, n, function() { @@ -393,7 +400,7 @@ WalletService.prototype.joinWallet = function(opts, cb) { walletId: opts.walletId, copayerId: copayer.id, copayerName: copayer.name, - }, false, function() { + }, function() { return cb(null, { copayerId: copayer.id, wallet: wallet @@ -426,7 +433,7 @@ WalletService.prototype.createAddress = function(opts, cb) { self._notify('NewAddress', { address: address.address, - }, false, function() { + }, function() { return cb(null, address); }); }); @@ -726,7 +733,7 @@ WalletService.prototype.createTx = function(opts, cb) { self._notify('NewTxProposal', { amount: opts.amount - }, false, function() { + }, function() { return cb(null, txp); }); }); @@ -800,7 +807,7 @@ WalletService.prototype.removePendingTx = function(opts, cb) { if (actors.length > 1 || (actors.length == 1 && actors[0] !== self.copayerId)) return cb(new ClientError('TXACTIONED', 'Cannot remove a proposal signed/rejected by other copayers')); - self._notify('TxProposalRemoved', {}, false, function() { + self._notify('TxProposalRemoved', {}, function() { self.storage.removeTx(self.walletId, txp.id, cb); }); }); @@ -863,13 +870,13 @@ WalletService.prototype.signTx = function(opts, cb) { self._notify('TxProposalAcceptedBy', { txProposalId: opts.txProposalId, copayerId: self.copayerId, - }, false, done); + }, done); }, function(done) { if (txp.isAccepted()) { self._notify('TxProposalFinallyAccepted', { txProposalId: opts.txProposalId, - }, false, done); + }, done); } else { done(); } @@ -918,7 +925,7 @@ WalletService.prototype.broadcastTx = function(opts, cb) { self._notify('NewOutgoingTx', { txProposalId: opts.txProposalId, txid: txid - }, false, function() { + }, function() { return cb(null, txp); }); }); @@ -964,13 +971,13 @@ WalletService.prototype.rejectTx = function(opts, cb) { self._notify('TxProposalRejectedBy', { txProposalId: opts.txProposalId, copayerId: self.copayerId, - }, false, done); + }, done); }, function(done) { if (txp.status == 'rejected') { self._notify('TxProposalFinallyRejected', { txProposalId: opts.txProposalId, - }, false, done); + }, done); } else { done(); } @@ -1319,7 +1326,9 @@ WalletService.prototype.startScan = function(opts, cb) { result: err ? 'error' : 'success', }; if (err) data.error = err; - self._notify('ScanFinished', data, true); + self._notify('ScanFinished', data, { + isGlobal: true + }); }; self.getWallet({}, function(err, wallet) { diff --git a/lib/storage.js b/lib/storage.js index 11740d5..f964126 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -100,6 +100,9 @@ Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { }; Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { + this.db.collection(collections.COPAYERS_LOOKUP).createIndex({ + copayerId: 1 + }); this.db.collection(collections.COPAYERS_LOOKUP).findOne({ copayerId: copayerId }, function(err, result) { @@ -315,6 +318,9 @@ Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { Storage.prototype.fetchAddress = function(address, cb) { var self = this; + this.db.collection(collections.ADDRESSES).createIndex({ + address: 1 + }); this.db.collection(collections.ADDRESSES).findOne({ address: address, }, function(err, result) { diff --git a/lib/wsapp.js b/lib/wsapp.js index 9bc8ca8..da325f8 100644 --- a/lib/wsapp.js +++ b/lib/wsapp.js @@ -53,8 +53,8 @@ WsApp.prototype.start = function(server, opts, cb) { }); socket.emit('challenge', socket.nonce); - done(); }); + done(); }, ], function(err) { if (cb) return cb(err);