diff --git a/solana/agent/src/main.rs b/solana/agent/src/main.rs index d91831a40..316f68df0 100644 --- a/solana/agent/src/main.rs +++ b/solana/agent/src/main.rs @@ -108,132 +108,118 @@ impl Agent for AgentImpl { req: Request, ) -> Result, Status> { let (mut tx, mut rx) = mpsc::channel(1); - let mut tx1 = tx.clone(); let url = self.url.clone(); let bridge = self.bridge.clone(); let rpc_url = self.rpc_url.clone(); - match tokio::spawn(async move { - // Keepalive routine - tokio::spawn(async move { - // We need to keep the channel alive https://github.com/hyperium/tonic/issues/378 - loop { - tx1.send(Ok(LockupEvent { - slot: 0, - time: 0, - lockup_address: String::from(""), - event: Some(Event::Empty(Empty {})), - })) - .await - .unwrap(); + tokio::spawn(async move { + let rpc = RpcClient::new(rpc_url.to_string()); + let sub = PubsubClient::program_subscribe(&url, &bridge).unwrap(); + // looping and sending our response using stream + loop { + let item = sub.1.recv(); + match item { + Ok(v) => { + // We only want to track lockups + if v.value.account.data.len() != size_of::() { + continue; + } - sleep(Duration::new(1, 0)) - } - }); + println!("lockup changed in slot: {}", v.context.slot); - // Watcher thread - tokio::spawn(async move { - let rpc = RpcClient::new(rpc_url.to_string()); - let sub = PubsubClient::program_subscribe(&url, &bridge).unwrap(); - // looping and sending our response using stream - loop { - let item = sub.1.recv(); - match item { - Ok(v) => { - // We only want to track lockups - if v.value.account.data.len() != size_of::() { + let time = match rpc.get_block_time(v.context.slot) { + Ok(v) => v as u64, + Err(e) => { + println!("failed to fetch block time for event: {}", e); continue; } + }; - println!("lockup changed in slot: {}", v.context.slot); + let b = match Bridge::unpack_immutable::( + v.value.account.data.as_slice(), + ) { + Ok(v) => v, + Err(e) => { + println!("failed to deserialize lockup: {}", e); + continue; + } + }; - let time = match rpc.get_block_time(v.context.slot) { - Ok(v) => v as u64, - Err(e) => { - println!("failed to fetch block time for event: {}", e); - continue; - } - }; + let mut amount_b: [u8; 32] = [0; 32]; + b.amount.to_big_endian(&mut amount_b); - // - let b = match Bridge::unpack_immutable::( - v.value.account.data.as_slice(), - ) { - Ok(v) => v, - Err(e) => { - println!("failed to deserialize lockup: {}", e); - continue; - } - }; + let event = if b.vaa_time == 0 { + // The Lockup was created + LockupEvent { + slot: v.context.slot, + lockup_address: v.value.pubkey.to_string(), + time, + event: Some(Event::New(LockupEventNew { + nonce: b.nonce, + source_chain: CHAIN_ID_SOLANA as u32, + target_chain: b.to_chain_id as u32, + source_address: b.source_address.to_vec(), + target_address: b.foreign_address.to_vec(), + token_chain: b.asset.chain as u32, + token_address: b.asset.address.to_vec(), + token_decimals: b.asset.decimals as u32, + amount: amount_b.to_vec(), + })), + } + } else { + // The VAA was submitted + LockupEvent { + slot: v.context.slot, + lockup_address: v.value.pubkey.to_string(), + time, + event: Some(Event::VaaPosted(LockupEventVaaPosted { + nonce: b.nonce, + source_chain: CHAIN_ID_SOLANA as u32, + target_chain: b.to_chain_id as u32, + source_address: b.source_address.to_vec(), + target_address: b.foreign_address.to_vec(), + token_chain: b.asset.chain as u32, + token_address: b.asset.address.to_vec(), + token_decimals: b.asset.decimals as u32, + amount: amount_b.to_vec(), + vaa: b.vaa.to_vec(), + })), + } + }; - let mut amount_b: [u8; 32] = [0; 32]; - b.amount.to_big_endian(&mut amount_b); + let mut amount_b: [u8; 32] = [0; 32]; + b.amount.to_big_endian(&mut amount_b); - let event = if b.vaa_time == 0 { - // The Lockup was created - LockupEvent { - slot: v.context.slot, - lockup_address: v.value.pubkey.to_string(), - time, - event: Some(Event::New(LockupEventNew { - nonce: b.nonce, - source_chain: CHAIN_ID_SOLANA as u32, - target_chain: b.to_chain_id as u32, - source_address: b.source_address.to_vec(), - target_address: b.foreign_address.to_vec(), - token_chain: b.asset.chain as u32, - token_address: b.asset.address.to_vec(), - token_decimals: b.asset.decimals as u32, - amount: amount_b.to_vec(), - })), - } - } else { - // The VAA was submitted - LockupEvent { - slot: v.context.slot, - lockup_address: v.value.pubkey.to_string(), - time, - event: Some(Event::VaaPosted(LockupEventVaaPosted { - nonce: b.nonce, - source_chain: CHAIN_ID_SOLANA as u32, - target_chain: b.to_chain_id as u32, - source_address: b.source_address.to_vec(), - target_address: b.foreign_address.to_vec(), - token_chain: b.asset.chain as u32, - token_address: b.asset.address.to_vec(), - token_decimals: b.asset.decimals as u32, - amount: amount_b.to_vec(), - vaa: b.vaa.to_vec(), - })), - } - }; - - let mut amount_b: [u8; 32] = [0; 32]; - b.amount.to_big_endian(&mut amount_b); - - if let Err(e) = tx.send(Ok(event)).await { - println!("sending event failed: {}", e); - return; - }; - } - Err(e) => { - println!("watcher died: {}", e); - tx.send(Err(Status::new(Code::Aborted, "watcher died"))) - .await; + if let Err(e) = tx.send(Ok(event)).await { + println!("sending event failed: {}", e); return; - } - }; - } - }); - }) - .await - { - Ok(_) => Ok(Response::new(rx)), - Err(_) => Err(Status::new( - Code::Unavailable, - "failed to connect to solana", - )), - } + }; + // We need to push a second message to flush the channel + // https://github.com/hyperium/tonic/issues/378 + if let Err(e) = tx + .send(Ok(LockupEvent { + slot: 0, + time: 0, + lockup_address: String::from(""), + event: Some(Event::Empty(Empty {})), + })) + .await + { + println!("sending event failed: {}", e); + return; + }; + } + Err(e) => { + println!("watcher died: {}", e); + tx.send(Err(Status::new(Code::Aborted, "watcher died"))) + .await; + return; + } + }; + } + }); + + Ok(Response::new(rx)) } }