From ae0114b17b95a307b31dcc8c4ebcce617cf096ca Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 6 May 2015 12:48:43 -0300 Subject: [PATCH] broadcast blockchain events --- bcmonitor/bcmonitor.js | 15 ++++ lib/blockchainmonitor.js | 131 ++++++++++++++--------------- lib/storage.js | 14 +++ lib/wsapp.js | 3 +- messagebroker/bws-messagebroker.js | 2 +- 5 files changed, 96 insertions(+), 69 deletions(-) create mode 100644 bcmonitor/bcmonitor.js diff --git a/bcmonitor/bcmonitor.js b/bcmonitor/bcmonitor.js new file mode 100644 index 0000000..cfd85f6 --- /dev/null +++ b/bcmonitor/bcmonitor.js @@ -0,0 +1,15 @@ +#!/usr/bin/env node + +'use strict'; + +var log = require('npmlog'); +log.debug = log.verbose; + +var config = require('../config'); +var BlockchainExplorer = require('../lib/blockchainmonitor'); + +BlockchainExplorer.start(config, function(err) { + if (err) throw err; + + console.log('Blockchain monitor started'); +}); diff --git a/lib/blockchainmonitor.js b/lib/blockchainmonitor.js index 0e7f730..481dcf0 100644 --- a/lib/blockchainmonitor.js +++ b/lib/blockchainmonitor.js @@ -5,100 +5,97 @@ var _ = require('lodash'); var async = require('async'); var log = require('npmlog'); log.debug = log.verbose; -var Uuid = require('uuid'); -var inherits = require('inherits'); -var events = require('events'); -var nodeutil = require('util'); -var WalletUtils = require('bitcore-wallet-utils'); -var Bitcore = WalletUtils.Bitcore; -var WalletService = require('./server'); var BlockchainExplorer = require('./blockchainexplorer'); +var Storage = require('./storage'); +var MessageBroker = require('./messagebroker'); var Notification = require('./model/notification'); -function BlockchainMonitor(opts) { +var storage, messageBroker; + +function BlockchainMonitor() {}; + +BlockchainMonitor.start = function(opts, cb) { opts = opts || {}; - var self = this; - this.subscriptions = {}; - this.subscriber = {}; - _.each(['livenet', 'testnet'], function(network) { - opts[network] = opts[network] || {}; - self.subscriber[network] = self._getAddressSubscriber( - opts[network].provider, network, opts[network].url); - }); + $.checkArgument(opts.blockchainExplorerOpts); + $.checkArgument(opts.storageOpts); + + async.parallel([ + + function(done) { + _.each(['livenet', 'testnet'], function(network) { + var config = opts.blockchainExplorerOpts[network] || {}; + BlockchainMonitor._initExplorer(config.provider, network, config.url); + }); + done(); + }, + function(done) { + storage = new Storage(); + storage.connect(opts.storageOpts, done); + }, + function(done) { + messageBroker = new MessageBroker(opts.messageBrokerOpts); + done(); + }, + ], cb); + }; -nodeutil.inherits(BlockchainMonitor, events.EventEmitter); - -BlockchainMonitor.prototype._getAddressSubscriber = function(provider, network) { +BlockchainMonitor._initExplorer = function(provider, network, url) { $.checkArgument(provider == 'insight', 'Blockchain monitor ' + provider + ' not supported'); var explorer = new BlockchainExplorer({ provider: provider, network: network, + url: url, }); var socket = explorer.initSocket(); - - // TODO: Extract on its own class once more providers are implemented - return { - subscribe: function(address, handler) { - socket.emit('subscribe', address); - socket.on(address, handler); - }, - }; + socket.emit('subscribe', 'inv'); + socket.on('tx', BlockchainMonitor._handleIncommingTx); }; -BlockchainMonitor.prototype.subscribeAddresses = function(walletId, addresses) { - $.checkArgument(walletId); +BlockchainMonitor._handleIncommingTx = function(data) { + if (!data || !data.vout) return; - var self = this; + var outs = _.compact(_.map(data.vout, function(v) { + var addr = _.keys(v)[0]; + if (addr.indexOf('3') != 0 && addr.indexOf('2') != 0) return; - if (!addresses || addresses.length == 0) return; - - function handlerFor(address, txid) { - var notification = Notification.create({ - walletId: this, - type: 'NewIncomingTx', - data: { - address: address, - txid: txid, - }, - }); - self.emit('notification', notification); - }; - - if (!self.subscriptions[walletId]) { - self.subscriptions[walletId] = { - addresses: [], + return { + address: addr, + amount: +v[addr] }; - }; + })); + if (_.isEmpty(outs)) return; - var addresses = [].concat(addresses); - var network = Bitcore.Address.fromString(addresses[0]).network.name; - var subscriber = self.subscriber[network]; - _.each(addresses, function(address) { - self.subscriptions[walletId].addresses.push(address); - subscriber.subscribe(address, _.bind(handlerFor, walletId, address)); + async.each(outs, function(out, next) { + storage.fetchWalletIdByAddress(out.address, function(err, walletId) { + if (err || !walletId) return next(err); + + log.info('Incoming tx for wallet ' + walletId + ' (' + out.address + ' -> ' + out.amount + ')'); + BlockchainMonitor._createNotification(walletId, data.txid, out.address, out.amount, next); + }); + }, function(err) { + return; }); }; -BlockchainMonitor.prototype.subscribeWallet = function(walletService, cb) { - var self = this; - - var walletId = walletService.walletId; - if (self.subscriptions[walletId]) return; - - walletService.getMainAddresses({}, function(err, addresses) { - if (err) { - delete self.subscriptions[walletId]; - return cb(new Error('Could not subscribe to addresses for wallet ' + walletId)); - } - self.subscribeAddresses(walletService.walletId, _.pluck(addresses, 'address')); +BlockchainMonitor._createNotification = function(walletId, txid, address, amount, cb) { + var n = Notification.create({ + type: 'NewIncomingTx', + data: { + txid: txid, + address: address, + amount: amount, + }, + walletId: walletId, + }); + storage.storeNotification(walletId, n, function() { + messageBroker.send(n) return cb(); }); }; - module.exports = BlockchainMonitor; diff --git a/lib/storage.js b/lib/storage.js index c72a0c9..e7b1160 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -312,6 +312,20 @@ Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { }); }; +Storage.prototype.fetchWalletIdByAddress = function(address, cb) { + var self = this; + + this.db.collection(collections.ADDRESSES).findOne({ + address: address, + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + + return cb(null, result.walletId); + }); +}; + + Storage.prototype._dump = function(cb, fn) { fn = fn || console.log; cb = cb || function() {}; diff --git a/lib/wsapp.js b/lib/wsapp.js index d4fb026..3f44471 100644 --- a/lib/wsapp.js +++ b/lib/wsapp.js @@ -9,10 +9,11 @@ var Uuid = require('uuid'); var WalletService = require('./server'); var MessageBroker = require('./messagebroker'); +var BlockchainMonitor = require('./blockchainmonitor'); log.level = 'debug'; -var io, messageBroker; +var io, messageBroker, blockchainMonitor; var WsApp = function() {}; diff --git a/messagebroker/bws-messagebroker.js b/messagebroker/bws-messagebroker.js index 68f43a7..d48dc63 100644 --- a/messagebroker/bws-messagebroker.js +++ b/messagebroker/bws-messagebroker.js @@ -20,4 +20,4 @@ server.on('connection', function(socket) { }); }); -console.log('Message queue server listening on port ' + opts.port) +console.log('Message broker server listening on port ' + opts.port)