better insight error handling

add check to getPending

add check to getPending

add tests

rm log

store TXID upon tx completion

monitor accepted txs

change delay

change _checkTxInBlockchain

change _checkTxInBlockchain

rm useless params, refactor txid param

complete tx at fetch
This commit is contained in:
Matias Alejo Garcia 2015-09-08 00:57:59 -03:00
parent 4baaa6dbab
commit a8607fa11b
7 changed files with 244 additions and 53 deletions

View File

@ -75,7 +75,10 @@ Insight.prototype.getTransaction = function(txid, cb) {
};
request(args, function(err, res, tx) {
if (err || res.statusCode !== 200) return cb(_parseErr(err,res));
if (res && res.statusCode == 404 ) return cb();
if (err || res.statusCode !== 200)
return cb(_parseErr(err,res));
return cb(null, tx);
});
};

View File

@ -86,7 +86,50 @@ BlockchainMonitor.prototype._initExplorer = function(explorer) {
socket.on('block', _.bind(self._handleNewBlock, self));
};
BlockchainMonitor.prototype._handleIncommingTx = function(data) {
BlockchainMonitor.prototype._handleTxId = function(data, processIt) {
var self = this;
if (!data || !data.txid) return;
self.storage.fetchTxByHash(data.txid, function(err, txp) {
if (err) {
log.error('Could not fetch tx the db');
return;
}
if (!txp || txp.status != 'accepted') return;
var walletId = txp.walletId;
if (!processIt) {
log.info('Detected broadcast ' + data.txid + ' of an accepted txp [' + txp.id + '] for wallet ' + walletId + ' [' + txp.amount + 'sat ]');
return setTimeout(self._handleTxId.bind(self, data, true), 20 * 1000);
}
log.info('Processing accepted txp [' + txp.id + '] for wallet ' + walletId + ' [' + txp.amount + 'sat ]');
txp.setBroadcasted();
self.storage.storeTx(self.walletId, txp, function(err) {
if (err)
log.error('Could not save TX');
var args = {
txProposalId: txp.id,
txid: data.txid,
amount: txp.getTotalAmount(),
};
var notification = Notification.create({
type: 'NewOutgoingTxByThirdParty',
data: args,
walletId: walletId,
});
self._storeAndBroadcastNotification(notification);
});
});
};
BlockchainMonitor.prototype._handleTxOuts = function(data) {
var self = this;
if (!data || !data.vout) return;
@ -130,6 +173,12 @@ BlockchainMonitor.prototype._handleIncommingTx = function(data) {
});
};
BlockchainMonitor.prototype._handleIncommingTx = function(data) {
this._handleTxId(data);
this._handleTxOuts(data);
};
BlockchainMonitor.prototype._handleNewBlock = function(hash) {
var self = this;
@ -150,7 +199,7 @@ BlockchainMonitor.prototype._storeAndBroadcastNotification = function(notificati
self.storage.storeNotification(notification.walletId, notification, function() {
self.messageBroker.send(notification)
return cb();
if (cb) return cb();
});
};

View File

@ -1,6 +1,7 @@
'use strict';
var _ = require('lodash');
var $ = require('preconditions').singleton();
var Uuid = require('uuid');
var WalletUtils = require('bitcore-wallet-utils');
var Bitcore = WalletUtils.Bitcore;
@ -300,8 +301,10 @@ TxProposal.prototype.sign = function(copayerId, signatures, xpub) {
this.addAction(copayerId, 'accept', null, signatures, xpub);
if (this.status == 'accepted')
if (this.status == 'accepted') {
this.raw = tx.uncheckedSerialize();
this.txid = tx.id;
}
return true;
} catch (e) {
@ -331,8 +334,8 @@ TxProposal.prototype.isBroadcasted = function() {
return this.status == 'broadcasted';
};
TxProposal.prototype.setBroadcasted = function(txid) {
this.txid = txid;
TxProposal.prototype.setBroadcasted = function() {
$.checkState(this.txid);
this.status = 'broadcasted';
this.broadcastedOn = Math.floor(Date.now() / 1000);
};

View File

@ -1362,11 +1362,11 @@ WalletService.prototype.broadcastRawTx = function(opts, cb) {
WalletService.prototype._checkTxInBlockchain = function(txp, cb) {
var tx = txp.getBitcoreTx();
if (!txp.txid) return cb();
var bc = this._getBlockchainExplorer(txp.getNetworkName());
bc.getTransaction(tx.id, function(err, tx) {
bc.getTransaction(txp.txid, function(err, tx) {
if (err) return cb(err);
return cb(null, tx);
return cb(null, !!tx);
})
};
@ -1435,6 +1435,31 @@ WalletService.prototype.signTx = function(opts, cb) {
});
};
WalletService.prototype._processBroadcast = function(txp, opts, cb) {
var self = this;
$.checkState(txp.txid);
opts = opts || {};
txp.setBroadcasted();
self.storage.storeTx(self.walletId, txp, function(err) {
if (err) return cb(err);
var args = {
txProposalId: txp.id,
txid: txp.txid,
amount: txp.getTotalAmount(),
};
if (opts.byThirdParty) {
self._notify('NewOutgoingTxByThirdParty', args);
} else {
self._notify('NewOutgoingTx', args);
}
return cb(err, txp);
});
};
/**
* Broadcast a transaction proposal.
@ -1447,21 +1472,6 @@ WalletService.prototype.broadcastTx = function(opts, cb) {
if (!Utils.checkRequired(opts, ['txProposalId']))
return cb(new ClientError('Required argument missing'));
function setBroadcasted(txp, txid, cb) {
txp.setBroadcasted(txid);
self.storage.storeTx(self.walletId, txp, function(err) {
if (err) return cb(err);
self._notify('NewOutgoingTx', {
txProposalId: opts.txProposalId,
txid: txid,
amount: txp.getTotalAmount(),
}, function() {
return cb(null, txp);
});
});
};
self.getWallet({}, function(err, wallet) {
if (err) return cb(err);
@ -1483,14 +1493,21 @@ WalletService.prototype.broadcastTx = function(opts, cb) {
if (err) {
var broadcastErr = err;
// Check if tx already in blockchain
self._checkTxInBlockchain(txp, function(err, tx) {
self._checkTxInBlockchain(txp, function(err, isInBlockchain) {
if (err) return cb(err);
if (!tx) return cb(broadcastErr);
if (!isInBlockchain) return cb(broadcastErr);
setBroadcasted(txp, tx.txid, cb);
self._processBroadcast(txp, {
byThirdParty: true
}, cb);
});
} else {
setBroadcasted(txp, txid, cb);
self._processBroadcast(txp, {
byThirdParty: false
}, function(err) {
if (err) return cb(err);
return cb(null, txp);
});
}
});
});
@ -1570,7 +1587,20 @@ WalletService.prototype.getPendingTxs = function(opts, cb) {
txp.deleteLockTime = self.getRemainingDeleteLockTime(txp);
});
return cb(null, txps);
async.each(txps, function(txp, a_cb) {
if (txp.status != 'accepted') return a_cb();
self._checkTxInBlockchain(txp, function(err, isInBlockchain) {
if (err || !isInBlockchain) return a_cb(err);
self._processBroadcast(txp, {
byThirdParty: true
}, a_cb);
});
}, function(err) {
return cb(err, _.reject(txps, function(txp) {
return txp.status == 'broadcasted';
}));
});
});
};

View File

@ -41,6 +41,7 @@ Storage.prototype._createIndexes = function() {
this.db.collection(collections.TXS).createIndex({
walletId: 1,
isPending: 1,
txid: 1,
});
this.db.collection(collections.NOTIFICATIONS).createIndex({
walletId: 1,
@ -171,6 +172,7 @@ Storage.prototype._completeTxData = function(walletId, txs, cb) {
});
};
// TODO: remove walletId from signature
Storage.prototype.fetchTx = function(walletId, txProposalId, cb) {
var self = this;
@ -184,7 +186,18 @@ Storage.prototype.fetchTx = function(walletId, txProposalId, cb) {
});
};
Storage.prototype.fetchTxByHash = function(hash, cb) {
var self = this;
this.db.collection(collections.TXS).findOne({
txid: hash,
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
return self._completeTxData(result.walletId, Model.TxProposal.fromObj(result), cb);
});
};
Storage.prototype.fetchLastTxs = function(walletId, creatorId, limit, cb) {
var self = this;

View File

@ -181,8 +181,9 @@ helpers.stubUtxos = function(server, wallet, amounts, cb) {
});
};
helpers.stubBroadcast = function(txid) {
blockchainExplorer.broadcast = sinon.stub().callsArgWith(1, null, txid);
helpers.stubBroadcast = function(thirdPartyBroadcast) {
blockchainExplorer.broadcast = sinon.stub().callsArgWith(1, null, '112233');
blockchainExplorer.getTransaction = sinon.stub().callsArgWith(1, null, null);
};
helpers.stubHistory = function(txs) {
@ -485,14 +486,14 @@ describe('Wallet service', function() {
message: 'some message'
});
var txpId;
var txp;
async.waterfall([
function(next) {
server.createTx(txOpts, next);
},
function(txp, next) {
txpId = txp.id;
function(t, next) {
txp = t;
async.eachSeries(_.range(2), function(i, next) {
var copayer = TestData.copayers[i];
helpers.getAuthServer(copayer.id45, function(server) {
@ -500,14 +501,17 @@ describe('Wallet service', function() {
server.signTx({
txProposalId: txp.id,
signatures: signatures,
}, next);
}, function(err, t) {
txp = t;
next();
});
});
}, next);
},
function(next) {
helpers.stubBroadcast('999');
helpers.stubBroadcast();
server.broadcastTx({
txProposalId: txpId,
txProposalId: txp.id,
}, next);
},
], function(err) {
@ -525,7 +529,7 @@ describe('Wallet service', function() {
one.text.should.contain(wallet.name);
one.text.should.contain('800,000');
should.exist(one.html);
one.html.should.contain('https://insight.bitpay.com/tx/999');
one.html.should.contain('https://insight.bitpay.com/tx/' + txp.txid);
server.storage.fetchUnsentEmails(function(err, unsent) {
should.not.exist(err);
unsent.should.be.empty;
@ -2817,6 +2821,7 @@ describe('Wallet service', function() {
});
it('should sign a TX with multiple inputs, different paths, and return raw', function(done) {
blockchainExplorer.getTransaction = sinon.stub().callsArgWith(1, null, null);
server.getPendingTxs({}, function(err, txs) {
var tx = txs[0];
tx.id.should.equal(txid);
@ -3053,7 +3058,7 @@ describe('Wallet service', function() {
});
describe('#broadcastTx & #broadcastRawTx', function() {
var server, wallet, txpid;
var server, wallet, txpid, txid;
beforeEach(function(done) {
helpers.createAndJoinWallet(1, 1, function(s, w) {
server = s;
@ -3074,6 +3079,7 @@ describe('Wallet service', function() {
should.exist(txp);
txp.isAccepted().should.be.true;
txp.isBroadcasted().should.be.false;
txid = txp.txid;
txpid = txp.id;
done();
});
@ -3084,7 +3090,7 @@ describe('Wallet service', function() {
it('should broadcast a tx', function(done) {
var clock = sinon.useFakeTimers(1234000, 'Date');
helpers.stubBroadcast('999');
helpers.stubBroadcast();
server.broadcastTx({
txProposalId: txpid
}, function(err) {
@ -3094,7 +3100,7 @@ describe('Wallet service', function() {
}, function(err, txp) {
should.not.exist(err);
should.not.exist(txp.raw);
txp.txid.should.equal('999');
txp.txid.should.equal(txid);
txp.isBroadcasted().should.be.true;
txp.broadcastedOn.should.equal(1234);
clock.restore();
@ -3104,19 +3110,19 @@ describe('Wallet service', function() {
});
it('should broadcast a raw tx', function(done) {
helpers.stubBroadcast('999');
helpers.stubBroadcast();
server.broadcastRawTx({
network: 'testnet',
rawTx: 'raw tx',
}, function(err, txid) {
should.not.exist(err);
txid.should.equal('999');
should.exist(txid);
done();
});
});
it('should fail to brodcast a tx already marked as broadcasted', function(done) {
helpers.stubBroadcast('999');
helpers.stubBroadcast();
server.broadcastTx({
txProposalId: txpid
}, function(err) {
@ -3131,8 +3137,52 @@ describe('Wallet service', function() {
});
});
it('should auto process already broadcasted txs', function(done) {
helpers.stubBroadcast();
server.getPendingTxs({}, function(err, txs) {
should.not.exist(err);
txs.length.should.equal(1);
blockchainExplorer.getTransaction = sinon.stub().callsArgWith(1, null, {
txid: 999
});
server.getPendingTxs({}, function(err, txs) {
should.not.exist(err);
txs.length.should.equal(0);
done();
});
});
});
it('should process only broadcasted txs', function(done) {
helpers.stubBroadcast();
var txOpts = helpers.createSimpleProposalOpts('18PzpUFkFZE8zKWUPvfykkTxmB9oMR8qP7', 9, TestData.copayers[0].privKey_1H_0, {
message: 'some message 2'
});
server.createTx(txOpts, function(err, txp) {
should.not.exist(err);
server.getPendingTxs({}, function(err, txs) {
should.not.exist(err);
txs.length.should.equal(2);
blockchainExplorer.getTransaction = sinon.stub().callsArgWith(1, null, {
txid: 999
});
server.getPendingTxs({}, function(err, txs) {
should.not.exist(err);
txs.length.should.equal(1);
txs[0].status.should.equal('pending');
should.not.exist(txs[0].txid);
done();
});
});
});
});
it('should fail to brodcast a not yet accepted tx', function(done) {
helpers.stubBroadcast('999');
helpers.stubBroadcast();
var txOpts = helpers.createSimpleProposalOpts('18PzpUFkFZE8zKWUPvfykkTxmB9oMR8qP7', 9, TestData.copayers[0].privKey_1H_0, {
message: 'some message'
});
@ -3161,7 +3211,7 @@ describe('Wallet service', function() {
txProposalId: txpid
}, function(err, txp) {
should.not.exist(err);
should.not.exist(txp.txid);
should.exist(txp.txid);
txp.isBroadcasted().should.be.false;
should.not.exist(txp.broadcastedOn);
txp.isAccepted().should.be.true;
@ -3184,7 +3234,6 @@ describe('Wallet service', function() {
}, function(err, txp) {
should.not.exist(err);
should.exist(txp.txid);
txp.txid.should.equal('999');
txp.isBroadcasted().should.be.true;
should.exist(txp.broadcastedOn);
done();
@ -3204,7 +3253,7 @@ describe('Wallet service', function() {
txProposalId: txpid
}, function(err, txp) {
should.not.exist(err);
should.not.exist(txp.txid);
should.exist(txp.txid);
txp.isBroadcasted().should.be.false;
should.not.exist(txp.broadcastedOn);
txp.isAccepted().should.be.true;
@ -3221,7 +3270,7 @@ describe('Wallet service', function() {
server = s;
wallet = w;
helpers.stubUtxos(server, wallet, _.range(1, 9), function() {
helpers.stubBroadcast('999');
helpers.stubBroadcast();
done();
});
});
@ -3320,7 +3369,7 @@ describe('Wallet service', function() {
txp.isPending().should.be.true;
txp.isAccepted().should.be.true;
txp.isBroadcasted().should.be.false;
should.not.exist(txp.txid);
should.exist(txp.txid);
txp.actions.length.should.equal(2);
server.getNotifications({}, function(err, notifications) {
should.not.exist(err);
@ -3692,7 +3741,7 @@ describe('Wallet service', function() {
signatures: signatures,
}, function(err) {
should.not.exist(err);
helpers.stubBroadcast('1122334455');
helpers.stubBroadcast();
server.broadcastTx({
txProposalId: tx.id
}, function(err, txp) {
@ -3710,6 +3759,38 @@ describe('Wallet service', function() {
});
});
});
it('should notify sign, acceptance, and broadcast, and emit (with 3rd party broadcast', function(done) {
server.getPendingTxs({}, function(err, txs) {
var tx = txs[2];
var signatures = helpers.clientSign(tx, TestData.copayers[0].xPrivKey);
server.signTx({
txProposalId: tx.id,
signatures: signatures,
}, function(err) {
should.not.exist(err);
blockchainExplorer.broadcast = sinon.stub().callsArgWith(1, 'err');
blockchainExplorer.getTransaction = sinon.stub().callsArgWith(1, null, {
txid: 11
});
server.broadcastTx({
txProposalId: tx.id
}, function(err, txp) {
should.not.exist(err);
server.getNotifications({
limit: 3,
reverse: true,
}, function(err, notifications) {
should.not.exist(err);
var types = _.pluck(notifications, 'type');
types.should.deep.equal(['NewOutgoingTxByThirdParty', 'TxProposalFinallyAccepted', 'TxProposalAcceptedBy']);
done();
});
});
});
});
});
});
describe('#removeWallet', function() {
@ -4192,13 +4273,13 @@ describe('Wallet service', function() {
}, function(err, tx) {
should.not.exist(err);
helpers.stubBroadcast('1122334455');
helpers.stubBroadcast();
server.broadcastTx({
txProposalId: tx.id
}, function(err, txp) {
should.not.exist(err);
var txs = [{
txid: '1122334455',
txid: txp.txid,
confirmations: 1,
fees: 5460,
time: 1,

View File

@ -151,6 +151,7 @@ describe('Storage', function() {
tx.status = 'rejected';
tx.isPending().should.be.false;
}
tx.txid = 'txid' + i;
return tx;
});
async.each(proposals, function(tx, next) {
@ -171,6 +172,17 @@ describe('Storage', function() {
done();
});
});
it('should fetch tx by hash', function(done) {
storage.fetchTxByHash('txid0', function(err, tx) {
should.not.exist(err);
should.exist(tx);
tx.id.should.equal(proposals[0].id);
tx.walletId.should.equal(proposals[0].walletId);
tx.creatorName.should.equal('copayer 0');
done();
});
});
it('should fetch all pending txs', function(done) {
storage.fetchPendingTxs('123', function(err, txs) {
should.not.exist(err);