Started work on share processing. Added redis dependency.

This commit is contained in:
Matt 2014-03-01 18:19:10 -07:00
parent 39cdea33cc
commit d640cc12c0
7 changed files with 158 additions and 153 deletions

74
init.js
View File

@ -4,9 +4,10 @@ var cluster = require('cluster');
var posix = require('posix'); var posix = require('posix');
var Stratum = require('stratum-pool');
var PoolLogger = require('./libs/logutils.js'); var PoolLogger = require('./libs/logutils.js');
var BlocknotifyListener = require('./libs/blocknotifyListener.js'); var BlocknotifyListener = require('./libs/blocknotifyListener.js');
var ShareProcessor = require('./libs/shareProcessor.js');
var PoolWorker = require('./libs/poolWorker.js');
JSON.minify = JSON.minify || require("node-json-minify"); JSON.minify = JSON.minify || require("node-json-minify");
@ -91,10 +92,16 @@ if (cluster.isMaster){
} }
cluster.on('exit', function(worker, code, signal) { cluster.on('exit', function(worker, code, signal) {
console.log('worker fork with PID ' + worker.process.pid + ' died'); logError('workerFork', 'system', 'fork with PID ' + worker.process.pid + ' died');
}); });
var shareProcessor = new ShareProcessor(loggerInstance);
shareProcessor.init();
//block notify options //block notify options
//setup block notify here and use IPC to tell appropriate pools //setup block notify here and use IPC to tell appropriate pools
var listener = new BlocknotifyListener(config.blockNotifyListener); var listener = new BlocknotifyListener(config.blockNotifyListener);
@ -115,67 +122,6 @@ if (cluster.isMaster){
else{ else{
var poolConfigs = JSON.parse(process.env.pools); var worker = new PoolWorker(loggerInstance);
var fork = process.env.fork;
var pools = [];
//Handle blocknotify message from master process sent via IPC
process.on('message', function(msg) {
var message = JSON.parse(msg);
if (message.blocknotify){
for (var i = 0; i < pools.length; i++){
if (pools[i].options.coin.name.toLowerCase() === message.coin.toLowerCase()){
pools[i].processBlockNotify(message.blockHash)
return;
}
}
}
});
poolConfigs.forEach(function(poolOptions){
var logIdentify = poolOptions.coin.name + ' (Fork ' + fork + ')';
var authorizeFN = function (ip, workerName, password, callback) {
// Default implementation just returns true
logDebug(logIdentify, 'client', "Authorize [" + ip + "] " + workerName + ":" + password);
callback({
error: null,
authorized: true,
disconnect: false
});
};
var pool = Stratum.createPool(poolOptions, authorizeFN);
pool.on('share', function(isValidShare, isValidBlock, data){
var shareData = JSON.stringify(data);
if (data.solution && !isValidBlock)
logDebug(logIdentify, 'client', 'We thought a block solution was found but it was rejected by the daemon, share data: ' + shareData);
else if (isValidBlock)
logDebug(logIdentify, 'client', 'Block found, share data: ' + shareData);
else if (isValidShare)
logDebug(logIdentify, 'client', 'Valid share submitted, share data: ' + shareData);
else
logDebug(logIdentify, 'client', 'Invalid share submitted, share data: ' + shareData)
}).on('log', function(severity, logKey, logText) {
if (severity == 'debug') {
logDebug(logIdentify, logKey, logText);
} else if (severity == 'warning') {
logWarning(logIdentify, logKey, logText);
} else if (severity == 'error') {
logError(logIdentify, logKey, logText);
}
});
pool.start();
pools.push(pool);
});
} }

85
libs/poolWorker.js Normal file
View File

@ -0,0 +1,85 @@
var Stratum = require('stratum-pool');
module.exports = function(logger){
var logDebug = logger.logDebug;
var logWarning = logger.logWarning;
var logError = logger.logError;
var poolConfigs = JSON.parse(process.env.pools);
var fork = process.env.fork;
var pools = [];
//Handle blocknotify message from master process sent via IPC
process.on('message', function(message) {
if (message.blocknotify){
for (var i = 0; i < pools.length; i++){
if (pools[i].options.coin.name.toLowerCase() === message.coin.toLowerCase()){
pools[i].processBlockNotify(message.blockHash)
return;
}
}
}
});
poolConfigs.forEach(function(poolOptions){
var logIdentify = poolOptions.coin.name + ' (Fork ' + fork + ')';
var authorizeFN = function (ip, workerName, password, callback) {
// Default implementation just returns true
logDebug(logIdentify, 'client', "Authorize [" + ip + "] " + workerName + ":" + password);
callback({
error: null,
authorized: true,
disconnect: false
});
};
var pool = Stratum.createPool(poolOptions, authorizeFN);
pool.on('share', function(isValidShare, isValidBlock, data){
var shareData = JSON.stringify(data);
if (data.solution && !isValidBlock){
logDebug(logIdentify, 'client', 'We thought a block solution was found but it was rejected by the daemon, share data: ' + shareData);
return;
}
else if (!isValidShare){
logDebug(logIdentify, 'client', 'Invalid share submitted, share data: ' + shareData)
return;
}
logDebug(logIdentify, 'client', 'Valid share submitted, share data: ' + shareData);
process.send({type: 'share', share: shareData, coin: poolOptions.coin.name});
if (isValidBlock){
logDebug(logIdentify, 'client', 'Block found, solution: ' + shareData.solution);
process.send({
type: 'block',
share: shareData,
coin: poolOptions.coin.name,
confirmations: poolOptions.rewards.blockConfirmations
});
}
}).on('log', function(severity, logKey, logText) {
if (severity == 'debug') {
logDebug(logIdentify, logKey, logText);
} else if (severity == 'warning') {
logWarning(logIdentify, logKey, logText);
} else if (severity == 'error') {
logError(logIdentify, logKey, logText);
}
});
pool.start();
pools.push(pool);
});
};

51
libs/shareProcessor.js Normal file
View File

@ -0,0 +1,51 @@
var events = require('events');
var cluster = require('cluster');
var redis = require('redis');
var processor = module.exports = function processor(logger){
var _this = this;
var client;
function handleShare(data){
var shareData = data.share;
var coin = data.coin;
client.hincrby([coin + ':' + shareData.height, shareData.worker, shareData.difficulty], function(error, result){
if (error)
logger.logError('shareProcessor', 'database', 'could not store worker share')
});
}
function handleBlock(data){
var requiredConfirmations = data.confirmations;
//setInterval where we check for block confirmations
//probably create our own rpc interface for each pool
}
this.init = function(){
client = redis.createClient();
client.on("error", function (err) {
logger.logError('shareProcessor', 'database', 'Redis client had an error: ' + err);
});
Object.keys(cluster.workers).forEach(function(id) {
cluster.workers[id].on('message', function(data){
switch(data.type){
case 'share':
handleShare(data);
break;
case 'block':
handleBlock(data)
break;
}
});
});
}
};
processor.prototype.__proto__ = events.EventEmitter.prototype;

View File

@ -7,7 +7,8 @@
"stratum-pool": "zone117x/node-stratum", "stratum-pool": "zone117x/node-stratum",
"dateformat": "*", "dateformat": "*",
"node-json-minify": "*", "node-json-minify": "*",
"posix": "*" "posix": "*",
"redis": "*"
}, },
"devDependencies": {}, "devDependencies": {},
"scripts": { "scripts": {

View File

@ -1,44 +0,0 @@
{
"disabled": true,
"coin": "darkcoin",
"pool": {
"address": "XjkzAVe3zywGhDFSbJtqUN6xeKP37PSNSh",
"stratumPort": 3336,
"difficulty": 0.005,
"blockRefreshInterval": 1000
},
"daemons": [
{
"host": "localhost",
"port": 15342,
"user": "darkcoinrpc1",
"password": "drkpass"
}
],
"varDiff": {
"enabled": false,
"minDifficulty": 1,
"maxDifficulty": 1000,
"targetTime": 30,
"retargetTime": 120,
"variancePercent": 20
},
"p2p": {
"enabled": false,
"host": "localhost",
"port": 19333,
/* Found in src as the PROTOCOL_VERSION variable, for example:
https://github.com/litecoin-project/litecoin/blob/85f303d883ffff35238eaea5174b780c950c0ae4/src/version.h#L28
*/
"protocolVersion": 70002,
/* Magic value is different for main/testnet and for each coins. It is found in the daemon
source code as the pchMessageStart variable. For example, litecoin mainnet:
http://github.com/litecoin-project/litecoin/blob/85f303d883ffff35238eaea5174b780c950c0ae4/src/main.cpp#L3059
And for litecoin testnet:
http://github.com/litecoin-project/litecoin/blob/85f303d883ffff35238eaea5174b780c950c0ae4/src/main.cpp#L2722-L2725
*/
"magic": "fcc1b7dc"
}
}

View File

@ -1,6 +1,16 @@
{ {
"disabled": false, "disabled": false,
"coin": "litecoin", "coin": "litecoin",
"rewards": {
"enabled": true,
"blockConfirmations": 20,
"daemon":{
"host": "localhost",
"port": 19332,
"user": "litecoinrpc",
"password": "testnet"
}
},
"pool": { "pool": {
"address": "mi4iBXbBsydtcc5yFmsff2zCFVX4XG7qJc", "address": "mi4iBXbBsydtcc5yFmsff2zCFVX4XG7qJc",
"stratumPort": 3334, "stratumPort": 3334,

View File

@ -1,44 +0,0 @@
{
"disabled": true,
"coin": "skeincoin",
"pool": {
"address": "SUxtDjHYijztRKjbnBkvEbA3mQ5wFeS72H",
"stratumPort": 3336,
"difficulty": 0.005,
"blockRefreshInterval": 1000
},
"daemons": [
{
"host": "localhost",
"port": 15347,
"user": "skeincoinrpc1",
"password": "skcpass"
}
],
"varDiff": {
"enabled": false,
"minDifficulty": 16,
"maxDifficulty": 1000,
"targetTime": 30,
"retargetTime": 120,
"variancePercent": 20
},
"p2p": {
"enabled": false,
"host": "localhost",
"port": 19333,
/* Found in src as the PROTOCOL_VERSION variable, for example:
https://github.com/litecoin-project/litecoin/blob/85f303d883ffff35238eaea5174b780c950c0ae4/src/version.h#L28
*/
"protocolVersion": 70002,
/* Magic value is different for main/testnet and for each coins. It is found in the daemon
source code as the pchMessageStart variable. For example, litecoin mainnet:
http://github.com/litecoin-project/litecoin/blob/85f303d883ffff35238eaea5174b780c950c0ae4/src/main.cpp#L3059
And for litecoin testnet:
http://github.com/litecoin-project/litecoin/blob/85f303d883ffff35238eaea5174b780c950c0ae4/src/main.cpp#L2722-L2725
*/
"magic": "fcc1b7dc"
}
}