fix broadcast

This commit is contained in:
Matias Alejo Garcia 2014-02-09 19:33:39 -03:00
parent 2eacaf3bf9
commit 87c10a122b
11 changed files with 91 additions and 83 deletions

View File

@ -14,15 +14,15 @@ module.exports.init = function(app, io_ext) {
}); });
}; };
module.exports.broadcast_tx = function(tx) { module.exports.broadcastTx = function(tx) {
if (ios) ios.sockets.in('inv').emit('tx', tx); if (ios) ios.sockets.in('inv').emit('tx', tx);
}; };
module.exports.broadcast_block = function(block) { module.exports.broadcastBlock = function(block) {
if (ios) ios.sockets.in('inv').emit('block', block); if (ios) ios.sockets.in('inv').emit('block', block);
}; };
module.exports.broadcast_address_tx = function(address, tx) { module.exports.broadcastAddressTx = function(address, tx) {
if (ios) ios.sockets.in(address).emit(address, tx); if (ios) ios.sockets.in(address).emit(address, tx);
}; };

View File

@ -5,10 +5,12 @@ require('classtool');
function spec() { function spec() {
var async = require('async'); var async = require('async');
var RpcClient = require('bitcore/RpcClient').class(); var RpcClient = require('bitcore/RpcClient').class();
var BlockDb = require('../../lib/BlockDb').class();
var config = require('../../config/config'); var config = require('../../config/config');
var rpc = new RpcClient(config.bitcoind); var rpc = new RpcClient(config.bitcoind);
function Status() { function Status() {
this.bDb = new BlockDb();
} }
Status.prototype.getInfo = function(next) { Status.prototype.getInfo = function(next) {
@ -21,7 +23,7 @@ function spec() {
that.info = info.result; that.info = info.result;
return cb(); return cb();
}); });
} },
], function (err) { ], function (err) {
return next(err); return next(err);
}); });
@ -69,7 +71,8 @@ function spec() {
that.bestblockhash = bbh.result; that.bestblockhash = bbh.result;
return cb(); return cb();
}); });
} },
], function (err) { ], function (err) {
return next(err); return next(err);
}); });
@ -77,27 +80,29 @@ function spec() {
Status.prototype.getLastBlockHash = function(next) { Status.prototype.getLastBlockHash = function(next) {
var that = this; var that = this;
that.bDb.getTip(function(err,tip) {
async.waterfall( that.syncTipHash = tip;
[ async.waterfall(
function(callback){ [
rpc.getBlockCount(function(err, bc){ function(callback){
if (err) return callback(err); rpc.getBlockCount(function(err, bc){
callback(null, bc.result); if (err) return callback(err);
}); callback(null, bc.result);
}, });
function(bc, callback){ },
rpc.getBlockHash(bc, function(err, bh){ function(bc, callback){
if (err) return callback(err); rpc.getBlockHash(bc, function(err, bh){
callback(null, bh.result); if (err) return callback(err);
}); callback(null, bh.result);
} });
], }
function (err, result) { ],
that.lastblockhash = result; function (err, result) {
return next(); that.lastblockhash = result;
} return next();
); }
);
});
}; };
return Status; return Status;

View File

@ -40,15 +40,16 @@ var walk = function(path) {
walk(models_path); walk(models_path);
var syncOpts = {
};
/** /**
* p2pSync process * p2pSync process
*/ */
if (!config.disableP2pSync) { if (!config.disableP2pSync) {
var ps = new PeerSync(); var ps = new PeerSync();
ps.init({ ps.init({
broadcast_txs: true, shouldBroadcast: true,
broadcast_address_tx: true,
broadcast_blocks: true,
}, function() { }, function() {
ps.run(); ps.run();
}); });
@ -63,8 +64,7 @@ if (!config.disableHistoricSync) {
historicSync = new HistoricSync(); historicSync = new HistoricSync();
historicSync.init({ historicSync.init({
shouldBroadcast: true, shouldBroadcastSync: true,
networkName: config.network
}, function(err) { }, function(err) {
if (err) { if (err) {
var txt = 'ABORTED with error: ' + err.message; var txt = 'ABORTED with error: ' + err.message;

View File

@ -40,8 +40,8 @@ function spec(b) {
}); });
}; };
// adds a block TIP block. Does not update Next pointer in // adds a block. Does not update Next pointer in
// the block prev to the new block. // the block prev to the new block, nor TIP pointer
// //
BlockDb.prototype.add = function(b, cb) { BlockDb.prototype.add = function(b, cb) {
var time_key = TIMESTAMP_PREFIX + var time_key = TIMESTAMP_PREFIX +
@ -49,7 +49,6 @@ function spec(b) {
return db.batch() return db.batch()
.put(time_key, b.hash) .put(time_key, b.hash)
.put(TIP, b.hash)
.put(MAIN_PREFIX + b.hash, 1) .put(MAIN_PREFIX + b.hash, 1)
.put(PREV_PREFIX + b.hash, b.previousblockhash) .put(PREV_PREFIX + b.hash, b.previousblockhash)
.write(cb); .write(cb);
@ -61,6 +60,12 @@ function spec(b) {
}); });
}; };
BlockDb.prototype.setTip = function(hash, cb) {
db.put(TIP, hash, function(err) {
return cb(err);
});
};
//mainly for testing //mainly for testing
BlockDb.prototype.setPrev = function(hash, prevHash, cb) { BlockDb.prototype.setPrev = function(hash, prevHash, cb) {
db.put(PREV_PREFIX + hash, prevHash, function(err) { db.put(PREV_PREFIX + hash, prevHash, function(err) {

View File

@ -97,6 +97,7 @@ function spec() {
skippedBlocks: this.skippedBlocks, skippedBlocks: this.skippedBlocks,
syncedBlocks: this.syncedBlocks, syncedBlocks: this.syncedBlocks,
orphanBlocks: this.orphanBlocks, orphanBlocks: this.orphanBlocks,
syncTipHash: this.sync.tip,
error: this.error, error: this.error,
type: this.type, type: this.type,
}; };
@ -116,7 +117,7 @@ function spec() {
p(util.format('status: [%d%%] skipped: %d ', self.syncPercentage, self.skippedBlocks)); p(util.format('status: [%d%%] skipped: %d ', self.syncPercentage, self.skippedBlocks));
} }
if (self.opts.shouldBroadcast) { if (self.opts.shouldBroadcastSync) {
sockets.broadcastSyncInfo(self.info()); sockets.broadcastSyncInfo(self.info());
} }

View File

@ -17,16 +17,12 @@ function spec() {
PeerSync.prototype.init = function(opts, cb) { PeerSync.prototype.init = function(opts, cb) {
if (!opts) opts = {}; if (!opts) opts = {};
var networkName = opts && (opts.network || 'testnet');
var network = networkName === 'testnet' ? networks.testnet : network.livenet;
this.verbose = opts.verbose;
this.peerdb = undefined; this.peerdb = undefined;
this.sync = new Sync();
this.allowReorgs = false; this.allowReorgs = false;
this.sync = new Sync();
this.PeerManager = require('bitcore/PeerManager').createClass({ this.PeerManager = require('bitcore/PeerManager').createClass({
network: network network: (config.network === 'testnet' ? networks.testnet : networks.livenet)
}); });
this.peerman = new this.PeerManager(); this.peerman = new this.PeerManager();
this.load_peers(); this.load_peers();
@ -44,23 +40,18 @@ function spec() {
fs.writeFileSync(peerdb_fn, JSON.stringify(this.peerdb)); fs.writeFileSync(peerdb_fn, JSON.stringify(this.peerdb));
}; };
PeerSync.prototype.handle_inv = function(info) { PeerSync.prototype.handleInv = function(info) {
var self = this;
var invs = info.message.invs; var invs = info.message.invs;
invs.forEach(function(inv) { invs.forEach(function(inv) {
if (self.verbose) { console.log('[p2p_sync] Handle inv for a ' + CoinConst.MSG.to_str(inv.type));
console.log('[p2p_sync] Handle inv for a ' + CoinConst.MSG.to_str(inv.type));
}
}); });
// TODO: should limit the invs to objects we haven't seen yet // TODO: should limit the invs to objects we haven't seen yet
info.conn.sendGetData(invs); info.conn.sendGetData(invs);
}; };
PeerSync.prototype.handle_tx = function(info) { PeerSync.prototype.handleTx = function(info) {
var tx = info.message.tx.getStandardizedObject(); var tx = info.message.tx.getStandardizedObject();
if (this.verbose) { console.log('[p2p_sync] Handle tx: ' + tx.hash);
console.log('[p2p_sync] Handle tx: ' + tx.hash);
}
this.sync.storeTxs([tx.hash], function(err) { this.sync.storeTxs([tx.hash], function(err) {
if (err) { if (err) {
@ -69,7 +60,7 @@ function spec() {
}); });
}; };
PeerSync.prototype.handle_block = function(info) { PeerSync.prototype.handleBlock = function(info) {
var self = this; var self = this;
var block = info.message.block; var block = info.message.block;
var blockHash = coinUtil.formatHashFull(block.calcHash()); var blockHash = coinUtil.formatHashFull(block.calcHash());
@ -95,9 +86,7 @@ function spec() {
PeerSync.prototype.handle_connected = function(data) { PeerSync.prototype.handle_connected = function(data) {
var peerman = data.pm; var peerman = data.pm;
var peers_n = peerman.peers.length; var peers_n = peerman.peers.length;
if (this.verbose) { console.log('[p2p_sync] Connected to ' + peers_n + ' peer' + (peers_n !== 1 ? 's': ''));
console.log('[p2p_sync] Connected to ' + peers_n + ' peer' + (peers_n !== 1 ? 's': ''));
}
}; };
PeerSync.prototype.run = function() { PeerSync.prototype.run = function() {
@ -109,9 +98,9 @@ function spec() {
}); });
this.peerman.on('connection', function(conn) { this.peerman.on('connection', function(conn) {
conn.on('inv', self.handle_inv.bind(self)); conn.on('inv', self.handleInv.bind(self));
conn.on('block', self.handle_block.bind(self)); conn.on('block', self.handleBlock.bind(self));
conn.on('tx', self.handle_tx.bind(self)); conn.on('tx', self.handleTx.bind(self));
}); });
this.peerman.on('connect', self.handle_connected.bind(self)); this.peerman.on('connect', self.handle_connected.bind(self));

View File

@ -124,14 +124,16 @@ function spec() {
}, },
function(c) { function(c) {
if (!needReorg) return c(); if (!needReorg) return c();
console.log('NEW TIP: %s NEED REORG (old tip: %s)', b.hash, oldTip); console.log('NEW TIP: %s NEED REORG (old tip: %s)', b.hash, oldTip);
// TODO should modify updatedTxs and addrs. // TODO should modify updatedTxs and addrs.
self.processReorg(oldTip, oldNext, newPrev, c); self.processReorg(oldTip, oldNext, newPrev, c);
}, },
function(c) { function(c) {
self.bDb.setNext(newPrev, b.hash, function(err) { self.bDb.setTip(b.hash, function(err) {
return c(err); if (err) return c(err);
self.bDb.setNext(newPrev, b.hash, function(err) {
return c(err);
});
}); });
}], }],
function(err) { function(err) {
@ -245,23 +247,24 @@ function spec() {
Sync.prototype._handleBroadcast = function(hash, updatedTxs, updatedAddrs) { Sync.prototype._handleBroadcast = function(hash, updatedTxs, updatedAddrs) {
var self = this; var self = this;
if (hash && self.opts.broadcast_blocks) { if (self.opts.shouldBroadcast) {
sockets.broadcast_block({hash: hash}); if (hash) {
} sockets.broadcastBlock(hash);
}
if (updatedTxs && self.opts.broadcast_txs) {
updatedTxs.forEach(function(tx) {
sockets.broadcast_tx(tx);
});
}
if (updatedAddrs && self.opts.broadcast_addresses) {
updatedAddrs.forEach(function(addr, txs){
txs.forEach(function(addr, t){
sockets.broadcast_address_tx(addr, {'txid': t});
if (updatedTxs) {
updatedTxs.forEach(function(tx) {
sockets.broadcastTx(tx);
}); });
}); }
if (updatedAddrs ) {
updatedAddrs.forEach(function(addr, txs){
txs.forEach(function(addr, t){
sockets.broadcastAddressTx(addr, t);
});
});
}
} }
}; };

View File

@ -31,10 +31,8 @@ function($scope, $rootScope, $routeParams, $location, Global, Address, getSocket
var socket = getSocket($scope); var socket = getSocket($scope);
socket.emit('subscribe', $routeParams.addrStr); socket.emit('subscribe', $routeParams.addrStr);
socket.on($routeParams.addrStr, function(tx) { socket.on($routeParams.addrStr, function(tx) {
console.log('atx ' + tx.txid); console.log('AddressTx event received ' + tx);
var beep = new Audio('/sound/transaction.mp3'); $rootScope.$broadcast('tx', tx);
beep.play();
$rootScope.$broadcast('tx', tx.txid);
}); });
$scope.params = $routeParams; $scope.params = $routeParams;

View File

@ -33,7 +33,7 @@ angular.module('insight.system').controller('HeaderController',
}; };
socket.on('block', function(block) { socket.on('block', function(block) {
var blockHash = block.hash.toString(); var blockHash = block.toString();
console.log('Updated Blocks Height!'); console.log('Updated Blocks Height!');
_getBlock(blockHash); _getBlock(blockHash);
}); });

View File

@ -32,7 +32,7 @@ angular.module('insight.system').controller('IndexController',
socket.on('tx', function(tx) { socket.on('tx', function(tx) {
console.log('Transaction received! ' + JSON.stringify(tx)); console.log('Transaction received! ' + JSON.stringify(tx));
var txStr = tx.txid.toString(); var txStr = tx.toString();
_getTransaction(txStr, function(res) { _getTransaction(txStr, function(res) {
$scope.txs.unshift(res); $scope.txs.unshift(res);
if (parseInt($scope.txs.length, 10) >= parseInt(TRANSACTION_DISPLAYED, 10)) { if (parseInt($scope.txs.length, 10) >= parseInt(TRANSACTION_DISPLAYED, 10)) {
@ -42,8 +42,9 @@ angular.module('insight.system').controller('IndexController',
}); });
socket.on('block', function(block) { socket.on('block', function(block) {
var blockHash = block.hash.toString();
console.log('Block received! ' + JSON.stringify(block)); console.log('Block received! ' + JSON.stringify(block));
var blockHash = block.toString();
if (parseInt($scope.blocks.length, 10) > parseInt(BLOCKS_DISPLAYED, 10) - 1) { if (parseInt($scope.blocks.length, 10) > parseInt(BLOCKS_DISPLAYED, 10) - 1) {
$scope.blocks.pop(); $scope.blocks.pop();
} }

View File

@ -38,6 +38,7 @@
<tr> <tr>
<td>Skipped Blocks (previously synced)</td> <td>Skipped Blocks (previously synced)</td>
<td class="text-right">{{sync.skippedBlocks}}</td> <td class="text-right">{{sync.skippedBlocks}}</td>
</tbody> </tbody>
</table> </table>
@ -80,9 +81,14 @@
<table class="table" style="table-layout: fixed" data-ng-controller="StatusController" data-ng-init="getStatus('LastBlockHash')"> <table class="table" style="table-layout: fixed" data-ng-controller="StatusController" data-ng-init="getStatus('LastBlockHash')">
<thead data-ng-include src="'/views/includes/infoStatus.html'"> </thead> <thead data-ng-include src="'/views/includes/infoStatus.html'"> </thead>
<tr> <tr>
<td>Last Block Hash</td> <td>Last Block Hash (Bitcoind)</td>
<td class="text-right ellipsis"><a href="/block/{{lastblockhash}}">{{lastblockhash}}</a></td> <td class="text-right ellipsis"><a href="/block/{{lastblockhash}}">{{lastblockhash}}</a></td>
</tr> </tr>
<tr>
<td>Current Blockchain Tip(Insight)</td>
<td class="text-right ellipsis"><a href="/block/{{syncTipHash}}">{{syncTipHash}}</a></td>
</tbody> </tbody>
</table> </table>
</div> <!-- END OF COL-8 --> </div> <!-- END OF COL-8 -->