#!/usr/bin/env pyston3
"""Runs a marketmaker against a particular market.

The script parses its command line, cancels any orders already on the market,
optionally sets up a hedger for a perp market, then 'pulses' the market maker
on a fixed interval until the process is interrupted. On shutdown everything
registered with the dispose propagator is disposed and remaining orders are
cancelled again.
"""

import argparse
import logging
import os
import os.path
import rx
import rx.operators
import sys
import threading

from decimal import Decimal
from solana.publickey import PublicKey

sys.path.insert(0, os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..")))
import mango  # nopep8
import mango.marketmaking  # nopep8
from mango.marketmaking.orderchain import chain  # nopep8
from mango.marketmaking.orderchain import chainbuilder  # nopep8

parser = argparse.ArgumentParser(description="Runs a marketmaker against a particular market.")
mango.ContextBuilder.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser)
chainbuilder.ChainBuilder.add_command_line_parameters(parser)
parser.add_argument("--market", type=str, required=True,
                    help="market symbol to make market upon (e.g. ETH/USDC)")
parser.add_argument("--update-mode", type=mango.marketmaking.ModelUpdateMode,
                    default=mango.marketmaking.ModelUpdateMode.POLL,
                    choices=list(mango.marketmaking.ModelUpdateMode),
                    help="Update mode for model data - can be POLL (default) or WEBSOCKET")
parser.add_argument("--oracle-provider", type=str, required=True,
                    help="name of the price provider to use (e.g. pyth)")
parser.add_argument("--oracle-market", type=str,
                    help="market symbol for oracle to use for pricing (e.g. ETH/USDC) - defaults to market specified in --market")
parser.add_argument("--order-type", type=mango.OrderType, default=mango.OrderType.POST_ONLY,
                    choices=list(mango.OrderType), help="Order type: LIMIT, IOC or POST_ONLY")
parser.add_argument("--existing-order-tolerance", type=Decimal, default=Decimal("0.001"),
                    help="tolerance in price and quantity when matching existing orders or cancelling/replacing")
parser.add_argument("--redeem-threshold", type=Decimal,
                    help="threshold above which liquidity incentives will be automatically moved to the account (default: no moving)")
parser.add_argument("--pulse-interval", type=int, default=10,
                    help="number of seconds between each 'pulse' of the market maker")
parser.add_argument("--event-queue-poll-interval", type=int, default=10,
                    help="number of seconds between each 'poll' of the event queue (only used if --update-mode is POLL)")
parser.add_argument("--hedge-market", type=str,
                    help="spot market symbol to use for hedging (e.g. ETH/USDC)")
parser.add_argument("--max-price-slippage-factor", type=Decimal, default=Decimal("0.05"),
                    help="the maximum value the IOC hedging order price can slip by when hedging (default is 0.05 for 5%%)")
parser.add_argument("--account-address", type=PublicKey,
                    help="address of the specific account to use, if more than one available")
parser.add_argument("--notify-errors", type=mango.parse_subscription_target, action="append", default=[],
                    help="The notification target for error events")
parser.add_argument("--dry-run", action="store_true", default=False,
                    help="runs as read-only and does not perform any transactions")
args: argparse.Namespace = mango.parse_args(parser)

# Every --notify-errors target gets its own handler on the root logger, limited to ERROR and above.
for notify in args.notify_errors:
    handler = mango.NotificationHandler(notify)
    handler.setLevel(logging.ERROR)
    logging.getLogger().addHandler(handler)


