From 6860cd50cb58c684d091fb756e2b47570b0025ea Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 11 Mar 2014 19:56:19 -0600 Subject: [PATCH] Work on payment processing --- libs/paymentProcessor.js | 139 +++++++++++++++++++++++++++++++-------- libs/shareProcessor.js | 6 +- package.json | 3 +- 3 files changed, 117 insertions(+), 31 deletions(-) diff --git a/libs/paymentProcessor.js b/libs/paymentProcessor.js index 776edc1..7421af2 100644 --- a/libs/paymentProcessor.js +++ b/libs/paymentProcessor.js @@ -1,8 +1,11 @@ var redis = require('redis'); +var async = require('async'); var Stratum = require('stratum-pool'); + + module.exports = function(logger){ var poolConfigs = JSON.parse(process.env.pools); @@ -76,17 +79,91 @@ function SetupForPool(logger, poolOptions){ connectToRedis(); - var checkTx = function(tx, blockHeight){ - daemon.cmd('gettransaction', [tx], function(results){ - //console.dir(results[0].response.details[0].category); - var status = results[0].response.details[0].category; - var amount = results[0].response.details[0].amount; - if (status !== 'generate') return; - var f = 'shares_' + coin + ':round' + blockHeight; - console.log(f); - redisClient.hgetall('shares_' + coin + ':round' + blockHeight, function(error, results){ - if (error || !results) return; - console.log('okay ' + JSON.stringify(results)); + + + var processPayments = function(){ + async.waterfall([ + + /* Check redis for all pending block submissions, then pass along each object with: + { + transHash1: {height: blockHeight1}, + transHash2: {height: blockHeight2} + } + */ + function(callback){ + redisClient.smembers(coin + '_blocks', function(error, results){ + if (error){ + logger.error('redis', 'Could get blocks from redis ' + JSON.stringify(error)); + callback('done - redis error for getting blocks'); + return; + } + if (results.length === 0){ + callback('done - no pending blocks in redis'); + return; + } + + var txs = {}; + results.forEach(function(item){ + var details = item.split(':'); + var txHash = details[0]; + var height = details[1]; + txs[txHash] = {height: height}; + }); + callback(null, txs); + }); + }, + + /* Receives txs object with key, checks each key (the transHash) with block batch rpc call to daemon. + Each confirmed on get the amount added to transHash object as {amount: amount}, + Non confirmed txHashes get deleted from obj. Then remaining txHashes are passed along + */ + function(txs, callback){ + + var batchRPCcommand = []; + + for (var txHash in txs){ + batchRPCcommand.push(['gettranscation', [txHash]]); + } + + daemon.batchCmd(batchRPCcommand, function(error, txDetails){ + + txDetails.forEach(function (tx){ + var confirmedTxs = txDetails.filter(function(tx){ + var txDetails = tx.details[0]; + if (txDetails.categery === 'generate'){ + txs[txDetails.txid].amount = txDetails.amount; + } + else delete txs[txDetails.txid]; + + }); + if (Object.keys(txs).length === 0){ + callback('done - no confirmed transactions yet'); + return; + } + callback(null, txs); + }); + }); + }, + + /* Use height from each txHash to get worker shares from each round and pass along */ + function(txs, callback){ + + + var shareLooksup = []; + for (var hash in txs){ + var height = txs[hash].height; + shareLooksup.push(['hgetall', coin + '_shares:round' + height]); + } + + redisClient.multi(shareLooksup).exe(function(error, responses){ + if (error){ + callback('done - redis error with multi get rounds share') + return; + } + console.dir(response); + callback(response); + }); + //get balances_coin from redis for each address in this round //add up total balances @@ -94,28 +171,36 @@ function SetupForPool(logger, poolOptions){ //put left over balances in redis //clean up (move block entry to processedBlocks_coin) so this logic isn't called again - }); + }, + + /* Get worker existing balances from coin_balances hashset in redis*/ + function(confirmedTxs, callback){ + + /* Calculate if any payments are ready to be sent and trigger them sending + Get remaining balances for each address and pass it along as object of latest balances + such as {worker1: balance1, worker2, balance2} */ + + }, + + /* update remaining balances in coin_balance hashset in redis */ + function(updateBalances, callback){ + + }, + + //move this block enty to coin_processedBlocks so payments are not resent + function (none, callback){ + + } + + ], function(error, result){ + //log error completion }); }; setInterval(function(){ - redisClient.smembers('blocks_' + coin, function(error, results){ - if (error){ - logger.error('redis', 'Could get blocks from redis ' + JSON.stringify(error)); - return; - } - - results.forEach(function(item){ - var split = item.split(':'); - var tx = split[0]; - var blockHeight = split[1]; - checkTx(tx, blockHeight); - }); - - }); - + processPayments(); }, processingConfig.paymentInterval * 1000); diff --git a/libs/shareProcessor.js b/libs/shareProcessor.js index 1d51afd..369b9d1 100644 --- a/libs/shareProcessor.js +++ b/libs/shareProcessor.js @@ -52,16 +52,16 @@ module.exports = function(logger, poolConfig){ if (!isValidShare) return; - connection.hincrby(['shares_' + coin + ':roundCurrent', shareData.worker, shareData.difficulty], function(error, result){ + connection.hincrby([coin + '_shares:roundCurrent', shareData.worker, shareData.difficulty], function(error, result){ if (error) logger.error('redis', 'Could not store worker share') }); if (isValidBlock){ - connection.rename('shares_' + coin + ':roundCurrent', 'shares_' + coin + ':round' + shareData.height, function(result){ + connection.rename(coin + '_shares:roundCurrent', coin + '_shares:round' + shareData.height, function(result){ console.log('rename result: ' + result); }); - connection.sadd(['blocks_' + coin, shareData.tx + ':' + shareData.height], function(error, result){ + connection.sadd([coin + '_blocks', shareData.tx + ':' + shareData.height], function(error, result){ if (error) logger.error('redis', 'Could not store block data'); }); diff --git a/package.json b/package.json index 35cbc4c..e5610f1 100644 --- a/package.json +++ b/package.json @@ -36,7 +36,8 @@ "node-json-minify": "*", "posix": "*", "redis": "*", - "mysql": "*" + "mysql": "*", + "async": "*" }, "engines": { "node": ">=0.10"