Merge pull request #155 from maraoz/add/rate-limits

Add rate limits
This commit is contained in:
Gustavo Maximiliano Cortez 2014-08-22 11:59:34 -03:00
commit 7398a07c10
18 changed files with 253 additions and 157 deletions

View File

@ -88,7 +88,10 @@ INSIGHT_NETWORK [= 'livenet' | 'testnet']
INSIGHT_DB # Path where to store insight's internal DB. (defaults to $HOME/.insight)
INSIGHT_SAFE_CONFIRMATIONS=6 # Nr. of confirmation needed to start caching transaction information
INSIGHT_IGNORE_CACHE # True to ignore cache of spents in transaction, with more than INSIGHT_SAFE_CONFIRMATIONS confirmations. This is useful for tracking double spents for old transactions.
ENABLE_MESSAGE_BROKER # if "true" will enable message broker module
ENABLE_MAILBOX # if "true" will enable mailbox plugin
ENABLE_RATELIMITER # if "true" will enable the ratelimiter plugin
LOGGER_LEVEL # defaults to 'info', can be 'debug','verbose','error', etc.
ENABLE_HTTPS # if "true" it will server using SSL/HTTPS
```

View File

@ -3,76 +3,28 @@
// server-side socket behaviour
var ios = null; // io is already taken in express
var util = require('bitcore').util;
var mdb = require('../../lib/MessageDb').default();
var microtime = require('microtime');
var enableMessageBroker;
var logger = require('../../lib/logger').logger;
var verbose = false;
var log = function() {
if (verbose) {
console.log(arguments);
}
}
module.exports.init = function(io_ext, config) {
enableMessageBroker = config ? config.enableMessageBroker : false;
module.exports.init = function(io_ext) {
ios = io_ext;
if (ios) {
// when a new socket connects
ios.sockets.on('connection', function(socket) {
log('New connection from ' + socket.id);
logger.verbose('New connection from ' + socket.id);
// when it subscribes, make it join the according room
socket.on('subscribe', function(topic) {
if (socket.rooms.length === 1) {
log('subscribe to ' + topic);
socket.join(topic);
}
logger.debug('subscribe to ' + topic);
socket.join(topic);
});
if (enableMessageBroker) {
// when it requests sync, send him all pending messages
socket.on('sync', function(ts) {
log('Sync requested by ' + socket.id);
log(' from timestamp '+ts);
var rooms = socket.rooms;
if (rooms.length !== 2) {
socket.emit('insight-error', 'Must subscribe with public key before syncing');
return;
}
var to = rooms[1];
var upper_ts = Math.round(microtime.now());
log(' to timestamp '+upper_ts);
mdb.getMessages(to, ts, upper_ts, function(err, messages) {
if (err) {
throw new Error('Couldn\'t get messages on sync request: ' + err);
}
log('\tFound ' + messages.length + ' message' + (messages.length !== 1 ? 's' : ''));
for (var i = 0; i < messages.length; i++) {
broadcastMessage(messages[i], socket);
}
});
});
// disconnect handler
socket.on('disconnect', function() {
logger.verbose('disconnected ' + socket.id);
});
// when it sends a message, add it to db
socket.on('message', function(m) {
log('Message sent from ' + m.pubkey + ' to ' + m.to);
mdb.addMessage(m, function(err) {
if (err) {
throw new Error('Couldn\'t add message to database: ' + err);
}
});
});
// disconnect handler
socket.on('disconnect', function() {
log('disconnected ' + socket.id);
});
}
});
if (enableMessageBroker)
mdb.on('message', broadcastMessage);
}
return ios;
};
var simpleTx = function(tx) {
@ -118,12 +70,3 @@ module.exports.broadcastSyncInfo = function(historicSync) {
if (ios)
ios.sockets.in('sync').emit('status', historicSync);
};
var broadcastMessage = module.exports.broadcastMessage = function(message, socket) {
if (ios) {
var s = socket || ios.sockets.in(message.to);
log('sending message to ' + message.to);
s.emit('message', message);
}
}

View File

@ -76,7 +76,10 @@ var bitcoindConf = {
disableAgent: true
};
var enableMessageBroker = process.env.ENABLE_MESSAGE_BROKER === 'true';
var enableMailbox = process.env.ENABLE_MAILBOX === 'true';
var enableRatelimiter = process.env.ENABLE_RATELIMITER === 'true';
var loggerLevel = process.env.LOGGER_LEVEL || 'info';
var enableHTTPS = process.env.ENABLE_HTTPS === 'true';
if (!fs.existsSync(db)) {
var err = fs.mkdirSync(db);
@ -89,7 +92,10 @@ if (!fs.existsSync(db)) {
}
module.exports = {
enableMessageBroker: enableMessageBroker,
enableMailbox: enableMailbox,
enableRatelimiter: enableRatelimiter,
loggerLevel: loggerLevel,
enableHTTPS: enableHTTPS,
version: version,
root: rootPath,
publicPath: process.env.INSIGHT_PUBLIC_PATH || false,

View File

@ -3,9 +3,10 @@
/**
* Module dependencies.
*/
var express = require('express'),
config = require('./config'),
path = require('path');
var express = require('express');
var config = require('./config');
var path = require('path');
var logger = require('../lib/logger').logger;
module.exports = function(app, historicSync, peerSync) {
@ -22,13 +23,9 @@ module.exports = function(app, historicSync, peerSync) {
};
app.set('showStackError', true);
// Compress JSON outputs
app.set('json spaces', 0);
//Enable jsonp
app.enable('jsonp callback');
app.use(config.apiPrefix + '/sync', setHistoric);
app.use(config.apiPrefix + '/peer', setPeer);
app.use(express.logger('dev'));
@ -37,21 +34,12 @@ module.exports = function(app, historicSync, peerSync) {
app.use(express.methodOverride());
app.use(express.compress());
app.use(function(req, res, next) {
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS, PUT, PATCH, DELETE');
res.setHeader('Access-Control-Allow-Headers', 'X-Requested-With,content-type');
next();
});
if (config.publicPath) {
var staticPath = path.normalize(config.rootPath + '/../' + config.publicPath);
//IMPORTANT: for html5mode, this line must to be before app.router
app.use(express.static(staticPath));
}
// manual helpers
app.use(function(req, res, next) {
app.locals.config = config;
next();
@ -60,15 +48,10 @@ module.exports = function(app, historicSync, peerSync) {
//routes should be at the last
app.use(app.router);
//Assume "not found" in the error msgs is a 404. this is somewhat silly, but valid, you can do whatever you like, set properties, use instanceof etc.
//Assume "not found" in the error msgs is a 404
app.use(function(err, req, res, next) {
//Treat as 404
if (~err.message.indexOf('not found')) return next();
//Log it
console.error(err.stack);
//Error page
res.status(500).jsonp({
status: 500,
error: err.stack

13
config/headers.js Normal file
View File

@ -0,0 +1,13 @@
'use strict';
var logger = require('../lib/logger').logger;
module.exports = function(app) {
app.use(function(req, res, next) {
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS, PUT, PATCH, DELETE');
res.setHeader('Access-Control-Allow-Headers', 'X-Requested-With,content-type');
next();
});
};

14
etc/test-cert.pem Normal file
View File

@ -0,0 +1,14 @@
-----BEGIN CERTIFICATE-----
MIICMjCCAZugAwIBAgIJAK9dKmjfxq+BMA0GCSqGSIb3DQEBCwUAMDIxCzAJBgNV
BAYTAkFSMRMwEQYDVQQIDApTb21lLVN0YXRlMQ4wDAYDVQQKDAVDb3BheTAeFw0x
NDA4MjExNzQyMTBaFw0xNDA5MjAxNzQyMTBaMDIxCzAJBgNVBAYTAkFSMRMwEQYD
VQQIDApTb21lLVN0YXRlMQ4wDAYDVQQKDAVDb3BheTCBnzANBgkqhkiG9w0BAQEF
AAOBjQAwgYkCgYEA1BbMI6V06LKoBrcf5bJ8LH7EjwqbEacIOpiY7B+8W3sAM1bB
6hA2IlPvKL3qTdhMMKFZGZMYypmlAQTI1N+VNSwJHNjyepFbtkdNytSC8qw8bhak
yt4TByYEw1NMYx7I0OOdjh/DKsS+EOIgQDT9zSB+NgErKb0mKrginwgk5XkCAwEA
AaNQME4wHQYDVR0OBBYEFM0G1agUfY4zRNfxJ+0sHV3EsoGKMB8GA1UdIwQYMBaA
FM0G1agUfY4zRNfxJ+0sHV3EsoGKMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEL
BQADgYEAOg7n1RCyB1BJ6TuF99i25H7kpGUSL57ajNyyCKDciTPmpxVJ5knAjPYa
hbXX+dlq2B8QEnfkE5FMDLkO3RS3xU8YfekIDHofDuXR9boD/4rRlsN8md2Jmkr6
MyRtYPtsPWVeoz0WmG5f1yobHmh7mYf17oN+uRJKX68s8G6b/SQ=
-----END CERTIFICATE-----

15
etc/test-key.pem Normal file
View File

@ -0,0 +1,15 @@
-----BEGIN RSA PRIVATE KEY-----
MIICXgIBAAKBgQDUFswjpXTosqgGtx/lsnwsfsSPCpsRpwg6mJjsH7xbewAzVsHq
EDYiU+8ovepN2EwwoVkZkxjKmaUBBMjU35U1LAkc2PJ6kVu2R03K1ILyrDxuFqTK
3hMHJgTDU0xjHsjQ452OH8MqxL4Q4iBANP3NIH42ASspvSYquCKfCCTleQIDAQAB
AoGAMUzDUx3o2RZ+XGFA9uHQX39wLVfnx+itzwEduvV9kT48Q7LNDJ2MF9qu4yeS
SVoYC83Vqk45Gw8v/dag4GrAgdk1NHZZ56Z/G55m06Y45xS6ZarBdbe0N1jdZEab
RG3FgxyPSUiZ5aLIMxMMtgt/DRv9BPpIeLNDMgyQRjVWlMkCQQDzlLwkp4bo+CAY
UMcsSN+KGurEMsuF0qc/+TLqpKDoOaLtd1F+Ntn20tQqeH0YLWktFvzAgY7wYXrb
lhMuAxa7AkEA3ucGEXNqwu1qVP4fXfEN1E0Y5X/euXMsfgNG8IK82hF3h83hnqNM
3FcGFOyKnL7E5TfRlJfxhAGqUfCe+2zjWwJBAKA6CID8CkyZW1NjX4EL9q+8AQ5K
c4J2DTqRzCJ5ZLcdosUeJecmYb5w9MtzMqaCyJq2clCXaNVK6iwjzj4IHh0CQQCY
sgwvIjCtrfQcmyUjtoExwUrf1LPfuK1u+ZG8KuNyQ2rtxjTb9qQtgRPye4QNEoZR
O+a/c0MImhdyIHLYa+RnAkEAwfLD4q+FDx4eX0ANO7/PI/XiJGqi6x1cYUwyRg9o
2S6hN5RnUD/nf2HKHU0esp34UMY/UWMrodCRDZj/ijg4UA==
-----END RSA PRIVATE KEY-----

View File

@ -4,19 +4,21 @@
//Set the node enviornment variable if not set before
process.env.NODE_ENV = process.env.NODE_ENV || 'development';
/**
* Module dependencies.
*/
var express = require('express'),
fs = require('fs'),
PeerSync = require('./lib/PeerSync'),
HistoricSync = require('./lib/HistoricSync');
var fs = require('fs');
var PeerSync = require('./lib/PeerSync');
var HistoricSync = require('./lib/HistoricSync');
var http = require('http');
var https = require('https');
var express = require('express');
var program = require('commander');
//Initializing system variables
var config = require('./config/config');
var logger = require('./lib/logger').logger;
program
.version(config.version);
// text title
/*jshint multistr: true */
console.log(
'\n\
____ _ __ __ ___ _ \n\
@ -25,8 +27,9 @@ console.log(
_/ // / / (__ ) / /_/ / / / / /_ / ___ |/ /_/ / / \n\
/___/_/ /_/____/_/\\__, /_/ /_/\\__/ /_/ |_/ .___/_/ \n\
/____/ /_/ \n\
\n\t\t\t\t\t\tv%s\n\
# Configuration:\n\
\n\t\t\t\t\t\tv%s\n', config.version);
program.on('--help', function() {
logger.info('\n# Configuration:\n\
\tINSIGHT_NETWORK (Network): %s\n\
\tINSIGHT_DB (Database Path): %s\n\
\tINSIGHT_SAFE_CONFIRMATIONS (Safe Confirmations): %s\n\
@ -43,25 +46,37 @@ console.log(
\nChange setting by assigning the enviroment variables above. Example:\n\
$ INSIGHT_NETWORK="testnet" BITCOIND_HOST="123.123.123.123" ./insight.js\
\n\n',
config.version,
config.network, config.leveldb, config.safeConfirmations, config.ignoreCache ? 'yes' : 'no',
config.bitcoind.user,
config.bitcoind.pass ? 'Yes(hidden)' : 'No',
config.bitcoind.protocol,
config.bitcoind.host,
config.bitcoind.port,
config.bitcoind.p2pPort,
config.bitcoind.dataDir + (config.network === 'testnet' ? '*' : ''), (config.network === 'testnet' ? '* (/testnet3 is added automatically)' : '')
);
config.network, config.leveldb, config.safeConfirmations, config.ignoreCache ? 'yes' : 'no',
config.bitcoind.user,
config.bitcoind.pass ? 'Yes(hidden)' : 'No',
config.bitcoind.protocol,
config.bitcoind.host,
config.bitcoind.port,
config.bitcoind.p2pPort,
config.bitcoind.dataDir + (config.network === 'testnet' ? '*' : ''), (config.network === 'testnet' ? '* (/testnet3 is added automatically)' : '')
);
});
/**
* express app
*/
program.parse(process.argv);
// create express app
var expressApp = express();
/**
* Bootstrap models
*/
// setup headers
require('./config/headers')(expressApp);
// setup http/https base server
var server;
if (config.enableHTTPS) {
var serverOpts = {};
serverOpts.key = fs.readFileSync('./etc/test-key.pem');
serverOpts.cert = fs.readFileSync('./etc/test-cert.pem');
server = https.createServer(serverOpts, expressApp);
} else {
server = http.createServer(expressApp);
}
// Bootstrap models
var models_path = __dirname + '/app/models';
var walk = function(path) {
fs.readdirSync(path).forEach(function(file) {
@ -79,10 +94,7 @@ var walk = function(path) {
walk(models_path);
/**
* p2pSync process
*/
// p2pSync process
var peerSync = new PeerSync({
shouldBroadcast: true
});
@ -91,9 +103,7 @@ if (!config.disableP2pSync) {
peerSync.run();
}
/**
* historic_sync process
*/
// historic_sync process
var historicSync = new HistoricSync({
shouldBroadcastSync: true
});
@ -111,20 +121,30 @@ if (!config.disableHistoricSync) {
if (peerSync) peerSync.allowReorgs = true;
//express settings
require('./config/express')(expressApp, historicSync, peerSync);
//Bootstrap routes
require('./config/routes')(expressApp);
// socket.io
var server = require('http').createServer(expressApp);
var ios = require('socket.io')(server);
require('./app/controllers/socket.js').init(ios, config);
var ios = require('socket.io')(server, config);
require('./app/controllers/socket.js').init(ios);
// plugins
if (config.enableRatelimiter) {
require('./plugins/ratelimiter').init(expressApp, config.ratelimiter);
}
if (config.enableMailbox) {
require('./plugins/mailbox').init(ios, config.mailbox);
}
// express settings
require('./config/express')(expressApp, historicSync, peerSync);
require('./config/routes')(expressApp);
//Start the app by listening on <port>
server.listen(config.port, function() {
console.log('insight server listening on port %d in %s mode', server.address().port, process.env.NODE_ENV);
logger.info('insight server listening on port %d in %s mode', server.address().port, process.env.NODE_ENV);
});
//expose app

View File

@ -26,7 +26,6 @@ var async = require('async');
var logger = require('./logger').logger;
var d = logger.log;
var info = logger.info;
var BlockDb = function(opts) {
@ -113,7 +112,7 @@ BlockDb.prototype._changeBlockHeight = function(hash, height, cb) {
var self = this;
var dbScript1 = this._setHeightScript(hash,height);
d('Getting TXS FROM %s to set it Main', hash);
logger.log('Getting TXS FROM %s to set it Main', hash);
this.fromHashWithInfo(hash, function(err, bi) {
if (!bi || !bi.info || !bi.info.tx)
throw new Error('unable to get info for block:'+ hash);
@ -121,10 +120,10 @@ BlockDb.prototype._changeBlockHeight = function(hash, height, cb) {
var dbScript2;
if (height>=0) {
dbScript2 = self._addTxsScript(bi.info.tx, hash, height);
info('\t%s %d Txs', 'Confirming', bi.info.tx.length);
logger.info('\t%s %d Txs', 'Confirming', bi.info.tx.length);
} else {
dbScript2 = self._delTxsScript(bi.info.tx);
info('\t%s %d Txs', 'Unconfirming', bi.info.tx.length);
logger.info('\t%s %d Txs', 'Unconfirming', bi.info.tx.length);
}
db.batch(dbScript2.concat(dbScript1),cb);
});
@ -230,7 +229,7 @@ BlockDb.prototype.getHeight = function(hash, cb) {
};
BlockDb.prototype._setHeightScript = function(hash, height) {
d('setHeight: %s #%d', hash,height);
logger.log('setHeight: %s #%d', hash,height);
return ([{
type: 'put',
key: MAIN_PREFIX + hash,

View File

@ -427,7 +427,8 @@ HistoricSync.prototype.start = function(opts, next) {
else {
self.endTs = Date.now();
self.status = 'finished';
console.log('Done Syncing', self.info());
var info = self.info();
logger.debug('Done Syncing blockchain', info.type, 'to height', info.height);
return w_cb(err);
}
});

View File

@ -19,10 +19,7 @@ var MAX_OPEN_FILES = 500;
var CONCURRENCY = 5;
var d = logger.log;
var info = logger.info;
var db;
var MessageDb = function(opts) {
opts = opts || {};
this.path = config.leveldb + '/messages' + (opts.name ? ('-' + opts.name) : '');

View File

@ -86,10 +86,10 @@ Sync.prototype.storeTipBlock = function(b, allowReorgs, cb) {
var self = this;
if ( self.storingBlock ) {
console.log('## Storing a block already. Delaying storeTipBlock with:' +
logger.debug('Storing a block already. Delaying storeTipBlock with:' +
b.hash);
return setTimeout( function() {
console.log('## Retrying storeTipBlock with: ' + b.hash);
logger.debug('Retrying storeTipBlock with: ' + b.hash);
self.storeTipBlock(b,allowReorgs,cb);
}, 1000);
}
@ -123,7 +123,7 @@ Sync.prototype.storeTipBlock = function(b, allowReorgs, cb) {
oldHeight = hash ? (h || 0) : -1
if (oldTip && newPrev !== oldTip) {
needReorg = true;
console.log('## REORG Triggered, tip mismatch');
logger.debug('REORG Triggered, tip mismatch');
}
return c();
});
@ -195,7 +195,7 @@ Sync.prototype.processReorg = function(oldTip, oldNext, newPrev, oldHeight, cb)
if (height<0) return c();
newHeight = height + 1;
info('# Reorg Case 1) OldNext: %s NewHeight: %d', oldNext, newHeight);
info('Reorg Case 1) OldNext: %s NewHeight: %d', oldNext, newHeight);
orphanizeFrom = oldNext;
return c(err);
});
@ -203,7 +203,7 @@ Sync.prototype.processReorg = function(oldTip, oldNext, newPrev, oldHeight, cb)
function(c) {
if (orphanizeFrom) return c();
info('# Reorg Case 2)');
info('Reorg Case 2)');
self.setBranchConnectedBackwards(newPrev, function(err, yHash, newYHashNext, height) {
if (err) return c(err);
newHeight = height;

View File

@ -39,7 +39,7 @@ var bitcore = require('bitcore'),
bitcoreUtil = bitcore.util,
buffertools = require('buffertools');
var logger = require('./logger').logger;
var logger = require('./logger');
var inf = logger.info;
var db = imports.db || levelup(config.leveldb + '/txs', {

View File

@ -1,5 +1,13 @@
var winston = require('winston');
var config = require('../config/config');
winston.info('starting...')
var logger = new winston.Logger({
transports: [
new winston.transports.Console({
level: 'error'
}),
]
});
logger.transports.console.level = config.loggerLevel;
module.exports.logger=winston;
module.exports.logger = logger;

View File

@ -57,7 +57,8 @@
"bitcore": "git://github.com/bitpay/bitcore.git#aa41c70cff2583d810664c073a324376c39c8b36",
"bufferput": "git://github.com/bitpay/node-bufferput.git",
"buffertools": "*",
"commander": "*",
"commander": "^2.3.0",
"connect-ratelimit": "git://github.com/dharmafly/connect-ratelimit.git#0550eff209c54f35078f46445000797fa942ab97",
"express": "~3.4.7",
"glob": "*",
"leveldown": "*",
@ -66,11 +67,11 @@
"preconditions": "^1.0.7",
"should": "~2.1.1",
"sinon": "~1.7.3",
"socket.io": "1.0.6",
"socket.io-client": "1.0.6",
"soop": "=0.1.5",
"winston": "*",
"xmlhttprequest": "~1.6.0",
"socket.io": "1.0.6",
"socket.io-client": "1.0.6"
"xmlhttprequest": "~1.6.0"
},
"devDependencies": {
"chai": "*",

58
plugins/mailbox.js Normal file
View File

@ -0,0 +1,58 @@
var microtime = require('microtime');
var mdb = require('../lib/MessageDb').default();
var logger = require('../lib/logger').logger;
var preconditions = require('preconditions').singleton();
var io;
module.exports.init = function(ext_io, config) {
logger.info('Using mailbox plugin');
preconditions.checkArgument(ext_io);
io = ext_io;
io.sockets.on('connection', function(socket) {
// when it requests sync, send him all pending messages
socket.on('sync', function(ts) {
logger.verbose('Sync requested by ' + socket.id);
logger.debug(' from timestamp ' + ts);
var rooms = socket.rooms;
if (rooms.length !== 2) {
socket.emit('insight-error', 'Must subscribe with public key before syncing');
return;
}
var to = rooms[1];
var upper_ts = Math.round(microtime.now());
logger.debug(' to timestamp ' + upper_ts);
mdb.getMessages(to, ts, upper_ts, function(err, messages) {
if (err) {
throw new Error('Couldn\'t get messages on sync request: ' + err);
}
logger.verbose('\tFound ' + messages.length + ' message' + (messages.length !== 1 ? 's' : ''));
for (var i = 0; i < messages.length; i++) {
broadcastMessage(messages[i], socket);
}
});
});
// when it sends a message, add it to db
socket.on('message', function(m) {
logger.debug('Message sent from ' + m.pubkey + ' to ' + m.to);
mdb.addMessage(m, function(err) {
if (err) {
throw new Error('Couldn\'t add message to database: ' + err);
}
});
});
});
mdb.on('message', broadcastMessage);
};
var broadcastMessage = module.exports.broadcastMessage = function(message, socket) {
preconditions.checkState(io);
var s = socket || io.sockets.in(message.to);
logger.debug('sending message to ' + message.to);
s.emit('message', message);
}

35
plugins/ratelimiter.js Normal file
View File

@ -0,0 +1,35 @@
var logger = require('../lib/logger').logger;
var preconditions = require('preconditions').singleton();
var limiter = require('connect-ratelimit');
var ONE_HOUR = 60 * 60 * 1000;
module.exports.init = function(app, config) {
preconditions.checkArgument(app);
logger.info('Using ratelimiter plugin');
config = config || {};
config.whitelistRPH = config.whitelistRPH || 50000;
config.normalRPH = config.normalRPH || 1000;
config.blacklistRPH = config.blacklistRPH || 0;
app.use(limiter({
whitelist: [],
end: true,
blacklist: [], // 'example.com'
categories: {
whitelist: {
totalRequests: config.whitelistRPH,
every: ONE_HOUR
},
blacklist: {
totalRequests: config.blacklistRPH,
every: ONE_HOUR
},
normal: {
totalRequests: config.normalRPH,
every: ONE_HOUR
}
}
}));
};

View File

@ -15,7 +15,7 @@ describe('socket server', function() {
});
it('should register socket handlers', function() {
var io = {
sockets: new EventEmitter()
sockets: new EventEmitter(),
}
socket.init(io);