More work on payment processing. Making super scalable database layers is exhausting...

Matt 2014-03-11 21:47:14 -06:00
parent 9e5e5b7141
commit 834cb61229
2 changed files with 70 additions and 29 deletions

View File

@@ -99,7 +99,9 @@ var spawnPoolWorkers = function(portalConfig, poolConfigs){
         });
         worker.on('exit', function(code, signal){
             logError('poolWorker', 'system', 'Fork ' + forkId + ' died, spawning replacement worker...');
-            createPoolWorker(forkId);
+            setTimeout(function(){
+                createPoolWorker(forkId);
+            }, 2000);
         });
     };
@@ -142,7 +144,9 @@ var startPaymentProcessor = function(poolConfigs){
         });
         worker.on('exit', function(code, signal){
             logError('paymentProcessor', 'system', 'Payment processor died, spawning replacement...');
-            startPaymentProcessor(poolConfigs);
+            setTimeout(function(){
+                startPaymentProcessor(poolConfigs);
+            }, 2000);
         });
     };
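
Both hunks make the same change: the dead fork was previously respawned synchronously inside the exit handler, so a worker that crashes during startup would fork, crash, and fork again in a tight loop. Delaying the replacement by two seconds breaks that loop. A minimal standalone sketch of the pattern, assuming Node's cluster module (the createPoolWorker/logError names above are this repo's; everything in the sketch is illustrative):

    var cluster = require('cluster');

    var RESPAWN_DELAY_MS = 2000; // same 2000ms delay the commit introduces

    function createWorker(forkId){
        var worker = cluster.fork({forkId: forkId});
        worker.on('exit', function(code, signal){
            console.error('Fork ' + forkId + ' died, spawning replacement worker...');
            // Without the delay, a worker that crashes on boot would
            // respawn instantly and peg the CPU in a fork/crash loop.
            setTimeout(function(){
                createWorker(forkId);
            }, RESPAWN_DELAY_MS);
        });
        return worker;
    }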

View File

@@ -80,6 +80,8 @@ function SetupForPool(logger, poolOptions){
+    redisClient.hset('Litecoin_balances', 'zone117x.worker1', 434);
+
     var processPayments = function(){
         async.waterfall([
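
The added hset looks like hard-coded test data: it seeds a balance of 434 for one worker so the steps below have something to pay out. The layout it implies is one redis hash per coin, with worker addresses as fields. A small sketch with node_redis, assuming a local redis instance:

    var redis = require('redis');
    var client = redis.createClient();

    // One hash per coin: field = worker address, value = balance owed.
    client.hset('Litecoin_balances', 'zone117x.worker1', 434, function(error){
        client.hget('Litecoin_balances', 'zone117x.worker1', function(error, balance){
            console.log('zone117x.worker1 balance: ' + balance); // '434'
            client.quit();
        });
    });
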
@@ -92,6 +94,7 @@ function SetupForPool(logger, poolOptions){
         */
         function(callback){
             redisClient.smembers(coin + '_blocks', function(error, results){
                 if (error){
                     logger.error('redis', 'Could get blocks from redis ' + JSON.stringify(error));
                     callback('done - redis error for getting blocks');
@@ -122,29 +125,34 @@ function SetupForPool(logger, poolOptions){
             var batchRPCcommand = [];
             for (var txHash in txs){
-                batchRPCcommand.push(['gettranscation', [txHash]]);
+                batchRPCcommand.push(['gettransaction', [txHash]]);
             }
             daemon.batchCmd(batchRPCcommand, function(error, txDetails){
-                txDetails.forEach(function (tx){
-                var confirmedTxs = txDetails.filter(function(tx){
-                    var txDetails = tx.details[0];
-                    if (txDetails.categery === 'generate'){
-                        txs[txDetails.txid].amount = txDetails.amount;
-                    }
-                    else delete txs[txDetails.txid];
-                });
-                if (Object.keys(txs).length === 0){
-                    callback('done - no confirmed transactions yet');
-                    return;
-                }
-                callback(null, txs);
+                if (error || !txDetails){
+                    callback('done - daemon rpc error with batch gettransactions ' + JSON.stringify(error));
+                    return;
+                }
+                txDetails.filter(function(tx){
+                    var txDetails = tx.result.details[0];
+                    if (txDetails.categery === 'generate'){
+                        txs[txDetails.txid].amount = txDetails.amount;
+                    }
+                    else delete txs[txDetails.txid];
+                });
+                if (Object.keys(txs).length === 0){
+                    callback('done - no confirmed transactions yet');
+                    return;
+                }
+                callback(null, txs);
             });
         },
         /* Use height from each txHash to get worker shares from each round and pass along */
         function(txs, callback){
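
Two fixes in this hunk are easy to miss. First, the RPC method name typo (gettranscation) is corrected. Second, tx.details[0] becomes tx.result.details[0]: a JSON-RPC batch call returns an array of envelope objects, each wrapping its payload in a result field alongside error and id, so the gettransaction payload has to be unwrapped first. A sketch of that envelope shape with made-up values (note the committed code still tests categery, which will never match the category field bitcoind actually returns):

    // Shape of one entry in a JSON-RPC batch response (values are made up):
    var batchResponse = [
        {
            result: {
                txid: 'exampleTxHash',
                details: [{category: 'generate', amount: 50}]
            },
            error: null,
            id: 1
        }
    ];

    batchResponse.forEach(function(tx){
        if (tx.error || !tx.result) return;   // per-command failures arrive here
        var detail = tx.result.details[0];    // the unwrap this commit adds
        if (detail.category === 'generate')   // coinbase ("generate") txs only
            console.log(tx.result.txid + ' matured for ' + detail.amount);
    });
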
@@ -155,32 +163,64 @@ function SetupForPool(logger, poolOptions){
                 shareLooksup.push(['hgetall', coin + '_shares:round' + height]);
             }
-            redisClient.multi(shareLooksup).exe(function(error, responses){
+            redisClient.multi(shareLooksup).exec(function(error, workerShares){
                 if (error){
                     callback('done - redis error with multi get rounds share')
                     return;
                 }
-                console.dir(response);
-                callback(response);
+                var balancesForRounds = {};
+                workerShares.forEach(function(item){
+                    for (var worker in item){
+                        var sharesAdded = parseInt(item[worker]);
+                        if (worker in balancesForRounds)
+                            balancesForRounds[worker] += sharesAdded;
+                        else
+                            balancesForRounds[worker] = sharesAdded;
+                    }
+                });
+                callback(null, balancesForRounds);
             });
         },
         /* Get worker existing balances from coin_balances hashset in redis*/
-        function(confirmedTxs, callback){
-            /* Calculate if any payments are ready to be sent and trigger them sending
-            Get remaining balances for each address and pass it along as object of latest balances
-            such as {worker1: balance1, worker2, balance2} */
+        function(balancesForRounds, callback){
+            var workerAddress = Object.keys(balancesForRounds);
+            redisClient.hmget([coin + '_balances'].concat(workerAddress), function(error, results){
+                if (error){
+                    callback('done - redis error with multi get rounds share')
+                    return;
+                }
+                for (var i = 0; i < results.length; i++){
+                    var shareInt = parseInt(results[i]);
+                    if (shareInt)
+                        balancesForRounds[workerAddress[i]] += shareInt;
+                }
+                callback(null, balancesForRounds)
+            });
+        },
+        /* Calculate if any payments are ready to be sent and trigger them sending
+        Get remaining balances for each address and pass it along as object of latest balances
+        such as {worker1: balance1, worker2, balance2} */
+        function(fullBalance, callback){
         },
         /* update remaining balances in coin_balance hashset in redis */
-        function(updateBalances, callback){
+        function(remainingBalance, callback){
         },
-        //move this block enty to coin_processedBlocks so payments are not resent
+        //move this block entry to coin_processedBlocks so payments are not resent
         function (none, callback){
         }
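
The two filled-in steps lean on node_redis behaviours worth spelling out: multi([...]).exec(cb) yields one reply per queued command (here one {worker: shares} object per round's hgetall), and hmget returns the requested fields in order, with null for any missing field, which is why the committed code guards with if (shareInt). A standalone sketch of the aggregation with made-up replies:

    // One hgetall reply per round: {workerAddress: shareCount, ...}
    var workerShares = [
        {'worker1': '120', 'worker2': '30'},
        {'worker1': '50'}
    ];

    var balancesForRounds = {};
    workerShares.forEach(function(round){
        for (var worker in round){
            var sharesAdded = parseInt(round[worker]);
            balancesForRounds[worker] = (balancesForRounds[worker] || 0) + sharesAdded;
        }
    });

    // hmget returns the requested fields in order; missing fields are null.
    var workerAddress = Object.keys(balancesForRounds); // ['worker1', 'worker2']
    var results = ['434', null];                        // stand-in for the hmget reply

    for (var i = 0; i < results.length; i++){
        var shareInt = parseInt(results[i]);
        if (shareInt)                                   // skips null -> NaN
            balancesForRounds[workerAddress[i]] += shareInt;
    }

    console.log(balancesForRounds); // { worker1: 604, worker2: 30 }
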
@@ -191,10 +231,7 @@ function SetupForPool(logger, poolOptions){
         };
-        setInterval(function(){
-            setTimeout(processPayments, 100);
-            processPayments();
-        }, processingConfig.paymentInterval * 1000);
+        setInterval(processPayments, processingConfig.paymentInterval * 1000);
     };
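
The last hunk replaces the wrapper closure (which had picked up a stray setTimeout) with a direct setInterval(processPayments, ...). One caveat: setInterval waits a full interval before its first tick, so the first payment pass only happens after paymentInterval seconds; an immediate kick-off would need an extra one-shot call, sketched below with an assumed interval value:

    var paymentIntervalSecs = 30; // assumed; the repo reads this from processingConfig

    function processPayments(){
        console.log('payment pass at ' + new Date().toISOString());
    }

    // setInterval never fires at t=0; pair it with a one-shot call
    // if the first pass should run right away.
    setTimeout(processPayments, 100);
    setInterval(processPayments, paymentIntervalSecs * 1000);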