mango-explorer/bin/liquidator

205 lines
11 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env pyston3
import os
import sys
from pathlib import Path
# Locate this script on disk (os.path.realpath also resolves symlinks).
script_path = Path(os.path.realpath(__file__))

# The script lives in the bin directory; the directory above bin is the
# notebook directory, which holds the modules imported further down.
bin_directory = script_path.parent
notebook_directory = bin_directory.parent

# Startup helpers live under meta/startup inside the notebook directory.
startup_directory = notebook_directory / "meta" / "startup"

# Make both directories importable: notebook directory first, then startup.
for import_directory in (notebook_directory, startup_directory):
    sys.path.append(str(import_directory))
import argparse
import logging
import os.path
import projectsetup # noqa: F401
import time
import traceback
from decimal import Decimal
from AccountScout import AccountScout
from AccountLiquidator import AccountLiquidator, ForceCancelOrdersAccountLiquidator, NullAccountLiquidator, ReportingAccountLiquidator
from BaseModel import Group, LiquidationEvent
from Constants import WARNING_DISCLAIMER_TEXT
from Context import Context, default_cluster, default_cluster_url, default_program_id, default_dex_program_id, default_group_name, default_group_id
from LiquidationProcessor import LiquidationProcessor
from Notification import FilteringNotificationTarget, NotificationHandler, parse_subscription_target
from Observables import EventSource
from Retrier import retry_context
from TradeExecutor import SerumImmediateTradeExecutor
from TransactionScout import TransactionScout
from Wallet import Wallet
from WalletBalancer import LiveWalletBalancer, NullWalletBalancer, TargetBalanceParser, WalletBalancer
# Argument parsing deliberately happens outside the main try block: some
# arguments (like --help) make argparse exit the process, and that exit must
# not be intercepted by the error handlers wrapped around the main logic.


def _parse_log_level(level: str) -> int:
    """Convert a logging level name given on the command line to its numeric value.

    The name is looked up as an attribute of the `logging` module (so DEBUG,
    INFO, WARNING, ERROR and CRITICAL all work). Anything that is missing or
    does not resolve to an integer level is rejected with
    `argparse.ArgumentTypeError`, which argparse reports as a normal usage
    error instead of a traceback.
    """
    value = getattr(logging, level, None)
    # bool is a subclass of int; module flags such as logging.raiseExceptions are not levels.
    if isinstance(value, bool) or not isinstance(value, int):
        raise argparse.ArgumentTypeError(
            f"invalid log level '{level}' (possible values: DEBUG, INFO, WARNING, ERROR, CRITICAL)")
    return value


parser = argparse.ArgumentParser(description="Run a liquidator for a Mango Markets group.")

# Connection and group selection.
parser.add_argument("--cluster", type=str, default=default_cluster,
                    help="Solana RPC cluster name")
parser.add_argument("--cluster-url", type=str, default=default_cluster_url,
                    help="Solana RPC cluster URL")
parser.add_argument("--program-id", type=str, default=default_program_id,
                    help="Mango program ID/address")
parser.add_argument("--dex-program-id", type=str, default=default_dex_program_id,
                    help="DEX program ID/address")
parser.add_argument("--group-name", type=str, default=default_group_name,
                    help="Mango group name")
parser.add_argument("--group-id", type=str, default=default_group_id,
                    help="Mango group ID/address")

# Identity and logging.
parser.add_argument("--id-file", type=str, default="id.json",
                    help="file containing the JSON-formatted wallet private key")
parser.add_argument("--name", type=str, default="Mango Markets Liquidator",
                    help="Name of the liquidator (used in reports and alerts)")
parser.add_argument("--log-level", default=logging.INFO, type=_parse_log_level,
                    help="level of verbosity to log (possible values: DEBUG, INFO, WARNING, ERROR, CRITICAL)")

# Polling cadence.
parser.add_argument("--throttle-reload-to-seconds", type=Decimal, default=Decimal(60),
                    help="minimum number of seconds between each full margin account reload loop (including time taken processing accounts)")
parser.add_argument("--throttle-ripe-update-to-seconds", type=Decimal, default=Decimal(5),
                    help="minimum number of seconds between each ripe update loop (including time taken processing accounts)")
