diff --git a/lib/HistoricSync.js b/lib/HistoricSync.js new file mode 100644 index 00000000..f3b99f14 --- /dev/null +++ b/lib/HistoricSync.js @@ -0,0 +1,248 @@ +'use strict'; + +require('classtool'); + + +function spec() { + var mongoose = require('mongoose'); + var util = require('util'); + var RpcClient = require('bitcore/RpcClient').class(); + var networks = require('bitcore/networks'); + var async = require('async'); + var config = require('../config/config'); + var Block = require('../app/models/Block'); + var Transaction = require('../app/models/Transaction'); + var TransactionItem = require('../app/models/TransactionItem'); + var Sync = require('./Sync').class(); + var sockets = require('../app/views/sockets/main.js'); + var CONCURRENCY = 5; + + + function HistoricSync(opts) { + this.block_count= 0; + this.block_total= 0; + this.network = config.network === 'testnet' ? networks.testnet: networks.livenet; + this.sync = new Sync(opts); + } + + function p() { + var params = Array.prototype.slice.call(arguments); + + params.unshift('[historic_sync]'); + console.log.apply(this,params); + } + + var progress_bar = function(string, current, total) { + p(util.format('%s %d/%d [%d%%]', string, current, total, parseInt(100 * current / total))); + }; + + HistoricSync.prototype.init = function(opts,cb) { + this.rpc = new RpcClient(config.bitcoind); + this.opts = opts; + this.sync.init(opts, cb); + }; + + HistoricSync.prototype.close = function() { + this.sync.close(); + }; + + HistoricSync.prototype.getPrevNextBlock = function(blockHash, blockEnd, opts, cb) { + + var that = this; + + // recursion end. + if (!blockHash || (blockEnd && blockEnd === blockHash) ) { + return cb(); + } + + var existed = 0; + var blockInfo; + var blockObj; + + async.series([ + // Already got it? + function(c) { + Block.findOne({hash:blockHash}, function(err,block){ + if (err) { p(err); return c(err); }; + if (block) { + existed = 1; + blockObj = block; + } + + return c(); + }); + }, + //show some (inacurate) status + function(c) { + if (that.block_count++ % 1000 === 0) { + progress_bar('sync status:', that.block_count, that.block_total); + } + return c(); + }, + //get Info from RPC + function(c) { + + // TODO: if we store prev/next, no need to go to RPC + // if (blockObj && blockObj.nextBlockHash) return c(); + + that.rpc.getBlock(blockHash, function(err, ret) { + if (err) return c(err); + + blockInfo = ret; + return c(); + }); + }, + //store it + function(c) { + if (existed) return c(); + + that.sync.storeBlock(blockInfo.result, function(err, block) { + existed = err && err.toString().match(/E11000/); + if (err && ! existed) return c(err); + return c(); + }); + }, + /* TODO: Should Start to sync backwards? (this is for partial syncs) + function(c) { + + if (blockInfo.result.prevblockhash != current.blockHash) { + p("reorg?"); + opts.prev = 1; + } + return c(); + } + */ + ], + function (err){ + + if (err) + p('ERROR: @%s: %s [count: block_count: %d]', blockHash, err, that.block_count); + + if (blockInfo && blockInfo.result) { + if (opts.prev && blockInfo.result.previousblockhash) { + return that.getPrevNextBlock(blockInfo.result.previousblockhash, blockEnd, opts, cb); + } + + if (opts.next && blockInfo.result.nextblockhash) + return that.getPrevNextBlock(blockInfo.result.nextblockhash, blockEnd, opts, cb); + } + return cb(err); + }); + }; + + HistoricSync.prototype.syncBlocks = function(start, end, isForward, cb) { + var that = this; + + p('Syncing Blocks, starting from: %s end: %s isForward:', + start, end, isForward); + + + return that.getPrevNextBlock( start, end, + isForward ? { next: 1 } : { prev: 1}, cb); + }; + + HistoricSync.prototype.do_import_history = function(opts, next) { + var that = this; + + var retry_attemps = 100; + var retry_secs = 2; + + var block_best; + var block_height; + + async.series([ + function(cb) { + if (opts.destroy) { + p('Deleting Blocks...'); + that.db.collections.blocks.drop(cb); + } else { + return cb(); + } + }, + function(cb) { + if (opts.destroy) { + p('Deleting TXs...'); + that.db.collections.transactions.drop(cb); + } else { + return cb(); + } + }, + function(cb) { + if (opts.destroy) { + p('Deleting TXItems...'); + that.db.collections.transactionitems.drop(cb); + } else { + return cb(); + } + }, + function(cb) { + that.rpc.getInfo(function(err, res) { + if (err) cb(err); + + that.block_total = res.result.blocks; + return cb(); + }); + }, + // We are not using getBestBlockHash, because is not available in all clients + function(cb) { + if (!opts.reverse) return cb(); + + that.rpc.getBlockCount(function(err, res) { + if (err) cb(err); + block_height = res.result; + return cb(); + }); + }, + function(cb) { + if (!opts.reverse) return cb(); + + that.rpc.getBlockHash(block_height, function(err, res) { + if (err) cb(err); + + block_best = res.result; + return cb(); + }); + }, + ], + function(err) { + + function sync() { + var start, end, isForward; + + if (opts.reverse) { + start = block_best; + end = that.network.genesisBlock.hash.reverse().toString('hex'); + isForward = false; + } + else { + start = that.network.genesisBlock.hash.reverse().toString('hex'); + end = null; + isForward = true; + } + + that.syncBlocks(start, end, isForward, function(err) { + + if (err && err.message.match(/ECONNREFUSED/) && retry_attemps--){ + setTimeout(function() { + p("Retrying in %d secs ", retry_secs); + sync(); + }, retry_secs * 1000); + } + else + return next(err, that.block_count); + }); + } + sync(); + }); + } + + HistoricSync.prototype.import_history = function(opts, next) { + var that = this; + that.do_import_history(opts, next); + }; + + + return HistoricSync; +} +module.defineClass(spec); + diff --git a/lib/PeerSync.js b/lib/PeerSync.js index fff731b6..f7e51b65 100644 --- a/lib/PeerSync.js +++ b/lib/PeerSync.js @@ -11,23 +11,27 @@ function spec() { var peerdb_fn = 'peerdb.json'; function PeerSync() {} - PeerSync.prototype.init = function(config) { + + + PeerSync.prototype.init = function(config, cb) { + + var that = this; var network = config && (config.network || 'testnet'); - this.peerdb = undefined; - this.sync = new Sync({ + that.peerdb = undefined; + that.sync = new Sync({ networkName: network }); - this.sync.init(config); - - this.PeerManager = require('bitcore/PeerManager').createClass({ - config: { - network: network - } + that.sync.init(config, function() { + that.PeerManager = require('bitcore/PeerManager').createClass({ + config: { + network: network + } + }); + that.load_peers(); + return cb(); }); - this.load_peers(); - }; PeerSync.prototype.load_peers = function() { diff --git a/lib/Sync.js b/lib/Sync.js index 2e425ba1..084a1563 100644 --- a/lib/Sync.js +++ b/lib/Sync.js @@ -7,7 +7,6 @@ function spec() { var mongoose = require('mongoose'); var util = require('util'); var RpcClient = require('bitcore/RpcClient').class(); - var networks = require('bitcore/networks'); var async = require('async'); var config = require('../config/config'); var Block = require('../app/models/Block'); @@ -19,100 +18,8 @@ function spec() { function Sync(config) { this.tx_count = 0; - this.block_count= 0; - this.block_total= 0; - this.network = config.networkName === 'testnet' ? networks.testnet: networks.livenet; } - var progress_bar = function(string, current, total) { - console.log(util.format('\t%s %d/%d [%d%%]', string, current, total, parseInt(100 * current / total))); - }; - - Sync.prototype.getPrevNextBlock = function(blockHash, blockEnd, opts, cb) { - - var that = this; - - // recursion end. - if (!blockHash || (blockEnd && blockEnd == blockHash) ) { - console.log("Reach end:", blockHash, blockEnd); - return cb(); - } - - var existed = 0; - var blockInfo; - var blockObj; - - async.series([ - // Already got it? - function(c) { - Block.findOne({hash:blockHash}, function(err,block){ - if (err) { console.log(err); return c(err); }; - if (block) { - existed = 1; - blockObj = block; - } - - return c(); - }); - }, - //show some (inacurate) status - function(c) { - if (that.block_count++ % 1000 === 0) { - progress_bar('Historic sync status:', that.block_count, that.block_total); - } - return c(); - }, - //get Info from RPC - function(c) { - - // TODO: if we store prev/next, no need to go to RPC - // if (blockObj && blockObj.nextBlockHash) return c(); - - that.rpc.getBlock(blockHash, function(err, ret) { - if (err) return c(err); - - blockInfo = ret; - return c(); - }); - }, - //store it - function(c) { - if (existed) return c(); - - that.storeBlock(blockInfo.result, function(err, block) { - existed = err && err.toString().match(/E11000/); - if (err && ! existed) return c(err); - return c(); - }); - }, - /* TODO: Should Start to sync backwards? (this is for partial syncs) - function(c) { - - if (blockInfo.result.prevblockhash != current.blockHash) { - console.log("reorg?"); - opts.prev = 1; - } - return c(); - } - */ - ], - function (err){ - - if (err) - console.log("ERROR: @%s: %s [count: block_count: %d]", blockHash, err, that.block_count); - - if (blockInfo && blockInfo.result) { - if (opts.prev && blockInfo.result.previousblockhash) { - return that.getPrevNextBlock(blockInfo.result.previousblockhash, blockEnd, opts, cb); - } - - if (opts.next && blockInfo.result.nextblockhash) - return that.getPrevNextBlock(blockInfo.result.nextblockhash, blockEnd, opts, cb); - } - return cb(err); - }); - }; - Sync.prototype.storeBlock = function(block, cb) { var that = this; @@ -165,231 +72,36 @@ function spec() { isForward ? { next: 1 } : { prev: 1}, cb); }; - // This is not currently used. Transactions are represented by txid only - // in mongodb - Sync.prototype.syncTXs = function(cb) { - + Sync.prototype.init = function(opts, cb) { var that = this; - console.log('Syncing TXs...'); - - Transaction.find({ - blockhash: null - }, - function(err, txs) { - if (err) return cb(err); - - var read = 0; - var pull = 0; - var write = 0; - var total = txs.length; - console.log('\tneed to pull %d txs', total); - - if (!total) return cb(); - - async.each(txs, function(tx, next) { - if (!tx.txid) { - console.log('NO TXID skipping...', tx); - return next(); - } - - if (read++ % 1000 === 0) progress_bar('read', read, total); - - that.rpc.getRawTransaction(tx.txid, 1, function(err, txInfo) { - - if (pull++ % 1000 === 0) progress_bar('\tpull', pull, total); - - if (!err && txInfo) { - Transaction.update({ - txid: tx.txid - }, - txInfo.result, function(err) { - if (err) return next(err); - - if (write++ % 1000 === 0) progress_bar('\t\twrite', write, total); - - return next(); - }); - } - else return next(); - }); - }, - function(err) { - if (err) return cb(err); - return cb(err); - }); - }); - }; - - - // Not used - Sync.prototype.processTXs = function(reindex, cb) { - - var that = this; - - console.log('Syncing TXs...'); - - var filter = reindex ? {} : { processed: false } ; - - Transaction.find(filter, function(err, txs) { - if (err) return cb(err); - - var read = 0, - pull = 0, - proc = 0, - total = txs.length; - - console.log('\tneed to pull %d txs', total); - - if (!total) return cb(); - - - async.forEachLimit(txs, CONCURRENCY, function(tx, next) { - if (read++ % 1000 === 0) progress_bar('read', read, total); - - if (!tx.txid) { - console.log('NO TXID skipping...', tx); - return next(); - } - - // This will trigger an RPC call - Transaction.explodeTransactionItems( tx.txid, tx.time, function(err) { - if (proc++ % 1000 === 0) progress_bar('\tproc', pull, total); - next(err); - }); - }, - cb); - }); - }; - - Sync.prototype.init = function(opts) { - this.rpc = new RpcClient(config.bitcoind); - + that.opts = opts; if (!(opts && opts.skip_db_connection)) { mongoose.connect(config.db, {server: {auto_reconnect: true}} ); - } - this.opts = opts; - this.db = mongoose.connection; - this.db.on('error', function(err) { - console.log('connection error:' + err); - moogose.disconnect(); - }); + this.db = mongoose.connection; - this.db.on('disconnect', function(err) { - console.log('disconnect:' + err); - mongoose.connect(config.db, {server: {auto_reconnect: true}} ); - }); - - - }; - - Sync.prototype.import_history = function(opts, next) { - - var that = this; - - var retry_attemps = 100; - var retry_secs = 2; - - var block_best; - var block_height; - - this.db.once('open', function() { - async.series([ - function(cb) { - if (opts.destroy) { - console.log('Deleting Blocks...'); - that.db.collections.blocks.drop(cb); - } else { - return cb(); - } - }, - function(cb) { - if (opts.destroy) { - console.log('Deleting TXs...'); - that.db.collections.transactions.drop(cb); - } else { - return cb(); - } - }, - function(cb) { - if (opts.destroy) { - console.log('Deleting TXItems...'); - that.db.collections.transactionitems.drop(cb); - } else { - return cb(); - } - }, - function(cb) { - that.rpc.getInfo(function(err, res) { - if (err) cb(err); - - that.block_total = res.result.blocks; - return cb(); - }); - }, - // We are not using getBestBlockHash, because is not available in all clients - function(cb) { - if (!opts.reverse) return cb(); - - that.rpc.getBlockCount(function(err, res) { - if (err) cb(err); - block_height = res.result; - return cb(); - }); - }, - function(cb) { - if (!opts.reverse) return cb(); - - that.rpc.getBlockHash(block_height, function(err, res) { - if (err) cb(err); - - block_best = res.result; - return cb(); - }); - }, - ], function(err) { - - - function sync() { - - var start, end, isForward; - - if (opts.reverse) { - start = block_best; - end = that.network.genesisBlock.hash.reverse().toString('hex'); - isForward = false; - } - else { - start = that.network.genesisBlock.hash.reverse().toString('hex'); - end = null; - isForward = true; - } - - that.syncBlocks(start, end, isForward, function(err) { - - if (err && err.message.match(/ECONNREFUSED/) && retry_attemps--){ - setTimeout(function() { - console.log("Retrying in %d secs ", retry_secs); - sync(); - }, retry_secs * 1000); - } - else - return next(err, that.block_count); - }); - } - - if (!opts.skip_blocks) { - sync(); - } + this.db.on('error', function(err) { + console.log('connection error:' + err); + moogose.disconnect(); }); - }); + + this.db.on('disconnect', function(err) { + console.log('disconnect:' + err); + mongoose.connect(config.db, {server: {auto_reconnect: true}} ); + }); + + return that.db.once('open', cb); + } + else return cb(); }; Sync.prototype.close = function() { - console.log("closing connection"); - this.db.close(); + if (!(this.opts && this.opts.skip_db_connection)) { + console.log("closing connection"); + this.db.close(); + } }; return Sync; } diff --git a/util/sync.js b/util/sync.js index ff5d54ea..569ca782 100755 --- a/util/sync.js +++ b/util/sync.js @@ -8,7 +8,7 @@ require('buffertools').extend(); var SYNC_VERSION = '0.1'; var program = require('commander'); -var Sync = require('../lib/Sync').class(); +var HistoricSync = require('../lib/HistoricSync').class(); var async = require('async'); program @@ -18,7 +18,7 @@ program .option('-R --reverse', 'Sync backwards', 0) .parse(process.argv); -var sync = new Sync({ +var historicSync = new HistoricSync({ networkName: program.network }); @@ -27,23 +27,22 @@ if (program.remove) { } async.series([ -function(cb) { - sync.init(program); - cb(); -}, -function(cb) { - sync.import_history(program, function(err, count) { - if (err) { - console.log('CRITICAL ERROR: ', err); - } - else { - console.log('Done! [%d blocks]', count, err); - } + function(cb) { + historicSync.init(program, cb); + }, + function(cb) { + historicSync.import_history(program, function(err, count) { + if (err) { + console.log('CRITICAL ERROR: ', err); + } + else { + console.log('Done! [%d blocks]', count, err); + } + cb(); + }); + }, + function(cb) { + historicSync.close(); cb(); - }); -}, -function(cb) { - sync.close(); - cb(); }]);