proxy switching added and working

This commit is contained in:
Jerry Brady 2014-04-07 03:11:53 +00:00
parent f276554bd5
commit 27a8de62cb
6 changed files with 326 additions and 88 deletions

View File

@ -23,39 +23,45 @@
"statUpdateInterval": 1.5,
"hashrateWindow": 300
},
/*
In a proxy configuration, you can setup ports that accept miners for work based on
a specific algorithm instead of a specific coin. Miners that connect to these ports
are automatically switched a coin determined by the server.
The default coin is the first configured pool for each algorithm and coin switching
can be triggered using the coinSwitch.js script in the scripts folder.
Please note miner address authentication must be disabled when using NOMP in a proxy
configuration and that payout processing is left up to the server administrator.
*/
"proxy": {
"enabled": false,
"ports": {
"80": {
"diff": 32,
"varDiff": {
"minDiff" : 8,
"maxDiff" : 512,
"targetTime" : 15,
"retargetTime" : 90,
"variancePercent" : 30
}
},
"6000": {
"diff": 32,
"varDiff": {
"minDiff" : 8,
"maxDiff" : 512,
"targetTime" : 15,
"retargetTime" : 90,
"variancePercent" : 30
}
},
"8080": {
"diff": 32,
"varDiff": {
"minDiff" : 8,
"maxDiff" : 512,
"targetTime" : 15,
"retargetTime" : 90,
"variancePercent" : 30
}
"sha256": {
"enabled": false,
"port": "3333",
"diff": 10,
"varDiff": {
"minDiff": 16, //Minimum difficulty
"maxDiff": 512, //Network difficulty will be used if it is lower than this
"targetTime": 15, //Try to get 1 share per this many seconds
"retargetTime": 90, //Check to see if we should retarget every this many seconds
"variancePercent": 30 //Allow time to very this % from target without retargeting
}
},
"scrypt": {
"enabled": false,
"port": "4444",
"diff": 10,
"varDiff": {
"minDiff": 16, //Minimum difficulty
"maxDiff": 512, //Network difficulty will be used if it is lower than this
"targetTime": 15, //Try to get 1 share per this many seconds
"retargetTime": 90, //Check to see if we should retarget every this many seconds
"variancePercent": 30 //Allow time to very this % from target without retargeting
}
},
"scrypt-n": {
"enabled": false,
"port": "5555"
}
}
}
}

71
init.js
View File

