Removed web3 dependency from eth watcher

This commit is contained in:
Kirill Fedoseev 2019-11-04 17:06:37 +03:00
parent 8c8dad07ca
commit 7f4bc98318
13 changed files with 117 additions and 35335 deletions

View File

@ -2,6 +2,7 @@ HOME_RPC_URL=http://ganache_home:8545
HOME_BRIDGE_ADDRESS=0x44c158FE850821ae69DaF37AADF5c539e9d0025B
HOME_TOKEN_ADDRESS=0xd5fE0D28e058D375b0b038fFbB446Da37E85fFdc
HOME_START_BLOCK=1
HOME_MAX_FETCH_RANGE_SIZE=10
SIDE_RPC_URL=http://ganache_side:8545
SIDE_SHARED_DB_ADDRESS=0xd5fE0D28e058D375b0b038fFbB446Da37E85fFdc

View File

@ -2,6 +2,7 @@ HOME_RPC_URL=https://kovan.infura.io/v3/5d7bd94c50ed43fab1cb8e74f58678b0
HOME_BRIDGE_ADDRESS=0x6ADCa5e691341fb9de8927d15c0a89B83A4E665e
HOME_TOKEN_ADDRESS=0x57d2533B640cfb58f8f1F69C14c089968Da9fdFc
HOME_START_BLOCK=13276224
HOME_MAX_FETCH_RANGE_SIZE=10
SIDE_RPC_URL=https://sokol.poa.network
SIDE_SHARED_DB_ADDRESS=0xda9a1cA2Fcb18cAB02934269369627D2b4ea8902

View File

@ -2,6 +2,7 @@ HOME_RPC_URL=http://ganache_home:8545
HOME_BRIDGE_ADDRESS=0x44c158FE850821ae69DaF37AADF5c539e9d0025B
HOME_TOKEN_ADDRESS=0xd5fE0D28e058D375b0b038fFbB446Da37E85fFdc
HOME_START_BLOCK=1
HOME_MAX_FETCH_RANGE_SIZE=10
SIDE_RPC_URL=http://ganache_side:8545
SIDE_SHARED_DB_ADDRESS=0xd5fE0D28e058D375b0b038fFbB446Da37E85fFdc

View File

@ -2,6 +2,7 @@ HOME_RPC_URL=https://kovan.infura.io/v3/5d7bd94c50ed43fab1cb8e74f58678b0
HOME_BRIDGE_ADDRESS=0x6ADCa5e691341fb9de8927d15c0a89B83A4E665e
HOME_TOKEN_ADDRESS=0x57d2533B640cfb58f8f1F69C14c089968Da9fdFc
HOME_START_BLOCK=13276224
HOME_MAX_FETCH_RANGE_SIZE=10
SIDE_RPC_URL=https://sokol.poa.network
SIDE_SHARED_DB_ADDRESS=0xda9a1cA2Fcb18cAB02934269369627D2b4ea8902

View File

@ -2,6 +2,7 @@ HOME_RPC_URL=http://ganache_home:8545
HOME_BRIDGE_ADDRESS=0x44c158FE850821ae69DaF37AADF5c539e9d0025B
HOME_TOKEN_ADDRESS=0xd5fE0D28e058D375b0b038fFbB446Da37E85fFdc
HOME_START_BLOCK=1
HOME_MAX_FETCH_RANGE_SIZE=10
SIDE_RPC_URL=http://ganache_side:8545
SIDE_SHARED_DB_ADDRESS=0xd5fE0D28e058D375b0b038fFbB446Da37E85fFdc

View File

@ -2,6 +2,7 @@ HOME_RPC_URL=https://kovan.infura.io/v3/5d7bd94c50ed43fab1cb8e74f58678b0
HOME_BRIDGE_ADDRESS=0x6ADCa5e691341fb9de8927d15c0a89B83A4E665e
HOME_TOKEN_ADDRESS=0x57d2533B640cfb58f8f1F69C14c089968Da9fdFc
HOME_START_BLOCK=13276224
HOME_MAX_FETCH_RANGE_SIZE=10
SIDE_RPC_URL=https://sokol.poa.network
SIDE_SHARED_DB_ADDRESS=0xda9a1cA2Fcb18cAB02934269369627D2b4ea8902

View File

@ -10,5 +10,8 @@
},
"engines": {
"node": ">=10.6.0"
},
"dependencies": {
"ethers": "^4.0.39"
}
}

View File

