Refactored share processing / worker communication

This commit is contained in:
Matt 2014-03-04 01:33:55 -07:00
parent b91052ea7a
commit 95fe6129b3
8 changed files with 181 additions and 121 deletions

View File

@ -118,7 +118,7 @@ Description of options:
"internal": { //enabled this options for share payments to be processed and sent locally
"enabled": true,
/* This daemon is used to send out payments. It MUST be for the daemon that owns the
'pool.address' field below, otherwise the daemon will not be able to confirm blocks
'address' field above, otherwise the daemon will not be able to confirm blocks
or sent out payments. */
"daemon": {
"host": "localhost",

View File

@ -6,7 +6,7 @@ var cluster = require('cluster');
var posix = require('posix');
var PoolLogger = require('./libs/logutils.js');
var BlocknotifyListener = require('./libs/blocknotifyListener.js');
var ShareProcessor = require('./libs/shareProcessor.js');
var WorkerListener = require('./libs/workerListener.js');
var PoolWorker = require('./libs/poolWorker.js');
JSON.minify = JSON.minify || require("node-json-minify");
@ -97,8 +97,8 @@ if (cluster.isMaster){
var shareProcessor = new ShareProcessor(loggerInstance, poolConfigs);
shareProcessor.init();
var workerListener = new WorkerListener(loggerInstance, poolConfigs);
workerListener.init();

92
libs/mposCompatibility.js Normal file
View File

@ -0,0 +1,92 @@
var mysql = require('mysql');
module.exports = function(logger, poolConfigs){
var dbConnections = (function(){
var connections = {};
Object.keys(poolConfigs).forEach(function(coin) {
var config = poolConfigs[coin];
if (!config.shareProcessing || !config.shareProcessing.mpos || !config.shareProcessing.mpos.enabled)
return;
var mposConfig = config.shareProcessing.mpos;
function connect(){
var connection = connections[coin] = mysql.createConnection({
host: mposConfig.host,
port: mposConfig.port,
user: mposConfig.user,
password: mposConfig.password,
database: mposConfig.database
});
connection.connect(function(err){
if (err)
logger.logError('shareProcessor', 'mysql', config.coin.name +
' - could not connect to mysql database: ' + JSON.stringify(err))
else{
logger.logDebug('shareProcessor', 'mysql', config.coin.name +
' - successful connection to MySQL database');
}
});
connection.on('error', function(err){
if(err.code === 'PROTOCOL_CONNECTION_LOST') {
logger.logWarning('shareProcessor', 'mysql', config.coin.name +
' - lost connection to MySQL database, attempting reconnection...');
connect();
}
else{
logger.logError('shareProcessor', 'mysql', config.coin.name +
' - mysql database error: ' + JSON.stringify(err))
}
});
}
connect();
});
return connections;
});
this.handleAuth = function(allowCallback){
};
this.handleShare = function(isValidShare, isValidBlock, data){
if ((!data.coin in dbConnections)) return;
var connection = dbConnections[data.coin];
connection.query(
'INSERT INTO `shares` SET time = NOW(), rem_host = ?, username = ?, our_result = ?, upstream_result = ?, difficulty = ?, reason = ?, solution = ?',
[data.ip, data.worker, isValidShare ? 'Y' : 'N', isValidBlock ? 'Y' : 'N', data.difficulty, data.error, data.solution],
function(err, result) {
if (err)
logger.logError('shareProcessor', 'mysql', 'MySQL insert error when adding share: ' +
JSON.stringify(err));
}
);
};
this.handleDifficultyUpdate = function(workerName, diff){
if ((!data.coin in dbConnections)) return;
var connection = dbConnections[data.coin];
connection.query(
'UPDATE `pool_worker` SET `difficulty` = ' + diff + ' WHERE `username` = ' + connection.escape(workerName),
function(err, result){
if (err)
logger.logError('shareProcessor', 'mysql', 'MySQL error when updating worker diff: ' +
JSON.stringify(err));
else if (result.affectedRows === 0){
connection.query('INSERT INTO `pool_worker` SET ?', {username: workerName, difficulty: diff});
}
else
console.log('Updated difficulty successfully', result);
}
);
};
};

View File

@ -73,15 +73,24 @@ module.exports = function(logger){
});
}
}).on('difficultyUpdate', function(workerName, diff){
if (poolOptions.shareProcessing.mpos.enabled){
process.send({
type: 'difficultyUpdate',
workerName: workerName,
diff: diff,
coin: poolOptions.coin.name
});
}
}).on('log', function(severity, logKey, logText) {
if (severity == 'debug') {
logDebug(logIdentify, logKey, logText);
} else if (severity == 'warning') {
logWarning(logIdentify, logKey, logText);
} else if (severity == 'error') {
logError(logIdentify, logKey, logText);
}
});
if (severity == 'debug') {
logDebug(logIdentify, logKey, logText);
} else if (severity == 'warning') {
logWarning(logIdentify, logKey, logText);
} else if (severity == 'error') {
logError(logIdentify, logKey, logText);
}
});
pool.start();
pools.push(pool);
});

View File

