Added/centralised 'watchers'. Removed FileToucherObserver. Added HealthCheck.

This commit is contained in:
Geoff Taylor 2021-08-17 12:17:49 +01:00
parent bd36dcf47b
commit a4ca18e5ed
8 changed files with 262 additions and 223 deletions

View File

@ -49,6 +49,8 @@ account = mango.Account.load_for_owner_by_index(context, wallet.address, group,
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager(context.name)
disposer.add_disposable(manager)
health_check = mango.HealthCheck()
disposer.add_disposable(health_check)
buy_price_adjustment_factor: Decimal = Decimal("1") + args.max_price_slippage_factor
sell_price_adjustment_factor: Decimal = Decimal("1") - args.max_price_slippage_factor
@ -116,7 +118,7 @@ websocket_url = context.client.cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler)
ws.item.subscribe(on_next=manager.on_item)
ws.ping_interval = 10
ws_pong_disposable = ws.pong.subscribe(mango.FileToucherObserver("/var/tmp/mango_healthcheck_ws_pong"))
health_check.add("ws_pong", ws.pong)
ws.open()
logging.info(f"Current assets in account {account.address} (owner: {account.owner}):")

View File

@ -7,10 +7,8 @@ import os.path
import rx
import sys
import threading
import typing
from decimal import Decimal
from pyserum.market import Market as PySerumMarket
sys.path.insert(0, os.path.abspath(
os.path.join(os.path.dirname(__file__), "..")))
@ -63,158 +61,13 @@ def cleanup(context: mango.Context, wallet: mango.Wallet, account: mango.Account
market_operations.settle()
def add_file_health(name: str, observable: rx.core.typing.Observable[typing.Any], disposer: mango.DisposePropagator):
healthcheck_file_touch_disposer = observable.subscribe(
mango.FileToucherObserver(f"/var/tmp/mango_healthcheck_{name}"))
disposer.add_disposable(healthcheck_file_touch_disposer)
def build_latest_group_observer(context: mango.Context, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, group: mango.Group) -> mango.LatestItemObserverSubscriber[mango.Group]:
group_subscription = mango.WebSocketAccountSubscription[mango.Group](
context, group.address, lambda account_info: mango.Group.parse(context, account_info))
manager.add(group_subscription)
latest_group_observer = mango.LatestItemObserverSubscriber(group)
group_subscription.publisher.subscribe(latest_group_observer)
add_file_health("group_subscription", group_subscription.publisher, disposer)
return latest_group_observer
def build_latest_account_observer(context: mango.Context, account: mango.Account, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, group_observer: mango.LatestItemObserverSubscriber[mango.Group]) -> typing.Tuple[mango.WebSocketSubscription[mango.Account], mango.LatestItemObserverSubscriber[mango.Account]]:
account_subscription = mango.WebSocketAccountSubscription[mango.Account](
context, account.address, lambda account_info: mango.Account.parse(account_info, group_observer.latest))
manager.add(account_subscription)
latest_account_observer = mango.LatestItemObserverSubscriber(account)
account_subscription.publisher.subscribe(latest_account_observer)
add_file_health("account_subscription", account_subscription.publisher, disposer)
return account_subscription, latest_account_observer
def build_latest_spot_open_orders_observer(context: mango.Context, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, account: mango.Account, spot_market: mango.SpotMarket) -> mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]:
market_index = group.find_spot_market_index(spot_market.address)
open_orders_address = account.spot_open_orders[market_index]
if open_orders_address is None:
spot_market_instruction_builder: mango.SpotMarketInstructionBuilder = mango.SpotMarketInstructionBuilder.load(
context, wallet, spot_market.group, account, spot_market)
market_operations: mango.SpotMarketOperations = mango.SpotMarketOperations(
context, wallet, spot_market.group, account, spot_market, spot_market_instruction_builder)
open_orders_address = market_operations.create_openorders_for_market()
logging.info(f"Created {spot_market.symbol} OpenOrders at: {open_orders_address}")
# This line is a little nasty. Now that we know we have an OpenOrders account at this address, update
# the Account so that future uses (like later in this method) have access to it in the right place.
account.update_spot_open_orders_for_market(market_index, open_orders_address)
spot_open_orders_subscription = mango.WebSocketAccountSubscription[mango.OpenOrders](
context, open_orders_address, lambda account_info: mango.OpenOrders.parse(account_info, spot_market.base.decimals, spot_market.quote.decimals))
manager.add(spot_open_orders_subscription)
initial_spot_open_orders = mango.OpenOrders.load(
context, open_orders_address, spot_market.base.decimals, spot_market.quote.decimals)
latest_open_orders_observer = mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer](
initial_spot_open_orders)
spot_open_orders_subscription.publisher.subscribe(latest_open_orders_observer)
add_file_health("open_orders_subscription", spot_open_orders_subscription.publisher, disposer)
return latest_open_orders_observer
def build_latest_serum_open_orders_observer(manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, serum_market: mango.SerumMarket, context: mango.Context, wallet: mango.Wallet) -> mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]:
all_open_orders = mango.OpenOrders.load_for_market_and_owner(
context, serum_market.address, wallet.address, context.dex_program_id, serum_market.base.decimals, serum_market.quote.decimals)
if len(all_open_orders) > 0:
initial_serum_open_orders: mango.OpenOrders = all_open_orders[0]
open_orders_address = initial_serum_open_orders.address
else:
raw_market = PySerumMarket.load(context.client.compatible_client, serum_market.address)
create_open_orders = mango.build_create_serum_open_orders_instructions(
context, wallet, raw_market)
open_orders_address = create_open_orders.signers[0].public_key()
logging.info(f"Creating OpenOrders account for market {serum_market.symbol} at {open_orders_address}.")
signers: mango.CombinableInstructions = mango.CombinableInstructions.from_wallet(wallet)
transaction_ids = (signers + create_open_orders).execute(context)
context.client.wait_for_confirmation(transaction_ids)
initial_serum_open_orders = mango.OpenOrders.load(
context, open_orders_address, serum_market.base.decimals, serum_market.quote.decimals)
serum_open_orders_subscription = mango.WebSocketAccountSubscription[mango.OpenOrders](
context, open_orders_address, lambda account_info: mango.OpenOrders.parse(account_info, serum_market.base.decimals, serum_market.quote.decimals))
manager.add(serum_open_orders_subscription)
latest_serum_open_orders_observer = mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer](
initial_serum_open_orders)
serum_open_orders_subscription.publisher.subscribe(latest_serum_open_orders_observer)
add_file_health("open_orders_subscription", serum_open_orders_subscription.publisher, disposer)
return latest_serum_open_orders_observer
def build_latest_perp_open_orders_observer(disposer: mango.DisposePropagator, perp_market: mango.PerpMarket, account: mango.Account, account_subscription: mango.WebSocketSubscription[mango.Account]) -> mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer]:
index = group.find_perp_market_index(perp_market.address)
initial_perp_account = account.perp_accounts[index]
if initial_perp_account is None:
raise Exception(f"Could not find perp account at index {index} of account {account.address}.")
initial_open_orders = initial_perp_account.open_orders
latest_open_orders_observer = mango.LatestItemObserverSubscriber[mango.PlacedOrdersContainer](initial_open_orders)
account_subscription.publisher.subscribe(
on_next=lambda updated_account: latest_open_orders_observer.on_next(updated_account.perp_accounts[index].open_orders))
add_file_health("open_orders_subscription", account_subscription.publisher, disposer)
return latest_open_orders_observer
def build_latest_price_observer(context: mango.Context, disposer: mango.DisposePropagator, provider_name: str, market: mango.Market) -> mango.LatestItemObserverSubscriber[mango.Price]:
oracle_provider: mango.OracleProvider = mango.create_oracle_provider(context, provider_name)
oracle = oracle_provider.oracle_for_market(context, market)
if oracle is None:
raise Exception(f"Could not find oracle for market {market.symbol} from provider {provider_name}.")
initial_price = oracle.fetch_price(context)
price_feed = oracle.to_streaming_observable(context)
latest_price_observer = mango.LatestItemObserverSubscriber(initial_price)
price_disposable = price_feed.subscribe(latest_price_observer)
disposer.add_disposable(price_disposable)
add_file_health("price_subscription", price_feed, disposer)
return latest_price_observer
def build_serum_inventory_observer(context: mango.Context, disposer: mango.DisposePropagator, market: mango.Market) -> mango.Watcher[mango.Inventory]:
base_account = mango.TokenAccount.fetch_largest_for_owner_and_token(
context, wallet.address, market.base)
if base_account is None:
raise Exception(
f"Could not find token account owned by {wallet.address} for base token {market.base}.")
base_token_subscription = mango.WebSocketAccountSubscription[mango.TokenAccount](
context, base_account.address, lambda account_info: mango.TokenAccount.parse(account_info, market.base))
manager.add(base_token_subscription)
latest_base_token_account_observer = mango.LatestItemObserverSubscriber(base_account)
base_subscription_disposable = base_token_subscription.publisher.subscribe(latest_base_token_account_observer)
disposer.add_disposable(base_subscription_disposable)
quote_account = mango.TokenAccount.fetch_largest_for_owner_and_token(
context, wallet.address, market.quote)
if quote_account is None:
raise Exception(
f"Could not find token account owned by {wallet.address} for quote token {market.quote}.")
quote_token_subscription = mango.WebSocketAccountSubscription[mango.TokenAccount](
context, quote_account.address, lambda account_info: mango.TokenAccount.parse(account_info, market.quote))
manager.add(quote_token_subscription)
latest_quote_token_account_observer = mango.LatestItemObserverSubscriber(quote_account)
quote_subscription_disposable = quote_token_subscription.publisher.subscribe(latest_quote_token_account_observer)
disposer.add_disposable(quote_subscription_disposable)
def serum_inventory_accessor() -> mango.Inventory:
return mango.Inventory(mango.InventorySource.SPL_TOKENS,
latest_base_token_account_observer.latest.value,
latest_quote_token_account_observer.latest.value)
return mango.LamdaUpdateWatcher(serum_inventory_accessor)
context = mango.ContextBuilder.from_command_line_parameters(args)
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager(context.name)
disposer.add_disposable(manager)
health_check = mango.HealthCheck()
disposer.add_disposable(health_check)
wallet = mango.Wallet.from_command_line_parameters_or_raise(args)
group = mango.Group.load(context, context.group_id)
@ -232,24 +85,27 @@ if market.quote != group.shared_quote_token.token:
cleanup(context, wallet, account, market, args.dry_run)
latest_group_observer = build_latest_group_observer(context, manager, disposer, group)
account_subscription, latest_account_observer = build_latest_account_observer(
context, account, manager, disposer, latest_group_observer)
latest_price_observer = build_latest_price_observer(context, disposer, args.oracle_provider, market)
latest_group_observer = mango.build_group_watcher(context, manager, health_check, group)
account_subscription, latest_account_observer = mango.build_account_watcher(
context, manager, health_check, account, latest_group_observer)
latest_price_observer = mango.build_price_watcher(
context, manager, health_check, disposer, args.oracle_provider, market)
market = mango.ensure_market_loaded(context, market)
market_instruction_builder: mango.MarketInstructionBuilder = mango.create_market_instruction_builder(
context, wallet, account, market, args.dry_run)
if isinstance(market, mango.SerumMarket):
inventory_watcher: mango.Watcher[mango.Inventory] = build_serum_inventory_observer(context, disposer, market)
latest_open_orders_observer = build_latest_serum_open_orders_observer(manager, disposer, market, context, wallet)
inventory_watcher: mango.Watcher[mango.Inventory] = mango.build_serum_inventory_watcher(
context, manager, health_check, disposer, wallet, market)
latest_open_orders_observer = mango.build_serum_open_orders_watcher(context, manager, health_check, market, wallet)
elif isinstance(market, mango.SpotMarket):
inventory_watcher = mango.InventoryAccountWatcher(market, latest_account_observer)
latest_open_orders_observer = build_latest_spot_open_orders_observer(context, manager, disposer, account, market)
latest_open_orders_observer = mango.build_spot_open_orders_watcher(
context, manager, health_check, wallet, account, group, market)
elif isinstance(market, mango.PerpMarket):
inventory_watcher = mango.InventoryAccountWatcher(market, latest_account_observer)
latest_open_orders_observer = build_latest_perp_open_orders_observer(
disposer, market, account, account_subscription)
latest_open_orders_observer = mango.build_perp_open_orders_watcher(
context, manager, health_check, market, account, group, account_subscription)
else:
raise Exception(f"Could not determine type of market {market.symbol}")
@ -270,7 +126,7 @@ model_state = mango.marketmaking.ModelState(market, latest_account_observer,
latest_open_orders_observer,
inventory_watcher)
add_file_health("marketmaker_pulse", market_maker.pulse_complete, disposer)
health_check.add("marketmaker_pulse", market_maker.pulse_complete)
logging.info(f"Current assets in account {account.address} (owner: {account.owner}):")
mango.TokenValue.report([asset for asset in account.net_assets if asset is not None], logging.info)

