294 lines
17 KiB
Plaintext
Executable File
294 lines
17 KiB
Plaintext
Executable File
#!/usr/bin/env pyston3
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import os.path
|
|
import rx
|
|
import sys
|
|
import threading
|
|
import typing
|
|
|
|
from decimal import Decimal
|
|
from pyserum.market import Market as PySerumMarket
|
|
|
|
sys.path.insert(0, os.path.abspath(
|
|
os.path.join(os.path.dirname(__file__), "..")))
|
|
import mango # nopep8
|
|
import mango.marketmaking # nopep8
|
|
|
|
# Command-line interface for the market maker. The mango Context and Wallet
# helpers add their own options to the parser first; the options below are
# specific to this script. `args` is read by the rest of the module.
parser = argparse.ArgumentParser(description="Runs a marketmaker against a particular market.")
mango.ContextBuilder.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser)
parser.add_argument("--market", type=str, required=True, help="market symbol to make market upon (e.g. ETH/USDC)")
parser.add_argument("--oracle-provider", type=str, required=True,
                    help="name of the price provider to use (e.g. pyth)")
parser.add_argument("--position-size-ratio", type=Decimal, required=True,
                    help="fraction of the token inventory to be bought or sold in each order")
# The same tolerance value is later passed twice to the order reconciler.
parser.add_argument("--existing-order-tolerance", type=Decimal, default=Decimal("0.001"),
                    help="tolerance in price and quantity when matching existing orders or cancelling/replacing")
parser.add_argument("--minimum-charge-ratio", type=Decimal, default=Decimal("0.0005"),
                    help="minimum fraction of the price to be accepted as a spread")
parser.add_argument("--confidence-weighting", type=Decimal, default=Decimal("2"),
                    help="a weighting to apply to the confidence interval from the oracle: e.g. 1 - use the oracle confidence interval as the spread, 2 (risk averse, default) - multiply the oracle confidence interval by 2 to get the spread, 0.5 (aggressive) halve the oracle confidence interval to get the spread")
parser.add_argument("--order-type", type=mango.OrderType, default=mango.OrderType.POST_ONLY,
                    choices=list(mango.OrderType), help="Order type: LIMIT, IOC or POST_ONLY")
parser.add_argument("--pulse-interval", type=int, default=10,
                    help="number of seconds between each 'pulse' of the market maker")
parser.add_argument("--account-index", type=int, default=0,
                    help="index of the account to use, if more than one available")
# May be given several times; each occurrence is appended to the list.
parser.add_argument("--notify-errors", type=mango.parse_subscription_target, action="append", default=[],
                    help="The notification target for error events")
parser.add_argument("--dry-run", action="store_true", default=False,
                    help="runs as read-only and does not perform any transactions")
args = parser.parse_args()
|
|
|
|
# Configure the root logger from the parsed arguments and show the disclaimer.
root_logger = logging.getLogger()
root_logger.setLevel(args.log_level)
logging.warning(mango.WARNING_DISCLAIMER_TEXT)

# Attach one ERROR-level notification handler per --notify-errors target.
for error_target in args.notify_errors:
    error_handler = mango.NotificationHandler(error_target)
    error_handler.setLevel(logging.ERROR)
    root_logger.addHandler(error_handler)
|
|
|
|
|
|
def cleanup(context: mango.Context, wallet: mango.Wallet, account: mango.Account, market: mango.Market, dry_run: bool) -> None:
    """Cancel this account's orders on `market`, then crank and settle.

    A fresh `MarketOperations` is created via `mango.create_market_operations()`
    from the supplied arguments; every order returned by its
    `load_my_orders()` is passed to `cancel_order()`, after which `crank()`
    and `settle()` are called once each.
    """
    logging.info("Cleaning up.")
    operations: mango.MarketOperations = mango.create_market_operations(
        context, wallet, account, market, dry_run)
    for outstanding_order in operations.load_my_orders():
        operations.cancel_order(outstanding_order)

    logging.info("Settling.")
    operations.crank()
    operations.settle()
