Dynamic erasure (#4653)

Remove erasure-related constants

Remove unneeded `Iterator::collect` call

Document erasure module

Randomize coding blobs used for repair
This commit is contained in:
Mark E. Sinclair 2019-06-20 20:27:41 -05:00 committed by GitHub
parent 4069ef2e02
commit ada4d16c4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1132 additions and 1078 deletions

File diff suppressed because it is too large Load Diff

View File

@ -39,6 +39,10 @@ pub mod columns {
#[derive(Debug)]
/// The root column
pub struct Root;
#[derive(Debug)]
/// The index column
pub struct Index;
}
pub trait Backend: Sized + Send + Sync {

View File

@ -71,14 +71,21 @@ impl Backend for Kvs {
impl Column<Kvs> for cf::Coding {
const NAME: &'static str = super::ERASURE_CF;
type Index = (u64, u64);
type Index = (u64, u64, u64);
fn key(index: (u64, u64)) -> Key {
cf::Data::key(index)
fn key((slot, set_index, index): (u64, u64, u64)) -> Vec<u8> {
let mut key = Key::default();
BigEndian::write_u64(&mut key.0[..8], slot);
BigEndian::write_u64(&mut key.0[8..16], set_index);
BigEndian::write_u64(&mut key.0[16..], index);
key
}
fn index(key: &Key) -> (u64, u64) {
cf::Data::index(key)
fn index(key: &Key) -> (u64, u64, u64) {
let slot = BigEndian::read_u64(&key.0[..8]);
let set_index = BigEndian::read_u64(&key.0[8..16]);
let index = BigEndian::read_u64(&key.0[16..]);
(slot, set_index, index)
}
}
@ -172,8 +179,12 @@ impl Column<Kvs> for cf::SlotMeta {
}
}
impl Column<Kvs> for cf::SlotMeta {
const NAME: &'static str = super::META_CF;
impl TypedColumn<Kvs> for cf::SlotMeta {
type Type = super::SlotMeta;
}
impl Column<Kvs> for cf::Index {
const NAME: &'static str = super::INDEX_CF;
type Index = u64;
fn key(slot: u64) -> Key {
@ -187,8 +198,8 @@ impl Column<Kvs> for cf::SlotMeta {
}
}
impl TypedColumn<Kvs> for cf::SlotMeta {
type Type = super::SlotMeta;
impl TypedColumn<Kvs> for cf::Index {
type Type = crate::blocktree::meta::Index;
}
impl Column<Kvs> for cf::ErasureMeta {

View File

@ -1,6 +1,6 @@
use crate::erasure::{NUM_CODING, NUM_DATA};
use crate::erasure::CodingHeader;
use solana_metrics::datapoint;
use std::borrow::Borrow;
use std::{collections::BTreeMap, ops::RangeBounds};
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
// The Meta column family
@ -27,6 +27,43 @@ pub struct SlotMeta {
pub is_connected: bool,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
/// Index recording presence/absence of blobs
pub struct Index {
pub slot: u64,
data: DataIndex,
coding: CodingIndex,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct DataIndex {
slot: u64,
/// Map representing presence/absence of data blobs
index: BTreeMap<u64, bool>,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
/// Erasure coding information
pub struct CodingIndex {
slot: u64,
/// Map from set index, to hashmap from blob index to presence bool
index: BTreeMap<u64, BTreeMap<u64, bool>>,
}
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, PartialEq)]
/// Erasure coding information
pub struct ErasureMeta {
header: CodingHeader,
set_index: u64,
}
#[derive(Debug, PartialEq)]
pub enum ErasureMetaStatus {
CanRecover,
DataFull,
StillNeed(usize),
}
impl SlotMeta {
pub fn is_full(&self) -> bool {
// last_index is std::u64::MAX when it has no information about how
@ -72,274 +109,174 @@ impl SlotMeta {
}
}
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
/// Erasure coding information
pub struct ErasureMeta {
/// Which erasure set in the slot this is
pub set_index: u64,
/// Size of shards in this erasure set
pub size: usize,
/// Bitfield representing presence/absence of data blobs
data: u64,
/// Bitfield representing presence/absence of coding blobs
coding: u64,
impl Index {
pub(in crate::blocktree) fn new(slot: u64) -> Self {
Index {
slot,
data: DataIndex::default(),
coding: CodingIndex::default(),
}
}
pub fn data(&self) -> &DataIndex {
&self.data
}
pub fn coding(&self) -> &CodingIndex {
&self.coding
}
pub fn data_mut(&mut self) -> &mut DataIndex {
&mut self.data
}
pub fn coding_mut(&mut self) -> &mut CodingIndex {
&mut self.coding
}
}
#[derive(Debug, PartialEq)]
pub enum ErasureMetaStatus {
CanRecover,
DataFull,
StillNeed(usize),
impl CodingIndex {
pub fn is_set_present(&self, set_index: u64) -> bool {
self.index.contains_key(&set_index)
}
pub fn present_in_set(&self, set_index: u64) -> usize {
match self.index.get(&set_index) {
Some(map) => map.values().filter(|presence| **presence).count(),
None => 0,
}
}
pub fn is_present(&self, set_index: u64, blob_index: u64) -> bool {
match self.index.get(&set_index) {
Some(map) => *map.get(&blob_index).unwrap_or(&false),
None => false,
}
}
pub fn set_present(&mut self, set_index: u64, blob_index: u64, present: bool) {
let set_map = self
.index
.entry(set_index)
.or_insert_with(BTreeMap::default);
set_map.insert(blob_index, present);
}
}
impl DataIndex {
pub fn present_in_bounds(&self, bounds: impl RangeBounds<u64>) -> usize {
self.index
.range(bounds)
.filter(|(_, presence)| **presence)
.count()
}
pub fn is_present(&self, index: u64) -> bool {
*self.index.get(&index).unwrap_or(&false)
}
pub fn set_present(&mut self, index: u64, presence: bool) {
self.index.insert(index, presence);
}
}
impl ErasureMeta {
pub fn new(set_index: u64) -> ErasureMeta {
pub(in crate::blocktree) fn new(set_index: u64) -> ErasureMeta {
ErasureMeta {
header: CodingHeader::default(),
set_index,
size: 0,
data: 0,
coding: 0,
}
}
pub fn status(&self) -> ErasureMetaStatus {
let (data_missing, coding_missing) =
(NUM_DATA - self.num_data(), NUM_CODING - self.num_coding());
if data_missing > 0 && data_missing + coding_missing <= NUM_CODING {
assert!(self.size != 0);
ErasureMetaStatus::CanRecover
pub fn session_info(&self) -> CodingHeader {
self.header
}
pub fn set_session_info(&mut self, header: CodingHeader) {
self.header = header;
}
pub fn status(&self, index: &Index) -> ErasureMetaStatus {
use ErasureMetaStatus::*;
let start_idx = self.header.start_index;
let end_idx = start_idx + self.header.data_count as u64;
let num_coding = index.coding().present_in_set(self.header.set_index);
let num_data = index.data().present_in_bounds(start_idx..end_idx);
assert!(self.header.shard_size != 0);
let (data_missing, coding_missing) = (
self.header.data_count - num_data,
self.header.parity_count - num_coding,
);
let total_missing = data_missing + coding_missing;
if data_missing > 0 && total_missing <= self.header.parity_count {
CanRecover
} else if data_missing == 0 {
ErasureMetaStatus::DataFull
DataFull
} else {
ErasureMetaStatus::StillNeed(data_missing + coding_missing - NUM_CODING)
StillNeed(total_missing - self.header.parity_count)
}
}
pub fn num_coding(&self) -> usize {
self.coding.count_ones() as usize
}
pub fn num_data(&self) -> usize {
self.data.count_ones() as usize
}
pub fn is_coding_present(&self, index: u64) -> bool {
if let Some(position) = self.data_index_in_set(index) {
self.coding & (1 << position) != 0
} else {
false
}
}
pub fn set_size(&mut self, size: usize) {
self.size = size;
}
pub fn size(&self) -> usize {
self.size
}
pub fn set_coding_present(&mut self, index: u64, present: bool) {
if let Some(position) = self.data_index_in_set(index) {
if present {
self.coding |= 1 << position;
} else {
self.coding &= !(1 << position);
}
}
}
pub fn is_data_present(&self, index: u64) -> bool {
if let Some(position) = self.data_index_in_set(index) {
self.data & (1 << position) != 0
} else {
false
}
}
pub fn set_data_present(&mut self, index: u64, present: bool) {
if let Some(position) = self.data_index_in_set(index) {
if present {
self.data |= 1 << position;
} else {
self.data &= !(1 << position);
}
}
}
pub fn set_data_multi<I, Idx>(&mut self, indexes: I, present: bool)
where
I: IntoIterator<Item = Idx>,
Idx: Borrow<u64>,
{
for index in indexes.into_iter() {
self.set_data_present(*index.borrow(), present);
}
}
pub fn set_coding_multi<I, Idx>(&mut self, indexes: I, present: bool)
where
I: IntoIterator<Item = Idx>,
Idx: Borrow<u64>,
{
for index in indexes.into_iter() {
self.set_coding_present(*index.borrow(), present);
}
}
pub fn set_index_for(index: u64) -> u64 {
index / NUM_DATA as u64
}
pub fn data_index_in_set(&self, index: u64) -> Option<u64> {
let set_index = Self::set_index_for(index);
if set_index == self.set_index {
Some(index - self.start_index())
} else {
None
}
}
pub fn coding_index_in_set(&self, index: u64) -> Option<u64> {
self.data_index_in_set(index).map(|i| i + NUM_DATA as u64)
}
pub fn start_index(&self) -> u64 {
self.set_index * NUM_DATA as u64
}
/// returns a tuple of (data_end, coding_end)
pub fn end_indexes(&self) -> (u64, u64) {
let start = self.start_index();
(start + NUM_DATA as u64, start + NUM_CODING as u64)
}
}
#[test]
fn test_meta_indexes() {
use rand::{thread_rng, Rng};
// to avoid casts everywhere
const NUM_DATA: u64 = crate::erasure::NUM_DATA as u64;
#[cfg(test)]
mod test {
use super::*;
let mut rng = thread_rng();
const NUM_DATA: u64 = 7;
const NUM_CODING: u64 = 8;
for _ in 0..100 {
let set_index = rng.gen_range(0, 1_000);
let blob_index = (set_index * NUM_DATA) + rng.gen_range(0, NUM_DATA);
assert_eq!(set_index, ErasureMeta::set_index_for(blob_index));
let e_meta = ErasureMeta::new(set_index);
assert_eq!(e_meta.start_index(), set_index * NUM_DATA);
let (data_end_idx, coding_end_idx) = e_meta.end_indexes();
assert_eq!(data_end_idx, (set_index + 1) * NUM_DATA);
assert_eq!(coding_end_idx, set_index * NUM_DATA + NUM_CODING as u64);
fn sample_header() -> CodingHeader {
CodingHeader {
shard_size: 1,
data_count: NUM_DATA as usize,
parity_count: NUM_CODING as usize,
..CodingHeader::default()
}
}
let mut e_meta = ErasureMeta::new(0);
#[test]
fn test_erasure_meta_status() {
let set_index = 0;
assert_eq!(e_meta.data_index_in_set(0), Some(0));
assert_eq!(e_meta.data_index_in_set(NUM_DATA / 2), Some(NUM_DATA / 2));
assert_eq!(e_meta.data_index_in_set(NUM_DATA - 1), Some(NUM_DATA - 1));
assert_eq!(e_meta.data_index_in_set(NUM_DATA), None);
assert_eq!(e_meta.data_index_in_set(std::u64::MAX), None);
let header = sample_header();
let mut e_meta = ErasureMeta::new(set_index);
e_meta.set_session_info(header);
e_meta.set_index = 1;
let mut index = Index::new(0);
assert_eq!(e_meta.data_index_in_set(0), None);
assert_eq!(e_meta.data_index_in_set(NUM_DATA - 1), None);
assert_eq!(e_meta.data_index_in_set(NUM_DATA), Some(0));
assert_eq!(
e_meta.data_index_in_set(NUM_DATA * 2 - 1),
Some(NUM_DATA - 1)
);
assert_eq!(e_meta.data_index_in_set(std::u64::MAX), None);
}
assert_eq!(e_meta.status(&index), ErasureMetaStatus::StillNeed(7));
#[test]
fn test_meta_coding_present() {
let mut e_meta = ErasureMeta::default();
for i in 0..NUM_DATA {
index.data_mut().set_present(i, true);
}
e_meta.set_coding_multi(0..NUM_CODING as u64, true);
for i in 0..NUM_CODING as u64 {
assert_eq!(e_meta.is_coding_present(i), true);
}
for i in NUM_CODING as u64..NUM_DATA as u64 {
assert_eq!(e_meta.is_coding_present(i), false);
}
assert_eq!(e_meta.status(&index), ErasureMetaStatus::DataFull);
e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 17) as u64);
let start_idx = e_meta.start_index();
e_meta.set_coding_multi(start_idx..start_idx + NUM_CODING as u64, true);
index.data_mut().set_present(NUM_DATA - 1, false);
for i in start_idx..start_idx + NUM_CODING as u64 {
e_meta.set_coding_present(i, true);
assert_eq!(e_meta.is_coding_present(i), true);
}
for i in start_idx + NUM_CODING as u64..start_idx + NUM_DATA as u64 {
assert_eq!(e_meta.is_coding_present(i), false);
}
}
#[test]
fn test_erasure_meta_status() {
use rand::{seq::SliceRandom, thread_rng};
// Local constansts just used to avoid repetitive casts
const N_DATA: u64 = crate::erasure::NUM_DATA as u64;
const N_CODING: u64 = crate::erasure::NUM_CODING as u64;
let mut e_meta = ErasureMeta::default();
let mut rng = thread_rng();
let data_indexes: Vec<u64> = (0..N_DATA).collect();
let coding_indexes: Vec<u64> = (0..N_CODING).collect();
assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(NUM_DATA));
e_meta.set_data_multi(0..N_DATA, true);
assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull);
e_meta.size = 1;
e_meta.set_coding_multi(0..N_CODING, true);
assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull);
for &idx in data_indexes.choose_multiple(&mut rng, NUM_CODING) {
e_meta.set_data_present(idx, false);
assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover);
}
e_meta.set_data_multi(0..N_DATA, true);
for &idx in coding_indexes.choose_multiple(&mut rng, NUM_CODING) {
e_meta.set_coding_present(idx, false);
assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull);
}
}
#[test]
fn test_meta_data_present() {
let mut e_meta = ErasureMeta::default();
e_meta.set_data_multi(0..NUM_DATA as u64, true);
for i in 0..NUM_DATA as u64 {
assert_eq!(e_meta.is_data_present(i), true);
}
for i in NUM_DATA as u64..2 * NUM_DATA as u64 {
assert_eq!(e_meta.is_data_present(i), false);
}
e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 23) as u64);
let start_idx = e_meta.start_index();
e_meta.set_data_multi(start_idx..start_idx + NUM_DATA as u64, true);
for i in start_idx..start_idx + NUM_DATA as u64 {
assert_eq!(e_meta.is_data_present(i), true);
}
for i in start_idx - NUM_DATA as u64..start_idx {
assert_eq!(e_meta.is_data_present(i), false);
assert_eq!(e_meta.status(&index), ErasureMetaStatus::StillNeed(1));
for i in 0..NUM_DATA - 2 {
index.data_mut().set_present(i, false);
}
assert_eq!(e_meta.status(&index), ErasureMetaStatus::StillNeed(6));
for i in 0..NUM_CODING {
index.coding_mut().set_present(set_index, i, true);
}
index.data_mut().set_present(NUM_DATA - 1, false);
for i in 0..NUM_DATA - 1 {
index.data_mut().set_present(i, true);
assert_eq!(e_meta.status(&index), ErasureMetaStatus::CanRecover);
}
}
}

View File

@ -30,7 +30,9 @@ impl Backend for Rocks {
type Error = rocksdb::Error;
fn open(path: &Path) -> Result<Rocks> {
use crate::blocktree::db::columns::{Coding, Data, DeadSlots, ErasureMeta, Orphans, Root, SlotMeta};
use crate::blocktree::db::columns::{
Coding, Data, DeadSlots, ErasureMeta, Index, Orphans, Root, SlotMeta,
};
fs::create_dir_all(&path)?;
@ -40,12 +42,14 @@ impl Backend for Rocks {
// Column family names
let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options());
let dead_slots_cf_descriptor = ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options());
let dead_slots_cf_descriptor =
ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options());
let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options());
let erasure_meta_cf_descriptor =
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options());
let root_cf_descriptor = ColumnFamilyDescriptor::new(Root::NAME, get_cf_options());
let index_desc = ColumnFamilyDescriptor::new(Index::NAME, get_cf_options());
let cfs = vec![
meta_cf_descriptor,
@ -55,6 +59,7 @@ impl Backend for Rocks {
erasure_meta_cf_descriptor,
orphans_cf_descriptor,
root_cf_descriptor,
index_desc,
];
// Open the database
@ -64,12 +69,15 @@ impl Backend for Rocks {
}
fn columns(&self) -> Vec<&'static str> {
use crate::blocktree::db::columns::{Coding, Data, DeadSlots, ErasureMeta, Orphans, Root, SlotMeta};
use crate::blocktree::db::columns::{
Coding, Data, DeadSlots, ErasureMeta, Index, Orphans, Root, SlotMeta,
};
vec![
Coding::NAME,
ErasureMeta::NAME,
DeadSlots::NAME,
Index::NAME,
Data::NAME,
Orphans::NAME,
Root::NAME,
@ -135,14 +143,21 @@ impl Backend for Rocks {
impl Column<Rocks> for cf::Coding {
const NAME: &'static str = super::ERASURE_CF;
type Index = (u64, u64);
type Index = (u64, u64, u64);
fn key(index: (u64, u64)) -> Vec<u8> {
cf::Data::key(index)
fn key((slot, set_index, index): (u64, u64, u64)) -> Vec<u8> {
let mut key = vec![0; 24];
BigEndian::write_u64(&mut key[..8], slot);
BigEndian::write_u64(&mut key[8..16], set_index);
BigEndian::write_u64(&mut key[16..], index);
key
}
fn index(key: &[u8]) -> (u64, u64) {
cf::Data::index(key)
fn index(key: &[u8]) -> (u64, u64, u64) {
let slot = BigEndian::read_u64(&key[..8]);
let set_index = BigEndian::read_u64(&key[8..16]);
let index = BigEndian::read_u64(&key[16..]);
(slot, set_index, index)
}
}
@ -263,6 +278,25 @@ impl TypedColumn<Rocks> for cf::ErasureMeta {
type Type = super::ErasureMeta;
}
impl Column<Rocks> for cf::Index {
const NAME: &'static str = super::INDEX_CF;
type Index = u64;
fn key(slot: u64) -> Vec<u8> {
let mut key = vec![0; 8];
BigEndian::write_u64(&mut key[..], slot);
key
}
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
}
impl TypedColumn<Rocks> for cf::Index {
type Type = crate::blocktree::meta::Index;
}
impl DbCursor<Rocks> for DBRawIterator {
fn valid(&self) -> bool {
DBRawIterator::valid(self)

View File

@ -3,7 +3,6 @@ use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastR
use self::standard_broadcast_run::StandardBroadcastRun;
use crate::blocktree::Blocktree;
use crate::cluster_info::{ClusterInfo, ClusterInfoError};
use crate::erasure::CodingGenerator;
use crate::poh_recorder::WorkingBankEntries;
use crate::result::{Error, Result};
use crate::service::Service;
@ -80,7 +79,6 @@ trait BroadcastRun {
}
struct Broadcast {
coding_generator: CodingGenerator,
thread_pool: ThreadPool,
}
@ -115,10 +113,7 @@ impl BroadcastStage {
blocktree: &Arc<Blocktree>,
mut broadcast_stage_run: impl BroadcastRun,
) -> BroadcastStageReturnType {
let coding_generator = CodingGenerator::default();
let mut broadcast = Broadcast {
coding_generator,
thread_pool: rayon::ThreadPoolBuilder::new()
.num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
.build()

View File

@ -1,6 +1,6 @@
use crate::entry::Entry;
use crate::entry::EntrySlice;
use crate::erasure::CodingGenerator;
use crate::erasure;
use crate::packet::{self, SharedBlob};
use crate::poh_recorder::WorkingBankEntries;
use crate::result::Result;
@ -81,7 +81,7 @@ pub(super) fn entries_to_blobs(
last_tick: u64,
bank: &Bank,
keypair: &Keypair,
coding_generator: &mut CodingGenerator,
set_index: &mut u64,
) -> (Vec<SharedBlob>, Vec<SharedBlob>) {
let blobs = generate_data_blobs(
ventries,
@ -92,7 +92,18 @@ pub(super) fn entries_to_blobs(
&keypair,
);
let coding = generate_coding_blobs(&blobs, &thread_pool, coding_generator, &keypair);
let start_index = blobs[0].read().unwrap().index();
let coding = generate_coding_blobs(
&blobs,
&thread_pool,
bank.slot(),
*set_index,
start_index,
&keypair,
);
*set_index += 1;
(blobs, coding)
}
@ -141,10 +152,15 @@ pub(super) fn generate_data_blobs(
pub(super) fn generate_coding_blobs(
blobs: &[SharedBlob],
thread_pool: &ThreadPool,
coding_generator: &mut CodingGenerator,
slot: u64,
set_index: u64,
start_index: u64,
keypair: &Keypair,
) -> Vec<SharedBlob> {
let coding = coding_generator.next(&blobs);
let set_len = blobs.len();
let coding = erasure::encode_shared(slot, set_index, start_index, blobs, set_len)
.expect("Erasure coding failed");
thread_pool.install(|| {
coding.par_iter().for_each(|c| {

View File

@ -1,11 +1,13 @@
use super::*;
use solana_sdk::hash::Hash;
pub(super) struct FailEntryVerificationBroadcastRun {}
pub(super) struct FailEntryVerificationBroadcastRun {
set_index: u64,
}
impl FailEntryVerificationBroadcastRun {
pub(super) fn new() -> Self {
Self {}
Self { set_index: 0 }
}
}
@ -49,7 +51,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
last_tick,
&bank,
&keypair,
&mut broadcast.coding_generator,
&mut self.set_index,
);
blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?;

View File

@ -10,12 +10,14 @@ struct BroadcastStats {
pub(super) struct StandardBroadcastRun {
stats: BroadcastStats,
set_index: u64,
}
impl StandardBroadcastRun {
pub(super) fn new() -> Self {
Self {
stats: BroadcastStats::default(),
set_index: 0,
}
}
@ -79,7 +81,7 @@ impl BroadcastRun for StandardBroadcastRun {
last_tick,
&bank,
&keypair,
&mut broadcast.coding_generator,
&mut self.set_index,
);
blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?;

View File

@ -133,7 +133,7 @@ mod tests {
hasher.hash(&buf[..size]);
// golden needs to be updated if blob stuff changes....
let golden: Hash = "E2HZjSC6VgH4nmEiTbMDATTeBcFjwSYz7QYvU7doGNhD"
let golden: Hash = "AMKCEbK6txetPPEQ8JDVdrqpefgtGRzN6ng2gSmKy6Fv"
.parse()
.unwrap();

View File

@ -4,8 +4,7 @@ use crate::crds_value::EpochSlots;
use crate::result::Result;
use crate::service::Service;
use byteorder::{ByteOrder, LittleEndian};
use rand::seq::SliceRandom;
use rand::SeedableRng;
use rand::{seq::SliceRandom, Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use solana_metrics::datapoint;
use solana_runtime::epoch_schedule::EpochSchedule;
@ -272,6 +271,8 @@ impl ClusterInfoRepairListener {
let mut total_data_blobs_sent = 0;
let mut total_coding_blobs_sent = 0;
let mut num_slots_repaired = 0;
let mut rng = rand::thread_rng();
let max_confirmed_repairee_epoch =
epoch_schedule.get_stakers_epoch(repairee_epoch_slots.root);
let max_confirmed_repairee_slot =
@ -305,20 +306,38 @@ impl ClusterInfoRepairListener {
// a database iterator over the slots because by the time this node is
// sending the blobs in this slot for repair, we expect these slots
// to be full.
if let Some(blob_data) = blocktree
.get_data_blob_bytes(slot, blob_index as u64)
if let Some(data_blob) = blocktree
.get_data_blob(slot, blob_index as u64)
.expect("Failed to read data blob from blocktree")
{
socket.send_to(&blob_data[..], repairee_tvu)?;
socket.send_to(&data_blob.data[..], repairee_tvu)?;
total_data_blobs_sent += 1;
}
if let Some(coding_bytes) = blocktree
.get_coding_blob_bytes(slot, blob_index as u64)
.expect("Failed to read coding blob from blocktree")
{
socket.send_to(&coding_bytes[..], repairee_tvu)?;
total_coding_blobs_sent += 1;
if let Some(coding_header) = data_blob.get_coding_header() {
let ratio = std::cmp::max(
1,
coding_header.parity_count / coding_header.data_count,
);
for _ in 0..ratio {
let chosen_index =
rng.gen_range(0, coding_header.parity_count as u64);
let coding_blob_opt = blocktree
.get_coding_blob(
slot,
coding_header.set_index,
chosen_index,
)
.expect("Failed to read coding blob from blocktree");
if let Some(coding_blob) = coding_blob_opt {
socket.send_to(&coding_blob.data[..], repairee_tvu)?;
total_coding_blobs_sent += 1;
}
}
}
}
}

View File

@ -41,237 +41,261 @@
//!
//!
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
use std::cmp;
use std::convert::AsMut;
use std::sync::{Arc, RwLock};
use std::{
borrow::BorrowMut,
sync::{Arc, RwLock},
};
use reed_solomon_erasure::ReedSolomon;
//TODO(sakridge) pick these values
/// Number of data blobs
pub const NUM_DATA: usize = 8;
/// Number of coding blobs; also the maximum number that can go missing.
pub const NUM_CODING: usize = 8;
/// Total number of blobs in an erasure set; includes data and coding blobs
pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING;
/// Max number of data blobs in an erasure set; Also max number of parity blobs.
pub const MAX_SET_SIZE: usize = 255;
type Result<T> = std::result::Result<T, reed_solomon_erasure::Error>;
/// Represents an erasure "session" with a particular configuration and number of data and coding
/// blobs
#[derive(Debug, Clone)]
pub struct Session(ReedSolomon);
/// Generates coding blobs on demand given data blobs
#[derive(Debug, Clone)]
pub struct CodingGenerator {
/// SharedBlobs that couldn't be used in last call to next()
leftover: Vec<SharedBlob>,
session: Arc<Session>,
/// This struct is stored in the header of any data blob that has been encoded
/// Every coding blob contains it.
#[derive(Clone, Copy, Default, Debug, PartialEq, Serialize, Deserialize)]
pub struct CodingHeader {
/// Index of the first data blob in the set
pub start_index: u64,
/// Index of the erasure set. Slot relative.
pub set_index: u64,
/// Number of data blobs in the set.
pub data_count: usize,
/// Number of parity blobs in the set.
pub parity_count: usize,
/// Size of the largest data blob in the set, including the header.
pub shard_size: usize,
}
impl Session {
pub fn new(data_count: usize, coding_count: usize) -> Result<Session> {
let rs = ReedSolomon::new(data_count, coding_count)?;
Ok(Session(rs))
impl CodingHeader {
/// returns the set-relative index of the blob with the given index.
pub fn data_index_in_set(&self, index: u64) -> u64 {
index - self.start_index
}
/// Create coding blocks by overwriting `parity`
pub fn encode(&self, data: &[&[u8]], parity: &mut [&mut [u8]]) -> Result<()> {
self.0.encode_sep(data, parity)?;
Ok(())
/// returns the set-relative index of the coding blob with the given index.
/// in the context of erasure/recovery coding blobs come after data-blobs.
pub fn coding_index_in_set(&self, index: u64) -> u64 {
index + self.data_count as u64
}
/// Recover data + coding blocks into data blocks
/// # Arguments
/// * `data` - array of data blocks to recover into
/// * `coding` - array of coding blocks
/// * `erasures` - list of indices in data where blocks should be recovered
pub fn decode_blocks(&self, blocks: &mut [&mut [u8]], present: &[bool]) -> Result<()> {
self.0.reconstruct(blocks, present)?;
Ok(())
/// returns the end boundary indexes of the data and coding blobs in this set, respectively.
pub fn end_indexes(&self) -> (u64, u64) {
let start = self.start_index;
(start + self.data_count as u64, self.parity_count as u64)
}
}
/// Returns `(number_of_data_blobs, number_of_coding_blobs)`
pub fn dimensions(&self) -> (usize, usize) {
(self.0.data_shard_count(), self.0.parity_shard_count())
}
/// Erasure code data blobs.
///
/// # Arguments
///
/// * `slot` - slot all blobs belong to
/// * `set_index` - index of the erasure set being encoded
/// * `start_index` - index of the first data blob
/// * `blobs` - data blobs to be encoded. an amount greater than `MAX_SET_SIZE` causes errors.
/// * `parity` - number of parity blobs to create. values greater than `MAX_SET_SIZE` cause errors.
pub fn encode<B: BorrowMut<Blob>>(
slot: u64,
set_index: u64,
start_index: u64,
blobs: &mut [B],
parity: usize,
) -> Result<Vec<Blob>> {
let data = blobs.len();
// this would fail if there are too few or too many blobs
let rs = ReedSolomon::new(data, parity)?;
let mut header = CodingHeader {
data_count: data,
parity_count: parity,
start_index,
set_index,
shard_size: 0,
};
/// Reconstruct any missing blobs in this erasure set if possible
/// Re-indexes any coding blobs that have been reconstructed and fixes up size in metadata
/// Assumes that the user has sliced into the blobs appropriately already. else recovery will
/// return an error or garbage data
pub fn reconstruct_blobs<B>(
&self,
blobs: &mut [B],
present: &[bool],
size: usize,
block_start_idx: u64,
slot: u64,
) -> Result<(Vec<Blob>, Vec<Blob>)>
where
B: AsMut<[u8]>,
{
let mut blocks: Vec<&mut [u8]> = blobs.iter_mut().map(AsMut::as_mut).collect();
let shard_size = blobs
.iter_mut()
.map(|blob| (*blob).borrow().data_size() as usize)
.max()
.expect("must be >=1 blobs");
trace!("[reconstruct_blobs] present: {:?}, size: {}", present, size,);
//header.shard_size = crate::packet::BLOB_DATA_SIZE;
header.shard_size = shard_size;
// Decode the blocks
self.decode_blocks(blocks.as_mut_slice(), &present)?;
let slices = blobs
.iter_mut()
.map(|b| {
let blob: &mut Blob = b.borrow_mut();
blob.set_coding_header(&header);
&blob.data[..shard_size]
})
.collect::<Vec<_>>();
let mut recovered_data = vec![];
let mut recovered_coding = vec![];
let mut parity_blocks = (0..parity).map(|_| vec![0; shard_size]).collect::<Vec<_>>();
let mut parity_slices = parity_blocks
.iter_mut()
.map(|v| &mut v[..])
.collect::<Vec<_>>();
let erasures = present
.iter()
.enumerate()
.filter_map(|(i, present)| if *present { None } else { Some(i) });
rs.encode_sep(&slices[..], &mut parity_slices[..])?;
// Create the missing blobs from the reconstructed data
for n in erasures {
let data_size;
let idx;
let first_byte;
let parity = parity_blocks
.into_iter()
.enumerate()
.map(|(idx, block)| {
let mut blob = Blob::default();
(&mut blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + shard_size])
.copy_from_slice(&block);
blob.set_slot(slot);
blob.set_size(shard_size - BLOB_HEADER_SIZE);
//blob.set_data_size(shard_size as u64);
blob.set_coding();
blob.set_coding_header(&header);
blob.set_index(idx as u64);
if n < NUM_DATA {
let mut blob = Blob::new(&blocks[n]);
blob
})
.collect();
data_size = blob.data_size() as usize - BLOB_HEADER_SIZE;
idx = n as u64 + block_start_idx;
first_byte = blob.data[0];
Ok(parity)
}
blob.set_size(data_size);
recovered_data.push(blob);
} else {
let mut blob = Blob::default();
blob.data_mut()[..size].copy_from_slice(&blocks[n]);
data_size = size;
idx = (n as u64 + block_start_idx) - NUM_DATA as u64;
first_byte = blob.data[0];
/// See `encode`.
/// Convenience function to encode and return `Arc<RwLock<Blob>>`s
pub fn encode_shared(
slot: u64,
set_index: u64,
start_index: u64,
blobs: &[SharedBlob],
parity: usize,
) -> Result<Vec<SharedBlob>> {
let mut locks = blobs
.iter()
.map(|shared_blob| shared_blob.write().unwrap())
.collect::<Vec<_>>();
blob.set_slot(slot);
blob.set_index(idx);
blob.set_size(data_size);
recovered_coding.push(blob);
}
let mut blobs = locks.iter_mut().map(|lock| &mut **lock).collect::<Vec<_>>();
trace!(
"[reconstruct_blobs] erasures[{}] ({}) data_size: {} data[0]: {}",
n,
idx,
data_size,
first_byte
let parity_blobs = encode(slot, set_index, start_index, &mut blobs[..], parity)?
.into_iter()
.map(|blob| Arc::new(RwLock::new(blob)))
.collect();
Ok(parity_blobs)
}
/// Attempt to recover missing blobs
/// # Arguments
/// * `info` - the encoding parameters for this erasure set
/// * `slot` - the slot that these blobs belong to
/// * `blobs` - data blobs, followed by parity blobs. blobs must be in order or the recovery will
/// succeed but return garbage.
/// * `present` - each element indicates the presence of the blob with the same set-relative index
pub fn decode<B>(
info: &CodingHeader,
slot: u64,
blobs: &mut [B],
present: &[bool],
) -> Result<(Vec<Blob>, Vec<Blob>)>
where
B: BorrowMut<Blob>,
{
let rs = ReedSolomon::new(info.data_count as usize, info.parity_count as usize)?;
let mut blocks = vec![];
for (idx, blob) in blobs.iter_mut().enumerate() {
if idx < info.data_count {
blocks.push(&mut blob.borrow_mut().data[..info.shard_size as usize]);
} else {
blocks.push(
&mut blob.borrow_mut().data
[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + info.shard_size as usize],
);
}
Ok((recovered_data, recovered_coding))
}
assert_eq!(
blocks.len(),
rs.data_shard_count() + rs.parity_shard_count()
);
rs.reconstruct(&mut blocks[..], present)?;
let mut recovered_data = vec![];
let mut recovered_coding = vec![];
let erasures = present
.iter()
.enumerate()
.filter_map(|(i, present)| if *present { None } else { Some(i) });
let shard_size = info.shard_size as usize;
// Create the missing blobs from the reconstructed data
for n in erasures {
let data_size;
let idx;
let first_byte;
if n < info.data_count {
let mut blob: Box<Blob> = Box::default();
(&mut blob.data[..shard_size]).copy_from_slice(&blocks[n]);
data_size = blob.data_size() as usize - BLOB_HEADER_SIZE;
idx = n as u64 + info.start_index;
first_byte = blob.data[0];
blob.set_slot(slot);
blob.set_index(idx);
blob.set_size(data_size);
blob.set_coding_header(info);
recovered_data.push(*blob);
} else {
let mut blob = Blob::default();
(&mut blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + shard_size])
.copy_from_slice(&blocks[n]);
data_size = shard_size;
idx = (n - info.data_count) as u64;
first_byte = blob.data[0];
blob.set_slot(slot);
blob.set_index(idx);
blob.set_size(data_size);
blob.set_coding_header(info);
recovered_coding.push(blob);
}
trace!(
"[reconstruct_blobs] erasures[{}] ({}) data_size: {} data[0]: {}",
n,
idx,
data_size,
first_byte
);
}
Ok((recovered_data, recovered_coding))
}
impl CodingGenerator {
/// Creates a generator backed by `session`; `leftover` buffers data blobs
/// that have not yet filled a whole erasure set.
pub fn new(session: Arc<Session>) -> Self {
    let set_capacity = session.0.data_shard_count();
    CodingGenerator {
        session,
        leftover: Vec::with_capacity(set_capacity),
    }
}
/// See `decode`
/// Convenience function to accept and return `Arc<RwLock<Blob>>`s
pub fn decode_shared(
info: &CodingHeader,
slot: u64,
blobs: &[SharedBlob],
present: &[bool],
) -> Result<(Vec<Blob>, Vec<Blob>)> {
let mut locks = blobs
.iter()
.map(|shared_blob| shared_blob.write().unwrap())
.collect::<Vec<_>>();
/// Yields next set of coding blobs, if any.
/// Must be called with consecutive data blobs within a slot.
///
/// Passing in a slice with the first blob having a new slot will cause internal state to
/// reset, so the above concern does not apply to slot boundaries, only indexes within a slot
/// must be consecutive.
///
/// If used improperly, it may return garbage coding blobs, but will not give an
/// error.
pub fn next(&mut self, next_data: &[SharedBlob]) -> Vec<SharedBlob> {
    let (num_data, num_coding) = self.session.dimensions();
    let mut next_coding =
        Vec::with_capacity((self.leftover.len() + next_data.len()) / num_data * num_coding);

    // A new slot invalidates any partially accumulated erasure set.
    if !self.leftover.is_empty()
        && !next_data.is_empty()
        && self.leftover[0].read().unwrap().slot() != next_data[0].read().unwrap().slot()
    {
        self.leftover.clear();
    }

    let next_data: Vec<_> = self.leftover.iter().chain(next_data).cloned().collect();

    for data_blobs in next_data.chunks(num_data) {
        if data_blobs.len() < num_data {
            // Not enough blobs for a full set; stash them for the next call.
            self.leftover = data_blobs.to_vec();
            break;
        }
        self.leftover.clear();

        // find max_data_size for the erasure set
        let max_data_size = data_blobs
            .iter()
            .fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max));

        let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect();
        let data_ptrs: Vec<_> = data_locks
            .iter()
            .map(|l| &l.data[..max_data_size])
            .collect();

        let mut coding_blobs = Vec::with_capacity(num_coding);

        // Seed each coding blob's metadata (index/slot/id) from the
        // corresponding leading data blob of the set.
        for data_blob in &data_locks[..num_coding] {
            let index = data_blob.index();
            let slot = data_blob.slot();
            let id = data_blob.id();

            let mut coding_blob = Blob::default();
            coding_blob.set_index(index);
            coding_blob.set_slot(slot);
            coding_blob.set_id(&id);
            coding_blob.set_size(max_data_size);
            coding_blob.set_coding();
            coding_blobs.push(coding_blob);
        }

        // Only keep the set's coding blobs if encoding succeeded.
        if {
            let mut coding_ptrs: Vec<_> = coding_blobs
                .iter_mut()
                .map(|blob| &mut blob.data_mut()[..max_data_size])
                .collect();

            self.session.encode(&data_ptrs, coding_ptrs.as_mut_slice())
        }
        .is_ok()
        {
            next_coding.append(&mut coding_blobs);
        }
    }

    next_coding
        .into_iter()
        .map(|blob| Arc::new(RwLock::new(blob)))
        .collect()
}
}
impl Default for Session {
    /// A default session uses the module's standard `NUM_DATA`/`NUM_CODING`
    /// erasure dimensions.
    fn default() -> Session {
        let session = Session::new(NUM_DATA, NUM_CODING);
        session.unwrap()
    }
}
impl Default for CodingGenerator {
    /// Builds a generator over a default `Session`, with `leftover` sized to
    /// hold one erasure set's worth of data blobs.
    fn default() -> Self {
        let session = Session::default();
        CodingGenerator {
            leftover: Vec::with_capacity(session.0.data_shard_count()),
            session: Arc::new(session),
        }
    }
}
#[cfg(test)]
@ -285,6 +309,10 @@ pub mod test {
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::borrow::Borrow;
const NUM_DATA: usize = 8;
const NUM_CODING: usize = 9;
const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING;
/// Specifies the contents of an erasure set: `num_data` data blobs and `num_coding` coding blobs
/// Exists to be passed to `generate_blocktree_with_coding`
#[derive(Debug, Copy, Clone)]
@ -293,6 +321,8 @@ pub mod test {
pub set_index: u64,
pub num_data: usize,
pub num_coding: usize,
pub data_count: usize,
pub parity_count: usize,
}
/// Specifies the contents of a slot
@ -318,107 +348,54 @@ pub mod test {
pub start_index: u64,
pub coding: Vec<SharedBlob>,
pub data: Vec<SharedBlob>,
pub data_count: usize,
pub parity_count: usize,
}
#[test]
fn test_coding() {
    const N_DATA: usize = 4;
    const N_CODING: usize = 2;

    let session = Session::new(N_DATA, N_CODING).unwrap();

    // 16-byte data blocks, each shifted by its shard number so contents differ.
    let mut data_blocks: Vec<Vec<u8>> =
        (0..N_DATA as u8).map(|i| (i..(16 + i)).collect()).collect();
    let first_block_orig: Vec<u8> = data_blocks[0].clone();

    let mut coding_blocks: Vec<_> = (0..N_CODING).map(|_| vec![0u8; 16]).collect();

    {
        let mut coding_slices: Vec<_> =
            coding_blocks.iter_mut().map(Vec::as_mut_slice).collect();
        let data_slices: Vec<_> = data_blocks.iter().map(Vec::as_slice).collect();

        session
            .encode(data_slices.as_slice(), coding_slices.as_mut_slice())
            .expect("encoding must succeed");
    }

    trace!("test_coding: coding blocks:");
    for b in &coding_blocks {
        trace!("test_coding: {:?}", b);
    }

    // Erase one data block and mark it missing.
    let erasure: usize = 1;
    let present = &mut [true; N_DATA + N_CODING];
    present[erasure] = false;
    let erased = data_blocks[erasure].clone();
    data_blocks[erasure].copy_from_slice(&[0; 16]);

    {
        let mut all_slices: Vec<_> = data_blocks
            .iter_mut()
            .chain(coding_blocks.iter_mut())
            .map(Vec::as_mut_slice)
            .collect();

        session
            .decode_blocks(all_slices.as_mut_slice(), present)
            .expect("decoding must succeed");
    }

    trace!("test_coding: vs:");
    for v in &data_blocks {
        trace!("test_coding: {:?}", v);
    }

    // Untouched block is intact, and the erased block was reconstructed.
    assert_eq!(first_block_orig, data_blocks[0]);
    assert_eq!(erased, data_blocks[erasure]);
}
fn test_toss_and_recover(
session: &Session,
data_blobs: &[SharedBlob],
coding_blobs: &[SharedBlob],
block_start_idx: usize,
) {
let size = coding_blobs[0].read().unwrap().size();
fn test_toss_and_recover(slot: u64, data_blobs: &[SharedBlob], coding_blobs: &[SharedBlob]) {
let mut blobs: Vec<SharedBlob> = Vec::with_capacity(ERASURE_SET_SIZE);
blobs.push(SharedBlob::default()); // empty data, erasure at zero
for blob in &data_blobs[block_start_idx + 1..block_start_idx + NUM_DATA] {
for blob in &data_blobs[1..] {
// skip first blob
blobs.push(blob.clone());
}
blobs.push(SharedBlob::default()); // empty coding, erasure at zero
for blob in &coding_blobs[1..NUM_CODING] {
for blob in &coding_blobs[1..] {
blobs.push(blob.clone());
}
let info = coding_blobs[0]
.read()
.unwrap()
.get_coding_header()
.expect("coding info");
// toss one data and one coding
let mut present = vec![true; blobs.len()];
present[0] = false;
present[NUM_DATA] = false;
present[data_blobs.len()] = false;
let (recovered_data, recovered_coding) = session
.reconstruct_shared_blobs(&mut blobs, &present, size, block_start_idx as u64, 0)
.expect("reconstruction must succeed");
let (recovered_data, recovered_coding) =
decode_shared(&info, slot, &mut blobs[..], &present)
.expect("reconstruction must succeed");
assert_eq!(recovered_data.len(), 1);
assert_eq!(recovered_coding.len(), 1);
assert_eq!(
blobs[1].read().unwrap().meta,
data_blobs[block_start_idx + 1].read().unwrap().meta
data_blobs[1].read().unwrap().meta
);
assert_eq!(
blobs[1].read().unwrap().data(),
data_blobs[block_start_idx + 1].read().unwrap().data()
);
assert_eq!(
recovered_data[0].meta,
data_blobs[block_start_idx].read().unwrap().meta
data_blobs[1].read().unwrap().data()
);
assert_eq!(recovered_data[0].meta, data_blobs[0].read().unwrap().meta);
assert_eq!(
recovered_data[0].data(),
data_blobs[block_start_idx].read().unwrap().data()
data_blobs[0].read().unwrap().data()
);
assert_eq!(
recovered_coding[0].data(),
@ -428,72 +405,27 @@ pub mod test {
#[test]
fn test_erasure_generate_coding() {
solana_logger::setup();
// trivial case
let mut coding_generator = CodingGenerator::default();
let blobs = Vec::new();
for _ in 0..NUM_DATA * 2 {
let coding = coding_generator.next(&blobs);
assert!(coding.is_empty());
}
// test coding by iterating one blob at a time
let data_blobs = generate_test_blobs(0, NUM_DATA * 2);
let test_blobs = generate_shared_test_blobs(0, NUM_DATA * 2);
for (i, blob) in data_blobs.iter().cloned().enumerate() {
let coding_blobs = coding_generator.next(&[blob]);
for (idx, data_blobs) in test_blobs.chunks_exact(NUM_DATA).enumerate() {
let coding_blobs = encode_shared(
0,
idx as u64,
(idx * NUM_DATA) as u64,
&data_blobs[..],
NUM_CODING,
)
.unwrap();
if !coding_blobs.is_empty() {
assert_eq!(i % NUM_DATA, NUM_DATA - 1);
assert_eq!(coding_blobs.len(), NUM_CODING);
for j in 0..NUM_CODING {
assert_eq!(
coding_blobs[j].read().unwrap().index(),
((i / NUM_DATA) * NUM_DATA + j) as u64
);
}
test_toss_and_recover(
&coding_generator.session,
&data_blobs,
&coding_blobs,
i - (i % NUM_DATA),
);
}
test_toss_and_recover(0, &data_blobs, &coding_blobs);
}
}
#[test]
fn test_erasure_generate_coding_reset_on_new_slot() {
    solana_logger::setup();

    let mut coding_generator = CodingGenerator::default();

    // Two sets' worth of blobs; move the second set into a different slot.
    let data_blobs = generate_test_blobs(0, NUM_DATA * 2);
    for blob in &data_blobs[NUM_DATA..NUM_DATA * 2] {
        blob.write().unwrap().set_slot(1);
    }

    // An incomplete set emits no coding blobs.
    let coding_blobs = coding_generator.next(&data_blobs[0..NUM_DATA - 1]);
    assert!(coding_blobs.is_empty());

    // A full set in the new slot resets leftover state and emits coding.
    let coding_blobs = coding_generator.next(&data_blobs[NUM_DATA..]);
    assert_eq!(coding_blobs.len(), NUM_CODING);

    test_toss_and_recover(
        &coding_generator.session,
        &data_blobs,
        &coding_blobs,
        NUM_DATA,
    );
}
#[test]
fn test_erasure_generate_blocktree_with_coding() {
solana_logger::setup();
let cases = vec![
(NUM_DATA, NUM_CODING, 7, 5),
(NUM_DATA - 6, NUM_CODING - 1, 5, 7),
@ -509,6 +441,8 @@ pub mod test {
set_index,
num_data,
num_coding,
data_count: NUM_DATA,
parity_count: NUM_CODING,
})
.collect();
@ -523,18 +457,17 @@ pub mod test {
for erasure_spec in spec.set_specs.iter() {
let start_index = erasure_spec.set_index * NUM_DATA as u64;
let (data_end, coding_end) = (
start_index + erasure_spec.num_data as u64,
start_index + erasure_spec.num_coding as u64,
);
let data_end = start_index + erasure_spec.num_data as u64;
for idx in start_index..data_end {
let opt_bytes = blocktree.get_data_blob_bytes(slot, idx).unwrap();
assert!(opt_bytes.is_some());
}
for idx in start_index..coding_end {
let opt_bytes = blocktree.get_coding_blob_bytes(slot, idx).unwrap();
for idx in 0..erasure_spec.num_coding {
let opt_bytes = blocktree
.get_coding_blob_bytes(slot, erasure_spec.set_index, idx as u64)
.unwrap();
assert!(opt_bytes.is_some());
}
}
@ -547,10 +480,7 @@ pub mod test {
#[test]
fn test_recovery_with_model() {
use std::thread;
const MAX_ERASURE_SETS: u64 = 16;
const N_THREADS: usize = 2;
const N_SLOTS: u64 = 10;
solana_logger::setup();
@ -563,85 +493,66 @@ pub mod test {
set_index,
num_data: NUM_DATA,
num_coding: NUM_CODING,
parity_count: NUM_CODING,
data_count: NUM_DATA,
})
.collect();
SlotSpec { slot, set_specs }
});
let mut handles = vec![];
let session = Arc::new(Session::default());
for slot_model in generate_ledger_model(specs) {
for erasure_set in slot_model.chunks {
let erased_coding = erasure_set.coding[0].clone();
let erased_data = erasure_set.data[..3].to_vec();
let info = erasure_set.coding[0]
.read()
.unwrap()
.get_coding_header()
.expect("coding info");
for i in 0..N_THREADS {
let specs = specs.clone();
let session = Arc::clone(&session);
let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE);
let handle = thread::Builder::new()
.name(i.to_string())
.spawn(move || {
for slot_model in generate_ledger_model(specs) {
for erasure_set in slot_model.chunks {
let erased_coding = erasure_set.coding[0].clone();
let erased_data = erasure_set.data[..3].to_vec();
blobs.push(SharedBlob::default());
blobs.push(SharedBlob::default());
blobs.push(SharedBlob::default());
for blob in erasure_set.data.into_iter().skip(3) {
blobs.push(blob);
}
let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE);
blobs.push(SharedBlob::default());
for blob in erasure_set.coding.into_iter().skip(1) {
blobs.push(blob);
}
blobs.push(SharedBlob::default());
blobs.push(SharedBlob::default());
blobs.push(SharedBlob::default());
for blob in erasure_set.data.into_iter().skip(3) {
blobs.push(blob);
}
let mut present = vec![true; ERASURE_SET_SIZE];
present[0] = false;
present[1] = false;
present[2] = false;
present[NUM_DATA] = false;
blobs.push(SharedBlob::default());
for blob in erasure_set.coding.into_iter().skip(1) {
blobs.push(blob);
}
decode_shared(&info, slot_model.slot, &mut blobs, &present)
.expect("reconstruction must succeed");
let size = erased_coding.read().unwrap().size() as usize;
for (expected, recovered) in erased_data.iter().zip(blobs.iter()) {
let expected = expected.read().unwrap();
let mut recovered = recovered.write().unwrap();
let data_size = recovered.data_size() as usize - BLOB_HEADER_SIZE;
recovered.set_size(data_size);
let corrupt = data_size > BLOB_DATA_SIZE;
assert!(!corrupt, "CORRUPTION {}", data_size);
assert_eq!(expected.data(), recovered.data());
}
let mut present = vec![true; ERASURE_SET_SIZE];
present[0] = false;
present[1] = false;
present[2] = false;
present[NUM_DATA] = false;
assert_eq!(
erased_coding.read().unwrap().data(),
blobs[NUM_DATA].read().unwrap().data()
);
session
.reconstruct_shared_blobs(
&mut blobs,
&present,
size,
erasure_set.set_index * NUM_DATA as u64,
slot_model.slot,
)
.expect("reconstruction must succeed");
for (expected, recovered) in erased_data.iter().zip(blobs.iter()) {
let expected = expected.read().unwrap();
let mut recovered = recovered.write().unwrap();
let data_size = recovered.data_size() as usize - BLOB_HEADER_SIZE;
recovered.set_size(data_size);
let corrupt = data_size > BLOB_DATA_SIZE;
assert!(!corrupt, "CORRUPTION");
assert_eq!(&*expected, &*recovered);
}
assert_eq!(
erased_coding.read().unwrap().data(),
blobs[NUM_DATA].read().unwrap().data()
);
debug!("passed set: {}", erasure_set.set_index);
}
debug!("passed slot: {}", slot_model.slot);
}
})
.expect("thread build error");
handles.push(handle);
debug!("passed set: {}", erasure_set.set_index);
}
debug!("passed slot: {}", slot_model.slot);
}
handles.into_iter().for_each(|h| h.join().unwrap());
}
/// Generates a model of a ledger containing certain data and coding blobs according to a spec
@ -653,8 +564,6 @@ pub mod test {
IntoIt: Iterator<Item = S> + Clone + 'a,
S: Borrow<SlotSpec>,
{
let mut coding_generator = CodingGenerator::default();
specs.into_iter().map(move |spec| {
let spec = spec.borrow();
let slot = spec.slot;
@ -665,8 +574,10 @@ pub mod test {
.map(|erasure_spec| {
let set_index = erasure_spec.set_index as usize;
let start_index = set_index * NUM_DATA;
let (parity_count, data_count) =
(erasure_spec.parity_count, erasure_spec.data_count);
let mut blobs = generate_test_blobs(0, NUM_DATA);
let mut blobs = generate_shared_test_blobs(0, data_count);
index_blobs(
&blobs,
&Keypair::new().pubkey(),
@ -675,7 +586,14 @@ pub mod test {
0,
);
let mut coding_blobs = coding_generator.next(&blobs);
let mut coding_blobs = encode_shared(
slot,
set_index as u64,
start_index as u64,
&blobs,
parity_count,
)
.unwrap();
blobs.drain(erasure_spec.num_data..);
coding_blobs.drain(erasure_spec.num_coding..);
@ -685,6 +603,8 @@ pub mod test {
set_index: set_index as u64,
data: blobs,
coding: coding_blobs,
parity_count,
data_count,
}
})
.collect();
@ -711,6 +631,7 @@ pub mod test {
blocktree
.put_coding_blob_bytes_raw(
slot,
erasure_set.set_index,
blob.index(),
&blob.data[..blob.size() + BLOB_HEADER_SIZE],
)
@ -722,16 +643,7 @@ pub mod test {
blocktree
}
// fn verify_test_blobs(offset: usize, blobs: &[SharedBlob]) -> bool {
// let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect();
//
// blobs.iter().enumerate().all(|(i, blob)| {
// let blob = blob.read().unwrap();
// blob.index() as usize == i + offset && blob.data() == &data[..]
// })
// }
//
fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
pub fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<Blob> {
let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect();
let blobs: Vec<_> = (0..num_blobs)
@ -748,35 +660,26 @@ pub mod test {
index_blobs(&blobs, &Pubkey::new_rand(), offset as u64, 0, 0);
blobs
.into_iter()
.map(|shared| shared.read().unwrap().clone())
.collect()
}
impl Session {
fn reconstruct_shared_blobs(
&self,
blobs: &mut [SharedBlob],
present: &[bool],
size: usize,
block_start_idx: u64,
slot: u64,
) -> Result<(Vec<Blob>, Vec<Blob>)> {
let mut locks: Vec<std::sync::RwLockWriteGuard<_>> = blobs
.iter()
.map(|shared_blob| shared_blob.write().unwrap())
.collect();
pub fn generate_shared_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect();
let mut slices: Vec<_> = locks
.iter_mut()
.enumerate()
.map(|(i, blob)| {
if i < NUM_DATA {
&mut blob.data[..size]
} else {
&mut blob.data_mut()[..size]
}
})
.collect();
let blobs: Vec<_> = (0..num_blobs)
.into_iter()
.map(|_| {
let mut blob = Blob::default();
blob.data_mut().copy_from_slice(&data);
blob.set_size(data.len());
Arc::new(RwLock::new(blob))
})
.collect();
self.reconstruct_blobs(&mut slices, present, size, block_start_idx, slot)
}
index_blobs(&blobs, &Pubkey::new_rand(), offset as u64, 0, 0);
blobs
}
}

View File

@ -1,6 +1,9 @@
//! The `packet` module defines data structures and methods to pull data from the network.
use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
use crate::result::{Error, Result};
use crate::{
erasure::CodingHeader,
recvmmsg::{recv_mmsg, NUM_RCVMMSGS},
result::{Error, Result},
};
use bincode;
use byteorder::{ByteOrder, LittleEndian};
use serde::Serialize;
@ -333,7 +336,7 @@ pub fn packets_to_blobs<T: Borrow<Packet>>(packets: &[T]) -> Vec<Blob> {
}
/// Byte range of a header field: starts where the previous field ended
/// (`$prev`) and spans `size_of::<$type>()` bytes. The fragment is `ty`
/// (not `ident`) so compound types such as `Option<CodingHeader>` match.
macro_rules! range {
    ($prev:expr, $type:ty) => {
        $prev..$prev + size_of::<$type>()
    };
}
@ -343,17 +346,13 @@ const FORWARDED_RANGE: std::ops::Range<usize> = range!(SIGNATURE_RANGE.end, bool
const PARENT_RANGE: std::ops::Range<usize> = range!(FORWARDED_RANGE.end, u64);
const SLOT_RANGE: std::ops::Range<usize> = range!(PARENT_RANGE.end, u64);
const INDEX_RANGE: std::ops::Range<usize> = range!(SLOT_RANGE.end, u64);
// The optional coding header sits between the index and the id fields.
const CODING_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, Option<CodingHeader>);
const ID_RANGE: std::ops::Range<usize> = range!(CODING_RANGE.end, Pubkey);
const FLAGS_RANGE: std::ops::Range<usize> = range!(ID_RANGE.end, u32);
const SIZE_RANGE: std::ops::Range<usize> = range!(FLAGS_RANGE.end, u64);
/// Rounds `$x` up to the nearest multiple of `$align` (`$align` is assumed
/// to be a power of two — TODO confirm callers only pass powers of two).
macro_rules! align {
    ($x:expr, $align:expr) => {
        ($x + ($align - 1)) & !($align - 1)
    };
}
pub const BLOB_HEADER_SIZE: usize = SIZE_RANGE.end;
pub const BLOB_HEADER_SIZE: usize = align!(SIZE_RANGE.end, BLOB_DATA_ALIGN); // make sure data() is safe for erasure
pub const SIGNABLE_START: usize = PARENT_RANGE.start;
pub const BLOB_FLAG_IS_LAST_IN_SLOT: u32 = 0x2;
@ -422,6 +421,14 @@ impl Blob {
self.data[ID_RANGE].copy_from_slice(id.as_ref())
}
/// Serializes `Some(header)` into the blob's coding-header field.
pub fn set_coding_header(&mut self, header: &CodingHeader) {
    let value = Some(*header);
    bincode::serialize_into(&mut self.data[CODING_RANGE], &value).unwrap();
}
/// Deserializes the optional coding header stored in the blob's header region.
pub fn get_coding_header(&self) -> Option<CodingHeader> {
    let bytes = &self.data[CODING_RANGE];
    bincode::deserialize(bytes).unwrap()
}
/// Used to determine whether or not this blob should be forwarded in retransmit
/// A bool is used here instead of a flag because this item is not intended to be signed when
/// blob signatures are introduced
@ -468,10 +475,10 @@ impl Blob {
}
/// Payload region of the blob (header excluded), capped at `BLOB_DATA_SIZE`
/// rather than running to the end of the backing buffer.
pub fn data(&self) -> &[u8] {
    &self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE]
}

/// Mutable payload region of the blob (header excluded), capped at
/// `BLOB_DATA_SIZE`.
pub fn data_mut(&mut self) -> &mut [u8] {
    &mut self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE]
}
pub fn size(&self) -> usize {
let size = self.data_size() as usize;

View File

@ -4,7 +4,7 @@
use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
use crate::packet::{Blob, SharedBlob};
use crate::repair_service::{RepairService, RepairStrategy};
use crate::result::{Error, Result};
use crate::service::Service;
@ -62,19 +62,14 @@ pub fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc<Blocktree>) -> Result
}
}))?;
for blob in blobs {
// TODO: Once the original leader signature is added to the blob, make sure that
// the blob was originally generated by the expected leader for this slot
// Insert the new blob into block tree
blocktree.put_many_coding_blobs(blobs.iter().filter_map(move |blob| {
if blob.is_coding() {
blocktree.put_coding_blob_bytes(
blob.slot(),
blob.index(),
&blob.data[..BLOB_HEADER_SIZE + blob.size()],
)?;
Some(&**blob)
} else {
None
}
}
}))?;
Ok(())
}