@ -1,76 +1,27 @@
var events = require('events');
var cluster = require('cluster');
var redis = require('redis');
var mysql = require('mysql');
var processor = module.exports = function processor(logger, poolConfigs){
module.exports = function(logger, poolConfigs){
var _this = this;
//TODO: need to add redis config to json. probably do one redis client per pool?
var client;
var poolMposHandlers = (function(){
var handlers = {};
client = redis.createClient();
Object.keys(poolConfigs).forEach(function(coin) {
client.on("error", function (err) {
logger.logError('shareProcessor', 'redis', 'Redis client had an error: ' + err);
});
var config = poolConfigs[coin];
this.handleDifficultyUpdate = function(data){
var coin = data.coin;
var poolConfig = poolConfigs[coin];
if (poolConfig.shareProcessing.mpos && poolConfig.shareProcessing.mpos.enabled){
poolMposHandlers[coin].updateDifficulty(data.workerName, data.diff);
}
};
if (!config.shareProcessing || !config.shareProcessing.mpos || !config.shareProcessing.mpos.enabled)
return;
var mposConfig = config.shareProcessing.mpos;
var connection = mysql.createConnection({
host: mposConfig.host,
port: mposConfig.port,
user: mposConfig.user,
password: mposConfig.password,
database: mposConfig.database
});
connection.connect(function(err){
logger.logError('shareProcessor', 'database', config.coin.name +
' - could not connect to mysql database: ' + JSON.stringify(err))
});
connection.on('error', function(err){
logger.logError('shareProcessor', 'database', config.coin.name +
' - mysql database error: ' + JSON.stringify(err))
});
var insertShare = function(isValidShare, isValidBlock, data){
connection.query(
'INSERT INTO `shares` SET time = NOW(), rem_host = ?, username = ?, our_result = ?, upstream_result = ?, difficulty = ?, reason = ?, solution = ?',
[data.ip, data.worker, isValidShare ? 'Y' : 'N', isValidBlock ? 'Y' : 'N', data.difficulty, data.error, data.solution],
function(err, result) {
if (err)
logger.logError('shareProcessor', 'database', 'MySQL insert error when adding share: ' +
JSON.stringify(err));
}
);
};
var updateDifficulty = function(workerName, diff){
connection.query(
'UPDATE `pool_worker` SET `difficulty` = ' + diff + ' WHERE `username` = ' + connection.escape(workerName),
function(err, result){
if (err)
logger.logError('shareProcessor', 'database', 'MySQL error when updating worker diff: ' +
JSON.stringify(err));
else if (result.affectedRows === 0){
connection.query('INSERT INTO `pool_worker` SET ?', {username: workerName, difficulty: diff});
}
else
console.log('Updated difficulty successfully', result);
}
);
};
handlers[config.coin.name] = {insertShare: insertShare, updateDifficulty: updateDifficulty};
});
return handlers;
})();
function handleShare(data){
this.handleShare = function(data){
var shareData = data.share;
var coin = data.coin;
var poolConfig = poolConfigs[coin];
@ -82,39 +33,12 @@ var processor = module.exports = function processor(logger, poolConfigs){
if (poolConfig.shareProcessing.internal && poolConfig.shareProcessing.internal.enable && data.isValidShare){
client.hincrby([coin + ':' + shareData.height, shareData.worker, shareData.difficulty], function(error, result){
if (error)
logger.logError('shareProcessor', 'database', 'could not store worker share')
logger.logError('shareProcessor', 'redis', 'could not store worker share')
});
}
}
};
function handleBlock(data){
var requiredConfirmations = data.confirmations;
//setInterval where we check for block confirmations
//probably create our own rpc interface for each pool
}
this.init = function(){
client = redis.createClient();
client.on("error", function (err) {
logger.logError('shareProcessor', 'database', 'Redis client had an error: ' + err);
});
Object.keys(cluster.workers).forEach(function(id) {
cluster.workers[id].on('message', function(data){
switch(data.type){
case 'share':
handleShare(data);
break;
case 'block':
handleBlock(data)
break;
}
});
});
}
};
processor.prototype.__proto__ = events.EventEmitter.prototype;
this.handleBlock = function(data){
//
};
};

46
libs/workerListener.js Normal file
View File

@ -0,0 +1,46 @@
var events = require('events');
var cluster = require('cluster');
var MposCompatibility = require('./mposCompatibility.js');
var ShareProcessor = require('./shareProcessor.js');
var processor = module.exports = function processor(logger, poolConfigs){
var _this = this;
var mposCompat = new MposCompatibility(logger, poolConfigs);
var shareProcessor = new ShareProcessor(logger, poolConfigs);
this.init = function(){
Object.keys(cluster.workers).forEach(function(id) {
cluster.workers[id].on('message', function(data){
var shareProcessing = poolConfigs[data.coin].shareProcessing;
switch(data.type){
case 'share':
if (shareProcessing.internal.enabled)
shareProcessor.handleShare(data);
if (shareProcessing.mpos.enabled)
mposCompat.handleShare(data);
break;
case 'difficultyUpdate':
if (shareProcessing.mpos.enabled)
mposCompat.handleDifficultyUpdate(data);
break;
case 'block':
if (shareProcessing.internal.enabled)
shareProcessor.handleBlock(data);
break;
}
});
});
}
};
processor.prototype.__proto__ = events.EventEmitter.prototype;

View File

@ -56,7 +56,7 @@
],
"p2p": {
"enabled": true,
"enabled": false,
"host": "localhost",
"port": 19333,
"protocolVersion": 70002,

11
test.js
View File

@ -1,11 +0,0 @@
var net = require('net');
var socketServer = net.createServer({allowHalfOpen: true}, function(socket){
console.log(socket);
});
socketServer.listen(1111, function(){
console.log('started 0');
});
socketServer.listen(1112, function(){
console.log('started 1');
})