#!/usr/bin/env pyston3
"""Command-line market maker.

Parses the command line, cancels and settles any existing orders on the chosen
market, builds websocket-backed "latest item" observers for the group, account,
price, open orders and (for perp markets) perp market details, and then calls
`MarketMaker.pulse()` every `--pulse-interval` seconds until interrupted. On
shutdown the websocket is closed, subscriptions are disposed and orders are
cancelled and settled again.
"""

import argparse
import logging
import os
import os.path
import rx
import sys
import threading
import typing

from decimal import Decimal
from pyserum.market import Market as PySerumMarket

sys.path.insert(0, os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..")))
import mango  # nopep8
import mango.marketmaking  # nopep8

parser = argparse.ArgumentParser(description="Runs a marketmaker against a particular market.")
mango.ContextBuilder.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser)
parser.add_argument("--market", type=str, required=True, help="market symbol to make market upon (e.g. ETH/USDC)")
parser.add_argument("--account-index", type=int, default=0,
                    help="index of the account to use, if more than one available")
parser.add_argument("--oracle-provider", type=str, required=True,
                    help="name of the price provider to use (e.g. pyth)")
parser.add_argument("--position-size-ratio", type=Decimal, required=True,
                    help="fraction of the token inventory to be bought or sold in each order")
parser.add_argument("--existing-order-tolerance", type=Decimal, default=Decimal("0.001"),
                    help="tolerance in price and quantity when matching existing orders or cancelling/replacing")
parser.add_argument("--minimum-charge-ratio", type=Decimal, default=Decimal("0.0005"),
                    help="minimum fraction of the price to be accept as a spread")
parser.add_argument("--confidence-weighting", type=Decimal, default=Decimal("2"),
                    help="a weighting to apply to the confidence interval from the oracle: e.g. 1 - use the oracle confidence interval as the spread, 2 (risk averse, default) - multiply the oracle confidence interval by 2 to get the spread, 0.5 (aggressive) halve the oracle confidence interval to get the spread")
parser.add_argument("--order-type", type=mango.OrderType, default=mango.OrderType.POST_ONLY,
                    choices=list(mango.OrderType), help="Order type: LIMIT, IOC or POST_ONLY")
parser.add_argument("--pulse-interval", type=int, default=10,
                    help="number of seconds between each 'pulse' of the market maker")
parser.add_argument("--dry-run", action="store_true", default=False,
                    help="runs as read-only and does not perform any transactions")
args = parser.parse_args()

logging.getLogger().setLevel(args.log_level)
logging.warning(mango.WARNING_DISCLAIMER_TEXT)


def cleanup(context: mango.Context, wallet: mango.Wallet, dry_run: bool, market: mango.Market) -> None:
    """Cancel every order returned by `load_my_orders()` for `market`, then crank and settle."""
    logging.info("Cleaning up.")
    market_operations: mango.MarketOperations = mango.create_market_operations(context, wallet, dry_run, market)
    orders = market_operations.load_my_orders()
    for order in orders:
        market_operations.cancel_order(order)

    logging.info("Settling.")
    market_operations.crank()
    market_operations.settle()


def add_file_health(name: str, observable: rx.core.typing.Observable[typing.Any], disposer: mango.DisposePropagator) -> None:
    """Subscribe a `FileToucherObserver` for `/var/tmp/mango_healthcheck_{name}` to `observable`.

    The resulting subscription is registered with `disposer` so it is released on shutdown.
    """
    perp_market_file_touch_disposer = observable.subscribe(
        mango.FileToucherObserver(f"/var/tmp/mango_healthcheck_{name}"))
    disposer.add_disposable(perp_market_file_touch_disposer)


def build_latest_group_observer(context: mango.Context, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, group: mango.Group) -> mango.LatestItemObserverSubscriber[mango.Group]:
    """Subscribe to account updates at `group.address` and return an observer seeded with `group`."""
    group_subscription = mango.WebSocketAccountSubscription[mango.Group](
        context, group.address, lambda account_info: mango.Group.parse(context, account_info))
    manager.add(group_subscription)
    latest_group_observer = mango.LatestItemObserverSubscriber(group)
    group_subscription.publisher.subscribe(latest_group_observer)
    add_file_health("group_subscription", group_subscription.publisher, disposer)
    return latest_group_observer


