From ab648b343aca9feaf5e88218f5b8e8c0f4792e1c Mon Sep 17 00:00:00 2001 From: Kirill Fedoseev Date: Fri, 18 Oct 2019 17:02:47 +0300 Subject: [PATCH] Added tracking of current validators in watcher, removed stub event in exchange queue --- src/oracle/docker-compose-test.yml | 1 + src/oracle/docker-compose.yml | 1 + src/oracle/ethWatcher/ethWatcher.js | 54 ++++++++++++++++++----------- src/oracle/tss-sign/signer.js | 14 +++----- 4 files changed, 41 insertions(+), 29 deletions(-) diff --git a/src/oracle/docker-compose-test.yml b/src/oracle/docker-compose-test.yml index 77f7614..afa2c00 100644 --- a/src/oracle/docker-compose-test.yml +++ b/src/oracle/docker-compose-test.yml @@ -89,6 +89,7 @@ services: - HOME_CHAIN_ID - HOME_START_BLOCK - BLOCKS_RANGE_SIZE + - VALIDATOR_PRIVATE_KEY - 'RABBITMQ_URL=amqp://rabbitmq:5672' - LOG_LEVEL volumes: diff --git a/src/oracle/docker-compose.yml b/src/oracle/docker-compose.yml index 5461efe..a79b816 100644 --- a/src/oracle/docker-compose.yml +++ b/src/oracle/docker-compose.yml @@ -98,6 +98,7 @@ services: - HOME_TOKEN_ADDRESS - HOME_CHAIN_ID - HOME_START_BLOCK + - VALIDATOR_PRIVATE_KEY - 'RABBITMQ_URL=amqp://rabbitmq:5672' - LOG_LEVEL volumes: diff --git a/src/oracle/ethWatcher/ethWatcher.js b/src/oracle/ethWatcher/ethWatcher.js index af5b8a7..cc4bb4f 100644 --- a/src/oracle/ethWatcher/ethWatcher.js +++ b/src/oracle/ethWatcher/ethWatcher.js @@ -10,10 +10,11 @@ const { publicKeyToAddress } = require('./crypto') const abiBridge = require('./contracts_data/Bridge.json').abi -const { HOME_RPC_URL, HOME_BRIDGE_ADDRESS, RABBITMQ_URL, HOME_START_BLOCK } = process.env +const { HOME_RPC_URL, HOME_BRIDGE_ADDRESS, RABBITMQ_URL, HOME_START_BLOCK, VALIDATOR_PRIVATE_KEY } = process.env -const web3Home = new Web3(HOME_RPC_URL) -const bridge = new web3Home.eth.Contract(abiBridge, HOME_BRIDGE_ADDRESS) +const homeWeb3 = new Web3(HOME_RPC_URL) +const bridge = new homeWeb3.eth.Contract(abiBridge, HOME_BRIDGE_ADDRESS) +const validatorAddress = homeWeb3.eth.accounts.privateKeyToAccount(`0x${VALIDATOR_PRIVATE_KEY}`).address let channel let exchangeQueue @@ -27,6 +28,7 @@ let epochStart let redisTx let rangeSize let lastTransactionBlockNumber +let isCurrentValidator async function resetFutureMessages (queue) { logger.debug(`Resetting future messages in queue ${queue.name}`) @@ -78,7 +80,6 @@ async function initialize () { logger.info(`Current epoch ${epoch}`) epochStart = events.length ? events[events.length - 1].blockNumber : 1 const saved = (parseInt(await redis.get('homeBlock')) + 1) || parseInt(HOME_START_BLOCK) - logger.debug(epochStart, saved) if (epochStart > saved) { logger.info(`Data in db is outdated, starting from epoch ${epoch}, block #${epochStart}`) blockNumber = epochStart @@ -93,6 +94,12 @@ async function initialize () { blockNumber = saved foreignNonce[epoch] = parseInt(await redis.get(`foreignNonce${epoch}`)) || 0 } + isCurrentValidator = (await bridge.methods.getValidators().call()).includes(validatorAddress) + if (isCurrentValidator) { + logger.info(`${validatorAddress} is a current validator`) + } else { + logger.info(`${validatorAddress} is not a current validator`) + } await resetFutureMessages(keygenQueue) await resetFutureMessages(cancelKeygenQueue) @@ -105,7 +112,7 @@ async function initialize () { async function main () { logger.debug(`Watching events in block #${blockNumber}`) - if (await web3Home.eth.getBlock(blockNumber) === null) { + if (await homeWeb3.eth.getBlock(blockNumber) === null) { logger.debug('No block') await new Promise(r => setTimeout(r, 1000)) return @@ -127,18 +134,13 @@ async function main () { sendKeygenCancellation(event) break case 'NewFundsTransfer': - await sendSignFundsTransfer(event) + isCurrentValidator && await sendSignFundsTransfer(event) break case 'ExchangeRequest': - await sendSign(event) + isCurrentValidator && await sendSign(event) break case 'EpochStart': - epoch = event.returnValues.epoch.toNumber() - epochStart = blockNumber - logger.info(`Epoch ${epoch} started`) - rangeSize = (await bridge.methods.getRangeSize().call()).toNumber() - logger.info(`Updated range size to ${rangeSize}`) - foreignNonce[epoch] = 0 + await processEpochStart(event) break } } @@ -201,7 +203,7 @@ async function sendSignFundsTransfer (event) { } async function sendSign (event) { - const tx = await web3Home.eth.getTransaction(event.transactionHash) + const tx = await homeWeb3.eth.getTransaction(event.transactionHash) const msg = utils.serializeTransaction({ nonce: tx.nonce, gasPrice: `0x${new BN(tx.gasPrice).toString(16)}`, @@ -209,9 +211,9 @@ async function sendSign (event) { to: tx.to, value: `0x${new BN(tx.value).toString(16)}`, data: tx.input, - chainId: await web3Home.eth.net.getId() + chainId: await homeWeb3.eth.net.getId() }) - const hash = web3Home.utils.sha3(msg) + const hash = homeWeb3.utils.sha3(msg) const publicKey = utils.recoverPublicKey(hash, { r: tx.r, s: tx.s, v: tx.v }) const msgToQueue = { epoch, @@ -229,14 +231,11 @@ async function sendSign (event) { lastTransactionBlockNumber = blockNumber redisTx.set('lastTransactionBlockNumber', blockNumber) - logger.debug('Set lastTransactionBlockNumber to %d', blockNumber) + logger.debug(`Set lastTransactionBlockNumber to ${blockNumber}`) } async function sendStartSign () { redisTx.incr(`foreignNonce${epoch}`) - exchangeQueue.send({ - stub: true - }) signQueue.send({ epoch, blockNumber, @@ -245,3 +244,18 @@ async function sendStartSign () { parties: (await bridge.methods.getParties(epoch).call()).toNumber() }) } + +async function processEpochStart (event) { + epoch = event.returnValues.epoch.toNumber() + epochStart = blockNumber + logger.info(`Epoch ${epoch} started`) + rangeSize = (await bridge.methods.getRangeSize().call()).toNumber() + isCurrentValidator = (await bridge.methods.getValidators().call()).includes(validatorAddress) + if (isCurrentValidator) { + logger.info(`${validatorAddress} is a current validator`) + } else { + logger.info(`${validatorAddress} is not a current validator`) + } + logger.info(`Updated range size to ${rangeSize}`) + foreignNonce[epoch] = 0 +} diff --git a/src/oracle/tss-sign/signer.js b/src/oracle/tss-sign/signer.js index 186dc82..5077e6a 100644 --- a/src/oracle/tss-sign/signer.js +++ b/src/oracle/tss-sign/signer.js @@ -62,11 +62,8 @@ async function main () { attempt = 1 if (!newEpoch) { - const exchanges = await getExchangeMessages() + const exchanges = await getExchangeMessages(nonce) const exchangesData = exchanges.map(msg => JSON.parse(msg.content)) - if (exchangesData.some(ex => ex.nonce !== nonce)) { - logger.warn('Nonce is different, should not reach this point') - } if (exchanges.length > 0 && account.sequence <= nonce) { const recipients = exchangesData.map(({ value, recipient }) => ({ to: recipient, tokens: value })) @@ -139,24 +136,23 @@ async function main () { main() -async function getExchangeMessages () { +async function getExchangeMessages (nonce) { logger.debug('Getting exchange messages') const messages = [] do { const msg = await exchangeQueue.get() if (msg === false) { - logger.warn('Reached the end of exchange queue, should not reach this point') break } const data = JSON.parse(msg.content) logger.debug('Got message %o', data) - if (data.stub) { - channel.ack(msg) + if (data.nonce !== nonce) { + channel.nack(msg, false, true) break } messages.push(msg) } while (true) - logger.debug('Found %d messages', messages.length) + logger.debug(`Found ${messages.length} messages`) return messages }