Moved to a more event-driven, observable-based liquidator.

This commit is contained in:
Geoff Taylor 2021-06-17 16:30:40 +01:00
parent aac32ba716
commit 80ba1e0d4b
1 changed files with 39 additions and 39 deletions

View File

@ -5,8 +5,9 @@ import json
import logging import logging
import os import os
import os.path import os.path
import rx
import rx.operators as ops
import sys import sys
import time
import traceback import traceback
from decimal import Decimal from decimal import Decimal
@ -26,8 +27,6 @@ parser.add_argument("--throttle-reload-to-seconds", type=Decimal, default=Decima
help="minimum number of seconds between each full margin account reload loop (including time taken processing accounts)") help="minimum number of seconds between each full margin account reload loop (including time taken processing accounts)")
parser.add_argument("--throttle-ripe-update-to-seconds", type=Decimal, default=Decimal(5), parser.add_argument("--throttle-ripe-update-to-seconds", type=Decimal, default=Decimal(5),
help="minimum number of seconds between each ripe update loop (including time taken processing accounts)") help="minimum number of seconds between each ripe update loop (including time taken processing accounts)")
parser.add_argument("--ripe-update-iterations", type=int, default=10,
help="number of iterations of ripe updates before performing a full reload of all margin accounts")
parser.add_argument("--target", type=str, action="append", parser.add_argument("--target", type=str, action="append",
help="token symbol plus target value or percentage, separated by a colon (e.g. 'ETH:2.5' or 'ETH:33%')") help="token symbol plus target value or percentage, separated by a colon (e.g. 'ETH:2.5' or 'ETH:33%')")
parser.add_argument("--action-threshold", type=Decimal, default=Decimal("0.01"), parser.add_argument("--action-threshold", type=Decimal, default=Decimal("0.01"),
@ -62,7 +61,6 @@ try:
adjustment_factor = args.adjustment_factor adjustment_factor = args.adjustment_factor
throttle_reload_to_seconds = args.throttle_reload_to_seconds throttle_reload_to_seconds = args.throttle_reload_to_seconds
throttle_ripe_update_to_seconds = args.throttle_ripe_update_to_seconds throttle_ripe_update_to_seconds = args.throttle_ripe_update_to_seconds
ripe_update_iterations = args.ripe_update_iterations
liquidator_name = args.name liquidator_name = args.name
logging.info(f"Context: {context}") logging.info(f"Context: {context}")
@ -90,8 +88,8 @@ try:
notification_target, lambda item: isinstance(item, mango.LiquidationEvent) and item.succeeded) notification_target, lambda item: isinstance(item, mango.LiquidationEvent) and item.succeeded)
liquidations_publisher.subscribe(on_next=filtering.send) liquidations_publisher.subscribe(on_next=filtering.send)
for notification_target in args.notify_failed_liquidations: for notification_target in args.notify_failed_liquidations:
filtering = mango.FilteringNotificationTarget(notification_target, lambda item: isinstance( filtering = mango.FilteringNotificationTarget(
item, mango.LiquidationEvent) and not item.succeeded) notification_target, lambda item: isinstance(item, mango.LiquidationEvent) and not item.succeeded)
liquidations_publisher.subscribe(on_next=filtering.send) liquidations_publisher.subscribe(on_next=filtering.send)
if args.dry_run: if args.dry_run:
@ -116,47 +114,49 @@ try:
wallet_balancer = mango.LiveWalletBalancer( wallet_balancer = mango.LiveWalletBalancer(
context, wallet, group, trade_executor, action_threshold, tokens, targets) context, wallet, group, trade_executor, action_threshold, tokens, targets)
stop = False def fetch_prices(context):
liquidation_processor = mango.LiquidationProcessor(context, account_liquidator, wallet_balancer) def _fetch_prices(_):
with mango.retry_context("Price Fetch",
lambda: mango.Group.load_with_prices(context),
context.retry_pauses) as retrier:
return retrier.run()
while not stop: return _fetch_prices
try:
margin_account_loop_started_at = time.time() def fetch_margin_accounts(context):
group = mango.Group.load(context)
def _fetch_margin_accounts(_):
with mango.retry_context("Margin Account Fetch", with mango.retry_context("Margin Account Fetch",
lambda: mango.MarginAccount.load_ripe(context, group), lambda: mango.MarginAccount.load_ripe(context, group),
context.retry_pauses) as margin_account_retrier: context.retry_pauses) as retrier:
ripe = margin_account_retrier.run() return retrier.run()
return _fetch_margin_accounts
liquidation_processor.update_margin_accounts(ripe) liquidation_processor = mango.LiquidationProcessor(context, account_liquidator, wallet_balancer)
for counter in range(ripe_update_iterations): logging.info("Starting margin account fetcher subscription")
price_loop_started_at = time.time() margin_account_subscription = rx.interval(float(throttle_reload_to_seconds)).pipe(
logging.info(f"Update {counter} of {ripe_update_iterations} - {len(ripe)} ripe 🥭 accounts.") ops.subscribe_on(context.pool_scheduler),
ops.start_with(-1),
ops.map(fetch_margin_accounts(context)),
ops.catch(mango.observable_pipeline_error_reporter),
ops.retry()
).subscribe(mango.create_backpressure_skipping_observer(on_next=liquidation_processor.update_margin_accounts, on_error=mango.log_subscription_error))
with mango.retry_context("Price Fetch", logging.info("Starting price fetcher subscription")
lambda: mango.Group.load_with_prices(context), price_subscription = rx.interval(float(throttle_ripe_update_to_seconds)).pipe(
context.retry_pauses) as price_retrier: ops.subscribe_on(context.pool_scheduler),
group, prices = price_retrier.run() ops.map(fetch_prices(context)),
ops.catch(mango.observable_pipeline_error_reporter),
ops.retry()
).subscribe(mango.create_backpressure_skipping_observer(on_next=lambda piped: liquidation_processor.update_prices(piped[0], piped[1]), on_error=mango.log_subscription_error))
liquidation_processor.update_prices(group, prices) # Wait - don't exit
input()
price_loop_time_taken = time.time() - price_loop_started_at
price_loop_should_sleep_for = float(throttle_ripe_update_to_seconds) - price_loop_time_taken
price_loop_sleep_for = max(price_loop_should_sleep_for, 0.0)
logging.info(
f"Price fetch and check of all ripe 🥭 accounts complete. Time taken: {price_loop_time_taken:.2f} seconds, sleeping for {price_loop_sleep_for} seconds...")
time.sleep(price_loop_sleep_for)
margin_account_loop_time_taken = time.time() - margin_account_loop_started_at
margin_account_should_sleep_for = float(throttle_reload_to_seconds) - int(margin_account_loop_time_taken)
margin_account_sleep_for = max(margin_account_should_sleep_for, 0.0)
logging.info(
f"Check of all margin accounts complete. Time taken: {margin_account_loop_time_taken:.2f} seconds, sleeping for {margin_account_sleep_for} seconds...")
time.sleep(margin_account_sleep_for)
except KeyboardInterrupt:
stop = True
logging.info("Stopping...")
except KeyboardInterrupt:
logging.info("Liquidator stopping...")
except Exception as exception: except Exception as exception:
logging.critical(f"Liquidator stopped because of exception: {exception} - {traceback.format_exc()}") logging.critical(f"Liquidator stopped because of exception: {exception} - {traceback.format_exc()}")
except: except: