Feed: Fix processing of potentially liquidatable accounts

This commit is contained in:
Christian Kamm 2022-02-04 16:45:22 +01:00
parent ff53e19294
commit 42969d1d38
2 changed files with 56 additions and 6 deletions

40
src/AsyncBlockingQueue.ts Normal file
View File

@ -0,0 +1,40 @@
// from https://stackoverflow.com/questions/47157428/how-to-implement-a-pseudo-blocking-async-queue-in-js-ts
export class AsyncBlockingQueue<T> {
private _promises: Promise<T>[];
private _resolvers: ((t: T) => void)[];
constructor() {
this._resolvers = [];
this._promises = [];
}
private _add() {
this._promises.push(new Promise(resolve => {
this._resolvers.push(resolve);
}));
}
enqueue(t: T) {
if (!this._resolvers.length) this._add();
const resolve = this._resolvers.shift()!;
resolve(t);
}
dequeue(): Promise<T> {
if (!this._promises.length) this._add();
const promise = this._promises.shift()!;
return promise;
}
isEmpty() {
return !this._promises.length;
}
isBlocked() {
return !!this._resolvers.length;
}
get length() {
return this._promises.length - this._resolvers.length;
}
}

View File

@ -30,6 +30,7 @@ import axios from 'axios';
import * as Env from 'dotenv';
import envExpand from 'dotenv-expand';
import {Client as RpcWebSocketClient} from 'rpc-websockets';
import { AsyncBlockingQueue } from './AsyncBlockingQueue';
envExpand(Env.config());
@ -310,10 +311,10 @@ async function maybeLiquidateAccount(mangoAccount: MangoAccount): Promise<boolea
return true;
}
async function newAccountOnLiquidatableFeed(params) {
console.log(`Checking health of Account ${params.account}...`);
async function newAccountOnLiquidatableFeed(account) {
console.log(`Checking health of Account ${account}...`);
try {
const mangoAccountKey = new PublicKey(params.account);
const mangoAccountKey = new PublicKey(account);
const mangoAccount = new MangoAccount(mangoAccountKey, null);
[cache, liqorMangoAccount, ] = await Promise.all([
@ -343,17 +344,26 @@ async function newAccountOnLiquidatableFeed(params) {
// never returns
async function liquidatableFromLiquidatableFeed() {
let candidates = new AsyncBlockingQueue<string>();
let candidatesSet = new Set<string>();
const ws = new RpcWebSocketClient(liquidatableFeedWebsocketAddress, {
max_reconnects: Infinity,
});
ws.on('open', (x) => console.log("opened liquidatable feed"));
ws.on('error', (status) => console.log("error on liquidatable feed", status));
ws.on('close', (err) => console.log("closed liquidatable feed", err));
ws.on('liquidatable', newAccountOnLiquidatableFeed);
ws.on('candidate', (params) => {
const account = params.account;
if (!candidatesSet.has(account)) {
candidatesSet.add(account);
candidates.enqueue(account);
}
});
// fully reactive now
while (true) {
await sleep(5000);
const account = await candidates.dequeue();
candidatesSet.delete(account);
await newAccountOnLiquidatableFeed(account);
}
}