diff --git a/lib/blockchainmonitor.js b/lib/blockchainmonitor.js index ff5d34c..dc7a36f 100644 --- a/lib/blockchainmonitor.js +++ b/lib/blockchainmonitor.js @@ -181,38 +181,87 @@ BlockchainMonitor.prototype._handleTxOuts = function(data) { }); }; -BlockchainMonitor.prototype._processBlock = function(network, hash, cb) { + +BlockchainMonitor.prototype._processBlockData = function(network, data, cb) { var self = this; - //1. check block is not reorg - //2. doProcessBlock - this.explorers[network].getBlock(hash, function(err, data) { + var block = new Bitcore.Block(new Buffer(data.rawblock, 'hex')); + log.debug('Processing block ' + network + ' ' + block.hash); + + var txs = block.toObject().transactions; + + var allAddresses = {}; + _.each(txs, function(tx) { + _.each(tx.outputs, function(o) { + if (o.script) { + var s = new Bitcore.Script(new Buffer(o.script, 'hex')); + var a = s.toAddress(network); + if (a) { + allAddresses[a] = true; + } + } + }); + }); + allAddresses = _.keys(allAddresses); + + async.eachLimit(allAddresses, 10, function(address, i_cb) { + self.storage.updateLastUsedOn(address, i_cb); + }, function(err) { if (err) return cb(err); - log.debug('Processing block ' + network + ' ' + hash); - var block = new Bitcore.Block(new Buffer(data.rawblock, 'hex')); - var txs = block.toObject().transactions; - - var allAddresses = {}; - _.each(txs, function(tx) { - _.each(tx.outputs, function(o) { - if (o.script) { - var s = new Bitcore.Script(new Buffer(o.script, 'hex')); - var a = s.toAddress(network); - if (a) { - allAddresses[a] = true; - } - } - }); - }); - allAddresses = _.keys(allAddresses); - - async.eachLimit(allAddresses, 10, function(address, i_cb) { - self.storage.updateLastUsedOn(address, i_cb); - }, cb); + return cb(null, block); }); }; + + + +BlockchainMonitor.prototype._fetchAndProcessBlock = function(network, hash, cb) { + var self = this; + + this.explorers[network].getBlock(hash, function(err, data) { + if (err) return cb(err); + self._processBlockData(network, data, cb); + }); +}; + + +BlockchainMonitor.prototype._processBlockchainSince = function(network, hash, cb) { + var self = this; + + self.storage.getTip(network, function(err, data) { + if (err) return cb(err); + + if (!data) { + log.error('No Tip recorded for ' + network); + return cb('No Tip recorded'); + } + + var tip = data.hash; + + log.debug('', 'Last processed tip for %s was: %s on ', network, tip, (new Date(1000*data.updateOn)).toString() ); + + self._fetchAndProcessBlock(network, hash, function(err, block){ + if (err) return cb(err); + + // TODO CRITICAL + if (!block) { + log.error('', 'Block from notification no found! %s %s', network, hash); + return cb('Block not found!'); + }; + + var header = block.header.toObject(); + + if (header.prevHash != tip) { + log.info ('', 'Notified block\'s prevhash does not match stored hash... Processing'); + } else { + self.storage.updateTip(network, hash, cb); + } + }); + }); + +}; + BlockchainMonitor.prototype._handleIncomingTx = function(data) { this._handleTxId(data); this._handleTxOuts(data); @@ -231,12 +280,10 @@ BlockchainMonitor.prototype._handleNewBlock = function(network, hash) { }, }); - self._processBlock(network, hash, function() { - self.storage.updateTip(function() { - self.storage.softResetAllTxHistoryCache(function() { - self._storeAndBroadcastNotification(notification, function(err) { - return; - }); + self._processBlockchainSince(network, hash, function() { + self.storage.softResetAllTxHistoryCache(function() { + self._storeAndBroadcastNotification(notification, function(err) { + return; }); }); }) diff --git a/lib/storage.js b/lib/storage.js index 7367cf4..24638dc 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -624,23 +624,33 @@ Storage.prototype.getTxHistoryCache = function(walletId, from, to, cb) { }; -Storage.prototype.updateTip = function(hash, cb) { +Storage.prototype.updateTip = function(network, hash, cb) { this.db.collection(collections.CACHE).update({ - type: 'blochainTip', + type: 'tip', + key: network, }, { $set: { - hash:hash, - + hash: hash, + updatedOn: Math.floor(Date.now() / 1000), } }, { upsert: true, }, cb); }; + +Storage.prototype.getTip = function(network,cb) { + this.db.collection(collections.CACHE).findOne({ + type: 'tip', + key: network, + }, cb); +}; + + Storage.prototype.softResetAllTxHistoryCache = function(cb) { this.db.collection(collections.CACHE).remove({ type: 'historyCacheStatus', - }, { + }, { multi: true, }, cb); }; diff --git a/test/integration/bcmonitor.js b/test/integration/bcmonitor.js index 267fc1b..0c570ca 100644 --- a/test/integration/bcmonitor.js +++ b/test/integration/bcmonitor.js @@ -144,17 +144,30 @@ describe('Blockchain monitor', function() { rawblock: TestData.block.rawblock }); - var fakeAddresses= TestData.block.addresses.splice(0, 3); + + server.storage.getTip = sinon.stub().yields(null, { + hash: TestData.block.prev, + updatedOn: Date.now(), + }); + + + var fakeAddresses = TestData.block.addresses.splice(0, 3); server.getWallet({}, function(err, wallet) { should.not.exist(err); + + var aLongTimeAgo = Date.now() - (1000 * 10 * 86400); + var clock = sinon.useFakeTimers(aLongTimeAgo, 'Date'); + + helpers.insertFakeAddresses(server, wallet, fakeAddresses, function(err) { should.not.exist(err); + clock.restore(); socket.handlers['block'](incoming); setTimeout(function() { storage.fetchRecentAddresses(wallet.id, (Date.now() / 1000) - 100, function(err, addr) { - _.pluck(addr,'address').should.be.deep.equal(fakeAddresses); + _.pluck(addr, 'address').should.be.deep.equal(fakeAddresses); done(); }); }, 50); diff --git a/test/testdata.js b/test/testdata.js index 149d47e..368aa3e 100644 --- a/test/testdata.js +++ b/test/testdata.js @@ -296,4 +296,4 @@ var addresses = [ 'mzrj4QmPhk98vc2yQw42uCsgwfBjVzPPLM', module.exports.keyPair = keyPair; module.exports.copayers = copayers; module.exports.history = history; -module.exports.block = { rawblock: rawblock, addresses: addresses}; +module.exports.block = { rawblock: rawblock, addresses: addresses, prev: '0000000000001e5b4b49aef6109a1ceeee9d4f628882bce97e06aa0f2889aee1'};