mango-explorer/bin/marketmaker

262 lines
15 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env pyston3
import argparse
import logging
import os
import os.path
import rx
import sys
import typing
from decimal import Decimal
from pyserum.market import Market as PySerumMarket
sys.path.insert(0, os.path.abspath(
os.path.join(os.path.dirname(__file__), "..")))
import mango # nopep8
import mango.marketmaking.fixedratiosdesiredordersbuilder # nopep8
import mango.marketmaking.marketmaker # nopep8
import mango.marketmaking.modelstate # nopep8
import mango.marketmaking.toleranceorderreconciler # nopep8
# Command-line interface for the market maker.
parser = argparse.ArgumentParser(description="Runs a marketmaker against a particular market.")
mango.Context.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser)
parser.add_argument("--market", type=str, required=True, help="market symbol to make market upon (e.g. ETH/USDC)")
parser.add_argument("--account-index", type=int, default=0,
                    help="index of the account to use, if more than one available")
parser.add_argument("--oracle-provider", type=str, required=True,
                    help="name of the price provider to use (e.g. pyth)")
# --spread-ratio and --position-size-ratio are repeatable; each occurrence is
# appended to a list.
parser.add_argument("--spread-ratio", type=Decimal, action="append", default=[], required=True,
                    help="fraction of the mid price to be added and subtracted to calculate buy and sell prices (can be specified multiple times)")
parser.add_argument("--position-size-ratio", type=Decimal, action="append", default=[], required=True,
                    help="fraction of the token inventory to be bought or sold in each order (can be specified multiple times)")
parser.add_argument("--existing-order-tolerance", type=Decimal, default=Decimal("0.001"),
                    help="tolerance in price and quantity when matching existing orders or cancelling/replacing")
parser.add_argument("--pulse-interval", type=int, default=10,
                    help="number of seconds between each 'pulse' of the market maker")
parser.add_argument("--dry-run", action="store_true", default=False,
                    help="runs as read-only and does not perform any transactions")
args = parser.parse_args()

# Both lists are handed together to the desired-orders builder later in the
# script, so they must have the same number of entries (presumably they are
# paired by position - confirm against FixedRatiosDesiredOrdersBuilder).
if len(args.spread_ratio) != len(args.position_size_ratio):
    raise Exception(
        f"Number of spread ratio parameters ({len(args.spread_ratio)}) does not match number of position size ratio ({len(args.position_size_ratio)}).")

# `log_level` is not declared above; presumably it is registered by
# mango.Context.add_command_line_parameters() - confirm.
logging.getLogger().setLevel(args.log_level)
logging.warning(mango.WARNING_DISCLAIMER_TEXT)
def cleanup(context: mango.Context, wallet: mango.Wallet, dry_run: bool, market: mango.Market):
    """Cancel every order that `load_my_orders()` reports for `market`.

    The `dry_run` flag is forwarded to `mango.create_market_operations()`;
    presumably the returned operations object then skips real transactions
    (confirm against that factory).
    """
    logging.info("Cleaning up.")
    operations: mango.MarketOperations = mango.create_market_operations(context, wallet, dry_run, market)
    for own_order in operations.load_my_orders():
        operations.cancel_order(own_order)
# 2021-07-16 03:54:13 -07:00
def add_file_health(name: str, observable: rx.core.typing.Observable[typing.Any], disposer: mango.DisposePropagator):
    """Attach a health-check file observer to `observable`.

    A `mango.FileToucherObserver` for `/var/tmp/mango_healthcheck_<name>` is
    subscribed to `observable` (presumably it touches that file whenever an
    item arrives - confirm against FileToucherObserver). The resulting
    subscription is registered with `disposer` so it is released on shutdown.
    """
    toucher = mango.FileToucherObserver(f"/var/tmp/mango_healthcheck_{name}")
    health_subscription = observable.subscribe(toucher)
    disposer.add_disposable(health_subscription)
def build_latest_group_observer(context: mango.Context, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, group: mango.Group) -> mango.LatestItemObserverSubscriber[mango.Group]:
    """Subscribe to websocket updates for `group` and track the latest value.

    Returns a `LatestItemObserverSubscriber` seeded with the passed-in `group`
    and subscribed to the websocket subscription's publisher. The publisher is
    also wired to the "group_subscription" health-check file.
    """
    def _parse_group(account_info):
        return mango.Group.parse(context, account_info)

    subscription = mango.WebSocketSubscription[mango.Group](context, group.address, _parse_group)
    manager.add(subscription)

    observer = mango.LatestItemObserverSubscriber(group)
    subscription.publisher.subscribe(observer)

    add_file_health("group_subscription", subscription.publisher, disposer)
    return observer
