# python3 -m pip install pycryptodomex uvarint pyteal web3 coincurve import sys sys.path.append("..") from admin import PortalCore, Account from gentest import GenTest from base64 import b64decode from typing import List, Tuple, Dict, Any, Optional, Union import base64 import random import time import hashlib import uuid import json from algosdk.v2client.algod import AlgodClient from algosdk.kmd import KMDClient from algosdk import account, mnemonic from algosdk.encoding import decode_address, encode_address from algosdk.future import transaction from pyteal import compileTeal, Mode, Expr from pyteal import * from algosdk.logic import get_application_address from vaa_verify import get_vaa_verify from algosdk.future.transaction import LogicSig from test_contract import get_test_app from algosdk.v2client import indexer import pprint class AlgoTest(PortalCore): def __init__(self) -> None: super().__init__() def getBalances(self, client: AlgodClient, account: str) -> Dict[int, int]: balances: Dict[int, int] = dict() accountInfo = client.account_info(account) # set key 0 to Algo balance balances[0] = accountInfo["amount"] assets: List[Dict[str, Any]] = accountInfo.get("assets", []) for assetHolding in assets: assetID = assetHolding["asset-id"] amount = assetHolding["amount"] balances[assetID] = amount return balances def createTestApp( self, client: AlgodClient, sender: Account, ) -> int: approval, clear = get_test_app(client) globalSchema = transaction.StateSchema(num_uints=4, num_byte_slices=30) localSchema = transaction.StateSchema(num_uints=0, num_byte_slices=16) app_args = [] txn = transaction.ApplicationCreateTxn( sender=sender.getAddress(), on_complete=transaction.OnComplete.NoOpOC, approval_program=b64decode(approval["result"]), clear_program=b64decode(clear["result"]), global_schema=globalSchema, local_schema=localSchema, app_args=app_args, sp=client.suggested_params(), ) signedTxn = txn.sign(sender.getPrivateKey()) client.send_transaction(signedTxn) response = self.waitForTransaction(client, signedTxn.get_txid()) assert response.applicationIndex is not None and response.applicationIndex > 0 txn = transaction.PaymentTxn(sender = sender.getAddress(), sp = client.suggested_params(), receiver = get_application_address(response.applicationIndex), amt = 300000) signedTxn = txn.sign(sender.getPrivateKey()) client.send_transaction(signedTxn) return response.applicationIndex def parseSeqFromLog(self, txn): return int.from_bytes(b64decode(txn.innerTxns[0]["logs"][0]), "big") def getVAA(self, client, sender, sid, app): if sid == None: raise Exception("getVAA called with a sid of None") saddr = get_application_address(app) # SOOO, we send a nop txn through to push the block forward # one # This is ONLY needed on a local net... the indexer will sit # on the last block for 30 to 60 seconds... we don't want this # log in prod since it is wasteful of gas if (self.INDEXER_ROUND > 512): # until they fix it print("indexer is broken in local net... stop/clean/restart the sandbox") sys.exit(0) txns = [] txns.append( transaction.ApplicationCallTxn( sender=sender.getAddress(), index=self.tokenid, on_complete=transaction.OnComplete.NoOpOC, app_args=[b"nop"], sp=client.suggested_params(), ) ) self.sendTxn(client, sender, txns, False) while True: nexttoken = "" while True: response = self.myindexer.search_transactions( min_round=self.INDEXER_ROUND, note_prefix=self.NOTE_PREFIX, next_page=nexttoken) # pprint.pprint(response) for x in response["transactions"]: # pprint.pprint(x) for y in x["inner-txns"]: if y["application-transaction"]["application-id"] != self.coreid: continue if len(y["logs"]) == 0: continue args = y["application-transaction"]["application-args"] if len(args) < 2: continue if base64.b64decode(args[0]) != b'publishMessage': continue seq = int.from_bytes(base64.b64decode(y["logs"][0]), "big") if seq != sid: # print(str(seq) + " != " + str(sid)) continue if y["sender"] != saddr: continue; emitter = decode_address(y["sender"]) payload = base64.b64decode(args[1]) # pprint.pprint([seq, y["sender"], payload.hex()]) # sys.exit(0) return self.gt.genVaa(emitter, seq, payload) if 'next-token' in response: nexttoken = response['next-token'] else: self.INDEXER_ROUND = response['current-round'] + 1 break time.sleep(1) def publishMessage(self, client, sender, vaa, appid): aa = decode_address(get_application_address(appid)).hex() emitter_addr = self.optin(client, sender, self.coreid, 0, aa) txns = [] sp = client.suggested_params() a = transaction.ApplicationCallTxn( sender=sender.getAddress(), index=appid, on_complete=transaction.OnComplete.NoOpOC, app_args=[b"test1", vaa, self.coreid], foreign_apps = [self.coreid], accounts=[emitter_addr], sp=sp ) a.fee = a.fee * 2 txns.append(a) resp = self.sendTxn(client, sender, txns, True) self.INDEXER_ROUND = resp.confirmedRound return self.parseSeqFromLog(resp) def createTestAsset(self, client, sender): txns = [] a = transaction.PaymentTxn( sender = sender.getAddress(), sp = client.suggested_params(), receiver = get_application_address(self.testid), amt = 300000 ) txns.append(a) sp = client.suggested_params() a = transaction.ApplicationCallTxn( sender=sender.getAddress(), index=self.testid, on_complete=transaction.OnComplete.NoOpOC, app_args=[b"setup"], sp=sp ) a.fee = a.fee * 2 txns.append(a) transaction.assign_group_id(txns) grp = [] pk = sender.getPrivateKey() for t in txns: grp.append(t.sign(pk)) client.send_transactions(grp) resp = self.waitForTransaction(client, grp[-1].get_txid()) aid = int.from_bytes(resp.__dict__["logs"][0], "big") print("Opting " + sender.getAddress() + " into " + str(aid)) self.asset_optin(client, sender, aid, sender.getAddress()) txns = [] a = transaction.ApplicationCallTxn( sender=sender.getAddress(), index=self.testid, on_complete=transaction.OnComplete.NoOpOC, app_args=[b"mint"], foreign_assets = [aid], sp=sp ) a.fee = a.fee * 2 txns.append(a) resp = self.sendTxn(client, sender, txns, True) # self.INDEXER_ROUND = resp.confirmedRound return aid def getCreator(self, client, sender, asset_id): return client.asset_info(asset_id)["params"]["creator"] def testAttest(self, client, sender, asset_id): taddr = get_application_address(self.tokenid) aa = decode_address(taddr).hex() emitter_addr = self.optin(client, sender, self.coreid, 0, aa) creator = self.getCreator(client, sender, asset_id) c = client.account_info(creator) wormhole = c.get("auth-addr") == taddr if not wormhole: creator = self.optin(client, sender, self.tokenid, asset_id, b"native".hex()) txns = [] sp = client.suggested_params() a = transaction.ApplicationCallTxn( sender=sender.getAddress(), index=self.tokenid, on_complete=transaction.OnComplete.NoOpOC, app_args=[b"attestToken", asset_id], foreign_apps = [self.coreid], foreign_assets = [asset_id], accounts=[emitter_addr, creator, c["address"]], sp=sp ) a.fee = a.fee * 2 txns.append(a) resp = self.sendTxn(client, sender, txns, True) # Point us at the correct round self.INDEXER_ROUND = resp.confirmedRound # print(encode_address(resp.__dict__["logs"][0])) # print(encode_address(resp.__dict__["logs"][1])) return self.parseSeqFromLog(resp) def transferAsset(self, client, sender, asset_id, quantity, receiver, chain, fee, payload = None): taddr = get_application_address(self.tokenid) aa = decode_address(taddr).hex() emitter_addr = self.optin(client, sender, self.coreid, 0, aa) # asset_id 0 is ALGO if asset_id == 0: wormhole = False else: creator = self.getCreator(client, sender, asset_id) c = client.account_info(creator) wormhole = c.get("auth-addr") == taddr txns = [] if not wormhole: creator = self.optin(client, sender, self.tokenid, asset_id, b"native".hex()) print("non wormhole account " + creator) sp = client.suggested_params() if (asset_id != 0) and (not self.asset_optin_check(client, sender, asset_id, creator)): print("Looks like we need to optin") txns.append( transaction.PaymentTxn( sender=sender.getAddress(), receiver=creator, amt=100000, sp=sp ) ) # The tokenid app needs to do the optin since it has signature authority a = transaction.ApplicationCallTxn( sender=sender.getAddress(), index=self.tokenid, on_complete=transaction.OnComplete.NoOpOC, app_args=[b"optin", asset_id], foreign_assets = [asset_id], accounts=[creator], sp=sp ) a.fee = a.fee * 2 txns.append(a) self.sendTxn(client, sender, txns, False) txns = [] if asset_id == 0: print("asset_id == 0") txns.append(transaction.PaymentTxn( sender=sender.getAddress(), receiver=creator, amt=quantity, sp=sp, )) accounts=[emitter_addr, creator, creator] else: print("asset_id != 0") txns.append( transaction.AssetTransferTxn( sender = sender.getAddress(), sp = sp, receiver = creator, amt = quantity, index = asset_id )) accounts=[emitter_addr, creator, c["address"]] args = [b"sendTransfer", asset_id, quantity, decode_address(receiver), chain, fee] if None != payload: args.append(payload) # pprint.pprint(args) a = transaction.ApplicationCallTxn( sender=sender.getAddress(), index=self.tokenid, on_complete=transaction.OnComplete.NoOpOC, app_args=args, foreign_apps = [self.coreid], foreign_assets = [asset_id], accounts=accounts, sp=sp ) a.fee = a.fee * 2 txns.append(a) resp = self.sendTxn(client, sender, txns, True) self.INDEXER_ROUND = resp.confirmedRound # pprint.pprint(resp.__dict__) # print(encode_address(resp.__dict__["logs"][0])) # print(encode_address(resp.__dict__["logs"][1])) return self.parseSeqFromLog(resp) def asset_optin_check(self, client, sender, asset, receiver): if receiver not in self.asset_cache: self.asset_cache[receiver] = {} if asset in self.asset_cache[receiver]: return True ai = client.account_info(receiver) if "assets" in ai: for x in ai["assets"]: if x["asset-id"] == asset: self.asset_cache[receiver][asset] = True return True return False def asset_optin(self, client, sender, asset, receiver): if self.asset_optin_check(client, sender, asset, receiver): return pprint.pprint(["asset_optin", asset, receiver]) sp = client.suggested_params() optin_txn = transaction.AssetTransferTxn( sender = sender.getAddress(), sp = sp, receiver = receiver, amt = 0, index = asset ) transaction.assign_group_id([optin_txn]) signed_optin = optin_txn.sign(sender.getPrivateKey()) client.send_transactions([signed_optin]) resp = self.waitForTransaction(client, signed_optin.get_txid()) assert self.asset_optin_check(client, sender, asset, receiver), "The optin failed" print("woah! optin succeeded") def simple_test(self): # q = bytes.fromhex(gt.genAssetMeta(gt.guardianPrivKeys, 1, 1, 1, bytes.fromhex("4523c3F29447d1f32AEa95BEBD00383c4640F1b4"), 1, 8, b"USDC", b"CircleCoin")) # pprint.pprint(self.parseVAA(q)) # sys.exit(0) # vaa = self.parseVAA(bytes.fromhex("01000000011300c412b9e5b304bde8f8633a41568991ca56b7c11a925847f0059e95010ec5241b761719f12d3f4a79d1515e08152b2e8584cd1e8217dd7743c2bf863b78b2bf040001aebade2f601a4e9083585b1bb5f98d421f116e0393f525b95d51afbe69051587531771dc127a5e9d7b74662bb7ac378d44181522dc748b1b0cbfe1b1de6ed39d01024b4e9fc86ac64aaeef84ea14e4265c3c186042a3ae9ab2933bf06c0cbf326b3c2b89e7d9854fc5204a447bd202592a72d1d6db3d007bef9fea0e35953afbd9f1010342e4446ac94545a0447851eda5d5e3b8c97c6f4ef338977562cd4ecbee2b8fea42d536d7655c28a7f7fb2ff5fc8e5775e892d853c9b2e4969f9ce054ede801700104af0d783996ccfd31d6fc6f86b634288cd2f9cc29695cfcbf12d915c1b9c383dc792c7abbe8126cd917fb8658a8de843d64171122db182453584c0c330e8889730105f34d45ec63ec0a0c4535303fd9c83a0fad6b0a112b27306a288c1b46f2a78399754536ecb07f1ab6c32d92ed50b11fef3668b23d5c1ca010ec4c924441367eac0006566671ff859eec8429874ba9e07dd107b22859cf5029928bebec6eb73cdca6752f91bb252bca76cb15ede1121a84a9a54dad126f50f282a47f7d30880ef86a3900076d0d1241e1fc9d039810a8aebd7cab863017c9420eb67f4578577c5ec4d37162723dcd6213ff6895f280a88ba70de1a5b9257fe2937cbdea007e84886abc46dd0108b24dcddaae10f5e12b7085a0c3885a050640af17ba265a448102401854183e9f3ae9a14cad1af64eb57c6f145c6f709d7ed6bb8712a6b315dc2780c9eb42812e0109df696bf506dfcd8fce57968a84d5f773706b117fad31f86bbb089ede77d71a6e54b7729f79a82e7d6e4a6797380796fbcb9ba9428e8fcdf0400515f8205b31c5010a90a03c76fdec510712b2a6ee52cc0b6df5c921437896756f34b3782aa486eb5b5d02df783664257539233502ec25bbda7dd754afc139823da8a43c0d3c91c279000b33549edd8353c4d577cb273b88b545ae547ad01e85161a4fbbbb371cff453d6311c787254e2852c3b874ea60c67d40efc3ee3f24b51bc3fe95cc0a873e8a3fb6000ce2e206214ae2b4b048857f061ed3cf8cef060c67a85ad863f266145238c5d2a85e38b4eb9b3be4d33f502df4c45762504eb43a6bf78f01363d1399b67c354df8000d2d362d64a2e3d1583e1299238829cc11d81e9b9820121c0a2eb91d542aa54c993861e8225bc3e8d028dc128d284118703a4ec69144d69402efd72a29bb9f6b8f000e6bf56fa3ae6303f495f1379b450eb52580d7d9098dd909762e6186d19e06480d2bba8f06602dbd6d3d5deac7080fc2e61bd1be97e442b63435c91fa72b33534c000fad870b47c86f6997286bd4def4bacc5a8abbfef3f730f62183c638131004ea2f706ab73ebfe8f4879bf54f580444acec212e96e41abaf4acfc3383f05478e528001089599974feaab33862cd881af13f1645079bd2fa2ff07ca744674c8556aaf97c5c9c90df332d5b4ad1428776b68612f0b1ecb98c2ebc83f44f42426f180062cd00116aa93eecb4d528afaa07b72484acd5b79ad20e9ad8e55ce37cb9138b4c12a8eb3d10fa7d932b06ac441905e0226d3420101971a72c5488e4bfef222de8c3acd1011203a3e3d8ec938ffbc3a27d8caf50fc925bd25bd286d5ad6077dffd7e205ce0806e166b661d502f8c49acf88d42fde20e6015830d5517a0bfd40f79963ded4d2d006227697a000f68690008a0ae83030f1423aa97121527f65bbbb97925b43b95231bb0478fd650a057cc4b00000000000000072003000000000000000000000000000000000000000000000000000000000007a12000000000000000000000000000000000000000000000000000000000000000000008cf9ee7255420a50c55ef35d4bdcdd8048dee5c3c1333ecd97aff98869ea280780008000000000000000000000000000000000000000000000000000000000007a1206869206d6f6d")) # pprint.pprint(vaa) # sys.exit(0) gt = GenTest(False) self.gt = gt client = self.getAlgodClient() print("Generating the foundation account...") foundation = self.getTemporaryAccount(client) player = self.getTemporaryAccount(client) player2 = self.getTemporaryAccount(client) player3 = self.getTemporaryAccount(client) self.coreid = 4 print("coreid = " + str(self.coreid)) self.tokenid = 6 print("token bridge " + str(self.tokenid) + " address " + get_application_address(self.tokenid)) self.testid = self.createTestApp(client, player2) print("testid " + str(self.testid) + " address " + get_application_address(self.testid)) print("Lets create a brand new non-wormhole asset and try to attest and send it out") self.testasset = self.createTestAsset(client, player2) print("test asset id: " + str(self.testasset)) print("Lets try to create an attest for a non-wormhole thing with a huge number of decimals") # paul - attestFromAlgorand sid = self.testAttest(client, player2, self.testasset) print("... track down the generated VAA") vaa = self.getVAA(client, player, sid, self.tokenid) v = self.parseVAA(bytes.fromhex(vaa)) print("We got a " + v["Meta"]) # pprint.pprint(self.getBalances(client, player.getAddress())) # pprint.pprint(self.getBalances(client, player2.getAddress())) # pprint.pprint(self.getBalances(client, player3.getAddress())) # # print("Lets transfer that asset to one of our other accounts... first lets create the vaa") # # paul - transferFromAlgorand # sid = self.transferAsset(client, player2, self.testasset, 100, player3.getAddress(), 8, 0) # print("... track down the generated VAA") # vaa = self.getVAA(client, player, sid, self.tokenid) # print(".. and lets pass that to player3") # self.submitVAA(bytes.fromhex(vaa), client, player3) # # pprint.pprint(self.getBalances(client, player.getAddress())) # pprint.pprint(self.getBalances(client, player2.getAddress())) # pprint.pprint(self.getBalances(client, player3.getAddress())) # # # Lets split it into two parts... the payload and the fee # print("Lets split it into two parts... the payload and the fee") # sid = self.transferAsset(client, player2, self.testasset, 1000, player3.getAddress(), 8, 500) # print("... track down the generated VAA") # vaa = self.getVAA(client, player, sid, self.tokenid) # print(".. and lets pass that to player3 with fees being passed to player acting as a relayer") # self.submitVAA(bytes.fromhex(vaa), client, player) # # pprint.pprint(self.getBalances(client, player.getAddress())) # pprint.pprint(self.getBalances(client, player2.getAddress())) # pprint.pprint(self.getBalances(client, player3.getAddress())) # # # Now it gets tricky, lets create a virgin account... # pk, addr = account.generate_account() # emptyAccount = Account(pk) # # print("How much is in the empty account? (" + addr + ")") # pprint.pprint(self.getBalances(client, emptyAccount.getAddress())) # # # paul - transferFromAlgorand # print("Lets transfer algo this time.... first lets create the vaa") # sid = self.transferAsset(client, player2, 0, 1000000, emptyAccount.getAddress(), 8, 0) # print("... track down the generated VAA") # vaa = self.getVAA(client, player, sid, self.tokenid) ## pprint.pprint(vaa) # print(".. and lets pass that to the empty account.. but use somebody else to relay since we cannot pay for it") # # # paul - redeemOnAlgorand # self.submitVAA(bytes.fromhex(vaa), client, player) # # print("=================================================") # # print("How much is in the source account now?") # pprint.pprint(self.getBalances(client, player2.getAddress())) # # print("How much is in the empty account now?") # pprint.pprint(self.getBalances(client, emptyAccount.getAddress())) # # print("How much is in the player3 account now?") # pprint.pprint(self.getBalances(client, player3.getAddress())) # # print("Lets transfer more algo.. splut 50/50 with the relayer.. going to player3") # sid = self.transferAsset(client, player2, 0, 1000000, player3.getAddress(), 8, 500000) # print("... track down the generated VAA") # vaa = self.getVAA(client, player, sid, self.tokenid) # print(".. and lets pass that to player3.. but use the previously empty account to relay it") # self.submitVAA(bytes.fromhex(vaa), client, emptyAccount) # # print("How much is in the source account now?") # pprint.pprint(self.getBalances(client, player2.getAddress())) # # print("How much is in the empty account now?") # pprint.pprint(self.getBalances(client, emptyAccount.getAddress())) # # print("How much is in the player3 account now?") # pprint.pprint(self.getBalances(client, player3.getAddress())) # # print("How about a payload3") # sid = self.transferAsset(client, player2, 0, 100, player3.getAddress(), 8, 0, b'hi mom') # print("... track down the generated VAA") # vaa = self.getVAA(client, player, sid, self.tokenid) # # print(".. and lets pass that to the wrong account") # try: # self.submitVAA(bytes.fromhex(vaa), client, emptyAccount) # except: # print("Exception thrown... nice") # # print(".. and lets pass that to the right account") # self.submitVAA(bytes.fromhex(vaa), client, player3) # print("player account: " + player.getAddress()) # pprint.pprint(client.account_info(player.getAddress())) # print("player2 account: " + player2.getAddress()) # pprint.pprint(client.account_info(player2.getAddress())) # print("foundation account: " + foundation.getAddress()) # pprint.pprint(client.account_info(foundation.getAddress())) # # print("core app: " + get_application_address(self.coreid)) # pprint.pprint(client.account_info(get_application_address(self.coreid))), # # print("token app: " + get_application_address(self.tokenid)) # pprint.pprint(client.account_info(get_application_address(self.tokenid))), # # print("asset app: " + chain_addr) # pprint.pprint(client.account_info(chain_addr)) core = AlgoTest() core.simple_test()