Refactored notifications.

This commit is contained in:
Geoff Taylor 2021-10-28 12:19:18 +01:00
parent feb04c7990
commit 27c5e676f3
13 changed files with 186 additions and 140 deletions

View File

@ -21,22 +21,21 @@ mango.ContextBuilder.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser) mango.Wallet.add_command_line_parameters(parser)
parser.add_argument("--address", type=PublicKey, parser.add_argument("--address", type=PublicKey,
help="Solana address of the Mango Markets margin account to be liquidated") help="Solana address of the Mango Markets margin account to be liquidated")
parser.add_argument("--notify-liquidations", type=mango.parse_subscription_target, action="append", default=[], parser.add_argument("--notify-liquidations", type=mango.parse_notification_target, action="append", default=[],
help="The notification target for liquidation events") help="The notification target for liquidation events")
parser.add_argument("--notify-successful-liquidations", type=mango.parse_subscription_target, parser.add_argument("--notify-successful-liquidations", type=mango.parse_notification_target,
action="append", default=[], help="The notification target for successful liquidation events") action="append", default=[], help="The notification target for successful liquidation events")
parser.add_argument("--notify-failed-liquidations", type=mango.parse_subscription_target, parser.add_argument("--notify-failed-liquidations", type=mango.parse_notification_target,
action="append", default=[], help="The notification target for failed liquidation events") action="append", default=[], help="The notification target for failed liquidation events")
parser.add_argument("--notify-errors", type=mango.parse_subscription_target, action="append", default=[], parser.add_argument("--notify-errors", type=mango.parse_notification_target, action="append", default=[],
help="The notification target for error events") help="The notification target for error events")
parser.add_argument("--dry-run", action="store_true", default=False, parser.add_argument("--dry-run", action="store_true", default=False,
help="runs as read-only and does not perform any transactions") help="runs as read-only and does not perform any transactions")
args: argparse.Namespace = mango.parse_args(parser) args: argparse.Namespace = mango.parse_args(parser)
for notify in args.notify_errors: handler = mango.NotificationHandler(mango.CompoundNotificationTarget(args.notify_errors))
handler = mango.NotificationHandler(notify) handler.setLevel(logging.ERROR)
handler.setLevel(logging.ERROR) logging.getLogger().addHandler(handler)
logging.getLogger().addHandler(handler)
try: try:
context = mango.ContextBuilder.from_command_line_parameters(args) context = mango.ContextBuilder.from_command_line_parameters(args)
@ -59,16 +58,17 @@ try:
logging.info("Wallet accounts OK.") logging.info("Wallet accounts OK.")
liquidations_publisher = mango.EventSource[mango.LiquidationEvent]() liquidations_publisher = mango.EventSource[mango.LiquidationEvent]()
for notification_target in args.notify_liquidations: liquidations_publisher.subscribe(on_next=mango.CompoundNotificationTarget(args.notify_liquidations).send)
liquidations_publisher.subscribe(on_next=notification_target.send)
for notification_target in args.notify_successful_liquidations: on_success = mango.FilteringNotificationTarget(
filtering = mango.FilteringNotificationTarget( mango.CompoundNotificationTarget(args.notify_successful_liquidations),
notification_target, lambda item: isinstance(item, mango.LiquidationEvent) and item.succeeded) lambda item: isinstance(item, mango.LiquidationEvent) and item.succeeded)
liquidations_publisher.subscribe(on_next=filtering.send) liquidations_publisher.subscribe(on_next=on_success.send)
for notification_target in args.notify_failed_liquidations:
filtering = mango.FilteringNotificationTarget(notification_target, lambda item: isinstance( on_failed = mango.FilteringNotificationTarget(
item, mango.LiquidationEvent) and not item.succeeded) mango.CompoundNotificationTarget(args.notify_failed_liquidations),
liquidations_publisher.subscribe(on_next=filtering.send) lambda item: isinstance(item, mango.LiquidationEvent) and not item.succeeded)
liquidations_publisher.subscribe(on_next=on_failed.send)
# TODO: Add proper liquidator classes here when they're written for V3 # TODO: Add proper liquidator classes here when they're written for V3
if args.dry_run: if args.dry_run:

View File