parser.add_argument("--ripe-update-iterations", type=int, default=10,
                    help="number of iterations of ripe updates before performing a full reload of all margin accounts")

# Wallet balancing. --target may be repeated; with no targets it stays None.
parser.add_argument("--target", type=str, action="append",
                    help="token symbol plus target value or percentage, separated by a colon (e.g. 'ETH:2.5' or 'ETH:33%')")
parser.add_argument("--action-threshold", type=Decimal, default=Decimal("0.01"),
                    help="fraction of total wallet value a trade must be above to be carried out")
parser.add_argument("--adjustment-factor", type=Decimal, default=Decimal("0.05"),
                    help="factor by which to adjust the SELL price (akin to maximum slippage)")

# Notification targets. Each option may be repeated to register several targets.
parser.add_argument("--notify-liquidations", type=parse_subscription_target, action="append", default=[],
                    help="The notification target for liquidation events")
parser.add_argument("--notify-successful-liquidations", type=parse_subscription_target,
                    action="append", default=[], help="The notification target for successful liquidation events")
parser.add_argument("--notify-failed-liquidations", type=parse_subscription_target,
                    action="append", default=[], help="The notification target for failed liquidation events")
parser.add_argument("--notify-errors", type=parse_subscription_target, action="append", default=[],
                    help="The notification target for error events")

parser.add_argument("--dry-run", action="store_true", default=False,
                    help="runs as read-only and does not perform any transactions")

args = parser.parse_args()
# Apply the verbosity requested on the command line to the root logger.
root_logger = logging.getLogger()
root_logger.setLevel(args.log_level)

# Attach one handler per --notify-errors target, restricted to records at
# ERROR level or above.
for error_target in args.notify_errors:
    error_handler = NotificationHandler(error_target)
    error_handler.setLevel(logging.ERROR)
    root_logger.addHandler(error_handler)