View File

@ -40,6 +40,8 @@ print(context)
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager(context.name)
disposer.add_disposable(manager)
health_check = mango.HealthCheck()
disposer.add_disposable(health_check)
publishers = []
for address in args.address:
@ -65,10 +67,7 @@ websocket_url = context.client.cluster_url.replace("https", "wss", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler)
ws.item.subscribe(on_next=manager.on_item)
ws.ping_interval = 10
ws_pong_disposable = ws.pong.subscribe(mango.FileToucherObserver("/var/tmp/mango_healthcheck_ws_pong"))
disposer.add_disposable(ws_pong_disposable)
health_check.add("ws_pong", ws.pong)
ws.open()
# Wait - don't exit. Exiting will be handled by signals/interrupts.

View File

@ -105,6 +105,8 @@ context = mango.ContextBuilder.from_command_line_parameters(args)
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager(context.name)
disposer.add_disposable(manager)
health_check = mango.HealthCheck()
disposer.add_disposable(health_check)
for name_and_address in args.named_address:
add_subscription_for_parameter(context, manager, args.timer_limit, name_and_address)
@ -113,11 +115,7 @@ websocket_url = context.client.cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler)
ws.item.subscribe(on_next=manager.on_item)
ws.ping_interval = 10
ws_pong_disposable = ws.pong.subscribe(mango.FileToucherObserver("/var/tmp/mango_healthcheck_ws_pong"))
disposer.add_disposable(ws_pong_disposable)
ws_pong_log_disposable = ws.pong.subscribe(on_next=lambda _: logging.info("Pong"))
disposer.add_disposable(ws_pong_log_disposable)
health_check.add("ws_pong", ws.pong)
ws_connecting_log_disposable = ws.connecting.subscribe(
on_next=lambda _: send_event_notification("Re-connecting websocket"))
disposer.add_disposable(ws_connecting_log_disposable)

