Implementing quic geyser client

This commit is contained in:
Godmode Galactus 2024-05-13 17:49:17 +02:00
parent da2dd63704
commit 855d8a09a7
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
5 changed files with 153 additions and 1 deletions

61
Cargo.lock generated
View File

@ -398,6 +398,28 @@ dependencies = [
"tokio",
]
[[package]]
name = "async-stream"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.61",
]
[[package]]
name = "atty"
version = "0.2.14"
@ -1296,6 +1318,21 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.30"
@ -1303,6 +1340,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@ -1311,6 +1349,17 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.30"
@ -1346,9 +1395,11 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
@ -2395,6 +2446,16 @@ dependencies = [
[[package]]
name = "quic-geyser-client"
version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"futures",
"log",
"quic-geyser-common",
"quinn",
"solana-sdk",
"tokio",
]
[[package]]
name = "quic-geyser-common"

View File

@ -44,6 +44,7 @@ rcgen = "0.10.0"
pkcs8 = "0.8.0"
pem = "1.1.1"
lz4 = "1.24.0"
async-stream = "0.3.5"
quic-geyser-common = {path = "common", version="0.1.0"}
quic-geyser-client = {path = "client", version="0.1.0"}

View File

@ -6,3 +6,12 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
quinn = { workspace = "true" }
solana-sdk = { workspace = "true" }
anyhow = { workspace = "true" }
futures = { workspace = "true" }
async-stream = { workspace = "true" }
tokio = { workspace = "true" }
log = { workspace = "true" }
quic-geyser-common = { path = "../common" }

81
client/src/client.rs Normal file
View File

@ -0,0 +1,81 @@
use std::{net::SocketAddr, str::FromStr};
use async_stream::stream;
use futures::Stream;
use quic_geyser_common::filters::Filter;
use quic_geyser_common::message::Message;
use quic_geyser_common::quic::configure_client::configure_client;
use quic_geyser_common::quic::quinn_reciever::recv_message;
use quic_geyser_common::quic::quinn_sender::send_message;
use quinn::{Connection, ConnectionError};
use solana_sdk::signature::Keypair;
pub struct Client {
pub address: String,
connection: Connection,
}
impl Client {
pub async fn new(
server_address: String,
identity: &Keypair,
max_concurrent_streams: u32,
) -> anyhow::Result<Client> {
let endpoint = configure_client(identity, max_concurrent_streams).await?;
let socket_addr = SocketAddr::from_str(&server_address)?;
let connecting = endpoint.connect(socket_addr, "quic_geyser_client")?;
let connection = connecting.await?;
Ok(Client {
address: server_address,
connection,
})
}
pub async fn subscribe(&self, filters: Vec<Filter>) -> anyhow::Result<()> {
let send_stream = self.connection.open_uni().await?;
send_message(send_stream, Message::Filters(filters)).await?;
Ok(())
}
pub fn get_stream(&self) -> impl Stream<Item = Message> {
let connection = self.connection.clone();
let (sender, mut reciever) = tokio::sync::mpsc::unbounded_channel::<Message>();
tokio::spawn(async move {
loop {
let stream = connection.accept_uni().await;
match stream {
Ok(recv_stream) => {
let sender = sender.clone();
tokio::spawn(async move {
let message = recv_message(recv_stream).await;
match message {
Ok(message) => {
let _ = sender.send(message);
}
Err(e) => {
log::error!("Error getting message {}", e);
}
}
});
}
Err(e) => match &e {
ConnectionError::ConnectionClosed(_)
| ConnectionError::ApplicationClosed(_)
| ConnectionError::LocallyClosed => {
break;
}
_ => {
log::error!("Got {} while listing to the connection", e);
}
},
}
}
});
stream! {
while let Some(message) = reciever.recv().await {
yield message;
}
}
}
}

View File

@ -1 +1 @@
pub mod client;