Stuff for miners switch

This commit is contained in:
Andrea 2014-03-14 19:11:52 +00:00
parent 7cf448d1e2
commit a4f302b526
5 changed files with 181 additions and 209 deletions

View File

@ -45,92 +45,6 @@ function DaemonInterface(options){
}
function performHttpRequest(instance, jsonData, callback){
var options = {
hostname: (typeof(instance.host) === 'undefined' ? 'localhost' : instance.host),
port : instance.port,
method : 'POST',
auth : instance.user + ':' + instance.password,
headers : {
'Content-Length': jsonData.length
}
};
var req = http.request(options, function(res) {
var data = '';
res.setEncoding('utf8');
res.on('data', function (chunk) {
data += chunk;
});
res.on('end', function(){
var dataJson;
var parsingError;
try{
dataJson = JSON.parse(data);
}
catch(e){
if (res.statusCode === 401){
parsingError = 'unauthorized';
_this.emit('error', 'Invalid RPC username or password');
}
else{
parsingError = e;
_this.emit('error', 'could not parse rpc data with request of: ' + jsonData +
' on instance ' + instance.index + ' data: ' + data);
}
}
if (typeof(dataJson) !== 'undefined'){
callback(dataJson.error, dataJson);
}
else
callback(parsingError);
});
});
req.on('error', function(e) {
if (e.code === 'ECONNREFUSED')
callback({type: 'offline', message: e.message}, null);
else
callback({type: 'request error', message: e.message}, null);
});
req.end(jsonData);
};
//Performs a batch JSON-RPC command - only uses the first configured rpc daemon
/* First argument must have:
[
[ methodName, [params] ],
[ methodName, [params] ]
]
*/
function batchCmd(cmdArray, callback){
var requestJson = [];
for (var i = 0; i < cmdArray.length; i++){
requestJson.push({
method: cmdArray[i][0],
params: cmdArray[i][1],
id: Date.now() + Math.floor(Math.random() * 10) + i
});
}
var serializedRequest = JSON.stringify(requestJson);
performHttpRequest(instances[0], serializedRequest, function(error, result){
callback(error, result);
}, 'fuck');
}
/* Sends a JSON RPC (http://json-rpc.org/wiki/specification) command to every configured daemon.
The callback function is fired once with the result from each daemon unless streamResults is
set to true. */
@ -141,7 +55,7 @@ function DaemonInterface(options){
async.each(instances, function(instance, eachCallback){
var itemFinished = function(error, result){
var returnObj = {error: error, response: result.result, instance: instance};
var returnObj = {error: error, response: result, instance: instance};
if (streamResults) callback(returnObj);
else results.push(returnObj);
eachCallback();
@ -149,15 +63,55 @@ function DaemonInterface(options){
};
var requestJson = JSON.stringify({
id: Date.now() + Math.floor(Math.random() * 10),
method: method,
params: params,
id: Date.now() + Math.floor(Math.random() * 10)
params: params
});
performHttpRequest(instance, requestJson, function(error, result){
itemFinished(error, result);
var options = {
hostname: (typeof(instance.host) === 'undefined' ? 'localhost' : instance.host),
port : instance.port,
method : 'POST',
auth : instance.user + ':' + instance.password,
headers : {
'Content-Length': requestJson.length
}
};
var req = http.request(options, function(res) {
var data = '';
res.setEncoding('utf8');
res.on('data', function (chunk) {
data += chunk;
});
res.on('end', function(){
var dataJson;
var parsingError;
try{
dataJson = JSON.parse(data);
}
catch(e){
parsingError = e;
_this.emit('error', 'could not parse rpc data from method: ' + method +
' on instance ' + instance.index + ' data: ' + data);
}
if (typeof(dataJson) !== 'undefined')
itemFinished(dataJson.error, dataJson.result);
else
itemFinished(parsingError);
});
});
req.on('error', function(e) {
if (e.code === 'ECONNREFUSED')
itemFinished({type: 'offline', message: e.message}, null);
else
itemFinished({type: 'request error', message: e.message}, null);
});
req.end(requestJson);
}, function(){
if (!streamResults){
@ -173,7 +127,6 @@ function DaemonInterface(options){
this.init = init;
this.isOnline = isOnline;
this.cmd = cmd;
this.batchCmd = batchCmd;
}
DaemonInterface.prototype.__proto__ = events.EventEmitter.prototype;

View File

@ -270,7 +270,6 @@ var JobManager = module.exports = function JobManager(options){
worker: workerName,
difficulty: difficulty,
height: job.rpcData.height,
reward: job.rpcData.coinbasevalue,
networkDifficulty: job.difficulty,
solution: blockHash
}, blockHex);

View File

@ -28,7 +28,7 @@ var pool = module.exports = function pool(options, authorizeFn){
var _this = this;
var publicKeyBuffer;
var blockPollingIntervalId;
var emitLog = function(key, text) { _this.emit('log', 'debug' , key, text); };
@ -39,7 +39,14 @@ var pool = module.exports = function pool(options, authorizeFn){
emitLog('system', 'Starting pool for ' + options.coin.name + ' [' + options.coin.symbol.toUpperCase() + ']');
SetupJobManager();
SetupVarDiff();
SetupDaemonInterface();
SetupDaemonInterface(function (err, newDaemon) {
if (!err) {
_this.daemon = newDaemon;
SetupBlockPolling();
StartStratumServer();
SetupPeer();
}
});
SetupApi();
};
@ -69,23 +76,8 @@ var pool = module.exports = function pool(options, authorizeFn){
}
function SetupVarDiff(){
_this.varDiff = new varDiff(options.ports);
_this.varDiff.on('newDifficulty', function(client, newDiff) {
/* We request to set the newDiff @ the next difficulty retarget
(which should happen when a new job comes in - AKA BLOCK) */
client.enqueueNextDifficulty(newDiff);
/*if (options.varDiff.mode === 'fast'){
//Send new difficulty, then force miner to use new diff by resending the
//current job parameters but with the "clean jobs" flag set to false
//so the miner doesn't restart work and submit duplicate shares
client.sendDifficulty(newDiff);
var job = _this.jobManager.currentJob.getJobParams();
job[8] = false;
client.sendMiningJob(job);
}*/
Object.keys(options.ports).forEach(function(port) {
_this.setVarDiff(port, new varDiff(port, options.ports[port]));
});
}
@ -163,9 +155,8 @@ var pool = module.exports = function pool(options, authorizeFn){
emitShare();
else{
SubmitBlock(blockHex, function(){
CheckBlockAccepted(shareData.solution, function(isAccepted, tx){
CheckBlockAccepted(shareData.solution, function(isAccepted){
isValidBlock = isAccepted;
shareData.tx = tx;
emitShare();
});
});
@ -176,13 +167,13 @@ var pool = module.exports = function pool(options, authorizeFn){
}
function SetupDaemonInterface(){
function SetupDaemonInterface(cback){
emitLog('system', 'Connecting to daemon(s)');
_this.daemon = new daemon.interface(options.daemons);
_this.daemon.once('online', function(){
var newDaemon = new daemon.interface(options.daemons);
newDaemon.once('online', function(){
async.parallel({
addressInfo: function(callback){
_this.daemon.cmd('validateaddress', [options.address], function(results){
newDaemon.cmd('validateaddress', [options.address], function(results){
//Make sure address is valid with each daemon
var allValid = results.every(function(result){
@ -205,7 +196,7 @@ var pool = module.exports = function pool(options, authorizeFn){
});
},
miningInfo: function(callback){
_this.daemon.cmd('getmininginfo', [], function(results){
newDaemon.cmd('getmininginfo', [], function(results){
// Print which network each daemon is running on
@ -252,7 +243,7 @@ var pool = module.exports = function pool(options, authorizeFn){
submitMethod: function(callback){
/* This checks to see whether the daemon uses submitblock
or getblocktemplate for submitting new blocks */
_this.daemon.cmd('submitblock', [], function(results){
newDaemon.cmd('submitblock', [], function(results){
var couldNotDetectMethod = results.every(function(result){
if (result.error && result.error.message === 'Method not found'){
callback(null, false);
@ -274,6 +265,7 @@ var pool = module.exports = function pool(options, authorizeFn){
}, function(err, results){
if (err){
emitErrorLog('system', 'Could not start pool, ' + JSON.stringify(err));
cback(err);
return;
}
@ -285,6 +277,7 @@ var pool = module.exports = function pool(options, authorizeFn){
if (options.coin.reward === 'POS' && typeof(results.addressInfo.pubkey) == 'undefined') {
// address provided is not of the wallet.
emitErrorLog('system', 'The address provided is not from the daemon wallet.');
cback(err);
return;
} else {
@ -304,15 +297,13 @@ var pool = module.exports = function pool(options, authorizeFn){
'than network difficulty of ' + networkDifficulty);
});
GetBlockTemplate(function(error, result){
if (error){
GetBlockTemplate(newDaemon, function(error, result){
if (error) {
console.error(error);
emitErrorLog('system', 'Error with getblocktemplate on initializing');
}
else{
SetupBlockPolling();
StartStratumServer();
SetupPeer();
cback(error);
} else {
cback(null, newDaemon); // finish!
}
});
}
@ -324,7 +315,7 @@ var pool = module.exports = function pool(options, authorizeFn){
emitErrorLog('system', message);
});
_this.daemon.init();
newDaemon.init();
}
@ -334,8 +325,10 @@ var pool = module.exports = function pool(options, authorizeFn){
emitLog('system','Stratum server started on port(s): ' + Object.keys(options.ports).join(', '));
_this.emit('started');
}).on('client.connected', function(client){
_this.varDiff.manageClient(client);
if (typeof(_this.varDiff[client.socket.localPort]) !== 'undefined') {
_this.varDiff[client.socket.localPort].manageClient(client);
}
client.on('difficultyChanged', function(diff){
_this.emit('difficultyUpdate', client.workerName, diff);
@ -348,7 +341,12 @@ var pool = module.exports = function pool(options, authorizeFn){
extraNonce2Size
);
this.sendDifficulty(options.ports[client.socket.localPort].diff);
if (typeof(options.ports[client.socket.localPort]) !== 'undefined' && options.ports[client.socket.localPort].diff) {
this.sendDifficulty(options.ports[client.socket.localPort].diff);
} else {
this.sendDifficulty(8);
}
this.sendMiningJob(_this.jobManager.currentJob.getJobParams());
}).on('submit', function(params, resultCallback){
@ -391,14 +389,18 @@ var pool = module.exports = function pool(options, authorizeFn){
var pollingInterval = options.blockRefreshInterval;
setInterval(function () {
blockPollingIntervalId = setInterval(function () {
GetBlockTemplate(function(error, result){});
}, pollingInterval);
emitLog('system', 'Block polling every ' + pollingInterval + ' milliseconds');
}
function GetBlockTemplate(callback){
_this.daemon.cmd('getblocktemplate',
function GetBlockTemplate(daemonObj, callback){
if (typeof(callback) === 'undefined') {
callback = daemonObj;
daemonObj = _this.daemon;
}
daemonObj.cmd('getblocktemplate',
[{"capabilities": [ "coinbasetxn", "workid", "coinbase/append" ]}],
function(result){
if (result.error){
@ -408,8 +410,12 @@ var pool = module.exports = function pool(options, authorizeFn){
} else {
var processedNewBlock = _this.jobManager.processTemplate(result.response, publicKeyBuffer);
if (processedNewBlock)
_this.varDiff.setNetworkDifficulty(_this.jobManager.currentJob.difficulty);
if (processedNewBlock) {
Object.keys(_this.varDiff).forEach(function(port){
_this.varDiff[port].setNetworkDifficulty(_this.jobManager.currentJob.difficulty);
});
}
callback(null, result.response);
callback = function(){};
@ -422,13 +428,9 @@ var pool = module.exports = function pool(options, authorizeFn){
_this.daemon.cmd('getblock',
[blockHash],
function(results){
var validResults = results.filter(function(result){
return result.response && (result.response.hash === blockHash)
});
if (validResults.length >= 1){
callback(true, validResults[0].response.tx[0]);
if (results.filter(function(result){return result.response &&
(result.response.hash === blockHash)}).length >= 1){
callback(true);
}
else{
callback(false);
@ -442,8 +444,9 @@ var pool = module.exports = function pool(options, authorizeFn){
* This method is being called from the blockNotify so that when a new block is discovered by the daemon
* We can inform our miners about the newly found block
**/
this.processBlockNotify = function(blockHash){
if (blockHash !== _this.jobManager.currentJob.rpcData.previousblockhash){
this.processBlockNotify = function(blockHash) {
if (typeof(_this.jobManager.currentJob) !== 'undefined' && blockHash !== _this.jobManager.currentJob.rpcData.previousblockhash){
GetBlockTemplate(function(error, result){
if (error)
emitErrorLog('system', 'Block notify error getting block template for ' + options.coin.name);
@ -451,6 +454,8 @@ var pool = module.exports = function pool(options, authorizeFn){
}
};
this.relinquishMiners = function(filterFn, resultCback) {
var origStratumClients = this.stratumServer.getStratumClients();
@ -485,7 +490,40 @@ var pool = module.exports = function pool(options, authorizeFn){
miners.forEach(function (clientObj) {
_this.stratumServer.manuallyAddStratumClient(clientObj);
});
}
_this.stratumServer.broadcastMiningJobs(_this.jobManager.currentJob.getJobParams());
};
this.getStratumServer = function() {
return _this.stratumServer;
};
this.setVarDiff = function(port, varDiffInstance) {
if (typeof(_this.varDiff) === 'undefined') {
_this.varDiff = {};
}
if (typeof(_this.varDiff[port]) != 'undefined' ) {
_this.varDiff[port].removeAllListeners();
}
_this.varDiff[port] = varDiffInstance;
_this.varDiff[port].on('newDifficulty', function(client, newDiff) {
/* We request to set the newDiff @ the next difficulty retarget
(which should happen when a new job comes in - AKA BLOCK) */
client.enqueueNextDifficulty(newDiff);
/*if (options.varDiff.mode === 'fast'){
//Send new difficulty, then force miner to use new diff by resending the
//current job parameters but with the "clean jobs" flag set to false
//so the miner doesn't restart work and submit duplicate shares
client.sendDifficulty(newDiff);
var job = _this.jobManager.currentJob.getJobParams();
job[8] = false;
client.sendMiningJob(job);
}*/
});
};
};
pool.prototype.__proto__ = events.EventEmitter.prototype;

View File

@ -271,9 +271,8 @@ var StratumClient = function(options){
};
this.manuallyInitClient = function (username, password) {
this.manuallyAuthClient = function (username, password) {
handleAuthorize({id: 1, params: [username, password]}, false /*do not reply to miner*/);
handleSubscribe({id: 2});
}
};
StratumClient.prototype.__proto__ = events.EventEmitter.prototype;
@ -292,16 +291,14 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim
//private members
var socketTimeout = connectionTimeout * 1000;
var bannedMS = banning.time * 1000;
var _this = this;
var stratumClients = {};
var socketTimeout = connectionTimeout * 1000;
var bannedMS = banning.time * 1000;
var _this = this;
var stratumClients = {};
var subscriptionCounter = SubscriptionCounter();
var rebroadcastTimeout;
var bannedIPs = {};
var bannedIPs = {};
//Interval to look through bannedIPs for old bans and remove them in order to prevent a memory leak
var purgeOldBans = !banning.enabled ? null : setInterval(function(){
@ -312,7 +309,20 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim
}
}, 1000 * banning.purgeInterval);
var handleNewClient = function (socket) {
this.handleNewClient = function (socket) {
if (banning.enabled && socket.remoteAddress in bannedIPs){
var bannedTime = bannedIPs[socket.remoteAddress];
if ((Date.now() - bannedTime) < bannedMS){
socket.end();
return null;
}
else {
delete bannedIPs[socket.remoteAddress];
}
}
// If we're here the client was not banned.
socket.setKeepAlive(true);
var subscriptionId = subscriptionCounter.next();
var client = new StratumClient(
@ -327,6 +337,7 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim
stratumClients[subscriptionId] = client;
_this.emit('client.connected', client);
client.on('socketDisconnect', function() {
console.log("Socket disconnected for: "+client);
_this.removeStratumClientBySubId(subscriptionId);
_this.emit('client.disconnected', client);
}).on('ban', function(ipAddress){
@ -337,28 +348,11 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim
(function init(){
var serversStarted = 0;
Object.keys(ports).forEach(function(port){
net.createServer({allowHalfOpen: false}, function(socket){
if (banning.enabled && socket.remoteAddress in bannedIPs){
var bannedTime = bannedIPs[socket.remoteAddress];
if ((Date.now() - bannedTime) < bannedMS){
socket.end();
return;
}
else{
delete bannedIPs[socket.remoteAddress];
}
}
handleNewClient(socket);
}).listen(parseInt(port), function(){
serversStarted++;
if (serversStarted === Object.keys(ports).length)
net .createServer({allowHalfOpen: true}, function(socket) { _this.handleNewClient(socket); } )
.listen(parseInt(port), function(){
_this.emit('started');
});
});
});
})();
@ -379,15 +373,6 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim
client.sendMiningJob(jobParams);
}
}
/* Some miners will consider the pool dead if it doesn't receive a job at least every 30 seconds.
So every time broadcast jobs, we set a timeout to rebroadcast in 30 seconds unless cleared. */
clearTimeout(rebroadcastTimeout);
rebroadcastTimeout = setTimeout(function(){
var resendParams = jobParams;
resendParams[8] = false;
_this.broadcastMiningJobs(resendParams);
}, 30000);
};
this.getStratumClients = function () {
@ -399,8 +384,10 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim
};
this.manuallyAddStratumClient = function(clientObj) {
var subId = handleNewClient(clientObj.socket);
stratumClients[subscriptionId].manuallyInit(clientObj.workerName, clientObj.workerPass);
var subId = _this.handleNewClient(clientObj.socket);
if (subId != null) { // not banned!
stratumClients[subId].manuallyAuthClient(clientObj.workerName, clientObj.workerPass);
}
}
};

View File

@ -41,25 +41,21 @@ function RingBuffer(maxSize){
}
var varDiff = module.exports = function varDiff(ports){
var varDiff = module.exports = function varDiff(port, varDiffOptions){
var _this = this;
var networkDifficulty;
var portsCalcInfo = {};
var bufferSize, tMin, tMax;
Object.keys(ports).forEach(function(port){
var varDiffOptions = ports[port].varDiff;
if (!varDiffOptions) return;
if (!varDiffOptions) return;
var variance = varDiffOptions.targetTime * (varDiffOptions.variancePercent / 100);
var variance = varDiffOptions.targetTime * (varDiffOptions.variancePercent / 100);
portsCalcInfo[parseInt(port)] = {
bufferSize: varDiffOptions.retargetTime / varDiffOptions.targetTime * 4,
tMin: varDiffOptions.targetTime - variance,
tMax: varDiffOptions.targetTime + variance
}
});
bufferSize = varDiffOptions.retargetTime / varDiffOptions.targetTime * 4;
tMin = varDiffOptions.targetTime - variance;
tMax = varDiffOptions.targetTime + variance;
this.setNetworkDifficulty = function(diff){
networkDifficulty = diff;
@ -70,11 +66,10 @@ var varDiff = module.exports = function varDiff(ports){
var stratumPort = client.socket.localPort;
if (!(stratumPort in portsCalcInfo))
return;
var calcInfo = portsCalcInfo[stratumPort];
var options = ports[stratumPort].varDiff;
if (stratumPort != port) {
console.error("Handling a client which is not of this vardiff?");
}
var options = varDiffOptions
var lastTs;
var lastRtc;
@ -87,7 +82,7 @@ var varDiff = module.exports = function varDiff(ports){
if (!lastRtc){
lastRtc = ts - options.retargetTime / 2;
lastTs = ts;
timeBuffer = new RingBuffer(calcInfo.bufferSize);
timeBuffer = new RingBuffer(bufferSize);
return;
}
@ -103,12 +98,12 @@ var varDiff = module.exports = function varDiff(ports){
var avg = timeBuffer.avg();
var ddiff;
if (avg > calcInfo.tMax && client.difficulty > options.minDiff) {
if (avg > tMax && client.difficulty > options.minDiff) {
ddiff = 0.5;
if (ddiff * client.difficulty < options.minDiff) {
ddiff = options.minDiff / client.difficulty;
}
} else if (avg < calcInfo.tMin) {
} else if (avg < tMin) {
ddiff = 2;
var diffMax = networkDifficulty < options.maxDiff ? networkDifficulty : options.maxDiff;