|
|
|
|
|
|
def add_file_health(name: str, observable: rx.core.typing.Observable[typing.Any], disposer: mango.DisposePropagator):
    """Attach a file-based health check to `observable`.

    Subscribes a `mango.FileToucherObserver` for the path
    `/var/tmp/mango_healthcheck_<name>` to `observable` and registers the
    resulting subscription with `disposer`.
    """
    toucher = mango.FileToucherObserver(f"/var/tmp/mango_healthcheck_{name}")
    subscription = observable.subscribe(toucher)
    disposer.add_disposable(subscription)
|
|
|
|
|
|
def build_latest_group_observer(context: mango.Context, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, group: mango.Group) -> mango.LatestItemObserverSubscriber[mango.Group]:
    """Subscribe to websocket updates for `group` and return an observer of the latest value.

    The subscription is added to `manager`, the returned observer is seeded
    with the `group` passed in, and a "group_subscription" health-check file
    is attached to the subscription's publisher.
    """
    def parse_group(account_info):
        return mango.Group.parse(context, account_info)

    subscription = mango.WebSocketAccountSubscription[mango.Group](context, group.address, parse_group)
    manager.add(subscription)

    observer = mango.LatestItemObserverSubscriber(group)
    subscription.publisher.subscribe(observer)
    add_file_health("group_subscription", subscription.publisher, disposer)
    return observer
|
|
|
|
|
|
def build_latest_account_observer(context: mango.Context, account: mango.Account, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, group_observer: mango.LatestItemObserverSubscriber[mango.Group]) -> typing.Tuple[mango.WebSocketSubscription[mango.Account], mango.LatestItemObserverSubscriber[mango.Account]]:
    """Subscribe to websocket updates for `account`.

    Returns a `(subscription, observer)` pair: the raw account subscription
    (added to `manager`) and an observer seeded with the `account` passed in.
    An "account_subscription" health-check file is attached to the
    subscription's publisher.
    """
    def parse_account(account_info):
        # Read the group at parse time so each update uses the latest Group.
        return mango.Account.parse(account_info, group_observer.latest)

    subscription = mango.WebSocketAccountSubscription[mango.Account](context, account.address, parse_account)
    manager.add(subscription)

    observer = mango.LatestItemObserverSubscriber(account)
    subscription.publisher.subscribe(observer)
    add_file_health("account_subscription", subscription.publisher, disposer)
    return subscription, observer
|
|
|
|
|
|
def build_latest_spot_open_orders_observer(context: mango.Context, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, account: mango.Account, spot_market: mango.SpotMarket) -> mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]:
    """Return an observer of the latest OpenOrders for `account` on `spot_market`.

    If the account has no OpenOrders address for this market, one is created
    via `SpotMarketOperations.create_openorders_for_market()` and recorded on
    `account`. The OpenOrders account is then subscribed to through `manager`,
    and an "open_orders_subscription" health-check file is attached.

    NOTE: this function reads the module-level `group` and `wallet` globals in
    addition to its parameters.
    """
    spot_index = group.find_spot_market_index(spot_market.address)
    orders_address = account.spot_open_orders[spot_index]
    if open_orders_missing := orders_address is None:
        builder: mango.SpotMarketInstructionBuilder = mango.SpotMarketInstructionBuilder.load(
            context, wallet, spot_market.group, account, spot_market)
        operations: mango.SpotMarketOperations = mango.SpotMarketOperations(
            context, wallet, spot_market.group, account, spot_market, builder)
        orders_address = operations.create_openorders_for_market()
        logging.info(f"Created {spot_market.symbol} OpenOrders at: {orders_address}")

        # This line is a little nasty. Now that we know we have an OpenOrders account at this address, update
        # the Account so that future uses (like later in this method) have access to it in the right place.
        account.update_spot_open_orders_for_market(spot_index, orders_address)

    def parse_open_orders(account_info):
        return mango.OpenOrders.parse(account_info, spot_market.base.decimals, spot_market.quote.decimals)

    subscription = mango.WebSocketAccountSubscription[mango.OpenOrders](
        context, orders_address, parse_open_orders)
    manager.add(subscription)

    # Seed the observer with the current on-chain state of the OpenOrders account.
    starting_open_orders = mango.OpenOrders.load(
        context, orders_address, spot_market.base.decimals, spot_market.quote.decimals)
    observer = mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer](starting_open_orders)
    subscription.publisher.subscribe(observer)
    add_file_health("open_orders_subscription", subscription.publisher, disposer)
    return observer
