files sync workgin

This commit is contained in:
Matias Alejo Garcia 2014-02-01 13:39:29 -03:00
parent 474b3daa53
commit 23960f0c37
3 changed files with 146 additions and 41 deletions

View File

@ -86,13 +86,15 @@ BlockSchema.statics.customCreate = function(block, cb) {
newBlock.time = block.time ? block.time : Math.round(new Date().getTime() / 1000); newBlock.time = block.time ? block.time : Math.round(new Date().getTime() / 1000);
newBlock.hashStr = block.hash; newBlock.hashStr = block.hash;
newBlock.isOrphan = block.isOrphan;
newBlock.nextBlockHashStr = block.nextBlockHash; newBlock.nextBlockHashStr = block.nextBlockHash;
var insertedTxs, updateAddrs; var insertedTxs, updateAddrs;
async.series([ async.series([
function(a_cb) { function(a_cb) {
TransactionOut.createFromTxs(block.tx, function(err, inInsertedTxs, inUpdateAddrs) { TransactionOut.createFromTxs(block.tx, block.isOrphan,
function(err, inInsertedTxs, inUpdateAddrs) {
insertedTxs = inInsertedTxs; insertedTxs = inInsertedTxs;
updateAddrs = inUpdateAddrs; updateAddrs = inUpdateAddrs;
return a_cb(err); return a_cb(err);

View File

@ -24,13 +24,16 @@ var TransactionOutSchema = new Schema({
index: true, index: true,
}, },
value_sat: Number, value_sat: Number,
fromOrphan: Boolean,
spendTxIdBuf: Buffer, spendTxIdBuf: Buffer,
spendIndex: Number, spendIndex: Number,
spendFromOrphan: Boolean,
}); });
// Compound index // Compound index
TransactionOutSchema.index({txidBuf: 1, index: 1}, {unique: true, sparse: true}); TransactionOutSchema.index({txidBuf: 1, index: 1}, {unique: true, sparse: true});
TransactionOutSchema.index({spendTxIdBuf: 1, spendIndex: 1}, {unique: true, sparse: true}); TransactionOutSchema.index({spendTxIdBuf: 1, spendIndex: 1}, {unique: true, sparse: true});
@ -91,15 +94,52 @@ TransactionOutSchema.statics.removeFromTxId = function(txid, cb) {
TransactionOutSchema.statics.storeTransactionOuts = function(txInfo, cb) { TransactionOutSchema.statics.storeTransactionOuts = function(txInfo, fromOrphan, cb) {
var Self = this; var Self = this;
var addrs = []; var addrs = [];
var is_new = true; var is_new = true;
if (txInfo.hash) {
// adapt bitcore TX object to bitcoind JSON response
txInfo.txid = txInfo.hash;
var count = 0;
txInfo.vin = txInfo.in.map(function (txin) {
var i = {};
if (txin.coinbase) {
txInfo.isCoinBase = true;
}
else {
i.txid= txin.prev_out.hash;
i.vout= txin.prev_out.n;
};
i.n = count++;
return i;
});
count = 0;
txInfo.vout = txInfo.out.map(function (txout) {
var o = {};
o.value = txout.value;
o.n = count++;
if (txout.addrStr){
o.scriptPubKey = {};
o.scriptPubKey.addresses = [txout.addrStr];
}
return o;
});
}
var bTxId = new Buffer(txInfo.txid,'hex'); var bTxId = new Buffer(txInfo.txid,'hex');
async.series([ async.series([
// Input Outpoints (mark them as spended) // Input Outpoints (mark them as spended)
function(p_c) { function(p_c) {
@ -114,6 +154,7 @@ TransactionOutSchema.statics.storeTransactionOuts = function(txInfo, cb) {
spendTxIdBuf: bTxId, spendTxIdBuf: bTxId,
spendIndex: i.n, spendIndex: i.n,
}; };
if (fromOrphan) data.spendFromOrphan = true;
Self.update({txidBuf: b, index: i.vout}, data, {upsert: true}, next_out); Self.update({txidBuf: b, index: i.vout}, data, {upsert: true}, next_out);
}, },
function (err) { function (err) {
@ -136,10 +177,10 @@ TransactionOutSchema.statics.storeTransactionOuts = function(txInfo, cb) {
! o.scriptPubKey.addresses[1] // TODO : not supported ! o.scriptPubKey.addresses[1] // TODO : not supported
){ ){
// This is only to broadcast // This is only to broadcast (WIP)
if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) { // if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) {
addrs.push(o.scriptPubKey.addresses[0]); // addrs.push(o.scriptPubKey.addresses[0]);
} // }
var data = { var data = {
txidBuf: bTxId, txidBuf: bTxId,
@ -148,6 +189,7 @@ TransactionOutSchema.statics.storeTransactionOuts = function(txInfo, cb) {
value_sat : o.value * util.COIN, value_sat : o.value * util.COIN,
addr : o.scriptPubKey.addresses[0], addr : o.scriptPubKey.addresses[0],
}; };
if (fromOrphan) data.fromOrphan = true;
Self.update({txidBuf: bTxId, index: o.n}, data, {upsert: true}, next_out); Self.update({txidBuf: bTxId, index: o.n}, data, {upsert: true}, next_out);
} }
else { else {
@ -174,45 +216,49 @@ TransactionOutSchema.statics.storeTransactionOuts = function(txInfo, cb) {
// txs can be a [hashes] or [txObjects] // txs can be a [hashes] or [txObjects]
TransactionOutSchema.statics.createFromTxs = function(txs, next) { TransactionOutSchema.statics.createFromTxs = function(txs, fromOrphan, next) {
var Self = this; var Self = this;
if (typeof fromOrphan === 'function') {
next = fromOrphan;
fromOrphan = false;
}
if (!txs) return next(); if (!txs) return next();
var inserted_txs = []; var inserted_txs = [];
var updated_addrs = {}; var updated_addrs = {};
async.forEachLimit(txs, CONCURRENCY, function(txid, cb, was_new) { async.forEachLimit(txs, CONCURRENCY, function(t, each_cb) {
var txInfo; var txInfo;
async.series([ async.series([
function(a_cb) { function(a_cb) {
if (typeof t !== 'string') {
txInfo = t;
return a_cb();
}
// Is it from genesis block? (testnet==livenet) // Is it from genesis block? (testnet==livenet)
// TODO: parse it from networks.genesisTX? // TODO: parse it from networks.genesisTX?
if (txid === genesisTXID) return a_cb(); if (t === genesisTXID) return a_cb();
TransactionRpc.getRpcInfo(txid, function(err, inInfo) {
TransactionRpc.getRpcInfo(t, function(err, inInfo) {
txInfo =inInfo; txInfo =inInfo;
return a_cb(err); return a_cb(err);
}); });
}, },
function(a_cb) { function(a_cb) {
if (txid === genesisTXID) return a_cb(); if (!txInfo) return a_cb();
Self.storeTransactionOuts(txInfo, function(err, addrs) { Self.storeTransactionOuts(txInfo, fromOrphan, function(err, addrs) {
if (err) return a_cb(err); if (err) return a_cb(err);
if (was_new) {
inserted_txs.push(txid);
addrs.each(function(a) {
if ( !updated_addrs[a]) updated_addrs[a] = [];
updated_addrs[a].push(txid);
});
}
return a_cb(); return a_cb();
}); });
}], }],
function(err) { function(err) {
return cb(err); return each_cb(err);
}); });
}, },
function(err) { function(err) {

View File

@ -7,6 +7,9 @@ require('classtool');
function spec() { function spec() {
var util = require('util'); var util = require('util');
var RpcClient = require('bitcore/RpcClient').class(); var RpcClient = require('bitcore/RpcClient').class();
var bitutil = require('bitcore/util/util');
var Address = require('bitcore/Address').class();
var Script = require('bitcore/Script').class();
var networks = require('bitcore/networks'); var networks = require('bitcore/networks');
var async = require('async'); var async = require('async');
var config = require('../config/config'); var config = require('../config/config');
@ -96,7 +99,7 @@ function spec() {
}; };
}; };
HistoricSync.prototype.showProgress = function() { HistoricSync.prototype.showProgress = function(height) {
var self = this; var self = this;
if (self.error) { if (self.error) {
@ -106,12 +109,7 @@ function spec() {
self.syncPercentage = parseFloat(100 * (self.syncedBlocks + self.skippedBlocks) / self.blockChainHeight).toFixed(3); self.syncPercentage = parseFloat(100 * (self.syncedBlocks + self.skippedBlocks) / self.blockChainHeight).toFixed(3);
if (self.syncPercentage > 100) self.syncPercentage = 100; if (self.syncPercentage > 100) self.syncPercentage = 100;
p(util.format('status: [%d%%] skipped: %d', self.syncPercentage, self.skippedBlocks)); p(util.format('status: [%d%%] skipped: %d ', self.syncPercentage, self.skippedBlocks, height));
//TODO
if (self.syncPercentage>5) {
process.exit(1);
}
//
} }
if (self.opts.shouldBroadcast) { if (self.opts.shouldBroadcast) {
sockets.broadcastSyncInfo(self.info()); sockets.broadcastSyncInfo(self.info());
@ -222,6 +220,46 @@ if (self.syncPercentage>5) {
}; };
// TODO. replace with
// Script.prototype.getAddrStrs if that one get merged in bitcore
HistoricSync.prototype.getAddrStr = function(s) {
var self = this;
var addrStrs = [];
var type = s.classify();
var addr;
switch(type) {
case Script.TX_PUBKEY:
var chunk = s.captureOne();
addr = new Address(self.network.addressPubkey, bitutil.sha256ripe160(chunk));
addrStrs = [ addr.toString() ];
break;
case Script.TX_PUBKEYHASH:
addr = new Address(self.network.addressPubkey, s.captureOne());
addrStrs = [ addr.toString() ];
break;
case Script.TX_SCRIPTHASH:
addr = new Address(self.network.addressScript, s.captureOne());
addrStrs = [ addr.toString() ];
break;
case Script.TX_MULTISIG:
var addrs = [];
var chunks = s.capture();
chunks.forEach(function(chunk) {
var a = new Address(self.network.addressPubkey, bitutil.sha256ripe160(chunk));
addrStrs.push(a.toString());
});
break;
case Script.TX_UNKNOWN:
break;
}
return addrStrs;
};
HistoricSync.prototype.getBlockFromFile = function(height, scanOpts, cb) { HistoricSync.prototype.getBlockFromFile = function(height, scanOpts, cb) {
var self = this; var self = this;
@ -237,14 +275,13 @@ if (self.syncPercentage>5) {
if (err) return cb(err); if (err) return cb(err);
nextHash = res.result; nextHash = res.result;
//console.log('[HistoricSync.js.235:nextHash:]',nextHash); //TODO
return c(); return c();
}); });
}, },
//show some (inacurate) status //show some (inacurate) status
function(c) { function(c) {
if ( ( self.syncedBlocks + self.skippedBlocks) % self.step === 1) { if ( ( self.syncedBlocks + self.skippedBlocks) % self.step === 1) {
self.showProgress(); self.showProgress(height);
} }
return c(); return c();
@ -254,7 +291,27 @@ if (self.syncPercentage>5) {
self.blockExtractor.getNextBlock(function(err, b) { self.blockExtractor.getNextBlock(function(err, b) {
if (err || ! b) return c(err); if (err || ! b) return c(err);
blockInfo = b.getStandardizedObject(b.txs); blockInfo = b.getStandardizedObject(b.txs, self.network);
var ti=0;
// Get TX Address
b.txs.forEach(function(t) {
var objTx = blockInfo.tx[ti++];
var to=0;
t.outs.forEach( function(o) {
var s = new Script(o.s);
var addrs = self.getAddrStr(s);
// support only p2pubkey p2pubkeyhash and p2sh
if (addrs.length === 1) {
objTx.out[to].addrStr = addrs[0];
}
to++;
});
});
return c(); return c();
}); });
}, },