From a4ca18e5ed4bc8d94fe8186b26e7dc14f6013ea5 Mon Sep 17 00:00:00 2001 From: Geoff Taylor Date: Tue, 17 Aug 2021 12:17:49 +0100 Subject: [PATCH] Added/centralised 'watchers'. Removed FileToucherObserver. Added HealthCheck. --- bin/hedger | 4 +- bin/marketmaker | 174 +++-------------------------------- bin/watch-liquidations | 7 +- bin/watch-minimum-balances | 8 +- mango/__init__.py | 4 +- mango/healthcheck.py | 40 ++++++++ mango/observables.py | 66 +++----------- mango/watchers.py | 182 +++++++++++++++++++++++++++++++++++++ 8 files changed, 262 insertions(+), 223 deletions(-) create mode 100644 mango/healthcheck.py create mode 100644 mango/watchers.py diff --git a/bin/hedger b/bin/hedger index 14a64a7..2026107 100755 --- a/bin/hedger +++ b/bin/hedger @@ -49,6 +49,8 @@ account = mango.Account.load_for_owner_by_index(context, wallet.address, group, disposer = mango.DisposePropagator() manager = mango.WebSocketSubscriptionManager(context.name) disposer.add_disposable(manager) +health_check = mango.HealthCheck() +disposer.add_disposable(health_check) buy_price_adjustment_factor: Decimal = Decimal("1") + args.max_price_slippage_factor sell_price_adjustment_factor: Decimal = Decimal("1") - args.max_price_slippage_factor @@ -116,7 +118,7 @@ websocket_url = context.client.cluster_url.replace("https", "ws", 1) ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler) ws.item.subscribe(on_next=manager.on_item) ws.ping_interval = 10 -ws_pong_disposable = ws.pong.subscribe(mango.FileToucherObserver("/var/tmp/mango_healthcheck_ws_pong")) +health_check.add("ws_pong", ws.pong) ws.open() logging.info(f"Current assets in account {account.address} (owner: {account.owner}):") diff --git a/bin/marketmaker b/bin/marketmaker index 7f34880..b1f71fd 100755 --- a/bin/marketmaker +++ b/bin/marketmaker @@ -7,10 +7,8 @@ import os.path import rx import sys import threading -import typing from decimal import Decimal -from pyserum.market import Market as PySerumMarket sys.path.insert(0, os.path.abspath( os.path.join(os.path.dirname(__file__), ".."))) @@ -63,158 +61,13 @@ def cleanup(context: mango.Context, wallet: mango.Wallet, account: mango.Account market_operations.settle() -def add_file_health(name: str, observable: rx.core.typing.Observable[typing.Any], disposer: mango.DisposePropagator): - healthcheck_file_touch_disposer = observable.subscribe( - mango.FileToucherObserver(f"/var/tmp/mango_healthcheck_{name}")) - disposer.add_disposable(healthcheck_file_touch_disposer) - - -def build_latest_group_observer(context: mango.Context, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, group: mango.Group) -> mango.LatestItemObserverSubscriber[mango.Group]: - group_subscription = mango.WebSocketAccountSubscription[mango.Group]( - context, group.address, lambda account_info: mango.Group.parse(context, account_info)) - manager.add(group_subscription) - latest_group_observer = mango.LatestItemObserverSubscriber(group) - group_subscription.publisher.subscribe(latest_group_observer) - add_file_health("group_subscription", group_subscription.publisher, disposer) - return latest_group_observer - - -def build_latest_account_observer(context: mango.Context, account: mango.Account, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, group_observer: mango.LatestItemObserverSubscriber[mango.Group]) -> typing.Tuple[mango.WebSocketSubscription[mango.Account], mango.LatestItemObserverSubscriber[mango.Account]]: - account_subscription = mango.WebSocketAccountSubscription[mango.Account]( - context, account.address, lambda account_info: mango.Account.parse(account_info, group_observer.latest)) - manager.add(account_subscription) - latest_account_observer = mango.LatestItemObserverSubscriber(account) - account_subscription.publisher.subscribe(latest_account_observer) - add_file_health("account_subscription", account_subscription.publisher, disposer) - return account_subscription, latest_account_observer - - -def build_latest_spot_open_orders_observer(context: mango.Context, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, account: mango.Account, spot_market: mango.SpotMarket) -> mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]: - market_index = group.find_spot_market_index(spot_market.address) - open_orders_address = account.spot_open_orders[market_index] - if open_orders_address is None: - spot_market_instruction_builder: mango.SpotMarketInstructionBuilder = mango.SpotMarketInstructionBuilder.load( - context, wallet, spot_market.group, account, spot_market) - market_operations: mango.SpotMarketOperations = mango.SpotMarketOperations( - context, wallet, spot_market.group, account, spot_market, spot_market_instruction_builder) - open_orders_address = market_operations.create_openorders_for_market() - logging.info(f"Created {spot_market.symbol} OpenOrders at: {open_orders_address}") - - # This line is a little nasty. Now that we know we have an OpenOrders account at this address, update - # the Account so that future uses (like later in this method) have access to it in the right place. - account.update_spot_open_orders_for_market(market_index, open_orders_address) - - spot_open_orders_subscription = mango.WebSocketAccountSubscription[mango.OpenOrders]( - context, open_orders_address, lambda account_info: mango.OpenOrders.parse(account_info, spot_market.base.decimals, spot_market.quote.decimals)) - manager.add(spot_open_orders_subscription) - initial_spot_open_orders = mango.OpenOrders.load( - context, open_orders_address, spot_market.base.decimals, spot_market.quote.decimals) - latest_open_orders_observer = mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]( - initial_spot_open_orders) - spot_open_orders_subscription.publisher.subscribe(latest_open_orders_observer) - add_file_health("open_orders_subscription", spot_open_orders_subscription.publisher, disposer) - return latest_open_orders_observer - - -def build_latest_serum_open_orders_observer(manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, serum_market: mango.SerumMarket, context: mango.Context, wallet: mango.Wallet) -> mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]: - all_open_orders = mango.OpenOrders.load_for_market_and_owner( - context, serum_market.address, wallet.address, context.dex_program_id, serum_market.base.decimals, serum_market.quote.decimals) - if len(all_open_orders) > 0: - initial_serum_open_orders: mango.OpenOrders = all_open_orders[0] - open_orders_address = initial_serum_open_orders.address - else: - raw_market = PySerumMarket.load(context.client.compatible_client, serum_market.address) - create_open_orders = mango.build_create_serum_open_orders_instructions( - context, wallet, raw_market) - - open_orders_address = create_open_orders.signers[0].public_key() - - logging.info(f"Creating OpenOrders account for market {serum_market.symbol} at {open_orders_address}.") - signers: mango.CombinableInstructions = mango.CombinableInstructions.from_wallet(wallet) - transaction_ids = (signers + create_open_orders).execute(context) - context.client.wait_for_confirmation(transaction_ids) - initial_serum_open_orders = mango.OpenOrders.load( - context, open_orders_address, serum_market.base.decimals, serum_market.quote.decimals) - - serum_open_orders_subscription = mango.WebSocketAccountSubscription[mango.OpenOrders]( - context, open_orders_address, lambda account_info: mango.OpenOrders.parse(account_info, serum_market.base.decimals, serum_market.quote.decimals)) - - manager.add(serum_open_orders_subscription) - - latest_serum_open_orders_observer = mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]( - initial_serum_open_orders) - serum_open_orders_subscription.publisher.subscribe(latest_serum_open_orders_observer) - add_file_health("open_orders_subscription", serum_open_orders_subscription.publisher, disposer) - return latest_serum_open_orders_observer - - -def build_latest_perp_open_orders_observer(disposer: mango.DisposePropagator, perp_market: mango.PerpMarket, account: mango.Account, account_subscription: mango.WebSocketSubscription[mango.Account]) -> mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]: - index = group.find_perp_market_index(perp_market.address) - initial_perp_account = account.perp_accounts[index] - if initial_perp_account is None: - raise Exception(f"Could not find perp account at index {index} of account {account.address}.") - initial_open_orders = initial_perp_account.open_orders - latest_open_orders_observer = mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer](initial_open_orders) - account_subscription.publisher.subscribe( - on_next=lambda updated_account: latest_open_orders_observer.on_next(updated_account.perp_accounts[index].open_orders)) - add_file_health("open_orders_subscription", account_subscription.publisher, disposer) - return latest_open_orders_observer - - -def build_latest_price_observer(context: mango.Context, disposer: mango.DisposePropagator, provider_name: str, market: mango.Market) -> mango.LatestItemObserverSubscriber[mango.Price]: - oracle_provider: mango.OracleProvider = mango.create_oracle_provider(context, provider_name) - oracle = oracle_provider.oracle_for_market(context, market) - if oracle is None: - raise Exception(f"Could not find oracle for market {market.symbol} from provider {provider_name}.") - - initial_price = oracle.fetch_price(context) - price_feed = oracle.to_streaming_observable(context) - latest_price_observer = mango.LatestItemObserverSubscriber(initial_price) - price_disposable = price_feed.subscribe(latest_price_observer) - disposer.add_disposable(price_disposable) - add_file_health("price_subscription", price_feed, disposer) - return latest_price_observer - - -def build_serum_inventory_observer(context: mango.Context, disposer: mango.DisposePropagator, market: mango.Market) -> mango.Watcher[mango.Inventory]: - base_account = mango.TokenAccount.fetch_largest_for_owner_and_token( - context, wallet.address, market.base) - if base_account is None: - raise Exception( - f"Could not find token account owned by {wallet.address} for base token {market.base}.") - base_token_subscription = mango.WebSocketAccountSubscription[mango.TokenAccount]( - context, base_account.address, lambda account_info: mango.TokenAccount.parse(account_info, market.base)) - manager.add(base_token_subscription) - latest_base_token_account_observer = mango.LatestItemObserverSubscriber(base_account) - base_subscription_disposable = base_token_subscription.publisher.subscribe(latest_base_token_account_observer) - disposer.add_disposable(base_subscription_disposable) - - quote_account = mango.TokenAccount.fetch_largest_for_owner_and_token( - context, wallet.address, market.quote) - if quote_account is None: - raise Exception( - f"Could not find token account owned by {wallet.address} for quote token {market.quote}.") - quote_token_subscription = mango.WebSocketAccountSubscription[mango.TokenAccount]( - context, quote_account.address, lambda account_info: mango.TokenAccount.parse(account_info, market.quote)) - manager.add(quote_token_subscription) - latest_quote_token_account_observer = mango.LatestItemObserverSubscriber(quote_account) - quote_subscription_disposable = quote_token_subscription.publisher.subscribe(latest_quote_token_account_observer) - disposer.add_disposable(quote_subscription_disposable) - - def serum_inventory_accessor() -> mango.Inventory: - return mango.Inventory(mango.InventorySource.SPL_TOKENS, - latest_base_token_account_observer.latest.value, - latest_quote_token_account_observer.latest.value) - - return mango.LamdaUpdateWatcher(serum_inventory_accessor) - - context = mango.ContextBuilder.from_command_line_parameters(args) disposer = mango.DisposePropagator() manager = mango.WebSocketSubscriptionManager(context.name) disposer.add_disposable(manager) - +health_check = mango.HealthCheck() +disposer.add_disposable(health_check) wallet = mango.Wallet.from_command_line_parameters_or_raise(args) group = mango.Group.load(context, context.group_id) @@ -232,24 +85,27 @@ if market.quote != group.shared_quote_token.token: cleanup(context, wallet, account, market, args.dry_run) -latest_group_observer = build_latest_group_observer(context, manager, disposer, group) -account_subscription, latest_account_observer = build_latest_account_observer( - context, account, manager, disposer, latest_group_observer) -latest_price_observer = build_latest_price_observer(context, disposer, args.oracle_provider, market) +latest_group_observer = mango.build_group_watcher(context, manager, health_check, group) +account_subscription, latest_account_observer = mango.build_account_watcher( + context, manager, health_check, account, latest_group_observer) +latest_price_observer = mango.build_price_watcher( + context, manager, health_check, disposer, args.oracle_provider, market) market = mango.ensure_market_loaded(context, market) market_instruction_builder: mango.MarketInstructionBuilder = mango.create_market_instruction_builder( context, wallet, account, market, args.dry_run) if isinstance(market, mango.SerumMarket): - inventory_watcher: mango.Watcher[mango.Inventory] = build_serum_inventory_observer(context, disposer, market) - latest_open_orders_observer = build_latest_serum_open_orders_observer(manager, disposer, market, context, wallet) + inventory_watcher: mango.Watcher[mango.Inventory] = mango.build_serum_inventory_watcher( + context, manager, health_check, disposer, wallet, market) + latest_open_orders_observer = mango.build_serum_open_orders_watcher(context, manager, health_check, market, wallet) elif isinstance(market, mango.SpotMarket): inventory_watcher = mango.InventoryAccountWatcher(market, latest_account_observer) - latest_open_orders_observer = build_latest_spot_open_orders_observer(context, manager, disposer, account, market) + latest_open_orders_observer = mango.build_spot_open_orders_watcher( + context, manager, health_check, wallet, account, group, market) elif isinstance(market, mango.PerpMarket): inventory_watcher = mango.InventoryAccountWatcher(market, latest_account_observer) - latest_open_orders_observer = build_latest_perp_open_orders_observer( - disposer, market, account, account_subscription) + latest_open_orders_observer = mango.build_perp_open_orders_watcher( + context, manager, health_check, market, account, group, account_subscription) else: raise Exception(f"Could not determine type of market {market.symbol}") @@ -270,7 +126,7 @@ model_state = mango.marketmaking.ModelState(market, latest_account_observer, latest_open_orders_observer, inventory_watcher) -add_file_health("marketmaker_pulse", market_maker.pulse_complete, disposer) +health_check.add("marketmaker_pulse", market_maker.pulse_complete) logging.info(f"Current assets in account {account.address} (owner: {account.owner}):") mango.TokenValue.report([asset for asset in account.net_assets if asset is not None], logging.info) diff --git a/bin/watch-liquidations b/bin/watch-liquidations index 9d57bfc..26a8bcd 100755 --- a/bin/watch-liquidations +++ b/bin/watch-liquidations @@ -40,6 +40,8 @@ print(context) disposer = mango.DisposePropagator() manager = mango.WebSocketSubscriptionManager(context.name) disposer.add_disposable(manager) +health_check = mango.HealthCheck() +disposer.add_disposable(health_check) publishers = [] for address in args.address: @@ -65,10 +67,7 @@ websocket_url = context.client.cluster_url.replace("https", "wss", 1) ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler) ws.item.subscribe(on_next=manager.on_item) ws.ping_interval = 10 - -ws_pong_disposable = ws.pong.subscribe(mango.FileToucherObserver("/var/tmp/mango_healthcheck_ws_pong")) -disposer.add_disposable(ws_pong_disposable) - +health_check.add("ws_pong", ws.pong) ws.open() # Wait - don't exit. Exiting will be handled by signals/interrupts. diff --git a/bin/watch-minimum-balances b/bin/watch-minimum-balances index 934a5d8..97aeda2 100755 --- a/bin/watch-minimum-balances +++ b/bin/watch-minimum-balances @@ -105,6 +105,8 @@ context = mango.ContextBuilder.from_command_line_parameters(args) disposer = mango.DisposePropagator() manager = mango.WebSocketSubscriptionManager(context.name) disposer.add_disposable(manager) +health_check = mango.HealthCheck() +disposer.add_disposable(health_check) for name_and_address in args.named_address: add_subscription_for_parameter(context, manager, args.timer_limit, name_and_address) @@ -113,11 +115,7 @@ websocket_url = context.client.cluster_url.replace("https", "ws", 1) ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler) ws.item.subscribe(on_next=manager.on_item) ws.ping_interval = 10 - -ws_pong_disposable = ws.pong.subscribe(mango.FileToucherObserver("/var/tmp/mango_healthcheck_ws_pong")) -disposer.add_disposable(ws_pong_disposable) -ws_pong_log_disposable = ws.pong.subscribe(on_next=lambda _: logging.info("Pong")) -disposer.add_disposable(ws_pong_log_disposable) +health_check.add("ws_pong", ws.pong) ws_connecting_log_disposable = ws.connecting.subscribe( on_next=lambda _: send_event_notification("Re-connecting websocket")) disposer.add_disposable(ws_connecting_log_disposable) diff --git a/mango/__init__.py b/mango/__init__.py index 54b9d89..5eca437 100644 --- a/mango/__init__.py +++ b/mango/__init__.py @@ -17,6 +17,7 @@ from .createmarketoperations import create_market_operations from .encoding import decode_binary, encode_binary, encode_key, encode_int from .ensuremarketloaded import ensure_market_loaded from .group import GroupBasketMarket, Group +from .healthcheck import HealthCheck from .idsjsontokenlookup import IdsJsonTokenLookup from .idsjsonmarketlookup import IdsJsonMarketLookup from .inventory import Inventory, InventoryAccountWatcher, spl_token_inventory_loader, account_inventory_loader @@ -32,7 +33,7 @@ from .marketlookup import MarketLookup, NullMarketLookup, CompoundMarketLookup from .marketoperations import MarketOperations, NullMarketOperations from .metadata import Metadata from .notification import NotificationTarget, TelegramNotificationTarget, DiscordNotificationTarget, MailjetNotificationTarget, CsvFileNotificationTarget, FilteringNotificationTarget, NotificationHandler, parse_subscription_target -from .observables import DisposePropagator, DisposeWrapper, NullObserverSubscriber, PrintingObserverSubscriber, TimestampedPrintingObserverSubscriber, CollectingObserverSubscriber, LatestItemObserverSubscriber, CaptureFirstItem, FunctionObserver, create_backpressure_skipping_observer, debug_print_item, log_subscription_error, observable_pipeline_error_reporter, EventSource, FileToucherObserver +from .observables import DisposePropagator, DisposeWrapper, NullObserverSubscriber, PrintingObserverSubscriber, TimestampedPrintingObserverSubscriber, CollectingObserverSubscriber, LatestItemObserverSubscriber, CaptureFirstItem, FunctionObserver, create_backpressure_skipping_observer, debug_print_item, log_subscription_error, observable_pipeline_error_reporter, EventSource from .openorders import OpenOrders from .oracle import OracleSource, Price, Oracle, OracleProvider, SupportedOracleFeature from .orderbookside import OrderBookSide @@ -73,6 +74,7 @@ from .version import Version from .wallet import Wallet from .walletbalancer import TargetBalance, FixedTargetBalance, PercentageTargetBalance, TargetBalanceParser, sort_changes_for_trades, calculate_required_balance_changes, FilterSmallChanges, WalletBalancer, NullWalletBalancer, LiveWalletBalancer from .watcher import Watcher, ManualUpdateWatcher, LamdaUpdateWatcher +from .watchers import build_group_watcher, build_account_watcher, build_spot_open_orders_watcher, build_serum_open_orders_watcher, build_perp_open_orders_watcher, build_price_watcher, build_serum_inventory_watcher from .websocketsubscription import WebSocketSubscription, WebSocketProgramSubscription, WebSocketAccountSubscription, WebSocketLogSubscription, WebSocketSubscriptionManager from .layouts import layouts diff --git a/mango/healthcheck.py b/mango/healthcheck.py new file mode 100644 index 0000000..db9c35b --- /dev/null +++ b/mango/healthcheck.py @@ -0,0 +1,40 @@ +# # ⚠ Warning +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT +# LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +# NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# +# [🥭 Mango Markets](https://mango.markets/) support is available at: +# [Docs](https://docs.mango.markets/) +# [Discord](https://discord.gg/67jySBhxrg) +# [Twitter](https://twitter.com/mangomarkets) +# [Github](https://github.com/blockworks-foundation) +# [Email](mailto:hello@blockworks.foundation) + +import rx +import typing + +from pathlib import Path + + +# # 🥭 HealthCheck class +# +# `HealthCheck` adds an observable to the health checks. Current version touches a file when a new item +# is observed from that observable, so that external systems can watch those files and gain insight into +# the health of the system. +# +class HealthCheck(rx.core.typing.Disposable): + def __init__(self, healthcheck_files_location: str = "/var/tmp"): + self.healthcheck_files_location: str = healthcheck_files_location + self._to_dispose: typing.List[rx.core.typing.Disposable] = [] + + def add(self, name: str, observable: rx.core.typing.Observable[typing.Any]): + healthcheck_file_touch_disposer = observable.subscribe( + on_next=lambda _: Path(f"{self.healthcheck_files_location}/mango_healthcheck_{name}").touch(mode=0o666, exist_ok=True)) + self._to_dispose += [healthcheck_file_touch_disposer] + + def dispose(self) -> None: + for disposable in self._to_dispose: + disposable.dispose() diff --git a/mango/observables.py b/mango/observables.py index 3bd0f00..fb38764 100644 --- a/mango/observables.py +++ b/mango/observables.py @@ -20,7 +20,6 @@ import rx.subject import typing from datetime import datetime -from pathlib import Path from rx.core.abc.disposable import Disposable from rxpy_backpressure import BackPressure @@ -55,8 +54,6 @@ class NullObserverSubscriber(rx.core.typing.Observer): # # This class can subscribe to an `Observable` and print out each item. # - - class PrintingObserverSubscriber(rx.core.typing.Observer): def __init__(self, report_no_output: bool) -> None: super().__init__() @@ -81,7 +78,6 @@ class PrintingObserverSubscriber(rx.core.typing.Observer): # # Just like `PrintingObserverSubscriber` but it puts a timestamp on each printout. # - class TimestampedPrintingObserverSubscriber(PrintingObserverSubscriber): def __init__(self, report_no_output: bool) -> None: super().__init__(report_no_output) @@ -94,8 +90,6 @@ class TimestampedPrintingObserverSubscriber(PrintingObserverSubscriber): # # This class can subscribe to an `Observable` and collect each item. # - - class CollectingObserverSubscriber(rx.core.typing.Observer): def __init__(self) -> None: self.logger: logging.Logger = logging.getLogger(self.__class__.__name__) @@ -116,8 +110,6 @@ class CollectingObserverSubscriber(rx.core.typing.Observer): # This captures the first item to pass through the pipeline, allowing it to be inspected # later. # - - class CaptureFirstItem: def __init__(self): self.captured: typing.Any = None @@ -131,14 +123,17 @@ class CaptureFirstItem: return item -# # 🥭 NullObserverSubscriber class +# # 🥭 TItem type parameter # -# This class can subscribe to an `Observable` to do nothing but make sure it runs. +# The `TItem` type parameter is the type parameter for the generic `LatestItemObserverSubscriber`. # - TItem = typing.TypeVar('TItem') +# # 🥭 LatestItemObserverSubscriber class +# +# This class can subscribe to an `Observable` and capture the latest item as it is observed. +# class LatestItemObserverSubscriber(rx.core.typing.Observer, typing.Generic[TItem]): def __init__(self, initial: TItem) -> None: super().__init__() @@ -164,8 +159,6 @@ class LatestItemObserverSubscriber(rx.core.typing.Observer, typing.Generic[TItem # This is mostly for libraries (like `rxpy_backpressure`) that take observers but not their # component functions. # - - class FunctionObserver(rx.core.typing.Observer): def __init__(self, on_next: typing.Callable[[typing.Any], None], @@ -203,7 +196,6 @@ class FunctionObserver(rx.core.typing.Observer): # take multiple seconds to complete. In that case, the latest item will be immediately # emitted and the in-between items skipped. # - def create_backpressure_skipping_observer(on_next: typing.Callable[[typing.Any], None], on_error: typing.Callable[[Exception], None] = lambda _: None, on_completed: typing.Callable[[], None] = lambda: None) -> rx.core.typing.Observer: observer = FunctionObserver(on_next=on_next, on_error=on_error, on_completed=on_completed) return BackPressure.LATEST(observer) @@ -222,7 +214,6 @@ def create_backpressure_skipping_observer(on_next: typing.Callable[[typing.Any], # ).subscribe(some_subscriber) # ``` # - def debug_print_item(title: str) -> typing.Callable[[typing.Any], typing.Any]: def _debug_print_item(item: typing.Any) -> typing.Any: print(title, item) @@ -234,7 +225,6 @@ def debug_print_item(title: str) -> typing.Callable[[typing.Any], typing.Any]: # # Logs subscription exceptions to the root logger. # - def log_subscription_error(error: Exception) -> None: logging.error(f"Observable subscription error: {error}") @@ -268,20 +258,22 @@ def log_subscription_error(error: Exception) -> None: # sub1.subscribe(lambda item: print(item), on_error = lambda error: print(f"Error : {error}")) # ``` # - def observable_pipeline_error_reporter(ex, _): logging.error(f"Intercepted error in observable pipeline: {ex}") raise ex +# # 🥭 TEventDatum type parameter +# +# The `TEventDatum` type parameter is the type parameter for the generic `LatestItemObserverSubscriber`. +# +TEventDatum = typing.TypeVar('TEventDatum') + + # # 🥭 EventSource class # # A strongly(ish)-typed event source that can handle many subscribers. # - -TEventDatum = typing.TypeVar('TEventDatum') - - class EventSource(rx.subject.Subject, typing.Generic[TEventDatum]): def __init__(self) -> None: super().__init__() @@ -311,7 +303,6 @@ class EventSource(rx.subject.Subject, typing.Generic[TEventDatum]): # A `Disposable` class that can 'fan out' `dispose()` calls to perform additional # cleanup actions. # - class DisposePropagator(Disposable): def __init__(self): self.disposables: typing.List[Disposable] = [] @@ -334,34 +325,3 @@ class DisposeWrapper(Disposable): def dispose(self): self.callable() - - -# # 🥭 FileToucherObserver class -# -# An `Observer` that touches a file every time an item is observed, and deletes that file when the -# `Observable` completes. -# -# The use case for this is for things like health checks. If a file like /var/tmp/helathz is touched -# every time an item is processed, systems running contianer images can watch for these files and -# trigger alerts or restarts if the file hasn't been touched within a certain time limit. -# -class FileToucherObserver(rx.core.typing.Observer): - def __init__(self, filename: str): - self.logger: logging.Logger = logging.getLogger(self.__class__.__name__) - self.filename = filename - - def on_next(self, _: typing.Any) -> None: - try: - Path(self.filename).touch(mode=0o666, exist_ok=True) - except Exception as exception: - self.logger.warning(f"Touching file '{self.filename}' raised exception: {exception}") - - def on_error(self, exception: Exception) -> None: - self.logger.warning(f"FileTouchObserver ignoring error: {exception}") - - def on_completed(self) -> None: - try: - self.logger.info(f"Cleaning up touch file '{self.filename}'.") - Path(self.filename).unlink(missing_ok=True) - except Exception as exception: - self.logger.warning(f"Deleting touch file '{self.filename}' raised exception: {exception}") diff --git a/mango/watchers.py b/mango/watchers.py new file mode 100644 index 0000000..22d32e8 --- /dev/null +++ b/mango/watchers.py @@ -0,0 +1,182 @@ +# # ⚠ Warning +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT +# LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +# NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# +# [🥭 Mango Markets](https://mango.markets/) support is available at: +# [Docs](https://docs.mango.markets/) +# [Discord](https://discord.gg/67jySBhxrg) +# [Twitter](https://twitter.com/mangomarkets) +# [Github](https://github.com/blockworks-foundation) +# [Email](mailto:hello@blockworks.foundation) + +import logging +import typing + +from pyserum.market import Market as PySerumMarket + +from .account import Account +from .combinableinstructions import CombinableInstructions +from .context import Context +from .group import Group +from .healthcheck import HealthCheck +from .instructions import build_create_serum_open_orders_instructions +from .inventory import Inventory, InventorySource +from .market import Market +from .observables import DisposePropagator, LatestItemObserverSubscriber +from .openorders import OpenOrders +from .oracle import Price +from .oraclefactory import OracleProvider, create_oracle_provider +from .perpmarket import PerpMarket +from .placedorder import PlacedOrdersContainer +from .serummarket import SerumMarket +from .spotmarket import SpotMarket +from .spotmarketinstructionbuilder import SpotMarketInstructionBuilder +from .spotmarketoperations import SpotMarketOperations +from .tokenaccount import TokenAccount +from .wallet import Wallet +from .watcher import Watcher, LamdaUpdateWatcher +from .websocketsubscription import WebSocketAccountSubscription, WebSocketSubscription, WebSocketSubscriptionManager + + +def build_group_watcher(context: Context, manager: WebSocketSubscriptionManager, health_check: HealthCheck, group: Group) -> Watcher[Group]: + group_subscription = WebSocketAccountSubscription[Group]( + context, group.address, lambda account_info: Group.parse(context, account_info)) + manager.add(group_subscription) + latest_group_observer = LatestItemObserverSubscriber(group) + group_subscription.publisher.subscribe(latest_group_observer) + health_check.add("group_subscription", group_subscription.publisher) + return latest_group_observer + + +def build_account_watcher(context: Context, manager: WebSocketSubscriptionManager, health_check: HealthCheck, account: Account, group_observer: Watcher[Group]) -> typing.Tuple[WebSocketSubscription[Account], Watcher[Account]]: + account_subscription = WebSocketAccountSubscription[Account]( + context, account.address, lambda account_info: Account.parse(account_info, group_observer.latest)) + manager.add(account_subscription) + latest_account_observer = LatestItemObserverSubscriber(account) + account_subscription.publisher.subscribe(latest_account_observer) + health_check.add("account_subscription", account_subscription.publisher) + return account_subscription, latest_account_observer + + +def build_spot_open_orders_watcher(context: Context, manager: WebSocketSubscriptionManager, health_check: HealthCheck, wallet: Wallet, account: Account, group: Group, spot_market: SpotMarket) -> Watcher[PlacedOrdersContainer]: + market_index = group.find_spot_market_index(spot_market.address) + open_orders_address = account.spot_open_orders[market_index] + if open_orders_address is None: + spot_market_instruction_builder: SpotMarketInstructionBuilder = SpotMarketInstructionBuilder.load( + context, wallet, spot_market.group, account, spot_market) + market_operations: SpotMarketOperations = SpotMarketOperations( + context, wallet, spot_market.group, account, spot_market, spot_market_instruction_builder) + open_orders_address = market_operations.create_openorders_for_market() + logging.info(f"Created {spot_market.symbol} OpenOrders at: {open_orders_address}") + + # This line is a little nasty. Now that we know we have an OpenOrders account at this address, update + # the Account so that future uses (like later in this method) have access to it in the right place. + account.update_spot_open_orders_for_market(market_index, open_orders_address) + + spot_open_orders_subscription = WebSocketAccountSubscription[OpenOrders]( + context, open_orders_address, lambda account_info: OpenOrders.parse(account_info, spot_market.base.decimals, spot_market.quote.decimals)) + manager.add(spot_open_orders_subscription) + initial_spot_open_orders = OpenOrders.load( + context, open_orders_address, spot_market.base.decimals, spot_market.quote.decimals) + latest_open_orders_observer = LatestItemObserverSubscriber[PlacedOrdersContainer]( + initial_spot_open_orders) + spot_open_orders_subscription.publisher.subscribe(latest_open_orders_observer) + health_check.add("open_orders_subscription", spot_open_orders_subscription.publisher) + return latest_open_orders_observer + + +def build_serum_open_orders_watcher(context: Context, manager: WebSocketSubscriptionManager, health_check: HealthCheck, serum_market: SerumMarket, wallet: Wallet) -> Watcher[PlacedOrdersContainer]: + all_open_orders = OpenOrders.load_for_market_and_owner( + context, serum_market.address, wallet.address, context.dex_program_id, serum_market.base.decimals, serum_market.quote.decimals) + if len(all_open_orders) > 0: + initial_serum_open_orders: OpenOrders = all_open_orders[0] + open_orders_address = initial_serum_open_orders.address + else: + raw_market = PySerumMarket.load(context.client.compatible_client, serum_market.address) + create_open_orders = build_create_serum_open_orders_instructions( + context, wallet, raw_market) + + open_orders_address = create_open_orders.signers[0].public_key() + + logging.info(f"Creating OpenOrders account for market {serum_market.symbol} at {open_orders_address}.") + signers: CombinableInstructions = CombinableInstructions.from_wallet(wallet) + transaction_ids = (signers + create_open_orders).execute(context) + context.client.wait_for_confirmation(transaction_ids) + initial_serum_open_orders = OpenOrders.load( + context, open_orders_address, serum_market.base.decimals, serum_market.quote.decimals) + + serum_open_orders_subscription = WebSocketAccountSubscription[OpenOrders]( + context, open_orders_address, lambda account_info: OpenOrders.parse(account_info, serum_market.base.decimals, serum_market.quote.decimals)) + + manager.add(serum_open_orders_subscription) + + latest_serum_open_orders_observer = LatestItemObserverSubscriber[PlacedOrdersContainer]( + initial_serum_open_orders) + serum_open_orders_subscription.publisher.subscribe(latest_serum_open_orders_observer) + health_check.add("open_orders_subscription", serum_open_orders_subscription.publisher) + return latest_serum_open_orders_observer + + +def build_perp_open_orders_watcher(context: Context, manager: WebSocketSubscriptionManager, health_check: HealthCheck, perp_market: PerpMarket, account: Account, group: Group, account_subscription: WebSocketSubscription[Account]) -> Watcher[PlacedOrdersContainer]: + index = group.find_perp_market_index(perp_market.address) + initial_perp_account = account.perp_accounts[index] + if initial_perp_account is None: + raise Exception(f"Could not find perp account at index {index} of account {account.address}.") + initial_open_orders = initial_perp_account.open_orders + latest_open_orders_observer = LatestItemObserverSubscriber[PlacedOrdersContainer](initial_open_orders) + account_subscription.publisher.subscribe( + on_next=lambda updated_account: latest_open_orders_observer.on_next(updated_account.perp_accounts[index].open_orders)) + health_check.add("open_orders_subscription", account_subscription.publisher) + return latest_open_orders_observer + + +def build_price_watcher(context: Context, manager: WebSocketSubscriptionManager, health_check: HealthCheck, disposer: DisposePropagator, provider_name: str, market: Market) -> LatestItemObserverSubscriber[Price]: + oracle_provider: OracleProvider = create_oracle_provider(context, provider_name) + oracle = oracle_provider.oracle_for_market(context, market) + if oracle is None: + raise Exception(f"Could not find oracle for market {market.symbol} from provider {provider_name}.") + + initial_price = oracle.fetch_price(context) + price_feed = oracle.to_streaming_observable(context) + latest_price_observer = LatestItemObserverSubscriber(initial_price) + price_disposable = price_feed.subscribe(latest_price_observer) + disposer.add_disposable(price_disposable) + health_check.add("price_subscription", price_feed) + return latest_price_observer + + +def build_serum_inventory_watcher(context: Context, manager: WebSocketSubscriptionManager, health_check: HealthCheck, disposer: DisposePropagator, wallet: Wallet, market: Market) -> Watcher[Inventory]: + base_account = TokenAccount.fetch_largest_for_owner_and_token( + context, wallet.address, market.base) + if base_account is None: + raise Exception( + f"Could not find token account owned by {wallet.address} for base token {market.base}.") + base_token_subscription = WebSocketAccountSubscription[TokenAccount]( + context, base_account.address, lambda account_info: TokenAccount.parse(account_info, market.base)) + manager.add(base_token_subscription) + latest_base_token_account_observer = LatestItemObserverSubscriber(base_account) + base_subscription_disposable = base_token_subscription.publisher.subscribe(latest_base_token_account_observer) + disposer.add_disposable(base_subscription_disposable) + + quote_account = TokenAccount.fetch_largest_for_owner_and_token( + context, wallet.address, market.quote) + if quote_account is None: + raise Exception( + f"Could not find token account owned by {wallet.address} for quote token {market.quote}.") + quote_token_subscription = WebSocketAccountSubscription[TokenAccount]( + context, quote_account.address, lambda account_info: TokenAccount.parse(account_info, market.quote)) + manager.add(quote_token_subscription) + latest_quote_token_account_observer = LatestItemObserverSubscriber(quote_account) + quote_subscription_disposable = quote_token_subscription.publisher.subscribe(latest_quote_token_account_observer) + disposer.add_disposable(quote_subscription_disposable) + + def serum_inventory_accessor() -> Inventory: + return Inventory(InventorySource.SPL_TOKENS, + latest_base_token_account_observer.latest.value, + latest_quote_token_account_observer.latest.value) + + return LamdaUpdateWatcher(serum_inventory_accessor)