Compare commits

...

3 Commits

Author SHA1 Message Date
Christian Kamm c3d18aa04f
Merge pull request #16 from blockworks-foundation/ckamm/liquidatable-feed
Support using the liquidatable accounts feed as data source
2022-02-07 17:28:02 +01:00
Christian Kamm 42969d1d38 Feed: Fix processing of potentially liquidatable accounts 2022-02-04 20:44:04 +01:00
Christian Kamm ff53e19294 Support using the liquidatable accounts feed as data source 2022-02-04 12:35:47 +01:00
2 changed files with 174 additions and 62 deletions

40
src/AsyncBlockingQueue.ts Normal file
View File

@ -0,0 +1,40 @@
// from https://stackoverflow.com/questions/47157428/how-to-implement-a-pseudo-blocking-async-queue-in-js-ts
export class AsyncBlockingQueue<T> {
private _promises: Promise<T>[];
private _resolvers: ((t: T) => void)[];
constructor() {
this._resolvers = [];
this._promises = [];
}
private _add() {
this._promises.push(new Promise(resolve => {
this._resolvers.push(resolve);
}));
}
enqueue(t: T) {
if (!this._resolvers.length) this._add();
const resolve = this._resolvers.shift()!;
resolve(t);
}
dequeue(): Promise<T> {
if (!this._promises.length) this._add();
const promise = this._promises.shift()!;
return promise;
}
isEmpty() {
return !this._promises.length;
}
isBlocked() {
return !!this._resolvers.length;
}
get length() {
return this._promises.length - this._resolvers.length;
}
}

View File