@ -34,22 +34,21 @@ parser.add_argument("--worthwhile-threshold", type=Decimal, default=Decimal("0.0
help="value a liquidation must be above to be carried out") help="value a liquidation must be above to be carried out")
parser.add_argument("--adjustment-factor", type=Decimal, default=Decimal("0.05"), parser.add_argument("--adjustment-factor", type=Decimal, default=Decimal("0.05"),
help="factor by which to adjust the SELL price (akin to maximum slippage)") help="factor by which to adjust the SELL price (akin to maximum slippage)")
parser.add_argument("--notify-liquidations", type=mango.parse_subscription_target, action="append", default=[], parser.add_argument("--notify-liquidations", type=mango.parse_notification_target, action="append", default=[],
help="The notification target for liquidation events") help="The notification target for liquidation events")
parser.add_argument("--notify-successful-liquidations", type=mango.parse_subscription_target, parser.add_argument("--notify-successful-liquidations", type=mango.parse_notification_target,
action="append", default=[], help="The notification target for successful liquidation events") action="append", default=[], help="The notification target for successful liquidation events")
parser.add_argument("--notify-failed-liquidations", type=mango.parse_subscription_target, parser.add_argument("--notify-failed-liquidations", type=mango.parse_notification_target,
action="append", default=[], help="The notification target for failed liquidation events") action="append", default=[], help="The notification target for failed liquidation events")
parser.add_argument("--notify-errors", type=mango.parse_subscription_target, action="append", default=[], parser.add_argument("--notify-errors", type=mango.parse_notification_target, action="append", default=[],
help="The notification target for error events") help="The notification target for error events")
parser.add_argument("--dry-run", action="store_true", default=False, parser.add_argument("--dry-run", action="store_true", default=False,
help="runs as read-only and does not perform any transactions") help="runs as read-only and does not perform any transactions")
args: argparse.Namespace = mango.parse_args(parser) args: argparse.Namespace = mango.parse_args(parser)
for notify in args.notify_errors: handler = mango.NotificationHandler(mango.CompoundNotificationTarget(args.notify_errors))
handler = mango.NotificationHandler(notify) handler.setLevel(logging.ERROR)
handler.setLevel(logging.ERROR) logging.getLogger().addHandler(handler)
logging.getLogger().addHandler(handler)
def start_subscriptions(context: mango.Context, liquidation_processor: mango.LiquidationProcessor, fetch_prices: typing.Callable[[typing.Any], typing.Any], fetch_accounts: typing.Callable[[typing.Any], typing.Any], throttle_reload_to_seconds: Decimal, throttle_ripe_update_to_seconds: Decimal): def start_subscriptions(context: mango.Context, liquidation_processor: mango.LiquidationProcessor, fetch_prices: typing.Callable[[typing.Any], typing.Any], fetch_accounts: typing.Callable[[typing.Any], typing.Any], throttle_reload_to_seconds: Decimal, throttle_ripe_update_to_seconds: Decimal):
@ -101,18 +100,17 @@ try:
logging.info("Wallet accounts OK.") logging.info("Wallet accounts OK.")
liquidations_publisher = mango.EventSource[mango.LiquidationEvent]() liquidations_publisher = mango.EventSource[mango.LiquidationEvent]()
for notification_target in args.notify_liquidations: liquidations_publisher.subscribe(on_next=mango.CompoundNotificationTarget(args.notify_liquidations).send)
liquidations_publisher.subscribe(on_next=notification_target.send)
for successful_notification_target in args.notify_successful_liquidations: on_success = mango.FilteringNotificationTarget(
captured_successful_notification_target = successful_notification_target mango.CompoundNotificationTarget(args.notify_successful_liquidations),
filtering_successful = mango.FilteringNotificationTarget( lambda item: isinstance(item, mango.LiquidationEvent) and item.succeeded)
captured_successful_notification_target, lambda item: isinstance(item, mango.LiquidationEvent) and item.succeeded) liquidations_publisher.subscribe(on_next=on_success.send)
liquidations_publisher.subscribe(on_next=filtering_successful.send)
for failed_notification_target in args.notify_failed_liquidations: on_failed = mango.FilteringNotificationTarget(
captured_failed_notification_target = failed_notification_target mango.CompoundNotificationTarget(args.notify_failed_liquidations),
filtering_failed = mango.FilteringNotificationTarget( lambda item: isinstance(item, mango.LiquidationEvent) and not item.succeeded)
captured_failed_notification_target, lambda item: isinstance(item, mango.LiquidationEvent) and not item.succeeded) liquidations_publisher.subscribe(on_next=on_failed.send)
liquidations_publisher.subscribe(on_next=filtering_failed.send)
# TODO: Add proper liquidator classes here when they're written for V3 # TODO: Add proper liquidator classes here when they're written for V3
if args.dry_run: if args.dry_run:

View File