View File

@ -17,6 +17,7 @@ from .createmarketoperations import create_market_operations
from .encoding import decode_binary, encode_binary, encode_key, encode_int
from .ensuremarketloaded import ensure_market_loaded
from .group import GroupBasketMarket, Group
from .healthcheck import HealthCheck
from .idsjsontokenlookup import IdsJsonTokenLookup
from .idsjsonmarketlookup import IdsJsonMarketLookup
from .inventory import Inventory, InventoryAccountWatcher, spl_token_inventory_loader, account_inventory_loader
@ -32,7 +33,7 @@ from .marketlookup import MarketLookup, NullMarketLookup, CompoundMarketLookup
from .marketoperations import MarketOperations, NullMarketOperations
from .metadata import Metadata
from .notification import NotificationTarget, TelegramNotificationTarget, DiscordNotificationTarget, MailjetNotificationTarget, CsvFileNotificationTarget, FilteringNotificationTarget, NotificationHandler, parse_subscription_target
from .observables import DisposePropagator, DisposeWrapper, NullObserverSubscriber, PrintingObserverSubscriber, TimestampedPrintingObserverSubscriber, CollectingObserverSubscriber, LatestItemObserverSubscriber, CaptureFirstItem, FunctionObserver, create_backpressure_skipping_observer, debug_print_item, log_subscription_error, observable_pipeline_error_reporter, EventSource, FileToucherObserver
from .observables import DisposePropagator, DisposeWrapper, NullObserverSubscriber, PrintingObserverSubscriber, TimestampedPrintingObserverSubscriber, CollectingObserverSubscriber, LatestItemObserverSubscriber, CaptureFirstItem, FunctionObserver, create_backpressure_skipping_observer, debug_print_item, log_subscription_error, observable_pipeline_error_reporter, EventSource
from .openorders import OpenOrders
from .oracle import OracleSource, Price, Oracle, OracleProvider, SupportedOracleFeature
from .orderbookside import OrderBookSide
@ -73,6 +74,7 @@ from .version import Version
from .wallet import Wallet
from .walletbalancer import TargetBalance, FixedTargetBalance, PercentageTargetBalance, TargetBalanceParser, sort_changes_for_trades, calculate_required_balance_changes, FilterSmallChanges, WalletBalancer, NullWalletBalancer, LiveWalletBalancer
from .watcher import Watcher, ManualUpdateWatcher, LamdaUpdateWatcher
from .watchers import build_group_watcher, build_account_watcher, build_spot_open_orders_watcher, build_serum_open_orders_watcher, build_perp_open_orders_watcher, build_price_watcher, build_serum_inventory_watcher
from .websocketsubscription import WebSocketSubscription, WebSocketProgramSubscription, WebSocketAccountSubscription, WebSocketLogSubscription, WebSocketSubscriptionManager
from .layouts import layouts

