From c0f2a57938d5d33aa229284029e579af55ff0df1 Mon Sep 17 00:00:00 2001 From: Geoff Taylor Date: Wed, 28 Jul 2021 19:07:57 +0100 Subject: [PATCH] Fixed error notification problem in watch-minimum-balances. --- bin/watch-minimum-balances | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/bin/watch-minimum-balances b/bin/watch-minimum-balances index da85319..5f16a0c 100755 --- a/bin/watch-minimum-balances +++ b/bin/watch-minimum-balances @@ -64,13 +64,7 @@ def log_account(account_info: mango.AccountInfo) -> mango.AccountInfo: return account_info -context = mango.ContextBuilder.from_command_line_parameters(args) - -disposer = mango.DisposePropagator() -manager = mango.WebSocketSubscriptionManager() -disposer.add_disposable(manager) - -for name_and_address in args.named_address: +def add_subscription_for_parameter(context: mango.Context, manager: mango.WebSocketSubscriptionManager, timer_limit: int, name_and_address: str): name, address_str = name_and_address.split(":") address = PublicKey(address_str) @@ -82,17 +76,27 @@ for name_and_address in args.named_address: manager.add(account_subscription) on_change = account_subscription.publisher.pipe(rx.operators.start_with(immediate)) - on_timer = rx.interval(args.timer_limit).pipe( + on_timer = rx.interval(timer_limit).pipe( rx.operators.map(lambda _: mango.AccountInfo.load(context, address))) rx.merge(on_change, on_timer).pipe( rx.operators.observe_on(context.pool_scheduler), rx.operators.map(log_account), rx.operators.filter(account_fails_balance_check), - rx.operators.throttle_first(args.timer_limit), + rx.operators.throttle_first(timer_limit), rx.operators.catch(mango.observable_pipeline_error_reporter), rx.operators.retry() ).subscribe(notifier(name)) + +context = mango.ContextBuilder.from_command_line_parameters(args) + +disposer = mango.DisposePropagator() +manager = mango.WebSocketSubscriptionManager() +disposer.add_disposable(manager) + +for name_and_address in args.named_address: + add_subscription_for_parameter(context, manager, args.timer_limit, name_and_address) + websocket_url = context.cluster_url.replace("https", "ws", 1) ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item) ws.ping_interval = 10