308 lines
11 KiB
Python
308 lines
11 KiB
Python
from Cryptodome.Hash import keccak
|
|
import pytest
|
|
import base64
|
|
import random
|
|
from wormhole_core import getCoreContracts
|
|
from algosdk.future import transaction
|
|
from algosdk.encoding import decode_address
|
|
from algosdk.logic import get_application_address
|
|
from algosdk.error import AlgodHTTPError
|
|
from admin import max_bits
|
|
|
|
CORE_NAME = "core"
|
|
CLEAR_NAME = "clear"
|
|
SEED_AMOUNT = 1000000
|
|
DEV_MODE = True
|
|
|
|
def pytest_namespace():
|
|
return {'core_id': 0}
|
|
|
|
@pytest.fixture(scope='function')
|
|
def core_id():
|
|
# Value is set after contract creation
|
|
return pytest.core_id
|
|
|
|
@pytest.fixture(scope='function')
|
|
def boot_vaa(gen_test, portal_core, client, core_id):
|
|
seq = int(random.random() * (2**31))
|
|
portal_core.client = client
|
|
portal_core.coreid = core_id
|
|
return bytes.fromhex(gen_test.genGuardianSetUpgrade(gen_test.guardianPrivKeys, portal_core.getGovSet(), portal_core.getGovSet(), seq, seq))
|
|
|
|
@pytest.fixture(scope='function')
|
|
def core_tmpl_lsig(portal_core, boot_vaa, core_id):
|
|
parsed_vaa = portal_core.parseVAA(boot_vaa)
|
|
core_address = get_application_address(core_id)
|
|
tsig = portal_core.tsig
|
|
|
|
return tsig.populate(
|
|
{
|
|
"TMPL_APP_ID": core_id,
|
|
"TMPL_APP_ADDRESS": decode_address(core_address).hex(),
|
|
"TMPL_ADDR_IDX": int(parsed_vaa['sequence'] / max_bits),
|
|
"TMPL_EMITTER_ID": parsed_vaa['chainRaw'].hex() + parsed_vaa['emitter'].hex(),
|
|
}
|
|
)
|
|
|
|
def tests_contract_creates_succesfully(gen_test, client, creator, portal_core):
|
|
approval_program, clear_program = getCoreContracts(gen_test, CORE_NAME, CLEAR_NAME, client, SEED_AMOUNT, portal_core.tsig, DEV_MODE)
|
|
globalSchema = transaction.StateSchema(num_uints=8, num_byte_slices=40)
|
|
localSchema = transaction.StateSchema(num_uints=0, num_byte_slices=16)
|
|
app_args = []
|
|
|
|
txn = transaction.ApplicationCreateTxn(
|
|
sender=creator.getAddress(),
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
approval_program=base64.b64decode(approval_program["result"]),
|
|
clear_program=base64.b64decode(clear_program["result"]),
|
|
global_schema=globalSchema,
|
|
local_schema=localSchema,
|
|
extra_pages = 1,
|
|
app_args=app_args,
|
|
sp=client.suggested_params(),
|
|
)
|
|
|
|
signedTxn = txn.sign(creator.getPrivateKey())
|
|
|
|
client.send_transaction(signedTxn)
|
|
|
|
response = portal_core.waitForTransaction(client, signedTxn.get_txid())
|
|
assert response.applicationIndex is not None and response.applicationIndex > 0
|
|
pytest.core_id = response.applicationIndex
|
|
|
|
def tests_allow_opt_in(client, core_tmpl_lsig, creator, portal_core, suggested_params, core_id):
|
|
core_address = get_application_address(core_id)
|
|
|
|
seed_payment = transaction.PaymentTxn(
|
|
sender=creator.getAddress(),
|
|
receiver=core_tmpl_lsig.address(),
|
|
amt=SEED_AMOUNT,
|
|
sp=suggested_params,
|
|
)
|
|
|
|
seed_payment.fee = 2 * seed_payment.fee
|
|
|
|
optin = transaction.ApplicationOptInTxn(
|
|
sender=core_tmpl_lsig.address(),
|
|
sp=suggested_params,
|
|
index=core_id,
|
|
rekey_to=core_address
|
|
)
|
|
optin.fee = 0
|
|
|
|
transaction.assign_group_id([seed_payment, optin])
|
|
signed_seed = seed_payment.sign(creator.getPrivateKey())
|
|
signed_optin = transaction.LogicSigTransaction(optin, core_tmpl_lsig)
|
|
|
|
client.send_transactions([signed_seed, signed_optin])
|
|
portal_core.waitForTransaction(client, signed_optin.get_txid())
|
|
|
|
def tests_allow_init(client, creator, portal_core, suggested_params, vaa_verify_lsig, boot_vaa, core_id):
|
|
core_address = get_application_address(core_id)
|
|
|
|
parsed_vaa = portal_core.parseVAA(boot_vaa)
|
|
portal_core.seed_amt = SEED_AMOUNT
|
|
seq_addr = portal_core.optin(client, creator, core_id, int(parsed_vaa["sequence"] / max_bits), parsed_vaa["chainRaw"].hex() + parsed_vaa["emitter"].hex())
|
|
guardian_addr = portal_core.optin(client, creator, core_id, parsed_vaa["index"], b"guardian".hex())
|
|
newguardian_addr = portal_core.optin(client, creator, core_id, parsed_vaa["NewGuardianSetIndex"], b"guardian".hex())
|
|
|
|
|
|
|
|
txns = [
|
|
transaction.ApplicationCallTxn(
|
|
sender=creator.getAddress(),
|
|
index=core_id,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"nop", b"0"],
|
|
sp=suggested_params
|
|
),
|
|
|
|
transaction.ApplicationCallTxn(
|
|
sender=creator.getAddress(),
|
|
index=core_id,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"nop", b"1"],
|
|
sp=suggested_params
|
|
),
|
|
|
|
transaction.ApplicationCallTxn(
|
|
sender=creator.getAddress(),
|
|
index=core_id,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"init", boot_vaa, decode_address(vaa_verify_lsig.address())],
|
|
accounts=[seq_addr, guardian_addr, newguardian_addr],
|
|
sp=suggested_params
|
|
),
|
|
|
|
transaction.PaymentTxn(
|
|
sender=creator.getAddress(),
|
|
receiver=vaa_verify_lsig.address(),
|
|
amt=100000,
|
|
sp=suggested_params
|
|
)
|
|
]
|
|
portal_core.sendTxn(client, creator, txns, True)
|
|
|
|
def tests_reject_another_init(client, creator, portal_core, suggested_params, vaa_verify_lsig, gen_test, core_id):
|
|
|
|
# Generate a different init vaa
|
|
seq = int(random.random() * (2**31))
|
|
portal_core.client = client
|
|
portal_core.coreid = core_id
|
|
boot_vaa = bytes.fromhex(gen_test.genGuardianSetUpgrade(gen_test.guardianPrivKeys, portal_core.getGovSet(), portal_core.getGovSet(), seq, seq))
|
|
|
|
parsed_vaa = portal_core.parseVAA(boot_vaa)
|
|
portal_core.seed_amt = SEED_AMOUNT
|
|
seq_addr = portal_core.optin(client, creator, core_id, int(parsed_vaa["sequence"] / max_bits), parsed_vaa["chainRaw"].hex() + parsed_vaa["emitter"].hex())
|
|
guardian_addr = portal_core.optin(client, creator, core_id, parsed_vaa["index"], b"guardian".hex())
|
|
newguardian_addr = portal_core.optin(client, creator, core_id, parsed_vaa["NewGuardianSetIndex"], b"guardian".hex())
|
|
|
|
|
|
|
|
txns = [
|
|
transaction.ApplicationCallTxn(
|
|
sender=creator.getAddress(),
|
|
index=core_id,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"nop", b"0"],
|
|
sp=suggested_params
|
|
),
|
|
|
|
transaction.ApplicationCallTxn(
|
|
sender=creator.getAddress(),
|
|
index=core_id,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"nop", b"1"],
|
|
sp=suggested_params
|
|
),
|
|
|
|
transaction.ApplicationCallTxn(
|
|
sender=creator.getAddress(),
|
|
index=core_id,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"init", boot_vaa, decode_address(vaa_verify_lsig.address())],
|
|
accounts=[seq_addr, guardian_addr, newguardian_addr],
|
|
sp=suggested_params
|
|
),
|
|
|
|
transaction.PaymentTxn(
|
|
sender=creator.getAddress(),
|
|
receiver=vaa_verify_lsig.address(),
|
|
amt=100000,
|
|
sp=suggested_params
|
|
)
|
|
]
|
|
|
|
with pytest.raises(AlgodHTTPError):
|
|
portal_core.sendTxn(client, creator, txns, True)
|
|
|
|
def test_rejects_evil_double_verify_vaa(gen_test, portal_core, client, creator, core_id, vaa_verify_lsig):
|
|
"""
|
|
A new verions of submitVAA. In an ideal word, we would generalize the functions
|
|
to reduce code duplication, but at his stage I prefer duplication over complexity.
|
|
NOTE: this reproduces an attack idea, and is not mean to reproduce a normal scenario
|
|
"""
|
|
|
|
seq = int(random.random() * (2**31))
|
|
signed_vaa = bytearray.fromhex(gen_test.createRandomSignedVAA(0,gen_test.guardianPrivKeys))
|
|
|
|
trash_vaa = bytearray.fromhex(gen_test.createTrashVAA(
|
|
guardianSetIndex=0,
|
|
ts=1,
|
|
nonce=1, # the nonce is irrelevant in algorand, batch not supported
|
|
emitterChainId=8,
|
|
emitterAddress=bytes([0x00]*32),
|
|
sequence=seq+1,
|
|
consistencyLevel=1,
|
|
target="",
|
|
payload="C0FFEEBABE",
|
|
version=1
|
|
))
|
|
|
|
# A lot of our logic here depends on parseVAA and knowing what the payload is..
|
|
parsed_vaa = portal_core.parseVAA(signed_vaa)
|
|
seq_addr = portal_core.optin(client, creator, core_id, int(parsed_vaa["sequence"] / max_bits), parsed_vaa["chainRaw"].hex() + parsed_vaa["emitter"].hex())
|
|
|
|
# And then the signatures to help us verify the vaa_s
|
|
guardian_addr = portal_core.optin(client, creator, core_id, parsed_vaa["index"], b"guardian".hex())
|
|
|
|
accts = [seq_addr, guardian_addr]
|
|
|
|
keys = portal_core.decodeLocalState(client, creator, core_id, guardian_addr)
|
|
print("keys: " + keys.hex())
|
|
|
|
sp = client.suggested_params()
|
|
|
|
txns = []
|
|
|
|
# How many signatures can we process in a single txn... we can do 9!
|
|
bsize = (9*66)
|
|
blocks = int(len(parsed_vaa["signatures"]) / bsize) + 1
|
|
|
|
# We don't pass the entire payload in but instead just pass it pre digested. This gets around size
|
|
# limitations with lsigs AND reduces the cost of the entire operation on a conjested network by reducing the
|
|
# bytes passed into the transaction
|
|
digest = keccak.new(digest_bits=256).update(keccak.new(digest_bits=256).update(parsed_vaa["digest"]).digest()).digest()
|
|
|
|
for i in range(blocks):
|
|
# Which signatures will we be verifying in this block
|
|
sigs = parsed_vaa["signatures"][(i * bsize):]
|
|
if (len(sigs) > bsize):
|
|
sigs = sigs[:bsize]
|
|
# keys
|
|
kset = b''
|
|
# Grab the key associated the signature
|
|
for q in range(int(len(sigs) / 66)):
|
|
# Which guardian is this signature associated with
|
|
g = sigs[q * 66]
|
|
key = keys[((g * 20) + 1) : (((g + 1) * 20) + 1)]
|
|
kset = kset + key
|
|
|
|
txns.append(transaction.ApplicationCallTxn(
|
|
sender=vaa_verify_lsig.address(),
|
|
index=core_id,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"verifySigs", sigs, kset, digest],
|
|
accounts=accts,
|
|
sp=sp
|
|
))
|
|
txns[-1].fee = 0
|
|
|
|
txns.append(transaction.ApplicationCallTxn(
|
|
sender=creator.getAddress(),
|
|
index=core_id,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"verifyVAA", signed_vaa],
|
|
accounts=accts,
|
|
sp=sp
|
|
))
|
|
|
|
# send second, unsigned "verifyVAA" call inside
|
|
# the transaction chain. this should obviously fail
|
|
txns.append(transaction.ApplicationCallTxn(
|
|
sender=creator.getAddress(),
|
|
index=core_id,
|
|
on_complete=transaction.OnComplete.NoOpOC,
|
|
app_args=[b"verifyVAA", trash_vaa],
|
|
accounts=accts,
|
|
sp=sp
|
|
))
|
|
txns[-1].fee = txns[-1].fee * (1 + blocks)
|
|
|
|
transaction.assign_group_id(txns)
|
|
|
|
grp = []
|
|
pk = creator.getPrivateKey()
|
|
for t in txns:
|
|
if ("app_args" in t.__dict__ and len(t.app_args) > 0 and t.app_args[0] == b"verifySigs"):
|
|
grp.append(transaction.LogicSigTransaction(t, vaa_verify_lsig))
|
|
else:
|
|
grp.append(t.sign(pk))
|
|
|
|
with pytest.raises(AlgodHTTPError) as error:
|
|
client.send_transactions(grp)
|
|
for x in grp:
|
|
portal_core.waitForTransaction(client, x.get_txid())
|
|
|
|
assert "pushint 504" in str(error), f"signed_vaa:\n {signed_vaa.hex()}\n, unsigned_vaa:\n{trash_vaa.hex()}\n"
|