Added bids and asks to marketmaker ModelState, with watchers for latest state.

This commit is contained in:
Geoff Taylor 2021-08-21 22:06:58 +01:00
parent 191d3e644c
commit b56d114a4a
7 changed files with 130 additions and 37 deletions

View File

@ -7,6 +7,7 @@ import os.path
import rx
import sys
import threading
import typing
from decimal import Decimal
@ -100,15 +101,28 @@ market_instruction_builder: mango.MarketInstructionBuilder = mango.create_market
if isinstance(market, mango.SerumMarket):
inventory_watcher: mango.Watcher[mango.Inventory] = mango.build_serum_inventory_watcher(
context, manager, health_check, disposer, wallet, market)
latest_open_orders_observer = mango.build_serum_open_orders_watcher(context, manager, health_check, market, wallet)
latest_open_orders_observer: mango.Watcher[mango.PlacedOrdersContainer] = mango.build_serum_open_orders_watcher(
context, manager, health_check, market, wallet)
latest_bids_watcher: mango.Watcher[typing.Sequence[mango.Order]] = mango.build_serum_orderbook_side_watcher(
context, manager, health_check, market.underlying_serum_market, mango.OrderBookSideType.BIDS)
latest_asks_watcher: mango.Watcher[typing.Sequence[mango.Order]] = mango.build_serum_orderbook_side_watcher(
context, manager, health_check, market.underlying_serum_market, mango.OrderBookSideType.ASKS)
elif isinstance(market, mango.SpotMarket):
inventory_watcher = mango.SpotInventoryAccountWatcher(market, latest_account_observer)
latest_open_orders_observer = mango.build_spot_open_orders_watcher(
context, manager, health_check, wallet, account, group, market)
latest_bids_watcher = mango.build_serum_orderbook_side_watcher(
context, manager, health_check, market.underlying_serum_market, mango.OrderBookSideType.BIDS)
latest_asks_watcher = mango.build_serum_orderbook_side_watcher(
context, manager, health_check, market.underlying_serum_market, mango.OrderBookSideType.ASKS)
elif isinstance(market, mango.PerpMarket):
inventory_watcher = mango.PerpInventoryAccountWatcher(market, latest_account_observer, group)
latest_open_orders_observer = mango.build_perp_open_orders_watcher(
context, manager, health_check, market, account, group, account_subscription)
latest_bids_watcher = mango.build_perp_orderbook_side_watcher(
context, manager, health_check, market, mango.OrderBookSideType.BIDS)
latest_asks_watcher = mango.build_perp_orderbook_side_watcher(
context, manager, health_check, market, mango.OrderBookSideType.ASKS)
else:
raise Exception(f"Could not determine type of market {market.symbol}")
@ -125,8 +139,8 @@ market_maker = mango.marketmaking.MarketMaker(
wallet, market, market_instruction_builder, desired_orders_builder, order_reconciler)
model_state = mango.marketmaking.ModelState(market, latest_account_observer,
latest_group_observer, latest_price_observer,
latest_open_orders_observer,
inventory_watcher)
latest_open_orders_observer, inventory_watcher,
latest_bids_watcher, latest_asks_watcher)
health_check.add("marketmaker_pulse", market_maker.pulse_complete)

View File

