Some work on streaming logs and account/program events.

This commit is contained in:
Geoff Taylor 2021-07-26 12:19:13 +01:00
parent eb244a2167
commit 072252095a
14 changed files with 518 additions and 176 deletions

52
bin/log-subscribe Executable file
View File

@ -0,0 +1,52 @@
#!/usr/bin/env pyston3
import argparse
import logging
import os
import os.path
import sys
import threading
from solana.publickey import PublicKey
sys.path.insert(0, os.path.abspath(
os.path.join(os.path.dirname(__file__), "..")))
import mango # nopep8
parser = argparse.ArgumentParser(description="Show program logs for an account, as they arrive.")
mango.ContextBuilder.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser)
parser.add_argument("--address", type=PublicKey, required=True, help="Address of the Solana account to watch")
args = parser.parse_args()
logging.getLogger().setLevel(args.log_level)
logging.warning(mango.WARNING_DISCLAIMER_TEXT)
context = mango.ContextBuilder.from_command_line_parameters(args)
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager()
disposer.add_disposable(manager)
log_subscription = mango.WebSocketLogSubscription(context, args.address)
manager.add(log_subscription)
publisher = log_subscription.publisher
publisher.subscribe(mango.PrintingObserverSubscriber(False))
websocket_url = context.cluster_url.replace("https", "wss", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item)
ws.ping_interval = 10
ws.open()
# Wait - don't exit. Exiting will be handled by signals/interrupts.
waiter = threading.Event()
try:
waiter.wait()
except:
pass
logging.info("Shutting down...")
ws.close()
disposer.dispose()
logging.info("Shutdown complete.")

View File

