Sample peers if input peer missing for repair shred (#34818)
This commit is contained in:
parent
2c0278b8b2
commit
3d7ee19348
|
@ -1,5 +1,8 @@
|
|||
use {
|
||||
crate::repair::{outstanding_requests, serve_repair},
|
||||
crate::{
|
||||
cluster_slots_service::cluster_slots::ClusterSlots,
|
||||
repair::{outstanding_requests::OutstandingRequests, serve_repair::ShredRepairType},
|
||||
},
|
||||
solana_gossip::cluster_info::ClusterInfo,
|
||||
solana_runtime::bank_forks::BankForks,
|
||||
solana_sdk::{pubkey::Pubkey, quic::NotifyKeyUpdate},
|
||||
|
@ -18,6 +21,6 @@ pub struct AdminRpcRequestMetadataPostInit {
|
|||
pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
|
||||
pub notifies: Vec<Arc<dyn NotifyKeyUpdate + Sync + Send>>,
|
||||
pub repair_socket: Arc<UdpSocket>,
|
||||
pub outstanding_repair_requests:
|
||||
Arc<RwLock<outstanding_requests::OutstandingRequests<serve_repair::ShredRepairType>>>,
|
||||
pub outstanding_repair_requests: Arc<RwLock<OutstandingRequests<ShredRepairType>>>,
|
||||
pub cluster_slots: Arc<ClusterSlots>,
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ use {
|
|||
},
|
||||
crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender},
|
||||
lru::LruCache,
|
||||
rand::seq::SliceRandom,
|
||||
solana_client::connection_cache::Protocol,
|
||||
solana_gossip::cluster_info::ClusterInfo,
|
||||
solana_ledger::{
|
||||
|
@ -58,6 +59,12 @@ use {
|
|||
const DEFER_REPAIR_THRESHOLD: Duration = Duration::from_millis(200);
|
||||
const DEFER_REPAIR_THRESHOLD_TICKS: u64 = DEFER_REPAIR_THRESHOLD.as_millis() as u64 / MS_PER_TICK;
|
||||
|
||||
// When requesting repair for a specific shred through the admin RPC, we will
|
||||
// request up to NUM_PEERS_TO_SAMPLE_FOR_REPAIRS in the event a specific, valid
|
||||
// target node is not provided. This number was chosen to provide a reasonable
|
||||
// chance of sampling a duplicate in the event of a cluster partition.
|
||||
const NUM_PEERS_TO_SAMPLE_FOR_REPAIRS: usize = 10;
|
||||
|
||||
pub type AncestorDuplicateSlotsSender = CrossbeamSender<AncestorDuplicateSlotToRepair>;
|
||||
pub type AncestorDuplicateSlotsReceiver = CrossbeamReceiver<AncestorDuplicateSlotToRepair>;
|
||||
pub type ConfirmedSlotsSender = CrossbeamSender<Vec<Slot>>;
|
||||
|
@ -682,27 +689,97 @@ impl RepairService {
|
|||
}
|
||||
}
|
||||
|
||||
fn get_repair_peers(
|
||||
cluster_info: Arc<ClusterInfo>,
|
||||
cluster_slots: Arc<ClusterSlots>,
|
||||
slot: u64,
|
||||
) -> Vec<(Pubkey, SocketAddr)> {
|
||||
// Find the repair peers that have this slot frozen.
|
||||
let Some(peers_with_slot) = cluster_slots.lookup(slot) else {
|
||||
warn!("No repair peers have frozen slot: {slot}");
|
||||
return vec![];
|
||||
};
|
||||
let peers_with_slot = peers_with_slot.read().unwrap();
|
||||
|
||||
// Filter out any peers that don't have a valid repair socket.
|
||||
let repair_peers: Vec<(Pubkey, SocketAddr, u32)> = peers_with_slot
|
||||
.iter()
|
||||
.filter_map(|(pubkey, stake)| {
|
||||
let peer_repair_addr = cluster_info
|
||||
.lookup_contact_info(pubkey, |node| node.serve_repair(Protocol::UDP));
|
||||
if let Some(Ok(peer_repair_addr)) = peer_repair_addr {
|
||||
trace!("Repair peer {pubkey} has a valid repair socket: {peer_repair_addr:?}");
|
||||
Some((
|
||||
*pubkey,
|
||||
peer_repair_addr,
|
||||
(*stake / solana_sdk::native_token::LAMPORTS_PER_SOL) as u32,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Sample a subset of the repair peers weighted by stake.
|
||||
let mut rng = rand::thread_rng();
|
||||
let Ok(weighted_sample_repair_peers) = repair_peers.choose_multiple_weighted(
|
||||
&mut rng,
|
||||
NUM_PEERS_TO_SAMPLE_FOR_REPAIRS,
|
||||
|(_, _, stake)| *stake,
|
||||
) else {
|
||||
return vec![];
|
||||
};
|
||||
|
||||
// Return the pubkey and repair socket address for the sampled peers.
|
||||
weighted_sample_repair_peers
|
||||
.collect::<Vec<_>>()
|
||||
.iter()
|
||||
.map(|(pubkey, addr, _)| (*pubkey, *addr))
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn request_repair_for_shred_from_peer(
|
||||
cluster_info: Arc<ClusterInfo>,
|
||||
pubkey: Pubkey,
|
||||
cluster_slots: Arc<ClusterSlots>,
|
||||
pubkey: Option<Pubkey>,
|
||||
slot: u64,
|
||||
shred_index: u64,
|
||||
repair_socket: &UdpSocket,
|
||||
outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
|
||||
) {
|
||||
let peer_repair_addr = cluster_info
|
||||
.lookup_contact_info(&pubkey, |node| node.serve_repair(Protocol::UDP))
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
Self::request_repair_for_shred_from_address(
|
||||
cluster_info,
|
||||
pubkey,
|
||||
peer_repair_addr,
|
||||
slot,
|
||||
shred_index,
|
||||
repair_socket,
|
||||
outstanding_repair_requests,
|
||||
);
|
||||
let mut repair_peers = vec![];
|
||||
|
||||
// Check validity of passed in peer.
|
||||
if let Some(pubkey) = pubkey {
|
||||
let peer_repair_addr =
|
||||
cluster_info.lookup_contact_info(&pubkey, |node| node.serve_repair(Protocol::UDP));
|
||||
if let Some(Ok(peer_repair_addr)) = peer_repair_addr {
|
||||
trace!("Repair peer {pubkey} has valid repair socket: {peer_repair_addr:?}");
|
||||
repair_peers.push((pubkey, peer_repair_addr));
|
||||
}
|
||||
};
|
||||
|
||||
// Select weighted sample of valid peers if no valid peer was passed in.
|
||||
if repair_peers.is_empty() {
|
||||
debug!(
|
||||
"No pubkey was provided or no valid repair socket was found. \
|
||||
Sampling a set of repair peers instead."
|
||||
);
|
||||
repair_peers = Self::get_repair_peers(cluster_info.clone(), cluster_slots, slot);
|
||||
}
|
||||
|
||||
// Send repair request to each peer.
|
||||
for (pubkey, peer_repair_addr) in repair_peers {
|
||||
Self::request_repair_for_shred_from_address(
|
||||
cluster_info.clone(),
|
||||
pubkey,
|
||||
peer_repair_addr,
|
||||
slot,
|
||||
shred_index,
|
||||
repair_socket,
|
||||
outstanding_repair_requests.clone(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn request_repair_for_shred_from_address(
|
||||
|
@ -738,7 +815,7 @@ impl RepairService {
|
|||
// Send packet batch
|
||||
match batch_send(repair_socket, &reqs[..]) {
|
||||
Ok(()) => {
|
||||
trace!("successfully sent repair request!");
|
||||
debug!("successfully sent repair request to {pubkey} / {address}!");
|
||||
}
|
||||
Err(SendPktsError::IoError(err, _num_failed)) => {
|
||||
error!("batch_send failed to send packet - error = {:?}", err);
|
||||
|
|
|
@ -142,6 +142,7 @@ impl Tvu {
|
|||
turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
|
||||
repair_quic_endpoint_sender: AsyncSender<LocalRequest>,
|
||||
outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
|
||||
cluster_slots: Arc<ClusterSlots>,
|
||||
) -> Result<Self, String> {
|
||||
let TvuSockets {
|
||||
repair: repair_socket,
|
||||
|
@ -192,7 +193,6 @@ impl Tvu {
|
|||
Some(rpc_subscriptions.clone()),
|
||||
);
|
||||
|
||||
let cluster_slots = Arc::new(ClusterSlots::default());
|
||||
let (ancestor_duplicate_slots_sender, ancestor_duplicate_slots_receiver) = unbounded();
|
||||
let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded();
|
||||
let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
|
||||
|
@ -448,6 +448,7 @@ pub mod tests {
|
|||
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
|
||||
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
|
||||
let outstanding_repair_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
|
||||
let cluster_slots = Arc::new(ClusterSlots::default());
|
||||
let tvu = Tvu::new(
|
||||
&vote_keypair.pubkey(),
|
||||
Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
|
||||
|
@ -503,6 +504,7 @@ pub mod tests {
|
|||
turbine_quic_endpoint_receiver,
|
||||
repair_quic_endpoint_sender,
|
||||
outstanding_repair_requests,
|
||||
cluster_slots,
|
||||
)
|
||||
.expect("assume success");
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
|
|
|
@ -1257,6 +1257,8 @@ impl Validator {
|
|||
|
||||
let outstanding_repair_requests =
|
||||
Arc::<RwLock<repair::repair_service::OutstandingShredRepairs>>::default();
|
||||
let cluster_slots =
|
||||
Arc::new(crate::cluster_slots_service::cluster_slots::ClusterSlots::default());
|
||||
|
||||
let tvu = Tvu::new(
|
||||
vote_account,
|
||||
|
@ -1311,6 +1313,7 @@ impl Validator {
|
|||
turbine_quic_endpoint_receiver,
|
||||
repair_quic_endpoint_sender,
|
||||
outstanding_repair_requests.clone(),
|
||||
cluster_slots.clone(),
|
||||
)?;
|
||||
|
||||
if in_wen_restart {
|
||||
|
@ -1389,6 +1392,7 @@ impl Validator {
|
|||
notifies: key_notifies,
|
||||
repair_socket: Arc::new(node.sockets.repair),
|
||||
outstanding_repair_requests,
|
||||
cluster_slots,
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
|
|
|
@ -212,7 +212,7 @@ pub trait AdminRpc {
|
|||
fn repair_shred_from_peer(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
pubkey: Pubkey,
|
||||
pubkey: Option<Pubkey>,
|
||||
slot: u64,
|
||||
shred_index: u64,
|
||||
) -> Result<()>;
|
||||
|
@ -500,7 +500,7 @@ impl AdminRpc for AdminRpcImpl {
|
|||
fn repair_shred_from_peer(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
pubkey: Pubkey,
|
||||
pubkey: Option<Pubkey>,
|
||||
slot: u64,
|
||||
shred_index: u64,
|
||||
) -> Result<()> {
|
||||
|
@ -509,6 +509,7 @@ impl AdminRpc for AdminRpcImpl {
|
|||
meta.with_post_init(|post_init| {
|
||||
repair_service::RepairService::request_repair_for_shred_from_peer(
|
||||
post_init.cluster_info.clone(),
|
||||
post_init.cluster_slots.clone(),
|
||||
pubkey,
|
||||
slot,
|
||||
shred_index,
|
||||
|
@ -931,6 +932,9 @@ mod tests {
|
|||
outstanding_repair_requests: Arc::<
|
||||
RwLock<repair_service::OutstandingShredRepairs>,
|
||||
>::default(),
|
||||
cluster_slots: Arc::new(
|
||||
solana_core::cluster_slots_service::cluster_slots::ClusterSlots::default(),
|
||||
),
|
||||
}))),
|
||||
staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
|
||||
rpc_to_plugin_manager_sender: None,
|
||||
|
|
|
@ -1503,6 +1503,7 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
|
|||
Arg::with_name("pubkey")
|
||||
.long("pubkey")
|
||||
.value_name("PUBKEY")
|
||||
.required(false)
|
||||
.takes_value(true)
|
||||
.validator(is_pubkey)
|
||||
.help("Identity pubkey of the validator to repair from")
|
||||
|
|
|
@ -794,7 +794,7 @@ pub fn main() {
|
|||
return;
|
||||
}
|
||||
("repair-shred-from-peer", Some(subcommand_matches)) => {
|
||||
let pubkey = value_t_or_exit!(subcommand_matches, "pubkey", Pubkey);
|
||||
let pubkey = value_t!(subcommand_matches, "pubkey", Pubkey).ok();
|
||||
let slot = value_t_or_exit!(subcommand_matches, "slot", u64);
|
||||
let shred_index = value_t_or_exit!(subcommand_matches, "shred", u64);
|
||||
let admin_client = admin_rpc_service::connect(&ledger_path);
|
||||
|
|
Loading…
Reference in New Issue