mango-explorer/bin/watch-minimum-balances

130 lines
5.2 KiB
Plaintext
Raw Normal View History

2021-07-27 09:07:39 -07:00
#!/usr/bin/env pyston3
import argparse
import logging
import os
import os.path
import rx
import rx.operators
import rx.scheduler
import sys
import threading
import typing
from decimal import Decimal
from solana.publickey import PublicKey
sys.path.insert(0, os.path.abspath(
os.path.join(os.path.dirname(__file__), "..")))
import mango # nopep8
import mango.layouts # nopep8
import mango.marketmaking.fixedratiosdesiredordersbuilder # nopep8
import mango.marketmaking.marketmaker # nopep8
import mango.marketmaking.modelstate # nopep8
import mango.marketmaking.toleranceorderreconciler # nopep8
# Command-line interface. The mango helpers register their own context and
# wallet options on the parser; the remaining options are specific to this
# balance watcher.
parser = argparse.ArgumentParser(
    description="Watches named Solana accounts and sends a notification when an account's SOL balance falls below a minimum.")
mango.ContextBuilder.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser)
# May be given several times; each value has the form "<name>:<address>".
parser.add_argument("--named-address", type=str, required=True, action="append", default=[],
                    help="Name and address of the Solana account to watch, separated by a colon")
parser.add_argument("--minimum-sol-balance", type=Decimal, default=Decimal("0.1"),
                    help="the minimum SOL balance required for the alert. A SOL balance less than this value will trigger a notification.")
parser.add_argument("--timer-limit", type=int, default=(60 * 60),
                    help="notifications for an account will be sent at most once per timer-limit seconds, and accounts will be polled once per timer-limit seconds irrespective of websocket activity")
parser.add_argument("--notify", type=mango.parse_subscription_target, action="append", default=[],
                    help="The notification target for low balance events")
args = parser.parse_args()

# NOTE(review): --log-level is not declared in this file; presumably it is
# registered by one of the mango add_command_line_parameters() calls above.
logging.getLogger().setLevel(args.log_level)
logging.warning(mango.WARNING_DISCLAIMER_TEXT)
def send_notification(message: str) -> None:
    """Send `message`, tagged with this script's name, to every --notify target."""
    tagged = f"[watch-minimum-balances] {message}"
    for target in args.notify:
        target.send(tagged)
def sols_from_lamports(lamports: Decimal) -> Decimal:
    """Convert a lamport amount to SOL by dividing by `mango.SOL_DECIMAL_DIVISOR`."""
    divisor = mango.SOL_DECIMAL_DIVISOR
    return lamports / divisor
2021-07-27 09:07:39 -07:00
def notifier(name: str) -> typing.Callable[[mango.AccountInfo], None]:
    """Build the low-balance alert callback for the account labelled `name`.

    The returned callable accepts a `mango.AccountInfo`, formats a report
    about its SOL balance, passes the report to `send_notification()` and
    prints a confirmation line. It reads the module-level `context` and
    `args` at call time, so both must exist before the callback first fires.
    """
    def send_low_balance_alert(account_info: mango.AccountInfo) -> None:
        lamports = account_info.lamports
        account_sols = sols_from_lamports(lamports)
        report = f"Account \"{name} [{account_info.address}]\" on {context.cluster} has only {account_sols} SOL, which is below the minimum required balance of {args.minimum_sol_balance} SOL."
        send_notification(report)
        print(f"Notification sent: {report}")

    return send_low_balance_alert
def account_fails_balance_check(account_info: mango.AccountInfo) -> bool:
    """Return True when the account's SOL balance is below --minimum-sol-balance."""
    balance = sols_from_lamports(account_info.lamports)
    minimum = args.minimum_sol_balance
    return balance < minimum
def log_account(account_info: mango.AccountInfo) -> mango.AccountInfo:
    """Log the account's address and SOL balance, then return it unchanged.

    Returning the argument lets this function be used as a pass-through
    `map` step in an observable pipeline.
    """
    account_sols = sols_from_lamports(account_info.lamports)
    message = f"Checking account {account_info.address} - {account_sols} SOL."
    logging.info(message)
    return account_info
