Address Service: Use address summary cache for pagination

This commit is contained in:
Braydon Fuller 2016-01-06 21:20:10 -05:00
parent 8298e380ed
commit 5c4f3c4453
5 changed files with 133 additions and 222 deletions

View File

@ -45,7 +45,7 @@ exports.SUMMARY_CACHE_THRESHOLD = 10000;
// The default maximum length queries
exports.MAX_INPUTS_QUERY_LENGTH = 50000;
exports.MAX_OUTPUTS_QUERY_LENGTH = 50000;
exports.MAX_HISTORY_QUERY_LENGTH = 1000;
module.exports = exports;

View File

@ -181,8 +181,12 @@ exports.encodeSummaryCacheValue = function(cache, tipHeight) {
buffer.writeDoubleBE(cache.result.totalReceived, 4);
buffer.writeDoubleBE(cache.result.balance, 12);
var txidBuffers = [];
for (var key in cache.result.appearanceIds) {
txidBuffers.push(new Buffer(key, 'hex'));
for (var i = 0; i < cache.result.txids.length; i++) {
var buf = new Buffer(new Array(36));
var txid = cache.result.txids[i];
buf.write(txid, 'hex');
buf.writeUInt32BE(cache.result.appearanceIds[txid], 32);
txidBuffers.push(buf);
}
var txidsBuffer = Buffer.concat(txidBuffers);
var value = Buffer.concat([buffer, txidsBuffer]);
@ -198,17 +202,21 @@ exports.decodeSummaryCacheValue = function(buffer) {
// read 32 byte chunks until exhausted
var appearanceIds = {};
var pos = 16;
var txids = [];
var pos = 20;
while(pos < buffer.length) {
var txid = buffer.slice(pos, pos + 32).toString('hex');
appearanceIds[txid] = true;
pos += 32;
var txidHeight = buffer.readUInt32BE(pos + 32);
txids.push(txid);
appearanceIds[txid] = txidHeight;
pos += 36;
}
var cache = {
height: height,
result: {
appearanceIds: appearanceIds,
txids: txids,
totalReceived: totalReceived,
balance: balance,
unconfirmedAppearanceIds: {}, // unconfirmed values are never stored in cache

View File

@ -2,9 +2,10 @@
var bitcore = require('bitcore-lib');
var async = require('async');
var CombinedStream = require('./streams/combined');
var _ = bitcore.deps._;
var constants = require('./constants');
/**
* This represents an instance that keeps track of data over a series of
* asynchronous I/O calls to get the transaction history for a group of
@ -20,7 +21,21 @@ function AddressHistory(args) {
} else {
this.addresses = [args.addresses];
}
this.combinedArray = [];
this.maxHistoryQueryLength = constants.MAX_HISTORY_QUERY_LENGTH;
this.addressStrings = [];
for (var i = 0; i < this.addresses.length; i++) {
var address = this.addresses[i];
if (address instanceof bitcore.Address) {
this.addressStrings.push(address.toString());
} else if (_.isString(address)) {
this.addressStrings.push(address);
} else {
throw new TypeError('Addresses are expected to be strings');
}
}
this.detailedArray = [];
}
@ -42,28 +57,33 @@ AddressHistory.prototype.get = function(callback) {
var address = self.addresses[0];
var combinedStream = new CombinedStream({
inputStream: this.node.services.address.createInputsStream(address, this.options),
outputStream: this.node.services.address.createOutputsStream(address, this.options)
});
this.node.services.address.getAddressSummary(address, this.options, function(err, summary) {
if (err) {
return callback(err);
}
// Results from the transaction info stream are grouped into
// sets based on block height
combinedStream.on('data', function(block) {
self.combinedArray = self.combinedArray.concat(block);
});
totalCount = summary.txids.length;
combinedStream.on('end', function() {
totalCount = Number(self.combinedArray.length);
// TODO: Make sure txids are sorted by height and time
var fromOffset = summary.txids.length - self.options.from;
var toOffset = summary.txids.length - self.options.to;
var txids = summary.txids.slice(toOffset, fromOffset);
self.sortAndPaginateCombinedArray();
// Verify that this query isn't too long
if (txids.length > self.maxHistoryQueryLength) {
return callback(new Error(
'Maximum length query (' + self.maxAddressQueryLength + ') exceeded for addresses:' +
this.address.join(',')
));
}
// TODO: Add the mempool transactions
// Reverse to include most recent at the top
txids.reverse();
async.eachSeries(
self.combinedArray,
function(txInfo, next) {
self.getDetailedInfo(txInfo, next);
txids,
function(txid, next) {
self.getDetailedInfo(txid, next);
},
function(err) {
if (err) {
@ -75,12 +95,15 @@ AddressHistory.prototype.get = function(callback) {
});
}
);
});
};
/**
* A helper function to sort and slice/paginate the `combinedArray`
*/
// TODO: Remove once txids summary results are verified to be sorted
AddressHistory.prototype.sortAndPaginateCombinedArray = function() {
this.combinedArray.sort(AddressHistory.sortByHeight);
if (!_.isUndefined(this.options.from) && !_.isUndefined(this.options.to)) {
@ -94,6 +117,7 @@ AddressHistory.prototype.sortAndPaginateCombinedArray = function() {
* @param {Object} a - An item from the `combinedArray`
* @param {Object} b
*/
// TODO: Remove once txids summary results are verified to be sorted
AddressHistory.sortByHeight = function(a, b) {
if (a.height < 0 && b.height < 0) {
// Both are from the mempool, compare timestamps
@ -123,12 +147,12 @@ AddressHistory.sortByHeight = function(a, b) {
* @param {Object} txInfo - An item from the `combinedArray`
* @param {Function} next
*/
AddressHistory.prototype.getDetailedInfo = function(txInfo, next) {
AddressHistory.prototype.getDetailedInfo = function(txid, next) {
var self = this;
var queryMempool = _.isUndefined(self.options.queryMempool) ? true : self.options.queryMempool;
self.node.services.db.getTransactionWithBlockInfo(
txInfo.txid,
txid,
queryMempool,
function(err, transaction) {
if (err) {
@ -136,13 +160,15 @@ AddressHistory.prototype.getDetailedInfo = function(txInfo, next) {
}
transaction.populateInputs(self.node.services.db, [], function(err) {
if(err) {
if (err) {
return next(err);
}
var addressDetails = self.getAddressDetailsForTransaction(transaction);
self.detailedArray.push({
addresses: txInfo.addresses,
satoshis: self.getSatoshisDetail(transaction, txInfo),
addresses: addressDetails.addresses,
satoshis: addressDetails.satoshis,
height: transaction.__height,
confirmations: self.getConfirmationsDetail(transaction),
timestamp: transaction.__timestamp,
@ -169,23 +195,52 @@ AddressHistory.prototype.getConfirmationsDetail = function(transaction) {
return confirmations;
};
/**
* A helper function for `getDetailedInfo` for getting the satoshis.
* @param {Transaction} transaction - A transaction populated with previous outputs
* @param {Object} txInfo - An item from `combinedArray`
*/
AddressHistory.prototype.getSatoshisDetail = function(transaction, txInfo) {
var satoshis = txInfo.satoshis || 0;
AddressHistory.prototype.getAddressDetailsForTransaction = function(transaction) {
var result = {
addresses: {},
satoshis: 0
};
for(var address in txInfo.addresses) {
if (txInfo.addresses[address].inputIndexes.length >= 0) {
for(var j = 0; j < txInfo.addresses[address].inputIndexes.length; j++) {
satoshis -= transaction.inputs[txInfo.addresses[address].inputIndexes[j]].output.satoshis;
for (var inputIndex = 0; inputIndex < transaction.inputs.length; inputIndex++) {
var input = transaction.inputs[inputIndex];
if (!input.script) {
continue;
}
var inputAddress = input.script.toAddress(this.node.network);
if (inputAddress && this.addressStrings.indexOf(inputAddress.toString()) > 0) {
if (!result.addresses[inputAddress]) {
result.addresses[inputAddress] = {
inputIndexes: [],
outputIndexes: []
};
} else {
result.addresses[inputAddress].inputIndexes.push(inputIndex);
}
result.satoshis -= input.output.satoshis;
}
}
return satoshis;
for (var outputIndex = 0; outputIndex < transaction.outputs.length; outputIndex++) {
var output = transaction.outputs[outputIndex];
if (!output.script) {
continue;
}
var outputAddress = output.script.toAddress(this.node.network);
if (outputAddress && this.addressStrings.indexOf(outputAddress.toString()) > 0) {
if (!result.addresses[outputAddress]) {
result.addresses[outputAddress] = {
inputIndexes: [],
outputIndexes: []
};
} else {
result.addresses[outputAddress].inputIndexes.push(outputIndex);
}
result.satoshis += output.satoshis;
}
}
return result;
};
module.exports = AddressHistory;

View File

@ -763,7 +763,7 @@ AddressService.prototype.getInputForOutput = function(txid, outputIndex, options
* @param {Number} [options.end] - The relevant end block height
* @param {Function} callback
*/
AddressService.prototype.createInputsStream = function(addressStr, options, callback) {
AddressService.prototype.createInputsStream = function(addressStr, options) {
var inputStream = new InputsTransformStream({
address: new Address(addressStr, this.node.network),
@ -1331,6 +1331,9 @@ AddressService.prototype.getAddressSummary = function(addressArg, options, callb
function(cache, next) {
self._getAddressOutputsSummary(address, cache, tipHeight, next);
},
function(cache, next) {
self._sortTxids(cache, tipHeight, next);
},
function(cache, next) {
self._saveAddressSummaryCache(address, cache, tipHeight, next);
}
@ -1340,7 +1343,7 @@ AddressService.prototype.getAddressSummary = function(addressArg, options, callb
}
var result = cache.result;
var confirmedTxids = Object.keys(result.appearanceIds);
var confirmedTxids = result.txids;
var unconfirmedTxids = Object.keys(result.unconfirmedAppearanceIds);
var summary = {
@ -1360,16 +1363,7 @@ AddressService.prototype.getAddressSummary = function(addressArg, options, callb
}
if (!options.noTxList) {
var txids = confirmedTxids.concat(unconfirmedTxids);
// sort by height
summary.txids = txids.sort(function(a, b) {
return a.height > b.height ? 1 : -1;
}).map(function(obj) {
return obj.txid;
}).filter(function(value, index, self) {
return self.indexOf(value) === index;
});
summary.txids = confirmedTxids.concat(unconfirmedTxids);
}
callback(null, summary);
@ -1378,8 +1372,22 @@ AddressService.prototype.getAddressSummary = function(addressArg, options, callb
};
AddressService.prototype._sortTxids = function(cache, tipHeight, callback) {
if (cache.height === tipHeight) {
return callback(null, cache);
}
cache.result.txids = Object.keys(cache.result.appearanceIds);
cache.result.txids.sort(function(a, b) {
return cache.result.appearanceIds[a] - cache.result.appearanceIds[b];
});
callback(null, cache);
};
AddressService.prototype._saveAddressSummaryCache = function(address, cache, tipHeight, callback) {
var transactionLength = Object.keys(cache.result.appearanceIds).length;
if (cache.height === tipHeight) {
return callback(null, cache);
}
var transactionLength = cache.result.txids.length;
var exceedsCacheThreshold = (transactionLength > this.summaryCacheThreshold);
if (exceedsCacheThreshold) {
log.info('Saving address summary cache for: ' + address.toString() + 'at height: ' + tipHeight);
@ -1422,6 +1430,9 @@ AddressService.prototype._getAddressSummaryCache = function(address, callback) {
};
AddressService.prototype._getAddressInputsSummary = function(address, cache, tipHeight, callback) {
if (cache.height === tipHeight) {
return callback(null, cache);
}
$.checkArgument(address instanceof Address);
var self = this;
@ -1435,7 +1446,7 @@ AddressService.prototype._getAddressInputsSummary = function(address, cache, tip
var inputsStream = self.createInputsStream(address, opts);
inputsStream.on('data', function(input) {
var txid = input.txid;
cache.result.appearanceIds[txid] = true;
cache.result.appearanceIds[txid] = input.height;
});
inputsStream.on('error', function(err) {
@ -1462,6 +1473,9 @@ AddressService.prototype._getAddressInputsSummary = function(address, cache, tip
};
AddressService.prototype._getAddressOutputsSummary = function(address, cache, tipHeight, callback) {
if (cache.height === tipHeight) {
return callback(null, cache);
}
$.checkArgument(address instanceof Address);
$.checkArgument(!_.isUndefined(cache.result) &&
!_.isUndefined(cache.result.appearanceIds) &&
@ -1484,7 +1498,7 @@ AddressService.prototype._getAddressOutputsSummary = function(address, cache, ti
// Bitcoind's isSpent only works for confirmed transactions
var spentDB = self.node.services.bitcoind.isSpent(txid, outputIndex);
cache.result.totalReceived += output.satoshis;
cache.result.appearanceIds[txid] = true;
cache.result.appearanceIds[txid] = output.height;
if (!spentDB) {
cache.result.balance += output.satoshis;

View File

@ -1,166 +0,0 @@
'use strict';
var ReadableStream = require('stream').Readable;
var inherits = require('util').inherits;
function TransactionInfoStream(options) {
ReadableStream.call(this, {
objectMode: true
});
// TODO: Be able to specify multiple input and output streams
// so that it's possible to query multiple addresses at the same time.
this._inputStream = options.inputStream;
this._outputStream = options.outputStream;
// This holds a collection of combined inputs and outputs
// grouped into the matching block heights.
this._blocks = {};
this._inputCurrentHeight = 0;
this._outputCurrentHeight = 0;
this._inputFinishedHeights = [];
this._outputFinishedHeights = [];
this._inputEnded = false;
this._outputEnded = false;
this._listenStreamEvents();
}
inherits(TransactionInfoStream, ReadableStream);
TransactionInfoStream.prototype._listenStreamEvents = function() {
var self = this;
self._inputStream.on('data', function(input) {
self._addToBlock(input);
if (input.height > self._inputCurrentHeight) {
self._inputFinishedHeights.push(input.height);
}
self._inputCurrentHeight = input.height;
self._maybePushBlock();
});
self._outputStream.on('data', function(output) {
self._addToBlock(output);
if (output.height > self._outputCurrentHeight) {
self._outputFinishedHeights.push(output.height);
}
self._outputCurrentHeight = output.height;
self._maybePushBlock();
});
self._inputStream.on('end', function() {
self._inputFinishedHeights.push(self._inputCurrentHeight);
self._inputEnded = true;
self._maybeEndStream();
});
self._outputStream.on('end', function() {
self._outputFinishedHeights.push(self._outputCurrentHeight);
self._outputEnded = true;
self._maybeEndStream();
});
};
TransactionInfoStream.prototype._read = function() {
this._inputStream.resume();
this._outputStream.resume();
};
TransactionInfoStream.prototype._addToBlock = function(data) {
if (!this._blocks[data.height]) {
this._blocks[data.height] = [];
}
this._blocks[data.height].push(data);
};
TransactionInfoStream.prototype._maybeEndStream = function() {
if (this._inputEnded && this._outputEnded) {
this._pushRemainingBlocks();
this.push(null);
}
};
TransactionInfoStream.prototype._pushRemainingBlocks = function() {
var keys = Object.keys(this._blocks);
for (var i = 0; i < keys.length; i++) {
this.push(this._blocks[keys[i]]);
delete this._blocks[keys[i]];
}
};
TransactionInfoStream.prototype._combineTransactionInfo = function(transactionInfo) {
var combinedArrayMap = {};
var combinedArray = [];
var l = transactionInfo.length;
for(var i = 0; i < l; i++) {
var item = transactionInfo[i];
var mapKey = item.txid;
if (combinedArrayMap[mapKey] >= 0) {
var combined = combinedArray[combinedArrayMap[mapKey]];
if (!combined.addresses[item.address]) {
combined.addresses[item.address] = {
outputIndexes: [],
inputIndexes: []
};
}
if (item.outputIndex >= 0) {
combined.satoshis += item.satoshis;
combined.addresses[item.address].outputIndexes.push(item.outputIndex);
} else if (item.inputIndex >= 0) {
combined.addresses[item.address].inputIndexes.push(item.inputIndex);
}
} else {
item.addresses = {};
item.addresses[item.address] = {
outputIndexes: [],
inputIndexes: []
};
if (item.outputIndex >= 0) {
item.addresses[item.address].outputIndexes.push(item.outputIndex);
} else if (item.inputIndex >= 0) {
item.addresses[item.address].inputIndexes.push(item.inputIndex);
}
delete item.outputIndex;
delete item.inputIndex;
delete item.address;
combinedArray.push(item);
combinedArrayMap[mapKey] = combinedArray.length - 1;
}
}
return combinedArray;
};
TransactionInfoStream.prototype._maybePushBlock = function() {
if (!this._inputFinishedHeights[0] && !this._outputFinishedHeights[0]) {
return;
}
var inputFinished = this._inputFinishedHeights[0];
var outputFinished = this._outputFinishedHeights[0];
var bothFinished;
if (inputFinished === outputFinished) {
bothFinished = inputFinished;
this._inputFinishedHeights.shift();
this._outputFinishedHeights.shift();
} else if (inputFinished <= outputFinished) {
bothFinished = inputFinished;
this._inputFinishedHeights.shift();
} else if (outputFinished <= inputFinished) {
bothFinished = outputFinished;
this._outputFinishedHeights.shift();
}
if (bothFinished) {
var block = this._combineTransactionInfo(this._blocks[bothFinished]);
this.push(block);
delete this._blocks[bothFinished];
//this._inputStream.pause();
//this._outputStream.pause();
}
};
module.exports = TransactionInfoStream;