From 213a8e55431fe9a8698d0f2e59187d62b2d8037f Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Wed, 15 Jan 2014 17:36:49 -0300 Subject: [PATCH] sync refactoring --- app/models/Block.js | 18 +++- app/models/Transaction.js | 139 ++++++++++++++------------- config/env/test.js | 1 + lib/PeerSync.js | 15 ++- lib/Sync.js | 195 ++++++++++++++++++++------------------ util/sync.js | 4 +- 6 files changed, 203 insertions(+), 169 deletions(-) diff --git a/app/models/Block.js b/app/models/Block.js index f3133295..15064228 100644 --- a/app/models/Block.js +++ b/app/models/Block.js @@ -8,9 +8,13 @@ var mongoose = require('mongoose'), RpcClient = require('bitcore/RpcClient').class(), util = require('bitcore/util/util'), BitcoreBlock= require('bitcore/Block').class(), + Transaction = require('./Transaction'), + async = require('async'), config = require('../../config/config') ; +var CONCURRENCY = 5; + /** * Block Schema */ @@ -25,7 +29,6 @@ var BlockSchema = new Schema({ unique: true, }, time: Number, - fromP2P: Boolean, }); /** @@ -43,9 +46,18 @@ BlockSchema.path('title').validate(function(title) { */ BlockSchema.statics.createTimestamped = function(block, cb) { + + var that = this; var now = Math.round(new Date().getTime() / 1000); - block.time = now; - this.create(block, cb); + + var BlockSchema = mongoose.model('Block', BlockSchema); + var newBlock = new that(); + newBlock.time = now; + + Transaction.createFromArray(block.tx, function(err, inserted_txs) { + if (err) return cb(err); + newBlock.save(cb); + }); }; BlockSchema.statics.load = function(id, cb) { diff --git a/app/models/Transaction.js b/app/models/Transaction.js index 219e79a7..79b309c9 100644 --- a/app/models/Transaction.js +++ b/app/models/Transaction.js @@ -30,15 +30,12 @@ var TransactionSchema = new Schema({ index: true, unique: true, }, - processed: { - type: Boolean, - default: false, - index: true, - }, +/* TODO? orphaned: { type: Boolean, default: false, }, + */ time: Number, }); @@ -71,10 +68,10 @@ TransactionSchema.statics.fromIdWithInfo = function(txid, cb) { tx = new That(); tx.txid = txid; - tx.queryInfo(function(err, txInfo) { + tx.fillInfo(function(err, txInfo) { if (!txInfo) - return cb(new Error('TX not found1')); + return cb(new Error('TX not found')); tx.save(function(err) { return cb(err,tx); @@ -82,7 +79,7 @@ TransactionSchema.statics.fromIdWithInfo = function(txid, cb) { }); } else { - tx.queryInfo(function(err) { + tx.fillInfo(function(err) { return cb(err,tx); }); } @@ -94,59 +91,63 @@ TransactionSchema.statics.createFromArray = function(txs, next) { var that = this; if (!txs) return next(); var mongo_txs = []; - async.forEach(txs, - function(tx, cb) { - var now = Math.round(new Date().getTime() / 1000); - that.create({ txid: tx, time: now }, function(err, new_tx) { - if (err) { - if (err.toString().match(/E11000/)) { - return cb(); - } - return cb(err); - } - mongo_txs.push(new_tx); + var now = Math.round(new Date().getTime() / 1000); + + async.forEachLimit(txs, CONCURRENCY, function(txid, cb) { + + that.explodeTransactionItems( txid, function(err) { + + that.create({txid: txid, time: now}, function(err, new_tx) { + + //console.log("created:", err, new_tx); + + if (err && ! err.toString().match(/E11000/)) return cb(err); + + if (new_tx) mongo_txs.push(new_tx); return cb(); }); - }, - function(err) { - return next(err, mongo_txs); - } - ); + }) + }, + function(err) { + return next(err, mongo_txs); + }); }; TransactionSchema.statics.explodeTransactionItems = function(txid, cb) { - this.fromIdWithInfo(txid, function(err, t) { - if (err || !t) return cb(err); + this.queryInfo(txid, function(err, info) { + + //console.log("INFO",info); + if (err || !info) return cb(err); var index = 0; - t.info.vin.forEach( function(i){ + info.vin.forEach( function(i){ i.n = index++; }); - async.forEachLimit(t.info.vin, CONCURRENCY, function(i, next_in) { + async.forEachLimit(info.vin, CONCURRENCY, function(i, next_in) { if (i.addr && i.value) { //console.log("Creating IN %s %d", i.addr, i.valueSat); TransactionItem.create({ - txid : t.txid, + txid : txid, value_sat : -1 * i.valueSat, addr : i.addr, index : i.n, - ts : t.info.time, + ts : info.time, }, next_in); } else { if ( !i.coinbase ) { - console.log ('TX: %s,%d could not parse INPUT', t.txid, i.n); + console.log ('TX: %s,%d could not parse INPUT', txid, i.n); } return next_in(); } }, function (err) { if (err) console.log (err); - async.forEachLimit(t.info.vout, CONCURRENCY, function(o, next_out) { + async.forEachLimit(info.vout, CONCURRENCY, function(o, next_out) { /* * TODO Support multisigs @@ -154,15 +155,15 @@ TransactionSchema.statics.explodeTransactionItems = function(txid, cb) { if (o.value && o.scriptPubKey && o.scriptPubKey.addresses && o.scriptPubKey.addresses[0]) { //console.log("Creating OUT %s %d", o.scriptPubKey.addresses[0], o.valueSat); TransactionItem.create({ - txid : t.txid, + txid : txid, value_sat : o.valueSat, addr : o.scriptPubKey.addresses[0], index : o.n, - ts : t.info.time, + ts : info.time, }, next_out); } else { - console.log ('TX: %s,%d could not parse OUTPUT', t.txid, o.n); + console.log ('TX: %s,%d could not parse OUTPUT', txid, o.n); return next_out(); } }, @@ -175,14 +176,13 @@ TransactionSchema.statics.explodeTransactionItems = function(txid, cb) { -TransactionSchema.methods.fillInputValues = function (tx, next) { +TransactionSchema.statics.getOutpoints = function (tx, next) { if (tx.isCoinBase()) return next(); - if (! this.rpc) this.rpc = new RpcClient(config.bitcoind); + var rpc = new RpcClient(config.bitcoind); var network = ( config.network === 'testnet') ? networks.testnet : networks.livenet ; - var that = this; async.forEachLimit(tx.ins, CONCURRENCY, function(i, cb) { var outHash = i.getOutpointHash(); @@ -190,7 +190,7 @@ TransactionSchema.methods.fillInputValues = function (tx, next) { var outHashBase64 = outHash.reverse().toString('hex'); var c=0; - that.rpc.getRawTransaction(outHashBase64, function(err, txdata) { + rpc.getRawTransaction(outHashBase64, function(err, txdata) { var txin = new Transaction(); if (err || ! txdata.result) return cb( new Error('Input TX '+outHashBase64+' not found')); @@ -229,42 +229,40 @@ TransactionSchema.methods.fillInputValues = function (tx, next) { ); }; -TransactionSchema.methods.queryInfo = function (next) { - var that = this; +TransactionSchema.statics.queryInfo = function(txid, cb) { + var that = this; var network = ( config.network === 'testnet') ? networks.testnet : networks.livenet ; - this.rpc = new RpcClient(config.bitcoind); + var rpc = new RpcClient(config.bitcoind); + rpc.getRawTransaction(txid, 1, function(err, txInfo) { + if (err) return cb(err); - this.rpc.getRawTransaction(this.txid, 1, function(err, txInfo) { - if (err) return next(err); - - that.info = txInfo.result; + var info = txInfo.result; // Transaction parsing var b = new Buffer(txInfo.result.hex,'hex'); var tx = new Transaction(); tx.parse(b); - that.fillInputValues(tx, function(err) { + that.getOutpoints(tx, function(err) { + if (err) return cb(err); // Copy TX relevant values to .info var c = 0; - - var valueIn = bignum(0); var valueOut = bignum(0); if ( tx.isCoinBase() ) { - that.info.isCoinBase = true; + info.isCoinBase = true; } else { tx.ins.forEach(function(i) { if (i.value) { - that.info.vin[c].value = util.formatValue(i.value); + info.vin[c].value = util.formatValue(i.value); var n = util.valueToBigInt(i.value).toNumber(); - that.info.vin[c].valueSat = n; + info.vin[c].valueSat = n; valueIn = valueIn.add( n ); var scriptSig = i.getScript(); @@ -275,11 +273,11 @@ TransactionSchema.methods.queryInfo = function (next) { var pubKeyHash = util.sha256ripe160(pubKey); var addr = new Address(network.addressPubkey, pubKeyHash); var addrStr = addr.toString(); - that.info.vin[c].addr = addrStr; + info.vin[c].addr = addrStr; } else { if (i.addrFromOutput) - that.info.vin[c].addr = i.addrFromOutput; + info.vin[c].addr = i.addrFromOutput; } } else { @@ -294,33 +292,42 @@ TransactionSchema.methods.queryInfo = function (next) { var n = util.valueToBigInt(i.v).toNumber(); valueOut = valueOut.add(n); - that.info.vout[c].valueSat = n; + info.vout[c].valueSat = n; c++; }); - that.info.valueOut = valueOut / util.COIN; + info.valueOut = valueOut / util.COIN; if ( !tx.isCoinBase() ) { - that.info.valueIn = valueIn / util.COIN; - that.info.feeds = (valueIn - valueOut) / util.COIN; + info.valueIn = valueIn / util.COIN; + info.feeds = (valueIn - valueOut) / util.COIN; } else { - var reward = BitcoreBlock.getBlockValue(that.info.height) / util.COIN; - that.info.vin[0].reward = reward; - that.info.valueIn = reward; + var reward = BitcoreBlock.getBlockValue(info.height) / util.COIN; + info.vin[0].reward = reward; + info.valueIn = reward; } + info.size = b.length; - that.info.size = b.length; - - - - return next(err, that.info); + return cb(null, info); }); }); }; +TransactionSchema.methods.fillInfo = function(next) { + var that = this; + + mongoose.model('Transaction', TransactionSchema).queryInfo(that.txid, function(err, info) { + if (err) return next(err); + + that.info = info; + return next(); + }); +}; + + module.exports = mongoose.model('Transaction', TransactionSchema); diff --git a/config/env/test.js b/config/env/test.js index dd7a70ee..6f34e9af 100755 --- a/config/env/test.js +++ b/config/env/test.js @@ -12,6 +12,7 @@ module.exports = { pass: process.env.BITCOIND_PASS || 'real_mystery', host: process.env.BITCOIND_HOST || '127.0.0.1', port: process.env.BITCOIND_PORT || '18332', + keepConnectionAlive: false, }, network: 'testnet', } diff --git a/lib/PeerSync.js b/lib/PeerSync.js index b0f0cc39..4727ed9c 100644 --- a/lib/PeerSync.js +++ b/lib/PeerSync.js @@ -69,22 +69,21 @@ function spec() { var block = info.message.block; var blockHash = coinUtil.formatHashFull(block.calcHash()); console.log('[p2p_sync] Handle block: ' + blockHash); + + + var tx_hashes = block.txs.map(function(tx) { + return coinUtil.formatHashFull(tx.hash); + }); + this.sync.storeBlock({ 'hash': blockHash, - 'fromP2P': true, + 'tx': tx_hashes, }, function(err) { if (err) { console.log('[p2p_sync] Error in handle Block: ' + err); - } else { - // if no errors importing block, import the transactions - var hashes = block.txs.map(function(tx) { - return coinUtil.formatHashFull(tx.hash); - }); - self.sync.storeTxs(hashes, function() {}); } }); - }; PeerSync.prototype.handle_connected = function(data) { diff --git a/lib/Sync.js b/lib/Sync.js index c09804ee..ecfd85fd 100644 --- a/lib/Sync.js +++ b/lib/Sync.js @@ -14,104 +14,130 @@ function spec() { var Transaction = require('../app/models/Transaction'); var TransactionItem = require('../app/models/TransactionItem'); var sockets = require('../app/views/sockets/main.js'); - var CONCURRENCY = 1; + var CONCURRENCY = 5; + function Sync(config) { - this.tx_count =0; - this.network = config.networkName === 'testnet' ? networks.testnet: networks.livenet; + this.tx_count = 0; + this.block_count= 0; + this.block_total= 0; + this.network = config.networkName === 'testnet' ? networks.testnet: networks.livenet; } var progress_bar = function(string, current, total) { console.log(util.format('\t%s %d/%d [%d%%]', string, current, total, parseInt(100 * current / total))); }; - Sync.prototype.getNextBlock = function(blockHash, cb) { + Sync.prototype.getPrevNextBlock = function(blockHash, blockEnd, opts, cb) { var that = this; - if (!blockHash) { + + // recursion end. + if (!blockHash || (blockEnd && blockEnd == blockHash) ) { return cb(); } - this.rpc.getBlock(blockHash, function(err, blockInfo) { - if (err) return cb(err); - if (blockInfo.result.height % 1000 === 0) { - var h = blockInfo.result.height, - d = blockInfo.result.confirmations; - progress_bar(util.format('Height [txs:%d]',that.tx_count), h, h + d); - } - that.storeBlock(blockInfo.result, function(err, existed) { - - if (!err) { - var txs = blockInfo.result.tx; - that.storeTxs(txs, function(err) { - if (!err) - return that.getNextBlock(blockInfo.result.nextblockhash, cb); - }); + var existed = 0; + var blockInfo; + var blockObj; + + async.series([ + // Already got it? + function(c) { + Block.findOne({hash:blockHash}, function(err,block){ + if (err) { console.log(err); return c(err); }; + + if (block) { + existed = 1; + blockObj = block; + } + + return c(); + }); + }, + //show some (inacurate) status + function(c) { + if (that.block_count++ % 1000 === 0) { + progress_bar('Historic sync status:', that.block_count, that.block_total); } - else { - if (err.toString().match(/E11000/)) - return that.getNextBlock(blockInfo.result.nextblockhash, cb); - else - return cb(err); + return c(); + }, + //get Info from RPC + function(c) { + + // TODO: if we store prev/next, no need to go to RPC + // if (blockObj && blockObj.nextBlockHash) return c(); + + that.rpc.getBlock(blockHash, function(err, ret) { + if (err) return c(err); + + blockInfo = ret; + return c(); + }); + }, + //store it + function(c) { + if (existed) return c(); + + that.storeBlock(blockInfo.result, function(err, block) { + existed = err && err.toString().match(/E11000/); + if (err && ! existed) return c(err); + return c(); + }); + }, + /* TODO: Should Start to sync backwards? (this is for partial syncs) + function(c) { + + if (blockInfo.result.prevblockhash != current.blockHash) { + console.log("reorg?"); + opts.prev = 1; } - }); + return c(); + } + */ + ], + function (err){ + + if (err) + console.log("ERROR: @%s: %s [count: block_count: %d]", blockHash, err, that.block_count); + + if (blockInfo && blockInfo.result) { + if (opts.prev && blockInfo.result.prevblockhash) + return that.getPrevNextBlock(blockInfo.result.prevblockhash, blockEnd, opts, cb); + + if (opts.next && blockInfo.result.nextblockhash) + return that.getPrevNextBlock(blockInfo.result.nextblockhash, blockEnd, opts, cb); + } + return cb(err); }); }; Sync.prototype.storeBlock = function(block, cb) { var that = this; + Block.createTimestamped(block, function(err, b){ + if (b && that.opts.broadcast_blocks) { sockets.broadcast_block(b); } - }); - }; - Sync.prototype.storeTxs = function(txids, cb) { - var that=this; - Transaction.createFromArray(txids, function(err, inserted_txs) { - if (err) return cb(err); - async.forEachLimit(inserted_txs, CONCURRENCY, function(new_tx, next) { - var txid = new_tx.txid; - - if (that.opts.broadcast_txs) { + if (that.opts.broadcast_txs) { + block.tx.each(function(tx) { sockets.broadcast_tx(new_tx); - } - - - // This will trigger an RPC call - Transaction.explodeTransactionItems( txid, function(err) { - that.tx_count++; - next(err); }); - }, - function(err) { - return cb(); - }); + } + + that.tx_count += block.tx.length; + + return cb(); }); }; - Sync.prototype.syncBlocks = function( cb) { + Sync.prototype.syncBlocks = function(start, end, cb) { var that = this; - var genesisHash = this.network.genesisBlock.hash.reverse().toString('hex'); - console.log('Syncing Blocks... ' ); + console.log('Syncing Blocks, starting from: %s end: %s ',start, end); - Block.findOne( - { 'fromP2P':{$in:[null, false]} }, - {}, - { - sort: { - 'time': - 1 - } - }, - function(err, block) { - if (err) return cb(err); - - var nextHash = block && block.hash ? block.hash: genesisHash; - - console.log('\tStarting at hash: ' + nextHash); - return that.getNextBlock(nextHash, cb); - }); + return that.getPrevNextBlock(start, end, { next: 1 }, cb); }; // This is not currently used. Transactions are represented by txid only @@ -267,11 +293,20 @@ function spec() { cb(); } }, - + function(cb) { + that.rpc.getInfo(function(err, res) { + if (err) cb(err); + + that.block_total = res.result.blocks; + return cb(); + }); + }, function(cb) { function sync() { - that.syncBlocks( function(err) { + var startingBlockHash = that.network.genesisBlock.hash.reverse().toString('hex'); + + that.syncBlocks( startingBlockHash, null, function(err) { if (err && err.message.match(/ECONNREFUSED/) && retry_attemps--){ setTimeout(function() { @@ -280,7 +315,7 @@ function spec() { }, retry_secs * 1000); } else - return next(err); + return next(err, that.block_count); }); } @@ -288,28 +323,8 @@ function spec() { sync(); } }, -/* Exploding happens on block insertion - function(cb) { - if (! opts.skip_txs) { - that.processTXs(opts.reindex, cb); - } - else { - return cb(); - } - } -*/ -/* We dont sync any contents from TXs, only their IDs are stored - function(cb) { - if (! opts.skip_txs) { - that.syncTXs(opts.reindex, cb); - } - else { - return cb(); - } - } -*/ ], function(err) { - return next(err); + return next(err, that.block_count); }); }); }; diff --git a/util/sync.js b/util/sync.js index 3d69804c..95be4aa0 100755 --- a/util/sync.js +++ b/util/sync.js @@ -33,12 +33,12 @@ function(cb) { cb(); }, function(cb) { - sync.import_history(program, function(err) { + sync.import_history(program, function(err, count) { if (err) { console.log('CRITICAL ERROR: ', err); } else { - console.log('Done!'); + console.log('Done! [%d blocks]', count); } cb(); });