refactor of historic sync. Handles all know scenarios
This commit is contained in:
parent
601318f0e1
commit
b8f95b7a0e
|
@ -0,0 +1,56 @@
|
|||
|
||||
|
||||
first 5%
|
||||
|
||||
=> with data + mongo + w/RPC for blocks: 48.8s
|
||||
=> with RPC + mongo: 2m26s
|
||||
=> with files + mongo + wo/RPC for blocks: 36.7s
|
||||
=> with files + mongo + wo/RPC for blocks + wo/mongoIndexes:
|
||||
|
||||
|
||||
first 10%
|
||||
|
||||
=> sin RPC, sin Tx, sin store block => 0.7s
|
||||
=> sin RPC, sin grabar, procesando TX => 8.5s
|
||||
=> sin RPC, sin TX processing, sin grabar => 12s28
|
||||
=> con RPC, TX processing, sin Grabar Tx, grabando bloques => 29s
|
||||
=> con RPC, sin TX processing, sin Grabar Tx, grabando bloques => 35s
|
||||
=> con RPC, TX processing, sin Grabar Tx, grabando bloques => 43s
|
||||
|
||||
=> TX processing, sin RPC, sin saves TX, y blocks => 11.6s
|
||||
=> TX processing, CON RPC, sin saves TX, y blocks => 35s
|
||||
=> con RPC, TX processing, sin saves TX => 45s
|
||||
=> con RPC, TX processing, Grabarndo todo => 78s
|
||||
=> con RPC, TX processing, Grabarndo todo => 78s
|
||||
(18k blocks, 36k txouts)
|
||||
|
||||
//LEVEL DB
|
||||
=> sin RPC, TX processing, todo en level => 14s
|
||||
=> con RPC, TX processing, todo en level => 39.7s
|
||||
=> con RPC, TX processing, tx mongo, blocks en level => 64s
|
||||
|
||||
|
||||
=> sin RPC, TX processing, todo en level, handling REORGs, more data => 28s
|
||||
=> sin RPC, TX processing, todo en level, handling REORGs, more data, tx ts => 34t s
|
||||
|
||||
|
||||
//FROM blk00002.dat (more txs), 5%
|
||||
|
||||
=> now total : 1m13s
|
||||
=> removing block writes => 1m8s
|
||||
=> sacando los contenidos adentro de getblock from file de => 4.5s!!
|
||||
|
||||
=> con base58 cpp => 21s
|
||||
=> toda la testnet => 17m
|
||||
|
||||
10% de blk2
|
||||
=> 50s con base58cpp
|
||||
=> 41s commentando todo addr
|
||||
=> 5s commentando todo get HistoricSync.prototype.getBlockFromFile = function(cb) {
|
||||
=> 15s commentando todo get HistoricSync.prototype.getBlockFromFile = function(cb) {
|
||||
|
||||
10% de blk 1
|
||||
=> 59s
|
||||
=> 15s comentando desde b.getStandardizedObject()
|
||||
=> 39s comentando dps b.getStandardizedObject()
|
||||
|
35
insight.js
35
insight.js
|
@ -44,48 +44,29 @@ walk(models_path);
|
|||
* p2pSync process
|
||||
*/
|
||||
|
||||
var peerSync = new PeerSync();
|
||||
var peerSync = new PeerSync({shouldBroadcast: true});
|
||||
|
||||
if (!config.disableP2pSync) {
|
||||
var ps = peerSync;
|
||||
ps.init({
|
||||
shouldBroadcast: true,
|
||||
}, function() {
|
||||
ps.run();
|
||||
});
|
||||
peerSync.run();
|
||||
}
|
||||
|
||||
/**
|
||||
* historic_sync process
|
||||
*/
|
||||
var historicSync = {};
|
||||
var historicSync = new HistoricSync({ shouldBroadcastSync: true });
|
||||
peerSync.historicSync = historicSync;
|
||||
|
||||
if (!config.disableHistoricSync) {
|
||||
historicSync = new HistoricSync();
|
||||
|
||||
historicSync.init({
|
||||
shouldBroadcastSync: true,
|
||||
}, function(err) {
|
||||
historicSync.start({}, function(err){
|
||||
if (err) {
|
||||
var txt = 'ABORTED with error: ' + err.message;
|
||||
console.log('[historic_sync] ' + txt);
|
||||
}
|
||||
else {
|
||||
historicSync.smartImport({}, function(err){
|
||||
var txt = 'ended.';
|
||||
if (err)
|
||||
txt = 'ABORTED with error: ' + err.message;
|
||||
else if (ps) {
|
||||
ps.allowReorgs = true;
|
||||
ps.historicSync = historicSync;
|
||||
}
|
||||
|
||||
console.log('[historic_sync] ' + txt, historicSync.info());
|
||||
});
|
||||
}
|
||||
if (peerSync) peerSync.allowReorgs = true;
|
||||
});
|
||||
}
|
||||
|
||||
else
|
||||
if (peerSync) peerSync.allowReorgs = true;
|
||||
|
||||
|
||||
//express settings
|
||||
|
|
|
@ -136,7 +136,7 @@ function spec(b) {
|
|||
});
|
||||
};
|
||||
|
||||
BlockDb.prototype.countNotOrphan = function(cb) {
|
||||
BlockDb.prototype.countConnected = function(cb) {
|
||||
var c = 0;
|
||||
console.log('Counting connected blocks. This could take some minutes');
|
||||
db.createReadStream({start: MAIN_PREFIX, end: MAIN_PREFIX + '~' })
|
||||
|
|
|
@ -6,6 +6,7 @@ require('classtool');
|
|||
|
||||
function spec() {
|
||||
var util = require('util');
|
||||
var assert = require('assert');
|
||||
var RpcClient = require('bitcore/RpcClient').class();
|
||||
var Script = require('bitcore/Script').class();
|
||||
var networks = require('bitcore/networks');
|
||||
|
@ -20,14 +21,19 @@ function spec() {
|
|||
|
||||
var BAD_GEN_ERROR = 'Bad genesis block. Network mismatch between Insight and bitcoind? Insight is configured for:';
|
||||
|
||||
function HistoricSync() {
|
||||
var BAD_GEN_ERROR_DB = 'Bad genesis block. Network mismatch between Insight and levelDB? Insight is configured for:';
|
||||
function HistoricSync(opts) {
|
||||
opts = opts || {};
|
||||
|
||||
this.network = config.network === 'testnet' ? networks.testnet: networks.livenet;
|
||||
|
||||
var genesisHashReversed = new Buffer(32);
|
||||
this.network.genesisBlock.hash.copy(genesisHashReversed);
|
||||
this.genesis = genesisHashReversed.reverse().toString('hex');
|
||||
|
||||
|
||||
this.rpc = new RpcClient(config.bitcoind);
|
||||
this.shouldBroadcast = opts.shouldBroadcastSync;
|
||||
this.sync = new Sync(opts);
|
||||
}
|
||||
|
||||
function p() {
|
||||
|
@ -52,7 +58,7 @@ function spec() {
|
|||
self.updatePercentage();
|
||||
p(util.format('status: [%d%%]', self.syncPercentage));
|
||||
}
|
||||
if (self.opts.shouldBroadcastSync) {
|
||||
if (self.shouldBroadcast) {
|
||||
sockets.broadcastSyncInfo(self.info());
|
||||
}
|
||||
|
||||
|
@ -67,38 +73,10 @@ function spec() {
|
|||
self.error = err.toString();
|
||||
self.status='error';
|
||||
self.showProgress();
|
||||
return err;
|
||||
};
|
||||
|
||||
HistoricSync.prototype.init = function(opts, cb) {
|
||||
|
||||
var self = this;
|
||||
self.opts = opts;
|
||||
|
||||
self.rpc = new RpcClient(config.bitcoind);
|
||||
self.sync = new Sync(opts);
|
||||
|
||||
self.sync.init(opts, function(err) {
|
||||
if (err) {
|
||||
self.setError(err);
|
||||
return cb(err);
|
||||
}
|
||||
else {
|
||||
|
||||
self.startTs = parseInt(Date.now());
|
||||
|
||||
// check testnet?
|
||||
self.rpc.getBlockHash(0, function(err, res){
|
||||
if (!err && ( res && res.result !== self.genesis)) {
|
||||
err = new Error(BAD_GEN_ERROR + config.network);
|
||||
self.setError(err);
|
||||
}
|
||||
if (err) self.setError(err);
|
||||
return cb(err);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
};
|
||||
|
||||
HistoricSync.prototype.close = function() {
|
||||
this.sync.close();
|
||||
|
@ -112,7 +90,6 @@ function spec() {
|
|||
blockChainHeight: this.blockChainHeight,
|
||||
syncPercentage: this.syncPercentage,
|
||||
syncedBlocks: this.syncedBlocks,
|
||||
orphanBlocks: this.orphanBlocks,
|
||||
syncTipHash: this.sync.tip,
|
||||
error: this.error,
|
||||
type: this.type,
|
||||
|
@ -127,100 +104,30 @@ function spec() {
|
|||
if (this.syncPercentage > 100) this.syncPercentage = 100;
|
||||
};
|
||||
|
||||
HistoricSync.prototype.getPrevNextBlock = function(blockHash, blockEnd, scanOpts, cb) {
|
||||
HistoricSync.prototype.getBlockFromRPC = function(cb) {
|
||||
var self = this;
|
||||
|
||||
// recursion end.
|
||||
if (!blockHash) return cb();
|
||||
if (!self.currentRpcHash) return cb();
|
||||
|
||||
var existed = false;
|
||||
var blockInfo;
|
||||
self.rpc.getBlock(self.currentRpcHash, function(err, ret) {
|
||||
if (err) return cb(err);
|
||||
if (ret) {
|
||||
blockInfo = ret.result;
|
||||
// this is to match block retreived from file
|
||||
if (blockInfo.hash === self.genesis)
|
||||
blockInfo.previousblockhash =
|
||||
self.network.genesisBlock.prev_hash.toString('hex');
|
||||
|
||||
async.series([
|
||||
// Already got it?
|
||||
function(c) {
|
||||
self.sync.bDb.has(blockHash, function(err, ret) {
|
||||
if (err) {
|
||||
p(err);
|
||||
return c(err);
|
||||
}
|
||||
|
||||
if (ret) existed = true;
|
||||
return c();
|
||||
});
|
||||
},
|
||||
//show some (inacurate) status
|
||||
function(c) {
|
||||
self.showProgress();
|
||||
return c();
|
||||
},
|
||||
|
||||
function(c) {
|
||||
self.rpc.getBlock(blockHash, function(err, ret) {
|
||||
if (err) return c(err);
|
||||
if (ret) {
|
||||
blockInfo = ret.result;
|
||||
// this is to match block retreived from file
|
||||
if (blockInfo.hash === self.genesis)
|
||||
blockInfo.previousblockhash = self.network.genesisBlock.prev_hash.toString('hex');
|
||||
}
|
||||
else {
|
||||
blockInfo = null;
|
||||
}
|
||||
|
||||
return c();
|
||||
});
|
||||
},
|
||||
//store it
|
||||
function(c) {
|
||||
if (existed) return c();
|
||||
|
||||
// When storing files from RPC recusively, reorgs are disabled
|
||||
self.sync.storeTipBlock(blockInfo, false, function(err) {
|
||||
return c(err);
|
||||
});
|
||||
}], function(err) {
|
||||
|
||||
if (err) {
|
||||
self.setError(util.format('ERROR: @%s: %s [count: syncedBlocks: %d]',
|
||||
blockHash, err, self.syncedBlocks));
|
||||
return cb(err);
|
||||
self.currentRpcHash = blockInfo.nextblockhash;
|
||||
}
|
||||
else {
|
||||
self.status = 'syncing';
|
||||
blockInfo = null;
|
||||
}
|
||||
|
||||
if ( blockEnd && blockEnd === blockHash) {
|
||||
p('blockEnd found!:' + blockEnd);
|
||||
self.found=1;
|
||||
}
|
||||
|
||||
if ( self.found && self.syncedBlocks >= self.blockChainHeight ) {
|
||||
|
||||
self.endTs = parseInt(Date.now());
|
||||
self.status = 'finished';
|
||||
p('DONE. Height: ' , self.syncedBlocks);
|
||||
return cb(err);
|
||||
}
|
||||
|
||||
// Continue
|
||||
if (blockInfo) {
|
||||
|
||||
self.syncedBlocks++;
|
||||
|
||||
// recursion
|
||||
if (scanOpts.prev && blockInfo.previousblockhash)
|
||||
return self.getPrevNextBlock(blockInfo.previousblockhash, blockEnd, scanOpts, cb);
|
||||
|
||||
if (scanOpts.next && blockInfo.nextblockhash)
|
||||
return self.getPrevNextBlock(blockInfo.nextblockhash, blockEnd, scanOpts, cb);
|
||||
}
|
||||
return cb(err);
|
||||
return cb(null, blockInfo);
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
//var a=1;
|
||||
HistoricSync.prototype.getBlockFromFile = function(cb) {
|
||||
var self = this;
|
||||
|
||||
|
@ -228,12 +135,8 @@ function spec() {
|
|||
|
||||
//get Info
|
||||
self.blockExtractor.getNextBlock(function(err, b) {
|
||||
// a++;
|
||||
//return cb(null,{previousblockhash:a.toString(), hash:(a-1).toString() });
|
||||
if (err || ! b) return cb(err);
|
||||
blockInfo = b.getStandardizedObject(b.txs, self.network);
|
||||
// blockInfo.curWork = Deserialize.intFromCompact(b.bits);
|
||||
// We keep the RPC field names
|
||||
blockInfo.previousblockhash = blockInfo.prev_block;
|
||||
|
||||
var ti=0;
|
||||
|
@ -260,241 +163,258 @@ function spec() {
|
|||
to++;
|
||||
});
|
||||
});
|
||||
|
||||
return cb(err,blockInfo);
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
HistoricSync.prototype.nextBlockFromFile = function(scanOpts, cb) {
|
||||
var self = this;
|
||||
|
||||
self.showProgress();
|
||||
self.getBlockFromFile(function(err, blockInfo) {
|
||||
if (err) {
|
||||
self.setError(util.format('ERROR: @%s: %s [count: syncedBlocks: %d]',
|
||||
blockInfo ? blockInfo.hash : '-', err, self.syncedBlocks));
|
||||
return cb(err);
|
||||
}
|
||||
|
||||
self.sync.storeTipBlock(blockInfo, function(err) {
|
||||
if (err) {
|
||||
self.setError(util.format('ERROR: @%s: %s [count: syncedBlocks: %d]',
|
||||
blockInfo ? blockInfo.hash : '-', err, self.syncedBlocks));
|
||||
return cb(err);
|
||||
}
|
||||
|
||||
self.sync.bDb.setLastFileIndex(self.blockExtractor.currentFileIndex, function(err) {
|
||||
if (err) return cb(err);
|
||||
|
||||
if (blockInfo && blockInfo.hash) {
|
||||
self.syncedBlocks++;
|
||||
}
|
||||
else {
|
||||
self.endTs = parseInt(Date.now());
|
||||
self.status = 'finished';
|
||||
}
|
||||
|
||||
return cb(err);
|
||||
});
|
||||
self.sync.bDb.setLastFileIndex(self.blockExtractor.currentFileIndex, function(err) {
|
||||
return cb(err,blockInfo);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
HistoricSync.prototype.countNotOrphan = function(cb) {
|
||||
HistoricSync.prototype.updateConnectedCountDB = function(cb) {
|
||||
var self = this;
|
||||
|
||||
if (self.notOrphanCount) return cb(null, self.notOrphanCount);
|
||||
|
||||
|
||||
self.sync.bDb.countNotOrphan(function(err, count) {
|
||||
if (err) return cb(err);
|
||||
self.notOrphanCount = count;
|
||||
return cb(null, self.notOrphanCount);
|
||||
self.sync.bDb.countConnected(function(err, count) {
|
||||
self.connectedCountDB = count || 0;
|
||||
self.syncedBlocks = count || 0;
|
||||
return cb(err);
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
HistoricSync.prototype.updateBlockCount = function(cb) {
|
||||
HistoricSync.prototype.updateBlockChainHeight = function(cb) {
|
||||
var self = this;
|
||||
|
||||
if (self.blockChainHeight) return cb();
|
||||
|
||||
self.rpc.getBlockCount(function(err, res) {
|
||||
if (err) return cb(err);
|
||||
self.blockChainHeight = res.result;
|
||||
return cb();
|
||||
return cb(err);
|
||||
});
|
||||
};
|
||||
|
||||
HistoricSync.prototype.smartImport = function(scanOpts, next) {
|
||||
|
||||
HistoricSync.prototype.checkNetworkSettings = function(next) {
|
||||
var self = this;
|
||||
|
||||
var genesis, count;
|
||||
var lastBlock;
|
||||
var tip;
|
||||
self.hasGenesis = false;
|
||||
|
||||
//available status: starting / syncing / finished / aborted
|
||||
this.status = 'starting';
|
||||
this.error = null;
|
||||
|
||||
this.syncPercentage = 0;
|
||||
this.syncedBlocks = 0;
|
||||
this.orphanBlocks = 0;
|
||||
this.type ='';
|
||||
|
||||
async.series([
|
||||
function(s_c) {
|
||||
if (!scanOpts.destroy) return s_c();
|
||||
|
||||
p('Deleting DB...');
|
||||
return self.sync.destroy(s_c);
|
||||
},
|
||||
function(s_c) {
|
||||
self.sync.bDb.has(self.genesis, function(err, b) {
|
||||
genesis = b;
|
||||
return s_c(err);
|
||||
});
|
||||
},
|
||||
function(s_c) {
|
||||
self.countNotOrphan(function(err, c) {
|
||||
count = c;
|
||||
return s_c(err);
|
||||
});
|
||||
},
|
||||
function(s_c) {
|
||||
if (!config.bitcoind.dataDir) return s_c();
|
||||
if (scanOpts.startFile) {
|
||||
self.blockExtractor = new BlockExtractor(config.bitcoind.dataDir, config.network);
|
||||
self.blockExtractor.currentFileIndex = scanOpts.startFile;
|
||||
return s_c();
|
||||
// check network config
|
||||
self.rpc.getBlockHash(0, function(err, res){
|
||||
if (!err && ( res && res.result !== self.genesis)) {
|
||||
err = new Error(BAD_GEN_ERROR + config.network);
|
||||
}
|
||||
if (err) return next(err);
|
||||
self.sync.bDb.has(self.genesis, function(err, b) {
|
||||
if (!err && ( res && res.result !== self.genesis)) {
|
||||
err = new Error(BAD_GEN_ERROR_DB + config.network);
|
||||
}
|
||||
self.sync.bDb.getLastFileIndex(function(err, idx) {
|
||||
self.blockExtractor = new BlockExtractor(config.bitcoind.dataDir, config.network);
|
||||
if (idx) self.blockExtractor.currentFileIndex = idx;
|
||||
return s_c(err);
|
||||
});
|
||||
},
|
||||
function(s_c) {
|
||||
self.updateBlockCount(s_c);
|
||||
},
|
||||
self.hasGenesis = b?true:false;
|
||||
return next(err);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
// define sync strategy
|
||||
function(s_c) {
|
||||
if (!genesis || scanOpts.destroy || count < self.blockChainHeight * 0.9 ) {
|
||||
HistoricSync.prototype.updateStartBlock = function(next) {
|
||||
var self = this;
|
||||
|
||||
// Full sync.
|
||||
if (!genesis)
|
||||
p('Could not find Genesis block. Running FULL SYNC');
|
||||
else
|
||||
p('Less that 90% of current blockchain is stored. Running FULL SYNC',
|
||||
parseInt(count/self.blockChainHeight*100));
|
||||
self.startBlock = self.genesis;
|
||||
|
||||
if (config.bitcoind.dataDir) {
|
||||
p('bitcoind dataDir configured...importing blocks from .dat files');
|
||||
p('Starting from file: ' + self.blockExtractor.currentFileIndex);
|
||||
scanOpts.fromFiles = true;
|
||||
self.sync.bDb.getTip(function(err,tip) {
|
||||
if (!tip) return next();
|
||||
|
||||
var blockInfo;
|
||||
var oldtip;
|
||||
|
||||
//check that the tip is still on the mainchain
|
||||
async.doWhilst(
|
||||
function(cb) {
|
||||
self.sync.bDb.fromHashWithInfo(tip, function(err, bi) {
|
||||
blockInfo = bi ? bi.info : {};
|
||||
if (oldtip)
|
||||
self.sync.setBlockMain(oldtip, false, cb);
|
||||
else
|
||||
return cb();
|
||||
});
|
||||
},
|
||||
function(err) {
|
||||
if (err) return next(err);
|
||||
var ret = false;
|
||||
if ( self.blockChainHeight === blockInfo.height ||
|
||||
blockInfo.confirmations > 0) {
|
||||
ret = false;
|
||||
}
|
||||
else {
|
||||
scanOpts.reverse = true;
|
||||
oldtip = tip;
|
||||
tip = blockInfo.previousblockhash;
|
||||
assert(tip);
|
||||
p('Previous TIP is now orphan. Back to:' + tip);
|
||||
ret = true;
|
||||
}
|
||||
return ret;
|
||||
},
|
||||
function(err) {
|
||||
self.startBlock = tip;
|
||||
p('Resuming sync from block:'+tip);
|
||||
return next(err);
|
||||
}
|
||||
else {
|
||||
p('Genesis block found. Syncing upto old TIP.');
|
||||
p('Got ' + count + ' out of ' + self.blockChainHeight + ' blocks');
|
||||
scanOpts.reverse = true;
|
||||
}
|
||||
);
|
||||
});
|
||||
};
|
||||
|
||||
if (!scanOpts.reverse) return s_c();
|
||||
HistoricSync.prototype.prepareFileSync = function(opts, next) {
|
||||
var self = this;
|
||||
|
||||
self.rpc.getBlockHash(self.blockChainHeight, function(err, res) {
|
||||
if (err) return s_c(err);
|
||||
lastBlock = res.result;
|
||||
return s_c();
|
||||
});
|
||||
},
|
||||
function(s_c) {
|
||||
if (!scanOpts.reverse) return s_c();
|
||||
self.sync.bDb.getTip(function(err, res) {
|
||||
if (err) return s_c(err);
|
||||
tip = res;
|
||||
|
||||
console.log('Old Tip:', tip);
|
||||
return s_c();
|
||||
});
|
||||
},
|
||||
function(s_c) {
|
||||
if (!scanOpts.reverse) return s_c();
|
||||
self.countNotOrphan(function(err, count) {
|
||||
if (err) return s_c(err);
|
||||
|
||||
self.syncedBlocks = count || 0;
|
||||
return s_c();
|
||||
});
|
||||
}],
|
||||
function(err) {
|
||||
// SETUP Sync params
|
||||
var start, end;
|
||||
if (err) {
|
||||
self.setError(err);
|
||||
return next(err, 0);
|
||||
}
|
||||
|
||||
if (!self.step) {
|
||||
var step = parseInt( (self.blockChainHeight - self.syncedBlocks) / 1000);
|
||||
|
||||
if (self.opts.progressStep) {
|
||||
step = self.opts.progressStep;
|
||||
}
|
||||
|
||||
if (step < 10) step = 10;
|
||||
self.step = step;
|
||||
}
|
||||
|
||||
if (scanOpts.reverse) {
|
||||
start = lastBlock;
|
||||
end = tip || self.genesis;
|
||||
scanOpts.prev = true;
|
||||
}
|
||||
else {
|
||||
start = self.genesis;
|
||||
end = null;
|
||||
scanOpts.next = true;
|
||||
}
|
||||
p('Starting from: ', start);
|
||||
p(' to : ', end);
|
||||
p(' scanOpts: ', JSON.stringify(scanOpts));
|
||||
|
||||
if (scanOpts.fromFiles) {
|
||||
|
||||
self.status = 'syncing';
|
||||
self.type = 'from .dat Files';
|
||||
if (
|
||||
opts.forceRPC || !config.bitcoind.dataDir ||
|
||||
self.connectedCountDB > self.blockChainHeight > 0.9) return next();
|
||||
|
||||
|
||||
async.whilst(function() {
|
||||
return self.status === 'syncing';
|
||||
}, function (w_cb) {
|
||||
self.nextBlockFromFile(scanOpts, function(err) {
|
||||
self.blockExtractor = new BlockExtractor(config.bitcoind.dataDir, config.network);
|
||||
|
||||
self.getFn = self.getBlockFromFile;
|
||||
self.allowReorgs = true;
|
||||
self.sync.bDb.getLastFileIndex(function(err, idx) {
|
||||
if (opts.forceStartFile)
|
||||
self.blockExtractor.currentFileIndex = opts.forceStartFile;
|
||||
else if (idx) self.blockExtractor.currentFileIndex = idx;
|
||||
|
||||
var h = self.genesis;
|
||||
|
||||
p('Seeking file to:' + self.startBlock);
|
||||
//forward till startBlock
|
||||
async.whilst(
|
||||
function() {
|
||||
return h !== self.startBlock;
|
||||
},
|
||||
function (w_cb) {
|
||||
self.getBlockFromFile(function(err,b) {
|
||||
h=b.hash;
|
||||
setImmediate(function(){
|
||||
return w_cb(err);
|
||||
});
|
||||
});
|
||||
}, function(err) {
|
||||
self.showProgress();
|
||||
return next(err);
|
||||
});
|
||||
}
|
||||
else {
|
||||
self.type = 'from RPC calls';
|
||||
self.getPrevNextBlock(start, end, scanOpts, function(err) {
|
||||
self.showProgress();
|
||||
return next(err);
|
||||
});
|
||||
}
|
||||
}, next);
|
||||
});
|
||||
};
|
||||
|
||||
//NOP
|
||||
HistoricSync.prototype.prepareRpcSync = function(opts, next) {
|
||||
var self = this;
|
||||
|
||||
if (self.blockExtractor) return next();
|
||||
self.getFn = self.getBlockFromRPC;
|
||||
self.currentRpcHash = self.startBlock;
|
||||
self.allowReorgs = false;
|
||||
return next();
|
||||
};
|
||||
|
||||
HistoricSync.prototype.showSyncStartMessage = function() {
|
||||
var self = this;
|
||||
|
||||
p('Got ' + self.connectedCountDB +
|
||||
' blocks in current DB, out of ' + self.blockChainHeight + ' block at bitcoind');
|
||||
|
||||
if (self.blockExtractor) {
|
||||
p('bitcoind dataDir configured...importing blocks from .dat files');
|
||||
p('First file index: ' + self.blockExtractor.currentFileIndex);
|
||||
}
|
||||
else {
|
||||
p('syncing from RPC (slow)');
|
||||
}
|
||||
|
||||
p('Starting from: ', self.startBlock);
|
||||
self.showProgress();
|
||||
};
|
||||
|
||||
|
||||
HistoricSync.prototype.setupSyncStatus = function() {
|
||||
var self = this;
|
||||
|
||||
var step = parseInt( (self.blockChainHeight - self.syncedBlocks) / 1000);
|
||||
if (step < 10) step = 10;
|
||||
|
||||
self.step = step;
|
||||
self.type = self.blockExtractor?'from .dat Files':'from RPC calls';
|
||||
self.status = 'syncing';
|
||||
self.startTs = Date.now();
|
||||
self.endTs = null;
|
||||
this.error = null;
|
||||
this.syncPercentage = 0;
|
||||
};
|
||||
|
||||
HistoricSync.prototype.prepareToSync = function(opts, next) {
|
||||
var self = this;
|
||||
|
||||
self.status = 'starting';
|
||||
async.series([
|
||||
function(s_c) {
|
||||
self.checkNetworkSettings(s_c);
|
||||
},
|
||||
function(s_c) {
|
||||
self.updateConnectedCountDB(s_c);
|
||||
},
|
||||
function(s_c) {
|
||||
self.updateBlockChainHeight(s_c);
|
||||
},
|
||||
function(s_c) {
|
||||
self.updateStartBlock(s_c);
|
||||
},
|
||||
function(s_c) {
|
||||
self.prepareFileSync(opts, s_c);
|
||||
},
|
||||
function(s_c) {
|
||||
self.prepareRpcSync(opts, s_c);
|
||||
},
|
||||
],
|
||||
function(err) {
|
||||
if (err) return(self.setError(err));
|
||||
|
||||
self.showSyncStartMessage();
|
||||
self.setupSyncStatus();
|
||||
return next();
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
HistoricSync.prototype.start = function(opts, next) {
|
||||
var self = this;
|
||||
|
||||
if (self.status==='starting' || self.status==='syncing') {
|
||||
p('## Wont start to sync while status is %s', self.status);
|
||||
return next();
|
||||
}
|
||||
|
||||
self.prepareToSync(opts, function(err) {
|
||||
if (err) return next(self.setError(err));
|
||||
|
||||
async.whilst(
|
||||
function() {
|
||||
return self.status === 'syncing';
|
||||
},
|
||||
function (w_cb) {
|
||||
self.getFn(function(err,blockInfo) {
|
||||
if (err) return w_cb(self.setError(err));
|
||||
|
||||
self.showProgress();
|
||||
if (blockInfo && blockInfo.hash) {
|
||||
self.syncedBlocks++;
|
||||
self.sync.storeTipBlock(blockInfo, self.allowReorgs, function(err) {
|
||||
if (err) return w_cb(self.setError(err));
|
||||
|
||||
self.sync.bDb.setTip(blockInfo.hash, function(err) {
|
||||
if (err) return w_cb(self.setError(err));
|
||||
|
||||
setImmediate(function(){
|
||||
return w_cb(err);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
else {
|
||||
self.endTs = Date.now();
|
||||
self.status = 'finished';
|
||||
return w_cb(err);
|
||||
}
|
||||
});
|
||||
}, next);
|
||||
});
|
||||
};
|
||||
return HistoricSync;
|
||||
}
|
||||
module.defineClass(spec);
|
||||
|
|
|
@ -3,8 +3,7 @@ require('classtool');
|
|||
|
||||
function spec() {
|
||||
var fs = require('fs');
|
||||
var CoinConst = require('bitcore/const');
|
||||
var coinUtil = require('bitcore/util/util');
|
||||
var bitcoreUtil = require('bitcore/util/util');
|
||||
var Sync = require('./Sync').class();
|
||||
var Peer = require('bitcore/Peer').class();
|
||||
var config = require('../config/config');
|
||||
|
@ -12,26 +11,17 @@ function spec() {
|
|||
|
||||
var peerdb_fn = 'peerdb.json';
|
||||
|
||||
function PeerSync() {}
|
||||
|
||||
|
||||
PeerSync.prototype.init = function(opts, cb) {
|
||||
if (!opts) opts = {};
|
||||
function PeerSync(opts) {
|
||||
this.connected = false;
|
||||
this.peerdb = undefined;
|
||||
this.allowReorgs = false;
|
||||
|
||||
this.sync = new Sync();
|
||||
this.PeerManager = require('bitcore/PeerManager').createClass({
|
||||
network: (config.network === 'testnet' ? networks.testnet : networks.livenet)
|
||||
});
|
||||
this.peerman = new this.PeerManager();
|
||||
this.load_peers();
|
||||
this.sync.init(opts, function() {
|
||||
if (!cb) return;
|
||||
return cb();
|
||||
});
|
||||
};
|
||||
this.sync = new Sync(opts);
|
||||
}
|
||||
|
||||
PeerSync.prototype.load_peers = function() {
|
||||
this.peerdb = [{
|
||||
|
@ -72,20 +62,26 @@ function spec() {
|
|||
PeerSync.prototype.handleBlock = function(info) {
|
||||
var self = this;
|
||||
var block = info.message.block;
|
||||
var blockHash = coinUtil.formatHashFull(block.calcHash());
|
||||
var blockHash = bitcoreUtil.formatHashFull(block.calcHash());
|
||||
|
||||
console.log('[p2p_sync] Handle block: %s (allowReorgs: %s)', blockHash, self.allowReorgs);
|
||||
|
||||
var tx_hashes = block.txs.map(function(tx) {
|
||||
return coinUtil.formatHashFull(tx.hash);
|
||||
return bitcoreUtil.formatHashFull(tx.hash);
|
||||
});
|
||||
|
||||
this.sync.storeTipBlock({
|
||||
'hash': blockHash,
|
||||
'tx': tx_hashes,
|
||||
'previousblockhash': coinUtil.formatHashFull(block.prev_hash),
|
||||
'previousblockhash': bitcoreUtil.formatHashFull(block.prev_hash),
|
||||
}, self.allowReorgs, function(err) {
|
||||
if (err) {
|
||||
if (err && err.message.match(/NEED_SYNC/) && self.historicSync) {
|
||||
console.log('[p2p_sync] Orphan block received. Triggering sync');
|
||||
self.historicSync.start({}, function(){
|
||||
console.log('[p2p_sync] Done resync.');
|
||||
});
|
||||
}
|
||||
else if (err) {
|
||||
console.log('[p2p_sync] Error in handle Block: ' + err);
|
||||
}
|
||||
});
|
||||
|
|
|
@ -25,6 +25,7 @@ function spec(b) {
|
|||
self.stringsKl = self.stringsK.length;
|
||||
};
|
||||
|
||||
|
||||
PoolMatch.prototype.match = function(buffer) {
|
||||
var self = this;
|
||||
|
||||
|
|
35
lib/Sync.js
35
lib/Sync.js
|
@ -13,20 +13,15 @@ function spec() {
|
|||
var async = require('async');
|
||||
|
||||
|
||||
function Sync() {}
|
||||
|
||||
Sync.prototype.init = function(opts, cb) {
|
||||
var self = this;
|
||||
self.opts = opts;
|
||||
function Sync(opts) {
|
||||
this.opts = opts || {};
|
||||
this.bDb = new BlockDb(opts);
|
||||
this.txDb = new TransactionDb(opts);
|
||||
this.txDb.on('tx_for_address', this.handleTxForAddress.bind(this));
|
||||
this.txDb.on('new_tx', this.handleNewTx.bind(this));
|
||||
this.bDb.on('new_block', this.handleNewBlock.bind(this));
|
||||
|
||||
this.network = config.network === 'testnet' ? networks.testnet : networks.livenet;
|
||||
return cb();
|
||||
};
|
||||
}
|
||||
|
||||
Sync.prototype.close = function(cb) {
|
||||
var self = this;
|
||||
|
@ -74,7 +69,8 @@ function spec() {
|
|||
*
|
||||
* A-B-C-D-E(TIP) ... NEW
|
||||
*
|
||||
* NEW is ignored
|
||||
* NEW is ignored (if allowReorgs is false)
|
||||
*
|
||||
*
|
||||
*/
|
||||
|
||||
|
@ -100,15 +96,11 @@ function spec() {
|
|||
},
|
||||
function(c) {
|
||||
if (!allowReorgs) return c();
|
||||
|
||||
self.bDb.has(newPrev, function(err, val) {
|
||||
if (!val && newPrev.match(/^0+$/)) return c();
|
||||
/// AQUI! -> return is reor...
|
||||
//
|
||||
// needSync
|
||||
// =>
|
||||
|
||||
return c(err ||
|
||||
(!val ? new Error('WARN: Ignoring block with non existing prev:' + b.hash) : null));
|
||||
(!val ? new Error('NEED_SYNC Ignoring block with non existing prev:' + b.hash) : null));
|
||||
});
|
||||
},
|
||||
function(c) {
|
||||
|
@ -126,7 +118,6 @@ function spec() {
|
|||
},
|
||||
function(c) {
|
||||
if (!needReorg) return c();
|
||||
|
||||
self.bDb.getNext(newPrev, function(err, val) {
|
||||
if (err) return c(err);
|
||||
oldNext = val;
|
||||
|
@ -142,13 +133,17 @@ function spec() {
|
|||
self.processReorg(oldTip, oldNext, newPrev, c);
|
||||
},
|
||||
function(c) {
|
||||
if (!allowReorgs) return c();
|
||||
self.bDb.setTip(b.hash, function(err) {
|
||||
if (err) return c(err);
|
||||
self.bDb.setNext(newPrev, b.hash, function(err) {
|
||||
return c(err);
|
||||
});
|
||||
return c(err);
|
||||
});
|
||||
},
|
||||
function(c) {
|
||||
self.bDb.setNext(newPrev, b.hash, function(err) {
|
||||
return c(err);
|
||||
});
|
||||
}
|
||||
|
||||
],
|
||||
function(err) {
|
||||
if (err && err.toString().match(/WARN/)) {
|
||||
|
|
|
@ -9,22 +9,12 @@ describe('PeerSync', function() {
|
|||
|
||||
beforeEach(function(done) {
|
||||
ps = new PeerSync();
|
||||
ps.init({
|
||||
verbose: false
|
||||
}, done);
|
||||
done();
|
||||
});
|
||||
afterEach(function() {
|
||||
ps.close();
|
||||
});
|
||||
|
||||
describe('#init()', function() {
|
||||
it('should return with no errors', function() {
|
||||
var other_ps = new PeerSync();
|
||||
expect(other_ps.init.bind(other_ps)).not.to.
|
||||
throw (Error);
|
||||
other_ps.close();
|
||||
});
|
||||
});
|
||||
|
||||
describe('#handleInv()', function() {
|
||||
var inv_info = {
|
||||
|
|
10
util/p2p.js
10
util/p2p.js
|
@ -10,17 +10,9 @@ var program = require('commander');
|
|||
|
||||
program
|
||||
.version(PROGRAM_VERSION)
|
||||
.option('-N --network [testnet]', 'Set bitcoin network [testnet]', 'testnet')
|
||||
.option('-V --verbose', 'Verbose', 1)
|
||||
.parse(process.argv);
|
||||
|
||||
var ps = new PeerSync();
|
||||
ps.init(program, function(err){
|
||||
if (err) {
|
||||
console.log(err);
|
||||
process.exit(1);
|
||||
}
|
||||
ps.run();
|
||||
});
|
||||
ps.run();
|
||||
|
||||
|
||||
|
|
22
util/sync.js
22
util/sync.js
|
@ -14,34 +14,28 @@ program
|
|||
.version(SYNC_VERSION)
|
||||
.option('-D --destroy', 'Remove current DB (and start from there)', 0)
|
||||
.option('-S --startfile', 'Number of file from bitcoind to start(default=0)')
|
||||
.option('-R --rpc', 'Force sync with RPC')
|
||||
.option('-v --verbose', 'Verbose 0/1', 0)
|
||||
.parse(process.argv);
|
||||
|
||||
var historicSync = new HistoricSync();
|
||||
|
||||
/* TODO: Sure?
|
||||
if (program.remove) {
|
||||
|
||||
}
|
||||
*/
|
||||
async.series([
|
||||
function(cb) {
|
||||
historicSync.init(program, cb);
|
||||
if (!program.destroy) return cb();
|
||||
console.log('Deleting Sync DB...');
|
||||
historicSync.sync.destroy(cb);
|
||||
},
|
||||
function(cb) {
|
||||
historicSync.smartImport({
|
||||
destroy: program.destroy,
|
||||
startFile: program.startfile,
|
||||
historicSync.start({
|
||||
forceStartFile: program.startfile,
|
||||
forceRPC: program.rpc,
|
||||
},cb);
|
||||
},
|
||||
],
|
||||
function(err) {
|
||||
historicSync.close();
|
||||
if (err) {
|
||||
console.log('CRITICAL ERROR: ', historicSync.info());
|
||||
}
|
||||
else {
|
||||
console.log('Finished.\n Status:\n', historicSync.info());
|
||||
}
|
||||
if (err) console.log('CRITICAL ERROR: ', historicSync.info());
|
||||
});
|
||||
|
||||
|
|
Loading…
Reference in New Issue