watch-minimum-balances now uses a single websocket for all accounts (again).

This commit is contained in:
Geoff Taylor 2021-08-31 10:55:06 +01:00
parent 9111294f2d
commit befb86bcfb
2 changed files with 22 additions and 13 deletions

View File

@ -65,7 +65,7 @@ def log_account(account_info: mango.AccountInfo) -> mango.AccountInfo:
return account_info
def add_subscription_for_parameter(context: mango.Context, manager: mango.IndividualWebSocketSubscriptionManager, health_check: mango.HealthCheck, timer_limit: int, name_and_address: str):
def add_subscription_for_parameter(context: mango.Context, manager: mango.WebSocketSubscriptionManager, health_check: mango.HealthCheck, timer_limit: int, name_and_address: str):
name, address_str = name_and_address.split(":")
address = PublicKey(address_str)
@ -76,9 +76,6 @@ def add_subscription_for_parameter(context: mango.Context, manager: mango.Indivi
account_subscription = mango.WebSocketAccountSubscription(context, address, lambda account_info: account_info)
manager.add(account_subscription)
# Need a nice way of ensuring pongs from all subscriptions
health_check.add("ws_pong", account_subscription.pong)
on_change = account_subscription.publisher.pipe(rx.operators.start_with(immediate))
on_timer = rx.interval(timer_limit).pipe(
rx.operators.map(lambda _: mango.AccountInfo.load(context, address)))
@ -95,11 +92,14 @@ def add_subscription_for_parameter(context: mango.Context, manager: mango.Indivi
context = mango.ContextBuilder.from_command_line_parameters(args)
disposer = mango.DisposePropagator()
manager = mango.IndividualWebSocketSubscriptionManager(context)
manager = mango.SharedWebSocketSubscriptionManager(context)
disposer.add_disposable(manager)
health_check = mango.HealthCheck()
disposer.add_disposable(health_check)
# Need a nice way of ensuring pongs from all subscriptions
health_check.add("ws_pong", manager.pong)
for name_and_address in args.named_address:
add_subscription_for_parameter(context, manager, health_check, args.timer_limit, name_and_address)

View File

@ -51,7 +51,7 @@ class WebSocketSubscription(Disposable, typing.Generic[TSubscriptionInstance], m
self.publisher: EventSource[TSubscriptionInstance] = EventSource[TSubscriptionInstance]()
self.ws: typing.Optional[ReconnectingWebsocket] = None
self.pong: BehaviorSubject = BehaviorSubject(datetime.now())
self.pong_subscription: typing.Optional[Disposable] = None
self._pong_subscription: typing.Optional[Disposable] = None
@abc.abstractmethod
def build_request(self) -> str:
@ -64,13 +64,13 @@ class WebSocketSubscription(Disposable, typing.Generic[TSubscriptionInstance], m
ws.ping_interval = self.context.ping_interval
self.ws = ws
ws.open()
self.pong_subscription = ws.pong.subscribe(self.pong)
self._pong_subscription = ws.pong.subscribe(self.pong)
def close(self) -> None:
if self.ws is not None:
if self.pong_subscription is not None:
self.pong_subscription.dispose()
self.pong_subscription = None
if self._pong_subscription is not None:
self._pong_subscription.dispose()
self._pong_subscription = None
self.ws.close()
self.ws = None
@ -96,9 +96,9 @@ class WebSocketSubscription(Disposable, typing.Generic[TSubscriptionInstance], m
self.publisher.on_completed()
self.publisher.dispose()
if self.ws is not None:
if self.pong_subscription is not None:
self.pong_subscription.dispose()
self.pong_subscription = None
if self._pong_subscription is not None:
self._pong_subscription.dispose()
self._pong_subscription = None
self.ws.close()
self.ws = None
@ -246,6 +246,8 @@ class SharedWebSocketSubscriptionManager(WebSocketSubscriptionManager):
def __init__(self, context: Context, ping_interval: int = 10):
super().__init__(context, ping_interval)
self.ws: typing.Optional[ReconnectingWebsocket] = None
self.pong: BehaviorSubject = BehaviorSubject(datetime.now())
self._pong_subscription: typing.Optional[Disposable] = None
def open(self) -> None:
websocket_url = self.context.client.cluster_url.replace("https", "wss", 1)
@ -254,9 +256,13 @@ class SharedWebSocketSubscriptionManager(WebSocketSubscriptionManager):
ws.ping_interval = self.ping_interval
self.ws = ws
ws.open()
self._pong_subscription = ws.pong.subscribe(self.pong)
def close(self) -> None:
if self.ws is not None:
if self._pong_subscription is not None:
self._pong_subscription.dispose()
self._pong_subscription = None
self.ws.close()
self.ws = None
@ -295,5 +301,8 @@ class SharedWebSocketSubscriptionManager(WebSocketSubscriptionManager):
def dispose(self):
super().dispose()
if self.ws is not None:
if self._pong_subscription is not None:
self._pong_subscription.dispose()
self._pong_subscription = None
self.ws.close()
self.ws = None