add: new and adversarial tests

tests added. some try basic functionality which was understed. others
try to mimic what an attacker would try to do.

this is a baseline from which to create more adversarial tests and
improve the testing suite of the contracts.

tests/README.md includes instructions on how to run the new tests.
This commit is contained in:
Joaquin L. Pereyra 2022-10-03 17:27:11 -03:00 committed by jumpsiegel
parent 9020ca6985
commit fd94c649b0
10 changed files with 1039 additions and 12 deletions

View File

@ -30,6 +30,7 @@ eth_abi = "==2.1.1"
coincurve = "==16.0.0"
PyNaCl = "==1.5.0"
PyYAML = "==6.0"
eth-utils = "*"
[dev-packages]

View File

@ -813,7 +813,7 @@ class PortalCore:
seq_addr = self.optin(client, sender, appid, int(p["sequence"] / max_bits), p["chainRaw"].hex() + p["emitter"].hex())
assert self.check_bits_set(client, appid, seq_addr, p["sequence"]) == False
# assert self.check_bits_set(client, appid, seq_addr, p["sequence"]) == False
# And then the signatures to help us verify the vaa_s
guardian_addr = self.optin(client, sender, self.coreid, p["index"], b"guardian".hex())
@ -843,7 +843,9 @@ class PortalCore:
# How many signatures can we process in a single txn... we can do 9!
bsize = (9*66)
blocks = int(len(p["signatures"]) / bsize) + 1
# audit: this was incorrectly adding an extra, empty block when the amount
# of signatures was a multiple of 9. fixed.
blocks = int(len(p["signatures"]) / bsize) + int(vaa[5] % 9 != 0)
# We don't pass the entire payload in but instead just pass it pre digested. This gets around size
# limitations with lsigs AND reduces the cost of the entire operation on a conjested network by reducing the
@ -1010,7 +1012,10 @@ class PortalCore:
m = abi.Method("portal_transfer", [abi.Argument("byte[]")], abi.Returns("byte[]"))
txns.append(transaction.ApplicationCallTxn(
sender=sender.getAddress(),
index=int.from_bytes(bytes.fromhex(p["ToAddress"]), "big"),
# AUDIT: this is wrong, as the index should be only the last
# eight bytes of the destionation... we fixed it
index=int.from_bytes(bytes.fromhex(p["ToAddress"])[24:], "big"),
on_complete=transaction.OnComplete.NoOpOC,
app_args=[m.get_selector(), m.args[0].type.encode(vaa)],
foreign_assets = foreign_assets,
@ -1034,7 +1039,7 @@ class PortalCore:
if "logs" in response.__dict__ and len(response.__dict__["logs"]) > 0:
ret.append(response.__dict__["logs"])
assert self.check_bits_set(client, appid, seq_addr, p["sequence"]) == True
# assert self.check_bits_set(client, appid, seq_addr, p["sequence"]) == True
return ret
@ -1443,6 +1448,9 @@ class PortalCore:
parser.add_argument('--approve', type=str, help='compiled approve contract', default="")
parser.add_argument('--clear', type=str, help='compiled clear contract', default="")
parser.add_argument("--loops", type=int, help="testing: how many iterations should randomized tests run for. defaults to 1 for faster testing.", default="1")
parser.add_argument("--bigset", action="store_true", help="testing: use the big set of validators", default="1")
args = parser.parse_args()
self.init(args)

View File

@ -1,11 +1,12 @@
from eth_abi import encode_single, encode_abi
import sys
import string
import pprint
import time
from Cryptodome.Hash import keccak
import coincurve
import base64
from random import random
import random
from algosdk.encoding import decode_address
class GenTest:
@ -81,8 +82,24 @@ class GenTest:
return encode_single(type, val).hex()[64-(64):64]
raise Exception("invalid type")
def createSignedVAA(self, guardianSetIndex, signers, ts, nonce, emitterChainId, emitterAddress, sequence, consistencyLevel, target, payload):
print("createSignedVAA: " + str(signers))
def createTrashVAA(self, guardianSetIndex, ts, nonce, emitterChainId, emitterAddress, sequence, consistencyLevel, target, payload, version=1):
return self.createSignedVAA(
guardianSetIndex,
# set the minimum amount of trash as signature for this to pass validations
[random.randbytes(32).hex() for _ in range(int(len(self.guardianKeys)*2/3)+1)],
ts,
nonce,
emitterChainId,
emitterAddress,
sequence,
consistencyLevel,
target,
payload,
version
)
def createSignedVAA(self, guardianSetIndex, signers, ts, nonce, emitterChainId, emitterAddress, sequence, consistencyLevel, target, payload, version=1):
b = ""
b += self.encoder("uint32", ts)
@ -104,14 +121,96 @@ class GenTest:
signature = key.sign_recoverable(hash, hasher=None)
signatures += signature.hex()
ret = self.encoder("uint8", 1)
ret = self.encoder("uint8", version)
ret += self.encoder("uint32", guardianSetIndex)
ret += self.encoder("uint8", len(signers))
ret += signatures
ret += b
print(ret)
return ret
def createValidRandomSignedVAA(self, guardianSetIndex, signers, sequence):
ts = random.randint(0, 2**32-1)
nonce = random.randint(0, 2**32-1)
emitterChainId = random.randint(0, 2**16-1)
emitterAddress = random.randbytes(32)
consitencyLevel = random.randint(0, 2**8-1)
payload = self.createRandomValidPayload().hex()
return self.createSignedVAA(
guardianSetIndex, # guardian set index needs to be fixed so contract knows where to look into
signers,
ts,
nonce,
emitterChainId,
emitterAddress,
sequence,
consitencyLevel,
0, #target = not used?
payload,
1, # only version 1 VAA
)
def createRandomValidPayload(self):
action = (0x03).to_bytes(1, byteorder="big")
# action = random.choice([0x01, 0x03]).to_bytes(1, byteorder="big")
amount = random.randint(0, 2**128-1).to_bytes(32, byteorder="big")
# TODO: we should support more addresses than this one, but this
# is hardcoded in the tests and probably used in the deploy, so we
# will make do. same goes for the token_address
some_token_address = b"4523c3F29447d1f32AEa95BEBD00383c4640F1b4"
tokenAddress = some_token_address
# TODO: same goes for the token chain, just use what's available for now
try:
tokenChain = bytes.fromhex(self.getEmitter(1))
except:
raise
to = random.randbytes(32)
toChain = random.randint(0, 2**16-1).to_bytes(2, byteorder="big")
payload = action + amount + tokenAddress + tokenChain + to + toChain
if action == 0x01:
fee = random.randint(0, 2**256-1).to_bytes(32, byteorder="big")
payload += fee
if action == 0x03:
fromAddress = random.randbytes(2)
arbitraryPayload = random.randbytes(random.randint(0,4))
payload += fromAddress + arbitraryPayload
return payload
def createRandomSignedVAA(self, guardianSetIndex, signers):
ts = random.randint(0, 2**32-1)
nonce = random.randint(0, 2**32-1)
emitterChainId = random.randint(0, 2**16-1)
emitterAddress = random.randbytes(32)
sequence = random.randint(0, 2**64-1)
consitencyLevel = random.randint(0, 2**8-1)
# payload = ''.join(random.choices(string.ascii_uppercase + string.digits, k=random.randint(0,500)))
payload = random.randbytes(random.randint(0,496)).hex()
version = random.randint(0,10)
return self.createSignedVAA(
guardianSetIndex, # guardian set index needs to be fixed so contract knows where to look into
signers,
ts,
nonce,
emitterChainId,
emitterAddress,
sequence,
consitencyLevel,
0, #target = not used?
payload,
version,
)
def genGuardianSetUpgrade(self, signers, guardianSet, targetSet, nonce, seq):
b = self.zeroPadBytes[0:(28*2)]
b += self.encoder("uint8", ord("C"))
@ -206,6 +305,35 @@ class GenTest:
emitter = bytes.fromhex(self.getEmitter(chain))
return self.createSignedVAA(guardianSet, signers, int(time.time()), nonce, 1, emitter, seq, 32, 0, b)
def genRandomValidTransfer(self,
signers,
guardianSet,
seq,
tokenAddress,
toAddress,
amount_max):
amount = random.randint(0, int(amount_max / 100000000))
fee = random.randint(0, amount) # fee must be lower than amount for VAA to be valid
return self.genTransfer(
signers=signers,
guardianSet=guardianSet,
nonce=random.randint(0, 2**32-1),
seq=seq,
# amount gets encoded as an uint256, but it's actually clearly
# to only eight bytes. all other bytes _must_ be zero.
amount=amount,
# token address must be registed on the bridge
tokenAddress=tokenAddress,
# tokenAddress=random.randbytes(32),
tokenChain=1,
toAddress=toAddress,
# must be directed at algorand chain
toChain=8,
# fee is in the same situation as amount
fee=fee,
)
def genTransfer(self, signers, guardianSet, nonce, seq, amount, tokenAddress, tokenChain, toAddress, toChain, fee):
b = self.encoder("uint8", 1)
b += self.encoder("uint256", int(amount * 100000000))
@ -226,7 +354,7 @@ class GenTest:
return self.createSignedVAA(guardianSet, signers, int(time.time()), nonce, 1, emitter, seq, 32, 0, b)
def genVaa(self, emitter, seq, payload):
nonce = int(random() * 4000000.0)
nonce = int(random.random() * 4000000.0)
return self.createSignedVAA(1, self.guardianPrivKeys, int(time.time()), nonce, 8, emitter, seq, 32, 0, payload.hex())
def test(self):

34
algorand/test/README.md Normal file
View File

@ -0,0 +1,34 @@
# Adversarial and extra testing
README for the adversarial and extra tests added by Coinspect during the audit.
## Requirements
Some of the new tests require `pytest`, which was already declared as a
dependency.
Two new randomized test use the approach that previous tests where using
and are simply in `test.py`. See `Usage`.
## Usage
### `test.py`
`test.py` now accept two additional flags:
`--loops` defines how many times the tests should run
`--bigset` defines if it should use the big guardian set
For example, to run the tests with 10 loops and a big validator set.
Note the `--loop` flag will not affect previous tests.
```
python test.py --loops 10 --bigset
```
### Running Pytest tests
Simple do:
```
$ pytest
```
## Notes
Shared fixtures are declared in `conftest.py`

46
algorand/test/conftest.py Normal file
View File

@ -0,0 +1,46 @@
import sys
sys.path.append("..")
import pytest
import base64
from admin import PortalCore
from gentest import GenTest
from algosdk.future import transaction
from vaa_verify import get_vaa_verify
@pytest.fixture(scope='module')
def portal_core():
portal_core = PortalCore()
portal_core.devnet = True;
return portal_core
@pytest.fixture(scope='module')
def gen_test():
gen_test = GenTest(False)
return gen_test
@pytest.fixture(scope='module')
def client(portal_core):
return portal_core.getAlgodClient()
@pytest.fixture(scope='module')
def suggested_params(client):
return client.suggested_params()
@pytest.fixture(scope='module')
def creator(portal_core, client):
return portal_core.getTemporaryAccount(client)
@pytest.fixture(scope='module')
def vaa_verify_lsig(portal_core, client, creator, suggested_params):
response = client.compile(get_vaa_verify())
print(response)
lsig = transaction.LogicSigAccount(base64.b64decode(response['result']))
txn = transaction.PaymentTxn(
sender=creator.getAddress(),
receiver=lsig.address(),
amt=1000000,
sp=suggested_params,
)
signedTxn = txn.sign(creator.getPrivateKey())
client.send_transaction(signedTxn)
portal_core.waitForTransaction(client, signedTxn.get_txid())
return lsig

3
algorand/test/pytest.ini Normal file
View File

@ -0,0 +1,3 @@
[pytest]
filterwarnings =
ignore::DeprecationWarning

View File

@ -20,6 +20,7 @@ from algosdk.kmd import KMDClient
from algosdk import account, mnemonic
from algosdk.encoding import decode_address, encode_address
from algosdk.future import transaction
import algosdk
from pyteal import compileTeal, Mode, Expr
from pyteal import *
from algosdk.logic import get_application_address
@ -503,11 +504,12 @@ class AlgoTest(PortalCore):
# pprint.pprint(vaa)
# sys.exit(0)
gt = GenTest(False)
self.gt = gt
self.setup_args()
gt = GenTest(self.args.bigset)
self.gt = gt
if self.args.testnet:
self.testnet()
else:
@ -617,6 +619,112 @@ class AlgoTest(PortalCore):
self.submitVAA(transferVAA, client, player, self.tokenid)
seq += 1
def double_submit_transfer_vaa_fails(seq):
"""
Resend the same transaction we just send, changing only its nonce.
This should fail _as long as the sequence number is not incremented_
"""
# send a nice VAA to begin with. everything but these settings will be random
# so we can be sure this works with many different VAAs -- as long as they are valid
# non-valid vaas fail for other reasons
vaa = bytearray.fromhex(gt.genRandomValidTransfer(
signers=gt.guardianPrivKeys,
guardianSet=1,
seq=seq,
# we set the max_amount, but the actual amount will be between zero and this value
amount_max=self.getBalances(client, player.getAddress())[0], # 0 is the ALGO amount
tokenAddress=bytes.fromhex("4523c3F29447d1f32AEa95BEBD00383c4640F1b4"),
toAddress=decode_address(player.getAddress()),
))
self.submitVAA(vaa, client, player, self.tokenid)
# Let's make this even stronger: scramble the few bytes we can (len_signatures, signatures)
# so the repeated one is still valid, but different from the first one.
# NOTE: this will only be interesting if we are working with a big validator set,
# don't even botters if it's not
if len(gt.guardianKeys) > 1:
current_signatures_amount = vaa[5]
signatures_len = 66*current_signatures_amount
signatures_offset = 6
rest_offset = signatures_offset+signatures_len
new_signature_amount = random.randint(int(len(gt.guardianKeys)*2/3)+1, current_signatures_amount)
# construct a list of every siganture with its index
signatures = vaa[signatures_offset:rest_offset]
signatures = [signatures[i:i+66] for i in range(0, len(signatures), 66)]
assert len(signatures) == current_signatures_amount
# scramble the signatures so we get new bytes
new_signatures = random.sample(signatures, k=new_signature_amount)
assert len(new_signatures) == new_signature_amount
new_signatures = b''.join(new_signatures)
vaa[5] = new_signature_amount
new_vaa = vaa[:6] + new_signatures + vaa[rest_offset:]
assert(len(new_vaa) == len(vaa)-((current_signatures_amount-new_signature_amount)*66))
vaa = new_vaa
# now try again!
try:
self.submitVAA(vaa, client, player, self.tokenid)
except algosdk.error.AlgodHTTPError as e:
# should fail right at line 936
if "opcodes=pushint 936" in str(e):
return True, vaa, None
return False, vaa, e
return False, vaa, None
for _ in range(self.args.loops):
result, vaa, err = double_submit_transfer_vaa_fails(seq)
if err != None:
assert False, f"!!! ERR: unepexted error. error:\n {err}\noffending vaa hex:\n{vaa.hex()}"
assert result, f"!!! ERR: sending same VAA twice worked. offending vaa hex:\n{vaa.hex()}"
seq+=1
return
def sending_vaa_version_not_one_fails(seq, version):
vaa = bytearray.fromhex(gt.genRandomValidTransfer(
signers=gt.guardianPrivKeys,
guardianSet=1,
seq=seq,
tokenAddress=bytes.fromhex("4523c3F29447d1f32AEa95BEBD00383c4640F1b4"),
toAddress=decode_address(player.getAddress()),
amount_max=self.getBalances(client, player.getAddress())[0], # 0 is the ALGO amount
))
# we know VAA is malleable in the first four fields:
# version, guardian set index, len of signatures, signatures
vaa[0] = version
try:
self.submitVAA(vaa, client, player, self.tokenid)
except algosdk.error.AlgodHTTPError as e:
# right at the beginning of checkForDuplicate()
if "opcodes=pushint 919" in str(e):
return True, vaa, None
return False, vaa, e
return False, vaa, None
# no need to increase _seq_ after this one as if everything went ok...
# all VAAs should have been invalid!
for _ in range(self.args.loops):
version = random.randint(0, 255)
if version == 1:
continue
ok, vaa, err = sending_vaa_version_not_one_fails(seq, version)
if err != None:
assert False, f"!!! ERR: unepexted error when testing version. error:\n {err}\noffending vaa hex:\n{vaa.hex()}"
assert ok, f"!!! ERR: Invalid version worked. offending version: {version}. offending vaa:\n{vaa}"
print("Create the test app we will use to torture ourselves using a new player")
player2 = self.getTemporaryAccount(client)
print("player2 address " + player2.getAddress())
@ -639,7 +747,7 @@ class AlgoTest(PortalCore):
sid = self.testAttest(client, player2, 0)
vaa = self.getVAA(client, player, sid, self.tokenid)
v = self.parseVAA(bytes.fromhex(vaa))
print("We got a " + v["Meta"])
print("We got a " + str(v["Meta"]))
print("Lets try to create an attest for a non-wormhole thing with a huge number of decimals")
# paul - attestFromAlgorand

View File

@ -0,0 +1,213 @@
from algosdk.encoding import decode_address
import pytest
from algosdk.future import transaction
from algosdk.logic import get_application_address
from algosdk.error import AlgodHTTPError
@pytest.fixture(scope='module')
def correct_app_id(portal_core, client, creator):
return portal_core.createTestApp(client,creator)
@pytest.fixture(scope='module')
def incorrect_app_id(portal_core, client, creator):
return portal_core.createTestApp(client,creator)
@pytest.fixture(scope='module')
def tmpl_lsig(portal_core, client, correct_app_id, creator, suggested_params):
appAddress = get_application_address(correct_app_id)
tsig = portal_core.tsig
lsig = tsig.populate(
{
"TMPL_APP_ID": correct_app_id,
"TMPL_APP_ADDRESS": decode_address(appAddress).hex(),
"TMPL_ADDR_IDX": 0,
"TMPL_EMITTER_ID": b"emitter".hex(),
}
)
txn = transaction.PaymentTxn(
sender=creator.getAddress(),
receiver=lsig.address(),
amt=1000000,
sp=suggested_params,
)
signedTxn = txn.sign(creator.getPrivateKey())
client.send_transaction(signedTxn)
portal_core.waitForTransaction(client, signedTxn.get_txid())
return lsig
def tests_rejection_on_payment(client, portal_core, tmpl_lsig, creator, suggested_params):
with pytest.raises(AlgodHTTPError):
feePayment = transaction.PaymentTxn(
sender=creator.getAddress(),
receiver=creator.getAddress(),
amt=0,
sp=suggested_params
)
feePayment.fee = 2 * feePayment.fee
payment = transaction.PaymentTxn(
sender=tmpl_lsig.address(),
receiver=tmpl_lsig.address(),
amt=0,
sp=suggested_params
)
payment.fee = 0
transaction.assign_group_id([feePayment, payment])
signedFeePayment = feePayment.sign(creator.getPrivateKey())
signedPayment = transaction.LogicSigTransaction(lsig=tmpl_lsig, transaction=payment)
client.send_transactions([signedFeePayment, signedPayment])
portal_core.waitForTransaction(client, signedPayment.get_txid())
def tests_rejection_on_asset_transfer(client, portal_core, tmpl_lsig, creator, suggested_params):
with pytest.raises(AlgodHTTPError):
fee_payment = transaction.PaymentTxn(
sender=creator.getAddress(),
receiver=creator.getAddress(),
amt=0,
sp=suggested_params
)
fee_payment.fee = 2 * fee_payment.fee
asset_transfer = transaction.AssetTransferTxn(
index=1,
sender=tmpl_lsig.address(),
receiver=tmpl_lsig.address(),
amt=0,
sp=suggested_params
)
asset_transfer.fee = 0
transaction.assign_group_id([fee_payment, asset_transfer])
signedFeePayment = fee_payment.sign(creator.getPrivateKey())
signedAssetTransfer = transaction.LogicSigTransaction(lsig=tmpl_lsig, transaction=asset_transfer)
client.send_transactions([signedFeePayment, signedAssetTransfer])
portal_core.waitForTransaction(client, signedAssetTransfer.get_txid())
def tests_rejection_on_nop(client, portal_core, tmpl_lsig, correct_app_id, creator, suggested_params):
with pytest.raises(AlgodHTTPError):
fee_payment = transaction.PaymentTxn(
sender=creator.getAddress(),
receiver=creator.getAddress(),
amt=0,
sp=suggested_params
)
fee_payment.fee = 2 * fee_payment.fee
noop = transaction.ApplicationCallTxn(
index=correct_app_id,
sender=tmpl_lsig.address(),
sp=suggested_params,
on_complete=transaction.OnComplete.NoOpOC
)
noop.fee = 0
transaction.assign_group_id([fee_payment, noop])
signedFeePayment = fee_payment.sign(creator.getPrivateKey())
signedNoop = transaction.LogicSigTransaction(lsig=tmpl_lsig, transaction=noop)
client.send_transactions([signedFeePayment, signedNoop])
portal_core.waitForTransaction(client, signedNoop.get_txid())
def tests_rejection_on_opt_in_to_incorrect_app(client, portal_core, tmpl_lsig, incorrect_app_id, creator, suggested_params):
with pytest.raises(AlgodHTTPError):
fee_payment = transaction.PaymentTxn(
sender=creator.getAddress(),
receiver=creator.getAddress(),
amt=0,
sp=suggested_params
)
fee_payment.fee = 2*fee_payment.fee
opt_in = transaction.ApplicationCallTxn(
index=incorrect_app_id,
sender=tmpl_lsig.address(),
sp=suggested_params,
on_complete=transaction.OnComplete.OptInOC
)
opt_in.fee = 0
transaction.assign_group_id([fee_payment, opt_in])
signedFeePayment = fee_payment.sign(creator.getPrivateKey())
signedOptIn = transaction.LogicSigTransaction(lsig=tmpl_lsig, transaction=opt_in)
client.send_transactions([signedFeePayment, signedOptIn])
portal_core.waitForTransaction(client, signedOptIn.get_txid())
def tests_rejection_on_opt_in_to_correct_app_without_rekeying(client, portal_core, tmpl_lsig, correct_app_id, creator, suggested_params):
with pytest.raises(AlgodHTTPError):
fee_payment = transaction.PaymentTxn(
sender=creator.getAddress(),
receiver=creator.getAddress(),
amt=0,
sp=suggested_params
)
fee_payment.fee = 2*fee_payment.fee
opt_in = transaction.ApplicationCallTxn(
index=correct_app_id,
sender=tmpl_lsig.address(),
sp=suggested_params,
on_complete=transaction.OnComplete.OptInOC
)
opt_in.fee = 0
transaction.assign_group_id([fee_payment, opt_in])
signedFeePayment = fee_payment.sign(creator.getPrivateKey())
signedOptIn = transaction.LogicSigTransaction(lsig=tmpl_lsig, transaction=opt_in)
client.send_transactions([signedFeePayment, signedOptIn])
portal_core.waitForTransaction(client, signedOptIn.get_txid())
def tests_rejection_on_opt_in_to_correct_app_with_rekeying_with_non_zero_fee(client, portal_core, tmpl_lsig, correct_app_id, suggested_params):
with pytest.raises(AlgodHTTPError):
txn = transaction.ApplicationCallTxn(
index=correct_app_id,
sender=tmpl_lsig.address(),
sp=suggested_params,
rekey_to=get_application_address(correct_app_id),
on_complete=transaction.OnComplete.NoOpOC
)
signedTxn = transaction.LogicSigTransaction(lsig=tmpl_lsig, transaction=txn)
client.send_transaction(signedTxn)
portal_core.waitForTransaction(client, signedTxn.get_txid())
def tests_success(client, portal_core, creator, tmpl_lsig, correct_app_id, suggested_params):
fee_payment = transaction.PaymentTxn(
sender=creator.getAddress(),
receiver=creator.getAddress(),
amt=0,
sp=suggested_params
)
fee_payment.fee = 2*fee_payment.fee
opt_in = transaction.ApplicationCallTxn(
index=correct_app_id,
sender=tmpl_lsig.address(),
sp=suggested_params,
rekey_to=get_application_address(correct_app_id),
on_complete=transaction.OnComplete.OptInOC
)
opt_in.fee = 0
transaction.assign_group_id([fee_payment, opt_in])
signedFeePayment = fee_payment.sign(creator.getPrivateKey())
signedOptIn = transaction.LogicSigTransaction(lsig=tmpl_lsig, transaction=opt_in)
client.send_transactions([signedFeePayment, signedOptIn])
portal_core.waitForTransaction(client, signedOptIn.get_txid())

View File

@ -0,0 +1,179 @@
import pytest
import coincurve
from algosdk.future import transaction
from algosdk.error import AlgodHTTPError
from algosdk.logic import get_application_address
from Cryptodome.Hash import keccak
@pytest.fixture(scope='module')
def app_id(portal_core, client, creator):
return portal_core.createTestApp(client,creator)
@pytest.fixture(scope='module')
def signers(gen_test):
return bytes.fromhex("".join(gen_test.guardianKeys))
@pytest.fixture(scope='module')
def signers_private_keys(gen_test):
return gen_test.guardianPrivKeys
@pytest.fixture(scope='module')
def hash():
return keccak.new(digest_bits=256).update(b"42").digest()
@pytest.fixture(scope='module')
def incorrect_hash():
return keccak.new(digest_bits=256).update(b"error").digest()
@pytest.fixture
def signatures(gen_test,signers_private_keys, hash):
signatures = ""
for i in range(len(signers_private_keys)):
signatures += gen_test.encoder("uint8", i)
key = coincurve.PrivateKey(bytes.fromhex(signers_private_keys[i]))
signature = key.sign_recoverable(hash, hasher=None)
signatures += signature.hex()
return bytes.fromhex(signatures)
def tests_rejection_on_rekey(client, portal_core, creator, vaa_verify_lsig, app_id):
with pytest.raises(AlgodHTTPError):
doubleFee = client.suggested_params()
doubleFee.flat_fee = True
doubleFee.fee = 2000
feePayment = transaction.PaymentTxn(
sender=creator.getAddress(),
receiver=creator.getAddress(),
amt=0,
sp=doubleFee
)
zeroFee = client.suggested_params()
zeroFee.flat_fee = True
zeroFee.fee = 0
noop = transaction.ApplicationCallTxn(
index=app_id,
sender=vaa_verify_lsig.address(),
sp=zeroFee,
rekey_to=get_application_address(app_id),
on_complete=transaction.OnComplete.NoOpOC
)
transaction.assign_group_id([feePayment, noop])
signedFeePayment = feePayment.sign(creator.getPrivateKey())
signedNoop = transaction.LogicSigTransaction(lsig=vaa_verify_lsig, transaction=noop)
client.send_transactions([signedFeePayment, signedNoop])
portal_core.waitForTransaction(client, signedNoop.get_txid())
def tests_rejection_on_non_app_call(client, portal_core, creator, vaa_verify_lsig):
with pytest.raises(AlgodHTTPError):
doubleFee = client.suggested_params()
doubleFee.flat_fee = True
doubleFee.fee = 2000
feePayment = transaction.PaymentTxn(
sender=creator.getAddress(),
receiver=creator.getAddress(),
amt=0,
sp=doubleFee
)
zeroFee = client.suggested_params()
zeroFee.flat_fee = True
zeroFee.fee = 0
payment = transaction.PaymentTxn(
sender=vaa_verify_lsig.address(),
receiver=vaa_verify_lsig.address(),
amt=0,
sp=zeroFee,
)
transaction.assign_group_id([feePayment, payment])
signedFeePayment = feePayment.sign(creator.getPrivateKey())
signedPayment = transaction.LogicSigTransaction(lsig=vaa_verify_lsig, transaction=payment)
client.send_transactions([signedFeePayment, signedPayment])
portal_core.waitForTransaction(client, signedPayment.get_txid())
def tests_rejection_on_zero_fee(client, portal_core, vaa_verify_lsig, app_id, signatures, hash, signers, suggested_params):
with pytest.raises(AlgodHTTPError):
noop = transaction.ApplicationCallTxn(
index=app_id,
sender=vaa_verify_lsig.address(),
sp=suggested_params,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"nop", signatures, signers, hash]
)
signedNoop = transaction.LogicSigTransaction(lsig=vaa_verify_lsig, transaction=noop)
client.send_transaction(signedNoop)
portal_core.waitForTransaction(client, signedNoop.get_txid())
def tests_rejection_on_hash_not_signed(client, portal_core, creator, vaa_verify_lsig, app_id, signatures, incorrect_hash, signers):
with pytest.raises(AlgodHTTPError):
doubleFee = client.suggested_params()
doubleFee.flat_fee = True
doubleFee.fee = 2000
feePayment = transaction.PaymentTxn(
sender=creator.getAddress(),
receiver=creator.getAddress(),
amt=0,
sp=doubleFee
)
zeroFee = client.suggested_params()
zeroFee.flat_fee = True
zeroFee.fee = 0
noop = transaction.ApplicationCallTxn(
index=app_id,
sender=vaa_verify_lsig.address(),
sp=zeroFee,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"nop", signatures, signers, incorrect_hash]
)
transaction.assign_group_id([feePayment, noop])
signedFeePayment = feePayment.sign(creator.getPrivateKey())
signedNoop = transaction.LogicSigTransaction(lsig=vaa_verify_lsig, transaction=noop)
client.send_transactions([signedFeePayment, signedNoop])
portal_core.waitForTransaction(client, signedNoop.get_txid())
def tests_success(client, portal_core, creator, vaa_verify_lsig, app_id, signatures, hash, signers):
doubleFee = client.suggested_params()
doubleFee.flat_fee = True
doubleFee.fee = 2000
feePayment = transaction.PaymentTxn(
sender=creator.getAddress(),
receiver=creator.getAddress(),
amt=0,
sp=doubleFee
)
zeroFee = client.suggested_params()
zeroFee.flat_fee = True
zeroFee.fee = 0
noop = transaction.ApplicationCallTxn(
index=app_id,
sender=vaa_verify_lsig.address(),
sp=zeroFee,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"nop", signatures, signers, hash]
)
transaction.assign_group_id([feePayment, noop])
signedFeePayment = feePayment.sign(creator.getPrivateKey())
signedNoop = transaction.LogicSigTransaction(lsig=vaa_verify_lsig, transaction=noop)
client.send_transactions([signedFeePayment, signedNoop])
portal_core.waitForTransaction(client, signedNoop.get_txid())

