wormhole/algorand/test/simple.py

579 lines
23 KiB
Python

# python3 -m pip install pycryptodomex uvarint pyteal web3 coincurve
import sys
sys.path.append("..")
from admin import PortalCore, Account
from gentest import GenTest
from base64 import b64decode
from typing import List, Tuple, Dict, Any, Optional, Union
import base64
import random
import time
import hashlib
import uuid
import json
from algosdk.v2client.algod import AlgodClient
from algosdk.kmd import KMDClient
from algosdk import account, mnemonic
from algosdk.encoding import decode_address, encode_address
from algosdk.future import transaction
from pyteal import compileTeal, Mode, Expr
from pyteal import *
from algosdk.logic import get_application_address
from vaa_verify import get_vaa_verify
from algosdk.future.transaction import LogicSig
from test_contract import get_test_app
from algosdk.v2client import indexer
import pprint
class AlgoTest(PortalCore):
def __init__(self) -> None:
super().__init__()
def getBalances(self, client: AlgodClient, account: str) -> Dict[int, int]:
balances: Dict[int, int] = dict()
accountInfo = client.account_info(account)
# set key 0 to Algo balance
balances[0] = accountInfo["amount"]
assets: List[Dict[str, Any]] = accountInfo.get("assets", [])
for assetHolding in assets:
assetID = assetHolding["asset-id"]
amount = assetHolding["amount"]
balances[assetID] = amount
return balances
def createTestApp(
self,
client: AlgodClient,
sender: Account,
) -> int:
approval, clear = get_test_app(client)
globalSchema = transaction.StateSchema(num_uints=4, num_byte_slices=30)
localSchema = transaction.StateSchema(num_uints=0, num_byte_slices=16)
app_args = []
txn = transaction.ApplicationCreateTxn(
sender=sender.getAddress(),
on_complete=transaction.OnComplete.NoOpOC,
approval_program=b64decode(approval["result"]),
clear_program=b64decode(clear["result"]),
global_schema=globalSchema,
local_schema=localSchema,
app_args=app_args,
sp=client.suggested_params(),
)
signedTxn = txn.sign(sender.getPrivateKey())
client.send_transaction(signedTxn)
response = self.waitForTransaction(client, signedTxn.get_txid())
assert response.applicationIndex is not None and response.applicationIndex > 0
txn = transaction.PaymentTxn(sender = sender.getAddress(), sp = client.suggested_params(),
receiver = get_application_address(response.applicationIndex), amt = 300000)
signedTxn = txn.sign(sender.getPrivateKey())
client.send_transaction(signedTxn)
return response.applicationIndex
def parseSeqFromLog(self, txn):
return int.from_bytes(b64decode(txn.innerTxns[0]["logs"][0]), "big")
def getVAA(self, client, sender, sid, app):
if sid == None:
raise Exception("getVAA called with a sid of None")
saddr = get_application_address(app)
# SOOO, we send a nop txn through to push the block forward
# one
# This is ONLY needed on a local net... the indexer will sit
# on the last block for 30 to 60 seconds... we don't want this
# log in prod since it is wasteful of gas
if (self.INDEXER_ROUND > 512): # until they fix it
print("indexer is broken in local net... stop/clean/restart the sandbox")
sys.exit(0)
txns = []
txns.append(
transaction.ApplicationCallTxn(
sender=sender.getAddress(),
index=self.tokenid,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"nop"],
sp=client.suggested_params(),
)
)
self.sendTxn(client, sender, txns, False)
while True:
nexttoken = ""
while True:
response = self.myindexer.search_transactions( min_round=self.INDEXER_ROUND, note_prefix=self.NOTE_PREFIX, next_page=nexttoken)
# pprint.pprint(response)
for x in response["transactions"]:
# pprint.pprint(x)
for y in x["inner-txns"]:
if y["application-transaction"]["application-id"] != self.coreid:
continue
if len(y["logs"]) == 0:
continue
args = y["application-transaction"]["application-args"]
if len(args) < 2:
continue
if base64.b64decode(args[0]) != b'publishMessage':
continue
seq = int.from_bytes(base64.b64decode(y["logs"][0]), "big")
if seq != sid:
# print(str(seq) + " != " + str(sid))
continue
if y["sender"] != saddr:
continue;
emitter = decode_address(y["sender"])
payload = base64.b64decode(args[1])
# pprint.pprint([seq, y["sender"], payload.hex()])
# sys.exit(0)
return self.gt.genVaa(emitter, seq, payload)
if 'next-token' in response:
nexttoken = response['next-token']
else:
self.INDEXER_ROUND = response['current-round'] + 1
break
time.sleep(1)
def publishMessage(self, client, sender, vaa, appid):
aa = decode_address(get_application_address(appid)).hex()
emitter_addr = self.optin(client, sender, self.coreid, 0, aa)
txns = []
sp = client.suggested_params()
a = transaction.ApplicationCallTxn(
sender=sender.getAddress(),
index=appid,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"test1", vaa, self.coreid],
foreign_apps = [self.coreid],
accounts=[emitter_addr],
sp=sp
)
a.fee = a.fee * 2
txns.append(a)
resp = self.sendTxn(client, sender, txns, True)
self.INDEXER_ROUND = resp.confirmedRound
return self.parseSeqFromLog(resp)
def createTestAsset(self, client, sender):
txns = []
a = transaction.PaymentTxn(
sender = sender.getAddress(),
sp = client.suggested_params(),
receiver = get_application_address(self.testid),
amt = 300000
)
txns.append(a)
sp = client.suggested_params()
a = transaction.ApplicationCallTxn(
sender=sender.getAddress(),
index=self.testid,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"setup"],
sp=sp
)
a.fee = a.fee * 2
txns.append(a)
transaction.assign_group_id(txns)
grp = []
pk = sender.getPrivateKey()
for t in txns:
grp.append(t.sign(pk))
client.send_transactions(grp)
resp = self.waitForTransaction(client, grp[-1].get_txid())
aid = int.from_bytes(resp.__dict__["logs"][0], "big")
print("Opting " + sender.getAddress() + " into " + str(aid))
self.asset_optin(client, sender, aid, sender.getAddress())
txns = []
a = transaction.ApplicationCallTxn(
sender=sender.getAddress(),
index=self.testid,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"mint"],
foreign_assets = [aid],
sp=sp
)
a.fee = a.fee * 2
txns.append(a)
resp = self.sendTxn(client, sender, txns, True)
# self.INDEXER_ROUND = resp.confirmedRound
return aid
def getCreator(self, client, sender, asset_id):
return client.asset_info(asset_id)["params"]["creator"]
def testAttest(self, client, sender, asset_id):
taddr = get_application_address(self.tokenid)
aa = decode_address(taddr).hex()
emitter_addr = self.optin(client, sender, self.coreid, 0, aa)
creator = self.getCreator(client, sender, asset_id)
c = client.account_info(creator)
wormhole = c.get("auth-addr") == taddr
if not wormhole:
creator = self.optin(client, sender, self.tokenid, asset_id, b"native".hex())
txns = []
sp = client.suggested_params()
a = transaction.ApplicationCallTxn(
sender=sender.getAddress(),
index=self.tokenid,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"attestToken", asset_id],
foreign_apps = [self.coreid],
foreign_assets = [asset_id],
accounts=[emitter_addr, creator, c["address"]],
sp=sp
)
a.fee = a.fee * 2
txns.append(a)
resp = self.sendTxn(client, sender, txns, True)
# Point us at the correct round
self.INDEXER_ROUND = resp.confirmedRound
# print(encode_address(resp.__dict__["logs"][0]))
# print(encode_address(resp.__dict__["logs"][1]))
return self.parseSeqFromLog(resp)
def transferAsset(self, client, sender, asset_id, quantity, receiver, chain, fee, payload = None):
taddr = get_application_address(self.tokenid)
aa = decode_address(taddr).hex()
emitter_addr = self.optin(client, sender, self.coreid, 0, aa)
# asset_id 0 is ALGO
if asset_id == 0:
wormhole = False
else:
creator = self.getCreator(client, sender, asset_id)
c = client.account_info(creator)
wormhole = c.get("auth-addr") == taddr
txns = []
if not wormhole:
creator = self.optin(client, sender, self.tokenid, asset_id, b"native".hex())
print("non wormhole account " + creator)
sp = client.suggested_params()
if (asset_id != 0) and (not self.asset_optin_check(client, sender, asset_id, creator)):
print("Looks like we need to optin")
txns.append(
transaction.PaymentTxn(
sender=sender.getAddress(),
receiver=creator,
amt=100000,
sp=sp
)
)
# The tokenid app needs to do the optin since it has signature authority
a = transaction.ApplicationCallTxn(
sender=sender.getAddress(),
index=self.tokenid,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"optin", asset_id],
foreign_assets = [asset_id],
accounts=[creator],
sp=sp
)
a.fee = a.fee * 2
txns.append(a)
self.sendTxn(client, sender, txns, False)
txns = []
if asset_id == 0:
print("asset_id == 0")
txns.append(transaction.PaymentTxn(
sender=sender.getAddress(),
receiver=creator,
amt=quantity,
sp=sp,
))
accounts=[emitter_addr, creator, creator]
else:
print("asset_id != 0")
txns.append(
transaction.AssetTransferTxn(
sender = sender.getAddress(),
sp = sp,
receiver = creator,
amt = quantity,
index = asset_id
))
accounts=[emitter_addr, creator, c["address"]]
args = [b"sendTransfer", asset_id, quantity, decode_address(receiver), chain, fee]
if None != payload:
args.append(payload)
# pprint.pprint(args)
a = transaction.ApplicationCallTxn(
sender=sender.getAddress(),
index=self.tokenid,
on_complete=transaction.OnComplete.NoOpOC,
app_args=args,
foreign_apps = [self.coreid],
foreign_assets = [asset_id],
accounts=accounts,
sp=sp
)
a.fee = a.fee * 2
txns.append(a)
resp = self.sendTxn(client, sender, txns, True)
self.INDEXER_ROUND = resp.confirmedRound
# pprint.pprint(resp.__dict__)
# print(encode_address(resp.__dict__["logs"][0]))
# print(encode_address(resp.__dict__["logs"][1]))
return self.parseSeqFromLog(resp)
def asset_optin_check(self, client, sender, asset, receiver):
if receiver not in self.asset_cache:
self.asset_cache[receiver] = {}
if asset in self.asset_cache[receiver]:
return True
ai = client.account_info(receiver)
if "assets" in ai:
for x in ai["assets"]:
if x["asset-id"] == asset:
self.asset_cache[receiver][asset] = True
return True
return False
def asset_optin(self, client, sender, asset, receiver):
if self.asset_optin_check(client, sender, asset, receiver):
return
pprint.pprint(["asset_optin", asset, receiver])
sp = client.suggested_params()
optin_txn = transaction.AssetTransferTxn(
sender = sender.getAddress(),
sp = sp,
receiver = receiver,
amt = 0,
index = asset
)
transaction.assign_group_id([optin_txn])
signed_optin = optin_txn.sign(sender.getPrivateKey())
client.send_transactions([signed_optin])
resp = self.waitForTransaction(client, signed_optin.get_txid())
assert self.asset_optin_check(client, sender, asset, receiver), "The optin failed"
print("woah! optin succeeded")
def simple_test(self):
# q = bytes.fromhex(gt.genAssetMeta(gt.guardianPrivKeys, 1, 1, 1, bytes.fromhex("4523c3F29447d1f32AEa95BEBD00383c4640F1b4"), 1, 8, b"USDC", b"CircleCoin"))
# pprint.pprint(self.parseVAA(q))
# sys.exit(0)
# vaa = self.parseVAA(bytes.fromhex("01000000011300c412b9e5b304bde8f8633a41568991ca56b7c11a925847f0059e95010ec5241b761719f12d3f4a79d1515e08152b2e8584cd1e8217dd7743c2bf863b78b2bf040001aebade2f601a4e9083585b1bb5f98d421f116e0393f525b95d51afbe69051587531771dc127a5e9d7b74662bb7ac378d44181522dc748b1b0cbfe1b1de6ed39d01024b4e9fc86ac64aaeef84ea14e4265c3c186042a3ae9ab2933bf06c0cbf326b3c2b89e7d9854fc5204a447bd202592a72d1d6db3d007bef9fea0e35953afbd9f1010342e4446ac94545a0447851eda5d5e3b8c97c6f4ef338977562cd4ecbee2b8fea42d536d7655c28a7f7fb2ff5fc8e5775e892d853c9b2e4969f9ce054ede801700104af0d783996ccfd31d6fc6f86b634288cd2f9cc29695cfcbf12d915c1b9c383dc792c7abbe8126cd917fb8658a8de843d64171122db182453584c0c330e8889730105f34d45ec63ec0a0c4535303fd9c83a0fad6b0a112b27306a288c1b46f2a78399754536ecb07f1ab6c32d92ed50b11fef3668b23d5c1ca010ec4c924441367eac0006566671ff859eec8429874ba9e07dd107b22859cf5029928bebec6eb73cdca6752f91bb252bca76cb15ede1121a84a9a54dad126f50f282a47f7d30880ef86a3900076d0d1241e1fc9d039810a8aebd7cab863017c9420eb67f4578577c5ec4d37162723dcd6213ff6895f280a88ba70de1a5b9257fe2937cbdea007e84886abc46dd0108b24dcddaae10f5e12b7085a0c3885a050640af17ba265a448102401854183e9f3ae9a14cad1af64eb57c6f145c6f709d7ed6bb8712a6b315dc2780c9eb42812e0109df696bf506dfcd8fce57968a84d5f773706b117fad31f86bbb089ede77d71a6e54b7729f79a82e7d6e4a6797380796fbcb9ba9428e8fcdf0400515f8205b31c5010a90a03c76fdec510712b2a6ee52cc0b6df5c921437896756f34b3782aa486eb5b5d02df783664257539233502ec25bbda7dd754afc139823da8a43c0d3c91c279000b33549edd8353c4d577cb273b88b545ae547ad01e85161a4fbbbb371cff453d6311c787254e2852c3b874ea60c67d40efc3ee3f24b51bc3fe95cc0a873e8a3fb6000ce2e206214ae2b4b048857f061ed3cf8cef060c67a85ad863f266145238c5d2a85e38b4eb9b3be4d33f502df4c45762504eb43a6bf78f01363d1399b67c354df8000d2d362d64a2e3d1583e1299238829cc11d81e9b9820121c0a2eb91d542aa54c993861e8225bc3e8d028dc128d284118703a4ec69144d69402efd72a29bb9f6b8f000e6bf56fa3ae6303f495f1379b450eb52580d7d9098dd909762e6186d19e06480d2bba8f06602dbd6d3d5deac7080fc2e61bd1be97e442b63435c91fa72b33534c000fad870b47c86f6997286bd4def4bacc5a8abbfef3f730f62183c638131004ea2f706ab73ebfe8f4879bf54f580444acec212e96e41abaf4acfc3383f05478e528001089599974feaab33862cd881af13f1645079bd2fa2ff07ca744674c8556aaf97c5c9c90df332d5b4ad1428776b68612f0b1ecb98c2ebc83f44f42426f180062cd00116aa93eecb4d528afaa07b72484acd5b79ad20e9ad8e55ce37cb9138b4c12a8eb3d10fa7d932b06ac441905e0226d3420101971a72c5488e4bfef222de8c3acd1011203a3e3d8ec938ffbc3a27d8caf50fc925bd25bd286d5ad6077dffd7e205ce0806e166b661d502f8c49acf88d42fde20e6015830d5517a0bfd40f79963ded4d2d006227697a000f68690008a0ae83030f1423aa97121527f65bbbb97925b43b95231bb0478fd650a057cc4b00000000000000072003000000000000000000000000000000000000000000000000000000000007a12000000000000000000000000000000000000000000000000000000000000000000008cf9ee7255420a50c55ef35d4bdcdd8048dee5c3c1333ecd97aff98869ea280780008000000000000000000000000000000000000000000000000000000000007a1206869206d6f6d"))
# pprint.pprint(vaa)
# sys.exit(0)
gt = GenTest(False)
self.gt = gt
client = self.getAlgodClient()
print("Generating the foundation account...")
foundation = self.getTemporaryAccount(client)
player = self.getTemporaryAccount(client)
player2 = self.getTemporaryAccount(client)
player3 = self.getTemporaryAccount(client)
self.coreid = 4
print("coreid = " + str(self.coreid))
self.tokenid = 6
print("token bridge " + str(self.tokenid) + " address " + get_application_address(self.tokenid))
self.testid = self.createTestApp(client, player2)
print("testid " + str(self.testid) + " address " + get_application_address(self.testid))
print("Lets create a brand new non-wormhole asset and try to attest and send it out")
self.testasset = self.createTestAsset(client, player2)
print("test asset id: " + str(self.testasset))
print("Lets try to create an attest for a non-wormhole thing with a huge number of decimals")
# paul - attestFromAlgorand
sid = self.testAttest(client, player2, self.testasset)
print("... track down the generated VAA")
vaa = self.getVAA(client, player, sid, self.tokenid)
v = self.parseVAA(bytes.fromhex(vaa))
print("We got a " + v["Meta"])
# pprint.pprint(self.getBalances(client, player.getAddress()))
# pprint.pprint(self.getBalances(client, player2.getAddress()))
# pprint.pprint(self.getBalances(client, player3.getAddress()))
#
# print("Lets transfer that asset to one of our other accounts... first lets create the vaa")
# # paul - transferFromAlgorand
# sid = self.transferAsset(client, player2, self.testasset, 100, player3.getAddress(), 8, 0)
# print("... track down the generated VAA")
# vaa = self.getVAA(client, player, sid, self.tokenid)
# print(".. and lets pass that to player3")
# self.submitVAA(bytes.fromhex(vaa), client, player3)
#
# pprint.pprint(self.getBalances(client, player.getAddress()))
# pprint.pprint(self.getBalances(client, player2.getAddress()))
# pprint.pprint(self.getBalances(client, player3.getAddress()))
#
# # Lets split it into two parts... the payload and the fee
# print("Lets split it into two parts... the payload and the fee")
# sid = self.transferAsset(client, player2, self.testasset, 1000, player3.getAddress(), 8, 500)
# print("... track down the generated VAA")
# vaa = self.getVAA(client, player, sid, self.tokenid)
# print(".. and lets pass that to player3 with fees being passed to player acting as a relayer")
# self.submitVAA(bytes.fromhex(vaa), client, player)
#
# pprint.pprint(self.getBalances(client, player.getAddress()))
# pprint.pprint(self.getBalances(client, player2.getAddress()))
# pprint.pprint(self.getBalances(client, player3.getAddress()))
#
# # Now it gets tricky, lets create a virgin account...
# pk, addr = account.generate_account()
# emptyAccount = Account(pk)
#
# print("How much is in the empty account? (" + addr + ")")
# pprint.pprint(self.getBalances(client, emptyAccount.getAddress()))
#
# # paul - transferFromAlgorand
# print("Lets transfer algo this time.... first lets create the vaa")
# sid = self.transferAsset(client, player2, 0, 1000000, emptyAccount.getAddress(), 8, 0)
# print("... track down the generated VAA")
# vaa = self.getVAA(client, player, sid, self.tokenid)
## pprint.pprint(vaa)
# print(".. and lets pass that to the empty account.. but use somebody else to relay since we cannot pay for it")
#
# # paul - redeemOnAlgorand
# self.submitVAA(bytes.fromhex(vaa), client, player)
#
# print("=================================================")
#
# print("How much is in the source account now?")
# pprint.pprint(self.getBalances(client, player2.getAddress()))
#
# print("How much is in the empty account now?")
# pprint.pprint(self.getBalances(client, emptyAccount.getAddress()))
#
# print("How much is in the player3 account now?")
# pprint.pprint(self.getBalances(client, player3.getAddress()))
#
# print("Lets transfer more algo.. splut 50/50 with the relayer.. going to player3")
# sid = self.transferAsset(client, player2, 0, 1000000, player3.getAddress(), 8, 500000)
# print("... track down the generated VAA")
# vaa = self.getVAA(client, player, sid, self.tokenid)
# print(".. and lets pass that to player3.. but use the previously empty account to relay it")
# self.submitVAA(bytes.fromhex(vaa), client, emptyAccount)
#
# print("How much is in the source account now?")
# pprint.pprint(self.getBalances(client, player2.getAddress()))
#
# print("How much is in the empty account now?")
# pprint.pprint(self.getBalances(client, emptyAccount.getAddress()))
#
# print("How much is in the player3 account now?")
# pprint.pprint(self.getBalances(client, player3.getAddress()))
#
# print("How about a payload3")
# sid = self.transferAsset(client, player2, 0, 100, player3.getAddress(), 8, 0, b'hi mom')
# print("... track down the generated VAA")
# vaa = self.getVAA(client, player, sid, self.tokenid)
#
# print(".. and lets pass that to the wrong account")
# try:
# self.submitVAA(bytes.fromhex(vaa), client, emptyAccount)
# except:
# print("Exception thrown... nice")
#
# print(".. and lets pass that to the right account")
# self.submitVAA(bytes.fromhex(vaa), client, player3)
# print("player account: " + player.getAddress())
# pprint.pprint(client.account_info(player.getAddress()))
# print("player2 account: " + player2.getAddress())
# pprint.pprint(client.account_info(player2.getAddress()))
# print("foundation account: " + foundation.getAddress())
# pprint.pprint(client.account_info(foundation.getAddress()))
#
# print("core app: " + get_application_address(self.coreid))
# pprint.pprint(client.account_info(get_application_address(self.coreid))),
#
# print("token app: " + get_application_address(self.tokenid))
# pprint.pprint(client.account_info(get_application_address(self.tokenid))),
#
# print("asset app: " + chain_addr)
# pprint.pprint(client.account_info(chain_addr))
core = AlgoTest()
core.simple_test()