From a4f302b52634cc58b7d7b4fc12a906f279b9adde Mon Sep 17 00:00:00 2001 From: Andrea Date: Fri, 14 Mar 2014 19:11:52 +0000 Subject: [PATCH] Stuff for miners switch --- lib/daemon.js | 137 +++++++++++++++---------------------------- lib/jobManager.js | 1 - lib/pool.js | 144 +++++++++++++++++++++++++++++----------------- lib/stratum.js | 73 ++++++++++------------- lib/varDiff.js | 35 +++++------ 5 files changed, 181 insertions(+), 209 deletions(-) diff --git a/lib/daemon.js b/lib/daemon.js index 6b145ea..a287eb7 100644 --- a/lib/daemon.js +++ b/lib/daemon.js @@ -45,92 +45,6 @@ function DaemonInterface(options){ } - function performHttpRequest(instance, jsonData, callback){ - var options = { - hostname: (typeof(instance.host) === 'undefined' ? 'localhost' : instance.host), - port : instance.port, - method : 'POST', - auth : instance.user + ':' + instance.password, - headers : { - 'Content-Length': jsonData.length - } - }; - - var req = http.request(options, function(res) { - var data = ''; - res.setEncoding('utf8'); - res.on('data', function (chunk) { - data += chunk; - - }); - res.on('end', function(){ - - var dataJson; - var parsingError; - try{ - dataJson = JSON.parse(data); - - } - catch(e){ - if (res.statusCode === 401){ - parsingError = 'unauthorized'; - _this.emit('error', 'Invalid RPC username or password'); - } - else{ - parsingError = e; - _this.emit('error', 'could not parse rpc data with request of: ' + jsonData + - ' on instance ' + instance.index + ' data: ' + data); - } - } - if (typeof(dataJson) !== 'undefined'){ - callback(dataJson.error, dataJson); - } - else - callback(parsingError); - - }); - }); - - req.on('error', function(e) { - if (e.code === 'ECONNREFUSED') - callback({type: 'offline', message: e.message}, null); - else - callback({type: 'request error', message: e.message}, null); - }); - - req.end(jsonData); - }; - - - - //Performs a batch JSON-RPC command - only uses the first configured rpc daemon - /* First argument must have: - [ - [ methodName, [params] ], - [ methodName, [params] ] - ] - */ - - function batchCmd(cmdArray, callback){ - - var requestJson = []; - - for (var i = 0; i < cmdArray.length; i++){ - requestJson.push({ - method: cmdArray[i][0], - params: cmdArray[i][1], - id: Date.now() + Math.floor(Math.random() * 10) + i - }); - } - - var serializedRequest = JSON.stringify(requestJson); - - performHttpRequest(instances[0], serializedRequest, function(error, result){ - callback(error, result); - }, 'fuck'); - - } - /* Sends a JSON RPC (http://json-rpc.org/wiki/specification) command to every configured daemon. The callback function is fired once with the result from each daemon unless streamResults is set to true. */ @@ -141,7 +55,7 @@ function DaemonInterface(options){ async.each(instances, function(instance, eachCallback){ var itemFinished = function(error, result){ - var returnObj = {error: error, response: result.result, instance: instance}; + var returnObj = {error: error, response: result, instance: instance}; if (streamResults) callback(returnObj); else results.push(returnObj); eachCallback(); @@ -149,15 +63,55 @@ function DaemonInterface(options){ }; var requestJson = JSON.stringify({ + id: Date.now() + Math.floor(Math.random() * 10), method: method, - params: params, - id: Date.now() + Math.floor(Math.random() * 10) + params: params }); - performHttpRequest(instance, requestJson, function(error, result){ - itemFinished(error, result); + var options = { + hostname: (typeof(instance.host) === 'undefined' ? 'localhost' : instance.host), + port : instance.port, + method : 'POST', + auth : instance.user + ':' + instance.password, + headers : { + 'Content-Length': requestJson.length + } + }; + + var req = http.request(options, function(res) { + var data = ''; + res.setEncoding('utf8'); + res.on('data', function (chunk) { + data += chunk; + }); + res.on('end', function(){ + var dataJson; + var parsingError; + try{ + dataJson = JSON.parse(data); + + } + catch(e){ + parsingError = e; + _this.emit('error', 'could not parse rpc data from method: ' + method + + ' on instance ' + instance.index + ' data: ' + data); + } + if (typeof(dataJson) !== 'undefined') + itemFinished(dataJson.error, dataJson.result); + else + itemFinished(parsingError); + + }); }); + req.on('error', function(e) { + if (e.code === 'ECONNREFUSED') + itemFinished({type: 'offline', message: e.message}, null); + else + itemFinished({type: 'request error', message: e.message}, null); + }); + + req.end(requestJson); }, function(){ if (!streamResults){ @@ -173,7 +127,6 @@ function DaemonInterface(options){ this.init = init; this.isOnline = isOnline; this.cmd = cmd; - this.batchCmd = batchCmd; } DaemonInterface.prototype.__proto__ = events.EventEmitter.prototype; diff --git a/lib/jobManager.js b/lib/jobManager.js index dc7bcaa..7e5e135 100644 --- a/lib/jobManager.js +++ b/lib/jobManager.js @@ -270,7 +270,6 @@ var JobManager = module.exports = function JobManager(options){ worker: workerName, difficulty: difficulty, height: job.rpcData.height, - reward: job.rpcData.coinbasevalue, networkDifficulty: job.difficulty, solution: blockHash }, blockHex); diff --git a/lib/pool.js b/lib/pool.js index 70792e8..98df4e3 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -28,7 +28,7 @@ var pool = module.exports = function pool(options, authorizeFn){ var _this = this; var publicKeyBuffer; - + var blockPollingIntervalId; var emitLog = function(key, text) { _this.emit('log', 'debug' , key, text); }; @@ -39,7 +39,14 @@ var pool = module.exports = function pool(options, authorizeFn){ emitLog('system', 'Starting pool for ' + options.coin.name + ' [' + options.coin.symbol.toUpperCase() + ']'); SetupJobManager(); SetupVarDiff(); - SetupDaemonInterface(); + SetupDaemonInterface(function (err, newDaemon) { + if (!err) { + _this.daemon = newDaemon; + SetupBlockPolling(); + StartStratumServer(); + SetupPeer(); + } + }); SetupApi(); }; @@ -69,23 +76,8 @@ var pool = module.exports = function pool(options, authorizeFn){ } function SetupVarDiff(){ - _this.varDiff = new varDiff(options.ports); - _this.varDiff.on('newDifficulty', function(client, newDiff) { - - /* We request to set the newDiff @ the next difficulty retarget - (which should happen when a new job comes in - AKA BLOCK) */ - client.enqueueNextDifficulty(newDiff); - - /*if (options.varDiff.mode === 'fast'){ - //Send new difficulty, then force miner to use new diff by resending the - //current job parameters but with the "clean jobs" flag set to false - //so the miner doesn't restart work and submit duplicate shares - client.sendDifficulty(newDiff); - var job = _this.jobManager.currentJob.getJobParams(); - job[8] = false; - client.sendMiningJob(job); - }*/ - + Object.keys(options.ports).forEach(function(port) { + _this.setVarDiff(port, new varDiff(port, options.ports[port])); }); } @@ -163,9 +155,8 @@ var pool = module.exports = function pool(options, authorizeFn){ emitShare(); else{ SubmitBlock(blockHex, function(){ - CheckBlockAccepted(shareData.solution, function(isAccepted, tx){ + CheckBlockAccepted(shareData.solution, function(isAccepted){ isValidBlock = isAccepted; - shareData.tx = tx; emitShare(); }); }); @@ -176,13 +167,13 @@ var pool = module.exports = function pool(options, authorizeFn){ } - function SetupDaemonInterface(){ + function SetupDaemonInterface(cback){ emitLog('system', 'Connecting to daemon(s)'); - _this.daemon = new daemon.interface(options.daemons); - _this.daemon.once('online', function(){ + var newDaemon = new daemon.interface(options.daemons); + newDaemon.once('online', function(){ async.parallel({ addressInfo: function(callback){ - _this.daemon.cmd('validateaddress', [options.address], function(results){ + newDaemon.cmd('validateaddress', [options.address], function(results){ //Make sure address is valid with each daemon var allValid = results.every(function(result){ @@ -205,7 +196,7 @@ var pool = module.exports = function pool(options, authorizeFn){ }); }, miningInfo: function(callback){ - _this.daemon.cmd('getmininginfo', [], function(results){ + newDaemon.cmd('getmininginfo', [], function(results){ // Print which network each daemon is running on @@ -252,7 +243,7 @@ var pool = module.exports = function pool(options, authorizeFn){ submitMethod: function(callback){ /* This checks to see whether the daemon uses submitblock or getblocktemplate for submitting new blocks */ - _this.daemon.cmd('submitblock', [], function(results){ + newDaemon.cmd('submitblock', [], function(results){ var couldNotDetectMethod = results.every(function(result){ if (result.error && result.error.message === 'Method not found'){ callback(null, false); @@ -274,6 +265,7 @@ var pool = module.exports = function pool(options, authorizeFn){ }, function(err, results){ if (err){ emitErrorLog('system', 'Could not start pool, ' + JSON.stringify(err)); + cback(err); return; } @@ -285,6 +277,7 @@ var pool = module.exports = function pool(options, authorizeFn){ if (options.coin.reward === 'POS' && typeof(results.addressInfo.pubkey) == 'undefined') { // address provided is not of the wallet. emitErrorLog('system', 'The address provided is not from the daemon wallet.'); + cback(err); return; } else { @@ -304,15 +297,13 @@ var pool = module.exports = function pool(options, authorizeFn){ 'than network difficulty of ' + networkDifficulty); }); - GetBlockTemplate(function(error, result){ - if (error){ + GetBlockTemplate(newDaemon, function(error, result){ + if (error) { console.error(error); emitErrorLog('system', 'Error with getblocktemplate on initializing'); - } - else{ - SetupBlockPolling(); - StartStratumServer(); - SetupPeer(); + cback(error); + } else { + cback(null, newDaemon); // finish! } }); } @@ -324,7 +315,7 @@ var pool = module.exports = function pool(options, authorizeFn){ emitErrorLog('system', message); }); - _this.daemon.init(); + newDaemon.init(); } @@ -334,8 +325,10 @@ var pool = module.exports = function pool(options, authorizeFn){ emitLog('system','Stratum server started on port(s): ' + Object.keys(options.ports).join(', ')); _this.emit('started'); }).on('client.connected', function(client){ - - _this.varDiff.manageClient(client); + if (typeof(_this.varDiff[client.socket.localPort]) !== 'undefined') { + _this.varDiff[client.socket.localPort].manageClient(client); + } + client.on('difficultyChanged', function(diff){ _this.emit('difficultyUpdate', client.workerName, diff); @@ -348,7 +341,12 @@ var pool = module.exports = function pool(options, authorizeFn){ extraNonce2Size ); - this.sendDifficulty(options.ports[client.socket.localPort].diff); + if (typeof(options.ports[client.socket.localPort]) !== 'undefined' && options.ports[client.socket.localPort].diff) { + this.sendDifficulty(options.ports[client.socket.localPort].diff); + } else { + this.sendDifficulty(8); + } + this.sendMiningJob(_this.jobManager.currentJob.getJobParams()); }).on('submit', function(params, resultCallback){ @@ -391,14 +389,18 @@ var pool = module.exports = function pool(options, authorizeFn){ var pollingInterval = options.blockRefreshInterval; - setInterval(function () { + blockPollingIntervalId = setInterval(function () { GetBlockTemplate(function(error, result){}); }, pollingInterval); emitLog('system', 'Block polling every ' + pollingInterval + ' milliseconds'); } - function GetBlockTemplate(callback){ - _this.daemon.cmd('getblocktemplate', + function GetBlockTemplate(daemonObj, callback){ + if (typeof(callback) === 'undefined') { + callback = daemonObj; + daemonObj = _this.daemon; + } + daemonObj.cmd('getblocktemplate', [{"capabilities": [ "coinbasetxn", "workid", "coinbase/append" ]}], function(result){ if (result.error){ @@ -408,8 +410,12 @@ var pool = module.exports = function pool(options, authorizeFn){ } else { var processedNewBlock = _this.jobManager.processTemplate(result.response, publicKeyBuffer); - if (processedNewBlock) - _this.varDiff.setNetworkDifficulty(_this.jobManager.currentJob.difficulty); + if (processedNewBlock) { + Object.keys(_this.varDiff).forEach(function(port){ + _this.varDiff[port].setNetworkDifficulty(_this.jobManager.currentJob.difficulty); + }); + } + callback(null, result.response); callback = function(){}; @@ -422,13 +428,9 @@ var pool = module.exports = function pool(options, authorizeFn){ _this.daemon.cmd('getblock', [blockHash], function(results){ - - var validResults = results.filter(function(result){ - return result.response && (result.response.hash === blockHash) - }); - - if (validResults.length >= 1){ - callback(true, validResults[0].response.tx[0]); + if (results.filter(function(result){return result.response && + (result.response.hash === blockHash)}).length >= 1){ + callback(true); } else{ callback(false); @@ -442,8 +444,9 @@ var pool = module.exports = function pool(options, authorizeFn){ * This method is being called from the blockNotify so that when a new block is discovered by the daemon * We can inform our miners about the newly found block **/ - this.processBlockNotify = function(blockHash){ - if (blockHash !== _this.jobManager.currentJob.rpcData.previousblockhash){ + this.processBlockNotify = function(blockHash) { + + if (typeof(_this.jobManager.currentJob) !== 'undefined' && blockHash !== _this.jobManager.currentJob.rpcData.previousblockhash){ GetBlockTemplate(function(error, result){ if (error) emitErrorLog('system', 'Block notify error getting block template for ' + options.coin.name); @@ -451,6 +454,8 @@ var pool = module.exports = function pool(options, authorizeFn){ } }; + + this.relinquishMiners = function(filterFn, resultCback) { var origStratumClients = this.stratumServer.getStratumClients(); @@ -485,7 +490,40 @@ var pool = module.exports = function pool(options, authorizeFn){ miners.forEach(function (clientObj) { _this.stratumServer.manuallyAddStratumClient(clientObj); }); - } + _this.stratumServer.broadcastMiningJobs(_this.jobManager.currentJob.getJobParams()); + + }; + + this.getStratumServer = function() { + return _this.stratumServer; + }; + + this.setVarDiff = function(port, varDiffInstance) { + if (typeof(_this.varDiff) === 'undefined') { + _this.varDiff = {}; + } + if (typeof(_this.varDiff[port]) != 'undefined' ) { + _this.varDiff[port].removeAllListeners(); + } + _this.varDiff[port] = varDiffInstance; + _this.varDiff[port].on('newDifficulty', function(client, newDiff) { + + /* We request to set the newDiff @ the next difficulty retarget + (which should happen when a new job comes in - AKA BLOCK) */ + client.enqueueNextDifficulty(newDiff); + + /*if (options.varDiff.mode === 'fast'){ + //Send new difficulty, then force miner to use new diff by resending the + //current job parameters but with the "clean jobs" flag set to false + //so the miner doesn't restart work and submit duplicate shares + client.sendDifficulty(newDiff); + var job = _this.jobManager.currentJob.getJobParams(); + job[8] = false; + client.sendMiningJob(job); + }*/ + + }); + }; }; pool.prototype.__proto__ = events.EventEmitter.prototype; \ No newline at end of file diff --git a/lib/stratum.js b/lib/stratum.js index cfec141..9149ffc 100644 --- a/lib/stratum.js +++ b/lib/stratum.js @@ -271,9 +271,8 @@ var StratumClient = function(options){ }; - this.manuallyInitClient = function (username, password) { + this.manuallyAuthClient = function (username, password) { handleAuthorize({id: 1, params: [username, password]}, false /*do not reply to miner*/); - handleSubscribe({id: 2}); } }; StratumClient.prototype.__proto__ = events.EventEmitter.prototype; @@ -292,16 +291,14 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim //private members - var socketTimeout = connectionTimeout * 1000; - var bannedMS = banning.time * 1000; - - var _this = this; - var stratumClients = {}; + var socketTimeout = connectionTimeout * 1000; + var bannedMS = banning.time * 1000; + + var _this = this; + var stratumClients = {}; var subscriptionCounter = SubscriptionCounter(); - - var rebroadcastTimeout; - - var bannedIPs = {}; + + var bannedIPs = {}; //Interval to look through bannedIPs for old bans and remove them in order to prevent a memory leak var purgeOldBans = !banning.enabled ? null : setInterval(function(){ @@ -312,7 +309,20 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim } }, 1000 * banning.purgeInterval); - var handleNewClient = function (socket) { + this.handleNewClient = function (socket) { + if (banning.enabled && socket.remoteAddress in bannedIPs){ + var bannedTime = bannedIPs[socket.remoteAddress]; + if ((Date.now() - bannedTime) < bannedMS){ + socket.end(); + return null; + } + else { + delete bannedIPs[socket.remoteAddress]; + } + } + + // If we're here the client was not banned. + socket.setKeepAlive(true); var subscriptionId = subscriptionCounter.next(); var client = new StratumClient( @@ -327,6 +337,7 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim stratumClients[subscriptionId] = client; _this.emit('client.connected', client); client.on('socketDisconnect', function() { + console.log("Socket disconnected for: "+client); _this.removeStratumClientBySubId(subscriptionId); _this.emit('client.disconnected', client); }).on('ban', function(ipAddress){ @@ -337,28 +348,11 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim (function init(){ - var serversStarted = 0; - Object.keys(ports).forEach(function(port){ - net.createServer({allowHalfOpen: false}, function(socket){ - - if (banning.enabled && socket.remoteAddress in bannedIPs){ - var bannedTime = bannedIPs[socket.remoteAddress]; - if ((Date.now() - bannedTime) < bannedMS){ - socket.end(); - return; - } - else{ - delete bannedIPs[socket.remoteAddress]; - } - } - - handleNewClient(socket); - }).listen(parseInt(port), function(){ - serversStarted++; - if (serversStarted === Object.keys(ports).length) + net .createServer({allowHalfOpen: true}, function(socket) { _this.handleNewClient(socket); } ) + .listen(parseInt(port), function(){ _this.emit('started'); - }); + }); }); })(); @@ -379,15 +373,6 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim client.sendMiningJob(jobParams); } } - - /* Some miners will consider the pool dead if it doesn't receive a job at least every 30 seconds. - So every time broadcast jobs, we set a timeout to rebroadcast in 30 seconds unless cleared. */ - clearTimeout(rebroadcastTimeout); - rebroadcastTimeout = setTimeout(function(){ - var resendParams = jobParams; - resendParams[8] = false; - _this.broadcastMiningJobs(resendParams); - }, 30000); }; this.getStratumClients = function () { @@ -399,8 +384,10 @@ var StratumServer = exports.Server = function StratumServer(ports, connectionTim }; this.manuallyAddStratumClient = function(clientObj) { - var subId = handleNewClient(clientObj.socket); - stratumClients[subscriptionId].manuallyInit(clientObj.workerName, clientObj.workerPass); + var subId = _this.handleNewClient(clientObj.socket); + if (subId != null) { // not banned! + stratumClients[subId].manuallyAuthClient(clientObj.workerName, clientObj.workerPass); + } } }; diff --git a/lib/varDiff.js b/lib/varDiff.js index 116af32..188e3eb 100644 --- a/lib/varDiff.js +++ b/lib/varDiff.js @@ -41,25 +41,21 @@ function RingBuffer(maxSize){ } -var varDiff = module.exports = function varDiff(ports){ +var varDiff = module.exports = function varDiff(port, varDiffOptions){ var _this = this; var networkDifficulty; - var portsCalcInfo = {}; + var bufferSize, tMin, tMax; - Object.keys(ports).forEach(function(port){ - var varDiffOptions = ports[port].varDiff; - if (!varDiffOptions) return; + if (!varDiffOptions) return; - var variance = varDiffOptions.targetTime * (varDiffOptions.variancePercent / 100); + var variance = varDiffOptions.targetTime * (varDiffOptions.variancePercent / 100); - portsCalcInfo[parseInt(port)] = { - bufferSize: varDiffOptions.retargetTime / varDiffOptions.targetTime * 4, - tMin: varDiffOptions.targetTime - variance, - tMax: varDiffOptions.targetTime + variance - } - }); + + bufferSize = varDiffOptions.retargetTime / varDiffOptions.targetTime * 4; + tMin = varDiffOptions.targetTime - variance; + tMax = varDiffOptions.targetTime + variance; this.setNetworkDifficulty = function(diff){ networkDifficulty = diff; @@ -70,11 +66,10 @@ var varDiff = module.exports = function varDiff(ports){ var stratumPort = client.socket.localPort; - if (!(stratumPort in portsCalcInfo)) - return; - - var calcInfo = portsCalcInfo[stratumPort]; - var options = ports[stratumPort].varDiff; + if (stratumPort != port) { + console.error("Handling a client which is not of this vardiff?"); + } + var options = varDiffOptions var lastTs; var lastRtc; @@ -87,7 +82,7 @@ var varDiff = module.exports = function varDiff(ports){ if (!lastRtc){ lastRtc = ts - options.retargetTime / 2; lastTs = ts; - timeBuffer = new RingBuffer(calcInfo.bufferSize); + timeBuffer = new RingBuffer(bufferSize); return; } @@ -103,12 +98,12 @@ var varDiff = module.exports = function varDiff(ports){ var avg = timeBuffer.avg(); var ddiff; - if (avg > calcInfo.tMax && client.difficulty > options.minDiff) { + if (avg > tMax && client.difficulty > options.minDiff) { ddiff = 0.5; if (ddiff * client.difficulty < options.minDiff) { ddiff = options.minDiff / client.difficulty; } - } else if (avg < calcInfo.tMin) { + } else if (avg < tMin) { ddiff = 2; var diffMax = networkDifficulty < options.maxDiff ? networkDifficulty : options.maxDiff;