151 lines
6.9 KiB
Python
151 lines
6.9 KiB
Python
# # ⚠ Warning
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
|
# LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
|
# NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
|
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
|
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
#
|
|
# [🥭 Mango Markets](https://mango.markets/) support is available at:
|
|
# [Docs](https://docs.mango.markets/)
|
|
# [Discord](https://discord.gg/67jySBhxrg)
|
|
# [Twitter](https://twitter.com/mangomarkets)
|
|
# [Github](https://github.com/blockworks-foundation)
|
|
# [Email](mailto:hello@blockworks.foundation)
|
|
|
|
|
|
import typing
|
|
import spl.token.instructions as spl_token
|
|
|
|
from solana.keypair import Keypair
|
|
from solana.publickey import PublicKey
|
|
from solana.rpc.types import TokenAccountOpts
|
|
from spl.token.client import Token as SplToken
|
|
from spl.token.constants import TOKEN_PROGRAM_ID
|
|
|
|
from .accountinfo import AccountInfo
|
|
from .addressableaccount import AddressableAccount
|
|
from .combinableinstructions import CombinableInstructions
|
|
from .context import Context
|
|
from .layouts import layouts
|
|
from .token import Token
|
|
from .tokenlookup import TokenLookup
|
|
from .tokenvalue import TokenValue
|
|
from .version import Version
|
|
from .wallet import Wallet
|
|
|
|
# # 🥭 TokenAccount class
|
|
#
|
|
|
|
|
|
class TokenAccount(AddressableAccount):
|
|
def __init__(self, account_info: AccountInfo, version: Version, owner: PublicKey, value: TokenValue):
|
|
super().__init__(account_info)
|
|
self.version: Version = version
|
|
self.owner: PublicKey = owner
|
|
self.value: TokenValue = value
|
|
|
|
@staticmethod
|
|
def create(context: Context, account: Keypair, token: Token):
|
|
spl_token = SplToken(context.client.compatible_client, token.mint, TOKEN_PROGRAM_ID, account)
|
|
owner = account.public_key
|
|
new_account_address = spl_token.create_account(owner)
|
|
return TokenAccount.load(context, new_account_address)
|
|
|
|
@staticmethod
|
|
def fetch_all_for_owner_and_token(context: Context, owner_public_key: PublicKey, token: Token) -> typing.Sequence["TokenAccount"]:
|
|
opts = TokenAccountOpts(mint=token.mint)
|
|
|
|
token_accounts = context.client.get_token_accounts_by_owner(owner_public_key, opts)
|
|
|
|
all_accounts: typing.List[TokenAccount] = []
|
|
for token_account_response in token_accounts:
|
|
account_info = AccountInfo._from_response_values(
|
|
token_account_response["account"], PublicKey(token_account_response["pubkey"]))
|
|
token_account = TokenAccount.parse(account_info, token)
|
|
all_accounts += [token_account]
|
|
|
|
return all_accounts
|
|
|
|
@staticmethod
|
|
def fetch_largest_for_owner_and_token(context: Context, owner_public_key: PublicKey, token: Token) -> typing.Optional["TokenAccount"]:
|
|
all_accounts = TokenAccount.fetch_all_for_owner_and_token(context, owner_public_key, token)
|
|
|
|
largest_account: typing.Optional[TokenAccount] = None
|
|
for token_account in all_accounts:
|
|
if largest_account is None or token_account.value.value > largest_account.value.value:
|
|
largest_account = token_account
|
|
|
|
return largest_account
|
|
|
|
@staticmethod
|
|
def fetch_or_create_largest_for_owner_and_token(context: Context, account: Keypair, token: Token) -> "TokenAccount":
|
|
all_accounts = TokenAccount.fetch_all_for_owner_and_token(context, account.public_key, token)
|
|
|
|
largest_account: typing.Optional[TokenAccount] = None
|
|
for token_account in all_accounts:
|
|
if largest_account is None or token_account.value.value > largest_account.value.value:
|
|
largest_account = token_account
|
|
|
|
if largest_account is None:
|
|
return TokenAccount.create(context, account, token)
|
|
|
|
return largest_account
|
|
|
|
@staticmethod
|
|
def find_or_create_token_address_to_use(context: Context, wallet: Wallet, owner: PublicKey, token: Token) -> PublicKey:
|
|
# This is a root wallet account - get the token account to use.
|
|
associated_token_address = spl_token.get_associated_token_address(owner, token.mint)
|
|
token_account: typing.Optional[TokenAccount] = TokenAccount.load(context, associated_token_address)
|
|
if token_account is not None:
|
|
# The associated token account exists so use it
|
|
return associated_token_address
|
|
|
|
# There is no associated token account. See if they have an old-style non-associated token account.
|
|
largest = TokenAccount.fetch_largest_for_owner_and_token(context, owner, token)
|
|
if largest is not None:
|
|
# There is an old-style account so use that.
|
|
return largest.address
|
|
|
|
# There is no old-style token account either, so create the proper associated token account.
|
|
signer = CombinableInstructions.from_wallet(wallet)
|
|
create_instruction = spl_token.create_associated_token_account(wallet.address, owner, token.mint)
|
|
create = CombinableInstructions.from_instruction(create_instruction)
|
|
|
|
transaction_ids = (signer + create).execute(context)
|
|
context.client.wait_for_confirmation(transaction_ids)
|
|
|
|
return associated_token_address
|
|
|
|
@staticmethod
|
|
def from_layout(layout: typing.Any, account_info: AccountInfo, token: Token) -> "TokenAccount":
|
|
token_value = TokenValue(token, token.shift_to_decimals(layout.amount))
|
|
return TokenAccount(account_info, Version.UNSPECIFIED, layout.owner, token_value)
|
|
|
|
@staticmethod
|
|
def parse(account_info: AccountInfo, token: typing.Optional[Token] = None, token_lookup: typing.Optional[TokenLookup] = None) -> "TokenAccount":
|
|
data = account_info.data
|
|
if len(data) != layouts.TOKEN_ACCOUNT.sizeof():
|
|
raise Exception(
|
|
f"Data length ({len(data)}) does not match expected size ({layouts.TOKEN_ACCOUNT.sizeof()})")
|
|
|
|
layout = layouts.TOKEN_ACCOUNT.parse(data)
|
|
if token is None:
|
|
if token_lookup is None:
|
|
raise Exception("Neither 'Token' or 'TokenLookup' specified for parsing token data.")
|
|
token = token_lookup.find_by_mint(layout.mint)
|
|
if token is None:
|
|
raise Exception(f"Could not find token data for token with mint '{layout.mint}'")
|
|
|
|
return TokenAccount.from_layout(layout, account_info, token)
|
|
|
|
@staticmethod
|
|
def load(context: Context, address: PublicKey) -> typing.Optional["TokenAccount"]:
|
|
account_info = AccountInfo.load(context, address)
|
|
if account_info is None or (len(account_info.data) != layouts.TOKEN_ACCOUNT.sizeof()):
|
|
return None
|
|
return TokenAccount.parse(account_info, token_lookup=context.token_lookup)
|
|
|
|
def __str__(self) -> str:
|
|
return f"« Token: Address: {self.address}, Owner: {self.owner}, Value: {self.value} »"
|