mango-explorer/bin/liquidator

163 lines
8.2 KiB
Plaintext
Executable File

#!/usr/bin/env pyston3
import argparse
import logging
import os
import os.path
import rx
import rx.operators as ops
import sys
import threading
import traceback
from decimal import Decimal
sys.path.insert(0, os.path.abspath(
os.path.join(os.path.dirname(__file__), '..')))
import mango # nopep8
# Argument parsing deliberately happens outside the main try-except block: some arguments
# (like --help) cause an exit, which the catch-all handler of that block would otherwise trap.


def _parse_decimal_argument(text):
    """Convert a command-line string to a Decimal, for use as an argparse ``type``.

    argparse only converts ArgumentTypeError, TypeError and ValueError raised by a
    ``type`` callable into a normal usage error. Decimal reports malformed text with
    decimal.InvalidOperation (an ArithmeticError), which would otherwise escape
    parse_args() as a raw traceback, so it is translated here.

    Raises:
        argparse.ArgumentTypeError: if ``text`` is not a valid decimal number.
    """
    # Local import: the file-level imports only bring ``Decimal`` into scope.
    from decimal import InvalidOperation

    try:
        return Decimal(text)
    except InvalidOperation:
        raise argparse.ArgumentTypeError(f"invalid decimal value: {text!r}") from None


parser = argparse.ArgumentParser(description="Run a liquidator for a Mango Markets group.")
# Let the mango Context and Wallet classes register their own command-line parameters.
mango.Context.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser)
parser.add_argument("--name", type=str, default="Mango Markets Liquidator",
                    help="Name of the liquidator (used in reports and alerts)")
parser.add_argument("--throttle-reload-to-seconds", type=_parse_decimal_argument, default=Decimal(60),
                    help="minimum number of seconds between each full margin account reload loop (including time taken processing accounts)")
parser.add_argument("--throttle-ripe-update-to-seconds", type=_parse_decimal_argument, default=Decimal(5),
                    help="minimum number of seconds between each ripe update loop (including time taken processing accounts)")
# argparse expands help text with the % operator, so a literal percent sign must be
# written as %%; an unescaped one makes --help fail with a ValueError.
parser.add_argument("--target", type=str, action="append",
                    help="token symbol plus target value or percentage, separated by a colon (e.g. 'ETH:2.5' or 'ETH:33%%')")
parser.add_argument("--action-threshold", type=_parse_decimal_argument, default=Decimal("0.01"),
                    help="fraction of total wallet value a trade must be above to be carried out")
parser.add_argument("--adjustment-factor", type=_parse_decimal_argument, default=Decimal("0.05"),
                    help="factor by which to adjust the SELL price (akin to maximum slippage)")
parser.add_argument("--notify-liquidations", type=mango.parse_subscription_target, action="append", default=[],
                    help="The notification target for liquidation events")
parser.add_argument("--notify-successful-liquidations", type=mango.parse_subscription_target,
                    action="append", default=[], help="The notification target for successful liquidation events")
parser.add_argument("--notify-failed-liquidations", type=mango.parse_subscription_target,
                    action="append", default=[], help="The notification target for failed liquidation events")
parser.add_argument("--notify-errors", type=mango.parse_subscription_target, action="append", default=[],
                    help="The notification target for error events")
parser.add_argument("--dry-run", action="store_true", default=False,
                    help="runs as read-only and does not perform any transactions")
args = parser.parse_args()
# Configure the root logger: apply the level chosen on the command line, then attach one
# NotificationHandler per --notify-errors target so records at ERROR level and above are
# also passed to those targets.
root_logger = logging.getLogger()
root_logger.setLevel(args.log_level)
for error_target in args.notify_errors:
    error_handler = mango.NotificationHandler(error_target)
    error_handler.setLevel(logging.ERROR)
    root_logger.addHandler(error_handler)

