diff --git a/libs/paymentProcessor.js b/libs/paymentProcessor.js index 87c1716..d60427d 100644 --- a/libs/paymentProcessor.js +++ b/libs/paymentProcessor.js @@ -54,13 +54,18 @@ function SetupForPool(logger, poolOptions, setupFinished){ var minConfPayout = 3; var maxBlocksPerPayment = processingConfig.maxBlocksPerPayment || 3; - + + // pplnt - pay per last N time shares + var pplntEnabled = processingConfig.paymentMode === "pplnt" || false; + var pplntTimeQualify = processingConfig.pplnt || 0.51; // 51% + var requireShielding = poolOptions.coin.requireShielding === true; var fee = parseFloat(poolOptions.coin.txfee) || parseFloat(0.0004); logger.debug(logSystem, logComponent, logComponent + ' requireShielding: ' + requireShielding); logger.debug(logSystem, logComponent, logComponent + ' payments txfee reserve: ' + fee); logger.debug(logSystem, logComponent, logComponent + ' maxBlocksPerPayment: ' + maxBlocksPerPayment); + logger.debug(logSystem, logComponent, logComponent + ' PPLNT: ' + pplntEnabled + ', time period: '+pplntTimeQualify); var daemon = new Stratum.daemon.interface([processingConfig.daemon], function(severity, message){ logger[severity](logSystem, logComponent, message); @@ -236,7 +241,7 @@ function SetupForPool(logger, poolOptions, setupFinished){ return; } - var amount = balanceRound((tBalance - 10000) / magnitude); + var amount = satoshisToCoins(tBalance - 10000); var params = [poolOptions.address, [{'address': poolOptions.zAddress, 'amount': amount}]]; daemon.cmd('z_sendmany', params, function (result) { @@ -269,8 +274,8 @@ function SetupForPool(logger, poolOptions, setupFinished){ return; } - var amount = balanceRound((zBalance - 10000) / magnitude); - // no more than 100 ZEC at a time + var amount = satoshisToCoins(zBalance - 10000); + // unshield no more than 100 ZEC at a time if (amount > 100.0) amount = 100.0; @@ -293,7 +298,7 @@ function SetupForPool(logger, poolOptions, setupFinished){ } ); } - + // TODO, this needs to be moved out of payments processor function cacheNetworkStats () { var params = null; @@ -369,58 +374,83 @@ function SetupForPool(logger, poolOptions, setupFinished){ var opid_interval = poolOptions.walletInterval * 1000; // shielding not required for some equihash coins if (requireShielding === true) { - setInterval(function(){ - var checkOpIdSuccessAndGetResult = function(ops) { - ops.forEach(function(op, i){ - if (op.status == "success" || op.status == "failed") { - daemon.cmd('z_getoperationresult', [[op.id]], function (result) { - if (result.error) { - logger.warning(logSystem, logComponent, 'Unable to get payment operation id result ' + JSON.stringify(result)); + var checkOpids = function() { + var checkOpIdSuccessAndGetResult = function(ops) { + var batchRPC = []; + ops.forEach(function(op, i){ + if (op.status == "success" || op.status == "failed") { + batchRPC.push(['z_getoperationresult', [[op.id]]]); + if (opidCount > 0) { + opidCount = 0; } - if (result.response) { - if (opidCount > 0) { - opidCount = 0; - } - if (op.status == "failed") { - if (op.error) { - logger.error(logSystem, logComponent, "Shielding operation failed " + op.id + " " + op.error.code +", " + op.error.message); - } else { - logger.error(logSystem, logComponent, "Shielding operation failed " + op.id); - } + if (op.status == "failed") { + if (op.error) { + logger.error(logSystem, logComponent, "Shielding operation failed " + op.id + " " + op.error.code +", " + op.error.message); } else { - logger.special(logSystem, logComponent, 'Shielding operation success ' + op.id + ' txid: ' + op.result.txid); + logger.error(logSystem, logComponent, "Shielding operation failed " + op.id); } + } else { + logger.special(logSystem, logComponent, 'Shielding operation success ' + op.id + ' txid: ' + op.result.txid); + } + } else if (op.status == "executing") { + if (opidCount == 0) { + opidCount++; + logger.special(logSystem, logComponent, 'Shielding operation in progress ' + op.id ); } - }, true, true); - } else if (op.status == "executing") { - if (opidCount == 0) { - opidCount++; - logger.special(logSystem, logComponent, 'Shielding operation in progress ' + op.id ); } + }); + if (batchRPC.length <= 0) { + opidInterval = setInterval(checkOpids, opid_interval); + return; } - }); - }; - daemon.cmd('z_getoperationstatus', null, function (result) { + daemon.batchCmd(batchRPC, function(error, results){ + if (error || !results) { + logger.error(logSystem, logComponent, 'Error with z_getoperationresult ' + JSON.stringify(error)); + return; + } + results.forEach(function(result, i) { + if (parseFloat(result.result[i].execution_secs || 0) > parseFloat(poolOptions.walletInterval)) + logger.warning(logSystem, logComponent, 'Increase walletInterval in pool_config. opid execution took '+result.result[i].execution_secs+' secs.'); + }); + opidInterval = setInterval(checkOpids, opid_interval); + }); + }; + clearInterval(opidInterval); + daemon.cmd('z_getoperationstatus', null, function (result) { if (result.error) { logger.warning(logSystem, logComponent, 'Unable to get operation ids for clearing.'); - } - if (result.response) { + opidInterval = setInterval(checkOpids, opid_interval); + } else if (result.response) { checkOpIdSuccessAndGetResult(result.response); + } else { + opidInterval = setInterval(checkOpids, opid_interval); } - }, true, true); - }, opid_interval); + }, true, true); + } + + var opidInterval = setInterval(checkOpids, opid_interval); + } + + function roundTo(n, digits) { + if (digits === undefined) { + digits = 0; + } + var multiplicator = Math.pow(10, digits); + n = parseFloat((n * multiplicator).toFixed(11)); + var test =(Math.round(n) / multiplicator); + return +(test.toFixed(digits)); } var satoshisToCoins = function(satoshis){ - return parseFloat((satoshis / magnitude).toFixed(coinPrecision)); + return roundTo((satoshis / magnitude), coinPrecision); }; var coinsToSatoshies = function(coins){ return Math.round(coins * magnitude); }; - function balanceRound(number) { - return parseFloat((Math.round(number * 100000000) / 100000000).toFixed(8)); + function coinsRound(number) { + return roundTo(number, coinPrecision); } function checkForDuplicateBlockHeight(rounds, height) { @@ -452,8 +482,10 @@ function SetupForPool(logger, poolOptions, setupFinished){ var endRPCTimer = function(){ timeSpentRPC += Date.now() - startTimeRedis }; async.waterfall([ - - /* Call redis to get an array of rounds and balances - which are coinbase transactions and block heights from submitted blocks. */ + /* + Step 1 - build workers and rounds objects from redis + * removes duplicate block submissions from redis + */ function(callback){ startRedisTimer(); redisClient.multi([ @@ -466,12 +498,12 @@ function SetupForPool(logger, poolOptions, setupFinished){ callback(true); return; } - // build worker balances + // build workers object from :balances var workers = {}; for (var w in results[0]){ workers[w] = {balance: coinsToSatoshies(parseFloat(results[0][w]))}; } - // build initial rounds data from blocksPending + // build rounds object from :blocksPending var rounds = results[1].map(function(r){ var details = r.split(':'); return { @@ -561,259 +593,297 @@ function SetupForPool(logger, poolOptions, setupFinished){ }, - /* Does a batch rpc call to daemon with all the transaction hashes to see if they are confirmed yet. - It also adds the block reward amount to the round object - which the daemon gives also gives us. */ + /* + Step 2 - check if mined block coinbase tx are ready for payment + * adds block reward to rounds object + * adds block confirmations count to rounds object + * updates confirmation counts in redis + */ function(workers, rounds, callback){ - - // first verify block confirmations by block hash - var batchRPCcommand2 = rounds.map(function(r){ - return ['getblock', [r.blockHash]]; + // get pending block tx details + var batchRPCcommand = rounds.map(function(r){ + return ['gettransaction', [r.txHash]]; }); - // guarantee a response for batchRPCcommand2 - batchRPCcommand2.push(['getblockcount']); + // get account address (not implemented at this time) + batchRPCcommand.push(['getaccount', [poolOptions.address]]); startRPCTimer(); - daemon.batchCmd(batchRPCcommand2, function(error, blockDetails){ + daemon.batchCmd(batchRPCcommand, function(error, txDetails){ endRPCTimer(); - - // error getting block info by hash? - if (error || !blockDetails){ - logger.error(logSystem, logComponent, 'Check finished - daemon rpc error with batch getblock ' - + JSON.stringify(error)); + if (error || !txDetails){ + logger.error(logSystem, logComponent, 'Check finished - daemon rpc error with batch gettransactions ' + JSON.stringify(error)); callback(true); return; } - // update confirmations in redis for pending blocks - var confirmsUpdate = blockDetails.map(function(b) { - if (b.result != null && b.result.confirmations > 0) { - if (b.result.confirmations > 100) { - return ['hdel', logComponent + ':blocksPendingConfirms', b.result.hash]; + var confirmsUpdate = []; + var addressAccount = ""; + + // check for transaction errors and generated coins + txDetails.forEach(function(tx, i){ + if (i === txDetails.length - 1){ + if (tx.result && tx.result.toString().length > 0) { + addressAccount = tx.result.toString(); } - return ['hset', logComponent + ':blocksPendingConfirms', b.result.hash, b.result.confirmations]; - } - return null; - }); - - // filter nulls, last item is always null... - confirmsUpdate = confirmsUpdate.filter(function(val) { return val !== null; }); - // guarantee at least one redis update - if (confirmsUpdate.length < 1) - confirmsUpdate.push(['hset', logComponent + ':blocksPendingConfirms', 0, 0]); - - startRedisTimer(); - redisClient.multi(confirmsUpdate).exec(function(error, updated){ - endRedisTimer(); - - if (error){ - logger.error(logSystem, logComponent, 'failed to update pending block confirmations' - + JSON.stringify(error)); - callback(true); return; } - - // get pending block transaction details from coin daemon - var batchRPCcommand = rounds.map(function(r){ - return ['gettransaction', [r.txHash]]; - }); - // get account address (not implemented in zcash at this time..) - batchRPCcommand.push(['getaccount', [poolOptions.address]]); + var round = rounds[i]; + // look for transaction errors + if (tx.error && tx.error.code === -5){ + logger.warning(logSystem, logComponent, 'Daemon reports invalid transaction: ' + round.txHash); + round.category = 'kicked'; + return; + } + else if (!tx.result.details || (tx.result.details && tx.result.details.length === 0)){ + logger.warning(logSystem, logComponent, 'Daemon reports no details for transaction: ' + round.txHash); + round.category = 'kicked'; + return; + } + else if (tx.error || !tx.result){ + logger.error(logSystem, logComponent, 'Odd error with gettransaction ' + round.txHash + ' ' + JSON.stringify(tx)); + return; + } + // get the coin base generation tx + var generationTx = tx.result.details.filter(function(tx){ + return tx.address === poolOptions.address; + })[0]; + if (!generationTx && tx.result.details.length === 1){ + generationTx = tx.result.details[0]; + } + if (!generationTx){ + logger.error(logSystem, logComponent, 'Missing output details to pool address for transaction ' + round.txHash); + return; + } + // get transaction category for round + round.category = generationTx.category; + round.confirmations = parseInt((tx.result.confirmations || 0)); + // get reward for newly generated blocks + if (round.category === 'generate') { + round.reward = coinsRound(parseFloat(generationTx.amount || generationTx.value)); + } + // update confirmations in redis + confirmsUpdate.push(['hset', coin + ':blocksPendingConfirms', round.blockHash, round.confirmations]); + }); - startRPCTimer(); - daemon.batchCmd(batchRPCcommand, function(error, txDetails){ - endRPCTimer(); - - if (error || !txDetails){ - logger.error(logSystem, logComponent, 'Check finished - daemon rpc error with batch gettransactions ' - + JSON.stringify(error)); - callback(true); - return; + var canDeleteShares = function(r){ + for (var i = 0; i < rounds.length; i++){ + var compareR = rounds[i]; + if ((compareR.height === r.height) + && (compareR.category !== 'kicked') + && (compareR.category !== 'orphan') + && (compareR.serialized !== r.serialized)){ + return false; } + } + return true; + }; - var addressAccount = ""; + // limit blocks paid per payment round + var payingBlocks = 0; + //filter out all rounds that are immature (not confirmed or orphaned yet) + rounds = rounds.filter(function(r){ + // only pay max blocks at a time + if (payingBlocks >= maxBlocksPerPayment) + return false; - // check for transaction errors and generated coins - txDetails.forEach(function(tx, i){ - - if (i === txDetails.length - 1){ - addressAccount = tx.result; - return; - } - - var round = rounds[i]; - if (tx.error && tx.error.code === -5){ - logger.warning(logSystem, logComponent, 'Daemon reports invalid transaction: ' + round.txHash); - round.category = 'kicked'; - return; - } - else if (!tx.result.details || (tx.result.details && tx.result.details.length === 0)){ - logger.warning(logSystem, logComponent, 'Daemon reports no details for transaction: ' + round.txHash); - round.category = 'kicked'; - return; - } - else if (tx.error || !tx.result){ - logger.error(logSystem, logComponent, 'Odd error with gettransaction ' + round.txHash + ' ' - + JSON.stringify(tx)); - return; - } - - var generationTx = tx.result.details.filter(function(tx){ - return tx.address === poolOptions.address; - })[0]; - - if (!generationTx && tx.result.details.length === 1){ - generationTx = tx.result.details[0]; - } - - if (!generationTx){ - logger.error(logSystem, logComponent, 'Missing output details to pool address for transaction ' + round.txHash); - return; - } - - round.category = generationTx.category; - if (round.category === 'generate') { - round.reward = balanceRound(generationTx.amount - fee) || balanceRound(generationTx.value - fee); // TODO: Adjust fees to be dynamic - } - - }); - - var canDeleteShares = function(r){ - for (var i = 0; i < rounds.length; i++){ - var compareR = rounds[i]; - if ((compareR.height === r.height) - && (compareR.category !== 'kicked') - && (compareR.category !== 'orphan') - && (compareR.serialized !== r.serialized)){ - return false; - } - } + switch (r.category) { + case 'orphan': + case 'kicked': + r.canDeleteShares = canDeleteShares(r); + return true; + case 'generate': + payingBlocks++; return true; - }; - - // limit blocks paid per payment round - var payingBlocks = 0; - - //filter out all rounds that are immature (not confirmed or orphaned yet) - rounds = rounds.filter(function(r){ - // only pay max blocks at a time - if (payingBlocks >= maxBlocksPerPayment) - return false; - - switch (r.category) { - case 'orphan': - case 'kicked': - r.canDeleteShares = canDeleteShares(r); - return true; - case 'generate': - payingBlocks++; - return true; - - default: - return false; - } - }); + default: + return false; + } + }); - // TODO: make tx fees dynamic - var feeSatoshi = fee * magnitude; - - // calculate what the pool owes its miners - var totalOwed = parseInt(0); - for (var i = 0; i < rounds.length; i++) { - // only pay generated blocks, not orphaned or kicked - if (rounds[i].category == 'generate') { - totalOwed = totalOwed + Math.round(rounds[i].reward * magnitude) - feeSatoshi; - } + // TODO: make tx fees dynamic + var feeSatoshi = coinsToSatoshies(fee); + // calculate what the pool owes its miners + var totalOwed = parseInt(0); + for (var i = 0; i < rounds.length; i++) { + // only pay generated blocks, not orphaned or kicked + if (rounds[i].category == 'generate') { + totalOwed = totalOwed + coinsToSatoshies(rounds[i].reward) - feeSatoshi; + } + } + + var notAddr = null; + if (requireShielding === true) { + notAddr = poolOptions.address; + } + + // update confirmations for pending blocks in redis + if (confirmsUpdate.length > 0) { + startRedisTimer(); + redisClient.multi(confirmsUpdate).exec(function(error, result){ + endRedisTimer(); + if (error) { + logger.error(logSystem, logComponent, 'Error could not update confirmations for pending blocks in redis ' + JSON.stringify(error)); + return callback(true); } - - var notAddr = null; - if (requireShielding === true) { - notAddr = poolOptions.address; - } - - // check if we have enough tAddress funds to brgin payment processing + // check if we have enough tAddress funds to begin payment processing listUnspent(null, notAddr, minConfPayout, false, function (error, tBalance){ if (error) { logger.error(logSystem, logComponent, 'Error checking pool balance before processing payments.'); return callback(true); } else if (tBalance < totalOwed) { - logger.error(logSystem, logComponent, 'Insufficient funds to process payments for ' + payingBlocks + ' blocks ('+(tBalance / magnitude).toFixed(8) + ' < ' + (totalOwed / magnitude).toFixed(8)+'). Possibly waiting for shielding process.'); + logger.error(logSystem, logComponent, 'Insufficient funds ('+satoshisToCoins(tBalance) + ') to process payments (' + satoshisToCoins(totalOwed)+') for ' + payingBlocks + ' blocks; possibly waiting for txs.'); return callback(true); - } else { - // zcash daemon does not support account feature - addressAccount = ""; - callback(null, workers, rounds, addressAccount); } - }) - + // account feature not implemented at this time + addressAccount = ""; + callback(null, workers, rounds, addressAccount); + }); }); - }); - }); + } else { + // no pending blocks, need to find a block! + return callback(true); + } + }) }, - /* Does a batch redis call to get shares contributed to each round. Then calculates the reward - amount owned to each miner for each round. */ + /* + Step 3 - lookup shares in redis and calculate rewards + */ function(workers, rounds, addressAccount, callback){ - - var shareLookups = rounds.map(function(r){ - return ['hgetall', coin + ':shares:round' + r.height] + // pplnt times lookup + var timeLookups = rounds.map(function(r){ + return ['hgetall', coin + ':shares:times' + r.height] }); - startRedisTimer(); - redisClient.multi(shareLookups).exec(function(error, allWorkerShares){ + redisClient.multi(timeLookups).exec(function(error, allWorkerTimes){ endRedisTimer(); - if (error){ - callback('Check finished - redis error with multi get rounds share'); + callback('Check finished - redis error with multi get rounds time'); return; } - - rounds.forEach(function(round, i){ - var workerShares = allWorkerShares[i]; - - if (!workerShares){ - logger.error(logSystem, logComponent, 'No worker shares for round: ' - + round.height + ' blockHash: ' + round.blockHash); + var shareLookups = rounds.map(function(r){ + return ['hgetall', coin + ':shares:round' + r.height]; + }); + startRedisTimer(); + redisClient.multi(shareLookups).exec(function(error, allWorkerShares){ + endRedisTimer(); + if (error){ + callback('Check finished - redis error with multi get rounds share'); return; } - switch (round.category){ - case 'kicked': - case 'orphan': - round.workerShares = workerShares; - break; + // error detection + var err = null; - case 'generate': - /* We found a confirmed block! Now get the reward for it and calculate how much - we owe each miner based on the shares they submitted during that block round. */ - var reward = parseInt(round.reward * magnitude); - - var totalShares = Object.keys(workerShares).reduce(function(p, c){ - return p + parseFloat(workerShares[c]) - }, 0); - - for (var workerAddress in workerShares){ - var percent = parseFloat(workerShares[workerAddress]) / totalShares; - var workerRewardTotal = Math.floor(reward * percent); - var worker = workers[workerAddress] = (workers[workerAddress] || {}); - worker.totalShares = (worker.totalShares || 0) + parseFloat(workerShares[workerAddress]); - worker.reward = (worker.reward || 0) + workerRewardTotal; - } - break; + // total shares + rounds.forEach(function(round, i){ + var workerShares = allWorkerShares[i]; + if (!workerShares){ + err = true; + logger.error(logSystem, logComponent, 'No worker shares for round: ' + round.height + ' blockHash: ' + round.blockHash); + return; + } + var workerTimes = allWorkerTimes[i]; + switch (round.category){ + case 'kicked': + case 'orphan': + round.workerShares = workerShares; + break; + case 'generate': + // TODO: make tx fees dynamic + var feeSatoshi = coinsToSatoshies(fee); + var reward = coinsToSatoshies(round.reward) - feeSatoshi; + var totalShares = parseFloat(0); + var sharesLost = parseFloat(0); + // find most time spent in this round by single worker + maxTime = 0; + for (var workerAddress in workerTimes){ + if (maxTime < parseFloat(workerTimes[workerAddress])) + maxTime = parseFloat(workerTimes[workerAddress]); + } + // total up shares for round + for (var workerAddress in workerShares){ + var worker = workers[workerAddress] = (workers[workerAddress] || {}); + var shares = parseFloat((workerShares[workerAddress] || 0)); + // if pplnt mode + if (pplntEnabled === true && maxTime > 0) { + var tshares = shares; + var lost = parseFloat(0); + var address = workerAddress.split('.')[0]; + if (workerTimes[address] != null && parseFloat(workerTimes[address]) > 0) { + var timePeriod = roundTo(parseFloat(workerTimes[address] || 1) / maxTime , 2); + if (timePeriod > 0 && timePeriod < pplntTimeQualify) { + var lost = shares - (shares * timePeriod); + sharesLost += lost; + shares = Math.max(shares - lost, 0); + logger.warning(logSystem, logComponent, 'PPLNT: Reduced shares for '+workerAddress+' round:' + round.height + ' maxTime:'+maxTime+'sec timePeriod:'+roundTo(timePeriod,6)+' shares:'+tshares+' lost:'+lost+' new:'+shares); + } + if (timePeriod > 1.0) { + err = true; + logger.error(logSystem, logComponent, 'Time share period is greater than 1.0 for '+workerAddress+' round:' + round.height + ' blockHash:' + round.blockHash); + return; + } + } else { + logger.warning(logSystem, logComponent, 'PPLNT: Missing time share period for '+workerAddress+', miner shares qualified in round ' + round.height); + } + } + worker.roundShares = shares; + worker.totalShares = parseFloat(worker.totalShares || 0) + shares; + totalShares += shares; + } + + console.log('--REWARD DEBUG--------------'); + // calculate rewards for round + var totalAmount = 0; + for (var workerAddress in workerShares){ + var worker = workers[workerAddress] = (workers[workerAddress] || {}); + var percent = parseFloat(worker.roundShares) / totalShares; + if (percent > 1.0) { + err = true; + logger.error(logSystem, logComponent, 'Share percent is greater than 1.0 for '+workerAddress+' round:' + round.height + ' blockHash:' + round.blockHash); + return; + } + // calculate workers reward for this round + var workerRewardTotal = Math.round(reward * percent); + // add to total reward for worker + worker.reward = (worker.reward || 0) + workerRewardTotal; + // add to total amount sent to all workers + totalAmount += worker.reward; + + console.log('rewardAmount: '+workerAddress+' '+workerRewardTotal); + console.log('totalAmount: '+workerAddress+' '+worker.reward); + } + console.log('totalAmount: '+totalAmount); + console.log('blockHeight: '+round.height); + console.log('blockReward: '+reward); + console.log('totalShares: '+totalShares); + console.log('sharesLost: '+sharesLost); + console.log('----------------------------'); + break; + } + }); + + // if there was no errors + if (err === null) { + // continue payments + callback(null, workers, rounds, addressAccount); + } else { + // stop waterfall flow, do not process payments + callback(true); } }); - - callback(null, workers, rounds, addressAccount); + }); + }, - /* Calculate if any payments are ready to be sent and trigger them sending - Get balance different for each address and pass it along as object of latest balances such as - {worker1: balance1, worker2, balance2} - when deciding the sent balance, it the difference should be -1*amount they had in db, - if not sending the balance, the differnce should be +(the amount they earned this round) - */ + + /* + Step 4 - Generate RPC commands to send payments + When deciding the sent balance, it the difference should be -1*amount they had in db, + If not sending the balance, the differnce should be +(the amount they earned this round) + */ function(workers, rounds, addressAccount, callback) { var trySend = function (withholdPercent) { @@ -828,12 +898,13 @@ function SetupForPool(logger, poolOptions, setupFinished){ totalShares += (worker.totalShares || 0) worker.balance = worker.balance || 0; worker.reward = worker.reward || 0; - var toSend = balanceRound(satoshisToCoins(Math.floor((worker.balance + worker.reward) * (1 - withholdPercent)))); + // get miner payout totals + var toSendSatoshis = Math.round((worker.balance + worker.reward) * (1 - withholdPercent)); var address = worker.address = (worker.address || getProperAddress(w.split('.')[0])); if (minerTotals[address] != null && minerTotals[address] > 0) { - minerTotals[address] = balanceRound(minerTotals[address] + toSend); + minerTotals[address] += toSendSatoshis; } else { - minerTotals[address] = toSend; + minerTotals[address] = toSendSatoshis; } } // now process each workers balance, and pay the miner @@ -841,28 +912,29 @@ function SetupForPool(logger, poolOptions, setupFinished){ var worker = workers[w]; worker.balance = worker.balance || 0; worker.reward = worker.reward || 0; - var toSend = Math.floor((worker.balance + worker.reward) * (1 - withholdPercent)); + var toSendSatoshis = Math.round((worker.balance + worker.reward) * (1 - withholdPercent)); var address = worker.address = (worker.address || getProperAddress(w.split('.')[0])); // if miners total is enough, go ahead and add this worker balance - if (minerTotals[address] >= satoshisToCoins(minPaymentSatoshis)) { - totalSent += toSend; - worker.sent = balanceRound(satoshisToCoins(toSend)); - worker.balanceChange = Math.min(worker.balance, toSend) * -1; + if (minerTotals[address] >= minPaymentSatoshis) { + totalSent += toSendSatoshis; + // send funds + worker.sent = satoshisToCoins(toSendSatoshis); + worker.balanceChange = Math.min(worker.balance, toSendSatoshis) * -1; // multiple workers may have same address, add them up if (addressAmounts[address] != null && addressAmounts[address] > 0) { - addressAmounts[address] = balanceRound(addressAmounts[address] + worker.sent); + addressAmounts[address] = coinsRound(addressAmounts[address] + worker.sent); } else { addressAmounts[address] = worker.sent; } - } - else { - worker.balanceChange = Math.max(toSend - worker.balance, 0); + } else { + // add to balance, not enough minerals worker.sent = 0; + worker.balanceChange = Math.max(toSendSatoshis - worker.balance, 0); // track balance changes if (balanceAmounts[address] != null && balanceAmounts[address] > 0) { - balanceAmounts[address] = balanceRound(balanceAmounts[address] + worker.balanceChange); + balanceAmounts[address] = coinsRound(balanceAmounts[address] + satoshisToCoins(worker.balanceChange)); } else { - balanceAmounts[address] = worker.balanceChange; + balanceAmounts[address] = satoshisToCoins(worker.balanceChange); } } } @@ -872,20 +944,7 @@ function SetupForPool(logger, poolOptions, setupFinished){ callback(null, workers, rounds); return; } - /* - var undoPaymentsOnError = function(workers) { - totalSent = 0; - // TODO, set round.category to immature, to attempt to pay again - // we did not send anything to any workers - for (var w in workers) { - var worker = workers[w]; - if (worker.sent > 0) { - worker.balanceChange = 0; - worker.sent = 0; - } - } - }; - */ + // perform the sendmany operation daemon.cmd('sendmany', ["", addressAmounts], function (result) { // check for failed payments, there are many reasons @@ -900,21 +959,18 @@ function SetupForPool(logger, poolOptions, setupFinished){ else if (result.error && result.error.code === -5) { // invalid address specified in addressAmounts array logger.error(logSystem, logComponent, 'Error sending payments ' + result.error.message); - //undoPaymentsOnError(workers); callback(true); return; } else if (result.error && result.error.message != null) { // unknown error from daemon logger.error(logSystem, logComponent, 'Error sending payments ' + result.error.message); - //undoPaymentsOnError(workers); callback(true); return; } else if (result.error) { // some other unknown error logger.error(logSystem, logComponent, 'Error sending payments ' + JSON.stringify(result.error)); - //undoPaymentsOnError(workers); callback(true); return; } @@ -928,7 +984,7 @@ function SetupForPool(logger, poolOptions, setupFinished){ if (txid != null) { // it worked, congrats on your pools payout ;) - logger.special(logSystem, logComponent, 'Sent ' + (totalSent / magnitude).toFixed(8) + logger.special(logSystem, logComponent, 'Sent ' + satoshisToCoins(totalSent) + ' to ' + Object.keys(addressAmounts).length + ' miners; txid: '+txid); if (withholdPercent > 0) { @@ -938,11 +994,12 @@ function SetupForPool(logger, poolOptions, setupFinished){ } // save payments data to redis - var paymentBlocks = rounds.map(function(r){ + var paymentBlocks = rounds.filter(function(r){ return r.category == 'generate'; }).map(function(r){ return parseInt(r.height); }); + var paymentsUpdate = []; - var paymentsData = {time:Date.now(), txid:txid, shares:totalShares, paid:balanceRound(totalSent / magnitude), miners:Object.keys(addressAmounts).length, blocks: paymentBlocks, amounts: addressAmounts, balances: balanceAmounts}; + var paymentsData = {time:Date.now(), txid:txid, shares:totalShares, paid:satoshisToCoins(totalSent), miners:Object.keys(addressAmounts).length, blocks: paymentBlocks, amounts: addressAmounts, balances: balanceAmounts}; paymentsUpdate.push(['zadd', logComponent + ':payments', Date.now(), JSON.stringify(paymentsData)]); startRedisTimer(); redisClient.multi(paymentsUpdate).exec(function(error, payments){ @@ -950,6 +1007,7 @@ function SetupForPool(logger, poolOptions, setupFinished){ if (error){ logger.error(logSystem, logComponent, 'Error redis save payments data ' + JSON.stringify(payments)); } + // perform final redis updates callback(null, workers, rounds); }); @@ -965,12 +1023,16 @@ function SetupForPool(logger, poolOptions, setupFinished){ } } }, true, true); - }; trySend(0); }, + + + /* + Step 5 - Final redis commands + */ function(workers, rounds, callback){ var totalPaid = parseFloat(0); @@ -986,19 +1048,20 @@ function SetupForPool(logger, poolOptions, setupFinished){ 'hincrbyfloat', coin + ':balances', w, - balanceRound(satoshisToCoins(worker.balanceChange)) + satoshisToCoins(worker.balanceChange) ]); } if (worker.sent !== 0){ - workerPayoutsCommand.push(['hincrbyfloat', coin + ':payouts', w, balanceRound(worker.sent)]); - totalPaid = balanceRound(totalPaid + worker.sent); + workerPayoutsCommand.push(['hincrbyfloat', coin + ':payouts', w, coinsRound(worker.sent)]); + totalPaid = coinsRound(totalPaid + worker.sent); } } var movePendingCommands = []; var roundsToDelete = []; var orphanMergeCommands = []; - + var confirmsToDelete = []; + var moveSharesToCurrent = function(r){ var workerShares = r.workerShares; if (workerShares != null) { @@ -1012,17 +1075,22 @@ function SetupForPool(logger, poolOptions, setupFinished){ rounds.forEach(function(r){ switch(r.category){ case 'kicked': + confirmsToDelete.push(['hdel', coin + ':blocksPendingConfirms', r.blockHash]); movePendingCommands.push(['smove', coin + ':blocksPending', coin + ':blocksKicked', r.serialized]); case 'orphan': + confirmsToDelete.push(['hdel', coin + ':blocksPendingConfirms', r.blockHash]); movePendingCommands.push(['smove', coin + ':blocksPending', coin + ':blocksOrphaned', r.serialized]); if (r.canDeleteShares){ moveSharesToCurrent(r); roundsToDelete.push(coin + ':shares:round' + r.height); + roundsToDelete.push(coin + ':shares:times' + r.height); } return; case 'generate': + confirmsToDelete.push(['hdel', coin + ':blocksPendingConfirms', r.blockHash]); movePendingCommands.push(['smove', coin + ':blocksPending', coin + ':blocksConfirmed', r.serialized]); roundsToDelete.push(coin + ':shares:round' + r.height); + roundsToDelete.push(coin + ':shares:times' + r.height); return; } }); @@ -1044,8 +1112,11 @@ function SetupForPool(logger, poolOptions, setupFinished){ if (roundsToDelete.length > 0) finalRedisCommands.push(['del'].concat(roundsToDelete)); + if (confirmsToDelete.length > 0) + finalRedisCommands = finalRedisCommands.concat(confirmsToDelete); + if (totalPaid !== 0) - finalRedisCommands.push(['hincrbyfloat', coin + ':stats', 'totalPaid', balanceRound(totalPaid)]); + finalRedisCommands.push(['hincrbyfloat', coin + ':stats', 'totalPaid', totalPaid]); if (finalRedisCommands.length === 0){ callback(); @@ -1055,12 +1126,14 @@ function SetupForPool(logger, poolOptions, setupFinished){ startRedisTimer(); redisClient.multi(finalRedisCommands).exec(function(error, results){ endRedisTimer(); - if (error){ + if (error) { clearInterval(paymentInterval); + logger.error(logSystem, logComponent, 'Payments sent but could not update redis. ' + JSON.stringify(error) + ' Disabling payment processing to prevent possible double-payouts. The redis commands in ' + coin + '_finalRedisCommands.txt must be ran manually'); + fs.writeFile(coin + '_finalRedisCommands.txt', JSON.stringify(finalRedisCommands), function(err){ logger.error('Could not write finalRedisCommands.txt, you are fucked.'); });