mango-explorer/bin/liquidator

221 lines
11 KiB
Plaintext
Executable File

#!/usr/bin/env pyston3
import argparse
import logging
import os
import os.path
import rx
import rx.core.typing
import rx.operators as ops
import sys
import threading
import traceback
import typing
from decimal import Decimal
sys.path.insert(0, os.path.abspath(
os.path.join(os.path.dirname(__file__), "..")))
import mango # nopep8
# We explicitly want argument parsing to be outside the main try-except block because some arguments
# (like --help) will cause an exit, which our except: block traps.
parser = argparse.ArgumentParser(description="Run a liquidator for a Mango Markets group.")
mango.ContextBuilder.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser)
parser.add_argument("--throttle-reload-to-seconds", type=Decimal, default=Decimal(60),
help="minimum number of seconds between each full margin account reload loop (including time taken processing accounts)")
parser.add_argument("--throttle-ripe-update-to-seconds", type=Decimal, default=Decimal(5),
help="minimum number of seconds between each ripe update loop (including time taken processing accounts)")
parser.add_argument("--target", type=str, action="append",
help="token symbol plus target value or percentage, separated by a colon (e.g. 'ETH:2.5' or 'ETH:33%')")
parser.add_argument("--action-threshold", type=Decimal, default=Decimal("0.01"),
help="fraction of total wallet value a trade must be above to be carried out")
parser.add_argument("--worthwhile-threshold", type=Decimal, default=Decimal("0.01"),
help="value a liquidation must be above to be carried out")
parser.add_argument("--adjustment-factor", type=Decimal, default=Decimal("0.05"),
help="factor by which to adjust the SELL price (akin to maximum slippage)")
parser.add_argument("--notify-liquidations", type=mango.parse_subscription_target, action="append", default=[],
help="The notification target for liquidation events")
parser.add_argument("--notify-successful-liquidations", type=mango.parse_subscription_target,
action="append", default=[], help="The notification target for successful liquidation events")
parser.add_argument("--notify-failed-liquidations", type=mango.parse_subscription_target,
action="append", default=[], help="The notification target for failed liquidation events")
parser.add_argument("--notify-errors", type=mango.parse_subscription_target, action="append", default=[],
help="The notification target for error events")
parser.add_argument("--dry-run", action="store_true", default=False,
help="runs as read-only and does not perform any transactions")
args = parser.parse_args()
logging.getLogger().setLevel(args.log_level)
for notify in args.notify_errors:
handler = mango.NotificationHandler(notify)
handler.setLevel(logging.ERROR)
logging.getLogger().addHandler(handler)
logging.warning(mango.WARNING_DISCLAIMER_TEXT)
def start_subscriptions(context: mango.Context, liquidation_processor: mango.LiquidationProcessor, fetch_prices: typing.Callable[[typing.Any], typing.Any], fetch_accounts: typing.Callable[[typing.Any], typing.Any], throttle_reload_to_seconds: Decimal, throttle_ripe_update_to_seconds: Decimal):
liquidation_processor.state = mango.LiquidationProcessorState.STARTING
logging.info("Starting margin account fetcher subscription")
account_subscription = rx.interval(float(throttle_reload_to_seconds)).pipe(
ops.observe_on(context.pool_scheduler),
ops.start_with(-1),
ops.map(fetch_accounts(context)),
ops.catch(mango.observable_pipeline_error_reporter),
ops.retry()
).subscribe(mango.create_backpressure_skipping_observer(on_next=liquidation_processor.update_accounts, on_error=mango.log_subscription_error))
logging.info("Starting price fetcher subscription")
price_subscription = rx.interval(float(throttle_ripe_update_to_seconds)).pipe(
ops.observe_on(context.pool_scheduler),
ops.map(fetch_prices(context)),
ops.catch(mango.observable_pipeline_error_reporter),
ops.retry()
).subscribe(mango.create_backpressure_skipping_observer(on_next=lambda piped: liquidation_processor.update_prices(piped[0], piped[1]), on_error=mango.log_subscription_error))
return account_subscription, price_subscription
try:
context = mango.ContextBuilder.from_command_line_parameters(args)
wallet = mango.Wallet.from_command_line_parameters_or_raise(args)
action_threshold = args.action_threshold
worthwhile_threshold = args.worthwhile_threshold
adjustment_factor = args.adjustment_factor
throttle_reload_to_seconds = args.throttle_reload_to_seconds
throttle_ripe_update_to_seconds = args.throttle_ripe_update_to_seconds
liquidator_name = args.name
logging.info(f"Context: {context}")
logging.info(f"Wallet address: {wallet.address}")
group = mango.Group.load(context)
tokens = [token_info.token for token_info in group.tokens if token_info is not None]
logging.info("Checking wallet accounts.")
scout = mango.AccountScout()
report = scout.verify_account_prepared_for_group(context, group, wallet.address)
logging.info(f"Wallet account report: {report}")
if report.has_errors:
raise Exception(f"Account '{wallet.address}' is not prepared for group '{group.address}'.")
logging.info("Wallet accounts OK.")
liquidations_publisher = mango.EventSource[mango.LiquidationEvent]()
liquidations_publisher.subscribe(on_next=lambda event: logging.info(
str(mango.TransactionScout.load(context, event.signature))))
for notification_target in args.notify_liquidations:
liquidations_publisher.subscribe(on_next=notification_target.send)
for successful_notification_target in args.notify_successful_liquidations:
captured_successful_notification_target = successful_notification_target
filtering_successful = mango.FilteringNotificationTarget(
captured_successful_notification_target, lambda item: isinstance(item, mango.LiquidationEvent) and item.succeeded)
liquidations_publisher.subscribe(on_next=filtering_successful.send)
for failed_notification_target in args.notify_failed_liquidations:
captured_failed_notification_target = failed_notification_target
filtering_failed = mango.FilteringNotificationTarget(
captured_failed_notification_target, lambda item: isinstance(item, mango.LiquidationEvent) and not item.succeeded)
liquidations_publisher.subscribe(on_next=filtering_failed.send)
# TODO: Add proper liquidator classes here when they're written for V3
if args.dry_run:
account_liquidator: mango.AccountLiquidator = mango.NullAccountLiquidator()
else:
account_liquidator = mango.NullAccountLiquidator()
if args.dry_run or (args.target is None) or (len(args.target) == 0):
wallet_balancer: mango.WalletBalancer = mango.NullWalletBalancer()
else:
balance_parser = mango.TargetBalanceParser(tokens)
targets = list(map(balance_parser.parse, args.target))
trade_executor = mango.ImmediateTradeExecutor(context, wallet, None, adjustment_factor)
wallet_balancer = mango.LiveWalletBalancer(
context, wallet, group, trade_executor, action_threshold, tokens, targets)
# These (along with `context`) are captured and read by `load_updated_price_details()`.
group_address = group.address
oracle_addresses = group.oracles
def load_updated_price_details() -> typing.Tuple[mango.Group, typing.Sequence[mango.TokenValue]]:
oracles = [oracle_address for oracle_address in oracle_addresses if oracle_address is not None]
all_addresses = [group_address, *oracles]
all_account_infos = mango.AccountInfo.load_multiple(context, all_addresses)
group_account_info = all_account_infos[0]
group = mango.Group.parse(context, group_account_info)
# TODO - fetch prices when code available in V3.
return group, []
def fetch_prices(context):
def _fetch_prices(_):
with mango.retry_context("Price Fetch",
load_updated_price_details,
context.retry_pauses) as retrier:
return retrier.run()
return _fetch_prices
def fetch_accounts(context):
def _actual_fetch():
group = mango.Group.load(context)
return mango.Account.load_ripe(context, group)
def _fetch_accounts(_):
with mango.retry_context("Margin Account Fetch",
_actual_fetch,
context.retry_pauses) as retrier:
return retrier.run()
return _fetch_accounts
class LiquidationProcessorSubscriptions:
def __init__(self, account: rx.core.typing.Disposable, price: rx.core.typing.Disposable):
self.account: rx.core.typing.Disposable = account
self.price: rx.core.typing.Disposable = price
liquidation_processor = mango.LiquidationProcessor(
context, liquidator_name, account_liquidator, wallet_balancer, worthwhile_threshold)
account_subscription, price_subscription = start_subscriptions(
context, liquidation_processor, fetch_prices, fetch_accounts, throttle_reload_to_seconds, throttle_ripe_update_to_seconds)
subscriptions = LiquidationProcessorSubscriptions(account=account_subscription,
price=price_subscription)
def on_unhealthy(liquidation_processor: mango.LiquidationProcessor):
if liquidation_processor.state != mango.LiquidationProcessorState.UNHEALTHY:
logging.info(
f"Ignoring LiquidationProcessor state change - state is: {liquidation_processor.state}")
return
logging.warning("Liquidation processor has been marked as unhealthy so recreating subscriptions.")
try:
subscriptions.account.dispose()
except Exception as exception:
logging.warning(f"Ignoring problem disposing of margin account subscription: {exception}")
try:
subscriptions.price.dispose()
except Exception as exception:
logging.warning(f"Ignoring problem disposing of margin account subscription: {exception}")
account_subscription, price_subscription = start_subscriptions(
context, liquidation_processor, fetch_prices, fetch_accounts, throttle_reload_to_seconds, throttle_ripe_update_to_seconds)
subscriptions.account = account_subscription
subscriptions.price = price_subscription
liquidation_processor.state_change.subscribe(on_next=on_unhealthy)
# Wait - don't exit. Exiting will be handled by signals/interrupts.
waiter = threading.Event()
waiter.wait()
except KeyboardInterrupt:
logging.info("Liquidator stopping...")
except Exception as exception:
logging.critical(f"Liquidator stopped because of exception: {exception} - {traceback.format_exc()}")
except:
logging.critical(f"Liquidator stopped because of uncatchable error: {traceback.format_exc()}")
finally:
logging.info("Liquidator completed.")