mango-explorer/bin/watch-liquidations

78 lines
2.8 KiB
Plaintext
Executable File

#!/usr/bin/env pyston3
import argparse
import logging
import os
import os.path
import rx
import rx.subject
import rx.operators
import sys
import threading
from solana.publickey import PublicKey
from solana.rpc.commitment import Max
sys.path.insert(0, os.path.abspath(
os.path.join(os.path.dirname(__file__), "..")))
import mango # nopep8
# Command-line interface. ContextBuilder and Wallet each get a chance to
# register options on the parser before the script-specific ones are added.
parser = argparse.ArgumentParser(description="Show program logs for an account, as they arrive.")
mango.ContextBuilder.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser)

# At least one --address is mandatory; repeated occurrences accumulate in a list.
parser.add_argument(
    "--address",
    type=PublicKey,
    action="append",
    default=[],
    required=True,
    help="Address of the Solana account to watch (can be specified multiple times)",
)

# The notification options differ only in flag name and help text, so they
# are registered from a table. Each is optional and repeatable.
# NOTE(review): none of these targets is read in the visible part of this
# script - confirm whether they are meant to be wired into the pipeline.
for _flag, _help_text in (
    ("--notify", "The notification target for all liquidation events"),
    ("--notify-successful", "The notification target for successful liquidations"),
    ("--notify-failed", "The notification target for failed liquidations"),
):
    parser.add_argument(
        _flag,
        type=mango.parse_subscription_target,
        action="append",
        default=[],
        help=_help_text,
    )

args = parser.parse_args()
# Apply the requested log level to the root logger, then show the disclaimer.
# `args.log_level` is presumably registered by one of the
# add_command_line_parameters() calls - it is not defined in this script.
root_logger = logging.getLogger()
root_logger.setLevel(args.log_level)
logging.warning(mango.WARNING_DISCLAIMER_TEXT)

# Build the context from the parsed arguments and set the client's
# commitment to `Max` before anything uses it.
context = mango.ContextBuilder.from_command_line_parameters(args)
context.client.commitment = Max
print(context)

# Everything registered with the disposer is disposed together at shutdown.
disposer = mango.DisposePropagator()
manager = mango.IndividualWebSocketSubscriptionManager(context)
disposer.add_disposable(manager)
health_check = mango.HealthCheck()
disposer.add_disposable(health_check)

# One log subscription per watched address; keep each subscription's
# publisher so the streams can be merged later.
publishers = []
for watched_address in args.address:
    subscription = mango.WebSocketLogSubscription(context, watched_address)
    manager.add(subscription)
    publishers.append(subscription.publisher)
def _load_transaction(log_event):
    """Load the TransactionScout for the first signature of `log_event`."""
    return mango.TransactionScout.load(context, log_event.signatures[0])


def _is_present(item):
    """Return True for every item except None."""
    return item is not None


# A fresh subject acts as the root of the pipeline; the per-address log
# publishers are merged into its stream by the first operator.
publisher = rx.subject.Subject()
pipeline_stages = [
    rx.operators.merge(*publishers),
    # Disabled stages, kept for reference:
    # rx.operators.filter(lambda log_event: "PartialLiquidate" in "\n".join(log_event.logs)),
    # rx.operators.map(mango.debug_print_item("Transaction")),
    # rx.operators.delay(30), # Wait for the transaction to be fully confirmed
    # rx.operators.map(mango.debug_print_item("After Delay")),
    rx.operators.map(_load_transaction),
    rx.operators.filter(_is_present),
    # Errors raised upstream are handed to the shared error reporter, and
    # retry() then resubscribes to the source.
    rx.operators.catch(mango.observable_pipeline_error_reporter),
    rx.operators.retry(),
]
transaction_stream = publisher.pipe(*pipeline_stages)
transaction_stream.subscribe(mango.PrintingObserverSubscriber(False))
# Open the subscriptions that were registered with the manager.
manager.open()

# Wait - don't exit. Exiting will be handled by signals/interrupts.
# The event is never set, so wait() only returns by raising.
waiter = threading.Event()
try:
    waiter.wait()
except (KeyboardInterrupt, SystemExit):
    # Expected ways of stopping the script: fall through to a clean shutdown.
    pass
except Exception:
    # Anything else is unexpected. Record it instead of silently discarding
    # it, but still shut down cleanly rather than crash.
    logging.exception("Unexpected error while waiting to shut down.")
finally:
    # Runs whichever way the wait ended, so registered disposables are
    # always released.
    logging.info("Shutting down...")
    disposer.dispose()
    logging.info("Shutdown complete.")