Work on payment processing

This commit is contained in:
Matt 2014-03-09 20:31:58 -06:00
parent df867c459e
commit 1309aa6f19
6 changed files with 221 additions and 85 deletions

View File

@ -92,15 +92,14 @@ Here is an example of the required fields:
##### Pool config
Take a look at the example json file inside the `pool_configs` directory. Rename it to `yourcoin.json` and change the
example fields to fit your setup. The field `coin` __must__ be a string that references the `name` field in your coin's
configuration file (the string is not case sensitive).
example fields to fit your setup.
Description of options:
````javascript
{
"disabled": false, //Set this to true and a pool will not be created from this config file
"coin": "litecoin", //This MUST be a reference to the 'name' field in your coin's config file
"coin": "litecoin.json", //Reference to coin config file in 'coins' directory
/* This determines what to do with submitted shares (and stratum worker authentication).
@ -119,7 +118,9 @@ Description of options:
will be rejected. */
"validateWorkerAddress": true,
/* Every this many seconds check for confirmed blocks and send out payments. */
/* Every this many seconds get submitted blocks from redis, use daemon RPC to check
their confirmation status, if confirmed then get shares from redis that contributed
to block and send out payments. */
"paymentInterval": 30,
/* Minimum number of coins that a miner must earn before sending payment. Typically,
@ -149,6 +150,12 @@ Description of options:
"port": 19332,
"user": "litecoinrpc",
"password": "testnet"
},
/* Redis database used for storing share and block submission data. */
"redis": {
"host": "localhost",
"port": 6379
}
},

152
init.js
View File

