Update paymentProcessor.js

Update recommended minimums.
Cache market stats every 5 minutes, do not use payment interval.
Changes to confirmation tracking and payments waterfall.
Add immature balance calculations and tracking.
This commit is contained in:
hellcatz 2017-09-02 10:58:16 -07:00 committed by GitHub
parent eb5caf93fa
commit d8a9ec59a4
1 changed files with 265 additions and 169 deletions

View File

@ -54,14 +54,14 @@ function SetupForPool(logger, poolOptions, setupFinished){
// zcash team recommends 10 confirmations for safety from orphaned blocks
var minConfShield = Math.max((processingConfig.minConf || 10), 1); // Don't allow 0 conf transactions.
var minConfPayout = Math.max((processingConfig.minConf || 10), 1);
if (minConfPayout < 10) {
logger.warning(logSystem, logComponent, logComponent + 'minConf of 10 is recommended to reduce chances of payments being orphaned.');
if (minConfPayout < 3) {
logger.warning(logSystem, logComponent, logComponent + ' minConf of 3 is recommended.');
}
// minimum paymentInterval of 60 seconds
var paymentIntervalSecs = Math.max((processingConfig.paymentInterval || 180), 60);
if (parseInt(processingConfig.paymentInterval) < 180) {
logger.warning(logSystem, logComponent, 'paymentInterval of 180 seconds recommended to reduce the RPC work queue.');
var paymentIntervalSecs = Math.max((processingConfig.paymentInterval || 120), 30);
if (parseInt(processingConfig.paymentInterval) < 120) {
logger.warning(logSystem, logComponent, ' minimum paymentInterval of 120 seconds recommended.');
}
var maxBlocksPerPayment = Math.max(processingConfig.maxBlocksPerPayment || 3, 1);
@ -84,9 +84,11 @@ function SetupForPool(logger, poolOptions, setupFinished){
logger[severity](logSystem, logComponent, message);
});
var redisClient = redis.createClient(poolOptions.redis.port, poolOptions.redis.host);
// redis auth if enabled
// redis auth if enabled
if (poolOptions.redis.password) {
redisClient.auth(poolOptions.redis.password);
}
var magnitude;
var minPaymentSatoshis;
var coinPrecision;
@ -425,16 +427,21 @@ function SetupForPool(logger, poolOptions, setupFinished){
}, shielding_interval);
}
// stats caching every 58 seconds
// network stats caching every 58 seconds
var stats_interval = 58 * 1000;
var statsInterval = setInterval(function() {
// update network stats using coin daemon
cacheNetworkStats();
// update market stats using coinmarketcap
if (getMarketStats === true) {
cacheMarketStats();
}
}, stats_interval);
// market stats caching every 5 minutes
if (getMarketStats === true) {
var market_stats_interval = 300 * 1000;
var marketStatsInterval = setInterval(function() {
// update market stats using coinmarketcap
cacheMarketStats();
}, market_stats_interval);
}
// check operation statuses every 57 seconds
var opid_interval = 57 * 1000;
@ -454,6 +461,7 @@ function SetupForPool(logger, poolOptions, setupFinished){
logger.warning(logSystem, logComponent, 'Clearing operation ids due to empty result set.');
}
}
// loop through op-ids checking their status
ops.forEach(function(op, i){
// check operation id status
if (op.status == "success" || op.status == "failed") {
@ -606,6 +614,7 @@ function SetupForPool(logger, poolOptions, setupFinished){
txHash: details[1],
height: details[2],
minedby: details[3],
time: details[4],
duplicate: false,
serialized: r
};
@ -696,7 +705,6 @@ function SetupForPool(logger, poolOptions, setupFinished){
Step 2 - check if mined block coinbase tx are ready for payment
* adds block reward to rounds object
* adds block confirmations count to rounds object
* updates confirmation counts in redis
*/
function(workers, rounds, callback){
// get pending block tx details
@ -714,8 +722,7 @@ function SetupForPool(logger, poolOptions, setupFinished){
callback(true);
return;
}
var confirmsUpdate = [];
var addressAccount = "";
// check for transaction errors and generated coins
@ -727,6 +734,8 @@ function SetupForPool(logger, poolOptions, setupFinished){
return;
}
var round = rounds[i];
// update confirmations for round
round.confirmations = parseInt((tx.result.confirmations || 0));
// look for transaction errors
if (tx.error && tx.error.code === -5){
logger.warning(logSystem, logComponent, 'Daemon reports invalid transaction: ' + round.txHash);
@ -755,13 +764,10 @@ function SetupForPool(logger, poolOptions, setupFinished){
}
// get transaction category for round
round.category = generationTx.category;
round.confirmations = parseInt((tx.result.confirmations || 0));
// get reward for newly generated blocks
if (round.category === 'generate') {
if (round.category === 'generate' || round.category === 'immature') {
round.reward = coinsRound(parseFloat(generationTx.amount || generationTx.value));
}
// update confirmations in redis
confirmsUpdate.push(['hset', coin + ':blocksPendingConfirms', round.blockHash, round.confirmations]);
});
var canDeleteShares = function(r){
@ -788,62 +794,25 @@ function SetupForPool(logger, poolOptions, setupFinished){
case 'generate':
payingBlocks++;
return (payingBlocks <= maxBlocksPerPayment);
case 'immature':
return true;
default:
return false;
}
});
// TODO: make tx fees dynamic
var feeSatoshi = coinsToSatoshies(fee);
// calculate what the pool owes its miners
var totalOwed = parseInt(0);
for (var i = 0; i < rounds.length; i++) {
// only pay generated blocks, not orphaned or kicked
if (rounds[i].category == 'generate') {
totalOwed = totalOwed + coinsToSatoshies(rounds[i].reward) - feeSatoshi;
}
}
var notAddr = null;
if (requireShielding === true) {
notAddr = poolOptions.address;
}
// update confirmations for pending blocks in redis
if (confirmsUpdate.length > 0) {
startRedisTimer();
redisClient.multi(confirmsUpdate).exec(function(error, result){
endRedisTimer();
if (error) {
logger.error(logSystem, logComponent, 'Error could not update confirmations for pending blocks in redis ' + JSON.stringify(error));
return callback(true);
}
// check if we have enough tAddress funds to begin payment processing
listUnspent(null, notAddr, minConfPayout, false, function (error, tBalance){
if (error) {
logger.error(logSystem, logComponent, 'Error checking pool balance before processing payments.');
return callback(true);
} else if (tBalance < totalOwed) {
logger.error(logSystem, logComponent, 'Insufficient funds ('+satoshisToCoins(tBalance) + ') to process payments (' + satoshisToCoins(totalOwed)+') for ' + payingBlocks + ' blocks; possibly waiting for txs.');
return callback(true);
}
// account feature not implemented at this time
addressAccount = "";
// begin payments for generated coins
callback(null, workers, rounds, addressAccount);
});
});
} else {
// no pending blocks, need to find a block!
return callback(true);
}
// continue to next step in waterfall
callback(null, workers, rounds, addressAccount);
})
},
/*
Step 3 - lookup shares in redis and calculate rewards
Step 3 - lookup shares and calculate rewards
* pull pplnt times from redis
* pull shares from redis
* calculate rewards
* pplnt share reductions if needed
*/
function(workers, rounds, addressAccount, callback){
// pplnt times lookup
@ -857,6 +826,7 @@ function SetupForPool(logger, poolOptions, setupFinished){
callback('Check finished - redis error with multi get rounds time');
return;
}
// shares lookup
var shareLookups = rounds.map(function(r){
return ['hgetall', coin + ':shares:round' + r.height];
});
@ -870,107 +840,215 @@ function SetupForPool(logger, poolOptions, setupFinished){
// error detection
var err = null;
var performPayment = false;
// total shares
rounds.forEach(function(round, i){
var workerShares = allWorkerShares[i];
if (!workerShares){
err = true;
logger.error(logSystem, logComponent, 'No worker shares for round: ' + round.height + ' blockHash: ' + round.blockHash);
return;
}
var workerTimes = allWorkerTimes[i];
switch (round.category){
case 'kicked':
case 'orphan':
round.workerShares = workerShares;
break;
case 'generate':
// TODO: make tx fees dynamic
var feeSatoshi = coinsToSatoshies(fee);
var reward = coinsToSatoshies(round.reward) - feeSatoshi;
var totalShares = parseFloat(0);
var sharesLost = parseFloat(0);
// find most time spent in this round by single worker
maxTime = 0;
for (var workerAddress in workerTimes){
if (maxTime < parseFloat(workerTimes[workerAddress]))
maxTime = parseFloat(workerTimes[workerAddress]);
}
// total up shares for round
for (var workerAddress in workerShares){
var worker = workers[workerAddress] = (workers[workerAddress] || {});
var shares = parseFloat((workerShares[workerAddress] || 0));
// if pplnt mode
if (pplntEnabled === true && maxTime > 0) {
var tshares = shares;
var lost = parseFloat(0);
var address = workerAddress.split('.')[0];
if (workerTimes[address] != null && parseFloat(workerTimes[address]) > 0) {
var timePeriod = roundTo(parseFloat(workerTimes[address] || 1) / maxTime , 2);
if (timePeriod > 0 && timePeriod < pplntTimeQualify) {
var lost = shares - (shares * timePeriod);
sharesLost += lost;
shares = Math.max(shares - lost, 0);
logger.warning(logSystem, logComponent, 'PPLNT: Reduced shares for '+workerAddress+' round:' + round.height + ' maxTime:'+maxTime+'sec timePeriod:'+roundTo(timePeriod,6)+' shares:'+tshares+' lost:'+lost+' new:'+shares);
}
if (timePeriod > 1.0) {
err = true;
logger.error(logSystem, logComponent, 'Time share period is greater than 1.0 for '+workerAddress+' round:' + round.height + ' blockHash:' + round.blockHash);
return;
}
worker.timePeriod = timePeriod;
} else {
logger.warning(logSystem, logComponent, 'PPLNT: Missing time share period for '+workerAddress+', miner shares qualified in round ' + round.height);
}
}
worker.roundShares = shares;
worker.totalShares = parseFloat(worker.totalShares || 0) + shares;
totalShares += shares;
}
//console.log('--REWARD DEBUG--------------');
// calculate rewards for round
var totalAmount = 0;
for (var workerAddress in workerShares){
var worker = workers[workerAddress] = (workers[workerAddress] || {});
var percent = parseFloat(worker.roundShares) / totalShares;
if (percent > 1.0) {
err = true;
logger.error(logSystem, logComponent, 'Share percent is greater than 1.0 for '+workerAddress+' round:' + round.height + ' blockHash:' + round.blockHash);
return;
}
// calculate workers reward for this round
var workerRewardTotal = Math.round(reward * percent);
// add to total reward for worker
worker.reward = (worker.reward || 0) + workerRewardTotal;
// add to total amount sent to all workers
totalAmount += worker.reward;
//console.log('rewardAmount: '+workerAddress+' '+workerRewardTotal);
//console.log('totalAmount: '+workerAddress+' '+worker.reward);
}
//console.log('totalAmount: '+totalAmount);
//console.log('blockHeight: '+round.height);
//console.log('blockReward: '+reward);
//console.log('totalShares: '+totalShares);
//console.log('sharesLost: '+sharesLost);
//console.log('----------------------------');
break;
}
});
// if there was no errors
if (err === null) {
// continue payments
callback(null, workers, rounds, addressAccount);
} else {
// stop waterfall flow, do not process payments
callback(true);
var notAddr = null;
if (requireShielding === true) {
notAddr = poolOptions.address;
}
});
});
// calculate what the pool owes its miners
var feeSatoshi = coinsToSatoshies(fee);
var totalOwed = parseInt(0);
for (var i = 0; i < rounds.length; i++) {
// only pay generated blocks, not orphaned, kicked, immature
if (rounds[i].category == 'generate') {
totalOwed = totalOwed + coinsToSatoshies(rounds[i].reward) - feeSatoshi;
}
}
// check if we have enough tAddress funds to begin payment processing
listUnspent(null, notAddr, minConfPayout, false, function (error, tBalance){
if (error) {
logger.error(logSystem, logComponent, 'Error checking pool balance before processing payments.');
return callback(true);
} else if (tBalance < totalOwed) {
logger.error(logSystem, logComponent, 'Insufficient funds ('+satoshisToCoins(tBalance) + ') to process payments (' + satoshisToCoins(totalOwed)+'); possibly waiting for txs.');
performPayment = false;
} else if (tBalance > totalOwed) {
performPayment = true;
}
// calculate rewards for each worker
rounds.forEach(function(round, i){
var workerShares = allWorkerShares[i];
if (!workerShares){
err = true;
logger.error(logSystem, logComponent, 'No worker shares for round: ' + round.height + ' blockHash: ' + round.blockHash);
return;
}
var workerTimes = allWorkerTimes[i];
switch (round.category){
case 'kicked':
case 'orphan':
if (!performPayment)
break;
round.workerShares = workerShares;
break;
/* calculate immature balances */
case 'immature':
var feeSatoshi = coinsToSatoshies(fee);
var immature = coinsToSatoshies(round.reward);
var totalShares = parseFloat(0);
var sharesLost = parseFloat(0);
// adjust block immature .. tx fees
immature = Math.round(immature - feeSatoshi);
// find most time spent in this round by single worker
maxTime = 0;
for (var workerAddress in workerTimes){
if (maxTime < parseFloat(workerTimes[workerAddress]))
maxTime = parseFloat(workerTimes[workerAddress]);
}
// total up shares for round
for (var workerAddress in workerShares){
var worker = workers[workerAddress] = (workers[workerAddress] || {});
var shares = parseFloat((workerShares[workerAddress] || 0));
// if pplnt mode
if (pplntEnabled === true && maxTime > 0) {
var tshares = shares;
var lost = parseFloat(0);
var address = workerAddress.split('.')[0];
if (workerTimes[address] != null && parseFloat(workerTimes[address]) > 0) {
var timePeriod = roundTo(parseFloat(workerTimes[address] || 1) / maxTime , 2);
if (timePeriod > 0 && timePeriod < pplntTimeQualify) {
var lost = shares - (shares * timePeriod);
sharesLost += lost;
shares = Math.max(shares - lost, 0);
}
if (timePeriod > 1.0) {
err = true;
logger.error(logSystem, logComponent, 'Time share period is greater than 1.0 for '+workerAddress+' round:' + round.height + ' blockHash:' + round.blockHash);
return;
}
}
}
worker.roundShares = shares;
totalShares += shares;
}
//console.log('--IMMATURE DEBUG--------------');
//console.log('blockHeight: '+round.height);
//console.log('blockReward: ' + Math.round(immature));
// calculate rewards for round
var totalAmount = 0;
for (var workerAddress in workerShares){
var worker = workers[workerAddress] = (workers[workerAddress] || {});
var percent = parseFloat(worker.roundShares) / totalShares;
if (percent > 1.0) {
err = true;
logger.error(logSystem, logComponent, 'Share percent is greater than 1.0 for '+workerAddress+' round:' + round.height + ' blockHash:' + round.blockHash);
return;
}
// calculate workers immature for this round
var workerImmatureTotal = Math.round(immature * percent);
worker.immature = (worker.immature || 0) + workerImmatureTotal;
totalAmount += workerImmatureTotal;
//console.log('immatureTotalAmount: '+workerAddress+' '+workerImmatureTotal);
//console.log('immatureTotal: '+workerAddress+' '+worker.immature);
}
//console.log('totalAmount: '+totalAmount);
//console.log('totalShares: '+totalShares);
//console.log('sharesLost: '+sharesLost);
//console.log('----------------------------');
break;
/* calculate reward balances */
case 'generate':
var feeSatoshi = coinsToSatoshies(fee);
var reward = coinsToSatoshies(round.reward);
var totalShares = parseFloat(0);
var sharesLost = parseFloat(0);
// adjust block reward .. tx fees
reward = Math.round(reward - feeSatoshi);
// find most time spent in this round by single worker
maxTime = 0;
for (var workerAddress in workerTimes){
if (maxTime < parseFloat(workerTimes[workerAddress]))
maxTime = parseFloat(workerTimes[workerAddress]);
}
// total up shares for round
for (var workerAddress in workerShares){
var worker = workers[workerAddress] = (workers[workerAddress] || {});
var shares = parseFloat((workerShares[workerAddress] || 0));
// if pplnt mode
if (pplntEnabled === true && maxTime > 0) {
var tshares = shares;
var lost = parseFloat(0);
var address = workerAddress.split('.')[0];
if (workerTimes[address] != null && parseFloat(workerTimes[address]) > 0) {
var timePeriod = roundTo(parseFloat(workerTimes[address] || 1) / maxTime , 2);
if (timePeriod > 0 && timePeriod < pplntTimeQualify) {
var lost = shares - (shares * timePeriod);
sharesLost += lost;
shares = Math.max(shares - lost, 0);
logger.warning(logSystem, logComponent, 'PPLNT: Reduced shares for '+workerAddress+' round:' + round.height + ' maxTime:'+maxTime+'sec timePeriod:'+roundTo(timePeriod,6)+' shares:'+tshares+' lost:'+lost+' new:'+shares);
}
if (timePeriod > 1.0) {
err = true;
logger.error(logSystem, logComponent, 'Time share period is greater than 1.0 for '+workerAddress+' round:' + round.height + ' blockHash:' + round.blockHash);
return;
}
worker.timePeriod = timePeriod;
} else {
logger.warning(logSystem, logComponent, 'PPLNT: Missing time share period for '+workerAddress+', miner shares qualified in round ' + round.height);
}
}
worker.roundShares = shares;
worker.totalShares = parseFloat(worker.totalShares || 0) + shares;
totalShares += shares;
}
//console.log('--REWARD DEBUG--------------');
//console.log('blockHeight: '+round.height);
//console.log('blockReward: ' + Math.round(reward));
// calculate rewards for round
var totalAmount = 0;
for (var workerAddress in workerShares){
var worker = workers[workerAddress] = (workers[workerAddress] || {});
var percent = parseFloat(worker.roundShares) / totalShares;
if (percent > 1.0) {
err = true;
logger.error(logSystem, logComponent, 'Share percent is greater than 1.0 for '+workerAddress+' round:' + round.height + ' blockHash:' + round.blockHash);
return;
}
// calculate workers reward for this round
var workerRewardTotal = Math.round(reward * percent);
worker.reward = (worker.reward || 0) + workerRewardTotal;
totalAmount += workerRewardTotal;
//console.log('rewardAmount: '+workerAddress+' '+workerRewardTotal);
//console.log('rewardTotal: '+workerAddress+' '+worker.reward);
}
//console.log('totalAmount: '+totalAmount);
//console.log('totalShares: '+totalShares);
//console.log('sharesLost: '+sharesLost);
//console.log('----------------------------');
break;
}
});
// if there was no errors
if (err === null && performPayment) {
// continue payments
callback(null, workers, rounds, addressAccount);
} else {
// stop waterfall flow, do not process payments
callback(true);
}
}); // end funds check
});// end share lookup
}); // end time lookup
},
@ -1022,7 +1100,6 @@ function SetupForPool(logger, poolOptions, setupFinished){
// send funds
worker.sent = satoshisToCoins(toSendSatoshis);
worker.balanceChange = Math.min(worker.balance, toSendSatoshis) * -1;
// multiple workers may have same address, add them up
if (addressAmounts[address] != null && addressAmounts[address] > 0) {
addressAmounts[address] = coinsRound(addressAmounts[address] + worker.sent);
} else {
@ -1062,12 +1139,14 @@ function SetupForPool(logger, poolOptions, setupFinished){
for (var a in addressAmounts) {
addressAmounts[a] = coinsRound(addressAmounts[a]);
}
// POINT OF NO RETURN! GOOD LUCK!
// WE ARE SENDING PAYMENT CMD TO DAEMON
// perform the sendmany operation .. addressAccount
var rpccallTracking = 'sendmany "" '+JSON.stringify(addressAmounts);
//console.log(rpccallTracking);
daemon.cmd('sendmany', ["", addressAmounts], function (result) {
// check for failed payments, there are many reasons
if (result.error && result.error.code === -6) {
@ -1179,14 +1258,15 @@ function SetupForPool(logger, poolOptions, setupFinished){
function(workers, rounds, callback){
var totalPaid = parseFloat(0);
var balanceUpdateCommands = [];
var immatureUpdateCommands = [];
var workerPayoutsCommand = [];
// update worker paid/balance stats
for (var w in workers) {
var worker = workers[w];
if (worker.balanceChange !== 0){
if (worker.balanceChange && worker.balanceChange !== 0){
balanceUpdateCommands.push([
'hincrbyfloat',
coin + ':balances',
@ -1194,15 +1274,22 @@ function SetupForPool(logger, poolOptions, setupFinished){
satoshisToCoins(worker.balanceChange)
]);
}
if (worker.sent !== 0){
if (worker.sent && worker.sent > 0){
workerPayoutsCommand.push(['hincrbyfloat', coin + ':payouts', w, coinsRound(worker.sent)]);
totalPaid = coinsRound(totalPaid + worker.sent);
}
if (worker.immature && worker.immature > 0) {
immatureUpdateCommands.push(['hset', coin + ':immature', w, worker.immature]);
} else {
immatureUpdateCommands.push(['hset', coin + ':immature', w, 0]);
}
}
var movePendingCommands = [];
var roundsToDelete = [];
var orphanMergeCommands = [];
var confirmsUpdate = [];
var confirmsToDelete = [];
var moveSharesToCurrent = function(r){
@ -1216,6 +1303,9 @@ function SetupForPool(logger, poolOptions, setupFinished){
// handle the round
rounds.forEach(function(r){
if (r.confirmations) {
confirmsUpdate.push(['hset', coin + ':blocksPendingConfirms', r.blockHash, r.confirmations]);
}
switch(r.category){
case 'kicked':
confirmsToDelete.push(['hdel', coin + ':blocksPendingConfirms', r.blockHash]);
@ -1246,6 +1336,9 @@ function SetupForPool(logger, poolOptions, setupFinished){
if (orphanMergeCommands.length > 0)
finalRedisCommands = finalRedisCommands.concat(orphanMergeCommands);
if (immatureUpdateCommands.length > 0)
finalRedisCommands = finalRedisCommands.concat(immatureUpdateCommands);
if (balanceUpdateCommands.length > 0)
finalRedisCommands = finalRedisCommands.concat(balanceUpdateCommands);
@ -1255,6 +1348,9 @@ function SetupForPool(logger, poolOptions, setupFinished){
if (roundsToDelete.length > 0)
finalRedisCommands.push(['del'].concat(roundsToDelete));
if (confirmsUpdate.length > 0)
finalRedisCommands = finalRedisCommands.concat(confirmsUpdate);
if (confirmsToDelete.length > 0)
finalRedisCommands = finalRedisCommands.concat(confirmsToDelete);