From 774e2b0088e7e05e026bd5cb6d9c120073819922 Mon Sep 17 00:00:00 2001 From: hellcatz Date: Wed, 3 May 2017 22:20:08 -0700 Subject: [PATCH] Update paymentProcessor.js Experimental PPLNT support (console.logs need to be removed after experiments) Major reduction in RPC calls to daemon. Balance rounding improvements. Improvements to confirmations tracking. Improvements to operation id tracking. Improved error handling. --- libs/paymentProcessor.js | 651 ++++++++++++++++++++++----------------- 1 file changed, 362 insertions(+), 289 deletions(-) diff --git a/libs/paymentProcessor.js b/libs/paymentProcessor.js index 87c1716..d60427d 100644 --- a/libs/paymentProcessor.js +++ b/libs/paymentProcessor.js @@ -54,13 +54,18 @@ function SetupForPool(logger, poolOptions, setupFinished){ var minConfPayout = 3; var maxBlocksPerPayment = processingConfig.maxBlocksPerPayment || 3; - + + // pplnt - pay per last N time shares + var pplntEnabled = processingConfig.paymentMode === "pplnt" || false; + var pplntTimeQualify = processingConfig.pplnt || 0.51; // 51% + var requireShielding = poolOptions.coin.requireShielding === true; var fee = parseFloat(poolOptions.coin.txfee) || parseFloat(0.0004); logger.debug(logSystem, logComponent, logComponent + ' requireShielding: ' + requireShielding); logger.debug(logSystem, logComponent, logComponent + ' payments txfee reserve: ' + fee); logger.debug(logSystem, logComponent, logComponent + ' maxBlocksPerPayment: ' + maxBlocksPerPayment); + logger.debug(logSystem, logComponent, logComponent + ' PPLNT: ' + pplntEnabled + ', time period: '+pplntTimeQualify); var daemon = new Stratum.daemon.interface([processingConfig.daemon], function(severity, message){ logger[severity](logSystem, logComponent, message); @@ -236,7 +241,7 @@ function SetupForPool(logger, poolOptions, setupFinished){ return; } - var amount = balanceRound((tBalance - 10000) / magnitude); + var amount = satoshisToCoins(tBalance - 10000); var params = [poolOptions.address, [{'address': poolOptions.zAddress, 'amount': amount}]]; daemon.cmd('z_sendmany', params, function (result) { @@ -269,8 +274,8 @@ function SetupForPool(logger, poolOptions, setupFinished){ return; } - var amount = balanceRound((zBalance - 10000) / magnitude); - // no more than 100 ZEC at a time + var amount = satoshisToCoins(zBalance - 10000); + // unshield no more than 100 ZEC at a time if (amount > 100.0) amount = 100.0; @@ -293,7 +298,7 @@ function SetupForPool(logger, poolOptions, setupFinished){ } ); } - + // TODO, this needs to be moved out of payments processor function cacheNetworkStats () { var params = null; @@ -369,58 +374,83 @@ function SetupForPool(logger, poolOptions, setupFinished){ var opid_interval = poolOptions.walletInterval * 1000; // shielding not required for some equihash coins if (requireShielding === true) { - setInterval(function(){ - var checkOpIdSuccessAndGetResult = function(ops) { - ops.forEach(function(op, i){ - if (op.status == "success" || op.status == "failed") { - daemon.cmd('z_getoperationresult', [[op.id]], function (result) { - if (result.error) { - logger.warning(logSystem, logComponent, 'Unable to get payment operation id result ' + JSON.stringify(result)); + var checkOpids = function() { + var checkOpIdSuccessAndGetResult = function(ops) { + var batchRPC = []; + ops.forEach(function(op, i){ + if (op.status == "success" || op.status == "failed") { + batchRPC.push(['z_getoperationresult', [[op.id]]]); + if (opidCount > 0) { + opidCount = 0; } - if (result.response) { - if (opidCount > 0) { - opidCount = 0; - } - if (op.status == "failed") { - if (op.error) { - logger.error(logSystem, logComponent, "Shielding operation failed " + op.id + " " + op.error.code +", " + op.error.message); - } else { - logger.error(logSystem, logComponent, "Shielding operation failed " + op.id); - } + if (op.status == "failed") { + if (op.error) { + logger.error(logSystem, logComponent, "Shielding operation failed " + op.id + " " + op.error.code +", " + op.error.message); } else { - logger.special(logSystem, logComponent, 'Shielding operation success ' + op.id + ' txid: ' + op.result.txid); + logger.error(logSystem, logComponent, "Shielding operation failed " + op.id); } + } else { + logger.special(logSystem, logComponent, 'Shielding operation success ' + op.id + ' txid: ' + op.result.txid); + } + } else if (op.status == "executing") { + if (opidCount == 0) { + opidCount++; + logger.special(logSystem, logComponent, 'Shielding operation in progress ' + op.id ); } - }, true, true); - } else if (op.status == "executing") { - if (opidCount == 0) { - opidCount++; - logger.special(logSystem, logComponent, 'Shielding operation in progress ' + op.id ); } + }); + if (batchRPC.length <= 0) { + opidInterval = setInterval(checkOpids, opid_interval); + return; } - }); - }; - daemon.cmd('z_getoperationstatus', null, function (result) { + daemon.batchCmd(batchRPC, function(error, results){ + if (error || !results) { + logger.error(logSystem, logComponent, 'Error with z_getoperationresult ' + JSON.stringify(error)); + return; + } + results.forEach(function(result, i) { + if (parseFloat(result.result[i].execution_secs || 0) > parseFloat(poolOptions.walletInterval)) + logger.warning(logSystem, logComponent, 'Increase walletInterval in pool_config. opid execution took '+result.result[i].execution_secs+' secs.'); + }); + opidInterval = setInterval(checkOpids, opid_interval); + }); + }; + clearInterval(opidInterval); + daemon.cmd('z_getoperationstatus', null, function (result) { if (result.error) { logger.warning(logSystem, logComponent, 'Unable to get operation ids for clearing.'); - } - if (result.response) { + opidInterval = setInterval(checkOpids, opid_interval); + } else if (result.response) { checkOpIdSuccessAndGetResult(result.response); + } else { + opidInterval = setInterval(checkOpids, opid_interval); } - }, true, true); - }, opid_interval); + }, true, true); + } + + var opidInterval = setInterval(checkOpids, opid_interval); + } + + function roundTo(n, digits) { + if (digits === undefined) { + digits = 0; + } + var multiplicator = Math.pow(10, digits); + n = parseFloat((n * multiplicator).toFixed(11)); + var test =(Math.round(n) / multiplicator); + return +(test.toFixed(digits)); } var satoshisToCoins = function(satoshis){ - return parseFloat((satoshis / magnitude).toFixed(coinPrecision)); + return roundTo((satoshis / magnitude), coinPrecision); }; var coinsToSatoshies = function(coins){ return Math.round(coins * magnitude); }; - function balanceRound(number) { - return parseFloat((Math.round(number * 100000000) / 100000000).toFixed(8)); + function coinsRound(number) { + return roundTo(number, coinPrecision); } function checkForDuplicateBlockHeight(rounds, height) { @@ -452,8 +482,10 @@ function SetupForPool(logger, poolOptions, setupFinished){ var endRPCTimer = function(){ timeSpentRPC += Date.now() - startTimeRedis }; async.waterfall([ - - /* Call redis to get an array of rounds and balances - which are coinbase transactions and block heights from submitted blocks. */ + /* + Step 1 - build workers and rounds objects from redis + * removes duplicate block submissions from redis + */ function(callback){ startRedisTimer(); redisClient.multi([ @@ -466,12 +498,12 @@ function SetupForPool(logger, poolOptions, setupFinished){ callback(true); return; } - // build worker balances + // build workers object from :balances var workers = {}; for (var w in results[0]){ workers[w] = {balance: coinsToSatoshies(parseFloat(results[0][w]))}; } - // build initial rounds data from blocksPending + // build rounds object from :blocksPending var rounds = results[1].map(function(r){ var details = r.split(':'); return { @@ -561,259 +593,297 @@ function SetupForPool(logger, poolOptions, setupFinished){ }, - /* Does a batch rpc call to daemon with all the transaction hashes to see if they are confirmed yet. - It also adds the block reward amount to the round object - which the daemon gives also gives us. */ + /* + Step 2 - check if mined block coinbase tx are ready for payment + * adds block reward to rounds object + * adds block confirmations count to rounds object + * updates confirmation counts in redis + */ function(workers, rounds, callback){ - - // first verify block confirmations by block hash - var batchRPCcommand2 = rounds.map(function(r){ - return ['getblock', [r.blockHash]]; + // get pending block tx details + var batchRPCcommand = rounds.map(function(r){ + return ['gettransaction', [r.txHash]]; }); - // guarantee a response for batchRPCcommand2 - batchRPCcommand2.push(['getblockcount']); + // get account address (not implemented at this time) + batchRPCcommand.push(['getaccount', [poolOptions.address]]); startRPCTimer(); - daemon.batchCmd(batchRPCcommand2, function(error, blockDetails){ + daemon.batchCmd(batchRPCcommand, function(error, txDetails){ endRPCTimer(); - - // error getting block info by hash? - if (error || !blockDetails){ - logger.error(logSystem, logComponent, 'Check finished - daemon rpc error with batch getblock ' - + JSON.stringify(error)); + if (error || !txDetails){ + logger.error(logSystem, logComponent, 'Check finished - daemon rpc error with batch gettransactions ' + JSON.stringify(error)); callback(true); return; } - // update confirmations in redis for pending blocks - var confirmsUpdate = blockDetails.map(function(b) { - if (b.result != null && b.result.confirmations > 0) { - if (b.result.confirmations > 100) { - return ['hdel', logComponent + ':blocksPendingConfirms', b.result.hash]; + var confirmsUpdate = []; + var addressAccount = ""; + + // check for transaction errors and generated coins + txDetails.forEach(function(tx, i){ + if (i === txDetails.length - 1){ + if (tx.result && tx.result.toString().length > 0) { + addressAccount = tx.result.toString(); } - return ['hset', logComponent + ':blocksPendingConfirms', b.result.hash, b.result.confirmations]; - } - return null; - }); - - // filter nulls, last item is always null... - confirmsUpdate = confirmsUpdate.filter(function(val) { return val !== null; }); - // guarantee at least one redis update - if (confirmsUpdate.length < 1) - confirmsUpdate.push(['hset', logComponent + ':blocksPendingConfirms', 0, 0]); - - startRedisTimer(); - redisClient.multi(confirmsUpdate).exec(function(error, updated){ - endRedisTimer(); - - if (error){ - logger.error(logSystem, logComponent, 'failed to update pending block confirmations' - + JSON.stringify(error)); - callback(true); return; } - - // get pending block transaction details from coin daemon - var batchRPCcommand = rounds.map(function(r){ - return ['gettransaction', [r.txHash]]; - }); - // get account address (not implemented in zcash at this time..) - batchRPCcommand.push(['getaccount', [poolOptions.address]]); + var round = rounds[i]; + // look for transaction errors + if (tx.error && tx.error.code === -5){ + logger.warning(logSystem, logComponent, 'Daemon reports invalid transaction: ' + round.txHash); + round.category = 'kicked'; + return; + } + else if (!tx.result.details || (tx.result.details && tx.result.details.length === 0)){ + logger.warning(logSystem, logComponent, 'Daemon reports no details for transaction: ' + round.txHash); + round.category = 'kicked'; + return; + } + else if (tx.error || !tx.result){ + logger.error(logSystem, logComponent, 'Odd error with gettransaction ' + round.txHash + ' ' + JSON.stringify(tx)); + return; + } + // get the coin base generation tx + var generationTx = tx.result.details.filter(function(tx){ + return tx.address === poolOptions.address; + })[0]; + if (!generationTx && tx.result.details.length === 1){ + generationTx = tx.result.details[0]; + } + if (!generationTx){ + logger.error(logSystem, logComponent, 'Missing output details to pool address for transaction ' + round.txHash); + return; + } + // get transaction category for round + round.category = generationTx.category; + round.confirmations = parseInt((tx.result.confirmations || 0)); + // get reward for newly generated blocks + if (round.category === 'generate') { + round.reward = coinsRound(parseFloat(generationTx.amount || generationTx.value)); + } + // update confirmations in redis + confirmsUpdate.push(['hset', coin + ':blocksPendingConfirms', round.blockHash, round.confirmations]); + }); - startRPCTimer(); - daemon.batchCmd(batchRPCcommand, function(error, txDetails){ - endRPCTimer(); - - if (error || !txDetails){ - logger.error(logSystem, logComponent, 'Check finished - daemon rpc error with batch gettransactions ' - + JSON.stringify(error)); - callback(true); - return; + var canDeleteShares = function(r){ + for (var i = 0; i < rounds.length; i++){ + var compareR = rounds[i]; + if ((compareR.height === r.height) + && (compareR.category !== 'kicked') + && (compareR.category !== 'orphan') + && (compareR.serialized !== r.serialized)){ + return false; } + } + return true; + }; - var addressAccount = ""; + // limit blocks paid per payment round + var payingBlocks = 0; + //filter out all rounds that are immature (not confirmed or orphaned yet) + rounds = rounds.filter(function(r){ + // only pay max blocks at a time + if (payingBlocks >= maxBlocksPerPayment) + return false; - // check for transaction errors and generated coins - txDetails.forEach(function(tx, i){ - - if (i === txDetails.length - 1){ - addressAccount = tx.result; - return; - } - - var round = rounds[i]; - if (tx.error && tx.error.code === -5){ - logger.warning(logSystem, logComponent, 'Daemon reports invalid transaction: ' + round.txHash); - round.category = 'kicked'; - return; - } - else if (!tx.result.details || (tx.result.details && tx.result.details.length === 0)){ - logger.warning(logSystem, logComponent, 'Daemon reports no details for transaction: ' + round.txHash); - round.category = 'kicked'; - return; - } - else if (tx.error || !tx.result){ - logger.error(logSystem, logComponent, 'Odd error with gettransaction ' + round.txHash + ' ' - + JSON.stringify(tx)); - return; - } - - var generationTx = tx.result.details.filter(function(tx){ - return tx.address === poolOptions.address; - })[0]; - - if (!generationTx && tx.result.details.length === 1){ - generationTx = tx.result.details[0]; - } - - if (!generationTx){ - logger.error(logSystem, logComponent, 'Missing output details to pool address for transaction ' + round.txHash); - return; - } - - round.category = generationTx.category; - if (round.category === 'generate') { - round.reward = balanceRound(generationTx.amount - fee) || balanceRound(generationTx.value - fee); // TODO: Adjust fees to be dynamic - } - - }); - - var canDeleteShares = function(r){ - for (var i = 0; i < rounds.length; i++){ - var compareR = rounds[i]; - if ((compareR.height === r.height) - && (compareR.category !== 'kicked') - && (compareR.category !== 'orphan') - && (compareR.serialized !== r.serialized)){ - return false; - } - } + switch (r.category) { + case 'orphan': + case 'kicked': + r.canDeleteShares = canDeleteShares(r); + return true; + case 'generate': + payingBlocks++; return true; - }; - - // limit blocks paid per payment round - var payingBlocks = 0; - - //filter out all rounds that are immature (not confirmed or orphaned yet) - rounds = rounds.filter(function(r){ - // only pay max blocks at a time - if (payingBlocks >= maxBlocksPerPayment) - return false; - - switch (r.category) { - case 'orphan': - case 'kicked': - r.canDeleteShares = canDeleteShares(r); - return true; - case 'generate': - payingBlocks++; - return true; - - default: - return false; - } - }); + default: + return false; + } + }); - // TODO: make tx fees dynamic - var feeSatoshi = fee * magnitude; - - // calculate what the pool owes its miners - var totalOwed = parseInt(0); - for (var i = 0; i < rounds.length; i++) { - // only pay generated blocks, not orphaned or kicked - if (rounds[i].category == 'generate') { - totalOwed = totalOwed + Math.round(rounds[i].reward * magnitude) - feeSatoshi; - } + // TODO: make tx fees dynamic + var feeSatoshi = coinsToSatoshies(fee); + // calculate what the pool owes its miners + var totalOwed = parseInt(0); + for (var i = 0; i < rounds.length; i++) { + // only pay generated blocks, not orphaned or kicked + if (rounds[i].category == 'generate') { + totalOwed = totalOwed + coinsToSatoshies(rounds[i].reward) - feeSatoshi; + } + } + + var notAddr = null; + if (requireShielding === true) { + notAddr = poolOptions.address; + } + + // update confirmations for pending blocks in redis + if (confirmsUpdate.length > 0) { + startRedisTimer(); + redisClient.multi(confirmsUpdate).exec(function(error, result){ + endRedisTimer(); + if (error) { + logger.error(logSystem, logComponent, 'Error could not update confirmations for pending blocks in redis ' + JSON.stringify(error)); + return callback(true); } - - var notAddr = null; - if (requireShielding === true) { - notAddr = poolOptions.address; - } - - // check if we have enough tAddress funds to brgin payment processing + // check if we have enough tAddress funds to begin payment processing listUnspent(null, notAddr, minConfPayout, false, function (error, tBalance){ if (error) { logger.error(logSystem, logComponent, 'Error checking pool balance before processing payments.'); return callback(true); } else if (tBalance < totalOwed) { - logger.error(logSystem, logComponent, 'Insufficient funds to process payments for ' + payingBlocks + ' blocks ('+(tBalance / magnitude).toFixed(8) + ' < ' + (totalOwed / magnitude).toFixed(8)+'). Possibly waiting for shielding process.'); + logger.error(logSystem, logComponent, 'Insufficient funds ('+satoshisToCoins(tBalance) + ') to process payments (' + satoshisToCoins(totalOwed)+') for ' + payingBlocks + ' blocks; possibly waiting for txs.'); return callback(true); - } else { - // zcash daemon does not support account feature - addressAccount = ""; - callback(null, workers, rounds, addressAccount); } - }) - + // account feature not implemented at this time + addressAccount = ""; + callback(null, workers, rounds, addressAccount); + }); }); - }); - }); + } else { + // no pending blocks, need to find a block! + return callback(true); + } + }) }, - /* Does a batch redis call to get shares contributed to each round. Then calculates the reward - amount owned to each miner for each round. */ + /* + Step 3 - lookup shares in redis and calculate rewards + */ function(workers, rounds, addressAccount, callback){ - - var shareLookups = rounds.map(function(r){ - return ['hgetall', coin + ':shares:round' + r.height] + // pplnt times lookup + var timeLookups = rounds.map(function(r){ + return ['hgetall', coin + ':shares:times' + r.height] }); - startRedisTimer(); - redisClient.multi(shareLookups).exec(function(error, allWorkerShares){ + redisClient.multi(timeLookups).exec(function(error, allWorkerTimes){ endRedisTimer(); - if (error){ - callback('Check finished - redis error with multi get rounds share'); + callback('Check finished - redis error with multi get rounds time'); return; } - - rounds.forEach(function(round, i){ - var workerShares = allWorkerShares[i]; - - if (!workerShares){ - logger.error(logSystem, logComponent, 'No worker shares for round: ' - + round.height + ' blockHash: ' + round.blockHash); + var shareLookups = rounds.map(function(r){ + return ['hgetall', coin + ':shares:round' + r.height]; + }); + startRedisTimer(); + redisClient.multi(shareLookups).exec(function(error, allWorkerShares){ + endRedisTimer(); + if (error){ + callback('Check finished - redis error with multi get rounds share'); return; } - switch (round.category){ - case 'kicked': - case 'orphan': - round.workerShares = workerShares; - break; + // error detection + var err = null; - case 'generate': - /* We found a confirmed block! Now get the reward for it and calculate how much - we owe each miner based on the shares they submitted during that block round. */ - var reward = parseInt(round.reward * magnitude); - - var totalShares = Object.keys(workerShares).reduce(function(p, c){ - return p + parseFloat(workerShares[c]) - }, 0); - - for (var workerAddress in workerShares){ - var percent = parseFloat(workerShares[workerAddress]) / totalShares; - var workerRewardTotal = Math.floor(reward * percent); - var worker = workers[workerAddress] = (workers[workerAddress] || {}); - worker.totalShares = (worker.totalShares || 0) + parseFloat(workerShares[workerAddress]); - worker.reward = (worker.reward || 0) + workerRewardTotal; - } - break; + // total shares + rounds.forEach(function(round, i){ + var workerShares = allWorkerShares[i]; + if (!workerShares){ + err = true; + logger.error(logSystem, logComponent, 'No worker shares for round: ' + round.height + ' blockHash: ' + round.blockHash); + return; + } + var workerTimes = allWorkerTimes[i]; + switch (round.category){ + case 'kicked': + case 'orphan': + round.workerShares = workerShares; + break; + case 'generate': + // TODO: make tx fees dynamic + var feeSatoshi = coinsToSatoshies(fee); + var reward = coinsToSatoshies(round.reward) - feeSatoshi; + var totalShares = parseFloat(0); + var sharesLost = parseFloat(0); + // find most time spent in this round by single worker + maxTime = 0; + for (var workerAddress in workerTimes){ + if (maxTime < parseFloat(workerTimes[workerAddress])) + maxTime = parseFloat(workerTimes[workerAddress]); + } + // total up shares for round + for (var workerAddress in workerShares){ + var worker = workers[workerAddress] = (workers[workerAddress] || {}); + var shares = parseFloat((workerShares[workerAddress] || 0)); + // if pplnt mode + if (pplntEnabled === true && maxTime > 0) { + var tshares = shares; + var lost = parseFloat(0); + var address = workerAddress.split('.')[0]; + if (workerTimes[address] != null && parseFloat(workerTimes[address]) > 0) { + var timePeriod = roundTo(parseFloat(workerTimes[address] || 1) / maxTime , 2); + if (timePeriod > 0 && timePeriod < pplntTimeQualify) { + var lost = shares - (shares * timePeriod); + sharesLost += lost; + shares = Math.max(shares - lost, 0); + logger.warning(logSystem, logComponent, 'PPLNT: Reduced shares for '+workerAddress+' round:' + round.height + ' maxTime:'+maxTime+'sec timePeriod:'+roundTo(timePeriod,6)+' shares:'+tshares+' lost:'+lost+' new:'+shares); + } + if (timePeriod > 1.0) { + err = true; + logger.error(logSystem, logComponent, 'Time share period is greater than 1.0 for '+workerAddress+' round:' + round.height + ' blockHash:' + round.blockHash); + return; + } + } else { + logger.warning(logSystem, logComponent, 'PPLNT: Missing time share period for '+workerAddress+', miner shares qualified in round ' + round.height); + } + } + worker.roundShares = shares; + worker.totalShares = parseFloat(worker.totalShares || 0) + shares; + totalShares += shares; + } + + console.log('--REWARD DEBUG--------------'); + // calculate rewards for round + var totalAmount = 0; + for (var workerAddress in workerShares){ + var worker = workers[workerAddress] = (workers[workerAddress] || {}); + var percent = parseFloat(worker.roundShares) / totalShares; + if (percent > 1.0) { + err = true; + logger.error(logSystem, logComponent, 'Share percent is greater than 1.0 for '+workerAddress+' round:' + round.height + ' blockHash:' + round.blockHash); + return; + } + // calculate workers reward for this round + var workerRewardTotal = Math.round(reward * percent); + // add to total reward for worker + worker.reward = (worker.reward || 0) + workerRewardTotal; + // add to total amount sent to all workers + totalAmount += worker.reward; + + console.log('rewardAmount: '+workerAddress+' '+workerRewardTotal); + console.log('totalAmount: '+workerAddress+' '+worker.reward); + } + console.log('totalAmount: '+totalAmount); + console.log('blockHeight: '+round.height); + console.log('blockReward: '+reward); + console.log('totalShares: '+totalShares); + console.log('sharesLost: '+sharesLost); + console.log('----------------------------'); + break; + } + }); + + // if there was no errors + if (err === null) { + // continue payments + callback(null, workers, rounds, addressAccount); + } else { + // stop waterfall flow, do not process payments + callback(true); } }); - - callback(null, workers, rounds, addressAccount); + }); + }, - /* Calculate if any payments are ready to be sent and trigger them sending - Get balance different for each address and pass it along as object of latest balances such as - {worker1: balance1, worker2, balance2} - when deciding the sent balance, it the difference should be -1*amount they had in db, - if not sending the balance, the differnce should be +(the amount they earned this round) - */ + + /* + Step 4 - Generate RPC commands to send payments + When deciding the sent balance, it the difference should be -1*amount they had in db, + If not sending the balance, the differnce should be +(the amount they earned this round) + */ function(workers, rounds, addressAccount, callback) { var trySend = function (withholdPercent) { @@ -828,12 +898,13 @@ function SetupForPool(logger, poolOptions, setupFinished){ totalShares += (worker.totalShares || 0) worker.balance = worker.balance || 0; worker.reward = worker.reward || 0; - var toSend = balanceRound(satoshisToCoins(Math.floor((worker.balance + worker.reward) * (1 - withholdPercent)))); + // get miner payout totals + var toSendSatoshis = Math.round((worker.balance + worker.reward) * (1 - withholdPercent)); var address = worker.address = (worker.address || getProperAddress(w.split('.')[0])); if (minerTotals[address] != null && minerTotals[address] > 0) { - minerTotals[address] = balanceRound(minerTotals[address] + toSend); + minerTotals[address] += toSendSatoshis; } else { - minerTotals[address] = toSend; + minerTotals[address] = toSendSatoshis; } } // now process each workers balance, and pay the miner @@ -841,28 +912,29 @@ function SetupForPool(logger, poolOptions, setupFinished){ var worker = workers[w]; worker.balance = worker.balance || 0; worker.reward = worker.reward || 0; - var toSend = Math.floor((worker.balance + worker.reward) * (1 - withholdPercent)); + var toSendSatoshis = Math.round((worker.balance + worker.reward) * (1 - withholdPercent)); var address = worker.address = (worker.address || getProperAddress(w.split('.')[0])); // if miners total is enough, go ahead and add this worker balance - if (minerTotals[address] >= satoshisToCoins(minPaymentSatoshis)) { - totalSent += toSend; - worker.sent = balanceRound(satoshisToCoins(toSend)); - worker.balanceChange = Math.min(worker.balance, toSend) * -1; + if (minerTotals[address] >= minPaymentSatoshis) { + totalSent += toSendSatoshis; + // send funds + worker.sent = satoshisToCoins(toSendSatoshis); + worker.balanceChange = Math.min(worker.balance, toSendSatoshis) * -1; // multiple workers may have same address, add them up if (addressAmounts[address] != null && addressAmounts[address] > 0) { - addressAmounts[address] = balanceRound(addressAmounts[address] + worker.sent); + addressAmounts[address] = coinsRound(addressAmounts[address] + worker.sent); } else { addressAmounts[address] = worker.sent; } - } - else { - worker.balanceChange = Math.max(toSend - worker.balance, 0); + } else { + // add to balance, not enough minerals worker.sent = 0; + worker.balanceChange = Math.max(toSendSatoshis - worker.balance, 0); // track balance changes if (balanceAmounts[address] != null && balanceAmounts[address] > 0) { - balanceAmounts[address] = balanceRound(balanceAmounts[address] + worker.balanceChange); + balanceAmounts[address] = coinsRound(balanceAmounts[address] + satoshisToCoins(worker.balanceChange)); } else { - balanceAmounts[address] = worker.balanceChange; + balanceAmounts[address] = satoshisToCoins(worker.balanceChange); } } } @@ -872,20 +944,7 @@ function SetupForPool(logger, poolOptions, setupFinished){ callback(null, workers, rounds); return; } - /* - var undoPaymentsOnError = function(workers) { - totalSent = 0; - // TODO, set round.category to immature, to attempt to pay again - // we did not send anything to any workers - for (var w in workers) { - var worker = workers[w]; - if (worker.sent > 0) { - worker.balanceChange = 0; - worker.sent = 0; - } - } - }; - */ + // perform the sendmany operation daemon.cmd('sendmany', ["", addressAmounts], function (result) { // check for failed payments, there are many reasons @@ -900,21 +959,18 @@ function SetupForPool(logger, poolOptions, setupFinished){ else if (result.error && result.error.code === -5) { // invalid address specified in addressAmounts array logger.error(logSystem, logComponent, 'Error sending payments ' + result.error.message); - //undoPaymentsOnError(workers); callback(true); return; } else if (result.error && result.error.message != null) { // unknown error from daemon logger.error(logSystem, logComponent, 'Error sending payments ' + result.error.message); - //undoPaymentsOnError(workers); callback(true); return; } else if (result.error) { // some other unknown error logger.error(logSystem, logComponent, 'Error sending payments ' + JSON.stringify(result.error)); - //undoPaymentsOnError(workers); callback(true); return; } @@ -928,7 +984,7 @@ function SetupForPool(logger, poolOptions, setupFinished){ if (txid != null) { // it worked, congrats on your pools payout ;) - logger.special(logSystem, logComponent, 'Sent ' + (totalSent / magnitude).toFixed(8) + logger.special(logSystem, logComponent, 'Sent ' + satoshisToCoins(totalSent) + ' to ' + Object.keys(addressAmounts).length + ' miners; txid: '+txid); if (withholdPercent > 0) { @@ -938,11 +994,12 @@ function SetupForPool(logger, poolOptions, setupFinished){ } // save payments data to redis - var paymentBlocks = rounds.map(function(r){ + var paymentBlocks = rounds.filter(function(r){ return r.category == 'generate'; }).map(function(r){ return parseInt(r.height); }); + var paymentsUpdate = []; - var paymentsData = {time:Date.now(), txid:txid, shares:totalShares, paid:balanceRound(totalSent / magnitude), miners:Object.keys(addressAmounts).length, blocks: paymentBlocks, amounts: addressAmounts, balances: balanceAmounts}; + var paymentsData = {time:Date.now(), txid:txid, shares:totalShares, paid:satoshisToCoins(totalSent), miners:Object.keys(addressAmounts).length, blocks: paymentBlocks, amounts: addressAmounts, balances: balanceAmounts}; paymentsUpdate.push(['zadd', logComponent + ':payments', Date.now(), JSON.stringify(paymentsData)]); startRedisTimer(); redisClient.multi(paymentsUpdate).exec(function(error, payments){ @@ -950,6 +1007,7 @@ function SetupForPool(logger, poolOptions, setupFinished){ if (error){ logger.error(logSystem, logComponent, 'Error redis save payments data ' + JSON.stringify(payments)); } + // perform final redis updates callback(null, workers, rounds); }); @@ -965,12 +1023,16 @@ function SetupForPool(logger, poolOptions, setupFinished){ } } }, true, true); - }; trySend(0); }, + + + /* + Step 5 - Final redis commands + */ function(workers, rounds, callback){ var totalPaid = parseFloat(0); @@ -986,19 +1048,20 @@ function SetupForPool(logger, poolOptions, setupFinished){ 'hincrbyfloat', coin + ':balances', w, - balanceRound(satoshisToCoins(worker.balanceChange)) + satoshisToCoins(worker.balanceChange) ]); } if (worker.sent !== 0){ - workerPayoutsCommand.push(['hincrbyfloat', coin + ':payouts', w, balanceRound(worker.sent)]); - totalPaid = balanceRound(totalPaid + worker.sent); + workerPayoutsCommand.push(['hincrbyfloat', coin + ':payouts', w, coinsRound(worker.sent)]); + totalPaid = coinsRound(totalPaid + worker.sent); } } var movePendingCommands = []; var roundsToDelete = []; var orphanMergeCommands = []; - + var confirmsToDelete = []; + var moveSharesToCurrent = function(r){ var workerShares = r.workerShares; if (workerShares != null) { @@ -1012,17 +1075,22 @@ function SetupForPool(logger, poolOptions, setupFinished){ rounds.forEach(function(r){ switch(r.category){ case 'kicked': + confirmsToDelete.push(['hdel', coin + ':blocksPendingConfirms', r.blockHash]); movePendingCommands.push(['smove', coin + ':blocksPending', coin + ':blocksKicked', r.serialized]); case 'orphan': + confirmsToDelete.push(['hdel', coin + ':blocksPendingConfirms', r.blockHash]); movePendingCommands.push(['smove', coin + ':blocksPending', coin + ':blocksOrphaned', r.serialized]); if (r.canDeleteShares){ moveSharesToCurrent(r); roundsToDelete.push(coin + ':shares:round' + r.height); + roundsToDelete.push(coin + ':shares:times' + r.height); } return; case 'generate': + confirmsToDelete.push(['hdel', coin + ':blocksPendingConfirms', r.blockHash]); movePendingCommands.push(['smove', coin + ':blocksPending', coin + ':blocksConfirmed', r.serialized]); roundsToDelete.push(coin + ':shares:round' + r.height); + roundsToDelete.push(coin + ':shares:times' + r.height); return; } }); @@ -1044,8 +1112,11 @@ function SetupForPool(logger, poolOptions, setupFinished){ if (roundsToDelete.length > 0) finalRedisCommands.push(['del'].concat(roundsToDelete)); + if (confirmsToDelete.length > 0) + finalRedisCommands = finalRedisCommands.concat(confirmsToDelete); + if (totalPaid !== 0) - finalRedisCommands.push(['hincrbyfloat', coin + ':stats', 'totalPaid', balanceRound(totalPaid)]); + finalRedisCommands.push(['hincrbyfloat', coin + ':stats', 'totalPaid', totalPaid]); if (finalRedisCommands.length === 0){ callback(); @@ -1055,12 +1126,14 @@ function SetupForPool(logger, poolOptions, setupFinished){ startRedisTimer(); redisClient.multi(finalRedisCommands).exec(function(error, results){ endRedisTimer(); - if (error){ + if (error) { clearInterval(paymentInterval); + logger.error(logSystem, logComponent, 'Payments sent but could not update redis. ' + JSON.stringify(error) + ' Disabling payment processing to prevent possible double-payouts. The redis commands in ' + coin + '_finalRedisCommands.txt must be ran manually'); + fs.writeFile(coin + '_finalRedisCommands.txt', JSON.stringify(finalRedisCommands), function(err){ logger.error('Could not write finalRedisCommands.txt, you are fucked.'); });