@ -29,6 +29,8 @@ import { Orderbook } from '@project-serum/serum/lib/market';
import axios from 'axios';
import * as Env from 'dotenv';
import envExpand from 'dotenv-expand';
import {Client as RpcWebSocketClient} from 'rpc-websockets';
import { AsyncBlockingQueue } from './AsyncBlockingQueue';
envExpand(Env.config());
@ -39,6 +41,7 @@ const refreshAccountsInterval = parseInt(
const refreshWebsocketInterval = parseInt(
process.env.INTERVAL_WEBSOCKET || '300000',
);
const liquidatableFeedWebsocketAddress = process.env.LIQUIDATABLE_FEED_WEBSOCKET_ADDRESS;
const rebalanceInterval = parseInt(process.env.INTERVAL_REBALANCE || '10000');
const checkTriggers = process.env.CHECK_TRIGGERS
? process.env.CHECK_TRIGGERS === 'true'
@ -52,19 +55,15 @@ const config = new Config(IDS);
const cluster = (process.env.CLUSTER || 'mainnet') as Cluster;
const groupName = process.env.GROUP || 'mainnet.1';
const groupIds = config.getGroup(cluster, groupName);
if (!groupIds) {
throw new Error(`Group ${groupName} not found`);
}
const groupIds = config.getGroup(cluster, groupName) ?? (() => { throw new Error(`Group ${groupName} not found`); })();
// Target values to keep in spot, ordered the same as in mango client's ids.json
// Example:
//
// MNGO BTC ETH SOL USDT SRM RAY COPE FTT MSOL
// TARGETS=0 0 0 1 0 0 0 0 0 0
const TARGETS = process.env.TARGETS
? process.env.TARGETS.replace(/\s+/g,' ').trim().split(' ').map((s) => parseFloat(s))
: [0, 0, 0, 0, 0, 0, 0, 0, 0];
const TARGETS = process.env.TARGETS?.replace(/\s+/g,' ').trim().split(' ').map((s) => parseFloat(s))
?? [0, 0, 0, 0, 0, 0, 0, 0, 0];
const mangoProgramId = groupIds.mangoProgramId;
const mangoGroupKey = groupIds.publicKey;
@ -86,16 +85,19 @@ const client = new MangoClient(connection, mangoProgramId);
let mangoSubscriptionId = -1;
let dexSubscriptionId = -1;
let mangoGroup: MangoGroup;
let cache: MangoCache;
let liqorMangoAccount: MangoAccount;
let spotMarkets: Market[];
let perpMarkets: PerpMarket[];
let rootBanks: (RootBank | undefined)[];
async function main() {
if (!groupIds) {
throw new Error(`Group ${groupName} not found`);
}
console.log(`Starting liquidator for ${groupName}...`);
console.log(`RPC Endpoint: ${rpcEndpoint}`);
const mangoGroup = await client.getMangoGroup(mangoGroupKey);
let cache = await mangoGroup.loadCache(connection);
let liqorMangoAccount: MangoAccount;
mangoGroup = await client.getMangoGroup(mangoGroupKey);
cache = await mangoGroup.loadCache(connection);
try {
if (process.env.LIQOR_PK) {
@ -128,14 +130,9 @@ async function main() {
console.error(`Error loading liqor Mango Account: ${err}`);
return;
}
console.log(`Liqor Public Key: ${liqorMangoAccount.publicKey.toBase58()}`);
let mangoAccounts: MangoAccount[] = [];
await refreshAccounts(mangoGroup, mangoAccounts);
watchAccounts(groupIds.mangoProgramId, mangoGroup, mangoAccounts);
const perpMarkets = await Promise.all(
perpMarkets = await Promise.all(
groupIds.perpMarkets.map((perpMarket) => {
return mangoGroup.loadPerpMarket(
connection,
@ -145,7 +142,7 @@ async function main() {
);
}),
);
const spotMarkets = await Promise.all(
spotMarkets = await Promise.all(
groupIds.spotMarkets.map((spotMarket) => {
return Market.load(
connection,
@ -155,9 +152,22 @@ async function main() {
);
}),
);
const rootBanks = await mangoGroup.loadRootBanks(connection);
rootBanks = await mangoGroup.loadRootBanks(connection);
notify(`V3 Liquidator launched for group ${groupName}`);
if (liquidatableFeedWebsocketAddress) {
await liquidatableFromLiquidatableFeed();
} else {
await liquidatableFromSolanaRpc();
}
}
// never returns
async function liquidatableFromSolanaRpc() {
let mangoAccounts: MangoAccount[] = [];
await refreshAccounts(mangoGroup, mangoAccounts);
watchAccounts(groupIds.mangoProgramId, mangoGroup, mangoAccounts);
// eslint-disable-next-line
while (true) {
try {
@ -225,44 +235,9 @@ async function main() {
// Reload mango account to make sure still liquidatable
await mangoAccount.reload(connection, mangoGroup.dexProgramId);
if (!mangoAccount.isLiquidatable(mangoGroup, cache)) {
console.log(
`Account ${mangoAccountKeyString} no longer liquidatable`,
);
continue;
}
const health = mangoAccount.getHealthRatio(mangoGroup, cache, 'Maint');
const accountInfoString = mangoAccount.toPrettyString(
groupIds,
mangoGroup,
cache,
);
console.log(
`Sick account ${mangoAccountKeyString} health ratio: ${health.toString()}\n${accountInfoString}`,
);
notify(`Sick account\n${accountInfoString}`);
try {
await liquidateAccount(
mangoGroup,
cache,
spotMarkets,
rootBanks,
perpMarkets,
mangoAccount,
liqorMangoAccount,
);
console.log('Liquidated account', mangoAccountKeyString);
notify(`Liquidated account ${mangoAccountKeyString}`);
} catch (err: any) {
console.error(
`Failed to liquidate account ${mangoAccountKeyString}: ${err}`,
);
notify(
`Failed to liquidate account ${mangoAccountKeyString}: ${err}`,
);
} finally {
const liquidated = await maybeLiquidateAccount(mangoAccount);
if (liquidated) {
await balanceAccount(
mangoGroup,
liqorMangoAccount,
@ -291,6 +266,107 @@ async function main() {
}
}
async function maybeLiquidateAccount(mangoAccount: MangoAccount): Promise<boolean> {
const mangoAccountKeyString = mangoAccount.publicKey.toBase58();
if (!mangoAccount.isLiquidatable(mangoGroup, cache)) {
console.log(
`Account ${mangoAccountKeyString} no longer liquidatable`,
);
return false;
}
const health = mangoAccount.getHealthRatio(mangoGroup, cache, 'Maint');
const accountInfoString = mangoAccount.toPrettyString(
groupIds,
mangoGroup,
cache,
);
console.log(
`Sick account ${mangoAccountKeyString} health ratio: ${health.toString()}\n${accountInfoString}`,
);
notify(`Sick account\n${accountInfoString}`);
try {
await liquidateAccount(
mangoGroup,
cache,
spotMarkets,
rootBanks,
perpMarkets,
mangoAccount,
liqorMangoAccount,
);
console.log('Liquidated account', mangoAccountKeyString);
notify(`Liquidated account ${mangoAccountKeyString}`);
} catch (err: any) {
console.error(
`Failed to liquidate account ${mangoAccountKeyString}: ${err}`,
);
notify(
`Failed to liquidate account ${mangoAccountKeyString}: ${err}`,
);
}
return true;
}
async function newAccountOnLiquidatableFeed(account) {
console.log(`Checking health of Account ${account}...`);
try {
const mangoAccountKey = new PublicKey(account);
const mangoAccount = new MangoAccount(mangoAccountKey, null);
[cache, liqorMangoAccount, ] = await Promise.all([
mangoGroup.loadCache(connection),
liqorMangoAccount.reload(connection, mangoGroup.dexProgramId),
mangoAccount.reload(connection, mangoGroup.dexProgramId),
]);
const liquidated = await maybeLiquidateAccount(mangoAccount);
if (liquidated) {
cache = await mangoGroup.loadCache(connection);
await liqorMangoAccount.reload(connection, mangoGroup.dexProgramId);
// Check need to rebalance again after checking accounts
await balanceAccount(
mangoGroup,
liqorMangoAccount,
cache,
spotMarkets,
perpMarkets,
);
}
} catch (err) {
console.error('Error liquidating account:', err);
}
}
// never returns
async function liquidatableFromLiquidatableFeed() {
let candidates = new AsyncBlockingQueue<string>();
let candidatesSet = new Set<string>();
const ws = new RpcWebSocketClient(liquidatableFeedWebsocketAddress, {
max_reconnects: Infinity,
});
ws.on('open', (x) => console.log("opened liquidatable feed"));
ws.on('error', (status) => console.log("error on liquidatable feed", status));
ws.on('close', (err) => console.log("closed liquidatable feed", err));
ws.on('candidate', (params) => {
const account = params.account;
if (!candidatesSet.has(account)) {
candidatesSet.add(account);
candidates.enqueue(account);
}
});
while (true) {
const account = await candidates.dequeue();
candidatesSet.delete(account);
await newAccountOnLiquidatableFeed(account);
}
}
function watchAccounts(
mangoProgramId: PublicKey,
mangoGroup: MangoGroup,
@ -436,10 +512,6 @@ async function processTriggerOrders(
perpMarkets: PerpMarket[],
mangoAccount: MangoAccount,
) {
if (!groupIds) {
throw new Error(`Group ${groupName} not found`);
}
for (let i = 0; i < mangoAccount.advancedOrders.length; i++) {
const order = mangoAccount.advancedOrders[i];
if (!(order.perpTrigger && order.perpTrigger.isActive)) {