Made bnc watcher fetch policy to be dependent on eth block timestamps

This commit is contained in:
Kirill Fedoseev 2019-11-07 17:06:12 +03:00
parent 6ea6df820c
commit 37a5c1e989
18 changed files with 189 additions and 62 deletions

View File

@ -10,7 +10,7 @@ SIDE_SHARED_DB_ADDRESS=0xd5fE0D28e058D375b0b038fFbB446Da37E85fFdc
FOREIGN_URL=https://testnet-dex.binance.org/
FOREIGN_CHAIN_ID=Binance-Chain-Nile
FOREIGN_ASSET=KFT-94F
FOREIGN_START_TIME=1572500000000
FOREIGN_FETCH_MAX_TIME_INTERVAL=60000
FOREIGN_FETCH_INTERVAL=5000
FOREIGN_FETCH_BLOCK_TIME_OFFSET=10000

View File

@ -10,7 +10,7 @@ SIDE_SHARED_DB_ADDRESS=0xda9a1cA2Fcb18cAB02934269369627D2b4ea8902
FOREIGN_URL=https://testnet-dex.binance.org/
FOREIGN_CHAIN_ID=Binance-Chain-Nile
FOREIGN_ASSET=KFT-94F
FOREIGN_START_TIME=1572500000000
FOREIGN_FETCH_MAX_TIME_INTERVAL=60000
FOREIGN_FETCH_INTERVAL=5000
FOREIGN_FETCH_BLOCK_TIME_OFFSET=10000

View File

@ -10,7 +10,7 @@ SIDE_SHARED_DB_ADDRESS=0xd5fE0D28e058D375b0b038fFbB446Da37E85fFdc
FOREIGN_URL=https://testnet-dex.binance.org/
FOREIGN_CHAIN_ID=Binance-Chain-Nile
FOREIGN_ASSET=KFT-94F
FOREIGN_START_TIME=1572500000000
FOREIGN_FETCH_MAX_TIME_INTERVAL=60000
FOREIGN_FETCH_INTERVAL=5000
FOREIGN_FETCH_BLOCK_TIME_OFFSET=10000

View File

@ -10,7 +10,7 @@ SIDE_SHARED_DB_ADDRESS=0xda9a1cA2Fcb18cAB02934269369627D2b4ea8902
FOREIGN_URL=https://testnet-dex.binance.org/
FOREIGN_CHAIN_ID=Binance-Chain-Nile
FOREIGN_ASSET=KFT-94F
FOREIGN_START_TIME=1572500000000
FOREIGN_FETCH_MAX_TIME_INTERVAL=60000
FOREIGN_FETCH_INTERVAL=5000
FOREIGN_FETCH_BLOCK_TIME_OFFSET=10000

View File

@ -10,7 +10,7 @@ SIDE_SHARED_DB_ADDRESS=0xd5fE0D28e058D375b0b038fFbB446Da37E85fFdc
FOREIGN_URL=https://testnet-dex.binance.org/
FOREIGN_CHAIN_ID=Binance-Chain-Nile
FOREIGN_ASSET=KFT-94F
FOREIGN_START_TIME=1572500000000
FOREIGN_FETCH_MAX_TIME_INTERVAL=60000
FOREIGN_FETCH_INTERVAL=5000
FOREIGN_FETCH_BLOCK_TIME_OFFSET=10000

View File

@ -10,7 +10,7 @@ SIDE_SHARED_DB_ADDRESS=0xda9a1cA2Fcb18cAB02934269369627D2b4ea8902
FOREIGN_URL=https://testnet-dex.binance.org/
FOREIGN_CHAIN_ID=Binance-Chain-Nile
FOREIGN_ASSET=KFT-94F
FOREIGN_START_TIME=1572500000000
FOREIGN_FETCH_MAX_TIME_INTERVAL=60000
FOREIGN_FETCH_INTERVAL=5000
FOREIGN_FETCH_BLOCK_TIME_OFFSET=10000

View File