def cleanup(context: mango.Context, wallet: mango.Wallet, account: mango.Account, market: mango.Market, dry_run: bool) -> None:
    """Cancel the orders returned by `load_my_orders()` for `market`.

    One cancel instruction set is built per order (with `ok_if_missing=True`)
    and all of them are executed together, signed by `wallet`. When at least
    one cancel instruction was built, the market is then cranked and settled.

    `dry_run` is passed through to the market operations and instruction
    builder factories.
    """
    market_operations: mango.MarketOperations = mango.create_market_operations(
        context, wallet, account, market, dry_run)
    market_instruction_builder: mango.MarketInstructionBuilder = mango.create_market_instruction_builder(
        context, wallet, account, market, dry_run)
    cancels: mango.CombinableInstructions = mango.CombinableInstructions.empty()
    orders = market_operations.load_my_orders()
    for order in orders:
        cancels += market_instruction_builder.build_cancel_order_instructions(order, ok_if_missing=True)

    if len(cancels.instructions) > 0:
        logging.info(f"Cleaning up {len(cancels.instructions)} order(s).")
        signer: mango.CombinableInstructions = mango.CombinableInstructions.from_wallet(wallet)
        (signer + cancels).execute(context)
        # NOTE(review): crank/settle are assumed to belong to this branch (only after
        # cancels were actually sent) - confirm against the original layout.
        market_operations.crank()
        market_operations.settle()


context = mango.ContextBuilder.from_command_line_parameters(args)

# Everything added to the disposer is disposed together at shutdown.
disposer = mango.DisposePropagator()
manager = mango.IndividualWebSocketSubscriptionManager(context)
disposer.add_disposable(manager)
health_check = mango.HealthCheck()
disposer.add_disposable(health_check)

wallet = mango.Wallet.from_command_line_parameters_or_raise(args)
group = mango.Group.load(context, context.group_address)
account = mango.Account.load_for_owner_by_address(context, wallet.address, group, args.account_address)

market = mango.load_market_by_symbol(context, args.market)

# The market must be quoted in the group's shared quote token.
if market.quote != group.shared_quote_token.token:
    raise Exception(
        f"Group {group.name} uses shared quote token {group.shared_quote_token.token.symbol}/{group.shared_quote_token.token.mint}, but market {market.symbol} uses quote token {market.quote.symbol}/{market.quote.mint}.")

# Start from a clean slate: remove any orders left on the market before making.
cleanup(context, wallet, account, market, args.dry_run)

if args.hedge_market is not None:
    # Hedging watches a perp market's event queue and places orders on a spot market.
    if not isinstance(market, mango.PerpMarket):
        raise Exception(f"Cannot hedge - market {market.symbol} is not a perp market.")
    watched_market: mango.PerpMarket = market

    hedging_market_symbol = args.hedge_market.upper()
    hedging_market_stub = context.market_lookup.find_by_symbol(hedging_market_symbol)
    if hedging_market_stub is None:
        raise Exception(f"Could not find market {hedging_market_symbol}")

    hedging_market = mango.ensure_market_loaded(context, hedging_market_stub)
    if not isinstance(hedging_market, mango.SpotMarket):
        raise Exception(f"Market {hedging_market_symbol} is not a spot market.")

    logging.info(f"Hedging on {hedging_market.symbol}")

    hedging_market_operations: mango.MarketOperations = mango.create_market_operations(
        context, wallet, account, hedging_market, args.dry_run)

    # Initial snapshot of the event queue, handed to the hedger alongside the update source.
    perp_event_queue: mango.PerpEventQueue = mango.PerpEventQueue.load(
        context, watched_market.underlying_perp_market.event_queue, watched_market.lot_size_converter)

    event_queue_source: mango.EventSource[mango.PerpEventQueue]
    if args.update_mode == mango.marketmaking.ModelUpdateMode.WEBSOCKET:
        event_subscription = mango.WebSocketAccountSubscription(
            context, watched_market.underlying_perp_market.event_queue,
            lambda account_info: mango.PerpEventQueue.parse(account_info, watched_market.lot_size_converter))
        manager.add(event_subscription)
        health_check.add("hedger_watched_events_pong_subscription", event_subscription.pong)
        event_queue_source = event_subscription.publisher
    else:
        # POLL mode: reload the event queue every --event-queue-poll-interval seconds,
        # with start_with(-1) triggering one load immediately.
        event_queue_observable: rx.Observable = rx.interval(args.event_queue_poll_interval).pipe(
            rx.operators.observe_on(context.create_thread_pool_scheduler()),
            rx.operators.catch(mango.observable_pipeline_error_reporter),
            rx.operators.retry(),
            rx.operators.start_with(-1),
            rx.operators.map(lambda _: mango.PerpEventQueue.load(
                context, watched_market.underlying_perp_market.event_queue, watched_market.lot_size_converter))
        )
        event_queue_source = mango.EventSource[mango.PerpEventQueue]()
        # Register the subscription with the disposer, as is done for the pulse
        # subscription, so polling is stopped at shutdown instead of being left running.
        event_queue_disposable = event_queue_observable.subscribe(event_queue_source)
        disposer.add_disposable(event_queue_disposable)
        health_check.add("hedger_watched_events_pong_subscription", event_queue_source)

    hedger: mango.marketmaking.Hedger = mango.marketmaking.Hedger(
        context, account, watched_market, hedging_market, perp_event_queue, event_queue_source,
        hedging_market_operations, args.max_price_slippage_factor)
    disposer.add_disposable(hedger)