@ -38,7 +38,7 @@ from .notification import NotificationTarget, TelegramNotificationTarget, Discor
from .observables import DisposePropagator, DisposeWrapper, NullObserverSubscriber, PrintingObserverSubscriber, TimestampedPrintingObserverSubscriber, CollectingObserverSubscriber, LatestItemObserverSubscriber, CaptureFirstItem, FunctionObserver, create_backpressure_skipping_observer, debug_print_item, log_subscription_error, observable_pipeline_error_reporter, EventSource
from .openorders import OpenOrders
from .oracle import OracleSource, Price, Oracle, OracleProvider, SupportedOracleFeature
from .orderbookside import OrderBookSide
from .orderbookside import OrderBookSideType, PerpOrderBookSide
from .orders import Order, OrderType, Side
from .ownedtokenvalue import OwnedTokenValue
from .oraclefactory import create_oracle_provider
@ -76,7 +76,7 @@ from .version import Version
from .wallet import Wallet
from .walletbalancer import TargetBalance, FixedTargetBalance, PercentageTargetBalance, TargetBalanceParser, sort_changes_for_trades, calculate_required_balance_changes, FilterSmallChanges, WalletBalancer, NullWalletBalancer, LiveWalletBalancer
from .watcher import Watcher, ManualUpdateWatcher, LamdaUpdateWatcher
from .watchers import build_group_watcher, build_account_watcher, build_spot_open_orders_watcher, build_serum_open_orders_watcher, build_perp_open_orders_watcher, build_price_watcher, build_serum_inventory_watcher
from .watchers import build_group_watcher, build_account_watcher, build_spot_open_orders_watcher, build_serum_open_orders_watcher, build_perp_open_orders_watcher, build_price_watcher, build_serum_inventory_watcher, build_perp_orderbook_side_watcher, build_serum_orderbook_side_watcher
from .websocketsubscription import WebSocketSubscription, WebSocketProgramSubscription, WebSocketAccountSubscription, WebSocketLogSubscription, WebSocketSubscriptionManager, IndividualWebSocketSubscriptionManager, SharedWebSocketSubscriptionManager
from .layouts import layouts

View File

@ -29,7 +29,9 @@ class ModelState:
group_watcher: mango.Watcher[mango.Group],
price_watcher: mango.Watcher[mango.Price],
placed_orders_container_watcher: mango.Watcher[mango.PlacedOrdersContainer],
inventory_watcher: mango.Watcher[mango.Inventory]
inventory_watcher: mango.Watcher[mango.Inventory],
bids: mango.Watcher[typing.Sequence[mango.Order]],
asks: mango.Watcher[typing.Sequence[mango.Order]]
):
self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self.market: mango.Market = market
@ -40,6 +42,8 @@ class ModelState:
mango.PlacedOrdersContainer] = placed_orders_container_watcher
self.inventory_watcher: mango.Watcher[
mango.Inventory] = inventory_watcher
self.bids_watcher: mango.Watcher[typing.Sequence[mango.Order]] = bids
self.asks_watcher: mango.Watcher[typing.Sequence[mango.Order]] = asks
@property
def group(self) -> mango.Group:
@ -61,6 +65,14 @@ class ModelState:
def inventory(self) -> mango.Inventory:
return self.inventory_watcher.latest
@property
def bids(self) -> typing.Sequence[mango.Order]:
return self.bids_watcher.latest
@property
def asks(self) -> typing.Sequence[mango.Order]:
return self.asks_watcher.latest
@property
def existing_orders(self) -> typing.Sequence[mango.PlacedOrder]:
return self.placed_orders_container_watcher.latest.placed_orders

View File

