diff --git a/lib/BlockDb.js b/lib/BlockDb.js index 59ba778b..44831ae7 100644 --- a/lib/BlockDb.js +++ b/lib/BlockDb.js @@ -6,7 +6,7 @@ require('classtool'); function spec() { var TIMESTAMP_ROOT = 'b-ts-'; - var ORPHAN_FLAG_ROOT = 'b-orphan-'; + var PREV_ROOT = 'b-prev-'; // b-prev- => (0 if orphan) /** @@ -16,7 +16,6 @@ function spec() { util = require('bitcore/util/util'), levelup = require('levelup'), BitcoreBlock= require('bitcore/Block').class(), - TransactionDb = require('.//TransactionDb'), config = require('../config/config'), fs = require('fs'); @@ -44,18 +43,34 @@ function spec() { var time_key = TIMESTAMP_ROOT + ( b.timestamp || Math.round(new Date().getTime() / 1000) ); - self.db.batch() .put(time_key, b.hash) - .put(ORPHAN_FLAG_ROOT + b.hash, b.isOrphan || 0) + .put(PREV_ROOT + b.hash, b.prev_block) .write(cb); }; + + BlockDb.prototype.setOrphan = function(hash, cb) { + var self = this; + + + var k = PREV_ROOT + hash; + self.db.get(k, function (err,oldPrevHash) { + if (err || !oldPrevHash) return cb(err); + + self.db.put(PREV_ROOT + hash, 0, function() { + return cb(err, oldPrevHash); + }); + }); + + // We keep the block in TIMESTAMP_ROOT + }; + BlockDb.prototype.countNotOrphan = function(hash, cb) { var c = 0; - this.db.createReadStream({start: ORPHAN_FLAG_ROOT}) + this.db.createReadStream({start: PREV_ROOT}) .on('data', function (data) { - if (data === false) c++; + if (data.value !== 0) c++; }) .on('error', function (err) { return cb(err); @@ -71,11 +86,9 @@ function spec() { BlockDb.prototype.has = function(hash, cb) { var self = this; - var k = ORPHAN_FLAG_ROOT + hash; + var k = PREV_ROOT + hash; self.db.get(k, function (err,val) { - var ret; - if (err && err.notFound) { err = null; ret = false; diff --git a/lib/HistoricSync.js b/lib/HistoricSync.js index 9f1b3062..69ac2ead 100644 --- a/lib/HistoricSync.js +++ b/lib/HistoricSync.js @@ -313,14 +313,17 @@ var kk=0; }, //store it function(c) { + if (self.prevHash && blockInfo.prev_block !== self.prevHash) { - /* - * In file sync, orphan blocks are just ignored. - * This is to simplify our schema and the - * sync process - */ - if (blockInfo.isOrphan) return c(); + console.log('Orphans found: %s vs %s @%s', + self.prevHash + ' vs. ' + blockInfo.prev_block, blockInfo.hash); + self.sync.setOrphan(self.prevHash, blockInfo.prev_block, c); + } + else return c(); + }, + //store it + function(c) { self.sync.storeBlock(blockInfo, function(err) { existed = err && err.toString().match(/E11000/); @@ -329,18 +332,11 @@ var kk=0; }); }, function(c) { - - if (self.prevHash && blockInfo.prev_block !== self.prevHash) { - self.setError('found orphan:' + self.prevHash + ' vs. ' + blockInfo.prev_block); - } - else { - - if (blockInfo && blockInfo.hash) { - self.prevHash = blockInfo.hash; - self.syncedBlocks++; - } else - self.status = 'finished'; - } + if (blockInfo && blockInfo.hash) { + self.prevHash = blockInfo.hash; + self.syncedBlocks++; + } else + self.status = 'finished'; return c(); }, diff --git a/lib/Sync.js b/lib/Sync.js index 39b4348b..bfa2d556 100644 --- a/lib/Sync.js +++ b/lib/Sync.js @@ -96,6 +96,32 @@ function spec() { }); }; + Sync.prototype.setOrphan = function(fromBlock, toBlock, c) { + var self = this; + + var c = fromBlock; + + async.whilst( + function () { + return c !== toBlock; + }, + function () { + console.log('[Sync.js.113]: setOrphan', c); //TODO + self.txDb.setOrphan(c, function(err, insertedTxs, updateAddrs) { + if (err) return cb(err); + + self.blockDb.setOrphan(c, function(err, prevHash){ + + c = prevHash; + return cb(err); + }); + }); + }, + function (err) { + return c(err); + } + ); + }; Sync.prototype._handleBroadcast = function(hash, inserted_txs, updated_addrs) { var self = this; diff --git a/lib/TransactionDb.js b/lib/TransactionDb.js index 4ca43e62..918fe7a5 100644 --- a/lib/TransactionDb.js +++ b/lib/TransactionDb.js @@ -4,9 +4,16 @@ require('classtool'); function spec() { - var ROOT = 'tx-'; //tx-- => [addr, btc_sat] + + // blockHash -> txid mapping (to orphanize )/ + var ROOT = 'tx-b-'; //tx-b- => txid + + // to show tx outs var OUTS_ROOT = 'txouts-'; //txouts-- => [addr, btc_sat] - var ADDR_ROOT = 'txouts-addr-'; //txouts-addr---- => (+/-) btc_sat + + // to sum up addr balance + var ADDR_ROOT = 'txouts-addr-'; //txouts-addr---- => + btc_sat + var SPEND_ROOT = 'txouts-spend-';//txouts-spend-- => [txid(in),n(in),ts] // TODO: use bitcore networks module var genesisTXID = '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b'; @@ -37,29 +44,12 @@ function spec() { }); }; -/* - txidBuf: { - type: Buffer, - index: true, - }, - index: Number, - addr: { - type: String, - index: true, - }, - value_sat: Number, - fromOrphan: Boolean, - - spendTxIdBuf: Buffer, - spendIndex: Number, - spendFromOrphan: Boolean, - */ // TransactionDb.prototype.fromTxIdOne = function(txid, cb) { TODO TransactionDb.prototype.has = function(txid, cb) { var self = this; - var k = ROOT + txid; + var k = OUTS_ROOT + txid; self.db.get(k, function (err,val) { var ret; @@ -75,6 +65,31 @@ function spec() { }); }; + TransactionDb.prototype.fromTxId = function(txid, cb) { + var self = this; + + var k = OUTS_ROOT + txid; + var ret=[]; + + self.db.createReadStream({start: k, end: k + '~'}) + .on('data', function (data) { + var k = data.key.split('-'); + var v = data.value.split(':'); + ret.push({ + addr: v[0], + value_sat: v[1], + index: k[2], + }); + }) + .on('error', function (err) { + return cb(err); + }) + .on('end', function (err) { + return cb(err, ret); + }); + }; + + TransactionDb.prototype.fromTxIdN = function(txid, n, cb) { var self = this; @@ -92,6 +107,49 @@ function spec() { }); }; + // Only for testing. Very slow (toRm outs to rm, only to speedup) + TransactionDb.prototype.removeFromTxId = function(txid, toRm, cb) { + var self = this; + + async.series([ + function(c) { + self.db.createReadStream({ + start: OUTS_ROOT + txid, + end: OUTS_ROOT + txid + '~', + }).pipe( + self.db.createWriteStream({type:'del'}) + ).on('close', c); + }, + function(s_c) { + if (toRm && toRm.length) return s_c(); + + toRm = []; + self.db.createReadStream({ + start: SPEND_ROOT + }) + .on('data', function(data) { + if (data.value.indexOf(txid) >= 0) { + toRm.push(data.key); + console.log('To Remove Found', data.key); //TODO + } + }) + .on('end', function() { + return s_c(); + }); + }, + function(s_c) { + async.each(toRm, function(k,e_c) { + self.db.del(k,e_c); + }, s_c); + }], + function(err) { + cb(err); + }); + + }; + + + TransactionDb.prototype.adaptTxObject = function(txInfo) { // adapt bitcore TX object to bitcoind JSON response @@ -129,42 +187,26 @@ function spec() { }; - TransactionDb.prototype.add = function(tx, fromOrphan, cb) { + TransactionDb.prototype.add = function(tx, cb) { var self = this; var addrs = []; var is_new = true; if (tx.hash) self.adaptTxObject(tx); - //TODO - var ts = 1; + var ts = tx.timestamp; - //TODO - if (fromOrphan) return cb(); - async.series([ // Input Outpoints (mark them as spended) function(p_c) { if (tx.isCoinBase) return p_c(); async.forEachLimit(tx.vin, CONCURRENCY, function(i, next_out) { - - // TODO -return next_out(); - -/* self.db.batch() - .put() - var data = { - txidBuf: b, - index: i.vout, - - spendTxIdBuf: bTxId, - spendIndex: i.n, - }; - if (fromOrphan) data.spendFromOrphan = true; - Self.update({txidBuf: b, index: i.vout}, data, {upsert: true}, next_out); -*/ + self.db.batch() + .put( SPEND_ROOT + i.txid + '-' + i.vout , + tx.txid + ':' + i.n + ':' + ts) + .write(next_out); }, function (err) { if (err) { @@ -185,18 +227,15 @@ return next_out(); o.scriptPubKey.addresses[0] && ! o.scriptPubKey.addresses[1] // TODO : not supported ){ - // This is only to broadcast (WIP) // if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) { // addrs.push(o.scriptPubKey.addresses[0]); // } - //if (fromOrphan) data.fromOrphan = true; // TODO - var addr = o.scriptPubKey.addresses[0]; var sat = o.value * util.COIN; self.db.batch() - .put( OUTS_ROOT + tx.txid + o.n, addr + ':' + sat) + .put( OUTS_ROOT + tx.txid + '-' + o.n, addr + ':' + sat) .put( ADDR_ROOT + addr + '-' + ts + '-' + tx.txid + '-' + o.n, sat) .write(next_out); @@ -224,7 +263,7 @@ return next_out(); }); }; - TransactionDb.prototype.createFromArray = function(txs, fromOrphan, blockHash, next) { + TransactionDb.prototype.createFromArray = function(txs, blockHash, next) { var self = this; if (!txs) return next(); @@ -235,6 +274,7 @@ return next_out(); async.forEachLimit(txs, CONCURRENCY, function(t, each_cb) { if (typeof t === 'string') { + // Is it from genesis block? (testnet==livenet) // TODO: parse it from networks.genesisTX? if (t === genesisTXID) return each_cb(); @@ -242,28 +282,27 @@ return next_out(); TransactionRpc.getRpcInfo(t, function(err, inInfo) { if (!inInfo) return each_cb(err); - self.add(inInfo, fromOrphan, function(err) { - if (err) return each_cb(err); + self.add(inInfo, function(err) { + if (err || !blockHash) return each_cb(err); - self.db.put(ROOT + t, blockHash, function(err) { + self.db.put(ROOT + blockHash, t, function(err) { return each_cb(err); }); }); }); } else { - self.add(t, fromOrphan, function(err) { + self.add(t, function(err) { if (err) return each_cb(err); - self.db.put(ROOT + t.txid, blockHash, function(err) { + self.db.put(ROOT + blockHash, t.txid, function(err) { return each_cb(err); }); }); } }, function(err) { - - +console.log('[TransactionDb.js.308]'); //TODO return next(err, insertedTxs, updatedAddrs); }); }; @@ -274,9 +313,22 @@ return next_out(); var self = this; if (!b.tx) return next(); - return self.createFromArray(b.tx, b.isOrphan, b.hash, next); + return self.createFromArray(b.tx, b.hash, next); }; + + TransactionDb.prototype.setOrphan = function(blockHash, next) { +// var self = this; + + //Get Txs +// TODO + + //Mark Tx's output as fromOrphan + //Mark Tx's outpoiunt as fromOrphan. Undo spents + return next(); + }; + + return TransactionDb; } module.defineClass(spec); diff --git a/test/integration/01-transactionouts.js b/test/integration/01-transactionouts.js index 57a7f2d4..fc5a3160 100644 --- a/test/integration/01-transactionouts.js +++ b/test/integration/01-transactionouts.js @@ -5,60 +5,57 @@ process.env.NODE_ENV = process.env.NODE_ENV || 'development'; -var mongoose = require('mongoose'), +var assert = require('assert'), fs = require('fs'), util = require('util'), config = require('../../config/config'), - TransactionOut = require('../../app/models/TransactionOut'); + TransactionDb = require('../../lib/TransactionDb').class(); -var txItemsValid = JSON.parse(fs.readFileSync('test/model/txitems.json')); - -mongoose.connection.on('error', function(err) { console.log(err); }); +var txItemsValid = JSON.parse(fs.readFileSync('test/integration/txitems.json')); describe('TransactionOut', function(){ - before(function(done) { - mongoose.connect(config.db); - done(); - }); - - after(function(done) { - mongoose.connection.close(); - done(); - }); + var tdb = new TransactionDb(); txItemsValid.forEach( function(v) { if (v.disabled) return; - it('test a exploding tx ' + v.txid, function(done) { + it('test a processing tx ' + v.txid, function(done) { + this.timeout(60000); // Remove first - TransactionOut.removeFromTxId(v.txid, function(err) { - TransactionOut._explodeTransactionOuts(v.txid, function(err, tx) { - if (err) done(err); + tdb.removeFromTxId(v.txid, v.toRm, function() { - TransactionOut - .fromTxId( v.txid, function(err, readItems) { + tdb.fromTxId( v.txid, function(err, readItems) { + assert.equal(readItems.length,0); - var unmatch={}; + var unmatch=[]; + tdb.createFromArray([v.txid], null, function(err) { + if (err) return done(err); + + tdb.fromTxId( v.txid, function(err, readItems) { + + v.items.forEach(function(validItem){ + unmatch[validItem.addr] =1; + }); + assert.equal(readItems.length,v.items.length); + + v.items.forEach(function(validItem){ + var readItem = readItems.shift(); + + assert.equal(readItem.addr,validItem.addr); + assert.equal(readItem.value_sat,validItem.value_sat); + assert.equal(readItem.index,validItem.index); + assert.equal(readItem.spendIndex, null); + assert.equal(readItem.spendTxIdBuf, null); + delete unmatch[validItem.addr]; + }); + + var valid = util.inspect(v.items, { depth: null }); + assert(!Object.keys(unmatch).length,'\n\tUnmatchs:' + Object.keys(unmatch) + "\n\n" +valid + '\nvs.\n' + readItems); + return done(); - v.items.forEach(function(validItem){ - unmatch[validItem.addr] =1; }); - v.items.forEach(function(validItem){ - var readItem = readItems.shift(); - assert.equal(readItem.addr,validItem.addr); - assert.equal(readItem.value_sat,validItem.value_sat); - assert.equal(readItem.index,validItem.index); - assert.equal(readItem.spendIndex, null); - assert.equal(readItem.spendTxIdBuf, null); - delete unmatch[validItem.addr]; - }); - - var valid = util.inspect(v.items, { depth: null }); - assert(!Object.keys(unmatch).length,'\n\tUnmatchs:' + Object.keys(unmatch) + "\n\n" +valid + '\nvs.\n' + readItems); - done(); - }); }); }); diff --git a/test/integration/block.js b/test/integration/block.js index 9ca09c09..1b6ea508 100644 --- a/test/integration/block.js +++ b/test/integration/block.js @@ -7,68 +7,38 @@ process.env.NODE_ENV = process.env.NODE_ENV || 'development'; var TESTING_BLOCK = '000000000185678d3d7ecc9962c96418174431f93fe20bf216d5565272423f74'; var - mongoose= require('mongoose'), assert = require('assert'), config = require('../../config/config'), - Block = require('../../app/models/Block'); + BlockDb = require('../../lib/BlockDb').class(); -mongoose.connection.on('error', function(err) { console.log(err); }); - -describe('Block fromHashWithInfo', function(){ - - before(function(done) { - mongoose.connect(config.db); - done(); - }); - - after(function(done) { - mongoose.connection.close(); - done(); - }); - - - it('should poll block\'s info from mongoose', function(done) { - Block.fromHashWithInfo(TESTING_BLOCK, function(err, b2) { - if (err) done(err); - - - var h = new Buffer(TESTING_BLOCK,'hex'); - assert(b2.hashStr === TESTING_BLOCK); - assert.equal(b2.hashStr, TESTING_BLOCK); - done(); - }); - }); +describe('BlockDb fromHashWithInfo', function(){ + var bdb = new BlockDb(); it('should poll block\'s info from bitcoind', function(done) { - Block.fromHashWithInfo(TESTING_BLOCK, function(err, b2) { + bdb.fromHashWithInfo(TESTING_BLOCK, function(err, b2) { if (err) done(err); + assert.equal(b2.hash, TESTING_BLOCK); assert.equal(b2.info.hash, TESTING_BLOCK); assert.equal(b2.info.chainwork, '000000000000000000000000000000000000000000000000001b6dc969ffe847'); done(); }); }); - - - it('hash Virtuals SET', function(done) { - var b = new Block(); - b.hashStr = 'a1a2'; - assert.equal(b.hash.toString('hex'),'a1a2'); - b.nextBlockHashStr = 'a1a3'; - assert.equal(b.nextBlockHash.toString('hex'),'a1a3'); - done(); + it('return true in has', function(done) { + bdb.has(TESTING_BLOCK, function(err, has) { + assert.equal(has, true); +console.log('[block.js.29:has:]',has); //TODO + done(); + }); + }); + it('return false in has', function(done) { + bdb.has('111', function(err, has) { + assert.equal(has, false); + done(); + }); }); - it('hash Virtuals GET', function(done) { - var b = new Block(); - b.hash = new Buffer('a1a2','hex'); - assert.equal(b.hashStr,'a1a2'); - - b.nextBlockHash = new Buffer('b2b1','hex'); - assert.equal(b.nextBlockHashStr,'b2b1'); - done(); - }); }); diff --git a/test/integration/txitems.json b/test/integration/txitems.json index 72ff9699..eae6b116 100644 --- a/test/integration/txitems.json +++ b/test/integration/txitems.json @@ -5,6 +5,9 @@ }, { "txid": "21798ddc9664ac0ef618f52b151dda82dafaf2e26d2bbef6cdaf55a6957ca237", + "toRm": [ + "txouts-spend-86a03cac7d87f596008c6d5a8d3fd8b88842932ea6f0337673eda16f6b472f7f-0" + ], "items": [ { "addr": "mzjLe62faUqCSjkwQkwPAL5nYyR8K132fA", @@ -20,6 +23,9 @@ }, { "txid": "b633a6249d4a2bc123e7f8a151cae2d4afd17aa94840009f8697270c7818ceee", + "toRm": [ + "txouts-spend-01621403689cb4a95699a3dbae029d7031c5667678ef14e2054793954fb27917-0" + ], "items": [ { "addr": "mhfQJUSissP6nLM5pz6DxHfctukrrLct2T", @@ -35,6 +41,9 @@ }, { "txid": "ca2f42e44455b8a84434de139efea1fe2c7d71414a8939e0a20f518849085c3b", + "toRm": [ + "txouts-spend-2d7b680fb06e4d7eeb65ca49ac7522276586e0090b7fe662fc708129429c5e6a-0" + ], "items": [ { "addr": "mhqyL1nDQDo1WLH9qH8sjRjx2WwrnmAaXE",