var fs = require('fs'); var path = require('path'); var os = require('os'); var cluster = require('cluster'); var async = require('async'); var PoolLogger = require('./libs/logUtil.js'); var CliListener = require('./libs/cliListener.js'); var RedisBlocknotifyListener = require('./libs/redisblocknotifyListener.js'); var PoolWorker = require('./libs/poolWorker.js'); var PaymentProcessor = require('./libs/paymentProcessor.js'); var Website = require('./libs/website.js'); var ProfitSwitch = require('./libs/profitSwitch.js'); var algos = require('stratum-pool/lib/algoProperties.js'); JSON.minify = JSON.minify || require("node-json-minify"); if (!fs.existsSync('config.json')){ console.log('config.json file does not exist. Read the installation/setup instructions.'); return; } var portalConfig = JSON.parse(JSON.minify(fs.readFileSync("config.json", {encoding: 'utf8'}))); var logger = new PoolLogger({ logLevel: portalConfig.logLevel }); try { require('newrelic'); if (cluster.isMaster) logger.debug('NewRelic', 'Monitor', 'New Relic initiated'); } catch(e) {} //Try to give process ability to handle 100k concurrent connections try{ var posix = require('posix'); try { posix.setrlimit('nofile', { soft: 100000, hard: 100000 }); } catch(e){ if (cluster.isMaster) logger.warning('POSIX', 'Connection Limit', '(Safe to ignore) Must be ran as root to increase resource limits'); } } catch(e){ if (cluster.isMaster) logger.debug('POSIX', 'Connection Limit', '(Safe to ignore) POSIX module not installed and resource (connection) limit was not raised'); } if (cluster.isWorker){ switch(process.env.workerType){ case 'pool': new PoolWorker(logger); break; case 'paymentProcessor': new PaymentProcessor(logger); break; case 'website': new Website(logger); break; case 'profitSwitch': new ProfitSwitch(logger); break; } return; } //Read all pool configs from pool_configs and join them with their coin profile var buildPoolConfigs = function(){ var configs = {}; var configDir = 'pool_configs/'; var poolConfigFiles = []; /* Get filenames of pool config json files that are enabled */ fs.readdirSync(configDir).forEach(function(file){ if (!fs.existsSync(configDir + file) || path.extname(configDir + file) !== '.json') return; var poolOptions = JSON.parse(JSON.minify(fs.readFileSync(configDir + file, {encoding: 'utf8'}))); if (!poolOptions.enabled) return; poolOptions.fileName = file; poolConfigFiles.push(poolOptions); }); /* Ensure no pool uses any of the same ports as another pool */ for (var i = 0; i < poolConfigFiles.length; i++){ var ports = Object.keys(poolConfigFiles[i].ports); for (var f = 0; f < poolConfigFiles.length; f++){ if (f === i) continue; var portsF = Object.keys(poolConfigFiles[f].ports); for (var g = 0; g < portsF.length; g++){ if (ports.indexOf(portsF[g]) !== -1){ logger.error('Master', poolConfigFiles[f].fileName, 'Has same configured port of ' + portsF[g] + ' as ' + poolConfigFiles[i].fileName); process.exit(1); return; } } if (poolConfigFiles[f].coin === poolConfigFiles[i].coin){ logger.error('Master', poolConfigFiles[f].fileName, 'Pool has same configured coin file coins/' + poolConfigFiles[f].coin + ' as ' + poolConfigFiles[i].fileName + ' pool'); process.exit(1); return; } } } poolConfigFiles.forEach(function(poolOptions){ poolOptions.coinFileName = poolOptions.coin; var coinFilePath = 'coins/' + poolOptions.coinFileName; if (!fs.existsSync(coinFilePath)){ logger.error('Master', poolOptions.coinFileName, 'could not find file: ' + coinFilePath); return; } var coinProfile = JSON.parse(JSON.minify(fs.readFileSync(coinFilePath, {encoding: 'utf8'}))); poolOptions.coin = coinProfile; if (poolOptions.coin.name in configs){ logger.error('Master', poolOptions.fileName, 'coins/' + poolOptions.coinFileName + ' has same configured coin name ' + poolOptions.coin.name + ' as coins/' + configs[poolOptions.coin.name].coinFileName + ' used by pool config ' + configs[poolOptions.coin.name].fileName); process.exit(1); return; } configs[poolOptions.coin.name] = poolOptions; if (!(coinProfile.algorithm in algos)){ logger.error('Master', coinProfile.name, 'Cannot run a pool for unsupported algorithm "' + coinProfile.algorithm + '"'); delete configs[poolOptions.coin.name]; } }); return configs; }; var spawnPoolWorkers = function(portalConfig, poolConfigs){ Object.keys(poolConfigs).forEach(function(coin){ var p = poolConfigs[coin]; var internalEnabled = p.shareProcessing && p.shareProcessing.internal && p.shareProcessing.internal.enabled; var mposEnabled = p.shareProcessing && p.shareProcessing.mpos && p.shareProcessing.mpos.enabled; if (!internalEnabled && !mposEnabled){ logger.error('Master', coin, 'Share processing is not configured so a pool cannot be started for this coin.'); delete poolConfigs[coin]; } if (!Array.isArray(p.daemons) || p.daemons.length < 1){ logger.error('Master', coin, 'No daemons configured so a pool cannot be started for this coin.'); delete poolConfigs[coin]; } }); if (Object.keys(poolConfigs).length === 0){ logger.warning('Master', 'PoolSpawner', 'No pool configs exists or are enabled in pool_configs folder. No pools spawned.'); return; } for (var p in poolConfigs){ } var serializedConfigs = JSON.stringify(poolConfigs); var numForks = (function(){ if (!portalConfig.clustering || !portalConfig.clustering.enabled) return 1; if (portalConfig.clustering.forks === 'auto') return os.cpus().length; if (!portalConfig.clustering.forks || isNaN(portalConfig.clustering.forks)) return 1; return portalConfig.clustering.forks; })(); var poolWorkers = {}; var createPoolWorker = function(forkId){ var worker = cluster.fork({ workerType: 'pool', forkId: forkId, pools: serializedConfigs, portalConfig: JSON.stringify(portalConfig) }); worker.forkId = forkId; worker.type = 'pool'; poolWorkers[forkId] = worker; worker.on('exit', function(code, signal){ logger.error('Master', 'PoolSpanwer', 'Fork ' + forkId + ' died, spawning replacement worker...'); setTimeout(function(){ createPoolWorker(forkId); }, 2000); }).on('message', function(msg){ switch(msg.type){ case 'banIP': Object.keys(cluster.workers).forEach(function(id) { if (cluster.workers[id].type === 'pool'){ cluster.workers[id].send({type: 'banIP', ip: msg.ip}); } }); break; } }); }; var i = 0; var spawnInterval = setInterval(function(){ createPoolWorker(i); i++; if (i === numForks){ clearInterval(spawnInterval); logger.debug('Master', 'PoolSpawner', 'Spawned ' + Object.keys(poolConfigs).length + ' pool(s) on ' + numForks + ' thread(s)'); } }, 250); }; var startCliListener = function(cliPort){ var listener = new CliListener(cliPort); listener.on('log', function(text){ logger.debug('Master', 'CLI', text); }).on('command', function(command, params, options){ switch(command){ case 'blocknotify': Object.keys(cluster.workers).forEach(function(id) { cluster.workers[id].send({type: 'blocknotify', coin: params[0], hash: params[1]}); }); break; case 'coinswitch': Object.keys(cluster.workers).forEach(function(id) { cluster.workers[id].send({type: 'coinswitch', switchName: params[0], coin: params[1] }); }); break; case 'restartpool': Object.keys(cluster.workers).forEach(function(id) { cluster.workers[id].send({type: 'restartpool', coin: params[0] }); }); } console.log('command: ' + JSON.stringify([command, params, options])); }).start(); }; /* // // Receives authenticated events from coin switch listener and triggers proxy // to swtich to a new coin. // var startCoinswitchListener = function(portalConfig){ var listener = new CoinswitchListener(portalConfig.coinSwitchListener); listener.on('log', function(text){ logger.debug('Master', 'Coinswitch', text); }); listener.on('switchcoin', function(message){ var ipcMessage = {type:'blocknotify', coin: message.coin, hash: message.hash}; Object.keys(cluster.workers).forEach(function(id) { cluster.workers[id].send(ipcMessage); }); var ipcMessage = { type:'coinswitch', coin: message.coin }; Object.keys(cluster.workers).forEach(function(id) { cluster.workers[id].send(ipcMessage); }); }); listener.start(); }; */ var startRedisBlockListener = function(portalConfig){ //block notify options //setup block notify here and use IPC to tell appropriate pools if (!portalConfig.redisBlockNotifyListener.enabled) return; var listener = new RedisBlocknotifyListener(portalConfig.redisBlockNotifyListener); listener.on('log', function(text){ logger.debug('Master', 'blocknotify', text); }).on('hash', function (message) { var ipcMessage = {type:'blocknotify', coin: message.coin, hash: message.hash}; Object.keys(cluster.workers).forEach(function(id) { cluster.workers[id].send(ipcMessage); }); }); listener.start(); }; var startPaymentProcessor = function(poolConfigs){ var enabledForAny = false; for (var pool in poolConfigs){ var p = poolConfigs[pool]; var enabled = p.enabled && p.shareProcessing && p.shareProcessing.internal && p.shareProcessing.internal.enabled; if (enabled){ enabledForAny = true; break; } } if (!enabledForAny) return; var worker = cluster.fork({ workerType: 'paymentProcessor', pools: JSON.stringify(poolConfigs) }); worker.on('exit', function(code, signal){ logger.error('Master', 'Payment Processor', 'Payment processor died, spawning replacement...'); setTimeout(function(){ startPaymentProcessor(poolConfigs); }, 2000); }); }; var startWebsite = function(portalConfig, poolConfigs){ if (!portalConfig.website.enabled) return; var worker = cluster.fork({ workerType: 'website', pools: JSON.stringify(poolConfigs), portalConfig: JSON.stringify(portalConfig) }); worker.on('exit', function(code, signal){ logger.error('Master', 'Website', 'Website process died, spawning replacement...'); setTimeout(function(){ startWebsite(portalConfig, poolConfigs); }, 2000); }); }; var startProfitSwitch = function(portalConfig, poolConfigs){ if (!portalConfig.profitSwitch || !portalConfig.profitSwitch.enabled){ //logger.error('Master', 'Profit', 'Profit auto switching disabled'); return; } var worker = cluster.fork({ workerType: 'profitSwitch', pools: JSON.stringify(poolConfigs), portalConfig: JSON.stringify(portalConfig) }); worker.on('exit', function(code, signal){ logger.error('Master', 'Profit', 'Profit switching process died, spawning replacement...'); setTimeout(function(){ startWebsite(portalConfig, poolConfigs); }, 2000); }); }; (function init(){ var poolConfigs = buildPoolConfigs(); spawnPoolWorkers(portalConfig, poolConfigs); startPaymentProcessor(poolConfigs); startRedisBlockListener(portalConfig); startWebsite(portalConfig, poolConfigs); startProfitSwitch(portalConfig, poolConfigs); startCliListener(portalConfig.cliPort); })();