def build_latest_account_observer(context: mango.Context, account: mango.Account, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, group_observer: mango.LatestItemObserverSubscriber[mango.Group]) -> typing.Tuple[mango.WebSocketSubscription[mango.Account], mango.LatestItemObserverSubscriber[mango.Account]]:
    """Subscribe to websocket updates for `account` and track the latest value.

    Returns a `(subscription, observer)` tuple: the raw websocket subscription
    (so callers can attach further subscribers to its publisher) and a
    `LatestItemObserverSubscriber` seeded with the passed-in `account`.
    The publisher is also wired to the "account_subscription" health-check
    file.
    """
    def _parse_account(account_info):
        # `group_observer.latest` is read each time an update is parsed, not
        # once up front, so parsing uses whatever Group is current then.
        return mango.Account.parse(context, account_info, group_observer.latest)

    subscription = mango.WebSocketSubscription[mango.Account](context, account.address, _parse_account)
    manager.add(subscription)

    observer = mango.LatestItemObserverSubscriber(account)
    subscription.publisher.subscribe(observer)

    add_file_health("account_subscription", subscription.publisher, disposer)
    return subscription, observer
def build_latest_spot_open_orders_observer(manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, spot_market: mango.SpotMarket) -> mango.LatestItemObserverSubscriber[mango.OpenOrders]:
    """Track the latest spot `OpenOrders` for `spot_market`.

    NOTE: besides its parameters, this function reads the module-level
    `group`, `account` and `context`, so it can only be called after the
    script has assigned them.

    Raises:
        Exception: if the account has no OpenOrders address for this market.
    """
    index_in_group = group.find_spot_market_index(spot_market.address)
    open_orders_address = account.spot_open_orders[index_in_group]
    if open_orders_address is None:
        raise Exception(f"No spot OpenOrders for market {spot_market.symbol}.")

    def _parse_open_orders(account_info):
        return mango.OpenOrders.parse(account_info, spot_market.base.decimals, spot_market.quote.decimals)

    subscription = mango.WebSocketSubscription[mango.OpenOrders](
        context, open_orders_address, _parse_open_orders)
    manager.add(subscription)

    # Seed the observer with an explicitly loaded value so `latest` is usable
    # before the first websocket update arrives.
    seed_open_orders = mango.OpenOrders.load(
        context, open_orders_address, spot_market.base.decimals, spot_market.quote.decimals)
    observer = mango.LatestItemObserverSubscriber(seed_open_orders)
    subscription.publisher.subscribe(observer)

    add_file_health("open_orders_subscription", subscription.publisher, disposer)
    return observer
def build_latest_serum_open_orders_observer(manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, serum_market: mango.SerumMarket, context: mango.Context, wallet: mango.Wallet) -> mango.LatestItemObserverSubscriber[mango.OpenOrders]:
    """Track the latest Serum `OpenOrders` for `wallet` on `serum_market`.

    If the wallet already has OpenOrders accounts for the market, the first
    one is used. Otherwise a transaction creating one is executed, confirmed,
    and the new account is loaded. The websocket publisher is also wired to
    the "open_orders_subscription" health-check file.
    """
    existing_open_orders = mango.OpenOrders.load_for_market_and_owner(
        context, serum_market.address, wallet.address, context.dex_program_id,
        serum_market.base.decimals, serum_market.quote.decimals)

    if len(existing_open_orders) == 0:
        # No OpenOrders account yet: create one on-chain and wait for it.
        raw_market = PySerumMarket.load(context.client, serum_market.address)
        create_open_orders = mango.build_create_serum_open_orders_instructions(
            context, wallet, raw_market)

        # The first signer of the create instructions is the new account.
        open_orders_address = create_open_orders.signers[0].public_key()
        logging.info(f"Creating OpenOrders account for market {serum_market.symbol} at {open_orders_address}.")

        signers: mango.CombinableInstructions = mango.CombinableInstructions.from_wallet(wallet)
        combined = signers + create_open_orders
        transaction_ids = combined.execute_and_unwrap_transaction_ids(context)
        context.wait_for_confirmation(transaction_ids[0])

        initial_serum_open_orders: mango.OpenOrders = mango.OpenOrders.load(
            context, open_orders_address, serum_market.base.decimals, serum_market.quote.decimals)
    else:
        initial_serum_open_orders = existing_open_orders[0]
        open_orders_address = initial_serum_open_orders.address

    def _parse_open_orders(account_info):
        return mango.OpenOrders.parse(account_info, serum_market.base.decimals, serum_market.quote.decimals)

    subscription = mango.WebSocketSubscription[mango.OpenOrders](
        context, open_orders_address, _parse_open_orders)
    manager.add(subscription)

    observer = mango.LatestItemObserverSubscriber(initial_serum_open_orders)
    subscription.publisher.subscribe(observer)

    add_file_health("open_orders_subscription", subscription.publisher, disposer)
    return observer
