diff --git a/libs/paymentProcessor.js b/libs/paymentProcessor.js index 096cb20..fe1928f 100644 --- a/libs/paymentProcessor.js +++ b/libs/paymentProcessor.js @@ -6,7 +6,6 @@ var async = require('async'); var Stratum = require('stratum-pool'); var util = require('stratum-pool/lib/util.js'); - module.exports = function(logger){ var poolConfigs = JSON.parse(process.env.pools); @@ -53,12 +52,15 @@ function SetupForPool(logger, poolOptions, setupFinished){ var minConfShield = 3; var minConfPayout = 3; + + var maxBlocksPerPayment = processingConfig.maxBlocksPerPayment || 3; var requireShielding = poolOptions.coin.requireShielding === true; var fee = parseFloat(poolOptions.coin.txfee) || parseFloat(0.0004); - logger.special(logSystem, logComponent, logComponent + ' requireShielding: ' + requireShielding); - logger.special(logSystem, logComponent, logComponent + ' payments txfee reserve: ' + fee); + logger.debug(logSystem, logComponent, logComponent + ' requireShielding: ' + requireShielding); + logger.debug(logSystem, logComponent, logComponent + ' payments txfee reserve: ' + fee); + logger.debug(logSystem, logComponent, logComponent + ' maxBlocksPerPayment: ' + maxBlocksPerPayment); var daemon = new Stratum.daemon.interface([processingConfig.daemon], function(severity, message){ logger[severity](logSystem, logComponent, message); @@ -291,8 +293,8 @@ function SetupForPool(logger, poolOptions, setupFinished){ } ); } - - + + // TODO, this needs to be moved out of payments processor function cacheNetworkStats () { var params = null; daemon.cmd('getmininginfo', params, @@ -381,19 +383,19 @@ function SetupForPool(logger, poolOptions, setupFinished){ } if (op.status == "failed") { if (op.error) { - logger.error(logSystem, logComponent, "Payment operation failed " + op.id + " " + op.error.code +", " + op.error.message); + logger.error(logSystem, logComponent, "Shielding operation failed " + op.id + " " + op.error.code +", " + op.error.message); } else { - logger.error(logSystem, logComponent, "Payment operation failed " + op.id); + logger.error(logSystem, logComponent, "Shielding operation failed " + op.id); } } else { - logger.special(logSystem, logComponent, 'Payment operation success ' + op.id + ' txid: ' + op.result.txid); + logger.special(logSystem, logComponent, 'Shielding operation success ' + op.id + ' txid: ' + op.result.txid); } } }, true, true); } else if (op.status == "executing") { if (opidCount == 0) { opidCount++; - logger.special(logSystem, logComponent, 'Payment operation in progress ' + op.id ); + logger.special(logSystem, logComponent, 'Shielding operation in progress ' + op.id ); } } }); @@ -418,9 +420,18 @@ function SetupForPool(logger, poolOptions, setupFinished){ }; function balanceRound(number) { - return parseFloat((Math.round(number * 100000000) / 100000000).toFixed(8)); + return parseFloat((Math.round(number * 100000000) / 100000000).toFixed(8)); } + function checkForDuplicateBlockHeight(rounds, height) { + var count = 0; + for (var i = 0; i < rounds.length; i++) { + if (rounds[i].height == height) + count++; + } + return count > 1; + } + /* Deal with numbers in smallest possible units (satoshis) as much as possible. This greatly helps with accuracy when rounding and whatnot. When we are storing numbers for only humans to see, store in whole coin units. */ @@ -442,8 +453,7 @@ function SetupForPool(logger, poolOptions, setupFinished){ async.waterfall([ - /* Call redis to get an array of rounds - which are coinbase transactions and block heights from submitted - blocks. */ + /* Call redis to get an array of rounds and balances - which are coinbase transactions and block heights from submitted blocks. */ function(callback){ startRedisTimer(); redisClient.multi([ @@ -451,29 +461,102 @@ function SetupForPool(logger, poolOptions, setupFinished){ ['smembers', coin + ':blocksPending'] ]).exec(function(error, results){ endRedisTimer(); - if (error){ logger.error(logSystem, logComponent, 'Could not get blocks from redis ' + JSON.stringify(error)); callback(true); return; } - + // build worker balances var workers = {}; for (var w in results[0]){ workers[w] = {balance: coinsToSatoshies(parseFloat(results[0][w]))}; } - + // build initial rounds data from blocksPending var rounds = results[1].map(function(r){ var details = r.split(':'); return { blockHash: details[0], txHash: details[1], height: details[2], + minedby: details[3], + duplicate: false, serialized: r }; }); - - callback(null, workers, rounds); + // find duplicate blocks by height + // this can happen when two or more solutions are submitted at the same block height + var duplicateFound = false; + for (var i = 0; i < rounds.length; i++) { + if (checkForDuplicateBlockHeight(rounds, rounds[i].height) === true) { + rounds[i].duplicate = true; + duplicateFound = true; + } + } + // handle duplicates if needed + if (duplicateFound) { + var dups = rounds.filter(function(round){ return round.duplicate; }); + logger.warning(logSystem, logComponent, 'Duplicate pending blocks found: ' + JSON.stringify(dups)); + // attempt to find the invalid duplicates + var rpcDupCheck = dups.map(function(r){ + return ['getblock', [r.blockHash]]; + }); + startRPCTimer(); + daemon.batchCmd(rpcDupCheck, function(error, blocks){ + endRPCTimer(); + if (error || !blocks) { + logger.error(logSystem, logComponent, 'Error with duplicate block check rpc call getblock ' + JSON.stringify(error)); + return; + } + // look for the invalid duplicate block + var validBlocks = {}; // hashtable for unique look up + var invalidBlocks = []; // array for redis work + blocks.forEach(function(block, i) { + if (block && block.result) { + // invalid duplicate submit blocks have negative confirmations + if (block.result.confirmations < 0) { + logger.warning(logSystem, logComponent, 'Remove invalid duplicate block ' + block.result.height + ' > ' + block.result.hash); + // move from blocksPending to blocksDuplicate... + invalidBlocks.push(['smove', coin + ':blocksPending', coin + ':blocksDuplicate', dups[i].serialized]); + } else { + // block must be valid, make sure it is unique + if (validBlocks.hasOwnProperty(dups[i].blockHash)) { + // not unique duplicate block + logger.warning(logSystem, logComponent, 'Remove non-unique duplicate block ' + block.result.height + ' > ' + block.result.hash); + // move from blocksPending to blocksDuplicate... + invalidBlocks.push(['smove', coin + ':blocksPending', coin + ':blocksDuplicate', dups[i].serialized]); + } else { + // keep unique valid block + validBlocks[dups[i].blockHash] = dups[i].serialized; + logger.debug(logSystem, logComponent, 'Keep valid duplicate block ' + block.result.height + ' > ' + block.result.hash); + } + } + } + }); + // filter out all duplicates to prevent double payments + rounds = rounds.filter(function(round){ return !round.duplicate; }); + // if we detected the invalid duplicates, move them + if (invalidBlocks.length > 0) { + // move invalid duplicate blocks in redis + startRedisTimer(); + redisClient.multi(invalidBlocks).exec(function(error, kicked){ + endRedisTimer(); + if (error) { + logger.error(logSystem, logComponent, 'Error could not move invalid duplicate blocks in redis ' + JSON.stringify(error)); + } + // continue payments normally + callback(null, workers, rounds); + }); + } else { + // notify pool owner that we are unable to find the invalid duplicate blocks, manual intervention required... + logger.error(logSystem, logComponent, 'Unable to detect invalid duplicate blocks, duplicate block payments on hold.'); + // continue payments normally + callback(null, workers, rounds); + } + }); + } else { + // no duplicates, continue payments normally + callback(null, workers, rounds); + } }); }, @@ -528,58 +611,12 @@ function SetupForPool(logger, poolOptions, setupFinished){ callback(true); return; } - - // check for invalid blocks by block hash - blockDetails.forEach(function(block, i) { - // this is just the response from getblockcount - if (i === blockDetails.length - 1){ - return; - } - // help track duplicate or invalid blocks by block hash - if (block && block.result && block.result.hash) { - // find the round for this block hash - for (var k=0; k < rounds.length; k++) { - if (rounds[k].blockHash == block.result.hash) { - var round = rounds[k]; - var dupFound = false; - // duplicate, invalid, kicked, orphaned blocks will have negative confirmations - if (block.result.confirmations < 0) { - // check if this is an invalid duplicate - // we need to kick invalid duplicates now, as this will cause a double payout... - for (var d=0; d < rounds.length; d++) { - if (rounds[d].height == block.result.height && rounds[d].blockHash != block.result.hash) { - logger.warning(logSystem, logComponent, 'Kicking invalid duplicate block ' +round.height + ' > ' + round.blockHash); - dupFound = true; - // kick this round now, its completely invalid! - var kickNow = []; - kickNow.push(['smove', coin + ':blocksPending', coin + ':blocksDuplicate', round.serialized]); - startRedisTimer(); - redisClient.multi(kickNow).exec(function(error, kicked){ - endRedisTimer(); - if (error){ - logger.error(logSystem, logComponent, 'Error could not kick invalid duplicate block ' + JSON.stringify(kicked)); - } - }); - // filter the duplicate out now, just in case we are actually paying this time around... - rounds = rounds.filter(function(item){ return item.txHash != round.txHash; }); - } - } - // unknown reason why this block failed, possible orphan or kicked soon - // not sure if we should take any action or just wait it out... - if (!dupFound) { - logger.warning(logSystem, logComponent, 'Daemon reports negative confirmations '+block.result.confirmations+' for block: ' +round.height + ' > ' + round.blockHash); - } - } - } - } - } - }); - - // now check block transaction ids + + // get pending block transaction details from coin daemon var batchRPCcommand = rounds.map(function(r){ return ['gettransaction', [r.txHash]]; }); - // guarantee a response for batchRPCcommand + // get account address (not implemented in zcash at this time..) batchRPCcommand.push(['getaccount', [poolOptions.address]]); startRPCTimer(); @@ -653,25 +690,40 @@ function SetupForPool(logger, poolOptions, setupFinished){ return true; }; - //Filter out all rounds that are immature (not confirmed or orphaned yet) + // limit blocks paid per payment round + var payingBlocks = 0; + + //filter out all rounds that are immature (not confirmed or orphaned yet) rounds = rounds.filter(function(r){ + + // only pay max blocks at a time + if (payingBlocks >= maxBlocksPerPayment) + return false; + switch (r.category) { case 'orphan': case 'kicked': r.canDeleteShares = canDeleteShares(r); + return true; case 'generate': + payingBlocks++; return true; + default: return false; } }); + // TODO: make tx fees dynamic var feeSatoshi = fee * magnitude; - + // calculate what the pool owes its miners var totalOwed = parseInt(0); for (var i = 0; i < rounds.length; i++) { - totalOwed = totalOwed + Math.round(rounds[i].reward * magnitude) - feeSatoshi; // TODO: make tx fees dynamic + // only pay generated blocks, not orphaned or kicked + if (rounds[i].category == 'generate') { + totalOwed = totalOwed + Math.round(rounds[i].reward * magnitude) - feeSatoshi; + } } var notAddr = null; @@ -682,10 +734,10 @@ function SetupForPool(logger, poolOptions, setupFinished){ // check if we have enough tAddress funds to brgin payment processing listUnspent(null, notAddr, minConfPayout, false, function (error, tBalance){ if (error) { - logger.error(logSystem, logComponent, 'Error checking pool balance before processing payments. (Unable to begin payment process)'); + logger.error(logSystem, logComponent, 'Error checking pool balance before processing payments.'); return callback(true); } else if (tBalance < totalOwed) { - logger.error(logSystem, logComponent, 'Insufficient funds to process payments ('+(tBalance / magnitude).toFixed(8) + ' < ' + (totalOwed / magnitude).toFixed(8)+'). Possibly waiting for shielding process.'); + logger.error(logSystem, logComponent, 'Insufficient funds to process payments for ' + payingBlocks + ' blocks ('+(tBalance / magnitude).toFixed(8) + ' < ' + (totalOwed / magnitude).toFixed(8)+'). Possibly waiting for shielding process.'); return callback(true); } else { // zcash daemon does not support account feature @@ -756,7 +808,6 @@ function SetupForPool(logger, poolOptions, setupFinished){ }); }, - /* Calculate if any payments are ready to be sent and trigger them sending Get balance different for each address and pass it along as object of latest balances such as {worker1: balance1, worker2, balance2} diff --git a/pool_configs/komodo_example.json b/pool_configs/komodo_example.json index acbcda3..a81f446 100644 --- a/pool_configs/komodo_example.json +++ b/pool_configs/komodo_example.json @@ -30,6 +30,7 @@ "paymentInterval": 57, "_comment_paymentInterval": "Interval in seconds to check and perform payments.", "minimumPayment": 0.1, + "maxBlocksPerPayment": 3, "daemon": { "host": "127.0.0.1", "port": 8232, diff --git a/pool_configs/zcash_example.json b/pool_configs/zcash_example.json index 33b1eea..2466d19 100644 --- a/pool_configs/zcash_example.json +++ b/pool_configs/zcash_example.json @@ -22,6 +22,7 @@ "enabled": false, "paymentInterval": 20, "minimumPayment": 0.1, + "maxBlocksPerPayment": 3, "daemon": { "host": "127.0.0.1", "port": 19332, diff --git a/pool_configs/zcash_testnet_example.json b/pool_configs/zcash_testnet_example.json index 6340845..987dbfc 100644 --- a/pool_configs/zcash_testnet_example.json +++ b/pool_configs/zcash_testnet_example.json @@ -23,6 +23,7 @@ "enabled": false, "paymentInterval": 20, "minimumPayment": 0.1, + "maxBlocksPerPayment": 1, "daemon": { "host": "127.0.0.1", "port": 19332, diff --git a/pool_configs/zclassic_example.json b/pool_configs/zclassic_example.json index b7851e7..c3120bb 100644 --- a/pool_configs/zclassic_example.json +++ b/pool_configs/zclassic_example.json @@ -27,6 +27,7 @@ "enabled": true, "paymentInterval": 20, "minimumPayment": 0.1, + "maxBlocksPerPayment": 3, "daemon": { "host": "127.0.0.1", "port": 8232,