@ -6,6 +6,7 @@ var cluster = require('cluster');
var async = require('async');
var PoolLogger = require('./libs/logUtil.js');
var BlocknotifyListener = require('./libs/blocknotifyListener.js');
var CoinswitchListener = require('./libs/coinswitchListener.js');
var RedisBlocknotifyListener = require('./libs/redisblocknotifyListener.js');
var WorkerListener = require('./libs/workerListener.js');
var PoolWorker = require('./libs/poolWorker.js');
@ -32,9 +33,11 @@ var logger = new PoolLogger({
try {
/*
require('newrelic');
if (cluster.isMaster)
logger.debug('NewRelic', 'Monitor', 'New Relic initiated');
*/
} catch(e) {}
@ -71,19 +74,7 @@ if (cluster.isWorker){
}
return;
} /* else {
var coinNames = ['alphacoin','frankocoin','emerald','kittehcoin'];
var curIndex = 0;
setInterval(function () {
var newCoinName = coinNames[++curIndex % coinNames.length];
console.log("SWITCHING to "+newCoinName);
var ipcMessage = {type:'switch', coin: newCoinName};
Object.keys(cluster.workers).forEach(function(id) {
cluster.workers[id].send(ipcMessage);
});
}, 20000);
} */
}
//Read all pool configs from pool_configs and join them with their coin profile
@ -102,11 +93,11 @@ var buildPoolConfigs = function(){
var coinProfile = JSON.parse(JSON.minify(fs.readFileSync(coinFilePath, {encoding: 'utf8'})));
poolOptions.coin = coinProfile;
configs[poolOptions.coin.name] = poolOptions;
configs[poolOptions.coin.name.toLowerCase()] = poolOptions;
if (!(coinProfile.algorithm in algos)){
logger.error('Master', coinProfile.name, 'Cannot run a pool for unsupported algorithm "' + coinProfile.algorithm + '"');
delete configs[poolOptions.coin.name];
delete configs[poolOptions.coin.name.toLowerCase()];
}
});
@ -198,6 +189,54 @@ var startBlockListener = function(portalConfig){
listener.start();
};
//
// Receives authenticated events from coin switch listener and triggers proxy
// to swtich to a new coin.
//
var startCoinswitchListener = function(portalConfig){
var listener = new CoinswitchListener(portalConfig.coinSwitchListener);
listener.on('log', function(text){
logger.debug('Master', 'Coinswitch', text);
});
listener.on('switchcoin', function(message){
var ipcMessage = {type:'blocknotify', coin: message.coin, hash: message.hash};
Object.keys(cluster.workers).forEach(function(id) {
cluster.workers[id].send(ipcMessage);
});
var ipcMessage = {
type:'switch',
coin: message.coin.toLowerCase()
};
Object.keys(cluster.workers).forEach(function(id) {
cluster.workers[id].send(ipcMessage);
});
});
listener.start();
/*
if !cluster.isWorker
else {
var coinNames = ['Emoticoin','Infinitecoin'];
var curIndex = 0;
setInterval(function () {
var newCoinName = coinNames[++curIndex % coinNames.length];
console.log("SWITCHING to " + newCoinName);
var ipcMessage = {
type:'switch',
coin: newCoinName
};
Object.keys(cluster.workers).forEach(function(id) {
cluster.workers[id].send(ipcMessage);
});
}, 30000);
}
*/
};
var startRedisBlockListener = function(portalConfig){
//block notify options
//setup block notify here and use IPC to tell appropriate pools
@ -273,6 +312,8 @@ var startWebsite = function(portalConfig, poolConfigs){
startBlockListener(portalConfig);
startCoinswitchListener(portalConfig);
startRedisBlockListener(portalConfig);
startWorkerListener(poolConfigs);

View File

@ -0,0 +1,56 @@
var events = require('events');
var net = require('net');
var listener = module.exports = function listener(options){
var _this = this;
var emitLog = function(text){
_this.emit('log', text);
};
this.start = function(){
if (!options || !options.enabled){
emitLog('Coinswitch listener disabled');
return;
}
var coinswitchServer = net.createServer(function(c) {
emitLog('Coinswitch listener has incoming connection');
var data = '';
try {
c.on('data', function (d) {
emitLog('Coinswitch listener received switch request');
data += d;
if (data.slice(-1) === '\n') {
c.end();
}
});
c.on('end', function () {
var message = JSON.parse(data);
if (message.password === options.password) {
_this.emit('switchcoin', message);
}
else
emitLog('Coinswitch listener received notification with incorrect password');
});
}
catch(e){
emitLog('Coinswitch listener failed to parse message ' + data);
}
});
coinswitchServer.listen(options.port, function() {
emitLog('Coinswitch notify listener server started on port ' + options.port)
});
emitLog("Coinswitch listener is enabled, starting server on port " + options.port);
}
};
listener.prototype.__proto__ = events.EventEmitter.prototype;

View File

@ -1,14 +1,14 @@
var Stratum = require('stratum-pool');
var Vardiff = require('stratum-pool/lib/varDiff.js');
var redis = require('redis');
var net = require('net');
var MposCompatibility = require('./mposCompatibility.js');
var ShareProcessor = require('./shareProcessor.js');
module.exports = function(logger){
var _this = this;
var poolConfigs = JSON.parse(process.env.pools);
var portalConfig = JSON.parse(process.env.portalConfig);
@ -17,30 +17,54 @@ module.exports = function(logger){
var pools = {};
var proxyStuff = {};
var proxySwitch = {};
//Handle messages from master process sent via IPC
process.on('message', function(message) {
switch(message.type){
case 'blocknotify':
var pool = pools[message.coin.toLowerCase()]
if (pool) pool.processBlockNotify(message.hash)
break;
// IPC message for pool switching
case 'switch':
var newCoinPool = pools[message.coin.toLowerCase()];
if (newCoinPool) {
var oldPool = pools[proxyStuff.curActivePool];
var logSystem = 'Proxy';
var logComponent = 'Switch';
var logSubCat = 'Thread ' + (parseInt(forkId) + 1);
var newCoin = message.coin.toLowerCase();
if (!poolConfigs.hasOwnProperty(newCoin)) {
logger.debug(logSystem, logComponent, logSubCat, 'Switch message to coin that is not recognized: ' + newCoin);
break;
}
var algo = poolConfigs[newCoin].coin.algorithm;
var newPool = pools[newCoin];
var oldCoin = proxySwitch[algo].currentPool;
var oldPool = pools[oldCoin];
var proxyPort = proxySwitch[algo].port;
if (newCoin == oldCoin) {
logger.debug(logSystem, logComponent, logSubCat, 'Switch message would have no effect - ignoring ' + newCoin);
break;
}
logger.debug(logSystem, logComponent, logSubCat, 'Proxy message for ' + algo + ' from ' + oldCoin + ' to ' + newCoin);
if (newPool) {
oldPool.relinquishMiners(
function (miner, cback) {
// relinquish miners that are attached to one of the "Auto-switch" ports and leave the others there.
cback(typeof(portalConfig.proxy.ports[miner.client.socket.localPort]) !== 'undefined')
cback(miner.client.socket.localPort == proxyPort)
},
function (clients) {
newCoinPool.attachMiners(clients);
proxyStuff.curActivePool = message.coin.toLowerCase();
newPool.attachMiners(clients);
}
)
);
proxySwitch[algo].currentPool = newCoin;
//TODO write new pool to REDIS
}
break;
}
@ -55,7 +79,6 @@ module.exports = function(logger){
var logComponent = coin;
var logSubCat = 'Thread ' + (parseInt(forkId) + 1);
var handlers = {
auth: function(){},
share: function(){},
@ -128,51 +151,125 @@ module.exports = function(logger){
else if (isValidBlock)
logger.debug(logSystem, logComponent, logSubCat, 'Block found: ' + data.blockHash);
if (isValidShare)
logger.debug(logSystem, logComponent, logSubCat, 'Share accepted at diff ' + data.difficulty + ' with diff ' + data.shareDiff + ' by ' + data.worker + ' [' + data.ip + ']' );
logger.debug(logSystem, logComponent, logSubCat, 'Share accepted at diff ' + data.difficulty + ' by ' + data.worker + ' [' + data.ip + ']' );
else if (!isValidShare)
logger.debug(logSystem, logComponent, logSubCat, 'Share rejected: ' + shareData);
handlers.share(isValidShare, isValidBlock, data)
}).on('difficultyUpdate', function(workerName, diff){
logger.debug(logSystem, logComponent, logSubCat, 'Difficulty update to diff ' + diff + ' workerName=' + JSON.stringify(workerName));
handlers.diff(workerName, diff);
}).on('log', function(severity, text) {
logger[severity](logSystem, logComponent, logSubCat, text);
});
pool.start();
pools[poolOptions.coin.name.toLowerCase()] = pool;
});
if (typeof(portalConfig.proxy) !== 'undefined' && portalConfig.proxy.enabled === true) {
proxyStuff.curActivePool = Object.keys(pools)[0];
proxyStuff.proxys = {};
proxyStuff.varDiffs = {};
Object.keys(portalConfig.proxy.ports).forEach(function(port) {
proxyStuff.varDiffs[port] = new Vardiff(port, portalConfig.proxy.ports[port].varDiff);
});
Object.keys(pools).forEach(function (coinName) {
var p = pools[coinName];
Object.keys(proxyStuff.varDiffs).forEach(function(port) {
p.setVarDiff(port, proxyStuff.varDiffs[port]);
if (typeof(portalConfig.proxy) !== 'undefined') {
var logSystem = 'Proxy';
var logComponent = 'Setup';
var logSubCat = 'Thread ' + (parseInt(forkId) + 1);
var proxyState = {};
//
// Load proxy state for each algorithm from redis which allows NOMP to resume operation
// on the last pool it was using when reloaded or restarted
//
logger.debug(logSystem, logComponent, logSubCat, 'Loading last proxy state from redis');
var redisClient = redis.createClient(6379, "localhost") //TODO figure out where redis config will come from for such things
redisClient.on('ready', function(){
redisClient.hgetall("proxyState", function(error, obj) {
if (error) {
logger.debug(logSystem, logComponent, logSubCat, 'No last proxy state found in redis');
}
else {
proxyState = obj;
logger.debug(logSystem, logComponent, logSubCat, 'Last proxy state loaded from redis');
}
//
// Setup proxySwitch object to control proxy operations from configuration and any restored
// state. Each algorithm has a listening port, current coin name, and an active pool to
// which traffic is directed when activated in the config.
//
// In addition, the proxy config also takes diff and varDiff parmeters the override the
// defaults for the standard config of the coin.
//
Object.keys(portalConfig.proxy).forEach(function(algorithm) {
if (portalConfig.proxy[algorithm].enabled === true) {
var initalPool = proxyState.hasOwnProperty(algorithm) ? proxyState[algorithm].currentPool : _this.getFirstPoolForAlgorithm(algorithm);
proxySwitch[algorithm] = {
port: portalConfig.proxy[algorithm].port,
currentPool: initalPool,
proxy: {}
}
// Copy diff and vardiff configuation into pools that match our algorithm so the stratum server can pick them up
//
// Note: This seems a bit wonky and brittle - better if proxy just used the diff config of the port it was
// routed into instead.
//
if (portalConfig.proxy[algorithm].hasOwnProperty('varDiff')) {
proxySwitch[algorithm].varDiff = new Vardiff(proxySwitch[algorithm].port, portalConfig.proxy[algorithm].varDiff);
proxySwitch[algorithm].diff = portalConfig.proxy[algorithm].diff;
}
Object.keys(pools).forEach(function (coinName) {
var a = poolConfigs[coinName].coin.algorithm;
var p = pools[coinName];
if (a == algorithm) {
p.setVarDiff(proxySwitch[algorithm].port, proxySwitch[algorithm].varDiff);
// Set diff for proxy port by mimicking coin port config and setting it in the pool
// the diff wasn't being picked up by the stratum server for proxy workers and was always using the default of 8
//p.options.ports[proxySwitch[algorithm].port] = {};
//p.options.ports[proxySwitch[algorithm].port].proxy = true;
//p.options.ports[proxySwitch[algorithm].port].diff = proxySwitch[algorithm].diff;
}
});
proxySwitch[algorithm].proxy = net.createServer({allowHalfOpen: true}, function(socket) {
var currentPool = proxySwitch[algorithm].currentPool;
var logSubCat = 'Thread ' + (parseInt(forkId) + 1);
logger.debug(logSystem, 'Connect', logSubCat, 'Proxy connect from ' + socket.remoteAddress + ' on ' + proxySwitch[algorithm].port
+ ' routing to ' + currentPool);
pools[currentPool].getStratumServer().handleNewClient(socket);
}).listen(parseInt(proxySwitch[algorithm].port), function() {
logger.debug(logSystem, logComponent, logSubCat, 'Proxy listening for ' + algorithm + ' on port ' + proxySwitch[algorithm].port
+ ' into ' + proxySwitch[algorithm].currentPool);
});
}
else {
logger.debug(logSystem, logComponent, logSubCat, 'Proxy pool for ' + algorithm + ' disabled.');
}
});
});
}).on('error', function(err){
logger.debug(logSystem, logComponent, logSubCat, 'Pool configuration failed: ' + err);
});
Object.keys(portalConfig.proxy.ports).forEach(function (port) {
proxyStuff.proxys[port] = net .createServer({allowHalfOpen: true}, function(socket) {
console.log(proxyStuff.curActivePool);
pools[proxyStuff.curActivePool].getStratumServer().handleNewClient(socket);
}).listen(parseInt(port), function(){
console.log("Proxy listening on " + port);
});
});
redisClient.quit();
}
this.getFirstPoolForAlgorithm = function(algorithm) {
var foundCoin = "";
Object.keys(poolConfigs).forEach(function(coinName) {
if (poolConfigs[coinName].coin.algorithm == algorithm) {
if (foundCoin === "")
foundCoin = coinName;
}
});
return foundCoin;
};
};

View File

@ -18,7 +18,7 @@ module.exports = function(logger, poolConfig){
var internalConfig = poolConfig.shareProcessing.internal;
var redisConfig = internalConfig.redis;
var coin = poolConfig.coin.name;
var coin = poolConfig.coin.name.toLowerCase();
var forkId = process.env.forkId;
var logSystem = 'Pool';
@ -78,4 +78,4 @@ module.exports = function(logger, poolConfig){
};
};
};

38
scripts/coinSwitch.js Normal file
View File

@ -0,0 +1,38 @@
/*
This script demonstrates sending a coin switch request and can be invoked from the command line
with:
"node coinSwitch.js localhost:8118 password %s"
where <%s> is the name of the coin proxy miners will be switched onto.
If the coin name is not configured, disabled or matches the existing proxy setting, no action
will be taken by NOMP on receipt of the message.
*/
var net = require('net');
var config = process.argv[2];
var parts = config.split(':');
var host = parts[0];
var port = parts[1];
var password = process.argv[3];
var coin = process.argv[4];
var blockHash = process.argv[5];
var client = net.connect(port, host, function() {
console.log('client connected');
client.write(JSON.stringify({
password: password,
coin: coin,
}) + '\n');
});
client.on('data', function(data) {
console.log(data.toString());
//client.end();
});
client.on('end', function() {
console.log('client disconnected');
//process.exit();
});