Add AncestorHashesRepair type (#18681)

This commit is contained in:
carllin 2021-07-15 19:29:53 -07:00 committed by GitHub
parent 37ee0b5599
commit 8a846b048e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 300 additions and 101 deletions

1
Cargo.lock generated
View File

@ -4626,6 +4626,7 @@ dependencies = [
"solana-transaction-status",
"solana-version",
"solana-vote-program",
"static_assertions",
"symlink",
"systemstat",
"tempfile",

View File

@ -82,6 +82,7 @@ serde_json = "1.0.56"
serial_test = "0.5.1"
solana-stake-program = { path = "../programs/stake", version = "=1.8.0" }
solana-version = { path = "../version", version = "=1.8.0" }
static_assertions = "1.1.0"
symlink = "0.1.0"
systemstat = "0.1.8"
tokio_02 = { version = "0.2", package = "tokio", features = ["full"] }

View File

@ -1,4 +1,4 @@
use crate::serve_repair::RepairType;
use crate::serve_repair::ShredRepairType;
use itertools::Itertools;
use solana_gossip::{
cluster_info::ClusterInfo, contact_info::ContactInfo, crds::Cursor, epoch_slots::EpochSlots,
@ -186,14 +186,14 @@ impl ClusterSlots {
&self,
self_id: &Pubkey,
root: Slot,
) -> Vec<RepairType> {
) -> Vec<ShredRepairType> {
let my_slots = self.collect(self_id);
self.cluster_slots
.read()
.unwrap()
.keys()
.filter(|x| **x > root && !my_slots.contains(*x))
.map(|x| RepairType::HighestShred(*x, 0))
.map(|x| ShredRepairType::HighestShred(*x, 0))
.collect()
}
}
@ -390,7 +390,7 @@ mod tests {
let self_id = solana_sdk::pubkey::new_rand();
assert_eq!(
cs.generate_repairs_for_missing_slots(&self_id, 0),
vec![RepairType::HighestShred(1, 0)]
vec![ShredRepairType::HighestShred(1, 0)]
)
}

View File

@ -73,13 +73,13 @@ pub struct RequestStatus<T> {
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::serve_repair::RepairType;
use crate::serve_repair::ShredRepairType;
use solana_ledger::shred::Shred;
use solana_sdk::timing::timestamp;
#[test]
fn test_add_request() {
let repair_type = RepairType::Orphan(9);
let repair_type = ShredRepairType::Orphan(9);
let mut outstanding_requests = OutstandingRequests::default();
let nonce = outstanding_requests.add_request(repair_type, timestamp());
let request_status = outstanding_requests.requests.get(&nonce).unwrap();
@ -92,7 +92,7 @@ pub(crate) mod tests {
#[test]
fn test_timeout_expired_remove() {
let repair_type = RepairType::Orphan(9);
let repair_type = ShredRepairType::Orphan(9);
let mut outstanding_requests = OutstandingRequests::default();
let nonce = outstanding_requests.add_request(repair_type, timestamp());
let shred = Shred::new_empty_data_shred();
@ -109,7 +109,7 @@ pub(crate) mod tests {
#[test]
fn test_register_response() {
let repair_type = RepairType::Orphan(9);
let repair_type = ShredRepairType::Orphan(9);
let mut outstanding_requests = OutstandingRequests::default();
let nonce = outstanding_requests.add_request(repair_type, timestamp());

View File

@ -17,23 +17,23 @@ pub fn repair_response_packet(
.get_data_shred(slot, shred_index)
.expect("Blockstore could not get data shred");
shred
.map(|shred| repair_response_packet_from_shred(shred, dest, nonce))
.map(|shred| repair_response_packet_from_bytes(shred, dest, nonce))
.unwrap_or(None)
}
pub fn repair_response_packet_from_shred(
shred: Vec<u8>,
pub fn repair_response_packet_from_bytes(
bytes: Vec<u8>,
dest: &SocketAddr,
nonce: Nonce,
) -> Option<Packet> {
let mut packet = Packet::default();
packet.meta.size = shred.len() + SIZE_OF_NONCE;
packet.meta.size = bytes.len() + SIZE_OF_NONCE;
if packet.meta.size > packet.data.len() {
return None;
}
packet.meta.set_addr(dest);
packet.data[..shred.len()].copy_from_slice(&shred);
let mut wr = io::Cursor::new(&mut packet.data[shred.len()..]);
packet.data[..bytes.len()].copy_from_slice(&bytes);
let mut wr = io::Cursor::new(&mut packet.data[bytes.len()..]);
bincode::serialize_into(&mut wr, &nonce).expect("Buffer not large enough to fit nonce");
Some(packet)
}
@ -77,7 +77,7 @@ mod test {
Shredder::sign_shred(&keypair, &mut shred);
trace!("signature {}", shred.common_header.signature);
let nonce = 9;
let mut packet = repair_response_packet_from_shred(
let mut packet = repair_response_packet_from_bytes(
shred.payload,
&SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
nonce,

View File

@ -7,7 +7,7 @@ use crate::{
repair_weight::RepairWeight,
replay_stage::DUPLICATE_THRESHOLD,
result::Result,
serve_repair::{RepairType, ServeRepair, REPAIR_PEERS_CACHE_CAPACITY},
serve_repair::{ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY},
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use lru::LruCache;
@ -40,8 +40,7 @@ pub type DuplicateSlotsResetSender = CrossbeamSender<Slot>;
pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Slot>;
pub type ConfirmedSlotsSender = CrossbeamSender<Vec<Slot>>;
pub type ConfirmedSlotsReceiver = CrossbeamReceiver<Vec<Slot>>;
pub type OutstandingRepairs = OutstandingRequests<RepairType>;
pub type OutstandingRepairs = OutstandingRequests<ShredRepairType>;
#[derive(Default, Debug)]
pub struct SlotRepairs {
@ -366,9 +365,9 @@ impl RepairService {
blockstore: &Blockstore,
max_repairs: usize,
repair_range: &RepairSlotRange,
) -> Result<Vec<RepairType>> {
) -> Result<Vec<ShredRepairType>> {
// Slot height and shred indexes for shreds we want to repair
let mut repairs: Vec<RepairType> = vec![];
let mut repairs: Vec<ShredRepairType> = vec![];
for slot in repair_range.start..=repair_range.end {
if repairs.len() >= max_repairs {
break;
@ -399,11 +398,11 @@ impl RepairService {
slot: Slot,
slot_meta: &SlotMeta,
max_repairs: usize,
) -> Vec<RepairType> {
) -> Vec<ShredRepairType> {
if max_repairs == 0 || slot_meta.is_full() {
vec![]
} else if slot_meta.consumed == slot_meta.received {
vec![RepairType::HighestShred(slot, slot_meta.received)]
vec![ShredRepairType::HighestShred(slot, slot_meta.received)]
} else {
let reqs = blockstore.find_missing_data_indexes(
slot,
@ -413,7 +412,7 @@ impl RepairService {
max_repairs,
);
reqs.into_iter()
.map(|i| RepairType::Shred(slot, i))
.map(|i| ShredRepairType::Shred(slot, i))
.collect()
}
}
@ -421,7 +420,7 @@ impl RepairService {
/// Repairs any fork starting at the input slot
pub fn generate_repairs_for_fork<'a>(
blockstore: &Blockstore,
repairs: &mut Vec<RepairType>,
repairs: &mut Vec<ShredRepairType>,
max_repairs: usize,
slot: Slot,
duplicate_slot_repair_statuses: &impl Contains<'a, Slot>,
@ -453,7 +452,7 @@ impl RepairService {
fn generate_duplicate_repairs_for_slot(
blockstore: &Blockstore,
slot: Slot,
) -> Option<Vec<RepairType>> {
) -> Option<Vec<ShredRepairType>> {
if let Some(slot_meta) = blockstore.meta(slot).unwrap() {
if slot_meta.is_full() {
// If the slot is full, no further need to repair this slot
@ -527,7 +526,7 @@ impl RepairService {
#[allow(dead_code)]
fn serialize_and_send_request(
repair_type: &RepairType,
repair_type: &ShredRepairType,
repair_socket: &UdpSocket,
repair_pubkey: &Pubkey,
to: &SocketAddr,
@ -709,7 +708,10 @@ mod test {
&HashSet::default(),
None,
),
vec![RepairType::Orphan(2), RepairType::HighestShred(0, 0)]
vec![
ShredRepairType::Orphan(2),
ShredRepairType::HighestShred(0, 0)
]
);
}
@ -740,7 +742,7 @@ mod test {
&HashSet::default(),
None
),
vec![RepairType::HighestShred(0, 0)]
vec![ShredRepairType::HighestShred(0, 0)]
);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
@ -776,11 +778,11 @@ mod test {
.unwrap();
// sleep so that the holes are ready for repair
sleep(Duration::from_secs(1));
let expected: Vec<RepairType> = (0..num_slots)
let expected: Vec<ShredRepairType> = (0..num_slots)
.flat_map(|slot| {
missing_indexes_per_slot
.iter()
.map(move |shred_index| RepairType::Shred(slot as u64, *shred_index))
.map(move |shred_index| ShredRepairType::Shred(slot as u64, *shred_index))
})
.collect();
@ -832,8 +834,8 @@ mod test {
blockstore.insert_shreds(shreds, None, false).unwrap();
// We didn't get the last shred for this slot, so ask for the highest shred for that slot
let expected: Vec<RepairType> =
vec![RepairType::HighestShred(0, num_shreds_per_slot - 1)];
let expected: Vec<ShredRepairType> =
vec![ShredRepairType::HighestShred(0, num_shreds_per_slot - 1)];
let mut repair_weight = RepairWeight::new(0);
assert_eq!(
@ -876,13 +878,13 @@ mod test {
start: slots[start],
end: slots[end],
};
let expected: Vec<RepairType> = (repair_slot_range.start
let expected: Vec<ShredRepairType> = (repair_slot_range.start
..=repair_slot_range.end)
.map(|slot_index| {
if slots.contains(&(slot_index as u64)) {
RepairType::Shred(slot_index as u64, 0)
ShredRepairType::Shred(slot_index as u64, 0)
} else {
RepairType::HighestShred(slot_index as u64, 0)
ShredRepairType::HighestShred(slot_index as u64, 0)
}
})
.collect();
@ -922,10 +924,10 @@ mod test {
}
let end = 4;
let expected: Vec<RepairType> = vec![
RepairType::HighestShred(end - 2, 0),
RepairType::HighestShred(end - 1, 0),
RepairType::HighestShred(end, 0),
let expected: Vec<ShredRepairType> = vec![
ShredRepairType::HighestShred(end - 2, 0),
ShredRepairType::HighestShred(end - 1, 0),
ShredRepairType::HighestShred(end, 0),
];
let repair_slot_range = RepairSlotRange { start: 2, end };

View File

@ -1,6 +1,6 @@
use crate::{
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, repair_service::RepairTiming,
repair_weighted_traversal, serve_repair::RepairType, tree_diff::TreeDiff,
repair_weighted_traversal, serve_repair::ShredRepairType, tree_diff::TreeDiff,
};
use solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore};
use solana_measure::measure::Measure;
@ -147,7 +147,7 @@ impl RepairWeight {
max_new_shreds: usize,
ignore_slots: &impl Contains<'a, Slot>,
repair_timing: Option<&mut RepairTiming>,
) -> Vec<RepairType> {
) -> Vec<ShredRepairType> {
let mut repairs = vec![];
let mut get_best_orphans_elapsed = Measure::start("get_best_orphans");
// Update the orphans in order from heaviest to least heavy
@ -248,7 +248,7 @@ impl RepairWeight {
fn get_best_shreds<'a>(
&mut self,
blockstore: &Blockstore,
repairs: &mut Vec<RepairType>,
repairs: &mut Vec<ShredRepairType>,
max_new_shreds: usize,
ignore_slots: &impl Contains<'a, Slot>,
) {
@ -265,7 +265,7 @@ impl RepairWeight {
fn get_best_orphans(
&mut self,
blockstore: &Blockstore,
repairs: &mut Vec<RepairType>,
repairs: &mut Vec<ShredRepairType>,
epoch_stakes: &HashMap<Epoch, EpochStakes>,
epoch_schedule: &EpochSchedule,
max_new_orphans: usize,
@ -306,7 +306,7 @@ impl RepairWeight {
if let Some(new_orphan_root) = new_orphan_root {
if new_orphan_root != self.root && !best_orphans.contains(&new_orphan_root) {
best_orphans.insert(new_orphan_root);
repairs.push(RepairType::Orphan(new_orphan_root));
repairs.push(ShredRepairType::Orphan(new_orphan_root));
}
}
}
@ -317,7 +317,7 @@ impl RepairWeight {
if best_orphans.len() < max_new_orphans {
for new_orphan in blockstore.orphans_iterator(self.root + 1).unwrap() {
if !best_orphans.contains(&new_orphan) {
repairs.push(RepairType::Orphan(new_orphan));
repairs.push(ShredRepairType::Orphan(new_orphan));
best_orphans.insert(new_orphan);
}

View File

@ -1,6 +1,6 @@
use crate::{
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, repair_service::RepairService,
serve_repair::RepairType, tree_diff::TreeDiff,
serve_repair::ShredRepairType, tree_diff::TreeDiff,
};
use solana_ledger::blockstore::Blockstore;
use solana_runtime::contains::Contains;
@ -73,7 +73,7 @@ impl<'a> Iterator for RepairWeightTraversal<'a> {
pub fn get_best_repair_shreds<'a>(
tree: &HeaviestSubtreeForkChoice,
blockstore: &Blockstore,
repairs: &mut Vec<RepairType>,
repairs: &mut Vec<ShredRepairType>,
max_new_shreds: usize,
ignore_slots: &impl Contains<'a, Slot>,
) {
@ -227,7 +227,7 @@ pub mod test {
repairs,
[0, 1, 2, 4, 3, 5]
.iter()
.map(|slot| RepairType::HighestShred(*slot, last_shred))
.map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
.collect::<Vec<_>>()
);
@ -254,7 +254,7 @@ pub mod test {
repairs,
[0, 1, 2, 4, 6, 7]
.iter()
.map(|slot| RepairType::HighestShred(*slot, last_shred))
.map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
.collect::<Vec<_>>()
);
@ -289,7 +289,7 @@ pub mod test {
repairs,
[1, 7, 3, 5]
.iter()
.map(|slot| RepairType::HighestShred(*slot, last_shred))
.map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
.collect::<Vec<_>>()
);
@ -308,7 +308,7 @@ pub mod test {
repairs,
[1, 7, 8, 3]
.iter()
.map(|slot| RepairType::HighestShred(*slot, last_shred))
.map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
.collect::<Vec<_>>()
);
}
@ -332,7 +332,7 @@ pub mod test {
repairs,
[0, 1, 2, 4, 6, 7, 3, 5]
.iter()
.map(|slot| RepairType::HighestShred(*slot, last_shred))
.map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
.collect::<Vec<_>>()
);
}
@ -357,7 +357,7 @@ pub mod test {
repairs,
[0, 2, 4, 5]
.iter()
.map(|slot| RepairType::HighestShred(*slot, last_shred))
.map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
.collect::<Vec<_>>()
);
@ -377,7 +377,7 @@ pub mod test {
repairs,
[0, 4, 6, 7, 5]
.iter()
.map(|slot| RepairType::HighestShred(*slot, last_shred))
.map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
.collect::<Vec<_>>()
);
@ -396,7 +396,7 @@ pub mod test {
repairs,
[0, 4, 5]
.iter()
.map(|slot| RepairType::HighestShred(*slot, last_shred))
.map(|slot| ShredRepairType::HighestShred(*slot, last_shred))
.collect::<Vec<_>>()
);
}

View File

@ -17,13 +17,16 @@ use solana_gossip::{
weighted_shuffle::weighted_best,
};
use solana_ledger::{
ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash},
blockstore::Blockstore,
shred::{Nonce, Shred},
shred::{Nonce, Shred, SIZE_OF_NONCE},
};
use solana_measure::measure::Measure;
use solana_metrics::inc_new_counter_debug;
use solana_perf::packet::{limited_deserialize, Packets, PacketsRecycler};
use solana_sdk::{clock::Slot, pubkey::Pubkey, timing::duration_as_ms};
use solana_sdk::{
clock::Slot, hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms,
};
use solana_streamer::streamer::{PacketReceiver, PacketSender};
use std::{
collections::HashSet,
@ -34,52 +37,99 @@ use std::{
time::{Duration, Instant},
};
type SlotHash = (Slot, Hash);
/// the number of slots to respond with when responding to `Orphan` requests
pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
// Number of slots to cache their respective repair peers and sampling weights.
pub(crate) const REPAIR_PEERS_CACHE_CAPACITY: usize = 128;
// Limit cache entries ttl in order to avoid re-using outdated data.
const REPAIR_PEERS_CACHE_TTL: Duration = Duration::from_secs(10);
pub const MAX_ANCESTOR_BYTES_IN_PACKET: usize =
PACKET_DATA_SIZE -
SIZE_OF_NONCE -
4 /*(response version enum discriminator)*/ -
4 /*slot_hash length*/;
pub const MAX_ANCESTOR_RESPONSES: usize =
MAX_ANCESTOR_BYTES_IN_PACKET / std::mem::size_of::<SlotHash>();
#[cfg(test)]
static_assertions::const_assert_eq!(MAX_ANCESTOR_RESPONSES, 30);
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum RepairType {
pub enum ShredRepairType {
Orphan(Slot),
HighestShred(Slot, u64),
Shred(Slot, u64),
}
impl RepairType {
impl ShredRepairType {
pub fn slot(&self) -> Slot {
match self {
RepairType::Orphan(slot) => *slot,
RepairType::HighestShred(slot, _) => *slot,
RepairType::Shred(slot, _) => *slot,
ShredRepairType::Orphan(slot) => *slot,
ShredRepairType::HighestShred(slot, _) => *slot,
ShredRepairType::Shred(slot, _) => *slot,
}
}
}
impl RequestResponse for RepairType {
impl RequestResponse for ShredRepairType {
type Response = Shred;
fn num_expected_responses(&self) -> u32 {
match self {
RepairType::Orphan(_) => (MAX_ORPHAN_REPAIR_RESPONSES + 1) as u32, // run_orphan uses <= MAX_ORPHAN_REPAIR_RESPONSES
RepairType::HighestShred(_, _) => 1,
RepairType::Shred(_, _) => 1,
ShredRepairType::Orphan(_) => (MAX_ORPHAN_REPAIR_RESPONSES + 1) as u32, // run_orphan uses <= MAX_ORPHAN_REPAIR_RESPONSES
ShredRepairType::HighestShred(_, _) => 1,
ShredRepairType::Shred(_, _) => 1,
}
}
fn verify_response(&self, response_shred: &Shred) -> bool {
match self {
RepairType::Orphan(slot) => response_shred.slot() <= *slot,
RepairType::HighestShred(slot, index) => {
ShredRepairType::Orphan(slot) => response_shred.slot() <= *slot,
ShredRepairType::HighestShred(slot, index) => {
response_shred.slot() as u64 == *slot && response_shred.index() as u64 >= *index
}
RepairType::Shred(slot, index) => {
ShredRepairType::Shred(slot, index) => {
response_shred.slot() as u64 == *slot && response_shred.index() as u64 == *index
}
}
}
}
pub struct AncestorHashesRepair(Slot);
#[derive(Serialize, Deserialize)]
pub enum AncestorHashesResponseVersion {
Current(Vec<SlotHash>),
}
impl AncestorHashesResponseVersion {
#[cfg(test)]
fn into_slot_hashes(self) -> Vec<SlotHash> {
match self {
AncestorHashesResponseVersion::Current(slot_hashes) => slot_hashes,
}
}
fn slot_hashes(&self) -> &[SlotHash] {
match self {
AncestorHashesResponseVersion::Current(slot_hashes) => slot_hashes,
}
}
fn max_ancestors_in_response(&self) -> usize {
match self {
AncestorHashesResponseVersion::Current(_) => MAX_ANCESTOR_RESPONSES,
}
}
}
impl RequestResponse for AncestorHashesRepair {
type Response = AncestorHashesResponseVersion;
fn num_expected_responses(&self) -> u32 {
1
}
fn verify_response(&self, response: &AncestorHashesResponseVersion) -> bool {
response.slot_hashes().len() <= response.max_ancestors_in_response()
}
}
#[derive(Default)]
pub struct ServeRepairStats {
pub total_packets: usize,
@ -89,17 +139,19 @@ pub struct ServeRepairStats {
pub window_index: usize,
pub highest_window_index: usize,
pub orphan: usize,
pub ancestor_hashes: usize,
}
/// Window protocol messages
#[derive(Serialize, Deserialize, Debug)]
pub enum RepairProtocol {
WindowIndex(ContactInfo, u64, u64),
HighestWindowIndex(ContactInfo, u64, u64),
Orphan(ContactInfo, u64),
WindowIndexWithNonce(ContactInfo, u64, u64, Nonce),
HighestWindowIndexWithNonce(ContactInfo, u64, u64, Nonce),
OrphanWithNonce(ContactInfo, u64, Nonce),
WindowIndex(ContactInfo, Slot, u64),
HighestWindowIndex(ContactInfo, Slot, u64),
Orphan(ContactInfo, Slot),
WindowIndexWithNonce(ContactInfo, Slot, u64, Nonce),
HighestWindowIndexWithNonce(ContactInfo, Slot, u64, Nonce),
OrphanWithNonce(ContactInfo, Slot, Nonce),
AncestorHashes(ContactInfo, Slot, Nonce),
}
#[derive(Clone)]
@ -168,6 +220,7 @@ impl ServeRepair {
RepairProtocol::WindowIndexWithNonce(ref from, _, _, _) => from,
RepairProtocol::HighestWindowIndexWithNonce(ref from, _, _, _) => from,
RepairProtocol::OrphanWithNonce(ref from, _, _) => from,
RepairProtocol::AncestorHashes(ref from, _, _) => from,
}
}
@ -235,6 +288,13 @@ impl ServeRepair {
"OrphanWithNonce",
)
}
RepairProtocol::AncestorHashes(_, slot, nonce) => {
stats.ancestor_hashes += 1;
(
Self::run_ancestor_hashes(recycler, from_addr, blockstore, *slot, *nonce),
"AncestorHashes",
)
}
_ => (None, "Unsupported repair type"),
}
};
@ -319,7 +379,10 @@ impl ServeRepair {
stats.highest_window_index
);
inc_new_counter_debug!("serve_repair-request-orphan", stats.orphan);
inc_new_counter_debug!(
"serve_repair-request-ancestor-hashes",
stats.ancestor_hashes
);
*stats = ServeRepairStats::default();
}
@ -420,7 +483,7 @@ impl ServeRepair {
pub(crate) fn repair_request(
&self,
cluster_slots: &ClusterSlots,
repair_request: RepairType,
repair_request: ShredRepairType,
peers_cache: &mut LruCache<Slot, RepairPeers>,
repair_stats: &mut RepairStats,
repair_validators: &Option<HashSet<Pubkey>>,
@ -464,25 +527,25 @@ impl ServeRepair {
pub fn map_repair_request(
&self,
repair_request: &RepairType,
repair_request: &ShredRepairType,
repair_peer_id: &Pubkey,
repair_stats: &mut RepairStats,
nonce: Nonce,
) -> Result<Vec<u8>> {
match repair_request {
RepairType::Shred(slot, shred_index) => {
ShredRepairType::Shred(slot, shred_index) => {
repair_stats
.shred
.update(repair_peer_id, *slot, *shred_index);
Ok(self.window_index_request_bytes(*slot, *shred_index, nonce)?)
}
RepairType::HighestShred(slot, shred_index) => {
ShredRepairType::HighestShred(slot, shred_index) => {
repair_stats
.highest_shred
.update(repair_peer_id, *slot, *shred_index);
Ok(self.window_highest_index_request_bytes(*slot, *shred_index, nonce)?)
}
RepairType::Orphan(slot) => {
ShredRepairType::Orphan(slot) => {
repair_stats.orphan.update(repair_peer_id, *slot, 0);
Ok(self.orphan_bytes(*slot, nonce)?)
}
@ -620,6 +683,40 @@ impl ServeRepair {
}
Some(res)
}
fn run_ancestor_hashes(
recycler: &PacketsRecycler,
from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>,
slot: Slot,
nonce: Nonce,
) -> Option<Packets> {
let blockstore = blockstore?;
let ancestor_slot_hashes = if blockstore.is_duplicate_confirmed(slot) {
let ancestor_iterator =
AncestorIteratorWithHash::from(AncestorIterator::new_inclusive(slot, blockstore));
ancestor_iterator.take(MAX_ANCESTOR_RESPONSES).collect()
} else {
// If this slot is not duplicate confirmed, return nothing
vec![]
};
let response = AncestorHashesResponseVersion::Current(ancestor_slot_hashes);
let serialized_response = serialize(&response).ok()?;
// Could probably directly write response into packet via `serialize_into()`
// instead of incurring extra copy in `repair_response_packet_from_bytes`, but
// serialize_into doesn't return the written size...
let packet = repair_response::repair_response_packet_from_bytes(
serialized_response,
from_addr,
nonce,
)?;
Some(Packets::new_unpinned_with_recycler_data(
recycler,
"run_ancestor_hashes",
vec![packet],
))
}
}
#[cfg(test)]
@ -676,7 +773,7 @@ mod tests {
nonce,
)
.expect("packets");
let request = RepairType::HighestShred(slot, index);
let request = ShredRepairType::HighestShred(slot, index);
verify_responses(&request, rv.packets.iter());
let rv: Vec<Shred> = rv
@ -762,7 +859,7 @@ mod tests {
nonce,
)
.expect("packets");
let request = RepairType::Shred(slot, index);
let request = ShredRepairType::Shred(slot, index);
verify_responses(&request, rv.packets.iter());
let rv: Vec<Shred> = rv
.packets
@ -788,7 +885,7 @@ mod tests {
let mut outstanding_requests = OutstandingRepairs::default();
let rv = serve_repair.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
ShredRepairType::Shred(0, 0),
&mut LruCache::new(100),
&mut RepairStats::default(),
&None,
@ -816,7 +913,7 @@ mod tests {
let rv = serve_repair
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
ShredRepairType::Shred(0, 0),
&mut LruCache::new(100),
&mut RepairStats::default(),
&None,
@ -850,7 +947,7 @@ mod tests {
let rv = serve_repair
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
ShredRepairType::Shred(0, 0),
&mut LruCache::new(100),
&mut RepairStats::default(),
&None,
@ -923,7 +1020,7 @@ mod tests {
.collect();
// Verify responses
let request = RepairType::Orphan(slot);
let request = ShredRepairType::Orphan(slot);
verify_responses(&request, rv.iter());
let expected: Vec<_> = (slot..slot + num_slots)
@ -1006,6 +1103,89 @@ mod tests {
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_run_ancestor_hashes() {
solana_logger::setup();
let recycler = PacketsRecycler::default();
let ledger_path = get_tmp_ledger_path!();
{
let slot = 0;
let num_slots = MAX_ANCESTOR_RESPONSES as u64;
let nonce = 10;
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
// Create slots [slot, slot + num_slots) with 5 shreds apiece
let (shreds, _) = make_many_slot_entries(slot, num_slots, 5);
blockstore
.insert_shreds(shreds, None, false)
.expect("Expect successful ledger write");
// We don't have slot `slot + num_slots`, so we return empty
let rv = ServeRepair::run_ancestor_hashes(
&recycler,
&socketaddr_any!(),
Some(&blockstore),
slot + num_slots,
nonce,
)
.expect("run_ancestor_hashes packets")
.packets;
assert_eq!(rv.len(), 1);
let packet = &rv[0];
let ancestor_hashes_response: AncestorHashesResponseVersion =
limited_deserialize(&packet.data[..packet.meta.size - SIZE_OF_NONCE]).unwrap();
assert!(ancestor_hashes_response.into_slot_hashes().is_empty());
// `slot + num_slots - 1` is not marked duplicate confirmed so nothing should return
// empty
let rv = ServeRepair::run_ancestor_hashes(
&recycler,
&socketaddr_any!(),
Some(&blockstore),
slot + num_slots - 1,
nonce,
)
.expect("run_ancestor_hashes packets")
.packets;
assert_eq!(rv.len(), 1);
let packet = &rv[0];
let ancestor_hashes_response: AncestorHashesResponseVersion =
limited_deserialize(&packet.data[..packet.meta.size - SIZE_OF_NONCE]).unwrap();
assert!(ancestor_hashes_response.into_slot_hashes().is_empty());
// Set duplicate confirmed
let mut expected_ancestors = Vec::with_capacity(num_slots as usize);
expected_ancestors.resize(num_slots as usize, (0, Hash::default()));
for (i, duplicate_confirmed_slot) in (slot..slot + num_slots).enumerate() {
let frozen_hash = Hash::new_unique();
expected_ancestors[num_slots as usize - i - 1] =
(duplicate_confirmed_slot, frozen_hash);
blockstore.insert_bank_hash(duplicate_confirmed_slot, frozen_hash, true);
}
let rv = ServeRepair::run_ancestor_hashes(
&recycler,
&socketaddr_any!(),
Some(&blockstore),
slot + num_slots - 1,
nonce,
)
.expect("run_ancestor_hashes packets")
.packets;
assert_eq!(rv.len(), 1);
let packet = &rv[0];
let ancestor_hashes_response: AncestorHashesResponseVersion =
limited_deserialize(&packet.data[..packet.meta.size - SIZE_OF_NONCE]).unwrap();
assert_eq!(
ancestor_hashes_response.into_slot_hashes(),
expected_ancestors
);
}
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_repair_with_repair_validators() {
let cluster_slots = ClusterSlots::default();
@ -1031,7 +1211,7 @@ mod tests {
assert!(serve_repair
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
ShredRepairType::Shred(0, 0),
&mut LruCache::new(100),
&mut RepairStats::default(),
&trusted_validators,
@ -1048,7 +1228,7 @@ mod tests {
assert!(serve_repair
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
ShredRepairType::Shred(0, 0),
&mut LruCache::new(100),
&mut RepairStats::default(),
&trusted_validators,
@ -1069,7 +1249,7 @@ mod tests {
assert!(serve_repair
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
ShredRepairType::Shred(0, 0),
&mut LruCache::new(100),
&mut RepairStats::default(),
&None,
@ -1079,13 +1259,13 @@ mod tests {
}
#[test]
fn test_verify_response() {
let repair = RepairType::Orphan(9);
fn test_verify_shred_response() {
let repair = ShredRepairType::Orphan(9);
// Ensure new options are addded to this test
match repair {
RepairType::Orphan(_) => (),
RepairType::HighestShred(_, _) => (),
RepairType::Shred(_, _) => (),
ShredRepairType::Orphan(_) => (),
ShredRepairType::HighestShred(_, _) => (),
ShredRepairType::Shred(_, _) => (),
};
let slot = 9;
@ -1094,7 +1274,7 @@ mod tests {
// Orphan
let mut shred = Shred::new_empty_data_shred();
shred.set_slot(slot);
let request = RepairType::Orphan(slot);
let request = ShredRepairType::Orphan(slot);
assert!(request.verify_response(&shred));
shred.set_slot(slot - 1);
assert!(request.verify_response(&shred));
@ -1105,7 +1285,7 @@ mod tests {
shred = Shred::new_empty_data_shred();
shred.set_slot(slot);
shred.set_index(index);
let request = RepairType::HighestShred(slot, index as u64);
let request = ShredRepairType::HighestShred(slot, index as u64);
assert!(request.verify_response(&shred));
shred.set_index(index + 1);
assert!(request.verify_response(&shred));
@ -1121,7 +1301,7 @@ mod tests {
shred = Shred::new_empty_data_shred();
shred.set_slot(slot);
shred.set_index(index);
let request = RepairType::Shred(slot, index as u64);
let request = ShredRepairType::Shred(slot, index as u64);
assert!(request.verify_response(&shred));
shred.set_index(index + 1);
assert!(!request.verify_response(&shred));
@ -1130,11 +1310,26 @@ mod tests {
assert!(!request.verify_response(&shred));
}
fn verify_responses<'a>(request: &RepairType, packets: impl Iterator<Item = &'a Packet>) {
fn verify_responses<'a>(request: &ShredRepairType, packets: impl Iterator<Item = &'a Packet>) {
for packet in packets {
let shred_payload = packet.data.to_vec();
let shred = Shred::new_from_serialized_shred(shred_payload).unwrap();
request.verify_response(&shred);
}
}
#[test]
fn test_verify_ancestor_response() {
let request_slot = MAX_ANCESTOR_RESPONSES as Slot;
let repair = AncestorHashesRepair(request_slot);
let mut response: Vec<SlotHash> = (0..request_slot)
.into_iter()
.map(|slot| (slot, Hash::new_unique()))
.collect();
assert!(repair.verify_response(&AncestorHashesResponseVersion::Current(response.clone())));
// over the allowed limit, should fail
response.push((request_slot, Hash::new_unique()));
assert!(!repair.verify_response(&AncestorHashesResponseVersion::Current(response)));
}
}

View File

@ -806,7 +806,7 @@ mod test {
#[test]
fn test_prune_shreds() {
use crate::serve_repair::RepairType;
use crate::serve_repair::ShredRepairType;
use std::net::{IpAddr, Ipv4Addr};
solana_logger::setup();
let (common, coding) = Shredder::new_coding_shred_header(5, 5, 5, 6, 6, 0);
@ -818,7 +818,7 @@ mod test {
nonce: 0,
};
let outstanding_requests = Arc::new(RwLock::new(OutstandingRepairs::default()));
let repair_type = RepairType::Orphan(9);
let repair_type = ShredRepairType::Orphan(9);
let nonce = outstanding_requests
.write()
.unwrap()