refactor statistics to filter at the db level
This commit is contained in:
parent
da3df6449b
commit
544f916f4f
262
lib/stats.js
262
lib/stats.js
|
@ -3,155 +3,143 @@
|
|||
'use strict';
|
||||
|
||||
var _ = require('lodash');
|
||||
var $ = require('preconditions').singleton();
|
||||
var async = require('async');
|
||||
var log = require('npmlog');
|
||||
log.debug = log.verbose;
|
||||
log.disableColor();
|
||||
var mongodb = require('mongodb');
|
||||
var moment = require('moment');
|
||||
var async = require('async');
|
||||
|
||||
var config = require('../config');
|
||||
var storage = require('./storage');
|
||||
|
||||
function Stats(opts) {
|
||||
opts = opts || {};
|
||||
|
||||
var c = config.storageOpts.mongoDb;
|
||||
var url = c.uri;
|
||||
var startDate = moment();
|
||||
var endDate = moment();
|
||||
|
||||
var stats = {};
|
||||
var wallets = {};
|
||||
var bwsStats = {};
|
||||
|
||||
bwsStats.cleanUp = function() {
|
||||
stats = {
|
||||
'livenet': {},
|
||||
'testnet': {}
|
||||
};
|
||||
this.network = opts.network || 'livenet';
|
||||
this.from = moment(opts.from || '2015-01-01');
|
||||
this.to = moment(opts.to);
|
||||
this.fromTs = Math.floor(this.from.startOf('day').valueOf() / 1000);
|
||||
this.toTs = Math.floor(this.to.endOf('day').valueOf() / 1000);
|
||||
};
|
||||
|
||||
Stats.prototype.run = function(cb) {
|
||||
var self = this;
|
||||
|
||||
bwsStats.AddingWalletToCache = function(data) {
|
||||
if (!data) return;
|
||||
wallets[data.id] = data.network;
|
||||
};
|
||||
|
||||
bwsStats.TotalNewWallets = function(data) {
|
||||
if (!data) return;
|
||||
var day = moment(data.createdOn * 1000).format('YYYYMMDD');
|
||||
if (!stats[data.network][day]) {
|
||||
stats[data.network][day] = {
|
||||
totalTx: 0,
|
||||
totalAmount: 0,
|
||||
totalNewWallets: 0
|
||||
};
|
||||
}
|
||||
stats[data.network][day].totalNewWallets++;
|
||||
};
|
||||
|
||||
bwsStats.TotalTxps = function(data) {
|
||||
if (!data) return;
|
||||
var day = moment(data.createdOn * 1000).format('YYYYMMDD');
|
||||
var network = wallets[data.walletId];
|
||||
if (!stats[network][day]) {
|
||||
stats[network][day] = {
|
||||
totalTx: 0,
|
||||
totalAmount: 0,
|
||||
totalNewWallets: 0
|
||||
};
|
||||
}
|
||||
stats[network][day].totalTx++;
|
||||
stats[network][day].totalAmount += data.amount;
|
||||
};
|
||||
|
||||
bwsStats.ProcessData = function(DB, cb) {
|
||||
bwsStats.ProccesWallets(DB, function() {
|
||||
bwsStats.ProccesNewWallets(DB, function() {
|
||||
bwsStats.ProccesTxs(DB, function() {
|
||||
DB.close();
|
||||
cb();
|
||||
});
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
bwsStats.ProccesWallets = function(DB, cb) {
|
||||
var collection = DB.collection('wallets');
|
||||
collection.find({}).toArray(function(err, items) {
|
||||
var uri = config.storageOpts.mongoDb.uri;
|
||||
mongodb.MongoClient.connect(uri, function(err, db) {
|
||||
if (err) {
|
||||
console.log('Error.', err);
|
||||
return cb(err);
|
||||
}
|
||||
|
||||
items.forEach(function(it) {
|
||||
bwsStats.AddingWalletToCache(it);
|
||||
});
|
||||
cb();
|
||||
});
|
||||
};
|
||||
|
||||
bwsStats.ProccesNewWallets = function(DB, cb) {
|
||||
var collection = DB.collection('wallets');
|
||||
var start = Math.floor(startDate.startOf('day').valueOf() / 1000);
|
||||
var end = Math.floor(endDate.endOf('day').valueOf() / 1000);
|
||||
|
||||
collection.find({
|
||||
createdOn: {
|
||||
$gt: start,
|
||||
$lt: end
|
||||
}
|
||||
}).toArray(function(err, items) {
|
||||
if (err) {
|
||||
console.log('Error.', err);
|
||||
return cb(err);
|
||||
}
|
||||
items.forEach(function(it) {
|
||||
bwsStats.TotalNewWallets(it);
|
||||
});
|
||||
cb();
|
||||
});
|
||||
};
|
||||
|
||||
bwsStats.ProccesTxs = function(DB, cb) {
|
||||
var collection = DB.collection('txs');
|
||||
var start = Math.floor(startDate.startOf('day').valueOf() / 1000);
|
||||
var end = Math.floor(endDate.endOf('day').valueOf() / 1000);
|
||||
|
||||
collection.find({
|
||||
createdOn: {
|
||||
$gt: start,
|
||||
$lt: end
|
||||
}
|
||||
}).toArray(
|
||||
function(err, items) {
|
||||
if (err) {
|
||||
console.log('Error.', err);
|
||||
return cb(err);
|
||||
} else {
|
||||
items.forEach(function(it) {
|
||||
bwsStats.TotalTxps(it);
|
||||
});
|
||||
}
|
||||
cb();
|
||||
});
|
||||
};
|
||||
|
||||
bwsStats.getStats = function(opts, cb) {
|
||||
if (opts) {
|
||||
startDate = moment(opts.from);
|
||||
endDate = moment(opts.to);
|
||||
}
|
||||
bwsStats.cleanUp();
|
||||
|
||||
mongodb.MongoClient.connect(url, function(err, db) {
|
||||
if (err) {
|
||||
console.log('Unable to connect to the mongoDB server. Error:', err);
|
||||
log.error('Unable to connect to the mongoDB', err);
|
||||
return cb(err, null);
|
||||
}
|
||||
console.log('Connection established to ', url);
|
||||
bwsStats.ProcessData(db, function(err) {
|
||||
if (err) {
|
||||
console.log('Error.', err);
|
||||
return cb(err, null);
|
||||
}
|
||||
cb(null, stats);
|
||||
log.info('Connection established to ' + uri);
|
||||
self.db = db;
|
||||
self._getStats(function(err, stats) {
|
||||
if (err) return cb(err);
|
||||
return cb(null, stats);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
module.exports = bwsStats;
|
||||
Stats.prototype._getStats = function(cb) {
|
||||
var self = this;
|
||||
var result = {};
|
||||
async.parallel([
|
||||
|
||||
function(next) {
|
||||
self._getNewWallets(next);
|
||||
},
|
||||
function(next) {
|
||||
self._getTxProposals(next);
|
||||
},
|
||||
], function(err, results) {
|
||||
if (err) return cb(err);
|
||||
|
||||
result.newWallets = results[0];
|
||||
return cb(null, result);
|
||||
});
|
||||
};
|
||||
|
||||
Stats.prototype._countBy = function(data, key) {
|
||||
return _.map(_.groupBy(data, key), function(v, k) {
|
||||
var item = {};
|
||||
item[key] = k;
|
||||
item['count'] = v.length;
|
||||
return item;
|
||||
});
|
||||
};
|
||||
|
||||
Stats.prototype._sumBy = function(data, key, attr) {
|
||||
return _.map(_.groupBy(data, key), function(v, k) {
|
||||
var item = {};
|
||||
item[key] = k;
|
||||
item[attr] = _.reduce(v, function(memo, x) {
|
||||
return memo + x[attr];
|
||||
}, 0);
|
||||
return item;
|
||||
});
|
||||
};
|
||||
|
||||
Stats.prototype._getNewWallets = function(cb) {
|
||||
var self = this;
|
||||
|
||||
self.db.collection(storage.collections.WALLETS)
|
||||
.find({
|
||||
network: self.network,
|
||||
createdOn: {
|
||||
$gte: self.fromTs,
|
||||
$lte: self.toTs,
|
||||
},
|
||||
})
|
||||
.toArray(function(err, wallets) {
|
||||
if (err) return cb(err);
|
||||
|
||||
var data = _.map(wallets, function(wallet) {
|
||||
return {
|
||||
day: moment(wallet.createdOn * 1000).format('YYYYMMDD'),
|
||||
type: wallet.m + '-of-' + wallet.n,
|
||||
};
|
||||
});
|
||||
|
||||
var stats = {
|
||||
byDay: self._countBy(data, 'day'),
|
||||
byType: self._countBy(data, 'type'),
|
||||
};
|
||||
|
||||
return cb(null, stats);
|
||||
});
|
||||
};
|
||||
|
||||
Stats.prototype._getTxProposals = function(cb) {
|
||||
var self = this;
|
||||
|
||||
self.db.collection(storage.collections.TXS)
|
||||
.find({
|
||||
network: self.network,
|
||||
status: 'broadcasted',
|
||||
createdOn: {
|
||||
$gte: self.fromTs,
|
||||
$lte: self.toTs,
|
||||
},
|
||||
})
|
||||
.toArray(function(err, txps) {
|
||||
if (err) return cb(err);
|
||||
|
||||
var data = _.map(txps, function(txp) {
|
||||
return {
|
||||
day: moment(txp.createdOn * 1000).format('YYYYMMDD'),
|
||||
amount: txp.amount,
|
||||
};
|
||||
});
|
||||
|
||||
var stats = {
|
||||
nbByDay: self._countBy(data, 'day'),
|
||||
amountByDay: self._sumBy(data, 'day', 'amount'),
|
||||
};
|
||||
|
||||
return cb(null, stats);
|
||||
});
|
||||
};
|
||||
|
||||
module.exports = Stats;
|
||||
|
|
|
@ -426,4 +426,5 @@ Storage.prototype._dump = function(cb, fn) {
|
|||
});
|
||||
};
|
||||
|
||||
Storage.collections = collections;
|
||||
module.exports = Storage;
|
||||
|
|
Loading…
Reference in New Issue