From 6b113853bd9a3d8bb5b9e7ed35e96a5e6fb98cb4 Mon Sep 17 00:00:00 2001 From: Hendrik Hofstadt Date: Fri, 21 Aug 2020 23:41:45 +0200 Subject: [PATCH] solana: fix missing transfer info and cli checks --- solana/agent/src/main.rs | 205 +++++++++++++++++---------------- solana/bridge/src/processor.rs | 5 +- solana/cli/src/main.rs | 14 +++ 3 files changed, 122 insertions(+), 102 deletions(-) diff --git a/solana/agent/src/main.rs b/solana/agent/src/main.rs index fd09bb03..37cf8399 100644 --- a/solana/agent/src/main.rs +++ b/solana/agent/src/main.rs @@ -107,115 +107,120 @@ impl Agent for AgentImpl { &self, req: Request, ) -> Result, Status> { - let (mut tx, rx) = mpsc::channel(1); + let (mut tx, mut rx) = mpsc::channel(1); let mut tx1 = tx.clone(); let url = self.url.clone(); let bridge = self.bridge.clone(); let rpc_url = self.rpc_url.clone(); - // creating a new task + tokio::spawn(async move { - // looping and sending our response using stream + let rpc = RpcClient::new(rpc_url.to_string()); let sub = PubsubClient::program_subscribe(&url, &bridge).unwrap(); - loop { - let item = sub.1.recv(); - match item { - Ok(v) => { - let rpc = RpcClient::new(rpc_url.to_string()); - // We only want to track lockups - if v.value.account.data.len() != size_of::() { - continue; + // Keepalive routine + let keepalive = tokio::spawn(async move { + // We need to keep the channel alive https://github.com/hyperium/tonic/issues/378 + loop { + tx1.send(Ok(LockupEvent { + slot: 0, + time: 0, + lockup_address: String::from(""), + event: Some(Event::Empty(Empty {})), + })) + .await; + + sleep(Duration::new(1, 0)) + } + }); + + // Watcher thread + let watcher = tokio::spawn(async move { + // looping and sending our response using stream + loop { + let item = sub.1.recv(); + match item { + Ok(v) => { + // We only want to track lockups + if v.value.account.data.len() != size_of::() { + continue; + } + + println!("lockup changed in slot: {}", v.context.slot); + + let time = match rpc.get_block_time(v.context.slot) { + Ok(v) => v as u64, + Err(e) => { + println!("failed to fetch block time for event: {}", e); + continue; + } + }; + + // + let b = match Bridge::unpack_immutable::( + v.value.account.data.as_slice(), + ) { + Ok(v) => v, + Err(e) => { + println!("failed to deserialize lockup: {}", e); + continue; + } + }; + + let mut amount_b: [u8; 32] = [0; 32]; + b.amount.to_big_endian(&mut amount_b); + + let event = if b.vaa_time == 0 { + // The Lockup was created + LockupEvent { + slot: v.context.slot, + lockup_address: v.value.pubkey.to_string(), + time, + event: Some(Event::New(LockupEventNew { + nonce: b.nonce, + source_chain: CHAIN_ID_SOLANA as u32, + target_chain: b.to_chain_id as u32, + source_address: b.source_address.to_vec(), + target_address: b.foreign_address.to_vec(), + token_chain: b.asset.chain as u32, + token_address: b.asset.address.to_vec(), + amount: amount_b.to_vec(), + })), + } + } else { + // The VAA was submitted + LockupEvent { + slot: v.context.slot, + lockup_address: v.value.pubkey.to_string(), + time, + event: Some(Event::VaaPosted(LockupEventVaaPosted { + nonce: b.nonce, + source_chain: CHAIN_ID_SOLANA as u32, + target_chain: b.to_chain_id as u32, + source_address: b.source_address.to_vec(), + target_address: b.foreign_address.to_vec(), + token_chain: b.asset.chain as u32, + token_address: b.asset.address.to_vec(), + amount: amount_b.to_vec(), + vaa: b.vaa.to_vec(), + })), + } + }; + + let mut amount_b: [u8; 32] = [0; 32]; + b.amount.to_big_endian(&mut amount_b); + + if let Err(_) = tx.send(Ok(event)).await { + return; + }; } - - println!("lockup changed in slot: {}", v.context.slot); - - let time = match rpc.get_block_time(v.context.slot) { - Ok(v) => v as u64, - Err(e) => { - println!("failed to fetch block time for event: {}", e); - continue; - } - }; - - // - let b = match Bridge::unpack_immutable::( - v.value.account.data.as_slice(), - ) { - Ok(v) => v, - Err(e) => { - println!("failed to deserialize lockup: {}", e); - continue; - } - }; - - let mut amount_b: [u8; 32] = [0; 32]; - b.amount.to_big_endian(&mut amount_b); - - let event = if b.vaa_time == 0 { - // The Lockup was created - LockupEvent { - slot: v.context.slot, - lockup_address: v.value.pubkey.to_string(), - time, - event: Some(Event::New(LockupEventNew { - nonce: b.nonce, - source_chain: CHAIN_ID_SOLANA as u32, - target_chain: b.to_chain_id as u32, - source_address: b.source_address.to_vec(), - target_address: b.foreign_address.to_vec(), - token_chain: b.asset.chain as u32, - token_address: b.asset.address.to_vec(), - amount: amount_b.to_vec(), - })), - } - } else { - // The VAA was submitted - LockupEvent { - slot: v.context.slot, - lockup_address: v.value.pubkey.to_string(), - time, - event: Some(Event::VaaPosted(LockupEventVaaPosted { - nonce: b.nonce, - source_chain: CHAIN_ID_SOLANA as u32, - target_chain: b.to_chain_id as u32, - source_address: b.source_address.to_vec(), - target_address: b.foreign_address.to_vec(), - token_chain: b.asset.chain as u32, - token_address: b.asset.address.to_vec(), - amount: amount_b.to_vec(), - vaa: b.vaa.to_vec(), - })), - } - }; - - let mut amount_b: [u8; 32] = [0; 32]; - b.amount.to_big_endian(&mut amount_b); - - if let Err(_) = tx.send(Ok(event)).await { + Err(_) => { + tx.send(Err(Status::new(Code::Aborted, "watcher died"))) + .await; return; - }; - } - Err(_) => { - tx.send(Err(Status::new(Code::Aborted, "watcher died"))) - .await; - return; - } - }; - } - }); - tokio::spawn(async move { - // We need to keep the channel alive https://github.com/hyperium/tonic/issues/378 - loop { - tx1.send(Ok(LockupEvent { - slot: 0, - time: 0, - lockup_address: String::from(""), - event: Some(Event::Empty(Empty {})), - })) - .await; - - sleep(Duration::new(1, 0)) - } + } + }; + } + }); }); Ok(Response::new(rx)) } diff --git a/solana/bridge/src/processor.rs b/solana/bridge/src/processor.rs index f8fa0aff..75299a5b 100644 --- a/solana/bridge/src/processor.rs +++ b/solana/bridge/src/processor.rs @@ -198,7 +198,6 @@ impl Bridge { let mut transfer_data = transfer_info.data.borrow_mut(); let mut transfer: &mut TransferOutProposal = Self::unpack(&mut transfer_data)?; - info!("burning"); // Burn tokens Bridge::wrapped_burn( program_id, @@ -365,9 +364,11 @@ impl Bridge { // Initialize proposal transfer.is_initialized = true; - transfer.foreign_address = t.target; transfer.amount = t.amount; transfer.to_chain_id = t.chain_id; + transfer.source_address = mint_info.key.to_bytes(); + transfer.foreign_address = t.target; + transfer.nonce = t.nonce; // Don't use the user-given data as we don't check mint = AssetMeta.address transfer.asset = AssetMeta { diff --git a/solana/cli/src/main.rs b/solana/cli/src/main.rs index 521a933d..12d115db 100644 --- a/solana/cli/src/main.rs +++ b/solana/cli/src/main.rs @@ -1221,6 +1221,20 @@ where } } +pub fn is_u32(amount: T) -> Result<(), String> +where + T: AsRef + Display, +{ + if amount.as_ref().parse::().is_ok() { + Ok(()) + } else { + Err(format!( + "Unable to parse input amount as integer, provided: {}", + amount + )) + } +} + pub fn is_hex(value: T) -> Result<(), String> where T: AsRef + Display,