diff --git a/blocking_client/src/client.rs b/blocking_client/src/client.rs index 86d435d..a6aa499 100644 --- a/blocking_client/src/client.rs +++ b/blocking_client/src/client.rs @@ -2,6 +2,7 @@ use crate::configure_client::configure_client; use crate::quiche_client_loop::client_loop; use quic_geyser_common::filters::Filter; use quic_geyser_common::message::Message; +use quic_geyser_common::net::parse_host_port; use quic_geyser_common::types::connections_parameters::ConnectionParameters; use std::net::SocketAddr; use std::sync::atomic::AtomicBool; @@ -24,10 +25,9 @@ impl Client { connection_parameters.max_ack_delay, connection_parameters.ack_exponent, )?; - let server_address: SocketAddr = server_address.parse()?; - let socket_addr: SocketAddr = "0.0.0.0:0" - .parse() - .expect("Socket address should be returned"); + let server_address: SocketAddr = parse_host_port(&server_address)?; + let socket_addr: SocketAddr = + parse_host_port("[::]:0").expect("Socket address should be returned"); let is_connected = Arc::new(AtomicBool::new(false)); let (filters_sender, rx_sent_queue) = std::sync::mpsc::channel(); let (sx_recv_queue, client_rx_queue) = std::sync::mpsc::channel(); @@ -75,6 +75,7 @@ mod tests { config::{CompressionParameters, ConfigQuicPlugin, QuicParameters}, filters::Filter, message::Message, + net::parse_host_port, types::{ account::Account, connections_parameters::ConnectionParameters, slot_identifier::SlotIdentifier, @@ -103,8 +104,8 @@ mod tests { #[test] pub fn test_client() { - let server_sock: SocketAddr = "0.0.0.0:30000".parse().unwrap(); - let url = format!("127.0.0.1:{}", server_sock.port()); + let server_sock: SocketAddr = parse_host_port("[::]:30000").unwrap(); + let url = format!("::1:{}", server_sock.port()); let msg_acc_1 = Message::AccountMsg(get_account_for_test(0, 2)); let msg_acc_2 = Message::AccountMsg(get_account_for_test(1, 20)); diff --git a/blocking_client/src/quiche_client_loop.rs b/blocking_client/src/quiche_client_loop.rs index ec93c66..d0a5021 100644 --- a/blocking_client/src/quiche_client_loop.rs +++ b/blocking_client/src/quiche_client_loop.rs @@ -300,8 +300,7 @@ pub fn create_quiche_client_thread( #[cfg(test)] mod tests { use std::{ - net::{IpAddr, Ipv4Addr, SocketAddr}, - str::FromStr, + net::{IpAddr, Ipv6Addr, SocketAddr}, sync::{atomic::AtomicBool, mpsc, Arc}, thread::sleep, time::Duration, @@ -317,6 +316,7 @@ mod tests { config::QuicParameters, filters::Filter, message::Message, + net::parse_host_port, types::block_meta::SlotMeta, }; @@ -328,7 +328,7 @@ mod tests { fn test_send_and_recieve_of_large_account_with_client_loop() { tracing_subscriber::fmt::init(); // Setup the event loop. - let socket_addr = SocketAddr::from_str("0.0.0.0:10900").unwrap(); + let socket_addr = parse_host_port("[::]:10900").unwrap(); let port = 10900; let maximum_concurrent_streams = 100; @@ -420,14 +420,14 @@ mod tests { }); // client loop - let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); + let server_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port); let (client_sx_queue, rx_sent_queue) = mpsc::channel(); let (sx_recv_queue, client_rx_queue) = mpsc::channel(); let _client_loop_jh = std::thread::spawn(move || { let client_config = configure_client(maximum_concurrent_streams, 20_000_000, 1, 25, 3).unwrap(); - let socket_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); + let socket_addr: SocketAddr = parse_host_port("[::]:0").unwrap(); let is_connected = Arc::new(AtomicBool::new(false)); if let Err(e) = client_loop( client_config, diff --git a/client/src/non_blocking/client.rs b/client/src/non_blocking/client.rs index f31c58f..81fdf2c 100644 --- a/client/src/non_blocking/client.rs +++ b/client/src/non_blocking/client.rs @@ -4,19 +4,20 @@ use quic_geyser_common::defaults::DEFAULT_MAX_RECIEVE_WINDOW_SIZE; use quic_geyser_common::defaults::MAX_PAYLOAD_BUFFER; use quic_geyser_common::filters::Filter; use quic_geyser_common::message::Message; +use quic_geyser_common::net::parse_host_port; use quic_geyser_common::types::connections_parameters::ConnectionParameters; use quinn::{ ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, RecvStream, SendStream, TokioRuntime, TransportConfig, VarInt, }; -use std::net::{SocketAddr, UdpSocket}; -use std::str::FromStr; +use std::net::UdpSocket; use std::sync::Arc; use std::time::Duration; pub fn create_client_endpoint(connection_parameters: ConnectionParameters) -> Endpoint { let mut endpoint = { - let client_socket = UdpSocket::bind("0.0.0.0:0").expect("Client socket should be binded"); + let client_socket = UdpSocket::bind(parse_host_port("[::]:0").unwrap()) + .expect("Client socket should be binded"); let mut config = EndpointConfig::default(); config .max_udp_payload_size(MAX_PAYLOAD_BUFFER.try_into().unwrap()) @@ -105,7 +106,7 @@ impl Client { )> { let timeout: u64 = connection_parameters.timeout_in_seconds; let endpoint = create_client_endpoint(connection_parameters); - let socket_addr = SocketAddr::from_str(&server_address)?; + let socket_addr = parse_host_port(&server_address)?; let connecting = endpoint.connect(socket_addr, "quic_geyser_client")?; let (message_sx_queue, message_rx_queue) = @@ -118,6 +119,11 @@ impl Client { // limit client to respond to 128k streams in parallel let semaphore = Arc::new(tokio::sync::Semaphore::new(128 * 1024)); loop { + // sender is closed / no messages to send + if message_sx_queue.is_closed() { + bail!("quic client stopped, sender closed"); + } + let permit = semaphore.clone().acquire_owned().await.unwrap(); let stream: Result = connection.accept_uni().await; match stream { @@ -130,7 +136,7 @@ impl Client { match message { Ok(message) => { if let Err(e) = sender.send(message) { - log::error!("Message sent error : {:?}", e) + log::error!("Message sent error : {:?}", e); } } Err(e) => { @@ -219,6 +225,7 @@ mod tests { config::{CompressionParameters, ConfigQuicPlugin, QuicParameters}, filters::Filter, message::Message, + net::parse_host_port, types::{ account::Account, connections_parameters::ConnectionParameters, slot_identifier::SlotIdentifier, @@ -247,7 +254,7 @@ mod tests { #[tokio::test] pub async fn test_non_blocking_client() { - let server_sock: SocketAddr = "0.0.0.0:20000".parse().unwrap(); + let server_sock: SocketAddr = parse_host_port("[::]:20000").unwrap(); let url = format!("127.0.0.1:{}", server_sock.port()); let msg_acc_1 = Message::AccountMsg(get_account_for_test(0, 2)); diff --git a/common/src/lib.rs b/common/src/lib.rs index 4b8998b..a452165 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -4,5 +4,6 @@ pub mod config; pub mod defaults; pub mod filters; pub mod message; +pub mod net; pub mod plugin_error; pub mod types; diff --git a/common/src/net.rs b/common/src/net.rs new file mode 100644 index 0000000..f9024ae --- /dev/null +++ b/common/src/net.rs @@ -0,0 +1,44 @@ +use std::{ + net::{IpAddr, SocketAddr, ToSocketAddrs}, + str::FromStr, +}; + +pub fn parse_host(host: &str) -> anyhow::Result { + IpAddr::from_str(host).map_err(|e| anyhow::anyhow!("{:?}", e)) +} + +pub fn parse_host_port(host_port: &str) -> anyhow::Result { + let addrs: Vec<_> = host_port + .to_socket_addrs() + .map_err(|err| anyhow::anyhow!("Unable to resolve host {host_port}: {err}"))? + .collect(); + if addrs.is_empty() { + Err(anyhow::anyhow!("Unable to resolve host: {host_port}")) + } else { + Ok(addrs[0]) + } +} + +#[cfg(test)] +mod test { + use super::{parse_host, parse_host_port}; + + #[test] + fn test_parse_host_port() { + parse_host_port("localhost:1234").unwrap(); + parse_host_port("localhost").unwrap_err(); + parse_host_port("127.0.0.0:1234").unwrap(); + parse_host_port("127.0.0.0").unwrap_err(); + parse_host_port("[::]:1234").unwrap(); + parse_host_port("fcs-ams1._peer.internal:1234").unwrap(); + parse_host_port("fcs-ams1._peer.internal:8172").unwrap(); + } + + #[test] + fn test_parse_host() { + parse_host("127.0.0.1").unwrap(); + parse_host("::").unwrap(); + parse_host("localhost:1234").unwrap_err(); + // parse_host("http://fcs-ams1._peer.internal").unwrap(); + } +} diff --git a/examples/tester-server/src/main.rs b/examples/tester-server/src/main.rs index 22f37cf..b52f61c 100644 --- a/examples/tester-server/src/main.rs +++ b/examples/tester-server/src/main.rs @@ -1,19 +1,15 @@ -use std::{ - net::SocketAddr, - str::FromStr, - time::{Duration, Instant}, -}; - use clap::Parser; use cli::Args; use itertools::Itertools; use quic_geyser_common::{ channel_message::{AccountData, ChannelMessage}, config::{CompressionParameters, ConfigQuicPlugin, QuicParameters}, + net::parse_host_port, }; use quic_geyser_server::quic_server::QuicServer; use rand::{thread_rng, Rng}; use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey}; +use std::time::{Duration, Instant}; pub mod cli; @@ -22,7 +18,7 @@ pub fn main() { let args = Args::parse(); let config = ConfigQuicPlugin { - address: SocketAddr::from_str(format!("0.0.0.0:{}", args.port).as_str()).unwrap(), + address: parse_host_port(format!("[::]:{}", args.port).as_str()).unwrap(), log_level: "info".to_string(), quic_parameters: QuicParameters { max_number_of_streams_per_client: args.number_of_streams, diff --git a/proxy/src/main.rs b/proxy/src/main.rs index b04c6b6..89f8a7d 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -1,5 +1,3 @@ -use std::{net::SocketAddr, str::FromStr}; - use clap::Parser; use cli::Args; use quic_geyser_blocking_client::client::Client; @@ -7,6 +5,7 @@ use quic_geyser_common::{ channel_message::{AccountData, ChannelMessage}, config::{CompressionParameters, ConfigQuicPlugin, QuicParameters}, filters::Filter, + net::parse_host_port, types::connections_parameters::ConnectionParameters, }; use quic_geyser_server::quic_server::QuicServer; @@ -39,7 +38,7 @@ pub fn main() -> anyhow::Result<()> { ])?; let quic_config = ConfigQuicPlugin { - address: SocketAddr::from_str(format!("0.0.0.0:{}", args.port).as_str()).unwrap(), + address: parse_host_port(format!("[::]:{}", args.port).as_str()).unwrap(), log_level: "info".to_string(), quic_parameters: QuicParameters { max_number_of_streams_per_client: args.max_number_of_streams_per_client,