@ -17,22 +17,21 @@ import mango # nopep8
parser = argparse.ArgumentParser(description="Run a single pass of the liquidator for a Mango Markets group.") parser = argparse.ArgumentParser(description="Run a single pass of the liquidator for a Mango Markets group.")
mango.ContextBuilder.add_command_line_parameters(parser) mango.ContextBuilder.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser) mango.Wallet.add_command_line_parameters(parser)
parser.add_argument("--notify-liquidations", type=mango.parse_subscription_target, action="append", default=[], parser.add_argument("--notify-liquidations", type=mango.parse_notification_target, action="append", default=[],
help="The notification target for liquidation events") help="The notification target for liquidation events")
parser.add_argument("--notify-successful-liquidations", type=mango.parse_subscription_target, parser.add_argument("--notify-successful-liquidations", type=mango.parse_notification_target,
action="append", default=[], help="The notification target for successful liquidation events") action="append", default=[], help="The notification target for successful liquidation events")
parser.add_argument("--notify-failed-liquidations", type=mango.parse_subscription_target, parser.add_argument("--notify-failed-liquidations", type=mango.parse_notification_target,
action="append", default=[], help="The notification target for failed liquidation events") action="append", default=[], help="The notification target for failed liquidation events")
parser.add_argument("--notify-errors", type=mango.parse_subscription_target, action="append", default=[], parser.add_argument("--notify-errors", type=mango.parse_notification_target, action="append", default=[],
help="The notification target for error events") help="The notification target for error events")
parser.add_argument("--dry-run", action="store_true", default=False, parser.add_argument("--dry-run", action="store_true", default=False,
help="runs as read-only and does not perform any transactions") help="runs as read-only and does not perform any transactions")
args: argparse.Namespace = mango.parse_args(parser) args: argparse.Namespace = mango.parse_args(parser)
for notify in args.notify_errors: handler = mango.NotificationHandler(mango.CompoundNotificationTarget(args.notify_errors))
handler = mango.NotificationHandler(notify) handler.setLevel(logging.ERROR)
handler.setLevel(logging.ERROR) logging.getLogger().addHandler(handler)
logging.getLogger().addHandler(handler)
try: try:
context = mango.ContextBuilder.from_command_line_parameters(args) context = mango.ContextBuilder.from_command_line_parameters(args)
@ -54,16 +53,17 @@ try:
logging.info("Wallet accounts OK.") logging.info("Wallet accounts OK.")
liquidations_publisher = mango.EventSource[mango.LiquidationEvent]() liquidations_publisher = mango.EventSource[mango.LiquidationEvent]()
for notification_target in args.notify_liquidations: liquidations_publisher.subscribe(on_next=mango.CompoundNotificationTarget(args.notify_liquidations).send)
liquidations_publisher.subscribe(on_next=notification_target.send)
for notification_target in args.notify_successful_liquidations: on_success = mango.FilteringNotificationTarget(
filtering = mango.FilteringNotificationTarget( mango.CompoundNotificationTarget(args.notify_successful_liquidations),
notification_target, lambda item: isinstance(item, mango.LiquidationEvent) and item.succeeded) lambda item: isinstance(item, mango.LiquidationEvent) and item.succeeded)
liquidations_publisher.subscribe(on_next=filtering.send) liquidations_publisher.subscribe(on_next=on_success.send)
for notification_target in args.notify_failed_liquidations:
filtering = mango.FilteringNotificationTarget(notification_target, lambda item: isinstance( on_failed = mango.FilteringNotificationTarget(
item, mango.LiquidationEvent) and not item.succeeded) mango.CompoundNotificationTarget(args.notify_failed_liquidations),
liquidations_publisher.subscribe(on_next=filtering.send) lambda item: isinstance(item, mango.LiquidationEvent) and not item.succeeded)
liquidations_publisher.subscribe(on_next=on_failed.send)
# TODO: Add proper liquidator classes here when they're written for V3 # TODO: Add proper liquidator classes here when they're written for V3
if args.dry_run: if args.dry_run:

View File

@ -48,16 +48,15 @@ parser.add_argument("--hedging-target-balance", type=mango.parse_fixed_target_ba
help="hedged balance to maintain - format is a token symbol plus target value, separated by a colon (e.g. 'ETH:2.5')") help="hedged balance to maintain - format is a token symbol plus target value, separated by a colon (e.g. 'ETH:2.5')")
parser.add_argument("--account-address", type=PublicKey, parser.add_argument("--account-address", type=PublicKey,
help="address of the specific account to use, if more than one available") help="address of the specific account to use, if more than one available")
parser.add_argument("--notify-errors", type=mango.parse_subscription_target, action="append", default=[], parser.add_argument("--notify-errors", type=mango.parse_notification_target, action="append", default=[],
help="The notification target for error events") help="The notification target for error events")
parser.add_argument("--dry-run", action="store_true", default=False, parser.add_argument("--dry-run", action="store_true", default=False,
help="runs as read-only and does not perform any transactions") help="runs as read-only and does not perform any transactions")
args: argparse.Namespace = mango.parse_args(parser) args: argparse.Namespace = mango.parse_args(parser)
for notify in args.notify_errors: handler = mango.NotificationHandler(mango.CompoundNotificationTarget(args.notify_errors))
handler = mango.NotificationHandler(notify) handler.setLevel(logging.ERROR)
handler.setLevel(logging.ERROR) logging.getLogger().addHandler(handler)
logging.getLogger().addHandler(handler)
def cleanup(context: mango.Context, wallet: mango.Wallet, account: mango.Account, market: mango.Market, dry_run: bool): def cleanup(context: mango.Context, wallet: mango.Wallet, account: mango.Account, market: mango.Market, dry_run: bool):

View File

