adds repair-protocol identifying serve-repair socket (#33028)

Working towards migrating repair to the QUIC protocol, this commit adds a
repair-protocol argument for identifying the right serve-repair socket.
behzad nouri 2023-08-28 21:34:09 +00:00 committed by GitHub
parent d5c2dacd07
commit 1171002f46
4 changed files with 126 additions and 94 deletions
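
In outline: a new helper, serve_repair::get_repair_protocol, maps the cluster type to a wire Protocol, and every repair-request path now threads that value down to where a peer's serve-repair socket is resolved from gossip. A minimal sketch of the flow using the types in the diff below; repair_addr is a hypothetical caller added here for illustration:

use solana_gossip::contact_info::{ContactInfo, Protocol};
use solana_sdk::genesis_config::ClusterType;
use std::net::SocketAddr;

// The single switch point added by this commit; hard-coded to UDP until
// the QUIC repair path is implemented.
pub(crate) fn get_repair_protocol(_: ClusterType) -> Protocol {
    Protocol::UDP
}

// Hypothetical caller: resolve the serve-repair socket the peer advertises
// in gossip for the selected protocol (UDP today, QUIC later).
fn repair_addr(peer: &ContactInfo, cluster_type: ClusterType) -> Option<SocketAddr> {
    peer.serve_repair(get_repair_protocol(cluster_type)).ok()
}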

ancestor_hashes_service.rs

@@ -9,7 +9,7 @@ use {
packet_threshold::DynamicPacketToProcessThreshold,
repair_service::{AncestorDuplicateSlotsSender, RepairInfo, RepairStatsGroup},
serve_repair::{
AncestorHashesRepairType, AncestorHashesResponse, RepairProtocol, ServeRepair,
self, AncestorHashesRepairType, AncestorHashesResponse, RepairProtocol, ServeRepair,
},
},
replay_stage::DUPLICATE_THRESHOLD,
@@ -17,7 +17,7 @@ use {
bincode::serialize,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
dashmap::{mapref::entry::Entry::Occupied, DashMap},
solana_gossip::{cluster_info::ClusterInfo, ping_pong::Pong},
solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol, ping_pong::Pong},
solana_ledger::blockstore::Blockstore,
solana_perf::{
packet::{deserialize_from_with_limit, Packet, PacketBatch},
@@ -26,6 +26,7 @@ use {
solana_runtime::bank::Bank,
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
genesis_config::ClusterType,
pubkey::Pubkey,
signature::Signable,
signer::keypair::Keypair,
@@ -207,11 +208,8 @@ impl AncestorHashesService {
Self { thread_hdls }
}
pub fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
Ok(())
pub(crate) fn join(self) -> thread::Result<()> {
self.thread_hdls.into_iter().try_for_each(JoinHandle::join)
}
/// Listen for responses to our ancestors hashes repair requests
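The join loops here and in serve_repair_service.rs below collapse into an iterator chain; a self-contained sketch showing the two forms are equivalent:

use std::thread::{self, JoinHandle};

fn join_all(thread_hdls: Vec<JoinHandle<()>>) -> thread::Result<()> {
    // try_for_each short-circuits on the first Err, exactly like the removed
    // for-loop with `?`; JoinHandle::join yields thread::Result<()> here.
    thread_hdls.into_iter().try_for_each(JoinHandle::join)
}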
@@ -232,7 +230,7 @@ impl AncestorHashesService {
let mut last_stats_report = Instant::now();
let mut stats = AncestorHashesResponsesStats::default();
let mut packet_threshold = DynamicPacketToProcessThreshold::default();
loop {
while !exit.load(Ordering::Relaxed) {
let keypair = cluster_info.keypair().clone();
let result = Self::process_new_packets_from_channel(
&ancestor_hashes_request_statuses,
@@ -253,9 +251,6 @@ impl AncestorHashesService {
return;
}
};
if exit.load(Ordering::Relaxed) {
return;
}
if last_stats_report.elapsed().as_secs() > 2 {
stats.report();
last_stats_report = Instant::now();
@@ -639,6 +634,7 @@ impl AncestorHashesService {
request_throttle: &mut Vec<u64>,
) {
let root_bank = repair_info.bank_forks.read().unwrap().root_bank();
let cluster_type = root_bank.cluster_type();
for (slot, request_type) in retryable_slots_receiver.try_iter() {
datapoint_info!("ancestor-repair-retry", ("slot", slot, i64));
if request_type.is_pruned() {
@@ -732,6 +728,7 @@ impl AncestorHashesService {
outstanding_requests,
identity_keypair,
request_type,
cluster_type,
) {
request_throttle.push(timestamp());
if request_type.is_pruned() {
@@ -808,46 +805,53 @@ impl AncestorHashesService {
outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
identity_keypair: &Keypair,
request_type: AncestorRequestType,
cluster_type: ClusterType,
) -> bool {
let sampled_validators = serve_repair.repair_request_ancestor_hashes_sample_peers(
let repair_protocol = serve_repair::get_repair_protocol(cluster_type);
let Ok(sampled_validators) = serve_repair.repair_request_ancestor_hashes_sample_peers(
duplicate_slot,
cluster_slots,
repair_validators,
);
repair_protocol,
) else {
return false;
};
if let Ok(sampled_validators) = sampled_validators {
for (pubkey, socket_addr) in sampled_validators.iter() {
repair_stats
.ancestor_requests
.update(pubkey, duplicate_slot, 0);
let nonce = outstanding_requests
.write()
.unwrap()
.add_request(AncestorHashesRepairType(duplicate_slot), timestamp());
let request_bytes = serve_repair.ancestor_repair_request_bytes(
identity_keypair,
pubkey,
duplicate_slot,
nonce,
);
if let Ok(request_bytes) = request_bytes {
for (pubkey, socket_addr) in &sampled_validators {
repair_stats
.ancestor_requests
.update(pubkey, duplicate_slot, 0);
let nonce = outstanding_requests
.write()
.unwrap()
.add_request(AncestorHashesRepairType(duplicate_slot), timestamp());
let Ok(request_bytes) = serve_repair.ancestor_repair_request_bytes(
identity_keypair,
pubkey,
duplicate_slot,
nonce,
) else {
continue;
};
match repair_protocol {
Protocol::UDP => {
let _ = ancestor_hashes_request_socket.send_to(&request_bytes, socket_addr);
}
Protocol::QUIC => todo!(),
}
let ancestor_request_status = AncestorRequestStatus::new(
sampled_validators
.into_iter()
.map(|(_pk, socket_addr)| socket_addr),
duplicate_slot,
request_type,
);
assert!(!ancestor_hashes_request_statuses.contains_key(&duplicate_slot));
ancestor_hashes_request_statuses.insert(duplicate_slot, ancestor_request_status);
true
} else {
false
}
let ancestor_request_status = AncestorRequestStatus::new(
sampled_validators
.into_iter()
.map(|(_pk, socket_addr)| socket_addr),
duplicate_slot,
request_type,
);
assert!(ancestor_hashes_request_statuses
.insert(duplicate_slot, ancestor_request_status)
.is_none());
true
}
}
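Two details in the rewritten request path above: let-else replaces the if-let nesting so each failure returns or continues early instead of indenting the happy path, and the old contains_key assertion followed by insert becomes a single insert whose return value is asserted to be None, checking uniqueness in the same map operation that stores the entry. A minimal sketch of the insert pattern, assuming a DashMap keyed by slot:

use dashmap::DashMap;

fn main() {
    let statuses: DashMap<u64, &str> = DashMap::new();
    // insert returns the previously stored value; asserting None proves the
    // key was absent without a separate contains_key lookup.
    assert!(statuses.insert(42, "pending").is_none());
}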
@@ -1451,6 +1455,7 @@ mod test {
&outstanding_requests,
&requester_cluster_info.keypair(),
AncestorRequestType::DeadDuplicateConfirmed,
ClusterType::Development,
);
assert!(ancestor_hashes_request_statuses.is_empty());
@@ -1499,6 +1504,7 @@ mod test {
&outstanding_requests,
&requester_cluster_info.keypair(),
AncestorRequestType::DeadDuplicateConfirmed,
ClusterType::Development,
);
assert_eq!(ancestor_hashes_request_statuses.len(), 1);
@@ -1559,6 +1565,7 @@ mod test {
&outstanding_requests,
&requester_cluster_info.keypair(),
AncestorRequestType::PopularPruned,
ClusterType::Development,
);
assert_eq!(ancestor_hashes_request_statuses.len(), 1);

repair_service.rs

@@ -14,7 +14,7 @@ use {
duplicate_repair_status::AncestorDuplicateSlotToRepair,
outstanding_requests::OutstandingRequests,
repair_weight::RepairWeight,
serve_repair::{ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY},
serve_repair::{self, ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY},
},
},
crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender},
@@ -305,17 +305,14 @@ impl RepairService {
let mut peers_cache = LruCache::new(REPAIR_PEERS_CACHE_CAPACITY);
let mut popular_pruned_forks_requests = HashSet::new();
loop {
if exit.load(Ordering::Relaxed) {
break;
}
while !exit.load(Ordering::Relaxed) {
let mut set_root_elapsed;
let mut dump_slots_elapsed;
let mut get_votes_elapsed;
let mut add_votes_elapsed;
let root_bank = repair_info.bank_forks.read().unwrap().root_bank();
let repair_protocol = serve_repair::get_repair_protocol(root_bank.cluster_type());
let repairs = {
let new_root = root_bank.slot();
@@ -425,17 +422,18 @@ impl RepairService {
let batch: Vec<(Vec<u8>, SocketAddr)> = {
let mut outstanding_requests = outstanding_requests.write().unwrap();
repairs
.iter()
.into_iter()
.filter_map(|repair_request| {
let (to, req) = serve_repair
.repair_request(
&repair_info.cluster_slots,
*repair_request,
repair_request,
&mut peers_cache,
&mut repair_stats,
&repair_info.repair_validators,
&mut outstanding_requests,
identity_keypair,
repair_protocol,
)
.ok()?;
Some((req, to))

serve_repair.rs

@@ -34,6 +34,7 @@ use {
solana_runtime::bank_forks::BankForks,
solana_sdk::{
clock::Slot,
genesis_config::ClusterType,
hash::{Hash, HASH_BYTES},
packet::PACKET_DATA_SIZE,
pubkey::{Pubkey, PUBKEY_BYTES},
@@ -212,7 +213,7 @@ impl RepairRequestHeader {
pub(crate) type Ping = ping_pong::Ping<[u8; REPAIR_PING_TOKEN_SIZE]>;
/// Window protocol messages
#[derive(Debug, AbiEnumVisitor, AbiExample, Deserialize, Serialize, strum_macros::Display)]
#[derive(Debug, AbiEnumVisitor, AbiExample, Deserialize, Serialize)]
#[frozen_abi(digest = "3VzVe3kMrG6ijkVPyCGeJVA9hQjWcFEZbAQPc5Zizrjm")]
pub enum RepairProtocol {
LegacyWindowIndex(LegacyContactInfo, Slot, u64),
@@ -335,10 +336,17 @@ pub struct ServeRepair {
// Cache entry for repair peers for a slot.
pub(crate) struct RepairPeers {
asof: Instant,
peers: Vec<(Pubkey, /*ContactInfo.serve_repair:*/ SocketAddr)>,
peers: Vec<Node>,
weighted_index: WeightedIndex<u64>,
}
struct Node {
pubkey: Pubkey,
serve_repair: SocketAddr,
#[allow(dead_code)]
serve_repair_quic: SocketAddr,
}
impl RepairPeers {
fn new(asof: Instant, peers: &[ContactInfo], weights: &[u64]) -> Result<Self> {
if peers.len() != weights.len() {
@@ -348,8 +356,12 @@ impl RepairPeers {
.iter()
.zip(weights)
.filter_map(|(peer, &weight)| {
let addr = peer.serve_repair(Protocol::UDP).ok()?;
Some(((*peer.pubkey(), addr), weight))
let node = Node {
pubkey: *peer.pubkey(),
serve_repair: peer.serve_repair(Protocol::UDP).ok()?,
serve_repair_quic: peer.serve_repair(Protocol::QUIC).ok()?,
};
Some((node, weight))
})
.unzip();
if peers.is_empty() {
@@ -363,9 +375,9 @@ impl RepairPeers {
})
}
fn sample<R: Rng>(&self, rng: &mut R) -> (Pubkey, SocketAddr) {
fn sample<R: Rng>(&self, rng: &mut R) -> &Node {
let index = self.weighted_index.sample(rng);
self.peers[index]
&self.peers[index]
}
}
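RepairPeers now caches both advertised sockets per peer, so sample returns a borrowed Node rather than a copied (Pubkey, SocketAddr) pair and lets the caller choose the socket afterwards. A small sketch of the stake-weighted sampling with simplified types (WeightedIndex is the same rand distribution used above):

use rand::distributions::{Distribution, WeightedIndex};
use std::net::SocketAddr;

struct Node {
    serve_repair: SocketAddr,      // UDP socket
    serve_repair_quic: SocketAddr, // QUIC socket, unused until QUIC repair lands
}

// Pick a peer with probability proportional to its weight (stake),
// borrowing the entry so both sockets remain available to the caller.
fn sample<'a, R: rand::Rng>(
    peers: &'a [Node],
    weighted_index: &WeightedIndex<u64>,
    rng: &mut R,
) -> &'a Node {
    &peers[weighted_index.sample(rng)]
}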
@@ -508,11 +520,8 @@ impl ServeRepair {
my_id: &Pubkey,
socket_addr_space: &SocketAddrSpace,
) -> Result<RepairRequestWithMeta> {
let request: RepairProtocol = match packet.deserialize_slice(..) {
Ok(request) => request,
Err(_) => {
return Err(Error::from(RepairVerifyError::Malformed));
}
let Ok(request) = packet.deserialize_slice(..) else {
return Err(Error::from(RepairVerifyError::Malformed));
};
let from_addr = packet.meta().socket_addr();
if !ContactInfo::is_valid_address(&from_addr, socket_addr_space) {
@@ -520,7 +529,7 @@ impl ServeRepair {
}
Self::verify_signed_packet(my_id, packet, &request)?;
if request.sender() == my_id {
error!("self repair: from_addr={from_addr} my_id={my_id} request={request}");
error!("self repair: from_addr={from_addr} my_id={my_id} request={request:?}");
return Err(Error::from(RepairVerifyError::SelfRepair));
}
let stake = *epoch_staked_nodes
@@ -804,7 +813,7 @@ impl ServeRepair {
let mut last_print = Instant::now();
let mut stats = ServeRepairStats::default();
let data_budget = DataBudget::default();
loop {
while !exit.load(Ordering::Relaxed) {
let result = self.run_listen(
&mut ping_cache,
&recycler,
@ -821,9 +830,6 @@ impl ServeRepair {
return;
}
};
if exit.load(Ordering::Relaxed) {
return;
}
if last_print.elapsed().as_secs() > 2 {
self.report_reset_stats(&mut stats);
last_print = Instant::now();
@@ -1026,6 +1032,7 @@ impl ServeRepair {
repair_validators: &Option<HashSet<Pubkey>>,
outstanding_requests: &mut OutstandingShredRepairs,
identity_keypair: &Keypair,
repair_protocol: Protocol,
) -> Result<(SocketAddr, Vec<u8>)> {
// find a peer that appears to be accepting replication and has the desired slot, as indicated
// by a valid tvu port location
@@ -1041,11 +1048,11 @@ impl ServeRepair {
peers_cache.get(&slot).unwrap()
}
};
let (peer, addr) = repair_peers.sample(&mut rand::thread_rng());
let peer = repair_peers.sample(&mut rand::thread_rng());
let nonce = outstanding_requests.add_request(repair_request, timestamp());
let out = self.map_repair_request(
&repair_request,
&peer,
&peer.pubkey,
repair_stats,
nonce,
identity_keypair,
@@ -1055,7 +1062,10 @@ impl ServeRepair {
identity_keypair.pubkey(),
repair_request
);
Ok((addr, out))
match repair_protocol {
Protocol::UDP => Ok((peer.serve_repair, out)),
Protocol::QUIC => todo!(),
}
}
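Note that the Protocol::QUIC arm is todo!() and unreachable in this commit: get_repair_protocol only ever returns Protocol::UDP, so the match merely makes the future QUIC wiring point explicit.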
pub(crate) fn repair_request_ancestor_hashes_sample_peers(
@@ -1063,6 +1073,7 @@ impl ServeRepair {
slot: Slot,
cluster_slots: &ClusterSlots,
repair_validators: &Option<HashSet<Pubkey>>,
repair_protocol: Protocol,
) -> Result<Vec<(Pubkey, SocketAddr)>> {
let repair_peers: Vec<_> = self.repair_peers(repair_validators, slot);
if repair_peers.is_empty() {
@@ -1076,7 +1087,7 @@ impl ServeRepair {
.shuffle(&mut rand::thread_rng())
.map(|i| index[i])
.filter_map(|i| {
let addr = repair_peers[i].serve_repair(Protocol::UDP).ok()?;
let addr = repair_peers[i].serve_repair(repair_protocol).ok()?;
Some((*repair_peers[i].pubkey(), addr))
})
.take(get_ancestor_hash_repair_sample_size())
@@ -1084,7 +1095,8 @@ impl ServeRepair {
Ok(peers)
}
pub fn repair_request_duplicate_compute_best_peer(
#[cfg(test)]
pub(crate) fn repair_request_duplicate_compute_best_peer(
&self,
slot: Slot,
cluster_slots: &ClusterSlots,
@@ -1346,6 +1358,11 @@ impl ServeRepair {
}
}
#[inline]
pub(crate) fn get_repair_protocol(_: ClusterType) -> Protocol {
Protocol::UDP
}
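get_repair_protocol deliberately ignores its argument for now; it exists so that switching a cluster over to QUIC later is a change at one point rather than at every call site. A hypothetical future shape, illustrative only and not part of this commit:

#[inline]
pub(crate) fn get_repair_protocol(cluster_type: ClusterType) -> Protocol {
    // Hypothetical rollout: enable QUIC on test clusters first.
    match cluster_type {
        ClusterType::Testnet | ClusterType::Development => Protocol::QUIC,
        _ => Protocol::UDP,
    }
}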
#[cfg(test)]
mod tests {
use {
@@ -1703,10 +1720,10 @@ mod tests {
packet
};
let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
assert!(matches!(
assert_matches!(
ServeRepair::verify_signed_packet(&my_keypair.pubkey(), &packet, &request),
Err(Error::RepairVerify(RepairVerifyError::IdMismatch))
));
);
// outside time window
let packet = {
@@ -1725,10 +1742,10 @@ mod tests {
packet
};
let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
assert!(matches!(
assert_matches!(
ServeRepair::verify_signed_packet(&other_keypair.pubkey(), &packet, &request),
Err(Error::RepairVerify(RepairVerifyError::TimeSkew))
));
);
// bad signature
let packet = {
@@ -1745,10 +1762,10 @@ mod tests {
packet
};
let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
assert!(matches!(
assert_matches!(
ServeRepair::verify_signed_packet(&other_keypair.pubkey(), &packet, &request),
Err(Error::RepairVerify(RepairVerifyError::SigVerify))
));
);
}
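The assert!(matches!(..)) to assert_matches! conversions above are behavior-preserving; assert_matches! additionally prints the actual value when the pattern fails to match, which makes test failures easier to diagnose.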
#[test]
@@ -1901,6 +1918,7 @@ mod tests {
&None,
&mut outstanding_requests,
&identity_keypair,
Protocol::UDP, // repair_protocol
);
assert_matches!(rv, Err(Error::ClusterInfo(ClusterInfoError::NoPeers)));
@@ -1919,6 +1937,8 @@ mod tests {
nxt.set_rpc((Ipv4Addr::LOCALHOST, 1241)).unwrap();
nxt.set_rpc_pubsub((Ipv4Addr::LOCALHOST, 1242)).unwrap();
nxt.set_serve_repair(serve_repair_addr).unwrap();
nxt.set_serve_repair_quic((Ipv4Addr::LOCALHOST, 1237))
.unwrap();
cluster_info.insert_info(nxt.clone());
let rv = serve_repair
.repair_request(
@@ -1929,6 +1949,7 @@ mod tests {
&None,
&mut outstanding_requests,
&identity_keypair,
Protocol::UDP, // repair_protocol
)
.unwrap();
assert_eq!(nxt.serve_repair(Protocol::UDP).unwrap(), serve_repair_addr);
@@ -1949,6 +1970,8 @@ mod tests {
nxt.set_rpc((Ipv4Addr::LOCALHOST, 1241)).unwrap();
nxt.set_rpc_pubsub((Ipv4Addr::LOCALHOST, 1242)).unwrap();
nxt.set_serve_repair(serve_repair_addr2).unwrap();
nxt.set_serve_repair_quic((Ipv4Addr::LOCALHOST, 1237))
.unwrap();
cluster_info.insert_info(nxt);
let mut one = false;
let mut two = false;
@@ -1963,6 +1986,7 @@ mod tests {
&None,
&mut outstanding_requests,
&identity_keypair,
Protocol::UDP, // repair_protocol
)
.unwrap();
if rv.0 == serve_repair_addr {
@@ -2232,8 +2256,8 @@ mod tests {
for pubkey in &[solana_sdk::pubkey::new_rand(), *me.pubkey()] {
let known_validators = Some(vec![*pubkey].into_iter().collect());
assert!(serve_repair.repair_peers(&known_validators, 1).is_empty());
assert!(serve_repair
.repair_request(
assert_matches!(
serve_repair.repair_request(
&cluster_slots,
ShredRepairType::Shred(0, 0),
&mut LruCache::new(100),
@@ -2241,8 +2265,10 @@ mod tests {
&known_validators,
&mut OutstandingShredRepairs::default(),
&identity_keypair,
)
.is_err());
Protocol::UDP, // repair_protocol
),
Err(Error::ClusterInfo(ClusterInfoError::NoPeers))
);
}
// If known validator exists in gossip, should return repair successfully
@@ -2250,8 +2276,8 @@ mod tests {
let repair_peers = serve_repair.repair_peers(&known_validators, 1);
assert_eq!(repair_peers.len(), 1);
assert_eq!(repair_peers[0].pubkey(), contact_info2.pubkey());
assert!(serve_repair
.repair_request(
assert_matches!(
serve_repair.repair_request(
&cluster_slots,
ShredRepairType::Shred(0, 0),
&mut LruCache::new(100),
@@ -2259,8 +2285,10 @@ mod tests {
&known_validators,
&mut OutstandingShredRepairs::default(),
&identity_keypair,
)
.is_ok());
Protocol::UDP, // repair_protocol
),
Ok(_)
);
// Using no known validators should default to all
// validator's available in gossip, excluding myself
@@ -2272,8 +2300,8 @@ mod tests {
assert_eq!(repair_peers.len(), 2);
assert!(repair_peers.contains(contact_info2.pubkey()));
assert!(repair_peers.contains(contact_info3.pubkey()));
assert!(serve_repair
.repair_request(
assert_matches!(
serve_repair.repair_request(
&cluster_slots,
ShredRepairType::Shred(0, 0),
&mut LruCache::new(100),
@@ -2281,8 +2309,10 @@ mod tests {
&None,
&mut OutstandingShredRepairs::default(),
&identity_keypair,
)
.is_ok());
Protocol::UDP, // repair_protocol
),
Ok(_)
);
}
#[test]

serve_repair_service.rs

@@ -59,10 +59,7 @@ impl ServeRepairService {
Self { thread_hdls }
}
pub fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
Ok(())
pub(crate) fn join(self) -> thread::Result<()> {
self.thread_hdls.into_iter().try_for_each(JoinHandle::join)
}
}