bitcore-wallet-service/lib/blockchainmonitor.js

320 lines
8.7 KiB
JavaScript

'use strict';
var $ = require('preconditions').singleton();
var _ = require('lodash');
var async = require('async');
var log = require('npmlog');
log.debug = log.verbose;
var BlockchainExplorer = require('./blockchainexplorer');
var Storage = require('./storage');
var MessageBroker = require('./messagebroker');
var Lock = require('./lock');
var Bitcore = require('bitcore-lib');
var Common = require('./common');
var Utils = Common.Utils;
var Constants = Common.Constants;
var Defaults = Common.Defaults;
var Notification = require('./model/notification');
var WalletService = require('./server');
function BlockchainMonitor() {};
BlockchainMonitor.prototype.start = function(opts, cb) {
opts = opts || {};
var self = this;
async.parallel([
function(done) {
self.explorers = _.object(_.map(['livenet', 'testnet'], function(network) {
var explorer;
if (opts.blockchainExplorers) {
explorer = opts.blockchainExplorers[network];
} else {
var config = {}
if (opts.blockchainExplorerOpts && opts.blockchainExplorerOpts[network]) {
config = opts.blockchainExplorerOpts[network];
}
var explorer = new BlockchainExplorer({
provider: config.provider,
network: network,
url: config.url,
userAgent: WalletService.getServiceVersion(),
});
}
$.checkState(explorer);
self._initExplorer(explorer, network);
return [network, explorer];
}));
done();
},
function(done) {
if (opts.storage) {
self.storage = opts.storage;
done();
} else {
self.storage = new Storage();
self.storage.connect(opts.storageOpts, done);
}
},
function(done) {
self.messageBroker = opts.messageBroker || new MessageBroker(opts.messageBrokerOpts);
done();
},
function(done) {
self.lock = opts.lock || new Lock(opts.lockOpts);
done();
},
], function(err) {
if (err) {
log.error(err);
}
return cb(err);
});
};
BlockchainMonitor.prototype._initExplorer = function(explorer, network) {
var self = this;
var socket = explorer.initSocket();
socket.on('connect', function() {
log.info('Connected to ' + explorer.getConnectionInfo());
socket.emit('subscribe', 'inv');
});
socket.on('connect_error', function() {
log.error('Error connecting to ' + explorer.getConnectionInfo());
});
socket.on('tx', _.bind(self._handleIncomingTx, self));
socket.on('block', _.bind(self._handleNewBlock, self, network));
};
BlockchainMonitor.prototype._handleTxId = function(data, processIt) {
var self = this;
if (!data || !data.txid) return;
self.storage.fetchTxByHash(data.txid, function(err, txp) {
if (err) {
log.error('Could not fetch tx from the db');
return;
}
if (!txp || txp.status != 'accepted') return;
var walletId = txp.walletId;
if (!processIt) {
log.info('Detected broadcast ' + data.txid + ' of an accepted txp [' + txp.id + '] for wallet ' + walletId + ' [' + txp.amount + 'sat ]');
return setTimeout(self._handleTxId.bind(self, data, true), 20 * 1000);
}
log.info('Processing accepted txp [' + txp.id + '] for wallet ' + walletId + ' [' + txp.amount + 'sat ]');
txp.setBroadcasted();
self.storage.softResetTxHistoryCache(walletId, function() {
self.storage.storeTx(self.walletId, txp, function(err) {
if (err)
log.error('Could not save TX');
var args = {
txProposalId: txp.id,
txid: data.txid,
amount: txp.getTotalAmount(),
};
var notification = Notification.create({
type: 'NewOutgoingTxByThirdParty',
data: args,
walletId: walletId,
});
self._storeAndBroadcastNotification(notification);
});
});
});
};
BlockchainMonitor.prototype._handleTxOuts = function(data) {
var self = this;
if (!data || !data.vout) return;
var outs = _.compact(_.map(data.vout, function(v) {
var addr = _.keys(v)[0];
return {
address: addr,
amount: +v[addr]
};
}));
if (_.isEmpty(outs)) return;
async.each(outs, function(out, next) {
self.storage.fetchAddress(out.address, function(err, address) {
if (err) {
log.error('Could not fetch addresses from the db');
return next(err);
}
if (!address || address.isChange) return next();
var walletId = address.walletId;
log.info('Incoming tx for wallet ' + walletId + ' [' + out.amount + 'sat -> ' + out.address + ']');
var notification = Notification.create({
type: 'NewIncomingTx',
data: {
txid: data.txid,
address: out.address,
amount: out.amount,
},
walletId: walletId,
});
self.storage.softResetTxHistoryCache(walletId, function() {
self.storage.updateLastUsedOn(out.address, null, function() {
self._storeAndBroadcastNotification(notification, next);
});
});
});
}, function(err) {
return;
});
};
BlockchainMonitor.prototype._processBlockBody = function(network, block, cb) {
var txs = block.transactions;
var allAddresses = {};
_.each(txs, function(tx) {
_.each(tx.outputs, function(o) {
if (o.script) {
var a = o.script.toAddress(network);
if (a) {
allAddresses[a] = true;
}
}
});
});
this.storage.updateLastUsedOn(_.keys(allAddresses), block.header.time, cb);
};
BlockchainMonitor.prototype._fetchAndProcessBlock = function(network, hash, cb) {
var self = this;
this.explorers[network].getBlock(hash, function(err, data) {
if (err) return cb(err);
if (!data) return cb('Could not find block ' + network + ':' + hash);
var block = new Bitcore.Block(new Buffer(data.rawblock, 'hex'));
log.debug('Processing block ' + network + ' ' + block.hash);
self._processBlockBody(network, block, function(err) {
if (err) return cb(err);
return cb(null, block);
});
});
};
BlockchainMonitor.prototype._getBlockPrevHash = function(block) {
return block.header.toObject().prevHash;
};
BlockchainMonitor.prototype._handleNewBlockchainTip = function(network, hash, cb) {
var self = this;
self.storage.getBlockchainTip(network, function(err, tip) {
if (err) return cb(err);
if (!tip) {
log.info('', 'Tip not stored for %s. Using default', network);
tip = {
hashes: [Defaults.DEEPEST_BLOCK_TO_PROCESS[network]],
};
}
if (tip.updatedOn) {
log.debug('', 'Last processed tip for %s was: %s on %s', network, tip.hashes[0], (new Date(1000 * tip.updatedOn)).toString());
}
function fetchAndProcess(network, hash, next) {
self._fetchAndProcessBlock(network, hash, function(err, block) {
if (err) return next(err);
// TODO CRITICAL
if (!block) {
log.error('', 'Block from notification not found! %s %s', network, hash);
throw ('Block not found!' + network + hash);
};
// to make it testeable, it is on a different function
var prevHash = self._getBlockPrevHash(block);
if (_.indexOf(tip.hashes, prevHash) == -1) {
log.info('', 'Prevhash does not match stored tip hashes... ' + prevHash);
return fetchAndProcess(network, prevHash, next);
}
return next();
});
};
fetchAndProcess(network, hash, function(err) {
if (err) return cb(err);
// Block is chained with our processed blocks
tip.hashes.unshift(hash);
var hashes = tip.hashes.splice(0, Defaults.MAX_REORG_DEPTH);
log.debug('', 'Storing new tip:' + hashes);
self.storage.updateBlockchainTip(network, hashes, cb);
});
});
}
BlockchainMonitor.prototype._handleIncomingTx = function(data) {
this._handleTxId(data);
this._handleTxOuts(data);
};
BlockchainMonitor.prototype._handleNewBlock = function(network, hash) {
var self = this;
log.info('New ' + network + ' block: ', hash);
var notification = Notification.create({
type: 'NewBlock',
walletId: network, // use network name as wallet id for global notifications
data: {
hash: hash,
network: network,
},
});
self._handleNewBlockchainTip(network, hash, function() {
self.storage.softResetAllTxHistoryCache(function() {
self._storeAndBroadcastNotification(notification, function(err) {
return;
});
});
})
};
BlockchainMonitor.prototype._storeAndBroadcastNotification = function(notification, cb) {
var self = this;
self.storage.storeNotification(notification.walletId, notification, function() {
self.messageBroker.send(notification)
if (cb) return cb();
});
};
module.exports = BlockchainMonitor;