@ -19,7 +19,7 @@ parser.add_argument("--address", type=PublicKey, required=True,
help="address of the account") help="address of the account")
parser.add_argument("--minimum-sol-balance", type=Decimal, default=Decimal("0.1"), parser.add_argument("--minimum-sol-balance", type=Decimal, default=Decimal("0.1"),
help="the minimum SOL balance required for the alert. A SOL balance less than this value will trigger a nifitication.") help="the minimum SOL balance required for the alert. A SOL balance less than this value will trigger a nifitication.")
parser.add_argument("--notify", type=mango.parse_subscription_target, action="append", default=[], parser.add_argument("--notify", type=mango.parse_notification_target, action="append", default=[],
help="The notification target for low balance events") help="The notification target for low balance events")
args: argparse.Namespace = mango.parse_args(parser) args: argparse.Namespace = mango.parse_args(parser)
@ -30,7 +30,7 @@ if account_info is None:
raise Exception(f"No account at '{args.address}'") raise Exception(f"No account at '{args.address}'")
else: else:
if account_info.sols < args.minimum_sol_balance: if account_info.sols < args.minimum_sol_balance:
notify: mango.NotificationTarget = mango.CompoundNotificationTarget(args.notify)
report = f"Account \"{args.name} [{args.address}]\" on {context.client.cluster_name} has only {account_info.sols} SOL, which is below the minimum required balance of {args.minimum_sol_balance} SOL." report = f"Account \"{args.name} [{args.address}]\" on {context.client.cluster_name} has only {account_info.sols} SOL, which is below the minimum required balance of {args.minimum_sol_balance} SOL."
for notify in args.notify:
notify.send(report) notify.send(report)
print(f"Notification sent: {report}") print(f"Notification sent: {report}")

View File