# The same tolerance value is passed for both reconciler arguments.
order_reconciler = mango.marketmaking.ToleranceOrderReconciler(
    args.existing_order_tolerance, args.existing_order_tolerance)

desired_orders_chain: chain.Chain = chainbuilder.ChainBuilder.from_command_line_parameters(args)
logging.info(f"Desired orders chain: {desired_orders_chain}")

market_instruction_builder: mango.MarketInstructionBuilder = mango.create_market_instruction_builder(
    context, wallet, account, market, args.dry_run)
market_maker = mango.marketmaking.MarketMaker(
    wallet, market, market_instruction_builder, desired_orders_chain, order_reconciler, args.redeem_threshold)

oracle_provider: mango.OracleProvider = mango.create_oracle_provider(context, args.oracle_provider)
# Price from --oracle-market when given, otherwise from the market being made.
oracle_market: mango.LoadedMarket = market if args.oracle_market is None else mango.load_market_by_symbol(
    context, args.oracle_market)
oracle = oracle_provider.oracle_for_market(context, oracle_market)
if oracle is None:
    raise Exception(f"Could not find oracle for market {oracle_market.symbol} from provider {args.oracle_provider}.")

model_state_builder: mango.marketmaking.ModelStateBuilder = mango.marketmaking.model_state_builder_factory(
    args.update_mode, context, disposer, manager, health_check, wallet, group, account, market, oracle)

health_check.add("marketmaker_pulse", market_maker.pulse_complete)

logging.info(f"Current assets in account {account.address} (owner: {account.owner}):")
mango.TokenValue.report([asset for asset in account.net_assets if asset is not None], logging.info)

manager.open()

# Pulse the market maker every --pulse-interval seconds, with start_with(-1)
# triggering the first pulse immediately.
pulse_disposable = rx.interval(args.pulse_interval).pipe(
    rx.operators.observe_on(context.create_thread_pool_scheduler()),
    rx.operators.start_with(-1),
    rx.operators.catch(mango.observable_pipeline_error_reporter),
    rx.operators.retry()
).subscribe(
    on_next=lambda _: market_maker.pulse(context, model_state_builder.build(context)))
disposer.add_disposable(pulse_disposable)

# Wait - don't exit. Exiting will be handled by signals/interrupts.
waiter = threading.Event()
try:
    waiter.wait()
except (KeyboardInterrupt, SystemExit):
    # The expected way out: an interrupt or exit request ends the wait and
    # execution falls through to the shutdown sequence below.
    pass
except Exception:
    # Anything else is unexpected. Record it rather than hiding it, but still
    # fall through so that shutdown and order cleanup run.
    logging.exception("Unexpected error while waiting for shutdown.")

logging.info("Shutting down...")
try:
    disposer.dispose()
finally:
    # Cancel remaining orders even if disposing fails, so none are left on the book.
    cleanup(context, wallet, account, market, args.dry_run)
logging.info("Shutdown complete.")