mango-explorer/bin/marketmaker

265 lines
16 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env pyston3
import argparse
import logging
import os
import os.path
import rx
import sys
import threading
import typing
from decimal import Decimal
from pyserum.market import Market as PySerumMarket
sys.path.insert(0, os.path.abspath(
os.path.join(os.path.dirname(__file__), "..")))
import mango # nopep8
import mango.marketmaking # nopep8
parser = argparse.ArgumentParser(description="Shows the on-chain data of a particular account.")
mango.ContextBuilder.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser)
parser.add_argument("--market", type=str, required=True, help="market symbol to make market upon (e.g. ETH/USDC)")
parser.add_argument("--account-index", type=int, default=0,
help="index of the account to use, if more than one available")
parser.add_argument("--oracle-provider", type=str, required=True,
help="name of the price provider to use (e.g. pyth)")
parser.add_argument("--position-size-ratio", type=Decimal, required=True,
help="fraction of the token inventory to be bought or sold in each order")
parser.add_argument("--existing-order-tolerance", type=Decimal, default=Decimal("0.001"),
help="tolerance in price and quantity when matching existing orders or cancelling/replacing")
parser.add_argument("--minimum-charge-ratio", type=Decimal, default=Decimal("0.0005"),
help="minimum fraction of the price to be accept as a spread")
parser.add_argument("--confidence-weighting", type=Decimal, default=Decimal("2"),
help="a weighting to apply to the confidence interval from the oracle: e.g. 1 - use the oracle confidence interval as the spread, 2 (risk averse, default) - multiply the oracle confidence interval by 2 to get the spread, 0.5 (aggressive) halve the oracle confidence interval to get the spread")
parser.add_argument("--order-type", type=mango.OrderType, default=mango.OrderType.POST_ONLY,
choices=list(mango.OrderType), help="Order type: LIMIT, IOC or POST_ONLY")
parser.add_argument("--pulse-interval", type=int, default=10,
help="number of seconds between each 'pulse' of the market maker")
parser.add_argument("--dry-run", action="store_true", default=False,
help="runs as read-only and does not perform any transactions")
args = parser.parse_args()
logging.getLogger().setLevel(args.log_level)
logging.warning(mango.WARNING_DISCLAIMER_TEXT)
def cleanup(context: mango.Context, wallet: mango.Wallet, dry_run: bool, market: mango.Market):
logging.info("Cleaning up.")
market_operations: mango.MarketOperations = mango.create_market_operations(context, wallet, dry_run, market)
orders = market_operations.load_my_orders()
for order in orders:
market_operations.cancel_order(order)
logging.info("Settling.")
market_operations.crank()
market_operations.settle()
2021-07-16 03:54:13 -07:00
def add_file_health(name: str, observable: rx.core.typing.Observable[typing.Any], disposer: mango.DisposePropagator):
perp_market_file_touch_disposer = observable.subscribe(
mango.FileToucherObserver(f"/var/tmp/mango_healthcheck_{name}"))
disposer.add_disposable(perp_market_file_touch_disposer)
def build_latest_group_observer(context: mango.Context, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, group: mango.Group) -> mango.LatestItemObserverSubscriber[mango.Group]:
group_subscription = mango.WebSocketAccountSubscription[mango.Group](
context, group.address, lambda account_info: mango.Group.parse(context, account_info))
manager.add(group_subscription)
latest_group_observer = mango.LatestItemObserverSubscriber(group)
group_subscription.publisher.subscribe(latest_group_observer)
add_file_health("group_subscription", group_subscription.publisher, disposer)
return latest_group_observer
def build_latest_account_observer(context: mango.Context, account: mango.Account, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, group_observer: mango.LatestItemObserverSubscriber[mango.Group]) -> typing.Tuple[mango.WebSocketSubscription[mango.Account], mango.LatestItemObserverSubscriber[mango.Account]]:
account_subscription = mango.WebSocketAccountSubscription[mango.Account](
context, account.address, lambda account_info: mango.Account.parse(context, account_info, group_observer.latest))
manager.add(account_subscription)
latest_account_observer = mango.LatestItemObserverSubscriber(account)
account_subscription.publisher.subscribe(latest_account_observer)
add_file_health("account_subscription", account_subscription.publisher, disposer)
return account_subscription, latest_account_observer
def build_latest_spot_open_orders_observer(context: mango.Context, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, account: mango.Account, spot_market: mango.SpotMarket) -> mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]:
market_index = group.find_spot_market_index(spot_market.address)
spot_open_orders_address = account.spot_open_orders[market_index]
if spot_open_orders_address is None:
spot_market_instruction_builder: mango.SpotMarketInstructionBuilder = mango.SpotMarketInstructionBuilder.load(
context, wallet, spot_market.group, account, spot_market)
market_operations: mango.SpotMarketOperations = mango.SpotMarketOperations(
context, wallet, spot_market.group, account, market, spot_market_instruction_builder)
open_orders_address = market_operations.create_openorders_for_market()
# This line is a little nasty. Now that we know we have an OpenOrders account at this address, update
# the Account so that future uses (like later in this method) have access to it in the right place.
account.spot_open_orders[market_index] = open_orders_address
spot_open_orders_subscription = mango.WebSocketAccountSubscription[mango.OpenOrders](
context, spot_open_orders_address, lambda account_info: mango.OpenOrders.parse(account_info, spot_market.base.decimals, spot_market.quote.decimals))
manager.add(spot_open_orders_subscription)
initial_spot_open_orders = mango.OpenOrders.load(
context, spot_open_orders_address, spot_market.base.decimals, spot_market.quote.decimals)
latest_open_orders_observer = mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer](
initial_spot_open_orders)
spot_open_orders_subscription.publisher.subscribe(latest_open_orders_observer)
add_file_health("open_orders_subscription", spot_open_orders_subscription.publisher, disposer)
return latest_open_orders_observer
def build_latest_serum_open_orders_observer(manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, serum_market: mango.SerumMarket, context: mango.Context, wallet: mango.Wallet) -> mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]:
all_open_orders = mango.OpenOrders.load_for_market_and_owner(
context, serum_market.address, wallet.address, context.dex_program_id, serum_market.base.decimals, serum_market.quote.decimals)
if len(all_open_orders) > 0:
initial_serum_open_orders: mango.OpenOrders = all_open_orders[0]
open_orders_address = initial_serum_open_orders.address
else:
raw_market = PySerumMarket.load(context.client, serum_market.address)
create_open_orders = mango.build_create_serum_open_orders_instructions(
context, wallet, raw_market)
open_orders_address = create_open_orders.signers[0].public_key()
logging.info(f"Creating OpenOrders account for market {serum_market.symbol} at {open_orders_address}.")
signers: mango.CombinableInstructions = mango.CombinableInstructions.from_wallet(wallet)
transaction_id = (signers + create_open_orders).execute_and_unwrap_transaction_ids(context)[0]
context.wait_for_confirmation(transaction_id)
initial_serum_open_orders = mango.OpenOrders.load(
context, open_orders_address, serum_market.base.decimals, serum_market.quote.decimals)
serum_open_orders_subscription = mango.WebSocketAccountSubscription[mango.OpenOrders](
context, open_orders_address, lambda account_info: mango.OpenOrders.parse(account_info, serum_market.base.decimals, serum_market.quote.decimals))
manager.add(serum_open_orders_subscription)
latest_serum_open_orders_observer = mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer](
initial_serum_open_orders)
serum_open_orders_subscription.publisher.subscribe(latest_serum_open_orders_observer)
add_file_health("open_orders_subscription", serum_open_orders_subscription.publisher, disposer)
return latest_serum_open_orders_observer
def build_latest_perp_open_orders_observer(disposer: mango.DisposePropagator, perp_market: mango.PerpMarket, account: mango.Account, account_subscription: mango.WebSocketSubscription[mango.Account]) -> mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]:
index = group.find_perp_market_index(perp_market.address)
initial_open_orders = account.perp_accounts[index].open_orders
latest_open_orders_observer = mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer](initial_open_orders)
account_subscription.publisher.subscribe(
on_next=lambda updated_account: latest_open_orders_observer.on_next(updated_account.perp_accounts[index].open_orders))
add_file_health("open_orders_subscription", account_subscription.publisher, disposer)
return latest_open_orders_observer
def build_latest_perp_market_observer(manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, perp_market_info: mango.PerpMarketInfo, initial_perp_market_details: mango.PerpMarketDetails, group_observer: mango.LatestItemObserverSubscriber[mango.Group]) -> mango.LatestItemObserverSubscriber[mango.PerpMarketDetails]:
perp_market_subscription = mango.WebSocketAccountSubscription[mango.PerpMarketDetails](
context, perp_market_info.address, lambda account_info: mango.PerpMarketDetails.parse(account_info, group_observer.latest))
manager.add(perp_market_subscription)
latest_perp_market_observer = mango.LatestItemObserverSubscriber(initial_perp_market_details)
perp_market_subscription.publisher.subscribe(latest_perp_market_observer)
add_file_health("perp_market_subscription", perp_market_subscription.publisher, disposer)
return latest_perp_market_observer
def build_latest_price_observer(context: mango.Context, disposer: mango.DisposePropagator, provider_name: str, market: mango.Market) -> mango.LatestItemObserverSubscriber[mango.Price]:
oracle_provider: mango.OracleProvider = mango.create_oracle_provider(context, provider_name)
oracle = oracle_provider.oracle_for_market(context, market)
if oracle is None:
raise Exception(f"Could not find oracle for market {market.symbol} from provider {provider_name}.")
initial_price = oracle.fetch_price(context)
price_feed = oracle.to_streaming_observable(context)
latest_price_observer = mango.LatestItemObserverSubscriber(initial_price)
price_disposable = price_feed.subscribe(latest_price_observer)
disposer.add_disposable(price_disposable)
add_file_health("price_subscription", price_feed, disposer)
return latest_price_observer
context = mango.ContextBuilder.from_command_line_parameters(args)
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager()
disposer.add_disposable(manager)
wallet = mango.Wallet.from_command_line_parameters_or_raise(args)
group = mango.Group.load(context, context.group_id)
accounts = mango.Account.load_all_for_owner(context, wallet.address, group)
if len(accounts) == 0:
raise Exception(f"No mango account found for root address '{wallet.address}'.")
account = accounts[args.account_index]
market_symbol = args.market.upper()
market = context.market_lookup.find_by_symbol(market_symbol)
if market is None:
raise Exception(f"Could not find market {market_symbol}")
# The market index is also the index of the base token in the group's token list.
if market.quote != group.shared_quote_token.token:
raise Exception(
f"Group {group.name} uses shared quote token {group.shared_quote_token.token.symbol}/{group.shared_quote_token.token.mint}, but market {market.symbol} uses quote token {market.quote.symbol}/{market.quote.mint}.")
cleanup(context, wallet, args.dry_run, market)
latest_group_observer = build_latest_group_observer(context, manager, disposer, group)
account_subscription, latest_account_observer = build_latest_account_observer(
context, account, manager, disposer, latest_group_observer)
latest_price_observer = build_latest_price_observer(context, disposer, args.oracle_provider, market)
market = mango.ensure_market_loaded(context, market)
market_instruction_builder: mango.MarketInstructionBuilder = mango.create_market_instruction_builder(
context, wallet, args.dry_run, market, account)
if isinstance(market, mango.SerumMarket):
latest_perp_market_observer = None
latest_open_orders_observer = build_latest_serum_open_orders_observer(manager, disposer, market, context, wallet)
elif isinstance(market, mango.SpotMarket):
latest_perp_market_observer = None
latest_open_orders_observer = build_latest_spot_open_orders_observer(context, manager, disposer, account, market)
elif isinstance(market, mango.PerpMarket):
perp_market_info = mango.PerpMarketInfo.find_by_address(group.perp_markets, market.address)
initial_perp_market_details = mango.PerpMarketDetails.load(context, perp_market_info.address, group)
latest_perp_market_observer = build_latest_perp_market_observer(
manager, disposer, perp_market_info, initial_perp_market_details, latest_group_observer)
latest_open_orders_observer = build_latest_perp_open_orders_observer(
disposer, market, account, account_subscription)
else:
raise Exception(f"Could not determine type of market {market.symbol}")
websocket_url = context.cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item)
ws.ping_interval = 10
ws.open()
order_reconciler = mango.marketmaking.ToleranceOrderReconciler(
args.existing_order_tolerance, args.existing_order_tolerance)
desired_orders_builder = mango.marketmaking.ConfidenceIntervalDesiredOrdersBuilder(
args.position_size_ratio, args.minimum_charge_ratio, args.confidence_weighting, args.order_type)
market_maker = mango.marketmaking.MarketMaker(
wallet, market, market_instruction_builder, desired_orders_builder, order_reconciler)
model_state = mango.marketmaking.ModelState(market, latest_account_observer,
latest_group_observer, latest_price_observer,
latest_perp_market_observer,
latest_open_orders_observer)
add_file_health("marketmaker_pulse", market_maker.pulse_complete, disposer)
logging.info("Current assets in account:")
mango.TokenValue.report([asset for asset in account.net_assets if asset is not None], logging.info)
pulse_disposable = rx.interval(args.pulse_interval).subscribe(
on_next=lambda _: market_maker.pulse(context, model_state))
disposer.add_disposable(pulse_disposable)
# Wait - don't exit. Exiting will be handled by signals/interrupts.
waiter = threading.Event()
try:
waiter.wait()
except:
pass
logging.info("Shutting down...")
ws.close()
disposer.dispose()
cleanup(context, wallet, args.dry_run, market)
logging.info("Shutdown complete.")