40
mango/healthcheck.py Normal file
View File

@ -0,0 +1,40 @@
# # ⚠ Warning
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
# LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
# NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# [🥭 Mango Markets](https://mango.markets/) support is available at:
# [Docs](https://docs.mango.markets/)
# [Discord](https://discord.gg/67jySBhxrg)
# [Twitter](https://twitter.com/mangomarkets)
# [Github](https://github.com/blockworks-foundation)
# [Email](mailto:hello@blockworks.foundation)
import rx
import typing
from pathlib import Path
# # 🥭 HealthCheck class
#
# `HealthCheck` adds an observable to the health checks. Current version touches a file when a new item
# is observed from that observable, so that external systems can watch those files and gain insight into
# the health of the system.
#
class HealthCheck(rx.core.typing.Disposable):
def __init__(self, healthcheck_files_location: str = "/var/tmp"):
self.healthcheck_files_location: str = healthcheck_files_location
self._to_dispose: typing.List[rx.core.typing.Disposable] = []
def add(self, name: str, observable: rx.core.typing.Observable[typing.Any]):
healthcheck_file_touch_disposer = observable.subscribe(
on_next=lambda _: Path(f"{self.healthcheck_files_location}/mango_healthcheck_{name}").touch(mode=0o666, exist_ok=True))
self._to_dispose += [healthcheck_file_touch_disposer]
def dispose(self) -> None:
for disposable in self._to_dispose:
disposable.dispose()

