From a743f1949ade12de6e8a4687b9cf9544599c55e5 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Tue, 21 Jun 2016 16:59:07 -0300 Subject: [PATCH 1/2] refactor using mapReduce --- lib/stats.js | 277 ++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 208 insertions(+), 69 deletions(-) diff --git a/lib/stats.js b/lib/stats.js index f96063b..9b7295e 100644 --- a/lib/stats.js +++ b/lib/stats.js @@ -14,14 +14,17 @@ var moment = require('moment'); var config = require('../config'); var storage = require('./storage'); + +var INITIAL_DATE = '2015-01-01'; + function Stats(opts) { opts = opts || {}; this.network = opts.network || 'livenet'; - this.from = moment(opts.from || '2015-01-01'); + this.from = moment(opts.from || INITIAL_DATE); this.to = moment(opts.to); - this.fromTs = Math.floor(this.from.startOf('day').valueOf() / 1000); - this.toTs = Math.floor(this.to.endOf('day').valueOf() / 1000); + this.fromTs = this.from.startOf('day').valueOf(); + this.toTs = this.to.endOf('day').valueOf(); }; Stats.prototype.run = function(cb) { @@ -62,90 +65,226 @@ Stats.prototype._getStats = function(cb) { }); }; -Stats.prototype._countBy = function(data, key) { - return _.map(_.groupBy(data, key), function(v, k) { - var item = {}; - item[key] = k; - item['count'] = v.length; - return item; - }); -}; - -Stats.prototype._sumBy = function(data, key, attr) { - return _.map(_.groupBy(data, key), function(v, k) { - var item = {}; - item[key] = k; - item[attr] = _.reduce(v, function(memo, x) { - return memo + x[attr]; - }, 0); - return item; - }); -}; - Stats.prototype._getNewWallets = function(cb) { var self = this; - self.db.collection(storage.collections.WALLETS) - .find({ - network: self.network, - createdOn: { - $gte: self.fromTs, - $lte: self.toTs, - }, - }) - .toArray(function(err, wallets) { - if (err) return cb(err); - - var data = _.map(wallets, function(wallet) { - return { - day: moment(wallet.createdOn * 1000).format('YYYYMMDD'), - type: (wallet.m == 1 && wallet.n == 1) ? 'personal' : 'shared', - config: wallet.m + '-of-' + wallet.n, - }; + function getLastDate(cb) { + self.db.collection('stats_wallets') + .find({}) + .sort({ + '_id.day': -1 + }) + .limit(1) + .toArray(function(err, lastRecord) { + if (_.isEmpty(lastRecord)) return cb(null, moment(INITIAL_DATE)); + return cb(null, moment(lastRecord[0]._id.day)); }); + }; - var stats = { - byDay: self._countBy(data, 'day'), - byConfig: self._countBy(data, 'config'), + function updateStats(from, cb) { + var map = function() { + var day = new Date(this.createdOn * 1000); + day.setHours(0); + day.setMinutes(0); + day.setSeconds(0); + var key = { + day: +day, + network: this.network, }; - - stats.byTypeThenDay = _.groupBy(data, 'type'); - _.each(stats.byTypeThenDay, function(v, k) { - stats.byTypeThenDay[k] = self._countBy(v, 'day'); + var value = 1; + emit(key, value); + }; + var reduce = function(k, v) { + return v.length; + }; + var opts = { + query: { + createdOn: { + $gt: from.unix(), + }, + }, + out: { + merge: 'stats_wallets', + } + }; + self.db.collection(storage.collections.WALLETS) + .mapReduce(map, reduce, opts, function(err, collection, stats) { + return cb(err); }); + }; - return cb(null, stats); + function queryStats(cb) { + self.db.collection('stats_wallets') + .find({ + '_id.network': self.network, + '_id.day': { + $gte: self.fromTs, + $lte: self.toTs, + }, + }) + .sort({ + '_id.day': 1 + }) + .toArray(function(err, results) { + if (err) return cb(err); + var stats = _.map(results, function(record) { + var day = moment(record._id.day).format('YYYYMMDD'); + return { + day: day, + count: record.value + }; + }); + return cb(null, stats); + }); + }; + + async.series([ + + function(next) { + getLastDate(function(err, lastDate) { + if (err) return next(err); + + lastDate = lastDate.startOf('day'); + var yesterday = moment().subtract(1, 'day').startOf('day'); + if (lastDate.isBefore(yesterday)) { + // Needs update + return updateStats(lastDate, next); + } + next(); + }); + }, + function(next) { + queryStats(function(err, result) { + if (err) return next(err); + var stats = { + byDay: result + }; + return next(null, stats); + }); + }, + ], + function(err, res) { + return cb(err, res[1]); }); }; Stats.prototype._getTxProposals = function(cb) { var self = this; - self.db.collection(storage.collections.TXS) - .find({ - network: self.network, - status: 'broadcasted', - createdOn: { - $gte: self.fromTs, - $lte: self.toTs, - }, - }) - .toArray(function(err, txps) { - if (err) return cb(err); - - var data = _.map(txps, function(txp) { - return { - day: moment(txp.createdOn * 1000).format('YYYYMMDD'), - amount: txp.amount, - }; + function getLastDate(cb) { + self.db.collection('stats_txps') + .find({}) + .sort({ + '_id.day': -1 + }) + .limit(1) + .toArray(function(err, lastRecord) { + if (_.isEmpty(lastRecord)) return cb(null, moment(INITIAL_DATE)); + return cb(null, moment(lastRecord[0]._id.day)); }); + }; - var stats = { - nbByDay: self._countBy(data, 'day'), - amountByDay: self._sumBy(data, 'day', 'amount'), + function updateStats(from, cb) { + var map = function() { + var day = new Date(this.broadcastedOn * 1000); + day.setHours(0); + day.setMinutes(0); + day.setSeconds(0); + var key = { + day: +day, + network: this.network, }; + var value = { + count: 1, + amount: this.amount + }; + emit(key, value); + }; + var reduce = function(k, v) { + var amount = 0; + for (var i = 0; i < v.length; i++) { + amount += v[i].amount; + } + return { + count: v.length, + amount: amount, + }; + }; + var opts = { + query: { + status: 'broadcasted', + broadcastedOn: { + $gt: from.unix(), + }, + }, + out: { + merge: 'stats_txps', + } + }; + self.db.collection(storage.collections.TXS) + .mapReduce(map, reduce, opts, function(err, collection, stats) { + return cb(err); + }); + }; - return cb(null, stats); + function queryStats(cb) { + self.db.collection('stats_txps') + .find({ + '_id.network': self.network, + '_id.day': { + $gte: self.fromTs, + $lte: self.toTs, + }, + }) + .sort({ + '_id.day': 1 + }) + .toArray(function(err, results) { + if (err) return cb(err); + + var stats = { + nbByDay: [], + amountByDay: [] + }; + _.each(results, function(record) { + var day = moment(record._id.day).format('YYYYMMDD'); + stats.nbByDay.push({ + day: day, + count: record.value.count, + }); + stats.amountByDay.push({ + day: day, + amount: record.value.amount, + }); + }); + return cb(null, stats); + }); + }; + + async.series([ + + function(next) { + getLastDate(function(err, lastDate) { + if (err) return next(err); + + lastDate = lastDate.startOf('day'); + var yesterday = moment().subtract(1, 'day').startOf('day'); + if (lastDate.isBefore(yesterday)) { + // Needs update + return updateStats(lastDate, next); + } + next(); + }); + }, + function(next) { + queryStats(function(err, result) { + if (err) return next(err); + return next(null, result); + }); + }, + ], + function(err, res) { + return cb(err, res[1]); }); }; From dbea849e7627aabcd2bb15574285693f85b59f18 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 22 Jun 2016 16:46:57 -0300 Subject: [PATCH 2/2] fix last day issues + log errors --- lib/stats.js | 46 +++++++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/lib/stats.js b/lib/stats.js index 9b7295e..bc57804 100644 --- a/lib/stats.js +++ b/lib/stats.js @@ -82,6 +82,7 @@ Stats.prototype._getNewWallets = function(cb) { }; function updateStats(from, cb) { + var to = moment().subtract(1, 'day').endOf('day'); var map = function() { var day = new Date(this.createdOn * 1000); day.setHours(0); @@ -91,16 +92,25 @@ Stats.prototype._getNewWallets = function(cb) { day: +day, network: this.network, }; - var value = 1; + var value = { + count: 1 + }; emit(key, value); }; var reduce = function(k, v) { - return v.length; + var count = 0; + for (var i = 0; i < v.length; i++) { + count += v[i].count; + } + return { + count: count, + }; }; var opts = { query: { createdOn: { $gt: from.unix(), + $lte: to.unix(), }, }, out: { @@ -127,11 +137,12 @@ Stats.prototype._getNewWallets = function(cb) { }) .toArray(function(err, results) { if (err) return cb(err); - var stats = _.map(results, function(record) { + var stats = {}; + stats.byDay = _.map(results, function(record) { var day = moment(record._id.day).format('YYYYMMDD'); return { day: day, - count: record.value + count: record.value.count, }; }); return cb(null, stats); @@ -154,16 +165,13 @@ Stats.prototype._getNewWallets = function(cb) { }); }, function(next) { - queryStats(function(err, result) { - if (err) return next(err); - var stats = { - byDay: result - }; - return next(null, stats); - }); + queryStats(next); }, ], function(err, res) { + if (err) { + log.error(err); + } return cb(err, res[1]); }); }; @@ -185,6 +193,7 @@ Stats.prototype._getTxProposals = function(cb) { }; function updateStats(from, cb) { + var to = moment().subtract(1, 'day').endOf('day'); var map = function() { var day = new Date(this.broadcastedOn * 1000); day.setHours(0); @@ -201,12 +210,14 @@ Stats.prototype._getTxProposals = function(cb) { emit(key, value); }; var reduce = function(k, v) { - var amount = 0; + var count = 0, + amount = 0; for (var i = 0; i < v.length; i++) { + count += v[i].count; amount += v[i].amount; } return { - count: v.length, + count: count, amount: amount, }; }; @@ -215,6 +226,7 @@ Stats.prototype._getTxProposals = function(cb) { status: 'broadcasted', broadcastedOn: { $gt: from.unix(), + $lte: to.unix(), }, }, out: { @@ -277,13 +289,13 @@ Stats.prototype._getTxProposals = function(cb) { }); }, function(next) { - queryStats(function(err, result) { - if (err) return next(err); - return next(null, result); - }); + queryStats(next); }, ], function(err, res) { + if (err) { + log.error(err); + } return cb(err, res[1]); }); };