@ -5,8 +5,9 @@ import itertools
import logging import logging
import os import os
import os.path import os.path
import rx.operators as ops
import rx import rx
import rx.subject
import rx.operators as ops
import sys import sys
import traceback import traceback
import typing import typing
@ -29,22 +30,21 @@ parser.add_argument("--instruction-type", type=lambda ins: mango.InstructionType
help="The signature of the transaction to look up") help="The signature of the transaction to look up")
parser.add_argument("--sender", type=PublicKey, parser.add_argument("--sender", type=PublicKey,
help="Only transactions sent by this PublicKey will be returned") help="Only transactions sent by this PublicKey will be returned")
parser.add_argument("--notify-transactions", type=mango.parse_subscription_target, action="append", default=[], parser.add_argument("--notify-transactions", type=mango.parse_notification_target, action="append", default=[],
help="The notification target for transaction information") help="The notification target for transaction information")
parser.add_argument("--notify-successful-transactions", type=mango.parse_subscription_target, parser.add_argument("--notify-successful-transactions", type=mango.parse_notification_target,
action="append", default=[], help="The notification target for successful transactions") action="append", default=[], help="The notification target for successful transactions")
parser.add_argument("--notify-failed-transactions", type=mango.parse_subscription_target, parser.add_argument("--notify-failed-transactions", type=mango.parse_notification_target,
action="append", default=[], help="The notification target for failed transactions") action="append", default=[], help="The notification target for failed transactions")
parser.add_argument("--notify-errors", type=mango.parse_subscription_target, action="append", default=[], parser.add_argument("--notify-errors", type=mango.parse_notification_target, action="append", default=[],
help="The notification target for errors") help="The notification target for errors")
parser.add_argument("--summarise", action="store_true", default=False, parser.add_argument("--summarise", action="store_true", default=False,
help="create a short summary rather than the full TransactionScout details") help="create a short summary rather than the full TransactionScout details")
args: argparse.Namespace = mango.parse_args(parser) args: argparse.Namespace = mango.parse_args(parser)
for notify in args.notify_errors: handler = mango.NotificationHandler(mango.CompoundNotificationTarget(args.notify_errors))
handler = mango.NotificationHandler(notify) handler.setLevel(logging.ERROR)
handler.setLevel(logging.ERROR) logging.getLogger().addHandler(handler)
logging.getLogger().addHandler(handler)
def summariser(context: mango.Context) -> typing.Callable[[mango.TransactionScout], str]: def summariser(context: mango.Context) -> typing.Callable[[mango.TransactionScout], str]:
@ -102,7 +102,7 @@ try:
first_item_capturer = mango.CaptureFirstItem() first_item_capturer = mango.CaptureFirstItem()
signatures = mango.fetch_all_recent_transaction_signatures(context) signatures = mango.fetch_all_recent_transaction_signatures(context)
oldest_first = reversed(list(itertools.takewhile(lambda sig: sig != since_signature, signatures))) oldest_first = reversed(list(itertools.takewhile(lambda sig: sig != since_signature, signatures)))
pipeline = rx.from_(oldest_first).pipe( pipeline: rx.core.Observable = rx.from_(oldest_first).pipe(
ops.map(first_item_capturer.capture_if_first), ops.map(first_item_capturer.capture_if_first),
# ops.map(debug_print_item("Signature:")), # ops.map(debug_print_item("Signature:")),
ops.map(lambda sig: mango.TransactionScout.load_if_available(context, sig)), ops.map(lambda sig: mango.TransactionScout.load_if_available(context, sig)),
@ -127,17 +127,17 @@ try:
fan_out = rx.subject.Subject() fan_out = rx.subject.Subject()
fan_out.subscribe(mango.PrintingObserverSubscriber(False)) fan_out.subscribe(mango.PrintingObserverSubscriber(False))
fan_out.subscribe(on_next=mango.CompoundNotificationTarget(args.notify_transactions).send)
for notify in args.notify_transactions: on_success = mango.FilteringNotificationTarget(
fan_out.subscribe(on_next=notify.send) mango.CompoundNotificationTarget(args.notify_successful_transactions),
for notification_target in args.notify_successful_transactions: lambda item: isinstance(item, mango.TransactionScout) and item.succeeded)
filtering = mango.FilteringNotificationTarget( fan_out.subscribe(on_next=on_success.send)
notification_target, lambda item: isinstance(item, mango.TransactionScout) and item.succeeded)
fan_out.subscribe(on_next=filtering.send) on_failed = mango.FilteringNotificationTarget(
for notification_target in args.notify_failed_transactions: mango.CompoundNotificationTarget(args.notify_failed_transactions),
filtering = mango.FilteringNotificationTarget(notification_target, lambda item: isinstance( lambda item: isinstance(item, mango.TransactionScout) and not item.succeeded)
item, mango.TransactionScout) and not item.succeeded) fan_out.subscribe(on_next=on_failed.send)
fan_out.subscribe(on_next=filtering.send)
pipeline.subscribe(fan_out) pipeline.subscribe(fan_out)

View File

@ -14,13 +14,13 @@ import mango # nopep8
# We explicitly want argument parsing to be outside the main try-except block because some arguments # We explicitly want argument parsing to be outside the main try-except block because some arguments
# (like --help) will cause an exit, which our except: block traps. # (like --help) will cause an exit, which our except: block traps.
parser = argparse.ArgumentParser(description="Sends SOL to a different address.") parser = argparse.ArgumentParser(description="Sends SOL to a different address.")
parser.add_argument("--notification-target", type=mango.parse_subscription_target, required=True, action="append", parser.add_argument("--notification-target", type=mango.parse_notification_target, required=True, action="append",
help="The notification target - a compound string that varies depending on the target") help="The notification target - a compound string that varies depending on the target")
parser.add_argument("--message", type=str, help="Message to send") parser.add_argument("--message", type=str, help="Message to send")
args: argparse.Namespace = mango.parse_args(parser) args: argparse.Namespace = mango.parse_args(parser)
try: try:
for notify in args.notification_target: notify: mango.NotificationTarget = mango.CompoundNotificationTarget(args.notification_target)
print("Sending to:", notify) print("Sending to:", notify)
notify.send(args.message) notify.send(args.message)

View File

@ -22,11 +22,11 @@ mango.ContextBuilder.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser) mango.Wallet.add_command_line_parameters(parser)
parser.add_argument("--address", type=PublicKey, action="append", default=[], required=True, parser.add_argument("--address", type=PublicKey, action="append", default=[], required=True,
help="Address of the Solana account to watch (can be specified multiple times)") help="Address of the Solana account to watch (can be specified multiple times)")
parser.add_argument("--notify", type=mango.parse_subscription_target, action="append", default=[], parser.add_argument("--notify", type=mango.parse_notification_target, action="append", default=[],
help="The notification target for all liquidation events") help="The notification target for all liquidation events")
parser.add_argument("--notify-successful", type=mango.parse_subscription_target, parser.add_argument("--notify-successful", type=mango.parse_notification_target,
action="append", default=[], help="The notification target for successful liquidations") action="append", default=[], help="The notification target for successful liquidations")
parser.add_argument("--notify-failed", type=mango.parse_subscription_target, parser.add_argument("--notify-failed", type=mango.parse_notification_target,
action="append", default=[], help="The notification target for failed liquidations") action="append", default=[], help="The notification target for failed liquidations")
args: argparse.Namespace = mango.parse_args(parser) args: argparse.Namespace = mango.parse_args(parser)

View File

@ -28,27 +28,20 @@ parser.add_argument("--minimum-sol-balance", type=Decimal, default=Decimal("0.1"
help="the minimum SOL balance required for the alert. A SOL balance less than this value will trigger a nofitication.") help="the minimum SOL balance required for the alert. A SOL balance less than this value will trigger a nofitication.")
parser.add_argument("--timer-limit", type=int, default=(60 * 60), parser.add_argument("--timer-limit", type=int, default=(60 * 60),
help="notifications for an account will be sent at most once per timer-limit seconds, and accounts will be polled once per timer-limit seconds irrespective of websocket activity") help="notifications for an account will be sent at most once per timer-limit seconds, and accounts will be polled once per timer-limit seconds irrespective of websocket activity")
parser.add_argument("--notify", type=mango.parse_subscription_target, action="append", default=[], parser.add_argument("--notify", type=mango.parse_notification_target, action="append", default=[],
help="The notification target for low balance events")
parser.add_argument("--notify-events", type=mango.parse_subscription_target, action="append", default=[],
help="The notification target for low balance events") help="The notification target for low balance events")
parser.add_argument("--notify-events", type=mango.parse_notification_target, action="append", default=[],
help="The notification target for startup events")
args: argparse.Namespace = mango.parse_args(parser) args: argparse.Namespace = mango.parse_args(parser)
notify_balance: mango.NotificationTarget = mango.CompoundNotificationTarget(args.notify)
def send_balance_notification(message: str) -> None: notify_event: mango.NotificationTarget = mango.CompoundNotificationTarget(args.notify_events)
for notify in args.notify:
notify.send(f"[{args.name}] {message}")
def send_event_notification(message: str) -> None:
for notify in args.notify_events:
notify.send(f"[{args.name}] {message}")
def notifier(name: str) -> typing.Callable[[mango.AccountInfo], None]: def notifier(name: str) -> typing.Callable[[mango.AccountInfo], None]:
def notify(account_info: mango.AccountInfo) -> None: def notify(account_info: mango.AccountInfo) -> None:
report = f"Account \"{name} [{account_info.address}]\" on {context.client.cluster_name} has only {account_info.sols} SOL, which is below the minimum required balance of {args.minimum_sol_balance} SOL." report = f"Account \"{name} [{account_info.address}]\" on {context.client.cluster_name} has only {account_info.sols} SOL, which is below the minimum required balance of {args.minimum_sol_balance} SOL."
send_balance_notification(report) notify_balance.send(f"[{args.name}] {report}")
print(f"Notification sent: {report}") print(f"Notification sent: {report}")
return notify return notify
@ -102,7 +95,7 @@ for name_and_address in args.named_address:
manager.open() manager.open()
send_event_notification(f"Starting using context:\n{context}") notify_event.send(f"[{args.name}] Starting using context:\n{context}")
# Wait - don't exit. Exiting will be handled by signals/interrupts. # Wait - don't exit. Exiting will be handled by signals/interrupts.
waiter = threading.Event() waiter = threading.Event()

View File

@ -40,7 +40,7 @@ from .marketlookup import MarketLookup, NullMarketLookup, CompoundMarketLookup
from .marketoperations import MarketOperations, DryRunMarketOperations from .marketoperations import MarketOperations, DryRunMarketOperations
from .metadata import Metadata from .metadata import Metadata
from .modelstate import ModelState from .modelstate import ModelState
from .notification import NotificationTarget, TelegramNotificationTarget, DiscordNotificationTarget, MailjetNotificationTarget, CsvFileNotificationTarget, FilteringNotificationTarget, NotificationHandler, parse_subscription_target from .notification import NotificationTarget, TelegramNotificationTarget, DiscordNotificationTarget, MailjetNotificationTarget, CsvFileNotificationTarget, FilteringNotificationTarget, ConsoleNotificationTarget, CompoundNotificationTarget, parse_notification_target, NotificationHandler
from .observables import DisposePropagator, DisposeWrapper, NullObserverSubscriber, PrintingObserverSubscriber, TimestampedPrintingObserverSubscriber, CollectingObserverSubscriber, LatestItemObserverSubscriber, CaptureFirstItem, FunctionObserver, create_backpressure_skipping_observer, debug_print_item, log_subscription_error, observable_pipeline_error_reporter, EventSource from .observables import DisposePropagator, DisposeWrapper, NullObserverSubscriber, PrintingObserverSubscriber, TimestampedPrintingObserverSubscriber, CollectingObserverSubscriber, LatestItemObserverSubscriber, CaptureFirstItem, FunctionObserver, create_backpressure_skipping_observer, debug_print_item, log_subscription_error, observable_pipeline_error_reporter, EventSource
from .openorders import OpenOrders from .openorders import OpenOrders
from .oracle import OracleSource, Price, Oracle, OracleProvider, SupportedOracleFeature from .oracle import OracleSource, Price, Oracle, OracleProvider, SupportedOracleFeature

View File

@ -53,6 +53,9 @@ class NotificationTarget(metaclass=abc.ABCMeta):
def send_notification(self, item: typing.Any) -> None: def send_notification(self, item: typing.Any) -> None:
raise NotImplementedError("NotificationTarget.send() is not implemented on the base type.") raise NotImplementedError("NotificationTarget.send() is not implemented on the base type.")
def __str__(self) -> str:
return "« 𝙽𝚘𝚝𝚒𝚏𝚒𝚌𝚊𝚝𝚒𝚘𝚗𝚃𝚊𝚛𝚐𝚎𝚝 »"
def __repr__(self) -> str: def __repr__(self) -> str:
return f"{self}" return f"{self}"
@ -89,7 +92,7 @@ class TelegramNotificationTarget(NotificationTarget):
requests.post(url, json=payload, headers=headers) requests.post(url, json=payload, headers=headers)
def __str__(self) -> str: def __str__(self) -> str:
return f"Telegram chat ID: {self.chat_id}" return f"« 𝚃𝚎𝚕𝚎𝚐𝚛𝚊𝚖𝙽𝚘𝚝𝚒𝚏𝚒𝚌𝚊𝚝𝚒𝚘𝚗𝚃𝚊𝚛𝚐𝚎𝚝 Chat ID: {self.chat_id} »"
# # 🥭 DiscordNotificationTarget class # # 🥭 DiscordNotificationTarget class
@ -110,7 +113,7 @@ class DiscordNotificationTarget(NotificationTarget):
requests.post(url, json=payload, headers=headers) requests.post(url, json=payload, headers=headers)
def __str__(self) -> str: def __str__(self) -> str:
return "Discord webhook" return f"« 𝙳𝚒𝚜𝚌𝚘𝚛𝚍𝙽𝚘𝚝𝚒𝚏𝚒𝚌𝚊𝚝𝚒𝚘𝚗𝚃𝚊𝚛𝚐𝚎𝚝 Address: {self.address} »"
# # 🥭 MailjetNotificationTarget class # # 🥭 MailjetNotificationTarget class
@ -201,7 +204,7 @@ class MailjetNotificationTarget(NotificationTarget):
requests.post(url, json=payload, headers=headers, auth=(self.api_key, self.api_secret)) requests.post(url, json=payload, headers=headers, auth=(self.api_key, self.api_secret))
def __str__(self) -> str: def __str__(self) -> str:
return f"Mailjet notifications to '{self.to_name}' '{self.to_address}' with subject '{self.subject}'" return f"« 𝙼𝚊𝚒𝚕𝚓𝚎𝚝𝙽𝚘𝚝𝚒𝚏𝚒𝚌𝚊𝚝𝚒𝚘𝚗𝚃𝚊𝚛𝚐𝚎𝚝 To: '{self.to_name}' '{self.to_address}' with subject '{self.subject}' »"
# # 🥭 CsvFileNotificationTarget class # # 🥭 CsvFileNotificationTarget class
@ -233,14 +236,14 @@ class CsvFileNotificationTarget(NotificationTarget):
with open(self.filename, "a") as csvfile: with open(self.filename, "a") as csvfile:
result = "Succeeded" if event.succeeded else "Failed" result = "Succeeded" if event.succeeded else "Failed"
row_data = [event.timestamp, event.liquidator_name, event.group_name, result, row_data = [event.timestamp, event.liquidator_name, event.group_name, result,
event.signatures, event.wallet_address, event.account_address] " ".join(event.signatures), event.wallet_address, event.account_address]
for change in event.changes: for change in event.changes:
row_data += [f"{change.value:.8f}", change.token.name] row_data += [f"{change.value:.8f}", change.token.name]
file_writer = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL) file_writer = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL)
file_writer.writerow(row_data) file_writer.writerow(row_data)
def __str__(self) -> str: def __str__(self) -> str:
return f"CSV notifications to file {self.filename}" return f"« 𝙲𝚜𝚟𝙵𝚒𝚕𝚎𝙽𝚘𝚝𝚒𝚏𝚒𝚌𝚊𝚝𝚒𝚘𝚗𝚃𝚊𝚛𝚐𝚎𝚝 File: {self.filename} »"
# # 🥭 FilteringNotificationTarget class # # 🥭 FilteringNotificationTarget class
@ -259,7 +262,80 @@ class FilteringNotificationTarget(NotificationTarget):
self.inner_notifier.send_notification(item) self.inner_notifier.send_notification(item)
def __str__(self) -> str: def __str__(self) -> str:
return f"Filtering notification target for '{self.inner_notifier}'" return f"« 𝙵𝚒𝚕𝚝𝚎𝚛𝚒𝚗𝚐𝙽𝚘𝚝𝚒𝚏𝚒𝚌𝚊𝚝𝚒𝚘𝚗𝚃𝚊𝚛𝚐𝚎𝚝 For: {self.inner_notifier} »"
# # 🥭 ConsoleNotificationTarget class
#
# The `ConsoleNotificationTarget` prints messages on the console.
#
class ConsoleNotificationTarget(NotificationTarget):
def __init__(self, name):
super().__init__()
self.name = name
def send_notification(self, item: typing.Any) -> None:
print(self.name, item)
def __str__(self) -> str:
return "« 𝙲𝚘𝚗𝚜𝚘𝚕𝚎𝙽𝚘𝚝𝚒𝚏𝚒𝚌𝚊𝚝𝚒𝚘𝚗𝚃𝚊𝚛𝚐𝚎𝚝 »"
# # 🥭 CompoundNotificationTarget class
#
# The `CompoundNotificationTarget` acts as a single `NotificationTarget`, sending notifications to all
# inner `NotificationTarget`s.
#
class CompoundNotificationTarget(NotificationTarget):
def __init__(self, targets: typing.Sequence[NotificationTarget]):
super().__init__()
self.targets: typing.Sequence[NotificationTarget] = targets
self.in_exception_handler: bool = False
def send_notification(self, item: typing.Any) -> None:
for target in self.targets:
try:
target.send(item)
except Exception as exception:
if not self.in_exception_handler:
self.in_exception_handler = True
self.logger.error(f"Failed to send notification to: {target} - {exception}")
finally:
self.in_exception_handler = False
def __str__(self) -> str:
inner: typing.List[str] = []
for target in self.targets:
inner += [f"{target}"]
inner_text: str = "\n ".join(inner)
return f"""« 𝙲𝚘𝚖𝚙𝚘𝚞𝚗𝚍𝙽𝚘𝚝𝚒𝚏𝚒𝚌𝚊𝚝𝚒𝚘𝚗𝚃𝚊𝚛𝚐𝚎𝚝 with {len(self.targets)} inner targets:
{inner_text}
»"""
# # 🥭 parse_notification_target() function
#
# `parse_notification_target()` takes a parameter as a string and returns a notification
# target.
#
# This is most likely used when parsing command-line arguments - this function can be used
# in the `type` parameter of an `add_argument()` call.
#
def parse_notification_target(target):
protocol, destination = target.split(":", 1)
if protocol == "telegram":
return TelegramNotificationTarget(destination)
elif protocol == "discord":
return DiscordNotificationTarget(destination)
elif protocol == "mailjet":
return MailjetNotificationTarget(destination)
elif protocol == "csvfile":
return CsvFileNotificationTarget(destination)
elif protocol == "console":
return ConsoleNotificationTarget(destination)
else:
raise Exception(f"Unknown protocol: {protocol}")
# # 🥭 NotificationHandler class # # 🥭 NotificationHandler class
@ -280,25 +356,5 @@ class NotificationHandler(logging.StreamHandler):
message = self.format(record) message = self.format(record)
self.target.send_notification(message) self.target.send_notification(message)
def __str__(self) -> str:
# # 🥭 parse_subscription_target() function return "« 𝙽𝚘𝚝𝚒𝚏𝚒𝚌𝚊𝚝𝚒𝚘𝚗𝙷𝚊𝚗𝚍𝚕𝚎𝚛 »"
#
# `parse_subscription_target()` takes a parameter as a string and returns a notification
# target.
#
# This is most likely used when parsing command-line arguments - this function can be used
# in the `type` parameter of an `add_argument()` call.
#
def parse_subscription_target(target):
protocol, destination = target.split(":", 1)
if protocol == "telegram":
return TelegramNotificationTarget(destination)
elif protocol == "discord":
return DiscordNotificationTarget(destination)
elif protocol == "mailjet":
return MailjetNotificationTarget(destination)
elif protocol == "csvfile":
return CsvFileNotificationTarget(destination)
else:
raise Exception(f"Unknown protocol: {protocol}")

