mango-explorer/bin/watch-address

71 lines
1.9 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import logging
import os
import os.path
import rx
import rx.operators
import sys
import threading
from solana.publickey import PublicKey
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import mango # nopep8
parser = argparse.ArgumentParser(
description="Shows the on-chain data of a particular account."
)
mango.ContextBuilder.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser)
parser.add_argument(
"--address",
type=PublicKey,
required=True,
help="Address of the Solana account to watch",
)
parser.add_argument(
"--account-type",
type=str,
required=True,
help="Underlying object type of the data in the AccountInfo",
)
args: argparse.Namespace = mango.parse_args(parser)
with mango.ContextBuilder.from_command_line_parameters(
args
) as context, mango.Disposable() as disposer:
manager = mango.IndividualWebSocketSubscriptionManager(context)
disposer.add_disposable(manager)
converter = mango.build_account_info_converter(context, args.account_type)
converting_subscription = mango.WebSocketAccountSubscription(
context, args.address, converter
)
manager.add(converting_subscription)
publisher = converting_subscription.publisher.pipe(
rx.operators.filter(lambda item: isinstance(item, list) and len(item) > 0),
rx.operators.flat_map(
lambda item: rx.from_iterable(item)
if isinstance(item, list)
else rx.from_iterable([item])
),
)
subscription = publisher.subscribe(mango.PrintingObserverSubscriber(False))
disposer.add_disposable(subscription)
manager.open()
# Wait - don't exit. Exiting will be handled by signals/interrupts.
waiter = threading.Event()
try:
waiter.wait()
except:
pass
logging.info("Shutting down...")
logging.info("Shutdown complete.")