refactor tdb and bdb

This commit is contained in:
Matias Alejo Garcia 2014-05-23 21:23:44 -03:00
parent 4aaece66a5
commit f0f936d2d8
12 changed files with 2231 additions and 843 deletions

View File

@ -5,9 +5,11 @@
*/ */
var common = require('./common'), var common = require('./common'),
async = require('async'), async = require('async'),
BlockDb = require('../../lib/BlockDb'); BlockDb = require('../../lib/BlockDb'),
TransactionDb = require('../../lib/TransactionDb');
var bdb = new BlockDb(); var bdb = new BlockDb();
var tdb = new TransactionDb();
/** /**
* Find block by hash ... * Find block by hash ...
@ -17,7 +19,7 @@ exports.block = function(req, res, next, hash) {
if (err || !block) if (err || !block)
return common.handleErrors(err, res, next); return common.handleErrors(err, res, next);
else { else {
bdb.getPoolInfo(block.info.tx[0], function(info) { tdb.getPoolInfo(block.info.tx[0], function(info) {
block.info.poolInfo = info; block.info.poolInfo = info;
req.block = block.info; req.block = block.info;
return next(); return next();
@ -67,7 +69,7 @@ var getBlock = function(blockhash, cb) {
}; };
} }
bdb.getPoolInfo(block.info.tx[0], function(info) { tdb.getPoolInfo(block.info.tx[0], function(info) {
block.info.poolInfo = info; block.info.poolInfo = info;
return cb(err, block.info); return cb(err, block.info);
}); });

1328
dev-util/node-tick-report Normal file

File diff suppressed because it is too large Load Diff

View File

@ -58,3 +58,13 @@ first 10%
Mon Mar 10 11:59:25 ART 2014 Mon Mar 10 11:59:25 ART 2014
10% de blk 0 (testnet) 10% de blk 0 (testnet)
=> 37s => 37s
Thu May 22 13:42:50 ART 2014 (base58check + toString opts + custom getStandardizedObject)
10% testnet
=> 29s
100% testnet
=> 17m6s (user time)

View File

@ -1,6 +1,5 @@
'use strict'; 'use strict';
var imports = require('soop').imports(); var imports = require('soop').imports();
var ThisParent = imports.parent || require('events').EventEmitter;
var TIMESTAMP_PREFIX = 'bts-'; // bts-<ts> => <hash> var TIMESTAMP_PREFIX = 'bts-'; // bts-<ts> => <hash>
var PREV_PREFIX = 'bpr-'; // bpr-<hash> => <prev_hash> var PREV_PREFIX = 'bpr-'; // bpr-<hash> => <prev_hash>
var NEXT_PREFIX = 'bne-'; // bne-<hash> => <next_hash> var NEXT_PREFIX = 'bne-'; // bne-<hash> => <next_hash>
@ -8,6 +7,10 @@ var MAIN_PREFIX = 'bma-'; // bma-<hash> => <height> (0 is unconnect
var TIP = 'bti-'; // bti = <hash>:<height> last block on the chain var TIP = 'bti-'; // bti = <hash>:<height> last block on the chain
var LAST_FILE_INDEX = 'file-'; // last processed file index var LAST_FILE_INDEX = 'file-'; // last processed file index
// txid - blockhash mapping (only for confirmed txs, ONLY FOR BEST BRANCH CHAIN)
var IN_BLK_PREFIX = 'btx-'; //btx-<txid> = <block>
var MAX_OPEN_FILES = 500; var MAX_OPEN_FILES = 500;
/** /**
@ -18,10 +21,15 @@ var levelup = require('levelup'),
var db = imports.db || levelup(config.leveldb + '/blocks',{maxOpenFiles: MAX_OPEN_FILES} ); var db = imports.db || levelup(config.leveldb + '/blocks',{maxOpenFiles: MAX_OPEN_FILES} );
var Rpc = imports.rpc || require('./Rpc'); var Rpc = imports.rpc || require('./Rpc');
var logger = require('./logger').logger;
var d = logger.log;
var info = logger.info;
var BlockDb = function() { var BlockDb = function() {
this.txDb = require('./TransactionDb').default();
BlockDb.super(this, arguments); BlockDb.super(this, arguments);
}; };
BlockDb.parent = ThisParent;
BlockDb.prototype.close = function(cb) { BlockDb.prototype.close = function(cb) {
db.close(cb); db.close(cb);
@ -37,42 +45,139 @@ BlockDb.prototype.drop = function(cb) {
}); });
}; };
// adds a block. Does not update Next pointer in
// the block prev to the new block, nor TIP pointer BlockDb.prototype._addBlockScript = function(b, height) {
//
BlockDb.prototype.add = function(b, height, cb) {
var self = this;
var time_key = TIMESTAMP_PREFIX + var time_key = TIMESTAMP_PREFIX +
( b.time || Math.round(new Date().getTime() / 1000) ); ( b.time || Math.round(new Date().getTime() / 1000) );
return db.batch() return [
.put(time_key, b.hash) {
.put(MAIN_PREFIX + b.hash, height) type: 'put',
.put(PREV_PREFIX + b.hash, b.previousblockhash) key: time_key,
.write(function(err){ value: b.hash,
if (!err) { },
self.emit('new_block', {blockid: b.hash}); {
} type: 'put',
cb(err); key: MAIN_PREFIX + b.hash,
value: height,
},
{
type: 'put',
key:PREV_PREFIX + b.hash,
value: b.previousblockhash,
},
];
};
BlockDb.prototype._delTxsScript = function(txs) {
var dbScript =[];
for(var ii in txs){
dbScript.push({
type: 'del',
key: IN_BLK_PREFIX + txs[ii],
}); });
}
return dbScript;
};
BlockDb.prototype._addTxsScript = function(txs, hash, height) {
var dbScript =[];
for(var ii in txs){
dbScript.push({
type: 'put',
key: IN_BLK_PREFIX + txs[ii],
value: hash+':'+height,
});
}
return dbScript;
};
// Returns blockHash and height for a given txId (If the tx is on the MAIN chain).
BlockDb.prototype.getBlockForTx = function(txId, cb) {
db.get(IN_BLK_PREFIX + txId,function (err, val) {
if (err && err.notFound) return cb();
if (err) return cb(err);
var v = val.split(':');
return cb(err,v[0],parseInt(v[1]));
});
};
BlockDb.prototype._changeBlockHeight = function(hash, height, cb) {
var self = this;
var dbScript1 = this._setHeightScript(hash,height);
d('Getting TXS FROM %s to set it Main', hash);
this.fromHashWithInfo(hash, function(err, bi) {
if (!bi || !bi.info || !bi.info.tx)
throw new Error('unable to get info for block:'+ hash);
var dbScript2;
if (height>=0) {
dbScript2 = self._addTxsScript(bi.info.tx, hash, height);
info('\t%s %d Txs', 'Confirming', bi.info.tx.length);
} else {
dbScript2 = self._delTxsScript(bi.info.tx);
info('\t%s %d Txs', 'Unconfirming', bi.info.tx.length);
}
db.batch(dbScript2.concat(dbScript1),cb);
});
};
BlockDb.prototype.setBlockMain = function(hash, height, cb) {
this._changeBlockHeight(hash,height,cb);
};
BlockDb.prototype.setBlockNotMain = function(hash, cb) {
this._changeBlockHeight(hash,-1,cb);
};
// adds a block (and its txs). Does not update Next pointer in
// the block prev to the new block, nor TIP pointer
//
BlockDb.prototype.add = function(b, height, cb) {
d('adding block %s #d', b,height);
var dbScript = this._addBlockScript(b,height);
dbScript = dbScript.concat(this._addTxsScript(b.tx,b.hash, height));
this.txDb.addMany(b.tx, function(err) {
if (err) return cb(err);
db.batch(dbScript,cb);
});
}; };
BlockDb.prototype.getTip = function(cb) { BlockDb.prototype.getTip = function(cb) {
if (this.cachedTip){
var v = this.cachedTip.split(':');
return cb(null,v[0], parseInt(v[1]));
}
var self = this;
db.get(TIP, function(err, val) { db.get(TIP, function(err, val) {
if (!val) return cb(); if (!val) return cb();
self.cachedTip = val;
var v = val.split(':'); var v = val.split(':');
return cb(err,v[0], parseInt(v[1])); return cb(err,v[0], parseInt(v[1]));
}); });
}; };
BlockDb.prototype.setTip = function(hash, height, cb) { BlockDb.prototype.setTip = function(hash, height, cb) {
console.log('[BlockDb.js.75] TIP', hash, height); //TODO this.cachedTip = hash + ':' + height;
db.put(TIP, hash + ':' + height, function(err) { db.put(TIP, this.cachedTip, function(err) {
return cb(err); return cb(err);
}); });
}; };
BlockDb.prototype.getDepth = function(hash, cb) {
var v = this.cachedTip.split(':');
if (!v) throw new Error('getDepth called with not cachedTip');
this.getHeight(hash, function(err,h){
return cb(err,parseInt(v[1]) - h);
});
};
//mainly for testing //mainly for testing
BlockDb.prototype.setPrev = function(hash, prevHash, cb) { BlockDb.prototype.setPrev = function(hash, prevHash, cb) {
db.put(PREV_PREFIX + hash, prevHash, function(err) { db.put(PREV_PREFIX + hash, prevHash, function(err) {
@ -119,11 +224,13 @@ BlockDb.prototype.getHeight = function(hash, cb) {
}); });
}; };
BlockDb.prototype.setHeight = function(hash, height, cb) { BlockDb.prototype._setHeightScript = function(hash, height) {
if (!height) console.log('\tNew orphan: %s',hash); d('setHeight: %s #%d', hash,height);
db.put(MAIN_PREFIX + hash, height, function(err) { return ([{
return cb(err); type: 'put',
}); key: MAIN_PREFIX + hash,
value: height,
}]);
}; };
BlockDb.prototype.setNext = function(hash, nextHash, cb) { BlockDb.prototype.setNext = function(hash, nextHash, cb) {

View File

@ -14,12 +14,28 @@ var Sync = require('./Sync');
var sockets = require('../app/controllers/socket.js'); var sockets = require('../app/controllers/socket.js');
var BlockExtractor = require('./BlockExtractor.js'); var BlockExtractor = require('./BlockExtractor.js');
var buffertools = require('buffertools'); var buffertools = require('buffertools');
var Address = bitcore.Address; var bitcoreUtil = bitcore.util;
var Bignum = bitcore.Bignum;
var Script = bitcore.Script;
// This is 0.1.2 = > c++ version of base58-native
var base58 = require('base58-native').base58Check;
var encodedData = require('soop').load('bitcore/util/EncodedData',{
base58: base58
});
var versionedData= require('soop').load('bitcore/util/VersionedData',{
parent: encodedData
});
var Address = require('soop').load('bitcore/lib/Address',{
parent: versionedData
});
var logger = require('./logger').logger;
var d = logger.log;
var info = logger.info;
var error = logger.error;
// var bitcoreUtil = require('bitcore/util/util');
// var Deserialize = require('bitcore/Deserialize'); // var Deserialize = require('bitcore/Deserialize');
var BAD_GEN_ERROR = 'Bad genesis block. Network mismatch between Insight and bitcoind? Insight is configured for:'; var BAD_GEN_ERROR = 'Bad genesis block. Network mismatch between Insight and bitcoind? Insight is configured for:';
var BAD_GEN_ERROR_DB = 'Bad genesis block. Network mismatch between Insight and levelDB? Insight is configured for:'; var BAD_GEN_ERROR_DB = 'Bad genesis block. Network mismatch between Insight and levelDB? Insight is configured for:';
@ -41,35 +57,26 @@ function HistoricSync(opts) {
this.sync = new Sync(opts); this.sync = new Sync(opts);
} }
function p() {
var args = [];
Array.prototype.push.apply(args, arguments);
args.unshift('[historic_sync]');
/*jshint validthis:true */
console.log.apply(this, args);
}
HistoricSync.prototype.showProgress = function() { HistoricSync.prototype.showProgress = function() {
var self = this; var self = this;
if ( self.status ==='syncing' && if ( self.status ==='syncing' &&
( self.syncedBlocks ) % self.step !== 1) return; ( self.syncedBlocks ) % self.step !== 1) return;
if (self.error) { if (self.error)
p('ERROR: ' + self.error); error(self.error);
}
else { else {
self.updatePercentage(); self.updatePercentage();
p(util.format('status: [%d%%]', self.syncPercentage)); info(util.format('status: [%d%%]', self.syncPercentage));
} }
if (self.shouldBroadcast) { if (self.shouldBroadcast) {
sockets.broadcastSyncInfo(self.info()); sockets.broadcastSyncInfo(self.info());
} }
//
// if (self.syncPercentage > 10) { // if (self.syncPercentage > 10) {
// process.exit(-1); // process.exit(-1);
// } // }
}; };
@ -133,6 +140,61 @@ HistoricSync.prototype.getBlockFromRPC = function(cb) {
}); });
}; };
HistoricSync.prototype._fromBuffer = function (buf) {
var buf2 = buffertools.reverse(buf);
return parseInt(buf2.toString('hex'), 16);
};
HistoricSync.prototype.getStandardizedTx = function (tx, time) {
var self = this;
tx.txid = bitcoreUtil.formatHashFull(tx.getHash());
var ti=0;
tx.vin = tx.ins.map(function(txin) {
var ret = {n: ti++};
if (txin.isCoinBase()) {
ret.isCoinBase = true;
} else {
ret.txid = buffertools.reverse(new Buffer(txin.getOutpointHash())).toString('hex');
ret.vout = txin.getOutpointIndex();
}
return ret;
});
var to = 0;
tx.vout = tx.outs.map(function(txout) {
var val;
if (txout.s) {
var s = new Script(txout.s);
var addrs = new Address.fromScriptPubKey(s, config.network);
// support only for p2pubkey p2pubkeyhash and p2sh
if (addrs && addrs.length === 1) {
val = {addresses: [addrs[0].toString() ] };
}
}
return {
valueSat: self._fromBuffer(txout.v),
scriptPubKey: val,
n: to++,
};
});
tx.time = time;
return tx;
};
HistoricSync.prototype.getStandardizedBlock = function(b) {
var self = this;
var block = {
hash: bitcoreUtil.formatHashFull(b.getHash()),
previousblockhash: bitcoreUtil.formatHashFull(b.prev_hash),
time: b.timestamp,
};
block.tx = b.txs.map(function(tx){
return self.getStandardizedTx(tx, b.timestamp);
});
return block;
};
HistoricSync.prototype.getBlockFromFile = function(cb) { HistoricSync.prototype.getBlockFromFile = function(cb) {
var self = this; var self = this;
@ -141,36 +203,7 @@ HistoricSync.prototype.getBlockFromFile = function(cb) {
//get Info //get Info
self.blockExtractor.getNextBlock(function(err, b) { self.blockExtractor.getNextBlock(function(err, b) {
if (err || ! b) return cb(err); if (err || ! b) return cb(err);
blockInfo = b.getStandardizedObject(b.txs, self.network); blockInfo = self.getStandardizedBlock(b);
blockInfo.previousblockhash = blockInfo.prev_block;
var ti=0;
// Get TX Address
b.txs.forEach(function(t) {
var objTx = blockInfo.tx[ti++];
//add time from block
objTx.time = blockInfo.time;
var to=0;
t.outs.forEach( function(o) {
try {
var s = new Script(o.s);
var addrs = new Address.fromScriptPubKey(s, config.network);
// support only for p2pubkey p2pubkeyhash and p2sh
if (addrs.length === 1) {
objTx.out[to].addrStr = addrs[0];
}
} catch (e) {
console.log('WARN Could not processs: ' + objTx.txid ,e);
}
to++;
});
});
self.sync.bDb.setLastFileIndex(self.blockExtractor.currentFileIndex, function(err) { self.sync.bDb.setLastFileIndex(self.blockExtractor.currentFileIndex, function(err) {
return cb(err,blockInfo); return cb(err,blockInfo);
}); });
@ -235,7 +268,7 @@ HistoricSync.prototype.updateStartBlock = function(next) {
self.sync.bDb.fromHashWithInfo(tip, function(err, bi) { self.sync.bDb.fromHashWithInfo(tip, function(err, bi) {
blockInfo = bi ? bi.info : {}; blockInfo = bi ? bi.info : {};
if (oldtip) if (oldtip)
self.sync.setBlockHeight(oldtip, false, cb); self.sync.setBlockHeight(oldtip, -1, cb);
else else
return cb(); return cb();
}); });
@ -249,16 +282,18 @@ HistoricSync.prototype.updateStartBlock = function(next) {
} }
else { else {
oldtip = tip; oldtip = tip;
if (!tip)
throw new Error('Previous blockchain tip was not found on bitcoind. Please reset Insight DB. Tip was:'+tip)
tip = blockInfo.previousblockhash; tip = blockInfo.previousblockhash;
assert(tip); info('Previous TIP is now orphan. Back to:' + tip);
p('Previous TIP is now orphan. Back to:' + tip);
ret = true; ret = true;
} }
return ret; return ret;
}, },
function(err) { function(err) {
self.startBlock = tip; self.startBlock = tip;
p('Resuming sync from block:'+tip); info('Resuming sync from block:'+tip);
return next(err); return next(err);
} }
); );
@ -275,7 +310,7 @@ HistoricSync.prototype.prepareFileSync = function(opts, next) {
try { try {
self.blockExtractor = new BlockExtractor(config.bitcoind.dataDir, config.network); self.blockExtractor = new BlockExtractor(config.bitcoind.dataDir, config.network);
} catch (e) { } catch (e) {
p(e.message + '. Disabling file sync.'); info(e.message + '. Disabling file sync.');
return next(); return next();
} }
@ -288,7 +323,7 @@ HistoricSync.prototype.prepareFileSync = function(opts, next) {
var h = self.genesis; var h = self.genesis;
p('Seeking file to:' + self.startBlock); info('Seeking file to:' + self.startBlock);
//forward till startBlock //forward till startBlock
async.whilst( async.whilst(
function() { function() {
@ -320,18 +355,18 @@ HistoricSync.prototype.prepareRpcSync = function(opts, next) {
HistoricSync.prototype.showSyncStartMessage = function() { HistoricSync.prototype.showSyncStartMessage = function() {
var self = this; var self = this;
p('Got ' + self.connectedCountDB + info('Got ' + self.connectedCountDB +
' blocks in current DB, out of ' + self.blockChainHeight + ' block at bitcoind'); ' blocks in current DB, out of ' + self.blockChainHeight + ' block at bitcoind');
if (self.blockExtractor) { if (self.blockExtractor) {
p('bitcoind dataDir configured...importing blocks from .dat files'); info('bitcoind dataDir configured...importing blocks from .dat files');
p('First file index: ' + self.blockExtractor.currentFileIndex); info('First file index: ' + self.blockExtractor.currentFileIndex);
} }
else { else {
p('syncing from RPC (slow)'); info('syncing from RPC (slow)');
} }
p('Starting from: ', self.startBlock); info('Starting from: ', self.startBlock);
self.showProgress(); self.showProgress();
}; };
@ -389,7 +424,7 @@ HistoricSync.prototype.start = function(opts, next) {
var self = this; var self = this;
if (self.status==='starting' || self.status==='syncing') { if (self.status==='starting' || self.status==='syncing') {
p('## Wont start to sync while status is %s', self.status); error('## Wont start to sync while status is %s', self.status);
return next(); return next();
} }

View File

@ -63,9 +63,9 @@ PeerSync.prototype.handleTx = function(info) {
if (self.shouldBroadcast) { if (self.shouldBroadcast) {
sockets.broadcastTx(tx); sockets.broadcastTx(tx);
if (tx.addrsToEmit) { if (tx.addrsToEmit) {
tx.addrsToEmit.forEach(function(a) { for(var ii in tx.addrsToEmit){
sockets.broadcastAddressTx(a, tx.txid); sockets.broadcastAddressTx(tx.addrsToEmit[ii], tx.txid);
}); }
} }
} }
} }

View File

@ -7,6 +7,12 @@ var bitcore = require('bitcore');
var networks = bitcore.networks; var networks = bitcore.networks;
var async = require('async'); var async = require('async');
var logger = require('./logger').logger;
var d = logger.log;
var info = logger.info;
var syncId = 0; var syncId = 0;
function Sync(opts) { function Sync(opts) {
@ -15,6 +21,7 @@ function Sync(opts) {
this.bDb = require('./BlockDb').default(); this.bDb = require('./BlockDb').default();
this.txDb = require('./TransactionDb').default(); this.txDb = require('./TransactionDb').default();
this.network = config.network === 'testnet' ? networks.testnet : networks.livenet; this.network = config.network === 'testnet' ? networks.testnet : networks.livenet;
this.cachedLastHash = null;
} }
Sync.prototype.close = function(cb) { Sync.prototype.close = function(cb) {
@ -56,7 +63,8 @@ Sync.prototype.destroy = function(next) {
* \ * \
* F-G-NEW * F-G-NEW
* 1) Set F-G as connected (mark TXs as valid) * 1) Set F-G as connected (mark TXs as valid)
* 2) Declare D-E orphans (and possible invalidate TXs on them) * 2) Set new heights in F-G-NEW
* 3) Declare D-E orphans (and possible invalidate TXs on them)
* *
* *
* Case 3) * Case 3)
@ -77,22 +85,22 @@ Sync.prototype.storeTipBlock = function(b, allowReorgs, cb) {
if (!b) return cb(); if (!b) return cb();
var self = this; var self = this;
var oldTip, oldNext, height, needReorg = false; var oldTip, oldNext, oldHeight, needReorg = false, height = -1;
var newPrev = b.previousblockhash; var newPrev = b.previousblockhash;
async.series([ async.series([
function(c) { function(c) {
// TODO? remove this check?
self.bDb.has(b.hash, function(err, val) { self.bDb.has(b.hash, function(err, val) {
return c(err || return c(err ||
(val ? new Error('WARN: Ignoring already existing block:' + b.hash) : null)); (val ? new Error('WARN: Ignoring already existing block:' + b.hash) : null));
}); });
}, },
function(c) { function(c) {
if (!allowReorgs) return c(); if (!allowReorgs || newPrev === self.cachedLastHash) return c();
self.bDb.has(newPrev, function(err, val) { self.bDb.has(newPrev, function(err, val) {
// Genesis? no problem
if (!val && newPrev.match(/^0+$/)) return c(); if (!val && newPrev.match(/^0+$/)) return c();
return c(err || return c(err ||
(!val ? new Error('NEED_SYNC Ignoring block with non existing prev:' + b.hash) : null)); (!val ? new Error('NEED_SYNC Ignoring block with non existing prev:' + b.hash) : null));
}); });
@ -101,8 +109,8 @@ Sync.prototype.storeTipBlock = function(b, allowReorgs, cb) {
if (!allowReorgs) return c(); if (!allowReorgs) return c();
self.bDb.getTip(function(err, hash, h) { self.bDb.getTip(function(err, hash, h) {
oldTip = hash; oldTip = hash;
if (!hash) height = -1 oldHeight = hash ? (h || 0) : -1
else height = h || 0;
if (oldTip && newPrev !== oldTip) { if (oldTip && newPrev !== oldTip) {
needReorg = true; needReorg = true;
console.log('## REORG Triggered, tip mismatch'); console.log('## REORG Triggered, tip mismatch');
@ -110,11 +118,7 @@ Sync.prototype.storeTipBlock = function(b, allowReorgs, cb) {
return c(); return c();
}); });
}, },
function(c) {
self.txDb.createFromBlock(b, height, function(err) {
return c(err);
});
},
function(c) { function(c) {
if (!needReorg) return c(); if (!needReorg) return c();
self.bDb.getNext(newPrev, function(err, val) { self.bDb.getNext(newPrev, function(err, val) {
@ -123,17 +127,29 @@ Sync.prototype.storeTipBlock = function(b, allowReorgs, cb) {
return c(); return c();
}); });
}, },
function(c) { function(c) {
self.bDb.add(b, height + 1, c); if (!allowReorgs) return c();
if (needReorg) {
info('NEW TIP: %s NEED REORG (old tip: %s #%d)', b.hash, oldTip, oldHeight);
self.processReorg(oldTip, oldNext, newPrev, oldHeight, function(err, h) {
if (err) throw err;
height = h;
return c();
});
}
else {
height = oldHeight + 1;
return c();
}
}, },
function(c) { function(c) {
if (!needReorg) return c(); self.cachedLastHash = b.hash; // just for speed up.
console.log('NEW TIP: %s #%d NEED REORG (old tip: %s)', b.hash, height + 1, oldTip); self.bDb.add(b, height, c);
self.processReorg(oldTip, oldNext, newPrev, height + 1, c);
}, },
function(c) { function(c) {
if (!allowReorgs) return c(); if (!allowReorgs) return c();
self.bDb.setTip(b.hash, height + 1, function(err) { self.bDb.setTip(b.hash, height, function(err) {
return c(err); return c(err);
}); });
}, },
@ -152,21 +168,20 @@ Sync.prototype.storeTipBlock = function(b, allowReorgs, cb) {
}); });
}; };
Sync.prototype.processReorg = function(oldTip, oldNext, newPrev, oldHeight, cb) {
Sync.prototype.processReorg = function(oldTip, oldNext, newPrev, newHeight, cb) {
var self = this; var self = this;
var orphanizeFrom; var orphanizeFrom, newHeight;
async.series([ async.series([
function(c) { function(c) {
self.bDb.getHeight(newPrev, function(err, height) { self.bDb.getHeight(newPrev, function(err, height) {
if (!height) return c(); if (!height) return c(new Error('Could not found block:' + newPrev));
if (height<0) return c();
console.log('# Reorg Case 1)'); newHeight = height + 1;
// case 1 info('# Reorg Case 1) OldNext: %s NewHeight: %d', oldNext, newHeight);
orphanizeFrom = oldNext; orphanizeFrom = oldNext;
return c(err); return c(err);
}); });
@ -174,10 +189,12 @@ Sync.prototype.processReorg = function(oldTip, oldNext, newPrev, newHeight, cb)
function(c) { function(c) {
if (orphanizeFrom) return c(); if (orphanizeFrom) return c();
console.log('# Reorg Case 2)'); info('# Reorg Case 2)');
self.setBranchConnectedBackwards(newPrev, newHeight-1, function(err, yHash, newYHashNext) { self.setBranchConnectedBackwards(newPrev, function(err, yHash, newYHashNext, height) {
if (err) return c(err); if (err) return c(err);
newHeight = height;
self.bDb.getNext(yHash, function(err, yHashNext) { self.bDb.getNext(yHash, function(err, yHashNext) {
// Connect the new branch, and orphanize the old one.
orphanizeFrom = yHashNext; orphanizeFrom = yHashNext;
self.bDb.setNext(yHash, newYHashNext, function(err) { self.bDb.setNext(yHash, newYHashNext, function(err) {
return c(err); return c(err);
@ -187,26 +204,17 @@ Sync.prototype.processReorg = function(oldTip, oldNext, newPrev, newHeight, cb)
}, },
function(c) { function(c) {
if (!orphanizeFrom) return c(); if (!orphanizeFrom) return c();
self.setBranchOrphan(orphanizeFrom, function(err) { self._setBranchOrphan(orphanizeFrom, function(err) {
return c(err); return c(err);
}); });
}, },
], ],
function(err) { function(err) {
return cb(err); return cb(err, newHeight);
}); });
}; };
//height = false => unconnnected Sync.prototype._setBranchOrphan = function(fromHash, cb) {
Sync.prototype.setBlockHeight = function(hash, height, cb) {
var self = this;
self.bDb.setHeight(hash, height, function(err) {
if (err) return cb(err);
return self.txDb.handleBlockChange(hash, height>-1? true : false, cb);
});
};
Sync.prototype.setBranchOrphan = function(fromHash, cb) {
var self = this, var self = this,
hashInterator = fromHash; hashInterator = fromHash;
@ -215,7 +223,7 @@ Sync.prototype.setBranchOrphan = function(fromHash, cb) {
return hashInterator; return hashInterator;
}, },
function(c) { function(c) {
self.setBlockHeight(hashInterator, false, function(err) { self.bDb.setBlockNotMain(hashInterator, function(err) {
if (err) return cb(err); if (err) return cb(err);
self.bDb.getNext(hashInterator, function(err, val) { self.bDb.getNext(hashInterator, function(err, val) {
hashInterator = val; hashInterator = val;
@ -225,45 +233,53 @@ Sync.prototype.setBranchOrphan = function(fromHash, cb) {
}, cb); }, cb);
}; };
Sync.prototype.setBranchConnectedBackwards = function(fromHash, initialHeight, cb) { Sync.prototype.setBranchConnectedBackwards = function(fromHash, cb) {
//console.log('[Sync.js.219:setBranchConnectedBackwards:]',fromHash); //TODO
var self = this, var self = this,
hashInterator = fromHash, hashInterator = fromHash,
lastHash = fromHash, lastHash = fromHash,
isMain; yHeight,
branch = [];
async.doWhilst( async.doWhilst(
function(c) { function(c) {
self.setBlockHeight(hashInterator, initialHeight--, function(err) { branch.unshift(hashInterator);
self.bDb.getPrev(hashInterator, function(err, val) {
if (err) return c(err); if (err) return c(err);
self.bDb.getPrev(hashInterator, function(err, val) { lastHash = hashInterator;
if (err) return c(err); hashInterator = val;
lastHash = hashInterator; self.bDb.getHeight(hashInterator, function(err, height) {
hashInterator = val; yHeight = height;
self.bDb.getHeight(hashInterator, function(err, height) { return c();
isMain = height ? 1 : 0;
return c();
});
}); });
}); });
}, },
function() { function() {
return hashInterator && !isMain; return hashInterator && !yHeight;
}, },
function(err) { function() {
console.log('\tFound yBlock:', hashInterator); info('\tFound yBlock: %s #%d', hashInterator, yHeight);
return cb(err, hashInterator, lastHash); var heightIter = yHeight + 1;
} var hashIter;
); async.whilst(
function() {
hashIter = branch.shift();
return hashIter;
},
function(c) {
self.setBlockMain(hashIter, heightIter++, c);
},
function(err) {
return cb(err, hashInterator, lastHash, heightIter);
});
});
}; };
//Store unconfirmed TXs //Store unconfirmed TXs
Sync.prototype.storeTxs = function(txs, cb) { Sync.prototype.storeTxs = function(txs, cb) {
var self = this; this.txDb.addMany(txs, cb);
self.txDb.createFromArray(txs, null, null, function(err) {
if (err) return cb(err);
return cb(err);
});
}; };

View File

@ -1,15 +1,7 @@
'use strict'; 'use strict';
var imports = require('soop').imports(); var imports = require('soop').imports();
var PoolMatch = imports.poolMatch || require('soop').load('./PoolMatch',config);
// txid - blockhash mapping (only for confirmed txs)
var IN_BLK_PREFIX = 'txb-'; //txb-<txid> = <block>
var RECENT_ = 'tx-'; //tx-<block> = <txid>
pre-<height>- =txid
// to show tx outs // to show tx outs
var OUTS_PREFIX = 'txo-'; //txo-<txid>-<n> => [addr, btc_sat] var OUTS_PREFIX = 'txo-'; //txo-<txid>-<n> => [addr, btc_sat]
@ -37,19 +29,13 @@ var bitcore = require('bitcore'),
config = require('../config/config'), config = require('../config/config'),
assert = require('assert'); assert = require('assert');
var logger = require('./logger').logger;
var d = logger.log;
var info = logger.info;
var warn = logger.warn;
var db = imports.db || levelup(config.leveldb + '/txs',{maxOpenFiles: MAX_OPEN_FILES} ); var db = imports.db || levelup(config.leveldb + '/txs',{maxOpenFiles: MAX_OPEN_FILES} );
var Script = bitcore.Script;
// This is 0.1.2 = > c++ version of base57-native var PoolMatch = imports.poolMatch || require('soop').load('./PoolMatch',config);
var base58 = require('base58-native').base58Check;
var encodedData = require('soop').load('bitcore/util/EncodedData',{
base58: base58
});
var versionedData= require('soop').load('bitcore/util/VersionedData',{
parent: encodedData
});
var Address = require('soop').load('bitcore/lib/Address',{
parent: versionedData
});
var TransactionDb = function() { var TransactionDb = function() {
TransactionDb.super(this, arguments); TransactionDb.super(this, arguments);
@ -195,9 +181,8 @@ TransactionDb.prototype._fillOutpoints = function(info, cb) {
async.eachLimit(info.vin, CONCURRENCY, function(i, c_in) { async.eachLimit(info.vin, CONCURRENCY, function(i, c_in) {
self.fromTxIdN(i.txid, i.vout, info.confirmations, function(err, ret) { self.fromTxIdN(i.txid, i.vout, info.confirmations, function(err, ret) {
//console.log('[TransactionDb.js.154:ret:]',ret); //TODO
if (!ret || !ret.addr || !ret.valueSat) { if (!ret || !ret.addr || !ret.valueSat) {
console.log('Could not get TXouts in %s,%d from %s ', i.txid, i.vout, info.txid); info('Could not get TXouts in %s,%d from %s ', i.txid, i.vout, info.txid);
if (ret) i.unconfirmedInput = ret.unconfirmedInput; if (ret) i.unconfirmedInput = ret.unconfirmedInput;
incompleteInputs = 1; incompleteInputs = 1;
return c_in(); // error not scalated return c_in(); // error not scalated
@ -258,7 +243,6 @@ TransactionDb.prototype._getInfo = function(txid, next) {
Rpc.getTxInfo(txid, function(err, info) { Rpc.getTxInfo(txid, function(err, info) {
if (err) return next(err); if (err) return next(err);
self._fillOutpoints(info, function() { self._fillOutpoints(info, function() {
self._fillSpent(info, function() { self._fillSpent(info, function() {
return next(null, info); return next(null, info);
@ -341,10 +325,13 @@ TransactionDb.prototype.fromTxIdN = function(txid, n, confirmations, cb) {
TransactionDb.prototype.fillConfirmations = function(o, cb) { TransactionDb.prototype.fillConfirmations = function(o, cb) {
var self = this; var self = this;
self.getHeight(o.txid, function(err, height) { console.log('[TransactionDb.js.339]'); //TODO
self.getBlock(o.txid, function(err, hash) {
console.log('[TransactionDb.js.342]'); //TODO
if (err) return cb(err); if (err) return cb(err);
o.height = height; o.isConfirmed = hash?1:0;
if (!o.spentTxId) return cb(); if (!o.spentTxId) return cb();
if (o.multipleSpentAttempts) { if (o.multipleSpentAttempts) {
@ -352,20 +339,20 @@ TransactionDb.prototype.fillConfirmations = function(o, cb) {
//TODO save it for later is height > 6 //TODO save it for later is height > 6
async.eachLimit(o.multipleSpentAttempts, CONCURRENCY, async.eachLimit(o.multipleSpentAttempts, CONCURRENCY,
function(oi, e_c) { function(oi, e_c) {
self.getHeight(oi.spentTxId, function(err, height) { self.getBlock(oi.spentTxId, function(err, hash) {
if (err) return; if (err) return;
if (height) { if (hash) {
o.spentTxId = oi.spentTxId; o.spentTxId = oi.spentTxId;
o.index = oi.index; o.index = oi.index;
o.spentHeight = height; o.spentIsConfirmed = 1;
} }
return e_c(); return e_c();
}); });
}, cb); }, cb);
} else { } else {
self.getHeight(o.spentTxId, function(err, height) { self.getBlock(o.spentTxId, function(err, hash) {
if (err) return cb(err); if (err) return cb(err);
o.spentHeight = height; o.spentIsConfirmed = hash?1:0;
return cb(); return cb();
}); });
} }
@ -459,215 +446,95 @@ TransactionDb.prototype.removeFromTxId = function(txid, cb) {
}; };
TransactionDb.prototype.adaptTxObject = function(txInfo) {
var self = this;
// adapt bitcore TX object to bitcoind JSON response
txInfo.txid = txInfo.hash;
TransactionDb.prototype._addScript = function(tx) {
var relatedAddrs = [];
var dbScript = [];
var ts = tx.time;
var txid = tx.txid;
var to = 0; // Input Outpoints (mark them as spent)
var tx = txInfo; if (!tx.isCoinBase){
if (tx.outs) { for(var ii in tx.vin) {
tx.outs.forEach(function(o) { var i = tx.vin[ii];
var s = new Script(o.s); dbScript.push({
var addrs = new Address.fromScriptPubKey(s, config.network); type: 'put',
key: SPENT_PREFIX + i.txid + '-' + i.vout + '-' + txid + '-' + i.n,
// support only for p2pubkey p2pubkeyhash and p2sh value: ts || 0,
if (addrs.length === 1) { });
tx.out[to].addrStr = addrs[0].toString(); }
tx.out[to].n = to;
}
to++;
});
} }
var count = 0; for(var ii in tx.vout) {
txInfo.vin = txInfo. in .map(function(txin) { var o = tx.vout[ii];
var i = {}; if ((o.value||o.valueSat) &&
o.scriptPubKey &&
o.scriptPubKey.addresses &&
o.scriptPubKey.addresses[0] && !o.scriptPubKey.addresses[1] // TODO : not supported=> standard multisig
) {
var addr = o.scriptPubKey.addresses[0];
var sat = o.valueSat || (o.value * util.COIN).toFixed(0);
if (txin.coinbase) { relatedAddrs[addr]=1;
txInfo.isCoinBase = true; var k = OUTS_PREFIX + txid + '-' + o.n;
} else { dbScript.push({
i.txid = txin.prev_out.hash; type: 'put',
i.vout = txin.prev_out.n; key: k,
} value: addr + ':' + sat,
i.n = count++; },{
return i; type: 'put',
}); key: ADDR_PREFIX + addr + '-' + txid + '-' + o.n,
value: sat + ':' + ts,
});
count = 0; }
txInfo.vout = txInfo.out.map(function(txout) { }
var o = {}; tx.relatedAddrs=relatedAddrs;
return dbScript;
o.value = txout.value;
o.n = count++;
if (txout.addrStr) {
o.scriptPubKey = {};
o.scriptPubKey.addresses = [txout.addrStr];
}
return o;
});
}; };
TransactionDb.prototype.add = function(tx, blockhash, cb) {
TransactionDb.prototype.add = function(tx, blockhash, height, cb) { var dbScript = this._addScript(tx, blockhash);
var self = this; db.batch(dbScript, cb);
var addrs = [];
if (typeof height === 'undefined')
throw new Error('add should received height');
if (tx.hash) self.adaptTxObject(tx);
var ts = tx.time;
async.series([
// Input Outpoints (mark them as spent)
function(p_c) {
if (tx.isCoinBase) return p_c();
async.forEachLimit(tx.vin, CONCURRENCY,
function(i, next_out) {
db.batch()
.put(SPENT_PREFIX + i.txid + '-' + i.vout + '-' + tx.txid + '-' + i.n,
ts || 0)
.write(next_out);
},
function(err) {
return p_c(err);
});
},
// Parse Outputs
function(p_c) {
async.forEachLimit(tx.vout, CONCURRENCY,
function(o, next_out) {
if (o.value && o.scriptPubKey &&
o.scriptPubKey.addresses &&
o.scriptPubKey.addresses[0] && !o.scriptPubKey.addresses[1] // TODO : not supported
) {
var addr = o.scriptPubKey.addresses[0];
var sat = Math.round(o.value * util.COIN);
if (addrs.indexOf(addr) === -1) {
addrs.push(addr);
}
// existed?
var k = OUTS_PREFIX + tx.txid + '-' + o.n;
db.get(k, function(err, val) {
if (!val || (err && err.notFound)) {
db.batch()
.put(k, addr + ':' + sat)
.put(ADDR_PREFIX + addr + '-' + tx.txid + '-' + o.n, sat + ':' + ts)
.write(next_out);
} else {
return next_out();
}
});
} else {
return next_out();
}
},
function(err) {
if (err) {
console.log('ERR at TX %s: %s', tx.txid, err);
return cb(err);
}
return p_c();
});
},
function(p_c) {
if (!blockhash) {
return p_c();
}
return self.setBlock(tx.txid, blockhash, p_c);
},
], function(err) {
if (addrs.length > 0 && !blockhash) {
// only emit if we are processing a single tx (not from a block)
tx.addrsToEmit=addrs;
}
return cb(err);
});
}; };
TransactionDb.prototype._addManyFromObjs = function(txs, next) {
var dbScript = [];
for(var ii in txs){
TransactionDb.prototype.setBlock = function(txId, blockHash, c) { var s = this._addScript(txs[ii]);
if (!blockHash) return c(); dbScript = dbScript.concat(s);
db.batch() }
.put(IN_BLK_PREFIX + txId, blockHash) db.batch(dbScript, next);
.write(c);
}; };
TransactionDb.prototype.getBlock = function(txId, c) { TransactionDb.prototype._addManyFromHashes = function(txs, next) {
db.get(IN_BLK_PREFIX + txId,c); var self=this;
}; var dbScript = [];
async.eachLimit(txs, CONCURRENCY, function(tx, each_cb) {
if (tx === genesisTXID)
return each_cb();
TransactionDb.prototype.handleBlockChange = function(hash, isConnected, cb) { Rpc.getTxInfo(tx, function(err, inInfo) {
var toChange = []; if (!inInfo) return each_cb(err);
console.log('\tSearching Txs from block:' + hash); dbScript = dbScript.concat(self._addScript(inInfo));
return each_cb();
var k = FROM_BLK_PREFIX + hash;
var k2 = IN_BLK_PREFIX;
db.createReadStream({
start: k,
end: k + '~'
})
.on('data', function(data) {
var ks = data.key.split('-');
toChange.push({
key: k2 + ks[2] + '-' + ks[1],
type: 'put',
value: height,
});
})
.on('error', function(err) {
return cb(err);
})
.on('end', function(err) {
if (err) return cb(err);
console.log('\t%s %d Txs', height ? 'Confirming' : 'Unconfirming', toChange.length);
db.batch(toChange, cb);
}); });
};
// txs can be a [hashes] or [txObjects]
TransactionDb.prototype.createFromArray = function(txs, blockHash, height, next) {
var self = this;
if (!txs) return next();
async.forEachLimit(txs, CONCURRENCY, function(t, each_cb) {
if (typeof t === 'string') {
// TODO: parse it from networks.genesisTX?
if (t === genesisTXID) return each_cb();
Rpc.getTxInfo(t, function(err, inInfo) {
if (!inInfo) return each_cb(err);
return self.add(inInfo, blockHash, height, each_cb);
});
} else {
return self.add(t, blockHash, height, each_cb);
}
}, },
function(err) { function(err) {
return next(err); if (err) return next(err);
db.batch(dbScript,next);
}); });
}; };
TransactionDb.prototype.createFromBlock = function(b, height, next) { TransactionDb.prototype.addMany = function(txs, next) {
var self = this; if (!txs) return next();
if (!b || !b.tx) return next();
if (typeof height === 'undefined') var fn = (typeof txs[0] ==='string') ?
throw new Error('createFromBlock should received height'); this._addManyFromHashes : this._addManyFromObjs;
return self.createFromArray(b.tx, b.hash, height, next); return fn.apply(this,[txs, next]);
}; };

View File

@ -60,13 +60,13 @@
"soop": "=0.1.5", "soop": "=0.1.5",
"commander": "*", "commander": "*",
"bignum": "*", "bignum": "*",
"winston": "*",
"express": "~3.4.7", "express": "~3.4.7",
"buffertools": "*", "buffertools": "*",
"should": "~2.1.1", "should": "~2.1.1",
"socket.io": "~0.9.16", "socket.io": "~0.9.16",
"moment": "~2.5.0", "moment": "~2.5.0",
"sinon": "~1.7.3", "sinon": "~1.7.3",
"chai": "~1.8.1",
"xmlhttprequest": "~1.6.0", "xmlhttprequest": "~1.6.0",
"bufferput": "git://github.com/bitpay/node-bufferput.git" "bufferput": "git://github.com/bitpay/node-bufferput.git"
}, },
@ -80,6 +80,7 @@
"grunt-nodemon": "~0.2.0", "grunt-nodemon": "~0.2.0",
"grunt-mocha-test": "~0.8.1", "grunt-mocha-test": "~0.8.1",
"should": "2.1.1", "should": "2.1.1",
"chai": "=1.9.1",
"grunt-markdown": "~0.5.0" "grunt-markdown": "~0.5.0"
} }
} }

View File

@ -29,8 +29,12 @@ describe('TransactionDb fromIdWithInfo', function(){
assert.equal(tx.txid, txid); assert.equal(tx.txid, txid);
assert(!tx.info.isCoinBase); assert(!tx.info.isCoinBase);
for(var i=0; i<20; i++) for(var i=0; i<20; i++) {
assert(parseFloat(tx.info.vin[i].value) === parseFloat(50), 'input '+i); assert(parseFloat(tx.info.vin[i].value) === parseFloat(50), 'input '+i);
}
console.log('[01-transactionouts.js.34:tx:]',tx.info.vin[0]); //TODO
assert(tx.info.vin[0].addr === 'msGKGCy2i8wbKS5Fo1LbWUTJnf1GoFFG59', 'addr 0'); assert(tx.info.vin[0].addr === 'msGKGCy2i8wbKS5Fo1LbWUTJnf1GoFFG59', 'addr 0');
assert(tx.info.vin[1].addr === 'mfye7oHsdrHbydtj4coPXCasKad2eYSv5P', 'addr 1'); assert(tx.info.vin[1].addr === 'mfye7oHsdrHbydtj4coPXCasKad2eYSv5P', 'addr 1');
done(); done();
@ -134,7 +138,7 @@ describe('TransactionDb Outs', function(){
assert.equal(readItems.length,0); assert.equal(readItems.length,0);
var unmatch=[]; var unmatch=[];
txDb.createFromArray([v.txid], null, function(err) { txDb.addMany([v.txid], function(err) {
if (err) return done(err); if (err) return done(err);
txDb.fromTxId( v.txid, function(err, readItems) { txDb.fromTxId( v.txid, function(err, readItems) {

View File

@ -27,7 +27,7 @@ describe('TransactionDb Expenses', function(){
function(txid,c_out) { function(txid,c_out) {
async.each(spentValid[txid], async.each(spentValid[txid],
function(i,c_in) { function(i,c_in) {
txDb.createFromArray([i.txid], null, function(err) { txDb.addMany([i.txid], function(err) {
return c_in(); return c_in();
}); });
}, },

File diff suppressed because it is too large Load Diff