def build_latest_account_observer(context: mango.Context, account: mango.Account, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, group_observer: mango.LatestItemObserverSubscriber[mango.Group]) -> typing.Tuple[mango.WebSocketSubscription[mango.Account], mango.LatestItemObserverSubscriber[mango.Account]]:
    """Subscribe to account updates at `account.address`.

    Each update is parsed against `group_observer.latest` at the time the update arrives.
    Returns both the subscription and an observer seeded with `account`; the subscription
    is returned so that callers can attach further subscribers to its publisher.
    """
    account_subscription = mango.WebSocketAccountSubscription[mango.Account](
        context, account.address, lambda account_info: mango.Account.parse(context, account_info, group_observer.latest))
    manager.add(account_subscription)
    latest_account_observer = mango.LatestItemObserverSubscriber(account)
    account_subscription.publisher.subscribe(latest_account_observer)
    add_file_health("account_subscription", account_subscription.publisher, disposer)
    return account_subscription, latest_account_observer


def build_latest_spot_open_orders_observer(context: mango.Context, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, account: mango.Account, spot_market: mango.SpotMarket) -> mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]:
    """Return an observer tracking the account's OpenOrders for `spot_market`.

    If `account.spot_open_orders` has no address for this market, an OpenOrders account is
    created first and the new address is written back into `account.spot_open_orders`.

    NOTE: this function reads the module-level `group` and `wallet` variables, so it can only
    be called after they have been assigned.
    """
    market_index = group.find_spot_market_index(spot_market.address)
    spot_open_orders_address = account.spot_open_orders[market_index]
    if spot_open_orders_address is None:
        spot_market_instruction_builder: mango.SpotMarketInstructionBuilder = mango.SpotMarketInstructionBuilder.load(
            context, wallet, spot_market.group, account, spot_market)
        market_operations: mango.SpotMarketOperations = mango.SpotMarketOperations(
            context, wallet, spot_market.group, account, spot_market, spot_market_instruction_builder)
        # Keep the new address in the same variable that the subscription and the initial
        # load below use - otherwise they would be handed the original None.
        spot_open_orders_address = market_operations.create_openorders_for_market()

        # This line is a little nasty. Now that we know we have an OpenOrders account at this address, update
        # the Account so that future uses (like later in this method) have access to it in the right place.
        account.spot_open_orders[market_index] = spot_open_orders_address

    spot_open_orders_subscription = mango.WebSocketAccountSubscription[mango.OpenOrders](
        context, spot_open_orders_address, lambda account_info: mango.OpenOrders.parse(account_info, spot_market.base.decimals, spot_market.quote.decimals))
    manager.add(spot_open_orders_subscription)
    initial_spot_open_orders = mango.OpenOrders.load(
        context, spot_open_orders_address, spot_market.base.decimals, spot_market.quote.decimals)
    latest_open_orders_observer = mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer](
        initial_spot_open_orders)
    spot_open_orders_subscription.publisher.subscribe(latest_open_orders_observer)
    add_file_health("open_orders_subscription", spot_open_orders_subscription.publisher, disposer)
    return latest_open_orders_observer


def build_latest_serum_open_orders_observer(manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, serum_market: mango.SerumMarket, context: mango.Context, wallet: mango.Wallet) -> mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]:
    """Return an observer tracking the wallet's OpenOrders for `serum_market`.

    Uses the first OpenOrders account found for the market and owner. If there is none,
    one is created, the transaction is awaited, and the new account is loaded.
    """
    all_open_orders = mango.OpenOrders.load_for_market_and_owner(
        context, serum_market.address, wallet.address, context.dex_program_id, serum_market.base.decimals, serum_market.quote.decimals)
    if len(all_open_orders) > 0:
        initial_serum_open_orders: mango.OpenOrders = all_open_orders[0]
        open_orders_address = initial_serum_open_orders.address
    else:
        raw_market = PySerumMarket.load(context.client, serum_market.address)
        create_open_orders = mango.build_create_serum_open_orders_instructions(
            context, wallet, raw_market)

        # The address of the account being created is the public key of the first signer.
        open_orders_address = create_open_orders.signers[0].public_key()

        logging.info(f"Creating OpenOrders account for market {serum_market.symbol} at {open_orders_address}.")
        signers: mango.CombinableInstructions = mango.CombinableInstructions.from_wallet(wallet)
        transaction_id = (signers + create_open_orders).execute_and_unwrap_transaction_ids(context)[0]
        context.wait_for_confirmation(transaction_id)
        initial_serum_open_orders = mango.OpenOrders.load(
            context, open_orders_address, serum_market.base.decimals, serum_market.quote.decimals)

    serum_open_orders_subscription = mango.WebSocketAccountSubscription[mango.OpenOrders](
        context, open_orders_address, lambda account_info: mango.OpenOrders.parse(account_info, serum_market.base.decimals, serum_market.quote.decimals))

    manager.add(serum_open_orders_subscription)

    latest_serum_open_orders_observer = mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer](
        initial_serum_open_orders)
    serum_open_orders_subscription.publisher.subscribe(latest_serum_open_orders_observer)
    add_file_health("open_orders_subscription", serum_open_orders_subscription.publisher, disposer)
    return latest_serum_open_orders_observer


