Blocktree+Erasure tests of basic erasure functionality (#3535)

* Remove WindowSlot; add Blocktree based tests to erasure
Mark E. Sinclair 2019-03-28 01:55:51 -05:00, committed by GitHub
parent c30eb6185c
commit 50b0a5ae83
3 changed files with 354 additions and 752 deletions
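An orientation note for the diff below: an erasure set pairs NUM_DATA data blobs with NUM_CODING coding blobs, and the coding blobs are stored under the last NUM_CODING indices of the set's data range (this is the set_index arithmetic that recurs in the tests). A minimal standalone sketch of that arithmetic, assuming the 16-data/4-coding geometry described by the doc comments in the erasure test module; the helper names are illustrative, not part of the commit:

    const NUM_DATA: usize = 16; // data blobs per erasure set
    const NUM_CODING: usize = 4; // coding blobs per erasure set

    /// First data index covered by erasure set `set_index` (illustrative helper).
    fn data_start(set_index: u64) -> u64 {
        set_index * NUM_DATA as u64
    }

    /// First coding index of erasure set `set_index`: coding blobs occupy the
    /// last NUM_CODING indices of the set's data range (illustrative helper).
    fn coding_start(set_index: u64) -> u64 {
        data_start(set_index) + (NUM_DATA - NUM_CODING) as u64
    }

    fn main() {
        // Set 3 covers data indices [48, 64); its coding blobs sit at [60, 64).
        assert_eq!(data_start(3), 48);
        assert_eq!(coding_start(3), 60);
    }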

View File: blocktree.rs

@@ -2121,6 +2121,165 @@ pub mod tests {
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_find_missing_data_indexes() {
let slot = 0;
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
// Write entries
let gap = 10;
assert!(gap > 3);
let num_entries = 10;
let mut blobs = make_tiny_test_entries(num_entries).to_single_entry_blobs();
for (i, b) in blobs.iter_mut().enumerate() {
b.set_index(i as u64 * gap);
b.set_slot(slot);
}
blocktree.write_blobs(&blobs).unwrap();
// Index of the first blob is 0
// Index of the second blob is "gap"
// Thus, the missing indexes should then be [1, gap - 1] for the input index
// range of [0, gap)
let expected: Vec<u64> = (1..gap).collect();
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap, gap as usize),
expected
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 1, gap, (gap - 1) as usize),
expected,
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap - 1, (gap - 1) as usize),
&expected[..expected.len() - 1],
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, gap - 2, gap, gap as usize),
vec![gap - 2, gap - 1],
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, gap - 2, gap, 1),
vec![gap - 2],
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap, 1),
vec![1],
);
// Test with end indexes that are greater than the last item in the ledger
let mut expected: Vec<u64> = (1..gap).collect();
expected.push(gap + 1);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap + 2) as usize),
expected,
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap - 1) as usize),
&expected[..expected.len() - 1],
);
for i in 0..num_entries as u64 {
for j in 0..i {
let expected: Vec<u64> = (j..i)
.flat_map(|k| {
let begin = k * gap + 1;
let end = (k + 1) * gap;
(begin..end)
})
.collect();
assert_eq!(
blocktree.find_missing_data_indexes(
slot,
j * gap,
i * gap,
((i - j) * gap) as usize
),
expected,
);
}
}
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
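The nested loop at the end of this test computes its expected values from the gap layout: blobs exist only at indexes k * gap, so every index strictly between two consecutive blobs is missing. A standalone sketch of that computation (illustrative, not part of the commit):

    fn expected_missing(gap: u64, j: u64, i: u64) -> Vec<u64> {
        // Blobs sit at k * gap only, so inside [j * gap, i * gap) the missing
        // indexes are each block's interior: (k * gap + 1)..((k + 1) * gap).
        (j..i).flat_map(|k| (k * gap + 1)..((k + 1) * gap)).collect()
    }

    fn main() {
        // For [0, 20) with blobs at 0 and 10: missing are 1..=9 and 11..=19.
        let missing = expected_missing(10, 0, 2);
        assert_eq!(missing.len(), 18);
        assert_eq!(missing[0], 1);
        assert!(!missing.contains(&10));
    }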
#[test]
fn test_find_missing_data_indexes_sanity() {
let slot = 0;
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
// Early exit conditions
let empty: Vec<u64> = vec![];
assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 0, 1), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 5, 5, 1), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 4, 3, 1), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 1, 2, 0), empty);
let mut blobs = make_tiny_test_entries(2).to_single_entry_blobs();
const ONE: u64 = 1;
const OTHER: u64 = 4;
blobs[0].set_index(ONE);
blobs[1].set_index(OTHER);
// Insert two blobs, one at index ONE and one at index OTHER
blocktree.write_blobs(&blobs).unwrap();
const STARTS: u64 = OTHER * 2;
const END: u64 = OTHER * 3;
const MAX: usize = 10;
// The blobs were inserted at indexes ONE and OTHER. For any start < STARTS,
// the missing indexes in the range [start, END) should therefore be every
// index except ONE and OTHER
for start in 0..STARTS {
let result = blocktree.find_missing_data_indexes(
slot, start, // start
END, // end
MAX, // max
);
let expected: Vec<u64> = (start..END).filter(|i| *i != ONE && *i != OTHER).collect();
assert_eq!(result, expected);
}
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_no_missing_blob_indexes() {
let slot = 0;
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
// Write entries
let num_entries = 10;
let shared_blobs = make_tiny_test_entries(num_entries).to_single_entry_shared_blobs();
crate::packet::index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, slot, 0);
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
blocktree.write_blobs(blobs).unwrap();
let empty: Vec<u64> = vec![];
for i in 0..num_entries as u64 {
for j in 0..i {
assert_eq!(
blocktree.find_missing_data_indexes(slot, j, i, (i - j) as usize),
empty
);
}
}
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
pub fn entries_to_blobs(
entries: &Vec<Entry>,
slot: u64,

View File: db_window.rs

@@ -1,7 +1,5 @@
//! Set of functions for emulating windowing functions from a database ledger implementation
use crate::blocktree::*;
#[cfg(feature = "erasure")]
use crate::erasure;
use crate::packet::{SharedBlob, BLOB_HEADER_SIZE};
use crate::result::Result;
use crate::streamer::BlobSender;
@@ -49,346 +47,18 @@ pub fn process_blob(blocktree: &Arc<Blocktree>, blob: &SharedBlob) -> Result<()>
blocktree.insert_data_blobs(vec![(*blob.read().unwrap()).borrow()])?;
}
#[cfg(feature = "erasure")]
{
// TODO: Support per-slot erasure. Issue: https://github.com/solana-labs/solana/issues/2441
if let Err(e) = try_erasure(blocktree, 0) {
trace!(
"erasure::recover failed to write recovered coding blobs. Err: {:?}",
e
);
}
}
Ok(())
}
#[cfg(feature = "erasure")]
fn try_erasure(blocktree: &Arc<Blocktree>, slot_index: u64) -> Result<()> {
let meta = blocktree.meta(slot_index)?;
if let Some(meta) = meta {
let (data, coding) = erasure::recover(blocktree, slot_index, meta.consumed)?;
for c in coding {
let c = c.read().unwrap();
blocktree.put_coding_blob_bytes(
0,
c.index(),
&c.data[..BLOB_HEADER_SIZE + c.size()],
)?;
}
blocktree.write_shared_blobs(data)
} else {
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::blocktree::get_tmp_ledger_path;
#[cfg(all(feature = "erasure", test))]
use crate::entry::reconstruct_entries_from_blobs;
use crate::entry::{make_tiny_test_entries, EntrySlice};
#[cfg(all(feature = "erasure", test))]
use crate::erasure::test::{generate_blocktree_from_window, setup_window_ledger};
#[cfg(all(feature = "erasure", test))]
use crate::erasure::{NUM_CODING, NUM_DATA};
use crate::packet::{index_blobs, Blob};
use crate::packet::index_blobs;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::Arc;
#[test]
pub fn test_find_missing_data_indexes_sanity() {
let slot = 0;
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
// Early exit conditions
let empty: Vec<u64> = vec![];
assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 0, 1), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 5, 5, 1), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 4, 3, 1), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 1, 2, 0), empty);
let mut blobs = make_tiny_test_entries(2).to_single_entry_blobs();
const ONE: u64 = 1;
const OTHER: u64 = 4;
blobs[0].set_index(ONE);
blobs[1].set_index(OTHER);
// Insert two blobs, one at index ONE and one at index OTHER
blocktree.write_blobs(&blobs).unwrap();
const STARTS: u64 = OTHER * 2;
const END: u64 = OTHER * 3;
const MAX: usize = 10;
// The blobs were inserted at indexes ONE and OTHER. For any start < STARTS,
// the missing indexes in the range [start, END) should therefore be every
// index except ONE and OTHER
for start in 0..STARTS {
let result = blocktree.find_missing_data_indexes(
slot, start, // start
END, // end
MAX, // max
);
let expected: Vec<u64> = (start..END).filter(|i| *i != ONE && *i != OTHER).collect();
assert_eq!(result, expected);
}
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_find_missing_data_indexes() {
let slot = 0;
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
// Write entries
let gap = 10;
assert!(gap > 3);
let num_entries = 10;
let mut blobs = make_tiny_test_entries(num_entries).to_single_entry_blobs();
for (i, b) in blobs.iter_mut().enumerate() {
b.set_index(i as u64 * gap);
b.set_slot(slot);
}
blocktree.write_blobs(&blobs).unwrap();
// Index of the first blob is 0
// Index of the second blob is "gap"
// Thus, the missing indexes should then be [1, gap - 1] for the input index
// range of [0, gap)
let expected: Vec<u64> = (1..gap).collect();
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap, gap as usize),
expected
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 1, gap, (gap - 1) as usize),
expected,
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap - 1, (gap - 1) as usize),
&expected[..expected.len() - 1],
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, gap - 2, gap, gap as usize),
vec![gap - 2, gap - 1],
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, gap - 2, gap, 1),
vec![gap - 2],
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap, 1),
vec![1],
);
// Test with end indexes that are greater than the last item in the ledger
let mut expected: Vec<u64> = (1..gap).collect();
expected.push(gap + 1);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap + 2) as usize),
expected,
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap - 1) as usize),
&expected[..expected.len() - 1],
);
for i in 0..num_entries as u64 {
for j in 0..i {
let expected: Vec<u64> = (j..i)
.flat_map(|k| {
let begin = k * gap + 1;
let end = (k + 1) * gap;
(begin..end)
})
.collect();
assert_eq!(
blocktree.find_missing_data_indexes(
slot,
j * gap,
i * gap,
((i - j) * gap) as usize
),
expected,
);
}
}
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_find_missing_data_indexes_slots() {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_entries_per_slot = 10;
let num_slots = 2;
let mut blobs =
make_tiny_test_entries(num_slots * num_entries_per_slot).to_single_entry_blobs();
// Insert every nth entry for each slot
let nth = 3;
for (i, b) in blobs.iter_mut().enumerate() {
b.set_index(((i % num_entries_per_slot) * nth) as u64);
b.set_slot((i / num_entries_per_slot) as u64);
}
blocktree.write_blobs(&blobs).unwrap();
let mut expected: Vec<u64> = (0..num_entries_per_slot)
.flat_map(|x| ((nth * x + 1) as u64..(nth * x + nth) as u64))
.collect();
// For each slot, find all missing indexes in the range [0, num_entries_per_slot * nth)
for slot in 0..num_slots {
assert_eq!(
blocktree.find_missing_data_indexes(
slot as u64,
0,
(num_entries_per_slot * nth) as u64,
num_entries_per_slot * nth as usize
),
expected,
);
}
// Test with a limit on the number of returned entries
for slot in 0..num_slots {
assert_eq!(
blocktree.find_missing_data_indexes(
slot as u64,
0,
(num_entries_per_slot * nth) as u64,
num_entries_per_slot * (nth - 1)
)[..],
expected[..num_entries_per_slot * (nth - 1)],
);
}
// Try to find entries in the range [num_entries_per_slot * nth, num_entries_per_slot * (nth + 1))
// that don't exist in the ledger.
let extra_entries =
(num_entries_per_slot * nth) as u64..(num_entries_per_slot * (nth + 1)) as u64;
expected.extend(extra_entries);
// For each slot, find all missing indexes in the range [0, num_entries_per_slot * (nth + 1))
for slot in 0..num_slots {
assert_eq!(
blocktree.find_missing_data_indexes(
slot as u64,
0,
(num_entries_per_slot * (nth + 1)) as u64,
num_entries_per_slot * (nth + 1),
),
expected,
);
}
}
#[test]
pub fn test_no_missing_blob_indexes() {
let slot = 0;
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
// Write entries
let num_entries = 10;
let shared_blobs = make_tiny_test_entries(num_entries).to_single_entry_shared_blobs();
index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, slot, 0);
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
blocktree.write_blobs(blobs).unwrap();
let empty: Vec<u64> = vec![];
for i in 0..num_entries as u64 {
for j in 0..i {
assert_eq!(
blocktree.find_missing_data_indexes(slot, j, i, (i - j) as usize),
empty
);
}
}
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[cfg(all(feature = "erasure", test))]
#[test]
pub fn test_try_erasure() {
// Setup the window
let offset = 0;
let num_blobs = NUM_DATA + 2;
let slot = 0;
let mut window = setup_window_ledger(offset, num_blobs, false, slot);
let end_index = (offset + num_blobs) % window.len();
// Test erasing a data block and a coding block
let coding_start = offset - (offset % NUM_DATA) + (NUM_DATA - NUM_CODING);
let erased_index = coding_start % window.len();
// Create a hole in the window
let erased_data = window[erased_index].data.clone();
let erased_coding = window[erased_index].coding.clone().unwrap();
window[erased_index].data = None;
window[erased_index].coding = None;
// Generate the blocktree from the window
let ledger_path = get_tmp_ledger_path!();
let blocktree = Arc::new(generate_blocktree_from_window(&ledger_path, &window, false));
try_erasure(&blocktree, 0).expect("Expected successful erasure attempt");
window[erased_index].data = erased_data;
{
let data_blobs: Vec<_> = window[erased_index..end_index]
.iter()
.map(|entry| entry.data.clone().unwrap())
.collect();
let locks: Vec<_> = data_blobs.iter().map(|blob| blob.read().unwrap()).collect();
let locked_data: Vec<&Blob> = locks.iter().map(|lock| &**lock).collect();
let (expected, _) = reconstruct_entries_from_blobs(locked_data).unwrap();
assert_eq!(
blocktree
.get_slot_entries(
0,
erased_index as u64,
Some((end_index - erased_index) as u64)
)
.unwrap(),
expected
);
}
let erased_coding_l = erased_coding.read().unwrap();
assert_eq!(
&blocktree
.get_coding_blob_bytes(slot, erased_index as u64)
.unwrap()
.unwrap()[BLOB_HEADER_SIZE..],
&erased_coding_l.data()[..erased_coding_l.size() as usize],
);
}
#[test]
fn test_process_blob() {
let blocktree_path = get_tmp_ledger_path!();

View File: erasure.rs

@@ -362,7 +362,7 @@ impl CodingGenerator {
// Recover the missing data and coding blobs from the input ledger. Returns a vector
// of the recovered missing data blobs and a vector of the recovered coding blobs
pub fn recover(
blocktree: &Arc<Blocktree>,
blocktree: &Blocktree,
slot: u64,
start_idx: u64,
) -> Result<(Vec<SharedBlob>, Vec<SharedBlob>)> {
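Note the signature change above: recover now borrows &Blocktree rather than &Arc<Blocktree>. Callers that own an Arc keep compiling through deref coercion, and callers without one can pass a plain reference. A reduced illustration with a stub type (not the real API):

    use std::sync::Arc;

    struct Blocktree;

    fn recover(_blocktree: &Blocktree) {
        // stub standing in for erasure::recover
    }

    fn main() {
        let shared = Arc::new(Blocktree);
        recover(&shared); // &Arc<Blocktree> coerces to &Blocktree
        let local = Blocktree;
        recover(&local); // plain borrows now work too
    }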
@@ -498,23 +498,31 @@ fn categorize_blob(
#[cfg(test)]
pub mod test {
#[derive(Default, Clone)]
pub struct WindowSlot {
pub data: Option<SharedBlob>,
pub coding: Option<SharedBlob>,
pub leader_unknown: bool,
}
use super::*;
use crate::blocktree::get_tmp_ledger_path;
use crate::blocktree::Blocktree;
use crate::entry::{make_tiny_test_entries, EntrySlice};
use crate::packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_SIZE};
use rand::{thread_rng, Rng};
use solana_sdk::pubkey::Pubkey;
use crate::packet::{index_blobs, SharedBlob};
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::Arc;
/// Specifies the contents of a 16-data-blob and 4-coding-blob erasure set
/// Exists to be passed to `generate_blocktree_with_coding`
#[derive(Debug, Copy, Clone)]
pub struct ErasureSpec {
/// Which 16-blob erasure set this represents
pub set_index: usize,
pub num_data: usize,
pub num_coding: usize,
}
/// Specifies the contents of a slot
/// Exists to be passed to `generate_blocktree_with_coding`
#[derive(Debug, Clone)]
pub struct SlotSpec {
pub slot: u64,
pub set_specs: Vec<ErasureSpec>,
}
#[test]
fn test_coding() {
@@ -634,278 +642,192 @@ pub mod test {
}
}
// TODO: Temporary function used in tests to generate a database ledger
// from the window (which is used to generate the erasure coding)
// until we also transition generate_coding() and BroadcastStage to use Blocktree
// Github issue: https://github.com/solana-labs/solana/issues/1899.
pub fn generate_blocktree_from_window(
ledger_path: &str,
window: &[WindowSlot],
use_random: bool,
) -> Blocktree {
let blocktree =
Blocktree::open(ledger_path).expect("Expected to be able to open database ledger");
for slot in window {
if let Some(ref data) = slot.data {
// If we're using gibberish blobs, skip validation checks and insert
// directly into the ledger
if use_random {
let data = data.read().unwrap();
blocktree
.put_data_blob_bytes(
data.slot(),
data.index(),
&data.data[..data.data_size() as usize],
)
.expect("Expected successful put into data column of ledger");
} else {
blocktree
.write_shared_blobs(vec![data].into_iter())
.unwrap();
}
}
if let Some(ref coding) = slot.coding {
let coding_lock = coding.read().unwrap();
let index = coding_lock.index();
let data_size = coding_lock.size();
blocktree
.put_coding_blob_bytes(
coding_lock.slot(),
index,
&coding_lock.data[..data_size as usize + BLOB_HEADER_SIZE],
)
.unwrap();
}
}
blocktree
}
#[test]
fn test_generate_blocktree_with_coding() {
let cases = vec![
(NUM_DATA, NUM_CODING, 7, 5),
(NUM_DATA - 6, NUM_CODING - 1, 5, 7),
];
for (num_data, num_coding, num_slots, num_sets_per_slot) in cases {
let ledger_path = get_tmp_ledger_path!();
let specs = (0..num_slots)
.map(|slot| {
let set_specs = (0..num_sets_per_slot)
.map(|set_index| ErasureSpec {
set_index,
num_data,
num_coding,
})
.collect();
SlotSpec { slot, set_specs }
})
.collect::<Vec<_>>();
let blocktree = generate_blocktree_with_coding(&ledger_path, &specs);
for spec in specs.iter() {
let slot = spec.slot;
for erasure_spec in spec.set_specs.iter() {
let set_index = erasure_spec.set_index as u64;
let start_index = set_index * NUM_DATA as u64;
for i in 0..erasure_spec.num_data as u64 {
let opt_bytes = blocktree
.get_data_blob_bytes(slot, start_index + i)
.unwrap();
assert!(opt_bytes.is_some());
}
for i in 0..erasure_spec.num_coding as u64 {
let coding_start_index = start_index as usize + (NUM_DATA - NUM_CODING);
let opt_bytes = blocktree
.get_coding_blob_bytes(slot, coding_start_index as u64 + i)
.unwrap();
assert!(opt_bytes.is_some());
}
}
}
drop(blocktree);
Blocktree::destroy(&ledger_path).expect("Expect successful blocktree destruction");
}
}
#[test]
fn test_blocktree_recover_basic() {
let ledger_path = get_tmp_ledger_path!();
// Missing 1 data blob
let spec = SlotSpec {
slot: 0,
set_specs: vec![ErasureSpec {
set_index: 0,
num_data: NUM_DATA - 1,
num_coding: 4,
}],
};
let blocktree = generate_blocktree_with_coding(&ledger_path, &[spec]);
let (recovered_data, recovered_coding) =
recover(&blocktree, 0, 0).expect("Expect successful recovery");
assert!(recovered_coding.is_empty());
assert!(recovered_data.len() == 1);
drop(blocktree);
Blocktree::destroy(&ledger_path).expect("Expect successful blocktree destruction");
}
#[test]
fn test_blocktree_recover_basic2() {
let ledger_path = get_tmp_ledger_path!();
// Missing 1 data blob in [0, 16)
// [16..32) complete
let spec1 = SlotSpec {
slot: 0,
set_specs: vec![
ErasureSpec {
set_index: 0,
num_data: NUM_DATA - 1,
num_coding: NUM_CODING,
},
ErasureSpec {
set_index: 1,
num_data: NUM_DATA,
num_coding: NUM_CODING,
},
],
};
// Missing 1 coding and 1 data blob
let spec2 = SlotSpec {
slot: 3,
set_specs: vec![ErasureSpec {
set_index: 3,
num_data: NUM_DATA - 1,
num_coding: NUM_CODING - 1,
}],
};
let blocktree = generate_blocktree_with_coding(&ledger_path, &[spec1, spec2]);
let (recovered_data, recovered_coding) =
recover(&blocktree, 0, 0).expect("Expect successful recovery");
assert!(recovered_coding.is_empty());
assert_eq!(recovered_data.len(), 1);
let (recovered_data, recovered_coding) =
recover(&blocktree, 0, NUM_DATA as u64).expect("Expect successful recovery");
assert!(recovered_coding.is_empty());
assert!(recovered_data.is_empty());
let (recovered_data, recovered_coding) =
recover(&blocktree, 3, 3 * NUM_DATA as u64).expect("Expect successful recovery");
assert_eq!(recovered_coding.len(), 1);
assert_eq!(recovered_data.len(), 1);
drop(blocktree);
Blocktree::destroy(&ledger_path).expect("Expect successful blocktree destruction");
}
/// Generates a ledger according to the given specs. Does not generate a valid ledger with
/// chaining etc.
pub fn generate_blocktree_with_coding(ledger_path: &str, specs: &[SlotSpec]) -> Blocktree {
let blocktree = Blocktree::open(ledger_path).unwrap();
for spec in specs {
let slot = spec.slot;
for erasure_spec in spec.set_specs.iter() {
let set_index = erasure_spec.set_index as usize;
let start_index = set_index * NUM_DATA;
let mut blobs = make_tiny_test_entries(NUM_DATA).to_single_entry_shared_blobs();
index_blobs(
&blobs,
&Keypair::new().pubkey(),
start_index as u64,
slot,
0,
);
let mut coding_generator = CodingGenerator::new();
let mut coding_blobs = coding_generator.next(&blobs).unwrap();
blobs.drain(erasure_spec.num_data..);
coding_blobs.drain(erasure_spec.num_coding..);
for shared_blob in blobs {
let blob = shared_blob.read().unwrap();
let size = blob.size() as usize + BLOB_HEADER_SIZE;
blocktree
.put_data_blob_bytes(blob.slot(), blob.index(), &blob.data[..size])
.unwrap();
}
for shared_blob in coding_blobs {
let blob = shared_blob.read().unwrap();
let size = blob.size() as usize + BLOB_HEADER_SIZE;
blocktree
.put_coding_blob_bytes(blob.slot(), blob.index(), &blob.data[..size])
.unwrap();
}
}
}
blocktree
}
fn generate_coding(
id: &Pubkey,
window: &mut [WindowSlot],
receive_index: u64,
num_blobs: usize,
transmit_index_coding: &mut u64,
) -> Result<()> {
// beginning of the coding blobs of the block that receive_index points into
let coding_index_start =
receive_index - (receive_index % NUM_DATA as u64) + (NUM_DATA - NUM_CODING) as u64;
let start_idx = receive_index as usize % window.len();
let mut block_start = start_idx - (start_idx % NUM_DATA);
loop {
let block_end = block_start + NUM_DATA;
if block_end > (start_idx + num_blobs) {
break;
}
info!(
"generate_coding {} start: {} end: {} start_idx: {} num_blobs: {}",
id, block_start, block_end, start_idx, num_blobs
);
let mut max_data_size = 0;
// find max_data_size, maybe bail if not all the data is here
for i in block_start..block_end {
let n = i % window.len();
trace!("{} window[{}] = {:?}", id, n, window[n].data);
if let Some(b) = &window[n].data {
max_data_size = cmp::max(b.read().unwrap().meta.size, max_data_size);
} else {
trace!("{} data block is null @ {}", id, n);
return Ok(());
}
}
// round up to the nearest jerasure alignment
max_data_size = align!(max_data_size, JERASURE_ALIGN);
let mut data_blobs = Vec::with_capacity(NUM_DATA);
for i in block_start..block_end {
let n = i % window.len();
if let Some(b) = &window[n].data {
// make sure extra bytes in each blob are zero-d out for generation of
// coding blobs
let mut b_wl = b.write().unwrap();
for i in b_wl.meta.size..max_data_size {
b_wl.data[i] = 0;
}
data_blobs.push(b);
}
}
// getting ready to do erasure coding, means that we're potentially
// going back in time, tell our caller we've inserted coding blocks
// starting at coding_index_start
*transmit_index_coding = cmp::min(*transmit_index_coding, coding_index_start);
let mut coding_blobs = Vec::with_capacity(NUM_CODING);
let coding_start = block_end - NUM_CODING;
for i in coding_start..block_end {
let n = i % window.len();
assert!(window[n].coding.is_none());
window[n].coding = Some(SharedBlob::default());
let coding = window[n].coding.clone().unwrap();
let mut coding_wl = coding.write().unwrap();
for i in 0..max_data_size {
coding_wl.data[i] = 0;
}
// copy index and forward flag from the data blob
if let Some(data) = &window[n].data {
let data_rl = data.read().unwrap();
let index = data_rl.index();
let slot = data_rl.slot();
let id = data_rl.id();
let should_forward = data_rl.should_forward();
trace!(
"{} copying index {} should_forward {:?} from data to coding",
id,
index,
should_forward
);
coding_wl.set_index(index);
coding_wl.set_slot(slot);
coding_wl.set_id(&id);
coding_wl.forward(should_forward);
}
coding_wl.set_size(max_data_size);
coding_wl.set_coding();
coding_blobs.push(coding.clone());
}
let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect();
let data_ptrs: Vec<_> = data_locks
.iter()
.enumerate()
.map(|(i, l)| {
trace!("{} i: {} data: {}", id, i, l.data[0]);
&l.data[..max_data_size]
})
.collect();
let mut coding_locks: Vec<_> =
coding_blobs.iter().map(|b| b.write().unwrap()).collect();
let mut coding_ptrs: Vec<_> = coding_locks
.iter_mut()
.enumerate()
.map(|(i, l)| {
trace!("{} i: {} coding: {}", id, i, l.data[0],);
&mut l.data_mut()[..max_data_size]
})
.collect();
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
debug!(
"{} start_idx: {} data: {}:{} coding: {}:{}",
id, start_idx, block_start, block_end, coding_start, block_end
);
block_start = block_end;
}
Ok(())
}
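generate_coding above zero-pads every data blob to a shared aligned size and hands equal-length slices to generate_coding_blocks, which fills the coding payloads. As a toy analogue of that calling convention, a single-parity XOR sketch; the real implementation is Reed-Solomon via jerasure, so this shows only the shape of the contract, not the actual algorithm:

    /// Toy stand-in for the parity step: one XOR parity slice over equal-length,
    /// zero-padded data slices. The real generate_coding_blocks emits NUM_CODING
    /// coding payloads, but the calling convention (equal-length slices in,
    /// parity out) has the same shape.
    fn xor_parity(data: &[&[u8]]) -> Vec<u8> {
        let len = data[0].len();
        let mut parity = vec![0u8; len];
        for d in data {
            assert_eq!(d.len(), len); // callers zero-pad to a common aligned size
            for (p, &b) in parity.iter_mut().zip(d.iter()) {
                *p ^= b;
            }
        }
        parity
    }

    fn main() {
        let blobs: Vec<Vec<u8>> = vec![vec![1, 2, 3, 0], vec![4, 5, 0, 0]];
        let refs: Vec<&[u8]> = blobs.iter().map(|b| b.as_slice()).collect();
        let parity = xor_parity(&refs);
        // With single-parity XOR, a lone missing slice is rebuilt from the rest.
        let rebuilt: Vec<u8> = parity.iter().zip(&blobs[1]).map(|(p, b)| p ^ b).collect();
        assert_eq!(rebuilt, blobs[0]);
    }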
pub fn setup_window_ledger(
offset: usize,
num_blobs: usize,
use_random_window: bool,
slot: u64,
) -> Vec<WindowSlot> {
// Generate a window
let mut window = {
if use_random_window {
generate_window(offset, num_blobs, slot)
} else {
generate_entry_window(offset, num_blobs)
}
};
for slot in &window {
if let Some(blob) = &slot.data {
let blob_r = blob.read().unwrap();
assert!(!blob_r.is_coding());
}
}
// Generate the coding blocks
let mut index = (NUM_DATA + 2) as u64;
assert!(generate_coding(
&Pubkey::default(),
&mut window,
offset as u64,
num_blobs,
&mut index
)
.is_ok());
assert_eq!(index, (NUM_DATA - NUM_CODING) as u64);
// put junk in the tails, simulates re-used blobs
scramble_window_tails(&mut window, num_blobs);
window
}
const WINDOW_SIZE: usize = 64;
fn generate_window(offset: usize, num_blobs: usize, slot: u64) -> Vec<WindowSlot> {
let mut window = vec![
WindowSlot {
data: None,
coding: None,
leader_unknown: false,
};
WINDOW_SIZE
];
let mut blobs = Vec::with_capacity(num_blobs);
for i in 0..num_blobs {
let b = SharedBlob::default();
let b_ = b.clone();
let mut w = b.write().unwrap();
// generate a random length: a multiple of 4 between 8 and 28, plus 1
let data_len = if i == 3 {
BLOB_DATA_SIZE
} else {
(thread_rng().gen_range(2, 8) * 4) + 1
};
w.set_size(data_len);
for k in 0..data_len {
w.data_mut()[k] = (k + i) as u8;
}
// overfill, simulates re-used blobs
for i in BLOB_HEADER_SIZE + data_len..BLOB_SIZE {
w.data[i] = thread_rng().gen();
}
blobs.push(b_);
}
// Make some dummy slots
index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, slot, 0);
for b in blobs {
let idx = b.read().unwrap().index() as usize % WINDOW_SIZE;
window[idx].data = Some(b);
}
window
}
fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
let blobs = make_tiny_test_entries(num_blobs).to_single_entry_shared_blobs();
@@ -913,153 +835,4 @@ pub mod test {
blobs
}
fn generate_entry_window(offset: usize, num_blobs: usize) -> Vec<WindowSlot> {
let mut window = vec![
WindowSlot {
data: None,
coding: None,
leader_unknown: false,
};
WINDOW_SIZE
];
let blobs = generate_test_blobs(offset, num_blobs);
for b in blobs.into_iter() {
let idx = b.read().unwrap().index() as usize % WINDOW_SIZE;
window[idx].data = Some(b);
}
window
}
fn scramble_window_tails(window: &mut [WindowSlot], num_blobs: usize) {
for i in 0..num_blobs {
if let Some(b) = &window[i].data {
let size = {
let b_l = b.read().unwrap();
b_l.meta.size
} as usize;
let mut b_l = b.write().unwrap();
for i in size..BLOB_SIZE {
b_l.data[i] = thread_rng().gen();
}
}
}
}
// Remove a data block, test for successful recovery
#[test]
pub fn test_window_recover_basic() {
solana_logger::setup();
// Setup the window
let offset = 0;
let num_blobs = NUM_DATA + 2;
let mut window = setup_window_ledger(offset, num_blobs, true, 0);
// Test erasing a data block
let erase_offset = offset % window.len();
// Create a hole in the window
let refwindow = window[erase_offset].data.clone();
window[erase_offset].data = None;
// Generate the blocktree from the window
let ledger_path = get_tmp_ledger_path!();
let blocktree = Arc::new(generate_blocktree_from_window(&ledger_path, &window, true));
// Recover it from coding
let (recovered_data, recovered_coding) = recover(&blocktree, 0, offset as u64)
.expect("Expected successful recovery of erased blobs");
assert!(recovered_coding.is_empty());
{
// Check the result, block is here to drop locks
let recovered_blob = recovered_data
.first()
.expect("Expected recovered data blob to exist");
let ref_l = refwindow.clone().unwrap();
let ref_l2 = ref_l.read().unwrap();
let result = recovered_blob.read().unwrap();
assert_eq!(result.size(), ref_l2.size());
assert_eq!(
result.data[..ref_l2.data_size() as usize],
ref_l2.data[..ref_l2.data_size() as usize]
);
assert_eq!(result.index(), offset as u64);
assert_eq!(result.slot(), 0 as u64);
}
drop(blocktree);
Blocktree::destroy(&ledger_path)
.expect("Expected successful destruction of database ledger");
}
// Remove a data and coding block, test for successful recovery
#[test]
pub fn test_window_recover_basic2() {
solana_logger::setup();
// Setup the window
let offset = 0;
let num_blobs = NUM_DATA + 2;
let mut window = setup_window_ledger(offset, num_blobs, true, 0);
// Tests erasing a coding block and a data block
let coding_start = offset - (offset % NUM_DATA) + (NUM_DATA - NUM_CODING);
let erase_offset = coding_start % window.len();
// Create a hole in the window
let refwindowdata = window[erase_offset].data.clone();
let refwindowcoding = window[erase_offset].coding.clone();
window[erase_offset].data = None;
window[erase_offset].coding = None;
let ledger_path = get_tmp_ledger_path!();
let blocktree = Arc::new(generate_blocktree_from_window(&ledger_path, &window, true));
// Recover it from coding
let (recovered_data, recovered_coding) = recover(&blocktree, 0, offset as u64)
.expect("Expected successful recovery of erased blobs");
{
let recovered_data_blob = recovered_data
.first()
.expect("Expected recovered data blob to exist");
let recovered_coding_blob = recovered_coding
.first()
.expect("Expected recovered coding blob to exist");
// Check the recovered data result
let ref_l = refwindowdata.clone().unwrap();
let ref_l2 = ref_l.read().unwrap();
let result = recovered_data_blob.read().unwrap();
assert_eq!(result.size(), ref_l2.size());
assert_eq!(
result.data[..ref_l2.data_size() as usize],
ref_l2.data[..ref_l2.data_size() as usize]
);
assert_eq!(result.index(), coding_start as u64);
assert_eq!(result.slot(), 0 as u64);
// Check the recovered coding result
let ref_l = refwindowcoding.clone().unwrap();
let ref_l2 = ref_l.read().unwrap();
let result = recovered_coding_blob.read().unwrap();
assert_eq!(result.size(), ref_l2.size());
assert_eq!(
result.data()[..ref_l2.size() as usize],
ref_l2.data()[..ref_l2.size() as usize]
);
assert_eq!(result.index(), coding_start as u64);
assert_eq!(result.slot(), 0 as u64);
}
drop(blocktree);
Blocktree::destroy(&ledger_path)
.expect("Expected successful destruction of database ledger");
}
}