diff --git a/demo/validator1/.env.development b/demo/validator1/.env.development index 183da5a..6335b08 100644 --- a/demo/validator1/.env.development +++ b/demo/validator1/.env.development @@ -10,7 +10,7 @@ SIDE_SHARED_DB_ADDRESS=0xd5fE0D28e058D375b0b038fFbB446Da37E85fFdc FOREIGN_URL=https://testnet-dex.binance.org/ FOREIGN_CHAIN_ID=Binance-Chain-Nile FOREIGN_ASSET=KFT-94F -FOREIGN_START_TIME=1572500000000 +FOREIGN_FETCH_MAX_TIME_INTERVAL=60000 FOREIGN_FETCH_INTERVAL=5000 FOREIGN_FETCH_BLOCK_TIME_OFFSET=10000 diff --git a/demo/validator1/.env.staging b/demo/validator1/.env.staging index 2bd353f..83a0313 100644 --- a/demo/validator1/.env.staging +++ b/demo/validator1/.env.staging @@ -10,7 +10,7 @@ SIDE_SHARED_DB_ADDRESS=0xda9a1cA2Fcb18cAB02934269369627D2b4ea8902 FOREIGN_URL=https://testnet-dex.binance.org/ FOREIGN_CHAIN_ID=Binance-Chain-Nile FOREIGN_ASSET=KFT-94F -FOREIGN_START_TIME=1572500000000 +FOREIGN_FETCH_MAX_TIME_INTERVAL=60000 FOREIGN_FETCH_INTERVAL=5000 FOREIGN_FETCH_BLOCK_TIME_OFFSET=10000 diff --git a/demo/validator2/.env.development b/demo/validator2/.env.development index 491d22f..305ec1d 100644 --- a/demo/validator2/.env.development +++ b/demo/validator2/.env.development @@ -10,7 +10,7 @@ SIDE_SHARED_DB_ADDRESS=0xd5fE0D28e058D375b0b038fFbB446Da37E85fFdc FOREIGN_URL=https://testnet-dex.binance.org/ FOREIGN_CHAIN_ID=Binance-Chain-Nile FOREIGN_ASSET=KFT-94F -FOREIGN_START_TIME=1572500000000 +FOREIGN_FETCH_MAX_TIME_INTERVAL=60000 FOREIGN_FETCH_INTERVAL=5000 FOREIGN_FETCH_BLOCK_TIME_OFFSET=10000 diff --git a/demo/validator2/.env.staging b/demo/validator2/.env.staging index ed88661..94c04a1 100644 --- a/demo/validator2/.env.staging +++ b/demo/validator2/.env.staging @@ -10,7 +10,7 @@ SIDE_SHARED_DB_ADDRESS=0xda9a1cA2Fcb18cAB02934269369627D2b4ea8902 FOREIGN_URL=https://testnet-dex.binance.org/ FOREIGN_CHAIN_ID=Binance-Chain-Nile FOREIGN_ASSET=KFT-94F -FOREIGN_START_TIME=1572500000000 +FOREIGN_FETCH_MAX_TIME_INTERVAL=60000 FOREIGN_FETCH_INTERVAL=5000 FOREIGN_FETCH_BLOCK_TIME_OFFSET=10000 diff --git a/demo/validator3/.env.development b/demo/validator3/.env.development index 014db1a..7025ee3 100644 --- a/demo/validator3/.env.development +++ b/demo/validator3/.env.development @@ -10,7 +10,7 @@ SIDE_SHARED_DB_ADDRESS=0xd5fE0D28e058D375b0b038fFbB446Da37E85fFdc FOREIGN_URL=https://testnet-dex.binance.org/ FOREIGN_CHAIN_ID=Binance-Chain-Nile FOREIGN_ASSET=KFT-94F -FOREIGN_START_TIME=1572500000000 +FOREIGN_FETCH_MAX_TIME_INTERVAL=60000 FOREIGN_FETCH_INTERVAL=5000 FOREIGN_FETCH_BLOCK_TIME_OFFSET=10000 diff --git a/demo/validator3/.env.staging b/demo/validator3/.env.staging index a1e5fef..458974f 100644 --- a/demo/validator3/.env.staging +++ b/demo/validator3/.env.staging @@ -10,7 +10,7 @@ SIDE_SHARED_DB_ADDRESS=0xda9a1cA2Fcb18cAB02934269369627D2b4ea8902 FOREIGN_URL=https://testnet-dex.binance.org/ FOREIGN_CHAIN_ID=Binance-Chain-Nile FOREIGN_ASSET=KFT-94F -FOREIGN_START_TIME=1572500000000 +FOREIGN_FETCH_MAX_TIME_INTERVAL=60000 FOREIGN_FETCH_INTERVAL=5000 FOREIGN_FETCH_BLOCK_TIME_OFFSET=10000 diff --git a/src/deploy/deploy-home/contracts/Bridge.sol b/src/deploy/deploy-home/contracts/Bridge.sol index c424563..63ce4e1 100644 --- a/src/deploy/deploy-home/contracts/Bridge.sol +++ b/src/deploy/deploy-home/contracts/Bridge.sol @@ -4,6 +4,7 @@ import './openzeppelin-solidity/contracts/token/ERC20/IERC20.sol'; contract Bridge { event ExchangeRequest(uint value, uint nonce); + event EpochEnd(uint indexed epoch); event NewEpoch(uint indexed oldEpoch, uint indexed newEpoch); event NewEpochCancelled(uint indexed epoch); event NewFundsTransfer(uint indexed oldEpoch, uint indexed newEpoch); @@ -14,6 +15,7 @@ contract Bridge { uint threshold; uint rangeSize; uint startBlock; + uint endBlock; uint nonce; uint x; uint y; @@ -65,7 +67,16 @@ contract Bridge { status = Status.KEYGEN; nextEpoch = 1; - states[nextEpoch] = State(validators, threshold, rangeSize, 0, uint(-1), 0, 0); + states[nextEpoch] = State({ + validators : validators, + threshold : threshold, + rangeSize : rangeSize, + startBlock : 0, + endBlock : uint(-1), + nonce : uint(-1), + x : 0, + y : 0 + }); minTxLimit = limits[0]; maxTxLimit = limits[1]; @@ -133,7 +144,7 @@ contract Bridge { if (nextEpoch == 1) { status = Status.READY; states[nextEpoch].startBlock = block.number; - states[nextEpoch].nonce = uint(-1); + states[nextEpoch].nonce = uint(- 1); epoch = nextEpoch; emit EpochStart(epoch, x, y); } @@ -246,9 +257,12 @@ contract Bridge { if (tryVote(Vote.START_VOTING, epoch)) { nextEpoch++; status = Status.VOTING; + states[nextEpoch].endBlock = block.number; states[nextEpoch].threshold = getThreshold(); states[nextEpoch].validators = getValidators(); states[nextEpoch].rangeSize = getRangeSize(); + + emit EpochEnd(epoch); } } diff --git a/src/oracle/bncWatcher/Dockerfile b/src/oracle/bncWatcher/Dockerfile index 104556b..af9ce69 100644 --- a/src/oracle/bncWatcher/Dockerfile +++ b/src/oracle/bncWatcher/Dockerfile @@ -9,6 +9,6 @@ COPY ./bncWatcher/package.json /watcher/ RUN npm install -COPY ./bncWatcher/bncWatcher.js ./shared/db.js ./shared/logger.js ./shared/crypto.js ./shared/wait.js /watcher/ +COPY ./bncWatcher/bncWatcher.js ./shared/db.js ./shared/logger.js ./shared/crypto.js ./shared/amqp.js ./shared/wait.js /watcher/ ENTRYPOINT ["node", "bncWatcher.js"] diff --git a/src/oracle/bncWatcher/bncWatcher.js b/src/oracle/bncWatcher/bncWatcher.js index 966d159..1320e43 100644 --- a/src/oracle/bncWatcher/bncWatcher.js +++ b/src/oracle/bncWatcher/bncWatcher.js @@ -7,25 +7,30 @@ const logger = require('./logger') const redis = require('./db') const { publicKeyToAddress } = require('./crypto') const { delay, retry } = require('./wait') +const { connectRabbit, assertQueue } = require('./amqp') -const { FOREIGN_URL, PROXY_URL, FOREIGN_ASSET } = process.env +const { + FOREIGN_URL, PROXY_URL, FOREIGN_ASSET, RABBITMQ_URL +} = process.env -const FOREIGN_START_TIME = parseInt(process.env.FOREIGN_START_TIME, 10) const FOREIGN_FETCH_INTERVAL = parseInt(process.env.FOREIGN_FETCH_INTERVAL, 10) const FOREIGN_FETCH_BLOCK_TIME_OFFSET = parseInt(process.env.FOREIGN_FETCH_BLOCK_TIME_OFFSET, 10) +const FOREIGN_FETCH_MAX_TIME_INTERVAL = parseInt(process.env.FOREIGN_FETCH_MAX_TIME_INTERVAL, 10) const foreignHttpClient = axios.create({ baseURL: FOREIGN_URL }) const proxyHttpClient = axios.create({ baseURL: PROXY_URL }) -function getLastForeignAddress() { - const epoch = Math.max(0, ...fs.readdirSync('/keys') - .map((x) => parseInt(x.split('.')[0].substr(4), 10))) - if (epoch === 0) { +let channel +let epochTimeIntervalsQueue + +function getForeignAddress(epoch) { + const keysFile = `/keys/keys${epoch}.store` + try { + const publicKey = JSON.parse(fs.readFileSync(keysFile))[5] + return publicKeyToAddress(publicKey) + } catch (e) { return null } - const keysFile = `/keys/keys${epoch}.store` - const publicKey = JSON.parse(fs.readFileSync(keysFile))[5] - return publicKeyToAddress(publicKey) } async function getTx(hash) { @@ -45,15 +50,8 @@ async function getBlockTime() { return Date.parse(response.data.block_time) - FOREIGN_FETCH_BLOCK_TIME_OFFSET } -async function fetchNewTransactions() { +async function fetchNewTransactions(address, startTime, endTime) { logger.debug('Fetching new transactions') - const startTime = parseInt(await redis.get('foreignTime'), 10) + 1 - const address = getLastForeignAddress() - const endTime = Math.min(startTime + 3 * 30 * 24 * 60 * 60 * 1000, await getBlockTime()) - if (address === null) { - return {} - } - logger.debug('Sending api transactions request') const params = { address, side: 'RECEIVE', @@ -63,27 +61,101 @@ async function fetchNewTransactions() { endTime } - logger.trace('%o', params) - const transactions = (await retry(() => foreignHttpClient - .get('/api/v1/transactions', { params }))).data.tx + logger.trace('Transactions fetch params %o', params) + return ( + await retry(() => foreignHttpClient.get('/api/v1/transactions', { params })) + ).data.tx +} +async function fetchTimeIntervalsQueue() { + let epoch = null + let startTime = null + let endTime = null + const lastBncBlockTime = await getBlockTime() + logger.trace(`Binance last block timestamp ${lastBncBlockTime}`) + while (true) { + const msg = await epochTimeIntervalsQueue.get() + if (msg === false) { + break + } + const data = JSON.parse(msg.content) + logger.trace('Consumed time interval event %o', data) + if (epoch !== null && epoch !== data.epoch) { + logger.warn('Two consequently events have different epochs, should not be like this') + channel.nack(msg, false, true) + break + } + if (data.startTime) { + logger.trace('Set foreign time', data) + await redis.set(`foreignTime${data.epoch}`, data.startTime) + channel.ack(msg) + break + } + if (epoch === null) { + epoch = data.epoch + startTime = await redis.get(`foreignTime${epoch}`) + logger.trace(`Retrieved epoch ${epoch} and start time ${startTime} from redis`) + if (startTime === null) { + logger.warn(`Empty foreign time for epoch ${epoch}`) + } + } /* + if (data.endTime) { + if (data.endTime - startTime < FOREIGN_FETCH_MAX_TIME_INTERVAL + && data.endTime < lastBncBlockTime) { + endTime = data.endTime + channel.ack(msg) + } else { + logger.trace('Requeuing current queue message') + channel.nack(msg, false, true) + } + break + } */ + if (data.prolongedTime - startTime < FOREIGN_FETCH_MAX_TIME_INTERVAL + && data.prolongedTime < lastBncBlockTime) { + endTime = data.prolongedTime + channel.ack(msg) + } else { + logger.trace('Requeuing current queue message') + channel.nack(msg, false, true) + break + } + } return { - transactions, + epoch, + startTime, endTime } } async function initialize() { - if (await redis.get('foreignTime') === null) { - logger.info('Set default foreign time') - await redis.set('foreignTime', FOREIGN_START_TIME) - } + channel = await connectRabbit(RABBITMQ_URL) + logger.info('Connecting to epoch time intervals queue') + epochTimeIntervalsQueue = await assertQueue(channel, 'epochTimeIntervalsQueue') } async function loop() { - const { transactions, endTime } = await fetchNewTransactions() - if (!transactions || transactions.length === 0) { + const { epoch, startTime, endTime } = await fetchTimeIntervalsQueue() + + if (!startTime || !endTime) { + logger.debug('Nothing to fetch') + await delay(FOREIGN_FETCH_INTERVAL) + return + } + + const address = getForeignAddress(epoch) + + if (!address) { + logger.debug('Validator is not included in current epoch') + await redis.set(`foreignTime${epoch}`, endTime) + await delay(FOREIGN_FETCH_INTERVAL) + return + } + + const transactions = await fetchNewTransactions(address, startTime, endTime) + + if (transactions.length === 0) { logger.debug('Found 0 new transactions') + await redis.set(`foreignTime${epoch}`, endTime) await delay(FOREIGN_FETCH_INTERVAL) return } @@ -95,17 +167,16 @@ async function loop() { const tx = transactions[i] if (tx.memo !== 'funding') { const publicKeyEncoded = (await getTx(tx.txHash)).signatures[0].pub_key.value - await proxyHttpClient - .post('/transfer', { - to: computeAddress(Buffer.from(publicKeyEncoded, 'base64')), - value: new BN(tx.value) - .multipliedBy(10 ** 18) - .integerValue(), - hash: `0x${tx.txHash}` - }) + await proxyHttpClient.post('/transfer', { + to: computeAddress(Buffer.from(publicKeyEncoded, 'base64')), + value: new BN(tx.value) + .multipliedBy(10 ** 18) + .integerValue(), + hash: `0x${tx.txHash}` + }) } } - await redis.set('foreignTime', endTime) + await redis.set(`foreignTime${epoch}`, endTime) } async function main() { diff --git a/src/oracle/bncWatcher/package.json b/src/oracle/bncWatcher/package.json index 721ea33..08a6559 100644 --- a/src/oracle/bncWatcher/package.json +++ b/src/oracle/bncWatcher/package.json @@ -2,6 +2,7 @@ "name": "watcher", "version": "0.0.1", "dependencies": { + "amqplib": "0.5.3", "ioredis": "4.10.0", "axios": "0.19.0", "bech32": "1.1.3", diff --git a/src/oracle/docker-compose-test.yml b/src/oracle/docker-compose-test.yml index 3286cba..8c7a4b0 100644 --- a/src/oracle/docker-compose-test.yml +++ b/src/oracle/docker-compose-test.yml @@ -103,7 +103,7 @@ services: - FOREIGN_ASSET - 'RABBITMQ_URL=amqp://rabbitmq:5672' - 'PROXY_URL=http://proxy:8001' - - FOREIGN_START_TIME + - FOREIGN_FETCH_MAX_TIME_INTERVAL - FOREIGN_FETCH_INTERVAL - FOREIGN_FETCH_BLOCK_TIME_OFFSET - LOG_LEVEL diff --git a/src/oracle/docker-compose.yml b/src/oracle/docker-compose.yml index 3463f2b..38f56f7 100644 --- a/src/oracle/docker-compose.yml +++ b/src/oracle/docker-compose.yml @@ -112,7 +112,7 @@ services: - FOREIGN_ASSET - 'RABBITMQ_URL=amqp://rabbitmq:5672' - 'PROXY_URL=http://proxy:8001' - - FOREIGN_START_TIME + - FOREIGN_FETCH_MAX_TIME_INTERVAL - FOREIGN_FETCH_INTERVAL - FOREIGN_FETCH_BLOCK_TIME_OFFSET - LOG_LEVEL diff --git a/src/oracle/ethWatcher/ethWatcher.js b/src/oracle/ethWatcher/ethWatcher.js index f770fda..813ff0b 100644 --- a/src/oracle/ethWatcher/ethWatcher.js +++ b/src/oracle/ethWatcher/ethWatcher.js @@ -16,6 +16,7 @@ const HOME_MAX_FETCH_RANGE_SIZE = parseInt(process.env.HOME_MAX_FETCH_RANGE_SIZE const provider = new ethers.providers.JsonRpcProvider(HOME_RPC_URL) const bridgeAbi = [ 'event ExchangeRequest(uint value, uint nonce)', + 'event EpochEnd(uint indexed epoch)', 'event NewEpoch(uint indexed oldEpoch, uint indexed newEpoch)', 'event NewEpochCancelled(uint indexed epoch)', 'event NewFundsTransfer(uint indexed oldEpoch, uint indexed newEpoch)', @@ -34,6 +35,7 @@ let exchangeQueue let signQueue let keygenQueue let cancelKeygenQueue +let epochTimeIntervalsQueue let chainId let blockNumber let epoch @@ -42,6 +44,11 @@ let redisTx let rangeSize let lastTransactionBlockNumber let isCurrentValidator +let activeEpoch + +async function getBlockTimestamp(n) { + return (await provider.getBlock(n, false)).timestamp +} async function resetFutureMessages(queue) { logger.debug(`Resetting future messages in queue ${queue.name}`) @@ -169,8 +176,7 @@ async function processEpochStart(event) { epochStart = blockNumber logger.info(`Epoch ${epoch} started`) rangeSize = (await bridge.getRangeSize()).toNumber() - isCurrentValidator = (await bridge.getValidators()) - .includes(validatorAddress) + isCurrentValidator = (await bridge.getValidators()).includes(validatorAddress) if (isCurrentValidator) { logger.info(`${validatorAddress} is a current validator`) } else { @@ -186,6 +192,9 @@ async function initialize() { signQueue = await assertQueue(channel, 'signQueue') keygenQueue = await assertQueue(channel, 'keygenQueue') cancelKeygenQueue = await assertQueue(channel, 'cancelKeygenQueue') + epochTimeIntervalsQueue = await assertQueue(channel, 'epochTimeIntervalsQueue') + + activeEpoch = !!(await redis.get('activeEpoch')) chainId = (await provider.getNetwork()).chainId @@ -228,6 +237,7 @@ async function initialize() { await resetFutureMessages(cancelKeygenQueue) await resetFutureMessages(exchangeQueue) await resetFutureMessages(signQueue) + await resetFutureMessages(epochTimeIntervalsQueue) logger.debug('Sending start commands') await axios.get('http://keygen:8001/start') await axios.get('http://signer:8001/start') @@ -255,9 +265,11 @@ async function loop() { })) for (let curBlockNumber = blockNumber, i = 0; curBlockNumber <= endBlock; curBlockNumber += 1) { + let epochTimeUpdated = false + const curBlockTimestamp = await getBlockTimestamp(curBlockNumber) while (i < bridgeEvents.length && bridgeEvents[i].blockNumber === curBlockNumber) { const event = bridge.interface.parseLog(bridgeEvents[i]) - logger.debug('%o %o', event, bridgeEvents[i]) + logger.trace('Consumed event %o %o', event, bridgeEvents[i]) switch (event.name) { case 'NewEpoch': await sendKeygen(event) @@ -277,12 +289,39 @@ async function loop() { break case 'EpochStart': await processEpochStart(event) + await redis.set('activeEpoch', true) + activeEpoch = true + epochTimeIntervalsQueue.send({ + blockNumber: curBlockNumber, + startTime: curBlockTimestamp * 1000, + epoch + }) + epochTimeUpdated = true + break + case 'EpochEnd': + logger.debug(`Consumed epoch ${epoch} end event`) + await redis.set('activeEpoch', false) + activeEpoch = false + epochTimeIntervalsQueue.send({ + blockNumber: curBlockNumber, + prolongedTime: curBlockTimestamp * 1000, + epoch + }) break default: logger.warn('Unknown event %o', event) } i += 1 } + + if (!epochTimeUpdated && epoch > 0 && activeEpoch) { + epochTimeIntervalsQueue.send({ + blockNumber: curBlockNumber, + prolongedTime: curBlockTimestamp * 1000, + epoch + }) + } + if ((curBlockNumber + 1 - epochStart) % rangeSize === 0) { logger.info('Reached end of the current block range') diff --git a/src/oracle/shared/.eslintrc b/src/oracle/shared/.eslintrc new file mode 100644 index 0000000..54d0153 --- /dev/null +++ b/src/oracle/shared/.eslintrc @@ -0,0 +1,9 @@ +{ + "extends": [ + "../../../.eslintrc" + ], + "rules": { + "import/no-extraneous-dependencies": 0, + "node/no-extraneous-require": 0 + } +} diff --git a/src/oracle/shared/amqp.js b/src/oracle/shared/amqp.js index e6a88d9..e05c37f 100644 --- a/src/oracle/shared/amqp.js +++ b/src/oracle/shared/amqp.js @@ -1,16 +1,11 @@ const amqp = require('amqplib') const logger = require('./logger') +const { retry } = require('./wait') async function connectRabbit(url) { - while (true) { - try { - return (await amqp.connect(url)).createChannel() - } catch (e) { - logger.debug('Failed to connect to rabbitmqServer, reconnecting') - await new Promise((resolve) => setTimeout(resolve, 2000)) - } - } + logger.info('Connecting to RabbitMQ server') + return (await retry(() => amqp.connect(url))).createChannel() } async function assertQueue(channel, name) { diff --git a/src/oracle/tss-keygen/keygen.js b/src/oracle/tss-keygen/keygen.js index 2315936..0f16b9e 100644 --- a/src/oracle/tss-keygen/keygen.js +++ b/src/oracle/tss-keygen/keygen.js @@ -19,7 +19,6 @@ async function confirmKeygen(keysFile) { } async function main() { - logger.info('Connecting to RabbitMQ server') const channel = await connectRabbit(RABBITMQ_URL) logger.info('Connecting to epoch events queue') const keygenQueue = await assertQueue(channel, 'keygenQueue') diff --git a/src/oracle/tss-sign/signer.js b/src/oracle/tss-sign/signer.js index b23ce4f..1b89ae5 100644 --- a/src/oracle/tss-sign/signer.js +++ b/src/oracle/tss-sign/signer.js @@ -130,7 +130,7 @@ function sign(keysFile, hash, tx, publicKey, signerAddress) { let nonceInterrupt = false return new Promise((resolve) => { const cmd = exec.execFile('./sign-entrypoint.sh', [PROXY_URL, keysFile, hash], async (error) => { - logger.debug('%o', error) + logger.trace('Sign entrypoint exited, %o', error) clearInterval(nonceDaemonIntervalId) clearTimeout(restartTimeoutId) if (fs.existsSync('signature')) { // if signature was generated @@ -195,7 +195,6 @@ function sign(keysFile, hash, tx, publicKey, signerAddress) { } async function main() { - logger.info('Connecting to RabbitMQ server') channel = await connectRabbit(RABBITMQ_URL) logger.info('Connecting to signature events queue') exchangeQueue = await assertQueue(channel, 'exchangeQueue') diff --git a/tests/test/utils/proxyController.js b/tests/test/utils/proxyController.js index 13d03c0..3dac478 100644 --- a/tests/test/utils/proxyController.js +++ b/tests/test/utils/proxyController.js @@ -1,6 +1,6 @@ const axios = require('axios') -const {retry} = require('./wait') +const { retry } = require('./wait') function createController(validatorId) { const url = `http://validator${validatorId}_proxy_1:8002/`