Update paymentProcessor.js

Experimental PPLNT support (console.logs need to be removed after experiments)
Major reduction in RPC calls to daemon.
Balance rounding improvements.
Improvements to confirmations tracking.
Improvements to operation id tracking.
Improved error handling.
This commit is contained in:
hellcatz 2017-05-03 22:20:08 -07:00 committed by GitHub
parent aae3018f55
commit 774e2b0088
1 changed files with 362 additions and 289 deletions

View File

@ -54,13 +54,18 @@ function SetupForPool(logger, poolOptions, setupFinished){
var minConfPayout = 3;
var maxBlocksPerPayment = processingConfig.maxBlocksPerPayment || 3;
// pplnt - pay per last N time shares
var pplntEnabled = processingConfig.paymentMode === "pplnt" || false;
var pplntTimeQualify = processingConfig.pplnt || 0.51; // 51%
var requireShielding = poolOptions.coin.requireShielding === true;
var fee = parseFloat(poolOptions.coin.txfee) || parseFloat(0.0004);
logger.debug(logSystem, logComponent, logComponent + ' requireShielding: ' + requireShielding);
logger.debug(logSystem, logComponent, logComponent + ' payments txfee reserve: ' + fee);
logger.debug(logSystem, logComponent, logComponent + ' maxBlocksPerPayment: ' + maxBlocksPerPayment);
logger.debug(logSystem, logComponent, logComponent + ' PPLNT: ' + pplntEnabled + ', time period: '+pplntTimeQualify);
var daemon = new Stratum.daemon.interface([processingConfig.daemon], function(severity, message){
logger[severity](logSystem, logComponent, message);
@ -236,7 +241,7 @@ function SetupForPool(logger, poolOptions, setupFinished){
return;
}
var amount = balanceRound((tBalance - 10000) / magnitude);
var amount = satoshisToCoins(tBalance - 10000);
var params = [poolOptions.address, [{'address': poolOptions.zAddress, 'amount': amount}]];
daemon.cmd('z_sendmany', params,
function (result) {
@ -269,8 +274,8 @@ function SetupForPool(logger, poolOptions, setupFinished){
return;
}
var amount = balanceRound((zBalance - 10000) / magnitude);
// no more than 100 ZEC at a time
var amount = satoshisToCoins(zBalance - 10000);
// unshield no more than 100 ZEC at a time
if (amount > 100.0)
amount = 100.0;
@ -293,7 +298,7 @@ function SetupForPool(logger, poolOptions, setupFinished){
}
);
}
// TODO, this needs to be moved out of payments processor
function cacheNetworkStats () {
var params = null;
@ -369,58 +374,83 @@ function SetupForPool(logger, poolOptions, setupFinished){
var opid_interval = poolOptions.walletInterval * 1000;
// shielding not required for some equihash coins
if (requireShielding === true) {
setInterval(function(){
var checkOpIdSuccessAndGetResult = function(ops) {
ops.forEach(function(op, i){
if (op.status == "success" || op.status == "failed") {
daemon.cmd('z_getoperationresult', [[op.id]], function (result) {
if (result.error) {
logger.warning(logSystem, logComponent, 'Unable to get payment operation id result ' + JSON.stringify(result));
var checkOpids = function() {
var checkOpIdSuccessAndGetResult = function(ops) {
var batchRPC = [];
ops.forEach(function(op, i){
if (op.status == "success" || op.status == "failed") {
batchRPC.push(['z_getoperationresult', [[op.id]]]);
if (opidCount > 0) {
opidCount = 0;
}
if (result.response) {
if (opidCount > 0) {
opidCount = 0;
}
if (op.status == "failed") {
if (op.error) {
logger.error(logSystem, logComponent, "Shielding operation failed " + op.id + " " + op.error.code +", " + op.error.message);
} else {
logger.error(logSystem, logComponent, "Shielding operation failed " + op.id);
}
if (op.status == "failed") {
if (op.error) {
logger.error(logSystem, logComponent, "Shielding operation failed " + op.id + " " + op.error.code +", " + op.error.message);
} else {
logger.special(logSystem, logComponent, 'Shielding operation success ' + op.id + ' txid: ' + op.result.txid);
logger.error(logSystem, logComponent, "Shielding operation failed " + op.id);
}
} else {
logger.special(logSystem, logComponent, 'Shielding operation success ' + op.id + ' txid: ' + op.result.txid);
}
} else if (op.status == "executing") {
if (opidCount == 0) {
opidCount++;
logger.special(logSystem, logComponent, 'Shielding operation in progress ' + op.id );
}
}, true, true);
} else if (op.status == "executing") {
if (opidCount == 0) {
opidCount++;
logger.special(logSystem, logComponent, 'Shielding operation in progress ' + op.id );
}
});
if (batchRPC.length <= 0) {
opidInterval = setInterval(checkOpids, opid_interval);
return;
}
});
};
daemon.cmd('z_getoperationstatus', null, function (result) {
daemon.batchCmd(batchRPC, function(error, results){
if (error || !results) {
logger.error(logSystem, logComponent, 'Error with z_getoperationresult ' + JSON.stringify(error));
return;
}
results.forEach(function(result, i) {
if (parseFloat(result.result[i].execution_secs || 0) > parseFloat(poolOptions.walletInterval))
logger.warning(logSystem, logComponent, 'Increase walletInterval in pool_config. opid execution took '+result.result[i].execution_secs+' secs.');
});
opidInterval = setInterval(checkOpids, opid_interval);
});
};
clearInterval(opidInterval);
daemon.cmd('z_getoperationstatus', null, function (result) {
if (result.error) {
logger.warning(logSystem, logComponent, 'Unable to get operation ids for clearing.');
}
if (result.response) {
opidInterval = setInterval(checkOpids, opid_interval);
} else if (result.response) {
checkOpIdSuccessAndGetResult(result.response);
} else {
opidInterval = setInterval(checkOpids, opid_interval);
}
}, true, true);
}, opid_interval);
}, true, true);
}
var opidInterval = setInterval(checkOpids, opid_interval);
}
function roundTo(n, digits) {
if (digits === undefined) {
digits = 0;
}
var multiplicator = Math.pow(10, digits);
n = parseFloat((n * multiplicator).toFixed(11));
var test =(Math.round(n) / multiplicator);
return +(test.toFixed(digits));
}
var satoshisToCoins = function(satoshis){
return parseFloat((satoshis / magnitude).toFixed(coinPrecision));
return roundTo((satoshis / magnitude), coinPrecision);
};
var coinsToSatoshies = function(coins){
return Math.round(coins * magnitude);
};
function balanceRound(number) {
return parseFloat((Math.round(number * 100000000) / 100000000).toFixed(8));
function coinsRound(number) {
return roundTo(number, coinPrecision);
}
function checkForDuplicateBlockHeight(rounds, height) {
@ -452,8 +482,10 @@ function SetupForPool(logger, poolOptions, setupFinished){
var endRPCTimer = function(){ timeSpentRPC += Date.now() - startTimeRedis };
async.waterfall([
/* Call redis to get an array of rounds and balances - which are coinbase transactions and block heights from submitted blocks. */
/*
Step 1 - build workers and rounds objects from redis
* removes duplicate block submissions from redis
*/
function(callback){
startRedisTimer();
redisClient.multi([
@ -466,12 +498,12 @@ function SetupForPool(logger, poolOptions, setupFinished){
callback(true);
return;
}
// build worker balances
// build workers object from :balances
var workers = {};
for (var w in results[0]){
workers[w] = {balance: coinsToSatoshies(parseFloat(results[0][w]))};
}
// build initial rounds data from blocksPending
// build rounds object from :blocksPending
var rounds = results[1].map(function(r){
var details = r.split(':');
return {
@ -561,259 +593,297 @@ function SetupForPool(logger, poolOptions, setupFinished){
},
/* Does a batch rpc call to daemon with all the transaction hashes to see if they are confirmed yet.
It also adds the block reward amount to the round object - which the daemon gives also gives us. */
/*
Step 2 - check if mined block coinbase tx are ready for payment
* adds block reward to rounds object
* adds block confirmations count to rounds object
* updates confirmation counts in redis
*/
function(workers, rounds, callback){
// first verify block confirmations by block hash
var batchRPCcommand2 = rounds.map(function(r){
return ['getblock', [r.blockHash]];
// get pending block tx details
var batchRPCcommand = rounds.map(function(r){
return ['gettransaction', [r.txHash]];
});
// guarantee a response for batchRPCcommand2
batchRPCcommand2.push(['getblockcount']);
// get account address (not implemented at this time)
batchRPCcommand.push(['getaccount', [poolOptions.address]]);
startRPCTimer();
daemon.batchCmd(batchRPCcommand2, function(error, blockDetails){
daemon.batchCmd(batchRPCcommand, function(error, txDetails){
endRPCTimer();
// error getting block info by hash?
if (error || !blockDetails){
logger.error(logSystem, logComponent, 'Check finished - daemon rpc error with batch getblock '
+ JSON.stringify(error));
if (error || !txDetails){
logger.error(logSystem, logComponent, 'Check finished - daemon rpc error with batch gettransactions ' + JSON.stringify(error));
callback(true);
return;
}
// update confirmations in redis for pending blocks
var confirmsUpdate = blockDetails.map(function(b) {
if (b.result != null && b.result.confirmations > 0) {
if (b.result.confirmations > 100) {
return ['hdel', logComponent + ':blocksPendingConfirms', b.result.hash];
var confirmsUpdate = [];
var addressAccount = "";
// check for transaction errors and generated coins
txDetails.forEach(function(tx, i){
if (i === txDetails.length - 1){
if (tx.result && tx.result.toString().length > 0) {
addressAccount = tx.result.toString();
}
return ['hset', logComponent + ':blocksPendingConfirms', b.result.hash, b.result.confirmations];
}
return null;
});
// filter nulls, last item is always null...
confirmsUpdate = confirmsUpdate.filter(function(val) { return val !== null; });
// guarantee at least one redis update
if (confirmsUpdate.length < 1)
confirmsUpdate.push(['hset', logComponent + ':blocksPendingConfirms', 0, 0]);
startRedisTimer();
redisClient.multi(confirmsUpdate).exec(function(error, updated){
endRedisTimer();
if (error){
logger.error(logSystem, logComponent, 'failed to update pending block confirmations'
+ JSON.stringify(error));
callback(true);
return;
}
// get pending block transaction details from coin daemon
var batchRPCcommand = rounds.map(function(r){
return ['gettransaction', [r.txHash]];
});
// get account address (not implemented in zcash at this time..)
batchRPCcommand.push(['getaccount', [poolOptions.address]]);
var round = rounds[i];
// look for transaction errors
if (tx.error && tx.error.code === -5){
logger.warning(logSystem, logComponent, 'Daemon reports invalid transaction: ' + round.txHash);
round.category = 'kicked';
return;
}
else if (!tx.result.details || (tx.result.details && tx.result.details.length === 0)){
logger.warning(logSystem, logComponent, 'Daemon reports no details for transaction: ' + round.txHash);
round.category = 'kicked';
return;
}
else if (tx.error || !tx.result){
logger.error(logSystem, logComponent, 'Odd error with gettransaction ' + round.txHash + ' ' + JSON.stringify(tx));
return;
}
// get the coin base generation tx
var generationTx = tx.result.details.filter(function(tx){
return tx.address === poolOptions.address;
})[0];
if (!generationTx && tx.result.details.length === 1){
generationTx = tx.result.details[0];
}
if (!generationTx){
logger.error(logSystem, logComponent, 'Missing output details to pool address for transaction ' + round.txHash);
return;
}
// get transaction category for round
round.category = generationTx.category;
round.confirmations = parseInt((tx.result.confirmations || 0));
// get reward for newly generated blocks
if (round.category === 'generate') {
round.reward = coinsRound(parseFloat(generationTx.amount || generationTx.value));
}
// update confirmations in redis
confirmsUpdate.push(['hset', coin + ':blocksPendingConfirms', round.blockHash, round.confirmations]);
});
startRPCTimer();
daemon.batchCmd(batchRPCcommand, function(error, txDetails){
endRPCTimer();
if (error || !txDetails){
logger.error(logSystem, logComponent, 'Check finished - daemon rpc error with batch gettransactions '
+ JSON.stringify(error));
callback(true);
return;
var canDeleteShares = function(r){
for (var i = 0; i < rounds.length; i++){
var compareR = rounds[i];
if ((compareR.height === r.height)
&& (compareR.category !== 'kicked')
&& (compareR.category !== 'orphan')
&& (compareR.serialized !== r.serialized)){
return false;
}
}
return true;
};
var addressAccount = "";
// limit blocks paid per payment round
var payingBlocks = 0;
//filter out all rounds that are immature (not confirmed or orphaned yet)
rounds = rounds.filter(function(r){
// only pay max blocks at a time
if (payingBlocks >= maxBlocksPerPayment)
return false;
// check for transaction errors and generated coins
txDetails.forEach(function(tx, i){
if (i === txDetails.length - 1){
addressAccount = tx.result;
return;
}
var round = rounds[i];
if (tx.error && tx.error.code === -5){
logger.warning(logSystem, logComponent, 'Daemon reports invalid transaction: ' + round.txHash);
round.category = 'kicked';
return;
}
else if (!tx.result.details || (tx.result.details && tx.result.details.length === 0)){
logger.warning(logSystem, logComponent, 'Daemon reports no details for transaction: ' + round.txHash);
round.category = 'kicked';
return;
}
else if (tx.error || !tx.result){
logger.error(logSystem, logComponent, 'Odd error with gettransaction ' + round.txHash + ' '
+ JSON.stringify(tx));
return;
}
var generationTx = tx.result.details.filter(function(tx){
return tx.address === poolOptions.address;
})[0];
if (!generationTx && tx.result.details.length === 1){
generationTx = tx.result.details[0];
}
if (!generationTx){
logger.error(logSystem, logComponent, 'Missing output details to pool address for transaction ' + round.txHash);
return;
}
round.category = generationTx.category;
if (round.category === 'generate') {
round.reward = balanceRound(generationTx.amount - fee) || balanceRound(generationTx.value - fee); // TODO: Adjust fees to be dynamic
}
});
var canDeleteShares = function(r){
for (var i = 0; i < rounds.length; i++){
var compareR = rounds[i];
if ((compareR.height === r.height)
&& (compareR.category !== 'kicked')
&& (compareR.category !== 'orphan')
&& (compareR.serialized !== r.serialized)){
return false;
}
}
switch (r.category) {
case 'orphan':
case 'kicked':
r.canDeleteShares = canDeleteShares(r);
return true;
case 'generate':
payingBlocks++;
return true;
};
// limit blocks paid per payment round
var payingBlocks = 0;
//filter out all rounds that are immature (not confirmed or orphaned yet)
rounds = rounds.filter(function(r){
// only pay max blocks at a time
if (payingBlocks >= maxBlocksPerPayment)
return false;
switch (r.category) {
case 'orphan':
case 'kicked':
r.canDeleteShares = canDeleteShares(r);
return true;
case 'generate':
payingBlocks++;
return true;
default:
return false;
}
});
default:
return false;
}
});
// TODO: make tx fees dynamic
var feeSatoshi = fee * magnitude;
// calculate what the pool owes its miners
var totalOwed = parseInt(0);
for (var i = 0; i < rounds.length; i++) {
// only pay generated blocks, not orphaned or kicked
if (rounds[i].category == 'generate') {
totalOwed = totalOwed + Math.round(rounds[i].reward * magnitude) - feeSatoshi;
}
// TODO: make tx fees dynamic
var feeSatoshi = coinsToSatoshies(fee);
// calculate what the pool owes its miners
var totalOwed = parseInt(0);
for (var i = 0; i < rounds.length; i++) {
// only pay generated blocks, not orphaned or kicked
if (rounds[i].category == 'generate') {
totalOwed = totalOwed + coinsToSatoshies(rounds[i].reward) - feeSatoshi;
}
}
var notAddr = null;
if (requireShielding === true) {
notAddr = poolOptions.address;
}
// update confirmations for pending blocks in redis
if (confirmsUpdate.length > 0) {
startRedisTimer();
redisClient.multi(confirmsUpdate).exec(function(error, result){
endRedisTimer();
if (error) {
logger.error(logSystem, logComponent, 'Error could not update confirmations for pending blocks in redis ' + JSON.stringify(error));
return callback(true);
}
var notAddr = null;
if (requireShielding === true) {
notAddr = poolOptions.address;
}
// check if we have enough tAddress funds to brgin payment processing
// check if we have enough tAddress funds to begin payment processing
listUnspent(null, notAddr, minConfPayout, false, function (error, tBalance){
if (error) {
logger.error(logSystem, logComponent, 'Error checking pool balance before processing payments.');
return callback(true);
} else if (tBalance < totalOwed) {
logger.error(logSystem, logComponent, 'Insufficient funds to process payments for ' + payingBlocks + ' blocks ('+(tBalance / magnitude).toFixed(8) + ' < ' + (totalOwed / magnitude).toFixed(8)+'). Possibly waiting for shielding process.');
logger.error(logSystem, logComponent, 'Insufficient funds ('+satoshisToCoins(tBalance) + ') to process payments (' + satoshisToCoins(totalOwed)+') for ' + payingBlocks + ' blocks; possibly waiting for txs.');
return callback(true);
} else {
// zcash daemon does not support account feature
addressAccount = "";
callback(null, workers, rounds, addressAccount);
}
})
// account feature not implemented at this time
addressAccount = "";
callback(null, workers, rounds, addressAccount);
});
});
});
});
} else {
// no pending blocks, need to find a block!
return callback(true);
}
})
},
/* Does a batch redis call to get shares contributed to each round. Then calculates the reward
amount owned to each miner for each round. */
/*
Step 3 - lookup shares in redis and calculate rewards
*/
function(workers, rounds, addressAccount, callback){
var shareLookups = rounds.map(function(r){
return ['hgetall', coin + ':shares:round' + r.height]
// pplnt times lookup
var timeLookups = rounds.map(function(r){
return ['hgetall', coin + ':shares:times' + r.height]
});
startRedisTimer();
redisClient.multi(shareLookups).exec(function(error, allWorkerShares){
redisClient.multi(timeLookups).exec(function(error, allWorkerTimes){
endRedisTimer();
if (error){
callback('Check finished - redis error with multi get rounds share');
callback('Check finished - redis error with multi get rounds time');
return;
}
rounds.forEach(function(round, i){
var workerShares = allWorkerShares[i];
if (!workerShares){
logger.error(logSystem, logComponent, 'No worker shares for round: '
+ round.height + ' blockHash: ' + round.blockHash);
var shareLookups = rounds.map(function(r){
return ['hgetall', coin + ':shares:round' + r.height];
});
startRedisTimer();
redisClient.multi(shareLookups).exec(function(error, allWorkerShares){
endRedisTimer();
if (error){
callback('Check finished - redis error with multi get rounds share');
return;
}
switch (round.category){
case 'kicked':
case 'orphan':
round.workerShares = workerShares;
break;
// error detection
var err = null;
case 'generate':
/* We found a confirmed block! Now get the reward for it and calculate how much
we owe each miner based on the shares they submitted during that block round. */
var reward = parseInt(round.reward * magnitude);
var totalShares = Object.keys(workerShares).reduce(function(p, c){
return p + parseFloat(workerShares[c])
}, 0);
for (var workerAddress in workerShares){
var percent = parseFloat(workerShares[workerAddress]) / totalShares;
var workerRewardTotal = Math.floor(reward * percent);
var worker = workers[workerAddress] = (workers[workerAddress] || {});
worker.totalShares = (worker.totalShares || 0) + parseFloat(workerShares[workerAddress]);
worker.reward = (worker.reward || 0) + workerRewardTotal;
}
break;
// total shares
rounds.forEach(function(round, i){
var workerShares = allWorkerShares[i];
if (!workerShares){
err = true;
logger.error(logSystem, logComponent, 'No worker shares for round: ' + round.height + ' blockHash: ' + round.blockHash);
return;
}
var workerTimes = allWorkerTimes[i];
switch (round.category){
case 'kicked':
case 'orphan':
round.workerShares = workerShares;
break;
case 'generate':
// TODO: make tx fees dynamic
var feeSatoshi = coinsToSatoshies(fee);
var reward = coinsToSatoshies(round.reward) - feeSatoshi;
var totalShares = parseFloat(0);
var sharesLost = parseFloat(0);
// find most time spent in this round by single worker
maxTime = 0;
for (var workerAddress in workerTimes){
if (maxTime < parseFloat(workerTimes[workerAddress]))
maxTime = parseFloat(workerTimes[workerAddress]);
}
// total up shares for round
for (var workerAddress in workerShares){
var worker = workers[workerAddress] = (workers[workerAddress] || {});
var shares = parseFloat((workerShares[workerAddress] || 0));
// if pplnt mode
if (pplntEnabled === true && maxTime > 0) {
var tshares = shares;
var lost = parseFloat(0);
var address = workerAddress.split('.')[0];
if (workerTimes[address] != null && parseFloat(workerTimes[address]) > 0) {
var timePeriod = roundTo(parseFloat(workerTimes[address] || 1) / maxTime , 2);
if (timePeriod > 0 && timePeriod < pplntTimeQualify) {
var lost = shares - (shares * timePeriod);
sharesLost += lost;
shares = Math.max(shares - lost, 0);
logger.warning(logSystem, logComponent, 'PPLNT: Reduced shares for '+workerAddress+' round:' + round.height + ' maxTime:'+maxTime+'sec timePeriod:'+roundTo(timePeriod,6)+' shares:'+tshares+' lost:'+lost+' new:'+shares);
}
if (timePeriod > 1.0) {
err = true;
logger.error(logSystem, logComponent, 'Time share period is greater than 1.0 for '+workerAddress+' round:' + round.height + ' blockHash:' + round.blockHash);
return;
}
} else {
logger.warning(logSystem, logComponent, 'PPLNT: Missing time share period for '+workerAddress+', miner shares qualified in round ' + round.height);
}
}
worker.roundShares = shares;
worker.totalShares = parseFloat(worker.totalShares || 0) + shares;
totalShares += shares;
}
console.log('--REWARD DEBUG--------------');
// calculate rewards for round
var totalAmount = 0;
for (var workerAddress in workerShares){
var worker = workers[workerAddress] = (workers[workerAddress] || {});
var percent = parseFloat(worker.roundShares) / totalShares;
if (percent > 1.0) {
err = true;
logger.error(logSystem, logComponent, 'Share percent is greater than 1.0 for '+workerAddress+' round:' + round.height + ' blockHash:' + round.blockHash);
return;
}
// calculate workers reward for this round
var workerRewardTotal = Math.round(reward * percent);
// add to total reward for worker
worker.reward = (worker.reward || 0) + workerRewardTotal;
// add to total amount sent to all workers
totalAmount += worker.reward;
console.log('rewardAmount: '+workerAddress+' '+workerRewardTotal);
console.log('totalAmount: '+workerAddress+' '+worker.reward);
}
console.log('totalAmount: '+totalAmount);
console.log('blockHeight: '+round.height);
console.log('blockReward: '+reward);
console.log('totalShares: '+totalShares);
console.log('sharesLost: '+sharesLost);
console.log('----------------------------');
break;
}
});
// if there was no errors
if (err === null) {
// continue payments
callback(null, workers, rounds, addressAccount);
} else {
// stop waterfall flow, do not process payments
callback(true);
}
});
callback(null, workers, rounds, addressAccount);
});
},
/* Calculate if any payments are ready to be sent and trigger them sending
Get balance different for each address and pass it along as object of latest balances such as
{worker1: balance1, worker2, balance2}
when deciding the sent balance, it the difference should be -1*amount they had in db,
if not sending the balance, the differnce should be +(the amount they earned this round)
*/
/*
Step 4 - Generate RPC commands to send payments
When deciding the sent balance, it the difference should be -1*amount they had in db,
If not sending the balance, the differnce should be +(the amount they earned this round)
*/
function(workers, rounds, addressAccount, callback) {
var trySend = function (withholdPercent) {
@ -828,12 +898,13 @@ function SetupForPool(logger, poolOptions, setupFinished){
totalShares += (worker.totalShares || 0)
worker.balance = worker.balance || 0;
worker.reward = worker.reward || 0;
var toSend = balanceRound(satoshisToCoins(Math.floor((worker.balance + worker.reward) * (1 - withholdPercent))));
// get miner payout totals
var toSendSatoshis = Math.round((worker.balance + worker.reward) * (1 - withholdPercent));
var address = worker.address = (worker.address || getProperAddress(w.split('.')[0]));
if (minerTotals[address] != null && minerTotals[address] > 0) {
minerTotals[address] = balanceRound(minerTotals[address] + toSend);
minerTotals[address] += toSendSatoshis;
} else {
minerTotals[address] = toSend;
minerTotals[address] = toSendSatoshis;
}
}
// now process each workers balance, and pay the miner
@ -841,28 +912,29 @@ function SetupForPool(logger, poolOptions, setupFinished){
var worker = workers[w];
worker.balance = worker.balance || 0;
worker.reward = worker.reward || 0;
var toSend = Math.floor((worker.balance + worker.reward) * (1 - withholdPercent));
var toSendSatoshis = Math.round((worker.balance + worker.reward) * (1 - withholdPercent));
var address = worker.address = (worker.address || getProperAddress(w.split('.')[0]));
// if miners total is enough, go ahead and add this worker balance
if (minerTotals[address] >= satoshisToCoins(minPaymentSatoshis)) {
totalSent += toSend;
worker.sent = balanceRound(satoshisToCoins(toSend));
worker.balanceChange = Math.min(worker.balance, toSend) * -1;
if (minerTotals[address] >= minPaymentSatoshis) {
totalSent += toSendSatoshis;
// send funds
worker.sent = satoshisToCoins(toSendSatoshis);
worker.balanceChange = Math.min(worker.balance, toSendSatoshis) * -1;
// multiple workers may have same address, add them up
if (addressAmounts[address] != null && addressAmounts[address] > 0) {
addressAmounts[address] = balanceRound(addressAmounts[address] + worker.sent);
addressAmounts[address] = coinsRound(addressAmounts[address] + worker.sent);
} else {
addressAmounts[address] = worker.sent;
}
}
else {
worker.balanceChange = Math.max(toSend - worker.balance, 0);
} else {
// add to balance, not enough minerals
worker.sent = 0;
worker.balanceChange = Math.max(toSendSatoshis - worker.balance, 0);
// track balance changes
if (balanceAmounts[address] != null && balanceAmounts[address] > 0) {
balanceAmounts[address] = balanceRound(balanceAmounts[address] + worker.balanceChange);
balanceAmounts[address] = coinsRound(balanceAmounts[address] + satoshisToCoins(worker.balanceChange));
} else {
balanceAmounts[address] = worker.balanceChange;
balanceAmounts[address] = satoshisToCoins(worker.balanceChange);
}
}
}
@ -872,20 +944,7 @@ function SetupForPool(logger, poolOptions, setupFinished){
callback(null, workers, rounds);
return;
}
/*
var undoPaymentsOnError = function(workers) {
totalSent = 0;
// TODO, set round.category to immature, to attempt to pay again
// we did not send anything to any workers
for (var w in workers) {
var worker = workers[w];
if (worker.sent > 0) {
worker.balanceChange = 0;
worker.sent = 0;
}
}
};
*/
// perform the sendmany operation
daemon.cmd('sendmany', ["", addressAmounts], function (result) {
// check for failed payments, there are many reasons
@ -900,21 +959,18 @@ function SetupForPool(logger, poolOptions, setupFinished){
else if (result.error && result.error.code === -5) {
// invalid address specified in addressAmounts array
logger.error(logSystem, logComponent, 'Error sending payments ' + result.error.message);
//undoPaymentsOnError(workers);
callback(true);
return;
}
else if (result.error && result.error.message != null) {
// unknown error from daemon
logger.error(logSystem, logComponent, 'Error sending payments ' + result.error.message);
//undoPaymentsOnError(workers);
callback(true);
return;
}
else if (result.error) {
// some other unknown error
logger.error(logSystem, logComponent, 'Error sending payments ' + JSON.stringify(result.error));
//undoPaymentsOnError(workers);
callback(true);
return;
}
@ -928,7 +984,7 @@ function SetupForPool(logger, poolOptions, setupFinished){
if (txid != null) {
// it worked, congrats on your pools payout ;)
logger.special(logSystem, logComponent, 'Sent ' + (totalSent / magnitude).toFixed(8)
logger.special(logSystem, logComponent, 'Sent ' + satoshisToCoins(totalSent)
+ ' to ' + Object.keys(addressAmounts).length + ' miners; txid: '+txid);
if (withholdPercent > 0) {
@ -938,11 +994,12 @@ function SetupForPool(logger, poolOptions, setupFinished){
}
// save payments data to redis
var paymentBlocks = rounds.map(function(r){
var paymentBlocks = rounds.filter(function(r){ return r.category == 'generate'; }).map(function(r){
return parseInt(r.height);
});
var paymentsUpdate = [];
var paymentsData = {time:Date.now(), txid:txid, shares:totalShares, paid:balanceRound(totalSent / magnitude), miners:Object.keys(addressAmounts).length, blocks: paymentBlocks, amounts: addressAmounts, balances: balanceAmounts};
var paymentsData = {time:Date.now(), txid:txid, shares:totalShares, paid:satoshisToCoins(totalSent), miners:Object.keys(addressAmounts).length, blocks: paymentBlocks, amounts: addressAmounts, balances: balanceAmounts};
paymentsUpdate.push(['zadd', logComponent + ':payments', Date.now(), JSON.stringify(paymentsData)]);
startRedisTimer();
redisClient.multi(paymentsUpdate).exec(function(error, payments){
@ -950,6 +1007,7 @@ function SetupForPool(logger, poolOptions, setupFinished){
if (error){
logger.error(logSystem, logComponent, 'Error redis save payments data ' + JSON.stringify(payments));
}
// perform final redis updates
callback(null, workers, rounds);
});
@ -965,12 +1023,16 @@ function SetupForPool(logger, poolOptions, setupFinished){
}
}
}, true, true);
};
trySend(0);
},
/*
Step 5 - Final redis commands
*/
function(workers, rounds, callback){
var totalPaid = parseFloat(0);
@ -986,19 +1048,20 @@ function SetupForPool(logger, poolOptions, setupFinished){
'hincrbyfloat',
coin + ':balances',
w,
balanceRound(satoshisToCoins(worker.balanceChange))
satoshisToCoins(worker.balanceChange)
]);
}
if (worker.sent !== 0){
workerPayoutsCommand.push(['hincrbyfloat', coin + ':payouts', w, balanceRound(worker.sent)]);
totalPaid = balanceRound(totalPaid + worker.sent);
workerPayoutsCommand.push(['hincrbyfloat', coin + ':payouts', w, coinsRound(worker.sent)]);
totalPaid = coinsRound(totalPaid + worker.sent);
}
}
var movePendingCommands = [];
var roundsToDelete = [];
var orphanMergeCommands = [];
var confirmsToDelete = [];
var moveSharesToCurrent = function(r){
var workerShares = r.workerShares;
if (workerShares != null) {
@ -1012,17 +1075,22 @@ function SetupForPool(logger, poolOptions, setupFinished){
rounds.forEach(function(r){
switch(r.category){
case 'kicked':
confirmsToDelete.push(['hdel', coin + ':blocksPendingConfirms', r.blockHash]);
movePendingCommands.push(['smove', coin + ':blocksPending', coin + ':blocksKicked', r.serialized]);
case 'orphan':
confirmsToDelete.push(['hdel', coin + ':blocksPendingConfirms', r.blockHash]);
movePendingCommands.push(['smove', coin + ':blocksPending', coin + ':blocksOrphaned', r.serialized]);
if (r.canDeleteShares){
moveSharesToCurrent(r);
roundsToDelete.push(coin + ':shares:round' + r.height);
roundsToDelete.push(coin + ':shares:times' + r.height);
}
return;
case 'generate':
confirmsToDelete.push(['hdel', coin + ':blocksPendingConfirms', r.blockHash]);
movePendingCommands.push(['smove', coin + ':blocksPending', coin + ':blocksConfirmed', r.serialized]);
roundsToDelete.push(coin + ':shares:round' + r.height);
roundsToDelete.push(coin + ':shares:times' + r.height);
return;
}
});
@ -1044,8 +1112,11 @@ function SetupForPool(logger, poolOptions, setupFinished){
if (roundsToDelete.length > 0)
finalRedisCommands.push(['del'].concat(roundsToDelete));
if (confirmsToDelete.length > 0)
finalRedisCommands = finalRedisCommands.concat(confirmsToDelete);
if (totalPaid !== 0)
finalRedisCommands.push(['hincrbyfloat', coin + ':stats', 'totalPaid', balanceRound(totalPaid)]);
finalRedisCommands.push(['hincrbyfloat', coin + ':stats', 'totalPaid', totalPaid]);
if (finalRedisCommands.length === 0){
callback();
@ -1055,12 +1126,14 @@ function SetupForPool(logger, poolOptions, setupFinished){
startRedisTimer();
redisClient.multi(finalRedisCommands).exec(function(error, results){
endRedisTimer();
if (error){
if (error) {
clearInterval(paymentInterval);
logger.error(logSystem, logComponent,
'Payments sent but could not update redis. ' + JSON.stringify(error)
+ ' Disabling payment processing to prevent possible double-payouts. The redis commands in '
+ coin + '_finalRedisCommands.txt must be ran manually');
fs.writeFile(coin + '_finalRedisCommands.txt', JSON.stringify(finalRedisCommands), function(err){
logger.error('Could not write finalRedisCommands.txt, you are fucked.');
});