Watch slots for Solana liveness monitoring (#141)
* add slot notifications to the agent * fix proto field naming
This commit is contained in:
parent
03ef5e1017
commit
12a6ae31bc
|
@ -35,6 +35,7 @@ message LockupEvent {
|
||||||
LockupEventNew new = 4;
|
LockupEventNew new = 4;
|
||||||
LockupEventVAAPosted vaaPosted = 5;
|
LockupEventVAAPosted vaaPosted = 5;
|
||||||
Empty empty = 6;
|
Empty empty = 6;
|
||||||
|
Empty slotEvent = 7;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -161,9 +161,12 @@ impl Agent for AgentImpl {
|
||||||
_req: Request<WatchLockupsRequest>,
|
_req: Request<WatchLockupsRequest>,
|
||||||
) -> Result<Response<Self::WatchLockupsStream>, Status> {
|
) -> Result<Response<Self::WatchLockupsStream>, Status> {
|
||||||
let (mut tx, rx) = mpsc::channel(1);
|
let (mut tx, rx) = mpsc::channel(1);
|
||||||
|
let mut tx2 = tx.clone();
|
||||||
let url = self.url.clone();
|
let url = self.url.clone();
|
||||||
|
let url2 = self.url.clone();
|
||||||
let bridge = self.bridge.clone();
|
let bridge = self.bridge.clone();
|
||||||
let rpc_url = self.rpc_url.clone();
|
let rpc_url = self.rpc_url.clone();
|
||||||
|
let rpc_url2 = self.rpc_url.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let _rpc = RpcClient::new(rpc_url.to_string());
|
let _rpc = RpcClient::new(rpc_url.to_string());
|
||||||
|
@ -264,6 +267,37 @@ impl Agent for AgentImpl {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let _rpc = RpcClient::new(rpc_url2.to_string());
|
||||||
|
let sub = solana_client::pubsub_client::PubsubClient::slot_subscribe(&url2).unwrap();
|
||||||
|
// looping and sending our response using stream
|
||||||
|
loop {
|
||||||
|
let item = sub.1.recv();
|
||||||
|
match item {
|
||||||
|
Ok(v) => {
|
||||||
|
if let Err(e) = tx2
|
||||||
|
.send(Ok(LockupEvent {
|
||||||
|
slot: v.slot,
|
||||||
|
time: 0,
|
||||||
|
lockup_address: String::from(""),
|
||||||
|
event: Some(Event::SlotEvent(Empty {})),
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
println!("sending event failed: {}", e);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
println!("watcher died: {}", e);
|
||||||
|
tx2.send(Err(Status::new(Code::Aborted, "watcher died")))
|
||||||
|
.await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
Ok(Response::new(rx))
|
Ok(Response::new(rx))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue