From 9bd75dc8d2a26c9f7990ba2dca9c6185d6435d12 Mon Sep 17 00:00:00 2001 From: Geoff Taylor Date: Mon, 9 Aug 2021 13:24:33 +0100 Subject: [PATCH] Websocket now has a force_reconnect() method, plus subscribable disconnect event. --- bin/hedger | 3 ++- bin/log-subscribe | 3 ++- bin/marketmaker | 3 ++- bin/watch-address | 3 ++- bin/watch-liquidations | 3 ++- bin/watch-minimum-balances | 3 ++- mango/oracles/ftx/ftx.py | 4 ++-- mango/reconnectingwebsocket.py | 27 +++++++++++++++------------ mango/websocketsubscription.py | 3 +++ 9 files changed, 32 insertions(+), 20 deletions(-) diff --git a/bin/hedger b/bin/hedger index 030fe5e..14a64a7 100755 --- a/bin/hedger +++ b/bin/hedger @@ -113,7 +113,8 @@ def hedge(event: mango.PerpFillEvent) -> None: publisher.subscribe(on_next=hedge) websocket_url = context.client.cluster_url.replace("https", "ws", 1) -ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item) +ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler) +ws.item.subscribe(on_next=manager.on_item) ws.ping_interval = 10 ws_pong_disposable = ws.pong.subscribe(mango.FileToucherObserver("/var/tmp/mango_healthcheck_ws_pong")) ws.open() diff --git a/bin/log-subscribe b/bin/log-subscribe index d82005c..102d549 100755 --- a/bin/log-subscribe +++ b/bin/log-subscribe @@ -35,7 +35,8 @@ publisher = log_subscription.publisher publisher.subscribe(mango.PrintingObserverSubscriber(False)) websocket_url = context.client.cluster_url.replace("https", "wss", 1) -ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item) +ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler) +ws.item.subscribe(on_next=manager.on_item) ws.ping_interval = 10 ws.open() diff --git a/bin/marketmaker b/bin/marketmaker index cf0d5b2..7f34880 100755 --- a/bin/marketmaker +++ b/bin/marketmaker @@ -254,7 +254,8 @@ else: raise Exception(f"Could not determine type of market {market.symbol}") websocket_url = context.client.cluster_url.replace("https", "ws", 1) -ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item) +ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler) +ws.item.subscribe(on_next=manager.on_item) ws.ping_interval = 10 ws.open() diff --git a/bin/watch-address b/bin/watch-address index b02d6f8..9a80ce8 100755 --- a/bin/watch-address +++ b/bin/watch-address @@ -70,7 +70,8 @@ else: publisher.subscribe(mango.PrintingObserverSubscriber(False)) websocket_url = context.client.cluster_url.replace("https", "ws", 1) -ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item) +ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler) +ws.item.subscribe(on_next=manager.on_item) ws.ping_interval = 10 ws.open() diff --git a/bin/watch-liquidations b/bin/watch-liquidations index 406f168..9d57bfc 100755 --- a/bin/watch-liquidations +++ b/bin/watch-liquidations @@ -62,7 +62,8 @@ publisher.pipe( ).subscribe(mango.PrintingObserverSubscriber(False)) websocket_url = context.client.cluster_url.replace("https", "wss", 1) -ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item) +ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler) +ws.item.subscribe(on_next=manager.on_item) ws.ping_interval = 10 ws_pong_disposable = ws.pong.subscribe(mango.FileToucherObserver("/var/tmp/mango_healthcheck_ws_pong")) diff --git a/bin/watch-minimum-balances b/bin/watch-minimum-balances index fa66540..934a5d8 100755 --- a/bin/watch-minimum-balances +++ b/bin/watch-minimum-balances @@ -110,7 +110,8 @@ for name_and_address in args.named_address: add_subscription_for_parameter(context, manager, args.timer_limit, name_and_address) websocket_url = context.client.cluster_url.replace("https", "ws", 1) -ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item) +ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler) +ws.item.subscribe(on_next=manager.on_item) ws.ping_interval = 10 ws_pong_disposable = ws.pong.subscribe(mango.FileToucherObserver("/var/tmp/mango_healthcheck_ws_pong")) diff --git a/mango/oracles/ftx/ftx.py b/mango/oracles/ftx/ftx.py index 3cc7501..10c9f5e 100644 --- a/mango/oracles/ftx/ftx.py +++ b/mango/oracles/ftx/ftx.py @@ -89,8 +89,8 @@ class FtxOracle(Oracle): ws: ReconnectingWebsocket = ReconnectingWebsocket("wss://ftx.com/ws/", lambda ws: ws.send( - f"""{{"op": "subscribe", "channel": "ticker", "market": "{self.ftx_symbol}"}}"""), - _on_item) + f"""{{"op": "subscribe", "channel": "ticker", "market": "{self.ftx_symbol}"}}""")) + ws.item.subscribe(on_next=_on_item) def subscribe(observer, scheduler_=None): subject.subscribe(observer, scheduler_) diff --git a/mango/reconnectingwebsocket.py b/mango/reconnectingwebsocket.py index 4b9b815..9e3d290 100644 --- a/mango/reconnectingwebsocket.py +++ b/mango/reconnectingwebsocket.py @@ -14,7 +14,6 @@ # [Email](mailto:hello@blockworks.foundation) -from datetime import datetime import json import logging import rx @@ -23,6 +22,7 @@ import rx.core.typing import typing import websocket # type: ignore +from datetime import datetime from threading import Thread @@ -32,28 +32,30 @@ from threading import Thread # mechanism. If an error disconnects the websocket, it will automatically reconnect. It # will continue to automatically reconnect, until it is explicitly closed. # - - class ReconnectingWebsocket: - def __init__(self, url: str, on_open_call: typing.Callable[[websocket.WebSocketApp], None], on_item: typing.Callable[[typing.Any], typing.Any]): + def __init__(self, + url: str, + on_open_call: typing.Callable[[websocket.WebSocketApp], None]): self.logger: logging.Logger = logging.getLogger(self.__class__.__name__) self.url = url self.on_open_call = on_open_call - self._on_item = on_item self.reconnect_required: bool = True self.ping_interval: int = 0 - self.connecting: rx.subject.behaviorsubject.BehaviorSubject = rx.subject.behaviorsubject.BehaviorSubject( - datetime.now()) - self.ping: rx.subject.behaviorsubject.BehaviorSubject = rx.subject.behaviorsubject.BehaviorSubject( - datetime.now()) - self.pong: rx.subject.behaviorsubject.BehaviorSubject = rx.subject.behaviorsubject.BehaviorSubject( - datetime.now()) + self.connecting: rx.subject.BehaviorSubject = rx.subject.BehaviorSubject(datetime.now()) + self.disconnected: rx.subject.BehaviorSubject = rx.subject.BehaviorSubject(datetime.now()) + self.ping: rx.subject.BehaviorSubject = rx.subject.BehaviorSubject(datetime.now()) + self.pong: rx.subject.BehaviorSubject = rx.subject.BehaviorSubject(datetime.now()) + self.item: rx.subject.Subject = rx.subject.Subject() def close(self): self.logger.info(f"Closing WebSocket for {self.url}") self.reconnect_required = False self._ws.close() + def force_reconnect(self): + self.logger.info(f"Forcing a reconnect on WebSocket for {self.url}") + self._ws.close() + def _on_open(self, ws: websocket.WebSocketApp): self.logger.info(f"Opening WebSocket for {self.url}") if self.on_open_call: @@ -61,7 +63,7 @@ class ReconnectingWebsocket: def _on_message(self, _, message): data = json.loads(message) - self._on_item(data) + self.item.on_next(data) def _on_error(self, *args): self.logger.warning(f"WebSocket for {self.url} has error {args}") @@ -92,3 +94,4 @@ class ReconnectingWebsocket: on_pong=self._on_pong ) self._ws.run_forever(ping_interval=self.ping_interval) + self.disconnected.on_next(datetime.now()) diff --git a/mango/websocketsubscription.py b/mango/websocketsubscription.py index 78c54ac..efafa39 100644 --- a/mango/websocketsubscription.py +++ b/mango/websocketsubscription.py @@ -197,6 +197,9 @@ class WebSocketSubscriptionManager(Disposable): for subscription in self.subscriptions: ws.send(subscription.build_request()) + def on_disconnected(self, ws: websocket.WebSocketApp): + self.subscriptions = [] + def dispose(self): for subscription in self.subscriptions: subscription.dispose()