View File

@ -0,0 +1,307 @@
from Cryptodome.Hash import keccak
import pytest
import base64
import random
from wormhole_core import getCoreContracts
from algosdk.future import transaction
from algosdk.encoding import decode_address
from algosdk.logic import get_application_address
from algosdk.error import AlgodHTTPError
from admin import max_bits
CORE_NAME = "core"
CLEAR_NAME = "clear"
SEED_AMOUNT = 1000000
DEV_MODE = True
def pytest_namespace():
return {'core_id': 0}
@pytest.fixture(scope='function')
def core_id():
# Value is set after contract creation
return pytest.core_id
@pytest.fixture(scope='function')
def boot_vaa(gen_test, portal_core, client, core_id):
seq = int(random.random() * (2**31))
portal_core.client = client
portal_core.coreid = core_id
return bytes.fromhex(gen_test.genGuardianSetUpgrade(gen_test.guardianPrivKeys, portal_core.getGovSet(), portal_core.getGovSet(), seq, seq))
@pytest.fixture(scope='function')
def core_tmpl_lsig(portal_core, boot_vaa, core_id):
parsed_vaa = portal_core.parseVAA(boot_vaa)
core_address = get_application_address(core_id)
tsig = portal_core.tsig
return tsig.populate(
{
"TMPL_APP_ID": core_id,
"TMPL_APP_ADDRESS": decode_address(core_address).hex(),
"TMPL_ADDR_IDX": int(parsed_vaa['sequence'] / max_bits),
"TMPL_EMITTER_ID": parsed_vaa['chainRaw'].hex() + parsed_vaa['emitter'].hex(),
}
)
def tests_contract_creates_succesfully(gen_test, client, creator, portal_core):
approval_program, clear_program = getCoreContracts(gen_test, CORE_NAME, CLEAR_NAME, client, SEED_AMOUNT, portal_core.tsig, DEV_MODE)
globalSchema = transaction.StateSchema(num_uints=8, num_byte_slices=40)
localSchema = transaction.StateSchema(num_uints=0, num_byte_slices=16)
app_args = []
txn = transaction.ApplicationCreateTxn(
sender=creator.getAddress(),
on_complete=transaction.OnComplete.NoOpOC,
approval_program=base64.b64decode(approval_program["result"]),
clear_program=base64.b64decode(clear_program["result"]),
global_schema=globalSchema,
local_schema=localSchema,
extra_pages = 1,
app_args=app_args,
sp=client.suggested_params(),
)
signedTxn = txn.sign(creator.getPrivateKey())
client.send_transaction(signedTxn)
response = portal_core.waitForTransaction(client, signedTxn.get_txid())
assert response.applicationIndex is not None and response.applicationIndex > 0
pytest.core_id = response.applicationIndex
def tests_allow_opt_in(client, core_tmpl_lsig, creator, portal_core, suggested_params, core_id):
core_address = get_application_address(core_id)
seed_payment = transaction.PaymentTxn(
sender=creator.getAddress(),
receiver=core_tmpl_lsig.address(),
amt=SEED_AMOUNT,
sp=suggested_params,
)
seed_payment.fee = 2 * seed_payment.fee
optin = transaction.ApplicationOptInTxn(
sender=core_tmpl_lsig.address(),
sp=suggested_params,
index=core_id,
rekey_to=core_address
)
optin.fee = 0
transaction.assign_group_id([seed_payment, optin])
signed_seed = seed_payment.sign(creator.getPrivateKey())
signed_optin = transaction.LogicSigTransaction(optin, core_tmpl_lsig)
client.send_transactions([signed_seed, signed_optin])
portal_core.waitForTransaction(client, signed_optin.get_txid())
def tests_allow_init(client, creator, portal_core, suggested_params, vaa_verify_lsig, boot_vaa, core_id):
core_address = get_application_address(core_id)
parsed_vaa = portal_core.parseVAA(boot_vaa)
portal_core.seed_amt = SEED_AMOUNT
seq_addr = portal_core.optin(client, creator, core_id, int(parsed_vaa["sequence"] / max_bits), parsed_vaa["chainRaw"].hex() + parsed_vaa["emitter"].hex())
guardian_addr = portal_core.optin(client, creator, core_id, parsed_vaa["index"], b"guardian".hex())
newguardian_addr = portal_core.optin(client, creator, core_id, parsed_vaa["NewGuardianSetIndex"], b"guardian".hex())
txns = [
transaction.ApplicationCallTxn(
sender=creator.getAddress(),
index=core_id,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"nop", b"0"],
sp=suggested_params
),
transaction.ApplicationCallTxn(
sender=creator.getAddress(),
index=core_id,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"nop", b"1"],
sp=suggested_params
),
transaction.ApplicationCallTxn(
sender=creator.getAddress(),
index=core_id,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"init", boot_vaa, decode_address(vaa_verify_lsig.address())],
accounts=[seq_addr, guardian_addr, newguardian_addr],
sp=suggested_params
),
transaction.PaymentTxn(
sender=creator.getAddress(),
receiver=vaa_verify_lsig.address(),
amt=100000,
sp=suggested_params
)
]
portal_core.sendTxn(client, creator, txns, True)
def tests_reject_another_init(client, creator, portal_core, suggested_params, vaa_verify_lsig, gen_test, core_id):
# Generate a different init vaa
seq = int(random.random() * (2**31))
portal_core.client = client
portal_core.coreid = core_id
boot_vaa = bytes.fromhex(gen_test.genGuardianSetUpgrade(gen_test.guardianPrivKeys, portal_core.getGovSet(), portal_core.getGovSet(), seq, seq))
parsed_vaa = portal_core.parseVAA(boot_vaa)
portal_core.seed_amt = SEED_AMOUNT
seq_addr = portal_core.optin(client, creator, core_id, int(parsed_vaa["sequence"] / max_bits), parsed_vaa["chainRaw"].hex() + parsed_vaa["emitter"].hex())
guardian_addr = portal_core.optin(client, creator, core_id, parsed_vaa["index"], b"guardian".hex())
newguardian_addr = portal_core.optin(client, creator, core_id, parsed_vaa["NewGuardianSetIndex"], b"guardian".hex())
txns = [
transaction.ApplicationCallTxn(
sender=creator.getAddress(),
index=core_id,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"nop", b"0"],
sp=suggested_params
),
transaction.ApplicationCallTxn(
sender=creator.getAddress(),
index=core_id,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"nop", b"1"],
sp=suggested_params
),
transaction.ApplicationCallTxn(
sender=creator.getAddress(),
index=core_id,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"init", boot_vaa, decode_address(vaa_verify_lsig.address())],
accounts=[seq_addr, guardian_addr, newguardian_addr],
sp=suggested_params
),
transaction.PaymentTxn(
sender=creator.getAddress(),
receiver=vaa_verify_lsig.address(),
amt=100000,
sp=suggested_params
)
]
with pytest.raises(AlgodHTTPError):
portal_core.sendTxn(client, creator, txns, True)
def test_rejects_evil_double_verify_vaa(gen_test, portal_core, client, creator, core_id, vaa_verify_lsig):
"""
A new verions of submitVAA. In an ideal word, we would generalize the functions
to reduce code duplication, but at his stage I prefer duplication over complexity.
NOTE: this reproduces an attack idea, and is not mean to reproduce a normal scenario
"""
seq = int(random.random() * (2**31))
signed_vaa = bytearray.fromhex(gen_test.createRandomSignedVAA(0,gen_test.guardianPrivKeys))
trash_vaa = bytearray.fromhex(gen_test.createTrashVAA(
guardianSetIndex=0,
ts=1,
nonce=1, # the nonce is irrelevant in algorand, batch not supported
emitterChainId=8,
emitterAddress=bytes([0x00]*32),
sequence=seq+1,
consistencyLevel=1,
target="",
payload="C0FFEEBABE",
version=1
))
# A lot of our logic here depends on parseVAA and knowing what the payload is..
parsed_vaa = portal_core.parseVAA(signed_vaa)
seq_addr = portal_core.optin(client, creator, core_id, int(parsed_vaa["sequence"] / max_bits), parsed_vaa["chainRaw"].hex() + parsed_vaa["emitter"].hex())
# And then the signatures to help us verify the vaa_s
guardian_addr = portal_core.optin(client, creator, core_id, parsed_vaa["index"], b"guardian".hex())
accts = [seq_addr, guardian_addr]
keys = portal_core.decodeLocalState(client, creator, core_id, guardian_addr)
print("keys: " + keys.hex())
sp = client.suggested_params()
txns = []
# How many signatures can we process in a single txn... we can do 9!
bsize = (9*66)
blocks = int(len(parsed_vaa["signatures"]) / bsize) + 1
# We don't pass the entire payload in but instead just pass it pre digested. This gets around size
# limitations with lsigs AND reduces the cost of the entire operation on a conjested network by reducing the
# bytes passed into the transaction
digest = keccak.new(digest_bits=256).update(keccak.new(digest_bits=256).update(parsed_vaa["digest"]).digest()).digest()
for i in range(blocks):
# Which signatures will we be verifying in this block
sigs = parsed_vaa["signatures"][(i * bsize):]
if (len(sigs) > bsize):
sigs = sigs[:bsize]
# keys
kset = b''
# Grab the key associated the signature
for q in range(int(len(sigs) / 66)):
# Which guardian is this signature associated with
g = sigs[q * 66]
key = keys[((g * 20) + 1) : (((g + 1) * 20) + 1)]
kset = kset + key
txns.append(transaction.ApplicationCallTxn(
sender=vaa_verify_lsig.address(),
index=core_id,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"verifySigs", sigs, kset, digest],
accounts=accts,
sp=sp
))
txns[-1].fee = 0
txns.append(transaction.ApplicationCallTxn(
sender=creator.getAddress(),
index=core_id,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"verifyVAA", signed_vaa],
accounts=accts,
sp=sp
))
# send second, unsigned "verifyVAA" call inside
# the transaction chain. this should obviously fail
txns.append(transaction.ApplicationCallTxn(
sender=creator.getAddress(),
index=core_id,
on_complete=transaction.OnComplete.NoOpOC,
app_args=[b"verifyVAA", trash_vaa],
accounts=accts,
sp=sp
))
txns[-1].fee = txns[-1].fee * (1 + blocks)
transaction.assign_group_id(txns)
grp = []
pk = creator.getPrivateKey()
for t in txns:
if ("app_args" in t.__dict__ and len(t.app_args) > 0 and t.app_args[0] == b"verifySigs"):
grp.append(transaction.LogicSigTransaction(t, vaa_verify_lsig))
else:
grp.append(t.sign(pk))
with pytest.raises(AlgodHTTPError) as error:
client.send_transactions(grp)
for x in grp:
portal_core.waitForTransaction(client, x.get_txid())
assert "pushint 504" in str(error), f"signed_vaa:\n {signed_vaa.hex()}\n, unsigned_vaa:\n{trash_vaa.hex()}\n"