@ -13,6 +13,7 @@
# [Github](https://github.com/blockworks-foundation)
# [Email](mailto:hello@blockworks.foundation)
import enum
import typing
from decimal import Decimal
@ -27,13 +28,29 @@ from .orders import Order, OrderType, Side
from .perpmarketdetails import PerpMarketDetails
from .version import Version
# # 🥭 OrderBookSide class
# # 🥭 OrderBookSideType enum
#
# `OrderBookSide` holds orders for one side of a market.
# Does the orderbook side represent bids or asks?
#
class OrderBookSideType(enum.Enum):
# We use strings here so that argparse can work with these as parameters.
BIDS = "BIDS"
ASKS = "ASKS"
class OrderBookSide(AddressableAccount):
def __str__(self) -> str:
return self.value
def __repr__(self) -> str:
return f"{self}"
# # 🥭 PerpOrderBookSide class
#
# `PerpOrderBookSide` holds orders for one side of a market.
#
class PerpOrderBookSide(AddressableAccount):
def __init__(self, account_info: AccountInfo, version: Version,
meta_data: Metadata, perp_market_details: PerpMarketDetails, bump_index: Decimal,
free_list_len: Decimal, free_list_head: Decimal, root_node: Decimal,
@ -51,7 +68,7 @@ class OrderBookSide(AddressableAccount):
self.nodes: typing.Any = nodes
@staticmethod
def from_layout(layout: layouts.ORDERBOOK_SIDE, account_info: AccountInfo, version: Version, perp_market_details: PerpMarketDetails) -> "OrderBookSide":
def from_layout(layout: layouts.ORDERBOOK_SIDE, account_info: AccountInfo, version: Version, perp_market_details: PerpMarketDetails) -> "PerpOrderBookSide":
meta_data = Metadata.from_layout(layout.meta_data)
bump_index: Decimal = layout.bump_index
free_list_len: Decimal = layout.free_list_len
@ -60,28 +77,28 @@ class OrderBookSide(AddressableAccount):
leaf_count: Decimal = layout.leaf_count
nodes: typing.Any = layout.nodes
return OrderBookSide(account_info, version, meta_data, perp_market_details, bump_index, free_list_len, free_list_head, root_node, leaf_count, nodes)
return PerpOrderBookSide(account_info, version, meta_data, perp_market_details, bump_index, free_list_len, free_list_head, root_node, leaf_count, nodes)
@staticmethod
def parse(context: Context, account_info: AccountInfo, perp_market_details: PerpMarketDetails) -> "OrderBookSide":
def parse(context: Context, account_info: AccountInfo, perp_market_details: PerpMarketDetails) -> "PerpOrderBookSide":
data = account_info.data
if len(data) != layouts.ORDERBOOK_SIDE.sizeof():
raise Exception(
f"OrderBookSide data length ({len(data)}) does not match expected size ({layouts.ORDERBOOK_SIDE.sizeof()})")
f"PerpOrderBookSide data length ({len(data)}) does not match expected size ({layouts.ORDERBOOK_SIDE.sizeof()})")
layout = layouts.ORDERBOOK_SIDE.parse(data)
return OrderBookSide.from_layout(layout, account_info, Version.V1, perp_market_details)
return PerpOrderBookSide.from_layout(layout, account_info, Version.V1, perp_market_details)
@staticmethod
def load(context: Context, address: PublicKey, perp_market_details: PerpMarketDetails) -> "OrderBookSide":
def load(context: Context, address: PublicKey, perp_market_details: PerpMarketDetails) -> "PerpOrderBookSide":
account_info = AccountInfo.load(context, address)
if account_info is None:
raise Exception(f"OrderBookSide account not found at address '{address}'")
return OrderBookSide.parse(context, account_info, perp_market_details)
raise Exception(f"PerpOrderBookSide account not found at address '{address}'")
return PerpOrderBookSide.parse(context, account_info, perp_market_details)
def orders(self) -> typing.Generator[Order, None, None]:
def orders(self) -> typing.Sequence[Order]:
if self.leaf_count == 0:
return
return []
if self.meta_data.data_type == layouts.DATA_TYPE.Bids:
order_side = Side.BUY
@ -89,6 +106,7 @@ class OrderBookSide(AddressableAccount):
order_side = Side.SELL
stack = [self.root_node]
orders: typing.List[Order] = []
while len(stack) > 0:
index = int(stack.pop())
node = self.nodes[index]
@ -105,22 +123,23 @@ class OrderBookSide(AddressableAccount):
base_factor = Decimal(10) ** self.perp_market_details.base_token.decimals
actual_quantity = (quantity * self.perp_market_details.base_lot_size) / base_factor
yield Order(int(node.key["order_id"]),
node.client_order_id,
node.owner,
order_side,
actual_price,
actual_quantity,
OrderType.UNKNOWN)
orders += [Order(int(node.key["order_id"]),
node.client_order_id,
node.owner,
order_side,
actual_price,
actual_quantity,
OrderType.UNKNOWN)]
elif node.type_name == "inner":
if order_side == Side.BUY:
stack = [node.children[0], node.children[1], *stack]
stack = [*stack, node.children[0], node.children[1]]
else:
stack = [node.children[1], node.children[0], *stack]
stack = [*stack, node.children[1], node.children[0]]
return orders
def __str__(self) -> str:
nodes = "\n ".join([str(node).replace("\n", "\n ") for node in self.orders()])
return f"""« 𝙾𝚛𝚍𝚎𝚛𝙱𝚘𝚘𝚔𝚂𝚒𝚍𝚎 {self.version} [{self.address}]
return f"""« 𝙿𝚎𝚛𝚙𝙾𝚛𝚍𝚎𝚛𝙱𝚘𝚘𝚔𝚂𝚒𝚍𝚎 {self.version} [{self.address}]
{self.meta_data}
Perp Market: {self.perp_market_details}
Bump Index: {self.bump_index}

