cosmwasm: accounting: Return transfer status for observations

When submitting a batch of observations, we don't want an observation
for an already committed transfer to fail the entire batch.  This leads
to more complexity in the guardian and also delays all the legitimate
observations by at least one more block (~5 seconds).

Fix this by returning the transfer status of each observation as part
of the response data.  Observations for committed transfers will get
a `TransferStatus::Committed` response without failing the tx as long
as the digest of the observation matches the digest of the committed
transfer.  Digest mismatches are still an error and will fail the entire
batch.
This commit is contained in:
Chirantan Ekbote 2023-01-13 15:52:36 +09:00 committed by Evan Gray
parent 04b0018cfe
commit 289d37771d
6 changed files with 242 additions and 112 deletions

View File

@ -756,14 +756,7 @@
"minimum": 0.0 "minimum": 0.0
}, },
"emitter_address": { "emitter_address": {
"type": "array", "type": "string"
"items": {
"type": "integer",
"format": "uint8",
"minimum": 0.0
},
"maxItems": 32,
"minItems": 32
}, },
"emitter_chain": { "emitter_chain": {
"type": "integer", "type": "integer",
@ -1055,14 +1048,7 @@
"minimum": 0.0 "minimum": 0.0
}, },
"emitter_address": { "emitter_address": {
"type": "array", "type": "string"
"items": {
"type": "integer",
"format": "uint8",
"minimum": 0.0
},
"maxItems": 32,
"minItems": 32
}, },
"emitter_chain": { "emitter_chain": {
"type": "integer", "type": "integer",
@ -1404,14 +1390,7 @@
"minimum": 0.0 "minimum": 0.0
}, },
"emitter_address": { "emitter_address": {
"type": "array", "type": "string"
"items": {
"type": "integer",
"format": "uint8",
"minimum": 0.0
},
"maxItems": 32,
"minItems": 32
}, },
"emitter_chain": { "emitter_chain": {
"type": "integer", "type": "integer",

View File

@ -3,7 +3,7 @@ use std::marker::PhantomData;
use accounting::{ use accounting::{
query_balance, query_modification, query_balance, query_modification,
state::{account, transfer, Modification, TokenAddress, Transfer}, state::{account, transfer, Modification, TokenAddress, Transfer},
validate_transfer, TransferError, validate_transfer,
}; };
use anyhow::{ensure, Context}; use anyhow::{ensure, Context};
#[cfg(not(feature = "library"))] #[cfg(not(feature = "library"))]
@ -29,8 +29,9 @@ use crate::{
msg::{ msg::{
AllAccountsResponse, AllModificationsResponse, AllPendingTransfersResponse, AllAccountsResponse, AllModificationsResponse, AllPendingTransfersResponse,
AllTransfersResponse, BatchTransferStatusResponse, ChainRegistrationResponse, ExecuteMsg, AllTransfersResponse, BatchTransferStatusResponse, ChainRegistrationResponse, ExecuteMsg,
MigrateMsg, MissingObservation, MissingObservationsResponse, Observation, QueryMsg, MigrateMsg, MissingObservation, MissingObservationsResponse, Observation, ObservationError,
TransferDetails, TransferStatus, Upgrade, ObservationStatus, QueryMsg, SubmitObservationResponse, TransferDetails, TransferStatus,
Upgrade,
}, },
state::{Data, PendingTransfer, CHAIN_REGISTRATIONS, DIGESTS, PENDING_TRANSFERS}, state::{Data, PendingTransfer, CHAIN_REGISTRATIONS, DIGESTS, PENDING_TRANSFERS},
}; };
@ -118,20 +119,39 @@ fn submit_observations(
let observations: Vec<Observation> = let observations: Vec<Observation> =
from_binary(&observations).context("failed to parse `Observations`")?; from_binary(&observations).context("failed to parse `Observations`")?;
let events = observations let mut responses = Vec::with_capacity(observations.len());
.into_iter() let mut events = Vec::with_capacity(observations.len());
.map(|o| { for o in observations {
let key = transfer::Key::new(o.emitter_chain, o.emitter_address.into(), o.sequence); let key = transfer::Key::new(o.emitter_chain, o.emitter_address.into(), o.sequence);
handle_observation(deps.branch(), o, guardian_set_index, quorum, signature) match handle_observation(deps.branch(), o, guardian_set_index, quorum, signature) {
.with_context(|| format!("failed to handle observation for key {key}")) Ok((status, event)) => {
}) responses.push(SubmitObservationResponse { key, status });
.filter_map(Result::transpose) if let Some(evt) = event {
.collect::<anyhow::Result<Vec<_>>>() events.push(evt);
.context("failed to handle `Observation`")?; }
}
Err(e) => {
let err = ObservationError {
key,
error: format!("{e:#}"),
};
let evt = cw_transcode::to_event(&err)
.context("failed to transcode observation error")?;
events.push(evt);
responses.push(SubmitObservationResponse {
key: err.key,
status: ObservationStatus::Error(err.error),
});
}
}
}
let data = to_binary(&responses).context("failed to serialize transfer details")?;
Ok(Response::new() Ok(Response::new()
.add_attribute("action", "submit_observations") .add_attribute("action", "submit_observations")
.add_attribute("owner", info.sender) .add_attribute("owner", info.sender)
.set_data(data)
.add_events(events)) .add_events(events))
} }
@ -141,8 +161,10 @@ fn handle_observation(
guardian_set_index: u32, guardian_set_index: u32,
quorum: usize, quorum: usize,
sig: Signature, sig: Signature,
) -> anyhow::Result<Option<Event>> { ) -> anyhow::Result<(ObservationStatus, Option<Event>)> {
let digest_key = DIGESTS.key((o.emitter_chain, o.emitter_address.to_vec(), o.sequence)); let digest_key = DIGESTS.key((o.emitter_chain, o.emitter_address.to_vec(), o.sequence));
let tx_key = transfer::Key::new(o.emitter_chain, o.emitter_address.into(), o.sequence);
if let Some(saved_digest) = digest_key if let Some(saved_digest) = digest_key
.may_load(deps.storage) .may_load(deps.storage)
.context("failed to load transfer digest")? .context("failed to load transfer digest")?
@ -152,11 +174,9 @@ fn handle_observation(
bail!(ContractError::DigestMismatch); bail!(ContractError::DigestMismatch);
} }
bail!(TransferError::DuplicateTransfer); return Ok((ObservationStatus::Committed, None));
} }
let tx_key = transfer::Key::new(o.emitter_chain, o.emitter_address.into(), o.sequence);
let key = PENDING_TRANSFERS.key(tx_key.clone()); let key = PENDING_TRANSFERS.key(tx_key.clone());
let mut pending = key let mut pending = key
.may_load(deps.storage) .may_load(deps.storage)
@ -181,7 +201,7 @@ fn handle_observation(
key.save(deps.storage, &pending) key.save(deps.storage, &pending)
.context("failed to save pending transfers")?; .context("failed to save pending transfers")?;
return Ok(None); return Ok((ObservationStatus::Pending, None));
} }
let msg = serde_wormhole::from_slice::<Message<&RawMessage>>(&o.payload) let msg = serde_wormhole::from_slice::<Message<&RawMessage>>(&o.payload)
@ -238,9 +258,11 @@ fn handle_observation(
// Now that the transfer has been committed, we don't need to keep it in the pending list. // Now that the transfer has been committed, we don't need to keep it in the pending list.
key.remove(deps.storage); key.remove(deps.storage);
cw_transcode::to_event(&o) let event = cw_transcode::to_event(&o)
.map(Some) .map(Some)
.context("failed to transcode `Observation` to `Event`") .context("failed to transcode `Observation` to `Event`")?;
Ok((ObservationStatus::Committed, event))
} }
fn modify_balance( fn modify_balance(

View File

@ -58,6 +58,32 @@ impl Observation {
} }
} }
// The default externally-tagged serde representation of enums is awkward in JSON when the
// enum contains unit variants mixed with newtype variants. We can't use the internally-tagged
// representation because it only supports newtype variants that contain structs or maps. So use
// the adjacently tagged variant representation here: the enum is always encoded as an object with
// a "type" field that indicates the variant and an optional "data" field that contains the data for
// the variant, if any.
#[cw_serde]
#[serde(tag = "type", content = "data")]
pub enum ObservationStatus {
Pending,
Committed,
Error(String),
}
#[cw_serde]
pub struct SubmitObservationResponse {
pub key: transfer::Key,
pub status: ObservationStatus,
}
#[cw_serde]
pub struct ObservationError {
pub key: transfer::Key,
pub error: String,
}
#[cw_serde] #[cw_serde]
pub struct Upgrade { pub struct Upgrade {
pub new_addr: [u8; 32], pub new_addr: [u8; 32],

View File

@ -1,10 +1,12 @@
mod helpers; mod helpers;
use std::collections::BTreeMap;
use accounting::state::{account, transfer, Kind, Modification, TokenAddress}; use accounting::state::{account, transfer, Kind, Modification, TokenAddress};
use cosmwasm_std::{to_binary, Binary, Event, Uint256}; use cosmwasm_std::{from_binary, to_binary, Binary, Event, Uint256};
use cw_multi_test::AppResponse; use cw_multi_test::AppResponse;
use helpers::*; use helpers::*;
use wormchain_accounting::msg::Observation; use wormchain_accounting::msg::{Observation, ObservationStatus, SubmitObservationResponse};
use wormhole::{ use wormhole::{
token::Message, token::Message,
vaa::{Body, Header}, vaa::{Body, Header},
@ -58,9 +60,15 @@ fn batch() {
.unwrap() as usize; .unwrap() as usize;
for (i, s) in signatures.into_iter().enumerate() { for (i, s) in signatures.into_iter().enumerate() {
if i < quorum { let resp = contract.submit_observations(obs.clone(), index, s).unwrap();
contract.submit_observations(obs.clone(), index, s).unwrap();
let status = from_binary::<Vec<SubmitObservationResponse>>(&resp.data.unwrap())
.unwrap()
.into_iter()
.map(|resp| (resp.key, resp.status))
.collect::<BTreeMap<_, _>>();
if i < quorum {
// Once there is a quorum the pending transfers are removed. // Once there is a quorum the pending transfers are removed.
if i < quorum - 1 { if i < quorum - 1 {
for o in &observations { for o in &observations {
@ -70,6 +78,7 @@ fn batch() {
assert_eq!(o, data[0].observation()); assert_eq!(o, data[0].observation());
// Make sure the transfer hasn't yet been committed. // Make sure the transfer hasn't yet been committed.
assert!(matches!(status[&key], ObservationStatus::Pending));
contract contract
.query_transfer(key) .query_transfer(key)
.expect_err("transfer committed without quorum"); .expect_err("transfer committed without quorum");
@ -78,15 +87,19 @@ fn batch() {
for o in &observations { for o in &observations {
let key = let key =
transfer::Key::new(o.emitter_chain, o.emitter_address.into(), o.sequence); transfer::Key::new(o.emitter_chain, o.emitter_address.into(), o.sequence);
assert!(matches!(status[&key], ObservationStatus::Committed));
contract contract
.query_pending_transfer(key) .query_pending_transfer(key)
.expect_err("found pending transfer for observation with quorum"); .expect_err("found pending transfer for observation with quorum");
} }
} }
} else { } else {
contract // Submitting observations for committed transfers is not an error as long as the
.submit_observations(obs.clone(), index, s) // digests match.
.expect_err("successfully submitted observation for committed transfer"); for o in &observations {
let key = transfer::Key::new(o.emitter_chain, o.emitter_address.into(), o.sequence);
assert!(matches!(status[&key], ObservationStatus::Committed));
}
} }
} }
@ -151,15 +164,34 @@ fn duplicates() {
.calculate_quorum(index, contract.app().block_info().height) .calculate_quorum(index, contract.app().block_info().height)
.unwrap() as usize; .unwrap() as usize;
for (i, s) in signatures.iter().take(quorum).cloned().enumerate() { for (i, s) in signatures.into_iter().enumerate() {
contract.submit_observations(obs.clone(), index, s).unwrap(); contract.submit_observations(obs.clone(), index, s).unwrap();
let err = contract let resp = contract.submit_observations(obs.clone(), index, s).unwrap();
.submit_observations(obs.clone(), index, s) let status = from_binary::<Vec<SubmitObservationResponse>>(&resp.data.unwrap())
.expect_err("successfully submitted duplicate observations"); .unwrap()
.into_iter()
.map(|details| (details.key, details.status))
.collect::<BTreeMap<_, _>>();
if i < quorum - 1 { if i < quorum - 1 {
// Sadly we can't match on the exact error type in an integration test because the // Resubmitting the same signature without quorum will return an error.
// test frameworks converts it into a string before it reaches this point. for o in &observations {
assert!(format!("{err:#}").contains("duplicate signatures")); let key = transfer::Key::new(o.emitter_chain, o.emitter_address.into(), o.sequence);
if let ObservationStatus::Error(ref err) = status[&key] {
assert!(err.contains("duplicate signatures"));
} else {
panic!(
"unexpected status for duplicate signature: {:?}",
status[&key]
);
}
}
} else {
// Submitting a signature for a committed transfer is not an error as long as the
// digests match.
for o in &observations {
let key = transfer::Key::new(o.emitter_chain, o.emitter_address.into(), o.sequence);
assert!(matches!(status[&key], ObservationStatus::Committed));
}
} }
} }
@ -207,12 +239,6 @@ fn duplicates() {
assert_eq!(expected.amount, *dst); assert_eq!(expected.amount, *dst);
} }
for s in signatures {
contract
.submit_observations(obs.clone(), index, s)
.expect_err("successfully submitted observation for committed transfer");
}
} }
fn transfer_tokens( fn transfer_tokens(
@ -221,7 +247,7 @@ fn transfer_tokens(
key: transfer::Key, key: transfer::Key,
msg: Message, msg: Message,
index: u32, index: u32,
quorum: usize, num_signatures: usize,
) -> anyhow::Result<(Observation, Vec<AppResponse>)> { ) -> anyhow::Result<(Observation, Vec<AppResponse>)> {
let payload = serde_wormhole::to_vec(&msg).map(Binary::from).unwrap(); let payload = serde_wormhole::to_vec(&msg).map(Binary::from).unwrap();
let o = Observation { let o = Observation {
@ -240,7 +266,7 @@ fn transfer_tokens(
let responses = signatures let responses = signatures
.into_iter() .into_iter()
.take(quorum) .take(num_signatures)
.map(|s| contract.submit_observations(obs.clone(), index, s)) .map(|s| contract.submit_observations(obs.clone(), index, s))
.collect::<anyhow::Result<Vec<_>>>()?; .collect::<anyhow::Result<Vec<_>>>()?;
@ -252,9 +278,7 @@ fn round_trip() {
let (wh, mut contract) = proper_instantiate(); let (wh, mut contract) = proper_instantiate();
register_emitters(&wh, &mut contract, 15); register_emitters(&wh, &mut contract, 15);
let index = wh.guardian_set_index(); let index = wh.guardian_set_index();
let quorum = wh let num_guardians = wh.num_guardians();
.calculate_quorum(index, contract.app().block_info().height)
.unwrap() as usize;
let emitter_chain = 2; let emitter_chain = 2;
let amount = Amount(Uint256::from(500u128).to_be_bytes()); let amount = Amount(Uint256::from(500u128).to_be_bytes());
@ -272,7 +296,8 @@ fn round_trip() {
fee: Amount([0u8; 32]), fee: Amount([0u8; 32]),
}; };
let (o, _) = transfer_tokens(&wh, &mut contract, key.clone(), msg, index, quorum).unwrap(); let (o, _) =
transfer_tokens(&wh, &mut contract, key.clone(), msg, index, num_guardians).unwrap();
let expected = transfer::Data { let expected = transfer::Data {
amount: Uint256::new(amount.0), amount: Uint256::new(amount.0),
@ -298,7 +323,8 @@ fn round_trip() {
recipient_chain: emitter_chain.into(), recipient_chain: emitter_chain.into(),
fee: Amount([0u8; 32]), fee: Amount([0u8; 32]),
}; };
let (o, _) = transfer_tokens(&wh, &mut contract, key.clone(), msg, index, quorum).unwrap(); let (o, _) =
transfer_tokens(&wh, &mut contract, key.clone(), msg, index, num_guardians).unwrap();
let expected = transfer::Data { let expected = transfer::Data {
amount: Uint256::new(amount.0), amount: Uint256::new(amount.0),
@ -336,9 +362,7 @@ fn round_trip() {
fn missing_guardian_set() { fn missing_guardian_set() {
let (wh, mut contract) = proper_instantiate(); let (wh, mut contract) = proper_instantiate();
let index = wh.guardian_set_index(); let index = wh.guardian_set_index();
let quorum = wh let num_guardians = wh.num_guardians();
.calculate_quorum(index, contract.app().block_info().height)
.unwrap() as usize;
let emitter_chain = 2; let emitter_chain = 2;
let amount = Amount(Uint256::from(500u128).to_be_bytes()); let amount = Amount(Uint256::from(500u128).to_be_bytes());
@ -356,7 +380,7 @@ fn missing_guardian_set() {
fee: Amount([0u8; 32]), fee: Amount([0u8; 32]),
}; };
transfer_tokens(&wh, &mut contract, key, msg, index + 1, quorum) transfer_tokens(&wh, &mut contract, key, msg, index + 1, num_guardians)
.expect_err("successfully submitted observations with invalid guardian set"); .expect_err("successfully submitted observations with invalid guardian set");
} }
@ -366,7 +390,7 @@ fn expired_guardian_set() {
let index = wh.guardian_set_index(); let index = wh.guardian_set_index();
let mut block = contract.app().block_info(); let mut block = contract.app().block_info();
let quorum = wh.calculate_quorum(index, block.height).unwrap() as usize; let num_guardians = wh.num_guardians();
let emitter_chain = 2; let emitter_chain = 2;
let amount = Amount(Uint256::from(500u128).to_be_bytes()); let amount = Amount(Uint256::from(500u128).to_be_bytes());
@ -389,7 +413,7 @@ fn expired_guardian_set() {
block.height += 1; block.height += 1;
contract.app_mut().set_block(block); contract.app_mut().set_block(block);
transfer_tokens(&wh, &mut contract, key, msg, index, quorum) transfer_tokens(&wh, &mut contract, key, msg, index, num_guardians)
.expect_err("successfully submitted observations with expired guardian set"); .expect_err("successfully submitted observations with expired guardian set");
} }
@ -446,7 +470,9 @@ fn no_quorum() {
#[test] #[test]
fn missing_wrapped_account() { fn missing_wrapped_account() {
let (wh, mut contract) = proper_instantiate(); let (wh, mut contract) = proper_instantiate();
register_emitters(&wh, &mut contract, 15);
let index = wh.guardian_set_index(); let index = wh.guardian_set_index();
let num_guardians = wh.num_guardians();
let quorum = wh let quorum = wh
.calculate_quorum(index, contract.app().block_info().height) .calculate_quorum(index, contract.app().block_info().height)
.unwrap() as usize; .unwrap() as usize;
@ -467,8 +493,27 @@ fn missing_wrapped_account() {
fee: Amount([0u8; 32]), fee: Amount([0u8; 32]),
}; };
transfer_tokens(&wh, &mut contract, key, msg, index, quorum) let (_, responses) =
.expect_err("successfully burned wrapped tokens without a wrapped amount"); transfer_tokens(&wh, &mut contract, key.clone(), msg, index, num_guardians).unwrap();
for mut resp in responses.into_iter().skip(quorum - 1) {
let r = from_binary::<Vec<SubmitObservationResponse>>(&resp.data.take().unwrap()).unwrap();
assert_eq!(key, r[0].key);
if let ObservationStatus::Error(ref err) = r[0].status {
assert!(
err.contains("cannot burn wrapped tokens without an existing wrapped account"),
"{err}"
);
resp.assert_event(
&Event::new("wasm-ObservationError")
.add_attribute("key", serde_json_wasm::to_string(&key).unwrap()),
);
} else {
panic!(
"unexpected response for transfer with missing wrapped account {:?}",
r[0]
);
}
}
} }
#[test] #[test]
@ -480,7 +525,9 @@ fn missing_native_account() {
let token_chain = 2; let token_chain = 2;
let (wh, mut contract) = proper_instantiate(); let (wh, mut contract) = proper_instantiate();
register_emitters(&wh, &mut contract, 15);
let index = wh.guardian_set_index(); let index = wh.guardian_set_index();
let num_guardians = wh.num_guardians();
let quorum = wh let quorum = wh
.calculate_quorum(index, contract.app().block_info().height) .calculate_quorum(index, contract.app().block_info().height)
.unwrap() as usize; .unwrap() as usize;
@ -509,8 +556,27 @@ fn missing_native_account() {
fee: Amount([0u8; 32]), fee: Amount([0u8; 32]),
}; };
transfer_tokens(&wh, &mut contract, key, msg, index, quorum) let (_, responses) =
.expect_err("successfully unlocked native tokens without a native account"); transfer_tokens(&wh, &mut contract, key.clone(), msg, index, num_guardians).unwrap();
for mut resp in responses.into_iter().skip(quorum - 1) {
let r = from_binary::<Vec<SubmitObservationResponse>>(&resp.data.take().unwrap()).unwrap();
assert_eq!(key, r[0].key);
if let ObservationStatus::Error(ref err) = r[0].status {
assert!(
err.contains("cannot unlock native tokens without an existing native account"),
"{err}"
);
resp.assert_event(
&Event::new("wasm-ObservationError")
.add_attribute("key", serde_json_wasm::to_string(&key).unwrap()),
);
} else {
panic!(
"unexpected response for transfer with missing native account {:?}",
r[0]
);
}
}
} }
#[test] #[test]
@ -520,9 +586,7 @@ fn repeated() {
let (wh, mut contract) = proper_instantiate(); let (wh, mut contract) = proper_instantiate();
register_emitters(&wh, &mut contract, 3); register_emitters(&wh, &mut contract, 3);
let index = wh.guardian_set_index(); let index = wh.guardian_set_index();
let quorum = wh let num_guardians = wh.num_guardians();
.calculate_quorum(index, contract.app().block_info().height)
.unwrap() as usize;
let emitter_chain = 2; let emitter_chain = 2;
let recipient_chain = 14; let recipient_chain = 14;
@ -541,7 +605,15 @@ fn repeated() {
for i in 0..ITERATIONS { for i in 0..ITERATIONS {
let key = transfer::Key::new(emitter_chain, [emitter_chain as u8; 32].into(), i as u64); let key = transfer::Key::new(emitter_chain, [emitter_chain as u8; 32].into(), i as u64);
transfer_tokens(&wh, &mut contract, key.clone(), msg.clone(), index, quorum).unwrap(); transfer_tokens(
&wh,
&mut contract,
key.clone(),
msg.clone(),
index,
num_guardians,
)
.unwrap();
} }
let expected = Uint256::new(amount.0) * Uint256::from(ITERATIONS as u128); let expected = Uint256::new(amount.0) * Uint256::from(ITERATIONS as u128);
@ -577,9 +649,7 @@ fn wrapped_to_wrapped() {
let (wh, mut contract) = proper_instantiate(); let (wh, mut contract) = proper_instantiate();
register_emitters(&wh, &mut contract, 15); register_emitters(&wh, &mut contract, 15);
let index = wh.guardian_set_index(); let index = wh.guardian_set_index();
let quorum = wh let num_guardians = wh.num_guardians();
.calculate_quorum(index, contract.app().block_info().height)
.unwrap() as usize;
// We need an initial fake wrapped account. // We need an initial fake wrapped account.
let m = to_binary(&Modification { let m = to_binary(&Modification {
@ -605,7 +675,8 @@ fn wrapped_to_wrapped() {
fee: Amount([0u8; 32]), fee: Amount([0u8; 32]),
}; };
let (o, _) = transfer_tokens(&wh, &mut contract, key.clone(), msg, index, quorum).unwrap(); let (o, _) =
transfer_tokens(&wh, &mut contract, key.clone(), msg, index, num_guardians).unwrap();
let expected = transfer::Data { let expected = transfer::Data {
amount: Uint256::new(amount.0), amount: Uint256::new(amount.0),
@ -642,6 +713,7 @@ fn wrapped_to_wrapped() {
fn unknown_emitter() { fn unknown_emitter() {
let (wh, mut contract) = proper_instantiate(); let (wh, mut contract) = proper_instantiate();
let index = wh.guardian_set_index(); let index = wh.guardian_set_index();
let num_guardians = wh.num_guardians();
let quorum = wh let quorum = wh
.calculate_quorum(index, contract.app().block_info().height) .calculate_quorum(index, contract.app().block_info().height)
.unwrap() as usize; .unwrap() as usize;
@ -662,8 +734,24 @@ fn unknown_emitter() {
fee: Amount([0u8; 32]), fee: Amount([0u8; 32]),
}; };
transfer_tokens(&wh, &mut contract, key, msg, index, quorum) let (_, responses) =
.expect_err("successfully transfered tokens with an invalid emitter address"); transfer_tokens(&wh, &mut contract, key.clone(), msg, index, num_guardians).unwrap();
for mut resp in responses.into_iter().skip(quorum - 1) {
let r = from_binary::<Vec<SubmitObservationResponse>>(&resp.data.take().unwrap()).unwrap();
assert_eq!(key, r[0].key);
if let ObservationStatus::Error(ref err) = r[0].status {
assert!(err.contains("no registered emitter"));
resp.assert_event(
&Event::new("wasm-ObservationError")
.add_attribute("key", serde_json_wasm::to_string(&key).unwrap()),
);
} else {
panic!(
"unexpected response for transfer with unknown emitter address {:?}",
r[0]
);
}
}
} }
#[test] #[test]
@ -757,6 +845,7 @@ fn emit_event_with_quorum() {
let quorum = wh let quorum = wh
.calculate_quorum(index, contract.app().block_info().height) .calculate_quorum(index, contract.app().block_info().height)
.unwrap() as usize; .unwrap() as usize;
let num_guardians = wh.num_guardians();
let emitter_chain = 2; let emitter_chain = 2;
let amount = Amount(Uint256::from(500u128).to_be_bytes()); let amount = Amount(Uint256::from(500u128).to_be_bytes());
@ -774,7 +863,8 @@ fn emit_event_with_quorum() {
fee: Amount([0u8; 32]), fee: Amount([0u8; 32]),
}; };
let (o, responses) = transfer_tokens(&wh, &mut contract, key, msg, index, quorum).unwrap(); let (o, responses) =
transfer_tokens(&wh, &mut contract, key, msg, index, num_guardians).unwrap();
let expected = Event::new("wasm-Observation") let expected = Event::new("wasm-Observation")
.add_attribute("tx_hash", serde_json_wasm::to_string(&o.tx_hash).unwrap()) .add_attribute("tx_hash", serde_json_wasm::to_string(&o.tx_hash).unwrap())
@ -798,9 +888,9 @@ fn emit_event_with_quorum() {
) )
.add_attribute("payload", serde_json_wasm::to_string(&o.payload).unwrap()); .add_attribute("payload", serde_json_wasm::to_string(&o.payload).unwrap());
assert_eq!(responses.len(), quorum); assert_eq!(responses.len(), num_guardians);
for (i, r) in responses.into_iter().enumerate() { for (i, r) in responses.into_iter().enumerate() {
if i < quorum - 1 { if i < quorum - 1 || i >= quorum {
assert!(!r.has_event(&expected)); assert!(!r.has_event(&expected));
} else { } else {
r.assert_event(&expected); r.assert_event(&expected);
@ -814,9 +904,7 @@ fn duplicate_vaa() {
register_emitters(&wh, &mut contract, 3); register_emitters(&wh, &mut contract, 3);
let index = wh.guardian_set_index(); let index = wh.guardian_set_index();
let quorum = wh let num_guardians = wh.num_guardians();
.calculate_quorum(index, contract.app().block_info().height)
.unwrap() as usize;
let emitter_chain = 2; let emitter_chain = 2;
let amount = Amount(Uint256::from(500u128).to_be_bytes()); let amount = Amount(Uint256::from(500u128).to_be_bytes());
@ -834,7 +922,7 @@ fn duplicate_vaa() {
fee: Amount([0u8; 32]), fee: Amount([0u8; 32]),
}; };
let (o, _) = transfer_tokens(&wh, &mut contract, key, msg, index, quorum).unwrap(); let (o, _) = transfer_tokens(&wh, &mut contract, key, msg, index, num_guardians).unwrap();
// Now try to submit a VAA for this transfer. This should fail since the transfer is already // Now try to submit a VAA for this transfer. This should fail since the transfer is already
// processed. // processed.
@ -871,9 +959,7 @@ fn digest_mismatch() {
register_emitters(&wh, &mut contract, 3); register_emitters(&wh, &mut contract, 3);
let index = wh.guardian_set_index(); let index = wh.guardian_set_index();
let quorum = wh let num_guardians = wh.num_guardians();
.calculate_quorum(index, contract.app().block_info().height)
.unwrap() as usize;
let emitter_chain = 2; let emitter_chain = 2;
let amount = Amount(Uint256::from(500u128).to_be_bytes()); let amount = Amount(Uint256::from(500u128).to_be_bytes());
@ -891,7 +977,7 @@ fn digest_mismatch() {
fee: Amount([0u8; 32]), fee: Amount([0u8; 32]),
}; };
let (o, _) = transfer_tokens(&wh, &mut contract, key, msg, index, quorum).unwrap(); let (o, _) = transfer_tokens(&wh, &mut contract, key, msg, index, num_guardians).unwrap();
// Now try submitting a VAA with the same (chain, address, sequence) tuple but with // Now try submitting a VAA with the same (chain, address, sequence) tuple but with
// different details. // different details.

View File

@ -1,10 +1,10 @@
mod helpers; mod helpers;
use accounting::state::{transfer, TokenAddress}; use accounting::state::{transfer, TokenAddress};
use cosmwasm_std::{to_binary, Binary, Event, Uint256}; use cosmwasm_std::{from_binary, to_binary, Binary, Event, Uint256};
use helpers::*; use helpers::*;
use serde_wormhole::RawMessage; use serde_wormhole::RawMessage;
use wormchain_accounting::msg::Observation; use wormchain_accounting::msg::{Observation, ObservationStatus, SubmitObservationResponse};
use wormhole::{ use wormhole::{
token::Message, token::Message,
vaa::{Body, Header, Vaa}, vaa::{Body, Header, Vaa},
@ -288,15 +288,20 @@ fn reobservation() {
consistency_level: v.consistency_level, consistency_level: v.consistency_level,
payload: serde_wormhole::to_vec(&v.payload).unwrap().into(), payload: serde_wormhole::to_vec(&v.payload).unwrap().into(),
}; };
let key = transfer::Key::new(o.emitter_chain, o.emitter_address.into(), o.sequence);
let obs = to_binary(&vec![o]).unwrap(); let obs = to_binary(&vec![o]).unwrap();
let index = wh.guardian_set_index(); let index = wh.guardian_set_index();
let signatures = wh.sign(&obs); let signatures = wh.sign(&obs);
for s in signatures { for s in signatures {
let err = contract let resp = contract.submit_observations(obs.clone(), index, s).unwrap();
.submit_observations(obs.clone(), index, s) let mut responses: Vec<SubmitObservationResponse> =
.expect_err("successfully submitted observation for processed VAA"); from_binary(&resp.data.unwrap()).unwrap();
assert!(format!("{err:#}").contains("transfer already committed"));
assert_eq!(1, responses.len());
let d = responses.remove(0);
assert_eq!(key, d.key);
assert!(matches!(d.status, ObservationStatus::Committed));
} }
} }
@ -322,13 +327,21 @@ fn digest_mismatch() {
payload: serde_wormhole::to_vec(&v.payload).unwrap().into(), payload: serde_wormhole::to_vec(&v.payload).unwrap().into(),
}; };
let key = transfer::Key::new(o.emitter_chain, o.emitter_address.into(), o.sequence);
let obs = to_binary(&vec![o]).unwrap(); let obs = to_binary(&vec![o]).unwrap();
let index = wh.guardian_set_index(); let index = wh.guardian_set_index();
let signatures = wh.sign(&obs); let signatures = wh.sign(&obs);
for s in signatures { for s in signatures {
let err = contract let resp = contract.submit_observations(obs.clone(), index, s).unwrap();
.submit_observations(obs.clone(), index, s) let responses = from_binary::<Vec<SubmitObservationResponse>>(&resp.data.unwrap()).unwrap();
.expect_err("successfully submitted different observation for processed VAA"); assert_eq!(key, responses[0].key);
assert!(format!("{err:#}").contains("digest mismatch")); if let ObservationStatus::Error(ref err) = responses[0].status {
assert!(err.contains("digest mismatch"));
} else {
panic!(
"unexpected status for observation with mismatched digest: {:?}",
responses[0].status
);
}
} }
} }

View File

@ -183,6 +183,10 @@ impl WormholeKeeper {
pub fn set_index(&self, index: u32) { pub fn set_index(&self, index: u32) {
self.0.borrow_mut().index = index; self.0.borrow_mut().index = index;
} }
pub fn num_guardians(&self) -> usize {
self.0.borrow().guardians.len()
}
} }
impl Default for WormholeKeeper { impl Default for WormholeKeeper {