diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index e3030d2555..ac32888bf7 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -172,23 +172,12 @@ mod tests { let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0); ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified) }; - cluster.insert_info(ContactInfo::new_with_socketaddr(&SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), - 8080, - ))); - cluster.insert_info(ContactInfo::new_with_socketaddr(&SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), - 8080, - ))); - cluster.insert_info(ContactInfo::new_with_socketaddr(&SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)), - 8080, - ))); - cluster.insert_info(ContactInfo::new_with_socketaddr(&SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(192, 168, 1, 4)), - 8080, - ))); - + for k in 1..5 { + cluster.insert_info(ContactInfo::new_with_socketaddr( + &Keypair::new().pubkey(), + &SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, k)), 8080), + )); + } let tvu_peers1 = cluster.tvu_peers(); (0..5).for_each(|_| { cluster diff --git a/gossip/src/contact_info.rs b/gossip/src/contact_info.rs index c99322516a..a0e7c65578 100644 --- a/gossip/src/contact_info.rs +++ b/gossip/src/contact_info.rs @@ -1,13 +1,16 @@ pub use crate::legacy_contact_info::LegacyContactInfo; use { crate::crds_value::MAX_WALLCLOCK, - matches::debug_assert_matches, + matches::{assert_matches, debug_assert_matches}, serde::{Deserialize, Deserializer, Serialize}, solana_sdk::{ pubkey::Pubkey, + quic::QUIC_PORT_OFFSET, + rpc_port::{DEFAULT_RPC_PORT, DEFAULT_RPC_PUBSUB_PORT}, sanitize::{Sanitize, SanitizeError}, serde_varint, short_vec, }, + solana_streamer::socket::SocketAddrSpace, static_assertions::const_assert_eq, std::{ collections::HashSet, @@ -42,6 +45,8 @@ pub enum Error { InvalidIpAddrIndex { index: u8, num_addrs: usize }, #[error("Invalid port: {0}")] InvalidPort(/*port:*/ u16), + #[error("Invalid {0:?} (udp) and {1:?} (quic) sockets")] + InvalidQuicSocket(Option, Option), #[error("IP addresses saturated")] IpAddrsSaturated, #[error("Multicast IP address: {0}")] @@ -112,6 +117,42 @@ macro_rules! get_socket { }; } +macro_rules! set_socket { + ($name:ident, $key:ident) => { + pub fn $name(&mut self, socket: T) -> Result<(), Error> + where + SocketAddr: From, + { + let socket = SocketAddr::from(socket); + self.set_socket($key, socket) + } + }; + ($name:ident, $key:ident, $quic:ident) => { + pub fn $name(&mut self, socket: T) -> Result<(), Error> + where + SocketAddr: From, + { + let socket = SocketAddr::from(socket); + self.set_socket($key, socket)?; + self.set_socket($quic, get_quic_socket(&socket)?) + } + }; +} + +macro_rules! remove_socket { + ($name:ident, $key:ident) => { + pub fn $name(&mut self) { + self.remove_socket($key); + } + }; + ($name:ident, $key:ident, $quic:ident) => { + pub fn $name(&mut self) { + self.remove_socket($key); + self.remove_socket($quic); + } + }; +} + impl ContactInfo { pub fn new(pubkey: Pubkey, wallclock: u64, shred_version: u16) -> Self { Self { @@ -131,15 +172,32 @@ impl ContactInfo { } #[inline] - pub(crate) fn pubkey(&self) -> &Pubkey { + pub fn pubkey(&self) -> &Pubkey { &self.pubkey } #[inline] - pub(crate) fn wallclock(&self) -> u64 { + pub fn wallclock(&self) -> u64 { self.wallclock } + #[inline] + pub fn shred_version(&self) -> u16 { + self.shred_version + } + + pub fn set_pubkey(&mut self, pubkey: Pubkey) { + self.pubkey = pubkey + } + + pub fn set_wallclock(&mut self, wallclock: u64) { + self.wallclock = wallclock; + } + + pub fn set_shred_version(&mut self, shred_version: u16) { + self.shred_version = shred_version + } + get_socket!(gossip, SOCKET_TAG_GOSSIP); get_socket!(repair, SOCKET_TAG_REPAIR); get_socket!(rpc, SOCKET_TAG_RPC); @@ -153,6 +211,31 @@ impl ContactInfo { get_socket!(tvu, SOCKET_TAG_TVU); get_socket!(tvu_forwards, SOCKET_TAG_TVU_FORWARDS); + set_socket!(set_gossip, SOCKET_TAG_GOSSIP); + set_socket!(set_repair, SOCKET_TAG_REPAIR); + set_socket!(set_rpc, SOCKET_TAG_RPC); + set_socket!(set_rpc_pubsub, SOCKET_TAG_RPC_PUBSUB); + set_socket!(set_serve_repair, SOCKET_TAG_SERVE_REPAIR); + set_socket!(set_tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC); + set_socket!( + set_tpu_forwards, + SOCKET_TAG_TPU_FORWARDS, + SOCKET_TAG_TPU_FORWARDS_QUIC + ); + set_socket!(set_tpu_vote, SOCKET_TAG_TPU_VOTE); + set_socket!(set_tvu, SOCKET_TAG_TVU); + set_socket!(set_tvu_forwards, SOCKET_TAG_TVU_FORWARDS); + + remove_socket!(remove_serve_repair, SOCKET_TAG_SERVE_REPAIR); + remove_socket!(remove_tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC); + remove_socket!( + remove_tpu_forwards, + SOCKET_TAG_TPU_FORWARDS, + SOCKET_TAG_TPU_FORWARDS_QUIC + ); + remove_socket!(remove_tvu, SOCKET_TAG_TVU); + remove_socket!(remove_tvu_forwards, SOCKET_TAG_TVU_FORWARDS); + #[cfg(test)] fn get_socket(&self, key: u8) -> Result { let mut port = 0u16; @@ -247,6 +330,51 @@ impl ContactInfo { } } } + + pub fn is_valid_address(addr: &SocketAddr, socket_addr_space: &SocketAddrSpace) -> bool { + LegacyContactInfo::is_valid_address(addr, socket_addr_space) + } + + // Only for tests and simulations. + pub fn new_localhost(pubkey: &Pubkey, wallclock: u64) -> Self { + let mut node = Self::new(*pubkey, wallclock, /*shred_version:*/ 0u16); + node.set_gossip((Ipv4Addr::LOCALHOST, 8000)).unwrap(); + node.set_tvu((Ipv4Addr::LOCALHOST, 8001)).unwrap(); + node.set_tvu_forwards((Ipv4Addr::LOCALHOST, 8002)).unwrap(); + node.set_repair((Ipv4Addr::LOCALHOST, 8007)).unwrap(); + node.set_tpu((Ipv4Addr::LOCALHOST, 8003)).unwrap(); // quic: 8009 + node.set_tpu_forwards((Ipv4Addr::LOCALHOST, 8004)).unwrap(); // quic: 8010 + node.set_tpu_vote((Ipv4Addr::LOCALHOST, 8005)).unwrap(); + node.set_rpc((Ipv4Addr::LOCALHOST, DEFAULT_RPC_PORT)) + .unwrap(); + node.set_rpc_pubsub((Ipv4Addr::LOCALHOST, DEFAULT_RPC_PUBSUB_PORT)) + .unwrap(); + node.set_serve_repair((Ipv4Addr::LOCALHOST, 8008)).unwrap(); + node + } + + // Only for tests and simulations. + pub fn new_with_socketaddr(pubkey: &Pubkey, socket: &SocketAddr) -> Self { + assert_matches!(sanitize_socket(socket), Ok(())); + let mut node = Self::new( + *pubkey, + solana_sdk::timing::timestamp(), // wallclock, + 0u16, // shred_version + ); + let (addr, port) = (socket.ip(), socket.port()); + node.set_gossip((addr, port + 1)).unwrap(); + node.set_tvu((addr, port + 2)).unwrap(); + node.set_tvu_forwards((addr, port + 3)).unwrap(); + node.set_repair((addr, port + 4)).unwrap(); + node.set_tpu((addr, port)).unwrap(); // quic: port + 6 + node.set_tpu_forwards((addr, port + 5)).unwrap(); // quic: port + 11 + node.set_tpu_vote((addr, port + 7)).unwrap(); + node.set_rpc((addr, DEFAULT_RPC_PORT)).unwrap(); + node.set_rpc_pubsub((addr, DEFAULT_RPC_PUBSUB_PORT)) + .unwrap(); + node.set_serve_repair((addr, port + 8)).unwrap(); + node + } } impl<'de> Deserialize<'de> for ContactInfo { @@ -313,6 +441,35 @@ impl Sanitize for ContactInfo { } } +impl TryFrom<&ContactInfo> for LegacyContactInfo { + type Error = Error; + + fn try_from(node: &ContactInfo) -> Result { + macro_rules! unwrap_socket { + ($name:ident) => { + node.$name().ok().unwrap_or_else(socket_addr_unspecified) + }; + } + sanitize_quic_offset(&node.tpu().ok(), &node.tpu_quic().ok())?; + sanitize_quic_offset(&node.tpu_forwards().ok(), &node.tpu_forwards_quic().ok())?; + Ok(Self { + id: *node.pubkey(), + gossip: unwrap_socket!(gossip), + tvu: unwrap_socket!(tvu), + tvu_forwards: unwrap_socket!(tvu_forwards), + repair: unwrap_socket!(repair), + tpu: unwrap_socket!(tpu), + tpu_forwards: unwrap_socket!(tpu_forwards), + tpu_vote: unwrap_socket!(tpu_vote), + rpc: unwrap_socket!(rpc), + rpc_pubsub: unwrap_socket!(rpc_pubsub), + serve_repair: unwrap_socket!(serve_repair), + wallclock: node.wallclock(), + shred_version: node.shred_version(), + }) + } +} + // Workaround until feature(const_socketaddr) is stable. fn socket_addr_unspecified() -> SocketAddr { SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), /*port:*/ 0u16) @@ -382,11 +539,33 @@ fn sanitize_entries(addrs: &[IpAddr], sockets: &[SocketEntry]) -> Result<(), Err Ok(()) } +// Verifies that the other socket is at QUIC_PORT_OFFSET from the first one. +fn sanitize_quic_offset( + socket: &Option, // udp + other: &Option, // quic: udp + QUIC_PORT_OFFSET +) -> Result<(), Error> { + (other == &socket.as_ref().map(get_quic_socket).transpose()?) + .then_some(()) + .ok_or(Error::InvalidQuicSocket(*socket, *other)) +} + +// Returns the socket at QUIC_PORT_OFFSET from the given one. +fn get_quic_socket(socket: &SocketAddr) -> Result { + Ok(SocketAddr::new( + socket.ip(), + socket + .port() + .checked_add(QUIC_PORT_OFFSET) + .ok_or_else(|| Error::InvalidPort(socket.port()))?, + )) +} + #[cfg(test)] mod tests { use { super::*, rand::{seq::SliceRandom, Rng}, + solana_sdk::signature::{Keypair, Signer}, std::{ collections::{HashMap, HashSet}, iter::repeat_with, @@ -421,6 +600,10 @@ mod tests { port.checked_shr(shift).unwrap_or_default() } + fn new_rand_socket(rng: &mut R) -> SocketAddr { + SocketAddr::new(new_rand_addr(rng), new_rand_port(rng)) + } + #[test] fn test_sanitize_entries() { let mut rng = rand::thread_rng(); @@ -616,4 +799,110 @@ mod tests { assert_eq!(node, other); } } + + fn cross_verify_with_legacy(node: &ContactInfo) { + let old = LegacyContactInfo::try_from(node).unwrap(); + assert_eq!(old.gossip, node.gossip().unwrap()); + assert_eq!(old.repair, node.repair().unwrap()); + assert_eq!(old.rpc, node.rpc().unwrap()); + assert_eq!(old.rpc_pubsub, node.rpc_pubsub().unwrap()); + assert_eq!(old.serve_repair, node.serve_repair().unwrap()); + assert_eq!(old.tpu, node.tpu().unwrap()); + assert_eq!(old.tpu_forwards, node.tpu_forwards().unwrap()); + assert_eq!( + node.tpu_forwards_quic().unwrap(), + SocketAddr::new( + old.tpu_forwards.ip(), + old.tpu_forwards.port() + QUIC_PORT_OFFSET + ) + ); + assert_eq!( + node.tpu_quic().unwrap(), + SocketAddr::new(old.tpu.ip(), old.tpu.port() + QUIC_PORT_OFFSET) + ); + assert_eq!(old.tpu_vote, node.tpu_vote().unwrap()); + assert_eq!(old.tvu, node.tvu().unwrap()); + assert_eq!(old.tvu_forwards, node.tvu_forwards().unwrap()); + } + + #[test] + fn test_new_localhost() { + let node = ContactInfo::new_localhost( + &Keypair::new().pubkey(), + solana_sdk::timing::timestamp(), // wallclock + ); + cross_verify_with_legacy(&node); + } + + #[test] + fn test_new_with_socketaddr() { + let mut rng = rand::thread_rng(); + let socket = repeat_with(|| new_rand_socket(&mut rng)) + .filter(|socket| matches!(sanitize_socket(socket), Ok(()))) + .find(|socket| socket.port().checked_add(11).is_some()) + .unwrap(); + let node = ContactInfo::new_with_socketaddr(&Keypair::new().pubkey(), &socket); + cross_verify_with_legacy(&node); + } + + #[test] + fn test_sanitize_quic_offset() { + let mut rng = rand::thread_rng(); + let socket = repeat_with(|| new_rand_socket(&mut rng)) + .filter(|socket| matches!(sanitize_socket(socket), Ok(()))) + .find(|socket| socket.port().checked_add(QUIC_PORT_OFFSET).is_some()) + .unwrap(); + let mut other = get_quic_socket(&socket).unwrap(); + assert_matches!(sanitize_quic_offset(&None, &None), Ok(())); + assert_matches!( + sanitize_quic_offset(&Some(socket), &None), + Err(Error::InvalidQuicSocket(_, _)) + ); + assert_matches!(sanitize_quic_offset(&Some(socket), &Some(other)), Ok(())); + assert_matches!( + sanitize_quic_offset(&Some(other), &Some(socket)), + Err(Error::InvalidQuicSocket(_, _)) + ); + other.set_ip(new_rand_addr(&mut rng)); + assert_matches!( + sanitize_quic_offset(&Some(socket), &Some(other)), + Err(Error::InvalidQuicSocket(_, _)) + ); + other.set_ip(socket.ip()); + assert_matches!(sanitize_quic_offset(&Some(socket), &Some(other)), Ok(())); + } + + #[test] + fn test_quic_socket() { + let mut rng = rand::thread_rng(); + let mut node = ContactInfo::new( + Keypair::new().pubkey(), + rng.gen(), // wallclock + rng.gen(), // shred_version + ); + let socket = repeat_with(|| new_rand_socket(&mut rng)) + .filter(|socket| matches!(sanitize_socket(socket), Ok(()))) + .find(|socket| socket.port().checked_add(QUIC_PORT_OFFSET).is_some()) + .unwrap(); + // TPU socket. + node.set_tpu(socket).unwrap(); + assert_eq!(node.tpu().unwrap(), socket); + assert_eq!( + node.tpu_quic().unwrap(), + SocketAddr::new(socket.ip(), socket.port() + QUIC_PORT_OFFSET) + ); + node.remove_tpu(); + assert_matches!(node.tpu(), Err(Error::InvalidPort(0))); + assert_matches!(node.tpu_quic(), Err(Error::InvalidPort(0))); + // TPU forwards socket. + node.set_tpu_forwards(socket).unwrap(); + assert_eq!(node.tpu_forwards().unwrap(), socket); + assert_eq!( + node.tpu_forwards_quic().unwrap(), + SocketAddr::new(socket.ip(), socket.port() + QUIC_PORT_OFFSET) + ); + node.remove_tpu_forwards(); + assert_matches!(node.tpu_forwards(), Err(Error::InvalidPort(0))); + assert_matches!(node.tpu_forwards_quic(), Err(Error::InvalidPort(0))); + } } diff --git a/gossip/src/legacy_contact_info.rs b/gossip/src/legacy_contact_info.rs index 9f41c79d65..f478ce3b22 100644 --- a/gossip/src/legacy_contact_info.rs +++ b/gossip/src/legacy_contact_info.rs @@ -4,7 +4,6 @@ use { pubkey::Pubkey, rpc_port, sanitize::{Sanitize, SanitizeError}, - signature::{Keypair, Signer}, timing::timestamp, }, solana_streamer::socket::SocketAddrSpace, @@ -117,30 +116,8 @@ impl LegacyContactInfo { node } - #[cfg(test)] - /// LegacyContactInfo with multicast addresses for adversarial testing. - pub fn new_multicast() -> Self { - let addr = socketaddr!("224.0.1.255:1000"); - assert!(addr.ip().is_multicast()); - Self { - id: solana_sdk::pubkey::new_rand(), - gossip: addr, - tvu: addr, - tvu_forwards: addr, - repair: addr, - tpu: addr, - tpu_forwards: addr, - tpu_vote: addr, - rpc: addr, - rpc_pubsub: addr, - serve_repair: addr, - wallclock: 0, - shred_version: 0, - } - } - // Used in tests - pub fn new_with_pubkey_socketaddr(pubkey: &Pubkey, bind_addr: &SocketAddr) -> Self { + pub fn new_with_socketaddr(pubkey: &Pubkey, bind_addr: &SocketAddr) -> Self { fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr { let mut nxt_addr = *addr; nxt_addr.set_port(addr.port() + nxt); @@ -174,12 +151,6 @@ impl LegacyContactInfo { } } - // Used in tests - pub fn new_with_socketaddr(bind_addr: &SocketAddr) -> Self { - let keypair = Keypair::new(); - Self::new_with_pubkey_socketaddr(&keypair.pubkey(), bind_addr) - } - // Construct a LegacyContactInfo that's only usable for gossip pub fn new_gossip_entry_point(gossip_addr: &SocketAddr) -> Self { Self { @@ -208,7 +179,7 @@ impl LegacyContactInfo { (self.rpc, self.tpu) } - pub fn valid_client_facing_addr( + pub(crate) fn valid_client_facing_addr( &self, socket_addr_space: &SocketAddrSpace, ) -> Option<(SocketAddr, SocketAddr)> { @@ -224,7 +195,10 @@ impl LegacyContactInfo { #[cfg(test)] mod tests { - use super::*; + use { + super::*, + solana_sdk::signature::{Keypair, Signer}, + }; #[test] fn test_is_valid_address() { @@ -263,18 +237,7 @@ mod tests { assert!(ci.tpu_vote.ip().is_unspecified()); assert!(ci.serve_repair.ip().is_unspecified()); } - #[test] - fn test_multicast() { - let ci = LegacyContactInfo::new_multicast(); - assert!(ci.gossip.ip().is_multicast()); - assert!(ci.tvu.ip().is_multicast()); - assert!(ci.tpu_forwards.ip().is_multicast()); - assert!(ci.rpc.ip().is_multicast()); - assert!(ci.rpc_pubsub.ip().is_multicast()); - assert!(ci.tpu.ip().is_multicast()); - assert!(ci.tpu_vote.ip().is_multicast()); - assert!(ci.serve_repair.ip().is_multicast()); - } + #[test] fn test_entry_point() { let addr = socketaddr!("127.0.0.1:10"); @@ -291,7 +254,7 @@ mod tests { #[test] fn test_socketaddr() { let addr = socketaddr!("127.0.0.1:10"); - let ci = LegacyContactInfo::new_with_socketaddr(&addr); + let ci = LegacyContactInfo::new_with_socketaddr(&Keypair::new().pubkey(), &addr); assert_eq!(ci.tpu, addr); assert_eq!(ci.tpu_vote.port(), 17); assert_eq!(ci.gossip.port(), 11); @@ -305,7 +268,7 @@ mod tests { #[test] fn replayed_data_new_with_socketaddr_with_pubkey() { let keypair = Keypair::new(); - let d1 = LegacyContactInfo::new_with_pubkey_socketaddr( + let d1 = LegacyContactInfo::new_with_socketaddr( &keypair.pubkey(), &socketaddr!("127.0.0.1:1234"), ); diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index d1a5e70975..415ebf91ca 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -4765,7 +4765,7 @@ pub mod tests { let validator_exit = create_validator_exit(&exit); let cluster_info = Arc::new(new_test_cluster_info()); let identity = cluster_info.id(); - cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr( + cluster_info.insert_info(ContactInfo::new_with_socketaddr( &leader_pubkey, &socketaddr!("127.0.0.1:1234"), )); @@ -6384,11 +6384,8 @@ pub mod tests { io.extend_with(rpc_full::FullImpl.to_delegate()); let cluster_info = Arc::new({ let keypair = Arc::new(Keypair::new()); - let contact_info = ContactInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); - let contact_info = ContactInfo { - id: keypair.pubkey(), - ..contact_info - }; + let contact_info = + ContactInfo::new_with_socketaddr(&keypair.pubkey(), &socketaddr!("127.0.0.1:1234")); ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified) }); let tpu_address = cluster_info.my_contact_info().tpu;