View File

@ -79,7 +79,7 @@ class LiquidityMiningInfo:
proportion_distributed = mngo_distributed.value / self.mngo_per_period.value proportion_distributed = mngo_distributed.value / self.mngo_per_period.value
estimated_duration_seconds = (elapsed_seconds / float(proportion_distributed)) estimated_duration_seconds = (elapsed_seconds / float(proportion_distributed))
estimated_duration = timedelta(seconds=int(estimated_duration_seconds)) estimated_duration = timedelta(seconds=int(estimated_duration_seconds))
estimated_remaining_seconds: float = estimated_duration_seconds - elapsed_seconds estimated_remaining_seconds = estimated_duration_seconds - elapsed_seconds
estimated_remaining = timedelta(seconds=int(estimated_remaining_seconds)) estimated_remaining = timedelta(seconds=int(estimated_remaining_seconds))
estimated_end = now + estimated_remaining estimated_end = now + estimated_remaining
return f"""« 𝙻𝚒𝚚𝚞𝚒𝚍𝚒𝚝𝚢𝙼𝚒𝚗𝚒𝚗𝚐𝙸𝚗𝚏𝚘 {self.version} return f"""« 𝙻𝚒𝚚𝚞𝚒𝚍𝚒𝚝𝚢𝙼𝚒𝚗𝚒𝚗𝚐𝙸𝚗𝚏𝚘 {self.version}

