From 5ea85584c46e4dbb0247f10b45559118c4204e46 Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Thu, 30 Jan 2014 11:50:05 -0300 Subject: [PATCH 1/7] git fixes --- app/models/Block.js | 21 +++- app/models/TransactionOut.js | 179 ++++++++++++++++-------------- config/env/development.js | 21 ++++ config/env/production.js | 22 ++++ config/env/test.js | 22 ++++ dev-util/explode_tx.js | 2 +- {util => dev-util}/status_info.js | 0 lib/HistoricSync.js | 22 +++- lib/Sync.js | 2 +- package.json | 1 + 10 files changed, 194 insertions(+), 98 deletions(-) create mode 100755 config/env/development.js create mode 100755 config/env/production.js create mode 100755 config/env/test.js rename {util => dev-util}/status_info.js (100%) diff --git a/app/models/Block.js b/app/models/Block.js index 030131dc..93016699 100644 --- a/app/models/Block.js +++ b/app/models/Block.js @@ -7,6 +7,7 @@ var mongoose = require('mongoose'), Schema = mongoose.Schema, RpcClient = require('bitcore/RpcClient').class(), util = require('bitcore/util/util'), + async = require('async'), BitcoreBlock= require('bitcore/Block').class(), TransactionOut = require('./TransactionOut'), config = require('../../config/config') @@ -87,12 +88,22 @@ BlockSchema.statics.customCreate = function(block, cb) { newBlock.hashStr = block.hash; newBlock.nextBlockHashStr = block.nextBlockHash; - TransactionOut.createFromArray(block.tx, function(err, inserted_txs, update_addrs) { - if (err) return cb(err); + var insertedTxs, updateAddrs; - newBlock.save(function(err) { - return cb(err, newBlock, inserted_txs, update_addrs); - }); + async.series([ + function(a_cb) { + TransactionOut.createFromTxs(block.tx, function(err, inInsertedTxs, inUpdateAddrs) { + insertedTxs = inInsertedTxs; + updateAddrs = inUpdateAddrs; + return a_cb(err); + }); + }, function(a_cb) { + newBlock.save(function(err) { + return a_cb(err); + }); + }], + function (err) { + return cb(err, newBlock, insertedTxs, updateAddrs); }); }; diff --git a/app/models/TransactionOut.js b/app/models/TransactionOut.js index 4bf4c61c..56548dc9 100644 --- a/app/models/TransactionOut.js +++ b/app/models/TransactionOut.js @@ -91,97 +91,90 @@ TransactionOutSchema.statics.removeFromTxId = function(txid, cb) { -TransactionOutSchema.statics._explodeTransactionOuts = function(txid, cb) { +TransactionOutSchema.statics.storeTransactionOuts = function(txInfo, cb) { var Self = this; var addrs = []; var is_new = true; - // Is it from genesis block? (testnet==livenet) - // TODO: parse it from networks.genesisTX - if (txid === genesisTXID) return cb(); - TransactionRpc.getRpcInfo(txid, function(err, info) { + var bTxId = new Buffer(txInfo.txid,'hex'); - if (err || !info) return cb(err); + async.series([ + // Input Outpoints (mark them as spended) + function(p_c) { + if (txInfo.isCoinBase) return p_c(); + async.forEachLimit(txInfo.vin, CONCURRENCY, + function(i, next_out) { + var b = new Buffer(i.txid,'hex'); + var data = { + txidBuf: b, + index: i.vout, - var bTxId = new Buffer(txid,'hex'); - - async.series([ - // Input Outputs (mark them as spended) - function(p_c) { - if (info.isCoinBase) return p_c(); - async.forEachLimit(info.vin, CONCURRENCY, - function(i, next_out) { - var b = new Buffer(i.txid,'hex'); - var data = { - txidBuf: b, - index: i.vout, - - spendTxIdBuf: bTxId, - spendIndex: i.n, - }; - Self.update({txidBuf: b, index: i.vout}, data, {upsert: true}, next_out); - }, - function (err) { - if (err) { - if (!err.message.match(/E11000/)) { - console.log('ERR at TX %s: %s', txid, err); - return cb(err); - } + spendTxIdBuf: bTxId, + spendIndex: i.n, + }; + Self.update({txidBuf: b, index: i.vout}, data, {upsert: true}, next_out); + }, + function (err) { + if (err) { + if (!err.message.match(/E11000/)) { + console.log('ERR at TX %s: %s', txInfo.txid, err); + return cb(err); } - return p_c(); - }); - }, - // Parse Outputs - function(p_c) { - async.forEachLimit(info.vout, CONCURRENCY, - function(o, next_out) { - if (o.value && o.scriptPubKey && - o.scriptPubKey.addresses && - o.scriptPubKey.addresses[0] && - ! o.scriptPubKey.addresses[1] // TODO : not supported - ){ + } + return p_c(); + }); + }, + // Parse Outputs + function(p_c) { + async.forEachLimit(txInfo.vout, CONCURRENCY, + function(o, next_out) { + if (o.value && o.scriptPubKey && + o.scriptPubKey.addresses && + o.scriptPubKey.addresses[0] && + ! o.scriptPubKey.addresses[1] // TODO : not supported + ){ - // This is only to broadcast - if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) { - addrs.push(o.scriptPubKey.addresses[0]); - } + // This is only to broadcast + if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) { + addrs.push(o.scriptPubKey.addresses[0]); + } - var data = { - txidBuf: bTxId, - index : o.n, + var data = { + txidBuf: bTxId, + index : o.n, - value_sat : o.value * util.COIN, - addr : o.scriptPubKey.addresses[0], - }; - Self.update({txidBuf: bTxId, index: o.n}, data, {upsert: true}, next_out); + value_sat : o.value * util.COIN, + addr : o.scriptPubKey.addresses[0], + }; + Self.update({txidBuf: bTxId, index: o.n}, data, {upsert: true}, next_out); + } + else { + console.log ('WARN in TX: %s could not parse OUTPUT %d', txInfo.txid, o.n); + return next_out(); + } + }, + function (err) { + if (err) { + if (err.message.match(/E11000/)) { + is_new = false; } else { - console.log ('WARN in TX: %s could not parse OUTPUT %d', txid, o.n); - return next_out(); + console.log('ERR at TX %s: %s', txInfo.txid, err); + return cb(err); } - }, - function (err) { - if (err) { - if (err.message.match(/E11000/)) { - is_new = false; - } - else { - console.log('ERR at TX %s: %s', txid, err); - return cb(err); - } - } - return p_c(); - }); - }], function() { - return cb(null, addrs, is_new); + } + return p_c(); }); + }], function(err) { + return cb(err, addrs, is_new); }); }; -TransactionOutSchema.statics.createFromArray = function(txs, next) { +// txs can be a [hashes] or [txObjects] +TransactionOutSchema.statics.createFromTxs = function(txs, next) { var Self = this; if (!txs) return next(); @@ -191,24 +184,38 @@ TransactionOutSchema.statics.createFromArray = function(txs, next) { async.forEachLimit(txs, CONCURRENCY, function(txid, cb, was_new) { - Self._explodeTransactionOuts( txid, function(err, addrs) { - - if (err) return next(err); - - if (was_new) { - inserted_txs.push(txid); - addrs.each(function(a) { - if ( !updated_addrs[a]) updated_addrs[a] = []; - updated_addrs[a].push(txid); + var txInfo; + async.series([ + function(a_cb) { + // Is it from genesis block? (testnet==livenet) + // TODO: parse it from networks.genesisTX? + if (txid === genesisTXID) return a_cb(); + TransactionRpc.getRpcInfo(txid, function(err, inInfo) { + txInfo =inInfo; + return a_cb(err); }); - } + }, + function(a_cb) { + Self.storeTransactionOuts(txInfo, function(err, addrs) { + if (err) return a_cb(err); - return cb(); + if (was_new) { + inserted_txs.push(txid); + addrs.each(function(a) { + if ( !updated_addrs[a]) updated_addrs[a] = []; + updated_addrs[a].push(txid); + }); + } + return a_cb(); + }); + }], + function(err) { + return cb(err); + }); + }, + function(err) { + return next(err, inserted_txs, updated_addrs); }); - }, - function(err) { - return next(err, inserted_txs, updated_addrs); - }); }; diff --git a/config/env/development.js b/config/env/development.js new file mode 100755 index 00000000..2e411109 --- /dev/null +++ b/config/env/development.js @@ -0,0 +1,21 @@ +'use strict'; + +module.exports = { + db: 'mongodb://localhost/insight-dev', + app: { + name: 'Insight - Development' + }, + bitcoind: { + protocol: process.env.BITCOIND_PROTO || 'http', + user: process.env.BITCOIND_USER || 'user', + pass: process.env.BITCOIND_PASS || 'pass', + host: process.env.BITCOIND_HOST || '127.0.0.1', + port: process.env.BITCOIND_PORT || '18332', + p2pPort: process.env.BITCOIND_P2P_PORT || '18333', + disableAgent: true, + dataDir: process.env.BITCOIND_DATADIR || './testnet3', + }, + network: process.env.INSIGHT_NETWORK || 'testnet', + disableP2pSync: false, + disableHistoricSync: false, +}; diff --git a/config/env/production.js b/config/env/production.js new file mode 100755 index 00000000..c679713b --- /dev/null +++ b/config/env/production.js @@ -0,0 +1,22 @@ +'use strict'; + +module.exports = { + db: 'mongodb://localhost/insight-test', + app: { + name: 'Insight - Prod' + }, + port: '3301', + bitcoind: { + protocol: process.env.BITCOIND_PROTO || 'http', + user: process.env.BITCOIND_USER || 'user', + pass: process.env.BITCOIND_PASS || 'pass', + host: process.env.BITCOIND_HOST || '127.0.0.1', + port: process.env.BITCOIND_PORT || '18332', + p2pPort: process.env.BITCOIND_P2P_PORT || '18333', + disableAgent: true, + dataDir: process.env.BITCOIND_DATADIR || './testnet3', + }, + network: process.env.INSIGHT_NETWORK || 'testnet', + disableP2pSync: false, + disableHistoricSync: false, +}; diff --git a/config/env/test.js b/config/env/test.js new file mode 100755 index 00000000..3faf05be --- /dev/null +++ b/config/env/test.js @@ -0,0 +1,22 @@ +'use strict'; + +module.exports = { + db: 'mongodb://localhost/insight-test', + app: { + name: 'Insight - Test' + }, + port: '3301', + bitcoind: { + protocol: process.env.BITCOIND_PROTO || 'http', + user: process.env.BITCOIND_USER || 'user', + pass: process.env.BITCOIND_PASS || 'pass', + host: process.env.BITCOIND_HOST || '127.0.0.1', + port: process.env.BITCOIND_PORT || '18332', + p2pPort: process.env.BITCOIND_P2P_PORT || '18333', + disableAgent: true, + dataDir: process.env.BITCOIND_DATADIR || './testnet3', + }, + network: process.env.INSIGHT_NETWORK || 'testnet', + disableP2pSync: false, + disableHistoricSync: false, +}; diff --git a/dev-util/explode_tx.js b/dev-util/explode_tx.js index 814b33b6..f9849c2e 100755 --- a/dev-util/explode_tx.js +++ b/dev-util/explode_tx.js @@ -25,7 +25,7 @@ mongoose.connection.on('open', function() { var b = new Buffer(hash,'hex'); - T.createFromArray([hash], function(err, ret) { + T.createFromTxs([hash], function(err, ret) { console.log('Err:'); console.log(err); diff --git a/util/status_info.js b/dev-util/status_info.js similarity index 100% rename from util/status_info.js rename to dev-util/status_info.js diff --git a/lib/HistoricSync.js b/lib/HistoricSync.js index 55065c72..0fcf69b7 100644 --- a/lib/HistoricSync.js +++ b/lib/HistoricSync.js @@ -106,6 +106,11 @@ function spec() { if (self.syncPercentage > 100) self.syncPercentage = 100; p(util.format('status: [%d%%] skipped: %d', self.syncPercentage, self.skippedBlocks)); +//TODO +if (self.syncPercentage>5) { + process.exit(1); +} + // } if (self.opts.shouldBroadcast) { sockets.broadcastSyncInfo(self.info()); @@ -328,18 +333,25 @@ function spec() { if (err) return next(err); + + var scanOpts = {}; + if (!b) { p('Could not find Genesis block. Running FULL SYNC'); + if (config.bitcoind.dataDir) { + p('bitcoind dataDir configured...importing blocks from .dat files'); + scanOpts.fromFiles = true; + } + else { + scanOpts.reverse = true; + } } else { p('Genesis block found. Syncing upto known blocks.'); + scanOpts.reverse = true; + scanOpts.upToExisting = true; } - var scanOpts = { - reverse: true, - upToExisting: b ? true: false, - }; - return self.importHistory(scanOpts, next); }); }; diff --git a/lib/Sync.js b/lib/Sync.js index ea4794b0..ab70e4d9 100644 --- a/lib/Sync.js +++ b/lib/Sync.js @@ -106,7 +106,7 @@ function spec() { Sync.prototype.storeTxs = function(txs, cb) { var self = this; - TransactionOut.createFromArray(txs, function(err, inserted_txs, updated_addrs) { + TransactionOut.createFromTxs(txs, function(err, inserted_txs, updated_addrs) { if (err) return cb(err); self._handleBroadcast(null, inserted_txs, updated_addrs); diff --git a/package.json b/package.json index c0fa2e7b..faf621ff 100644 --- a/package.json +++ b/package.json @@ -52,6 +52,7 @@ }, "dependencies": { "async": "*", + "glob": "*", "classtool": "*", "commander": "*", "bignum": "*", From c8dcf1e9792c911837b25131395a8ad7a5ab79e7 Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Sat, 1 Feb 2014 22:44:53 -0300 Subject: [PATCH 2/7] git fixes --- config/env/development.js | 21 --------------------- config/env/production.js | 22 ---------------------- config/env/test.js | 22 ---------------------- 3 files changed, 65 deletions(-) delete mode 100755 config/env/development.js delete mode 100755 config/env/production.js delete mode 100755 config/env/test.js diff --git a/config/env/development.js b/config/env/development.js deleted file mode 100755 index 2e411109..00000000 --- a/config/env/development.js +++ /dev/null @@ -1,21 +0,0 @@ -'use strict'; - -module.exports = { - db: 'mongodb://localhost/insight-dev', - app: { - name: 'Insight - Development' - }, - bitcoind: { - protocol: process.env.BITCOIND_PROTO || 'http', - user: process.env.BITCOIND_USER || 'user', - pass: process.env.BITCOIND_PASS || 'pass', - host: process.env.BITCOIND_HOST || '127.0.0.1', - port: process.env.BITCOIND_PORT || '18332', - p2pPort: process.env.BITCOIND_P2P_PORT || '18333', - disableAgent: true, - dataDir: process.env.BITCOIND_DATADIR || './testnet3', - }, - network: process.env.INSIGHT_NETWORK || 'testnet', - disableP2pSync: false, - disableHistoricSync: false, -}; diff --git a/config/env/production.js b/config/env/production.js deleted file mode 100755 index c679713b..00000000 --- a/config/env/production.js +++ /dev/null @@ -1,22 +0,0 @@ -'use strict'; - -module.exports = { - db: 'mongodb://localhost/insight-test', - app: { - name: 'Insight - Prod' - }, - port: '3301', - bitcoind: { - protocol: process.env.BITCOIND_PROTO || 'http', - user: process.env.BITCOIND_USER || 'user', - pass: process.env.BITCOIND_PASS || 'pass', - host: process.env.BITCOIND_HOST || '127.0.0.1', - port: process.env.BITCOIND_PORT || '18332', - p2pPort: process.env.BITCOIND_P2P_PORT || '18333', - disableAgent: true, - dataDir: process.env.BITCOIND_DATADIR || './testnet3', - }, - network: process.env.INSIGHT_NETWORK || 'testnet', - disableP2pSync: false, - disableHistoricSync: false, -}; diff --git a/config/env/test.js b/config/env/test.js deleted file mode 100755 index 3faf05be..00000000 --- a/config/env/test.js +++ /dev/null @@ -1,22 +0,0 @@ -'use strict'; - -module.exports = { - db: 'mongodb://localhost/insight-test', - app: { - name: 'Insight - Test' - }, - port: '3301', - bitcoind: { - protocol: process.env.BITCOIND_PROTO || 'http', - user: process.env.BITCOIND_USER || 'user', - pass: process.env.BITCOIND_PASS || 'pass', - host: process.env.BITCOIND_HOST || '127.0.0.1', - port: process.env.BITCOIND_PORT || '18332', - p2pPort: process.env.BITCOIND_P2P_PORT || '18333', - disableAgent: true, - dataDir: process.env.BITCOIND_DATADIR || './testnet3', - }, - network: process.env.INSIGHT_NETWORK || 'testnet', - disableP2pSync: false, - disableHistoricSync: false, -}; From ddeaa5d8354db2e42112a7929986575378a30062 Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Thu, 30 Jan 2014 19:51:18 -0300 Subject: [PATCH 3/7] blockExtractor class to read blocks from bitcoind .dat files --- dev-util/read_block.js | 25 +++++ lib/BlockExtractor.js | 158 +++++++++++++++++++++++++++++ test/integration/blockExtractor.js | 72 +++++++++++++ 3 files changed, 255 insertions(+) create mode 100755 dev-util/read_block.js create mode 100644 lib/BlockExtractor.js create mode 100644 test/integration/blockExtractor.js diff --git a/dev-util/read_block.js b/dev-util/read_block.js new file mode 100755 index 00000000..99fd0d88 --- /dev/null +++ b/dev-util/read_block.js @@ -0,0 +1,25 @@ +#!/usr/bin/env node +'use strict'; + +process.env.NODE_ENV = process.env.NODE_ENV || 'development'; + +var assert = require('assert'), + config = require('../config/config'), + BlockExtractor = require('../lib/BlockExtractor').class(), + networks = require('bitcore/networks'), + util = require('bitcore/util/util'); + + var be = new BlockExtractor(config.bitcoind.dataDir, config.network); + var network = config.network === 'testnet' ? networks.testnet: networks.livenet; +// console.log('[read_block.js.13]', be.nextFile() ); + + var c=0; + while (c++ < 100) { + be.getNextBlock(function(err, b) { + console.log('[read_block.js.14]',err, c, b?util.formatHashAlt(b.hash):''); //TODO + }); + } + + + + diff --git a/lib/BlockExtractor.js b/lib/BlockExtractor.js new file mode 100644 index 00000000..5f1b7d30 --- /dev/null +++ b/lib/BlockExtractor.js @@ -0,0 +1,158 @@ +'use strict'; + +require('classtool'); + +function spec() { + var Block = require('bitcore/block').class(), + networks = require('bitcore/networks'), + Parser = require('bitcore/util/BinaryParser').class(), + fs = require('fs'), + Buffer = require('buffer').Buffer, + glob = require('glob'), + async = require('async'); + + function BlockExtractor(dataDir, network) { + + var self = this; + var path = dataDir + '/blocks/blk*.dat'; + + self.dataDir = dataDir; + self.files = glob.sync(path); + self.nfiles = self.files.length; + + if (self.nfiles === 0) + throw new Error('Could not find block files at: ' + path); + + self.currentFileIndex = 0; + self.isCurrentRead = false; + self.currentBuffer = null; + self.currentParser = null; + self.network = network === 'testnet' ? networks.testnet: networks.livenet; + self.magic = self.network.magic.toString('hex'); + } + + BlockExtractor.prototype.currentFile = function() { + var self = this; + + return self.files[self.currentFileIndex]; + }; + + + BlockExtractor.prototype.nextFile = function() { + var self = this; + + if (self.currentFileIndex < 0) return false; + + var ret = true; + + self.isCurrentRead = false; + self.currentBuffer = null; + self.currentParser = null; + + if (self.currentFileIndex < self.nfiles - 1) { + self.currentFileIndex++; + } + else { + self.currentFileIndex=-1; + ret = false; + } + return ret; + }; + + BlockExtractor.prototype.readCurrentFileSync = function() { + var self = this; + + if (self.currentFileIndex < 0 || self.isCurrentRead) return; + + + self.isCurrentRead = true; + + var fname = self.currentFile(); + if (!fname) return; + + + var stats = fs.statSync(fname); + + var size = stats.size; + + console.log('Reading Blockfile %s [%d MB]', + fname, parseInt(size/1024/1024)); + + var fd = fs.openSync(fname, 'r'); + +// if (status) return cb(new Error(status.message)); + + var buffer = new Buffer(size); + + var num = fs.readSync(fd, buffer, 0, size, 0); + + self.currentBuffer = buffer; + self.currentParser = new Parser(buffer); + }; + + + + BlockExtractor.prototype.getNextBlock = function(cb) { + var self = this; + + var b; + var magic; + async.series([ + function (a_cb) { + + async.whilst( + function() { + return (!magic); + }, + function(w_cb) { + + self.readCurrentFileSync(); + + if (self.currentFileIndex < 0) return cb(); + + + magic = self.currentParser ? self.currentParser.buffer(4).toString('hex') + : null ; + + if (!self.currentParser || self.currentParser.eof()) { + magic = null; + if (self.nextFile()) { + console.log('Moving forward to file:' + self.currentFile() ); + return w_cb(); + } + else { + console.log('Finished all files'); + return cb(); + } + } + else { + return w_cb(); + } + }, a_cb); + }, + function (a_cb) { + if (magic !== self.magic) { + var e = new Error('CRITICAL ERROR: Magic number mismatch: ' + + magic + '!=' + self.magic); + return a_cb(e); + } + + // spacer? + self.currentParser.word32le(); + return a_cb(); + }, + function (a_cb) { + b = new Block(); + b.parse(self.currentParser); + b.getHash(); + return a_cb(); + }, + ], function(err) { + return cb(err,b); + }); + }; + + return BlockExtractor; +} +module.defineClass(spec); + diff --git a/test/integration/blockExtractor.js b/test/integration/blockExtractor.js new file mode 100644 index 00000000..e7ff291a --- /dev/null +++ b/test/integration/blockExtractor.js @@ -0,0 +1,72 @@ +#!/usr/bin/env node +'use strict'; + +process.env.NODE_ENV = process.env.NODE_ENV || 'development'; + + + +var assert = require('assert'), + config = require('../../config/config'), + BlockExtractor = require('../../lib/BlockExtractor').class(), + networks = require('bitcore/networks'), + util = require('bitcore/util/util'); + +//var txItemsValid = JSON.parse(fs.readFileSync('test/model/txitems.json')); + +describe('TransactionOut', function(){ + + var be = new BlockExtractor(config.bitcoind.dataDir, config.network); + + var network = config.network === 'testnet' ? networks.testnet: networks.livenet; + + it('should glob block files ', function(done) { + assert(be.files.length>0); + done(); + }); + + var lastTs; + + it('should read genesis block ', function(done) { + be.getNextBlock(function(err,b) { + assert(!err); + var genesisHashReversed = new Buffer(32); + network.genesisBlock.hash.copy(genesisHashReversed); + var genesis = util.formatHashFull(network.genesisBlock.hash); + + assert.equal(util.formatHashFull(b.hash),genesis); + assert.equal(b.nounce,network.genesisBlock.nounce); + assert.equal(b.timestamp,network.genesisBlock.timestamp); + assert.equal(b.merkle_root.toString('hex'),network.genesisBlock.merkle_root.toString('hex')); + + lastTs = b.timestamp; + done(); + }); + }); + + it('should read next testnet block ', function(done) { + be.getNextBlock(function(err,b) { + assert(!err); + assert(b.timestamp > lastTs, 'timestamp > genesis_ts'); + done(); + }); + }); + + it('should read 100000 blocks with no error ', function(done) { + + var i=0; + while(i++<100000) { + be.getNextBlock(function(err,b) { + assert(!err,err); + assert(lastTs < b.timestamp, 'genesisTS < b.timestamp: ' + lastTs + '<' + b.timestamp + ":" + i); + if(i % 1000 === 1) process.stdout.write('.'); + if(i === 100000) done(); + }); + } + }); + + + +}); + + + From 474b3daa53690df189254dfe8c51f197837d3fed Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Thu, 30 Jan 2014 23:16:43 -0300 Subject: [PATCH 4/7] fromFiles WIP --- app/models/TransactionOut.js | 2 + lib/BlockExtractor.js | 4 +- lib/HistoricSync.js | 140 ++++++++++++++++++++++++++++------- lib/Sync.js | 1 + util/sync.js | 6 +- 5 files changed, 122 insertions(+), 31 deletions(-) diff --git a/app/models/TransactionOut.js b/app/models/TransactionOut.js index 56548dc9..8247c5e2 100644 --- a/app/models/TransactionOut.js +++ b/app/models/TransactionOut.js @@ -196,6 +196,8 @@ TransactionOutSchema.statics.createFromTxs = function(txs, next) { }); }, function(a_cb) { + if (txid === genesisTXID) return a_cb(); + Self.storeTransactionOuts(txInfo, function(err, addrs) { if (err) return a_cb(err); diff --git a/lib/BlockExtractor.js b/lib/BlockExtractor.js index 5f1b7d30..1802788b 100644 --- a/lib/BlockExtractor.js +++ b/lib/BlockExtractor.js @@ -80,11 +80,9 @@ function spec() { var fd = fs.openSync(fname, 'r'); -// if (status) return cb(new Error(status.message)); - var buffer = new Buffer(size); - var num = fs.readSync(fd, buffer, 0, size, 0); + fs.readSync(fd, buffer, 0, size, 0); self.currentBuffer = buffer; self.currentParser = new Parser(buffer); diff --git a/lib/HistoricSync.js b/lib/HistoricSync.js index 0fcf69b7..f1aec64a 100644 --- a/lib/HistoricSync.js +++ b/lib/HistoricSync.js @@ -13,6 +13,7 @@ function spec() { var Block = require('../app/models/Block'); var Sync = require('./Sync').class(); var sockets = require('../app/controllers/socket.js'); + var BlockExtractor = require('./BlockExtractor.js').class(); var BAD_GEN_ERROR = 'Bad genesis block. Network mismatch between Insight and bitcoind? Insight is configured for:'; @@ -125,7 +126,6 @@ if (self.syncPercentage>5) { var existed = false; var blockInfo; - var blockObj; async.series([ // Already got it? @@ -137,7 +137,6 @@ if (self.syncPercentage>5) { } if (block) { existed = true; - blockObj = block; } return c(); }); @@ -150,22 +149,20 @@ if (self.syncPercentage>5) { return c(); }, - //get Info from RPC - function(c) { - // TODO: if we store prev/next, no need to go to RPC - // if (blockObj && blockObj.nextBlockHash) return c(); + function(c) { self.rpc.getBlock(blockHash, function(err, ret) { if (err) return c(err); - blockInfo = ret; + blockInfo = ret ? ret.result : null; return c(); }); }, //store it function(c) { if (existed) return c(); - self.sync.storeBlock(blockInfo.result, function(err) { + + self.sync.storeBlock(blockInfo, function(err) { existed = err && err.toString().match(/E11000/); @@ -190,6 +187,7 @@ if (self.syncPercentage>5) { self.status = 'aborted'; self.showProgress(); p(self.err); + return cb(err); } else { self.err = null; @@ -205,7 +203,7 @@ if (self.syncPercentage>5) { } // Continue - if (blockInfo && blockInfo.result) { + if (blockInfo) { if (existed) self.skippedBlocks++; @@ -213,16 +211,98 @@ if (self.syncPercentage>5) { self.syncedBlocks++; // recursion - if (scanOpts.prev && blockInfo.result.previousblockhash) - return self.getPrevNextBlock(blockInfo.result.previousblockhash, blockEnd, scanOpts, cb); + if (scanOpts.prev && blockInfo.previousblockhash) + return self.getPrevNextBlock(blockInfo.previousblockhash, blockEnd, scanOpts, cb); - if (scanOpts.next && blockInfo.result.nextblockhash) - return self.getPrevNextBlock(blockInfo.result.nextblockhash, blockEnd, scanOpts, cb); + if (scanOpts.next && blockInfo.nextblockhash) + return self.getPrevNextBlock(blockInfo.nextblockhash, blockEnd, scanOpts, cb); } return cb(err); }); }; + + HistoricSync.prototype.getBlockFromFile = function(height, scanOpts, cb) { + var self = this; + + var nextHash; + var blockInfo; + var isMainChain; + var existed; + + async.series([ + // Is it in mainchain? + function(c) { + self.rpc.getBlockHash(height, function(err, res) { + if (err) return cb(err); + + nextHash = res.result; +//console.log('[HistoricSync.js.235:nextHash:]',nextHash); //TODO + return c(); + }); + }, + //show some (inacurate) status + function(c) { + if ( ( self.syncedBlocks + self.skippedBlocks) % self.step === 1) { + self.showProgress(); + } + + return c(); + }, + //get Info + function(c) { + self.blockExtractor.getNextBlock(function(err, b) { + if (err || ! b) return c(err); + + blockInfo = b.getStandardizedObject(b.txs); + return c(); + }); + }, + //store it + function(c) { + + isMainChain = blockInfo.hash === nextHash; + + //TODO + blockInfo.isOrphan = !isMainChain; + + self.sync.storeBlock(blockInfo, function(err) { + existed = err && err.toString().match(/E11000/); + + if (err && ! existed) return c(err); + return c(); + }); + }, + ], function(err) { + + if (err) { + self.err = util.format('ERROR: @%s: %s [count: syncedBlocks: %d]', blockInfo.hash, err, self.syncedBlocks); + self.status = 'aborted'; + self.showProgress(); + p(err); + return cb(err); + } + else { + self.err = null; + self.status = 'syncing'; + } + + // Continue + if (blockInfo) { + + // mainchain + if (isMainChain) height++; + + self.syncedBlocks++; + + return self.getBlockFromFile(height, scanOpts, cb); + } + return cb(err); + }); + }; + + + HistoricSync.prototype.importHistory = function(scanOpts, next) { var self = this; @@ -290,16 +370,23 @@ if (self.syncPercentage>5) { p(' to : ', end); p(' scanOpts: ', JSON.stringify(scanOpts)); - self.getPrevNextBlock(start, end, scanOpts, function(err) { - if (err && err.message.match(/ECONNREFUSED/)) { - setTimeout(function() { - p('Retrying in %d secs', retry_secs); - sync(); - }, - retry_secs * 1000); - } - else return next(err); - }); + if (scanOpts.fromFiles) { + self.getBlockFromFile(0, scanOpts, function(err) { + return next(err); + }); + } + else { + self.getPrevNextBlock(start, end, scanOpts, function(err) { + if (err && err.message.match(/ECONNREFUSED/)) { + setTimeout(function() { + p('Retrying in %d secs', retry_secs); + sync(); + }, + retry_secs * 1000); + } + else return next(err); + }); + } } @@ -326,7 +413,7 @@ if (self.syncPercentage>5) { }; // upto if we have genesis block? - HistoricSync.prototype.smartImport = function(next) { + HistoricSync.prototype.smartImport = function(scanOpts, next) { var self = this; Block.fromHash(self.genesis, function(err, b) { @@ -334,13 +421,12 @@ if (self.syncPercentage>5) { if (err) return next(err); - var scanOpts = {}; - - if (!b) { + if (!b || scanOpts.destroy) { p('Could not find Genesis block. Running FULL SYNC'); if (config.bitcoind.dataDir) { p('bitcoind dataDir configured...importing blocks from .dat files'); scanOpts.fromFiles = true; + self.blockExtractor = new BlockExtractor(config.bitcoind.dataDir, config.network); } else { scanOpts.reverse = true; diff --git a/lib/Sync.js b/lib/Sync.js index ab70e4d9..bb445fad 100644 --- a/lib/Sync.js +++ b/lib/Sync.js @@ -65,6 +65,7 @@ function spec() { function(b) { try {self.db.collections.transactionouts.drop(b);} catch (e) { return b(); } }, ], next); }; + Sync.prototype.storeBlock = function(block, cb) { var self = this; diff --git a/util/sync.js b/util/sync.js index 48a4307c..3c1af7d9 100755 --- a/util/sync.js +++ b/util/sync.js @@ -16,6 +16,7 @@ program .option('-D --destroy', 'Remove current DB (and start from there)', 0) .option('-R --reverse', 'Sync backwards', 0) .option('-U --uptoexisting', 'Sync only until an existing block is found', 0) + .option('-F --fromfiles', 'Sync using bitcoind .dat block files (faster)', 0) .parse(process.argv); var historicSync = new HistoricSync(); @@ -32,13 +33,16 @@ async.series([ function(cb) { if (program.smart) { - historicSync.smartImport(cb); + historicSync.smartImport({ + destroy: program.destroy, + },cb); } else { historicSync.importHistory({ destroy: program.destroy, reverse: program.reverse, upToExisting: program.uptoexisting, + fromFiles: program.fromfiles, }, cb); } }, From 23960f0c373757a64efe5504913990c103ed28e7 Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Sat, 1 Feb 2014 13:39:29 -0300 Subject: [PATCH 5/7] files sync workgin --- app/models/Block.js | 10 ++-- app/models/TransactionOut.js | 100 +++++++++++++++++++++++++---------- lib/HistoricSync.js | 77 +++++++++++++++++++++++---- 3 files changed, 146 insertions(+), 41 deletions(-) diff --git a/app/models/Block.js b/app/models/Block.js index 93016699..8a01ca57 100644 --- a/app/models/Block.js +++ b/app/models/Block.js @@ -86,16 +86,18 @@ BlockSchema.statics.customCreate = function(block, cb) { newBlock.time = block.time ? block.time : Math.round(new Date().getTime() / 1000); newBlock.hashStr = block.hash; + newBlock.isOrphan = block.isOrphan; newBlock.nextBlockHashStr = block.nextBlockHash; var insertedTxs, updateAddrs; async.series([ function(a_cb) { - TransactionOut.createFromTxs(block.tx, function(err, inInsertedTxs, inUpdateAddrs) { - insertedTxs = inInsertedTxs; - updateAddrs = inUpdateAddrs; - return a_cb(err); + TransactionOut.createFromTxs(block.tx, block.isOrphan, + function(err, inInsertedTxs, inUpdateAddrs) { + insertedTxs = inInsertedTxs; + updateAddrs = inUpdateAddrs; + return a_cb(err); }); }, function(a_cb) { newBlock.save(function(err) { diff --git a/app/models/TransactionOut.js b/app/models/TransactionOut.js index 8247c5e2..a0fec41d 100644 --- a/app/models/TransactionOut.js +++ b/app/models/TransactionOut.js @@ -24,13 +24,16 @@ var TransactionOutSchema = new Schema({ index: true, }, value_sat: Number, + fromOrphan: Boolean, spendTxIdBuf: Buffer, spendIndex: Number, + spendFromOrphan: Boolean, }); // Compound index + TransactionOutSchema.index({txidBuf: 1, index: 1}, {unique: true, sparse: true}); TransactionOutSchema.index({spendTxIdBuf: 1, spendIndex: 1}, {unique: true, sparse: true}); @@ -91,15 +94,52 @@ TransactionOutSchema.statics.removeFromTxId = function(txid, cb) { -TransactionOutSchema.statics.storeTransactionOuts = function(txInfo, cb) { +TransactionOutSchema.statics.storeTransactionOuts = function(txInfo, fromOrphan, cb) { var Self = this; var addrs = []; var is_new = true; + if (txInfo.hash) { + + // adapt bitcore TX object to bitcoind JSON response + txInfo.txid = txInfo.hash; + + var count = 0; + txInfo.vin = txInfo.in.map(function (txin) { + var i = {}; + + if (txin.coinbase) { + txInfo.isCoinBase = true; + } + else { + i.txid= txin.prev_out.hash; + i.vout= txin.prev_out.n; + }; + i.n = count++; + return i; + }); + + + count = 0; + txInfo.vout = txInfo.out.map(function (txout) { + var o = {}; + + o.value = txout.value; + o.n = count++; + + if (txout.addrStr){ + o.scriptPubKey = {}; + o.scriptPubKey.addresses = [txout.addrStr]; + } + return o; + }); + + } var bTxId = new Buffer(txInfo.txid,'hex'); + async.series([ // Input Outpoints (mark them as spended) function(p_c) { @@ -114,6 +154,7 @@ TransactionOutSchema.statics.storeTransactionOuts = function(txInfo, cb) { spendTxIdBuf: bTxId, spendIndex: i.n, }; + if (fromOrphan) data.spendFromOrphan = true; Self.update({txidBuf: b, index: i.vout}, data, {upsert: true}, next_out); }, function (err) { @@ -136,10 +177,10 @@ TransactionOutSchema.statics.storeTransactionOuts = function(txInfo, cb) { ! o.scriptPubKey.addresses[1] // TODO : not supported ){ - // This is only to broadcast - if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) { - addrs.push(o.scriptPubKey.addresses[0]); - } + // This is only to broadcast (WIP) +// if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) { +// addrs.push(o.scriptPubKey.addresses[0]); +// } var data = { txidBuf: bTxId, @@ -148,11 +189,12 @@ TransactionOutSchema.statics.storeTransactionOuts = function(txInfo, cb) { value_sat : o.value * util.COIN, addr : o.scriptPubKey.addresses[0], }; + if (fromOrphan) data.fromOrphan = true; Self.update({txidBuf: bTxId, index: o.n}, data, {upsert: true}, next_out); } else { - console.log ('WARN in TX: %s could not parse OUTPUT %d', txInfo.txid, o.n); - return next_out(); + console.log ('WARN in TX: %s could not parse OUTPUT %d', txInfo.txid, o.n); + return next_out(); } }, function (err) { @@ -174,50 +216,54 @@ TransactionOutSchema.statics.storeTransactionOuts = function(txInfo, cb) { // txs can be a [hashes] or [txObjects] -TransactionOutSchema.statics.createFromTxs = function(txs, next) { - +TransactionOutSchema.statics.createFromTxs = function(txs, fromOrphan, next) { var Self = this; + + if (typeof fromOrphan === 'function') { + next = fromOrphan; + fromOrphan = false; + } + if (!txs) return next(); var inserted_txs = []; var updated_addrs = {}; - async.forEachLimit(txs, CONCURRENCY, function(txid, cb, was_new) { + async.forEachLimit(txs, CONCURRENCY, function(t, each_cb) { var txInfo; + async.series([ function(a_cb) { + if (typeof t !== 'string') { + txInfo = t; + return a_cb(); + } + // Is it from genesis block? (testnet==livenet) // TODO: parse it from networks.genesisTX? - if (txid === genesisTXID) return a_cb(); - TransactionRpc.getRpcInfo(txid, function(err, inInfo) { + if (t === genesisTXID) return a_cb(); + + TransactionRpc.getRpcInfo(t, function(err, inInfo) { txInfo =inInfo; return a_cb(err); }); }, function(a_cb) { - if (txid === genesisTXID) return a_cb(); + if (!txInfo) return a_cb(); - Self.storeTransactionOuts(txInfo, function(err, addrs) { + Self.storeTransactionOuts(txInfo, fromOrphan, function(err, addrs) { if (err) return a_cb(err); - - if (was_new) { - inserted_txs.push(txid); - addrs.each(function(a) { - if ( !updated_addrs[a]) updated_addrs[a] = []; - updated_addrs[a].push(txid); - }); - } return a_cb(); }); }], function(err) { - return cb(err); - }); - }, - function(err) { - return next(err, inserted_txs, updated_addrs); + return each_cb(err); }); + }, + function(err) { + return next(err, inserted_txs, updated_addrs); + }); }; diff --git a/lib/HistoricSync.js b/lib/HistoricSync.js index f1aec64a..74e85014 100644 --- a/lib/HistoricSync.js +++ b/lib/HistoricSync.js @@ -7,6 +7,9 @@ require('classtool'); function spec() { var util = require('util'); var RpcClient = require('bitcore/RpcClient').class(); + var bitutil = require('bitcore/util/util'); + var Address = require('bitcore/Address').class(); + var Script = require('bitcore/Script').class(); var networks = require('bitcore/networks'); var async = require('async'); var config = require('../config/config'); @@ -96,7 +99,7 @@ function spec() { }; }; - HistoricSync.prototype.showProgress = function() { + HistoricSync.prototype.showProgress = function(height) { var self = this; if (self.error) { @@ -106,12 +109,7 @@ function spec() { self.syncPercentage = parseFloat(100 * (self.syncedBlocks + self.skippedBlocks) / self.blockChainHeight).toFixed(3); if (self.syncPercentage > 100) self.syncPercentage = 100; - p(util.format('status: [%d%%] skipped: %d', self.syncPercentage, self.skippedBlocks)); -//TODO -if (self.syncPercentage>5) { - process.exit(1); -} - // + p(util.format('status: [%d%%] skipped: %d ', self.syncPercentage, self.skippedBlocks, height)); } if (self.opts.shouldBroadcast) { sockets.broadcastSyncInfo(self.info()); @@ -222,6 +220,46 @@ if (self.syncPercentage>5) { }; + // TODO. replace with + // Script.prototype.getAddrStrs if that one get merged in bitcore + HistoricSync.prototype.getAddrStr = function(s) { + var self = this; + + var addrStrs = []; + var type = s.classify(); + var addr; + + switch(type) { + case Script.TX_PUBKEY: + var chunk = s.captureOne(); + addr = new Address(self.network.addressPubkey, bitutil.sha256ripe160(chunk)); + addrStrs = [ addr.toString() ]; + break; + case Script.TX_PUBKEYHASH: + addr = new Address(self.network.addressPubkey, s.captureOne()); + addrStrs = [ addr.toString() ]; + break; + case Script.TX_SCRIPTHASH: + addr = new Address(self.network.addressScript, s.captureOne()); + addrStrs = [ addr.toString() ]; + break; + case Script.TX_MULTISIG: + var addrs = []; + var chunks = s.capture(); + + chunks.forEach(function(chunk) { + var a = new Address(self.network.addressPubkey, bitutil.sha256ripe160(chunk)); + addrStrs.push(a.toString()); + }); + break; + case Script.TX_UNKNOWN: + break; + } + + return addrStrs; + }; + + HistoricSync.prototype.getBlockFromFile = function(height, scanOpts, cb) { var self = this; @@ -237,14 +275,13 @@ if (self.syncPercentage>5) { if (err) return cb(err); nextHash = res.result; -//console.log('[HistoricSync.js.235:nextHash:]',nextHash); //TODO return c(); }); }, //show some (inacurate) status function(c) { if ( ( self.syncedBlocks + self.skippedBlocks) % self.step === 1) { - self.showProgress(); + self.showProgress(height); } return c(); @@ -254,7 +291,27 @@ if (self.syncPercentage>5) { self.blockExtractor.getNextBlock(function(err, b) { if (err || ! b) return c(err); - blockInfo = b.getStandardizedObject(b.txs); + blockInfo = b.getStandardizedObject(b.txs, self.network); + + var ti=0; + // Get TX Address + b.txs.forEach(function(t) { + var objTx = blockInfo.tx[ti++]; + var to=0; + t.outs.forEach( function(o) { + + + var s = new Script(o.s); + var addrs = self.getAddrStr(s); + + // support only p2pubkey p2pubkeyhash and p2sh + if (addrs.length === 1) { + objTx.out[to].addrStr = addrs[0]; + } + to++; + }); + }); + return c(); }); }, From 02c3512f94b2086b5ef5a21a8f852f346924c8f5 Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Sat, 1 Feb 2014 22:39:08 -0300 Subject: [PATCH 6/7] orphan blocks ignored on file sync --- lib/HistoricSync.js | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/lib/HistoricSync.js b/lib/HistoricSync.js index 74e85014..3ab8f60d 100644 --- a/lib/HistoricSync.js +++ b/lib/HistoricSync.js @@ -320,9 +320,15 @@ function spec() { isMainChain = blockInfo.hash === nextHash; - //TODO blockInfo.isOrphan = !isMainChain; + /* + * In file sync, orphan blocks are just ignored. + * This is to simplify our schema and the + * sync process + */ + if (blockInfo.isOrphan) return c(); + self.sync.storeBlock(blockInfo, function(err) { existed = err && err.toString().match(/E11000/); @@ -340,19 +346,23 @@ function spec() { return cb(err); } else { - self.err = null; - self.status = 'syncing'; - } - // Continue - if (blockInfo) { + // Continue + if (blockInfo) { - // mainchain - if (isMainChain) height++; + // mainchain + if (isMainChain) height++; - self.syncedBlocks++; + self.syncedBlocks++; + self.err = null; + self.status = 'syncing'; - return self.getBlockFromFile(height, scanOpts, cb); + return self.getBlockFromFile(height, scanOpts, cb); + } + else { + self.err = null; + self.status = 'finished'; + } } return cb(err); }); From 39abe56f273a930cdfbd89f7dd9569f267dda045 Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Sat, 1 Feb 2014 22:47:16 -0300 Subject: [PATCH 7/7] config files --- config/config.js | 1 + 1 file changed, 1 insertion(+) diff --git a/config/config.js b/config/config.js index dc6eb10a..f28964cb 100644 --- a/config/config.js +++ b/config/config.js @@ -31,6 +31,7 @@ module.exports = { host: process.env.BITCOIND_HOST || '127.0.0.1', port: process.env.BITCOIND_PORT || '18332', p2pPort: process.env.BITCOIND_P2P_PORT || '18333', + dataDir: process.env.BITCOIND_DATADIR || './testnet3', // DO NOT CHANGE THIS! disableAgent: true