diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 7a219dc3ea..c070c32a10 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -23,7 +23,10 @@ use solana_poh::poh_recorder::WorkingBankEntry; use solana_runtime::{bank::Bank, bank_forks::BankForks}; use solana_sdk::timing::timestamp; use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair}; -use solana_streamer::sendmmsg::{batch_send, SendPktsError}; +use solana_streamer::{ + sendmmsg::{batch_send, SendPktsError}, + socket::is_global, +}; use std::sync::atomic::AtomicU64; use std::{ collections::HashMap, @@ -415,7 +418,11 @@ pub fn broadcast_shreds( .filter_map(|shred| { let seed = shred.seed(Some(self_pubkey), &root_bank); let node = cluster_nodes.get_broadcast_peer(seed)?; - Some((&shred.payload[..], &node.tvu)) + if is_global(&node.tvu) { + Some((&shred.payload[..], &node.tvu)) + } else { + None + } }) .collect(); shred_select.stop(); diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 544c09983a..45dcdb7b2e 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -64,6 +64,7 @@ use { solana_streamer::{ packet, sendmmsg::{multi_target_send, SendPktsError}, + socket::is_global, streamer::{PacketReceiver, PacketSender}, }, solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY, @@ -1178,7 +1179,7 @@ impl ClusterInfo { .filter(|node| { node.id != self_pubkey && node.shred_version == self_shred_version - && ContactInfo::is_valid_address(&node.tvu) + && ContactInfo::is_valid_tvu_address(&node.tvu) }) .cloned() .collect() @@ -1235,9 +1236,14 @@ impl ClusterInfo { .iter() .map(|peer| &peer.tvu_forwards) .filter(|addr| ContactInfo::is_valid_address(addr)) + .filter(|addr| is_global(addr)) .collect() } else { - peers.iter().map(|peer| &peer.tvu).collect() + peers + .iter() + .map(|peer| &peer.tvu) + .filter(|addr| is_global(addr)) + .collect() }; let data = &packet.data[..packet.meta.size]; diff --git a/gossip/src/contact_info.rs b/gossip/src/contact_info.rs index 0143c0b3a6..721e80be22 100644 --- a/gossip/src/contact_info.rs +++ b/gossip/src/contact_info.rs @@ -193,10 +193,17 @@ impl ContactInfo { /// port must not be 0 /// ip must be specified and not multicast /// loopback ip is only allowed in tests - pub fn is_valid_address(addr: &SocketAddr) -> bool { + // Keeping this for now not to break tvu-peers and turbine shuffle order of + // nodes when arranging nodes on retransmit tree. Private IP addresses in + // turbine are filtered out just before sending packets. + pub(crate) fn is_valid_tvu_address(addr: &SocketAddr) -> bool { (addr.port() != 0) && Self::is_valid_ip(addr.ip()) } + pub fn is_valid_address(addr: &SocketAddr) -> bool { + Self::is_valid_tvu_address(addr) && solana_streamer::socket::is_global(addr) + } + pub fn client_facing_addr(&self) -> (SocketAddr, SocketAddr) { (self.rpc, self.tpu) } diff --git a/streamer/src/lib.rs b/streamer/src/lib.rs index e0b9bd7176..c8ba5f6d4b 100644 --- a/streamer/src/lib.rs +++ b/streamer/src/lib.rs @@ -2,6 +2,7 @@ pub mod packet; pub mod recvmmsg; pub mod sendmmsg; +pub mod socket; pub mod streamer; #[macro_use] diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs index f8837d3b43..faa67237d5 100644 --- a/streamer/src/packet.rs +++ b/streamer/src/packet.rs @@ -1,5 +1,8 @@ //! The `packet` module defines data structures and methods to pull data from the network. -use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS}; +use crate::{ + recvmmsg::{recv_mmsg, NUM_RCVMMSGS}, + socket::is_global, +}; pub use solana_perf::packet::{ limited_deserialize, to_packets_chunked, Packets, PacketsRecycler, NUM_PACKETS, PACKETS_PER_BATCH, @@ -56,8 +59,10 @@ pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: u64) -> Res pub fn send_to(obj: &Packets, socket: &UdpSocket) -> Result<()> { for p in &obj.packets { - let a = p.meta.addr(); - socket.send_to(&p.data[..p.meta.size], &a)?; + let addr = p.meta.addr(); + if is_global(&addr) { + socket.send_to(&p.data[..p.meta.size], &addr)?; + } } Ok(()) } diff --git a/streamer/src/socket.rs b/streamer/src/socket.rs new file mode 100644 index 0000000000..08d40f171b --- /dev/null +++ b/streamer/src/socket.rs @@ -0,0 +1,28 @@ +use std::net::SocketAddr; + +// TODO: remove these once IpAddr::is_global is stable. + +#[cfg(test)] +pub fn is_global(_: &SocketAddr) -> bool { + true +} + +#[cfg(not(test))] +pub fn is_global(addr: &SocketAddr) -> bool { + use std::net::IpAddr; + + match addr.ip() { + IpAddr::V4(addr) => { + // TODO: Consider excluding: + // addr.is_loopback() || addr.is_link_local() + // || addr.is_broadcast() || addr.is_documentation() + // || addr.is_unspecified() + !addr.is_private() + } + IpAddr::V6(_) => { + // TODO: Consider excluding: + // addr.is_loopback() || addr.is_unspecified(), + true + } + } +}