Relayer enhancements

This commit is contained in:
Bruce Riley 2022-01-21 20:08:47 +00:00
parent ff8ed82cd4
commit e3ed8862fe
3 changed files with 329 additions and 159 deletions

View File

@ -2,12 +2,15 @@
SPY_SERVICE_HOST=localhost:7073
SPY_SERVICE_FILTERS=[{"chain_id":2,"emitter_address":"0x000000000000000000000000f890982f9310df57d00f659cf4fd87e65aded8d7"},{"chain_id":5,"emitter_address":"0x000000000000000000000000377d55a7928c046e18eebb61977e714d2a76472a"}]
ETH_PROVIDER=https://goerli.infura.io/v3/ff29215fde9c4193bc25b344f10cefc0
POLYGON_PROVIDER=https://polygon-mumbai.infura.io/v3/ff29215fde9c4193bc25b344f10cefc0
ETH_PROVIDER=https://goerli.infura.io/v3/your_project_id
POLYGON_PROVIDER=https://polygon-mumbai.infura.io/v3/your_project_id
ETH_TOKEN_BRIDGE_ADDRESS=0xF890982f9310df57d00f659cf4fd87e65adEd8d7
POLYGON_TOKEN_BRIDGE_ADDRESS=0x377D55a7928c046E18eEbb61977e714d2a76472a
# If these are defined, use them instead of the ones defined in the code.
#ETH_CONTRACT_ADDRESS=0x51687A2F951Fa9A78D5B4F278A9B23BEFeA2CC4f
#POLYGON_CONTRACT_ADDRESS=0x10C6B4c7669f1604459d59D29559D05003a07143
ETH_CONTRACT_ADDRESS=0x61D26732B190bdc5771e2a2b3ADB295e1b5A88BF
POLYGON_CONTRACT_ADDRESS=0xc5Ba16A974a0c0E7935285d99F496Ee65eDFB8BA
WALLET_PRIVATE_KEY=your_key_here

View File

@ -14,7 +14,9 @@
"@types/jest": "^27.0.2",
"@types/long": "^4.0.1",
"@types/node": "^16.6.1",
"async-mutex": "^0.3.2",
"axios": "^0.24.0",
"condition-variable": "^1.0.0",
"esm": "^3.2.25",
"ethers": "^5.5.3",
"jest": "^27.3.1",

View File

