{ "cells": [ { "cell_type": "markdown", "id": "velvet-difficulty", "metadata": {}, "source": [ "# ⚠ Warning\n", "\n", "THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\n", "\n", "[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gl/OpinionatedGeek%2Fmango-explorer/HEAD?filepath=BaseModel.ipynb) _🏃‍♀️ To run this notebook press the ⏩ icon in the toolbar above._\n", "\n", "[🥭 Mango Markets](https://mango.markets/) support is available at: [Docs](https://docs.mango.markets/) | [Discord](https://discord.gg/67jySBhxrg) | [Twitter](https://twitter.com/mangomarkets) | [Github](https://github.com/blockworks-foundation) | [Email](mailto:hello@blockworks.foundation)" ] }, { "cell_type": "markdown", "id": "subject-princeton", "metadata": {}, "source": [ "# 🥭 BaseModel\n", "\n", "This notebook contains high-level classes to interact with Mango Markets.\n", "\n", "These often depend on structure layouts in [Layouts](Layouts.ipynb).\n", "\n", "The idea is to have one high-level class, with useful members and methods, that can allow the Solana blobs loaded by the layout to vary. 
So there could be a layouts.SOMETHING_V1 and a layouts.SOMETHING_V2, and the Something class here can load from both.\n", "\n", "Code, in general, depends only on these classes, abstracting away the version of the layout used to load the data.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "reserved-fellow", "metadata": { "jupyter": { "source_hidden": true } }, "outputs": [], "source": [ "import abc\n", "import datetime\n", "import enum\n", "import logging\n", "import typing\n", "\n", "import Layouts as layouts\n", "\n", "from decimal import Decimal\n", "from pyserum.market import Market\n", "from pyserum.open_orders_account import OpenOrdersAccount\n", "from solana.account import Account\n", "from solana.publickey import PublicKey\n", "from solana.rpc.commitment import Single\n", "from solana.rpc.types import MemcmpOpts, TokenAccountOpts, RPCMethod, RPCResponse\n", "from spl.token.client import Token as SplToken\n", "from spl.token.constants import TOKEN_PROGRAM_ID\n", "\n", "from Constants import NUM_MARKETS, NUM_TOKENS, SOL_DECIMALS, SYSTEM_PROGRAM_ADDRESS\n", "from Context import Context\n", "from Decoder import decode_binary, encode_binary, encode_key\n" ] }, { "cell_type": "markdown", "id": "small-austin", "metadata": {}, "source": [ "## Version enum\n", "\n", "This is used to keep track of which version of the layout struct was used to load the data.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "amended-reverse", "metadata": {}, "outputs": [], "source": [ "class Version(enum.Enum):\n", " UNSPECIFIED = 0\n", " V1 = 1\n", " V2 = 2\n", " V3 = 3\n", " V4 = 4\n", " V5 = 5\n" ] }, { "cell_type": "markdown", "id": "provincial-voice", "metadata": {}, "source": [ "## InstructionType enum\n", "\n", "This `enum` encapsulates all current Mango Market instruction variants.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "republican-verification", "metadata": {}, "outputs": [], "source": [ "class 
InstructionType(enum.IntEnum):\n", " InitMangoGroup = 0\n", " InitMarginAccount = 1\n", " Deposit = 2\n", " Withdraw = 3\n", " Borrow = 4\n", " SettleBorrow = 5\n", " Liquidate = 6\n", " DepositSrm = 7\n", " WithdrawSrm = 8\n", " PlaceOrder = 9\n", " SettleFunds = 10\n", " CancelOrder = 11\n", " CancelOrderByClientId = 12\n", " ChangeBorrowLimit = 13\n", " PlaceAndSettle = 14\n", " ForceCancelOrders = 15\n", " PartialLiquidate = 16\n", "\n", " def __str__(self):\n", " return self.name\n" ] }, { "cell_type": "markdown", "id": "earlier-foundation", "metadata": {}, "source": [ "## AccountInfo class\n" ] }, { "cell_type": "code", "execution_count": null, "id": "duplicate-timeline", "metadata": {}, "outputs": [], "source": [ "class AccountInfo:\n", " def __init__(self, address: PublicKey, executable: bool, lamports: Decimal, owner: PublicKey, rent_epoch: Decimal, data: bytes):\n", " self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n", " self.address: PublicKey = address\n", " self.executable: bool = executable\n", " self.lamports: Decimal = lamports\n", " self.owner: PublicKey = owner\n", " self.rent_epoch: Decimal = rent_epoch\n", " self.data: bytes = data\n", "\n", " def encoded_data(self) -> typing.List:\n", " return encode_binary(self.data)\n", "\n", " def __str__(self) -> str:\n", " return f\"\"\"« AccountInfo [{self.address}]:\n", " Owner: {self.owner}\n", " Executable: {self.executable}\n", " Lamports: {self.lamports}\n", " Rent Epoch: {self.rent_epoch}\n", "»\"\"\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n", "\n", " @staticmethod\n", " def load(context: Context, address: PublicKey) -> typing.Optional[\"AccountInfo\"]:\n", " response: RPCResponse = context.client.get_account_info(address)\n", " result = context.unwrap_or_raise_exception(response)\n", " if result[\"value\"] is None:\n", " return None\n", "\n", " return AccountInfo._from_response_values(result[\"value\"], address)\n", "\n", " @staticmethod\n", " def 
load_multiple(context: Context, addresses: typing.List[PublicKey]) -> typing.List[\"AccountInfo\"]:\n", " address_strings = list(map(PublicKey.__str__, addresses))\n", " response = context.client._provider.make_request(RPCMethod(\"getMultipleAccounts\"), address_strings)\n", " response_value_list = zip(response[\"result\"][\"value\"], addresses)\n", " return list(map(lambda pair: AccountInfo._from_response_values(pair[0], pair[1]), response_value_list))\n", "\n", " @staticmethod\n", " def _from_response_values(response_values: typing.Dict[str, typing.Any], address: PublicKey) -> \"AccountInfo\":\n", " executable = bool(response_values[\"executable\"])\n", " lamports = Decimal(response_values[\"lamports\"])\n", " owner = PublicKey(response_values[\"owner\"])\n", " rent_epoch = Decimal(response_values[\"rentEpoch\"])\n", " data = decode_binary(response_values[\"data\"])\n", " return AccountInfo(address, executable, lamports, owner, rent_epoch, data)\n", "\n", " @staticmethod\n", " def from_response(response: RPCResponse, address: PublicKey) -> \"AccountInfo\":\n", " return AccountInfo._from_response_values(response[\"result\"][\"value\"], address)\n" ] }, { "cell_type": "markdown", "id": "reduced-arnold", "metadata": {}, "source": [ "## AddressableAccount class\n", "\n", "Some of our most-used objects (like `Group` or `MarginAccount`) are accounts on Solana with packed data. When these are loaded, they're typically loaded by loading the `AccountInfo` and parsing it in an object-specific way.\n", "\n", "It's sometimes useful to be able to treat these in a common fashion so we use `AddressableAccount` as a way of sharing common features and providing a common base." 
] }, { "cell_type": "code", "execution_count": null, "id": "controlling-notion", "metadata": {}, "outputs": [], "source": [ "class AddressableAccount(metaclass=abc.ABCMeta):\n", " def __init__(self, account_info: AccountInfo):\n", " self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n", " self.account_info = account_info\n", "\n", " @property\n", " def address(self) -> PublicKey:\n", " return self.account_info.address\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "terminal-while", "metadata": {}, "source": [ "## SerumAccountFlags class\n", "\n", "The Serum prefix is because there's also `MangoAccountFlags` for the Mango-specific flags." ] }, { "cell_type": "code", "execution_count": null, "id": "sporting-storage", "metadata": {}, "outputs": [], "source": [ "class SerumAccountFlags:\n", " def __init__(self, version: Version, initialized: bool, market: bool, open_orders: bool,\n", " request_queue: bool, event_queue: bool, bids: bool, asks: bool, disabled: bool):\n", " self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n", " self.version: Version = version\n", " self.initialized = initialized\n", " self.market = market\n", " self.open_orders = open_orders\n", " self.request_queue = request_queue\n", " self.event_queue = event_queue\n", " self.bids = bids\n", " self.asks = asks\n", " self.disabled = disabled\n", "\n", " @staticmethod\n", " def from_layout(layout: layouts.SERUM_ACCOUNT_FLAGS) -> \"SerumAccountFlags\":\n", " return SerumAccountFlags(Version.UNSPECIFIED, layout.initialized, layout.market,\n", " layout.open_orders, layout.request_queue, layout.event_queue,\n", " layout.bids, layout.asks, layout.disabled)\n", "\n", " def __str__(self) -> str:\n", " flags: typing.List[typing.Optional[str]] = []\n", " flags += [\"initialized\" if self.initialized else None]\n", " flags += [\"market\" if self.market else None]\n", " flags += [\"open_orders\" if 
self.open_orders else None]\n", " flags += [\"request_queue\" if self.request_queue else None]\n", " flags += [\"event_queue\" if self.event_queue else None]\n", " flags += [\"bids\" if self.bids else None]\n", " flags += [\"asks\" if self.asks else None]\n", " flags += [\"disabled\" if self.disabled else None]\n", " flag_text = \" | \".join(flag for flag in flags if flag is not None) or \"None\"\n", " return f\"« SerumAccountFlags: {flag_text} »\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "regulation-telling", "metadata": {}, "source": [ "## MangoAccountFlags class\n", "\n", "The Mango prefix is because there's also `SerumAccountFlags` for the standard Serum flags." ] }, { "cell_type": "code", "execution_count": null, "id": "sorted-allergy", "metadata": {}, "outputs": [], "source": [ "class MangoAccountFlags:\n", " def __init__(self, version: Version, initialized: bool, group: bool, margin_account: bool, srm_account: bool):\n", " self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n", " self.version: Version = version\n", " self.initialized = initialized\n", " self.group = group\n", " self.margin_account = margin_account\n", " self.srm_account = srm_account\n", "\n", " @staticmethod\n", " def from_layout(layout: layouts.MANGO_ACCOUNT_FLAGS) -> \"MangoAccountFlags\":\n", " return MangoAccountFlags(Version.UNSPECIFIED, layout.initialized, layout.group, layout.margin_account,\n", " layout.srm_account)\n", "\n", " def __str__(self) -> str:\n", " flags: typing.List[typing.Optional[str]] = []\n", " flags += [\"initialized\" if self.initialized else None]\n", " flags += [\"group\" if self.group else None]\n", " flags += [\"margin_account\" if self.margin_account else None]\n", " flags += [\"srm_account\" if self.srm_account else None]\n", " flag_text = \" | \".join(flag for flag in flags if flag is not None) or \"None\"\n", " return f\"« MangoAccountFlags: {flag_text} »\"\n", "\n", " def 
__repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "guilty-convergence", "metadata": {}, "source": [ "## Index class" ] }, { "cell_type": "code", "execution_count": null, "id": "forbidden-reliance", "metadata": {}, "outputs": [], "source": [ "class Index:\n", " def __init__(self, version: Version, last_update: datetime.datetime, borrow: Decimal, deposit: Decimal):\n", " self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n", " self.version: Version = version\n", " self.last_update: datetime.datetime = last_update\n", " self.borrow: Decimal = borrow\n", " self.deposit: Decimal = deposit\n", "\n", " @staticmethod\n", " def from_layout(layout: layouts.INDEX, decimals: Decimal) -> \"Index\":\n", " borrow = layout.borrow / Decimal(10 ** decimals)\n", " deposit = layout.deposit / Decimal(10 ** decimals)\n", " return Index(Version.UNSPECIFIED, layout.last_update, borrow, deposit)\n", "\n", " def __str__(self) -> str:\n", " return f\"« Index: Borrow: {self.borrow:,.8f}, Deposit: {self.deposit:,.8f} [last update: {self.last_update}] »\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "honey-person", "metadata": {}, "source": [ "## AggregatorConfig class" ] }, { "cell_type": "code", "execution_count": null, "id": "piano-search", "metadata": {}, "outputs": [], "source": [ "class AggregatorConfig:\n", " def __init__(self, version: Version, description: str, decimals: Decimal, restart_delay: Decimal,\n", " max_submissions: Decimal, min_submissions: Decimal, reward_amount: Decimal,\n", " reward_token_account: PublicKey):\n", " self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n", " self.version: Version = version\n", " self.description: str = description\n", " self.decimals: Decimal = decimals\n", " self.restart_delay: Decimal = restart_delay\n", " self.max_submissions: Decimal = max_submissions\n", " self.min_submissions: Decimal = 
min_submissions\n", " self.reward_amount: Decimal = reward_amount\n", " self.reward_token_account: PublicKey = reward_token_account\n", "\n", " @staticmethod\n", " def from_layout(layout: layouts.AGGREGATOR_CONFIG) -> \"AggregatorConfig\":\n", " return AggregatorConfig(Version.UNSPECIFIED, layout.description, layout.decimals,\n", " layout.restart_delay, layout.max_submissions, layout.min_submissions,\n", " layout.reward_amount, layout.reward_token_account)\n", "\n", " def __str__(self) -> str:\n", " return f\"« AggregatorConfig: '{self.description}', Decimals: {self.decimals} [restart delay: {self.restart_delay}], Max: {self.max_submissions}, Min: {self.min_submissions}, Reward: {self.reward_amount}, Reward Account: {self.reward_token_account} »\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "controlling-singles", "metadata": {}, "source": [ "## Round class" ] }, { "cell_type": "code", "execution_count": null, "id": "vertical-wound", "metadata": {}, "outputs": [], "source": [ "class Round:\n", " def __init__(self, version: Version, id: Decimal, created_at: datetime.datetime, updated_at: datetime.datetime):\n", " self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n", " self.version: Version = version\n", " self.id: Decimal = id\n", " self.created_at: datetime.datetime = created_at\n", " self.updated_at: datetime.datetime = updated_at\n", "\n", " @staticmethod\n", " def from_layout(layout: layouts.ROUND) -> \"Round\":\n", " return Round(Version.UNSPECIFIED, layout.id, layout.created_at, layout.updated_at)\n", "\n", " def __str__(self) -> str:\n", " return f\"« Round[{self.id}], Created: {self.created_at}, Updated: {self.updated_at} »\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "consolidated-hughes", "metadata": {}, "source": [ "## Answer class" ] }, { "cell_type": "code", "execution_count": null, "id": 
"identical-california", "metadata": {}, "outputs": [], "source": [ "class Answer:\n", " def __init__(self, version: Version, round_id: Decimal, median: Decimal, created_at: datetime.datetime, updated_at: datetime.datetime):\n", " self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n", " self.version: Version = version\n", " self.round_id: Decimal = round_id\n", " self.median: Decimal = median\n", " self.created_at: datetime.datetime = created_at\n", " self.updated_at: datetime.datetime = updated_at\n", "\n", " @staticmethod\n", " def from_layout(layout: layouts.ANSWER) -> \"Answer\":\n", " return Answer(Version.UNSPECIFIED, layout.round_id, layout.median, layout.created_at, layout.updated_at)\n", "\n", " def __str__(self) -> str:\n", " return f\"« Answer: Round[{self.round_id}], Median: {self.median:,.8f}, Created: {self.created_at}, Updated: {self.updated_at} »\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "extreme-nature", "metadata": {}, "source": [ "## Aggregator class" ] }, { "cell_type": "code", "execution_count": null, "id": "arbitrary-ottawa", "metadata": {}, "outputs": [], "source": [ "class Aggregator(AddressableAccount):\n", " def __init__(self, account_info: AccountInfo, version: Version, config: AggregatorConfig,\n", " initialized: bool, name: str, owner: PublicKey, round_: Round,\n", " round_submissions: PublicKey, answer: Answer, answer_submissions: PublicKey):\n", " super().__init__(account_info)\n", " self.version: Version = version\n", " self.config: AggregatorConfig = config\n", " self.initialized: bool = initialized\n", " self.name: str = name\n", " self.owner: PublicKey = owner\n", " self.round: Round = round_\n", " self.round_submissions: PublicKey = round_submissions\n", " self.answer: Answer = answer\n", " self.answer_submissions: PublicKey = answer_submissions\n", "\n", " @property\n", " def price(self) -> Decimal:\n", " return self.answer.median / (10 ** 
self.config.decimals)\n", "\n", " @staticmethod\n", " def from_layout(layout: layouts.AGGREGATOR, account_info: AccountInfo, name: str) -> \"Aggregator\":\n", " config = AggregatorConfig.from_layout(layout.config)\n", " initialized = bool(layout.initialized)\n", " round_ = Round.from_layout(layout.round)\n", " answer = Answer.from_layout(layout.answer)\n", "\n", " return Aggregator(account_info, Version.UNSPECIFIED, config, initialized, name, layout.owner,\n", " round_, layout.round_submissions, answer, layout.answer_submissions)\n", "\n", " @staticmethod\n", " def parse(context: Context, account_info: AccountInfo) -> \"Aggregator\":\n", " data = account_info.data\n", " if len(data) != layouts.AGGREGATOR.sizeof():\n", " raise Exception(f\"Data length ({len(data)}) does not match expected size ({layouts.AGGREGATOR.sizeof()})\")\n", "\n", " name = context.lookup_oracle_name(account_info.address)\n", " layout = layouts.AGGREGATOR.parse(data)\n", " return Aggregator.from_layout(layout, account_info, name)\n", "\n", " @staticmethod\n", " def load(context: Context, account_address: PublicKey):\n", " account_info = AccountInfo.load(context, account_address)\n", " if account_info is None:\n", " raise Exception(f\"Aggregator account not found at address '{account_address}'\")\n", " return Aggregator.parse(context, account_info)\n", "\n", " def __str__(self) -> str:\n", " return f\"\"\"\n", "« Aggregator '{self.name}' [{self.version}]:\n", " Config: {self.config}\n", " Initialized: {self.initialized}\n", " Owner: {self.owner}\n", " Round: {self.round}\n", " Round Submissions: {self.round_submissions}\n", " Answer: {self.answer}\n", " Answer Submissions: {self.answer_submissions}\n", "»\n", "\"\"\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "alone-alfred", "metadata": {}, "source": [ "## Token class\n", "\n", "`Token` defines aspects common to every token." 
] }, { "cell_type": "code", "execution_count": null, "id": "pediatric-sword", "metadata": {}, "outputs": [], "source": [ "class Token:\n", " def __init__(self, name: str, mint: PublicKey, decimals: Decimal):\n", " self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n", " self.name: str = name.upper()\n", " self.mint: PublicKey = mint\n", " self.decimals: Decimal = decimals\n", "\n", " def round(self, value: Decimal) -> Decimal:\n", " rounded = round(value, int(self.decimals))\n", " return Decimal(rounded)\n", "\n", " def name_matches(self, name: str) -> bool:\n", " return self.name.upper() == name.upper()\n", "\n", " @staticmethod\n", " def find_by_name(values: typing.List[\"Token\"], name: str) -> \"Token\":\n", " found = [value for value in values if value.name_matches(name)]\n", " if len(found) == 0:\n", " raise Exception(f\"Token '{name}' not found in token values: {values}\")\n", "\n", " if len(found) > 1:\n", " raise Exception(f\"Token '{name}' matched multiple tokens in values: {values}\")\n", "\n", " return found[0]\n", "\n", " @staticmethod\n", " def find_by_mint(values: typing.List[\"Token\"], mint: PublicKey) -> \"Token\":\n", " found = [value for value in values if value.mint == mint]\n", " if len(found) == 0:\n", " raise Exception(f\"Token '{mint}' not found in token values: {values}\")\n", "\n", " if len(found) > 1:\n", " raise Exception(f\"Token '{mint}' matched multiple tokens in values: {values}\")\n", "\n", " return found[0]\n", "\n", " # Tokens are equal if they have the same mint address.\n", " def __eq__(self, other):\n", " if hasattr(other, 'mint'):\n", " return self.mint == other.mint\n", " return False\n", "\n", " def __str__(self) -> str:\n", " return f\"« Token '{self.name}' [{self.mint} ({self.decimals} decimals)] »\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "statistical-drinking", "metadata": {}, "source": [ "## SolToken object\n", "\n", "It's 
sometimes handy to have a `Token` for SOL, but SOL isn't actually a token and can't appear in baskets. This object defines a special case for SOL." ] }, { "cell_type": "code", "execution_count": null, "id": "small-mitchell", "metadata": {}, "outputs": [], "source": [ "SolToken = Token(\"SOL\", SYSTEM_PROGRAM_ADDRESS, SOL_DECIMALS)" ] }, { "cell_type": "markdown", "id": "ordinary-corner", "metadata": {}, "source": [ "## TokenLookup class\n", "\n", "The sole reason for this class is for us to be able to look up a token by name or mint address, and return a `Token` instance.\n", "\n", "It's complicated because:\n", "* Our static token data is in `MangoConstants`\n", "* Our `Token` class is defined in `BaseModel`\n", "* Our `context` is the only part that knows which cluster we're in\n", "* `Context` can't use anything from `BaseModel` (it would cause a dependency cycle)\n", "* SOL isn't mentioned in our `MangoConstants` data\n" ] }, { "cell_type": "code", "execution_count": null, "id": "durable-olive", "metadata": {}, "outputs": [], "source": [ "class TokenLookup:\n", " @staticmethod\n", " def find_by_name(context: Context, name: str) -> Token:\n", " if SolToken.name_matches(name):\n", " return SolToken\n", " mint = context.lookup_token_address(name)\n", " if mint is None:\n", " raise Exception(f\"Could not find token with name '{name}'.\")\n", " return Token(name, mint, Decimal(6))\n", "\n", " @staticmethod\n", " def find_by_mint(context: Context, mint: PublicKey) -> Token:\n", " if SolToken.mint == mint:\n", " return SolToken\n", " name = context.lookup_token_name(mint)\n", " if name is None:\n", " raise Exception(f\"Could not find token with mint '{mint}'.\")\n", " return Token(name, mint, Decimal(6))\n" ] }, { "cell_type": "markdown", "id": "appointed-stewart", "metadata": {}, "source": [ "## BasketToken class\n", "\n", "`BasketToken` defines aspects of `Token`s that are part of a `Group` basket." 
] }, { "cell_type": "code", "execution_count": null, "id": "synthetic-dallas", "metadata": {}, "outputs": [], "source": [ "class BasketToken:\n", " def __init__(self, token: Token, vault: PublicKey, index: Index):\n", " self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n", " self.token: Token = token\n", " self.vault: PublicKey = vault\n", " self.index: Index = index\n", "\n", " @staticmethod\n", " def find_by_name(values: typing.List[\"BasketToken\"], name: str) -> \"BasketToken\":\n", " found = [value for value in values if value.token.name_matches(name)]\n", " if len(found) == 0:\n", " raise Exception(f\"Token '{name}' not found in token values: {values}\")\n", "\n", " if len(found) > 1:\n", " raise Exception(f\"Token '{name}' matched multiple tokens in values: {values}\")\n", "\n", " return found[0]\n", "\n", " @staticmethod\n", " def find_by_mint(values: typing.List[\"BasketToken\"], mint: PublicKey) -> \"BasketToken\":\n", " found = [value for value in values if value.token.mint == mint]\n", " if len(found) == 0:\n", " raise Exception(f\"Token '{mint}' not found in token values: {values}\")\n", "\n", " if len(found) > 1:\n", " raise Exception(f\"Token '{mint}' matched multiple tokens in values: {values}\")\n", "\n", " return found[0]\n", "\n", " @staticmethod\n", " def find_by_token(values: typing.List[\"BasketToken\"], token: Token) -> \"BasketToken\":\n", " return BasketToken.find_by_mint(values, token.mint)\n", "\n", " # BasketTokens are equal if they have the same underlying token.\n", " def __eq__(self, other):\n", " if hasattr(other, 'token'):\n", " return self.token == other.token\n", " return False\n", "\n", " def __str__(self) -> str:\n", " return f\"\"\"« BasketToken [{self.token}]:\n", " Vault: {self.vault}\n", " Index: {self.index}\n", "»\"\"\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "polish-michigan", "metadata": {}, "source": [ "## TokenValue class\n", "\n", 
"The `TokenValue` class is a simple way of keeping a token and value together, and displaying them nicely and consistently." ] }, { "cell_type": "code", "execution_count": null, "id": "acknowledged-trunk", "metadata": {}, "outputs": [], "source": [ "class TokenValue:\n", " def __init__(self, token: Token, value: Decimal):\n", " self.token = token\n", " self.value = value\n", "\n", " @staticmethod\n", " def fetch_total_value_or_none(context: Context, account_public_key: PublicKey, token: Token) -> typing.Optional[\"TokenValue\"]:\n", " opts = TokenAccountOpts(mint=token.mint)\n", "\n", " token_accounts_response = context.client.get_token_accounts_by_owner(account_public_key, opts, commitment=context.commitment)\n", " token_accounts = token_accounts_response[\"result\"][\"value\"]\n", " if len(token_accounts) == 0:\n", " return None\n", "\n", " total_value = Decimal(0)\n", " for token_account in token_accounts:\n", " result = context.client.get_token_account_balance(token_account[\"pubkey\"], commitment=context.commitment)\n", " value = Decimal(result[\"result\"][\"value\"][\"amount\"])\n", " decimal_places = result[\"result\"][\"value\"][\"decimals\"]\n", " divisor = Decimal(10 ** decimal_places)\n", " total_value += value / divisor\n", "\n", " return TokenValue(token, total_value)\n", "\n", " @staticmethod\n", " def fetch_total_value(context: Context, account_public_key: PublicKey, token: Token) -> \"TokenValue\":\n", " value = TokenValue.fetch_total_value_or_none(context, account_public_key, token)\n", " if value is None:\n", " return TokenValue(token, Decimal(0))\n", " return value\n", "\n", " @staticmethod\n", " def report(reporter: typing.Callable[[str], None], values: typing.List[\"TokenValue\"]) -> None:\n", " for value in values:\n", " reporter(f\"{value.value:>18,.8f} {value.token.name}\")\n", "\n", " @staticmethod\n", " def find_by_name(values: typing.List[\"TokenValue\"], name: str) -> \"TokenValue\":\n", " found = [value for value in values if 
value.token.name_matches(name)]\n", " if len(found) == 0:\n", " raise Exception(f\"Token '{name}' not found in token values: {values}\")\n", "\n", " if len(found) > 1:\n", " raise Exception(f\"Token '{name}' matched multiple tokens in values: {values}\")\n", "\n", " return found[0]\n", "\n", " @staticmethod\n", " def find_by_mint(values: typing.List[\"TokenValue\"], mint: PublicKey) -> \"TokenValue\":\n", " found = [value for value in values if value.token.mint == mint]\n", " if len(found) == 0:\n", " raise Exception(f\"Token '{mint}' not found in token values: {values}\")\n", "\n", " if len(found) > 1:\n", " raise Exception(f\"Token '{mint}' matched multiple tokens in values: {values}\")\n", "\n", " return found[0]\n", "\n", " @staticmethod\n", " def find_by_token(values: typing.List[\"TokenValue\"], token: Token) -> \"TokenValue\":\n", " return TokenValue.find_by_mint(values, token.mint)\n", "\n", " @staticmethod\n", " def changes(before: typing.List[\"TokenValue\"], after: typing.List[\"TokenValue\"]) -> typing.List[\"TokenValue\"]:\n", " changes: typing.List[TokenValue] = []\n", " for before_balance in before:\n", " after_balance = TokenValue.find_by_token(after, before_balance.token)\n", " result = TokenValue(before_balance.token, after_balance.value - before_balance.value)\n", " changes += [result]\n", "\n", " return changes\n", "\n", " def __str__(self) -> str:\n", " return f\"« TokenValue: {self.value:>18,.8f} {self.token.name} »\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "viral-property", "metadata": {}, "source": [ "## OwnedTokenValue class\n", "\n", "Ties an owner and `TokenValue` together. This is useful in the `TransactionScout`, where token mints and values are given separate from the owner `PublicKey` - we can package them together in this `OwnedTokenValue` class." 
] }, { "cell_type": "code", "execution_count": null, "id": "tribal-start", "metadata": {}, "outputs": [], "source": [ "class OwnedTokenValue:\n", " def __init__(self, owner: PublicKey, token_value: TokenValue):\n", " self.owner = owner\n", " self.token_value = token_value\n", "\n", " @staticmethod\n", " def find_by_owner(values: typing.List[\"OwnedTokenValue\"], owner: PublicKey) -> \"OwnedTokenValue\":\n", " found = [value for value in values if value.owner == owner]\n", " if len(found) == 0:\n", " raise Exception(f\"Owner '{owner}' not found in: {values}\")\n", "\n", " if len(found) > 1:\n", " raise Exception(f\"Owner '{owner}' matched multiple tokens in: {values}\")\n", "\n", " return found[0]\n", "\n", " @staticmethod\n", " def changes(before: typing.List[\"OwnedTokenValue\"], after: typing.List[\"OwnedTokenValue\"]) -> typing.List[\"OwnedTokenValue\"]:\n", " changes: typing.List[OwnedTokenValue] = []\n", " for before_value in before:\n", " after_value = OwnedTokenValue.find_by_owner(after, before_value.owner)\n", " token_value = TokenValue(before_value.token_value.token, after_value.token_value.value - before_value.token_value.value)\n", " result = OwnedTokenValue(before_value.owner, token_value)\n", " changes += [result]\n", "\n", " return changes\n", "\n", " def __str__(self) -> str:\n", " return f\"[{self.owner}]: {self.token_value}\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "emotional-eclipse", "metadata": {}, "source": [ "## MarketMetadata class" ] }, { "cell_type": "code", "execution_count": null, "id": "local-geometry", "metadata": {}, "outputs": [], "source": [ "class MarketMetadata:\n", " def __init__(self, name: str, address: PublicKey, base: BasketToken, quote: BasketToken,\n", " spot: PublicKey, oracle: PublicKey, decimals: Decimal):\n", " self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n", " self.name: str = name\n", " self.address: PublicKey = address\n", " 
self.base: BasketToken = base\n", " self.quote: BasketToken = quote\n", " self.spot: PublicKey = spot\n", " self.oracle: PublicKey = oracle\n", " self.decimals: Decimal = decimals\n", " self._market = None\n", "\n", " def fetch_market(self, context: Context) -> Market:\n", " if self._market is None:\n", " self._market = Market.load(context.client, self.spot)\n", "\n", " return self._market\n", "\n", " def __str__(self) -> str:\n", " return f\"\"\"« Market '{self.name}' [{self.spot}]:\n", " Base: {self.base}\n", " Quote: {self.quote}\n", " Oracle: {self.oracle} ({self.decimals} decimals)\n", "»\"\"\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "respective-advice", "metadata": {}, "source": [ "## Group class" ] }, { "cell_type": "code", "execution_count": null, "id": "parallel-healing", "metadata": {}, "outputs": [], "source": [ "class Group(AddressableAccount):\n", " def __init__(self, account_info: AccountInfo, version: Version, context: Context,\n", " account_flags: MangoAccountFlags, basket_tokens: typing.List[BasketToken],\n", " markets: typing.List[MarketMetadata],\n", " signer_nonce: Decimal, signer_key: PublicKey, dex_program_id: PublicKey,\n", " total_deposits: typing.List[Decimal], total_borrows: typing.List[Decimal],\n", " maint_coll_ratio: Decimal, init_coll_ratio: Decimal, srm_vault: PublicKey,\n", " admin: PublicKey, borrow_limits: typing.List[Decimal]):\n", " super().__init__(account_info)\n", " self.version: Version = version\n", " self.context: Context = context\n", " self.account_flags: MangoAccountFlags = account_flags\n", " self.basket_tokens: typing.List[BasketToken] = basket_tokens\n", " self.markets: typing.List[MarketMetadata] = markets\n", " self.signer_nonce: Decimal = signer_nonce\n", " self.signer_key: PublicKey = signer_key\n", " self.dex_program_id: PublicKey = dex_program_id\n", " self.total_deposits: typing.List[Decimal] = total_deposits\n", " self.total_borrows: 
typing.List[Decimal] = total_borrows\n", " self.maint_coll_ratio: Decimal = maint_coll_ratio\n", " self.init_coll_ratio: Decimal = init_coll_ratio\n", " self.srm_vault: PublicKey = srm_vault\n", " self.admin: PublicKey = admin\n", " self.borrow_limits: typing.List[Decimal] = borrow_limits\n", "\n", " @property\n", " def shared_quote_token(self) -> BasketToken:\n", " return self.basket_tokens[-1]\n", "\n", " @staticmethod\n", " def from_layout(layout: layouts.GROUP, context: Context, account_info: AccountInfo) -> \"Group\":\n", " account_flags = MangoAccountFlags.from_layout(layout.account_flags)\n", " indexes = list(map(lambda pair: Index.from_layout(pair[0], pair[1]), zip(layout.indexes, layout.mint_decimals)))\n", "\n", " basket_tokens: typing.List[BasketToken] = []\n", " for index in range(NUM_TOKENS):\n", " token_address = layout.tokens[index]\n", " token_name = context.lookup_token_name(token_address)\n", " if token_name is None:\n", " raise Exception(f\"Could not find token with mint '{token_address}' in Group.\")\n", " token = Token(token_name, token_address, layout.mint_decimals[index])\n", " basket_token = BasketToken(token, layout.vaults[index], indexes[index])\n", " basket_tokens += [basket_token]\n", "\n", " markets: typing.List[MarketMetadata] = []\n", " for index in range(NUM_MARKETS):\n", " market_address = layout.spot_markets[index]\n", " market_name = context.lookup_market_name(market_address)\n", " base_name, quote_name = market_name.split(\"/\")\n", " base_token = BasketToken.find_by_name(basket_tokens, base_name)\n", " quote_token = BasketToken.find_by_name(basket_tokens, quote_name)\n", " market = MarketMetadata(market_name, market_address, base_token, quote_token,\n", " layout.spot_markets[index],\n", " layout.oracles[index],\n", " layout.oracle_decimals[index])\n", " markets += [market]\n", "\n", " maint_coll_ratio = layout.maint_coll_ratio.quantize(Decimal('.01'))\n", " init_coll_ratio = layout.init_coll_ratio.quantize(Decimal('.01'))\n", " 
return Group(account_info, Version.UNSPECIFIED, context, account_flags, basket_tokens, markets,\n", " layout.signer_nonce, layout.signer_key, layout.dex_program_id, layout.total_deposits,\n", " layout.total_borrows, maint_coll_ratio, init_coll_ratio, layout.srm_vault,\n", " layout.admin, layout.borrow_limits)\n", "\n", " @staticmethod\n", " def parse(context: Context, account_info: AccountInfo) -> \"Group\":\n", " data = account_info.data\n", " if len(data) != layouts.GROUP.sizeof():\n", " raise Exception(f\"Data length ({len(data)}) does not match expected size ({layouts.GROUP.sizeof()})\")\n", "\n", " layout = layouts.GROUP.parse(data)\n", " return Group.from_layout(layout, context, account_info)\n", "\n", " @staticmethod\n", " def load(context: Context):\n", " account_info = AccountInfo.load(context, context.group_id)\n", " if account_info is None:\n", " raise Exception(f\"Group account not found at address '{context.group_id}'\")\n", " return Group.parse(context, account_info)\n", "\n", " def price_index_of_token(self, token: Token) -> int:\n", " for index, existing in enumerate(self.basket_tokens):\n", " if existing.token == token:\n", " return index\n", " return -1\n", "\n", " def fetch_token_prices(self) -> typing.List[TokenValue]:\n", " oracles = map(lambda market: Aggregator.load(self.context, market.oracle), self.markets)\n", " prices = list(map(lambda oracle: oracle.price, oracles)) + [Decimal(1)]\n", " token_prices = []\n", " for index, price in enumerate(prices):\n", " token_prices += [TokenValue(self.basket_tokens[index].token, price)]\n", " return token_prices\n", "\n", " def fetch_balances(self, root_address: PublicKey) -> typing.List[TokenValue]:\n", " balances: typing.List[TokenValue] = []\n", " sol_balance = self.context.fetch_sol_balance(root_address)\n", " balances += [TokenValue(SolToken, sol_balance)]\n", "\n", " for basket_token in self.basket_tokens:\n", " balance = TokenValue.fetch_total_value(self.context, root_address, 
basket_token.token)\n", " balances += [balance]\n", " return balances\n", "\n", " def __str__(self) -> str:\n", " total_deposits = \"\\n \".join(map(str, self.total_deposits))\n", " total_borrows = \"\\n \".join(map(str, self.total_borrows))\n", " borrow_limits = \"\\n \".join(map(str, self.borrow_limits))\n", " return f\"\"\"\n", "« Group [{self.version}] {self.address}:\n", " Flags: {self.account_flags}\n", " Tokens:\n", "{self.basket_tokens}\n", " Markets:\n", "{self.markets}\n", " DEX Program ID: « {self.dex_program_id} »\n", " SRM Vault: « {self.srm_vault} »\n", " Admin: « {self.admin} »\n", " Signer Nonce: {self.signer_nonce}\n", " Signer Key: « {self.signer_key} »\n", " Initial Collateral Ratio: {self.init_coll_ratio}\n", " Maintenance Collateral Ratio: {self.maint_coll_ratio}\n", " Total Deposits:\n", " {total_deposits}\n", " Total Borrows:\n", " {total_borrows}\n", " Borrow Limits:\n", " {borrow_limits}\n", "»\n", "\"\"\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "complete-perception", "metadata": {}, "source": [ "## TokenAccount class" ] }, { "cell_type": "code", "execution_count": null, "id": "devoted-corps", "metadata": {}, "outputs": [], "source": [ "class TokenAccount(AddressableAccount):\n", " def __init__(self, account_info: AccountInfo, version: Version, mint: PublicKey, owner: PublicKey, amount: Decimal):\n", " super().__init__(account_info)\n", " self.version: Version = version\n", " self.mint: PublicKey = mint\n", " self.owner: PublicKey = owner\n", " self.amount: Decimal = amount\n", "\n", " @staticmethod\n", " def create(context: Context, account: Account, token: Token):\n", " spl_token = SplToken(context.client, token.mint, TOKEN_PROGRAM_ID, account)\n", " owner = account.public_key()\n", " new_account_address = spl_token.create_account(owner)\n", " return TokenAccount.load(context, new_account_address)\n", "\n", " @staticmethod\n", " def fetch_all_for_owner_and_token(context: 
Context, owner_public_key: PublicKey, token: Token) -> typing.List[\"TokenAccount\"]:\n", " opts = TokenAccountOpts(mint=token.mint)\n", "\n", " token_accounts_response = context.client.get_token_accounts_by_owner(owner_public_key, opts, commitment=context.commitment)\n", "\n", " all_accounts: typing.List[TokenAccount] = []\n", " for token_account_response in token_accounts_response[\"result\"][\"value\"]:\n", " account_info = AccountInfo._from_response_values(token_account_response[\"account\"], PublicKey(token_account_response[\"pubkey\"]))\n", " token_account = TokenAccount.parse(account_info)\n", " all_accounts += [token_account]\n", "\n", " return all_accounts\n", "\n", " @staticmethod\n", " def fetch_largest_for_owner_and_token(context: Context, owner_public_key: PublicKey, token: Token) -> typing.Optional[\"TokenAccount\"]:\n", " all_accounts = TokenAccount.fetch_all_for_owner_and_token(context, owner_public_key, token)\n", "\n", " largest_account: typing.Optional[TokenAccount] = None\n", " for token_account in all_accounts:\n", " if largest_account is None or token_account.amount > largest_account.amount:\n", " largest_account = token_account\n", "\n", " return largest_account\n", "\n", " @staticmethod\n", " def fetch_or_create_largest_for_owner_and_token(context: Context, account: Account, token: Token) -> \"TokenAccount\":\n", " all_accounts = TokenAccount.fetch_all_for_owner_and_token(context, account.public_key(), token)\n", "\n", " largest_account: typing.Optional[TokenAccount] = None\n", " for token_account in all_accounts:\n", " if largest_account is None or token_account.amount > largest_account.amount:\n", " largest_account = token_account\n", "\n", " if largest_account is None:\n", " return TokenAccount.create(context, account, token)\n", "\n", " return largest_account\n", "\n", " @staticmethod\n", " def from_layout(layout: layouts.TOKEN_ACCOUNT, account_info: AccountInfo) -> \"TokenAccount\":\n", " return TokenAccount(account_info, 
Version.UNSPECIFIED, layout.mint, layout.owner, layout.amount)\n", "\n", " @staticmethod\n", " def parse(account_info: AccountInfo) -> \"TokenAccount\":\n", " data = account_info.data\n", " if len(data) != layouts.TOKEN_ACCOUNT.sizeof():\n", " raise Exception(f\"Data length ({len(data)}) does not match expected size ({layouts.TOKEN_ACCOUNT.sizeof()})\")\n", "\n", " layout = layouts.TOKEN_ACCOUNT.parse(data)\n", " return TokenAccount.from_layout(layout, account_info)\n", "\n", " @staticmethod\n", " def load(context: Context, address: PublicKey) -> typing.Optional[\"TokenAccount\"]:\n", " account_info = AccountInfo.load(context, address)\n", " if account_info is None or (len(account_info.data) != layouts.TOKEN_ACCOUNT.sizeof()):\n", " return None\n", " return TokenAccount.parse(account_info)\n", "\n", " def __str__(self) -> str:\n", " return f\"« Token: Mint: {self.mint}, Owner: {self.owner}, Amount: {self.amount} »\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "paperback-collective", "metadata": {}, "source": [ "## OpenOrders class\n" ] }, { "cell_type": "code", "execution_count": null, "id": "composite-pickup", "metadata": {}, "outputs": [], "source": [ "class OpenOrders(AddressableAccount):\n", " def __init__(self, account_info: AccountInfo, version: Version, program_id: PublicKey,\n", " account_flags: SerumAccountFlags, market: PublicKey, owner: PublicKey,\n", " base_token_free: Decimal, base_token_total: Decimal, quote_token_free: Decimal,\n", " quote_token_total: Decimal, free_slot_bits: Decimal, is_bid_bits: Decimal,\n", " orders: typing.List[Decimal], client_ids: typing.List[Decimal],\n", " referrer_rebate_accrued: Decimal):\n", " super().__init__(account_info)\n", " self.version: Version = version\n", " self.program_id: PublicKey = program_id\n", " self.account_flags: SerumAccountFlags = account_flags\n", " self.market: PublicKey = market\n", " self.owner: PublicKey = owner\n", " 
self.base_token_free: Decimal = base_token_free\n", " self.base_token_total: Decimal = base_token_total\n", " self.quote_token_free: Decimal = quote_token_free\n", " self.quote_token_total: Decimal = quote_token_total\n", " self.free_slot_bits: Decimal = free_slot_bits\n", " self.is_bid_bits: Decimal = is_bid_bits\n", " self.orders: typing.List[Decimal] = orders\n", " self.client_ids: typing.List[Decimal] = client_ids\n", " self.referrer_rebate_accrued: Decimal = referrer_rebate_accrued\n", "\n", " # Sometimes pyserum wants to take its own OpenOrdersAccount as a parameter (e.g. in settle_funds())\n", " def to_pyserum(self) -> OpenOrdersAccount:\n", " return OpenOrdersAccount.from_bytes(self.address, self.account_info.data)\n", "\n", " @staticmethod\n", " def from_layout(layout: layouts.OPEN_ORDERS, account_info: AccountInfo,\n", " base_decimals: Decimal, quote_decimals: Decimal) -> \"OpenOrders\":\n", " account_flags = SerumAccountFlags.from_layout(layout.account_flags)\n", " program_id = account_info.owner\n", "\n", " base_divisor = 10 ** base_decimals\n", " quote_divisor = 10 ** quote_decimals\n", " base_token_free: Decimal = layout.base_token_free / base_divisor\n", " base_token_total: Decimal = layout.base_token_total / base_divisor\n", " quote_token_free: Decimal = layout.quote_token_free / quote_divisor\n", " quote_token_total: Decimal = layout.quote_token_total / quote_divisor\n", " nonzero_orders: typing.List[Decimal] = list([order for order in layout.orders if order != 0])\n", " nonzero_client_ids: typing.List[Decimal] = list([client_id for client_id in layout.client_ids if client_id != 0])\n", "\n", " return OpenOrders(account_info, Version.UNSPECIFIED, program_id, account_flags, layout.market,\n", " layout.owner, base_token_free, base_token_total, quote_token_free, quote_token_total,\n", " layout.free_slot_bits, layout.is_bid_bits, nonzero_orders, nonzero_client_ids,\n", " layout.referrer_rebate_accrued)\n", "\n", " @staticmethod\n", " def 
parse(account_info: AccountInfo, base_decimals: Decimal, quote_decimals: Decimal) -> \"OpenOrders\":\n", " data = account_info.data\n", " if len(data) != layouts.OPEN_ORDERS.sizeof():\n", " raise Exception(f\"Data length ({len(data)}) does not match expected size ({layouts.OPEN_ORDERS.sizeof()})\")\n", "\n", " layout = layouts.OPEN_ORDERS.parse(data)\n", " return OpenOrders.from_layout(layout, account_info, base_decimals, quote_decimals)\n", "\n", " @staticmethod\n", " def load_raw_open_orders_account_infos(context: Context, group: Group) -> typing.Dict[str, AccountInfo]:\n", " filters = [\n", " MemcmpOpts(\n", " offset=layouts.SERUM_ACCOUNT_FLAGS.sizeof() + 37,\n", " bytes=encode_key(group.signer_key)\n", " )\n", " ]\n", "\n", " response = context.client.get_program_accounts(group.dex_program_id, data_size=layouts.OPEN_ORDERS.sizeof(), memcmp_opts=filters, commitment=Single, encoding=\"base64\")\n", " account_infos = list(map(lambda pair: AccountInfo._from_response_values(pair[0], pair[1]), [(result[\"account\"], PublicKey(result[\"pubkey\"])) for result in response[\"result\"]]))\n", " account_infos_by_address = {key: value for key, value in [(str(account_info.address), account_info) for account_info in account_infos]}\n", " return account_infos_by_address\n", "\n", " @staticmethod\n", " def load(context: Context, address: PublicKey, base_decimals: Decimal, quote_decimals: Decimal) -> \"OpenOrders\":\n", " open_orders_account = AccountInfo.load(context, address)\n", " if open_orders_account is None:\n", " raise Exception(f\"OpenOrders account not found at address '{address}'\")\n", " return OpenOrders.parse(open_orders_account, base_decimals, quote_decimals)\n", "\n", " @staticmethod\n", " def load_for_market_and_owner(context: Context, market: PublicKey, owner: PublicKey, program_id: PublicKey, base_decimals: Decimal, quote_decimals: Decimal):\n", " filters = [\n", " MemcmpOpts(\n", " offset=layouts.SERUM_ACCOUNT_FLAGS.sizeof() + 5,\n", " 
bytes=encode_key(market)\n", " ),\n", " MemcmpOpts(\n", " offset=layouts.SERUM_ACCOUNT_FLAGS.sizeof() + 37,\n", " bytes=encode_key(owner)\n", " )\n", " ]\n", "\n", " response = context.client.get_program_accounts(context.dex_program_id, data_size=layouts.OPEN_ORDERS.sizeof(), memcmp_opts=filters, commitment=Single, encoding=\"base64\")\n", " accounts = list(map(lambda pair: AccountInfo._from_response_values(pair[0], pair[1]), [(result[\"account\"], PublicKey(result[\"pubkey\"])) for result in response[\"result\"]]))\n", " return list(map(lambda acc: OpenOrders.parse(acc, base_decimals, quote_decimals), accounts))\n", "\n", " def __str__(self) -> str:\n", " orders = \", \".join(map(str, self.orders)) or \"None\"\n", " client_ids = \", \".join(map(str, self.client_ids)) or \"None\"\n", "\n", " return f\"\"\"« OpenOrders:\n", " Flags: {self.account_flags}\n", " Program ID: {self.program_id}\n", " Address: {self.address}\n", " Market: {self.market}\n", " Owner: {self.owner}\n", " Base Token: {self.base_token_free:,.8f} of {self.base_token_total:,.8f}\n", " Quote Token: {self.quote_token_free:,.8f} of {self.quote_token_total:,.8f}\n", " Referrer Rebate Accrued: {self.referrer_rebate_accrued}\n", " Orders:\n", " {orders}\n", " Client IDs:\n", " {client_ids}\n", "»\"\"\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "reported-preparation", "metadata": {}, "source": [ "## BalanceSheet class" ] }, { "cell_type": "code", "execution_count": null, "id": "raised-sound", "metadata": {}, "outputs": [], "source": [ "class BalanceSheet:\n", " def __init__(self, token: Token, liabilities: Decimal, settled_assets: Decimal, unsettled_assets: Decimal):\n", " self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n", " self.token: Token = token\n", " self.liabilities: Decimal = liabilities\n", " self.settled_assets: Decimal = settled_assets\n", " self.unsettled_assets: Decimal = unsettled_assets\n", "\n", " 
@property\n", " def assets(self) -> Decimal:\n", " return self.settled_assets + self.unsettled_assets\n", "\n", " @property\n", " def value(self) -> Decimal:\n", " return self.assets - self.liabilities\n", "\n", " @property\n", " def collateral_ratio(self) -> Decimal:\n", " if self.liabilities == Decimal(0):\n", " return Decimal(0)\n", " return self.assets / self.liabilities\n", "\n", " def __str__(self) -> str:\n", " name = \"«Unspecified»\"\n", " if self.token is not None:\n", " name = self.token.name\n", "\n", " return f\"\"\"« BalanceSheet [{name}]:\n", " Assets : {self.assets:>18,.8f}\n", " Settled Assets : {self.settled_assets:>18,.8f}\n", " Unsettled Assets : {self.unsettled_assets:>18,.8f}\n", " Liabilities : {self.liabilities:>18,.8f}\n", " Value : {self.value:>18,.8f}\n", " Collateral Ratio : {self.collateral_ratio:>18,.2%}\n", "»\n", "\"\"\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "juvenile-companion", "metadata": {}, "source": [ "## MarginAccount class\n" ] }, { "cell_type": "code", "execution_count": null, "id": "exceptional-interstate", "metadata": {}, "outputs": [], "source": [ "class MarginAccount(AddressableAccount):\n", " def __init__(self, account_info: AccountInfo, version: Version, account_flags: MangoAccountFlags,\n", " mango_group: PublicKey, owner: PublicKey, deposits: typing.List[Decimal],\n", " borrows: typing.List[Decimal], open_orders: typing.List[PublicKey]):\n", " super().__init__(account_info)\n", " self.version: Version = version\n", " self.account_flags: MangoAccountFlags = account_flags\n", " self.mango_group: PublicKey = mango_group\n", " self.owner: PublicKey = owner\n", " self.deposits: typing.List[Decimal] = deposits\n", " self.borrows: typing.List[Decimal] = borrows\n", " self.open_orders: typing.List[PublicKey] = open_orders\n", " self.open_orders_accounts: typing.List[typing.Optional[OpenOrders]] = [None] * NUM_MARKETS\n", "\n", " @staticmethod\n", " def 
from_layout(layout: layouts.MARGIN_ACCOUNT, account_info: AccountInfo) -> \"MarginAccount\":\n", " account_flags: MangoAccountFlags = MangoAccountFlags.from_layout(layout.account_flags)\n", " deposits: typing.List[Decimal] = []\n", " for index, deposit in enumerate(layout.deposits):\n", " deposits += [deposit]\n", "\n", " borrows: typing.List[Decimal] = []\n", " for index, borrow in enumerate(layout.borrows):\n", " borrows += [borrow]\n", "\n", " return MarginAccount(account_info, Version.UNSPECIFIED, account_flags, layout.mango_group,\n", " layout.owner, deposits, borrows, list(layout.open_orders))\n", "\n", " @staticmethod\n", " def parse(account_info: AccountInfo) -> \"MarginAccount\":\n", " data = account_info.data\n", " if len(data) != layouts.MARGIN_ACCOUNT.sizeof():\n", " raise Exception(f\"Data length ({len(data)}) does not match expected size ({layouts.MARGIN_ACCOUNT.sizeof()})\")\n", "\n", " layout = layouts.MARGIN_ACCOUNT.parse(data)\n", " return MarginAccount.from_layout(layout, account_info)\n", "\n", " @staticmethod\n", " def load(context: Context, margin_account_address: PublicKey, group: typing.Optional[Group] = None) -> \"MarginAccount\":\n", " account_info = AccountInfo.load(context, margin_account_address)\n", " if account_info is None:\n", " raise Exception(f\"MarginAccount account not found at address '{margin_account_address}'\")\n", " margin_account = MarginAccount.parse(account_info)\n", " if group is None:\n", " group = Group.load(context)\n", " margin_account.load_open_orders_accounts(context, group)\n", " return margin_account\n", "\n", " @staticmethod\n", " def load_all_for_group(context: Context, program_id: PublicKey, group: Group) -> typing.List[\"MarginAccount\"]:\n", " filters = [\n", " MemcmpOpts(\n", " offset=layouts.MANGO_ACCOUNT_FLAGS.sizeof(), # mango_group is just after the MangoAccountFlags, which is the first entry\n", " bytes=encode_key(group.address)\n", " )\n", " ]\n", " response = 
context.client.get_program_accounts(program_id, data_size=layouts.MARGIN_ACCOUNT.sizeof(), memcmp_opts=filters, commitment=Single, encoding=\"base64\")\n", " margin_accounts = []\n", " for margin_account_data in response[\"result\"]:\n", " address = PublicKey(margin_account_data[\"pubkey\"])\n", " account = AccountInfo._from_response_values(margin_account_data[\"account\"], address)\n", " margin_account = MarginAccount.parse(account)\n", " margin_accounts += [margin_account]\n", " return margin_accounts\n", "\n", " @staticmethod\n", " def load_all_for_owner(context: Context, owner: PublicKey, group: typing.Optional[Group] = None) -> typing.List[\"MarginAccount\"]:\n", " if group is None:\n", " group = Group.load(context)\n", "\n", " mango_group_offset = layouts.MANGO_ACCOUNT_FLAGS.sizeof() # mango_group is just after the MangoAccountFlags, which is the first entry.\n", " owner_offset = mango_group_offset + 32 # owner is just after mango_group in the layout, and it's a PublicKey which is 32 bytes.\n", " filters = [\n", " MemcmpOpts(\n", " offset=mango_group_offset,\n", " bytes=encode_key(group.address)\n", " ),\n", " MemcmpOpts(\n", " offset=owner_offset,\n", " bytes=encode_key(owner)\n", " )\n", " ]\n", "\n", " response = context.client.get_program_accounts(context.program_id, data_size=layouts.MARGIN_ACCOUNT.sizeof(), memcmp_opts=filters, commitment=Single, encoding=\"base64\")\n", " margin_accounts = []\n", " for margin_account_data in response[\"result\"]:\n", " address = PublicKey(margin_account_data[\"pubkey\"])\n", " account = AccountInfo._from_response_values(margin_account_data[\"account\"], address)\n", " margin_account = MarginAccount.parse(account)\n", " margin_account.load_open_orders_accounts(context, group)\n", " margin_accounts += [margin_account]\n", " return margin_accounts\n", "\n", " def load_open_orders_accounts(self, context: Context, group: Group) -> None:\n", " for index, oo in enumerate(self.open_orders):\n", " key = oo\n", " if key != 
SYSTEM_PROGRAM_ADDRESS:\n", " self.open_orders_accounts[index] = OpenOrders.load(context, key, group.basket_tokens[index].token.decimals, group.shared_quote_token.token.decimals)\n", "\n", " def install_open_orders_accounts(self, group: Group, all_open_orders_by_address: typing.Dict[str, AccountInfo]) -> None:\n", " for index, oo in enumerate(self.open_orders):\n", " key = str(oo)\n", " if key in all_open_orders_by_address:\n", " open_orders_account_info = all_open_orders_by_address[key]\n", " open_orders = OpenOrders.parse(open_orders_account_info,\n", " group.basket_tokens[index].token.decimals,\n", " group.shared_quote_token.token.decimals)\n", " self.open_orders_accounts[index] = open_orders\n", "\n", " def get_intrinsic_balance_sheets(self, group: Group) -> typing.List[BalanceSheet]:\n", " settled_assets: typing.List[Decimal] = [Decimal(0)] * NUM_TOKENS\n", " liabilities: typing.List[Decimal] = [Decimal(0)] * NUM_TOKENS\n", " for index in range(NUM_TOKENS):\n", " settled_assets[index] = group.basket_tokens[index].index.deposit * self.deposits[index]\n", " liabilities[index] = group.basket_tokens[index].index.borrow * self.borrows[index]\n", "\n", " unsettled_assets: typing.List[Decimal] = [Decimal(0)] * NUM_TOKENS\n", " for index in range(NUM_MARKETS):\n", " open_orders_account = self.open_orders_accounts[index]\n", " if open_orders_account is not None:\n", " unsettled_assets[index] += open_orders_account.base_token_total\n", " unsettled_assets[NUM_TOKENS - 1] += open_orders_account.quote_token_total\n", "\n", " balance_sheets: typing.List[BalanceSheet] = []\n", " for index in range(NUM_TOKENS):\n", " balance_sheets += [BalanceSheet(group.basket_tokens[index].token, liabilities[index],\n", " settled_assets[index], unsettled_assets[index])]\n", "\n", " return balance_sheets\n", "\n", " def get_priced_balance_sheets(self, group: Group, prices: typing.List[TokenValue]) -> typing.List[BalanceSheet]:\n", " priced: typing.List[BalanceSheet] = []\n", " balance_sheets 
= self.get_intrinsic_balance_sheets(group)\n", " for balance_sheet in balance_sheets:\n", " price = TokenValue.find_by_token(prices, balance_sheet.token)\n", " liabilities = balance_sheet.liabilities * price.value\n", " settled_assets = balance_sheet.settled_assets * price.value\n", " unsettled_assets = balance_sheet.unsettled_assets * price.value\n", " priced += [BalanceSheet(\n", " price.token,\n", " price.token.round(liabilities),\n", " price.token.round(settled_assets),\n", " price.token.round(unsettled_assets)\n", " )]\n", "\n", " return priced\n", "\n", " def get_balance_sheet_totals(self, group: Group, prices: typing.List[TokenValue]) -> BalanceSheet:\n", " liabilities = Decimal(0)\n", " settled_assets = Decimal(0)\n", " unsettled_assets = Decimal(0)\n", "\n", " balance_sheets = self.get_priced_balance_sheets(group, prices)\n", " for balance_sheet in balance_sheets:\n", " if balance_sheet is not None:\n", " liabilities += balance_sheet.liabilities\n", " settled_assets += balance_sheet.settled_assets\n", " unsettled_assets += balance_sheet.unsettled_assets\n", "\n", " # A BalanceSheet must have a token - it's a pain to make it a typing.Optional[Token].\n", " # So in this one case, we produce a 'fake' token whose symbol is a summary of all token\n", " # symbols that went into it.\n", " #\n", " # If this becomes more painful than typing.Optional[Token], we can go with making\n", " # Token optional.\n", " summary_name = \"-\".join([bal.token.name for bal in balance_sheets])\n", " summary_token = Token(summary_name, SYSTEM_PROGRAM_ADDRESS, Decimal(0))\n", " return BalanceSheet(summary_token, liabilities, settled_assets, unsettled_assets)\n", "\n", " def get_intrinsic_balances(self, group: Group) -> typing.List[TokenValue]:\n", " balance_sheets = self.get_intrinsic_balance_sheets(group)\n", " balances: typing.List[TokenValue] = []\n", " for index, balance_sheet in enumerate(balance_sheets):\n", " if balance_sheet.token is None:\n", " raise Exception(f\"Intrinsic 
balance sheet with index [{index}] has no token.\")\n", " balances += [TokenValue(balance_sheet.token, balance_sheet.value)]\n", "\n", " return balances\n", "\n", " def __str__(self) -> str:\n", " deposits = \", \".join([f\"{item:,.8f}\" for item in self.deposits])\n", " borrows = \", \".join([f\"{item:,.8f}\" for item in self.borrows])\n", " if all(oo is None for oo in self.open_orders_accounts):\n", " open_orders = f\"{self.open_orders}\"\n", " else:\n", " open_orders_unindented = f\"{self.open_orders_accounts}\"\n", " open_orders = open_orders_unindented.replace(\"\\n\", \"\\n \")\n", " return f\"\"\"« MarginAccount: {self.address}\n", " Flags: {self.account_flags}\n", " Owner: {self.owner}\n", " Mango Group: {self.mango_group}\n", " Deposits: [{deposits}]\n", " Borrows: [{borrows}]\n", " Mango Open Orders: {open_orders}\n", "»\"\"\"\n", "\n", " def __repr__(self) -> str:\n", " return f\"{self}\"\n" ] }, { "cell_type": "markdown", "id": "polar-converter", "metadata": {}, "source": [ "## MarginAccountMetadata class" ] }, { "cell_type": "code", "execution_count": null, "id": "large-klein", "metadata": {}, "outputs": [], "source": [ "class MarginAccountMetadata:\n", " def __init__(self, margin_account: MarginAccount, balance_sheet: BalanceSheet, balances: typing.List[TokenValue]):\n", " self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)\n", " self.margin_account = margin_account\n", " self.balance_sheet = balance_sheet\n", " self.balances = balances\n", "\n", " @property\n", " def assets(self):\n", " return self.balance_sheet.assets\n", "\n", " @property\n", " def liabilities(self):\n", " return self.balance_sheet.liabilities\n", "\n", " @property\n", " def collateral_ratio(self):\n", " return self.balance_sheet.collateral_ratio\n" ] }, { "cell_type": "markdown", "id": "concerned-personal", "metadata": {}, "source": [ "# 🏃 Running" ] }, { "cell_type": "code", "execution_count": null, "id": "generous-thickness", "metadata": {}, "outputs": [], 
"source": [ "if __name__ == \"__main__\":\n", " logging.getLogger().setLevel(logging.INFO)\n", "\n", " import base64\n", " from Context import default_context\n", "\n", " # Just use any public key here\n", " fake_public_key = SYSTEM_PROGRAM_ADDRESS\n", " encoded = \"AwAAAAAAAACCaOmpoURMK6XHelGTaFawcuQ/78/15LAemWI8jrt3SRKLy2R9i60eclDjuDS8+p/ZhvTUd9G7uQVOYCsR6+BhmqGCiO6EPYP2PQkf/VRTvw7JjXvIjPFJy06QR1Cq1WfTonHl0OjCkyEf60SD07+MFJu5pVWNFGGEO/8AiAYfduaKdnFTaZEHPcK5Eq72WWHeHg2yIbBF09kyeOhlCJwOoG8O5SgpPV8QOA64ZNV4aKroFfADg6kEy/wWCdp3fv0O4GJgAAAAAPH6Ud6jtjwAAQAAAAAAAADiDkkCi9UOAAEAAAAAAAAADuBiYAAAAACNS5bSy7soAAEAAAAAAAAACMvgO+2jCwABAAAAAAAAAA7gYmAAAAAAZFeDUBNVhwABAAAAAAAAABtRNytozC8AAQAAAAAAAABIBGiCcyaEZdNhrTyeqUY692vOzzPdHaxAxguht3JQGlkzjtd05dX9LENHkl2z1XvUbTNKZlweypNRetmH0lmQ9VYQAHqylxZVK65gEg85g27YuSyvOBZAjJyRmYU9KdCO1D+4ehdPu9dQB1yI1uh75wShdAaFn2o4qrMYwq3SQQEAAAAAAAAAAiH1PPJKAuh6oGiE35aGhUQhFi/bxgKOudpFv8HEHNCFDy1uAqR6+CTQmradxC1wyyjL+iSft+5XudJWwSdi7wvphsxb96x7Obj/AgAAAAAKlV4LL5ow6r9LMhIAAAAADvsOtqcVFmChDPzPnwAAAE33lx1h8hPFD04AAAAAAAA8YRV3Oa309B2wGwAAAAAA+yPBZRlZz7b605n+AQAAAACgmZmZmZkZAQAAAAAAAAAAMDMzMzMzMwEAAAAAAAAA25D1XcAtRzSuuyx3U+X7aE9vM1EJySU9KprgL0LMJ/vat9+SEEUZuga7O5tTUrcMDYWDg+LYaAWhSQiN2fYk7aCGAQAAAAAAgIQeAAAAAAAA8gUqAQAAAAYGBgICAAAA\"\n", " decoded = base64.b64decode(encoded)\n", " group_account_info = AccountInfo(fake_public_key, False, Decimal(0), fake_public_key, Decimal(0), decoded)\n", "\n", " group = Group.parse(default_context, group_account_info)\n", " print(\"\\n\\nThis is hard-coded, not live information!\")\n", " print(group)\n", "\n", " print(TokenLookup.find_by_name(default_context, \"ETH\"))\n", " print(TokenLookup.find_by_name(default_context, \"BTC\"))\n", "\n", " # USDT\n", " print(TokenLookup.find_by_mint(default_context, PublicKey(\"Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB\")))\n", "\n", " single_account_info = AccountInfo.load(default_context, default_context.dex_program_id)\n", " print(\"DEX account info\", single_account_info)\n", 
"\n", " multiple_account_info = AccountInfo.load_multiple(default_context, [default_context.program_id, default_context.dex_program_id])\n", " print(\"Mango program and DEX account info\", multiple_account_info)\n", "\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.6" }, "toc": { "base_numbering": 1, "nav_menu": {}, "number_sections": true, "sideBar": true, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": false, "toc_position": {}, "toc_section_display": true, "toc_window_display": true }, "varInspector": { "cols": { "lenName": 16, "lenType": 16, "lenVar": 40 }, "kernels_config": { "python": { "delete_cmd_postfix": "", "delete_cmd_prefix": "del ", "library": "var_list.py", "varRefreshCmd": "print(var_dic_list())" }, "r": { "delete_cmd_postfix": ") ", "delete_cmd_prefix": "rm(", "library": "var_list.r", "varRefreshCmd": "cat(var_dic_list()) " } }, "types_to_exclude": [ "module", "function", "builtin_function_or_method", "instance", "_Feature" ], "window_display": false } }, "nbformat": 4, "nbformat_minor": 5 }