551 lines
19 KiB
Python
551 lines
19 KiB
Python
# python3 -m pip install pycryptodomex uvarint pyteal web3 coincurve
|
|
|
|
import sys
|
|
sys.path.append("..")
|
|
|
|
from admin import PortalCore, Account
|
|
from gentest import GenTest
|
|
from base64 import b64decode
|
|
|
|
from typing import List, Tuple, Dict, Any, Optional, Union
|
|
import base64
|
|
import random
|
|
import time
|
|
import hashlib
|
|
import uuid
|
|
import json
|
|
|
|
from algosdk.v2client.algod import AlgodClient
|
|
from algosdk.kmd import KMDClient
|
|
from algosdk import account, mnemonic
|
|
from algosdk.encoding import decode_address, encode_address
|
|
from algosdk.future import transaction
|
|
from pyteal import compileTeal, Mode, Expr
|
|
from pyteal import *
|
|
from algosdk.logic import get_application_address
|
|
from vaa_verify import get_vaa_verify
|
|
|
|
from algosdk.future.transaction import LogicSig
|
|
|
|
from test_contract import get_test_app
|
|
|
|
from algosdk.v2client import indexer
|
|
|
|
import pprint
|
|
|
|
class AlgoTest(PortalCore):
|
|
def __init__(self) -> None:
|
|
super().__init__()
|
|
|
|
def getBalances(self, client: AlgodClient, account: str) -> Dict[int, int]:
|
|
balances: Dict[int, int] = dict()
|
|
|
|
accountInfo = client.account_info(account)
|
|
|
|
# set key 0 to Algo balance
|
|
balances[0] = accountInfo["amount"]
|
|
|
|
assets: List[Dict[str, Any]] = accountInfo.get("assets", [])
|
|
for assetHolding in assets:
|
|
assetID = assetHolding["asset-id"]
|
|
amount = assetHolding["amount"]
|
|
balances[assetID] = amount
|
|
|
|
return balances
|
|
|
|
def createTestApp(
|
|
self,
|
|
client: AlgodClient,
|
|
sender: Account,
|
|
) -> int:
|
|
approval, clear = get_test_app(client)
|
|
|
|
globalSchema = transaction.StateSchema(num_uints=4, num_byte_slices=30)
|
|
localSchema = transaction.StateSchema(num_uints=0, num_byte_slices=16)
|
|
|
|
app_args = []
|
|
|
|
txn = transaction.ApplicationCreateTxn(
|
|
sender=sender.getAddress(),
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
approval_program=b64decode(approval["result"]),
|
|
clear_program=b64decode(clear["result"]),
|
|
global_schema=globalSchema,
|
|
local_schema=localSchema,
|
|
app_args=app_args,
|
|
sp=client.suggested_params(),
|
|
)
|
|
|
|
signedTxn = txn.sign(sender.getPrivateKey())
|
|
|
|
client.send_transaction(signedTxn)
|
|
|
|
response = self.waitForTransaction(client, signedTxn.get_txid())
|
|
assert response.applicationIndex is not None and response.applicationIndex > 0
|
|
|
|
txn = transaction.PaymentTxn(sender = sender.getAddress(), sp = client.suggested_params(),
|
|
receiver = get_application_address(response.applicationIndex), amt = 300000)
|
|
signedTxn = txn.sign(sender.getPrivateKey())
|
|
client.send_transaction(signedTxn)
|
|
|
|
return response.applicationIndex
|
|
|
|
def parseSeqFromLog(self, txn):
|
|
try:
|
|
return int.from_bytes(b64decode(txn.innerTxns[-1]["logs"][0]), "big")
|
|
except Exception as err:
|
|
pprint.pprint(txn.__dict__)
|
|
raise
|
|
|
|
def getVAA(self, client, sender, sid, app):
|
|
if sid == None:
|
|
raise Exception("getVAA called with a sid of None")
|
|
|
|
saddr = get_application_address(app)
|
|
|
|
# SOOO, we send a nop txn through to push the block forward
|
|
# one
|
|
|
|
# This is ONLY needed on a local net... the indexer will sit
|
|
# on the last block for 30 to 60 seconds... we don't want this
|
|
# log in prod since it is wasteful of gas
|
|
|
|
if (self.INDEXER_ROUND > 512 and not self.args.testnet): # until they fix it
|
|
print("indexer is broken in local net... stop/clean/restart the sandbox")
|
|
sys.exit(0)
|
|
|
|
txns = []
|
|
|
|
txns.append(
|
|
transaction.ApplicationCallTxn(
|
|
sender=sender.getAddress(),
|
|
index=self.tokenid,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"nop"],
|
|
sp=client.suggested_params(),
|
|
)
|
|
)
|
|
self.sendTxn(client, sender, txns, False)
|
|
|
|
if self.myindexer == None:
|
|
print("indexer address: " + self.INDEXER_ADDRESS)
|
|
self.myindexer = indexer.IndexerClient(indexer_token=self.INDEXER_TOKEN, indexer_address=self.INDEXER_ADDRESS)
|
|
|
|
while True:
|
|
nexttoken = ""
|
|
while True:
|
|
response = self.myindexer.search_transactions( min_round=self.INDEXER_ROUND, note_prefix=self.NOTE_PREFIX, next_page=nexttoken)
|
|
# pprint.pprint(response)
|
|
for x in response["transactions"]:
|
|
# pprint.pprint(x)
|
|
for y in x["inner-txns"]:
|
|
if "application-transaction" not in y:
|
|
continue
|
|
if y["application-transaction"]["application-id"] != self.coreid:
|
|
continue
|
|
if len(y["logs"]) == 0:
|
|
continue
|
|
args = y["application-transaction"]["application-args"]
|
|
if len(args) < 2:
|
|
continue
|
|
if base64.b64decode(args[0]) != b'publishMessage':
|
|
continue
|
|
seq = int.from_bytes(base64.b64decode(y["logs"][0]), "big")
|
|
if seq != sid:
|
|
continue
|
|
if y["sender"] != saddr:
|
|
continue;
|
|
emitter = decode_address(y["sender"])
|
|
payload = base64.b64decode(args[1])
|
|
# pprint.pprint([seq, y["sender"], payload.hex()])
|
|
# sys.exit(0)
|
|
return self.gt.genVaa(emitter, seq, payload)
|
|
|
|
if 'next-token' in response:
|
|
nexttoken = response['next-token']
|
|
else:
|
|
self.INDEXER_ROUND = response['current-round'] + 1
|
|
break
|
|
time.sleep(1)
|
|
|
|
def publishMessage(self, client, sender, vaa, appid):
|
|
aa = decode_address(get_application_address(appid)).hex()
|
|
emitter_addr = self.optin(client, sender, self.coreid, 0, aa)
|
|
|
|
txns = []
|
|
sp = client.suggested_params()
|
|
|
|
a = transaction.ApplicationCallTxn(
|
|
sender=sender.getAddress(),
|
|
index=appid,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"test1", vaa, self.coreid],
|
|
foreign_apps = [self.coreid],
|
|
accounts=[emitter_addr],
|
|
sp=sp
|
|
)
|
|
|
|
a.fee = a.fee * 2
|
|
|
|
txns.append(a)
|
|
|
|
resp = self.sendTxn(client, sender, txns, True)
|
|
|
|
self.INDEXER_ROUND = resp.confirmedRound
|
|
|
|
return self.parseSeqFromLog(resp)
|
|
|
|
def createTestAsset(self, client, sender):
|
|
txns = []
|
|
|
|
a = transaction.PaymentTxn(
|
|
sender = sender.getAddress(),
|
|
sp = client.suggested_params(),
|
|
receiver = get_application_address(self.testid),
|
|
amt = 300000
|
|
)
|
|
|
|
txns.append(a)
|
|
|
|
sp = client.suggested_params()
|
|
a = transaction.ApplicationCallTxn(
|
|
sender=sender.getAddress(),
|
|
index=self.testid,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"setup"],
|
|
sp=sp
|
|
)
|
|
|
|
a.fee = a.fee * 2
|
|
|
|
txns.append(a)
|
|
transaction.assign_group_id(txns)
|
|
|
|
grp = []
|
|
pk = sender.getPrivateKey()
|
|
for t in txns:
|
|
grp.append(t.sign(pk))
|
|
|
|
client.send_transactions(grp)
|
|
resp = self.waitForTransaction(client, grp[-1].get_txid())
|
|
|
|
aid = int.from_bytes(resp.__dict__["logs"][0], "big")
|
|
|
|
print("Opting " + sender.getAddress() + " into " + str(aid))
|
|
self.asset_optin(client, sender, aid, sender.getAddress())
|
|
|
|
txns = []
|
|
a = transaction.ApplicationCallTxn(
|
|
sender=sender.getAddress(),
|
|
index=self.testid,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"mint"],
|
|
foreign_assets = [aid],
|
|
sp=sp
|
|
)
|
|
|
|
a.fee = a.fee * 2
|
|
|
|
txns.append(a)
|
|
|
|
resp = self.sendTxn(client, sender, txns, True)
|
|
|
|
# self.INDEXER_ROUND = resp.confirmedRound
|
|
|
|
return aid
|
|
|
|
def getCreator(self, client, sender, asset_id):
|
|
return client.asset_info(asset_id)["params"]["creator"]
|
|
|
|
def testAttest(self, client, sender, asset_id):
|
|
taddr = get_application_address(self.tokenid)
|
|
aa = decode_address(taddr).hex()
|
|
emitter_addr = self.optin(client, sender, self.coreid, 0, aa)
|
|
|
|
creator = self.getCreator(client, sender, asset_id)
|
|
c = client.account_info(creator)
|
|
wormhole = c.get("auth-addr") == taddr
|
|
|
|
if not wormhole:
|
|
creator = self.optin(client, sender, self.tokenid, asset_id, b"native".hex())
|
|
|
|
txns = []
|
|
sp = client.suggested_params()
|
|
|
|
txns.append(transaction.ApplicationCallTxn(
|
|
sender=sender.getAddress(),
|
|
index=self.tokenid,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"nop"],
|
|
sp=sp
|
|
))
|
|
|
|
mfee = self.getMessageFee()
|
|
if (mfee > 0):
|
|
txns.append(transaction.PaymentTxn(sender = sender.getAddress(), sp = sp, receiver = get_application_address(self.tokenid), amt = mfee))
|
|
|
|
a = transaction.ApplicationCallTxn(
|
|
sender=sender.getAddress(),
|
|
index=self.tokenid,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"attestToken", asset_id],
|
|
foreign_apps = [self.coreid],
|
|
foreign_assets = [asset_id],
|
|
accounts=[emitter_addr, creator, c["address"], get_application_address(self.coreid)],
|
|
sp=sp
|
|
)
|
|
|
|
if (mfee > 0):
|
|
a.fee = a.fee * 3
|
|
else:
|
|
a.fee = a.fee * 2
|
|
|
|
txns.append(a)
|
|
|
|
resp = self.sendTxn(client, sender, txns, True)
|
|
|
|
# Point us at the correct round
|
|
self.INDEXER_ROUND = resp.confirmedRound
|
|
|
|
# print(encode_address(resp.__dict__["logs"][0]))
|
|
# print(encode_address(resp.__dict__["logs"][1]))
|
|
# pprint.pprint(resp.__dict__)
|
|
return self.parseSeqFromLog(resp)
|
|
|
|
def transferAsset(self, client, sender, asset_id, quantity, receiver, chain, fee, payload = None):
|
|
# pprint.pprint(["transferAsset", asset_id, quantity, receiver, chain, fee])
|
|
|
|
taddr = get_application_address(self.tokenid)
|
|
aa = decode_address(taddr).hex()
|
|
emitter_addr = self.optin(client, sender, self.coreid, 0, aa)
|
|
|
|
# asset_id 0 is ALGO
|
|
|
|
if asset_id == 0:
|
|
wormhole = False
|
|
else:
|
|
creator = self.getCreator(client, sender, asset_id)
|
|
c = client.account_info(creator)
|
|
wormhole = c.get("auth-addr") == taddr
|
|
|
|
txns = []
|
|
|
|
|
|
mfee = self.getMessageFee()
|
|
if (mfee > 0):
|
|
txns.append(transaction.PaymentTxn(sender = sender.getAddress(), sp = sp, receiver = get_application_address(self.tokenid), amt = mfee))
|
|
|
|
if not wormhole:
|
|
creator = self.optin(client, sender, self.tokenid, asset_id, b"native".hex())
|
|
print("non wormhole account " + creator)
|
|
|
|
sp = client.suggested_params()
|
|
|
|
if (asset_id != 0) and (not self.asset_optin_check(client, sender, asset_id, creator)):
|
|
print("Looks like we need to optin")
|
|
|
|
txns.append(
|
|
transaction.PaymentTxn(
|
|
sender=sender.getAddress(),
|
|
receiver=creator,
|
|
amt=100000,
|
|
sp=sp
|
|
)
|
|
)
|
|
|
|
# The tokenid app needs to do the optin since it has signature authority
|
|
a = transaction.ApplicationCallTxn(
|
|
sender=sender.getAddress(),
|
|
index=self.tokenid,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"optin", asset_id],
|
|
foreign_assets = [asset_id],
|
|
accounts=[creator],
|
|
sp=sp
|
|
)
|
|
|
|
a.fee = a.fee * 2
|
|
txns.append(a)
|
|
self.sendTxn(client, sender, txns, False)
|
|
txns = []
|
|
|
|
txns.insert(0,
|
|
transaction.ApplicationCallTxn(
|
|
sender=sender.getAddress(),
|
|
index=self.tokenid,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"nop"],
|
|
sp=client.suggested_params(),
|
|
)
|
|
)
|
|
|
|
if asset_id == 0:
|
|
print("asset_id == 0")
|
|
txns.append(transaction.PaymentTxn(
|
|
sender=sender.getAddress(),
|
|
receiver=creator,
|
|
amt=quantity,
|
|
sp=sp,
|
|
))
|
|
accounts=[emitter_addr, creator, creator]
|
|
else:
|
|
print("asset_id != 0")
|
|
txns.append(
|
|
transaction.AssetTransferTxn(
|
|
sender = sender.getAddress(),
|
|
sp = sp,
|
|
receiver = creator,
|
|
amt = quantity,
|
|
index = asset_id
|
|
))
|
|
accounts=[emitter_addr, creator, c["address"]]
|
|
|
|
args = [b"sendTransfer", asset_id, quantity, decode_address(receiver), chain, fee]
|
|
if None != payload:
|
|
args.append(payload)
|
|
|
|
#pprint.pprint(args)
|
|
|
|
# print(self.tokenid)
|
|
a = transaction.ApplicationCallTxn(
|
|
sender=sender.getAddress(),
|
|
index=self.tokenid,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=args,
|
|
foreign_apps = [self.coreid],
|
|
foreign_assets = [asset_id],
|
|
accounts=accounts,
|
|
sp=sp
|
|
)
|
|
|
|
a.fee = a.fee * 2
|
|
|
|
txns.append(a)
|
|
|
|
resp = self.sendTxn(client, sender, txns, True)
|
|
|
|
self.INDEXER_ROUND = resp.confirmedRound
|
|
|
|
# pprint.pprint([self.coreid, self.tokenid, resp.__dict__,
|
|
# int.from_bytes(resp.__dict__["logs"][1], "big"),
|
|
# int.from_bytes(resp.__dict__["logs"][2], "big"),
|
|
# int.from_bytes(resp.__dict__["logs"][3], "big"),
|
|
# int.from_bytes(resp.__dict__["logs"][4], "big"),
|
|
# int.from_bytes(resp.__dict__["logs"][5], "big")
|
|
# ])
|
|
# print(encode_address(resp.__dict__["logs"][0]))
|
|
# print(encode_address(resp.__dict__["logs"][1]))
|
|
return self.parseSeqFromLog(resp)
|
|
|
|
def asset_optin_check(self, client, sender, asset, receiver):
|
|
if receiver not in self.asset_cache:
|
|
self.asset_cache[receiver] = {}
|
|
|
|
if asset in self.asset_cache[receiver]:
|
|
return True
|
|
|
|
ai = client.account_info(receiver)
|
|
if "assets" in ai:
|
|
for x in ai["assets"]:
|
|
if x["asset-id"] == asset:
|
|
self.asset_cache[receiver][asset] = True
|
|
return True
|
|
|
|
return False
|
|
|
|
def asset_optin(self, client, sender, asset, receiver):
|
|
if self.asset_optin_check(client, sender, asset, receiver):
|
|
return
|
|
|
|
pprint.pprint(["asset_optin", asset, receiver])
|
|
|
|
sp = client.suggested_params()
|
|
optin_txn = transaction.AssetTransferTxn(
|
|
sender = sender.getAddress(),
|
|
sp = sp,
|
|
receiver = receiver,
|
|
amt = 0,
|
|
index = asset
|
|
)
|
|
|
|
transaction.assign_group_id([optin_txn])
|
|
signed_optin = optin_txn.sign(sender.getPrivateKey())
|
|
client.send_transactions([signed_optin])
|
|
resp = self.waitForTransaction(client, signed_optin.get_txid())
|
|
assert self.asset_optin_check(client, sender, asset, receiver), "The optin failed"
|
|
print("woah! optin succeeded")
|
|
|
|
def simple_test(self):
|
|
# q = bytes.fromhex(gt.genAssetMeta(gt.guardianPrivKeys, 1, 1, 1, bytes.fromhex("4523c3F29447d1f32AEa95BEBD00383c4640F1b4"), 1, 8, b"USDC", b"CircleCoin"))
|
|
# pprint.pprint(self.parseVAA(q))
|
|
# sys.exit(0)
|
|
|
|
|
|
# vaa = self.parseVAA(bytes.fromhex("01000000010100e1232697de3681d67ca0c46fbbc9ea5d282c473daae8fda2b23145e7b7167f9a35888acf80ed9d091af3069108c25324a22d8665241db884dda53ca53a8212d100625436600000000100020000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585000000000000000120010000000000000000000000000000000000000000000000000000000005f5e1000000000000000000000000000000000000000000000000004523c3F29447d1f32AEa95BEBD00383c4640F1b400020000000000000000000000000000000000000000000000000000aabbcc00080000000000000000000000000000000000000000000000000000000000000000"))
|
|
# pprint.pprint(vaa)
|
|
# sys.exit(0)
|
|
|
|
gt = GenTest(True)
|
|
self.gt = gt
|
|
|
|
self.setup_args()
|
|
|
|
if self.args.testnet:
|
|
self.testnet()
|
|
|
|
client = self.client = self.getAlgodClient()
|
|
|
|
self.genTeal()
|
|
|
|
self.vaa_verify = self.client.compile(get_vaa_verify())
|
|
self.vaa_verify["lsig"] = LogicSig(base64.b64decode(self.vaa_verify["result"]))
|
|
|
|
vaaLogs = []
|
|
|
|
args = self.args
|
|
|
|
if self.args.mnemonic:
|
|
self.foundation = Account.FromMnemonic(self.args.mnemonic)
|
|
|
|
if self.foundation == None:
|
|
print("Generating the foundation account...")
|
|
self.foundation = self.getTemporaryAccount(self.client)
|
|
|
|
if self.foundation == None:
|
|
print("We dont have a account? ")
|
|
sys.exit(0)
|
|
|
|
foundation = self.foundation
|
|
|
|
seq = int(time.time())
|
|
|
|
self.coreid = 1004
|
|
self.tokenid = 1006
|
|
|
|
|
|
player = self.getTemporaryAccount(client)
|
|
print("token bridge " + str(self.tokenid) + " address " + get_application_address(self.tokenid))
|
|
|
|
player2 = self.getTemporaryAccount(client)
|
|
player3 = self.getTemporaryAccount(client)
|
|
|
|
# This creates a asset by using another app... you can also just creat the asset from the client sdk like we do in the typescript test
|
|
self.testid = self.createTestApp(client, player2)
|
|
|
|
print("Lets create a brand new non-wormhole asset and try to attest and send it out")
|
|
self.testasset = self.createTestAsset(client, player2)
|
|
|
|
print("test asset id: " + str(self.testasset))
|
|
|
|
print("Lets try to create an attest for a non-wormhole thing with a huge number of decimals")
|
|
# paul - attestFromAlgorand
|
|
sid = self.testAttest(client, player2, self.testasset)
|
|
print("... track down the generated VAA")
|
|
vaa = self.getVAA(client, player, sid, self.tokenid)
|
|
v = self.parseVAA(bytes.fromhex(vaa))
|
|
print("We got a " + v["Meta"])
|
|
|
|
if __name__ == "__main__":
|
|
core = AlgoTest()
|
|
core.simple_test()
|