Making everything use ipv6 instead of ipv4

This commit is contained in:
godmodegalactus 2024-09-30 16:21:27 +02:00
parent 38f3ad86e2
commit 34b6b05121
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
7 changed files with 75 additions and 27 deletions

View File

@ -2,6 +2,7 @@ use crate::configure_client::configure_client;
use crate::quiche_client_loop::client_loop; use crate::quiche_client_loop::client_loop;
use quic_geyser_common::filters::Filter; use quic_geyser_common::filters::Filter;
use quic_geyser_common::message::Message; use quic_geyser_common::message::Message;
use quic_geyser_common::net::parse_host_port;
use quic_geyser_common::types::connections_parameters::ConnectionParameters; use quic_geyser_common::types::connections_parameters::ConnectionParameters;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
@ -24,10 +25,9 @@ impl Client {
connection_parameters.max_ack_delay, connection_parameters.max_ack_delay,
connection_parameters.ack_exponent, connection_parameters.ack_exponent,
)?; )?;
let server_address: SocketAddr = server_address.parse()?; let server_address: SocketAddr = parse_host_port(&server_address)?;
let socket_addr: SocketAddr = "0.0.0.0:0" let socket_addr: SocketAddr =
.parse() parse_host_port("[::]:0").expect("Socket address should be returned");
.expect("Socket address should be returned");
let is_connected = Arc::new(AtomicBool::new(false)); let is_connected = Arc::new(AtomicBool::new(false));
let (filters_sender, rx_sent_queue) = std::sync::mpsc::channel(); let (filters_sender, rx_sent_queue) = std::sync::mpsc::channel();
let (sx_recv_queue, client_rx_queue) = std::sync::mpsc::channel(); let (sx_recv_queue, client_rx_queue) = std::sync::mpsc::channel();
@ -75,6 +75,7 @@ mod tests {
config::{CompressionParameters, ConfigQuicPlugin, QuicParameters}, config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
filters::Filter, filters::Filter,
message::Message, message::Message,
net::parse_host_port,
types::{ types::{
account::Account, connections_parameters::ConnectionParameters, account::Account, connections_parameters::ConnectionParameters,
slot_identifier::SlotIdentifier, slot_identifier::SlotIdentifier,
@ -103,8 +104,8 @@ mod tests {
#[test] #[test]
pub fn test_client() { pub fn test_client() {
let server_sock: SocketAddr = "0.0.0.0:30000".parse().unwrap(); let server_sock: SocketAddr = parse_host_port("[::]:30000").unwrap();
let url = format!("127.0.0.1:{}", server_sock.port()); let url = format!("::1:{}", server_sock.port());
let msg_acc_1 = Message::AccountMsg(get_account_for_test(0, 2)); let msg_acc_1 = Message::AccountMsg(get_account_for_test(0, 2));
let msg_acc_2 = Message::AccountMsg(get_account_for_test(1, 20)); let msg_acc_2 = Message::AccountMsg(get_account_for_test(1, 20));

View File

@ -300,8 +300,7 @@ pub fn create_quiche_client_thread(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{ use std::{
net::{IpAddr, Ipv4Addr, SocketAddr}, net::{IpAddr, Ipv6Addr, SocketAddr},
str::FromStr,
sync::{atomic::AtomicBool, mpsc, Arc}, sync::{atomic::AtomicBool, mpsc, Arc},
thread::sleep, thread::sleep,
time::Duration, time::Duration,
@ -317,6 +316,7 @@ mod tests {
config::QuicParameters, config::QuicParameters,
filters::Filter, filters::Filter,
message::Message, message::Message,
net::parse_host_port,
types::block_meta::SlotMeta, types::block_meta::SlotMeta,
}; };
@ -328,7 +328,7 @@ mod tests {
fn test_send_and_recieve_of_large_account_with_client_loop() { fn test_send_and_recieve_of_large_account_with_client_loop() {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
// Setup the event loop. // Setup the event loop.
let socket_addr = SocketAddr::from_str("0.0.0.0:10900").unwrap(); let socket_addr = parse_host_port("[::]:10900").unwrap();
let port = 10900; let port = 10900;
let maximum_concurrent_streams = 100; let maximum_concurrent_streams = 100;
@ -420,14 +420,14 @@ mod tests {
}); });
// client loop // client loop
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); let server_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port);
let (client_sx_queue, rx_sent_queue) = mpsc::channel(); let (client_sx_queue, rx_sent_queue) = mpsc::channel();
let (sx_recv_queue, client_rx_queue) = mpsc::channel(); let (sx_recv_queue, client_rx_queue) = mpsc::channel();
let _client_loop_jh = std::thread::spawn(move || { let _client_loop_jh = std::thread::spawn(move || {
let client_config = let client_config =
configure_client(maximum_concurrent_streams, 20_000_000, 1, 25, 3).unwrap(); configure_client(maximum_concurrent_streams, 20_000_000, 1, 25, 3).unwrap();
let socket_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); let socket_addr: SocketAddr = parse_host_port("[::]:0").unwrap();
let is_connected = Arc::new(AtomicBool::new(false)); let is_connected = Arc::new(AtomicBool::new(false));
if let Err(e) = client_loop( if let Err(e) = client_loop(
client_config, client_config,

View File

@ -4,19 +4,20 @@ use quic_geyser_common::defaults::DEFAULT_MAX_RECIEVE_WINDOW_SIZE;
use quic_geyser_common::defaults::MAX_PAYLOAD_BUFFER; use quic_geyser_common::defaults::MAX_PAYLOAD_BUFFER;
use quic_geyser_common::filters::Filter; use quic_geyser_common::filters::Filter;
use quic_geyser_common::message::Message; use quic_geyser_common::message::Message;
use quic_geyser_common::net::parse_host_port;
use quic_geyser_common::types::connections_parameters::ConnectionParameters; use quic_geyser_common::types::connections_parameters::ConnectionParameters;
use quinn::{ use quinn::{
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, RecvStream, ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, RecvStream,
SendStream, TokioRuntime, TransportConfig, VarInt, SendStream, TokioRuntime, TransportConfig, VarInt,
}; };
use std::net::{SocketAddr, UdpSocket}; use std::net::UdpSocket;
use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
pub fn create_client_endpoint(connection_parameters: ConnectionParameters) -> Endpoint { pub fn create_client_endpoint(connection_parameters: ConnectionParameters) -> Endpoint {
let mut endpoint = { let mut endpoint = {
let client_socket = UdpSocket::bind("0.0.0.0:0").expect("Client socket should be binded"); let client_socket = UdpSocket::bind(parse_host_port("[::]:0").unwrap())
.expect("Client socket should be binded");
let mut config = EndpointConfig::default(); let mut config = EndpointConfig::default();
config config
.max_udp_payload_size(MAX_PAYLOAD_BUFFER.try_into().unwrap()) .max_udp_payload_size(MAX_PAYLOAD_BUFFER.try_into().unwrap())
@ -105,7 +106,7 @@ impl Client {
)> { )> {
let timeout: u64 = connection_parameters.timeout_in_seconds; let timeout: u64 = connection_parameters.timeout_in_seconds;
let endpoint = create_client_endpoint(connection_parameters); let endpoint = create_client_endpoint(connection_parameters);
let socket_addr = SocketAddr::from_str(&server_address)?; let socket_addr = parse_host_port(&server_address)?;
let connecting = endpoint.connect(socket_addr, "quic_geyser_client")?; let connecting = endpoint.connect(socket_addr, "quic_geyser_client")?;
let (message_sx_queue, message_rx_queue) = let (message_sx_queue, message_rx_queue) =
@ -118,6 +119,11 @@ impl Client {
// limit client to respond to 128k streams in parallel // limit client to respond to 128k streams in parallel
let semaphore = Arc::new(tokio::sync::Semaphore::new(128 * 1024)); let semaphore = Arc::new(tokio::sync::Semaphore::new(128 * 1024));
loop { loop {
// sender is closed / no messages to send
if message_sx_queue.is_closed() {
bail!("quic client stopped, sender closed");
}
let permit = semaphore.clone().acquire_owned().await.unwrap(); let permit = semaphore.clone().acquire_owned().await.unwrap();
let stream: Result<RecvStream, ConnectionError> = connection.accept_uni().await; let stream: Result<RecvStream, ConnectionError> = connection.accept_uni().await;
match stream { match stream {
@ -130,7 +136,7 @@ impl Client {
match message { match message {
Ok(message) => { Ok(message) => {
if let Err(e) = sender.send(message) { if let Err(e) = sender.send(message) {
log::error!("Message sent error : {:?}", e) log::error!("Message sent error : {:?}", e);
} }
} }
Err(e) => { Err(e) => {
@ -219,6 +225,7 @@ mod tests {
config::{CompressionParameters, ConfigQuicPlugin, QuicParameters}, config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
filters::Filter, filters::Filter,
message::Message, message::Message,
net::parse_host_port,
types::{ types::{
account::Account, connections_parameters::ConnectionParameters, account::Account, connections_parameters::ConnectionParameters,
slot_identifier::SlotIdentifier, slot_identifier::SlotIdentifier,
@ -247,7 +254,7 @@ mod tests {
#[tokio::test] #[tokio::test]
pub async fn test_non_blocking_client() { pub async fn test_non_blocking_client() {
let server_sock: SocketAddr = "0.0.0.0:20000".parse().unwrap(); let server_sock: SocketAddr = parse_host_port("[::]:20000").unwrap();
let url = format!("127.0.0.1:{}", server_sock.port()); let url = format!("127.0.0.1:{}", server_sock.port());
let msg_acc_1 = Message::AccountMsg(get_account_for_test(0, 2)); let msg_acc_1 = Message::AccountMsg(get_account_for_test(0, 2));

View File

@ -4,5 +4,6 @@ pub mod config;
pub mod defaults; pub mod defaults;
pub mod filters; pub mod filters;
pub mod message; pub mod message;
pub mod net;
pub mod plugin_error; pub mod plugin_error;
pub mod types; pub mod types;

44
common/src/net.rs Normal file
View File

@ -0,0 +1,44 @@
use std::{
net::{IpAddr, SocketAddr, ToSocketAddrs},
str::FromStr,
};
pub fn parse_host(host: &str) -> anyhow::Result<IpAddr> {
IpAddr::from_str(host).map_err(|e| anyhow::anyhow!("{:?}", e))
}
pub fn parse_host_port(host_port: &str) -> anyhow::Result<SocketAddr> {
let addrs: Vec<_> = host_port
.to_socket_addrs()
.map_err(|err| anyhow::anyhow!("Unable to resolve host {host_port}: {err}"))?
.collect();
if addrs.is_empty() {
Err(anyhow::anyhow!("Unable to resolve host: {host_port}"))
} else {
Ok(addrs[0])
}
}
#[cfg(test)]
mod test {
use super::{parse_host, parse_host_port};
#[test]
fn test_parse_host_port() {
parse_host_port("localhost:1234").unwrap();
parse_host_port("localhost").unwrap_err();
parse_host_port("127.0.0.0:1234").unwrap();
parse_host_port("127.0.0.0").unwrap_err();
parse_host_port("[::]:1234").unwrap();
parse_host_port("fcs-ams1._peer.internal:1234").unwrap();
parse_host_port("fcs-ams1._peer.internal:8172").unwrap();
}
#[test]
fn test_parse_host() {
parse_host("127.0.0.1").unwrap();
parse_host("::").unwrap();
parse_host("localhost:1234").unwrap_err();
// parse_host("http://fcs-ams1._peer.internal").unwrap();
}
}

View File

@ -1,19 +1,15 @@
use std::{
net::SocketAddr,
str::FromStr,
time::{Duration, Instant},
};
use clap::Parser; use clap::Parser;
use cli::Args; use cli::Args;
use itertools::Itertools; use itertools::Itertools;
use quic_geyser_common::{ use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage}, channel_message::{AccountData, ChannelMessage},
config::{CompressionParameters, ConfigQuicPlugin, QuicParameters}, config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
net::parse_host_port,
}; };
use quic_geyser_server::quic_server::QuicServer; use quic_geyser_server::quic_server::QuicServer;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey}; use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
use std::time::{Duration, Instant};
pub mod cli; pub mod cli;
@ -22,7 +18,7 @@ pub fn main() {
let args = Args::parse(); let args = Args::parse();
let config = ConfigQuicPlugin { let config = ConfigQuicPlugin {
address: SocketAddr::from_str(format!("0.0.0.0:{}", args.port).as_str()).unwrap(), address: parse_host_port(format!("[::]:{}", args.port).as_str()).unwrap(),
log_level: "info".to_string(), log_level: "info".to_string(),
quic_parameters: QuicParameters { quic_parameters: QuicParameters {
max_number_of_streams_per_client: args.number_of_streams, max_number_of_streams_per_client: args.number_of_streams,

View File

@ -1,5 +1,3 @@
use std::{net::SocketAddr, str::FromStr};
use clap::Parser; use clap::Parser;
use cli::Args; use cli::Args;
use quic_geyser_blocking_client::client::Client; use quic_geyser_blocking_client::client::Client;
@ -7,6 +5,7 @@ use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage}, channel_message::{AccountData, ChannelMessage},
config::{CompressionParameters, ConfigQuicPlugin, QuicParameters}, config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
filters::Filter, filters::Filter,
net::parse_host_port,
types::connections_parameters::ConnectionParameters, types::connections_parameters::ConnectionParameters,
}; };
use quic_geyser_server::quic_server::QuicServer; use quic_geyser_server::quic_server::QuicServer;
@ -39,7 +38,7 @@ pub fn main() -> anyhow::Result<()> {
])?; ])?;
let quic_config = ConfigQuicPlugin { let quic_config = ConfigQuicPlugin {
address: SocketAddr::from_str(format!("0.0.0.0:{}", args.port).as_str()).unwrap(), address: parse_host_port(format!("[::]:{}", args.port).as_str()).unwrap(),
log_level: "info".to_string(), log_level: "info".to_string(),
quic_parameters: QuicParameters { quic_parameters: QuicParameters {
max_number_of_streams_per_client: args.max_number_of_streams_per_client, max_number_of_streams_per_client: args.max_number_of_streams_per_client,