#!/usr/bin/env python3

import argparse
import logging
import os
import os.path
import rx
import rx.operators
import rx.scheduler
import sys
import threading
import typing

from decimal import Decimal
from solana.publickey import PublicKey

sys.path.insert(0, os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..")))
import mango  # nopep8

parser = argparse.ArgumentParser(
    description="Watches one or many accounts (via a websocket) and sends a notification if the SOL balance falls below the --minimum-sol-balance threshold.")
mango.ContextBuilder.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser)
parser.add_argument("--named-address", type=str, required=True, action="append", default=[],
                    help="Name and address of the Solana account to watch, separated by a colon")
parser.add_argument("--minimum-sol-balance", type=Decimal, default=Decimal("0.1"),
                    help="the minimum SOL balance required for the alert. A SOL balance less than this value will trigger a notification.")
parser.add_argument("--timer-limit", type=int, default=(60 * 60),
                    help="notifications for an account will be sent at most once per timer-limit seconds, and accounts will be polled once per timer-limit seconds irrespective of websocket activity")
parser.add_argument("--notify", type=mango.parse_subscription_target, action="append", default=[],
                    help="The notification target for low balance events")
parser.add_argument("--notify-events", type=mango.parse_subscription_target, action="append", default=[],
                    help="The notification target for general events (such as startup)")
args: argparse.Namespace = mango.parse_args(parser)


def send_balance_notification(message: str) -> None:
    """Send `message`, prefixed with `args.name`, to every --notify target."""
    for notify in args.notify:
        notify.send(f"[{args.name}] {message}")


def send_event_notification(message: str) -> None:
    """Send `message`, prefixed with `args.name`, to every --notify-events target."""
    for notify in args.notify_events:
        notify.send(f"[{args.name}] {message}")


def notifier(name: str) -> typing.Callable[[mango.AccountInfo], None]:
    """Build a callback that reports a low balance for the account called `name`.

    The returned callback reads the module-level `context`, which is created
    further down the script, so it must not be invoked before that point.
    """
    def notify(account_info: mango.AccountInfo) -> None:
        report = f"Account \"{name} [{account_info.address}]\" on {context.client.cluster_name} has only {account_info.sols} SOL, which is below the minimum required balance of {args.minimum_sol_balance} SOL."
        send_balance_notification(report)
        print(f"Notification sent: {report}")
    return notify


def account_fails_balance_check(account_info: mango.AccountInfo) -> bool:
    """Return True when the account holds less SOL than --minimum-sol-balance."""
    return account_info.sols < args.minimum_sol_balance


def log_account(account_info: mango.AccountInfo) -> mango.AccountInfo:
    """Log the account's address and SOL balance, then pass it through unchanged."""
    logging.info(f"Checking account {account_info.address} - {account_info.sols} SOL.")
    return account_info


def add_subscription_for_parameter(context: mango.Context, manager: mango.WebSocketSubscriptionManager, health_check: mango.HealthCheck, timer_limit: int, name_and_address: str) -> None:
    """Watch one account, given as "name:address", and notify when its balance is low.

    Account updates come from a websocket subscription (seeded with the
    account's current state) merged with a poll every `timer_limit` seconds.
    Notifications are throttled to at most one per `timer_limit` seconds.

    `health_check` is accepted for interface compatibility but is not used here.

    Raises:
        ValueError: if `name_and_address` contains no colon separator.
        Exception: if no account can be loaded from the given address.
    """
    # Split on the LAST colon so a name that itself contains a colon still works.
    name, separator, address_str = name_and_address.rpartition(":")
    if not separator:
        raise ValueError(
            f"Expected '<name>:<address>' but no colon was found in '{name_and_address}'.")

    address = PublicKey(address_str)
    immediate = mango.AccountInfo.load(context, address)
    if immediate is None:
        raise Exception(f"No account '{name}' at {address_str}.")

    account_subscription = mango.WebSocketAccountSubscription(
        context, address, lambda account_info: account_info)
    manager.add(account_subscription)

    def poll(_: typing.Any) -> typing.Optional[mango.AccountInfo]:
        # A poll can come back empty (the same call is None-checked above), so
        # log it here rather than letting None flow into the pipeline.
        polled = mango.AccountInfo.load(context, address)
        if polled is None:
            logging.warning(f"Could not load account '{name}' at {address_str} on timer poll.")
        return polled

    on_change = account_subscription.publisher.pipe(rx.operators.start_with(immediate))
    on_timer = rx.interval(timer_limit).pipe(
        rx.operators.map(poll),
        # Drop failed polls so downstream operators never see None.
        rx.operators.filter(lambda account_info: account_info is not None))

    rx.merge(on_change, on_timer).pipe(
        rx.operators.observe_on(context.create_thread_pool_scheduler()),
        rx.operators.map(log_account),
        rx.operators.filter(account_fails_balance_check),
        rx.operators.throttle_first(timer_limit),
        rx.operators.catch(mango.observable_pipeline_error_reporter),
        rx.operators.retry()
    ).subscribe(notifier(name))


context = mango.ContextBuilder.from_command_line_parameters(args)

disposer = mango.DisposePropagator()
manager = mango.SharedWebSocketSubscriptionManager(context)
disposer.add_disposable(manager)
health_check = mango.HealthCheck()
disposer.add_disposable(health_check)

# Need a nice way of ensuring pongs from all subscriptions
health_check.add("ws_pong", manager.pong)

for name_and_address in args.named_address:
    add_subscription_for_parameter(context, manager, health_check, args.timer_limit, name_and_address)

manager.open()

send_event_notification(f"Starting using context:\n{context}")

# Wait - don't exit. Exiting will be handled by signals/interrupts.
waiter = threading.Event()
try:
    waiter.wait()
except (KeyboardInterrupt, SystemExit):
    # Expected way out: fall through to the orderly shutdown below.
    pass
except Exception:
    # Unexpected, but still shut down cleanly - just don't hide what happened.
    logging.exception("Unexpected error while waiting; shutting down.")

logging.info("Shutting down...")
disposer.dispose()
logging.info("Shutdown complete.")