From 87c10a122bef4e155962d6d83b2b89baab337c29 Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Sun, 9 Feb 2014 19:33:39 -0300 Subject: [PATCH] fix broadcast --- app/controllers/socket.js | 6 ++-- app/models/Status.js | 51 ++++++++++++++++++-------------- insight.js | 10 +++---- lib/BlockDb.js | 11 +++++-- lib/HistoricSync.js | 3 +- lib/PeerSync.js | 33 +++++++-------------- lib/Sync.js | 39 +++++++++++++----------- public/js/controllers/address.js | 6 ++-- public/js/controllers/header.js | 2 +- public/js/controllers/index.js | 5 ++-- public/views/status.html | 8 ++++- 11 files changed, 91 insertions(+), 83 deletions(-) diff --git a/app/controllers/socket.js b/app/controllers/socket.js index 6f37f5f..ec9ba60 100644 --- a/app/controllers/socket.js +++ b/app/controllers/socket.js @@ -14,15 +14,15 @@ module.exports.init = function(app, io_ext) { }); }; -module.exports.broadcast_tx = function(tx) { +module.exports.broadcastTx = function(tx) { if (ios) ios.sockets.in('inv').emit('tx', tx); }; -module.exports.broadcast_block = function(block) { +module.exports.broadcastBlock = function(block) { if (ios) ios.sockets.in('inv').emit('block', block); }; -module.exports.broadcast_address_tx = function(address, tx) { +module.exports.broadcastAddressTx = function(address, tx) { if (ios) ios.sockets.in(address).emit(address, tx); }; diff --git a/app/models/Status.js b/app/models/Status.js index 517b51c..106c309 100644 --- a/app/models/Status.js +++ b/app/models/Status.js @@ -5,10 +5,12 @@ require('classtool'); function spec() { var async = require('async'); var RpcClient = require('bitcore/RpcClient').class(); + var BlockDb = require('../../lib/BlockDb').class(); var config = require('../../config/config'); var rpc = new RpcClient(config.bitcoind); function Status() { + this.bDb = new BlockDb(); } Status.prototype.getInfo = function(next) { @@ -21,7 +23,7 @@ function spec() { that.info = info.result; return cb(); }); - } + }, ], function (err) { return next(err); }); @@ -69,7 +71,8 @@ function spec() { that.bestblockhash = bbh.result; return cb(); }); - } + }, + ], function (err) { return next(err); }); @@ -77,27 +80,29 @@ function spec() { Status.prototype.getLastBlockHash = function(next) { var that = this; - - async.waterfall( - [ - function(callback){ - rpc.getBlockCount(function(err, bc){ - if (err) return callback(err); - callback(null, bc.result); - }); - }, - function(bc, callback){ - rpc.getBlockHash(bc, function(err, bh){ - if (err) return callback(err); - callback(null, bh.result); - }); - } - ], - function (err, result) { - that.lastblockhash = result; - return next(); - } - ); + that.bDb.getTip(function(err,tip) { + that.syncTipHash = tip; + async.waterfall( + [ + function(callback){ + rpc.getBlockCount(function(err, bc){ + if (err) return callback(err); + callback(null, bc.result); + }); + }, + function(bc, callback){ + rpc.getBlockHash(bc, function(err, bh){ + if (err) return callback(err); + callback(null, bh.result); + }); + } + ], + function (err, result) { + that.lastblockhash = result; + return next(); + } + ); + }); }; return Status; diff --git a/insight.js b/insight.js index 0b811f7..b37690b 100644 --- a/insight.js +++ b/insight.js @@ -40,15 +40,16 @@ var walk = function(path) { walk(models_path); +var syncOpts = { +}; + /** * p2pSync process */ if (!config.disableP2pSync) { var ps = new PeerSync(); ps.init({ - broadcast_txs: true, - broadcast_address_tx: true, - broadcast_blocks: true, + shouldBroadcast: true, }, function() { ps.run(); }); @@ -63,8 +64,7 @@ if (!config.disableHistoricSync) { historicSync = new HistoricSync(); historicSync.init({ - shouldBroadcast: true, - networkName: config.network + shouldBroadcastSync: true, }, function(err) { if (err) { var txt = 'ABORTED with error: ' + err.message; diff --git a/lib/BlockDb.js b/lib/BlockDb.js index 97e2a32..c83cd4b 100644 --- a/lib/BlockDb.js +++ b/lib/BlockDb.js @@ -40,8 +40,8 @@ function spec(b) { }); }; - // adds a block TIP block. Does not update Next pointer in - // the block prev to the new block. + // adds a block. Does not update Next pointer in + // the block prev to the new block, nor TIP pointer // BlockDb.prototype.add = function(b, cb) { var time_key = TIMESTAMP_PREFIX + @@ -49,7 +49,6 @@ function spec(b) { return db.batch() .put(time_key, b.hash) - .put(TIP, b.hash) .put(MAIN_PREFIX + b.hash, 1) .put(PREV_PREFIX + b.hash, b.previousblockhash) .write(cb); @@ -61,6 +60,12 @@ function spec(b) { }); }; + BlockDb.prototype.setTip = function(hash, cb) { + db.put(TIP, hash, function(err) { + return cb(err); + }); + }; + //mainly for testing BlockDb.prototype.setPrev = function(hash, prevHash, cb) { db.put(PREV_PREFIX + hash, prevHash, function(err) { diff --git a/lib/HistoricSync.js b/lib/HistoricSync.js index 6e477e3..800587f 100644 --- a/lib/HistoricSync.js +++ b/lib/HistoricSync.js @@ -97,6 +97,7 @@ function spec() { skippedBlocks: this.skippedBlocks, syncedBlocks: this.syncedBlocks, orphanBlocks: this.orphanBlocks, + syncTipHash: this.sync.tip, error: this.error, type: this.type, }; @@ -116,7 +117,7 @@ function spec() { p(util.format('status: [%d%%] skipped: %d ', self.syncPercentage, self.skippedBlocks)); } - if (self.opts.shouldBroadcast) { + if (self.opts.shouldBroadcastSync) { sockets.broadcastSyncInfo(self.info()); } diff --git a/lib/PeerSync.js b/lib/PeerSync.js index cb09b37..dab7094 100644 --- a/lib/PeerSync.js +++ b/lib/PeerSync.js @@ -17,16 +17,12 @@ function spec() { PeerSync.prototype.init = function(opts, cb) { if (!opts) opts = {}; - var networkName = opts && (opts.network || 'testnet'); - var network = networkName === 'testnet' ? networks.testnet : network.livenet; - - this.verbose = opts.verbose; this.peerdb = undefined; - this.sync = new Sync(); this.allowReorgs = false; + this.sync = new Sync(); this.PeerManager = require('bitcore/PeerManager').createClass({ - network: network + network: (config.network === 'testnet' ? networks.testnet : networks.livenet) }); this.peerman = new this.PeerManager(); this.load_peers(); @@ -44,23 +40,18 @@ function spec() { fs.writeFileSync(peerdb_fn, JSON.stringify(this.peerdb)); }; - PeerSync.prototype.handle_inv = function(info) { - var self = this; + PeerSync.prototype.handleInv = function(info) { var invs = info.message.invs; invs.forEach(function(inv) { - if (self.verbose) { - console.log('[p2p_sync] Handle inv for a ' + CoinConst.MSG.to_str(inv.type)); - } + console.log('[p2p_sync] Handle inv for a ' + CoinConst.MSG.to_str(inv.type)); }); // TODO: should limit the invs to objects we haven't seen yet info.conn.sendGetData(invs); }; - PeerSync.prototype.handle_tx = function(info) { + PeerSync.prototype.handleTx = function(info) { var tx = info.message.tx.getStandardizedObject(); - if (this.verbose) { - console.log('[p2p_sync] Handle tx: ' + tx.hash); - } + console.log('[p2p_sync] Handle tx: ' + tx.hash); this.sync.storeTxs([tx.hash], function(err) { if (err) { @@ -69,7 +60,7 @@ function spec() { }); }; - PeerSync.prototype.handle_block = function(info) { + PeerSync.prototype.handleBlock = function(info) { var self = this; var block = info.message.block; var blockHash = coinUtil.formatHashFull(block.calcHash()); @@ -95,9 +86,7 @@ function spec() { PeerSync.prototype.handle_connected = function(data) { var peerman = data.pm; var peers_n = peerman.peers.length; - if (this.verbose) { - console.log('[p2p_sync] Connected to ' + peers_n + ' peer' + (peers_n !== 1 ? 's': '')); - } + console.log('[p2p_sync] Connected to ' + peers_n + ' peer' + (peers_n !== 1 ? 's': '')); }; PeerSync.prototype.run = function() { @@ -109,9 +98,9 @@ function spec() { }); this.peerman.on('connection', function(conn) { - conn.on('inv', self.handle_inv.bind(self)); - conn.on('block', self.handle_block.bind(self)); - conn.on('tx', self.handle_tx.bind(self)); + conn.on('inv', self.handleInv.bind(self)); + conn.on('block', self.handleBlock.bind(self)); + conn.on('tx', self.handleTx.bind(self)); }); this.peerman.on('connect', self.handle_connected.bind(self)); diff --git a/lib/Sync.js b/lib/Sync.js index a90623f..5e4fbbc 100644 --- a/lib/Sync.js +++ b/lib/Sync.js @@ -124,14 +124,16 @@ function spec() { }, function(c) { if (!needReorg) return c(); - console.log('NEW TIP: %s NEED REORG (old tip: %s)', b.hash, oldTip); // TODO should modify updatedTxs and addrs. self.processReorg(oldTip, oldNext, newPrev, c); }, function(c) { - self.bDb.setNext(newPrev, b.hash, function(err) { - return c(err); + self.bDb.setTip(b.hash, function(err) { + if (err) return c(err); + self.bDb.setNext(newPrev, b.hash, function(err) { + return c(err); + }); }); }], function(err) { @@ -245,23 +247,24 @@ function spec() { Sync.prototype._handleBroadcast = function(hash, updatedTxs, updatedAddrs) { var self = this; - if (hash && self.opts.broadcast_blocks) { - sockets.broadcast_block({hash: hash}); - } - - if (updatedTxs && self.opts.broadcast_txs) { - updatedTxs.forEach(function(tx) { - sockets.broadcast_tx(tx); - }); - } - - if (updatedAddrs && self.opts.broadcast_addresses) { - updatedAddrs.forEach(function(addr, txs){ - txs.forEach(function(addr, t){ - sockets.broadcast_address_tx(addr, {'txid': t}); + if (self.opts.shouldBroadcast) { + if (hash) { + sockets.broadcastBlock(hash); + } + if (updatedTxs) { + updatedTxs.forEach(function(tx) { + sockets.broadcastTx(tx); }); - }); + } + + if (updatedAddrs ) { + updatedAddrs.forEach(function(addr, txs){ + txs.forEach(function(addr, t){ + sockets.broadcastAddressTx(addr, t); + }); + }); + } } }; diff --git a/public/js/controllers/address.js b/public/js/controllers/address.js index 4ae129b..89780ee 100644 --- a/public/js/controllers/address.js +++ b/public/js/controllers/address.js @@ -31,10 +31,8 @@ function($scope, $rootScope, $routeParams, $location, Global, Address, getSocket var socket = getSocket($scope); socket.emit('subscribe', $routeParams.addrStr); socket.on($routeParams.addrStr, function(tx) { - console.log('atx ' + tx.txid); - var beep = new Audio('/sound/transaction.mp3'); - beep.play(); - $rootScope.$broadcast('tx', tx.txid); + console.log('AddressTx event received ' + tx); + $rootScope.$broadcast('tx', tx); }); $scope.params = $routeParams; diff --git a/public/js/controllers/header.js b/public/js/controllers/header.js index 7147272..3837bbc 100755 --- a/public/js/controllers/header.js +++ b/public/js/controllers/header.js @@ -33,7 +33,7 @@ angular.module('insight.system').controller('HeaderController', }; socket.on('block', function(block) { - var blockHash = block.hash.toString(); + var blockHash = block.toString(); console.log('Updated Blocks Height!'); _getBlock(blockHash); }); diff --git a/public/js/controllers/index.js b/public/js/controllers/index.js index 71e432a..687c491 100755 --- a/public/js/controllers/index.js +++ b/public/js/controllers/index.js @@ -32,7 +32,7 @@ angular.module('insight.system').controller('IndexController', socket.on('tx', function(tx) { console.log('Transaction received! ' + JSON.stringify(tx)); - var txStr = tx.txid.toString(); + var txStr = tx.toString(); _getTransaction(txStr, function(res) { $scope.txs.unshift(res); if (parseInt($scope.txs.length, 10) >= parseInt(TRANSACTION_DISPLAYED, 10)) { @@ -42,8 +42,9 @@ angular.module('insight.system').controller('IndexController', }); socket.on('block', function(block) { - var blockHash = block.hash.toString(); console.log('Block received! ' + JSON.stringify(block)); + + var blockHash = block.toString(); if (parseInt($scope.blocks.length, 10) > parseInt(BLOCKS_DISPLAYED, 10) - 1) { $scope.blocks.pop(); } diff --git a/public/views/status.html b/public/views/status.html index 0cf5060..7f7f4e2 100644 --- a/public/views/status.html +++ b/public/views/status.html @@ -38,6 +38,7 @@ Skipped Blocks (previously synced) {{sync.skippedBlocks}} + @@ -80,9 +81,14 @@ - + + + + + +
Last Block HashLast Block Hash (Bitcoind) {{lastblockhash}}
Current Blockchain Tip(Insight){{syncTipHash}}