broadcast blockchain events
This commit is contained in:
parent
756b82b370
commit
ae0114b17b
|
@ -0,0 +1,15 @@
|
||||||
|
#!/usr/bin/env node
|
||||||
|
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
var log = require('npmlog');
|
||||||
|
log.debug = log.verbose;
|
||||||
|
|
||||||
|
var config = require('../config');
|
||||||
|
var BlockchainExplorer = require('../lib/blockchainmonitor');
|
||||||
|
|
||||||
|
BlockchainExplorer.start(config, function(err) {
|
||||||
|
if (err) throw err;
|
||||||
|
|
||||||
|
console.log('Blockchain monitor started');
|
||||||
|
});
|
|
@ -5,100 +5,97 @@ var _ = require('lodash');
|
||||||
var async = require('async');
|
var async = require('async');
|
||||||
var log = require('npmlog');
|
var log = require('npmlog');
|
||||||
log.debug = log.verbose;
|
log.debug = log.verbose;
|
||||||
var Uuid = require('uuid');
|
|
||||||
var inherits = require('inherits');
|
|
||||||
var events = require('events');
|
|
||||||
var nodeutil = require('util');
|
|
||||||
|
|
||||||
var WalletUtils = require('bitcore-wallet-utils');
|
|
||||||
var Bitcore = WalletUtils.Bitcore;
|
|
||||||
var WalletService = require('./server');
|
|
||||||
var BlockchainExplorer = require('./blockchainexplorer');
|
var BlockchainExplorer = require('./blockchainexplorer');
|
||||||
|
var Storage = require('./storage');
|
||||||
|
var MessageBroker = require('./messagebroker');
|
||||||
|
|
||||||
var Notification = require('./model/notification');
|
var Notification = require('./model/notification');
|
||||||
|
|
||||||
function BlockchainMonitor(opts) {
|
var storage, messageBroker;
|
||||||
|
|
||||||
|
function BlockchainMonitor() {};
|
||||||
|
|
||||||
|
BlockchainMonitor.start = function(opts, cb) {
|
||||||
opts = opts || {};
|
opts = opts || {};
|
||||||
var self = this;
|
$.checkArgument(opts.blockchainExplorerOpts);
|
||||||
this.subscriptions = {};
|
$.checkArgument(opts.storageOpts);
|
||||||
this.subscriber = {};
|
|
||||||
|
async.parallel([
|
||||||
|
|
||||||
|
function(done) {
|
||||||
_.each(['livenet', 'testnet'], function(network) {
|
_.each(['livenet', 'testnet'], function(network) {
|
||||||
opts[network] = opts[network] || {};
|
var config = opts.blockchainExplorerOpts[network] || {};
|
||||||
self.subscriber[network] = self._getAddressSubscriber(
|
BlockchainMonitor._initExplorer(config.provider, network, config.url);
|
||||||
opts[network].provider, network, opts[network].url);
|
|
||||||
});
|
});
|
||||||
|
done();
|
||||||
|
},
|
||||||
|
function(done) {
|
||||||
|
storage = new Storage();
|
||||||
|
storage.connect(opts.storageOpts, done);
|
||||||
|
},
|
||||||
|
function(done) {
|
||||||
|
messageBroker = new MessageBroker(opts.messageBrokerOpts);
|
||||||
|
done();
|
||||||
|
},
|
||||||
|
], cb);
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
nodeutil.inherits(BlockchainMonitor, events.EventEmitter);
|
BlockchainMonitor._initExplorer = function(provider, network, url) {
|
||||||
|
|
||||||
BlockchainMonitor.prototype._getAddressSubscriber = function(provider, network) {
|
|
||||||
$.checkArgument(provider == 'insight', 'Blockchain monitor ' + provider + ' not supported');
|
$.checkArgument(provider == 'insight', 'Blockchain monitor ' + provider + ' not supported');
|
||||||
|
|
||||||
var explorer = new BlockchainExplorer({
|
var explorer = new BlockchainExplorer({
|
||||||
provider: provider,
|
provider: provider,
|
||||||
network: network,
|
network: network,
|
||||||
|
url: url,
|
||||||
});
|
});
|
||||||
|
|
||||||
var socket = explorer.initSocket();
|
var socket = explorer.initSocket();
|
||||||
|
socket.emit('subscribe', 'inv');
|
||||||
// TODO: Extract on its own class once more providers are implemented
|
socket.on('tx', BlockchainMonitor._handleIncommingTx);
|
||||||
return {
|
|
||||||
subscribe: function(address, handler) {
|
|
||||||
socket.emit('subscribe', address);
|
|
||||||
socket.on(address, handler);
|
|
||||||
},
|
|
||||||
};
|
|
||||||
};
|
};
|
||||||
|
|
||||||
BlockchainMonitor.prototype.subscribeAddresses = function(walletId, addresses) {
|
BlockchainMonitor._handleIncommingTx = function(data) {
|
||||||
$.checkArgument(walletId);
|
if (!data || !data.vout) return;
|
||||||
|
|
||||||
var self = this;
|
var outs = _.compact(_.map(data.vout, function(v) {
|
||||||
|
var addr = _.keys(v)[0];
|
||||||
|
if (addr.indexOf('3') != 0 && addr.indexOf('2') != 0) return;
|
||||||
|
|
||||||
if (!addresses || addresses.length == 0) return;
|
return {
|
||||||
|
address: addr,
|
||||||
|
amount: +v[addr]
|
||||||
|
};
|
||||||
|
}));
|
||||||
|
if (_.isEmpty(outs)) return;
|
||||||
|
|
||||||
function handlerFor(address, txid) {
|
async.each(outs, function(out, next) {
|
||||||
var notification = Notification.create({
|
storage.fetchWalletIdByAddress(out.address, function(err, walletId) {
|
||||||
walletId: this,
|
if (err || !walletId) return next(err);
|
||||||
|
|
||||||
|
log.info('Incoming tx for wallet ' + walletId + ' (' + out.address + ' -> ' + out.amount + ')');
|
||||||
|
BlockchainMonitor._createNotification(walletId, data.txid, out.address, out.amount, next);
|
||||||
|
});
|
||||||
|
}, function(err) {
|
||||||
|
return;
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
BlockchainMonitor._createNotification = function(walletId, txid, address, amount, cb) {
|
||||||
|
var n = Notification.create({
|
||||||
type: 'NewIncomingTx',
|
type: 'NewIncomingTx',
|
||||||
data: {
|
data: {
|
||||||
address: address,
|
|
||||||
txid: txid,
|
txid: txid,
|
||||||
|
address: address,
|
||||||
|
amount: amount,
|
||||||
},
|
},
|
||||||
|
walletId: walletId,
|
||||||
});
|
});
|
||||||
self.emit('notification', notification);
|
storage.storeNotification(walletId, n, function() {
|
||||||
};
|
messageBroker.send(n)
|
||||||
|
|
||||||
if (!self.subscriptions[walletId]) {
|
|
||||||
self.subscriptions[walletId] = {
|
|
||||||
addresses: [],
|
|
||||||
};
|
|
||||||
};
|
|
||||||
|
|
||||||
var addresses = [].concat(addresses);
|
|
||||||
var network = Bitcore.Address.fromString(addresses[0]).network.name;
|
|
||||||
var subscriber = self.subscriber[network];
|
|
||||||
_.each(addresses, function(address) {
|
|
||||||
self.subscriptions[walletId].addresses.push(address);
|
|
||||||
subscriber.subscribe(address, _.bind(handlerFor, walletId, address));
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
BlockchainMonitor.prototype.subscribeWallet = function(walletService, cb) {
|
|
||||||
var self = this;
|
|
||||||
|
|
||||||
var walletId = walletService.walletId;
|
|
||||||
if (self.subscriptions[walletId]) return;
|
|
||||||
|
|
||||||
walletService.getMainAddresses({}, function(err, addresses) {
|
|
||||||
if (err) {
|
|
||||||
delete self.subscriptions[walletId];
|
|
||||||
return cb(new Error('Could not subscribe to addresses for wallet ' + walletId));
|
|
||||||
}
|
|
||||||
self.subscribeAddresses(walletService.walletId, _.pluck(addresses, 'address'));
|
|
||||||
return cb();
|
return cb();
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
module.exports = BlockchainMonitor;
|
module.exports = BlockchainMonitor;
|
||||||
|
|
|
@ -312,6 +312,20 @@ Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) {
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Storage.prototype.fetchWalletIdByAddress = function(address, cb) {
|
||||||
|
var self = this;
|
||||||
|
|
||||||
|
this.db.collection(collections.ADDRESSES).findOne({
|
||||||
|
address: address,
|
||||||
|
}, function(err, result) {
|
||||||
|
if (err) return cb(err);
|
||||||
|
if (!result) return cb();
|
||||||
|
|
||||||
|
return cb(null, result.walletId);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
Storage.prototype._dump = function(cb, fn) {
|
Storage.prototype._dump = function(cb, fn) {
|
||||||
fn = fn || console.log;
|
fn = fn || console.log;
|
||||||
cb = cb || function() {};
|
cb = cb || function() {};
|
||||||
|
|
|
@ -9,10 +9,11 @@ var Uuid = require('uuid');
|
||||||
|
|
||||||
var WalletService = require('./server');
|
var WalletService = require('./server');
|
||||||
var MessageBroker = require('./messagebroker');
|
var MessageBroker = require('./messagebroker');
|
||||||
|
var BlockchainMonitor = require('./blockchainmonitor');
|
||||||
|
|
||||||
log.level = 'debug';
|
log.level = 'debug';
|
||||||
|
|
||||||
var io, messageBroker;
|
var io, messageBroker, blockchainMonitor;
|
||||||
|
|
||||||
var WsApp = function() {};
|
var WsApp = function() {};
|
||||||
|
|
||||||
|
|
|
@ -20,4 +20,4 @@ server.on('connection', function(socket) {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
console.log('Message queue server listening on port ' + opts.port)
|
console.log('Message broker server listening on port ' + opts.port)
|
||||||
|
|
Loading…
Reference in New Issue