Added tracking of current validators in watcher, removed stub event in exchange queue

This commit is contained in:
Kirill Fedoseev 2019-10-18 17:02:47 +03:00
parent 5b18ee4179
commit ab648b343a
4 changed files with 41 additions and 29 deletions

View File

@ -89,6 +89,7 @@ services:
- HOME_CHAIN_ID
- HOME_START_BLOCK
- BLOCKS_RANGE_SIZE
- VALIDATOR_PRIVATE_KEY
- 'RABBITMQ_URL=amqp://rabbitmq:5672'
- LOG_LEVEL
volumes:

View File

@ -98,6 +98,7 @@ services:
- HOME_TOKEN_ADDRESS
- HOME_CHAIN_ID
- HOME_START_BLOCK
- VALIDATOR_PRIVATE_KEY
- 'RABBITMQ_URL=amqp://rabbitmq:5672'
- LOG_LEVEL
volumes:

View File

@ -10,10 +10,11 @@ const { publicKeyToAddress } = require('./crypto')
const abiBridge = require('./contracts_data/Bridge.json').abi
const { HOME_RPC_URL, HOME_BRIDGE_ADDRESS, RABBITMQ_URL, HOME_START_BLOCK } = process.env
const { HOME_RPC_URL, HOME_BRIDGE_ADDRESS, RABBITMQ_URL, HOME_START_BLOCK, VALIDATOR_PRIVATE_KEY } = process.env
const web3Home = new Web3(HOME_RPC_URL)
const bridge = new web3Home.eth.Contract(abiBridge, HOME_BRIDGE_ADDRESS)
const homeWeb3 = new Web3(HOME_RPC_URL)
const bridge = new homeWeb3.eth.Contract(abiBridge, HOME_BRIDGE_ADDRESS)
const validatorAddress = homeWeb3.eth.accounts.privateKeyToAccount(`0x${VALIDATOR_PRIVATE_KEY}`).address
let channel
let exchangeQueue
@ -27,6 +28,7 @@ let epochStart
let redisTx
let rangeSize
let lastTransactionBlockNumber
let isCurrentValidator
async function resetFutureMessages (queue) {
logger.debug(`Resetting future messages in queue ${queue.name}`)
@ -78,7 +80,6 @@ async function initialize () {
logger.info(`Current epoch ${epoch}`)
epochStart = events.length ? events[events.length - 1].blockNumber : 1
const saved = (parseInt(await redis.get('homeBlock')) + 1) || parseInt(HOME_START_BLOCK)
logger.debug(epochStart, saved)
if (epochStart > saved) {
logger.info(`Data in db is outdated, starting from epoch ${epoch}, block #${epochStart}`)
blockNumber = epochStart
@ -93,6 +94,12 @@ async function initialize () {
blockNumber = saved
foreignNonce[epoch] = parseInt(await redis.get(`foreignNonce${epoch}`)) || 0
}
isCurrentValidator = (await bridge.methods.getValidators().call()).includes(validatorAddress)
if (isCurrentValidator) {
logger.info(`${validatorAddress} is a current validator`)
} else {
logger.info(`${validatorAddress} is not a current validator`)
}
await resetFutureMessages(keygenQueue)
await resetFutureMessages(cancelKeygenQueue)
@ -105,7 +112,7 @@ async function initialize () {
async function main () {
logger.debug(`Watching events in block #${blockNumber}`)
if (await web3Home.eth.getBlock(blockNumber) === null) {
if (await homeWeb3.eth.getBlock(blockNumber) === null) {
logger.debug('No block')
await new Promise(r => setTimeout(r, 1000))
return
@ -127,18 +134,13 @@ async function main () {
sendKeygenCancellation(event)
break
case 'NewFundsTransfer':
await sendSignFundsTransfer(event)
isCurrentValidator && await sendSignFundsTransfer(event)
break
case 'ExchangeRequest':
await sendSign(event)
isCurrentValidator && await sendSign(event)
break
case 'EpochStart':
epoch = event.returnValues.epoch.toNumber()
epochStart = blockNumber
logger.info(`Epoch ${epoch} started`)
rangeSize = (await bridge.methods.getRangeSize().call()).toNumber()
logger.info(`Updated range size to ${rangeSize}`)
foreignNonce[epoch] = 0
await processEpochStart(event)
break
}
}
@ -201,7 +203,7 @@ async function sendSignFundsTransfer (event) {
}
async function sendSign (event) {
const tx = await web3Home.eth.getTransaction(event.transactionHash)
const tx = await homeWeb3.eth.getTransaction(event.transactionHash)
const msg = utils.serializeTransaction({
nonce: tx.nonce,
gasPrice: `0x${new BN(tx.gasPrice).toString(16)}`,
@ -209,9 +211,9 @@ async function sendSign (event) {
to: tx.to,
value: `0x${new BN(tx.value).toString(16)}`,
data: tx.input,
chainId: await web3Home.eth.net.getId()
chainId: await homeWeb3.eth.net.getId()
})
const hash = web3Home.utils.sha3(msg)
const hash = homeWeb3.utils.sha3(msg)
const publicKey = utils.recoverPublicKey(hash, { r: tx.r, s: tx.s, v: tx.v })
const msgToQueue = {
epoch,
@ -229,14 +231,11 @@ async function sendSign (event) {
lastTransactionBlockNumber = blockNumber
redisTx.set('lastTransactionBlockNumber', blockNumber)
logger.debug('Set lastTransactionBlockNumber to %d', blockNumber)
logger.debug(`Set lastTransactionBlockNumber to ${blockNumber}`)
}
async function sendStartSign () {
redisTx.incr(`foreignNonce${epoch}`)
exchangeQueue.send({
stub: true
})
signQueue.send({
epoch,
blockNumber,
@ -245,3 +244,18 @@ async function sendStartSign () {
parties: (await bridge.methods.getParties(epoch).call()).toNumber()
})
}
async function processEpochStart (event) {
epoch = event.returnValues.epoch.toNumber()
epochStart = blockNumber
logger.info(`Epoch ${epoch} started`)
rangeSize = (await bridge.methods.getRangeSize().call()).toNumber()
isCurrentValidator = (await bridge.methods.getValidators().call()).includes(validatorAddress)
if (isCurrentValidator) {
logger.info(`${validatorAddress} is a current validator`)
} else {
logger.info(`${validatorAddress} is not a current validator`)
}
logger.info(`Updated range size to ${rangeSize}`)
foreignNonce[epoch] = 0
}

View File

@ -62,11 +62,8 @@ async function main () {
attempt = 1
if (!newEpoch) {
const exchanges = await getExchangeMessages()
const exchanges = await getExchangeMessages(nonce)
const exchangesData = exchanges.map(msg => JSON.parse(msg.content))
if (exchangesData.some(ex => ex.nonce !== nonce)) {
logger.warn('Nonce is different, should not reach this point')
}
if (exchanges.length > 0 && account.sequence <= nonce) {
const recipients = exchangesData.map(({ value, recipient }) => ({ to: recipient, tokens: value }))
@ -139,24 +136,23 @@ async function main () {
main()
async function getExchangeMessages () {
async function getExchangeMessages (nonce) {
logger.debug('Getting exchange messages')
const messages = []
do {
const msg = await exchangeQueue.get()
if (msg === false) {
logger.warn('Reached the end of exchange queue, should not reach this point')
break
}
const data = JSON.parse(msg.content)
logger.debug('Got message %o', data)
if (data.stub) {
channel.ack(msg)
if (data.nonce !== nonce) {
channel.nack(msg, false, true)
break
}
messages.push(msg)
} while (true)
logger.debug('Found %d messages', messages.length)
logger.debug(`Found ${messages.length} messages`)
return messages
}