def build_latest_perp_open_orders_observer(disposer: mango.DisposePropagator, perp_market: mango.PerpMarket, account: mango.Account, account_subscription: mango.WebSocketSubscription[mango.Account]) -> mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]:
    """Return an observer tracking the perp open orders held inside the account.

    No separate websocket subscription is made here: every account published by
    `account_subscription` has its `perp_accounts[index].open_orders` forwarded to the observer.

    NOTE: this function reads the module-level `group` variable, so it can only be called
    after `group` has been assigned.
    """
    index = group.find_perp_market_index(perp_market.address)
    initial_open_orders = account.perp_accounts[index].open_orders
    latest_open_orders_observer = mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer](initial_open_orders)
    account_subscription.publisher.subscribe(
        on_next=lambda updated_account: latest_open_orders_observer.on_next(updated_account.perp_accounts[index].open_orders))
    add_file_health("open_orders_subscription", account_subscription.publisher, disposer)
    return latest_open_orders_observer


def build_latest_perp_market_observer(manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, perp_market_info: mango.PerpMarketInfo, initial_perp_market_details: mango.PerpMarketDetails, group_observer: mango.LatestItemObserverSubscriber[mango.Group]) -> mango.LatestItemObserverSubscriber[mango.PerpMarketDetails]:
    """Subscribe to account updates at `perp_market_info.address`.

    Each update is parsed against `group_observer.latest`; the returned observer is seeded
    with `initial_perp_market_details`.

    NOTE: this function reads the module-level `context` variable, so it can only be called
    after `context` has been assigned.
    """
    perp_market_subscription = mango.WebSocketAccountSubscription[mango.PerpMarketDetails](
        context, perp_market_info.address, lambda account_info: mango.PerpMarketDetails.parse(account_info, group_observer.latest))
    manager.add(perp_market_subscription)
    latest_perp_market_observer = mango.LatestItemObserverSubscriber(initial_perp_market_details)
    perp_market_subscription.publisher.subscribe(latest_perp_market_observer)
    add_file_health("perp_market_subscription", perp_market_subscription.publisher, disposer)
    return latest_perp_market_observer


def build_latest_price_observer(context: mango.Context, disposer: mango.DisposePropagator, provider_name: str, market: mango.Market) -> mango.LatestItemObserverSubscriber[mango.Price]:
    """Return an observer holding the latest oracle price for `market`.

    The observer is seeded with one fetched price and then fed from the oracle's
    streaming observable.

    Raises:
        Exception: if the provider has no oracle for `market`.
    """
    oracle_provider: mango.OracleProvider = mango.create_oracle_provider(context, provider_name)
    oracle = oracle_provider.oracle_for_market(context, market)
    if oracle is None:
        raise Exception(f"Could not find oracle for market {market.symbol} from provider {provider_name}.")

    initial_price = oracle.fetch_price(context)
    price_feed = oracle.to_streaming_observable(context)
    latest_price_observer = mango.LatestItemObserverSubscriber(initial_price)
    price_disposable = price_feed.subscribe(latest_price_observer)
    disposer.add_disposable(price_disposable)
    add_file_health("price_subscription", price_feed, disposer)
    return latest_price_observer


context = mango.ContextBuilder.from_command_line_parameters(args)

disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager()
disposer.add_disposable(manager)

