From f0ec42416154e2fd1442db842f8ee2e20cde3715 Mon Sep 17 00:00:00 2001 From: Braydon Fuller Date: Thu, 29 Oct 2015 15:27:35 -0400 Subject: [PATCH 1/9] Added bindings to be able to listen to tx leaving mempool. --- etc/bitcoin.patch | 17 +++++++- integration/regtest.js | 24 +++++++++++ lib/services/bitcoind.js | 9 ++++- src/libbitcoind.cc | 73 ++++++++++++++++++++++++++++++++++ test/services/bitcoind.unit.js | 6 ++- 5 files changed, 124 insertions(+), 5 deletions(-) diff --git a/etc/bitcoin.patch b/etc/bitcoin.patch index e262753e..8de1497d 100644 --- a/etc/bitcoin.patch +++ b/etc/bitcoin.patch @@ -367,14 +367,27 @@ index f94771a..72ee00e 100644 diff --git a/src/net.h b/src/net.h -index 17502b9..e181d68 100644 +index 17502b9..c9ae1b2 100644 --- a/src/net.h +++ b/src/net.h -@@ -99,6 +99,7 @@ struct CNodeSignals +@@ -99,6 +99,8 @@ struct CNodeSignals { boost::signals2::signal GetHeight; boost::signals2::signal ProcessMessages; + boost::signals2::signal TxToMemPool; ++ boost::signals2::signal TxLeaveMemPool; boost::signals2::signal SendMessages; boost::signals2::signal InitializeNode; boost::signals2::signal FinalizeNode; +diff --git a/src/txmempool.cpp b/src/txmempool.cpp +index c3d1b60..03e265d 100644 +--- a/src/txmempool.cpp ++++ b/src/txmempool.cpp +@@ -133,6 +133,7 @@ void CTxMemPool::remove(const CTransaction &origTx, std::list& rem + if (!mapTx.count(hash)) + continue; + const CTransaction& tx = mapTx[hash].GetTx(); ++ GetNodeSignals().TxLeaveMemPool(tx); + if (fRecursive) { + for (unsigned int i = 0; i < tx.vout.size(); i++) { + std::map::iterator it = mapNextTx.find(COutPoint(hash, i)); diff --git a/integration/regtest.js b/integration/regtest.js index 9b80ac10..17f0b8e6 100644 --- a/integration/regtest.js +++ b/integration/regtest.js @@ -360,6 +360,30 @@ describe('Daemon Binding Functionality', function() { }); }); + describe('transactions leaving the mempool', function() { + it('receive event when transaction leaves', function(done) { + + // add transaction to build a new block + var tx = bitcore.Transaction(); + tx.from(utxos[4]); + tx.change(privateKey.toAddress()); + tx.to(destKey.toAddress(), utxos[4].amount * 1e8 - 1000); + tx.sign(bitcore.PrivateKey.fromWIF(utxos[4].privateKeyWIF)); + bitcoind.sendTransaction(tx.serialize()); + + bitcoind.once('txleave', function(txInfo) { + txInfo.hash.should.equal(tx.hash); + done(); + }); + + client.generate(1, function(err, response) { + if (err) { + throw err; + } + }); + }); + }); + describe('mempool functionality', function() { var fromAddress = 'mszYqVnqKoQx4jcTdJXxwKAissE3Jbrrc1'; diff --git a/lib/services/bitcoind.js b/lib/services/bitcoind.js index ece34092..e7a67467 100644 --- a/lib/services/bitcoind.js +++ b/lib/services/bitcoind.js @@ -110,12 +110,19 @@ Bitcoin.prototype._registerEventHandlers = function() { // Set the height and emit a new tip bindings.onTipUpdate(self._onTipUpdate.bind(this)); - // Register callback function to handle incoming transactions + // Register callback function to handle transactions entering the mempool bindings.startTxMon(function(txs) { for(var i = 0; i < txs.length; i++) { self.emit('tx', txs[i]); } }); + + // Register callback function to handle transactions leaving the mempool + bindings.startTxMonLeave(function(txs) { + for(var i = 0; i < txs.length; i++) { + self.emit('txleave', txs[i]); + } + }); }; Bitcoin.prototype._onReady = function(result, callback) { diff --git a/src/libbitcoind.cc b/src/libbitcoind.cc index c3e27274..b7b87b04 100644 --- a/src/libbitcoind.cc +++ b/src/libbitcoind.cc @@ -39,6 +39,9 @@ extern int64_t nTimeBestReceived; static void tx_notifier(uv_async_t *handle); +static void +txleave_notifier(uv_async_t *handle); + static void async_tip_update(uv_work_t *req); @@ -90,6 +93,9 @@ async_get_tx_and_info_after(uv_work_t *req); static bool queueTx(const CTransaction&); +static bool +queueTxLeave(const CTransaction&); + extern "C" void init(Handle); @@ -98,9 +104,13 @@ init(Handle); * Used only by bitcoind functions. */ static std::vector txQueue; +static std::vector txQueueLeave; static uv_async_t txmon_async; +static uv_async_t txmonleave_async; static Eternal txmon_callback; +static Eternal txmonleave_callback; static bool txmon_callback_available; +static bool txmonleave_callback_available; static volatile bool shutdown_complete = false; static char *g_data_dir = NULL; @@ -259,6 +269,21 @@ NAN_METHOD(StartTxMon) { info.GetReturnValue().Set(Null()); }; +NAN_METHOD(StartTxMonLeave) { + Isolate* isolate = info.GetIsolate(); + Local callback = Local::Cast(info[0]); + Eternal cb(isolate, callback); + txmonleave_callback = cb; + txmonleave_callback_available = true; + + CNodeSignals& nodeSignals = GetNodeSignals(); + nodeSignals.TxLeaveMemPool.connect(&queueTxLeave); + + uv_async_init(uv_default_loop(), &txmonleave_async, txleave_notifier); + + info.GetReturnValue().Set(Null()); +}; + static void tx_notifier(uv_async_t *handle) { Isolate* isolate = Isolate::GetCurrent(); @@ -307,6 +332,53 @@ queueTx(const CTransaction& tx) { return true; } +static void +txleave_notifier(uv_async_t *handle) { + Isolate* isolate = Isolate::GetCurrent(); + HandleScope scope(isolate); + + Local results = Array::New(isolate); + int arrayIndex = 0; + + LOCK(cs_main); + BOOST_FOREACH(const CTransaction& tx, txQueueLeave) { + + CDataStream ssTx(SER_NETWORK, PROTOCOL_VERSION); + ssTx << tx; + std::string stx = ssTx.str(); + Nan::MaybeLocal txBuffer = Nan::CopyBuffer((char *)stx.c_str(), stx.size()); + + uint256 hash = tx.GetHash(); + + Local obj = New(); + + Nan::Set(obj, New("buffer").ToLocalChecked(), txBuffer.ToLocalChecked()); + Nan::Set(obj, New("hash").ToLocalChecked(), New(hash.GetHex()).ToLocalChecked()); + + results->Set(arrayIndex, obj); + arrayIndex++; + } + + const unsigned argc = 1; + Local argv[argc] = { + Local::New(isolate, results) + }; + + Local cb = txmonleave_callback.Get(isolate); + + cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); + + txQueueLeave.clear(); + +} +static bool +queueTxLeave(const CTransaction& tx) { + LOCK(cs_main); + txQueueLeave.push_back(tx); + uv_async_send(&txmonleave_async); + return true; +} + /** * Functions */ @@ -1527,6 +1599,7 @@ NAN_MODULE_INIT(init) { Nan::Set(target, New("sendTransaction").ToLocalChecked(), GetFunction(New(SendTransaction)).ToLocalChecked()); Nan::Set(target, New("estimateFee").ToLocalChecked(), GetFunction(New(EstimateFee)).ToLocalChecked()); Nan::Set(target, New("startTxMon").ToLocalChecked(), GetFunction(New(StartTxMon)).ToLocalChecked()); + Nan::Set(target, New("startTxMonLeave").ToLocalChecked(), GetFunction(New(StartTxMonLeave)).ToLocalChecked()); Nan::Set(target, New("syncPercentage").ToLocalChecked(), GetFunction(New(SyncPercentage)).ToLocalChecked()); Nan::Set(target, New("isSynced").ToLocalChecked(), GetFunction(New(IsSynced)).ToLocalChecked()); Nan::Set(target, New("getBestBlockHash").ToLocalChecked(), GetFunction(New(GetBestBlockHash)).ToLocalChecked()); diff --git a/test/services/bitcoind.unit.js b/test/services/bitcoind.unit.js index 03853647..47156b50 100644 --- a/test/services/bitcoind.unit.js +++ b/test/services/bitcoind.unit.js @@ -144,7 +144,8 @@ describe('Bitcoin Service', function() { name.should.equal('bitcoind.node'); return { onTipUpdate: sinon.stub(), - startTxMon: sinon.stub().callsArgWith(0, [transaction]) + startTxMon: sinon.stub().callsArgWith(0, [transaction]), + startTxMonLeave: sinon.stub().callsArgWith(0, [transaction]) }; } }); @@ -175,7 +176,8 @@ describe('Bitcoin Service', function() { callback(height++); }); }, - startTxMon: sinon.stub() + startTxMon: sinon.stub(), + startTxMonLeave: sinon.stub() }; } }); From 086ba5fcfc7f7a32b720e361e87756af04ead062 Mon Sep 17 00:00:00 2001 From: Braydon Fuller Date: Thu, 29 Oct 2015 17:13:36 -0400 Subject: [PATCH 2/9] Switch to remove items from mempool index as they leave asynchronously. --- lib/services/address/index.js | 111 ++++++++++++++++++++-------- test/services/address/index.unit.js | 36 +++------ 2 files changed, 92 insertions(+), 55 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 3a9d0e29..3cc62510 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -6,7 +6,6 @@ var async = require('async'); var index = require('../../'); var log = index.log; var errors = index.errors; -var Transaction = require('../../transaction'); var bitcore = require('bitcore-lib'); var levelup = require('levelup'); var $ = bitcore.util.preconditions; @@ -35,6 +34,7 @@ var AddressService = function(options) { this.subscriptions['address/balance'] = {}; this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this)); + this.node.services.bitcoind.on('txleave', this.transactionLeaveHandler.bind(this)); this.mempoolOutputIndex = {}; this.mempoolInputIndex = {}; @@ -131,6 +131,17 @@ AddressService.prototype.transactionOutputHandler = function(messages, tx, outpu } }; +/** + * This will handle data from the daemon "txleave" that a transaction has left the mempool. + * @param {Object} txInfo - The data from the daemon.on('txleave') event + * @param {Buffer} txInfo.buffer - The transaction buffer + * @param {String} txInfo.hash - The hash of the transaction + */ +AddressService.prototype.transactionLeaveHandler = function(txInfo) { + var tx = bitcore.Transaction().fromBuffer(txInfo.buffer); + this.removeMempoolIndex(tx); +}; + /** * This will handle data from the daemon "tx" event, go through each of the outputs * and send messages by calling `transactionEventHandler` to any subscribers for a @@ -164,6 +175,72 @@ AddressService.prototype.transactionHandler = function(txInfo) { } }; +AddressService.prototype.removeMempoolIndex = function(tx) { + + var txid = tx.hash; + + var outputLength = tx.outputs.length; + for (var outputIndex = 0; outputIndex < outputLength; outputIndex++) { + var output = tx.outputs[outputIndex]; + if (!output.script) { + continue; + } + var addressInfo = this._extractAddressInfoFromScript(output.script); + if (!addressInfo) { + continue; + } + + var addressStr = bitcore.Address({ + hashBuffer: addressInfo.hashBuffer, + type: addressInfo.addressType, + network: this.node.network + }).toString(); + + // Remove from the mempool output index + if (this.mempoolOutputIndex[addressStr]) { + var txs = this.mempoolOutputIndex[addressStr]; + for (var t = 0; t < txs.length; t++) { + if (txs[t].txid === txid && txs[t].outputIndex === outputIndex) { + txs.splice(t, 1); + } + } + if (txs.length === 0) { + delete this.mempoolOutputIndex[addressStr]; + } + } + } + var inputLength = tx.inputs.length; + for (var inputIndex = 0; inputIndex < inputLength; inputIndex++) { + + var input = tx.inputs[inputIndex]; + + // Remove from the mempool spent index + var spentIndexKey = [input.prevTxId.toString('hex'), input.outputIndex].join('-'); + if (this.mempoolSpentIndex[spentIndexKey]) { + delete this.mempoolSpentIndex[spentIndexKey]; + } + + var address = input.script.toAddress(this.node.network); + if (!address) { + continue; + } + var inputAddressStr = address.toString(); + + // Remove from the mempool input index + if (this.mempoolInputIndex[inputAddressStr]) { + var inputTxs = this.mempoolInputIndex[inputAddressStr]; + for (var x = 0; x < inputTxs.length; x++) { + if (inputTxs[x].txid === txid && inputTxs[x].inputIndex === inputIndex) { + inputTxs.splice(x, 1); + } + } + if (inputTxs.length === 0) { + delete this.mempoolInputIndex[inputAddressStr]; + } + } + } +}; + /** * This function will update the mempool address index with the necessary * information for further lookups. There are three indexes: @@ -239,11 +316,11 @@ AddressService.prototype.updateMempoolIndex = function(tx) { if (!address) { continue; } - var addressStr = address.toString(); - if (!this.mempoolInputIndex[addressStr]) { - this.mempoolInputIndex[addressStr] = []; + var inputAddressStr = address.toString(); + if (!this.mempoolInputIndex[inputAddressStr]) { + this.mempoolInputIndex[inputAddressStr] = []; } - this.mempoolInputIndex[addressStr].push({ + this.mempoolInputIndex[inputAddressStr].push({ txid: tx.hash, // TODO use buffer inputIndex: inputIndex }); @@ -251,30 +328,6 @@ AddressService.prototype.updateMempoolIndex = function(tx) { }; -/** - * This function is called by the Database Service when the database itself - * has finished synchronization. It will retrieve a copy of all transactions - * from the mempool and create an address index for fast look-ups. The previous - * index will be reset. - */ -AddressService.prototype.resetMempoolIndex = function(callback) { - var self = this; - var transactionBuffers = self.node.services.bitcoind.getMempoolTransactions(); - this.mempoolInputIndex = {}; - this.mempoolOutputIndex = {}; - this.mempoolSpentIndex = {}; - async.each(transactionBuffers, function(txBuffer, next) { - var tx = Transaction().fromBuffer(txBuffer); - self.updateMempoolIndex(tx); - setImmediate(next); - }, function(err) { - if (err) { - return callback(err); - } - callback(); - }); -}; - /** * This function is optimized to return address information about an output script * without constructing a Bitcore Address instance. diff --git a/test/services/address/index.unit.js b/test/services/address/index.unit.js index 41e65307..a63f6647 100644 --- a/test/services/address/index.unit.js +++ b/test/services/address/index.unit.js @@ -1036,9 +1036,8 @@ describe('Address Service', function() { }); }); }); - describe('#updateMempoolIndex', function() { + describe('#updateMempoolIndex/#removeMempoolIndex', function() { var am; - var db = {}; var tx = Transaction().fromBuffer(txBuf); before(function() { @@ -1049,35 +1048,20 @@ describe('Address Service', function() { am.updateMempoolIndex(tx); am.mempoolInputIndex['18Z29uNgWyUDtNyTKE1PaurbSR131EfANc'][0].txid.should.equal('45202ffdeb8344af4dec07cddf0478485dc65cc7d08303e45959630c89b51ea2'); am.mempoolOutputIndex['12w93weN8oti3P1e5VYEuygqyujhADF7J5'][0].txid.should.equal('45202ffdeb8344af4dec07cddf0478485dc65cc7d08303e45959630c89b51ea2'); + Object.keys(am.mempoolSpentIndex).length.should.equal(14); am.mempoolInputIndex['1JT7KDYwT9JY9o2vyqcKNSJgTWeKfV3ui8'].length.should.equal(12); am.mempoolOutputIndex['12w93weN8oti3P1e5VYEuygqyujhADF7J5'].length.should.equal(1); }); - }); - describe('#resetMempoolIndex', function() { - var am; - var db = {}; - - before(function() { - var testnode = { - db: db, - services: { - bitcoind: { - getMempoolTransactions: sinon.stub().returns([txBuf]), - on: sinon.stub() - } - } - }; - am = new AddressService({node: testnode}); - am.updateMempoolIndex = sinon.stub(); - - }); - it('will reset the input and output indexes', function(done) { - am.resetMempoolIndex(function() { - am.updateMempoolIndex.callCount.should.equal(1); - done(); - }); + it('will remove the input and output indexes', function() { + am.removeMempoolIndex(tx); + should.not.exist(am.mempoolInputIndex['18Z29uNgWyUDtNyTKE1PaurbSR131EfANc']); + should.not.exist(am.mempoolOutputIndex['12w93weN8oti3P1e5VYEuygqyujhADF7J5']); + Object.keys(am.mempoolSpentIndex).length.should.equal(0); + should.not.exist(am.mempoolInputIndex['1JT7KDYwT9JY9o2vyqcKNSJgTWeKfV3ui8']); + should.not.exist(am.mempoolOutputIndex['12w93weN8oti3P1e5VYEuygqyujhADF7J5']); }); + }); describe('#getAddressSummary', function() { var node = { From 89ef28f0b7ac140b0af525816be07cb08f0adb0d Mon Sep 17 00:00:00 2001 From: Braydon Fuller Date: Fri, 30 Oct 2015 16:57:01 -0400 Subject: [PATCH 3/9] Optimize mempool address index memory footprint - Adds default to store a large portion of the mempool index in leveldb - Includes an option to use memdown to have the mempool index in-memory --- integration/regtest-node.js | 13 +- lib/services/address/index.js | 470 +++++++++++++++++++--------- test/services/address/index.unit.js | 409 +++++++++++++++++++----- 3 files changed, 667 insertions(+), 225 deletions(-) diff --git a/integration/regtest-node.js b/integration/regtest-node.js index 4e39bb97..8b55db7f 100644 --- a/integration/regtest-node.js +++ b/integration/regtest-node.js @@ -726,12 +726,15 @@ describe('Node Functionality', function() { node.services.bitcoind.sendTransaction(tx.serialize()); setImmediate(function() { - var length = node.services.address.mempoolOutputIndex[address].length; - length.should.equal(1); - should.exist(node.services.address.mempoolOutputIndex[address]); - done(); + var hashBuffer = bitcore.Address(address).hashBuffer; + node.services.address._getOutputsMempool(address, hashBuffer, function(err, outs) { + if (err) { + throw err; + } + outs.length.should.equal(1); + done(); + }); }); - }); }); diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 3cc62510..fa315217 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -1,13 +1,18 @@ 'use strict'; +var fs = require('fs'); var BaseService = require('../../service'); var inherits = require('util').inherits; var async = require('async'); +var mkdirp = require('mkdirp'); var index = require('../../'); var log = index.log; var errors = index.errors; var bitcore = require('bitcore-lib'); +var Networks = bitcore.Networks; var levelup = require('levelup'); +var leveldown = require('leveldown'); +var memdown = require('memdown'); var $ = bitcore.util.preconditions; var _ = bitcore.deps._; var Hash = bitcore.crypto.Hash; @@ -36,10 +41,14 @@ var AddressService = function(options) { this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this)); this.node.services.bitcoind.on('txleave', this.transactionLeaveHandler.bind(this)); - this.mempoolOutputIndex = {}; - this.mempoolInputIndex = {}; - this.mempoolSpentIndex = {}; - + this._setMempoolIndexPath(); + if (options.mempoolMemoryIndex) { + this.levelupStore = memdown; + } else { + this.levelupStore = leveldown; + } + this.mempoolIndex = null; // Used for larger mempool indexes + this.mempoolSpentIndex = {}; // Used for small quick synchronous lookups }; inherits(AddressService, BaseService); @@ -55,9 +64,73 @@ AddressService.PREFIXES = { SPENTSMAP: new Buffer('05', 'hex') // Get the input that spends an output }; +AddressService.MEMPREFIXES = { + OUTPUTS: new Buffer('01', 'hex'), // Query mempool outputs by address + SPENTS: new Buffer('02', 'hex'), // Query mempool inputs by address + SPENTSMAP: new Buffer('03', 'hex') // Query mempool for the input that spends an output +}; + AddressService.SPACER_MIN = new Buffer('00', 'hex'); AddressService.SPACER_MAX = new Buffer('ff', 'hex'); +AddressService.prototype.start = function(callback) { + var self = this; + + async.series([ + function(next) { + // Flush any existing mempool index + if (fs.existsSync(self.mempoolIndexPath)) { + leveldown.destroy(self.mempoolIndexPath, next); + } else { + setImmediate(next); + } + }, + function(next) { + if (!fs.existsSync(self.mempoolIndexPath)) { + mkdirp(self.mempoolIndexPath, next); + } else { + setImmediate(next); + } + }, + function(next) { + self.mempoolIndex = levelup( + self.mempoolIndexPath, + { + db: self.levelupStore, + keyEncoding: 'binary', + valueEncoding: 'binary', + fillCache: false + }, + next + ); + } + ], callback); + +}; + +AddressService.prototype.stop = function(callback) { + // TODO Keep track of ongoing db requests before shutting down + this.mempoolIndex.close(callback); +}; + +/** + * This function will set `this.dataPath` based on `this.node.network`. + * @private + */ +AddressService.prototype._setMempoolIndexPath = function() { + $.checkState(this.node.datadir, 'Node is expected to have a "datadir" property'); + var regtest = Networks.get('regtest'); + if (this.node.network === Networks.livenet) { + this.mempoolIndexPath = this.node.datadir + '/bitcore-addressmempool.db'; + } else if (this.node.network === Networks.testnet) { + this.mempoolIndexPath = this.node.datadir + '/testnet3/bitcore-addressmempool.db'; + } else if (this.node.network === regtest) { + this.mempoolIndexPath = this.node.datadir + '/regtest/bitcore-addressmempool.db'; + } else { + throw new Error('Unknown network: ' + this.network); + } +}; + /** * Called by the Node to get the available API methods for this service, * that can be exposed over the JSON-RPC interface. @@ -139,7 +212,7 @@ AddressService.prototype.transactionOutputHandler = function(messages, tx, outpu */ AddressService.prototype.transactionLeaveHandler = function(txInfo) { var tx = bitcore.Transaction().fromBuffer(txInfo.buffer); - this.removeMempoolIndex(tx); + this.updateMempoolIndex(tx, false); }; /** @@ -166,7 +239,7 @@ AddressService.prototype.transactionHandler = function(txInfo) { // Update mempool index if (txInfo.mempool) { - this.updateMempoolIndex(tx); + this.updateMempoolIndex(tx, true); } for (var key in messages) { @@ -175,94 +248,21 @@ AddressService.prototype.transactionHandler = function(txInfo) { } }; -AddressService.prototype.removeMempoolIndex = function(tx) { - - var txid = tx.hash; - - var outputLength = tx.outputs.length; - for (var outputIndex = 0; outputIndex < outputLength; outputIndex++) { - var output = tx.outputs[outputIndex]; - if (!output.script) { - continue; - } - var addressInfo = this._extractAddressInfoFromScript(output.script); - if (!addressInfo) { - continue; - } - - var addressStr = bitcore.Address({ - hashBuffer: addressInfo.hashBuffer, - type: addressInfo.addressType, - network: this.node.network - }).toString(); - - // Remove from the mempool output index - if (this.mempoolOutputIndex[addressStr]) { - var txs = this.mempoolOutputIndex[addressStr]; - for (var t = 0; t < txs.length; t++) { - if (txs[t].txid === txid && txs[t].outputIndex === outputIndex) { - txs.splice(t, 1); - } - } - if (txs.length === 0) { - delete this.mempoolOutputIndex[addressStr]; - } - } - } - var inputLength = tx.inputs.length; - for (var inputIndex = 0; inputIndex < inputLength; inputIndex++) { - - var input = tx.inputs[inputIndex]; - - // Remove from the mempool spent index - var spentIndexKey = [input.prevTxId.toString('hex'), input.outputIndex].join('-'); - if (this.mempoolSpentIndex[spentIndexKey]) { - delete this.mempoolSpentIndex[spentIndexKey]; - } - - var address = input.script.toAddress(this.node.network); - if (!address) { - continue; - } - var inputAddressStr = address.toString(); - - // Remove from the mempool input index - if (this.mempoolInputIndex[inputAddressStr]) { - var inputTxs = this.mempoolInputIndex[inputAddressStr]; - for (var x = 0; x < inputTxs.length; x++) { - if (inputTxs[x].txid === txid && inputTxs[x].inputIndex === inputIndex) { - inputTxs.splice(x, 1); - } - } - if (inputTxs.length === 0) { - delete this.mempoolInputIndex[inputAddressStr]; - } - } - } -}; - /** * This function will update the mempool address index with the necessary - * information for further lookups. There are three indexes: - * - * mempoolOutputIndex, an object keyed by base58check encoded addresses with values: - * txid - A hex string of the transaction hash - * outputIndex - A number of the corresponding output - * satoshis - Total number of satoshis - * script - The script as a hex string - * - * mempoolInputIndex, an object keyed by base58check encoded addresses with values: - * txid - A hex string of the transaction hash - * inputIndex - A number of the corresponding input - * - * mempoolSpentIndex, an object keyed by - with (buffer) values: - * inputTxId - A 32 byte buffer of the input txid - * inputIndex - 4 bytes stored as UInt32BE - * + * information for further lookups. * @param {Transaction} - An instance of a Bitcore Transaction + * @param {Boolean} - Add/remove from the index */ -AddressService.prototype.updateMempoolIndex = function(tx) { - /* jshint maxstatements: 30 */ +AddressService.prototype.updateMempoolIndex = function(tx, add) { + /* jshint maxstatements: 100 */ + + var operations = []; + + var action = 'put'; + if (!add) { + action = 'del'; + } var txid = tx.hash; var txidBuffer = new Buffer(txid, 'hex'); @@ -278,21 +278,23 @@ AddressService.prototype.updateMempoolIndex = function(tx) { continue; } - var addressStr = bitcore.Address({ - hashBuffer: addressInfo.hashBuffer, - type: addressInfo.addressType, - network: this.node.network - }).toString(); + // Update output index + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); - if (!this.mempoolOutputIndex[addressStr]) { - this.mempoolOutputIndex[addressStr] = []; - } + var outKey = Buffer.concat([ + AddressService.MEMPREFIXES.OUTPUTS, + addressInfo.hashBuffer, + txidBuffer, + outputIndexBuffer + ]); - this.mempoolOutputIndex[addressStr].push({ - txid: txid, - outputIndex: outputIndex, - satoshis: output.satoshis, - script: output._scriptBuffer.toString('hex') //TODO use a buffer + var outValue = this._encodeOutputValue(output.satoshis, output._scriptBuffer); + + operations.push({ + type: action, + key: outKey, + value: outValue }); } @@ -301,31 +303,71 @@ AddressService.prototype.updateMempoolIndex = function(tx) { var input = tx.inputs[inputIndex]; - // Update spent index - var spentIndexKey = [input.prevTxId.toString('hex'), input.outputIndex].join('-'); + var inputOutputIndexBuffer = new Buffer(4); + inputOutputIndexBuffer.writeUInt32BE(input.outputIndex); + // Add an additional small spent index for fast synchronous lookups + var spentIndexSyncKey = this._encodeSpentIndexSyncKey( + input.prevTxId, + input.outputIndex + ); + if (add) { + this.mempoolSpentIndex[spentIndexSyncKey] = true; + } else { + delete this.mempoolSpentIndex[spentIndexSyncKey]; + } + + // Add a more detailed spent index with values + var spentIndexKey = Buffer.concat([ + AddressService.MEMPREFIXES.SPENTSMAP, + input.prevTxId, + inputOutputIndexBuffer + ]); var inputIndexBuffer = new Buffer(4); inputIndexBuffer.writeUInt32BE(inputIndex); var inputIndexValue = Buffer.concat([ txidBuffer, inputIndexBuffer ]); - this.mempoolSpentIndex[spentIndexKey] = inputIndexValue; + operations.push({ + type: action, + key: spentIndexKey, + value: inputIndexValue + }); - var address = input.script.toAddress(this.node.network); - if (!address) { + // Update input index + var inputHashBuffer; + if (input.script.isPublicKeyHashIn()) { + inputHashBuffer = Hash.sha256ripemd160(input.script.chunks[1].buf); + } else if (input.script.isScriptHashIn()) { + inputHashBuffer = Hash.sha256ripemd160(input.script.chunks[input.script.chunks.length - 1].buf); + } else { continue; } - var inputAddressStr = address.toString(); - if (!this.mempoolInputIndex[inputAddressStr]) { - this.mempoolInputIndex[inputAddressStr] = []; - } - this.mempoolInputIndex[inputAddressStr].push({ - txid: tx.hash, // TODO use buffer - inputIndex: inputIndex + var inputKey = Buffer.concat([ + AddressService.MEMPREFIXES.SPENTS, + inputHashBuffer, + input.prevTxId, + inputOutputIndexBuffer + ]); + var inputValue = Buffer.concat([ + txidBuffer, + inputIndexBuffer + ]); + operations.push({ + type: action, + key: inputKey, + value: inputValue }); + } + this.mempoolIndex.batch(operations, function(err) { + if (err) { + return log.error(err); + } + }); + }; /** @@ -489,6 +531,16 @@ AddressService.prototype.blockHandler = function(block, addOutput, callback) { }); }; +AddressService.prototype._encodeSpentIndexSyncKey = function(txidBuffer, outputIndex) { + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + var key = Buffer.concat([ + txidBuffer, + outputIndexBuffer + ]); + return key.toString('binary'); +}; + AddressService.prototype._encodeOutputKey = function(hashBuffer, height, txidBuffer, outputIndex) { var heightBuffer = new Buffer(4); heightBuffer.writeUInt32BE(height); @@ -801,15 +853,9 @@ AddressService.prototype.getInputForOutput = function(txid, outputIndex, options txidBuffer = new Buffer(txid, 'hex'); } if (options.queryMempool) { - var spentIndexKey = [txid.toString('hex'), outputIndex].join('-'); - if (this.mempoolSpentIndex[spentIndexKey]) { - var mempoolValue = this.mempoolSpentIndex[spentIndexKey]; - var inputTxId = mempoolValue.slice(0, 32); - var inputIndex = mempoolValue.readUInt32BE(32); - return callback(null, { - inputTxId: inputTxId.toString('hex'), - inputIndex: inputIndex - }); + var spentIndexSyncKey = this._encodeSpentIndexSyncKey(txidBuffer, outputIndex); + if (this.mempoolSpentIndex[spentIndexSyncKey]) { + return this._getSpentMempool(txidBuffer, outputIndex, callback); } } var key = this._encodeInputKeyMap(txidBuffer, outputIndex); @@ -920,26 +966,97 @@ AddressService.prototype.getInputs = function(addressStr, options, callback) { } if(options.queryMempool) { - var mempoolInputs = self.mempoolInputIndex[addressStr]; - if (mempoolInputs) { - for(var i = 0; i < mempoolInputs.length; i++) { - var newInput = _.clone(mempoolInputs[i]); - newInput.address = addressStr; - newInput.height = -1; - newInput.confirmations = 0; - inputs.push(newInput); + self._getInputsMempool(addressStr, hashBuffer, function(err, mempoolInputs) { + if (err) { + return callback(err); } - } + inputs = inputs.concat(mempoolInputs); + callback(null, inputs); + }); + } else { + callback(null, inputs); } - callback(null, inputs); - }); return stream; }; +AddressService.prototype._getInputsMempool = function(addressStr, hashBuffer, callback) { + var self = this; + var mempoolInputs = []; + + var stream = self.mempoolIndex.createReadStream({ + gte: Buffer.concat([ + AddressService.MEMPREFIXES.SPENTS, + hashBuffer, + AddressService.SPACER_MIN + ]), + lte: Buffer.concat([ + AddressService.MEMPREFIXES.SPENTS, + hashBuffer, + AddressService.SPACER_MAX + ]), + valueEncoding: 'binary', + keyEncoding: 'binary' + }); + + stream.on('data', function(data) { + var txid = data.value.slice(0, 32); + var inputIndex = data.value.readUInt32BE(32); + var output = { + address: addressStr, + txid: txid.toString('hex'), //TODO use a buffer + inputIndex: inputIndex, + height: -1, + confirmations: 0 + }; + mempoolInputs.push(output); + }); + + var error; + + stream.on('error', function(streamError) { + if (streamError) { + error = streamError; + } + }); + + stream.on('close', function() { + if (error) { + return callback(error); + } + callback(null, mempoolInputs); + }); + +}; + +AddressService.prototype._getSpentMempool = function(txidBuffer, outputIndex, callback) { + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + var spentIndexKey = Buffer.concat([ + AddressService.MEMPREFIXES.SPENTSMAP, + txidBuffer, + outputIndexBuffer + ]); + + this.mempoolIndex.get( + spentIndexKey, + function(err, mempoolValue) { + if (err) { + return callback(err); + } + var inputTxId = mempoolValue.slice(0, 32); + var inputIndex = mempoolValue.readUInt32BE(32); + callback(null, { + inputTxId: inputTxId.toString('hex'), + inputIndex: inputIndex + }); + } + ); +}; + /** * Will give outputs for an address as an object with: * address - The base58check encoded address @@ -1033,24 +1150,75 @@ AddressService.prototype.getOutputs = function(addressStr, options, callback) { } if(options.queryMempool) { - var mempoolOutputs = self.mempoolOutputIndex[addressStr]; - if (mempoolOutputs) { - for(var i = 0; i < mempoolOutputs.length; i++) { - var newOutput = _.clone(mempoolOutputs[i]); - newOutput.address = addressStr; - newOutput.height = -1; - newOutput.confirmations = 0; - outputs.push(newOutput); + self._getOutputsMempool(addressStr, hashBuffer, function(err, mempoolOutputs) { + if (err) { + return callback(err); } - } + outputs = outputs.concat(mempoolOutputs); + callback(null, outputs); + }); + } else { + callback(null, outputs); } - callback(null, outputs); }); return stream; }; +AddressService.prototype._getOutputsMempool = function(addressStr, hashBuffer, callback) { + var self = this; + var mempoolOutputs = []; + + var stream = self.mempoolIndex.createReadStream({ + gte: Buffer.concat([ + AddressService.MEMPREFIXES.OUTPUTS, + hashBuffer, + AddressService.SPACER_MIN + ]), + lte: Buffer.concat([ + AddressService.MEMPREFIXES.OUTPUTS, + hashBuffer, + AddressService.SPACER_MAX + ]), + valueEncoding: 'binary', + keyEncoding: 'binary' + }); + + stream.on('data', function(data) { + // Format of data: prefix: 1, hashBuffer: 20, txid: 32, outputIndex: 4 + var txid = data.key.slice(21, 53); + var outputIndex = data.key.readUInt32BE(53); + var value = self._decodeOutputValue(data.value); + var output = { + address: addressStr, + txid: txid.toString('hex'), //TODO use a buffer + outputIndex: outputIndex, + height: -1, + satoshis: value.satoshis, + script: value.scriptBuffer.toString('hex'), //TODO use a buffer + confirmations: 0 + }; + mempoolOutputs.push(output); + }); + + var error; + + stream.on('error', function(streamError) { + if (streamError) { + error = streamError; + } + }); + + stream.on('close', function() { + if (error) { + return callback(error); + } + callback(null, mempoolOutputs); + }); + +}; + /** * Will give unspent outputs for an address or an array of addresses. * @param {Array|String} addresses - An array of addresses @@ -1141,8 +1309,8 @@ AddressService.prototype.isSpent = function(output, options, callback) { var txid = output.prevTxId ? output.prevTxId.toString('hex') : output.txid; var spent = self.node.services.bitcoind.isSpent(txid, output.outputIndex); if (!spent && queryMempool) { - var spentIndexKey = [txid, output.outputIndex].join('-'); - spent = self.mempoolSpentIndex[spentIndexKey] ? true : false; + var spentIndexSyncKey = this._encodeSpentIndexSyncKey(output.prevTxId, output.outputIndex); + spent = self.mempoolSpentIndex[spentIndexSyncKey] ? true : false; } setImmediate(function() { // TODO error should be the first argument? @@ -1150,6 +1318,7 @@ AddressService.prototype.isSpent = function(output, options, callback) { }); }; + /** * This will give the history for many addresses limited by a range of block heights (to limit * the database lookup times) and/or paginated to limit the results length. @@ -1248,8 +1417,11 @@ AddressService.prototype.getAddressSummary = function(address, options, callback for(var i = 0; i < outputs.length; i++) { // Bitcoind's isSpent only works for confirmed transactions var spentDB = self.node.services.bitcoind.isSpent(outputs[i].txid, outputs[i].outputIndex); - var spentIndexKey = [outputs[i].txid, outputs[i].outputIndex].join('-'); - var spentMempool = self.mempoolSpentIndex[spentIndexKey]; + var spentIndexSyncKey = self._encodeSpentIndexSyncKey( + new Buffer(outputs[i].txid, 'hex'), // TODO: get buffer directly + outputs[i].outputIndex + ); + var spentMempool = self.mempoolSpentIndex[spentIndexSyncKey]; txids.push(outputs[i]); diff --git a/test/services/address/index.unit.js b/test/services/address/index.unit.js index a63f6647..89fd0245 100644 --- a/test/services/address/index.unit.js +++ b/test/services/address/index.unit.js @@ -19,6 +19,8 @@ var mockdb = { }; var mocknode = { + network: Networks.testnet, + datadir: 'testdir', db: mockdb, services: { bitcoind: { @@ -31,7 +33,10 @@ describe('Address Service', function() { var txBuf = new Buffer(txData[0], 'hex'); describe('#getAPIMethods', function() { it('should return the correct methods', function() { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); var methods = am.getAPIMethods(); methods.length.should.equal(7); }); @@ -39,7 +44,10 @@ describe('Address Service', function() { describe('#getPublishEvents', function() { it('will return an array of publish event objects', function() { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); am.subscribe = sinon.spy(); am.unsubscribe = sinon.spy(); var events = am.getPublishEvents(); @@ -75,7 +83,10 @@ describe('Address Service', function() { it('create a message for an address', function() { var txBuf = new Buffer('01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000', 'hex'); var tx = bitcore.Transaction().fromBuffer(txBuf); - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); am.node.network = Networks.livenet; var address = '12c6DSiU4Rq3P4ZxziKxzrL5LmMBrzjrJX'; var hashHex = bitcore.Address(address).hashBuffer.toString('hex'); @@ -94,7 +105,10 @@ describe('Address Service', function() { describe('#transactionHandler', function() { it('will pass outputs to transactionOutputHandler and call transactionEventHandler and balanceEventHandler', function() { var txBuf = new Buffer('01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000', 'hex'); - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); var address = '12c6DSiU4Rq3P4ZxziKxzrL5LmMBrzjrJX'; var message = {}; am.transactionOutputHandler = function(messages) { @@ -113,7 +127,10 @@ describe('Address Service', function() { describe('#_extractAddressInfoFromScript', function() { var am; before(function() { - am = new AddressService({node: mocknode}); + am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); am.node.network = Networks.livenet; }); it('pay-to-publickey', function() { @@ -149,7 +166,10 @@ describe('Address Service', function() { var testBlock = bitcore.Block.fromString(blockData); before(function() { - am = new AddressService({node: mocknode}); + am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); am.node.network = Networks.livenet; }); @@ -204,7 +224,10 @@ describe('Address Service', function() { }); }); it('should continue if output script is null', function(done) { - var am = new AddressService({node: mocknode, network: 'livenet'}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode, + }); var block = { __height: 345003, @@ -236,6 +259,7 @@ describe('Address Service', function() { var testBlock = bitcore.Block.fromString(blockData); var db = {}; var testnode = { + datadir: 'testdir', db: db, services: { bitcoind: { @@ -243,7 +267,10 @@ describe('Address Service', function() { } } }; - var am = new AddressService({node: testnode, network: 'livenet'}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); am.transactionEventHandler = sinon.spy(); am.balanceEventHandler = sinon.spy(); @@ -269,15 +296,46 @@ describe('Address Service', function() { }); }); + describe('#_encodeSpentIndexSyncKey', function() { + it('will encode to 36 bytes (string)', function() { + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); + var txidBuffer = new Buffer('3b6bc2939d1a70ce04bc4f619ee32608fbff5e565c1f9b02e4eaa97959c59ae7', 'hex'); + var key = am._encodeSpentIndexSyncKey(txidBuffer, 12); + key.length.should.equal(36); + }); + it('will be able to decode encoded value', function() { + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); + var txid = '3b6bc2939d1a70ce04bc4f619ee32608fbff5e565c1f9b02e4eaa97959c59ae7'; + var txidBuffer = new Buffer(txid, 'hex'); + var key = am._encodeSpentIndexSyncKey(txidBuffer, 12); + var keyBuffer = new Buffer(key, 'binary'); + keyBuffer.slice(0, 32).toString('hex').should.equal(txid); + var outputIndex = keyBuffer.readUInt32BE(32); + outputIndex.should.equal(12); + }); + }); + describe('#_encodeInputKeyMap/#_decodeInputKeyMap roundtrip', function() { var encoded; var outputTxIdBuffer = new Buffer('3b6bc2939d1a70ce04bc4f619ee32608fbff5e565c1f9b02e4eaa97959c59ae7', 'hex'); it('encode key', function() { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); encoded = am._encodeInputKeyMap(outputTxIdBuffer, 13); }); it('decode key', function() { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); var key = am._decodeInputKeyMap(encoded); key.outputTxId.toString('hex').should.equal(outputTxIdBuffer.toString('hex')); key.outputIndex.should.equal(13); @@ -288,11 +346,17 @@ describe('Address Service', function() { var encoded; var inputTxIdBuffer = new Buffer('3b6bc2939d1a70ce04bc4f619ee32608fbff5e565c1f9b02e4eaa97959c59ae7', 'hex'); it('encode key', function() { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); encoded = am._encodeInputValueMap(inputTxIdBuffer, 7); }); it('decode key', function() { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); var key = am._decodeInputValueMap(encoded); key.inputTxId.toString('hex').should.equal(inputTxIdBuffer.toString('hex')); key.inputIndex.should.equal(7); @@ -301,7 +365,10 @@ describe('Address Service', function() { describe('#transactionEventHandler', function() { it('will emit a transaction if there is a subscriber', function(done) { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); var emitter = new EventEmitter(); var address = bitcore.Address('1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'); am.subscriptions['address/transaction'] = {}; @@ -335,7 +402,10 @@ describe('Address Service', function() { describe('#balanceEventHandler', function() { it('will emit a balance if there is a subscriber', function(done) { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); var emitter = new EventEmitter(); var address = bitcore.Address('1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'); am.subscriptions['address/balance'][address.hashBuffer.toString('hex')] = [emitter]; @@ -358,7 +428,10 @@ describe('Address Service', function() { describe('#subscribe', function() { it('will add emitters to the subscribers array (transaction)', function() { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); var emitter = new EventEmitter(); var address = bitcore.Address('1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'); @@ -378,7 +451,10 @@ describe('Address Service', function() { .should.deep.equal([emitter, emitter2]); }); it('will add an emitter to the subscribers array (balance)', function() { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); var emitter = new EventEmitter(); var name = 'address/balance'; var address = bitcore.Address('1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'); @@ -400,7 +476,10 @@ describe('Address Service', function() { describe('#unsubscribe', function() { it('will remove emitter from subscribers array (transaction)', function() { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); var emitter = new EventEmitter(); var emitter2 = new EventEmitter(); var address = bitcore.Address('1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'); @@ -411,7 +490,10 @@ describe('Address Service', function() { .should.deep.equal([emitter2]); }); it('will remove emitter from subscribers array (balance)', function() { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); var emitter = new EventEmitter(); var emitter2 = new EventEmitter(); var address = bitcore.Address('1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'); @@ -422,7 +504,10 @@ describe('Address Service', function() { .should.deep.equal([emitter2]); }); it('should unsubscribe from all addresses if no addresses are specified', function() { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); var emitter = new EventEmitter(); var emitter2 = new EventEmitter(); var address1 = bitcore.Address('1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W'); @@ -439,7 +524,10 @@ describe('Address Service', function() { describe('#getBalance', function() { it('should sum up the unspent outputs', function(done) { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); var outputs = [ {satoshis: 1000}, {satoshis: 2000}, {satoshis: 3000} ]; @@ -452,7 +540,10 @@ describe('Address Service', function() { }); it('will handle error from unspent outputs', function(done) { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); am.getUnspentOutputs = sinon.stub().callsArgWith(2, new Error('error')); am.getBalance('someaddress', false, function(err) { should.exist(err); @@ -473,6 +564,8 @@ describe('Address Service', function() { } }; var testnode = { + network: Networks.testnet, + datadir: 'testdir', services: { db: db, bitcoind: { @@ -481,7 +574,10 @@ describe('Address Service', function() { } }; before(function() { - am = new AddressService({node: testnode}); + am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); }); it('will add mempool inputs on close', function(done) { @@ -492,6 +588,8 @@ describe('Address Service', function() { } }; var testnode = { + network: Networks.testnet, + datadir: 'testdir', services: { db: db, bitcoind: { @@ -499,19 +597,20 @@ describe('Address Service', function() { } } }; - var am = new AddressService({node: testnode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); var args = { start: 15, end: 12, queryMempool: true }; - am.mempoolInputIndex[address] = [ - { - address: address, - height: -1, - confirmations: 0 - } - ]; + am._getInputsMempool = sinon.stub().callsArgWith(2, null, { + address: address, + height: -1, + confirmations: 0 + }); am.getInputs(address, args, function(err, inputs) { should.not.exist(err); inputs.length.should.equal(1); @@ -542,6 +641,7 @@ describe('Address Service', function() { am.node.services.bitcoind = { getMempoolInputs: sinon.stub().returns([]) }; + am._getInputsMempool = sinon.stub().callsArgWith(2, null, []); am.getInputs(address, args, function(err, inputs) { should.not.exist(err); inputs.length.should.equal(1); @@ -615,6 +715,82 @@ describe('Address Service', function() { }); + describe('#_getInputsMempool', function() { + var am; + var address = '1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W'; + var hashBuffer = bitcore.Address(address).hashBuffer; + var db = { + tip: { + __height: 1 + } + }; + var testnode = { + network: Networks.testnet, + datadir: 'testdir', + services: { + db: db, + bitcoind: { + on: sinon.stub() + } + } + }; + before(function() { + am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); + }); + it('it will handle error', function(done) { + var testStream = new EventEmitter(); + am.mempoolIndex = {}; + am.mempoolIndex.createReadStream = sinon.stub().returns(testStream); + + am._getInputsMempool(address, hashBuffer, function(err, outputs) { + should.exist(err); + err.message.should.equal('readstreamerror'); + done(); + }); + + testStream.emit('error', new Error('readstreamerror')); + setImmediate(function() { + testStream.emit('close'); + }); + }); + it('it will parse data', function(done) { + var testStream = new EventEmitter(); + am.mempoolIndex = {}; + am.mempoolIndex.createReadStream = sinon.stub().returns(testStream); + + am._getInputsMempool(address, hashBuffer, function(err, outputs) { + should.not.exist(err); + outputs.length.should.equal(1); + outputs[0].address.should.equal(address); + outputs[0].txid.should.equal(txid); + outputs[0].inputIndex.should.equal(5); + outputs[0].height.should.equal(-1); + outputs[0].confirmations.should.equal(0); + done(); + }); + + var txid = '5d32f0fff6871c377e00c16f48ebb5e89c723d0b9dd25f68fdda70c3392bee61'; + var inputIndex = 5; + var inputIndexBuffer = new Buffer(4); + inputIndexBuffer.writeUInt32BE(inputIndex); + var valueData = Buffer.concat([ + new Buffer(txid, 'hex'), + inputIndexBuffer + ]); + + // Note: key is not used currently + testStream.emit('data', { + value: valueData + }); + setImmediate(function() { + testStream.emit('close'); + }); + }); + }); + describe('#getOutputs', function() { var am; var address = '1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W'; @@ -625,6 +801,8 @@ describe('Address Service', function() { } }; var testnode = { + network: Networks.testnet, + datadir: 'testdir', services: { db: db, bitcoind: { @@ -637,7 +815,10 @@ describe('Address Service', function() { }; before(function() { - am = new AddressService({node: testnode}); + am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); }); it('will get outputs for an address and timestamp', function(done) { @@ -658,6 +839,7 @@ describe('Address Service', function() { return testStream; } }; + am._getOutputsMempool = sinon.stub().callsArgWith(2, null, []); am.getOutputs(address, args, function(err, outputs) { should.not.exist(err); outputs.length.should.equal(1); @@ -684,16 +866,17 @@ describe('Address Service', function() { createReadStream: sinon.stub().returns(readStream1) }; - am.mempoolOutputIndex = {}; - - am.mempoolOutputIndex[address] = [ + am._getOutputsMempool = sinon.stub().callsArgWith(2, null, [ { + address: address, + height: -1, + confirmations: 0, txid: 'aa2db23f670596e96ed94c405fd11848c8f236d266ee96da37ecd919e53b4371', satoshis: 307627737, script: '76a914f6db95c81dea3d10f0ff8d890927751bf7b203c188ac', outputIndex: 0 } - ]; + ]); am.getOutputs(address, options, function(err, outputs) { should.not.exist(err); @@ -762,6 +945,8 @@ describe('Address Service', function() { var db = {}; var testnode = { + network: Networks.testnet, + datadir: 'testdir', services: { db: db, bitcoind: { @@ -769,7 +954,10 @@ describe('Address Service', function() { } } }; - var am = new AddressService({node: testnode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); am.getUnspentOutputsForAddress = function(address, queryMempool, callback) { var result = addresses[address]; if(result instanceof Error) { @@ -794,6 +982,8 @@ describe('Address Service', function() { var db = {}; var testnode = { + network: Networks.testnet, + datadir: 'testdir', db: db, services: { bitcoind: { @@ -801,7 +991,10 @@ describe('Address Service', function() { } } }; - var am = new AddressService({node: testnode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); am.getUnspentOutputsForAddress = function(address, queryMempool, callback) { var result = addresses[address]; if(result instanceof Error) { @@ -827,6 +1020,8 @@ describe('Address Service', function() { var db = {}; var testnode = { + network: Networks.testnet, + datadir: 'testdir', db: db, services: { bitcoind: { @@ -834,7 +1029,10 @@ describe('Address Service', function() { } } }; - var am = new AddressService({node: testnode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); am.getUnspentOutputsForAddress = function(address, queryMempool, callback) { var result = addresses[address]; if(result instanceof Error) { @@ -870,7 +1068,10 @@ describe('Address Service', function() { ]; var i = 0; - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); am.getOutputs = sinon.stub().callsArgWith(2, null, outputs); am.isUnspent = function(output, options, callback) { callback(!outputs[i].spent); @@ -886,7 +1087,10 @@ describe('Address Service', function() { }); }); it('should handle an error from getOutputs', function(done) { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); am.getOutputs = sinon.stub().callsArgWith(2, new Error('error')); am.getUnspentOutputsForAddress('1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W', false, function(err, outputs) { should.exist(err); @@ -895,7 +1099,10 @@ describe('Address Service', function() { }); }); it('should handle when there are no outputs', function(done) { - var am = new AddressService({node: mocknode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); am.getOutputs = sinon.stub().callsArgWith(2, null, []); am.getUnspentOutputsForAddress('1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W', false, function(err, outputs) { should.exist(err); @@ -910,7 +1117,10 @@ describe('Address Service', function() { var am; before(function() { - am = new AddressService({node: mocknode}); + am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); }); it('should give true when isSpent() gives false', function(done) { @@ -941,6 +1151,8 @@ describe('Address Service', function() { describe('#isSpent', function() { var db = {}; var testnode = { + network: Networks.testnet, + datadir: 'testdir', db: db, services: { bitcoind: { @@ -949,7 +1161,10 @@ describe('Address Service', function() { } }; it('should give true if bitcoind.isSpent gives true', function(done) { - var am = new AddressService({node: testnode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); am.node.services.bitcoind = { isSpent: sinon.stub().returns(true), on: sinon.stub() @@ -960,7 +1175,10 @@ describe('Address Service', function() { }); }); it('should give true if bitcoind.isSpent is false and mempoolSpentIndex is true', function(done) { - var am = new AddressService({node: testnode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); am.node.services.bitcoind = { isSpent: sinon.stub().returns(false), on: sinon.stub() @@ -971,15 +1189,23 @@ describe('Address Service', function() { prevTxId: new Buffer(txid, 'hex'), outputIndex: outputIndex }; - var spentKey = [txid, outputIndex].join('-'); - am.mempoolSpentIndex[spentKey] = new Buffer(5); + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + var spentKey = Buffer.concat([ + new Buffer(txid, 'hex'), + outputIndexBuffer + ]).toString('binary'); + am.mempoolSpentIndex[spentKey] = true; am.isSpent(output, {queryMempool: true}, function(spent) { spent.should.equal(true); done(); }); }); it('should give false if spent in mempool with queryMempool set to false', function(done) { - var am = new AddressService({node: testnode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); am.node.services.bitcoind = { isSpent: sinon.stub().returns(false), on: sinon.stub() @@ -998,19 +1224,27 @@ describe('Address Service', function() { }); }); it('default to querying the mempool', function(done) { - var am = new AddressService({node: testnode}); + var am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); am.node.services.bitcoind = { isSpent: sinon.stub().returns(false), on: sinon.stub() }; - var txid = '3b6bc2939d1a70ce04bc4f619ee32608fbff5e565c1f9b02e4eaa97959c59ae7'; + var txidBuffer = new Buffer('3b6bc2939d1a70ce04bc4f619ee32608fbff5e565c1f9b02e4eaa97959c59ae7', 'hex'); var outputIndex = 0; var output = { - prevTxId: new Buffer(txid, 'hex'), + prevTxId: txidBuffer, outputIndex: outputIndex }; - var spentKey = [txid, outputIndex].join('-'); - am.mempoolSpentIndex[spentKey] = new Buffer(5); + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + var spentKey = Buffer.concat([ + txidBuffer, + outputIndexBuffer + ]).toString('binary'); + am.mempoolSpentIndex[spentKey] = true; am.isSpent(output, {}, function(spent) { spent.should.equal(true); done(); @@ -1029,7 +1263,10 @@ describe('Address Service', function() { var TestAddressService = proxyquire('../../../lib/services/address', { './history': TestAddressHistory }); - var am = new TestAddressService({node: mocknode}); + var am = new TestAddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); am.getAddressHistory([], {}, function(err, history) { TestAddressHistory.prototype.get.callCount.should.equal(1); done(); @@ -1041,30 +1278,55 @@ describe('Address Service', function() { var tx = Transaction().fromBuffer(txBuf); before(function() { - am = new AddressService({node: mocknode}); + am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); }); it('will update the input and output indexes', function() { - am.updateMempoolIndex(tx); - am.mempoolInputIndex['18Z29uNgWyUDtNyTKE1PaurbSR131EfANc'][0].txid.should.equal('45202ffdeb8344af4dec07cddf0478485dc65cc7d08303e45959630c89b51ea2'); - am.mempoolOutputIndex['12w93weN8oti3P1e5VYEuygqyujhADF7J5'][0].txid.should.equal('45202ffdeb8344af4dec07cddf0478485dc65cc7d08303e45959630c89b51ea2'); - Object.keys(am.mempoolSpentIndex).length.should.equal(14); - am.mempoolInputIndex['1JT7KDYwT9JY9o2vyqcKNSJgTWeKfV3ui8'].length.should.equal(12); - am.mempoolOutputIndex['12w93weN8oti3P1e5VYEuygqyujhADF7J5'].length.should.equal(1); + am.mempoolIndex = {}; + am.mempoolIndex.batch = function(operations, callback) { + callback.should.be.a('function'); + Object.keys(am.mempoolSpentIndex).length.should.equal(14); + for (var i = 0; i < operations.length; i++) { + operations[i].type.should.equal('put'); + } + var expectedValue = '45202ffdeb8344af4dec07cddf0478485dc65cc7d08303e45959630c89b51ea200000002'; + operations[7].value.toString('hex').should.equal(expectedValue); + var matches = 0; + for (var j = 0; j < operations.length; j++) { + var match = Buffer.concat([ + AddressService.MEMPREFIXES.SPENTS, + bitcore.Address('1JT7KDYwT9JY9o2vyqcKNSJgTWeKfV3ui8').hashBuffer + ]).toString('hex'); + + if (operations[j].key.slice(0, 21).toString('hex') === match) { + matches++; + } + } + matches.should.equal(12); + }; + am.updateMempoolIndex(tx, true); }); it('will remove the input and output indexes', function() { - am.removeMempoolIndex(tx); - should.not.exist(am.mempoolInputIndex['18Z29uNgWyUDtNyTKE1PaurbSR131EfANc']); - should.not.exist(am.mempoolOutputIndex['12w93weN8oti3P1e5VYEuygqyujhADF7J5']); - Object.keys(am.mempoolSpentIndex).length.should.equal(0); - should.not.exist(am.mempoolInputIndex['1JT7KDYwT9JY9o2vyqcKNSJgTWeKfV3ui8']); - should.not.exist(am.mempoolOutputIndex['12w93weN8oti3P1e5VYEuygqyujhADF7J5']); + am.mempoolIndex = {}; + am.mempoolIndex.batch = function(operations, callback) { + callback.should.be.a('function'); + Object.keys(am.mempoolSpentIndex).length.should.equal(0); + for (var i = 0; i < operations.length; i++) { + operations[i].type.should.equal('del'); + } + }; + am.updateMempoolIndex(tx, false); }); }); describe('#getAddressSummary', function() { var node = { + datadir: 'testdir', + network: Networks.testnet, services: { bitcoind: { isSpent: sinon.stub().returns(false), @@ -1101,13 +1363,18 @@ describe('Address Service', function() { } ]; - var as = new AddressService({node: node}); + var as = new AddressService({ + mempoolMemoryIndex: true, + node: node + }); as.getInputs = sinon.stub().callsArgWith(2, null, inputs); as.getOutputs = sinon.stub().callsArgWith(2, null, outputs); - as.mempoolSpentIndex = { - '689e9f543fa4aa5b2daa3b5bb65f9a00ad5aa1a2e9e1fc4e11061d85f2aa9bc5-0': true - }; - + var key = Buffer.concat([ + new Buffer('689e9f543fa4aa5b2daa3b5bb65f9a00ad5aa1a2e9e1fc4e11061d85f2aa9bc5', 'hex'), + new Buffer(Array(4)) + ]).toString('binary'); + as.mempoolSpentIndex = {}; + as.mempoolSpentIndex[key] = true; it('should handle unconfirmed and confirmed outputs and inputs', function(done) { as.getAddressSummary('mpkDdnLq26djg17s6cYknjnysAm3QwRzu2', {}, function(err, summary) { should.not.exist(err); From 5ac3b1c61f46cc3d85f877316f73ee10369e1cc5 Mon Sep 17 00:00:00 2001 From: Braydon Fuller Date: Mon, 2 Nov 2015 17:10:43 -0500 Subject: [PATCH 4/9] Address: Added unit tests for new mempool index methods --- test/services/address/index.unit.js | 385 ++++++++++++++++++++++++++++ 1 file changed, 385 insertions(+) diff --git a/test/services/address/index.unit.js b/test/services/address/index.unit.js index 89fd0245..c3bdcdee 100644 --- a/test/services/address/index.unit.js +++ b/test/services/address/index.unit.js @@ -7,6 +7,8 @@ var bitcorenode = require('../../../'); var AddressService = bitcorenode.services.Address; var blockData = require('../../data/livenet-345003.json'); var bitcore = require('bitcore-lib'); +var memdown = require('memdown'); +var leveldown = require('leveldown'); var Script = bitcore.Script; var Address = bitcore.Address; var Networks = bitcore.Networks; @@ -31,6 +33,258 @@ var mocknode = { describe('Address Service', function() { var txBuf = new Buffer(txData[0], 'hex'); + + describe('@constructor', function() { + it('config to use memdown for mempool index', function() { + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); + am.levelupStore.should.equal(memdown); + }); + it('config to use leveldown for mempool index', function() { + var am = new AddressService({ + node: mocknode + }); + am.levelupStore.should.equal(leveldown); + }); + }); + + describe('#start', function() { + it('will flush existing mempool', function(done) { + var leveldownmock = { + destroy: sinon.stub().callsArgWith(1, null) + }; + var TestAddressService = proxyquire('../../../lib/services/address', { + 'fs': { + existsSync: sinon.stub().returns(true) + }, + 'leveldown': leveldownmock, + 'levelup': sinon.stub().callsArgWith(2, null), + 'mkdirp': sinon.stub().callsArgWith(1, null) + }); + var am = new TestAddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); + am.start(function() { + leveldownmock.destroy.callCount.should.equal(1); + leveldownmock.destroy.args[0][0].should.equal('testdir/testnet3/bitcore-addressmempool.db'); + done(); + }); + }); + it('will mkdirp if directory does not exist', function(done) { + var leveldownmock = { + destroy: sinon.stub().callsArgWith(1, null) + }; + var mkdirpmock = sinon.stub().callsArgWith(1, null); + var TestAddressService = proxyquire('../../../lib/services/address', { + 'fs': { + existsSync: sinon.stub().returns(false) + }, + 'leveldown': leveldownmock, + 'levelup': sinon.stub().callsArgWith(2, null), + 'mkdirp': mkdirpmock + }); + var am = new TestAddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); + am.start(function() { + mkdirpmock.callCount.should.equal(1); + mkdirpmock.args[0][0].should.equal('testdir/testnet3/bitcore-addressmempool.db'); + done(); + }); + }); + it('start levelup db for mempool index', function(done) { + var TestAddressService = proxyquire('../../../lib/services/address', { + 'fs': { + existsSync: sinon.stub().returns(true) + }, + 'leveldown': { + destroy: sinon.stub().callsArgWith(1, null) + }, + 'levelup': function(dbPath, options, callback) { + dbPath.should.equal('testdir/testnet3/bitcore-addressmempool.db'); + options.db.should.equal(memdown); + options.keyEncoding.should.equal('binary'); + options.valueEncoding.should.equal('binary'); + options.fillCache.should.equal(false); + setImmediate(callback); + }, + 'mkdirp': sinon.stub().callsArgWith(1, null) + }); + var am = new TestAddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); + am.start(function() { + done(); + }); + }); + it('handle error from mkdirp', function(done) { + var TestAddressService = proxyquire('../../../lib/services/address', { + 'fs': { + existsSync: sinon.stub().returns(false) + }, + 'leveldown': { + destroy: sinon.stub().callsArgWith(1, null) + }, + 'levelup': sinon.stub().callsArgWith(2, null), + 'mkdirp': sinon.stub().callsArgWith(1, new Error('testerror')) + }); + var am = new TestAddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); + am.start(function(err) { + err.message.should.equal('testerror'); + done(); + }); + }); + it('handle error from levelup', function(done) { + var TestAddressService = proxyquire('../../../lib/services/address', { + 'fs': { + existsSync: sinon.stub().returns(false) + }, + 'leveldown': { + destroy: sinon.stub().callsArgWith(1, null) + }, + 'levelup': sinon.stub().callsArgWith(2, new Error('leveltesterror')), + 'mkdirp': sinon.stub().callsArgWith(1, null) + }); + var am = new TestAddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); + am.start(function(err) { + err.message.should.equal('leveltesterror'); + done(); + }); + }); + it('handle error from leveldown.destroy', function(done) { + var TestAddressService = proxyquire('../../../lib/services/address', { + 'fs': { + existsSync: sinon.stub().returns(true) + }, + 'leveldown': { + destroy: sinon.stub().callsArgWith(1, new Error('destroy')) + }, + 'levelup': sinon.stub().callsArgWith(2, null), + 'mkdirp': sinon.stub().callsArgWith(1, null) + }); + var am = new TestAddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); + am.start(function(err) { + err.message.should.equal('destroy'); + done(); + }); + }); + }); + + describe('#stop', function() { + it('will close mempool levelup', function(done) { + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); + am.mempoolIndex = {}; + am.mempoolIndex.close = sinon.stub().callsArg(0); + am.stop(function() { + am.mempoolIndex.close.callCount.should.equal(1); + done(); + }); + }); + }); + + describe('#_setMempoolIndexPath', function() { + it('should set the database path', function() { + var testnode = { + network: Networks.livenet, + datadir: process.env.HOME + '/.bitcoin', + services: { + bitcoind: { + on: sinon.stub() + } + } + }; + var am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); + am._setMempoolIndexPath(); + am.mempoolIndexPath.should.equal(process.env.HOME + '/.bitcoin/bitcore-addressmempool.db'); + }); + it('should load the db for testnet', function() { + var testnode = { + network: Networks.testnet, + datadir: process.env.HOME + '/.bitcoin', + services: { + bitcoind: { + on: sinon.stub() + } + } + }; + var am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); + am._setMempoolIndexPath(); + am.mempoolIndexPath.should.equal(process.env.HOME + '/.bitcoin/testnet3/bitcore-addressmempool.db'); + }); + it('error with unknown network', function() { + var testnode = { + network: 'unknown', + datadir: process.env.HOME + '/.bitcoin', + services: { + bitcoind: { + on: sinon.stub() + } + } + }; + (function() { + var am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); + }).should.throw('Unknown network'); + }); + it('should load the db with regtest', function() { + // Switch to use regtest + // Networks.remove(Networks.testnet); + Networks.add({ + name: 'regtest', + alias: 'regtest', + pubkeyhash: 0x6f, + privatekey: 0xef, + scripthash: 0xc4, + xpubkey: 0x043587cf, + xprivkey: 0x04358394, + networkMagic: 0xfabfb5da, + port: 18444, + dnsSeeds: [ ] + }); + var regtest = Networks.get('regtest'); + var testnode = { + network: regtest, + datadir: process.env.HOME + '/.bitcoin', + services: { + bitcoind: { + on: sinon.stub() + } + } + }; + var am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); + am.mempoolIndexPath.should.equal(process.env.HOME + '/.bitcoin/regtest/bitcore-addressmempool.db'); + Networks.remove(regtest); + }); + }); + describe('#getAPIMethods', function() { it('should return the correct methods', function() { var am = new AddressService({ @@ -791,6 +1045,50 @@ describe('Address Service', function() { }); }); + describe('#_getSpentMempool', function() { + it('will decode data from the database', function() { + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); + am.mempoolIndex = {}; + var mempoolValue = Buffer.concat([ + new Buffer('85630d684f1f414264f88a31bddfc79dd0c00659330dcdc393b321c121f4078b', 'hex'), + new Buffer('00000003', 'hex') + ]); + am.mempoolIndex.get = sinon.stub().callsArgWith(1, null, mempoolValue); + var prevTxIdBuffer = new Buffer('e7888264d286be2da26b0a4dbd2fc5c9ece82b3e40e6791b137e4155b6da8981', 'hex'); + var outputIndex = 1; + var outputIndexBuffer = new Buffer('00000001', 'hex'); + var expectedKey = Buffer.concat([ + new Buffer('03', 'hex'), + prevTxIdBuffer, + outputIndexBuffer + ]).toString('hex'); + am._getSpentMempool(prevTxIdBuffer, outputIndex, function(err, value) { + if (err) { + throw err; + } + am.mempoolIndex.get.args[0][0].toString('hex').should.equal(expectedKey); + value.inputTxId.should.equal('85630d684f1f414264f88a31bddfc79dd0c00659330dcdc393b321c121f4078b'); + value.inputIndex.should.equal(3); + }); + }); + it('handle error from levelup', function() { + var am = new AddressService({ + mempoolMemoryIndex: true, + node: mocknode + }); + am.mempoolIndex = {}; + am.mempoolIndex.get = sinon.stub().callsArgWith(1, new Error('test')); + var prevTxIdBuffer = new Buffer('e7888264d286be2da26b0a4dbd2fc5c9ece82b3e40e6791b137e4155b6da8981', 'hex'); + var outputIndex = 1; + am._getSpentMempool(prevTxIdBuffer, outputIndex, function(err) { + err.message.should.equal('test'); + }); + }); + }); + describe('#getOutputs', function() { var am; var address = '1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W'; @@ -935,6 +1233,93 @@ describe('Address Service', function() { }); }); + describe('#_getOutputsMempool', function() { + var am; + var address = '1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W'; + var hashBuffer = bitcore.Address(address).hashBuffer; + var db = { + tip: { + __height: 1 + } + }; + var testnode = { + network: Networks.testnet, + datadir: 'testdir', + services: { + db: db, + bitcoind: { + on: sinon.stub() + } + } + }; + before(function() { + am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); + }); + it('it will handle error', function(done) { + var testStream = new EventEmitter(); + am.mempoolIndex = {}; + am.mempoolIndex.createReadStream = sinon.stub().returns(testStream); + am._getOutputsMempool(address, hashBuffer, function(err, outputs) { + should.exist(err); + err.message.should.equal('readstreamerror'); + done(); + }); + testStream.emit('error', new Error('readstreamerror')); + setImmediate(function() { + testStream.emit('close'); + }); + }); + it('it will parse data', function(done) { + var testStream = new EventEmitter(); + am.mempoolIndex = {}; + am.mempoolIndex.createReadStream = sinon.stub().returns(testStream); + + am._getOutputsMempool(address, hashBuffer, function(err, outputs) { + if (err) { + throw err; + } + outputs.length.should.equal(1); + outputs[0].address.should.equal(address); + outputs[0].txid.should.equal(txid); + outputs[0].outputIndex.should.equal(outputIndex); + outputs[0].height.should.equal(-1); + outputs[0].satoshis.should.equal(3); + outputs[0].script.should.equal('ac'); + outputs[0].confirmations.should.equal(0); + done(); + }); + + var txid = '5d32f0fff6871c377e00c16f48ebb5e89c723d0b9dd25f68fdda70c3392bee61'; + var txidBuffer = new Buffer(txid, 'hex'); + var outputIndex = 3; + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + var keyData = Buffer.concat([ + new Buffer('01', 'hex'), + hashBuffer, + txidBuffer, + outputIndexBuffer + ]); + + var valueData = Buffer.concat([ + new Buffer('4008000000000000', 'hex'), + new Buffer('ac', 'hex') + ]); + + // Note: key is not used currently + testStream.emit('data', { + key: keyData, + value: valueData + }); + setImmediate(function() { + testStream.emit('close'); + }); + }); + }); + describe('#getUnspentOutputs', function() { it('should concatenate utxos for multiple addresses, even those with none found', function(done) { var addresses = { From a1bae366b36d221d422e410d95782210593b460e Mon Sep 17 00:00:00 2001 From: Braydon Fuller Date: Mon, 2 Nov 2015 17:17:03 -0500 Subject: [PATCH 5/9] Database: Removed `runAllMempoolIndexes` method Replaced with using `tx` and `txleave` to manage the state of the mempool indexes. --- docs/services/db.md | 10 ---------- lib/services/db.js | 29 ++--------------------------- test/services/db.unit.js | 2 -- 3 files changed, 2 insertions(+), 39 deletions(-) diff --git a/docs/services/db.md b/docs/services/db.md index 305587a0..57439247 100644 --- a/docs/services/db.md +++ b/docs/services/db.md @@ -19,16 +19,6 @@ CustomService.prototype.blockHandler = function(block, add, callback) { Take a look at the Address Service implementation for more details about how to encode the key, value for the best efficiency and ways to format the keys for streaming reads. -Additionally the mempool can have an index, the mempool index will be updated once bitcoind and the db have both fully synced. A service can implement a `resetMempoolIndex` method that will be run during this time, and the "synced" event will wait until this task has been finished: - -```js -CustomService.prototype.resetMempoolIndex = function(callback) { - var transactionBuffers = this.node.services.bitcoind.getMempoolTransactions(); - // interact over the transactions asynchronously here - callback(); -}; -``` - ## API Documentation These methods are exposed over the JSON-RPC interface and can be called directly from a node via: diff --git a/lib/services/db.js b/lib/services/db.js index 8ad70bd5..0c655051 100644 --- a/lib/services/db.js +++ b/lib/services/db.js @@ -454,24 +454,6 @@ DB.prototype.disconnectBlock = function(block, callback) { this.runAllBlockHandlers(block, false, callback); }; -/** - * Will run all `resetMempoolIndex` methods implemented on sibling - * services to update the mempool indexes. - */ -DB.prototype.runAllMempoolIndexes = function(callback) { - async.eachSeries( - this.node.services, - function(service, next) { - if (service.resetMempoolIndex) { - service.resetMempoolIndex(next); - } else { - setImmediate(next); - } - }, - callback - ); -}; - /** * Will collect all database operations for a block from other services that implement * `blockHandler` methods and then save operations to the database. @@ -745,15 +727,8 @@ DB.prototype.sync = function() { } if (self.node.services.bitcoind.isSynced()) { - self.runAllMempoolIndexes(function(err) { - if (err) { - Error.captureStackTrace(err); - return self.node.emit('error', err); - } - - self.bitcoindSyncing = false; - self.node.emit('synced'); - }); + self.bitcoindSyncing = false; + self.node.emit('synced'); } else { self.bitcoindSyncing = false; } diff --git a/test/services/db.unit.js b/test/services/db.unit.js index 0d5ea257..bf4f0c4e 100644 --- a/test/services/db.unit.js +++ b/test/services/db.unit.js @@ -839,7 +839,6 @@ describe('DB Service', function() { var blockBuffer = new Buffer(blockData, 'hex'); var block = Block.fromBuffer(blockBuffer); db.node.services = {}; - db.runAllMempoolIndexes = sinon.stub().callsArg(0); db.node.services.bitcoind = { getBlock: sinon.stub().callsArgWith(1, null, blockBuffer), isSynced: sinon.stub().returns(true), @@ -858,7 +857,6 @@ describe('DB Service', function() { callback(); }; db.node.once('synced', function() { - db.runAllMempoolIndexes.callCount.should.equal(1); done(); }); db.sync(); From 826114b57521ee74b870b19793d94a046166d0af Mon Sep 17 00:00:00 2001 From: Braydon Fuller Date: Mon, 2 Nov 2015 17:31:41 -0500 Subject: [PATCH 6/9] Docs: Update docs for bitcoind bindings with txleave event --- docs/services/bitcoind.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/services/bitcoind.md b/docs/services/bitcoind.md index eba2b3d5..38c29d66 100644 --- a/docs/services/bitcoind.md +++ b/docs/services/bitcoind.md @@ -101,7 +101,11 @@ node.services.bitcoind.on('tip', function(blockHash) { }); node.services.bitcoind.on('tx', function(txInfo) { - // a new transaction has been broadcast in the network + // a new transaction has entered the mempool +}); + +node.services.bitcoind.on('txleave', function(txInfo) { + // a new transaction has left the mempool }); ``` @@ -110,7 +114,7 @@ The `txInfo` object will have the format: ```js { buffer: , - mempool: true, + mempool: true, // will currently always be true hash: '7426c707d0e9705bdd8158e60983e37d0f5d63529086d6672b07d9238d5aa623' } ``` From fccd6197c6146ff927d1615928992e45e57682ab Mon Sep 17 00:00:00 2001 From: Braydon Fuller Date: Mon, 2 Nov 2015 18:02:00 -0500 Subject: [PATCH 7/9] Docs: Updated bitcoind event docs to be less ambiguous --- docs/services/bitcoind.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/docs/services/bitcoind.md b/docs/services/bitcoind.md index 38c29d66..922965c4 100644 --- a/docs/services/bitcoind.md +++ b/docs/services/bitcoind.md @@ -104,7 +104,7 @@ node.services.bitcoind.on('tx', function(txInfo) { // a new transaction has entered the mempool }); -node.services.bitcoind.on('txleave', function(txInfo) { +node.services.bitcoind.on('txleave', function(txLeaveInfo) { // a new transaction has left the mempool }); ``` @@ -118,3 +118,12 @@ The `txInfo` object will have the format: hash: '7426c707d0e9705bdd8158e60983e37d0f5d63529086d6672b07d9238d5aa623' } ``` + +The `txLeaveInfo` object will have the format: + +```js +{ + buffer: , + hash: '7426c707d0e9705bdd8158e60983e37d0f5d63529086d6672b07d9238d5aa623' +} +``` From c5c8e21c6c48dda6927318a75322a6bd9e61eeac Mon Sep 17 00:00:00 2001 From: Braydon Fuller Date: Tue, 3 Nov 2015 10:26:32 -0500 Subject: [PATCH 8/9] Address: Fixed bug with isSpent confusion with prevTxId and txid --- lib/services/address/index.js | 3 ++- test/services/address/index.unit.js | 37 ++++++++++++++++++++++++++--- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index fa315217..4477e1ba 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -1309,7 +1309,8 @@ AddressService.prototype.isSpent = function(output, options, callback) { var txid = output.prevTxId ? output.prevTxId.toString('hex') : output.txid; var spent = self.node.services.bitcoind.isSpent(txid, output.outputIndex); if (!spent && queryMempool) { - var spentIndexSyncKey = this._encodeSpentIndexSyncKey(output.prevTxId, output.outputIndex); + var txidBuffer = new Buffer(txid, 'hex'); + var spentIndexSyncKey = this._encodeSpentIndexSyncKey(txidBuffer, output.outputIndex); spent = self.mempoolSpentIndex[spentIndexSyncKey] ? true : false; } setImmediate(function() { diff --git a/test/services/address/index.unit.js b/test/services/address/index.unit.js index c3bdcdee..d69fa63d 100644 --- a/test/services/address/index.unit.js +++ b/test/services/address/index.unit.js @@ -1545,16 +1545,47 @@ describe('Address Service', function() { } } }; - it('should give true if bitcoind.isSpent gives true', function(done) { + it('should give true if bitcoind.isSpent gives true (with output info)', function(done) { var am = new AddressService({ mempoolMemoryIndex: true, node: testnode }); + var isSpent = sinon.stub().returns(true); am.node.services.bitcoind = { - isSpent: sinon.stub().returns(true), + isSpent: isSpent, on: sinon.stub() }; - am.isSpent({}, {}, function(spent) { + var output = { + txid: '4228d3f41051f914f71a1dcbbe4098e29a07cc2672fdadab0763d88ffd6ffa57', + outputIndex: 3 + }; + am.isSpent(output, {}, function(spent) { + isSpent.callCount.should.equal(1); + isSpent.args[0][0].should.equal(output.txid); + isSpent.args[0][1].should.equal(output.outputIndex); + spent.should.equal(true); + done(); + }); + }); + it('should give true if bitcoind.isSpent gives true (with input)', function(done) { + var am = new AddressService({ + mempoolMemoryIndex: true, + node: testnode + }); + var isSpent = sinon.stub().returns(true); + am.node.services.bitcoind = { + isSpent: isSpent, + on: sinon.stub() + }; + var txid = '4228d3f41051f914f71a1dcbbe4098e29a07cc2672fdadab0763d88ffd6ffa57'; + var output = { + prevTxId: new Buffer(txid, 'hex'), + outputIndex: 4 + }; + am.isSpent(output, {}, function(spent) { + isSpent.callCount.should.equal(1); + isSpent.args[0][0].should.equal(txid); + isSpent.args[0][1].should.equal(output.outputIndex); spent.should.equal(true); done(); }); From 0ea035c4f0cd2d1cdd1d47c022cfd23c36522ccb Mon Sep 17 00:00:00 2001 From: Braydon Fuller Date: Tue, 3 Nov 2015 17:11:35 -0500 Subject: [PATCH 9/9] Address: Fixed race condition with transaction event handlers --- lib/services/address/index.js | 48 +++++++++++++++++++++-------- test/services/address/index.unit.js | 22 ++++++++----- 2 files changed, 49 insertions(+), 21 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 4477e1ba..a557ba24 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -223,8 +223,10 @@ AddressService.prototype.transactionLeaveHandler = function(txInfo) { * @param {Buffer} txInfo.buffer - The transaction buffer * @param {Boolean} txInfo.mempool - If the transaction was accepted in the mempool * @param {String} txInfo.hash - The hash of the transaction + * @param {Function} [callback] - Optional callback */ -AddressService.prototype.transactionHandler = function(txInfo) { +AddressService.prototype.transactionHandler = function(txInfo, callback) { + var self = this; // Basic transaction format is handled by the daemon // and we can safely assume the buffer is properly formatted. @@ -237,15 +239,31 @@ AddressService.prototype.transactionHandler = function(txInfo) { this.transactionOutputHandler(messages, tx, i, !txInfo.mempool); } - // Update mempool index - if (txInfo.mempool) { - this.updateMempoolIndex(tx, true); + if (!callback) { + callback = function(err) { + if (err) { + return log.error(err); + } + }; } - for (var key in messages) { - this.transactionEventHandler(messages[key]); - this.balanceEventHandler(null, messages[key].addressInfo); + function finish(err) { + if (err) { + return callback(err); + } + for (var key in messages) { + self.transactionEventHandler(messages[key]); + self.balanceEventHandler(null, messages[key].addressInfo); + } + callback(); } + + if (txInfo.mempool) { + self.updateMempoolIndex(tx, true, finish); + } else { + setImmediate(finish); + } + }; /** @@ -254,7 +272,7 @@ AddressService.prototype.transactionHandler = function(txInfo) { * @param {Transaction} - An instance of a Bitcore Transaction * @param {Boolean} - Add/remove from the index */ -AddressService.prototype.updateMempoolIndex = function(tx, add) { +AddressService.prototype.updateMempoolIndex = function(tx, add, callback) { /* jshint maxstatements: 100 */ var operations = []; @@ -362,11 +380,15 @@ AddressService.prototype.updateMempoolIndex = function(tx, add) { } - this.mempoolIndex.batch(operations, function(err) { - if (err) { - return log.error(err); - } - }); + if (!callback) { + callback = function(err) { + if (err) { + return log.error(err); + } + }; + } + + this.mempoolIndex.batch(operations, callback); }; diff --git a/test/services/address/index.unit.js b/test/services/address/index.unit.js index d69fa63d..e7660649 100644 --- a/test/services/address/index.unit.js +++ b/test/services/address/index.unit.js @@ -357,24 +357,30 @@ describe('Address Service', function() { }); describe('#transactionHandler', function() { - it('will pass outputs to transactionOutputHandler and call transactionEventHandler and balanceEventHandler', function() { + it('will pass outputs to transactionOutputHandler and call transactionEventHandler and balanceEventHandler', function(done) { var txBuf = new Buffer('01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000', 'hex'); - var am = new AddressService({ + var am1 = new AddressService({ mempoolMemoryIndex: true, node: mocknode }); var address = '12c6DSiU4Rq3P4ZxziKxzrL5LmMBrzjrJX'; var message = {}; - am.transactionOutputHandler = function(messages) { + am1.transactionOutputHandler = function(messages) { messages[address] = message; }; - am.transactionEventHandler = sinon.spy(); - am.balanceEventHandler = sinon.spy(); - am.transactionHandler({ + am1.transactionEventHandler = sinon.stub(); + am1.balanceEventHandler = sinon.stub(); + am1.transactionHandler({ buffer: txBuf + }, function(err) { + if (err) { + throw err; + } + am1.transactionEventHandler.callCount.should.equal(1); + am1.balanceEventHandler.callCount.should.equal(1); + done(); }); - am.transactionEventHandler.callCount.should.equal(1); - am.balanceEventHandler.callCount.should.equal(1); + }); });