mango-explorer/bin/marketmaker

266 lines
15 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env pyston3
import argparse
import logging
import os
import os.path
import rx
import sys
import threading
import typing
from decimal import Decimal
from pyserum.market import Market as PySerumMarket
sys.path.insert(0, os.path.abspath(
os.path.join(os.path.dirname(__file__), "..")))
import mango # nopep8
import mango.marketmaking.fixedratiosdesiredordersbuilder # nopep8
import mango.marketmaking.marketmaker # nopep8
import mango.marketmaking.modelstate # nopep8
import mango.marketmaking.toleranceorderreconciler # nopep8
parser = argparse.ArgumentParser(description="Shows the on-chain data of a particular account.")
mango.ContextBuilder.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser)
parser.add_argument("--market", type=str, required=True, help="market symbol to make market upon (e.g. ETH/USDC)")
parser.add_argument("--account-index", type=int, default=0,
help="index of the account to use, if more than one available")
parser.add_argument("--oracle-provider", type=str, required=True,
help="name of the price provider to use (e.g. pyth)")
parser.add_argument("--spread-ratio", type=Decimal, action="append", default=[], required=True,
help="fraction of the mid price to be added and subtracted to calculate buy and sell prices (can be specified multiple times)")
parser.add_argument("--position-size-ratio", type=Decimal, action="append", default=[], required=True,
help="fraction of the token inventory to be bought or sold in each order (can be specified multiple times)")
parser.add_argument("--existing-order-tolerance", type=Decimal, default=Decimal("0.001"),
help="tolerance in price and quantity when matching existing orders or cancelling/replacing")
parser.add_argument("--order-type", type=mango.OrderType, default=mango.OrderType.POST_ONLY,
choices=list(mango.OrderType), help="Order type: LIMIT, IOC or POST_ONLY")
parser.add_argument("--pulse-interval", type=int, default=10,
help="number of seconds between each 'pulse' of the market maker")
parser.add_argument("--dry-run", action="store_true", default=False,
help="runs as read-only and does not perform any transactions")
args = parser.parse_args()
if len(args.spread_ratio) != len(args.position_size_ratio):
raise Exception(
f"Number of spread ratio parameters ({len(args.spread_ratio)}) does not match number of position size ratio ({len(args.position_size_ratio)}).")
logging.getLogger().setLevel(args.log_level)
logging.warning(mango.WARNING_DISCLAIMER_TEXT)
def cleanup(context: mango.Context, wallet: mango.Wallet, dry_run: bool, market: mango.Market):
logging.info("Cleaning up.")
market_operations: mango.MarketOperations = mango.create_market_operations(context, wallet, dry_run, market)
orders = market_operations.load_my_orders()
for order in orders:
market_operations.cancel_order(order)
logging.info("Settling.")
market_operations.crank()
market_operations.settle()
2021-07-16 03:54:13 -07:00
def add_file_health(name: str, observable: rx.core.typing.Observable[typing.Any], disposer: mango.DisposePropagator):
perp_market_file_touch_disposer = observable.subscribe(
mango.FileToucherObserver(f"/var/tmp/mango_healthcheck_{name}"))
disposer.add_disposable(perp_market_file_touch_disposer)
def build_latest_group_observer(context: mango.Context, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, group: mango.Group) -> mango.LatestItemObserverSubscriber[mango.Group]:
group_subscription = mango.WebSocketAccountSubscription[mango.Group](
context, group.address, lambda account_info: mango.Group.parse(context, account_info))
manager.add(group_subscription)
latest_group_observer = mango.LatestItemObserverSubscriber(group)
group_subscription.publisher.subscribe(latest_group_observer)
add_file_health("group_subscription", group_subscription.publisher, disposer)
return latest_group_observer
def build_latest_account_observer(context: mango.Context, account: mango.Account, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, group_observer: mango.LatestItemObserverSubscriber[mango.Group]) -> typing.Tuple[mango.WebSocketSubscription[mango.Account], mango.LatestItemObserverSubscriber[mango.Account]]:
account_subscription = mango.WebSocketAccountSubscription[mango.Account](
context, account.address, lambda account_info: mango.Account.parse(context, account_info, group_observer.latest))
manager.add(account_subscription)
latest_account_observer = mango.LatestItemObserverSubscriber(account)
account_subscription.publisher.subscribe(latest_account_observer)
add_file_health("account_subscription", account_subscription.publisher, disposer)
return account_subscription, latest_account_observer
def build_latest_spot_open_orders_observer(manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, spot_market: mango.SpotMarket) -> mango.LatestItemObserverSubscriber[mango.OpenOrders]:
market_index = group.find_spot_market_index(spot_market.address)
spot_open_orders_address = account.spot_open_orders[market_index]
if spot_open_orders_address is None:
raise Exception(f"No spot OpenOrders for market {spot_market.symbol}.")
spot_open_orders_subscription = mango.WebSocketAccountSubscription[mango.OpenOrders](
context, spot_open_orders_address, lambda account_info: mango.OpenOrders.parse(account_info, spot_market.base.decimals, spot_market.quote.decimals))
manager.add(spot_open_orders_subscription)
initial_spot_open_orders = mango.OpenOrders.load(
context, spot_open_orders_address, spot_market.base.decimals, spot_market.quote.decimals)
latest_open_orders_observer = mango.LatestItemObserverSubscriber(initial_spot_open_orders)
spot_open_orders_subscription.publisher.subscribe(latest_open_orders_observer)
add_file_health("open_orders_subscription", spot_open_orders_subscription.publisher, disposer)
return latest_open_orders_observer
def build_latest_serum_open_orders_observer(manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, serum_market: mango.SerumMarket, context: mango.Context, wallet: mango.Wallet) -> mango.LatestItemObserverSubscriber[mango.OpenOrders]:
all_open_orders = mango.OpenOrders.load_for_market_and_owner(
context, serum_market.address, wallet.address, context.dex_program_id, serum_market.base.decimals, serum_market.quote.decimals)
if len(all_open_orders) > 0:
initial_serum_open_orders: mango.OpenOrders = all_open_orders[0]
open_orders_address = initial_serum_open_orders.address
else:
raw_market = PySerumMarket.load(context.client, serum_market.address)
create_open_orders = mango.build_create_serum_open_orders_instructions(
context, wallet, raw_market)
open_orders_address = create_open_orders.signers[0].public_key()
logging.info(f"Creating OpenOrders account for market {serum_market.symbol} at {open_orders_address}.")
signers: mango.CombinableInstructions = mango.CombinableInstructions.from_wallet(wallet)
transaction_id = (signers + create_open_orders).execute_and_unwrap_transaction_ids(context)[0]
context.wait_for_confirmation(transaction_id)
initial_serum_open_orders = mango.OpenOrders.load(
context, open_orders_address, serum_market.base.decimals, serum_market.quote.decimals)
serum_open_orders_subscription = mango.WebSocketAccountSubscription[mango.OpenOrders](
context, open_orders_address, lambda account_info: mango.OpenOrders.parse(account_info, serum_market.base.decimals, serum_market.quote.decimals))
manager.add(serum_open_orders_subscription)
latest_serum_open_orders_observer = mango.LatestItemObserverSubscriber(initial_serum_open_orders)
serum_open_orders_subscription.publisher.subscribe(latest_serum_open_orders_observer)
add_file_health("open_orders_subscription", serum_open_orders_subscription.publisher, disposer)
return latest_serum_open_orders_observer
def build_latest_perp_open_orders_observer(disposer: mango.DisposePropagator, perp_market: mango.PerpMarket, account: mango.Account, account_subscription: mango.WebSocketSubscription[mango.Account]) -> mango.LatestItemObserverSubscriber[mango.OpenOrders]:
index = group.find_perp_market_index(perp_market.address)
def _build_open_orders_from_account(account: mango.Account) -> mango.OpenOrders:
perp_account = account.perp_accounts[index]
perp_open_orders = perp_account.open_orders
return mango.OpenOrders.from_perps_account_layout(context, account, perp_market, perp_open_orders)
initial_open_orders = _build_open_orders_from_account(account)
latest_open_orders_observer = mango.LatestItemObserverSubscriber(initial_open_orders)
account_subscription.publisher.subscribe(
on_next=lambda account: latest_open_orders_observer.on_next(_build_open_orders_from_account(account)))
add_file_health("open_orders_subscription", account_subscription.publisher, disposer)
return latest_open_orders_observer
def build_latest_perp_market_observer(manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, perp_market_info: mango.PerpMarketInfo, initial_perp_market_details: mango.PerpMarketDetails, group_observer: mango.LatestItemObserverSubscriber[mango.Group]) -> mango.LatestItemObserverSubscriber[mango.PerpMarketDetails]:
perp_market_subscription = mango.WebSocketAccountSubscription[mango.PerpMarketDetails](
context, perp_market_info.address, lambda account_info: mango.PerpMarketDetails.parse(account_info, group_observer.latest))
manager.add(perp_market_subscription)
latest_perp_market_observer = mango.LatestItemObserverSubscriber(initial_perp_market_details)
perp_market_subscription.publisher.subscribe(latest_perp_market_observer)
add_file_health("perp_market_subscription", perp_market_subscription.publisher, disposer)
return latest_perp_market_observer
def build_latest_price_observer(context: mango.Context, disposer: mango.DisposePropagator, provider_name: str, market: mango.Market) -> mango.LatestItemObserverSubscriber[mango.Price]:
oracle_provider: mango.OracleProvider = mango.create_oracle_provider(provider_name)
oracle = oracle_provider.oracle_for_market(context, market)
if oracle is None:
raise Exception(f"Could not find oracle for market {market.symbol} from provider {provider_name}.")
initial_price = oracle.fetch_price(context)
price_feed = oracle.to_streaming_observable(context)
latest_price_observer = mango.LatestItemObserverSubscriber(initial_price)
price_disposable = price_feed.subscribe(latest_price_observer)
disposer.add_disposable(price_disposable)
add_file_health("price_subscription", price_feed, disposer)
return latest_price_observer
context = mango.ContextBuilder.from_command_line_parameters(args)
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager()
disposer.add_disposable(manager)
wallet = mango.Wallet.from_command_line_parameters_or_raise(args)
group = mango.Group.load(context, context.group_id)
accounts = mango.Account.load_all_for_owner(context, wallet.address, group)
if len(accounts) == 0:
raise Exception(f"No mango account found for root address '{wallet.address}'.")
account = accounts[args.account_index]
market_symbol = args.market.upper()
market = context.market_lookup.find_by_symbol(market_symbol)
if market is None:
raise Exception(f"Could not find market {market_symbol}")
# The market index is also the index of the base token in the group's token list.
if market.quote != group.shared_quote_token.token:
raise Exception(
f"Group {group.name} uses shared quote token {group.shared_quote_token.token.symbol}/{group.shared_quote_token.token.mint}, but market {market.symbol} uses quote token {market.quote.symbol}/{market.quote.mint}.")
cleanup(context, wallet, args.dry_run, market)
latest_group_observer = build_latest_group_observer(context, manager, disposer, group)
account_subscription, latest_account_observer = build_latest_account_observer(
context, account, manager, disposer, latest_group_observer)
latest_price_observer = build_latest_price_observer(context, disposer, args.oracle_provider, market)
market = mango.ensure_market_loaded(context, market)
market_instruction_builder: mango.MarketInstructionBuilder = mango.create_market_instruction_builder(
context, wallet, args.dry_run, market, account)
if isinstance(market, mango.SerumMarket):
latest_perp_market_observer = None
latest_open_orders_observer = build_latest_serum_open_orders_observer(manager, disposer, market, context, wallet)
elif isinstance(market, mango.SpotMarket):
latest_perp_market_observer = None
latest_open_orders_observer = build_latest_spot_open_orders_observer(manager, disposer, market)
elif isinstance(market, mango.PerpMarket):
perp_market_info = mango.PerpMarketInfo.find_by_address(group.perp_markets, market.address)
initial_perp_market_details = mango.PerpMarketDetails.load(context, perp_market_info.address, group)
latest_perp_market_observer = build_latest_perp_market_observer(
manager, disposer, perp_market_info, initial_perp_market_details, latest_group_observer)
latest_open_orders_observer = build_latest_perp_open_orders_observer(
disposer, market, account, account_subscription)
else:
raise Exception(f"Could not determine type of market {market.symbol}")
websocket_url = context.cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item)
ws.ping_interval = 10
ws.open()
order_reconciler = mango.marketmaking.toleranceorderreconciler.ToleranceOrderReconciler(
args.existing_order_tolerance, args.existing_order_tolerance)
desired_orders_builder = mango.marketmaking.fixedratiosdesiredordersbuilder.FixedRatiosDesiredOrdersBuilder(
args.spread_ratio, args.position_size_ratio, args.order_type)
market_maker = mango.marketmaking.marketmaker.MarketMaker(
wallet, market, market_instruction_builder, desired_orders_builder, order_reconciler)
model_state = mango.marketmaking.modelstate.ModelState(market, latest_account_observer,
latest_group_observer, latest_price_observer,
latest_perp_market_observer,
latest_open_orders_observer)
add_file_health("marketmaker_pulse", market_maker.pulse_complete, disposer)
logging.info("Current assets in account:")
mango.TokenValue.report([asset for asset in account.net_assets if asset is not None], logging.info)
pulse_disposable = rx.interval(args.pulse_interval).subscribe(
on_next=lambda _: market_maker.pulse(context, model_state))
disposer.add_disposable(pulse_disposable)
# Wait - don't exit. Exiting will be handled by signals/interrupts.
waiter = threading.Event()
try:
waiter.wait()
except:
pass
logging.info("Shutting down...")
ws.close()
disposer.dispose()
cleanup(context, wallet, args.dry_run, market)
logging.info("Shutdown complete.")