sign repair requests (#26833)

Jeff Biseda 2022-07-31 15:48:51 -07:00 committed by GitHub
parent 8db5a6a4f2
commit 857be1e237
10 changed files with 1094 additions and 200 deletions

View File

@@ -21,6 +21,7 @@ use {
solana_sdk::{
clock::{Slot, SLOT_MS},
pubkey::Pubkey,
signer::keypair::Keypair,
timing::timestamp,
},
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats},
@@ -451,7 +452,10 @@ impl AncestorHashesService {
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
retryable_slots_receiver: RetryableSlotsReceiver,
) -> JoinHandle<()> {
let serve_repair = ServeRepair::new(repair_info.cluster_info.clone());
let serve_repair = ServeRepair::new(
repair_info.cluster_info.clone(),
repair_info.bank_forks.clone(),
);
let mut repair_stats = AncestorRepairRequestsStats::default();
let mut dead_slot_pool = HashSet::new();
@@ -540,6 +544,8 @@ impl AncestorHashesService {
// Keep around the last second of requests in the throttler.
request_throttle.retain(|request_time| *request_time > (timestamp() - 1000));
let identity_keypair: &Keypair = &repair_info.cluster_info.keypair().clone();
let number_of_allowed_requests =
MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND.saturating_sub(request_throttle.len());
@@ -563,6 +569,8 @@ impl AncestorHashesService {
slot,
repair_stats,
outstanding_requests,
identity_keypair,
&root_bank,
) {
request_throttle.push(timestamp());
repairable_dead_slot_pool.take(&slot).unwrap();
@@ -627,6 +635,7 @@ impl AncestorHashesService {
/// Returns true if a request was successfully made and the status
/// added to `ancestor_hashes_request_statuses`
#[allow(clippy::too_many_arguments)]
fn initiate_ancestor_hashes_requests_for_duplicate_slot(
ancestor_hashes_request_statuses: &DashMap<Slot, DeadSlotAncestorRequestStatus>,
ancestor_hashes_request_socket: &UdpSocket,
@@ -636,6 +645,8 @@ impl AncestorHashesService {
duplicate_slot: Slot,
repair_stats: &mut AncestorRepairRequestsStats,
outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
identity_keypair: &Keypair,
root_bank: &Bank,
) -> bool {
let sampled_validators = serve_repair.repair_request_ancestor_hashes_sample_peers(
duplicate_slot,
@@ -652,8 +663,13 @@ impl AncestorHashesService {
.write()
.unwrap()
.add_request(AncestorHashesRepairType(duplicate_slot), timestamp());
let request_bytes =
serve_repair.ancestor_repair_request_bytes(duplicate_slot, nonce);
let request_bytes = serve_repair.ancestor_repair_request_bytes(
identity_keypair,
root_bank,
pubkey,
duplicate_slot,
nonce,
);
if let Ok(request_bytes) = request_bytes {
let _ = ancestor_hashes_request_socket.send_to(&request_bytes, socket_addr);
}
@@ -877,14 +893,17 @@ mod test {
fn new(slot_to_query: Slot) -> Self {
assert!(slot_to_query >= MAX_ANCESTOR_RESPONSES as Slot);
let vote_simulator = VoteSimulator::new(3);
let responder_node = Node::new_localhost();
let cluster_info = ClusterInfo::new(
responder_node.info.clone(),
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
);
let responder_serve_repair =
Arc::new(RwLock::new(ServeRepair::new(Arc::new(cluster_info))));
let responder_serve_repair = Arc::new(RwLock::new(ServeRepair::new(
Arc::new(cluster_info),
vote_simulator.bank_forks,
)));
// Set up thread to give us responses
let ledger_path = get_tmp_ledger_path!();
@@ -968,7 +987,8 @@ mod test {
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
));
let requester_serve_repair = ServeRepair::new(requester_cluster_info.clone());
let requester_serve_repair =
ServeRepair::new(requester_cluster_info.clone(), bank_forks.clone());
let (duplicate_slots_reset_sender, _duplicate_slots_reset_receiver) = unbounded();
let repair_info = RepairInfo {
bank_forks,
@@ -1074,6 +1094,7 @@ mod test {
} = ManageAncestorHashesState::new(vote_simulator.bank_forks);
let RepairInfo {
bank_forks,
cluster_info: requester_cluster_info,
cluster_slots,
repair_validators,
@@ -1089,6 +1110,8 @@ mod test {
dead_slot,
&mut repair_stats,
&outstanding_requests,
&requester_cluster_info.keypair(),
&bank_forks.read().unwrap().root_bank(),
);
assert!(ancestor_hashes_request_statuses.is_empty());
@@ -1106,6 +1129,8 @@ mod test {
dead_slot,
&mut repair_stats,
&outstanding_requests,
&requester_cluster_info.keypair(),
&bank_forks.read().unwrap().root_bank(),
);
assert_eq!(ancestor_hashes_request_statuses.len(), 1);

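The hunks above extend `ancestor_repair_request_bytes` so an ancestor-hashes request can carry a signature: the caller now supplies its identity keypair, the root bank (used to decide whether signing is required), and the target peer's pubkey. A condensed sketch of the new call site, assuming the same names are in scope as in the diff (`serve_repair`, `identity_keypair`, `root_bank`, the sampled peer's `pubkey` and `socket_addr`, `duplicate_slot`, `nonce`):

```rust
// Sketch of the signed ancestor-hashes request path (names as in the diff above).
let request_bytes = serve_repair.ancestor_repair_request_bytes(
    identity_keypair, // requester identity; used to sign when signing is active
    root_bank,        // consulted to decide whether this request must be signed
    pubkey,           // sampled peer the request is addressed to
    duplicate_slot,
    nonce,
);
if let Ok(request_bytes) = request_bytes {
    // Best-effort send; errors are ignored just like in the service loop.
    let _ = ancestor_hashes_request_socket.send_to(&request_bytes, socket_addr);
}
```
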
View File

@@ -17,7 +17,10 @@ use {
solana_ledger::blockstore::{Blockstore, SlotMeta},
solana_measure::measure::Measure,
solana_runtime::{bank_forks::BankForks, contains::Contains},
solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, hash::Hash, pubkey::Pubkey},
solana_sdk::{
clock::Slot, epoch_schedule::EpochSchedule, hash::Hash, pubkey::Pubkey,
signer::keypair::Keypair,
},
solana_streamer::sendmmsg::{batch_send, SendPktsError},
std::{
collections::{HashMap, HashSet},
@@ -246,7 +249,10 @@ impl RepairService {
outstanding_requests: &RwLock<OutstandingShredRepairs>,
) {
let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root());
let serve_repair = ServeRepair::new(repair_info.cluster_info.clone());
let serve_repair = ServeRepair::new(
repair_info.cluster_info.clone(),
repair_info.bank_forks.clone(),
);
let id = repair_info.cluster_info.id();
let mut repair_stats = RepairStats::default();
let mut repair_timing = RepairTiming::default();
@@ -265,8 +271,11 @@ impl RepairService {
let mut get_votes_elapsed;
let mut add_votes_elapsed;
let root_bank = repair_info.bank_forks.read().unwrap().root_bank();
let sign_repair_requests_feature_epoch =
ServeRepair::sign_repair_requests_activated_epoch(&root_bank);
let repairs = {
let root_bank = repair_info.bank_forks.read().unwrap().root_bank().clone();
let new_root = root_bank.slot();
// Purge outdated slots from the weighting heuristic
@@ -314,12 +323,24 @@ impl RepairService {
repairs
};
let identity_keypair: &Keypair = &repair_info.cluster_info.keypair().clone();
let mut build_repairs_batch_elapsed = Measure::start("build_repairs_batch_elapsed");
let batch: Vec<(Vec<u8>, SocketAddr)> = {
let mut outstanding_requests = outstanding_requests.write().unwrap();
repairs
.iter()
.filter_map(|repair_request| {
let sign_repair_request = ServeRepair::should_sign_repair_request(
repair_request.slot(),
&root_bank,
sign_repair_requests_feature_epoch,
);
let maybe_keypair = if sign_repair_request {
Some(identity_keypair)
} else {
None
};
let (to, req) = serve_repair
.repair_request(
&repair_info.cluster_slots,
@ -328,6 +349,7 @@ impl RepairService {
&mut repair_stats,
&repair_info.repair_validators,
&mut outstanding_requests,
maybe_keypair,
)
.ok()?;
Some((req, to))
@@ -653,8 +675,13 @@ impl RepairService {
repair_stats: &mut RepairStats,
nonce: Nonce,
) -> Result<()> {
let req =
serve_repair.map_repair_request(repair_type, repair_pubkey, repair_stats, nonce)?;
let req = serve_repair.map_repair_request(
repair_type,
repair_pubkey,
repair_stats,
nonce,
None,
)?;
repair_socket.send_to(&req, to)?;
Ok(())
}
@@ -722,9 +749,11 @@ mod test {
blockstore::{
make_chaining_slot_entries, make_many_slot_entries, make_slot_entries, Blockstore,
},
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path,
shred::max_ticks_per_n_shreds,
},
solana_runtime::bank::Bank,
solana_sdk::signature::Keypair,
solana_streamer::socket::SocketAddrSpace,
std::collections::HashSet,
@@ -1044,11 +1073,16 @@ mod test {
#[test]
pub fn test_generate_and_send_duplicate_repairs() {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let cluster_slots = ClusterSlots::default();
let serve_repair =
ServeRepair::new(Arc::new(new_test_cluster_info(Node::new_localhost().info)));
let serve_repair = ServeRepair::new(
Arc::new(new_test_cluster_info(Node::new_localhost().info)),
bank_forks,
);
let mut duplicate_slot_repair_statuses = HashMap::new();
let dead_slot = 9;
let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap();
@@ -1127,12 +1161,15 @@ mod test {
#[test]
pub fn test_update_duplicate_slot_repair_addr() {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let dummy_addr = Some((
Pubkey::default(),
UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap(),
));
let cluster_info = Arc::new(new_test_cluster_info(Node::new_localhost().info));
let serve_repair = ServeRepair::new(cluster_info.clone());
let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks);
let valid_repair_peer = Node::new_localhost().info;
// Signal that this peer has confirmed the dead slot, and is thus

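Summarizing the hunks above: the repair loop resolves the `sign_repair_requests` activation epoch once per pass from the root bank, then decides per request whether to hand the identity keypair to `repair_request`. A condensed sketch of that decision using the helpers shown in the diff (not a drop-in replacement for the loop body; `repairs` is assumed to be the weighted repair list computed earlier):

```rust
// Resolve the feature's activation epoch once per iteration of the outer loop.
let root_bank = repair_info.bank_forks.read().unwrap().root_bank();
let sign_repair_requests_feature_epoch =
    ServeRepair::sign_repair_requests_activated_epoch(&root_bank);
let identity_keypair: &Keypair = &repair_info.cluster_info.keypair().clone();

for repair_request in &repairs {
    // Sign only if the requested slot falls in an epoch where the feature is active.
    let maybe_keypair = ServeRepair::should_sign_repair_request(
        repair_request.slot(),
        &root_bank,
        sign_repair_requests_feature_epoch,
    )
    .then(|| identity_keypair);
    // `maybe_keypair` is then threaded into `serve_repair.repair_request(...)`.
}
```
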
File diff suppressed because it is too large

View File

@@ -1,9 +1,10 @@
//! The `shred_fetch_stage` pulls shreds from UDP sockets and sends them to a channel.
use {
crate::packet_hasher::PacketHasher,
crate::{packet_hasher::PacketHasher, serve_repair::ServeRepair},
crossbeam_channel::{unbounded, Sender},
lru::LruCache,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::shred::{should_discard_shred, ShredFetchStats},
solana_perf::packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags},
solana_runtime::bank_forks::BankForks,
@@ -33,10 +34,14 @@ impl ShredFetchStage {
shred_version: u16,
name: &'static str,
flags: PacketFlags,
repair_context: Option<(&UdpSocket, &ClusterInfo)>,
) {
const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1);
let mut shreds_received = LruCache::new(DEFAULT_LRU_SIZE);
let mut last_updated = Instant::now();
let mut keypair = repair_context
.as_ref()
.map(|(_, cluster_info)| cluster_info.keypair().clone());
// In the case of bank_forks=None, setup to accept any slot range
let mut last_root = 0;
@@ -59,8 +64,25 @@ impl ShredFetchStage {
let root_bank = bank_forks_r.root_bank();
slots_per_epoch = root_bank.get_slots_in_epoch(root_bank.epoch());
}
keypair = repair_context
.as_ref()
.map(|(_, cluster_info)| cluster_info.keypair().clone());
}
stats.shred_count += packet_batch.len();
if let Some((udp_socket, _)) = repair_context {
debug_assert_eq!(flags, PacketFlags::REPAIR);
debug_assert!(keypair.is_some());
if let Some(ref keypair) = keypair {
ServeRepair::handle_repair_response_pings(
udp_socket,
keypair,
&mut packet_batch,
&mut stats,
);
}
}
// Limit shreds to 2 epochs away.
let max_slot = last_slot + 2 * slots_per_epoch;
for packet in packet_batch.iter_mut() {
@@ -94,6 +116,7 @@ impl ShredFetchStage {
shred_version: u16,
name: &'static str,
flags: PacketFlags,
repair_context: Option<(Arc<UdpSocket>, Arc<ClusterInfo>)>,
) -> (Vec<JoinHandle<()>>, JoinHandle<()>) {
let (packet_sender, packet_receiver) = unbounded();
let streamers = sockets
@@ -111,10 +134,12 @@ impl ShredFetchStage {
)
})
.collect();
let modifier_hdl = Builder::new()
.name("solana-tvu-fetch-stage-packet-modifier".to_string())
.spawn(move || {
let repair_context = repair_context
.as_ref()
.map(|(socket, cluster_info)| (socket.as_ref(), cluster_info.as_ref()));
Self::modify_packets(
packet_receiver,
sender,
@@ -122,6 +147,7 @@ impl ShredFetchStage {
shred_version,
name,
flags,
repair_context,
)
})
.unwrap();
@@ -135,6 +161,7 @@ impl ShredFetchStage {
sender: Sender<PacketBatch>,
shred_version: u16,
bank_forks: Arc<RwLock<BankForks>>,
cluster_info: Arc<ClusterInfo>,
exit: &Arc<AtomicBool>,
) -> Self {
let recycler = PacketBatchRecycler::warmed(100, 1024);
@@ -148,6 +175,7 @@ impl ShredFetchStage {
shred_version,
"shred_fetch",
PacketFlags::empty(),
None, // repair_context
);
let (tvu_forwards_threads, fwd_thread_hdl) = Self::packet_modifier(
@@ -159,10 +187,11 @@ impl ShredFetchStage {
shred_version,
"shred_fetch_tvu_forwards",
PacketFlags::FORWARDED,
None, // repair_context
);
let (repair_receiver, repair_handler) = Self::packet_modifier(
vec![repair_socket],
vec![repair_socket.clone()],
exit,
sender,
recycler,
@@ -170,6 +199,7 @@ impl ShredFetchStage {
shred_version,
"shred_fetch_repair",
PacketFlags::REPAIR,
Some((repair_socket, cluster_info)),
);
tvu_threads.extend(tvu_forwards_threads.into_iter());

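In the hunks above, only the repair socket's `packet_modifier` receives a `repair_context`, so ping handling applies to repair traffic alone, and the keypair is re-read from `cluster_info` whenever the root bank is refreshed, presumably so a changed identity keypair is picked up (an inference from the code, not stated in the diff). The repair-only branch reduces to roughly:

```rust
// Sketch of the repair-only branch in modify_packets (names as in the diff).
if let Some((udp_socket, _cluster_info)) = repair_context {
    // The repair path is the only caller that passes a context, and it always
    // tags its packets with the REPAIR flag.
    debug_assert_eq!(flags, PacketFlags::REPAIR);
    if let Some(ref keypair) = keypair {
        // Answers repair-response pings with signed pongs; handled and failed
        // pings are counted in ShredFetchStats (see ping_count /
        // ping_err_verify_count in the stats diff below).
        ServeRepair::handle_repair_response_pings(
            udp_socket,
            keypair,
            &mut packet_batch,
            &mut stats,
        );
    }
}
```
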
View File

@@ -152,6 +152,7 @@ impl Tvu {
fetch_sender,
tvu_config.shred_version,
bank_forks.clone(),
cluster_info.clone(),
exit,
);

View File

@@ -883,7 +883,10 @@ impl Validator {
Some(stats_reporter_sender.clone()),
&exit,
);
let serve_repair = Arc::new(RwLock::new(ServeRepair::new(cluster_info.clone())));
let serve_repair = Arc::new(RwLock::new(ServeRepair::new(
cluster_info.clone(),
bank_forks.clone(),
)));
let serve_repair_service = ServeRepairService::new(
&serve_repair,
Some(blockstore.clone()),

View File

@@ -49,7 +49,7 @@ use {
rpc_client::RpcClient,
tpu_connection::TpuConnection,
},
solana_core::serve_repair::RepairProtocol,
solana_core::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair},
solana_dos::cli::*,
solana_gossip::{
contact_info::ContactInfo,
@ -64,6 +64,7 @@ use {
stake,
system_instruction::{self, SystemInstruction},
system_program,
timing::timestamp,
transaction::Transaction,
},
solana_streamer::socket::SocketAddrSpace,
@@ -81,13 +82,6 @@ fn compute_rate_per_second(count: usize) -> usize {
(count * 1000) / SAMPLE_PERIOD_MS
}
fn get_repair_contact(nodes: &[ContactInfo]) -> ContactInfo {
let source = thread_rng().gen_range(0, nodes.len());
let mut contact = nodes[source].clone();
contact.id = solana_sdk::pubkey::new_rand();
contact
}
/// Provide functionality to generate several types of transactions:
///
/// 1. Without blockhash
@ -241,11 +235,11 @@ fn get_target(
nodes: &[ContactInfo],
mode: Mode,
entrypoint_addr: SocketAddr,
) -> Option<SocketAddr> {
) -> Option<(Pubkey, SocketAddr)> {
let mut target = None;
if nodes.is_empty() {
// skip-gossip case
target = Some(entrypoint_addr);
target = Some((solana_sdk::pubkey::new_rand(), entrypoint_addr));
} else {
info!("************ NODE ***********");
for node in nodes {
@ -257,13 +251,13 @@ fn get_target(
if node.gossip == entrypoint_addr {
info!("{}", node.gossip);
target = match mode {
Mode::Gossip => Some(node.gossip),
Mode::Tvu => Some(node.tvu),
Mode::TvuForwards => Some(node.tvu_forwards),
Mode::Tpu => Some(node.tpu),
Mode::TpuForwards => Some(node.tpu_forwards),
Mode::Repair => Some(node.repair),
Mode::ServeRepair => Some(node.serve_repair),
Mode::Gossip => Some((node.id, node.gossip)),
Mode::Tvu => Some((node.id, node.tvu)),
Mode::TvuForwards => Some((node.id, node.tvu_forwards)),
Mode::Tpu => Some((node.id, node.tpu)),
Mode::TpuForwards => Some((node.id, node.tpu_forwards)),
Mode::Repair => Some((node.id, node.repair)),
Mode::ServeRepair => Some((node.id, node.serve_repair)),
Mode::Rpc => None,
};
break;
@@ -500,39 +494,47 @@ fn run_dos<T: 'static + BenchTpsClient + Send + Sync>(
} else if params.data_type == DataType::Transaction
&& params.transaction_params.unique_transactions
{
let target = target.expect("should have target");
info!("Targeting {}", target);
let (_, target_addr) = target.expect("should have target");
info!("Targeting {}", target_addr);
run_dos_transactions(
target,
target_addr,
iterations,
client,
params.transaction_params,
params.tpu_use_quic,
);
} else {
let target = target.expect("should have target");
info!("Targeting {}", target);
let (target_id, target_addr) = target.expect("should have target");
info!("Targeting {}", target_addr);
let mut data = match params.data_type {
DataType::RepairHighest => {
let slot = 100;
let req =
RepairProtocol::WindowIndexWithNonce(get_repair_contact(nodes), slot, 0, 0);
bincode::serialize(&req).unwrap()
let keypair = Keypair::new();
let header = RepairRequestHeader::new(keypair.pubkey(), target_id, timestamp(), 0);
let req = RepairProtocol::WindowIndex {
header,
slot,
shred_index: 0,
};
ServeRepair::repair_proto_to_bytes(&req, Some(&keypair)).unwrap()
}
DataType::RepairShred => {
let slot = 100;
let req = RepairProtocol::HighestWindowIndexWithNonce(
get_repair_contact(nodes),
let keypair = Keypair::new();
let header = RepairRequestHeader::new(keypair.pubkey(), target_id, timestamp(), 0);
let req = RepairProtocol::HighestWindowIndex {
header,
slot,
0,
0,
);
bincode::serialize(&req).unwrap()
shred_index: 0,
};
ServeRepair::repair_proto_to_bytes(&req, Some(&keypair)).unwrap()
}
DataType::RepairOrphan => {
let slot = 100;
let req = RepairProtocol::OrphanWithNonce(get_repair_contact(nodes), slot, 0);
bincode::serialize(&req).unwrap()
let keypair = Keypair::new();
let header = RepairRequestHeader::new(keypair.pubkey(), target_id, timestamp(), 0);
let req = RepairProtocol::Orphan { header, slot };
ServeRepair::repair_proto_to_bytes(&req, Some(&keypair)).unwrap()
}
DataType::Random => {
vec![0; params.data_size]
@@ -574,7 +576,7 @@ fn run_dos<T: 'static + BenchTpsClient + Send + Sync>(
if params.data_type == DataType::Random {
thread_rng().fill(&mut data[..]);
}
let res = socket.send_to(&data, target);
let res = socket.send_to(&data, target_addr);
if res.is_err() {
error_count += 1;
}

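With the old `*WithNonce` variants and `get_repair_contact` removed, solana-dos now builds repair probes as header-carrying requests and serializes (and signs) them through `ServeRepair::repair_proto_to_bytes`. A self-contained sketch of one `WindowIndex` probe following the diff above; `target_id` and `target_addr` stand in for whatever `get_target` returned, and the helper name is illustrative:

```rust
use {
    solana_core::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair},
    solana_sdk::{
        pubkey::Pubkey,
        signature::{Keypair, Signer},
        timing::timestamp,
    },
    std::net::{SocketAddr, UdpSocket},
};

/// Builds one signed WindowIndex repair request and fires it at the target.
/// Sketch only: error handling is reduced to unwrap for brevity.
fn send_window_index_probe(target_id: Pubkey, target_addr: SocketAddr, slot: u64) {
    let keypair = Keypair::new();
    // Header carries the sender, the intended recipient, a wallclock timestamp,
    // and a nonce (0 here, as in the dos tool).
    let header = RepairRequestHeader::new(keypair.pubkey(), target_id, timestamp(), 0);
    let req = RepairProtocol::WindowIndex {
        header,
        slot,
        shred_index: 0,
    };
    // Serializes the request; passing Some(&keypair) makes ServeRepair sign it.
    let data = ServeRepair::repair_proto_to_bytes(&req, Some(&keypair)).unwrap();
    let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
    socket.send_to(&data, target_addr).unwrap();
}
```
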
View File

@@ -109,6 +109,10 @@ impl Pong {
};
Ok(pong)
}
pub fn from(&self) -> &Pubkey {
&self.from
}
}
impl Sanitize for Pong {

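The new accessor lets callers read a pong's sender without destructuring the struct. A hypothetical usage (the helper below is illustrative, not part of the commit):

```rust
use {solana_gossip::ping_pong::Pong, solana_sdk::pubkey::Pubkey};

// Hypothetical helper: confirm a received pong was produced by the peer we pinged.
fn pong_is_from(pong: &Pong, expected_peer: &Pubkey) -> bool {
    pong.from() == expected_peer
}
```
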
View File

@@ -30,6 +30,8 @@ pub struct ProcessShredsStats {
pub struct ShredFetchStats {
pub index_overrun: usize,
pub shred_count: usize,
pub ping_count: usize,
pub ping_err_verify_count: usize,
pub(crate) index_bad_deserialize: usize,
pub(crate) index_out_of_bounds: usize,
pub(crate) slot_bad_deserialize: usize,
@@ -115,6 +117,8 @@ impl ShredFetchStats {
name,
("index_overrun", self.index_overrun, i64),
("shred_count", self.shred_count, i64),
("ping_count", self.ping_count, i64),
("ping_err_verify_count", self.ping_err_verify_count, i64),
("slot_bad_deserialize", self.slot_bad_deserialize, i64),
("index_bad_deserialize", self.index_bad_deserialize, i64),
("index_out_of_bounds", self.index_out_of_bounds, i64),

View File

@@ -484,6 +484,10 @@ pub mod compact_vote_state_updates {
solana_sdk::declare_id!("86HpNqzutEZwLcPxS6EHDcMNYWk6ikhteg9un7Y2PBKE");
}
pub mod sign_repair_requests {
solana_sdk::declare_id!("sigrs6u1EWeHuoKFkY8RR7qcSsPmrAeBBPESyf5pnYe");
}
pub mod concurrent_replay_of_forks {
solana_sdk::declare_id!("9F2Dcu8xkBPKxiiy65XKPZYdCG3VZDpjDTuSmeYLozJe");
}
@@ -607,6 +611,7 @@ lazy_static! {
(loosen_cpi_size_restriction::id(), "loosen cpi size restrictions #26641"),
(use_default_units_in_fee_calculation::id(), "use default units per instruction in fee calculation #26785"),
(compact_vote_state_updates::id(), "Compact vote state updates to lower block size"),
(sign_repair_requests::id(), "sign repair requests #26834"),
(concurrent_replay_of_forks::id(), "Allow slots from different forks to be replayed concurrently #26465"),
(incremental_snapshot_only_incremental_hash_calculation::id(), "only hash accounts in incremental snapshot during incremental snapshot creation #26799"),
/*************** ADD NEW FEATURES HERE ***************/
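
The `sign_repair_requests` gate added here is consumed by `RepairService`/`ServeRepair` through the root bank's feature set, by mapping the feature's activation slot to an epoch. The following sketch illustrates that pattern; it is an assumption about the shape of `ServeRepair::sign_repair_requests_activated_epoch`, not the literal implementation:

```rust
use {
    solana_runtime::bank::Bank,
    solana_sdk::{clock::Epoch, feature_set},
};

// Illustrative only: resolve the epoch in which sign_repair_requests became
// active on this root bank, if it is active at all.
fn sign_repair_requests_activated_epoch(root_bank: &Bank) -> Option<Epoch> {
    root_bank
        .feature_set
        .activated_slot(&feature_set::sign_repair_requests::id())
        .map(|activation_slot| root_bank.epoch_schedule().get_epoch(activation_slot))
}
```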