@ -4,6 +4,7 @@ import './openzeppelin-solidity/contracts/token/ERC20/IERC20.sol';
contract Bridge {
event ExchangeRequest(uint value, uint nonce);
event EpochEnd(uint indexed epoch);
event NewEpoch(uint indexed oldEpoch, uint indexed newEpoch);
event NewEpochCancelled(uint indexed epoch);
event NewFundsTransfer(uint indexed oldEpoch, uint indexed newEpoch);
@ -14,6 +15,7 @@ contract Bridge {
uint threshold;
uint rangeSize;
uint startBlock;
uint endBlock;
uint nonce;
uint x;
uint y;
@ -65,7 +67,16 @@ contract Bridge {
status = Status.KEYGEN;
nextEpoch = 1;
states[nextEpoch] = State(validators, threshold, rangeSize, 0, uint(-1), 0, 0);
states[nextEpoch] = State({
validators : validators,
threshold : threshold,
rangeSize : rangeSize,
startBlock : 0,
endBlock : uint(-1),
nonce : uint(-1),
x : 0,
y : 0
});
minTxLimit = limits[0];
maxTxLimit = limits[1];
@ -133,7 +144,7 @@ contract Bridge {
if (nextEpoch == 1) {
status = Status.READY;
states[nextEpoch].startBlock = block.number;
states[nextEpoch].nonce = uint(-1);
states[nextEpoch].nonce = uint(- 1);
epoch = nextEpoch;
emit EpochStart(epoch, x, y);
}
@ -246,9 +257,12 @@ contract Bridge {
if (tryVote(Vote.START_VOTING, epoch)) {
nextEpoch++;
status = Status.VOTING;
states[nextEpoch].endBlock = block.number;
states[nextEpoch].threshold = getThreshold();
states[nextEpoch].validators = getValidators();
states[nextEpoch].rangeSize = getRangeSize();
emit EpochEnd(epoch);
}
}

View File

@ -9,6 +9,6 @@ COPY ./bncWatcher/package.json /watcher/
RUN npm install
COPY ./bncWatcher/bncWatcher.js ./shared/db.js ./shared/logger.js ./shared/crypto.js ./shared/wait.js /watcher/
COPY ./bncWatcher/bncWatcher.js ./shared/db.js ./shared/logger.js ./shared/crypto.js ./shared/amqp.js ./shared/wait.js /watcher/
ENTRYPOINT ["node", "bncWatcher.js"]

View File

@ -7,25 +7,30 @@ const logger = require('./logger')
const redis = require('./db')
const { publicKeyToAddress } = require('./crypto')
const { delay, retry } = require('./wait')
const { connectRabbit, assertQueue } = require('./amqp')
const { FOREIGN_URL, PROXY_URL, FOREIGN_ASSET } = process.env
const {
FOREIGN_URL, PROXY_URL, FOREIGN_ASSET, RABBITMQ_URL
} = process.env
const FOREIGN_START_TIME = parseInt(process.env.FOREIGN_START_TIME, 10)
const FOREIGN_FETCH_INTERVAL = parseInt(process.env.FOREIGN_FETCH_INTERVAL, 10)
const FOREIGN_FETCH_BLOCK_TIME_OFFSET = parseInt(process.env.FOREIGN_FETCH_BLOCK_TIME_OFFSET, 10)
const FOREIGN_FETCH_MAX_TIME_INTERVAL = parseInt(process.env.FOREIGN_FETCH_MAX_TIME_INTERVAL, 10)
const foreignHttpClient = axios.create({ baseURL: FOREIGN_URL })
const proxyHttpClient = axios.create({ baseURL: PROXY_URL })
function getLastForeignAddress() {
const epoch = Math.max(0, ...fs.readdirSync('/keys')
.map((x) => parseInt(x.split('.')[0].substr(4), 10)))
if (epoch === 0) {
let channel
let epochTimeIntervalsQueue
function getForeignAddress(epoch) {
const keysFile = `/keys/keys${epoch}.store`
try {
const publicKey = JSON.parse(fs.readFileSync(keysFile))[5]
return publicKeyToAddress(publicKey)
} catch (e) {
return null
}
const keysFile = `/keys/keys${epoch}.store`
const publicKey = JSON.parse(fs.readFileSync(keysFile))[5]
return publicKeyToAddress(publicKey)
}
async function getTx(hash) {
@ -45,15 +50,8 @@ async function getBlockTime() {
return Date.parse(response.data.block_time) - FOREIGN_FETCH_BLOCK_TIME_OFFSET
}
async function fetchNewTransactions() {
async function fetchNewTransactions(address, startTime, endTime) {
logger.debug('Fetching new transactions')
const startTime = parseInt(await redis.get('foreignTime'), 10) + 1
const address = getLastForeignAddress()
const endTime = Math.min(startTime + 3 * 30 * 24 * 60 * 60 * 1000, await getBlockTime())
if (address === null) {
return {}
}
logger.debug('Sending api transactions request')
const params = {
address,
side: 'RECEIVE',
@ -63,27 +61,101 @@ async function fetchNewTransactions() {
endTime
}
logger.trace('%o', params)
const transactions = (await retry(() => foreignHttpClient
.get('/api/v1/transactions', { params }))).data.tx
logger.trace('Transactions fetch params %o', params)
return (
await retry(() => foreignHttpClient.get('/api/v1/transactions', { params }))
).data.tx
}
async function fetchTimeIntervalsQueue() {
let epoch = null
let startTime = null
let endTime = null
const lastBncBlockTime = await getBlockTime()
logger.trace(`Binance last block timestamp ${lastBncBlockTime}`)
while (true) {
const msg = await epochTimeIntervalsQueue.get()
if (msg === false) {
break
}
const data = JSON.parse(msg.content)
logger.trace('Consumed time interval event %o', data)
if (epoch !== null && epoch !== data.epoch) {
logger.warn('Two consequently events have different epochs, should not be like this')
channel.nack(msg, false, true)
break
}
if (data.startTime) {
logger.trace('Set foreign time', data)
await redis.set(`foreignTime${data.epoch}`, data.startTime)
channel.ack(msg)
break
}
if (epoch === null) {
epoch = data.epoch
startTime = await redis.get(`foreignTime${epoch}`)
logger.trace(`Retrieved epoch ${epoch} and start time ${startTime} from redis`)
if (startTime === null) {
logger.warn(`Empty foreign time for epoch ${epoch}`)
}
} /*
if (data.endTime) {
if (data.endTime - startTime < FOREIGN_FETCH_MAX_TIME_INTERVAL
&& data.endTime < lastBncBlockTime) {
endTime = data.endTime
channel.ack(msg)
} else {
logger.trace('Requeuing current queue message')
channel.nack(msg, false, true)
}
break
} */
if (data.prolongedTime - startTime < FOREIGN_FETCH_MAX_TIME_INTERVAL
&& data.prolongedTime < lastBncBlockTime) {
endTime = data.prolongedTime
channel.ack(msg)
} else {
logger.trace('Requeuing current queue message')
channel.nack(msg, false, true)
break
}
}
return {
transactions,
epoch,
startTime,
endTime
}
}
async function initialize() {
if (await redis.get('foreignTime') === null) {
logger.info('Set default foreign time')
await redis.set('foreignTime', FOREIGN_START_TIME)
}
channel = await connectRabbit(RABBITMQ_URL)
logger.info('Connecting to epoch time intervals queue')
epochTimeIntervalsQueue = await assertQueue(channel, 'epochTimeIntervalsQueue')
}
async function loop() {
const { transactions, endTime } = await fetchNewTransactions()
if (!transactions || transactions.length === 0) {
const { epoch, startTime, endTime } = await fetchTimeIntervalsQueue()
if (!startTime || !endTime) {
logger.debug('Nothing to fetch')
await delay(FOREIGN_FETCH_INTERVAL)
return
}
const address = getForeignAddress(epoch)
if (!address) {
logger.debug('Validator is not included in current epoch')
await redis.set(`foreignTime${epoch}`, endTime)
await delay(FOREIGN_FETCH_INTERVAL)
return
}
const transactions = await fetchNewTransactions(address, startTime, endTime)
if (transactions.length === 0) {
logger.debug('Found 0 new transactions')
await redis.set(`foreignTime${epoch}`, endTime)
await delay(FOREIGN_FETCH_INTERVAL)
return
}
@ -95,17 +167,16 @@ async function loop() {
const tx = transactions[i]
if (tx.memo !== 'funding') {
const publicKeyEncoded = (await getTx(tx.txHash)).signatures[0].pub_key.value
await proxyHttpClient
.post('/transfer', {
to: computeAddress(Buffer.from(publicKeyEncoded, 'base64')),
value: new BN(tx.value)
.multipliedBy(10 ** 18)
.integerValue(),
hash: `0x${tx.txHash}`
})
await proxyHttpClient.post('/transfer', {
to: computeAddress(Buffer.from(publicKeyEncoded, 'base64')),
value: new BN(tx.value)
.multipliedBy(10 ** 18)
.integerValue(),
hash: `0x${tx.txHash}`
})
}
}
await redis.set('foreignTime', endTime)
await redis.set(`foreignTime${epoch}`, endTime)
}
async function main() {

View File

@ -2,6 +2,7 @@
"name": "watcher",
"version": "0.0.1",
"dependencies": {
"amqplib": "0.5.3",
"ioredis": "4.10.0",
"axios": "0.19.0",
"bech32": "1.1.3",

View File

@ -103,7 +103,7 @@ services:
- FOREIGN_ASSET
- 'RABBITMQ_URL=amqp://rabbitmq:5672'
- 'PROXY_URL=http://proxy:8001'
- FOREIGN_START_TIME
- FOREIGN_FETCH_MAX_TIME_INTERVAL
- FOREIGN_FETCH_INTERVAL
- FOREIGN_FETCH_BLOCK_TIME_OFFSET
- LOG_LEVEL

View File

@ -112,7 +112,7 @@ services:
- FOREIGN_ASSET
- 'RABBITMQ_URL=amqp://rabbitmq:5672'
- 'PROXY_URL=http://proxy:8001'
- FOREIGN_START_TIME
- FOREIGN_FETCH_MAX_TIME_INTERVAL
- FOREIGN_FETCH_INTERVAL
- FOREIGN_FETCH_BLOCK_TIME_OFFSET
- LOG_LEVEL

View File

@ -16,6 +16,7 @@ const HOME_MAX_FETCH_RANGE_SIZE = parseInt(process.env.HOME_MAX_FETCH_RANGE_SIZE
const provider = new ethers.providers.JsonRpcProvider(HOME_RPC_URL)
const bridgeAbi = [
'event ExchangeRequest(uint value, uint nonce)',
'event EpochEnd(uint indexed epoch)',
'event NewEpoch(uint indexed oldEpoch, uint indexed newEpoch)',
'event NewEpochCancelled(uint indexed epoch)',
'event NewFundsTransfer(uint indexed oldEpoch, uint indexed newEpoch)',
@ -34,6 +35,7 @@ let exchangeQueue
let signQueue
let keygenQueue
let cancelKeygenQueue
let epochTimeIntervalsQueue
let chainId
let blockNumber
let epoch
@ -42,6 +44,11 @@ let redisTx
let rangeSize
let lastTransactionBlockNumber
let isCurrentValidator
let activeEpoch
async function getBlockTimestamp(n) {
return (await provider.getBlock(n, false)).timestamp
}
async function resetFutureMessages(queue) {
logger.debug(`Resetting future messages in queue ${queue.name}`)
@ -169,8 +176,7 @@ async function processEpochStart(event) {
epochStart = blockNumber
logger.info(`Epoch ${epoch} started`)
rangeSize = (await bridge.getRangeSize()).toNumber()
isCurrentValidator = (await bridge.getValidators())
.includes(validatorAddress)
isCurrentValidator = (await bridge.getValidators()).includes(validatorAddress)
if (isCurrentValidator) {
logger.info(`${validatorAddress} is a current validator`)
} else {
@ -186,6 +192,9 @@ async function initialize() {
signQueue = await assertQueue(channel, 'signQueue')
keygenQueue = await assertQueue(channel, 'keygenQueue')
cancelKeygenQueue = await assertQueue(channel, 'cancelKeygenQueue')
epochTimeIntervalsQueue = await assertQueue(channel, 'epochTimeIntervalsQueue')
activeEpoch = !!(await redis.get('activeEpoch'))
chainId = (await provider.getNetwork()).chainId
@ -228,6 +237,7 @@ async function initialize() {
await resetFutureMessages(cancelKeygenQueue)
await resetFutureMessages(exchangeQueue)
await resetFutureMessages(signQueue)
await resetFutureMessages(epochTimeIntervalsQueue)
logger.debug('Sending start commands')
await axios.get('http://keygen:8001/start')
await axios.get('http://signer:8001/start')
@ -255,9 +265,11 @@ async function loop() {
}))
for (let curBlockNumber = blockNumber, i = 0; curBlockNumber <= endBlock; curBlockNumber += 1) {
let epochTimeUpdated = false
const curBlockTimestamp = await getBlockTimestamp(curBlockNumber)
while (i < bridgeEvents.length && bridgeEvents[i].blockNumber === curBlockNumber) {
const event = bridge.interface.parseLog(bridgeEvents[i])
logger.debug('%o %o', event, bridgeEvents[i])
logger.trace('Consumed event %o %o', event, bridgeEvents[i])
switch (event.name) {
case 'NewEpoch':
await sendKeygen(event)
@ -277,12 +289,39 @@ async function loop() {
break
case 'EpochStart':
await processEpochStart(event)
await redis.set('activeEpoch', true)
activeEpoch = true
epochTimeIntervalsQueue.send({
blockNumber: curBlockNumber,
startTime: curBlockTimestamp * 1000,
epoch
})
epochTimeUpdated = true
break
case 'EpochEnd':
logger.debug(`Consumed epoch ${epoch} end event`)
await redis.set('activeEpoch', false)
activeEpoch = false
epochTimeIntervalsQueue.send({
blockNumber: curBlockNumber,
prolongedTime: curBlockTimestamp * 1000,
epoch
})
break
default:
logger.warn('Unknown event %o', event)
}
i += 1
}
if (!epochTimeUpdated && epoch > 0 && activeEpoch) {
epochTimeIntervalsQueue.send({
blockNumber: curBlockNumber,
prolongedTime: curBlockTimestamp * 1000,
epoch
})
}
if ((curBlockNumber + 1 - epochStart) % rangeSize === 0) {
logger.info('Reached end of the current block range')

View File

@ -0,0 +1,9 @@
{
"extends": [
"../../../.eslintrc"
],
"rules": {
"import/no-extraneous-dependencies": 0,
"node/no-extraneous-require": 0
}
}

View File

@ -1,16 +1,11 @@
const amqp = require('amqplib')
const logger = require('./logger')
const { retry } = require('./wait')
async function connectRabbit(url) {
while (true) {
try {
return (await amqp.connect(url)).createChannel()
} catch (e) {
logger.debug('Failed to connect to rabbitmqServer, reconnecting')
await new Promise((resolve) => setTimeout(resolve, 2000))
}
}
logger.info('Connecting to RabbitMQ server')
return (await retry(() => amqp.connect(url))).createChannel()
}
async function assertQueue(channel, name) {

View File

@ -19,7 +19,6 @@ async function confirmKeygen(keysFile) {
}
async function main() {
logger.info('Connecting to RabbitMQ server')
const channel = await connectRabbit(RABBITMQ_URL)
logger.info('Connecting to epoch events queue')
const keygenQueue = await assertQueue(channel, 'keygenQueue')

View File

@ -130,7 +130,7 @@ function sign(keysFile, hash, tx, publicKey, signerAddress) {
let nonceInterrupt = false
return new Promise((resolve) => {
const cmd = exec.execFile('./sign-entrypoint.sh', [PROXY_URL, keysFile, hash], async (error) => {
logger.debug('%o', error)
logger.trace('Sign entrypoint exited, %o', error)
clearInterval(nonceDaemonIntervalId)
clearTimeout(restartTimeoutId)
if (fs.existsSync('signature')) { // if signature was generated
@ -195,7 +195,6 @@ function sign(keysFile, hash, tx, publicKey, signerAddress) {
}
async function main() {
logger.info('Connecting to RabbitMQ server')
channel = await connectRabbit(RABBITMQ_URL)
logger.info('Connecting to signature events queue')
exchangeQueue = await assertQueue(channel, 'exchangeQueue')

View File

@ -1,6 +1,6 @@
const axios = require('axios')
const {retry} = require('./wait')
const { retry } = require('./wait')
function createController(validatorId) {
const url = `http://validator${validatorId}_proxy_1:8002/`