Merge pull request #352 from braydonf/mem

Optimized address service mempool index size
This commit is contained in:
Patrick Nagurny 2015-11-03 17:23:28 -05:00
commit 02620a5b47
12 changed files with 1279 additions and 260 deletions

View File

@ -101,7 +101,11 @@ node.services.bitcoind.on('tip', function(blockHash) {
});
node.services.bitcoind.on('tx', function(txInfo) {
// a new transaction has been broadcast in the network
// a new transaction has entered the mempool
});
node.services.bitcoind.on('txleave', function(txLeaveInfo) {
// a new transaction has left the mempool
});
```
@ -110,7 +114,16 @@ The `txInfo` object will have the format:
```js
{
buffer: <Buffer...>,
mempool: true,
mempool: true, // will currently always be true
hash: '7426c707d0e9705bdd8158e60983e37d0f5d63529086d6672b07d9238d5aa623'
}
```
The `txLeaveInfo` object will have the format:
```js
{
buffer: <Buffer...>,
hash: '7426c707d0e9705bdd8158e60983e37d0f5d63529086d6672b07d9238d5aa623'
}
```

View File

@ -19,16 +19,6 @@ CustomService.prototype.blockHandler = function(block, add, callback) {
Take a look at the Address Service implementation for more details about how to encode the key, value for the best efficiency and ways to format the keys for streaming reads.
Additionally the mempool can have an index, the mempool index will be updated once bitcoind and the db have both fully synced. A service can implement a `resetMempoolIndex` method that will be run during this time, and the "synced" event will wait until this task has been finished:
```js
CustomService.prototype.resetMempoolIndex = function(callback) {
var transactionBuffers = this.node.services.bitcoind.getMempoolTransactions();
// interact over the transactions asynchronously here
callback();
};
```
## API Documentation
These methods are exposed over the JSON-RPC interface and can be called directly from a node via:

View File

@ -367,14 +367,27 @@ index f94771a..72ee00e 100644
diff --git a/src/net.h b/src/net.h
index 17502b9..e181d68 100644
index 17502b9..c9ae1b2 100644
--- a/src/net.h
+++ b/src/net.h
@@ -99,6 +99,7 @@ struct CNodeSignals
@@ -99,6 +99,8 @@ struct CNodeSignals
{
boost::signals2::signal<int ()> GetHeight;
boost::signals2::signal<bool (CNode*), CombinerAll> ProcessMessages;
+ boost::signals2::signal<bool (const CTransaction&)> TxToMemPool;
+ boost::signals2::signal<bool (const CTransaction&)> TxLeaveMemPool;
boost::signals2::signal<bool (CNode*, bool), CombinerAll> SendMessages;
boost::signals2::signal<void (NodeId, const CNode*)> InitializeNode;
boost::signals2::signal<void (NodeId)> FinalizeNode;
diff --git a/src/txmempool.cpp b/src/txmempool.cpp
index c3d1b60..03e265d 100644
--- a/src/txmempool.cpp
+++ b/src/txmempool.cpp
@@ -133,6 +133,7 @@ void CTxMemPool::remove(const CTransaction &origTx, std::list<CTransaction>& rem
if (!mapTx.count(hash))
continue;
const CTransaction& tx = mapTx[hash].GetTx();
+ GetNodeSignals().TxLeaveMemPool(tx);
if (fRecursive) {
for (unsigned int i = 0; i < tx.vout.size(); i++) {
std::map<COutPoint, CInPoint>::iterator it = mapNextTx.find(COutPoint(hash, i));

View File

@ -726,12 +726,15 @@ describe('Node Functionality', function() {
node.services.bitcoind.sendTransaction(tx.serialize());
setImmediate(function() {
var length = node.services.address.mempoolOutputIndex[address].length;
length.should.equal(1);
should.exist(node.services.address.mempoolOutputIndex[address]);
done();
var hashBuffer = bitcore.Address(address).hashBuffer;
node.services.address._getOutputsMempool(address, hashBuffer, function(err, outs) {
if (err) {
throw err;
}
outs.length.should.equal(1);
done();
});
});
});
});

View File

@ -360,6 +360,30 @@ describe('Daemon Binding Functionality', function() {
});
});
describe('transactions leaving the mempool', function() {
it('receive event when transaction leaves', function(done) {
// add transaction to build a new block
var tx = bitcore.Transaction();
tx.from(utxos[4]);
tx.change(privateKey.toAddress());
tx.to(destKey.toAddress(), utxos[4].amount * 1e8 - 1000);
tx.sign(bitcore.PrivateKey.fromWIF(utxos[4].privateKeyWIF));
bitcoind.sendTransaction(tx.serialize());
bitcoind.once('txleave', function(txInfo) {
txInfo.hash.should.equal(tx.hash);
done();
});
client.generate(1, function(err, response) {
if (err) {
throw err;
}
});
});
});
describe('mempool functionality', function() {
var fromAddress = 'mszYqVnqKoQx4jcTdJXxwKAissE3Jbrrc1';

View File

@ -1,14 +1,18 @@
'use strict';
var fs = require('fs');
var BaseService = require('../../service');
var inherits = require('util').inherits;
var async = require('async');
var mkdirp = require('mkdirp');
var index = require('../../');
var log = index.log;
var errors = index.errors;
var Transaction = require('../../transaction');
var bitcore = require('bitcore-lib');
var Networks = bitcore.Networks;
var levelup = require('levelup');
var leveldown = require('leveldown');
var memdown = require('memdown');
var $ = bitcore.util.preconditions;
var _ = bitcore.deps._;
var Hash = bitcore.crypto.Hash;
@ -35,11 +39,16 @@ var AddressService = function(options) {
this.subscriptions['address/balance'] = {};
this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this));
this.node.services.bitcoind.on('txleave', this.transactionLeaveHandler.bind(this));
this.mempoolOutputIndex = {};
this.mempoolInputIndex = {};
this.mempoolSpentIndex = {};
this._setMempoolIndexPath();
if (options.mempoolMemoryIndex) {
this.levelupStore = memdown;
} else {
this.levelupStore = leveldown;
}
this.mempoolIndex = null; // Used for larger mempool indexes
this.mempoolSpentIndex = {}; // Used for small quick synchronous lookups
};
inherits(AddressService, BaseService);
@ -55,9 +64,73 @@ AddressService.PREFIXES = {
SPENTSMAP: new Buffer('05', 'hex') // Get the input that spends an output
};
AddressService.MEMPREFIXES = {
OUTPUTS: new Buffer('01', 'hex'), // Query mempool outputs by address
SPENTS: new Buffer('02', 'hex'), // Query mempool inputs by address
SPENTSMAP: new Buffer('03', 'hex') // Query mempool for the input that spends an output
};
AddressService.SPACER_MIN = new Buffer('00', 'hex');
AddressService.SPACER_MAX = new Buffer('ff', 'hex');
AddressService.prototype.start = function(callback) {
var self = this;
async.series([
function(next) {
// Flush any existing mempool index
if (fs.existsSync(self.mempoolIndexPath)) {
leveldown.destroy(self.mempoolIndexPath, next);
} else {
setImmediate(next);
}
},
function(next) {
if (!fs.existsSync(self.mempoolIndexPath)) {
mkdirp(self.mempoolIndexPath, next);
} else {
setImmediate(next);
}
},
function(next) {
self.mempoolIndex = levelup(
self.mempoolIndexPath,
{
db: self.levelupStore,
keyEncoding: 'binary',
valueEncoding: 'binary',
fillCache: false
},
next
);
}
], callback);
};
AddressService.prototype.stop = function(callback) {
// TODO Keep track of ongoing db requests before shutting down
this.mempoolIndex.close(callback);
};
/**
* This function will set `this.dataPath` based on `this.node.network`.
* @private
*/
AddressService.prototype._setMempoolIndexPath = function() {
$.checkState(this.node.datadir, 'Node is expected to have a "datadir" property');
var regtest = Networks.get('regtest');
if (this.node.network === Networks.livenet) {
this.mempoolIndexPath = this.node.datadir + '/bitcore-addressmempool.db';
} else if (this.node.network === Networks.testnet) {
this.mempoolIndexPath = this.node.datadir + '/testnet3/bitcore-addressmempool.db';
} else if (this.node.network === regtest) {
this.mempoolIndexPath = this.node.datadir + '/regtest/bitcore-addressmempool.db';
} else {
throw new Error('Unknown network: ' + this.network);
}
};
/**
* Called by the Node to get the available API methods for this service,
* that can be exposed over the JSON-RPC interface.
@ -131,6 +204,17 @@ AddressService.prototype.transactionOutputHandler = function(messages, tx, outpu
}
};
/**
* This will handle data from the daemon "txleave" that a transaction has left the mempool.
* @param {Object} txInfo - The data from the daemon.on('txleave') event
* @param {Buffer} txInfo.buffer - The transaction buffer
* @param {String} txInfo.hash - The hash of the transaction
*/
AddressService.prototype.transactionLeaveHandler = function(txInfo) {
var tx = bitcore.Transaction().fromBuffer(txInfo.buffer);
this.updateMempoolIndex(tx, false);
};
/**
* This will handle data from the daemon "tx" event, go through each of the outputs
* and send messages by calling `transactionEventHandler` to any subscribers for a
@ -139,8 +223,10 @@ AddressService.prototype.transactionOutputHandler = function(messages, tx, outpu
* @param {Buffer} txInfo.buffer - The transaction buffer
* @param {Boolean} txInfo.mempool - If the transaction was accepted in the mempool
* @param {String} txInfo.hash - The hash of the transaction
* @param {Function} [callback] - Optional callback
*/
AddressService.prototype.transactionHandler = function(txInfo) {
AddressService.prototype.transactionHandler = function(txInfo, callback) {
var self = this;
// Basic transaction format is handled by the daemon
// and we can safely assume the buffer is properly formatted.
@ -153,39 +239,48 @@ AddressService.prototype.transactionHandler = function(txInfo) {
this.transactionOutputHandler(messages, tx, i, !txInfo.mempool);
}
// Update mempool index
if (txInfo.mempool) {
this.updateMempoolIndex(tx);
if (!callback) {
callback = function(err) {
if (err) {
return log.error(err);
}
};
}
for (var key in messages) {
this.transactionEventHandler(messages[key]);
this.balanceEventHandler(null, messages[key].addressInfo);
function finish(err) {
if (err) {
return callback(err);
}
for (var key in messages) {
self.transactionEventHandler(messages[key]);
self.balanceEventHandler(null, messages[key].addressInfo);
}
callback();
}
if (txInfo.mempool) {
self.updateMempoolIndex(tx, true, finish);
} else {
setImmediate(finish);
}
};
/**
* This function will update the mempool address index with the necessary
* information for further lookups. There are three indexes:
*
* mempoolOutputIndex, an object keyed by base58check encoded addresses with values:
* txid - A hex string of the transaction hash
* outputIndex - A number of the corresponding output
* satoshis - Total number of satoshis
* script - The script as a hex string
*
* mempoolInputIndex, an object keyed by base58check encoded addresses with values:
* txid - A hex string of the transaction hash
* inputIndex - A number of the corresponding input
*
* mempoolSpentIndex, an object keyed by <prevTxId>-<outputIndex> with (buffer) values:
* inputTxId - A 32 byte buffer of the input txid
* inputIndex - 4 bytes stored as UInt32BE
*
* information for further lookups.
* @param {Transaction} - An instance of a Bitcore Transaction
* @param {Boolean} - Add/remove from the index
*/
AddressService.prototype.updateMempoolIndex = function(tx) {
/* jshint maxstatements: 30 */
AddressService.prototype.updateMempoolIndex = function(tx, add, callback) {
/* jshint maxstatements: 100 */
var operations = [];
var action = 'put';
if (!add) {
action = 'del';
}
var txid = tx.hash;
var txidBuffer = new Buffer(txid, 'hex');
@ -201,21 +296,23 @@ AddressService.prototype.updateMempoolIndex = function(tx) {
continue;
}
var addressStr = bitcore.Address({
hashBuffer: addressInfo.hashBuffer,
type: addressInfo.addressType,
network: this.node.network
}).toString();
// Update output index
var outputIndexBuffer = new Buffer(4);
outputIndexBuffer.writeUInt32BE(outputIndex);
if (!this.mempoolOutputIndex[addressStr]) {
this.mempoolOutputIndex[addressStr] = [];
}
var outKey = Buffer.concat([
AddressService.MEMPREFIXES.OUTPUTS,
addressInfo.hashBuffer,
txidBuffer,
outputIndexBuffer
]);
this.mempoolOutputIndex[addressStr].push({
txid: txid,
outputIndex: outputIndex,
satoshis: output.satoshis,
script: output._scriptBuffer.toString('hex') //TODO use a buffer
var outValue = this._encodeOutputValue(output.satoshis, output._scriptBuffer);
operations.push({
type: action,
key: outKey,
value: outValue
});
}
@ -224,55 +321,75 @@ AddressService.prototype.updateMempoolIndex = function(tx) {
var input = tx.inputs[inputIndex];
// Update spent index
var spentIndexKey = [input.prevTxId.toString('hex'), input.outputIndex].join('-');
var inputOutputIndexBuffer = new Buffer(4);
inputOutputIndexBuffer.writeUInt32BE(input.outputIndex);
// Add an additional small spent index for fast synchronous lookups
var spentIndexSyncKey = this._encodeSpentIndexSyncKey(
input.prevTxId,
input.outputIndex
);
if (add) {
this.mempoolSpentIndex[spentIndexSyncKey] = true;
} else {
delete this.mempoolSpentIndex[spentIndexSyncKey];
}
// Add a more detailed spent index with values
var spentIndexKey = Buffer.concat([
AddressService.MEMPREFIXES.SPENTSMAP,
input.prevTxId,
inputOutputIndexBuffer
]);
var inputIndexBuffer = new Buffer(4);
inputIndexBuffer.writeUInt32BE(inputIndex);
var inputIndexValue = Buffer.concat([
txidBuffer,
inputIndexBuffer
]);
this.mempoolSpentIndex[spentIndexKey] = inputIndexValue;
operations.push({
type: action,
key: spentIndexKey,
value: inputIndexValue
});
var address = input.script.toAddress(this.node.network);
if (!address) {
// Update input index
var inputHashBuffer;
if (input.script.isPublicKeyHashIn()) {
inputHashBuffer = Hash.sha256ripemd160(input.script.chunks[1].buf);
} else if (input.script.isScriptHashIn()) {
inputHashBuffer = Hash.sha256ripemd160(input.script.chunks[input.script.chunks.length - 1].buf);
} else {
continue;
}
var addressStr = address.toString();
if (!this.mempoolInputIndex[addressStr]) {
this.mempoolInputIndex[addressStr] = [];
}
this.mempoolInputIndex[addressStr].push({
txid: tx.hash, // TODO use buffer
inputIndex: inputIndex
var inputKey = Buffer.concat([
AddressService.MEMPREFIXES.SPENTS,
inputHashBuffer,
input.prevTxId,
inputOutputIndexBuffer
]);
var inputValue = Buffer.concat([
txidBuffer,
inputIndexBuffer
]);
operations.push({
type: action,
key: inputKey,
value: inputValue
});
}
};
if (!callback) {
callback = function(err) {
if (err) {
return log.error(err);
}
};
}
this.mempoolIndex.batch(operations, callback);
/**
* This function is called by the Database Service when the database itself
* has finished synchronization. It will retrieve a copy of all transactions
* from the mempool and create an address index for fast look-ups. The previous
* index will be reset.
*/
AddressService.prototype.resetMempoolIndex = function(callback) {
var self = this;
var transactionBuffers = self.node.services.bitcoind.getMempoolTransactions();
this.mempoolInputIndex = {};
this.mempoolOutputIndex = {};
this.mempoolSpentIndex = {};
async.each(transactionBuffers, function(txBuffer, next) {
var tx = Transaction().fromBuffer(txBuffer);
self.updateMempoolIndex(tx);
setImmediate(next);
}, function(err) {
if (err) {
return callback(err);
}
callback();
});
};
/**
@ -436,6 +553,16 @@ AddressService.prototype.blockHandler = function(block, addOutput, callback) {
});
};
AddressService.prototype._encodeSpentIndexSyncKey = function(txidBuffer, outputIndex) {
var outputIndexBuffer = new Buffer(4);
outputIndexBuffer.writeUInt32BE(outputIndex);
var key = Buffer.concat([
txidBuffer,
outputIndexBuffer
]);
return key.toString('binary');
};
AddressService.prototype._encodeOutputKey = function(hashBuffer, height, txidBuffer, outputIndex) {
var heightBuffer = new Buffer(4);
heightBuffer.writeUInt32BE(height);
@ -748,15 +875,9 @@ AddressService.prototype.getInputForOutput = function(txid, outputIndex, options
txidBuffer = new Buffer(txid, 'hex');
}
if (options.queryMempool) {
var spentIndexKey = [txid.toString('hex'), outputIndex].join('-');
if (this.mempoolSpentIndex[spentIndexKey]) {
var mempoolValue = this.mempoolSpentIndex[spentIndexKey];
var inputTxId = mempoolValue.slice(0, 32);
var inputIndex = mempoolValue.readUInt32BE(32);
return callback(null, {
inputTxId: inputTxId.toString('hex'),
inputIndex: inputIndex
});
var spentIndexSyncKey = this._encodeSpentIndexSyncKey(txidBuffer, outputIndex);
if (this.mempoolSpentIndex[spentIndexSyncKey]) {
return this._getSpentMempool(txidBuffer, outputIndex, callback);
}
}
var key = this._encodeInputKeyMap(txidBuffer, outputIndex);
@ -867,26 +988,97 @@ AddressService.prototype.getInputs = function(addressStr, options, callback) {
}
if(options.queryMempool) {
var mempoolInputs = self.mempoolInputIndex[addressStr];
if (mempoolInputs) {
for(var i = 0; i < mempoolInputs.length; i++) {
var newInput = _.clone(mempoolInputs[i]);
newInput.address = addressStr;
newInput.height = -1;
newInput.confirmations = 0;
inputs.push(newInput);
self._getInputsMempool(addressStr, hashBuffer, function(err, mempoolInputs) {
if (err) {
return callback(err);
}
}
inputs = inputs.concat(mempoolInputs);
callback(null, inputs);
});
} else {
callback(null, inputs);
}
callback(null, inputs);
});
return stream;
};
AddressService.prototype._getInputsMempool = function(addressStr, hashBuffer, callback) {
var self = this;
var mempoolInputs = [];
var stream = self.mempoolIndex.createReadStream({
gte: Buffer.concat([
AddressService.MEMPREFIXES.SPENTS,
hashBuffer,
AddressService.SPACER_MIN
]),
lte: Buffer.concat([
AddressService.MEMPREFIXES.SPENTS,
hashBuffer,
AddressService.SPACER_MAX
]),
valueEncoding: 'binary',
keyEncoding: 'binary'
});
stream.on('data', function(data) {
var txid = data.value.slice(0, 32);
var inputIndex = data.value.readUInt32BE(32);
var output = {
address: addressStr,
txid: txid.toString('hex'), //TODO use a buffer
inputIndex: inputIndex,
height: -1,
confirmations: 0
};
mempoolInputs.push(output);
});
var error;
stream.on('error', function(streamError) {
if (streamError) {
error = streamError;
}
});
stream.on('close', function() {
if (error) {
return callback(error);
}
callback(null, mempoolInputs);
});
};
AddressService.prototype._getSpentMempool = function(txidBuffer, outputIndex, callback) {
var outputIndexBuffer = new Buffer(4);
outputIndexBuffer.writeUInt32BE(outputIndex);
var spentIndexKey = Buffer.concat([
AddressService.MEMPREFIXES.SPENTSMAP,
txidBuffer,
outputIndexBuffer
]);
this.mempoolIndex.get(
spentIndexKey,
function(err, mempoolValue) {
if (err) {
return callback(err);
}
var inputTxId = mempoolValue.slice(0, 32);
var inputIndex = mempoolValue.readUInt32BE(32);
callback(null, {
inputTxId: inputTxId.toString('hex'),
inputIndex: inputIndex
});
}
);
};
/**
* Will give outputs for an address as an object with:
* address - The base58check encoded address
@ -980,24 +1172,75 @@ AddressService.prototype.getOutputs = function(addressStr, options, callback) {
}
if(options.queryMempool) {
var mempoolOutputs = self.mempoolOutputIndex[addressStr];
if (mempoolOutputs) {
for(var i = 0; i < mempoolOutputs.length; i++) {
var newOutput = _.clone(mempoolOutputs[i]);
newOutput.address = addressStr;
newOutput.height = -1;
newOutput.confirmations = 0;
outputs.push(newOutput);
self._getOutputsMempool(addressStr, hashBuffer, function(err, mempoolOutputs) {
if (err) {
return callback(err);
}
}
outputs = outputs.concat(mempoolOutputs);
callback(null, outputs);
});
} else {
callback(null, outputs);
}
callback(null, outputs);
});
return stream;
};
AddressService.prototype._getOutputsMempool = function(addressStr, hashBuffer, callback) {
var self = this;
var mempoolOutputs = [];
var stream = self.mempoolIndex.createReadStream({
gte: Buffer.concat([
AddressService.MEMPREFIXES.OUTPUTS,
hashBuffer,
AddressService.SPACER_MIN
]),
lte: Buffer.concat([
AddressService.MEMPREFIXES.OUTPUTS,
hashBuffer,
AddressService.SPACER_MAX
]),
valueEncoding: 'binary',
keyEncoding: 'binary'
});
stream.on('data', function(data) {
// Format of data: prefix: 1, hashBuffer: 20, txid: 32, outputIndex: 4
var txid = data.key.slice(21, 53);
var outputIndex = data.key.readUInt32BE(53);
var value = self._decodeOutputValue(data.value);
var output = {
address: addressStr,
txid: txid.toString('hex'), //TODO use a buffer
outputIndex: outputIndex,
height: -1,
satoshis: value.satoshis,
script: value.scriptBuffer.toString('hex'), //TODO use a buffer
confirmations: 0
};
mempoolOutputs.push(output);
});
var error;
stream.on('error', function(streamError) {
if (streamError) {
error = streamError;
}
});
stream.on('close', function() {
if (error) {
return callback(error);
}
callback(null, mempoolOutputs);
});
};
/**
* Will give unspent outputs for an address or an array of addresses.
* @param {Array|String} addresses - An array of addresses
@ -1088,8 +1331,9 @@ AddressService.prototype.isSpent = function(output, options, callback) {
var txid = output.prevTxId ? output.prevTxId.toString('hex') : output.txid;
var spent = self.node.services.bitcoind.isSpent(txid, output.outputIndex);
if (!spent && queryMempool) {
var spentIndexKey = [txid, output.outputIndex].join('-');
spent = self.mempoolSpentIndex[spentIndexKey] ? true : false;
var txidBuffer = new Buffer(txid, 'hex');
var spentIndexSyncKey = this._encodeSpentIndexSyncKey(txidBuffer, output.outputIndex);
spent = self.mempoolSpentIndex[spentIndexSyncKey] ? true : false;
}
setImmediate(function() {
// TODO error should be the first argument?
@ -1097,6 +1341,7 @@ AddressService.prototype.isSpent = function(output, options, callback) {
});
};
/**
* This will give the history for many addresses limited by a range of block heights (to limit
* the database lookup times) and/or paginated to limit the results length.
@ -1195,8 +1440,11 @@ AddressService.prototype.getAddressSummary = function(address, options, callback
for(var i = 0; i < outputs.length; i++) {
// Bitcoind's isSpent only works for confirmed transactions
var spentDB = self.node.services.bitcoind.isSpent(outputs[i].txid, outputs[i].outputIndex);
var spentIndexKey = [outputs[i].txid, outputs[i].outputIndex].join('-');
var spentMempool = self.mempoolSpentIndex[spentIndexKey];
var spentIndexSyncKey = self._encodeSpentIndexSyncKey(
new Buffer(outputs[i].txid, 'hex'), // TODO: get buffer directly
outputs[i].outputIndex
);
var spentMempool = self.mempoolSpentIndex[spentIndexSyncKey];
txids.push(outputs[i]);

View File

@ -110,12 +110,19 @@ Bitcoin.prototype._registerEventHandlers = function() {
// Set the height and emit a new tip
bindings.onTipUpdate(self._onTipUpdate.bind(this));
// Register callback function to handle incoming transactions
// Register callback function to handle transactions entering the mempool
bindings.startTxMon(function(txs) {
for(var i = 0; i < txs.length; i++) {
self.emit('tx', txs[i]);
}
});
// Register callback function to handle transactions leaving the mempool
bindings.startTxMonLeave(function(txs) {
for(var i = 0; i < txs.length; i++) {
self.emit('txleave', txs[i]);
}
});
};
Bitcoin.prototype._onReady = function(result, callback) {

View File

@ -454,24 +454,6 @@ DB.prototype.disconnectBlock = function(block, callback) {
this.runAllBlockHandlers(block, false, callback);
};
/**
* Will run all `resetMempoolIndex` methods implemented on sibling
* services to update the mempool indexes.
*/
DB.prototype.runAllMempoolIndexes = function(callback) {
async.eachSeries(
this.node.services,
function(service, next) {
if (service.resetMempoolIndex) {
service.resetMempoolIndex(next);
} else {
setImmediate(next);
}
},
callback
);
};
/**
* Will collect all database operations for a block from other services that implement
* `blockHandler` methods and then save operations to the database.
@ -745,15 +727,8 @@ DB.prototype.sync = function() {
}
if (self.node.services.bitcoind.isSynced()) {
self.runAllMempoolIndexes(function(err) {
if (err) {
Error.captureStackTrace(err);
return self.node.emit('error', err);
}
self.bitcoindSyncing = false;
self.node.emit('synced');
});
self.bitcoindSyncing = false;
self.node.emit('synced');
} else {
self.bitcoindSyncing = false;
}

View File

@ -39,6 +39,9 @@ extern int64_t nTimeBestReceived;
static void
tx_notifier(uv_async_t *handle);
static void
txleave_notifier(uv_async_t *handle);
static void
async_tip_update(uv_work_t *req);
@ -90,6 +93,9 @@ async_get_tx_and_info_after(uv_work_t *req);
static bool
queueTx(const CTransaction&);
static bool
queueTxLeave(const CTransaction&);
extern "C" void
init(Handle<Object>);
@ -98,9 +104,13 @@ init(Handle<Object>);
* Used only by bitcoind functions.
*/
static std::vector<CTransaction> txQueue;
static std::vector<CTransaction> txQueueLeave;
static uv_async_t txmon_async;
static uv_async_t txmonleave_async;
static Eternal<Function> txmon_callback;
static Eternal<Function> txmonleave_callback;
static bool txmon_callback_available;
static bool txmonleave_callback_available;
static volatile bool shutdown_complete = false;
static char *g_data_dir = NULL;
@ -259,6 +269,21 @@ NAN_METHOD(StartTxMon) {
info.GetReturnValue().Set(Null());
};
NAN_METHOD(StartTxMonLeave) {
Isolate* isolate = info.GetIsolate();
Local<Function> callback = Local<Function>::Cast(info[0]);
Eternal<Function> cb(isolate, callback);
txmonleave_callback = cb;
txmonleave_callback_available = true;
CNodeSignals& nodeSignals = GetNodeSignals();
nodeSignals.TxLeaveMemPool.connect(&queueTxLeave);
uv_async_init(uv_default_loop(), &txmonleave_async, txleave_notifier);
info.GetReturnValue().Set(Null());
};
static void
tx_notifier(uv_async_t *handle) {
Isolate* isolate = Isolate::GetCurrent();
@ -307,6 +332,53 @@ queueTx(const CTransaction& tx) {
return true;
}
static void
txleave_notifier(uv_async_t *handle) {
Isolate* isolate = Isolate::GetCurrent();
HandleScope scope(isolate);
Local<Array> results = Array::New(isolate);
int arrayIndex = 0;
LOCK(cs_main);
BOOST_FOREACH(const CTransaction& tx, txQueueLeave) {
CDataStream ssTx(SER_NETWORK, PROTOCOL_VERSION);
ssTx << tx;
std::string stx = ssTx.str();
Nan::MaybeLocal<v8::Object> txBuffer = Nan::CopyBuffer((char *)stx.c_str(), stx.size());
uint256 hash = tx.GetHash();
Local<Object> obj = New<Object>();
Nan::Set(obj, New("buffer").ToLocalChecked(), txBuffer.ToLocalChecked());
Nan::Set(obj, New("hash").ToLocalChecked(), New(hash.GetHex()).ToLocalChecked());
results->Set(arrayIndex, obj);
arrayIndex++;
}
const unsigned argc = 1;
Local<Value> argv[argc] = {
Local<Value>::New(isolate, results)
};
Local<Function> cb = txmonleave_callback.Get(isolate);
cb->Call(isolate->GetCurrentContext()->Global(), argc, argv);
txQueueLeave.clear();
}
static bool
queueTxLeave(const CTransaction& tx) {
LOCK(cs_main);
txQueueLeave.push_back(tx);
uv_async_send(&txmonleave_async);
return true;
}
/**
* Functions
*/
@ -1527,6 +1599,7 @@ NAN_MODULE_INIT(init) {
Nan::Set(target, New("sendTransaction").ToLocalChecked(), GetFunction(New<FunctionTemplate>(SendTransaction)).ToLocalChecked());
Nan::Set(target, New("estimateFee").ToLocalChecked(), GetFunction(New<FunctionTemplate>(EstimateFee)).ToLocalChecked());
Nan::Set(target, New("startTxMon").ToLocalChecked(), GetFunction(New<FunctionTemplate>(StartTxMon)).ToLocalChecked());
Nan::Set(target, New("startTxMonLeave").ToLocalChecked(), GetFunction(New<FunctionTemplate>(StartTxMonLeave)).ToLocalChecked());
Nan::Set(target, New("syncPercentage").ToLocalChecked(), GetFunction(New<FunctionTemplate>(SyncPercentage)).ToLocalChecked());
Nan::Set(target, New("isSynced").ToLocalChecked(), GetFunction(New<FunctionTemplate>(IsSynced)).ToLocalChecked());
Nan::Set(target, New("getBestBlockHash").ToLocalChecked(), GetFunction(New<FunctionTemplate>(GetBestBlockHash)).ToLocalChecked());

File diff suppressed because it is too large Load Diff

View File

@ -144,7 +144,8 @@ describe('Bitcoin Service', function() {
name.should.equal('bitcoind.node');
return {
onTipUpdate: sinon.stub(),
startTxMon: sinon.stub().callsArgWith(0, [transaction])
startTxMon: sinon.stub().callsArgWith(0, [transaction]),
startTxMonLeave: sinon.stub().callsArgWith(0, [transaction])
};
}
});
@ -175,7 +176,8 @@ describe('Bitcoin Service', function() {
callback(height++);
});
},
startTxMon: sinon.stub()
startTxMon: sinon.stub(),
startTxMonLeave: sinon.stub()
};
}
});

View File

@ -839,7 +839,6 @@ describe('DB Service', function() {
var blockBuffer = new Buffer(blockData, 'hex');
var block = Block.fromBuffer(blockBuffer);
db.node.services = {};
db.runAllMempoolIndexes = sinon.stub().callsArg(0);
db.node.services.bitcoind = {
getBlock: sinon.stub().callsArgWith(1, null, blockBuffer),
isSynced: sinon.stub().returns(true),
@ -858,7 +857,6 @@ describe('DB Service', function() {
callback();
};
db.node.once('synced', function() {
db.runAllMempoolIndexes.callCount.should.equal(1);
done();
});
db.sync();