Merge pull request #78 from matiu/feature/07sync/uptosync
Feature/07sync/uptosync
This commit is contained in:
commit
62c5d90b5a
|
@ -55,6 +55,7 @@ BlockSchema.statics.customCreate = function(block, cb) {
|
|||
newBlock.hash = block.hash;
|
||||
newBlock.nextBlockHash = block.nextBlockHash;
|
||||
|
||||
|
||||
Transaction.createFromArray(block.tx, newBlock.time, function(err, inserted_txs) {
|
||||
if (err) return cb(err);
|
||||
|
||||
|
|
|
@ -19,6 +19,9 @@ var mongoose = require('mongoose'),
|
|||
|
||||
var CONCURRENCY = 5;
|
||||
|
||||
// TODO: use bitcore networks module
|
||||
var genesisTXID = '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b';
|
||||
|
||||
/**
|
||||
*/
|
||||
var TransactionSchema = new Schema({
|
||||
|
@ -113,6 +116,10 @@ TransactionSchema.statics.createFromArray = function(txs, time, next) {
|
|||
|
||||
TransactionSchema.statics.explodeTransactionItems = function(txid, time, cb) {
|
||||
|
||||
// Is it from genesis block? (testnet==livenet)
|
||||
// TODO: parse it from networks.genesisTX
|
||||
if (txid === genesisTXID) return cb();
|
||||
|
||||
this.queryInfo(txid, function(err, info) {
|
||||
if (err || !info) return cb(err);
|
||||
|
||||
|
|
|
@ -16,6 +16,10 @@ function spec() {
|
|||
this.block_count= 0;
|
||||
this.block_total= 0;
|
||||
this.network = config.network === 'testnet' ? networks.testnet: networks.livenet;
|
||||
|
||||
var genesisHashReversed = new Buffer(32);
|
||||
this.network.genesisBlock.hash.copy(genesisHashReversed);
|
||||
this.genesis = genesisHashReversed.reverse().toString('hex');
|
||||
this.sync = new Sync(opts);
|
||||
}
|
||||
|
||||
|
@ -44,12 +48,10 @@ function spec() {
|
|||
|
||||
HistoricSync.prototype.getPrevNextBlock = function(blockHash, blockEnd, opts, cb) {
|
||||
|
||||
var that = this;
|
||||
var self = this;
|
||||
|
||||
// recursion end.
|
||||
if (!blockHash || (blockEnd && blockEnd === blockHash) ) {
|
||||
return cb();
|
||||
}
|
||||
if (!blockHash ) return cb();
|
||||
|
||||
var existed = 0;
|
||||
var blockInfo;
|
||||
|
@ -61,17 +63,16 @@ function spec() {
|
|||
Block.findOne({hash:blockHash}, function(err,block){
|
||||
if (err) { p(err); return c(err); }
|
||||
if (block) {
|
||||
existed = 1;
|
||||
blockObj = block;
|
||||
existed =1;
|
||||
blockObj =block;
|
||||
}
|
||||
|
||||
return c();
|
||||
});
|
||||
},
|
||||
//show some (inacurate) status
|
||||
function(c) {
|
||||
if (that.block_count++ % 1000 === 0) {
|
||||
progress_bar('sync status:', that.block_count, that.block_total);
|
||||
if (self.block_count % 1000 === 1) {
|
||||
progress_bar('sync status:', self.block_count, self.block_total);
|
||||
}
|
||||
return c();
|
||||
},
|
||||
|
@ -81,7 +82,7 @@ function spec() {
|
|||
// TODO: if we store prev/next, no need to go to RPC
|
||||
// if (blockObj && blockObj.nextBlockHash) return c();
|
||||
|
||||
that.rpc.getBlock(blockHash, function(err, ret) {
|
||||
self.rpc.getBlock(blockHash, function(err, ret) {
|
||||
if (err) return c(err);
|
||||
|
||||
blockInfo = ret;
|
||||
|
@ -91,9 +92,10 @@ function spec() {
|
|||
//store it
|
||||
function(c) {
|
||||
if (existed) return c();
|
||||
self.sync.storeBlock(blockInfo.result, function(err) {
|
||||
|
||||
that.sync.storeBlock(blockInfo.result, function(err) {
|
||||
existed = err && err.toString().match(/E11000/);
|
||||
|
||||
if (err && ! existed) return c(err);
|
||||
return c();
|
||||
});
|
||||
|
@ -112,34 +114,35 @@ function spec() {
|
|||
function (err){
|
||||
|
||||
if (err)
|
||||
p('ERROR: @%s: %s [count: block_count: %d]', blockHash, err, that.block_count);
|
||||
p('ERROR: @%s: %s [count: block_count: %d]', blockHash, err, self.block_count);
|
||||
|
||||
if (opts.uptoexisting && existed) {
|
||||
p('DONE. Found existing block: ', blockHash);
|
||||
return cb(err);
|
||||
}
|
||||
|
||||
if (blockEnd && blockEnd === blockHash) {
|
||||
p('DONE. Found END block: ', blockHash);
|
||||
return cb(err);
|
||||
}
|
||||
|
||||
|
||||
// Continue
|
||||
if (blockInfo && blockInfo.result) {
|
||||
self.block_count++;
|
||||
if (opts.prev && blockInfo.result.previousblockhash) {
|
||||
return that.getPrevNextBlock(blockInfo.result.previousblockhash, blockEnd, opts, cb);
|
||||
return self.getPrevNextBlock(blockInfo.result.previousblockhash, blockEnd, opts, cb);
|
||||
}
|
||||
|
||||
if (opts.next && blockInfo.result.nextblockhash)
|
||||
return that.getPrevNextBlock(blockInfo.result.nextblockhash, blockEnd, opts, cb);
|
||||
return self.getPrevNextBlock(blockInfo.result.nextblockhash, blockEnd, opts, cb);
|
||||
}
|
||||
return cb(err);
|
||||
});
|
||||
};
|
||||
|
||||
HistoricSync.prototype.syncBlocks = function(start, end, isForward, cb) {
|
||||
var that = this;
|
||||
|
||||
p('Starting from: ', start);
|
||||
p(' to : ', end);
|
||||
p(' isForward: ', isForward);
|
||||
|
||||
|
||||
return that.getPrevNextBlock( start, end,
|
||||
isForward ? { next: 1 } : { prev: 1}, cb);
|
||||
};
|
||||
|
||||
HistoricSync.prototype.do_import_history = function(opts, next) {
|
||||
var that = this;
|
||||
HistoricSync.prototype.import_history = function(opts, next) {
|
||||
var self = this;
|
||||
|
||||
var retry_attemps = 100;
|
||||
var retry_secs = 2;
|
||||
|
@ -151,7 +154,7 @@ function spec() {
|
|||
function(cb) {
|
||||
if (opts.destroy) {
|
||||
p('Deleting Blocks...');
|
||||
that.db.collections.blocks.drop(cb);
|
||||
self.db.collections.blocks.drop(cb);
|
||||
} else {
|
||||
return cb();
|
||||
}
|
||||
|
@ -159,7 +162,7 @@ function spec() {
|
|||
function(cb) {
|
||||
if (opts.destroy) {
|
||||
p('Deleting TXs...');
|
||||
that.db.collections.transactions.drop(cb);
|
||||
self.db.collections.transactions.drop(cb);
|
||||
} else {
|
||||
return cb();
|
||||
}
|
||||
|
@ -167,16 +170,16 @@ function spec() {
|
|||
function(cb) {
|
||||
if (opts.destroy) {
|
||||
p('Deleting TXItems...');
|
||||
that.db.collections.transactionitems.drop(cb);
|
||||
self.db.collections.transactionitems.drop(cb);
|
||||
} else {
|
||||
return cb();
|
||||
}
|
||||
},
|
||||
function(cb) {
|
||||
that.rpc.getInfo(function(err, res) {
|
||||
if (err) cb(err);
|
||||
self.rpc.getInfo(function(err, res) {
|
||||
if (err) return cb(err);
|
||||
|
||||
that.block_total = res.result.blocks;
|
||||
self.block_total = res.result.blocks;
|
||||
return cb();
|
||||
});
|
||||
},
|
||||
|
@ -184,8 +187,8 @@ function spec() {
|
|||
function(cb) {
|
||||
if (!opts.reverse) return cb();
|
||||
|
||||
that.rpc.getBlockCount(function(err, res) {
|
||||
if (err) cb(err);
|
||||
self.rpc.getBlockCount(function(err, res) {
|
||||
if (err) return cb(err);
|
||||
block_height = res.result;
|
||||
return cb();
|
||||
});
|
||||
|
@ -193,8 +196,8 @@ function spec() {
|
|||
function(cb) {
|
||||
if (!opts.reverse) return cb();
|
||||
|
||||
that.rpc.getBlockHash(block_height, function(err, res) {
|
||||
if (err) cb(err);
|
||||
self.rpc.getBlockHash(block_height, function(err, res) {
|
||||
if (err) return cb(err);
|
||||
|
||||
block_best = res.result;
|
||||
return cb();
|
||||
|
@ -203,22 +206,24 @@ function spec() {
|
|||
],
|
||||
function(err) {
|
||||
|
||||
var start, end;
|
||||
function sync() {
|
||||
var start, end, isForward;
|
||||
|
||||
if (opts.reverse) {
|
||||
start = block_best;
|
||||
end = that.network.genesisBlock.hash.reverse().toString('hex');
|
||||
isForward = false;
|
||||
end = self.genesis;
|
||||
opts.prev = true;
|
||||
}
|
||||
else {
|
||||
start = that.network.genesisBlock.hash.reverse().toString('hex');
|
||||
start = self.genesis;
|
||||
end = null;
|
||||
isForward = true;
|
||||
opts.next = true;
|
||||
}
|
||||
|
||||
that.syncBlocks(start, end, isForward, function(err) {
|
||||
p('Starting from: ', start);
|
||||
p(' to : ', end);
|
||||
p(' opts: ', JSON.stringify(opts));
|
||||
|
||||
self.getPrevNextBlock( start, end, opts , function(err) {
|
||||
if (err && err.message.match(/ECONNREFUSED/) && retry_attemps--){
|
||||
setTimeout(function() {
|
||||
p('Retrying in %d secs', retry_secs);
|
||||
|
@ -226,7 +231,7 @@ function spec() {
|
|||
}, retry_secs * 1000);
|
||||
}
|
||||
else
|
||||
return next(err, that.block_count);
|
||||
return next(err, self.block_count);
|
||||
});
|
||||
}
|
||||
if (!err)
|
||||
|
@ -236,9 +241,28 @@ function spec() {
|
|||
});
|
||||
};
|
||||
|
||||
HistoricSync.prototype.import_history = function(opts, next) {
|
||||
var that = this;
|
||||
that.do_import_history(opts, next);
|
||||
// upto if we have genesis block?
|
||||
HistoricSync.prototype.smart_import = function(next) {
|
||||
var self = this;
|
||||
|
||||
Block.findOne({hash:self.genesis}, function(err, b){
|
||||
if (err) return next(err);
|
||||
|
||||
|
||||
if (!b) {
|
||||
p('Could not find Genesis block. Running FULL SYNC');
|
||||
}
|
||||
else {
|
||||
p('Genesis block found. Syncing upto know blocks.');
|
||||
}
|
||||
|
||||
var opts = {
|
||||
reverse: 1,
|
||||
uptoexisting: b ? true: false,
|
||||
};
|
||||
|
||||
return self.import_history(opts, next);
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
|
|
12
lib/Sync.js
12
lib/Sync.js
|
@ -19,6 +19,7 @@ function spec() {
|
|||
var that = this;
|
||||
|
||||
Block.customCreate(block, function(err, block, inserted_txs){
|
||||
if (err) return cb(err);
|
||||
|
||||
if (block && that.opts.broadcast_blocks) {
|
||||
sockets.broadcast_block(block);
|
||||
|
@ -56,17 +57,6 @@ function spec() {
|
|||
};
|
||||
|
||||
|
||||
Sync.prototype.syncBlocks = function(start, end, isForward, cb) {
|
||||
var that = this;
|
||||
|
||||
console.log('Syncing Blocks, starting \n\tfrom: %s \n\tend: %s \n\tisForward:',
|
||||
start, end, isForward);
|
||||
|
||||
|
||||
return that.getPrevNextBlock( start, end,
|
||||
isForward ? { next: 1 } : { prev: 1}, cb);
|
||||
};
|
||||
|
||||
Sync.prototype.init = function(opts, cb) {
|
||||
var that = this;
|
||||
|
||||
|
|
|
@ -50,10 +50,8 @@ if (!config.disableHistoricSync) {
|
|||
skip_db_connection: true,
|
||||
networkName: config.network
|
||||
}, function() {
|
||||
hs.import_history({
|
||||
reverse: 1,
|
||||
}, function(){
|
||||
console.log('historic_sync finished!');
|
||||
hs.smart_import(function(){
|
||||
console.log('[historic_sync] finished!');
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
33
util/sync.js
33
util/sync.js
|
@ -14,35 +14,50 @@ var async = require('async');
|
|||
program
|
||||
.version(SYNC_VERSION)
|
||||
.option('-N --network [livenet]', 'Set bitcoin network [testnet]', 'testnet')
|
||||
.option('-S --smart', 'genesis stored? uptoexisting = 1', 1)
|
||||
.option('-D --destroy', 'Remove current DB (and start from there)', 0)
|
||||
.option('-R --reverse', 'Sync backwards', 0)
|
||||
.option('-U --uptoexisting', 'Sync only until an existing block is found', 0)
|
||||
.parse(process.argv);
|
||||
|
||||
var historicSync = new HistoricSync({
|
||||
networkName: program.network
|
||||
});
|
||||
|
||||
/* TODO: Sure?
|
||||
if (program.remove) {
|
||||
|
||||
}
|
||||
*/
|
||||
|
||||
async.series([
|
||||
function(cb) {
|
||||
historicSync.init(program, cb);
|
||||
},
|
||||
function(cb) {
|
||||
historicSync.import_history(program, function(err, count) {
|
||||
if (program.smart) {
|
||||
historicSync.smart_import(cb);
|
||||
}
|
||||
else {
|
||||
historicSync.import_history({
|
||||
destroy: program.destroy,
|
||||
reverse: program.reverse,
|
||||
uptoexisting: program.uptoexisting,
|
||||
}, cb);
|
||||
}
|
||||
},
|
||||
function(cb) {
|
||||
historicSync.close();
|
||||
return cb();
|
||||
},
|
||||
],
|
||||
function(err, count) {
|
||||
if (err) {
|
||||
console.log('CRITICAL ERROR: ', err);
|
||||
}
|
||||
else {
|
||||
console.log('Done! [%d blocks]', count, err);
|
||||
console.log('Finished. [%d blocks synced]', count[1]);
|
||||
}
|
||||
cb();
|
||||
});
|
||||
},
|
||||
function(cb) {
|
||||
historicSync.close();
|
||||
cb();
|
||||
}]);
|
||||
return;
|
||||
});
|
||||
|
||||
|
|
Loading…
Reference in New Issue