@ -84,6 +84,7 @@ services:
- HOME_START_BLOCK
- BLOCKS_RANGE_SIZE
- VALIDATOR_PRIVATE_KEY
- HOME_MAX_FETCH_RANGE_SIZE
- 'RABBITMQ_URL=amqp://rabbitmq:5672'
- LOG_LEVEL
networks:

View File

@ -93,6 +93,7 @@ services:
- HOME_TOKEN_ADDRESS
- HOME_START_BLOCK
- VALIDATOR_PRIVATE_KEY
- HOME_MAX_FETCH_RANGE_SIZE
- 'RABBITMQ_URL=amqp://rabbitmq:5672'
- LOG_LEVEL
networks:

File diff suppressed because one or more lines are too long

View File

@ -10,6 +10,5 @@ COPY ./ethWatcher/package.json /watcher/
RUN npm install
COPY ./ethWatcher/ethWatcher.js ./shared/db.js ./shared/logger.js ./shared/amqp.js ./shared/crypto.js ./shared/wait.js /watcher/
COPY ./ethWatcher/Bridge.json /watcher/contracts_data/
ENTRYPOINT ["node", "ethWatcher.js"]

View File

@ -1,5 +1,4 @@
const Web3 = require('web3')
const { utils } = require('ethers')
const ethers = require('ethers')
const BN = require('bignumber.js')
const axios = require('axios')
@ -9,15 +8,25 @@ const { connectRabbit, assertQueue } = require('./amqp')
const { publicKeyToAddress } = require('./crypto')
const { delay } = require('./wait')
const abiBridge = require('./contracts_data/Bridge.json').abi
const {
HOME_RPC_URL, HOME_BRIDGE_ADDRESS, RABBITMQ_URL, HOME_START_BLOCK, VALIDATOR_PRIVATE_KEY
} = process.env
const HOME_MAX_FETCH_RANGE_SIZE = parseInt(process.env.HOME_MAX_FETCH_RANGE_SIZE, 10)
const homeWeb3 = new Web3(HOME_RPC_URL)
const bridge = new homeWeb3.eth.Contract(abiBridge, HOME_BRIDGE_ADDRESS)
const validatorAddress = homeWeb3.eth.accounts.privateKeyToAccount(`0x${VALIDATOR_PRIVATE_KEY}`).address
const provider = new ethers.providers.JsonRpcProvider(HOME_RPC_URL)
const bridgeAbi = [
'event ExchangeRequest(uint value, uint nonce)',
'event NewEpoch(uint indexed oldEpoch, uint indexed newEpoch)',
'event NewEpochCancelled(uint indexed epoch)',
'event NewFundsTransfer(uint indexed oldEpoch, uint indexed newEpoch)',
'event EpochStart(uint indexed epoch, uint x, uint y)',
'function getThreshold(uint epoch) view returns (uint)',
'function getParties(uint epoch) view returns (uint)',
'function getRangeSize() view returns (uint)',
'function getValidators() view returns (address[])'
]
const bridge = new ethers.Contract(HOME_BRIDGE_ADDRESS, bridgeAbi, provider)
const validatorAddress = ethers.utils.computeAddress(`0x${VALIDATOR_PRIVATE_KEY}`)
const foreignNonce = []
let channel
@ -25,6 +34,7 @@ let exchangeQueue
let signQueue
let keygenQueue
let cancelKeygenQueue
let chainId
let blockNumber
let epoch
let epochStart
@ -72,20 +82,18 @@ async function resetFutureMessages(queue) {
}
async function sendKeygen(event) {
const newEpoch = event.returnValues.newEpoch.toNumber()
const newEpoch = event.values.newEpoch.toNumber()
keygenQueue.send({
epoch: newEpoch,
blockNumber,
threshold: (await bridge.methods.getThreshold(newEpoch)
.call()).toNumber(),
parties: (await bridge.methods.getParties(newEpoch)
.call()).toNumber()
threshold: (await bridge.getThreshold(newEpoch)).toNumber(),
parties: (await bridge.getParties(newEpoch)).toNumber()
})
logger.debug('Sent keygen start event')
}
function sendKeygenCancellation(event) {
const eventEpoch = event.returnValues.epoch.toNumber()
const eventEpoch = event.values.epoch.toNumber()
cancelKeygenQueue.send({
epoch: eventEpoch,
blockNumber
@ -94,36 +102,33 @@ function sendKeygenCancellation(event) {
}
async function sendSignFundsTransfer(event) {
const newEpoch = event.returnValues.newEpoch.toNumber()
const oldEpoch = event.returnValues.oldEpoch.toNumber()
const newEpoch = event.values.newEpoch.toNumber()
const oldEpoch = event.values.oldEpoch.toNumber()
signQueue.send({
epoch: oldEpoch,
blockNumber,
newEpoch,
nonce: foreignNonce[oldEpoch],
threshold: (await bridge.methods.getThreshold(oldEpoch)
.call()).toNumber(),
parties: (await bridge.methods.getParties(oldEpoch)
.call()).toNumber()
threshold: (await bridge.getThreshold(oldEpoch)).toNumber(),
parties: (await bridge.getParties(oldEpoch)).toNumber()
})
logger.debug('Sent sign funds transfer event')
foreignNonce[oldEpoch] += 1
redisTx.incr(`foreignNonce${oldEpoch}`)
}
async function sendSign(event) {
const tx = await homeWeb3.eth.getTransaction(event.transactionHash)
const msg = utils.serializeTransaction({
async function sendSign(event, transactionHash) {
const tx = await provider.getTransaction(transactionHash)
const msg = ethers.utils.serializeTransaction({
nonce: tx.nonce,
gasPrice: `0x${new BN(tx.gasPrice).toString(16)}`,
gasLimit: `0x${new BN(tx.gas).toString(16)}`,
gasPrice: tx.gasPrice,
gasLimit: tx.gasLimit,
to: tx.to,
value: `0x${new BN(tx.value).toString(16)}`,
data: tx.input,
chainId: await homeWeb3.eth.net.getId()
data: tx.data,
chainId
})
const hash = homeWeb3.utils.sha3(msg)
const publicKey = utils.recoverPublicKey(hash, {
const hash = ethers.utils.keccak256(msg)
const publicKey = ethers.utils.recoverPublicKey(hash, {
r: tx.r,
s: tx.s,
v: tx.v
@ -135,9 +140,8 @@ async function sendSign(event) {
x: publicKey.substr(4, 64),
y: publicKey.substr(68, 64)
}),
value: (new BN(event.returnValues.value)).dividedBy(10 ** 18)
.toFixed(8, 3),
nonce: event.returnValues.nonce.toNumber()
value: (new BN(event.values.value)).dividedBy(10 ** 18).toFixed(8, 3),
nonce: event.values.nonce.toNumber()
}
exchangeQueue.send(msgToQueue)
@ -154,22 +158,19 @@ async function sendStartSign() {
epoch,
blockNumber,
nonce: foreignNonce[epoch],
threshold: (await bridge.methods.getThreshold(epoch)
.call()).toNumber(),
parties: (await bridge.methods.getParties(epoch)
.call()).toNumber()
threshold: (await bridge.getThreshold(epoch)).toNumber(),
parties: (await bridge.getParties(epoch)).toNumber()
})
foreignNonce[epoch] += 1
}
async function processEpochStart(event) {
epoch = event.returnValues.epoch.toNumber()
epoch = event.values.epoch.toNumber()
epochStart = blockNumber
logger.info(`Epoch ${epoch} started`)
rangeSize = (await bridge.methods.getRangeSize()
.call()).toNumber()
isCurrentValidator = (await bridge.methods.getValidators()
.call()).includes(validatorAddress)
rangeSize = (await bridge.getRangeSize()).toNumber()
isCurrentValidator = (await bridge.getValidators())
.includes(validatorAddress)
if (isCurrentValidator) {
logger.info(`${validatorAddress} is a current validator`)
} else {
@ -186,10 +187,16 @@ async function initialize() {
keygenQueue = await assertQueue(channel, 'keygenQueue')
cancelKeygenQueue = await assertQueue(channel, 'cancelKeygenQueue')
const events = await bridge.getPastEvents('EpochStart', {
fromBlock: 1
})
epoch = events.length ? events[events.length - 1].returnValues.epoch.toNumber() : 0
chainId = (await provider.getNetwork()).chainId
const events = (await provider.getLogs({
address: HOME_BRIDGE_ADDRESS,
fromBlock: 1,
toBlock: 'latest',
topics: bridge.filters.EpochStart().topics
})).map((log) => bridge.interface.parseLog(log))
epoch = events.length ? events[events.length - 1].values.epoch.toNumber() : 0
logger.info(`Current epoch ${epoch}`)
epochStart = events.length ? events[events.length - 1].blockNumber : 1
const saved = (parseInt(await redis.get('homeBlock'), 10) + 1) || parseInt(HOME_START_BLOCK, 10)
@ -206,10 +213,11 @@ async function initialize() {
blockNumber = saved
foreignNonce[epoch] = parseInt(await redis.get(`foreignNonce${epoch}`), 10) || 0
}
rangeSize = (await bridge.methods.getRangeSize().call()).toNumber()
rangeSize = (await bridge.getRangeSize()).toNumber()
logger.debug(`Range size ${rangeSize}`)
logger.debug('Checking if current validator')
isCurrentValidator = (await bridge.methods.getValidators().call()).includes(validatorAddress)
isCurrentValidator = (await bridge.getValidators())
.includes(validatorAddress)
if (isCurrentValidator) {
logger.info(`${validatorAddress} is a current validator`)
} else {
@ -226,60 +234,68 @@ async function initialize() {
}
async function loop() {
logger.debug(`Watching events in block #${blockNumber}`)
if (await homeWeb3.eth.getBlock(blockNumber) === null) {
logger.debug('No block')
await delay(1000)
const latestBlockNumber = await provider.getBlockNumber()
if (latestBlockNumber < blockNumber) {
logger.debug(`No block after ${latestBlockNumber}`)
await delay(2000)
return
}
const endBlock = Math.min(latestBlockNumber, blockNumber + HOME_MAX_FETCH_RANGE_SIZE - 1)
redisTx = redis.multi()
const bridgeEvents = await bridge.getPastEvents('allEvents', {
logger.debug(`Watching events in blocks #${blockNumber}-${endBlock}`)
const bridgeEvents = (await provider.getLogs({
address: HOME_BRIDGE_ADDRESS,
fromBlock: blockNumber,
toBlock: blockNumber
})
toBlock: endBlock,
topics: []
}))
for (let i = 0; i < bridgeEvents.length; i += 1) {
const event = bridgeEvents[i]
switch (event.event) {
case 'NewEpoch':
await sendKeygen(event)
break
case 'NewEpochCancelled':
sendKeygenCancellation(event)
break
case 'NewFundsTransfer':
if (isCurrentValidator) {
await sendSignFundsTransfer(event)
}
break
case 'ExchangeRequest':
if (isCurrentValidator) {
await sendSign(event)
}
break
case 'EpochStart':
await processEpochStart(event)
break
default:
logger.warn('Unknown event %o', event)
for (let curBlockNumber = blockNumber, i = 0; curBlockNumber <= endBlock; curBlockNumber += 1) {
while (i < bridgeEvents.length && bridgeEvents[i].blockNumber === curBlockNumber) {
const event = bridge.interface.parseLog(bridgeEvents[i])
logger.debug('%o %o', event, bridgeEvents[i])
switch (event.name) {
case 'NewEpoch':
await sendKeygen(event)
break
case 'NewEpochCancelled':
sendKeygenCancellation(event)
break
case 'NewFundsTransfer':
if (isCurrentValidator) {
await sendSignFundsTransfer(event)
}
break
case 'ExchangeRequest':
if (isCurrentValidator) {
await sendSign(event, bridgeEvents[i].transactionHash)
}
break
case 'EpochStart':
await processEpochStart(event)
break
default:
logger.warn('Unknown event %o', event)
}
i += 1
}
if ((curBlockNumber + 1 - epochStart) % rangeSize === 0) {
logger.info('Reached end of the current block range')
if (lastTransactionBlockNumber > curBlockNumber - rangeSize) {
logger.info('Sending message to start signature generation for the ended range')
await sendStartSign()
}
}
}
if ((blockNumber + 1 - epochStart) % rangeSize === 0) {
logger.info('Reached end of the current block range')
if (lastTransactionBlockNumber > blockNumber - rangeSize) {
logger.info('Sending message to start signature generation for the ended range')
await sendStartSign()
}
}
blockNumber += 1
blockNumber = endBlock + 1
// Exec redis tx
await redisTx.incr('homeBlock')
.exec()
await redisTx.incr('homeBlock').exec()
await redis.save()
}

View File

@ -4,7 +4,6 @@
"dependencies": {
"ioredis": "4.10.0",
"amqplib": "0.5.3",
"web3": "1.0.0-beta.55",
"ethers": "4.0.33",
"bignumber.js": "9.0.0",
"bech32": "1.1.3",