Fixes to repair and orphan logic for data shreds (#5587)
This commit is contained in:
parent
d651cb7a25
commit
0dc0594aaa
|
@ -469,12 +469,7 @@ impl Blocktree {
|
||||||
) -> Result<bool> {
|
) -> Result<bool> {
|
||||||
let slot = shred.slot();
|
let slot = shred.slot();
|
||||||
let index = u64::from(shred.index());
|
let index = u64::from(shred.index());
|
||||||
let parent = if let Shred::FirstInSlot(s) = shred {
|
let parent = shred.parent();
|
||||||
debug!("got first in slot");
|
|
||||||
s.header.parent
|
|
||||||
} else {
|
|
||||||
std::u64::MAX
|
|
||||||
};
|
|
||||||
|
|
||||||
let last_in_slot = if let Shred::LastInSlot(_) = shred {
|
let last_in_slot = if let Shred::LastInSlot(_) = shred {
|
||||||
debug!("got last in slot");
|
debug!("got last in slot");
|
||||||
|
@ -1169,7 +1164,7 @@ impl Blocktree {
|
||||||
end_index: u64,
|
end_index: u64,
|
||||||
max_missing: usize,
|
max_missing: usize,
|
||||||
) -> Vec<u64> {
|
) -> Vec<u64> {
|
||||||
if let Ok(mut db_iterator) = self.db.cursor::<cf::Data>() {
|
if let Ok(mut db_iterator) = self.db.cursor::<cf::ShredData>() {
|
||||||
Self::find_missing_indexes(&mut db_iterator, slot, start_index, end_index, max_missing)
|
Self::find_missing_indexes(&mut db_iterator, slot, start_index, end_index, max_missing)
|
||||||
} else {
|
} else {
|
||||||
vec![]
|
vec![]
|
||||||
|
@ -1187,11 +1182,6 @@ impl Blocktree {
|
||||||
.map(|x| x.0)
|
.map(|x| x.0)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read_ledger_blobs(&self) -> impl Iterator<Item = Blob> + '_ {
|
|
||||||
let iter = self.db.iter::<cf::Data>(None).unwrap();
|
|
||||||
iter.map(|(_, blob_data)| Blob::new(&blob_data))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_slot_entries_with_blob_count(
|
pub fn get_slot_entries_with_blob_count(
|
||||||
&self,
|
&self,
|
||||||
slot: u64,
|
slot: u64,
|
||||||
|
@ -2439,28 +2429,6 @@ pub fn create_new_tmp_ledger(name: &str, genesis_block: &GenesisBlock) -> (PathB
|
||||||
(ledger_path, blockhash)
|
(ledger_path, blockhash)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[macro_export]
|
|
||||||
macro_rules! tmp_copy_blocktree {
|
|
||||||
($from:expr) => {
|
|
||||||
tmp_copy_blocktree($from, tmp_ledger_name!())
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn tmp_copy_blocktree(from: &Path, name: &str) -> PathBuf {
|
|
||||||
let path = get_tmp_ledger_path(name);
|
|
||||||
|
|
||||||
let blocktree = Blocktree::open(from).unwrap();
|
|
||||||
let blobs = blocktree.read_ledger_blobs();
|
|
||||||
let genesis_block = GenesisBlock::load(from).unwrap();
|
|
||||||
|
|
||||||
Blocktree::destroy(&path).expect("Expected successful database destruction");
|
|
||||||
let blocktree = Blocktree::open(&path).unwrap();
|
|
||||||
blocktree.write_blobs(blobs).unwrap();
|
|
||||||
genesis_block.write(&path).unwrap();
|
|
||||||
|
|
||||||
path
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub mod tests {
|
pub mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -3659,15 +3627,17 @@ pub mod tests {
|
||||||
let blocktree = Blocktree::open(&blocktree_path).unwrap();
|
let blocktree = Blocktree::open(&blocktree_path).unwrap();
|
||||||
|
|
||||||
// Write entries
|
// Write entries
|
||||||
let gap = 10;
|
let gap: u64 = 10;
|
||||||
assert!(gap > 3);
|
assert!(gap > 3);
|
||||||
let num_entries = 10;
|
let num_entries = 10;
|
||||||
let mut blobs = make_tiny_test_entries(num_entries).to_single_entry_blobs();
|
let entries = make_tiny_test_entries(num_entries);
|
||||||
for (i, b) in blobs.iter_mut().enumerate() {
|
let mut shreds = entries_to_test_shreds(entries, slot, 0, true);
|
||||||
b.set_index(i as u64 * gap);
|
let num_shreds = shreds.len();
|
||||||
|
for (i, b) in shreds.iter_mut().enumerate() {
|
||||||
|
b.set_index(i as u32 * gap as u32);
|
||||||
b.set_slot(slot);
|
b.set_slot(slot);
|
||||||
}
|
}
|
||||||
blocktree.write_blobs(&blobs).unwrap();
|
blocktree.insert_shreds(&shreds).unwrap();
|
||||||
|
|
||||||
// Index of the first blob is 0
|
// Index of the first blob is 0
|
||||||
// Index of the second blob is "gap"
|
// Index of the second blob is "gap"
|
||||||
|
@ -3711,7 +3681,7 @@ pub mod tests {
|
||||||
&expected[..expected.len() - 1],
|
&expected[..expected.len() - 1],
|
||||||
);
|
);
|
||||||
|
|
||||||
for i in 0..num_entries as u64 {
|
for i in 0..num_shreds as u64 {
|
||||||
for j in 0..i {
|
for j in 0..i {
|
||||||
let expected: Vec<u64> = (j..i)
|
let expected: Vec<u64> = (j..i)
|
||||||
.flat_map(|k| {
|
.flat_map(|k| {
|
||||||
|
@ -3750,16 +3720,17 @@ pub mod tests {
|
||||||
assert_eq!(blocktree.find_missing_data_indexes(slot, 4, 3, 1), empty);
|
assert_eq!(blocktree.find_missing_data_indexes(slot, 4, 3, 1), empty);
|
||||||
assert_eq!(blocktree.find_missing_data_indexes(slot, 1, 2, 0), empty);
|
assert_eq!(blocktree.find_missing_data_indexes(slot, 1, 2, 0), empty);
|
||||||
|
|
||||||
let mut blobs = make_tiny_test_entries(2).to_single_entry_blobs();
|
let entries = make_tiny_test_entries(20);
|
||||||
|
let mut shreds = entries_to_test_shreds(entries, slot, 0, true);
|
||||||
|
|
||||||
const ONE: u64 = 1;
|
const ONE: u64 = 1;
|
||||||
const OTHER: u64 = 4;
|
const OTHER: u64 = 4;
|
||||||
|
|
||||||
blobs[0].set_index(ONE);
|
shreds[0].set_index(ONE as u32);
|
||||||
blobs[1].set_index(OTHER);
|
shreds[1].set_index(OTHER as u32);
|
||||||
|
|
||||||
// Insert one blob at index = first_index
|
// Insert one blob at index = first_index
|
||||||
blocktree.write_blobs(&blobs).unwrap();
|
blocktree.insert_shreds(&shreds[0..2]).unwrap();
|
||||||
|
|
||||||
const STARTS: u64 = OTHER * 2;
|
const STARTS: u64 = OTHER * 2;
|
||||||
const END: u64 = OTHER * 3;
|
const END: u64 = OTHER * 3;
|
||||||
|
@ -3789,16 +3760,14 @@ pub mod tests {
|
||||||
|
|
||||||
// Write entries
|
// Write entries
|
||||||
let num_entries = 10;
|
let num_entries = 10;
|
||||||
let shared_blobs = make_tiny_test_entries(num_entries).to_single_entry_shared_blobs();
|
let entries = make_tiny_test_entries(num_entries);
|
||||||
|
let shreds = entries_to_test_shreds(entries, slot, 0, true);
|
||||||
|
let num_shreds = shreds.len();
|
||||||
|
|
||||||
crate::packet::index_blobs(&shared_blobs, &Pubkey::new_rand(), 0, slot, 0);
|
blocktree.insert_shreds(&shreds).unwrap();
|
||||||
|
|
||||||
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
|
|
||||||
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
|
|
||||||
blocktree.write_blobs(blobs).unwrap();
|
|
||||||
|
|
||||||
let empty: Vec<u64> = vec![];
|
let empty: Vec<u64> = vec![];
|
||||||
for i in 0..num_entries as u64 {
|
for i in 0..num_shreds as u64 {
|
||||||
for j in 0..i {
|
for j in 0..i {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
blocktree.find_missing_data_indexes(slot, j, i, (i - j) as usize),
|
blocktree.find_missing_data_indexes(slot, j, i, (i - j) as usize),
|
||||||
|
|
|
@ -88,7 +88,13 @@ impl BroadcastRun for StandardBroadcastRun {
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
trace!("Renew shredder with same parent slot {:?}", parent_slot);
|
trace!("Renew shredder with same parent slot {:?}", parent_slot);
|
||||||
Shredder::new(bank.slot(), None, 0.0, keypair, latest_blob_index as u32)
|
Shredder::new(
|
||||||
|
bank.slot(),
|
||||||
|
Some(parent_slot),
|
||||||
|
0.0,
|
||||||
|
keypair,
|
||||||
|
latest_blob_index as u32,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
trace!("New shredder with parent slot {:?}", parent_slot);
|
trace!("New shredder with parent slot {:?}", parent_slot);
|
||||||
|
|
|
@ -406,7 +406,8 @@ impl Service for RepairService {
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::blocktree::tests::{
|
use crate::blocktree::tests::{
|
||||||
make_chaining_slot_entries, make_many_slot_entries, make_slot_entries,
|
make_chaining_slot_entries, make_chaining_slot_entries_using_shreds,
|
||||||
|
make_many_slot_entries_using_shreds, make_slot_entries,
|
||||||
};
|
};
|
||||||
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
|
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
|
||||||
use crate::cluster_info::Node;
|
use crate::cluster_info::Node;
|
||||||
|
@ -468,21 +469,26 @@ mod test {
|
||||||
let blocktree = Blocktree::open(&blocktree_path).unwrap();
|
let blocktree = Blocktree::open(&blocktree_path).unwrap();
|
||||||
|
|
||||||
let nth = 3;
|
let nth = 3;
|
||||||
let num_entries_per_slot = 5 * nth;
|
|
||||||
let num_slots = 2;
|
let num_slots = 2;
|
||||||
|
|
||||||
// Create some blobs
|
// Create some blobs
|
||||||
let (blobs, _) =
|
let (mut shreds, _) =
|
||||||
make_many_slot_entries(0, num_slots as u64, num_entries_per_slot as u64);
|
make_many_slot_entries_using_shreds(0, num_slots as u64, 50 as u64);
|
||||||
|
let num_shreds = shreds.len() as u64;
|
||||||
|
let num_shreds_per_slot = num_shreds / num_slots;
|
||||||
|
|
||||||
// write every nth blob
|
// write every nth blob
|
||||||
let blobs_to_write: Vec<_> = blobs.iter().step_by(nth as usize).collect();
|
let mut shreds_to_write = vec![];
|
||||||
|
let mut missing_indexes_per_slot = vec![];
|
||||||
blocktree.write_blobs(blobs_to_write).unwrap();
|
for i in (0..num_shreds).rev() {
|
||||||
|
let index = i % num_shreds_per_slot;
|
||||||
let missing_indexes_per_slot: Vec<u64> = (0..num_entries_per_slot / nth - 1)
|
if index % nth == 0 {
|
||||||
.flat_map(|x| ((nth * x + 1) as u64..(nth * x + nth) as u64))
|
shreds_to_write.insert(0, shreds.remove(i as usize));
|
||||||
.collect();
|
} else if i < num_shreds_per_slot {
|
||||||
|
missing_indexes_per_slot.insert(0, index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
blocktree.insert_shreds(&shreds_to_write).unwrap();
|
||||||
|
|
||||||
let expected: Vec<RepairType> = (0..num_slots)
|
let expected: Vec<RepairType> = (0..num_slots)
|
||||||
.flat_map(|slot| {
|
.flat_map(|slot| {
|
||||||
|
@ -541,9 +547,9 @@ mod test {
|
||||||
let slots: Vec<u64> = vec![1, 3, 5, 7, 8];
|
let slots: Vec<u64> = vec![1, 3, 5, 7, 8];
|
||||||
let num_entries_per_slot = 10;
|
let num_entries_per_slot = 10;
|
||||||
|
|
||||||
let blobs = make_chaining_slot_entries(&slots, num_entries_per_slot);
|
let shreds = make_chaining_slot_entries_using_shreds(&slots, num_entries_per_slot);
|
||||||
for (slot_blobs, _) in blobs.iter() {
|
for (slot_shreds, _) in shreds.iter() {
|
||||||
blocktree.write_blobs(&slot_blobs[1..]).unwrap();
|
blocktree.insert_shreds(&slot_shreds[1..]).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterate through all possible combinations of start..end (inclusive on both
|
// Iterate through all possible combinations of start..end (inclusive on both
|
||||||
|
|
|
@ -34,6 +34,17 @@ impl Shred {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn parent(&self) -> u64 {
|
||||||
|
match self {
|
||||||
|
Shred::FirstInSlot(s) => s.header.data_header.parent,
|
||||||
|
Shred::FirstInFECSet(s)
|
||||||
|
| Shred::Data(s)
|
||||||
|
| Shred::LastInFECSet(s)
|
||||||
|
| Shred::LastInSlot(s) => s.header.parent,
|
||||||
|
Shred::Coding(_) => std::u64::MAX,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn set_slot(&mut self, slot: u64) {
|
pub fn set_slot(&mut self, slot: u64) {
|
||||||
match self {
|
match self {
|
||||||
Shred::FirstInSlot(s) => s.header.data_header.common_header.slot = slot,
|
Shred::FirstInSlot(s) => s.header.data_header.common_header.slot = slot,
|
||||||
|
@ -127,6 +138,7 @@ pub struct ShredCommonHeader {
|
||||||
pub struct DataShredHeader {
|
pub struct DataShredHeader {
|
||||||
_reserved: CodingShredHeader,
|
_reserved: CodingShredHeader,
|
||||||
pub common_header: ShredCommonHeader,
|
pub common_header: ShredCommonHeader,
|
||||||
|
pub parent: u64,
|
||||||
pub last_in_slot: u8,
|
pub last_in_slot: u8,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -284,6 +296,7 @@ pub struct Shredder {
|
||||||
slot: u64,
|
slot: u64,
|
||||||
index: u32,
|
index: u32,
|
||||||
pub parent: Option<u64>,
|
pub parent: Option<u64>,
|
||||||
|
parent_slot: u64,
|
||||||
fec_rate: f32,
|
fec_rate: f32,
|
||||||
signer: Arc<Keypair>,
|
signer: Arc<Keypair>,
|
||||||
pub shreds: Vec<Vec<u8>>,
|
pub shreds: Vec<Vec<u8>>,
|
||||||
|
@ -301,8 +314,15 @@ impl Write for Shredder {
|
||||||
self.parent
|
self.parent
|
||||||
.take()
|
.take()
|
||||||
.map(|parent| {
|
.map(|parent| {
|
||||||
// If parent slot is provided, assume it's first shred in slot
|
self.parent_slot = parent;
|
||||||
Shred::FirstInSlot(self.new_first_shred(parent))
|
// If parent slot is available
|
||||||
|
if self.index == 0 {
|
||||||
|
// If index is 0, it's the first shred in slot
|
||||||
|
Shred::FirstInSlot(self.new_first_shred(parent))
|
||||||
|
} else {
|
||||||
|
// Or, it is the first shred in FEC set
|
||||||
|
Shred::FirstInFECSet(self.new_data_shred())
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.unwrap_or_else(||
|
.unwrap_or_else(||
|
||||||
// If parent slot is not provided, and since there's no existing shred,
|
// If parent slot is not provided, and since there's no existing shred,
|
||||||
|
@ -371,6 +391,7 @@ impl Shredder {
|
||||||
slot,
|
slot,
|
||||||
index,
|
index,
|
||||||
parent,
|
parent,
|
||||||
|
parent_slot: 0,
|
||||||
fec_rate,
|
fec_rate,
|
||||||
signer: signer.clone(),
|
signer: signer.clone(),
|
||||||
..Shredder::default()
|
..Shredder::default()
|
||||||
|
@ -403,6 +424,7 @@ impl Shredder {
|
||||||
let mut data_shred = DataShred::default();
|
let mut data_shred = DataShred::default();
|
||||||
data_shred.header.common_header.slot = self.slot;
|
data_shred.header.common_header.slot = self.slot;
|
||||||
data_shred.header.common_header.index = self.index;
|
data_shred.header.common_header.index = self.index;
|
||||||
|
data_shred.header.parent = self.parent_slot;
|
||||||
data_shred
|
data_shred
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -410,6 +432,7 @@ impl Shredder {
|
||||||
fn new_first_shred(&self, parent: u64) -> FirstDataShred {
|
fn new_first_shred(&self, parent: u64) -> FirstDataShred {
|
||||||
let mut first_shred = FirstDataShred::default();
|
let mut first_shred = FirstDataShred::default();
|
||||||
first_shred.header.parent = parent;
|
first_shred.header.parent = parent;
|
||||||
|
first_shred.header.data_header.parent = parent;
|
||||||
first_shred.header.data_header.common_header.slot = self.slot;
|
first_shred.header.data_header.common_header.slot = self.slot;
|
||||||
first_shred.header.data_header.common_header.index = self.index;
|
first_shred.header.data_header.common_header.index = self.index;
|
||||||
first_shred
|
first_shred
|
||||||
|
|
|
@ -114,7 +114,7 @@ where
|
||||||
|
|
||||||
blocktree.insert_shreds(&shreds)?;
|
blocktree.insert_shreds(&shreds)?;
|
||||||
|
|
||||||
info!(
|
trace!(
|
||||||
"Elapsed processing time in recv_window(): {}",
|
"Elapsed processing time in recv_window(): {}",
|
||||||
duration_as_ms(&now.elapsed())
|
duration_as_ms(&now.elapsed())
|
||||||
);
|
);
|
||||||
|
|
|
@ -15,7 +15,7 @@ use solana_sdk::{client::SyncClient, poh_config::PohConfig, timing};
|
||||||
use std::{collections::HashSet, thread::sleep, time::Duration};
|
use std::{collections::HashSet, thread::sleep, time::Duration};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[ignore]
|
#[serial]
|
||||||
fn test_ledger_cleanup_service() {
|
fn test_ledger_cleanup_service() {
|
||||||
solana_logger::setup();
|
solana_logger::setup();
|
||||||
error!("test_ledger_cleanup_service");
|
error!("test_ledger_cleanup_service");
|
||||||
|
@ -69,7 +69,7 @@ fn test_spend_and_verify_all_nodes_1() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[ignore]
|
#[serial]
|
||||||
fn test_spend_and_verify_all_nodes_2() {
|
fn test_spend_and_verify_all_nodes_2() {
|
||||||
solana_logger::setup();
|
solana_logger::setup();
|
||||||
error!("test_spend_and_verify_all_nodes_2");
|
error!("test_spend_and_verify_all_nodes_2");
|
||||||
|
@ -84,7 +84,7 @@ fn test_spend_and_verify_all_nodes_2() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[ignore]
|
#[serial]
|
||||||
fn test_spend_and_verify_all_nodes_3() {
|
fn test_spend_and_verify_all_nodes_3() {
|
||||||
solana_logger::setup();
|
solana_logger::setup();
|
||||||
error!("test_spend_and_verify_all_nodes_3");
|
error!("test_spend_and_verify_all_nodes_3");
|
||||||
|
|
Loading…
Reference in New Issue