# Always emit the disclaimer before doing any real work.
logging.warning(WARNING_DISCLAIMER_TEXT)
# Main body: load the wallet and group, wire up liquidation reporting and
# wallet balancing, then poll margin accounts and prices until interrupted.
#
# NOTE(review): the nesting of the exception handlers was reconstructed from
# the statement order and the wording of the log messages (the "Liquidator
# stopped..." handlers sit on the outer try) - confirm against the original.
try:
    id_filename = args.id_file
    if not os.path.isfile(id_filename):
        logging.error(f"Wallet file '{id_filename}' is not present.")
        # SystemExit is re-raised by the handler at the bottom of this block,
        # so this status code actually reaches the calling shell.
        sys.exit(1)
    wallet = Wallet.load(id_filename)

    action_threshold = args.action_threshold
    adjustment_factor = args.adjustment_factor
    throttle_reload_to_seconds = args.throttle_reload_to_seconds
    throttle_ripe_update_to_seconds = args.throttle_ripe_update_to_seconds
    ripe_update_iterations = args.ripe_update_iterations
    liquidator_name = args.name

    context = Context.from_command_line(args.cluster, args.cluster_url, args.program_id,
                                        args.dex_program_id, args.group_name, args.group_id)
    logging.info(f"Context: {context}")
    logging.info(f"Wallet address: {wallet.address}")

    group = Group.load(context)
    tokens = [basket_token.token for basket_token in group.basket_tokens]

    # Refuse to start if the account report for this wallet contains errors.
    logging.info("Checking wallet accounts.")
    scout = AccountScout()
    report = scout.verify_account_prepared_for_group(context, group, wallet.address)
    logging.info(f"Wallet account report: {report}")
    if report.has_errors:
        raise Exception(f"Account '{wallet.address}' is not prepared for group '{group.address}'.")
    logging.info("Wallet accounts OK.")

    # Every published liquidation event is logged via TransactionScout, then
    # fanned out to the notification targets given on the command line.
    liquidations_publisher = EventSource[LiquidationEvent]()
    liquidations_publisher.subscribe(on_next=lambda event: logging.info(
        str(TransactionScout.load(context, event.signature))))

    for notification_target in args.notify_liquidations:
        liquidations_publisher.subscribe(on_next=notification_target.send)

    for notification_target in args.notify_successful_liquidations:
        filtering = FilteringNotificationTarget(
            notification_target, lambda item: isinstance(item, LiquidationEvent) and item.succeeded)
        liquidations_publisher.subscribe(on_next=filtering.send)

    for notification_target in args.notify_failed_liquidations:
        filtering = FilteringNotificationTarget(
            notification_target, lambda item: isinstance(item, LiquidationEvent) and not item.succeeded)
        liquidations_publisher.subscribe(on_next=filtering.send)

    # A dry run uses the Null liquidator; otherwise the force-cancel
    # liquidator is wrapped in a reporting one that is given the publisher.
    if args.dry_run:
        account_liquidator: AccountLiquidator = NullAccountLiquidator()
    else:
        intermediate = ForceCancelOrdersAccountLiquidator(context, wallet)
        account_liquidator = ReportingAccountLiquidator(intermediate,
                                                        context,
                                                        wallet,
                                                        liquidations_publisher,
                                                        liquidator_name)

    # The live balancer is only used when not in dry-run mode and at least one
    # --target was supplied (args.target is None when the option is absent).
    if args.dry_run or not args.target:
        wallet_balancer: WalletBalancer = NullWalletBalancer()
    else:
        balance_parser = TargetBalanceParser(tokens)
        targets = list(map(balance_parser.parse, args.target))
        trade_executor = SerumImmediateTradeExecutor(context, wallet, group, adjustment_factor)
        wallet_balancer = LiveWalletBalancer(context, wallet, trade_executor, action_threshold, tokens, targets)

    stop = False
    liquidation_processor = LiquidationProcessor(context, account_liquidator, wallet_balancer)
    while not stop:
        try:
            margin_account_loop_started_at = time.time()

            # Outer loop: reload the ripe margin accounts (with retries).
            with retry_context("Margin Account Fetch",
                               group.load_ripe_margin_accounts,
                               context.retry_pauses) as margin_account_retrier:
                ripe = margin_account_retrier.run()

            liquidation_processor.update_margin_accounts(ripe)

            # Inner loop: refresh the group and prices several times per
            # margin account reload, throttled to the configured period.
            for counter in range(ripe_update_iterations):
                price_loop_started_at = time.time()
                logging.info(f"Update {counter} of {ripe_update_iterations} - {len(ripe)} ripe 🥭 accounts.")

                with retry_context("Price Fetch",
                                   lambda: Group.load_with_prices(context),
                                   context.retry_pauses) as price_retrier:
                    group, prices = price_retrier.run()

                liquidation_processor.update_prices(group, prices)

                price_loop_time_taken = time.time() - price_loop_started_at
                price_loop_should_sleep_for = float(throttle_ripe_update_to_seconds) - price_loop_time_taken
                price_loop_sleep_for = max(price_loop_should_sleep_for, 0.0)
                logging.info(
                    f"Price fetch and check of all ripe 🥭 accounts complete. Time taken: {price_loop_time_taken:.2f} seconds, sleeping for {price_loop_sleep_for} seconds...")
                time.sleep(price_loop_sleep_for)

            # Subtract the full elapsed time (not a truncated integer) so the
            # reload throttle is computed the same way as the price throttle.
            margin_account_loop_time_taken = time.time() - margin_account_loop_started_at
            margin_account_should_sleep_for = float(throttle_reload_to_seconds) - margin_account_loop_time_taken
            margin_account_sleep_for = max(margin_account_should_sleep_for, 0.0)
            logging.info(
                f"Check of all margin accounts complete. Time taken: {margin_account_loop_time_taken:.2f} seconds, sleeping for {margin_account_sleep_for} seconds...")
            time.sleep(margin_account_sleep_for)
        except KeyboardInterrupt:
            # Ctrl-C during an iteration ends the polling loop cleanly.
            stop = True
            logging.info("Stopping...")
except SystemExit:
    # A deliberate sys.exit() must keep its exit status; without this clause
    # the bare except below would swallow it and the process would exit 0.
    raise
except Exception as exception:
    logging.critical(f"Liquidator stopped because of exception: {exception} - {traceback.format_exc()}")
except:
    # Last-resort handler for anything that is not an Exception subclass
    # (for example a KeyboardInterrupt raised outside the polling loop).
    logging.critical(f"Liquidator stopped because of uncatchable error: {traceback.format_exc()}")
finally:
    logging.info("Liquidator completed.")