use signed repair request variants (#28283)

This commit is contained in:
Jeff Biseda 2022-10-10 14:09:45 -07:00 committed by GitHub
parent 2929c8f7a2
commit 15050b14b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 73 additions and 245 deletions

View File

@ -643,7 +643,6 @@ impl AncestorHashesService {
repair_stats,
outstanding_requests,
identity_keypair,
&root_bank,
) {
request_throttle.push(timestamp());
repairable_dead_slot_pool.take(&slot).unwrap();
@ -719,7 +718,6 @@ impl AncestorHashesService {
repair_stats: &mut AncestorRepairRequestsStats,
outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
identity_keypair: &Keypair,
root_bank: &Bank,
) -> bool {
let sampled_validators = serve_repair.repair_request_ancestor_hashes_sample_peers(
duplicate_slot,
@ -738,7 +736,6 @@ impl AncestorHashesService {
.add_request(AncestorHashesRepairType(duplicate_slot), timestamp());
let request_bytes = serve_repair.ancestor_repair_request_bytes(
identity_keypair,
root_bank,
pubkey,
duplicate_slot,
nonce,
@ -1164,7 +1161,6 @@ mod test {
} = ManageAncestorHashesState::new(vote_simulator.bank_forks);
let RepairInfo {
bank_forks,
cluster_info: requester_cluster_info,
cluster_slots,
repair_validators,
@ -1181,7 +1177,6 @@ mod test {
&mut repair_stats,
&outstanding_requests,
&requester_cluster_info.keypair(),
&bank_forks.read().unwrap().root_bank(),
);
assert!(ancestor_hashes_request_statuses.is_empty());
@ -1200,7 +1195,6 @@ mod test {
&mut repair_stats,
&outstanding_requests,
&requester_cluster_info.keypair(),
&bank_forks.read().unwrap().root_bank(),
);
assert_eq!(ancestor_hashes_request_statuses.len(), 1);

View File

@ -272,9 +272,6 @@ impl RepairService {
let mut add_votes_elapsed;
let root_bank = repair_info.bank_forks.read().unwrap().root_bank();
let sign_repair_requests_feature_epoch =
ServeRepair::sign_repair_requests_activated_epoch(&root_bank);
let repairs = {
let new_root = root_bank.slot();
@ -331,16 +328,6 @@ impl RepairService {
repairs
.iter()
.filter_map(|repair_request| {
let sign_repair_request = ServeRepair::should_sign_repair_request(
repair_request.slot(),
&root_bank,
sign_repair_requests_feature_epoch,
);
let maybe_keypair = if sign_repair_request {
Some(identity_keypair)
} else {
None
};
let (to, req) = serve_repair
.repair_request(
&repair_info.cluster_slots,
@ -349,7 +336,7 @@ impl RepairService {
&mut repair_stats,
&repair_info.repair_validators,
&mut outstanding_requests,
maybe_keypair,
identity_keypair,
)
.ok()?;
Some((req, to))
@ -617,6 +604,7 @@ impl RepairService {
repair_socket: &UdpSocket,
repair_validators: &Option<HashSet<Pubkey>>,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
identity_keypair: &Keypair,
) {
duplicate_slot_repair_statuses.retain(|slot, status| {
Self::update_duplicate_slot_repair_addr(
@ -641,6 +629,7 @@ impl RepairService {
serve_repair,
repair_stats,
nonce,
identity_keypair,
) {
info!(
"repair req send_to {} ({}) error {:?}",
@ -667,13 +656,14 @@ impl RepairService {
serve_repair: &ServeRepair,
repair_stats: &mut RepairStats,
nonce: Nonce,
identity_keypair: &Keypair,
) -> Result<()> {
let req = serve_repair.map_repair_request(
repair_type,
repair_pubkey,
repair_stats,
nonce,
None,
identity_keypair,
)?;
repair_socket.send_to(&req, to)?;
Ok(())
@ -1091,10 +1081,9 @@ mod test {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let cluster_slots = ClusterSlots::default();
let serve_repair = ServeRepair::new(
Arc::new(new_test_cluster_info(Node::new_localhost().info)),
bank_forks,
);
let cluster_info = Arc::new(new_test_cluster_info(Node::new_localhost().info));
let identity_keypair = cluster_info.keypair().clone();
let serve_repair = ServeRepair::new(cluster_info, bank_forks);
let mut duplicate_slot_repair_statuses = HashMap::new();
let dead_slot = 9;
let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap();
@ -1129,6 +1118,7 @@ mod test {
&UdpSocket::bind("0.0.0.0:0").unwrap(),
&None,
&RwLock::new(OutstandingRequests::default()),
&identity_keypair,
);
assert!(duplicate_slot_repair_statuses
.get(&dead_slot)
@ -1154,6 +1144,7 @@ mod test {
&UdpSocket::bind("0.0.0.0:0").unwrap(),
&None,
&RwLock::new(OutstandingRequests::default()),
&identity_keypair,
);
assert_eq!(duplicate_slot_repair_statuses.len(), 1);
assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());
@ -1172,6 +1163,7 @@ mod test {
&UdpSocket::bind("0.0.0.0:0").unwrap(),
&None,
&RwLock::new(OutstandingRequests::default()),
&identity_keypair,
);
assert!(duplicate_slot_repair_statuses.is_empty());
}

View File

@ -29,16 +29,14 @@ use {
data_budget::DataBudget,
packet::{Packet, PacketBatch, PacketBatchRecycler},
},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_runtime::bank_forks::BankForks,
solana_sdk::{
clock::Slot,
feature_set::sign_repair_requests,
hash::{Hash, HASH_BYTES},
packet::PACKET_DATA_SIZE,
pubkey::{Pubkey, PUBKEY_BYTES},
signature::{Signable, Signature, Signer, SIGNATURE_BYTES},
signer::keypair::Keypair,
stake_history::Epoch,
timing::{duration_as_ms, timestamp},
},
solana_streamer::{
@ -317,10 +315,6 @@ impl ServeRepair {
}
}
fn my_info(&self) -> ContactInfo {
self.cluster_info.my_contact_info()
}
pub(crate) fn my_id(&self) -> Pubkey {
self.cluster_info.id()
}
@ -429,24 +423,6 @@ impl ServeRepair {
}
}
pub(crate) fn sign_repair_requests_activated_epoch(root_bank: &Bank) -> Option<Epoch> {
root_bank
.feature_set
.activated_slot(&sign_repair_requests::id())
.map(|slot| root_bank.epoch_schedule().get_epoch(slot))
}
pub(crate) fn should_sign_repair_request(
slot: Slot,
root_bank: &Bank,
sign_repairs_epoch: Option<Epoch>,
) -> bool {
match sign_repairs_epoch {
None => false,
Some(feature_epoch) => feature_epoch < root_bank.epoch_schedule().get_epoch(slot),
}
}
/// Process messages from the network
fn run_listen(
&self,
@ -755,38 +731,22 @@ impl ServeRepair {
pub fn ancestor_repair_request_bytes(
&self,
keypair: &Keypair,
root_bank: &Bank,
repair_peer_id: &Pubkey,
request_slot: Slot,
nonce: Nonce,
) -> Result<Vec<u8>> {
let sign_repairs_epoch = Self::sign_repair_requests_activated_epoch(root_bank);
let require_sig =
Self::should_sign_repair_request(request_slot, root_bank, sign_repairs_epoch);
let (request_proto, maybe_keypair) = if require_sig {
let header = RepairRequestHeader {
signature: Signature::default(),
sender: self.my_id(),
recipient: *repair_peer_id,
timestamp: timestamp(),
nonce,
};
(
RepairProtocol::AncestorHashes {
header,
slot: request_slot,
},
Some(keypair),
)
} else {
(
RepairProtocol::LegacyAncestorHashes(self.my_info(), request_slot, nonce),
None,
)
let header = RepairRequestHeader {
signature: Signature::default(),
sender: self.my_id(),
recipient: *repair_peer_id,
timestamp: timestamp(),
nonce,
};
Self::repair_proto_to_bytes(&request_proto, maybe_keypair)
let request = RepairProtocol::AncestorHashes {
header,
slot: request_slot,
};
Self::repair_proto_to_bytes(&request, keypair)
}
pub(crate) fn repair_request(
@ -797,7 +757,7 @@ impl ServeRepair {
repair_stats: &mut RepairStats,
repair_validators: &Option<HashSet<Pubkey>>,
outstanding_requests: &mut OutstandingShredRepairs,
identity_keypair: Option<&Keypair>,
identity_keypair: &Keypair,
) -> Result<(SocketAddr, Vec<u8>)> {
// find a peer that appears to be accepting replication and has the desired slot, as indicated
// by a valid tvu port location
@ -873,67 +833,41 @@ impl ServeRepair {
repair_peer_id: &Pubkey,
repair_stats: &mut RepairStats,
nonce: Nonce,
identity_keypair: Option<&Keypair>,
identity_keypair: &Keypair,
) -> Result<Vec<u8>> {
let header = if identity_keypair.is_some() {
Some(RepairRequestHeader {
signature: Signature::default(),
sender: self.my_id(),
recipient: *repair_peer_id,
timestamp: timestamp(),
nonce,
})
} else {
None
let header = RepairRequestHeader {
signature: Signature::default(),
sender: self.my_id(),
recipient: *repair_peer_id,
timestamp: timestamp(),
nonce,
};
let request_proto = match repair_request {
ShredRepairType::Shred(slot, shred_index) => {
repair_stats
.shred
.update(repair_peer_id, *slot, *shred_index);
if let Some(header) = header {
RepairProtocol::WindowIndex {
header,
slot: *slot,
shred_index: *shred_index,
}
} else {
RepairProtocol::LegacyWindowIndexWithNonce(
self.my_info(),
*slot,
*shred_index,
nonce,
)
RepairProtocol::WindowIndex {
header,
slot: *slot,
shred_index: *shred_index,
}
}
ShredRepairType::HighestShred(slot, shred_index) => {
repair_stats
.highest_shred
.update(repair_peer_id, *slot, *shred_index);
if let Some(header) = header {
RepairProtocol::HighestWindowIndex {
header,
slot: *slot,
shred_index: *shred_index,
}
} else {
RepairProtocol::LegacyHighestWindowIndexWithNonce(
self.my_info(),
*slot,
*shred_index,
nonce,
)
RepairProtocol::HighestWindowIndex {
header,
slot: *slot,
shred_index: *shred_index,
}
}
ShredRepairType::Orphan(slot) => {
repair_stats.orphan.update(repair_peer_id, *slot, 0);
if let Some(header) = header {
RepairProtocol::Orphan {
header,
slot: *slot,
}
} else {
RepairProtocol::LegacyOrphanWithNonce(self.my_info(), *slot, nonce)
RepairProtocol::Orphan {
header,
slot: *slot,
}
}
};
@ -994,17 +928,12 @@ impl ServeRepair {
}
}
pub fn repair_proto_to_bytes(
request: &RepairProtocol,
keypair: Option<&Keypair>,
) -> Result<Vec<u8>> {
pub fn repair_proto_to_bytes(request: &RepairProtocol, keypair: &Keypair) -> Result<Vec<u8>> {
debug_assert!(request.supports_signature());
let mut payload = serialize(&request)?;
if let Some(keypair) = keypair {
debug_assert!(request.supports_signature());
let signable_data = [&payload[..4], &payload[4 + SIGNATURE_BYTES..]].concat();
let signature = keypair.sign_message(&signable_data[..]);
payload[4..4 + SIGNATURE_BYTES].copy_from_slice(signature.as_ref());
}
let signable_data = [&payload[..4], &payload[4 + SIGNATURE_BYTES..]].concat();
let signature = keypair.sign_message(&signable_data[..]);
payload[4..4 + SIGNATURE_BYTES].copy_from_slice(signature.as_ref());
Ok(payload)
}
@ -1233,7 +1162,7 @@ mod tests {
&repair_peer_id,
&mut RepairStats::default(),
456,
Some(&keypair),
&keypair,
)
.unwrap();
@ -1257,7 +1186,7 @@ mod tests {
#[test]
fn test_serialize_deserialize_ancestor_hashes_request() {
let slot = 50;
let slot: Slot = 50;
let nonce = 70;
let me = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
let cluster_info = Arc::new(new_test_cluster_info(me));
@ -1268,11 +1197,10 @@ mod tests {
let mut bank = Bank::new_for_tests(&genesis_config);
bank.feature_set = Arc::new(FeatureSet::all_enabled());
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks.clone());
let serve_repair = ServeRepair::new(cluster_info, bank_forks);
let root_bank = bank_forks.read().unwrap().root_bank();
let request_bytes = serve_repair
.ancestor_repair_request_bytes(&keypair, &root_bank, &repair_peer_id, slot, nonce)
.ancestor_repair_request_bytes(&keypair, &repair_peer_id, slot, nonce)
.unwrap();
let mut cursor = Cursor::new(&request_bytes[..]);
let deserialized_request: RepairProtocol =
@ -1294,35 +1222,10 @@ mod tests {
} else {
panic!("unexpected request type {:?}", &deserialized_request);
}
let mut bank = Bank::new_for_tests(&genesis_config);
let mut feature_set = FeatureSet::all_enabled();
feature_set.deactivate(&sign_repair_requests::id());
bank.feature_set = Arc::new(feature_set);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let serve_repair = ServeRepair::new(cluster_info, bank_forks.clone());
let root_bank = bank_forks.read().unwrap().root_bank();
let request_bytes = serve_repair
.ancestor_repair_request_bytes(&keypair, &root_bank, &repair_peer_id, slot, nonce)
.unwrap();
let mut cursor = Cursor::new(&request_bytes[..]);
let deserialized_request: RepairProtocol =
deserialize_from_with_limit(&mut cursor).unwrap();
assert_eq!(cursor.position(), request_bytes.len() as u64);
if let RepairProtocol::LegacyAncestorHashes(ci, deserialized_slot, deserialized_nonce) =
deserialized_request
{
assert_eq!(slot, deserialized_slot);
assert_eq!(nonce, deserialized_nonce);
assert_eq!(&serve_repair.my_id(), &ci.id);
} else {
panic!("unexpected request type {:?}", &deserialized_request);
}
}
#[test]
fn test_map_requests_signed_unsigned() {
fn test_map_requests_signed() {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
@ -1337,20 +1240,20 @@ mod tests {
let nonce = 70;
let request = ShredRepairType::Shred(slot, shred_index);
let rsp = serve_repair
let request_bytes = serve_repair
.map_repair_request(
&request,
&repair_peer_id,
&mut RepairStats::default(),
nonce,
Some(&keypair),
&keypair,
)
.unwrap();
let mut cursor = Cursor::new(&rsp[..]);
let mut cursor = Cursor::new(&request_bytes[..]);
let deserialized_request: RepairProtocol =
deserialize_from_with_limit(&mut cursor).unwrap();
assert_eq!(cursor.position(), rsp.len() as u64);
assert_eq!(cursor.position(), request_bytes.len() as u64);
if let RepairProtocol::WindowIndex {
header,
slot: deserialized_slot,
@ -1362,7 +1265,7 @@ mod tests {
assert_eq!(header.nonce, nonce);
assert_eq!(&header.sender, &serve_repair.my_id());
assert_eq!(&header.recipient, &repair_peer_id);
let signed_data = [&rsp[..4], &rsp[4 + SIGNATURE_BYTES..]].concat();
let signed_data = [&request_bytes[..4], &request_bytes[4 + SIGNATURE_BYTES..]].concat();
assert!(header
.signature
.verify(keypair.pubkey().as_ref(), &signed_data));
@ -1370,50 +1273,21 @@ mod tests {
panic!("unexpected request type {:?}", &deserialized_request);
}
let rsp = serve_repair
.map_repair_request(
&request,
&repair_peer_id,
&mut RepairStats::default(),
nonce,
None,
)
.unwrap();
let mut cursor = Cursor::new(&rsp[..]);
let deserialized_request: RepairProtocol =
deserialize_from_with_limit(&mut cursor).unwrap();
assert_eq!(cursor.position(), rsp.len() as u64);
if let RepairProtocol::LegacyWindowIndexWithNonce(
ci,
deserialized_slot,
deserialized_shred_index,
deserialized_nonce,
) = deserialized_request
{
assert_eq!(slot, deserialized_slot);
assert_eq!(shred_index, deserialized_shred_index);
assert_eq!(nonce, deserialized_nonce);
assert_eq!(&serve_repair.my_id(), &ci.id);
} else {
panic!("unexpected request type {:?}", &deserialized_request);
}
let request = ShredRepairType::HighestShred(slot, shred_index);
let rsp = serve_repair
let request_bytes = serve_repair
.map_repair_request(
&request,
&repair_peer_id,
&mut RepairStats::default(),
nonce,
Some(&keypair),
&keypair,
)
.unwrap();
let mut cursor = Cursor::new(&rsp[..]);
let mut cursor = Cursor::new(&request_bytes[..]);
let deserialized_request: RepairProtocol =
deserialize_from_with_limit(&mut cursor).unwrap();
assert_eq!(cursor.position(), rsp.len() as u64);
assert_eq!(cursor.position(), request_bytes.len() as u64);
if let RepairProtocol::HighestWindowIndex {
header,
slot: deserialized_slot,
@ -1425,42 +1299,13 @@ mod tests {
assert_eq!(header.nonce, nonce);
assert_eq!(&header.sender, &serve_repair.my_id());
assert_eq!(&header.recipient, &repair_peer_id);
let signed_data = [&rsp[..4], &rsp[4 + SIGNATURE_BYTES..]].concat();
let signed_data = [&request_bytes[..4], &request_bytes[4 + SIGNATURE_BYTES..]].concat();
assert!(header
.signature
.verify(keypair.pubkey().as_ref(), &signed_data));
} else {
panic!("unexpected request type {:?}", &deserialized_request);
}
let rsp = serve_repair
.map_repair_request(
&request,
&repair_peer_id,
&mut RepairStats::default(),
nonce,
None,
)
.unwrap();
let mut cursor = Cursor::new(&rsp[..]);
let deserialized_request: RepairProtocol =
deserialize_from_with_limit(&mut cursor).unwrap();
assert_eq!(cursor.position(), rsp.len() as u64);
if let RepairProtocol::LegacyHighestWindowIndexWithNonce(
ci,
deserialized_slot,
deserialized_shred_index,
deserialized_nonce,
) = deserialized_request
{
assert_eq!(slot, deserialized_slot);
assert_eq!(shred_index, deserialized_shred_index);
assert_eq!(nonce, deserialized_nonce);
assert_eq!(&serve_repair.my_id(), &ci.id);
} else {
panic!("unexpected request type {:?}", &deserialized_request);
}
}
#[test]
@ -1687,6 +1532,7 @@ mod tests {
let me = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
let cluster_info = Arc::new(new_test_cluster_info(me));
let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks);
let identity_keypair = cluster_info.keypair().clone();
let mut outstanding_requests = OutstandingShredRepairs::default();
let rv = serve_repair.repair_request(
&cluster_slots,
@ -1695,7 +1541,7 @@ mod tests {
&mut RepairStats::default(),
&None,
&mut outstanding_requests,
None,
&identity_keypair,
);
assert_matches!(rv, Err(Error::ClusterInfo(ClusterInfoError::NoPeers)));
@ -1724,7 +1570,7 @@ mod tests {
&mut RepairStats::default(),
&None,
&mut outstanding_requests,
None,
&identity_keypair,
)
.unwrap();
assert_eq!(nxt.serve_repair, serve_repair_addr);
@ -1759,7 +1605,7 @@ mod tests {
&mut RepairStats::default(),
&None,
&mut outstanding_requests,
None,
&identity_keypair,
)
.unwrap();
if rv.0 == serve_repair_addr {
@ -2015,6 +1861,7 @@ mod tests {
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
cluster_info.insert_info(contact_info2.clone());
cluster_info.insert_info(contact_info3.clone());
let identity_keypair = cluster_info.keypair().clone();
let serve_repair = ServeRepair::new(cluster_info, bank_forks);
// If:
@ -2032,7 +1879,7 @@ mod tests {
&mut RepairStats::default(),
&known_validators,
&mut OutstandingShredRepairs::default(),
None,
&identity_keypair,
)
.is_err());
}
@ -2050,7 +1897,7 @@ mod tests {
&mut RepairStats::default(),
&known_validators,
&mut OutstandingShredRepairs::default(),
None,
&identity_keypair,
)
.is_ok());
@ -2072,7 +1919,7 @@ mod tests {
&mut RepairStats::default(),
&None,
&mut OutstandingShredRepairs::default(),
None,
&identity_keypair,
)
.is_ok());
}

View File

@ -646,7 +646,7 @@ fn run_dos<T: 'static + BenchTpsClient + Send + Sync>(
slot,
shred_index: 0,
};
ServeRepair::repair_proto_to_bytes(&req, Some(&keypair)).unwrap()
ServeRepair::repair_proto_to_bytes(&req, &keypair).unwrap()
}
DataType::RepairShred => {
let slot = 100;
@ -657,14 +657,14 @@ fn run_dos<T: 'static + BenchTpsClient + Send + Sync>(
slot,
shred_index: 0,
};
ServeRepair::repair_proto_to_bytes(&req, Some(&keypair)).unwrap()
ServeRepair::repair_proto_to_bytes(&req, &keypair).unwrap()
}
DataType::RepairOrphan => {
let slot = 100;
let keypair = Keypair::new();
let header = RepairRequestHeader::new(keypair.pubkey(), target_id, timestamp(), 0);
let req = RepairProtocol::Orphan { header, slot };
ServeRepair::repair_proto_to_bytes(&req, Some(&keypair)).unwrap()
ServeRepair::repair_proto_to_bytes(&req, &keypair).unwrap()
}
DataType::Random => {
vec![0; params.data_size]

View File

@ -482,10 +482,6 @@ pub mod compact_vote_state_updates {
solana_sdk::declare_id!("86HpNqzutEZwLcPxS6EHDcMNYWk6ikhteg9un7Y2PBKE");
}
pub mod sign_repair_requests {
solana_sdk::declare_id!("sigrs6u1EWeHuoKFkY8RR7qcSsPmrAeBBPESyf5pnYe");
}
pub mod incremental_snapshot_only_incremental_hash_calculation {
solana_sdk::declare_id!("25vqsfjk7Nv1prsQJmA4Xu1bN61s8LXCBGUPp8Rfy1UF");
}
@ -641,7 +637,6 @@ lazy_static! {
(loosen_cpi_size_restriction::id(), "loosen cpi size restrictions #26641"),
(use_default_units_in_fee_calculation::id(), "use default units per instruction in fee calculation #26785"),
(compact_vote_state_updates::id(), "Compact vote state updates to lower block size"),
(sign_repair_requests::id(), "sign repair requests #26834"),
(incremental_snapshot_only_incremental_hash_calculation::id(), "only hash accounts in incremental snapshot during incremental snapshot creation #26799"),
(disable_cpi_setting_executable_and_rent_epoch::id(), "disable setting is_executable and_rent_epoch in CPI #26987"),
(relax_authority_signer_check_for_lookup_table_creation::id(), "relax authority signer check for lookup table creation #27205"),