Fixed error notification problem in watch-minimum-balances.

This commit is contained in:
Geoff Taylor 2021-07-28 19:07:57 +01:00
parent 8af5bbb05a
commit c0f2a57938
1 changed files with 13 additions and 9 deletions

View File

@ -64,13 +64,7 @@ def log_account(account_info: mango.AccountInfo) -> mango.AccountInfo:
return account_info
context = mango.ContextBuilder.from_command_line_parameters(args)
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager()
disposer.add_disposable(manager)
for name_and_address in args.named_address:
def add_subscription_for_parameter(context: mango.Context, manager: mango.WebSocketSubscriptionManager, timer_limit: int, name_and_address: str):
name, address_str = name_and_address.split(":")
address = PublicKey(address_str)
@ -82,17 +76,27 @@ for name_and_address in args.named_address:
manager.add(account_subscription)
on_change = account_subscription.publisher.pipe(rx.operators.start_with(immediate))
on_timer = rx.interval(args.timer_limit).pipe(
on_timer = rx.interval(timer_limit).pipe(
rx.operators.map(lambda _: mango.AccountInfo.load(context, address)))
rx.merge(on_change, on_timer).pipe(
rx.operators.observe_on(context.pool_scheduler),
rx.operators.map(log_account),
rx.operators.filter(account_fails_balance_check),
rx.operators.throttle_first(args.timer_limit),
rx.operators.throttle_first(timer_limit),
rx.operators.catch(mango.observable_pipeline_error_reporter),
rx.operators.retry()
).subscribe(notifier(name))
context = mango.ContextBuilder.from_command_line_parameters(args)
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager()
disposer.add_disposable(manager)
for name_and_address in args.named_address:
add_subscription_for_parameter(context, manager, args.timer_limit, name_and_address)
websocket_url = context.cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item)
ws.ping_interval = 10