Moved amqp interaction in shared codebase

This commit is contained in:
Kirill Fedoseev 2019-10-08 19:06:10 +03:00
parent 45110f01c5
commit 1f61aa65ba
7 changed files with 59 additions and 68 deletions

View File

@ -9,6 +9,6 @@ COPY ./ethWatcher/package.json /watcher/
RUN npm install
COPY ./ethWatcher/ethWatcher.js ./shared/db.js ./shared/logger.js /watcher/
COPY ./ethWatcher/ethWatcher.js ./shared/db.js ./shared/logger.js ./shared/amqp.js /watcher/
ENTRYPOINT ["node", "ethWatcher.js"]

View File

@ -1,14 +1,15 @@
const amqp = require('amqplib')
const Web3 = require('web3')
const redis = require('./db')
const crypto = require('crypto')
const utils = require('ethers').utils
const BN = require('bignumber.js')
const bech32 = require('bech32')
const abiBridge = require('./contracts_data/Bridge.json').abi
const abiToken = require('./contracts_data/IERC20.json').abi
const logger = require('./logger')
const redis = require('./db')
const { connectRabbit, assertQueue } = require('./amqp')
const abiToken = require('./contracts_data/IERC20.json').abi
const abiBridge = require('./contracts_data/Bridge.json').abi
const { HOME_RPC_URL, HOME_BRIDGE_ADDRESS, RABBITMQ_URL, HOME_TOKEN_ADDRESS, HOME_START_BLOCK } = process.env
@ -25,21 +26,11 @@ let foreignNonce = []
let epoch
let redisTx
async function connectRabbit (url) {
return amqp.connect(url).catch(() => {
logger.debug('Failed to connect, reconnecting')
return new Promise(resolve =>
setTimeout(() => resolve(connectRabbit(url)), 1000)
)
})
}
async function initialize () {
const connection = await connectRabbit(RABBITMQ_URL)
channel = await connection.createChannel()
signQueue = await channel.assertQueue('signQueue')
keygenQueue = await channel.assertQueue('keygenQueue')
cancelKeygenQueue = await channel.assertQueue('cancelKeygenQueue')
channel = await connectRabbit(RABBITMQ_URL)
signQueue = await assertQueue(channel, 'signQueue')
keygenQueue = await assertQueue(channel, 'keygenQueue')
cancelKeygenQueue = await assertQueue(channel, 'cancelKeygenQueue')
const events = await bridge.getPastEvents('EpochStart', {
fromBlock: 1
@ -123,37 +114,29 @@ initialize().then(async () => {
async function sendKeygen (event) {
const newEpoch = event.returnValues.newEpoch.toNumber()
channel.sendToQueue(keygenQueue.queue, Buffer.from(JSON.stringify({
keygenQueue.send({
epoch: newEpoch,
threshold: (await bridge.methods.getThreshold(newEpoch).call()).toNumber(),
parties: (await bridge.methods.getParties(newEpoch).call()).toNumber()
})), {
persistent: true
})
logger.debug('Sent keygen start event')
}
function sendKeygenCancelation (event) {
const epoch = event.returnValues.epoch.toNumber()
channel.sendToQueue(cancelKeygenQueue.queue, Buffer.from(JSON.stringify({
epoch
})), {
persistent: true
})
cancelKeygenQueue.send({ epoch })
logger.debug('Sent keygen cancellation event')
}
async function sendSignFundsTransfer (event) {
const newEpoch = event.returnValues.newEpoch.toNumber()
const oldEpoch = event.returnValues.oldEpoch.toNumber()
channel.sendToQueue(signQueue.queue, Buffer.from(JSON.stringify({
signQueue.send({
epoch: oldEpoch,
newEpoch,
nonce: foreignNonce[oldEpoch],
threshold: (await bridge.methods.getThreshold(oldEpoch).call()).toNumber(),
parties: (await bridge.methods.getParties(oldEpoch).call()).toNumber()
})), {
persistent: true
})
logger.debug('Sent sign funds transfer event')
foreignNonce[oldEpoch]++
@ -173,7 +156,7 @@ async function sendSign (event) {
})
const hash = web3Home.utils.sha3(msg)
const publicKey = utils.recoverPublicKey(hash, { r: tx.r, s: tx.s, v: tx.v })
const msgToQueue = JSON.stringify({
const msgToQueue = {
recipient: publicKeyToAddress({
x: publicKey.substr(4, 64),
y: publicKey.substr(68, 64)
@ -183,11 +166,9 @@ async function sendSign (event) {
nonce: foreignNonce[epoch],
threshold: (await bridge.methods.getThreshold(epoch).call()).toNumber(),
parties: (await bridge.methods.getParties(epoch).call()).toNumber()
})
}
channel.sendToQueue(signQueue.queue, Buffer.from(msgToQueue), {
persistent: true
})
signQueue.send(msgToQueue)
logger.debug('Sent new sign event: %o', msgToQueue)
redisTx.incr(`foreignNonce${epoch}`)

30
src/oracle/shared/amqp.js Normal file
View File

@ -0,0 +1,30 @@
const amqp = require('amqplib')
const logger = require('./logger')
function _connectRabbit (url) {
return amqp.connect(url).catch(() => {
logger.debug('Failed to connect to rabbitmqServer, reconnecting')
return new Promise(resolve =>
setTimeout(() => resolve(_connectRabbit(url)), 2000)
)
})
}
async function connectRabbit(url) {
const connection = await _connectRabbit(url)
return await connection.createChannel()
}
async function assertQueue (channel, name) {
const queue = await channel.assertQueue(name)
return {
queue,
send: msg => channel.sendToQueue(queue.queue, Buffer.from(JSON.stringify(msg)), {
persistent: true
}),
consume: consumer => channel.consume(queue.queue, consumer)
}
}
module.exports = { connectRabbit, assertQueue }

View File

@ -9,7 +9,7 @@ COPY ./tss-keygen/package.json /tss/
RUN npm install
COPY ./tss-keygen/keygen-entrypoint.sh ./tss-keygen/keygen.js ./shared/logger.js /tss/
COPY ./tss-keygen/keygen-entrypoint.sh ./tss-keygen/keygen.js ./shared/logger.js ./shared/amqp.js /tss/
COPY --from=tss /tss/target/release/gg18_keygen_client /tss/

View File

@ -2,9 +2,9 @@ const exec = require('child_process')
const fs = require('fs')
const crypto = require('crypto')
const bech32 = require('bech32')
const amqp = require('amqplib')
const logger = require('./logger')
const { connectRabbit, assertQueue } = require('./amqp')
const { RABBITMQ_URL, PROXY_URL } = process.env
@ -12,14 +12,13 @@ let currentKeygenEpoch = null
async function main () {
logger.info('Connecting to RabbitMQ server')
const connection = await connectRabbit(RABBITMQ_URL)
const channel = await connectRabbit(RABBITMQ_URL)
logger.info('Connecting to epoch events queue')
const channel = await connection.createChannel()
const keygenQueue = await channel.assertQueue('keygenQueue')
const cancelKeygenQueue = await channel.assertQueue('cancelKeygenQueue')
const keygenQueue = await assertQueue(channel, 'keygenQueue')
const cancelKeygenQueue = await assertQueue(channel, 'cancelKeygenQueue')
channel.prefetch(1)
channel.consume(keygenQueue.queue, msg => {
keygenQueue.consume(msg => {
const { epoch, parties, threshold } = JSON.parse(msg.content)
logger.info(`Consumed new epoch event, starting keygen for epoch ${epoch}`)
@ -30,7 +29,7 @@ async function main () {
logger.debug('Writing params')
fs.writeFileSync('./params', JSON.stringify({ parties: parties.toString(), threshold: threshold.toString() }))
const cmd = exec.execFile('./keygen-entrypoint.sh', [PROXY_URL, keysFile], async () => {
const cmd = exec.execFile('./keygen-entrypoint.sh', [ PROXY_URL, keysFile ], async () => {
currentKeygenEpoch = null
if (fs.existsSync(keysFile)) {
logger.info(`Finished keygen for epoch ${epoch}`)
@ -49,7 +48,7 @@ async function main () {
cmd.stderr.on('data', data => logger.debug(data.toString()))
})
channel.consume(cancelKeygenQueue.queue, async msg => {
cancelKeygenQueue.consume(async msg => {
const { epoch } = JSON.parse(msg.content)
logger.info(`Consumed new cancel event for epoch ${epoch} keygen`)
if (currentKeygenEpoch === epoch) {
@ -62,15 +61,6 @@ async function main () {
main()
async function connectRabbit (url) {
return amqp.connect(url).catch(() => {
logger.debug('Failed to connect, reconnecting')
return new Promise(resolve =>
setTimeout(() => resolve(connectRabbit(url)), 1000)
)
})
}
async function confirmKeygen (keysFile) {
exec.execSync(`curl -X POST -H "Content-Type: application/json" -d @"${keysFile}" "${PROXY_URL}/confirmKeygen"`, { stdio: 'pipe' })
}

View File

@ -10,7 +10,7 @@ COPY ./tss-sign/package.json /tss/
RUN npm install --no-optional
COPY ./tss-sign/sign-entrypoint.sh ./tss-sign/signer.js ./tss-sign/tx.js ./shared/logger.js /tss/
COPY ./tss-sign/sign-entrypoint.sh ./tss-sign/signer.js ./tss-sign/tx.js ./shared/logger.js ./shared/amqp.js /tss/
COPY --from=tss /tss/target/release/gg18_sign_client /tss/

View File

@ -1,12 +1,12 @@
const exec = require('child_process')
const fs = require('fs')
const amqp = require('amqplib')
const crypto = require('crypto')
const bech32 = require('bech32')
const BN = require('bignumber.js')
const express = require('express')
const logger = require('./logger')
const { connectRabbit, assertQueue } = require('./amqp')
const app = express()
app.get('/restart/:attempt', restart)
@ -24,13 +24,12 @@ let cancelled
async function main () {
logger.info('Connecting to RabbitMQ server')
const connection = await connectRabbit(RABBITMQ_URL)
const channel = await connectRabbit(RABBITMQ_URL)
logger.info('Connecting to signature events queue')
const channel = await connection.createChannel()
const signQueue = await channel.assertQueue('signQueue')
const signQueue = await assertQueue(channel, 'signQueue')
channel.prefetch(1)
channel.consume(signQueue.queue, async msg => {
signQueue.consume(async msg => {
const data = JSON.parse(msg.content)
logger.info('Consumed sign event: %o', data)
@ -150,15 +149,6 @@ function restart (req, res) {
res.send('Cancelled')
}
function connectRabbit (url) {
return amqp.connect(url).catch(() => {
logger.debug('Failed to connect, reconnecting')
return new Promise(resolve =>
setTimeout(() => resolve(connectRabbit(url)), 1000)
)
})
}
function confirmFundsTransfer () {
exec.execSync(`curl -X POST -H "Content-Type: application/json" "${PROXY_URL}/confirmFundsTransfer"`, { stdio: 'pipe' })
}