# Always emit the disclaimer text at startup.
logging.warning(mango.WARNING_DISCLAIMER_TEXT)
try:
    # --- Runtime configuration taken from the parsed command line -------------------------
    context = mango.Context.from_command_line_parameters(args)
    wallet = mango.Wallet.from_command_line_parameters_or_raise(args)

    action_threshold = args.action_threshold
    adjustment_factor = args.adjustment_factor
    throttle_reload_to_seconds = args.throttle_reload_to_seconds
    throttle_ripe_update_to_seconds = args.throttle_ripe_update_to_seconds
    liquidator_name = args.name

    logging.info(f"Context: {context}")
    logging.info(f"Wallet address: {wallet.address}")

    group = mango.Group.load(context)
    tokens = [basket_token.token for basket_token in group.basket_tokens]

    # --- Refuse to start unless the wallet passes the account check for this group --------
    logging.info("Checking wallet accounts.")
    report = mango.AccountScout().verify_account_prepared_for_group(context, group, wallet.address)
    logging.info(f"Wallet account report: {report}")
    if report.has_errors:
        raise Exception(f"Account '{wallet.address}' is not prepared for group '{group.address}'.")
    logging.info("Wallet accounts OK.")

    # --- Liquidation event publishing and notification targets ----------------------------
    liquidations_publisher = mango.EventSource[mango.LiquidationEvent]()

    def log_liquidation_transaction(event):
        """Log the TransactionScout report loaded for the event's signature."""
        logging.info(str(mango.TransactionScout.load(context, event.signature)))

    def is_successful_liquidation(item):
        """Filter predicate: a LiquidationEvent whose ``succeeded`` attribute is truthy."""
        return isinstance(item, mango.LiquidationEvent) and item.succeeded

    def is_failed_liquidation(item):
        """Filter predicate: a LiquidationEvent whose ``succeeded`` attribute is falsy."""
        return isinstance(item, mango.LiquidationEvent) and not item.succeeded

    liquidations_publisher.subscribe(on_next=log_liquidation_transaction)

    # Every liquidation event goes to the --notify-liquidations targets...
    for notification_target in args.notify_liquidations:
        liquidations_publisher.subscribe(on_next=notification_target.send)

    # ...while the success/failure targets are wrapped with the matching filter predicate.
    for notification_target in args.notify_successful_liquidations:
        successes_only = mango.FilteringNotificationTarget(notification_target, is_successful_liquidation)
        liquidations_publisher.subscribe(on_next=successes_only.send)

    for notification_target in args.notify_failed_liquidations:
        failures_only = mango.FilteringNotificationTarget(notification_target, is_failed_liquidation)
        liquidations_publisher.subscribe(on_next=failures_only.send)

    # --- Liquidator and wallet balancer: null implementations are used for a dry run ------
    if args.dry_run:
        account_liquidator: mango.AccountLiquidator = mango.NullAccountLiquidator()
    else:
        force_cancel_liquidator = mango.ForceCancelOrdersAccountLiquidator(context, wallet)
        account_liquidator = mango.ReportingAccountLiquidator(
            force_cancel_liquidator, context, wallet, liquidations_publisher, liquidator_name)

    # --target uses action="append" with no default, so it is None when never supplied.
    if args.dry_run or not args.target:
        wallet_balancer: mango.WalletBalancer = mango.NullWalletBalancer()
    else:
        balance_parser = mango.TargetBalanceParser(tokens)
        balance_targets = [balance_parser.parse(target_text) for target_text in args.target]
        trade_executor = mango.SerumImmediateTradeExecutor(context, wallet, adjustment_factor)
        wallet_balancer = mango.LiveWalletBalancer(
            context, wallet, group, trade_executor, action_threshold, tokens, balance_targets)

    def fetch_prices(context):
        """Return a one-argument callback, suitable for ``ops.map``, that fetches prices.

        The callback ignores its argument (the interval tick) and returns the result of
        ``mango.Group.load_with_prices(context)``, run through ``mango.retry_context``
        with the context's ``retry_pauses``.
        """
        def load_group_with_prices():
            return mango.Group.load_with_prices(context)

        def _fetch_prices(_):
            with mango.retry_context("Price Fetch", load_group_with_prices, context.retry_pauses) as retrier:
                return retrier.run()

        return _fetch_prices

    def fetch_margin_accounts(context):
        """Return a one-argument callback, suitable for ``ops.map``, that fetches margin accounts.

        The group is loaded once, when this factory is called, and that same group object
        is reused by every later invocation of the callback. The callback ignores its
        argument and returns the result of ``mango.MarginAccount.load_ripe(context, group)``,
        run through ``mango.retry_context`` with the context's ``retry_pauses``.
        """
        group = mango.Group.load(context)

        def load_ripe_margin_accounts():
            return mango.MarginAccount.load_ripe(context, group)

        def _fetch_margin_accounts(_):
            with mango.retry_context("Margin Account Fetch",
                                     load_ripe_margin_accounts,
                                     context.retry_pauses) as retrier:
                return retrier.run()

        return _fetch_margin_accounts

    liquidation_processor = mango.LiquidationProcessor(context, account_liquidator, wallet_balancer)

    # --- Periodic margin account reload ---------------------------------------------------
    logging.info("Starting margin account fetcher subscription")
    margin_account_ticks = rx.interval(float(throttle_reload_to_seconds))
    margin_account_stream = margin_account_ticks.pipe(
        ops.subscribe_on(context.pool_scheduler),
        # Prepend a value so the first fetch is not delayed until the first interval tick.
        ops.start_with(-1),
        ops.map(fetch_margin_accounts(context)),
        # NOTE(review): presumably reports the error and lets retry() resubscribe -
        # confirm against mango.observable_pipeline_error_reporter.
        ops.catch(mango.observable_pipeline_error_reporter),
        ops.retry(),
    )
    margin_account_observer = mango.create_backpressure_skipping_observer(
        on_next=liquidation_processor.update_margin_accounts,
        on_error=mango.log_subscription_error)
    margin_account_subscription = margin_account_stream.subscribe(margin_account_observer)

    # --- Periodic price update ------------------------------------------------------------
    def update_prices_from_pipeline(piped):
        """Pass the first two items of the fetched result to the liquidation processor."""
        return liquidation_processor.update_prices(piped[0], piped[1])

    logging.info("Starting price fetcher subscription")
    price_ticks = rx.interval(float(throttle_ripe_update_to_seconds))
    price_stream = price_ticks.pipe(
        ops.subscribe_on(context.pool_scheduler),
        ops.map(fetch_prices(context)),
        ops.catch(mango.observable_pipeline_error_reporter),
        ops.retry(),
    )
    price_observer = mango.create_backpressure_skipping_observer(
        on_next=update_prices_from_pipeline,
        on_error=mango.log_subscription_error)
    price_subscription = price_stream.subscribe(price_observer)

    # Wait - don't exit. Exiting will be handled by signals/interrupts.
    # The event is never set, so this blocks the main thread until it is interrupted.
    threading.Event().wait()
except KeyboardInterrupt:
    logging.info("Liquidator stopping...")
except Exception as exception:
    logging.critical(f"Liquidator stopped because of exception: {exception} - {traceback.format_exc()}")
except BaseException:
    # Catch-all for anything that is not an Exception subclass (e.g. SystemExit).
    logging.critical(f"Liquidator stopped because of uncatchable error: {traceback.format_exc()}")
finally:
    logging.info("Liquidator completed.")