def build_latest_perp_open_orders_observer(disposer: mango.DisposePropagator, perp_market: mango.PerpsMarket, account: mango.Account, account_subscription: mango.WebSocketSubscription[mango.Account]) -> mango.LatestItemObserverSubscriber[mango.OpenOrders]:
    """Track the latest perp `OpenOrders`, derived from account updates.

    No separate websocket subscription is created here: each item published
    by `account_subscription` is converted to an `OpenOrders` and pushed into
    the returned observer.

    NOTE: besides its parameters, this function reads the module-level
    `group` and `context`.
    """
    # Looked up once; every later account update reuses the same index.
    perp_index = group.find_perp_market_index(perp_market.address)

    def _open_orders_for(current_account: mango.Account) -> mango.OpenOrders:
        perp_open_orders = current_account.perp_accounts[perp_index].open_orders
        return mango.OpenOrders.from_perps_account_layout(
            context, current_account, perp_market, perp_open_orders)

    observer = mango.LatestItemObserverSubscriber(_open_orders_for(account))

    def _republish(updated_account):
        observer.on_next(_open_orders_for(updated_account))

    account_subscription.publisher.subscribe(on_next=_republish)

    add_file_health("open_orders_subscription", account_subscription.publisher, disposer)
    return observer
def build_latest_perp_market_observer(manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, perp_market_info: mango.PerpMarketInfo, initial_perp_market: mango.PerpMarket, group_observer: mango.LatestItemObserverSubscriber[mango.Group]) -> mango.LatestItemObserverSubscriber[mango.PerpMarket]:
    """Subscribe to websocket updates for a perp market and track the latest.

    The returned observer is seeded with `initial_perp_market`. The publisher
    is also wired to the "perp_market_subscription" health-check file.

    NOTE: besides its parameters, this function reads the module-level
    `context`.
    """
    def _parse_perp_market(account_info):
        # Uses whichever Group is current when the update is parsed.
        return mango.PerpMarket.parse(account_info, group_observer.latest)

    subscription = mango.WebSocketSubscription[mango.PerpMarket](
        context, perp_market_info.address, _parse_perp_market)
    manager.add(subscription)

    observer = mango.LatestItemObserverSubscriber(initial_perp_market)
    subscription.publisher.subscribe(observer)

    add_file_health("perp_market_subscription", subscription.publisher, disposer)
    return observer
def build_latest_price_observer(context: mango.Context, disposer: mango.DisposePropagator, provider_name: str, market: mango.Market) -> mango.LatestItemObserverSubscriber[mango.Price]:
    """Track the latest oracle price for `market` from the named provider.

    The returned observer is seeded with a price fetched synchronously, then
    subscribed to the oracle's streaming observable. The stream is also wired
    to the "price_subscription" health-check file, so it is subscribed to
    twice.

    Raises:
        Exception: if the provider has no oracle for `market`.
    """
    provider: mango.OracleProvider = mango.create_oracle_provider(provider_name)
    oracle = provider.oracle_for_market(context, market)
    if oracle is None:
        raise Exception(f"Could not find oracle for market {market.symbol} from provider {provider_name}.")

    # Fetch the starting price before opening the stream, as the original
    # ordering does.
    starting_price = oracle.fetch_price(context)
    price_feed = oracle.to_streaming_observable(context)

    observer = mango.LatestItemObserverSubscriber(starting_price)
    disposer.add_disposable(price_feed.subscribe(observer))

    add_file_health("price_subscription", price_feed, disposer)
    return observer
# Core objects built from the command line. `disposer` collects everything
# that must be released at shutdown, including the subscription manager.
context = mango.Context.from_command_line_parameters(args)
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager()
disposer.add_disposable(manager)

wallet = mango.Wallet.from_command_line_parameters_or_raise(args)
group = mango.Group.load(context, context.group_id)

accounts = mango.Account.load_all_for_owner(context, wallet.address, group)
if len(accounts) == 0:
    raise Exception(f"No mango account found for root address '{wallet.address}'.")
# Fail with a clear message instead of a bare IndexError when --account-index
# does not select one of the loaded accounts. Negative indices that Python
# indexing would accept are still allowed.
if not -len(accounts) <= args.account_index < len(accounts):
    raise Exception(
        f"Account index {args.account_index} is out of range: {len(accounts)} mango account(s) found for root address '{wallet.address}'.")
account = accounts[args.account_index]

market_symbol = args.market.upper()
market = context.market_lookup.find_by_symbol(market_symbol)
if market is None:
    raise Exception(f"Could not find market {market_symbol}")

