Refactored PollingLiquidator into ReportingAccountLiquidator and LiquidationProcessor

* ReportingAccountLiquidator wraps an AccountLiquidator but provides before-and-after logging and reporting.
* Sped up fetch_token_prices() by reducing it to one network fetch
* Centralised loading of ripe mangoes by creating MarginAccount.load_all_ripe()
* Clearer, more consistent use of Observables
* Added RxPy Backpressure library
* Improved container building to allow for different tags
This commit is contained in:
Geoff Taylor 2021-05-21 14:28:42 +01:00
parent 6058468a99
commit 8b8afe1a0b
14 changed files with 773 additions and 398 deletions

3
.gitignore vendored
View File

@ -14,6 +14,9 @@ report.state
__pycache__
.mypy_cache
# Don't check in any experimental commands
bin/experimental*
# Don't check in anything that might have credentials.
configuration.json
wallet.json

View File

@ -17,3 +17,6 @@ ignore_missing_imports = True
[mypy-pandas.*]
ignore_missing_imports = True
[mypy-rxpy_backpressure.*]
ignore_missing_imports = True

View File

@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "markdown",
"id": "armed-matrix",
"id": "simple-simulation",
"metadata": {},
"source": [
"# ⚠ Warning\n",
@ -16,7 +16,7 @@
},
{
"cell_type": "markdown",
"id": "verbal-reason",
"id": "abstract-collector",
"metadata": {},
"source": [
"# 🥭 AccountLiquidator\n",
@ -31,7 +31,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "smooth-appeal",
"id": "smart-gamma",
"metadata": {
"jupyter": {
"source_hidden": true
@ -40,20 +40,22 @@
"outputs": [],
"source": [
"import abc\n",
"import datetime\n",
"import logging\n",
"import typing\n",
"\n",
"from solana.transaction import Transaction\n",
"\n",
"from BaseModel import Group, MarginAccount, TokenValue\n",
"from BaseModel import Group, LiquidationEvent, MarginAccount, MarginAccountMetadata, TokenValue\n",
"from Context import Context\n",
"from Instructions import ForceCancelOrdersInstructionBuilder, InstructionBuilder, LiquidateInstructionBuilder\n",
"from Observables import EventSource\n",
"from Wallet import Wallet\n"
]
},
{
"cell_type": "markdown",
"id": "backed-deficit",
"id": "modular-strike",
"metadata": {},
"source": [
"## 💧 AccountLiquidator class\n",
@ -64,7 +66,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "certified-blend",
"id": "peripheral-journey",
"metadata": {},
"outputs": [],
"source": [
@ -73,13 +75,17 @@
" self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n",
"\n",
" @abc.abstractmethod\n",
" def prepare_instructions(self, group: Group, margin_account: MarginAccount, prices: typing.List[TokenValue]) -> typing.List[InstructionBuilder]:\n",
" raise NotImplementedError(\"AccountLiquidator.prepare_instructions() is not implemented on the base type.\")\n",
"\n",
" @abc.abstractmethod\n",
" def liquidate(self, group: Group, margin_account: MarginAccount, prices: typing.List[TokenValue]) -> typing.Optional[str]:\n",
" raise NotImplementedError(\"AccountLiquidator.liquidate() is not implemented on the base type.\")\n"
]
},
{
"cell_type": "markdown",
"id": "medieval-voltage",
"id": "loved-fairy",
"metadata": {},
"source": [
"## 🌬️ NullAccountLiquidator class\n",
@ -90,7 +96,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "diagnostic-aruba",
"id": "invalid-object",
"metadata": {},
"outputs": [],
"source": [
@ -98,6 +104,9 @@
" def __init__(self):\n",
" super().__init__()\n",
"\n",
" def prepare_instructions(self, group: Group, margin_account: MarginAccount, prices: typing.List[TokenValue]) -> typing.List[InstructionBuilder]:\n",
" return []\n",
"\n",
" def liquidate(self, group: Group, margin_account: MarginAccount, prices: typing.List[TokenValue]) -> typing.Optional[str]:\n",
" self.logger.info(f\"Skipping liquidation of margin account [{margin_account.address}]\")\n",
" return None\n"
@ -105,7 +114,7 @@
},
{
"cell_type": "markdown",
"id": "difficult-majority",
"id": "vietnamese-machine",
"metadata": {},
"source": [
"## 💧 ActualAccountLiquidator class\n",
@ -118,7 +127,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "mechanical-scoop",
"id": "unable-violence",
"metadata": {},
"outputs": [],
"source": [
@ -163,7 +172,7 @@
},
{
"cell_type": "markdown",
"id": "forbidden-sense",
"id": "adopted-lighter",
"metadata": {},
"source": [
"# 🌪️ ForceCancelOrdersAccountLiquidator class\n",
@ -180,7 +189,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "selected-instrument",
"id": "explicit-secondary",
"metadata": {},
"outputs": [],
"source": [
@ -206,7 +215,80 @@
},
{
"cell_type": "markdown",
"id": "dense-projection",
"id": "protected-latitude",
"metadata": {},
"source": [
"# 📝 ReportingAccountLiquidator class\n",
"\n",
"This class takes a regular `AccountLiquidator` and wraps its `liquidate()` call in some useful reporting."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "integrated-carrier",
"metadata": {},
"outputs": [],
"source": [
"class ReportingAccountLiquidator(AccountLiquidator):\n",
" def __init__(self, inner: AccountLiquidator, context: Context, wallet: Wallet, liquidations_publisher: EventSource[LiquidationEvent]):\n",
" super().__init__()\n",
" self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n",
" self.inner: AccountLiquidator = inner\n",
" self.context: Context = context\n",
" self.wallet: Wallet = wallet\n",
" self.liquidations_publisher: EventSource[LiquidationEvent] = liquidations_publisher\n",
"\n",
" def prepare_instructions(self, group: Group, margin_account: MarginAccount, prices: typing.List[TokenValue]) -> typing.List[InstructionBuilder]:\n",
" return self.inner.prepare_instructions(group, margin_account, prices)\n",
"\n",
" def liquidate(self, group: Group, margin_account: MarginAccount, prices: typing.List[TokenValue]) -> typing.Optional[str]:\n",
" balance_sheet = margin_account.get_balance_sheet_totals(group, prices)\n",
" balances = margin_account.get_intrinsic_balances(group)\n",
" mam = MarginAccountMetadata(margin_account, balance_sheet, balances)\n",
"\n",
" balances_before = group.fetch_balances(self.wallet.address)\n",
" self.logger.info(\"Wallet balances before:\")\n",
" TokenValue.report(self.logger.info, balances_before)\n",
"\n",
" self.logger.info(f\"Margin account balances before:\\n{mam.balances}\")\n",
" self.logger.info(f\"Liquidating margin account: {mam.margin_account}\\n{mam.balance_sheet}\")\n",
" transaction_id = self.inner.liquidate(group, mam.margin_account, prices)\n",
" if transaction_id is None:\n",
" self.logger.info(\"No transaction sent.\")\n",
" else:\n",
" self.logger.info(f\"Transaction ID: {transaction_id} - waiting for confirmation...\")\n",
"\n",
" self.context.wait_for_confirmation(transaction_id)\n",
"\n",
" group_after = Group.load(self.context)\n",
" margin_account_after_liquidation = MarginAccount.load(self.context, mam.margin_account.address, group_after)\n",
" intrinsic_balances_after = margin_account_after_liquidation.get_intrinsic_balances(group_after)\n",
" self.logger.info(f\"Margin account balances after: {intrinsic_balances_after}\")\n",
"\n",
" self.logger.info(\"Wallet Balances After:\")\n",
" balances_after = group_after.fetch_balances(self.wallet.address)\n",
" TokenValue.report(self.logger.info, balances_after)\n",
"\n",
" liquidation_event = LiquidationEvent(datetime.datetime.now(),\n",
" transaction_id,\n",
" self.wallet.address,\n",
" margin_account_after_liquidation.address,\n",
" balances_before,\n",
" balances_after)\n",
"\n",
" self.logger.info(\"Wallet Balances Changes:\")\n",
" changes = TokenValue.changes(balances_before, balances_after)\n",
" TokenValue.report(self.logger.info, changes)\n",
"\n",
" self.liquidations_publisher.publish(liquidation_event)\n",
"\n",
" return transaction_id\n"
]
},
{
"cell_type": "markdown",
"id": "seven-merchandise",
"metadata": {},
"source": [
"# 🏃 Running"
@ -215,7 +297,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "virgin-idaho",
"id": "expected-transcript",
"metadata": {},
"outputs": [],
"source": [

View File

@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "markdown",
"id": "golden-restriction",
"id": "dedicated-dispatch",
"metadata": {},
"source": [
"# ⚠ Warning\n",
@ -16,7 +16,7 @@
},
{
"cell_type": "markdown",
"id": "comic-harvard",
"id": "excessive-jacket",
"metadata": {},
"source": [
"# 🥭 BaseModel\n",
@ -33,7 +33,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "architectural-corps",
"id": "raised-printer",
"metadata": {
"jupyter": {
"source_hidden": true
@ -45,6 +45,7 @@
"import datetime\n",
"import enum\n",
"import logging\n",
"import time\n",
"import typing\n",
"\n",
"import Layouts as layouts\n",
@ -66,7 +67,7 @@
},
{
"cell_type": "markdown",
"id": "developmental-proceeding",
"id": "dependent-economy",
"metadata": {},
"source": [
"## Version enum\n",
@ -77,7 +78,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "regular-enclosure",
"id": "piano-poison",
"metadata": {},
"outputs": [],
"source": [
@ -92,7 +93,7 @@
},
{
"cell_type": "markdown",
"id": "monthly-apollo",
"id": "dried-custom",
"metadata": {},
"source": [
"## InstructionType enum\n",
@ -103,7 +104,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "juvenile-northeast",
"id": "turkish-houston",
"metadata": {},
"outputs": [],
"source": [
@ -132,7 +133,7 @@
},
{
"cell_type": "markdown",
"id": "abandoned-certification",
"id": "spectacular-proceeding",
"metadata": {},
"source": [
"## AccountInfo class\n"
@ -141,7 +142,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "twenty-kernel",
"id": "unsigned-baltimore",
"metadata": {},
"outputs": [],
"source": [
@ -201,7 +202,7 @@
},
{
"cell_type": "markdown",
"id": "august-jonathan",
"id": "serial-script",
"metadata": {},
"source": [
"## AddressableAccount class\n",
@ -214,7 +215,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "accepted-stereo",
"id": "referenced-grove",
"metadata": {},
"outputs": [],
"source": [
@ -233,7 +234,7 @@
},
{
"cell_type": "markdown",
"id": "great-boston",
"id": "gentle-benefit",
"metadata": {},
"source": [
"## SerumAccountFlags class\n",
@ -244,7 +245,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "indoor-rugby",
"id": "presidential-massachusetts",
"metadata": {},
"outputs": [],
"source": [
@ -287,7 +288,7 @@
},
{
"cell_type": "markdown",
"id": "female-internship",
"id": "bibliographic-documentation",
"metadata": {},
"source": [
"## MangoAccountFlags class\n",
@ -298,7 +299,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "interested-samoa",
"id": "labeled-flood",
"metadata": {},
"outputs": [],
"source": [
@ -331,7 +332,7 @@
},
{
"cell_type": "markdown",
"id": "synthetic-architecture",
"id": "brown-contrary",
"metadata": {},
"source": [
"## Index class"
@ -340,7 +341,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "varying-suite",
"id": "obvious-ghost",
"metadata": {},
"outputs": [],
"source": [
@ -367,7 +368,7 @@
},
{
"cell_type": "markdown",
"id": "weighted-canvas",
"id": "nearby-kelly",
"metadata": {},
"source": [
"## AggregatorConfig class"
@ -376,7 +377,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "changing-sociology",
"id": "living-survey",
"metadata": {},
"outputs": [],
"source": [
@ -409,7 +410,7 @@
},
{
"cell_type": "markdown",
"id": "annual-reserve",
"id": "chinese-morris",
"metadata": {},
"source": [
"## Round class"
@ -418,7 +419,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "falling-constant",
"id": "based-boring",
"metadata": {},
"outputs": [],
"source": [
@ -443,7 +444,7 @@
},
{
"cell_type": "markdown",
"id": "informal-acoustic",
"id": "settled-absence",
"metadata": {},
"source": [
"## Answer class"
@ -452,7 +453,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "assigned-madagascar",
"id": "artistic-archives",
"metadata": {},
"outputs": [],
"source": [
@ -478,7 +479,7 @@
},
{
"cell_type": "markdown",
"id": "enormous-highlight",
"id": "behavioral-minority",
"metadata": {},
"source": [
"## Aggregator class"
@ -487,7 +488,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "verbal-devil",
"id": "boxed-library",
"metadata": {},
"outputs": [],
"source": [
@ -553,7 +554,7 @@
},
{
"cell_type": "markdown",
"id": "extended-rachel",
"id": "atomic-shooting",
"metadata": {},
"source": [
"## Token class\n",
@ -564,7 +565,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "following-startup",
"id": "boring-heritage",
"metadata": {},
"outputs": [],
"source": [
@ -619,7 +620,7 @@
},
{
"cell_type": "markdown",
"id": "coordinated-sunrise",
"id": "supreme-state",
"metadata": {},
"source": [
"## SolToken object\n",
@ -630,7 +631,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "unavailable-genealogy",
"id": "unavailable-knock",
"metadata": {},
"outputs": [],
"source": [
@ -639,7 +640,7 @@
},
{
"cell_type": "markdown",
"id": "pretty-encyclopedia",
"id": "discrete-bride",
"metadata": {},
"source": [
"## TokenLookup class\n",
@ -657,7 +658,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "hundred-penetration",
"id": "entire-gregory",
"metadata": {},
"outputs": [],
"source": [
@ -683,7 +684,7 @@
},
{
"cell_type": "markdown",
"id": "sensitive-closing",
"id": "charming-fiction",
"metadata": {},
"source": [
"## BasketToken class\n",
@ -694,7 +695,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "acquired-indonesia",
"id": "developing-cooper",
"metadata": {},
"outputs": [],
"source": [
@ -749,7 +750,7 @@
},
{
"cell_type": "markdown",
"id": "fallen-analyst",
"id": "mighty-portfolio",
"metadata": {},
"source": [
"## TokenValue class\n",
@ -760,7 +761,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "convenient-first",
"id": "sufficient-bidding",
"metadata": {},
"outputs": [],
"source": [
@ -845,7 +846,7 @@
},
{
"cell_type": "markdown",
"id": "brutal-testament",
"id": "steady-knitting",
"metadata": {},
"source": [
"## OwnedTokenValue class\n",
@ -856,7 +857,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "important-desktop",
"id": "engaged-camel",
"metadata": {},
"outputs": [],
"source": [
@ -896,7 +897,7 @@
},
{
"cell_type": "markdown",
"id": "broadband-george",
"id": "blond-edgar",
"metadata": {},
"source": [
"## MarketMetadata class"
@ -905,7 +906,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "complicated-times",
"id": "conscious-spoke",
"metadata": {},
"outputs": [],
"source": [
@ -941,7 +942,7 @@
},
{
"cell_type": "markdown",
"id": "circular-amber",
"id": "ranging-consideration",
"metadata": {},
"source": [
"## Group class"
@ -950,7 +951,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "fifty-shape",
"id": "minus-hazard",
"metadata": {},
"outputs": [],
"source": [
@ -1041,11 +1042,24 @@
" return -1\n",
"\n",
"    def fetch_token_prices(self) -> typing.List[TokenValue]:\n",
"        # Fetch the latest price for every market's oracle plus the quote\n",
"        # currency, returning one TokenValue per basket token.\n",
"        started_at = time.time()\n",
"\n",
"        # Note: we can just load the oracle data in a simpler way, with:\n",
"        #   oracles = map(lambda market: Aggregator.load(self.context, market.oracle), self.markets)\n",
"        # but that makes a network request for every oracle. We can reduce that to just one request\n",
"        # if we use AccountInfo.load_multiple() and parse the data ourselves.\n",
"        #\n",
"        # This seems to halve the time this function takes.\n",
"        oracle_addresses = [market.oracle for market in self.markets]\n",
"        oracle_account_infos = AccountInfo.load_multiple(self.context, oracle_addresses)\n",
"        parsed_oracles = [Aggregator.parse(self.context, account_info) for account_info in oracle_account_infos]\n",
"\n",
"        # The quote currency has no oracle; its price is always exactly 1.\n",
"        all_prices = [oracle.price for oracle in parsed_oracles] + [Decimal(1)]\n",
"        token_prices = [TokenValue(self.basket_tokens[index].token, price)\n",
"                        for index, price in enumerate(all_prices)]\n",
"\n",
"        time_taken = time.time() - started_at\n",
"        self.logger.info(f\"Faster fetching prices complete. Time taken: {time_taken:.2f} seconds.\")\n",
"        return token_prices\n",
"\n",
" def fetch_balances(self, root_address: PublicKey) -> typing.List[TokenValue]:\n",
@ -1088,7 +1102,7 @@
},
{
"cell_type": "markdown",
"id": "acute-testing",
"id": "fifty-maker",
"metadata": {},
"source": [
"## TokenAccount class"
@ -1097,7 +1111,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "covered-calibration",
"id": "fiscal-flush",
"metadata": {},
"outputs": [],
"source": [
@ -1181,7 +1195,7 @@
},
{
"cell_type": "markdown",
"id": "intellectual-comfort",
"id": "second-sentence",
"metadata": {},
"source": [
"## OpenOrders class\n"
@ -1190,7 +1204,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "delayed-arkansas",
"id": "confident-leadership",
"metadata": {},
"outputs": [],
"source": [
@ -1310,7 +1324,7 @@
},
{
"cell_type": "markdown",
"id": "disabled-ratio",
"id": "comparative-geography",
"metadata": {},
"source": [
"## BalanceSheet class"
@ -1319,7 +1333,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "guilty-storm",
"id": "southeast-rachel",
"metadata": {},
"outputs": [],
"source": [
@ -1366,7 +1380,7 @@
},
{
"cell_type": "markdown",
"id": "martial-walnut",
"id": "enabling-right",
"metadata": {},
"source": [
"## MarginAccount class\n"
@ -1375,7 +1389,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "narrow-sphere",
"id": "featured-nancy",
"metadata": {},
"outputs": [],
"source": [
@ -1445,6 +1459,15 @@
" return margin_accounts\n",
"\n",
"    @staticmethod\n",
"    def load_all_for_group_with_open_orders(context: Context, program_id: PublicKey, group: Group) -> typing.List[\"MarginAccount\"]:\n",
"        # Load every margin account for the group and attach its OpenOrders\n",
"        # accounts, using one bulk OpenOrders fetch for the whole group rather\n",
"        # than one fetch per margin account.\n",
"        #\n",
"        # Bug fix: the program_id parameter was previously ignored in favour of\n",
"        # context.program_id. Honour the explicit parameter instead - visible\n",
"        # callers all pass context.program_id, so this is behaviour-compatible\n",
"        # for them, but callers supplying a different program ID now get what\n",
"        # they asked for.\n",
"        margin_accounts = MarginAccount.load_all_for_group(context, program_id, group)\n",
"        open_orders = OpenOrders.load_raw_open_orders_account_infos(context, group)\n",
"        for margin_account in margin_accounts:\n",
"            margin_account.install_open_orders_accounts(group, open_orders)\n",
"\n",
"        return margin_accounts\n",
"\n",
" @staticmethod\n",
" def load_all_for_owner(context: Context, owner: PublicKey, group: typing.Optional[Group] = None) -> typing.List[\"MarginAccount\"]:\n",
" if group is None:\n",
" group = Group.load(context)\n",
@ -1472,6 +1495,33 @@
" margin_accounts += [margin_account]\n",
" return margin_accounts\n",
"\n",
" @classmethod\n",
" def load_all_ripe(cls, context: Context) -> typing.List[\"MarginAccount\"]:\n",
" logger: logging.Logger = logging.getLogger(cls.__name__)\n",
"\n",
" started_at = time.time()\n",
"\n",
" group = Group.load(context)\n",
" margin_accounts = MarginAccount.load_all_for_group_with_open_orders(context, context.program_id, group)\n",
" logger.info(f\"Fetched {len(margin_accounts)} margin accounts to process.\")\n",
"\n",
" prices = group.fetch_token_prices()\n",
" nonzero: typing.List[MarginAccountMetadata] = []\n",
" for margin_account in margin_accounts:\n",
" balance_sheet = margin_account.get_balance_sheet_totals(group, prices)\n",
" if balance_sheet.collateral_ratio > 0:\n",
" balances = margin_account.get_intrinsic_balances(group)\n",
" nonzero += [MarginAccountMetadata(margin_account, balance_sheet, balances)]\n",
" logger.info(f\"Of those {len(margin_accounts)}, {len(nonzero)} have a nonzero collateral ratio.\")\n",
"\n",
" ripe_metadata = filter(lambda mam: mam.balance_sheet.collateral_ratio <= group.init_coll_ratio, nonzero)\n",
" ripe_accounts = list(map(lambda mam: mam.margin_account, ripe_metadata))\n",
" logger.info(f\"Of those {len(nonzero)}, {len(ripe_accounts)} are ripe 🥭.\")\n",
"\n",
" time_taken = time.time() - started_at\n",
" logger.info(f\"Loading ripe 🥭 accounts complete. Time taken: {time_taken:.2f} seconds.\")\n",
" return ripe_accounts\n",
"\n",
" def load_open_orders_accounts(self, context: Context, group: Group) -> None:\n",
" for index, oo in enumerate(self.open_orders):\n",
" key = oo\n",
@ -1578,7 +1628,7 @@
},
{
"cell_type": "markdown",
"id": "spiritual-hello",
"id": "portuguese-bearing",
"metadata": {},
"source": [
"## MarginAccountMetadata class"
@ -1587,7 +1637,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "sonic-jackson",
"id": "breathing-emission",
"metadata": {},
"outputs": [],
"source": [
@ -1613,7 +1663,7 @@
},
{
"cell_type": "markdown",
"id": "committed-contrast",
"id": "virtual-patient",
"metadata": {},
"source": [
"# Events"
@ -1621,7 +1671,7 @@
},
{
"cell_type": "markdown",
"id": "special-pasta",
"id": "backed-facial",
"metadata": {},
"source": [
"## LiquidationEvent"
@ -1630,7 +1680,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "painted-stylus",
"id": "recognized-swiss",
"metadata": {},
"outputs": [],
"source": [
@ -1660,7 +1710,7 @@
},
{
"cell_type": "markdown",
"id": "entire-laptop",
"id": "civil-myanmar",
"metadata": {},
"source": [
"# ✅ Testing"
@ -1669,7 +1719,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "numerous-polls",
"id": "overhead-tongue",
"metadata": {},
"outputs": [],
"source": [
@ -1713,7 +1763,7 @@
},
{
"cell_type": "markdown",
"id": "monthly-research",
"id": "coordinate-injection",
"metadata": {},
"source": [
"# 🏃 Running"
@ -1722,7 +1772,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "quality-marketing",
"id": "equivalent-scroll",
"metadata": {},
"outputs": [],
"source": [

272
LiquidationProcessor.ipynb Normal file
View File

@ -0,0 +1,272 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "opposite-territory",
"metadata": {},
"source": [
"# ⚠ Warning\n",
"\n",
"THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\n",
"\n",
"[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gl/OpinionatedGeek%2Fmango-explorer/HEAD?filepath=LiquidationProcessor.ipynb) _🏃 To run this notebook press the ⏩ icon in the toolbar above._\n",
"\n",
"[🥭 Mango Markets](https://mango.markets/) support is available at: [Docs](https://docs.mango.markets/) | [Discord](https://discord.gg/67jySBhxrg) | [Twitter](https://twitter.com/mangomarkets) | [Github](https://github.com/blockworks-foundation) | [Email](mailto:hello@blockworks.foundation)"
]
},
{
"cell_type": "markdown",
"id": "thorough-portland",
"metadata": {},
"source": [
"# 🥭 Liquidation Processor\n",
"\n",
"This notebook contains a liquidator that makes heavy use of [RX Observables](https://rxpy.readthedocs.io/en/latest/reference_observable.html).\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dependent-metro",
"metadata": {
"jupyter": {
"source_hidden": true
}
},
"outputs": [],
"source": [
"import logging\n",
"import rx\n",
"import rx.operators as ops\n",
"import time\n",
"import typing\n",
"\n",
"from AccountLiquidator import AccountLiquidator\n",
"from BaseModel import Group, LiquidationEvent, MarginAccount, MarginAccountMetadata, TokenValue\n",
"from Context import Context\n",
"from Observables import EventSource\n",
"from WalletBalancer import NullWalletBalancer, WalletBalancer\n"
]
},
{
"cell_type": "markdown",
"id": "ecological-salem",
"metadata": {},
"source": [
"# 💧 LiquidationProcessor class\n",
"\n",
"An `AccountLiquidator` liquidates a `MarginAccount`. A `LiquidationProcessor` processes a list of `MarginAccount`s, determines if they're liquidatable, and calls an `AccountLiquidator` to do the work."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "published-murder",
"metadata": {},
"outputs": [],
"source": [
"class LiquidationProcessor:\n",
"    # On every price update, re-values the current set of 'ripe' margin\n",
"    # accounts, filters them to those that are liquidatable, above water and\n",
"    # worthwhile, and passes each of those to the account_liquidator.\n",
"    def __init__(self, context: Context, account_liquidator: AccountLiquidator, wallet_balancer: WalletBalancer, worthwhile_threshold: float = 0.01):\n",
"        self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n",
"        self.context: Context = context\n",
"        self.account_liquidator: AccountLiquidator = account_liquidator\n",
"        self.wallet_balancer: WalletBalancer = wallet_balancer\n",
"        self.worthwhile_threshold: float = worthwhile_threshold\n",
"        self.liquidations: EventSource[LiquidationEvent] = EventSource[LiquidationEvent]()\n",
"        self.ripe_accounts: typing.Optional[typing.List[MarginAccount]] = None\n",
"\n",
"    def update_margin_accounts(self, ripe_margin_accounts: typing.List[MarginAccount]):\n",
"        # Replace the working set of ripe accounts with a fresh snapshot.\n",
"        self.logger.info(f\"Received {len(ripe_margin_accounts)} ripe 🥭 margin accounts to process.\")\n",
"        self.ripe_accounts = ripe_margin_accounts\n",
"\n",
"    def update_prices(self, prices):\n",
"        # Re-evaluate every ripe account against the latest prices, then\n",
"        # liquidate the ones that qualify. No-op until the first margin-account\n",
"        # snapshot has arrived.\n",
"        started_at = time.time()\n",
"\n",
"        if self.ripe_accounts is None:\n",
"            self.logger.info(\"Ripe accounts is None - skipping\")\n",
"            return\n",
"\n",
"        self.logger.info(f\"Running on {len(self.ripe_accounts)} ripe accounts.\")\n",
"        group = Group.load(self.context)\n",
"        updated: typing.List[MarginAccountMetadata] = []\n",
"        for margin_account in self.ripe_accounts:\n",
"            balance_sheet = margin_account.get_balance_sheet_totals(group, prices)\n",
"            balances = margin_account.get_intrinsic_balances(group)\n",
"            updated += [MarginAccountMetadata(margin_account, balance_sheet, balances)]\n",
"\n",
"        liquidatable = list(filter(lambda mam: mam.balance_sheet.collateral_ratio <= group.maint_coll_ratio, updated))\n",
"        self.logger.info(f\"Of those {len(updated)}, {len(liquidatable)} are liquidatable.\")\n",
"\n",
"        # NOTE(review): this uses mam.collateral_ratio while the filter above uses\n",
"        # mam.balance_sheet.collateral_ratio - assumes MarginAccountMetadata proxies\n",
"        # collateral_ratio (and assets/liabilities below) to its balance sheet; confirm.\n",
"        above_water = list(filter(lambda mam: mam.collateral_ratio > 1, liquidatable))\n",
"        self.logger.info(f\"Of those {len(liquidatable)} liquidatable margin accounts, {len(above_water)} are 'above water' margin accounts with assets greater than their liabilities.\")\n",
"\n",
"        worthwhile = list(filter(lambda mam: mam.assets - mam.liabilities > self.worthwhile_threshold, above_water))\n",
"        self.logger.info(f\"Of those {len(above_water)} above water margin accounts, {len(worthwhile)} are worthwhile margin accounts with more than ${self.worthwhile_threshold} net assets.\")\n",
"\n",
"        self._liquidate_all(group, prices, worthwhile)\n",
"\n",
"        time_taken = time.time() - started_at\n",
"        self.logger.info(f\"Check of all ripe 🥭 accounts complete. Time taken: {time_taken:.2f} seconds.\")\n",
"\n",
"    def _liquidate_all(self, group: Group, prices: typing.List[TokenValue], to_liquidate: typing.List[MarginAccountMetadata]):\n",
"        # Liquidate accounts most-valuable-first. After each liquidation the\n",
"        # account is reloaded; if it is still worthwhile it goes back on the\n",
"        # list for another pass, otherwise it is dropped.\n",
"        to_process = to_liquidate\n",
"        while len(to_process) > 0:\n",
"            highest_first = sorted(to_process, key=lambda mam: mam.assets - mam.liabilities, reverse=True)\n",
"            highest = highest_first[0]\n",
"            try:\n",
"                self.account_liquidator.liquidate(group, highest.margin_account, prices)\n",
"                self.wallet_balancer.balance(prices)\n",
"\n",
"                updated_margin_account = MarginAccount.load(self.context, highest.margin_account.address, group)\n",
"                balance_sheet = updated_margin_account.get_balance_sheet_totals(group, prices)\n",
"                balances = updated_margin_account.get_intrinsic_balances(group)\n",
"                updated_mam = MarginAccountMetadata(updated_margin_account, balance_sheet, balances)\n",
"                # Bug fix: these branches were inverted. An account whose net value\n",
"                # still exceeds worthwhile_threshold is the one that is still\n",
"                # worthwhile and should be re-queued; a drained account is dropped.\n",
"                if updated_mam.assets - updated_mam.liabilities > self.worthwhile_threshold:\n",
"                    self.logger.info(f\"Margin account {updated_margin_account.address} is still worthwhile - putting it back on list.\")\n",
"                    to_process += [updated_mam]\n",
"                else:\n",
"                    self.logger.info(f\"Margin account {updated_margin_account.address} has been drained and is no longer worthwhile.\")\n",
"            except Exception as exception:\n",
"                self.logger.error(f\"Failed to liquidate account '{highest.margin_account.address}' - {exception}\")\n",
"            finally:\n",
"                # highest should always be in to_process, but we're outside the try-except block\n",
"                # so let's be a little paranoid about it.\n",
"                if highest in to_process:\n",
"                    to_process.remove(highest)\n"
]
},
{
"cell_type": "markdown",
"id": "polar-killer",
"metadata": {},
"source": [
"# 🏃 Running\n",
"\n",
"A quick example to show how to plug observables into the `LiquidationProcessor`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "rough-sending",
"metadata": {},
"outputs": [],
"source": [
"if __name__ == \"__main__\":\n",
" from AccountLiquidator import NullAccountLiquidator\n",
" from Context import default_context\n",
" from Observables import create_backpressure_skipping_observer, log_subscription_error\n",
" from Wallet import default_wallet\n",
"\n",
" from rx.scheduler import ThreadPoolScheduler\n",
"\n",
" if default_wallet is None:\n",
" raise Exception(\"No wallet\")\n",
"\n",
" pool_scheduler = ThreadPoolScheduler(2)\n",
"\n",
" def fetch_prices(context):\n",
" group = Group.load(context)\n",
"\n",
" def _fetch_prices(_):\n",
" return group.fetch_token_prices()\n",
"\n",
" return _fetch_prices\n",
"\n",
" def fetch_margin_accounts(context):\n",
" def _fetch_margin_accounts(_):\n",
" group = Group.load(context)\n",
" return MarginAccount.load_all_for_group_with_open_orders(context, context.program_id, group)\n",
" return _fetch_margin_accounts\n",
"\n",
" liquidation_processor = LiquidationProcessor(default_context, NullAccountLiquidator(), NullWalletBalancer())\n",
"\n",
" print(\"Starting margin account fetcher subscription\")\n",
" margin_account_interval = 60\n",
" margin_account_subscription = rx.interval(margin_account_interval).pipe(\n",
" ops.subscribe_on(pool_scheduler),\n",
" ops.start_with(-1),\n",
" ops.map(fetch_margin_accounts(default_context)),\n",
" ).subscribe(create_backpressure_skipping_observer(on_next=liquidation_processor.update_margin_accounts, on_error=log_subscription_error))\n",
"\n",
" print(\"Starting price fetcher subscription\")\n",
" price_interval = 2\n",
" price_subscription = rx.interval(price_interval).pipe(\n",
" ops.subscribe_on(pool_scheduler),\n",
" ops.map(fetch_prices(default_context))\n",
" ).subscribe(create_backpressure_skipping_observer(on_next=liquidation_processor.update_prices, on_error=log_subscription_error))\n",
"\n",
" print(\"Subscriptions created - now just running\")\n",
"\n",
" time.sleep(120)\n",
" print(\"Disposing\")\n",
" price_subscription.dispose()\n",
" margin_account_subscription.dispose()\n",
" print(\"Disposed\")\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.6"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": false,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": true
},
"varInspector": {
"cols": {
"lenName": 16,
"lenType": 16,
"lenVar": 40
},
"kernels_config": {
"python": {
"delete_cmd_postfix": "",
"delete_cmd_prefix": "del ",
"library": "var_list.py",
"varRefreshCmd": "print(var_dic_list())"
},
"r": {
"delete_cmd_postfix": ") ",
"delete_cmd_prefix": "rm(",
"library": "var_list.r",
"varRefreshCmd": "cat(var_dic_list()) "
}
},
"types_to_exclude": [
"module",
"function",
"builtin_function_or_method",
"instance",
"_Feature"
],
"window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "markdown",
"id": "gentle-dealing",
"id": "ordered-award",
"metadata": {},
"source": [
"# ⚠ Warning\n",
@ -16,7 +16,7 @@
},
{
"cell_type": "markdown",
"id": "brazilian-christian",
"id": "decreased-fighter",
"metadata": {},
"source": [
"# 🥭 Observables\n",
@ -27,23 +27,22 @@
{
"cell_type": "code",
"execution_count": null,
"id": "subsequent-experience",
"metadata": {
"jupyter": {
"source_hidden": true
}
},
"id": "chicken-definition",
"metadata": {},
"outputs": [],
"source": [
"import datetime\n",
"import logging\n",
"import rx\n",
"import rx.operators as ops\n",
"import typing\n"
"import typing\n",
"\n",
"from rxpy_backpressure import BackPressure\n"
]
},
{
"cell_type": "markdown",
"id": "scientific-diagram",
"id": "japanese-skiing",
"metadata": {},
"source": [
"# PrintingObserverSubscriber class\n",
@ -54,12 +53,13 @@
{
"cell_type": "code",
"execution_count": null,
"id": "ancient-exemption",
"id": "weird-powell",
"metadata": {},
"outputs": [],
"source": [
"class PrintingObserverSubscriber(rx.core.typing.Observer):\n",
" def __init__(self, report_no_output: bool) -> None:\n",
" super().__init__()\n",
" self.report_no_output = report_no_output\n",
"\n",
" def on_next(self, item: typing.Any) -> None:\n",
@ -77,7 +77,32 @@
},
{
"cell_type": "markdown",
"id": "opponent-assembly",
"id": "interesting-dream",
"metadata": {},
"source": [
"# TimestampedPrintingObserverSubscriber class\n",
"\n",
"Just like `PrintingObserverSubscriber` but it puts a timestamp on each printout."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "broad-apple",
"metadata": {},
"outputs": [],
"source": [
"class TimestampedPrintingObserverSubscriber(PrintingObserverSubscriber):\n",
" \"\"\"Just like `PrintingObserverSubscriber` but prefixes each printed item with the current timestamp.\"\"\"\n",
"\n",
" def __init__(self, report_no_output: bool) -> None:\n",
" super().__init__(report_no_output)\n",
"\n",
" def on_next(self, item: typing.Any) -> None:\n",
" # Render as '<timestamp>: <item>' and delegate the actual printing to the base class.\n",
" super().on_next(f\"{datetime.datetime.now()}: {item}\")\n"
]
},
{
"cell_type": "markdown",
"id": "received-present",
"metadata": {},
"source": [
"# CollectingObserverSubscriber class\n",
@ -88,7 +113,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "vocational-adelaide",
"id": "basic-latin",
"metadata": {},
"outputs": [],
"source": [
@ -109,7 +134,7 @@
},
{
"cell_type": "markdown",
"id": "quantitative-exercise",
"id": "promotional-blair",
"metadata": {},
"source": [
"# CaptureFirstItem class\n",
@ -120,7 +145,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "sitting-heather",
"id": "figured-dress",
"metadata": {},
"outputs": [],
"source": [
@ -139,7 +164,69 @@
},
{
"cell_type": "markdown",
"id": "protected-pointer",
"id": "three-reasoning",
"metadata": {},
"source": [
"# FunctionObserver\n",
"\n",
"This class takes functions for `on_next()`, `on_error()` and `on_completed()` and returns an `Observer` object.\n",
"\n",
"This is mostly for libraries (like `rxpy_backpressure`) that take observers but not their component functions."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "olive-audit",
"metadata": {},
"outputs": [],
"source": [
"class FunctionObserver(rx.core.typing.Observer):\n",
" \"\"\"Adapts plain `on_next`/`on_error`/`on_completed` callables into an `Observer`.\n",
"\n",
" This is mostly for libraries (like `rxpy_backpressure`) that take observer objects\n",
" rather than individual callback functions. `on_error` and `on_completed` default to\n",
" no-op lambdas, so only `on_next` is required.\"\"\"\n",
"\n",
" def __init__(self,\n",
" on_next: typing.Callable[[typing.Any], None],\n",
" on_error: typing.Callable[[Exception], None] = lambda _: None,\n",
" on_completed: typing.Callable[[], None] = lambda: None):\n",
" # NOTE(review): unlike PrintingObserverSubscriber, this does not call\n",
" # super().__init__() - confirm that is intentional for rx.core.typing.Observer.\n",
" self._on_next = on_next\n",
" self._on_error = on_error\n",
" self._on_completed = on_completed\n",
"\n",
" def on_next(self, value: typing.Any) -> None:\n",
" self._on_next(value)\n",
"\n",
" def on_error(self, error: Exception) -> None:\n",
" self._on_error(error)\n",
"\n",
" def on_completed(self) -> None:\n",
" self._on_completed()\n"
]
},
{
"cell_type": "markdown",
"id": "concerned-participation",
"metadata": {},
"source": [
"# create_backpressure_skipping_observer function\n",
"\n",
"Creates an `Observer` that skips inputs if they are building up while a subscriber works.\n",
"\n",
"This is useful for situations that, say, poll every second but where the operation can sometimes take multiple seconds to complete. In that case, the latest item is emitted immediately and the in-between items are skipped."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "promising-substance",
"metadata": {},
"outputs": [],
"source": [
"def create_backpressure_skipping_observer(on_next: typing.Callable[[typing.Any], None], on_error: typing.Callable[[Exception], None] = lambda _: None, on_completed: typing.Callable[[], None] = lambda: None) -> rx.core.typing.Observer:\n",
"    \"\"\"Build an `Observer` that applies 'latest' backpressure to the given callables.\n",
"\n",
"    If items arrive faster than the subscriber can process them, only the most\n",
"    recent item is delivered and the in-between items are skipped.\"\"\"\n",
"    return BackPressure.LATEST(FunctionObserver(on_next=on_next, on_error=on_error, on_completed=on_completed))\n"
]
},
{
"cell_type": "markdown",
"id": "boring-lemon",
"metadata": {},
"source": [
"# debug_print_item function\n",
@ -159,7 +246,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "entertaining-depth",
"id": "saved-reputation",
"metadata": {},
"outputs": [],
"source": [
@ -167,12 +254,77 @@
" def _debug_print_item(item: typing.Any) -> typing.Any:\n",
" print(title, item)\n",
" return item\n",
" return _debug_print_item\n"
" return ops.map(_debug_print_item)\n"
]
},
{
"cell_type": "markdown",
"id": "bulgarian-relative",
"id": "therapeutic-holder",
"metadata": {},
"source": [
"# log_subscription_error function\n",
"\n",
"Logs subscription exceptions to the root logger.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "serious-eugene",
"metadata": {},
"outputs": [],
"source": [
"def log_subscription_error(error: Exception) -> None:\n",
"    \"\"\"Log an Observable subscription error to the root logger.\"\"\"\n",
"    message = f\"Observable subscription error: {error}\"\n",
"    logging.error(message)\n"
]
},
{
"cell_type": "markdown",
"id": "civil-chester",
"metadata": {},
"source": [
"# observable_pipeline_error_reporter function\n",
"\n",
"This intercepts and re-raises an exception, to help report on errors.\n",
"\n",
"RxPy pipelines are tricky to restart, so it's often easier to use the `ops.retry()` function in the pipeline. That just swallows the error though, so there's no way to know what was raised to cause the retry.\n",
"\n",
"Enter `observable_pipeline_error_reporter()`! Put it in a `catch` just before the `retry` and it should log the error properly.\n",
"\n",
"For example:\n",
"```\n",
"from rx import of, operators as ops\n",
"\n",
"def raise_on_every_third(item):\n",
" if (item % 3 == 0):\n",
" raise Exception(\"Divisible by 3\")\n",
" else:\n",
" return item\n",
"\n",
"sub1 = of(1, 2, 3, 4, 5, 6).pipe(\n",
" ops.map(lambda e : raise_on_every_third(e)),\n",
" ops.catch(observable_pipeline_error_reporter),\n",
" ops.retry(3)\n",
")\n",
"sub1.subscribe(lambda item: print(item), on_error = lambda error: print(f\"Error : {error}\"))\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "about-scheme",
"metadata": {},
"outputs": [],
"source": [
"def observable_pipeline_error_reporter(ex, _):\n",
"    \"\"\"Log an exception intercepted in an observable pipeline, then re-raise it.\n",
"\n",
"    Intended for use in `ops.catch(...)` immediately before `ops.retry(...)`, so the\n",
"    cause of each retry is recorded instead of being silently swallowed.\"\"\"\n",
"    message = f\"Intercepted error in observable pipeline: {ex}\"\n",
"    logging.error(message)\n",
"    raise ex\n"
]
},
{
"cell_type": "markdown",
"id": "qualified-basis",
"metadata": {},
"source": [
"# Events\n",
@ -183,7 +335,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "institutional-ivory",
"id": "fabulous-quarter",
"metadata": {},
"outputs": [],
"source": [
@ -195,8 +347,8 @@
" super().__init__()\n",
" self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n",
"\n",
" def on_next(self, value: typing.Type[TEventDatum]) -> None:\n",
" super().on_next(value)\n",
" def on_next(self, event: TEventDatum) -> None:\n",
" super().on_next(event)\n",
"\n",
" def on_error(self, ex: Exception) -> None:\n",
" super().on_error(ex)\n",
@ -204,8 +356,11 @@
" def on_completed(self) -> None:\n",
" super().on_completed()\n",
"\n",
" def publish(self, value: typing.Type[TEventDatum]) -> None:\n",
" self.on_next(value)\n",
" def publish(self, event: TEventDatum) -> None:\n",
" try:\n",
" self.on_next(event)\n",
" except Exception as exception:\n",
" self.logger.warning(f\"Failed to publish event '{event}' - {exception}\")\n",
"\n",
" def dispose(self) -> None:\n",
" super().dispose()\n"
@ -213,7 +368,7 @@
},
{
"cell_type": "markdown",
"id": "intermediate-catalog",
"id": "frequent-edition",
"metadata": {},
"source": [
"# 🏃 Running\n",
@ -224,7 +379,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "quiet-visibility",
"id": "extreme-crime",
"metadata": {},
"outputs": [],
"source": [

View File

@ -1,244 +0,0 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "behavioral-thanksgiving",
"metadata": {},
"source": [
"# ⚠ Warning\n",
"\n",
"THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\n",
"\n",
"[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gl/OpinionatedGeek%2Fmango-explorer/HEAD?filepath=PollingLiquidator.ipynb) _🏃 To run this notebook press the ⏩ icon in the toolbar above._\n",
"\n",
"[🥭 Mango Markets](https://mango.markets/) support is available at: [Docs](https://docs.mango.markets/) | [Discord](https://discord.gg/67jySBhxrg) | [Twitter](https://twitter.com/mangomarkets) | [Github](https://github.com/blockworks-foundation) | [Email](mailto:hello@blockworks.foundation)"
]
},
{
"cell_type": "markdown",
"id": "supreme-harvest",
"metadata": {},
"source": [
"# 🥭 PollingLiquidator\n",
"\n",
"This notebook implements a simple polling approach to a liquidator. It loops, processes all possible liquidations, then sleeps."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "simplified-marking",
"metadata": {
"jupyter": {
"source_hidden": true
}
},
"outputs": [],
"source": [
"import datetime\n",
"import logging\n",
"import time\n",
"import typing\n",
"\n",
"from AccountLiquidator import AccountLiquidator\n",
"from BaseModel import Group, LiquidationEvent, MarginAccount, MarginAccountMetadata, OpenOrders, TokenValue\n",
"from Context import Context\n",
"from Observables import EventSource\n",
"from Wallet import Wallet\n",
"from WalletBalancer import WalletBalancer\n"
]
},
{
"cell_type": "markdown",
"id": "seeing-measurement",
"metadata": {},
"source": [
"# PollingLiquidator class\n",
"\n",
"In [Liquidation](Liquidation.ipynb) it says these are probably roughly the steps to run a liquidator:\n",
"\n",
"1. Find all liquidatable margin accounts.\n",
"2. Pick the most appropriate of these margin accounts, based on that account's collateralisation and the liquidator's token balances.\n",
"3. Pick the market with the most value in the margin account's openorders accounts.\n",
"4. Force cancellation of all outstanding orders for the margin account in that market.\n",
"5. Pick the market with the highest borrows and lowest deposits for the account being liquidated.\n",
"6. Build and send the PartialLiquidate instruction.\n",
"7. Convert the received tokens to your desired tokens.\n",
"8. Repeat from step 2 (if necessary) with fresh tokens.\n",
"\n",
"The `PollingLiquidator` class performs steps 1, 2, 6, and 8. Steps 3, 4, and 5 are handled implicitly by the `AccountLiquidator` (in our case the `ForceCancelOrdersAccountLiquidator`). Step 7 is handled by the `Balancer`.\n",
"\n",
"That's not quite the whole story though.\n",
"\n",
"We want to be quick to react to potential liquidation events, so as an optimisation this class:\n",
"* Pulls down all margin accounts\n",
"* Selects all those accounts which have borrows and currently have a collateral ratio less than the Group's 'initial collateral ratio' - ripe 🥭 mangoes\n",
"* Polls for Group and price changes, and processes possible liquidations on those ripe 🥭 accounts.\n",
"\n",
"The reason for this is: pulling down and processing all margin accounts is processor and bandwidth intensive and takes a long time. The ripe 🥭 accounts are also less likely to change - liquidation events are triggered by price changes, not trades.\n",
"\n",
"Processing only the ripe 🥭 accounts in a tighter loop makes it more likely we'll come across a liquidation early enough to be able to liquidate it.\n",
"\n",
"This does also increase the potential of liquidating non-liquidatable accounts. For example, if we use cached data on a margin account's borrows and the user manually reduces those borrows, the `PollingLiquidator` could still try to liquidate it based on the outdated borrows. This may lead to an increase in `MangoErrorCode::NotLiquidatable` errors in the logs, but these typically aren't a problem."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "short-execution",
"metadata": {},
"outputs": [],
"source": [
"class PollingLiquidator:\n",
" \"\"\"Polls Mango margin accounts and liquidates worthwhile candidates.\n",
"\n",
" One call to `run()` fetches all margin accounts for the group once, narrows them\n",
" to the 'ripe' 🥭 subset (collateral ratio at or below the group's initial collateral\n",
" ratio), then loops `ripe_iterations` times re-fetching prices and attempting\n",
" liquidations on that subset. Successful liquidations are published on the\n",
" `liquidations` EventSource.\"\"\"\n",
"\n",
" def __init__(self, context: Context, wallet: Wallet, account_liquidator: AccountLiquidator, wallet_balancer: WalletBalancer, worthwhile_threshold: float = 0.01, throttle_to_seconds: int = 5, ripe_iterations: int = 10):\n",
" self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n",
" self.context: Context = context\n",
" self.wallet: Wallet = wallet\n",
" self.account_liquidator: AccountLiquidator = account_liquidator\n",
" self.wallet_balancer: WalletBalancer = wallet_balancer\n",
" self.worthwhile_threshold: float = worthwhile_threshold\n",
" self.throttle_to_seconds: int = throttle_to_seconds\n",
" self.ripe_iterations: int = ripe_iterations\n",
" # Subscribers receive a LiquidationEvent for each liquidation this class performs.\n",
" self.liquidations: EventSource[LiquidationEvent] = EventSource[LiquidationEvent]()\n",
"\n",
" def run(self):\n",
" \"\"\"Fetch all margin accounts, derive the ripe 🥭 subset, then repeatedly re-price\n",
" and liquidate candidates, throttling each iteration to `throttle_to_seconds`.\"\"\"\n",
" self.logger.info(\"Fetching all margin accounts...\")\n",
" group = Group.load(self.context)\n",
" prices = group.fetch_token_prices()\n",
" margin_accounts = MarginAccount.load_all_for_group(self.context, self.context.program_id, group)\n",
" open_orders = OpenOrders.load_raw_open_orders_account_infos(self.context, group)\n",
" for margin_account in margin_accounts:\n",
" margin_account.install_open_orders_accounts(group, open_orders)\n",
" self.logger.info(f\"Fetched {len(margin_accounts)} margin accounts to process.\")\n",
" # Keep only accounts with a nonzero collateral ratio.\n",
" nonzero: typing.List[MarginAccountMetadata] = []\n",
" for margin_account in margin_accounts:\n",
" balance_sheet = margin_account.get_balance_sheet_totals(group, prices)\n",
" if balance_sheet.collateral_ratio > 0:\n",
" balances = margin_account.get_intrinsic_balances(group)\n",
" nonzero += [MarginAccountMetadata(margin_account, balance_sheet, balances)]\n",
" self.logger.info(f\"Of those {len(margin_accounts)}, {len(nonzero)} have a nonzero collateral ratio.\")\n",
"\n",
" # 'Ripe' 🥭 accounts: collateral ratio at or below the group's initial collateral ratio.\n",
" ripe_metadata = filter(lambda mam: mam.balance_sheet.collateral_ratio <= group.init_coll_ratio, nonzero)\n",
" ripe = list(map(lambda mam: mam.margin_account, ripe_metadata))\n",
" self.logger.info(f\"Of those {len(nonzero)}, {len(ripe)} are ripe 🥭.\")\n",
"\n",
" # Tighter polling loop: only the ripe accounts are re-priced each iteration.\n",
" for counter in range(self.ripe_iterations):\n",
" self.logger.info(f\"Update {counter} of {self.ripe_iterations} - {len(ripe)} ripe 🥭 accounts.\")\n",
" started_at = time.time()\n",
"\n",
" group = Group.load(self.context)\n",
" prices = group.fetch_token_prices()\n",
" updated: typing.List[MarginAccountMetadata] = []\n",
" for margin_account in ripe:\n",
" balance_sheet = margin_account.get_balance_sheet_totals(group, prices)\n",
" balances = margin_account.get_intrinsic_balances(group)\n",
" updated += [MarginAccountMetadata(margin_account, balance_sheet, balances)]\n",
"\n",
" # Successive filters: liquidatable -> above water -> worthwhile.\n",
" liquidatable = list(filter(lambda mam: mam.balance_sheet.collateral_ratio <= group.maint_coll_ratio, updated))\n",
" self.logger.info(f\"Of those {len(updated)}, {len(liquidatable)} are liquidatable.\")\n",
"\n",
" # NOTE(review): other filters use mam.balance_sheet.collateral_ratio - presumably\n",
" # MarginAccountMetadata proxies collateral_ratio; confirm the two agree.\n",
" above_water = list(filter(lambda mam: mam.collateral_ratio > 1, liquidatable))\n",
" self.logger.info(f\"Of those {len(liquidatable)} liquidatable margin accounts, {len(above_water)} are 'above water' margin accounts with assets greater than their liabilities.\")\n",
"\n",
" worthwhile = list(filter(lambda mam: mam.assets - mam.liabilities > self.worthwhile_threshold, above_water))\n",
" self.logger.info(f\"Of those {len(above_water)} above water margin accounts, {len(worthwhile)} are worthwhile margin accounts with more than ${self.worthwhile_threshold} net assets.\")\n",
"\n",
" # Liquidate the most valuable candidates first.\n",
" highest_first = sorted(worthwhile, key=lambda mam: mam.assets - mam.liabilities, reverse=True)\n",
" for mam in highest_first:\n",
" balances_before = group.fetch_balances(self.wallet.address)\n",
" self.logger.info(\"Wallet balances before:\")\n",
" TokenValue.report(self.logger.info, balances_before)\n",
"\n",
" self.logger.info(f\"Margin account balances before:\\n{mam.balances}\")\n",
" self.logger.info(f\"Liquidating margin account: {mam.margin_account}\\n{mam.balance_sheet}\")\n",
" transaction_id = self.account_liquidator.liquidate(group, mam.margin_account, prices)\n",
" if transaction_id is None:\n",
" self.logger.info(\"No transaction sent.\")\n",
" else:\n",
" self.logger.info(f\"Transaction ID: {transaction_id} - waiting for confirmation...\")\n",
"\n",
" self.context.wait_for_confirmation(transaction_id)\n",
"\n",
" # Reload group and margin account to report post-liquidation state.\n",
" group_after = Group.load(self.context)\n",
" margin_account_after_liquidation = MarginAccount.load(self.context, mam.margin_account.address, group_after)\n",
" intrinsic_balances_after = margin_account_after_liquidation.get_intrinsic_balances(group_after)\n",
" self.logger.info(f\"Margin account balances after: {intrinsic_balances_after}\")\n",
"\n",
" self.logger.info(\"Wallet Balances After:\")\n",
" balances_after = group_after.fetch_balances(self.wallet.address)\n",
" TokenValue.report(self.logger.info, balances_after)\n",
"\n",
" liquidation_event = LiquidationEvent(datetime.datetime.now(),\n",
" transaction_id,\n",
" self.wallet.address,\n",
" margin_account_after_liquidation.address,\n",
" balances_before,\n",
" balances_after)\n",
"\n",
" self.logger.info(\"Wallet Balances Changes:\")\n",
" changes = TokenValue.changes(balances_before, balances_after)\n",
" TokenValue.report(self.logger.info, changes)\n",
"\n",
" self.liquidations.publish(liquidation_event)\n",
" self.wallet_balancer.balance(prices)\n",
"\n",
" # Throttle: sleep for whatever remains of throttle_to_seconds this iteration.\n",
" time_taken = time.time() - started_at\n",
" should_sleep_for = self.throttle_to_seconds - int(time_taken)\n",
" sleep_for = max(should_sleep_for, 0)\n",
" self.logger.info(f\"Check of all ripe 🥭 accounts complete. Time taken: {time_taken:.2f} seconds, sleeping for {sleep_for} seconds...\")\n",
" time.sleep(sleep_for)\n"
]
},
{
"cell_type": "markdown",
"id": "arranged-times",
"metadata": {},
"source": [
"# 🏃 Running"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "physical-zealand",
"metadata": {},
"outputs": [],
"source": [
"if __name__ == \"__main__\":\n",
" logging.getLogger().setLevel(logging.INFO)\n",
"\n",
" from AccountLiquidator import NullAccountLiquidator\n",
" from Context import default_context\n",
" from Wallet import default_wallet\n",
" from WalletBalancer import NullWalletBalancer\n",
"\n",
" if default_wallet is None:\n",
" print(\"No default wallet file available.\")\n",
" else:\n",
" # Demonstration run using the Null liquidator and balancer - presumably no-ops,\n",
" # so this exercises the polling logic without sending transactions; confirm.\n",
" liquidator = PollingLiquidator(default_context, default_wallet, NullAccountLiquidator(), NullWalletBalancer())\n",
" liquidator.run()\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@ -97,6 +97,7 @@ Other notebooks are more user-oriented:
* [Python Decimal Class](https://docs.python.org/3/library/decimal.html)
* [Python Construct Library](https://construct.readthedocs.io/en/latest/)
* [Python Observables](https://rxpy.readthedocs.io/en/latest/)
* [RxPy Backpressure](https://github.com/daliclass/rxpy-backpressure)
* [Pyston](https://www.pyston.org/)
* [Flux Aggregator](https://github.com/octopus-network/solana-flux-aggregator)

View File

@ -29,10 +29,12 @@ import traceback
from solana.publickey import PublicKey
from AccountScout import AccountScout
from AccountLiquidator import AccountLiquidator, ForceCancelOrdersAccountLiquidator, NullAccountLiquidator
from BaseModel import Group, MarginAccount, TokenValue
from AccountLiquidator import AccountLiquidator, ForceCancelOrdersAccountLiquidator, NullAccountLiquidator, ReportingAccountLiquidator
from BaseModel import Group, LiquidationEvent, MarginAccount
from Constants import WARNING_DISCLAIMER_TEXT
from Context import Context, default_cluster, default_cluster_url, default_program_id, default_dex_program_id, default_group_name, default_group_id
from Observables import EventSource
from Notification import NotificationHandler, parse_subscription_target
from TransactionScout import TransactionScout
from Wallet import Wallet
@ -57,11 +59,20 @@ parser.add_argument("--log-level", default=logging.INFO, type=lambda level: geta
help="level of verbosity to log (possible values: DEBUG, INFO, WARNING, ERROR, CRITICAL)")
parser.add_argument("--margin-account-address", type=PublicKey,
help="Solana address of the Mango Markets margin account to be liquidated")
parser.add_argument("--notify-liquidations", type=parse_subscription_target, action="append", default=[],
help="The notification target for liquidation events")
parser.add_argument("--notify-errors", type=parse_subscription_target, action="append", default=[],
help="The notification target for error events")
parser.add_argument("--dry-run", action="store_true", default=False,
help="runs as read-only and does not perform any transactions")
args = parser.parse_args()
logging.getLogger().setLevel(args.log_level)
for notify in args.notify_errors:
handler = NotificationHandler(notify)
handler.setLevel(logging.ERROR)
logging.getLogger().addHandler(handler)
logging.warning(WARNING_DISCLAIMER_TEXT)
try:
@ -91,24 +102,19 @@ try:
logging.info("Wallet accounts OK.")
balances_before = group.fetch_balances(wallet.address)
print("Wallet balances before:")
TokenValue.report(print, balances_before)
print(f" SOL balance: {context.fetch_sol_balance(wallet.address):>18,.8f}")
for basket_token in group.basket_tokens:
balance = TokenValue.fetch_total_value(context, wallet.address, basket_token.token)
print(f"{basket_token.token.name:>7} balance: {balance.value:>18,.8f}")
prices = group.fetch_token_prices()
margin_account = MarginAccount.load(context, margin_account_address, group)
intrinsic_balance_sheets_before = margin_account.get_intrinsic_balance_sheets(group)
print("Margin Account Before:", intrinsic_balance_sheets_before)
liquidations_publisher = EventSource[LiquidationEvent]()
liquidations_publisher.subscribe(on_next=lambda event: logging.info(str(TransactionScout.load(context, event.signature))))
for notification_target in args.notify_liquidations:
liquidations_publisher.subscribe(on_next=notification_target.send)
if args.dry_run:
account_liquidator: AccountLiquidator = NullAccountLiquidator()
else:
account_liquidator = ForceCancelOrdersAccountLiquidator(context, wallet)
intermediate = ForceCancelOrdersAccountLiquidator(context, wallet)
account_liquidator = ReportingAccountLiquidator(intermediate, context, wallet, liquidations_publisher)
prices = group.fetch_token_prices()
margin_account = MarginAccount.load(context, margin_account_address, group)
transaction_id = account_liquidator.liquidate(group, margin_account, prices)
if transaction_id is None:
@ -122,15 +128,6 @@ try:
transaction_scout = TransactionScout.load(context, transaction_id)
print(str(transaction_scout))
group_after = Group.load(context)
margin_account_after_liquidation = MarginAccount.load(context, margin_account.address, group_after)
intrinsic_balance_sheets_after = margin_account_after_liquidation.get_intrinsic_balance_sheets(group_after)
print("Margin Account After:", intrinsic_balance_sheets_after)
balances_after = group_after.fetch_balances(wallet.address)
print("Wallet balances after:")
TokenValue.report(print, balances_after)
except Exception as exception:
logging.critical(f"Liquidator stopped because of exception: {exception} - {traceback.format_exc()}")
except:

View File

@ -30,12 +30,13 @@ import traceback
from decimal import Decimal
from AccountScout import AccountScout
from AccountLiquidator import AccountLiquidator, ForceCancelOrdersAccountLiquidator, NullAccountLiquidator
from BaseModel import Group
from AccountLiquidator import AccountLiquidator, ForceCancelOrdersAccountLiquidator, NullAccountLiquidator, ReportingAccountLiquidator
from BaseModel import Group, LiquidationEvent, MarginAccount
from Constants import WARNING_DISCLAIMER_TEXT
from Context import Context, default_cluster, default_cluster_url, default_program_id, default_dex_program_id, default_group_name, default_group_id
from LiquidationProcessor import LiquidationProcessor
from Notification import NotificationHandler, parse_subscription_target
from PollingLiquidator import PollingLiquidator
from Observables import EventSource
from TradeExecutor import SerumImmediateTradeExecutor
from TransactionScout import TransactionScout
from Wallet import Wallet
@ -119,10 +120,16 @@ try:
logging.info("Wallet accounts OK.")
liquidations_publisher = EventSource[LiquidationEvent]()
liquidations_publisher.subscribe(on_next=lambda event: logging.info(str(TransactionScout.load(context, event.signature))))
for notification_target in args.notify_liquidations:
liquidations_publisher.subscribe(on_next=notification_target.send)
if args.dry_run:
account_liquidator: AccountLiquidator = NullAccountLiquidator()
else:
account_liquidator = ForceCancelOrdersAccountLiquidator(context, wallet)
intermediate = ForceCancelOrdersAccountLiquidator(context, wallet)
account_liquidator = ReportingAccountLiquidator(intermediate, context, wallet, liquidations_publisher)
if args.dry_run or (args.target is None) or (len(args.target) == 0):
wallet_balancer: WalletBalancer = NullWalletBalancer()
@ -133,28 +140,38 @@ try:
wallet_balancer = LiveWalletBalancer(context, wallet, trade_executor, action_threshold, tokens, targets)
stop = False
liquidator = PollingLiquidator(context, wallet, account_liquidator, wallet_balancer,
throttle_to_seconds=throttle_ripe_update_to_seconds,
ripe_iterations=ripe_update_iterations)
liquidator.liquidations.subscribe(on_next=lambda event: logging.info(str(TransactionScout.load(context, event.signature))))
for notification_target in args.notify_liquidations:
liquidator.liquidations.subscribe(on_next=notification_target.send)
liquidation_processor = LiquidationProcessor(context, account_liquidator, wallet_balancer)
while not stop:
started_at = time.time()
try:
liquidator.run()
margin_account_loop_started_at = time.time()
ripe = MarginAccount.load_all_ripe(context)
time_taken = time.time() - started_at
should_sleep_for = throttle_reload_to_seconds - int(time_taken)
sleep_for = max(should_sleep_for, 0)
logging.info(f"Check of all margin accounts complete. Time taken: {time_taken:.2f} seconds, sleeping for {sleep_for} seconds...")
time.sleep(sleep_for)
liquidation_processor.update_margin_accounts(ripe)
for counter in range(ripe_update_iterations):
price_loop_started_at = time.time()
logging.info(f"Update {counter} of {ripe_update_iterations} - {len(ripe)} ripe 🥭 accounts.")
group = Group.load(context)
prices = group.fetch_token_prices()
liquidation_processor.update_prices(prices)
price_loop_time_taken = time.time() - price_loop_started_at
price_loop_should_sleep_for = throttle_ripe_update_to_seconds - int(price_loop_time_taken)
price_loop_sleep_for = max(price_loop_should_sleep_for, 0)
logging.info(f"Price fetch and check of all ripe 🥭 accounts complete. Time taken: {price_loop_time_taken:.2f} seconds, sleeping for {price_loop_sleep_for} seconds...")
time.sleep(price_loop_sleep_for)
margin_account_loop_time_taken = time.time() - margin_account_loop_started_at
margin_account_should_sleep_for = throttle_reload_to_seconds - int(margin_account_loop_time_taken)
margin_account_sleep_for = max(margin_account_should_sleep_for, 0)
logging.info(f"Check of all margin accounts complete. Time taken: {margin_account_loop_time_taken:.2f} seconds, sleeping for {margin_account_sleep_for} seconds...")
time.sleep(margin_account_sleep_for)
except KeyboardInterrupt:
stop = True
logging.info("Stopping...")
except Exception as exception:
logging.critical(f"Iteration failed because of exception: {exception}")
except Exception as exception:
logging.critical(f"Liquidator stopped because of exception: {exception} - {traceback.format_exc()}")

View File

@ -24,14 +24,17 @@ import argparse
import logging
import os.path
import projectsetup # noqa: F401
import time
import traceback
from AccountScout import AccountScout
from AccountLiquidator import AccountLiquidator, ForceCancelOrdersAccountLiquidator, NullAccountLiquidator
from BaseModel import Group
from AccountLiquidator import AccountLiquidator, ForceCancelOrdersAccountLiquidator, NullAccountLiquidator, ReportingAccountLiquidator
from BaseModel import Group, LiquidationEvent, MarginAccount
from Constants import WARNING_DISCLAIMER_TEXT
from Context import Context, default_cluster, default_cluster_url, default_program_id, default_dex_program_id, default_group_name, default_group_id
from PollingLiquidator import PollingLiquidator
from LiquidationProcessor import LiquidationProcessor
from Notification import NotificationHandler, parse_subscription_target
from Observables import EventSource
from TransactionScout import TransactionScout
from Wallet import Wallet
from WalletBalancer import NullWalletBalancer
@ -53,6 +56,10 @@ parser.add_argument("--group-id", type=str, default=default_group_id,
help="Mango group ID/address")
parser.add_argument("--id-file", type=str, default="id.json",
help="file containing the JSON-formatted wallet private key")
parser.add_argument("--notify-liquidations", type=parse_subscription_target, action="append", default=[],
help="The notification target for liquidation events")
parser.add_argument("--notify-errors", type=parse_subscription_target, action="append", default=[],
help="The notification target for error events")
parser.add_argument("--log-level", default=logging.INFO, type=lambda level: getattr(logging, level),
help="level of verbosity to log (possible values: DEBUG, INFO, WARNING, ERROR, CRITICAL)")
parser.add_argument("--dry-run", action="store_true", default=False,
@ -60,6 +67,11 @@ parser.add_argument("--dry-run", action="store_true", default=False,
args = parser.parse_args()
logging.getLogger().setLevel(args.log_level)
for notify in args.notify_errors:
handler = NotificationHandler(notify)
handler.setLevel(logging.ERROR)
logging.getLogger().addHandler(handler)
logging.warning(WARNING_DISCLAIMER_TEXT)
try:
@ -86,15 +98,30 @@ try:
logging.info("Wallet accounts OK.")
liquidations_publisher = EventSource[LiquidationEvent]()
liquidations_publisher.subscribe(on_next=lambda event: logging.info(str(TransactionScout.load(context, event.signature))))
for notification_target in args.notify_liquidations:
liquidations_publisher.subscribe(on_next=notification_target.send)
if args.dry_run:
account_liquidator: AccountLiquidator = NullAccountLiquidator()
else:
account_liquidator = ForceCancelOrdersAccountLiquidator(context, wallet)
intermediate = ForceCancelOrdersAccountLiquidator(context, wallet)
account_liquidator = ReportingAccountLiquidator(intermediate, context, wallet, liquidations_publisher)
wallet_balancer = NullWalletBalancer()
liquidator = PollingLiquidator(context, wallet, account_liquidator, wallet_balancer, ripe_iterations=1)
liquidator.liquidations.subscribe(on_next=lambda event: logging.info(str(TransactionScout.load(context, event.signature))))
liquidator.run()
liquidation_processor = LiquidationProcessor(context, account_liquidator, wallet_balancer)
started_at = time.time()
ripe = MarginAccount.load_all_ripe(context)
liquidation_processor.update_margin_accounts(ripe)
prices = group.fetch_token_prices()
liquidation_processor.update_prices(prices)
time_taken = time.time() - started_at
logging.info(f"Check of all margin accounts complete. Time taken: {time_taken:.2f} seconds.")
except Exception as exception:
logging.critical(f"Liquidator stopped because of exception: {exception} - {traceback.format_exc()}")

View File

@ -5,5 +5,6 @@ nblint
pandas
pyserum
rx
rxpy_backpressure
solana
web3

View File

@ -4,4 +4,7 @@ CURRENT_DIRECTORY="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd "${CURRENT_DIRECTORY}/.."
docker build . -t opinionatedgeek/mango-explorer:latest
TAG=${1:-latest}
echo Building opinionatedgeek/mango-explorer:${TAG}
docker build . -t opinionatedgeek/mango-explorer:${TAG}

View File

@ -19,8 +19,16 @@ script_path = Path(os.path.realpath(__file__))
# notebook directory. It's this notebook directory we want.
notebook_directory = script_path.parent.parent
parser = argparse.ArgumentParser(description="Run a liquidator for a Mango Markets group.")
parser.add_argument("--skip-cleanup", action="store_true", default=False,
help="skip removal of the temp directory when finished (will leave processed files for inspection)")
args = parser.parse_args()
working_directory_name = ".tmplintdir"
try:
if os.path.exists(notebook_directory / working_directory_name):
shutil.rmtree(notebook_directory / working_directory_name)
os.mkdir(str(notebook_directory / working_directory_name))
all_notebook_files = [f for f in os.listdir(notebook_directory) if isfile(
@ -57,9 +65,9 @@ try:
command = f'mypy "{str(notebook_directory / working_directory_name)}"'
os.system(command)
except Exception as ex:
print(f"Caught exception: {ex}")
if os.path.exists(notebook_directory / working_directory_name):
shutil.rmtree(notebook_directory / working_directory_name)
if not args.skip_cleanup:
if os.path.exists(notebook_directory / working_directory_name):
shutil.rmtree(notebook_directory / working_directory_name)