reorg case 1) working and tested

This commit is contained in:
Matias Alejo Garcia 2014-02-08 10:57:37 -03:00
parent 94edb188d0
commit 9ebc70a3e4
11 changed files with 551 additions and 353 deletions

View File

@ -78,7 +78,7 @@ function spec() {
self.transactions.push(txItem.spendTxId);
self.txApperances +=2;
}
};
}
});
return cb();
});

30
dev-util/level.js Executable file
View File

@ -0,0 +1,30 @@
#!/usr/bin/env node
'use strict';

// Dev utility: dump every leveldb entry whose key starts with the given
// prefix, reading from either the blocks or the txs database.
//
// Usage: level.js <key-prefix> [1]   (pass "1" to read the blocks DB)
var config = require('../config/config'),
  levelup = require('levelup');

var s = process.argv[2];
var isBlock = process.argv[3] === '1';

var dbPath = config.leveldb + (isBlock ? '/blocks' : '/txs');
console.log('DB: ', dbPath);

var db = levelup(dbPath);

// '~' sorts after every character used in our keys, so the half-open
// range [s, s + '~') covers exactly the keys with prefix `s`.
db.createReadStream({start: s, end: s + '~'})
  .on('data', function(data) {
    console.log(data.key + ' => ' + data.value);
  })
  .on('error', function(err) {
    // Surface stream errors instead of silently swallowing them.
    console.error('Error reading %s: %s', dbPath, err);
    process.exit(1);
  })
  .on('end', function() {
  });

View File

@ -1,19 +0,0 @@
#!/usr/bin/env node
var
config = require('../config/config'),
levelup = require('levelup');
db = levelup(config.leveldb + '/txs');
var s = 'txouts-addr-mgqvRGJMwR9JU5VhJ3x9uX9MTkzTsmmDgQ';
db.createReadStream({start: s, end: s+'~'})
.on('data', function (data) {
console.log('[block-level.js.11:data:]',data); //TODO
if (data==false) c++;
})
.on('end', function () {
});

View File

