Merge pull request #533 from isocolsky/ref/stats

Improve stats performance
This commit is contained in:
Matias Alejo Garcia 2016-06-23 08:36:14 -03:00 committed by GitHub
commit 21afe4d9b7
1 changed files with 220 additions and 69 deletions

View File

@ -14,14 +14,17 @@ var moment = require('moment');
var config = require('../config');
var storage = require('./storage');
var INITIAL_DATE = '2015-01-01';
function Stats(opts) {
opts = opts || {};
this.network = opts.network || 'livenet';
this.from = moment(opts.from || '2015-01-01');
this.from = moment(opts.from || INITIAL_DATE);
this.to = moment(opts.to);
this.fromTs = Math.floor(this.from.startOf('day').valueOf() / 1000);
this.toTs = Math.floor(this.to.endOf('day').valueOf() / 1000);
this.fromTs = this.from.startOf('day').valueOf();
this.toTs = this.to.endOf('day').valueOf();
};
Stats.prototype.run = function(cb) {
@ -62,90 +65,238 @@ Stats.prototype._getStats = function(cb) {
});
};
Stats.prototype._countBy = function(data, key) {
return _.map(_.groupBy(data, key), function(v, k) {
var item = {};
item[key] = k;
item['count'] = v.length;
return item;
});
};
Stats.prototype._sumBy = function(data, key, attr) {
return _.map(_.groupBy(data, key), function(v, k) {
var item = {};
item[key] = k;
item[attr] = _.reduce(v, function(memo, x) {
return memo + x[attr];
}, 0);
return item;
});
};
Stats.prototype._getNewWallets = function(cb) {
var self = this;
self.db.collection(storage.collections.WALLETS)
.find({
network: self.network,
createdOn: {
$gte: self.fromTs,
$lte: self.toTs,
},
})
.toArray(function(err, wallets) {
if (err) return cb(err);
var data = _.map(wallets, function(wallet) {
return {
day: moment(wallet.createdOn * 1000).format('YYYYMMDD'),
type: (wallet.m == 1 && wallet.n == 1) ? 'personal' : 'shared',
config: wallet.m + '-of-' + wallet.n,
};
function getLastDate(cb) {
self.db.collection('stats_wallets')
.find({})
.sort({
'_id.day': -1
})
.limit(1)
.toArray(function(err, lastRecord) {
if (_.isEmpty(lastRecord)) return cb(null, moment(INITIAL_DATE));
return cb(null, moment(lastRecord[0]._id.day));
});
};
var stats = {
byDay: self._countBy(data, 'day'),
byConfig: self._countBy(data, 'config'),
function updateStats(from, cb) {
var to = moment().subtract(1, 'day').endOf('day');
var map = function() {
var day = new Date(this.createdOn * 1000);
day.setHours(0);
day.setMinutes(0);
day.setSeconds(0);
var key = {
day: +day,
network: this.network,
};
stats.byTypeThenDay = _.groupBy(data, 'type');
_.each(stats.byTypeThenDay, function(v, k) {
stats.byTypeThenDay[k] = self._countBy(v, 'day');
var value = {
count: 1
};
emit(key, value);
};
var reduce = function(k, v) {
var count = 0;
for (var i = 0; i < v.length; i++) {
count += v[i].count;
}
return {
count: count,
};
};
var opts = {
query: {
createdOn: {
$gt: from.unix(),
$lte: to.unix(),
},
},
out: {
merge: 'stats_wallets',
}
};
self.db.collection(storage.collections.WALLETS)
.mapReduce(map, reduce, opts, function(err, collection, stats) {
return cb(err);
});
};
return cb(null, stats);
function queryStats(cb) {
self.db.collection('stats_wallets')
.find({
'_id.network': self.network,
'_id.day': {
$gte: self.fromTs,
$lte: self.toTs,
},
})
.sort({
'_id.day': 1
})
.toArray(function(err, results) {
if (err) return cb(err);
var stats = {};
stats.byDay = _.map(results, function(record) {
var day = moment(record._id.day).format('YYYYMMDD');
return {
day: day,
count: record.value.count,
};
});
return cb(null, stats);
});
};
async.series([
function(next) {
getLastDate(function(err, lastDate) {
if (err) return next(err);
lastDate = lastDate.startOf('day');
var yesterday = moment().subtract(1, 'day').startOf('day');
if (lastDate.isBefore(yesterday)) {
// Needs update
return updateStats(lastDate, next);
}
next();
});
},
function(next) {
queryStats(next);
},
],
function(err, res) {
if (err) {
log.error(err);
}
return cb(err, res[1]);
});
};
Stats.prototype._getTxProposals = function(cb) {
var self = this;
self.db.collection(storage.collections.TXS)
.find({
network: self.network,
status: 'broadcasted',
createdOn: {
$gte: self.fromTs,
$lte: self.toTs,
},
})
.toArray(function(err, txps) {
if (err) return cb(err);
var data = _.map(txps, function(txp) {
return {
day: moment(txp.createdOn * 1000).format('YYYYMMDD'),
amount: txp.amount,
};
function getLastDate(cb) {
self.db.collection('stats_txps')
.find({})
.sort({
'_id.day': -1
})
.limit(1)
.toArray(function(err, lastRecord) {
if (_.isEmpty(lastRecord)) return cb(null, moment(INITIAL_DATE));
return cb(null, moment(lastRecord[0]._id.day));
});
};
var stats = {
nbByDay: self._countBy(data, 'day'),
amountByDay: self._sumBy(data, 'day', 'amount'),
function updateStats(from, cb) {
var to = moment().subtract(1, 'day').endOf('day');
var map = function() {
var day = new Date(this.broadcastedOn * 1000);
day.setHours(0);
day.setMinutes(0);
day.setSeconds(0);
var key = {
day: +day,
network: this.network,
};
var value = {
count: 1,
amount: this.amount
};
emit(key, value);
};
var reduce = function(k, v) {
var count = 0,
amount = 0;
for (var i = 0; i < v.length; i++) {
count += v[i].count;
amount += v[i].amount;
}
return {
count: count,
amount: amount,
};
};
var opts = {
query: {
status: 'broadcasted',
broadcastedOn: {
$gt: from.unix(),
$lte: to.unix(),
},
},
out: {
merge: 'stats_txps',
}
};
self.db.collection(storage.collections.TXS)
.mapReduce(map, reduce, opts, function(err, collection, stats) {
return cb(err);
});
};
return cb(null, stats);
function queryStats(cb) {
self.db.collection('stats_txps')
.find({
'_id.network': self.network,
'_id.day': {
$gte: self.fromTs,
$lte: self.toTs,
},
})
.sort({
'_id.day': 1
})
.toArray(function(err, results) {
if (err) return cb(err);
var stats = {
nbByDay: [],
amountByDay: []
};
_.each(results, function(record) {
var day = moment(record._id.day).format('YYYYMMDD');
stats.nbByDay.push({
day: day,
count: record.value.count,
});
stats.amountByDay.push({
day: day,
amount: record.value.amount,
});
});
return cb(null, stats);
});
};
async.series([
function(next) {
getLastDate(function(err, lastDate) {
if (err) return next(err);
lastDate = lastDate.startOf('day');
var yesterday = moment().subtract(1, 'day').startOf('day');
if (lastDate.isBefore(yesterday)) {
// Needs update
return updateStats(lastDate, next);
}
next();
});
},
function(next) {
queryStats(next);
},
],
function(err, res) {
if (err) {
log.error(err);
}
return cb(err, res[1]);
});
};