sync working embedded in http server

This commit is contained in:
Matias Alejo Garcia 2014-02-08 20:09:54 -03:00
parent 97e1903dd1
commit b5ad5643b1
8 changed files with 95 additions and 59 deletions

View File

@ -40,6 +40,20 @@ var walk = function(path) {
walk(models_path);
/**
* p2pSync process
*/
if (!config.disableP2pSync) {
var ps = new PeerSync();
ps.init({
broadcast_txs: true,
broadcast_address_tx: true,
broadcast_blocks: true,
}, function() {
ps.run();
});
}
/**
* historic_sync process
*/
@ -60,25 +74,16 @@ if (!config.disableHistoricSync) {
historicSync.smartImport({}, function(err){
var txt = 'ended.';
if (err) txt = 'ABORTED with error: ' + err.message;
else
ps.allowReorgs = true;
console.log('[historic_sync] ' + txt, historicSync.info());
});
}
});
}
/**
* p2pSync process
*/
if (!config.disableP2pSync) {
var ps = new PeerSync();
ps.init({
broadcast_txs: true,
broadcast_address_tx: true,
broadcast_blocks: true,
}, function() {
ps.run();
});
}
//express settings
require('./config/express')(expressApp, historicSync);

View File

@ -161,8 +161,7 @@ function spec() {
blockInfo = ret.result;
// this is to match block retreived from file
if (blockInfo.hash === self.genesis)
blockInfo.previousblockhash =
'0000000000000000000000000000000000000000000000000000000000000000';
blockInfo.previousblockhash = self.network.genesisBlock.prev_hash.toString('hex');
}
else {
blockInfo = null;
@ -175,7 +174,8 @@ function spec() {
function(c) {
if (existed) return c();
self.sync.storeTipBlock(blockInfo, function(err) {
// When storing files from RPC recusively, reorgs are disabled
self.sync.storeTipBlock(blockInfo, false, function(err) {
return c(err);
});
}], function(err) {

View File

@ -23,6 +23,7 @@ function spec() {
this.verbose = opts.verbose;
this.peerdb = undefined;
this.sync = new Sync();
this.allowReorgs = false;
this.PeerManager = require('bitcore/PeerManager').createClass({
network: network
@ -60,38 +61,34 @@ function spec() {
if (this.verbose) {
console.log('[p2p_sync] Handle tx: ' + tx.hash);
}
this.sync.storeTxs([tx.hash], function(err) {
if (err) {
console.log('[PeerSync.js.71:err:]',err); //TODO
console.log('[p2p_sync] Error in handle TX: ' + JSON.stringify(err));
}
});
};
PeerSync.prototype.handle_block = function(info) {
var self = this;
var block = info.message.block;
var blockHash = coinUtil.formatHashFull(block.calcHash());
if (this.verbose) {
console.log('[p2p_sync] Handle block: ' + blockHash);
}
console.log('[p2p_sync] Handle block: %s (allowReorgs: %s)', blockHash, self.allowReorgs);
var tx_hashes = block.txs.map(function(tx) {
return coinUtil.formatHashFull(tx.hash);
});
// Reorgs enabled.
this.sync.storeTipBlock({
'hash': blockHash,
'tx': tx_hashes,
'previousblockhash': coinUtil.formatHashFull(block.prev_hash),
},
function(err) {
}, self.allowReorgs, function(err) {
if (err) {
console.log('[p2p_sync] Error in handle Block: ' + err);
}
// Check for reorgs...
// The previous last block hash
// if different => call
});
};
@ -112,8 +109,6 @@ function spec() {
});
this.peerman.on('connection', function(conn) {
console.log('[PeerSync.js.113]'); //TODO
conn.on('inv', self.handle_inv.bind(self));
conn.on('block', self.handle_block.bind(self));
conn.on('tx', self.handle_tx.bind(self));

View File

@ -66,11 +66,16 @@ function spec() {
*
*/
Sync.prototype.storeTipBlock = function(b, cb) {
Sync.prototype.storeTipBlock = function(b, allowReorgs, cb) {
if (typeof allowReorgs === 'function') {
cb = allowReorgs;
allowReorgs = true;
}
if (!b) return cb();
var self = this;
var oldTip, oldNext, needReorg = true;
var oldTip, oldNext, needReorg = false;
var newPrev = b.previousblockhash;
var updatedTxs, updatedAddrs;
@ -82,6 +87,8 @@ function spec() {
});
},
function(c) {
if (!allowReorgs) return c();
self.bDb.has(newPrev, function(err, val) {
if (!val && newPrev.match(/^0+$/)) return c();
return c(err ||
@ -89,18 +96,17 @@ function spec() {
});
},
function(c) {
self.txDb.createFromBlock(b, function(err, txs, addrs) {
updatedTxs = txs;
self.txDb.createFromBlock(b, function(err, addrs) {
updatedTxs = b.tx;
updatedAddrs = addrs;
return c(err);
});
},
function(c) {
if (!allowReorgs) return c();
self.bDb.getTip(function(err, val) {
oldTip = val;
if (typeof oldTip === 'undefined' || newPrev === oldTip) {
needReorg = false;
}
if (oldTip && newPrev !== oldTip) needReorg = true;
return c();
});
},
@ -262,10 +268,10 @@ function spec() {
Sync.prototype.storeTxs = function(txs, cb) {
var self = this;
self.txDb.createFromArray(txs, null, function(err, updatedTxs, updatedAddrs) {
self.txDb.createFromArray(txs, null, function(err, updatedAddrs) {
if (err) return cb(err);
self._handleBroadcast(null, updatedTxs, updatedAddrs);
self._handleBroadcast(null, txs, updatedAddrs);
return cb(err);
});
};

View File

@ -183,7 +183,7 @@ function spec(b) {
if (err && err.notFound) {
err = null;
}
var a = val.split(':');
var a = val?val.split(':'):[null,null];
return cb(err, a[0], parseInt(a[1]));
});
};
@ -460,8 +460,7 @@ function spec(b) {
if (!txs) return next();
// TODO
var updatedAddrs = {};
var updatedAddrs = []; // TODO
async.forEachLimit(txs, CONCURRENCY, function(t, each_cb) {
if (typeof t === 'string') {
@ -473,7 +472,7 @@ function spec(b) {
TransactionRpc.getRpcInfo(t, function(err, inInfo) {
if (!inInfo) return each_cb(err);
return self.add(inInfo, blockHash,each_cb);
return self.add(inInfo, blockHash, each_cb);
});
}
else {

View File

@ -496,6 +496,53 @@ describe('Sync Reorgs', function(){
], done );
});
var p2p = {
hash: '0000000000000000000000000000000000000000000000000000000000000006',
tx: ['f6c2901f39fd07f2f2e503183d76f73ecc1aee9ac9216fde58e867bc29ce674e'],
time: 1296690099,
previousblockhash: '111',
};
it('p2p, no reorg allowed', function(done) {
async.series([
function (c) {
s.sync.storeTipBlock(p2p, false, function(err) {
assert(!err, 'shouldnt return error' + err);
return c();
});
},
function (c) {
s.sync.bDb.has(p2p.hash, function(err,is) {
assert(!err);
assert(is);
return c();
});
},
function (c) {
s.sync.txDb.isConfirmed(p2p.tx[0], function(err,is) {
assert(!err);
assert(is);
return c();
});
},
function (c) {
s.sync.bDb.getNext(p2p.hash, function(err,v) {
assert(!err);
assert.equal(v,p2p.nextblockhash);
return c();
});
},
function (c) {
s.sync.bDb.getNext(p2p.previousblockhash, function(err,v) {
assert(!err);
assert.equal(v,p2p.hash);
return c();
});
},
], done );
});
});

View File

@ -8,7 +8,7 @@ var TESTING_BLOCK = '000000000185678d3d7ecc9962c96418174431f93fe20bf216d55652724
var
assert = require('assert'),
config = require('../../config/config'),
// config = require('../../config/config'),
BlockDb = require('../../lib/BlockDb').class();
var bDb;
@ -36,22 +36,5 @@ describe('BlockDb fromHashWithInfo', function(){
done();
});
});
it('setOrphan', function(done) {
var b16 = '00000000c4cbd75af741f3a2b2ff72d9ed4d83a048462c1efe331be31ccf006b';
var b17 = '00000000fe198cce4c8abf9dca0fee1182cb130df966cc428ad2a230df8da743';
bDb.has(b17, function(err, has) {
assert(has);
bDb.setOrphan(b17, function(err, oldPrev) {
assert.equal(oldPrev, b16);
bDb.setPrev(b17, b16, function(err, oldPrev) {
bDb.getPrev(b17, function(err, p) {
assert.equal(p, b16);
done();
});
});
});
});
});
});

View File

@ -11,6 +11,7 @@ var program = require('commander');
program
.version(PROGRAM_VERSION)
.option('-N --network [testnet]', 'Set bitcoin network [testnet]', 'testnet')
.option('-V --verbose', 'Verbose', 1)
.parse(process.argv);
var ps = new PeerSync();