@ -68,7 +68,7 @@ def add_file_health(name: str, observable: rx.core.typing.Observable[typing.Any]
def build_latest_group_observer(context: mango.Context, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, group: mango.Group) -> mango.LatestItemObserverSubscriber[mango.Group]:
group_subscription = mango.WebSocketSubscription[mango.Group](
group_subscription = mango.WebSocketAccountSubscription[mango.Group](
context, group.address, lambda account_info: mango.Group.parse(context, account_info))
manager.add(group_subscription)
latest_group_observer = mango.LatestItemObserverSubscriber(group)
@ -78,7 +78,7 @@ def build_latest_group_observer(context: mango.Context, manager: mango.WebSocket
def build_latest_account_observer(context: mango.Context, account: mango.Account, manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, group_observer: mango.LatestItemObserverSubscriber[mango.Group]) -> typing.Tuple[mango.WebSocketSubscription[mango.Account], mango.LatestItemObserverSubscriber[mango.Account]]:
account_subscription = mango.WebSocketSubscription[mango.Account](
account_subscription = mango.WebSocketAccountSubscription[mango.Account](
context, account.address, lambda account_info: mango.Account.parse(context, account_info, group_observer.latest))
manager.add(account_subscription)
latest_account_observer = mango.LatestItemObserverSubscriber(account)
@ -93,7 +93,7 @@ def build_latest_spot_open_orders_observer(manager: mango.WebSocketSubscriptionM
if spot_open_orders_address is None:
raise Exception(f"No spot OpenOrders for market {spot_market.symbol}.")
spot_open_orders_subscription = mango.WebSocketSubscription[mango.OpenOrders](
spot_open_orders_subscription = mango.WebSocketAccountSubscription[mango.OpenOrders](
context, spot_open_orders_address, lambda account_info: mango.OpenOrders.parse(account_info, spot_market.base.decimals, spot_market.quote.decimals))
manager.add(spot_open_orders_subscription)
initial_spot_open_orders = mango.OpenOrders.load(
@ -124,7 +124,7 @@ def build_latest_serum_open_orders_observer(manager: mango.WebSocketSubscription
initial_serum_open_orders = mango.OpenOrders.load(
context, open_orders_address, serum_market.base.decimals, serum_market.quote.decimals)
serum_open_orders_subscription = mango.WebSocketSubscription[mango.OpenOrders](
serum_open_orders_subscription = mango.WebSocketAccountSubscription[mango.OpenOrders](
context, open_orders_address, lambda account_info: mango.OpenOrders.parse(account_info, serum_market.base.decimals, serum_market.quote.decimals))
manager.add(serum_open_orders_subscription)
@ -152,7 +152,7 @@ def build_latest_perp_open_orders_observer(disposer: mango.DisposePropagator, pe
def build_latest_perp_market_observer(manager: mango.WebSocketSubscriptionManager, disposer: mango.DisposePropagator, perp_market_info: mango.PerpMarketInfo, initial_perp_market_details: mango.PerpMarketDetails, group_observer: mango.LatestItemObserverSubscriber[mango.Group]) -> mango.LatestItemObserverSubscriber[mango.PerpMarketDetails]:
perp_market_subscription = mango.WebSocketSubscription[mango.PerpMarketDetails](
perp_market_subscription = mango.WebSocketAccountSubscription[mango.PerpMarketDetails](
context, perp_market_info.address, lambda account_info: mango.PerpMarketDetails.parse(account_info, group_observer.latest))
manager.add(perp_market_subscription)
latest_perp_market_observer = mango.LatestItemObserverSubscriber(initial_perp_market_details)

View File

@ -24,7 +24,7 @@ parser = argparse.ArgumentParser(
mango.ContextBuilder.add_command_line_parameters(parser)
parser.add_argument("--since-state-filename", type=str, default="report.state",
help="The name of the state file containing the signature of the last transaction looked up")
parser.add_argument("--instruction-type", type=lambda ins: mango.InstructionType[ins],
parser.add_argument("--instruction-type", type=lambda ins: mango.InstructionType[ins], required=True,
choices=list(mango.InstructionType),
help="The signature of the transaction to look up")
parser.add_argument("--sender", type=PublicKey,
@ -60,7 +60,7 @@ def summariser(context: mango.Context) -> typing.Callable[[mango.TransactionScou
instruction_details += [f"[{ins.instruction_type.name}]"]
else:
instruction_details += [f"[{ins.instruction_type.name}: {params}]"]
target = ins.describe_target()
target = ins.target_account
if target is not None:
instruction_targets += [str(target)]

View File

@ -40,7 +40,7 @@ manager = mango.WebSocketSubscriptionManager()
disposer.add_disposable(manager)
if args.account_type.upper() == "ACCOUNTINFO":
raw_subscription = mango.WebSocketSubscription(
raw_subscription = mango.WebSocketAccountSubscription(
context, args.address, lambda account_info: account_info)
manager.add(raw_subscription)
publisher: rx.Observable = raw_subscription.publisher
@ -61,13 +61,13 @@ elif args.account_type.upper() == "PERPEVENTS":
return splitter.split(perp_event_queue)
converter: typing.Callable[[mango.AccountInfo], typing.Any] = _split_events
event_splitting_subscription = mango.WebSocketSubscription(
event_splitting_subscription = mango.WebSocketAccountSubscription(
context, args.address, lambda acc: acc)
manager.add(event_splitting_subscription)
publisher = event_splitting_subscription.publisher.pipe(rx.operators.flat_map(_split_events))
else:
converter = mango.build_account_info_converter(context, args.account_type)
converting_subscription = mango.WebSocketSubscription(
converting_subscription = mango.WebSocketAccountSubscription(
context, args.address, converter)
manager.add(converting_subscription)
publisher = converting_subscription.publisher

83
bin/watch-liquidations Executable file
View File

@ -0,0 +1,83 @@
#!/usr/bin/env pyston3
import argparse
import logging
import os
import os.path
import rx
import rx.subject
import rx.operators
import sys
import threading
from solana.publickey import PublicKey
from solana.rpc.commitment import Max
sys.path.insert(0, os.path.abspath(
os.path.join(os.path.dirname(__file__), "..")))
import mango # nopep8
parser = argparse.ArgumentParser(description="Show program logs for an account, as they arrive.")
mango.ContextBuilder.add_command_line_parameters(parser)
mango.Wallet.add_command_line_parameters(parser)
parser.add_argument("--address", type=PublicKey, action="append", default=[], required=True,
help="Address of the Solana account to watch (can be specified multiple times)")
parser.add_argument("--notify", type=mango.parse_subscription_target, action="append", default=[],
help="The notification target for all liquidation events")
parser.add_argument("--notify-successful", type=mango.parse_subscription_target,
action="append", default=[], help="The notification target for successful liquidations")
parser.add_argument("--notify-failed", type=mango.parse_subscription_target,
action="append", default=[], help="The notification target for failed liquidations")
args = parser.parse_args()
logging.getLogger().setLevel(args.log_level)
logging.warning(mango.WARNING_DISCLAIMER_TEXT)
context = mango.ContextBuilder.from_command_line_parameters(args)
context.commitment = Max
print(context)
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager()
disposer.add_disposable(manager)
publishers = []
for address in args.address:
log_subscription = mango.WebSocketLogSubscription(context, address)
manager.add(log_subscription)
publishers += [log_subscription.publisher]
publisher = rx.subject.Subject()
publisher.pipe(
rx.operators.merge(*publishers),
# rx.operators.filter(lambda log_event: "PartialLiquidate" in "\n".join(log_event.logs)),
# rx.operators.map(mango.debug_print_item("Transaction")),
# rx.operators.delay(30), # Wait for the transaction to be fully confirmed
# rx.operators.map(mango.debug_print_item("After Delay")),
rx.operators.map(lambda log_event: mango.TransactionScout.load(context, log_event.signatures[0])),
rx.operators.filter(lambda item: item is not None),
rx.operators.catch(mango.observable_pipeline_error_reporter),
rx.operators.retry()
).subscribe(mango.PrintingObserverSubscriber(False))
websocket_url = context.cluster_url.replace("https", "wss", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler, manager.on_item)
ws.ping_interval = 10
ws_pong_disposable = ws.pong.subscribe(mango.FileToucherObserver("/var/tmp/mango_healthcheck_ws_pong"))
disposer.add_disposable(ws_pong_disposable)
ws.open()
# Wait - don't exit. Exiting will be handled by signals/interrupts.
waiter = threading.Event()
try:
waiter.wait()
except:
pass
logging.info("Shutting down...")
ws.close()
disposer.dispose()
logging.info("Shutdown complete.")

View File

@ -62,7 +62,7 @@ from .transactionscout import MangoInstruction, TransactionScout, fetch_all_rece
from .version import Version
from .wallet import Wallet
from .walletbalancer import TargetBalance, FixedTargetBalance, PercentageTargetBalance, TargetBalanceParser, sort_changes_for_trades, calculate_required_balance_changes, FilterSmallChanges, WalletBalancer, NullWalletBalancer, LiveWalletBalancer
from .websocketsubscription import WebSocketSubscription, WebSocketSubscriptionManager
from .websocketsubscription import WebSocketSubscription, WebSocketProgramSubscription, WebSocketAccountSubscription, WebSocketLogSubscription, WebSocketSubscriptionManager
from .layouts import layouts

View File

@ -70,13 +70,13 @@ class ContextBuilder:
help="Solana RPC cluster name")
parser.add_argument("--cluster-url", type=str, default=default_cluster_url,
help="Solana RPC cluster URL")
parser.add_argument("--program-id", type=str, default=default_program_id,
parser.add_argument("--program-id", type=PublicKey, default=default_program_id,
help="Mango program ID/address")
parser.add_argument("--dex-program-id", type=str, default=default_dex_program_id,
parser.add_argument("--dex-program-id", type=PublicKey, default=default_dex_program_id,
help="DEX program ID/address")
parser.add_argument("--group-name", type=str, default=default_group_name,
help="Mango group name")
parser.add_argument("--group-id", type=str, default=default_group_id,
parser.add_argument("--group-id", type=PublicKey, default=default_group_id,
help="Mango group ID/address")
parser.add_argument("--token-data-file", type=str, default=SplTokenLookup.DefaultDataFilepath,

View File

@ -28,19 +28,26 @@ class InstructionType(enum.IntEnum):
InitMarginAccount = 1
Deposit = 2
Withdraw = 3
Borrow = 4
SettleBorrow = 5
Liquidate = 6
DepositSrm = 7
WithdrawSrm = 8
PlaceOrder = 9
SettleFunds = 10
CancelOrder = 11
CancelOrderByClientId = 12
ChangeBorrowLimit = 13
PlaceAndSettle = 14
ForceCancelOrders = 15
PartialLiquidate = 16
AddSpotMarket = 4
AddToBasket = 5
Borrow = 6
CachePrices = 7
CacheRootBanks = 8
PlaceSpotOrder = 9
AddOracle = 10
AddPerpMarket = 11
PlacePerpOrder = 12
CancelPerpOrderByClientId = 13
CancelPerpOrder = 14
ConsumeEvents = 15
CachePerpMarkets = 16
UpdateFunding = 17
SetOracle = 18
SettleFunds = 19
CancelSpotOrder = 20
UpdateRootBank = 21
SettlePnl = 22
SettleBorrow = 23
def __str__(self):
return self.value
return self.name

View File

@ -1239,29 +1239,33 @@ SETTLE_FUNDS = construct.Struct(
"variant" / construct.Const(19, construct.BytesInteger(4, swapped=True))
)
UNSPECIFIED = construct.Struct(
"variant" / DecimalAdapter(4)
)
InstructionParsersByVariant = {
0: None, # INIT_MANGO_GROUP,
0: UNSPECIFIED, # INIT_MANGO_GROUP,
1: INIT_MANGO_ACCOUNT, # INIT_MANGO_ACCOUNT,
2: DEPOSIT, # DEPOSIT,
3: WITHDRAW, # WITHDRAW,
4: None, # ADD_SPOT_MARKET,
5: None, # ADD_TO_BASKET,
6: None, # BORROW,
7: None, # CACHE_PRICES,
8: None, # CACHE_ROOT_BANKS,
4: UNSPECIFIED, # ADD_SPOT_MARKET,
5: UNSPECIFIED, # ADD_TO_BASKET,
6: UNSPECIFIED, # BORROW,
7: UNSPECIFIED, # CACHE_PRICES,
8: UNSPECIFIED, # CACHE_ROOT_BANKS,
9: PLACE_SPOT_ORDER, # PLACE_SPOT_ORDER,
10: None, # ADD_ORACLE,
11: None, # ADD_PERP_MARKET,
10: UNSPECIFIED, # ADD_ORACLE,
11: UNSPECIFIED, # ADD_PERP_MARKET,
12: PLACE_PERP_ORDER, # PLACE_PERP_ORDER,
13: CANCEL_PERP_ORDER_BY_CLIENT_ID, # CANCEL_PERP_ORDER_BY_CLIENT_ID,
14: CANCEL_PERP_ORDER, # CANCEL_PERP_ORDER,
15: CONSUME_EVENTS, # CONSUME_EVENTS,
16: None, # CACHE_PERP_MARKETS,
17: None, # UPDATE_FUNDING,
18: None, # SET_ORACLE,
16: UNSPECIFIED, # CACHE_PERP_MARKETS,
17: UNSPECIFIED, # UPDATE_FUNDING,
18: UNSPECIFIED, # SET_ORACLE,
19: SETTLE_FUNDS, # SETTLE_FUNDS,
20: CANCEL_SPOT_ORDER, # CANCEL_SPOT_ORDER,
21: None, # UPDATE_ROOT_BANK,
22: None, # SETTLE_PNL,
23: None, # SETTLE_BORROW,
21: UNSPECIFIED, # UPDATE_ROOT_BANK,
22: UNSPECIFIED, # SETTLE_PNL,
23: UNSPECIFIED # SETTLE_BORROW,
}

View File

@ -14,8 +14,12 @@
# [Email](mailto:hello@blockworks.foundation)
from datetime import datetime
import json
import logging
import rx
import rx.subject
import rx.core.typing
import typing
import websocket # type: ignore
@ -38,6 +42,10 @@ class ReconnectingWebsocket:
self._on_item = on_item
self.reconnect_required: bool = True
self.ping_interval: int = 0
self.ping: rx.subject.behaviorsubject.BehaviorSubject = rx.subject.behaviorsubject.BehaviorSubject(
datetime.now())
self.pong: rx.subject.behaviorsubject.BehaviorSubject = rx.subject.behaviorsubject.BehaviorSubject(
datetime.now())
def close(self):
self.logger.info(f"Closing WebSocket for {self.url}")
@ -56,6 +64,12 @@ class ReconnectingWebsocket:
def _on_error(self, *args):
self.logger.warning(f"WebSocket for {self.url} has error {args}")
def _on_ping(self, *_):
self.ping.on_next(datetime.now())
def _on_pong(self, *_):
self.pong.on_next(datetime.now())
def open(self):
thread = Thread(target=self._run)
thread.start()
@ -71,5 +85,7 @@ class ReconnectingWebsocket:
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
on_ping=self._on_ping,
on_pong=self._on_pong
)
self._ws.run_forever(ping_interval=self.ping_interval)

View File

@ -16,6 +16,8 @@
import base58
import datetime
import logging
import traceback
import typing
from decimal import Decimal
@ -61,23 +63,30 @@ from .tokenvalue import TokenValue
# The index of the sender/signer depends on the instruction.
_instruction_signer_indices: typing.Dict[InstructionType, int] = {
InstructionType.InitMangoGroup: 3,
InstructionType.InitMangoGroup: 1,
InstructionType.InitMarginAccount: 2,
InstructionType.Deposit: 2,
InstructionType.Withdraw: 2,
InstructionType.AddSpotMarket: 7,
InstructionType.AddToBasket: 2,
InstructionType.Borrow: 2,
InstructionType.SettleBorrow: 2,
InstructionType.Liquidate: 1,
InstructionType.DepositSrm: 2,
InstructionType.WithdrawSrm: 2,
InstructionType.PlaceOrder: 1,
InstructionType.SettleFunds: 1,
InstructionType.CancelOrder: 1,
InstructionType.CancelOrderByClientId: 1,
InstructionType.ChangeBorrowLimit: 1,
InstructionType.PlaceAndSettle: 1,
InstructionType.ForceCancelOrders: 1,
InstructionType.PartialLiquidate: 1
InstructionType.CachePrices: -1, # No signer
InstructionType.CacheRootBanks: -1, # No signer
InstructionType.PlaceSpotOrder: 2,
InstructionType.AddOracle: 2,
InstructionType.AddPerpMarket: 6,
InstructionType.PlacePerpOrder: 2,
InstructionType.CancelPerpOrderByClientId: 2,
InstructionType.CancelPerpOrder: 2,
InstructionType.ConsumeEvents: -1, # No signer
InstructionType.CachePerpMarkets: -1, # No signer
InstructionType.UpdateFunding: -1, # No signer
InstructionType.SetOracle: -1, # No signer
InstructionType.SettleFunds: 2,
InstructionType.CancelSpotOrder: 1,
InstructionType.UpdateRootBank: -1, # No signer
InstructionType.SettlePnl: -1, # No signer
InstructionType.SettleBorrow: -1 # No signer
}
# The index of the token IN account depends on the instruction, and for some instructions
@ -85,21 +94,28 @@ _instruction_signer_indices: typing.Dict[InstructionType, int] = {
_token_in_indices: typing.Dict[InstructionType, int] = {
InstructionType.InitMangoGroup: -1,
InstructionType.InitMarginAccount: -1,
InstructionType.Deposit: 3, # token_account_acc - TokenAccount owned by user which will be sending the funds
InstructionType.Withdraw: 4, # vault_acc - TokenAccount owned by MangoGroup which will be sending
InstructionType.Deposit: 8,
InstructionType.Withdraw: 7,
InstructionType.AddSpotMarket: -1,
InstructionType.AddToBasket: -1,
InstructionType.Borrow: -1,
InstructionType.SettleBorrow: -1,
InstructionType.Liquidate: -1,
InstructionType.DepositSrm: 3, # srm_account_acc - TokenAccount owned by user which will be sending the funds
InstructionType.WithdrawSrm: 4, # vault_acc - SRM vault of MangoGroup
InstructionType.PlaceOrder: -1,
InstructionType.CachePrices: -1,
InstructionType.CacheRootBanks: -1,
InstructionType.PlaceSpotOrder: -1,
InstructionType.AddOracle: -1,
InstructionType.AddPerpMarket: -1,
InstructionType.PlacePerpOrder: -1,
InstructionType.CancelPerpOrderByClientId: -1,
InstructionType.CancelPerpOrder: -1,
InstructionType.ConsumeEvents: -1,
InstructionType.CachePerpMarkets: -1,
InstructionType.UpdateFunding: -1,
InstructionType.SetOracle: -1,
InstructionType.SettleFunds: -1,
InstructionType.CancelOrder: -1,
InstructionType.CancelOrderByClientId: -1,
InstructionType.ChangeBorrowLimit: -1,
InstructionType.PlaceAndSettle: -1,
InstructionType.ForceCancelOrders: -1,
InstructionType.PartialLiquidate: 2 # liqor_in_token_acc - liquidator's token account to deposit
InstructionType.CancelSpotOrder: -1,
InstructionType.UpdateRootBank: -1,
InstructionType.SettlePnl: -1,
InstructionType.SettleBorrow: -1,
}
# The index of the token OUT account depends on the instruction, and for some instructions
@ -107,21 +123,57 @@ _token_in_indices: typing.Dict[InstructionType, int] = {
_token_out_indices: typing.Dict[InstructionType, int] = {
InstructionType.InitMangoGroup: -1,
InstructionType.InitMarginAccount: -1,
InstructionType.Deposit: 4, # vault_acc - TokenAccount owned by MangoGroup
InstructionType.Withdraw: 3, # token_account_acc - TokenAccount owned by user which will be receiving the funds
InstructionType.Deposit: 6,
InstructionType.Withdraw: 6,
InstructionType.AddSpotMarket: -1,
InstructionType.AddToBasket: -1,
InstructionType.Borrow: -1,
InstructionType.SettleBorrow: -1,
InstructionType.Liquidate: -1,
InstructionType.DepositSrm: 4, # vault_acc - SRM vault of MangoGroup
InstructionType.WithdrawSrm: 3, # srm_account_acc - TokenAccount owned by user which will be receiving the funds
InstructionType.PlaceOrder: -1,
InstructionType.CachePrices: -1,
InstructionType.CacheRootBanks: -1,
InstructionType.PlaceSpotOrder: -1,
InstructionType.AddOracle: -1,
InstructionType.AddPerpMarket: -1,
InstructionType.PlacePerpOrder: -1,
InstructionType.CancelPerpOrderByClientId: -1,
InstructionType.CancelPerpOrder: -1,
InstructionType.ConsumeEvents: -1,
InstructionType.CachePerpMarkets: -1,
InstructionType.UpdateFunding: -1,
InstructionType.SetOracle: -1,
InstructionType.SettleFunds: -1,
InstructionType.CancelOrder: -1,
InstructionType.CancelOrderByClientId: -1,
InstructionType.ChangeBorrowLimit: -1,
InstructionType.PlaceAndSettle: -1,
InstructionType.ForceCancelOrders: -1,
InstructionType.PartialLiquidate: 3 # liqor_out_token_acc - liquidator's token account to withdraw into
InstructionType.CancelSpotOrder: -1,
InstructionType.UpdateRootBank: -1,
InstructionType.SettlePnl: -1,
InstructionType.SettleBorrow: -1,
}
# Some instructions (like liqudate) have a 'target' account. Most don't.
_target_indices: typing.Dict[InstructionType, int] = {
InstructionType.InitMangoGroup: -1,
InstructionType.InitMarginAccount: -1,
InstructionType.Deposit: -1,
InstructionType.Withdraw: -1,
InstructionType.AddSpotMarket: -1,
InstructionType.AddToBasket: -1,
InstructionType.Borrow: -1,
InstructionType.CachePrices: -1,
InstructionType.CacheRootBanks: -1,
InstructionType.PlaceSpotOrder: -1,
InstructionType.AddOracle: -1,
InstructionType.AddPerpMarket: -1,
InstructionType.PlacePerpOrder: -1,
InstructionType.CancelPerpOrderByClientId: -1,
InstructionType.CancelPerpOrder: -1,
InstructionType.ConsumeEvents: -1,
InstructionType.CachePerpMarkets: -1,
InstructionType.UpdateFunding: -1,
InstructionType.SetOracle: -1,
InstructionType.SettleFunds: -1,
InstructionType.CancelSpotOrder: -1,
InstructionType.UpdateRootBank: -1,
InstructionType.SettlePnl: -1,
InstructionType.SettleBorrow: -1,
}
@ -144,8 +196,10 @@ class MangoInstruction:
return self.accounts[0]
@property
def sender(self) -> PublicKey:
def sender(self) -> typing.Optional[PublicKey]:
account_index = _instruction_signer_indices[self.instruction_type]
if account_index < 0:
return None
return self.accounts[account_index]
@property
@ -162,6 +216,13 @@ class MangoInstruction:
return None
return self.accounts[account_index]
@property
def target_account(self) -> typing.Optional[PublicKey]:
account_index = _target_indices[self.instruction_type]
if account_index < 0:
return None
return self.accounts[account_index]
def describe_parameters(self) -> str:
instruction_type = self.instruction_type
additional_data = ""
@ -172,45 +233,50 @@ class MangoInstruction:
elif instruction_type == InstructionType.Deposit:
additional_data = f"quantity: {self.instruction_data.quantity}"
elif instruction_type == InstructionType.Withdraw:
additional_data = f"quantity: {self.instruction_data.quantity}"
additional_data = f"quantity: {self.instruction_data.quantity}, allow_borrow: {self.instruction_data.allow_borrow}"
elif instruction_type == InstructionType.AddSpotMarket:
pass
elif instruction_type == InstructionType.AddToBasket:
pass
elif instruction_type == InstructionType.Borrow:
additional_data = f"quantity: {self.instruction_data.quantity}, token index: {self.instruction_data.token_index}"
elif instruction_type == InstructionType.SettleBorrow:
additional_data = f"quantity: {self.instruction_data.quantity}, token index: {self.instruction_data.token_index}"
elif instruction_type == InstructionType.Liquidate:
additional_data = f"deposit quantities: {self.instruction_data.deposit_quantities}"
elif instruction_type == InstructionType.DepositSrm:
additional_data = f"quantity: {self.instruction_data.quantity}"
elif instruction_type == InstructionType.WithdrawSrm:
additional_data = f"quantity: {self.instruction_data.quantity}"
elif instruction_type == InstructionType.PlaceOrder:
pass
elif instruction_type == InstructionType.CachePrices:
pass
elif instruction_type == InstructionType.CacheRootBanks:
pass
elif instruction_type == InstructionType.PlaceSpotOrder:
additional_data = f"side: {self.instruction_data.side}, order_type: {self.instruction_data.order_type}, limit_price: {self.instruction_data.limit_price}, max_base_quantity: {self.instruction_data.max_base_quantity}, max_quote_quantity: {self.instruction_data.max_quote_quantity}, self_trade_behavior: {self.instruction_data.self_trade_behavior}, client_id: {self.instruction_data.client_id}, limit: {self.instruction_data.limit}"
elif instruction_type == InstructionType.AddOracle:
pass
elif instruction_type == InstructionType.AddPerpMarket:
pass
elif instruction_type == InstructionType.PlacePerpOrder:
additional_data = f"side: {self.instruction_data.side}, order_type: {self.instruction_data.order_type}, price: {self.instruction_data.price}, quantity: {self.instruction_data.quantity}, client_order_id: {self.instruction_data.client_order_id}"
elif instruction_type == InstructionType.CancelPerpOrderByClientId:
additional_data = f"client ID: {self.instruction_data.client_order_id}"
elif instruction_type == InstructionType.CancelPerpOrder:
additional_data = f"order ID: {self.instruction_data.order_id}, side: {self.instruction_data.side}"
elif instruction_type == InstructionType.ConsumeEvents:
additional_data = f"limit: {self.instruction_data.limit}"
elif instruction_type == InstructionType.CachePerpMarkets:
pass
elif instruction_type == InstructionType.UpdateFunding:
pass
elif instruction_type == InstructionType.SetOracle:
pass
elif instruction_type == InstructionType.SettleFunds:
pass
elif instruction_type == InstructionType.CancelOrder:
elif instruction_type == InstructionType.CancelSpotOrder:
additional_data = f"order ID: {self.instruction_data.order_id}, side: {self.instruction_data.side}"
elif instruction_type == InstructionType.UpdateRootBank:
pass
elif instruction_type == InstructionType.CancelOrderByClientId:
additional_data = f"client ID: {self.instruction_data.client_id}"
elif instruction_type == InstructionType.ChangeBorrowLimit:
additional_data = f"borrow limit: {self.instruction_data.borrow_limit}, token index: {self.instruction_data.token_index}"
elif instruction_type == InstructionType.PlaceAndSettle:
elif instruction_type == InstructionType.SettlePnl:
pass
elif instruction_type == InstructionType.SettleBorrow:
pass
elif instruction_type == InstructionType.ForceCancelOrders:
additional_data = f"limit: {self.instruction_data.limit}"
elif instruction_type == InstructionType.PartialLiquidate:
additional_data = f"max deposit: {self.instruction_data.max_deposit}"
return additional_data
# It'd be nice to be able to describe the target of some operations, like liquidations. So far
# only `PartialLiquidate` is handled in the code below but it could be extended if others also
# have a useful target.
def describe_target(self) -> typing.Optional[PublicKey]:
if self.instruction_type == InstructionType.PartialLiquidate:
return self.accounts[4]
return None
@staticmethod
def from_response(context: Context, all_accounts: typing.Sequence[PublicKey], instruction_data: typing.Dict) -> typing.Optional["MangoInstruction"]:
program_account_index = instruction_data["programIdIndex"]
@ -225,7 +291,9 @@ class MangoInstruction:
initial = layouts.MANGO_INSTRUCTION_VARIANT_FINDER.parse(decoded)
parser = layouts.InstructionParsersByVariant[initial.variant]
if parser is None:
raise Exception(f"Could not find instruction parser for variant {initial.variant}.")
logging.warning(
f"Could not find instruction parser for variant {initial.variant} / {InstructionType(initial.variant)}.")
return None
# A whole bunch of accounts are listed for a transaction. Some (or all) of them apply
# to this instruction. The instruction data gives the index of each account it uses,
@ -293,7 +361,7 @@ class TransactionScout:
return f"« TransactionScout {result} {self.group_name} [{self.timestamp}] {instructions}: Token Changes: {changed_tokens_text}\n {self.signatures} »"
@property
def sender(self) -> PublicKey:
def sender(self) -> typing.Optional[PublicKey]:
return self.instructions[0].sender
@property
@ -339,6 +407,7 @@ class TransactionScout:
if instruction is not None:
instructions += [instruction]
print("Instructions:", len(instructions))
group_name = context.lookup_group_name(instructions[0].group)
timestamp = datetime.datetime.fromtimestamp(response["blockTime"])
signatures = response["transaction"]["signatures"]
@ -360,7 +429,7 @@ class TransactionScout:
signature = "Unknown"
if response and ("transaction" in response) and ("signatures" in response["transaction"]) and len(response["transaction"]["signatures"]) > 0:
signature = ", ".join(response["transaction"]["signatures"])
raise Exception(f"Exception fetching transaction '{signature}'", exception)
raise Exception(f"Exception fetching transaction '{signature}' - {traceback.format_exc()}", exception)
def __str__(self) -> str:
def format_tokens(account_token_values: typing.Sequence[OwnedTokenValue]) -> str:

View File

@ -13,7 +13,7 @@
# [Github](https://github.com/blockworks-foundation)
# [Email](mailto:hello@blockworks.foundation)
import abc
import logging
import typing
import websocket
@ -37,29 +37,19 @@ from .observables import EventSource
TSubscriptionInstance = typing.TypeVar('TSubscriptionInstance')
class WebSocketSubscription(Disposable, typing.Generic[TSubscriptionInstance]):
class WebSocketSubscription(Disposable, typing.Generic[TSubscriptionInstance], metaclass=abc.ABCMeta):
def __init__(self, context: Context, address: PublicKey, constructor: typing.Callable[[AccountInfo], TSubscriptionInstance]):
self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self.address = address
self.id = context.random_client_id()
self.subscription_id = 0
self.context: Context = context
self.address: PublicKey = address
self.id: int = context.random_client_id()
self.subscription_id: int = 0
self.from_account_info: typing.Callable[[AccountInfo], TSubscriptionInstance] = constructor
self.publisher: EventSource[TSubscriptionInstance] = EventSource[TSubscriptionInstance]()
@abc.abstractmethod
def build_request(self) -> str:
return """
{
"jsonrpc": "2.0",
"id": \"""" + str(self.id) + """\",
"method": "accountSubscribe",
"params": [\"""" + str(self.address) + """\",
{
"encoding": "base64",
"commitment": "processed"
}
]
}
"""
raise NotImplementedError("WebSocketSubscription.build_request() is not implemented on the base type.")
def build_account_info(self, response: RPCResponse) -> AccountInfo:
return AccountInfo.from_response(response, self.address)
@ -74,13 +64,99 @@ class WebSocketSubscription(Disposable, typing.Generic[TSubscriptionInstance]):
self.publisher.dispose()
class WebSocketProgramSubscription(WebSocketSubscription[TSubscriptionInstance]):
def __init__(self, context: Context, address: PublicKey, constructor: typing.Callable[[AccountInfo], TSubscriptionInstance]):
super().__init__(context, address, constructor)
def build_request(self) -> str:
return """
{
"jsonrpc": "2.0",
"id": \"""" + str(self.id) + """\",
"method": "programSubscribe",
"params": [\"""" + str(self.address) + """\",
{
"encoding": "base64",
"commitment": \"""" + str(self.context.commitment) + """\"
}
]
}
"""
class WebSocketAccountSubscription(WebSocketSubscription[TSubscriptionInstance]):
def __init__(self, context: Context, address: PublicKey, constructor: typing.Callable[[AccountInfo], TSubscriptionInstance]):
super().__init__(context, address, constructor)
def build_request(self) -> str:
return """
{
"jsonrpc": "2.0",
"id": \"""" + str(self.id) + """\",
"method": "accountSubscribe",
"params": [\"""" + str(self.address) + """\",
{
"encoding": "base64",
"commitment": \"""" + str(self.context.commitment) + """\"
}
]
}
"""
class LogEvent:
def __init__(self, signatures: typing.Sequence[str], logs: typing.Sequence[str]):
self.signatures: typing.Sequence[str] = signatures
self.logs: typing.Sequence[str] = logs
@staticmethod
def from_response(response) -> "LogEvent":
signature_text: str = response["result"]["value"]["signature"]
signatures = signature_text.split(",")
logs = response["result"]["value"]["logs"]
return LogEvent(signatures, logs)
def __str__(self):
logs = "\n ".join(self.logs)
return f"""« 𝙻𝚘𝚐𝙴𝚟𝚎𝚗𝚝 {self.signatures}
{logs}
»"""
def __repr__(self) -> str:
return f"{self}"
class WebSocketLogSubscription(WebSocketSubscription[LogEvent]):
def __init__(self, context: Context, address: PublicKey):
super().__init__(context, address, lambda _: LogEvent([""], []))
def build_request(self) -> str:
return """
{
"jsonrpc": "2.0",
"id": \"""" + str(self.id) + """\",
"method": "logsSubscribe",
"params": [
{
"mentions": [ \"""" + str(self.address) + """\" ]
},
{
"commitment": \"""" + str(self.context.commitment) + """\"
}
]
}
"""
def build(self, response: RPCResponse) -> LogEvent:
return LogEvent.from_response(response)
# # 🥭 WebSocketSubscriptionManager class
#
# The `WebSocketSubscriptionManager` takes websocket account updates and sends them to the correct
# `WebSocketSubscription`.
#
class WebSocketSubscriptionManager(Disposable):
def __init__(self):
self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
@ -109,7 +185,7 @@ class WebSocketSubscriptionManager(Disposable):
id: int = int(response["id"])
subscription_id: int = int(response["result"])
self.add_subscription_id(id, subscription_id)
elif response["method"] == "accountNotification":
elif (response["method"] == "accountNotification") or (response["method"] == "programNotification") or (response["method"] == "logsNotification"):
subscription_id = response["params"]["subscription"]
subscription = self.subscription_by_subscription_id(subscription_id)
built = subscription.build(response["params"])

View File

@ -21,66 +21,101 @@ def test_instruction_type_withdraw():
assert actual == mango.InstructionType.Withdraw
def test_instruction_type_borrow():
def test_instruction_type_add_to_spot_market():
actual = mango.InstructionType(4)
assert actual == mango.InstructionType.AddSpotMarket
def test_instruction_type_add_to_basket():
actual = mango.InstructionType(5)
assert actual == mango.InstructionType.AddToBasket
def test_instruction_type_borrow():
actual = mango.InstructionType(6)
assert actual == mango.InstructionType.Borrow
def test_instruction_type_settle_borrow():
actual = mango.InstructionType(5)
assert actual == mango.InstructionType.SettleBorrow
def test_instruction_type_liquidate():
actual = mango.InstructionType(6)
assert actual == mango.InstructionType.Liquidate
def test_instruction_type_deposit_srm():
def test_instruction_type_cache_prices():
actual = mango.InstructionType(7)
assert actual == mango.InstructionType.DepositSrm
assert actual == mango.InstructionType.CachePrices
def test_instruction_type_withdraw_srm():
def test_instruction_type_cache_root_banks():
actual = mango.InstructionType(8)
assert actual == mango.InstructionType.WithdrawSrm
assert actual == mango.InstructionType.CacheRootBanks
def test_instruction_type_place_order():
def test_instruction_type_place_spot_order():
actual = mango.InstructionType(9)
assert actual == mango.InstructionType.PlaceOrder
assert actual == mango.InstructionType.PlaceSpotOrder
def test_instruction_type_add_oracle():
actual = mango.InstructionType(10)
assert actual == mango.InstructionType.AddOracle
def test_instruction_type_add_perp_market():
actual = mango.InstructionType(11)
assert actual == mango.InstructionType.AddPerpMarket
def test_instruction_type_place_perp_order():
actual = mango.InstructionType(12)
assert actual == mango.InstructionType.PlacePerpOrder
def test_instruction_type_cancel_perp_order_by_client_id():
actual = mango.InstructionType(13)
assert actual == mango.InstructionType.CancelPerpOrderByClientId
def test_instruction_type_cancel_perp_order():
actual = mango.InstructionType(14)
assert actual == mango.InstructionType.CancelPerpOrder
def test_instruction_type_consume_events():
actual = mango.InstructionType(15)
assert actual == mango.InstructionType.ConsumeEvents
def test_instruction_type_cache_perp_markets():
actual = mango.InstructionType(16)
assert actual == mango.InstructionType.CachePerpMarkets
def test_instruction_type_update_funding():
actual = mango.InstructionType(17)
assert actual == mango.InstructionType.UpdateFunding
def test_instruction_type_set_oracle():
actual = mango.InstructionType(18)
assert actual == mango.InstructionType.SetOracle
def test_instruction_type_settle_funds():
actual = mango.InstructionType(10)
actual = mango.InstructionType(19)
assert actual == mango.InstructionType.SettleFunds
def test_instruction_type_cancel_funds():
actual = mango.InstructionType(11)
assert actual == mango.InstructionType.CancelOrder
def test_instruction_type_cancel_spot_order():
actual = mango.InstructionType(20)
assert actual == mango.InstructionType.CancelSpotOrder
def test_instruction_type_cancel_order_by_client_id():
actual = mango.InstructionType(12)
assert actual == mango.InstructionType.CancelOrderByClientId
def test_instruction_type_update_root_bank():
actual = mango.InstructionType(21)
assert actual == mango.InstructionType.UpdateRootBank
def test_instruction_type_change_borrow_limit():
actual = mango.InstructionType(13)
assert actual == mango.InstructionType.ChangeBorrowLimit
def test_instruction_type_settle_pnl():
actual = mango.InstructionType(22)
assert actual == mango.InstructionType.SettlePnl
def test_instruction_type_place_and_settle():
actual = mango.InstructionType(14)
assert actual == mango.InstructionType.PlaceAndSettle
def test_instruction_type_force_cancel_orders():
actual = mango.InstructionType(15)
assert actual == mango.InstructionType.ForceCancelOrders
def test_instruction_type_partial_liquidate():
actual = mango.InstructionType(16)
assert actual == mango.InstructionType.PartialLiquidate
def test_instruction_type_settle_borrow():
actual = mango.InstructionType(23)
assert actual == mango.InstructionType.SettleBorrow

View File

@ -9,7 +9,7 @@ import typing
def test_transaction_instruction_constructor():
instruction_type: mango.InstructionType = mango.InstructionType.PartialLiquidate
instruction_type: mango.InstructionType = mango.InstructionType.Deposit
instruction_data: typing.Dict[str, str] = {"key": "test value"}
account1 = fake_seeded_public_key("account 1")
account2 = fake_seeded_public_key("account 2")