'use strict'; var $ = require('preconditions').singleton(); var _ = require('lodash'); var async = require('async'); var log = require('npmlog'); log.debug = log.verbose; var BlockchainExplorer = require('./blockchainexplorer'); var Storage = require('./storage'); var MessageBroker = require('./messagebroker'); var Lock = require('./lock'); var Notification = require('./model/notification'); var WalletService = require('./server'); var Constants = require('./common/constants'); function BlockchainMonitor() {}; BlockchainMonitor.prototype.start = function(opts, cb) { opts = opts || {}; var self = this; async.parallel([ function(done) { self.explorers = { btc: {}, bch: {}, }; var coinNetworkPairs = []; _.each(_.values(Constants.COINS), function(coin) { _.each(_.values(Constants.NETWORKS), function(network) { coinNetworkPairs.push({ coin: coin, network: network }); }); }); _.each(coinNetworkPairs, function(pair) { var explorer; if (opts.blockchainExplorers && opts.blockchainExplorers[pair.coin] && opts.blockchainExplorers[pair.coin][pair.network]) { explorer = opts.blockchainExplorers[pair.coin][pair.network]; } else { var config = {} if (opts.blockchainExplorerOpts && opts.blockchainExplorerOpts[pair.coin] && opts.blockchainExplorerOpts[pair.coin][pair.network]) { config = opts.blockchainExplorerOpts[pair.coin][pair.network]; } else { return; } var explorer = new BlockchainExplorer({ provider: config.provider, coin: pair.coin, network: pair.network, url: config.url, userAgent: WalletService.getServiceVersion(), }); } $.checkState(explorer); self._initExplorer(pair.coin, pair.network, explorer); self.explorers[pair.coin][pair.network] = explorer; }); done(); }, function(done) { if (opts.storage) { self.storage = opts.storage; done(); } else { self.storage = new Storage(); self.storage.connect(opts.storageOpts, done); } }, function(done) { self.messageBroker = opts.messageBroker || new MessageBroker(opts.messageBrokerOpts); done(); }, function(done) { self.lock = opts.lock || new Lock(opts.lockOpts); done(); }, ], function(err) { if (err) { log.error(err); } return cb(err); }); }; BlockchainMonitor.prototype._initExplorer = function(coin, network, explorer) { var self = this; var socket = explorer.initSocket(); socket.on('connect', function() { log.info('Connected to ' + explorer.getConnectionInfo()); socket.emit('subscribe', 'inv'); }); socket.on('connect_error', function() { log.error('Error connecting to ' + explorer.getConnectionInfo()); }); socket.on('tx', _.bind(self._handleIncomingTx, self, coin, network)); socket.on('block', _.bind(self._handleNewBlock, self, coin, network)); }; BlockchainMonitor.prototype._handleThirdPartyBroadcasts = function(data, processIt) { var self = this; if (!data || !data.txid) return; self.storage.fetchTxByHash(data.txid, function(err, txp) { if (err) { log.error('Could not fetch tx from the db'); return; } if (!txp || txp.status != 'accepted') return; var walletId = txp.walletId; if (!processIt) { log.info('Detected broadcast ' + data.txid + ' of an accepted txp [' + txp.id + '] for wallet ' + walletId + ' [' + txp.amount + 'sat ]'); return setTimeout(self._handleThirdPartyBroadcasts.bind(self, data, true), 20 * 1000); } log.info('Processing accepted txp [' + txp.id + '] for wallet ' + walletId + ' [' + txp.amount + 'sat ]'); txp.setBroadcasted(); self.storage.softResetTxHistoryCache(walletId, function() { self.storage.storeTx(self.walletId, txp, function(err) { if (err) log.error('Could not save TX'); var args = { txProposalId: txp.id, txid: data.txid, amount: txp.getTotalAmount(), }; var notification = Notification.create({ type: 'NewOutgoingTxByThirdParty', data: args, walletId: walletId, }); self._storeAndBroadcastNotification(notification); }); }); }); }; BlockchainMonitor.prototype._handleIncomingPayments = function(coin, network, data) { var self = this; if (!data || !data.vout) return; var outs = _.compact(_.map(data.vout, function(v) { var addr = _.keys(v)[0]; return { address: addr, amount: +v[addr] }; })); if (_.isEmpty(outs)) return; async.each(outs, function(out, next) { self.storage.fetchAddressByCoin(coin, out.address, function(err, address) { if (err) { log.error('Could not fetch addresses from the db'); return next(err); } if (!address || address.isChange) return next(); var walletId = address.walletId; log.info('Incoming tx for wallet ' + walletId + ' [' + out.amount + 'sat -> ' + out.address + ']'); var fromTs = Date.now() - 24 * 3600 * 1000; self.storage.fetchNotifications(walletId, null, fromTs, function(err, notifications) { if (err) return next(err); var alreadyNotified = _.any(notifications, function(n) { return n.type == 'NewIncomingTx' && n.data && n.data.txid == data.txid; }); if (alreadyNotified) { log.info('The incoming tx ' + data.txid + ' was already notified'); return next(); } var notification = Notification.create({ type: 'NewIncomingTx', data: { txid: data.txid, address: out.address, amount: out.amount, }, walletId: walletId, }); self.storage.softResetTxHistoryCache(walletId, function() { self._updateAddressesWithBalance(address, function() { self._storeAndBroadcastNotification(notification, next); }); }); }); }); }, function(err) { return; }); }; BlockchainMonitor.prototype._updateAddressesWithBalance = function(address, cb) { var self = this; self.storage.fetchAddressesWithBalance(address.walletId, function(err, result) { if (err) { log.warn('Could not update wallet cache', err); return cb(err); } var addresses = _.map(result,'address'); if (_.indexOf(addresses, address.address) >= 0) { return cb(); } addresses.push(address.address); log.info('Activating address ' + address.address); self.storage.storeAddressesWithBalance(address.walletId, addresses, function(err) { if (err) { log.warn('Could not update wallet cache', err); } return cb(err); }); }); }; BlockchainMonitor.prototype._handleIncomingTx = function(coin, network, data) { this._handleThirdPartyBroadcasts(data); this._handleIncomingPayments(coin, network, data); }; BlockchainMonitor.prototype._notifyNewBlock = function(coin, network, hash) { var self = this; log.info('New ' + network + ' block: ' + hash); var notification = Notification.create({ type: 'NewBlock', walletId: network, // use network name as wallet id for global notifications data: { hash: hash, coin: coin, network: network, }, }); self.storage.softResetAllTxHistoryCache(function() { self._storeAndBroadcastNotification(notification, function(err) { return; }); }); }; BlockchainMonitor.prototype._handleTxConfirmations = function(coin, network, hash) { var self = this; function processTriggeredSubs(subs, cb) { async.each(subs, function(sub) { log.info('New tx confirmation ' + sub.txid); sub.isActive = false; self.storage.storeTxConfirmationSub(sub, function(err) { if (err) return cb(err); var notification = Notification.create({ type: 'TxConfirmation', walletId: sub.walletId, creatorId: sub.copayerId, data: { txid: sub.txid, coin: coin, network: network, // TODO: amount }, }); self._storeAndBroadcastNotification(notification, cb); }); }); }; var explorer = self.explorers[coin][network]; if (!explorer) return; explorer.getTxidsInBlock(hash, function(err, txids) { if (err) { log.error('Could not fetch txids from block ' + hash, err); return; } self.storage.fetchActiveTxConfirmationSubs(null, function(err, subs) { if (err) return; if (_.isEmpty(subs)) return; var indexedSubs = _.indexBy(subs, 'txid'); var triggered = []; _.each(txids, function(txid) { if (indexedSubs[txid]) triggered.push(indexedSubs[txid]); }); processTriggeredSubs(triggered, function(err) { if (err) { log.error('Could not process tx confirmations', err); } return; }); }); }); }; BlockchainMonitor.prototype._handleNewBlock = function(coin, network, hash) { this._notifyNewBlock(coin, network, hash); this._handleTxConfirmations(coin, network, hash); }; BlockchainMonitor.prototype._storeAndBroadcastNotification = function(notification, cb) { var self = this; self.storage.storeNotification(notification.walletId, notification, function() { self.messageBroker.send(notification) if (cb) return cb(); }); }; module.exports = BlockchainMonitor;