solana: fix missing transfer info and cli checks

This commit is contained in:
Hendrik Hofstadt 2020-08-21 23:41:45 +02:00
parent 5b8c6e89bd
commit 6b113853bd
3 changed files with 122 additions and 102 deletions

View File

@ -107,115 +107,120 @@ impl Agent for AgentImpl {
&self,
req: Request<WatchLockupsRequest>,
) -> Result<Response<Self::WatchLockupsStream>, Status> {
let (mut tx, rx) = mpsc::channel(1);
let (mut tx, mut rx) = mpsc::channel(1);
let mut tx1 = tx.clone();
let url = self.url.clone();
let bridge = self.bridge.clone();
let rpc_url = self.rpc_url.clone();
// creating a new task
tokio::spawn(async move {
// looping and sending our response using stream
let rpc = RpcClient::new(rpc_url.to_string());
let sub = PubsubClient::program_subscribe(&url, &bridge).unwrap();
loop {
let item = sub.1.recv();
match item {
Ok(v) => {
let rpc = RpcClient::new(rpc_url.to_string());
// We only want to track lockups
if v.value.account.data.len() != size_of::<TransferOutProposal>() {
continue;
// Keepalive routine
let keepalive = tokio::spawn(async move {
// We need to keep the channel alive https://github.com/hyperium/tonic/issues/378
loop {
tx1.send(Ok(LockupEvent {
slot: 0,
time: 0,
lockup_address: String::from(""),
event: Some(Event::Empty(Empty {})),
}))
.await;
sleep(Duration::new(1, 0))
}
});
// Watcher thread
let watcher = tokio::spawn(async move {
// looping and sending our response using stream
loop {
let item = sub.1.recv();
match item {
Ok(v) => {
// We only want to track lockups
if v.value.account.data.len() != size_of::<TransferOutProposal>() {
continue;
}
println!("lockup changed in slot: {}", v.context.slot);
let time = match rpc.get_block_time(v.context.slot) {
Ok(v) => v as u64,
Err(e) => {
println!("failed to fetch block time for event: {}", e);
continue;
}
};
//
let b = match Bridge::unpack_immutable::<TransferOutProposal>(
v.value.account.data.as_slice(),
) {
Ok(v) => v,
Err(e) => {
println!("failed to deserialize lockup: {}", e);
continue;
}
};
let mut amount_b: [u8; 32] = [0; 32];
b.amount.to_big_endian(&mut amount_b);
let event = if b.vaa_time == 0 {
// The Lockup was created
LockupEvent {
slot: v.context.slot,
lockup_address: v.value.pubkey.to_string(),
time,
event: Some(Event::New(LockupEventNew {
nonce: b.nonce,
source_chain: CHAIN_ID_SOLANA as u32,
target_chain: b.to_chain_id as u32,
source_address: b.source_address.to_vec(),
target_address: b.foreign_address.to_vec(),
token_chain: b.asset.chain as u32,
token_address: b.asset.address.to_vec(),
amount: amount_b.to_vec(),
})),
}
} else {
// The VAA was submitted
LockupEvent {
slot: v.context.slot,
lockup_address: v.value.pubkey.to_string(),
time,
event: Some(Event::VaaPosted(LockupEventVaaPosted {
nonce: b.nonce,
source_chain: CHAIN_ID_SOLANA as u32,
target_chain: b.to_chain_id as u32,
source_address: b.source_address.to_vec(),
target_address: b.foreign_address.to_vec(),
token_chain: b.asset.chain as u32,
token_address: b.asset.address.to_vec(),
amount: amount_b.to_vec(),
vaa: b.vaa.to_vec(),
})),
}
};
let mut amount_b: [u8; 32] = [0; 32];
b.amount.to_big_endian(&mut amount_b);
if let Err(_) = tx.send(Ok(event)).await {
return;
};
}
println!("lockup changed in slot: {}", v.context.slot);
let time = match rpc.get_block_time(v.context.slot) {
Ok(v) => v as u64,
Err(e) => {
println!("failed to fetch block time for event: {}", e);
continue;
}
};
//
let b = match Bridge::unpack_immutable::<TransferOutProposal>(
v.value.account.data.as_slice(),
) {
Ok(v) => v,
Err(e) => {
println!("failed to deserialize lockup: {}", e);
continue;
}
};
let mut amount_b: [u8; 32] = [0; 32];
b.amount.to_big_endian(&mut amount_b);
let event = if b.vaa_time == 0 {
// The Lockup was created
LockupEvent {
slot: v.context.slot,
lockup_address: v.value.pubkey.to_string(),
time,
event: Some(Event::New(LockupEventNew {
nonce: b.nonce,
source_chain: CHAIN_ID_SOLANA as u32,
target_chain: b.to_chain_id as u32,
source_address: b.source_address.to_vec(),
target_address: b.foreign_address.to_vec(),
token_chain: b.asset.chain as u32,
token_address: b.asset.address.to_vec(),
amount: amount_b.to_vec(),
})),
}
} else {
// The VAA was submitted
LockupEvent {
slot: v.context.slot,
lockup_address: v.value.pubkey.to_string(),
time,
event: Some(Event::VaaPosted(LockupEventVaaPosted {
nonce: b.nonce,
source_chain: CHAIN_ID_SOLANA as u32,
target_chain: b.to_chain_id as u32,
source_address: b.source_address.to_vec(),
target_address: b.foreign_address.to_vec(),
token_chain: b.asset.chain as u32,
token_address: b.asset.address.to_vec(),
amount: amount_b.to_vec(),
vaa: b.vaa.to_vec(),
})),
}
};
let mut amount_b: [u8; 32] = [0; 32];
b.amount.to_big_endian(&mut amount_b);
if let Err(_) = tx.send(Ok(event)).await {
Err(_) => {
tx.send(Err(Status::new(Code::Aborted, "watcher died")))
.await;
return;
};
}
Err(_) => {
tx.send(Err(Status::new(Code::Aborted, "watcher died")))
.await;
return;
}
};
}
});
tokio::spawn(async move {
// We need to keep the channel alive https://github.com/hyperium/tonic/issues/378
loop {
tx1.send(Ok(LockupEvent {
slot: 0,
time: 0,
lockup_address: String::from(""),
event: Some(Event::Empty(Empty {})),
}))
.await;
sleep(Duration::new(1, 0))
}
}
};
}
});
});
Ok(Response::new(rx))
}

View File

@ -198,7 +198,6 @@ impl Bridge {
let mut transfer_data = transfer_info.data.borrow_mut();
let mut transfer: &mut TransferOutProposal = Self::unpack(&mut transfer_data)?;
info!("burning");
// Burn tokens
Bridge::wrapped_burn(
program_id,
@ -365,9 +364,11 @@ impl Bridge {
// Initialize proposal
transfer.is_initialized = true;
transfer.foreign_address = t.target;
transfer.amount = t.amount;
transfer.to_chain_id = t.chain_id;
transfer.source_address = mint_info.key.to_bytes();
transfer.foreign_address = t.target;
transfer.nonce = t.nonce;
// Don't use the user-given data as we don't check mint = AssetMeta.address
transfer.asset = AssetMeta {

View File

@ -1221,6 +1221,20 @@ where
}
}
pub fn is_u32<T>(amount: T) -> Result<(), String>
where
T: AsRef<str> + Display,
{
if amount.as_ref().parse::<u32>().is_ok() {
Ok(())
} else {
Err(format!(
"Unable to parse input amount as integer, provided: {}",
amount
))
}
}
pub fn is_hex<T>(value: T) -> Result<(), String>
where
T: AsRef<str> + Display,