agent: remove keepalive routine

This commit is contained in:
Hendrik Hofstadt 2020-08-31 14:20:23 +02:00
parent dfa3739de7
commit fb603d468c
1 changed files with 98 additions and 112 deletions

View File

@ -108,132 +108,118 @@ impl Agent for AgentImpl {
req: Request<WatchLockupsRequest>,
) -> Result<Response<Self::WatchLockupsStream>, Status> {
let (mut tx, mut rx) = mpsc::channel(1);
let mut tx1 = tx.clone();
let url = self.url.clone();
let bridge = self.bridge.clone();
let rpc_url = self.rpc_url.clone();
match tokio::spawn(async move {
// Keepalive routine
tokio::spawn(async move {
// We need to keep the channel alive https://github.com/hyperium/tonic/issues/378
loop {
tx1.send(Ok(LockupEvent {
slot: 0,
time: 0,
lockup_address: String::from(""),
event: Some(Event::Empty(Empty {})),
}))
.await
.unwrap();
tokio::spawn(async move {
let rpc = RpcClient::new(rpc_url.to_string());
let sub = PubsubClient::program_subscribe(&url, &bridge).unwrap();
// looping and sending our response using stream
loop {
let item = sub.1.recv();
match item {
Ok(v) => {
// We only want to track lockups
if v.value.account.data.len() != size_of::<TransferOutProposal>() {
continue;
}
sleep(Duration::new(1, 0))
}
});
println!("lockup changed in slot: {}", v.context.slot);
// Watcher thread
tokio::spawn(async move {
let rpc = RpcClient::new(rpc_url.to_string());
let sub = PubsubClient::program_subscribe(&url, &bridge).unwrap();
// looping and sending our response using stream
loop {
let item = sub.1.recv();
match item {
Ok(v) => {
// We only want to track lockups
if v.value.account.data.len() != size_of::<TransferOutProposal>() {
let time = match rpc.get_block_time(v.context.slot) {
Ok(v) => v as u64,
Err(e) => {
println!("failed to fetch block time for event: {}", e);
continue;
}
};
println!("lockup changed in slot: {}", v.context.slot);
let b = match Bridge::unpack_immutable::<TransferOutProposal>(
v.value.account.data.as_slice(),
) {
Ok(v) => v,
Err(e) => {
println!("failed to deserialize lockup: {}", e);
continue;
}
};
let time = match rpc.get_block_time(v.context.slot) {
Ok(v) => v as u64,
Err(e) => {
println!("failed to fetch block time for event: {}", e);
continue;
}
};
let mut amount_b: [u8; 32] = [0; 32];
b.amount.to_big_endian(&mut amount_b);
//
let b = match Bridge::unpack_immutable::<TransferOutProposal>(
v.value.account.data.as_slice(),
) {
Ok(v) => v,
Err(e) => {
println!("failed to deserialize lockup: {}", e);
continue;
}
};
let event = if b.vaa_time == 0 {
// The Lockup was created
LockupEvent {
slot: v.context.slot,
lockup_address: v.value.pubkey.to_string(),
time,
event: Some(Event::New(LockupEventNew {
nonce: b.nonce,
source_chain: CHAIN_ID_SOLANA as u32,
target_chain: b.to_chain_id as u32,
source_address: b.source_address.to_vec(),
target_address: b.foreign_address.to_vec(),
token_chain: b.asset.chain as u32,
token_address: b.asset.address.to_vec(),
token_decimals: b.asset.decimals as u32,
amount: amount_b.to_vec(),
})),
}
} else {
// The VAA was submitted
LockupEvent {
slot: v.context.slot,
lockup_address: v.value.pubkey.to_string(),
time,
event: Some(Event::VaaPosted(LockupEventVaaPosted {
nonce: b.nonce,
source_chain: CHAIN_ID_SOLANA as u32,
target_chain: b.to_chain_id as u32,
source_address: b.source_address.to_vec(),
target_address: b.foreign_address.to_vec(),
token_chain: b.asset.chain as u32,
token_address: b.asset.address.to_vec(),
token_decimals: b.asset.decimals as u32,
amount: amount_b.to_vec(),
vaa: b.vaa.to_vec(),
})),
}
};
let mut amount_b: [u8; 32] = [0; 32];
b.amount.to_big_endian(&mut amount_b);
let mut amount_b: [u8; 32] = [0; 32];
b.amount.to_big_endian(&mut amount_b);
let event = if b.vaa_time == 0 {
// The Lockup was created
LockupEvent {
slot: v.context.slot,
lockup_address: v.value.pubkey.to_string(),
time,
event: Some(Event::New(LockupEventNew {
nonce: b.nonce,
source_chain: CHAIN_ID_SOLANA as u32,
target_chain: b.to_chain_id as u32,
source_address: b.source_address.to_vec(),
target_address: b.foreign_address.to_vec(),
token_chain: b.asset.chain as u32,
token_address: b.asset.address.to_vec(),
token_decimals: b.asset.decimals as u32,
amount: amount_b.to_vec(),
})),
}
} else {
// The VAA was submitted
LockupEvent {
slot: v.context.slot,
lockup_address: v.value.pubkey.to_string(),
time,
event: Some(Event::VaaPosted(LockupEventVaaPosted {
nonce: b.nonce,
source_chain: CHAIN_ID_SOLANA as u32,
target_chain: b.to_chain_id as u32,
source_address: b.source_address.to_vec(),
target_address: b.foreign_address.to_vec(),
token_chain: b.asset.chain as u32,
token_address: b.asset.address.to_vec(),
token_decimals: b.asset.decimals as u32,
amount: amount_b.to_vec(),
vaa: b.vaa.to_vec(),
})),
}
};
let mut amount_b: [u8; 32] = [0; 32];
b.amount.to_big_endian(&mut amount_b);
if let Err(e) = tx.send(Ok(event)).await {
println!("sending event failed: {}", e);
return;
};
}
Err(e) => {
println!("watcher died: {}", e);
tx.send(Err(Status::new(Code::Aborted, "watcher died")))
.await;
if let Err(e) = tx.send(Ok(event)).await {
println!("sending event failed: {}", e);
return;
}
};
}
});
})
.await
{
Ok(_) => Ok(Response::new(rx)),
Err(_) => Err(Status::new(
Code::Unavailable,
"failed to connect to solana",
)),
}
};
// We need to push a second message to flush the channel
// https://github.com/hyperium/tonic/issues/378
if let Err(e) = tx
.send(Ok(LockupEvent {
slot: 0,
time: 0,
lockup_address: String::from(""),
event: Some(Event::Empty(Empty {})),
}))
.await
{
println!("sending event failed: {}", e);
return;
};
}
Err(e) => {
println!("watcher died: {}", e);
tx.send(Err(Status::new(Code::Aborted, "watcher died")))
.await;
return;
}
};
}
});
Ok(Response::new(rx))
}
}