View File

@ -22,7 +22,7 @@ from .context import Context
from .group import Group
from .lotsizeconverter import LotSizeConverter
from .market import Market, InventorySource
from .orderbookside import OrderBookSide
from .orderbookside import PerpOrderBookSide
from .orders import Order
from .perpeventqueue import PerpEvent, PerpEventQueue
from .perpmarketdetails import PerpMarketDetails
@ -75,8 +75,8 @@ class PerpMarket(Market):
bids_address: PublicKey = self.underlying_perp_market.bids
asks_address: PublicKey = self.underlying_perp_market.asks
[bids, asks] = AccountInfo.load_multiple(context, [bids_address, asks_address])
bid_side = OrderBookSide.parse(context, bids, self.underlying_perp_market)
ask_side = OrderBookSide.parse(context, asks, self.underlying_perp_market)
bid_side = PerpOrderBookSide.parse(context, bids, self.underlying_perp_market)
ask_side = PerpOrderBookSide.parse(context, asks, self.underlying_perp_market)
return [*bid_side.orders(), *ask_side.orders()]
def __str__(self) -> str:

View File

@ -36,7 +36,7 @@ TWatched = typing.TypeVar("TWatched", covariant=True)
class Watcher(typing.Protocol[TWatched]):
@property
def latest(self) -> TWatched:
raise NotImplementedError("Watcher.latest is not implemented on the base type.")
raise NotImplementedError("Watcher.latest is not implemented on the Protocol.")
# # 🥭 ManualUpdateWatcher class

View File

