Handle forks with bitcoindSync

- added bindings to listen for tip updates from the daemon
- update the height during syncing for continuity
- call sync when there is a new tip
- added ability to rewind to a ancestor when there is a fork
This commit is contained in:
Braydon Fuller 2015-07-23 09:32:46 -04:00
parent 6dc3577e00
commit 9eda30ae2b
5 changed files with 253 additions and 269 deletions

View File

@ -41,38 +41,6 @@ function Daemon(options) {
this.network = Daemon.regtest;
}
// Add hardcoded peers
var data = fs.readFileSync(this.config, 'utf8');
if (this.network.peers.length) {
var peers = this.network.peers.reduce(function(out, peer) {
if (!~data.indexOf('addnode=' + peer)) {
return out + 'addnode=' + peer + '\n';
}
return out;
}, '\n');
fs.writeFileSync(data + peers);
}
if (this.network.name === 'testnet') {
if (!fs.existsSync(this.datadir + '/testnet3')) {
fs.mkdirSync(this.datadir + '/testnet3');
}
fs.writeFileSync(
this.datadir + '/testnet3/bitcoin.conf',
fs.readFileSync(this.config)
);
}
if (this.network.name === 'regtest') {
if (!fs.existsSync(this.datadir + '/regtest')) {
fs.mkdirSync(this.datadir + '/regtest');
}
fs.writeFileSync(
this.datadir + '/regtest/bitcoin.conf',
fs.readFileSync(this.config)
);
}
Object.keys(exports).forEach(function(key) {
self[key] = exports[key];
});
@ -237,6 +205,20 @@ Daemon.prototype.start = function(options, callback) {
self.emit('ready', result);
});
function onTipUpdateListener(err, result) {
if (err) {
self.emit('error', err);
} else {
// Emit and event that the tip was updated
self.emit('tip', result);
}
// Recursively wait until the next update
bitcoindjs.onTipUpdate(onTipUpdateListener);
}
bitcoindjs.onTipUpdate(onTipUpdateListener);
setTimeout(function callee() {
// Wait until wallet is loaded:
if (callback) {

View File

@ -5,7 +5,6 @@ var Chain = require('./chain');
var Block = require('./block');
var DB = require('./db');
var chainlib = require('chainlib');
var P2P = chainlib.P2P;
var fs = require('fs');
var BaseNode = chainlib.Node;
var util = require('util');
@ -32,27 +31,8 @@ Node.prototype._loadConfiguration = function(config) {
Node.super_.prototype._loadConfiguration.call(self, config);
};
Node.SYNC_STRATEGIES = {
P2P: 'p2p',
BITCOIND: 'bitcoind'
};
Node.DEFAULT_DAEMON_CONFIG = 'whitelist=127.0.0.1\n' + 'txindex=1\n';
Node.prototype.setSyncStrategy = function(strategy) {
this.syncStrategy = strategy;
if (this.syncStrategy === Node.SYNC_STRATEGIES.P2P) {
this.p2p.startSync();
} else if (this.syncStrategy === Node.SYNC_STRATEGIES.BITCOIND) {
this.p2p.disableSync = true;
this._syncBitcoind();
} else {
throw new Error('Strategy "' + strategy + '" is unknown.');
}
};
Node.prototype._loadBitcoinConf = function(config) {
$.checkArgument(config.datadir, 'Please specify "datadir" in configuration options');
var datadir = config.datadir.replace(/^~/, process.env.HOME);
@ -82,6 +62,7 @@ Node.prototype._loadBitcoinConf = function(config) {
this.bitcoinConfiguration[option[0]] = value;
}
}
};
Node.prototype._loadBitcoind = function(config) {
@ -94,36 +75,155 @@ Node.prototype._loadBitcoind = function(config) {
};
/**
* This function will attempt to rewind the chain to the common ancestor
* between the current chain and a forked block.
* @param {Block} block - The new tip that forks the current chain.
* @param {Function} done - A callback function that is called when complete.
*/
Node.prototype._syncBitcoindRewind = function(block, done) {
var self = this;
self.chain.getHashes(self.chain.tip.hash, function(err, currentHashes) {
if (err) {
done(err);
}
self.chain.getHashes(block.hash, function(err, newHashes) {
if (err) {
done(err);
}
var ancestorHash = newHashes.pop();
// Create a hash map for faster lookups
var currentHashesMap = {};
var length = currentHashes.length;
for (var i = 0; i < length; i++) {
currentHashesMap[currentHashes[i]] = true;
}
// Step back the chain of hashes until we find the intersection point
while (ancestorHash && !currentHashesMap[ancestorHash]) {
ancestorHash = newHashes.pop();
}
// Hash map is no-longer needed
currentHashesMap = null;
if (!ancestorHash) {
return done(new Error('Can not rewind, unknown common ancestor.'));
}
// Rewind the chain to the common ancestor
async.doWhilst(
function(removeDone) {
var block = self.chain.tip;
self.getBlock(block.prevHash, function(err, previousTip) {
if (err) {
removeDone(err);
}
self.db._onChainRemoveBlock(block, function(err) {
if (err) {
return removeDone(err);
}
delete self.chain.tip.__transactions;
var previousHeight = self.chain.tip.__height - 1;
previousTip.__height = previousHeight;
self.chain.tip = previousTip;
self.chain.saveMetadata();
self.chain.emit('removeblock', block);
removeDone();
});
});
}, function() {
return self.chain.tip.hash !== ancestorHash;
}, done
);
});
});
};
/**
* This function will synchronize additional indexes for the chain based on
* the current active chain in the bitcoin daemon. In the event that there is
* a reorganization in the daemon, the chain will rewind to the last common
* ancestor and then resume syncing.
*/
Node.prototype._syncBitcoind = function() {
var self = this;
if (self.bitcoindSyncing) {
return;
}
if (!self.chain.tip) {
return;
}
self.bitcoindSyncing = true;
log.info('Starting Bitcoind Sync');
var info = self.bitcoind.getInfo();
var height;
async.whilst(function() {
if (self.syncStrategy !== Node.SYNC_STRATEGIES.BITCOIND) {
log.info('Stopping Bitcoind Sync');
return false;
}
height = self.chain.tip.__height;
return height < info.blocks;
}, function(next) {
return height < self.bitcoindHeight;
}, function(done) {
self.bitcoind.getBlock(height + 1, function(err, blockBuffer) {
if (err) {
return next(err);
return done(err);
}
var block = self.Block.fromBuffer(blockBuffer);
if (block.prevHash === self.chain.tip.hash) {
// This block appends to the current chain tip and we can
// immediately add it to the chain and create indexes.
// Populate height
block.__height = self.chain.tip.__height + 1;
// Create indexes
self.db._onChainAddBlock(block, function(err) {
if (err) {
return done(err);
}
delete self.chain.tip.__transactions;
self.chain.tip = block;
log.debug('Saving metadata');
self.chain.saveMetadata();
log.debug('Chain added block to main chain');
self.chain.emit('addblock', block);
done();
});
} else {
// This block doesn't progress the current tip, so we'll attempt
// to rewind the chain to the common ancestor of the block and
// then we can resume syncing.
self._syncBitcoindRewind(block, done);
}
self.chain.addBlock(self.Block.fromBuffer(blockBuffer), next);
});
}, function(err) {
log.info('Stopping Bitcoind Sync');
self.bitcoindSyncing = false;
if (err) {
Error.captureStackTrace(err);
return self.emit('error', err);
}
// we're done resume syncing via p2p to handle forks
self.p2p.synced = true;
self.setSyncStrategy(Node.SYNC_STRATEGIES.P2P);
self.emit('synced');
});
@ -180,38 +280,13 @@ Node.prototype._loadDB = function(config) {
}
config.db.network = this.network;
if (!fs.existsSync(config.db.path)) {
mkdirp.sync(config.db.path);
}
this.db = new DB(config.db);
};
Node.prototype._loadP2P = function(config) {
if (!config.p2p) {
config.p2p = {};
}
config.p2p.noListen = true;
config.p2p.network = this.network;
// We only want to directly connect via p2p to the trusted bitcoind daemon
var port = 8333;
if (this.bitcoinConfiguration && this.bitcoinConfiguration.port) {
port = this.bitcoinConfiguration.port;
} else if (this.network === Networks.testnet) {
port = 18333;
}
config.p2p.addrs = [
{
ip: {
v4: '127.0.0.1'
},
port: port
}
];
config.p2p.dnsSeed = false;
config.p2p.Transaction = this.db.Transaction;
config.p2p.Block = this.Block;
config.p2p.disableSync = true; // Disable p2p syncing and instead use bitcoind sync
this.p2p = new P2P(config.p2p);
};
Node.prototype._loadConsensus = function(config) {
if (!config.consensus) {
config.consensus = {};
@ -240,9 +315,11 @@ Node.prototype._loadConsensus = function(config) {
Node.prototype._initializeBitcoind = function() {
var self = this;
// Bitcoind
this.bitcoind.on('ready', function(status) {
log.info('Bitcoin Daemon Ready');
// Set the current chain height
var info = self.bitcoind.getInfo();
self.bitcoindHeight = info.blocks;
self.db.initialize();
});
@ -250,6 +327,13 @@ Node.prototype._initializeBitcoind = function() {
log.info('Bitcoin Core Daemon Status:', status);
});
// Notify that there is a new tip
this.bitcoind.on('tip', function(height) {
log.info('Bitcoin Core Daemon New Height:', height);
self.bitcoindHeight = height;
self._syncBitcoind();
});
this.bitcoind.on('error', function(err) {
Error.captureStackTrace(err);
self.emit('error', err);
@ -278,30 +362,10 @@ Node.prototype._initializeChain = function() {
// Chain
this.chain.on('ready', function() {
log.info('Bitcoin Chain Ready');
self.p2p.initialize();
});
this.chain.on('error', function(err) {
Error.captureStackTrace(err);
self.emit('error', err);
});
};
Node.prototype._initializeP2P = function() {
var self = this;
// Peer-to-Peer
this.p2p.on('ready', function() {
log.info('Bitcoin P2P Ready');
self.emit('ready');
});
this.p2p.on('synced', function() {
log.info('Bitcoin P2P Synced');
self.emit('synced');
});
this.p2p.on('error', function(err) {
this.chain.on('error', function(err) {
Error.captureStackTrace(err);
self.emit('error', err);
});
@ -318,21 +382,11 @@ Node.prototype._initialize = function() {
// Chain References
this.chain.db = this.db;
this.chain.p2p = this.p2p;
// P2P References
this.p2p.db = this.db;
this.p2p.chain = this.chain;
// Setup Chain of Events
this._initializeBitcoind();
this._initializeDatabase();
this._initializeChain();
this._initializeP2P();
this.on('ready', function() {
self.setSyncStrategy(Node.SYNC_STRATEGIES.BITCOIND);
});
};

View File

@ -23,11 +23,18 @@ using namespace v8;
extern void WaitForShutdown(boost::thread_group* threadGroup);
static termios orig_termios;
extern CTxMemPool mempool;
extern int64_t nTimeBestReceived;
/**
* Node.js Internal Function Templates
*/
static void
async_tip_update(uv_work_t *req);
static void
async_tip_update_after(uv_work_t *req);
static void
async_start_node(uv_work_t *req);
@ -84,6 +91,11 @@ static bool g_txindex = false;
* Used for async functions and necessary linked lists at points.
*/
struct async_tip_update_data {
size_t result;
Eternal<Function> callback;
};
/**
* async_node_data
* Where the uv async request data resides.
@ -188,6 +200,68 @@ set_cooked(void);
* Functions
*/
NAN_METHOD(OnTipUpdate) {
Isolate* isolate = Isolate::GetCurrent();
HandleScope scope(isolate);
Local<Function> callback;
callback = Local<Function>::Cast(args[0]);
async_tip_update_data *data = new async_tip_update_data();
Eternal<Function> eternal(isolate, callback);
data->callback = eternal;
uv_work_t *req = new uv_work_t();
req->data = data;
int status = uv_queue_work(uv_default_loop(),
req, async_tip_update,
(uv_after_work_cb)async_tip_update_after);
assert(status == 0);
NanReturnValue(Undefined(isolate));
}
static void
async_tip_update(uv_work_t *req) {
async_tip_update_data *data = static_cast<async_tip_update_data*>(req->data);
int64_t nLastBest = nTimeBestReceived;
while(nLastBest == nTimeBestReceived) {
usleep(1E6);
}
data->result = chainActive.Height();
}
static void
async_tip_update_after(uv_work_t *req) {
Isolate* isolate = Isolate::GetCurrent();
HandleScope scope(isolate);
async_tip_update_data *data = static_cast<async_tip_update_data*>(req->data);
Local<Function> cb = data->callback.Get(isolate);
const unsigned argc = 2;
Local<Value> argv[argc] = {
v8::Null(isolate),
Local<Value>::New(isolate, NanNew<Number>(data->result))
};
TryCatch try_catch;
cb->Call(isolate->GetCurrentContext()->Global(), argc, argv);
if (try_catch.HasCaught()) {
node::FatalException(try_catch);
}
delete data;
delete req;
}
NAN_METHOD(OnBlocksReady) {
Isolate* isolate = Isolate::GetCurrent();
HandleScope scope(isolate);
@ -212,7 +286,6 @@ NAN_METHOD(OnBlocksReady) {
assert(status == 0);
NanReturnValue(Undefined(isolate));
}
/**
@ -291,7 +364,6 @@ async_blocks_ready_after(uv_work_t *req) {
* bitcoind.start(callback)
* Start the bitcoind node with AppInit2() on a separate thread.
*/
NAN_METHOD(StartBitcoind) {
Isolate* isolate = Isolate::GetCurrent();
HandleScope scope(isolate);
@ -1232,6 +1304,7 @@ init(Handle<Object> target) {
NODE_SET_METHOD(target, "start", StartBitcoind);
NODE_SET_METHOD(target, "onBlocksReady", OnBlocksReady);
NODE_SET_METHOD(target, "onTipUpdate", OnTipUpdate);
NODE_SET_METHOD(target, "stop", StopBitcoind);
NODE_SET_METHOD(target, "stopping", IsStopping);
NODE_SET_METHOD(target, "stopped", IsStopped);

View File

@ -16,6 +16,7 @@
NAN_METHOD(StartBitcoind);
NAN_METHOD(OnBlocksReady);
NAN_METHOD(OnTipUpdate);
NAN_METHOD(IsStopping);
NAN_METHOD(IsStopped);
NAN_METHOD(StopBitcoind);
@ -28,4 +29,3 @@ NAN_METHOD(GetMempoolOutputs);
NAN_METHOD(AddMempoolUncheckedTransaction);
NAN_METHOD(VerifyScript);
NAN_METHOD(SendTransaction);

View File

@ -41,30 +41,6 @@ describe('Bitcoind Node', function() {
BaseNode.prototype._loadConfiguration.called.should.equal(true);
});
});
describe('#setSyncStrategy', function() {
it('will call p2p.startSync', function() {
var node = new Node({});
node.p2p = {
startSync: sinon.spy()
};
node.setSyncStrategy(Node.SYNC_STRATEGIES.P2P);
node.p2p.startSync.callCount.should.equal(1);
});
it('will call this._syncBitcoind and disable p2p sync', function() {
var node = new Node({});
node.p2p = {};
node._syncBitcoind = sinon.spy();
node.setSyncStrategy(Node.SYNC_STRATEGIES.BITCOIND);
node._syncBitcoind.callCount.should.equal(1);
node.p2p.disableSync.should.equal(true);
});
it('will error with an unknown strategy', function() {
var node = new Node({});
(function(){
node.setSyncStrategy('unknown');
}).should.throw('Strategy "unknown" is unknown');
});
});
describe('#_loadBitcoinConf', function() {
it('will parse a bitcoin.conf file', function() {
var node = new Node({});
@ -84,53 +60,48 @@ describe('Bitcoind Node', function() {
describe('#_loadBitcoind', function() {
it('should initialize', function() {
var node = new Node({});
node._loadBitcoind({});
node._loadBitcoind({datadir: './test'});
should.exist(node.bitcoind);
});
it('should initialize with testnet', function() {
var node = new Node({});
node._loadBitcoind({testnet: true});
node._loadBitcoind({datadir: './test', testnet: true});
should.exist(node.bitcoind);
});
});
describe('#_syncBitcoind', function() {
it('will get and add block up to the tip height', function(done) {
var node = new Node({});
node.p2p = {
synced: false
};
node.Block = Block;
node.syncStrategy = Node.SYNC_STRATEGIES.BITCOIND;
node.setSyncStrategy = sinon.stub();
node.bitcoindHeight = 1;
var blockBuffer = new Buffer(blockData);
var block = Block.fromBuffer(blockBuffer);
node.bitcoind = {
getInfo: sinon.stub().returns({blocks: 2}),
getBlock: sinon.stub().callsArgWith(1, null, new Buffer(blockData))
getBlock: sinon.stub().callsArgWith(1, null, blockBuffer)
};
node.chain = {
tip: {
__height: 0
__height: 0,
hash: block.prevHash
},
addBlock: function(block, callback) {
saveMetadata: sinon.stub(),
emit: sinon.stub()
};
node.db = {
_onChainAddBlock: function(block, callback) {
node.chain.tip.__height += 1;
callback();
}
};
node.on('synced', function() {
node.p2p.synced.should.equal(true);
node.setSyncStrategy.callCount.should.equal(1);
done();
});
node._syncBitcoind();
});
it('will exit and emit error with error from bitcoind.getBlock', function(done) {
var node = new Node({});
node.p2p = {
synced: false
};
node.syncStrategy = Node.SYNC_STRATEGIES.BITCOIND;
node.setSyncStrategy = sinon.stub();
node.bitcoindHeight = 1;
node.bitcoind = {
getInfo: sinon.stub().returns({blocks: 2}),
getBlock: sinon.stub().callsArgWith(1, new Error('test error'))
};
node.chain = {
@ -144,28 +115,6 @@ describe('Bitcoind Node', function() {
});
node._syncBitcoind();
});
it('will exit if sync strategy is changed to bitcoind', function(done) {
var node = new Node({});
node.p2p = {
synced: false
};
node.syncStrategy = Node.SYNC_STRATEGIES.P2P;
node.setSyncStrategy = sinon.stub();
node.bitcoind = {
getInfo: sinon.stub().returns({blocks: 2})
};
node.chain = {
tip: {
__height: 0
}
};
node.on('synced', function() {
node.p2p.synced.should.equal(true);
node.setSyncStrategy.callCount.should.equal(1);
done();
});
node._syncBitcoind();
});
});
describe('#_loadNetwork', function() {
it('should use the testnet network if testnet is specified', function() {
@ -280,22 +229,6 @@ describe('Bitcoind Node', function() {
});
});
});
describe('#_loadP2P', function() {
it('should load p2p', function() {
var config = {};
var node = new Node(config);
node.db = {
Transaction: bitcore.Transaction
};
node.network = Networks.get('testnet');
node._loadP2P(config);
should.exist(node.p2p);
node.p2p.noListen.should.equal(true);
node.p2p.pool.network.should.deep.equal(node.network);
node.db.Transaction.should.equal(bitcore.Transaction);
});
});
describe('#_loadConsensus', function() {
var node = new Node({});
@ -327,6 +260,7 @@ describe('Bitcoind Node', function() {
it('will call db.initialize() on ready event', function(done) {
var node = new Node({});
node.bitcoind = new EventEmitter();
node.bitcoind.getInfo = sinon.stub().returns({blocks: 10});
node.db = {
initialize: sinon.spy()
};
@ -336,6 +270,7 @@ describe('Bitcoind Node', function() {
chainlib.log.info.callCount.should.equal(1);
chainlib.log.info.restore();
node.db.initialize.callCount.should.equal(1);
node.bitcoindHeight.should.equal(10);
done();
});
});
@ -388,24 +323,6 @@ describe('Bitcoind Node', function() {
});
describe('#_initializeChain', function() {
it('will call p2p.initialize() on ready event', function(done) {
var node = new Node({});
node.chain = new EventEmitter();
node.p2p = {
initialize: sinon.spy()
};
sinon.stub(chainlib.log, 'info');
node.chain.on('ready', function() {
setImmediate(function() {
chainlib.log.info.callCount.should.equal(1);
chainlib.log.info.restore();
node.p2p.initialize.callCount.should.equal(1);
done();
});
});
node._initializeChain();
node.chain.emit('ready');
});
it('will call emit an error from chain', function(done) {
var node = new Node({});
node.chain = new EventEmitter();
@ -419,41 +336,6 @@ describe('Bitcoind Node', function() {
});
});
describe('#_initializeP2P', function() {
it('will emit node "ready" when p2p is ready', function(done) {
var node = new Node({});
node.p2p = new EventEmitter();
sinon.stub(chainlib.log, 'info');
node.on('ready', function() {
chainlib.log.info.callCount.should.equal(1);
chainlib.log.info.restore();
done();
});
node._initializeP2P();
node.p2p.emit('ready');
});
it('will call emit an error from p2p', function(done) {
var node = new Node({});
node.p2p = new EventEmitter();
node.on('error', function(err) {
should.exist(err);
err.message.should.equal('test error');
done();
});
node._initializeP2P();
node.p2p.emit('error', new Error('test error'));
});
it('will relay synced event from p2p to node', function(done) {
var node = new Node({});
node.p2p = new EventEmitter();
node.on('synced', function() {
done();
});
node._initializeP2P();
node.p2p.emit('synced');
});
});
describe('#_initialize', function() {
it('should initialize', function(done) {
@ -461,13 +343,11 @@ describe('Bitcoind Node', function() {
node.chain = {};
node.Block = 'Block';
node.bitcoind = 'bitcoind';
node.p2p = {};
node.db = {};
node._initializeBitcoind = sinon.spy();
node._initializeDatabase = sinon.spy();
node._initializeChain = sinon.spy();
node._initializeP2P = sinon.spy();
node._initialize();
// references
@ -475,21 +355,16 @@ describe('Bitcoind Node', function() {
node.db.Block.should.equal(node.Block);
node.db.bitcoind.should.equal(node.bitcoind);
node.chain.db.should.equal(node.db);
node.chain.p2p.should.equal(node.p2p);
node.chain.db.should.equal(node.db);
node.p2p.db.should.equal(node.db);
node.p2p.chain.should.equal(node.chain);
// events
node._initializeBitcoind.callCount.should.equal(1);
node._initializeDatabase.callCount.should.equal(1);
node._initializeChain.callCount.should.equal(1);
node._initializeP2P.callCount.should.equal(1);
// start syncing
node.setSyncStrategy = sinon.spy();
node.on('ready', function() {
node.setSyncStrategy.callCount.should.equal(1);
done();
});
node.emit('ready');