Merge pull request #57 from matiu/feature/04sync

awesome!
This commit is contained in:
Mario Colque 2014-01-15 12:43:50 -08:00
commit 8f9fdc1a6e
6 changed files with 203 additions and 169 deletions

View File

@ -8,9 +8,13 @@ var mongoose = require('mongoose'),
RpcClient = require('bitcore/RpcClient').class(),
util = require('bitcore/util/util'),
BitcoreBlock= require('bitcore/Block').class(),
Transaction = require('./Transaction'),
async = require('async'),
config = require('../../config/config')
;
var CONCURRENCY = 5;
/**
* Block Schema
*/
@ -25,7 +29,6 @@ var BlockSchema = new Schema({
unique: true,
},
time: Number,
fromP2P: Boolean,
});
/**
@ -43,9 +46,18 @@ BlockSchema.path('title').validate(function(title) {
*/
BlockSchema.statics.createTimestamped = function(block, cb) {
var that = this;
var now = Math.round(new Date().getTime() / 1000);
block.time = now;
this.create(block, cb);
var BlockSchema = mongoose.model('Block', BlockSchema);
var newBlock = new that();
newBlock.time = now;
Transaction.createFromArray(block.tx, function(err, inserted_txs) {
if (err) return cb(err);
newBlock.save(cb);
});
};
BlockSchema.statics.load = function(id, cb) {

View File

@ -30,15 +30,12 @@ var TransactionSchema = new Schema({
index: true,
unique: true,
},
processed: {
type: Boolean,
default: false,
index: true,
},
/* TODO?
orphaned: {
type: Boolean,
default: false,
},
*/
time: Number,
});
@ -71,10 +68,10 @@ TransactionSchema.statics.fromIdWithInfo = function(txid, cb) {
tx = new That();
tx.txid = txid;
tx.queryInfo(function(err, txInfo) {
tx.fillInfo(function(err, txInfo) {
if (!txInfo)
return cb(new Error('TX not found1'));
return cb(new Error('TX not found'));
tx.save(function(err) {
return cb(err,tx);
@ -82,7 +79,7 @@ TransactionSchema.statics.fromIdWithInfo = function(txid, cb) {
});
}
else {
tx.queryInfo(function(err) {
tx.fillInfo(function(err) {
return cb(err,tx);
});
}
@ -94,59 +91,63 @@ TransactionSchema.statics.createFromArray = function(txs, next) {
var that = this;
if (!txs) return next();
var mongo_txs = [];
async.forEach(txs,
function(tx, cb) {
var now = Math.round(new Date().getTime() / 1000);
that.create({ txid: tx, time: now }, function(err, new_tx) {
if (err) {
if (err.toString().match(/E11000/)) {
return cb();
}
return cb(err);
}
mongo_txs.push(new_tx);
var now = Math.round(new Date().getTime() / 1000);
async.forEachLimit(txs, CONCURRENCY, function(txid, cb) {
that.explodeTransactionItems( txid, function(err) {
that.create({txid: txid, time: now}, function(err, new_tx) {
//console.log("created:", err, new_tx);
if (err && ! err.toString().match(/E11000/)) return cb(err);
if (new_tx) mongo_txs.push(new_tx);
return cb();
});
},
function(err) {
return next(err, mongo_txs);
}
);
})
},
function(err) {
return next(err, mongo_txs);
});
};
TransactionSchema.statics.explodeTransactionItems = function(txid, cb) {
this.fromIdWithInfo(txid, function(err, t) {
if (err || !t) return cb(err);
this.queryInfo(txid, function(err, info) {
//console.log("INFO",info);
if (err || !info) return cb(err);
var index = 0;
t.info.vin.forEach( function(i){
info.vin.forEach( function(i){
i.n = index++;
});
async.forEachLimit(t.info.vin, CONCURRENCY, function(i, next_in) {
async.forEachLimit(info.vin, CONCURRENCY, function(i, next_in) {
if (i.addr && i.value) {
//console.log("Creating IN %s %d", i.addr, i.valueSat);
TransactionItem.create({
txid : t.txid,
txid : txid,
value_sat : -1 * i.valueSat,
addr : i.addr,
index : i.n,
ts : t.info.time,
ts : info.time,
}, next_in);
}
else {
if ( !i.coinbase ) {
console.log ('TX: %s,%d could not parse INPUT', t.txid, i.n);
console.log ('TX: %s,%d could not parse INPUT', txid, i.n);
}
return next_in();
}
},
function (err) {
if (err) console.log (err);
async.forEachLimit(t.info.vout, CONCURRENCY, function(o, next_out) {
async.forEachLimit(info.vout, CONCURRENCY, function(o, next_out) {
/*
* TODO Support multisigs
@ -154,15 +155,15 @@ TransactionSchema.statics.explodeTransactionItems = function(txid, cb) {
if (o.value && o.scriptPubKey && o.scriptPubKey.addresses && o.scriptPubKey.addresses[0]) {
//console.log("Creating OUT %s %d", o.scriptPubKey.addresses[0], o.valueSat);
TransactionItem.create({
txid : t.txid,
txid : txid,
value_sat : o.valueSat,
addr : o.scriptPubKey.addresses[0],
index : o.n,
ts : t.info.time,
ts : info.time,
}, next_out);
}
else {
console.log ('TX: %s,%d could not parse OUTPUT', t.txid, o.n);
console.log ('TX: %s,%d could not parse OUTPUT', txid, o.n);
return next_out();
}
},
@ -175,14 +176,13 @@ TransactionSchema.statics.explodeTransactionItems = function(txid, cb) {
TransactionSchema.methods.fillInputValues = function (tx, next) {
TransactionSchema.statics.getOutpoints = function (tx, next) {
if (tx.isCoinBase()) return next();
if (! this.rpc) this.rpc = new RpcClient(config.bitcoind);
var rpc = new RpcClient(config.bitcoind);
var network = ( config.network === 'testnet') ? networks.testnet : networks.livenet ;
var that = this;
async.forEachLimit(tx.ins, CONCURRENCY, function(i, cb) {
var outHash = i.getOutpointHash();
@ -190,7 +190,7 @@ TransactionSchema.methods.fillInputValues = function (tx, next) {
var outHashBase64 = outHash.reverse().toString('hex');
var c=0;
that.rpc.getRawTransaction(outHashBase64, function(err, txdata) {
rpc.getRawTransaction(outHashBase64, function(err, txdata) {
var txin = new Transaction();
if (err || ! txdata.result) return cb( new Error('Input TX '+outHashBase64+' not found'));
@ -229,42 +229,40 @@ TransactionSchema.methods.fillInputValues = function (tx, next) {
);
};
TransactionSchema.methods.queryInfo = function (next) {
var that = this;
TransactionSchema.statics.queryInfo = function(txid, cb) {
var that = this;
var network = ( config.network === 'testnet') ? networks.testnet : networks.livenet ;
this.rpc = new RpcClient(config.bitcoind);
var rpc = new RpcClient(config.bitcoind);
rpc.getRawTransaction(txid, 1, function(err, txInfo) {
if (err) return cb(err);
this.rpc.getRawTransaction(this.txid, 1, function(err, txInfo) {
if (err) return next(err);
that.info = txInfo.result;
var info = txInfo.result;
// Transaction parsing
var b = new Buffer(txInfo.result.hex,'hex');
var tx = new Transaction();
tx.parse(b);
that.fillInputValues(tx, function(err) {
that.getOutpoints(tx, function(err) {
if (err) return cb(err);
// Copy TX relevant values to .info
var c = 0;
var valueIn = bignum(0);
var valueOut = bignum(0);
if ( tx.isCoinBase() ) {
that.info.isCoinBase = true;
info.isCoinBase = true;
}
else {
tx.ins.forEach(function(i) {
if (i.value) {
that.info.vin[c].value = util.formatValue(i.value);
info.vin[c].value = util.formatValue(i.value);
var n = util.valueToBigInt(i.value).toNumber();
that.info.vin[c].valueSat = n;
info.vin[c].valueSat = n;
valueIn = valueIn.add( n );
var scriptSig = i.getScript();
@ -275,11 +273,11 @@ TransactionSchema.methods.queryInfo = function (next) {
var pubKeyHash = util.sha256ripe160(pubKey);
var addr = new Address(network.addressPubkey, pubKeyHash);
var addrStr = addr.toString();
that.info.vin[c].addr = addrStr;
info.vin[c].addr = addrStr;
}
else {
if (i.addrFromOutput)
that.info.vin[c].addr = i.addrFromOutput;
info.vin[c].addr = i.addrFromOutput;
}
}
else {
@ -294,33 +292,42 @@ TransactionSchema.methods.queryInfo = function (next) {
var n = util.valueToBigInt(i.v).toNumber();
valueOut = valueOut.add(n);
that.info.vout[c].valueSat = n;
info.vout[c].valueSat = n;
c++;
});
that.info.valueOut = valueOut / util.COIN;
info.valueOut = valueOut / util.COIN;
if ( !tx.isCoinBase() ) {
that.info.valueIn = valueIn / util.COIN;
that.info.feeds = (valueIn - valueOut) / util.COIN;
info.valueIn = valueIn / util.COIN;
info.feeds = (valueIn - valueOut) / util.COIN;
}
else {
var reward = BitcoreBlock.getBlockValue(that.info.height) / util.COIN;
that.info.vin[0].reward = reward;
that.info.valueIn = reward;
var reward = BitcoreBlock.getBlockValue(info.height) / util.COIN;
info.vin[0].reward = reward;
info.valueIn = reward;
}
info.size = b.length;
that.info.size = b.length;
return next(err, that.info);
return cb(null, info);
});
});
};
TransactionSchema.methods.fillInfo = function(next) {
var that = this;
mongoose.model('Transaction', TransactionSchema).queryInfo(that.txid, function(err, info) {
if (err) return next(err);
that.info = info;
return next();
});
};
module.exports = mongoose.model('Transaction', TransactionSchema);

1
config/env/test.js vendored
View File

@ -12,6 +12,7 @@ module.exports = {
pass: process.env.BITCOIND_PASS || 'real_mystery',
host: process.env.BITCOIND_HOST || '127.0.0.1',
port: process.env.BITCOIND_PORT || '18332',
keepConnectionAlive: false,
},
network: 'testnet',
}

View File

@ -69,22 +69,21 @@ function spec() {
var block = info.message.block;
var blockHash = coinUtil.formatHashFull(block.calcHash());
console.log('[p2p_sync] Handle block: ' + blockHash);
var tx_hashes = block.txs.map(function(tx) {
return coinUtil.formatHashFull(tx.hash);
});
this.sync.storeBlock({
'hash': blockHash,
'fromP2P': true,
'tx': tx_hashes,
},
function(err) {
if (err) {
console.log('[p2p_sync] Error in handle Block: ' + err);
} else {
// if no errors importing block, import the transactions
var hashes = block.txs.map(function(tx) {
return coinUtil.formatHashFull(tx.hash);
});
self.sync.storeTxs(hashes, function() {});
}
});
};
PeerSync.prototype.handle_connected = function(data) {

View File

@ -14,104 +14,130 @@ function spec() {
var Transaction = require('../app/models/Transaction');
var TransactionItem = require('../app/models/TransactionItem');
var sockets = require('../app/views/sockets/main.js');
var CONCURRENCY = 1;
var CONCURRENCY = 5;
function Sync(config) {
this.tx_count =0;
this.network = config.networkName === 'testnet' ? networks.testnet: networks.livenet;
this.tx_count = 0;
this.block_count= 0;
this.block_total= 0;
this.network = config.networkName === 'testnet' ? networks.testnet: networks.livenet;
}
var progress_bar = function(string, current, total) {
console.log(util.format('\t%s %d/%d [%d%%]', string, current, total, parseInt(100 * current / total)));
};
Sync.prototype.getNextBlock = function(blockHash, cb) {
Sync.prototype.getPrevNextBlock = function(blockHash, blockEnd, opts, cb) {
var that = this;
if (!blockHash) {
// recursion end.
if (!blockHash || (blockEnd && blockEnd == blockHash) ) {
return cb();
}
this.rpc.getBlock(blockHash, function(err, blockInfo) {
if (err) return cb(err);
if (blockInfo.result.height % 1000 === 0) {
var h = blockInfo.result.height,
d = blockInfo.result.confirmations;
progress_bar(util.format('Height [txs:%d]',that.tx_count), h, h + d);
}
that.storeBlock(blockInfo.result, function(err, existed) {
if (!err) {
var txs = blockInfo.result.tx;
that.storeTxs(txs, function(err) {
if (!err)
return that.getNextBlock(blockInfo.result.nextblockhash, cb);
});
var existed = 0;
var blockInfo;
var blockObj;
async.series([
// Already got it?
function(c) {
Block.findOne({hash:blockHash}, function(err,block){
if (err) { console.log(err); return c(err); };
if (block) {
existed = 1;
blockObj = block;
}
return c();
});
},
//show some (inacurate) status
function(c) {
if (that.block_count++ % 1000 === 0) {
progress_bar('Historic sync status:', that.block_count, that.block_total);
}
else {
if (err.toString().match(/E11000/))
return that.getNextBlock(blockInfo.result.nextblockhash, cb);
else
return cb(err);
return c();
},
//get Info from RPC
function(c) {
// TODO: if we store prev/next, no need to go to RPC
// if (blockObj && blockObj.nextBlockHash) return c();
that.rpc.getBlock(blockHash, function(err, ret) {
if (err) return c(err);
blockInfo = ret;
return c();
});
},
//store it
function(c) {
if (existed) return c();
that.storeBlock(blockInfo.result, function(err, block) {
existed = err && err.toString().match(/E11000/);
if (err && ! existed) return c(err);
return c();
});
},
/* TODO: Should Start to sync backwards? (this is for partial syncs)
function(c) {
if (blockInfo.result.prevblockhash != current.blockHash) {
console.log("reorg?");
opts.prev = 1;
}
});
return c();
}
*/
],
function (err){
if (err)
console.log("ERROR: @%s: %s [count: block_count: %d]", blockHash, err, that.block_count);
if (blockInfo && blockInfo.result) {
if (opts.prev && blockInfo.result.prevblockhash)
return that.getPrevNextBlock(blockInfo.result.prevblockhash, blockEnd, opts, cb);
if (opts.next && blockInfo.result.nextblockhash)
return that.getPrevNextBlock(blockInfo.result.nextblockhash, blockEnd, opts, cb);
}
return cb(err);
});
};
Sync.prototype.storeBlock = function(block, cb) {
var that = this;
Block.createTimestamped(block, function(err, b){
if (b && that.opts.broadcast_blocks) {
sockets.broadcast_block(b);
}
});
};
Sync.prototype.storeTxs = function(txids, cb) {
var that=this;
Transaction.createFromArray(txids, function(err, inserted_txs) {
if (err) return cb(err);
async.forEachLimit(inserted_txs, CONCURRENCY, function(new_tx, next) {
var txid = new_tx.txid;
if (that.opts.broadcast_txs) {
if (that.opts.broadcast_txs) {
block.tx.each(function(tx) {
sockets.broadcast_tx(new_tx);
}
// This will trigger an RPC call
Transaction.explodeTransactionItems( txid, function(err) {
that.tx_count++;
next(err);
});
},
function(err) {
return cb();
});
}
that.tx_count += block.tx.length;
return cb();
});
};
Sync.prototype.syncBlocks = function( cb) {
Sync.prototype.syncBlocks = function(start, end, cb) {
var that = this;
var genesisHash = this.network.genesisBlock.hash.reverse().toString('hex');
console.log('Syncing Blocks... ' );
console.log('Syncing Blocks, starting from: %s end: %s ',start, end);
Block.findOne(
{ 'fromP2P':{$in:[null, false]} },
{},
{
sort: {
'time': - 1
}
},
function(err, block) {
if (err) return cb(err);
var nextHash = block && block.hash ? block.hash: genesisHash;
console.log('\tStarting at hash: ' + nextHash);
return that.getNextBlock(nextHash, cb);
});
return that.getPrevNextBlock(start, end, { next: 1 }, cb);
};
// This is not currently used. Transactions are represented by txid only
@ -267,11 +293,20 @@ function spec() {
cb();
}
},
function(cb) {
that.rpc.getInfo(function(err, res) {
if (err) cb(err);
that.block_total = res.result.blocks;
return cb();
});
},
function(cb) {
function sync() {
that.syncBlocks( function(err) {
var startingBlockHash = that.network.genesisBlock.hash.reverse().toString('hex');
that.syncBlocks( startingBlockHash, null, function(err) {
if (err && err.message.match(/ECONNREFUSED/) && retry_attemps--){
setTimeout(function() {
@ -280,7 +315,7 @@ function spec() {
}, retry_secs * 1000);
}
else
return next(err);
return next(err, that.block_count);
});
}
@ -288,28 +323,8 @@ function spec() {
sync();
}
},
/* Exploding happens on block insertion
function(cb) {
if (! opts.skip_txs) {
that.processTXs(opts.reindex, cb);
}
else {
return cb();
}
}
*/
/* We dont sync any contents from TXs, only their IDs are stored
function(cb) {
if (! opts.skip_txs) {
that.syncTXs(opts.reindex, cb);
}
else {
return cb();
}
}
*/
], function(err) {
return next(err);
return next(err, that.block_count);
});
});
};

View File

@ -33,12 +33,12 @@ function(cb) {
cb();
},
function(cb) {
sync.import_history(program, function(err) {
sync.import_history(program, function(err, count) {
if (err) {
console.log('CRITICAL ERROR: ', err);
}
else {
console.log('Done!');
console.log('Done! [%d blocks]', count);
}
cb();
});