From 12a6ae31bcbba478fea61523ce5a96219cdcb19c Mon Sep 17 00:00:00 2001 From: Hendrik Hofstadt Date: Mon, 11 Jan 2021 12:43:05 +0100 Subject: [PATCH] Watch slots for Solana liveness monitoring (#141) * add slot notifications to the agent * fix proto field naming --- proto/agent/v1/service.proto | 1 + solana/agent/src/main.rs | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/proto/agent/v1/service.proto b/proto/agent/v1/service.proto index bed686b2..057f055d 100644 --- a/proto/agent/v1/service.proto +++ b/proto/agent/v1/service.proto @@ -35,6 +35,7 @@ message LockupEvent { LockupEventNew new = 4; LockupEventVAAPosted vaaPosted = 5; Empty empty = 6; + Empty slotEvent = 7; } } diff --git a/solana/agent/src/main.rs b/solana/agent/src/main.rs index 304fc03f..9c984684 100644 --- a/solana/agent/src/main.rs +++ b/solana/agent/src/main.rs @@ -161,9 +161,12 @@ impl Agent for AgentImpl { _req: Request, ) -> Result, Status> { let (mut tx, rx) = mpsc::channel(1); + let mut tx2 = tx.clone(); let url = self.url.clone(); + let url2 = self.url.clone(); let bridge = self.bridge.clone(); let rpc_url = self.rpc_url.clone(); + let rpc_url2 = self.rpc_url.clone(); tokio::spawn(async move { let _rpc = RpcClient::new(rpc_url.to_string()); @@ -264,6 +267,37 @@ impl Agent for AgentImpl { } }); + tokio::spawn(async move { + let _rpc = RpcClient::new(rpc_url2.to_string()); + let sub = solana_client::pubsub_client::PubsubClient::slot_subscribe(&url2).unwrap(); + // looping and sending our response using stream + loop { + let item = sub.1.recv(); + match item { + Ok(v) => { + if let Err(e) = tx2 + .send(Ok(LockupEvent { + slot: v.slot, + time: 0, + lockup_address: String::from(""), + event: Some(Event::SlotEvent(Empty {})), + })) + .await + { + println!("sending event failed: {}", e); + return; + }; + } + Err(e) => { + println!("watcher died: {}", e); + tx2.send(Err(Status::new(Code::Aborted, "watcher died"))) + .await; + return; + } + }; + } + }); + Ok(Response::new(rx)) } }