refactor notification dispatching

This commit is contained in:
Ivan Socolsky 2015-04-28 10:34:26 -03:00
parent de037a3227
commit 37a02c37ec
2 changed files with 70 additions and 0 deletions

View File

@ -10,11 +10,15 @@ var BlockchainExplorer = require('./blockchainexplorer');
var Storage = require('./storage');
var MessageBroker = require('./messagebroker');
<<<<<<< HEAD
var Notification = require('./model/notification');
function BlockchainMonitor() {};
BlockchainMonitor.prototype.start = function(opts, cb) {
=======
function BlockchainMonitor(opts) {
>>>>>>> refactor notification dispatching
opts = opts || {};
$.checkArgument(opts.blockchainExplorerOpts);
$.checkArgument(opts.storageOpts);
@ -65,23 +69,42 @@ BlockchainMonitor.prototype._initExplorer = function(provider, network, url) {
});
socket.on('tx', _.bind(self._handleIncommingTx, self));
<<<<<<< HEAD
return explorer;
};
=======
BlockchainMonitor.prototype.subscribeAddresses = function(walletService, addresses) {
$.checkArgument(walletService);
$.checkArgument(walletService.walletId);
>>>>>>> refactor notification dispatching
BlockchainMonitor.prototype._handleIncommingTx = function(data) {
var self = this;
var walletId = walletService.walletId;
if (!data || !data.vout) return;
<<<<<<< HEAD
var outs = _.compact(_.map(data.vout, function(v) {
var addr = _.keys(v)[0];
var startingChar = addr.charAt(0);
if (startingChar != '2' && startingChar != '3') return;
=======
function handlerFor(address, txid) {
var data = {
walletId: this.walletId,
address: address,
txid: txid,
};
self.emit('NewIncomingTx', data, this);
};
>>>>>>> refactor notification dispatching
return {
address: addr,
amount: +v[addr]
};
<<<<<<< HEAD
}));
if (_.isEmpty(outs)) return;
@ -99,12 +122,23 @@ BlockchainMonitor.prototype._handleIncommingTx = function(data) {
});
}, function(err) {
return;
=======
};
var addresses = [].concat(addresses);
var network = Bitcore.Address.fromString(addresses[0]).network.name;
var subscriber = self.subscriber[network];
_.each(addresses, function(address) {
self.subscriptions[walletId].addresses.push(address);
subscriber.subscribe(address, _.bind(handlerFor, walletService, address));
>>>>>>> refactor notification dispatching
});
};
BlockchainMonitor.prototype._createNotification = function(walletId, txid, address, amount, cb) {
var self = this;
<<<<<<< HEAD
var n = Notification.create({
type: 'NewIncomingTx',
data: {
@ -116,6 +150,17 @@ BlockchainMonitor.prototype._createNotification = function(walletId, txid, addre
});
self.storage.storeNotification(walletId, n, function() {
self.messageBroker.send(n)
=======
var walletId = walletService.walletId;
if (self.subscriptions[walletId]) return;
walletService.getMainAddresses({}, function(err, addresses) {
if (err) {
delete self.subscriptions[walletId];
return cb(new Error('Could not subscribe to addresses for wallet ' + walletId));
}
self.subscribeAddresses(walletService, _.pluck(addresses, 'address'));
>>>>>>> refactor notification dispatching
return cb();
});
};

View File

@ -0,0 +1,25 @@
'use strict';
var log = require('npmlog');
log.debug = log.verbose;
var inherits = require('inherits');
var events = require('events');
var nodeutil = require('util');
function NotificationBroadcaster() {};
nodeutil.inherits(NotificationBroadcaster, events.EventEmitter);
NotificationBroadcaster.prototype.broadcast = function(eventName, notification, walletService) {
this.emit(eventName, notification, walletService);
};
var _instance;
NotificationBroadcaster.singleton = function() {
if (!_instance) {
_instance = new NotificationBroadcaster();
}
return _instance;
};
module.exports = NotificationBroadcaster.singleton();