View File

@ -82,18 +82,18 @@ def test_filtering_notification_target():
assert(mock.send_notification_called) assert(mock.send_notification_called)
def test_parse_subscription_target(): def test_parse_notification_target():
telegram_target = mango.parse_subscription_target( telegram_target = mango.parse_notification_target(
"telegram:012345678@9876543210:ABCDEFGHijklmnop-qrstuvwxyzABCDEFGH") "telegram:012345678@9876543210:ABCDEFGHijklmnop-qrstuvwxyzABCDEFGH")
assert telegram_target is not None assert telegram_target is not None
discord_target = mango.parse_subscription_target( discord_target = mango.parse_notification_target(
"discord:https://discord.com/api/webhooks/012345678901234567/ABCDE_fghij-KLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMN") "discord:https://discord.com/api/webhooks/012345678901234567/ABCDE_fghij-KLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMN")
assert discord_target is not None assert discord_target is not None
mailjet_target = mango.parse_subscription_target( mailjet_target = mango.parse_notification_target(
"mailjet:user:secret:subject:from%20name:from@address:to%20name%20with%20colon%3A:to@address") "mailjet:user:secret:subject:from%20name:from@address:to%20name%20with%20colon%3A:to@address")
assert mailjet_target is not None assert mailjet_target is not None
csvfile_target = mango.parse_subscription_target("csvfile:filename.csv") csvfile_target = mango.parse_notification_target("csvfile:filename.csv")
assert csvfile_target is not None assert csvfile_target is not None