Merge pull request #61 from matiu/bug/05sync

Bug/05sync
This commit is contained in:
Manuel Aráoz 2014-01-16 08:03:26 -08:00
commit 6c3968869b
9 changed files with 96 additions and 42 deletions

View File

@ -26,6 +26,7 @@ var BlockSchema = new Schema({
unique: true,
},
time: Number,
nextBlockHash: String,
});
/**
@ -42,19 +43,23 @@ BlockSchema.path('title').validate(function(title) {
* Statics
*/
BlockSchema.statics.createTimestamped = function(block, cb) {
BlockSchema.statics.customCreate = function(block, cb) {
var That= this;
var now = Math.round(new Date().getTime() / 1000);
var BlockSchema = mongoose.model('Block', BlockSchema);
var newBlock = new That();
newBlock.time = now;
Transaction.createFromArray(block.tx, function(err, inserted_txs) {
var newBlock = new That();
newBlock.time = block.time ? block.time : Math.round(new Date().getTime() / 1000);
newBlock.hash = block.hash;
newBlock.nextBlockHash = block.nextBlockHash;
Transaction.createFromArray(block.tx, newBlock.time, function(err, inserted_txs) {
if (err) return cb(err);
newBlock.save(function(err) {
return cb(err, inserted_txs);
return cb(err, newBlock, inserted_txs);
});
});
};

View File

@ -87,18 +87,17 @@ TransactionSchema.statics.fromIdWithInfo = function(txid, cb) {
};
TransactionSchema.statics.createFromArray = function(txs, next) {
TransactionSchema.statics.createFromArray = function(txs, time, next) {
var that = this;
if (!txs) return next();
var mongo_txs = [];
var now = Math.round(new Date().getTime() / 1000);
async.forEachLimit(txs, CONCURRENCY, function(txid, cb) {
that.explodeTransactionItems( txid, function(err) {
that.explodeTransactionItems( txid, time, function(err) {
if (err) return next(err);
that.create({txid: txid, time: now}, function(err, new_tx) {
that.create({txid: txid, time: time}, function(err, new_tx) {
if (err && ! err.toString().match(/E11000/)) return cb(err);
if (new_tx) mongo_txs.push(new_tx);
@ -112,7 +111,7 @@ TransactionSchema.statics.createFromArray = function(txs, next) {
};
TransactionSchema.statics.explodeTransactionItems = function(txid, cb) {
TransactionSchema.statics.explodeTransactionItems = function(txid, time, cb) {
this.queryInfo(txid, function(err, info) {
if (err || !info) return cb(err);
@ -131,7 +130,7 @@ TransactionSchema.statics.explodeTransactionItems = function(txid, cb) {
value_sat : -1 * i.valueSat,
addr : i.addr,
index : i.n,
ts : info.time,
ts : time,
}, next_in);
}
else {
@ -155,7 +154,7 @@ TransactionSchema.statics.explodeTransactionItems = function(txid, cb) {
value_sat : o.valueSat,
addr : o.scriptPubKey.addresses[0],
index : o.n,
ts : info.time,
ts : time,
}, next_out);
}
else {

View File

@ -11,6 +11,7 @@ module.exports = {
pass: process.env.BITCOIND_PASS || 'real_mystery',
host: process.env.BITCOIND_HOST || '127.0.0.1',
port: process.env.BITCOIND_PORT || '18332',
disableAgent: true,
},
network: 'testnet',
}

2
config/env/test.js vendored
View File

@ -12,7 +12,7 @@ module.exports = {
pass: process.env.BITCOIND_PASS || 'real_mystery',
host: process.env.BITCOIND_HOST || '127.0.0.1',
port: process.env.BITCOIND_PORT || '18332',
keepConnectionAlive: false,
disableAgent: true,
},
network: 'testnet',
}

View File

@ -57,7 +57,7 @@ function spec() {
PeerSync.prototype.handle_tx = function(info) {
var tx = info.message.tx.getStandardizedObject();
console.log('[p2p_sync] Handle tx: ' + tx.hash);
this.sync.storeTxs([tx.hash], function(err) {
this.sync.storeTxs([tx.hash], null, function(err) {
if (err) {
console.log('[p2p_sync] Error in handle TX: ' + err);
}
@ -78,6 +78,7 @@ function spec() {
this.sync.storeBlock({
'hash': blockHash,
'tx': tx_hashes,
// TODO NEXT BLOCK / PREV BLOCK?
},
function(err) {
if (err) {

View File

@ -29,10 +29,12 @@ function spec() {
};
Sync.prototype.getPrevNextBlock = function(blockHash, blockEnd, opts, cb) {
var that = this;
// recursion end.
if (!blockHash || (blockEnd && blockEnd == blockHash) ) {
console.log("Reach end:", blockHash, blockEnd);
return cb();
}
@ -100,8 +102,9 @@ function spec() {
console.log("ERROR: @%s: %s [count: block_count: %d]", blockHash, err, that.block_count);
if (blockInfo && blockInfo.result) {
if (opts.prev && blockInfo.result.prevblockhash)
return that.getPrevNextBlock(blockInfo.result.prevblockhash, blockEnd, opts, cb);
if (opts.prev && blockInfo.result.previousblockhash) {
return that.getPrevNextBlock(blockInfo.result.previousblockhash, blockEnd, opts, cb);
}
if (opts.next && blockInfo.result.nextblockhash)
return that.getPrevNextBlock(blockInfo.result.nextblockhash, blockEnd, opts, cb);
@ -113,30 +116,53 @@ function spec() {
Sync.prototype.storeBlock = function(block, cb) {
var that = this;
Block.createTimestamped(block, function(err, b){
Block.customCreate(block, function(err, block, inserted_txs){
if (b && that.opts.broadcast_blocks) {
sockets.broadcast_block(b);
if (block && that.opts.broadcast_blocks) {
sockets.broadcast_block(block);
}
if (that.opts.broadcast_txs) {
block.tx.each(function(tx) {
sockets.broadcast_tx(new_tx);
if (inserted_txs && that.opts.broadcast_txs) {
inserted_txs.forEach(function(tx) {
sockets.broadcast_tx(tx);
});
}
that.tx_count += block.tx.length;
if (inserted_txs)
that.tx_count += inserted_txs.length;
return cb();
});
};
Sync.prototype.syncBlocks = function(start, end, cb) {
Sync.prototype.storeTxs = function(txs, inTime, cb) {
var that = this;
console.log('Syncing Blocks, starting from: %s end: %s ',start, end);
var time = inTime ? inTime : Math.round(new Date().getTime() / 1000);
return that.getPrevNextBlock(start, end, { next: 1 }, cb);
Transaction.createFromArray(txs, time, function(err, inserted_txs) {
if (!err && inserted_txs && that.opts.broadcast_txs) {
inserted_txs.forEach(function(tx) {
sockets.broadcast_tx(tx);
});
}
return cb(err);
});
};
Sync.prototype.syncBlocks = function(start, end, isForward, cb) {
var that = this;
console.log('Syncing Blocks, starting \n\tfrom: %s \n\tend: %s \n\tisForward:',
start, end, isForward);
return that.getPrevNextBlock( start, end,
isForward ? { next: 1 } : { prev: 1}, cb);
};
// This is not currently used. Transactions are represented by txid only
@ -227,7 +253,7 @@ function spec() {
}
// This will trigger an RPC call
Transaction.explodeTransactionItems( tx.txid, function(err) {
Transaction.explodeTransactionItems( tx.txid, tx.time, function(err) {
if (proc++ % 1000 === 0) progress_bar('\tproc', pull, total);
next(err);
});
@ -266,6 +292,8 @@ function spec() {
var retry_attemps = 100;
var retry_secs = 2;
var block_best;
this.db.once('open', function() {
async.series([
function(cb) {
@ -273,7 +301,7 @@ function spec() {
console.log('Deleting Blocks...');
that.db.collections.blocks.drop(cb);
} else {
cb();
return cb();
}
},
function(cb) {
@ -281,7 +309,7 @@ function spec() {
console.log('Deleting TXs...');
that.db.collections.transactions.drop(cb);
} else {
cb();
return cb();
}
},
function(cb) {
@ -289,7 +317,7 @@ function spec() {
console.log('Deleting TXItems...');
that.db.collections.transactionitems.drop(cb);
} else {
cb();
return cb();
}
},
function(cb) {
@ -301,11 +329,34 @@ function spec() {
});
},
function(cb) {
if (!opts.reverse) return cb();
that.rpc.getBestBlockHash(function(err, res) {
if (err) cb(err);
block_best = res.result;
return cb();
});
},
], function(err) {
function sync() {
var startingBlockHash = that.network.genesisBlock.hash.reverse().toString('hex');
var start, end, isForward;
that.syncBlocks( startingBlockHash, null, function(err) {
if (opts.reverse) {
start = block_best;
end = that.network.genesisBlock.hash.reverse().toString('hex');
isForward = false;
}
else {
start = that.network.genesisBlock.hash.reverse().toString('hex');
end = null;
isForward = true;
}
that.syncBlocks(start, end, isForward, function(err) {
if (err && err.message.match(/ECONNREFUSED/) && retry_attemps--){
setTimeout(function() {
@ -321,9 +372,6 @@ function spec() {
if (!opts.skip_blocks) {
sync();
}
},
], function(err) {
return next(err, that.block_count);
});
});
};

View File

@ -118,7 +118,8 @@ describe('Transaction', function(){
// Remove first
TransactionItem.remove({txid: v.txid}, function(err) {
Transaction.explodeTransactionItems(v.txid, function(err, tx) {
var now = Math.round(new Date().getTime() / 1000);
Transaction.explodeTransactionItems(v.txid, now, function(err, tx) {
if (err) done(err);
TransactionItem

View File

@ -11,7 +11,7 @@ var block_hash = process.argv[2] || '0000000000b6288775bbd326bedf324ca8717a15191
var rpc = new RpcClient(config.bitcoind);
var block = rpc.getBlock(block_hash, function(err, block) {
var block = rpc.getBestBlockHash( function(err, block) {
console.log("Err:");
console.log(err);

View File

@ -14,9 +14,8 @@ var async = require('async');
program
.version(SYNC_VERSION)
.option('-N --network [livenet]', 'Set bitcoin network [testnet]', 'testnet')
.option('-D --destroy', 'Remove current DB (and start from there)', '0')
.option('--skip_blocks', 'Sync blocks')
.option('--skip_txs', 'Sync transactions')
.option('-D --destroy', 'Remove current DB (and start from there)', 0)
.option('-R --reverse', 'Sync backwards', 0)
.parse(process.argv);
var sync = new Sync({
@ -38,7 +37,7 @@ function(cb) {
console.log('CRITICAL ERROR: ', err);
}
else {
console.log('Done! [%d blocks]', count);
console.log('Done! [%d blocks]', count, err);
}
cb();
});