parent 586c794c8a
commit 8fe4f5d0b9

core/src/admin_rpc_post_init.rs
@@ -1,9 +1,11 @@
 use {
+    crate::repair::{outstanding_requests, serve_repair},
     solana_gossip::cluster_info::ClusterInfo,
     solana_runtime::bank_forks::BankForks,
     solana_sdk::{pubkey::Pubkey, quic::NotifyKeyUpdate},
     std::{
         collections::HashSet,
+        net::UdpSocket,
         sync::{Arc, RwLock},
     },
 };
@@ -15,4 +17,7 @@ pub struct AdminRpcRequestMetadataPostInit {
     pub vote_account: Pubkey,
     pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
     pub notifies: Vec<Arc<dyn NotifyKeyUpdate + Sync + Send>>,
+    pub repair_socket: Arc<UdpSocket>,
+    pub outstanding_repair_requests:
+        Arc<RwLock<outstanding_requests::OutstandingRequests<serve_repair::ShredRepairType>>>,
 }

core/src/repair/repair_service.rs
@@ -15,11 +15,15 @@ use {
             outstanding_requests::OutstandingRequests,
             quic_endpoint::LocalRequest,
             repair_weight::RepairWeight,
-            serve_repair::{self, ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY},
+            serve_repair::{
+                self, RepairProtocol, RepairRequestHeader, ServeRepair, ShredRepairType,
+                REPAIR_PEERS_CACHE_CAPACITY,
+            },
         },
     },
     crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender},
     lru::LruCache,
+    solana_client::connection_cache::Protocol,
     solana_gossip::cluster_info::ClusterInfo,
     solana_ledger::{
         blockstore::{Blockstore, SlotMeta},
@@ -678,6 +682,70 @@ impl RepairService {
         }
     }
 
+    pub fn request_repair_for_shred_from_peer(
+        cluster_info: Arc<ClusterInfo>,
+        pubkey: Pubkey,
+        slot: u64,
+        shred_index: u64,
+        repair_socket: &UdpSocket,
+        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
+    ) {
+        let peer_repair_addr = cluster_info
+            .lookup_contact_info(&pubkey, |node| node.serve_repair(Protocol::UDP))
+            .unwrap()
+            .unwrap();
+        Self::request_repair_for_shred_from_address(
+            cluster_info,
+            pubkey,
+            peer_repair_addr,
+            slot,
+            shred_index,
+            repair_socket,
+            outstanding_repair_requests,
+        );
+    }
+
+    fn request_repair_for_shred_from_address(
+        cluster_info: Arc<ClusterInfo>,
+        pubkey: Pubkey,
+        address: SocketAddr,
+        slot: u64,
+        shred_index: u64,
+        repair_socket: &UdpSocket,
+        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
+    ) {
+        // Setup repair request
+        let identity_keypair = cluster_info.keypair();
+        let repair_request = ShredRepairType::Shred(slot, shred_index);
+        let nonce = outstanding_repair_requests
+            .write()
+            .unwrap()
+            .add_request(repair_request, timestamp());
+
+        // Create repair request
+        let header = RepairRequestHeader::new(cluster_info.id(), pubkey, timestamp(), nonce);
+        let request_proto = RepairProtocol::WindowIndex {
+            header,
+            slot,
+            shred_index,
+        };
+        let packet_buf =
+            ServeRepair::repair_proto_to_bytes(&request_proto, &identity_keypair).unwrap();
+
+        // Prepare packet batch to send
+        let reqs = vec![(packet_buf, address)];
+
+        // Send packet batch
+        match batch_send(repair_socket, &reqs[..]) {
+            Ok(()) => {
+                trace!("successfully sent repair request!");
+            }
+            Err(SendPktsError::IoError(err, _num_failed)) => {
+                error!("batch_send failed to send packet - error = {:?}", err);
+            }
+        }
+    }
+
     /// Generate repairs for all slots `x` in the repair_range.start <= x <= repair_range.end
     #[cfg(test)]
     pub fn generate_repairs_in_range(
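Note: the nonce bookkeeping above registers the request with
outstanding_repair_requests before the packet is sent, and embeds the
returned nonce in the request so the eventual response can be matched
back to it. A minimal, std-only sketch of that idea (a hypothetical
simplification for illustration; the Nonce alias, REQUEST_TTL_MS, and
the sequential nonce counter are assumptions, not the actual
outstanding_requests::OutstandingRequests implementation):

    use std::collections::HashMap;

    type Nonce = u32;

    // Hypothetical expiry window for an outstanding request.
    const REQUEST_TTL_MS: u64 = 1_000;

    struct OutstandingRequests<T> {
        // nonce -> (request, time the request was sent, in ms)
        requests: HashMap<Nonce, (T, u64)>,
        next_nonce: Nonce,
    }

    impl<T> OutstandingRequests<T> {
        fn new() -> Self {
            Self {
                requests: HashMap::new(),
                next_nonce: 0,
            }
        }

        // Record a request and hand back the nonce to embed in the packet.
        fn add_request(&mut self, request: T, now_ms: u64) -> Nonce {
            let nonce = self.next_nonce;
            self.next_nonce = self.next_nonce.wrapping_add(1);
            self.requests.insert(nonce, (request, now_ms));
            nonce
        }

        // Accept a response only if its nonce matches an unexpired request.
        fn register_response(&mut self, nonce: Nonce, now_ms: u64) -> Option<T> {
            match self.requests.remove(&nonce) {
                Some((request, sent_at)) if now_ms.saturating_sub(sent_at) <= REQUEST_TTL_MS => {
                    Some(request)
                }
                _ => None,
            }
        }
    }

    fn main() {
        let mut outstanding = OutstandingRequests::new();
        let nonce = outstanding.add_request(("shred", 100u64, 50u64), 0);
        assert!(outstanding.register_response(nonce, 500).is_some()); // matched in time
        assert!(outstanding.register_response(nonce, 600).is_none()); // nonce already consumed
    }
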
@@ -859,6 +927,7 @@ pub(crate) fn sleep_shred_deferment_period() {
 mod test {
     use {
         super::*,
+        crate::repair::quic_endpoint::RemoteRequest,
         solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
         solana_ledger::{
             blockstore::{
@@ -883,6 +952,59 @@ mod test {
         ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified)
     }
 
+    #[test]
+    pub fn test_request_repair_for_shred_from_address() {
+        // Setup cluster and repair info
+        let cluster_info = Arc::new(new_test_cluster_info());
+        let pubkey = cluster_info.id();
+        let slot = 100;
+        let shred_index = 50;
+        let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
+        let address = reader.local_addr().unwrap();
+        let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
+        let outstanding_repair_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default()));
+
+        // Send a repair request
+        RepairService::request_repair_for_shred_from_address(
+            cluster_info.clone(),
+            pubkey,
+            address,
+            slot,
+            shred_index,
+            &sender,
+            outstanding_repair_requests,
+        );
+
+        // Receive and translate repair packet
+        let mut packets = vec![solana_sdk::packet::Packet::default(); 1];
+        let _recv_count = solana_streamer::recvmmsg::recv_mmsg(&reader, &mut packets[..]).unwrap();
+        let packet = &packets[0];
+        let Some(bytes) = packet.data(..).map(Vec::from) else {
+            panic!("packet data not found");
+        };
+        let remote_request = RemoteRequest {
+            remote_pubkey: None,
+            remote_address: packet.meta().socket_addr(),
+            bytes,
+            response_sender: None,
+        };
+
+        // Deserialize and check the request
+        let deserialized =
+            serve_repair::deserialize_request::<RepairProtocol>(&remote_request).unwrap();
+        match deserialized {
+            RepairProtocol::WindowIndex {
+                slot: deserialized_slot,
+                shred_index: deserialized_shred_index,
+                ..
+            } => {
+                assert_eq!(deserialized_slot, slot);
+                assert_eq!(deserialized_shred_index, shred_index);
+            }
+            _ => panic!("unexpected repair protocol"),
+        }
+    }
+
     #[test]
     pub fn test_repair_orphan() {
         let ledger_path = get_tmp_ledger_path_auto_delete!();

core/src/repair/serve_repair.rs
@@ -1384,7 +1384,9 @@ pub(crate) fn get_repair_protocol(_: ClusterType) -> Protocol {
     Protocol::UDP
 }
 
-fn deserialize_request<T>(request: &RemoteRequest) -> std::result::Result<T, bincode::Error>
+pub(crate) fn deserialize_request<T>(
+    request: &RemoteRequest,
+) -> std::result::Result<T, bincode::Error>
 where
     T: serde::de::DeserializeOwned,
 {

core/src/tvu.rs
@@ -14,7 +14,10 @@ use {
         consensus::{tower_storage::TowerStorage, Tower},
         cost_update_service::CostUpdateService,
         drop_bank_service::DropBankService,
-        repair::{quic_endpoint::LocalRequest, repair_service::RepairInfo},
+        repair::{
+            quic_endpoint::LocalRequest,
+            repair_service::{OutstandingShredRepairs, RepairInfo},
+        },
         replay_stage::{ReplayStage, ReplayStageConfig},
         rewards_recorder_service::RewardsRecorderSender,
         shred_fetch_stage::ShredFetchStage,
@@ -138,6 +141,7 @@ impl Tvu {
         turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
         turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
         repair_quic_endpoint_sender: AsyncSender<LocalRequest>,
+        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
     ) -> Result<Self, String> {
         let TvuSockets {
             repair: repair_socket,
@@ -228,6 +232,7 @@ impl Tvu {
                 ancestor_hashes_replay_update_receiver,
                 dumped_slots_receiver,
                 popular_pruned_forks_sender,
+                outstanding_repair_requests,
             )
         };
 
@@ -442,6 +447,7 @@ pub mod tests {
         let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
         let max_complete_rewards_slot = Arc::new(AtomicU64::default());
         let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
+        let outstanding_repair_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
         let tvu = Tvu::new(
             &vote_keypair.pubkey(),
             Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
@@ -496,6 +502,7 @@ pub mod tests {
             turbine_quic_endpoint_sender,
             turbine_quic_endpoint_receiver,
             repair_quic_endpoint_sender,
+            outstanding_repair_requests,
         )
         .expect("assume success");
         exit.store(true, Ordering::Relaxed);

core/src/validator.rs
@@ -1255,13 +1255,16 @@ impl Validator {
         };
         let last_vote = tower.last_vote();
 
+        let outstanding_repair_requests =
+            Arc::<RwLock<repair::repair_service::OutstandingShredRepairs>>::default();
+
         let tvu = Tvu::new(
             vote_account,
             authorized_voter_keypairs,
             &bank_forks,
             &cluster_info,
             TvuSockets {
-                repair: node.sockets.repair,
+                repair: node.sockets.repair.try_clone().unwrap(),
                 retransmit: node.sockets.retransmit_sockets,
                 fetch: node.sockets.tvu,
                 ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
@@ -1307,6 +1310,7 @@ impl Validator {
             turbine_quic_endpoint_sender.clone(),
             turbine_quic_endpoint_receiver,
             repair_quic_endpoint_sender,
+            outstanding_repair_requests.clone(),
         )?;
 
         if in_wen_restart {
@@ -1383,6 +1387,8 @@ impl Validator {
             vote_account: *vote_account,
             repair_whitelist: config.repair_whitelist.clone(),
             notifies: key_notifies,
+            repair_socket: Arc::new(node.sockets.repair),
+            outstanding_repair_requests,
         });
 
         Ok(Self {

core/src/window_service.rs
@@ -386,9 +386,8 @@ impl WindowService {
         ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
         dumped_slots_receiver: DumpedSlotsReceiver,
         popular_pruned_forks_sender: PopularPrunedForksSender,
+        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
     ) -> WindowService {
-        let outstanding_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
-
         let cluster_info = repair_info.cluster_info.clone();
         let bank_forks = repair_info.bank_forks.clone();
 
@@ -401,7 +400,7 @@ impl WindowService {
             repair_quic_endpoint_response_sender,
             repair_info,
             verified_vote_receiver,
-            outstanding_requests.clone(),
+            outstanding_repair_requests.clone(),
             ancestor_hashes_replay_update_receiver,
             dumped_slots_receiver,
             popular_pruned_forks_sender,
@@ -426,7 +425,7 @@ impl WindowService {
             duplicate_sender,
             completed_data_sets_sender,
             retransmit_sender,
-            outstanding_requests,
+            outstanding_repair_requests,
         );
 
         WindowService {

validator/src/admin_rpc_service.rs
@@ -13,6 +13,7 @@ use {
     solana_core::{
         admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
         consensus::{tower_storage::TowerStorage, Tower},
+        repair::repair_service,
         validator::ValidatorStartProgress,
     },
     solana_geyser_plugin_manager::GeyserPluginManagerRequest,
@@ -207,6 +208,15 @@ pub trait AdminRpc {
     #[rpc(meta, name = "contactInfo")]
     fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo>;
 
+    #[rpc(meta, name = "repairShredFromPeer")]
+    fn repair_shred_from_peer(
+        &self,
+        meta: Self::Metadata,
+        pubkey: Pubkey,
+        slot: u64,
+        shred_index: u64,
+    ) -> Result<()>;
+
     #[rpc(meta, name = "repairWhitelist")]
     fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist>;
 
@@ -487,6 +497,28 @@ impl AdminRpc for AdminRpcImpl {
         meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into()))
     }
 
+    fn repair_shred_from_peer(
+        &self,
+        meta: Self::Metadata,
+        pubkey: Pubkey,
+        slot: u64,
+        shred_index: u64,
+    ) -> Result<()> {
+        debug!("repair_shred_from_peer request received");
+
+        meta.with_post_init(|post_init| {
+            repair_service::RepairService::request_repair_for_shred_from_peer(
+                post_init.cluster_info.clone(),
+                pubkey,
+                slot,
+                shred_index,
+                &post_init.repair_socket.clone(),
+                post_init.outstanding_repair_requests.clone(),
+            );
+            Ok(())
+        })
+    }
+
     fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist> {
         debug!("repair_whitelist request received");
 
@@ -895,6 +927,10 @@ mod tests {
                 vote_account,
                 repair_whitelist,
                 notifies: Vec::new(),
+                repair_socket: Arc::new(std::net::UdpSocket::bind("0.0.0.0:0").unwrap()),
+                outstanding_repair_requests: Arc::<
+                    RwLock<repair_service::OutstandingShredRepairs>,
+                >::default(),
             }))),
             staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
             rpc_to_plugin_manager_sender: None,

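Over the admin JSON-RPC socket the new method is exposed as
"repairShredFromPeer". An illustrative request payload, assuming the
positional parameter encoding of the jsonrpc derive macros and a
base58-encoded pubkey string (the values are examples, not taken from
the change):

    {"jsonrpc":"2.0","id":1,"method":"repairShredFromPeer","params":["<identity-pubkey>",100,50]}
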
validator/src/cli.rs
@@ -1497,6 +1497,33 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
                 .help("Output display mode")
             )
         )
+        .subcommand(SubCommand::with_name("repair-shred-from-peer")
+            .about("Request a repair from the specified validator")
+            .arg(
+                Arg::with_name("pubkey")
+                    .long("pubkey")
+                    .value_name("PUBKEY")
+                    .takes_value(true)
+                    .validator(is_pubkey)
+                    .help("Identity pubkey of the validator to repair from")
+            )
+            .arg(
+                Arg::with_name("slot")
+                    .long("slot")
+                    .value_name("SLOT")
+                    .takes_value(true)
+                    .validator(is_parsable::<u64>)
+                    .help("Slot to repair")
+            )
+            .arg(
+                Arg::with_name("shred")
+                    .long("shred")
+                    .value_name("SHRED")
+                    .takes_value(true)
+                    .validator(is_parsable::<u64>)
+                    .help("Shred to repair")
+            )
+        )
         .subcommand(
             SubCommand::with_name("repair-whitelist")
                 .about("Manage the validator's repair protocol whitelist")

validator/src/main.rs
@@ -793,6 +793,24 @@ pub fn main() {
                 });
             return;
         }
+        ("repair-shred-from-peer", Some(subcommand_matches)) => {
+            let pubkey = value_t_or_exit!(subcommand_matches, "pubkey", Pubkey);
+            let slot = value_t_or_exit!(subcommand_matches, "slot", u64);
+            let shred_index = value_t_or_exit!(subcommand_matches, "shred", u64);
+            let admin_client = admin_rpc_service::connect(&ledger_path);
+            admin_rpc_service::runtime()
+                .block_on(async move {
+                    admin_client
+                        .await?
+                        .repair_shred_from_peer(pubkey, slot, shred_index)
+                        .await
+                })
+                .unwrap_or_else(|err| {
+                    println!("repair shred from peer failed: {err}");
+                    exit(1);
+                });
+            return;
+        }
         ("repair-whitelist", Some(repair_whitelist_subcommand_matches)) => {
             match repair_whitelist_subcommand_matches.subcommand() {
                 ("get", Some(subcommand_matches)) => {
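For completeness, a hypothetical invocation of the new subcommand
(the binary name and ledger path are illustrative; the flags are as
defined in cli.rs above):

    solana-validator --ledger <LEDGER_PATH> repair-shred-from-peer --pubkey <PUBKEY> --slot 100 --shred 50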