send notification

This commit is contained in:
Ivan Socolsky 2017-05-17 15:10:16 -03:00
parent b856d1cb02
commit c87350d7ef
No known key found for this signature in database
GPG Key ID: FAECE6A05FAA4F56
6 changed files with 103 additions and 30 deletions

View File

@ -25,7 +25,8 @@ BlockchainMonitor.prototype.start = function(opts, cb) {
async.parallel([
function(done) {
self.explorers = _.indexBy(_.map(['livenet', 'testnet'], function(network) {
self.explorers = {};
_.map(['livenet', 'testnet'], function(network) {
var explorer;
if (opts.blockchainExplorers) {
explorer = opts.blockchainExplorers[network];
@ -42,9 +43,9 @@ BlockchainMonitor.prototype.start = function(opts, cb) {
});
}
$.checkState(explorer);
self._initExplorer(explorer);
return explorer;
}), 'network');
self._initExplorer(network, explorer);
self.explorers[network] = explorer;
});
done();
},
function(done) {
@ -72,7 +73,7 @@ BlockchainMonitor.prototype.start = function(opts, cb) {
});
};
BlockchainMonitor.prototype._initExplorer = function(explorer) {
BlockchainMonitor.prototype._initExplorer = function(network, explorer) {
var self = this;
var socket = explorer.initSocket();
@ -85,7 +86,7 @@ BlockchainMonitor.prototype._initExplorer = function(explorer) {
log.error('Error connecting to ' + explorer.getConnectionInfo());
});
socket.on('tx', _.bind(self._handleIncomingTx, self));
socket.on('block', _.bind(self._handleNewBlock, self, explorer.network));
socket.on('block', _.bind(self._handleNewBlock, self, network));
};
BlockchainMonitor.prototype._handleThirdPartyBroadcasts = function(data, processIt) {
@ -209,7 +210,7 @@ BlockchainMonitor.prototype._handleIncomingTx = function(data) {
BlockchainMonitor.prototype._notifyNewBlock = function(network, hash) {
var self = this;
log.info('New ' + network + ' block: ', hash);
log.info('New ' + network + ' block: ' + hash);
var notification = Notification.create({
type: 'NewBlock',
walletId: network, // use network name as wallet id for global notifications
@ -229,17 +230,52 @@ BlockchainMonitor.prototype._notifyNewBlock = function(network, hash) {
BlockchainMonitor.prototype._handleTxConfirmations = function(network, hash) {
var self = this;
function processTriggeredSubs(subs, cb) {
async.each(subs, function(sub) {
log.info('New tx confirmation ' + sub.txid);
sub.isActive = false;
self.storage.storeTxConfirmationSub(sub, function(err) {
if (err) return cb(err);
var notification = Notification.create({
type: 'TxConfirmation',
walletId: sub.walletId,
creatorId: sub.copayerId,
data: {
txid: sub.txid,
network: network,
// TODO: amount
},
});
self._storeAndBroadcastNotification(notification, cb);
});
});
};
var explorer = self.explorers[network];
if (!explorer) return;
explorer.getTxidsInBlock(hash, function(err, txids) {
if (err) {
log.error('Could not fetch txids from block ' + hash);
log.error('Could not fetch txids from block ' + hash, err);
return;
}
// self.storage.fetchTx
self.storage.fetchActiveTxConfirmationSubs(null, function(err, subs) {
if (err) return;
if (_.isEmpty(subs)) return;
var indexedSubs = _.indexBy(subs, 'txid');
var triggered = [];
_.each(txids, function(txid) {
if (indexedSubs[txid]) triggered.push(indexedSubs[txid]);
});
processTriggeredSubs(triggered, function(err) {
if (err) {
log.error('Could not process tx confirmations', err);
}
return;
});
});
});
};

View File

@ -9,9 +9,9 @@ TxConfirmationSub.create = function(opts) {
x.version = 1;
x.createdOn = Math.floor(Date.now() / 1000);
x.walletId = opts.walletId;
x.copayerId = opts.copayerId;
x.txid = opts.txid;
x.nbConfirmations = opts.nbConfirmations || 1;
x.isActive = true;
return x;
};
@ -21,9 +21,9 @@ TxConfirmationSub.fromObj = function(obj) {
x.version = obj.version;
x.createdOn = obj.createdOn;
x.walletId = obj.walletId;
x.copayerId = obj.copayerId;
x.txid = obj.txid;
x.nbConfirmations = obj.nbConfirmations;
x.isActive = obj.isActive;
return x;
};

View File

@ -3140,7 +3140,6 @@ WalletService.prototype.pushNotificationsUnsubscribe = function(opts, cb) {
* Subscribe this copayer to the specified tx to get a notification when the tx confirms.
* @param {Object} opts
* @param {string} opts.txid - The txid of the tx to be notified of.
* @param {string} [opts.nbConfirmations=1] - The number of confirmations to wait before sending the notification.
*/
WalletService.prototype.txConfirmationSubscribe = function(opts, cb) {
if (!checkRequired(opts, ['txid'], cb)) return;
@ -3149,8 +3148,8 @@ WalletService.prototype.txConfirmationSubscribe = function(opts, cb) {
var sub = Model.TxConfirmationSub.create({
copayerId: self.copayerId,
walletId: self.walletId,
txid: opts.txid,
nbConfirmations: +opts.nbConfirmations || 1,
});
self.storage.storeTxConfirmationSub(sub, cb);

View File

@ -941,19 +941,22 @@ Storage.prototype.removePushNotificationSub = function(copayerId, token, cb) {
};
Storage.prototype.fetchActiveTxConfirmationSubs = function(copayerId, cb) {
this.db.collection(collections.TX_CONFIRMATION_SUBS).find({
copayerId: copayerId,
isActive: true,
}).toArray(function(err, result) {
if (err) return cb(err);
var filter = {
isActive: true
};
if (copayerId) filter.copayerId = copayerId;
if (!result) return cb();
this.db.collection(collections.TX_CONFIRMATION_SUBS).find(filter)
.toArray(function(err, result) {
if (err) return cb(err);
var subs = _.map([].concat(result), function(r) {
return Model.TxConfirmationSub.fromObj(r);
if (!result) return cb();
var subs = _.map([].concat(result), function(r) {
return Model.TxConfirmationSub.fromObj(r);
});
return cb(null, subs);
});
return cb(null, subs);
});
};
Storage.prototype.storeTxConfirmationSub = function(txConfirmationSub, cb) {

View File

@ -114,4 +114,45 @@ describe('Blockchain monitor', function() {
}, 50);
});
});
it('should notify copayers of tx confirmation', function(done) {
server.createAddress({}, function(err, address) {
should.not.exist(err);
var incoming = {
txid: '123',
vout: [{}],
};
incoming.vout[0][address.address] = 1500;
server.txConfirmationSubscribe({
txid: '123'
}, function(err) {
should.not.exist(err);
blockchainExplorer.getTxidsInBlock = sinon.stub().callsArgWith(1, null, ['123', '456']);
socket.handlers['block']('block1');
setTimeout(function() {
blockchainExplorer.getTxidsInBlock = sinon.stub().callsArgWith(1, null, ['123', '456']);
socket.handlers['block']('block2');
setTimeout(function() {
server.getNotifications({}, function(err, notifications) {
should.not.exist(err);
var notifications = _.filter(notifications, {
type: 'TxConfirmation'
});
notifications.length.should.equal(1);
var n = notifications[0];
n.walletId.should.equal(wallet.id);
n.creatorId.should.equal(server.copayerId);
n.data.txid.should.equal('123');
done();
});
}, 50);
}, 50);
});
});
});
});

View File

@ -7365,7 +7365,6 @@ describe('Wallet service', function() {
subs.length.should.equal(1);
var s = subs[0];
s.txid.should.equal('123');
s.nbConfirmations.should.equal(1);
s.isActive.should.be.true;
done();
});
@ -7380,17 +7379,12 @@ describe('Wallet service', function() {
}, function(err) {
server.txConfirmationSubscribe({
txid: '123',
nbConfirmations: 6,
}, function(err) {
should.not.exist(err);
server.storage.fetchActiveTxConfirmationSubs(wallet.copayers[0].id, function(err, subs) {
should.not.exist(err);
should.exist(subs);
subs.length.should.equal(1);
var s = subs[0];
s.txid.should.equal('123');
s.nbConfirmations.should.equal(6);
s.isActive.should.be.true;
done();
});
});