|
|
|
|
|
|
def build_latest_serum_open_orders_observer(manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, serum_market: mango.SerumMarket, context: mango.Context, wallet: mango.Wallet) -> mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]:
    """Return an observer of the latest OpenOrders for `wallet` on `serum_market`.

    The first OpenOrders account found for the market and wallet is used. If
    none exists, a creation transaction is executed and confirmed, and the new
    account is loaded. The account is then subscribed to through `manager`,
    and an "open_orders_subscription" health-check file is attached.
    """
    base_decimals_of = serum_market.base
    quote_decimals_of = serum_market.quote
    existing = mango.OpenOrders.load_for_market_and_owner(
        context, serum_market.address, wallet.address, context.dex_program_id, base_decimals_of.decimals, quote_decimals_of.decimals)

    if len(existing) > 0:
        starting_open_orders: mango.OpenOrders = existing[0]
        orders_address = starting_open_orders.address
    else:
        raw_market = PySerumMarket.load(context.client.compatible_client, serum_market.address)
        creation = mango.build_create_serum_open_orders_instructions(context, wallet, raw_market)

        # The new OpenOrders account's address is the public key of the first signer.
        orders_address = creation.signers[0].public_key()

        logging.info(f"Creating OpenOrders account for market {serum_market.symbol} at {orders_address}.")
        wallet_signers: mango.CombinableInstructions = mango.CombinableInstructions.from_wallet(wallet)
        transaction_ids = (wallet_signers + creation).execute(context)
        context.client.wait_for_confirmation(transaction_ids)
        starting_open_orders = mango.OpenOrders.load(
            context, orders_address, serum_market.base.decimals, serum_market.quote.decimals)

    def parse_open_orders(account_info):
        return mango.OpenOrders.parse(account_info, serum_market.base.decimals, serum_market.quote.decimals)

    subscription = mango.WebSocketAccountSubscription[mango.OpenOrders](
        context, orders_address, parse_open_orders)

    manager.add(subscription)

    observer = mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer](starting_open_orders)
    subscription.publisher.subscribe(observer)
    add_file_health("open_orders_subscription", subscription.publisher, disposer)
    return observer
|
|
|
|
|
|
def build_latest_perp_open_orders_observer(disposer: mango.DisposePropagator, perp_market: mango.PerpMarket, account: mango.Account, account_subscription: mango.WebSocketSubscription[mango.Account]) -> mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]:
    """Return an observer of the latest perp open orders for `account` on `perp_market`.

    The open orders are read from the account's perp account for this market,
    so updates are taken from `account_subscription` instead of a separate
    subscription. An "open_orders_subscription" health-check file is attached
    to the account subscription's publisher.

    NOTE: this function reads the module-level `group` global to find the
    market index.

    Raises:
        Exception: if `account` has no perp account at the market's index.
    """
    perp_index = group.find_perp_market_index(perp_market.address)
    starting_perp_account = account.perp_accounts[perp_index]
    if starting_perp_account is None:
        raise Exception(f"Could not find perp account at index {perp_index} of account {account.address}.")

    observer = mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer](
        starting_perp_account.open_orders)

    def forward_open_orders(updated_account):
        # Push the open orders from each refreshed Account into the observer.
        return observer.on_next(updated_account.perp_accounts[perp_index].open_orders)

    account_subscription.publisher.subscribe(on_next=forward_open_orders)
    add_file_health("open_orders_subscription", account_subscription.publisher, disposer)
    return observer
|
|
|
|
|
|
def build_latest_price_observer(context: mango.Context, disposer: mango.DisposePropagator, provider_name: str, market: mango.Market) -> mango.LatestItemObserverSubscriber[mango.Price]:
    """Return an observer of the latest oracle price for `market`.

    The observer is seeded with one fetched price and then subscribed to the
    oracle's streaming observable. The subscription is registered with
    `disposer` and a "price_subscription" health-check file is attached.

    Raises:
        Exception: if the named provider has no oracle for `market`.
    """
    provider: mango.OracleProvider = mango.create_oracle_provider(context, provider_name)
    oracle = provider.oracle_for_market(context, market)
    if oracle is None:
        raise Exception(f"Could not find oracle for market {market.symbol} from provider {provider_name}.")

    starting_price = oracle.fetch_price(context)
    feed = oracle.to_streaming_observable(context)
    observer = mango.LatestItemObserverSubscriber(starting_price)
    disposer.add_disposable(feed.subscribe(observer))
    add_file_health("price_subscription", feed, disposer)
    return observer