# The market index is also the index of the base token in the group's token list.
# The market must be quoted in the group's shared quote token.
if market.quote != group.shared_quote_token.token:
    raise Exception(
        f"Group {group.name} uses shared quote token {group.shared_quote_token.token.symbol}/{group.shared_quote_token.token.mint}, but market {market.symbol} uses quote token {market.quote.symbol}/{market.quote.mint}.")

# Cancel any of our orders already on this market before starting.
cleanup(context, wallet, args.dry_run, market)
# Live views of group, account and oracle price, shared by all market types.
latest_group_observer = build_latest_group_observer(context, manager, disposer, group)
account_subscription, latest_account_observer = build_latest_account_observer(
    context, account, manager, disposer, latest_group_observer)
latest_price_observer = build_latest_price_observer(
    context, disposer, args.oracle_provider, market)

# Market-type-specific wiring. Every branch sets the instruction builder, the
# open-orders observer and the perp-market observer (None for non-perp
# markets). Statement order inside each branch is kept exactly as written.
market_instruction_builder: typing.Optional[mango.MarketInstructionBuilder] = None
if isinstance(market, mango.SerumMarket):
    market_as_serum_market: mango.SerumMarket = typing.cast(mango.SerumMarket, market)
    latest_perp_market_observer = None
    # This call may create an OpenOrders account on-chain; it runs before the
    # instruction builder is loaded.
    latest_open_orders_observer = build_latest_serum_open_orders_observer(
        manager, disposer, market_as_serum_market, context, wallet)
    market_instruction_builder = mango.SerumMarketInstructionBuilder.load(
        context, wallet, market)
elif isinstance(market, mango.SpotMarket):
    market_as_spot_market: mango.SpotMarket = typing.cast(mango.SpotMarket, market)
    market_instruction_builder = mango.SpotMarketInstructionBuilder.load(
        context, wallet, group, account, market)
    latest_perp_market_observer = None
    latest_open_orders_observer = build_latest_spot_open_orders_observer(
        manager, disposer, market_as_spot_market)
elif isinstance(market, mango.PerpsMarket):
    market_as_perps_market: mango.PerpsMarket = typing.cast(mango.PerpsMarket, market)
    perp_market_info = mango.PerpMarketInfo.find_by_address(group.perp_markets, market.address)
    initial_perp_market = mango.PerpMarket.load(context, perp_market_info.address, group)
    latest_perp_market_observer = build_latest_perp_market_observer(
        manager, disposer, perp_market_info, initial_perp_market, latest_group_observer)
    market_instruction_builder = mango.PerpMarketInstructionBuilder.load(
        context, wallet, group, account, initial_perp_market)
    latest_open_orders_observer = build_latest_perp_open_orders_observer(
        disposer, market_as_perps_market, account, account_subscription)
else:
    raise Exception(f"Could not determine type of market {market.symbol}")
# Open the websocket that feeds every subscription registered on `manager`.
# NOTE(review): this rewrites the first "https" to "ws", so an https:// cluster
# URL becomes ws:// (not wss://) and an http:// URL is left unchanged - confirm
# that this is what the target RPC endpoints expect.
websocket_url = context.cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item)
ws.ping_interval = 10
ws.open()

# The same tolerance value is passed for both reconciler arguments.
order_reconciler = mango.marketmaking.toleranceorderreconciler.ToleranceOrderReconciler(
    args.existing_order_tolerance, args.existing_order_tolerance)
desired_orders_builder = mango.marketmaking.fixedratiosdesiredordersbuilder.FixedRatiosDesiredOrdersBuilder(
    args.spread_ratio, args.position_size_ratio)
market_maker = mango.marketmaking.marketmaker.MarketMaker(
    wallet, market, market_instruction_builder, desired_orders_builder, order_reconciler)
model_state = mango.marketmaking.modelstate.ModelState(market, latest_account_observer,
                                                       latest_group_observer, latest_price_observer,
                                                       latest_perp_market_observer,
                                                       latest_open_orders_observer)

add_file_health("marketmaker_pulse", market_maker.pulse_complete, disposer)

# Trigger a market-maker pulse every --pulse-interval seconds.
pulse_disposable = rx.interval(args.pulse_interval).subscribe(
    on_next=lambda _: market_maker.pulse(context, model_state))
disposer.add_disposable(pulse_disposable)

print("Press <ENTER> to quit.")
try:
    # Wait - don't exit
    input()
finally:
    # Run the shutdown sequence even when input() raises (KeyboardInterrupt on
    # Ctrl-C, EOFError when stdin is closed). Otherwise the websocket stays
    # open and our orders are left on the book. Any such exception still
    # propagates once shutdown has finished.
    print("Shutting down...")
    ws.close()
    disposer.dispose()
    cleanup(context, wallet, args.dry_run, market)
    print("Done.")