diff --git a/README.md b/README.md index 053e346..48a3ed5 100644 --- a/README.md +++ b/README.md @@ -92,15 +92,14 @@ Here is an example of the required fields: ##### Pool config Take a look at the example json file inside the `pool_configs` directory. Rename it to `yourcoin.json` and change the -example fields to fit your setup. The field `coin` __must__ be a string that references the `name` field in your coin's -configuration file (the string is not case sensitive). +example fields to fit your setup. Description of options: ````javascript { "disabled": false, //Set this to true and a pool will not be created from this config file - "coin": "litecoin", //This MUST be a reference to the 'name' field in your coin's config file + "coin": "litecoin.json", //Reference to coin config file in 'coins' directory /* This determines what to do with submitted shares (and stratum worker authentication). @@ -119,7 +118,9 @@ Description of options: will be rejected. */ "validateWorkerAddress": true, - /* Every this many seconds check for confirmed blocks and send out payments. */ + /* Every this many seconds get submitted blocks from redis, use daemon RPC to check + their confirmation status, if confirmed then get shares from redis that contributed + to block and send out payments. */ "paymentInterval": 30, /* Minimum number of coins that a miner must earn before sending payment. Typically, @@ -149,6 +150,12 @@ Description of options: "port": 19332, "user": "litecoinrpc", "password": "testnet" + }, + + /* Redis database used for storing share and block submission data. */ + "redis": { + "host": "localhost", + "port": 6379 } }, diff --git a/init.js b/init.js index 331290f..75619e4 100644 --- a/init.js +++ b/init.js @@ -8,20 +8,12 @@ var PoolLogger = require('./libs/logutils.js'); var BlocknotifyListener = require('./libs/blocknotifyListener.js'); var WorkerListener = require('./libs/workerListener.js'); var PoolWorker = require('./libs/poolWorker.js'); +var PaymentProcessor = require('./libs/paymentProcessor.js'); JSON.minify = JSON.minify || require("node-json-minify"); -//Try to give process ability to handle 100k concurrent connections -try{ - posix.setrlimit('nofile', { soft: 100000, hard: 100000 }); -} -catch(e){ - console.error(e); -} - - var loggerInstance = new PoolLogger({ 'default': true, @@ -37,85 +29,97 @@ var logWarning = loggerInstance.logWarning; var logError = loggerInstance.logError; - -if (cluster.isMaster){ +//Try to give process ability to handle 100k concurrent connections +try{ + posix.setrlimit('nofile', { soft: 100000, hard: 100000 }); +} +catch(e){ + logWarning('posix', 'system', '(Safe to ignore) Must be ran as root to increase resource limits'); +} - var config = JSON.parse(JSON.minify(fs.readFileSync("config.json", {encoding: 'utf8'}))); - //Read all coin profile json files from coins directory and build object where key is name of coin - var coinProfiles = (function(){ - var profiles = {}; - fs.readdirSync('coins').forEach(function(file){ - var coinProfile = JSON.parse(JSON.minify(fs.readFileSync('coins/' + file, {encoding: 'utf8'}))); - profiles[coinProfile.name.toLowerCase()] = coinProfile; - }); - return profiles; - })(); +if (cluster.isWorker){ + + switch(process.env.workerType){ + case 'pool': + new PoolWorker(loggerInstance); + break; + case 'paymentProcessor': + new PaymentProcessor(loggerInstance); + break; + } + + return; +} - //Read all pool configs from pool_configs and join them with their coin profile - var poolConfigs = (function(){ - var configs = {}; - fs.readdirSync('pool_configs').forEach(function(file){ - var poolOptions = JSON.parse(JSON.minify(fs.readFileSync('pool_configs/' + file, {encoding: 'utf8'}))); - if (poolOptions.disabled) return; - if (!(poolOptions.coin.toLowerCase() in coinProfiles)){ - logError(poolOptions.coin, 'system', 'could not find coin profile'); - return; - } - poolOptions.coin = coinProfiles[poolOptions.coin.toLowerCase()]; - configs[poolOptions.coin.name] = poolOptions; - }); - return configs; - })(); + +//Read all pool configs from pool_configs and join them with their coin profile +var buildPoolConfigs = function(){ + var configs = {}; + fs.readdirSync('pool_configs').forEach(function(file){ + var poolOptions = JSON.parse(JSON.minify(fs.readFileSync('pool_configs/' + file, {encoding: 'utf8'}))); + if (poolOptions.disabled) return; + var coinFilePath = 'coins/' + poolOptions.coin; + if (!fs.existsSync(coinFilePath)){ + logError(poolOptions.coin, 'system', 'could not find file: ' + coinFilePath); + return; + } + + var coinProfile = JSON.parse(JSON.minify(fs.readFileSync(coinFilePath, {encoding: 'utf8'}))); + poolOptions.coin = coinProfile; + configs[poolOptions.coin.name] = poolOptions; + }); + return configs; +}; + +var spawnPoolWorkers = function(portalConfig, poolConfigs){ var serializedConfigs = JSON.stringify(poolConfigs); var numForks = (function(){ - if (!config.clustering || !config.clustering.enabled) + if (!portalConfig.clustering || !portalConfig.clustering.enabled) return 1; - if (config.clustering.forks === 'auto') + if (portalConfig.clustering.forks === 'auto') return os.cpus().length; - if (!config.clustering.forks || isNaN(config.clustering.forks)) + if (!portalConfig.clustering.forks || isNaN(portalConfig.clustering.forks)) return 1; - return config.clustering.forks; + return portalConfig.clustering.forks; })(); - var workerIds = {}; - for (var i = 0; i < numForks; i++) { - var worker = cluster.fork({ - forkId: i, - pools: serializedConfigs - }); - workerIds[worker.process.pid] = i; - } - - cluster.on('exit', function(worker, code, signal) { - var diedPid = worker.process.pid; - var forkId = workerIds[diedPid] - logError('poolWorker', 'system', 'Fork ' + forkId + ' died, spawning replacement worker...'); + var createPoolWorker = function(forkId){ var worker = cluster.fork({ + workerType: 'pool', forkId: forkId, pools: serializedConfigs }); - delete workerIds[diedPid]; - workerIds[worker.process.pid] = forkId; - }); - + worker.on('exit', function(code, signal){ + logError('poolWorker', 'system', 'Fork ' + forkId + ' died, spawning replacement worker...'); + createPoolWorker(forkId); + }); + }; + + for (var i = 0; i < numForks; i++) { + createPoolWorker(i); + } + +}; +var startWorkerListener = function(poolConfigs){ var workerListener = new WorkerListener(loggerInstance, poolConfigs); workerListener.init(); +}; - +var startBlockListener = function(portalConfig){ //block notify options //setup block notify here and use IPC to tell appropriate pools - var listener = new BlocknotifyListener(config.blockNotifyListener); + var listener = new BlocknotifyListener(portalConfig.blockNotifyListener); listener.on('log', function(text){ logDebug('blocknotify', 'system', text); }); @@ -128,13 +132,33 @@ if (cluster.isMaster){ }); listener.start(); +}; - //create fork for payment processor here -} +var startPaymentProcessor = function(poolConfigs){ + var worker = cluster.fork({ + workerType: 'paymentProcessor', + pools: JSON.stringify(poolConfigs) + }); + worker.on('exit', function(code, signal){ + logError('paymentProcessor', 'system', 'Payment processor died, spawning replacement...'); + startPaymentProcessor(poolConfigs); + }); +}; -else{ - var worker = new PoolWorker(loggerInstance); -} +(function init(){ + var portalConfig = JSON.parse(JSON.minify(fs.readFileSync("config.json", {encoding: 'utf8'}))); + + var poolConfigs = buildPoolConfigs(); + + spawnPoolWorkers(portalConfig, poolConfigs); + + startPaymentProcessor(poolConfigs); + + startBlockListener(portalConfig); + + startWorkerListener(poolConfigs); + +})(); \ No newline at end of file diff --git a/libs/paymentProcessor.js b/libs/paymentProcessor.js index c1dd849..2813a5c 100644 --- a/libs/paymentProcessor.js +++ b/libs/paymentProcessor.js @@ -1,11 +1,118 @@ -/** - * Created by Matt on 3/5/14. - */ -var daemon = new Stratum.daemon.interface([internalConfig.daemon]); -daemon.once('online', function(){ - logger.debug('system', 'Connected to daemon for payment processing'); -}).once('connectionFailed', function(error){ - logger.error('system', 'Failed to connect to daemon for payment processing: ' + JSON.stringify(error)); +var redis = require('redis'); + +var Stratum = require('stratum-pool'); + + +module.exports = function(logger){ + + var poolConfigs = JSON.parse(process.env.pools); + + + Object.keys(poolConfigs).forEach(function(coin) { + SetupForPool(logger, poolConfigs[coin]); + }); + +}; + + +function SetupForPool(logger, poolOptions){ + + var coin = poolOptions.coin.name; + + var processingConfig = poolOptions.shareProcessing.internal; + + if (!processingConfig.enabled) return; + + var logIdentify = 'Payment Processor (' + coin + ')'; + + var paymentLogger = { + debug: function(key, text){ + logger.logDebug(logIdentify, key, text); + }, + warning: function(key, text){ + logger.logWarning(logIdentify, key, text); + }, + error: function(key, text){ + logger.logError(logIdentify, key, text); + } + }; + + var daemon = new Stratum.daemon.interface([processingConfig.daemon]); + daemon.once('online', function(){ + paymentLogger.debug('system', 'Connected to daemon for payment processing'); + + daemon.cmd('validateaddress', [poolOptions.address], function(result){ + if (!result[0].response.ismine){ + paymentLogger.error('system', 'Daemon does not own pool address - payment processing can not be done with this daemon'); + } + }); + }).once('connectionFailed', function(error){ + paymentLogger.error('system', 'Failed to connect to daemon for payment processing: ' + JSON.stringify(error)); }).on('error', function(error){ - logger.error('system', error); - }).init(); \ No newline at end of file + paymentLogger.error('system', error); + }).init(); + + + + var redisClient; + + + var connectToRedis = function(){ + var reconnectTimeout; + redisClient = redis.createClient(processingConfig.redis.port, processingConfig.redis.host); + redisClient.on('ready', function(){ + clearTimeout(reconnectTimeout); + paymentLogger.debug('redis', 'Successfully connected to redis database'); + }).on('error', function(err){ + paymentLogger.error('redis', 'Redis client had an error: ' + JSON.stringify(err)) + }).on('end', function(){ + paymentLogger.error('redis', 'Connection to redis database as been ended'); + paymentLogger.warning('redis', 'Trying reconnection in 3 seconds...'); + reconnectTimeout = setTimeout(function(){ + connectToRedis(); + }, 3000); + }); + }; + connectToRedis(); + + + var checkTx = function(tx, blockHeight){ + daemon.cmd('gettransaction', [tx], function(results){ + //console.dir(results[0].response.details[0].category); + var status = results[0].response.details[0].category; + var confirmed = (status === 'generate'); + + /* next: + - get contributed shares + - get unsent payments + - calculate payments + - send payments + - put unsent payments in db + - remove tx from db + - remove shares from db + */ + }); + }; + + + setInterval(function(){ + + redisClient.smembers('blocks_' + coin, function(error, results){ + if (error){ + logger.error('redis', 'Could get blocks from redis ' + JSON.stringify(error)); + return; + } + + results.forEach(function(item){ + var split = item.split(':'); + var tx = split[0]; + var blockHeight = split[1]; + checkTx(tx, blockHeight); + }); + + }); + + + }, processingConfig.paymentInterval * 1000); + +}; \ No newline at end of file diff --git a/libs/poolWorker.js b/libs/poolWorker.js index d945248..3b00322 100644 --- a/libs/poolWorker.js +++ b/libs/poolWorker.js @@ -1,5 +1,3 @@ -var cluster = require('cluster'); - var Stratum = require('stratum-pool'); var MposCompatibility = require('./mposCompatibility.js'); @@ -28,7 +26,7 @@ module.exports = function(logger){ var poolOptions = poolConfigs[coin]; - var logIdentify = coin + ' (Fork ' + forkId + ')'; + var logIdentify = 'Pool Fork ' + forkId + ' (' + coin + ')'; var poolLogger = { debug: function(key, text){ diff --git a/libs/shareProcessor.js b/libs/shareProcessor.js index fe3e250..43faee5 100644 --- a/libs/shareProcessor.js +++ b/libs/shareProcessor.js @@ -58,7 +58,7 @@ module.exports = function(logger, poolConfig){ }); if (isValidBlock){ - connection.sadd(['blocks_' + coin, shareData.solution + ':' + shareData.height], function(error, result){ + connection.sadd(['blocks_' + coin, shareData.tx + ':' + shareData.height], function(error, result){ if (error) logger.error('redis', 'Could not store block data'); }); diff --git a/pool_configs/litecoin_testnet_example.json b/pool_configs/litecoin_testnet_example.json index 6a690ce..2e9430c 100644 --- a/pool_configs/litecoin_testnet_example.json +++ b/pool_configs/litecoin_testnet_example.json @@ -1,12 +1,12 @@ { "disabled": false, - "coin": "litecoin", + "coin": "litecoin.json", "shareProcessing": { "internal": { "enabled": true, "validateWorkerAddress": true, - "paymentInterval": 30, + "paymentInterval": 10, "minimumPayment": 0.001, "feePercent": 0.02, "feeReceiveAddress": "LZz44iyF4zLCXJTU8RxztyyJZBntdS6fvv", @@ -35,7 +35,7 @@ }, - "address": "mi4iBXbBsydtcc5yFmsff2zCFVX4XG7qJc", + "address": "mfsm1ckZKTTjDz94KonZZsbZnAbm1UV4BF", "blockRefreshInterval": 1000, "connectionTimeout": 600,