View File

@ -20,7 +20,6 @@ import rx.subject
import typing
from datetime import datetime
from pathlib import Path
from rx.core.abc.disposable import Disposable
from rxpy_backpressure import BackPressure
@ -55,8 +54,6 @@ class NullObserverSubscriber(rx.core.typing.Observer):
#
# This class can subscribe to an `Observable` and print out each item.
#
class PrintingObserverSubscriber(rx.core.typing.Observer):
def __init__(self, report_no_output: bool) -> None:
super().__init__()
@ -81,7 +78,6 @@ class PrintingObserverSubscriber(rx.core.typing.Observer):
#
# Just like `PrintingObserverSubscriber` but it puts a timestamp on each printout.
#
class TimestampedPrintingObserverSubscriber(PrintingObserverSubscriber):
def __init__(self, report_no_output: bool) -> None:
super().__init__(report_no_output)
@ -94,8 +90,6 @@ class TimestampedPrintingObserverSubscriber(PrintingObserverSubscriber):
#
# This class can subscribe to an `Observable` and collect each item.
#
class CollectingObserverSubscriber(rx.core.typing.Observer):
def __init__(self) -> None:
self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
@ -116,8 +110,6 @@ class CollectingObserverSubscriber(rx.core.typing.Observer):
# This captures the first item to pass through the pipeline, allowing it to be inspected
# later.
#
class CaptureFirstItem:
def __init__(self):
self.captured: typing.Any = None
@ -131,14 +123,17 @@ class CaptureFirstItem:
return item
# # 🥭 NullObserverSubscriber class
# # 🥭 TItem type parameter
#
# This class can subscribe to an `Observable` to do nothing but make sure it runs.
# The `TItem` type parameter is the type parameter for the generic `LatestItemObserverSubscriber`.
#
TItem = typing.TypeVar('TItem')
# # 🥭 LatestItemObserverSubscriber class
#
# This class can subscribe to an `Observable` and capture the latest item as it is observed.
#
class LatestItemObserverSubscriber(rx.core.typing.Observer, typing.Generic[TItem]):
def __init__(self, initial: TItem) -> None:
super().__init__()
@ -164,8 +159,6 @@ class LatestItemObserverSubscriber(rx.core.typing.Observer, typing.Generic[TItem
# This is mostly for libraries (like `rxpy_backpressure`) that take observers but not their
# component functions.
#
class FunctionObserver(rx.core.typing.Observer):
def __init__(self,
on_next: typing.Callable[[typing.Any], None],
@ -203,7 +196,6 @@ class FunctionObserver(rx.core.typing.Observer):
# take multiple seconds to complete. In that case, the latest item will be immediately
# emitted and the in-between items skipped.
#
def create_backpressure_skipping_observer(on_next: typing.Callable[[typing.Any], None], on_error: typing.Callable[[Exception], None] = lambda _: None, on_completed: typing.Callable[[], None] = lambda: None) -> rx.core.typing.Observer:
observer = FunctionObserver(on_next=on_next, on_error=on_error, on_completed=on_completed)
return BackPressure.LATEST(observer)
@ -222,7 +214,6 @@ def create_backpressure_skipping_observer(on_next: typing.Callable[[typing.Any],
# ).subscribe(some_subscriber)
# ```
#
def debug_print_item(title: str) -> typing.Callable[[typing.Any], typing.Any]:
def _debug_print_item(item: typing.Any) -> typing.Any:
print(title, item)
@ -234,7 +225,6 @@ def debug_print_item(title: str) -> typing.Callable[[typing.Any], typing.Any]:
#
# Logs subscription exceptions to the root logger.
#
def log_subscription_error(error: Exception) -> None:
logging.error(f"Observable subscription error: {error}")
@ -268,20 +258,22 @@ def log_subscription_error(error: Exception) -> None:
# sub1.subscribe(lambda item: print(item), on_error = lambda error: print(f"Error : {error}"))
# ```
#
def observable_pipeline_error_reporter(ex, _):
logging.error(f"Intercepted error in observable pipeline: {ex}")
raise ex
# # 🥭 TEventDatum type parameter
#
# The `TEventDatum` type parameter is the type parameter for the generic `LatestItemObserverSubscriber`.
#
TEventDatum = typing.TypeVar('TEventDatum')
# # 🥭 EventSource class
#
# A strongly(ish)-typed event source that can handle many subscribers.
#
TEventDatum = typing.TypeVar('TEventDatum')
class EventSource(rx.subject.Subject, typing.Generic[TEventDatum]):
def __init__(self) -> None:
super().__init__()
@ -311,7 +303,6 @@ class EventSource(rx.subject.Subject, typing.Generic[TEventDatum]):
# A `Disposable` class that can 'fan out' `dispose()` calls to perform additional
# cleanup actions.
#
class DisposePropagator(Disposable):
def __init__(self):
self.disposables: typing.List[Disposable] = []
@ -334,34 +325,3 @@ class DisposeWrapper(Disposable):
def dispose(self):
self.callable()
# # 🥭 FileToucherObserver class
#
# An `Observer` that touches a file every time an item is observed, and deletes that file when the
# `Observable` completes.
#
# The use case for this is for things like health checks. If a file like /var/tmp/helathz is touched
# every time an item is processed, systems running contianer images can watch for these files and
# trigger alerts or restarts if the file hasn't been touched within a certain time limit.
#
class FileToucherObserver(rx.core.typing.Observer):
def __init__(self, filename: str):
self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self.filename = filename
def on_next(self, _: typing.Any) -> None:
try:
Path(self.filename).touch(mode=0o666, exist_ok=True)
except Exception as exception:
self.logger.warning(f"Touching file '{self.filename}' raised exception: {exception}")
def on_error(self, exception: Exception) -> None:
self.logger.warning(f"FileTouchObserver ignoring error: {exception}")
def on_completed(self) -> None:
try:
self.logger.info(f"Cleaning up touch file '{self.filename}'.")
Path(self.filename).unlink(missing_ok=True)
except Exception as exception:
self.logger.warning(f"Deleting touch file '{self.filename}' raised exception: {exception}")

182
mango/watchers.py Normal file
View File

@ -0,0 +1,182 @@
# # ⚠ Warning
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
# LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
# NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# [🥭 Mango Markets](https://mango.markets/) support is available at:
# [Docs](https://docs.mango.markets/)
# [Discord](https://discord.gg/67jySBhxrg)
# [Twitter](https://twitter.com/mangomarkets)
# [Github](https://github.com/blockworks-foundation)
# [Email](mailto:hello@blockworks.foundation)
import logging
import typing
from pyserum.market import Market as PySerumMarket
from .account import Account
from .combinableinstructions import CombinableInstructions
from .context import Context
from .group import Group
from .healthcheck import HealthCheck
from .instructions import build_create_serum_open_orders_instructions
from .inventory import Inventory, InventorySource
from .market import Market
from .observables import DisposePropagator, LatestItemObserverSubscriber
from .openorders import OpenOrders
from .oracle import Price
from .oraclefactory import OracleProvider, create_oracle_provider
from .perpmarket import PerpMarket
from .placedorder import PlacedOrdersContainer
from .serummarket import SerumMarket
from .spotmarket import SpotMarket
from .spotmarketinstructionbuilder import SpotMarketInstructionBuilder
from .spotmarketoperations import SpotMarketOperations
from .tokenaccount import TokenAccount
from .wallet import Wallet
from .watcher import Watcher, LamdaUpdateWatcher
from .websocketsubscription import WebSocketAccountSubscription, WebSocketSubscription, WebSocketSubscriptionManager
def build_group_watcher(context: Context, manager: WebSocketSubscriptionManager, health_check: HealthCheck, group: Group) -> Watcher[Group]:
group_subscription = WebSocketAccountSubscription[Group](
context, group.address, lambda account_info: Group.parse(context, account_info))
manager.add(group_subscription)
latest_group_observer = LatestItemObserverSubscriber(group)
group_subscription.publisher.subscribe(latest_group_observer)
health_check.add("group_subscription", group_subscription.publisher)
return latest_group_observer
def build_account_watcher(context: Context, manager: WebSocketSubscriptionManager, health_check: HealthCheck, account: Account, group_observer: Watcher[Group]) -> typing.Tuple[WebSocketSubscription[Account], Watcher[Account]]:
account_subscription = WebSocketAccountSubscription[Account](
context, account.address, lambda account_info: Account.parse(account_info, group_observer.latest))
manager.add(account_subscription)
latest_account_observer = LatestItemObserverSubscriber(account)
account_subscription.publisher.subscribe(latest_account_observer)
health_check.add("account_subscription", account_subscription.publisher)
return account_subscription, latest_account_observer
def build_spot_open_orders_watcher(context: Context, manager: WebSocketSubscriptionManager, health_check: HealthCheck, wallet: Wallet, account: Account, group: Group, spot_market: SpotMarket) -> Watcher[PlacedOrdersContainer]:
market_index = group.find_spot_market_index(spot_market.address)
open_orders_address = account.spot_open_orders[market_index]
if open_orders_address is None:
spot_market_instruction_builder: SpotMarketInstructionBuilder = SpotMarketInstructionBuilder.load(
context, wallet, spot_market.group, account, spot_market)
market_operations: SpotMarketOperations = SpotMarketOperations(
context, wallet, spot_market.group, account, spot_market, spot_market_instruction_builder)
open_orders_address = market_operations.create_openorders_for_market()
logging.info(f"Created {spot_market.symbol} OpenOrders at: {open_orders_address}")
# This line is a little nasty. Now that we know we have an OpenOrders account at this address, update
# the Account so that future uses (like later in this method) have access to it in the right place.
account.update_spot_open_orders_for_market(market_index, open_orders_address)
spot_open_orders_subscription = WebSocketAccountSubscription[OpenOrders](
context, open_orders_address, lambda account_info: OpenOrders.parse(account_info, spot_market.base.decimals, spot_market.quote.decimals))
manager.add(spot_open_orders_subscription)
initial_spot_open_orders = OpenOrders.load(
context, open_orders_address, spot_market.base.decimals, spot_market.quote.decimals)
latest_open_orders_observer = LatestItemObserverSubscriber[PlacedOrdersContainer](
initial_spot_open_orders)
spot_open_orders_subscription.publisher.subscribe(latest_open_orders_observer)
health_check.add("open_orders_subscription", spot_open_orders_subscription.publisher)
return latest_open_orders_observer
def build_serum_open_orders_watcher(context: Context, manager: WebSocketSubscriptionManager, health_check: HealthCheck, serum_market: SerumMarket, wallet: Wallet) -> Watcher[PlacedOrdersContainer]:
all_open_orders = OpenOrders.load_for_market_and_owner(
context, serum_market.address, wallet.address, context.dex_program_id, serum_market.base.decimals, serum_market.quote.decimals)
if len(all_open_orders) > 0:
initial_serum_open_orders: OpenOrders = all_open_orders[0]
open_orders_address = initial_serum_open_orders.address
else:
raw_market = PySerumMarket.load(context.client.compatible_client, serum_market.address)
create_open_orders = build_create_serum_open_orders_instructions(
context, wallet, raw_market)
open_orders_address = create_open_orders.signers[0].public_key()
logging.info(f"Creating OpenOrders account for market {serum_market.symbol} at {open_orders_address}.")
signers: CombinableInstructions = CombinableInstructions.from_wallet(wallet)
transaction_ids = (signers + create_open_orders).execute(context)
context.client.wait_for_confirmation(transaction_ids)
initial_serum_open_orders = OpenOrders.load(
context, open_orders_address, serum_market.base.decimals, serum_market.quote.decimals)
serum_open_orders_subscription = WebSocketAccountSubscription[OpenOrders](
context, open_orders_address, lambda account_info: OpenOrders.parse(account_info, serum_market.base.decimals, serum_market.quote.decimals))
manager.add(serum_open_orders_subscription)
latest_serum_open_orders_observer = LatestItemObserverSubscriber[PlacedOrdersContainer](
initial_serum_open_orders)
serum_open_orders_subscription.publisher.subscribe(latest_serum_open_orders_observer)
health_check.add("open_orders_subscription", serum_open_orders_subscription.publisher)
return latest_serum_open_orders_observer
def build_perp_open_orders_watcher(context: Context, manager: WebSocketSubscriptionManager, health_check: HealthCheck, perp_market: PerpMarket, account: Account, group: Group, account_subscription: WebSocketSubscription[Account]) -> Watcher[PlacedOrdersContainer]:
index = group.find_perp_market_index(perp_market.address)
initial_perp_account = account.perp_accounts[index]
if initial_perp_account is None:
raise Exception(f"Could not find perp account at index {index} of account {account.address}.")
initial_open_orders = initial_perp_account.open_orders
latest_open_orders_observer = LatestItemObserverSubscriber[PlacedOrdersContainer](initial_open_orders)
account_subscription.publisher.subscribe(
on_next=lambda updated_account: latest_open_orders_observer.on_next(updated_account.perp_accounts[index].open_orders))
health_check.add("open_orders_subscription", account_subscription.publisher)
return latest_open_orders_observer
def build_price_watcher(context: Context, manager: WebSocketSubscriptionManager, health_check: HealthCheck, disposer: DisposePropagator, provider_name: str, market: Market) -> LatestItemObserverSubscriber[Price]:
oracle_provider: OracleProvider = create_oracle_provider(context, provider_name)
oracle = oracle_provider.oracle_for_market(context, market)
if oracle is None:
raise Exception(f"Could not find oracle for market {market.symbol} from provider {provider_name}.")
initial_price = oracle.fetch_price(context)
price_feed = oracle.to_streaming_observable(context)
latest_price_observer = LatestItemObserverSubscriber(initial_price)
price_disposable = price_feed.subscribe(latest_price_observer)
disposer.add_disposable(price_disposable)
health_check.add("price_subscription", price_feed)
return latest_price_observer
def build_serum_inventory_watcher(context: Context, manager: WebSocketSubscriptionManager, health_check: HealthCheck, disposer: DisposePropagator, wallet: Wallet, market: Market) -> Watcher[Inventory]:
base_account = TokenAccount.fetch_largest_for_owner_and_token(
context, wallet.address, market.base)
if base_account is None:
raise Exception(
f"Could not find token account owned by {wallet.address} for base token {market.base}.")
base_token_subscription = WebSocketAccountSubscription[TokenAccount](
context, base_account.address, lambda account_info: TokenAccount.parse(account_info, market.base))
manager.add(base_token_subscription)
latest_base_token_account_observer = LatestItemObserverSubscriber(base_account)
base_subscription_disposable = base_token_subscription.publisher.subscribe(latest_base_token_account_observer)
disposer.add_disposable(base_subscription_disposable)
quote_account = TokenAccount.fetch_largest_for_owner_and_token(
context, wallet.address, market.quote)
if quote_account is None:
raise Exception(
f"Could not find token account owned by {wallet.address} for quote token {market.quote}.")
quote_token_subscription = WebSocketAccountSubscription[TokenAccount](
context, quote_account.address, lambda account_info: TokenAccount.parse(account_info, market.quote))
manager.add(quote_token_subscription)
latest_quote_token_account_observer = LatestItemObserverSubscriber(quote_account)
quote_subscription_disposable = quote_token_subscription.publisher.subscribe(latest_quote_token_account_observer)
disposer.add_disposable(quote_subscription_disposable)
def serum_inventory_accessor() -> Inventory:
return Inventory(InventorySource.SPL_TOKENS,
latest_base_token_account_observer.latest.value,
latest_quote_token_account_observer.latest.value)
return LamdaUpdateWatcher(serum_inventory_accessor)