expands api parity between the new and the legacy contact-info (#30038)

Working towards replacing the legacy contact-info with the new one, the
commit expands api compatibility between the two.
This commit is contained in:
behzad nouri 2023-02-01 13:07:42 +00:00 committed by GitHub
parent 776ec1682e
commit ffc9c90cb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 310 additions and 72 deletions

View File

@ -172,23 +172,12 @@ mod tests {
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0);
ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified)
};
cluster.insert_info(ContactInfo::new_with_socketaddr(&SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
8080,
)));
cluster.insert_info(ContactInfo::new_with_socketaddr(&SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)),
8080,
)));
cluster.insert_info(ContactInfo::new_with_socketaddr(&SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)),
8080,
)));
cluster.insert_info(ContactInfo::new_with_socketaddr(&SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(192, 168, 1, 4)),
8080,
)));
for k in 1..5 {
cluster.insert_info(ContactInfo::new_with_socketaddr(
&Keypair::new().pubkey(),
&SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, k)), 8080),
));
}
let tvu_peers1 = cluster.tvu_peers();
(0..5).for_each(|_| {
cluster

View File

@ -1,13 +1,16 @@
pub use crate::legacy_contact_info::LegacyContactInfo;
use {
crate::crds_value::MAX_WALLCLOCK,
matches::debug_assert_matches,
matches::{assert_matches, debug_assert_matches},
serde::{Deserialize, Deserializer, Serialize},
solana_sdk::{
pubkey::Pubkey,
quic::QUIC_PORT_OFFSET,
rpc_port::{DEFAULT_RPC_PORT, DEFAULT_RPC_PUBSUB_PORT},
sanitize::{Sanitize, SanitizeError},
serde_varint, short_vec,
},
solana_streamer::socket::SocketAddrSpace,
static_assertions::const_assert_eq,
std::{
collections::HashSet,
@ -42,6 +45,8 @@ pub enum Error {
InvalidIpAddrIndex { index: u8, num_addrs: usize },
#[error("Invalid port: {0}")]
InvalidPort(/*port:*/ u16),
#[error("Invalid {0:?} (udp) and {1:?} (quic) sockets")]
InvalidQuicSocket(Option<SocketAddr>, Option<SocketAddr>),
#[error("IP addresses saturated")]
IpAddrsSaturated,
#[error("Multicast IP address: {0}")]
@ -112,6 +117,42 @@ macro_rules! get_socket {
};
}
macro_rules! set_socket {
($name:ident, $key:ident) => {
pub fn $name<T>(&mut self, socket: T) -> Result<(), Error>
where
SocketAddr: From<T>,
{
let socket = SocketAddr::from(socket);
self.set_socket($key, socket)
}
};
($name:ident, $key:ident, $quic:ident) => {
pub fn $name<T>(&mut self, socket: T) -> Result<(), Error>
where
SocketAddr: From<T>,
{
let socket = SocketAddr::from(socket);
self.set_socket($key, socket)?;
self.set_socket($quic, get_quic_socket(&socket)?)
}
};
}
macro_rules! remove_socket {
($name:ident, $key:ident) => {
pub fn $name(&mut self) {
self.remove_socket($key);
}
};
($name:ident, $key:ident, $quic:ident) => {
pub fn $name(&mut self) {
self.remove_socket($key);
self.remove_socket($quic);
}
};
}
impl ContactInfo {
pub fn new(pubkey: Pubkey, wallclock: u64, shred_version: u16) -> Self {
Self {
@ -131,15 +172,32 @@ impl ContactInfo {
}
#[inline]
pub(crate) fn pubkey(&self) -> &Pubkey {
pub fn pubkey(&self) -> &Pubkey {
&self.pubkey
}
#[inline]
pub(crate) fn wallclock(&self) -> u64 {
pub fn wallclock(&self) -> u64 {
self.wallclock
}
#[inline]
pub fn shred_version(&self) -> u16 {
self.shred_version
}
pub fn set_pubkey(&mut self, pubkey: Pubkey) {
self.pubkey = pubkey
}
pub fn set_wallclock(&mut self, wallclock: u64) {
self.wallclock = wallclock;
}
pub fn set_shred_version(&mut self, shred_version: u16) {
self.shred_version = shred_version
}
get_socket!(gossip, SOCKET_TAG_GOSSIP);
get_socket!(repair, SOCKET_TAG_REPAIR);
get_socket!(rpc, SOCKET_TAG_RPC);
@ -153,6 +211,31 @@ impl ContactInfo {
get_socket!(tvu, SOCKET_TAG_TVU);
get_socket!(tvu_forwards, SOCKET_TAG_TVU_FORWARDS);
set_socket!(set_gossip, SOCKET_TAG_GOSSIP);
set_socket!(set_repair, SOCKET_TAG_REPAIR);
set_socket!(set_rpc, SOCKET_TAG_RPC);
set_socket!(set_rpc_pubsub, SOCKET_TAG_RPC_PUBSUB);
set_socket!(set_serve_repair, SOCKET_TAG_SERVE_REPAIR);
set_socket!(set_tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC);
set_socket!(
set_tpu_forwards,
SOCKET_TAG_TPU_FORWARDS,
SOCKET_TAG_TPU_FORWARDS_QUIC
);
set_socket!(set_tpu_vote, SOCKET_TAG_TPU_VOTE);
set_socket!(set_tvu, SOCKET_TAG_TVU);
set_socket!(set_tvu_forwards, SOCKET_TAG_TVU_FORWARDS);
remove_socket!(remove_serve_repair, SOCKET_TAG_SERVE_REPAIR);
remove_socket!(remove_tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC);
remove_socket!(
remove_tpu_forwards,
SOCKET_TAG_TPU_FORWARDS,
SOCKET_TAG_TPU_FORWARDS_QUIC
);
remove_socket!(remove_tvu, SOCKET_TAG_TVU);
remove_socket!(remove_tvu_forwards, SOCKET_TAG_TVU_FORWARDS);
#[cfg(test)]
fn get_socket(&self, key: u8) -> Result<SocketAddr, Error> {
let mut port = 0u16;
@ -247,6 +330,51 @@ impl ContactInfo {
}
}
}
pub fn is_valid_address(addr: &SocketAddr, socket_addr_space: &SocketAddrSpace) -> bool {
LegacyContactInfo::is_valid_address(addr, socket_addr_space)
}
// Only for tests and simulations.
pub fn new_localhost(pubkey: &Pubkey, wallclock: u64) -> Self {
let mut node = Self::new(*pubkey, wallclock, /*shred_version:*/ 0u16);
node.set_gossip((Ipv4Addr::LOCALHOST, 8000)).unwrap();
node.set_tvu((Ipv4Addr::LOCALHOST, 8001)).unwrap();
node.set_tvu_forwards((Ipv4Addr::LOCALHOST, 8002)).unwrap();
node.set_repair((Ipv4Addr::LOCALHOST, 8007)).unwrap();
node.set_tpu((Ipv4Addr::LOCALHOST, 8003)).unwrap(); // quic: 8009
node.set_tpu_forwards((Ipv4Addr::LOCALHOST, 8004)).unwrap(); // quic: 8010
node.set_tpu_vote((Ipv4Addr::LOCALHOST, 8005)).unwrap();
node.set_rpc((Ipv4Addr::LOCALHOST, DEFAULT_RPC_PORT))
.unwrap();
node.set_rpc_pubsub((Ipv4Addr::LOCALHOST, DEFAULT_RPC_PUBSUB_PORT))
.unwrap();
node.set_serve_repair((Ipv4Addr::LOCALHOST, 8008)).unwrap();
node
}
// Only for tests and simulations.
pub fn new_with_socketaddr(pubkey: &Pubkey, socket: &SocketAddr) -> Self {
assert_matches!(sanitize_socket(socket), Ok(()));
let mut node = Self::new(
*pubkey,
solana_sdk::timing::timestamp(), // wallclock,
0u16, // shred_version
);
let (addr, port) = (socket.ip(), socket.port());
node.set_gossip((addr, port + 1)).unwrap();
node.set_tvu((addr, port + 2)).unwrap();
node.set_tvu_forwards((addr, port + 3)).unwrap();
node.set_repair((addr, port + 4)).unwrap();
node.set_tpu((addr, port)).unwrap(); // quic: port + 6
node.set_tpu_forwards((addr, port + 5)).unwrap(); // quic: port + 11
node.set_tpu_vote((addr, port + 7)).unwrap();
node.set_rpc((addr, DEFAULT_RPC_PORT)).unwrap();
node.set_rpc_pubsub((addr, DEFAULT_RPC_PUBSUB_PORT))
.unwrap();
node.set_serve_repair((addr, port + 8)).unwrap();
node
}
}
impl<'de> Deserialize<'de> for ContactInfo {
@ -313,6 +441,35 @@ impl Sanitize for ContactInfo {
}
}
impl TryFrom<&ContactInfo> for LegacyContactInfo {
type Error = Error;
fn try_from(node: &ContactInfo) -> Result<Self, Self::Error> {
macro_rules! unwrap_socket {
($name:ident) => {
node.$name().ok().unwrap_or_else(socket_addr_unspecified)
};
}
sanitize_quic_offset(&node.tpu().ok(), &node.tpu_quic().ok())?;
sanitize_quic_offset(&node.tpu_forwards().ok(), &node.tpu_forwards_quic().ok())?;
Ok(Self {
id: *node.pubkey(),
gossip: unwrap_socket!(gossip),
tvu: unwrap_socket!(tvu),
tvu_forwards: unwrap_socket!(tvu_forwards),
repair: unwrap_socket!(repair),
tpu: unwrap_socket!(tpu),
tpu_forwards: unwrap_socket!(tpu_forwards),
tpu_vote: unwrap_socket!(tpu_vote),
rpc: unwrap_socket!(rpc),
rpc_pubsub: unwrap_socket!(rpc_pubsub),
serve_repair: unwrap_socket!(serve_repair),
wallclock: node.wallclock(),
shred_version: node.shred_version(),
})
}
}
// Workaround until feature(const_socketaddr) is stable.
fn socket_addr_unspecified() -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), /*port:*/ 0u16)
@ -382,11 +539,33 @@ fn sanitize_entries(addrs: &[IpAddr], sockets: &[SocketEntry]) -> Result<(), Err
Ok(())
}
// Verifies that the other socket is at QUIC_PORT_OFFSET from the first one.
fn sanitize_quic_offset(
socket: &Option<SocketAddr>, // udp
other: &Option<SocketAddr>, // quic: udp + QUIC_PORT_OFFSET
) -> Result<(), Error> {
(other == &socket.as_ref().map(get_quic_socket).transpose()?)
.then_some(())
.ok_or(Error::InvalidQuicSocket(*socket, *other))
}
// Returns the socket at QUIC_PORT_OFFSET from the given one.
fn get_quic_socket(socket: &SocketAddr) -> Result<SocketAddr, Error> {
Ok(SocketAddr::new(
socket.ip(),
socket
.port()
.checked_add(QUIC_PORT_OFFSET)
.ok_or_else(|| Error::InvalidPort(socket.port()))?,
))
}
#[cfg(test)]
mod tests {
use {
super::*,
rand::{seq::SliceRandom, Rng},
solana_sdk::signature::{Keypair, Signer},
std::{
collections::{HashMap, HashSet},
iter::repeat_with,
@ -421,6 +600,10 @@ mod tests {
port.checked_shr(shift).unwrap_or_default()
}
fn new_rand_socket<R: Rng>(rng: &mut R) -> SocketAddr {
SocketAddr::new(new_rand_addr(rng), new_rand_port(rng))
}
#[test]
fn test_sanitize_entries() {
let mut rng = rand::thread_rng();
@ -616,4 +799,110 @@ mod tests {
assert_eq!(node, other);
}
}
fn cross_verify_with_legacy(node: &ContactInfo) {
let old = LegacyContactInfo::try_from(node).unwrap();
assert_eq!(old.gossip, node.gossip().unwrap());
assert_eq!(old.repair, node.repair().unwrap());
assert_eq!(old.rpc, node.rpc().unwrap());
assert_eq!(old.rpc_pubsub, node.rpc_pubsub().unwrap());
assert_eq!(old.serve_repair, node.serve_repair().unwrap());
assert_eq!(old.tpu, node.tpu().unwrap());
assert_eq!(old.tpu_forwards, node.tpu_forwards().unwrap());
assert_eq!(
node.tpu_forwards_quic().unwrap(),
SocketAddr::new(
old.tpu_forwards.ip(),
old.tpu_forwards.port() + QUIC_PORT_OFFSET
)
);
assert_eq!(
node.tpu_quic().unwrap(),
SocketAddr::new(old.tpu.ip(), old.tpu.port() + QUIC_PORT_OFFSET)
);
assert_eq!(old.tpu_vote, node.tpu_vote().unwrap());
assert_eq!(old.tvu, node.tvu().unwrap());
assert_eq!(old.tvu_forwards, node.tvu_forwards().unwrap());
}
#[test]
fn test_new_localhost() {
let node = ContactInfo::new_localhost(
&Keypair::new().pubkey(),
solana_sdk::timing::timestamp(), // wallclock
);
cross_verify_with_legacy(&node);
}
#[test]
fn test_new_with_socketaddr() {
let mut rng = rand::thread_rng();
let socket = repeat_with(|| new_rand_socket(&mut rng))
.filter(|socket| matches!(sanitize_socket(socket), Ok(())))
.find(|socket| socket.port().checked_add(11).is_some())
.unwrap();
let node = ContactInfo::new_with_socketaddr(&Keypair::new().pubkey(), &socket);
cross_verify_with_legacy(&node);
}
#[test]
fn test_sanitize_quic_offset() {
let mut rng = rand::thread_rng();
let socket = repeat_with(|| new_rand_socket(&mut rng))
.filter(|socket| matches!(sanitize_socket(socket), Ok(())))
.find(|socket| socket.port().checked_add(QUIC_PORT_OFFSET).is_some())
.unwrap();
let mut other = get_quic_socket(&socket).unwrap();
assert_matches!(sanitize_quic_offset(&None, &None), Ok(()));
assert_matches!(
sanitize_quic_offset(&Some(socket), &None),
Err(Error::InvalidQuicSocket(_, _))
);
assert_matches!(sanitize_quic_offset(&Some(socket), &Some(other)), Ok(()));
assert_matches!(
sanitize_quic_offset(&Some(other), &Some(socket)),
Err(Error::InvalidQuicSocket(_, _))
);
other.set_ip(new_rand_addr(&mut rng));
assert_matches!(
sanitize_quic_offset(&Some(socket), &Some(other)),
Err(Error::InvalidQuicSocket(_, _))
);
other.set_ip(socket.ip());
assert_matches!(sanitize_quic_offset(&Some(socket), &Some(other)), Ok(()));
}
#[test]
fn test_quic_socket() {
let mut rng = rand::thread_rng();
let mut node = ContactInfo::new(
Keypair::new().pubkey(),
rng.gen(), // wallclock
rng.gen(), // shred_version
);
let socket = repeat_with(|| new_rand_socket(&mut rng))
.filter(|socket| matches!(sanitize_socket(socket), Ok(())))
.find(|socket| socket.port().checked_add(QUIC_PORT_OFFSET).is_some())
.unwrap();
// TPU socket.
node.set_tpu(socket).unwrap();
assert_eq!(node.tpu().unwrap(), socket);
assert_eq!(
node.tpu_quic().unwrap(),
SocketAddr::new(socket.ip(), socket.port() + QUIC_PORT_OFFSET)
);
node.remove_tpu();
assert_matches!(node.tpu(), Err(Error::InvalidPort(0)));
assert_matches!(node.tpu_quic(), Err(Error::InvalidPort(0)));
// TPU forwards socket.
node.set_tpu_forwards(socket).unwrap();
assert_eq!(node.tpu_forwards().unwrap(), socket);
assert_eq!(
node.tpu_forwards_quic().unwrap(),
SocketAddr::new(socket.ip(), socket.port() + QUIC_PORT_OFFSET)
);
node.remove_tpu_forwards();
assert_matches!(node.tpu_forwards(), Err(Error::InvalidPort(0)));
assert_matches!(node.tpu_forwards_quic(), Err(Error::InvalidPort(0)));
}
}

View File

@ -4,7 +4,6 @@ use {
pubkey::Pubkey,
rpc_port,
sanitize::{Sanitize, SanitizeError},
signature::{Keypair, Signer},
timing::timestamp,
},
solana_streamer::socket::SocketAddrSpace,
@ -117,30 +116,8 @@ impl LegacyContactInfo {
node
}
#[cfg(test)]
/// LegacyContactInfo with multicast addresses for adversarial testing.
pub fn new_multicast() -> Self {
let addr = socketaddr!("224.0.1.255:1000");
assert!(addr.ip().is_multicast());
Self {
id: solana_sdk::pubkey::new_rand(),
gossip: addr,
tvu: addr,
tvu_forwards: addr,
repair: addr,
tpu: addr,
tpu_forwards: addr,
tpu_vote: addr,
rpc: addr,
rpc_pubsub: addr,
serve_repair: addr,
wallclock: 0,
shred_version: 0,
}
}
// Used in tests
pub fn new_with_pubkey_socketaddr(pubkey: &Pubkey, bind_addr: &SocketAddr) -> Self {
pub fn new_with_socketaddr(pubkey: &Pubkey, bind_addr: &SocketAddr) -> Self {
fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr {
let mut nxt_addr = *addr;
nxt_addr.set_port(addr.port() + nxt);
@ -174,12 +151,6 @@ impl LegacyContactInfo {
}
}
// Used in tests
pub fn new_with_socketaddr(bind_addr: &SocketAddr) -> Self {
let keypair = Keypair::new();
Self::new_with_pubkey_socketaddr(&keypair.pubkey(), bind_addr)
}
// Construct a LegacyContactInfo that's only usable for gossip
pub fn new_gossip_entry_point(gossip_addr: &SocketAddr) -> Self {
Self {
@ -208,7 +179,7 @@ impl LegacyContactInfo {
(self.rpc, self.tpu)
}
pub fn valid_client_facing_addr(
pub(crate) fn valid_client_facing_addr(
&self,
socket_addr_space: &SocketAddrSpace,
) -> Option<(SocketAddr, SocketAddr)> {
@ -224,7 +195,10 @@ impl LegacyContactInfo {
#[cfg(test)]
mod tests {
use super::*;
use {
super::*,
solana_sdk::signature::{Keypair, Signer},
};
#[test]
fn test_is_valid_address() {
@ -263,18 +237,7 @@ mod tests {
assert!(ci.tpu_vote.ip().is_unspecified());
assert!(ci.serve_repair.ip().is_unspecified());
}
#[test]
fn test_multicast() {
let ci = LegacyContactInfo::new_multicast();
assert!(ci.gossip.ip().is_multicast());
assert!(ci.tvu.ip().is_multicast());
assert!(ci.tpu_forwards.ip().is_multicast());
assert!(ci.rpc.ip().is_multicast());
assert!(ci.rpc_pubsub.ip().is_multicast());
assert!(ci.tpu.ip().is_multicast());
assert!(ci.tpu_vote.ip().is_multicast());
assert!(ci.serve_repair.ip().is_multicast());
}
#[test]
fn test_entry_point() {
let addr = socketaddr!("127.0.0.1:10");
@ -291,7 +254,7 @@ mod tests {
#[test]
fn test_socketaddr() {
let addr = socketaddr!("127.0.0.1:10");
let ci = LegacyContactInfo::new_with_socketaddr(&addr);
let ci = LegacyContactInfo::new_with_socketaddr(&Keypair::new().pubkey(), &addr);
assert_eq!(ci.tpu, addr);
assert_eq!(ci.tpu_vote.port(), 17);
assert_eq!(ci.gossip.port(), 11);
@ -305,7 +268,7 @@ mod tests {
#[test]
fn replayed_data_new_with_socketaddr_with_pubkey() {
let keypair = Keypair::new();
let d1 = LegacyContactInfo::new_with_pubkey_socketaddr(
let d1 = LegacyContactInfo::new_with_socketaddr(
&keypair.pubkey(),
&socketaddr!("127.0.0.1:1234"),
);

View File

@ -4765,7 +4765,7 @@ pub mod tests {
let validator_exit = create_validator_exit(&exit);
let cluster_info = Arc::new(new_test_cluster_info());
let identity = cluster_info.id();
cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr(
cluster_info.insert_info(ContactInfo::new_with_socketaddr(
&leader_pubkey,
&socketaddr!("127.0.0.1:1234"),
));
@ -6384,11 +6384,8 @@ pub mod tests {
io.extend_with(rpc_full::FullImpl.to_delegate());
let cluster_info = Arc::new({
let keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let contact_info = ContactInfo {
id: keypair.pubkey(),
..contact_info
};
let contact_info =
ContactInfo::new_with_socketaddr(&keypair.pubkey(), &socketaddr!("127.0.0.1:1234"));
ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified)
});
let tpu_address = cluster_info.my_contact_info().tpu;