Websocket now has a force_reconnect() method, plus subscribable disconnect event.

This commit is contained in:
Geoff Taylor 2021-08-09 13:24:33 +01:00
parent b297db217c
commit 9bd75dc8d2
9 changed files with 32 additions and 20 deletions

View File

@ -113,7 +113,8 @@ def hedge(event: mango.PerpFillEvent) -> None:
publisher.subscribe(on_next=hedge)
websocket_url = context.client.cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler)
ws.item.subscribe(on_next=manager.on_item)
ws.ping_interval = 10
ws_pong_disposable = ws.pong.subscribe(mango.FileToucherObserver("/var/tmp/mango_healthcheck_ws_pong"))
ws.open()

View File

@ -35,7 +35,8 @@ publisher = log_subscription.publisher
publisher.subscribe(mango.PrintingObserverSubscriber(False))
websocket_url = context.client.cluster_url.replace("https", "wss", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler)
ws.item.subscribe(on_next=manager.on_item)
ws.ping_interval = 10
ws.open()

View File

@ -254,7 +254,8 @@ else:
raise Exception(f"Could not determine type of market {market.symbol}")
websocket_url = context.client.cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler)
ws.item.subscribe(on_next=manager.on_item)
ws.ping_interval = 10
ws.open()

View File

@ -70,7 +70,8 @@ else:
publisher.subscribe(mango.PrintingObserverSubscriber(False))
websocket_url = context.client.cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler)
ws.item.subscribe(on_next=manager.on_item)
ws.ping_interval = 10
ws.open()

View File

@ -62,7 +62,8 @@ publisher.pipe(
).subscribe(mango.PrintingObserverSubscriber(False))
websocket_url = context.client.cluster_url.replace("https", "wss", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler)
ws.item.subscribe(on_next=manager.on_item)
ws.ping_interval = 10
ws_pong_disposable = ws.pong.subscribe(mango.FileToucherObserver("/var/tmp/mango_healthcheck_ws_pong"))

View File

@ -110,7 +110,8 @@ for name_and_address in args.named_address:
add_subscription_for_parameter(context, manager, args.timer_limit, name_and_address)
websocket_url = context.client.cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler)
ws.item.subscribe(on_next=manager.on_item)
ws.ping_interval = 10
ws_pong_disposable = ws.pong.subscribe(mango.FileToucherObserver("/var/tmp/mango_healthcheck_ws_pong"))

View File

@ -89,8 +89,8 @@ class FtxOracle(Oracle):
ws: ReconnectingWebsocket = ReconnectingWebsocket("wss://ftx.com/ws/",
lambda ws: ws.send(
f"""{{"op": "subscribe", "channel": "ticker", "market": "{self.ftx_symbol}"}}"""),
_on_item)
f"""{{"op": "subscribe", "channel": "ticker", "market": "{self.ftx_symbol}"}}"""))
ws.item.subscribe(on_next=_on_item)
def subscribe(observer, scheduler_=None):
subject.subscribe(observer, scheduler_)

View File

@ -14,7 +14,6 @@
# [Email](mailto:hello@blockworks.foundation)
from datetime import datetime
import json
import logging
import rx
@ -23,6 +22,7 @@ import rx.core.typing
import typing
import websocket # type: ignore
from datetime import datetime
from threading import Thread
@ -32,28 +32,30 @@ from threading import Thread
# mechanism. If an error disconnects the websocket, it will automatically reconnect. It
# will continue to automatically reconnect, until it is explicitly closed.
#
class ReconnectingWebsocket:
def __init__(self, url: str, on_open_call: typing.Callable[[websocket.WebSocketApp], None], on_item: typing.Callable[[typing.Any], typing.Any]):
def __init__(self,
url: str,
on_open_call: typing.Callable[[websocket.WebSocketApp], None]):
self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self.url = url
self.on_open_call = on_open_call
self._on_item = on_item
self.reconnect_required: bool = True
self.ping_interval: int = 0
self.connecting: rx.subject.behaviorsubject.BehaviorSubject = rx.subject.behaviorsubject.BehaviorSubject(
datetime.now())
self.ping: rx.subject.behaviorsubject.BehaviorSubject = rx.subject.behaviorsubject.BehaviorSubject(
datetime.now())
self.pong: rx.subject.behaviorsubject.BehaviorSubject = rx.subject.behaviorsubject.BehaviorSubject(
datetime.now())
self.connecting: rx.subject.BehaviorSubject = rx.subject.BehaviorSubject(datetime.now())
self.disconnected: rx.subject.BehaviorSubject = rx.subject.BehaviorSubject(datetime.now())
self.ping: rx.subject.BehaviorSubject = rx.subject.BehaviorSubject(datetime.now())
self.pong: rx.subject.BehaviorSubject = rx.subject.BehaviorSubject(datetime.now())
self.item: rx.subject.Subject = rx.subject.Subject()
def close(self):
self.logger.info(f"Closing WebSocket for {self.url}")
self.reconnect_required = False
self._ws.close()
def force_reconnect(self):
self.logger.info(f"Forcing a reconnect on WebSocket for {self.url}")
self._ws.close()
def _on_open(self, ws: websocket.WebSocketApp):
self.logger.info(f"Opening WebSocket for {self.url}")
if self.on_open_call:
@ -61,7 +63,7 @@ class ReconnectingWebsocket:
def _on_message(self, _, message):
data = json.loads(message)
self._on_item(data)
self.item.on_next(data)
def _on_error(self, *args):
self.logger.warning(f"WebSocket for {self.url} has error {args}")
@ -92,3 +94,4 @@ class ReconnectingWebsocket:
on_pong=self._on_pong
)
self._ws.run_forever(ping_interval=self.ping_interval)
self.disconnected.on_next(datetime.now())

View File

@ -197,6 +197,9 @@ class WebSocketSubscriptionManager(Disposable):
for subscription in self.subscriptions:
ws.send(subscription.build_request())
def on_disconnected(self, ws: websocket.WebSocketApp):
self.subscriptions = []
def dispose(self):
for subscription in self.subscriptions:
subscription.dispose()