Adding a test for transfer between server and client
This commit is contained in:
parent
55085c9120
commit
f1d31b33d6
|
@ -778,6 +778,22 @@ dependencies = [
|
|||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "3.2.25"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"bitflags 1.3.2",
|
||||
"clap_lex 0.2.4",
|
||||
"indexmap 1.9.3",
|
||||
"once_cell",
|
||||
"strsim 0.10.0",
|
||||
"termcolor",
|
||||
"textwrap",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "4.5.4"
|
||||
|
@ -796,7 +812,7 @@ checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4"
|
|||
dependencies = [
|
||||
"anstream",
|
||||
"anstyle",
|
||||
"clap_lex",
|
||||
"clap_lex 0.7.0",
|
||||
"strsim 0.11.1",
|
||||
]
|
||||
|
||||
|
@ -812,6 +828,15 @@ dependencies = [
|
|||
"syn 2.0.61",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_lex"
|
||||
version = "0.2.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5"
|
||||
dependencies = [
|
||||
"os_str_bytes",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_lex"
|
||||
version = "0.7.0"
|
||||
|
@ -1384,7 +1409,7 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"agave-geyser-plugin-interface",
|
||||
"anyhow",
|
||||
"clap",
|
||||
"clap 4.5.4",
|
||||
"quic-geyser-common",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
@ -1421,7 +1446,7 @@ dependencies = [
|
|||
"futures-sink",
|
||||
"futures-util",
|
||||
"http",
|
||||
"indexmap",
|
||||
"indexmap 2.2.6",
|
||||
"slab",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
|
@ -1446,6 +1471,12 @@ dependencies = [
|
|||
"ahash 0.7.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.13.2"
|
||||
|
@ -1651,6 +1682,16 @@ dependencies = [
|
|||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "1.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"hashbrown 0.12.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "2.2.6"
|
||||
|
@ -2125,6 +2166,12 @@ version = "0.1.5"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "os_str_bytes"
|
||||
version = "6.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1"
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.12.2"
|
||||
|
@ -2355,11 +2402,13 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"anyhow",
|
||||
"bincode",
|
||||
"log",
|
||||
"lz4",
|
||||
"pem 3.0.4",
|
||||
"quinn",
|
||||
"rustls",
|
||||
"serde",
|
||||
"solana-net-utils",
|
||||
"solana-sdk",
|
||||
"solana-streamer",
|
||||
"tokio",
|
||||
|
@ -3092,6 +3141,28 @@ dependencies = [
|
|||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "solana-net-utils"
|
||||
version = "1.18.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b1f2634fd50743e2ca075e663e07b0bd5c2f94db0ac320ce5bc2022e0002d82d"
|
||||
dependencies = [
|
||||
"bincode",
|
||||
"clap 3.2.25",
|
||||
"crossbeam-channel",
|
||||
"log",
|
||||
"nix",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"socket2",
|
||||
"solana-logger",
|
||||
"solana-sdk",
|
||||
"solana-version",
|
||||
"tokio",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "solana-perf"
|
||||
version = "1.18.12"
|
||||
|
@ -3299,7 +3370,7 @@ dependencies = [
|
|||
"crossbeam-channel",
|
||||
"futures-util",
|
||||
"histogram",
|
||||
"indexmap",
|
||||
"indexmap 2.2.6",
|
||||
"itertools",
|
||||
"libc",
|
||||
"log",
|
||||
|
@ -3346,6 +3417,22 @@ dependencies = [
|
|||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "solana-version"
|
||||
version = "1.18.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "42c7cef8aa9f1c633bf09dd91b8e635b6b30c40236652031b1800b245dc1bd02"
|
||||
dependencies = [
|
||||
"log",
|
||||
"rustc_version",
|
||||
"semver",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"solana-frozen-abi",
|
||||
"solana-frozen-abi-macro",
|
||||
"solana-sdk",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "solana-vote-program"
|
||||
version = "1.18.12"
|
||||
|
@ -3745,6 +3832,12 @@ dependencies = [
|
|||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "textwrap"
|
||||
version = "0.16.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "23d434d3f8967a09480fb04132ebe0a3e088c173e6d0ee7897abbdf4eab0f8b9"
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.60"
|
||||
|
@ -3904,7 +3997,7 @@ version = "0.19.15"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421"
|
||||
dependencies = [
|
||||
"indexmap",
|
||||
"indexmap 2.2.6",
|
||||
"toml_datetime",
|
||||
"winnow",
|
||||
]
|
||||
|
@ -3915,7 +4008,7 @@ version = "0.21.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1"
|
||||
dependencies = [
|
||||
"indexmap",
|
||||
"indexmap 2.2.6",
|
||||
"toml_datetime",
|
||||
"winnow",
|
||||
]
|
||||
|
|
|
@ -18,7 +18,9 @@ edition = "2021"
|
|||
tokio = "1.37.0"
|
||||
solana-sdk = "~1.18.11"
|
||||
agave-geyser-plugin-interface = "~1.18.11"
|
||||
solana-net-utils = "~1.18.11"
|
||||
solana-streamer = "~1.18.11"
|
||||
|
||||
itertools = "0.10.5"
|
||||
serde = "1.0.201"
|
||||
clap = "4.2.4"
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
|
|
@ -8,6 +8,7 @@ edition = "2021"
|
|||
[dependencies]
|
||||
solana-sdk = { workspace = "true" }
|
||||
solana-streamer = { workspace = "true" }
|
||||
solana-net-utils = { workspace = "true" }
|
||||
serde = { workspace = "true" }
|
||||
bincode = { workspace = "true" }
|
||||
lz4 = { workspace = "true" }
|
||||
|
@ -16,5 +17,6 @@ rustls = { workspace = "true", default-features = false }
|
|||
pem = { workspace = "true" }
|
||||
anyhow = { workspace = "true" }
|
||||
tokio = { workspace = "true" }
|
||||
log = { workspace = "true" }
|
||||
|
||||
[dev-dependencies]
|
|
@ -8,7 +8,7 @@ pub enum CompressionType {
|
|||
}
|
||||
|
||||
impl Default for CompressionType {
|
||||
fn default() -> Self{
|
||||
fn default() -> Self {
|
||||
Self::Lz4Fast(8)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,4 +7,4 @@ use solana_sdk::pubkey::Pubkey;
|
|||
pub struct AccountFilter {
|
||||
owner: Option<Pubkey>,
|
||||
accounts: Option<HashSet<Pubkey>>,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
pub mod filters;
|
||||
pub mod types;
|
||||
pub mod compression;
|
||||
pub mod filters;
|
||||
pub mod message;
|
||||
pub mod quic;
|
||||
pub mod quic;
|
||||
pub mod types;
|
||||
|
|
|
@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
|
|||
|
||||
use crate::types::account::Account;
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub enum Message {
|
||||
AccountMsg(Account),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
use std::{net::{IpAddr, Ipv4Addr}, sync::Arc, time::Duration};
|
||||
|
||||
use quinn::{ClientConfig, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt};
|
||||
use solana_sdk::signature::Keypair;
|
||||
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
|
||||
|
||||
use crate::quic::{configure_server::ALPN_GEYSER_PROTOCOL_ID, skip_verification::ClientSkipServerVerification};
|
||||
|
||||
pub fn create_client_endpoint(certificate: rustls::Certificate, key: rustls::PrivateKey) -> Endpoint {
|
||||
const DATAGRAM_RECEIVE_BUFFER_SIZE: usize = 64 * 1024 * 1024;
|
||||
const DATAGRAM_SEND_BUFFER_SIZE: usize = 64 * 1024 * 1024;
|
||||
const INITIAL_MAXIMUM_TRANSMISSION_UNIT: u16 = MINIMUM_MAXIMUM_TRANSMISSION_UNIT;
|
||||
const MINIMUM_MAXIMUM_TRANSMISSION_UNIT: u16 = 2000;
|
||||
|
||||
let mut endpoint = {
|
||||
let client_socket =
|
||||
solana_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::UNSPECIFIED), (8000, 10000))
|
||||
.expect("create_endpoint bind_in_range")
|
||||
.1;
|
||||
let mut config = EndpointConfig::default();
|
||||
config
|
||||
.max_udp_payload_size(MINIMUM_MAXIMUM_TRANSMISSION_UNIT)
|
||||
.expect("Should set max MTU");
|
||||
quinn::Endpoint::new(config, None, client_socket, Arc::new(TokioRuntime))
|
||||
.expect("create_endpoint quinn::Endpoint::new")
|
||||
};
|
||||
|
||||
let mut crypto = rustls::ClientConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_custom_certificate_verifier(Arc::new(ClientSkipServerVerification {}))
|
||||
.with_client_auth_cert(vec![certificate], key)
|
||||
.unwrap();
|
||||
crypto.enable_early_data = true;
|
||||
crypto.alpn_protocols = vec![ALPN_GEYSER_PROTOCOL_ID.to_vec()];
|
||||
|
||||
let mut config = ClientConfig::new(Arc::new(crypto));
|
||||
let mut transport_config = TransportConfig::default();
|
||||
|
||||
let timeout = IdleTimeout::try_from(Duration::from_secs(600)).unwrap();
|
||||
transport_config.max_idle_timeout(Some(timeout));
|
||||
transport_config.keep_alive_interval(Some(Duration::from_millis(500)));
|
||||
transport_config.datagram_receive_buffer_size(Some(DATAGRAM_RECEIVE_BUFFER_SIZE));
|
||||
transport_config.datagram_send_buffer_size(DATAGRAM_SEND_BUFFER_SIZE);
|
||||
transport_config.initial_mtu(INITIAL_MAXIMUM_TRANSMISSION_UNIT);
|
||||
transport_config.max_concurrent_bidi_streams(VarInt::from(0u8));
|
||||
transport_config.max_concurrent_uni_streams(VarInt::from(0u8));
|
||||
transport_config.min_mtu(MINIMUM_MAXIMUM_TRANSMISSION_UNIT);
|
||||
transport_config.mtu_discovery_config(None);
|
||||
transport_config.enable_segmentation_offload(false);
|
||||
config.transport_config(Arc::new(transport_config));
|
||||
|
||||
endpoint.set_default_client_config(config);
|
||||
|
||||
endpoint
|
||||
}
|
||||
|
||||
pub async fn configure_client(
|
||||
identity: &Keypair
|
||||
) -> anyhow::Result<Endpoint> {
|
||||
let (certificate, key) = new_self_signed_tls_certificate(
|
||||
&identity,
|
||||
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
||||
)?;
|
||||
Ok(create_client_endpoint(certificate, key))
|
||||
}
|
|
@ -3,13 +3,15 @@ use std::{net::IpAddr, sync::Arc, time::Duration};
|
|||
use pem::Pem;
|
||||
use quinn::{IdleTimeout, ServerConfig};
|
||||
use solana_sdk::{pubkey::Pubkey, signature::Keypair};
|
||||
use solana_streamer::{quic::QuicServerError, tls_certificates::{get_pubkey_from_tls_certificate, new_self_signed_tls_certificate}};
|
||||
use solana_streamer::{
|
||||
quic::QuicServerError,
|
||||
tls_certificates::{get_pubkey_from_tls_certificate, new_self_signed_tls_certificate},
|
||||
};
|
||||
|
||||
use super::skip_client_verification::SkipClientVerification;
|
||||
use super::skip_verification::ServerSkipClientVerification;
|
||||
|
||||
pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"quic_geyser_plugin";
|
||||
|
||||
|
||||
pub fn configure_server(
|
||||
identity_keypair: &Keypair,
|
||||
host: IpAddr,
|
||||
|
@ -24,7 +26,7 @@ pub fn configure_server(
|
|||
|
||||
let mut server_tls_config = rustls::ServerConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_client_cert_verifier(SkipClientVerification::new())
|
||||
.with_client_cert_verifier(ServerSkipClientVerification::new())
|
||||
.with_single_cert(vec![cert], priv_key)?;
|
||||
server_tls_config.alpn_protocols = vec![ALPN_GEYSER_PROTOCOL_ID.to_vec()];
|
||||
|
||||
|
@ -32,11 +34,11 @@ pub fn configure_server(
|
|||
server_config.use_retry(true);
|
||||
let config = Arc::get_mut(&mut server_config.transport).unwrap();
|
||||
|
||||
config.max_concurrent_uni_streams((0 as u32).into());
|
||||
config.max_concurrent_uni_streams((max_concurrent_streams as u32).into());
|
||||
let recv_size = (recieve_window_size as u32).into();
|
||||
config.stream_receive_window(recv_size);
|
||||
config.receive_window(recv_size);
|
||||
|
||||
|
||||
let timeout = Duration::from_secs(connection_timeout);
|
||||
let timeout = IdleTimeout::try_from(timeout).unwrap();
|
||||
config.max_idle_timeout(Some(timeout));
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
pub mod configure_client;
|
||||
pub mod configure_server;
|
||||
pub mod quinn_reciever;
|
||||
pub mod quinn_sender;
|
||||
pub mod skip_client_verification;
|
||||
pub mod skip_verification;
|
||||
|
|
|
@ -1,27 +1,168 @@
|
|||
use quinn::SendStream;
|
||||
use anyhow::bail;
|
||||
use quinn::RecvStream;
|
||||
|
||||
use crate::message::Message;
|
||||
|
||||
pub fn convert_to_binary(message: Message) -> anyhow::Result<Vec<u8>> {
|
||||
let mut binary = bincode::serialize(&message)?;
|
||||
let size = binary.len() as u64;
|
||||
// prepend size to the binary object
|
||||
let size_bytes = size.to_le_bytes().to_vec();
|
||||
binary.splice(0..0, size_bytes);
|
||||
Ok(binary)
|
||||
pub fn convert_binary_to_message(bytes: Vec<u8>) -> anyhow::Result<Message> {
|
||||
Ok(bincode::deserialize::<Message>(&bytes)?)
|
||||
}
|
||||
|
||||
pub async fn send_message(mut send_stream : SendStream, message: Message) -> anyhow::Result<()> {
|
||||
let binary = convert_to_binary(message)?;
|
||||
send_stream.write_all(&binary).await?;
|
||||
send_stream.finish().await?;
|
||||
Ok(())
|
||||
pub async fn recv_message(mut recv_stream: RecvStream) -> anyhow::Result<Message> {
|
||||
let chunk = recv_stream.read_chunk(8, true).await?;
|
||||
if let Some(chunk) = chunk {
|
||||
assert!(chunk.offset == 0);
|
||||
let size_bytes = chunk.bytes.to_vec();
|
||||
assert!(size_bytes.len() == 8);
|
||||
|
||||
let size_bytes: [u8; 8] = size_bytes.try_into().unwrap();
|
||||
let size = u64::from_le_bytes(size_bytes) as usize;
|
||||
let mut buffer: Vec<u8> = vec![0; size];
|
||||
while let Some(data) = recv_stream.read_chunk(size, false).await? {
|
||||
let bytes = data.bytes.to_vec();
|
||||
let offset = data.offset - 8;
|
||||
let begin_offset = offset as usize;
|
||||
let end_offset = offset as usize + data.bytes.len();
|
||||
buffer[begin_offset..end_offset].copy_from_slice(&bytes);
|
||||
}
|
||||
convert_binary_to_message(buffer)
|
||||
} else {
|
||||
bail!("Stream was finished")
|
||||
}
|
||||
}
|
||||
|
||||
mod tests {
|
||||
|
||||
use std::{
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use quinn::{Endpoint, EndpointConfig, TokioRuntime, VarInt};
|
||||
use solana_sdk::{hash::Hash, pubkey::Pubkey, signature::Keypair};
|
||||
|
||||
use crate::{
|
||||
message::Message,
|
||||
quic::{
|
||||
configure_client::configure_client, configure_server::configure_server,
|
||||
quinn_reciever::recv_message, quinn_sender::send_message,
|
||||
},
|
||||
types::{account::Account, slot_identifier::SlotIdentifier},
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn test_send_and_recieve_of_small_account() {
|
||||
let (config, _) = configure_server(
|
||||
&Keypair::new(),
|
||||
IpAddr::V4(Ipv4Addr::LOCALHOST),
|
||||
1,
|
||||
100000,
|
||||
1,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let sock = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let port = sock.local_addr().unwrap().port();
|
||||
|
||||
let endpoint = Endpoint::new(
|
||||
EndpointConfig::default(),
|
||||
Some(config),
|
||||
sock,
|
||||
Arc::new(TokioRuntime),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let account = Account {
|
||||
slot_identifier: SlotIdentifier {
|
||||
slot: 12345678,
|
||||
blockhash: Hash::new_unique(),
|
||||
},
|
||||
pubkey: Pubkey::new_unique(),
|
||||
data: vec![6; 2],
|
||||
};
|
||||
let message = Message::AccountMsg(account);
|
||||
|
||||
let jh = {
|
||||
let sent_message = message.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let endpoint = configure_client(&Keypair::new()).await.unwrap();
|
||||
|
||||
let connecting = endpoint
|
||||
.connect(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port), "tmp")
|
||||
.unwrap();
|
||||
let connection = connecting.await.unwrap();
|
||||
let send_stream = connection.open_uni().await.unwrap();
|
||||
send_message(send_stream, sent_message).await.unwrap();
|
||||
})
|
||||
};
|
||||
|
||||
let connecting = endpoint.accept().await.unwrap();
|
||||
let connection = connecting.await.unwrap();
|
||||
let recv_stream = connection.accept_uni().await.unwrap();
|
||||
|
||||
let recved_message = recv_message(recv_stream).await.unwrap();
|
||||
jh.await.unwrap();
|
||||
// assert if sent and recieved message match
|
||||
assert_eq!(message, recved_message);
|
||||
endpoint.close(VarInt::from_u32(0), b"");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn test_send_and_recieve_of_large_account() {
|
||||
let (config, _) = configure_server(
|
||||
&Keypair::new(),
|
||||
IpAddr::V4(Ipv4Addr::LOCALHOST),
|
||||
1,
|
||||
100000,
|
||||
60,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let sock = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let port = sock.local_addr().unwrap().port();
|
||||
|
||||
let endpoint = Endpoint::new(
|
||||
EndpointConfig::default(),
|
||||
Some(config),
|
||||
sock,
|
||||
Arc::new(TokioRuntime),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let account = Account {
|
||||
slot_identifier: SlotIdentifier {
|
||||
slot: 12345678,
|
||||
blockhash: Hash::new_unique(),
|
||||
},
|
||||
pubkey: Pubkey::new_unique(),
|
||||
data: vec![6; 100_000_000],
|
||||
};
|
||||
let message = Message::AccountMsg(account);
|
||||
|
||||
let jh = {
|
||||
let sent_message = message.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let endpoint = configure_client(&Keypair::new()).await.unwrap();
|
||||
|
||||
let connecting = endpoint
|
||||
.connect(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port), "tmp")
|
||||
.unwrap();
|
||||
let connection = connecting.await.unwrap();
|
||||
let send_stream = connection.open_uni().await.unwrap();
|
||||
send_message(send_stream, sent_message).await.unwrap();
|
||||
})
|
||||
};
|
||||
|
||||
let connecting = endpoint.accept().await.unwrap();
|
||||
let connection = connecting.await.unwrap();
|
||||
let recv_stream = connection.accept_uni().await.unwrap();
|
||||
|
||||
let recved_message = recv_message(recv_stream).await.unwrap();
|
||||
jh.await.unwrap();
|
||||
// assert if sent and recieved message match
|
||||
assert_eq!(message, recved_message);
|
||||
endpoint.close(VarInt::from_u32(0), b"");
|
||||
}
|
||||
|
||||
}
|
|
@ -1,30 +1,19 @@
|
|||
use anyhow::bail;
|
||||
use quinn::RecvStream;
|
||||
use quinn::SendStream;
|
||||
|
||||
use crate::message::Message;
|
||||
|
||||
pub fn convert_binary_to_message(bytes: Vec<u8>) -> anyhow::Result<Message> {
|
||||
Ok(bincode::deserialize::<Message>(&bytes)?)
|
||||
pub fn convert_to_binary(message: Message) -> anyhow::Result<Vec<u8>> {
|
||||
let mut binary = bincode::serialize(&message)?;
|
||||
let size = binary.len() as u64;
|
||||
// prepend size to the binary object
|
||||
let size_bytes = size.to_le_bytes().to_vec();
|
||||
binary.splice(0..0, size_bytes);
|
||||
Ok(binary)
|
||||
}
|
||||
|
||||
pub async fn recv_message(mut recv_stream: RecvStream) -> anyhow::Result<Message> {
|
||||
let chunk = recv_stream.read_chunk(8, true).await?;
|
||||
if let Some(chunk) = chunk {
|
||||
assert!(chunk.offset == 0);
|
||||
let size_bytes = chunk.bytes.to_vec();
|
||||
assert!(size_bytes.len() == 8);
|
||||
|
||||
let size_bytes: [u8; 8] = size_bytes.try_into().unwrap();
|
||||
let size = u64::from_le_bytes(size_bytes) as usize;
|
||||
let mut buffer: Vec<u8> = vec![0; size];
|
||||
while let Some(data) = recv_stream.read_chunk(size, false).await? {
|
||||
let bytes = data.bytes.to_vec();
|
||||
let begin_offset = data.offset as usize;
|
||||
let end_offset = data.offset as usize + data.bytes.len();
|
||||
buffer[begin_offset..end_offset].copy_from_slice(&bytes);
|
||||
}
|
||||
convert_binary_to_message(buffer)
|
||||
} else {
|
||||
bail!("Stream was finished")
|
||||
}
|
||||
}
|
||||
pub async fn send_message(mut send_stream: SendStream, message: Message) -> anyhow::Result<()> {
|
||||
let binary = convert_to_binary(message)?;
|
||||
send_stream.write_all(&binary).await?;
|
||||
send_stream.finish().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,26 +0,0 @@
|
|||
use std::{sync::Arc, time::SystemTime};
|
||||
|
||||
use rustls::{server::ClientCertVerified, Certificate, DistinguishedName};
|
||||
|
||||
pub struct SkipClientVerification;
|
||||
|
||||
impl SkipClientVerification {
|
||||
pub fn new() -> Arc<Self> {
|
||||
Arc::new(Self)
|
||||
}
|
||||
}
|
||||
|
||||
impl rustls::server::ClientCertVerifier for SkipClientVerification {
|
||||
fn client_auth_root_subjects(&self) -> &[DistinguishedName] {
|
||||
&[]
|
||||
}
|
||||
|
||||
fn verify_client_cert(
|
||||
&self,
|
||||
_end_entity: &Certificate,
|
||||
_intermediates: &[Certificate],
|
||||
_now: SystemTime,
|
||||
) -> Result<ClientCertVerified, rustls::Error> {
|
||||
Ok(rustls::server::ClientCertVerified::assertion())
|
||||
}
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
use std::{sync::Arc, time::SystemTime};
|
||||
|
||||
use rustls::{server::ClientCertVerified, Certificate, DistinguishedName};
|
||||
|
||||
pub struct ServerSkipClientVerification;
|
||||
|
||||
impl ServerSkipClientVerification {
|
||||
pub fn new() -> Arc<Self> {
|
||||
Arc::new(Self)
|
||||
}
|
||||
}
|
||||
|
||||
impl rustls::server::ClientCertVerifier for ServerSkipClientVerification {
|
||||
fn client_auth_root_subjects(&self) -> &[DistinguishedName] {
|
||||
&[]
|
||||
}
|
||||
|
||||
fn verify_client_cert(
|
||||
&self,
|
||||
_end_entity: &Certificate,
|
||||
_intermediates: &[Certificate],
|
||||
_now: SystemTime,
|
||||
) -> Result<ClientCertVerified, rustls::Error> {
|
||||
Ok(rustls::server::ClientCertVerified::assertion())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ClientSkipServerVerification;
|
||||
|
||||
impl ClientSkipServerVerification {
|
||||
pub fn new() -> Arc<Self> {
|
||||
Arc::new(Self)
|
||||
}
|
||||
}
|
||||
|
||||
impl rustls::client::ServerCertVerifier for ClientSkipServerVerification {
|
||||
fn verify_server_cert(
|
||||
&self,
|
||||
_end_entity: &rustls::Certificate,
|
||||
_intermediates: &[rustls::Certificate],
|
||||
_server_name: &rustls::ServerName,
|
||||
_scts: &mut dyn Iterator<Item = &[u8]>,
|
||||
_ocsp_response: &[u8],
|
||||
_now: std::time::SystemTime,
|
||||
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
|
||||
Ok(rustls::client::ServerCertVerified::assertion())
|
||||
}
|
||||
}
|
|
@ -3,9 +3,9 @@ use solana_sdk::pubkey::Pubkey;
|
|||
|
||||
use super::slot_identifier::SlotIdentifier;
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct Account {
|
||||
slot_identifier: SlotIdentifier,
|
||||
pubkey: Pubkey,
|
||||
data: Vec<u8>,
|
||||
}
|
||||
pub slot_identifier: SlotIdentifier,
|
||||
pub pubkey: Pubkey,
|
||||
pub data: Vec<u8>,
|
||||
}
|
||||
|
|
|
@ -1,2 +1,2 @@
|
|||
pub mod account;
|
||||
pub mod slot_identifier;
|
||||
pub mod slot_identifier;
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
use solana_sdk::hash::Hash;
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Copy)]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct SlotIdentifier {
|
||||
slot: u64,
|
||||
blockhash : Hash,
|
||||
}
|
||||
pub slot: u64,
|
||||
pub blockhash: Hash,
|
||||
}
|
||||
|
|
|
@ -1,4 +1,8 @@
|
|||
use std::{fs::read_to_string, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, path::Path};
|
||||
use std::{
|
||||
fs::read_to_string,
|
||||
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
|
||||
path::Path,
|
||||
};
|
||||
|
||||
use agave_geyser_plugin_interface::geyser_plugin_interface::GeyserPluginError;
|
||||
use quic_geyser_common::compression::CompressionType;
|
||||
|
@ -31,18 +35,17 @@ pub struct ConfigQuicPlugin {
|
|||
#[serde(default = "ConfigQuicPlugin::default_address")]
|
||||
pub address: SocketAddr,
|
||||
#[serde(default)]
|
||||
pub quic_parameters : QuicParameters,
|
||||
pub quic_parameters: QuicParameters,
|
||||
#[serde(default)]
|
||||
pub compression_parameters: CompressionParameters,
|
||||
}
|
||||
|
||||
impl ConfigQuicPlugin {
|
||||
fn default_address()-> SocketAddr {
|
||||
fn default_address() -> SocketAddr {
|
||||
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 10800))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct QuicParameters {
|
||||
pub max_number_of_streams_per_client: u64,
|
||||
|
@ -50,8 +53,8 @@ pub struct QuicParameters {
|
|||
|
||||
impl Default for QuicParameters {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_number_of_streams_per_client: 4096
|
||||
Self {
|
||||
max_number_of_streams_per_client: 4096,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -59,4 +62,4 @@ impl Default for QuicParameters {
|
|||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
pub struct CompressionParameters {
|
||||
compression_type: CompressionType,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,2 +1,2 @@
|
|||
pub mod config;
|
||||
pub mod quic;
|
||||
pub mod config;
|
|
@ -1 +1 @@
|
|||
mod quic_server;
|
||||
mod quic_server;
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
|
Loading…
Reference in New Issue