@ -5,8 +5,11 @@ require('classtool');
function spec(b) {
var TIMESTAMP_ROOT = 'b-ts-'; // b-ts-<ts> => <hash>
var PREV_ROOT = 'b-prev-'; // b-prev-<hash> => <prev_hash> (0 if orphan)
var TIMESTAMP_PREFIX = 'b-ts-'; // b-ts-<ts> => <hash>
var PREV_PREFIX = 'b-prev-'; // b-prev-<hash> => <prev_hash>
var NEXT_PREFIX = 'b-next-'; // b-next-<hash> => <next_hash>
var MAIN_PREFIX = 'b-main-'; // b-main-<hash> => 1/0
var TIP = 'b-tip-'; // last block on the chain
/**
@ -20,7 +23,6 @@ function spec(b) {
var db = b.db || levelup(config.leveldb + '/blocks');
var rpc = b.rpc || new RpcClient(config.bitcoind);
var BlockDb = function() {
};
@ -38,53 +40,71 @@ function spec(b) {
});
};
// Adds a new TIP block. Does not update the Next pointer of
// the block previous to the new one.
//
// Stores block `b` and makes it the new chain TIP.
// Writes, atomically in one batch: the timestamp index entry, the TIP
// pointer, the main-chain flag (1) and the prev-pointer.
//
// @param b  block object; requires .hash (.time and .previousblockhash optional)
// @param cb callback(err)
//
// NOTE(review): this hunk interleaved old TIMESTAMP_ROOT/PREV_ROOT lines
// with the new *_PREFIX version; this is the reconstructed new version.
BlockDb.prototype.add = function(b, cb) {
  if (!b.hash) return cb(new Error('no Hash at Block.save'));

  // Fall back to "now" (unix seconds) when the block carries no timestamp.
  var time_key = TIMESTAMP_PREFIX +
    ( b.time || Math.round(new Date().getTime() / 1000) );

  return db.batch()
    .put(time_key, b.hash)
    .put(TIP, b.hash)
    .put(MAIN_PREFIX + b.hash, 1)
    .put(PREV_PREFIX + b.hash, b.previousblockhash)
    .write(cb);
};
BlockDb.prototype.setOrphan = function(hash, cb) {
var k = PREV_ROOT + hash;
db.get(k, function (err,oldPrevHash) {
if (err || !oldPrevHash) return cb(err);
db.put(PREV_ROOT + hash, 0, function() {
return cb(err, oldPrevHash);
});
});
// We keep the block in TIMESTAMP_ROOT
};
//mainly for testing
BlockDb.prototype.setPrev = function(hash, prevHash, cb) {
db.put(PREV_ROOT + hash, prevHash, function(err) {
return cb(err);
});
};
//mainly for testing
BlockDb.prototype.getPrev = function(hash, cb) {
db.get(PREV_ROOT + hash, function(err,val) {
BlockDb.prototype.getTip = function(cb) {
db.get(TIP, function(err, val) {
return cb(err,val);
});
};
// Overwrite the prev-pointer of `hash` (mainly a test helper).
BlockDb.prototype.setPrev = function(hash, prevHash, cb) {
  var key = PREV_PREFIX + hash;
  db.put(key, prevHash, function(err) {
    return cb(err);
  });
};
// Look up the prev-pointer of `hash`.
// A missing key is not an error: yields (null, null) in that case.
BlockDb.prototype.getPrev = function(hash, cb) {
  var key = PREV_PREFIX + hash;
  db.get(key, function(err, val) {
    if (err && err.notFound) {
      err = null;
      val = null;
    }
    return cb(err, val);
  });
};
// Look up the next-pointer of `hash`.
// A missing key is not an error: yields (null, null) in that case.
BlockDb.prototype.getNext = function(hash, cb) {
  var key = NEXT_PREFIX + hash;
  db.get(key, function(err, val) {
    if (err && err.notFound) {
      err = null;
      val = null;
    }
    return cb(err, val);
  });
};
// Tells whether `hash` is flagged as part of the main chain.
// Yields (null, 0) for unknown hashes; otherwise the stored 1/0 flag
// parsed as an integer.
BlockDb.prototype.isMain = function(hash, cb) {
  db.get(MAIN_PREFIX + hash, function(err, val) {
    if (err && err.notFound) { err = null; val = 0; }
    // Always pass an explicit radix to parseInt.
    return cb(err, parseInt(val, 10));
  });
};
// Flag `hash` as main-chain (1) or orphan (0), logging orphanizations.
BlockDb.prototype.setMain = function(hash, isMain, cb) {
  var flag = isMain ? 1 : 0;
  if (!flag) console.log('ORPHAN: %s', hash);
  db.put(MAIN_PREFIX + hash, flag, function(err) {
    return cb(err);
  });
};
// Point the next-pointer of `hash` at `nextHash`.
BlockDb.prototype.setNext = function(hash, nextHash, cb) {
  var key = NEXT_PREFIX + hash;
  db.put(key, nextHash, cb);
};
BlockDb.prototype.countNotOrphan = function(cb) {
var c = 0;
console.log('Counting connected blocks. This could take some minutes');
db.createReadStream({start: PREV_ROOT, end: PREV_ROOT + '~' })
db.createReadStream({start: MAIN_PREFIX, end: MAIN_PREFIX + '~' })
.on('data', function (data) {
if (data.value !== 0) c++;
})
@ -96,8 +116,9 @@ function spec(b) {
});
};
// .has() returns true for orphans as well
BlockDb.prototype.has = function(hash, cb) {
var k = PREV_ROOT + hash;
var k = PREV_PREFIX + hash;
db.get(k, function (err,val) {
var ret;
if (err && err.notFound) {
@ -130,14 +151,14 @@ function spec(b) {
BlockDb.prototype.getBlocksByDate = function(start_ts, end_ts, limit, cb) {
var list = [];
db.createReadStream({
start: TIMESTAMP_ROOT + start_ts,
end: TIMESTAMP_ROOT + end_ts,
start: TIMESTAMP_PREFIX + start_ts,
end: TIMESTAMP_PREFIX + end_ts,
fillCache: true,
limit: parseInt(limit) // force to int
})
.on('data', function (data) {
list.push({
ts: data.key.replace(TIMESTAMP_ROOT, ''),
ts: data.key.replace(TIMESTAMP_PREFIX, ''),
hash: data.value,
});
})

View File

@ -105,6 +105,8 @@ function spec() {
HistoricSync.prototype.showProgress = function() {
var self = this;
if ( ( self.syncedBlocks + self.skippedBlocks) % self.step !== 1) return;
if (self.error) {
p('ERROR: ' + self.error);
}
@ -143,18 +145,23 @@ function spec() {
},
//show some (inaccurate) status
function(c) {
if ( ( self.syncedBlocks + self.skippedBlocks) % self.step === 1) {
self.showProgress();
}
self.showProgress();
return c();
},
function(c) {
self.rpc.getBlock(blockHash, function(err, ret) {
if (err) return c(err);
if (ret) {
blockInfo = ret.result;
// this is to match block retreived from file
if (blockInfo.hash === self.genesis)
blockInfo.previousblockhash='0000000000000000000000000000000000000000000000000000000000000000';
}
else {
blockInfo = null;
}
blockInfo = ret ? ret.result : null;
return c();
});
},
@ -162,34 +169,25 @@ function spec() {
function(c) {
if (existed) return c();
self.sync.storeBlock(blockInfo, function(err) {
self.sync.storeTipBlock(blockInfo, function(err) {
return c(err);
});
},
/* TODO: Should Start to sync backwards? (this is for partial syncs)
function(c) {
if (blockInfo.result.prevblockhash != current.blockHash) {
p("reorg?");
scanOpts.prev = 1;
}
return c();
}
*/
], function(err) {
}], function(err) {
if (err) {
self.setError(util.format('ERROR: @%s: %s [count: syncedBlocks: %d]', blockHash, err, self.syncedBlocks));
self.setError(util.format('ERROR: @%s: %s [count: syncedBlocks: %d]',
blockHash, err, self.syncedBlocks));
return cb(err);
}
else {
self.status = 'syncing';
}
if ( (scanOpts.upToExisting && existed && self.syncedBlocks >= self.blockChainHeight) ||
if ( (scanOpts.upToExisting && existed &&
self.syncedBlocks >= self.blockChainHeight) ||
(blockEnd && blockEnd === blockHash)) {
self.status = 'finished';
p('DONE. Found existing block: ', blockHash);
p('DONE. Found block: ', blockHash);
self.showProgress();
return cb(err);
}
@ -251,91 +249,72 @@ function spec() {
return addrStrs;
};
HistoricSync.prototype.getBlockFromFile = function(scanOpts, cb) {
// Reads the next block from the bitcoind .dat files via the block
// extractor and converts it to the same shape the RPC `getblock` call
// returns (so downstream sync code can treat both sources uniformly).
// Yields (err, blockInfo); blockInfo is undefined when the extractor
// returns no more blocks.
HistoricSync.prototype.getBlockFromFile = function(cb) {
var self = this;
var blockInfo;
//get Info
self.blockExtractor.getNextBlock(function(err, b) {
if (err || ! b) return cb(err);
blockInfo = b.getStandardizedObject(b.txs, self.network);
// blockInfo.curWork = Deserialize.intFromCompact(b.bits);
// We keep the RPC field names
blockInfo.previousblockhash = blockInfo.prev_block;
var ti=0;
// Decorate each output with its address string, walking the raw txs
// in parallel with the standardized blockInfo.tx array.
b.txs.forEach(function(t) {
var objTx = blockInfo.tx[ti++];
// Transactions inherit the block timestamp (file blocks carry no per-tx time).
objTx.time = blockInfo.time;
var to=0;
t.outs.forEach( function(o) {
var s = new Script(o.s);
var addrs = self.getAddrStr(s);
// support only for p2pubkey p2pubkeyhash and p2sh
// (multi-address outputs are left without addrStr)
if (addrs.length === 1) {
objTx.out[to].addrStr = addrs[0];
}
to++;
});
});
return cb(err,blockInfo);
});
};
HistoricSync.prototype.nextBlockFromFile = function(scanOpts, cb) {
var self = this;
var blockInfo;
var existed;
async.series([
//show some (inaccurate) status
function(c) {
if ( ( self.syncedBlocks + self.skippedBlocks) % self.step === 1) {
self.showProgress();
}
self.showProgress();
return c();
},
//get Info
function(c) {
self.blockExtractor.getNextBlock(function(err, b) {
if (err || ! b) return c(err);
blockInfo = b.getStandardizedObject(b.txs, self.network);
// blockInfo.curWork = Deserialize.intFromCompact(b.bits);
// We keep the RPC field names
blockInfo.previousblockhash = blockInfo.prev_block;
var ti=0;
// Get TX Address
b.txs.forEach(function(t) {
var objTx = blockInfo.tx[ti++];
//add time from block
objTx.time = blockInfo.time;
var to=0;
t.outs.forEach( function(o) {
var s = new Script(o.s);
var addrs = self.getAddrStr(s);
// support only p2pubkey p2pubkeyhash and p2sh
if (addrs.length === 1) {
objTx.out[to].addrStr = addrs[0];
}
to++;
});
});
return c();
});
},
//check prev
function(c) {
if (blockInfo && self.prevHash && blockInfo.previousblockhash !== self.prevHash) {
console.log('Hole found @%s', blockInfo.hash);
console.log('From: %s To: %s', self.prevHash, blockInfo.previousblockhash);
self.sync.checkOrphan(self.prevHash, blockInfo.previousblockhash, c);
}
else return c();
},
//check it
function(c) {
if (!blockInfo) return c();
self.sync.bDb.has(blockInfo.hash, function(err, had) {
existed = had;
return c(err);
});
},
//store it
function(c) {
if (!blockInfo || existed) return c();
self.sync.storeBlock(blockInfo, function(err) {
self.getBlockFromFile(function(err, inBlockInfo) {
blockInfo = inBlockInfo;
return c(err);
});
},
function(c) {
self.sync.storeTipBlock(blockInfo, function(err) {
return c(err);
});
},
function(c) {
// continue
if (blockInfo && blockInfo.hash) {
self.prevHash = blockInfo.hash;
if (existed)
self.skippedBlocks++;
else
self.syncedBlocks++;
} else
self.status = 'finished';
@ -452,12 +431,15 @@ function spec() {
p(' scanOpts: ', JSON.stringify(scanOpts));
if (scanOpts.fromFiles) {
self.status = 'syncing';
self.type = 'from .dat Files';
self.type = 'from .dat Files';
async.whilst(function() {
return self.status === 'syncing';
}, function (w_cb) {
self.getBlockFromFile(scanOpts, function(err) {
self.nextBlockFromFile(scanOpts, function(err) {
setImmediate(function(){
return w_cb(err);
});

View File

@ -8,6 +8,7 @@ function spec() {
var Sync = require('./Sync').class();
var Peer = require('bitcore/Peer').class();
var config = require('../config/config');
var networks = require('bitcore/networks');
var peerdb_fn = 'peerdb.json';
@ -16,18 +17,15 @@ function spec() {
PeerSync.prototype.init = function(opts, cb) {
if (!opts) opts = {};
var network = opts && (opts.network || 'testnet');
var networkName = opts && (opts.network || 'testnet');
var network = networkName === 'testnet' ? networks.testnet : network.livenet;
this.verbose = opts.verbose;
this.peerdb = undefined;
this.sync = new Sync({
networkName: network
});
this.sync = new Sync();
this.PeerManager = require('bitcore/PeerManager').createClass({
opts: {
network: network
}
});
this.peerman = new this.PeerManager();
this.load_peers();
@ -77,19 +75,23 @@ function spec() {
console.log('[p2p_sync] Handle block: ' + blockHash);
}
var tx_hashes = block.txs.map(function(tx) {
return coinUtil.formatHashFull(tx.hash);
});
this.sync.storeBlock({
this.sync.storeTipBlock({
'hash': blockHash,
'tx': tx_hashes,
'previousblockhash': coinUtil.formatHashFull(block.prev_hash),
},
function(err) {
if (err) {
console.log('[p2p_sync] Error in handle Block: ' + err);
}
// Check for reorgs...
// The previous last block hash
// if different => call
});
};
@ -110,6 +112,8 @@ function spec() {
});
this.peerman.on('connection', function(conn) {
console.log('[PeerSync.js.113]'); //TODO
conn.on('inv', self.handle_inv.bind(self));
conn.on('block', self.handle_block.bind(self));
conn.on('tx', self.handle_tx.bind(self));

View File

@ -11,15 +11,13 @@ function spec() {
function Sync() {
this.bDb = new BlockDb();
this.txDb = new TransactionDb();
}
Sync.prototype.init = function(opts, cb) {
var self = this;
self.opts = opts;
this.bDb = new BlockDb(opts);
this.txDb = new TransactionDb(opts);
return cb();
};
@ -39,79 +37,202 @@ function spec() {
], next);
};
/*
* Arrives a NEW block, which is the new TIP
*
* Case 0) Simple case
* A-B-C-D-E(TIP)-NEW
*
* Case 1)
* A-B-C-D-E(TIP)
* \
* NEW
*
* 1) Declare D-E orphans (and possibly invalidate TXs on them)
*
* Case 2)
* A-B-C-D-E(TIP)
* \
* F-G-NEW
* 1) Set F-G as connected (mark TXs as valid)
* 2) Declare D-E orphans (and possibly invalidate TXs on them)
*
*
* Case 3)
*
* A-B-C-D-E(TIP) ... NEW
*
* 1) Get NEW.prev recursively until an existing block is found,
* then case 0) / 1) / 2)
*
*/
Sync.prototype.storeBlock = function(block, cb) {
/*
 * Stores block `b` as the new chain TIP. When `b` does not extend the
 * current tip a reorg is triggered (processReorg handles cases 1-3
 * described above).
 *
 * @param b  block object (hash, previousblockhash, tx, time)
 * @param cb callback(err)
 */
Sync.prototype.storeTipBlock = function(b, cb) {
  var self = this;
  var oldTip, oldNext, needReorg = true;
  var newPrev = b.previousblockhash;
  var updatedTxs, updatedAddrs;
  async.series([
    // Store the block's transactions first.
    function(c) {
      self.txDb.createFromBlock(b, function(err, txs, addrs) {
        updatedTxs = txs;
        updatedAddrs = addrs;
        return c(err);
      });
    },
    // No reorg when there is no tip yet, or the block extends the tip.
    function(c) {
      self.bDb.getTip(function(err, val) {
        oldTip = val;
        if (typeof oldTip === 'undefined' || newPrev === oldTip) {
          needReorg = false;
        }
        return c();
      });
    },
    // Remember where the competing branch forked off.
    function(c) {
      if (!needReorg) return c();
      self.bDb.getNext(newPrev, function(err, val) {
        if (err) return c(err);
        oldNext = val;
        return c();
      });
    },
    function(c) {
      self.bDb.add(b, c);
    },
    function(c) {
      if (!needReorg) return c();
      console.log('NEW TIP: %s NEED REORG (old tip: %s)', b.hash, oldTip);
      // BUGFIX: continue the series through c(), not the outer cb() —
      // otherwise the setNext() step and the final broadcast handler
      // are skipped after a reorg (and cb could fire twice).
      // TODO: reorg should also adjust updatedTxs / updatedAddrs.
      self.processReorg(oldTip, oldNext, newPrev, function(err) {
        return c(err);
      });
    },
    // Link the previous block forward to the new tip.
    function(c) {
      self.bDb.setNext(newPrev, b.hash, function(err) {
        return c(err);
      });
    }],
    function(err) {
      self._handleBroadcast(b, updatedTxs, updatedAddrs);
      return cb(err);
    });
};
Sync.prototype.processReorg = function(oldTip, oldNext, newPrev, cb) {
var self = this;
self.txDb.createFromBlock(block, function(err, insertedTxs, updateAddrs) {
if (err) return cb(err);
var newPrevExisted, orphanizeFrom;
self.bDb.add(block, function(err){
if (err) return cb(err);
self._handleBroadcast(block, insertedTxs, updateAddrs);
return cb();
});
async.series([
function(c) {
self.bDb.has(newPrev, function(err, ret) {
newPrevExisted = ret;
return c();
});
},
function(c) {
if (newPrevExisted) return c();
console.log('[BlockDb.js.133] case 3) not implemented yet in reorg'); //TODO
process.exit(1);
},
function(c) {
self.bDb.isMain(newPrev, function(err,val) {
if (!val) return c();
// case 1
orphanizeFrom = oldNext;
return c(err);
});
},
function(c) {
if (orphanizeFrom) return c();
self.setBranchConnectedBackwards(newPrev, function(err, yHash) {
if (err) return c(err);
self.bDb.getNext(yHash, function(err, yHashNext) {
orphanizeFrom = yHashNext;
return c(err);
});
});
},
function(c) {
if (!orphanizeFrom) return c();
self.setBranchOrphan(orphanizeFrom, function(err) {
return c(err);
});
},
],
function(err) {
return cb(err);
});
};
// Walks forward from `fromHash` following next-pointers, flagging each
// visited block (and, via setBlockMain, its txs) as off the main chain.
// Stops when the next-pointer chain runs out.
//
// @param fromHash first block hash to orphanize (falsy => no-op)
// @param cb       callback(err)
Sync.prototype.setBranchOrphan = function(fromHash, cb) {
  var self = this;
  var hashIterator = fromHash;
  async.whilst(
    function() { return hashIterator; },
    function(c) {
      self.setBlockMain(hashIterator, false, function(err) {
        // BUGFIX: route errors through the whilst callback `c` so the
        // loop terminates cleanly and cb is invoked exactly once,
        // instead of short-circuiting to cb and abandoning the loop.
        if (err) return c(err);
        self.bDb.getNext(hashIterator, function(err, val) {
          hashIterator = val;
          return c(err);
        });
      });
    }, cb);
};
// Flag a block as main/orphan in BlockDb, then propagate the change to
// the confirmation state of the txs contained in that block.
Sync.prototype.setBlockMain = function(hash, isMain, cb) {
  var self = this;
  self.bDb.setMain(hash, isMain, function(err) {
    if (err) return cb(err);
    self.txDb.handleBlockChange(hash, isMain, cb);
  });
};
Sync.prototype.checkOrphan = function(fromBlock, toBlock, c) {
var self = this;
Sync.prototype.setBranchConnectedBackwards = function(fromHash, cb) {
var self = this,
hashInterator = fromHash,
isMain;
var hash = fromBlock;
var co = 0;
var limit = 10;
var cont = 1;
async.whilst(
function () {
if (++co > limit) {
console.log('[Sync.js.109] WARN: Reach reog depth limit'); //TODO
}
return cont && hash && hash !== toBlock && co < limit;
},
function (w_c) {
//check with RPC if the block is mainchain
self.bDb.fromHashWithInfo(hash, function (err, info) {
if (!info) {
console.log('[Sync.js.107:hash:ORPHAN]',hash); //TODO
self.txDb.setOrphan(hash, function(err) {
if (err) return w_c(err);
self.bDb.setOrphan(hash, function(err, prevHash){
hash = prevHash;
return w_c(err);
});
async.doWhilst(
function(c) {
self.setConnected(hashInterator, function (err) {
if (err) return c(err);
self.bDb.getPrev(hashInterator, function (err, val) {
if (err) return c(err);
hashInterator = val;
self.bDb.isMain(hashInterator, function (err, val) {
isMain = val;
return c();
});
}
else {
console.log('[Sync.js.107:hash:NOT ORPHAN]',hash); //TODO
cont = 0;
return w_c();
}
});
});
},
function (err) {
return c(err);
}
);
function() { return hashInterator; }, cb);
};
Sync.prototype._handleBroadcast = function(hash, inserted_txs, updated_addrs) {
Sync.prototype._handleBroadcast = function(hash, updatedTxs, updatedAddrs) {
var self = this;
if (hash && self.opts.broadcast_blocks) {
sockets.broadcast_block({hash: hash});
}
if (inserted_txs && self.opts.broadcast_txs) {
inserted_txs.forEach(function(tx) {
if (updatedTxs && self.opts.broadcast_txs) {
updatedTxs.forEach(function(tx) {
sockets.broadcast_tx(tx);
});
}
if (updated_addrs && self.opts.broadcast_addresses) {
updated_addrs.forEach(function(addr, txs){
if (updatedAddrs && self.opts.broadcast_addresses) {
updatedAddrs.forEach(function(addr, txs){
txs.forEach(function(addr, t){
sockets.broadcast_address_tx(addr, {'txid': t});
@ -120,15 +241,13 @@ console.log('[Sync.js.109] WARN: Reach reog depth limit'); //TODO
}
};
Sync.prototype.storeTxs = function(txs, cb) {
var self = this;
self.txDb.createFromArray(txs, null, function(err, inserted_txs, updated_addrs) {
self.txDb.createFromArray(txs, null, function(err, updatedTxs, updatedAddrs) {
if (err) return cb(err);
self._handleBroadcast(null, inserted_txs, updated_addrs);
self._handleBroadcast(null, updatedTxs, updatedAddrs);
return cb(err);
});
};

View File

@ -5,15 +5,18 @@ require('classtool');
function spec(b) {
// blockHash -> txid mapping (to reorgs)
var ROOT = 'tx-b-'; //tx-b-<txid>-<block> => 1/0 (connected or not)
// blockHash -> txid mapping
var IN_BLK_PREFIX = 'tx-b-'; //tx-b-<txid>-<block> => 1/0 (connected or not)
// Only for orphan blocks
// var FROM_BLK_PREFIX = 'tx-'; //tx-<block>-<txid> => 1/0 (connected or not)
// to show tx outs
var OUTS_ROOT = 'txouts-'; //txouts-<txid>-<n> => [addr, btc_sat]
var OUTS_PREFIX = 'txouts-'; //txouts-<txid>-<n> => [addr, btc_sat]
// to sum up addr balance
var ADDR_ROOT = 'txouts-addr-'; //txouts-addr-<addr>-<ts>-<txid>-<n> => + btc_sat
var SPEND_ROOT = 'txouts-spend-';//txouts-spend-<txid(out)>-<n(out)> => [txid(in),n(in),ts]
var ADDR_PREFIX = 'txouts-addr-'; //txouts-addr-<addr>-<ts>-<txid>-<n> => + btc_sat
var SPEND_PREFIX = 'txouts-spend-';//txouts-spend-<txid(out)>-<n(out)> => [txid(in),n(in),ts]
// TODO: use bitcore networks module
var genesisTXID = '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b';
@ -51,7 +54,7 @@ function spec(b) {
// TransactionDb.prototype.fromTxIdOne = function(txid, cb) { TODO
TransactionDb.prototype.has = function(txid, cb) {
var k = OUTS_ROOT + txid;
var k = OUTS_PREFIX + txid;
db.get(k, function (err,val) {
var ret;
@ -69,7 +72,7 @@ function spec(b) {
TransactionDb.prototype.fromTxId = function(txid, cb) {
var k = OUTS_ROOT + txid;
var k = OUTS_PREFIX + txid;
var ret=[];
// outs.
@ -87,7 +90,7 @@ function spec(b) {
return cb(err);
})
.on('end', function () {
var k = SPEND_ROOT + txid;
var k = SPEND_PREFIX + txid;
var l = ret.length;
db.createReadStream({start: k, end: k + '~'})
.on('data', function (data) {
@ -174,7 +177,7 @@ function spec(b) {
TransactionDb.prototype.fromTxIdN = function(txid, n, cb) {
var k = OUTS_ROOT + txid + '-' + n;
var k = OUTS_PREFIX + txid + '-' + n;
db.get(k, function (err,val) {
if (err && err.notFound) {
@ -206,7 +209,7 @@ function spec(b) {
TransactionDb.prototype.fromAddr = function(addr, cb) {
var self = this;
var k = ADDR_ROOT + addr;
var k = ADDR_PREFIX + addr;
var ret=[];
db.createReadStream({start: k, end: k + '~'})
@ -226,7 +229,7 @@ function spec(b) {
.on('end', function () {
async.each(ret, function(o, e_c) {
var k = SPEND_ROOT + o.txid + '-' + o.index;
var k = SPEND_PREFIX + o.txid + '-' + o.index;
db.get(k, function(err, val) {
if (err && err.notFound) err=null;
if (err || !val) return e_c(err);
@ -238,7 +241,7 @@ function spec(b) {
return e_c();
});
},
function(err) {
function() {
async.each(ret, function(o, e_c){
self.fillConfirmations(o,e_c);
},function(err) {
@ -255,16 +258,16 @@ function spec(b) {
async.series([
function(c) {
db.createReadStream({
start: OUTS_ROOT + txid,
end: OUTS_ROOT + txid + '~',
start: OUTS_PREFIX + txid,
end: OUTS_PREFIX + txid + '~',
}).pipe(
db.createWriteStream({type:'del'})
).on('close', c);
},
function(c) {
db.createReadStream({
start: SPEND_ROOT + txid,
end: SPEND_ROOT + txid + '~'
start: SPEND_PREFIX + txid,
end: SPEND_PREFIX + txid + '~'
})
.pipe(
db.createWriteStream({type:'del'})
@ -315,7 +318,8 @@ function spec(b) {
};
TransactionDb.prototype.add = function(tx, cb) {
TransactionDb.prototype.add = function(tx, blockhash, cb) {
var self = this;
var addrs = [];
var is_new = true;
@ -331,7 +335,7 @@ function spec(b) {
async.forEachLimit(tx.vin, CONCURRENCY,
function(i, next_out) {
db.batch()
.put( SPEND_ROOT + i.txid + '-' + i.vout ,
.put( SPEND_PREFIX + i.txid + '-' + i.vout ,
tx.txid + ':' + i.n + ':' + ts)
.write(next_out);
},
@ -355,15 +359,15 @@ function spec(b) {
! o.scriptPubKey.addresses[1] // TODO : not supported
){
// This is only to broadcast (WIP)
// if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) {
// addrs.push(o.scriptPubKey.addresses[0]);
// }
if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) {
addrs.push(o.scriptPubKey.addresses[0]);
}
var addr = o.scriptPubKey.addresses[0];
var sat = Math.round(o.value * util.COIN);
db.batch()
.put( OUTS_ROOT + tx.txid + '-' + o.n, addr + ':' + sat)
.put( ADDR_ROOT + addr + '-' + ts + '-' + tx.txid +
.put( OUTS_PREFIX + tx.txid + '-' + o.n, addr + ':' + sat)
.put( ADDR_PREFIX + addr + '-' + ts + '-' + tx.txid +
'-' + o.n, sat)
.write(next_out);
@ -385,33 +389,32 @@ function spec(b) {
}
return p_c();
});
}], function(err) {
},
function (p_c) {
if (!blockhash) return p_c();
return self.setConfirmation(tx.txid,blockhash, true, p_c);
},
], function(err) {
return cb(err, addrs, is_new);
});
};
TransactionDb.prototype.deleteConfirmation = function(txId, blockHash, c) {
TransactionDb.prototype.setConfirmation = function(txId, blockHash, confirmed, c) {
if (!blockHash) return c();
db.put(ROOT + txId + '-' + blockHash, 0, function(err) {
return c(err);
});
confirmed = confirmed ? 1 : 0;
db.batch()
.put(IN_BLK_PREFIX + txId + '-' + blockHash, confirmed)
.write(c);
};
TransactionDb.prototype.addConfirmation = function(txId, blockHash, c) {
if (!blockHash) return c();
db.put(ROOT + txId + '-' + blockHash, 1, function(err) {
return c(err);
});
};
// This slowdown addr balance calculation 100%
// This slows down addr balance calculation by ~100%
TransactionDb.prototype.isConfirmed = function(txId, c) {
var k = ROOT + txId;
var k = IN_BLK_PREFIX + txId;
var ret = false;
db.createReadStream({start: k, end: k + '~'})
@ -422,11 +425,34 @@ function spec(b) {
return c(err);
})
.on('end', function (err) {
return c(err,ret);
});
};
// Re-flags the confirmation state (1/0) of every tx-in-block entry that
// references `hash`, after the block was moved on/off the main chain.
// Scans all IN_BLK_PREFIX keys and rewrites the matching ones in one batch.
//
// @param hash   block hash whose txs changed state
// @param isMain truthy => confirm (1), falsy => invalidate (0)
// @param cb     callback(err)
TransactionDb.prototype.handleBlockChange = function(hash, isMain, cb) {
var k = IN_BLK_PREFIX;
var toChange = [];
console.log('Searching Txs from block:' + hash);
// Full scan: slow, but avoids maintaining a block->tx index.
db.createReadStream({start: k, end: k + '~'})
.on('data', function (data) {
// NOTE(review): substring match — keys are tx-b-<txid>-<block>, so this
// presumably only matches the <block> suffix; verify hashes never
// collide with txid substrings.
if (data.key.indexOf(hash)>=0)
toChange.push({
type: 'put',
key: data.key,
value: isMain?1:0,
});
})
.on('error', function (err) {
return cb(err);
})
// NOTE(review): levelup's 'end' event passes no error argument, so
// `err` here is presumably always undefined — confirm.
.on('end', function (err) {
if (err) return cb(err);
console.log('\t%s %d Txs', isMain?'Confirming':'Invalidating',toChange.length);
db.batch(toChange, cb);
});
};
// txs can be a [hashes] or [txObjects]
TransactionDb.prototype.createFromArray = function(txs, blockHash, next) {
@ -435,7 +461,6 @@ function spec(b) {
if (!txs) return next();
// TODO
var insertedTxs = [];
var updatedAddrs = {};
async.forEachLimit(txs, CONCURRENCY, function(t, each_cb) {
@ -448,31 +473,17 @@ function spec(b) {
TransactionRpc.getRpcInfo(t, function(err, inInfo) {
if (!inInfo) return each_cb(err);
self.add(inInfo, function(err) {
if (err) return each_cb(err);
// This could mean that the TX was mined since we first received.
if (blockHash && inInfo.blockhash !== blockHash)
console.log('WARN in tx %s: different blockHashses: %s vs %s',
t, blockHash, inInfo.blockhash);
insertedTxs.push(t);
return self.addConfirmation(t,inInfo.blockhash, each_cb);
});
return self.add(inInfo, blockHash,each_cb);
});
}
else {
self.add(t, function(err) {
if (err || !blockHash) return each_cb(err);
return self.addConfirmation(t,blockHash, each_cb);
});
return self.add(t, blockHash, each_cb);
}
},
function(err) {
return next(err, insertedTxs, updatedAddrs);
});
};
return next(err, updatedAddrs);
});
};
TransactionDb.prototype.createFromBlock = function(b, next) {

View File

@ -43,9 +43,9 @@
},
{
"addr": "mzW2hdZN2um7WBvTDerdahKqRgj3md9C29",
"txApperances": 6033,
"balance": 1049.69744101,
"totalReceived": 1049.69744101,
"txApperances": 6046,
"balance": 1149.19744101,
"totalReceived": 1149.19744101,
"totalSent": 0
},
{

View File

@ -7,86 +7,135 @@ process.env.NODE_ENV = process.env.NODE_ENV || 'development';
var
assert = require('assert'),
async = require('async'),
Sync = require('../../lib/Sync').class();
var b = [
'00000000c4cbd75af741f3a2b2ff72d9ed4d83a048462c1efe331be31ccf006b', //B#16
'00000000fe198cce4c8abf9dca0fee1182cb130df966cc428ad2a230df8da743',
'000000008d55c3e978639f70af1d2bf1fe6f09cb3143e104405a599215c89a48',
'000000009b3bca4909f38313f2746120129cce4a699a1f552390955da470c5a9',
'00000000ede57f31cc598dc241d129ccb4d8168ef112afbdc870dc60a85f5dd3', //B#20
];
var fix = function(s,cb) {
async.each([1,2,3,4], function(i,c) {
s.bDb.setPrev(b[i],b[i-1], function() {
return c();
});
}, cb);
};
var test = function(s,cb) {
async.each([2,3,4], function(i,c) {
s.bDb.getPrev(b[i], function(err, p) {
assert.equal(p,0);
return c();
});
}, function() {
s.bDb.getPrev(b[1], function(err, p) {
assert.equal(p,b[0]);
return cb();
});
});
};
var testNo = function(s,cb) {
async.each([2,3,4], function(i,c) {
s.bDb.getPrev(b[i], function(err, p) {
assert.equal(p,b[i-1]);
return c();
});
}, function() {
s.bDb.getPrev(b[1], function(err, p) {
assert.equal(p,b[0]);
return cb();
});
});
};
HistoricSync = require('../../lib/HistoricSync').class();
var s;
var b = [
'00000000c4cbd75af741f3a2b2ff72d9ed4d83a048462c1efe331be31ccf006b', //0 B#16
'00000000fe198cce4c8abf9dca0fee1182cb130df966cc428ad2a230df8da743', //1
'000000008d55c3e978639f70af1d2bf1fe6f09cb3143e104405a599215c89a48', //2
'000000009b3bca4909f38313f2746120129cce4a699a1f552390955da470c5a9', //3
'00000000ede57f31cc598dc241d129ccb4d8168ef112afbdc870dc60a85f5dd3', //4 B#20
];
var t = [
'd08582d3711f75d085c618874fb0d049ae09d5ec95ec6f5abd289f4b54712c54', // TX from B#16
'1729001087e0cebea8d14de1653d5cf59628d9746bc1ae65f776f1cbaff7ebad',
'cf53d7ccd83a099acfbc319ee10c1e3b10e3d42ba675b569fdd6b69cb8d2db4e',
'cf53d7ccd83a099acfbc319ee10c1e3b10e3d42ba675b569fdd6b69cb8d2db4e',
'd45f9da73619799e9d7bd03cc290e70875ea4cbad56b8bffa15135fbbb3df9ea', //4 Tx from B20
];
var test = function(cb) {
async.each([2,3,4], function(i,c) {
s.sync.bDb.getPrev(b[i], function(err, p) {
assert.equal(p,b[i-1]);
return c();
});
}, function() {
async.each([0,1,2,3,4], function(i,c) {
s.sync.bDb.has(b[i], function(err, p) {
assert(p);
return c();
});
}, function() {
async.each([0,1,2,3], function(i,c) {
s.sync.bDb.getNext(b[i], function(err, p) {
assert.equal(p,b[i+1]);
return c();
});
}, cb);
});
});
};
describe('Sync checkOrphan', function(){
before(function(done) {
s = new Sync();
fix(s,done);
});
after(function(done) {
fix(s,function() {
s.close(done);
s = new HistoricSync();
s.init({}, function(err) {
if (err) return done(err);
s.sync.destroy(done);
});
});
it('checkOrphan', function(done) {
this.timeout(100000);
s.bDb.has(b[0], function(err, has) {
assert(has);
s.bDb.has(b[1], function(err, has) {
assert(has);
s.checkOrphan(b[4],b[1], function() {
testNo(s,done);
});
it('simple RPC forward syncing', function(done) {
s.getPrevNextBlock(s.genesis,b[4], {
next: true,
}, function(err) {
if (err) return done(err);
test(done);
});
});
});
it('reorg, case 1', function(done) {
var case1 = {
hash: '0000000000000000000000000000000000000000000000000000000000000001',
tx: [ '1000000000000000000000000000000000000000000000000000000000000000' ],
time: 1296690099,
previousblockhash: b[2],
};
async.series([
function (c) {
s.sync.txDb.isConfirmed(t[0], function(err,is) {
assert(!err);
assert(is);
return c();
});
},
function (c) {
s.sync.txDb.isConfirmed(t[4], function(err,is) {
assert(!err);
assert(is);
return c();
});
},
function (c) {
s.sync.storeTipBlock(case1, function(err) {
assert(!err, 'shouldnt return error' + err);
return c();
});
},
function (c) {
s.sync.bDb.isMain(b[2], function(err,is) {
assert(!err);
assert(is);
return c();
});
},
function (c) {
s.sync.bDb.isMain(b[3], function(err,is) {
assert(!err);
assert(!is, b[3] + 'should not be on main chain');
return c();
});
},
function (c) {
s.sync.bDb.isMain(b[4], function(err,is) {
assert(!err);
assert(!is);
return c();
});
},
function (c) {
s.sync.txDb.isConfirmed(t[0], function(err,is) {
assert(!err);
assert(is);
return c();
});
},
function (c) {
s.sync.txDb.isConfirmed(t[4], function(err,is) {
assert(!err);
assert(!is);
return c();
});
},
], done );
});
});

View File

@ -18,6 +18,7 @@ program
.option('-R --reverse', 'Sync backwards', 0)
.option('-U --uptoexisting', 'Sync only until an existing block is found', 0)
.option('-F --fromfiles', 'Sync using bitcoind .dat block files (faster)', 0)
.option('-v --verbose', 'Verbose 0/1', 0)
.parse(process.argv);
var historicSync = new HistoricSync();