Merge pull request #202 from isocolsky/message_queue

Allow multiple instances (using message broker)
This commit is contained in:
Matias Alejo Garcia 2015-05-07 18:01:19 -03:00
commit 0cbff65110
12 changed files with 348 additions and 305 deletions

17
bcmonitor/bcmonitor.js Normal file
View File

@ -0,0 +1,17 @@
#!/usr/bin/env node
'use strict';
var _ = require('lodash');
var log = require('npmlog');
log.debug = log.verbose;
var config = require('../config');
var BlockchainMonitor = require('../lib/blockchainmonitor');
var bcm = new BlockchainMonitor();
bcm.start(config, function(err) {
if (err) throw err;
console.log('Blockchain monitor started');
});

15
bws.js
View File

@ -37,8 +37,10 @@ var start = function(cb) {
if (err) return cb(err);
var server = config.https ? serverModule.createServer(serverOpts, app) :
serverModule.Server(app);
WsApp.start(server, config);
return server;
var wsApp = new WsApp();
wsApp.start(server, config, function(err) {
return server;
});
});
});
return cb(null, server);
@ -47,8 +49,10 @@ var start = function(cb) {
if (err) return cb(err);
server = config.https ? serverModule.createServer(serverOpts, app) :
serverModule.Server(app);
WsApp.start(server, config);
return cb(null, server);
var wsApp = new WsApp();
wsApp.start(server, config, function(err) {
return cb(err, server);
});
});
};
};
@ -56,6 +60,9 @@ var start = function(cb) {
if (config.cluster && !config.lockOpts.lockerServer)
throw 'When running in cluster mode, locker server need to be configured';
if (config.cluster && !config.messageBrokerOpts.messageBrokerServer)
throw 'When running in cluster mode, message broker server need to be configured';
start(function(err, server) {
if (err) {
console.log('Could not start BWS:', err);

View File

@ -23,6 +23,12 @@ var config = {
// port: 3231,
// },
},
messageBrokerOpts: {
// To use message broker server, uncomment this:
messageBrokerServer: {
url: 'http://localhost:3380',
},
},
blockchainExplorerOpts: {
livenet: {
provider: 'insight',

View File

@ -62,7 +62,9 @@ function getAddressActivityInsight(url, addresses, cb) {
};
function initSocketInsight(url) {
var socket = io.connect(url, {});
var socket = io.connect(url, {
'reconnection': true,
});
return socket;
};

View File

@ -5,100 +5,119 @@ var _ = require('lodash');
var async = require('async');
var log = require('npmlog');
log.debug = log.verbose;
var Uuid = require('uuid');
var inherits = require('inherits');
var events = require('events');
var nodeutil = require('util');
var WalletUtils = require('bitcore-wallet-utils');
var Bitcore = WalletUtils.Bitcore;
var WalletService = require('./server');
var BlockchainExplorer = require('./blockchainexplorer');
var Storage = require('./storage');
var MessageBroker = require('./messagebroker');
var Notification = require('./model/notification');
function BlockchainMonitor(opts) {
function BlockchainMonitor() {};
BlockchainMonitor.prototype.start = function(opts, cb) {
opts = opts || {};
$.checkArgument(opts.blockchainExplorerOpts);
$.checkArgument(opts.storageOpts);
var self = this;
this.subscriptions = {};
this.subscriber = {};
_.each(['livenet', 'testnet'], function(network) {
opts[network] = opts[network] || {};
self.subscriber[network] = self._getAddressSubscriber(
opts[network].provider, network, opts[network].url);
});
async.parallel([
function(done) {
self.explorers = _.map(['livenet', 'testnet'], function(network) {
var config = opts.blockchainExplorerOpts[network] || {};
return self._initExplorer(config.provider, network, config.url);
});
done();
},
function(done) {
self.storage = new Storage();
self.storage.connect(opts.storageOpts, done);
},
function(done) {
self.messageBroker = new MessageBroker(opts.messageBrokerOpts);
done();
},
], cb);
};
nodeutil.inherits(BlockchainMonitor, events.EventEmitter);
BlockchainMonitor.prototype._getAddressSubscriber = function(provider, network) {
BlockchainMonitor.prototype._initExplorer = function(provider, network, url) {
$.checkArgument(provider == 'insight', 'Blockchain monitor ' + provider + ' not supported');
var self = this;
var explorer = new BlockchainExplorer({
provider: provider,
network: network,
url: url,
});
var socket = explorer.initSocket();
// TODO: Extract on its own class once more providers are implemented
return {
subscribe: function(address, handler) {
socket.emit('subscribe', address);
socket.on(address, handler);
},
};
var connectionInfo = provider + ' (' + network + ') @ ' + url;
socket.on('connect', function() {
log.info('Connected to ' + connectionInfo);
socket.emit('subscribe', 'inv');
});
socket.on('connect_error', function() {
log.error('Error connecting to ' + connectionInfo);
});
socket.on('tx', _.bind(self._handleIncommingTx, self));
return explorer;
};
BlockchainMonitor.prototype.subscribeAddresses = function(walletId, addresses) {
$.checkArgument(walletId);
BlockchainMonitor.prototype._handleIncommingTx = function(data) {
var self = this;
if (!addresses || addresses.length == 0) return;
if (!data || !data.vout) return;
function handlerFor(address, txid) {
var notification = Notification.create({
walletId: this,
type: 'NewIncomingTx',
data: {
address: address,
txid: txid,
},
});
self.emit('notification', notification);
};
var outs = _.compact(_.map(data.vout, function(v) {
var addr = _.keys(v)[0];
var startingChar = addr.charAt(0);
if (startingChar != '2' && startingChar != '3') return;
if (!self.subscriptions[walletId]) {
self.subscriptions[walletId] = {
addresses: [],
return {
address: addr,
amount: +v[addr]
};
};
}));
if (_.isEmpty(outs)) return;
var addresses = [].concat(addresses);
var network = Bitcore.Address.fromString(addresses[0]).network.name;
var subscriber = self.subscriber[network];
_.each(addresses, function(address) {
self.subscriptions[walletId].addresses.push(address);
subscriber.subscribe(address, _.bind(handlerFor, walletId, address));
async.each(outs, function(out, next) {
self.storage.fetchAddress(out.address, function(err, address) {
if (err) {
log.error('Could not fetch addresses from the db');
return next(err);
}
if (!address || address.isChange) return next();
var walletId = address.walletId;
log.info('Incoming tx for wallet ' + walletId + ' [' + out.amount + 'sat -> ' + out.address + ']');
self._createNotification(walletId, data.txid, out.address, out.amount, next);
});
}, function(err) {
return;
});
};
BlockchainMonitor.prototype.subscribeWallet = function(walletService, cb) {
BlockchainMonitor.prototype._createNotification = function(walletId, txid, address, amount, cb) {
var self = this;
var walletId = walletService.walletId;
if (self.subscriptions[walletId]) return;
walletService.getMainAddresses({}, function(err, addresses) {
if (err) {
delete self.subscriptions[walletId];
return cb(new Error('Could not subscribe to addresses for wallet ' + walletId));
}
self.subscribeAddresses(walletService.walletId, _.pluck(addresses, 'address'));
var n = Notification.create({
type: 'NewIncomingTx',
data: {
txid: txid,
address: address,
amount: amount,
},
walletId: walletId,
});
self.storage.storeNotification(walletId, n, function() {
self.messageBroker.send(n)
return cb();
});
};
module.exports = BlockchainMonitor;

46
lib/messagebroker.js Normal file
View File

@ -0,0 +1,46 @@
var $ = require('preconditions').singleton();
var _ = require('lodash');
var inherits = require('inherits');
var events = require('events');
var nodeutil = require('util');
var log = require('npmlog');
log.debug = log.verbose;
log.disableColor();
function MessageBroker(opts) {
var self = this;
opts = opts || {};
if (opts.messageBrokerServer) {
var url = opts.messageBrokerServer.url;
this.remote = true;
this.mq = require('socket.io-client').connect(url);
this.mq.on('connect', function() {});
this.mq.on('connect_error', function() {
log.warn('Error connecting to message broker server @ ' + url);
});
this.mq.on('msg', function(data) {
self.emit('msg', data);
});
log.info('Using message broker server at ' + url);
}
};
nodeutil.inherits(MessageBroker, events.EventEmitter);
MessageBroker.prototype.send = function(data) {
if (this.remote) {
this.mq.emit('msg', data);
} else {
this.emit('msg', data);
}
};
MessageBroker.prototype.onMessage = function(handler) {
this.on('msg', handler);
};
module.exports = MessageBroker;

View File

@ -1,25 +0,0 @@
'use strict';
var log = require('npmlog');
log.debug = log.verbose;
var inherits = require('inherits');
var events = require('events');
var nodeutil = require('util');
function NotificationBroadcaster() {};
nodeutil.inherits(NotificationBroadcaster, events.EventEmitter);
NotificationBroadcaster.prototype.broadcast = function(eventName, notification) {
this.emit(eventName, notification);
};
var _instance;
NotificationBroadcaster.singleton = function() {
if (!_instance) {
_instance = new NotificationBroadcaster();
}
return _instance;
};
module.exports = NotificationBroadcaster.singleton();

View File

@ -16,7 +16,7 @@ var ClientError = require('./clienterror');
var Utils = require('./utils');
var Lock = require('./lock');
var Storage = require('./storage');
var NotificationBroadcaster = require('./notificationbroadcaster');
var MessageBroker = require('./messagebroker');
var BlockchainExplorer = require('./blockchainexplorer');
var Wallet = require('./model/wallet');
@ -27,6 +27,7 @@ var Notification = require('./model/notification');
var initialized = false;
var lock, storage, blockchainExplorer, blockchainExplorerOpts;
var messageBroker;
/**
@ -41,13 +42,10 @@ function WalletService() {
this.storage = storage;
this.blockchainExplorer = blockchainExplorer;
this.blockchainExplorerOpts = blockchainExplorerOpts;
this.messageBroker = messageBroker;
this.notifyTicker = 0;
};
WalletService.onNotification = function(func) {
NotificationBroadcaster.on('notification', func);
};
/**
* Initializes global settings for all instances.
* @param {Object} opts
@ -58,7 +56,6 @@ WalletService.onNotification = function(func) {
WalletService.initialize = function(opts, cb) {
$.shouldBeFunction(cb);
opts = opts || {};
lock = opts.lock || new Lock(opts.lockOpts);
blockchainExplorer = opts.blockchainExplorer;
@ -67,19 +64,44 @@ WalletService.initialize = function(opts, cb) {
if (initialized)
return cb();
if (opts.storage) {
storage = opts.storage;
initialized = true;
return cb();
} else {
function initStorage(cb) {
if (opts.storage) {
storage = opts.storage;
return cb();
}
var newStorage = new Storage();
newStorage.connect(opts.storageOpts, function(err) {
if (err) return cb(err);
storage = newStorage;
initialized = true;
return cb();
});
}
};
function initMessageBroker(cb) {
if (opts.messageBroker) {
messageBroker = opts.messageBroker;
} else {
messageBroker = new MessageBroker(opts.messageBrokerOpts);
}
return cb();
};
async.series([
function(next) {
initStorage(next);
},
function(next) {
initMessageBroker(next);
},
], function(err) {
if (err) {
log.error('Could not initialize', err);
throw err;
}
initialized = true;
return cb();
});
};
@ -260,11 +282,11 @@ WalletService.prototype.replaceTemporaryRequestKey = function(opts, cb) {
walletId: opts.walletId,
copayerId: self.copayerId,
copayerName: opts.name,
});
return cb(null, {
copayerId: self.copayerId,
wallet: wallet
}, function() {
return cb(null, {
copayerId: self.copayerId,
wallet: wallet
});
});
});
});
@ -281,25 +303,23 @@ WalletService.prototype._verifySignature = function(text, signature, pubKey) {
return WalletUtils.verifyMessage(text, signature, pubKey);
};
/**
* _emit
*
* @param {Object} args
*/
WalletService.prototype._emit = function(eventName, args) {
NotificationBroadcaster.broadcast(eventName, args);
};
/**
* _notify
*
* @param {String} type
* @param {Object} data
* @param {Boolean} isGlobal - If true, the notification is not issued on behalf of any particular copayer (defaults to false)
* @param {Object} opts
* @param {Boolean} opts.isGlobal - If true, the notification is not issued on behalf of any particular copayer (defaults to false)
*/
WalletService.prototype._notify = function(type, data, isGlobal) {
WalletService.prototype._notify = function(type, data, opts, cb) {
var self = this;
if (_.isFunction(opts)) {
cb = opts;
opts = {};
}
opts = opts || {};
log.debug('Notification', type, data);
var walletId = self.walletId || data.walletId;
@ -311,11 +331,12 @@ WalletService.prototype._notify = function(type, data, isGlobal) {
type: type,
data: data,
ticker: this.notifyTicker++,
creatorId: isGlobal ? null : copayerId,
creatorId: opts.isGlobal ? null : copayerId,
walletId: walletId,
});
this.storage.storeNotification(walletId, n, function() {
self._emit('notification', n);
self.messageBroker.send(n);
if (cb) return cb();
});
};
@ -379,10 +400,11 @@ WalletService.prototype.joinWallet = function(opts, cb) {
walletId: opts.walletId,
copayerId: copayer.id,
copayerName: copayer.name,
});
return cb(null, {
copayerId: copayer.id,
wallet: wallet
}, function() {
return cb(null, {
copayerId: copayer.id,
wallet: wallet
});
});
});
});
@ -411,8 +433,9 @@ WalletService.prototype.createAddress = function(opts, cb) {
self._notify('NewAddress', {
address: address.address,
}, function() {
return cb(null, address);
});
return cb(null, address);
});
});
});
@ -710,8 +733,9 @@ WalletService.prototype.createTx = function(opts, cb) {
self._notify('NewTxProposal', {
amount: opts.amount
}, function() {
return cb(null, txp);
});
return cb(null, txp);
});
});
});
@ -783,8 +807,9 @@ WalletService.prototype.removePendingTx = function(opts, cb) {
if (actors.length > 1 || (actors.length == 1 && actors[0] !== self.copayerId))
return cb(new ClientError('TXACTIONED', 'Cannot remove a proposal signed/rejected by other copayers'));
self._notify('TxProposalRemoved');
self.storage.removeTx(self.walletId, txp.id, cb);
self._notify('TxProposalRemoved', {}, function() {
self.storage.removeTx(self.walletId, txp.id, cb);
});
});
});
};
@ -839,18 +864,26 @@ WalletService.prototype.signTx = function(opts, cb) {
self.storage.storeTx(self.walletId, txp, function(err) {
if (err) return cb(err);
self._notify('TxProposalAcceptedBy', {
txProposalId: opts.txProposalId,
copayerId: self.copayerId,
async.parallel([
function(done) {
self._notify('TxProposalAcceptedBy', {
txProposalId: opts.txProposalId,
copayerId: self.copayerId,
}, done);
},
function(done) {
if (txp.isAccepted()) {
self._notify('TxProposalFinallyAccepted', {
txProposalId: opts.txProposalId,
}, done);
} else {
done();
}
},
], function() {
return cb(null, txp);
});
if (txp.isAccepted()) {
self._notify('TxProposalFinallyAccepted', {
txProposalId: opts.txProposalId,
});
}
return cb(null, txp);
});
});
});
@ -892,9 +925,9 @@ WalletService.prototype.broadcastTx = function(opts, cb) {
self._notify('NewOutgoingTx', {
txProposalId: opts.txProposalId,
txid: txid
}, function() {
return cb(null, txp);
});
return cb(null, txp);
});
});
});
@ -932,19 +965,26 @@ WalletService.prototype.rejectTx = function(opts, cb) {
self.storage.storeTx(self.walletId, txp, function(err) {
if (err) return cb(err);
self._notify('TxProposalRejectedBy', {
txProposalId: opts.txProposalId,
copayerId: self.copayerId,
async.parallel([
function(done) {
self._notify('TxProposalRejectedBy', {
txProposalId: opts.txProposalId,
copayerId: self.copayerId,
}, done);
},
function(done) {
if (txp.status == 'rejected') {
self._notify('TxProposalFinallyRejected', {
txProposalId: opts.txProposalId,
}, done);
} else {
done();
}
},
], function() {
return cb(null, txp);
});
if (txp.status == 'rejected') {
self._notify('TxProposalFinallyRejected', {
txProposalId: opts.txProposalId,
});
};
return cb(null, txp);
});
});
};
@ -1286,7 +1326,9 @@ WalletService.prototype.startScan = function(opts, cb) {
result: err ? 'error' : 'success',
};
if (err) data.error = err;
self._notify('ScanFinished', data, true);
self._notify('ScanFinished', data, {
isGlobal: true
});
};
self.getWallet({}, function(err, wallet) {

View File

@ -100,6 +100,9 @@ Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) {
};
Storage.prototype.fetchCopayerLookup = function(copayerId, cb) {
this.db.collection(collections.COPAYERS_LOOKUP).createIndex({
copayerId: 1
});
this.db.collection(collections.COPAYERS_LOOKUP).findOne({
copayerId: copayerId
}, function(err, result) {
@ -312,6 +315,23 @@ Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) {
});
};
Storage.prototype.fetchAddress = function(address, cb) {
var self = this;
this.db.collection(collections.ADDRESSES).createIndex({
address: 1
});
this.db.collection(collections.ADDRESSES).findOne({
address: address,
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
return cb(null, Model.Address.fromObj(result));
});
};
Storage.prototype._dump = function(cb, fn) {
fn = fn || console.log;
cb = cb || function() {};

View File

@ -7,63 +7,57 @@ var log = require('npmlog');
log.debug = log.verbose;
var Uuid = require('uuid');
var WalletUtils = require('bitcore-wallet-utils');
var Bitcore = WalletUtils.Bitcore;
var WalletService = require('./server');
var BlockchainMonitor = require('./blockchainmonitor')
var Notification = require('./model/notification');
var MessageBroker = require('./messagebroker');
log.level = 'debug';
var io, bcMonitor;
var WsApp = function() {};
WsApp._unauthorized = function(socket) {
WsApp.prototype._unauthorized = function(socket) {
socket.emit('unauthorized');
socket.disconnect();
};
WsApp.handleNotification = function(service, notification) {
if (notification.type == 'NewAddress') {
self.subscribeAddresses(notification.walletId, notification.data.address);
}
io.to(notification.walletId).emit('notification', notification);
WsApp.prototype._handleNotification = function(notification) {
this.io.to(notification.walletId).emit('notification', notification);
};
WsApp.start = function(server, config) {
io = require('socket.io')(server);
bcMonitor = new BlockchainMonitor(config.blockchainExplorerOpts);
WsApp.prototype.start = function(server, opts, cb) {
opts = opts || {};
$.checkState(opts.messageBrokerOpts);
function handleNotification(notification) {
if (notification.type == 'NewAddress') {
bcMonitor.subscribeAddresses(notification.walletId, notification.data.address);
}
io.to(notification.walletId).emit('notification', notification);
};
var self = this;
bcMonitor.on('notification', handleNotification);
WalletService.onNotification(handleNotification);
this.io = require('socket.io')(server);
io.on('connection', function(socket) {
socket.nonce = Uuid.v4();
socket.emit('challenge', socket.nonce);
async.series([
socket.on('authorize', function(data) {
if (data.message != socket.nonce) return WsApp._unauthorized(socket);
function(done) {
self.messageBroker = new MessageBroker(opts.messageBrokerOpts);
self.messageBroker.onMessage(_.bind(self._handleNotification, self));
done();
},
function(done) {
self.io.on('connection', function(socket) {
socket.nonce = Uuid.v4();
socket.on('authorize', function(data) {
if (data.message != socket.nonce) return self._unauthorized(socket);
WalletService.getInstanceWithAuth(data, function(err, service) {
if (err) return WsApp._unauthorized(socket);
WalletService.getInstanceWithAuth(data, function(err, service) {
if (err) return self._unauthorized(socket);
socket.join(service.walletId);
socket.emit('authorized');
bcMonitor.subscribeWallet(service, function(err) {
if (err) log.warn(err.message);
socket.join(service.walletId);
socket.emit('authorized');
});
});
socket.emit('challenge', socket.nonce);
});
});
done();
},
], function(err) {
if (cb) return cb(err);
});
};

View File

@ -0,0 +1,23 @@
#!/usr/bin/env node
'use strict';
var $ = require('preconditions').singleton();
var io = require('socket.io');
var log = require('npmlog');
log.debug = log.verbose;
var DEFAULT_PORT = 3380;
var opts = {
port: parseInt(process.argv[2]) || DEFAULT_PORT,
};
var server = io(opts.port);
server.on('connection', function(socket) {
socket.on('msg', function(data) {
server.emit('msg', data);
});
});
console.log('Message broker server listening on port ' + opts.port)

View File

@ -19,7 +19,6 @@ var Utils = require('../../lib/utils');
var WalletUtils = require('bitcore-wallet-utils');
var Bitcore = WalletUtils.Bitcore;
var Storage = require('../../lib/storage');
var BlockchainMonitor = require('../../lib/blockchainmonitor');
var Model = require('../../lib/model');
var Wallet = Model.Wallet;
@ -28,7 +27,6 @@ var Address = Model.Address;
var Copayer = Model.Copayer;
var WalletService = require('../../lib/server');
var NotificationBroadcaster = require('../../lib/notificationbroadcaster');
var TestData = require('../testdata');
var helpers = {};
@ -2050,7 +2048,6 @@ describe('Wallet service', function() {
server.getPendingTxs({}, function(err, txs) {
var tx = txs[2];
var signatures = helpers.clientSign(tx, TestData.copayers[0].xPrivKey);
sinon.spy(server, '_emit');
server.signTx({
txProposalId: tx.id,
signatures: signatures,
@ -2068,11 +2065,6 @@ describe('Wallet service', function() {
should.not.exist(err);
var types = _.pluck(notifications, 'type');
types.should.deep.equal(['NewOutgoingTx', 'TxProposalFinallyAccepted', 'TxProposalAcceptedBy']);
// Check also events
server._emit.getCall(0).args[1].type.should.equal('TxProposalAcceptedBy');
server._emit.getCall(1).args[1].type.should.equal('TxProposalFinallyAccepted');;
server._emit.getCall(2).args[1].type.should.equal('NewOutgoingTx');
done();
});
});
@ -2738,7 +2730,7 @@ describe('Wallet service', function() {
});
afterEach(function() {
WalletService.scanConfig = scanConfigOld;
NotificationBroadcaster.removeAllListeners();
server.messageBroker.removeAllListeners();
});
it('should start an asynchronous scan', function(done) {
@ -2755,7 +2747,7 @@ describe('Wallet service', function() {
'm/2147483647/1/0',
'm/2147483647/1/1',
];
WalletService.onNotification(function(n) {
server.messageBroker.onMessage(function(n) {
if (n.type == 'ScanFinished') {
server.getWallet({}, function(err, wallet) {
should.exist(wallet.scanStatus);
@ -2781,7 +2773,7 @@ describe('Wallet service', function() {
});
it('should set scan status error when unable to reach blockchain', function(done) {
blockchainExplorer.getAddressActivity = sinon.stub().yields('dummy error');
WalletService.onNotification(function(n) {
server.messageBroker.onMessage(function(n) {
if (n.type == 'ScanFinished') {
should.exist(n.data.error);
server.getWallet({}, function(err, wallet) {
@ -2800,7 +2792,7 @@ describe('Wallet service', function() {
WalletService.scanConfig.SCAN_WINDOW = 1;
var scans = 0;
WalletService.onNotification(function(n) {
server.messageBroker.onMessage(function(n) {
if (n.type == 'ScanFinished') {
scans++;
if (scans == 2) done();
@ -3014,7 +3006,6 @@ describe('Wallet service', function() {
should.not.exist(err);
var copayerId2 = result.copayerId;
helpers.getAuthServer(copayerId, function(server) {
server.getWallet({}, function(err, wallet) {
@ -3040,102 +3031,3 @@ describe('Wallet service', function() {
});
});
});
describe('Blockchain monitor', function() {
var addressSubscriber;
before(function(done) {
openDb(function() {
storage = new Storage({
db: db
});
done();
});
});
beforeEach(function(done) {
addressSubscriber = sinon.stub();
addressSubscriber.subscribe = sinon.stub();
sinon.stub(BlockchainMonitor.prototype, '_getAddressSubscriber').onFirstCall().returns(addressSubscriber);
resetDb(function() {
blockchainExplorer = sinon.stub();
WalletService.initialize({
storage: storage,
blockchainExplorer: blockchainExplorer,
}, done);
});
});
afterEach(function() {
BlockchainMonitor.prototype._getAddressSubscriber.restore();
});
after(function(done) {
WalletService.shutDown(done);
});
it('should subscribe wallet', function(done) {
var monitor = new BlockchainMonitor();
helpers.createAndJoinWallet(2, 2, function(server, wallet) {
server.createAddress({}, function(err, address1) {
should.not.exist(err);
server.createAddress({}, function(err, address2) {
should.not.exist(err);
monitor.subscribeWallet(server, function(err) {
should.not.exist(err);
addressSubscriber.subscribe.calledTwice.should.be.true;
addressSubscriber.subscribe.calledWith(address1.address).should.be.true;
addressSubscriber.subscribe.calledWith(address2.address).should.be.true;
done();
});
});
});
});
});
it('should be able to subscribe new address', function(done) {
var monitor = new BlockchainMonitor();
helpers.createAndJoinWallet(2, 2, function(server, wallet) {
server.createAddress({}, function(err, address1) {
should.not.exist(err);
monitor.subscribeWallet(server, function(err) {
should.not.exist(err);
addressSubscriber.subscribe.calledOnce.should.be.true;
addressSubscriber.subscribe.calledWith(address1.address).should.be.true;
server.createAddress({}, function(err, address2) {
should.not.exist(err);
monitor.subscribeAddresses(wallet.id, address2.address);
addressSubscriber.subscribe.calledTwice.should.be.true;
addressSubscriber.subscribe.calledWith(address2.address).should.be.true;
done();
});
});
});
});
});
it('should create NewIncomingTx notification when a new tx arrives on registered address', function(done) {
var monitor = new BlockchainMonitor();
helpers.createAndJoinWallet(2, 2, function(server, wallet) {
server.createAddress({}, function(err, address1) {
should.not.exist(err);
monitor.subscribeWallet(server, function(err) {
should.not.exist(err);
addressSubscriber.subscribe.calledOnce.should.be.true;
addressSubscriber.subscribe.getCall(0).args[0].should.equal(address1.address);
var handler = addressSubscriber.subscribe.getCall(0).args[1];
_.isFunction(handler).should.be.true;
monitor.on('notification', function(notification) {
notification.type.should.equal('NewIncomingTx');
notification.data.address.should.equal(address1.address);
notification.data.txid.should.equal('txid');
done();
});
handler('txid');
});
});
});
});
});