Merge pull request #721 from matiu/opt/fast-balance-cache
Opt/fast balance cache
This commit is contained in:
commit
a1ec459311
|
@ -16,6 +16,7 @@
|
||||||
"maxlen": 120,
|
"maxlen": 120,
|
||||||
"maxparams": 4,
|
"maxparams": 4,
|
||||||
"maxstatements": 15,
|
"maxstatements": 15,
|
||||||
|
"no-extra-semi": 0,
|
||||||
"mocha": true,
|
"mocha": true,
|
||||||
"newcap": true,
|
"newcap": true,
|
||||||
"noarg": true,
|
"noarg": true,
|
||||||
|
|
|
@ -96,6 +96,12 @@ Defaults.CONFIRMATIONS_TO_START_CACHING = 6 * 6; // ~ 6hrs
|
||||||
// Number of addresses from which tx history is enabled in a wallet
|
// Number of addresses from which tx history is enabled in a wallet
|
||||||
Defaults.HISTORY_CACHE_ADDRESS_THRESOLD = 100;
|
Defaults.HISTORY_CACHE_ADDRESS_THRESOLD = 100;
|
||||||
|
|
||||||
|
// Number of addresses from which balance in cache for a few seconds
|
||||||
|
Defaults.BALANCE_CACHE_ADDRESS_THRESOLD = Defaults.HISTORY_CACHE_ADDRESS_THRESOLD;
|
||||||
|
|
||||||
|
Defaults.BALANCE_CACHE_DIRECT_DURATION = 60;
|
||||||
|
Defaults.BALANCE_CACHE_DURATION = 10;
|
||||||
|
|
||||||
// Cache time for blockchain height (in seconds)
|
// Cache time for blockchain height (in seconds)
|
||||||
Defaults.BLOCKHEIGHT_CACHE_TIME = 10 * 60;
|
Defaults.BLOCKHEIGHT_CACHE_TIME = 10 * 60;
|
||||||
|
|
||||||
|
|
101
lib/server.js
101
lib/server.js
|
@ -1243,35 +1243,65 @@ WalletService.prototype._totalizeUtxos = function(utxos) {
|
||||||
|
|
||||||
WalletService.prototype._getBalanceFromAddresses = function(opts, cb, i) {
|
WalletService.prototype._getBalanceFromAddresses = function(opts, cb, i) {
|
||||||
var self = this;
|
var self = this;
|
||||||
|
|
||||||
var opts = opts || {};
|
var opts = opts || {};
|
||||||
|
opts.addresses = opts.addresses || [];
|
||||||
|
|
||||||
|
|
||||||
|
function checkBalanceCache(cb) {
|
||||||
|
if (opts.addresses.length < Defaults.BALANCE_CACHE_ADDRESS_THRESOLD || !opts.fastCache)
|
||||||
|
return cb();
|
||||||
|
|
||||||
|
self.storage.checkAndUseBalanceCache(self.walletId, opts.addresses, opts.fastCache, cb);
|
||||||
|
};
|
||||||
|
|
||||||
|
function storeBalanceCache(balance, cb) {
|
||||||
|
if (opts.addresses.length < Defaults.BALANCE_CACHE_ADDRESS_THRESOLD)
|
||||||
|
return cb(null, balance);
|
||||||
|
|
||||||
|
self.storage.storeBalanceCache(self.walletId, opts.addresses, balance, function(err) {
|
||||||
|
if (err)
|
||||||
|
log.warn('Could not save cache:',err);
|
||||||
|
|
||||||
|
return cb(null, balance);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
// This lock is to prevent server starvation on big wallets
|
// This lock is to prevent server starvation on big wallets
|
||||||
self._runLocked(cb, function(cb) {
|
self._runLocked(cb, function(cb) {
|
||||||
self._getUtxosForCurrentWallet({
|
checkBalanceCache(function(err, cache) {
|
||||||
coin: opts.coin,
|
|
||||||
addresses: opts.addresses
|
|
||||||
}, function(err, utxos) {
|
|
||||||
if (err) return cb(err);
|
if (err) return cb(err);
|
||||||
|
|
||||||
var balance = self._totalizeUtxos(utxos);
|
if (cache) {
|
||||||
|
log.info('Using UTXO Cache');
|
||||||
|
return cb(null, cache, true);
|
||||||
|
}
|
||||||
|
|
||||||
// Compute balance by address
|
self._getUtxosForCurrentWallet({
|
||||||
var byAddress = {};
|
coin: opts.coin,
|
||||||
_.each(_.indexBy(_.sortBy(utxos, 'address'), 'address'), function(value, key) {
|
addresses: opts.addresses
|
||||||
byAddress[key] = {
|
}, function(err, utxos) {
|
||||||
address: key,
|
if (err) return cb(err);
|
||||||
path: value.path,
|
|
||||||
amount: 0,
|
var balance = self._totalizeUtxos(utxos);
|
||||||
};
|
|
||||||
|
// Compute balance by address
|
||||||
|
var byAddress = {};
|
||||||
|
_.each(_.indexBy(_.sortBy(utxos, 'address'), 'address'), function(value, key) {
|
||||||
|
byAddress[key] = {
|
||||||
|
address: key,
|
||||||
|
path: value.path,
|
||||||
|
amount: 0,
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
_.each(utxos, function(utxo) {
|
||||||
|
byAddress[utxo.address].amount += utxo.satoshis;
|
||||||
|
});
|
||||||
|
|
||||||
|
balance.byAddress = _.values(byAddress);
|
||||||
|
|
||||||
|
storeBalanceCache(balance, cb);
|
||||||
});
|
});
|
||||||
|
|
||||||
_.each(utxos, function(utxo) {
|
|
||||||
byAddress[utxo.address].amount += utxo.satoshis;
|
|
||||||
});
|
|
||||||
|
|
||||||
balance.byAddress = _.values(byAddress);
|
|
||||||
return cb(null, balance);
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
@ -1281,10 +1311,17 @@ WalletService.prototype._getBalanceOneStep = function(opts, cb) {
|
||||||
|
|
||||||
self.storage.fetchAddresses(self.walletId, function(err, addresses) {
|
self.storage.fetchAddresses(self.walletId, function(err, addresses) {
|
||||||
if (err) return cb(err);
|
if (err) return cb(err);
|
||||||
|
|
||||||
|
if (addresses.length == opts.alreadyQueriedLength) {
|
||||||
|
log.info('Query Skipped, all active addresses');
|
||||||
|
return cb(null,null, true);
|
||||||
|
}
|
||||||
|
|
||||||
self._getBalanceFromAddresses({
|
self._getBalanceFromAddresses({
|
||||||
coin: opts.coin,
|
coin: opts.coin,
|
||||||
addresses: addresses
|
addresses: addresses,
|
||||||
}, function(err, balance) {
|
fastCache: opts.fastCache,
|
||||||
|
}, function(err, balance, cacheUsed) {
|
||||||
if (err) return cb(err);
|
if (err) return cb(err);
|
||||||
|
|
||||||
// Update cache
|
// Update cache
|
||||||
|
@ -1293,7 +1330,7 @@ WalletService.prototype._getBalanceOneStep = function(opts, cb) {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.warn('Could not update wallet cache', err);
|
log.warn('Could not update wallet cache', err);
|
||||||
}
|
}
|
||||||
return cb(null, balance);
|
return cb(null, balance, cacheUsed);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -1367,10 +1404,10 @@ WalletService.prototype.getBalance = function(opts, cb, i) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!opts.twoStep) {
|
if (!opts.twoStep) {
|
||||||
|
opts.fastCache = Defaults.BALANCE_CACHE_DIRECT_DURATION;
|
||||||
return self._getBalanceOneStep(opts, cb);
|
return self._getBalanceOneStep(opts, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
self.storage.getTwoStepCache(self.walletId, function(err, twoStepCache) {
|
self.storage.getTwoStepCache(self.walletId, function(err, twoStepCache) {
|
||||||
if (err) return cb(err);
|
if (err) return cb(err);
|
||||||
twoStepCache = twoStepCache || {};
|
twoStepCache = twoStepCache || {};
|
||||||
|
@ -1392,9 +1429,9 @@ WalletService.prototype.getBalance = function(opts, cb, i) {
|
||||||
self._getBalanceFromAddresses({
|
self._getBalanceFromAddresses({
|
||||||
coin: opts.coin,
|
coin: opts.coin,
|
||||||
addresses: activeAddresses
|
addresses: activeAddresses
|
||||||
}, function(err, partialBalance) {
|
}, function(err, partialBalance, cacheUsed) {
|
||||||
if (err) return cb(err);
|
if (err) return cb(err);
|
||||||
cb(null, partialBalance);
|
cb(null, partialBalance, cacheUsed);
|
||||||
|
|
||||||
var now = Math.floor(Date.now() / 1000);
|
var now = Math.floor(Date.now() / 1000);
|
||||||
|
|
||||||
|
@ -1406,13 +1443,19 @@ WalletService.prototype.getBalance = function(opts, cb, i) {
|
||||||
|
|
||||||
setTimeout(function() {
|
setTimeout(function() {
|
||||||
log.debug('Running full balance query');
|
log.debug('Running full balance query');
|
||||||
self._getBalanceOneStep(opts, function(err, fullBalance) {
|
|
||||||
|
opts.alreadyQueriedLength = activeAddresses.length;
|
||||||
|
opts.fastCache = Defaults.BALANCE_CACHE_DURATION;
|
||||||
|
|
||||||
|
self._getBalanceOneStep(opts, function(err, fullBalance, skipped) {
|
||||||
if (err) return;
|
if (err) return;
|
||||||
if (!_.isEqual(partialBalance, fullBalance)) {
|
if (!skipped && !_.isEqual(partialBalance, fullBalance)) {
|
||||||
log.info('Balance in active addresses differs from final balance');
|
log.info('Balance in active addresses differs from final balance');
|
||||||
self._notify('BalanceUpdated', fullBalance, {
|
self._notify('BalanceUpdated', fullBalance, {
|
||||||
isGlobal: true
|
isGlobal: true
|
||||||
});
|
});
|
||||||
|
} else if (skipped) {
|
||||||
|
return;
|
||||||
} else {
|
} else {
|
||||||
// updates cache
|
// updates cache
|
||||||
twoStepCache.lastEmpty = now;
|
twoStepCache.lastEmpty = now;
|
||||||
|
|
|
@ -7,7 +7,7 @@ var log = require('npmlog');
|
||||||
log.debug = log.verbose;
|
log.debug = log.verbose;
|
||||||
log.disableColor();
|
log.disableColor();
|
||||||
var util = require('util');
|
var util = require('util');
|
||||||
|
var Bitcore = require('bitcore-lib');
|
||||||
var mongodb = require('mongodb');
|
var mongodb = require('mongodb');
|
||||||
|
|
||||||
var Model = require('./model');
|
var Model = require('./model');
|
||||||
|
@ -550,7 +550,7 @@ Storage.prototype.fetchAddressByCoin = function(coin, address, cb) {
|
||||||
if (!result || _.isEmpty(result)) return cb();
|
if (!result || _.isEmpty(result)) return cb();
|
||||||
if (result.length > 1) {
|
if (result.length > 1) {
|
||||||
result = _.find(result, function(address) {
|
result = _.find(result, function(address) {
|
||||||
return coin == (address.coin || Defaults.COIN);
|
return coin == (address.coin || 'btc');
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
result = _.first(result);
|
result = _.first(result);
|
||||||
|
@ -1065,5 +1065,65 @@ Storage.prototype._dump = function(cb, fn) {
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
Storage.prototype._addressHash = function(addresses) {
|
||||||
|
var all = addresses.join();
|
||||||
|
return Bitcore.crypto.Hash.ripemd160(new Buffer(all)).toString('hex');
|
||||||
|
};
|
||||||
|
|
||||||
|
Storage.prototype.checkAndUseBalanceCache = function(walletId, addresses, duration, cb) {
|
||||||
|
var self = this;
|
||||||
|
var key = self._addressHash(addresses);
|
||||||
|
var now = Date.now();
|
||||||
|
|
||||||
|
|
||||||
|
self.db.collection(collections.CACHE).findOne({
|
||||||
|
walletId: walletId || key,
|
||||||
|
type: 'balanceCache',
|
||||||
|
key: key,
|
||||||
|
}, function(err, ret) {
|
||||||
|
if (err) return cb(err);
|
||||||
|
if (!ret) return cb();
|
||||||
|
|
||||||
|
var validFor = ret.ts + duration * 1000 - now;
|
||||||
|
|
||||||
|
if (validFor > 0) {
|
||||||
|
log.debug('','Using Balance Cache valid for %d ms more', validFor);
|
||||||
|
cb(null, ret.result);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
cb();
|
||||||
|
|
||||||
|
log.debug('','Balance cache expired, deleting');
|
||||||
|
self.db.collection(collections.CACHE).remove({
|
||||||
|
walletId: walletId,
|
||||||
|
type: 'balanceCache',
|
||||||
|
key: key,
|
||||||
|
}, {}, function() {});
|
||||||
|
|
||||||
|
return false;
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
Storage.prototype.storeBalanceCache = function (walletId, addresses, balance, cb) {
|
||||||
|
var key = this._addressHash(addresses);
|
||||||
|
var now = Date.now();
|
||||||
|
|
||||||
|
this.db.collection(collections.CACHE).update({
|
||||||
|
walletId: walletId || key,
|
||||||
|
type: 'balanceCache',
|
||||||
|
key: key,
|
||||||
|
}, {
|
||||||
|
"$set":
|
||||||
|
{
|
||||||
|
ts: now,
|
||||||
|
result: balance,
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
w: 1,
|
||||||
|
upsert: true,
|
||||||
|
}, cb);
|
||||||
|
};
|
||||||
|
|
||||||
Storage.collections = collections;
|
Storage.collections = collections;
|
||||||
module.exports = Storage;
|
module.exports = Storage;
|
||||||
|
|
|
@ -2617,6 +2617,109 @@ describe('Wallet service', function() {
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
|
describe('#getBalance fast cache', function() {
|
||||||
|
var server, wallet, clock;
|
||||||
|
var _old = Defaults.BALANCE_CACHE_ADDRESS_THRESOLD;
|
||||||
|
beforeEach(function(done) {
|
||||||
|
clock = sinon.useFakeTimers(Date.now(), 'Date');
|
||||||
|
Defaults.BALANCE_CACHE_ADDRESS_THRESOLD = 0;
|
||||||
|
|
||||||
|
helpers.createAndJoinWallet(1, 1, function(s, w) {
|
||||||
|
server = s;
|
||||||
|
wallet = w;
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
afterEach(function() {
|
||||||
|
clock.restore();
|
||||||
|
Defaults.BALANCE_CACHE_ADDRESS_THRESOLD = _old;
|
||||||
|
});
|
||||||
|
|
||||||
|
function checkBalance(balance) {
|
||||||
|
should.exist(balance);
|
||||||
|
balance.totalAmount.should.equal(helpers.toSatoshi(6));
|
||||||
|
should.exist(balance.byAddress);
|
||||||
|
balance.byAddress.length.should.equal(2);
|
||||||
|
balance.byAddress[1].amount.should.equal(helpers.toSatoshi(2));
|
||||||
|
};
|
||||||
|
|
||||||
|
it('should get balance from insight and store cache', function(done) {
|
||||||
|
helpers.stubUtxos(server, wallet, [1, 'u2', 3], function() {
|
||||||
|
server.getBalance({
|
||||||
|
twoStep: false
|
||||||
|
}, function(err, balance, cacheUsed) {
|
||||||
|
should.not.exist(err);
|
||||||
|
should.not.exist(cacheUsed);
|
||||||
|
checkBalance(balance);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should get balance from cache', function(done) {
|
||||||
|
helpers.stubUtxos(server, wallet, [1, 'u2', 3], function() {
|
||||||
|
server.getBalance({
|
||||||
|
twoStep: false
|
||||||
|
}, function(err, balance, cacheUsed) {
|
||||||
|
should.not.exist(err);
|
||||||
|
should.not.exist(cacheUsed);
|
||||||
|
server.getBalance({
|
||||||
|
twoStep: false
|
||||||
|
}, function(err, balance, cacheUsed) {
|
||||||
|
should.not.exist(err);
|
||||||
|
cacheUsed.should.equal(true);
|
||||||
|
checkBalance(balance);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
it('should not get balance from cache, after X secs, on a direct hit', function(done) {
|
||||||
|
helpers.stubUtxos(server, wallet, [1, 'u2', 3], function() {
|
||||||
|
server.getBalance({
|
||||||
|
twoStep: false
|
||||||
|
}, function(err, balance, cacheUsed) {
|
||||||
|
should.not.exist(err);
|
||||||
|
should.not.exist(cacheUsed);
|
||||||
|
clock.tick(( Defaults.BALANCE_CACHE_DIRECT_DURATION +1) * 1000);
|
||||||
|
server.getBalance({
|
||||||
|
twoStep: false
|
||||||
|
}, function(err, balance, cacheUsed) {
|
||||||
|
should.not.exist(err);
|
||||||
|
should.not.exist(cacheUsed);
|
||||||
|
checkBalance(balance);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
it('should not get balance from cache, after X secs, on a twostep hit', function(done) {
|
||||||
|
helpers.stubUtxos(server, wallet, [1, 'u2', 3], function() {
|
||||||
|
server.getBalance({
|
||||||
|
twoStep: false
|
||||||
|
}, function(err, balance, cacheUsed) {
|
||||||
|
should.not.exist(err);
|
||||||
|
should.not.exist(cacheUsed);
|
||||||
|
clock.tick(( Defaults.BALANCE_CACHE_DIRECT_DURATION - 1) * 1000);
|
||||||
|
server.getBalance({
|
||||||
|
twoStep: true
|
||||||
|
}, function(err, balance, cacheUsed) {
|
||||||
|
should.not.exist(err);
|
||||||
|
should.not.exist(cacheUsed);
|
||||||
|
checkBalance(balance);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
describe('#getFeeLevels', function() {
|
describe('#getFeeLevels', function() {
|
||||||
var server, wallet, levels;
|
var server, wallet, levels;
|
||||||
before(function() {
|
before(function() {
|
||||||
|
@ -7495,7 +7598,7 @@ describe('Wallet service', function() {
|
||||||
function(server, wallet, next) {
|
function(server, wallet, next) {
|
||||||
server.scan({}, function(err) {
|
server.scan({}, function(err) {
|
||||||
should.not.exist(err);
|
should.not.exist(err);
|
||||||
server.getBalance(wallet.id, function(err, balance) {
|
server.getBalance({}, function(err, balance) {
|
||||||
balance.totalAmount.should.equal(helpers.toSatoshi(6));
|
balance.totalAmount.should.equal(helpers.toSatoshi(6));
|
||||||
next();
|
next();
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue