Address Service: Use streams to combine inputs and outputs

This commit is contained in:
Braydon Fuller 2016-01-06 19:14:45 -05:00
parent cef2f7686d
commit 8298e380ed
2 changed files with 207 additions and 123 deletions

View File

@ -2,6 +2,7 @@
var bitcore = require('bitcore-lib');
var async = require('async');
var CombinedStream = require('./streams/combined');
var _ = bitcore.deps._;
/**
@ -19,7 +20,6 @@ function AddressHistory(args) {
} else {
this.addresses = [args.addresses];
}
this.transactionInfo = [];
this.combinedArray = [];
this.detailedArray = [];
}
@ -35,129 +35,47 @@ AddressHistory.prototype.get = function(callback) {
var self = this;
var totalCount;
async.eachLimit(
self.addresses,
AddressHistory.MAX_ADDRESS_QUERIES,
function(address, next) {
self.getTransactionInfo(address, next);
},
function(err) {
if (err) {
return callback(err);
}
self.combineTransactionInfo();
totalCount = Number(self.combinedArray.length);
self.sortAndPaginateCombinedArray();
async.eachSeries(
self.combinedArray,
function(txInfo, next) {
self.getDetailedInfo(txInfo, next);
},
function(err) {
if (err) {
return callback(err);
}
callback(null, {
totalCount: totalCount,
items: self.detailedArray
});
}
);
}
);
};
/**
* This function will retrieve input and output information for an address
* and set the property `this.transactionInfo`.
* @param {String} address - A base58check encoded address
* @param {Function} next
*/
AddressHistory.prototype.getTransactionInfo = function(address, next) {
var self = this;
var args = {
start: self.options.start,
end: self.options.end,
queryMempool: _.isUndefined(self.options.queryMempool) ? true : self.options.queryMempool
};
var outputs;
var inputs;
async.parallel([
function(done) {
self.node.services.address.getOutputs(address, args, function(err, result) {
if (err) {
return done(err);
}
outputs = result;
done();
});
},
function(done) {
self.node.services.address.getInputs(address, args, function(err, result) {
if (err) {
return done(err);
}
inputs = result;
done();
});
}
], function(err) {
if (err) {
return next(err);
}
self.transactionInfo = self.transactionInfo.concat(outputs, inputs);
next();
});
};
/**
* This function combines results from getInputs and getOutputs at
* `this.transactionInfo` to be "txid" unique at `this.combinedArray`.
*/
AddressHistory.prototype.combineTransactionInfo = function() {
var combinedArrayMap = {};
this.combinedArray = [];
var l = this.transactionInfo.length;
for(var i = 0; i < l; i++) {
var item = this.transactionInfo[i];
var mapKey = item.txid;
if (combinedArrayMap[mapKey] >= 0) {
var combined = this.combinedArray[combinedArrayMap[mapKey]];
if (!combined.addresses[item.address]) {
combined.addresses[item.address] = {
outputIndexes: [],
inputIndexes: []
};
}
if (item.outputIndex >= 0) {
combined.satoshis += item.satoshis;
combined.addresses[item.address].outputIndexes.push(item.outputIndex);
} else if (item.inputIndex >= 0) {
combined.addresses[item.address].inputIndexes.push(item.inputIndex);
}
} else {
item.addresses = {};
item.addresses[item.address] = {
outputIndexes: [],
inputIndexes: []
};
if (item.outputIndex >= 0) {
item.addresses[item.address].outputIndexes.push(item.outputIndex);
} else if (item.inputIndex >= 0) {
item.addresses[item.address].inputIndexes.push(item.inputIndex);
}
delete item.outputIndex;
delete item.inputIndex;
delete item.address;
this.combinedArray.push(item);
combinedArrayMap[mapKey] = this.combinedArray.length - 1;
}
// TODO: handle multiple addresses (restore previous functionality)
if (self.addresses.length > 1) {
return callback('Only single address queries supported currently');
}
var address = self.addresses[0];
var combinedStream = new CombinedStream({
inputStream: this.node.services.address.createInputsStream(address, this.options),
outputStream: this.node.services.address.createOutputsStream(address, this.options)
});
// Results from the transaction info stream are grouped into
// sets based on block height
combinedStream.on('data', function(block) {
self.combinedArray = self.combinedArray.concat(block);
});
combinedStream.on('end', function() {
totalCount = Number(self.combinedArray.length);
self.sortAndPaginateCombinedArray();
// TODO: Add the mempool transactions
async.eachSeries(
self.combinedArray,
function(txInfo, next) {
self.getDetailedInfo(txInfo, next);
},
function(err) {
if (err) {
return callback(err);
}
callback(null, {
totalCount: totalCount,
items: self.detailedArray
});
}
);
});
};
/**

View File

@ -0,0 +1,166 @@
'use strict';
var ReadableStream = require('stream').Readable;
var inherits = require('util').inherits;
function TransactionInfoStream(options) {
ReadableStream.call(this, {
objectMode: true
});
// TODO: Be able to specify multiple input and output streams
// so that it's possible to query multiple addresses at the same time.
this._inputStream = options.inputStream;
this._outputStream = options.outputStream;
// This holds a collection of combined inputs and outputs
// grouped into the matching block heights.
this._blocks = {};
this._inputCurrentHeight = 0;
this._outputCurrentHeight = 0;
this._inputFinishedHeights = [];
this._outputFinishedHeights = [];
this._inputEnded = false;
this._outputEnded = false;
this._listenStreamEvents();
}
inherits(TransactionInfoStream, ReadableStream);
TransactionInfoStream.prototype._listenStreamEvents = function() {
var self = this;
self._inputStream.on('data', function(input) {
self._addToBlock(input);
if (input.height > self._inputCurrentHeight) {
self._inputFinishedHeights.push(input.height);
}
self._inputCurrentHeight = input.height;
self._maybePushBlock();
});
self._outputStream.on('data', function(output) {
self._addToBlock(output);
if (output.height > self._outputCurrentHeight) {
self._outputFinishedHeights.push(output.height);
}
self._outputCurrentHeight = output.height;
self._maybePushBlock();
});
self._inputStream.on('end', function() {
self._inputFinishedHeights.push(self._inputCurrentHeight);
self._inputEnded = true;
self._maybeEndStream();
});
self._outputStream.on('end', function() {
self._outputFinishedHeights.push(self._outputCurrentHeight);
self._outputEnded = true;
self._maybeEndStream();
});
};
TransactionInfoStream.prototype._read = function() {
this._inputStream.resume();
this._outputStream.resume();
};
TransactionInfoStream.prototype._addToBlock = function(data) {
if (!this._blocks[data.height]) {
this._blocks[data.height] = [];
}
this._blocks[data.height].push(data);
};
TransactionInfoStream.prototype._maybeEndStream = function() {
if (this._inputEnded && this._outputEnded) {
this._pushRemainingBlocks();
this.push(null);
}
};
TransactionInfoStream.prototype._pushRemainingBlocks = function() {
var keys = Object.keys(this._blocks);
for (var i = 0; i < keys.length; i++) {
this.push(this._blocks[keys[i]]);
delete this._blocks[keys[i]];
}
};
TransactionInfoStream.prototype._combineTransactionInfo = function(transactionInfo) {
var combinedArrayMap = {};
var combinedArray = [];
var l = transactionInfo.length;
for(var i = 0; i < l; i++) {
var item = transactionInfo[i];
var mapKey = item.txid;
if (combinedArrayMap[mapKey] >= 0) {
var combined = combinedArray[combinedArrayMap[mapKey]];
if (!combined.addresses[item.address]) {
combined.addresses[item.address] = {
outputIndexes: [],
inputIndexes: []
};
}
if (item.outputIndex >= 0) {
combined.satoshis += item.satoshis;
combined.addresses[item.address].outputIndexes.push(item.outputIndex);
} else if (item.inputIndex >= 0) {
combined.addresses[item.address].inputIndexes.push(item.inputIndex);
}
} else {
item.addresses = {};
item.addresses[item.address] = {
outputIndexes: [],
inputIndexes: []
};
if (item.outputIndex >= 0) {
item.addresses[item.address].outputIndexes.push(item.outputIndex);
} else if (item.inputIndex >= 0) {
item.addresses[item.address].inputIndexes.push(item.inputIndex);
}
delete item.outputIndex;
delete item.inputIndex;
delete item.address;
combinedArray.push(item);
combinedArrayMap[mapKey] = combinedArray.length - 1;
}
}
return combinedArray;
};
TransactionInfoStream.prototype._maybePushBlock = function() {
if (!this._inputFinishedHeights[0] && !this._outputFinishedHeights[0]) {
return;
}
var inputFinished = this._inputFinishedHeights[0];
var outputFinished = this._outputFinishedHeights[0];
var bothFinished;
if (inputFinished === outputFinished) {
bothFinished = inputFinished;
this._inputFinishedHeights.shift();
this._outputFinishedHeights.shift();
} else if (inputFinished <= outputFinished) {
bothFinished = inputFinished;
this._inputFinishedHeights.shift();
} else if (outputFinished <= inputFinished) {
bothFinished = outputFinished;
this._outputFinishedHeights.shift();
}
if (bothFinished) {
var block = this._combineTransactionInfo(this._blocks[bothFinished]);
this.push(block);
delete this._blocks[bothFinished];
//this._inputStream.pause();
//this._outputStream.pause();
}
};
module.exports = TransactionInfoStream;