@ -17,8 +17,11 @@ import logging
import typing
from pyserum.market import Market as PySerumMarket
from pyserum.market.orderbook import OrderBook as PySerumOrderBook
from solana.publickey import PublicKey
from .account import Account
from .accountinfo import AccountInfo
from .combinableinstructions import CombinableInstructions
from .context import Context
from .group import Group
@ -30,6 +33,8 @@ from .observables import DisposePropagator, LatestItemObserverSubscriber
from .openorders import OpenOrders
from .oracle import Price
from .oraclefactory import OracleProvider, create_oracle_provider
from .orderbookside import OrderBookSideType, PerpOrderBookSide
from .orders import Order
from .perpmarket import PerpMarket
from .placedorder import PlacedOrdersContainer
from .serummarket import SerumMarket
@ -46,7 +51,7 @@ def build_group_watcher(context: Context, manager: WebSocketSubscriptionManager,
group_subscription = WebSocketAccountSubscription[Group](
context, group.address, lambda account_info: Group.parse(context, account_info))
manager.add(group_subscription)
latest_group_observer = LatestItemObserverSubscriber(group)
latest_group_observer = LatestItemObserverSubscriber[Group](group)
group_subscription.publisher.subscribe(latest_group_observer)
health_check.add("group_subscription", group_subscription.publisher)
return latest_group_observer
@ -56,7 +61,7 @@ def build_account_watcher(context: Context, manager: WebSocketSubscriptionManage
account_subscription = WebSocketAccountSubscription[Account](
context, account.address, lambda account_info: Account.parse(account_info, group_observer.latest))
manager.add(account_subscription)
latest_account_observer = LatestItemObserverSubscriber(account)
latest_account_observer = LatestItemObserverSubscriber[Account](account)
account_subscription.publisher.subscribe(latest_account_observer)
health_check.add("account_subscription", account_subscription.publisher)
return account_subscription, latest_account_observer
@ -154,7 +159,7 @@ def build_serum_inventory_watcher(context: Context, manager: WebSocketSubscripti
base_token_subscription = WebSocketAccountSubscription[TokenAccount](
context, base_account.address, lambda account_info: TokenAccount.parse(account_info, market.base))
manager.add(base_token_subscription)
latest_base_token_account_observer = LatestItemObserverSubscriber(base_account)
latest_base_token_account_observer = LatestItemObserverSubscriber[TokenAccount](base_account)
base_subscription_disposable = base_token_subscription.publisher.subscribe(latest_base_token_account_observer)
disposer.add_disposable(base_subscription_disposable)
@ -166,7 +171,7 @@ def build_serum_inventory_watcher(context: Context, manager: WebSocketSubscripti
quote_token_subscription = WebSocketAccountSubscription[TokenAccount](
context, quote_account.address, lambda account_info: TokenAccount.parse(account_info, market.quote))
manager.add(quote_token_subscription)
latest_quote_token_account_observer = LatestItemObserverSubscriber(quote_account)
latest_quote_token_account_observer = LatestItemObserverSubscriber[TokenAccount](quote_account)
quote_subscription_disposable = quote_token_subscription.publisher.subscribe(latest_quote_token_account_observer)
disposer.add_disposable(quote_subscription_disposable)
@ -176,3 +181,46 @@ def build_serum_inventory_watcher(context: Context, manager: WebSocketSubscripti
latest_quote_token_account_observer.latest.value)
return LamdaUpdateWatcher(serum_inventory_accessor)
def build_perp_orderbook_side_watcher(context: Context, manager: WebSocketSubscriptionManager, health_check: HealthCheck, perp_market: PerpMarket, side: OrderBookSideType) -> Watcher[typing.Sequence[Order]]:
orderbook_address: PublicKey = perp_market.underlying_perp_market.bids if side == OrderBookSideType.BIDS else perp_market.underlying_perp_market.asks
orderbook_side_info = AccountInfo.load(context, orderbook_address)
if orderbook_side_info is None:
raise Exception(f"Could not find perp order book side at address {orderbook_address}.")
initial_orderbook_side: PerpOrderBookSide = PerpOrderBookSide.parse(
context, orderbook_side_info, perp_market.underlying_perp_market)
orders_subscription = WebSocketAccountSubscription[typing.Sequence[Order]](
context, orderbook_address, lambda account_info: PerpOrderBookSide.parse(context, account_info, perp_market.underlying_perp_market).orders())
manager.add(orders_subscription)
latest_orders_observer = LatestItemObserverSubscriber[typing.Sequence[Order]](initial_orderbook_side.orders())
orders_subscription.publisher.subscribe(latest_orders_observer)
health_check.add("orderbook_side_subscription", orders_subscription.publisher)
return latest_orders_observer
def build_serum_orderbook_side_watcher(context: Context, manager: WebSocketSubscriptionManager, health_check: HealthCheck, underlying_serum_market: PySerumMarket, side: OrderBookSideType) -> Watcher[typing.Sequence[Order]]:
orderbook_address: PublicKey = underlying_serum_market.state.bids if side == OrderBookSideType.BIDS else underlying_serum_market.state.asks
orderbook_side_info = AccountInfo.load(context, orderbook_address)
if orderbook_side_info is None:
raise Exception(f"Could not find Serum order book side at address {orderbook_address}.")
def account_info_to_orderbook(account_info: AccountInfo) -> typing.Sequence[Order]:
serum_orderbook_side = PySerumOrderBook.from_bytes(
underlying_serum_market.state, account_info.data)
return list(map(Order.from_serum_order, serum_orderbook_side.orders()))
initial_orderbook_side: typing.Sequence[Order] = account_info_to_orderbook(orderbook_side_info)
orders_subscription = WebSocketAccountSubscription[typing.Sequence[Order]](
context, orderbook_address, account_info_to_orderbook)
manager.add(orders_subscription)
latest_orders_observer = LatestItemObserverSubscriber[typing.Sequence[Order]](initial_orderbook_side)
orders_subscription.publisher.subscribe(latest_orders_observer)
health_check.add("orderbook_side_subscription", orders_subscription.publisher)
return latest_orders_observer