|
|
|
|
|
|
def build_serum_inventory_observer(context: mango.Context, disposer: mango.DisposePropagator, market: mango.Market) -> mango.Watcher[mango.Inventory]:
    """Return a watcher that builds an SPL-token `Inventory` for `market`.

    The largest base and quote token accounts owned by the wallet are fetched
    and subscribed to. The returned watcher builds an `Inventory` from the
    latest value of each.

    NOTE: this function reads the module-level `wallet` and `manager` globals
    in addition to its parameters.

    Raises:
        Exception: if no token account is found for the base or quote token.
    """
    # --- base token account ---
    base_token_account = mango.TokenAccount.fetch_largest_for_owner_and_token(
        context, wallet.address, market.base)
    if base_token_account is None:
        raise Exception(
            f"Could not find token account owned by {wallet.address} for base token {market.base}.")

    def parse_base(account_info):
        return mango.TokenAccount.parse(account_info, market.base)

    base_subscription = mango.WebSocketAccountSubscription[mango.TokenAccount](
        context, base_token_account.address, parse_base)
    manager.add(base_subscription)
    base_observer = mango.LatestItemObserverSubscriber(base_token_account)
    disposer.add_disposable(base_subscription.publisher.subscribe(base_observer))

    # --- quote token account ---
    quote_token_account = mango.TokenAccount.fetch_largest_for_owner_and_token(
        context, wallet.address, market.quote)
    if quote_token_account is None:
        raise Exception(
            f"Could not find token account owned by {wallet.address} for quote token {market.quote}.")

    def parse_quote(account_info):
        return mango.TokenAccount.parse(account_info, market.quote)

    quote_subscription = mango.WebSocketAccountSubscription[mango.TokenAccount](
        context, quote_token_account.address, parse_quote)
    manager.add(quote_subscription)
    quote_observer = mango.LatestItemObserverSubscriber(quote_token_account)
    disposer.add_disposable(quote_subscription.publisher.subscribe(quote_observer))

    def serum_inventory_accessor() -> mango.Inventory:
        # Evaluated on demand, so it always reflects the most recent token account values.
        base_value = base_observer.latest.value
        quote_value = quote_observer.latest.value
        return mango.Inventory(mango.InventorySource.SPL_TOKENS, base_value, quote_value)

    return mango.LamdaUpdateWatcher(serum_inventory_accessor)
|
|
|
|
|
|
# Build the shared objects from the command line. The names defined here
# (context, disposer, manager, wallet, group, account, market) are module
# globals that the rest of the script, and some builder functions, read.
context = mango.ContextBuilder.from_command_line_parameters(args)

disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager(context.name)
disposer.add_disposable(manager)

wallet = mango.Wallet.from_command_line_parameters_or_raise(args)
group = mango.Group.load(context, context.group_id)
account = mango.Account.load_for_owner_by_index(
    context, wallet.address, group, args.account_index)

# Market symbols are looked up in upper case.
market_symbol = args.market.upper()
market = context.market_lookup.find_by_symbol(market_symbol)
if market is None:
    raise Exception(f"Could not find market {market_symbol}")

# The market index is also the index of the base token in the group's token list.
# Refuse to run when the market's quote token is not the group's shared quote token.
if not (market.quote == group.shared_quote_token.token):
    raise Exception(
        f"Group {group.name} uses shared quote token {group.shared_quote_token.token.symbol}/{group.shared_quote_token.token.mint}, but market {market.symbol} uses quote token {market.quote.symbol}/{market.quote.mint}.")

# Start from a clean slate: cancel existing orders and settle before market making begins.
cleanup(context, wallet, account, market, args.dry_run)
|
|
|
|
# Live views of the group, the account and the oracle price.
latest_group_observer = build_latest_group_observer(context, manager, disposer, group)
account_subscription, latest_account_observer = build_latest_account_observer(
    context, account, manager, disposer, latest_group_observer)
