client: Add nonblocking udp client and connection trait (#25775)

* client: Add nonblocking udp client

* Address feedback
This commit is contained in:
Jon Cinque 2022-06-06 19:28:31 +02:00 committed by GitHub
parent 136eb43f7d
commit 67a11ce4b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 175 additions and 19 deletions

View File

@ -1,8 +1,9 @@
use {
crate::{
quic_client::QuicTpuConnection, tpu_connection::ClientStats, udp_client::UdpTpuConnection,
quic_client::QuicTpuConnection,
tpu_connection::{ClientStats, Connection},
udp_client::UdpTpuConnection,
},
enum_dispatch::enum_dispatch,
indexmap::map::IndexMap,
lazy_static::lazy_static,
rand::{thread_rng, Rng},
@ -20,12 +21,6 @@ use {
// Should be non-zero
static MAX_CONNECTIONS: usize = 1024;
#[enum_dispatch(TpuConnection)]
pub enum Connection {
UdpTpuConnection,
QuicTpuConnection,
}
#[derive(Default)]
pub struct ConnectionCacheStats {
cache_hits: AtomicU64,

View File

@ -2,3 +2,5 @@ pub mod pubsub_client;
pub mod quic_client;
pub mod rpc_client;
pub mod tpu_client;
pub mod tpu_connection;
pub mod udp_client;

View File

@ -0,0 +1,40 @@
//! Trait defining async send functions, to be used for UDP or QUIC sending
use {
crate::nonblocking::udp_client::UdpTpuConnection,
async_trait::async_trait,
enum_dispatch::enum_dispatch,
solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult},
std::net::SocketAddr,
};
// Due to the existence of `crate::connection_cache::Connection`, if this is named
// `Connection`, enum_dispatch gets confused between the two and throws errors when
// trying to convert later.
#[enum_dispatch]
pub enum NonblockingConnection {
UdpTpuConnection,
}
#[async_trait]
#[enum_dispatch(NonblockingConnection)]
pub trait TpuConnection {
fn tpu_addr(&self) -> &SocketAddr;
async fn serialize_and_send_transaction(
&self,
transaction: &VersionedTransaction,
) -> TransportResult<()> {
let wire_transaction =
bincode::serialize(transaction).expect("serialize Transaction in send_batch");
self.send_wire_transaction(&wire_transaction).await
}
async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync;
async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync;
}

View File

@ -0,0 +1,115 @@
//! Simple UDP client that communicates with the given UDP port with UDP and provides
//! an interface for sending transactions
use {
crate::nonblocking::tpu_connection::TpuConnection,
async_trait::async_trait,
core::iter::repeat,
solana_sdk::transport::Result as TransportResult,
solana_streamer::nonblocking::sendmmsg::batch_send,
std::net::{IpAddr, Ipv4Addr, SocketAddr},
tokio::net::UdpSocket,
};
pub struct UdpTpuConnection {
socket: UdpSocket,
addr: SocketAddr,
}
impl UdpTpuConnection {
pub fn new(tpu_addr: SocketAddr) -> Self {
let socket =
solana_net_utils::bind_in_validator_port_range(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
.unwrap();
socket.set_nonblocking(true).unwrap();
Self::new_with_std_socket(tpu_addr, socket)
}
pub fn new_with_std_socket(tpu_addr: SocketAddr, socket: std::net::UdpSocket) -> Self {
let socket = UdpSocket::from_std(socket).unwrap();
Self {
socket,
addr: tpu_addr,
}
}
}
#[async_trait]
impl TpuConnection for UdpTpuConnection {
fn tpu_addr(&self) -> &SocketAddr {
&self.addr
}
async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
self.socket
.send_to(wire_transaction.as_ref(), self.addr)
.await?;
Ok(())
}
async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect();
batch_send(&self.socket, &pkts).await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use {
crate::nonblocking::{tpu_connection::TpuConnection, udp_client::UdpTpuConnection},
solana_sdk::packet::{Packet, PACKET_DATA_SIZE},
solana_streamer::nonblocking::recvmmsg::recv_mmsg,
std::net::{IpAddr, Ipv4Addr},
tokio::net::UdpSocket,
};
async fn check_send_one(connection: &UdpTpuConnection, reader: &UdpSocket) {
let packet = vec![111u8; PACKET_DATA_SIZE];
connection.send_wire_transaction(&packet).await.unwrap();
let mut packets = vec![Packet::default(); 32];
let recv = recv_mmsg(reader, &mut packets[..]).await.unwrap();
assert_eq!(1, recv);
}
async fn check_send_batch(connection: &UdpTpuConnection, reader: &UdpSocket) {
let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
connection
.send_wire_transaction_batch(&packets)
.await
.unwrap();
let mut packets = vec![Packet::default(); 32];
let recv = recv_mmsg(reader, &mut packets[..]).await.unwrap();
assert_eq!(32, recv);
}
#[tokio::test]
async fn test_send_from_addr() {
let addr_str = "0.0.0.0:50100";
let addr = addr_str.parse().unwrap();
let connection = UdpTpuConnection::new(addr);
let reader = UdpSocket::bind(addr_str).await.expect("bind");
check_send_one(&connection, &reader).await;
check_send_batch(&connection, &reader).await;
}
#[tokio::test]
async fn test_send_from_socket() {
let addr_str = "0.0.0.0:50101";
let addr = addr_str.parse().unwrap();
let socket =
solana_net_utils::bind_in_validator_port_range(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
.unwrap();
socket.set_nonblocking(true).unwrap();
let connection = UdpTpuConnection::new_with_std_socket(addr, socket);
let reader = UdpSocket::bind(addr_str).await.expect("bind");
check_send_one(&connection, &reader).await;
check_send_batch(&connection, &reader).await;
}
}

View File

@ -1,7 +1,5 @@
use {
crate::{
connection_cache::Connection, quic_client::QuicTpuConnection, udp_client::UdpTpuConnection,
},
crate::{quic_client::QuicTpuConnection, udp_client::UdpTpuConnection},
enum_dispatch::enum_dispatch,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_metrics::MovingStat,
@ -26,6 +24,12 @@ pub struct ClientStats {
}
#[enum_dispatch]
pub enum Connection {
UdpTpuConnection,
QuicTpuConnection,
}
#[enum_dispatch(Connection)]
pub trait TpuConnection {
fn tpu_addr(&self) -> &SocketAddr;

View File

@ -4,7 +4,6 @@
use {
crate::{connection_cache::ConnectionCacheStats, tpu_connection::TpuConnection},
core::iter::repeat,
solana_net_utils::VALIDATOR_PORT_RANGE,
solana_sdk::transport::Result as TransportResult,
solana_streamer::sendmmsg::batch_send,
std::{
@ -20,14 +19,11 @@ pub struct UdpTpuConnection {
impl UdpTpuConnection {
pub fn new_from_addr(tpu_addr: SocketAddr) -> Self {
let (_, client_socket) = solana_net_utils::bind_in_range(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
VALIDATOR_PORT_RANGE,
)
.unwrap();
let socket =
solana_net_utils::bind_in_validator_port_range(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
.unwrap();
Self {
socket: client_socket,
socket,
addr: tpu_addr,
}
}

View File

@ -439,6 +439,10 @@ pub fn bind_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<(u16, UdpS
))
}
pub fn bind_in_validator_port_range(ip_addr: IpAddr) -> io::Result<UdpSocket> {
bind_in_range(ip_addr, VALIDATOR_PORT_RANGE).map(|(_, socket)| socket)
}
// binds many sockets to the same port in a range
pub fn multi_bind_in_range(
ip_addr: IpAddr,