mango-explorer/bin/marketmaker

182 lines
8.7 KiB
Plaintext
Executable File

#!/usr/bin/env pyston3
import argparse
import logging
import os
import os.path
import rx
import rx.operators
import sys
import threading
from decimal import Decimal
sys.path.insert(0, os.path.abspath(
os.path.join(os.path.dirname(__file__), "..")))
import mango # nopep8
import mango.marketmaking # nopep8
from mango.marketmaking.orderchain import chain # nopep8
from mango.marketmaking.orderchain import chainbuilder # nopep8
parser = argparse.ArgumentParser(description="Shows the on-chain data of a particular account.")
mango.ContextBuilder.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser)
chainbuilder.ChainBuilder.add_command_line_parameters(parser)
parser.add_argument("--market", type=str, required=True, help="market symbol to make market upon (e.g. ETH/USDC)")
parser.add_argument("--update-mode", type=mango.marketmaking.ModelUpdateMode, default=mango.marketmaking.ModelUpdateMode.WEBSOCKET,
choices=list(mango.marketmaking.ModelUpdateMode), help="Update mode for model data - can be WEBSOCKET (default) or POLL")
parser.add_argument("--oracle-provider", type=str, required=True,
help="name of the price provider to use (e.g. pyth)")
parser.add_argument("--existing-order-tolerance", type=Decimal, default=Decimal("0.001"),
help="tolerance in price and quantity when matching existing orders or cancelling/replacing")
parser.add_argument("--pulse-interval", type=int, default=10,
help="number of seconds between each 'pulse' of the market maker")
parser.add_argument("--event-queue-poll-interval", type=int, default=10,
help="number of seconds between each 'poll' of the event queue (only used if --update-mode is POLL)")
parser.add_argument("--hedge-market", type=str, help="spot market symbol to use for hedging (e.g. ETH/USDC)")
parser.add_argument("--max-price-slippage-factor", type=Decimal, default=Decimal("0.05"),
help="the maximum value the IOC hedging order price can slip by when hedging (default is 0.05 for 5%)")
parser.add_argument("--account-index", type=int, default=0,
help="index of the account to use, if more than one available")
parser.add_argument("--notify-errors", type=mango.parse_subscription_target, action="append", default=[],
help="The notification target for error events")
parser.add_argument("--dry-run", action="store_true", default=False,
help="runs as read-only and does not perform any transactions")
args = parser.parse_args()
logging.getLogger().setLevel(args.log_level)
logging.warning(mango.WARNING_DISCLAIMER_TEXT)
for notify in args.notify_errors:
handler = mango.NotificationHandler(notify)
handler.setLevel(logging.ERROR)
logging.getLogger().addHandler(handler)
def cleanup(context: mango.Context, wallet: mango.Wallet, account: mango.Account, market: mango.Market, dry_run: bool):
logging.info("Cleaning up.")
market_operations: mango.MarketOperations = mango.create_market_operations(
context, wallet, account, market, dry_run)
orders = market_operations.load_my_orders()
for order in orders:
market_operations.cancel_order(order)
logging.info("Settling.")
market_operations.crank()
market_operations.settle()
context = mango.ContextBuilder.from_command_line_parameters(args)
logging.info(f"{context}")
disposer = mango.DisposePropagator()
manager = mango.IndividualWebSocketSubscriptionManager(context)
disposer.add_disposable(manager)
health_check = mango.HealthCheck()
disposer.add_disposable(health_check)
wallet = mango.Wallet.from_command_line_parameters_or_raise(args)
group = mango.Group.load(context, context.group_id)
account = mango.Account.load_for_owner_by_index(context, wallet.address, group, args.account_index)
market_symbol = args.market.upper()
market = context.market_lookup.find_by_symbol(market_symbol)
if market is None:
raise Exception(f"Could not find market {market_symbol}")
# The market index is also the index of the base token in the group's token list.
if market.quote != group.shared_quote_token.token:
raise Exception(
f"Group {group.name} uses shared quote token {group.shared_quote_token.token.symbol}/{group.shared_quote_token.token.mint}, but market {market.symbol} uses quote token {market.quote.symbol}/{market.quote.mint}.")
cleanup(context, wallet, account, market, args.dry_run)
market = mango.ensure_market_loaded(context, market)
if args.hedge_market is not None:
if not isinstance(market, mango.PerpMarket):
raise Exception(f"Cannot hedge - market {market_symbol} is not a perp market.")
watched_market: mango.PerpMarket = market
hedging_market_symbol = args.hedge_market.upper()
hedging_market_stub = context.market_lookup.find_by_symbol(hedging_market_symbol)
if hedging_market_stub is None:
raise Exception(f"Could not find market {hedging_market_symbol}")
hedging_market = mango.ensure_market_loaded(context, hedging_market_stub)
if not isinstance(hedging_market, mango.SpotMarket):
raise Exception(f"Market {hedging_market_symbol} is not a spot market.")
logging.info(f"Hedging on {hedging_market.symbol}")
hedging_market_operations: mango.MarketOperations = mango.create_market_operations(
context, wallet, account, hedging_market, args.dry_run)
perp_event_queue: mango.PerpEventQueue = mango.PerpEventQueue.load(
context, watched_market.underlying_perp_market.event_queue, watched_market.lot_size_converter)
event_queue_source: mango.EventSource[mango.PerpEventQueue]
if args.update_mode == mango.marketmaking.ModelUpdateMode.WEBSOCKET:
event_subscription = mango.WebSocketAccountSubscription(
context, watched_market.underlying_perp_market.event_queue,
lambda account_info: mango.PerpEventQueue.parse(account_info, watched_market.lot_size_converter))
manager.add(event_subscription)
health_check.add("hedger_watched_events_pong_subscription", event_subscription.pong)
event_queue_source = event_subscription.publisher
else:
event_queue_observable: rx.Observable = rx.interval(args.event_queue_poll_interval).pipe(
rx.operators.observe_on(context.pool_scheduler),
rx.operators.catch(mango.observable_pipeline_error_reporter),
rx.operators.retry(),
rx.operators.start_with(-1),
rx.operators.map(lambda _: mango.PerpEventQueue.load(
context, watched_market.underlying_perp_market.event_queue, watched_market.lot_size_converter))
)
event_queue_source = mango.EventSource[mango.PerpEventQueue]()
event_queue_observable.subscribe(event_queue_source)
health_check.add("hedger_watched_events_pong_subscription", event_queue_source)
hedger: mango.marketmaking.Hedger = mango.marketmaking.Hedger(
context, account, watched_market, hedging_market, perp_event_queue, event_queue_source,
hedging_market_operations, args.max_price_slippage_factor)
disposer.add_disposable(hedger)
order_reconciler = mango.marketmaking.ToleranceOrderReconciler(
args.existing_order_tolerance, args.existing_order_tolerance)
desired_orders_chain: chain.Chain = chainbuilder.ChainBuilder.from_command_line_parameters(args)
market_instruction_builder: mango.MarketInstructionBuilder = mango.create_market_instruction_builder(
context, wallet, account, market, args.dry_run)
market_maker = mango.marketmaking.MarketMaker(
wallet, market, market_instruction_builder, desired_orders_chain, order_reconciler)
oracle_provider: mango.OracleProvider = mango.create_oracle_provider(context, args.oracle_provider)
oracle = oracle_provider.oracle_for_market(context, market)
if oracle is None:
raise Exception(f"Could not find oracle for market {market.symbol} from provider {args.oracle_provider}.")
model_state_builder: mango.marketmaking.ModelStateBuilder = mango.marketmaking.model_state_builder_factory(
args.update_mode, context, disposer, manager, health_check, wallet, group, account, market, oracle)
health_check.add("marketmaker_pulse", market_maker.pulse_complete)
logging.info(f"Current assets in account {account.address} (owner: {account.owner}):")
mango.TokenValue.report([asset for asset in account.net_assets if asset is not None], logging.info)
manager.open()
pulse_disposable = rx.interval(args.pulse_interval).subscribe(
on_next=lambda _: market_maker.pulse(context, model_state_builder.build(context)))
disposer.add_disposable(pulse_disposable)
# Wait - don't exit. Exiting will be handled by signals/interrupts.
waiter = threading.Event()
try:
waiter.wait()
except:
pass
logging.info("Shutting down...")
disposer.dispose()
cleanup(context, wallet, account, market, args.dry_run)
logging.info("Shutdown complete.")