From 153ebae2f412a0da833c15c8a9771e32a0e3eed4 Mon Sep 17 00:00:00 2001 From: Geoff Taylor Date: Sat, 21 Aug 2021 13:25:28 +0100 Subject: [PATCH] Moved opening of websocket to be entirely within WebSocketSubscriptionManager. --- bin/hedger | 10 ++-------- bin/log-subscribe | 9 ++------- bin/marketmaker | 9 ++------- bin/watch-address | 9 ++------- bin/watch-liquidations | 10 ++-------- bin/watch-minimum-balances | 14 ++------------ mango/websocketsubscription.py | 29 ++++++++++++++++++++++++----- 7 files changed, 36 insertions(+), 54 deletions(-) diff --git a/bin/hedger b/bin/hedger index 2026107..83358d9 100755 --- a/bin/hedger +++ b/bin/hedger @@ -47,7 +47,7 @@ group = mango.Group.load(context, context.group_id) account = mango.Account.load_for_owner_by_index(context, wallet.address, group, args.account_index) disposer = mango.DisposePropagator() -manager = mango.WebSocketSubscriptionManager(context.name) +manager = mango.WebSocketSubscriptionManager(context) disposer.add_disposable(manager) health_check = mango.HealthCheck() disposer.add_disposable(health_check) @@ -114,12 +114,7 @@ def hedge(event: mango.PerpFillEvent) -> None: publisher.subscribe(on_next=hedge) -websocket_url = context.client.cluster_url.replace("https", "ws", 1) -ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler) -ws.item.subscribe(on_next=manager.on_item) -ws.ping_interval = 10 -health_check.add("ws_pong", ws.pong) -ws.open() +manager.open() logging.info(f"Current assets in account {account.address} (owner: {account.owner}):") mango.TokenValue.report([asset for asset in account.net_assets if asset is not None], logging.info) @@ -132,6 +127,5 @@ except: pass logging.info("Shutting down...") -ws.close() disposer.dispose() logging.info("Shutdown complete.") diff --git a/bin/log-subscribe b/bin/log-subscribe index 102d549..2eabc92 100755 --- a/bin/log-subscribe +++ b/bin/log-subscribe @@ -25,7 +25,7 @@ logging.warning(mango.WARNING_DISCLAIMER_TEXT) context = mango.ContextBuilder.from_command_line_parameters(args) disposer = mango.DisposePropagator() -manager = mango.WebSocketSubscriptionManager(context.name) +manager = mango.WebSocketSubscriptionManager(context) disposer.add_disposable(manager) log_subscription = mango.WebSocketLogSubscription(context, args.address) @@ -34,11 +34,7 @@ publisher = log_subscription.publisher publisher.subscribe(mango.PrintingObserverSubscriber(False)) -websocket_url = context.client.cluster_url.replace("https", "wss", 1) -ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler) -ws.item.subscribe(on_next=manager.on_item) -ws.ping_interval = 10 -ws.open() +manager.open() # Wait - don't exit. Exiting will be handled by signals/interrupts. waiter = threading.Event() @@ -48,6 +44,5 @@ except: pass logging.info("Shutting down...") -ws.close() disposer.dispose() logging.info("Shutdown complete.") diff --git a/bin/marketmaker b/bin/marketmaker index f6e35f9..6b25c2b 100755 --- a/bin/marketmaker +++ b/bin/marketmaker @@ -67,7 +67,7 @@ context = mango.ContextBuilder.from_command_line_parameters(args) logging.info(f"{context}") disposer = mango.DisposePropagator() -manager = mango.WebSocketSubscriptionManager(context.name) +manager = mango.WebSocketSubscriptionManager(context) disposer.add_disposable(manager) health_check = mango.HealthCheck() disposer.add_disposable(health_check) @@ -112,11 +112,7 @@ elif isinstance(market, mango.PerpMarket): else: raise Exception(f"Could not determine type of market {market.symbol}") -websocket_url = context.client.cluster_url.replace("https", "ws", 1) -ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler) -ws.item.subscribe(on_next=manager.on_item) -ws.ping_interval = 10 -ws.open() +manager.open() order_reconciler = mango.marketmaking.ToleranceOrderReconciler( args.existing_order_tolerance, args.existing_order_tolerance) @@ -149,7 +145,6 @@ except: pass logging.info("Shutting down...") -ws.close() disposer.dispose() cleanup(context, wallet, account, market, args.dry_run) logging.info("Shutdown complete.") diff --git a/bin/watch-address b/bin/watch-address index 9a80ce8..610fdee 100755 --- a/bin/watch-address +++ b/bin/watch-address @@ -34,7 +34,7 @@ logging.warning(mango.WARNING_DISCLAIMER_TEXT) context = mango.ContextBuilder.from_command_line_parameters(args) disposer = mango.DisposePropagator() -manager = mango.WebSocketSubscriptionManager(context.name) +manager = mango.WebSocketSubscriptionManager(context) disposer.add_disposable(manager) if args.account_type.upper() == "ACCOUNTINFO": @@ -69,11 +69,7 @@ else: publisher.subscribe(mango.PrintingObserverSubscriber(False)) -websocket_url = context.client.cluster_url.replace("https", "ws", 1) -ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler) -ws.item.subscribe(on_next=manager.on_item) -ws.ping_interval = 10 -ws.open() +manager.open() # Wait - don't exit. Exiting will be handled by signals/interrupts. waiter = threading.Event() @@ -83,6 +79,5 @@ except: pass logging.info("Shutting down...") -ws.close() disposer.dispose() logging.info("Shutdown complete.") diff --git a/bin/watch-liquidations b/bin/watch-liquidations index 26a8bcd..af5b7d6 100755 --- a/bin/watch-liquidations +++ b/bin/watch-liquidations @@ -38,7 +38,7 @@ context.client.commitment = Max print(context) disposer = mango.DisposePropagator() -manager = mango.WebSocketSubscriptionManager(context.name) +manager = mango.WebSocketSubscriptionManager(context) disposer.add_disposable(manager) health_check = mango.HealthCheck() disposer.add_disposable(health_check) @@ -63,12 +63,7 @@ publisher.pipe( rx.operators.retry() ).subscribe(mango.PrintingObserverSubscriber(False)) -websocket_url = context.client.cluster_url.replace("https", "wss", 1) -ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler) -ws.item.subscribe(on_next=manager.on_item) -ws.ping_interval = 10 -health_check.add("ws_pong", ws.pong) -ws.open() +manager.open() # Wait - don't exit. Exiting will be handled by signals/interrupts. waiter = threading.Event() @@ -78,6 +73,5 @@ except: pass logging.info("Shutting down...") -ws.close() disposer.dispose() logging.info("Shutdown complete.") diff --git a/bin/watch-minimum-balances b/bin/watch-minimum-balances index 97aeda2..2165fc7 100755 --- a/bin/watch-minimum-balances +++ b/bin/watch-minimum-balances @@ -103,7 +103,7 @@ def add_subscription_for_parameter(context: mango.Context, manager: mango.WebSoc context = mango.ContextBuilder.from_command_line_parameters(args) disposer = mango.DisposePropagator() -manager = mango.WebSocketSubscriptionManager(context.name) +manager = mango.WebSocketSubscriptionManager(context) disposer.add_disposable(manager) health_check = mango.HealthCheck() disposer.add_disposable(health_check) @@ -111,16 +111,7 @@ disposer.add_disposable(health_check) for name_and_address in args.named_address: add_subscription_for_parameter(context, manager, args.timer_limit, name_and_address) -websocket_url = context.client.cluster_url.replace("https", "ws", 1) -ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler) -ws.item.subscribe(on_next=manager.on_item) -ws.ping_interval = 10 -health_check.add("ws_pong", ws.pong) -ws_connecting_log_disposable = ws.connecting.subscribe( - on_next=lambda _: send_event_notification("Re-connecting websocket")) -disposer.add_disposable(ws_connecting_log_disposable) - -ws.open() +manager.open() send_event_notification(f"Starting using context:\n{context}") @@ -132,6 +123,5 @@ except: pass logging.info("Shutting down...") -ws.close() disposer.dispose() logging.info("Shutdown complete.") diff --git a/mango/websocketsubscription.py b/mango/websocketsubscription.py index efafa39..86cceff 100644 --- a/mango/websocketsubscription.py +++ b/mango/websocketsubscription.py @@ -25,6 +25,7 @@ from solana.rpc.types import RPCResponse from .accountinfo import AccountInfo from .context import Context from .observables import EventSource +from .reconnectingwebsocket import ReconnectingWebsocket # # 🥭 WebSocketSubscription class @@ -157,14 +158,29 @@ class WebSocketLogSubscription(WebSocketSubscription[LogEvent]): # `WebSocketSubscription`. # class WebSocketSubscriptionManager(Disposable): - def __init__(self, program_name: str): + def __init__(self, context: Context, ping_interval: int = 10): self.logger: logging.Logger = logging.getLogger(self.__class__.__name__) - self.program_name: str = program_name + self.context: Context = context + self.ping_interval: int = ping_interval self.subscriptions: typing.List[WebSocketSubscription] = [] + self.ws: typing.Optional[ReconnectingWebsocket] = None def add(self, subscription: WebSocketSubscription) -> None: self.subscriptions += [subscription] + def open(self) -> None: + websocket_url = self.context.client.cluster_url.replace("https", "wss", 1) + ws: ReconnectingWebsocket = ReconnectingWebsocket(websocket_url, self.open_handler) + ws.item.subscribe(on_next=self.on_item) + ws.ping_interval = self.ping_interval + self.ws = ws + ws.open() + + def close(self) -> None: + if self.ws is not None: + self.ws.close() + self.ws = None + def add_subscription_id(self, id, subscription_id) -> None: for subscription in self.subscriptions: if subscription.id == id: @@ -172,13 +188,13 @@ class WebSocketSubscriptionManager(Disposable): f"Setting ID {subscription_id} on subscription {subscription.id} for {subscription.address}.") subscription.subscription_id = subscription_id return - self.logger.error(f"[{self.program_name}] Subscription ID {id} not found") + self.logger.error(f"[{self.context.name}] Subscription ID {id} not found") def subscription_by_subscription_id(self, subscription_id) -> WebSocketSubscription: for subscription in self.subscriptions: if subscription.subscription_id == subscription_id: return subscription - raise Exception(f"[{self.program_name}] No subscription with subscription ID {subscription_id} could be found.") + raise Exception(f"[{self.context.name}] No subscription with subscription ID {subscription_id} could be found.") def on_item(self, response) -> None: if "method" not in response: @@ -191,7 +207,7 @@ class WebSocketSubscriptionManager(Disposable): built = subscription.build(response["params"]) subscription.publisher.publish(built) else: - self.logger.error(f"[{self.program_name}] Unknown response: {response}") + self.logger.error(f"[{self.context.name}] Unknown response: {response}") def open_handler(self, ws: websocket.WebSocketApp): for subscription in self.subscriptions: @@ -203,3 +219,6 @@ class WebSocketSubscriptionManager(Disposable): def dispose(self): for subscription in self.subscriptions: subscription.dispose() + if self.ws is not None: + self.ws.close() + self.ws = None