adds socket address for repair service over QUIC (#32834)

Working towards migrating repair to QUIC.
This commit is contained in:
behzad nouri 2023-08-15 17:09:09 +00:00 committed by GitHub
parent e700dde617
commit 0de8ccfda9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 80 additions and 32 deletions

View File

@ -869,7 +869,7 @@ mod test {
},
solana_gossip::{
cluster_info::{ClusterInfo, Node},
contact_info::ContactInfo,
contact_info::{ContactInfo, Protocol},
},
solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path, shred::Nonce},
solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks},
@ -1400,7 +1400,7 @@ mod test {
nonce,
);
if let Ok(request_bytes) = request_bytes {
let socket = responder_info.serve_repair().unwrap();
let socket = responder_info.serve_repair(Protocol::UDP).unwrap();
let _ = ancestor_hashes_request_socket.send_to(&request_bytes, socket);
}
}
@ -1470,7 +1470,7 @@ mod test {
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair().unwrap());
.set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap());
let decision = AncestorHashesService::verify_and_process_ancestor_response(
packet,
&ancestor_hashes_request_statuses,
@ -1511,7 +1511,7 @@ mod test {
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair().unwrap());
.set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap());
let AncestorRequestDecision {
slot,
request_type,
@ -1571,7 +1571,7 @@ mod test {
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair().unwrap());
.set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap());
let AncestorRequestDecision {
slot,
request_type,
@ -1947,7 +1947,7 @@ mod test {
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair().unwrap());
.set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap());
let decision = AncestorHashesService::verify_and_process_ancestor_response(
packet,
&ancestor_hashes_request_statuses,
@ -2010,7 +2010,7 @@ mod test {
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair().unwrap());
.set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap());
let AncestorRequestDecision {
slot,
request_type,

View File

@ -18,7 +18,7 @@ use {
},
solana_gossip::{
cluster_info::{ClusterInfo, ClusterInfoError},
legacy_contact_info::{LegacyContactInfo as ContactInfo, LegacyContactInfo},
contact_info::{LegacyContactInfo as ContactInfo, LegacyContactInfo, Protocol},
ping_pong::{self, PingCache, Pong},
weighted_shuffle::WeightedShuffle,
},
@ -214,7 +214,7 @@ pub(crate) type Ping = ping_pong::Ping<[u8; REPAIR_PING_TOKEN_SIZE]>;
/// Window protocol messages
#[derive(Debug, AbiEnumVisitor, AbiExample, Deserialize, Serialize, strum_macros::Display)]
#[frozen_abi(digest = "7vZyACjc13qQYWUsqWbdidLXR3uNXpmqUZaKeV3gKuY2")]
#[frozen_abi(digest = "3VzVe3kMrG6ijkVPyCGeJVA9hQjWcFEZbAQPc5Zizrjm")]
pub enum RepairProtocol {
LegacyWindowIndex(LegacyContactInfo, Slot, u64),
LegacyHighestWindowIndex(LegacyContactInfo, Slot, u64),
@ -350,7 +350,7 @@ impl RepairPeers {
.iter()
.zip(weights)
.filter_map(|(peer, &weight)| {
let addr = peer.serve_repair().ok()?;
let addr = peer.serve_repair(Protocol::UDP).ok()?;
Some(((*peer.pubkey(), addr), weight))
})
.unzip();
@ -1078,7 +1078,7 @@ impl ServeRepair {
.shuffle(&mut rand::thread_rng())
.map(|i| index[i])
.filter_map(|i| {
let addr = repair_peers[i].serve_repair().ok()?;
let addr = repair_peers[i].serve_repair(Protocol::UDP).ok()?;
Some((*repair_peers[i].pubkey(), addr))
})
.take(get_ancestor_hash_repair_sample_size())
@ -1102,7 +1102,10 @@ impl ServeRepair {
.unzip();
let k = WeightedIndex::new(weights)?.sample(&mut rand::thread_rng());
let n = index[k];
Ok((*repair_peers[n].pubkey(), repair_peers[n].serve_repair()?))
Ok((
*repair_peers[n].pubkey(),
repair_peers[n].serve_repair(Protocol::UDP)?,
))
}
pub(crate) fn map_repair_request(
@ -1930,8 +1933,8 @@ mod tests {
&identity_keypair,
)
.unwrap();
assert_eq!(nxt.serve_repair().unwrap(), serve_repair_addr);
assert_eq!(rv.0, nxt.serve_repair().unwrap());
assert_eq!(nxt.serve_repair(Protocol::UDP).unwrap(), serve_repair_addr);
assert_eq!(rv.0, nxt.serve_repair(Protocol::UDP).unwrap());
let serve_repair_addr2 = socketaddr!([127, 0, 0, 2], 1243);
let mut nxt = ContactInfo::new(

View File

@ -449,7 +449,9 @@ fn get_target(
Some((*node.pubkey(), node.tpu_forwards(protocol).unwrap()))
}
Mode::Repair => todo!("repair socket is not gossiped anymore!"),
Mode::ServeRepair => Some((*node.pubkey(), node.serve_repair().unwrap())),
Mode::ServeRepair => {
Some((*node.pubkey(), node.serve_repair(Protocol::UDP).unwrap()))
}
Mode::Rpc => None,
};
break;

View File

@ -271,7 +271,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "4jtxvWyeFwfDQTTGh4yJLyukALzRNVJ9WNnCbFeJUmaS")]
#[frozen_abi(digest = "6T2sn92PMrTijsgncH3bBZL4K5GUowb442cCw4y4DuwV")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
@ -847,7 +847,7 @@ impl ClusterInfo {
self.addr_to_string(&ip_addr, &node.tpu_forwards(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tvu(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tvu(contact_info::Protocol::QUIC).ok()),
self.addr_to_string(&ip_addr, &node.serve_repair().ok()),
self.addr_to_string(&ip_addr, &node.serve_repair(contact_info::Protocol::UDP).ok()),
node.shred_version(),
))
}
@ -1345,7 +1345,7 @@ impl ClusterInfo {
node.pubkey() != &self_pubkey
&& node.shred_version() == self_shred_version
&& self.check_socket_addr_space(&node.tvu(contact_info::Protocol::UDP))
&& self.check_socket_addr_space(&node.serve_repair())
&& self.check_socket_addr_space(&node.serve_repair(contact_info::Protocol::UDP))
&& match gossip_crds.get::<&LowestSlot>(*node.pubkey()) {
None => true, // fallback to legacy behavior
Some(lowest_slot) => lowest_slot.lowest <= slot,
@ -2799,6 +2799,7 @@ pub struct Sockets {
pub repair: UdpSocket,
pub retransmit_sockets: Vec<UdpSocket>,
pub serve_repair: UdpSocket,
pub serve_repair_quic: UdpSocket,
pub ancestor_hashes_requests: UdpSocket,
pub tpu_quic: UdpSocket,
pub tpu_forwards_quic: UdpSocket,
@ -2839,6 +2840,7 @@ impl Node {
let broadcast = vec![UdpSocket::bind(&unspecified_bind_addr).unwrap()];
let retransmit_socket = UdpSocket::bind(&unspecified_bind_addr).unwrap();
let serve_repair = UdpSocket::bind(&localhost_bind_addr).unwrap();
let serve_repair_quic = UdpSocket::bind(&localhost_bind_addr).unwrap();
let ancestor_hashes_requests = UdpSocket::bind(&unspecified_bind_addr).unwrap();
let mut info = ContactInfo::new(
@ -2871,6 +2873,11 @@ impl Node {
serve_repair.local_addr().unwrap(),
"serve-repair"
);
set_socket!(
set_serve_repair_quic,
serve_repair_quic.local_addr().unwrap(),
"serve-repair QUIC"
);
Node {
info,
sockets: Sockets {
@ -2885,6 +2892,7 @@ impl Node {
repair,
retransmit_sockets: vec![retransmit_socket],
serve_repair,
serve_repair_quic,
ancestor_hashes_requests,
tpu_quic,
tpu_forwards_quic,
@ -2930,6 +2938,7 @@ impl Node {
let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range);
let (_, repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range);
let (_, broadcast) = Self::bind(bind_ip_addr, port_range);
let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range);
@ -2959,6 +2968,11 @@ impl Node {
set_socket!(set_rpc, rpc_port, "RPC");
set_socket!(set_rpc_pubsub, rpc_pubsub_port, "RPC-pubsub");
set_socket!(set_serve_repair, serve_repair_port, "serve-repair");
set_socket!(
set_serve_repair_quic,
serve_repair_quic_port,
"serve-repair QUIC"
);
trace!("new ContactInfo: {:?}", info);
Node {
@ -2975,6 +2989,7 @@ impl Node {
repair,
retransmit_sockets: vec![retransmit_socket],
serve_repair,
serve_repair_quic,
ancestor_hashes_requests,
tpu_quic,
tpu_forwards_quic,
@ -3023,6 +3038,7 @@ impl Node {
let (_, repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range);
let (_, broadcast) =
multi_bind_in_range(bind_ip_addr, port_range, 4).expect("broadcast multi_bind");
@ -3044,6 +3060,7 @@ impl Node {
);
let _ = info.set_tpu_vote((addr, tpu_vote_port));
let _ = info.set_serve_repair((addr, serve_repair_port));
let _ = info.set_serve_repair((addr, serve_repair_quic_port));
trace!("new ContactInfo: {:?}", info);
Node {
@ -3059,6 +3076,7 @@ impl Node {
repair,
retransmit_sockets,
serve_repair,
serve_repair_quic,
ip_echo: Some(ip_echo),
ancestor_hashes_requests,
tpu_quic,

View File

@ -29,6 +29,7 @@ const SOCKET_TAG_GOSSIP: u8 = 0;
const SOCKET_TAG_RPC: u8 = 2;
const SOCKET_TAG_RPC_PUBSUB: u8 = 3;
const SOCKET_TAG_SERVE_REPAIR: u8 = 4;
const SOCKET_TAG_SERVE_REPAIR_QUIC: u8 = 1;
const SOCKET_TAG_TPU: u8 = 5;
const SOCKET_TAG_TPU_FORWARDS: u8 = 6;
const SOCKET_TAG_TPU_FORWARDS_QUIC: u8 = 7;
@ -224,7 +225,11 @@ impl ContactInfo {
get_socket!(gossip, SOCKET_TAG_GOSSIP);
get_socket!(rpc, SOCKET_TAG_RPC);
get_socket!(rpc_pubsub, SOCKET_TAG_RPC_PUBSUB);
get_socket!(serve_repair, SOCKET_TAG_SERVE_REPAIR);
get_socket!(
serve_repair,
SOCKET_TAG_SERVE_REPAIR,
SOCKET_TAG_SERVE_REPAIR_QUIC
);
get_socket!(tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC);
get_socket!(
tpu_forwards,
@ -238,6 +243,7 @@ impl ContactInfo {
set_socket!(set_rpc, SOCKET_TAG_RPC);
set_socket!(set_rpc_pubsub, SOCKET_TAG_RPC_PUBSUB);
set_socket!(set_serve_repair, SOCKET_TAG_SERVE_REPAIR);
set_socket!(set_serve_repair_quic, SOCKET_TAG_SERVE_REPAIR_QUIC);
set_socket!(set_tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC);
set_socket!(
set_tpu_forwards,
@ -248,7 +254,11 @@ impl ContactInfo {
set_socket!(set_tvu, SOCKET_TAG_TVU);
set_socket!(set_tvu_quic, SOCKET_TAG_TVU_QUIC);
remove_socket!(remove_serve_repair, SOCKET_TAG_SERVE_REPAIR);
remove_socket!(
remove_serve_repair,
SOCKET_TAG_SERVE_REPAIR,
SOCKET_TAG_SERVE_REPAIR_QUIC
);
remove_socket!(remove_tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC);
remove_socket!(
remove_tpu_forwards,
@ -370,6 +380,8 @@ impl ContactInfo {
node.set_rpc_pubsub((Ipv4Addr::LOCALHOST, DEFAULT_RPC_PUBSUB_PORT))
.unwrap();
node.set_serve_repair((Ipv4Addr::LOCALHOST, 8008)).unwrap();
node.set_serve_repair_quic((Ipv4Addr::LOCALHOST, 8006))
.unwrap();
node
}
@ -392,6 +404,7 @@ impl ContactInfo {
node.set_rpc_pubsub((addr, DEFAULT_RPC_PUBSUB_PORT))
.unwrap();
node.set_serve_repair((addr, port + 8)).unwrap();
node.set_serve_repair_quic((addr, port + 4)).unwrap();
node
}
}
@ -733,9 +746,13 @@ mod tests {
sockets.get(&SOCKET_TAG_RPC_PUBSUB)
);
assert_eq!(
node.serve_repair().ok().as_ref(),
node.serve_repair(Protocol::UDP).ok().as_ref(),
sockets.get(&SOCKET_TAG_SERVE_REPAIR)
);
assert_eq!(
node.serve_repair(Protocol::QUIC).ok().as_ref(),
sockets.get(&SOCKET_TAG_SERVE_REPAIR_QUIC)
);
assert_eq!(
node.tpu(Protocol::UDP).ok().as_ref(),
sockets.get(&SOCKET_TAG_TPU)
@ -813,7 +830,14 @@ mod tests {
assert_eq!(old.gossip().unwrap(), node.gossip().unwrap());
assert_eq!(old.rpc().unwrap(), node.rpc().unwrap());
assert_eq!(old.rpc_pubsub().unwrap(), node.rpc_pubsub().unwrap());
assert_eq!(old.serve_repair().unwrap(), node.serve_repair().unwrap());
assert_eq!(
old.serve_repair(Protocol::QUIC).unwrap(),
node.serve_repair(Protocol::QUIC).unwrap()
);
assert_eq!(
old.serve_repair(Protocol::UDP).unwrap(),
node.serve_repair(Protocol::UDP).unwrap()
);
assert_eq!(
old.tpu(Protocol::QUIC).unwrap(),
node.tpu(Protocol::QUIC).unwrap()

View File

@ -27,7 +27,8 @@ pub struct LegacyContactInfo {
tvu: SocketAddr,
/// TVU over QUIC protocol.
tvu_quic: SocketAddr,
unused: SocketAddr,
/// repair service over QUIC protocol.
serve_repair_quic: SocketAddr,
/// transactions address
tpu: SocketAddr,
/// address to forward unprocessed transactions to
@ -123,7 +124,7 @@ impl Default for LegacyContactInfo {
gossip: socketaddr_any!(),
tvu: socketaddr_any!(),
tvu_quic: socketaddr_any!(),
unused: socketaddr_any!(),
serve_repair_quic: socketaddr_any!(),
tpu: socketaddr_any!(),
tpu_forwards: socketaddr_any!(),
tpu_vote: socketaddr_any!(),
@ -143,7 +144,7 @@ impl LegacyContactInfo {
gossip: socketaddr!(Ipv4Addr::LOCALHOST, 1234),
tvu: socketaddr!(Ipv4Addr::LOCALHOST, 1235),
tvu_quic: socketaddr!(Ipv4Addr::LOCALHOST, 1236),
unused: socketaddr!(Ipv4Addr::LOCALHOST, 1237),
serve_repair_quic: socketaddr!(Ipv4Addr::LOCALHOST, 1237),
tpu: socketaddr!(Ipv4Addr::LOCALHOST, 1238),
tpu_forwards: socketaddr!(Ipv4Addr::LOCALHOST, 1239),
tpu_vote: socketaddr!(Ipv4Addr::LOCALHOST, 1240),
@ -210,7 +211,7 @@ impl LegacyContactInfo {
get_socket!(tpu_vote);
get_socket!(rpc);
get_socket!(rpc_pubsub);
get_socket!(serve_repair);
get_socket!(serve_repair, serve_repair_quic);
set_socket!(set_gossip, gossip);
set_socket!(set_rpc, rpc);
@ -272,13 +273,13 @@ impl TryFrom<&ContactInfo> for LegacyContactInfo {
gossip: unwrap_socket!(gossip),
tvu: unwrap_socket!(tvu, Protocol::UDP),
tvu_quic: unwrap_socket!(tvu, Protocol::QUIC),
unused: SOCKET_ADDR_UNSPECIFIED,
serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC),
tpu: unwrap_socket!(tpu, Protocol::UDP),
tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
tpu_vote: unwrap_socket!(tpu_vote),
rpc: unwrap_socket!(rpc),
rpc_pubsub: unwrap_socket!(rpc_pubsub),
serve_repair: unwrap_socket!(serve_repair),
serve_repair: unwrap_socket!(serve_repair, Protocol::UDP),
wallclock: node.wallclock(),
shred_version: node.shred_version(),
})

View File

@ -72,7 +72,7 @@ pub struct AdminRpcContactInfo {
pub gossip: SocketAddr,
pub tvu: SocketAddr,
pub tvu_quic: SocketAddr,
pub unused: SocketAddr,
pub serve_repair_quic: SocketAddr,
pub tpu: SocketAddr,
pub tpu_forwards: SocketAddr,
pub tpu_vote: SocketAddr,
@ -104,13 +104,13 @@ impl From<ContactInfo> for AdminRpcContactInfo {
gossip: unwrap_socket!(gossip),
tvu: unwrap_socket!(tvu, Protocol::UDP),
tvu_quic: unwrap_socket!(tvu, Protocol::QUIC),
unused: SOCKET_ADDR_UNSPECIFIED,
serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC),
tpu: unwrap_socket!(tpu, Protocol::UDP),
tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
tpu_vote: unwrap_socket!(tpu_vote),
rpc: unwrap_socket!(rpc),
rpc_pubsub: unwrap_socket!(rpc_pubsub),
serve_repair: unwrap_socket!(serve_repair),
serve_repair: unwrap_socket!(serve_repair, Protocol::UDP),
shred_version: node.shred_version(),
}
}

View File

@ -81,7 +81,7 @@ fn verify_reachable_ports(
};
let mut udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair];
if verify_address(&node.info.serve_repair().ok()) {
if verify_address(&node.info.serve_repair(Protocol::UDP).ok()) {
udp_sockets.push(&node.sockets.serve_repair);
}
if verify_address(&node.info.tpu(Protocol::UDP).ok()) {