@ -1,3 +1,6 @@
import { Mutex } from "async-mutex";
let CondVar = require("condition-variable");
import {
ChainId,
CHAIN_ID_SOLANA,
@ -7,6 +10,7 @@ import {
getEmitterAddressEth,
getEmitterAddressSolana,
getEmitterAddressTerra,
getIsTransferCompletedEth,
} from "@certusone/wormhole-sdk";
import {
@ -48,16 +52,33 @@ type OurEnvironment = {
wallet_private_key: string;
eth_contract_address: string;
polygon_contract_address: string;
eth_token_bridge_address: string;
polygon_token_bridge_address: string;
};
type TargetContractData = {
name: string;
contractAddress: string;
tokenBridgeAddress: string;
contract: ethers.Contract;
provider: ethers.providers.StaticJsonRpcProvider;
wallet: ethers.Wallet;
contractWithSigner: ethers.Contract;
};
type Type3Payload = {
contractAddress: string;
relayerFee: ethers.BigNumber;
swapFunctionType: number;
swapCurrencyType: number;
};
type PendingEvent = {
vaaBytes: string;
t3Payload: Type3Payload;
receiveTime: Date;
};
setDefaultWasm("node");
let success: boolean;
@ -66,6 +87,11 @@ let env: OurEnvironment;
let ethContractData: TargetContractData = null;
let polygonContractData: TargetContractData = null;
let seqMap = new Map<string, number>();
const mutex = new Mutex();
let condition = new CondVar();
let pendingQueue = new Array<PendingEvent>();
if (success) {
logger.info(
@ -83,6 +109,7 @@ if (success) {
}
if (success) {
run_worker();
spy_listen();
}
}
@ -104,6 +131,14 @@ function loadConfig(): [boolean, OurEnvironment] {
logger.error("Missing environment variable WALLET_PRIVATE_KEY");
return [false, undefined];
}
if (!process.env.ETH_TOKEN_BRIDGE_ADDRESS) {
logger.error("Missing environment variable ETH_TOKEN_BRIDGE_ADDRESS");
return [false, undefined];
}
if (!process.env.POLYGON_TOKEN_BRIDGE_ADDRESS) {
logger.error("Missing environment variable POLYGON_TOKEN_BRIDGE_ADDRESS");
return [false, undefined];
}
return [
true,
@ -115,6 +150,8 @@ function loadConfig(): [boolean, OurEnvironment] {
wallet_private_key: process.env.WALLET_PRIVATE_KEY,
eth_contract_address: process.env.ETH_CONTRACT_ADDRESS,
polygon_contract_address: process.env.POLYGON_CONTRACT_ADDRESS,
eth_token_bridge_address: process.env.ETH_TOKEN_BRIDGE_ADDRESS,
polygon_token_bridge_address: process.env.POLYGON_TOKEN_BRIDGE_ADDRESS,
},
];
}
@ -182,33 +219,44 @@ async function encodeEmitterAddress(
return getEmitterAddressEth(emitterAddressStr);
}
type Type3Payload = {
contractAddress: string;
relayerFee: ethers.BigNumber;
swapFunctionType: number;
swapCurrencyType: number;
};
async function processVaa(vaaBytes) {
async function processVaa(vaaBytes: string) {
let receiveTime = new Date();
logger.debug("processVaa");
const { parse_vaa } = await importCoreWasm();
const parsedVAA = parse_vaa(hexToUint8Array(vaaBytes));
logger.debug("processVaa: parsedVAA: %o", parsedVAA);
let emitter_chain_id: number = parsedVAA.emitter_chain;
let emitter_address: string = uint8ArrayToHex(parsedVAA.emitter_address);
let sequence: number = parsedVAA.sequence;
let seqNumKey: string =
parsedVAA.emitter_chain.toString() + ":" + emitter_address;
let lastSeqNum = seqMap.get(seqNumKey);
if (lastSeqNum) {
if (lastSeqNum >= parsedVAA.sequence) {
logger.debug(
"ignoring duplicate: emitter: [" +
seqNumKey +
"], seqNum: " +
parsedVAA.sequence
);
return;
}
}
seqMap.set(seqNumKey, parsedVAA.sequence);
let payload_type: number = parsedVAA.payload[0];
let t3Payload = decodeSignedVAAPayloadType3(parsedVAA);
if (t3Payload) {
if (isOurContract(t3Payload.contractAddress)) {
logger.info(
"relaying type 3: emitter: [" +
emitter_chain_id +
"enqueuing type 3 vaa: emitter: [" +
parsedVAA.emitter_chain +
":" +
emitter_address +
"], seqNum: " +
sequence +
parsedVAA.sequence +
", contractAddress: [" +
t3Payload.contractAddress +
"], relayerFee: [" +
@ -220,19 +268,34 @@ async function processVaa(vaaBytes) {
"]"
);
try {
await relayVaa(vaaBytes, t3Payload);
} catch (e) {
logger.error("failed to relay type 3 vaa: %o", e);
}
await postVaa(vaaBytes, t3Payload, receiveTime);
} else {
logger.info(
"dropping vaa: emitter: [" +
emitter_chain_id +
logger.debug(
"dropping type 3 vaa for unsupported contract: emitter: [" +
parsedVAA.emitter_chain +
":" +
emitter_address +
"], seqNum: " +
sequence +
parsedVAA.sequence +
", contractAddress: [" +
t3Payload.contractAddress +
"], relayerFee: [" +
t3Payload.relayerFee +
"], swapFunctionType: [" +
t3Payload.swapFunctionType +
"], swapCurrencyType: [" +
t3Payload.swapCurrencyType +
"]"
);
}
} else {
logger.debug(
"dropping vaa: emitter: [" +
parsedVAA.emitter_chain +
":" +
emitter_address +
"], seqNum: " +
parsedVAA.sequence +
" payloadType: " +
payload_type
);
@ -243,11 +306,6 @@ function decodeSignedVAAPayloadType3(parsedVAA: any): Type3Payload {
const payload = Buffer.from(new Uint8Array(parsedVAA.payload));
const version = payload.readUInt8(0);
// if (version !== 1) {
// return undefined;
// }
// return true;
if (version !== 3) {
return undefined;
}
@ -259,6 +317,97 @@ function decodeSignedVAAPayloadType3(parsedVAA: any): Type3Payload {
};
}
function isOurContract(contractAddress: string): boolean {
return (
contractAddress === ethContractData.contractAddress ||
contractAddress === polygonContractData.contractAddress
);
}
async function postVaa(
vaaBytes: any,
t3Payload: Type3Payload,
receiveTime: Date
) {
let event: PendingEvent = {
vaaBytes: vaaBytes,
t3Payload: t3Payload,
receiveTime: receiveTime,
};
await mutex.runExclusive(() => {
pendingQueue.push(event);
logger.debug(
"posting event, there are now " + pendingQueue.length + " enqueued events"
);
if (condition) {
logger.debug("hitting condition variable.");
condition.complete(true);
}
});
}
const COND_VAR_TIMEOUT = 10000;
async function run_worker() {
await mutex.runExclusive(async () => {
await condition.wait(COND_VAR_TIMEOUT, callBack);
});
}
async function callBack(err: any, result: any) {
// logger.debug(
// "entering callback, pendingEvents: " +
// pendingQueue.length +
// ", err: %o, result: %o",
// err,
// result
// );
let done = false;
do {
let currEvent: PendingEvent = null;
await mutex.runExclusive(async () => {
condition = null;
if (pendingQueue.length !== 0) {
currEvent = pendingQueue[0];
pendingQueue.pop();
} else {
done = true;
condition = new CondVar();
await condition.wait(COND_VAR_TIMEOUT, callBack);
}
});
if (currEvent) {
logger.debug("in callback, relaying event.");
try {
await relayVaa(currEvent.vaaBytes, currEvent.t3Payload);
} catch (e) {
logger.error("failed to relay type 3 vaa: %o", e);
}
await mutex.runExclusive(async () => {
if (pendingQueue.length === 0) {
logger.debug(
"in callback, no more pending events, rearming the condition."
);
done = true;
condition = new CondVar();
await condition.wait(COND_VAR_TIMEOUT, callBack);
} else {
logger.debug(
"in callback, there are " + pendingQueue.length + " pending events."
);
}
});
}
} while (!done);
// logger.debug("leaving callback.");
}
// Ethereum (Goerli) set up
function makeEthContractData(): TargetContractData {
let overridden: boolean = false;
@ -275,18 +424,22 @@ function makeEthContractData(): TargetContractData {
if (overridden) {
logger.info(
"Connecting to Ethereum: node [" +
env.eth_provider_url +
"], overriding contract address to [" +
"Connecting to Ethereum: overriding contract address: [" +
contractAddress +
"], node: [" +
env.eth_provider_url +
"], token bridge address: [" +
env.eth_token_bridge_address +
"]"
);
} else {
logger.info(
"Connecting to Ethereum: node [" +
env.eth_provider_url +
"], contract address [" +
"Connecting to Ethereum: contract address: [" +
contractAddress +
"], node: [" +
env.eth_provider_url +
"], token bridge address: [" +
env.eth_token_bridge_address +
"]"
);
}
@ -305,7 +458,9 @@ function makeEthContractData(): TargetContractData {
const contractWithSigner = contract.connect(wallet);
return {
name: "Ethereum",
contractAddress: contractAddress,
tokenBridgeAddress: env.eth_token_bridge_address,
contract: contract,
provider: provider,
wallet: wallet,
@ -329,18 +484,22 @@ function makePolygonContractData(): TargetContractData {
if (overridden) {
logger.info(
"Connecting to Polygon: node [" +
env.polygon_provider_url +
"], overriding contract address to [" +
"Connecting to Polygon: overriding contract address: [" +
contractAddress +
"], node: [" +
env.polygon_provider_url +
"], token bridge address: [" +
env.polygon_token_bridge_address +
"]"
);
} else {
logger.info(
"Connecting to Polygon: node [" +
env.polygon_provider_url +
"], contract address [" +
"Connecting to Polygon: contract address: [" +
contractAddress +
"], node: [" +
env.polygon_provider_url +
"], token bridge address: [" +
env.polygon_token_bridge_address +
"]"
);
}
@ -359,7 +518,9 @@ function makePolygonContractData(): TargetContractData {
const contractWithSigner = contract.connect(wallet);
return {
name: "Polygon",
contractAddress: contractAddress,
tokenBridgeAddress: env.polygon_token_bridge_address,
contract: contract,
provider: provider,
wallet: wallet,
@ -401,7 +562,7 @@ async function relayVaa(vaaBytes: string, t3Payload: Type3Payload) {
exactIn = true;
} else if (t3Payload.swapFunctionType !== 2) {
logger.error(
"unable to relay vaa: unsupported swapFunctionType: [" +
"relayVaa: unsupported swapFunctionType: [" +
t3Payload.swapFunctionType +
"]"
);
@ -412,13 +573,13 @@ async function relayVaa(vaaBytes: string, t3Payload: Type3Payload) {
native = true;
} else if (t3Payload.swapCurrencyType !== 2) {
logger.error(
"unable to relay vaa: unsupported swapCurrencyType: [" +
"relayVaa: unsupported swapCurrencyType: [" +
t3Payload.swapCurrencyType +
"]"
);
}
logger.info(
logger.debug(
"relayVaa: contractAddress: [" +
t3Payload.contractAddress +
"], ethContract: [" +
@ -429,54 +590,96 @@ async function relayVaa(vaaBytes: string, t3Payload: Type3Payload) {
);
if (t3Payload.contractAddress === ethContractData.contractAddress) {
await relayVaaToEth(signedVaaArray, exactIn, native);
await relayVaaToChain(ethContractData, signedVaaArray, exactIn, native);
} else if (
t3Payload.contractAddress === polygonContractData.contractAddress
) {
await relayVaaToPolygon(signedVaaArray, exactIn, native);
await relayVaaToChain(polygonContractData, signedVaaArray, exactIn, native);
} else {
logger.error(
"unable to relay vaa: unsupported contract: [" +
"relayVaa: unexpected contract: [" +
t3Payload.contractAddress +
"], this should not happen!"
);
}
}
async function relayVaaToChain(
tcd: TargetContractData,
signedVaaArray: Uint8Array,
exactIn: boolean,
native: boolean
) {
try {
logger.debug(
"relayVaaTo" +
tcd.name +
": checking if already redeemed on " +
tcd.name +
" using tokenBridgeAddress [" +
tcd.tokenBridgeAddress +
"]"
);
}
}
async function relayVaaToEth(
signedVaaArray: Uint8Array,
exactIn: boolean,
native: boolean
) {
logger.info("relayVaaToEth: exactIn: " + exactIn + ", native: " + native);
const alreadyRedeemed = await getIsTransferCompletedEth(
tcd.tokenBridgeAddress,
tcd.provider,
signedVaaArray
);
if (alreadyRedeemed) {
logger.info(
"relayVaaTo" +
tcd.name +
": exactIn: " +
exactIn +
", native: " +
native +
": already transferred!"
);
return;
}
} catch (e) {
logger.error(
"relayVaaTo" +
tcd.name +
": failed to check if transfer is already complete, will attempt the transfer, e: %o",
e
);
}
logger.info(
"relayVaaTo" + tcd.name + ": exactIn: " + exactIn + ", native: " + native
);
if (exactIn) {
if (native) {
await swap
.swapExactInFromVaaNativeV3(
ethContractData.contractWithSigner,
signedVaaArray
)
.swapExactInFromVaaNative(tcd.contractWithSigner, signedVaaArray)
.then((receipt) => {
logger.info("relayVaaToEth: %o", receipt.transactionHash);
logger.info(
"relayVaaTo" + tcd.name + ": %o",
receipt.transactionHash
);
})
.catch((error) => {
logger.error(
"relayVaaToEth: transaction failed: %o",
"relayVaaTo" + tcd.name + ": transaction failed: %o",
error.transactionHash
);
});
} else {
await swap
.swapExactInFromVaaTokenV3(
ethContractData.contractWithSigner,
signedVaaArray
)
.swapExactInFromVaaToken(tcd.contractWithSigner, signedVaaArray)
.then((receipt) => {
logger.info("relayVaaToEth: %o", receipt.transactionHash);
logger.info(
"relayVaaTo" + tcd.name + ": %o",
receipt.transactionHash
);
})
.catch((error) => {
logger.error(
"relayVaaToEth: transaction failed: %o",
"relayVaaTo" + tcd.name + ": transaction failed: %o",
error.transactionHash
);
});
@ -484,108 +687,70 @@ async function relayVaaToEth(
} else {
if (native) {
await swap
.swapExactOutFromVaaNativeV3(
ethContractData.contractWithSigner,
signedVaaArray
)
.swapExactOutFromVaaNative(tcd.contractWithSigner, signedVaaArray)
.then((receipt) => {
logger.info("relayVaaToEth: %o", receipt.transactionHash);
logger.info(
"relayVaaTo" + tcd.name + ": %o",
receipt.transactionHash
);
})
.catch((error) => {
logger.error(
"relayVaaToEth: transaction failed: %o",
"relayVaaTo" + tcd.name + ": transaction failed: %o",
error.transactionHash
);
});
} else {
await swap
.swapExactOutFromVaaTokenV3(
ethContractData.contractWithSigner,
signedVaaArray
)
.swapExactOutFromVaaToken(tcd.contractWithSigner, signedVaaArray)
.then((receipt) => {
logger.info("relayVaaToEth: %o", receipt.transactionHash);
logger.info(
"relayVaaTo" + tcd.name + ": %o",
receipt.transactionHash
);
})
.catch((error) => {
logger.error(
"relayVaaToEth: transaction failed: %o",
"relayVaaTo" + tcd.name + ": transaction failed: %o",
error.transactionHash
);
});
}
}
}
async function relayVaaToPolygon(
signedVaaArray: Uint8Array,
exactIn: boolean,
native: boolean
) {
logger.info("relayVaaToPolygon: exactIn: " + exactIn + ", native: " + native);
if (exactIn) {
if (native) {
await swap
.swapExactInFromVaaNativeV2(
polygonContractData.contractWithSigner,
signedVaaArray
)
.then((receipt) => {
logger.info("relayVaaToPolygon: %o", receipt.transactionHash);
})
.catch((error) => {
logger.error(
"relayVaaToPolygon: transaction failed: %o",
error.transactionHash
try {
logger.debug(
"relayVaaTo" +
tcd.name +
": checking if redeemed on " +
tcd.name +
" using tokenBridgeAddress [" +
tcd.tokenBridgeAddress +
"]"
);
});
} else {
await swap
.swapExactInFromVaaTokenV2(
polygonContractData.contractWithSigner,
const redeemed = await getIsTransferCompletedEth(
tcd.tokenBridgeAddress,
tcd.provider,
signedVaaArray
)
.then((receipt) => {
logger.info("relayVaaToPolygon: %o", receipt.transactionHash);
})
.catch((error) => {
logger.error(
"relayVaaToPolygon: transaction failed: %o",
error.transactionHash
);
});
}
} else {
if (native) {
await swap
.swapExactOutFromVaaNativeV2(
polygonContractData.contractWithSigner,
signedVaaArray
)
.then((receipt) => {
logger.info("relayVaaToPolygon: %o", receipt.transactionHash);
})
.catch((error) => {
logger.error(
"relayVaaToPolygon: transaction failed: %o",
error.transactionHash
logger.info(
"relayVaaTo" +
tcd.name +
": exactIn: " +
exactIn +
", native: " +
native +
": redeemed: " +
redeemed
);
});
} else {
await swap
.swapExactOutFromVaaTokenV2(
polygonContractData.contractWithSigner,
signedVaaArray
)
.then((receipt) => {
logger.info("relayVaaToPolygon: %o", receipt.transactionHash);
})
.catch((error) => {
} catch (e) {
logger.error(
"relayVaaToPolygon: transaction failed: %o",
error.transactionHash
"relayVaaTo" +
tcd.name +
": failed to check if transfer completed, e: %o",
e
);
});
}
}
}