Merge pull request #178 from matiu/feature/file-syncing2
Feature/file syncing2
This commit is contained in:
commit
6c7ce4b3fa
|
@ -7,6 +7,7 @@ var mongoose = require('mongoose'),
|
|||
Schema = mongoose.Schema,
|
||||
RpcClient = require('bitcore/RpcClient').class(),
|
||||
util = require('bitcore/util/util'),
|
||||
async = require('async'),
|
||||
BitcoreBlock= require('bitcore/Block').class(),
|
||||
TransactionOut = require('./TransactionOut'),
|
||||
config = require('../../config/config')
|
||||
|
@ -85,14 +86,26 @@ BlockSchema.statics.customCreate = function(block, cb) {
|
|||
|
||||
newBlock.time = block.time ? block.time : Math.round(new Date().getTime() / 1000);
|
||||
newBlock.hashStr = block.hash;
|
||||
newBlock.isOrphan = block.isOrphan;
|
||||
newBlock.nextBlockHashStr = block.nextBlockHash;
|
||||
|
||||
TransactionOut.createFromArray(block.tx, function(err, inserted_txs, update_addrs) {
|
||||
if (err) return cb(err);
|
||||
var insertedTxs, updateAddrs;
|
||||
|
||||
newBlock.save(function(err) {
|
||||
return cb(err, newBlock, inserted_txs, update_addrs);
|
||||
async.series([
|
||||
function(a_cb) {
|
||||
TransactionOut.createFromTxs(block.tx, block.isOrphan,
|
||||
function(err, inInsertedTxs, inUpdateAddrs) {
|
||||
insertedTxs = inInsertedTxs;
|
||||
updateAddrs = inUpdateAddrs;
|
||||
return a_cb(err);
|
||||
});
|
||||
}, function(a_cb) {
|
||||
newBlock.save(function(err) {
|
||||
return a_cb(err);
|
||||
});
|
||||
}],
|
||||
function (err) {
|
||||
return cb(err, newBlock, insertedTxs, updateAddrs);
|
||||
});
|
||||
};
|
||||
|
||||
|
|
|
@ -24,13 +24,16 @@ var TransactionOutSchema = new Schema({
|
|||
index: true,
|
||||
},
|
||||
value_sat: Number,
|
||||
fromOrphan: Boolean,
|
||||
|
||||
spendTxIdBuf: Buffer,
|
||||
spendIndex: Number,
|
||||
spendFromOrphan: Boolean,
|
||||
});
|
||||
|
||||
|
||||
// Compound index
|
||||
|
||||
TransactionOutSchema.index({txidBuf: 1, index: 1}, {unique: true, sparse: true});
|
||||
TransactionOutSchema.index({spendTxIdBuf: 1, spendIndex: 1}, {unique: true, sparse: true});
|
||||
|
||||
|
@ -91,27 +94,57 @@ TransactionOutSchema.statics.removeFromTxId = function(txid, cb) {
|
|||
|
||||
|
||||
|
||||
TransactionOutSchema.statics._explodeTransactionOuts = function(txid, cb) {
|
||||
TransactionOutSchema.statics.storeTransactionOuts = function(txInfo, fromOrphan, cb) {
|
||||
|
||||
var Self = this;
|
||||
var addrs = [];
|
||||
var is_new = true;
|
||||
|
||||
// Is it from genesis block? (testnet==livenet)
|
||||
// TODO: parse it from networks.genesisTX
|
||||
if (txid === genesisTXID) return cb();
|
||||
if (txInfo.hash) {
|
||||
|
||||
TransactionRpc.getRpcInfo(txid, function(err, info) {
|
||||
// adapt bitcore TX object to bitcoind JSON response
|
||||
txInfo.txid = txInfo.hash;
|
||||
|
||||
if (err || !info) return cb(err);
|
||||
var count = 0;
|
||||
txInfo.vin = txInfo.in.map(function (txin) {
|
||||
var i = {};
|
||||
|
||||
if (txin.coinbase) {
|
||||
txInfo.isCoinBase = true;
|
||||
}
|
||||
else {
|
||||
i.txid= txin.prev_out.hash;
|
||||
i.vout= txin.prev_out.n;
|
||||
};
|
||||
i.n = count++;
|
||||
return i;
|
||||
});
|
||||
|
||||
|
||||
count = 0;
|
||||
txInfo.vout = txInfo.out.map(function (txout) {
|
||||
var o = {};
|
||||
|
||||
o.value = txout.value;
|
||||
o.n = count++;
|
||||
|
||||
if (txout.addrStr){
|
||||
o.scriptPubKey = {};
|
||||
o.scriptPubKey.addresses = [txout.addrStr];
|
||||
}
|
||||
return o;
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
var bTxId = new Buffer(txInfo.txid,'hex');
|
||||
|
||||
var bTxId = new Buffer(txid,'hex');
|
||||
|
||||
async.series([
|
||||
// Input Outputs (mark them as spended)
|
||||
// Input Outpoints (mark them as spended)
|
||||
function(p_c) {
|
||||
if (info.isCoinBase) return p_c();
|
||||
async.forEachLimit(info.vin, CONCURRENCY,
|
||||
if (txInfo.isCoinBase) return p_c();
|
||||
async.forEachLimit(txInfo.vin, CONCURRENCY,
|
||||
function(i, next_out) {
|
||||
var b = new Buffer(i.txid,'hex');
|
||||
var data = {
|
||||
|
@ -121,12 +154,13 @@ TransactionOutSchema.statics._explodeTransactionOuts = function(txid, cb) {
|
|||
spendTxIdBuf: bTxId,
|
||||
spendIndex: i.n,
|
||||
};
|
||||
if (fromOrphan) data.spendFromOrphan = true;
|
||||
Self.update({txidBuf: b, index: i.vout}, data, {upsert: true}, next_out);
|
||||
},
|
||||
function (err) {
|
||||
if (err) {
|
||||
if (!err.message.match(/E11000/)) {
|
||||
console.log('ERR at TX %s: %s', txid, err);
|
||||
console.log('ERR at TX %s: %s', txInfo.txid, err);
|
||||
return cb(err);
|
||||
}
|
||||
}
|
||||
|
@ -135,7 +169,7 @@ TransactionOutSchema.statics._explodeTransactionOuts = function(txid, cb) {
|
|||
},
|
||||
// Parse Outputs
|
||||
function(p_c) {
|
||||
async.forEachLimit(info.vout, CONCURRENCY,
|
||||
async.forEachLimit(txInfo.vout, CONCURRENCY,
|
||||
function(o, next_out) {
|
||||
if (o.value && o.scriptPubKey &&
|
||||
o.scriptPubKey.addresses &&
|
||||
|
@ -143,10 +177,10 @@ TransactionOutSchema.statics._explodeTransactionOuts = function(txid, cb) {
|
|||
! o.scriptPubKey.addresses[1] // TODO : not supported
|
||||
){
|
||||
|
||||
// This is only to broadcast
|
||||
if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) {
|
||||
addrs.push(o.scriptPubKey.addresses[0]);
|
||||
}
|
||||
// This is only to broadcast (WIP)
|
||||
// if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) {
|
||||
// addrs.push(o.scriptPubKey.addresses[0]);
|
||||
// }
|
||||
|
||||
var data = {
|
||||
txidBuf: bTxId,
|
||||
|
@ -155,10 +189,11 @@ TransactionOutSchema.statics._explodeTransactionOuts = function(txid, cb) {
|
|||
value_sat : o.value * util.COIN,
|
||||
addr : o.scriptPubKey.addresses[0],
|
||||
};
|
||||
if (fromOrphan) data.fromOrphan = true;
|
||||
Self.update({txidBuf: bTxId, index: o.n}, data, {upsert: true}, next_out);
|
||||
}
|
||||
else {
|
||||
console.log ('WARN in TX: %s could not parse OUTPUT %d', txid, o.n);
|
||||
console.log ('WARN in TX: %s could not parse OUTPUT %d', txInfo.txid, o.n);
|
||||
return next_out();
|
||||
}
|
||||
},
|
||||
|
@ -168,42 +203,62 @@ TransactionOutSchema.statics._explodeTransactionOuts = function(txid, cb) {
|
|||
is_new = false;
|
||||
}
|
||||
else {
|
||||
console.log('ERR at TX %s: %s', txid, err);
|
||||
console.log('ERR at TX %s: %s', txInfo.txid, err);
|
||||
return cb(err);
|
||||
}
|
||||
}
|
||||
return p_c();
|
||||
});
|
||||
}], function() {
|
||||
return cb(null, addrs, is_new);
|
||||
});
|
||||
}], function(err) {
|
||||
return cb(err, addrs, is_new);
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
TransactionOutSchema.statics.createFromArray = function(txs, next) {
|
||||
|
||||
// txs can be a [hashes] or [txObjects]
|
||||
TransactionOutSchema.statics.createFromTxs = function(txs, fromOrphan, next) {
|
||||
var Self = this;
|
||||
|
||||
if (typeof fromOrphan === 'function') {
|
||||
next = fromOrphan;
|
||||
fromOrphan = false;
|
||||
}
|
||||
|
||||
if (!txs) return next();
|
||||
|
||||
var inserted_txs = [];
|
||||
var updated_addrs = {};
|
||||
|
||||
async.forEachLimit(txs, CONCURRENCY, function(txid, cb, was_new) {
|
||||
async.forEachLimit(txs, CONCURRENCY, function(t, each_cb) {
|
||||
|
||||
Self._explodeTransactionOuts( txid, function(err, addrs) {
|
||||
var txInfo;
|
||||
|
||||
if (err) return next(err);
|
||||
|
||||
if (was_new) {
|
||||
inserted_txs.push(txid);
|
||||
addrs.each(function(a) {
|
||||
if ( !updated_addrs[a]) updated_addrs[a] = [];
|
||||
updated_addrs[a].push(txid);
|
||||
});
|
||||
async.series([
|
||||
function(a_cb) {
|
||||
if (typeof t !== 'string') {
|
||||
txInfo = t;
|
||||
return a_cb();
|
||||
}
|
||||
|
||||
return cb();
|
||||
// Is it from genesis block? (testnet==livenet)
|
||||
// TODO: parse it from networks.genesisTX?
|
||||
if (t === genesisTXID) return a_cb();
|
||||
|
||||
TransactionRpc.getRpcInfo(t, function(err, inInfo) {
|
||||
txInfo =inInfo;
|
||||
return a_cb(err);
|
||||
});
|
||||
},
|
||||
function(a_cb) {
|
||||
if (!txInfo) return a_cb();
|
||||
|
||||
Self.storeTransactionOuts(txInfo, fromOrphan, function(err, addrs) {
|
||||
if (err) return a_cb(err);
|
||||
return a_cb();
|
||||
});
|
||||
}],
|
||||
function(err) {
|
||||
return each_cb(err);
|
||||
});
|
||||
},
|
||||
function(err) {
|
||||
|
|
|
@ -31,6 +31,7 @@ module.exports = {
|
|||
host: process.env.BITCOIND_HOST || '127.0.0.1',
|
||||
port: process.env.BITCOIND_PORT || '18332',
|
||||
p2pPort: process.env.BITCOIND_P2P_PORT || '18333',
|
||||
dataDir: process.env.BITCOIND_DATADIR || './testnet3',
|
||||
|
||||
// DO NOT CHANGE THIS!
|
||||
disableAgent: true
|
||||
|
|
|
@ -25,7 +25,7 @@ mongoose.connection.on('open', function() {
|
|||
|
||||
var b = new Buffer(hash,'hex');
|
||||
|
||||
T.createFromArray([hash], function(err, ret) {
|
||||
T.createFromTxs([hash], function(err, ret) {
|
||||
|
||||
console.log('Err:');
|
||||
console.log(err);
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
#!/usr/bin/env node
|
||||
'use strict';
|
||||
|
||||
process.env.NODE_ENV = process.env.NODE_ENV || 'development';
|
||||
|
||||
var assert = require('assert'),
|
||||
config = require('../config/config'),
|
||||
BlockExtractor = require('../lib/BlockExtractor').class(),
|
||||
networks = require('bitcore/networks'),
|
||||
util = require('bitcore/util/util');
|
||||
|
||||
var be = new BlockExtractor(config.bitcoind.dataDir, config.network);
|
||||
var network = config.network === 'testnet' ? networks.testnet: networks.livenet;
|
||||
// console.log('[read_block.js.13]', be.nextFile() );
|
||||
|
||||
var c=0;
|
||||
while (c++ < 100) {
|
||||
be.getNextBlock(function(err, b) {
|
||||
console.log('[read_block.js.14]',err, c, b?util.formatHashAlt(b.hash):''); //TODO
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,156 @@
|
|||
'use strict';
|
||||
|
||||
require('classtool');
|
||||
|
||||
function spec() {
|
||||
var Block = require('bitcore/block').class(),
|
||||
networks = require('bitcore/networks'),
|
||||
Parser = require('bitcore/util/BinaryParser').class(),
|
||||
fs = require('fs'),
|
||||
Buffer = require('buffer').Buffer,
|
||||
glob = require('glob'),
|
||||
async = require('async');
|
||||
|
||||
function BlockExtractor(dataDir, network) {
|
||||
|
||||
var self = this;
|
||||
var path = dataDir + '/blocks/blk*.dat';
|
||||
|
||||
self.dataDir = dataDir;
|
||||
self.files = glob.sync(path);
|
||||
self.nfiles = self.files.length;
|
||||
|
||||
if (self.nfiles === 0)
|
||||
throw new Error('Could not find block files at: ' + path);
|
||||
|
||||
self.currentFileIndex = 0;
|
||||
self.isCurrentRead = false;
|
||||
self.currentBuffer = null;
|
||||
self.currentParser = null;
|
||||
self.network = network === 'testnet' ? networks.testnet: networks.livenet;
|
||||
self.magic = self.network.magic.toString('hex');
|
||||
}
|
||||
|
||||
BlockExtractor.prototype.currentFile = function() {
|
||||
var self = this;
|
||||
|
||||
return self.files[self.currentFileIndex];
|
||||
};
|
||||
|
||||
|
||||
BlockExtractor.prototype.nextFile = function() {
|
||||
var self = this;
|
||||
|
||||
if (self.currentFileIndex < 0) return false;
|
||||
|
||||
var ret = true;
|
||||
|
||||
self.isCurrentRead = false;
|
||||
self.currentBuffer = null;
|
||||
self.currentParser = null;
|
||||
|
||||
if (self.currentFileIndex < self.nfiles - 1) {
|
||||
self.currentFileIndex++;
|
||||
}
|
||||
else {
|
||||
self.currentFileIndex=-1;
|
||||
ret = false;
|
||||
}
|
||||
return ret;
|
||||
};
|
||||
|
||||
BlockExtractor.prototype.readCurrentFileSync = function() {
|
||||
var self = this;
|
||||
|
||||
if (self.currentFileIndex < 0 || self.isCurrentRead) return;
|
||||
|
||||
|
||||
self.isCurrentRead = true;
|
||||
|
||||
var fname = self.currentFile();
|
||||
if (!fname) return;
|
||||
|
||||
|
||||
var stats = fs.statSync(fname);
|
||||
|
||||
var size = stats.size;
|
||||
|
||||
console.log('Reading Blockfile %s [%d MB]',
|
||||
fname, parseInt(size/1024/1024));
|
||||
|
||||
var fd = fs.openSync(fname, 'r');
|
||||
|
||||
var buffer = new Buffer(size);
|
||||
|
||||
fs.readSync(fd, buffer, 0, size, 0);
|
||||
|
||||
self.currentBuffer = buffer;
|
||||
self.currentParser = new Parser(buffer);
|
||||
};
|
||||
|
||||
|
||||
|
||||
BlockExtractor.prototype.getNextBlock = function(cb) {
|
||||
var self = this;
|
||||
|
||||
var b;
|
||||
var magic;
|
||||
async.series([
|
||||
function (a_cb) {
|
||||
|
||||
async.whilst(
|
||||
function() {
|
||||
return (!magic);
|
||||
},
|
||||
function(w_cb) {
|
||||
|
||||
self.readCurrentFileSync();
|
||||
|
||||
if (self.currentFileIndex < 0) return cb();
|
||||
|
||||
|
||||
magic = self.currentParser ? self.currentParser.buffer(4).toString('hex')
|
||||
: null ;
|
||||
|
||||
if (!self.currentParser || self.currentParser.eof()) {
|
||||
magic = null;
|
||||
if (self.nextFile()) {
|
||||
console.log('Moving forward to file:' + self.currentFile() );
|
||||
return w_cb();
|
||||
}
|
||||
else {
|
||||
console.log('Finished all files');
|
||||
return cb();
|
||||
}
|
||||
}
|
||||
else {
|
||||
return w_cb();
|
||||
}
|
||||
}, a_cb);
|
||||
},
|
||||
function (a_cb) {
|
||||
if (magic !== self.magic) {
|
||||
var e = new Error('CRITICAL ERROR: Magic number mismatch: ' +
|
||||
magic + '!=' + self.magic);
|
||||
return a_cb(e);
|
||||
}
|
||||
|
||||
// spacer?
|
||||
self.currentParser.word32le();
|
||||
return a_cb();
|
||||
},
|
||||
function (a_cb) {
|
||||
b = new Block();
|
||||
b.parse(self.currentParser);
|
||||
b.getHash();
|
||||
return a_cb();
|
||||
},
|
||||
], function(err) {
|
||||
return cb(err,b);
|
||||
});
|
||||
};
|
||||
|
||||
return BlockExtractor;
|
||||
}
|
||||
module.defineClass(spec);
|
||||
|
|
@ -7,12 +7,16 @@ require('classtool');
|
|||
function spec() {
|
||||
var util = require('util');
|
||||
var RpcClient = require('bitcore/RpcClient').class();
|
||||
var bitutil = require('bitcore/util/util');
|
||||
var Address = require('bitcore/Address').class();
|
||||
var Script = require('bitcore/Script').class();
|
||||
var networks = require('bitcore/networks');
|
||||
var async = require('async');
|
||||
var config = require('../config/config');
|
||||
var Block = require('../app/models/Block');
|
||||
var Sync = require('./Sync').class();
|
||||
var sockets = require('../app/controllers/socket.js');
|
||||
var BlockExtractor = require('./BlockExtractor.js').class();
|
||||
|
||||
|
||||
var BAD_GEN_ERROR = 'Bad genesis block. Network mismatch between Insight and bitcoind? Insight is configured for:';
|
||||
|
@ -95,7 +99,7 @@ function spec() {
|
|||
};
|
||||
};
|
||||
|
||||
HistoricSync.prototype.showProgress = function() {
|
||||
HistoricSync.prototype.showProgress = function(height) {
|
||||
var self = this;
|
||||
|
||||
if (self.error) {
|
||||
|
@ -105,7 +109,7 @@ function spec() {
|
|||
self.syncPercentage = parseFloat(100 * (self.syncedBlocks + self.skippedBlocks) / self.blockChainHeight).toFixed(3);
|
||||
if (self.syncPercentage > 100) self.syncPercentage = 100;
|
||||
|
||||
p(util.format('status: [%d%%] skipped: %d', self.syncPercentage, self.skippedBlocks));
|
||||
p(util.format('status: [%d%%] skipped: %d ', self.syncPercentage, self.skippedBlocks, height));
|
||||
}
|
||||
if (self.opts.shouldBroadcast) {
|
||||
sockets.broadcastSyncInfo(self.info());
|
||||
|
@ -120,7 +124,6 @@ function spec() {
|
|||
|
||||
var existed = false;
|
||||
var blockInfo;
|
||||
var blockObj;
|
||||
|
||||
async.series([
|
||||
// Already got it?
|
||||
|
@ -132,7 +135,6 @@ function spec() {
|
|||
}
|
||||
if (block) {
|
||||
existed = true;
|
||||
blockObj = block;
|
||||
}
|
||||
return c();
|
||||
});
|
||||
|
@ -145,22 +147,20 @@ function spec() {
|
|||
|
||||
return c();
|
||||
},
|
||||
//get Info from RPC
|
||||
function(c) {
|
||||
|
||||
// TODO: if we store prev/next, no need to go to RPC
|
||||
// if (blockObj && blockObj.nextBlockHash) return c();
|
||||
function(c) {
|
||||
self.rpc.getBlock(blockHash, function(err, ret) {
|
||||
if (err) return c(err);
|
||||
|
||||
blockInfo = ret;
|
||||
blockInfo = ret ? ret.result : null;
|
||||
return c();
|
||||
});
|
||||
},
|
||||
//store it
|
||||
function(c) {
|
||||
if (existed) return c();
|
||||
self.sync.storeBlock(blockInfo.result, function(err) {
|
||||
|
||||
self.sync.storeBlock(blockInfo, function(err) {
|
||||
|
||||
existed = err && err.toString().match(/E11000/);
|
||||
|
||||
|
@ -185,6 +185,7 @@ function spec() {
|
|||
self.status = 'aborted';
|
||||
self.showProgress();
|
||||
p(self.err);
|
||||
return cb(err);
|
||||
}
|
||||
else {
|
||||
self.err = null;
|
||||
|
@ -200,7 +201,7 @@ function spec() {
|
|||
}
|
||||
|
||||
// Continue
|
||||
if (blockInfo && blockInfo.result) {
|
||||
if (blockInfo) {
|
||||
|
||||
if (existed)
|
||||
self.skippedBlocks++;
|
||||
|
@ -208,16 +209,167 @@ function spec() {
|
|||
self.syncedBlocks++;
|
||||
|
||||
// recursion
|
||||
if (scanOpts.prev && blockInfo.result.previousblockhash)
|
||||
return self.getPrevNextBlock(blockInfo.result.previousblockhash, blockEnd, scanOpts, cb);
|
||||
if (scanOpts.prev && blockInfo.previousblockhash)
|
||||
return self.getPrevNextBlock(blockInfo.previousblockhash, blockEnd, scanOpts, cb);
|
||||
|
||||
if (scanOpts.next && blockInfo.result.nextblockhash)
|
||||
return self.getPrevNextBlock(blockInfo.result.nextblockhash, blockEnd, scanOpts, cb);
|
||||
if (scanOpts.next && blockInfo.nextblockhash)
|
||||
return self.getPrevNextBlock(blockInfo.nextblockhash, blockEnd, scanOpts, cb);
|
||||
}
|
||||
return cb(err);
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
// TODO. replace with
|
||||
// Script.prototype.getAddrStrs if that one get merged in bitcore
|
||||
HistoricSync.prototype.getAddrStr = function(s) {
|
||||
var self = this;
|
||||
|
||||
var addrStrs = [];
|
||||
var type = s.classify();
|
||||
var addr;
|
||||
|
||||
switch(type) {
|
||||
case Script.TX_PUBKEY:
|
||||
var chunk = s.captureOne();
|
||||
addr = new Address(self.network.addressPubkey, bitutil.sha256ripe160(chunk));
|
||||
addrStrs = [ addr.toString() ];
|
||||
break;
|
||||
case Script.TX_PUBKEYHASH:
|
||||
addr = new Address(self.network.addressPubkey, s.captureOne());
|
||||
addrStrs = [ addr.toString() ];
|
||||
break;
|
||||
case Script.TX_SCRIPTHASH:
|
||||
addr = new Address(self.network.addressScript, s.captureOne());
|
||||
addrStrs = [ addr.toString() ];
|
||||
break;
|
||||
case Script.TX_MULTISIG:
|
||||
var addrs = [];
|
||||
var chunks = s.capture();
|
||||
|
||||
chunks.forEach(function(chunk) {
|
||||
var a = new Address(self.network.addressPubkey, bitutil.sha256ripe160(chunk));
|
||||
addrStrs.push(a.toString());
|
||||
});
|
||||
break;
|
||||
case Script.TX_UNKNOWN:
|
||||
break;
|
||||
}
|
||||
|
||||
return addrStrs;
|
||||
};
|
||||
|
||||
|
||||
HistoricSync.prototype.getBlockFromFile = function(height, scanOpts, cb) {
|
||||
var self = this;
|
||||
|
||||
var nextHash;
|
||||
var blockInfo;
|
||||
var isMainChain;
|
||||
var existed;
|
||||
|
||||
async.series([
|
||||
// Is it in mainchain?
|
||||
function(c) {
|
||||
self.rpc.getBlockHash(height, function(err, res) {
|
||||
if (err) return cb(err);
|
||||
|
||||
nextHash = res.result;
|
||||
return c();
|
||||
});
|
||||
},
|
||||
//show some (inacurate) status
|
||||
function(c) {
|
||||
if ( ( self.syncedBlocks + self.skippedBlocks) % self.step === 1) {
|
||||
self.showProgress(height);
|
||||
}
|
||||
|
||||
return c();
|
||||
},
|
||||
//get Info
|
||||
function(c) {
|
||||
self.blockExtractor.getNextBlock(function(err, b) {
|
||||
if (err || ! b) return c(err);
|
||||
|
||||
blockInfo = b.getStandardizedObject(b.txs, self.network);
|
||||
|
||||
var ti=0;
|
||||
// Get TX Address
|
||||
b.txs.forEach(function(t) {
|
||||
var objTx = blockInfo.tx[ti++];
|
||||
var to=0;
|
||||
t.outs.forEach( function(o) {
|
||||
|
||||
|
||||
var s = new Script(o.s);
|
||||
var addrs = self.getAddrStr(s);
|
||||
|
||||
// support only p2pubkey p2pubkeyhash and p2sh
|
||||
if (addrs.length === 1) {
|
||||
objTx.out[to].addrStr = addrs[0];
|
||||
}
|
||||
to++;
|
||||
});
|
||||
});
|
||||
|
||||
return c();
|
||||
});
|
||||
},
|
||||
//store it
|
||||
function(c) {
|
||||
|
||||
isMainChain = blockInfo.hash === nextHash;
|
||||
|
||||
blockInfo.isOrphan = !isMainChain;
|
||||
|
||||
/*
|
||||
* In file sync, orphan blocks are just ignored.
|
||||
* This is to simplify our schema and the
|
||||
* sync process
|
||||
*/
|
||||
if (blockInfo.isOrphan) return c();
|
||||
|
||||
self.sync.storeBlock(blockInfo, function(err) {
|
||||
existed = err && err.toString().match(/E11000/);
|
||||
|
||||
if (err && ! existed) return c(err);
|
||||
return c();
|
||||
});
|
||||
},
|
||||
], function(err) {
|
||||
|
||||
if (err) {
|
||||
self.err = util.format('ERROR: @%s: %s [count: syncedBlocks: %d]', blockInfo.hash, err, self.syncedBlocks);
|
||||
self.status = 'aborted';
|
||||
self.showProgress();
|
||||
p(err);
|
||||
return cb(err);
|
||||
}
|
||||
else {
|
||||
|
||||
// Continue
|
||||
if (blockInfo) {
|
||||
|
||||
// mainchain
|
||||
if (isMainChain) height++;
|
||||
|
||||
self.syncedBlocks++;
|
||||
self.err = null;
|
||||
self.status = 'syncing';
|
||||
|
||||
return self.getBlockFromFile(height, scanOpts, cb);
|
||||
}
|
||||
else {
|
||||
self.err = null;
|
||||
self.status = 'finished';
|
||||
}
|
||||
}
|
||||
return cb(err);
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
|
||||
HistoricSync.prototype.importHistory = function(scanOpts, next) {
|
||||
var self = this;
|
||||
|
||||
|
@ -285,6 +437,12 @@ function spec() {
|
|||
p(' to : ', end);
|
||||
p(' scanOpts: ', JSON.stringify(scanOpts));
|
||||
|
||||
if (scanOpts.fromFiles) {
|
||||
self.getBlockFromFile(0, scanOpts, function(err) {
|
||||
return next(err);
|
||||
});
|
||||
}
|
||||
else {
|
||||
self.getPrevNextBlock(start, end, scanOpts, function(err) {
|
||||
if (err && err.message.match(/ECONNREFUSED/)) {
|
||||
setTimeout(function() {
|
||||
|
@ -296,6 +454,7 @@ function spec() {
|
|||
else return next(err);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (!self.step) {
|
||||
|
@ -321,25 +480,31 @@ function spec() {
|
|||
};
|
||||
|
||||
// upto if we have genesis block?
|
||||
HistoricSync.prototype.smartImport = function(next) {
|
||||
HistoricSync.prototype.smartImport = function(scanOpts, next) {
|
||||
var self = this;
|
||||
|
||||
Block.fromHash(self.genesis, function(err, b) {
|
||||
|
||||
if (err) return next(err);
|
||||
|
||||
if (!b) {
|
||||
|
||||
if (!b || scanOpts.destroy) {
|
||||
p('Could not find Genesis block. Running FULL SYNC');
|
||||
if (config.bitcoind.dataDir) {
|
||||
p('bitcoind dataDir configured...importing blocks from .dat files');
|
||||
scanOpts.fromFiles = true;
|
||||
self.blockExtractor = new BlockExtractor(config.bitcoind.dataDir, config.network);
|
||||
}
|
||||
else {
|
||||
scanOpts.reverse = true;
|
||||
}
|
||||
}
|
||||
else {
|
||||
p('Genesis block found. Syncing upto known blocks.');
|
||||
scanOpts.reverse = true;
|
||||
scanOpts.upToExisting = true;
|
||||
}
|
||||
|
||||
var scanOpts = {
|
||||
reverse: true,
|
||||
upToExisting: b ? true: false,
|
||||
};
|
||||
|
||||
return self.importHistory(scanOpts, next);
|
||||
});
|
||||
};
|
||||
|
|
|
@ -65,6 +65,7 @@ function spec() {
|
|||
function(b) { try {self.db.collections.transactionouts.drop(b);} catch (e) { return b(); } },
|
||||
], next);
|
||||
};
|
||||
|
||||
Sync.prototype.storeBlock = function(block, cb) {
|
||||
var self = this;
|
||||
|
||||
|
@ -106,7 +107,7 @@ function spec() {
|
|||
Sync.prototype.storeTxs = function(txs, cb) {
|
||||
var self = this;
|
||||
|
||||
TransactionOut.createFromArray(txs, function(err, inserted_txs, updated_addrs) {
|
||||
TransactionOut.createFromTxs(txs, function(err, inserted_txs, updated_addrs) {
|
||||
if (err) return cb(err);
|
||||
|
||||
self._handleBroadcast(null, inserted_txs, updated_addrs);
|
||||
|
|
|
@ -52,6 +52,7 @@
|
|||
},
|
||||
"dependencies": {
|
||||
"async": "*",
|
||||
"glob": "*",
|
||||
"classtool": "*",
|
||||
"commander": "*",
|
||||
"bignum": "*",
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
#!/usr/bin/env node
|
||||
'use strict';
|
||||
|
||||
process.env.NODE_ENV = process.env.NODE_ENV || 'development';
|
||||
|
||||
|
||||
|
||||
var assert = require('assert'),
|
||||
config = require('../../config/config'),
|
||||
BlockExtractor = require('../../lib/BlockExtractor').class(),
|
||||
networks = require('bitcore/networks'),
|
||||
util = require('bitcore/util/util');
|
||||
|
||||
//var txItemsValid = JSON.parse(fs.readFileSync('test/model/txitems.json'));
|
||||
|
||||
describe('TransactionOut', function(){
|
||||
|
||||
var be = new BlockExtractor(config.bitcoind.dataDir, config.network);
|
||||
|
||||
var network = config.network === 'testnet' ? networks.testnet: networks.livenet;
|
||||
|
||||
it('should glob block files ', function(done) {
|
||||
assert(be.files.length>0);
|
||||
done();
|
||||
});
|
||||
|
||||
var lastTs;
|
||||
|
||||
it('should read genesis block ', function(done) {
|
||||
be.getNextBlock(function(err,b) {
|
||||
assert(!err);
|
||||
var genesisHashReversed = new Buffer(32);
|
||||
network.genesisBlock.hash.copy(genesisHashReversed);
|
||||
var genesis = util.formatHashFull(network.genesisBlock.hash);
|
||||
|
||||
assert.equal(util.formatHashFull(b.hash),genesis);
|
||||
assert.equal(b.nounce,network.genesisBlock.nounce);
|
||||
assert.equal(b.timestamp,network.genesisBlock.timestamp);
|
||||
assert.equal(b.merkle_root.toString('hex'),network.genesisBlock.merkle_root.toString('hex'));
|
||||
|
||||
lastTs = b.timestamp;
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should read next testnet block ', function(done) {
|
||||
be.getNextBlock(function(err,b) {
|
||||
assert(!err);
|
||||
assert(b.timestamp > lastTs, 'timestamp > genesis_ts');
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should read 100000 blocks with no error ', function(done) {
|
||||
|
||||
var i=0;
|
||||
while(i++<100000) {
|
||||
be.getNextBlock(function(err,b) {
|
||||
assert(!err,err);
|
||||
assert(lastTs < b.timestamp, 'genesisTS < b.timestamp: ' + lastTs + '<' + b.timestamp + ":" + i);
|
||||
if(i % 1000 === 1) process.stdout.write('.');
|
||||
if(i === 100000) done();
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
|
||||
});
|
||||
|
||||
|
||||
|
|
@ -16,6 +16,7 @@ program
|
|||
.option('-D --destroy', 'Remove current DB (and start from there)', 0)
|
||||
.option('-R --reverse', 'Sync backwards', 0)
|
||||
.option('-U --uptoexisting', 'Sync only until an existing block is found', 0)
|
||||
.option('-F --fromfiles', 'Sync using bitcoind .dat block files (faster)', 0)
|
||||
.parse(process.argv);
|
||||
|
||||
var historicSync = new HistoricSync();
|
||||
|
@ -32,13 +33,16 @@ async.series([
|
|||
function(cb) {
|
||||
|
||||
if (program.smart) {
|
||||
historicSync.smartImport(cb);
|
||||
historicSync.smartImport({
|
||||
destroy: program.destroy,
|
||||
},cb);
|
||||
}
|
||||
else {
|
||||
historicSync.importHistory({
|
||||
destroy: program.destroy,
|
||||
reverse: program.reverse,
|
||||
upToExisting: program.uptoexisting,
|
||||
fromFiles: program.fromfiles,
|
||||
}, cb);
|
||||
}
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue