Enable connections from server to client
This commit is contained in:
parent
f1d31b33d6
commit
211f439d37
|
@ -6,7 +6,7 @@ use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
|
||||||
|
|
||||||
use crate::quic::{configure_server::ALPN_GEYSER_PROTOCOL_ID, skip_verification::ClientSkipServerVerification};
|
use crate::quic::{configure_server::ALPN_GEYSER_PROTOCOL_ID, skip_verification::ClientSkipServerVerification};
|
||||||
|
|
||||||
pub fn create_client_endpoint(certificate: rustls::Certificate, key: rustls::PrivateKey) -> Endpoint {
|
pub fn create_client_endpoint(certificate: rustls::Certificate, key: rustls::PrivateKey, maximum_streams: u32) -> Endpoint {
|
||||||
const DATAGRAM_RECEIVE_BUFFER_SIZE: usize = 64 * 1024 * 1024;
|
const DATAGRAM_RECEIVE_BUFFER_SIZE: usize = 64 * 1024 * 1024;
|
||||||
const DATAGRAM_SEND_BUFFER_SIZE: usize = 64 * 1024 * 1024;
|
const DATAGRAM_SEND_BUFFER_SIZE: usize = 64 * 1024 * 1024;
|
||||||
const INITIAL_MAXIMUM_TRANSMISSION_UNIT: u16 = MINIMUM_MAXIMUM_TRANSMISSION_UNIT;
|
const INITIAL_MAXIMUM_TRANSMISSION_UNIT: u16 = MINIMUM_MAXIMUM_TRANSMISSION_UNIT;
|
||||||
|
@ -38,12 +38,12 @@ pub fn create_client_endpoint(certificate: rustls::Certificate, key: rustls::Pri
|
||||||
|
|
||||||
let timeout = IdleTimeout::try_from(Duration::from_secs(600)).unwrap();
|
let timeout = IdleTimeout::try_from(Duration::from_secs(600)).unwrap();
|
||||||
transport_config.max_idle_timeout(Some(timeout));
|
transport_config.max_idle_timeout(Some(timeout));
|
||||||
transport_config.keep_alive_interval(Some(Duration::from_millis(500)));
|
transport_config.keep_alive_interval(Some(Duration::from_secs(1)));
|
||||||
transport_config.datagram_receive_buffer_size(Some(DATAGRAM_RECEIVE_BUFFER_SIZE));
|
transport_config.datagram_receive_buffer_size(Some(DATAGRAM_RECEIVE_BUFFER_SIZE));
|
||||||
transport_config.datagram_send_buffer_size(DATAGRAM_SEND_BUFFER_SIZE);
|
transport_config.datagram_send_buffer_size(DATAGRAM_SEND_BUFFER_SIZE);
|
||||||
transport_config.initial_mtu(INITIAL_MAXIMUM_TRANSMISSION_UNIT);
|
transport_config.initial_mtu(INITIAL_MAXIMUM_TRANSMISSION_UNIT);
|
||||||
transport_config.max_concurrent_bidi_streams(VarInt::from(0u8));
|
transport_config.max_concurrent_bidi_streams(VarInt::from(maximum_streams));
|
||||||
transport_config.max_concurrent_uni_streams(VarInt::from(0u8));
|
transport_config.max_concurrent_uni_streams(VarInt::from(maximum_streams));
|
||||||
transport_config.min_mtu(MINIMUM_MAXIMUM_TRANSMISSION_UNIT);
|
transport_config.min_mtu(MINIMUM_MAXIMUM_TRANSMISSION_UNIT);
|
||||||
transport_config.mtu_discovery_config(None);
|
transport_config.mtu_discovery_config(None);
|
||||||
transport_config.enable_segmentation_offload(false);
|
transport_config.enable_segmentation_offload(false);
|
||||||
|
@ -55,11 +55,12 @@ pub fn create_client_endpoint(certificate: rustls::Certificate, key: rustls::Pri
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn configure_client(
|
pub async fn configure_client(
|
||||||
identity: &Keypair
|
identity: &Keypair,
|
||||||
|
maximum_concurrent_streams: u32,
|
||||||
) -> anyhow::Result<Endpoint> {
|
) -> anyhow::Result<Endpoint> {
|
||||||
let (certificate, key) = new_self_signed_tls_certificate(
|
let (certificate, key) = new_self_signed_tls_certificate(
|
||||||
&identity,
|
&identity,
|
||||||
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
||||||
)?;
|
)?;
|
||||||
Ok(create_client_endpoint(certificate, key))
|
Ok(create_client_endpoint(certificate, key, maximum_concurrent_streams))
|
||||||
}
|
}
|
|
@ -85,26 +85,25 @@ mod tests {
|
||||||
let sent_message = message.clone();
|
let sent_message = message.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let endpoint = configure_client(&Keypair::new()).await.unwrap();
|
let endpoint = configure_client(&Keypair::new(), 1).await.unwrap();
|
||||||
|
|
||||||
let connecting = endpoint
|
let connecting = endpoint
|
||||||
.connect(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port), "tmp")
|
.connect(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port), "tmp")
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let connection = connecting.await.unwrap();
|
let connection = connecting.await.unwrap();
|
||||||
let send_stream = connection.open_uni().await.unwrap();
|
let recv_stream = connection.accept_uni().await.unwrap();
|
||||||
send_message(send_stream, sent_message).await.unwrap();
|
let recved_message = recv_message(recv_stream).await.unwrap();
|
||||||
|
// assert if sent and recieved message match
|
||||||
|
assert_eq!(sent_message, recved_message);
|
||||||
})
|
})
|
||||||
};
|
};
|
||||||
|
|
||||||
let connecting = endpoint.accept().await.unwrap();
|
let connecting = endpoint.accept().await.unwrap();
|
||||||
let connection = connecting.await.unwrap();
|
let connection = connecting.await.unwrap();
|
||||||
let recv_stream = connection.accept_uni().await.unwrap();
|
|
||||||
|
|
||||||
let recved_message = recv_message(recv_stream).await.unwrap();
|
let send_stream = connection.open_uni().await.unwrap();
|
||||||
|
send_message(send_stream, message).await.unwrap();
|
||||||
jh.await.unwrap();
|
jh.await.unwrap();
|
||||||
// assert if sent and recieved message match
|
|
||||||
assert_eq!(message, recved_message);
|
|
||||||
endpoint.close(VarInt::from_u32(0), b"");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
@ -143,7 +142,7 @@ mod tests {
|
||||||
let sent_message = message.clone();
|
let sent_message = message.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let endpoint = configure_client(&Keypair::new()).await.unwrap();
|
let endpoint = configure_client(&Keypair::new(), 0).await.unwrap();
|
||||||
|
|
||||||
let connecting = endpoint
|
let connecting = endpoint
|
||||||
.connect(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port), "tmp")
|
.connect(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port), "tmp")
|
||||||
|
|
Loading…
Reference in New Issue