Refactored signer and keygen clients
This commit is contained in:
parent
799004c159
commit
fd549fe6ab
|
@ -14,6 +14,7 @@ const app = express()
|
|||
|
||||
const proxyClient = axios.create({ baseURL: PROXY_URL })
|
||||
|
||||
let channel
|
||||
let currentKeygenEpoch = null
|
||||
let ready = false
|
||||
|
||||
|
@ -25,8 +26,45 @@ async function confirmKeygen({ x, y }, epoch) {
|
|||
})
|
||||
}
|
||||
|
||||
function writeParams(parties, threshold) {
|
||||
logger.debug('Writing params')
|
||||
fs.writeFileSync('./params', JSON.stringify({
|
||||
parties: parties.toString(),
|
||||
threshold: (threshold - 1).toString()
|
||||
}))
|
||||
}
|
||||
|
||||
async function keygenConsumer(msg) {
|
||||
const { epoch, parties, threshold } = JSON.parse(msg.content)
|
||||
logger.info(`Consumed new epoch event, starting keygen for epoch ${epoch}`)
|
||||
|
||||
const keysFile = `/keys/keys${epoch}.store`
|
||||
|
||||
logger.info('Running ./keygen-entrypoint.sh')
|
||||
currentKeygenEpoch = epoch
|
||||
|
||||
writeParams(parties, threshold)
|
||||
const cmd = exec.execFile('./keygen-entrypoint.sh', [PROXY_URL, keysFile], async () => {
|
||||
currentKeygenEpoch = null
|
||||
if (fs.existsSync(keysFile)) {
|
||||
logger.info(`Finished keygen for epoch ${epoch}`)
|
||||
const publicKey = JSON.parse(fs.readFileSync(keysFile))[5]
|
||||
logger.warn(`Generated multisig account in binance chain: ${publicKeyToAddress(publicKey)}`)
|
||||
|
||||
logger.info('Sending keys confirmation')
|
||||
await confirmKeygen(publicKey, epoch)
|
||||
} else {
|
||||
logger.warn(`Keygen for epoch ${epoch} failed`)
|
||||
}
|
||||
logger.debug('Ack for keygen message')
|
||||
channel.ack(msg)
|
||||
})
|
||||
cmd.stdout.on('data', (data) => logger.debug(data.toString()))
|
||||
cmd.stderr.on('data', (data) => logger.debug(data.toString()))
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const channel = await connectRabbit(RABBITMQ_URL)
|
||||
channel = await connectRabbit(RABBITMQ_URL)
|
||||
logger.info('Connecting to epoch events queue')
|
||||
const keygenQueue = await assertQueue(channel, 'keygenQueue')
|
||||
const cancelKeygenQueue = await assertQueue(channel, 'cancelKeygenQueue')
|
||||
|
@ -36,38 +74,7 @@ async function main() {
|
|||
}
|
||||
|
||||
channel.prefetch(1)
|
||||
keygenQueue.consume((msg) => {
|
||||
const { epoch, parties, threshold } = JSON.parse(msg.content)
|
||||
logger.info(`Consumed new epoch event, starting keygen for epoch ${epoch}`)
|
||||
|
||||
const keysFile = `/keys/keys${epoch}.store`
|
||||
|
||||
logger.info('Running ./keygen-entrypoint.sh')
|
||||
currentKeygenEpoch = epoch
|
||||
|
||||
logger.debug('Writing params')
|
||||
fs.writeFileSync('./params', JSON.stringify({
|
||||
parties: parties.toString(),
|
||||
threshold: (threshold - 1).toString()
|
||||
}))
|
||||
const cmd = exec.execFile('./keygen-entrypoint.sh', [PROXY_URL, keysFile], async () => {
|
||||
currentKeygenEpoch = null
|
||||
if (fs.existsSync(keysFile)) {
|
||||
logger.info(`Finished keygen for epoch ${epoch}`)
|
||||
const publicKey = JSON.parse(fs.readFileSync(keysFile))[5]
|
||||
logger.warn(`Generated multisig account in binance chain: ${publicKeyToAddress(publicKey)}`)
|
||||
|
||||
logger.info('Sending keys confirmation')
|
||||
await confirmKeygen(publicKey, epoch)
|
||||
} else {
|
||||
logger.warn(`Keygen for epoch ${epoch} failed`)
|
||||
}
|
||||
logger.debug('Ack for keygen message')
|
||||
channel.ack(msg)
|
||||
})
|
||||
cmd.stdout.on('data', (data) => logger.debug(data.toString()))
|
||||
cmd.stderr.on('data', (data) => logger.debug(data.toString()))
|
||||
})
|
||||
keygenQueue.consume(keygenConsumer)
|
||||
|
||||
cancelKeygenQueue.consume(async (msg) => {
|
||||
const { epoch } = JSON.parse(msg.content)
|
||||
|
|
|
@ -27,7 +27,6 @@ const SIGN_OK = 0
|
|||
const SIGN_NONCE_INTERRUPT = 1
|
||||
const SIGN_FAILED = 2
|
||||
|
||||
let attempt
|
||||
let nextAttempt = null
|
||||
let cancelled
|
||||
let ready = false
|
||||
|
@ -102,7 +101,8 @@ async function getAccount(address) {
|
|||
async function getFee() {
|
||||
logger.info('Getting fees')
|
||||
const response = await retry(() => httpClient.get('/api/v1/fees'))
|
||||
return response.data.filter((fee) => fee.multi_transfer_fee)[0].multi_transfer_fee * 2
|
||||
const multiTransferFee = response.data.find((fee) => fee.multi_transfer_fee).multi_transfer_fee
|
||||
return new BN(multiTransferFee * 2).div(10 ** 8)
|
||||
}
|
||||
|
||||
async function waitForAccountNonce(address, nonce) {
|
||||
|
@ -145,10 +145,14 @@ async function sendTx(tx) {
|
|||
}
|
||||
}
|
||||
|
||||
function sign(keysFile, hash, tx, publicKey, signerAddress) {
|
||||
function sign(keysFile, tx, publicKey, signerAddress) {
|
||||
let restartTimeoutId
|
||||
let nonceDaemonIntervalId
|
||||
let nonceInterrupt = false
|
||||
|
||||
const hash = sha256(tx.getSignBytes())
|
||||
logger.info(`Starting signature generation for transaction hash ${hash}`)
|
||||
|
||||
return new Promise((resolve) => {
|
||||
const cmd = exec.execFile('./sign-entrypoint.sh', [PROXY_URL, keysFile, hash], async (error) => {
|
||||
logger.trace('Sign entrypoint exited, %o', error)
|
||||
|
@ -215,6 +219,125 @@ function sign(keysFile, hash, tx, publicKey, signerAddress) {
|
|||
})
|
||||
}
|
||||
|
||||
function getAccountBalance(account, asset) {
|
||||
return account.balances.find((token) => token.symbol === asset).free
|
||||
}
|
||||
|
||||
async function buildTx(from, account, data, txAttempt) {
|
||||
const { closeEpoch, newEpoch, nonce } = data
|
||||
|
||||
const txOptions = {
|
||||
from,
|
||||
accountNumber: account.account_number,
|
||||
sequence: nonce,
|
||||
asset: FOREIGN_ASSET,
|
||||
memo: `Attempt ${txAttempt}`
|
||||
}
|
||||
let exchanges
|
||||
|
||||
if (closeEpoch) {
|
||||
logger.info(`Building corresponding account flags transaction, nonce ${nonce}`)
|
||||
|
||||
txOptions.flags = 0x01
|
||||
} else if (newEpoch) {
|
||||
const newKeysFile = `/keys/keys${newEpoch}.store`
|
||||
const to = getAccountFromFile(newKeysFile).address
|
||||
|
||||
if (to === '') {
|
||||
return { tx: null }
|
||||
}
|
||||
|
||||
logger.info(`Building corresponding transaction for transferring all funds, nonce ${nonce}, recipient ${to}`)
|
||||
const fee = await getFee()
|
||||
|
||||
txOptions.recipients = [{
|
||||
to,
|
||||
tokens: getAccountBalance(account, FOREIGN_ASSET),
|
||||
bnbs: new BN(getAccountBalance(account, 'BNB')).minus(fee)
|
||||
}]
|
||||
} else {
|
||||
logger.info(`Building corresponding transfer transaction, nonce ${nonce}`)
|
||||
exchanges = await getExchangeMessages(nonce)
|
||||
const exchangesData = exchanges.map((exchangeMsg) => JSON.parse(exchangeMsg.content))
|
||||
|
||||
txOptions.recipients = exchangesData.map(({ value, recipient }) => ({
|
||||
to: recipient,
|
||||
tokens: value
|
||||
}))
|
||||
}
|
||||
|
||||
const tx = new Transaction(txOptions)
|
||||
|
||||
return {
|
||||
tx,
|
||||
exchanges
|
||||
}
|
||||
}
|
||||
|
||||
function writeParams(parties, threshold) {
|
||||
logger.debug('Writing params')
|
||||
fs.writeFileSync('./params', JSON.stringify({
|
||||
parties: parties.toString(),
|
||||
threshold: (threshold - 1).toString()
|
||||
}))
|
||||
}
|
||||
|
||||
async function consumer(msg) {
|
||||
const data = JSON.parse(msg.content)
|
||||
|
||||
logger.info('Consumed sign event: %o', data)
|
||||
const {
|
||||
nonce, epoch, newEpoch, parties, threshold, closeEpoch
|
||||
} = data
|
||||
|
||||
const keysFile = `/keys/keys${epoch || closeEpoch}.store`
|
||||
const { address: from, publicKey } = getAccountFromFile(keysFile)
|
||||
if (from === '') {
|
||||
logger.info('No keys found, acking message')
|
||||
channel.ack(msg)
|
||||
return
|
||||
}
|
||||
const account = await getAccount(from)
|
||||
|
||||
if (nonce > account.sequence) {
|
||||
logger.debug('Tx has been already sent')
|
||||
logger.info('Acking message (skipped nonce)')
|
||||
channel.ack(msg)
|
||||
return
|
||||
}
|
||||
|
||||
writeParams(parties, threshold)
|
||||
let attempt = 1
|
||||
|
||||
const { tx, exchanges } = buildTx(from, account, data, attempt)
|
||||
|
||||
while (tx !== null) {
|
||||
const signResult = await sign(keysFile, tx, publicKey, from)
|
||||
|
||||
if (signResult === SIGN_OK || signResult === SIGN_NONCE_INTERRUPT) {
|
||||
if (closeEpoch) {
|
||||
await confirmCloseEpoch(closeEpoch)
|
||||
} else if (newEpoch) {
|
||||
await confirmFundsTransfer(epoch)
|
||||
} else {
|
||||
// eslint-disable-next-line no-loop-func
|
||||
exchanges.forEach((exchangeMsg) => channel.ack(exchangeMsg))
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// signer either failed, or timed out after parties signup
|
||||
attempt = nextAttempt || attempt + 1
|
||||
nextAttempt = null
|
||||
logger.warn(`Sign failed, starting next attempt ${attempt}`)
|
||||
tx.tx.memo = `Attempt ${attempt}`
|
||||
await delay(1000)
|
||||
}
|
||||
logger.info('Acking message')
|
||||
channel.ack(msg)
|
||||
}
|
||||
|
||||
|
||||
async function main() {
|
||||
channel = await connectRabbit(RABBITMQ_URL)
|
||||
logger.info('Connecting to signature events queue')
|
||||
|
@ -226,138 +349,7 @@ async function main() {
|
|||
}
|
||||
|
||||
channel.prefetch(1)
|
||||
signQueue.consume(async (msg) => {
|
||||
const data = JSON.parse(msg.content)
|
||||
|
||||
logger.info('Consumed sign event: %o', data)
|
||||
const {
|
||||
nonce, epoch, newEpoch, parties, threshold, closeEpoch
|
||||
} = data
|
||||
|
||||
const keysFile = `/keys/keys${epoch || closeEpoch}.store`
|
||||
const { address: from, publicKey } = getAccountFromFile(keysFile)
|
||||
if (from === '') {
|
||||
logger.info('No keys found, acking message')
|
||||
channel.ack(msg)
|
||||
return
|
||||
}
|
||||
const account = await getAccount(from)
|
||||
|
||||
logger.debug('Writing params')
|
||||
fs.writeFileSync('./params', JSON.stringify({
|
||||
parties: parties.toString(),
|
||||
threshold: (threshold - 1).toString()
|
||||
}))
|
||||
|
||||
attempt = 1
|
||||
|
||||
if (closeEpoch) {
|
||||
while (true) {
|
||||
logger.info(`Building corresponding account flags transaction, nonce ${nonce}`)
|
||||
|
||||
const tx = new Transaction({
|
||||
from,
|
||||
accountNumber: account.account_number,
|
||||
sequence: nonce,
|
||||
flags: 0x01,
|
||||
memo: `Attempt ${attempt}`
|
||||
})
|
||||
|
||||
const hash = sha256(tx.getSignBytes())
|
||||
logger.info(`Starting signature generation for transaction hash ${hash}`)
|
||||
const signResult = await sign(keysFile, hash, tx, publicKey, from)
|
||||
|
||||
if (signResult === SIGN_OK || signResult === SIGN_NONCE_INTERRUPT) {
|
||||
await confirmCloseEpoch(closeEpoch)
|
||||
break
|
||||
}
|
||||
|
||||
// signer either failed, or timed out after parties signup
|
||||
attempt = nextAttempt || attempt + 1
|
||||
nextAttempt = null
|
||||
logger.warn(`Sign failed, starting next attempt ${attempt}`)
|
||||
await delay(1000)
|
||||
}
|
||||
} else if (!newEpoch) {
|
||||
const exchanges = await getExchangeMessages(nonce)
|
||||
const exchangesData = exchanges.map((exchangeMsg) => JSON.parse(exchangeMsg.content))
|
||||
|
||||
if (exchanges.length > 0 && account.sequence <= nonce) {
|
||||
const recipients = exchangesData.map(({ value, recipient }) => ({
|
||||
to: recipient,
|
||||
tokens: value
|
||||
}))
|
||||
|
||||
while (true) {
|
||||
logger.info(`Building corresponding transfer transaction, nonce ${nonce}`)
|
||||
|
||||
const tx = new Transaction({
|
||||
from,
|
||||
accountNumber: account.account_number,
|
||||
sequence: nonce,
|
||||
recipients,
|
||||
asset: FOREIGN_ASSET,
|
||||
memo: `Attempt ${attempt}`
|
||||
})
|
||||
|
||||
const hash = sha256(tx.getSignBytes())
|
||||
logger.info(`Starting signature generation for transaction hash ${hash}`)
|
||||
const signResult = await sign(keysFile, hash, tx, publicKey, from)
|
||||
|
||||
if (signResult === SIGN_OK || signResult === SIGN_NONCE_INTERRUPT) {
|
||||
// eslint-disable-next-line no-loop-func
|
||||
exchanges.forEach((exchangeMsg) => channel.ack(exchangeMsg))
|
||||
break
|
||||
}
|
||||
|
||||
// signer either failed, or timed out after parties signup
|
||||
attempt = nextAttempt || attempt + 1
|
||||
nextAttempt = null
|
||||
logger.warn(`Sign failed, starting next attempt ${attempt}`)
|
||||
await delay(1000)
|
||||
}
|
||||
}
|
||||
} else if (account.sequence <= nonce) {
|
||||
const newKeysFile = `/keys/keys${newEpoch}.store`
|
||||
const { address: to } = getAccountFromFile(newKeysFile)
|
||||
|
||||
while (to !== '') {
|
||||
logger.info(`Building corresponding transaction for transferring all funds, nonce ${nonce}, recipient ${to}`)
|
||||
const fee = await getFee()
|
||||
const tx = new Transaction({
|
||||
from,
|
||||
accountNumber: account.account_number,
|
||||
sequence: nonce,
|
||||
recipients: [{
|
||||
to,
|
||||
tokens: account.balances.find((token) => token.symbol === FOREIGN_ASSET).free,
|
||||
bnbs: new BN(account.balances.find((token) => token.symbol === 'BNB').free).minus(new BN(fee).div(10 ** 8))
|
||||
}],
|
||||
asset: FOREIGN_ASSET,
|
||||
memo: `Attempt ${attempt}`
|
||||
})
|
||||
|
||||
const hash = sha256(tx.getSignBytes())
|
||||
logger.info(`Starting signature generation for transaction hash ${hash}`)
|
||||
const signResult = await sign(keysFile, hash, tx, publicKey, from)
|
||||
|
||||
if (signResult === SIGN_OK || signResult === SIGN_NONCE_INTERRUPT) {
|
||||
await confirmFundsTransfer(epoch)
|
||||
break
|
||||
}
|
||||
|
||||
// signer either failed, or timed out after parties signup
|
||||
attempt = nextAttempt || attempt + 1
|
||||
nextAttempt = null
|
||||
logger.warn(`Sign failed, starting next attempt ${attempt}`)
|
||||
await delay(1000)
|
||||
}
|
||||
} else {
|
||||
logger.debug('Tx has been already sent')
|
||||
}
|
||||
logger.info('Acking message')
|
||||
channel.ack(msg)
|
||||
})
|
||||
signQueue.consume(consumer)
|
||||
}
|
||||
|
||||
app.get('/restart/:attempt', restart)
|
||||
|
|
Loading…
Reference in New Issue