From 855d8a09a7b85ce810246002858e43efe5aec99a Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Mon, 13 May 2024 17:49:17 +0200 Subject: [PATCH] Implementing quic geyser client --- Cargo.lock | 61 +++++++++++++++++++++++++++++++++ Cargo.toml | 1 + client/Cargo.toml | 9 +++++ client/src/client.rs | 81 ++++++++++++++++++++++++++++++++++++++++++++ client/src/lib.rs | 2 +- 5 files changed, 153 insertions(+), 1 deletion(-) create mode 100644 client/src/client.rs diff --git a/Cargo.lock b/Cargo.lock index ae74fa3..cc7d609 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -398,6 +398,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.61", +] + [[package]] name = "atty" version = "0.2.14" @@ -1296,6 +1318,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -1303,6 +1340,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -1311,6 +1349,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.30" @@ -1346,9 +1395,11 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", + "futures-sink", "futures-task", "memchr", "pin-project-lite", @@ -2395,6 +2446,16 @@ dependencies = [ [[package]] name = "quic-geyser-client" version = "0.1.0" +dependencies = [ + "anyhow", + "async-stream", + "futures", + "log", + "quic-geyser-common", + "quinn", + "solana-sdk", + "tokio", +] [[package]] name = "quic-geyser-common" diff --git a/Cargo.toml b/Cargo.toml index 263f207..a9a38b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ rcgen = "0.10.0" pkcs8 = "0.8.0" pem = "1.1.1" lz4 = "1.24.0" +async-stream = "0.3.5" quic-geyser-common = {path = "common", version="0.1.0"} quic-geyser-client = {path = "client", version="0.1.0"} diff --git a/client/Cargo.toml b/client/Cargo.toml index 9544101..a2bce09 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -6,3 +6,12 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +quinn = { workspace = "true" } +solana-sdk = { workspace = "true" } +anyhow = { workspace = "true" } +futures = { workspace = "true" } +async-stream = { workspace = "true" } +tokio = { workspace = "true" } +log = { workspace = "true" } + +quic-geyser-common = { path = "../common" } \ No newline at end of file diff --git a/client/src/client.rs b/client/src/client.rs new file mode 100644 index 0000000..be774b5 --- /dev/null +++ b/client/src/client.rs @@ -0,0 +1,81 @@ +use std::{net::SocketAddr, str::FromStr}; + +use async_stream::stream; +use futures::Stream; +use quic_geyser_common::filters::Filter; +use quic_geyser_common::message::Message; +use quic_geyser_common::quic::configure_client::configure_client; +use quic_geyser_common::quic::quinn_reciever::recv_message; +use quic_geyser_common::quic::quinn_sender::send_message; +use quinn::{Connection, ConnectionError}; +use solana_sdk::signature::Keypair; + +pub struct Client { + pub address: String, + connection: Connection, +} + +impl Client { + pub async fn new( + server_address: String, + identity: &Keypair, + max_concurrent_streams: u32, + ) -> anyhow::Result { + let endpoint = configure_client(identity, max_concurrent_streams).await?; + let socket_addr = SocketAddr::from_str(&server_address)?; + let connecting = endpoint.connect(socket_addr, "quic_geyser_client")?; + let connection = connecting.await?; + + Ok(Client { + address: server_address, + connection, + }) + } + + pub async fn subscribe(&self, filters: Vec) -> anyhow::Result<()> { + let send_stream = self.connection.open_uni().await?; + send_message(send_stream, Message::Filters(filters)).await?; + Ok(()) + } + + pub fn get_stream(&self) -> impl Stream { + let connection = self.connection.clone(); + let (sender, mut reciever) = tokio::sync::mpsc::unbounded_channel::(); + tokio::spawn(async move { + loop { + let stream = connection.accept_uni().await; + match stream { + Ok(recv_stream) => { + let sender = sender.clone(); + tokio::spawn(async move { + let message = recv_message(recv_stream).await; + match message { + Ok(message) => { + let _ = sender.send(message); + } + Err(e) => { + log::error!("Error getting message {}", e); + } + } + }); + } + Err(e) => match &e { + ConnectionError::ConnectionClosed(_) + | ConnectionError::ApplicationClosed(_) + | ConnectionError::LocallyClosed => { + break; + } + _ => { + log::error!("Got {} while listing to the connection", e); + } + }, + } + } + }); + stream! { + while let Some(message) = reciever.recv().await { + yield message; + } + } + } +} diff --git a/client/src/lib.rs b/client/src/lib.rs index 8b13789..b9babe5 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1 +1 @@ - +pub mod client;