From 1f61aa65baf348dbf1c7cd84901f08e9d98b4375 Mon Sep 17 00:00:00 2001 From: Kirill Fedoseev Date: Tue, 8 Oct 2019 19:06:10 +0300 Subject: [PATCH] Moved amqp interaction in shared codebase --- src/oracle/ethWatcher/Dockerfile | 2 +- src/oracle/ethWatcher/ethWatcher.js | 49 +++++++++-------------------- src/oracle/shared/amqp.js | 30 ++++++++++++++++++ src/oracle/tss-keygen/Dockerfile | 2 +- src/oracle/tss-keygen/keygen.js | 24 +++++--------- src/oracle/tss-sign/Dockerfile | 2 +- src/oracle/tss-sign/signer.js | 18 +++-------- 7 files changed, 59 insertions(+), 68 deletions(-) create mode 100644 src/oracle/shared/amqp.js diff --git a/src/oracle/ethWatcher/Dockerfile b/src/oracle/ethWatcher/Dockerfile index a1929fc..ba11679 100644 --- a/src/oracle/ethWatcher/Dockerfile +++ b/src/oracle/ethWatcher/Dockerfile @@ -9,6 +9,6 @@ COPY ./ethWatcher/package.json /watcher/ RUN npm install -COPY ./ethWatcher/ethWatcher.js ./shared/db.js ./shared/logger.js /watcher/ +COPY ./ethWatcher/ethWatcher.js ./shared/db.js ./shared/logger.js ./shared/amqp.js /watcher/ ENTRYPOINT ["node", "ethWatcher.js"] diff --git a/src/oracle/ethWatcher/ethWatcher.js b/src/oracle/ethWatcher/ethWatcher.js index 5306096..0c272d5 100644 --- a/src/oracle/ethWatcher/ethWatcher.js +++ b/src/oracle/ethWatcher/ethWatcher.js @@ -1,14 +1,15 @@ -const amqp = require('amqplib') const Web3 = require('web3') -const redis = require('./db') const crypto = require('crypto') const utils = require('ethers').utils const BN = require('bignumber.js') const bech32 = require('bech32') -const abiBridge = require('./contracts_data/Bridge.json').abi -const abiToken = require('./contracts_data/IERC20.json').abi const logger = require('./logger') +const redis = require('./db') +const { connectRabbit, assertQueue } = require('./amqp') + +const abiToken = require('./contracts_data/IERC20.json').abi +const abiBridge = require('./contracts_data/Bridge.json').abi const { HOME_RPC_URL, HOME_BRIDGE_ADDRESS, RABBITMQ_URL, HOME_TOKEN_ADDRESS, HOME_START_BLOCK } = process.env @@ -25,21 +26,11 @@ let foreignNonce = [] let epoch let redisTx -async function connectRabbit (url) { - return amqp.connect(url).catch(() => { - logger.debug('Failed to connect, reconnecting') - return new Promise(resolve => - setTimeout(() => resolve(connectRabbit(url)), 1000) - ) - }) -} - async function initialize () { - const connection = await connectRabbit(RABBITMQ_URL) - channel = await connection.createChannel() - signQueue = await channel.assertQueue('signQueue') - keygenQueue = await channel.assertQueue('keygenQueue') - cancelKeygenQueue = await channel.assertQueue('cancelKeygenQueue') + channel = await connectRabbit(RABBITMQ_URL) + signQueue = await assertQueue(channel, 'signQueue') + keygenQueue = await assertQueue(channel, 'keygenQueue') + cancelKeygenQueue = await assertQueue(channel, 'cancelKeygenQueue') const events = await bridge.getPastEvents('EpochStart', { fromBlock: 1 @@ -123,37 +114,29 @@ initialize().then(async () => { async function sendKeygen (event) { const newEpoch = event.returnValues.newEpoch.toNumber() - channel.sendToQueue(keygenQueue.queue, Buffer.from(JSON.stringify({ + keygenQueue.send({ epoch: newEpoch, threshold: (await bridge.methods.getThreshold(newEpoch).call()).toNumber(), parties: (await bridge.methods.getParties(newEpoch).call()).toNumber() - })), { - persistent: true }) logger.debug('Sent keygen start event') } function sendKeygenCancelation (event) { const epoch = event.returnValues.epoch.toNumber() - channel.sendToQueue(cancelKeygenQueue.queue, Buffer.from(JSON.stringify({ - epoch - })), { - persistent: true - }) + cancelKeygenQueue.send({ epoch }) logger.debug('Sent keygen cancellation event') } async function sendSignFundsTransfer (event) { const newEpoch = event.returnValues.newEpoch.toNumber() const oldEpoch = event.returnValues.oldEpoch.toNumber() - channel.sendToQueue(signQueue.queue, Buffer.from(JSON.stringify({ + signQueue.send({ epoch: oldEpoch, newEpoch, nonce: foreignNonce[oldEpoch], threshold: (await bridge.methods.getThreshold(oldEpoch).call()).toNumber(), parties: (await bridge.methods.getParties(oldEpoch).call()).toNumber() - })), { - persistent: true }) logger.debug('Sent sign funds transfer event') foreignNonce[oldEpoch]++ @@ -173,7 +156,7 @@ async function sendSign (event) { }) const hash = web3Home.utils.sha3(msg) const publicKey = utils.recoverPublicKey(hash, { r: tx.r, s: tx.s, v: tx.v }) - const msgToQueue = JSON.stringify({ + const msgToQueue = { recipient: publicKeyToAddress({ x: publicKey.substr(4, 64), y: publicKey.substr(68, 64) @@ -183,11 +166,9 @@ async function sendSign (event) { nonce: foreignNonce[epoch], threshold: (await bridge.methods.getThreshold(epoch).call()).toNumber(), parties: (await bridge.methods.getParties(epoch).call()).toNumber() - }) + } - channel.sendToQueue(signQueue.queue, Buffer.from(msgToQueue), { - persistent: true - }) + signQueue.send(msgToQueue) logger.debug('Sent new sign event: %o', msgToQueue) redisTx.incr(`foreignNonce${epoch}`) diff --git a/src/oracle/shared/amqp.js b/src/oracle/shared/amqp.js new file mode 100644 index 0000000..385576c --- /dev/null +++ b/src/oracle/shared/amqp.js @@ -0,0 +1,30 @@ +const amqp = require('amqplib') + +const logger = require('./logger') + +function _connectRabbit (url) { + return amqp.connect(url).catch(() => { + logger.debug('Failed to connect to rabbitmqServer, reconnecting') + return new Promise(resolve => + setTimeout(() => resolve(_connectRabbit(url)), 2000) + ) + }) +} + +async function connectRabbit(url) { + const connection = await _connectRabbit(url) + return await connection.createChannel() +} + +async function assertQueue (channel, name) { + const queue = await channel.assertQueue(name) + return { + queue, + send: msg => channel.sendToQueue(queue.queue, Buffer.from(JSON.stringify(msg)), { + persistent: true + }), + consume: consumer => channel.consume(queue.queue, consumer) + } +} + +module.exports = { connectRabbit, assertQueue } diff --git a/src/oracle/tss-keygen/Dockerfile b/src/oracle/tss-keygen/Dockerfile index e84a2fb..2e5b039 100644 --- a/src/oracle/tss-keygen/Dockerfile +++ b/src/oracle/tss-keygen/Dockerfile @@ -9,7 +9,7 @@ COPY ./tss-keygen/package.json /tss/ RUN npm install -COPY ./tss-keygen/keygen-entrypoint.sh ./tss-keygen/keygen.js ./shared/logger.js /tss/ +COPY ./tss-keygen/keygen-entrypoint.sh ./tss-keygen/keygen.js ./shared/logger.js ./shared/amqp.js /tss/ COPY --from=tss /tss/target/release/gg18_keygen_client /tss/ diff --git a/src/oracle/tss-keygen/keygen.js b/src/oracle/tss-keygen/keygen.js index 332b21a..01e221d 100644 --- a/src/oracle/tss-keygen/keygen.js +++ b/src/oracle/tss-keygen/keygen.js @@ -2,9 +2,9 @@ const exec = require('child_process') const fs = require('fs') const crypto = require('crypto') const bech32 = require('bech32') -const amqp = require('amqplib') const logger = require('./logger') +const { connectRabbit, assertQueue } = require('./amqp') const { RABBITMQ_URL, PROXY_URL } = process.env @@ -12,14 +12,13 @@ let currentKeygenEpoch = null async function main () { logger.info('Connecting to RabbitMQ server') - const connection = await connectRabbit(RABBITMQ_URL) + const channel = await connectRabbit(RABBITMQ_URL) logger.info('Connecting to epoch events queue') - const channel = await connection.createChannel() - const keygenQueue = await channel.assertQueue('keygenQueue') - const cancelKeygenQueue = await channel.assertQueue('cancelKeygenQueue') + const keygenQueue = await assertQueue(channel, 'keygenQueue') + const cancelKeygenQueue = await assertQueue(channel, 'cancelKeygenQueue') channel.prefetch(1) - channel.consume(keygenQueue.queue, msg => { + keygenQueue.consume(msg => { const { epoch, parties, threshold } = JSON.parse(msg.content) logger.info(`Consumed new epoch event, starting keygen for epoch ${epoch}`) @@ -30,7 +29,7 @@ async function main () { logger.debug('Writing params') fs.writeFileSync('./params', JSON.stringify({ parties: parties.toString(), threshold: threshold.toString() })) - const cmd = exec.execFile('./keygen-entrypoint.sh', [PROXY_URL, keysFile], async () => { + const cmd = exec.execFile('./keygen-entrypoint.sh', [ PROXY_URL, keysFile ], async () => { currentKeygenEpoch = null if (fs.existsSync(keysFile)) { logger.info(`Finished keygen for epoch ${epoch}`) @@ -49,7 +48,7 @@ async function main () { cmd.stderr.on('data', data => logger.debug(data.toString())) }) - channel.consume(cancelKeygenQueue.queue, async msg => { + cancelKeygenQueue.consume(async msg => { const { epoch } = JSON.parse(msg.content) logger.info(`Consumed new cancel event for epoch ${epoch} keygen`) if (currentKeygenEpoch === epoch) { @@ -62,15 +61,6 @@ async function main () { main() -async function connectRabbit (url) { - return amqp.connect(url).catch(() => { - logger.debug('Failed to connect, reconnecting') - return new Promise(resolve => - setTimeout(() => resolve(connectRabbit(url)), 1000) - ) - }) -} - async function confirmKeygen (keysFile) { exec.execSync(`curl -X POST -H "Content-Type: application/json" -d @"${keysFile}" "${PROXY_URL}/confirmKeygen"`, { stdio: 'pipe' }) } diff --git a/src/oracle/tss-sign/Dockerfile b/src/oracle/tss-sign/Dockerfile index e9a520c..2e8d6b1 100644 --- a/src/oracle/tss-sign/Dockerfile +++ b/src/oracle/tss-sign/Dockerfile @@ -10,7 +10,7 @@ COPY ./tss-sign/package.json /tss/ RUN npm install --no-optional -COPY ./tss-sign/sign-entrypoint.sh ./tss-sign/signer.js ./tss-sign/tx.js ./shared/logger.js /tss/ +COPY ./tss-sign/sign-entrypoint.sh ./tss-sign/signer.js ./tss-sign/tx.js ./shared/logger.js ./shared/amqp.js /tss/ COPY --from=tss /tss/target/release/gg18_sign_client /tss/ diff --git a/src/oracle/tss-sign/signer.js b/src/oracle/tss-sign/signer.js index 7437f8a..07fb7f8 100644 --- a/src/oracle/tss-sign/signer.js +++ b/src/oracle/tss-sign/signer.js @@ -1,12 +1,12 @@ const exec = require('child_process') const fs = require('fs') -const amqp = require('amqplib') const crypto = require('crypto') const bech32 = require('bech32') const BN = require('bignumber.js') const express = require('express') const logger = require('./logger') +const { connectRabbit, assertQueue } = require('./amqp') const app = express() app.get('/restart/:attempt', restart) @@ -24,13 +24,12 @@ let cancelled async function main () { logger.info('Connecting to RabbitMQ server') - const connection = await connectRabbit(RABBITMQ_URL) + const channel = await connectRabbit(RABBITMQ_URL) logger.info('Connecting to signature events queue') - const channel = await connection.createChannel() - const signQueue = await channel.assertQueue('signQueue') + const signQueue = await assertQueue(channel, 'signQueue') channel.prefetch(1) - channel.consume(signQueue.queue, async msg => { + signQueue.consume(async msg => { const data = JSON.parse(msg.content) logger.info('Consumed sign event: %o', data) @@ -150,15 +149,6 @@ function restart (req, res) { res.send('Cancelled') } -function connectRabbit (url) { - return amqp.connect(url).catch(() => { - logger.debug('Failed to connect, reconnecting') - return new Promise(resolve => - setTimeout(() => resolve(connectRabbit(url)), 1000) - ) - }) -} - function confirmFundsTransfer () { exec.execSync(`curl -X POST -H "Content-Type: application/json" "${PROXY_URL}/confirmFundsTransfer"`, { stdio: 'pipe' }) }