repurposes tvu_forwards socket for TVU over QUIC (#32737)

LegacyContactInfo.tvu_forwards is unused.
Repurposing the field for TVU over QUIC will avoid QUIC_PORT_OFFSET hack
in a backward compatible way.
This commit is contained in:
behzad nouri 2023-08-07 22:02:41 +00:00 committed by GitHub
parent df31bc13db
commit b7c2ad5b67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 42 additions and 27 deletions

1
Cargo.lock generated
View File

@ -5948,7 +5948,6 @@ dependencies = [
"rand 0.7.3",
"rand_chacha 0.2.2",
"rayon",
"regex",
"rustc_version 0.4.0",
"serde",
"serde_bytes",

View File

@ -214,7 +214,7 @@ pub(crate) type Ping = ping_pong::Ping<[u8; REPAIR_PING_TOKEN_SIZE]>;
/// Window protocol messages
#[derive(Debug, AbiEnumVisitor, AbiExample, Deserialize, Serialize, strum_macros::Display)]
#[frozen_abi(digest = "HXKJuZAK4LsweUTRbsxEcG9jHA9JR9s8MYmmjx2Nb5X1")]
#[frozen_abi(digest = "DPHju3YufeNw1qfr22ZWRgJdXb1TvZt8iwLqWXUTyrtW")]
pub enum RepairProtocol {
LegacyWindowIndex(LegacyContactInfo, Slot, u64),
LegacyHighestWindowIndex(LegacyContactInfo, Slot, u64),
@ -1911,6 +1911,7 @@ mod tests {
);
nxt.set_gossip((Ipv4Addr::LOCALHOST, 1234)).unwrap();
nxt.set_tvu((Ipv4Addr::LOCALHOST, 1235)).unwrap();
nxt.set_tvu_quic((Ipv4Addr::LOCALHOST, 1236)).unwrap();
nxt.set_repair((Ipv4Addr::LOCALHOST, 1237)).unwrap();
nxt.set_tpu((Ipv4Addr::LOCALHOST, 1238)).unwrap();
nxt.set_tpu_forwards((Ipv4Addr::LOCALHOST, 1239)).unwrap();
@ -1941,6 +1942,7 @@ mod tests {
);
nxt.set_gossip((Ipv4Addr::LOCALHOST, 1234)).unwrap();
nxt.set_tvu((Ipv4Addr::LOCALHOST, 1235)).unwrap();
nxt.set_tvu_quic((Ipv4Addr::LOCALHOST, 1236)).unwrap();
nxt.set_repair((Ipv4Addr::LOCALHOST, 1237)).unwrap();
nxt.set_tpu((Ipv4Addr::LOCALHOST, 1238)).unwrap();
nxt.set_tpu_forwards((Ipv4Addr::LOCALHOST, 1239)).unwrap();

View File

@ -52,7 +52,6 @@ thiserror = { workspace = true }
[dev-dependencies]
num_cpus = { workspace = true }
regex = { workspace = true }
serial_test = { workspace = true }
[build-dependencies]

View File

@ -271,7 +271,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "Ctxue3UVFXXqnHoMVAPmfBoCy3Cyg7gNCYBY7Cg9P3so")]
#[frozen_abi(digest = "3U6DqJ4X4UE1DxRP1sbwP5QtyFxexMxzjLSKXXRDrt4q")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
@ -824,7 +824,7 @@ impl ClusterInfo {
}
let ip_addr = node.gossip().as_ref().map(SocketAddr::ip).ok();
Some(format!(
"{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n",
"{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n",
node.gossip()
.ok()
.filter(|addr| self.socket_addr_space.check(addr))
@ -846,6 +846,7 @@ impl ClusterInfo {
self.addr_to_string(&ip_addr, &node.tpu(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tpu_forwards(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tvu(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tvu(contact_info::Protocol::QUIC).ok()),
self.addr_to_string(&ip_addr, &node.repair().ok()),
self.addr_to_string(&ip_addr, &node.serve_repair().ok()),
node.shred_version(),
@ -856,9 +857,9 @@ impl ClusterInfo {
format!(
"IP Address |Age(ms)| Node identifier \
| Version |Gossip|TPUvote| TPU |TPUfwd| TVU |Repair|ServeR|ShredVer\n\
------------------+-------+---------------------------------------\
+---------+------+-------+------+------+------+------+------+--------\n\
| Version |Gossip|TPUvote| TPU |TPUfwd| TVU |TVU Q |Repair|ServeR|ShredVer\n\
------------------+-------+----------------------------------------------\
+---------+------+-------+------+------+------+------+------+------+--------\n\
{}\
Nodes: {}{}{}",
nodes.join(""),
@ -2826,8 +2827,8 @@ impl Node {
let (gossip_port, (gossip, ip_echo)) =
bind_common_in_range(localhost_ip_addr, port_range).unwrap();
let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port);
let ((_tvu_port, tvu), (_tvu_quic_port, tvu_quic)) =
bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
let tvu = UdpSocket::bind(&localhost_bind_addr).unwrap();
let tvu_quic = UdpSocket::bind(&localhost_bind_addr).unwrap();
let ((_tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap();
@ -2856,6 +2857,7 @@ impl Node {
}
set_socket!(set_gossip, gossip_addr, "gossip");
set_socket!(set_tvu, tvu.local_addr().unwrap(), "TVU");
set_socket!(set_tvu_quic, tvu_quic.local_addr().unwrap(), "TVU QUIC");
set_socket!(set_repair, repair.local_addr().unwrap(), "repair");
set_socket!(set_tpu, tpu.local_addr().unwrap(), "TPU");
set_socket!(
@ -2920,8 +2922,8 @@ impl Node {
) -> Self {
let (gossip_port, (gossip, ip_echo)) =
Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr);
let ((tvu_port, tvu), (_tvu_quic_port, tvu_quic)) =
bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range);
let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range);
let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) =
bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
@ -2952,6 +2954,7 @@ impl Node {
}
set_socket!(set_gossip, gossip_port, "gossip");
set_socket!(set_tvu, tvu_port, "TVU");
set_socket!(set_tvu_quic, tvu_quic_port, "TVU QUIC");
set_socket!(set_repair, repair_port, "repair");
set_socket!(set_tpu, tpu_port, "TPU");
set_socket!(set_tpu_forwards, tpu_forwards_port, "TPU-forwards");
@ -2995,10 +2998,7 @@ impl Node {
let (tvu_port, tvu_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tvu multi_bind");
let (_tvu_port_quic, tvu_quic) = Self::bind(
bind_ip_addr,
(tvu_port + QUIC_PORT_OFFSET, tvu_port + QUIC_PORT_OFFSET + 1),
);
let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range);
let (tpu_port, tpu_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind");
@ -3040,6 +3040,7 @@ impl Node {
let addr = gossip_addr.ip();
let _ = info.set_gossip((addr, gossip_port));
let _ = info.set_tvu((addr, tvu_port));
let _ = info.set_tvu_quic((addr, tvu_quic_port));
let _ = info.set_repair((addr, repair_port));
let _ = info.set_tpu(public_tpu_addr.unwrap_or_else(|| SocketAddr::new(addr, tpu_port)));
let _ = info.set_tpu_forwards(
@ -3618,6 +3619,7 @@ mod tests {
fn check_node_sockets(node: &Node, ip: IpAddr, range: (u16, u16)) {
check_socket(&node.sockets.gossip, ip, range);
check_socket(&node.sockets.repair, ip, range);
check_socket(&node.sockets.tvu_quic, ip, range);
check_sockets(&node.sockets.tvu, ip, range);
check_sockets(&node.sockets.tpu, ip, range);

View File

@ -240,7 +240,8 @@ impl ContactInfo {
SOCKET_TAG_TPU_FORWARDS_QUIC
);
set_socket!(set_tpu_vote, SOCKET_TAG_TPU_VOTE);
set_socket!(set_tvu, SOCKET_TAG_TVU, SOCKET_TAG_TVU_QUIC);
set_socket!(set_tvu, SOCKET_TAG_TVU);
set_socket!(set_tvu_quic, SOCKET_TAG_TVU_QUIC);
remove_socket!(remove_serve_repair, SOCKET_TAG_SERVE_REPAIR);
remove_socket!(remove_tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC);
@ -355,6 +356,7 @@ impl ContactInfo {
let mut node = Self::new(*pubkey, wallclock, /*shred_version:*/ 0u16);
node.set_gossip((Ipv4Addr::LOCALHOST, 8000)).unwrap();
node.set_tvu((Ipv4Addr::LOCALHOST, 8001)).unwrap();
node.set_tvu_quic((Ipv4Addr::LOCALHOST, 8002)).unwrap();
node.set_repair((Ipv4Addr::LOCALHOST, 8007)).unwrap();
node.set_tpu((Ipv4Addr::LOCALHOST, 8003)).unwrap(); // quic: 8009
node.set_tpu_forwards((Ipv4Addr::LOCALHOST, 8004)).unwrap(); // quic: 8010
@ -378,6 +380,7 @@ impl ContactInfo {
let (addr, port) = (socket.ip(), socket.port());
node.set_gossip((addr, port + 1)).unwrap();
node.set_tvu((addr, port + 2)).unwrap();
node.set_tvu_quic((addr, port + 3)).unwrap();
node.set_repair((addr, port + 4)).unwrap();
node.set_tpu((addr, port)).unwrap(); // quic: port + 6
node.set_tpu_forwards((addr, port + 5)).unwrap(); // quic: port + 11

View File

@ -25,8 +25,8 @@ pub struct LegacyContactInfo {
gossip: SocketAddr,
/// address to connect to for replication
tvu: SocketAddr,
/// address to forward shreds to
tvu_forwards: SocketAddr,
/// TVU over QUIC protocol.
tvu_quic: SocketAddr,
/// address to send repair responses to
repair: SocketAddr,
/// transactions address
@ -64,6 +64,16 @@ macro_rules! get_socket {
Ok(socket).copied()
}
};
($name:ident, $quic:ident) => {
pub fn $name(&self, protocol: Protocol) -> Result<SocketAddr, Error> {
let socket = match protocol {
Protocol::QUIC => &self.$quic,
Protocol::UDP => &self.$name,
};
sanitize_socket(socket)?;
Ok(socket).copied()
}
};
(@quic $name:ident) => {
pub fn $name(&self, protocol: Protocol) -> Result<SocketAddr, Error> {
let socket = &self.$name;
@ -113,7 +123,7 @@ impl Default for LegacyContactInfo {
id: Pubkey::default(),
gossip: socketaddr_any!(),
tvu: socketaddr_any!(),
tvu_forwards: socketaddr_any!(),
tvu_quic: socketaddr_any!(),
repair: socketaddr_any!(),
tpu: socketaddr_any!(),
tpu_forwards: socketaddr_any!(),
@ -133,7 +143,7 @@ impl LegacyContactInfo {
id: *id,
gossip: socketaddr!(Ipv4Addr::LOCALHOST, 1234),
tvu: socketaddr!(Ipv4Addr::LOCALHOST, 1235),
tvu_forwards: socketaddr!(Ipv4Addr::LOCALHOST, 1236),
tvu_quic: socketaddr!(Ipv4Addr::LOCALHOST, 1236),
repair: socketaddr!(Ipv4Addr::LOCALHOST, 1237),
tpu: socketaddr!(Ipv4Addr::LOCALHOST, 1238),
tpu_forwards: socketaddr!(Ipv4Addr::LOCALHOST, 1239),
@ -195,7 +205,7 @@ impl LegacyContactInfo {
}
get_socket!(gossip);
get_socket!(@quic tvu);
get_socket!(tvu, tvu_quic);
get_socket!(repair);
get_socket!(@quic tpu);
get_socket!(@quic tpu_forwards);
@ -263,7 +273,7 @@ impl TryFrom<&ContactInfo> for LegacyContactInfo {
id: *node.pubkey(),
gossip: unwrap_socket!(gossip),
tvu: unwrap_socket!(tvu, Protocol::UDP),
tvu_forwards: SOCKET_ADDR_UNSPECIFIED,
tvu_quic: unwrap_socket!(tvu, Protocol::QUIC),
repair: unwrap_socket!(repair),
tpu: unwrap_socket!(tpu, Protocol::UDP),
tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),

View File

@ -16,7 +16,7 @@ EXPOSE 9900/tcp
EXPOSE 8000/udp
# gossip
EXPOSE 8001/udp
# tvu_forwards
# tvu_quic
EXPOSE 8002/udp
# tpu
EXPOSE 8003/udp

View File

@ -71,7 +71,7 @@ pub struct AdminRpcContactInfo {
pub id: String,
pub gossip: SocketAddr,
pub tvu: SocketAddr,
pub tvu_forwards: SocketAddr,
pub tvu_quic: SocketAddr,
pub repair: SocketAddr,
pub tpu: SocketAddr,
pub tpu_forwards: SocketAddr,
@ -103,7 +103,7 @@ impl From<ContactInfo> for AdminRpcContactInfo {
last_updated_timestamp: node.wallclock(),
gossip: unwrap_socket!(gossip),
tvu: unwrap_socket!(tvu, Protocol::UDP),
tvu_forwards: SOCKET_ADDR_UNSPECIFIED,
tvu_quic: unwrap_socket!(tvu, Protocol::QUIC),
repair: unwrap_socket!(repair),
tpu: unwrap_socket!(tpu, Protocol::UDP),
tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
@ -121,7 +121,7 @@ impl Display for AdminRpcContactInfo {
writeln!(f, "Identity: {}", self.id)?;
writeln!(f, "Gossip: {}", self.gossip)?;
writeln!(f, "TVU: {}", self.tvu)?;
writeln!(f, "TVU Forwards: {}", self.tvu_forwards)?;
writeln!(f, "TVU QUIC: {}", self.tvu_quic)?;
writeln!(f, "Repair: {}", self.repair)?;
writeln!(f, "TPU: {}", self.tpu)?;
writeln!(f, "TPU Forwards: {}", self.tpu_forwards)?;