diff --git a/app/models/Address.js b/app/models/Address.js index 7e44970..657b67e 100644 --- a/app/models/Address.js +++ b/app/models/Address.js @@ -78,7 +78,7 @@ function spec() { self.transactions.push(txItem.spendTxId); self.txApperances +=2; } - }; + } }); return cb(); }); diff --git a/dev-util/level.js b/dev-util/level.js new file mode 100755 index 0000000..f897c2a --- /dev/null +++ b/dev-util/level.js @@ -0,0 +1,30 @@ +#!/usr/bin/env node +'use strict'; + +var config = require('../config/config'), + levelup = require('levelup'); + + + +var s = process.argv[2]; +var isBlock = process.argv[3] === '1'; + + +var dbPath = config.leveldb + (isBlock ? '/blocks' : '/txs'); +console.log('DB: ',dbPath); //TODO + + + +var db = levelup(dbPath ); + + +db.createReadStream({start: s, end: s+'~'}) + .on('data', function (data) { + console.log(data.key + ' => ' + data.value); //TODO + }) + .on('error', function () { + }) + .on('end', function () { + }); + + diff --git a/dev-util/tx-level.js b/dev-util/tx-level.js deleted file mode 100755 index a90cc2e..0000000 --- a/dev-util/tx-level.js +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env node - -var - config = require('../config/config'), - levelup = require('levelup'); - - -db = levelup(config.leveldb + '/txs'); - -var s = 'txouts-addr-mgqvRGJMwR9JU5VhJ3x9uX9MTkzTsmmDgQ'; -db.createReadStream({start: s, end: s+'~'}) - .on('data', function (data) { -console.log('[block-level.js.11:data:]',data); //TODO - if (data==false) c++; - }) - .on('end', function () { - }); - - diff --git a/lib/BlockDb.js b/lib/BlockDb.js index 3e1946b..b253ca8 100644 --- a/lib/BlockDb.js +++ b/lib/BlockDb.js @@ -5,8 +5,11 @@ require('classtool'); function spec(b) { - var TIMESTAMP_ROOT = 'b-ts-'; // b-ts- => - var PREV_ROOT = 'b-prev-'; // b-prev- => (0 if orphan) + var TIMESTAMP_PREFIX = 'b-ts-'; // b-ts- => + var PREV_PREFIX = 'b-prev-'; // b-prev- => + var NEXT_PREFIX = 'b-next-'; // b-next- => + var MAIN_PREFIX = 'b-main-'; // b-main- => 1/0 + var TIP = 'b-tip-'; // last block on the chain /** @@ -20,7 +23,6 @@ function spec(b) { var db = b.db || levelup(config.leveldb + '/blocks'); var rpc = b.rpc || new RpcClient(config.bitcoind); - var BlockDb = function() { }; @@ -38,53 +40,71 @@ function spec(b) { }); }; + // adds a block TIP block. Does not update Next pointer in + // the block prev to the new block. + // BlockDb.prototype.add = function(b, cb) { - if (!b.hash) return cb(new Error('no Hash at Block.save')); - - - var time_key = TIMESTAMP_ROOT + + var time_key = TIMESTAMP_PREFIX + ( b.time || Math.round(new Date().getTime() / 1000) ); - db.batch() + return db.batch() .put(time_key, b.hash) - .put(PREV_ROOT + b.hash, b.previousblockhash) + .put(TIP, b.hash) + .put(MAIN_PREFIX + b.hash, 1) + .put(PREV_PREFIX + b.hash, b.previousblockhash) .write(cb); }; - - - BlockDb.prototype.setOrphan = function(hash, cb) { - var k = PREV_ROOT + hash; - - db.get(k, function (err,oldPrevHash) { - if (err || !oldPrevHash) return cb(err); - db.put(PREV_ROOT + hash, 0, function() { - return cb(err, oldPrevHash); - }); - }); - // We keep the block in TIMESTAMP_ROOT - }; - - //mainly for testing - BlockDb.prototype.setPrev = function(hash, prevHash, cb) { - db.put(PREV_ROOT + hash, prevHash, function(err) { - return cb(err); - }); - }; - - //mainly for testing - BlockDb.prototype.getPrev = function(hash, cb) { - db.get(PREV_ROOT + hash, function(err,val) { + BlockDb.prototype.getTip = function(cb) { + db.get(TIP, function(err, val) { return cb(err,val); }); }; + //mainly for testing + BlockDb.prototype.setPrev = function(hash, prevHash, cb) { + db.put(PREV_PREFIX + hash, prevHash, function(err) { + return cb(err); + }); + }; + BlockDb.prototype.getPrev = function(hash, cb) { + db.get(PREV_PREFIX + hash, function(err,val) { + if (err && err.notFound) { err = null; val = null;} + return cb(err,val); + }); + }; + + BlockDb.prototype.getNext = function(hash, cb) { + db.get(NEXT_PREFIX + hash, function(err,val) { + if (err && err.notFound) { err = null; val = null;} + return cb(err,val); + }); + }; + + BlockDb.prototype.isMain = function(hash, cb) { + db.get(MAIN_PREFIX + hash, function(err, val) { + if (err && err.notFound) { err = null; val = 0;} + return cb(err,parseInt(val)); + }); + }; + + BlockDb.prototype.setMain = function(hash, isMain, cb) { + if (!isMain) console.log('ORPHAN: %s',hash); + db.put(MAIN_PREFIX + hash, isMain?1:0, function(err) { + return cb(err); + }); + }; + BlockDb.prototype.setNext = function(hash, nextHash, cb) { + db.put(NEXT_PREFIX + hash, nextHash, function(err) { + return cb(err); + }); + }; BlockDb.prototype.countNotOrphan = function(cb) { var c = 0; console.log('Counting connected blocks. This could take some minutes'); - db.createReadStream({start: PREV_ROOT, end: PREV_ROOT + '~' }) + db.createReadStream({start: MAIN_PREFIX, end: MAIN_PREFIX + '~' }) .on('data', function (data) { if (data.value !== 0) c++; }) @@ -96,8 +116,9 @@ function spec(b) { }); }; + // .has() return true orphans also BlockDb.prototype.has = function(hash, cb) { - var k = PREV_ROOT + hash; + var k = PREV_PREFIX + hash; db.get(k, function (err,val) { var ret; if (err && err.notFound) { @@ -130,14 +151,14 @@ function spec(b) { BlockDb.prototype.getBlocksByDate = function(start_ts, end_ts, limit, cb) { var list = []; db.createReadStream({ - start: TIMESTAMP_ROOT + start_ts, - end: TIMESTAMP_ROOT + end_ts, + start: TIMESTAMP_PREFIX + start_ts, + end: TIMESTAMP_PREFIX + end_ts, fillCache: true, limit: parseInt(limit) // force to int }) .on('data', function (data) { list.push({ - ts: data.key.replace(TIMESTAMP_ROOT, ''), + ts: data.key.replace(TIMESTAMP_PREFIX, ''), hash: data.value, }); }) diff --git a/lib/HistoricSync.js b/lib/HistoricSync.js index 6b547bc..8534ecf 100644 --- a/lib/HistoricSync.js +++ b/lib/HistoricSync.js @@ -105,6 +105,8 @@ function spec() { HistoricSync.prototype.showProgress = function() { var self = this; + if ( ( self.syncedBlocks + self.skippedBlocks) % self.step !== 1) return; + if (self.error) { p('ERROR: ' + self.error); } @@ -143,18 +145,23 @@ function spec() { }, //show some (inacurate) status function(c) { - if ( ( self.syncedBlocks + self.skippedBlocks) % self.step === 1) { - self.showProgress(); - } - + self.showProgress(); return c(); }, function(c) { self.rpc.getBlock(blockHash, function(err, ret) { if (err) return c(err); + if (ret) { + blockInfo = ret.result; + // this is to match block retreived from file + if (blockInfo.hash === self.genesis) + blockInfo.previousblockhash='0000000000000000000000000000000000000000000000000000000000000000'; + } + else { + blockInfo = null; + } - blockInfo = ret ? ret.result : null; return c(); }); }, @@ -162,34 +169,25 @@ function spec() { function(c) { if (existed) return c(); - self.sync.storeBlock(blockInfo, function(err) { + self.sync.storeTipBlock(blockInfo, function(err) { return c(err); }); - }, - /* TODO: Should Start to sync backwards? (this is for partial syncs) - function(c) { - - if (blockInfo.result.prevblockhash != current.blockHash) { - p("reorg?"); - scanOpts.prev = 1; - } - return c(); - } - */ - ], function(err) { + }], function(err) { if (err) { - self.setError(util.format('ERROR: @%s: %s [count: syncedBlocks: %d]', blockHash, err, self.syncedBlocks)); + self.setError(util.format('ERROR: @%s: %s [count: syncedBlocks: %d]', + blockHash, err, self.syncedBlocks)); return cb(err); } else { self.status = 'syncing'; } - if ( (scanOpts.upToExisting && existed && self.syncedBlocks >= self.blockChainHeight) || + if ( (scanOpts.upToExisting && existed && + self.syncedBlocks >= self.blockChainHeight) || (blockEnd && blockEnd === blockHash)) { self.status = 'finished'; - p('DONE. Found existing block: ', blockHash); + p('DONE. Found block: ', blockHash); self.showProgress(); return cb(err); } @@ -251,91 +249,72 @@ function spec() { return addrStrs; }; - HistoricSync.prototype.getBlockFromFile = function(scanOpts, cb) { + HistoricSync.prototype.getBlockFromFile = function(cb) { + var self = this; + + var blockInfo; + + //get Info + self.blockExtractor.getNextBlock(function(err, b) { + if (err || ! b) return cb(err); + + blockInfo = b.getStandardizedObject(b.txs, self.network); + // blockInfo.curWork = Deserialize.intFromCompact(b.bits); + // We keep the RPC field names + blockInfo.previousblockhash = blockInfo.prev_block; + + var ti=0; + // Get TX Address + b.txs.forEach(function(t) { + + + var objTx = blockInfo.tx[ti++]; + + //add time from block + objTx.time = blockInfo.time; + + var to=0; + t.outs.forEach( function(o) { + + + var s = new Script(o.s); + var addrs = self.getAddrStr(s); + + // support only for p2pubkey p2pubkeyhash and p2sh + if (addrs.length === 1) { + objTx.out[to].addrStr = addrs[0]; + } + to++; + }); + }); + + return cb(err,blockInfo); + }); + }; + + + HistoricSync.prototype.nextBlockFromFile = function(scanOpts, cb) { var self = this; var blockInfo; - var existed; async.series([ - //show some (inacurate) status function(c) { - if ( ( self.syncedBlocks + self.skippedBlocks) % self.step === 1) { - self.showProgress(); - } - + self.showProgress(); return c(); }, - //get Info function(c) { - - self.blockExtractor.getNextBlock(function(err, b) { - if (err || ! b) return c(err); - - blockInfo = b.getStandardizedObject(b.txs, self.network); - // blockInfo.curWork = Deserialize.intFromCompact(b.bits); - // We keep the RPC field names - blockInfo.previousblockhash = blockInfo.prev_block; - - var ti=0; - // Get TX Address - b.txs.forEach(function(t) { - - - var objTx = blockInfo.tx[ti++]; - - //add time from block - objTx.time = blockInfo.time; - - var to=0; - t.outs.forEach( function(o) { - - - var s = new Script(o.s); - var addrs = self.getAddrStr(s); - - // support only p2pubkey p2pubkeyhash and p2sh - if (addrs.length === 1) { - objTx.out[to].addrStr = addrs[0]; - } - to++; - }); - }); - - return c(); - }); - }, - //check prev - function(c) { - if (blockInfo && self.prevHash && blockInfo.previousblockhash !== self.prevHash) { - - console.log('Hole found @%s', blockInfo.hash); - console.log('From: %s To: %s', self.prevHash, blockInfo.previousblockhash); - self.sync.checkOrphan(self.prevHash, blockInfo.previousblockhash, c); - } - else return c(); - }, - //check it - function(c) { - if (!blockInfo) return c(); - - self.sync.bDb.has(blockInfo.hash, function(err, had) { - existed = had; - return c(err); - }); - }, - //store it - function(c) { - if (!blockInfo || existed) return c(); - - self.sync.storeBlock(blockInfo, function(err) { + self.getBlockFromFile(function(err, inBlockInfo) { + blockInfo = inBlockInfo; return c(err); }); }, function(c) { + self.sync.storeTipBlock(blockInfo, function(err) { + return c(err); + }); + }, + function(c) { + // continue if (blockInfo && blockInfo.hash) { - self.prevHash = blockInfo.hash; - if (existed) - self.skippedBlocks++; - else self.syncedBlocks++; } else self.status = 'finished'; @@ -452,12 +431,15 @@ function spec() { p(' scanOpts: ', JSON.stringify(scanOpts)); if (scanOpts.fromFiles) { + self.status = 'syncing'; - self.type = 'from .dat Files'; + self.type = 'from .dat Files'; + + async.whilst(function() { return self.status === 'syncing'; }, function (w_cb) { - self.getBlockFromFile(scanOpts, function(err) { + self.nextBlockFromFile(scanOpts, function(err) { setImmediate(function(){ return w_cb(err); }); diff --git a/lib/PeerSync.js b/lib/PeerSync.js index 3c8525d..4f7a493 100644 --- a/lib/PeerSync.js +++ b/lib/PeerSync.js @@ -8,6 +8,7 @@ function spec() { var Sync = require('./Sync').class(); var Peer = require('bitcore/Peer').class(); var config = require('../config/config'); + var networks = require('bitcore/networks'); var peerdb_fn = 'peerdb.json'; @@ -16,18 +17,15 @@ function spec() { PeerSync.prototype.init = function(opts, cb) { if (!opts) opts = {}; - var network = opts && (opts.network || 'testnet'); + var networkName = opts && (opts.network || 'testnet'); + var network = networkName === 'testnet' ? networks.testnet : network.livenet; this.verbose = opts.verbose; this.peerdb = undefined; - this.sync = new Sync({ - networkName: network - }); + this.sync = new Sync(); this.PeerManager = require('bitcore/PeerManager').createClass({ - opts: { network: network - } }); this.peerman = new this.PeerManager(); this.load_peers(); @@ -77,19 +75,23 @@ function spec() { console.log('[p2p_sync] Handle block: ' + blockHash); } - var tx_hashes = block.txs.map(function(tx) { return coinUtil.formatHashFull(tx.hash); }); - this.sync.storeBlock({ + this.sync.storeTipBlock({ 'hash': blockHash, 'tx': tx_hashes, + 'previousblockhash': coinUtil.formatHashFull(block.prev_hash), }, function(err) { if (err) { console.log('[p2p_sync] Error in handle Block: ' + err); } + + // Check for reorgs... + // The previous last block hash + // if different => call }); }; @@ -110,6 +112,8 @@ function spec() { }); this.peerman.on('connection', function(conn) { + +console.log('[PeerSync.js.113]'); //TODO conn.on('inv', self.handle_inv.bind(self)); conn.on('block', self.handle_block.bind(self)); conn.on('tx', self.handle_tx.bind(self)); diff --git a/lib/Sync.js b/lib/Sync.js index fbae854..e923db2 100644 --- a/lib/Sync.js +++ b/lib/Sync.js @@ -11,15 +11,13 @@ function spec() { function Sync() { - this.bDb = new BlockDb(); - this.txDb = new TransactionDb(); } Sync.prototype.init = function(opts, cb) { var self = this; - self.opts = opts; - + this.bDb = new BlockDb(opts); + this.txDb = new TransactionDb(opts); return cb(); }; @@ -39,79 +37,202 @@ function spec() { ], next); }; + /* + * Arrives a NEW block, which is the new TIP + * + * Case 0) Simple case + * A-B-C-D-E(TIP)-NEW + * + * Case 1) + * A-B-C-D-E(TIP) + * \ + * NEW + * + * 1) Declare D-E orphans (and possible invalidate TXs on them) + * + * Case 2) + * A-B-C-D-E(TIP) + * \ + * F-G-NEW + * 1) Set F-G as connected (mark TXs as valid) + * 2) Declare D-E orphans (and possible invalidate TXs on them) + * + * + * Case 3) + * + * A-B-C-D-E(TIP) ... NEW + * + * 1) Get NEW.prev recusively until existing block + * then case 0) / 1) / 2) + * + */ - Sync.prototype.storeBlock = function(block, cb) { + Sync.prototype.storeTipBlock = function(b, cb) { + var self = this; + var oldTip, oldNext, needReorg = true; + var newPrev = b.previousblockhash; + var updatedTxs, updatedAddrs; + + async.series([ + function(c) { + self.txDb.createFromBlock(b, function(err, txs, addrs) { + updatedTxs = txs; + updatedAddrs = addrs; + return c(err); + }); + }, + function(c) { + self.bDb.getTip(function(err, val) { + oldTip = val; + if (typeof oldTip === 'undefined' || newPrev === oldTip) { + needReorg = false; + } + return c(); + }); + }, + function(c) { + if (!needReorg) return c(); + + self.bDb.getNext( newPrev, function(err, val) { + if (err) return c(err); + oldNext = val; + return c(); + }); + }, + function(c) { + self.bDb.add(b, c); + }, + function(c) { + if (!needReorg) return c(); + + console.log('NEW TIP: %s NEED REORG', b.hash, oldTip); + // TODO should modify updatedTxs and addrs. + self.processReorg(oldTip, oldNext, newPrev, cb); + }, + function(c) { + self.bDb.setNext(newPrev, b.hash, function(err) { + return c(err); + }); + }], + function(err) { + self._handleBroadcast(b, updatedTxs, updatedAddrs); + return cb(err); + }); + }; + + + + Sync.prototype.processReorg = function(oldTip, oldNext, newPrev, cb) { var self = this; - self.txDb.createFromBlock(block, function(err, insertedTxs, updateAddrs) { - if (err) return cb(err); + var newPrevExisted, orphanizeFrom; - self.bDb.add(block, function(err){ - if (err) return cb(err); - self._handleBroadcast(block, insertedTxs, updateAddrs); - return cb(); - }); + async.series([ + function(c) { + self.bDb.has(newPrev, function(err, ret) { + newPrevExisted = ret; + return c(); + }); + }, + function(c) { + if (newPrevExisted) return c(); + console.log('[BlockDb.js.133] case 3) not implemented yet in reorg'); //TODO + process.exit(1); + }, + function(c) { + self.bDb.isMain(newPrev, function(err,val) { + if (!val) return c(); + // case 1 + orphanizeFrom = oldNext; + return c(err); + }); + }, + function(c) { + if (orphanizeFrom) return c(); + + self.setBranchConnectedBackwards(newPrev, function(err, yHash) { + if (err) return c(err); + self.bDb.getNext(yHash, function(err, yHashNext) { + orphanizeFrom = yHashNext; + return c(err); + }); + }); + }, + function(c) { + if (!orphanizeFrom) return c(); + self.setBranchOrphan(orphanizeFrom, function(err) { + return c(err); + }); + }, + ], + function(err) { + return cb(err); + }); + }; + + Sync.prototype.setBranchOrphan = function(fromHash, cb) { + var self = this, + hashInterator = fromHash; + + async.whilst( + function() { return hashInterator; }, + function(c) { + self.setBlockMain(hashInterator, false, function(err) { + if (err) return cb(err); + self.bDb.getNext(hashInterator, function (err, val) { + hashInterator = val; + return c(err); + }); + }); + }, cb); + }; + + Sync.prototype.setBlockMain = function(hash, isMain, cb) { + var self = this; + + self.bDb.setMain(hash, isMain, function(err) { + if (err) return cb(err); + return self.txDb.handleBlockChange(hash, isMain, cb); }); }; - Sync.prototype.checkOrphan = function(fromBlock, toBlock, c) { - var self = this; + Sync.prototype.setBranchConnectedBackwards = function(fromHash, cb) { + var self = this, + hashInterator = fromHash, + isMain; - var hash = fromBlock; - - var co = 0; - var limit = 10; - var cont = 1; - - async.whilst( - function () { - if (++co > limit) { -console.log('[Sync.js.109] WARN: Reach reog depth limit'); //TODO - } - return cont && hash && hash !== toBlock && co < limit; - }, - function (w_c) { - //check with RPC if the block is mainchain - self.bDb.fromHashWithInfo(hash, function (err, info) { - - if (!info) { - console.log('[Sync.js.107:hash:ORPHAN]',hash); //TODO - self.txDb.setOrphan(hash, function(err) { - if (err) return w_c(err); - self.bDb.setOrphan(hash, function(err, prevHash){ - hash = prevHash; - return w_c(err); - }); + async.doWhilst( + function(c) { + self.setConnected(hashInterator, function (err) { + if (err) return c(err); + self.bDb.getPrev(hashInterator, function (err, val) { + if (err) return c(err); + hashInterator = val; + self.bDb.isMain(hashInterator, function (err, val) { + isMain = val; + return c(); }); - } - else { - console.log('[Sync.js.107:hash:NOT ORPHAN]',hash); //TODO - cont = 0; - return w_c(); - } + }); }); }, - function (err) { - return c(err); - } - ); + function() { return hashInterator; }, cb); }; - Sync.prototype._handleBroadcast = function(hash, inserted_txs, updated_addrs) { + Sync.prototype._handleBroadcast = function(hash, updatedTxs, updatedAddrs) { var self = this; if (hash && self.opts.broadcast_blocks) { sockets.broadcast_block({hash: hash}); } - if (inserted_txs && self.opts.broadcast_txs) { - inserted_txs.forEach(function(tx) { + if (updatedTxs && self.opts.broadcast_txs) { + updatedTxs.forEach(function(tx) { sockets.broadcast_tx(tx); }); } - if (updated_addrs && self.opts.broadcast_addresses) { - updated_addrs.forEach(function(addr, txs){ + if (updatedAddrs && self.opts.broadcast_addresses) { + updatedAddrs.forEach(function(addr, txs){ txs.forEach(function(addr, t){ sockets.broadcast_address_tx(addr, {'txid': t}); @@ -120,15 +241,13 @@ console.log('[Sync.js.109] WARN: Reach reog depth limit'); //TODO } }; - - Sync.prototype.storeTxs = function(txs, cb) { var self = this; - self.txDb.createFromArray(txs, null, function(err, inserted_txs, updated_addrs) { + self.txDb.createFromArray(txs, null, function(err, updatedTxs, updatedAddrs) { if (err) return cb(err); - self._handleBroadcast(null, inserted_txs, updated_addrs); + self._handleBroadcast(null, updatedTxs, updatedAddrs); return cb(err); }); }; diff --git a/lib/TransactionDb.js b/lib/TransactionDb.js index 0c562b3..d2fa9ef 100644 --- a/lib/TransactionDb.js +++ b/lib/TransactionDb.js @@ -5,15 +5,18 @@ require('classtool'); function spec(b) { - // blockHash -> txid mapping (to reorgs) - var ROOT = 'tx-b-'; //tx-b-- => 1/0 (connected or not) + // blockHash -> txid mapping + var IN_BLK_PREFIX = 'tx-b-'; //tx-b-- => 1/0 (connected or not) + + // Only for orphan blocks +// var FROM_BLK_PREFIX = 'tx-'; //tx-- => 1/0 (connected or not) // to show tx outs - var OUTS_ROOT = 'txouts-'; //txouts-- => [addr, btc_sat] + var OUTS_PREFIX = 'txouts-'; //txouts-- => [addr, btc_sat] // to sum up addr balance - var ADDR_ROOT = 'txouts-addr-'; //txouts-addr---- => + btc_sat - var SPEND_ROOT = 'txouts-spend-';//txouts-spend-- => [txid(in),n(in),ts] + var ADDR_PREFIX = 'txouts-addr-'; //txouts-addr---- => + btc_sat + var SPEND_PREFIX = 'txouts-spend-';//txouts-spend-- => [txid(in),n(in),ts] // TODO: use bitcore networks module var genesisTXID = '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b'; @@ -51,7 +54,7 @@ function spec(b) { // TransactionDb.prototype.fromTxIdOne = function(txid, cb) { TODO TransactionDb.prototype.has = function(txid, cb) { - var k = OUTS_ROOT + txid; + var k = OUTS_PREFIX + txid; db.get(k, function (err,val) { var ret; @@ -69,7 +72,7 @@ function spec(b) { TransactionDb.prototype.fromTxId = function(txid, cb) { - var k = OUTS_ROOT + txid; + var k = OUTS_PREFIX + txid; var ret=[]; // outs. @@ -87,7 +90,7 @@ function spec(b) { return cb(err); }) .on('end', function () { - var k = SPEND_ROOT + txid; + var k = SPEND_PREFIX + txid; var l = ret.length; db.createReadStream({start: k, end: k + '~'}) .on('data', function (data) { @@ -174,7 +177,7 @@ function spec(b) { TransactionDb.prototype.fromTxIdN = function(txid, n, cb) { - var k = OUTS_ROOT + txid + '-' + n; + var k = OUTS_PREFIX + txid + '-' + n; db.get(k, function (err,val) { if (err && err.notFound) { @@ -206,7 +209,7 @@ function spec(b) { TransactionDb.prototype.fromAddr = function(addr, cb) { var self = this; - var k = ADDR_ROOT + addr; + var k = ADDR_PREFIX + addr; var ret=[]; db.createReadStream({start: k, end: k + '~'}) @@ -226,7 +229,7 @@ function spec(b) { .on('end', function () { async.each(ret, function(o, e_c) { - var k = SPEND_ROOT + o.txid + '-' + o.index; + var k = SPEND_PREFIX + o.txid + '-' + o.index; db.get(k, function(err, val) { if (err && err.notFound) err=null; if (err || !val) return e_c(err); @@ -238,7 +241,7 @@ function spec(b) { return e_c(); }); }, - function(err) { + function() { async.each(ret, function(o, e_c){ self.fillConfirmations(o,e_c); },function(err) { @@ -255,16 +258,16 @@ function spec(b) { async.series([ function(c) { db.createReadStream({ - start: OUTS_ROOT + txid, - end: OUTS_ROOT + txid + '~', + start: OUTS_PREFIX + txid, + end: OUTS_PREFIX + txid + '~', }).pipe( db.createWriteStream({type:'del'}) ).on('close', c); }, function(c) { db.createReadStream({ - start: SPEND_ROOT + txid, - end: SPEND_ROOT + txid + '~' + start: SPEND_PREFIX + txid, + end: SPEND_PREFIX + txid + '~' }) .pipe( db.createWriteStream({type:'del'}) @@ -315,7 +318,8 @@ function spec(b) { }; - TransactionDb.prototype.add = function(tx, cb) { + + TransactionDb.prototype.add = function(tx, blockhash, cb) { var self = this; var addrs = []; var is_new = true; @@ -331,7 +335,7 @@ function spec(b) { async.forEachLimit(tx.vin, CONCURRENCY, function(i, next_out) { db.batch() - .put( SPEND_ROOT + i.txid + '-' + i.vout , + .put( SPEND_PREFIX + i.txid + '-' + i.vout , tx.txid + ':' + i.n + ':' + ts) .write(next_out); }, @@ -355,15 +359,15 @@ function spec(b) { ! o.scriptPubKey.addresses[1] // TODO : not supported ){ // This is only to broadcast (WIP) - // if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) { - // addrs.push(o.scriptPubKey.addresses[0]); - // } + if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) { + addrs.push(o.scriptPubKey.addresses[0]); + } var addr = o.scriptPubKey.addresses[0]; var sat = Math.round(o.value * util.COIN); db.batch() - .put( OUTS_ROOT + tx.txid + '-' + o.n, addr + ':' + sat) - .put( ADDR_ROOT + addr + '-' + ts + '-' + tx.txid + + .put( OUTS_PREFIX + tx.txid + '-' + o.n, addr + ':' + sat) + .put( ADDR_PREFIX + addr + '-' + ts + '-' + tx.txid + '-' + o.n, sat) .write(next_out); @@ -385,33 +389,32 @@ function spec(b) { } return p_c(); }); - }], function(err) { + }, + function (p_c) { + if (!blockhash) return p_c(); + return self.setConfirmation(tx.txid,blockhash, true, p_c); + }, + ], function(err) { return cb(err, addrs, is_new); }); }; - TransactionDb.prototype.deleteConfirmation = function(txId, blockHash, c) { + + TransactionDb.prototype.setConfirmation = function(txId, blockHash, confirmed, c) { if (!blockHash) return c(); - db.put(ROOT + txId + '-' + blockHash, 0, function(err) { - return c(err); - }); + confirmed = confirmed ? 1 : 0; + + db.batch() + .put(IN_BLK_PREFIX + txId + '-' + blockHash, confirmed) + .write(c); }; - TransactionDb.prototype.addConfirmation = function(txId, blockHash, c) { - if (!blockHash) return c(); - - db.put(ROOT + txId + '-' + blockHash, 1, function(err) { - return c(err); - }); - }; - - - // This slowdown addr balance calculation 100% + // This slowdown addr balance calculation by 100% TransactionDb.prototype.isConfirmed = function(txId, c) { - var k = ROOT + txId; + var k = IN_BLK_PREFIX + txId; var ret = false; db.createReadStream({start: k, end: k + '~'}) @@ -422,11 +425,34 @@ function spec(b) { return c(err); }) .on('end', function (err) { - return c(err,ret); }); }; + TransactionDb.prototype.handleBlockChange = function(hash, isMain, cb) { + var k = IN_BLK_PREFIX; + var toChange = []; + console.log('Searching Txs from block:' + hash); + + // This is slow, but prevent us to create a new block->tx index. + db.createReadStream({start: k, end: k + '~'}) + .on('data', function (data) { + if (data.key.indexOf(hash)>=0) + toChange.push({ + type: 'put', + key: data.key, + value: isMain?1:0, + }); + }) + .on('error', function (err) { + return cb(err); + }) + .on('end', function (err) { + if (err) return cb(err); + console.log('\t%s %d Txs', isMain?'Confirming':'Invalidating',toChange.length); + db.batch(toChange, cb); + }); + }; // txs can be a [hashes] or [txObjects] TransactionDb.prototype.createFromArray = function(txs, blockHash, next) { @@ -435,7 +461,6 @@ function spec(b) { if (!txs) return next(); // TODO - var insertedTxs = []; var updatedAddrs = {}; async.forEachLimit(txs, CONCURRENCY, function(t, each_cb) { @@ -448,31 +473,17 @@ function spec(b) { TransactionRpc.getRpcInfo(t, function(err, inInfo) { if (!inInfo) return each_cb(err); - self.add(inInfo, function(err) { - if (err) return each_cb(err); - - // This could mean that the TX was mined since we first received. - if (blockHash && inInfo.blockhash !== blockHash) - console.log('WARN in tx %s: different blockHashses: %s vs %s', - t, blockHash, inInfo.blockhash); - insertedTxs.push(t); - - return self.addConfirmation(t,inInfo.blockhash, each_cb); - }); + return self.add(inInfo, blockHash,each_cb); }); } else { - self.add(t, function(err) { - if (err || !blockHash) return each_cb(err); - - return self.addConfirmation(t,blockHash, each_cb); - }); + return self.add(t, blockHash, each_cb); } }, function(err) { - return next(err, insertedTxs, updatedAddrs); - }); -}; + return next(err, updatedAddrs); + }); + }; TransactionDb.prototype.createFromBlock = function(b, next) { diff --git a/test/integration/addr.json b/test/integration/addr.json index b40c03b..2fcc42f 100644 --- a/test/integration/addr.json +++ b/test/integration/addr.json @@ -43,9 +43,9 @@ }, { "addr": "mzW2hdZN2um7WBvTDerdahKqRgj3md9C29", - "txApperances": 6033, - "balance": 1049.69744101, - "totalReceived": 1049.69744101, + "txApperances": 6046, + "balance": 1149.19744101, + "totalReceived": 1149.19744101, "totalSent": 0 }, { diff --git a/test/integration/sync.js b/test/integration/sync.js index 82b87c4..9433f50 100644 --- a/test/integration/sync.js +++ b/test/integration/sync.js @@ -7,86 +7,135 @@ process.env.NODE_ENV = process.env.NODE_ENV || 'development'; var assert = require('assert'), async = require('async'), - Sync = require('../../lib/Sync').class(); - - -var b = [ - '00000000c4cbd75af741f3a2b2ff72d9ed4d83a048462c1efe331be31ccf006b', //B#16 - '00000000fe198cce4c8abf9dca0fee1182cb130df966cc428ad2a230df8da743', - '000000008d55c3e978639f70af1d2bf1fe6f09cb3143e104405a599215c89a48', - '000000009b3bca4909f38313f2746120129cce4a699a1f552390955da470c5a9', - '00000000ede57f31cc598dc241d129ccb4d8168ef112afbdc870dc60a85f5dd3', //B#20 -]; - -var fix = function(s,cb) { - async.each([1,2,3,4], function(i,c) { - s.bDb.setPrev(b[i],b[i-1], function() { - return c(); - }); - }, cb); -}; - -var test = function(s,cb) { - async.each([2,3,4], function(i,c) { - s.bDb.getPrev(b[i], function(err, p) { - assert.equal(p,0); - return c(); - }); - }, function() { - s.bDb.getPrev(b[1], function(err, p) { - assert.equal(p,b[0]); - return cb(); - }); - }); -}; - - - -var testNo = function(s,cb) { - async.each([2,3,4], function(i,c) { - s.bDb.getPrev(b[i], function(err, p) { - assert.equal(p,b[i-1]); - return c(); - }); - }, function() { - s.bDb.getPrev(b[1], function(err, p) { - assert.equal(p,b[0]); - return cb(); - }); - }); -}; - - + HistoricSync = require('../../lib/HistoricSync').class(); var s; +var b = [ + '00000000c4cbd75af741f3a2b2ff72d9ed4d83a048462c1efe331be31ccf006b', //0 B#16 + '00000000fe198cce4c8abf9dca0fee1182cb130df966cc428ad2a230df8da743', //1 + '000000008d55c3e978639f70af1d2bf1fe6f09cb3143e104405a599215c89a48', //2 + '000000009b3bca4909f38313f2746120129cce4a699a1f552390955da470c5a9', //3 + '00000000ede57f31cc598dc241d129ccb4d8168ef112afbdc870dc60a85f5dd3', //4 B#20 +]; +var t = [ + 'd08582d3711f75d085c618874fb0d049ae09d5ec95ec6f5abd289f4b54712c54', // TX from B#16 + '1729001087e0cebea8d14de1653d5cf59628d9746bc1ae65f776f1cbaff7ebad', + 'cf53d7ccd83a099acfbc319ee10c1e3b10e3d42ba675b569fdd6b69cb8d2db4e', + 'cf53d7ccd83a099acfbc319ee10c1e3b10e3d42ba675b569fdd6b69cb8d2db4e', + 'd45f9da73619799e9d7bd03cc290e70875ea4cbad56b8bffa15135fbbb3df9ea', //4 Tx from B20 +]; + +var test = function(cb) { + async.each([2,3,4], function(i,c) { + s.sync.bDb.getPrev(b[i], function(err, p) { + assert.equal(p,b[i-1]); + return c(); + }); + }, function() { + async.each([0,1,2,3,4], function(i,c) { + s.sync.bDb.has(b[i], function(err, p) { + assert(p); + return c(); + }); + }, function() { + async.each([0,1,2,3], function(i,c) { + s.sync.bDb.getNext(b[i], function(err, p) { + assert.equal(p,b[i+1]); + return c(); + }); + }, cb); + }); + }); +}; + describe('Sync checkOrphan', function(){ before(function(done) { - s = new Sync(); - fix(s,done); - }); - - after(function(done) { - - fix(s,function() { - s.close(done); + s = new HistoricSync(); + s.init({}, function(err) { + if (err) return done(err); + s.sync.destroy(done); }); - }); - it('checkOrphan', function(done) { - this.timeout(100000); - - s.bDb.has(b[0], function(err, has) { - assert(has); - s.bDb.has(b[1], function(err, has) { - assert(has); - s.checkOrphan(b[4],b[1], function() { - testNo(s,done); - }); + it('simple RPC forward syncing', function(done) { + s.getPrevNextBlock(s.genesis,b[4], { + next: true, + }, function(err) { + if (err) return done(err); + test(done); }); - }); + }); + + + it('reorg, case 1', function(done) { + var case1 = { + hash: '0000000000000000000000000000000000000000000000000000000000000001', + tx: [ '1000000000000000000000000000000000000000000000000000000000000000' ], + time: 1296690099, + previousblockhash: b[2], + }; + + async.series([ + function (c) { + s.sync.txDb.isConfirmed(t[0], function(err,is) { + assert(!err); + assert(is); + return c(); + }); + }, + function (c) { + s.sync.txDb.isConfirmed(t[4], function(err,is) { + assert(!err); + assert(is); + return c(); + }); + }, + function (c) { + s.sync.storeTipBlock(case1, function(err) { + assert(!err, 'shouldnt return error' + err); + return c(); + }); + }, + function (c) { + s.sync.bDb.isMain(b[2], function(err,is) { + assert(!err); + assert(is); + return c(); + }); + }, + function (c) { + s.sync.bDb.isMain(b[3], function(err,is) { + assert(!err); + assert(!is, b[3] + 'should not be on main chain'); + return c(); + }); + }, + function (c) { + s.sync.bDb.isMain(b[4], function(err,is) { + assert(!err); + assert(!is); + return c(); + }); + }, + function (c) { + s.sync.txDb.isConfirmed(t[0], function(err,is) { + assert(!err); + assert(is); + return c(); + }); + }, + function (c) { + s.sync.txDb.isConfirmed(t[4], function(err,is) { + assert(!err); + assert(!is); + return c(); + }); + }, + + ], done ); }); }); + diff --git a/util/sync.js b/util/sync.js index 7a9a094..0906092 100755 --- a/util/sync.js +++ b/util/sync.js @@ -18,6 +18,7 @@ program .option('-R --reverse', 'Sync backwards', 0) .option('-U --uptoexisting', 'Sync only until an existing block is found', 0) .option('-F --fromfiles', 'Sync using bitcoind .dat block files (faster)', 0) + .option('-v --verbose', 'Verbose 0/1', 0) .parse(process.argv); var historicSync = new HistoricSync();