@ -8,20 +8,12 @@ var PoolLogger = require('./libs/logutils.js');
var BlocknotifyListener = require('./libs/blocknotifyListener.js');
var WorkerListener = require('./libs/workerListener.js');
var PoolWorker = require('./libs/poolWorker.js');
var PaymentProcessor = require('./libs/paymentProcessor.js');
JSON.minify = JSON.minify || require("node-json-minify");
//Try to give process ability to handle 100k concurrent connections
try{
posix.setrlimit('nofile', { soft: 100000, hard: 100000 });
}
catch(e){
console.error(e);
}
var loggerInstance = new PoolLogger({
'default': true,
@ -37,85 +29,97 @@ var logWarning = loggerInstance.logWarning;
var logError = loggerInstance.logError;
if (cluster.isMaster){
//Try to give process ability to handle 100k concurrent connections
try{
posix.setrlimit('nofile', { soft: 100000, hard: 100000 });
}
catch(e){
logWarning('posix', 'system', '(Safe to ignore) Must be ran as root to increase resource limits');
}
var config = JSON.parse(JSON.minify(fs.readFileSync("config.json", {encoding: 'utf8'})));
//Read all coin profile json files from coins directory and build object where key is name of coin
var coinProfiles = (function(){
var profiles = {};
fs.readdirSync('coins').forEach(function(file){
var coinProfile = JSON.parse(JSON.minify(fs.readFileSync('coins/' + file, {encoding: 'utf8'})));
profiles[coinProfile.name.toLowerCase()] = coinProfile;
});
return profiles;
})();
if (cluster.isWorker){
switch(process.env.workerType){
case 'pool':
new PoolWorker(loggerInstance);
break;
case 'paymentProcessor':
new PaymentProcessor(loggerInstance);
break;
}
return;
}
//Read all pool configs from pool_configs and join them with their coin profile
var poolConfigs = (function(){
var configs = {};
fs.readdirSync('pool_configs').forEach(function(file){
var poolOptions = JSON.parse(JSON.minify(fs.readFileSync('pool_configs/' + file, {encoding: 'utf8'})));
if (poolOptions.disabled) return;
if (!(poolOptions.coin.toLowerCase() in coinProfiles)){
logError(poolOptions.coin, 'system', 'could not find coin profile');
return;
}
poolOptions.coin = coinProfiles[poolOptions.coin.toLowerCase()];
configs[poolOptions.coin.name] = poolOptions;
});
return configs;
})();
//Read all pool configs from pool_configs and join them with their coin profile
//Read every pool config json from the pool_configs directory, skip disabled
//ones, replace the config's 'coin' filename with the parsed coin profile from
//the coins directory, and key the result by the coin profile's name.
var buildPoolConfigs = function(){
    var configs = {};
    fs.readdirSync('pool_configs').forEach(function(fileName){
        var options = JSON.parse(JSON.minify(fs.readFileSync('pool_configs/' + fileName, {encoding: 'utf8'})));
        if (options.disabled) return;
        var profilePath = 'coins/' + options.coin;
        if (!fs.existsSync(profilePath)){
            logError(options.coin, 'system', 'could not find file: ' + profilePath);
            return;
        }
        //Swap the filename reference for the full parsed coin profile object
        options.coin = JSON.parse(JSON.minify(fs.readFileSync(profilePath, {encoding: 'utf8'})));
        configs[options.coin.name] = options;
    });
    return configs;
};
var spawnPoolWorkers = function(portalConfig, poolConfigs){
var serializedConfigs = JSON.stringify(poolConfigs);
var numForks = (function(){
if (!config.clustering || !config.clustering.enabled)
if (!portalConfig.clustering || !portalConfig.clustering.enabled)
return 1;
if (config.clustering.forks === 'auto')
if (portalConfig.clustering.forks === 'auto')
return os.cpus().length;
if (!config.clustering.forks || isNaN(config.clustering.forks))
if (!portalConfig.clustering.forks || isNaN(portalConfig.clustering.forks))
return 1;
return config.clustering.forks;
return portalConfig.clustering.forks;
})();
var workerIds = {};
for (var i = 0; i < numForks; i++) {
var worker = cluster.fork({
forkId: i,
pools: serializedConfigs
});
workerIds[worker.process.pid] = i;
}
cluster.on('exit', function(worker, code, signal) {
var diedPid = worker.process.pid;
var forkId = workerIds[diedPid]
logError('poolWorker', 'system', 'Fork ' + forkId + ' died, spawning replacement worker...');
var createPoolWorker = function(forkId){
var worker = cluster.fork({
workerType: 'pool',
forkId: forkId,
pools: serializedConfigs
});
delete workerIds[diedPid];
workerIds[worker.process.pid] = forkId;
});
worker.on('exit', function(code, signal){
logError('poolWorker', 'system', 'Fork ' + forkId + ' died, spawning replacement worker...');
createPoolWorker(forkId);
});
};
for (var i = 0; i < numForks; i++) {
createPoolWorker(i);
}
};
//Start the listener that receives messages from pool worker forks
var startWorkerListener = function(poolConfigs){
    new WorkerListener(loggerInstance, poolConfigs).init();
};
var startBlockListener = function(portalConfig){
//block notify options
//setup block notify here and use IPC to tell appropriate pools
var listener = new BlocknotifyListener(config.blockNotifyListener);
var listener = new BlocknotifyListener(portalConfig.blockNotifyListener);
listener.on('log', function(text){
logDebug('blocknotify', 'system', text);
});
@ -128,13 +132,33 @@ if (cluster.isMaster){
});
listener.start();
};
//create fork for payment processor here
}
//Fork a dedicated payment processor worker; if it dies, log the death and
//fork a fresh replacement with the same pool configs.
var startPaymentProcessor = function(poolConfigs){
    //Re-invoke ourselves to spawn a replacement fork
    var respawn = function(){
        startPaymentProcessor(poolConfigs);
    };
    var processor = cluster.fork({
        workerType: 'paymentProcessor',
        pools: JSON.stringify(poolConfigs)
    });
    processor.on('exit', function(code, signal){
        logError('paymentProcessor', 'system', 'Payment processor died, spawning replacement...');
        respawn();
    });
};
else{
var worker = new PoolWorker(loggerInstance);
}
//Master-process bootstrap: load the portal config from config.json, build
//the per-coin pool configs, then start each subsystem. Order matters here:
//pool workers are forked first, then the payment processor fork, then the
//block notify and worker listeners that communicate with them.
(function init(){
    var portalConfig = JSON.parse(JSON.minify(fs.readFileSync("config.json", {encoding: 'utf8'})));
    var poolConfigs = buildPoolConfigs();
    spawnPoolWorkers(portalConfig, poolConfigs);
    startPaymentProcessor(poolConfigs);
    startBlockListener(portalConfig);
    startWorkerListener(poolConfigs);
})();

View File

@ -1,11 +1,118 @@
/**
* Created by Matt on 3/5/14.
*/
var daemon = new Stratum.daemon.interface([internalConfig.daemon]);
daemon.once('online', function(){
logger.debug('system', 'Connected to daemon for payment processing');
}).once('connectionFailed', function(error){
logger.error('system', 'Failed to connect to daemon for payment processing: ' + JSON.stringify(error));
var redis = require('redis');
var Stratum = require('stratum-pool');
module.exports = function(logger){
var poolConfigs = JSON.parse(process.env.pools);
Object.keys(poolConfigs).forEach(function(coin) {
SetupForPool(logger, poolConfigs[coin]);
});
};
function SetupForPool(logger, poolOptions){
var coin = poolOptions.coin.name;
var processingConfig = poolOptions.shareProcessing.internal;
if (!processingConfig.enabled) return;
var logIdentify = 'Payment Processor (' + coin + ')';
var paymentLogger = {
debug: function(key, text){
logger.logDebug(logIdentify, key, text);
},
warning: function(key, text){
logger.logWarning(logIdentify, key, text);
},
error: function(key, text){
logger.logError(logIdentify, key, text);
}
};
var daemon = new Stratum.daemon.interface([processingConfig.daemon]);
daemon.once('online', function(){
paymentLogger.debug('system', 'Connected to daemon for payment processing');
daemon.cmd('validateaddress', [poolOptions.address], function(result){
if (!result[0].response.ismine){
paymentLogger.error('system', 'Daemon does not own pool address - payment processing can not be done with this daemon');
}
});
}).once('connectionFailed', function(error){
paymentLogger.error('system', 'Failed to connect to daemon for payment processing: ' + JSON.stringify(error));
}).on('error', function(error){
logger.error('system', error);
}).init();
paymentLogger.error('system', error);
}).init();
var redisClient;

/* Connect to the redis instance holding share/block data (host/port from
   processingConfig.redis). On 'end' the connection is retried every 3
   seconds; a successful 'ready' cancels any pending retry timer. All
   connection state changes are logged through paymentLogger. */
var connectToRedis = function(){
    var reconnectTimeout;
    redisClient = redis.createClient(processingConfig.redis.port, processingConfig.redis.host);
    redisClient.on('ready', function(){
        clearTimeout(reconnectTimeout);
        paymentLogger.debug('redis', 'Successfully connected to redis database');
    }).on('error', function(err){
        paymentLogger.error('redis', 'Redis client had an error: ' + JSON.stringify(err));
    }).on('end', function(){
        paymentLogger.error('redis', 'Connection to redis database has been ended');
        paymentLogger.warning('redis', 'Trying reconnection in 3 seconds...');
        reconnectTimeout = setTimeout(function(){
            connectToRedis();
        }, 3000);
    });
};
connectToRedis();
/* Ask the daemon about a submitted block's generation tx; a 'generate'
   category means the block is confirmed. Payment calculation from the
   confirmed block is still to be implemented (see list below). */
var checkTx = function(tx, blockHeight){
    daemon.cmd('gettransaction', [tx], function(results){
        var category = results[0].response.details[0].category;
        var confirmed = (category === 'generate');
        /* next:
        - get contributed shares
        - get unsent payments
        - calculate payments
        - send payments
        - put unsent payments in db
        - remove tx from db
        - remove shares from db
        */
    });
};
/* Every paymentInterval seconds, fetch the set of submitted blocks for this
   coin from redis and check each one's confirmation status via the daemon. */
setInterval(function(){
    redisClient.smembers('blocks_' + coin, function(error, results){
        if (error){
            //The raw logger has no .error method (only logDebug/logWarning/logError);
            //use the paymentLogger wrapper like the rest of this module.
            paymentLogger.error('redis', 'Could not get blocks from redis ' + JSON.stringify(error));
            return;
        }
        //Each set member was stored by the share processor as 'txHash:blockHeight'
        results.forEach(function(item){
            var split = item.split(':');
            var tx = split[0];
            var blockHeight = split[1];
            checkTx(tx, blockHeight);
        });
    });
}, processingConfig.paymentInterval * 1000);
};

View File

@ -1,5 +1,3 @@
var cluster = require('cluster');
var Stratum = require('stratum-pool');
var MposCompatibility = require('./mposCompatibility.js');
@ -28,7 +26,7 @@ module.exports = function(logger){
var poolOptions = poolConfigs[coin];
var logIdentify = coin + ' (Fork ' + forkId + ')';
var logIdentify = 'Pool Fork ' + forkId + ' (' + coin + ')';
var poolLogger = {
debug: function(key, text){

View File

@ -58,7 +58,7 @@ module.exports = function(logger, poolConfig){
});
if (isValidBlock){
connection.sadd(['blocks_' + coin, shareData.solution + ':' + shareData.height], function(error, result){
connection.sadd(['blocks_' + coin, shareData.tx + ':' + shareData.height], function(error, result){
if (error)
logger.error('redis', 'Could not store block data');
});

View File

@ -1,12 +1,12 @@
{
"disabled": false,
"coin": "litecoin",
"coin": "litecoin.json",
"shareProcessing": {
"internal": {
"enabled": true,
"validateWorkerAddress": true,
"paymentInterval": 30,
"paymentInterval": 10,
"minimumPayment": 0.001,
"feePercent": 0.02,
"feeReceiveAddress": "LZz44iyF4zLCXJTU8RxztyyJZBntdS6fvv",
@ -35,7 +35,7 @@
},
"address": "mi4iBXbBsydtcc5yFmsff2zCFVX4XG7qJc",
"address": "mfsm1ckZKTTjDz94KonZZsbZnAbm1UV4BF",
"blockRefreshInterval": 1000,
"connectionTimeout": 600,