2021-07-27 09:07:39 -07:00
def add_subscription_for_parameter(context: mango.Context, manager: mango.WebSocketSubscriptionManager, timer_limit: int, name_and_address: str) -> None:
    """Start watching one "<name>:<address>" account for a low SOL balance.

    The account is loaded once up front, then observed both through a
    websocket account subscription (registered on `manager`) and through a
    periodic reload. Every observed state is logged, checked against the
    minimum balance and, if it fails, passed to the notifier for `name`.

    Parameters:
        context: mango context used to load the account and to provide the
            scheduler the pipeline runs on.
        manager: subscription manager the websocket subscription is added to.
        timer_limit: seconds between periodic reloads, and also the minimum
            number of seconds between two notifications for this account.
        name_and_address: display name and account address joined by a colon.

    Raises:
        ValueError: if `name_and_address` contains no colon.
        Exception: if no account exists at the given address.
    """
    # Split on the LAST colon so a display name may itself contain colons.
    name, separator, address_str = name_and_address.rpartition(":")
    if not separator:
        raise ValueError(
            f"Expected a name and an address separated by a colon, but got '{name_and_address}'.")

    address = PublicKey(address_str)

    # Fail fast at startup if the account does not exist.
    immediate = mango.AccountInfo.load(context, address)
    if immediate is None:
        raise Exception(f"No account '{name}' at {address_str}.")

    account_subscription = mango.WebSocketAccountSubscription(context, address, lambda account_info: account_info)
    manager.add(account_subscription)

    # Seed the change stream with the state just loaded so the balance is
    # checked immediately rather than only after the first websocket update.
    on_change = account_subscription.publisher.pipe(rx.operators.start_with(immediate))

    # Reload periodically so a quiet websocket cannot hide a low balance.
    # NOTE(review): load() can return None (see the startup check above); a
    # None here would raise in log_account and be routed to the error reporter.
    on_timer = rx.interval(timer_limit).pipe(
        rx.operators.map(lambda _: mango.AccountInfo.load(context, address)))

    rx.merge(on_change, on_timer).pipe(
        rx.operators.observe_on(context.pool_scheduler),
        rx.operators.map(log_account),
        rx.operators.filter(account_fails_balance_check),
        # At most one notification per timer_limit seconds, as documented in
        # the --timer-limit help text (this was previously hard-coded to 20).
        rx.operators.throttle_first(timer_limit),
        rx.operators.catch(mango.observable_pipeline_error_reporter),
        rx.operators.retry()
    ).subscribe(notifier(name))
# Build the runtime context and the container for everything that must be
# disposed of at shutdown.
context = mango.ContextBuilder.from_command_line_parameters(args)

disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager()
disposer.add_disposable(manager)

# One subscription pipeline per --named-address argument.
for name_and_address in args.named_address:
    add_subscription_for_parameter(context, manager, args.timer_limit, name_and_address)

# Derive the websocket URL from the cluster URL by swapping the first "https"
# for "ws".
# NOTE(review): this turns "https://" into "ws://" rather than "wss://", and
# leaves a plain "http://" URL unchanged - confirm that this is intended.
websocket_url = context.cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item)
ws.ping_interval = 10

# Each pong is passed to a FileToucherObserver for the healthcheck file, and logged.
ws_pong_disposable = ws.pong.subscribe(mango.FileToucherObserver("/var/tmp/mango_healthcheck_ws_pong"))
disposer.add_disposable(ws_pong_disposable)
ws_pong_log_disposable = ws.pong.subscribe(on_next=lambda _: logging.info("Pong"))
disposer.add_disposable(ws_pong_log_disposable)

# Tell the notification targets whenever the websocket reports it is connecting.
ws_connecting_log_disposable = ws.connecting.subscribe(on_next=lambda _: send_notification("Re-connecting websocket"))
disposer.add_disposable(ws_connecting_log_disposable)

ws.open()

send_notification(f"Starting using context:\n{context}")

# Wait - don't exit. Exiting will be handled by signals/interrupts.
waiter = threading.Event()
try:
    waiter.wait()
except (KeyboardInterrupt, SystemExit):
    # An interrupt or exit request is the expected way to stop this script;
    # fall through to the orderly shutdown below.
    pass
finally:
    # Always release the websocket and the subscriptions, whatever ended the wait.
    logging.info("Shutting down...")
    ws.close()
    disposer.dispose()
    logging.info("Shutdown complete.")