latest_price_observer = build_latest_price_observer(
    context, disposer, args.oracle_provider, market)

market = mango.ensure_market_loaded(context, market)
market_instruction_builder: mango.MarketInstructionBuilder = mango.create_market_instruction_builder(
    context, wallet, account, market, args.dry_run)

# Inventory and open-orders sources depend on the concrete market type.
if isinstance(market, mango.SerumMarket):
    # Serum: inventory comes from the wallet's SPL token accounts.
    inventory_watcher: mango.Watcher[mango.Inventory] = build_serum_inventory_observer(context, disposer, market)
    latest_open_orders_observer = build_latest_serum_open_orders_observer(
        manager, disposer, market, context, wallet)
elif isinstance(market, mango.SpotMarket):
    # Spot: inventory comes from the Mango account.
    inventory_watcher = mango.InventoryAccountWatcher(market, latest_account_observer)
    latest_open_orders_observer = build_latest_spot_open_orders_observer(
        context, manager, disposer, account, market)
elif isinstance(market, mango.PerpMarket):
    # Perp: inventory and open orders both come from the Mango account.
    inventory_watcher = mango.InventoryAccountWatcher(market, latest_account_observer)
    latest_open_orders_observer = build_latest_perp_open_orders_observer(
        disposer, market, account, account_subscription)
else:
    raise Exception(f"Could not determine type of market {market.symbol}")
|
|
|
|
# Open the websocket that feeds every subscription registered with `manager`.
# NOTE(review): replacing the first "https" with "ws" turns an "https://..."
# cluster URL into plaintext "ws://...", and leaves an "http://..." URL
# unchanged. Confirm whether "wss" (and an http -> ws mapping) is intended.
cluster_url = context.client.cluster_url
websocket_url = cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(
    websocket_url, manager.open_handler)
ws.item.subscribe(on_next=manager.on_item)
ws.ping_interval = 10
ws.open()
|
|
|
|
# Assemble the market maker. The single --existing-order-tolerance value is
# passed as both arguments of the reconciler.
tolerance = args.existing_order_tolerance
order_reconciler = mango.marketmaking.ToleranceOrderReconciler(tolerance, tolerance)
desired_orders_builder = mango.marketmaking.ConfidenceIntervalDesiredOrdersBuilder(
    args.position_size_ratio,
    args.minimum_charge_ratio,
    args.confidence_weighting,
    args.order_type)
market_maker = mango.marketmaking.MarketMaker(
    wallet, market, market_instruction_builder, desired_orders_builder, order_reconciler)
model_state = mango.marketmaking.ModelState(
    market,
    latest_account_observer,
    latest_group_observer,
    latest_price_observer,
    latest_open_orders_observer,
    inventory_watcher)

add_file_health("marketmaker_pulse", market_maker.pulse_complete, disposer)

logging.info(f"Current assets in account {account.address} (owner: {account.owner}):")
present_assets = [asset for asset in account.net_assets if asset is not None]
mango.TokenValue.report(present_assets, logging.info)


def run_pulse(_):
    """Run one market maker pulse; the interval's tick value is ignored."""
    return market_maker.pulse(context, model_state)


# Pulse the market maker every --pulse-interval seconds until disposed.
pulse_disposable = rx.interval(args.pulse_interval).subscribe(on_next=run_pulse)
disposer.add_disposable(pulse_disposable)
|
|
|
|
# Wait - don't exit. Exiting will be handled by signals/interrupts.
waiter = threading.Event()
try:
    waiter.wait()
except BaseException:
    # Deliberate catch-all (same scope as a bare `except:`): whatever interrupts
    # the wait, including KeyboardInterrupt and SystemExit, fall through to the
    # orderly shutdown below.
    pass

logging.info("Shutting down...")
# Each shutdown step runs even if an earlier one raises. The final cleanup
# cancels outstanding orders, so it must not be skipped because closing the
# websocket or disposing a subscription failed. Any such exception still
# propagates once the remaining steps have run.
try:
    ws.close()
finally:
    try:
        disposer.dispose()
    finally:
        cleanup(context, wallet, account, market, args.dry_run)
logging.info("Shutdown complete.")
|