wallet = mango.Wallet.from_command_line_parameters_or_raise(args)
group = mango.Group.load(context, context.group_id)
accounts = mango.Account.load_all_for_owner(context, wallet.address, group)
if len(accounts) == 0:
    raise Exception(f"No mango account found for root address '{wallet.address}'.")
account = accounts[args.account_index]

market_symbol = args.market.upper()
market = context.market_lookup.find_by_symbol(market_symbol)
if market is None:
    raise Exception(f"Could not find market {market_symbol}")

# The market index is also the index of the base token in the group's token list.
if market.quote != group.shared_quote_token.token:
    raise Exception(
        f"Group {group.name} uses shared quote token {group.shared_quote_token.token.symbol}/{group.shared_quote_token.token.mint}, but market {market.symbol} uses quote token {market.quote.symbol}/{market.quote.mint}.")

# Start from a clean slate: cancel and settle anything left on the market.
cleanup(context, wallet, args.dry_run, market)

latest_group_observer = build_latest_group_observer(context, manager, disposer, group)
account_subscription, latest_account_observer = build_latest_account_observer(
    context, account, manager, disposer, latest_group_observer)
latest_price_observer = build_latest_price_observer(context, disposer, args.oracle_provider, market)

market = mango.ensure_market_loaded(context, market)
market_instruction_builder: mango.MarketInstructionBuilder = mango.create_market_instruction_builder(
    context, wallet, args.dry_run, market, account)
if isinstance(market, mango.SerumMarket):
    latest_perp_market_observer = None
    latest_open_orders_observer = build_latest_serum_open_orders_observer(manager, disposer, market, context, wallet)
elif isinstance(market, mango.SpotMarket):
    latest_perp_market_observer = None
    latest_open_orders_observer = build_latest_spot_open_orders_observer(context, manager, disposer, account, market)
elif isinstance(market, mango.PerpMarket):
    perp_market_info = mango.PerpMarketInfo.find_by_address(group.perp_markets, market.address)
    initial_perp_market_details = mango.PerpMarketDetails.load(context, perp_market_info.address, group)
    latest_perp_market_observer = build_latest_perp_market_observer(
        manager, disposer, perp_market_info, initial_perp_market_details, latest_group_observer)
    latest_open_orders_observer = build_latest_perp_open_orders_observer(
        disposer, market, account, account_subscription)
else:
    raise Exception(f"Could not determine type of market {market.symbol}")

# NOTE(review): this turns an "https://" cluster URL into "ws://" (not "wss://") - confirm
# that this is what the RPC endpoint expects.
websocket_url = context.cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item)
ws.ping_interval = 10
ws.open()

# The same tolerance is passed for both arguments of the reconciler.
order_reconciler = mango.marketmaking.ToleranceOrderReconciler(
    args.existing_order_tolerance, args.existing_order_tolerance)
desired_orders_builder = mango.marketmaking.ConfidenceIntervalDesiredOrdersBuilder(
    args.position_size_ratio, args.minimum_charge_ratio, args.confidence_weighting, args.order_type)
market_maker = mango.marketmaking.MarketMaker(
    wallet, market, market_instruction_builder, desired_orders_builder, order_reconciler)
model_state = mango.marketmaking.ModelState(market, latest_account_observer,
                                            latest_group_observer, latest_price_observer,
                                            latest_perp_market_observer, latest_open_orders_observer)

add_file_health("marketmaker_pulse", market_maker.pulse_complete, disposer)

logging.info("Current assets in account:")
mango.TokenValue.report([asset for asset in account.net_assets if asset is not None], logging.info)

pulse_disposable = rx.interval(args.pulse_interval).subscribe(
    on_next=lambda _: market_maker.pulse(context, model_state))
disposer.add_disposable(pulse_disposable)

# Wait - don't exit. Exiting will be handled by signals/interrupts.
waiter = threading.Event()
try:
    waiter.wait()
except (KeyboardInterrupt, SystemExit):
    # The expected way to stop: fall through to the shutdown sequence below.
    pass
except Exception:
    # Anything else is unexpected, but we still want to shut down cleanly, so record it
    # rather than silently discarding it.
    logging.exception("Unexpected error while waiting; shutting down.")

logging.info("Shutting down...")
ws.close()
disposer.dispose()
cleanup(context, wallet, args.dry_run, market)
logging.info("Shutdown complete.")