Merge pull request #207 from matiu/bug/fix-broadcast

fix broadcast
This commit is contained in:
Matias Alejo Garcia 2014-02-09 20:34:54 -02:00
commit c40cd41bfd
11 changed files with 91 additions and 83 deletions

View File

@ -14,15 +14,15 @@ module.exports.init = function(app, io_ext) {
});
};
module.exports.broadcast_tx = function(tx) {
module.exports.broadcastTx = function(tx) {
if (ios) ios.sockets.in('inv').emit('tx', tx);
};
module.exports.broadcast_block = function(block) {
module.exports.broadcastBlock = function(block) {
if (ios) ios.sockets.in('inv').emit('block', block);
};
module.exports.broadcast_address_tx = function(address, tx) {
module.exports.broadcastAddressTx = function(address, tx) {
if (ios) ios.sockets.in(address).emit(address, tx);
};

View File

@ -5,10 +5,12 @@ require('classtool');
function spec() {
var async = require('async');
var RpcClient = require('bitcore/RpcClient').class();
var BlockDb = require('../../lib/BlockDb').class();
var config = require('../../config/config');
var rpc = new RpcClient(config.bitcoind);
function Status() {
this.bDb = new BlockDb();
}
Status.prototype.getInfo = function(next) {
@ -21,7 +23,7 @@ function spec() {
that.info = info.result;
return cb();
});
}
},
], function (err) {
return next(err);
});
@ -69,7 +71,8 @@ function spec() {
that.bestblockhash = bbh.result;
return cb();
});
}
},
], function (err) {
return next(err);
});
@ -77,27 +80,29 @@ function spec() {
Status.prototype.getLastBlockHash = function(next) {
var that = this;
async.waterfall(
[
function(callback){
rpc.getBlockCount(function(err, bc){
if (err) return callback(err);
callback(null, bc.result);
});
},
function(bc, callback){
rpc.getBlockHash(bc, function(err, bh){
if (err) return callback(err);
callback(null, bh.result);
});
}
],
function (err, result) {
that.lastblockhash = result;
return next();
}
);
that.bDb.getTip(function(err,tip) {
that.syncTipHash = tip;
async.waterfall(
[
function(callback){
rpc.getBlockCount(function(err, bc){
if (err) return callback(err);
callback(null, bc.result);
});
},
function(bc, callback){
rpc.getBlockHash(bc, function(err, bh){
if (err) return callback(err);
callback(null, bh.result);
});
}
],
function (err, result) {
that.lastblockhash = result;
return next();
}
);
});
};
return Status;

View File

@ -40,15 +40,16 @@ var walk = function(path) {
walk(models_path);
var syncOpts = {
};
/**
* p2pSync process
*/
if (!config.disableP2pSync) {
var ps = new PeerSync();
ps.init({
broadcast_txs: true,
broadcast_address_tx: true,
broadcast_blocks: true,
shouldBroadcast: true,
}, function() {
ps.run();
});
@ -63,8 +64,7 @@ if (!config.disableHistoricSync) {
historicSync = new HistoricSync();
historicSync.init({
shouldBroadcast: true,
networkName: config.network
shouldBroadcastSync: true,
}, function(err) {
if (err) {
var txt = 'ABORTED with error: ' + err.message;

View File

@ -40,8 +40,8 @@ function spec(b) {
});
};
// adds a block TIP block. Does not update Next pointer in
// the block prev to the new block.
// adds a block. Does not update Next pointer in
// the block prev to the new block, nor TIP pointer
//
BlockDb.prototype.add = function(b, cb) {
var time_key = TIMESTAMP_PREFIX +
@ -49,7 +49,6 @@ function spec(b) {
return db.batch()
.put(time_key, b.hash)
.put(TIP, b.hash)
.put(MAIN_PREFIX + b.hash, 1)
.put(PREV_PREFIX + b.hash, b.previousblockhash)
.write(cb);
@ -61,6 +60,12 @@ function spec(b) {
});
};
BlockDb.prototype.setTip = function(hash, cb) {
db.put(TIP, hash, function(err) {
return cb(err);
});
};
//mainly for testing
BlockDb.prototype.setPrev = function(hash, prevHash, cb) {
db.put(PREV_PREFIX + hash, prevHash, function(err) {

View File

@ -97,6 +97,7 @@ function spec() {
skippedBlocks: this.skippedBlocks,
syncedBlocks: this.syncedBlocks,
orphanBlocks: this.orphanBlocks,
syncTipHash: this.sync.tip,
error: this.error,
type: this.type,
};
@ -116,7 +117,7 @@ function spec() {
p(util.format('status: [%d%%] skipped: %d ', self.syncPercentage, self.skippedBlocks));
}
if (self.opts.shouldBroadcast) {
if (self.opts.shouldBroadcastSync) {
sockets.broadcastSyncInfo(self.info());
}

View File

@ -17,16 +17,12 @@ function spec() {
PeerSync.prototype.init = function(opts, cb) {
if (!opts) opts = {};
var networkName = opts && (opts.network || 'testnet');
var network = networkName === 'testnet' ? networks.testnet : network.livenet;
this.verbose = opts.verbose;
this.peerdb = undefined;
this.sync = new Sync();
this.allowReorgs = false;
this.sync = new Sync();
this.PeerManager = require('bitcore/PeerManager').createClass({
network: network
network: (config.network === 'testnet' ? networks.testnet : networks.livenet)
});
this.peerman = new this.PeerManager();
this.load_peers();
@ -44,23 +40,18 @@ function spec() {
fs.writeFileSync(peerdb_fn, JSON.stringify(this.peerdb));
};
PeerSync.prototype.handle_inv = function(info) {
var self = this;
PeerSync.prototype.handleInv = function(info) {
var invs = info.message.invs;
invs.forEach(function(inv) {
if (self.verbose) {
console.log('[p2p_sync] Handle inv for a ' + CoinConst.MSG.to_str(inv.type));
}
console.log('[p2p_sync] Handle inv for a ' + CoinConst.MSG.to_str(inv.type));
});
// TODO: should limit the invs to objects we haven't seen yet
info.conn.sendGetData(invs);
};
PeerSync.prototype.handle_tx = function(info) {
PeerSync.prototype.handleTx = function(info) {
var tx = info.message.tx.getStandardizedObject();
if (this.verbose) {
console.log('[p2p_sync] Handle tx: ' + tx.hash);
}
console.log('[p2p_sync] Handle tx: ' + tx.hash);
this.sync.storeTxs([tx.hash], function(err) {
if (err) {
@ -69,7 +60,7 @@ function spec() {
});
};
PeerSync.prototype.handle_block = function(info) {
PeerSync.prototype.handleBlock = function(info) {
var self = this;
var block = info.message.block;
var blockHash = coinUtil.formatHashFull(block.calcHash());
@ -95,9 +86,7 @@ function spec() {
PeerSync.prototype.handle_connected = function(data) {
var peerman = data.pm;
var peers_n = peerman.peers.length;
if (this.verbose) {
console.log('[p2p_sync] Connected to ' + peers_n + ' peer' + (peers_n !== 1 ? 's': ''));
}
console.log('[p2p_sync] Connected to ' + peers_n + ' peer' + (peers_n !== 1 ? 's': ''));
};
PeerSync.prototype.run = function() {
@ -109,9 +98,9 @@ function spec() {
});
this.peerman.on('connection', function(conn) {
conn.on('inv', self.handle_inv.bind(self));
conn.on('block', self.handle_block.bind(self));
conn.on('tx', self.handle_tx.bind(self));
conn.on('inv', self.handleInv.bind(self));
conn.on('block', self.handleBlock.bind(self));
conn.on('tx', self.handleTx.bind(self));
});
this.peerman.on('connect', self.handle_connected.bind(self));

View File

@ -124,14 +124,16 @@ function spec() {
},
function(c) {
if (!needReorg) return c();
console.log('NEW TIP: %s NEED REORG (old tip: %s)', b.hash, oldTip);
// TODO should modify updatedTxs and addrs.
self.processReorg(oldTip, oldNext, newPrev, c);
},
function(c) {
self.bDb.setNext(newPrev, b.hash, function(err) {
return c(err);
self.bDb.setTip(b.hash, function(err) {
if (err) return c(err);
self.bDb.setNext(newPrev, b.hash, function(err) {
return c(err);
});
});
}],
function(err) {
@ -245,23 +247,24 @@ function spec() {
Sync.prototype._handleBroadcast = function(hash, updatedTxs, updatedAddrs) {
var self = this;
if (hash && self.opts.broadcast_blocks) {
sockets.broadcast_block({hash: hash});
}
if (updatedTxs && self.opts.broadcast_txs) {
updatedTxs.forEach(function(tx) {
sockets.broadcast_tx(tx);
});
}
if (updatedAddrs && self.opts.broadcast_addresses) {
updatedAddrs.forEach(function(addr, txs){
txs.forEach(function(addr, t){
sockets.broadcast_address_tx(addr, {'txid': t});
if (self.opts.shouldBroadcast) {
if (hash) {
sockets.broadcastBlock(hash);
}
if (updatedTxs) {
updatedTxs.forEach(function(tx) {
sockets.broadcastTx(tx);
});
});
}
if (updatedAddrs ) {
updatedAddrs.forEach(function(addr, txs){
txs.forEach(function(addr, t){
sockets.broadcastAddressTx(addr, t);
});
});
}
}
};

View File

@ -31,10 +31,8 @@ function($scope, $rootScope, $routeParams, $location, Global, Address, getSocket
var socket = getSocket($scope);
socket.emit('subscribe', $routeParams.addrStr);
socket.on($routeParams.addrStr, function(tx) {
console.log('atx ' + tx.txid);
var beep = new Audio('/sound/transaction.mp3');
beep.play();
$rootScope.$broadcast('tx', tx.txid);
console.log('AddressTx event received ' + tx);
$rootScope.$broadcast('tx', tx);
});
$scope.params = $routeParams;

View File

@ -33,7 +33,7 @@ angular.module('insight.system').controller('HeaderController',
};
socket.on('block', function(block) {
var blockHash = block.hash.toString();
var blockHash = block.toString();
console.log('Updated Blocks Height!');
_getBlock(blockHash);
});

View File

@ -32,7 +32,7 @@ angular.module('insight.system').controller('IndexController',
socket.on('tx', function(tx) {
console.log('Transaction received! ' + JSON.stringify(tx));
var txStr = tx.txid.toString();
var txStr = tx.toString();
_getTransaction(txStr, function(res) {
$scope.txs.unshift(res);
if (parseInt($scope.txs.length, 10) >= parseInt(TRANSACTION_DISPLAYED, 10)) {
@ -42,8 +42,9 @@ angular.module('insight.system').controller('IndexController',
});
socket.on('block', function(block) {
var blockHash = block.hash.toString();
console.log('Block received! ' + JSON.stringify(block));
var blockHash = block.toString();
if (parseInt($scope.blocks.length, 10) > parseInt(BLOCKS_DISPLAYED, 10) - 1) {
$scope.blocks.pop();
}

View File

@ -38,6 +38,7 @@
<tr>
<td>Skipped Blocks (previously synced)</td>
<td class="text-right">{{sync.skippedBlocks}}</td>
</tbody>
</table>
@ -80,9 +81,14 @@
<table class="table" style="table-layout: fixed" data-ng-controller="StatusController" data-ng-init="getStatus('LastBlockHash')">
<thead data-ng-include src="'/views/includes/infoStatus.html'"> </thead>
<tr>
<td>Last Block Hash</td>
<td>Last Block Hash (Bitcoind)</td>
<td class="text-right ellipsis"><a href="/block/{{lastblockhash}}">{{lastblockhash}}</a></td>
</tr>
<tr>
<td>Current Blockchain Tip(Insight)</td>
<td class="text-right ellipsis"><a href="/block/{{syncTipHash}}">{{syncTipHash}}</a></td>
</tbody>
</table>
</div> <!-- END OF COL-8 -->